# Source code for hwtLib.peripheral.ethernet.rmii_adapter

from math import ceil

from hwt.code import If, Concat, rol, Switch, In
from hwt.hdl.operatorDefs import AllOps
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.enum import HEnum
from hwt.interfaces.utils import addClkRstn
from hwt.math import log2ceil
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
from hwtLib.peripheral.ethernet.constants import ETH
from hwtLib.peripheral.ethernet.rmii import Rmii
from hwtLib.peripheral.ethernet.vldsynced_data_err_last import VldSyncedDataErrLast
from pyMathBitPrecise.bit_utils import mask


class RmiiAdapter(Unit):
    """
    Convertor which converts RMII interface to a simple AxiStream/VldSynced interface

    :note: handles CDC, Tx IPG (Inter Packet Gap), RMII Tx/Rx logic,
        add/remove preamble and SFD
    :attention: inside of packet the tx.vld has to stay 1 otherwise
        the underflow error will appear and frame data will be corrupted

    :ivar ~.FREQ: specifies the clk.FREQ for this core, if None it means
        that clk and eth.ref_clk is the same signal and synchronisation
        is not required
    :ivar ~.ASYNC_BUFF_DEPTH: depth of asynchronous buffers between
        eth.ref_clk clock domain and clk domain
        (if set to None the clock domain has to be the same)

    .. hwt-autodoc::
    """

    def _config(self):
        """
        Declare the configuration parameters, both defaulting to None.
        """
        self.FREQ = Param(None)
        # NOTE(review): ASYNC_BUFF_DEPTH is declared here but is not read
        # by any other method of this class - confirm whether the
        # asynchronous buffers are implemented elsewhere.
        self.ASYNC_BUFF_DEPTH = Param(None)

    def _declr(self):
        """
        Declare the interfaces: clk/rst_n, the RMII port ``eth``,
        the ``tx`` AxiStream input and the ``rx`` output.
        """
        addClkRstn(self)
        self.eth = Rmii()._m()
        # without an explicit FREQ the core clock takes the frequency
        # of the RMII interface
        if self.FREQ is None:
            self.clk.FREQ = self.eth.FREQ
        else:
            self.clk.FREQ = self.FREQ

        self.tx = AxiStream()
        self.rx = VldSyncedDataErrLast()._m()
        # both data channels are byte wide
        for ch in [self.rx, self.tx]:
            ch.DATA_WIDTH = 8

    def _tx_ipg_logic(self, clk_edge, tx: AxiStream, tx_last_bits: RtlSignal, RMII_W):
        """
        Tx IPG (Inter Packet Gap) logic, wait 96 bit times before sending next packet

        :param clk_edge: edge of eth.ref_clk used to clock the gap counter
        :param tx: input stream, its valid and last signals reload the counter
        :param tx_last_bits: 1b signal which together with tx.valid & tx.last
            reloads the gap counter
        :param RMII_W: number of bits transferred over RMII per clock cycle
        :return: 1b signal which is 1 when the gap counter is 0
        """
        # number of ref_clk cycles required to transfer 96 bits (minus one
        # because the counter counts down to 0)
        cntr_max = ceil(96 / RMII_W) - 1
        # The register has to hold values 0..cntr_max inclusive, that is
        # cntr_max + 1 distinct values. (_tx_logic uses log2ceil(MAX) for
        # a counter whose largest value is MAX - 1.) For RMII_W in
        # (1, 2, 4, 8) the resulting width is the same as
        # log2ceil(cntr_max), but it does not overflow if cntr_max
        # is a power of two.
        tx_ipg_cntr = self._reg(
            "tx_ipg_cntr",
            Bits(log2ceil(cntr_max + 1), signed=False),
            def_val=cntr_max,
            clk=(self.eth.ref_clk, clk_edge))

        If(tx.valid & tx.last & tx_last_bits,
            # end of frame, restart the gap measurement
            tx_ipg_cntr(cntr_max),
        ).Elif(tx_ipg_cntr != 0,
            tx_ipg_cntr(tx_ipg_cntr - 1)
        )

        tx_ipg_en = self._sig("tx_ipg_en")
        tx_ipg_en(tx_ipg_cntr._eq(0))
        return tx_ipg_en

    def _tx_logic(self, rmii: Rmii, tx: AxiStream, D_W: int, RMII_W: int, clk_edge):
        """
        Tx logic, prepends 7B of preamble and 1B SFD to the data from tx
        and shifts every byte out to rmii.tx, RMII_W bits per clock cycle.

        :param rmii: RMII interface, its ref_clk clocks the registers
            and its tx.d/tx.en signals are driven
        :param tx: input byte stream
        :param D_W: data width of tx
        :param RMII_W: data width of rmii
        :param clk_edge: edge of rmii.ref_clk used for the registers
        """
        # number of RMII transfers per one tx word
        CNTR_W = D_W // RMII_W
        clk = (rmii.ref_clk, clk_edge)

        # shift register with the byte which is being sent
        tx_reg = self._reg("tx_reg", Bits(D_W, signed=False), def_val=0, clk=clk)
        # one bit per RMII transfer which remains in tx_reg,
        # loaded with all ones and shifted together with tx_reg
        tx_vld = self._reg("tx_vld", Bits(CNTR_W, signed=False), def_val=0, clk=clk)
        tx_last_bits = tx_vld._eq(1)

        # last_prev=1 -> sending last byte
        # NOTE(review): no explicit clk is passed here, unlike the other
        # Tx registers - presumably the default clock of the unit is used,
        # confirm that this is intended.
        last_prev = self._reg("tx_last_prev", def_val=0)
        tx_ipg_en = self._tx_ipg_logic(clk_edge, tx, last_prev, RMII_W)

        # 7B preamble + 1B SFD
        PREAMBLE_CNTR_MAX = 7 + 1
        preamble_cntr = self._reg(
            "tx_preamble_cntr",
            dtype=Bits(log2ceil(PREAMBLE_CNTR_MAX), signed=False),
            def_val=PREAMBLE_CNTR_MAX - 1,
            clk=clk)

        # input data is consumed only after the preamble and SFD were
        # scheduled and tx_reg is empty or sends its last bits
        tx_ready = preamble_cntr._eq(0) & (tx_last_bits | tx_vld._eq(0))
        tx.ready(tx_ready)

        If(tx_ready & tx.valid,
            last_prev(tx.last),
        )

        If(tx_ipg_en & tx.valid,
            If(tx_vld._eq(0),
                # start sending new byte
                tx_vld(mask(CNTR_W)),
                If(preamble_cntr._eq(0),
                    tx_reg(tx.data)
                ).Else(
                    tx_reg(ETH.PREAMBLE_1B)
                )
            ).Elif(tx_last_bits,
                # directly start sending next byte
                # (only if it is data from same frame)
                If(last_prev,
                    tx_vld(0),
                ).Else(
                    tx_vld(mask(CNTR_W)),
                ),
                Switch(preamble_cntr).Case(0,
                    # preamble already sent
                    tx_reg(tx.data)
                ).Case(1,
                    tx_reg(ETH.SFD)
                ).Default(
                    tx_reg(ETH.PREAMBLE_1B)
                ),
                If(preamble_cntr != 0,
                    preamble_cntr(preamble_cntr - 1)
                )
            ).Else(
                # continue sending current data from tx_reg
                tx_vld(tx_vld >> 1),
                tx_reg(tx_reg >> RMII_W),
            )
        ).Else(
            # gap is not over or there is no input data: prepare the preamble
            # counter for the next frame and shift out the rest of tx_reg
            preamble_cntr(PREAMBLE_CNTR_MAX - 1),
            tx_vld(tx_vld >> 1),
            tx_reg(tx_reg >> RMII_W),
        )

        # presumably the lowest RMII_W bits of tx_reg (hwt slice hi:lo),
        # tx_reg is shifted right so the data goes out LSB first
        rmii.tx.d(tx_reg[RMII_W:])
        rmii.tx.en(tx_vld != 0)

    def _rx_logic(self, rmii: Rmii, rx: VldSyncedDataErrLast, D_W: int, RMII_W: int, clk_edge):
        """
        Rx logic, collects bytes from rmii.rx, checks and drops
        the preamble and SFD and drives the rx interface.

        :param rmii: RMII interface, its ref_clk clocks the registers
            and its rx.d/rx.crs_dv signals are read
        :param rx: output interface (data, err, last, vld are driven)
        :param D_W: width of the collected word
        :param RMII_W: data width of rmii
        :param clk_edge: edge of rmii.ref_clk used for the registers
        """
        # number of RMII transfers per one rx word
        CNTR_W = D_W // RMII_W
        clk = (rmii.ref_clk, clk_edge)

        # register for all input signals
        din = self._reg("rx_din", Bits(RMII_W), clk=clk)
        din_vld = self._reg("rx_din_vld", clk=clk, def_val=0)
        din(rmii.rx.d)
        din_vld(rmii.rx.crs_dv)

        # register for temporary RX byte
        reg0 = self._reg("rx_reg0", Bits(D_W, signed=False), def_val=0, clk=clk)
        # register for Rx byte for clock edge sync
        reg1 = self._reg("rx_reg1", Bits(D_W, signed=False), def_val=0, clk=rmii.ref_clk)
        # NOTE(review): no explicit clk is passed here while reg1 uses
        # rmii.ref_clk - presumably the default clock of the unit is used,
        # confirm that this is intended.
        reg1_vld = self._reg("rx_reg1_vld", def_val=0)

        # one hot counter with rotating 1
        cntr = self._reg("rx_cntr", Bits(CNTR_W, signed=False), def_val=1, clk=clk)
        # counter of preamble bytes
        preamble_cnt = self._reg("rx_preamble_cntr", Bits(log2ceil(8), signed=False), clk=clk)

        st_t = HEnum("rx_state_t", ["skip0", "preamble_and_sfd", "data", "error"])
        st = self._reg("rx_state", st_t, def_val=st_t.skip0, clk=clk)

        # 1 if valid input data arrives while the rotating 1 of cntr
        # is on the top position
        last_in_B = cntr[CNTR_W - 1] & din_vld
        # byte built from the registered input and the bits collected in reg0
        actual_rx_B = self._sig("rx_actual_val", dtype=Bits(8))
        actual_rx_B(Concat(din, reg0[:RMII_W]))

        Switch(st).Case(st_t.skip0,
            # wait for the first valid non zero input data
            If(din_vld & (din != 0),
                st(st_t.preamble_and_sfd),
                preamble_cnt(8-1),
            ),
        ).Case(st_t.preamble_and_sfd,
            # check each complete byte, the last one has to be SFD
            # and all previous ones the preamble byte
            If(last_in_B,
                If(preamble_cnt._eq(0),
                    If(actual_rx_B != ETH.SFD,
                        st(st_t.error)
                    ).Else(
                        st(st_t.data)
                    )
                ).Elif(actual_rx_B != ETH.PREAMBLE_1B,
                    st(st_t.error),
                ),
                preamble_cnt(preamble_cnt - 1)
            )
        ).Case(st_t.error,
            # stay in error until the input becomes invalid
            If(~din_vld,
                st(st_t.skip0),
            )
        ).Case(st_t.data,
            If(cntr[CNTR_W - 1] & ~din_vld,
                st(st_t.skip0)
            )
        )

        # bit collection logic
        If(st._eq(st_t.skip0),
            cntr(1<<1),
            reg0(Concat(din, Bits(D_W - RMII_W).from_py(0))),
        ).Elif(In(st, [st_t.preamble_and_sfd, st_t.data]),
            # shift in LSB first
            # :note: only D_W - RMII_W bits is captured in reg0
            reg0(Concat(din, reg0[:RMII_W])),
            cntr(rol(cntr, 1)),
        )

        reg1_vld(st._eq(st_t.data) & last_in_B)
        # store the byte when reg1_vld is going to be set
        If(reg1_vld.next,
            reg1(actual_rx_B),
        )

        err = st._eq(st_t.error)
        If(err,
            rx.data(None),
        ).Else(
            rx.data(reg1)
        )
        rx.err(err)
        rx.last(~din_vld)
        rx.vld(reg1_vld | st._eq(st_t.error))

    def _impl(self):
        """
        Check the clock frequencies and data widths and instantiate
        the Tx logic (falling edge of eth.ref_clk) and the Rx logic
        (rising edge of eth.ref_clk).
        """
        assert self.clk.FREQ >= self.eth.ref_clk.FREQ, (
            self.clk.FREQ, self.eth.ref_clk.FREQ,
            "Has to have same or faster frequency "
            "otherwise data will be corrupted during Rx/Tx")
        rmii = self.eth
        RMII_W = rmii.DATA_WIDTH
        D_W = self.tx.DATA_WIDTH
        # each word has to be transferred in a whole number of RMII transfers
        assert D_W % RMII_W == 0, (D_W, RMII_W)

        self._tx_logic(rmii, self.tx, D_W, RMII_W, AllOps.FALLING_EDGE)
        self._rx_logic(rmii, self.rx, D_W, RMII_W, AllOps.RISING_EDGE)
if __name__ == "__main__":
    # script entry: print the RTL generated for the default configuration
    from hwt.synthesizer.utils import to_rtl_str

    u = RmiiAdapter()
    print(to_rtl_str(u))