#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Dict, Set, Optional
from hwt.code import Or, Switch
from hwt.code_utils import rename_signal
from hwt.hwIO import HwIO
from hwt.hwIOs.hwIOArray import HwIOArray
from hwt.hwIOs.std import HwIODataRdVld
from hwt.hwIOs.utils import addClkRstn, propagateClkRstn
from hwt.hwModule import HwModule
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwtLib.amba.axi4 import Axi4
[docs]
class AxiInterconnectMatrixCrossbar(HwModule):
"""
Crossbar for AXI-Stream like interfaces where internal switch box
can be driven by
.. hwt-autodoc:: example_AxiInterconnectMatrixCrossbar
"""
[docs]
def __init__(self, hwIOCls, hdlName:Optional[str]=None):
self.hwIOCls = hwIOCls
super(AxiInterconnectMatrixCrossbar, self).__init__(hdlName=hdlName)
[docs]
@staticmethod
def _masters_for_slave(masters, slave_cnt) -> Dict[int, Set[int]]:
masters_for_slave = [set() for _ in range(slave_cnt)]
for m_i, slvs in enumerate(masters):
for s_i in slvs:
masters_for_slave[s_i].add(m_i)
return masters_for_slave
@override
def hwConfig(self):
self.HWIO_CLS = HwParam(self.hwIOCls)
self.INPUT_CNT = HwParam(1)
# set of input indexes for each output
self.OUTPUTS = HwParam([{0}])
self.hwIOCls.hwConfig(self)
@override
def hwDeclr(self):
addClkRstn(self)
HWIO_CLS = self.hwIOCls
INPUT_CNT = self.INPUT_CNT
OUTPUT_CNT = len(self.OUTPUTS)
self.OUTS_FOR_IN = self._masters_for_slave(self.OUTPUTS, self.INPUT_CNT)
with self._hwParamsShared():
self.dataIn = HwIOArray([
HWIO_CLS()
for _ in range(INPUT_CNT)])
with self._hwParamsShared():
self.dataOut = HwIOArray([
HWIO_CLS()
for _ in range(OUTPUT_CNT)])._m()
# master index for each slave so slave knows
# which master did read and where is should send it
order_dout_index_for_din_in = HwIOArray()
for connected_outs in self.OUTS_FOR_IN:
if len(connected_outs) > 1:
f = HwIODataRdVld()
f.DATA_WIDTH = log2ceil(OUTPUT_CNT)
else:
f = None
order_dout_index_for_din_in.append(f)
self.order_dout_index_for_din_in = order_dout_index_for_din_in
order_din_index_for_dout_in = HwIOArray()
# slave index for each master
# so master knows where it should expect the data
for connected_ins in self.OUTPUTS:
if len(connected_ins) > 1:
f = HwIODataRdVld()
f.DATA_WIDTH = log2ceil(INPUT_CNT)
else:
f = None
order_din_index_for_dout_in.append(f)
self.order_din_index_for_dout_in = order_din_index_for_dout_in
[docs]
def get_last(self, hwIO: HwIO):
return hwIO.last
[docs]
def handler_din_rd(self, dataOut_channels,
dataIn_channels,
order_dout_index_for_din,
order_din_index_for_dout):
for din_i, (order_m_for_s, dataIn, connected_outputs) in enumerate(zip(
order_dout_index_for_din, dataIn_channels, self.OUTS_FOR_IN)):
selected_dataOut_ready = []
for dout_i, (d, order_s_for_m) in enumerate(zip(dataOut_channels,
order_din_index_for_dout)):
if dout_i not in connected_outputs:
continue
rd = d.ready
if order_m_for_s is not None:
rd = rd & order_m_for_s.data._eq(dout_i)
if order_s_for_m is not None:
rd = rd & order_s_for_m.data._eq(din_i)\
& order_s_for_m.vld
selected_dataOut_ready.append(rd)
assert selected_dataOut_ready, (
dataIn,
"entirely disconnected from crossbar,"
" this should have been handled before")
selected_dataOut_ready = rename_signal(
self, Or(*selected_dataOut_ready),
f"dataIn_{din_i:d}_selected_dataOut_ready")
if order_m_for_s is not None:
dataIn.ready(order_m_for_s.vld & selected_dataOut_ready)
order_m_for_s.rd(dataIn.valid & self.get_last(dataIn) &
selected_dataOut_ready)
else:
dataIn.ready(selected_dataOut_ready)
[docs]
def handler_dout_vld(self, dataOut_channels,
dataIn_channels,
order_dout_index_for_din,
order_din_index_for_dout):
for dout_i, (dataOut, order_s_for_m, connected_inputs) in enumerate(zip(
dataOut_channels, order_din_index_for_dout, self.OUTPUTS)):
selected_dataIn_valid = []
for din_i, (dataIn, order_m_for_s) in enumerate(zip(
dataIn_channels, order_dout_index_for_din)):
if din_i not in connected_inputs:
continue
vld = dataIn.valid
if order_s_for_m is not None:
vld = vld & order_s_for_m.data._eq(din_i)
if order_m_for_s is not None:
vld = vld & order_m_for_s.vld \
& order_m_for_s.data._eq(dout_i)
selected_dataIn_valid.append(vld)
assert selected_dataIn_valid, (dataOut, "entirely disconnected from crossbar, this should have been handled before")
selected_dataIn_valid = rename_signal(
self, Or(*selected_dataIn_valid),
f"dataOut_{dout_i:d}_selected_dataIn_valid")
if order_s_for_m is None:
dataOut.valid(selected_dataIn_valid)
else:
dataOut.valid(selected_dataIn_valid & order_s_for_m.vld)
selected_dataIn_last = []
for din_i, (dataIn, order_m_for_s) in enumerate(zip(
dataIn_channels, order_dout_index_for_din)):
if din_i not in connected_inputs:
continue
last = dataIn.valid & self.get_last(dataIn)
if order_s_for_m is not None:
last = last & order_s_for_m.vld\
& order_s_for_m.data._eq(din_i)
selected_dataIn_last.append(last)
selected_dataIn_last = rename_signal(
self, Or(*selected_dataIn_last),
f"dataOut_{dout_i:d}_selected_dataIn_last")
if order_s_for_m is not None:
order_s_for_m.rd(
selected_dataIn_valid
& selected_dataIn_last
& dataOut.ready)
[docs]
def handler_data_mux(self, dataOut_channels,
dataIn_channels,
order_din_index_for_dout):
for out_i, (dataOut, order_s_for_m) in enumerate(zip(dataOut_channels,
order_din_index_for_dout)):
if order_s_for_m is None:
connected_in = self.OUTPUTS[out_i]
assert len(connected_in) == 1, connected_in
connected_in = list(connected_in)[0]
dataOut(dataIn_channels[connected_in],
exclude={dataOut.valid, dataOut.ready})
else:
cases = []
for si, s in enumerate(dataIn_channels):
cases.append(
(si, dataOut(s, exclude={s.valid, s.ready})))
Switch(order_s_for_m.data)\
.add_cases(cases)\
.Default(
s(None)
for s in dataOut._hwIOs
if s not in {dataOut.valid, dataOut.ready}
)
[docs]
def connection_handler_N_to_M(
self,
dataOut_channels,
dataIn_channels,
order_dout_index_for_din,
order_din_index_for_dout):
self.handler_din_rd(dataOut_channels, dataIn_channels,
order_dout_index_for_din, order_din_index_for_dout)
self.handler_dout_vld(dataOut_channels, dataIn_channels,
order_dout_index_for_din, order_din_index_for_dout)
self.handler_data_mux(dataOut_channels, dataIn_channels,
order_din_index_for_dout)
@override
def hwImpl(self):
propagateClkRstn(self)
dataOut_channels = self.dataOut
dataIn_channels = self.dataIn
# N:M,
# for each master use slave index from order_dout_index_for_din
# to select the slave for each master interface
# and then wait until there is not index of this master
# on top of order_din_index_for_dout
# N:1,
# channel is connected directly to all slaves with data order synchronization
# driven by order_din_index_for_dout
# 1:N, add logic which enables communication with slave only
# if it is selected by address
# 1:1, just connect
self.connection_handler_N_to_M(
dataOut_channels,
dataIn_channels,
self.order_dout_index_for_din_in,
self.order_din_index_for_dout_in)
[docs]
def example_AxiInterconnectMatrixCrossbar():
dut = AxiInterconnectMatrixCrossbar(Axi4.R_CLS)
dut.INPUT_CNT = 2
dut.OUTPUTS = [{0, 1}, {0, 1}]
return dut
if __name__ == "__main__":
from hwt.synth import to_rtl_str
m = example_AxiInterconnectMatrixCrossbar()
print(to_rtl_str(m))