Source code for hwtLib.abstract.busInterconnect
from typing import List, Set
from hwt.hdl.constants import READ, WRITE, READ_WRITE
from hwt.math import log2ceil, isPow2
from hwt.synthesizer.param import Param
from hwt.synthesizer.unit import Unit
[docs]class AUTO_ADDR():
"""constant which means that address should be picked automatically"""
pass
"""
:var ALL: constant used to specify that master connected to interconnect has visibility to all slaves
"""
ALL = "ALL"
[docs]class BusInterconnectUtils():
[docs] @classmethod
def _normalize_master_configs(cls, MASTERS, SLAVES):
slave_cnt = len(SLAVES)
masters = []
for m in MASTERS:
if m is ALL:
new_master_config = {
(i, READ_WRITE)
for i in range(slave_cnt)
}
else:
new_master_config = set()
for s in m:
if isinstance(s, int):
access = READ_WRITE
else:
s, access = s
assert s >= 0 and s < slave_cnt, (s, slave_cnt)
assert access in [READ_WRITE, READ, WRITE], access
new_master_config.add((s, access))
masters.append(new_master_config)
return tuple(masters)
[docs] def _normalize_slave_configs(self, SLAVES, data_width=None):
if data_width is None:
data_width = self.DATA_WIDTH
_slaves = []
maxAddr = 0
for address, size in SLAVES:
if not isPow2(size):
# it is not possible to just use prefix bits to select a destination
raise AssertionError(
"Size which is not power of 2 is suboptimal for interconnects")
if address == AUTO_ADDR:
address = maxAddr
isAligned = (address % size) == 0
if not isAligned:
address = ((address // size) + 1) * size
else:
isAligned = (address % size) == 0
if not isAligned:
raise AssertionError(
f"Offset which is not aligned to size is suboptimal 0x{address:x} 0x{size:x}")
maxAddr = max(maxAddr, address + size)
_slaves.append((address, size))
self._slaves = sorted(_slaves, key=lambda x: x[0])
# check for address space colisions
lastAddr = -1
for addr, size in self._slaves:
if lastAddr >= addr:
raise ValueError(
f"Address space on address 0x{addr:x} colliding with previous")
lastAddr = addr + size - 1
return _slaves
[docs] @classmethod
def _assert_non_overlapping(cls, slaves):
offset = 0
slaves = sorted(slaves, key=lambda x: x[0])
for addr, size in slaves:
assert addr >= offset, (addr, size)
offset = addr + size
[docs] @classmethod
def _extract_separable_group(cls, mi: int, seen_m: Set[int],
masters_connected_to: List[Set[int]],
slaves_connected_to: List[Set[int]]):
"""
Transitive enclosure of master/stalave ports on connected relation
(Extract a group of all ports which are somehow connected to each other)
"""
m_to_search = [mi, ]
g_m = set()
g_s = set()
while m_to_search:
mi = m_to_search.pop()
if mi in seen_m:
continue
g_m.add(mi)
seen_m.add(mi)
slvs = masters_connected_to[mi]
g_s.update(slvs)
for s in slvs:
m_to_search.extend(slaves_connected_to[s])
if len(g_m) == 0 and len(g_s) == 0:
return None
else:
return (g_m, g_s)
[docs] @classmethod
def _extract_separable_groups(cls, MASTERS, SLAVES, access_type):
"""
Try to find independent subgraphs in conected master-slave ports
on R/W channel separately
:param access_type: READ or WRITE
"""
masters_connected_to = [{_m[0] for _m in m
if _m[1] == access_type or _m[1] == READ_WRITE}
for m in MASTERS]
slaves_connected_to = [set() for _ in SLAVES]
for mi, slvs in enumerate(masters_connected_to):
for si in slvs:
slaves_connected_to[si].add(mi)
#for si, mstrs in enumerate(slaves_connected_to):
# if not mstrs:
# raise AssertionError(
# f"Slave {si:d} can not communcate with any master")
#for mi, slvs in enumerate(slaves_connected_to):
# if not slvs:
# raise AssertionError(
# "Maser {mi:d} can not communcate with any master")
seen_m = set()
groups = []
for mi in range(len(masters_connected_to)):
g = cls._extract_separable_group(mi, seen_m,
masters_connected_to,
slaves_connected_to)
if g is not None:
groups.append(g)
return groups
[docs]class BusInterconnect(Unit):
"""
Abstract class of bus interconnects
:ivar ~.m: HObjList of master interfaces
:ivar ~.s: HObjList of slave interfaces
"""
def _config(self):
self.SLAVES = Param(tuple())
self.MASTERS = Param(tuple())
self._config_normalized = False
[docs] def _normalize_config(self):
if self._config_normalized:
return
self.SLAVES = BusInterconnectUtils._normalize_slave_configs(
self, self.SLAVES)
BusInterconnectUtils._assert_non_overlapping(self.SLAVES)
self.MASTERS = BusInterconnectUtils._normalize_master_configs(
self.MASTERS, self.SLAVES)
self._config_normalized = True
[docs] def getOptimalAddrSize(self):
if not self._config_normalized:
self._normalize_config()
last = self.SLAVES[-1]
maxAddr = last[0] + last[1]
maxAddr -= self.DATA_WIDTH // 8
assert maxAddr >= 0
return log2ceil(maxAddr)