Source code for hwtLib.amba.axi_comp.interconnect.matrixW

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Optional

from hwt.hObjList import HObjList
from hwt.hwIOs.hwIOArray import HwIOArray
from hwt.hwIOs.std import HwIODataRdVld
from hwt.hwIOs.utils import propagateClkRstn
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwtLib.amba.axi4 import Axi4
from hwtLib.amba.axi_comp.interconnect.common import AxiInterconnectCommon
from hwtLib.amba.axi_comp.interconnect.matrixAddrCrossbar import AxiInterconnectMatrixAddrCrossbar
from hwtLib.amba.axi_comp.interconnect.matrixCrossbar import AxiInterconnectMatrixCrossbar
from hwtLib.amba.axis_comp.builder import Axi4SBuilder
from hwtLib.handshaked.builder import HsBuilder
from hwtLib.handshaked.fifo import HandshakedFifo


[docs] class AxiInterconnectMatrixCrossbarB(AxiInterconnectMatrixCrossbar):
[docs] @override def get_last(self, hwIO): return 1
[docs] class AxiInterconnectMatrixW(AxiInterconnectCommon): """ Write-only AXI3/4/Lite interconnect with supports transaction overlapping and guarantees the order order of transactions on the bus .. hwt-autodoc:: example_AxiInterconnectMatrixW """ @override def hwConfig(self): AxiInterconnectCommon.hwConfig(self) self.AW_AND_W_WORD_TOGETHER = HwParam(True) @override def hwDeclr(self): AxiInterconnectCommon.hwDeclr(self, has_r=False, has_w=True) masters_for_slave = AxiInterconnectMatrixCrossbar._masters_for_slave( self.MASTERS, len(self.SLAVES)) # fifo for master index for each slave so slave knows # which master did read and where is should send it order_m_index_for_s_data: HObjList[Optional[HandshakedFifo]] = HObjList() order_m_index_for_s_b: HObjList[Optional[HandshakedFifo]] = HObjList() for connected_masters in masters_for_slave: if len(connected_masters) > 1: f_w = HandshakedFifo(HwIODataRdVld) f_b = HandshakedFifo(HwIODataRdVld) for _f in [f_w, f_b]: _f.DEPTH = self.MAX_TRANS_OVERLAP _f.DATA_WIDTH = log2ceil(len(self.MASTERS)) else: f_w, f_b = None, None order_m_index_for_s_data.append(f_w) order_m_index_for_s_b.append(f_b) self.order_m_index_for_s_data = order_m_index_for_s_data self.order_m_index_for_s_b = order_m_index_for_s_b # fifo for slave index for each master # so master knows where it should expect the data order_s_index_for_m_data: HObjList[Optional[HandshakedFifo]] = HObjList() order_s_index_for_m_b: HObjList[Optional[HandshakedFifo]] = HObjList() for connected_slaves in self.MASTERS: if len(connected_slaves) > 1: f_w = HandshakedFifo(HwIODataRdVld) f_b = HandshakedFifo(HwIODataRdVld) for f in [f_w, f_b]: f.DEPTH = self.MAX_TRANS_OVERLAP f.DATA_WIDTH = log2ceil(len(self.SLAVES)) else: f_w, f_b = None, None order_s_index_for_m_data.append(f_w) order_s_index_for_m_b.append(f_b) self.order_s_index_for_m_data = order_s_index_for_m_data self.order_s_index_for_m_b = order_s_index_for_m_b AXI = self.hwIOCls with self._hwParamsShared(): self.addr_crossbar = AxiInterconnectMatrixAddrCrossbar( AXI.AW_CLS) with self._hwParamsShared(): c = self.data_crossbar = AxiInterconnectMatrixCrossbar( AXI.W_CLS) c.INPUT_CNT = len(self.MASTERS) W_OUTPUTS = [set() for _ in self.SLAVES] for m_i, accessible_slaves in enumerate(self.MASTERS): for s_i in accessible_slaves: W_OUTPUTS[s_i].add(m_i) c.OUTPUTS = W_OUTPUTS with self._hwParamsShared(): c = self.b_crossbar = AxiInterconnectMatrixCrossbarB( AXI.B_CLS) c.INPUT_CNT = len(self.SLAVES) c.OUTPUTS = self.MASTERS @override def hwImpl(self): propagateClkRstn(self) addr_crossbar = self.addr_crossbar data_crossbar = self.data_crossbar b_crossbar = self.b_crossbar master_addr_channels = HwIOArray(m.aw for m in self.s) slave_addr_channels = HwIOArray(s.aw for s in self.m) if self.AW_AND_W_WORD_TOGETHER: slave_addr_channels = HwIOArray([Axi4SBuilder(self, aw, master_to_slave=False).buff(1).end for aw in slave_addr_channels]) addr_crossbar.s(master_addr_channels) slave_addr_channels(addr_crossbar.m) master_w_channels = HwIOArray(m.w for m in self.s) if self.AW_AND_W_WORD_TOGETHER: master_w_channels = HwIOArray(Axi4SBuilder(self, w).buff(1).end for w in master_w_channels) data_crossbar.dataIn(master_w_channels) slave_w_channels = HwIOArray(s.w for s in self.m) slave_w_channels(data_crossbar.dataOut) master_b_channels = HwIOArray(m.b for m in self.s) master_b_channels(b_crossbar.dataOut) slave_b_channels = HwIOArray(s.b for s in self.m) b_crossbar.dataIn(slave_b_channels) for addr_crossbar_s_index_out, f_w, f_b in zip( addr_crossbar.order_s_index_for_m_data_out, self.order_s_index_for_m_data, self.order_s_index_for_m_b): if f_w is None: assert f_b is None continue HsBuilder(self, addr_crossbar_s_index_out)\ .split_copy_to(f_w.dataIn, f_b.dataIn) for f_w, f_b, data_dout_for_din, b_din_for_dout in zip( self.order_s_index_for_m_data, self.order_s_index_for_m_b, data_crossbar.order_dout_index_for_din_in, b_crossbar.order_din_index_for_dout_in): if f_w is None: assert f_b is None continue data_dout_for_din(f_w.dataOut) b_din_for_dout(f_b.dataOut) for addr_crossbar_m_index_out, f_w, f_b in zip( addr_crossbar.order_m_index_for_s_data_out, self.order_m_index_for_s_data, self.order_m_index_for_s_b): if f_w is None: assert f_b is None assert addr_crossbar_m_index_out is None continue HsBuilder(self, addr_crossbar_m_index_out)\ .split_copy_to(f_w.dataIn, f_b.dataIn) for f_w, f_b, data_din_for_dout, b_dout_for_din in zip( self.order_m_index_for_s_data, self.order_m_index_for_s_b, data_crossbar.order_din_index_for_dout_in, b_crossbar.order_dout_index_for_din_in): if f_w is None: assert f_b is None assert data_din_for_dout is None continue data_din_for_dout(f_w.dataOut) b_dout_for_din(f_b.dataOut)
[docs] def example_AxiInterconnectMatrixW(): m = AxiInterconnectMatrixW(Axi4) # m.MASTERS = ({0}, ) # m.MASTERS = ({0, 1}, ) m.MASTERS = ({0, 1}, {0, 1}) # m.MASTERS = ({0, 1, 2}, ) # m.MASTERS = ({0}, {0}, {0}] # m.SLAVES = ((0x1000, 0x1000), # (0x2000, 0x1000), # (0x3000, 0x1000), # ) # m.SLAVES = ((0x1000, 0x1000), ) # m.MASTERS = ({0, 1}, {0, 1}) m.SLAVES = ((0x1000, 0x1000), (0x2000, 0x1000), ) return m
if __name__ == "__main__": from hwt.synth import to_rtl_str m = example_AxiInterconnectMatrixW() print(to_rtl_str(m))