Source code for hwtLib.handshaked.joinFair
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import Or, rol, SwitchLogic
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import VldSynced
from hwt.interfaces.utils import addClkRstn
from hwt.synthesizer.param import Param
from hwt.synthesizer.vectorUtils import iterBits
from hwtLib.handshaked.joinPrioritized import HsJoinPrioritized
[docs]class HsJoinFairShare(HsJoinPrioritized):
"""
Join input stream to single output stream
inputs with lower number has higher priority
Priority is changing every clock
If prioritized input is not sending valid data,
input with lowest index and valid is used
combinational
"""
[docs] def _config(self):
HsJoinPrioritized._config(self)
self.EXPORT_SELECTED = Param(True)
[docs] def _declr(self):
HsJoinPrioritized._declr(self)
addClkRstn(self)
if self.EXPORT_SELECTED:
self.selectedOneHot = VldSynced()
self.selectedOneHot._replaceParam("DATA_WIDTH", self.INPUTS)
[docs] @staticmethod
def priorityAck(priorityReg, vldSignals, index):
"""
Generate ack logic for selected input
:param priorityReg: priority register with one hot encoding, 1 means input of this index should have be prioritized.
:param vldSignals: list of vld signals of input
:param index: index of input for which you wont get ack logic
:return: ack signal for this input
"""
priorityOverdrives = []
vldWithHigherPriority = list(vldSignals[:index])
for i, (p, vld) in enumerate(zip(iterBits(priorityReg), vldSignals)):
if i > index:
priorityOverdrives.append(p & vld)
# ack when no one with higher priority has vld or this input have the priority
ack = ~Or(*priorityOverdrives, *vldWithHigherPriority) | priorityReg[index]
return ack
[docs] def isSelectedLogic(self):
"""
Resolve isSelected signal flags for each input, when isSelected flag signal is 1 it means
input has clearance to make transaction
"""
vld = self.getVld
rd = self.getRd
dout = self.dataOut
priority = self._reg("priority", Bits(self.INPUTS), defVal=1)
priority(rol(priority, 1))
vldSignals = list(map(vld, self.dataIn))
isSelectedFlags = []
for i, din in enumerate(self.dataIn):
isSelected = self._sig("isSelected_%d" % i)
isSelected(self.priorityAck(priority, vldSignals, i))
isSelectedFlags.append(isSelected)
rd(din)(isSelected & rd(dout))
if self.EXPORT_SELECTED:
self.selectedOneHot.data[i](isSelected & vld(din))
if self.EXPORT_SELECTED:
self.selectedOneHot.vld(Or(*vldSignals) & rd(dout))
return isSelectedFlags, vldSignals
[docs] def _impl(self):
isSelectedFlags, vldSignals = self.isSelectedLogic()
self.inputMuxLogic(isSelectedFlags, vldSignals)
if __name__ == "__main__":
from hwt.interfaces.std import Handshaked
from hwt.synthesizer.utils import toRtl
u = HsJoinFairShare(Handshaked)
u.INPUTS.set(3)
print(toRtl(u))