Source code for hwtLib.amba.axis_comp.resizer

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import If, Concat, Switch
from hwt.hdl.types.bits import Bits
from hwt.interfaces.utils import addClkRstn
from hwt.math import log2ceil
from hwt.synthesizer.param import Param
from hwtLib.amba.axis import AxiStream
from hwtLib.amba.axis_comp.base import AxiSCompBase
from hwtLib.amba.axis_comp.reg import AxiSReg
from hwtLib.handshaked.streamNode import StreamNode


[docs]class AxiS_resizer(AxiSCompBase): """ Change data with of AxiStream interface :attention: start of frame is expected to be aligned on first word :attention: strb can be not fully set only in last word :attention: in upscale mode id and other signals which are not dependent on data width are propagated only from last word :note: interface is configurable and schematic is example with AxiStream :note: first schematic is for upsize mode, second one is for downsize mode .. hwt-params: _example_AxiS_resizer_upscale .. hwt-interfaces: _example_AxiS_resizer_upscale .. hwt-schematic:: _example_AxiS_resizer_upscale .. hwt-schematic:: _example_AxiS_resizer_downscale """ def _config(self): AxiSCompBase._config(self) self.OUT_DATA_WIDTH = Param(64) self.USE_STRB = True def _declr(self): assert self.USE_STRB addClkRstn(self) with self._paramsShared(): self.dataIn = AxiStream() with self._paramsShared(exclude=({"DATA_WIDTH"}, set())): o = self.dataOut = AxiStream()._m() o.DATA_WIDTH = self.OUT_DATA_WIDTH
[docs] def nextAreNotValidLogic(self, inStrb, actualItemIndx, ITEMS, ITEM_DW): res = None ITEM_W = ITEM_DW // 8 for i in range(ITEMS - 1): # -1 because if it is last we do not need this strbHi = ITEMS * ITEM_W strbLo = (i + 1) * ITEM_W othersNotValid = actualItemIndx._eq( i) & inStrb[strbHi:strbLo]._eq(0) if res is None: res = othersNotValid else: res = res | othersNotValid if res is None: return True _res = self._sig("nextAreNotValid") _res(res) return _res
[docs] def upscale(self, IN_DW, OUT_DW): if OUT_DW % IN_DW != 0: raise NotImplementedError() ITEMS = OUT_DW // IN_DW dIn = self.getDataWidthDependent(self.dataIn) dataOut = self.dataOut dOut = self.getDataWidthDependent(dataOut) itemCntr = self._reg("itemCntr", Bits(log2ceil(ITEMS + 1)), def_val=0) hs = StreamNode([self.dataIn], [dataOut]).ack() isLastItem = (itemCntr._eq(ITEMS - 1) | self.dataIn.last) vld = self.get_valid_signal(self.dataIn) outputs = {outp: [] for outp in dOut} for wordIndx in range(ITEMS): for inp, outp in zip(dIn, dOut): # generate register if is not last item s = self._sig(f"item_{wordIndx:d}_{inp._name:s}", inp._dtype) if wordIndx <= ITEMS - 1: r = self._reg(f"reg_{inp._name:s}_{wordIndx:d}", inp._dtype, def_val=0) If(hs & isLastItem, r(0) ).Elif(vld & itemCntr._eq(wordIndx), r(inp) ) If(itemCntr._eq(wordIndx), s(inp) ).Else( s(r) ) else: # last item does not need register If(itemCntr._eq(wordIndx), s(inp) ).Else( s(0) ) outputs[outp].append(s) # dataIn/dataOut hs self.get_ready_signal(self.dataIn)(self.get_ready_signal(dataOut)) self.get_valid_signal(dataOut)(vld & (isLastItem)) # connect others signals directly for inp, outp in zip(self.get_data(self.dataIn), self.get_data(dataOut)): if inp not in dIn: outp(inp) # connect data signals to utput for outp, outItems in outputs.items(): outp(Concat(*reversed(outItems))) # itemCntr next logic If(hs, If(isLastItem, itemCntr(0) ).Else( itemCntr(itemCntr + 1) ) )
[docs] def downscale(self, IN_DW, OUT_DW): if IN_DW % OUT_DW != 0: raise NotImplementedError() dOut = self.getDataWidthDependent(self.dataOut) # instantiate AxiSReg, AxiSBuilder is not used to avoid dependencies inReg = AxiSReg(self.intfCls) inReg._updateParamsFrom(self.dataIn) self.inReg = inReg inReg.clk(self.clk) inReg.rst_n(self.rst_n) inReg.dataIn(self.dataIn) dataIn = inReg.dataOut dIn = self.getDataWidthDependent(dataIn) ITEMS = IN_DW // OUT_DW itemCntr = self._reg("itemCntr", Bits(log2ceil(ITEMS + 1)), def_val=0) hs = StreamNode([dataIn], [self.dataOut]).ack() isLastItem = itemCntr._eq(ITEMS - 1) strbLastOverride = self.nextAreNotValidLogic(dataIn.strb, itemCntr, ITEMS, OUT_DW) if strbLastOverride is not True: isLastItem = isLastItem | strbLastOverride # connected item selected by itemCntr to output for inp, outp in zip(dIn, dOut): w = outp._dtype.bit_length() Switch(itemCntr)\ .add_cases([ (wordIndx, outp(inp[((wordIndx + 1) * w):(w * wordIndx)])) for wordIndx in range(ITEMS) ])\ .Default( outp(None) ) # connect others signals directly for inp, outp in zip(self.get_data(dataIn), self.get_data(self.dataOut)): if inp not in dIn and inp is not dataIn.last: outp(inp) self.dataOut.last(dataIn.last & isLastItem) self.get_ready_signal(dataIn)(self.get_ready_signal(self.dataOut) & isLastItem & dataIn.valid) self.get_valid_signal(self.dataOut)(self.get_valid_signal(dataIn)) If(hs, If(isLastItem, itemCntr(0) ).Else( itemCntr(itemCntr + 1) ) )
def _impl(self): IN_DW = int(self.DATA_WIDTH) OUT_DW = int(self.OUT_DATA_WIDTH) if IN_DW < OUT_DW: # UPSCALE self.upscale(IN_DW, OUT_DW) elif IN_DW > OUT_DW: # DOWNSCALE self.downscale(IN_DW, OUT_DW) else: # same raise AssertionError( "Input and output width are same, this instance is useless")
[docs]def _example_AxiS_resizer_upscale(): from hwtLib.amba.axis import AxiStream u = AxiS_resizer(AxiStream) u.ID_WIDTH = 3 u.DATA_WIDTH = 32 u.OUT_DATA_WIDTH = 64 return u
[docs]def _example_AxiS_resizer_downscale(): from hwtLib.amba.axis import AxiStream u = AxiS_resizer(AxiStream) u.ID_WIDTH = 3 u.DATA_WIDTH = 64 u.OUT_DATA_WIDTH = 32 return u
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = _example_AxiS_resizer_downscale() print(to_rtl_str(u))