#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If, Switch, Concat
from hwt.code_utils import rename_signal
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.struct import HStruct
from hwt.hwIOs.std import HwIOSignal, HwIODataRdVld, HwIOVectSignal, \
HwIORdVldSync
from hwt.hwIOs.utils import propagateClkRstn
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.interfaceLevel.utils import NotSpecifiedError
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.amba.constants import RESP_OKAY
from hwtLib.amba.datapump.base import AxiDatapumpBase
from hwtLib.amba.datapump.intf import HwIOAxiWDatapump
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.handshaked.streamNode import StreamNode
from hwtSimApi.hdlSimulator import HdlSimulator
[docs]
class WFifoIntf(HwIODataRdVld):
"""
.. hwt-autodoc::
"""
@override
def hwConfig(self):
self.SHIFT_OPTIONS = HwParam((0,))
@override
def hwDeclr(self):
if self.SHIFT_OPTIONS != (0,):
# The encoded value of how many bytes should be the data from input write data be shifted
# in order to fit the word on output write bus
self.shift = HwIOVectSignal(log2ceil(len(self.SHIFT_OPTIONS)))
# last word can be canceled because the address can have some offset which could
# potentially spot new word but due to limited transaction size (using req.rem)
# this should not happen, this flags provides this information
self.drop_last_word = HwIOSignal()
HwIORdVldSync.hwDeclr(self)
[docs]
@override
def _initSimAgent(self, sim:HdlSimulator):
raise NotSpecifiedError()
[docs]
class BFifoIntf(HwIODataRdVld):
"""
.. hwt-autodoc::
"""
@override
def hwConfig(self):
pass
@override
def hwDeclr(self):
self.isLast = HwIOSignal()
HwIORdVldSync.hwDeclr(self)
[docs]
@override
def _initSimAgent(self, sim:HdlSimulator):
raise NotSpecifiedError()
[docs]
@serializeParamsUniq
class Axi_wDatapump(AxiDatapumpBase):
"""
Axi3/Axi3Lte/Axi4/Axi4Lite to axi write datapump,
:see: :class:`hwtLib.amba.datapump.base.AxiDatapumpBase`
.. hwt-autodoc::
"""
@override
def hwDeclr(self):
super().hwDeclr() # add clk, rst, axi addr channel and req channel
self.errorWrite = HwIOSignal()._m()
if self.ALIGNAS != 8:
self.errorAlignment = HwIOSignal()._m()
with self._hwParamsShared():
self.axi.HAS_R = False
d = self.driver = HwIOAxiWDatapump()
d.ID_WIDTH = 0
d.ID_WIDTH = 0
d.MAX_BYTES = self.MAX_CHUNKS * (self.CHUNK_WIDTH // 8)
# fifo for id propagation and frame splitting on axi.w channel
wf = self.writeInfoFifo = HandshakedFifo(WFifoIntf)
wf.DEPTH = self.MAX_TRANS_OVERLAP
wf.SHIFT_OPTIONS = self.getShiftOptions()
# fifo for propagation of end of frame from axi.b channel
bf = self.bInfoFifo = HandshakedFifo(BFifoIntf)
bf.DEPTH = self.MAX_TRANS_OVERLAP
[docs]
def storeTransInfo(self, transInfo: WFifoIntf, isLast: bool):
if self.isAlwaysAligned():
return []
else:
req = self.driver.req
offset = req.addr[self.getSizeAlignBits():]
crossesWordBoundary = self.isCrossingWordBoundary(req.addr, req.rem)
return [
self.encodeShiftValue(transInfo.SHIFT_OPTIONS, offset, transInfo.shift),
transInfo.drop_last_word(~self.addrIsAligned(req.addr) & ~crossesWordBoundary)
]
[docs]
def axiWHandler(self, wErrFlag: RtlSignal):
w = self.axi.w
wIn = self.driver.w
wInfo = self.writeInfoFifo.dataOut
bInfo = self.bInfoFifo.dataIn
dataAck = self._sig("dataAck")
inLast = wIn.last
if hasattr(w, "id"):
# AXI3 has id signal, AXI4 does not
w.id(self.ID_VAL)
if self.isAlwaysAligned():
w.data(wIn.data)
w.strb(wIn.strb)
if self.axi.LEN_WIDTH:
doSplit = wIn.last
else:
doSplit = BIT.from_py(1)
waitForShift = BIT.from_py(0)
else:
isFirst = self._reg("isFirstData", def_val=1)
prevData = self._reg("prevData", HStruct(
(wIn.data._dtype, "data"),
(wIn.strb._dtype, "strb"),
(BIT, "waitingForShift"),
),
def_val={"waitingForShift": 0})
waitForShift = prevData.waitingForShift
isShifted = (wInfo.shift != 0) | (wInfo.SHIFT_OPTIONS[0] != 0)
wInWillWaitForShift = wIn.valid & wIn.last & isShifted & ~prevData.waitingForShift & ~wInfo.drop_last_word
If(StreamNode([wIn, wInfo], [w, bInfo], skipWhen={wIn: waitForShift}).ack() & ~wErrFlag,
# data feed in to prevData is stalled if we need to dispath
# the remainder from previous word which was not yet dispatched due data shift
# the last data from wIn is consumed on wIn.last, however there is 1 beat stall
# for wIn i transaction was not aligned. wInfo and bInfo channels are activated
# after last beat of wOut is send
If(~prevData.waitingForShift,
prevData.data(wIn.data),
prevData.strb(wIn.strb),
),
waitForShift(wInWillWaitForShift),
isFirst((isShifted & waitForShift) | ((~isShifted | wInfo.drop_last_word) & wIn.last))
)
def applyShift(sh):
if sh == 0 and wInfo.SHIFT_OPTIONS[0] == 0:
return [
w.data(wIn.data),
w.strb(wIn.strb),
]
else:
rem_w = self.DATA_WIDTH - sh
return [
# wIn.data starts on 0 we need to shift it sh bits
# in first word the prefix is invalid, in rest of the frames it is taken from
# previous data
If(waitForShift,
w.data(Concat(HBits(rem_w).from_py(None), prevData.data[:rem_w])),
).Else(
w.data(Concat(wIn.data[rem_w:], prevData.data[:rem_w])),
),
If(waitForShift,
# wait until remainder of previous data is send
w.strb(Concat(HBits(rem_w // 8).from_py(0), prevData.strb[:rem_w // 8])),
).Elif(isFirst,
# ignore previous data
w.strb(Concat(wIn.strb[rem_w // 8:], HBits(sh // 8).from_py(0))),
).Else(
# take what is left from prev data and append from wIn
w.strb(Concat(wIn.strb[rem_w // 8:], prevData.strb[:rem_w // 8])),
)
]
Switch(wInfo.shift).add_cases([
(i, applyShift(sh))
for i, sh in enumerate(wInfo.SHIFT_OPTIONS)
]).Default(
w.data(None),
w.strb(None),
)
inLast = rename_signal(self, isShifted._ternary(waitForShift | (wIn.last & wInfo.drop_last_word), wIn.last), "inLast")
doSplit = inLast
if self.useTransSplitting():
wordCntr = self._reg("wWordCntr", self.getLen_t(), 0)
doSplit = rename_signal(self, wordCntr._eq(self.getAxiLenMax()) | doSplit, "doSplit1")
If(StreamNode([wInfo, wIn], [bInfo, w]).ack() & ~wErrFlag,
If(doSplit,
wordCntr(0)
).Else(
wordCntr(wordCntr + 1)
)
)
if self.AXI_CLS.LEN_WIDTH != 0:
w.last(doSplit)
# if this frame was split into a multiple frames wIn.last will equal 0
bInfo.isLast(inLast)
dataNode = StreamNode(
masters=[wIn, wInfo],
slaves=[bInfo, w],
skipWhen={
wIn: waitForShift,
},
extraConds={
wIn:~waitForShift,
wInfo: doSplit,
bInfo: doSplit,
w:~wErrFlag}
)
dataAck(dataNode.ack())
dataNode.sync()
[docs]
def axiBHandler(self):
b = self.axi.b
ack = self.driver.ack
lastFlags = self.bInfoFifo.dataOut
StreamNode(
masters=[b, lastFlags],
slaves=[ack],
extraConds={
ack: lastFlags.isLast
}
).sync()
@override
def hwImpl(self):
propagateClkRstn(self)
b = self.axi.b
wErrFlag = self._reg("wErrFlag", def_val=0)
If(b.valid & (b.resp != RESP_OKAY),
wErrFlag(1)
)
self.errorWrite(wErrFlag)
if self.ALIGNAS != 8:
wErrAlignFlag = self._reg("wErrAlignFlag", def_val=0)
req = self.driver.req
If(req.vld & ~self.addrIsAligned(req.addr),
wErrAlignFlag(1)
)
self.errorAlignment(wErrAlignFlag)
wErrFlag = wErrFlag | wErrAlignFlag
self.addrHandler(self.driver.req, self.axi.aw, self.writeInfoFifo.dataIn, wErrFlag)
self.axiWHandler(wErrFlag)
self.axiBHandler()
if __name__ == "__main__":
from hwt.synth import to_rtl_str
m = Axi_wDatapump()
# m.ALIGNAS = 8
print(to_rtl_str(m))