Source code for hwtLib.examples.hierarchy.extractHierarchy
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from collections import deque
from itertools import chain
from typing import Union, Optional, Sequence, Dict, Deque, List
from hwt.hdl.operator import Operator
from hwt.hdl.operatorDefs import EVENT_OPS
from hwt.hdl.portItem import HdlPortItem
from hwt.hdl.statements.statement import HdlStatement
from hwt.hdl.value import HValue
from hwt.interfaces.std import Clk, Rst, Rst_n, Signal
from hwt.pyUtils.uniqList import UniqList
from hwt.synthesizer.rtlLevel.netlist import RtlNetlist
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.rtlLevel.rtlSyncSignal import RtlSyncSignal
from hwt.synthesizer.rtlLevel.signalUtils.exceptions import SignalDriverErr
from hwt.synthesizer.unit import Unit
from hwtLib.abstract.componentBuilder import AbstractComponentBuilder
def consumeExpr(e: Union[RtlSignal, HValue],
                inputs: UniqList[RtlSignal],
                seenObjs: UniqList[Union[RtlSignal, Operator]]):
    """
    Collect the expression tree which drives ``e`` if it is private to the
    already selected objects, so the extracted code stays readable.

    :note: Walks from an endpoint towards its driver. A signal is added to
        ``seenObjs`` (and the walk continues on the operands of its driver)
        only if every endpoint of that signal is already in ``seenObjs``.
        Otherwise the signal is registered in ``inputs`` and the walk stops.

    :param e: expression to inspect, constants (HValue) are ignored
    :param inputs: collection of signals which are inputs of the selection,
        updated in place
    :param seenObjs: collection of selected signals/operators, updated in place
    """
    if isinstance(e, HValue):
        # constants have no driver and never become an input
        return

    try:
        driver = e.singleDriver()
    except SignalDriverErr:
        # singleDriver() failed for this signal, treat it as an input
        stopHere = True
    else:
        # driven by a port or a statement, the walk ends on this signal
        stopHere = isinstance(driver, (HdlPortItem, HdlStatement))

    if stopHere:
        seenObjs.append(e)
        inputs.append(e)
        return

    assert isinstance(driver, Operator), driver

    if driver.operator in EVENT_OPS:
        # event operator: take it together with its result and the first
        # operand, the first operand is an input of the selection
        firstOperand = driver.operands[0]
        seenObjs.extend((driver, e, firstOperand))
        inputs.append(firstOperand)
        return

    if any(ep not in seenObjs for ep in e.endpoints):
        # used also by something which is not selected, keep it as an input
        inputs.append(e)
        return

    # every endpoint is selected, the expression is private to the selection
    inputs.discard(e)
    seenObjs.append(e)
    seenObjs.append(driver)
    for operand in driver.operands:
        consumeExpr(operand, inputs, seenObjs)
class ExtractedUnit(Unit):
    """
    An unit which will extract selected circuit from parent on instantiation.

    :ivar _externInputs: signals of the parent which are inputs of the
        extracted circuit
    :ivar _externObjsToExtract: signals/operators/statements of the parent
        selected for extraction (referred to as ``priv`` in the methods)
    :ivar _externOutputs: signals of the parent which are outputs of the
        extracted circuit
    :ivar _ioMap: dictionary {external signal: interface of this unit},
        filled in :meth:`_declr`
    :ivar _outerIo: dictionary {external signal: signal stored in ``_sig`` of
        the matching interface after the parent-side signals were created},
        set at the end of :meth:`_impl`
    """

    def __init__(self, externInputs: UniqList[RtlSignal],
                 externObjsToExtract: UniqList[Union[RtlSignal, Operator]],
                 externOutputs: UniqList[RtlSignal],
                 hdl_name_override: Optional[str]=None):
        """
        Only store the description of what should be extracted, the
        extraction itself is performed in :meth:`_impl`.
        """
        Unit.__init__(self, hdl_name_override=hdl_name_override)
        self._externInputs = externInputs
        self._externObjsToExtract = externObjsToExtract
        self._externOutputs = externOutputs

    def _declr(self) -> None:
        """
        Declare an interface of this unit for every external input/output
        and remember the mapping in ``self._ioMap``.
        """
        ioMap = self._ioMap = {}
        for i in self._externInputs:
            i: RtlSignal
            intf = getattr(i, "_interface", None)
            iCloned = None
            if intf is not None:
                if isinstance(intf, (Clk, Rst, Rst_n)):
                    # keep the interface class for clock/reset inputs
                    iCloned = intf.__class__()
                    iCloned._updateParamsFrom(intf)

            if iCloned is None:
                # any other input is represented by a Signal of the same type
                iCloned = Signal(i._dtype)

            name = AbstractComponentBuilder(self, None, "")._findSuitableName(i.name, firstWithoutCntrSuffix=True)
            setattr(self, name, iCloned)
            assert i not in ioMap
            ioMap[i] = iCloned

        for o in self._externOutputs:
            o: RtlSignal
            oCloned = Signal(o._dtype)._m()
            name = AbstractComponentBuilder(self, None, "")._findSuitableName(o.name, firstWithoutCntrSuffix=True)
            setattr(self, name, oCloned)
            assert o not in ioMap
            ioMap[o] = oCloned

    def _handleUpdateForOutput(self, o: RtlSignal,
                               translation: Dict[RtlSignal, RtlSignal],
                               interOuts: Dict[RtlSignal, RtlSignal],
                               outerIoMap: Dict[RtlSignal, RtlSignal],
                               toTranslate: Deque[Union[Operator, HdlStatement, HdlPortItem]],
                               ):
        """
        Finish the move of signal ``o`` to the netlist of this unit.

        * ``o`` is registered in ``translation`` as translated to itself
        * endpoints of ``o`` which are selected for extraction are scheduled
          in ``toTranslate``, every other endpoint is rewritten to use the
          signal from ``outerIoMap`` instead of ``o``
        * items of ``o._usedOps`` whose signal is not in the netlist of this
          unit are dropped
        * if ``o`` has a record in ``outerIoMap`` the signal ``interOuts[o]``
          is driven from ``o``

        :param o: signal which was already moved to the netlist of this unit
        :param translation: dictionary {original object: object in this unit}
        :param interOuts: dictionary {external output: signal called with
            ``o`` to drive the output of this unit}
        :param outerIoMap: dictionary {external input/output: signal which
            replaces it in users which are not extracted}
        :param toTranslate: queue of objects waiting for translation
        :raises NotImplementedError: if a not extracted endpoint is
            a HdlPortItem or if singleDriver() fails for a signal from
            ``o._usedOps``
        """
        priv = self._externObjsToExtract
        translation[o] = o
        outerIo = outerIoMap.get(o, None)
        # drop operand cache items for expressions which will not be moved to this netlist
        # Must replace o with outerIo in every use of o outside of this unit
        assert o.ctx is self._ctx, o
        # iterate over a snapshot, endpoints are modified by _replace_input
        for ep in tuple(o.endpoints):
            if ep in priv:
                toTranslate.append(ep)
            else:
                assert outerIo is not None, (o, ep)
                if isinstance(ep, HdlStatement):
                    ep._replace_input((o, outerIo))
                elif isinstance(ep, HdlPortItem):
                    raise NotImplementedError(ep)
                else:
                    assert isinstance(ep, Operator), ep
                    # NOTE(review): endpoints from priv are handled by the
                    # first branch of this loop, this check looks like
                    # a leftover sanity check - confirm
                    if ep in priv:
                        assert o not in ep.operands, (o, ep)
                        assert ep.result.ctx is self._ctx, ep
                    else:
                        # replace operand of the operator node
                        ep._replace_input(o, outerIo)

        toDrop = []
        for k, v in o._usedOps.items():
            if isinstance(v, HValue):
                continue
            v: RtlSignal
            if v.ctx is self._ctx:
                continue
            try:
                # only a check that the signal has a single driver
                v.singleDriver()
            except SignalDriverErr as err:
                raise NotImplementedError(v) from err
            # if d in priv:
            #    # will be moved later, we can keep it
            #    continue
            toDrop.append(k)

        # removal is postponed because _usedOps can not be modified during iteration
        for kToDrop in toDrop:
            o._usedOps.pop(kToDrop)
            o._usedOpsAlias.pop(kToDrop).remove(kToDrop)

        if outerIo is not None:
            interOuts[o](o)  # drive output of this unit

    def _moveOrCopyCircuitFromParent(self, translation: Dict[RtlSignal, RtlSignal],
                                     interOuts: Dict[RtlSignal, RtlSignal],
                                     outerIoMap: Dict[RtlSignal, RtlSignal]):
        """
        Walk the selected circuit from external inputs (BFS) and move or copy
        every reached statement/operator from the parent to this unit.

        :param translation: dictionary {original object: object in this unit},
            has to contain external inputs, it is extended by every
            translated object
        :param interOuts: passed to :meth:`_handleUpdateForOutput`
        :param outerIoMap: dictionary {external input/output: signal used
            outside of this unit}
        :raises NotImplementedError: if an object in the queue is neither
            HdlStatement nor Operator
        """
        # BFS move or copy circuit to this unit from parent
        priv = self._externObjsToExtract
        toTranslate: Deque[Union[Operator, HdlStatement, HdlPortItem]] = deque()
        for i in self._externInputs:
            i: RtlSignal
            toTranslate.extend(ep for ep in i.endpoints if ep in priv)

        # if the signal is private, it is necessary to mark it as not hidden
        # in order to provide handle for subexpression replace (_replace_input)
        while toTranslate:
            obj = toTranslate.popleft()
            if obj in translation:
                # already translated
                continue

            if isinstance(obj, HdlStatement):
                stm: HdlStatement = obj
                ctx = self._parent._ctx
                assert stm in ctx.statements
                newInputs: Optional[List[Union[RtlSignal, HValue]]] = []
                for i in stm._inputs:
                    _i = translation.get(i, None)
                    if _i is None:
                        # postpone until all inputs are translated
                        newInputs = None
                        break
                    else:
                        assert _i.ctx is self._ctx, (i, _i)
                        newInputs.append(_i)

                if newInputs is None:
                    # there must be something else in the queue which can
                    # provide the missing input, otherwise this never ends
                    assert toTranslate, stm
                    toTranslate.append(stm)
                    continue

                # snapshot of inputs keeps the pairs aligned even if
                # _replace_input updates stm._inputs
                for newI, i in zip(newInputs, tuple(stm._inputs)):
                    if newI is not i:
                        stm._replace_input((i, newI))

                for o in stm._outputs:
                    # move signal to a specified netlist
                    o.ctx.signals.remove(o)
                    o.ctx = self._ctx
                    o.ctx.signals.add(o)
                    self._handleUpdateForOutput(o, translation, interOuts, outerIoMap, toTranslate)

                ctx.statements.remove(stm)
                self._ctx.statements.add(stm)
                translation[stm] = stm
                stm._clean_signal_meta()

            elif isinstance(obj, Operator):
                shouldCopy = obj not in priv or obj.operator in EVENT_OPS
                if shouldCopy:
                    newOps = []
                    for op in obj.operands:
                        if isinstance(op, RtlSignal):
                            _op = translation.get(op)
                            if _op is None:
                                # some input not available yet, must postpone translation
                                newOps = None
                                break
                            else:
                                assert _op.ctx is self._ctx
                                newOps.append(_op)
                        else:
                            assert isinstance(op, HValue), op
                            newOps.append(op)

                    if newOps is None:
                        toTranslate.append(obj)
                        continue

                    # create a new copy of this object in this unit
                    newSig = obj.operator._evalFn(*newOps)
                    assert newSig.ctx is self._ctx, newSig
                    assert obj.result.ctx is not self._ctx, obj.result
                else:
                    # move this object to his unit, move is done by translation of operand
                    # and by move is result signal to the netlist of this unit
                    # move signal to a specified netlist
                    o = obj.result
                    if o.ctx is not self._ctx:
                        o.ctx.signals.remove(o)
                        o.ctx = self._ctx
                        o.ctx.signals.add(o)

                    translated = True
                    for op in tuple(obj.operands):
                        if isinstance(op, RtlSignal):
                            if op.ctx is self._ctx:
                                continue
                            _op = translation.get(op)
                            if _op is None:
                                # some input not available yet, must postpone translation
                                translated = False
                                break
                            else:
                                assert _op.ctx is self._ctx
                                obj._replace_input(op, _op)

                    if not translated:
                        # NOTE(review): obj is queued again, but it is also
                        # stored in translation below, so the queued item is
                        # skipped as already translated - confirm if
                        # a `continue` was intended here
                        toTranslate.append(obj)

                    self._handleUpdateForOutput(o, translation, interOuts, outerIoMap, toTranslate)
                    newSig = o

                translation[obj.result] = newSig
                translation[obj] = newSig.singleDriver()
                toTranslate.extend(ep for ep in obj.result.endpoints if ep in priv)

                outerIo = outerIoMap.get(obj.result, None)
                if outerIo is not None:
                    outerIo(newSig)
            else:
                raise NotImplementedError(obj.__class__, obj)

    def _impl(self) -> None:
        """
        Transfer the selected circuit from the parent to this unit and
        connect it to the IO of this unit.
        """
        # transfer circuit to subunit
        # re-connect signals which should be connected to IO of new subunit
        translation = {i: self._ioMap[i]._sig for i in self._externInputs}
        interOuts = {i: self._ioMap[i]._sig for i in self._externOutputs}
        self._cleanAsSubunit()
        # the override of _signalsForSubUnitEntity in this class only restores
        # the signals, the implementation of the base class has to be used here
        super(ExtractedUnit, self)._signalsForSubUnitEntity(self._parent._ctx, "sig_" + self._name)
        # has to be collected after the previous two calls, at this point
        # _sig is read again for every input/output
        outerIo = {i: self._ioMap[i]._sig for i in chain(self._externInputs, self._externOutputs)}
        self._moveOrCopyCircuitFromParent(translation, interOuts, outerIo)

        # connect external input signal to a input of this unit
        for i in self._externInputs:
            self._ioMap[i](i)

        self._outerIo = outerIo

    def _signalsForSubUnitEntity(self, context: RtlNetlist, prefix: str):
        """
        Do not create any new signal, only set ``_sig`` of every IO interface
        to the signal stored in ``self._outerIo`` by :meth:`_impl`.

        :param context: has to be the netlist of the parent
        :param prefix: has to be "sig_" + name of this unit
        """
        assert context is self._parent._ctx, self
        assert prefix == "sig_" + self._name, (prefix, self._name)
        for i in chain(self._externInputs, self._externOutputs):
            self._ioMap[i]._sig = self._outerIo[i]
def extractNetlistPartToSubunit(
        inputs: UniqList[RtlSignal],
        outputs: UniqList[RtlSignal],
        priv: UniqList[Union[RtlSignal, Operator]]) -> ExtractedUnit:
    """
    Create an :class:`ExtractedUnit` for the specified part of the netlist.

    The collections are normalized in place before the unit is created:

    * an output is never an input
    * an output whose endpoints are all in ``priv`` is not an output
    * an input is never in ``priv``

    :attention: extraction is executed on unit instantiation
    :attention: each object in priv must be reachable from inputs
    """
    internalOnly = []
    for out in outputs:
        inputs.discard(out)
        if all(ep in priv for ep in out.endpoints):
            # nothing outside of the extracted part reads this signal
            internalOnly.append(out)

    # removal is postponed, outputs can not be modified during iteration
    for out in internalOnly:
        outputs.remove(out)

    for inp in inputs:
        priv.discard(inp)

    return ExtractedUnit(inputs, priv, outputs)
def extractRegsToSubunit(regs: Sequence[RtlSyncSignal]) -> ExtractedUnit:
    """
    Resolve inputs, outputs and private objects for the specified registers
    and create an :class:`ExtractedUnit` from them.

    For every register the drivers of the register and of its ``next``
    signal are cut off from their statements, both resulting statements are
    selected for extraction and their input expressions are consumed by
    :func:`consumeExpr`.
    """
    # resolve IO
    inputs: UniqList[RtlSignal] = UniqList()
    outputs: UniqList[RtlSignal] = UniqList()
    priv: UniqList[Union[RtlSignal, Operator]] = UniqList()

    for reg in regs:
        regStms = (
            reg.singleDriver()._cut_off_drivers_of(reg),
            reg.next.singleDriver()._cut_off_drivers_of(reg.next),
        )
        priv.extend(regStms)
        for stm in regStms:
            for stmInput in stm._inputs:
                consumeExpr(stmInput, inputs, priv)
            outputs.extend(stm._outputs)

    return extractNetlistPartToSubunit(inputs, outputs, priv)