#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.interfaces.std import Handshaked
from hwt.interfaces.utils import propagateClkRstn
from hwt.math import log2ceil
from hwt.synthesizer.hObjList import HObjList
from hwt.synthesizer.param import Param
from hwtLib.amba.axi4 import Axi4
from hwtLib.amba.axi_comp.interconnect.common import AxiInterconnectCommon
from hwtLib.amba.axi_comp.interconnect.matrixAddrCrossbar import AxiInterconnectMatrixAddrCrossbar
from hwtLib.amba.axi_comp.interconnect.matrixCrossbar import AxiInterconnectMatrixCrossbar
from hwtLib.handshaked.builder import HsBuilder
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.amba.axis_comp.builder import AxiSBuilder
[docs]class AxiInterconnectMatrixCrossbarB(AxiInterconnectMatrixCrossbar):
[docs] def get_last(self, intf):
return 1
[docs]class AxiInterconnectMatrixW(AxiInterconnectCommon):
"""
Write-only AXI3/4/Lite interconnect with supports transaction overlapping
and guarantees the order order of transactions on the bus
.. hwt-autodoc:: example_AxiInterconnectMatrixW
"""
def _config(self):
AxiInterconnectCommon._config(self)
self.AW_AND_W_WORD_TOGETHER = Param(True)
def _declr(self):
AxiInterconnectCommon._declr(self, has_r=False, has_w=True)
masters_for_slave = AxiInterconnectMatrixCrossbar._masters_for_slave(
self.MASTERS, len(self.SLAVES))
# fifo for master index for each slave so slave knows
# which master did read and where is should send it
order_m_index_for_s_data = HObjList()
order_m_index_for_s_b = HObjList()
for connected_masters in masters_for_slave:
if len(connected_masters) > 1:
f_w = HandshakedFifo(Handshaked)
f_b = HandshakedFifo(Handshaked)
for _f in [f_w, f_b]:
_f.DEPTH = self.MAX_TRANS_OVERLAP
_f.DATA_WIDTH = log2ceil(len(self.MASTERS))
else:
f_w, f_b = None, None
order_m_index_for_s_data.append(f_w)
order_m_index_for_s_b.append(f_b)
self.order_m_index_for_s_data = order_m_index_for_s_data
self.order_m_index_for_s_b = order_m_index_for_s_b
# fifo for slave index for each master
# so master knows where it should expect the data
order_s_index_for_m_data = HObjList()
order_s_index_for_m_b = HObjList()
for connected_slaves in self.MASTERS:
if len(connected_slaves) > 1:
f_w = HandshakedFifo(Handshaked)
f_b = HandshakedFifo(Handshaked)
for f in [f_w, f_b]:
f.DEPTH = self.MAX_TRANS_OVERLAP
f.DATA_WIDTH = log2ceil(len(self.SLAVES))
else:
f_w, f_b = None, None
order_s_index_for_m_data.append(f_w)
order_s_index_for_m_b.append(f_b)
self.order_s_index_for_m_data = order_s_index_for_m_data
self.order_s_index_for_m_b = order_s_index_for_m_b
AXI = self.intfCls
with self._paramsShared():
self.addr_crossbar = AxiInterconnectMatrixAddrCrossbar(
AXI.AW_CLS)
with self._paramsShared():
c = self.data_crossbar = AxiInterconnectMatrixCrossbar(
AXI.W_CLS)
c.INPUT_CNT = len(self.MASTERS)
W_OUTPUTS = [set() for _ in self.SLAVES]
for m_i, accessible_slaves in enumerate(self.MASTERS):
for s_i in accessible_slaves:
W_OUTPUTS[s_i].add(m_i)
c.OUTPUTS = W_OUTPUTS
with self._paramsShared():
c = self.b_crossbar = AxiInterconnectMatrixCrossbarB(
AXI.B_CLS)
c.INPUT_CNT = len(self.SLAVES)
c.OUTPUTS = self.MASTERS
def _impl(self):
propagateClkRstn(self)
addr_crossbar = self.addr_crossbar
data_crossbar = self.data_crossbar
b_crossbar = self.b_crossbar
master_addr_channels = HObjList([m.aw for m in self.s])
slave_addr_channels = HObjList([s.aw for s in self.m])
if self.AW_AND_W_WORD_TOGETHER:
slave_addr_channels = HObjList([AxiSBuilder(self, aw, master_to_slave=False).buff(1).end for aw in slave_addr_channels])
addr_crossbar.s(master_addr_channels)
slave_addr_channels(addr_crossbar.m)
master_w_channels = HObjList([m.w for m in self.s])
if self.AW_AND_W_WORD_TOGETHER:
master_w_channels = HObjList([AxiSBuilder(self, w).buff(1).end for w in master_w_channels])
data_crossbar.dataIn(master_w_channels)
slave_w_channels = HObjList([s.w for s in self.m])
slave_w_channels(data_crossbar.dataOut)
master_b_channels = HObjList([m.b for m in self.s])
master_b_channels(b_crossbar.dataOut)
slave_b_channels = HObjList([s.b for s in self.m])
b_crossbar.dataIn(slave_b_channels)
for addr_crossbar_s_index_out, f_w, f_b in zip(
addr_crossbar.order_s_index_for_m_data_out,
self.order_s_index_for_m_data,
self.order_s_index_for_m_b):
if f_w is None:
assert f_b is None
continue
HsBuilder(self, addr_crossbar_s_index_out)\
.split_copy_to(f_w.dataIn, f_b.dataIn)
for f_w, f_b, data_dout_for_din, b_din_for_dout in zip(
self.order_s_index_for_m_data,
self.order_s_index_for_m_b,
data_crossbar.order_dout_index_for_din_in,
b_crossbar.order_din_index_for_dout_in):
if f_w is None:
assert f_b is None
continue
data_dout_for_din(f_w.dataOut)
b_din_for_dout(f_b.dataOut)
for addr_crossbar_m_index_out, f_w, f_b in zip(
addr_crossbar.order_m_index_for_s_data_out,
self.order_m_index_for_s_data,
self.order_m_index_for_s_b):
if f_w is None:
assert f_b is None
assert addr_crossbar_m_index_out is None
continue
HsBuilder(self, addr_crossbar_m_index_out)\
.split_copy_to(f_w.dataIn, f_b.dataIn)
for f_w, f_b, data_din_for_dout, b_dout_for_din in zip(
self.order_m_index_for_s_data,
self.order_m_index_for_s_b,
data_crossbar.order_din_index_for_dout_in,
b_crossbar.order_dout_index_for_din_in):
if f_w is None:
assert f_b is None
assert data_din_for_dout is None
continue
data_din_for_dout(f_w.dataOut)
b_dout_for_din(f_b.dataOut)
[docs]def example_AxiInterconnectMatrixW():
u = AxiInterconnectMatrixW(Axi4)
# u.MASTERS = ({0}, )
# u.MASTERS = ({0, 1}, )
u.MASTERS = ({0, 1}, {0, 1})
# u.MASTERS = ({0, 1, 2}, )
# u.MASTERS = ({0}, {0}, {0}]
# u.SLAVES = ((0x1000, 0x1000),
# (0x2000, 0x1000),
# (0x3000, 0x1000),
# )
# u.SLAVES = ((0x1000, 0x1000), )
# u.MASTERS = ({0, 1}, {0, 1})
u.SLAVES = ((0x1000, 0x1000),
(0x2000, 0x1000),
)
return u
if __name__ == "__main__":
from hwt.synthesizer.utils import to_rtl_str
u = example_AxiInterconnectMatrixW()
print(to_rtl_str(u))