Source code for hwtLib.peripheral.ethernet.rmii_agent
from collections import deque
from hwt.interfaces.agents.vldSynced import VldSyncedAgent
from hwt.pyUtils.arrayQuery import grouper
from hwtSimApi.agents.base import AgentBase
from hwtSimApi.hdlSimulator import HdlSimulator
from typing import List
from pyMathBitPrecise.bit_utils import get_bit_range
from hwtLib.peripheral.ethernet.constants import ETH
from itertools import chain
[docs]class RmiiAgent(AgentBase):
[docs] def __init__(self, sim: HdlSimulator, intf):
AgentBase.__init__(self, sim, intf)
intf.ref_clk._initSimAgent(sim)
intf.rx._initSimAgent(sim)
intf.tx._initSimAgent(sim)
[docs] def getDrivers(self):
raise NotImplementedError()
[docs] def getMonitors(self):
i = self.intf
yield from i.ref_clk._ag.getDrivers()
yield from i.tx._ag.getMonitors()
yield from i.rx._ag.getDrivers()
[docs]class RmiiTxChannelAgent(VldSyncedAgent):
"""
Simulation agent for :class:`hwtLib.peripheral.ethernet.rmii.RmiiTxChannel` interface
:type intf: RmiiTxChannel
"""
[docs] def __init__(self, sim: HdlSimulator, intf, allowNoReset=False):
VldSyncedAgent.__init__(self, sim, intf, allowNoReset=allowNoReset)
self.is_monitor = False
self.actual_frame = []
self.frames = deque()
[docs] def getMonitors(self):
self.is_monitor = True
return VldSyncedAgent.getMonitors(self)
[docs] def _pop_frame(self):
data = self.data
assert len(data) % 4 == 0, (len(data), data)
frame = []
for b01, b23, b45, b67 in grouper(4, data):
b = (int(b67) << 6) | (int(b45) << 4) | (int(b23) << 2) | int(b01)
frame.append(b)
return frame
[docs] def get_data(self):
return self.intf.d.read()
[docs] def set_data(self, data):
self.intf.d.write(data)
[docs] @classmethod
def get_valid_signal(cls, intf):
return intf.en
[docs] def get_valid(self):
v = self._vld.read()
if not int(v):
# this is potential end of frame
if self.data:
self.frames.append(self._pop_frame())
self.data = deque()
return v
[docs] def set_valid(self, val):
self._lastVld = val
return self._vld.write(val)
[docs]class RmiiRxChannelAgent(VldSyncedAgent):
"""
Simulation agent for :class:`hwtLib.peripheral.ethernet.rmii.RmiiRxChannel` interface
:type intf: RmiiRxChannel
"""
[docs] def _append_frame(self, frame: List[int], add_preamble=True):
data = self.data
if add_preamble:
preamble = [
0x0,
*[int(ETH.PREAMBLE_1B) for _ in range(7)],
int(ETH.SFD),
]
frame = chain(preamble, frame)
for b in frame:
b01 = b & 0b11
b23 = get_bit_range(b, 2, 2)
b45 = get_bit_range(b, 4, 2)
b67 = get_bit_range(b, 6, 2)
data.extend([b01, b23, b45, b67])
[docs] def get_data(self):
return self.intf.d.read()
[docs] def set_data(self, data):
self.intf.d.write(data)
[docs] @classmethod
def get_valid_signal(cls, intf):
return intf.crs_dv
[docs] def get_valid(self):
return self._vld.read()
[docs] def set_valid(self, val):
self._lastVld = val
return self._vld.write(val)