Source code for hwtLib.handshaked.splitFair

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import Or, rol, connect
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import Handshaked
from hwt.interfaces.utils import addClkRstn
from hwt.synthesizer.param import Param
from hwtLib.handshaked.joinFair import HsJoinFairShare
from hwtLib.handshaked.splitCopy import HsSplitCopy


[docs]class HsSplitFair(HsSplitCopy): """ Split input stream to outputs, select single output for every input data based on priority. Priority is changing every clock If prioritized output is not ready, input with lowest index and ready is used combinational .. aafig:: +-------+ +------> out0 | | +-------+ +-------+ input stream | | +-------+ +-------------> split +------> out1 | | | +-------+ +-------+ | +-------+ +------> out2 | +-------+ :ivar selectedOneHot: handshaked interface with one hot encoded index of selected output """
[docs] def _config(self): HsSplitCopy._config(self) self.EXPORT_SELECTED = Param(True)
[docs] def _declr(self): HsSplitCopy._declr(self) addClkRstn(self) if self.EXPORT_SELECTED: self.selectedOneHot = Handshaked() self.selectedOneHot._replaceParam("DATA_WIDTH", self.OUTPUTS)
[docs] def isSelectedLogic(self): """ Resolve isSelected signal flags for each input, when isSelected flag signal is 1 it means input has clearance to make transaction """ vld = self.getVld rd = self.getRd din = self.dataIn EXPORT_SELECTED = bool(self.EXPORT_SELECTED) priority = self._reg("priority", Bits(self.OUTPUTS), defVal=1) priority(rol(priority, 1)) rdSignals = list(map(rd, self.dataOut)) for i, dout in enumerate(self.dataOut): isSelected = self._sig("isSelected_%d" % i) isSelected(HsJoinFairShare.priorityAck(priority, rdSignals, i)) if EXPORT_SELECTED: self.selectedOneHot.data[i](isSelected & vld(din)) vld(dout)(isSelected & vld(din) & self.selectedOneHot.rd) else: vld(dout)(isSelected & vld(din)) if EXPORT_SELECTED: self.selectedOneHot.vld(Or(*rdSignals) & vld(din)) return rdSignals
[docs] def _impl(self): rdSignals = self.isSelectedLogic() for dout in self.dataOut: connect(self.dataIn, dout, exclude={self.getRd(dout), self.getVld(dout)}) if self.EXPORT_SELECTED: self.getRd(self.dataIn)(Or(*rdSignals) & self.selectedOneHot.rd) else: self.getRd(self.dataIn)(Or(*rdSignals))
if __name__ == "__main__": from hwt.synthesizer.utils import toRtl u = HsSplitFair(Handshaked) print(toRtl(u))