hwtLib.peripheral.usb.sim.usbip package

Submodules

hwtLib.peripheral.usb.sim.usbip.connection module

class hwtLib.peripheral.usb.sim.usbip.connection.USBIPConnection(server: UsbipServer, reader: StreamReader, writer: StreamWriter)[source]

Bases: object

Server container of informations about USBIP client connection

OP_COMMON = '>HIIII'
OP_COMMON_SIZE = 18
OP_SUBMIT = '>IiIIi8s'
OP_SUBMIT_SIZE = 28
__init__(server: UsbipServer, reader: StreamReader, writer: StreamWriter)[source]
async connection()[source]
debug_log(t)[source]
async getDeviceList()[source]

usbctx like function

async handle_op_devlist()[source]
async handle_op_import(busid)[source]
async handle_packet()[source]

Handle a USBIP packet.

async handle_urb_submit(seqnum: int, dev: USBIPDevice, direction, ep)[source]
classmethod make_usbip_header_basic(command: int, seqnum: int, devid: int, direction: int, ep: int)[source]
pack_device_desc(dev: USBIPSimDevice, interfaces=True)[source]

Pack a device descriptors into bytes for stransport to client

classmethod usbip_ret_submit(seqnum, devid, direction, ep, status: int, actual_length: int, error_count: int, transfer_buffer: bytes, iso_start_frame=0, iso_number_of_packets=4294967295, iso_packet_descriptor: bytes = b'')[source]
exception hwtLib.peripheral.usb.sim.usbip.connection.USBIPProtocolErrorException(message)[source]

Bases: Exception

__init__(message)[source]

hwtLib.peripheral.usb.sim.usbip.constants module

hwtLib.peripheral.usb.sim.usbip.device module

class hwtLib.peripheral.usb.sim.usbip.device.LIBUSB_TRANSFER_STATUS[source]

Bases: object

TRANSFER_CANCELLED = 3
TRANSFER_COMPLETED = 0
TRANSFER_ERROR = 1
TRANSFER_NO_DEVICE = 5
TRANSFER_OVERFLOW = 6
TRANSFER_STALL = 4
TRANSFER_TIMED_OUT = 2
class hwtLib.peripheral.usb.sim.usbip.device.USBIPDevice(devid: Tuple[int, int], hnd: USBIPSimDevice)[source]

Bases: object

__init__(devid: Tuple[int, int], hnd: USBIPSimDevice)[source]
packDevid()[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPOperationPromise(operation_process, sim, clk)[source]

Bases: object

__init__(operation_process, sim, clk)[source]
_continue_with_clk_triggering()[source]
_pool_process()[source]
fulfill(data)[source]
async pool_sleep_until_fulfil(t)[source]
schedule()[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPPending(seqnum, device, xfer)[source]

Bases: object

__init__(seqnum, device, xfer)[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDevice(usb_agent, addr: int, descriptors: UsbDescriptorBundle)[source]

Bases: object

Attention

API of this component should be same as the device from library usb1 if possible because we want to just swap the device object in sim and test the real device as well

__init__(usb_agent, addr: int, descriptors: UsbDescriptorBundle)[source]
close()[source]
async controlRead(bmRequestType_recipient: USB_REQUEST_TYPE_RECIPIENT, bmRequestType_type: USB_REQUEST_TYPE_TYPE, bmRequestType_data_transfer_direction: USB_REQUEST_TYPE_DIRECTION, bRequest, wValue, wIndex, wLength)[source]
async controlWrite(bmRequestType_recipient: USB_REQUEST_TYPE_RECIPIENT, bmRequestType_type: USB_REQUEST_TYPE_TYPE, bmRequestType_data_transfer_direction: USB_REQUEST_TYPE_DIRECTION, bRequest: int, wValue: int, wIndex: int, data)[source]
Returns

The number of bytes actually sent.

getBusNumber()[source]
getDevice()[source]
getDeviceAddress()[source]
getDeviceClass()[source]
getDeviceProtocol()[source]
getDeviceSpeed()[source]
getDeviceSubClass()[source]
getNumConfigurations()[source]
getProductID()[source]
getTransfer()[source]
getVendorID()[source]
getbcdDevice()[source]
iterConfigurations()[source]
open()[source]
USBIPSimDeviceConfiguration(dev: ~hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDevice, config_descr: struct usb_descriptor_configuration_t {
USBIPSimDeviceConfiguration    struct usb_descriptor_header_t {
USBIPSimDeviceConfiguration        <Bits, 8bits, unsigned> bLength
USBIPSimDeviceConfiguration        <Bits, 8bits, unsigned> bDescriptorType
USBIPSimDeviceConfiguration    } header
USBIPSimDeviceConfiguration    struct usb_descriptor_configuration_body_t {
USBIPSimDeviceConfiguration        <Bits, 16bits, unsigned> wTotalLength
USBIPSimDeviceConfiguration        <Bits, 8bits, unsigned> bNumInterfaces
USBIPSimDeviceConfiguration        <Bits, 8bits, unsigned> bConfigurationValue
USBIPSimDeviceConfiguration        <Bits, 8bits, unsigned> iConfiguration
USBIPSimDeviceConfiguration        struct usb_configuration_body_bmAttributes_t {
USBIPSimDeviceConfiguration            <Bits, 5bits> reserved0
USBIPSimDeviceConfiguration            <Bits, 1bit> remoteWakeup
USBIPSimDeviceConfiguration            <Bits, 1bit> selfPowered
USBIPSimDeviceConfiguration            <Bits, 1bit> reserved1
USBIPSimDeviceConfiguration        } bmAttributes
USBIPSimDeviceConfiguration        <Bits, 8bits, unsigned> bMaxPower
USBIPSimDeviceConfiguration    } body
USBIPSimDeviceConfiguration})

Bases: object

USBIPSimDeviceConfiguration.__init__(dev: ~hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDevice, config_descr: struct usb_descriptor_configuration_t {
USBIPSimDeviceConfiguration.__init__    struct usb_descriptor_header_t {
USBIPSimDeviceConfiguration.__init__        <Bits, 8bits, unsigned> bLength
USBIPSimDeviceConfiguration.__init__        <Bits, 8bits, unsigned> bDescriptorType
USBIPSimDeviceConfiguration.__init__    } header
USBIPSimDeviceConfiguration.__init__    struct usb_descriptor_configuration_body_t {
USBIPSimDeviceConfiguration.__init__        <Bits, 16bits, unsigned> wTotalLength
USBIPSimDeviceConfiguration.__init__        <Bits, 8bits, unsigned> bNumInterfaces
USBIPSimDeviceConfiguration.__init__        <Bits, 8bits, unsigned> bConfigurationValue
USBIPSimDeviceConfiguration.__init__        <Bits, 8bits, unsigned> iConfiguration
USBIPSimDeviceConfiguration.__init__        struct usb_configuration_body_bmAttributes_t {
USBIPSimDeviceConfiguration.__init__            <Bits, 5bits> reserved0
USBIPSimDeviceConfiguration.__init__            <Bits, 1bit> remoteWakeup
USBIPSimDeviceConfiguration.__init__            <Bits, 1bit> selfPowered
USBIPSimDeviceConfiguration.__init__            <Bits, 1bit> reserved1
USBIPSimDeviceConfiguration.__init__        } bmAttributes
USBIPSimDeviceConfiguration.__init__        <Bits, 8bits, unsigned> bMaxPower
USBIPSimDeviceConfiguration.__init__    } body
USBIPSimDeviceConfiguration.__init__})
USBIPSimDeviceConfiguration.getConfigurationValue()[source]
USBIPSimDeviceConfiguration.getNumInterfaces()[source]
USBIPSimDeviceConfiguration.iterInterfaces()[source]
USBIPSimDeviceInterface(conf: ~hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDeviceConfiguration, descr: struct usb_descriptor_interface_t {
USBIPSimDeviceInterface    struct usb_descriptor_header_t {
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> bLength
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> bDescriptorType
USBIPSimDeviceInterface    } header
USBIPSimDeviceInterface    struct usb_descriptor_interface_body_t {
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> bInterfaceNumber
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> bAlternateSetting
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> bNumEndpoints
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> bInterfaceClass
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> bInterfaceSubClass
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> bInterfaceProtocol
USBIPSimDeviceInterface        <Bits, 8bits, unsigned> iInterface
USBIPSimDeviceInterface    } body
USBIPSimDeviceInterface})

Bases: object

USBIPSimDeviceInterface.__init__(conf: ~hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDeviceConfiguration, descr: struct usb_descriptor_interface_t {
USBIPSimDeviceInterface.__init__    struct usb_descriptor_header_t {
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> bLength
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> bDescriptorType
USBIPSimDeviceInterface.__init__    } header
USBIPSimDeviceInterface.__init__    struct usb_descriptor_interface_body_t {
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> bInterfaceNumber
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> bAlternateSetting
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> bNumEndpoints
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> bInterfaceClass
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> bInterfaceSubClass
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> bInterfaceProtocol
USBIPSimDeviceInterface.__init__        <Bits, 8bits, unsigned> iInterface
USBIPSimDeviceInterface.__init__    } body
USBIPSimDeviceInterface.__init__})
USBIPSimDeviceInterface.iterSettings()[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPSimDeviceInterfaceSetting(intf: USBIPSimDeviceInterface)[source]

Bases: object

__init__(intf: USBIPSimDeviceInterface)[source]
getClass()[source]
getProtocol()[source]
getSubClass()[source]
class hwtLib.peripheral.usb.sim.usbip.device.USBIPTransfer(dev: USBIPSimDevice)[source]

Bases: object

__init__(dev: USBIPSimDevice)[source]
_next_bulk_pid(pid)[source]
getActualLength()[source]
getBuffer()[source]
getStatus()[source]
setBulk(ep: int, buff_or_len: Union[int, bytes], callback: Callable[[USBIPTransfer], None])[source]
async submit()[source]

hwtLib.peripheral.usb.sim.usbip.server module

This server allows to connect virtual USB devices in the simulator directly to any machine which supports USBIP.

Based on https://github.com/jwise/pyusbip/blob/master/pyusbip.py https://www.kernel.org/doc/html/latest/usb/usbip_protocol.html https://forums.aws.amazon.com/thread.jspa?messageID=968176

note

usbip is a part of linux-tools-common apt package but in /usr/bin/ is only a placeholder the /usr/lib/linux-tools-uname -r/usbipd

https://developer.ridgerun.com/wiki/index.php?title=How_to_setup_and_use_USB/IP

note

on some machines /usr/bin/usbip does not work correctly because the version of a kernel slightly differs from version of linux tools /usr/lib/linux-tools-5.8.0-25/usbip /usr/lib/linux-tools-5.9.0-050900-generic/usbip

modprobe vhci_hcd
usbip list -r 127.0.0.1 -l
usbip attach -r 127.0.0.1 -b 0-1

usbip port -r 127.0.0.1
usbip detach -p 00
class hwtLib.peripheral.usb.sim.usbip.server.UsbipServer(usb_ag: UtmiUsbAgent, host='127.0.0.1', port=3240, debug=False)[source]

Bases: object

__init__(usb_ag: UtmiUsbAgent, host='127.0.0.1', port=3240, debug=False)[source]
_start_server(loop)[source]
handle_loop_exception(loop, context)[source]
install_session_recorder(session_recoder: UsbipServerSessionRecorder)[source]
async on_usbip_connection(reader: StreamReader, writer: StreamWriter)[source]
run()[source]
terminate()[source]

hwtLib.peripheral.usb.sim.usbip.session_recorder module

class hwtLib.peripheral.usb.sim.usbip.session_recorder.UsbipServerReplayer(usb_ag: UtmiUsbAgent, session_data, debug=False)[source]

Bases: UsbipServer

class Reader(sim: HdlSimulator, data: Deque[Tuple[int, str, List[int]]])[source]

Bases: object

__init__(sim: HdlSimulator, data: Deque[Tuple[int, str, List[int]]])[source]
async readexactly(size) bytes[source]
class Writer(sim: HdlSimulator, ref_data: Deque[List[int]])[source]

Bases: object

__init__(sim: HdlSimulator, ref_data: Deque[List[int]])[source]
close()[source]
async drain()[source]
get_extra_info(name)[source]
write(data: bytes)[source]
__init__(usb_ag: UtmiUsbAgent, session_data, debug=False)[source]
_start_server(loop)[source]
async on_usbip_connection(reader: StreamReader, writer: StreamWriter)[source]
class hwtLib.peripheral.usb.sim.usbip.session_recorder.UsbipServerSessionRecorder(sim: HdlSimulator)[source]

Bases: object

Used to record the client-server communication in hwtLib.peripheral.usb.sim.usbip.server.UsbipServer for later replay.

class Reader(sim: HdlSimulator, original: StreamReader, dst: List[List[int]])[source]

Bases: object

__init__(sim: HdlSimulator, original: StreamReader, dst: List[List[int]])[source]
async readexactly(size)[source]
class Writer(sim: HdlSimulator, original: StreamWriter, dst: List[List[int]])[source]

Bases: object

__init__(sim: HdlSimulator, original: StreamWriter, dst: List[List[int]])[source]
close()[source]
async drain()[source]
get_extra_info(name)[source]
write(data)[source]
__init__(sim: HdlSimulator)[source]
apply(reader: StreamReader, writer: StreamWriter)[source]
hwtLib.peripheral.usb.sim.usbip.session_recorder.cut_off_empty_time_segments(data: List[Tuple[int, str, List[int]]], time_segments: List[Tuple[int, int]]) List[Tuple[int, str, List[int]]][source]
Note

if time segments contains some transaction the assertion error is raised

Note

if segment (0,10) is cut off from times 0, 10, 11 it becomes 0, 1, 2

Parameters

time_segments – list of ranges of times to cut off (the time after is shifted)

Returns

filtered list of data from input

hwtLib.peripheral.usb.sim.usbip.session_recorder.filter_empty_in(_data: List[Tuple[int, str, List[int]]], seqnum_to_rm: Set[int]) List[Tuple[int, str, List[int]]][source]

Remove empty IN transfers from endpoints other than 0 and remove transactions by seqnum

Parameters
  • _data – list of tuples time, r/w, list of bytes

  • seqnum_to_rm – set of seqnum to remove (all parts of the transaction with that specified seqnum are removed entirely)

  • cut_off_segments – list of ranges of times to cut off (the time after is shifted)

Returns

filtered list of data from input