Source code for hwtLib.peripheral.usb.sim.usbip.server
#!/usr/bin/env python3
"""
This server allows to connect virtual USB devices in the simulator directly to any machine which supports USBIP.
Based on https://github.com/jwise/pyusbip/blob/master/pyusbip.py
https://www.kernel.org/doc/html/latest/usb/usbip_protocol.html
https://forums.aws.amazon.com/thread.jspa?messageID=968176
:note: usbip is a part of linux-tools-common apt package
but in /usr/bin/ is only a placeholder
the /usr/lib/linux-tools-`uname -r`/usbipd
https://developer.ridgerun.com/wiki/index.php?title=How_to_setup_and_use_USB/IP
:note: on some machines /usr/bin/usbip does not work correctly
because the version of a kernel slightly differs from version of linux tools
/usr/lib/linux-tools-5.8.0-25/usbip
/usr/lib/linux-tools-5.9.0-050900-generic/usbip
.. code-block::
modprobe vhci_hcd
usbip list -r 127.0.0.1 -l
usbip attach -r 127.0.0.1 -b 0-1
usbip port -r 127.0.0.1
usbip detach -p 00
"""
import asyncio
from asyncio.selector_events import BaseSelectorEventLoop
from asyncio.streams import StreamReader, StreamWriter
import threading
from hwtLib.peripheral.usb.sim.usbip.connection import USBIPConnection
from hwtLib.peripheral.usb.usb2.utmi_usb_agent import UtmiUsbAgent
[docs]class UsbipServer():
[docs] def __init__(self, usb_ag: UtmiUsbAgent, host='127.0.0.1', port=3240, debug=False):
self.usb_ag = usb_ag
self.host = host
self.port = port
self._debug = debug
self._server = None
self._loop = None
self._die_on_exception = debug
self._terminated = False
self._session_recorder = None
[docs] def install_session_recorder(self, session_recoder: 'UsbipServerSessionRecorder'):
self._session_recorder = session_recoder
[docs] async def on_usbip_connection(self, reader: StreamReader, writer: StreamWriter):
if self._session_recorder:
reader, writer = self._session_recorder.apply(reader, writer)
conn = USBIPConnection(self, reader, writer)
await conn.connection()
[docs] def terminate(self):
if self._terminated:
return
self._terminated = True
server = self._server
loop = self._loop
if loop is not None:
if server is not None:
server.close()
# loop.run_until_complete(server.wait_closed())
loop.stop()
# loop.close()
# self._server = None
# self._loop = None
[docs] def handle_loop_exception(self, loop, context):
# raise the exception in main thread that there was some unfixable error
self._terminated = True
sim = self.usb_ag.sim
def raise_err():
raise context.get("exception", context["message"])
yield
self.usb_ag.sim._schedule_proc(sim.now + 2, raise_err())
# msg = ctypes.py_object(SystemExit)
# thread_id = ctypes.c_long(self.main_thread_id)
# res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, msg)
# if res == 0:
# raise ValueError("Can not notify the exception to a main thread")
# elif res > 1:
# ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
# raise context.get("exception", context["message"])
[docs] def _start_server(self, loop):
coro = asyncio.start_server(self.on_usbip_connection, self.host, self.port, loop=loop)
self._server = loop.run_until_complete(coro)
return coro
[docs] def run(self):
if self._terminated:
return
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop: BaseSelectorEventLoop
self._loop = loop
loop.set_exception_handler(self.handle_loop_exception)
coro = self._start_server(loop)
if self._terminated:
self.terminate()
return
# self._server = loop.run_until_complete(coro)
if self._terminated:
self.terminate()
return
t1 = threading.Thread(target=loop.run_forever, args=())
t1.daemon = True
t1.start()
return t1