Source code for hwtLib.abstract.streamBuilder

from typing import Optional, Tuple, Union

from hwt.code import If
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import Handshaked
from hwtLib.abstract.componentBuilder import AbstractComponentBuilder


[docs]class AbstractStreamBuilder(AbstractComponentBuilder): """ :note: see :class:`AbstractComponentBuilder` :attention: this is just abstract class unit classes has to be specified in concrete implementation :cvar ~.FifoCls: FIFO unit class :cvar ~.FifoAsyncCls: asynchronous FIFO (FIFO with separate clock per port) unit class :cvar ~.JoinSelectCls: select order based join unit class :cvar ~.JoinFairCls: round robin based join unit class :cvar ~.JoinPrioritizedCls: priority based join unit class :cvar ~.RegCls: register unit class :cvar ~.RegCdcCls: Clock domain crossing register unit class :cvar ~.ResizerCls: resizer unit class (used to change data width of an interface) :cvar ~.SplitCopyCls: copy based split unit class :cvar ~.SplitSelectCls: select order based split unit class (demultiplexer) :cvar ~.SplitFairCls: round robin based split unit class :cvar ~.SplitPrioritizedCls: priority based split unit class """ FifoCls = NotImplemented FifoAsyncCls = NotImplemented JoinSelectCls = NotImplemented JoinPrioritizedCls = NotImplemented JoinFairCls = NotImplemented RegCls = NotImplemented ResizerCls = NotImplemented SplitCopyCls = NotImplemented SplitSelectCls = NotImplemented SplitFairCls = NotImplemented SplitPrioritizedCls = NotImplemented
[docs] def _genericInstance(self, unit_cls, name, set_params=lambda u: u, update_params=True, propagate_clk_rst=True): """ Instantiate generic component and connect basics :param unit_cls: class of unit which is being created :param name: name for unit_cls instance :param set_params: function which updates parameters as is required (parameters are already shared with self.end interface) """ u = unit_cls(self.getInfCls()) if update_params: u._updateParamsFrom(self.end) set_params(u) setattr(self.parent, self._findSuitableName(name), u) if propagate_clk_rst: self._propagateClkRstn(u) self.lastComp = u if self.master_to_slave: u.dataIn(self.end) self.end = u.dataOut else: self.end(u.dataOut) self.end = u.dataIn return self
[docs] @classmethod def _join(cls, joinCls, parent, srcInterfaces, name, configAs, extraConfigFn): """ Create builder from many interfaces by joining them together :param joinCls: join component class which should be used :param parent: unit where builder should place components :param srcInterfacecs: sequence of interfaces which should be joined together (lower index = higher priority) :param configureAs: interface or another object which configuration should be applied :param extraConfigFn: function which is applied on join unit in configuration phase (can be None) """ srcInterfaces = list(srcInterfaces) if name is None: if configAs is None: name = "gen_join" else: name = "gen_" + configAs._name if configAs is None: configAs = srcInterfaces[0] self = cls(parent, None, name=name) u = joinCls(self._getIntfCls(configAs)) if extraConfigFn is not None: extraConfigFn(u) u._updateParamsFrom(configAs) u.INPUTS = len(srcInterfaces) setattr(self.parent, self._findSuitableName(name + "_join"), u) self._propagateClkRstn(u) for joinIn, inputIntf in zip(u.dataIn, srcInterfaces): joinIn(inputIntf) self.lastComp = u self.end = u.dataOut return self
[docs] @classmethod def join_prioritized(cls, parent, srcInterfaces, name=None, configAs=None, extraConfigFn=None): """ create builder from fairly joined interfaces (round robin for input select) :note: other parameters same as in `.AbstractStreamBuilder._join` """ return cls._join(cls.JoinPrioritizedCls, parent, srcInterfaces, name, configAs, extraConfigFn)
[docs] @classmethod def join_fair(cls, parent, srcInterfaces, name=None, configAs=None, exportSelected=False): """ create builder from fairly joined interfaces (round robin for input select) :param exportSelected: if True join component will have handshaked interface with index of selected input :note: other parameters same as in `.AbstractStreamBuilder._join` """ def extraConfig(u): u.EXPORT_SELECTED = exportSelected return cls._join(cls.JoinFairCls, parent, srcInterfaces, name, configAs, extraConfig)
[docs] def buff(self, items:int=1, latency: Union[None, int, Tuple[int, int]]=None, delay: Optional[int]=None, init_data: tuple=()): """ Use registers and FIFOs to create buffer of specified parameters :note: if items <= latency registers are used else FIFO is used :param items: number of items in buffer :param latency: latency of buffer (number of clk ticks required to get data from input to input) :param delay: delay of buffer (number of clk ticks required to get data to buffer) :note: delay can be used as synchronization method or to solve timing related problems because it will split valid signal path :param init_data: a reset value of buffer (data is transfered after reset) if items=1 and interface has just data:uint8_t signal the init_data will be in format ((0,),) :note: if latency or delay is None the most optimal value is used """ if items == 0: assert latency is None or latency == 0 assert delay is None or delay == 0 return self elif items == 1: if latency is None: latency = 1 if delay is None: delay = 0 else: if latency is None: latency = 2 if delay is None: delay = 0 if init_data is not None: init_data = tuple(init_data) assert len(init_data) <= items, (items, "more init data than init size", init_data) assert isinstance(latency, tuple) or latency >= 1 and delay >= 0, (latency, delay) if isinstance(latency, tuple) or latency == 1 or latency >= items: # instantiate buffer as register def applyParams(u): u.LATENCY = latency u.DELAY = delay u.INIT_DATA = init_data return self._genericInstance(self.RegCls, "reg", set_params=applyParams) else: # instantiate buffer as fifo if latency != 2 or delay != 0: raise NotImplementedError() def applyParams(u): u.DEPTH = items u.INIT_DATA = init_data return self._genericInstance(self.FifoCls, "fifo", applyParams)
[docs] def buff_cdc(self, clk, rst, items=1): """ Instantiate a CDC (Clock Domain Crossing) buffer or AsyncFifo on selected interface :note: if items==1 CDC clock synchronization register is used if items>1 asynchronous FIFO is used """ in_clk = self.getClk() in_rst_n = self.getRstn() if not self.master_to_slave: in_clk, clk = clk, in_clk in_rst_n, rst = rst, in_rst_n def set_clk_freq(u): u.IN_FREQ = in_clk.FREQ u.OUT_FREQ = clk.FREQ if items > 1: def configure(u): u.DEPTH = items set_clk_freq(u) res = self._genericInstance( self.FifoAsyncCls, "cdcAFifo", configure, propagate_clk_rst=False) else: assert items == 1, items res = self._genericInstance( self.RegCdcCls, "cdcReg", set_clk_freq, propagate_clk_rst=False) b = res.lastComp b.dataIn_clk(in_clk) b.dataIn_rst_n(in_rst_n) b.dataOut_clk(clk) b.dataOut_rst_n(rst) return res
[docs] def split_copy(self, noOfOutputs): """ Clone input data to all outputs :param noOfOutputs: number of output interfaces of the split """ if not self.master_to_slave: assert len(self.end) == noOfOutputs, self.end def setChCnt(u): u.OUTPUTS = noOfOutputs return self._genericInstance(self.SplitCopyCls, 'splitCopy', setChCnt)
[docs] def split_copy_to(self, *outputs): """ Same like split_copy, but outputs are automatically connected :param outputs: ports on which should be outputs of split component connected to """ assert self.master_to_slave, "This function does not make sense if building in reverse order" noOfOutputs = len(outputs) s = self.split_copy(noOfOutputs) for toComponent, fromFork in zip(outputs, self.end): toComponent(fromFork) self.end = None # invalidate None because port was fully connected return s
[docs] def split_select(self, outputSelSignalOrSequence, noOfOutputs): """ Create a demultiplexer with number of outputs specified by noOfOutputs :param noOfOutputs: number of outputs of multiplexer :param outputSelSignalOrSequence: handshaked interface (onehot encoded) to control selected output or sequence of output indexes which should be used (will be repeated) """ if not self.master_to_slave: assert len(self.end) == noOfOutputs, self.end def setChCnt(u): u.OUTPUTS = noOfOutputs self._genericInstance(self.SplitSelectCls, 'select', setChCnt) if isinstance(outputSelSignalOrSequence, Handshaked): self.lastComp.selectOneHot(outputSelSignalOrSequence) else: seq = outputSelSignalOrSequence t = Bits(self.lastComp.selectOneHot.data._dtype.bit_length()) size = len(seq) ohIndexes = map(lambda x: 1 << x, seq) indexes = self.parent._sig(self.name + "split_seq", t[size], def_val=ohIndexes) actual = self.parent._reg(self.name + "split_seq_index", Bits(size.bit_length()), 0) iin = self.lastComp.selectOneHot iin.data(indexes[actual]) iin.vld(1) If(iin.rd, If(actual._eq(size - 1), actual(0) ).Else( actual(actual + 1) ) ) return self
[docs] def split_select_to(self, outputSelSignalOrSequence, *outputs): """ Same like split_select, but outputs are automatically connected :param outputs: ports on which should be outputs of split component connected to """ assert self.master_to_slave, "This function does not make sense if building in reverse order" noOfOutputs = len(outputs) s = self.split_select(outputSelSignalOrSequence, noOfOutputs) for toComponent, fromFork in zip(outputs, self.end): toComponent(fromFork) self.end = None # invalidate None because port was fully connected return s
[docs] def split_prioritized(self, noOfOutputs): """ data from input is send to output which is ready and has highest priority from all ready outputs :param noOfOutputs: number of output interfaces of the fork """ if not self.master_to_slave: assert len(self.end) == noOfOutputs, self.end def setChCnt(u): u.OUTPUTS = noOfOutputs self._genericInstance(self.SplitPrioritizedCls, 'splitPrio', setChCnt) return self
[docs] def split_prioritized_to(self, *outputs): """ Same like split_prioritized, but outputs are automatically connected :param outputs: ports on which should be outputs of split component connected to """ assert self.master_to_slave, "This function does not make sense if building in reverse order" noOfOutputs = len(outputs) s = self.split_prioritized(noOfOutputs) for toComponent, fromFork in zip(outputs, self.end): toComponent(fromFork) self.end = None # invalidate None because port was fully connected return s
[docs] def split_fair(self, noOfOutputs, exportSelected=False): """ Create a round robin selector with number of outputs specified by noOfOutputs :param noOfOutputs: number of outputs of multiplexer :param exportSelected: if is True split component will have interface "selectedOneHot" of type VldSynced wich will have one hot index of selected item """ if not self.master_to_slave: assert len(self.end) == noOfOutputs, self.end def setChCnt(u): u.OUTPUTS = noOfOutputs self._genericInstance(self.SplitFairCls, 'splitFair', setChCnt) return self
[docs] def split_fair_to(self, *outputs, exportSelected=False): """ Same like split_fair, but outputs are automatically connected :param outputs: ports on which should be outputs of split component connected to :param exportSelected: if is True split component will have interface "selectedOneHot" of type VldSynced which will have one hot index of selected item """ assert self.master_to_slave, "This function does not make sense if building in reverse order" noOfOutputs = len(outputs) s = self.split_fair(noOfOutputs, exportSelected=exportSelected) for toComponent, fromFork in zip(outputs, self.end): toComponent(fromFork) self.end = None # invalidate None because port was fully connected return s