Source code for hwtLib.avalon.st

from typing import Optional, Union

from hwt.hdl.types.bits import HBits
from hwt.hdl.types.bitsConst import HBitsConst
from hwt.hdl.types.defs import BIT
from hwt.hwIOs.agents.rdVldSync import HwIODataRdVldAgent
from hwt.hwIOs.std import HwIODataRdVld, HwIOVectSignal, HwIOSignal, HwIORdSync, \
    HwIOVldSync
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwtSimApi.hdlSimulator import HdlSimulator


[docs] class AvalonST(HwIODataRdVld): """ Avalon stream interface :note: handshaked stream with channel, error, sof, eof signal Based on Avalon Interface Specifications Updated for Intel Quartus Prime Design Suite: 20.1 https://cdrdv2.intel.com/v1/dl/getContent/667068?fileName=mnl_avalon_spec-683091-667068.pdf :ivar USE_EMPTY: add "empty" signal which represents the number of symbols that are empty :ivar dataBitsPerSymbol: symbol represents minimal unit of transferred data, it is unit for units of "empty" signal :ivar readyLatency: if 0 the interface works as a typical (AXI4 Stream) ready/valid handshake. if >0 the ready signaling is delayed. The ready cycle is when source received delayed ready=1 from the sink, The source may assert valid only in ready cycle (when it is receiving delayed ready=1) :note: if readyLatency > the ready is delayed inside of the source and Avalon-ST itself will use non-delayed value :ivar readyAllowance: defines how many data can sink capture with ready=False, if it is set to None then readyAllowance=readyLatency :note: readyLatency/readyAllowance are parameters of FIFO implemented on sink side the "ready" represents "almost-full" of this FIFO, readyLatency=almost-full-remaining-size-1, readyAllowance=FIFO total size-1. e.g. readyLatency=0, readyAllowance=1 can buffer 1 item with ready=0 .. figure:: ./_static/avalon_st_readyLatency_readyAllowance.png .. caption:: Avalon Interface Specifications Updated for Intel Quartus Prime Design Suite: 20.1, https://audentia-gestion.fr/INTEL/PDF/mnl_avalon_spec.pdf Figure 27 - anotated :attention: if maxChannel of src > dst only frames with low enought number are forwarded if connected in Quartus. if src.maxChannel=8, dst.maxChannel=0 then only frames for channel=0 will be forwarded :ivar firstSymbolInHighOrderBits: if True the first symbol on msb bits, for False it is on data[dataBitsPerSymbol:0] .. hwt-autodoc:: """ @override def hwConfig(self): HwIODataRdVld.hwConfig(self) self.dataBitsPerSymbol:int = HwParam(8) self.maxChannel:int = HwParam(0) self.readyLatency:int = HwParam(0) self.readyAllowance: Optional[int] = HwParam(None) self.ERROR_WIDTH:int = HwParam(0) self.USE_EMPTY:bool = HwParam(None) self.SUPPORT_ZLP: bool = HwParam(False) self.packetsPerClock = HwParam(1) self.firstSymbolInHighOrderBits = HwParam(True)
[docs] @staticmethod def _getWidthOfEmpty(SEGMENT_DATA_WIDTH: int, BYTE_WIDTH: int, SUPPORT_ZLP: bool): return log2ceil((SEGMENT_DATA_WIDTH // BYTE_WIDTH) + (1 if SUPPORT_ZLP else 0))
[docs] def _getWidthOfEmptyForSelf(self): return self._getWidthOfEmpty(self.DATA_WIDTH // self.packetsPerClock, self.dataBitsPerSymbol, self.SUPPORT_ZLP)
@override def hwDeclr(self): # :see: Avalon Interface Specifications 5.9.1. Data Transfers Using readyLatency and readyAllowance if self.readyAllowance is not None: assert self.readyAllowance >= self.readyLatency, (self.readyAllowance, self.readyLatency) else: self.readyAllowance = self.readyLatency if self.USE_EMPTY is None: self.USE_EMPTY = self.DATA_WIDTH > self.dataBitsPerSymbol or self.SUPPORT_ZLP # fundamentals if self.maxChannel: self.channel = HwIOVectSignal(log2ceil(self.maxChannel)) self.data = HwIOVectSignal(self.DATA_WIDTH) if self.USE_EMPTY: self.empty = HwIOVectSignal(self._getWidthOfEmptyForSelf()) if self.ERROR_WIDTH: self.error = HwIOVectSignal(self.ERROR_WIDTH) # packet transfer signals pktSignalT = BIT if self.packetsPerClock else HBits(self.packetsPerClock) self.startOfPacket = HwIOSignal(pktSignalT) self.endOfPacket = HwIOSignal(pktSignalT) HwIOVldSync.hwDeclr(self) HwIORdSync.hwDeclr(self)
[docs] @override def _initSimAgent(self, sim: HdlSimulator): self._ag = AvalonSTAgent(sim, self)
AvalonSTAgentWordType = Union[ # (channel?, data, error?, startOfPacket, endOfPacket) tuple[HBitsConst, HBitsConst, HBitsConst, HBitsConst, HBitsConst], tuple[HBitsConst, HBitsConst, HBitsConst, HBitsConst], tuple[HBitsConst, HBitsConst, HBitsConst], ]
[docs] class AvalonSTAgent(HwIODataRdVldAgent): """ Simulation Agent for AvalonST interface Data is stored in .data property and data format is tuple """
[docs] def __init__(self, sim: HdlSimulator, hwIO: AvalonST, allowNoReset=False): if hwIO.readyAllowance: raise NotImplementedError(hwIO) if hwIO.readyLatency != 0: raise NotImplementedError(hwIO) HwIODataRdVldAgent.__init__(self, sim, hwIO, allowNoReset)
[docs] @override def get_data(self) -> AvalonSTAgentWordType: hwIO = self.hwIO d = [] if hwIO.maxChannel: d.append(hwIO.channel.read()) d.append(hwIO.data.read()) if hwIO.USE_EMPTY: d.append(hwIO.empty.read()) if hwIO.ERROR_WIDTH: d.append(hwIO.error.read()) d.append(hwIO.startOfPacket.read()) d.append(hwIO.endOfPacket.read()) return tuple(d)
[docs] @override def set_data(self, data: Optional[AvalonSTAgentWordType]): hwIO = self.hwIO if data is None: if hwIO.maxChannel: hwIO.channel.write(None) hwIO.data.write(None) if hwIO.USE_EMPTY: hwIO.empty.write(None) if hwIO.ERROR_WIDTH: hwIO.error.write(None) hwIO.endOfPacket.write(None) hwIO.startOfPacket.write(None) else: i = 0 if hwIO.maxChannel: hwIO.channel.write(data[0]) i = 1 hwIO.data.write(data[i]) i += 1 if hwIO.USE_EMPTY: hwIO.empty.write(data[i]) i += 1 if hwIO.ERROR_WIDTH: hwIO.error.write(data[i]) i += 1 hwIO.startOfPacket.write(data[i]) hwIO.endOfPacket.write(data[i + 1]) assert len(data) == i + 2, (len(data), i + 2)