Source code for hwtLib.mem.ramXor
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from hwt.code import Xor
from hwt.hdl.constants import READ_WRITE, WRITE, READ
from hwt.interfaces.utils import propagateClk, addClkRst
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.hObjList import HObjList
from hwt.synthesizer.param import Param
from hwtLib.mem.ram import RamSingleClock
[docs]@serializeParamsUniq
class RamXorSingleClock(RamSingleClock):
"""
Multiport XOR based RAM with only one clock signal
:note: XOR, LVT, multipumped, banked and similar multiport memory implementations
are used to build a more memory ported memories from memories available.
https://doi.org/10.1145/2145694.2145730
https://doi.org/10.1145/2629629
:note: The bitwise XOR operation is commutative, associative, and hasthe following properties
1. A^0 = A
2. B^B = 0
3. A^B^B=A
The third property, which follows from the first two, implies that we can XOR two values A and B together,
and recover A by XORing the result with B. This componnet uses this principe to generate additional write ports.
:see: :class:`~.RamSingleClock`
.. hwt-autodoc::
"""
def _config(self):
RamSingleClock._config(self)
self.PORT_CNT = (WRITE, WRITE, READ)
self.PRIMITIVE_MEMORY_PORTS = Param((WRITE, READ))
def _declr(self):
addClkRst(self)
self._declr_ports()
r_ports = []
w_ports = []
rw_ports = []
for p in self.port:
if p.HAS_R and p.HAS_W:
rw_ports.append(p)
else:
if p.HAS_R:
r_ports.append(p)
if p.HAS_W:
w_ports.append(p)
self._r_ports = r_ports
self._w_ports = w_ports
self._rw_ports = rw_ports
primitive_ram_r_port_i = []
primitive_ram_w_port_i = []
primitive_ram_rw_port_i = []
if isinstance(self.PRIMITIVE_MEMORY_PORTS, int):
primitive_ram_rw_port_i = list(range(self.PRIMITIVE_MEMORY_PORTS))
else:
for i, p in enumerate(self.PRIMITIVE_MEMORY_PORTS):
if p == READ:
primitive_ram_r_port_i.append(i)
elif p == WRITE:
primitive_ram_w_port_i.append(i)
elif p == READ_WRITE:
primitive_ram_rw_port_i.append(i)
else:
raise NotImplementedError(p)
can_be_primitive_ram = len(rw_ports) <= len(primitive_ram_rw_port_i) and \
len(r_ports) <= len(primitive_ram_r_port_i) and \
len(w_ports) <= len(primitive_ram_w_port_i)
self._can_be_primitive_ram = can_be_primitive_ram
if can_be_primitive_ram:
self._declr_children()
def _impl(self):
if self._can_be_primitive_ram:
super(RamXorSingleClock, self)._impl()
else:
if self.PRIMITIVE_MEMORY_PORTS != (WRITE, READ):
raise NotImplementedError(self.PRIMITIVE_MEMORY_PORTS)
if self._rw_ports:
raise NotImplementedError("RW ports (supports only write and read ports)")
r_ports = self._r_ports
assert r_ports
w_ports = self._w_ports
assert w_ports
# 1. construct a matrix N x N where N is a number of wrie ports
# the diagonal is set to None because we do not need to synchronize the port with itself
w_rams = HObjList(
HObjList(
RamSingleClock() if x != y else None
for x in w_ports
)
for y in w_ports)
# construct a matrix M x N where M is number of read ports
r_rams = HObjList(HObjList(RamSingleClock() for _ in w_ports)
for _ in range(len(r_ports)))
for row in w_rams + r_rams:
for r in row:
if r is None:
continue
r._updateParamsFrom(self, exclude=(("PORT_CNT",), ()))
r.PORT_CNT = self.PRIMITIVE_MEMORY_PORTS
if self.INIT_DATA is not None:
raise NotImplementedError()
r.INIT_DATA = tuple(0 for _ in range(2 ** r.ADDR_WIDTH))
self.w_rams = w_rams
self.r_rams = r_rams
# :type: List[Tuple[RtlSignal, RtlSignal, RtlSignal]]
# List of tuples (en, address, write data), used for write forwarding on read ports
write_in_progress_staus = []
# 2. connect them with XOR logic
# add the register on write address and data because we need to load the data for XOR first
for i, w in enumerate(w_ports):
xor_src = []
for mem in w_rams[i]:
if mem is not None:
xor_src.append(mem)
xor_loaded_src = []
for s in xor_src:
p = s.port[1]
p.addr(w.addr)
p.en(w.en)
xor_loaded_src.append(p.dout)
w_addr_reg = self._reg(f"{w._name}_addr_reg", w.addr._dtype)
w_addr_reg(w.addr)
w_en_reg = self._reg(f"{w._name}_en_reg", def_val=0)
w_en_reg(w.en)
w_data_reg = self._reg(f"{w._name}_data_reg", w.din._dtype)
w_data_reg(w.din)
xored_data = Xor(w_data_reg, *xor_loaded_src)
for ram_row in w_rams + r_rams:
xor_dst = ram_row[i]
if xor_dst is None:
continue
xor_dst = xor_dst.port[0]
xor_dst.addr(w_addr_reg)
xor_dst.en(w_en_reg)
xor_dst.din(xored_data)
write_in_progress_staus.append((w_en_reg, w_addr_reg, xored_data))
# 3. connect read ports to memories
for (i, r), r_ram_column in zip(enumerate(r_ports), r_rams):
out_xor_inputs = []
for r_ram, (w_en_reg, w_addr_reg, w_xored_data) in zip(r_ram_column, write_in_progress_staus):
r_ram = r_ram.port[1]
s = (w_en_reg & w_addr_reg._eq(r.addr))._ternary(w_xored_data, r_ram.dout)
out_xor_inputs.append(s)
r_ram.addr(r.addr)
r_ram.en(r.en)
r.dout(Xor(*out_xor_inputs))
propagateClk(self)
if __name__ == "__main__":
from hwt.synthesizer.utils import to_rtl_str
u = RamXorSingleClock()
u.ADDR_WIDTH = 10
u.DATA_WIDTH = 32
u.PORT_CNT = (*(WRITE for _ in range(4)), *(READ for _ in range(8)))
print(to_rtl_str(u))