#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import List, Tuple, Union
from hwt.code import If
from hwt.constants import NOT_SPECIFIED
from hwt.hwIOs.hwIOStruct import HwIO_to_HdlType
from hwt.hwIOs.utils import addClkRstn
from hwt.hwParam import HwParam
from hwt.pyUtils.arrayQuery import iter_with_last
from hwt.pyUtils.typingFuture import override
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.interfaceLevel.hwModuleImplHelpers import getSignalName
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.handshaked.compBase import HandshakedCompBase
[docs]
@serializeParamsUniq
class HandshakedReg(HandshakedCompBase):
"""
Register for HwIODataRdVld interfaces
:note: latency and delay can be specified as well as HwIO class
:note: if LATENCY == (1, 2) or (0, 1) the ready chain is broken.
That there is an extra register for potential data overflow and
no combinational path between input ready, valid and output ready/valid exists.
The variant with latency (0, 1) known as Skid buffer.
:note: if this generates Skid buffer,
the ready chain is broken.
:ivar INIT_DATA: a reset value of register (data is transferred from this register after reset)
(an item for each stage of register, typically just 1 item)
e.g. if register has latency=1 and interface has just data:uint8_t signal
the INIT_DATA will be in format ((0,),)
.. hwt-autodoc:: _example_HandshakedReg
"""
@override
def hwConfig(self):
HandshakedCompBase.hwConfig(self)
self.LATENCY: Union[int, Tuple[int, int]] = HwParam(1)
self.DELAY = HwParam(0)
self.INIT_DATA: tuple = HwParam(())
@override
def hwDeclr(self):
addClkRstn(self)
with self._hwParamsShared():
self.dataIn = self.hwIOCls()
self.dataOut = self.hwIOCls()._m()
[docs]
def _implSkidBuff(self, inVld: RtlSignal, inRd: RtlSignal, inData: List[RtlSignal],
outVld: RtlSignal, outRd: RtlSignal):
"""
latency 0-1, dealy 0, pass input directly to output and if output.ready=0, store data
in internal buffer and wait until it is consumed before setting input.ready=1 again.
Breaks ready chain but has combination path on "valid".
https://github.com/j-marjanovic/chisel-stuff/blob/master/example-14-pcie-endpoint/src/main/scala/pcie_endpoint/SkidBuffer.scala
"""
hasInit = False
if self.INIT_DATA:
if len(self.INIT_DATA) > 1:
raise NotImplementedError()
else:
initData = self.INIT_DATA[0]
hasInit = True
else:
initData = [NOT_SPECIFIED for _ in inData]
isOccupied = self._reg("latency1_isOccupied", def_val=hasInit)
regs_we = self._sig('latency1_reg_we')
outData = []
assert len(initData) == len(inData)
for iin, init in zip(inData, initData):
r = self._reg('latency1_reg_' + getSignalName(iin), iin._dtype,
def_val=None if init is NOT_SPECIFIED else init)
If(regs_we,
r(iin)
)
outData.append(isOccupied._ternary(r, iin))
If(isOccupied,
inRd(0),
regs_we(0),
If(outRd,
isOccupied(0)
)
).Else(
inRd(1),
regs_we(inVld & ~outRd),
isOccupied(inVld & ~outRd)
)
outVld(isOccupied | inVld)
return outData
[docs]
def _impl_latency(self, inVld: RtlSignal, inRd: RtlSignal, inData: List[RtlSignal],
outVld: RtlSignal, outRd: RtlSignal, prefix:str, initData: list,
hasInit:bool):
"""
Create a normal handshaked register
:param inVld: input valid signal (1 if producer is sending data)
:param inRd: input ready signal (send 1 if we are ready to receive the data)
:param inData: list of input data signals
:param outVld: output valid signal (1 if we are sending valid data)
:param outRd: output ready signal (1 if consummer is ready to receive the data)
:param prefix: name prefix used for internal signals
:param initData: list of init data for each data signal on iterface
"""
isOccupied = self._reg(prefix + "isOccupied", def_val=hasInit)
regs_we = self._sig(prefix + 'reg_we')
outData = []
assert len(initData) == len(inData)
for iin, init in zip(inData, initData):
r = self._reg(prefix + 'reg_' + getSignalName(iin), HwIO_to_HdlType().apply(iin),
def_val=None if init is NOT_SPECIFIED else init)
If(regs_we,
r(iin)
)
outData.append(r)
If(isOccupied,
inRd(outRd),
regs_we(inVld & outRd),
If(outRd & ~inVld,
isOccupied(0)
)
).Else(
inRd(1),
regs_we(inVld),
isOccupied(inVld)
)
outVld(isOccupied)
return outData
[docs]
def _implLatencyAndDelay(self, inVld: RtlSignal, inRd: RtlSignal, inData: List[RtlSignal],
outVld: RtlSignal, outRd: RtlSignal, prefix: str, initData: list, hasInit:bool):
"""
Create a register pipe
:see: For param description see :meth:`HandshakedReg._impl_latency`
"""
wordLoaded = self._reg(prefix + "wordLoaded", def_val=hasInit)
If(wordLoaded,
wordLoaded(~outRd)
).Else(
wordLoaded(inVld)
)
outData = []
# construct delay register for each signal
assert len(initData) == len(inData)
for iin, init in zip(inData, initData):
r = self._reg('reg_' + getSignalName(iin), iin._dtype,
def_val=None if init is NOT_SPECIFIED else init)
If(~wordLoaded,
r(iin)
)
outData.append(r)
inRd(~wordLoaded)
outVld(wordLoaded)
return outData
[docs]
def _implReadyChainBreak(self, in_vld: RtlSignal, in_rd: RtlSignal, in_data: List[RtlSignal],
out_vld: RtlSignal, out_rd: RtlSignal, prefix: str, initData: list, hasInit:bool):
"""
Two sets of registers 0. is prioritized 1. is used as a backup
The in_rd is not combinationally connected to out_rd
The out_vld is not combinationally connected to in_vld
:see: For param description see :meth:`HandshakedReg._impl_latency`
"""
occupied = [self._reg(f"{prefix}occupied_{i:d}", def_val=hasInit if i == 0 else 0) for i in range(2)]
reader_prio = self._reg(f"{prefix}reader_prio", def_val=0)
consume_0 = (reader_prio._eq(0) & occupied[0]) | ~occupied[1]
consume_1 = (reader_prio._eq(1) & occupied[1]) | ~occupied[0]
outData = []
assert len(initData) == len(in_data), (initData, in_data)
for iin, init in zip(in_data, initData):
r0 = self._reg(prefix + 'reg0_' + getSignalName(iin), iin._dtype,
def_val=None if init is NOT_SPECIFIED else init)
If(in_vld & ~occupied[0],
r0(iin)
)
r1 = self._reg(prefix + 'reg1_' + getSignalName(iin), iin._dtype)
If(in_vld & ~occupied[1],
r1(iin)
)
o = self._sig(prefix + 'out_tmp_' + getSignalName(iin), iin._dtype)
If(consume_0 & occupied[0],
o(r0)
).Elif(consume_1 & occupied[1],
o(r1)
).Else(
o(None)
)
outData.append(o)
If(in_vld & (~occupied[0] | ~occupied[1]),
If(occupied[0],
reader_prio(0),
).Elif(occupied[1],
reader_prio(1),
)
)
oc0_set = in_vld & ~occupied[0]
oc0_clr = out_rd & occupied[0] & consume_0
oc1_set = in_vld & occupied[0] & ~occupied[1]
oc1_clr = out_rd & occupied[1] & consume_1
occupied[0]((occupied[0] | oc0_set) & ~oc0_clr)
occupied[1]((occupied[1] | oc1_set) & ~oc1_clr)
in_rd(~occupied[0] | ~occupied[1])
out_vld(occupied[0] | occupied[1])
return outData
@override
def hwImpl(self):
LATENCY = self.LATENCY
DELAY = self.DELAY
vld = self.get_valid_signal
rd = self.get_ready_signal
data = self.get_data
Out = self.dataOut
In = self.dataIn
no_init = [NOT_SPECIFIED for _ in data(In)]
hasInit = False
if LATENCY == (0, 1):
# ready chain break, skid buffer
if DELAY != 0:
raise NotImplementedError()
in_vld, in_rd, in_data = vld(In), rd(In), data(In)
out_vld, out_rd = vld(Out), rd(Out)
outData = self._implSkidBuff(in_vld, in_rd, in_data, out_vld, out_rd)
elif LATENCY == (0, 1) or LATENCY == (1, 2):
# ready chain break, similar to FIFO
if DELAY != 0:
raise NotImplementedError()
in_vld, in_rd, in_data = vld(In), rd(In), data(In)
out_vld, out_rd = vld(Out), rd(Out)
if self.INIT_DATA:
if len(self.INIT_DATA) > 1:
raise NotImplementedError()
else:
init = self.INIT_DATA[0]
hasInit = True
else:
init = no_init
outData = self._implReadyChainBreak(in_vld, in_rd, in_data, out_vld, out_rd,
"ready_chain_break_", init, hasInit)
elif DELAY == 0:
assert len(self.INIT_DATA) <= self.LATENCY
in_vld, in_rd, in_data = vld(In), rd(In), data(In)
for last, i in iter_with_last(range(LATENCY)):
if last:
out_vld, out_rd = vld(Out), rd(Out)
else:
out_vld = self._sig("latency%d_vld" % (i + 1))
out_rd = self._sig("latency%d_rd" % (i + 1))
try:
init = self.INIT_DATA[i]
hasInit = True
except IndexError:
init = no_init
outData = self._impl_latency(in_vld, in_rd, in_data,
out_vld, out_rd,
f"latency{i:d}_", init, hasInit)
in_vld, in_rd, in_data = out_vld, out_rd, outData
elif LATENCY == 2 and DELAY == 1:
latency1_vld = self._sig("latency1_vld")
latency1_rd = self._sig("latency1_rd")
if self.INIT_DATA:
if len(self.INIT_DATA) == 1:
init0 = self.INIT_DATA[0]
hasInit = True
else:
init0, init1 = self.INIT_DATA
hasInit = True
else:
init0 = init1 = no_init
outData = self._impl_latency(vld(In), rd(In), data(In),
latency1_vld, latency1_rd,
"latency1_", init0, hasInit)
outData = self._implLatencyAndDelay(latency1_vld, latency1_rd,
outData, vld(Out), rd(Out),
"latency2_delay1_", init1, hasInit)
else:
raise NotImplementedError(LATENCY, DELAY)
for ds, dm in zip(data(Out), outData):
ds(dm)
[docs]
def _example_HandshakedReg():
from hwt.hwIOs.std import HwIODataRdVld
m = HandshakedReg(HwIODataRdVld)
return m
if __name__ == "__main__":
from hwt.synth import to_rtl_str
m = _example_HandshakedReg()
print(to_rtl_str(m))