#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.struct import HStruct
from hwt.hdl.value import HValue
from hwt.interfaces.std import Signal
from hwt.interfaces.structIntf import StructIntf
from hwt.interfaces.utils import addClkRstn
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.mainBases import RtlMemoryBase
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axis import AxiStream
from hwtLib.amba.axis_comp.builder import AxiSBuilder
from hwtLib.types.net.ethernet import Eth2Header_t
from hwtLib.types.net.icmp import ICMP_echo_header_t, ICMP_TYPE
from hwtLib.types.net.ip import IPv4Header_t, ipv4_t
# Frame layout handled by PingResponder: the three headers below, in wire
# order. No payload field is declared after the ICMP echo header.
echoFrame_t = HStruct(
    (Eth2Header_t, "eth"),
    (IPv4Header_t, "ip"),
    (ICMP_echo_header_t, "icmp"),
)
# https://github.com/hamsternz/FPGA_Webserver/tree/master/hdl/icmp
class PingResponder(Unit):
    """
    Listen for an echo request on the rx AXI-Stream interface and respond
    with an echo response on the tx interface.

    :note: incoming checksum is not checked
    :attention: you have to ping with "ping -s 0 <ip>" because the unit
        ignores additional data in the packet and linux adds it by default

    .. hwt-autodoc::
    """

    def _config(self):
        """Declare the configuration parameters and their default values."""
        self.DATA_WIDTH = Param(32)
        self.IS_BIGENDIAN = Param(True)
        self.USE_STRB = Param(True)

    def _declr(self):
        """Declare the external interfaces: clk/rst_n, myIp, rx and tx."""
        addClkRstn(self)
        # address this unit answers for; compared with the destination
        # address of the received frame in _impl
        self.myIp = Signal(dtype=ipv4_t)

        # USE_STRB is listed in the exclude argument, so it is not shared
        # with rx the way the other parameters are
        with self._paramsShared(exclude=({"USE_STRB"}, set())):
            self.rx = AxiStream()

        with self._paramsShared():
            self.tx = AxiStream()._m()

    def req_load(self, parsed, regs, freeze):
        """
        Load the request from the parser output into registers.

        Walks the fields of ``regs`` recursively. Fields of ``regs`` which
        are constants (not backed by a register) are not loaded; the
        matching parser output is just always consumed.

        :param parsed: input interface with parsed fields of the ICMP frame
        :param regs: registers for the ICMP frame
        :param freeze: signal to freeze the values in the registers
        """
        dtype = regs._dtype
        for f in dtype.fields:
            name = f.name
            field_in = getattr(parsed, name)
            reg = getattr(regs, name)

            if isinstance(field_in, StructIntf):
                # nested header, descend into its fields
                self.req_load(field_in, reg, freeze)
            elif isinstance(reg, HValue) or (
                    isinstance(reg, Signal)
                    and not isinstance(reg._sig, RtlMemoryBase)):
                # we have an exact value to use, ignore this input
                # (always ready so the parser is never blocked by it)
                field_in.rd(1)
            else:
                # capture the field while the registers are not frozen
                If(field_in.vld & ~freeze,
                   reg(field_in.data)
                )
                field_in.rd(~freeze)

    def connect_resp(self, resp, deparserIn, sendingReply):
        """
        Connect the response data to the inputs of the frame deparser.

        :param resp: registers with response data
        :param deparserIn: input interface of the frame deparser
        :param sendingReply: flag which signalizes that the data
            should be deparsed into a frame and sent
        :attention: fields named "dst" and "src" are swapped, the value
            stored as src of the request is driven to dst of the reply
            and vice versa
        """
        t = resp._dtype
        if isinstance(t, Bits):
            # leaf field, drive the deparser input directly
            deparserIn.data(resp)
            deparserIn.vld(sendingReply)
        else:
            for f in t.fields:
                name = f.name
                src_field = getattr(resp, name)
                # swap dst and src
                if name == "src":
                    name = "dst"
                elif name == "dst":
                    name = "src"
                dst_field = getattr(deparserIn, name)
                self.connect_resp(src_field, dst_field, sendingReply)

    def icmp_checksum(self, header):
        """
        :note: we do not need to care about endianity because parser/deparser
            will swap it for us and we can work with little endians only
        :return: checksum for icmp header
        """
        # type, code, checksum = 0 so only identifier and seqNo contribute
        # NOTE(review): the Internet checksum (RFC 1071) adds the carry of
        # the sum back in; this plain addition presumably drops it when
        # identifier + seqNo overflows -- confirm against hwt "+" semantics
        return ~(header.identifier +
                 header.seqNo
                 )

    def _impl(self):
        """
        Build the unit body: parse rx into registers, deparse the registers
        into tx and maintain the ``sendingReply`` / ``isEchoReq`` flags.
        """
        # tmp registers
        sendingReply = self._reg("sendingReply", def_val=0)
        resp = self._reg("resp", echoFrame_t)
        isEchoReq = self._reg("isEchoReq", def_val=0)

        # set fields of reply
        # (_sig of these fields is overwritten with a value/expression,
        #  req_load checks _sig to detect such fields and skips loading them)
        resp.icmp.type._sig = resp.icmp.type._dtype.from_py(ICMP_TYPE.ECHO_REPLY)
        resp.icmp.code._sig = resp.icmp.code._dtype.from_py(0)
        resp.icmp.checksum._sig = self.icmp_checksum(resp.icmp)

        # parse input frame
        parsed = AxiSBuilder(self, self.rx).parse(echoFrame_t)
        self.req_load(parsed, resp, sendingReply)

        # remember whether the received frame was an echo request; the type
        # register itself holds the reply constant so it can not be used
        t = parsed.icmp.type
        If(t.vld,
           isEchoReq(t.data._eq(ICMP_TYPE.ECHO_REQUEST))
        )

        def setup_frame_deparser(out):
            # propagate the stream configuration of this unit
            out.DATA_WIDTH = self.DATA_WIDTH
            out.IS_BIGENDIAN = self.IS_BIGENDIAN

        # create output frame
        txBuilder, deparserIn = AxiSBuilder.deparse(self,
                                                    echoFrame_t,
                                                    AxiStream,
                                                    setup_frame_deparser)
        self.connect_resp(resp, deparserIn, sendingReply)
        tx = txBuilder.end
        self.tx(tx)

        # update state flags
        # start replying after the last word of a request addressed to myIp,
        # stop after the last word of the reply
        If(self.rx.last & self.rx.valid,
           sendingReply(self.myIp._eq(resp.ip.dst) & isEchoReq)
        ).Elif(tx.valid & tx.last,
           sendingReply(0)
        )
if __name__ == "__main__":
    # Script entry point: build the unit and print its generated RTL.
    # The import is kept local so it is only needed when run as a script.
    from hwt.synthesizer.utils import to_rtl_str

    u = PingResponder()
    print(to_rtl_str(u))