Source code for hwtLib.amba.axis_comp.joinPrioritized

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import Or, If
from hwtLib.amba.axis_comp.base import AxiSCompBase
from hwtLib.handshaked.joinPrioritized import HsJoinPrioritized
from hwt.interfaces.utils import addClkRstn


class AxiSJoinPrioritized(AxiSCompBase, HsJoinPrioritized):
    """
    Join input streams into a single output stream;
    inputs with a lower number have higher priority.

    :note: The frame from each interface is always taken as a whole.
    :see: :class:`hwtLib.handshaked.joinPrioritized.HsJoinPrioritized`

    .. hwt-autodoc::
    """

    def _declr(self) -> None:
        """
        Declare the interfaces of this component: call ``addClkRstn`` and
        then reuse the declarations of :class:`HsJoinPrioritized`.
        """
        addClkRstn(self)
        HsJoinPrioritized._declr(self)

    def _impl(self) -> None:
        """
        Build the component around an inner :class:`HsJoinPrioritized`.

        Every input gets an ``en<i>`` register. While no register is set,
        all inputs are forwarded to the inner join. Once a word that is not
        ``last`` is transferred from an input, its register is set and only
        that input stays enabled until its ``last`` word is transferred.
        """
        # Inner join works on the same interface class and parameters and
        # uses this component's accessors for the valid/ready signals.
        join = HsJoinPrioritized(self.INTF_CLS)
        join._updateParamsFrom(self)
        join.get_valid_signal = self.get_valid_signal
        join.get_ready_signal = self.get_ready_signal
        self.join = join

        # en override: one flag register per input, 0 by default
        ens = [
            self._reg(f"en{i:d}", def_val=0)
            for i in range(len(self.dataIn))
        ]
        # true while no input holds the lock (no frame in progress)
        all_0 = ~Or(*ens)

        for en_flag, din, c_din in zip(ens, self.dataIn, join.dataIn):
            # this input may transfer if nobody holds the lock or it does
            en = all_0 | en_flag

            # valid towards the inner join is masked by the enable
            vld = self.get_valid_signal(c_din)
            vld(en & self.get_valid_signal(din))

            # ready back to the input is masked by the same enable
            rd = self.get_ready_signal(din)
            rd(en & self.get_ready_signal(c_din))

            self.dataConnectionExpr(din, c_din)

            # on each transferred word keep the lock unless it was the last
            # word of the frame
            If(rd & vld,
               en_flag(~din.last)
            )

        self.dataOut(join.dataOut)
if __name__ == "__main__":
    # Script entry point: print the component serialized by to_rtl_str.
    # The import lives inside the guard, so it only runs in script mode.
    from hwt.synthesizer.utils import to_rtl_str

    unit = AxiSJoinPrioritized()
    print(to_rtl_str(unit))