Source code for hwtLib.handshaked.joinPrioritized

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import And, Or, SwitchLogic
from hwt.synthesizer.param import Param
from hwtLib.handshaked.compBase import HandshakedCompBase


class HsJoinPrioritized(HandshakedCompBase):
    """
    Join several input streams into a single output stream.

    Inputs with a lower index have a higher priority.
    The component is purely combinational.
    """

    def _config(self):
        """Declare the ``INPUTS`` parameter (default 2), then the base config."""
        self.INPUTS = Param(2)
        super()._config()

    def _declr(self):
        """Declare ``dataIn`` (built with ``asArraySize=INPUTS``) and ``dataOut``."""
        with self._paramsShared():
            self.dataIn = self.intfCls(asArraySize=self.INPUTS)
            self.dataOut = self.intfCls()

    def dataConnectionExpr(self, dIn, dOut):
        """
        Create connection between input and output interface.

        :param dIn: source interface, or None to drive every data signal
            of ``dOut`` with None
        :param dOut: destination interface
        :return: list with one connection statement per data signal
            of ``dOut``
        """
        getData = self.getData
        targets = list(getData(dOut))

        if dIn is None:
            # no source selected: one None per output data signal
            sources = [None] * len(targets)
        else:
            sources = getData(dIn)

        return [dst(src) for src, dst in zip(sources, targets)]

    def _impl(self):
        """
        Build the read-enable logic of the inputs, the output data mux
        and the output valid signal.
        """
        getRd = self.getRd
        getVld = self.getVld
        out = self.dataOut

        valids = [getVld(intf) for intf in self.dataIn]

        # data out mux; cases are collected in input index order
        muxCases = []
        for index, inp in enumerate(self.dataIn):
            # this input's rd is driven by: output rd AND no input
            # with a lower index (higher priority) is valid
            higherPriorIdle = (~v for v in valids[:index])
            getRd(inp)(And(getRd(out), *higherPriorIdle))

            muxCases.append((getVld(inp), self.dataConnectionExpr(inp, out)))

        # NOTE(review): SwitchLogic presumably picks the first case whose
        # condition holds and otherwise the default - confirm in hwt docs
        SwitchLogic(muxCases, self.dataConnectionExpr(None, out))

        # output is valid whenever any input is valid
        getVld(out)(Or(*valids))
if __name__ == "__main__":
    # Script entry: build the component for the Handshaked interface
    # and print the result of toRtl().
    from hwt.interfaces.std import Handshaked
    from hwt.synthesizer.utils import toRtl

    print(toRtl(HsJoinPrioritized(Handshaked)))