Source code for hwtLib.mem.fifoArray

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import If
from hwt.code_utils import rename_signal
from hwt.hdl.types.bits import HBits
from hwt.hwIOs.std import HwIOSignal, HwIOVectSignal, HwIORdVldSync
from hwt.hwIOs.utils import addClkRstn
from hwt.hwModule import HwModule
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwtLib.commonHwIO.addr_data_bidir import HwIOAddrInDataOutRdVld, HwIOAddrInDataOutRdVldAgent
from hwtLib.handshaked.hwIOBiDirectional import HwIORdVldSyncBiDirectionalData, \
    HwIORdVldSyncBiDirectionalDataAgent
from hwtLib.logic.binToOneHot import binToOneHot
from hwtLib.logic.oneHotToBin import oneHotToBin
from hwtSimApi.hdlSimulator import HdlSimulator
from ipCorePackager.constants import DIRECTION
from pyMathBitPrecise.bit_utils import mask


class FifoArrayInsertInterface(HwIORdVldSyncBiDirectionalData):
    """
    Insert port of the FIFO array.

    :ivar ~.append: when 1 the item is linked behind the list item selected by "addr",
        when 0 a new list is started and the value of "addr" is ignored
    :ivar ~.addr: address of the (potential) current end of the list
    :ivar ~.data: payload to store in the newly allocated list node
    :ivar ~.addr_ret: address of the node where the item was stored
        (travels in the opposite direction than the other signals)

    .. hwt-autodoc::
    """

    @override
    def hwConfig(self):
        # widths of the address and data signals, both default to 32b
        self.ADDR_WIDTH = HwParam(32)
        self.DATA_WIDTH = HwParam(32)

    @override
    def hwDeclr(self):
        addr_width = self.ADDR_WIDTH

        # request part (driven by master)
        self.addr = HwIOVectSignal(addr_width)
        self.append = HwIOSignal()
        self.data = HwIOVectSignal(self.DATA_WIDTH)

        # response part: an address where the item was stored
        self.addr_ret = HwIOVectSignal(addr_width, masterDir=DIRECTION.IN)

        # the handshake signals are declared directly by HwIORdVldSync,
        # the hwDeclr of the direct parent class is not called
        HwIORdVldSync.hwDeclr(self)

    @override
    def _initSimAgent(self, sim: HdlSimulator):
        # simulation agent which knows about the addr/append/data/addr_ret signals
        agent = FifoArrayInsertInterfaceAgent(sim, self)
        self._ag = agent
class FifoArrayInsertInterfaceAgent(HwIORdVldSyncBiDirectionalDataAgent):
    """
    Simulation agent for :class:`.FifoArrayInsertInterface` interface

    The forward data is a tuple (addr, append, data), the backward data
    (kept in ``dinData``) is the value of ``addr_ret``.
    """

    @override
    def onMonitorReady(self):
        # take the oldest queued return address and drive it to addr_ret
        ret_addr = self.dinData.popleft()
        self.hwIO.addr_ret.write(ret_addr)

    @override
    def onDriverWriteAck(self):
        # remember the return address sampled from addr_ret
        self.dinData.append(self.hwIO.addr_ret.read())

    @override
    def get_data(self):
        """
        :return: tuple (addr, append, data) read from the interface signals
        """
        hw_io = self.hwIO
        addr = hw_io.addr.read()
        append = hw_io.append.read()
        data = hw_io.data.read()
        return (addr, append, data)

    @override
    def set_data(self, data):
        """
        :param data: tuple (addr, append, data) or None, None results
            in None being written to every signal
        :return: tuple of the return values of the individual signal writes
        """
        addr, append, value = (None, None, None) if data is None else data
        hw_io = self.hwIO
        return (
            hw_io.addr.write(addr),
            hw_io.append.write(append),
            hw_io.data.write(value),
        )
class FifoArrayPopInterface(HwIOAddrInDataOutRdVld):
    """
    Pop port of the FIFO array.

    :ivar ~.addr: the address of the list head to read from
    :ivar ~.data: the data which was read
    :ivar ~.last: flag which tells that this node was the last one in its list
        and thus the list is now empty and deallocated
    :ivar ~.addr_next: address of the next item in this FIFO

    .. hwt-autodoc::
    """

    @override
    def hwDeclr(self):
        # signals of the parent interface first, then the list specific extras
        super().hwDeclr()
        self.last = HwIOSignal()
        self.addr_next = HwIOVectSignal(self.ADDR_WIDTH)

    @override
    def _initSimAgent(self, sim: HdlSimulator):
        # simulation agent which knows about the data/last/addr_next signals
        agent = FifoArrayPopInterfaceAgent(sim, self)
        self._ag = agent
class FifoArrayPopInterfaceAgent(HwIOAddrInDataOutRdVldAgent):
    """
    Simulation agent for :class:`.FifoArrayPopInterface` interface

    The data handled by get_data/set_data is a tuple (data, last, addr_next).
    """

    @override
    def set_data(self, data):
        """
        :param data: tuple (data, last, addr_next) or None, None results
            in None being written to every signal
        """
        value, is_last, next_addr = (None, None, None) if data is None else data
        hw_io = self.hwIO
        hw_io.data.write(value)
        hw_io.last.write(is_last)
        hw_io.addr_next.write(next_addr)

    @override
    def get_data(self):
        """
        :return: tuple (data, last, addr_next) read from the interface signals
        """
        hw_io = self.hwIO
        value = hw_io.data.read()
        is_last = hw_io.last.read()
        next_addr = hw_io.addr_next.read()
        return (value, is_last, next_addr)
class FifoArray(HwModule):
    """
    This component is an array of list nodes, which can be used to emulate multiple FIFOs.
    The memory is shared and the number of lists stored in this array is limited only by memory.

    Corresponds to data structure:

    .. code-block:: cpp

        // note that in implementation each part of struct item is stored in separate array
        struct item {
          value_t value;
          item * next;
          bool valid;
          bool last;
        };

        item items[ITEMS];

    :note: The address returned on ``insert.addr_ret`` is the address of the stored node,
        ``pop.addr`` selects the node to pop.
        The list can be read only from its head in FIFO order manner.
    :note: Whether a node is the last one of its list is tracked in the ``item_last`` bitmap:
        the bit is set for every newly inserted node and cleared for the parent node
        when an item is appended behind it.
    :note: The next pointer of the node selected by ``insert.addr`` is updated only if
        ``insert.append`` is set, for a new list the ``insert.addr`` is ignored.
    :note: DistRAM implementation.
    :note: The pop address is not checked it is possible to pop from wrong list if address is specified incorrectly

    .. hwt-autodoc::
    """

    @override
    def hwConfig(self):
        # number of list nodes in the shared storage
        self.ITEMS = HwParam(4)
        # width of the value stored in each node
        self.DATA_WIDTH = HwParam(8)

    @override
    def hwDeclr(self):
        assert self.ITEMS > 1, self.ITEMS
        # NOTE(review): the address width is derived from ITEMS - 1, confirm that this
        # is wide enough for every non-power-of-two ITEMS which is meant to be supported
        self.addr_t = HBits(log2ceil(self.ITEMS - 1), signed=False)
        self.value_t = HBits(self.DATA_WIDTH)
        addClkRstn(self)

        self.insert = FifoArrayInsertInterface()
        self.pop = FifoArrayPopInterface()._m()
        # both ports use the same address and data width
        for i in [self.insert, self.pop]:
            i.ADDR_WIDTH = self.addr_t.bit_length()
            i.DATA_WIDTH = self.value_t.bit_length()

    @override
    def hwImpl(self):
        addr_t = self.addr_t
        value_t = self.value_t
        item_mask_t = HBits(self.ITEMS)

        # bitmap used to quickly detect position of an empty node
        item_valid = self._reg("item_valid", item_mask_t, def_val=0)
        # bitmap of nodes which are currently the last node of some list
        item_last = self._reg("item_last", item_mask_t, def_val=0)
        # an address on where next item should be inserted
        insert_addr_next = self._reg("insert_addr_next", addr_t, def_val=0)
        # get index of first non valid (computed from the next value of item_valid)
        insert_addr_next(oneHotToBin(
            self,
            rename_signal(self, ~item_valid._rtlNextSig, "item_become_invalid")))

        pop = self.pop
        insert = self.insert
        insert.addr_ret(insert_addr_next)
        # insert is possible until every node is occupied
        insert.rd(item_valid != mask(self.ITEMS))

        # one-hot select of the node which is being popped in this clock cycle
        pop_one_hot = rename_signal(
            self, binToOneHot(pop.addr, en=pop.vld & pop.rd),
            "pop_one_hot")
        # one-hot select of the node which is being allocated in this clock cycle
        insert_one_hot = rename_signal(
            self, binToOneHot(insert_addr_next, en=insert.vld & insert.rd),
            "insert_one_hot")
        # one-hot select of the node behind which the new node is appended
        insert_parent_one_hot = rename_signal(
            self, binToOneHot(insert.addr, en=insert.vld & insert.rd & insert.append),
            "insert_parent_one_hot")

        # popped node becomes free, inserted node becomes occupied
        item_valid((item_valid & ~pop_one_hot) | insert_one_hot)
        # parent stops being last, the new node is always last in its list
        item_last((item_last & ~insert_parent_one_hot) | insert_one_hot)

        values = self._sig("values", value_t[self.ITEMS])
        next_ptrs = self._sig("next_ptrs", addr_t[self.ITEMS])

        If(self.clk._onRisingEdge(),
            If(insert.vld & insert.rd,
               # append behind parent node at insert.addr
               # only if this insert is an append; for a new list the insert.addr
               # is ignored and must not overwrite the next pointer of any node
               If(insert.append,
                  next_ptrs[insert.addr](insert_addr_next),
               ),
               values[insert_addr_next](insert.data),
            )
        )

        # combinational read of the node selected by pop.addr
        pop.data(values[pop.addr])
        pop.addr_next(next_ptrs[pop.addr])
        pop.last(item_last[pop.addr] & item_valid[pop.addr])
        # something can be popped if any node is occupied (the address itself is not checked)
        pop.vld(item_valid != 0)
if __name__ == "__main__":
    # print the RTL generated for the component in its default configuration
    from hwt.synth import to_rtl_str

    module = FifoArray()
    rtl = to_rtl_str(module)
    print(rtl)