Source code for hwtLib.xilinx.ipif.intf

from collections import deque
from enum import Enum

from hwt.hdl.constants import READ, WRITE, NOP
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import D, Signal, VectSignal
from hwt.simulator.agentBase import SyncAgentBase
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
from pyMathBitPrecise.bit_utils import mask
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.triggers import WaitCombStable, WaitCombRead, WaitWriteOnly


# https://www.xilinx.com/support/documentation/ip_documentation/axi_lite_ipif/v2_0/pg155-axi-lite-ipif.pdf
# https://www.xilinx.com/support/documentation/ip_documentation/axi_lite_ipif_ds765.pdf
class Ipif(Interface):
    """
    IPIF (IP interface) - an interface which was often used in designs
    for Xilinx FPGAs around year 2012

    * shared address, validity signals, mask, write ack

    .. hwt-autodoc::
    """
    # values driven on bus2ip_rnw for each transfer direction
    READ = 1
    WRITE = 0

    def _config(self):
        """
        Declare the configurable widths of the address and data buses.
        """
        self.ADDR_WIDTH = Param(32)
        self.DATA_WIDTH = Param(32)

    def _declr(self):
        """
        Declare the signals of the interface.

        Signals prefixed ``bus2ip_`` use the default direction, signals
        prefixed ``ip2bus_`` are declared with ``masterDir=D.IN``.
        """
        data_width = self.DATA_WIDTH
        to_master = D.IN

        # chip select
        self.bus2ip_cs = Signal()

        # High = the transfer request is a user IP read,
        # Low = the transfer request is a user IP write
        self.bus2ip_rnw = Signal()

        # address shared by reads and writes
        self.bus2ip_addr = VectSignal(self.ADDR_WIDTH)

        # write data
        self.bus2ip_data = VectSignal(data_width)
        # byte enable for bus2ip_data (one bit per 8 data bits)
        self.bus2ip_be = VectSignal(data_width // 8)

        # read data
        self.ip2bus_data = VectSignal(data_width, masterDir=to_master)
        # write ack
        self.ip2bus_wrack = Signal(masterDir=to_master)
        # read ack
        self.ip2bus_rdack = Signal(masterDir=to_master)
        self.ip2bus_error = Signal(masterDir=to_master)

    def _getWordAddrStep(self):
        """
        :return: size of one word in unit of address
        """
        bits_per_word = int(self.DATA_WIDTH)
        bits_per_addr_unit = self._getAddrStep()
        return bits_per_word // bits_per_addr_unit

    def _getAddrStep(self):
        """
        :return: how many bits is one unit of address
            (e.g. 8 bits for char * pointer, 36 for 36 bit bram)
        """
        return 8

    def _initSimAgent(self, sim: HdlSimulator):
        """
        Create the simulation agent of this interface and store it in ``_ag``.
        """
        self._ag = IpifAgent(sim, self)
class IpifWithCE(Ipif):
    """
    :class:`Ipif` extended with per-register read and write chip enable buses.

    .. hwt-autodoc::
    """

    def _config(self):
        """
        Add the ``REG_COUNT`` parameter which sets the width of the
        chip enable buses.
        """
        super()._config()
        self.REG_COUNT = Param(1)

    def _declr(self):
        """
        Declare the signals of :class:`Ipif` followed by the chip enable buses.
        """
        super()._declr()
        chip_enable_t = Bits(self.REG_COUNT)
        # read chip enable bus
        self.bus2ip_rdce = Signal(dtype=chip_enable_t)
        # write chip enable bus
        self.bus2ip_wrce = Signal(dtype=chip_enable_t)
class IpifAgentState(Enum):
    """
    State of the transaction tracked by :class:`IpifAgent` in monitor mode.

    * IDLE (0)  - no transaction in progress
    * READ (1)  - read transaction in progress
    * WRITE (2) - write transaction in progress
    """
    IDLE, READ, WRITE = range(3)
class IpifAgent(SyncAgentBase):
    """
    Simulation agent for the Ipif interface.

    :ivar ~.requests: list of tuples (READ, address) or (WRITE, address, data, mask)
        used for driver
    :ivar ~.r_data: list of read data for driver
    :ivar ~.mem: if agent is in monitor mode (= is slave) all reads and writes
        are performed on mem object, index is word index
    :note: this behavior can be overridden by onRead/onWrite methods
    :ivar ~.actual: actual request which is performed (in driver mode)
    :ivar ~.READ_LATENCY: number of additional monitor() invocations a read
        response is delayed by (monitor mode)
    :ivar ~.WRITE_LATENCY: number of additional monitor() invocations a write
        response is delayed by (monitor mode)
    :ivar ~.wmaskAll: byte enable value with all bytes of a word enabled
    """

    def __init__(self, sim: HdlSimulator, intf, allowNoReset=True):
        """
        :param sim: simulator in which this agent runs
        :param intf: Ipif interface instance driven/monitored by this agent
        :param allowNoReset: forwarded to SyncAgentBase
        """
        super().__init__(sim, intf, allowNoReset=allowNoReset)

        # for driver only
        self.requests = deque()
        self.actual = NOP
        self.r_data = deque()

        # for monitor only
        self.READ_LATENCY = 0
        self.WRITE_LATENCY = 0
        self.mem = {}
        self._monitor_st = IpifAgentState.IDLE
        # values of the request captured when chip select was seen
        self._addr = self._rnw = self._be = self._wdata = None
        self._latencyCntr = 0

        self._word_bytes = intf.bus2ip_data._dtype.bit_length() // 8
        self.wmaskAll = mask(self._word_bytes)
        # shared by driver and monitor, outputs are initialized on first call
        self._requireInit = True

    def onRead(self, addr):
        """
        Resolve a read request in monitor mode.

        :param addr: byte address, has to be aligned to the word size
        :return: item of ``mem`` stored under the word index of the address
        :raises NotImplementedError: if the address is not word aligned
        :raises KeyError: if nothing is stored in ``mem`` for this word
        """
        if addr % self._word_bytes != 0:
            raise NotImplementedError("Unaligned read")

        return self.mem[int(addr) // self._word_bytes]

    def onWrite(self, addr, val, byteen):
        """
        Resolve a write request in monitor mode.

        :param addr: byte address, has to be aligned to the word size
        :param val: value to store in ``mem``
        :param byteen: byte enable, only a full word write is supported
        :raises NotImplementedError: if the address is not word aligned
            or if some byte is not enabled
        """
        if addr % self._word_bytes != 0:
            raise NotImplementedError("Unaligned write")

        if int(byteen) == self.wmaskAll:
            # the key is normalized to int so it is the same key
            # which is used for the lookup in onRead
            self.mem[int(addr) // self._word_bytes] = val
        else:
            raise NotImplementedError("Masked write")

    def doReq(self, req):
        """
        Drive the request signals of the interface (driver mode).

        :param req: tuple (READ, address) or (WRITE, address, data, mask)
        :raises ValueError: if the first item of req is not READ or WRITE
        """
        rw = req[0]
        addr = req[1]

        if rw == READ:
            rw = Ipif.READ
            wdata = None
            wmask = None
        elif rw == WRITE:
            wdata = req[2]
            rw = Ipif.WRITE
            wmask = req[3]
        else:
            raise ValueError(rw)

        intf = self.intf
        intf.bus2ip_cs.write(1)
        intf.bus2ip_rnw.write(rw)
        intf.bus2ip_addr.write(addr)
        intf.bus2ip_data.write(wdata)
        intf.bus2ip_be.write(wmask)

    def monitor(self):
        """
        Simulation process of the agent in monitor (slave) mode.

        Captures a request when chip select is set, resolves it using
        onRead/onWrite after the configured latency, drives the ip2bus
        signals and asserts that the request signals stay stable while
        a previously captured request is still in progress.
        """
        intf = self.intf
        yield WaitCombRead()
        en = self.notReset()
        yield WaitWriteOnly()
        if self._requireInit or not en:
            # put outputs to the default state on first call and in reset
            intf.ip2bus_rdack.write(0)
            intf.ip2bus_wrack.write(0)
            intf.ip2bus_error.write(None)
            self._requireInit = False
            if not en:
                return

        yield WaitCombRead()
        st = self._monitor_st
        if st == IpifAgentState.IDLE:
            yield WaitCombRead()
            cs = intf.bus2ip_cs.read()
            cs = int(cs)
            # [TODO] there can be multiple chips
            # and we react on all chip selects
            if cs:
                # new request, capture it
                self._rnw = bool(intf.bus2ip_rnw.read())
                self._addr = int(intf.bus2ip_addr.read())
                if self._rnw:
                    st = IpifAgentState.READ
                else:
                    self._be = int(intf.bus2ip_be.read())
                    self._wdata = intf.bus2ip_data.read()
                    st = IpifAgentState.WRITE
            else:
                # no request, clear the response
                yield WaitWriteOnly()
                intf.ip2bus_rdack.write(0)
                intf.ip2bus_wrack.write(0)
                intf.ip2bus_error.write(None)
                intf.ip2bus_data.write(None)
            # the request was captured (or is not present) in this call
            # so there is nothing to compare it with
            doStabilityCheck = False
        else:
            doStabilityCheck = True

        yield WaitWriteOnly()
        if st == IpifAgentState.READ:
            intf.ip2bus_wrack.write(0)
            if self._latencyCntr == self.READ_LATENCY:
                # read done
                self._latencyCntr = 0
                d = self.onRead(self._addr)
                intf.ip2bus_data.write(d)
                intf.ip2bus_rdack.write(1)
                intf.ip2bus_error.write(0)
                st = IpifAgentState.IDLE
            else:
                # read has to wait
                intf.ip2bus_data.write(None)
                self._latencyCntr += 1
        elif st == IpifAgentState.WRITE:
            intf.ip2bus_rdack.write(0)
            intf.ip2bus_data.write(None)
            if self._latencyCntr == self.WRITE_LATENCY:
                # write done
                self.onWrite(self._addr, self._wdata, self._be)
                intf.ip2bus_wrack.write(1)
                intf.ip2bus_error.write(0)
                self._latencyCntr = 0
                st = IpifAgentState.IDLE
            else:
                # write has to wait
                self._latencyCntr += 1

        if doStabilityCheck:
            sim = self.sim
            yield WaitCombStable()
            cs = bool(intf.bus2ip_cs.read())
            assert cs, (sim.now, "chip select signal has to be stable")
            rnw = bool(intf.bus2ip_rnw.read())
            assert rnw == self._rnw, (
                sim.now, "read not write signal has to be stable", rnw, self._rnw)
            addr = int(intf.bus2ip_addr.read())
            assert addr == self._addr, (
                sim.now, "address signal has to be stable", addr, self._addr)
            # st was already updated above, write data and byte enable
            # are checked only if the write is still waiting
            if st == IpifAgentState.WRITE:
                be = int(intf.bus2ip_be.read())
                assert be == self._be, (
                    sim.now, "byte enable signal has to be stable", be, self._be)
                d = intf.bus2ip_data.read()
                assert (self._wdata.val == d.val
                        and self._wdata.vld_mask == d.vld_mask), (
                    sim.now, "bus2ip_data signal has to be stable", d, self._wdata)

        self._monitor_st = st

    def driver(self):
        """
        Simulation process of the agent in driver (master) mode.

        Waits for the ack of the request in progress (read data is appended
        to ``r_data``), then starts the next request from ``requests``.
        Chip select is cleared if there is nothing to send or in reset.
        """
        intf = self.intf
        actual = self.actual
        actual_next = actual
        if self._requireInit:
            yield WaitWriteOnly()
            intf.bus2ip_cs.write(0)
            self._requireInit = False

        yield WaitCombRead()
        # now we are after clk edge
        if actual is not NOP:
            if actual[0] is READ:
                yield WaitCombRead()
                rack = intf.ip2bus_rdack.read()
                rack = int(rack)
                if rack:
                    d = intf.ip2bus_data.read()
                    if self._debugOutput is not None:
                        name = self.intf._getFullName()
                        self._debugOutput.write(f"{name:s}, on {self.sim.now} read_data: {d.val:d}\n")
                    self.r_data.append(d)
                    actual_next = NOP
            else:
                # write in progress
                wack = int(intf.ip2bus_wrack.read())
                if wack:
                    if self._debugOutput is not None:
                        name = self.intf._getFullName()
                        self._debugOutput.write(f"{name:s}, on {self.sim.now} write_ack\n")
                    actual_next = NOP

        en = self.notReset()
        if en:
            if self.actual is NOP:
                if self.requests:
                    req = self.requests.popleft()
                    if req is not NOP:
                        yield WaitWriteOnly()
                        self.doReq(req)
                        self.actual = req
                        return
            else:
                # request in progress, keep the signals as they are
                # and only update the state (NOP if the ack was received)
                self.actual = actual_next
                return

        # nothing to send (or reset), deactivate chip select
        yield WaitWriteOnly()
        intf.bus2ip_cs.write(0)