from math import ceil
from typing import Tuple, Optional, List
from hwt.code import Switch, In
from hwt.hObjList import HObjList
from hwt.hwIOs.hwIOArray import HwIOArray
from hwt.hwIOs.std import HwIORdVldSync
from hwt.hwIOs.utils import addClkRstn, propagateClkRstn
from hwt.hwModule import HwModule
from hwt.hwParam import HwParam
from hwtLib.amba.axi4s import Axi4Stream
from hwtLib.amba.axi4s_fullduplex import Axi4StreamFullDuplex
from hwtLib.amba.axis_comp.fifoCopy import Axi4SFifoCopy
from hwtLib.amba.axis_comp.fifoDrop import Axi4SFifoDrop
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.handshaked.streamNode import StreamNode
from hwtLib.peripheral.usb.descriptors.bundle import UsbEndpointMeta
from hwtLib.peripheral.usb.usb2.device_core_interfaces import UsbEndpointInterface
class UsbDeviceEpBuffers(HwModule):
    """
    USB device endpoint buffers

    Buffers for the data transfers which are between usb core and usb function.

    The component exposes one full-duplex AXI4-Stream port per endpoint
    (``self.ep[i]``) and multiplexes them to/from the single
    ``self.usb_core_io`` interface according to the endpoint number
    presented on ``usb_core_io.endp``.
    For every endpoint direction whose ``buffer_size`` is greater than 0
    a FIFO is inserted between the endpoint port and the USB core side,
    otherwise the endpoint port is connected directly.

    :ivar ENDPOINT_META: one ``(rx, tx)`` pair of :class:`UsbEndpointMeta`
        per endpoint; an item of the pair is ``None`` if that direction
        is not used, both are ``None`` if the endpoint does not exist
    :ivar DATA_WIDTH: data width used for endpoint ports and FIFOs

    .. hwt-autodoc:: _example_UsbDeviceEpBuffers
    """

    def hwConfig(self) -> None:
        """
        Declare the configuration parameters of this component.
        """
        # has no usable default, must be set by the user (checked in hwDeclr)
        self.ENDPOINT_META: Tuple[Tuple[Optional[UsbEndpointMeta], Optional[UsbEndpointMeta]]] = HwParam(None)
        self.DATA_WIDTH = HwParam(8)

    def hwDeclr(self) -> None:
        """
        Declare clock/reset, the USB core side interface and the array
        of endpoint ports.

        :note: ep.rx = from host, ep.tx = to host
        """
        assert self.ENDPOINT_META is not None, "ENDPOINT_META parameter is required"
        addClkRstn(self)
        self.usb_core_io: UsbEndpointInterface = UsbEndpointInterface()

        # items are Optional[Axi4StreamFullDuplex]
        eps = HwIOArray()
        for rx, tx in self.ENDPOINT_META:
            if rx is None and tx is None:
                # the endpoint is not used at all
                ep = None
            else:
                ep = Axi4StreamFullDuplex()
                ep.USE_KEEP = True
                ep.DATA_WIDTH = self.DATA_WIDTH
                ep.HAS_RX = rx is not None
                ep.HAS_TX = tx is not None
            # appended also for None so the index in self.ep stays
            # the same as the index in ENDPOINT_META (hwImpl zips them)
            eps.append(ep)

        self.ep = eps

    def connect_rx_part(self, rx_channels: List[Tuple[int, Axi4Stream, UsbEndpointMeta, Axi4SFifoDrop]]) -> None:
        """
        Connect ``usb_core_io.rx`` to the rx port of the endpoint selected
        by ``usb_core_io.endp``.

        :param rx_channels: list of tuples
            (endpoint index, endpoint rx port, endpoint meta, rx FIFO or None),
            as built in :meth:`~.hwImpl`. If the FIFO is None the endpoint
            port is driven directly from ``usb_core_io.rx``.
        """
        if not rx_channels:
            assert not self.usb_core_io.HAS_RX
            return

        usb_endp = self.usb_core_io.endp
        usb_rx = self.usb_core_io.rx
        sn = StreamNode(
            [usb_rx],
            [],  # added later
        )
        for (ep_i, ep_rx, _, rx_fifo) in rx_channels:
            # ready/valid are excluded from the plain connections because
            # the handshake is driven by the StreamNode below
            if rx_fifo is None:
                _rx = self.ep[ep_i].rx
                ep_rx(usb_rx, exclude=[usb_rx.ready, usb_rx.valid])
            else:
                _rx = rx_fifo.dataIn
                ep_rx(rx_fifo.dataOut)
                rx_fifo.dataIn(usb_rx, exclude=[usb_rx.ready, usb_rx.valid])
                # discard on rx error
                rx_fifo.dataIn_discard(usb_rx.valid & usb_rx.last & usb_rx.user)

            # this destination takes part in the transfer only if its
            # endpoint is the one which is currently selected
            sn.slaves.append(_rx)
            sn.extraConds[_rx] = usb_endp.data._eq(ep_i)
            sn.skipWhen[_rx] = usb_endp.data != ep_i

        sn.sync(usb_endp.vld)
        # stall if there is no such endpoint
        self.usb_core_io.rx_stall(~In(usb_endp.data, [i for (i, _, _, _) in rx_channels]))

    def connect_tx_part(self, tx_channels: List[Tuple[int, Axi4Stream, UsbEndpointMeta, Axi4SFifoCopy, HandshakedFifo]]) -> None:
        """
        Connect the tx port of the endpoint selected by ``usb_core_io.endp``
        to ``usb_core_io.tx``.

        :param tx_channels: list of tuples
            (endpoint index, endpoint tx port, endpoint meta,
            tx data FIFO or None, packet-buffered FIFO or None),
            as built in :meth:`~.hwImpl`. Both FIFOs are None for
            an endpoint without a buffer, in that case the endpoint port
            is connected directly.
        """
        if not tx_channels:
            assert not self.usb_core_io.HAS_TX
            return

        # input side of the buffers: endpoint port -> FIFOs
        for (_, ep_tx, _, tx_fifo, tx_packet_buffered_fifo) in tx_channels:
            if tx_fifo is None:
                # no buffer for this endpoint, nothing to connect here
                continue

            tx_fifo.dataIn(ep_tx, exclude=[ep_tx.ready, ep_tx.valid])
            # the packet-buffered FIFO takes part in the transfer
            # only for the word with the "last" flag set
            StreamNode(
                [ep_tx],
                [tx_fifo.dataIn, tx_packet_buffered_fifo.dataIn],
                extraConds={
                    tx_packet_buffered_fifo.dataIn: tx_fifo.dataIn.last,
                },
                skipWhen={
                    tx_packet_buffered_fifo.dataIn: ep_tx.valid & ~ep_tx.last,
                }
            ).sync()
            tx_fifo.dataOut_copy_frame(0)  # [todo]

        usb_endp = self.usb_core_io.endp
        tx = self.usb_core_io.tx
        # data mux selected by the endpoint number, the handshake signals
        # are excluded because they are driven by the StreamNode below
        Switch(usb_endp.data).add_cases(
            (i, tx(self.ep[i].tx if tx_fifo is None else tx_fifo.dataOut, exclude=[tx.ready, tx.valid])
             ) for (i, _, _, tx_fifo, _) in tx_channels
        ).Default(
            # NOTE(review): None presumably means an undefined (don't care)
            # value for an endpoint without tx - confirm in hwt docs
            tx.data(None),
            tx.keep(None),
            tx.last(None),
        )

        sn = StreamNode(
            [],  # added later
            [tx],
        )
        for (i, _, _, tx_fifo, tx_packet_buffered_fifo) in tx_channels:
            if tx_fifo is None:
                _tx = self.ep[i].tx
            else:
                _tx = tx_fifo.dataOut
                # for the packet-buffered FIFO output the condition
                # additionally requires the last word of the frame
                pbf = tx_packet_buffered_fifo.dataOut
                sn.masters.append(pbf)
                sn.extraConds[pbf] = usb_endp.data._eq(i) & _tx.last
                sn.skipWhen[pbf] = usb_endp.data != i

            # this source takes part in the transfer only if its
            # endpoint is the one which is currently selected
            sn.extraConds[_tx] = usb_endp.data._eq(i)
            sn.skipWhen[_tx] = usb_endp.data != i
            sn.masters.append(_tx)

        sn.sync(usb_endp.vld)
        # stall if there is no such endpoint
        self.usb_core_io.tx_stall(~In(usb_endp.data, [i for (i, _, _, _, _) in tx_channels]))

    def hwImpl(self) -> None:
        """
        Instantiate the FIFOs requested by ENDPOINT_META and connect
        the rx and tx paths.

        A direction of an endpoint gets a FIFO only if its
        ``buffer_size`` is greater than 0, otherwise None is stored
        in place of the FIFO.
        """
        rx_channels: List[Tuple[int, Axi4Stream, UsbEndpointMeta, Axi4SFifoDrop]] = []
        tx_channels: List[Tuple[int, Axi4Stream, UsbEndpointMeta, Axi4SFifoCopy, HandshakedFifo]] = []
        rx_fifos: HObjList[Optional[Axi4SFifoDrop]] = HObjList()
        tx_fifos: HObjList[Optional[Axi4SFifoCopy]] = HObjList()
        tx_packet_buffered_fifos = HObjList()

        for i, (ep, (rx_conf, tx_conf)) in enumerate(zip(self.ep, self.ENDPOINT_META)):
            ep: Optional[Axi4StreamFullDuplex]
            rx_conf: Optional[UsbEndpointMeta]
            tx_conf: Optional[UsbEndpointMeta]
            if ep is None:
                # unused endpoint
                continue

            if ep.HAS_RX:
                if rx_conf.buffer_size > 0:
                    rx_fifo = Axi4SFifoDrop()
                    rx_fifo.USE_KEEP = True
                    rx_fifo.DATA_WIDTH = self.DATA_WIDTH
                    rx_fifo.DEPTH = rx_conf.buffer_size
                else:
                    rx_fifo = None
                rx_fifos.append(rx_fifo)
                rx_channels.append((i, ep.rx, rx_conf, rx_fifo))

            if ep.HAS_TX:
                if tx_conf.buffer_size > 0:
                    # if there is some buffer we also need to assert that
                    # we are sending only fully buffered packet
                    tx_packet_buffered_fifo = HandshakedFifo(HwIORdVldSync)
                    # number of max-size packets which fit in the data
                    # buffer (rounded up) + 2
                    tx_packet_buffered_fifo.DEPTH = ceil(tx_conf.buffer_size / tx_conf.max_packet_size) + 2
                    tx_fifo = Axi4SFifoCopy()
                    tx_fifo.USE_KEEP = True
                    tx_fifo.DATA_WIDTH = self.DATA_WIDTH
                    tx_fifo.DEPTH = tx_conf.buffer_size
                else:
                    tx_packet_buffered_fifo = None
                    tx_fifo = None
                tx_packet_buffered_fifos.append(tx_packet_buffered_fifo)
                tx_fifos.append(tx_fifo)
                tx_channels.append((i, ep.tx, tx_conf, tx_fifo, tx_packet_buffered_fifo))

        self.rx_fifos = rx_fifos
        self.tx_fifos = tx_fifos
        self.tx_packet_buffered_fifos = tx_packet_buffered_fifos

        self.connect_rx_part(rx_channels)
        self.connect_tx_part(tx_channels)

        propagateClkRstn(self)
def _example_UsbDeviceEpBuffers() -> "UsbDeviceEpBuffers":
    """
    Build an example :class:`UsbDeviceEpBuffers` instance.

    The endpoint meta is taken from the default USB CDC VCP descriptors
    and ``buffer_size`` of both directions of endpoint 0 is set to 0,
    which means that endpoint 0 is connected without any FIFO.

    :return: the configured, not yet synthesized, component
    """
    # imported locally, the descriptors are needed only for this example
    from hwtLib.peripheral.usb.descriptors.cdc import get_default_usb_cdc_vcp_descriptors

    m = UsbDeviceEpBuffers()
    m.ENDPOINT_META = get_default_usb_cdc_vcp_descriptors().get_endpoint_meta()
    # [0][0] is rx and [0][1] is tx of endpoint 0
    m.ENDPOINT_META[0][0].buffer_size = 0
    m.ENDPOINT_META[0][1].buffer_size = 0
    return m
if __name__ == "__main__":
    # When executed as a script, print the RTL string of the example component.
    from hwt.synth import to_rtl_str

    m = _example_UsbDeviceEpBuffers()
    print(to_rtl_str(m))