hwtLib.peripheral.usb.sim.usbip package

Submodules

hwtLib.peripheral.usb.sim.usbip.connection module

class hwtLib.peripheral.usb.sim.usbip.connection.USBIPConnection(server: UsbipServer, reader: StreamReader, writer: StreamWriter)[source]

Bases: object

Server container of informations about USBIP client connection

OP_COMMON = '>HIIII'
OP_COMMON_SIZE = 18
OP_SUBMIT = '>IiIIi8s'
OP_SUBMIT_SIZE = 28
__init__(server: UsbipServer, reader: StreamReader, writer: StreamWriter)[source]
async connection()[source]
debug_log(t)[source]
async getDeviceList()[source]

usbctx like function

async handle_op_devlist()[source]
async handle_op_import(busid)[source]
async handle_packet()[source]

Handle a USBIP packet.

async handle_urb_submit(seqnum: int, dev: USBIPDevice, direction, ep)[source]
classmethod make_usbip_header_basic(command: int, seqnum: int, devid: int, direction: int, ep: int)[source]
pack_device_desc(dev: USBIPSimDevice, interfaces=True)[source]

Pack a device descriptors into bytes for stransport to client

classmethod usbip_ret_submit(seqnum, devid, direction, ep, status: int, actual_length: int, error_count: int, transfer_buffer: bytes, iso_start_frame=0, iso_number_of_packets=4294967295, iso_packet_descriptor: bytes = b'')[source]
exception hwtLib.peripheral.usb.sim.usbip.connection.USBIPProtocolErrorException(message)[source]

Bases: Exception

__init__(message)[source]

hwtLib.peripheral.usb.sim.usbip.constants module

hwtLib.peripheral.usb.sim.usbip.device module

class hwtLib.peripheral.usb.sim.usbip.device.LIBUSB_TRANSFER_STATUS[source]

Bases: object

TRANSFER_CANCELLED = 3
TRANSFER_COMPLETED = 0
TRANSFER_ERROR = 1
TRANSFER_NO_DEVICE = 5
TRANSFER_OVERFLOW = 6
TRANSFER_STALL = 4
TRANSFER_TIMED_OUT = 2
class hwtLib.peripheral.usb.sim.usbip.device.USBIPDevice(devid: Tuple[int, int], hnd: USBIPSimDevice)[source]

Bases: object

__init__(devid: Tuple[int, int], hnd: USBIPSimDevice)[source]
packDevid()[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPOperationPromise(operation_process, sim, clk)[source]

Bases: object

__init__(operation_process, sim, clk)[source]
_continue_with_clk_triggering()[source]
_pool_process()[source]
fulfill(data)[source]
async pool_sleep_until_fulfil(t)[source]
schedule()[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPPending(seqnum, device, xfer)[source]

Bases: object

__init__(seqnum, device, xfer)[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDevice(usb_agent, addr: int, descriptors: UsbDescriptorBundle)[source]

Bases: object

Attention:

API of this component should be same as the device from library usb1 if possible because we want to just swap the device object in sim and test the real device as well

__init__(usb_agent, addr: int, descriptors: UsbDescriptorBundle)[source]
close()[source]
async controlRead(bmRequestType_recipient: USB_REQUEST_TYPE_RECIPIENT, bmRequestType_type: USB_REQUEST_TYPE_TYPE, bmRequestType_data_transfer_direction: USB_REQUEST_TYPE_DIRECTION, bRequest, wValue, wIndex, wLength)[source]
async controlWrite(bmRequestType_recipient: USB_REQUEST_TYPE_RECIPIENT, bmRequestType_type: USB_REQUEST_TYPE_TYPE, bmRequestType_data_transfer_direction: USB_REQUEST_TYPE_DIRECTION, bRequest: int, wValue: int, wIndex: int, data)[source]
Returns:

The number of bytes actually sent.

getBusNumber()[source]
getDevice()[source]
getDeviceAddress()[source]
getDeviceClass()[source]
getDeviceProtocol()[source]
getDeviceSpeed()[source]
getDeviceSubClass()[source]
getNumConfigurations()[source]
getProductID()[source]
getTransfer()[source]
getVendorID()[source]
getbcdDevice()[source]
iterConfigurations()[source]
open()[source]
USBIPSimDeviceConfiguration(dev: ~hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDevice, config_descr: struct usb_descriptor_configuration_t {
struct usb_descriptor_header_t {
<HBits, 8bits, unsigned> bLength
<HBits, 8bits, unsigned> bDescriptorType
} header
struct usb_descriptor_configuration_body_t {
<HBits, 16bits, unsigned> wTotalLength
<HBits, 8bits, unsigned> bNumInterfaces
<HBits, 8bits, unsigned> bConfigurationValue
<HBits, 8bits, unsigned> iConfiguration
struct usb_configuration_body_bmAttributes_t {
<HBits, 5bits> reserved0
<HBits, 1bit> remoteWakeup
<HBits, 1bit> selfPowered
<HBits, 1bit> reserved1
} bmAttributes
<HBits, 8bits, unsigned> bMaxPower
} body
})

Bases: object

USBIPSimDeviceConfiguration.__init__(dev: ~hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDevice, config_descr: struct usb_descriptor_configuration_t {
struct usb_descriptor_header_t {
<HBits, 8bits, unsigned> bLength
<HBits, 8bits, unsigned> bDescriptorType
} header
struct usb_descriptor_configuration_body_t {
<HBits, 16bits, unsigned> wTotalLength
<HBits, 8bits, unsigned> bNumInterfaces
<HBits, 8bits, unsigned> bConfigurationValue
<HBits, 8bits, unsigned> iConfiguration
struct usb_configuration_body_bmAttributes_t {
<HBits, 5bits> reserved0
<HBits, 1bit> remoteWakeup
<HBits, 1bit> selfPowered
<HBits, 1bit> reserved1
} bmAttributes
<HBits, 8bits, unsigned> bMaxPower
} body
})
USBIPSimDeviceConfiguration.getConfigurationValue()[source]
USBIPSimDeviceConfiguration.getNumInterfaces()[source]
USBIPSimDeviceConfiguration.iterInterfaces()[source]
USBIPSimDeviceInterface(conf: ~hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDeviceConfiguration, descr: struct usb_descriptor_interface_t {
struct usb_descriptor_header_t {
<HBits, 8bits, unsigned> bLength
<HBits, 8bits, unsigned> bDescriptorType
} header
struct usb_descriptor_interface_body_t {
<HBits, 8bits, unsigned> bInterfaceNumber
<HBits, 8bits, unsigned> bAlternateSetting
<HBits, 8bits, unsigned> bNumEndpoints
<HBits, 8bits, unsigned> bInterfaceClass
<HBits, 8bits, unsigned> bInterfaceSubClass
<HBits, 8bits, unsigned> bInterfaceProtocol
<HBits, 8bits, unsigned> iInterface
} body
})

Bases: object

USBIPSimDeviceInterface.__init__(conf: ~hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDeviceConfiguration, descr: struct usb_descriptor_interface_t {
struct usb_descriptor_header_t {
<HBits, 8bits, unsigned> bLength
<HBits, 8bits, unsigned> bDescriptorType
} header
struct usb_descriptor_interface_body_t {
<HBits, 8bits, unsigned> bInterfaceNumber
<HBits, 8bits, unsigned> bAlternateSetting
<HBits, 8bits, unsigned> bNumEndpoints
<HBits, 8bits, unsigned> bInterfaceClass
<HBits, 8bits, unsigned> bInterfaceSubClass
<HBits, 8bits, unsigned> bInterfaceProtocol
<HBits, 8bits, unsigned> iInterface
} body
})
USBIPSimDeviceInterface.iterSettings()[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDeviceInterfaceSetting(hwIO: USBIPSimDeviceInterface)[source]

Bases: object

__init__(hwIO: USBIPSimDeviceInterface)[source]
getClass()[source]
getProtocol()[source]
getSubClass()[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPTransfer(dev: USBIPSimDevice)[source]

Bases: object

__init__(dev: USBIPSimDevice)[source]
_next_bulk_pid(pid)[source]
getActualLength()[source]
getBuffer()[source]
getStatus()[source]
setBulk(ep: int, buff_or_len: int | bytes, callback: Callable[[USBIPTransfer], None])[source]
async submit()[source]

hwtLib.peripheral.usb.sim.usbip.server module

This server allows to connect virtual USB devices in the simulator directly to any machine which supports USBIP.

Based on https://github.com/jwise/pyusbip/blob/master/pyusbip.py https://www.kernel.org/doc/html/latest/usb/usbip_protocol.html https://forums.aws.amazon.com/thread.jspa?messageID=968176

note:

usbip is a part of linux-tools-common apt package but in /usr/bin/ is only a placeholder the /usr/lib/linux-tools-uname -r/usbipd

https://developer.ridgerun.com/wiki/index.php?title=How_to_setup_and_use_USB/IP

note:

on some machines /usr/bin/usbip does not work correctly because the version of a kernel slightly differs from version of linux tools /usr/lib/linux-tools-5.8.0-25/usbip /usr/lib/linux-tools-5.9.0-050900-generic/usbip

modprobe vhci_hcd
usbip list -r 127.0.0.1 -l
usbip attach -r 127.0.0.1 -b 0-1

usbip port -r 127.0.0.1
usbip detach -p 00
class hwtLib.peripheral.usb.sim.usbip.server.UsbipServer(usb_ag: UtmiUsbAgent, host='127.0.0.1', port=3240, debug=False)[source]

Bases: object

__init__(usb_ag: UtmiUsbAgent, host='127.0.0.1', port=3240, debug=False)[source]
_start_server(loop)[source]
handle_loop_exception(loop, context)[source]
install_session_recorder(session_recoder: UsbipServerSessionRecorder)[source]
async on_usbip_connection(reader: StreamReader, writer: StreamWriter)[source]
run()[source]
terminate()[source]

hwtLib.peripheral.usb.sim.usbip.session_recorder module

class hwtLib.peripheral.usb.sim.usbip.session_recorder.UsbipServerReplayer(usb_ag: UtmiUsbAgent, session_data, debug=False)[source]

Bases: UsbipServer

class Reader(sim: HdlSimulator, data: Deque[Tuple[int, str, List[int]]])[source]

Bases: object

__init__(sim: HdlSimulator, data: Deque[Tuple[int, str, List[int]]])[source]
async readexactly(size) bytes[source]
class Writer(sim: HdlSimulator, ref_data: Deque[List[int]])[source]

Bases: object

__init__(sim: HdlSimulator, ref_data: Deque[List[int]])[source]
close()[source]
async drain()[source]
get_extra_info(name)[source]
write(data: bytes)[source]
__annotations__ = {}
__init__(usb_ag: UtmiUsbAgent, session_data, debug=False)[source]
_start_server(loop)[source]
async on_usbip_connection(reader: StreamReader, writer: StreamWriter)[source]
class hwtLib.peripheral.usb.sim.usbip.session_recorder.UsbipServerSessionRecorder(sim: HdlSimulator)[source]

Bases: object

Used to record the client-server communication in hwtLib.peripheral.usb.sim.usbip.server.UsbipServer for later replay.

class Reader(sim: HdlSimulator, original: StreamReader, dst: List[List[int]])[source]

Bases: object

__init__(sim: HdlSimulator, original: StreamReader, dst: List[List[int]])[source]
async readexactly(size)[source]
class Writer(sim: HdlSimulator, original: StreamWriter, dst: List[List[int]])[source]

Bases: object

__init__(sim: HdlSimulator, original: StreamWriter, dst: List[List[int]])[source]
close()[source]
async drain()[source]
get_extra_info(name)[source]
write(data)[source]
__init__(sim: HdlSimulator)[source]
apply(reader: StreamReader, writer: StreamWriter)[source]
hwtLib.peripheral.usb.sim.usbip.session_recorder.cut_off_empty_time_segments(data: List[Tuple[int, str, List[int]]], time_segments: List[Tuple[int, int]]) List[Tuple[int, str, List[int]]][source]
Note:

if time segments contains some transaction the assertion error is raised

Note:

if segment (0,10) is cut off from times 0, 10, 11 it becomes 0, 1, 2

Parameters:

time_segments – list of ranges of times to cut off (the time after is shifted)

Returns:

filtered list of data from input

hwtLib.peripheral.usb.sim.usbip.session_recorder.filter_empty_in(_data: List[Tuple[int, str, List[int]]], seqnum_to_rm: Set[int]) List[Tuple[int, str, List[int]]][source]

Remove empty IN transfers from endpoints other than 0 and remove transactions by seqnum

Parameters:
  • _data – list of tuples time, r/w, list of bytes

  • seqnum_to_rm – set of seqnum to remove (all parts of the transaction with that specified seqnum are removed entirely)

  • cut_off_segments – list of ranges of times to cut off (the time after is shifted)

Returns:

filtered list of data from input