from typing import Union, Optional, Tuple, List
from hwt.code import If, Switch, Concat
from hwt.hdl.frameTmplUtils import ChoicesOfFrameParts
from hwt.hdl.transPart import TransPart
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.stream import HStream
from hwt.hdl.types.struct import HStructField
from hwt.interfaces.structIntf import StructIntf
from hwt.interfaces.unionIntf import UnionSource, UnionSink
from hwt.synthesizer.byteOrder import reverseByteOrder
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
from hwtLib.amba.axis_comp.frame_parser.out_containers import ListOfOutNodeInfos, \
ExclusieveListOfHsNodes, InNodeInfo, InNodeReadOnlyInfo, OutStreamNodeGroup, \
OutStreamNodeInfo, OutNodeInfo
from hwtLib.handshaked.builder import HsBuilder
from pyMathBitPrecise.bit_utils import mask
[docs]def get_byte_order_modifier(axis: AxiStream):
if axis.IS_BIGENDIAN:
return reverseByteOrder
else:
def byteOrderCare(sig):
return sig
return byteOrderCare
[docs]class AxiS_frameParserFieldConnector():
[docs] def __init__(self, parent: Unit, dataIn: AxiStream, dataOut: Union[StructIntf, UnionSource]):
self.parent = parent
self.dataIn = dataIn
self.dataOut = dataOut
self.byteOrderCare = get_byte_order_modifier(dataIn)
self._tmpRegsForSelect = {}
# TransTmpl: List[RtlSignal]
self._signalsOfParts = {}
# AxiStream: OutStreamNodeInfo
self._streamNodes = {}
[docs] def getInDataSignal(self, transPart: TransPart):
busDataSignal = self.dataIn.data
high, low = transPart.getBusWordBitRange()
return busDataSignal[high:low]
[docs] def choiceIsSelected(self,
interfaceOfChoice: Union[UnionSource, UnionSink]):
"""
Check if union member is selected by _select interface
in union interface
"""
parent = interfaceOfChoice._parent
r = self._tmpRegsForSelect[parent]
i = parent._interfaces.index(interfaceOfChoice)
return i, r.data._eq(i), r.vld
[docs] def connectParts(self,
allOutNodes: ListOfOutNodeInfos,
words: Tuple[int, List[Union[TransPart, ChoicesOfFrameParts]], bool],
wordIndex: Optional[RtlSignal]):
"""
Create main datamux from dataIn to dataOut
"""
for currentWordIndex, transParts, _ in words:
# each word index is used and there may be TransParts which are
# representation of padding
outNondes = ListOfOutNodeInfos()
for part in transParts:
self.connectPart(outNondes, part, BIT.from_py(1), BIT.from_py(1),
wordIndex, currentWordIndex)
allOutNodes.addWord(currentWordIndex, outNondes)
[docs] def connectChoicesOfFrameParts(
self,
hsNondes: ListOfOutNodeInfos,
part: ChoicesOfFrameParts,
en: Union[RtlSignal, bool],
exclusiveEn: Optional[RtlSignal],
wordIndex: Optional[RtlSignal],
currentWordIndex: int):
tToIntf = self.dataOut._fieldsToInterfaces
parentIntf = tToIntf[part.origin.parent.getFieldPath()]
try:
sel = self._tmpRegsForSelect[parentIntf]
except KeyError:
sel = HsBuilder(self.parent, parentIntf._select).buff().end
self._tmpRegsForSelect[parentIntf] = sel
unionGroup = ExclusieveListOfHsNodes(sel)
# for unions
for choice in part:
# connect data signals of choices and collect info about
# streams
intfOfChoice = tToIntf[choice.tmpl.getFieldPath()]
selIndex, isSelected, isSelectValid = self.choiceIsSelected(
intfOfChoice)
_exclusiveEn = isSelectValid & isSelected & exclusiveEn
unionMemberPart = ListOfOutNodeInfos()
for p in choice:
self.connectPart(unionMemberPart, p, en, _exclusiveEn,
wordIndex, currentWordIndex)
unionGroup.append(selIndex, unionMemberPart)
hsNondes.append(unionGroup)
if wordIndex is not None:
en = en & wordIndex._eq(currentWordIndex)
if part.isLastPart():
# synchronization of reading from _select register for unions
selNode = InNodeInfo(sel, en)
else:
selNode = InNodeReadOnlyInfo(sel, en)
hsNondes.append(selNode)
[docs] def connectStreamOfFrameParts(
self,
hsNondes: ListOfOutNodeInfos,
part: Union[TransPart, ChoicesOfFrameParts],
en: Union[RtlSignal, bool],
exclusiveEn: Optional[RtlSignal],
wordIndex: Optional[RtlSignal],
currentWordIndex: int):
orig = part.tmpl.origin[-1]
# use tmpl.parent because part is actually a chunk of data
# in the stream
path_to_stream_port = part.tmpl.parent.getFieldPath()
dout = self.dataOut._fieldsToInterfaces[path_to_stream_port]
if isinstance(orig, HStructField):
orig = orig.dtype
assert isinstance(orig, HStream), orig
# if not part.isLastPart():
# raise NotImplementedError()
if not len(orig.start_offsets) == 1:
raise NotImplementedError()
din = self.dataIn
is_first_part_in_stream = part.tmpl.parent.bitAddr == part.startOfPart
if is_first_part_in_stream:
frame_range = part.tmpl.parent.bitAddr, part.tmpl.parent.bitAddrEnd
DW = din.DATA_WIDTH
start_offset = frame_range[0] % DW
end_rem = frame_range[1] % DW
if orig.start_offsets != (0,):
raise NotImplementedError()
# this is a first part of stream
# now connect all data for each part
non_data_signals = [din.valid, din.ready, din.last]
if start_offset == 0 and end_rem == 0:
pass
else:
if dout.USE_STRB:
non_data_signals.append(din.strb)
if dout.USE_KEEP:
non_data_signals.append(din.keep)
assert start_offset % 8 == 0, start_offset
assert end_rem % 8 == 0, end_rem
first_word_i = frame_range[0] // DW
last_word_i = (frame_range[1] - 1) // DW
first_word_mask = mask((DW - start_offset) // 8) << start_offset // 8
body_word_mask = mask(DW // 8)
def set_mask(m):
res = []
if dout.USE_STRB:
res.append(dout.strb(m))
if dout.USE_KEEP:
res.append(dout.keep(m))
return res
if end_rem == 0:
last_word_mask = body_word_mask
else:
last_word_mask = mask(end_rem // 8)
if first_word_i == last_word_i:
# only single word, mask is constant
m = first_word_mask & last_word_mask
set_mask(m)
elif first_word_i == last_word_i + 1:
# only two words, mask is differnt between word 0 and 1
raise NotImplementedError()
else:
# 2+ words, first and body or last and body may be same
if first_word_mask == body_word_mask:
# last is only word with a special mask
If(wordIndex._eq(last_word_i),
*set_mask(last_word_mask)
).Else(
*set_mask(body_word_mask)
)
elif last_word_mask == body_word_mask:
# first is only word with special mask
If(wordIndex._eq(first_word_i),
*set_mask(first_word_mask)
).Else(
*set_mask(body_word_mask)
)
else:
# first, last, body word have all unique masks
Switch(wordIndex)\
.Case(first_word_i, set_mask(first_word_mask))\
.Case(last_word_i, set_mask(last_word_mask))\
.Default(set_mask(body_word_mask))
dout(din, exclude=non_data_signals)
is_last_part_in_stream = part.tmpl.parent.bitAddrEnd == part.endOfPart
if is_last_part_in_stream:
if wordIndex is None:
last = 1
else:
last = wordIndex._eq(currentWordIndex)
dout.last(last)
if is_first_part_in_stream or part.startOfPart % din.DATA_WIDTH == 0:
# first part in current word
streamGroup = self._streamNodes.setdefault(dout, OutStreamNodeGroup(wordIndex, currentWordIndex))
streamGroup.word_range_max = currentWordIndex
on = OutStreamNodeInfo(self.parent, dout, en, exclusiveEn, streamGroup)
hsNondes.append(on)
[docs] def connectPart(self,
hsNondes: ListOfOutNodeInfos,
part: Union[TransPart, ChoicesOfFrameParts],
en: Union[RtlSignal, bool],
exclusiveEn: Union[RtlSignal, bool],
wordIndex: Optional[RtlSignal],
currentWordIndex: int):
"""
Create datamux for a single output word in main fsm
and colect metainformations for handshake logic
and strb/keep
:param hsNondes: list of nodes of handshaked logic
"""
if isinstance(part, ChoicesOfFrameParts):
# connect union field
return self.connectChoicesOfFrameParts(
hsNondes, part, en, exclusiveEn, wordIndex, currentWordIndex)
# elif isinstance(part, StreamOfFrameParts):
# return self.connectStreamOfFrameParts(
# hsNondes, part, en, exclusiveEn, wordIndex)
elif part.isPadding:
return
fieldInfo = part.tmpl.origin[-1]
if isinstance(fieldInfo, HStream) or (
isinstance(fieldInfo, HStructField) and
isinstance(fieldInfo.dtype, HStream)):
return self.connectStreamOfFrameParts(
hsNondes, part, en, exclusiveEn, wordIndex, currentWordIndex)
# connect regular scalar field
fPartSig = self.getInDataSignal(part)
assert isinstance(fieldInfo, HStructField), fieldInfo
try:
signalsOfParts = self._signalsOfParts[part.tmpl]
except KeyError:
signalsOfParts = []
self._signalsOfParts[part.tmpl] = signalsOfParts
if wordIndex is not None:
en = en & wordIndex._eq(currentWordIndex)
if part.isLastPart():
# connect all parts in this group to output stream
signalsOfParts.append(fPartSig)
tToIntf = self.dataOut._fieldsToInterfaces
intf = tToIntf[part.tmpl.getFieldPath()]
intf.data(self.byteOrderCare(
Concat(
*reversed(signalsOfParts)
))
)
on = OutNodeInfo(self.parent, intf, en, exclusiveEn)
hsNondes.append(on)
else:
# part is not in same word as last part, we have to store it's value
# to register until the last part arrive
fPartReg = self.parent._reg(f"{fieldInfo.name:s}_part_{len(signalsOfParts):d}",
fPartSig._dtype)
dataVld = self.dataIn.valid & en & exclusiveEn
If(dataVld,
fPartReg(fPartSig)
)
signalsOfParts.append(fPartReg)