Source code for hwtLib.abstract.simFrameUtils

from math import ceil
from typing import Self, Union, Sequence, Generator, Generic, TypeVar, \
    Deque

from hwt.hdl.const import HConst
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.bitsConst import HBitsConst
from hwt.hdl.types.hdlType import HdlType
from hwt.hdl.types.utils import HConst_from_words
from hwt.hwIO import HwIO

WordTupleTy = TypeVar('WordTupleTy')  # a type representing a word used by simulation agent for a target hwio

b8_t = HBits(8)


[docs] class SimFrameUtils(Generic[WordTupleTy]): """ This class is a base class for utility classes which converts between pythonic formats of frames and formats used by simulation agents. :note: The main purpose is to simplify work with the frames and to make test agnostic to a specific stream hwio type. """
[docs] def __init__(self, DATA_WIDTH: int): self.DATA_WIDTH = DATA_WIDTH
[docs] @classmethod def from_HwIO(cls, hwio: HwIO) -> Self: """ Create a an instance of self configured for specified hwio instance """ raise NotImplementedError("Override this in your implementation of this abstract class")
[docs] @staticmethod def _getWordDataFn(wordTuple:tuple[HConst]): return wordTuple[0]
[docs] def _toHConst(self, v: Union[HConst, Sequence[int]]): isVoid = False if not isinstance(v, HConst): try: valByteCnt = len(v) except: v = tuple(v) valByteCnt = len(v) if valByteCnt == 0: v = None isVoid = True else: v = b8_t[valByteCnt].from_py(v) return v, isVoid
[docs] def pack_frame(self, structVal: Union[HConst, Sequence[int]])\ ->Generator[WordTupleTy, None, None]: """ pack data of structure into words of target hwio Words are tuples specific to each hwio. :param structVal: value to be send, HConst instance or list of int for each byte :returns: generator of word tuples """ raise NotImplementedError("Override this in your implementation of this abstract class")
[docs] def unpack_frame(self, structT: HdlType, frameData: Deque[WordTupleTy]) -> HConst: """ opposite of :meth:`~.pack_frame` """ res = HConst_from_words(structT, frameData, self._getWordDataFn, self.DATA_WIDTH) for _ in range(ceil(structT.bit_length() / self.DATA_WIDTH)): frameData.popleft() return res
[docs] def concatWordBits(self, frameBeats: Sequence[WordTupleTy]) -> Generator[HBitsConst, None, None]: """ Convert word tuple (produced by :meth:`~.send_bytes` and :meth:`~.pack_frame`) to flat :class:`HBitsConst` (members of input tuple are typically ints so they need to be cast to correct type first) """ raise NotImplementedError("Override this in your implementation of this abstract class")
[docs] def updackWordBits(self): """ opposite of concatWordBits """ raise NotImplementedError("Override this in your implementation of this abstract class")
[docs] def receive_bytes(self, ag_data: Deque[WordTupleTy]) -> tuple[int, list[int]]: """ :param ag_data: list of stream hwio words :return: tuple dataStartOffset, data_Bytes """ raise NotImplementedError("Override this in your implementation of this abstract class")
[docs] def send_bytes(self, data_B: Union[bytes, list[int]], ag_data: Deque[WordTupleTy], offset:int=0)\ ->list[WordTupleTy]: """ Build frame out of data_B bytes and insert it into ag_data deque which is expected to be a queue of driver sim agent """ if data_B: if isinstance(data_B, bytes): data_B = [int(x) for x in data_B] t = b8_t[len(data_B) + offset] _data_B = t.from_py([None for _ in range(offset)] + data_B) else: _data_B = data_B # :attention: strb signal is reinterpreted as a keep signal f = self.pack_frame(_data_B) ag_data.extend(f) return f