from collections import deque
from hwt.hdl.constants import NOP
from hwt.simulator.agentBase import SyncAgentBase
from hwtLib.peripheral.usb.constants import USB_LINE_STATE
from hwtLib.peripheral.usb.usb2.ulpi import ulpi_reg_otg_control_t_reset_defaults, \
ulpi_reg_function_control_t_reset_default
from hwtLib.peripheral.usb.usb2.utmi import Utmi_8b_rx, Utmi_8b, utmi_interrupt_t
from hwtSimApi.agents.base import AgentBase
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.triggers import WaitCombStable, WaitCombRead, WaitWriteOnly
from pyMathBitPrecise.bit_utils import ValidityError
[docs]class Utmi_8b_rxAgent(SyncAgentBase):
"""
Simulation agent for :class:`hwtLib.peripheral.usb.usb2.utmi.Utmi_8b_rx` interface.
:attention: "active" signal acts as a valid, "valid" signal acts as a mask
:ivar data: Deque[Deque[Tuple[int, int]]] (internal deque represents packets, tuple represents data, error)
.. figure:: ./_static/utmi_rx.png
"""
USB_ERROR = "ERROR"
[docs] def __init__(self, sim:HdlSimulator, intf: Utmi_8b_rx, allowNoReset=False):
SyncAgentBase.__init__(self, sim, intf, allowNoReset=allowNoReset)
self._last_active = 0
self.data = deque()
self.actual_packet = None
[docs] def get_data(self):
i = self.intf
if i.error.read():
d = self.USB_ERROR
else:
d = int(i.data.read())
return d
[docs] def set_active(self, val):
self.intf.active.write(val)
self._last_active = val
[docs] def set_data(self, data):
if data is None:
d = None
e = None
else:
if data is self.USB_ERROR:
e = 1
d = None
else:
e = 0
d = data
i = self.intf
i.data.write(d)
i.error.write(e)
[docs] def monitor(self):
yield WaitCombStable()
if self.notReset():
intf = self.intf
active = int(intf.active.read())
if active:
if not self._last_active:
# start of packet
assert self.actual_packet is None
self.actual_packet = deque()
vld = int(intf.valid.read())
if vld:
d = self.get_data()
if self._debugOutput is not None:
self._debugOutput.write("%s, read, %d: %r\n" % (
intf._getFullName(),
self.sim.now, d))
self.actual_packet.append(d)
elif self._last_active:
# end of packet
assert self.actual_packet, (self.sim.now, intf._getFullName())
self.data.append(self.actual_packet)
self.actual_packet = None
self._last_active = active
[docs] def driver(self):
yield WaitCombRead()
d = NOP
active = 0
if self.notReset():
if self.actual_packet:
d = self.actual_packet.popleft()
active = 1
elif self._last_active == 1:
# end of packet, need to have at least one clk tick with active=0
active = 0
d = NOP
elif self.data and self._last_active == 0:
self.actual_packet = self.data.popleft()
# first beat of packet (active=1, valid=0)
active = 1
d = NOP
yield WaitWriteOnly()
intf = self.intf
if d is NOP:
self.set_data(None)
intf.valid.write(0)
else:
self.set_data(d)
intf.valid.write(1)
self.set_active(active)
if active and self._debugOutput is not None:
self._debugOutput.write("%s, wrote, %d: %r\n" % (
intf._getFullName(),
self.sim.now, self.actualData))
[docs]class Utmi_8b_txAgent(SyncAgentBase):
"""
Simulation agent for :class:`hwtLib.peripheral.usb.usb2.utmi.Utmi_8b_tx` interface.
:ivar data: Deque[Deque[int]] (internal deque represents packets)
.. figure:: ./_static/utmi_rx.png
"""
[docs] def __init__(self, sim:HdlSimulator, intf, allowNoReset=False):
SyncAgentBase.__init__(
self, sim, intf, allowNoReset=allowNoReset)
self.data = deque()
self.actual_packet = None
self._last_ready = 0
self._last_valid = 0
[docs] def driver(self):
yield WaitCombRead()
if self.notReset():
d = NOP
if self.actual_packet:
d = self.actual_packet[0]
elif self._last_valid == 1:
# end of packet, need to have at least one clk tick with active=0
d = NOP
self.actual_packet = None
elif self.data and self._last_ready == 0:
self.actual_packet = self.data.popleft()
# first beat of packet (active=1, valid=0)
d = self.actual_packet[0]
yield WaitWriteOnly()
intf = self.intf
if d is NOP:
intf.data.write(None)
intf.vld.write(0)
else:
intf.data.write(d)
intf.vld.write(1)
yield WaitCombStable()
try:
rd = int(intf.rd.read())
except ValidityError:
raise AssertionError(self.sim.now, intf._getFullName(), "invalid rd (would cause desynchronization of the channel)")
if d is NOP:
if self._last_valid == 1:
assert rd == 1, (self.sim.now, intf._getFullName(), rd, "Ready must be 1 or 1 clk tick after end of packet (EOP state)")
if rd and self.actual_packet:
self.actual_packet.popleft()
else:
rd = 0
d = NOP
self._last_ready = rd
self._last_valid = int(d is not NOP)
[docs] def monitor(self):
yield WaitCombRead()
if self.notReset():
yield WaitWriteOnly()
yield WaitCombRead()
intf = self.intf
try:
vld = int(intf.vld.read())
except ValidityError:
raise AssertionError(self.sim.now, intf._getFullName(), "invalid vld, this would case desynchronization")
if self._last_valid and not vld:
# end of packet
self.data.append(self.actual_packet)
self.actual_packet = None
elif not self._last_valid and vld:
# start of packet
self.actual_packet = deque()
if vld:
try:
d = int(intf.data.read())
except ValidityError:
raise AssertionError(self.sim.now, intf._getFullName(), "invalid data")
self.actual_packet.append(d)
if vld or self._last_valid:
rd = 1
else:
rd = 0
yield WaitWriteOnly()
intf.rd.write(rd)
self._last_ready = rd
self._last_valid = vld
else:
self._last_ready = 0
self._last_valid = 0
[docs]class Utmi_8bAgent(AgentBase):
"""
Simulation agent for :class:`hwtLib.peripheral.usb.usb2.utmi.Utmi_8b` interface.
"""
[docs] def __init__(self, sim:HdlSimulator, intf: Utmi_8b):
AgentBase.__init__(self, sim, intf)
for i in [intf.function_control, intf.otg_control, intf.interrupt, intf.rx, intf.tx]:
i._initSimAgent(sim)
@property
def link_to_phy_packets(self):
return self.intf.tx._ag.data
@link_to_phy_packets.setter
def link_to_phy_packets_setter(self, v):
self.intf.tx._ag.data = v
@property
def actual_link_to_phy_packet(self):
return self.intf.tx._ag.actual_packet
@actual_link_to_phy_packet.setter
def actual_link_to_phy_packet_setter(self, v):
self.intf.tx._ag.actual_packet = v
@property
def phy_to_link_packets(self):
return self.intf.rx._ag.data
@phy_to_link_packets.setter
def phy_to_link_packets_setter(self, v):
self.intf.rx._ag.data = v
@property
def actual_phy_to_link_packet(self):
return self.intf.tx._ag.actual_packet
@actual_phy_to_link_packet.setter
def actual_phy_to_link_packet_setter(self, v):
self.intf.rx._ag.actual_packet = v
[docs] def getMonitors(self):
yield from self.intf.tx._ag.getDrivers()
yield from self.intf.rx._ag.getMonitors()
yield self.monitor()
[docs] def monitor(self):
yield WaitWriteOnly()
intf = self.intf
for i in intf.function_control._interfaces:
d = ulpi_reg_function_control_t_reset_default[i._name]
i.write(d)
for i in intf.otg_control._interfaces:
d = ulpi_reg_otg_control_t_reset_defaults[i._name]
i.write(d)
intf.tx.vld.write(0)
[docs] def getDrivers(self):
yield from self.intf.tx._ag.getMonitors()
yield from self.intf.rx._ag.getDrivers()
yield self.driver()
[docs] def driver(self):
yield WaitWriteOnly()
intf = self.intf
intf.LineState.write(USB_LINE_STATE.J)
intf.interrupt._ag.set_data(tuple(0 for _ in range(len(utmi_interrupt_t.fields) - 1)))
intf.tx.rd.write(0)
intf.rx.valid.write(0)
intf.rx.active.write(0)