Skip to main content

src/nquic_protocol_streams.erl

-module(nquic_protocol_streams).
-moduledoc """
Inbound STREAM frame processing and RESET_STREAM / STOP_SENDING handling.

Pure functions over `#conn_state{}` covering peer-initiated stream
ingest: STREAM frame processing with connection- and stream-level flow
control and window-update emission (`handle_stream_frame/7`,
`maybe_send_max_data/1`), DATA_BLOCKED / STREAM_DATA_BLOCKED responses
(`respond_to_data_blocked/2`, `respond_to_stream_data_blocked/3`), the
sender-side blocked signal (`signal_blocked/4`), and RESET_STREAM /
STOP_SENDING dispatch with final-size flow accounting
(`handle_reset_stream/5`, `handle_reset_stream_new/2`,
`handle_stop_sending/3`).

Send-side drain/blocked bookkeeping lives in
`nquic_protocol_streams_send`; terminal-state reclamation and
stream-limit window bookkeeping in `nquic_protocol_streams_lifecycle`.
""".

-include("nquic_conn.hrl").
-include("nquic_frame.hrl").
-include("nquic_transport.hrl").
-export([
    handle_reset_stream/5,
    handle_reset_stream_new/2,
    handle_stop_sending/3,
    handle_stream_frame/7,
    maybe_send_max_data/1,
    respond_to_data_blocked/2,
    respond_to_stream_data_blocked/3,
    signal_blocked/4
]).

-spec apply_reset_stream(
    already_done | final_size_err | apply,
    nquic:stream_id(),
    non_neg_integer(),
    non_neg_integer(),
    #stream_state{},
    nquic_protocol:state()
) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}
    | {error, term(), nquic_protocol:state()}.
apply_reset_stream(already_done, _StreamID, _FinalSize, _Code, _Stream, State) ->
    {ok, [], State};
apply_reset_stream(final_size_err, _StreamID, _FinalSize, _Code, _Stream, State) ->
    {error, {transport_error, final_size_error}, State};
apply_reset_stream(apply, StreamID, FinalSize, AppErrorCode, Stream, State) ->
    Increase = max(0, FinalSize - Stream#stream_state.recv_max_offset),
    Flow = State#conn_state.flow,
    NewReceived = Flow#conn_flow.data_received + Increase,
    apply_reset_with_flow_check(
        NewReceived > Flow#conn_flow.local_max_data,
        NewReceived,
        StreamID,
        FinalSize,
        AppErrorCode,
        Stream,
        State
    ).

-spec apply_reset_with_flow_check(
    boolean(),
    non_neg_integer(),
    nquic:stream_id(),
    non_neg_integer(),
    non_neg_integer(),
    #stream_state{},
    nquic_protocol:state()
) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}
    | {error, term(), nquic_protocol:state()}.
apply_reset_with_flow_check(true, _NewReceived, _StreamID, _FinalSize, _Code, _Stream, State) ->
    {error, {transport_error, flow_control_error}, State};
apply_reset_with_flow_check(false, NewReceived, StreamID, FinalSize, AppErrorCode, Stream, State) ->
    NewStream = Stream#stream_state{
        recv_state = reset_recvd,
        recv_offset = FinalSize,
        recv_max_offset = FinalSize,
        recv_buffer = gb_trees:empty(),
        app_buffer = [],
        app_buffer_size = 0
    },
    #conn_state{streams_state = SS, flow = Flow} = State,
    #conn_streams{streams = Streams} = SS,
    State1 = State#conn_state{
        streams_state = SS#conn_streams{streams = Streams#{StreamID => NewStream}},
        flow = Flow#conn_flow{data_received = NewReceived}
    },
    {ok, [{stream_reset, StreamID, AppErrorCode}], State1}.

-spec classify_data_received(
    {ok, nquic_protocol:state(), #stream_state{}} | {error, term()},
    nquic:stream_id(),
    nquic_frame:t(),
    boolean(),
    map(),
    nquic_protocol:state()
) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}
    | {error, term(), nquic_protocol:state()}.
classify_data_received({error, flow_control_error}, _ID, _F, _New, _Streams, State) ->
    {error, {transport_error, flow_control_error}, State};
classify_data_received({ok, State1, StreamState1}, StreamID, Frame, IsNewStream, Streams0, _State) ->
    State2 = maybe_track_peer_stream_id(IsNewStream, StreamID, State1),
    State3 = maybe_send_max_data(State2),
    {StreamState2, State4} = maybe_send_max_stream_data(StreamState1, State3),
    classify_handle_recv(
        nquic_stream_statem:handle_recv(StreamState2, Frame),
        StreamID,
        IsNewStream,
        Streams0,
        State4
    ).

-spec classify_get_or_create(
    {ok, #stream_state{}, map()} | {error, term()},
    nquic:stream_id(),
    non_neg_integer(),
    binary(),
    nquic_frame:t(),
    boolean(),
    nquic_protocol:state()
) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}
    | {error, term(), nquic_protocol:state()}.
classify_get_or_create({error, stream_limit_error}, _ID, _Off, _Data, _F, _New, State) ->
    {error, {transport_error, stream_limit_error}, State};
classify_get_or_create({error, Reason}, _ID, _Off, _Data, _F, _New, State) ->
    {error, {stream_creation_error, Reason}, State};
classify_get_or_create(
    {ok, StreamState0, Streams0}, StreamID, Offset, StreamData, Frame, IsNewStream, State
) ->
    StreamState = nquic_frame_handler:ensure_stream_limits(StreamState0, State),
    Len = byte_size(StreamData),
    classify_data_received(
        nquic_flow:on_stream_data_received(State, StreamState, Offset, Len),
        StreamID,
        Frame,
        IsNewStream,
        Streams0,
        State
    ).

-spec classify_handle_recv(
    {ok, #stream_state{}} | {error, term()},
    nquic:stream_id(),
    boolean(),
    map(),
    nquic_protocol:state()
) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}
    | {error, term(), nquic_protocol:state()}.
classify_handle_recv({error, Reason}, _StreamID, _IsNewStream, _Streams0, State) ->
    {error, {stream_error, Reason}, State};
classify_handle_recv({ok, NewStreamState}, StreamID, IsNewStream, Streams0, State) ->
    NewStreams = Streams0#{StreamID => NewStreamState},
    SS = (State#conn_state.streams_state)#conn_streams{streams = NewStreams},
    State1 = State#conn_state{streams_state = SS},
    Events = stream_events(IsNewStream, StreamID, State1#conn_state.role),
    State2 = nquic_protocol_streams_lifecycle:maybe_cleanup_stream(
        StreamID, NewStreamState, State1
    ),
    {ok, Events, State2}.

-spec classify_reset_state(atom(), non_neg_integer(), non_neg_integer()) ->
    already_done | final_size_err | apply.
classify_reset_state(data_read, _, _) ->
    already_done;
classify_reset_state(reset_read, _, _) ->
    already_done;
classify_reset_state(reset_recvd, _, _) ->
    already_done;
classify_reset_state(size_known, FinalSize, RecvMaxOffset) when FinalSize =/= RecvMaxOffset ->
    final_size_err;
classify_reset_state(_, FinalSize, RecvMaxOffset) when FinalSize < RecvMaxOffset ->
    final_size_err;
classify_reset_state(_, _, _) ->
    apply.

-spec handle_max_data(
    {ok, nquic_protocol:state(), nquic_frame:t()} | false, nquic_protocol:state()
) ->
    nquic_protocol:state().
handle_max_data(false, State) ->
    State;
handle_max_data({ok, NewState, WinFrame}, _State) ->
    {ok, NewState1} = nquic_protocol_send_queues:queue_app_frame(WinFrame, NewState),
    NewState1.

-spec handle_max_stream_data(
    {ok, #stream_state{}, nquic_frame:t()} | false, #stream_state{}, nquic_protocol:state()
) -> {#stream_state{}, nquic_protocol:state()}.
handle_max_stream_data(false, StreamState, State) ->
    {StreamState, State};
handle_max_stream_data({ok, NewStreamState, SWinFrame}, _StreamState, State) ->
    {ok, NewState} = nquic_protocol_send_queues:queue_app_frame(SWinFrame, State),
    {NewStreamState, NewState}.

-spec handle_reset_stream(
    nquic:stream_id(), non_neg_integer(), non_neg_integer(), #stream_state{}, nquic_protocol:state()
) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}
    | {error, term(), nquic_protocol:state()}.
handle_reset_stream(StreamID, FinalSize, AppErrorCode, Stream, State) ->
    #stream_state{recv_state = RecvState, recv_max_offset = RecvMaxOffset} = Stream,
    apply_reset_stream(
        classify_reset_state(RecvState, FinalSize, RecvMaxOffset),
        StreamID,
        FinalSize,
        AppErrorCode,
        Stream,
        State
    ).

-spec handle_reset_stream_new(non_neg_integer(), nquic_protocol:state()) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}
    | {error, term(), nquic_protocol:state()}.
handle_reset_stream_new(FinalSize, State) ->
    Flow0 = State#conn_state.flow,
    NewReceived = Flow0#conn_flow.data_received + FinalSize,
    case NewReceived > Flow0#conn_flow.local_max_data of
        true ->
            {error, {transport_error, flow_control_error}, State};
        false ->
            {ok, [], State#conn_state{flow = Flow0#conn_flow{data_received = NewReceived}}}
    end.

-spec handle_stop_sending(nquic:stream_id(), non_neg_integer(), nquic_protocol:state()) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}.
handle_stop_sending(StreamID, ErrCode, State) ->
    #conn_state{streams_state = SS} = State,
    #conn_streams{streams = Streams} = SS,
    case maps:find(StreamID, Streams) of
        {ok, Stream} ->
            #stream_state{send_state = SendState, send_offset = SendOffset} = Stream,
            case SendState of
                S when S =:= reset_sent; S =:= reset_recvd; S =:= data_recvd ->
                    {ok, [], State};
                _ ->
                    ResetFrame = #reset_stream{
                        stream_id = StreamID,
                        app_error_code = ErrCode,
                        final_size = SendOffset
                    },
                    NewStream = Stream#stream_state{send_state = reset_sent},
                    NewStreams = Streams#{StreamID => NewStream},
                    State0 = nquic_protocol_streams_send:clear_blocked(StreamID, State),
                    SS1 = (State0#conn_state.streams_state)#conn_streams{streams = NewStreams},
                    State1 = State0#conn_state{streams_state = SS1},
                    {ok, State2} = nquic_protocol_send_queues:queue_app_frame(ResetFrame, State1),
                    State3 = nquic_protocol_streams_lifecycle:maybe_cleanup_stream(
                        StreamID, NewStream, State2
                    ),
                    {ok, [{stop_sending, StreamID, ErrCode}], State3}
            end;
        error ->
            {ok, [], State}
    end.

-spec handle_stream_frame(
    nquic:stream_id(),
    non_neg_integer(),
    binary(),
    nquic_frame:t(),
    {ok, #stream_state{}} | error,
    map(),
    nquic_protocol:state()
) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}
    | {error, term(), nquic_protocol:state()}.
handle_stream_frame(StreamID, Offset, StreamData, Frame, Existing, Limits, State) ->
    #conn_state{streams_state = #conn_streams{streams = Streams}, role = Role} = State,
    IsNewStream = Existing =:= error,
    GetOrCreate =
        case Existing of
            {ok, StreamState} ->
                {ok, StreamState, Streams};
            error ->
                nquic_stream_manager:get_or_create(StreamID, Streams, Role, Limits)
        end,
    classify_get_or_create(
        GetOrCreate,
        StreamID,
        Offset,
        StreamData,
        Frame,
        IsNewStream,
        State
    ).

-spec maybe_send_max_data(nquic_protocol:state()) -> nquic_protocol:state().
maybe_send_max_data(State) ->
    Window = State#conn_state.local_params#transport_params.initial_max_data,
    handle_max_data(nquic_flow:maybe_update_conn_window(State, Window), State).

-spec maybe_send_max_stream_data(#stream_state{}, nquic_protocol:state()) ->
    {#stream_state{}, nquic_protocol:state()}.
maybe_send_max_stream_data(StreamState, State) ->
    Window = State#conn_state.local_params#transport_params.initial_max_stream_data_bidi_local,
    handle_max_stream_data(
        nquic_flow:maybe_update_stream_window(StreamState, Window), StreamState, State
    ).

-spec maybe_track_peer_stream_id(boolean(), nquic:stream_id(), nquic_protocol:state()) ->
    nquic_protocol:state().
maybe_track_peer_stream_id(true, StreamID, State) ->
    nquic_protocol_streams_send:track_peer_stream_id(StreamID, State);
maybe_track_peer_stream_id(false, _StreamID, State) ->
    State.

-doc """
Respond to a peer DATA_BLOCKED frame (RFC 9000 §4.1 / §19.12).
Unconditionally advertises a connection limit strictly above where
the peer reported it is blocked: `max(local_max_data, PeerLimit +
initial_max_data)`. This guarantees forward progress whenever a sender
signals it is stuck at the window edge; the receipt/reader ratchets
are proactive and can decline, but a peer that has explicitly said it
is blocked must always be granted headroom. Bounded and auto-tuning:
`PeerLimit` only grows as the peer makes progress, so the grant tracks
one window ahead of the peer's blocked offset rather than unbounded.
""".
-spec respond_to_data_blocked(non_neg_integer(), nquic_protocol:state()) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}.
respond_to_data_blocked(PeerLimit, State) ->
    Flow = State#conn_state.flow,
    Window = State#conn_state.local_params#transport_params.initial_max_data,
    Current = Flow#conn_flow.local_max_data,
    Target = max(Current, PeerLimit + Window),
    State1 =
        case Target > Current of
            true -> State#conn_state{flow = Flow#conn_flow{local_max_data = Target}};
            false -> State
        end,
    {ok, State2} = nquic_protocol_send_queues:queue_app_frame(
        #max_data{max_data = Target}, State1
    ),
    {ok, [], State2}.

-doc """
Respond to a peer STREAM_DATA_BLOCKED frame (RFC 9000 §19.13).
Per-stream analogue of `respond_to_data_blocked/2`: unconditionally
advertise `max(recv_window, PeerLimit + initial_max_stream_data_bidi_local)`
so a peer blocked on a stream limit is always granted headroom.
Unknown streams are ignored.
""".
-spec respond_to_stream_data_blocked(nquic:stream_id(), non_neg_integer(), nquic_protocol:state()) ->
    {ok, [nquic_protocol:event()], nquic_protocol:state()}.
respond_to_stream_data_blocked(StreamID, PeerLimit, State) ->
    SS0 = State#conn_state.streams_state,
    case maps:find(StreamID, SS0#conn_streams.streams) of
        {ok, Stream0} ->
            Window =
                State#conn_state.local_params#transport_params.initial_max_stream_data_bidi_local,
            Current = Stream0#stream_state.recv_window,
            Target = max(Current, PeerLimit + Window),
            Stream1 = Stream0#stream_state{recv_window = Target},
            SS1 = SS0#conn_streams{
                streams = maps:put(StreamID, Stream1, SS0#conn_streams.streams)
            },
            State1 = State#conn_state{streams_state = SS1},
            {ok, State2} = nquic_protocol_send_queues:queue_app_frame(
                #max_stream_data{stream_id = StreamID, max_stream_data = Target},
                State1
            ),
            {ok, [], State2};
        error ->
            {ok, [], State}
    end.

-doc """
Mark a stream flow-control-blocked and signal the peer (RFC 9000 §4.1).
A sender that is blocked by a flow-control limit SHOULD emit
DATA_BLOCKED (connection) or STREAM_DATA_BLOCKED (stream) so the
receiver can extend the limit. Emission is deduplicated per limit
value so a stream parked across many retry turns produces one frame
per limit, not one per turn.
""".
-spec signal_blocked(atom(), non_neg_integer(), nquic:stream_id(), nquic_protocol:state()) ->
    nquic_protocol:state().
signal_blocked(conn_flow_control_blocked, Limit, StreamID, State) ->
    State1 = nquic_protocol_streams_send:mark_blocked(StreamID, State),
    Flow = State1#conn_state.flow,
    case Limit > Flow#conn_flow.last_data_blocked of
        true ->
            {ok, State2} = nquic_protocol_send_queues:queue_app_frame(
                #data_blocked{limit = Limit}, State1
            ),
            Flow2 = State2#conn_state.flow,
            State2#conn_state{flow = Flow2#conn_flow{last_data_blocked = Limit}};
        false ->
            State1
    end;
signal_blocked(stream_flow_control_blocked, Limit, StreamID, State) ->
    State1 = nquic_protocol_streams_send:mark_blocked(StreamID, State),
    SS0 = State1#conn_state.streams_state,
    case maps:find(StreamID, SS0#conn_streams.streams) of
        {ok, Stream} when Limit > Stream#stream_state.last_stream_data_blocked ->
            {ok, State2} = nquic_protocol_send_queues:queue_app_frame(
                #stream_data_blocked{stream_id = StreamID, limit = Limit}, State1
            ),
            Stream1 = Stream#stream_state{last_stream_data_blocked = Limit},
            SS1 = State2#conn_state.streams_state,
            SS2 = SS1#conn_streams{
                streams = maps:put(StreamID, Stream1, SS1#conn_streams.streams)
            },
            State2#conn_state{streams_state = SS2};
        _ ->
            State1
    end.

-spec stream_events(boolean(), nquic:stream_id(), client | server) -> [nquic_protocol:event()].
stream_events(true, StreamID, Role) ->
    case nquic_frame_handler:is_locally_initiated(StreamID, Role) of
        true -> [{stream_data, StreamID}];
        false -> [{stream_opened, StreamID}, {stream_data, StreamID}]
    end;
stream_events(false, StreamID, _Role) ->
    [{stream_data, StreamID}].