Source code for hwtLib.ipif.intf
from collections import deque
from hwt.bitmask import mask
from hwt.hdl.constants import READ, WRITE, NOP
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import s, D, VectSignal
from hwt.simulator.agentBase import SyncAgentBase
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
[docs]class Ipif(Interface):
READ = 1
WRITE = 0
[docs] def _config(self):
self.ADDR_WIDTH = Param(32)
self.DATA_WIDTH = Param(32)
[docs] def _declr(self):
# chip select
self.bus2ip_cs = s()
# A High level indicates the transfer request is a user IP read.
# A Low level indicates the transfer request is a user IP write.
self.bus2ip_rnw = s()
# read /write addr
self.bus2ip_addr = VectSignal(self.ADDR_WIDTH)
self.bus2ip_data = VectSignal(self.DATA_WIDTH)
# byte enable for bus2ip_data
self.bus2ip_be = VectSignal(self.DATA_WIDTH // 8)
self.ip2bus_data = VectSignal(self.DATA_WIDTH, masterDir=D.IN)
# write ack
self.ip2bus_wrack = s(masterDir=D.IN)
# read ack
self.ip2bus_rdack = s(masterDir=D.IN)
self.ip2bus_error = s(masterDir=D.IN)
[docs] def _getWordAddrStep(self):
"""
:return: size of one word in unit of address
"""
return int(self.DATA_WIDTH) // self._getAddrStep()
[docs] def _getAddrStep(self):
"""
:return: how many bits is one unit of address (f.e. 8 bits for char * pointer,
36 for 36 bit bram)
"""
return 8
[docs] def _initSimAgent(self):
self._ag = IpifAgent(self)
[docs]class IpifWithCE(Ipif):
[docs] def _config(self):
super(IpifWithCE, self)._config()
self.REG_COUNT = Param(1)
[docs] def _declr(self):
super()._declr()
ce_t = Bits(self.REG_COUNT)
# read chip enable bus
self.bus2ip_rdce = s(dtype=ce_t)
# Write chip enable bus
self.bus2ip_wrce = s(dtype=ce_t)
[docs]class IpifAgent(SyncAgentBase):
"""
:ivar requests: list of tuples (request type, address, [write data], [write mask]) - used for driver
:ivar data: list of data in memory, used for monitor
:ivar mem: if agent is in monitor mode (= is slave) all reads and writes are performed on
mem object
:ivar actual: actual request which is performed
"""
[docs] def __init__(self, intf, allowNoReset=True):
super().__init__(intf, allowNoReset=allowNoReset)
self.requests = deque()
self.actual = NOP
self.readed = deque()
self.wmaskAll = mask(intf.bus2ip_data._dtype.bit_length()//8)
self.mem = {}
self.requireInit = True
[docs] def doReq(self, sim, req):
rw = req[0]
addr = req[1]
if rw == READ:
rw = Ipif.READ
wdata = None
wmask = None
elif rw == WRITE:
wdata = req[2]
rw = Ipif.WRITE
wmask = req[3]
else:
raise NotImplementedError(rw)
intf = self.intf
w = sim.write
w(1, intf.bus2ip_cs)
w(rw, intf.bus2ip_rnw)
w(addr, intf.bus2ip_addr)
w(wdata, intf.bus2ip_data)
w(wmask, intf.bus2ip_be)
[docs] def monitor(self, sim):
raise NotImplementedError()
[docs] def driver(self, sim):
intf = self.intf
actual = self.actual
actual_next = actual
w = sim.write
r = sim.read
if self.requireInit:
w(0, intf.bus2ip_cs)
self.requireInit = False
yield sim.waitOnCombUpdate()
yield sim.waitOnCombUpdate()
# now we are after clk edge
if actual is not NOP:
if actual[0] is READ:
rack = r(intf.ip2bus_rdack)
assert rack.vldMask == 1
if rack.val:
d = r(intf.ip2bus_data)
if self._debugOutput is not None:
self._debugOutput.write("%s, on %r read_data: %d\n" % (
self.intf._getFullName(),
sim.now, d.val))
self.readed.append(d)
actual_next = NOP
else:
# write in progress
wack = r(intf.ip2bus_wrack)
assert wack.vldMask == 1
if wack.val:
if self._debugOutput is not None:
self._debugOutput.write("%s, on %r write_ack\n" % (
self.intf._getFullName(),
sim.now))
actual_next = NOP
en = self.notReset(sim)
if en:
if self.actual is NOP:
if self.requests:
req = self.requests.popleft()
if req is not NOP:
self.doReq(sim, req)
self.actual = req
return
else:
self.actual = actual_next
return
w(0, intf.bus2ip_cs)