-module(lightspeed@ops@channel_harness).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/ops/channel_harness.gleam").
-export([run_with/3, run_default/0, report_signature/1]).
-export_type([report/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Deterministic channel fanout harness for M14 baseline reporting.\n").
-type report() :: {report,
binary(),
binary(),
integer(),
integer(),
integer(),
integer(),
integer(),
integer()}.
-file("src/lightspeed/ops/channel_harness.gleam", 169).
%% Look up the element at zero-based position `Index` in `Values`.
%% Returns `Fallback` when the list is exhausted before position 0 is
%% reached (this includes an empty list and a negative `Index`, which
%% never counts down to 0).
-spec at(list(integer()), integer(), integer()) -> integer().
at([], _Index, Fallback) ->
    Fallback;
at([Head | _], 0, _Fallback) ->
    Head;
at([_ | Tail], Index, Fallback) ->
    at(Tail, Index - 1, Fallback).
-file("src/lightspeed/ops/channel_harness.gleam", 184).
%% Raise `Value` to at least `Min`: values below the floor become `Min`,
%% everything else is returned unchanged.
-spec clamp_min(integer(), integer()) -> integer().
clamp_min(Value, Min) when Value < Min ->
    Min;
clamp_min(Value, _Min) ->
    Value.
-file("src/lightspeed/ops/channel_harness.gleam", 155).
%% Pick the sample at the given percentile of `Values`.
%%
%% The samples are ordered with gleam@list:sort/2 using gleam@int:compare/2,
%% then the element at rank `((Length - 1) * Percent) div 100` (floored at 0)
%% is returned. An empty sample list yields 0, and so does a rank that falls
%% past the end of the list (the lookup falls back to 0).
-spec percentile(list(integer()), integer()) -> integer().
percentile(Values, Percent) ->
    Ordered = gleam@list:sort(Values, fun gleam@int:compare/2),
    case erlang:length(Ordered) of
        0 ->
            0;
        Count ->
            Rank = clamp_min(((Count - 1) * Percent) div 100, 0),
            at(Ordered, Rank, 0)
    end.
-file("src/lightspeed/ops/channel_harness.gleam", 191).
%% Integer division that never raises on a bad divisor.
%%
%% Returns 0 when `Divisor` is zero or negative; otherwise returns
%% `Value div Divisor` (truncating integer division).
%%
%% The non-positive guard already covers a zero divisor, so no separate
%% zero check is needed before dividing.
-spec divide_safe(integer(), integer()) -> integer().
divide_safe(_Value, Divisor) when Divisor =< 0 ->
    0;
divide_safe(Value, Divisor) ->
    Value div Divisor.
-file("src/lightspeed/ops/channel_harness.gleam", 177).
%% Add every element of `Values` onto the starting accumulator `Total`.
-spec sum(list(integer()), integer()) -> integer().
sum(Values, Total) ->
    lists:foldl(fun(Value, Acc) -> Acc + Value end, Total, Values).
-file("src/lightspeed/ops/channel_harness.gleam", 144).
%% Count the outbound messages tagged `push_message` (5-element tuples)
%% in `Outbox` and add that count to `Total`. Any other message shape is
%% skipped.
-spec count_push(list(lightspeed@channel:outbound()), integer()) -> integer().
count_push(Outbox, Total) ->
    Pushes = [push || {push_message, _, _, _, _} <- Outbox],
    Total + erlang:length(Pushes).
-file("src/lightspeed/ops/channel_harness.gleam", 102).
%% Drive broadcasts numbered `Index`..`Broadcasts` through the channel.
%%
%% Each round sends a `{broadcast, Sender, Topic, <<"m-N">>}` event from the
%% first client id, flushes the outbox, counts the push messages delivered,
%% and records a synthetic latency sample of `(Delivered div 4) + 2`
%% (floored at 1). Samples are accumulated in reverse order.
%%
%% Stops when `Index` exceeds `Broadcasts` or when there are no clients,
%% returning `{State, SamplesRev, Deliveries}`.
-spec broadcast_loop(
    lightspeed@channel:channel(),
    binary(),
    integer(),
    integer(),
    list(binary()),
    list(integer()),
    integer()
) -> {lightspeed@channel:channel(), list(integer()), integer()}.
broadcast_loop(State, _Topic, Broadcasts, Index, _Client_ids, Samples_rev, Deliveries)
        when Index > Broadcasts ->
    {State, Samples_rev, Deliveries};
broadcast_loop(State, _Topic, _Broadcasts, _Index, [], Samples_rev, Deliveries) ->
    {State, Samples_rev, Deliveries};
broadcast_loop(
    State,
    Topic,
    Broadcasts,
    Index,
    [Sender | _] = Client_ids,
    Samples_rev,
    Deliveries
) ->
    Payload = <<"m-"/utf8, (erlang:integer_to_binary(Index))/binary>>,
    Handled = lightspeed@channel:handle(
        State,
        {broadcast, Sender, Topic, Payload}
    ),
    {Flushed, Outbox} = lightspeed@channel:flush_outbox(Handled),
    Delivered = count_push(Outbox, 0),
    Latency = clamp_min((Delivered div 4) + 2, 1),
    broadcast_loop(
        Flushed,
        Topic,
        Broadcasts,
        Index + 1,
        Client_ids,
        [Latency | Samples_rev],
        Deliveries + Delivered
    ).
-file("src/lightspeed/ops/channel_harness.gleam", 74).
%% Join clients numbered `Index`..`Clients` to `Topic`.
%%
%% Client `N` joins as client id `<<"c-N">>` with user id `<<"u-N">>`, and
%% `N` itself is passed as the last field of the join event. The outbox is
%% flushed (and discarded) after every join. Returns the resulting channel
%% state together with the client ids in join order.
-spec join_clients(
    lightspeed@channel:channel(),
    binary(),
    integer(),
    integer(),
    list(binary())
) -> {lightspeed@channel:channel(), list(binary())}.
join_clients(State, _Topic, Clients, Index, Ids_rev) when Index > Clients ->
    %% Ids were accumulated newest-first; restore join order.
    {State, lists:reverse(Ids_rev)};
join_clients(State, Topic, Clients, Index, Ids_rev) ->
    Suffix = erlang:integer_to_binary(Index),
    Client_id = <<"c-"/utf8, Suffix/binary>>,
    User_id = <<"u-"/utf8, Suffix/binary>>,
    Joined = lightspeed@channel:handle(
        State,
        {join, Client_id, Topic, User_id, Index}
    ),
    {Flushed, _Outbox} = lightspeed@channel:flush_outbox(Joined),
    join_clients(Flushed, Topic, Clients, Index + 1, [Client_id | Ids_rev]).
-file("src/lightspeed/ops/channel_harness.gleam", 27).
?DOC(" Run one fanout scenario.\n").
%% Run one fanout scenario on a fresh single-node channel.
%%
%% `Clients` and `Broadcasts` are both floored at 1. The clients join
%% `Topic`, the broadcasts are driven through the channel, and the result is
%% a `report` tuple holding: adapter label, topic, bounded client count,
%% bounded broadcast count, total deliveries, the 50th and 95th percentile
%% latency samples, and throughput computed as
%% `(Deliveries * 1000) div TotalLatency` (total latency floored at 1).
%%
%% The adapter label is read from the channel state as it is after the
%% joins, not after the broadcasts.
-spec run_with(integer(), integer(), binary()) -> report().
run_with(Clients, Broadcasts, Topic) ->
    ClientCount = clamp_min(Clients, 1),
    BroadcastCount = clamp_min(Broadcasts, 1),
    Fresh = lightspeed@channel:new_single_node(),
    {Joined, ClientIds} = join_clients(Fresh, Topic, ClientCount, 1, []),
    {_Final, SamplesRev, Deliveries} =
        broadcast_loop(Joined, Topic, BroadcastCount, 1, ClientIds, [], 0),
    Samples = lists:reverse(SamplesRev),
    LatencyTotal = clamp_min(sum(Samples, 0), 1),
    Throughput = divide_safe(Deliveries * 1000, LatencyTotal),
    Adapter = lightspeed@channel:adapter_label(Joined),
    P50 = percentile(Samples, 50),
    P95 = percentile(Samples, 95),
    {report,
        Adapter,
        Topic,
        ClientCount,
        BroadcastCount,
        Deliveries,
        P50,
        P95,
        Throughput}.
-file("src/lightspeed/ops/channel_harness.gleam", 22).
?DOC(" Run the default M14 fanout baseline on single-node adapter.\n").
%% Baseline scenario: 32 clients, 24 broadcasts, on topic "room:lobby".
-spec run_default() -> report().
run_default() ->
    Clients = 32,
    Broadcasts = 24,
    Topic = <<"room:lobby"/utf8>>,
    run_with(Clients, Broadcasts, Topic).
-file("src/lightspeed/ops/channel_harness.gleam", 55).
?DOC(" Stable report signature for docs and fixture tests.\n").
%% Render a report as a single `key=value` line with fields separated by `|`,
%% in the fixed order: adapter, topic, clients, broadcasts, deliveries, p50,
%% p95, throughput. Fields are read positionally (tuple elements 2..9);
%% the two leading fields must be binaries and the rest integers.
-spec report_signature(report()) -> binary().
report_signature(Report) ->
    Adapter = erlang:element(2, Report),
    Topic = erlang:element(3, Report),
    Clients = erlang:integer_to_binary(erlang:element(4, Report)),
    Broadcasts = erlang:integer_to_binary(erlang:element(5, Report)),
    Deliveries = erlang:integer_to_binary(erlang:element(6, Report)),
    P50 = erlang:integer_to_binary(erlang:element(7, Report)),
    P95 = erlang:integer_to_binary(erlang:element(8, Report)),
    Throughput = erlang:integer_to_binary(erlang:element(9, Report)),
    <<"adapter="/utf8, Adapter/binary,
      "|topic="/utf8, Topic/binary,
      "|clients="/utf8, Clients/binary,
      "|broadcasts="/utf8, Broadcasts/binary,
      "|deliveries="/utf8, Deliveries/binary,
      "|p50="/utf8, P50/binary,
      "|p95="/utf8, P95/binary,
      "|throughput="/utf8, Throughput/binary>>.