#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Tuple, List
from hwt.code import If
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import FifoWriter, FifoReader, VectSignal
from hwt.interfaces.utils import addClkRstn
from hwt.math import log2ceil
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.param import Param
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
from hwt.synthesizer.unit import Unit
from hwt.synthesizer.rtlLevel.constants import NOT_SPECIFIED
# https://eewiki.net/pages/viewpage.action?pageId=20939499
[docs]@serializeParamsUniq
class Fifo(Unit):
"""
Generic FIFO usually mapped to BRAM.
:ivar ~.EXPORT_SIZE: parameter, if true "size" signal will be exported
:ivar ~.size: optional signal with count of items stored in this fifo
:ivar ~.EXPORT_SPACE: parameter, if true "space" signal is exported
:ivar ~.space: optional signal with count of items which can be added to this fifo
.. hwt-autodoc:: _example_Fifo
"""
def _config(self):
self.DATA_WIDTH = Param(64)
self.DEPTH = Param(0)
self.EXPORT_SIZE = Param(False)
self.EXPORT_SPACE = Param(False)
self.INIT_DATA: tuple = Param(())
self.INIT_DATA_FIRST_WORD = Param(NOT_SPECIFIED)
[docs] def _declr_size_and_space(self):
if self.EXPORT_SIZE:
self.size = VectSignal(log2ceil(self.DEPTH + 1), signed=False)._m()
if self.EXPORT_SPACE:
self.space = VectSignal(log2ceil(self.DEPTH + 1), signed=False)._m()
def _declr(self):
assert self.DEPTH > 0, \
"Fifo is disabled in this case, do not use it entirely"
addClkRstn(self)
with self._paramsShared():
self.dataIn = FifoWriter()
self.dataOut = FifoReader()._m()
self._declr_size_and_space()
[docs] def fifo_pointers(self, DEPTH: int,
write_en_wait: Tuple[RtlSignal, RtlSignal],
read_en_wait_list: List[Tuple[RtlSignal, RtlSignal]])\
->List[Tuple[RtlSignal, RtlSignal]]:
"""
Create fifo writer and reader pointers and enable/wait logic
This functions supports multiple reader pointers
:attention: writer pointer next logic check only last reader pointer
:return: list, tule(en, ptr) for writer and each reader
"""
index_t = Bits(log2ceil(DEPTH), signed=False)
# assert isPow2(DEPTH), DEPTH
MAX_DEPTH = DEPTH - 1
s = self._sig
r = self._reg
fifo_write = s("fifo_write")
write_ptr = _write_ptr = r("write_ptr", index_t, min(len(self.INIT_DATA), MAX_DEPTH))
ack_ptr_list = [(fifo_write, write_ptr), ]
# update writer (head) pointer as needed
If(fifo_write,
If(write_ptr._eq(MAX_DEPTH),
write_ptr(0)
).Else(
write_ptr(write_ptr + 1)
)
)
write_en, _ = write_en_wait
# instantiate all read pointers
for i, (read_en, read_wait) in enumerate(read_en_wait_list):
read_ptr = r(f"read_ptr{i:d}", index_t, 0)
fifo_read = s(f"fifo_read{i:d}")
ack_ptr_list.append((fifo_read, read_ptr))
# update reader (tail) pointer as needed
If(fifo_read,
If(read_ptr._eq(MAX_DEPTH),
read_ptr(0)
).Else(
read_ptr(read_ptr + 1)
)
)
looped = r(f"looped{i:d}", def_val=False if len(self.INIT_DATA) <= MAX_DEPTH else True)
# looped logic
If(write_en & write_ptr._eq(MAX_DEPTH),
looped(True)
).Elif(read_en & read_ptr._eq(MAX_DEPTH),
looped(False)
)
# Update Empty and Full flags
read_wait(write_ptr._eq(read_ptr) & ~looped)
fifo_read(read_en & (looped | (write_ptr != read_ptr)))
# previous reader is next port writer (producer) as it next reader can continue only if previous reader did consume the item
write_en, _ = read_en, read_wait
write_ptr = read_ptr
write_en, write_wait = write_en_wait
write_ptr = _write_ptr
# Update Empty and Full flags
write_wait(write_ptr._eq(read_ptr) & looped)
fifo_write(write_en & (~looped | (write_ptr != read_ptr)))
return ack_ptr_list
def _impl(self):
DEPTH = self.DEPTH
dout = self.dataOut
din = self.dataIn
s = self._sig
r = self._reg
((fifo_write, wr_ptr), (fifo_read, rd_ptr),) = self.fifo_pointers(
DEPTH, (din.en, din.wait), [(dout.en, dout.wait), ])
init_data = self.INIT_DATA
if not init_data:
init_data_expanded = None
else:
init_data_expanded = list(init_data) + [None for _ in range(self.DEPTH - len(init_data))]
if self.DATA_WIDTH:
mem = self.mem = s("memory", Bits(self.DATA_WIDTH)[DEPTH], def_val=init_data_expanded)
If(self.clk._onRisingEdge(),
If(fifo_write,
# Write Data to Memory
mem[wr_ptr](din.data)
)
)
If(self.clk._onRisingEdge(),
If(fifo_read,
# Update data output
dout.data(mem[rd_ptr])
)
if self.INIT_DATA_FIRST_WORD == NOT_SPECIFIED else
If(self.rst_n._isOn(),
dout.data(self.INIT_DATA_FIRST_WORD),
).Elif(fifo_read,
# Update data output
dout.data(mem[rd_ptr])
)
)
if self.EXPORT_SIZE:
size = r("size_reg", self.size._dtype, len(self.INIT_DATA))
If(fifo_read,
If(~fifo_write,
size(size - 1)
)
).Else(
If(fifo_write,
size(size + 1)
)
)
self.size(size)
if self.EXPORT_SPACE:
space = r("space_reg", self.space._dtype, DEPTH - len(self.INIT_DATA))
If(fifo_read,
If(~fifo_write,
space(space + 1)
)
).Else(
If(fifo_write,
space(space - 1)
)
)
self.space(space)
[docs]def _example_Fifo():
u = Fifo()
u.DATA_WIDTH = 8
u.EXPORT_SIZE = True
u.EXPORT_SPACE = True
u.INIT_DATA = (1, 2, 3)
u.INIT_DATA_FIRST_WORD = 0
u.DEPTH = 16
return u
if __name__ == "__main__":
from hwt.synthesizer.utils import to_rtl_str
u = _example_Fifo()
print(to_rtl_str(u))