Source code for hwtLib.peripheral.usb.sim.usbip.server

#!/usr/bin/env python3

This server allows to connect virtual USB devices in the simulator directly to any machine which supports USBIP.

Based on

:note: usbip is a part of linux-tools-common apt package
       but in /usr/bin/ is only a placeholder
       the /usr/lib/linux-tools-`uname -r`/usbipd

:note: on some machines /usr/bin/usbip does not work correctly
    because the version of a kernel slightly differs from version of linux tools

.. code-block::

    modprobe vhci_hcd
    usbip list -r -l
    usbip attach -r -b 0-1

    usbip port -r
    usbip detach -p 00

import asyncio
from asyncio.selector_events import BaseSelectorEventLoop
from asyncio.streams import StreamReader, StreamWriter
import threading

from hwtLib.peripheral.usb.sim.usbip.connection import USBIPConnection
from hwtLib.peripheral.usb.usb2.utmi_usb_agent import UtmiUsbAgent

[docs]class UsbipServer():
[docs] def __init__(self, usb_ag: UtmiUsbAgent, host='', port=3240, debug=False): self.usb_ag = usb_ag = host self.port = port self._debug = debug self._server = None self._loop = None self._die_on_exception = debug self._terminated = False self._session_recorder = None
[docs] def install_session_recorder(self, session_recoder: 'UsbipServerSessionRecorder'): self._session_recorder = session_recoder
[docs] async def on_usbip_connection(self, reader: StreamReader, writer: StreamWriter): if self._session_recorder: reader, writer = self._session_recorder.apply(reader, writer) conn = USBIPConnection(self, reader, writer) await conn.connection()
[docs] def terminate(self): if self._terminated: return self._terminated = True server = self._server loop = self._loop if loop is not None: if server is not None: server.close() # loop.run_until_complete(server.wait_closed()) loop.stop()
# loop.close() # self._server = None # self._loop = None
[docs] def handle_loop_exception(self, loop, context): # raise the exception in main thread that there was some unfixable error self._terminated = True sim = self.usb_ag.sim def raise_err(): raise context.get("exception", context["message"]) yield self.usb_ag.sim._schedule_proc( + 2, raise_err())
# msg = ctypes.py_object(SystemExit) # thread_id = ctypes.c_long(self.main_thread_id) # res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, msg) # if res == 0: # raise ValueError("Can not notify the exception to a main thread") # elif res > 1: # ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0) # raise context.get("exception", context["message"])
[docs] def _start_server(self, loop): coro = asyncio.start_server(self.on_usbip_connection,, self.port, loop=loop) self._server = loop.run_until_complete(coro) return coro
[docs] def run(self): if self._terminated: return try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop: BaseSelectorEventLoop self._loop = loop loop.set_exception_handler(self.handle_loop_exception) coro = self._start_server(loop) if self._terminated: self.terminate() return # self._server = loop.run_until_complete(coro) if self._terminated: self.terminate() return t1 = threading.Thread(target=loop.run_forever, args=()) t1.daemon = True t1.start() return t1