Source code for hwtLib.amba.axiDatapumpIntf
from hwt.code import log2ceil
from hwt.hdl.constants import DIRECTION
from hwt.interfaces.agents.handshaked import HandshakedAgent
from hwt.interfaces.std import Handshaked, VectSignal, HandshakeSync
from hwt.simulator.agentBase import AgentBase
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
from hwtLib.amba.axis import AxiStream_withId, AxiStream
[docs]class AddrSizeHs(Handshaked):
[docs] def _config(self):
self.ID_WIDTH = Param(4)
self.ADDR_WIDTH = Param(32)
self.DATA_WIDTH = Param(64)
self.MAX_LEN = Param(4096 // 8 - 1)
[docs] def _declr(self):
self.id = VectSignal(self.ID_WIDTH)
self.addr = VectSignal(self.ADDR_WIDTH)
# len is number of words -1
self.len = VectSignal(log2ceil(self.MAX_LEN))
# rem is number of bits in last word which is valid - 1
self.rem = VectSignal(log2ceil(self.DATA_WIDTH // 8))
HandshakeSync._declr(self)
[docs] def _initSimAgent(self):
self._ag = AddrSizeHsAgent(self)
[docs]class AddrSizeHsAgent(HandshakedAgent):
[docs] def doRead(self, s):
intf = self.intf
r = s.read
_id = r(intf.id)
addr = r(intf.addr)
_len = r(intf.len)
rem = r(intf.rem)
return (_id, addr, _len, rem)
[docs] def doWrite(self, s, data):
intf = self.intf
w = s.write
if data is None:
data = [None for _ in range(4)]
_id, addr, _len, rem = data
w(_id, intf.id)
w(addr, intf.addr)
w(_len, intf.len)
w(rem, intf.rem)
[docs]class AxiRDatapumpIntf(Interface):
"""
Interface of read datapump driver
"""
[docs] def _config(self):
AddrSizeHs._config(self)
[docs] def _declr(self):
with self._paramsShared():
# user requests
self.req = AddrSizeHs()
self.r = AxiStream_withId(masterDir=DIRECTION.IN)
[docs] def _initSimAgent(self):
self._ag = AxiRDatapumpIntfAgent(self)
[docs]class AxiRDatapumpIntfAgent(AgentBase):
"""
Composite agent with agent for every AxiRDatapumpIntf channel
enable is shared
"""
@property
def enable(self):
return self.__enable
@enable.setter
def enable(self, v):
"""
Distribute change of enable on child agents
"""
self.__enable = v
for o in [self.req, self.r]:
o.enable = v
[docs] def __init__(self, intf):
self.__enable = True
self.intf = intf
intf.req._initSimAgent()
self.req = intf.req._ag
intf.r._initSimAgent()
self.r = intf.r._ag
[docs] def getDrivers(self):
return (self.req.getDrivers() +
self.r.getMonitors()
)
[docs] def getMonitors(self):
return (self.req.getMonitors() +
self.r.getDrivers()
)
[docs]class AxiWDatapumpIntf(Interface):
"""
Interface of write datapump driver
"""
[docs] def _config(self):
AddrSizeHs._config(self)
[docs] def _declr(self):
with self._paramsShared():
# user requests
self.req = AddrSizeHs()
self.w = AxiStream()
self.ack = Handshaked(masterDir=DIRECTION.IN)
self.ack._replaceParam("DATA_WIDTH", self.ID_WIDTH)
[docs] def _initSimAgent(self):
self._ag = AxiWDatapumpIntfAgent(self)
[docs]class AxiWDatapumpIntfAgent(AgentBase):
"""
Composite agent with agent for every AxiRDatapumpIntf channel
enable is shared
"""
@property
def enable(self):
return self.__enable
@enable.setter
def enable(self, v):
"""
Distribute change of enable on child agents
"""
self.__enable = v
for o in [self.req, self.w, self.ack]:
o.enable = v
[docs] def __init__(self, intf):
self.__enable = True
self.intf = intf
intf.req._initSimAgent()
self.req = intf.req._ag
intf.w._initSimAgent()
self.w = intf.w._ag
intf.ack._initSimAgent()
self.ack = intf.ack._ag
[docs] def getDrivers(self):
return (self.req.getDrivers() +
self.w.getDrivers() +
self.ack.getMonitors()
)
[docs] def getMonitors(self):
return (self.req.getMonitors() +
self.w.getMonitors() +
self.ack.getDrivers()
)