from typing import List, Union, Optional
from hwt.code import If, And, Or
from hwt.hdl.types.defs import BIT
from hwt.interfaces.std import Handshaked, VldSynced
from hwt.pyUtils.arrayQuery import where
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
class InNodeInfo():
    """
    Container for an input interface which takes part in the handshake logic.

    Interface has to be ready and the handshake logic is constructed
    by :meth:`~.sync`.

    :ivar ~.inInterface: input interface whose ``rd`` is driven by
        :meth:`~.sync` and whose ``vld`` is returned by :meth:`~.ack`
    :ivar ~.en: enable signal which is and-ed into the ``rd`` condition
    """

    def __init__(self, inInterface: Handshaked, en: RtlSignal):
        self.en = en
        self.inInterface = inInterface

    def sync(self, others: List['OutNodeInfo'], en: RtlSignal, in_vld: RtlSignal):
        """
        Drive ``rd`` of the input interface.

        :param others: nodes whose ack is required (this node itself is
            skipped by :func:`getAckOfOthers`)
        :param en: external enable signal
        :param in_vld: validity signal of the input data
        """
        others_ack = getAckOfOthers(self, others)
        # rd = external enable & enable of this node & ack of the rest & in_vld
        rd_condition = en & self.en & others_ack & in_vld
        self.inInterface.rd(rd_condition)

    def ack(self) -> RtlSignal:
        """
        :return: ``vld`` signal of the input interface
        """
        return self.inInterface.vld
class InNodeReadOnlyInfo(InNodeInfo):
    """
    Interface has to be ready but handshake logic is not constructed
    """

    def sync(self, others: List['OutNodeInfo'], en: RtlSignal, in_vld: RtlSignal):
        """
        Construct nothing; unlike the parent class this node does not
        drive ``rd`` of its input interface.
        """
        return None
class OutNodeInfo():
    """
    Container for informations about output for handshake logic

    :ivar ~.outInterface: output parsed interface
    :ivar ~.en: enable signal of word
    :ivar ~.exclusiveEn: enable signal of union member, may be None
    """

    def __init__(self, parent: Unit,
                 outInterface: Union[Handshaked, VldSynced],
                 en: RtlSignal,
                 exclusiveEn: Optional[RtlSignal]):
        """
        :param parent: unit in which the helper signals and registers
            of this node are created
        :param outInterface: output interface (its ``rd`` and ``vld``
            are used)
        :param en: enable signal of word
        :param exclusiveEn: optional enable signal of union member
        """
        self.parent = parent
        self.outInterface = outInterface
        self.en = en
        self.exclusiveEn = exclusiveEn
        self._ready = self.outInterface.rd
        self._ack = parent._sig(outInterface._name + "_ack")
        self._valid = self.outInterface.vld

    def ack(self) -> RtlSignal:
        """
        :return: ack signal of this output (driven in :meth:`~._sync`)
        """
        # NOTE: if the output is not selected in a union we do not need to
        # care about its ready; a possible extension would be
        # ``ack | ~self.exclusiveEn`` when exclusiveEn is not None
        return self._ack

    def _sync(self, others: List['OutNodeInfo'], en: RtlSignal, din_vld: RtlSignal):
        """
        Construct the ack logic of this output.

        :param others: all nodes which have to be synchronized together
            (has to contain this node)
        :param en: external enable signal
        :param din_vld: validity signal of the input data
        :return: output validity signal which is checked
            if data from this word was previously consumed
        """
        # exclusiveEn is optional, it must not be used in an expression
        # before it is checked for None
        en_extra = self.en
        if self.exclusiveEn is not None:
            en_extra = en_extra & self.exclusiveEn
        en = en & en_extra

        if len(others) > 1:
            # used as a flag for the case where data for this part was
            # consumed but some other part was still not
            was_consumed = self.parent._reg(
                self.outInterface._name + "_was_consumed_", def_val=0)
            self._ack(self._ready | was_consumed)
            ackOfOthers = getAckOfOthers(self, others)
            If(en & din_vld,
               If(was_consumed & ackOfOthers,
                  # restart was_consumed because all others were consumed
                  was_consumed(0),
               ).Elif(~ackOfOthers & self._ready,
                  # set was consumed because the data is passed to output
                  # but some other part can not do the same
                  was_consumed(1)
               )
            )
            # value of this node was not consumed and it is enabled
            return en_extra & ~was_consumed
        else:
            # this node is the only one, there is nobody to wait for
            assert others[0] is self
            self._ack(self._ready)
            return en_extra

    def sync(self, others: List['OutNodeInfo'], en: RtlSignal, din_vld: RtlSignal):
        """
        Construct the ack logic and drive ``vld`` of the output interface.

        :param others: all nodes which have to be synchronized together
        :param en: external enable signal
        :param din_vld: validity signal of the input data
        """
        vld = self._sync(others, en, din_vld)
        self._valid(en & vld & din_vld)

    def __repr__(self):
        return f"<{self.__class__.__name__:s} for {self.outInterface._name:s}>"
class OutStreamNodeGroup():
    """
    State shared by :class:`OutStreamNodeInfo` members of one stream.

    :ivar ~.word_index: optional signal with the index of the current word
    :ivar ~.members: nodes of this group (each :class:`OutStreamNodeInfo`
        appends itself in its constructor)
    :ivar ~.word_range_min: index of the first word, initialized
        to word_index_start
    :ivar ~.word_range_max: index of the last word, initialized
        to word_index_start
    :ivar ~.others_first: ``others`` list passed to sync of the first member
    :ivar ~.others_last: ``others`` list passed to sync of the last member
    :ivar ~.vld_first: validity flag for the first word (constant 0 until
        it is set by sync of the first member)
    :ivar ~.vld_last: validity flag for the last word (constant 0 until
        it is set by sync of the last member)
    """

    def __init__(self, word_index: Optional[RtlSignal], word_index_start: int):
        self.word_index = word_index
        # the word range is empty at the beginning
        self.word_range_min = self.word_range_max = word_index_start

        self.members = []
        self.others_first = []
        self.others_last = []

        self.vld_first = BIT.from_py(0)
        self.vld_last = BIT.from_py(0)
class OutStreamNodeInfo(OutNodeInfo):
    """
    :class:`OutNodeInfo` for an output stream interface
    (uses ``ready``/``valid`` of the interface) which is a member
    of an :class:`OutStreamNodeGroup`.

    :ivar ~.streamGroup: group of nodes to which this node belongs
    """

    def __init__(self, parent: Unit,
                 outInterface: AxiStream,
                 en: RtlSignal,
                 exclusiveEn: Optional[RtlSignal],
                 streamGroup: OutStreamNodeGroup
                 ):
        # the parent constructor is not called because the handshake
        # signals of this interface are named ready/valid
        self.parent = parent
        self.outInterface = outInterface
        self.en = en
        self.exclusiveEn = exclusiveEn
        self.streamGroup = streamGroup

        self._ready = outInterface.ready
        self._ack = parent._sig(outInterface._name + "_ack")
        self._valid = outInterface.valid

        # register itself in the group
        streamGroup.members.append(self)

    def is_in_word_range(self, range_min: int, range_max: int):
        """
        :return: flag which tells if the word index of the group is in
            range <range_min, range_max>; False for an empty range and
            constant 1 if the group has no word index
        """
        if range_min > range_max:
            return False

        word_index = self.streamGroup.word_index
        if word_index is None:
            return BIT.from_py(1)

        if range_min == range_max:
            return word_index._eq(range_min)

        if range_min == 0:
            # the lower limit does not need to be checked
            return word_index <= (range_max)

        above_min = word_index > (range_min - 1)
        return above_min & (word_index <= (range_max))

    def sync(self, others: List['OutNodeInfo'], en: RtlSignal, din_vld: RtlSignal):
        """
        Construct the handshake logic for this member of the stream group.

        :param others: all nodes which have to be synchronized together
        :param en: external enable signal
        :param din_vld: validity signal of the input data
        """
        group = self.streamGroup
        word_index = group.word_index
        first_word = group.word_range_min
        last_word = group.word_range_max
        is_first = group.members[0] is self
        is_last = group.members[-1] is self

        if is_first:
            group.others_first = others
            if not is_last:
                first_en = en & word_index._eq(first_word)
                group.vld_first = OutNodeInfo._sync(
                    self, others, first_en, din_vld)
            # else it is processed below as the last member

        if is_last:
            group.others_last = others
            if first_word == last_word:
                # the whole stream is in a single word
                word_en = self.is_in_word_range(first_word, last_word)
                return OutNodeInfo.sync(self, others, word_en, din_vld)

            # last word != first word
            if len(others) > 1:
                last_en = en & word_index._eq(last_word)
                group.vld_last = OutNodeInfo._sync(
                    self, others, last_en, din_vld)
            else:
                self._ack(self._ready)

            in_body = self.is_in_word_range(first_word + 1, last_word - 1)
            any_part_vld = group.vld_first | group.vld_last | in_body
            self._valid(en & din_vld & any_part_vld)

        is_body_member = not (is_first or is_last)
        if is_body_member:
            # stream member for body of the frame
            self._ack(self._ready)
def getAckOfOthers(self: OutNodeInfo, others: List[OutNodeInfo]):
    """
    :param self: node which is excluded from the result
    :param others: nodes to collect the ack from
    :return: ``And`` of ack signals of all nodes in others, the node
        ``self`` contributes constant 1 instead of its own ack
    """
    acks = []
    for node in others:
        if node is self:
            acks.append(BIT.from_py(1))
        else:
            acks.append(node.ack())
    return And(*acks)
class ListOfOutNodeInfos(list):
    """
    List of nodes which are acknowledged and synchronized together.
    """

    def ack(self) -> RtlSignal:
        """
        :return: ``And`` of ack signals of all members,
            constant 1 if the list is empty
        """
        if not self:
            return BIT.from_py(1)
        return And(*(member.ack() for member in self))

    def sync(self, allNodes: List[OutNodeInfo], en: RtlSignal, din_vld: RtlSignal) -> None:
        """
        Call sync of each member with the nodes from allNodes
        (without this list itself) extended by the members of this list.

        :param allNodes: all nodes which have to be synchronized together
        :param en: external enable signal
        :param din_vld: validity signal of the input data
        """
        def is_not_this_list(x):
            return x is not self

        foreign_nodes = list(where(allNodes, is_not_this_list))
        for member in self:
            # each member gets its own list object
            member.sync(foreign_nodes + self, en, din_vld)
class ExclusieveListOfHsNodes(list):
    """
    List of tuples (selector value, node) where the node is picked
    by the value on the selector interface.

    :ivar ~.selectorIntf: selector for this node
    """

    def __init__(self, selectorIntf):
        self.selectorIntf = selectorIntf

    def append(self, selectorVal, item):
        """
        Store the item together with the selector value which enables it.
        """
        entry = (selectorVal, item)
        return list.append(self, entry)

    def ack(self) -> RtlSignal:
        """
        :return: ``Or`` of acks of the members, each qualified by the
            comparison of selector data with the value of the member
            (constant 1 for an empty list), and-ed with ``vld``
            of the selector
        """
        selector = self.selectorIntf
        result = BIT.from_py(1)
        if self:
            member_acks = []
            for selectorVal, item in self:
                member_acks.append(
                    item.ack() & selector.data._eq(selectorVal))
            result = Or(*member_acks)
        return result & selector.vld

    def sync(self, allNodes: List[OutNodeInfo], en: RtlSignal, din_vld: RtlSignal) -> None:
        """
        Call sync of each stored item with the nodes from allNodes
        without this list itself.

        :param allNodes: all nodes which have to be synchronized together
        :param en: external enable signal
        :param din_vld: validity signal of the input data
        """
        def is_not_this_list(x):
            return x is not self

        foreign_nodes = list(where(allNodes, is_not_this_list))
        for _selectorVal, item in self:
            item.sync(foreign_nodes, en, din_vld)