#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from itertools import chain
from typing import Union, Optional
from hwt.code import If
from hwt.doc_markers import hwt_expr_producer, hwt_stm_producer
from hwt.hdl.types.bits import HBits
from hwt.hdl.types.bitsRtlSignal import HBitsRtlSignal
from hwt.hwModule import HwModule
from hwt.math import log2ceil, isPow2
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
# Description of one FIFO port as accepted by FifoPtrLogic.fifo_pointers:
# either (en, wait) or (en, wait, incrVal). When incrVal is omitted
# FifoPtrLogic._normalize_EnWaitIncrValTuple substitutes the constant 1.
EnWaitIncrValTuple = Union[
    tuple[HBitsRtlSignal, HBitsRtlSignal],  # en, wait
    tuple[HBitsRtlSignal, HBitsRtlSignal, Union[int, HBitsRtlSignal]],  # en, wait, incrVal
]
[docs]
class FifoPtrLogic():
"""
:note: r_ptr=index which is going to be read
:note: w_ptr=index which is going to be written
Variant of read_ptr/write_ptr implementations
=============================================
:note: in all variants r_ptr is initialized to 0, w_ptr to the number of items preloaded in the FIFO
:note: variants with ram.R_LATENCY=1 will latch mem[rd_ptr] if fifo_read
so the mem[rd_ptr] is not safe to be written in any variant unless the fifo is empty
.. code-block:: python
# ram.R_LATENCY=0
output = ram[read_ptr];
# ram.R_LATENCY=1
If(self.clk._onRisingEdge(),
If(fifo_read,
output(mem[rd_ptr])
)
)
Variant with r_wait = w_ptr == r_ptr
------------------------------------
* EFFECTIVE_DEPTH = DEPTH - 1
* timing issues: w_wait expressions contain add and saturation
.. code-block:: python
full = w_wait = (w_ptr+1) % DEPTH == r_ptr
empty = r_wait = w_ptr == r_ptr
# rw
# [ - - - - ]
#
# r w
# [ 0 1 2 - ]
#
# w r
# [ 2 - 0 1 ] # full=1
#
# wr
# [ 2 3 0 1 ] # not possible with this pointer logic
* ram.R_LATENCY=1: https://vlsiverify.com/verilog/verilog-codes/synchronous-fifo/
Variant with r_wait = count == 0
--------------------------------
* Use of explicit count register may be resource/timing inefficient
* EFFECTIVE_DEPTH = DEPTH
* https://www.asic-world.com/examples/verilog/syn_fifo.html
* https://www.digikey.com/en/maker/tutorials/2025/fifo-design-in-systemverilog
Variant with item lock flag
--------------------------------
Uses explicit flag to check if the w_ptr moved to r_ptr or r_ptr moved to w_ptr
* EFFECTIVE_DEPTH = DEPTH
.. code-block:: python
# last_was_r = 0 means current item is locked for write and 1 means for read
last_was_r = 1 # reset value for empty FIFO
# cycle step
if r_en & ~r_wait:
# lock current item for read
last_was_r = 1
elif w_en & ~w_wait:
# lock current item for write
last_was_r = 0
# :note: if w_ptr.next != r_ptr.next the value will be ignored
full = w_wait = (w_ptr._eq(r_ptr)) & ~last_was_r
empty = r_wait = (w_ptr._eq(r_ptr)) & last_was_r
* ram.R_LATENCY=0: https://medium.com/@aiclab.official/fifo-design-and-implementation-tutorial-in-rtl-systemverilog-f11d4c78e3e8
Variant with XORed item lock flag
--------------------------------------
The last_was_r flag is split into the per-pointer flags crossed0_r and crossed0_w.
crossed0_r/crossed0_w is flipped each time the corresponding pointer wraps around (overflows).
* EFFECTIVE_DEPTH = DEPTH
.. code-block:: python
r_is_before_w = crossed0_r == crossed0_w
w_is_before_r = crossed0_r != crossed0_w
full = w_wait = (w_ptr._eq(r_ptr)) & ~r_is_before_w
empty = r_wait = (w_ptr._eq(r_ptr)) & r_is_before_w
* ram.R_LATENCY=1: https://gist.github.com/C47D/e299230c65b82a87d7fc83579d78b168?permalink_comment_id=4216482#gistcomment-4216482
* only for pow2 depth
"""
[docs]
def __init__(self, parent: HwModule, DEPTH: int, RAM_SIZE: Optional[int]=None, INIT_SIZE:int=0):
    """
    Remember the FIFO geometry used by the pointer logic builders of this class.

    :param parent: module whose ``_sig``/``_reg`` factories are used by :meth:`fifo_pointers`
    :param DEPTH: number of pointer positions, must be greater than 1
    :param RAM_SIZE: stored as ``self.RAM_SIZE``, DEPTH is used if None is passed
    :param INIT_SIZE: stored as ``self.INIT_SIZE``, must be smaller than DEPTH;
        :meth:`fifo_pointers` uses ``INIT_SIZE % DEPTH`` as the reset value of write_ptr
    :note: NOTE(review): :meth:`fifo_pointers` contains handling for INIT_SIZE == DEPTH
        which the assert below makes unreachable - confirm which one is intended
    """
    self.RAM_SIZE = DEPTH if RAM_SIZE is None else RAM_SIZE
    self.parent = parent

    assert DEPTH > 1, DEPTH
    self.DEPTH = DEPTH

    assert INIT_SIZE < DEPTH, (INIT_SIZE, DEPTH)
    self.INIT_SIZE = INIT_SIZE

    # the highest value a pointer may hold
    self.PTR_MAX = DEPTH - 1
    # unsigned type of the pointer registers, log2ceil(DEPTH) bits wide
    self.index_t = HBits(log2ceil(DEPTH), signed=False)
[docs]
@hwt_expr_producer
def _usub_with_modulo(self, v: RtlSignal, decrVal: Union[int, RtlSignal]):
    """
    Build an expression for ``(v - decrVal) mod DEPTH``.

    :param v: pointer value, expected to be in range 0 .. DEPTH - 1
    :param decrVal: step, python int or signal; the generic path expects it
        to be at most DEPTH (assumption, not checked here)
    :return: expression of the same type as v

    Three implementations are selected at elaboration time:

    * DEPTH is a power of 2: plain subtraction, relies on the natural wrap
      of v (assumes v is log2(DEPTH) bits wide, as self.index_t is)
    * decrVal is the constant 1: the only underflow is from 0 so a single
      equality compare is enough
    * anything else: the subtraction is evaluated on 1 bit wider operands to
      detect the borrow and the wrapped result is corrected by the
      number of unused codes (2**w - DEPTH)
    """
    DEPTH = self.DEPTH
    t = v._dtype.from_py
    if isPow2(DEPTH):
        return v - decrVal
    elif isinstance(decrVal, int) and decrVal == 1:
        # 0 - 1 must land on the last valid index instead of 2**w - 1
        return (v._eq(0))._ternary(t(self.PTR_MAX), v - decrVal)
    else:
        w = v._dtype.bit_length()
        if isinstance(decrVal, int):
            # convert to a typed constant so the extension below is available
            decrVal = t(decrVal)
        # :note: shared by constant and signal decrVal
        full_sum = v._ext(w + 1) - decrVal._ext(w + 1)
        # msb of the wider result is the borrow of the subtraction
        overflow = full_sum.getMsb() | (full_sum >= DEPTH)
        # count of values representable on w bits which are not valid pointers
        intMax_min_depth = (1 << w) - DEPTH
        return overflow._ternary(v - decrVal - intMax_min_depth,
                                 v - decrVal)
[docs]
@hwt_expr_producer
def _uadd_with_modulo(self, v: RtlSignal, incrVal: Union[int, RtlSignal]):
    """
    Build an expression for ``(v + incrVal) mod DEPTH``.

    :param v: pointer value, expected to be in range 0 .. DEPTH - 1
    :param incrVal: step, python int or signal; the generic path expects it
        to be at most DEPTH (assumption, not checked here)
    :return: expression of the same type as v

    Three implementations are selected at elaboration time:

    * DEPTH is a power of 2: plain addition, relies on the natural wrap
      of v (assumes v is log2(DEPTH) bits wide, as self.index_t is)
    * incrVal is the constant 1: the only overflow is from PTR_MAX so a
      single equality compare is enough
    * anything else: the addition is evaluated on 1 bit wider operands to
      detect that the sum reached DEPTH and the result is corrected by the
      number of unused codes (2**w - DEPTH)

    :note: the compare-only variant is intentionally limited to step 1,
        for a larger constant step the pointer does not have to visit
        PTR_MAX and the sum would leave the 0 .. DEPTH - 1 range
    """
    DEPTH = self.DEPTH
    t = v._dtype.from_py
    if isPow2(DEPTH):
        return v + incrVal
    elif isinstance(incrVal, int) and incrVal == 1:
        return (v._eq(self.PTR_MAX))._ternary(t(0), v + incrVal)
    else:
        w = v._dtype.bit_length()
        if isinstance(incrVal, int):
            # convert to a typed constant so the extension below is available
            incrVal = t(incrVal)
        full_sum = v._ext(w + 1) + incrVal._ext(w + 1)
        # sum does not fit into w bits or it is not a valid index
        overflow = full_sum.getMsb() | (full_sum >= DEPTH)
        # count of values representable on w bits which are not valid pointers
        intMax_min_depth = (1 << w) - DEPTH
        return overflow._ternary(v + incrVal + intMax_min_depth,
                                 v + incrVal)
[docs]
@staticmethod
def _normalize_EnWaitIncrValTuple(en_wait_incrVal: EnWaitIncrValTuple) -> EnWaitIncrValTuple:
    """
    Convert a port description to its 3 item form.

    :param en_wait_incrVal: (en, wait) or (en, wait, incrVal)
    :return: (en, wait, 1) for a 2 item input, the input object itself
        for a 3 item input
    """
    if len(en_wait_incrVal) != 2:
        # anything else than the full form is an error of the caller
        assert len(en_wait_incrVal) == 3, en_wait_incrVal
        return en_wait_incrVal

    en, wait = en_wait_incrVal
    # implicit step of the pointer
    return (en, wait, 1)
[docs]
@hwt_expr_producer
def _is_fifo_ptr_add_possible(self,
                              ptr0: RtlSignal, ptr1: Union[RtlSignal, int],
                              allow_eq: RtlSignal,
                              ptr0IncrVal: Union[HBitsRtlSignal, int]
                              ) -> tuple[RtlSignal, RtlSignal]:
    """
    Build the conditions which describe a move of ptr0 by ptr0IncrVal.

    :param ptr0: pointer which is about to be incremented
    :param ptr1: pointer which ptr0 must not pass
    :param allow_eq: flag deciding if the move is possible while ptr0 == ptr1
    :param ptr0IncrVal: step of ptr0, python int or signal
    :return: tuple (addPossible, wouldCross0);
        addPossible is 1 if ptr0 can move by ptr0IncrVal,
        wouldCross0 is 1 if this move wraps ptr0 over the end of the address range
    :note: if ptr0IncrVal is not the constant 1 the ptr1 is passed to
        :meth:`_fifo_ptr_distance` which reads ``ptr1._dtype``
    """
    DEPTH = self.DEPTH
    incrIsInt = isinstance(ptr0IncrVal, int)

    if incrIsInt and DEPTH % ptr0IncrVal == 0:
        assert ptr0IncrVal <= DEPTH, (ptr0IncrVal, DEPTH)
        # the step divides DEPTH, the wrap happens only from a single position
        wouldCross0 = ptr0._eq(DEPTH - ptr0IncrVal)
    else:
        incremented = ptr0 + ptr0IncrVal
        # the sum wrapped on the width of the pointer
        wouldCross0 = incremented < ptr0
        if not isPow2(DEPTH):
            # or the sum fits but it is behind the last valid index
            wouldCross0 = wouldCross0 | (incremented >= DEPTH)

    # can not move if ptr0 + ptr0IncrVal would cross the value of ptr1
    if incrIsInt and ptr0IncrVal == 1:
        # there is only a single value which ptr0 can not have if ~allow_eq
        addPossible = allow_eq | (ptr0 != ptr1)
    else:
        # there is a range of values which ptr0 can not have,
        # ~allow_eq modifies this range
        dist = self._fifo_ptr_distance(ptr0, ptr1)
        if incrIsInt:
            ptr0IncrVal = ptr0._dtype.from_py(ptr0IncrVal)
        # dist is wider than the pointer, align the step for the comparison
        _ptr0IncrVal = ptr0IncrVal._zext(dist._dtype.bit_length())
        addPossible = (ptr0._eq(ptr1))._ternary(allow_eq,
                                                dist >= _ptr0IncrVal)

    return addPossible, wouldCross0
[docs]
@hwt_expr_producer
def _fifo_ptr_distance(self, ptr0: HBitsRtlSignal, ptr1: HBitsRtlSignal):
    """
    Build an expression for the number of steps from ptr0 forward to ptr1.

    :param ptr0: start pointer
    :param ptr1: end pointer
    :return: expression 1 bit wider than ptr1; ``ptr1 - ptr0`` if
        ptr1 >= ptr0 else ``ptr1 - ptr0 + DEPTH``
    """
    ext_width = ptr1._dtype.bit_length() + 1
    delta = ptr1._zext(ext_width) - ptr0._zext(ext_width)
    # ptr1 is numerically before ptr0 => the path from ptr0 to ptr1 goes over 0
    # :note: (delta + DEPTH)._trunc(w) would work only for pow2 DEPTH
    return (ptr1 < ptr0)._ternary(delta + self.DEPTH, delta)
[docs]
def _should_use_distributed_locks(self, write_en_wait_incrVal: EnWaitIncrValTuple,
                                  read_en_wait_incrVal_list: list[EnWaitIncrValTuple]) -> bool:
    """
    Decide which full/empty disambiguation :meth:`fifo_pointers` generates.

    :return: True if any port specifies an explicit incrVal which is not
        a python int or which does not divide DEPTH, False otherwise
        (ports in the 2 item form never request it)
    :note: :meth:`fifo_pointers` builds crossed0_* registers for True and
        last_was_r* registers for False
    """
    ports = chain((write_en_wait_incrVal,), read_en_wait_incrVal_list)
    explicit_steps = (port[2] for port in ports if len(port) == 3)
    return any(
        not isinstance(step, int) or self.DEPTH % step != 0
        for step in explicit_steps
    )
[docs]
def fifo_pointers(self, write_en_wait_incrVal: EnWaitIncrValTuple,
                  read_en_wait_incrVal_list: list[EnWaitIncrValTuple])\
                  ->list[tuple[RtlSignal, RtlSignal]]:
    """
    Create the FIFO writer and reader pointers and their enable/wait logic.
    This function supports multiple reader pointers.

    :note: Multiple read pointers are useful when the data in the FIFO passes
        through multiple states, this effectively means that instead of
        two FIFOs between some components we can use just 1 with multiple read pointers.
        For example the 1st read pointer may represent that the data is being processed (lock)
        and the second that the data processing was finished and the item in the FIFO is deallocated (commit).
    :note: *_en are inputs, *_wait are outputs
    :note: en=1 and wait=1 will result in nop and will not cause underflow/overflow
    :attention: the writer pointer next logic checks only the last reader pointer
    :param write_en_wait_incrVal: (en, wait) or (en, wait, incrVal) of the writer
    :param read_en_wait_incrVal_list: non-empty list of (en, wait) or (en, wait, incrVal),
        one item per reader; each reader is limited by the pointer of the previous
        reader, the first one by the write pointer
    :return: list of tuples (ack, ptr), first for the writer then for each reader,
        where ack is the "fifo_write"/"fifo_read<i>" signal driven by ``en & ~wait``
        and ptr is the pointer register
    """
    assert len(read_en_wait_incrVal_list) > 0
    # factories for signals and registers in the parent module
    s = self.parent._sig
    r = self.parent._reg
    index_t = self.index_t
    DEPTH = self.DEPTH

    fifo_write = s("fifo_write")
    write_ptr = r("write_ptr", index_t, self.INIT_SIZE % self.DEPTH)
    ack_ptr_list = [(fifo_write, write_ptr), ]
    # Write ptr logic part 1 out of 2
    en, wait, write_incr = self._normalize_EnWaitIncrValTuple(write_en_wait_incrVal)
    write_en_wait = en, wait
    # (en, wait, ptr) describe the producer of the reader which is currently constructed
    ptr = write_ptr
    # NOTE(review): __init__ asserts INIT_SIZE < DEPTH, so this is always False
    # for instances constructed through __init__ - confirm if INIT_SIZE == DEPTH should be allowed
    IS_INITIALIZED_FULL = self.INIT_SIZE == DEPTH
    USE_DISTRIBUTED_LOCK = self._should_use_distributed_locks(write_en_wait_incrVal, read_en_wait_incrVal_list)
    if USE_DISTRIBUTED_LOCK:
        # flag flipped each time the write pointer wraps
        crossed0_w = r(f"crossed0_w", def_val=IS_INITIALIZED_FULL)
        crossed0 = crossed0_w

    # instantiate all read pointers
    for i, (read_en, read_wait, read_incr) in enumerate([self._normalize_EnWaitIncrValTuple(v)
                                                         for v in read_en_wait_incrVal_list]):
        read_ptr = r(f"read_ptr{i:d}", index_t, 0)
        fifo_read = s(f"fifo_read{i:d}")
        ack_ptr_list.append((fifo_read, read_ptr))
        # update reader (tail) pointer as needed
        # check if state where w_ptr == r_ptr is full or empty
        if USE_DISTRIBUTED_LOCK:
            crossed0_r = r(f"crossed0_r{i:d}", def_val=i == 0 and IS_INITIALIZED_FULL)
            # equal pointers hold readable items only if the producer wrapped once more than this reader
            ptrsAllowedToEqual = crossed0 != crossed0_r
        else:
            last_was_r = r(f"last_was_r{i:d}", def_val=i != 0 or not IS_INITIALIZED_FULL)
            # equal pointers hold readable items only if this reader was not the last one which moved
            ptrsAllowedToEqual = ~last_was_r

        read_possible, wouldCross0 = self._is_fifo_ptr_add_possible(read_ptr, ptr, ptrsAllowedToEqual, read_incr)
        if USE_DISTRIBUTED_LOCK:
            If(read_en & read_possible & wouldCross0,
               crossed0_r(~crossed0_r)
            )
        else:
            If(read_en & read_possible,
               # write stalled and consuming item, potentially reaching
               # w_ptr if write pointer not advancing enough
               last_was_r(1)
            ).Elif(en & ~wait,
               # not reading and write advancing ==>
               last_was_r(0)
            )
        read_wait(~read_possible)
        fifo_read(read_en & read_possible)
        If(fifo_read,
           read_ptr(self._uadd_with_modulo(read_ptr, read_incr))
        )
        # previous reader is the producer for the next reader, the next reader can continue
        # only if the previous reader did consume the item
        # NOTE(review): only `en` is rebound here, `wait` still refers to the wait
        # of the writer, so `.Elif(en & ~wait, ...)` of the next reader combines the en of this
        # reader with the wait of the writer - confirm that this is intended
        en, _ = read_en, read_wait
        ptr = read_ptr
        if USE_DISTRIBUTED_LOCK:
            crossed0 = crossed0_r

    # Write ptr logic part 2 out of 2
    # :note: read_ptr, crossed0_r, last_was_r below are the objects of the last reader
    write_en, write_wait = write_en_wait
    if USE_DISTRIBUTED_LOCK:
        ptrsAllowedToEqual = crossed0_w._eq(crossed0_r)
    else:
        ptrsAllowedToEqual = last_was_r
    write_possible, w_willCross0 = self._is_fifo_ptr_add_possible(
        write_ptr, read_ptr, ptrsAllowedToEqual, write_incr)
    if USE_DISTRIBUTED_LOCK:
        If(write_en & write_possible & w_willCross0,
           crossed0_w(~crossed0_w)
        )
    If(fifo_write,
       write_ptr(self._uadd_with_modulo(write_ptr, write_incr))
    )
    # Update Empty and Full flags
    write_wait(~write_possible)
    fifo_write(write_en & write_possible)
    return ack_ptr_list