Source code for hwtLib.handshaked.joinPrioritized
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import And, Or, SwitchLogic
from hwt.synthesizer.param import Param
from hwtLib.handshaked.compBase import HandshakedCompBase
[docs]class HsJoinPrioritized(HandshakedCompBase):
"""
Join input stream to single output stream
inputs with lower number has higher priority
combinational
"""
[docs] def _config(self):
self.INPUTS = Param(2)
super()._config()
[docs] def _declr(self):
with self._paramsShared():
self.dataIn = self.intfCls(asArraySize=self.INPUTS)
self.dataOut = self.intfCls()
[docs] def dataConnectionExpr(self, dIn, dOut):
"""Create connection between input and output interface"""
data = self.getData
dataConnectExpr = []
outDataSignals = list(data(dOut))
if dIn is None:
dIn = [None for _ in outDataSignals]
else:
dIn = data(dIn)
for _din, _dout in zip(dIn, outDataSignals):
dataConnectExpr.append(_dout(_din))
return dataConnectExpr
[docs] def _impl(self):
rd = self.getRd
vld = self.getVld
dout = self.dataOut
vldSignals = list(map(vld, self.dataIn))
# data out mux
dataCases = []
for i, din in enumerate(self.dataIn):
allLowerPriorNotReady = map(lambda x: ~x, vldSignals[:i])
rd(din)(And(rd(dout), *allLowerPriorNotReady))
cond = vld(din)
dataConnectExpr = self.dataConnectionExpr(din, dout)
dataCases.append((cond, dataConnectExpr))
dataDefault = self.dataConnectionExpr(None, dout)
SwitchLogic(dataCases, dataDefault)
vld(dout)(Or(*vldSignals))
if __name__ == "__main__":
from hwt.interfaces.std import Handshaked
from hwt.synthesizer.utils import toRtl
u = HsJoinPrioritized(Handshaked)
print(toRtl(u))