Source code for hwtLib.amba.datapump.w

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import If, Switch, Concat
from hwt.code_utils import rename_signal
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.struct import HStruct
from hwt.hwIOs.std import HwIOSignal, HwIODataRdVld, HwIOVectSignal, \
    HwIORdVldSync
from hwt.hwIOs.utils import propagateClkRstn
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.interfaceLevel.utils import NotSpecifiedError
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.amba.constants import RESP_OKAY
from hwtLib.amba.datapump.base import AxiDatapumpBase
from hwtLib.amba.datapump.intf import HwIOAxiWDatapump
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.handshaked.streamNode import StreamNode
from hwtSimApi.hdlSimulator import HdlSimulator


class WFifoIntf(HwIODataRdVld):
    """
    Item interface of :class:`~.Axi_wDatapump` ``writeInfoFifo``.

    Carries per-transaction information from the address handling
    to the write data handling (``shift`` and ``drop_last_word``).

    .. hwt-autodoc::
    """

    @override
    def hwConfig(self):
        # tuple of possible data shifts; (0,) means that no shift signal is declared
        self.SHIFT_OPTIONS = HwParam((0,))

    @override
    def hwDeclr(self):
        # NOTE(review): the nesting below was reconstructed from a listing with collapsed
        # whitespace; both signals are placed under the condition because they are driven
        # only on the unaligned path of Axi_wDatapump.storeTransInfo - confirm
        if self.SHIFT_OPTIONS != (0,):
            # The encoded value of how many bytes should be the data from input write data be shifted
            # in order to fit the word on output write bus
            self.shift = HwIOVectSignal(log2ceil(len(self.SHIFT_OPTIONS)))
            # last word can be canceled because the address can have some offset which could
            # potentially spot new word but due to limited transaction size (using req.rem)
            # this should not happen, this flags provides this information
            self.drop_last_word = HwIOSignal()

        # the declaration of HwIORdVldSync is used explicitly instead of the one from
        # the direct parent class (presumably to avoid the generic data signal - confirm)
        HwIORdVldSync.hwDeclr(self)

    @override
    def _initSimAgent(self, sim: HdlSimulator):
        """
        No simulation agent is available for this interface.

        :raises NotSpecifiedError: always
        """
        raise NotSpecifiedError()
class BFifoIntf(HwIODataRdVld):
    """
    Item interface of :class:`~.Axi_wDatapump` ``bInfoFifo``.

    Carries a single ``isLast`` flag from the write data handling
    to the write response handling.

    .. hwt-autodoc::
    """

    @override
    def hwConfig(self):
        # this interface has no parameters
        pass

    @override
    def hwDeclr(self):
        self.isLast = HwIOSignal()
        # the declaration of HwIORdVldSync is used explicitly instead of the one from
        # the direct parent class (presumably to avoid the generic data signal - confirm)
        HwIORdVldSync.hwDeclr(self)

    @override
    def _initSimAgent(self, sim: HdlSimulator):
        """
        No simulation agent is available for this interface.

        :raises NotSpecifiedError: always
        """
        raise NotSpecifiedError()
@serializeParamsUniq
class Axi_wDatapump(AxiDatapumpBase):
    """
    Axi3/Axi3Lte/Axi4/Axi4Lite to axi write datapump,

    :see: :class:`hwtLib.amba.datapump.base.AxiDatapumpBase`

    .. hwt-autodoc::
    """
    # NOTE(review): statement nesting in this class was reconstructed from a listing with
    # collapsed whitespace; places where the nesting is not forced by data flow are marked.

    @override
    def hwDeclr(self):
        """
        Declare error flags, the driver interface and the FIFOs which are passing
        the transaction information between address, data and response handlers.
        """
        super().hwDeclr()  # add clk, rst, axi addr channel and req channel

        # error flags driven from hwImpl()
        self.errorWrite = HwIOSignal()._m()
        if self.ALIGNAS != 8:
            self.errorAlignment = HwIOSignal()._m()

        with self._hwParamsShared():
            self.axi.HAS_R = False
            d = self.driver = HwIOAxiWDatapump()
            d.ID_WIDTH = 0
            d.MAX_BYTES = self.MAX_CHUNKS * (self.CHUNK_WIDTH // 8)

        # NOTE(review): FIFOs are assumed to be declared outside of the shared-params block,
        # all parameters which are needed are set explicitly - confirm
        # fifo for id propagation and frame splitting on axi.w channel
        wf = self.writeInfoFifo = HandshakedFifo(WFifoIntf)
        wf.DEPTH = self.MAX_TRANS_OVERLAP
        wf.SHIFT_OPTIONS = self.getShiftOptions()

        # fifo for propagation of end of frame from axi.b channel
        bf = self.bInfoFifo = HandshakedFifo(BFifoIntf)
        bf.DEPTH = self.MAX_TRANS_OVERLAP

    def storeTransInfo(self, transInfo: WFifoIntf, isLast: bool):
        """
        Build the assignments which fill the write info FIFO item for the current request.

        :param transInfo: interface of the write info FIFO which should be driven
        :param isLast: not used by this implementation
        :return: list of assignments for ``transInfo.shift`` and ``transInfo.drop_last_word``,
            an empty list if the data is always aligned
        """
        if self.isAlwaysAligned():
            return []
        else:
            req = self.driver.req
            offset = req.addr[self.getSizeAlignBits():]
            crossesWordBoundary = self.isCrossingWordBoundary(req.addr, req.rem)
            return [
                self.encodeShiftValue(transInfo.SHIFT_OPTIONS, offset, transInfo.shift),
                # the address is not aligned but the data still fits into the same number of words
                transInfo.drop_last_word(~self.addrIsAligned(req.addr) & ~crossesWordBoundary)
            ]

    def axiWHandler(self, wErrFlag: RtlSignal):
        """
        Connect the driver write data channel to axi.w, apply the data shift for
        unaligned transactions, split the frames if required and store the end of frame
        flag to the bInfoFifo.

        :param wErrFlag: if this signal is set the data transfer to axi.w is blocked
        """
        w = self.axi.w
        wIn = self.driver.w
        wInfo = self.writeInfoFifo.dataOut
        bInfo = self.bInfoFifo.dataIn

        dataAck = self._sig("dataAck")
        inLast = wIn.last
        if hasattr(w, "id"):
            # AXI3 has id signal, AXI4 does not
            w.id(self.ID_VAL)

        if self.isAlwaysAligned():
            # the data is passed without any modification
            w.data(wIn.data)
            w.strb(wIn.strb)
            if self.axi.LEN_WIDTH:
                doSplit = wIn.last
            else:
                doSplit = BIT.from_py(1)

            waitForShift = BIT.from_py(0)
        else:
            isFirst = self._reg("isFirstData", def_val=1)
            prevData = self._reg(
                "prevData",
                HStruct(
                    (wIn.data._dtype, "data"),
                    (wIn.strb._dtype, "strb"),
                    (BIT, "waitingForShift"),
                ),
                def_val={"waitingForShift": 0})

            waitForShift = prevData.waitingForShift
            isShifted = (wInfo.shift != 0) | (wInfo.SHIFT_OPTIONS[0] != 0)
            wInWillWaitForShift = wIn.valid & wIn.last & isShifted & ~prevData.waitingForShift & ~wInfo.drop_last_word

            If(StreamNode([wIn, wInfo], [w, bInfo], skipWhen={wIn: waitForShift}).ack() & ~wErrFlag,
                # data feed in to prevData is stalled if we need to dispatch
                # the remainder from previous word which was not yet dispatched due data shift
                # the last data from wIn is consumed on wIn.last, however there is 1 beat stall
                # for wIn if transaction was not aligned. wInfo and bInfo channels are activated
                # after last beat of wOut is sent
                If(~prevData.waitingForShift,
                    prevData.data(wIn.data),
                    prevData.strb(wIn.strb),
                ),
                waitForShift(wInWillWaitForShift),
                isFirst((isShifted & waitForShift) | ((~isShifted | wInfo.drop_last_word) & wIn.last))
            )

            def applyShift(sh):
                """
                :param sh: number of bits by which the input data should be shifted
                :return: list of statements which are driving w.data and w.strb for this shift
                """
                if sh == 0 and wInfo.SHIFT_OPTIONS[0] == 0:
                    return [
                        w.data(wIn.data),
                        w.strb(wIn.strb),
                    ]
                else:
                    rem_w = self.DATA_WIDTH - sh
                    return [
                        # wIn.data starts on 0 we need to shift it sh bits
                        # in first word the prefix is invalid, in rest of the frames it is taken from
                        # previous data
                        If(waitForShift,
                            w.data(Concat(HBits(rem_w).from_py(None), prevData.data[:rem_w])),
                        ).Else(
                            w.data(Concat(wIn.data[rem_w:], prevData.data[:rem_w])),
                        ),
                        If(waitForShift,
                            # wait until remainder of previous data is sent
                            w.strb(Concat(HBits(rem_w // 8).from_py(0), prevData.strb[:rem_w // 8])),
                        ).Elif(isFirst,
                            # ignore previous data
                            w.strb(Concat(wIn.strb[rem_w // 8:], HBits(sh // 8).from_py(0))),
                        ).Else(
                            # take what is left from prev data and append from wIn
                            w.strb(Concat(wIn.strb[rem_w // 8:], prevData.strb[:rem_w // 8])),
                        )
                    ]

            Switch(wInfo.shift).add_cases([
                (i, applyShift(sh))
                for i, sh in enumerate(wInfo.SHIFT_OPTIONS)
            ]).Default(
                w.data(None),
                w.strb(None),
            )
            inLast = rename_signal(
                self,
                isShifted._ternary(waitForShift | (wIn.last & wInfo.drop_last_word), wIn.last),
                "inLast")
            doSplit = inLast

        if self.useTransSplitting():
            # count the words in order to end the axi.w frame once the maximum length is reached
            wordCntr = self._reg("wWordCntr", self.getLen_t(), 0)
            doSplit = rename_signal(self, wordCntr._eq(self.getAxiLenMax()) | doSplit, "doSplit1")

            If(StreamNode([wInfo, wIn], [bInfo, w]).ack() & ~wErrFlag,
                If(doSplit,
                    wordCntr(0)
                ).Else(
                    wordCntr(wordCntr + 1)
                )
            )

        if self.AXI_CLS.LEN_WIDTH != 0:
            w.last(doSplit)

        # if this frame was split into a multiple frames wIn.last will equal 0
        bInfo.isLast(inLast)
        dataNode = StreamNode(
            masters=[wIn, wInfo],
            slaves=[bInfo, w],
            skipWhen={
                wIn: waitForShift,
            },
            extraConds={
                wIn: ~waitForShift,
                wInfo: doSplit,
                bInfo: doSplit,
                w: ~wErrFlag,
            }
        )
        dataAck(dataNode.ack())
        dataNode.sync()

    def axiBHandler(self):
        """
        Synchronize axi.b with the flags stored in bInfoFifo, the driver.ack
        is enabled only if the isLast flag is set.
        """
        b = self.axi.b
        ack = self.driver.ack
        lastFlags = self.bInfoFifo.dataOut

        StreamNode(
            masters=[b, lastFlags],
            slaves=[ack],
            extraConds={
                ack: lastFlags.isLast
            }
        ).sync()

    @override
    def hwImpl(self):
        """
        Instantiate the error flag registers and connect address, data and response handlers.
        """
        propagateClkRstn(self)

        b = self.axi.b
        # set on a response which is not RESP_OKAY, there is no assignment which clears it
        wErrFlag = self._reg("wErrFlag", def_val=0)
        If(b.valid & (b.resp != RESP_OKAY),
           wErrFlag(1)
        )
        self.errorWrite(wErrFlag)

        if self.ALIGNAS != 8:
            # set on a request with an address which is not aligned
            wErrAlignFlag = self._reg("wErrAlignFlag", def_val=0)
            req = self.driver.req
            If(req.vld & ~self.addrIsAligned(req.addr),
               wErrAlignFlag(1)
            )
            self.errorAlignment(wErrAlignFlag)
            # the handlers are blocked by any of the errors
            wErrFlag = wErrFlag | wErrAlignFlag

        self.addrHandler(self.driver.req, self.axi.aw, self.writeInfoFifo.dataIn, wErrFlag)
        self.axiWHandler(wErrFlag)
        self.axiBHandler()
if __name__ == "__main__":
    # print the RTL of the component with the default configuration
    from hwt.synth import to_rtl_str

    m = Axi_wDatapump()
    # m.ALIGNAS = 8
    print(to_rtl_str(m))