Source code for hwtLib.mem.ramCumulativeMask

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from math import ceil

from hwt.code import Concat, Or, If
from hwt.code_utils import rename_signal
from hwt.hdl.constants import WRITE, READ
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import BramPort_withoutClk, HandshakeSync, VectSignal, \
    Signal
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.pyUtils.arrayQuery import iter_with_last
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.vectorUtils import iterBits
from hwtLib.mem.ram import RamSingleClock
from ipCorePackager.constants import DIRECTION
from pyMathBitPrecise.bit_utils import mask


class BramPort_withReadMask_withoutClk(BramPort_withoutClk):
    """
    Block RAM port with a :py:obj:`~en` handshaked interface for arbitration

    :ivar do_accumulate: Signal, if 1 the mask bits are or-ed together
        with the value stored in ram
    :ivar do_overwrite: Signal, if 1 the data mask in ram is set
        to the current we value
    :note: :py:obj:`~.do_overwrite` has higher priority
        than :py:obj:`~.do_accumulate`
    :note: :py:obj:`~.do_accumulate` affects only the bytes in memory
        where the data validity mask is stored.
    :ivar dout_mask: Read port contains this signal which contains
        the cumulative validity mask for the data.
    :note: en is related to an address and write data, the read data may be
        available in the next clock cycle depending on read latency of the RAM

    .. hwt-autodoc::
    """

    def _declr(self):
        """
        Declare the signals of the port according to HAS_R, HAS_W and HAS_BE.

        The declaration order is kept as it was, because it is the order
        in which the attributes are assigned on the interface.
        """
        assert self.HAS_R or self.HAS_W, "has to have at least read or write part"

        self.addr = VectSignal(self.ADDR_WIDTH)
        DATA_WIDTH = self.DATA_WIDTH
        if self.HAS_W:
            self.din = VectSignal(DATA_WIDTH)

        if self.HAS_R:
            self.dout = VectSignal(DATA_WIDTH, masterDir=DIRECTION.IN)

        # handshake used instead of a plain enable signal
        self.en = HandshakeSync()
        if (self.HAS_R and self.HAS_W) or (self.HAS_W and self.HAS_BE):
            # in write only mode (without byte enable) "we" is not needed
            # as "en" can be used instead
            if self.HAS_BE:
                # one "we" bit per byte of data
                assert DATA_WIDTH % 8 == 0, DATA_WIDTH
                self.we = VectSignal(DATA_WIDTH // 8)
            else:
                self.we = Signal()

        if self.HAS_W and self.HAS_BE:
            # control of how the stored mask is updated, see class doc
            self.do_accumulate = Signal()
            self.do_overwrite = Signal()

        if self.HAS_R and self.HAS_BE:
            # one mask bit per byte of read data
            assert self.DATA_WIDTH % 8 == 0
            self.dout_mask = VectSignal(self.DATA_WIDTH // 8, masterDir=DIRECTION.IN)
def is_mask_byte_unaligned(mask_signal: RtlSignal) -> RtlSignal:
    """
    Build an expression which is 1 if any 8b chunk of the mask is set only
    partially (the chunk is neither 0 nor ``mask(chunk width)``).

    :param mask_signal: mask to check, it is split to chunks of 8 bits;
        if its width is not a multiple of 8 the last chunk is cut
        to the remaining bits before the comparison
    :return: ``Or`` of the "partially set" flags of all chunks
    """
    # number of bits in the last chunk, 0 if the width is a multiple of 8
    mask_rem_w = mask_signal._dtype.bit_length() % 8
    we_bytes = list(iterBits(mask_signal, bitsInOne=8, fillup=True))
    write_mask_not_aligned = []
    for last, b in iter_with_last(we_bytes):
        if last and mask_rem_w:
            # cut off padding if required
            b = b[mask_rem_w:]
        write_mask_not_aligned.append(
            (b != 0) & (b != mask(b._dtype.bit_length())))

    return Or(*write_mask_not_aligned)
class RamCumulativeMask(RamSingleClock):
    """
    RAM which stores also byte enable value for each data word
    (to keep track of which bytes were updated).

    :note: :class:`~.BramPort_withReadMask_withoutClk` contains the informations
        about how to control this component.
    """
    PORT_CLS = BramPort_withReadMask_withoutClk

    def _config(self):
        """
        Same configuration as the parent, with byte enable forced on
        and one WRITE and one READ port by default.
        """
        super()._config()
        self.HAS_BE = True
        self.PORT_CNT = (WRITE, READ)

    def _declr(self):
        """
        Declare the ports and the inner :class:`RamSingleClock` which is wide
        enough to hold the data together with the (padded) byte mask.
        """
        assert self.HAS_BE
        assert len(self.PORT_CNT) >= 2, "Requres at least 1 WRITE and 1 READ port"
        # the first port has to be WRITE, all others READ
        assert self.PORT_CNT == (
            WRITE,
            *(READ for _ in range(len(self.PORT_CNT) - 1))),\
            self.PORT_CNT

        addClkRstn(self)
        RamSingleClock._declr_ports(self)
        # one mask bit per data byte
        self.MASK_W = self.DATA_WIDTH // 8
        # padding to make mask width % 8 == 0
        self.MASK_PADDING_W = ceil(self.MASK_W / 8) * 8 - self.MASK_W
        with self._paramsShared(exclude=({"DATA_WIDTH"}, {})):
            ram = self.ram = RamSingleClock()
            # data word extended by the padded mask
            ram.DATA_WIDTH = self.DATA_WIDTH + self.MASK_PADDING_W + self.MASK_W

    def _impl(self):
        """
        Connect the ports to the inner ram.

        The word written to the inner ram is ``Concat(w.din, w_mask)``.
        If do_accumulate is set without do_overwrite, the write is delayed
        by the "preload" register and the first read port of the inner ram
        is used to read the currently stored mask, which is then or-ed
        with the new one.
        """
        w = self.port[0]
        ram_w = self.ram.port[0]

        # for each 8b chunk of "we": 1 if any bit of the chunk is set
        we_bytes = list(iterBits(w.we, bitsInOne=8, fillup=True))
        # cut off padding
        we_for_we_bytes = []
        for last, b in iter_with_last(we_bytes):
            if last and self.MASK_PADDING_W:
                mask_rem_w = self.MASK_W % 8
                b = b[mask_rem_w:]
            we_for_we_bytes.append(b != 0)

        # write enable for the bytes of the word where the mask is stored,
        # always set if the mask is not accumulated (or if it is overwritten)
        we_for_we_bytes = rename_signal(
            self,
            Concat(*[
                b | ~w.do_accumulate | w.do_overwrite
                for b in reversed(we_for_we_bytes)
            ]),
            "we_for_we_bytes")

        # set after a valid accumulate request was seen with preload == 0,
        # the write request is acknowledged once it is 1
        preload = self._reg("preload", def_val=0)
        If(w.en.vld,
           preload(~preload & w.do_accumulate & ~w.do_overwrite)
        )
        w.en.rd(~w.do_accumulate | w.do_overwrite | preload)
        ram_w.addr(w.addr)
        ram_w.en(w.en.vld & (w.do_overwrite | ~w.do_accumulate | preload))
        ram_w.we(Concat(w.we, we_for_we_bytes))

        w_mask = w.we
        if self.MASK_PADDING_W:
            w_mask = Concat(Bits(self.MASK_PADDING_W).from_py(0), w_mask)

        for port_i, (ram_r, r) in enumerate(zip(self.ram.port[1:], self.port[1:])):
            if port_i == 0:
                # the first read port is shared with the preload of the mask
                # for the accumulation
                w_mask = preload._ternary(
                    w_mask | ram_r.dout[self.MASK_PADDING_W + self.MASK_W:],
                    w_mask)
                w_mask = rename_signal(self, w_mask, "w_mask")
                ram_w.din(Concat(w.din, w_mask))

                will_preload_for_accumulate = rename_signal(
                    self,
                    w.en.vld & w.do_accumulate & ~w.do_overwrite,
                    "will_preload_for_accumulate")
                ram_r.addr(will_preload_for_accumulate._ternary(w.addr, r.addr))
                ram_r.en(will_preload_for_accumulate | r.en.vld)
                # [TODO] check if r.en.rd is according to spec
                # NOTE(review): if will_preload_for_accumulate and preload are
                # both 1 then r.en.rd is 1 while ram_r.addr is w.addr - confirm
                # that this is intended
                r.en.rd(~will_preload_for_accumulate | preload)
            else:
                ram_r.addr(r.addr)
                ram_r.en(r.en.vld)
                r.en.rd(1)

            # split the word read from the inner ram back to data and mask
            r.dout(ram_r.dout[:self.MASK_PADDING_W + self.MASK_W])
            r.dout_mask(ram_r.dout[self.MASK_W:])

        propagateClkRstn(self)
if __name__ == "__main__":
    # print the RTL generated for the component in its default configuration
    from hwt.synthesizer.utils import to_rtl_str
    # from hwtLib.xilinx.constants import XILINX_VIVADO_MAX_DATA_WIDTH

    u = RamCumulativeMask()
    # example of a non-default configuration:
    # u.ADDR_WIDTH = 5
    # u.DATA_WIDTH = 512
    # u.MAX_BLOCK_DATA_WIDTH = XILINX_VIVADO_MAX_DATA_WIDTH
    print(to_rtl_str(u))