Source code for hwtLib.peripheral.spi.intf

from collections import deque

from hwt.hdl.constants import DIRECTION
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import Clk, Signal, VectSignal
from hwt.interfaces.tristate import TristateSig
from hwt.simulator.agentBase import SyncAgentBase
from hwt.synthesizer.interface import Interface
from hwt.synthesizer.param import Param
from pyMathBitPrecise.bit_utils import mask, get_bit
from hwtSimApi.agents.base import AgentBase
from hwtSimApi.hdlSimulator import HdlSimulator
from hwtSimApi.process_utils import OnRisingCallbackLoop, OnFallingCallbackLoop
from hwtSimApi.triggers import WaitCombRead, WaitWriteOnly, Timer


class SpiAgent(SyncAgentBase):
    """
    Simulation agent for SPI interface

    :ivar ~.txData: queue of words to transmit; each word is serialized
        MSB first by :meth:`splitBits`
    :ivar ~.rxData: queue of received words; each item is the value object
        assembled by :meth:`mergeBits`
    :ivar ~.chipSelects: queue of chip select values; consumed one per word
        in driver mode, recorded one per word in monitor mode
    :ivar ~.slaveEn: driver-mode flag, True while a chip select value popped
        from ``chipSelects`` is being driven
    :cvar BITS_IN_WORD: number of bits shifted per item of txData/rxData

    txData, rxData and chipSelects are deques.
    """
    BITS_IN_WORD = 8

    def __init__(self, sim: HdlSimulator, intf: "Spi", allowNoReset=False):
        """
        :param sim: simulator this agent is registered in
        :param intf: SPI interface instance driven/monitored by this agent
        :param allowNoReset: forwarded to ``_discoverReset``
        """
        # NOTE: AgentBase.__init__ is called directly (not
        # SyncAgentBase.__init__); the clock, reset and callback loops are
        # set up explicitly below.
        AgentBase.__init__(self, sim, intf)

        self.txData = deque()
        self.rxData = deque()
        self.chipSelects = deque()

        # per-bit shift buffers for the word currently in flight
        self._txBitBuff = deque()
        self._rxBitBuff = deque()

        # all chip select bits set == no slave selected
        self.csMask = mask(intf.cs._dtype.bit_length())
        self.slaveEn = False

        # resolve clk and rstn
        self.clk = self.intf._getAssociatedClk()._sigInside
        self.rst, self.rstOffIn = self._discoverReset(
            intf, allowNoReset=allowNoReset)

        # The bound methods are replaced by clock-edge callback loops which
        # wrap them, so calling self.monitorRx() etc. later yields the loop
        # process.
        # monitor: read on rising edge, write on falling edge
        self.monitorRx = OnRisingCallbackLoop(
            self.sim, self.clk, self.monitorRx, self.getEnable)
        self.monitorTx = OnFallingCallbackLoop(
            self.sim, self.clk, self.monitorTx, self.getEnable)
        # driver: read on falling edge, write on rising edge
        self.driverRx = OnFallingCallbackLoop(
            self.sim, self.clk, self.driverRx, self.getEnable)
        self.driverTx = OnRisingCallbackLoop(
            self.sim, self.clk, self.driverTx, self.getEnable)

    def setEnable(self, en):
        """
        Store the enable flag and forward it to all four callback loops.

        :param en: new enable value
        """
        self._enabled = en
        self.monitorRx.setEnable(en)
        self.monitorTx.setEnable(en)
        self.driverRx.setEnable(en)
        self.driverTx.setEnable(en)

    def splitBits(self, v):
        """
        Split a word to a deque of its bits, most significant bit first.

        :param v: word to split
        :return: deque of ``BITS_IN_WORD`` bits
        """
        return deque([get_bit(v, i)
                      for i in range(self.BITS_IN_WORD - 1, -1, -1)])

    def mergeBits(self, bits):
        """
        Merge an iterable of single bit values (first item is the most
        significant bit) into a single ``Bits(BITS_IN_WORD, False)`` value.

        :param bits: iterable of values with ``val`` and ``vld_mask``
        :return: value built from the concatenated ``val`` and ``vld_mask``
        """
        t = Bits(self.BITS_IN_WORD, False)
        val = 0
        vld_mask = 0
        for v in bits:
            val <<= 1
            val |= v.val
            vld_mask <<= 1
            vld_mask |= v.vld_mask

        return t.getValueCls()(t, val, vld_mask)

    def readRxSig(self, sig):
        """
        Sample one bit from ``sig``; once ``BITS_IN_WORD`` bits are collected
        append the merged word to ``rxData`` and start a new bit buffer.

        :param sig: signal to read the bit from
        """
        d = sig.read()
        bits = self._rxBitBuff
        bits.append(d)
        if len(bits) == self.BITS_IN_WORD:
            self.rxData.append(self.mergeBits(bits))
            # fresh deque (not a list) so the buffer keeps the same type
            # as the one created in __init__
            self._rxBitBuff = deque()

    def writeTxSig(self, sig):
        """
        Write the next bit of the current tx word to ``sig``. When the bit
        buffer is empty the next word is taken from ``txData``; if there is
        no word available nothing is written.

        :param sig: signal to write the bit to
        """
        bits = self._txBitBuff
        if not bits:
            if not self.txData:
                return
            d = self.txData.popleft()
            bits = self._txBitBuff = self.splitBits(d)

        sig.write(bits.popleft())

    def monitorRx(self):
        """
        Monitor-mode receive process: if not in reset and cs differs from
        ``csMask``, read one bit from mosi. The cs value is stored to
        ``chipSelects`` at the first bit of each word.
        """
        yield WaitCombRead()
        if self.notReset():
            cs = self.intf.cs.read()
            cs = int(cs)
            if cs != self.csMask:  # if any slave is enabled
                if not self._rxBitBuff:
                    self.chipSelects.append(cs)
                self.readRxSig(self.intf.mosi)

    # def monitorTx_pre_set(self):
    #     yield WaitWriteOnly()
    #     self.writeTxSig(self.intf.miso)

    def monitorTx(self):
        """
        Monitor-mode transmit process: if not in reset and cs differs from
        ``csMask``, wait ``Timer(1)`` and then write the next tx bit to miso.
        """
        yield WaitCombRead()
        if self.notReset():
            cs = self.intf.cs.read()
            cs = int(cs)
            if cs != self.csMask:
                yield Timer(1)
                yield WaitWriteOnly()
                self.writeTxSig(self.intf.miso)

    def driverRx(self):
        """
        Driver-mode receive process: read one bit from miso if not in reset
        and a slave is currently selected (``slaveEn``).
        """
        yield WaitCombRead()
        if self.notReset() and self.slaveEn:
            self.readRxSig(self.intf.miso)

    def driverTx(self):
        """
        Driver-mode transmit process. At a word boundary (empty tx bit
        buffer) the next chip select value is popped and driven; if there is
        none, cs is set to ``csMask``, ``slaveEn`` is cleared and nothing is
        sent. Otherwise the next tx bit is written to mosi.
        """
        yield WaitCombRead()
        if self.notReset():
            if not self._txBitBuff:
                try:
                    cs = self.chipSelects.popleft()
                except IndexError:
                    # nothing more to send, deselect all slaves
                    self.slaveEn = False
                    yield WaitWriteOnly()
                    self.intf.cs.write(self.csMask)
                    return

                self.slaveEn = True
                yield WaitWriteOnly()
                self.intf.cs.write(cs)

            yield WaitWriteOnly()
            self.writeTxSig(self.intf.mosi)

    def getDrivers(self):
        """
        :return: generator of the driver-mode processes (rx, tx)
        """
        yield self.driverRx()
        yield self.driverTx()

    def getMonitors(self):
        """
        :return: generator of the monitor-mode processes (rx, tx)
        """
        yield self.monitorRx()
        # yield self.monitorTx_pre_set()
        yield self.monitorTx()
# http://www.corelis.com/education/SPI_Tutorial.htm
class Spi(Interface):
    """
    Bare SPI interface (Serial peripheral interface)

    Declares ``clk``, optional ``mosi`` / ``miso`` data signals and
    an optional ``cs`` chip select vector.

    .. hwt-autodoc::
    """

    def _config(self):
        """
        Declare the parameters of the interface.
        """
        # width of the chip select vector; None disables the cs signal
        self.SLAVE_CNT = Param(1)
        # presence of the individual data signals
        self.HAS_MISO = Param(True)
        self.HAS_MOSI = Param(True)
        # value assigned to FREQ of the clk sub-interface
        self.FREQ = Param(Clk.DEFAULT_FREQ)

    def _declr(self):
        """
        Declare the signals of the interface according to the parameters.
        """
        self.clk = Clk()
        self.clk.FREQ = self.FREQ

        # at least one data direction has to be present
        assert self.HAS_MOSI or self.HAS_MISO

        if self.HAS_MOSI:
            # master out slave in
            self.mosi = Signal()

        if self.HAS_MISO:
            # master in slave out
            self.miso = Signal(masterDir=DIRECTION.IN)

        slave_cnt = self.SLAVE_CNT
        if slave_cnt is not None:
            # chip select
            self.cs = VectSignal(slave_cnt)

        self._associatedClk = self.clk

    def _initSimAgent(self, sim: HdlSimulator):
        """
        Create the :class:`SpiAgent` for this interface and store it
        in ``_ag``.
        """
        self._ag = SpiAgent(sim, self)
class SpiTristate(Spi):
    """
    SPI interface where mosi and miso signal are merged
    into one tri-state wire

    .. hwt-autodoc::
    """

    def _config(self):
        """
        Parameters of :class:`Spi` extended with ``DATA_WIDTH``.
        """
        Spi._config(self)
        self.DATA_WIDTH = Param(1)

    def _declr(self):
        """
        Declare ``clk``, the tri-state ``io`` signal and ``cs``.
        """
        # NOTE(review): unlike Spi._declr, FREQ is not assigned to the clk
        # here - confirm whether this is intended
        self.clk = Clk()

        with self._paramsShared():
            # mosi and miso in one wire
            self.io = TristateSig()

        # chip select
        self.cs = VectSignal(self.SLAVE_CNT)

        self._associatedClk = self.clk
class QSPI(SpiTristate):
    """
    SPI interface with 4 tristate data wires

    .. hwt-autodoc::
    """

    def _config(self):
        """
        Parameters of :class:`Spi` with ``DATA_WIDTH`` set to 4.
        """
        # Spi._config is called directly, SpiTristate._config is not used,
        # DATA_WIDTH is declared only once here
        Spi._config(self)
        self.DATA_WIDTH = Param(4)