Source code for hwtLib.handshaked.joinPrioritized

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import And, Or, SwitchLogic
from hwt.synthesizer.hObjList import HObjList
from hwt.synthesizer.param import Param
from hwtLib.handshaked.compBase import HandshakedCompBase


[docs]class HsJoinPrioritized(HandshakedCompBase): """ Join input stream to single output stream inputs with lower number has higher priority :note: combinational .. hwt-autodoc:: _example_HsJoinPrioritized """ def _config(self): self.INPUTS = Param(2) super()._config() def _declr(self): with self._paramsShared(): self.dataIn = HObjList( self.intfCls() for _ in range(int(self.INPUTS)) ) self.dataOut = self.intfCls()._m()
[docs] def dataConnectionExpr(self, dIn, dOut): """Create connection between input and output interface""" data = self.get_data dataConnectExpr = [] outDataSignals = list(data(dOut)) if dIn is None: dIn = [None for _ in outDataSignals] else: dIn = data(dIn) for _din, _dout in zip(dIn, outDataSignals): dataConnectExpr.append(_dout(_din)) return dataConnectExpr
def _impl(self): rd = self.get_ready_signal vld = self.get_valid_signal dout = self.dataOut vldSignals = [vld(d) for d in self.dataIn] # data out mux dataCases = [] for i, din in enumerate(self.dataIn): allLowerPriorNotReady = map(lambda x: ~x, vldSignals[:i]) rd(din)(And(rd(dout), *allLowerPriorNotReady)) cond = vld(din) dataConnectExpr = self.dataConnectionExpr(din, dout) dataCases.append((cond, dataConnectExpr)) dataDefault = self.dataConnectionExpr(None, dout) SwitchLogic(dataCases, dataDefault) vld(dout)(Or(*vldSignals))
[docs]def _example_HsJoinPrioritized(): from hwt.interfaces.std import Handshaked u = HsJoinPrioritized(Handshaked) return u
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = _example_HsJoinPrioritized() print(to_rtl_str(u))