Source code for hwtLib.examples.axi.oooOp.testUtils

from io import StringIO
from typing import Union

from hwt.hdl.types.array import HArray
from hwt.hdl.types.bitsVal import BitsVal
from hwt.hdl.types.struct import HStruct
from hwt.simulator.simTestCase import SimTestCase
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwtLib.amba.axi_comp.oooOp.utils import OOOOpPipelineStage
from hwtLib.examples.axi.oooOp.counterHashTable import OooOpExampleCounterHashTable
from hwtSimApi.basic_hdl_simulator.model import BasicRtlSimModel
from hwtSimApi.constants import CLK_PERIOD
from hwtSimApi.triggers import Edge, WaitCombStable
from pyMathBitPrecise.bit_utils import ValidityError


[docs]def OutOfOrderCummulativeOp_dump_pipeline(tc: SimTestCase, u: OooOpExampleCounterHashTable, model: BasicRtlSimModel, states:list): m = model.io clk = u.clk if clk._sigInside is None: clk = clk._sig else: clk = clk._sigInside clk = getattr(m, clk.name) def int_or_none(v): try: return int(v) except ValidityError: return None def read_data(sig: Union[RtlSignal, Interface]): """ read data from simulation """ if isinstance(sig, Interface): if sig._interfaces: return tuple(read_data(i) for i in sig._interfaces) else: sig = sig._sig return int_or_none(getattr(m, sig.name).read()) has_operation = hasattr(u, "OPERATION") has_trans_data = u.TRANSACTION_STATE_T is not None has_composite_data = isinstance(u.MAIN_STATE_T, (HStruct, HArray)) ops_without_match = [] if has_operation: for n in ("SWAP_BY_ADDR", "READ_BY_ADDR"): o = getattr(u.OPERATION, n, None) if o is not None: ops_without_match.append(o) while True: yield Edge(clk) yield WaitCombStable() if clk.read(): clk_i = tc.hdl_simulator.now // CLK_PERIOD cur_state = [] for st in u.pipeline: st: OOOOpPipelineStage vld = read_data(st.valid) if vld: if st.index == 0: addr = -1 op = -1 if has_operation else None else: addr = read_data(st.addr) assert addr is not None trans_state_present = st.index > 0 and st.index <= u.PIPELINE_CONFIG.WAIT_FOR_WRITE_ACK if has_operation: if trans_state_present: op = read_data(st.transaction_state.operation) assert op is not None else: op = None if st.index >= u.PIPELINE_CONFIG.STATE_LOAD and st.index < u.PIPELINE_CONFIG.WRITE_BACK: if has_trans_data and op not in ops_without_match: key_match = [] for i, km in enumerate(st.key_matches): if not isinstance(km, BitsVal): km = read_data(km) assert km is not None, (op, st, i) if km: key_match.append(i) else: key_match = None collision = None for i, cd in enumerate(st.collision_detect): if not isinstance(cd, int): cd = read_data(cd) assert cd is not None if cd: collision = i # assert read_data(u.pipeline[i].valid), (clk_i, # u.pipeline[i].valid.name, # "not valid and we expecting collision with it") break else: key_match = None collision = None if st.index >= u.PIPELINE_CONFIG.STATE_LOAD and st.index <= u.PIPELINE_CONFIG.WRITE_BACK: wr_forward = [] if st.index == u.PIPELINE_CONFIG.WRITE_BACK: src_st = u.pipeline[u.PIPELINE_CONFIG.WRITE_BACK] if getattr(m, f"write_forwarding_en_{st.index}from{src_st.index}").read(): wr_forward.append(src_st.index) else: for src_st in u.pipeline[u.PIPELINE_CONFIG.WRITE_BACK:]: if getattr(m, f"write_forwarding_en_{st.index}from{src_st.index}").read(): wr_forward.append(src_st.index) else: wr_forward = None if has_trans_data: if trans_state_present: orig_d = st.transaction_state.original_data t_item_valid = read_data(orig_d.item_valid) t_key = read_data(orig_d.key) t_data = read_data(orig_d.value) else: t_item_valid = None t_key = None t_data = None t_data = (t_item_valid, t_key, t_data) else: t_data = None if has_composite_data: item_vld = read_data(st.data.item_valid) # assert item_vld is not None if item_vld: key = read_data(st.data.key) data = read_data(st.data.value) else: key = None data = None data = (item_vld, key, data) else: data = read_data(st.data) state_data = (op, addr, t_data, data, collision, wr_forward, key_match) else: state_data = None cur_state.append(state_data) if not states or states[-1][1] != cur_state: states.append((clk_i, cur_state)) # print(f"clk {clk_i}: {cur_state}") for st_data in cur_state[u.PIPELINE_CONFIG.STATE_LOAD:u.PIPELINE_CONFIG.WRITE_BACK]: if st_data is not None: (_, addr, _, data, collision, wr_forward, _) = st_data if has_composite_data: (item_vld, key, data) = data else: item_vld = 1 key = None # data = data for st in u.pipeline[u.PIPELINE_CONFIG.WRITE_BACK:(len(u.pipeline) if collision is None else collision)]: if st.index == collision: assert read_data(st.addr) and read_data(st.addr) == addr, (clk_i, "collision prediction was invalid", st.index, read_data(st.valid), read_data(st.addr), addr ) else: if read_data(st.valid): assert read_data(st.addr) != addr, (clk_i, "collision prediction missed item", st.index, read_data(st.addr))
[docs]def OutOfOrderCummulativeOp_dump_pipeline_html(file: StringIO, u: OooOpExampleCounterHashTable, states: list): rows = [] st_names = {getattr(u.PIPELINE_CONFIG, n): n for n in [ "READ_DATA_RECEIVE", "STATE_LOAD", "WRITE_BACK", "WAIT_FOR_WRITE_ACK"]} if hasattr(u, "OPERATION"): operation_names = {} for attr in dir(u.OPERATION): v = getattr(u.OPERATION, attr) if isinstance(v, int): assert v not in operation_names, (attr, v, operation_names) operation_names[v] = attr else: operation_names = None has_trans_data = u.TRANSACTION_STATE_T is not None has_composite_data = isinstance(u.MAIN_STATE_T, (HStruct, HArray)) for clk_i, total_st in states: if not rows: # construct header state_cells = [] for i in range(len(total_st)): st_name = st_names.get(i, None) if st_name is None: state_cells.append(f"<th>{i:d}</th>") else: state_cells.append(f"<th>{i:d} {st_name:s}</th>") state_cells = "".join(state_cells) row = f"<tr><th></th>{state_cells:s}</tr>" rows.append(row) state_cells = [] for st_i, st in enumerate(total_st): if st is None: cell = f"<td></td>" else: (op, addr, t, data, collision, wr_forward, key_match) = st if st_i == 0 or st_i > u.PIPELINE_CONFIG.WAIT_FOR_WRITE_ACK: op = "" # operation is not present in these stages t = None else: if operation_names is None: assert op is None else: op = operation_names.get(op, "INVALID") if has_trans_data: (t_item_valid, t_key, t_data) = t if t_item_valid is None: t = "" else: t = repr((t_item_valid, t_key, t_data)) else: t = None if has_composite_data: (item_vld, key, data) = data if item_vld: d = repr((item_vld, key, data)) elif item_vld is None: d = "INVALID" else: d = "" else: d = repr(data) cell_lines = [] if st_i > 0: if operation_names is not None: cell_lines.append(f"0x{addr:x} {op:s}<br/>") else: cell_lines.append(f"0x{addr:x}<br/>") if t is not None: cell_lines.append(f"t:{t}<br/>") cell_lines.append(f"d:{d}<br/>") if collision is not None: cell_lines.append(f"collision:{collision}<br/>") if wr_forward is not None: cell_lines.append(f"wr_forward:{wr_forward}<br/>") if key_match is not None: cell_lines.append(f"t_key_match:{key_match}<br/>") cell_lines = "".join(cell_lines) if st_i >= u.PIPELINE_CONFIG.WRITE_BACK: style = ' style="background-color:LightGreen;"' else: style = '' cell = f"<td{style:s}>{cell_lines:s}</td>" state_cells.append(cell) state_cells = "".join(state_cells) row = f"<tr><td>{clk_i}</td>{state_cells:s}</tr>" rows.append(row) rows = "\n".join(rows) file.write(f"<!DOCTYPE html><html><head><meta charset='utf-8'/></head><body><table border='1'>\n{rows}\n</table></body></html>\n")