from builtins import isinstance
from copy import copy
from math import ceil
from typing import Union, Tuple
from hwt.code import Switch, Concat
from hwt.hdl.constants import INTF_DIRECTION
from hwt.hdl.frameTmpl import FrameTmpl
from hwt.hdl.transTmpl import TransTmpl
from hwt.hdl.types.array import HArray
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.hdlType import HdlType
from hwt.hdl.types.struct import HStruct, HStructField
from hwt.hdl.types.structUtils import field_path_get_type, HdlType_select
from hwt.interfaces.intf_map import IntfMap_get_by_field_path, IntfMap, \
walkStructIntfAndIntfMap, HTypeFromIntfMap
from hwt.interfaces.std import BramPort_withoutClk, RegCntrl, Signal, VldSynced
from hwt.interfaces.structIntf import StructIntf
from hwt.interfaces.unionIntf import UnionSink, UnionSource
from hwt.interfaces.utils import addClkRstn
from hwt.math import log2ceil, inRange, isPow2
from hwt.synthesizer.hObjList import HObjList
from hwt.synthesizer.rtlLevel.mainBases import RtlSignalBase
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.typePath import TypePath
from hwt.synthesizer.unit import Unit
from hwtLib.abstract.addressStepTranslation import AddressStepTranslation
from ipCorePackager.constants import DIRECTION
[docs]def TransTmpl_get_min_addr(t: TransTmpl):
if not t.children:
return t.bitAddr
elif isinstance(t.children, list) and t.children:
for c in t.children:
r = TransTmpl_get_min_addr(c)
if r is not None:
return r
return None
else:
return TransTmpl_get_min_addr(t.children)
[docs]def TransTmpl_get_max_addr(t: TransTmpl):
if t.itemCnt is None:
offset = 0
else:
item_size = t.bit_length() // t.itemCnt
offset = t.bitAddr + (t.itemCnt - 1) * item_size
if not t.children:
return t.bitAddrEnd
elif isinstance(t.children, list) and t.children:
for c in reversed(t.children):
r = TransTmpl_get_max_addr(c)
if r is not None:
return offset + r
return None
else:
return offset + TransTmpl_get_max_addr(t.children)
[docs]class BusEndpoint(Unit):
"""
Abstract unit
Delegate request from bus to fields of structure
(fields are represented by various interfaces)
write has higher priority
:note: implementation is usually address decoder
.. figure:: ./_static/BusEndpoint.png
"""
[docs] def __init__(self, structTemplate, intfCls=None, shouldEnterFn=None):
"""
:param structTemplate: instance of HStruct which describes
address space of this endpoint
:param intfCls: class of bus interface which should be used
:param shouldEnterFn: function(root_t, structFieldPath) return (shouldEnter, shouldUse)
where shouldEnter is flag that means iterator over this interface
should look inside of this actual object
and shouldUse flag means that this field should be used
(to create interface)
"""
assert intfCls is not None, "intfCls has to be specified"
self._intfCls = intfCls
self.STRUCT_TEMPLATE = structTemplate
if shouldEnterFn is None:
self.shouldEnterFn = self._defaultShouldEnterFn
else:
self.shouldEnterFn = shouldEnterFn
Unit.__init__(self)
[docs] @staticmethod
def _defaultShouldEnterFn(root: HdlType, field_path: Tuple[Union[str, int]]):
"""
Default method which resolves how the parts of input data type
should be represented on interface level.
"""
t = field_path_get_type(root, field_path)
isNonPrimitiveArray = isinstance(t, HArray) and\
not isinstance(t.element_t, Bits)
shouldEnter = isinstance(t, HStruct) or isNonPrimitiveArray
shouldUse = not shouldEnter
return shouldEnter, shouldUse
[docs] def _getWordAddrStep(self) -> int:
"""
:return: how many address units is one word on bus (e.g. 32b AXI -> 4)
"""
raise NotImplementedError(
"Should be overridden in concrete implementation, this is abstract class")
[docs] def _getAddrStep(self) -> int:
"""
:return: how many bits does 1 address unit addresses, (e.g. AXI -> 8b, index to uint32_t[N] -> 32)
"""
raise NotImplementedError(
"Should be overridden in concrete implementation, this is abstract class")
def _config(self):
self._intfCls._config(self)
def _declr(self):
addClkRstn(self)
with self._paramsShared():
self.bus = self._intfCls()
self.decoded = StructIntf(
self.STRUCT_TEMPLATE, tuple(),
instantiateFieldFn=self._mkFieldInterface)._m()
[docs] @staticmethod
def intf_for_Bits(t):
if t.const:
t = copy(t)
t.const = False
p = Signal(dtype=t, masterDir=DIRECTION.IN)
else:
p = RegCntrl()
p.DATA_WIDTH = t.bit_length()
return p
[docs] def _mkFieldInterface(self, structIntf: StructIntf, field: HStructField):
"""
Instantiate field interface for fields in structure template of this endpoint
:return: interface for specified field
"""
t = field.dtype
path = structIntf._field_path / field.name
shouldEnter, shouldUse = self.shouldEnterFn(self.STRUCT_TEMPLATE, path)
if shouldUse:
if isinstance(t, Bits):
p = BusEndpoint.intf_for_Bits(t)
elif isinstance(t, HArray):
p = BramPort_withoutClk()
assert isinstance(t.element_t, Bits), t.element_t
p.DATA_WIDTH = t.element_t.bit_length()
p.ADDR_WIDTH = log2ceil(t.size)
else:
raise NotImplementedError(t)
elif shouldEnter:
if isinstance(t, HArray):
e_t = t.element_t
if isinstance(e_t, Bits):
p = HObjList()
for i_i in range(int(t.size)):
i = BusEndpoint.intf_for_Bits(e_t)
structIntf._fieldsToInterfaces[path / i_i] = i
p.append(i)
elif isinstance(e_t, HStruct):
p = HObjList(
StructIntf(t.element_t,
path / i,
instantiateFieldFn=self._mkFieldInterface)
for i in range(int(t.size))
)
for i in p:
i._fieldsToInterfaces = structIntf._fieldsToInterfaces
else:
raise NotImplementedError()
elif isinstance(t, HStruct):
p = StructIntf(t, path,
instantiateFieldFn=self._mkFieldInterface)
p._fieldsToInterfaces = structIntf._fieldsToInterfaces
else:
raise TypeError(t)
return p
[docs] def getPort(self, transTmpl: TransTmpl):
p = tuple(transTmpl.getFieldPath())
return self.decoded._fieldsToInterfaces[p]
[docs] def isInMyAddrRange(self, addr_sig):
return inRange(addr_sig, self._ADDR_MIN, self._ADDR_MAX)
[docs] def _parseTemplate(self):
self.WORD_ADDR_STEP = self._getWordAddrStep()
self.ADDR_STEP = self._getAddrStep()
AW = int(self.ADDR_WIDTH)
SUGGESTED_AW = self._suggestedAddrWidth()
assert SUGGESTED_AW <= AW, ("Address width too small", SUGGESTED_AW, AW)
tmpl = TransTmpl(self.STRUCT_TEMPLATE)
self._ADDR_MIN = TransTmpl_get_min_addr(tmpl) // self.ADDR_STEP
self._ADDR_MAX = ceil(TransTmpl_get_max_addr(tmpl) / self.ADDR_STEP)
# resolve addresses for bram port mapped fields
self._bramPortMapped = []
def shouldEnterFn(trans_tmpl: TransTmpl):
p = trans_tmpl.getFieldPath()
intf = self.decoded._fieldsToInterfaces[p]
if isinstance(intf, (StructIntf, UnionSink, UnionSource, HObjList)):
shouldEnter = True
shouldUse = False
elif isinstance(intf, BramPort_withoutClk):
shouldEnter = False
shouldUse = True
else:
shouldEnter = False
shouldUse = False
return shouldEnter, shouldUse
for ((base, end), t) in tmpl.walkFlatten(shouldEnterFn=shouldEnterFn):
self._bramPortMapped.append(((base, end), t))
# resolve exact addresses for directly mapped field parts
directly_mapped_fields = {}
for p, out in self.decoded._fieldsToInterfaces.items():
if not isinstance(out, (RegCntrl, Signal)):
continue
a = directly_mapped_fields
for _p in p:
if isinstance(_p, int) and _p != 0:
# we need spec only for first array item
break
a = a.setdefault(_p, {})
dmw = self._directly_mapped_words = []
if directly_mapped_fields:
DW = self.DATA_WIDTH
directly_mapped_t = HdlType_select(
self.STRUCT_TEMPLATE,
directly_mapped_fields)
tmpl = TransTmpl(directly_mapped_t)
frames = list(FrameTmpl.framesFromTransTmpl(
tmpl, DW, maxPaddingWords=0,
trimPaddingWordsOnStart=True,
trimPaddingWordsOnEnd=True,))
for f in frames:
f_word_offset = f.startBitAddr // DW
for (w_i, items) in f.walkWords(showPadding=True):
dmw.append((w_i + f_word_offset, items))
[docs] def _suggestedAddrWidth(self):
"""
Based on struct template resolve how many bits for
address is needed
"""
bitSize = self.STRUCT_TEMPLATE.bit_length()
wordAddrStep = self._getWordAddrStep()
addrStep = self._getAddrStep()
# align to word size
if bitSize % wordAddrStep != 0:
bitSize += wordAddrStep - (bitSize % wordAddrStep)
maxAddr = (bitSize // addrStep) - 1
return maxAddr.bit_length()
[docs] def propagateAddr(self, src_addr_sig: RtlSignal,
src_addr_step: int,
dst_addr_sig: RtlSignal,
dst_addr_step: int,
transTmpl: TransTmpl):
"""
:param src_addr_sig: input signal with address
:param src_addr_step: how many bits is addressing one unit of src_addr_sig
:param dst_addr_sig: output signal for address
:param dst_addr_step: how many bits is addressing one unit of dst_addr_sig
:param transTmpl: TransTmpl which has meta-informations
about this address space transition
"""
IN_W = src_addr_sig._dtype.bit_length()
# _prefix = transTmpl.getMyAddrPrefix(src_addr_step)
assert dst_addr_step % src_addr_step == 0
if not isinstance(transTmpl.dtype, HArray):
raise TypeError(transTmpl.dtype)
assert transTmpl.bitAddr % dst_addr_step == 0, (
f"Has to be addressable by address with this step ({transTmpl})")
addrIsAligned = transTmpl.bitAddr % transTmpl.bit_length() == 0 and isPow2(transTmpl.bit_length())
bitsForAlignment = AddressStepTranslation(src_addr_step, dst_addr_step).align_bits
bitsOfSubAddr = (
(transTmpl.bitAddrEnd - transTmpl.bitAddr - 1)
// dst_addr_step
).bit_length()
if addrIsAligned:
bitsOfAddr = bitsOfSubAddr + bitsForAlignment
bitsOfPrefix = IN_W - bitsOfAddr
prefix = (transTmpl.bitAddr // src_addr_step) >> bitsOfAddr
if bitsOfPrefix == 0:
addrIsInRange = True
else:
addrIsInRange = src_addr_sig[:(IN_W - bitsOfPrefix)]._eq(prefix)
addr_tmp = src_addr_sig
else:
_addr = transTmpl.bitAddr // src_addr_step
_addrEnd = transTmpl.bitAddrEnd // src_addr_step
addrIsInRange = inRange(src_addr_sig, _addr, _addrEnd)
addr_tmp = self._sig(dst_addr_sig._name +
"_addr_tmp", Bits(self.ADDR_WIDTH))
addr_tmp(src_addr_sig - _addr)
if bitsOfSubAddr == 0:
# :note: bitsOfSubAddr could be 0 if target array has 1 item
assert dst_addr_sig._dtype.bit_length() == 1, dst_addr_sig._dtype
connectedAddr = dst_addr_sig(0)
else:
addr_h = bitsOfSubAddr + bitsForAlignment
connectedAddr = dst_addr_sig(
addr_tmp[addr_h:bitsForAlignment]
)
return (addrIsInRange, connectedAddr)
[docs] def connect_directly_mapped_read(self, ar_addr: RtlSignal,
r_data: RtlSignal, default_r_data_drive):
"""
Connect the RegCntrl.din interfaces to a bus
"""
DW = int(self.DATA_WIDTH)
ADDR_STEP = self._getAddrStep()
directlyMappedWords = []
for (w_i, items) in self._directly_mapped_words:
w_data = []
last_end = w_i * DW
for tpart in items:
assert last_end == tpart.startOfPart, (last_end, tpart.startOfPart)
if tpart.tmpl is None:
# padding
din = Bits(tpart.bit_length()).from_py(None)
else:
din = self.getPort(tpart.tmpl)
if isinstance(din, RegCntrl):
din = din.din
if din._dtype.bit_length() > 1:
fr = tpart.getFieldBitRange()
din = din[fr[0]:fr[1]]
w_data.append(din)
last_end = tpart.endOfPart
end_of_word = (w_i + 1) * DW
assert last_end == end_of_word, (last_end, end_of_word)
word_val = Concat(*reversed(w_data))
assert word_val._dtype.bit_length() == DW, (items, word_val)
directlyMappedWords.append((w_i * (DW // ADDR_STEP), word_val))
mux = Switch(ar_addr).add_cases(
[(word_i, r_data(val))
for (word_i, val) in directlyMappedWords]
)
if default_r_data_drive:
mux.Default(
default_r_data_drive
)
return mux
[docs] def connect_directly_mapped_write(self, aw_addr: RtlSignal,
w_data: RtlSignal, en: RtlSignal):
"""
Connect the RegCntrl.dout interfaces to a bus
"""
DW = int(self.DATA_WIDTH)
addrWidth = int(self.ADDR_WIDTH)
ADDR_STEP = self._getAddrStep()
for w_i, items in self._directly_mapped_words:
for tpart in items:
if tpart.tmpl is None:
# padding
continue
out = self.getPort(tpart.tmpl)
if not isinstance(out, RegCntrl):
continue
else:
out = out.dout
field_range = tpart.getFieldBitRange()
if field_range != (out.DATA_WIDTH, 0):
raise NotImplementedError("Write on field not aligned to a word boundary", tpart)
bus_range = tpart.getBusWordBitRange()
out.data(w_data[bus_range[0]: bus_range[1]])
addr = w_i * (DW // ADDR_STEP)
out.vld(en & (aw_addr._eq(Bits(addrWidth).from_py(addr))))
[docs] def connectByInterfaceMap(self, interfaceMap: IntfMap):
"""
Connect "decoded" struct interface to interfaces specified
in interface map
"""
assert isinstance(interfaceMap, IntfMap), interfaceMap
# connect interfaces as was specified by register map
for convIntf, intf in walkStructIntfAndIntfMap(self.decoded,
interfaceMap):
if isinstance(intf, Signal):
assert intf._direction == INTF_DIRECTION.MASTER
if isinstance(convIntf, Signal):
convIntf(intf)
else:
convIntf.din(intf)
elif isinstance(intf, RtlSignalBase):
convIntf.din(intf)
elif isinstance(intf, RegCntrl):
assert intf._direction == INTF_DIRECTION.SLAVE
intf(convIntf)
elif isinstance(intf, VldSynced):
assert intf._direction == INTF_DIRECTION.SLAVE
intf(convIntf.dout)
elif isinstance(intf, BramPort_withoutClk):
intf(convIntf)
else:
raise NotImplementedError(intf)
[docs] @classmethod
def fromInterfaceMap(cls, interfaceMap):
"""
Generate converter by struct data type specified by interface map
:param interfaceMap: :func:`hwt.interfaces.intf_map.HTypeFromIntfMap`
"""
t = HTypeFromIntfMap(interfaceMap)
def shouldEnter(root: HdlType, field_path: TypePath):
actual = IntfMap_get_by_field_path(interfaceMap, field_path)
shouldEnter = isinstance(actual, (list, tuple, HStruct)) or (
isinstance(actual, HStructField) and isinstance(actual.dtype, HStruct))
shouldUse = not shouldEnter
return shouldEnter, shouldUse
# instantiate converter
return cls(t, shouldEnterFn=shouldEnter)