Source code for hwtLib.examples.builders.hwException
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If
from hwt.interfaces.std import Handshaked
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.synthesizer.unit import Unit
from hwtLib.abstract.hwExceptionCtx import HwExceptionCtx, InHwError
from hwtLib.handshaked.streamNode import StreamNode
[docs]class ExampleHwException0(InHwError):
"""
An example of InHwError error.
This object behaves as a regular python exception and can be used in
:class:`hwtLib.abstract.hwExceptionCtx.HwExceptionCtx` functions
which are construction an exception raising/catching logic.
"""
pass
[docs]class ExampleHwException1(InHwError):
pass
[docs]class HwExceptionRaise(Unit):
"""
An example of :class:`hwtLib.abstract.hwExceptionCtx.HwExceptionCtx` usage.
(a handshaked wire which will stall until exception is handled, however only exception caused by value 1
has an interface for handlig and thus all other exceptions will cause stall until reset)
.. hwt-autodoc:
"""
def _declr(self):
# clock and reset is required because the status
# of the exception handling needs to be stored in register
addClkRstn(self)
self.dataIn = Handshaked()
self.dataOut = Handshaked()._m()
def _impl(self):
err0 = self._sig("exception_handling_in_progress0")
err0_detected = self._sig("exception_detected0")
err1 = self._sig("exception_handling_in_progress1")
err1_detected = self._sig("exception_detected1")
din = self.dataIn
errCtx = HwExceptionCtx(self)
If(din.vld,
If(din.data._eq(0),
# HwExceptionCtx.hw_raise creates an io and status flag for an exception instance
# the exception can have hardware parameters (first argument) which are translated
# to an interface as well and the python arguments (rest of the arguments)
# :note: exception raising involves only synchronization signal routing, it does not copy the
# arguments of the exception and thus they needs to stay constant duing exception handling
# :note: pending_flag and raising_flag can be used to get the signal which signalizes that the exception is pending
# and is not handled yet.
# :note: The raise of error does not affect the rest of the code, in order to do so we
# need to use raising_flag/pending_flag, in this case we used it to stall dataIn/dataOut
errCtx.hw_raise(
# first argument is passed to hardware interface responsible for
# exception handling, the rest of arguments are standard Python Exception args/kwargs
ExampleHwException0((din.data,), "Example raise 1"),
raising_flag=err0_detected,
pending_flag=err0,
)
).Elif(din.data._eq(1),
errCtx.hw_raise(
ExampleHwException1((), "Example raise 2"),
raising_flag=err1_detected,
pending_flag=err1
)
)
)
dout = self.dataOut
If(err0 | err0_detected | err1 | err1_detected,
dout.data(None)
).Else(
dout.data(din.data)
)
# dissable input if any error is beeing handled
# do not propagate values which are rising an error on output
StreamNode(
[din], [dout],
extraConds={
din: ~(err0 | err1), # stall while an error is beeing processed
dout: ~(err0 | err0_detected | err1 | err1_detected)}, # stall if errror appeared or error is beeing processed
skipWhen={
dout: err0_detected | err1_detected, # drop if error value is detected
}
).sync()
[docs]class HwExceptionCatch(Unit):
"""
An example of :class:`hwtLib.abstract.hwExceptionCtx.HwExceptionCtx` usage.
(a handshaked wire which will stall until reset if 0,1 or 2 appears in data,
only exception caused by value 1 can be handled)
.. hwt-autodoc:
"""
def _declr(self):
HwExceptionRaise._declr(self)
def _impl(self):
c = self.c = HwExceptionRaise()
err0 = self._sig("exception_handling_in_progress0")
err0_detected = self._sig("exception_detected0")
din = self.dataIn
# dissable input if error is beeing handled
If(err0 | err0_detected,
c.dataIn.data(None)
).Else(
c.dataIn.data(din.data)
)
# dissable input if error is beeing handled
# do not propagate input values which are rising an error on subcomponent c input
StreamNode(
[din], [c.dataIn],
extraConds={
din: ~err0, # stall while an error is beeing processed
c.dataIn: ~(err0 | err0_detected)}, # stall if errror appeared or error is beeing processed
skipWhen={
c.dataIn: err0_detected, # drop if error value is detected
}
).sync()
self.dataOut(c.dataOut)
errCtx = HwExceptionCtx(self)
If(din.vld,
If(din.data._eq(2),
errCtx.hw_raise(
ExampleHwException0((din.data,), "Example raise 1"),
raising_flag=err0_detected,
pending_flag=err0)
)
)
# HwExceptionCtx.hw_catch catches all exceptions of specified class from current scope
# and all children components if the exception was not catched yet
for e_intf in errCtx.hw_catch(ExampleHwException0):
# never handle and thus stall this component until hard reset
e_intf.rd(0)
# those exception handle interfaces which are left with undriven rd
# are considered to be unhadled and will be returned also in another catch
# of this exception
# HwExceptionCtx.propagate() propagates all uncatched exceptions from all children
# (unahdled exceptions from self are propagated automatically)
# :note: If there is some unhandled exception and the propagate is not called
# it will result in the driver error of the exception interface
errCtx.propagate()
propagateClkRstn(self)
if __name__ == "__main__":
from hwt.synthesizer.utils import to_rtl_str
u = HwExceptionCatch()
print(to_rtl_str(u))