from collections import deque
from dataclasses import dataclass # , fields
from math import ceil
from typing import Union, Deque, \
Self
from hwt.code import segment_get, Concat
from hwt.hdl.const import HConst
from hwt.hdl.types.bitConstFunctions import AnyHBitsValue
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.bitsConst import HBitsConst
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.struct import HStruct
from hwt.hwIOs.agents.rdVldSync import UniversalRdVldSyncAgent
from hwt.hwIOs.hwIOArray import HwIOArray
from hwt.hwIOs.hwIOStruct import HdlType_to_HwIO
from hwt.hwIOs.std import HwIOVectSignal
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwtLib.amba.axi_common import Axi_user, Axi_hs
from hwtLib.amba.sim.agentCommon import BaseAxiAgent
from hwtSimApi.hdlSimulator import HdlSimulator
[docs]
@dataclass
class Axi4StreamSegmentedMockSegmentUserTy():
"""
Mock type to keep python typed for dynamically generated HdlType for Axi4StreamSegmented segment user
:attention: in real HStructConst the property is never None, but instead it is not presetnt
"""
enable: AnyHBitsValue | None = None
sof: AnyHBitsValue | None = None
eof: AnyHBitsValue | None = None
err: AnyHBitsValue | None = None
empty: AnyHBitsValue | None = None
[docs]
@dataclass
class Axi4StreamSegmentedMockSegmentTy():
"""
Mock type to keep python typed for dynamically generated HdlType for Axi4StreamSegmented segment
"""
data: AnyHBitsValue
user: Axi4StreamSegmentedMockSegmentUserTy
[docs]
@dataclass
class Axi4StreamSegmentedMockWordNoPackTy():
"""
Mock type to keep python typed for dynamically generated HdlType for Axi4StreamSegmented segment
"""
data: list[AnyHBitsValue]
user: list[Axi4StreamSegmentedMockSegmentUserTy]
# for field in fields(YourDataclass):
# print(field.name, getattr(YourDataclass, field.name)
[docs]
class Axi4StreamSegmented(Axi_hs, Axi_user):
"""
Xilinx/AMD Segmented Axi4Stream interface used for 100G+ Ethernet
https://docs.amd.com/r/1.3-English/pg314-versal-mrmac/Segmented-Mode?tocId=bPOyhICzaCLnbaKgrT0K4g
:ivar ~.SEGMENT_CNT: number of segments of the bus
:ivar ~.SEGMENT_DATA_WIDTH: width of 1 segment of data signal
:ivar ~.ERROR_WIDTH: width of 1 segment of "err" signal (can be 0)
:ivar ~.SUPPORT_ZLP: if True "empty" signal 1b is wider and Zero Length Packets are supported
:ivar ~.USE_SOF: if True sof is present in user struct,
sof + enable potentially allows for sparse streams (frame continuing in not directly consequent segment)
:irar ~.PACK_SEGMENT_BITS: if True the bits for sof, eof, enable, empty and err are packed together to a single
bit vector, this is beneficial for smaller code size, but it makes harder to read the simulation wave
:note: user signal is used to encode bus state, format is described in :meth:`Axi4StreamSegmented.hwDeclr`
Typical configuration for Ethernet:
Ethernet DATA_WIDTH SEGMENT_CNT Frequency[MHz]
======== ========== =========== ==============
40G 128 1 312.5 [0]
50G 256 2 195.3125 [0]
100G 384 3 260.5 [0]
100G 512 1 390.625 [2]
400G 1024 8 390.625 [3]
600G 1536 12 390.625 [3]
[0] https://www.xilinx.com/content/dam/xilinx/publications/presentations/xilinx_network_security_offerings.pdf
[1] https://www.intel.com/content/www/us/en/docs/programmable/773413.html
[2] https://cdrdv2.intel.com/v1/dl/getContent/827074?fileName=ug20085-683100-827074.pdf
[3] https://docs.amd.com/r/en-US/pg369-dcmac
.. hwt-autodoc::
"""
@override
def hwConfig(self):
self.SEGMENT_CNT:int = HwParam(4)
self.SEGMENT_DATA_WIDTH:int = HwParam(64)
self.BYTE_WIDTH:int = HwParam(8)
self.SUPPORT_ZLP:bool = HwParam(False)
self.USE_SOF:bool = HwParam(False)
self.ERROR_WIDTH:int = HwParam(0)
self.PACK_SEGMENT_BITS:bool = HwParam(False)
[docs]
@staticmethod
def getWastedBandwidthPercent(SEGMENT_CNT:int, DATA_WIDTH: int, PACKET_WIDTH: int):
assert DATA_WIDTH % SEGMENT_CNT == 0, (DATA_WIDTH, SEGMENT_CNT)
SEGMENT_WIDTH = DATA_WIDTH // SEGMENT_CNT
leftover = PACKET_WIDTH % SEGMENT_WIDTH
wasted = SEGMENT_WIDTH - leftover
packetSegmentCnt = ceil(PACKET_WIDTH / SEGMENT_WIDTH)
return wasted / (packetSegmentCnt * SEGMENT_WIDTH)
[docs]
@classmethod
def getEffectiveThroughput(cls, CLK_FREQ: Union[float, int], SEGMENT_CNT: int, DATA_WIDTH: int, PACKET_WIDTH: int):
efficiency = cls.getWastedBandwidthPercent(SEGMENT_CNT, DATA_WIDTH, PACKET_WIDTH)
return CLK_FREQ * DATA_WIDTH * (1. - efficiency)
[docs]
@classmethod
def getMinNumberOfSegments(cls, BITRATE: Union[float, int], DATA_WIDTH: int, CLK_FREQ: float, MIN_PACKET_WIDTH: int):
"""
:param MIN_PACKET_WIDTH: the width of most waistful packet, e.g. (64 + 1) * 8b for ethernet
"""
assert BITRATE <= CLK_FREQ * DATA_WIDTH
segmentCnt = 1
while True:
ebpsMinPkt = cls.getEffectiveThroughput(CLK_FREQ, segmentCnt, DATA_WIDTH, MIN_PACKET_WIDTH)
segmentWidth = DATA_WIDTH // segmentCnt
if MIN_PACKET_WIDTH < segmentWidth:
ebpsMinPkt = min(ebpsMinPkt, cls.getEffectiveThroughput(CLK_FREQ, segmentCnt, DATA_WIDTH, segmentWidth + 8))
if ebpsMinPkt >= BITRATE:
break
segmentCnt += 1
while DATA_WIDTH % segmentCnt != 0 or (DATA_WIDTH // segmentCnt) % 8 != 0:
segmentCnt += 1
if segmentCnt >= DATA_WIDTH // 8:
raise AssertionError("Clock frequency too low")
return segmentCnt
[docs]
@staticmethod
def _hasEmpty(SEGMENT_DATA_WIDTH: int, BYTE_WIDTH: int, SUPPORT_ZLP: bool):
return SEGMENT_DATA_WIDTH > BYTE_WIDTH or SUPPORT_ZLP
[docs]
@staticmethod
def _hasEnable(SEGMENT_CNT: int):
return SEGMENT_CNT > 1
[docs]
@staticmethod
def _getWidthOfEmpty(SEGMENT_DATA_WIDTH: int, BYTE_WIDTH: int, SUPPORT_ZLP: bool):
return log2ceil((SEGMENT_DATA_WIDTH // BYTE_WIDTH) + (1 if SUPPORT_ZLP else 0))
[docs]
def resolveTypes(self):
"""
:see: :class:`~.Axi4StreamSegmentedMockSegmentTy` :class:`~.Axi4StreamSegmentedMockSegmentUserTy`
"""
if hasattr(self, "USER_SEGMENT_T"):
return
SEGMENT_CNT = self.SEGMENT_CNT
BYTE_WIDTH = self.BYTE_WIDTH
SEGMENT_DATA_WIDTH = self.SEGMENT_DATA_WIDTH
assert SEGMENT_DATA_WIDTH > 0, (SEGMENT_DATA_WIDTH, BYTE_WIDTH)
assert SEGMENT_DATA_WIDTH % BYTE_WIDTH == 0, (SEGMENT_DATA_WIDTH, BYTE_WIDTH)
assert BYTE_WIDTH <= SEGMENT_DATA_WIDTH, (SEGMENT_DATA_WIDTH, BYTE_WIDTH)
USE_EMPTY = self._hasEmpty(SEGMENT_DATA_WIDTH, BYTE_WIDTH, self.SUPPORT_ZLP)
USE_ENABLE = self._hasEnable(SEGMENT_CNT)
EMPTY_WIDTH = self._getWidthOfEmpty(SEGMENT_DATA_WIDTH, BYTE_WIDTH, self.SUPPORT_ZLP)
# :note: 1st member of struct is on bit 0
self.USER_SEGMENT_T = HStruct(
# ENABLE 1 indicates that the associated data signal contains valid data,
# and that the other flags on the associated user_* signals are valid
# :note: enable is there to spare MUXes on for other struct members in not populated segments
* (((BIT, "enable"),) if USE_ENABLE else ()),
# SOF 1 indicates that the associated data segment contains the beginning of a new frame
* (((BIT, "sof"),) if self.USE_SOF else ()),
# EOF indicates end of frame
(BIT, "eof"),
# Err indicates error in frame
* (((HBits(self.ERROR_WIDTH), "err"),) if self.ERROR_WIDTH else ()),
# Empty indicates the number of empty (unused) bytes in the final (EOP) segment.
# Valid only in the segment where EOP and ENA are asserted to 1.
# for 8B interface: 0 == 8 bytes valid, 7 == 1 byte valid, ...
* (((HBits(EMPTY_WIDTH), "empty"),) if USE_EMPTY else ()),
)
self.WORD_T = HStruct(
(HBits(SEGMENT_DATA_WIDTH)[SEGMENT_CNT], "data"),
(self.USER_SEGMENT_T[SEGMENT_CNT], "user")
)
@override
def hwDeclr(self):
self.resolveTypes()
SEGMENT_CNT = self.SEGMENT_CNT
if self.PACK_SEGMENT_BITS:
self.data = HwIOVectSignal(SEGMENT_CNT * self.SEGMENT_DATA_WIDTH)
self.user = HwIOVectSignal(SEGMENT_CNT * self.USER_SEGMENT_T.bit_length())
else:
self.data = HwIOArray(HwIOVectSignal(self.SEGMENT_DATA_WIDTH) for _ in range(SEGMENT_CNT))
self.user = HwIOArray(HdlType_to_HwIO().apply(self.USER_SEGMENT_T) for _ in range(SEGMENT_CNT))
Axi_hs.hwDeclr(self)
[docs]
def unpackSegment(self, segmentIndex:int, v=None) -> tuple[AnyHBitsValue, Axi4StreamSegmentedMockSegmentUserTy]:
if v is None:
v = self
data = segment_get(v.data, self.SEGMENT_DATA_WIDTH, segmentIndex)
user = segment_get(v.user, self.USER_SEGMENT_T.bit_length(), segmentIndex)
return data, user._reinterpret_cast(self.USER_SEGMENT_T)
[docs]
@override
def _initSimAgent(self, sim: HdlSimulator):
self._ag = Axi4StreamSegmentedAgent(sim, self)
# tuple of segments
Axi4StreamSegmentedAgentWordType = tuple[tuple[HBitsConst, HConst], ...]
[docs]
class Axi4StreamSegmentedAgent(BaseAxiAgent, UniversalRdVldSyncAgent):
"""
Simulation agent for :class:`.Axi4StreamSegmented` interface
input/output data stored in list under "data" property
data contains tuples
"""
[docs]
def __init__(self, sim: HdlSimulator, hwIO: Axi4StreamSegmented, allowNoReset=False):
UniversalRdVldSyncAgent.__init__(self, sim, hwIO, allowNoReset=allowNoReset)
self.DATA_SEGMENT_T = HBits(hwIO.SEGMENT_DATA_WIDTH)
self.DATA_SEGMENT_INVALID = self.DATA_SEGMENT_T.from_py(None)
self.USE_ENABLE = hwIO._hasEnable(hwIO.SEGMENT_CNT)
assert self._sigCnt == 2, ('expect only "data", "user" signals', self._signals)
if hwIO.PACK_SEGMENT_BITS:
self.USER_SEGMENT_PACKED_T = HBits(hwIO.USER_SEGMENT_T.bit_length())
self.USER_WORD_T = HBits(self.USER_SEGMENT_PACKED_T.bit_length() * hwIO.SEGMENT_CNT)
else:
self.USER_SEGMENT_DISABLED = hwIO.USER_SEGMENT_T.from_py({"enable": 0} if self.USE_ENABLE else {})
self.USE_EMPTY = hwIO._hasEmpty(hwIO.SEGMENT_DATA_WIDTH, hwIO.BYTE_WIDTH, hwIO.SUPPORT_ZLP)
[docs]
def get_data(self):
hwIO: Axi4StreamSegmented = self.hwIO
USER_SEGMENT_T = hwIO.USER_SEGMENT_T
if hwIO.PACK_SEGMENT_BITS:
data = hwIO.data.read()
_user = hwIO.user.read()
DW = hwIO.SEGMENT_DATA_WIDTH
data = [data[(s + 1) * DW: s * DW] for s in range(hwIO.SEGMENT_CNT)]
UW = self.USER_SEGMENT_PACKED_T.bit_length()
user = []
USER_SEGMENT_PACKED_T = self.USER_SEGMENT_PACKED_T
for s in range(hwIO.SEGMENT_CNT):
userSegment = _user[(s + 1) * UW: s * UW]
userSegment = USER_SEGMENT_PACKED_T.from_py(userSegment.val, userSegment.vld_mask)._reinterpret_cast(USER_SEGMENT_T)
user.append(userSegment)
else:
USE_SOF = hwIO.USE_SOF
ERROR_WIDTH = hwIO.ERROR_WIDTH
USE_EMPTY = self.USE_EMPTY
USE_ENABLE = self.USE_ENABLE
data = [d.read() for d in hwIO.data]
user = []
for u in hwIO.user:
uVal = {
"eof": u.eof.read(),
}
if USE_ENABLE:
uVal["enable"] = u.enable.read()
if USE_EMPTY:
uVal["empty"] = u.empty.read()
if ERROR_WIDTH:
uVal["err"] = u.err.read()
if USE_SOF:
uVal["sof"] = u.sof.read()
user.append(USER_SEGMENT_T.from_py(uVal))
return (tuple(data), tuple(user))
[docs]
def set_data(self, dataWord: Axi4StreamSegmentedAgentWordType):
"""
:param dataWord: tuple of segments
"""
hwIO: Axi4StreamSegmented = self.hwIO
USE_SOF = hwIO.USE_SOF
ERROR_WIDTH = hwIO.ERROR_WIDTH
USE_ENABLE = self.USE_ENABLE
USE_EMPTY = self.USE_EMPTY
if dataWord is None:
if hwIO.PACK_SEGMENT_BITS:
hwIO.data.write(None)
hwIO.user.write(None)
else:
for d in hwIO.data:
d.write(None)
for u in hwIO.user:
if USE_ENABLE:
u.enable.write(None)
if USE_SOF:
u.sof.write(None)
u.eof.write(None)
if ERROR_WIDTH:
u.err.write(None)
if USE_EMPTY:
u.empty.write(None)
else:
data = []
user = []
for d, u in dataWord:
data.append(d)
user.append(u)
paddingSegmentCnt = hwIO.SEGMENT_CNT - len(dataWord)
if paddingSegmentCnt:
# add padding
for _ in range(paddingSegmentCnt):
data.append(self.DATA_SEGMENT_INVALID)
user.append(self.USER_SEGMENT_DISABLED)
if hwIO.PACK_SEGMENT_BITS:
data = Concat(*reversed(data)) # data[0] to LSB side
hwIO.data.write(data)
userPacked = Concat(*reversed(list(u._reinterpret_cast(self.USER_SEGMENT_PACKED_T) for u in user)))
hwIO.user.write(userPacked)
else:
for dIO, d in zip(hwIO.data, data):
dIO.write(d)
for uIO, u in zip(hwIO.user, user):
if USE_ENABLE:
uIO.enable.write(u.enable)
if USE_SOF:
uIO.sof.write(u.sof)
uIO.eof.write(u.eof)
if ERROR_WIDTH:
uIO.err.write(u.err)
if USE_EMPTY:
uIO.empty.write(u.empty)
_Axi4StreamSegmentedWordOfSegment = tuple[HBitsConst, HConst]
[docs]
class _Axi4StreamSegmentedWord():
"""
:ivar segmentWords: word of data and control signals encoded in user signal for a single segment
"""
[docs]
def __init__(self, segmentWords:tuple[Deque[HBitsConst], Deque[HConst]], SEGMENT_CNT:int):
self.segmentWords = segmentWords
self.SEGMENT_CNT = SEGMENT_CNT
[docs]
def empty(self):
return not self.segmentWords[0]
[docs]
def popleft(self):
segmentIndex = self.SEGMENT_CNT - len(self.segmentWords[0])
return (segmentIndex, self.segmentWords[0].popleft(), self.segmentWords[1].popleft())
[docs]
@classmethod
def popSegmentWordFromAgentData(cls, ag_data:Deque[tuple[HBitsConst, HConst]], SEGMENT_CNT:int) -> Self:
cur = ag_data[0]
if not isinstance(cur, _Axi4StreamSegmentedWord):
segments = (deque(cur[0]), deque(cur[1]))
cur = ag_data[0] = _Axi4StreamSegmentedWord(segments, SEGMENT_CNT)
segmentData = cur.popleft()
if cur.empty():
ag_data.popleft()
return segmentData
if __name__ == "__main__":
ref = [
# (40e9 , 128, 1, 312.5e6),
# (50e9 , 256, 2, 195.3125e6),
# (100e9, 384, 3, 260.5e6),
# (100e9, 512, 1, 390.625e6),
# (400e9, 1024, 8, 390.625e6),
# (600e9, 1536, 12, 390.625e6),
# (600e9, 768, 4, 1000e6),
(100e9, 112, 4, 1000e6),
]
PACKET_WIDTH = (64 + 1) * 8
for bitrate, dataWidth, refSegmentCnt, freq in ref:
segmentCnt = Axi4StreamSegmented.getMinNumberOfSegments(bitrate, dataWidth, freq, PACKET_WIDTH)
print(bitrate / 1e9, segmentCnt, segmentCnt == refSegmentCnt)