#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional
from hwt.code import If
from hwt.hwIOs.std import HwIORdVldSync, HwIOVectSignal
from hwt.hwIOs.utils import addClkRstn, propagateClkRstn
from hwt.hwParam import HwParam
from hwt.pyUtils.typingFuture import override
from hwtLib.abstract.busBridge import BusBridge
from hwtLib.amba.axi4 import Axi4, Axi4_addr
from hwtLib.amba.axi4Lite import Axi4Lite, Axi4Lite_addr
from hwtLib.amba.axi_comp.buff import AxiBuff
from hwtLib.amba.constants import PROT_DEFAULT
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.handshaked.streamNode import StreamNode
class HandshakedIdAndLen(HwIORdVldSync):
    """
    Handshaked interface which carries the ``len`` and, optionally,
    the ``id`` of a transaction.

    :ivar ~.ID_WIDTH: width of the ``id`` signal (default 4), the signal
        is declared only if this value is greater than 0
    :ivar ~.LEN_WIDTH: width of the ``len`` signal (default 8)

    .. hwt-autodoc::
    """

    @override
    def hwConfig(self):
        self.ID_WIDTH = HwParam(4)
        self.LEN_WIDTH = HwParam(8)

    @override
    def hwDeclr(self):
        # id is optional, ID_WIDTH == 0 means that there is no id signal at all
        if self.ID_WIDTH > 0:
            self.id = HwIOVectSignal(self.ID_WIDTH)
        self.len = HwIOVectSignal(self.LEN_WIDTH)
        # the remaining (handshake) signals are declared by the parent class
        super(HandshakedIdAndLen, self).hwDeclr()
class Axi_to_AxiLite(BusBridge):
    """
    AXI3/4 -> Axi4Lite bridge

    :attention: AXI interfaces work in read first mode, overlapping transactions
        are not checked to end up in proper r/w order
    :attention: only the last response code on AxiLite for a transaction is used
        as a response code for Axi4. That means if an error appears somewhere
        in a middle beat of the transaction the error is ignored

    :ivar ~.MAX_TRANS_OVERLAP: depth of internal FIFO which is used to allow the transactions
        to overlap each other in order to pipeline the execution of transactions

    .. hwt-autodoc::
    """

    def __init__(self, hwIOCls=Axi4, hdlName:Optional[str]=None):
        """
        :param hwIOCls: class of the AXI interface used for the ``s`` port,
            its ``hwConfig`` is also used to declare parameters of this component
        :param hdlName: passed to the parent class constructor without any change
        """
        # has to be set before the parent constructor, because hwConfig() reads it
        # NOTE(review): assumes that the parent constructor calls hwConfig() - confirm
        self.hwIOCls = hwIOCls
        super(Axi_to_AxiLite, self).__init__(hdlName=hdlName)

    @override
    def hwConfig(self):
        """
        Take the parameters declared by ``hwIOCls.hwConfig`` and add
        ``MAX_TRANS_OVERLAP`` (default 4).
        """
        self.hwIOCls.hwConfig(self)
        self.MAX_TRANS_OVERLAP = HwParam(4)

    @override
    def hwDeclr(self):
        """
        Declare clk/rst, the ``s`` (``hwIOCls``) and ``m`` (Axi4Lite) ports,
        request FIFOs for read and write direction and the input/output buffers.
        """
        addClkRstn(self)

        with self._hwParamsShared():
            self.s = self.hwIOCls()
            self.m = Axi4Lite()._m()

        # *_req_fifo are used to avoid blocking during addr/data/confirmation waiting on axi channels
        r_f = self.r_req_fifo = HandshakedFifo(HandshakedIdAndLen)
        w_f = self.w_req_fifo = HandshakedFifo(HandshakedIdAndLen)
        for f in [w_f, r_f]:
            f.ID_WIDTH = self.ID_WIDTH
            # LEN_WIDTH is read from the interface class itself, not from this component
            f.LEN_WIDTH = self.hwIOCls.LEN_WIDTH
            f.DEPTH = self.MAX_TRANS_OVERLAP

        with self._hwParamsShared():
            self.out_reg = AxiBuff(Axi4Lite)
            self.in_reg = AxiBuff(self.hwIOCls)
            # all buffers on both sides have depth 1
            self.in_reg.DATA_BUFF_DEPTH = \
                self.in_reg.ADDR_BUFF_DEPTH = \
                self.out_reg.DATA_BUFF_DEPTH = \
                self.out_reg.ADDR_BUFF_DEPTH = 1

    def gen_addr_logic(self, addr_ch_in: Axi4_addr,
                       addr_ch_out: Axi4Lite_addr,
                       req_fifo_inp: HandshakedIdAndLen):
        """
        Instantiate logic which splits the transactions to beats on AxiLite interface
        and propagates information about the transactions to req_fifo_inp for later use

        :param addr_ch_in: address channel (ar/aw) on which the transactions arrive
        :param addr_ch_out: AxiLite address channel on which the individual
            beats are generated
        :param req_fifo_inp: input of the FIFO where id (if ID_WIDTH is not 0)
            and len of each accepted transaction are stored

        :note: only ``addr``, ``len``, ``id`` and the handshake signals
            of addr_ch_in are read here, the address of every next beat
            is always the previous address + ``DATA_WIDTH // 8``
        """
        name_prefix = addr_ch_in._name + "_"
        len_rem = self._reg(name_prefix + "len_rem", addr_ch_in.len._dtype, def_val=0)
        # len_rem_vld is valid only if len > 0 otherwise transaction is processed
        # without len_rem care
        len_rem_vld = self._reg(name_prefix + "len_rem_vld", def_val=0)
        # address increment between two beats
        # (presumably DATA_WIDTH is in bits and this is a number of bytes per beat)
        addr_step = self.DATA_WIDTH // 8
        actual_addr = self._reg(name_prefix + "actual_addr", addr_ch_in.addr._dtype)

        If(len_rem_vld,
            # there are some beats remaining from the transaction which was already accepted
            addr_ch_out.addr(actual_addr),
            addr_ch_in.ready(0),  # because we need to process pending req. first
            addr_ch_out.valid(1),
            If(addr_ch_out.ready,
                # move on next beat
                actual_addr(actual_addr + addr_step),
                If(len_rem._eq(0),
                    # this was the last beat of the transaction
                    len_rem_vld(0),
                ),
                len_rem(len_rem - 1),
            ),
        ).Else(
            addr_ch_out.addr(addr_ch_in.addr),
            # have to store request to register if it is longer than
            # a single beat
            actual_addr(addr_ch_out.addr + addr_step),
            len_rem(addr_ch_in.len - 1),
            # the remaining beats are pending only if the first beat was really
            # accepted on all sides and the transaction is longer than a single beat
            len_rem_vld(addr_ch_in.valid & (addr_ch_in.len != 0) & addr_ch_out.ready & req_fifo_inp.rd),
            # directly pass this first beat
            StreamNode([addr_ch_in], [addr_ch_out]).sync(req_fifo_inp.rd),
        )
        addr_ch_out.prot(PROT_DEFAULT)

        # push new request to req_fifo only on start of new request
        req_fifo_inp.vld(~len_rem_vld & addr_ch_in.valid & addr_ch_out.ready)
        if self.ID_WIDTH:
            req_fifo_inp.id(addr_ch_in.id)
        req_fifo_inp.len(addr_ch_in.len)

    def gen_w_logic(self, w_in, w_out):
        """
        Directly connect the w channels with ignore of extra signals
        (The data should be already synchronized by order of beats on channel)

        :param w_in: write data channel with ``last`` and optionally ``id`` signal
            which are not connected
        :param w_out: write data channel which is driven from w_in
        """
        ignored = {w_in.last}
        if hasattr(w_in, "id"):
            ignored.add(w_in.id)
        w_out(w_in, exclude=ignored)

    def gen_b_or_r_logic(self, inp, outp, fifo_out, propagete_only_on_last):
        """
        Use counter to skip intermediate generated transactions
        and pass only confirmation from last beat of the original transaction

        :param inp: response channel (r/b) on which the response
            for each generated beat arrives
        :param outp: response channel (r/b) where the responses are passed
        :param fifo_out: output of the FIFO with id and len of the original transactions
        :param propagete_only_on_last: if True the outp is activated only for the
            last beat of the original transaction, otherwise for each beat
        """
        # :note: there is no "_" separator between the channel name and the register name
        name_prefix = outp._name
        # number of the beats which remain in the currently processed transaction
        rem = self._reg(name_prefix + "rem", self.s.aw.len._dtype)
        if self.ID_WIDTH:
            id_tmp = self._reg(name_prefix + "id_tmp", outp.id._dtype)
        # 1 if rem (and id_tmp) holds the information about some transaction
        rem_vld = self._reg(name_prefix + "rem_vld", def_val=0)

        StreamNode(
            [inp],
            [outp],
            extraConds={
                outp: (rem_vld & rem._eq(0)) if propagete_only_on_last else rem_vld,
                inp: rem_vld,
            }
        ).sync()

        If(rem_vld,
            # the record in the FIFO is consumed together with the last beat
            fifo_out.rd(inp.valid & outp.ready & rem._eq(0)),
            If(inp.valid & outp.ready,
                # now processing next beat
                If(rem != 0,
                    # this was not the last beat
                    rem(rem - 1)
                ).Elif(fifo_out.vld,
                    # this was the last beat and we can directly start new one
                    rem(fifo_out.len),
                    id_tmp(fifo_out.id) if self.ID_WIDTH else [],
                ).Else(
                    # this was the last beat and there is no next transaction
                    rem_vld(0),
                )
            ),
        ).Else(
            # in idle store the information from the FIFO
            rem(fifo_out.len),
            id_tmp(fifo_out.id) if self.ID_WIDTH else [],
            rem_vld(fifo_out.vld),
            fifo_out.rd(1),
        )

        # the signals which are driven in this function and which have to be
        # excluded from the connection of the rest of the signals
        already_connected = {outp.valid, outp.ready}
        if self.ID_WIDTH:
            outp.id(id_tmp)
            already_connected.add(outp.id)
        if hasattr(outp, "last"):
            outp.last(rem._eq(0) & rem_vld)
            already_connected.add(outp.last)

        outp(inp, exclude=already_connected)

    @override
    def hwImpl(self):
        """
        Connect the ``s`` port through the ``in_reg``, generate the logic
        for each channel between ``in_reg`` and ``out_reg`` and connect
        the ``out_reg`` to the ``m`` port.
        """
        # m: internal side of the input buffer, s: internal side of the output buffer
        m, s = self.in_reg.m, self.out_reg.s
        w_fifo, r_fifo = self.w_req_fifo, self.r_req_fifo
        propagateClkRstn(self)

        self.in_reg.s(self.s)

        self.gen_addr_logic(m.ar, s.ar, r_fifo.dataIn)
        self.gen_addr_logic(m.aw, s.aw, w_fifo.dataIn)
        self.gen_w_logic(m.w, s.w)
        # r is passed for each beat, b only for the last beat of the original transaction
        self.gen_b_or_r_logic(s.r, m.r, r_fifo.dataOut, False)
        self.gen_b_or_r_logic(s.b, m.b, w_fifo.dataOut, True)

        self.m(self.out_reg.m)
if __name__ == "__main__":
    # Executed only if this file is run as a script:
    # build the bridge with the default arguments and print
    # the string which to_rtl_str() returns for it.
    from hwt.synth import to_rtl_str

    print(to_rtl_str(Axi_to_AxiLite()))