Source code for hwtLib.amba.datapump.intf
from math import ceil
from hwt.hdl.constants import DIRECTION
from hwt.interfaces.agents.handshaked import HandshakedAgent
from hwt.interfaces.std import Handshaked, VectSignal, HandshakeSync
from hwt.math import log2ceil
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
from hwtLib.amba.axis import AxiStream
from hwtSimApi.agents.base import AgentBase
from hwtSimApi.hdlSimulator import HdlSimulator
[docs]class AddrSizeHs(Handshaked):
"""
:ivar MAX_LEN: maximum value of len (number of words - 1)
.. hwt-autodoc::
"""
def _config(self):
self.ID_WIDTH = Param(4)
self.ADDR_WIDTH = Param(32)
self.DATA_WIDTH = Param(64)
self.MAX_LEN = Param(4096 // 8 - 1)
self.USE_STRB = Param(True)
def _declr(self):
if self.ID_WIDTH:
self.id = VectSignal(self.ID_WIDTH)
self.addr = VectSignal(self.ADDR_WIDTH)
assert self.MAX_LEN >= 0, self.MAX_LEN
if self.MAX_LEN > 0:
self.len = VectSignal(log2ceil(self.MAX_LEN + 1))
# rem is number of bytes in last word which are valid - 1
self.rem = VectSignal(log2ceil(self.DATA_WIDTH // 8))
HandshakeSync._declr(self)
[docs] def _initSimAgent(self, sim: HdlSimulator):
self._ag = AddrSizeHsAgent(sim, self)
[docs]class AddrSizeHsAgent(HandshakedAgent):
[docs] def get_data(self):
intf = self.intf
addr = intf.addr.read()
rem = intf.rem.read()
if intf.ID_WIDTH:
_id = intf.id.read()
if intf.MAX_LEN:
_len = intf.len.read()
return (_id, addr, _len, rem)
else:
return (_id, addr, rem)
else:
if intf.MAX_LEN:
_len = intf.len.read()
return (addr, _len, rem)
else:
return (addr, rem)
[docs] def set_data(self, data):
intf = self.intf
if intf.ID_WIDTH:
if data is None:
_id, addr, _len, rem = (None for _ in range(4))
else:
if intf.MAX_LEN == 0:
_id, addr, rem = data
else:
_id, addr, _len, rem = data
intf.id.write(_id)
else:
if data is None:
addr, _len, rem = None, None, None
else:
if intf.MAX_LEN == 0:
addr, rem = data
else:
addr, _len, rem = data
intf.addr.write(addr)
if intf.MAX_LEN != 0:
intf.len.write(_len)
intf.rem.write(rem)
[docs]class AxiRDatapumpIntf(Interface):
"""
Interface of read datapump driver
.. hwt-autodoc::
"""
def _config(self):
self.ID_WIDTH = Param(0)
self.ADDR_WIDTH = Param(32)
self.DATA_WIDTH = Param(64)
self.MAX_BYTES = Param(4096)
self.USE_STRB = Param(True)
def _declr(self):
with self._paramsShared():
# user requests
self.req = AddrSizeHs()
self.req.MAX_LEN = max(ceil(self.MAX_BYTES / (self.DATA_WIDTH // 8)) - 1, 0)
self.r = AxiStream(masterDir=DIRECTION.IN)
[docs] def _initSimAgent(self, sim: HdlSimulator):
self._ag = AxiRDatapumpIntfAgent(sim, self)
[docs]class AxiRDatapumpIntfAgent(AgentBase):
"""
Composite agent with agent for every AxiRDatapumpIntf channel
enable is shared
"""
@property
def enable(self):
return self.__enable
@enable.setter
def enable(self, v):
"""
Distribute change of enable on child agents
"""
self.__enable = v
for o in [self.req, self.r]:
o.enable = v
[docs] def __init__(self, sim: HdlSimulator, intf):
self.__enable = True
self.intf = intf
intf.req._initSimAgent(sim)
self.req = intf.req._ag
intf.r._initSimAgent(sim)
self.r = intf.r._ag
[docs] def getDrivers(self):
yield from self.req.getDrivers()
yield from self.r.getMonitors()
[docs] def getMonitors(self):
yield from self.req.getMonitors()
yield from self.r.getDrivers()
[docs]class AxiWDatapumpIntf(Interface):
"""
Interface of write datapump driver
.. hwt-autodoc::
"""
def _config(self):
AddrSizeHs._config(self)
def _declr(self):
with self._paramsShared():
# user requests
self.req = AddrSizeHs()
with self._paramsShared(exclude=({"ID_WIDTH"}, set())):
self.w = AxiStream()
if self.ID_WIDTH:
ack = Handshaked(masterDir=DIRECTION.IN)
ack.DATA_WIDTH = self.ID_WIDTH
else:
ack = HandshakeSync(masterDir=DIRECTION.IN)
self.ack = ack
[docs] def _initSimAgent(self, sim: HdlSimulator):
self._ag = AxiWDatapumpIntfAgent(sim, self)
[docs]class AxiWDatapumpIntfAgent(AgentBase):
"""
Composite agent with agent for every AxiRDatapumpIntf channel
enable is shared
"""
@property
def enable(self):
return self.__enable
@enable.setter
def enable(self, v):
"""
Distribute change of enable on child agents
"""
self.__enable = v
for o in [self.req, self.w, self.ack]:
o.enable = v
[docs] def __init__(self, sim: HdlSimulator, intf):
self.__enable = True
self.intf = intf
intf.req._initSimAgent(sim)
self.req = intf.req._ag
intf.w._initSimAgent(sim)
self.w = intf.w._ag
intf.ack._initSimAgent(sim)
self.ack = intf.ack._ag
[docs] def getDrivers(self):
yield from self.req.getDrivers()
yield from self.w.getDrivers()
yield from self.ack.getMonitors()
[docs] def getMonitors(self):
yield from self.req.getMonitors()
yield from self.w.getMonitors()
yield from self.ack.getDrivers()