Source code for hwtLib.peripheral.usb.usb2.sie_rx

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from math import inf

from hwt.code import If, Concat
from hwt.code_utils import rename_signal
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.stream import HStream
from hwt.hdl.types.struct import HStruct
from hwt.interfaces.std import Signal, Handshaked
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
from hwtLib.amba.axis_comp.builder import AxiSBuilder
from hwtLib.amba.axis_comp.frame_parser import AxiS_frameParser
from hwtLib.handshaked.streamNode import StreamNode
from hwtLib.logic.crc import Crc
from hwtLib.logic.crcComb import CrcComb
from hwtLib.logic.crcPoly import CRC_5_USB, CRC_16_USB
from hwtLib.peripheral.usb.constants import usb_addr_t, usb_pid_t, usb_endp_t, \
    USB_PID, usb_crc5_t
from hwtLib.peripheral.usb.usb2.sie_interfaces import Usb2SieRxOut, \
    DataErrVldKeepLast
from hwtLib.peripheral.usb.usb2.utmi import Utmi_8b_rx
from pyMathBitPrecise.bit_utils import mask


[docs]class Usb2SieDeviceRx(Unit): """ UTMI rx (host->device) packet parser and CRC checker and cutter, (SIE stands for serial interface engine) :note: based on https://github.com/ultraembedded/core_usb_cdc .. hwt-autodoc:: """ def _declr(self): addClkRstn(self) self.enable = Signal() self.rx = Utmi_8b_rx() self.current_usb_addr = Signal(usb_addr_t) self.rx_header: Usb2SieRxOut = Usb2SieRxOut()._m() self.rx_data: DataErrVldKeepLast = DataErrVldKeepLast()._m() self.rx_data.DATA_WIDTH = 8
[docs] def _Utmi_8b_rx_to_DataErrVldStrbLast(self, rx: Utmi_8b_rx): rx_tmp = self._reg( "rx_tmp", HStruct( (rx.data._dtype, "data"), (BIT, "error"), (BIT, "valid"), (BIT, "active"), )[2], def_val=[ { "active": 0, "valid": 0 }, { "active": 0, "valid": 0 } ] ) # we need to mark last valid data with last flag # If there was no data (and this is an error where even PID was not received sucessfuly) # we mark with the last the last invalid data before active went to 0 rx_tmp[0].active(rx.active) If(rx.active, If(rx.valid, rx_tmp[0].data(rx.data), rx_tmp[0].valid(1), rx_tmp[0].error(rx.error), ) ).Else( rx_tmp[0].data(None), rx_tmp[0].valid(0), rx_tmp[0].error(0), ) rx_tmp[1](rx_tmp[0]) rx_data_tmp = DataErrVldKeepLast() rx_data_tmp.DATA_WIDTH = rx.DATA_WIDTH rx_data_tmp.USE_KEEP = False self.rx_data_tmp = rx_data_tmp # :note: beat with keep=0 can apear only for zero lenght packet in the form of single workd with keep=0 last=1 rx_data_tmp.data(rx_tmp[1].data) rx_data_tmp.error(rx_tmp[1].error) rx_data_tmp.last(~rx_tmp[0].active) # :note: delay of the last valid word so we can mark it with "last" flag rx_data_tmp.vld((rx_tmp[1].valid & rx_tmp[0].active & rx_tmp[0].valid) | (rx_tmp[1].valid & rx_tmp[1].active & ~rx_tmp[0].active)) return rx_data_tmp
def _impl(self): rx = self._Utmi_8b_rx_to_DataErrVldStrbLast(self.rx) rx_ending = rename_signal(self, rx.vld & rx.last, "rx_ending") parser_pid = AxiS_frameParser( HStruct( (Bits(8), "pid"), (HStream(Bits(8), frame_len=(0, inf)), "data"), ) ) parser_token = AxiS_frameParser( HStruct( (usb_addr_t, "addr"), (usb_endp_t, "endp"), (usb_crc5_t, "crc5"), ) ) parser_payload = AxiS_frameParser( HStruct( (HStream(Bits(8), frame_len=[0, 1024]), "payload"), (Bits(16), "crc16"), ) ) for parser in [parser_pid, parser_token, parser_payload]: parser.DATA_WIDTH = 8 parser.UNDERFLOW_SUPPORT = True parser_pid.USE_KEEP = True parser_payload.USE_KEEP = True parser_token.OVERFLOW_SUPPORT = True self.parser_pid = parser_pid self.parser_token = parser_token self.parser_payload = parser_payload for parser in [parser_pid, parser_token, parser_payload]: # because input and output is just VldSynced and we can not stall the data for i in parser.dataOut._interfaces: if isinstance(i, Handshaked): i.rd(1) elif isinstance(i, AxiStream): pass else: raise NotImplementedError(i) parser_pid.dataIn(rx, exclude=[parser_pid.dataIn.ready, parser_pid.dataIn.valid, parser_pid.dataIn.keep, rx.vld]) parser_pid.dataIn.valid(rx.vld) parser_pid.dataIn.keep(mask(parser_pid.dataIn.keep._dtype.bit_length())) token_q = self._reg("token_q", HStruct( (usb_pid_t, "pid"), (BIT, "pid_loaded"), (BIT, "err_pid"), (usb_addr_t, "addr"), (usb_endp_t, "endp"), (usb_crc5_t, "crc5"), (BIT, "crc5_loaded"), (BIT, "err_len"), (BIT, "valid"), # mark that content of this register should be moved to output, it may contain some error set ), def_val={ "pid_loaded": 0, "err_pid": 0, "crc5_loaded": 0, "err_len": 0, "valid": 0, } ) pid: Handshaked = parser_pid.dataOut.pid # :attention: there are also SPLIT related PID values which are not supported pid_has_0b_data = rename_signal( self, pid.vld & USB_PID.is_hs(pid.data[4:]), "pid_has_0b_data") pid_has_16b_data = rename_signal( self, token_q.pid_loaded & (USB_PID.is_token(token_q.pid) | token_q.pid._eq(USB_PID.PING)), "pid_has_16b_data") pid_has_payload = rename_signal( self, token_q.pid_loaded & USB_PID.is_data(token_q.pid), "pid_has_payload") # packet with just pid contains something which should not err_packet_len_pid = rename_signal( self, pid_has_0b_data & ~rx_ending, "err_packet_len_pid") rx_data_ending = rename_signal( self, parser_pid.dataOut.data.valid & parser_pid.dataOut.data.last, "rx_data_ending") # token packet bytes missing or there are some extra err_packet_len_token = rename_signal( self, pid_has_16b_data & (rx_data_ending != parser_token.dataOut.crc5.vld), "err_packet_len_token") after_pid: AxiStream = parser_pid.dataOut.data StreamNode( [after_pid], [parser_token.dataIn, parser_payload.dataIn], extraConds={ parser_token.dataIn: pid_has_16b_data, parser_payload.dataIn: pid_has_payload, }, skipWhen={ parser_token.dataIn:~pid_has_16b_data, parser_payload.dataIn:~pid_has_payload, } ).sync() parser_token.dataIn(after_pid, exclude=[after_pid.ready, after_pid.valid]) parser_payload.dataIn(after_pid, exclude=[after_pid.ready, after_pid.valid]) # [todo] 1 word out to mark 0 len packets If(self.enable & pid.vld, token_q.pid(pid.data[4:]), token_q.pid_loaded(1), token_q.err_pid((pid.data[4:] != ~pid.data[:4])), token_q.err_len(err_packet_len_pid), token_q.valid(pid_has_0b_data | (pid.vld & USB_PID.is_data(pid.data[4:])) | err_packet_len_pid), ).Elif(self.enable & pid_has_16b_data & parser_token.dataOut.crc5.vld & ~token_q.err_len & ~token_q.err_pid, token_q.crc5_loaded(pid_has_16b_data), token_q.err_len(err_packet_len_token), token_q.valid(1), # the data are sent using a different channel, we are sending header now ).Elif((token_q.valid & (token_q.err_len | ~pid_has_payload)) | (pid_has_payload & rx_ending) | (token_q.err_pid & rx_ending) | ~self.enable, token_q.pid(None), token_q.err_pid(0), token_q.pid_loaded(0), token_q.crc5_loaded(0), token_q.err_len(0), token_q.valid(0), ).Elif(token_q.valid & pid_has_payload, token_q.valid(0), ) for token_field_name in ["addr", "endp", "crc5"]: p_intf = getattr(parser_token.dataOut, token_field_name) If(p_intf.vld & pid_has_16b_data, getattr(token_q, token_field_name)(p_intf.data) ) # crc for token headers crc5 = CrcComb() crc5.DATA_WIDTH = usb_addr_t.bit_length() + usb_endp_t.bit_length() crc5.setConfig(CRC_5_USB) crc5.REFOUT = False # because the crc5 already commes reversed self.crc5 = crc5 crc5.dataIn(Concat(token_q.endp, token_q.addr)) err_addr = rename_signal(self, token_q.crc5_loaded & (self.current_usb_addr != token_q.addr), "err_addr") err_token_crc = rename_signal(self, token_q.crc5_loaded & (crc5.dataOut != token_q.crc5), "err_token_crc") rx_header = self.rx_header # valid for 1 clk period after word with crc5 or input rx data stream ended prematurely rx_header.vld(token_q.valid) rx_header.error(err_addr | err_token_crc | token_q.err_len) rx_header.pid(token_q.pid) rx_header.endp(token_q.endp) rx_header.frame_number(token_q.pid._eq(USB_PID.TOKEN_SOF)._ternary(Concat(token_q.endp, token_q.addr), rx_header.frame_number._dtype.from_py(None))) # crc for payload data crc16 = Crc() crc16.LATENCY = 1 crc16.setConfig(CRC_16_USB) crc16.DATA_WIDTH = 8 self.crc16 = crc16 # restart or dissabled or end of the frame payload: AxiStream = parser_payload.dataOut.payload crc16.dataIn.data(payload.data) crc16.dataIn.vld(payload.valid & payload.keep) # buffer to assert that the crc error flag is set in last word payload = AxiSBuilder(self, payload).buff(2).end payload_end = payload.ready & payload.valid & payload.last crc16.rst_n(self.rst_n & self.enable & ~payload_end) payload.ready(1) # it would be better to use residue err_data_crc16 = rename_signal( self, parser_payload.dataOut.crc16.vld & (Concat(*(b for b in crc16.dataOut)) != parser_payload.dataOut.crc16.data), "err_data_crc") rx_data = self.rx_data err_data_crc16_delayed = self._reg("err_data_crc16_delayed", BIT[2], def_val=[0, 0]) # for the case where packet len < min size err_data_crc16_delayed[0](err_data_crc16 | parser_payload.error_underflow) err_data_crc16_delayed[1](err_data_crc16_delayed[0] | parser_payload.error_underflow) rx_data.error(err_data_crc16 | err_data_crc16_delayed[0] | err_data_crc16_delayed[1] | (rx.vld & rx.error)) # [todo] this rx error can be of next frame instead of this one rx_data.keep(payload.keep) If(~payload.keep | rx_data.error, rx_data.data(None) ).Else( rx_data.data(payload.data) ) rx_data.vld(payload.valid) rx_data.last(payload.last | rx_data.error) propagateClkRstn(self)
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = Usb2SieDeviceRx() print(to_rtl_str(u))