from collections import deque
from typing import Deque, Union
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.struct import HStruct
from hwt.simulator.agentBase import SyncAgentBase
from hwtLib.peripheral.usb.constants import usb_addr_t, usb_endp_t, \
usb_crc5_t, usb_pid_t, USB_PID
from hwtLib.peripheral.usb.sim.agent_base import UsbPacketToken, UsbPacketData, \
UsbPacketHandshake
from hwtLib.peripheral.usb.sim.usb_agent_device import UsbDevAgent
from hwtLib.peripheral.usb.sim.usb_agent_host import UsbHostAgent
from hwtLib.peripheral.usb.usb2.utmi_agent import Utmi_8bAgent
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.process_utils import CallbackLoop
[docs]class UtmiUsbHostProcAgent(UsbHostAgent):
"""
A simulation agent for :class:`hwtLib.peripheral.usb.usb2.utmi.Utmi_8b` interface
with the functionality of the host.
"""
usb_packet_token_t = HStruct(
(usb_pid_t, "pid"),
(usb_pid_t, "pid_inv"), # inversion of the pid
(usb_addr_t, "addr"),
(usb_endp_t, "endp"),
(usb_crc5_t, "crc5"), # :note: does not involve USB_PID, only addr, endp
)
[docs] def parse_packet_pid_and_bytes(self, pid: int, p: Deque[int]):
if USB_PID.is_token(pid):
return UsbPacketToken.from_pid_and_body_bytes(pid, p)
elif USB_PID.is_data(pid):
crc16_h = p.pop()
crc16_l = p.pop()
crc16 = (crc16_h << 8 | crc16_l)
new_p = UsbPacketData(pid, p)
expected_crc = new_p.crc16()
assert crc16 == expected_crc, (crc16, expected_crc, p)
return new_p
elif USB_PID.is_hs(pid):
return UsbPacketHandshake(pid)
else:
raise NotImplementedError(pid)
[docs] def parse_packet(self, p: Deque[int]):
# need to cut of ulpi tx_cmd header
pid = int(p.popleft())
pid_inv = (pid & 0xf0) >> 4
pid &= 0xf
assert pid == (~pid_inv & 0xf), (pid, pid_inv)
return self.parse_packet_pid_and_bytes(pid, p)
[docs] def deparse_packet(self, p: Union[UsbPacketToken, UsbPacketData, UsbPacketHandshake]):
cls = type(p)
v = deque()
if cls is UsbPacketToken:
v0 = self.usb_packet_token_t.from_py({
"pid": p.pid,
"pid_inv":~p.pid & 0xf,
"addr": p.addr,
"endp": p.endp,
"crc5": p.crc5(),
})
v1 = v0._reinterpret_cast(Bits(8)[self.usb_packet_token_t.bit_length() // 8])
v.extend(int(_v) for _v in v1)
elif cls is UsbPacketData:
v.append(((~p.pid & 0xf) << 4) | p.pid)
v.extend(p.data)
crc16 = p.crc16()
v.append(crc16 & 0xff)
v.append(crc16 >> 8)
elif cls is UsbPacketHandshake:
v.append(((~p.pid & 0xf) << 4) | p.pid)
else:
raise NotImplementedError(cls, p)
return v
[docs]class UtmiUsbDevProcAgent(UsbDevAgent):
[docs] def parse_packet_pid_and_bytes(self, pid: int, p: Deque[int]):
return UtmiUsbHostProcAgent.parse_packet_pid_and_bytes(self, pid, p)
[docs] def parse_packet(self, p):
return UtmiUsbHostProcAgent.parse_packet(self, p)
[docs] def deparse_packet(self, p):
# need to add ulpi tx_cmd header
return UtmiUsbHostProcAgent.deparse_packet(self, p)
[docs]class UtmiUsbAgent(Utmi_8bAgent, SyncAgentBase):
"""
:class:` hwtLib.peripheral.usb.usb2.utmi_agent.Utmi_8bAgent`
with device host logic and USB stack
"""
[docs] def __init__(self, sim:HdlSimulator, intf:Utmi_8bAgent, allowNoReset=False):
Utmi_8bAgent.__init__(self, sim, intf)
self.descriptors = None
self.usb_driver = None
self.usb_driver_proc = None
self.clk = self.intf._getAssociatedClk()
self.rst = self._discoverReset(intf, allowNoReset)
self.monitor = CallbackLoop(sim, self.clk, self.monitor, self.getEnable)
self.driver = CallbackLoop(sim, self.clk, self.driver, self.getEnable)
[docs] def run_usb_driver(self):
if self.usb_driver_proc is not None:
try:
next(self.usb_driver_proc)
except StopIteration:
pass
[docs] def driver_init(self):
yield from Utmi_8bAgent.driver(self)
[docs] def driver(self):
self.run_usb_driver()
[docs] def getDrivers(self):
# PHY/host
if self.usb_driver is None:
self.usb_driver = UtmiUsbHostProcAgent(self.link_to_phy_packets,
self.phy_to_link_packets)
self.usb_driver_proc = self.usb_driver.proc()
yield self.driver_init()
yield from Utmi_8bAgent.getDrivers(self)
[docs] def monitor_init(self):
yield from Utmi_8bAgent.monitor(self)
[docs] def monitor(self):
self.run_usb_driver()
[docs] def getMonitors(self):
# link/device
assert self.descriptors is not None
if self.usb_driver is None:
self.usb_driver = UtmiUsbDevProcAgent(self.phy_to_link_packets,
self.link_to_phy_packets,
self.descriptors)
self.usb_driver_proc = self.usb_driver.proc()
return [
self.monitor_init(),
*Utmi_8bAgent.getMonitors(self)
]