Source code for hwtLib.amba.axis

from math import ceil
from typing import List, Tuple, Union, Deque

from hwt.hdl.types.bitsVal import BitsVal
from hwt.hdl.types.hdlType import HdlType
from hwt.hdl.types.utils import HValue_from_words
from hwt.interfaces.agents.handshaked import UniversalHandshakedAgent
from hwt.interfaces.std import Signal, VectSignal
from hwt.pyUtils.arrayQuery import iter_with_last
from hwt.synthesizer.param import Param
from hwt.synthesizer.vectorUtils import iterBits
from hwtLib.amba.axi_intf_common import Axi_user, Axi_id, Axi_hs, Axi_strb
from hwtLib.amba.sim.agentCommon import BaseAxiAgent
from hwtLib.types.ctypes import uint8_t
from hwtSimApi.hdlSimulator import HdlSimulator
from ipCorePackager.intfIpMeta import IntfIpMeta
from pyMathBitPrecise.bit_utils import mask, get_bit, \
    get_bit_range, set_bit


# http://www.xilinx.com/support/documentation/ip_documentation/ug761_axi_reference_guide.pdf
[docs]class AxiStream(Axi_hs, Axi_id, Axi_user, Axi_strb): """ AMBA AXI-stream interface https://static.docs.arm.com/ihi0051/a/IHI0051A_amba4_axi4_stream_v1_0_protocol_spec.pdf :ivar ~.IS_BIGENDIAN: Param which specifies if interface uses bigendian byte order or little-endian byte order :ivar ~.DATA_WIDTH: Param which specifies width of data signal :ivar ~.HAS_STRB: if set strb signal is present :ivar ~.HAS_KEEP: if set keep signal is present :ivar ~.ID_WIDTH: if > 0 id signal is present and this is it's width :ivar ~.DEST_WIDTH: if > 0 dest signal is present and this is it's width :attention: no checks are made for endianity, this is just information :note: bigendian for interface means that items which are send through this interface have reversed byte endianity. That means that most significant byte is is on lower address than les significant ones e.g. little endian value 0x1a2b will be 0x2b1a but iterface itselelf is not reversed in any way :ivar ~.id: optional signal wich specifies id of transaction :ivar ~.dest: optional signal which specifies destination of transaction :ivar ~.data: main data signal :ivar ~.keep: optional signal which signalize which bytes should be keept and which should be discarded :ivar ~.strb: optional signal which signalize which bytes are valid :ivar ~.last: signal which if high this data is last in this frame :ivar ~.user: optional signal which can be used for arbitrary purposes .. hwt-autodoc:: """ def _config(self): self.IS_BIGENDIAN:bool = Param(False) self.USE_STRB:bool = Param(False) self.USE_KEEP:bool = Param(False) Axi_id._config(self) self.DEST_WIDTH:int = Param(0) self.DATA_WIDTH:int = Param(64) Axi_user._config(self) def _declr(self): Axi_id._declr(self) if self.DEST_WIDTH: self.dest = VectSignal(self.DEST_WIDTH) self.data = VectSignal(self.DATA_WIDTH) if self.USE_STRB: Axi_strb._declr(self) if self.USE_KEEP: self.keep = VectSignal(self.DATA_WIDTH // 8) Axi_user._declr(self) self.last = Signal() super(AxiStream, self)._declr()
[docs] def _getIpCoreIntfClass(self): return IP_AXIStream
[docs] def _initSimAgent(self, sim: HdlSimulator): self._ag = AxiStreamAgent(sim, self)
[docs]class AxiStreamAgent(BaseAxiAgent, UniversalHandshakedAgent): """ Simulation agent for :class:`.AxiStream` interface input/output data stored in list under "data" property data contains tuples Format of data tules is derived from signals on AxiStream interface Order of values coresponds to definition of interface signals. If all signals are present fotmat of tuple will be (id, dest, data, strb, keep, user, last) """
[docs] def __init__(self, sim: HdlSimulator, intf: AxiStream, allowNoReset=False): UniversalHandshakedAgent.__init__(self, sim, intf, allowNoReset=allowNoReset)
[docs]def packAxiSFrame(dataWidth, structVal, withStrb=False): """ pack data of structure into words on axis interface Words are tuples (data, last) or (data, mask, last) depending on args. """ if withStrb: byte_cnt = dataWidth // 8 if structVal == []: if withStrb: yield (None, 0, 1) else: yield (None, 1) return words = iterBits(structVal, bitsInOne=dataWidth, skipPadding=False, fillup=True) for last, d in iter_with_last(words): assert d._dtype.bit_length() == dataWidth, d._dtype.bit_length() if withStrb: word_mask = 0 for B_i in range(byte_cnt): m = get_bit_range(d.vld_mask, B_i * 8, 8) if m == 0xff: word_mask = set_bit(word_mask, B_i) else: assert m == 0, ("Each byte has to be entirely valid" " or entirely invalid," " because of mask granularity", m) yield (d, word_mask, last) else: yield (d, last)
[docs]def unpackAxiSFrame(structT: HdlType, frameData: Deque[Union[BitsVal, int]], getDataFn=None, dataWidth=None): """ opposite of packAxiSFrame """ if getDataFn is None: def _getDataFn(x): return x[0] getDataFn = _getDataFn res = HValue_from_words(structT, frameData, getDataFn, dataWidth) if dataWidth is None: dataWidth = frameData[0][0]._dtype.bit_length() for _ in range(ceil(structT.bit_length() / dataWidth)): frameData.popleft() return res
[docs]def _axis_recieve_bytes(ag_data: Deque[Union[ Tuple[BitsVal, BitsVal, BitsVal, BitsVal], Tuple[BitsVal, BitsVal, BitsVal], Tuple[BitsVal, BitsVal]]], D_B: int, use_keep: bool, use_id: bool) -> Tuple[int, List[int]]: """ :param ag_data: list of axi stream words, number of item in tuple depends on use_keep and use_id :param use_keep: specifies if input tuples contain keep mask :param use_id: specifies if input tuples contain axi stream id :param D_B: number of bytes in word """ offset = None data_B = [] last = False first = True current_id = 0 mask_all = mask(D_B) while ag_data: _d = ag_data.popleft() if use_id: if use_keep: id_, data, keep, last = _d keep = int(keep) else: id_, data, last = _d keep = mask_all id_ = int(id_) else: if use_keep: data, keep, last = _d keep = int(keep) else: data, last = _d keep = mask_all id_ = 0 last = int(last) if keep == 0: assert not data_B, "Empty word in the middle of the packet" assert last, "Empty word at the beginning of the packet" offset = 0 if offset is None: # first iteration # expecting potential 0s in keep and the rest 1 for i in range(D_B): # i represents number of 0 from te beginning of of the keep # value if keep & (1 << i): offset = i break assert offset is not None, keep for i in range(D_B): if get_bit(keep, i): d = get_bit_range(data.val, i * 8, 8) if get_bit_range(data.vld_mask, i * 8, 8) != 0xff: raise AssertionError( "Data not valid but it should be" f" based on strb/keep B_i:{i:d}, 0x{keep:x}, 0x{data.vld_mask:x}") data_B.append(d) if first: offset_mask = mask(offset) assert offset_mask & keep == 0, (offset_mask, keep) first = False current_id = id_ elif not last: assert keep == mask_all, (keep, "Does not support non-full words in the non-last word of the frame") if not first: assert current_id == id_, ("id changed in frame beats", current_id, "->", id_) if last: break if not last: if data_B: raise ValueError("Unfinished frame", data_B) else: raise ValueError("No frame available") if use_id: return offset, id_, data_B else: return offset, data_B
[docs]def axis_recieve_bytes(axis: AxiStream) -> Tuple[int, List[int]]: """ Read data from AXI Stream agent in simulation and use keep signal to mask out unused bytes """ ag_data = axis._ag.data D_B = axis.DATA_WIDTH // 8 USE_ID = bool(getattr(axis, "ID_WIDTH", 0)) if getattr(axis, "USER_WIDTH", 0): raise NotImplementedError() USE_KEEP = hasattr(axis, "keep") USE_STRB = hasattr(axis, "strb") if USE_KEEP and USE_STRB: raise NotImplementedError() use_keep = USE_KEEP | USE_STRB return _axis_recieve_bytes(ag_data, D_B, use_keep, USE_ID)
[docs]def _axis_send_bytes(axis: AxiStream, data_B: List[int], withStrb:bool, offset:int)\ ->List[Tuple[int, int, int]]: if data_B: t = uint8_t[len(data_B) + offset] _data_B = t.from_py([None for _ in range(offset)] + data_B) else: _data_B = data_B # :attention: strb signal is reinterpreted as a keep signal return packAxiSFrame(axis.DATA_WIDTH, _data_B, withStrb=withStrb)
[docs]def axis_send_bytes(axis: AxiStream, data_B: Union[List[int], bytes], offset=0) -> None: """ :param axis: AxiStream master which is driver from the simulation :param data_B: bytes to send :param offset: number of empty bytes which should be added before data in frame (and use keep signal to mark such a bytes) """ if axis.ID_WIDTH: raise NotImplementedError() if axis.USER_WIDTH: raise NotImplementedError() if axis.USE_KEEP and axis.USE_STRB: raise NotImplementedError() withStrb = axis.USE_KEEP | axis.USE_STRB if isinstance(data_B, bytes): data_B = [int(x) for x in data_B] f = _axis_send_bytes(axis, data_B, withStrb, offset) axis._ag.data.extend(f)
[docs]def axis_mask_propagate_best_effort(src: AxiStream, dst: AxiStream): res = [] if src.USE_STRB: if not src.USE_KEEP and not dst.USE_STRB and dst.USE_KEEP: res.append(dst.keep(src.strb)) if src.USE_KEEP: if not src.USE_STRB and not dst.USE_KEEP and dst.USE_STRB: res.append(dst.strb(src.keep)) if not src.USE_KEEP and not src.USE_STRB: if dst.USE_KEEP: res.append(dst.keep(mask(dst.keep._dtype.bit_length()))) if dst.USE_STRB: res.append(dst.strb(mask(dst.strb._dtype.bit_length()))) return res
[docs]class IP_AXIStream(IntfIpMeta): """ Class which specifies how to describe AxiStream interfaces in IP-core """
[docs] def __init__(self): super().__init__() self.name = "axis" self.quartus_name = "axi4stream" self.version = "1.0" self.vendor = "xilinx.com" self.library = "interface" self.map = { 'id': "TID", 'dest': "TDEST", 'data': "TDATA", 'strb': "TSTRB", 'keep': "TKEEP", 'user': 'TUSER', 'last': "TLAST", 'valid': "TVALID", 'ready': "TREADY" } self.quartus_map = { k: v.lower() for k, v in self.map.items() }