Source code for hwtLib.peripheral.usb.sim.usb_agent_host

from typing import Deque, Union, Optional, List

from hwt.code import Concat
from hwt.hdl.types.bitsVal import BitsVal
from hwt.hdl.types.struct import HStruct
from hwt.hdl.types.structValBase import StructValBase
from hwt.hdl.value import HValue
from hwt.synthesizer.rtlLevel.constants import NOT_SPECIFIED
from hwtLib.peripheral.usb.constants import USB_PID
from hwtLib.peripheral.usb.descriptors.bundle import UsbDescriptorBundle, \
    UsbNoSuchDescriptor
from hwtLib.peripheral.usb.descriptors.cdc import usb_descriptor_functional_header, \
    USB_CDC_DESCRIPTOR_SUBTYPE, usb_descriptor_functional_header_t, \
    usb_descriptor_functional_call_management_t, \
    usb_descriptor_functional_abstract_control_management_t, \
    usb_define_descriptor_functional_union_t
from hwtLib.peripheral.usb.descriptors.std import usb_descriptor_interface_t, \
    USB_DEVICE_CLASS, usb_descriptor_header_t, USB_DESCRIPTOR_TYPE, \
    usb_descriptor_configuration_t, usb_descriptor_endpoint_t, \
    make_usb_device_request_get_descr, usb_define_descriptor_string, \
    usb_descriptor_device_t, usb_descriptor_device_qualifier_t, \
    usb_define_descriptor_string0, USB_ENDPOINT_DIR
from hwtLib.peripheral.usb.device_request import make_usb_device_request, \
    USB_REQUEST_TYPE_RECIPIENT, USB_REQUEST_TYPE_TYPE, \
    USB_REQUEST_TYPE_DIRECTION, USB_REQUEST
from hwtLib.peripheral.usb.sim.agent_base import UsbAgent, UsbPacketToken, \
    UsbPacketData, UsbPacketHandshake
from hwtLib.types.ctypes import uint8_t


[docs]class UsbHostAgent(UsbAgent): """ This agent uses rx and tx queue to comunicate with a USB device. It performs bus enumerations, sets address to a device and downloads the descriptors. Note that the agent is written in a way which allows for easy extension to a driver which can parse the specific descriptors and comunicate with devices further. """
[docs] def __init__(self, rx: Deque[Union[UsbPacketToken, UsbPacketHandshake, UsbPacketData]], tx: Deque[Union[UsbPacketToken, UsbPacketHandshake, UsbPacketData]]): super(UsbHostAgent, self).__init__(rx, tx) # the addresses are not asigned yet this dictionary will be filled during device enumeration self.descr = {} self._descriptors_downloaded = False
[docs] def parse_interface_functional_descriptor(self, interface_descr: StructValBase, data:BitsVal): bInterfaceClass = int(interface_descr.body.bInterfaceClass) if bInterfaceClass == USB_DEVICE_CLASS.CDC_CONTROL: h_t = usb_descriptor_functional_header header = data[h_t.bit_length():]._reinterpret_cast(h_t) sub_t = int(header.bDescriptorSubtype) if sub_t == USB_CDC_DESCRIPTOR_SUBTYPE.HEADER: descr_t = usb_descriptor_functional_header_t elif sub_t == USB_CDC_DESCRIPTOR_SUBTYPE.CALL_MANAGEMENT_FUNCTIONAL: descr_t = usb_descriptor_functional_call_management_t elif sub_t == USB_CDC_DESCRIPTOR_SUBTYPE.ABSTRACT_CONTROL_MANAGEMENT: descr_t = usb_descriptor_functional_abstract_control_management_t elif sub_t == USB_CDC_DESCRIPTOR_SUBTYPE.UNION: slave_cnt = (data._dtype.bit_length() - h_t.bit_length() - 8) // 8 descr_t = usb_define_descriptor_functional_union_t(slave_cnt) assert data._dtype.bit_length() == descr_t.bit_length() return data._reinterpret_cast(descr_t) else: raise NotImplementedError()
[docs] def parse_configuration_descriptor_bundle(self, data_bytes: List[int]): data = [d if isinstance(d, HValue) else uint8_t.from_py(d) for d in data_bytes] data = Concat(*reversed(data)) offset = 0 end = data._dtype.bit_length() header_width = usb_descriptor_header_t.bit_length() descriptors = [] interface_descr = None while offset < end: header = data[header_width + offset: offset] header = header._reinterpret_cast(usb_descriptor_header_t) descr_typeId = int(header.bDescriptorType) descr_width = int(header.bLength) * 8 try: d = data[descr_width + offset: offset] except IndexError: raise IndexError("The input data is incomplete, the header suggest additional data", offset, descr_width, end) if descr_typeId == USB_DESCRIPTOR_TYPE.CONFIGURATION: t = usb_descriptor_configuration_t assert descr_width == t.bit_length() d = d._reinterpret_cast(t) interface_descr = None elif descr_typeId == USB_DESCRIPTOR_TYPE.INTERFACE: # :note: interface descriptors are class dependent, # the class can be resolved from first interface descriptor # next interface descriptors may be functional descriptors t = usb_descriptor_interface_t assert d._dtype.bit_length() == t.bit_length(), (d._dtype.bit_length(), t.bit_length()) d = d._reinterpret_cast(t) interface_descr = d elif descr_typeId == USB_DESCRIPTOR_TYPE.ENDPOINT: t = usb_descriptor_endpoint_t assert descr_width == t.bit_length() d = d._reinterpret_cast(t) elif descr_typeId == USB_DESCRIPTOR_TYPE.FUNCTIONAL: d = self.parse_interface_functional_descriptor(interface_descr, d) else: raise NotImplementedError(descr_typeId) descriptors.append(d) offset += descr_width return descriptors
[docs] def get_max_packet_size(self, addr:int, endp: int, direction: USB_ENDPOINT_DIR): ddb: UsbDescriptorBundle = self.descr.get(addr, None) if ddb is None: max_packet_size = 64 else: if endp == 0: d = ddb.get_descriptor(usb_descriptor_device_t, 0)[1] max_packet_size = int(d.body.bMaxPacketSize) else: max_packet_size = None for des in ddb: if des._dtype == usb_descriptor_endpoint_t and \ int(des.body.bEndpointAddress) == endp and \ int(des.body.bEndpointAddressDir) == direction: max_packet_size = int(des.body.wMaxPacketSize) break if max_packet_size is None: raise ValueError("Can not find configuration for endpoint in descriptors", endp, ddb) return max_packet_size
[docs] def receive_bulk(self, addr: int, endp: int, pid_init: USB_PID, size=NOT_SPECIFIED) -> List[int]: max_packet_size = self.get_max_packet_size(addr, endp, USB_ENDPOINT_DIR.IN) pid = pid_init # start recieveing the data yield from self.send(UsbPacketToken(USB_PID.TOKEN_IN, addr, endp)) # can receive data or STALL if the descriptor is not present d_raw = yield from self.receive(NOT_SPECIFIED) if isinstance(d_raw, UsbPacketData): assert d_raw.pid == pid, (d_raw.pid, pid) # descriptor data yield from self.send_ack() if size is not NOT_SPECIFIED: return d_raw.data # could be actually larger in the case when EP0 is not configured yet if len(d_raw.data) >= max_packet_size: # coud be possibly split into multiple packets # if the first chunk was just of pmax_packet_size and this is the size what we are asking for while True: if pid == USB_PID.DATA_0: pid = USB_PID.DATA_1 elif pid == USB_PID.DATA_1: pid = USB_PID.DATA_0 else: raise NotImplementedError(pid) yield from self.send(UsbPacketToken(USB_PID.TOKEN_IN, addr, endp)) descr_part = yield from self.receive(UsbPacketData) assert descr_part.pid == pid, (d_raw.pid, pid) d_raw.data.extend(descr_part.data) yield from self.send_ack() if len(descr_part.data) < max_packet_size: break return d_raw.data elif isinstance(d_raw, UsbPacketHandshake): # packet which means some error if d_raw.pid == USB_PID.HS_STALL: raise UsbNoSuchDescriptor() elif d_raw.pid == USB_PID.HS_NACK: return None else: raise NotImplementedError(d_raw.pid) else: raise NotImplementedError(d_raw)
[docs] def transmit_bulk(self, addr: int, endp: int, pid_init: USB_PID, data_bytes: List[int]): max_packet_size = self.get_max_packet_size(addr, endp, USB_ENDPOINT_DIR.OUT) pid = pid_init # start sending the data begin = 0 end = len(data_bytes) while True: yield from self.send(UsbPacketToken(USB_PID.TOKEN_OUT, addr, endp)) _end = min(begin + max_packet_size, end) p = UsbPacketData(pid, data_bytes[begin:_end]) yield from self.send(p) yield from self.wait_on_ack() begin = _end if pid == USB_PID.DATA_0: pid = USB_PID.DATA_1 elif pid == USB_PID.DATA_1: pid = USB_PID.DATA_0 else: raise ValueError(pid) if len(p.data) < max_packet_size: break
[docs] def control_read(self, addr, bmRequestType_type:USB_REQUEST_TYPE_TYPE, bRequest:int, wValue:int, wIndex:int, wLength:int, bmRequestType_recipient:USB_REQUEST_TYPE_RECIPIENT=USB_REQUEST_TYPE_RECIPIENT.DEVICE, bmRequestType_data_transfer_direction:USB_REQUEST_TYPE_DIRECTION=USB_REQUEST_TYPE_DIRECTION.DEV_TO_HOST, ): dev_req = make_usb_device_request( bmRequestType_recipient=bmRequestType_recipient, bmRequestType_type=bmRequestType_type, bmRequestType_data_transfer_direction=bmRequestType_data_transfer_direction, bRequest=bRequest, wValue=wValue, wIndex=wIndex, wLength=wLength) # read the device descriptor # SETUP STAGE, send request for descriptor downloading yield from self.send(UsbPacketToken(USB_PID.TOKEN_SETUP, addr, 0)) yield from self.send(UsbPacketData(USB_PID.DATA_0, dev_req)) yield from self.wait_on_ack() # DATA stage data = yield from self.receive_bulk(addr, 0, USB_PID.DATA_1) # STATUS stage yield from self.transmit_bulk(addr, 0, USB_PID.DATA_1, []) return data
[docs] def control_write(self, addr:int, ep:int, bmRequestType_type:USB_REQUEST_TYPE_TYPE, bRequest:int, wValue:int, wIndex:int, buff:List[int], bmRequestType_recipient:USB_REQUEST_TYPE_RECIPIENT=USB_REQUEST_TYPE_RECIPIENT.DEVICE, bmRequestType_data_transfer_direction:USB_REQUEST_TYPE_DIRECTION=USB_REQUEST_TYPE_DIRECTION.HOST_TO_DEV, ): p = UsbPacketToken(USB_PID.TOKEN_SETUP, addr, ep) yield from self.send(p) dev_req = make_usb_device_request( bmRequestType_recipient=bmRequestType_recipient, bmRequestType_type=bmRequestType_type, bmRequestType_data_transfer_direction=bmRequestType_data_transfer_direction, bRequest=bRequest, wValue=wValue, wIndex=wIndex, wLength=len(buff)) yield from self.send(UsbPacketData(USB_PID.DATA_0, dev_req)) yield from self.wait_on_ack() if buff: yield from self.transmit_bulk(addr, 0, USB_PID.DATA_1, buff) else: # no data present skiping write pass # STATUS stage data = yield from self.receive_bulk(addr, 0, USB_PID.DATA_1) assert not data, data
[docs] def download_descriptor(self, addr: int, descriptor_t: Union[HStruct, str], index: int, wIndex:int=0, wLength: Optional[int]=NOT_SPECIFIED): dev_req_get_descr = make_usb_device_request_get_descr( descriptor_t, index, wIndex=wIndex, wLength=wLength) # read the device descriptor # SETUP STAGE, send request for descriptor downloading yield from self.send(UsbPacketToken(USB_PID.TOKEN_SETUP, addr, 0)) yield from self.send(UsbPacketData(USB_PID.DATA_0, dev_req_get_descr)) yield from self.wait_on_ack() # DATA stage descr = yield from self.receive_bulk(addr, 0, USB_PID.DATA_1) # assert len(descr) == int(dev_req_get_descr.wLength), (descriptor_t, wIndex, len(descr), int(dev_req_get_descr.wLength)) if wLength is NOT_SPECIFIED: if descriptor_t is str: char_cnt = (len(descr) - usb_descriptor_header_t.bit_length() // 8) // 2 if index == 0: descriptor_t = usb_define_descriptor_string0(char_cnt) else: descriptor_t = usb_define_descriptor_string(char_cnt) descr = UsbPacketData(USB_PID.DATA_1, descr) descr = descr.unpack(descriptor_t) else: assert len(descr) == int(dev_req_get_descr.wLength), (len(descr), int(dev_req_get_descr.wLength)) assert descriptor_t is usb_descriptor_configuration_t, descriptor_t descr = self.parse_configuration_descriptor_bundle(descr) # STATUS stage yield from self.send(UsbPacketToken(USB_PID.TOKEN_OUT, addr, 0)) yield from self.send(UsbPacketData(USB_PID.DATA_1, [])) yield from self.wait_on_ack() return descr
[docs] def proc(self): new_addr = len(self.descr) + 1 # init device address yield from self.control_write(0, 0, bmRequestType_type=USB_REQUEST_TYPE_TYPE.STANDARD, bRequest=USB_REQUEST.SET_ADDRESS, wValue=new_addr, wIndex=0, buff=[]) # :note: device address now set, starting download of descriptors dev_descr = yield from self.download_descriptor(new_addr, usb_descriptor_device_t, 0) ddb = self.descr[new_addr] = UsbDescriptorBundle() ddb.append(dev_descr) bNumConfigurations = int(dev_descr.body.bNumConfigurations) assert bNumConfigurations > 0, "Device must have some configuration descriptors" for i in range(bNumConfigurations): # first we have to resolve wTotalLength (the total lenght of configuration bundle) conf_descr = yield from self.download_descriptor(new_addr, usb_descriptor_configuration_t, i) size = int(conf_descr.body.wTotalLength) # now we download all descriptors in configuration bundle conf_descr_bundle = yield from self.download_descriptor( new_addr, usb_descriptor_configuration_t, i, wLength=size) real_size = sum(c._dtype.bit_length() for c in conf_descr_bundle) // 8 assert real_size == size, (real_size, size, conf_descr_bundle) ddb.extend(conf_descr_bundle) while True: try: yield from self.download_descriptor( new_addr, usb_descriptor_device_qualifier_t, 0 ) except UsbNoSuchDescriptor: break raise NotImplementedError("usb_descriptor_device_qualifier") # now read all string descriptors str_descr0 = None for i in range(0, 255): try: str_descr = yield from self.download_descriptor( new_addr, str, i, wIndex=0 if i == 0 else int(str_descr0.body[0]) ) except UsbNoSuchDescriptor: if i == 0: raise UsbNoSuchDescriptor("Need at least string descriptor 0 with language code") else: # other are not required break if i == 0: str_descr0 = str_descr ddb.append(str_descr) self._descriptors_downloaded = True