Source code for hwtLib.amba.axi_comp.oooOp.reorder_buffer

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Tuple

from hwt.code import If
from hwt.hdl.constants import READ, WRITE
from hwt.hdl.types.bits import Bits
from hwt.interfaces.agents.handshaked import HandshakedAgent
from hwt.interfaces.hsStructIntf import HsStructIntf
from hwt.interfaces.std import VectSignal
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.rtlLevel.rtlSyncSignal import RtlSyncSignal
from hwt.synthesizer.unit import Unit
from hwtLib.handshaked.builder import HsBuilder
from hwtLib.handshaked.ramAsHs import RamAsHs
from hwtLib.handshaked.streamNode import StreamNode
from hwtLib.logic.binToOneHot import binToOneHot
from hwtLib.mem.ram import RamSingleClock
from hwtLib.types.ctypes import uint16_t
from hwtSimApi.hdlSimulator import HdlSimulator
from pyMathBitPrecise.bit_utils import apply_set_and_clear


class HsStructWithIdIntf(HsStructIntf):
    """
    :class:`HsStructIntf` extended with an ``id`` signal.

    :ivar ID_WIDTH: width of the ``id`` signal (6 by default)
    :ivar id: vector signal of width ``ID_WIDTH``, declared before the
        members declared by :class:`HsStructIntf`
    """

    def _config(self) -> None:
        # The parent implementation is named explicitly on purpose:
        # ReorderBuffer._config() calls this function with a ReorderBuffer
        # instance, for which a zero-argument super() would raise TypeError.
        HsStructIntf._config(self)
        self.ID_WIDTH = Param(6)

    def _declr(self) -> None:
        self.id = VectSignal(self.ID_WIDTH)
        HsStructIntf._declr(self)

    def _initSimAgent(self, sim: HdlSimulator) -> None:
        """Create the simulation agent for this interface and store it in ``self._ag``."""
        agent = HsStructWithIdIntfAgent(sim, self)
        self._ag = agent
class HsStructWithIdIntfAgent(HandshakedAgent):
    """
    Simulation agent which exchanges the payload of the interface
    as ``(id, data)`` tuples.
    """

    def get_data(self):
        """
        Read the ``id`` and ``data`` signals of the interface.

        :return: tuple ``(id, data)`` of the values read
        """
        intf = self.intf
        item_id = intf.id.read()
        payload = intf.data.read()
        return (item_id, payload)

    def set_data(self, data):
        """
        Write an ``(id, data)`` tuple to the ``id`` and ``data`` signals.

        :param data: tuple ``(id, data)``, or None in which case None is
            written to both signals
        """
        intf = self.intf
        item_id, payload = (None, None) if data is None else data
        intf.id.write(item_id)
        intf.data.write(payload)
class ReorderBuffer(Unit):
    """
    Serialize an unordered input sequence to continuous output sequence.

    Incoming items are written to a RAM at the address given by their id.
    The output side reads the RAM at the address of a counter which starts
    at 0 and is incremented by 1 after every accepted read request, so the
    items leave in the order of their ids. A bit vector with one bit per id
    tracks which RAM cells currently hold an item.

    :ivar dataIn: an input interface with an id signal which is used to reconstruct the sequence
    :ivar dataOut: an output interface with a data from dataIn which is ordered according to its id

    .. hwt-autodoc::
    """

    def _config(self):
        # Reuse the parameter declaration (T, ID_WIDTH) of the input interface.
        HsStructWithIdIntf._config(self)
        self.T = uint16_t

    def _declr(self):
        assert self.T is not None
        addClkRstn(self)
        with self._paramsShared():
            self.dataIn = HsStructWithIdIntf()
            self.dataOut = HsStructIntf()._m()

        # write-only handshaked adapter for the storage RAM
        self.storage_w = RamAsHs()
        self.storage_w.HAS_W = True
        self.storage_w.HAS_R = False
        self.storage_w.ADDR_WIDTH = self.ID_WIDTH
        self.storage_w.DATA_WIDTH = self.T.bit_length()

        # read-only handshaked adapter, otherwise configured as the write one
        self.storage_r = RamAsHs()
        self.storage_r._updateParamsFrom(self.storage_w)
        self.storage_r.HAS_R = True
        self.storage_r.HAS_W = False

        # the RAM itself, with one write port and one read port
        self.storage_ram = RamSingleClock()
        self.storage_ram._updateParamsFrom(self.storage_w)
        self.storage_ram.PORT_CNT = (WRITE, READ)

    def item_occupancy_reg(self) -> Tuple[RtlSignal, RtlSyncSignal, RtlSignal]:
        """
        Create the occupancy register (one bit per id, reset value 0)
        together with its set and clear masks.

        The register is updated with
        ``apply_set_and_clear(item_occ, item_occ_set, item_occ_clear)``.

        :return: tuple (item_occ_set, item_occ, item_occ_clear)
        """
        occ_t = Bits(2 ** self.ID_WIDTH)
        item_occ_set = self._sig("item_occ_set", occ_t)
        item_occ_clear = self._sig("item_occ_clear", occ_t)
        item_occ = self._reg("item_occ", occ_t, def_val=0)
        item_occ(apply_set_and_clear(item_occ, item_occ_set, item_occ_clear))
        return item_occ_set, item_occ, item_occ_clear

    def _impl(self):
        # port 0 of the RAM is driven by the write adapter, port 1 by the read adapter
        self.storage_ram.port[0](self.storage_w.ram)
        self.storage_ram.port[1](self.storage_r.ram)

        dataIn_reg = HsBuilder(self, self.dataIn).buff(1).end
        item_occ_set, item_occ, item_occ_clear = self.item_occupancy_reg()

        # Input side
        # the extra condition requires the occupancy bit of the incoming id
        # to be 0, so a cell which still holds an item is not overwritten
        sn_in = StreamNode(
            [dataIn_reg],
            [self.storage_w.w],
            extraConds={dataIn_reg: ~(item_occ[dataIn_reg.id])}
        )
        sn_in.sync()
        self.storage_w.w.addr(dataIn_reg.id)
        self.storage_w.w.data(dataIn_reg.data._reinterpret_cast(self.storage_w.w.data._dtype))

        # mark the written cell as occupied
        If(sn_in.ack(),
            item_occ_set(binToOneHot(dataIn_reg.id))
        ).Else(
            item_occ_set(0)
        )

        # Output
        # the read address counter; the register was formerly created under
        # the name "item_occ" which collided with the occupancy register
        out_cntr = self._reg("out_cntr", Bits(self.ID_WIDTH), def_val=0)
        # a read request is issued only if the cell addressed by the counter
        # is marked as occupied
        sn_addr = StreamNode(
            [],
            [self.storage_r.r.addr],
            extraConds={self.storage_r.r.addr: item_occ[out_cntr]}
        )
        sn_addr.sync()
        self.storage_r.r.addr.data(out_cntr)

        # release the cell and move to the next id once the request is accepted
        If(sn_addr.ack(),
            item_occ_clear(binToOneHot(out_cntr)),
            out_cntr(out_cntr + 1)
        ).Else(
            item_occ_clear(0)
        )

        # the data member needs a cast, the rest of the interface is connected directly
        self.dataOut(self.storage_r.r.data, exclude=[self.dataOut.data])
        self.dataOut.data(self.storage_r.r.data.data._reinterpret_cast(self.dataOut.data._dtype))

        propagateClkRstn(self)
if __name__ == "__main__":
    from hwt.synthesizer.utils import to_rtl_str

    # Build an example instance (uint16_t items, ID_WIDTH of 4)
    # and print its RTL representation.
    reorder_buffer = ReorderBuffer()
    reorder_buffer.T = uint16_t
    reorder_buffer.ID_WIDTH = 4
    rtl = to_rtl_str(reorder_buffer)
    print(rtl)