from math import ceil
from typing import Tuple, Optional, List
from hwt.code import Switch, In
from hwt.interfaces.std import HandshakeSync
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.synthesizer.hObjList import HObjList
from hwt.synthesizer.param import Param
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
from hwtLib.amba.axis_comp.fifoCopy import AxiSFifoCopy
from hwtLib.amba.axis_comp.fifoDrop import AxiSFifoDrop
from hwtLib.amba.axis_fullduplex import AxiStreamFullDuplex
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.handshaked.streamNode import StreamNode
from hwtLib.peripheral.usb.descriptors.bundle import UsbEndpointMeta
from hwtLib.peripheral.usb.usb2.device_core_interfaces import UsbEndpointInterface
[docs]class UsbDeviceEpBuffers(Unit):
"""
USB device endpoint buffers
Buffers for the data transfers which are between usb core and usb function.
.. hwt-autodoc:: _example_UsbDeviceEpBuffers
"""
def _config(self) -> None:
self.ENDPOINT_META: Tuple[Tuple[Optional[UsbEndpointMeta], Optional[UsbEndpointMeta]]] = Param(None)
self.DATA_WIDTH = Param(8)
[docs] def _declr(self) -> None:
"""
:note: ep.rx = from host, ep.tx = to host
"""
assert self.ENDPOINT_META is not None, "ENDPOINT_META parameter is required"
addClkRstn(self)
self.usb_core_io: UsbEndpointInterface = UsbEndpointInterface()
eps = HObjList()
for rx, tx in self.ENDPOINT_META:
if rx is None and tx is None:
ep = None
else:
ep = AxiStreamFullDuplex()
ep.USE_KEEP = True
ep.DATA_WIDTH = self.DATA_WIDTH
ep.HAS_RX = rx is not None
ep.HAS_TX = tx is not None
eps.append(ep)
self.ep: HObjList[AxiStreamFullDuplex] = eps
[docs] def connect_rx_part(self, rx_channels: List[Tuple[int, AxiStream, UsbEndpointMeta, AxiSFifoDrop]]):
if not rx_channels:
assert not self.usb_core_io.HAS_RX
return
usb_endp = self.usb_core_io.endp
sn = StreamNode(
[self.usb_core_io.rx],
[], # added later
)
usb_rx = self.usb_core_io.rx
for (ep_i, ep_rx, _, rx_fifo) in rx_channels:
if rx_fifo is None:
_rx = self.ep[ep_i].rx
ep_rx(usb_rx, exclude=[usb_rx.ready, usb_rx.valid])
else:
_rx = rx_fifo.dataIn
ep_rx(rx_fifo.dataOut)
rx_fifo.dataIn(usb_rx, exclude=[usb_rx.ready, usb_rx.valid])
# discard on rx error
rx_fifo.dataIn_discard(usb_rx.valid & usb_rx.last & usb_rx.user)
sn.slaves.append(_rx)
sn.extraConds[_rx] = usb_endp.data._eq(ep_i)
sn.skipWhen[_rx] = usb_endp.data != ep_i
sn.sync(usb_endp.vld)
self.usb_core_io.rx_stall(~In(usb_endp.data, [i for (i, _, _, _) in rx_channels]))
[docs] def connect_tx_part(self, tx_channels: List[Tuple[int, AxiStream, UsbEndpointMeta, AxiSFifoCopy, HandshakedFifo]]):
if not tx_channels:
assert not self.usb_core_io.HAS_TX
return
for (_, ep_tx, _, tx_fifo, tx_packet_buffered_fifo) in tx_channels:
if tx_fifo is None:
continue
tx_fifo.dataIn(ep_tx, exclude=[ep_tx.ready, ep_tx.valid])
StreamNode(
[ep_tx],
[tx_fifo.dataIn, tx_packet_buffered_fifo.dataIn],
extraConds={
tx_packet_buffered_fifo.dataIn: tx_fifo.dataIn.last,
},
skipWhen={
tx_packet_buffered_fifo.dataIn: ep_tx.valid & ~ep_tx.last,
}
).sync()
tx_fifo.dataOut_copy_frame(0) # [todo]
usb_endp = self.usb_core_io.endp
tx = self.usb_core_io.tx
Switch(usb_endp.data).add_cases(
(i, tx(self.ep[i].tx if tx_fifo is None else tx_fifo.dataOut, exclude=[tx.ready, tx.valid])
) for (i, _, _, tx_fifo, _) in tx_channels
).Default(
tx.data(None),
tx.keep(None),
tx.last(None),
)
sn = StreamNode(
[], # added later
[tx],
)
for (i, _, _, tx_fifo, tx_packet_buffered_fifo) in tx_channels:
if tx_fifo is None:
_tx = self.ep[i].tx
else:
_tx = tx_fifo.dataOut
pbf = tx_packet_buffered_fifo.dataOut
sn.masters.append(pbf)
sn.extraConds[pbf] = usb_endp.data._eq(i) & _tx.last
sn.skipWhen[pbf] = usb_endp.data != i
sn.extraConds[_tx] = usb_endp.data._eq(i)
sn.skipWhen[_tx] = usb_endp.data != i
sn.masters.append(_tx)
sn.sync(usb_endp.vld)
# stall if there is no such a endpoint
self.usb_core_io.tx_stall(~In(usb_endp.data, [i for (i, _, _, _, _) in tx_channels]))
def _impl(self):
rx_channels: List[Tuple[int, AxiStream, UsbEndpointMeta, AxiSFifoDrop]] = []
tx_channels: List[Tuple[int, AxiStream, UsbEndpointMeta, AxiSFifoCopy, HandshakedFifo]] = []
rx_fifos = HObjList()
tx_fifos = HObjList()
tx_packet_buffered_fifos = HObjList()
for i, (ep, (rx_conf, tx_conf)) in enumerate(zip(self.ep, self.ENDPOINT_META)):
ep: Optional[AxiStreamFullDuplex]
rx_conf: UsbEndpointMeta
tx_conf: UsbEndpointMeta
if ep is None:
continue
if ep.HAS_RX:
if rx_conf.buffer_size > 0:
rx_fifo = AxiSFifoDrop()
rx_fifo.USE_KEEP = True
rx_fifo.DATA_WIDTH = self.DATA_WIDTH
rx_fifo.DEPTH = rx_conf.buffer_size
else:
rx_fifo = None
rx_fifos.append(rx_fifo)
rx_channels.append((i, ep.rx, rx_conf, rx_fifo))
if ep.HAS_TX:
if tx_conf.buffer_size > 0:
# if there is some buffer we also need to asssert that
# we are sending only fully fubbered packet
tx_packet_buffered_fifo = HandshakedFifo(HandshakeSync)
tx_packet_buffered_fifo.DEPTH = ceil(tx_conf.buffer_size / tx_conf.max_packet_size) + 2
tx_fifo = AxiSFifoCopy()
tx_fifo.USE_KEEP = True
tx_fifo.DATA_WIDTH = self.DATA_WIDTH
tx_fifo.DEPTH = tx_conf.buffer_size
else:
tx_packet_buffered_fifo = None
tx_fifo = None
tx_packet_buffered_fifos.append(tx_packet_buffered_fifo)
tx_fifos.append(tx_fifo)
tx_channels.append((i, ep.tx, tx_conf, tx_fifo, tx_packet_buffered_fifo))
self.rx_fifos = rx_fifos
self.tx_fifos = tx_fifos
self.tx_packet_buffered_fifos = tx_packet_buffered_fifos
self.connect_rx_part(rx_channels)
self.connect_tx_part(tx_channels)
propagateClkRstn(self)
[docs]def _example_UsbDeviceEpBuffers():
from hwtLib.peripheral.usb.descriptors.cdc import get_default_usb_cdc_vcp_descriptors
u = UsbDeviceEpBuffers()
u.ENDPOINT_META = get_default_usb_cdc_vcp_descriptors().get_endpoint_meta()
u.ENDPOINT_META[0][0].buffer_size = 0
u.ENDPOINT_META[0][1].buffer_size = 0
return u
if __name__ == "__main__":
from hwt.synthesizer.utils import to_rtl_str
u = _example_UsbDeviceEpBuffers()
print(to_rtl_str(u))