#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If, Concat, FsmBuilder, In
from hwt.hdl.constants import DIRECTION
from hwt.hdl.types.bits import Bits
from hwt.hdl.types.enum import HEnum
from hwt.interfaces.agents.rdSynced import RdSyncedAgent
from hwt.interfaces.std import Signal, RdSynced, VectSignal
from hwt.interfaces.utils import addClkRstn
from hwt.math import log2ceil
from hwt.synthesizer.param import Param
from hwt.synthesizer.unit import Unit
from hwtLib.clocking.clkBuilder import ClkBuilder
from hwtLib.peripheral.i2c.intf import I2c
from hwtSimApi.hdlSimulator import HdlSimulator
# Command codes compared against I2cBitCntrlCmd.cmd by the bit controller FSM.
NOP = 0
START = 1
STOP = 2
READ = 3
WRITE = 4
def hasRisen(last, actual):
    """
    Build the rising-edge expression for a sampled signal.

    :param last: previous sample of the signal
    :param actual: current sample of the signal
    :return: ``~last & actual`` (non-zero only when the previous sample was 0
        and the current one is 1 for single-bit operands)
    """
    return ~last & actual
def hasFallen(last, actual):
    """
    Build the falling-edge expression for a sampled signal.

    :param last: previous sample of the signal
    :param actual: current sample of the signal
    :return: ``last & ~actual`` (non-zero only when the previous sample was 1
        and the current one is 0 for single-bit operands)
    """
    return last & ~actual
class I2cBitCntrlCmd(RdSynced):
    """
    Command interface of the I2C bit controller.

    Carries a command code (``cmd``) together with one data bit (``din``);
    ``rd`` is declared with the opposite direction (``DIRECTION.IN``).

    .. hwt-autodoc::
    """

    def _config(self):
        # intentionally empty: this interface declares no parameters of its own
        pass

    def _declr(self):
        # data bit that accompanies the command
        self.din = Signal()
        # command code, wide enough to encode 5 distinct values
        self.cmd = VectSignal(log2ceil(5))
        # signal travelling against the direction of din/cmd
        self.rd = Signal(masterDir=DIRECTION.IN)

    def _initSimAgent(self, sim: HdlSimulator):
        """
        Create the simulation agent for this interface and store it in ``_ag``.

        :param sim: simulator instance handed over to the agent
        """
        self._ag = I2cBitCntrlCmdAgent(sim, self)
class I2cBitCntrlCmdAgent(RdSyncedAgent):
    """Simulation agent which moves ``(cmd, din)`` tuples over the interface."""

    def get_data(self):
        """
        Extract data from the interface.

        :return: tuple ``(cmd, din)`` as read from the interface signals
        """
        intf = self.intf
        return (intf.cmd.read(), intf.din.read())

    def set_data(self, data):
        """
        Write data to the interface.

        :param data: tuple ``(cmd, din)``; ``None`` writes ``None`` to both
            signals
        """
        cmd, d = (None, None) if data is None else data

        intf = self.intf
        intf.din.write(d)
        intf.cmd.write(cmd)
class I2cMasterBitCtrl(Unit):
    # raw docstring: the waveforms below contain backslashes
    r"""
    Translate simple commands into SCL/SDA transitions
    Each command has 5 states, 0/1/2/3/idle

    .. code-block:: text

        start:    SCL  ~~~~~~~~~~~~~~\____
                  SDA  XX/~~~~~~~\______
                       x | 0 | 1 | 2 | 3 | i

        repstart  SCL  ______/~~~~~~~\___
                  SDA  __/~~~~~~~\______
                       x | 0 | 1 | 2 | 3 | i

        stop      SCL  _______/~~~~~~~~~~~
                  SDA  ==\___________/~~~~~
                       x | 0 | 1 | 2 | 3 | i

        write     SCL  ______/~~~~~~~\____
                  SDA  XXX===============XX
                       x | 0 | 1 | 2 | 3 | i

        read      SCL  ______/~~~~~~~\____
                  SDA  XXXXXXX=XXXXXXXXXXX
                       x | 0 | 1 | 2 | 3 | i

    ============ ============== =============================================================
    Timing:      Normal mode    Fast mode
    ============ ============== =============================================================
    Fscl         100KHz         400KHz
    Th_scl       4.0us          0.6us   High period of SCL
    Tl_scl       4.7us          1.3us   Low period of SCL
    Tsu:sta      4.7us          0.6us   setup time for a repeated start condition
    Tsu:sto      4.0us          0.6us   setup time for a stop condition
    Tbuf         4.7us          1.3us   Bus free time between a stop and start condition
    ============ ============== =============================================================

    .. hwt-autodoc::
    """

    def _config(self):
        # bit width of the clock-divider counters (clkCntr, filter_clk_cntr)
        self.CLK_CNTR_WIDTH = Param(16)

    def _declr(self):
        addClkRstn(self)
        # reload value of the divider counters; its width follows
        # CLK_CNTR_WIDTH so that it matches the counters it is assigned to
        self.clk_cnt_initVal = VectSignal(self.CLK_CNTR_WIDTH)

        self.i2c = I2c()._m()

        self.cntrl = I2cBitCntrlCmd()
        self.arbitrationLost = Signal()._m()  # arbitration lost
        self.dout = Signal()._m()

    def stateClkGen(self, scl_sync, scl_t, scl):
        """
        Build the clock divider which produces the state clock enable.

        :param scl_sync: when set the divider counter is reloaded
        :param scl_t: tristate control register of SCL
        :param scl: filtered SCL input
        :return: register ``stateClkEn``; it is loaded with 1 when the counter
            is reloaded (counter is 0 or ``scl_sync``) and with 0 otherwise
        """
        # whenever the slave is not ready it can delay the cycle by pulling SCL low
        # delay scl_oen
        delayedScl_t = self._reg("delayedScl_t", def_val=1)
        delayedScl_t(scl_t)

        # slave_wait is asserted when master wants to drive SCL high,
        # but the slave pulls it low
        # slave_wait remains asserted until the slave releases SCL
        slave_wait = self._reg("slave_wait", def_val=0)
        slave_wait((~scl_t & delayedScl_t & ~scl) | (slave_wait & ~scl))

        clkCntr = self._reg("clkCntr",
                            Bits(self.CLK_CNTR_WIDTH, False),
                            def_val=self.clk_cnt_initVal)
        stateClkEn = self._reg("stateClkEn", def_val=1)

        If(clkCntr._eq(0) | scl_sync,
            clkCntr(self.clk_cnt_initVal),
            stateClkEn(1)
        ).Elif(slave_wait,
            # counter is held (not decremented) while the slave stretches SCL
            stateClkEn(0)
        ).Else(
            clkCntr(clkCntr - 1),
            stateClkEn(0)
        )

        return stateClkEn

    def filter(self, name, sig):
        """
        Attempt to remove glitches.

        :param name: prefix used for the name of the per-signal filter register
        :param sig: input signal to filter
        :return: expression which is 1 when at least two of the three bits
            of the filter shift register are 1
        """
        # presumably a 2 register deep path - confirm against ClkBuilder.reg_path
        filter0 = ClkBuilder(self, self.clk, "filter").reg_path(
            sig, 2, "filter", def_val=0)

        # let filter_cnt to be shared between filters
        try:
            filter_clk_cntr = self.filter_clk_cntr
        except AttributeError:
            filter_clk_cntr = self.filter_clk_cntr = self._reg(
                "filter_clk_cntr",
                Bits(self.CLK_CNTR_WIDTH),
                def_val=self.clk_cnt_initVal)
            # the counter is driven only here, where it is created, so that
            # repeated filter() calls do not add further drivers to it
            If(filter_clk_cntr._eq(0),
                filter_clk_cntr(self.clk_cnt_initVal)
            ).Else(
                filter_clk_cntr(filter_clk_cntr - 1)
            )

        filter1 = self._reg(name + "_filter1", dtype=Bits(3), def_val=0b111)
        # take a new sample each time the shared counter reaches 0
        If(filter_clk_cntr._eq(0),
            filter1(Concat(filter1[2:], filter0))
        )

        # 2-of-3 vote over the stored samples
        filtered = ((filter1[2] & filter1[1]) |
                    (filter1[2] & filter1[0]) |
                    (filter1[1] & filter1[0]))
        return filtered

    def detectStartAndStop(self, scl, sda, scl_t):
        """
        Detect start/stop conditions and SCL synchronization event.

        :attention: also dout driver

        :param scl: filtered SCL input
        :param sda: filtered SDA input
        :param scl_t: tristate control register of SCL
        :return: tuple ``(startCond, stopCond, scl_sync)``
        """
        lastScl = self._reg("lastScl", def_val=1)
        lastSda = self._reg("lastSda", def_val=1)

        # SDA edge while SCL is high
        startCond = hasFallen(lastSda, sda) & scl
        stopCond = hasRisen(lastSda, sda) & scl

        lastScl(scl)
        lastSda(sda)

        dout = self._reg("doutReg", def_val=0)
        # NOTE(review): dout is loaded with the SCL rising-edge flag itself,
        # sda is not sampled here - confirm that this is intended
        dout(hasRisen(lastScl, scl))
        self.dout(dout)

        # master drives SCL high, but another master pulls it low
        # master start counting down its low cycle now (clock synchronization)
        scl_sync = lastScl & ~scl & ~scl_t

        return startCond, stopCond, scl_sync

    def arbitrationLostDriver(self, st, sda, sda_chk, sda_t, stopCond, stateClkEn):
        """
        arbitration lost when:

        1) master drives SDA high, but the i2c bus is low
        2) stop detected while not requested (detect during 'idle' state)

        :param st: state register of the command FSM
        :param sda: filtered SDA input
        :param sda_chk: register which enables the SDA check
        :param sda_t: tristate control register of SDA
        :param stopCond: stop condition flag
        :param stateClkEn: enable under which the requested command is sampled
        :return: register ``al`` which also drives ``arbitrationLost``
        """
        al = self._reg("al", def_val=0)
        cmd_stop = self._reg("cmd_stop", def_val=0)

        # remember whether the command seen at the last state clock was STOP
        If(stateClkEn,
            cmd_stop(self.cntrl.cmd._eq(STOP))
        )

        _al = (sda_chk & ~sda & ~sda_t)
        If(st._eq(st._dtype.idle),
            al(_al | (stopCond & ~cmd_stop))
        ).Else(
            al(_al)
        )

        self.arbitrationLost(al)
        return al

    def _impl(self):
        """Build the command FSM and the drivers of the SCL/SDA tristate pins."""
        cmd = self.cntrl.cmd
        cmd_ack = self.cntrl.rd

        stT = HEnum("stT",
                    ["idle",
                     "start_0", "start_1", "start_2", "start_3", "start_4",
                     "stop_0", "stop_1", "stop_2", "stop_3",
                     "rd_0", "rd_1", "rd_2", "rd_3",
                     "wr_0", "wr_1", "wr_2", "wr_3"])
        fsm = FsmBuilder(self, stT)
        st = fsm.stateReg

        # check SDA status (multi-master arbitration)
        # NOTE(review): sda_chk is only ever assigned 0 in this method, so the
        # sda_chk term in arbitrationLostDriver looks like it can never be 1
        # - confirm
        sda_chk = self._reg("sda_chk", def_val=0)
        scl_t = self._reg("scl_t", def_val=0)
        sda_t = self._reg("sda_t", def_val=0)

        scl = self.filter("scl", self.i2c.scl.i)
        sda = self.filter("sda", self.i2c.sda.i)

        _, stopCond, scl_sync = self.detectStartAndStop(scl, sda, scl_t)
        stateClkEn = self.stateClkGen(scl_sync, scl_t, scl)
        al = self.arbitrationLostDriver(st, sda, sda_chk, sda_t, stopCond, stateClkEn)

        def stateSequence(sequenceName, stateCnt):
            """Chain states <name>_0 .. <name>_<stateCnt - 1> and return to idle."""
            for i in range(stateCnt):
                stateFrom = getattr(stT, f"{sequenceName}_{i:d}")
                if i == stateCnt - 1:
                    stateTo = stT.idle
                else:
                    _i = i + 1
                    stateTo = getattr(stT, f"{sequenceName}_{_i:d}")
                # arbitration lost aborts the sequence
                fsm.Trans(stateFrom,
                          (al, stT.idle),
                          stateTo
                          )

        # NOTE(review): none of the transitions is conditioned by stateClkEn
        # - confirm that the FSM is meant to advance without it
        fsm.Trans(stT.idle,
            (al, stT.idle),
            (cmd._eq(NOP), stT.idle),
            (cmd._eq(START), stT.start_0),
            (cmd._eq(STOP), stT.stop_0),
            (cmd._eq(WRITE), stT.wr_0),
            (cmd._eq(READ), stT.rd_0),
        )
        stateSequence("start", 5)
        stateSequence("stop", 4)
        stateSequence("rd", 4)
        stateSequence("wr", 4)

        If(al,
            cmd_ack(0),
            sda_t(0),
            scl_t(0),
            sda_chk(0),
        ).Else(
            If(In(st, [stT.start_1, stT.start_2, stT.start_3,
                       stT.stop_1, stT.stop_2, stT.stop_3,
                       stT.rd_1, stT.rd_2,
                       stT.wr_1, stT.wr_2]),
                scl_t(0)
            ).Elif(In(st, [stT.start_4, stT.stop_0,
                           stT.rd_0, stT.rd_3,
                           stT.wr_0, stT.wr_3]),
                scl_t(1)
            ),
            If(In(st, [stT.start_0, stT.start_1,
                       stT.stop_3,
                       stT.rd_0, stT.rd_1, stT.rd_2, stT.rd_3]),
                sda_t(0)
            ).Elif(In(st, [stT.start_2, stT.start_3, stT.start_4,
                           stT.stop_0, stT.stop_1, stT.stop_2]),
                sda_t(1)
            ).Elif(In(st, [stT.wr_0, stT.wr_1, stT.wr_2, stT.wr_3]),
                sda_t(~self.cntrl.din)
            ),
            # cmd ack at the end of state sequence
            cmd_ack(In(st, [stT.start_4, stT.stop_3, stT.rd_3, stT.wr_3]))
        )

        # output value is constant 0, the lines are controlled only
        # through the tristate control signals
        self.i2c.scl.o(0)
        self.i2c.sda.o(0)
        self.i2c.scl.t(scl_t)
        self.i2c.sda.t(sda_t)
if __name__ == "__main__":
    # when executed as a script print the RTL generated for the bit controller
    from hwt.synthesizer.utils import to_rtl_str

    u = I2cMasterBitCtrl()
    print(to_rtl_str(u))