Source code for hwtLib.avalon.stSimFrameUtils

from typing import Union, Sequence, Generator, Deque, Self

from hdlConvertorAst.to.hdlUtils import iter_with_last
from hwt.code import Concat
from hwt.hdl.const import HConst
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.bitsConst import HBitsConst
from hwt.hdl.types.defs import BIT
from hwt.pyUtils.typingFuture import override
from hwt.synthesizer.vectorUtils import iterBits
from hwtLib.abstract.simFrameUtils import SimFrameUtils
from hwtLib.avalon.st import AvalonST, AvalonSTAgentWordType
from pyMathBitPrecise.bit_utils import get_bit_range


class AvalonStSimFrameUtils(SimFrameUtils[AvalonSTAgentWordType]):
    """
    Implementation of SimFrameUtils for AvalonST interface.

    The simulation agent word handled by this class is a tuple with the layout
    ``(data, [empty], [error], [sof], eof)`` where the optional members are present
    only if USE_EMPTY, ERROR_WIDTH (non zero) and USE_SOF are set respectively.

    :ivar SEGMENT_CNT: value passed to the constructor (stored, not used by the methods of this class)
    :ivar BYTE_WIDTH: number of bits in one symbol (byte) of the data signal
    :ivar SEGMENT_DATA_WIDTH: number of data bits in one word
    :ivar SEGMENT_BYTE_CNT: number of symbols (bytes) in one word
    :ivar SUPPORT_ZLP: passed to AvalonST._getWidthOfEmpty when the width of "empty" is resolved
    :ivar USE_SOF: if True the word contains the start-of-frame flag
    :ivar ERROR_WIDTH: width of the error member of the word, 0 means that the member is not present
    :ivar USE_EMPTY: if True the word contains the number of unused bytes
    """

    def __init__(self, SEGMENT_DATA_WIDTH:int, USE_EMPTY: bool, BYTE_WIDTH:int=8, SEGMENT_CNT:int=1,
                 SUPPORT_ZLP:bool=False, USE_SOF:bool=True, ERROR_WIDTH:int=0):
        """
        :param SEGMENT_DATA_WIDTH: number of data bits in one word, must be a multiple of BYTE_WIDTH
        :param USE_EMPTY: if True the word contains the "empty" member
        :param BYTE_WIDTH: number of bits in one symbol (byte)
        :param SEGMENT_CNT: stored as is
        :param SUPPORT_ZLP: stored as is, used for the resolution of the width of "empty"
        :param USE_SOF: if True the word contains the start-of-frame flag
        :param ERROR_WIDTH: width of the error member, 0 disables it
        """
        assert SEGMENT_DATA_WIDTH % BYTE_WIDTH == 0, (
            "SEGMENT_DATA_WIDTH must be a multiple of BYTE_WIDTH", SEGMENT_DATA_WIDTH, BYTE_WIDTH)
        self.SEGMENT_CNT = SEGMENT_CNT
        self.BYTE_WIDTH = BYTE_WIDTH
        self.SEGMENT_DATA_WIDTH = SEGMENT_DATA_WIDTH
        self.SEGMENT_BYTE_CNT = SEGMENT_DATA_WIDTH // BYTE_WIDTH
        self.SUPPORT_ZLP = SUPPORT_ZLP
        self.USE_SOF = USE_SOF
        self.ERROR_WIDTH = ERROR_WIDTH
        self.USE_EMPTY = USE_EMPTY

    @override
    @classmethod
    def from_HwIO(cls, hwio: AvalonST) -> Self:
        """
        Construct the utils from the parameters of an AvalonST interface instance.

        :note: USE_SOF is always set to True
        """
        return cls(
            hwio.DATA_WIDTH,
            hwio.USE_EMPTY,
            SEGMENT_CNT=hwio.packetsPerClock,
            BYTE_WIDTH=hwio.dataBitsPerSymbol,
            SUPPORT_ZLP=hwio.SUPPORT_ZLP,
            USE_SOF=True,
            ERROR_WIDTH=hwio.ERROR_WIDTH)

    def _buildAgentWord(self, data, empty, sof, eof) -> tuple:
        """
        Build an agent word tuple (data, [empty], [error], [sof], eof), error is always 0.
        """
        word = [data, ]
        if self.USE_EMPTY:
            word.append(empty)
        if self.ERROR_WIDTH:
            word.append(0)
        if self.USE_SOF:
            word.append(sof)
        word.append(eof)
        return tuple(word)

    def _splitAgentWord(self, word) -> tuple:
        """
        Split an agent word to (data, empty, error, sof, eof), members which are not present
        in the current configuration are returned as None.

        :raises ValueError: if the number of the members does not match the configuration
        """
        members = tuple(word)
        expectedLen = 2 + bool(self.USE_EMPTY) + bool(self.ERROR_WIDTH) + bool(self.USE_SOF)
        if len(members) != expectedLen:
            raise ValueError("Agent word has unexpected number of members", len(members), expectedLen)
        it = iter(members)
        data = next(it)
        empty = next(it) if self.USE_EMPTY else None
        error = next(it) if self.ERROR_WIDTH else None
        sof = next(it) if self.USE_SOF else None
        eof = next(it)
        return data, empty, error, sof, eof

    @override
    def pack_frame(self, frameVal: Union[HConst, Sequence[int]])\
            ->Generator[AvalonSTAgentWordType, None, None]:
        """
        Cut the frame value to words of SEGMENT_DATA_WIDTH bits and yield them as agent words.

        :param frameVal: frame data, converted by self._toHConst (not defined in this class)
        :return: generator of tuples (data, [empty], [error], [sof], eof), error is always 0,
            empty is non zero only in the last word
        :note: a zero length frame is emitted as a single word with data=None
            and empty equal to the number of bytes in the word
        """
        frameVal, frameIsZeroLen = self._toHConst(frameVal)
        byte_cnt = self.SEGMENT_BYTE_CNT

        if frameIsZeroLen:
            yield self._buildAgentWord(None, byte_cnt, 1, 1)
            return

        lastWordEmpty = 0
        if self.USE_EMPTY:
            # number of bytes used in the last word, 0 means that the last word is full
            leftOverSize = (frameVal._dtype.bit_length() // self.BYTE_WIDTH) % byte_cnt
            if leftOverSize:
                lastWordEmpty = byte_cnt - leftOverSize

        dataWidth = self.SEGMENT_DATA_WIDTH
        words = iterBits(frameVal, bitsInOne=dataWidth, skipPadding=False, fillup=True)
        first = True
        for last, data in iter_with_last(words):
            assert data._dtype.bit_length() == dataWidth, data._dtype.bit_length()
            yield self._buildAgentWord(data, lastWordEmpty if last else 0, first, last)
            first = False

    @override
    def concatWordBits(self, frameBeats: Sequence[AvalonSTAgentWordType]):
        """
        Convert each agent word to a single bit vector Concat(eof, [sof], [error], [empty], data).

        :param frameBeats: agent words in the format (data, [empty], [error], [sof], eof)
        :return: generator of the concatenated values, one per word
        :raises ValueError: if a word does not have the number of members expected for the configuration
        """
        USE_SOF = self.USE_SOF
        USE_EMPTY = self.USE_EMPTY
        ERROR_WIDTH = self.ERROR_WIDTH
        b = BIT.from_py
        if USE_EMPTY:
            # __init__ of this class stores only SEGMENT_DATA_WIDTH, DATA_WIDTH is used only
            # if it is provided from elsewhere (e.g. by a base class or subclass)
            dataWidth = getattr(self, "DATA_WIDTH", self.SEGMENT_DATA_WIDTH)
            emptyT = HBits(AvalonST._getWidthOfEmpty(dataWidth, self.BYTE_WIDTH, self.SUPPORT_ZLP)).from_py
        if ERROR_WIDTH:
            errorT = HBits(ERROR_WIDTH).from_py

        for beat in frameBeats:
            data, empty, error, sof, eof = self._splitAgentWord(beat)
            parts = [b(eof), ]
            if USE_SOF:
                parts.append(b(sof))
            if ERROR_WIDTH:
                parts.append(errorT(error))
            if USE_EMPTY:
                parts.append(emptyT(empty))
            parts.append(data)
            yield Concat(*parts)

    @override
    def receive_bytes(self, ag_data:Deque[AvalonSTAgentWordType]) -> tuple[int, list[Union[int, HBitsConst]], bool]:
        """
        Pop the words of one frame from ag_data and convert them to a list of byte values.

        :param ag_data: queue of agent words (data, [empty], [error], [sof], eof),
            words are consumed up to and including the first word with eof set
        :return: tuple (0, list of byte values, bitwise OR of the error member of all words of the frame),
            the first member is always 0 (presumably the offset of the data, verify against SimFrameUtils)
        :raises ValueError: if ag_data does not contain a complete frame
            or a word has an unexpected number of members
        :raises AssertionError: if a byte which should be used has a bit which is not valid
        """
        data_B = []
        eof = False
        err = 0
        first = True
        SEGMENT_BYTE_CNT = self.SEGMENT_BYTE_CNT
        BYTE_WIDTH = self.BYTE_WIDTH
        BYTE_MASK = (1 << BYTE_WIDTH) - 1
        USE_SOF = self.USE_SOF
        ERROR_WIDTH = self.ERROR_WIDTH
        USE_EMPTY = self.USE_EMPTY

        while ag_data:
            data, empty, wordErr, sof, eof = self._splitAgentWord(ag_data.popleft())

            if USE_SOF:
                sof = bool(sof)
                assert sof == first, ("Soft must be 1 in the first segment of the frame", sof, first)

            if ERROR_WIDTH:
                # accumulate the error flags of all words of the frame
                err |= int(wordErr)

            eof = bool(eof)
            if USE_EMPTY:
                empty = int(empty)
                assert eof or empty == 0, "non-full word in the middle of the packet"
            else:
                empty = 0

            for i in range(SEGMENT_BYTE_CNT - empty):
                d = get_bit_range(data.val, i * BYTE_WIDTH, BYTE_WIDTH)
                if get_bit_range(data.vld_mask, i * BYTE_WIDTH, BYTE_WIDTH) != BYTE_MASK:
                    raise AssertionError(
                        "Data not valid but it should be"
                        f' based on value of "empty" B_i:{i:d}, empty:{empty:d}, vld_mask:0x{data.vld_mask:x}')
                data_B.append(d)

            first = False
            if eof:
                break

        if not eof:
            if data_B:
                raise ValueError("Unfinished frame", data_B)
            else:
                raise ValueError("No frame available")

        return 0, data_B, err