Source code for hwtLib.amba.axi_comp.virtualDma

from typing import Union, Optional, List

from hwt.hdl.frameTmpl import FrameTmpl
from hwt.hdl.transTmpl import TransTmpl
from hwt.hdl.types.hdlType import HdlType
from hwt.hdl.types.stream import HStream
from hwt.interfaces.std import Handshaked
from hwt.interfaces.structIntf import StructIntf
from hwt.interfaces.unionIntf import UnionSource
from hwt.synthesizer.unit import Unit
from hwtLib.abstract.componentBuilder import AbstractComponentBuilder
from hwtLib.amba.axi3 import Axi3
from hwtLib.amba.axi3Lite import Axi3Lite
from hwtLib.amba.axi4 import Axi4
from hwtLib.amba.axi4Lite import Axi4Lite
from hwtLib.amba.datapump.r import Axi_rDatapump
from hwtLib.amba.datapump.utils import connectDp
from hwtLib.structManipulators.structReader import StructReader
from hwtLib.structManipulators.structWriter import StructWriter
from hwtLib.amba.datapump.w import Axi_wDatapump


[docs]class AxiVirtualDma(AbstractComponentBuilder): """ An object which can be used to generate read/write logic for AMBA AXI interfaces. It does these things: * Based on alignment of the data and data type it optimizes shift logic for alignment * If transaction can spawn over multiple AXI transactions it also generates the logic for dispatching and merging of such a transactions. * It propagates read/write/aligmnent and input errors as a hwt InHwExceptions :ivar alignas: specifies alignment requirement for a data type t (in bits), same functionailty as C++11 alignas specifier, used to discard alignment logic """
[docs] def __init__(self, axi: Union[Axi3, Axi3Lite, Axi4, Axi4Lite], alignas: int=8, max_trans_overlap=16, ): """ :param axi: AMBA AXI bus used to read the data """ self.alignas = alignas self.max_trans_overlap = max_trans_overlap p = axi while p._parent is not None: p = p._parent assert isinstance(p, Unit), p AbstractComponentBuilder.__init__(self, p, axi, 'AxiVirtualDma')
[docs] def read(self, t: HdlType, tmpl: Optional[TransTmpl]=None, frames: Optional[List[FrameTmpl]]=None, transaction_id=0) -> Union[UnionSource, StructIntf, Handshaked]: """ :param ~.t: instance of HStruct which specifies data format to download :param ~.tmpl: instance of TransTmpl for this t :param ~.frames: list of FrameTmpl instances for this tmpl :param ~.transaction_id: id value for axi :note: if tmpl and frames are None they are resolved from structT parseTemplate :note: A single transaction can be split to multiple frames, if they are specified by "frames". """ axi = self.end r = StructReader(t, tmpl=tmpl, frames=frames) r.ID_WIDTH = 0 r.ADDR_WIDTH = axi.ADDR_WIDTH r.DATA_WIDTH = axi.DATA_WIDTH setattr(self.parent, self._findSuitableName("rReader"), r) self._propagateClkRstn(r) dp = Axi_rDatapump(axi.__class__) dp.ADDR_WIDTH = axi.ADDR_WIDTH dp.ID_WIDTH = axi.ID_WIDTH dp.DATA_WIDTH = axi.DATA_WIDTH dp.USE_STRB = False dp.ID_VAL = transaction_id dp.MAX_TRANS_OVERLAP = self.max_trans_overlap dp.ALIGNAS = self.alignas if isinstance(t, HStream): raise NotImplementedError() else: dp.MAX_CHUNKS = 1 dp.CHUNK_WIDTH = r.rDatapump.MAX_BYTES * 8 setattr(self.parent, self._findSuitableName("rDataPump"), dp) self._propagateClkRstn(dp) connectDp(self.parent, r, dp, self.end) return r.get, r.dataOut
# def read_to_reg(self, t: HdlType, dst: Union[RtlSyncSignal, StructIntf], # ready: RtlSignal, id_=0, # tmpl: Optional[TransTmpl]=None, # frames: Optional[List[FrameTmpl]]=None # ): # """ # :note: same as :meth:`AxiMemRead.read` just store the result in to a register. # # :return: signal which is 1 when last part of the data is beeing stored in to thes register. # """ # # return addr, last # # def read_out_of_order(self, t: HdlType, allocate, receive, deallocate, # tmpl: Optional[TransTmpl]=None, # frames: Optional[List[FrameTmpl]]=None, # ): # """ # Rest of the paramters described in :meth:`~.AxiMemRead.read` # """ # # return addr, rData # # def read_out_of_order_to_reg(self, t: HdlType, allocate, receive, deallocate, # dst: Union[RtlSyncSignal, StructIntf], ready: RtlSignal, # tmpl: Optional[TransTmpl]=None, # frames: Optional[List[FrameTmpl]]=None, # ): # return addr, id, last
[docs] def write(self, t: HdlType, id_=0, tmpl: Optional[TransTmpl]=None, frames: Optional[List[FrameTmpl]]=None,): """ Rest of the paramters described in :meth:`~.AxiMemRead.read` """ axi = self.end w = StructWriter(t, tmpl=tmpl, frames=frames) w.ID_WIDTH = 0 w.ADDR_WIDTH = axi.ADDR_WIDTH w.DATA_WIDTH = axi.DATA_WIDTH w.USE_STRB = True setattr(self.parent, self._findSuitableName("wWriter"), w) self._propagateClkRstn(w) dp = Axi_wDatapump(axi.__class__) dp.ADDR_WIDTH = axi.ADDR_WIDTH dp.ID_WIDTH = axi.ID_WIDTH dp.DATA_WIDTH = axi.DATA_WIDTH dp.ID_VAL = id_ dp.MAX_TRANS_OVERLAP = self.max_trans_overlap dp.ALIGNAS = self.alignas if isinstance(t, HStream): raise NotImplementedError() else: dp.MAX_CHUNKS = 1 dp.CHUNK_WIDTH = w.wDatapump.MAX_BYTES * 8 setattr(self.parent, self._findSuitableName("wDataPump"), dp) self._propagateClkRstn(dp) connectDp(self.parent, w, dp, self.end) return w.set, w.dataIn, w.writeAck
[docs] def build(self): """ Build an DMA logic from previously stacked reads/writes :note: placeholder for future use """