# Source code for hwtLib.mem.fifoCopy
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If
from hwt.code_utils import rename_signal
from hwt.hdl.types.bits import HBits
from hwt.hwIOs.std import HwIOFifoWriter, HwIOFifoReader, HwIODataVld
from hwt.hwIOs.utils import addClkRstn
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwt.serializer.mode import serializeParamsUniq
from hwtLib.mem.fifo import Fifo
# [docs]
@serializeParamsUniq
class FifoCopy(Fifo):
    """
    FIFO with an extra control channel used to replay the most recently
    stored data.

    :see: :class:`hwtLib.mem.fifo`

    :ivar dataOut_copy_frame: channel which decides when the start of a frame
        is captured and when the previously stored frame is replayed from
        the marked start. With ``vld`` set, ``data=1`` makes the reader
        continue from the committed ``rd_ptr`` and ``data=0`` commits the
        working ``rd_ptr_tmp`` into ``rd_ptr``.

    .. hwt-autodoc:: _example_FifoCopy
    """

    @override
    def hwDeclr(self):
        """
        Declare clock/reset, the data ports and the frame-copy control channel.

        :raises AssertionError: if ``DEPTH`` is lower than 2
        :raises NotImplementedError: if ``EXPORT_SIZE`` or ``EXPORT_SPACE`` is set
        """
        depth = int(self.DEPTH)
        assert depth > 0, \
            "Fifo is disabled in this case, do not use it entirely"
        assert depth > 1, \
            "Use register instead"

        addClkRstn(self)
        with self._hwParamsShared():
            self.dataIn = HwIOFifoWriter()
            self.dataOut = HwIOFifoReader()._m()

        # NOTE(review): the original indentation was lost; the control channel
        # is assumed to be declared outside of the shared-parameter scope
        # because its DATA_WIDTH is set explicitly to 1 - confirm
        fc = self.dataOut_copy_frame = HwIODataVld()
        fc.DATA_WIDTH = 1

        self._declr_size_and_space()
        if self.EXPORT_SIZE or self.EXPORT_SPACE:
            raise NotImplementedError()

    @override
    def hwImpl(self):
        """
        Build the storage, the pointer registers and the read/write control
        logic including the handling of the frame replay commands.
        """
        depth = self.DEPTH
        ptr_t = HBits(log2ceil(depth), signed=False)

        storage = self.mem = self._sig(
            "memory", HBits(self.DATA_WIDTH)[self.DEPTH + 1])
        # write pointer which is seen by reader
        wr_ptr = self._reg("wr_ptr", ptr_t, 0)
        # working read pointer used by the reader, it is either restarted
        # from rd_ptr (copy command) or committed into rd_ptr (non-copy command)
        rd_ptr_tmp = self._reg("rd_ptr_tmp", ptr_t, 0)
        # committed read pointer, also used to detect that the FIFO is full
        rd_ptr = self._reg("rd_ptr", ptr_t, 0)

        # highest pointer value, pointers wrap to 0 after it
        last_index = depth - 1
        assert last_index.bit_length() == ptr_t.bit_length(), (last_index, ptr_t)

        reader = self.dataOut
        writer = self.dataIn
        do_write = self._sig("fifo_write")
        do_read = self._sig("fifo_read")
        copy_cmd = self.dataOut_copy_frame

        def replay_requested():
            # copy command: the read continues from the committed rd_ptr
            return copy_cmd.vld & copy_cmd.data

        def wrapping_increment(src, dst):
            # dst = src + 1 with a wrap to 0 after the last index
            return If(src._eq(last_index),
                dst(0)
            ).Else(
                dst(src + 1)
            )

        # move the working read pointer on each read
        If(do_read,
            If(replay_requested(),
                wrapping_increment(rd_ptr, rd_ptr_tmp)
            ).Elif(rd_ptr_tmp._eq(last_index),
                rd_ptr_tmp(0)
            ).Else(
                rd_ptr_tmp(rd_ptr_tmp + 1)
            ),
        )

        # non-copy command: jump to the next frame by committing
        # the working read pointer
        If(copy_cmd.vld & ~copy_cmd.data,
            rd_ptr(rd_ptr_tmp)
        )

        wr_ptr_next = self._sig("wr_ptr_next", ptr_t)
        wrapping_increment(wr_ptr, wr_ptr_next)

        # move the write pointer on each write
        If(do_write,
            wr_ptr(wr_ptr_next)
        )

        # memory write port
        If(self.clk._onRisingEdge(),
            If(do_write,
                storage[wr_ptr](writer.data)
            )
        )

        # a read is performed if the reader asks for it and there is
        # an unread item or the copy command is present
        data_available = (wr_ptr != rd_ptr_tmp) | replay_requested()
        do_read(reader.en & data_available)

        read_addr_tmp = rename_signal(
            self,
            replay_requested()._ternary(rd_ptr, rd_ptr_tmp),
            "read_addr_tmp")
        # memory read port which updates the output data
        If(self.clk._onRisingEdge(),
            If(do_read,
                reader.data(storage[read_addr_tmp])
            )
        )

        # a write is performed only if it does not reach the committed
        # read pointer
        do_write(writer.en & (wr_ptr_next != rd_ptr))

        # full flag for the writer
        writer.wait(wr_ptr_next._eq(rd_ptr))
        # empty flag for the reader, never asserted together with copy command
        If(replay_requested(),
            reader.wait(0)
        ).Else(
            reader.wait((wr_ptr._eq(rd_ptr_tmp)))
        )
# [docs]
def _example_FifoCopy():
    """
    Create the example :class:`FifoCopy` instance with 8 bit wide data
    and the depth of 16 items.
    """
    fifo = FifoCopy()
    fifo.DATA_WIDTH = 8
    # EXPORT_SIZE and EXPORT_SPACE are intentionally not enabled here,
    # FifoCopy.hwDeclr raises NotImplementedError if any of them is set
    fifo.DEPTH = 16
    return fifo
if __name__ == "__main__":
    from hwt.synth import to_rtl_str

    # print the RTL generated for the example configuration
    example_fifo = _example_FifoCopy()
    print(to_rtl_str(example_fifo))