Source code for hwtLib.amba.axis_comp.frame_join.input_reg

from hwt.code import If
from hwt.code_utils import rename_signal
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.struct import HStruct
from hwt.interfaces.std import VectSignal, Signal
from hwt.interfaces.utils import addClkRstn
from hwt.pyUtils.arrayQuery import iter_with_last
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.hObjList import HObjList
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
from pyMathBitPrecise.bit_utils import mask


class UnalignedJoinRegIntf(Interface):
    """
    Interface exposing the content of one register of
    :class:`FrameJoinInputReg`.

    Signals (declared in this order):

    * ``data``   - vector of ``DATA_WIDTH`` bits
    * ``keep``   - vector of ``DATA_WIDTH // 8`` bits
    * ``strb``   - vector of ``DATA_WIDTH // 8`` bits,
      present only if ``USE_STRB`` is set
    * ``relict`` - single bit flag
    * ``last``   - single bit flag

    .. hwt-autodoc::
    """

    def _config(self):
        # Reuse the parameter set of AxiStream; this class reads
        # DATA_WIDTH and USE_STRB from it in _declr().
        AxiStream._config(self)

    def _declr(self):
        # one mask bit per 8 bits of data
        mask_width = self.DATA_WIDTH // 8

        self.data = VectSignal(self.DATA_WIDTH)
        self.keep = VectSignal(mask_width)
        if self.USE_STRB:
            self.strb = VectSignal(mask_width)
        self.relict = Signal()
        self.last = Signal()
@serializeParamsUniq
class FrameJoinInputReg(Unit):
    """
    Pipeline of registers for AxiStream with keep mask and flushing

    :ivar dataIn: AxiStream input, it feeds the last register (``regs[-1]``)
    :ivar regs: list of ``REG_CNT`` :class:`UnalignedJoinRegIntf` interfaces,
        each one is driven from the internal register of the same index
        (``regs[0]`` is the first one on the output side)
    :ivar keep_masks: one mask of ``DATA_WIDTH // 8`` bits per register;
        the ``keep`` of a register is AND-ed with its mask when the word
        is consumed/shifted, the register is treated as fully consumed
        if ``keep & keep_mask == 0``
    :ivar ready: used to shift whole register pipeline using input keep_mask

    :note: id/user/dest signals are not supported, ``_declr`` raises
        :class:`NotImplementedError` if any of their widths is non-zero

    .. hwt-autodoc::
    """

    def _config(self):
        self.REG_CNT = Param(2)
        AxiStream._config(self)
        # keep is mandatory for this component (asserted in _declr)
        self.USE_KEEP = True

    def _declr(self):
        assert self.USE_KEEP
        addClkRstn(self)
        with self._paramsShared():
            self.dataIn = AxiStream()
            self.regs = HObjList(
                UnalignedJoinRegIntf()._m()
                for _ in range(self.REG_CNT))
        # NOTE(review): the original listing lost its indentation, it is
        # assumed that keep_masks is declared outside of _paramsShared()
        # (VectSignal is constructed only from a width here) - confirm
        self.keep_masks = HObjList(
            VectSignal(self.DATA_WIDTH // 8)
            for _ in range(self.REG_CNT)
        )
        # used to shift whole register pipeline using input keep_mask
        self.ready = Signal()
        if self.ID_WIDTH or self.USER_WIDTH or self.DEST_WIDTH:
            raise NotImplementedError("It is not clear how id/user/dest"
                                      " should be managed between the frames")

    def _impl(self):
        """
        Build the register pipeline.

        For every register there is a load condition under which it takes
        data/keep/last/relict from its predecessor (``dataIn`` for
        ``regs[-1]``, ``regs[i + 1]`` otherwise). ``regs[0]`` additionally
        keeps only the bytes selected by ``keep_masks[0]`` if ``ready`` is
        set and the load condition is not met.
        """
        mask_t = Bits(self.DATA_WIDTH // 8, force_vector=True)
        data_fields = [
            (Bits(self.DATA_WIDTH), "data"),
            (mask_t, "keep"),  # valid= keep != 0
            (BIT, "relict"),  # flag for partially consumed word
            (BIT, "last"),  # flag for end of frame
        ]
        if self.USE_STRB:
            data_fields.append((mask_t, "strb"))
        data_t = HStruct(*data_fields)

        # regs[0] connected to output as first, regs[-1] connected to input
        regs = [
            self._reg(f"r{r_i:d}", data_t,
                      def_val={"keep": 0, "last": 0, "relict": 0})
            for r_i in range(self.REG_CNT)
        ]
        ready = self.ready
        keep_masks = self.keep_masks

        # rN_fully_consumed: no byte of the register remains after masking,
        # for r0 this is additionally qualified by ready
        fully_consumed_flags = []
        for i, r in enumerate(regs):
            _fully_consumed = (r.keep & keep_masks[i])._eq(0)
            if i == 0:
                _fully_consumed = _fully_consumed & ready
            fully_consumed_flags.append(
                rename_signal(self, _fully_consumed,
                              f"r{i:d}_fully_consumed"))

        # iter_with_last marks the last item, which is regs[-1],
        # the register fed directly from dataIn
        for i, (is_first_on_input_r, r) in enumerate(iter_with_last(regs)):
            keep_mask_all = mask(r.keep._dtype.bit_length())
            # masks applied on keep/last of the predecessor while loading
            prev_keep_mask = self._sig(f"prev_keep_mask_{i:d}_tmp",
                                       r.keep._dtype)
            prev_last_mask = self._sig(f"prev_last_mask_{i:d}_tmp")

            # register with keep == 0 still holds information
            # if it has last and relict set
            is_empty = rename_signal(
                self,
                r.keep._eq(0) & ~(r.last & r.relict),
                f"r{i:d}_is_empty")

            if is_first_on_input_r:
                # is register connected directly to dataIn
                r_prev = self.dataIn
                If(r_prev.valid,
                   prev_keep_mask(keep_mask_all),
                   prev_last_mask(1)
                ).Else(
                    # flush (invalid input but the data can be dispersed
                    # in registers so we need to collapse it)
                    prev_keep_mask(0),
                    prev_last_mask(0),
                )
                if self.REG_CNT > 1:
                    next_r = regs[i - 1]
                    next_empty = next_r.keep._eq(0) & ~(next_r.relict & next_r.last)
                else:
                    # there is no next register
                    next_empty = 0

                whole_pipeline_shift = (
                    ready & (regs[0].keep & keep_masks[0])._eq(0))
                r_prev.ready(is_empty  # last input reg empty
                             | whole_pipeline_shift
                             | next_empty)
            else:
                r_prev = regs[i + 1]
                prev_last_mask(1)
                If(is_empty,
                   # flush
                   prev_keep_mask(keep_mask_all),
                ).Else(
                    prev_keep_mask(keep_masks[i + 1]),
                )

            # assignments which are common for each load of this register
            data_drive = [r.data(r_prev.data), ]
            if self.USE_STRB:
                data_drive.append(r.strb(r_prev.strb))

            fully_consumed = fully_consumed_flags[i]
            if i == 0:
                # last register in path
                If((ready & fully_consumed) | is_empty,
                   *data_drive,
                   r.keep(r_prev.keep & prev_keep_mask),
                   r.last(r_prev.last & prev_last_mask),
                   r.relict(
                       r_prev.valid & r_prev.keep._eq(0)
                       if is_first_on_input_r else
                       # [TODO] potentially it should not be keep[0] but first keep with 1
                       r_prev.relict |
                       (r_prev.last & (r_prev.keep[0] & ~keep_masks[i + 1][0] & ~fully_consumed_flags[i + 1]))
                   )
                ).Elif(ready,
                    # not loaded: drop the bytes which are not selected
                    # by keep_masks[0]
                    r.keep(r.keep & keep_masks[i]),
                    r.relict(1),  # became relict if there is some 1 in keep (== not fully consumed)
                )
            else:
                next_fully_consumed = fully_consumed_flags[i - 1]
                next_r = regs[i - 1]
                next_is_empty = next_r.keep._eq(0) & ~(next_r.relict & next_r.last)
                if is_first_on_input_r:
                    is_relict = r_prev.valid & r_prev.keep._eq(0)
                else:
                    prev_fully_consumed = fully_consumed_flags[i + 1]
                    is_relict = r_prev.relict | ~prev_fully_consumed

                If((ready & next_fully_consumed) | is_empty | next_is_empty,
                   *data_drive,
                   r.keep(r_prev.keep & prev_keep_mask),
                   r.last(r_prev.last & prev_last_mask),
                   r.relict(is_relict)
                )

        # expose the content of every register on the "regs" interfaces
        for rout, rin in zip(self.regs, regs):
            rout.data(rin.data)
            if self.USE_STRB:
                rout.strb(rin.strb)
            rout.keep(rin.keep)
            rout.relict(rin.relict)
            rout.last(rin.last)