src/lightspeed@ops@channel_harness.erl

-module(lightspeed@ops@channel_harness).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/ops/channel_harness.gleam").
-export([run_with/3, run_default/0, report_signature/1]).
-export_type([report/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Deterministic channel fanout harness for M14 baseline reporting.\n").

-type report() :: {report,
        binary(),
        binary(),
        integer(),
        integer(),
        integer(),
        integer(),
        integer(),
        integer()}.

-file("src/lightspeed/ops/channel_harness.gleam", 169).
-spec at(list(integer()), integer(), integer()) -> integer().
at(Values, Index, Fallback) ->
    case {Values, Index} of
        {[], _} ->
            Fallback;

        {[Value | _], 0} ->
            Value;

        {[_ | Rest], _} ->
            at(Rest, Index - 1, Fallback)
    end.

-file("src/lightspeed/ops/channel_harness.gleam", 184).
-spec clamp_min(integer(), integer()) -> integer().
clamp_min(Value, Min) ->
    case Value < Min of
        true ->
            Min;

        false ->
            Value
    end.

-file("src/lightspeed/ops/channel_harness.gleam", 155).
-spec percentile(list(integer()), integer()) -> integer().
percentile(Values, Percent) ->
    Sorted = gleam@list:sort(Values, fun gleam@int:compare/2),
    Length = erlang:length(Sorted),
    case Length of
        0 ->
            0;

        _ ->
            Scaled = (Length - 1) * Percent,
            Rank = clamp_min(Scaled div 100, 0),
            at(Sorted, Rank, 0)
    end.

-file("src/lightspeed/ops/channel_harness.gleam", 191).
-spec divide_safe(integer(), integer()) -> integer().
divide_safe(Value, Divisor) ->
    case Divisor =< 0 of
        true ->
            0;

        false ->
            case Divisor of
                0 -> 0;
                Gleam@denominator -> Value div Gleam@denominator
            end
    end.

-file("src/lightspeed/ops/channel_harness.gleam", 177).
-spec sum(list(integer()), integer()) -> integer().
sum(Values, Total) ->
    case Values of
        [] ->
            Total;

        [Value | Rest] ->
            sum(Rest, Total + Value)
    end.

-file("src/lightspeed/ops/channel_harness.gleam", 144).
-spec count_push(list(lightspeed@channel:outbound()), integer()) -> integer().
count_push(Outbox, Total) ->
    case Outbox of
        [] ->
            Total;

        [Message | Rest] ->
            case Message of
                {push_message, _, _, _, _} ->
                    count_push(Rest, Total + 1);

                _ ->
                    count_push(Rest, Total)
            end
    end.

-file("src/lightspeed/ops/channel_harness.gleam", 102).
-spec broadcast_loop(
    lightspeed@channel:channel(),
    binary(),
    integer(),
    integer(),
    list(binary()),
    list(integer()),
    integer()
) -> {lightspeed@channel:channel(), list(integer()), integer()}.
broadcast_loop(
    State,
    Topic,
    Broadcasts,
    Index,
    Client_ids,
    Samples_rev,
    Deliveries
) ->
    case Index > Broadcasts of
        true ->
            {State, Samples_rev, Deliveries};

        false ->
            case Client_ids of
                [] ->
                    {State, Samples_rev, Deliveries};

                [Sender | _] ->
                    Next = lightspeed@channel:handle(
                        State,
                        {broadcast,
                            Sender,
                            Topic,
                            <<"m-"/utf8,
                                (erlang:integer_to_binary(Index))/binary>>}
                    ),
                    {Next@1, Outbox} = lightspeed@channel:flush_outbox(Next),
                    Delivered = count_push(Outbox, 0),
                    Latency = clamp_min((Delivered div 4) + 2, 1),
                    broadcast_loop(
                        Next@1,
                        Topic,
                        Broadcasts,
                        Index + 1,
                        Client_ids,
                        [Latency | Samples_rev],
                        Deliveries + Delivered
                    )
            end
    end.

-file("src/lightspeed/ops/channel_harness.gleam", 74).
-spec join_clients(
    lightspeed@channel:channel(),
    binary(),
    integer(),
    integer(),
    list(binary())
) -> {lightspeed@channel:channel(), list(binary())}.
join_clients(State, Topic, Clients, Index, Ids_rev) ->
    case Index > Clients of
        true ->
            {State, lists:reverse(Ids_rev)};

        false ->
            Client_id = <<"c-"/utf8, (erlang:integer_to_binary(Index))/binary>>,
            User_id = <<"u-"/utf8, (erlang:integer_to_binary(Index))/binary>>,
            Next = lightspeed@channel:handle(
                State,
                {join, Client_id, Topic, User_id, Index}
            ),
            {Next@1, _} = lightspeed@channel:flush_outbox(Next),
            join_clients(
                Next@1,
                Topic,
                Clients,
                Index + 1,
                [Client_id | Ids_rev]
            )
    end.

-file("src/lightspeed/ops/channel_harness.gleam", 27).
?DOC(" Run one fanout scenario.\n").
-spec run_with(integer(), integer(), binary()) -> report().
run_with(Clients, Broadcasts, Topic) ->
    Bounded_clients = clamp_min(Clients, 1),
    Bounded_broadcasts = clamp_min(Broadcasts, 1),
    {State, Client_ids} = join_clients(
        lightspeed@channel:new_single_node(),
        Topic,
        Bounded_clients,
        1,
        []
    ),
    {_, Samples_rev, Deliveries} = broadcast_loop(
        State,
        Topic,
        Bounded_broadcasts,
        1,
        Client_ids,
        [],
        0
    ),
    Samples = lists:reverse(Samples_rev),
    Total_latency = sum(Samples, 0),
    Throughput = divide_safe(Deliveries * 1000, clamp_min(Total_latency, 1)),
    {report,
        lightspeed@channel:adapter_label(State),
        Topic,
        Bounded_clients,
        Bounded_broadcasts,
        Deliveries,
        percentile(Samples, 50),
        percentile(Samples, 95),
        Throughput}.

-file("src/lightspeed/ops/channel_harness.gleam", 22).
?DOC(" Run the default M14 fanout baseline on single-node adapter.\n").
-spec run_default() -> report().
run_default() ->
    run_with(32, 24, <<"room:lobby"/utf8>>).

-file("src/lightspeed/ops/channel_harness.gleam", 55).
?DOC(" Stable report signature for docs and fixture tests.\n").
-spec report_signature(report()) -> binary().
report_signature(Report) ->
    <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"adapter="/utf8,
                                                                (erlang:element(
                                                                    2,
                                                                    Report
                                                                ))/binary>>/binary,
                                                            "|topic="/utf8>>/binary,
                                                        (erlang:element(
                                                            3,
                                                            Report
                                                        ))/binary>>/binary,
                                                    "|clients="/utf8>>/binary,
                                                (erlang:integer_to_binary(
                                                    erlang:element(4, Report)
                                                ))/binary>>/binary,
                                            "|broadcasts="/utf8>>/binary,
                                        (erlang:integer_to_binary(
                                            erlang:element(5, Report)
                                        ))/binary>>/binary,
                                    "|deliveries="/utf8>>/binary,
                                (erlang:integer_to_binary(
                                    erlang:element(6, Report)
                                ))/binary>>/binary,
                            "|p50="/utf8>>/binary,
                        (erlang:integer_to_binary(erlang:element(7, Report)))/binary>>/binary,
                    "|p95="/utf8>>/binary,
                (erlang:integer_to_binary(erlang:element(8, Report)))/binary>>/binary,
            "|throughput="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(9, Report)))/binary>>.