Source code for hwtLib.peripheral.mdio.master

from math import ceil

from hwt.code import FsmBuilder, Concat, If, In
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.enum import HEnum
from hwt.interfaces.agents.handshaked import HandshakedAgent
from hwt.interfaces.std import VectSignal, HandshakeSync, \
    Handshaked
from hwt.interfaces.utils import addClkRstn
from hwt.math import log2ceil
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
from hwt.synthesizer.unit import Unit
from hwtLib.clocking.clkBuilder import ClkBuilder
from hwtLib.peripheral.mdio.intf import Mdio
from hwtSimApi.hdlSimulator import HdlSimulator


[docs]class MdioAddr(Interface): """ .. hwt-autodoc:: """ def _declr(self): self.phy = VectSignal(5) self.reg = VectSignal(5)
[docs]class MdioReq(HandshakeSync): """ MDIO transaction request interface .. hwt-autodoc:: """ def _declr(self): self.opcode = VectSignal(Mdio.OP_W) # R/W self.addr = MdioAddr() self.wdata = VectSignal(Mdio.D_W) HandshakeSync._declr(self)
[docs] def _initSimAgent(self, sim: HdlSimulator): self._ag = MdioReqAgent(sim, self)
[docs]class MdioReqAgent(HandshakedAgent): """ Simulation agent for :class:`.MdioReq` interface """
[docs] def set_data(self, data): i = self.intf if data is None: i.opcode.write(None) i.addr.phy.write(None) i.addr.reg.write(None) i.wdata.write(None) else: opcode, (phyaddr, regaddr), wdata = data i.opcode.write(opcode) i.addr.phy.write(phyaddr) i.addr.reg.write(regaddr) i.wdata.write(wdata)
[docs] def get_data(self): i = self.intf return (i.opcode.read(), (i.addr.phy.read(), i.addr.reg.read()), i.wdata.read())
[docs]def shift_in_msb_first(reg, sig_in): """ Shift data in to register, MSB first """ w = reg._dtype.bit_length() w_in = sig_in._dtype.bit_length() return reg(Concat(reg[w-w_in:], sig_in))
[docs]class MdioMaster(Unit): """ Master for MDIO interface. :ivar ~.FREQ: frequency of input clock :ivar ~.MDIO_FREQ: frequency of output MDIO clock * based on: * https://opencores.org/websvn/filedetails?repname=ethmac10g&path=%2Fethmac10g%2Ftrunk%2Frtl%2Fverilog%2Fmgmt%2Fmdio.v * https://github.com/NetFPGA/netfpga/blob/master/lib/verilog/core/io/mdio/src/nf2_mdio.v .. hwt-autodoc:: """ def _config(self): self.FREQ = Param(int(100e6)) self.MDIO_FREQ = Param(int(2.5e6)) def _declr(self): addClkRstn(self) self.md = Mdio()._m() self.rdata = Handshaked()._m() self.rdata.DATA_WIDTH = Mdio.D_W self.req = MdioReq()
[docs] def _packet_sequence_timer(self, mdio_clk_rising, mdio_clk_falling, rst_n): """ Create timers for all important events in protocol main FSM """ PRE_W = Mdio.PRE_W ADDR_BLOCK_W = Mdio.ADDR_BLOCK_W TA_W = Mdio.TA_W preamble_last = self._sig("preamble_last") addr_block_last = self._sig("addr_block_last") turnarround_last_en = self._sig("turnarround_last_en") data_last = self._sig("data_last") CNTR_MAX = PRE_W + ADDR_BLOCK_W + TA_W + Mdio.D_W - 1 timer = self._reg("packet_sequence_timer", dtype=Bits(log2ceil(CNTR_MAX)), def_val=0, rst=self.rst_n & rst_n) If(mdio_clk_falling, timer(timer + 1) ) preamble_last(mdio_clk_falling & timer._eq(PRE_W - 1)) addr_block_last(mdio_clk_falling & timer._eq(PRE_W + ADDR_BLOCK_W - 1)) turnarround_last_en(timer._eq(PRE_W + ADDR_BLOCK_W + TA_W - 1)) data_last(mdio_clk_rising & timer._eq(CNTR_MAX)) return preamble_last, addr_block_last, turnarround_last_en, data_last
def _impl(self): # timers and other registers CLK_HALF_PERIOD_DIV = ceil(self.clk.FREQ / (self.MDIO_FREQ * 2)) mdio_clk = self._reg("mdio_clk", def_val=0) r_data_vld = self._reg("r_data_vld", def_val=0) req = self.req clk_half_period_en = ClkBuilder(self, self.clk).timer( ("clk_half_period_en", CLK_HALF_PERIOD_DIV)) If(clk_half_period_en, mdio_clk(~mdio_clk), ) mdio_clk_rising = clk_half_period_en & ~mdio_clk mdio_clk_falling = clk_half_period_en & mdio_clk idle = self._sig("idle") preamble_last, addr_block_last, turnarround_last_en, data_last =\ self._packet_sequence_timer( mdio_clk_rising, mdio_clk_falling, ~idle) is_rx = self._reg("is_rx", def_val=0) # protocol FSM st_t = HEnum("st_t", ["idle", "pre", "addr_block", "ta", "data"]) st = FsmBuilder(self, st_t)\ .Trans(st_t.idle, (req.vld & ~r_data_vld, st_t.pre))\ .Trans(st_t.pre, (preamble_last, st_t.addr_block))\ .Trans(st_t.addr_block, (addr_block_last, st_t.ta))\ .Trans(st_t.ta, (turnarround_last_en & ( (is_rx & mdio_clk_falling) | (~is_rx & mdio_clk_rising)), st_t.data))\ .Trans(st_t.data, (data_last, st_t.idle)).stateReg idle(st._eq(st_t.idle)) req.rd(idle & ~r_data_vld) If(idle, is_rx(req.opcode._eq(Mdio.OP.READ)) ) # TX logic md = self.md TX_W = md.ST_W + md.OP_W + md.PA_W + md.RA_W + md.TA_W + md.D_W tx_reg = self._reg("tx_reg", Bits(TX_W)) If(idle & req.vld, tx_reg(Concat(Mdio.ST, req.opcode, req.addr.phy, req.addr.reg, Mdio.TA, req.wdata)) ).Elif(mdio_clk_falling & In(st, [st_t.addr_block, st_t.data, st_t.ta, st_t.data]), tx_reg(tx_reg << 1) ) md.c(mdio_clk) # because MDIO uses open-drain, this means that if t=0 the value of the signal is 1 md.io.o(0) tx_bit = tx_reg[tx_reg._dtype.bit_length() - 1] md.io.t(~idle & ~tx_bit & (st._eq(st_t.addr_block) | (In(st, [st_t.data, st_t.ta]) & ~is_rx)) ) # RX logic rx_reg = self._reg("rx_reg", Bits(md.D_W)) If(st._eq(st_t.data) & is_rx & mdio_clk_rising, shift_in_msb_first(rx_reg, self.md.io.i) ) If(idle & self.rdata.rd, r_data_vld(0) ).Elif(addr_block_last, r_data_vld(is_rx) ) self.rdata.vld(idle & r_data_vld) self.rdata.data(rx_reg)
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = MdioMaster() print(to_rtl_str(u))