Source code for hwtLib.amba.datapump.sim_ram

from collections import deque

from hwtLib.abstract.sim_ram import SimRam
from pyMathBitPrecise.bit_utils import mask, ValidityError
from hwtSimApi.triggers import WaitWriteOnly


class AxiDpSimRam(SimRam):
    """
    Dense RAM for simulation purposes with axi datapump interfaces

    Requests taken from the request agents are parsed to
    ``(id, addr, size, lastWordBitmask)`` tuples and queued in
    :attr:`rPending` / :attr:`wPending` until they are served by
    :meth:`doRead` / :meth:`doWrite`.

    :ivar ~.data: memory dict
    :ivar ~.allMask: ``mask(cellSize)``, the byte strobe expected for every
        word which is not the last word of a transaction
    :ivar ~.rPending: deque of parsed read requests waiting to be served
    :ivar ~.wPending: deque of parsed write requests waiting for write data
    :ivar ~.r_use_strb: True if the read data interface has a ``strb`` member
    :ivar ~.w_use_strb: True if the write data interface has a ``strb`` member
    """

    def __init__(self, cellWidth, clk,
                 rDatapumpIntf=None, wDatapumpIntf=None, parent=None):
        """
        :param cellWidth: width of items in memory (in bits, has to be
            a multiple of 8)
        :param clk: clk signal for synchronization
        :param rDatapumpIntf: read datapump interface (its ``req`` and ``r``
            agents are used) or None
        :param wDatapumpIntf: write datapump interface (its ``req``, ``w``
            and ``ack`` agents are used) or None
        :param parent: parent instance of SimRam
            (memory will be shared with this instance)
        """
        assert cellWidth % 8 == 0
        super().__init__(cellWidth // 8, parent=parent)
        self.allMask = mask(self.cellSize)
        assert rDatapumpIntf is not None or wDatapumpIntf is not None, \
            "At least read or write interface has to be present"

        # read channel agents
        if rDatapumpIntf is None:
            arAg = rAg = None
        else:
            arAg = rDatapumpIntf.req._ag
            rAg = rDatapumpIntf.r._ag
        self.arAg = arAg
        self.rAg = rAg
        self.rPending = deque()

        # write channel agents
        if wDatapumpIntf is None:
            awAg = wAg = wAckAg = None
        else:
            awAg = wDatapumpIntf.req._ag
            wAg = wDatapumpIntf.w._ag
            wAckAg = wDatapumpIntf.ack._ag

        self.w_use_strb = wDatapumpIntf is not None and hasattr(
            wDatapumpIntf.w, "strb")
        self.r_use_strb = rDatapumpIntf is not None and hasattr(
            rDatapumpIntf.r, "strb")

        self.awAg = awAg
        self.wAg = wAg
        self.wAckAg = wAckAg
        self.wPending = deque()
        self.clk = clk

        # the request interface defines the format of request tuples,
        # the write one is preferred if both are present
        if awAg is not None:
            intf = awAg.intf
        elif arAg is not None:
            intf = arAg.intf
        else:
            # reachable only if asserts are disabled
            raise AssertionError("Need at least some interface")

        self.ID_WIDTH = intf.ID_WIDTH
        self.MAX_LEN = intf.MAX_LEN
        self._registerOnClock()

    def _registerOnClock(self):
        """
        Register a new :meth:`checkRequests` process on the clock signal
        """
        self.clk._sigInside.wait(self.checkRequests())

    def checkRequests(self):
        """
        Check if any request has appeared on interfaces

        At most one request per channel is accepted and at most one pending
        transaction per channel is executed in a single call. The process
        registers itself on the clock again once it is done.
        """
        yield WaitWriteOnly()
        if self.arAg is not None:
            if self.arAg.data:
                self.onReadReq()

            if self.rPending:
                self.doRead()

        if self.awAg is not None:
            if self.awAg.data:
                self.onWriteReq()

            # item 2 of a parsed request is its size in words, the write is
            # postponed until all of its data words are available
            if self.wPending and self.wPending[0][2] <= len(self.wAg.data):
                self.doWrite()

        self._registerOnClock()

    def parseReq(self, req):
        """
        Convert a request tuple from a request agent to a normalized form

        The expected layout of ``req`` depends on the configuration of
        the request interface: the id is the first item only if ``ID_WIDTH``
        is non-zero, the length item is present only if ``MAX_LEN`` is
        non-zero. The address and the number of bytes in the last word
        are always present.

        :param req: sequence of simulation values, all of them have to be
            fully valid
        :return: tuple (id, addr, size, lastWordBitmask) where size is
            the number of words of the transaction (length item + 1, or 1
            if there is no length item) and lastWordBitmask is
            :attr:`allMask` if the number of bytes in the last word is 0
        """
        for i, v in enumerate(req):
            assert v._is_full_valid(), (i, v)

        if self.MAX_LEN:
            if self.ID_WIDTH:
                assert len(req) == 4, req
                _id = req[0].val
                addr = req[1].val
                size = req[2].val + 1
                lastWordBytes = req[3].val
            else:
                assert len(req) == 3, req
                _id = 0
                addr = req[0].val
                size = req[1].val + 1
                lastWordBytes = req[2].val
        else:
            if self.ID_WIDTH:
                assert len(req) == 3, req
                _id = req[0].val
                addr = req[1].val
                size = 1
                lastWordBytes = req[2].val
            else:
                assert len(req) == 2, req
                _id = 0
                addr = req[0].val
                size = 1
                lastWordBytes = req[1].val

        # 0 bytes in the last word means that the whole word is used
        if lastWordBytes == 0:
            lastWordBitmask = self.allMask
        else:
            lastWordBitmask = mask(lastWordBytes)

        return (_id, addr, size, lastWordBitmask)

    def onReadReq(self):
        """
        Move the oldest read request from the request agent to
        :attr:`rPending`
        """
        # popleft() keeps the requests in the order in which they arrived
        readReq = self.arAg.data.popleft()
        self.rPending.append(self.parseReq(readReq))

    def onWriteReq(self):
        """
        Move the oldest write request from the request agent to
        :attr:`wPending`
        """
        # popleft() keeps the requests in the order in which they arrived
        writeReq = self.awAg.data.popleft()
        self.wPending.append(self.parseReq(writeReq))

    def doRead(self):
        """
        Execute the oldest pending read request and push its data words to
        the read data agent

        The format of a pushed word is ``([id,] data, [strb,] isLast)``,
        the id is present if the read data interface has a non-zero
        ``ID_WIDTH`` and the strb if it has a ``strb`` member.

        :raises NotImplementedError: if the address is not aligned to
            the size of a memory cell
        :raises AssertionError: if any of the words was not initialized
        """
        _id, addr, size, lastWordBitmask = self.rPending.popleft()
        HAS_ID = self.rAg.intf.ID_WIDTH > 0
        baseIndex = addr // self.cellSize
        if baseIndex * self.cellSize != addr:
            raise NotImplementedError(
                f"unaligned transaction not implemented (0x{addr:x})")

        for i in range(size):
            isLast = i == size - 1
            try:
                data = self.data[baseIndex + i]
            except KeyError:
                data = None

            if data is None:
                raise AssertionError(
                    "Invalid read of uninitialized value on addr 0x%x" %
                    (addr + i * self.cellSize))

            if self.r_use_strb:
                # only the last word may have some bytes disabled
                strb = lastWordBitmask if isLast else self.allMask
                if HAS_ID:
                    read_trans = (_id, data, strb, isLast)
                else:
                    read_trans = (data, strb, isLast)
            else:
                if HAS_ID:
                    read_trans = (_id, data, isLast)
                else:
                    read_trans = (data, isLast)

            self.rAg.data.append(read_trans)

    def doWriteAck(self, _id):
        """
        Push the id of a finished write transaction to the write ack agent
        """
        self.wAckAg.data.append(_id)

    def doWrite(self):
        """
        Execute the oldest pending write request using the data words from
        the write data agent and acknowledge it

        The expected format of a data word is ``(data, strb, last)`` if
        the write data interface has a ``strb`` member, ``(data, last)``
        otherwise.

        :raises NotImplementedError: if the address is not aligned to
            the size of a memory cell or if the last word is not a full word
        :raises AssertionError: if the last flag or the strb of some word
            does not match the request
        """
        _id, addr, size, lastWordBitmask = self.wPending.popleft()
        baseIndex = addr // self.cellSize
        if baseIndex * self.cellSize != addr:
            raise NotImplementedError(
                f"unaligned transaction not implemented (0x{addr:x})")

        for i in range(size):
            if self.w_use_strb:
                data, strb, last = self.wAg.data.popleft()
                assert strb._is_full_valid()
                strb = strb.val
            else:
                data, last = self.wAg.data.popleft()

            # the data is intentionally not required to be valid, a word
            # which can not be converted to int is stored as None
            try:
                data = int(data)
            except ValidityError:
                data = None

            last = bool(last)
            isLast = i == size - 1
            assert last == isLast, \
                f"write 0x{addr:x}, size {size:d}, expected last:{isLast:d} in word {i:d}"

            if isLast:
                expectedStrb = lastWordBitmask
            else:
                expectedStrb = self.allMask

            if expectedStrb != self.allMask:
                raise NotImplementedError(
                    f"write with a partial last word is not implemented (0x{addr:x})")

            if self.w_use_strb:
                assert strb == expectedStrb

            self.data[baseIndex + i] = data

        self.doWriteAck(_id)