from math import ceil
from hwt.code import FsmBuilder, Concat, If, In
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.enum import HEnum
from hwt.hwIO import HwIO
from hwt.hwIOs.agents.rdVldSync import HwIODataRdVldAgent
from hwt.hwIOs.std import HwIOVectSignal, HwIORdVldSync, \
HwIODataRdVld
from hwt.hwIOs.utils import addClkRstn
from hwt.hwModule import HwModule
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwtLib.clocking.clkBuilder import ClkBuilder
from hwtLib.peripheral.mdio.intf import Mdio
from hwtSimApi.hdlSimulator import HdlSimulator
from hwt.hdl.types.bitsRtlSignal import HBitsRtlSignal
class MdioAddr(HwIO):
    """
    Address part of a MDIO request, composed of a ``phy`` signal
    and a ``reg`` signal, each 5 bits wide.

    .. hwt-autodoc::
    """

    @override
    def hwDeclr(self):
        # both address fields share the same width
        field_width = 5
        self.phy = HwIOVectSignal(field_width)
        self.reg = HwIOVectSignal(field_width)
class MdioReq(HwIORdVldSync):
    """
    MDIO transaction request interface

    Adds ``opcode``, ``addr`` (:class:`.MdioAddr`) and ``wdata`` signals
    in front of the signals declared by the parent :class:`HwIORdVldSync`.

    .. hwt-autodoc::
    """

    @override
    def hwDeclr(self):
        self.opcode = HwIOVectSignal(Mdio.OP_W)  # R/W
        self.addr = MdioAddr()
        self.wdata = HwIOVectSignal(Mdio.D_W)
        # signals of the parent interface are declared after the payload
        super().hwDeclr()

    @override
    def _initSimAgent(self, sim: HdlSimulator):
        # simulation agent which understands the payload layout of this
        # interface
        self._ag = MdioReqAgent(sim, self)
class MdioReqAgent(HwIODataRdVldAgent):
    """
    Simulation agent for :class:`.MdioReq` interface

    Data items have the format ``(opcode, (phyaddr, regaddr), wdata)``.
    """

    @override
    def set_data(self, data):
        """
        Write a single data item to the payload signals of the interface.

        :param data: ``None`` (writes ``None`` to every payload signal)
            or a tuple ``(opcode, (phyaddr, regaddr), wdata)``
        """
        if data is None:
            opcode = phyaddr = regaddr = wdata = None
        else:
            opcode, (phyaddr, regaddr), wdata = data

        hw_io = self.hwIO
        hw_io.opcode.write(opcode)
        hw_io.addr.phy.write(phyaddr)
        hw_io.addr.reg.write(regaddr)
        hw_io.wdata.write(wdata)

    @override
    def get_data(self):
        """
        Read the payload signals of the interface.

        :return: tuple ``(opcode, (phyaddr, regaddr), wdata)``
        """
        hw_io = self.hwIO
        opcode = hw_io.opcode.read()
        phyaddr = hw_io.addr.phy.read()
        regaddr = hw_io.addr.reg.read()
        wdata = hw_io.wdata.read()
        return (opcode, (phyaddr, regaddr), wdata)
def shift_in_msb_first(reg, sig_in):
    """
    Shift data in to register, MSB first

    :param reg: register which is being shifted
    :param sig_in: signal with the data which is appended to the register
        content
    :return: assignment of the ``Concat`` of a slice of ``reg``
        and ``sig_in`` to ``reg``
    """
    reg_width = reg._dtype.bit_length()
    in_width = sig_in._dtype.bit_length()
    # part of the current register value which is kept
    # (presumably the low bits, as hwt slices are written as [high:low])
    kept_bits = reg[reg_width - in_width:]
    return reg(Concat(kept_bits, sig_in))
class MdioMaster(HwModule):
    """
    Master for MDIO interface.

    A request received on ``req`` is serialized on the ``md`` interface,
    data shifted in during a read transaction is provided on ``rdata``.

    :ivar ~.CLK_FREQ: frequency of input clock
        (propagated to ``clk.FREQ`` and used to derive the MDIO clock divider)
    :ivar ~.MDIO_FREQ: frequency of output MDIO clock

    * based on:

        * https://opencores.org/websvn/filedetails?repname=ethmac10g&path=%2Fethmac10g%2Ftrunk%2Frtl%2Fverilog%2Fmgmt%2Fmdio.v
        * https://github.com/NetFPGA/netfpga/blob/master/lib/verilog/core/io/mdio/src/nf2_mdio.v

    .. hwt-autodoc::
    """

    @override
    def hwConfig(self):
        self.CLK_FREQ = HwParam(int(100e6))
        self.MDIO_FREQ = HwParam(int(2.5e6))

    @override
    def hwDeclr(self):
        addClkRstn(self)
        # hwImpl() computes the MDIO clock divider from clk.FREQ, without
        # this assignment the CLK_FREQ parameter would have no effect
        self.clk.FREQ = self.CLK_FREQ
        self.md = Mdio()._m()
        self.rdata = HwIODataRdVld()._m()
        self.rdata.DATA_WIDTH = Mdio.D_W
        self.req = MdioReq()

    def _packet_sequence_timer(self,
                               mdio_clk_rising: HBitsRtlSignal,
                               mdio_clk_falling: HBitsRtlSignal,
                               rst: HBitsRtlSignal):
        """
        Create timers for all important events in protocol main FSM

        The timer is a counter incremented on each ``mdio_clk_falling``
        and reset when ``rst_n`` or ``rst`` is active.

        :param mdio_clk_rising: enable signal marking the rising edge
            of the MDIO clock
        :param mdio_clk_falling: enable signal marking the falling edge
            of the MDIO clock
        :param rst: additional reset of the timer
        :return: tuple of signals (preamble_last, addr_block_last,
            turnarround_last_en, data_last)
        """
        PRE_W = Mdio.PRE_W
        ADDR_BLOCK_W = Mdio.ADDR_BLOCK_W
        TA_W = Mdio.TA_W

        preamble_last = self._sig("preamble_last")
        addr_block_last = self._sig("addr_block_last")
        turnarround_last_en = self._sig("turnarround_last_en")
        data_last = self._sig("data_last")

        # the highest value which the timer has to reach
        CNTR_MAX = PRE_W + ADDR_BLOCK_W + TA_W + Mdio.D_W - 1
        timer = self._reg("packet_sequence_timer",
                          # + 1 because the register has to hold the value
                          # CNTR_MAX itself (matters if it is a power of 2)
                          dtype=HBits(log2ceil(CNTR_MAX + 1)),
                          def_val=0, rst=self.rst_n._isOn() | rst._isOn())
        If(mdio_clk_falling,
           timer(timer + 1)
        )

        # each flag compares the timer with the index of the last bit
        # of the corresponding part of the frame
        preamble_last(mdio_clk_falling & timer._eq(PRE_W - 1))
        addr_block_last(mdio_clk_falling & timer._eq(PRE_W + ADDR_BLOCK_W - 1))
        # not qualified by a clock edge, the edge is selected in hwImpl()
        turnarround_last_en(timer._eq(PRE_W + ADDR_BLOCK_W + TA_W - 1))
        data_last(mdio_clk_rising & timer._eq(CNTR_MAX))

        return preamble_last, addr_block_last, turnarround_last_en, data_last

    @override
    def hwImpl(self):
        # timers and other registers
        # number of input clock ticks per half period of the MDIO clock
        CLK_HALF_PERIOD_DIV = ceil(self.clk.FREQ / (self.MDIO_FREQ * 2))

        mdio_clk = self._reg("mdio_clk", def_val=0)
        r_data_vld = self._reg("r_data_vld", def_val=0)
        req = self.req
        clk_half_period_en = ClkBuilder(self, self.clk).timer(
            ("clk_half_period_en", CLK_HALF_PERIOD_DIV))
        If(clk_half_period_en,
            mdio_clk(~mdio_clk),
        )
        # mdio_clk is toggled on clk_half_period_en, its current value
        # tells which edge is being generated
        mdio_clk_rising = clk_half_period_en & ~mdio_clk
        mdio_clk_falling = clk_half_period_en & mdio_clk

        idle = self._sig("idle")
        # the packet timer is held in reset while the FSM is idle
        preamble_last, addr_block_last, turnarround_last_en, data_last =\
            self._packet_sequence_timer(
                mdio_clk_rising, mdio_clk_falling, idle)
        is_rx = self._reg("is_rx", def_val=0)

        # protocol FSM
        st_t = HEnum("st_t", ["idle", "pre", "addr_block", "ta", "data"])
        st = FsmBuilder(self, st_t)\
            .Trans(st_t.idle, (req.vld & ~r_data_vld, st_t.pre))\
            .Trans(st_t.pre, (preamble_last, st_t.addr_block))\
            .Trans(st_t.addr_block, (addr_block_last, st_t.ta))\
            .Trans(st_t.ta, (turnarround_last_en & (
                                (is_rx & mdio_clk_falling)
                                | (~is_rx & mdio_clk_rising)), st_t.data))\
            .Trans(st_t.data, (data_last, st_t.idle)).stateReg

        idle(st._eq(st_t.idle))
        # a new request is accepted only if previously read data
        # was already consumed
        req.rd(idle & ~r_data_vld)
        If(idle,
           is_rx(req.opcode._eq(Mdio.OP.READ))
        )

        # TX logic
        md = self.md
        TX_W = md.ST_W + md.OP_W + md.PA_W + md.RA_W + md.TA_W + md.D_W
        tx_reg = self._reg("tx_reg", HBits(TX_W))
        If(idle & req.vld,
           tx_reg(Concat(Mdio.ST, req.opcode, req.addr.phy,
                         req.addr.reg, Mdio.TA, req.wdata))
        ).Elif(mdio_clk_falling & In(st, [st_t.addr_block, st_t.data, st_t.ta]),
           tx_reg(tx_reg << 1)
        )
        md.c(mdio_clk)

        # because MDIO uses open-drain, this means that if t=0 the value of the signal is 1
        md.io.o(0)
        # the bit which is currently being transmitted is the MSB of tx_reg
        tx_bit = tx_reg[tx_reg._dtype.bit_length() - 1]
        md.io.t(~idle & ~tx_bit & (st._eq(st_t.addr_block)
                                   | (In(st, [st_t.data, st_t.ta]) & ~is_rx))
        )

        # RX logic
        rx_reg = self._reg("rx_reg", HBits(md.D_W))
        If(st._eq(st_t.data) & is_rx & mdio_clk_rising,
           shift_in_msb_first(rx_reg, self.md.io.i)
        )

        # r_data_vld is set at the end of the address block of a read,
        # but rdata.vld is asserted only once the FSM is back in idle
        If(idle & self.rdata.rd,
           r_data_vld(0)
        ).Elif(addr_block_last,
           r_data_vld(is_rx)
        )
        self.rdata.vld(idle & r_data_vld)
        self.rdata.data(rx_reg)
if __name__ == "__main__":
    # print RTL of the MdioMaster in its default configuration
    from hwt.synth import to_rtl_str

    print(to_rtl_str(MdioMaster()))