from hwt.constants import DIRECTION, READ, WRITE, NOP, READ_WRITE
from hwt.hwIO import HwIO
from hwt.hwIOs.agents.rdVldSync import HwIODataRdVldAgent
from hwt.hwIOs.agents.vldSync import HwIODataVldAgent
from hwt.hwIOs.std import HwIOVectSignal, HwIOSignal
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwt.simulator.agentBase import SyncAgentBase
from hwtSimApi.hdlSimulator import HdlSimulator
from pyMathBitPrecise.bit_utils import mask
RESP_OKAY = 0b00
# RESP_RESERVED = 0b01
RESP_SLAVEERROR = 0b10
RESP_DECODEERROR = 0b11
[docs]
class AvalonMM(HwIO):
"""
Avalon Memory Mapped interface
:note: 1 address+writedata channel (ready, valid sync),
1 read data channel (valid only sync),
1 write response channel (valid only sync),
https://docs.altera.com/r/docs/683091/22.3/avalon-interface-specifications/introduction-to-the-avalon-interface-specifications
:ivar waitRequestAllowance: works on same principe as :attr:`AvalonST.readyAllowance`, it specicifies how many
transactions may be buffered in destination and thus how many transaction can source send even when ready=0 (waitRequest=1)
.. hwt-autodoc::
"""
@override
def hwConfig(self):
self.ADDR_WIDTH = HwParam(32)
self.DATA_WIDTH = HwParam(32)
self.MAX_BURST = HwParam(0)
self.waitRequestAllowance = HwParam(0)
@override
def hwDeclr(self):
# self.debugAccess = HwIOSignal()
IN = DIRECTION.IN
# read/write transaction start
self.read = HwIOSignal()
self.write = HwIOSignal()
self.address = HwIOVectSignal(self.ADDR_WIDTH)
# ready_n from slave to mark that next request can not be accepted
self.waitRequest = HwIOSignal(masterDir=IN)
self.readData = HwIOVectSignal(self.DATA_WIDTH, masterDir=IN)
self.readDataValid = HwIOSignal(masterDir=IN) # read data valid
self.byteEnable = HwIOVectSignal(self.DATA_WIDTH // 8)
self.writeData = HwIOVectSignal(self.DATA_WIDTH)
# self.lock = HwIOSignal()
self.response = HwIOVectSignal(2, masterDir=IN)
self.writeResponseValid = HwIOSignal(masterDir=IN)
if self.MAX_BURST != 0:
self.burstCount = HwIOVectSignal(log2ceil(self.MAX_BURST))
# self.beginBurstTransfer = HwIOSignal()
[docs]
def _getWordAddrStep(self):
"""
:return: size of one word in unit of address
"""
return int(self.DATA_WIDTH) // self._getAddrStep()
[docs]
def _getAddrStep(self):
"""
:return: how many bits is one unit of address
(e.g. 8 bits for char * pointer, 36 for 36 bit bram)
"""
return 8
[docs]
@override
def _initSimAgent(self, sim: HdlSimulator):
self._ag = AvalonMmAgent(sim, self)
[docs]
class AvalonMmDataRAgent(HwIODataVldAgent):
"""
Simulation/verification agent for data part of AvalomMM interface
* vld signal = readDataValid
* data signal = (readData, response)
"""
[docs]
@classmethod
@override
def get_valid_signal(cls, hwIO):
return hwIO.readDataValid
[docs]
@override
def get_valid(self):
return self._vld.read()
[docs]
@override
def set_valid(self, val):
self._vld.write(val)
[docs]
@override
def get_data(self):
"""extract data from interface"""
hwIO = self.hwIO
return (hwIO.readData.read(), hwIO.response.read())
[docs]
@override
def set_data(self, data):
"""write data to interface"""
hwIO = self.hwIO
if data is None:
hwIO.readData.write(None)
hwIO.response.write(None)
else:
readData, response = data
hwIO.readData.write(readData)
hwIO.response.write(response)
[docs]
class AvalonMmAddrAgent(HwIODataRdVldAgent):
"""
data format is tuple (address, byteEnable, READ/WRITE, burstCount)
* two valid signals "read", "write"
* one ready_n signal "waitrequest")
* on write set data and byteenamble as well
"""
[docs]
def __init__(self, sim: HdlSimulator, hwIO, allowNoReset=False):
HwIODataRdVldAgent.__init__(self, sim, hwIO, allowNoReset=allowNoReset)
self.BE_ALL = mask(hwIO.readData._dtype.bit_length() // 8)
[docs]
@classmethod
@override
def get_ready_signal(cls, hwIO: AvalonMM):
return hwIO.waitRequest
[docs]
@override
def get_ready(self):
rd = self._rd.read()
rd.val = int(not rd.val)
return rd
[docs]
@override
def set_ready(self, val: int):
self._rd.write(int(not val))
[docs]
@classmethod
@override
def get_valid_signal(cls, hwIO: AvalonMM):
return (hwIO.read, hwIO.write)
[docs]
@override
def get_valid(self):
r = self._vld[0].read()
w = self._vld[1].read()
r.val = r.val | w.val
r.vld_mask = r.vld_mask & w.vld_mask
return r
[docs]
@override
def set_valid(self, val: int):
if self.actualData is None or self.actualData is NOP:
r = 0
w = 0
else:
mode = self.actualData[0]
if mode is READ:
r = val
w = 0
elif mode is WRITE:
r = 0
w = val
else:
raise ValueError("Unknown mode", mode)
self._vld[0].write(r)
self._vld[1].write(w)
[docs]
@override
def get_data(self):
hwIO = self.hwIO
address = hwIO.address.read()
byteEnable = hwIO.byteEnable.read()
read = hwIO.read.read()
write = hwIO.write.read()
wdata = hwIO.writeData.read()
if hwIO.MAX_BURST != 0:
burstCount = hwIO.burstCount.read()
else:
burstCount = 1
if read.val:
if write.val:
rw = READ_WRITE
else:
rw = READ
wdata = None
byteEnable = None
elif write.val:
rw = WRITE
else:
raise AssertionError(
"This function should not be called when data"
"is not ready on interface")
return (rw, address, burstCount, wdata, byteEnable)
[docs]
@override
def set_data(self, data):
hwIO = self.hwIO
if data is None:
hwIO.address.write(None)
hwIO.byteEnable.write(None)
if hwIO.MAX_BURST != 0:
hwIO.burstCount.write(None)
hwIO.writeData.write(None)
hwIO.read.write(0)
hwIO.write.write(0)
else:
rw, address, burstCount, d, be = data
if rw is READ:
rd, wr = 1, 0
be = self.BE_ALL
elif rw is WRITE:
rd, wr = 0, 1
hwIO.writeData.write(d)
else:
raise TypeError(f"rw is in invalid format {rw}")
hwIO.address.write(address)
hwIO.byteEnable.write(be)
assert int(burstCount) >= 1, burstCount
if hwIO.MAX_BURST:
hwIO.burstCount.write(burstCount)
hwIO.read.write(rd)
hwIO.write.write(wr)
[docs]
class AvalonMmWRespAgent(HwIODataVldAgent):
[docs]
@classmethod
@override
def get_valid_signal(cls, hwIO):
return hwIO.writeResponseValid
[docs]
@override
def get_data(self):
return self.hwIO.response.read()
[docs]
@override
def set_data(self, data):
self.hwIO.response.write(data)
[docs]
class AvalonMmAgent(SyncAgentBase):
"""
Simulation agent for AvalonMM bus interface
:ivar ~.req: request data, items are tuples (READ/WRITE, address, burstCount, writeData, writeMask)
:ivar ~.wResp: write response data
:ivar ~.rData: data read from interface, items are typles (data, response)
"""
[docs]
def __init__(self, sim: HdlSimulator, hwIO, allowNoReset=False):
SyncAgentBase.__init__(self, sim, hwIO, allowNoReset=allowNoReset)
self.addrAg = AvalonMmAddrAgent(sim, hwIO, allowNoReset=allowNoReset)
self.rDataAg = AvalonMmDataRAgent(sim, hwIO, allowNoReset=allowNoReset)
self.wRespAg = AvalonMmWRespAgent(sim, hwIO, allowNoReset=allowNoReset)
[docs]
def req_get(self):
return self.addrAg.data
[docs]
def req_set(self, v):
self.addrAg.data = v
req = property(req_get, req_set)
[docs]
def wResp_get(self):
return self.wRespAg.data
[docs]
def wResp_set(self, v):
self.wRespAg = v
wResp = property(wResp_get, wResp_set)
[docs]
def rData_get(self):
return self.rDataAg.data
[docs]
def rData_set(self, v):
self.rDataAg.data = v
rData = property(rData_get, rData_set)
[docs]
@override
def setEnable_asDriver(self, en: bool):
self._enabled = en
self.addrAg.setEnable(en)
# self.wRespAg.setEnable(en)
[docs]
@override
def setEnable_asMonitor(self, en: bool):
self._enabled = en
self.addrAg.setEnable(en)
self.wRespAg.setEnable(en)
self.rData.setEnable(en)
[docs]
@override
def getDrivers(self):
self.setEnable = self.setEnable_asDriver
yield from self.rDataAg.getMonitors()
yield from self.addrAg.getDrivers()
yield from self.wRespAg.getMonitors()
[docs]
@override
def getMonitors(self):
self.setEnable = self.setEnable_asMonitor
yield from self.rDataAg.getDrivers()
yield from self.addrAg.getMonitors()
yield from self.wRespAg.getDrivers()