Source code for hwtLib.mem.fifoCopy

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from hwt.code import If
from hwt.code_utils import rename_signal
from hwt.hdl.types.bits import Bits
from hwt.interfaces.std import FifoWriter, FifoReader, VldSynced
from hwt.interfaces.utils import addClkRstn
from hwt.math import log2ceil
from hwt.serializer.mode import serializeParamsUniq
from hwtLib.mem.fifo import Fifo


[docs]@serializeParamsUniq class FifoCopy(Fifo): """ :see: :class:`hwtLib.mem.fifo` Fifo with an extra signals to control replay of lastly stored data :ivar dataOut_copy_frame: The channel which drives when to capture start of the frame and when to start relaying previously stored frame from the marked start .. hwt-autodoc:: _example_FifoCopy """ def _declr(self): assert int(self.DEPTH) > 0, \ "Fifo is disabled in this case, do not use it entirely" assert int(self.DEPTH) > 1, \ "Use register instead" addClkRstn(self) with self._paramsShared(): self.dataIn = FifoWriter() self.dataOut = FifoReader()._m() fc = self.dataOut_copy_frame = VldSynced() fc.DATA_WIDTH = 1 self._declr_size_and_space() if self.EXPORT_SIZE or self.EXPORT_SPACE: raise NotImplementedError() def _impl(self): DEPTH = self.DEPTH index_t = Bits(log2ceil(DEPTH), signed=False) s = self._sig r = self._reg mem = self.mem = s("memory", Bits(self.DATA_WIDTH)[self.DEPTH + 1]) # write pointer which is seen by reader wr_ptr = r("wr_ptr", index_t, 0) # read pointer which is used by reader and can be potentially # reseted to a rd_ptr value (copy command) or can update rd_ptr (non-copy command) rd_ptr_tmp = r("rd_ptr_tmp", index_t, 0) rd_ptr = r("rd_ptr", index_t, 0) MAX_DEPTH = DEPTH - 1 assert MAX_DEPTH.bit_length() == index_t.bit_length(), (MAX_DEPTH, index_t) dout = self.dataOut din = self.dataIn fifo_write = s("fifo_write") fifo_read = s("fifo_read") frame_copy = self.dataOut_copy_frame # Update Tail pointer as needed If(fifo_read, If(frame_copy.vld & frame_copy.data, If(rd_ptr._eq(MAX_DEPTH), rd_ptr_tmp(0) ).Else( rd_ptr_tmp(rd_ptr + 1) ) ).Elif(rd_ptr_tmp._eq(MAX_DEPTH), rd_ptr_tmp(0) ).Else( rd_ptr_tmp(rd_ptr_tmp + 1) ), ) If(frame_copy.vld & ~frame_copy.data, # jump to next frame rd_ptr(rd_ptr_tmp) # If(rd_ptr_tmp._eq(MAX_DEPTH), # rd_ptr(0) # ).Else( # rd_ptr(rd_ptr_tmp + 1) # ) ) wr_ptr_next = self._sig("wr_ptr_next", index_t) If(wr_ptr._eq(MAX_DEPTH), wr_ptr_next(0) ).Else( wr_ptr_next(wr_ptr + 1) ) # Increment Head pointer as needed If(fifo_write, wr_ptr(wr_ptr_next) ) If(self.clk._onRisingEdge(), If(fifo_write, # Write Data to Memory mem[wr_ptr](din.data) ) ) fifo_read(dout.en & ( (wr_ptr != rd_ptr_tmp) | (frame_copy.vld & frame_copy.data)) ) read_addr_tmp = rename_signal(self, (frame_copy.vld & frame_copy.data)._ternary(rd_ptr, rd_ptr_tmp), "read_addr_tmp") If(self.clk._onRisingEdge(), If(fifo_read, # Update data output dout.data(mem[read_addr_tmp]) ) ) fifo_write(din.en & (wr_ptr_next != rd_ptr)) # Update Empty and Full flags din.wait(wr_ptr_next._eq(rd_ptr)) If(frame_copy.vld & frame_copy.data, dout.wait(0) ).Else( dout.wait((wr_ptr._eq(rd_ptr_tmp))) )
[docs]def _example_FifoCopy(): u = FifoCopy() u.DATA_WIDTH = 8 # u.EXPORT_SIZE = True # u.EXPORT_SPACE = True u.DEPTH = 16 return u
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = _example_FifoCopy() print(to_rtl_str(u))