Source code for hwtLib.avalon.mm_buff

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Union

from hwt.interfaces.std import Handshaked
from hwt.interfaces.utils import addClkRstn, propagateClkRstn
from hwt.serializer.mode import serializeParamsUniq
from hwt.synthesizer.interfaceLevel.interfaceUtils.utils import packIntf, \
    connectPacked
from hwt.synthesizer.param import Param
from hwtLib.abstract.busBridge import BusBridge
from hwtLib.avalon.mm import AvalonMM
from hwtLib.handshaked.fifo import HandshakedFifo
from hwtLib.handshaked.reg import HandshakedReg
from hwtLib.handshaked.streamNode import StreamNode


[docs]@serializeParamsUniq class AvalonMmBuff(BusBridge): """ Transaction buffer for Avalon MM interface .. hwt-autodoc:: """ def _config(self): AvalonMM._config(self) self.ADDR_BUFF_DEPTH = Param(4) self.DATA_BUFF_DEPTH = Param(4) def _declr(self): addClkRstn(self) with self._paramsShared(): self.s = AvalonMM() with self._paramsShared(): self.m: AvalonMM = AvalonMM()._m() assert self.ADDR_BUFF_DEPTH > 0 or self.DATA_BUFF_DEPTH > 0, ( "This buffer is completely disabled," " it should not be instantiated at all", self.ADDR_BUFF_DEPTH, self.DATA_BUFF_DEPTH)
[docs] def _mk_buff(self, DEPTH: int, DATA_WIDTH: int) -> Union[HandshakedFifo, HandshakedReg]: if DEPTH == 1: b = HandshakedReg(Handshaked) else: b = HandshakedFifo(Handshaked) b.DEPTH = self.DATA_BUFF_DEPTH b.DATA_WIDTH = DATA_WIDTH return b
def _impl(self): s: AvalonMM = self.s m: AvalonMM = self.m r_data = self._mk_buff(self.DATA_BUFF_DEPTH, self.DATA_WIDTH) self.r_data = r_data r_data.dataIn.data(m.readData) r_data.dataIn.vld(m.readDataValid) s.readData(r_data.dataOut.data) s.readDataValid(r_data.dataOut.vld) r_data.dataOut.rd(1) w_resp = self._mk_buff(self.DATA_BUFF_DEPTH, m.response._dtype.bit_length()) self.w_resp = w_resp w_resp.dataIn.data(m.response) w_resp.dataIn.vld(m.writeResponseValid) s.response(w_resp.dataOut.data) s.writeResponseValid(w_resp.dataOut.vld) w_resp.dataOut.rd(1) addr_data = packIntf(s, exclude=[s.readData, s.readDataValid, s.response, s.writeResponseValid, s.waitRequest]) addr = self._mk_buff(self.ADDR_BUFF_DEPTH, addr_data._dtype.bit_length()) self.addr = addr addr.dataIn.data(addr_data) StreamNode( [(s.read | s.write, ~s.waitRequest), ], [addr.dataIn], ).sync() m_tmp = AvalonMM() m_tmp._updateParamsFrom(m) self.m_tmp = m_tmp non_addr_signals = [ m_tmp.readData, m_tmp.readDataValid, m_tmp.response, m_tmp.writeResponseValid, m_tmp.waitRequest ] connectPacked(addr.dataOut.data, m_tmp, exclude=non_addr_signals) m(m_tmp, exclude=non_addr_signals + [m_tmp.read, m_tmp.write]) m.read(m_tmp.read & addr.dataOut.vld) m.write(m_tmp.write & addr.dataOut.vld) addr.dataOut.rd(~m.waitRequest) propagateClkRstn(self)
if __name__ == "__main__": from hwt.synthesizer.utils import to_rtl_str u = AvalonMmBuff() print(to_rtl_str(u))