#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional
from hwt.code import If, Switch, CodeBlock
from hwt.code_utils import rename_signal
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.enum import HEnum
from hwt.hdl.types.struct import HStruct
from hwt.interfaces.std import VldSynced, Signal
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.math import log2ceil
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.mainBases import RtlSignalBase
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
from hwtLib.peripheral.usb.constants import usb_addr_t, USB_PID, USB_VER, \
usb_endp_t, usb_pid_t, USB_LINE_STATE
from hwtLib.peripheral.usb.descriptors.bundle import UsbDescriptorBundle, \
UsbEndpointMeta
from hwtLib.peripheral.usb.usb2.device_core_interfaces import UsbEndpointInterface
from hwtLib.peripheral.usb.usb2.sie_rx import Usb2SieDeviceRx
from hwtLib.peripheral.usb.usb2.sie_tx import Usb2SieDeviceTx
from hwtLib.peripheral.usb.usb2.ulpi import ulpi_reg_otg_control_t_reset_defaults, \
ulpi_reg_function_control_t_reset_default
from hwtLib.peripheral.usb.usb2.utmi import Utmi_8b
from hwtLib.types.ctypes import uint8_t
from pyMathBitPrecise.bit_utils import mask
[docs]class Usb2DeviceCore(Unit):
"""
Builds the endpoint state machines, speed negotiation logic,
USB reset logic and transaction logic based on the USB descriptors.
:see: https://www.beyondlogic.org/usbnutshell/usb4.shtml
:ivar ~.phy: An interface to USB PHY which is connected to host
:ivar ~.ep: An interface to USB endpoint buffers of this device
:ivar ~.usb_rst: output of USB reset detector
:ivar ~.usb_speed: An interface which holds the index of the USB version
to note which speed was negotiated.
:ivar ~.current_usb_addr: An input signal with an address currently
assigned for this device.
:ivar ~.PRE_NEGOTIATED_TO: A parameter for testing purposes which can be used to
specify the default state of the link negotiation state
:ivar sie_rx: Serial Interface Engine for UTMI rx channel (host->device)
:ivar sie_tx: Serial Interface Engine for UTMI tx channel (device->host)
.. hwt-autodoc:: _example_Usb2DeviceCore
"""
def _config(self):
    """
    Declare the configuration parameters of this component.

    * ``DESCRIPTORS`` - the USB descriptor bundle of the device; it has no
      usable default and has to be set to an :class:`UsbDescriptorBundle`
      instance by the user.
    * ``CLK_FREQ`` - frequency of the clock in Hz (used to convert
      the protocol timeouts to a number of clock ticks).
    * ``PRE_NEGOTIATED_TO`` - optional :class:`USB_VER` member which selects
      the initial state of the link speed negotiation (None = not negotiated).
    """
    self.DESCRIPTORS: UsbDescriptorBundle = Param(None)
    self.CLK_FREQ = Param(60_000_000)
    self.PRE_NEGOTIATED_TO: Optional[USB_VER] = Param(None)
def _declr(self):
    """
    Declare the interfaces and the sub-components of the core.

    The endpoint configuration is extracted from the ``DESCRIPTORS``
    parameter and stored in ``ENDPOINT_CONFIG`` for later use
    in the implementation.
    """
    descriptors = self.DESCRIPTORS
    assert isinstance(descriptors, UsbDescriptorBundle), descriptors
    self.ENDPOINT_CONFIG = descriptors.get_endpoint_meta()

    # clock and reset
    addClkRstn(self)
    self.clk.FREQ = self.CLK_FREQ

    # PHY side and endpoint buffer side interfaces
    self.phy = Utmi_8b()
    self.ep: UsbEndpointInterface = UsbEndpointInterface()._m()

    # status outputs
    self.usb_rst: Signal = Signal()._m()
    self.usb_speed: VldSynced = VldSynced()._m()
    # the speed is reported as an index to USB_VER.values
    self.usb_speed.DATA_WIDTH = log2ceil(len(USB_VER.values))

    self.current_usb_addr = Signal(usb_addr_t)

    # serial interface engines for both directions
    self.sie_rx = Usb2SieDeviceRx()
    self.sie_tx = Usb2SieDeviceTx()
def detect_usb_rst(self, LineState: RtlSignal, usb_speed: VldSynced):
    """
    Build the detector of the USB reset (a long SE0 state on the line).

    A 15b counter counts the clock cycles for which the line stays in SE0.
    The counter stops once its bit 14 becomes set (after 2**14 cycles of SE0)
    and it is cleared whenever the line leaves SE0.

    :param LineState: line state signal of the PHY
    :param usb_speed: negotiated speed, currently not used by the detector
    :return: the signal (bit 14 of the SE0 counter) which is 1 while
        the USB reset is detected
    """
    se0_ticks = self._reg("se0_cntr", Bits(15), def_val=0)
    rst_detected = se0_ticks[14]
    line_is_se0 = LineState._eq(USB_LINE_STATE.SE0)

    # [TODO] distinguish the link negotiated to HS (usb_speed)
    # LS/FS SE0 for more than > 2.5us
    If(line_is_se0,
        # count only until the reset is detected, then hold the value
        If(~rst_detected,
            se0_ticks(se0_ticks + 1)
        )
    ).Else(
        # [TODO] HS more than 3ms of bus inactivity -> suspend or reset
        se0_ticks(0),
    )
    return rst_detected
def define_endpoint_states(self, endp: RtlSignal, rst_any:RtlSignal):
    """
    Build the per-endpoint DATA0/1 toggle bits and the logic to access them.

    For each endpoint direction present in ``ENDPOINT_CONFIG`` a 1b register
    (reset to 0 by ``rst_any``) is created. The bits of the endpoint
    selected by ``endp`` are multiplexed to ``ep_out_data_bit`` and
    ``ep_in_data_bit``; the value is undefined for a missing direction
    and for an unknown endpoint.

    :param endp: index of the currently addressed endpoint
    :param rst_any: reset signal for the toggle bit registers
    :return: tuple (ep_out_data_bit, ep_in_data_bit, set_ep_out_data_bit,
        set_ep_in_data_bit, ep_is_isochronous) where the setters are functions
        which take a value and return a list of statements which write it
        to the toggle bit of the endpoint selected by ``endp``
    """
    # used for DATA0/1 toggling
    # [todo] endpoint specific
    out_bits = []
    in_bits = []
    for i, (ep_out, ep_in) in enumerate(self.ENDPOINT_CONFIG):
        # None marks the direction which does not exist on this endpoint
        out_bits.append(
            None if ep_out is None else
            self._reg(f"ep{i:d}_out_data_bit", BIT, def_val=0, rst=rst_any)
        )
        in_bits.append(
            None if ep_in is None else
            self._reg(f"ep{i:d}_in_data_bit", BIT, def_val=0, rst=rst_any)
        )

    ep_out_data_bit = self._sig("ep_out_data_bit")
    ep_in_data_bit = self._sig("ep_in_data_bit")
    # select the toggle bits of the addressed endpoint
    Switch(endp)\
    .add_cases(
        (i, [
            ep_out_data_bit(out_bit),
            ep_in_data_bit(in_bit),
        ]) for i, (out_bit, in_bit) in enumerate(zip(out_bits, in_bits))
    ).Default(
        ep_out_data_bit(None),
        ep_in_data_bit(None),
    )

    # isochronous endpoints are not supported yet
    ep_is_isochronous = BIT.from_py(0)

    def make_setter(data_bits):
        """
        Create the function which generates the write to the toggle bit
        of the endpoint selected by endp.
        """
        def set_data_bit(val):
            return [
                If(endp._eq(ep_index),
                    bit(val),
                )
                for ep_index, bit in enumerate(data_bits)
                if bit is not None
            ]
        return set_data_bit

    set_ep_out_data_bit = make_setter(out_bits)
    set_ep_in_data_bit = make_setter(in_bits)
    return ep_out_data_bit, ep_in_data_bit, set_ep_out_data_bit, set_ep_in_data_bit, ep_is_isochronous
def usb_endpoint_fsm(self, usb_rst: RtlSignal, chirp_en: RtlSignal,
                     ep_rx: AxiStream,
                     ep_tx: AxiStream,
                     ep_tx_success: VldSynced,
                     ep_rx_stall: RtlSignal,
                     ep_tx_stall: RtlSignal,
                     ):
    """
    Build the FSM which handles the USB transactions
    (token, data and handshake packets) of the endpoints.

    :param usb_rst: USB reset flag; together with the reset of this component
        it resets the FSM and the DATA0/1 toggle bits (masked by chirp_en)
    :param chirp_en: if set and no token is being received in RX_IDLE
        the FSM switches to TX_CHIRP and requests the chirp on tx_cmd
        until chirp_en is deasserted
    :param ep_rx: stream of the data received for the endpoint buffers,
        only ``valid`` is driven and ``ready`` is read here
    :param ep_tx: stream of the data to transmit from the endpoint buffers,
        only ``ready`` is driven here
    :param ep_tx_success: ``vld`` is asserted in TX_DATA_COMPLETE state
        if tx_cmd is ready, ``data`` is constantly 1
    :param ep_rx_stall: if set the OUT and PING tokens get the STALL handshake
    :param ep_tx_stall: if set the IN tokens get the STALL handshake
    :return: the signal which is 1 in TX_HANDSHAKE state, used to mark
        the handshake packet as the last word of tx_cmd
    """
    st_t = HEnum("usb_endpoint_fsm_st_t", [
        "RX_IDLE",
        "RX_DATA",
        "RX_DATA_IGNORE",
        "RX_DATA_IGNORE_NO_RESP",
        "TX_DATA",
        "TX_DATA_COMPLETE",
        "TX_HANDSHAKE",
        "TX_CHIRP",
    ])
    # the USB reset must not reset the FSM while the chirp is being sent
    rst_any = self.rst_n._isOn() | (usb_rst._isOn() & ~chirp_en)
    st = self._reg("usb_endpoint_fsm_st", st_t, def_val=st_t.RX_IDLE, rst=rst_any)

    # tx drive
    token_pid = self.sie_rx.rx_header.pid
    rx_data = self.sie_rx.rx_data
    tx_cmd = self.sie_tx.tx_cmd
    ep_out_data_bit, ep_in_data_bit, set_ep_out_data_bit, set_ep_in_data_bit, ep_is_isochronous = self.define_endpoint_states(self.sie_rx.rx_header.endp, rst_any)

    # the received data PID does not match the expected DATA0/1 toggle value
    out_pid_error = rename_signal(self, ~ep_is_isochronous & self.sie_rx.rx_header.vld & (
        (~ep_out_data_bit & (token_pid != USB_PID.DATA_0)) |
        (ep_out_data_bit & (token_pid != USB_PID.DATA_1))
    ), "out_pid_error")
    tx_cmd_extra_last = self._sig("tx_cmd_extra_last")
    # PID of the packet which will be transmitted as a response
    tx_pid = self._reg("tx_pid", usb_pid_t)

    CodeBlock(
        # default values, overridden in the states which transmit
        tx_cmd.pid(None),
        tx_cmd.chirp(0),
        tx_cmd.valid(0),
        tx_cmd_extra_last(0),
        Switch(st)\
        .Case(st_t.RX_IDLE,
            tx_pid(None),
            If(self.sie_rx.rx_header.vld,
                Switch(token_pid)\
                .Case(USB_PID.TOKEN_IN,  # tx (device -> host)
                    If(ep_tx_stall,
                        # the endpoint is shut down or not supported
                        st(st_t.TX_HANDSHAKE),
                        tx_pid(USB_PID.HS_STALL),
                    ).Elif(ep_tx.valid,
                        # will send regular data
                        st(st_t.TX_DATA),
                        # [todo] MDATA for isochronous endpoints
                        If(ep_in_data_bit._eq(0),
                            tx_pid(USB_PID.DATA_0),
                        ).Else(
                            tx_pid(USB_PID.DATA_1),
                        ),
                    ).Else(
                        # no data to send
                        st(st_t.TX_HANDSHAKE),
                        tx_pid(USB_PID.HS_NACK),
                    )
                ).Case(USB_PID.PING,  # (device -> host, asking for host -> device)
                    st(st_t.TX_HANDSHAKE),
                    If(ep_rx_stall,
                        # the endpoint is shut down or not supported
                        tx_pid(USB_PID.HS_STALL),
                    ).Elif(ep_rx.ready,
                        # PING probes the OUT direction: ACK if the rx buffer
                        # can accept the data
                        tx_pid(USB_PID.HS_ACK),
                    ).Else(
                        # no space in rx buffer
                        tx_pid(USB_PID.HS_NACK),
                    ),
                ).Case(USB_PID.TOKEN_OUT,  # rx (host -> device)
                    If(ep_rx_stall,
                        # the endpoint is shut down or not supported
                        tx_pid(USB_PID.HS_STALL),
                        st(st_t.RX_DATA_IGNORE),
                    ).Elif(ep_rx.ready,
                        st(st_t.RX_DATA),
                    ).Else(
                        # no space in rx buffer
                        tx_pid(USB_PID.HS_NACK),
                        st(st_t.RX_DATA_IGNORE),
                    )
                ).Case(USB_PID.TOKEN_SETUP,  # (host -> device)
                    # the setup transaction re-initializes the data toggle bits
                    set_ep_out_data_bit(0),
                    set_ep_in_data_bit(1),
                    If(ep_rx.ready,
                        # can receive the setup data
                        st(st_t.RX_DATA),
                    ).Else(
                        # no space in rx buffer (note that this should not happen)
                        tx_pid(USB_PID.HS_NACK),
                        st(st_t.RX_DATA_IGNORE),
                    )
                )
            ).Elif(chirp_en,
                st(st_t.TX_CHIRP),
            ),
        ).Case(st_t.RX_DATA,
            # checking if data PID is correct
            # [TODO] isochronous rx
            # [TODO] timeout
            If(out_pid_error,
                st(st_t.RX_DATA_IGNORE_NO_RESP),
            ).Elif(rx_data.error | (ep_is_isochronous & rx_data.vld & rx_data.last),
                # no response on CRC error or isochronous endpoint
                st(st_t.RX_IDLE),
            ).Elif(rx_data.vld & rx_data.last,
                # ack after data receive
                tx_pid(USB_PID.HS_ACK),
                set_ep_out_data_bit(~ep_out_data_bit),
                st(st_t.TX_HANDSHAKE)
            ),
        ).Case(st_t.RX_DATA_IGNORE_NO_RESP,
            # drop the rest of the packet without any handshake
            If(rx_data.vld & rx_data.last,
                st(st_t.RX_IDLE),
            )
        ).Case(st_t.RX_DATA_IGNORE,
            # there was an error, waiting until end of packet
            If(rx_data.vld & rx_data.last,
                If(rx_data.error | ep_is_isochronous,
                    # no response on CRC error or isochronous endpoint
                    st(st_t.RX_IDLE),
                ).Else(
                    st(st_t.TX_HANDSHAKE)
                )
            )
        ).Case(st_t.TX_DATA,
            tx_cmd.pid(tx_pid),
            tx_cmd.valid(ep_tx.valid),
            If(ep_tx.valid & ep_tx.ready & ep_tx.last,
                # the last word of the packet was accepted
                set_ep_in_data_bit(~ep_in_data_bit),
                st(st_t.TX_DATA_COMPLETE),
            ),
        ).Case(st_t.TX_DATA_COMPLETE,
            st(st_t.RX_IDLE),
        ).Case(st_t.TX_HANDSHAKE,
            tx_cmd.valid(1),
            tx_cmd_extra_last(1),
            If(ep_tx_stall,
                tx_cmd.pid(USB_PID.HS_STALL),
            ).Else(
                tx_cmd.pid(tx_pid),
            ),
            If(tx_cmd.ready,
                st(st_t.RX_IDLE),
            ),
        ).Case(st_t.TX_CHIRP,
            tx_cmd.valid(1),
            tx_cmd.pid(0),
            tx_cmd.chirp(1),
            If(~chirp_en,
                st(st_t.RX_IDLE),
            )
        )
    )

    # received data are passed to the endpoint buffer only in RX_DATA state
    ep_rx.valid(rx_data.vld & st._eq(st_t.RX_DATA))
    ep_tx_success.vld(st._eq(st_t.TX_DATA_COMPLETE) & tx_cmd.ready)
    ep_tx_success.data(1)
    # the tx data are consumed while sending the data or the STALL handshake
    ep_tx.ready(((st._eq(st_t.TX_HANDSHAKE) & ep_tx_stall) |
                 st._eq(st_t.TX_DATA)
                 ) & tx_cmd.ready)

    return tx_cmd_extra_last
def ms_to_clock_ticks(self, t_ms):
    """
    Convert the time in milliseconds to the number of clock ticks.

    :param t_ms: time in milliseconds
    :return: the number of periods of the clock with frequency
        ``self.CLK_FREQ`` (in Hz) as a float, the value is not rounded
    """
    # multiply first and divide once, the conversion through the reciprocal
    # clock period (1 / CLK_FREQ) would accumulate a rounding error
    return t_ms * self.CLK_FREQ / 1000
def usb_linerate_negotiation(self,
                             enable: RtlSignalBase, usb_reset: RtlSignal,
                             utmi: Utmi_8b, usb_speed_o:VldSynced):
    """
    Build the FSM which negotiates the link speed (FS or HS using chirps)
    and which drives the mode of the transceiver.

    :param enable: allows the FSM to leave the IDLE (detached) state
    :param usb_reset: USB reset detected flag, in FULLSPEED/HIGHSPEED state
        it restarts the negotiation
    :param utmi: PHY interface, its function_control and otg_control
        are driven and the LineState is read
    :param usb_speed_o: output with the index of the negotiated version
        in USB_VER.values, valid only in FULLSPEED/HIGHSPEED state
    :return: chirp_en flag which is 1 while the FSM is in SEND_CHIRP_K state
    """
    # Default - disconnect
    st_t = HEnum("usb_linerate_negotiation_state", [
        "IDLE",
        "WAIT_RST",
        "SEND_CHIRP_K",
        "WAIT_CHIRP_JK",
        "FULLSPEED",
        "HIGHSPEED",
    ])
    # the initial state can be forced by PRE_NEGOTIATED_TO (for testing)
    st = self._reg("usb_linerate_negotiation_state", st_t,
                   def_val={
                       None: st_t.IDLE,
                       USB_VER.USB1_1:st_t.FULLSPEED,
                       USB_VER.USB2_0:st_t.HIGHSPEED,
                   }[self.PRE_NEGOTIATED_TO])
    st_t = st._dtype

    utmi_fn = utmi.function_control
    utmi_otg = utmi.otg_control
    # reset defaults of the OTG control but with both pulldowns disabled
    utmi_otg_default = utmi_otg._dtype.from_py({
        k: 0 if k in ("DpPulldown", "DmPulldown") else v
        for k, v in ulpi_reg_otg_control_t_reset_defaults.items()
    })
    utmi_otg(utmi_otg_default)
    utmi_fn.Reset(ulpi_reg_function_control_t_reset_default["Reset"])
    utmi_fn.SuspendM(ulpi_reg_function_control_t_reset_default["SuspendM"])

    HS_CHIRP_COUNT = 5
    # number of the line state changes seen while waiting for the chirps
    # (saturates on 0xFF)
    chirp_count_q = self._reg("chirp_count_q", uint8_t, def_val=0)
    last_LineState = self._reg("last_LineState", utmi.LineState._dtype, def_val=USB_LINE_STATE.SE0)
    last_LineState(utmi.LineState)
    If(st._eq(st_t.SEND_CHIRP_K),
        chirp_count_q(0),
    ).Elif(st._eq(st_t.WAIT_CHIRP_JK) & (last_LineState != utmi.LineState) & (chirp_count_q != 0xFF),
        chirp_count_q(chirp_count_q + 1)
    )

    ms = self.ms_to_clock_ticks
    DETACH_TIME = ms(1)  # 1ms -> T0
    ATTACH_FS_TIME = DETACH_TIME + ms(3)  # T0 + 3ms = T1
    CHIRPK_TIME = ATTACH_FS_TIME + ms(1)  # T1 + ~1ms
    HS_RESET_TIME = DETACH_TIME + ms(9)  # T0 + 9ms

    # Time since T0 (start of HS reset)
    # The timeouts are detected using "usb_rst_time_q > LIMIT", the counter
    # has to be able to hold a value greater than the largest limit
    # otherwise the timeout would never fire for a limit
    # which is close to a power of two.
    usb_rst_time_q = self._reg("usb_rst_time_q", Bits(log2ceil(HS_RESET_TIME + 2)), def_val=0)
    If((st != st_t.WAIT_RST) & st.next._eq(st_t.WAIT_RST),
        # Entering wait for reset state
        usb_rst_time_q(0),
    ).Elif(st._eq(st_t.WAIT_RST) & (utmi.LineState != USB_LINE_STATE.SE0),
        # Waiting for reset, reset count on line state toggle
        usb_rst_time_q(0),
    ).Elif(usb_rst_time_q != mask(usb_rst_time_q._dtype.bit_length()),
        # saturating counter
        usb_rst_time_q(usb_rst_time_q + 1),
    )

    OP_MODE = Utmi_8b.OP_MODE
    XCVR_SELECT = Utmi_8b.XCVR_SELECT
    TERM_SELECT = Utmi_8b.TERM_SELECT

    def set_mode(OpMode: Utmi_8b.OP_MODE,
                 XcvrSelect: Utmi_8b.XCVR_SELECT,
                 TermSelect: Utmi_8b.TERM_SELECT):
        """
        :return: the statements which set the transceiver mode
            in the function control of the PHY
        """
        return [
            utmi_fn.OpMode(OpMode),
            utmi_fn.XcvrSelect(XcvrSelect),
            utmi_fn.TermSelect(TermSelect),
        ]

    Switch(st)\
    .Case(st_t.IDLE,
        set_mode(OP_MODE.NON_DRIVING, XCVR_SELECT.HS, TERM_SELECT.HS),
        # Detached
        If(enable & (usb_rst_time_q > DETACH_TIME),
            st(st_t.WAIT_RST),
        )
    ).Case(st_t.WAIT_RST,
        # Assert FS mode, check for SE0 (T0)
        set_mode(OP_MODE.NORMAL, XCVR_SELECT.FS, TERM_SELECT.FS),
        # Wait for SE0 (T1), send device chirp K
        If(usb_rst_time_q > ATTACH_FS_TIME,
            st(st_t.SEND_CHIRP_K),
        )
    ).Case(st_t.SEND_CHIRP_K,
        # Send chirp K
        set_mode(OP_MODE.DISABLE_BIT_STUFFING_AND_NRZI, XCVR_SELECT.HS, TERM_SELECT.FS),
        # End of device chirp K (T2)
        If(usb_rst_time_q > CHIRPK_TIME,
            st(st_t.WAIT_CHIRP_JK),
        )
    ).Case(st_t.WAIT_CHIRP_JK,
        # Stop sending chirp K and wait for downstream port chirps
        set_mode(OP_MODE.DISABLE_BIT_STUFFING_AND_NRZI, XCVR_SELECT.HS, TERM_SELECT.FS),
        # Required number of chirps detected, move to HS mode (T7)
        If(chirp_count_q > HS_CHIRP_COUNT,
            st(st_t.HIGHSPEED),
        # Time out waiting for chirps, fallback to FS mode
        ).Elif(usb_rst_time_q > HS_RESET_TIME,
            st(st_t.FULLSPEED),
        )
    ).Case(st_t.FULLSPEED,
        set_mode(OP_MODE.NORMAL, XCVR_SELECT.FS, TERM_SELECT.FS),
        # USB reset detected...
        If((usb_rst_time_q > HS_RESET_TIME) & usb_reset,
            st(st_t.WAIT_RST),
        ),
    ).Case(st_t.HIGHSPEED,
        # Enter HS mode
        set_mode(OP_MODE.NORMAL, XCVR_SELECT.HS, TERM_SELECT.HS),
        # Long SE0 - could be reset or suspend
        # TODO: Should revert to FS mode and check...
        If((usb_rst_time_q > HS_RESET_TIME) & usb_reset,
            st(st_t.WAIT_RST),
        )
    )

    # report the negotiated speed as an index to USB_VER.values
    Switch(st)\
    .Case(st_t.FULLSPEED,
        usb_speed_o.vld(1),
        usb_speed_o.data(USB_VER.values.index(USB_VER.USB1_1)),
    ).Case(st_t.HIGHSPEED,
        usb_speed_o.vld(1),
        usb_speed_o.data(USB_VER.values.index(USB_VER.USB2_0)),
    ).Default(
        usb_speed_o.vld(0),
        usb_speed_o.data(None),
    )

    chirp_en = st._eq(st_t.SEND_CHIRP_K)
    return chirp_en
def _impl(self):
    """
    Instantiate the USB reset detector, the speed negotiation FSM and
    the endpoint FSM and connect them with the serial interface engines,
    the PHY and the endpoint buffer interface.
    """
    utmi = self.phy
    ep_intf = self.ep
    rx_engine = self.sie_rx
    tx_engine = self.sie_tx

    usb_rst = self.detect_usb_rst(utmi.LineState, self.usb_speed)
    chirp_en = self.usb_linerate_negotiation(BIT.from_py(1), usb_rst, utmi, self.usb_speed)
    tx_cmd_extra_last = self.usb_endpoint_fsm(
        usb_rst, chirp_en,
        ep_intf.rx, ep_intf.tx, ep_intf.tx_success,
        ep_intf.rx_stall, ep_intf.tx.valid & ep_intf.tx_stall)

    # the register which holds the index of the last addressed endpoint
    endp_t = HStruct(
        (usb_endp_t, "data"),
        (BIT, "vld")
    )
    endp = self._reg("endp", endp_t,
                     def_val={"vld": 0}, rst=self.rst_n._isOn() | usb_rst)
    rx_header = rx_engine.rx_header
    If(rx_header.vld,
        # a new header: store it and pass it directly to the output
        endp.data(rx_header.endp),
        endp.vld(~rx_header.error),
        ep_intf.endp.data(rx_header.endp),
        ep_intf.endp.vld(~rx_header.error),
    ).Else(
        ep_intf.endp(endp)
    )

    # tx - device to host
    utmi.tx(tx_engine.tx)
    tx_cmd = tx_engine.tx_cmd
    tx_cmd.data(ep_intf.tx.data)
    tx_cmd.keep(ep_intf.tx.keep)
    tx_cmd.last(ep_intf.tx.last | ep_intf.tx_stall | tx_cmd_extra_last)
    tx_engine.enable(~usb_rst)

    # rx - host to device
    rx_engine.rx(utmi.rx)
    rx_data = rx_engine.rx_data
    ep_intf.rx.data(rx_data.data)
    ep_intf.rx.keep(rx_data.keep)
    ep_intf.rx.last(rx_data.last)
    # the error flag of the received data is passed in the user signal
    ep_intf.rx.user(rx_data.error)
    rx_engine.current_usb_addr(self.current_usb_addr)
    rx_engine.enable(~usb_rst & ~chirp_en)

    self.usb_rst(usb_rst)

    propagateClkRstn(self)
def _example_Usb2DeviceCore():
    """
    :return: an instance of Usb2DeviceCore configured with the default
        descriptors of the USB CDC virtual COM port
    """
    from hwtLib.peripheral.usb.descriptors.cdc import get_default_usb_cdc_vcp_descriptors

    core = Usb2DeviceCore()
    core.DESCRIPTORS = get_default_usb_cdc_vcp_descriptors()
    return core
if __name__ == "__main__":
    from hwt.synthesizer.utils import to_rtl_str

    # print the RTL generated for the example configuration
    u = _example_Usb2DeviceCore()
    rtl = to_rtl_str(u)
    print(rtl)