Skip to main content

src/nquic_protocol_streams_lifecycle.erl

-module(nquic_protocol_streams_lifecycle).
-moduledoc """
Stream terminal-state cleanup and stream-limit bookkeeping.

Pure functions over `#conn_state{}` that decide when a stream has
reached a terminal state and reclaim it: per-side terminal predicates
(`is_stream_terminal/3`, `is_send_done/1`, `is_recv_done/1`,
`is_closed_stream/2`), terminal-state reclamation. The recv-side
predicate treats `size_known` with an empty app buffer as terminal:
the peer's FIN has been delivered and there is nothing left for the
caller to drain, so the stream can be reclaimed without waiting for
an explicit `recv` round-trip from the application.
(`maybe_cleanup_stream/2,3`, `cleanup_stream/2`), the watermark
bookkeeping that keeps `closed_peer_streams` bounded
(`record_peer_closed/2`, `advance_peer_wm/3`, `set_peer_wm/3`), and
MAX_STREAMS window-update emission as peer-initiated streams are
consumed and reclaimed (`bump_max_streams/2`,
`maybe_send_max_streams/2`, `maybe_auto_extend_max_streams/2`,
`peer_consumed_bidi_streams/1`, `peer_consumed_uni_streams/1`).

`cleanup_stream/2` purges the reclaimed stream from the send-side
pending/blocked indices via `nquic_protocol_streams_send`; the send
engine calls back here for `maybe_cleanup_stream/3` and
`maybe_auto_extend_max_streams/2`.
""".

-include("nquic_conn.hrl").
-include("nquic_frame.hrl").
-export([
    bump_max_streams/2,
    is_closed_stream/2,
    is_stream_terminal/3,
    maybe_auto_extend_max_streams/2,
    maybe_cleanup_stream/2,
    maybe_cleanup_stream/3,
    peer_consumed_bidi_streams/1,
    peer_consumed_uni_streams/1
]).

-spec advance_peer_wm(bidi | uni, nquic:stream_id(), nquic_protocol:state()) ->
    nquic_protocol:state().
advance_peer_wm(Type, StreamID, State) ->
    SS0 = State#conn_state.streams_state,
    Closed = SS0#conn_streams.closed_peer_streams,
    NextID = StreamID + 4,
    case maps:take(NextID, Closed) of
        {true, Closed1} ->
            NewSS = SS0#conn_streams{closed_peer_streams = Closed1},
            State1 = set_peer_wm(Type, StreamID, State#conn_state{streams_state = NewSS}),
            advance_peer_wm(Type, NextID, State1);
        error ->
            set_peer_wm(Type, StreamID, State)
    end.

-spec bump_max_streams(nquic:stream_id(), nquic_protocol:state()) -> nquic_protocol:state().
bump_max_streams(StreamID, State) ->
    SS0 = State#conn_state.streams_state,
    Type = nquic_stream_manager:type(StreamID),
    case Type of
        bidi ->
            NewLimit = SS0#conn_streams.local_max_streams_bidi + 1,
            NewSS = SS0#conn_streams{local_max_streams_bidi = NewLimit},
            maybe_send_max_streams(bidi, State#conn_state{streams_state = NewSS});
        uni ->
            NewLimit = SS0#conn_streams.local_max_streams_uni + 1,
            NewSS = SS0#conn_streams{local_max_streams_uni = NewLimit},
            maybe_send_max_streams(uni, State#conn_state{streams_state = NewSS})
    end.

-spec cleanup_stream(nquic:stream_id(), nquic_protocol:state()) -> nquic_protocol:state().
cleanup_stream(StreamID, State) ->
    #conn_state{streams_state = SS, role = Role} = State,
    #conn_streams{streams = Streams} = SS,
    State0 = nquic_protocol_streams_send:clear_pending_send(
        StreamID, nquic_protocol_streams_send:clear_blocked(StreamID, State)
    ),
    SS0 = State0#conn_state.streams_state,
    NewSS = SS0#conn_streams{streams = maps:remove(StreamID, Streams)},
    case nquic_frame_handler:is_locally_initiated(StreamID, Role) of
        true ->
            State0#conn_state{streams_state = NewSS};
        false ->
            State1 = State0#conn_state{streams_state = NewSS},
            State2 = record_peer_closed(StreamID, State1),
            bump_max_streams(StreamID, State2)
    end.

-spec is_closed_stream(nquic:stream_id(), nquic_protocol:state()) -> boolean().
is_closed_stream(StreamID, State) ->
    #conn_state{role = Role, streams_state = SS} = State,
    #conn_streams{
        next_bidi_stream = NextBidi,
        next_uni_stream = NextUni,
        closed_peer_bidi_wm = BidiWm,
        closed_peer_uni_wm = UniWm,
        closed_peer_streams = Closed,
        streams = Streams
    } = SS,
    case nquic_frame_handler:is_locally_initiated(StreamID, Role) of
        true ->
            not maps:is_key(StreamID, Streams) andalso
                case nquic_stream_manager:type(StreamID) of
                    bidi -> StreamID < NextBidi;
                    uni -> StreamID < NextUni
                end;
        false ->
            case nquic_stream_manager:type(StreamID) of
                bidi -> StreamID =< BidiWm orelse maps:is_key(StreamID, Closed);
                uni -> StreamID =< UniWm orelse maps:is_key(StreamID, Closed)
            end
    end.

-spec is_recv_done(#stream_state{}) -> boolean().
is_recv_done(#stream_state{recv_state = data_read}) -> true;
is_recv_done(#stream_state{recv_state = reset_recvd}) -> true;
is_recv_done(#stream_state{recv_state = reset_read}) -> true;
is_recv_done(#stream_state{recv_state = size_known, app_buffer_size = 0}) -> true;
is_recv_done(_) -> false.

-spec is_send_done(atom()) -> boolean().
is_send_done(data_sent) -> true;
is_send_done(data_recvd) -> true;
is_send_done(reset_sent) -> true;
is_send_done(reset_recvd) -> true;
is_send_done(_) -> false.

-spec is_stream_terminal(nquic:stream_id(), client | server | undefined, #stream_state{}) ->
    boolean().
is_stream_terminal(StreamID, Role, #stream_state{type = uni, send_state = SS} = Stream) ->
    case nquic_frame_handler:is_locally_initiated(StreamID, Role) of
        true -> is_send_done(SS);
        false -> is_recv_done(Stream)
    end;
is_stream_terminal(_StreamID, _Role, #stream_state{send_state = SS} = Stream) ->
    is_send_done(SS) andalso is_recv_done(Stream).

-spec maybe_auto_extend_max_streams(bidi | uni, nquic_protocol:state()) -> nquic_protocol:state().
maybe_auto_extend_max_streams(bidi, State) ->
    SS0 = State#conn_state.streams_state,
    Consumed = peer_consumed_bidi_streams(State),
    Current = SS0#conn_streams.local_max_streams_bidi,
    case Consumed * 4 >= Current * 3 of
        true ->
            NewLimit = Current * 2,
            NewSS1 = SS0#conn_streams{local_max_streams_bidi = NewLimit},
            State1 = State#conn_state{streams_state = NewSS1},
            Frame = #max_streams{max_streams = NewLimit, is_uni = false},
            {ok, State2} = nquic_protocol_send_queues:queue_app_frame(Frame, State1),
            SS2 = (State2#conn_state.streams_state)#conn_streams{
                last_sent_max_streams_bidi = NewLimit
            },
            State2#conn_state{streams_state = SS2};
        false ->
            State
    end;
maybe_auto_extend_max_streams(uni, State) ->
    SS0 = State#conn_state.streams_state,
    Consumed = peer_consumed_uni_streams(State),
    Current = SS0#conn_streams.local_max_streams_uni,
    case Consumed * 4 >= Current * 3 of
        true ->
            NewLimit = Current * 2,
            NewSS1 = SS0#conn_streams{local_max_streams_uni = NewLimit},
            State1 = State#conn_state{streams_state = NewSS1},
            Frame = #max_streams{max_streams = NewLimit, is_uni = true},
            {ok, State2} = nquic_protocol_send_queues:queue_app_frame(Frame, State1),
            SS2 = (State2#conn_state.streams_state)#conn_streams{
                last_sent_max_streams_uni = NewLimit
            },
            State2#conn_state{streams_state = SS2};
        false ->
            State
    end.

-spec maybe_cleanup_stream(nquic:stream_id(), nquic_protocol:state()) -> nquic_protocol:state().
maybe_cleanup_stream(StreamID, State) ->
    #conn_state{streams_state = #conn_streams{streams = Streams}, role = Role} = State,
    case maps:find(StreamID, Streams) of
        {ok, Stream} ->
            case is_stream_terminal(StreamID, Role, Stream) of
                true -> cleanup_stream(StreamID, State);
                false -> State
            end;
        error ->
            State
    end.

-spec maybe_cleanup_stream(nquic:stream_id(), #stream_state{}, nquic_protocol:state()) ->
    nquic_protocol:state().
maybe_cleanup_stream(StreamID, Stream, State) ->
    case is_stream_terminal(StreamID, State#conn_state.role, Stream) of
        true -> cleanup_stream(StreamID, State);
        false -> State
    end.

-spec maybe_send_max_streams(bidi | uni, nquic_protocol:state()) -> nquic_protocol:state().
maybe_send_max_streams(bidi, State) ->
    SS0 = State#conn_state.streams_state,
    Current = SS0#conn_streams.local_max_streams_bidi,
    LastSent = SS0#conn_streams.last_sent_max_streams_bidi,
    PeerConsumed = peer_consumed_bidi_streams(State),
    Remaining = LastSent - PeerConsumed,
    Threshold = max(1, LastSent div 2),
    case Remaining =< Threshold of
        true ->
            Frame = #max_streams{max_streams = Current, is_uni = false},
            {ok, State1} = nquic_protocol_send_queues:queue_app_frame(Frame, State),
            SS1 = (State1#conn_state.streams_state)#conn_streams{
                last_sent_max_streams_bidi = Current
            },
            State1#conn_state{streams_state = SS1};
        false ->
            State
    end;
maybe_send_max_streams(uni, State) ->
    SS0 = State#conn_state.streams_state,
    Current = SS0#conn_streams.local_max_streams_uni,
    LastSent = SS0#conn_streams.last_sent_max_streams_uni,
    PeerConsumed = peer_consumed_uni_streams(State),
    Remaining = LastSent - PeerConsumed,
    Threshold = max(1, LastSent div 2),
    case Remaining =< Threshold of
        true ->
            Frame = #max_streams{max_streams = Current, is_uni = true},
            {ok, State1} = nquic_protocol_send_queues:queue_app_frame(Frame, State),
            SS1 = (State1#conn_state.streams_state)#conn_streams{
                last_sent_max_streams_uni = Current
            },
            State1#conn_state{streams_state = SS1};
        false ->
            State
    end.

-spec peer_consumed_bidi_streams(nquic_protocol:state()) -> non_neg_integer().
peer_consumed_bidi_streams(
    #conn_state{streams_state = #conn_streams{opened_peer_bidi_count = N}}
) ->
    N.

-spec peer_consumed_uni_streams(nquic_protocol:state()) -> non_neg_integer().
peer_consumed_uni_streams(
    #conn_state{streams_state = #conn_streams{opened_peer_uni_count = N}}
) ->
    N.

-spec record_peer_closed(nquic:stream_id(), nquic_protocol:state()) -> nquic_protocol:state().
record_peer_closed(StreamID, State) ->
    SS0 = State#conn_state.streams_state,
    Type = nquic_stream_manager:type(StreamID),
    Wm =
        case Type of
            bidi -> SS0#conn_streams.closed_peer_bidi_wm;
            uni -> SS0#conn_streams.closed_peer_uni_wm
        end,
    InOrder =
        case Wm of
            -1 ->
                FirstID = nquic_stream_manager:first_peer_stream_id(
                    State#conn_state.role, Type
                ),
                StreamID =:= FirstID;
            _ ->
                StreamID =:= Wm + 4
        end,
    case InOrder of
        true ->
            advance_peer_wm(Type, StreamID, State);
        false ->
            Closed = SS0#conn_streams.closed_peer_streams,
            NewSS = SS0#conn_streams{closed_peer_streams = Closed#{StreamID => true}},
            State#conn_state{streams_state = NewSS}
    end.

-spec set_peer_wm(bidi | uni, nquic:stream_id(), nquic_protocol:state()) -> nquic_protocol:state().
set_peer_wm(bidi, ID, State) ->
    SS0 = State#conn_state.streams_state,
    State#conn_state{streams_state = SS0#conn_streams{closed_peer_bidi_wm = ID}};
set_peer_wm(uni, ID, State) ->
    SS0 = State#conn_state.streams_state,
    State#conn_state{streams_state = SS0#conn_streams{closed_peer_uni_wm = ID}}.