Skip to main content

src/nquic_metrics.erl

-module(nquic_metrics).
-moduledoc """
Per-listener observability primitives.

Owns one `atomics` ref for listener-wide counters (packets, conns,
handshakes, accept queue depth, ...) and one ETS table that holds a row
per tracked connection. Both are allocated by `new/0` when the listener's
dispatch table is built and destroyed on listener shutdown.

Counter increments are inline `atomics:add/3` calls (~10ns) so the hot
receive path stays cheap. The per-conn rows track identity, peer, and
lifecycle state for internal lookup; they are written on handshake
completion, state change, and connection termination.

Per-connection lifetime byte counters live in a smaller atomics ref
attached to the row, so the conn-statem can bump `bytes_in` /
`bytes_out` without rewriting the row.
""".

-export([
    add/3,
    destroy/1,
    get/2,
    inc/2,
    new/0,
    rcvbuf_errs/1,
    snapshot/1
]).

-export([
    delete_row/2,
    info_table/1,
    insert_row/2,
    list_rows/1,
    lookup_row/2,
    new_row/4,
    row_counters/1,
    row_dcid/1,
    row_pid/1,
    row_state/1,
    update_state/3
]).

-export([
    inc_bytes_in/2,
    inc_bytes_out/2,
    new_conn_counters/0,
    read_conn_counters/1,
    touch_last_packet/2
]).

-export_type([conn_counters/0, conn_row/0, t/0, slot/0]).

-type slot() ::
    packets_in
    | packets_dropped_mailbox
    | packets_dropped_ratelimit
    | conns_established
    | conns_closed_normal
    | conns_closed_idle_timeout
    | conns_closed_peer
    | conns_closed_protocol_error
    | handshakes_inflight
    | accept_queue_depth.

-define(SLOT_PACKETS_IN, 1).
-define(SLOT_PACKETS_DROPPED_MAILBOX, 2).
-define(SLOT_PACKETS_DROPPED_RATELIMIT, 3).
-define(SLOT_CONNS_ESTABLISHED, 4).
-define(SLOT_CONNS_CLOSED_NORMAL, 5).
-define(SLOT_CONNS_CLOSED_IDLE_TIMEOUT, 6).
-define(SLOT_CONNS_CLOSED_PEER, 7).
-define(SLOT_CONNS_CLOSED_PROTOCOL_ERROR, 8).
-define(SLOT_HANDSHAKES_INFLIGHT, 9).
-define(SLOT_ACCEPT_QUEUE_DEPTH, 10).
-define(N_SLOTS, 10).

-define(C_BYTES_IN, 1).
-define(C_BYTES_OUT, 2).
-define(C_LAST_PACKET_US, 3).
-define(C_DROPPED_IN, 4).
-define(N_CONN_SLOTS, 4).
-record(metrics, {
    counters :: atomics:atomics_ref(),
    info_table :: ets:tid(),
    start_monotonic_us :: integer(),
    rcvbuf_baseline :: non_neg_integer()
}).

-record(conn_row, {
    dcid :: binary(),
    pid :: pid(),
    peer :: nquic_socket:sockaddr() | undefined,
    state :: handshake | established | draining,
    counters :: conn_counters()
}).

-record(conn_counters, {
    ref :: atomics:atomics_ref()
}).

-type t() :: #metrics{}.
-type conn_counters() :: #conn_counters{}.
-type conn_row() :: #conn_row{}.

%%%-----------------------------------------------------------------------------
%% LISTENER-WIDE COUNTERS
%%%-----------------------------------------------------------------------------
-doc "Add `Delta` to the named counter.".
-spec add(t(), slot(), integer()) -> ok.
add(#metrics{counters = Ref}, Slot, Delta) ->
    atomics:add(Ref, slot_index(Slot), Delta),
    ok.

-doc "Destroy the info ETS table. The atomics ref is GC'd with the metrics record.".
-spec destroy(t()) -> ok.
destroy(#metrics{info_table = T}) ->
    ets:delete(T),
    ok.

-doc "Read a single counter value.".
-spec get(t(), slot()) -> integer().
get(#metrics{counters = Ref}, Slot) ->
    atomics:get(Ref, slot_index(Slot)).

-doc "Increment the named counter by 1.".
-spec inc(t(), slot()) -> ok.
inc(M, Slot) ->
    add(M, Slot, 1).

-doc "Allocate a fresh metrics handle (atomics ref + info ETS table).".
-spec new() -> t().
new() ->
    Ref = atomics:new(?N_SLOTS, [{signed, true}]),
    Tab = ets:new(nquic_metrics_info, [
        set,
        public,
        {keypos, #conn_row.dcid},
        {read_concurrency, true},
        {write_concurrency, true},
        {decentralized_counters, true}
    ]),
    #metrics{
        counters = Ref,
        info_table = Tab,
        start_monotonic_us = erlang:monotonic_time(microsecond),
        rcvbuf_baseline = sample_rcvbuf_errs()
    }.

-doc """
Return the current `InErrors` delta vs the baseline taken at listener
start. On non-Linux (no `/proc/net/snmp`) returns 0.
""".
-spec rcvbuf_errs(t()) -> non_neg_integer().
rcvbuf_errs(#metrics{rcvbuf_baseline = Base}) ->
    case sample_rcvbuf_errs() of
        Now when Now >= Base -> Now - Base;
        _ -> 0
    end.

-doc """
Return a snapshot of every counter plus `uptime_ms` and
`udp_rcvbuf_errs`.
""".
-spec snapshot(t()) -> #{atom() => integer()}.
snapshot(#metrics{counters = Ref, start_monotonic_us = T0} = M) ->
    UptimeMs = (erlang:monotonic_time(microsecond) - T0) div 1000,
    #{
        packets_in => atomics:get(Ref, ?SLOT_PACKETS_IN),
        packets_dropped_mailbox => atomics:get(Ref, ?SLOT_PACKETS_DROPPED_MAILBOX),
        packets_dropped_ratelimit => atomics:get(Ref, ?SLOT_PACKETS_DROPPED_RATELIMIT),
        conns_established => atomics:get(Ref, ?SLOT_CONNS_ESTABLISHED),
        conns_closed_normal => atomics:get(Ref, ?SLOT_CONNS_CLOSED_NORMAL),
        conns_closed_idle_timeout => atomics:get(Ref, ?SLOT_CONNS_CLOSED_IDLE_TIMEOUT),
        conns_closed_peer => atomics:get(Ref, ?SLOT_CONNS_CLOSED_PEER),
        conns_closed_protocol_error => atomics:get(Ref, ?SLOT_CONNS_CLOSED_PROTOCOL_ERROR),
        handshakes_inflight => atomics:get(Ref, ?SLOT_HANDSHAKES_INFLIGHT),
        accept_queue_depth => atomics:get(Ref, ?SLOT_ACCEPT_QUEUE_DEPTH),
        udp_rcvbuf_errs => rcvbuf_errs(M),
        uptime_ms => UptimeMs
    }.

%%%-----------------------------------------------------------------------------
%% PER-CONN INFO TABLE
%%%-----------------------------------------------------------------------------
-doc "Delete the row keyed by DCID. No-op if absent.".
-spec delete_row(t(), binary()) -> ok.
delete_row(#metrics{info_table = T}, DCID) ->
    ets:delete(T, DCID),
    ok.

-doc "Return the ETS tid for the info table (testing / advanced callers).".
-spec info_table(t()) -> ets:tid().
info_table(#metrics{info_table = T}) ->
    T.

-doc """
Insert (or replace) a row. Used on handshake completion and when peer
identity changes mid-connection.
""".
-spec insert_row(t(), conn_row()) -> true.
insert_row(#metrics{info_table = T}, #conn_row{} = Row) ->
    ets:insert(T, Row).

-doc "Return every row currently in the table.".
-spec list_rows(t()) -> [conn_row()].
list_rows(#metrics{info_table = T}) ->
    ets:tab2list(T).

-doc "Look up a single row by DCID.".
-spec lookup_row(t(), binary()) -> {ok, conn_row()} | {error, not_found}.
lookup_row(#metrics{info_table = T}, DCID) ->
    case ets:lookup(T, DCID) of
        [#conn_row{} = Row] -> {ok, Row};
        [] -> {error, not_found}
    end.

-doc "Build a fresh row with the given identity and a new counters block.".
-spec new_row(
    binary(),
    pid(),
    nquic_socket:sockaddr() | undefined,
    handshake | established | draining
) -> conn_row().
new_row(DCID, Pid, Peer, State) ->
    #conn_row{
        dcid = DCID,
        pid = Pid,
        peer = Peer,
        state = State,
        counters = new_conn_counters()
    }.

-doc "Return the row's lifetime counters block.".
-spec row_counters(conn_row()) -> conn_counters().
row_counters(#conn_row{counters = C}) -> C.

-doc "Return the row's initial DCID (the key).".
-spec row_dcid(conn_row()) -> binary().
row_dcid(#conn_row{dcid = D}) -> D.

-doc "Return the row's owning conn pid.".
-spec row_pid(conn_row()) -> pid().
row_pid(#conn_row{pid = P}) -> P.

-doc "Return the row's lifecycle state.".
-spec row_state(conn_row()) -> handshake | established | draining.
row_state(#conn_row{state = S}) -> S.

-doc "Update the lifecycle state field of an existing row.".
-spec update_state(t(), binary(), handshake | established | draining) -> ok.
update_state(#metrics{info_table = T}, DCID, State) when
    State =:= handshake; State =:= established; State =:= draining
->
    _ = ets:update_element(T, DCID, {#conn_row.state, State}),
    ok.

%%%-----------------------------------------------------------------------------
%% PER-CONN LIFETIME COUNTERS
%%%-----------------------------------------------------------------------------
-doc "Add `N` to the row's lifetime `bytes_in` total.".
-spec inc_bytes_in(conn_counters(), non_neg_integer()) -> ok.
inc_bytes_in(#conn_counters{ref = Ref}, N) when N >= 0 ->
    atomics:add(Ref, ?C_BYTES_IN, N),
    ok.

-doc "Add `N` to the row's lifetime `bytes_out` total.".
-spec inc_bytes_out(conn_counters(), non_neg_integer()) -> ok.
inc_bytes_out(#conn_counters{ref = Ref}, N) when N >= 0 ->
    atomics:add(Ref, ?C_BYTES_OUT, N),
    ok.

-doc "Allocate a fresh per-conn counters block.".
-spec new_conn_counters() -> conn_counters().
new_conn_counters() ->
    Ref = atomics:new(?N_CONN_SLOTS, [{signed, true}]),
    atomics:put(Ref, ?C_LAST_PACKET_US, erlang:monotonic_time(microsecond)),
    #conn_counters{ref = Ref}.

-doc "Read the full per-conn counters block as a map.".
-spec read_conn_counters(conn_counters()) -> #{atom() => integer()}.
read_conn_counters(#conn_counters{ref = Ref}) ->
    #{
        bytes_in => atomics:get(Ref, ?C_BYTES_IN),
        bytes_out => atomics:get(Ref, ?C_BYTES_OUT),
        last_packet_us => atomics:get(Ref, ?C_LAST_PACKET_US),
        dropped_in => atomics:get(Ref, ?C_DROPPED_IN)
    }.

-doc "Record `TsUs` (monotonic microseconds) as the last-packet timestamp.".
-spec touch_last_packet(conn_counters(), integer()) -> ok.
touch_last_packet(#conn_counters{ref = Ref}, TsUs) ->
    atomics:put(Ref, ?C_LAST_PACKET_US, TsUs),
    ok.

%%%-----------------------------------------------------------------------------
%% INTERNAL
%%%-----------------------------------------------------------------------------
-spec binary_to_integer_safe(binary()) -> non_neg_integer().
binary_to_integer_safe(Bin) ->
    try binary_to_integer(Bin) of
        N when N >= 0 -> N;
        _ -> 0
    catch
        error:badarg -> 0
    end.

-spec index_of(binary(), [binary()], pos_integer()) -> pos_integer() | not_found.
index_of(_Needle, [], _Ix) ->
    not_found;
index_of(Needle, [Needle | _], Ix) ->
    Ix;
index_of(Needle, [_ | Rest], Ix) ->
    index_of(Needle, Rest, Ix + 1).

-spec nth_field(pos_integer(), [binary()]) -> binary() | undefined.
nth_field(_Ix, []) ->
    undefined;
nth_field(1, [V | _]) ->
    V;
nth_field(N, [_ | Rest]) ->
    nth_field(N - 1, Rest).

-spec parse_rcvbuf_errs(binary()) -> non_neg_integer().
parse_rcvbuf_errs(Bin) ->
    Lines = binary:split(Bin, <<"\n">>, [global]),
    parse_rcvbuf_errs_lines(Lines, undefined).

-spec parse_rcvbuf_errs_lines([binary()], undefined | pos_integer()) ->
    non_neg_integer().
parse_rcvbuf_errs_lines([], _) ->
    0;
parse_rcvbuf_errs_lines([<<"Udp:", Rest/binary>> | Tail], undefined) ->
    Fields = binary:split(Rest, <<" ">>, [global, trim_all]),
    case index_of(<<"InErrors">>, Fields, 1) of
        not_found -> parse_rcvbuf_errs_lines(Tail, undefined);
        Idx -> parse_rcvbuf_errs_lines(Tail, Idx)
    end;
parse_rcvbuf_errs_lines([<<"Udp:", Rest/binary>> | _], Idx) ->
    Values = binary:split(Rest, <<" ">>, [global, trim_all]),
    case nth_field(Idx, Values) of
        undefined -> 0;
        Bin -> binary_to_integer_safe(Bin)
    end;
parse_rcvbuf_errs_lines([_ | Tail], Acc) ->
    parse_rcvbuf_errs_lines(Tail, Acc).

-spec sample_rcvbuf_errs() -> non_neg_integer().
sample_rcvbuf_errs() ->
    case file:read_file("/proc/net/snmp") of
        {ok, Bin} -> parse_rcvbuf_errs(Bin);
        {error, _} -> 0
    end.

-spec slot_index(slot()) -> pos_integer().
slot_index(packets_in) -> ?SLOT_PACKETS_IN;
slot_index(packets_dropped_mailbox) -> ?SLOT_PACKETS_DROPPED_MAILBOX;
slot_index(packets_dropped_ratelimit) -> ?SLOT_PACKETS_DROPPED_RATELIMIT;
slot_index(conns_established) -> ?SLOT_CONNS_ESTABLISHED;
slot_index(conns_closed_normal) -> ?SLOT_CONNS_CLOSED_NORMAL;
slot_index(conns_closed_idle_timeout) -> ?SLOT_CONNS_CLOSED_IDLE_TIMEOUT;
slot_index(conns_closed_peer) -> ?SLOT_CONNS_CLOSED_PEER;
slot_index(conns_closed_protocol_error) -> ?SLOT_CONNS_CLOSED_PROTOCOL_ERROR;
slot_index(handshakes_inflight) -> ?SLOT_HANDSHAKES_INFLIGHT;
slot_index(accept_queue_depth) -> ?SLOT_ACCEPT_QUEUE_DEPTH.