-module(nquic_lib).
-moduledoc """
Library-mode API for nquic.
Library mode exposes the QUIC protocol as pure functions over an
opaque `nquic:ctx()` value. The owning process drives the socket,
receives packets, and flushes outgoing data. Compared to
`gen_statem` mode this avoids every `gen_statem` round trip on the
hot path.
Production use requires the caller to understand the protocol
carefully; the compiler will not stop you from calling the library
functions out of order.
## Typical usage
```erlang
{ok, Ctx} = nquic:accept(Listener),
{ok, Ctx1} = nquic_lib:takeover(Ctx),
{ok, Ctx2} = nquic_lib:upgrade_to_connected(Ctx1),
loop(Ctx2).
loop(Ctx) ->
case nquic_lib:recv_direct(Ctx, 5000) of
{ok, Events, Ctx1} -> handle(Events), loop(Ctx1);
{error, _Reason, _} -> ok
end.
```
## Listener drain
A server connection's owner process is sent
`{quic_drain, Listener}` when its listener is torn down via
`nquic:stop_listener/1,2` with `mode => cascade` (the default).
The owner SHOULD handle this message by closing the connection
(`shutdown/1` for a graceful CONNECTION_CLOSE) and terminating.
An owner that ignores it is not leaked indefinitely (the connection
still closes on its idle timeout), but a prompt `cascade` depends on
owners honouring the signal. Client connections own their socket,
have no listener, and never receive it.
""".
-include("nquic_conn.hrl").
-export([
buffer_events/2,
close/1,
close/2,
close_stream/2,
flush/1,
flush_notimers/1,
handle_packet/3,
handle_packet/4,
handle_packet_batch/5,
handle_packet_batch_notimers/5,
handle_packet_notimers/3,
handle_packet_notimers/4,
handshake_timeout/3,
initiate_key_update/1,
is_writable/2,
open_stream/2,
path_stats/1,
recv/2,
recv_and_process/1,
recv_and_process/2,
recv_batch/1,
recv_batch/2,
recv_datagram/1,
recv_direct/1,
recv_direct/2,
recv_pending/1,
reset_stream/3,
schedule_timers/1,
send/3,
send_datagram/2,
send_fin/3,
send_fin_noflush/3,
server_accept_init/1,
shutdown/1,
shutdown/3,
takeover/1,
timeout/2,
upgrade_to_connected/1
]).
-export([ctx_connected/1, ctx_dispatch/1, ctx_peer/1, ctx_socket/1, ctx_state/1, ctx_timers/1]).
%%%-----------------------------------------------------------------------------
%% SEND / RECV
%%%-----------------------------------------------------------------------------
-doc """
Split protocol events into buffered datagrams and everything else.

Every `{datagram_received, Data}` event is moved into the context's
datagram buffer. The buffer is bounded by `nquic_ctx:datagram_buffer_max/1`:
when it is full the oldest datagram is evicted to make room for the new
one, and when there is nothing to evict (for example a zero limit) the
incoming datagram is dropped instead of crashing. All remaining events are
returned in their original order together with the updated context.
""".
-spec buffer_events([nquic_protocol:event()], nquic:ctx()) ->
    {[nquic_protocol:event()], nquic:ctx()}.
buffer_events(Events, Ctx) ->
    buffer_events(Events, Ctx, []).

%% Worker for buffer_events/2; OtherAcc holds the non-datagram events in
%% reverse order.
-spec buffer_events([nquic_protocol:event()], nquic:ctx(), [nquic_protocol:event()]) ->
    {[nquic_protocol:event()], nquic:ctx()}.
buffer_events([], Ctx, OtherAcc) ->
    {lists:reverse(OtherAcc), Ctx};
buffer_events([{datagram_received, Data} | Rest], Ctx, OtherAcc) ->
    {Buf1, Size1} = enqueue_datagram(
        Data,
        nquic_ctx:datagram_buffer(Ctx),
        nquic_ctx:datagram_buffer_size(Ctx),
        nquic_ctx:datagram_buffer_max(Ctx)
    ),
    buffer_events(Rest, nquic_ctx:set_datagram(Ctx, Buf1, Size1), OtherAcc);
buffer_events([Event | Rest], Ctx, OtherAcc) ->
    buffer_events(Rest, Ctx, [Event | OtherAcc]).

%% Add Data to the bounded datagram queue and return the new queue and
%% element count.
%% NOTE(review): assumes the limit is a non-negative integer -- confirm
%% against nquic_ctx.
-spec enqueue_datagram(Data, queue:queue(Data), non_neg_integer(), non_neg_integer()) ->
    {queue:queue(Data), non_neg_integer()}.
enqueue_datagram(Data, Buf, Size, Max) when Size < Max ->
    {queue:in(Data, Buf), Size + 1};
enqueue_datagram(Data, Buf, Size, _Max) ->
    case queue:out(Buf) of
        {{value, _Oldest}, Buf1} ->
            %% Full: replace the oldest entry, so the count is unchanged.
            {queue:in(Data, Buf1), Size};
        {empty, _} ->
            %% At the limit with nothing to evict: drop the new datagram
            %% rather than failing a badmatch on the empty queue.
            {Buf, Size}
    end.
-doc """
Non-blocking read of the data currently buffered for a stream.

On success returns `{ok, Data, IsFin, Ctx}`, where `IsFin` marks the end of
the stream and `Ctx` carries the updated protocol state. Errors from the
protocol layer are returned unchanged.
""".
-spec recv(nquic:ctx(), nquic:stream_id()) ->
    {ok, binary(), boolean(), nquic:ctx()} | {error, term()}.
recv(Ctx, StreamId) ->
    maybe
        {ok, Bytes, Fin, NewState} ?=
            nquic_protocol:read_stream(StreamId, nquic_ctx:state(Ctx)),
        {ok, Bytes, Fin, nquic_ctx:set_state(Ctx, NewState)}
    else
        {error, _} = Failure -> Failure
    end.
-doc """
Pop the oldest buffered DATAGRAM.

Returns `{ok, Data, Ctx}` with the datagram removed from the buffer, or
`{error, empty}` when nothing is buffered. Datagrams only reach the buffer
through `buffer_events/2`, so call that on the events returned by
`handle_packet/3` first.
""".
-spec recv_datagram(nquic:ctx()) -> {ok, binary(), nquic:ctx()} | {error, empty}.
recv_datagram(Ctx) ->
    case queue:out(nquic_ctx:datagram_buffer(Ctx)) of
        {empty, _} ->
            {error, empty};
        {{value, Oldest}, Remaining} ->
            Count = nquic_ctx:datagram_buffer_size(Ctx) - 1,
            {ok, Oldest, nquic_ctx:set_datagram(Ctx, Remaining, Count)}
    end.
-doc """
Send data on a stream (without FIN).

The data is queued on the stream and the context is flushed immediately,
so pending frames are packetised and written to the socket. When the
protocol layer rejects the send, the error is returned, together with the
updated context if the protocol state changed.
""".
-spec send(nquic:ctx(), nquic:stream_id(), iodata()) ->
    {ok, nquic:ctx()} | {error, term(), nquic:ctx()} | {error, term()}.
send(Ctx, StreamId, Data) ->
    maybe
        {ok, Queued} ?=
            nquic_protocol:send_stream(StreamId, Data, nofin, nquic_ctx:state(Ctx)),
        flush_ctx(nquic_ctx:set_state(Ctx, Queued))
    else
        {error, Why, Failed} ->
            {error, Why, nquic_ctx:set_state(Ctx, Failed)};
        {error, _} = Failure ->
            Failure
    end.
-doc "Queue an unreliable DATAGRAM frame and flush it to the socket.".
-spec send_datagram(nquic:ctx(), binary()) -> {ok, nquic:ctx()} | {error, nquic_error:any_reason()}.
send_datagram(Ctx, Data) ->
    maybe
        {ok, Queued} ?= nquic_protocol:send_datagram(Data, nquic_ctx:state(Ctx)),
        flush_ctx(nquic_ctx:set_state(Ctx, Queued))
    else
        {error, _} = Failure -> Failure
    end.
-doc "Send data on a stream, mark the send side finished (FIN), and flush.".
-spec send_fin(nquic:ctx(), nquic:stream_id(), iodata()) ->
    {ok, nquic:ctx()} | {error, term(), nquic:ctx()} | {error, term()}.
send_fin(Ctx, StreamId, Data) ->
    Outcome = nquic_protocol:send_stream(StreamId, Data, fin, nquic_ctx:state(Ctx)),
    case Outcome of
        {error, _} ->
            Outcome;
        {error, Why, Failed} ->
            {error, Why, nquic_ctx:set_state(Ctx, Failed)};
        {ok, Queued} ->
            flush_ctx(nquic_ctx:set_state(Ctx, Queued))
    end.
-doc """
Queue data plus FIN on a stream and return without flushing.

Nothing is written to the socket here. Call this for each stream to batch,
then issue a single `flush/1` to send everything together.
""".
-spec send_fin_noflush(nquic:ctx(), nquic:stream_id(), iodata()) ->
    {ok, nquic:ctx()} | {error, term(), nquic:ctx()} | {error, term()}.
send_fin_noflush(Ctx, StreamId, Data) ->
    maybe
        {ok, Queued} ?=
            nquic_protocol:send_stream(StreamId, Data, fin, nquic_ctx:state(Ctx)),
        {ok, nquic_ctx:set_state(Ctx, Queued)}
    else
        {error, Why, Failed} ->
            {error, Why, nquic_ctx:set_state(Ctx, Failed)};
        {error, _} = Failure ->
            Failure
    end.
%%%-----------------------------------------------------------------------------
%% STREAM LIFECYCLE
%%%-----------------------------------------------------------------------------
-doc "Close a stream and flush the resulting frames to the socket.".
-spec close_stream(nquic:ctx(), nquic:stream_id()) ->
    {ok, nquic:ctx()} | {error, nquic_error:any_reason()}.
close_stream(Ctx, StreamId) ->
    Outcome = nquic_protocol:close_stream(StreamId, nquic_ctx:state(Ctx)),
    case Outcome of
        {error, _} ->
            Outcome;
        {ok, Closed} ->
            flush_ctx(nquic_ctx:set_state(Ctx, Closed))
    end.
-doc """
Open a new stream with the given options.

Returns the new stream id and the updated context. Nothing is flushed to
the socket by this call.
""".
-spec open_stream(nquic:ctx(), nquic:stream_opts()) ->
    {ok, nquic:stream_id(), nquic:ctx()} | {error, term()}.
open_stream(Ctx, Opts) ->
    maybe
        {ok, NewId, Opened} ?= nquic_protocol:open_stream(Opts, nquic_ctx:state(Ctx)),
        {ok, NewId, nquic_ctx:set_state(Ctx, Opened)}
    else
        {error, _} = Failure -> Failure
    end.
-doc "Reset a stream with the given error code and flush the result to the socket.".
-spec reset_stream(nquic:ctx(), nquic:stream_id(), non_neg_integer()) ->
    {ok, nquic:ctx()} | {error, term()}.
reset_stream(Ctx, StreamId, ErrorCode) ->
    maybe
        {ok, Reset} ?=
            nquic_protocol:reset_stream(StreamId, ErrorCode, nquic_ctx:state(Ctx)),
        flush_ctx(nquic_ctx:set_state(Ctx, Reset))
    else
        {error, _} = Failure -> Failure
    end.
%%%-----------------------------------------------------------------------------
%% KEY UPDATE
%%%-----------------------------------------------------------------------------
-doc """
Start a client-side key update (RFC 9001 Section 6).

The 1-RTT traffic secrets are rotated and the key phase is flipped, so the
next 1-RTT packet sent uses the new keys. This call itself emits no frame
and sends no packet: the rotation goes out with the owner's next `flush/1`
(or any send), and the peer's reply confirms it.

Returns `{error, key_update_pending}` while an earlier update started here
is still waiting for the peer's confirmation.
""".
-spec initiate_key_update(nquic:ctx()) ->
    {ok, nquic:ctx()} | {error, key_update_pending}.
initiate_key_update(Ctx) ->
    maybe
        {ok, Rotated} ?= nquic_protocol_key_update:initiate_key_update(nquic_ctx:state(Ctx)),
        {ok, nquic_ctx:set_state(Ctx, Rotated)}
    else
        {error, key_update_pending} = Pending -> Pending
    end.
%%%-----------------------------------------------------------------------------
%% PACKET PROCESSING
%%%-----------------------------------------------------------------------------
-doc """
Turn pending frames into packets and write them to the socket.

Queued frames are encrypted into packets, the packets are sent on the
context's socket, and the timer updates produced by the flush are
scheduled. Use `flush_notimers/1` to skip the timer step.
""".
-spec flush(nquic:ctx()) -> {ok, nquic:ctx()}.
flush(Ctx) ->
    %% Public entry point for the shared internal flush routine.
    flush_ctx(Ctx).
-doc """
Flush pending frames but leave timers untouched.

Intended for callers that go straight into a recv function which handles
timers itself; skipping the scheduling here avoids redundant
`cancel_timer` / `send_after` syscalls.
""".
-spec flush_notimers(nquic:ctx()) -> {ok, nquic:ctx()}.
flush_notimers(Ctx) ->
    case nquic_protocol:flush(nquic_ctx:state(Ctx)) of
        {ok, State1} ->
            %% The protocol produced no packets; only the state is updated.
            {ok, nquic_ctx:set_state(Ctx, State1)};
        {ok, Packets, State1, _TimerActions} ->
            Sock = nquic_ctx:socket(Ctx),
            State2 = nquic_lib_timer:maybe_apply_ecn_transition(Sock, State1),
            nquic_conn_metrics:bytes_out(State2, iolist_size(Packets)),
            Peer = State2#conn_state.peer,
            Connected = nquic_ctx:connected(Ctx),
            GsoSize = State2#conn_state.gso_size,
            nquic_lib_socket:send_packets(Sock, Peer, Connected, GsoSize, Packets),
            %% Timer actions are deliberately discarded in this variant.
            {ok, nquic_ctx:set_state(Ctx, State2)}
    end.
-doc """
Feed one received packet through the protocol.

The packet is decrypted, decoded, and every frame in it is handled.
Returns the resulting protocol events and a context whose timers have been
scheduled. Equivalent to `handle_packet/4` with the ECN codepoint set to
`not_ect`.
""".
-spec handle_packet(nquic:ctx(), nquic_socket:sockaddr(), binary()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
handle_packet(Ctx, Source, Bin) ->
    handle_packet(Ctx, Source, Bin, not_ect).

-doc "Same as `handle_packet/3`, additionally passing the inbound ECN codepoint to the protocol.".
-spec handle_packet(
    nquic:ctx(), nquic_socket:sockaddr(), binary(), nquic_socket:ecn_mark()
) -> {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
handle_packet(Ctx, Source, Bin, ECN) ->
    State0 = nquic_ctx:state(Ctx),
    %% Record inbound bytes before the packet is processed.
    nquic_conn_metrics:bytes_in(State0, byte_size(Bin)),
    case nquic_protocol:handle_packet(Bin, Source, State0, ECN) of
        {error, Why, Failed} ->
            {error, Why, nquic_ctx:set_state(Ctx, Failed)};
        {ok, Events, State1, TimerActions} ->
            Scheduled = nquic_lib_timer:apply_timer_actions(
                nquic_ctx:set_state(Ctx, State1), TimerActions
            ),
            {Remaining, Ctx2} = nquic_lib_timer:absorb_migration_events(Events, Scheduled),
            {ok, Remaining, Ctx2}
    end.
-doc """
Feed one received packet through the protocol without scheduling timers.

Meant for batch processing: call this once per packet, then call
`schedule_timers/1` a single time at the end, before flushing.
""".
-spec handle_packet_notimers(nquic:ctx(), nquic_socket:sockaddr(), binary()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
handle_packet_notimers(Ctx, Source, Bin) ->
    handle_packet_notimers(Ctx, Source, Bin, not_ect).

-doc "Same as `handle_packet_notimers/3`, additionally passing an ECN mark to the protocol.".
-spec handle_packet_notimers(
    nquic:ctx(), nquic_socket:sockaddr(), binary(), nquic_socket:ecn_mark()
) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
handle_packet_notimers(Ctx, Source, Bin, ECN) ->
    State0 = nquic_ctx:state(Ctx),
    nquic_conn_metrics:bytes_in(State0, byte_size(Bin)),
    case nquic_protocol:handle_packet_notimers(Bin, Source, State0, ECN) of
        {error, Why, Failed} ->
            {error, Why, nquic_ctx:set_state(Ctx, Failed)};
        {ok, Events, State1} ->
            Updated = nquic_ctx:set_state(Ctx, State1),
            {Remaining, Ctx1} = nquic_lib_timer:absorb_migration_events(Events, Updated),
            {ok, Remaining, Ctx1}
    end.
-doc """
Compute the timer actions for the current protocol state and apply them.

Use after a run of `handle_packet_notimers/3` calls and before `flush/1`.
Loss detection, idle, and ACK delay timers are computed and scheduled as
`erlang:send_after` messages.
""".
-spec schedule_timers(nquic:ctx()) -> nquic:ctx().
schedule_timers(Ctx) ->
    State0 = nquic_ctx:state(Ctx),
    {Actions, State1} = nquic_protocol_timer:compute_timer_actions(State0),
    Updated = nquic_ctx:set_state(Ctx, State1),
    nquic_lib_timer:apply_timer_actions(Updated, Actions).
-doc """
React to an expired QUIC timer.

Call this when the owner receives a `{quic_timeout, Type}` message. The
timeout (loss detection, idle, path validation) is processed, any packets
it produced are flushed, the returned timer actions are applied, and the
protocol events are handed back to the caller.
""".
-spec timeout(nquic:ctx(), nquic_protocol:timer_type()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
timeout(Ctx, Type) ->
    case nquic_protocol:handle_timeout(Type, nquic_ctx:state(Ctx)) of
        {error, Why, Failed} ->
            {error, Why, nquic_ctx:set_state(Ctx, Failed)};
        {ok, Events, State1, TimerActions} ->
            %% Flush first, then apply the timer actions from the timeout.
            {ok, Flushed} = flush_ctx(nquic_ctx:set_state(Ctx, State1)),
            {ok, Events, nquic_lib_timer:apply_timer_actions(Flushed, TimerActions)}
    end.
%%%-----------------------------------------------------------------------------
%% SERVER HANDSHAKE (OWNER-FROM-FIRST-PACKET)
%%%-----------------------------------------------------------------------------
-doc """
React to a `{quic_timeout, Type}` expiration while the server handshake is
still in progress.

`Phase` names the encryption level the owner is currently driving: `initial`
until it sees `{state_transition, handshake}`, and `handshake` afterwards.
A PTO sends its probe at that level. The established-connection `timeout/2`
probes at 1-RTT, which is wrong before the connection is established, so
only switch to `timeout/2` after `connected` has been observed.
""".
-spec handshake_timeout(nquic:ctx(), initial | handshake, nquic_protocol:timer_type()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
handshake_timeout(Ctx, Phase, Type) ->
    case nquic_protocol:handle_handshake_timeout(Phase, Type, nquic_ctx:state(Ctx)) of
        {error, Why, Failed} ->
            {error, Why, nquic_ctx:set_state(Ctx, Failed)};
        {ok, Events, State1, TimerActions} ->
            %% Flush first, then apply the timer actions from the timeout.
            {ok, Flushed} = flush_ctx(nquic_ctx:set_state(Ctx, State1)),
            {ok, Events, nquic_lib_timer:apply_timer_actions(Flushed, TimerActions)}
    end.
-doc """
Create a server-side `t:nquic:ctx/0` for an owner that runs the handshake
itself, starting from the options of the first Initial packet.

`Opts` is the option map the receiver assembles for a new connection (role,
socket, peer, dcid/odcid, version, dispatch_table, listener, certs, alpn,
static_key, transport params, ...). The function builds the same
`#conn_state{}` as `nquic_conn_statem:init/1` (through
`nquic_conn_init:new_conn_state/1`), registers `SCID -> self()` in the
dispatch table so the connection's own CIDs route to this owner, and returns
a context in the `initial` phase.

From there the owner drives `initial -> handshake -> established` itself: it
calls `handle_packet/3,4` + `flush/1` for inbound `{packet, _}` messages and
`handshake_timeout/3` for `{quic_timeout, _}`, until `handle_packet` emits
`connected`. No export, accept queue, or takeover is involved: the owner is
the registrant from the first packet, so the connection's CID never resolves
to a non-owner.
""".
-spec server_accept_init(map()) -> {ok, nquic:ctx()}.
server_accept_init(Opts) ->
    %% The role is forced to `server`, overriding any value in Opts.
    State = nquic_conn_init:new_conn_state(Opts#{role => server}),
    Dispatch = State#conn_state.dispatch_table,
    case Dispatch =:= undefined of
        true ->
            %% No dispatch table, so there is nothing to register.
            ok;
        false ->
            nquic_listener:dispatch_register(Dispatch, State#conn_state.scid, self())
    end,
    nquic_conn_metrics:handshake_started(State),
    {ok, nquic_ctx:new(State, State#conn_state.socket, State#conn_state.peer, Dispatch)}.
%%%-----------------------------------------------------------------------------
%% HIGH-LEVEL RECV / RECV_AND_PROCESS
%%%-----------------------------------------------------------------------------
%% True when the context has no dispatch table, otherwise whatever
%% `nquic_ctx:connected/1` reports. recv_batch/2 uses this to choose between
%% the connected and dispatched batch receive paths.
-spec ctx_owns_socket(nquic:ctx()) -> boolean().
ctx_owns_socket(Ctx) ->
    nquic_ctx:dispatch(Ctx) =:= undefined orelse nquic_ctx:connected(Ctx).
-doc """
Process a GRO-coalesced datagram that arrived as one
`{packet_batch, Source, Buf, GsoSize, ECN}` message.

The buffer is split into `GsoSize`-byte segments, each segment is run
through `handle_packet/4`, and the events of all segments are returned
together. Timers are scheduled once, after the last segment.
""".
-spec handle_packet_batch(
    nquic:ctx(),
    nquic_socket:sockaddr(),
    binary(),
    pos_integer(),
    nquic_socket:ecn_mark()
) -> {ok, [nquic_protocol:event()], nquic:ctx()}.
handle_packet_batch(Ctx, Source, Buf, GsoSize, ECN) ->
    {Drained, RevEvents} =
        nquic_lib_batch:drain_packet_batch(Ctx, Source, Buf, GsoSize, ECN, []),
    Events = lists:reverse(RevEvents),
    {ok, Events, schedule_timers(Drained)}.
-doc """
`handle_packet_batch/5` without the timer scheduling step.

The batch counterpart of `handle_packet_notimers/3`, for callers that drain
many datagrams per wakeup and call `schedule_timers/1` once at the end
before flushing. Decode errors on individual segments are dropped instead
of being fatal, mirroring `drain_packet_batch/6`: an undecryptable packet
inside a GRO-coalesced buffer must not tear down the connection
(RFC 9000 §12.2).
""".
-spec handle_packet_batch_notimers(
    nquic:ctx(),
    nquic_socket:sockaddr(),
    binary(),
    pos_integer(),
    nquic_socket:ecn_mark()
) -> {ok, [nquic_protocol:event()], nquic:ctx()}.
handle_packet_batch_notimers(Ctx, Source, Buf, GsoSize, ECN) ->
    {Drained, RevEvents} =
        nquic_lib_batch:drain_packet_batch(Ctx, Source, Buf, GsoSize, ECN, []),
    {ok, lists:reverse(RevEvents), Drained}.
-doc """
Block until the next packet or timer message arrives, then process it.

Waits for a `{packet, Source, Bin}` message or a `{quic_timeout, Type}`
timer expiration. Same as `recv_and_process(Ctx, infinity)`.
""".
-spec recv_and_process(nquic:ctx()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_and_process(Ctx) ->
    recv_and_process(Ctx, infinity).

-doc """
Process the next packet or timer message, waiting at most `Timeout` ms.

If nothing arrives in time the result is `{ok, [], Ctx}` with the context
unchanged.
""".
-spec recv_and_process(nquic:ctx(), timeout()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_and_process(Ctx, Timeout) ->
    receive
        {packet, From, Packet} ->
            handle_packet(Ctx, From, Packet);
        {packet, From, Packet, Mark} ->
            handle_packet(Ctx, From, Packet, Mark);
        {packet_batch, From, Batch, SegmentSize, Mark} ->
            handle_packet_batch(Ctx, From, Batch, SegmentSize, Mark);
        {quic_timeout, Timer} ->
            timeout(Ctx, Timer)
    after Timeout ->
        {ok, [], Ctx}
    end.
-doc """
Drain and process every available packet, then schedule timers and flush
a single time.

Contexts upgraded through `upgrade_to_connected/1` drain both the mailbox
and the kernel queue (using non-blocking `socket:recvfrom/3`). Contexts in
dispatched mode share the listener socket and drain only the mailbox,
because polling the shared socket would steal packets routed to sibling
connections. If nothing is available, blocks up to `Timeout` ms for the
first packet.

This is the batch equivalent of `recv_direct/1` / `recv_and_process/1`; it
amortises timer scheduling and flushing over many packets.
""".
-spec recv_batch(nquic:ctx()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_batch(Ctx) ->
    recv_batch(Ctx, infinity).

-doc "Same as `recv_batch/1`, with a timeout on the wait for the first packet.".
-spec recv_batch(nquic:ctx(), timeout()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_batch(Ctx, Timeout) ->
    OwnsSocket = ctx_owns_socket(Ctx),
    if
        OwnsSocket -> nquic_lib_batch:recv_batch_connected(Ctx, Timeout);
        not OwnsSocket -> nquic_lib_batch:recv_batch_dispatched(Ctx, Timeout)
    end.
-doc """
Receive and process the next packet straight from the socket.

For connections that went through `upgrade_to_connected/1`: packets are
read with `socket:recvfrom/3` rather than delivered as `{packet,...}`
messages. Timer expirations are serviced as well. Same as
`recv_direct(Ctx, infinity)`.
""".
-spec recv_direct(nquic:ctx()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_direct(Ctx) ->
    recv_direct(Ctx, infinity).

-doc """
Same as `recv_direct/1`, with a timeout.

`Timeout` bounds the wait for the socket to become readable; when it
expires with nothing to process the result is `{ok, [], Ctx}`.
""".
-spec recv_direct(nquic:ctx(), timeout()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_direct(Ctx, Timeout) ->
    Socket = nquic_ctx:socket(Ctx),
    nquic_lib_socket:drain_stale_socket_msgs(Socket),
    %% Serve anything already sitting in the mailbox before polling the socket.
    receive
        {packet, From, Packet} ->
            handle_packet(Ctx, From, Packet);
        {packet, From, Packet, Mark} ->
            handle_packet(Ctx, From, Packet, Mark);
        {packet_batch, From, Batch, SegmentSize, Mark} ->
            handle_packet_batch(Ctx, From, Batch, SegmentSize, Mark);
        {quic_timeout, Timer} ->
            timeout(Ctx, Timer)
    after 0 ->
        recv_direct_poll_socket(Ctx, Socket, Timeout)
    end.

%% Try to read from the socket once; if it is not readable yet, wait for the
%% select notification (or a mailbox message) for up to Timeout.
-spec recv_direct_poll_socket(nquic:ctx(), nquic_socket:t(), timeout()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_direct_poll_socket(Ctx, Socket, Timeout) ->
    case nquic_lib_socket:socket_recv_for_ctx(Socket) of
        {select, SelectInfo} ->
            recv_direct_await_select(Ctx, Socket, SelectInfo, Timeout);
        {error, Why} ->
            {error, Why, Ctx};
        {ok, From, Packet, undefined} ->
            handle_packet(Ctx, From, Packet);
        {ok, From, Batch, SegmentSize} ->
            handle_packet_batch(Ctx, From, Batch, SegmentSize, not_ect)
    end.

%% Wait for the pending select to fire. Any other serviced message, and the
%% timeout, cancels the outstanding select before being handled.
-spec recv_direct_await_select(nquic:ctx(), nquic_socket:t(), term(), timeout()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_direct_await_select(Ctx, Socket, SelectInfo, Timeout) ->
    receive
        {'$socket', Socket, select, _Handle} ->
            recv_direct_ready(Ctx, Socket);
        {packet, From, Packet} ->
            _ = nquic_socket:recv_cancel(Socket, SelectInfo),
            handle_packet(Ctx, From, Packet);
        {packet, From, Packet, Mark} ->
            _ = nquic_socket:recv_cancel(Socket, SelectInfo),
            handle_packet(Ctx, From, Packet, Mark);
        {packet_batch, From, Batch, SegmentSize, Mark} ->
            _ = nquic_socket:recv_cancel(Socket, SelectInfo),
            handle_packet_batch(Ctx, From, Batch, SegmentSize, Mark);
        {quic_timeout, Timer} ->
            _ = nquic_socket:recv_cancel(Socket, SelectInfo),
            timeout(Ctx, Timer)
    after Timeout ->
        _ = nquic_socket:recv_cancel(Socket, SelectInfo),
        {ok, [], Ctx}
    end.
-doc """
Switch the context over to a connected UDP socket.

A new UDP socket is opened on the listener's port and `connect(2)`ed to the
peer. From then on the kernel delivers this peer's packets directly to the
new socket and the receiver dispatch is bypassed completely. After
upgrading, use `recv_direct/1,2` in place of `recv_and_process/1,2`.

> #### Warning: handshake race under burst load {: .warning}
>
> The connected socket has to bind to the listener's port with
> `SO_REUSEPORT` (on Linux, if any socket sharing a port has it, all of
> them must). Every upgraded connection therefore adds a member to the
> reuseport group. A new client's Initial has no existing 4-tuple match,
> so Linux hashes it across the group, and it may land on a connected
> socket whose peer does not match; `compute_score()` then returns -1 and
> the kernel drops the packet instead of retrying another slot. The new
> client stalls until `wait_established` times out.
>
> With sequential-but-bursty connects the observed loss rate is ~1% per
> new handshake once many connections are held open. The proper fix is an
> `SO_ATTACH_REUSEPORT_CBPF` filter that pins fallback hashes to the
> listener slot (exact 4-tuple matches bypass it and keep the fast path).
> OTP 28's `socket` module does not expose this option yet, and
> `setopt_native` cannot marshal the required `struct sock_fprog` pointer,
> so the fix is pending.
>
> Callers accepting many concurrent connections should avoid this helper
> until the library-level fix lands, or tolerate the rare timeout at the
> accept layer.
""".
-spec upgrade_to_connected(nquic:ctx()) ->
    {ok, nquic:ctx()} | {error, term()}.
upgrade_to_connected(Ctx) ->
    ListenerSocket = nquic_ctx:socket(Ctx),
    Remote = nquic_ctx:peer(Ctx),
    case nquic_socket:port(ListenerSocket) of
        {ok, Port} ->
            case nquic_socket:open_connected(Port, Remote) of
                {ok, Connected} ->
                    {ok, nquic_ctx:set_socket_connected(Ctx, Connected, true)};
                {error, _} = OpenFailure ->
                    OpenFailure
            end;
        {error, _} = PortFailure ->
            PortFailure
    end.
%%%-----------------------------------------------------------------------------
%% CLOSE / SHUTDOWN
%%%-----------------------------------------------------------------------------
-doc "Gracefully close the connection using the defaults: transport scope, error code 0, empty reason.".
-spec close(nquic:ctx()) -> {ok, nquic:ctx()}.
close(Ctx) ->
    close(Ctx, #{}).

-doc """
Close the connection as described by `Opts`, then flush.

Recognised keys are `scope` (default `transport`), `error_code` (default
`0`) and `reason` (default `<<>>`). With `scope => transport` a
CONNECTION_CLOSE of type 0x1c is emitted; with `scope => application` the
type is 0x1d (RFC 9000 §19.19).
""".
-spec close(nquic:ctx(), nquic:close_opts()) -> {ok, nquic:ctx()}.
close(Ctx, Opts) ->
    CloseFun =
        case maps:get(scope, Opts, transport) of
            transport -> fun nquic_protocol:close/3;
            application -> fun nquic_protocol:close_app/3
        end,
    Code = maps:get(error_code, Opts, 0),
    Phrase = maps:get(reason, Opts, <<>>),
    {ok, Closing} = CloseFun(Code, Phrase, nquic_ctx:state(Ctx)),
    flush_ctx(nquic_ctx:set_state(Ctx, Closing)).
-doc """
Shut down a library-mode connection.

Sends CONNECTION_CLOSE with error code 0, flushes pending packets, cancels
all timers (including any armed while closing), and explicitly closes the
connected UDP socket (if any). Idempotent. Always returns `ok`.

The caller is expected to clean up dispatch table entries and exit the
process itself after this returns.
""".
-spec shutdown(nquic:ctx()) -> ok.
shutdown(Ctx) ->
    shutdown_impl(fun close/1, Ctx).

-doc "Like `shutdown/1` but sends an application error code.".
-spec shutdown(nquic:ctx(), non_neg_integer(), binary()) -> ok.
shutdown(Ctx, ErrorCode, Reason) ->
    CloseOpts = #{scope => application, error_code => ErrorCode, reason => Reason},
    CloseFn = fun(C) -> close(C, CloseOpts) end,
    shutdown_impl(CloseFn, Ctx).

%% Shared shutdown sequence: best-effort close + flush, cancel timers, close
%% the socket when the context uses a connected one.
-spec shutdown_impl(fun((nquic:ctx()) -> {ok, nquic:ctx()}), nquic:ctx()) -> ok.
shutdown_impl(CloseFn, Ctx) ->
    FinalCtx = shutdown_close_and_flush(CloseFn, Ctx),
    %% Closing and flushing can arm timers that exist only in the returned
    %% context, so cancel the references from both the original and the
    %% final context. Cancelling an already-cancelled reference is harmless.
    TimerRefs = lists:usort(
        maps:values(nquic_ctx:timers(Ctx)) ++ maps:values(nquic_ctx:timers(FinalCtx))
    ),
    ok = lists:foreach(
        fun(Ref) ->
            _ = erlang:cancel_timer(Ref, [{async, true}, {info, false}])
        end,
        TimerRefs
    ),
    _ =
        case nquic_ctx:connected(Ctx) of
            true -> socket:close(nquic_ctx:socket(Ctx));
            false -> ok
        end,
    ok.

%% Run the close function and a final flush, both best-effort: an `error`
%% exception in either step is swallowed on purpose so shutdown always
%% completes. Returns the most recent context that was obtained.
-spec shutdown_close_and_flush(fun((nquic:ctx()) -> {ok, nquic:ctx()}), nquic:ctx()) ->
    nquic:ctx().
shutdown_close_and_flush(CloseFn, Ctx) ->
    try CloseFn(Ctx) of
        {ok, Ctx1} ->
            try flush(Ctx1) of
                {ok, Ctx2} -> Ctx2
            catch
                error:_ -> Ctx1
            end
    catch
        error:_ -> Ctx
    end.
%%%-----------------------------------------------------------------------------
%% CID TAKEOVER / PENDING DRAIN
%%%-----------------------------------------------------------------------------
-doc """
Process every buffered `{packet, Source, Bin}` and
`{packet, Source, Bin, ECN}` message in the current process mailbox.

Call after `takeover/1` and/or `upgrade_to_connected/1` to handle packets
that arrived during the ownership transition window. Never blocks: it
returns as soon as no matching message is left. Returns the accumulated
events in processing order, or `{error, Reason, Ctx}` on transport error
(events gathered before the error are discarded).
""".
-spec recv_pending(nquic:ctx()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_pending(Ctx) ->
    recv_pending_loop(Ctx, []).

%% Drain loop. RevEvents holds the events collected so far in reverse order,
%% so each step prepends instead of appending to an ever-growing list.
-spec recv_pending_loop(nquic:ctx(), [nquic_protocol:event()]) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_pending_loop(Ctx, RevEvents) ->
    receive
        {packet, Source, Bin} ->
            recv_pending_step(handle_packet(Ctx, Source, Bin), RevEvents);
        {packet, Source, Bin, ECN} ->
            %% Pass the ECN mark on; handle_packet/3 would force `not_ect`.
            recv_pending_step(handle_packet(Ctx, Source, Bin, ECN), RevEvents)
    after 0 ->
        {ok, lists:reverse(RevEvents), Ctx}
    end.

%% Continue draining after a successful packet, or stop on the first error.
-spec recv_pending_step(
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()},
    [nquic_protocol:event()]
) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_pending_step({ok, Events, Ctx1}, RevEvents) ->
    recv_pending_loop(Ctx1, lists:reverse(Events, RevEvents));
recv_pending_step({error, _Reason, _Ctx1} = Err, _RevEvents) ->
    Err.
-doc """
Make the calling process the owner of a library-mode context.

When the context has a dispatch table, every local connection ID (and the
original destination CID, if one is set) is re-registered to point at
`self()`. The timer cache is then reset and timers are scheduled again.
Use this when a `nquic:ctx()` is handed over from another process, for
example from an accept loop to a per-connection handler.

Follow up with `recv_pending/1` to process packets buffered during the
transition.
""".
-spec takeover(nquic:ctx()) -> {ok, nquic:ctx()}.
takeover(Ctx) ->
    State = nquic_ctx:state(Ctx),
    ok = takeover_register(nquic_ctx:dispatch(Ctx), State),
    Reset = nquic_ctx:set_state(Ctx, nquic_protocol:reset_timer_cache(State)),
    {ok, schedule_timers(Reset)}.

%% Point every CID of the connection at the calling process. Does nothing
%% when the context has no dispatch table.
-spec takeover_register(nquic_dispatch:t() | undefined, nquic_protocol:state()) -> ok.
takeover_register(undefined, _State) ->
    ok;
takeover_register(Dispatch, State) ->
    Owner = self(),
    _ = [
        nquic_dispatch:register(Dispatch, CID, Owner)
     || CID <- nquic_protocol:local_cids(State)
    ],
    takeover_register_odcid(Dispatch, nquic_protocol:odcid(State), Owner).

%% Register the original destination CID unless it is unset or empty.
-spec takeover_register_odcid(nquic_dispatch:t(), term(), pid()) -> ok.
takeover_register_odcid(_Dispatch, undefined, _Owner) ->
    ok;
takeover_register_odcid(_Dispatch, <<>>, _Owner) ->
    ok;
takeover_register_odcid(Dispatch, ODCID, Owner) ->
    _ = nquic_dispatch:register(Dispatch, ODCID, Owner),
    ok.
%%%-----------------------------------------------------------------------------
%% CONTEXT ACCESSORS
%%%-----------------------------------------------------------------------------
%% Thin exported wrappers around the corresponding `nquic_ctx` accessors.

%% Connected-socket flag of the context.
-doc false.
-spec ctx_connected(nquic:ctx()) -> boolean().
ctx_connected(Context) ->
    nquic_ctx:connected(Context).

%% Dispatch table of the context, or `undefined`.
-doc false.
-spec ctx_dispatch(nquic:ctx()) -> nquic_dispatch:t() | undefined.
ctx_dispatch(Context) ->
    nquic_ctx:dispatch(Context).

%% Peer address of the context.
-doc false.
-spec ctx_peer(nquic:ctx()) -> nquic_socket:sockaddr().
ctx_peer(Context) ->
    nquic_ctx:peer(Context).

%% Socket held by the context.
-doc false.
-spec ctx_socket(nquic:ctx()) -> nquic_socket:t().
ctx_socket(Context) ->
    nquic_ctx:socket(Context).

%% Protocol state held by the context.
-doc false.
-spec ctx_state(nquic:ctx()) -> nquic_protocol:state().
ctx_state(Context) ->
    nquic_ctx:state(Context).

%% Timer references of the context, keyed by timer type.
-doc false.
-spec ctx_timers(nquic:ctx()) -> #{nquic_protocol:timer_type() => reference()}.
ctx_timers(Context) ->
    nquic_ctx:timers(Context).
-doc """
Snapshot check of whether a stream can currently accept data.

A pure projection on the conn_state stored in the `nquic:ctx()`. The result
is `true` when the stream exists, its send side is not terminal, and one
byte fits under the current connection-flow, stream-flow, and congestion
limits. Otherwise, including for unknown streams, it is `false`.

The answer is a point-in-time observation and can be stale by the time the
caller acts on it. Use it as a probe between recv turns, not in a poll
loop.
""".
-spec is_writable(nquic:ctx(), nquic:stream_id()) -> boolean().
is_writable(Ctx, StreamId) ->
    State = nquic_ctx:state(Ctx),
    nquic_protocol_streams_send:is_writable(StreamId, State).
-doc """
Return path-level statistics for a library-mode context.

A pure projection on the conn_state stored in the `nquic:ctx()`: no message
hops and no syscalls. The fields are listed under `nquic_conn:path_stats/1`.
""".
-spec path_stats(nquic:ctx()) -> nquic_loss:path_stats().
path_stats(Ctx) ->
    State = nquic_ctx:state(Ctx),
    nquic_protocol:path_stats(State).
%%%-----------------------------------------------------------------------------
%% INTERNAL
%%%-----------------------------------------------------------------------------
%% Shared flush routine behind flush/1 and the send/close helpers: asks the
%% protocol for pending packets, sends them on the context's socket, and
%% applies the timer actions returned by the flush.
-spec flush_ctx(nquic:ctx()) -> {ok, nquic:ctx()}.
flush_ctx(Ctx) ->
    case nquic_protocol:flush(nquic_ctx:state(Ctx)) of
        {ok, State1} ->
            %% The protocol produced no packets; only the state is updated.
            {ok, nquic_ctx:set_state(Ctx, State1)};
        {ok, Packets, State1, TimerActions} ->
            Sock = nquic_ctx:socket(Ctx),
            State2 = nquic_lib_timer:maybe_apply_ecn_transition(Sock, State1),
            nquic_conn_metrics:bytes_out(State2, iolist_size(Packets)),
            Peer = State2#conn_state.peer,
            Connected = nquic_ctx:connected(Ctx),
            GsoSize = State2#conn_state.gso_size,
            nquic_lib_socket:send_packets(Sock, Peer, Connected, GsoSize, Packets),
            Updated = nquic_ctx:set_state(Ctx, State2),
            {ok, nquic_lib_timer:apply_timer_actions(Updated, TimerActions)}
    end.
%% Called once the socket has signalled readiness: read from it and process
%% what arrived. If the read reports `select` again, returns `{ok, [], Ctx}`
%% with no events.
-spec recv_direct_ready(nquic:ctx(), nquic_socket:t()) ->
    {ok, [nquic_protocol:event()], nquic:ctx()} | {error, term(), nquic:ctx()}.
recv_direct_ready(Ctx, Socket) ->
    case nquic_lib_socket:socket_recv_for_ctx(Socket) of
        {select, _} ->
            {ok, [], Ctx};
        {error, Why} ->
            {error, Why, Ctx};
        {ok, From, Packet, undefined} ->
            handle_packet(Ctx, From, Packet);
        {ok, From, Batch, SegmentSize} ->
            handle_packet_batch(Ctx, From, Batch, SegmentSize, not_ect)
    end.