Source code for hwtLib.amba.axi_comp.tester

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Optional

from hwt.code import FsmBuilder, If, Concat
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.defs import BIT
from hwt.hdl.types.enum import HEnum
from hwt.hdl.types.struct import HStruct
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.synthesizer.param import Param
from hwt.synthesizer.unit import Unit
from hwtLib.amba.axi3 import Axi3
from hwtLib.amba.axi4 import Axi4
from hwtLib.amba.axi4Lite import Axi4Lite
from hwtLib.amba.axiLite_comp.endpoint import AxiLiteEndpoint


SEND_AW, SEND_W, RECV_B, SEND_AR, RECV_R = range(1, 6)


[docs]class AxiTester(Unit): """ Tester for AXI3/4 interfaces Can precisely control order and timing of read address/write address/read/write/write response transactions Allows to read and specify values of controls signals like cache/lock/burst etc... .. hwt-autodoc:: """
[docs] def __init__(self, axiCls=Axi4, cntrlCls=Axi4Lite, hdl_name_override:Optional[str]=None): self._axiCls = axiCls self._cntrlCls = cntrlCls super(AxiTester, self).__init__(hdl_name_override=hdl_name_override)
def _config(self): self._axiCls._config(self) self.CNTRL_DATA_WIDTH = Param(32) self.CNTRL_ADDR_WIDTH = Param(32) def _declr(self): addClkRstn(self) with self._paramsShared(): self.m_axi = self._axiCls()._m() with self._paramsShared(prefix="CNTRL_"): self.cntrl = self._cntrlCls() self._add_ep()
[docs] def _add_ep(self): strb_w = self.DATA_WIDTH // 8 # [TODO] hotfix, should be self.DATA_WIDTH DATA_FIELD_W = self.CNTRL_DATA_WIDTH LEN_WIDTH = self.m_axi.LEN_WIDTH LOCK_WIDTH = self.m_axi.LOCK_WIDTH mem_space = HStruct( (Bits(32), "id_reg"), # 0x00 (Bits(32), "cmd_and_status"), (Bits(self.ADDR_WIDTH), "addr"), # a or w id (Bits(self.ID_WIDTH), "ar_aw_w_id"), (Bits(32 - int(self.ID_WIDTH)), None), (Bits(2), "burst"), # 0x10 (Bits(32 - 2), None), (Bits(4), "cache"), (Bits(32 - 4), None), (Bits(LEN_WIDTH), "len"), (Bits(32 - LEN_WIDTH), None), (Bits(LOCK_WIDTH), "lock"), (Bits(32 - LOCK_WIDTH), None), (Bits(3), "prot"), # 0x20 (Bits(32 - 3), None), (Bits(3), "size"), (Bits(32 - 3), None), (Bits(4), "qos"), (Bits(32 - 4), None), (Bits(self.ID_WIDTH), "r_id"), (Bits(32 - int(self.ID_WIDTH)), None), (Bits(DATA_FIELD_W), "r_data"), # 0x30 (Bits(2), "r_resp"), (Bits(32 - 2), None), (BIT, "r_last"), (Bits(32 - 1), None), (Bits(self.ID_WIDTH), "b_id"), (Bits(32 - int(self.ID_WIDTH)), None), (Bits(2), "b_resp"), # 0x40 (Bits(32 - 2), None), (Bits(DATA_FIELD_W), "w_data"), (BIT, "w_last"), (Bits(32 - 1), None), (Bits(strb_w), "w_strb"), (Bits(32 - int(strb_w)), None), # 0x50 (Bits(5*2), "hs"), (Bits(5*2 - 1), None), ) ep = self.axi_ep = AxiLiteEndpoint(mem_space, intfCls=self._cntrlCls) ep.DATA_WIDTH = self.CNTRL_DATA_WIDTH ep.ADDR_WIDTH = self.CNTRL_ADDR_WIDTH
def _impl(self): propagateClkRstn(self) self.axi_ep.bus(self.cntrl) ep = self.axi_ep.decoded id_reg_val = int.from_bytes("test".encode(), byteorder="little") ep.id_reg.din(id_reg_val) def connected_reg(name, input_=None, inputEn=None, fit=False): if input_ is not None: assert inputEn is not None port = getattr(ep, name) reg = self._reg(name, port.din._dtype) e = If(port.dout.vld, reg(port.dout.data, fit=fit) ) if input_ is not None: e.Elif(inputEn, reg(input_, fit=fit) ) port.din(reg) return reg cmdIn = ep.cmd_and_status.dout cmd = self._reg("reg_cmd", cmdIn.data._dtype, def_val=0) cmdVld = self._reg("reg_cmd_vld", def_val=0) If(cmdIn.vld, cmd(cmdIn.data, fit=True) ) partDone = self._sig("partDone") If(partDone, cmdVld(0) ).Elif(cmdIn.vld, cmdVld(1) ) def cmd_en(cmd_code): return cmdVld & cmd._eq(cmd_code) state_t = HEnum("state_t", ["ready", "wait_ar", "wait_aw", "wait_w", "wait_b", "wait_r"]) axi = self.m_axi st = FsmBuilder(self, state_t)\ .Trans(state_t.ready, (cmd_en(SEND_AW), state_t.wait_aw), (cmd_en(SEND_AR), state_t.wait_ar), (cmd_en(SEND_W), state_t.wait_w), (cmd_en(RECV_R), state_t.wait_r), (cmd_en(RECV_B), state_t.wait_b), ).Trans(state_t.wait_aw, (axi.aw.ready, state_t.ready) ).Trans(state_t.wait_ar, (axi.ar.ready, state_t.ready) ).Trans(state_t.wait_w, (axi.w.ready, state_t.ready) ).Trans(state_t.wait_r, (axi.r.valid, state_t.ready) ).Trans(state_t.wait_b, (axi.b.valid, state_t.ready) ).stateReg partDone((st._eq(state_t.wait_aw) & axi.aw.ready) | (st._eq(state_t.wait_ar) & axi.ar.ready) | (st._eq(state_t.wait_w) & axi.w.ready) | (st._eq(state_t.wait_r) & axi.r.valid) | (st._eq(state_t.wait_b) & axi.b.valid)) ready = st._eq(state_t.ready) tmp = self._sig("tmp") tmp(ready & ~ep.cmd_and_status.dout.vld) ep.cmd_and_status.din(tmp, fit=True) a_w_id = connected_reg("ar_aw_w_id") addr = connected_reg("addr") burst = connected_reg("burst") cache = connected_reg("cache") len_ = connected_reg("len") lock = connected_reg("lock") prot = connected_reg("prot") size = connected_reg("size") qos = connected_reg("qos") # aw/ar for p in [axi.aw, axi.ar]: p.id(a_w_id) p.addr(addr) p.burst(burst) p.cache(cache) p.len(len_) p.lock(lock) p.prot(prot) p.size(size) if hasattr(p, "qos"): p.qos(qos) aw_vld = st._eq(state_t.wait_aw) axi.aw.valid(aw_vld) ar_vld = st._eq(state_t.wait_ar)._reinterpret_cast(BIT) axi.ar.valid(ar_vld) r = axi.r r_rd = st._eq(state_t.wait_r) r_en = r.valid & r_rd connected_reg("r_id", r.id, r_en) connected_reg("r_data", r.data, r_en, fit=True) connected_reg("r_resp", r.resp, r_en) connected_reg("r_last", r.last, r_en) r.ready(r_rd) w = axi.w w_data = connected_reg("w_data", fit=True) w_last = connected_reg("w_last") w_strb = connected_reg("w_strb") if hasattr(w, "id"): w.id(a_w_id) w.data(w_data, fit=True) w.strb(w_strb) w.last(w_last) w_vld = st._eq(state_t.wait_w) w.valid(w_vld) b = axi.b b_rd = st._eq(state_t.wait_b) connected_reg("b_id", b.id, b.valid & b_rd) connected_reg("b_resp") b.ready(b_rd) ep.hs.din(Concat(ar_vld, axi.ar.ready, aw_vld, axi.aw.ready, r.valid, r_rd, b.valid, b_rd, w_vld, w.ready))
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str #from hwt.serializer.ip_packager import IpPackager #from os.path import expanduser u = AxiTester(Axi3) print(to_rtl_str(u)) #p = IpPackager(u) #p.createPackage(expanduser("~"))