Source code for hwtLib.peripheral.usb.sim.usbip.device

from asyncio.tasks import sleep
from collections import deque
from typing import Tuple, Callable, Union

from hwt.synthesizer.rtlLevel.constants import NOT_SPECIFIED
from hwtLib.peripheral.usb.constants import USB_VER, USB_PID
from hwtLib.peripheral.usb.descriptors.bundle import UsbDescriptorBundle, \
    UsbNoSuchDescriptor
from hwtLib.peripheral.usb.descriptors.std import usb_descriptor_configuration_t, \
    usb_descriptor_interface_t, USB_ENDPOINT_ATTRIBUTES_TRANSFER_TYPE, \
    USB_ENDPOINT_DIR
from hwtLib.peripheral.usb.device_request import \
    USB_REQUEST_TYPE_RECIPIENT, USB_REQUEST_TYPE_DIRECTION, \
    USB_REQUEST_TYPE_TYPE
from hwtSimApi.process_utils import CallbackLoop, ExitCallbackLoop
from hwtSimApi.simCalendar import DONE


[docs]class USBIPPending:
[docs] def __init__(self, seqnum, device, xfer): self.seqnum = seqnum self.device = device self.xfer = xfer
[docs]class USBIPOperationPromise():
[docs] def __init__(self, operation_process, sim, clk): self.data = NOT_SPECIFIED self.operation_process = operation_process(self) self.sim = sim self.cb = CallbackLoop( sim, clk, self._pool_process, self._continue_with_clk_triggering)
[docs] def _continue_with_clk_triggering(self): if self.data is not NOT_SPECIFIED: raise ExitCallbackLoop() return True
[docs] def schedule(self): sim = self.sim # [todo] potentialy not thread safe # print("Try to schedule") # next_time = 0 # sim._events._list[1] cb = self.cb() with sim.scheduler_lock: while sim._current_time_slot is None or sim._current_time_slot.timeslot_end is not DONE: pass # sim._current_event_list.append(cb) next_time = sim._events._list[0] + 2 sim._schedule_proc(next_time, cb)
# print("schedule done")
[docs] def _pool_process(self): try: next(self.operation_process) except StopIteration: assert self.data is not NOT_SPECIFIED
[docs] def fulfill(self, data): self.data = data
[docs] async def pool_sleep_until_fulfil(self, t): while self.data is NOT_SPECIFIED: await sleep(t) return self.data
[docs]class USBIPSimDevice(): """ :attention: API of this component should be same as the device from library usb1 if possible because we want to just swap the device object in sim and test the real device as well """
[docs] def __init__(self, usb_agent, addr:int, descriptors:UsbDescriptorBundle): self.usb_agent = usb_agent self.addr = addr self.endp_next_pid = {} self.descriptors = descriptors
[docs] def getBusNumber(self): return 0
[docs] def getDeviceAddress(self): return self.addr
[docs] def getDeviceSpeed(self): v = int(self.descriptors[0].body.bcdUSB) return USB_VER.from_uint16_t(v)
[docs] def getVendorID(self): return int(self.descriptors[0].body.idVendor)
[docs] def getProductID(self): return int(self.descriptors[0].body.idProduct)
[docs] def getbcdDevice(self): return int(self.descriptors[0].body.bcdDevice)
[docs] def getDeviceClass(self): return int(self.descriptors[0].body.bDeviceClass)
[docs] def getDeviceSubClass(self): return int(self.descriptors[0].body.bDeviceSubClass)
[docs] def getDeviceProtocol(self): return int(self.descriptors[0].body.bDeviceProtocol)
[docs] def getNumConfigurations(self): return int(self.descriptors[0].body.bNumConfigurations)
[docs] def iterConfigurations(self): descr_i = 0 while True: try: d = self.descriptors.get_descriptor(usb_descriptor_configuration_t, descr_i) except UsbNoSuchDescriptor: break yield USBIPSimDeviceConfiguration(self, d[1]) descr_i += 1
[docs] def open(self): return self
[docs] def close(self): pass
[docs] def getDevice(self): return self
[docs] async def controlRead(self, bmRequestType_recipient:USB_REQUEST_TYPE_RECIPIENT, bmRequestType_type:USB_REQUEST_TYPE_TYPE, bmRequestType_data_transfer_direction:USB_REQUEST_TYPE_DIRECTION, bRequest, wValue, wIndex, wLength): def execute_control_read(read_op): # print("controlRead", self.addr, bmRequestType_recipient, bmRequestType_type, bmRequestType_data_transfer_direction, bRequest, wValue, wIndex, wLength) read_data = yield from self.usb_agent.usb_driver.control_read( self.addr, bmRequestType_type, bRequest, wValue, wIndex, wLength, bmRequestType_recipient=bmRequestType_recipient, bmRequestType_data_transfer_direction=bmRequestType_data_transfer_direction) # print("controlRead-done", read_data) read_op.fulfill(read_data) read_op = USBIPOperationPromise(execute_control_read, self.usb_agent.sim, self.usb_agent.clk) read_op.schedule() d = await read_op.pool_sleep_until_fulfil(0.01) return bytes(d)
[docs] async def controlWrite(self, bmRequestType_recipient:USB_REQUEST_TYPE_RECIPIENT, bmRequestType_type:USB_REQUEST_TYPE_TYPE, bmRequestType_data_transfer_direction:USB_REQUEST_TYPE_DIRECTION, bRequest: int, wValue: int, wIndex: int, data): """ :return: The number of bytes actually sent. """ def execute_control_write(write_op): # print("controlWrite", self.addr, bRequestType, bRequest, wValue, wIndex, data) yield from self.usb_agent.usb_driver.control_write( self.addr, 0, bmRequestType_type, bRequest, wValue, wIndex, data, bmRequestType_recipient=bmRequestType_recipient, bmRequestType_data_transfer_direction=bmRequestType_data_transfer_direction) write_op.fulfill(True) write_op = USBIPOperationPromise(execute_control_write, self.usb_agent.sim, self.usb_agent.clk) write_op.schedule() await write_op.pool_sleep_until_fulfil(0.01) return len(data)
[docs] def getTransfer(self): return USBIPTransfer(self)
[docs]class LIBUSB_TRANSFER_STATUS: # Transfer completed without error. Note that this does not indicate # that the entire amount of requested data was transferred. TRANSFER_COMPLETED = 0 # Transfer failed TRANSFER_ERROR = 1 # Transfer timed out TRANSFER_TIMED_OUT = 2 # Transfer was cancelled TRANSFER_CANCELLED = 3 # For bulk/interrupt endpoints: halt condition detected (endpoint # stalled). For control endpoints: control request not supported. TRANSFER_STALL = 4 # Device was disconnected TRANSFER_NO_DEVICE = 5 # Device sent more data than requested TRANSFER_OVERFLOW = 6
[docs]class USBIPTransfer():
[docs] def __init__(self, dev: USBIPSimDevice): self.dev = dev self.is_in = None self.transfer_t = None self.ep = None self.buffer = None self.len = None self.callback = None
[docs] def setBulk(self, ep:int, buff_or_len:Union[int, bytes], callback:Callable[['USBIPTransfer'], None]): self.is_in = ep >= 0x80 self.ep = ep & ~0x80 self.transfer_t = USB_ENDPOINT_ATTRIBUTES_TRANSFER_TYPE.BULK if self.is_in: assert isinstance(buff_or_len, int), buff_or_len self.len = buff_or_len else: assert isinstance(buff_or_len, (list, tuple, bytes, deque)) self.buffer = buff_or_len self.callback = callback
[docs] def _next_bulk_pid(self, pid): if pid == USB_PID.DATA_0: return USB_PID.DATA_1 elif pid == USB_PID.DATA_1: return USB_PID.DATA_0 else: raise NotImplementedError(pid)
[docs] async def submit(self): if self.transfer_t == USB_ENDPOINT_ATTRIBUTES_TRANSFER_TYPE.BULK: if self.is_in: def execute_bulk(read_op): # print("IN", self.dev.addr, self.ep) epk = (self.ep, USB_ENDPOINT_DIR.IN) pid = self.dev.endp_next_pid.setdefault(epk, USB_PID.DATA_0) read_data = yield from self.dev.usb_agent.usb_driver.receive_bulk( self.dev.addr, self.ep, pid, 512) # print("IN-done", read_data) if read_data is None: read_data = [] # nack, do not update data pid else: self.dev.endp_next_pid[epk] = self._next_bulk_pid(pid) read_op.fulfill(read_data) else: def execute_bulk(read_op): # print("OUT", self.dev.addr, self.ep, self.buffer) epk = (self.ep, USB_ENDPOINT_DIR.OUT) pid = self.dev.endp_next_pid.setdefault(epk, USB_PID.DATA_0) yield from self.dev.usb_agent.usb_driver.transmit_bulk( self.dev.addr, self.ep, pid, self.buffer) # print("OUT-done") self.dev.endp_next_pid[epk] = self._next_bulk_pid(pid) read_op.fulfill(None) op = USBIPOperationPromise(execute_bulk, self.dev.usb_agent.sim, self.dev.usb_agent.clk) op.schedule() buffer = await op.pool_sleep_until_fulfil(0.01) if self.is_in and buffer is not None: self.buffer = list(buffer) self.callback(self) else: raise NotImplementedError()
[docs] def getBuffer(self): return self.buffer
[docs] def getActualLength(self): return len(self.buffer)
[docs] def getStatus(self): return LIBUSB_TRANSFER_STATUS.TRANSFER_COMPLETED
[docs]class USBIPDevice:
[docs] def __init__(self, devid: Tuple[int, int], hnd: USBIPSimDevice): self.devid = devid self.hnd = hnd
[docs] def packDevid(self): # dev.getBusNumber() << 16 | dev.getDeviceAddress() did = self.devid return (did[0] << 16 | did[1])
class USBIPSimDeviceConfiguration(): def __init__(self, dev: USBIPSimDevice, config_descr:usb_descriptor_configuration_t): self.dev = dev self.config_descr = config_descr
[docs] def getConfigurationValue(self): return 1
[docs] def getNumInterfaces(self): return int(self.config_descr.body.bNumInterfaces)
[docs] def iterInterfaces(self): descr_i = 0 while True: try: d = self.dev.descriptors.get_descriptor(usb_descriptor_interface_t, descr_i) except UsbNoSuchDescriptor: break yield USBIPSimDeviceInterface(self, d[1]) descr_i += 1
class USBIPSimDeviceInterface(): def __init__(self, conf: USBIPSimDeviceConfiguration, descr: usb_descriptor_interface_t): self.conf = conf self.descr = descr
[docs] def iterSettings(self): yield USBIPSimDeviceInterfaceSetting(self)
[docs]class USBIPSimDeviceInterfaceSetting():
[docs] def __init__(self, intf: USBIPSimDeviceInterface): self.intf = intf
[docs] def getClass(self): return int(self.intf.descr.body.bInterfaceClass)
[docs] def getSubClass(self): return int(self.intf.descr.body.bInterfaceSubClass)
[docs] def getProtocol(self): return int(self.intf.descr.body.bInterfaceProtocol)