Source code for hwtLib.peripheral.usb.usb2.device_cdc_vcp

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import Switch
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import Handshaked
from hwt.interfaces.structIntf import StructIntf
from hwt.interfaces.utils import propagateClkRstn
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.handshaked.builder import HsBuilder
from hwtLib.handshaked.streamNode import StreamNode
from hwtLib.peripheral.usb.descriptors.bundle import UsbDescriptorBundle, \
    UsbEndpointMeta
from hwtLib.peripheral.usb.descriptors.cdc import get_default_usb_cdc_vcp_descriptors, \
    CLASS_REQUEST, make_usb_line_coding_default
from hwtLib.peripheral.usb.usb2.device_common import Usb2DeviceCommon
from hwtLib.peripheral.usb.usb2.device_ep_buffers import UsbDeviceEpBuffers
from hwtLib.types.ctypes import uint8_t
from hwt.code_utils import rename_signal


[docs]class Usb2CdcVcp(Usb2DeviceCommon): """ USB2.0 communcation device class virtual com port core (serial/uart over USB) :see: :class:`hwtLib.peripheral.usb.usb2.device_common.Usb2DeviceCommon` :ivar RX_AGGREGATION_TIMEOUT: the timeout (in clk ticks) for a packing of incomming bytes to a USB packet (the bigger packet equals more USB efficiency) :ivar rx: stream data from host :ivar tx: stream data to host .. hwt-autodoc:: """ def _config(self): Usb2DeviceCommon._config(self) self.DESCRIPTORS: UsbDescriptorBundle = get_default_usb_cdc_vcp_descriptors( productStr=self.__class__.__name__, bMaxPacketSize=512) self.RX_AGGREGATION_TIMEOUT = Param(512) def _declr(self): Usb2DeviceCommon._declr(self) self.rx:Handshaked = Handshaked()._m() self.tx = Handshaked() for i in [self.rx, self.tx]: i.DATA_WIDTH = 8
[docs] def generat_descriptor_rom(self, descriptors: UsbDescriptorBundle, rst): _descriptor_rom = descriptors.compile_rom() line_coding = descriptors.HValue_to_byte_list(make_usb_line_coding_default()) descriptors.ROM_CDC_LINE_CODING_ADDR = len(descriptors) descriptors.ROM_CDC_LINE_CODING_SIZE = len(line_coding) _descriptor_rom.extend(line_coding) descriptor_rom = self._sig("descriptor_rom", Bits(8)[len(_descriptor_rom)], def_val=_descriptor_rom) # a register for descriptor reader descr_addr = self._reg("descr_addr", uint8_t, rst=rst) descr_d = descriptor_rom[descr_addr] return descr_addr, descr_d
[docs] def decode_setup_request_class(self, setup:StructIntf, ep0_stall: RtlSignal, usb_addr_next: RtlSignal, descriptors: UsbDescriptorBundle, req_bDescriptorType: RtlSignal, req_bDescriptorIndex: RtlSignal, dev_configured: RtlSignal, descr_addr: RtlSignal, ep0_trans_len: RtlSignal): bRequest = rename_signal(self, setup.bRequest, "bRequest") return Switch(bRequest)\ .Case(CLASS_REQUEST.GET_LINE_CODING, descr_addr(descriptors.ROM_CDC_LINE_CODING_ADDR), ep0_trans_len(descriptors.ROM_CDC_LINE_CODING_SIZE), )
# accept all, from some reason there is bRequest 0x25 which should be reserved value when used with usbip def _impl(self): ep_meta = self.DESCRIPTORS.get_endpoint_meta() ep_meta = list(ep_meta) for ep0_channel in ep_meta[0]: ep0_channel: UsbEndpointMeta # no need to buffer anything as we can process it directly ep0_channel.buffer_size = 0 # because we do not support any interrupts # the buffer will respond with the stall automatically for ep2_channel in ep_meta[2]: if ep2_channel is None: continue ep2_channel: UsbEndpointMeta # no need to buffer interrupts as we can process it directly ep2_channel.buffer_size = 0 ep_buffers = UsbDeviceEpBuffers() ep_buffers.ENDPOINT_META = tuple(ep_meta) self.ep_buffers = ep_buffers dev_configured, ep0_stall = self.descriptor_ep0_fsm(self.DESCRIPTORS) self.connect_core_and_ep_buffers_common(ep0_stall, ep_buffers) tx_src = HsBuilder(self, self.tx).to_axis( MAX_FRAME_WORDS=ep_meta[1][0].max_packet_size, IN_TIMEOUT=self.RX_AGGREGATION_TIMEOUT).end ep_buffers.ep[1].tx(tx_src, exclude=[tx_src.ready, tx_src.valid, ep_buffers.ep[1].tx.keep]) ep_buffers.ep[1].tx.keep(1) StreamNode([tx_src], [ep_buffers.ep[1].tx]).sync(dev_configured._eq(1)) ep_buffers.ep[2].tx.data(None) ep_buffers.ep[2].tx.last(None) ep_buffers.ep[2].tx.keep(None) ep_buffers.ep[2].tx.valid(0) rx = ep_buffers.ep[1].rx StreamNode([rx], [self.rx]).sync(dev_configured._eq(1)) self.rx.data(rx.data) propagateClkRstn(self)
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = Usb2CdcVcp() print(to_rtl_str(u))