Source code for hwtLib.handshaked.joinPrioritized
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import And, Or, SwitchLogic
from hwt.hwIOs.hwIOArray import HwIOArray
from hwt.hwParam import HwParam
from hwt.pyUtils.typingFuture import override
from hwtLib.handshaked.compBase import HandshakedCompBase
[docs]
class HsJoinPrioritized(HandshakedCompBase):
"""
Join input stream to single output stream
inputs with lower number has higher priority
:note: combinational
.. hwt-autodoc:: _example_HsJoinPrioritized
"""
@override
def hwConfig(self):
self.INPUTS = HwParam(2)
super().hwConfig()
@override
def hwDeclr(self):
with self._hwParamsShared():
self.dataIn = HwIOArray(
self.hwIOCls() for _ in range(int(self.INPUTS))
)
self.dataOut = self.hwIOCls()._m()
[docs]
def dataConnectionExpr(self, dIn, dOut):
"""Create connection between input and output interface"""
data = self.get_data
dataConnectExpr = []
outDataSignals = list(data(dOut))
if dIn is None:
dIn = [None for _ in outDataSignals]
else:
dIn = data(dIn)
for _din, _dout in zip(dIn, outDataSignals):
dataConnectExpr.append(_dout(_din))
return dataConnectExpr
@override
def hwImpl(self):
rd = self.get_ready_signal
vld = self.get_valid_signal
dout = self.dataOut
vldSignals = [vld(d) for d in self.dataIn]
# data out mux
dataCases = []
for i, din in enumerate(self.dataIn):
allLowerPriorNotReady = map(lambda x: ~x, vldSignals[:i])
rd(din)(And(rd(dout), *allLowerPriorNotReady))
cond = vld(din)
dataConnectExpr = self.dataConnectionExpr(din, dout)
dataCases.append((cond, dataConnectExpr))
dataDefault = self.dataConnectionExpr(None, dout)
SwitchLogic(dataCases, dataDefault)
vld(dout)(Or(*vldSignals))
[docs]
def _example_HsJoinPrioritized():
from hwt.hwIOs.std import HwIODataRdVld
m = HsJoinPrioritized(HwIODataRdVld)
return m
if __name__ == "__main__":
from hwt.synth import to_rtl_str
m = _example_HsJoinPrioritized()
print(to_rtl_str(m))