#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If, Switch, SwitchLogic
from hwt.hdl.const import HConst
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.stream import HStream
from hwt.hdl.types.struct import HStruct
from hwt.hwIOs.std import HwIOSignal, HwIORdVldSync, HwIOVectSignal
from hwt.hwIOs.utils import propagateClkRstn
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.amba.axis_comp.frame_join import Axi4S_FrameJoin
from hwtLib.amba.constants import RESP_OKAY
from hwtLib.amba.datapump.base import AxiDatapumpBase
from hwtLib.amba.datapump.intf import HwIOAxiRDatapump
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.handshaked.streamNode import StreamNode
from pyMathBitPrecise.bit_utils import mask
[docs]
class TransEndInfo(HwIORdVldSync):
"""
.. hwt-autodoc::
"""
@override
def hwConfig(self):
self.ID_WIDTH = HwParam(0)
self.DATA_WIDTH = HwParam(64)
self.HAS_PROPAGATE_LAST = HwParam(True)
self.SHIFT_OPTIONS = HwParam((0,))
@override
def hwDeclr(self):
if self.ID_WIDTH:
self.id = HwIOVectSignal(self.ID_WIDTH)
# rem is number of bits in last word which is valid - 1,
# if rem == 0 it means all bytes are valid
self.rem = HwIOVectSignal(log2ceil(self.DATA_WIDTH // 8))
if self.SHIFT_OPTIONS != (0,):
self.shift = HwIOVectSignal(log2ceil(len(self.SHIFT_OPTIONS)))
if self.HAS_PROPAGATE_LAST:
self.propagateLast = HwIOSignal()
HwIORdVldSync.hwDeclr(self)
[docs]
@serializeParamsUniq
class Axi_rDatapump(AxiDatapumpBase):
"""
Forward request to axi address read channel
and collect data to data channel form axi read data channel
* Blocks data channel when there is no request pending.
* If req len is wider transaction is internally split to multiple axi
transactions, but returned read data is a single packet as originally requested.
* errorRead stays high when there was error on axi read channel
it will not affect unit functionality
* id of driver is a different id than is used on AXI
this is because the id on driver side is used to distinguish between
transactions and on AXI side it has to be same to assert that the transactions
will be finished in-order.
:see: :class:`hwtLib.amba.datapump.base.AxiDatapumpBase`
.. hwt-autodoc::
"""
@override
def hwDeclr(self):
super().hwDeclr() # add clk, rst, axi addr channel and req channel
self.errorRead = HwIOSignal()._m()
if self.ALIGNAS != 8:
self.errorAlignment = HwIOSignal()._m()
with self._hwParamsShared():
self.axi.HAS_W = False
d = self.driver = HwIOAxiRDatapump()
d.ID_WIDTH = 0
d.MAX_BYTES = self.MAX_CHUNKS * (self.CHUNK_WIDTH // 8)
f = self.sizeRmFifo = HandshakedFifo(TransEndInfo)
f.ID_WIDTH = 0
f.DEPTH = self.MAX_TRANS_OVERLAP
f.SHIFT_OPTIONS = self.getShiftOptions()
[docs]
def storeTransInfo(self, transInfo: TransEndInfo, isLast: bool):
if isLast:
rem = self.driver.req.rem
else:
rem = 0
offset = self.driver.req.addr[self.getSizeAlignBits():]
return [
transInfo.rem(rem),
transInfo.propagateLast(int(isLast)),
*([]
if self.isAlwaysAligned() else
[self.encodeShiftValue(transInfo.SHIFT_OPTIONS, offset, transInfo.shift), ]),
]
[docs]
def remSizeToStrb(self, remSize: RtlSignal, strb: RtlSignal, isFirstWord, isLastWord):
sizeRm = self.sizeRmFifo.dataOut
STRB_W = strb._dtype.bit_length()
if self.isAlwaysAligned():
STRB_ALL = mask(STRB_W)
strbSwitch = Switch(remSize)\
.Case(0,
strb(STRB_ALL)
).add_cases(
[(i + 1, strb(mask(i + 1)))
for i in range(STRB_W - 1)]
).Default(
strb(None)
)
if isinstance(isLastWord, (bool, int, HConst)):
if isLastWord:
return strbSwitch
else:
return strb(STRB_ALL)
else:
return If(isLastWord,
strbSwitch
).Else(
strb(STRB_ALL)
)
else:
CHUNK = self.CHUNK_WIDTH // 8
MAX_BYTES = CHUNK * self.MAX_CHUNKS
STRB_ALL = mask(min(STRB_W, MAX_BYTES))
ALIGNAS = self.ALIGNAS
possibleBytesInLastWord = set()
assert self.DATA_WIDTH % ALIGNAS == 0, ("Required to resolve number of bytes in last word", self.DATA_WIDTH, ALIGNAS)
for CHUNK_CNT in range(1, min(self.MAX_CHUNKS, max(3, self.DATA_WIDTH // CHUNK * 3)) + 1):
for o in range(0, STRB_W, ALIGNAS // 8):
bytesInLastWord = (o + CHUNK * CHUNK_CNT) % (self.DATA_WIDTH // 8)
if bytesInLastWord in possibleBytesInLastWord:
break
possibleBytesInLastWord.add(bytesInLastWord)
possibleBytesInLastWord = sorted(possibleBytesInLastWord)
offsetsAlignmentCombinations = set([
# bytesInLastWord, offset value of value in last word, index of shift option
(min(bytesInLastWord, MAX_BYTES), sh // 8, sh_i)
for bytesInLastWord in possibleBytesInLastWord
for sh_i, sh in enumerate(sizeRm.SHIFT_OPTIONS)
if bytesInLastWord <= MAX_BYTES
])
offsetsAlignmentCombinations = sorted(offsetsAlignmentCombinations)
t = strb._dtype.from_py
# :attention: last word can be first word as well
MASK_ALL = mask(STRB_W)
WORD_W = strb._dtype.bit_length()
return \
SwitchLogic([
(remSize._eq(0 if bytesInLastWord == STRB_W else bytesInLastWord) & sizeRm.shift._eq(shift_i),
strb(
# dissable prefix bytes if this is first word
isFirstWord._ternary(t((MASK_ALL << shift) & MASK_ALL), t(MASK_ALL)) &
# dissable suffix bytes if this last word
isLastWord._ternary(t(MASK_ALL >> ((WORD_W - bytesInLastWord - shift) % WORD_W)), t(MASK_ALL))
)
)
for bytesInLastWord, shift, shift_i in offsetsAlignmentCombinations
],
default=strb(None)
)
[docs]
def dataHandler(self, rErrFlag: RtlSignal, rmSizeOut: TransEndInfo):
rIn = self.axi.r
rOut = self.driver.r
if self.axi.LEN_WIDTH:
last = rIn.last
else:
last = BIT.from_py(1)
rInLast = last
if self.useTransSplitting():
last = rmSizeOut.propagateLast & last
if self.isAlwaysAligned():
# without shift logic
* ([self.remSizeToStrb(rmSizeOut.rem, rOut.strb, False, rIn.valid & last), ] if self.USE_STRB else []),
rOut.data(rIn.data)
rOut.last(last)
StreamNode(
masters=[rIn, rmSizeOut],
slaves=[rOut],
extraConds={
rmSizeOut: rInLast,
rOut:~rErrFlag
}
).sync()
else:
# align shifted incoming read data and optionally merge frames
aligner = Axi4S_FrameJoin()
aligner.T = HStruct(
(HStream(HBits(self.CHUNK_WIDTH),
start_offsets=[i // 8 for i in self.getShiftOptions()],
frame_len=(1, self.MAX_CHUNKS)
), "f0"),
)
aligner.USE_STRB = False
aligner.DATA_WIDTH = self.DATA_WIDTH
self.aligner = aligner
isSingleWordOnly = self.CHUNK_WIDTH * self.MAX_CHUNKS <= self.DATA_WIDTH and self.ALIGNAS % (self.CHUNK_WIDTH * self.MAX_CHUNKS) == 0
if isSingleWordOnly:
first = BIT.from_py(1)
else:
# first beat of output frame (not necessary input frame, as multiple input
# frames could be merged in to a single output frame)
first = self._reg(f"first", def_val=1)
If(StreamNode([rIn, rmSizeOut], [aligner.dataIn[0], ]).ack(),
first(last),
)
aligner.dataIn[0].data(rIn.data)
aligner.dataIn[0].last(last)
self.remSizeToStrb(rmSizeOut.rem, aligner.dataIn[0].keep, first, last)
StreamNode(
[rIn, rmSizeOut],
[aligner.dataIn[0], ],
extraConds={
rmSizeOut:~rErrFlag & rInLast,
}
).sync()
if self.USE_STRB:
rOut.strb(aligner.dataOut.keep)
rOut.data(aligner.dataOut.data)
rOut.last(aligner.dataOut.last)
StreamNode(
masters=[aligner.dataOut, ],
slaves=[rOut],
extraConds={
rOut:~rErrFlag
}
).sync()
@override
def hwImpl(self):
r = self.axi.r
errorRead = self._reg("errorRead", def_val=0)
If(r.valid & (r.resp != RESP_OKAY),
errorRead(1)
)
self.errorRead(errorRead)
err = errorRead
if self.ALIGNAS != 8:
req = self.driver.req
errorAlignment = self._reg("errorAlignment", def_val=0)
If(req.vld & (req.addr[log2ceil(self.ALIGNAS // 8):] != 0),
errorAlignment(1)
)
self.errorAlignment(errorAlignment)
err = err | errorAlignment
self.addrHandler(self.driver.req, self.axi.ar, self.sizeRmFifo.dataIn, err)
self.dataHandler(err, self.sizeRmFifo.dataOut)
propagateClkRstn(self)
if __name__ == "__main__":
from hwt.synth import to_rtl_str
# import sys
# sys.setrecursionlimit(5000)
m = Axi_rDatapump()
m.DATA_WIDTH = 512
m.MAX_CHUNKS = 1
m.CHUNK_WIDTH = 32
m.ALIGNAS = 32
print(to_rtl_str(m))