# Source code for hwtLib.amba.axis_comp.builder

from typing import Union, Tuple

from hwt.code import If
from hwt.hdl.types.bitsVal import BitsVal
from hwt.hdl.types.hdlType import HdlType
from hwt.interfaces.structIntf import StructIntf
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.unit import Unit
from hwtLib.abstract.streamBuilder import AbstractStreamBuilder
from hwtLib.amba.axis import AxiStream
from hwtLib.amba.axis_comp.cdc import AxiSCdc
from hwtLib.amba.axis_comp.fifo import AxiSFifo
from hwtLib.amba.axis_comp.fifoDrop import AxiSFifoDrop
from hwtLib.amba.axis_comp.fifo_async import AxiSFifoAsync
from hwtLib.amba.axis_comp.frame_deparser import AxiS_frameDeparser
from hwtLib.amba.axis_comp.frame_parser import AxiS_frameParser
from hwtLib.amba.axis_comp.joinPrioritized import AxiSJoinPrioritized
from hwtLib.amba.axis_comp.reg import AxiSReg
from hwtLib.amba.axis_comp.resizer import AxiS_resizer
from hwtLib.amba.axis_comp.splitCopy import AxiSSplitCopy
from hwtLib.amba.axis_comp.splitSelect import AxiSSpliSelect
from hwtLib.amba.axis_comp.storedBurst import AxiSStoredBurst


class AxiSBuilder(AbstractStreamBuilder):
    """
    Helper class which simplifies building of large stream paths

    The class attributes below select the component class instantiated
    by the individual builder operations; a subclass can override them
    to plug in a different implementation.
    """
    FifoCls = AxiSFifo
    FifoAsyncCls = AxiSFifoAsync
    FifoDropCls = AxiSFifoDrop
    RegCdcCls = AxiSCdc
    RegCls = AxiSReg
    SplitCopyCls = AxiSSplitCopy
    SplitSelectCls = AxiSSpliSelect
    ResizerCls = AxiS_resizer
    JoinPrioritizedCls = AxiSJoinPrioritized

    @classmethod
    def _fromSourceUnit(cls, parent: Unit, u: Unit, name: str,
                        unitName: str) -> "AxiSBuilder":
        """
        Create a builder whose stream starts on the dataOut of unit u

        :param parent: unit where u should be instantiated
        :param u: unit producing the stream on its "dataOut" interface
        :param name: name prefix for the builder
        :param unitName: base of the attribute name used to register u
            on parent
        :return: builder with u as the last component and u.dataOut
            as the end of the stream
        """
        # the builder is constructed without a source interface (None),
        # which is why callers have to provide a non-empty name
        self = cls(parent, None, name)
        setattr(parent, self._findSuitableName(unitName), u)
        self._propagateClkRstn(u)
        self.lastComp = u
        self.end = u.dataOut
        return self

    def resize(self, newDataWidth):
        """
        Change data width of axi stream

        :param newDataWidth: data width on the far side of the resizer
            (output width if master_to_slave else input width)
        :return: result of _genericInstance for the resizer component
        """

        def set_OUT_DATA_WIDTH(u):
            if self.master_to_slave:
                u.OUT_DATA_WIDTH = newDataWidth
            else:
                # stream is built from slave to master: the current end
                # is the output of the resizer, the new width is its input
                u.DATA_WIDTH = newDataWidth
                u.OUT_DATA_WIDTH = self.end.DATA_WIDTH

        # use the class hook (default AxiS_resizer) so that a subclass
        # overriding ResizerCls is respected
        return self._genericInstance(self.ResizerCls, "resize",
                                     set_OUT_DATA_WIDTH)

    def startOfFrame(self) -> RtlSignal:
        """
        generate start of frame signal, high when we expect new frame to start

        :return: register which resets to 1 and on every transferred word
            (valid & ready) is updated to the value of "last"
        """
        lastseen = self.parent._reg(self.name + "_sof_lastseen", def_val=1)
        intf = self.end
        ack = intf.valid & intf.ready
        If(ack,
           lastseen(intf.last)
        )
        return lastseen

    def parse(self, typeToParse) -> StructIntf:
        """
        Instantiate a frame parser on the end of the stream

        :note: the end of this builder is set to None afterwards

        :param typeToParse: structuralized type to parse
        :return: interface with parsed data (e.g. StructIntf for HStruct)
        """
        assert self.master_to_slave, \
            "parse() requires a master_to_slave builder"
        u = AxiS_frameParser(typeToParse)
        u._updateParamsFrom(self.end)
        setattr(self.parent, self._findSuitableName("parser"), u)
        self._propagateClkRstn(u)
        u.dataIn(self.end)

        self.lastComp = u
        self.end = None

        return u.dataOut

    @classmethod
    def deparse(cls, parent: Unit, typeToForge: HdlType, intfCls: AxiStream,
                setupFn=None, name:str=None) -> Tuple["AxiSBuilder", StructIntf]:
        """
        generate frame assembler for specified type

        :note: you can set endianness and others in setupFn

        :param parent: unit where generated units should be instantiated
        :param typeToForge: instance of HType used as template for frame to assembly
        :param intfCls: class for output interface
            (not used by this implementation, kept for interface compatibility)
        :param setupFn: setup function, called with the deparser unit
            before it is registered on parent
        :param name: name prefix for generated units
        :return: tuple (builder, interface with deparsed frame)
        """
        u = AxiS_frameDeparser(typeToForge)
        if setupFn:
            setupFn(u)

        if name is None:
            # name can not be empty due AxiSBuilder initialization without interface
            name = "deparsed"

        self = cls._fromSourceUnit(parent, u, name, "deparser")
        return self, u.dataIn

    def buff_drop(self, items:int, export_size=False, export_space=False):
        """
        Instantiate a FIFO buffer with externally controlled frame drop functionality
        (use "dataIn_discard" signal)

        :param items: value for DEPTH of the FIFO
        :param export_size: value for EXPORT_SIZE of the FIFO
        :param export_space: value for EXPORT_SPACE of the FIFO
        :return: result of _genericInstance for the FIFO component
        """

        def set_params(u: AxiSFifoDrop):
            u.DEPTH = items
            u.EXPORT_SIZE = export_size
            u.EXPORT_SPACE = export_space

        return self._genericInstance(self.FifoDropCls, "buff_drop", set_params)

    @classmethod
    def constant_frame(cls, parent: Unit,
                       value: Union[bytes, Tuple[Union[int, BitsVal, None], ...]],
                       data_width: int,
                       use_strb:bool=False,
                       use_keep:bool=False,
                       repeat=True,
                       name=None) -> "AxiSBuilder":
        """
        Instantiate a constant buffer which will produce the frame of specified data

        :param parent: unit where generated units should be instantiated
        :param value: value for DATA of the AxiSStoredBurst
        :param data_width: value for DATA_WIDTH of the AxiSStoredBurst
        :param use_strb: value for USE_STRB of the AxiSStoredBurst
        :param use_keep: value for USE_KEEP of the AxiSStoredBurst
        :param repeat: value for REPEAT of the AxiSStoredBurst
        :param name: name prefix for generated units
        :return: builder with the end of the stream on the output
            of the AxiSStoredBurst
        """
        u = AxiSStoredBurst()
        u.DATA = value
        u.DATA_WIDTH = data_width
        u.USE_STRB = use_strb
        u.USE_KEEP = use_keep
        u.REPEAT = repeat

        if name is None:
            # name can not be empty due AxiSBuilder initialization without interface
            name = "stored_burst"

        return cls._fromSourceUnit(parent, u, name, "stored_burst")