Source code for hwtLib.peripheral.usb.usb2.ulpi_usb_agent
from typing import Deque, Union
from hwtLib.peripheral.usb.sim.agent_base import UsbPacketToken, UsbPacketData, \
UsbPacketHandshake
from hwtLib.peripheral.usb.sim.usb_agent_device import UsbDevAgent
from hwtLib.peripheral.usb.usb2.ulpi import ULPI_TX_CMD, Ulpi
from hwtLib.peripheral.usb.usb2.ulpi_agent import UlpiAgent
from hwtLib.peripheral.usb.usb2.utmi_usb_agent import UtmiUsbHostProcAgent
from hwtSimApi.hdlSimulator import HdlSimulator
[docs]class UlpiUsbHostProcAgent(UtmiUsbHostProcAgent):
"""
A simulation agent for :class:`hwtLib.peripheral.usb.usb2.ulpi.Ulpi` interface
with the functionality of the host.
"""
[docs] def parse_packet(self, p: Deque[int]):
# need to cut of ulpi tx_cmd header
tx_cmd = p.popleft()
if ULPI_TX_CMD.is_USB_PID(tx_cmd):
pid = ULPI_TX_CMD.get_USB_PID(tx_cmd)
return self.parse_packet_pid_and_bytes(pid, p)
else:
raise NotImplementedError(tx_cmd)
[docs] def deparse_packet(self, p: Union[UsbPacketToken, UsbPacketData, UsbPacketHandshake]):
cls = type(p)
v = UtmiUsbHostProcAgent.deparse_packet(self, p)
v[0] = ULPI_TX_CMD.USB_PID(p.pid)
return v
[docs]class UlpiUsbDevProcAgent(UsbDevAgent):
[docs] def parse_packet_pid_and_bytes(self, pid: int, p: Deque[int]):
return UlpiUsbHostProcAgent.parse_packet_pid_and_bytes(self, pid, p)
[docs] def parse_packet(self, p: Deque[int]):
return UlpiUsbHostProcAgent.parse_packet(self, p)
[docs] def deparse_packet(self, p: Union[UsbPacketToken, UsbPacketData, UsbPacketHandshake]):
# need to add ulpi tx_cmd header
return UlpiUsbHostProcAgent.deparse_packet(self, p)
[docs]class UlpiUsbAgent(UlpiAgent):
"""
:class:`hwtLib.peripheral.usb.usb2.ulpi_agent.UlpiAgent`
with device host logic and USB stack
"""
[docs] def __init__(self, sim:HdlSimulator, intf:Ulpi, allowNoReset=False):
UlpiAgent.__init__(self, sim, intf, allowNoReset=allowNoReset)
self.descriptors = None
self.usb_driver = None
self.usb_driver_proc = None
[docs] def run_usb_driver(self):
if self.usb_driver_proc is not None:
try:
next(self.usb_driver_proc)
except StopIteration:
pass
[docs] def driver(self):
self.run_usb_driver()
yield from UlpiAgent.driver(self)
self.run_usb_driver()
[docs] def getDrivers(self):
# PHY/host
self.usb_driver = UlpiUsbHostProcAgent(self.link_to_phy_packets, self.phy_to_link_packets)
self.usb_driver_proc = self.usb_driver.proc()
return UlpiAgent.getDrivers(self)
[docs] def monitor(self):
self.run_usb_driver()
yield from UlpiAgent.monitor(self)
self.run_usb_driver()
[docs] def getMonitors(self):
# link/device
assert self.descriptors is not None
self.usb_driver = UlpiUsbDevProcAgent(self.phy_to_link_packets,
self.link_to_phy_packets,
self.descriptors)
self.usb_driver_proc = self.usb_driver.proc()
return UlpiAgent.getMonitors(self)