Source code for hwtLib.avalon.mmToAxi

from hwt.code import Switch, If
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.synthesizer.param import Param
from hwt.synthesizer.vectorUtils import fitTo
from hwtLib.abstract.busBridge import BusBridge
from hwtLib.amba.axi4 import Axi4, Axi4_addr
from hwtLib.amba.constants import CACHE_DEFAULT, BURST_INCR, LOCK_DEFAULT, \
    PROT_DEFAULT, QOS_DEFAULT, BYTES_IN_TRANS, RESP_DECERR, RESP_SLVERR, \
    RESP_EXOKAY, RESP_OKAY
from hwtLib.avalon.mm import AvalonMM, RESP_SLAVEERROR, RESP_DECODEERROR, RESP_OKAY as AVMM_RESP_OKAY
from hwtLib.avalon.mm_buff import AvalonMmBuff
from hwtLib.handshaked.streamNode import StreamNode


[docs]class AvalonMm_to_Axi4(BusBridge): """ Convert AvalonMm to AMBA Axi4 interface .. hwt-autodoc:: """ def _config(self) -> None: AvalonMM._config(self) self.ID_WIDTH = Param(1) def _declr(self) -> None: addClkRstn(self) with self._paramsShared(): self.s = AvalonMM() self.m: Axi4 = Axi4()._m() def _impl(self) -> None: avmm_buff = AvalonMmBuff() avmm_buff._updateParamsFrom(self.s) avmm_buff.ADDR_BUFF_DEPTH = 1 avmm_buff.DATA_BUFF_DEPTH = 1 self.avmm_buff = avmm_buff avmm_buff.s(self.s) avmm = avmm_buff.m axi = self.m for a in [axi.ar, axi.aw]: a: Axi4_addr a.addr(avmm.address) a.burst(BURST_INCR) a.cache(CACHE_DEFAULT) a.lock(LOCK_DEFAULT) if self.ID_WIDTH > 0: a.id(0) if self.MAX_BURST > 1: a.len(fitTo(avmm.burstCount - 1, a.len, shrink=False)) else: a.len(0) a.prot(PROT_DEFAULT) a.qos(QOS_DEFAULT) a.size(BYTES_IN_TRANS(self.DATA_WIDTH // 8)) if self.MAX_BURST > 1: w_last_cntr = self._reg("w_last_cntr", avmm.burstCount._dtype, def_val=0) w_last_cntr_vld = self._reg("w_last_cntr_vld", def_val=0) avmm_addr_hs = (avmm.read | avmm.write, ~avmm.waitRequest) StreamNode( [avmm_addr_hs, ], [axi.ar, axi.aw, axi.w], extraConds={ axi.ar: avmm.read, axi.aw: avmm.write & axi.w.ready & ~w_last_cntr_vld, axi.w: avmm.write & ((axi.aw.ready & ~w_last_cntr_vld) | w_last_cntr_vld), }, skipWhen={ axi.ar:~avmm.read, axi.aw:~avmm.write | w_last_cntr_vld, axi.w:~avmm.write, } ).sync() If(avmm.write & axi.aw.ready & axi.w.ready & ~w_last_cntr_vld, w_last_cntr(avmm.burstCount - 2), w_last_cntr_vld(avmm.burstCount > 1), ).Elif(avmm.write & axi.w.ready & (axi.aw.ready | w_last_cntr_vld), If(w_last_cntr != 0, w_last_cntr(w_last_cntr - 1), ).Else( w_last_cntr_vld(0), ) ) If(w_last_cntr_vld, axi.w.last(w_last_cntr._eq(0)) ).Else( axi.w.last(avmm.burstCount._eq(1)) ) else: StreamNode( [(avmm.read | avmm.write, ~avmm.waitRequest)], [axi.ar, axi.aw, axi.w], extraConds={ axi.ar: avmm.read, axi.aw: avmm.write & axi.w.ready, axi.w: avmm.write & axi.w.ready, }, skipWhen={ axi.ar:~avmm.read, axi.aw:~avmm.write, axi.w:~avmm.write, } ).sync() axi.w.last(1) axi.w.data(avmm.writeData) axi.w.strb(avmm.byteEnable) avmm.readData(axi.r.data) avmm.readDataValid(axi.r.valid) axi.r.ready(1) Switch(axi.b.resp)\ .Case(RESP_OKAY, avmm.response(AVMM_RESP_OKAY), ).Case(RESP_EXOKAY, avmm.response(AVMM_RESP_OKAY), ).Case(RESP_SLVERR, avmm.response(RESP_SLAVEERROR), ).Case(RESP_DECERR, avmm.response(RESP_DECODEERROR), ).Default( avmm.response(None) ) avmm.writeResponseValid(axi.b.valid) axi.b.ready(1) propagateClkRstn(self)
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = AvalonMm_to_Axi4() u.MAX_BURST = 2 ** 7 - 1 u.DATA_WIDTH = 256 u.ADDR_WIDTH = 26 print(to_rtl_str(u))