Source code for hwtLib.amba.axi_comp.oooOp.reorder_buffer
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Tuple
from hwt.code import If
from hwt.constants import READ, WRITE
from hwt.hdl.types.bits import HBits
from hwt.hwIOs.agents.rdVldSync import HwIODataRdVldAgent
from hwt.hwIOs.hwIOStruct import HwIOStructRdVld
from hwt.hwIOs.std import HwIOVectSignal
from hwt.hwIOs.utils import addClkRstn, propagateClkRstn
from hwt.hwModule import HwModule
from hwt.hwParam import HwParam
from hwt.pyUtils.typingFuture import override
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.handshaked.builder import HsBuilder
from hwtLib.handshaked.ramAsAddrDataRdVld import RamAsAddrDataRdVld
from hwtLib.handshaked.streamNode import StreamNode
from hwtLib.logic.binToOneHot import binToOneHot
from hwtLib.mem.ram import RamSingleClock
from hwtLib.types.ctypes import uint16_t
from hwtSimApi.hdlSimulator import HdlSimulator
from pyMathBitPrecise.bit_utils import apply_set_and_clear
[docs]
class HwIOStructRdVldWithId(HwIOStructRdVld):
@override
def hwConfig(self):
HwIOStructRdVld.hwConfig(self)
self.ID_WIDTH = HwParam(6)
@override
def hwDeclr(self):
self.id = HwIOVectSignal(self.ID_WIDTH)
HwIOStructRdVld.hwDeclr(self)
[docs]
@override
def _initSimAgent(self, sim: HdlSimulator):
self._ag = HwIOStructRdVldWithIdAgent(sim, self)
[docs]
class HwIOStructRdVldWithIdAgent(HwIODataRdVldAgent):
[docs]
@override
def get_data(self):
i = self.hwIO
return (i.id.read(), i.data.read())
[docs]
@override
def set_data(self, data):
i = self.hwIO
if data is None:
t, d = (None, None)
else:
t, d = data
i.id.write(t)
i.data.write(d)
[docs]
class ReorderBuffer(HwModule):
"""
Serialize an unordered input sequence to continuous output sequence.
:ivar dataIn: an input interface with an id signal which is used to reconstruct the sequence
:ivar dataOut: an output interface with a data from dataIn which is ordered according to its id
.. hwt-autodoc::
"""
@override
def hwConfig(self):
HwIOStructRdVldWithId.hwConfig(self)
self.T = uint16_t
@override
def hwDeclr(self):
assert self.T is not None
addClkRstn(self)
with self._hwParamsShared():
self.dataIn = HwIOStructRdVldWithId()
self.dataOut = HwIOStructRdVld()._m()
self.storage_w = RamAsAddrDataRdVld()
self.storage_w.HAS_W = True
self.storage_w.HAS_R = False
self.storage_w.ADDR_WIDTH = self.ID_WIDTH;
self.storage_w.DATA_WIDTH = self.T.bit_length()
self.storage_r = RamAsAddrDataRdVld()
self.storage_r._updateHwParamsFrom(self.storage_w)
self.storage_r.HAS_R = True
self.storage_r.HAS_W = False
self.storage_ram = RamSingleClock()
self.storage_ram._updateHwParamsFrom(self.storage_w)
self.storage_ram.PORT_CNT = (WRITE, READ)
[docs]
def item_occupancy_reg(self) -> Tuple[RtlSignal, RtlSignal, RtlSignal]:
item_occ_set = self._sig("item_occ_set", HBits(2 ** self.ID_WIDTH))
item_occ_clear = self._sig("item_occ_clear", HBits(2 ** self.ID_WIDTH))
item_occ = self._reg("item_occ", HBits(2 ** self.ID_WIDTH), def_val=0)
item_occ(apply_set_and_clear(item_occ, item_occ_set, item_occ_clear))
return item_occ_set, item_occ, item_occ_clear
@override
def hwImpl(self):
self.storage_ram.port[0](self.storage_w.ram)
self.storage_ram.port[1](self.storage_r.ram)
dataIn_reg = HsBuilder(self, self.dataIn).buff(1).end
item_occ_set, item_occ, item_occ_clear = self.item_occupancy_reg()
# Input side
sn_in = StreamNode(
[dataIn_reg],
[self.storage_w.w],
extraConds={dataIn_reg:~(item_occ[dataIn_reg.id])}
)
sn_in.sync()
self.storage_w.w.addr(dataIn_reg.id)
self.storage_w.w.data(dataIn_reg.data._reinterpret_cast(self.storage_w.w.data._dtype))
If(sn_in.ack(),
item_occ_set(binToOneHot(dataIn_reg.id))
).Else(
item_occ_set(0)
)
# Output
out_cntr = self._reg("item_occ", HBits(self.ID_WIDTH), def_val=0)
sn_addr = StreamNode(
[],
[self.storage_r.r.addr],
extraConds={self.storage_r.r.addr: item_occ[out_cntr]}
)
sn_addr.sync()
self.storage_r.r.addr.data(out_cntr)
If(sn_addr.ack(),
item_occ_clear(binToOneHot(out_cntr)),
out_cntr(out_cntr + 1)
).Else(
item_occ_clear(0)
)
self.dataOut(self.storage_r.r.data, exclude=[self.dataOut.data])
self.dataOut.data(self.storage_r.r.data.data._reinterpret_cast(self.dataOut.data._dtype))
propagateClkRstn(self)
if __name__ == "__main__":
from hwt.synth import to_rtl_str
m = ReorderBuffer()
m.T = uint16_t
m.ID_WIDTH = 4
print(to_rtl_str(m))