from collections import deque
from hwt.hdl.constants import DIRECTION, READ, WRITE, NOP, READ_WRITE
from hwt.interfaces.agents.handshaked import HandshakedAgent
from hwt.interfaces.agents.vldSynced import VldSyncedAgent
from hwt.interfaces.std import VectSignal, Signal
from hwt.math import log2ceil
from hwt.simulator.agentBase import SyncAgentBase
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
from hwtSimApi.hdlSimulator import HdlSimulator
from pyMathBitPrecise.bit_utils import mask
RESP_OKAY = 0b00
# RESP_RESERVED = 0b01
RESP_SLAVEERROR = 0b10
RESP_DECODEERROR = 0b11
[docs]class AvalonMM(Interface):
"""
Avalon Memory Mapped interface
:note: handshaked, shared address and response channel
https://www.intel.com/content/dam/altera-www/global/en_US/pdfs/literature/manual/mnl_avalon_spec.pdf
.. hwt-autodoc::
"""
def _config(self):
self.ADDR_WIDTH = Param(32)
self.DATA_WIDTH = Param(32)
self.MAX_BURST = Param(0)
def _declr(self):
# self.debugAccess = Signal()
IN = DIRECTION.IN
# read/write transaction start
self.read = Signal()
self.write = Signal()
self.address = VectSignal(self.ADDR_WIDTH)
# ready from slave to mark that next request can not be accepted
self.waitRequest = Signal(masterDir=IN)
self.readData = VectSignal(self.DATA_WIDTH, masterDir=IN)
self.readDataValid = Signal(masterDir=IN) # read data valid
self.byteEnable = VectSignal(self.DATA_WIDTH // 8)
self.writeData = VectSignal(self.DATA_WIDTH)
# self.lock = Signal()
self.response = VectSignal(2, masterDir=IN)
self.writeResponseValid = Signal(masterDir=IN)
if self.MAX_BURST != 0:
self.burstCount = VectSignal(log2ceil(self.MAX_BURST))
# self.beginBurstTransfer = Signal()
[docs] def _getWordAddrStep(self):
"""
:return: size of one word in unit of address
"""
return int(self.DATA_WIDTH) // self._getAddrStep()
[docs] def _getAddrStep(self):
"""
:return: how many bits is one unit of address
(e.g. 8 bits for char * pointer, 36 for 36 bit bram)
"""
return 8
[docs] def _initSimAgent(self, sim: HdlSimulator):
self._ag = AvalonMmAgent(sim, self)
[docs]class AvalonMmDataRAgent(VldSyncedAgent):
"""
Simulation/verification agent for data part of AvalomMM interface
* vld signal = readDataValid
* data signal = (readData, response)
"""
[docs] @classmethod
def get_valid_signal(cls, intf):
return intf.readDataValid
[docs] def get_valid(self):
return self._vld.read()
[docs] def set_valid(self, val):
self._vld.write(val)
[docs] def get_data(self):
"""extract data from interface"""
intf = self.intf
return (intf.readData.read(), intf.response.read())
[docs] def set_data(self, data):
"""write data to interface"""
intf = self.intf
if data is None:
intf.readData.write(None)
intf.response.write(None)
else:
readData, response = data
intf.readData.write(readData)
intf.response.write(response)
[docs]class AvalonMmAddrAgent(HandshakedAgent):
"""
data format is tuple (address, byteEnable, READ/WRITE, burstCount)
* two valid signals "read", "write"
* one ready_n signal "waitrequest")
* on write set data and byteenamble as well
"""
[docs] def __init__(self, sim: HdlSimulator, intf, allowNoReset=False):
HandshakedAgent.__init__(self, sim, intf, allowNoReset=allowNoReset)
self.BE_ALL = mask(intf.readData._dtype.bit_length() // 8)
[docs] @classmethod
def get_ready_signal(cls, intf: AvalonMM):
return intf.waitRequest
[docs] def get_ready(self):
rd = self._rd.read()
rd.val = int(not rd.val)
return rd
[docs] def set_ready(self, val: int):
self._rd.write(int(not val))
[docs] @classmethod
def get_valid_signal(cls, intf: AvalonMM):
return (intf.read, intf.write)
[docs] def get_valid(self):
r = self._vld[0].read()
w = self._vld[1].read()
r.val = r.val | w.val
r.vld_mask = r.vld_mask & w.vld_mask
return r
[docs] def set_valid(self, val: int):
if self.actualData is None or self.actualData is NOP:
r = 0
w = 0
else:
mode = self.actualData[0]
if mode is READ:
r = val
w = 0
elif mode is WRITE:
r = 0
w = val
else:
raise ValueError("Unknown mode", mode)
self._vld[0].write(r)
self._vld[1].write(w)
[docs] def get_data(self):
intf = self.intf
address = intf.address.read()
byteEnable = intf.byteEnable.read()
read = intf.read.read()
write = intf.write.read()
wdata = intf.writeData.read()
if intf.MAX_BURST != 0:
burstCount = intf.burstCount.read()
else:
burstCount = 1
if read.val:
if write.val:
rw = READ_WRITE
else:
rw = READ
wdata = None
byteEnable = None
elif write.val:
rw = WRITE
else:
raise AssertionError(
"This function should not be called when data"
"is not ready on interface")
return (rw, address, burstCount, wdata, byteEnable)
[docs] def set_data(self, data):
intf = self.intf
if data is None:
intf.address.write(None)
intf.byteEnable.write(None)
if intf.MAX_BURST != 0:
intf.burstCount.write(None)
intf.writeData.write(None)
intf.read.write(0)
intf.write.write(0)
else:
rw, address, burstCount, d, be = data
if rw is READ:
rd, wr = 1, 0
be = self.BE_ALL
elif rw is WRITE:
rd, wr = 0, 1
intf.writeData.write(d)
else:
raise TypeError(f"rw is in invalid format {rw}")
intf.address.write(address)
intf.byteEnable.write(be)
assert int(burstCount) >= 1, burstCount
if intf.MAX_BURST:
intf.burstCount.write(burstCount)
intf.read.write(rd)
intf.write.write(wr)
[docs]class AvalonMmWRespAgent(VldSyncedAgent):
[docs] @classmethod
def get_valid_signal(cls, intf):
return intf.writeResponseValid
[docs] def get_data(self):
return self.intf.response.read()
[docs] def set_data(self, data):
self.intf.response.write(data)
[docs]class AvalonMmAgent(SyncAgentBase):
"""
Simulation agent for AvalonMM bus interface
:ivar ~.req: request data, items are tuples (READ/WRITE, address, burstCount, writeData, writeMask)
:ivar ~.wResp: write response data
:ivar ~.rData: data read from interface, items are typles (data, response)
"""
[docs] def __init__(self, sim: HdlSimulator, intf, allowNoReset=False):
SyncAgentBase.__init__(self, sim, intf, allowNoReset=allowNoReset)
self.addrAg = AvalonMmAddrAgent(sim, intf, allowNoReset=allowNoReset)
self.rDataAg = AvalonMmDataRAgent(sim, intf, allowNoReset=allowNoReset)
self.wRespAg = AvalonMmWRespAgent(sim, intf, allowNoReset=allowNoReset)
[docs] def req_get(self):
return self.addrAg.data
[docs] def req_set(self, v):
self.addrAg.data = v
req = property(req_get, req_set)
[docs] def wResp_get(self):
return self.wRespAg.data
[docs] def wResp_set(self, v):
self.wRespAg = v
wResp = property(wResp_get, wResp_set)
[docs] def rData_get(self):
return self.rDataAg.data
[docs] def rData_set(self, v):
self.rDataAg.data = v
rData = property(rData_get, rData_set)
[docs] def setEnable_asDriver(self, en: bool):
self._enabled = en
self.addrAg.setEnable(en)
# self.wRespAg.setEnable(en)
[docs] def setEnable_asMonitor(self, en: bool):
self._enabled = en
self.addrAg.setEnable(en)
self.wRespAg.setEnable(en)
self.rData.setEnable(en)
[docs] def getDrivers(self):
self.setEnable = self.setEnable_asDriver
yield from self.rDataAg.getMonitors()
yield from self.addrAg.getDrivers()
yield from self.wRespAg.getMonitors()
[docs] def getMonitors(self):
self.setEnable = self.setEnable_asMonitor
yield from self.rDataAg.getDrivers()
yield from self.addrAg.getMonitors()
yield from self.wRespAg.getDrivers()