Source code for hwtLib.peripheral.usb.sim.usb_agent_device

from typing import Deque, Union, List

from hwtLib.peripheral.usb.constants import USB_PID
from hwtLib.peripheral.usb.descriptors.bundle import UsbDescriptorBundle, \
    UsbNoSuchDescriptor
from hwtLib.peripheral.usb.descriptors.std import USB_DESCRIPTOR_TYPE, \
    usb_descriptor_device_t, usb_descriptor_configuration_t, \
    usb_descriptor_device_qualifier_t, LANG_ID_EN_US
from hwtLib.peripheral.usb.device_request import usb_device_request_t, \
    USB_REQUEST_TYPE_RECIPIENT, USB_REQUEST_TYPE_TYPE, \
    USB_REQUEST_TYPE_DIRECTION, USB_REQUEST
from hwtLib.peripheral.usb.sim.agent_base import UsbAgent, UsbPacketToken, \
    UsbPacketData, UsbPacketHandshake
from pyMathBitPrecise.bit_utils import mask


[docs]class UsbDevAgent(UsbAgent): """ This agent uses rx and tx queue to comunicate with a USB device. The agnet implements address assignment and descriptor upload and it is meant to be extended with a specific functionality of USB device. """
[docs] def __init__(self, rx: Deque[Union[UsbPacketToken, UsbPacketData]], tx: Deque[Union[UsbPacketToken, UsbPacketData]], descriptors:UsbDescriptorBundle): super(UsbDevAgent, self).__init__(rx, tx) # dictionary endp: last data pid value self.last_data_pid = {0: USB_PID.DATA_1} self.addr = 0 self.descr = descriptors
[docs] def set_data_pid(self, pid: USB_PID): assert self.last_data_pid != pid, (self.last_data_pid, pid, "data pid should change between the transactions") self.last_data_pid = pid
[docs] def get_data_pid(self): pid = self.last_data_pid if pid == USB_PID.DATA_1: pid = USB_PID.DATA_0 else: pid = USB_PID.DATA_1 self.last_data_pid = pid return pid
[docs] def send_data(self, endp: int, pid_init: USB_PID, maxPacketLen: int, data_bytes: List[int]): pid = pid_init begin = 0 end = len(data_bytes) while True: t = yield from self.receive(UsbPacketToken) assert t.pid == USB_PID.TOKEN_IN, t assert t.addr == self.addr, (t, self.addr) assert t.endp == endp, t _end = min(begin + maxPacketLen, end) p = UsbPacketData(pid, data_bytes[begin:_end]) yield from self.send(p) yield from self.wait_on_ack() begin = _end if pid == USB_PID.DATA_0: pid = USB_PID.DATA_1 elif pid == USB_PID.DATA_1: pid = USB_PID.DATA_0 else: raise ValueError() if len(p.data) < maxPacketLen: break
[docs] def proc(self): while True: # wait on request from host t = yield from self.receive(UsbPacketToken) assert t.pid == USB_PID.TOKEN_SETUP, t assert t.addr == self.addr, t assert t.endp == 0, t req = yield from self.receive(UsbPacketData) assert req.pid == USB_PID.DATA_0, req req = req.unpack(usb_device_request_t) yield from self.send_ack() # process setup request if int(req.bmRequestType.recipient) == USB_REQUEST_TYPE_RECIPIENT.DEVICE and\ int(req.bmRequestType.type) == USB_REQUEST_TYPE_TYPE.STANDARD: if int(req.bmRequestType.data_transfer_direction) == USB_REQUEST_TYPE_DIRECTION.DEV_TO_HOST: # now we expect that we get TOKEN_IN so the device can send the data to host if int(req.bRequest) == USB_REQUEST.GET_DESCRIPTOR: _v = int(req.wValue) des_t = _v >> 8 des_i = _v & mask(8) lang_id = int(req.wIndex) wLength = int(req.wLength) if des_t == USB_DESCRIPTOR_TYPE.DEVICE: descr_t = usb_descriptor_device_t elif des_t == USB_DESCRIPTOR_TYPE.CONFIGURATION: descr_t = usb_descriptor_configuration_t elif des_t == USB_DESCRIPTOR_TYPE.DEVICE_QUALIFIER: descr_t = usb_descriptor_device_qualifier_t elif des_t == USB_DESCRIPTOR_TYPE.STRING: descr_t = str if des_i == 0: assert lang_id == 0, lang_id elif lang_id != LANG_ID_EN_US: raise NotImplementedError("Need to filter string descriptors for this specific " "language or language id is invalid", lang_id) else: raise NotImplementedError(des_t) try: descr_i = self.descr.get_descriptor_index(descr_t, des_i) except UsbNoSuchDescriptor: descr_i = None if descr_i is None: t = yield from self.receive(UsbPacketToken) assert t.pid == USB_PID.TOKEN_IN, t assert t.addr == self.addr, (t, self.addr) assert t.endp == 0, t yield from self.send(UsbPacketHandshake(USB_PID.HS_STALL)) continue else: dev_descr = self.descr.get_descriptor(usb_descriptor_device_t, 0)[1] ep0_max_packet_size = int(dev_descr.body.bMaxPacketSize) data = self.descr.get_descr_bytes(descr_i, wLength) yield from self.send_data(0, USB_PID.DATA_1, ep0_max_packet_size, data) else: raise NotImplementedError() t = yield from self.receive(UsbPacketToken) assert t.pid == USB_PID.TOKEN_OUT, t assert t.addr == self.addr, (t, self.addr) assert t.endp == 0, t t = yield from self.receive(UsbPacketData) assert t.pid == USB_PID.DATA_1, t assert len(t.data) == 0, t yield from self.send_ack() else: _addr = self.addr # host to dev if int(req.bRequest) == USB_REQUEST.SET_ADDRESS: assert int(req.wIndex) == 0, req.wIndex assert int(req.wLength) == 0, req.wLength addr = int(req.wValue) assert addr > 0, addr self.addr = addr else: raise NotImplementedError(req) t = yield from self.receive(UsbPacketToken) assert t.pid == USB_PID.TOKEN_IN, t assert t.addr == _addr, (t, _addr) # intentionaly use old address assert t.endp == 0, t yield from self.send(UsbPacketData(USB_PID.DATA_1, [])) yield from self.wait_on_ack() else: raise NotImplementedError(req)