Source code for hwtLib.abstract.streamBuilder

from hwt.code import If
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import Rst_n, Handshaked
from hwt.synthesizer.interfaceLevel.interfaceUtils.proxy import InterfaceProxy
from hwt.synthesizer.interfaceLevel.unitImplHelpers import getClk, getRst


class AbstractStreamBuilder(object):
    """
    Fluent builder which chains stream components (buffers, joins, splits)
    inside of a parent unit.

    Every building method instantiates a component in the parent, connects
    its input to the current end of the data-path and moves the end to the
    output of the new component.

    :attention: this is just abstract class, unit classes have to be specified
        in concrete implementation

    :cvar FifoCls: fifo unit class
    :cvar JoinSelectCls: select order based join unit class
    :cvar JoinFairCls: round robin based join unit class
    :cvar JoinPrioritizedCls: priority based join unit class
    :cvar RegCls: register unit class
    :cvar ResizerCls: resizer unit class
    :cvar SplitCopyCls: copy based split unit class
    :cvar SplitSelectCls: select order based split unit class (demultiplexer)
    :cvar SplitFairCls: round robin based split unit class
    :cvar SplitPrioritizedCls: priority based split unit class

    :ivar compId: sequential number used to make component names unique
    :ivar lastComp: last built component
    :ivar end: last interface of data-path
    :attention: input port is taken from self.end
    """
    FifoCls = NotImplemented
    JoinSelectCls = NotImplemented
    JoinPrioritizedCls = NotImplemented
    JoinFairCls = NotImplemented
    RegCls = NotImplemented
    ResizerCls = NotImplemented
    SplitCopyCls = NotImplemented
    SplitSelectCls = NotImplemented
    SplitFairCls = NotImplemented
    SplitPrioritizedCls = NotImplemented

    def __init__(self, parent, srcInterface, name=None):
        """
        :param parent: unit in which will be all units created
            by this builder instantiated
        :param srcInterface: start of data-path
        :param name: prefix for all instantiated units,
            if None "gen_" + srcInterface._name is used
        """
        self.parent = parent
        self.lastComp = None
        self.end = srcInterface
        if name is None:
            name = "gen_" + srcInterface._name

        self.name = name
        self.compId = 0

    def getClk(self):
        """
        Lookup clock signal on parent
        """
        return getClk(self.parent)

    def getRstn(self):
        """
        Lookup reset signal on parent and return it in active-low form

        :return: the reset itself if it is a Rst_n, else its negation
        """
        rst = getRst(self.parent)
        return rst if isinstance(rst, Rst_n) else ~rst

    def getInfCls(self):
        """
        Get class of interface which this builder is currently using.
        """
        return self._getIntfCls(self.end)

    def _getIntfCls(self, intf):
        """
        Get real interface class of interface
        (for InterfaceProxy the class of the proxied interface)
        """
        if isinstance(intf, InterfaceProxy):
            return intf._origIntf.__class__

        return intf.__class__

    def _findSuitableName(self, unitName):
        """
        Find suitable name for component (= name without collisions)

        :param unitName: name of the component kind, used in generated name
        :return: first "<self.name>_<unitName>_<compId>" which is not
            an attribute of parent yet
        :note: self.compId is incremented only when a candidate collides
        """
        while True:
            name = "%s_%s_%d" % (self.name, unitName, self.compId)
            if not hasattr(self.parent, name):
                return name
            self.compId += 1

    def _propagateClkRstn(self, u):
        """
        Connect clock and reset to unit "u"
        (only the ports which unit actually has are connected)
        """
        if hasattr(u, "clk"):
            u.clk(self.getClk())

        # reset is looked up lazily so parent without reset
        # is touched only if unit requires it
        if hasattr(u, 'rst_n'):
            u.rst_n(self.getRstn())

        if hasattr(u, "rst"):
            u.rst(~self.getRstn())

    def _genericInstance(self, unitCls, unitName, setParams=lambda u: u):
        """
        Instantiate generic component and connect basics

        :param unitCls: class of unit which is being created
        :param unitName: name for unitCls
        :param setParams: function which updates parameters as is required
            (parameters are already shared with self.end interface)
        :return: self, self.end is moved to dataOut of new unit
        """
        u = unitCls(self.getInfCls())
        u._updateParamsFrom(self.end)
        setParams(u)

        setattr(self.parent, self._findSuitableName(unitName), u)
        self._propagateClkRstn(u)

        u.dataIn(self.end)

        self.lastComp = u
        self.end = u.dataOut

        return self

    def _connectEndTo(self, outputs):
        """
        Connect items of self.end (outputs of split component) to specified
        ports and invalidate self.end

        :param outputs: ports to which outputs of split component
            should be connected
        """
        for toComponent, fromFork in zip(outputs, self.end):
            toComponent(fromFork)

        # invalidate end because all ports were connected
        self.end = None

    @classmethod
    def _join(cls, joinCls, parent, srcInterfaces, name, configAs,
              extraConfigFn):
        """
        Create builder from many interfaces by joining them together

        :param joinCls: join component class which should be used
        :param parent: unit where builder should place components
        :param srcInterfaces: sequence of interfaces which should be
            joined together (lower index = higher priority)
        :param name: prefix for instantiated units, if None it is derived
            from configAs._name or "gen_join" is used
        :param configAs: interface or another object which configuration
            should be applied, if None first of srcInterfaces is used
        :param extraConfigFn: function which is applied on join unit
            in configuration phase (can be None)
        """
        srcInterfaces = list(srcInterfaces)
        if name is None:
            if configAs is None:
                name = "gen_join"
            else:
                name = "gen_" + configAs._name

        if configAs is None:
            configAs = srcInterfaces[0]

        self = cls(parent, None, name=name)

        u = joinCls(self._getIntfCls(configAs))
        if extraConfigFn is not None:
            extraConfigFn(u)
        u._updateParamsFrom(configAs)
        u.INPUTS.set(len(srcInterfaces))

        setattr(self.parent, self._findSuitableName(name + "_join"), u)
        self._propagateClkRstn(u)

        for joinIn, inputIntf in zip(u.dataIn, srcInterfaces):
            joinIn(inputIntf)

        self.lastComp = u
        self.end = u.dataOut

        return self

    @classmethod
    def join_fair(cls, parent, srcInterfaces, name=None, configAs=None,
                  exportSelected=False):
        """
        Create builder from fairly joined interfaces
        (round robin for input select)

        :param exportSelected: value for EXPORT_SELECTED parameter of join
            component (if True join component will have handshaked
            interface with index of selected input)
        :note: other parameters same as in `.AbstractStreamBuilder._join`
        """
        def extraConfig(u):
            u.EXPORT_SELECTED.set(exportSelected)

        return cls._join(cls.JoinFairCls, parent, srcInterfaces, name,
                         configAs, extraConfig)

    def buff(self, items=1, latency=None, delay=None):
        """
        Use registers and fifos to create buffer of specified parameters

        :note: if latency == 1 or latency >= items registers are used
            else fifo is used
        :param items: number of items in buffer
        :param latency: latency of buffer (number of clk ticks required
            to get data from input to output)
        :param delay: delay of buffer (number of clk ticks required
            to get data to buffer)
        :note: delay can be used as synchronization method or to solve
            timing related problems because it will split valid signal path
        :note: if latency or delay is None default is used
            (latency 1 for items == 1 else 2, delay 0)
        :raise NotImplementedError: if fifo is required and
            latency != 2 or delay != 0
        """
        if latency is None:
            latency = 1 if items == 1 else 2
        if delay is None:
            delay = 0

        assert latency >= 1 and delay >= 0, (latency, delay)

        if latency == 1 or latency >= items:
            # instantiate buffer as register
            def applyParams(u):
                u.LATENCY.set(latency)
                u.DELAY.set(delay)

            return self._genericInstance(self.RegCls, "reg",
                                         setParams=applyParams)

        # instantiate buffer as fifo
        if latency != 2 or delay != 0:
            raise NotImplementedError()

        def setDepth(u):
            u.DEPTH.set(items)

        return self._genericInstance(self.FifoCls, "fifo", setDepth)

    def split_copy(self, noOfOutputs):
        """
        Clone input data to all outputs

        :param noOfOutputs: number of output interfaces of the split
        """
        def setChCnt(u):
            u.OUTPUTS.set(noOfOutputs)

        return self._genericInstance(self.SplitCopyCls, 'splitCopy',
                                     setChCnt)

    def split_copy_to(self, *outputs):
        """
        Same like split_copy, but outputs are automatically connected

        :param outputs: ports on which should be outputs
            of split component connected to
        """
        s = self.split_copy(len(outputs))
        self._connectEndTo(outputs)
        return s

    def split_select(self, outputSelSignalOrSequence, noOfOutputs):
        """
        Create a demultiplexer with number of outputs specified
        by noOfOutputs

        :param noOfOutputs: number of outputs of multiplexer
        :param outputSelSignalOrSequence: handshaked interface
            (onehot encoded) to control selected output or sequence
            of output indexes which should be used (will be repeated)
        """
        def setChCnt(u):
            u.OUTPUTS.set(noOfOutputs)

        self._genericInstance(self.SplitSelectCls, 'select', setChCnt)
        selIntf = self.lastComp.selectOneHot

        if isinstance(outputSelSignalOrSequence, Handshaked):
            selIntf(outputSelSignalOrSequence)
        else:
            # materialize so that any iterable can be used and measured
            seq = list(outputSelSignalOrSequence)
            t = Bits(selIntf.data._dtype.bit_length())
            size = len(seq)
            # list (not lazy map) of onehot encoded indexes
            ohIndexes = [1 << x for x in seq]
            indexes = self.parent._sig(self.name + "split_seq",
                                       t[size],
                                       defVal=ohIndexes)
            actual = self.parent._reg(self.name + "split_seq_index",
                                      Bits(size.bit_length()),
                                      0)
            selIntf.data(indexes[actual])
            # select value is always available
            selIntf.vld(1)
            # step to next item of sequence once the current was consumed,
            # wrap to start after last item
            If(selIntf.rd,
               If(actual._eq(size - 1),
                  actual(0)
               ).Else(
                  actual(actual + 1)
               )
            )

        return self

    def split_select_to(self, outputSelSignalOrSequence, *outputs):
        """
        Same like split_select, but outputs are automatically connected

        :param outputs: ports on which should be outputs
            of split component connected to
        """
        s = self.split_select(outputSelSignalOrSequence, len(outputs))
        self._connectEndTo(outputs)
        return s

    def split_prioritized(self, noOfOutputs):
        """
        Data from input is sent to output which is ready and has
        highest priority from all ready outputs

        :param noOfOutputs: number of output interfaces of the fork
        """
        def setChCnt(u):
            u.OUTPUTS.set(noOfOutputs)

        self._genericInstance(self.SplitPrioritizedCls, 'splitPrio',
                              setChCnt)
        return self

    def split_prioritized_to(self, *outputs):
        """
        Same like split_prioritized, but outputs are automatically connected

        :param outputs: ports on which should be outputs
            of split component connected to
        """
        s = self.split_prioritized(len(outputs))
        self._connectEndTo(outputs)
        return s

    def split_fair(self, noOfOutputs, exportSelected=False):
        """
        Create a round robin selector with number of outputs
        specified by noOfOutputs

        :param noOfOutputs: number of outputs of multiplexer
        :param exportSelected: if is True split component will have
            interface "selectedOneHot" of type VldSynced which will have
            one hot index of selected item
        """
        def setChCnt(u):
            u.OUTPUTS.set(noOfOutputs)
            # NOTE(review): assumes SplitFairCls has EXPORT_SELECTED param
            # same as JoinFairCls used in join_fair - confirm
            u.EXPORT_SELECTED.set(exportSelected)

        self._genericInstance(self.SplitFairCls, 'splitFair', setChCnt)
        return self

    def split_fair_to(self, *outputs, exportSelected=False):
        """
        Same like split_fair, but outputs are automatically connected

        :param outputs: ports on which should be outputs
            of split component connected to
        :param exportSelected: if is True split component will have
            interface "selectedOneHot" of type VldSynced which will have
            one hot index of selected item
        """
        s = self.split_fair(len(outputs), exportSelected=exportSelected)
        self._connectEndTo(outputs)
        return s