Source code for hwtLib.handshaked.splitCopy

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import And
from hwt.hwIOs.hwIOArray import HwIOArray
from hwt.hwIOs.std import HwIODataRdVld
from hwt.hwParam import HwParam
from hwt.pyUtils.typingFuture import override
from hwtLib.handshaked.compBase import HandshakedCompBase


class HsSplitCopy(HandshakedCompBase):
    """
    Copy a single handshaked input stream to ``OUTPUTS`` identical
    output streams.

    A transaction is made in all interfaces or none of them: the input is
    acknowledged only when every output is ready, and an output is marked
    valid only when the input is valid and all the other outputs are ready.

    :note: combinational

    .. figure:: ./_static/HsSplitCopy.png

    .. hwt-autodoc:: _example_HsSplitCopy
    """

    @override
    def hwConfig(self):
        # number of output streams, 2 unless reconfigured
        self.OUTPUTS = HwParam(2)
        super().hwConfig()

    @override
    def hwDeclr(self):
        with self._hwParamsShared():
            self.dataIn = self.hwIOCls()
            outputCnt = int(self.OUTPUTS)
            self.dataOut = HwIOArray(
                self.hwIOCls() for _ in range(outputCnt)
            )._m()

    @override
    def hwImpl(self):
        readyOf = self.get_ready_signal
        validOf = self.get_valid_signal
        dataOf = self.get_data

        # every output carries the same data signals as the input
        for dst in self.dataOut:
            for srcSig, dstSig in zip(dataOf(self.dataIn), dataOf(dst)):
                dstSig(srcSig)

        # input is ready only if all outputs are ready
        allOutputsReady = And(*[readyOf(dst) for dst in self.dataOut])
        readyOf(self.dataIn)(allOutputsReady)

        for dst in self.dataOut:
            # output is valid if input is valid and every other output
            # is ready (the ready of the output itself is left out)
            conditions = [validOf(self.dataIn)]
            conditions.extend(
                readyOf(other)
                for other in self.dataOut
                if other is not dst
            )
            dstValid = And(*conditions)
            validOf(dst)(dstValid)
def _example_HsSplitCopy():
    """
    Build an example :class:`HsSplitCopy` which uses ``HwIODataRdVld``
    as its interface class and keeps the default configuration.

    :return: the newly constructed component instance
    """
    return HsSplitCopy(HwIODataRdVld)
if __name__ == "__main__":
    # script entry point: convert the example component to RTL text
    # and print it to stdout
    from hwt.synth import to_rtl_str

    m = _example_HsSplitCopy()
    rtlText = to_rtl_str(m)
    print(rtlText)