Source code for hwtLib.abstract.frame_utils.alignment_utils

from itertools import product
from math import inf
from typing import List, Tuple, Union, Optional

from hwt.hdl.types.stream import HStream
from hwtLib.abstract.frame_utils.byte_src_info import ByteSrcInfo


[docs]def freeze_frame(f: List[List[ByteSrcInfo]])\ ->Tuple[Tuple[ByteSrcInfo, ...], ...]: return tuple(tuple(w) for w in f)
[docs]def count_not_none(items: List[Optional[ByteSrcInfo]]): for i, b in enumerate(items): if b is not None: return i return 0
[docs]def next_frame_offsets(f0_t: HStream, data_width: int): f0_B_width = f0_t.element_t.bit_length() f1_start_offsets = set() i, len_max = f0_t.len_min, f0_t.len_max while i <= len_max: offset = (i * f0_B_width) % data_width if offset in f1_start_offsets: break f1_start_offsets.add(offset) i += 1 return sorted(f1_start_offsets)
[docs]class FrameAlignmentUtils(): """ :ivar ~.word_bytes: number of bytes in 1 output word """
[docs] def __init__(self, word_bytes: int, out_offset=0): self.word_bytes = word_bytes self.out_offset = out_offset
[docs] def join_streams( self, stream_data: Tuple[Tuple[Tuple[ByteSrcInfo, ...], ...], ...], offset): """ Create output frame with specified offset from input stream :param stream_data: list of data for each stream input (stream data is list of data words, data word is a tuple of ByteSrcInfo) :param offset: offset of output stream """ word_bytes = self.word_bytes assert offset >= 0 and offset < word_bytes, (offset, word_bytes) res = [] # input data frames to byte list in_data_bytes = [None for _ in range(offset)] for frame in stream_data: for word in frame: in_data_bytes.extend(word) curr_word = None for i, d in enumerate(in_data_bytes): if i >= offset and d is None: # skip non start fill-up continue if curr_word is None or len(curr_word) == word_bytes: # start of new frame curr_word = [d, ] res.append(curr_word) else: curr_word.append(d) # final padding to ensure all words do have if curr_word is not None and len(curr_word) != word_bytes: curr_word.extend(None for _ in range(word_bytes - len(curr_word))) return freeze_frame(res)
[docs] def create_frame(self, stream_i: int, byte_cnt: int, offset_out: int, offset_in: int)\ ->Tuple[Tuple[ByteSrcInfo]]: """ :param byte_cnt: number of bytes in this frame :param offset_out: how many empty bytes should be put before data in frame :param offset_in: how many bytes skip from imput stream """ assert byte_cnt >= 0, byte_cnt word_bytes = self.word_bytes assert offset_out >= 0 and offset_out < word_bytes, (offset_out, word_bytes) assert offset_in >= 0 and offset_in < word_bytes, (offset_in, word_bytes) frame = [] if byte_cnt == 0: return freeze_frame(frame) if offset_out: frame.append([None for _ in range(offset_out)]) last_in_word_i = 0 else: last_in_word_i = -1 last_word_i = ((offset_in + byte_cnt - 1) // word_bytes) for i in range(byte_cnt): in_B_i = i + offset_in in_byte_i = in_B_i % word_bytes in_word_i = in_B_i // word_bytes is_from_last_input_word = int(in_word_i == last_word_i) B_info = ByteSrcInfo(stream_i, in_word_i, in_byte_i, is_from_last_input_word) if in_word_i == last_in_word_i: frame[-1].append(B_info) else: assert last_in_word_i == in_word_i - 1, (last_in_word_i, in_word_i) last_in_word_i = in_word_i frame.append([B_info, ]) last_data_word = frame[-1] for _ in range(word_bytes - len(last_data_word)): last_data_word.append(None) return freeze_frame(frame)
[docs] def get_bytes_in_frame_info(self, offset_out, offset_in, chunk_size, chunk_cnt, already_has_body_words): # assert chunk_cnt > 0, chunk_cnt assert chunk_size > 0, chunk_size word_bytes = self.word_bytes offset = max(offset_in, offset_out) first_word_bytes = min(word_bytes - offset, chunk_cnt * chunk_size) last_word_bytes = (offset + chunk_size * chunk_cnt) % word_bytes has_body_words = (offset + chunk_size * chunk_cnt) >= (2 * word_bytes) # if first_word_bytes + last_word_bytes == word_bytes and word_bytes % chunk_size == 0: # # the output offset will be same after adding data from this frame # has_body_words = False first_word_is_last_word = (offset + chunk_cnt * chunk_size) <= word_bytes min_representative_frame_size = first_word_bytes if has_body_words and (offset != 0 or not already_has_body_words): min_representative_frame_size += word_bytes if not first_word_is_last_word\ and not ( has_body_words and last_word_bytes == word_bytes ): min_representative_frame_size += last_word_bytes if first_word_bytes == word_bytes\ and min_representative_frame_size > word_bytes: has_body_words = True return first_word_bytes, has_body_words, \ last_word_bytes, min_representative_frame_size
[docs] def get_important_byte_cnts( self, offset_out: int, offset_in: int, chunk_size: int, chunk_cnt_min: Union[int, float], chunk_cnt_max: Union[int, float]): """ Filter chunk cnt range, let only those values which affect the number of valid bytes in first/last word of frame and presence of words in body frame """ # assert chunk_cnt_min > 0 assert chunk_cnt_min <= chunk_cnt_max, (chunk_cnt_min, chunk_cnt_max) if isinstance(chunk_cnt_min, int) and isinstance(chunk_cnt_max, int)\ and chunk_cnt_min == chunk_cnt_max: _, _, _, min_representative_frame_size = self.get_bytes_in_frame_info( offset_out, offset_in, chunk_size, chunk_cnt_min, False) return [min_representative_frame_size, ] word_bytes = self.word_bytes _chunk_cnt_min = chunk_cnt_min % (2 * word_bytes) chunk_cnt_max = _chunk_cnt_min + \ min((2 * word_bytes), chunk_cnt_max - chunk_cnt_min) chunk_cnt_min = _chunk_cnt_min sizes = set() already_has_body_words = False for chunk_cnt in range(chunk_cnt_min, chunk_cnt_max + 1): (_, has_body_words, _, min_representative_frame_size) = self.get_bytes_in_frame_info( offset_out, offset_in, chunk_size, chunk_cnt, already_has_body_words) sizes.add(min_representative_frame_size) already_has_body_words |= has_body_words return sorted(sizes)
[docs] def stream_to_all_possible_frame_formats( self, t: HStream, stream_i: int, offset_out: int): """ Generate all possible frame formats with unique features (related to masks, last and data mux) """ frames = [] chunk_size = t.element_t.bit_length() // 8 for offset_in in t.start_offsets: for byte_cnt in self.get_important_byte_cnts( offset_out, offset_in, chunk_size, t.len_min, t.len_max): frame = self.create_frame( stream_i, byte_cnt, offset_out, offset_in) frames.append(frame) return frames
[docs] def streams_to_all_possible_frame_formats( self, streams: List[HStream], offset: int): """ :see: :func:`FrameJoinUtils.stream_to_all_possible_frame_formats` for multiple input streams """ frames_per_stream = [] prev_end_offsets = [offset, ] for i, t in enumerate(streams): assert isinstance(t, HStream), t f_frames = set() for offset_out in prev_end_offsets: f_frames_tmp = self.stream_to_all_possible_frame_formats( t, i, offset_out) f_frames.update(f_frames_tmp) f_frames = list(f_frames) # pprint(f_frames) frames_per_stream.append(f_frames) _prev_end_offsets = prev_end_offsets prev_end_offsets = set() # for each frame resolve end alignment of the frames for frame in f_frames: if frame: last_word = frame[-1] o = count_not_none(last_word) else: o = 0 prev_end_offsets.add(o) prev_end_offsets = sorted(prev_end_offsets) res = set() for frame_combination in product(*frames_per_stream): res_frame = self.join_streams(frame_combination, offset) res.add(res_frame) res = list(res) return res
[docs] def resolve_input_bytes_destinations(self, streams: List[HStream]): frames = self.streams_to_all_possible_frame_formats( streams, self.out_offset) input_B_dst = self._resolve_input_bytes_destinations( frames, len(streams)) return input_B_dst
[docs] @staticmethod def can_produce_zero_len_frame(streams: List[HStream]): return [s.len_min == 0 for s in streams]
[docs] def _resolve_input_bytes_destinations(self, frames, input_cnt): """ :param frames: list of all possible frame formats generated from streams_to_all_possible_frame_formats :param input_cnt: number of input streams """ word_bytes = self.word_bytes def init_word(): return [set() for _ in range(word_bytes)] input_B_dst = [init_word() for _ in range(input_cnt)] def add_dst(state_name, input_i, input_B_i, input_time_offset, output_B_i, is_last_word_from_this_input): input_B_dst[input_i][input_B_i].add(( state_name, input_time_offset, output_B_i, is_last_word_from_this_input)) for f_i, f in enumerate(frames): for output_word_i, w in enumerate(f): state_label = (f_i, output_word_i) # resolve min_word_i_per_input_in_this_out_word min_word_i_per_input_in_this_out_word = [ inf for _ in range(input_cnt) ] for output_B_i, byte_info in enumerate(w): if byte_info is None: continue i = byte_info.stream_i min_word_i_per_input_in_this_out_word[i] = \ min(min_word_i_per_input_in_this_out_word[i], byte_info.word_i) # resolve byte destinations for output_B_i, byte_info in enumerate(w): if byte_info is None: continue input_i = byte_info.stream_i input_B_i = byte_info.byte_i time_offset = byte_info.word_i - \ min_word_i_per_input_in_this_out_word[input_i] assert time_offset >= 0, time_offset add_dst(state_label, input_i, input_B_i, time_offset, output_B_i, byte_info.is_from_last_input_word) return input_B_dst