Source code for hwtLib.handshaked.reg

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import List, Tuple, Union

from hwt.code import If
from hwt.interfaces.utils import addClkRstn
from hwt.pyUtils.arrayQuery import iter_with_last
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.interfaceLevel.unitImplHelpers import getSignalName
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.constants import NOT_SPECIFIED
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.handshaked.compBase import HandshakedCompBase


[docs]@serializeParamsUniq class HandshakedReg(HandshakedCompBase): """ Register for Handshaked interfaces :note: latency and delay can be specified as well as interface class :note: if LATENCY == (1, 2) the ready chain is broken. That there is an extra register for potential data overflow and no combinational path between input ready, valid and output ready/valid exists. :ivar INIT_DATA: a reset value of register (data is transfered from this register after reset) (an item for each stage of register, typically just 1 item) e.g. if register has latency=1 and interface has just data:uint8_t signal the INIT_DATA will be in format ((0,),) .. hwt-autodoc:: _example_HandshakedReg """ def _config(self): HandshakedCompBase._config(self) self.LATENCY: Union[int, Tuple[int, int]] = Param(1) self.DELAY = Param(0) self.INIT_DATA: tuple = Param(()) def _declr(self): addClkRstn(self) with self._paramsShared(): self.dataIn = self.intfCls() self.dataOut = self.intfCls()._m()
[docs] def _impl_latency(self, inVld: RtlSignal, inRd: RtlSignal, inData: List[RtlSignal], outVld: RtlSignal, outRd: RtlSignal, prefix:str, initData: list, hasInit:bool): """ Create a normal handshaked register :param inVld: input valid signal (1 if producer is sending data) :param inRd: input ready signal (send 1 if we are ready to receive the data) :param inData: list of input data signals :param outVld: output valid signal (1 if we are sending valid data) :param outRd: output ready signal (1 if consummer is ready to receive the data) :param prefix: name prefix used for internal signals :param initData: list of init data for each data signal on iterface """ isOccupied = self._reg(prefix + "isOccupied", def_val=hasInit) regs_we = self._sig(prefix + 'reg_we') outData = [] assert len(initData) == len(inData) for iin, init in zip(inData, initData): r = self._reg(prefix + 'reg_' + getSignalName(iin), iin._dtype, def_val=None if init is NOT_SPECIFIED else init) If(regs_we, r(iin) ) outData.append(r) If(isOccupied, inRd(outRd), regs_we(inVld & outRd), If(outRd & ~inVld, isOccupied(0) ) ).Else( inRd(1), regs_we(inVld), isOccupied(inVld) ) outVld(isOccupied) return outData
[docs] def _implLatencyAndDelay(self, inVld: RtlSignal, inRd: RtlSignal, inData: List[RtlSignal], outVld: RtlSignal, outRd: RtlSignal, prefix: str, initData: list, hasInit:bool): """ Create a register pipe :see: For param description see :meth:`HandshakedReg._impl_latency` """ wordLoaded = self._reg(prefix + "wordLoaded", def_val=hasInit) If(wordLoaded, wordLoaded(~outRd) ).Else( wordLoaded(inVld) ) outData = [] # construct delay register for each signal assert len(initData) == len(inData) for iin, init in zip(inData, initData): r = self._reg('reg_' + getSignalName(iin), iin._dtype, def_val=None if init is NOT_SPECIFIED else init) If(~wordLoaded, r(iin) ) outData.append(r) inRd(~wordLoaded) outVld(wordLoaded) return outData
[docs] def _implReadyChainBreak(self, in_vld: RtlSignal, in_rd: RtlSignal, in_data: List[RtlSignal], out_vld: RtlSignal, out_rd: RtlSignal, prefix: str, initData: list, hasInit:bool): """ Two sets of registers 0. is prioritized 1. is used as a backup The in_rd is not combinationally connected to out_rd The out_vld is not combinationally connected to in_vld :see: For param description see :meth:`HandshakedReg._impl_latency` """ occupied = [self._reg(f"{prefix}occupied_{i:d}", def_val=hasInit if i == 0 else 0) for i in range(2)] reader_prio = self._reg(f"{prefix}reader_prio", def_val=0) consume_0 = (reader_prio._eq(0) & occupied[0]) | ~occupied[1] consume_1 = (reader_prio._eq(1) & occupied[1]) | ~occupied[0] outData = [] assert len(initData) == len(in_data), (initData, in_data) for iin, init in zip(in_data, initData): r0 = self._reg(prefix + 'reg0_' + getSignalName(iin), iin._dtype, def_val=None if init is NOT_SPECIFIED else init) If(in_vld & ~occupied[0], r0(iin) ) r1 = self._reg(prefix + 'reg1_' + getSignalName(iin), iin._dtype) If(in_vld & ~occupied[1], r1(iin) ) o = self._sig(prefix + 'out_tmp_' + getSignalName(iin), iin._dtype) If(consume_0 & occupied[0], o(r0) ).Elif(consume_1 & occupied[1], o(r1) ).Else( o(None) ) outData.append(o) If(in_vld & (~occupied[0] | ~occupied[1]), If(occupied[0], reader_prio(0), ).Elif(occupied[1], reader_prio(1), ) ) oc0_set = in_vld & ~occupied[0] oc0_clr = out_rd & occupied[0] & consume_0 oc1_set = in_vld & occupied[0] & ~occupied[1] oc1_clr = out_rd & occupied[1] & consume_1 occupied[0]((occupied[0] | oc0_set) & ~oc0_clr) occupied[1]((occupied[1] | oc1_set) & ~oc1_clr) in_rd(~occupied[0] | ~occupied[1]) out_vld(occupied[0] | occupied[1]) return outData
def _impl(self): LATENCY = self.LATENCY DELAY = self.DELAY vld = self.get_valid_signal rd = self.get_ready_signal data = self.get_data Out = self.dataOut In = self.dataIn no_init = [NOT_SPECIFIED for _ in data(In)] hasInit = False if LATENCY == (1, 2): # ready chain break if DELAY != 0: raise NotImplementedError() in_vld, in_rd, in_data = vld(In), rd(In), data(In) out_vld, out_rd = vld(Out), rd(Out) if self.INIT_DATA: if len(self.INIT_DATA) > 1: raise NotImplementedError() else: init = self.INIT_DATA[0] hasInit = True else: init = no_init outData = self._implReadyChainBreak(in_vld, in_rd, in_data, out_vld, out_rd, "ready_chain_break_", init, hasInit) elif DELAY == 0: assert len(self.INIT_DATA) <= self.LATENCY in_vld, in_rd, in_data = vld(In), rd(In), data(In) for last, i in iter_with_last(range(LATENCY)): if last: out_vld, out_rd = vld(Out), rd(Out) else: out_vld = self._sig("latency%d_vld" % (i + 1)) out_rd = self._sig("latency%d_rd" % (i + 1)) try: init = self.INIT_DATA[i] hasInit = True except IndexError: init = no_init outData = self._impl_latency(in_vld, in_rd, in_data, out_vld, out_rd, f"latency{i:d}_", init, hasInit) in_vld, in_rd, in_data = out_vld, out_rd, outData elif LATENCY == 2 and DELAY == 1: latency1_vld = self._sig("latency1_vld") latency1_rd = self._sig("latency1_rd") if self.INIT_DATA: if len(self.INIT_DATA) == 1: init0 = self.INIT_DATA[0] hasInit = True else: init0, init1 = self.INIT_DATA hasInit = True else: init0 = init1 = no_init outData = self._impl_latency(vld(In), rd(In), data(In), latency1_vld, latency1_rd, "latency1_", init0, hasInit) outData = self._implLatencyAndDelay(latency1_vld, latency1_rd, outData, vld(Out), rd(Out), "latency2_delay1_", init1, hasInit) else: raise NotImplementedError(LATENCY, DELAY) for ds, dm in zip(data(Out), outData): ds(dm)
[docs]def _example_HandshakedReg(): from hwt.interfaces.std import Handshaked u = HandshakedReg(Handshaked) return u
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = _example_HandshakedReg() print(to_rtl_str(u))