Source code for hwtLib.examples.hierarchy.extractHierarchy

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from collections import deque
from itertools import chain
from typing import Union, Optional, Sequence, Dict, Deque, List

from hwt.hdl.const import HConst
from hwt.hdl.operator import HOperatorNode
from hwt.hdl.operatorDefs import EVENT_OPS
from hwt.hdl.portItem import HdlPortItem
from hwt.hdl.statements.statement import HdlStatement
from hwt.hwIOs.std import HwIOClk, HwIORst, HwIORst_n, HwIOSignal
from hwt.pyUtils.setList import SetList
from hwt.hwModule import HwModule
from hwt.synthesizer.rtlLevel.netlist import RtlNetlist
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.rtlLevel.exceptions import SignalDriverErr
from hwtLib.abstract.componentBuilder import AbstractComponentBuilder


[docs] def consumeExpr(e: Union[RtlSignal, HConst], inputs:SetList[RtlSignal], seenObjs: SetList[Union[RtlSignal, HOperatorNode]]): """ To make output code readable we must extract also expressions used in statements if they are private to selected statements. :note: Walk from endpoint to a driver. For every signal on path if all its endpoints are in seenObjs, add this to seenObjs and continue search on its drivers. """ if isinstance(e, HConst): return try: d = e.singleDriver() except SignalDriverErr: seenObjs.append(e) inputs.append(e) return if isinstance(d, (HdlPortItem, HdlStatement)): seenObjs.append(e) inputs.append(e) return else: assert isinstance(d, HOperatorNode), d d: HOperatorNode if d.operator in EVENT_OPS: c = d.operands[0] seenObjs.extend((d, e, c)) inputs.append(c) else: for ep in e._rtlEndpoints: if ep not in seenObjs: inputs.append(e) return inputs.discard(e) seenObjs.append(e) seenObjs.append(d) for d in d.operands: consumeExpr(d, inputs, seenObjs)
[docs] class ExtractedHwModule(HwModule): """ An unit which will extract selected circuit from parent on instantiation. """
[docs] def __init__(self, externInputs: SetList[RtlSignal], externObjsToExtract: SetList[Union[RtlSignal, HOperatorNode]], externOutputs: SetList[RtlSignal], hdlName:Optional[str]=None): HwModule.__init__(self, hdlName=hdlName) self._externInputs = externInputs self._externObjsToExtract = externObjsToExtract self._externOutputs = externOutputs
def hwDeclr(self) -> None: ioMap = self._ioMap = {} for i in self._externInputs: i: RtlSignal hwIO = getattr(i, "_hwIO", None) iCloned = None if hwIO is not None: if isinstance(hwIO, (HwIOClk, HwIORst, HwIORst_n)): iCloned = hwIO.__class__() iCloned._updateHwParamsFrom(hwIO) if iCloned is None: iCloned = HwIOSignal(i._dtype) name = AbstractComponentBuilder(self, None, "")._findSuitableName(i._name, firstWithoutCntrSuffix=True) setattr(self, name, iCloned) assert i not in ioMap ioMap[i] = iCloned for o in self._externOutputs: o: RtlSignal oCloned = HwIOSignal(o._dtype)._m() name = AbstractComponentBuilder(self, None, "")._findSuitableName(o._name, firstWithoutCntrSuffix=True) setattr(self, name, oCloned) assert o not in ioMap ioMap[o] = oCloned
[docs] def _handleUpdateForOutput(self, o: RtlSignal, translation: Dict[RtlSignal, RtlSignal], interOuts: Dict[RtlSignal, RtlSignal], outerIoMap: Dict[RtlSignal, RtlSignal], toTranslate: Deque[Union[HOperatorNode, HdlStatement, HdlPortItem]], ): priv = self._externObjsToExtract translation[o] = o outerIo = outerIoMap.get(o, None) # drop operand cache items for expressions which will not be moved to this netlist # Must replace o with outerIo in every use of o outside of this unit assert o._rtlCtx is self._rtlCtx, o for ep in tuple(o._rtlEndpoints): if ep in priv: toTranslate.append(ep) else: assert outerIo is not None, (o, ep) if isinstance(ep, HdlStatement): ep._replace_input((o, outerIo)) elif isinstance(ep, HdlPortItem): raise NotImplementedError(ep) else: assert isinstance(ep, HOperatorNode), ep if ep in priv: assert o not in ep.perands, (o, ep) assert ep.result._rtlCtx is self._rtlCtx, ep else: # replace operand of the operator node ep._replace_input(o, outerIo) toDrop = [] for k, v in o._usedOps.items(): if isinstance(v, HConst): continue v: RtlSignal if v._rtlCtx is self._rtlCtx: continue try: d = v.singleDriver() except SignalDriverErr: raise NotImplementedError() # if d in priv: # # will be moved later, we can keep it # continue toDrop.append(k) for kToDrop in toDrop: o._usedOps.pop(kToDrop) o._usedOpsAlias.pop(kToDrop).remove(kToDrop) if outerIo is not None: interOuts[o](o) # drive output of this unit
[docs] def _moveOrCopyCircuitFromParent(self, translation: Dict[RtlSignal, RtlSignal], interOuts: Dict[RtlSignal, RtlSignal], outerIoMap: Dict[RtlSignal, RtlSignal]): # BFS move or copy circuit to this unit from parent priv = self._externObjsToExtract toTranslate: Deque[Union[HOperatorNode, HdlStatement, HdlPortItem]] = deque() for i in self._externInputs: i: RtlSignal toTranslate.extend(ep for ep in i._rtlEndpoints if ep in priv) # if the signal is private, it is necessary to mark it as not hidden # in order to provide handle for subexpression replace (_replace_input) while toTranslate: obj = toTranslate.popleft() if obj in translation: # already translated continue if isinstance(obj, HdlStatement): stm: HdlStatement = obj ctx = self._parent._rtlCtx assert stm in ctx.statements newInputs: Optional[List[Union[RtlSignal, HConst]]] = [] for i in stm._inputs: _i = translation.get(i, None) if _i is None: # postpone until all inputs are translated newInputs = None break else: assert _i._rtlCtx is self._rtlCtx, (i, _i) newInputs.append(_i) if newInputs is None: assert toTranslate, stm toTranslate.append(stm) continue for newI, i in zip(newInputs, stm._inputs): if newI is not i: stm._replace_input((i, newI)) for o in stm._outputs: # move signal to a specified netlist o._rtlCtx.signals.remove(o) o._rtlCtx = self._rtlCtx o._rtlCtx.signals.add(o) self._handleUpdateForOutput(o, translation, interOuts, outerIoMap, toTranslate) ctx.statements.remove(stm) self._rtlCtx.statements.add(stm) translation[stm] = stm stm._clean_signal_meta() elif isinstance(obj, HOperatorNode): shouldCopy = obj not in priv or obj.operator in EVENT_OPS if shouldCopy: newOps = [] for op in obj.operands: if isinstance(op, RtlSignal): _op = translation.get(op) if _op is None: # some input not available yet, must postpone translation newOps = None break else: assert _op._rtlCtx is self._rtlCtx newOps.append(_op) else: assert isinstance(op, HConst), op newOps.append(op) if newOps is None: toTranslate.append(obj) continue # create a new copy of this object in this unit newSig = obj.operator._evalFn(*newOps) assert newSig._rtlCtx is self._rtlCtx, newSig assert obj.result._rtlCtx is not self._rtlCtx, obj.result else: # move this object to his unit, move is done by translation of operand # and by move is result signal to the netlist of this unit # move signal to a specified netlist o = obj.result if o._rtlCtx is not self._rtlCtx: o._rtlCtx.signals.remove(o) o._rtlCtx = self._rtlCtx o._rtlCtx.signals.add(o) translated = True for op in tuple(obj.operands): if isinstance(op, RtlSignal): if op._rtlCtx is self._rtlCtx: continue _op = translation.get(op) if _op is None: # some input not available yet, must postpone translation translated = False break else: assert _op._rtlCtx is self._rtlCtx obj._replace_input(op, _op) if not translated: toTranslate.append(obj) self._handleUpdateForOutput(o, translation, interOuts, outerIoMap, toTranslate) newSig = o translation[obj.result] = newSig translation[obj] = newSig.singleDriver() toTranslate.extend(ep for ep in obj.result._rtlEndpoints if ep in priv) outerIo = outerIoMap.get(obj.result, None) if outerIo is not None: outerIo(newSig) else: raise NotImplementedError(obj.__class__, obj)
def hwImpl(self) -> None: # transfer circuit to subunit # re-connect signals which should be connected to IO of new subunit translation = {i: self._ioMap[i]._sig for i in self._externInputs} interOuts = {i: self._ioMap[i]._sig for i in self._externOutputs} self._cleanThisSubunitRtlSignals() super(ExtractedHwModule, self)._signalsForSubHwModuleEntity(self._parent._rtlCtx, "sig_" + self._name) outerIo = {i: self._ioMap[i]._sig for i in chain(self._externInputs, self._externOutputs)} self._moveOrCopyCircuitFromParent(translation, interOuts, outerIo) # connect external input signal to a input of this unit for i in self._externInputs: self._ioMap[i](i) self._outerIo = outerIo
[docs] def _signalsForSubHwModuleEntity(self, context: RtlNetlist, prefix: str): assert context is self._parent._rtlCtx, self assert prefix == "sig_" + self._name, (prefix, self._name) for i in chain(self._externInputs, self._externOutputs): self._ioMap[i]._sig = self._outerIo[i]
[docs] def extractNetlistPartToSubmodule( inputs: SetList[RtlSignal], outputs: SetList[RtlSignal], priv: SetList[Union[RtlSignal, HOperatorNode]]) -> ExtractedHwModule: """ :attention: extraction is executed on unit instantiation :attention: each object in priv must be reachable from inputs """ privOutputs = [] for o in outputs: inputs.discard(o) usedOnlyInternally = True for ep in o._rtlEndpoints: if ep not in priv: usedOnlyInternally = False break if usedOnlyInternally: privOutputs.append(o) for o in privOutputs: outputs.remove(o) for i in inputs: priv.discard(i) return ExtractedHwModule(inputs, priv, outputs)
[docs] def extractRegsToSubmodule(regs: Sequence[RtlSignal]) -> ExtractedHwModule: # resolve IO inputs: SetList[RtlSignal] = SetList() outputs: SetList[RtlSignal] = SetList() priv: SetList[Union[RtlSignal, HOperatorNode]] = SetList() for r in regs: dffStm = r.singleDriver()._cut_off_drivers_of(r) dffNextStm = r._rtlNextSig.singleDriver()._cut_off_drivers_of(r._rtlNextSig) priv.extend((dffStm, dffNextStm)) for stm in (dffStm, dffNextStm): for i in stm._inputs: consumeExpr(i, inputs, priv) outputs.extend(stm._outputs) return extractNetlistPartToSubmodule(inputs, outputs, priv)