Source code for hwtLib.amba.axi_comp.stream_to_mem

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import Concat, If, Switch
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.bitsCastUtils import fitTo
from hwt.hdl.types.enum import HEnum
from hwt.hdl.types.struct import HStruct
from hwt.hwIOs.std import HwIODataRdVld
from hwt.hwIOs.utils import addClkRstn, propagateClkRstn
from hwt.hwModule import HwModule
from hwt.hwParam import HwParam
from hwt.pyUtils.typingFuture import override
from hwtLib.amba.axi4 import Axi4
from hwtLib.amba.axi4Lite import Axi4Lite
from hwtLib.amba.axiLite_comp.endpoint import AxiLiteEndpoint
from hwtLib.amba.constants import BURST_INCR, CACHE_DEFAULT, LOCK_DEFAULT, \
    PROT_DEFAULT, BYTES_IN_TRANS, QOS_DEFAULT
from hwtLib.types.ctypes import uint32_t
from pyMathBitPrecise.bit_utils import mask


[docs] class Axi4streamToMem(HwModule): """ Most simple DMA for AXI4 interface. * 0x0 control reg. * bit 0, rw - on/off (1 means on) * bit 1, r - idle (1 if no transaction in progress) 0x4 baseAddr Length of written data is specified by DATA_LEN. Input data is splited on smaller frames to fit MAX_BUTST_LEN. If there is transaction pending idle flag is 0, if on/off is set to 0 in this state unit continues until all data are send and then stays off. This could be use as synchronization with the software. 1) driver enables this unit, then tests while not idle. 2) then waits while idle. 3) then reads the data and back to 1 or unit is enabled and driver disables it only for the time of reading. .. hwt-autodoc:: """ @override def hwConfig(self): self.ADDR_WIDTH = HwParam(32) self.DATA_WIDTH = HwParam(32) self.CNTRL_AW = HwParam(5) # size of data which should be transferred in worlds self.DATA_LEN = HwParam(33) self.MAX_BUTST_LEN = HwParam(16) self.REGISTER_MAP = HStruct( (uint32_t, "control"), (uint32_t, "baseAddr") ) @override def hwDeclr(self): with self._hwParamsShared(): addClkRstn(self) self.axi = Axi4()._m() self.axi.HAS_R = False self.dataIn = HwIODataRdVld() cntrl = self.cntrlBus = Axi4Lite() regs = self.regsConventor = AxiLiteEndpoint(self.REGISTER_MAP) cntrl.ADDR_WIDTH = self.CNTRL_AW cntrl.DATA_WIDTH = self.DATA_WIDTH regs.ADDR_WIDTH = self.CNTRL_AW regs.DATA_WIDTH = self.DATA_WIDTH
[docs] def axiWAddrHandler(self, st, baseAddr, actualAddr, lenRem): """ AXI write addr logic """ axi = self.axi st_t = st._dtype axi.aw.valid(st._eq(st_t.writeAddr)) axi.aw.addr(actualAddr) axi.aw.id(0) axi.aw.burst(BURST_INCR) axi.aw.cache(CACHE_DEFAULT) If(lenRem > self.MAX_BUTST_LEN, axi.aw.len(self.MAX_BUTST_LEN - 1) ).Else( axi.aw.len(lenRem - 1, fit=True) ) axi.aw.lock(LOCK_DEFAULT) axi.aw.prot(PROT_DEFAULT) axi.aw.size(BYTES_IN_TRANS(self.DATA_WIDTH // 8)) axi.aw.qos(QOS_DEFAULT) # lenRem, actualAddr logic Switch(st)\ .Case(st_t.fullIdle, lenRem(self.DATA_LEN), actualAddr(baseAddr) ).Case(st_t.writeAddr, If(axi.aw.ready, If(lenRem > self.MAX_BUTST_LEN, actualAddr(actualAddr + (self.MAX_BUTST_LEN * self.DATA_WIDTH // 8)), lenRem(lenRem - self.MAX_BUTST_LEN) ).Else( actualAddr(actualAddr + fitTo(lenRem, actualAddr)), lenRem(0) ) ) )
[docs] def connectRegisters(self, st, onoff, baseAddr): """ connection of AXI-lite registers """ idle = st._eq(st._dtype.fullIdle) regs = self.regsConventor.decoded regs.control.din(Concat(onoff, idle, HBits(self.DATA_WIDTH - 2).from_py(0))) If(regs.control.dout.vld, onoff(regs.control.dout.data[0]) ) regs.baseAddr.din(baseAddr) If(regs.baseAddr.dout.vld, baseAddr(regs.baseAddr.dout.data) )
[docs] def mainFsm(self, st, onoff, lenRem, actualLenRem): axi = self.axi st_t = st._dtype w_ackAll = self.w_allAck(st) Switch(st)\ .Case(st_t.fullIdle, If(onoff, st(st_t.writeAddr) ) ).Case(st_t.writeAddr, If(axi.aw.ready, If(lenRem._eq(1), st(st_t.writeDataLast) ).Else( st(st_t.writeData) ) ) ).Case(st_t.writeData, If(w_ackAll & (actualLenRem._eq(2)), st(st_t.writeDataLast) ) ).Case(st_t.writeDataLast, If(w_ackAll, If(lenRem != 0, st(st_t.writeAddr) ).Else( st(st_t.fullIdle) ) ) )
[docs] def w_allAck(self, st): """ In this clk data word will be transferred """ st_t = st._dtype w_en = st._eq(st_t.writeData) | st._eq(st_t.writeDataLast) return w_en & self.dataIn.vld & self.axi.w.ready
[docs] def dataWFeed(self, st, lenRem, actualLenRem): """ Connection between din and axi.w channel """ w = self.axi.w din = self.dataIn st_t = st._dtype last = st._eq(st_t.writeDataLast) w_en = st._eq(st_t.writeData) | last w.valid(din.vld & w_en) w.data(din.data) w.strb(mask(w.strb._dtype.bit_length())) w.last(last) din.rd(w_en & w.ready) w_allAck = self.w_allAck(st) # actualLenRem driver Switch(st)\ .Case(st_t.writeData, If(w_allAck, actualLenRem(actualLenRem - 1) ) ).Case(st_t.writeDataLast, If(w_allAck, actualLenRem(0) ) ).Default( If(lenRem > self.MAX_BUTST_LEN, actualLenRem(self.MAX_BUTST_LEN) ).Else( actualLenRem(lenRem, fit=True) ) )
@override def hwImpl(self): propagateClkRstn(self) self.regsConventor.bus(self.cntrlBus) axi = self.axi axi.b.ready(1) # we do ignore write confirmations st_t = HEnum("state_type", ["fullIdle", "writeAddr", "writeData", "writeDataLast"]) onoff = self._reg("on_off_reg", def_val=0) baseAddr = self._reg("baseAddr_reg", HBits(self.ADDR_WIDTH), 0) st = self._reg("state_reg", st_t, st_t.fullIdle) actualAddr = self._reg("actualAddr_reg", HBits(self.ADDR_WIDTH)) lenRem = self._reg("lenRem_reg", HBits(int(self.DATA_LEN).bit_length() + 1), self.DATA_LEN) actualLenRem = self._reg("actualLenRem_reg", axi.aw.len._dtype) self.connectRegisters(st, onoff, baseAddr) self.axiWAddrHandler(st, baseAddr, actualAddr, lenRem) self.mainFsm(st, onoff, lenRem, actualLenRem) self.dataWFeed(st, lenRem, actualLenRem)
if __name__ == "__main__": from hwt.synth import to_rtl_str m = Axi4streamToMem() # m = AxiLiteRegs(Axi4streamToMem().REGISTER_MAP) print(to_rtl_str(m))