Source code for hwtLib.handshaked.cdc
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If
from hwt.constraints import set_max_delay
from hwt.interfaces.std import Signal, Handshaked
from hwt.interfaces.utils import addClkRst
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.hObjList import HObjList
from hwt.synthesizer.param import Param
from hwt.synthesizer.unit import Unit
from hwtLib.clocking.cdc import CdcPulseGen
from hwtLib.clocking.vldSynced_cdc import VldSyncedCdc
from hwtLib.handshaked.compBase import HandshakedCompBase
[docs]@serializeParamsUniq
class HandshakeFSM(Unit):
"""
.. hwt-autodoc::
"""
def _declr(self):
addClkRst(self)
self.ack = Signal()
self.vld = Signal()
self.rd = Signal()._m()
def _impl(self):
rd = self._reg("rd", def_val=1)
If(rd,
rd(~self.vld)
).Else(
rd(self.ack)
)
self.rd(rd)
[docs]@serializeParamsUniq
class HandshakedCdc(HandshakedCompBase):
"""
CDC (Clock Domain Crossing) for handshaked interface
:note: This component uses syncrhorization by pulse CDCs which means it's throughput is significantly limited
(eta slowest clk / 3)
.. hwt-autodoc:: example_HandshakedCdc
"""
def _config(self):
HandshakedCompBase._config(self)
self.DATA_RESET_VAL = Param(None)
self.IN_FREQ = Param(int(100e6))
self.OUT_FREQ = Param(int(100e6))
def _declr(self):
VldSyncedCdc._declr(self)
ipg = self.in_ack_pulse_gen = CdcPulseGen()
ipg.IN_FREQ = self.OUT_FREQ
ipg.OUT_FREQ = self.IN_FREQ
opg = self.out_en_pulse_gen = CdcPulseGen()
opg.IN_FREQ = self.IN_FREQ
opg.OUT_FREQ = self.OUT_FREQ
self.tx_fsm = HandshakeFSM()
self.rx_fsm = HandshakeFSM()
[docs] def propagate_clk(self, u, reverse=False):
if reverse:
u.dataIn_clk(self.dataOut_clk)
u.dataIn_rst(~self.dataOut_rst_n)
u.dataOut_clk(self.dataIn_clk)
u.dataOut_rst(~self.dataIn_rst_n)
else:
u.dataIn_clk(self.dataIn_clk)
u.dataIn_rst(~self.dataIn_rst_n)
u.dataOut_clk(self.dataOut_clk)
u.dataOut_rst(~self.dataOut_rst_n)
[docs] def propagate_in_clk(self, u):
u.clk(self.dataIn_clk)
u.rst(~self.dataIn_rst_n)
[docs] def propagate_out_clk(self, u):
u.clk(self.dataOut_clk)
u.rst(~self.dataOut_rst_n)
[docs] def create_data_reg(self, name_prefix, clk=None, rst=None):
"""
Create a registers for data signals with default values
from :class:`hwt.synthesizer.unit.Unit` parameters and with specified clk/rst
"""
regs = HObjList()
def_vals = self.DATA_RESET_VAL
d_sigs = self.get_data(self.dataIn)
if def_vals is None or isinstance(def_vals, (int)):
def_vals = [def_vals for _ in d_sigs]
for d, def_val in zip(d_sigs, def_vals):
r = self._reg(name_prefix + d._name,
d._dtype,
def_val=def_val,
clk=clk, rst=rst)
regs.append(r)
return regs
def _impl(self):
vld, rd = self.get_valid_signal, self.get_ready_signal
din = self.dataIn
dout = self.dataOut
din_clk = {
"clk": self.dataIn_clk,
"rst": self.dataIn_rst_n
}
dout_clk = {
"clk": self.dataOut_clk,
"rst": self.dataOut_rst_n
}
din_en = self._reg("din_en", def_val=0, **din_clk)
dout_ack = self._reg("dout_ack", def_val=0, **dout_clk)
din_data = self.create_data_reg("din_", **din_clk)
tx_fsm = self.tx_fsm
self.propagate_in_clk(tx_fsm)
tx_fsm.vld(vld(din))
in_ready = tx_fsm.rd
rd(din)(in_ready)
out_en_gen = self.out_en_pulse_gen
self.propagate_clk(out_en_gen, reverse=False)
dout_en = out_en_gen.dataOut_en
rx_fsm = self.rx_fsm
self.propagate_out_clk(rx_fsm)
rx_fsm.ack(rd(dout))
rx_fsm.vld(dout_en)
dout_vld_n = rx_fsm.rd
next_dataIn = vld(din) & in_ready
If(next_dataIn,
din_data(HObjList(self.get_data(din)))
)
din_en(din_en ^ next_dataIn)
out_en_gen.dataIn(din_en)
dout_data = self.create_data_reg("dout_", **dout_clk)
dout_load_data = ~dout_vld_n & rd(dout)
If(dout_load_data,
dout_data(din_data)
)
FAST_CLK_PERIOD_NS = 1e9 / max(self.dataIn_clk.FREQ, self.dataOut_clk.FREQ)
for d_in, d_out in zip(din_data, dout_data):
set_max_delay(d_in, d_out, FAST_CLK_PERIOD_NS)
HObjList(self.get_data(dout))(dout_data)
dout_vld = ~dout_vld_n
dout_vld_delayed = self._reg("dout_vld_delayed", def_val=0, **dout_clk)
dout_vld_delayed(dout_vld)
dout_vld = dout_vld_delayed
vld(dout)(dout_vld)
dout_ack(dout_ack ^ dout_load_data)
in_ack_gen = self.in_ack_pulse_gen
self.propagate_clk(in_ack_gen, reverse=True)
in_ack_gen.dataIn(dout_ack)
din_ack = in_ack_gen.dataOut_en
tx_fsm.ack(din_ack)
[docs]def example_HandshakedCdc():
u = HandshakedCdc(Handshaked)
u.IN_FREQ = int(100e6)
u.OUT_FREQ = int(200e6)
return u
if __name__ == "__main__":
from hwt.synthesizer.utils import to_rtl_str
u = example_HandshakedCdc()
print(to_rtl_str(u))