Source code for hwtLib.amba.axis_comp.storedBurst
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import math
from typing import Tuple, Union
from hwt.code import If, Switch
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.bitsVal import BitsVal
from hwt.interfaces.utils import addClkRstn
from hwt.pyUtils.arrayQuery import grouper
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
from pyMathBitPrecise.bit_utils import mask, byte_list_to_be_int
[docs]class AxiSStoredBurst(Unit):
"""
This unit send data stored in property :obj:`~.DATA` over axi-stream interface
:ivar ~.DATA: bytes or integer values for each word
:ivar ~.REPEAT: if False this component works in one-shot mode
.. hwt-autodoc::
"""
def _config(self):
AxiStream._config(self)
self.REPEAT: bool = Param(False)
self.DATA: Union[bytes, Tuple[Union[int, BitsVal, None], ...]] = Param("Hello world!!!!!".encode())
[docs] def dataRd(self):
return self.dataOut.ready
def _declr(self):
addClkRstn(self)
with self._paramsShared():
self.dataOut: AxiStream = AxiStream()._m()
[docs] def nextWordIndexLogic(self, wordIndex: RtlSignal, DATA_LEN:int):
if self.REPEAT:
return If(wordIndex < DATA_LEN,
wordIndex(wordIndex + 1)
).Else(
wordIndex(0)
)
else:
return If(wordIndex < DATA_LEN,
wordIndex(wordIndex + 1)
)
def _impl(self):
dout = self.dataOut
all_mask = mask(self.DATA_WIDTH // 8)
DATA = self.DATA
if isinstance(DATA, bytes):
# convert bytes to integer for words
DATA_SIZE = len(DATA)
if not self.USE_STRB and not self.USE_KEEP:
assert DATA_SIZE % (self.DATA_WIDTH // 8) == 0, (
"Not using any kind of mask and the data does not exactly fit into words", DATA, self.DATA_WIDTH)
last_mask = mask((DATA_SIZE % (self.DATA_WIDTH // 8) // 8))
if last_mask == 0:
last_mask = all_mask
words = []
for word_bytes in grouper(self.DATA_WIDTH // 8, DATA, 0):
word = byte_list_to_be_int(word_bytes)
words.append(word)
words.append(word)
DATA = words
else:
last_mask = all_mask
DATA_LEN = len(DATA)
wordIndex_w = int(math.log2(DATA_LEN) + 1)
wordIndex = self._reg("wordIndex", Bits(wordIndex_w), def_val=0)
Switch(wordIndex)\
.add_cases([(i, dout.data(d))
for i, d in enumerate(DATA)])\
.Default(dout.data(None))
dout.last(wordIndex._eq(DATA_LEN - 1))
if self.USE_STRB or self.USE_KEEP:
if last_mask == all_mask:
out_mask = all_mask
else:
out_mask = dout.last._ternary(last_mask, all_mask)
If(wordIndex < DATA_LEN,
dout.strb(out_mask) if self.USE_STRB else [],
dout.keep(out_mask) if self.USE_KEEP else [],
dout.valid(1)
).Else(
dout.strb(None) if self.USE_STRB else [],
dout.keep(None) if self.USE_KEEP else [],
dout.valid(0)
)
If(self.dataRd(),
self.nextWordIndexLogic(wordIndex, DATA_LEN)
)
if __name__ == "__main__":
from hwt.synthesizer.utils import to_rtl_str
print(to_rtl_str(AxiSStoredBurst()))