#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import If
from hwt.hdl.types.bits import HBits
from hwt.hwIOs.agents.fifo import HwIOFifoWriterAgent
from hwt.hwIOs.std import HwIOFifoWriter, HwIOFifoReader, HwIOSignal
from hwt.hwIOs.utils import addClkRstn
from hwt.math import log2ceil
from hwt.pyUtils.typingFuture import override
from hwt.serializer.mode import serializeParamsUniq
from hwtLib.mem.fifo import Fifo
from hwtSimApi.hdlSimulator import HdlSimulator
from ipCorePackager.intfIpMeta import IntfIpMetaNotSpecifiedError
[docs]
class FifoWriterDropable(HwIOFifoWriter):
"""
FIFO write port interface witch commit and discard signal
used to drop data chunks already written in FIFO
:note: commit and discard behaves as another data signal
it is valid if en=1
:note: only one from "commit", "discard" can be 1 at the same time
:ivar ~.commit: if 1 all the written data are made available to reader,
including current data word
:ivar ~.discard: if 1 all written data which were not commited are discarded
including current data word
.. hwt-autodoc::
"""
@override
def hwDeclr(self):
super(FifoWriterDropable, self).hwDeclr()
self.commit = HwIOSignal()
self.discard = HwIOSignal()
[docs]
@override
def _initSimAgent(self, sim: HdlSimulator):
self._ag = FifoWriterDropableAgent(sim, self)
[docs]
@override
def _getIpCoreIntfClass(self):
raise IntfIpMetaNotSpecifiedError()
[docs]
class FifoWriterDropableAgent(HwIOFifoWriterAgent):
[docs]
@override
def set_data(self, d):
i = self.hwIO
if d is None:
commit, discard = (None, None)
else:
d, commit, discard = d
i.commit.write(commit)
i.discard.write(discard)
i.data.write(d)
[docs]
@override
def get_data(self):
i = self.hwIO
return (
i.commit.read(),
i.discard.read(),
i.data.read(),
)
[docs]
@serializeParamsUniq
class FifoDrop(Fifo):
"""
:see: :class:`hwtLib.mem.fifo`
Fifo with an extra signals for writter which allows to commit
or discard data chung writen in to fifo.
:attention: the commit/drop logic is executed on write
but also during dataIn.wait=1 this allows for droping
if FIFO is full of uncommited data
.. hwt-autodoc:: _example_FifoDrop
"""
@override
def hwDeclr(self):
assert int(self.DEPTH) > 0,\
"Fifo is disabled in this case, do not use it entirely"
addClkRstn(self)
with self._hwParamsShared():
self.dataIn = FifoWriterDropable()
self.dataOut = HwIOFifoReader()._m()
self._declr_size_and_space()
@override
def hwImpl(self):
DEPTH = self.DEPTH
index_t = HBits(log2ceil(DEPTH), signed=False)
s = self._sig
r = self._reg
mem = self.mem = s("memory", HBits(self.DATA_WIDTH)[self.DEPTH])
# write pointer which is seen by reader
wr_ptr = r("wr_ptr", index_t, 0)
# write pointer which is used by writer and can be potentially
# reseted to a wr_ptr value or can update wr_ptr
wr_ptr_tmp = r("wr_ptr_tmp", index_t, 0)
rd_ptr = r("rd_ptr", index_t, 0)
MAX_DEPTH = DEPTH - 1
dout = self.dataOut
din = self.dataIn
fifo_write = s("fifo_write")
fifo_read = s("fifo_read")
# Update Tail pointer as needed
If(fifo_read,
If(rd_ptr._eq(MAX_DEPTH),
rd_ptr(0)
).Else(
rd_ptr(rd_ptr + 1)
)
)
# Increment Head pointer as needed
If(din.discard,
wr_ptr_tmp(wr_ptr),
).Elif(fifo_write,
If(wr_ptr_tmp._eq(MAX_DEPTH),
wr_ptr_tmp(0)
).Else(
wr_ptr_tmp(wr_ptr_tmp + 1)
),
If(din.commit,
wr_ptr(wr_ptr_tmp._rtlNextSig)
)
)
If(self.clk._onRisingEdge(),
If(fifo_write,
# Write Data to Memory
mem[wr_ptr_tmp](din.data)
)
)
# assert isPow2(int(DEPTH)), DEPTH
looped = r("looped", def_val=False)
# same thing for looped as wr_ptr_tmp for wr_ptr
looped_tmp = r("looped_tmp", def_val=False)
fifo_read(dout.en & (looped | (wr_ptr != rd_ptr)))
If(self.clk._onRisingEdge(),
If(fifo_read,
# Update data output
dout.data(mem[rd_ptr])
)
)
fifo_write(din.en & (~looped_tmp | (wr_ptr_tmp != rd_ptr)))
# looped logic
If(din.en & din.commit,
looped(looped_tmp._rtlNextSig)
).Elif(dout.en & rd_ptr._eq(MAX_DEPTH),
looped(False)
)
If(din.discard,
looped_tmp(looped)
).Elif(din.en & wr_ptr_tmp._eq(MAX_DEPTH),
looped_tmp(True)
).Elif(dout.en & rd_ptr._eq(MAX_DEPTH),
looped_tmp(False)
)
# Update Empty and Full flags
If(wr_ptr_tmp._eq(rd_ptr),
If(looped_tmp,
din.wait(1),
).Else(
din.wait(0)
)
).Else(
din.wait(0),
)
If(wr_ptr._eq(rd_ptr),
If(looped,
dout.wait(0)
).Else(
dout.wait(1),
)
).Else(
dout.wait(0)
)
if self.EXPORT_SIZE or self.EXPORT_SPACE:
stash_size = r("stash_size_reg", self.size._dtype, 0)
If(din.discard,
stash_size(0)
).Elif(fifo_write,
If(din.commit,
stash_size(0)
).Else(
stash_size(stash_size + 1)
)
)
if self.EXPORT_SIZE:
size = r("size_reg", self.size._dtype, 0)
If(fifo_read,
If(fifo_write & din.commit,
size(size + stash_size)
).Else(
size(size - 1)
)
).Else(
If(fifo_write & din.commit,
size(size + stash_size + 1)
)
)
self.size(size + stash_size)
if self.EXPORT_SPACE:
space = r("space_reg", self.space._dtype, DEPTH)
If(fifo_read,
If(din.discard,
space(space - stash_size)
).Elif(fifo_write,
space(space - 1)
).Else(
space(space + 1)
)
).Else(
If(din.discard,
space(space - stash_size)
)
)
self.space(space)
[docs]
def _example_FifoDrop():
m = FifoDrop()
m.DATA_WIDTH = 8
m.EXPORT_SIZE = True
m.EXPORT_SPACE = True
m.DEPTH = 16
return m
if __name__ == "__main__":
from hwt.synth import to_rtl_str
m = _example_FifoDrop()
print(to_rtl_str(m))