#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If
from hwt.hdl.const import HConst
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.struct import HStruct
from hwt.hwIOs.hwIOStruct import HwIOStruct
from hwt.hwIOs.std import HwIOSignal
from hwt.hwIOs.utils import addClkRstn
from hwt.hwModule import HwModule
from hwt.hwParam import HwParam
from hwt.pyUtils.typingFuture import override
from hwtLib.amba.axi4s import Axi4Stream
from hwtLib.amba.axis_comp.builder import Axi4SBuilder
from hwtLib.types.net.ethernet import Eth2Header_t
from hwtLib.types.net.icmp import ICMP_echo_header_t, ICMP_TYPE
from hwtLib.types.net.ip import IPv4Header_t, ipv4_t
from pyMathBitPrecise.bit_utils import reverse_byte_order
# Layout of the frame handled by the responder, in this order:
# Ethernet II header, IPv4 header, ICMP echo header.
echoFrame_t = HStruct(
    (Eth2Header_t, "eth"),
    (IPv4Header_t, "ip"),
    (ICMP_echo_header_t, "icmp"),
)
# See also: https://github.com/hamsternz/FPGA_Webserver/tree/master/hdl/icmp
class Axi4SPingResponder(HwModule):
    """
    Listen for echo request on rx axi stream interface and respond
    with echo response on tx interface

    :note: incoming checksum is not checked
    :attention: you have to ping "ping -s 0 <ip>" because unit ignores
        additional data in packet and Linux adds it by default

    .. hwt-autodoc::
    """

    @override
    def hwConfig(self):
        """
        Declare the parameters of this module:
        DATA_WIDTH (default 32), IS_BIGENDIAN (default True)
        and USE_STRB (default True).
        """
        self.DATA_WIDTH = HwParam(32)
        self.IS_BIGENDIAN = HwParam(True)
        self.USE_STRB = HwParam(True)

    @override
    def hwDeclr(self):
        """
        Declare the interfaces: clock/reset (via addClkRstn), the ``myIp``
        signal of type ipv4_t, the ``rx`` stream and the ``tx`` stream
        (``tx`` is declared with ``_m()``).
        """
        addClkRstn(self)
        self.myIp = HwIOSignal(dtype=ipv4_t)

        # USE_STRB is excluded from the parameters shared with rx
        with self._hwParamsShared(exclude=({"USE_STRB"}, set())):
            self.rx = Axi4Stream()

        with self._hwParamsShared():
            self.tx = Axi4Stream()._m()

    def req_load(self, parsed, regs, freeze):
        """
        Load request from parser input into registers

        :param parsed: input interface with parsed fields of ICMP frame
        :param regs: registers for ICMP frame
        :param freeze: signal to freeze value in registers
        :note: every field is stored under its own name; the dst/src swap
            is done later in :meth:`connect_resp`
        """
        dtype = regs._dtype
        for f in dtype.fields:
            name = f.name
            In = getattr(parsed, name)
            reg = getattr(regs, name)

            if isinstance(In, HwIOStruct):
                # nested struct, descend into its fields
                self.req_load(In, reg, freeze)
            elif isinstance(reg, HConst) or \
                    (isinstance(reg, HwIOSignal) and (isinstance(reg._sig, HConst) or reg._sig._rtlNextSig is None)):
                # we have an exact value to use, ignore this input
                # (the parsed field is still consumed unconditionally)
                In.rd(1)
            else:
                # store the parsed value unless the registers are frozen
                If(In.vld & ~freeze,
                   reg(In.data)
                )
                In.rd(~freeze)

    def connect_resp(self, resp, deparserIn, sendingReply):
        """
        Connect response data on inputs of frame deparser

        :param resp: registers with response data
        :param deparserIn: input interface of frame deparser
        :param sendingReply: flag which signalizes that data
            should be deparsed into frame and sent
        :attention: fields named "src" and "dst" are swapped
            between resp and deparserIn
        """
        t = resp._dtype
        if isinstance(t, HBits):
            # leaf field: drive data and use the flag as valid
            deparserIn.data(resp)
            deparserIn.vld(sendingReply)
        else:
            for f in t.fields:
                name = f.name
                In = getattr(resp, name)

                # swap dst and src
                if name == "src":
                    name = "dst"
                elif name == "dst":
                    name = "src"

                Out = getattr(deparserIn, name)
                self.connect_resp(In, Out, sendingReply)

    def icmp_checksum(self, header):
        """
        :note: we do not need to care about endianness because parser/deparser
            will swap it for us and we can work with little endians only
        :param header: object with ``identifier`` and ``seqNo`` members
        :return: checksum for icmp header
        """
        # type, code, checksum = 0
        # NOTE(review): the Internet checksum (RFC 1071) is a one's-complement
        # sum with end-around carry; this expression uses a plain `+` -
        # confirm whether a carry out of identifier + seqNo has to be
        # folded back into the result.
        return ~(header.identifier +
                 header.seqNo
                 )

    @override
    def hwImpl(self):
        """
        Build the responder: parse the rx frame into registers, patch the
        ICMP type/code/checksum of the reply, deparse the registers to tx
        and maintain the ``sendingReply`` / ``isEchoReq`` flags.
        """
        # tmp registers
        sendingReply = self._reg("sendingReply", def_val=0)
        resp = self._reg("resp", echoFrame_t)
        isEchoReq = self._reg("isEchoReq", def_val=0)

        # set fields of reply
        # (the `_sig` of these fields is overridden so req_load() does not
        # load them from the request)
        resp.icmp.type._sig = resp.icmp.type._dtype.from_py(ICMP_TYPE.ECHO_REPLY)
        resp.icmp.code._sig = resp.icmp.code._dtype.from_py(0)
        resp.icmp.checksum._sig = self.icmp_checksum(resp.icmp)

        # parse input frame
        parsed = Axi4SBuilder(self, self.rx).parse(echoFrame_t)
        self.req_load(parsed, resp, sendingReply)

        # remember whether the parsed frame is an echo request
        t = parsed.icmp.type
        If(t.vld,
           isEchoReq(t.data._eq(ICMP_TYPE.ECHO_REQUEST))
        )

        def setup_frame_deparser(out):
            # the deparser output uses the same width and endianness as this module
            out.DATA_WIDTH = self.DATA_WIDTH
            out.IS_BIGENDIAN = self.IS_BIGENDIAN

        # create output frame
        txBuilder, deparserIn = Axi4SBuilder.deparse(self,
                                                     echoFrame_t,
                                                     Axi4Stream,
                                                     setup_frame_deparser)

        # connect computed resp to input of deparser
        self.connect_resp(resp, deparserIn, sendingReply)
        tx = txBuilder.end
        self.tx(tx)

        # update state flags
        # parser/deparser handles the casting to big-endian
        respIpDstBE = reverse_byte_order(resp.ip.dst)
        # NOTE(review): both conditions look only at valid/last, not at
        # ready - confirm this is intended when a stream is back-pressured.
        If(self.rx.last & self.rx.valid,
           sendingReply(self.myIp._eq(respIpDstBE) & isEchoReq)
        ).Elif(tx.valid & tx.last,
           sendingReply(0)
        )
if __name__ == "__main__":
    # When run as a script, print the RTL generated for the responder.
    from hwt.synth import to_rtl_str

    responder = Axi4SPingResponder()
    rtl = to_rtl_str(responder)
    print(rtl)