Source code for hwtLib.amba.datapump.intf

from math import ceil

from hwt.constants import DIRECTION
from hwt.hwIO import HwIO
from hwt.hwIOs.agents.rdVldSync import HwIODataRdVldAgent
from hwt.hwIOs.std import HwIODataRdVld, HwIOVectSignal, HwIORdVldSync
from hwt.hwParam import HwParam
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwtLib.amba.axi4s import Axi4Stream
from hwtSimApi.agents.base import AgentBase
from hwtSimApi.hdlSimulator import HdlSimulator


[docs] class AddrSizeHs(HwIODataRdVld): """ :ivar MAX_LEN: maximum value of len (number of words - 1) .. hwt-autodoc:: """ @override def hwConfig(self): self.ID_WIDTH = HwParam(4) self.ADDR_WIDTH = HwParam(32) self.DATA_WIDTH = HwParam(64) self.MAX_LEN = HwParam(4096 // 8 - 1) self.USE_STRB = HwParam(True) @override def hwDeclr(self): if self.ID_WIDTH: self.id = HwIOVectSignal(self.ID_WIDTH) self.addr = HwIOVectSignal(self.ADDR_WIDTH) assert self.MAX_LEN >= 0, self.MAX_LEN if self.MAX_LEN > 0: self.len = HwIOVectSignal(log2ceil(self.MAX_LEN + 1)) # rem is number of bytes in last word which are valid - 1 self.rem = HwIOVectSignal(log2ceil(self.DATA_WIDTH // 8)) HwIORdVldSync.hwDeclr(self)
[docs] @override def _initSimAgent(self, sim: HdlSimulator): self._ag = AddrSizeHsAgent(sim, self)
[docs] class AddrSizeHsAgent(HwIODataRdVldAgent):
[docs] @override def get_data(self): hwIO = self.hwIO addr = hwIO.addr.read() rem = hwIO.rem.read() if hwIO.ID_WIDTH: _id = hwIO.id.read() if hwIO.MAX_LEN: _len = hwIO.len.read() return (_id, addr, _len, rem) else: return (_id, addr, rem) else: if hwIO.MAX_LEN: _len = hwIO.len.read() return (addr, _len, rem) else: return (addr, rem)
[docs] @override def set_data(self, data): hwIO = self.hwIO if hwIO.ID_WIDTH: if data is None: _id, addr, _len, rem = (None for _ in range(4)) else: if hwIO.MAX_LEN == 0: _id, addr, rem = data else: _id, addr, _len, rem = data hwIO.id.write(_id) else: if data is None: addr, _len, rem = None, None, None else: if hwIO.MAX_LEN == 0: addr, rem = data else: addr, _len, rem = data hwIO.addr.write(addr) if hwIO.MAX_LEN != 0: hwIO.len.write(_len) hwIO.rem.write(rem)
[docs] class HwIOAxiRDatapump(HwIO): """ HwIO of read datapump driver .. hwt-autodoc:: """ @override def hwConfig(self): self.ID_WIDTH = HwParam(0) self.ADDR_WIDTH = HwParam(32) self.DATA_WIDTH = HwParam(64) self.MAX_BYTES = HwParam(4096) self.USE_STRB = HwParam(True) @override def hwDeclr(self): with self._hwParamsShared(): # user requests self.req = AddrSizeHs() self.req.MAX_LEN = max(ceil(self.MAX_BYTES / (self.DATA_WIDTH // 8)) - 1, 0) self.r = Axi4Stream(masterDir=DIRECTION.IN)
[docs] @override def _initSimAgent(self, sim: HdlSimulator): self._ag = HwIOAxiRDatapumpAgent(sim, self)
[docs] class HwIOAxiRDatapumpAgent(AgentBase): """ Composite agent with agent for every HwIOAxiRDatapump channel enable is shared """ @property def enable(self): return self.__enable @enable.setter def enable(self, v): """ Distribute change of enable on child agents """ self.__enable = v for o in [self.req, self.r]: o.enable = v
[docs] def __init__(self, sim: HdlSimulator, hwIO): self.__enable = True self.hwIO = hwIO hwIO.req._initSimAgent(sim) self.req = hwIO.req._ag hwIO.r._initSimAgent(sim) self.r = hwIO.r._ag
[docs] @override def getDrivers(self): yield from self.req.getDrivers() yield from self.r.getMonitors()
[docs] @override def getMonitors(self): yield from self.req.getMonitors() yield from self.r.getDrivers()
[docs] class HwIOAxiWDatapump(HwIO): """ HwIO of write datapump driver .. hwt-autodoc:: """ @override def hwConfig(self): AddrSizeHs.hwConfig(self) @override def hwDeclr(self): with self._hwParamsShared(): # user requests self.req = AddrSizeHs() with self._hwParamsShared(exclude=({"ID_WIDTH"}, set())): self.w = Axi4Stream() if self.ID_WIDTH: ack = HwIODataRdVld(masterDir=DIRECTION.IN) ack.DATA_WIDTH = self.ID_WIDTH else: ack = HwIORdVldSync(masterDir=DIRECTION.IN) self.ack = ack
[docs] @override def _initSimAgent(self, sim: HdlSimulator): self._ag = HwIOAxiWDatapumpAgent(sim, self)
[docs] class HwIOAxiWDatapumpAgent(AgentBase): """ Composite agent with agent for every HwIOAxiRDatapump channel enable is shared """ @property def enable(self): return self.__enable @enable.setter def enable(self, v): """ Distribute change of enable on child agents """ self.__enable = v for o in [self.req, self.w, self.ack]: o.enable = v
[docs] def __init__(self, sim: HdlSimulator, hwIO): self.__enable = True self.hwIO = hwIO hwIO.req._initSimAgent(sim) self.req = hwIO.req._ag hwIO.w._initSimAgent(sim) self.w = hwIO.w._ag hwIO.ack._initSimAgent(sim) self.ack = hwIO.ack._ag
[docs] @override def getDrivers(self): yield from self.req.getDrivers() yield from self.w.getDrivers() yield from self.ack.getMonitors()
[docs] @override def getMonitors(self): yield from self.req.getMonitors() yield from self.w.getMonitors() yield from self.ack.getDrivers()