# Source code for hwtLib.abstract.frame_utils.join.fsm
from typing import Tuple, Dict, Set, List
from hwt.pyUtils.arrayQuery import iter_with_last
from hwtLib.abstract.frame_utils.join.state_trans_info import StateTransInfo
from hwtLib.abstract.frame_utils.join.state_trans_item import StateTransItem
from hwtLib.abstract.frame_utils.join.state_trans_table import StateTransTable
from hwtLib.abstract.frame_utils.join.input_reg_val import InputRegInputVal
from copy import deepcopy
def is_from_different_input(a: Tuple[int, int, int, int],
                            b: Tuple[int, int, int, int],
                            ):
    """
    Check if two output byte records come from different input streams.

    :param a: record of an output byte, item 0 is the index of the input stream
        (the FSM builder in this module uses the format
        (input index, input time, input byte index, last input word flag)),
        None is accepted and means that there is no byte
    :param b: the other record, same format as a, None is accepted as well
    :return: True if any of the records is None or if the input stream
        indexes of the records differ, False otherwise
    """
    if a is None or b is None:
        # a missing byte never belongs to the same input as anything else
        return True
    return a[0] != b[0]
def is_next_byte_from_same_input(a: Tuple[int, int, int, int],
                                 b: Tuple[int, int, int, int],
                                 ):
    """
    Check if record b describes the byte which directly follows the byte
    of record a in the same input stream.

    :param a: record of an output byte, item 0 is the index of the input stream
        and item 2 is the index of the byte in the input word, None is accepted
    :param b: record of the following output byte, same format as a,
        None is accepted
    :return: True if both records are present, both have the same input stream
        index and the byte index of b is the byte index of a plus one
    :note: only items 0 and 2 are compared, item 1 is not checked
    """
    if is_from_different_input(a, b):
        # covers also the case where a or b is None
        return False
    return a[2] == (b[2] - 1)
# NOTE(review): the leading indentation of this function and the "[docs]" marker
# in front of "def" look like residue of a documentation page export, the nesting
# of the statements below can not be derived from this text alone - restore it
# from the upstream file before relying on it
[docs]def input_B_dst_to_fsm(word_bytes: int,
input_cnt: int,
input_B_dst: List[List[Set[
Tuple[Tuple[int, int], int, int, int]
]]],
can_be_zero_len_frame: List[bool]):
"""
Build the state transition table of the FSM from the mapping of input bytes to output bytes.

:param word_bytes: number of bytes in output word
:param input_cnt: number of input streams
:param input_B_dst: list with mapping of input bytes to a output bytes in each state
.. code-block::
Format of input_B_dst is: List for each input
in this list there are lists for each input byte
in this list there are sets of byte destinations for each input byte
byte destination is a tuple:
state label, input time index, output byte index, input last flag
(the input index and the input byte index are not stored in the tuple,
they are the positions in the two outer lists)
:param can_be_zero_len_frame: list indexed by the input index, a true item marks
the input for which the transitions for a frame of zero length (0B frame) are generated
:return: the StateTransTable with the generated transitions, filter_unique_state_trans()
and assert_transitions_deterministic() are called on it before it is returned
:note: input_B_dst is produced by :func:`hwtLib.amba.axis_comp.frame_utils.join.FrameJoinUtils.resolve_input_bytes_destinations`
"""
# (out_frame_format_i, out_word_i): StateTransInfo
sub_states: Dict[Tuple[int, int], StateTransInfo] = {}
# create substates from input byte mux info
for in_i, in_word_dst in enumerate(input_B_dst):
for in_B_i, in_B_dsts in enumerate(in_word_dst):
for (st_label,
in_B_time,
out_B_i,
B_from_last_input_word) in in_B_dsts:
st_label: Tuple[int, int]
# one StateTransInfo is created for each state label on its first use
st = sub_states.get(st_label, None)
if st is None:
st = StateTransInfo(st_label, word_bytes, input_cnt)
sub_states[st_label] = st
# presumably stores the source of the output byte out_B_i, the records are read
# back from ss.outputs below as (in_i, in_t, in_B_i, last flag) - verify in StateTransInfo
st.set_output(out_B_i, in_i, in_B_time,
in_B_i, B_from_last_input_word)
# resolve max lookahead for each input
# (the largest input time index which is used for the input in any byte destination)
max_lookahead_for_input: List[int] = [0 for _ in range(input_cnt)]
for in_i, in_word_dst in enumerate(input_B_dst):
for in_B_i, in_B_dsts in enumerate(in_word_dst):
for st_label, in_B_time, out_B_i, _ in in_B_dsts:
max_lookahead_for_input[in_i] = max(
max_lookahead_for_input[in_i], in_B_time)
# build fsm
# the table is created with as many states as there are inputs
state_cnt = input_cnt
tt = StateTransTable(
word_bytes, max_lookahead_for_input, state_cnt)
# sub-states where the first used input word was already partly consumed
# by the previous output word (filled in the loop below)
states_for_relict_processing: List[StateTransInfo] = []
# for all possible in/out configurations
# (the sub-states are visited in the order of their labels)
for ss in sorted(sub_states.values(), key=lambda x: x.label):
ss: StateTransInfo
st_i = ss.get_state_i()
next_ss = ss.get_next_substate(sub_states)
if next_ss is None:
# there is no following sub-state, the transition ends in the state 0
next_st_i = 0
else:
next_st_i = next_ss.get_state_i()
# the 4th argument is 1 only if there is no following sub-state
# (presumably it is the last flag of the output word, the same position is
# set to 1 together with tr.last = 1 at the end of this function - confirm)
tr = StateTransItem(tt, st_i, next_st_i, int(next_ss is None))
tt.state_trans[st_i].append(tr)
o_prev = None
for last, (out_B_i, o) in iter_with_last(enumerate(ss.outputs)):
if o is None:
o_prev = o
# output byte is disconnected, which is default state
continue
# in_i - input stream index
# in_t - input time (register index)
(in_i, in_t, in_B_i, is_from_last_input_word) = o
in_rec: InputRegInputVal = tr.input[in_i][in_t]
# vld, keep required as we are planning to use this byte in output
in_rec.keep[in_B_i] = 1
in_rec.last = is_from_last_input_word
tr.out_byte_mux_sel[out_B_i] = (in_i, in_t, in_B_i)
tr.input_rd[in_i] = 1
# next keep = 0 because this byte will be consumed
tr.input_keep_mask[in_i][in_t][in_B_i] = 0
tr.output_keep[out_B_i] = 1
# o_next is the record of the output byte which follows this one, for the last byte
# of the word it is the first byte of the next sub-state (None if there is not any)
if last:
o_next = next_ss.outputs[0] if next_ss is not None else None
else:
o_next = ss.outputs[out_B_i + 1]
if o_next is not None:
# the index of the used input never decreases between two following output bytes
assert o[0] <= o_next[0]
is_input_word_continuing_in_next_out_word = last \
and next_ss is not None \
and is_next_byte_from_same_input(o, o_next)\
and in_B_i != word_bytes - 1
if is_input_word_continuing_in_next_out_word:
assert next_ss is not None
states_for_relict_processing.append(next_ss)
# o_prev is None for the first connected byte, which counts as a different input
is_first_input_byte = is_from_different_input(o_prev, o)
# is last byte from input byte in this output word
is_last_input_byte = is_from_different_input(o, o_next)
if is_last_input_byte:
assert not is_input_word_continuing_in_next_out_word
# iterate for all inputs until next input or end (if there are not any) and
# mark its input keep with 0 and last with 1 to mark for 0B frame input
next_input_i = input_cnt if o_next is None else o_next[0]
for skipped_input_i in range(in_i + 1, next_input_i):
_in_rec: InputRegInputVal = tr.input[skipped_input_i][0]
_in_rec.last = 1
_in_rec.keep = [0 for _ in _in_rec.keep]
_in_rec.relict = 1
tr.input_rd[skipped_input_i] = 1
tr.input_keep_mask[skipped_input_i][0] = [0 for _ in range(word_bytes)]
# NOTE(review): next_input_i is assigned only in the is_last_input_byte branch above,
# confirm that this statement is nested in that branch in the upstream file
next_input_can_be_zero_len = not is_input_word_continuing_in_next_out_word and\
o_next is not None \
and can_be_zero_len_frame[next_input_i]
if next_input_can_be_zero_len:
# mark that the next_input does not have 0B frame
# to distinguish between the transitions which are skipping the input
assert o_next is not None
(next_in_i, next_in_t, next_in_B_i, _) = o_next
next_in_rec: InputRegInputVal = tr.input[next_in_i][next_in_t]
# vld, keep required as we are planning to use this byte in output in the future
next_in_rec.keep[next_in_B_i] = 1
if is_first_input_byte:
if in_B_i != 0:
# mark leading zero
for i in range(0, in_B_i):
in_rec.keep[i] = 0
# the keep of the byte after this one is resolved only at the end of a run of bytes from
# one input, and not if this byte is the very last byte of the last word of the input
if (is_last_input_byte \
or is_input_word_continuing_in_next_out_word\
or last) \
and (
not (is_from_last_input_word \
and is_last_input_byte \
and in_B_i == word_bytes - 1)):
# mark keep for next input byte
if not is_from_last_input_word or is_input_word_continuing_in_next_out_word:
# the next input byte is present because we are not in last input word
# or this may be a last word but it is not fully consumed
next_B_keep = 1
else:
# no more bytes from this input stream
next_B_keep = 0
if in_B_i == word_bytes - 1:
# because pipeline will shift next time
in_t += 1
input_val = tr.input[in_i]
# the next byte is ignored if it would be behind the last register of this input
if in_t < len(input_val):
next_keep = input_val[in_t].keep
# modulo wraps the byte index to 0 if the next byte is in the next input word
next_keep[(in_B_i + 1) % word_bytes] = next_B_keep
o_prev = o
# if we are checking the input keep==0 set keep_mask=0 as well
# (not required, to make clear that the byte will not be used in code)
# NOTE: the name in_i in the loop below is the position in the list of records of a single
# input (the position which is indexed with in_t elsewhere), it is not the input index
for in_meta, in_keep_mask in zip(tr.input, tr.input_keep_mask):
for in_i, in_inputs in enumerate(in_meta):
for B_i, k in enumerate(in_inputs.keep):
if k is not None and k == 0:
in_keep_mask[in_i][B_i] = 0
# mark relict flag
# (true if the previous sub-state left a partly consumed input word for this sub-state)
first_input_is_relict = ss in states_for_relict_processing
for o in ss.outputs:
if o is None:
# skip start padding
continue
(in_i, in_t, in_B_i, _) = o
v = tr.input[in_i][in_t]
if v.last:
# relict flag matters only for word with last flag set
# because it is used to distinguish starts of single word frames
# where only part of the word can be consumed to a output word
v.relict = int(first_input_is_relict)
# NOTE(review): the nesting of this break (in the "if v.last" or directly in the loop)
# can not be told from this text - confirm against the upstream file
break
if can_be_zero_len_frame[0]:
# The previous code generates the transition starting
# from the state corresponding to a minimal index of input used in it and the starting
# state is 0, thus all cases where some prefix input was 0B frame now starting in non starting state
# count how many inputs from the beginning of the list can have a 0B frame
prefix_zero_len_inputs_cnt = 0
for can_0B in can_be_zero_len_frame:
if can_0B:
prefix_zero_len_inputs_cnt += 1
else:
break
# copy the transitions of the states which are reachable by skipping of the prefix
# inputs to the state 0 (the upper bound is limited to the existing states)
for orig_st_i in range(1, min(prefix_zero_len_inputs_cnt + 1, input_cnt)):
for tr in tt.state_trans[orig_st_i]:
# deepcopy keeps the original transition of the state orig_st_i unchanged
new_tr: StateTransItem = deepcopy(tr)
new_tr.state = 0
# wait and consume 0B frames from all inputs where it is expected
# (presumably tr.state is the index of the source state, which is orig_st_i here - verify)
for input_with_0B_i in range(tr.state):
in_rec: InputRegInputVal = new_tr.input[input_with_0B_i][0]
in_rec.keep = [0 for _ in range(word_bytes)]
in_rec.last = 1
in_rec.relict = 1
new_tr.input_rd[input_with_0B_i] = 1
new_tr.input_keep_mask[input_with_0B_i][0] = [0 for _ in range(word_bytes)]
tt.state_trans[0].append(new_tr)
if prefix_zero_len_inputs_cnt == input_cnt:
# everything can be 0B frame, we need to add a special transition exactly for that
# because we did not process it by previous code because it looks only on output bytes
# and there are not output bytes
tr = StateTransItem(tt, 0, 0, 1)
tt.state_trans[0].append(tr)
# every input has to provide a word with last=1 and without any valid byte
for in_recs in tr.input:
in_rec: InputRegInputVal = in_recs[0]
in_rec.last = 1
in_rec.keep = [0 for _ in in_rec.keep]
in_rec.relict = 1
for skipped_input in tr.input_keep_mask:
skipped_input[0] = [0 for _ in range(word_bytes)]
tr.last = 1
# all inputs are read in this transition
tr.input_rd = [1 for _ in tr.input_rd]
tt.filter_unique_state_trans()
tt.assert_transitions_deterministic()
return tt