Source code for hwtLib.peripheral.spi.intf
from collections import deque
from hwt.hdl.constants import DIRECTION
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import Clk, Signal, VectSignal
from hwt.interfaces.tristate import TristateSig
from hwt.simulator.agentBase import SyncAgentBase
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
from pyMathBitPrecise.bit_utils import mask, get_bit
from hwtSimApi.agents.base import AgentBase
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.process_utils import OnRisingCallbackLoop, OnFallingCallbackLoop
from hwtSimApi.triggers import WaitCombRead, WaitWriteOnly, Timer
class SpiAgent(SyncAgentBase):
    """
    Simulation agent for SPI interface

    The agent works on words of :attr:`BITS_IN_WORD` bits which are
    (de)serialized one bit per clock edge, most significant bit first.
    Chip select value with all bits set (``csMask``) is treated
    as "no slave selected".

    :ivar ~.txData: deque of words to transmit (consumed from the left)
    :ivar ~.rxData: deque of received words (value objects built by
        :meth:`mergeBits`)
    :ivar ~.chipSelects: deque of chip select values; in monitor mode a value
        is appended at the beginning of each received word, in driver mode
        a value is popped at the beginning of each transmitted word
    :ivar ~.slaveEn: flag set by :meth:`driverTx` while a chip select value
        is driven, :meth:`driverRx` samples miso only if it is set
    """
    BITS_IN_WORD = 8

    def __init__(self, sim: HdlSimulator, intf: "Spi", allowNoReset=False):
        # NOTE(review): AgentBase.__init__ is called directly instead of
        # SyncAgentBase.__init__, presumably because this agent builds its
        # own clock-edge callback loops below - confirm
        AgentBase.__init__(self, sim, intf)

        self.txData = deque()
        self.rxData = deque()
        self.chipSelects = deque()

        # bits of the word which is currently being sent/received
        self._txBitBuff = deque()
        self._rxBitBuff = deque()
        # all chip select bits set -> no slave selected
        self.csMask = mask(intf.cs._dtype.bit_length())
        self.slaveEn = False

        # resolve clk and rstn
        self.clk = self.intf._getAssociatedClk()._sigInside
        self.rst, self.rstOffIn = self._discoverReset(intf, allowNoReset=allowNoReset)

        # The methods are replaced on the instance by clock-edge callback
        # loops which wrap them:
        #   monitor: reads mosi on rising edge, writes miso on falling edge
        #   driver:  reads miso on falling edge, writes mosi/cs on rising edge
        self.monitorRx = OnRisingCallbackLoop(self.sim, self.clk,
                                              self.monitorRx,
                                              self.getEnable)
        self.monitorTx = OnFallingCallbackLoop(self.sim, self.clk,
                                               self.monitorTx,
                                               self.getEnable)
        self.driverRx = OnFallingCallbackLoop(self.sim, self.clk,
                                              self.driverRx,
                                              self.getEnable)
        self.driverTx = OnRisingCallbackLoop(self.sim, self.clk,
                                             self.driverTx,
                                             self.getEnable)

    def setEnable(self, en):
        """
        Store the enable flag and propagate it to all four callback loops

        :param en: new value of the enable flag
        """
        self._enabled = en
        self.monitorRx.setEnable(en)
        self.monitorTx.setEnable(en)
        self.driverRx.setEnable(en)
        self.driverTx.setEnable(en)

    def splitBits(self, v):
        """
        Split word to bits

        :param v: word to split
        :return: deque of BITS_IN_WORD bits of v, most significant bit first
        """
        return deque([get_bit(v, i)
                      for i in range(self.BITS_IN_WORD - 1, -1, -1)])

    def mergeBits(self, bits):
        """
        Merge sampled bits back to a word

        :param bits: iterable of bit values (objects with ``val``
            and ``vld_mask`` attributes), first item becomes the most
            significant bit
        :return: value of type ``Bits(BITS_IN_WORD, False)`` with the value
            and the validity mask concatenated from the bits
        """
        t = Bits(self.BITS_IN_WORD, False)
        val = 0
        vld_mask = 0

        for v in bits:
            val <<= 1
            val |= v.val
            vld_mask <<= 1
            vld_mask |= v.vld_mask

        return t.getValueCls()(t, val, vld_mask)

    def readRxSig(self, sig):
        """
        Sample one bit from the signal and if the word is complete
        store it in rxData

        :param sig: signal to read the bit from
        """
        d = sig.read()
        bits = self._rxBitBuff
        bits.append(d)
        if len(bits) == self.BITS_IN_WORD:
            self.rxData.append(self.mergeBits(bits))
            # start a new word, keep the buffer type same as in __init__
            # (originally the deque was replaced with a list here)
            self._rxBitBuff = deque()

    def writeTxSig(self, sig):
        """
        Write next bit of the currently transmitted word to the signal

        If the bit buffer is empty a next word is loaded from txData,
        if there is no word available nothing is written.

        :param sig: signal to write the bit to
        """
        bits = self._txBitBuff
        if not bits:
            if not self.txData:
                return
            d = self.txData.popleft()
            bits = self._txBitBuff = self.splitBits(d)

        sig.write(bits.popleft())

    def monitorRx(self):
        """
        Monitor callback: if not in reset and any slave is selected
        sample one bit from mosi

        The chip select value is stored in chipSelects at the beginning
        of each word (when the rx bit buffer is empty).
        """
        yield WaitCombRead()
        if self.notReset():
            cs = self.intf.cs.read()
            cs = int(cs)
            if cs != self.csMask:  # if any slave is enabled
                if not self._rxBitBuff:
                    self.chipSelects.append(cs)
                self.readRxSig(self.intf.mosi)

    # def monitorTx_pre_set(self):
    #     yield WaitWriteOnly()
    #     self.writeTxSig(self.intf.miso)

    def monitorTx(self):
        """
        Monitor callback: if not in reset and any slave is selected
        write next bit to miso

        The write is performed after ``Timer(1)`` and in the write-only
        phase of the simulator.
        """
        yield WaitCombRead()
        if self.notReset():
            cs = self.intf.cs.read()
            cs = int(cs)
            if cs != self.csMask:
                yield Timer(1)
                yield WaitWriteOnly()
                self.writeTxSig(self.intf.miso)

    def driverRx(self):
        """
        Driver callback: if not in reset and a slave is selected by this
        driver (slaveEn) sample one bit from miso
        """
        yield WaitCombRead()
        if self.notReset() and self.slaveEn:
            self.readRxSig(self.intf.miso)

    def driverTx(self):
        """
        Driver callback: drive chip select and next bit of mosi

        At the beginning of each word (tx bit buffer is empty) a next
        chip select value is taken from chipSelects. If there is none,
        all slaves are deselected (csMask is written to cs) and no data
        bit is written.
        """
        yield WaitCombRead()
        if self.notReset():
            if not self._txBitBuff:
                try:
                    cs = self.chipSelects.popleft()
                except IndexError:
                    # nothing more to send, deselect all slaves
                    self.slaveEn = False
                    yield WaitWriteOnly()
                    self.intf.cs.write(self.csMask)
                    return

                self.slaveEn = True
                yield WaitWriteOnly()
                self.intf.cs.write(cs)

            yield WaitWriteOnly()
            self.writeTxSig(self.intf.mosi)

    def getDrivers(self):
        """
        :return: generator of driver processes (the callback loops
            created in constructor)
        """
        yield self.driverRx()
        yield self.driverTx()

    def getMonitors(self):
        """
        :return: generator of monitor processes (the callback loops
            created in constructor)
        """
        yield self.monitorRx()
        # yield self.monitorTx_pre_set()
        yield self.monitorTx()
# http://www.corelis.com/education/SPI_Tutorial.htm
class Spi(Interface):
    """
    Bare SPI interface (Serial peripheral interface)

    Parameters declared in :meth:`_config`:

    * ``SLAVE_CNT`` - width of the chip select vector ``cs``,
      ``None`` means that ``cs`` is not declared at all
    * ``HAS_MISO`` / ``HAS_MOSI`` - select which data signals are declared,
      at least one of them has to be set
    * ``FREQ`` - value assigned to ``FREQ`` of the ``clk``

    .. hwt-autodoc::
    """

    def _config(self):
        # defaults: single slave, both data directions, default clock frequency
        self.SLAVE_CNT = Param(1)
        self.HAS_MISO = Param(True)
        self.HAS_MOSI = Param(True)
        self.FREQ = Param(Clk.DEFAULT_FREQ)

    def _declr(self):
        self.clk = Clk()
        self.clk.FREQ = self.FREQ

        # interface without any data signal is not allowed
        assert self.HAS_MISO or self.HAS_MOSI

        if self.HAS_MOSI:
            # master out slave in
            self.mosi = Signal()

        if self.HAS_MISO:
            # master in slave out
            self.miso = Signal(masterDir=DIRECTION.IN)

        if self.SLAVE_CNT is not None:
            # chip select
            self.cs = VectSignal(self.SLAVE_CNT)

        self._associatedClk = self.clk

    def _initSimAgent(self, sim: HdlSimulator):
        # NOTE(review): SpiAgent reads intf.cs in its constructor and
        # uses intf.mosi/intf.miso in its callbacks - verify the behavior for
        # configurations where some of these signals are not declared
        self._ag = SpiAgent(sim, self)
class SpiTristate(Spi):
    """
    SPI interface where mosi and miso signal are merged into one tri-state wire

    In addition to parameters of :class:`Spi` there is ``DATA_WIDTH``
    which is shared with the tri-state ``io`` signal.
    The ``cs`` signal is always declared (``SLAVE_CNT`` is used directly
    as its width), ``HAS_MISO`` and ``HAS_MOSI`` are not used by this class.

    .. hwt-autodoc::
    """

    def _config(self):
        Spi._config(self)
        self.DATA_WIDTH = Param(1)

    def _declr(self):
        self.clk = Clk()
        # propagate the FREQ parameter to the clock same as Spi._declr does,
        # without this a non-default FREQ was silently ignored
        self.clk.FREQ = self.FREQ

        with self._paramsShared():
            # mosi and miso in one wire
            self.io = TristateSig()

        # chip select
        self.cs = VectSignal(self.SLAVE_CNT)

        self._associatedClk = self.clk

    # NOTE(review): _initSimAgent inherited from Spi creates SpiAgent which
    # accesses intf.mosi/intf.miso, these signals are not declared
    # on this interface - confirm if the simulation agent is usable here
class QSPI(SpiTristate):
    """
    SPI interface with 4 tristate data wires

    The only difference from :class:`SpiTristate` is the ``DATA_WIDTH``
    parameter which is 4.

    .. hwt-autodoc::
    """

    def _config(self):
        # Spi._config is called directly (SpiTristate._config is skipped),
        # so DATA_WIDTH is declared only once, here
        Spi._config(self)
        self.DATA_WIDTH = Param(4)