Source code for hwtLib.amba.axis_comp.frame_join.input_reg

from hwt.code import If
from hwt.code_utils import rename_signal
from hwt.hObjList import HObjList
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.struct import HStruct
from hwt.hwIO import HwIO
from hwt.hwIOs.hwIOArray import HwIOArray
from hwt.hwIOs.std import HwIOVectSignal, HwIOSignal
from hwt.hwIOs.utils import addClkRstn
from hwt.hwModule import HwModule
from hwt.hwParam import HwParam
from hwt.pyUtils.arrayQuery import iter_with_last
from hwt.pyUtils.typingFuture import override
from hwt.serializer.mode import serializeParamsUniq
from hwtLib.amba.axi4s import Axi4Stream
from pyMathBitPrecise.bit_utils import mask


[docs] class UnalignedJoinRegIntf(HwIO): """ .. hwt-autodoc:: """ @override def hwConfig(self): Axi4Stream.hwConfig(self) @override def hwDeclr(self): self.data = HwIOVectSignal(self.DATA_WIDTH) self.keep = HwIOVectSignal(self.DATA_WIDTH // 8) if self.USE_STRB: self.strb = HwIOVectSignal(self.DATA_WIDTH // 8) self.relict = HwIOSignal() self.last = HwIOSignal()
[docs] @serializeParamsUniq class FrameJoinInputReg(HwModule): """ Pipeline of registers for Axi4Stream with keep mask and flushing .. hwt-autodoc:: """ @override def hwConfig(self): self.REG_CNT = HwParam(2) Axi4Stream.hwConfig(self) self.USE_KEEP = True @override def hwDeclr(self): assert self.USE_KEEP addClkRstn(self) with self._hwParamsShared(): self.dataIn = Axi4Stream() self.regs = HObjList( UnalignedJoinRegIntf()._m() for _ in range(self.REG_CNT)) self.keep_masks = HwIOArray( HwIOVectSignal(self.DATA_WIDTH // 8) for _ in range(self.REG_CNT) ) # used to shift whole register pipeline using input keep_mask self.ready = HwIOSignal() if self.ID_WIDTH or self.USER_WIDTH or self.DEST_WIDTH: raise NotImplementedError("It is not clear how id/user/dest" " should be managed between the frames") @override def hwImpl(self): mask_t = HBits(self.DATA_WIDTH // 8, force_vector=self.DATA_WIDTH == 8) data_fieds = [ (HBits(self.DATA_WIDTH), "data"), (mask_t, "keep"), # valid= keep != 0 (BIT, "relict"), # flag for partially consumed word (BIT, "last"), # flag for end of frame ] if self.USE_STRB: data_fieds.append((mask_t, "strb"), ) data_t = HStruct(*data_fieds) # regs[0] connected to output as first, regs[-1] connected to input regs = [ self._reg(f"r{r_i:d}", data_t, def_val={"keep": 0, "last": 0, "relict": 0}) for r_i in range(self.REG_CNT) ] ready = self.ready keep_masks = self.keep_masks fully_consumed_flags = [] for i, r in enumerate(regs): _fully_consumed = (r.keep & keep_masks[i])._eq(0) if i == 0: _fully_consumed = _fully_consumed & self.ready fully_consumed_flags.append(rename_signal(self, _fully_consumed, f"r{i:d}_fully_consumed")) for i, (is_first_on_input_r, r) in enumerate(iter_with_last(regs)): keep_mask_all = mask(r.keep._dtype.bit_length()) prev_keep_mask = self._sig(f"prev_keep_mask_{i:d}_tmp", r.keep._dtype) prev_last_mask = self._sig(f"prev_last_mask_{i:d}_tmp") is_empty = rename_signal(self, r.keep._eq(0) & ~(r.last & r.relict), f"r{i:d}_is_empty") if is_first_on_input_r: # is register connected directly to dataIn r_prev = self.dataIn If(r_prev.valid, prev_keep_mask(keep_mask_all), prev_last_mask(1) ).Else( # flush (invalid input but the data can be dispersed # in registers so we need to collapse it) prev_keep_mask(0), prev_last_mask(0), ) if self.REG_CNT > 1: next_r = regs[i - 1] next_empty = next_r.keep._eq(0) & ~(next_r.relict & next_r.last) else: next_empty = 0 whole_pipeline_shift = (ready & (regs[0].keep & self.keep_masks[0])._eq(0)) r_prev.ready(is_empty # last input reg empty | whole_pipeline_shift | next_empty) else: r_prev = regs[i + 1] prev_last_mask(1) If(is_empty, # flush prev_keep_mask(keep_mask_all), ).Else( prev_keep_mask(keep_masks[i + 1]), ) data_drive = [r.data(r_prev.data), ] if self.USE_STRB: data_drive.append(r.strb(r_prev.strb)) fully_consumed = fully_consumed_flags[i] if i == 0: # last register in path If((ready & fully_consumed) | is_empty, *data_drive, r.keep(r_prev.keep & prev_keep_mask), r.last(r_prev.last & prev_last_mask), r.relict( r_prev.valid & r_prev.keep._eq(0) if is_first_on_input_r else # [TODO] potentially it should not be keep[0] but fist keep with 1 r_prev.relict | (r_prev.last & (r_prev.keep[0] & ~keep_masks[i + 1][0] & ~fully_consumed_flags[i + 1])) ) ).Elif(ready, r.keep(r.keep & keep_masks[i]), r.relict(1), # became relict if there is some 1 in keep (== not fully consumed) ) else: next_fully_consumed = fully_consumed_flags[i - 1] next_r = regs[i - 1] next_is_empty = next_r.keep._eq(0) & ~(next_r.relict & next_r.last) if is_first_on_input_r: is_relict = r_prev.valid & r_prev.keep._eq(0) else: prev_fully_consumed = fully_consumed_flags[i + 1] is_relict = r_prev.relict | ~prev_fully_consumed If((ready & next_fully_consumed) | is_empty | next_is_empty, *data_drive, r.keep(r_prev.keep & prev_keep_mask), r.last(r_prev.last & prev_last_mask), r.relict(is_relict) ) for rout, rin in zip(self.regs, regs): rout.data(rin.data) if self.USE_STRB: rout.strb(rin.strb) rout.keep(rin.keep) rout.relict(rin.relict) rout.last(rin.last)