#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If, Switch, SwitchLogic
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.stream import HStream
from hwt.hdl.types.struct import HStruct
from hwt.hdl.value import HValue
from hwt.interfaces.std import Signal, HandshakeSync, VectSignal
from hwt.interfaces.utils import propagateClkRstn
from hwt.math import log2ceil
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.amba.axis_comp.frame_join import AxiS_FrameJoin
from hwtLib.amba.constants import RESP_OKAY
from hwtLib.amba.datapump.base import AxiDatapumpBase
from hwtLib.amba.datapump.intf import AxiRDatapumpIntf
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.handshaked.streamNode import StreamNode
from pyMathBitPrecise.bit_utils import mask
from hwt.hdl.types.defs import BIT
[docs]class TransEndInfo(HandshakeSync):
"""
.. hwt-autodoc::
"""
def _config(self):
self.ID_WIDTH = Param(0)
self.DATA_WIDTH = Param(64)
self.HAS_PROPAGATE_LAST = Param(True)
self.SHIFT_OPTIONS = Param((0,))
def _declr(self):
if self.ID_WIDTH:
self.id = VectSignal(self.ID_WIDTH)
# rem is number of bits in last word which is valid - 1,
# if rem == 0 it means all bytes are valid
self.rem = VectSignal(log2ceil(self.DATA_WIDTH // 8))
if self.SHIFT_OPTIONS != (0,):
self.shift = VectSignal(log2ceil(len(self.SHIFT_OPTIONS)))
if self.HAS_PROPAGATE_LAST:
self.propagateLast = Signal()
HandshakeSync._declr(self)
[docs]@serializeParamsUniq
class Axi_rDatapump(AxiDatapumpBase):
"""
Forward request to axi address read channel
and collect data to data channel form axi read data channel
* Blocks data channel when there is no request pending.
* If req len is wider transaction is internally split to multiple axi
transactions, but returned read data is a single packet as originally requested.
* errorRead stays high when there was error on axi read channel
it will not affect unit functionality
* id of driver is a different id than is used on AXI
this is because the id on driver side is used to distinguish between
transactions and on AXI side it has to be same to assert that the transactions
will be finished in-order.
:see: :class:`hwtLib.amba.datapump.base.AxiDatapumpBase`
.. hwt-autodoc::
"""
def _declr(self):
super()._declr() # add clk, rst, axi addr channel and req channel
self.errorRead = Signal()._m()
if self.ALIGNAS != 8:
self.errorAlignment = Signal()._m()
with self._paramsShared():
self.axi.HAS_W = False
d = self.driver = AxiRDatapumpIntf()
d.ID_WIDTH = 0
d.MAX_BYTES = self.MAX_CHUNKS * (self.CHUNK_WIDTH // 8)
f = self.sizeRmFifo = HandshakedFifo(TransEndInfo)
f.ID_WIDTH = 0
f.DEPTH = self.MAX_TRANS_OVERLAP
f.SHIFT_OPTIONS = self.getShiftOptions()
[docs] def storeTransInfo(self, transInfo: TransEndInfo, isLast: bool):
if isLast:
rem = self.driver.req.rem
else:
rem = 0
offset = self.driver.req.addr[self.getSizeAlignBits():]
return [
transInfo.rem(rem),
transInfo.propagateLast(int(isLast)),
*([]
if self.isAlwaysAligned() else
[self.encodeShiftValue(transInfo.SHIFT_OPTIONS, offset, transInfo.shift), ]),
]
[docs] def remSizeToStrb(self, remSize: RtlSignal, strb: RtlSignal, isFirstWord, isLastWord):
sizeRm = self.sizeRmFifo.dataOut
STRB_W = strb._dtype.bit_length()
if self.isAlwaysAligned():
STRB_ALL = mask(STRB_W)
strbSwitch = Switch(remSize)\
.Case(0,
strb(STRB_ALL)
).add_cases(
[(i + 1, strb(mask(i + 1)))
for i in range(STRB_W - 1)]
).Default(
strb(None)
)
if isinstance(isLastWord, (bool, int, HValue)):
if isLastWord:
return strbSwitch
else:
return strb(STRB_ALL)
else:
return If(isLastWord,
strbSwitch
).Else(
strb(STRB_ALL)
)
else:
CHUNK = self.CHUNK_WIDTH // 8
MAX_BYTES = CHUNK * self.MAX_CHUNKS
STRB_ALL = mask(min(STRB_W, MAX_BYTES))
ALIGNAS = self.ALIGNAS
possibleBytesInLastWord = set()
assert self.DATA_WIDTH % ALIGNAS == 0, ("Required to resolve number of bytes in last word", self.DATA_WIDTH, ALIGNAS)
for CHUNK_CNT in range(1, min(self.MAX_CHUNKS, max(3, self.DATA_WIDTH // CHUNK * 3)) + 1):
for o in range(0, STRB_W, ALIGNAS // 8):
bytesInLastWord = (o + CHUNK * CHUNK_CNT) % (self.DATA_WIDTH // 8)
if bytesInLastWord in possibleBytesInLastWord:
break
possibleBytesInLastWord.add(bytesInLastWord)
possibleBytesInLastWord = sorted(possibleBytesInLastWord)
offsetsAlignmentCombinations = set([
# bytesInLastWord, offset value of value in last word, index of shift option
(min(bytesInLastWord, MAX_BYTES), sh // 8, sh_i)
for bytesInLastWord in possibleBytesInLastWord
for sh_i, sh in enumerate(sizeRm.SHIFT_OPTIONS)
if bytesInLastWord <= MAX_BYTES
])
offsetsAlignmentCombinations = sorted(offsetsAlignmentCombinations)
t = strb._dtype.from_py
# :attention: last word can be first word as well
MASK_ALL = mask(STRB_W)
WORD_W = strb._dtype.bit_length()
return \
SwitchLogic([
(remSize._eq(0 if bytesInLastWord == STRB_W else bytesInLastWord) & sizeRm.shift._eq(shift_i),
strb(
# dissable prefix bytes if this is first word
isFirstWord._ternary(t((MASK_ALL << shift) & MASK_ALL), t(MASK_ALL)) &
# dissable suffix bytes if this last word
isLastWord._ternary(t(MASK_ALL >> ((WORD_W - bytesInLastWord - shift) % WORD_W)), t(MASK_ALL))
)
)
for bytesInLastWord, shift, shift_i in offsetsAlignmentCombinations
],
default=strb(None)
)
[docs] def dataHandler(self, rErrFlag: RtlSignal, rmSizeOut: TransEndInfo):
rIn = self.axi.r
rOut = self.driver.r
if self.axi.LEN_WIDTH:
last = rIn.last
else:
last = BIT.from_py(1)
rInLast = last
if self.useTransSplitting():
last = rmSizeOut.propagateLast & last
if self.isAlwaysAligned():
# without shift logic
* ([self.remSizeToStrb(rmSizeOut.rem, rOut.strb, False, rIn.valid & last), ] if self.USE_STRB else []),
rOut.data(rIn.data)
rOut.last(last)
StreamNode(
masters=[rIn, rmSizeOut],
slaves=[rOut],
extraConds={
rmSizeOut: rInLast,
rOut:~rErrFlag
}
).sync()
else:
# align shifted incoming read data and optionally merge frames
aligner = AxiS_FrameJoin()
aligner.T = HStruct(
(HStream(Bits(self.CHUNK_WIDTH),
start_offsets=[i // 8 for i in self.getShiftOptions()],
frame_len=(1, self.MAX_CHUNKS)
), "f0"),
)
aligner.USE_STRB = False
aligner.DATA_WIDTH = self.DATA_WIDTH
self.aligner = aligner
isSingleWordOnly = self.CHUNK_WIDTH * self.MAX_CHUNKS <= self.DATA_WIDTH and self.ALIGNAS % (self.CHUNK_WIDTH * self.MAX_CHUNKS) == 0
if isSingleWordOnly:
first = BIT.from_py(1)
else:
# first beat of output frame (not necessary input frame, as multiple input
# frames could be merged in to a single output frame)
first = self._reg(f"first", def_val=1)
If(StreamNode([rIn, rmSizeOut], [aligner.dataIn[0], ]).ack(),
first(last),
)
aligner.dataIn[0].data(rIn.data)
aligner.dataIn[0].last(last)
self.remSizeToStrb(rmSizeOut.rem, aligner.dataIn[0].keep, first, last)
StreamNode(
[rIn, rmSizeOut],
[aligner.dataIn[0], ],
extraConds={
rmSizeOut:~rErrFlag & rInLast,
}
).sync()
if self.USE_STRB:
rOut.strb(aligner.dataOut.keep)
rOut.data(aligner.dataOut.data)
rOut.last(aligner.dataOut.last)
StreamNode(
masters=[aligner.dataOut, ],
slaves=[rOut],
extraConds={
rOut:~rErrFlag
}
).sync()
def _impl(self):
r = self.axi.r
errorRead = self._reg("errorRead", def_val=0)
If(r.valid & (r.resp != RESP_OKAY),
errorRead(1)
)
self.errorRead(errorRead)
err = errorRead
if self.ALIGNAS != 8:
req = self.driver.req
errorAlignment = self._reg("errorAlignment", def_val=0)
If(req.vld & (req.addr[log2ceil(self.ALIGNAS // 8):] != 0),
errorAlignment(1)
)
self.errorAlignment(errorAlignment)
err = err | errorAlignment
self.addrHandler(self.driver.req, self.axi.ar, self.sizeRmFifo.dataIn, err)
self.dataHandler(err, self.sizeRmFifo.dataOut)
propagateClkRstn(self)
if __name__ == "__main__":
from hwt.synthesizer.utils import to_rtl_str
# import sys
# sys.setrecursionlimit(5000)
u = Axi_rDatapump()
u.DATA_WIDTH = 512
u.MAX_CHUNKS = 1
u.CHUNK_WIDTH = 32
u.ALIGNAS = 32
print(to_rtl_str(u))