# Source code for hwtLib.amba.axi_comp.stream_to_mem

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import Concat, If, Switch
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.enum import HEnum
from hwt.hdl.types.struct import HStruct
from hwt.interfaces.std import Handshaked
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.synthesizer.param import Param
from hwt.synthesizer.unit import Unit
from hwt.synthesizer.vectorUtils import fitTo
from hwtLib.amba.axi4 import Axi4
from hwtLib.amba.axi4Lite import Axi4Lite
from hwtLib.amba.axiLite_comp.endpoint import AxiLiteEndpoint
from hwtLib.amba.constants import BURST_INCR, CACHE_DEFAULT, LOCK_DEFAULT, \
    PROT_DEFAULT, BYTES_IN_TRANS, QOS_DEFAULT
from hwtLib.types.ctypes import uint32_t
from pyMathBitPrecise.bit_utils import mask


class Axi4streamToMem(Unit):
    """
    Most simple DMA for AXI4 interface.

    Register map (accessible over ``cntrlBus``):

    * 0x0 control reg.

      * bit 0, rw - on/off (1 means on)
      * bit 1, r - idle (1 if no transaction in progress)

    * 0x4 baseAddr

    Length of written data is specified by DATA_LEN.
    Input data is split into smaller frames to fit MAX_BUTST_LEN.

    If there is a transaction pending the idle flag is 0; if on/off is set
    to 0 in this state the unit continues until all data are sent and then
    stays off.
    This could be used as synchronization with the software.

    1) driver enables this unit, then tests while not idle.
    2) then waits while idle.
    3) then reads the data and back to 1

    or unit is enabled and driver disables it only for the time of reading.

    .. hwt-autodoc::
    """

    def _config(self):
        """
        Declare the configuration parameters and the layout of the
        control registers.
        """
        self.ADDR_WIDTH = Param(32)
        self.DATA_WIDTH = Param(32)
        self.CNTRL_AW = Param(5)

        # size of data which should be transferred, in words
        self.DATA_LEN = Param(33)
        # maximum number of words in a single AXI burst
        self.MAX_BUTST_LEN = Param(16)
        self.REGISTER_MAP = HStruct(
            (uint32_t, "control"),
            (uint32_t, "baseAddr")
        )

    def _declr(self):
        """
        Declare the interfaces (write-only AXI4 master, handshaked data
        input, AXI4-Lite control bus) and the register endpoint.
        """
        with self._paramsShared():
            addClkRstn(self)
            self.axi = Axi4()._m()
            self.axi.HAS_R = False
            self.dataIn = Handshaked()

        # The control bus has its own address width, so it is configured
        # explicitly instead of sharing the parameters of this unit.
        cntrl = self.cntrlBus = Axi4Lite()
        regs = self.regsConventor = AxiLiteEndpoint(self.REGISTER_MAP)

        cntrl.ADDR_WIDTH = self.CNTRL_AW
        cntrl.DATA_WIDTH = self.DATA_WIDTH

        regs.ADDR_WIDTH = self.CNTRL_AW
        regs.DATA_WIDTH = self.DATA_WIDTH

    def axiWAddrHandler(self, st, baseAddr, actualAddr, lenRem):
        """
        AXI write addr logic

        Drives the ``aw`` channel and maintains the address/length counters.

        :param st: FSM state register
        :param baseAddr: register with the base address set by software
        :param actualAddr: register with the address of the next burst
        :param lenRem: register with the number of words which still
            have to be requested on the ``aw`` channel
        """
        axi = self.axi
        st_t = st._dtype

        axi.aw.valid(st._eq(st_t.writeAddr))
        axi.aw.addr(actualAddr)
        axi.aw.id(0)
        axi.aw.burst(BURST_INCR)
        axi.aw.cache(CACHE_DEFAULT)
        # aw.len is driven with (number of words in burst - 1)
        If(lenRem > self.MAX_BUTST_LEN,
            axi.aw.len(self.MAX_BUTST_LEN - 1)
        ).Else(
            axi.aw.len(lenRem - 1, fit=True)
        )
        axi.aw.lock(LOCK_DEFAULT)
        axi.aw.prot(PROT_DEFAULT)
        axi.aw.size(BYTES_IN_TRANS(self.DATA_WIDTH // 8))
        axi.aw.qos(QOS_DEFAULT)

        # lenRem, actualAddr logic
        Switch(st).Case(st_t.fullIdle,
            # (re)load the counters for the next transfer
            lenRem(self.DATA_LEN),
            actualAddr(baseAddr)
        ).Case(st_t.writeAddr,
            If(axi.aw.ready,
                If(lenRem > self.MAX_BUTST_LEN,
                    actualAddr(actualAddr
                               + (self.MAX_BUTST_LEN * self.DATA_WIDTH // 8)),
                    lenRem(lenRem - self.MAX_BUTST_LEN)
                ).Else(
                    # last burst of the transfer; actualAddr is reloaded
                    # from baseAddr once the FSM is back in fullIdle
                    actualAddr(actualAddr + fitTo(lenRem, actualAddr)),
                    lenRem(0)
                )
            )
        )

    def connectRegisters(self, st, onoff, baseAddr):
        """
        connection of AXI-lite registers

        :param st: FSM state register (used to derive the idle flag)
        :param onoff: register with the on/off bit of the control register
        :param baseAddr: register with the base address
        """
        idle = st._eq(st._dtype.fullIdle)
        regs = self.regsConventor.decoded

        # Concat() puts its first operand into the most significant bits,
        # so the padding goes first and on/off last: on/off is then read
        # back at bit 0 and idle at bit 1, as documented for the control
        # register and as used by the write path below.
        regs.control.din(Concat(Bits(self.DATA_WIDTH - 2).from_py(0),
                                idle,
                                onoff))
        If(regs.control.dout.vld,
            onoff(regs.control.dout.data[0])
        )

        regs.baseAddr.din(baseAddr)
        If(regs.baseAddr.dout.vld,
            baseAddr(regs.baseAddr.dout.data)
        )

    def mainFsm(self, st, onoff, lenRem, actualLenRem):
        """
        State transition logic of the main FSM
        (fullIdle -> writeAddr -> writeData -> writeDataLast)

        :param st: FSM state register
        :param onoff: register with the on/off bit, checked only in fullIdle
        :param lenRem: register with the number of words not yet requested
        :param actualLenRem: register with the number of words remaining
            in the current burst
        """
        axi = self.axi
        st_t = st._dtype
        w_ackAll = self.w_allAck(st)

        Switch(st).Case(st_t.fullIdle,
            If(onoff,
                st(st_t.writeAddr)
            )
        ).Case(st_t.writeAddr,
            If(axi.aw.ready,
                If(lenRem._eq(1),
                    # single word burst, the first word is also the last
                    st(st_t.writeDataLast)
                ).Else(
                    st(st_t.writeData)
                )
            )
        ).Case(st_t.writeData,
            # the word before the last one is being transferred
            If(w_ackAll & (actualLenRem._eq(2)),
                st(st_t.writeDataLast)
            )
        ).Case(st_t.writeDataLast,
            If(w_ackAll,
                If(lenRem != 0,
                    # there are still some words to send, start next burst
                    st(st_t.writeAddr)
                ).Else(
                    st(st_t.fullIdle)
                )
            )
        )

    def w_allAck(self, st):
        """
        In this clk data word will be transferred

        :param st: FSM state register
        :return: signal which is 1 if the FSM is in a data state and
            both ``dataIn.vld`` and ``axi.w.ready`` are 1
        """
        st_t = st._dtype
        w_en = st._eq(st_t.writeData) | st._eq(st_t.writeDataLast)
        return w_en & self.dataIn.vld & self.axi.w.ready

    def dataWFeed(self, st, lenRem, actualLenRem):
        """
        Connection between din and axi.w channel

        :param st: FSM state register
        :param lenRem: register with the number of words not yet requested
        :param actualLenRem: register with the number of words remaining
            in the current burst (driven here)
        """
        w = self.axi.w
        din = self.dataIn
        st_t = st._dtype

        last = st._eq(st_t.writeDataLast)
        w_en = st._eq(st_t.writeData) | last

        w.valid(din.vld & w_en)
        w.data(din.data)
        # all bytes of the word are always valid
        w.strb(mask(w.strb._dtype.bit_length()))
        w.last(last)

        din.rd(w_en & w.ready)

        w_allAck = self.w_allAck(st)

        # actualLenRem driver
        Switch(st).Case(st_t.writeData,
            If(w_allAck,
                actualLenRem(actualLenRem - 1)
            )
        ).Case(st_t.writeDataLast,
            If(w_allAck,
                actualLenRem(0)
            )
        ).Default(
            # outside of the data states preload the size of next burst
            If(lenRem > self.MAX_BUTST_LEN,
                actualLenRem(self.MAX_BUTST_LEN)
            ).Else(
                actualLenRem(lenRem, fit=True)
            )
        )

    def _impl(self):
        """
        Instantiate the registers and connect all parts of the design.
        """
        propagateClkRstn(self)
        self.regsConventor.bus(self.cntrlBus)
        axi = self.axi

        axi.b.ready(1)  # we do ignore write confirmations

        st_t = HEnum("state_type", ["fullIdle", "writeAddr",
                                    "writeData", "writeDataLast"])

        onoff = self._reg("on_off_reg", def_val=0)
        baseAddr = self._reg("baseAddr_reg", Bits(self.ADDR_WIDTH), 0)
        st = self._reg("state_reg", st_t, st_t.fullIdle)
        actualAddr = self._reg("actualAddr_reg", Bits(self.ADDR_WIDTH))
        lenRem = self._reg("lenRem_reg",
                           Bits(int(self.DATA_LEN).bit_length() + 1),
                           self.DATA_LEN)
        actualLenRem = self._reg("actualLenRem_reg", axi.aw.len._dtype)

        self.connectRegisters(st, onoff, baseAddr)
        self.axiWAddrHandler(st, baseAddr, actualAddr, lenRem)
        self.mainFsm(st, onoff, lenRem, actualLenRem)
        self.dataWFeed(st, lenRem, actualLenRem)


if __name__ == "__main__":
    # Script entry point: print the RTL generated for the unit with its
    # default configuration.
    from hwt.synthesizer.utils import to_rtl_str

    u = Axi4streamToMem()
    print(to_rtl_str(u))