-module(pharos@internal@connection).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/internal/connection.gleam").
-export([from_name/1, backoff_delay/2, jittered_delay/2, start_link/5, child_spec/5, deliver/2, resolve_brain_target/2, brain_connect/1, brain_deliver/2, encode_etf/1, brain_transport/2]).
-export_type([connection_state/0, connection_config/0, transport/0, connection/0, message/0, data/0, brain_target/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(false).
-type connection_state() :: disconnected | connecting | streaming | throttled.
-type connection_config() :: {connection_config,
integer(),
integer(),
integer(),
integer()}.
-type transport() :: {transport,
fun(() -> {ok, nil} | {error, binary()}),
fun((pharos@metric:metric()) -> {ok, nil} | {error, binary()})}.
-opaque connection() :: {connection, gleam@erlang@process:name(message())}.
-type message() :: {deliver, pharos@metric:metric()} |
tick |
attempt |
drain_tick |
flush_tick.
-type data() :: {data,
transport(),
pharos@internal@hot_buffer:hot_buffer(),
gleam@option:option(pharos@internal@spillover:spillover()),
connection_config(),
integer()}.
-type brain_target() :: any().
-file("src/pharos/internal/connection.gleam", 111).
?DOC(false).
-spec from_name(gleam@erlang@process:name(message())) -> connection().
from_name(Name) ->
{connection, Name}.
-file("src/pharos/internal/connection.gleam", 191).
?DOC(false).
-spec server_ref(connection()) -> eparch@state_machine:server_ref(message()).
server_ref(Connection) ->
statem_ffi:ref_from_subject(
gleam@erlang@process:named_subject(erlang:element(2, Connection))
).
-file("src/pharos/internal/connection.gleam", 187).
?DOC(false).
-spec schedule_first_tick(connection()) -> nil.
schedule_first_tick(Connection) ->
statem_ffi:cast(server_ref(Connection), tick).
-file("src/pharos/internal/connection.gleam", 432).
?DOC(false).
-spec backoff_delay(connection_config(), integer()) -> integer().
backoff_delay(Config, Attempt) ->
Exponent = gleam@int:min(Attempt, 16),
Raw = erlang:element(2, Config) * erlang:'bsl'(1, Exponent),
gleam@int:min(Raw, erlang:element(3, Config)).
-file("src/pharos/internal/connection.gleam", 442).
?DOC(false).
-spec jittered_delay(connection_config(), integer()) -> integer().
jittered_delay(Config, Attempt) ->
gleam@int:random(backoff_delay(Config, Attempt)).
-file("src/pharos/internal/connection.gleam", 354).
?DOC(false).
-spec go_disconnected(data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
go_disconnected(Data) ->
Bumped = {data,
erlang:element(2, Data),
erlang:element(3, Data),
erlang:element(4, Data),
erlang:element(5, Data),
erlang:element(6, Data) + 1},
Delay = jittered_delay(erlang:element(5, Data), erlang:element(6, Data)),
eparch@state_machine:next_state(
disconnected,
Bumped,
[{state_timeout, {'after', Delay}, tick}]
).
-file("src/pharos/internal/connection.gleam", 409).
?DOC(false).
-spec deliver_all(data(), list(pharos@metric:metric())) -> {ok, nil} |
{error, binary()}.
deliver_all(Data, Metrics) ->
case Metrics of
[] ->
{ok, nil};
[Metric | Rest] ->
case (erlang:element(3, erlang:element(2, Data)))(Metric) of
{ok, nil} ->
deliver_all(Data, Rest);
{error, Reason} ->
gleam@list:each(
[Metric | Rest],
fun(M) ->
_ = pharos@internal@hot_buffer:push(
erlang:element(3, Data),
M
)
end
),
{error, Reason}
end
end.
-file("src/pharos/internal/connection.gleam", 391).
?DOC(false).
-spec deliver_cold(
data(),
pharos@internal@spillover:spillover(),
list(pharos@metric:metric())
) -> {ok, nil} | {error, binary()}.
deliver_cold(Data, Sp, Metrics) ->
case Metrics of
[] ->
{ok, nil};
[Metric | Rest] ->
case (erlang:element(3, erlang:element(2, Data)))(Metric) of
{ok, nil} ->
deliver_cold(Data, Sp, Rest);
{error, Reason} ->
pharos@internal@spillover:push_all(Sp, [Metric | Rest]),
{error, Reason}
end
end.
-file("src/pharos/internal/connection.gleam", 384).
?DOC(false).
-spec replay_cold(data()) -> {ok, nil} | {error, binary()}.
replay_cold(Data) ->
case erlang:element(4, Data) of
none ->
{ok, nil};
{some, Sp} ->
deliver_cold(Data, Sp, pharos@internal@spillover:drain(Sp))
end.
-file("src/pharos/internal/connection.gleam", 373).
?DOC(false).
-spec flush(data()) -> {ok, nil} | {error, binary()}.
flush(Data) ->
case replay_cold(Data) of
{error, Reason} ->
{error, Reason};
{ok, nil} ->
deliver_all(
Data,
pharos@internal@hot_buffer:drain(erlang:element(3, Data))
)
end.
-file("src/pharos/internal/connection.gleam", 315).
?DOC(false).
-spec on_flush_tick(connection_state(), data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_flush_tick(State, Data) ->
case State of
streaming ->
case flush(Data) of
{error, _} ->
go_disconnected(Data);
{ok, nil} ->
eparch@state_machine:keep_state(
Data,
[{state_timeout,
{'after',
erlang:element(5, erlang:element(5, Data))},
flush_tick}]
)
end;
disconnected ->
eparch@state_machine:keep_state(Data, []);
connecting ->
eparch@state_machine:keep_state(Data, []);
throttled ->
eparch@state_machine:keep_state(Data, [])
end.
-file("src/pharos/internal/connection.gleam", 336).
?DOC(false).
-spec maybe_throttle(data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
maybe_throttle(Data) ->
case pharos@internal@hot_buffer:size(erlang:element(3, Data)) > erlang:element(
4,
erlang:element(5, Data)
) of
true ->
eparch@state_machine:next_state(
throttled,
Data,
[{state_timeout,
{'after', erlang:element(2, erlang:element(5, Data))},
drain_tick}]
);
false ->
eparch@state_machine:next_state(
streaming,
Data,
[{state_timeout,
{'after', erlang:element(5, erlang:element(5, Data))},
flush_tick}]
)
end.
-file("src/pharos/internal/connection.gleam", 300).
?DOC(false).
-spec on_drain_tick(connection_state(), data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_drain_tick(State, Data) ->
case State of
throttled ->
case flush(Data) of
{error, _} ->
go_disconnected(Data);
{ok, nil} ->
maybe_throttle(Data)
end;
disconnected ->
eparch@state_machine:keep_state(Data, []);
connecting ->
eparch@state_machine:keep_state(Data, []);
streaming ->
eparch@state_machine:keep_state(Data, [])
end.
-file("src/pharos/internal/connection.gleam", 278).
?DOC(false).
-spec on_attempt(connection_state(), data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_attempt(State, Data) ->
case State of
connecting ->
case (erlang:element(2, erlang:element(2, Data)))() of
{error, _} ->
go_disconnected(Data);
{ok, nil} ->
case flush(Data) of
{error, _} ->
go_disconnected(Data);
{ok, nil} ->
Reset = {data,
erlang:element(2, Data),
erlang:element(3, Data),
erlang:element(4, Data),
erlang:element(5, Data),
0},
maybe_throttle(Reset)
end
end;
disconnected ->
eparch@state_machine:keep_state(Data, []);
streaming ->
eparch@state_machine:keep_state(Data, []);
throttled ->
eparch@state_machine:keep_state(Data, [])
end.
-file("src/pharos/internal/connection.gleam", 263).
?DOC(false).
-spec on_tick(connection_state(), data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_tick(State, Data) ->
case State of
disconnected ->
eparch@state_machine:next_state(
connecting,
Data,
[{state_timeout, {'after', 0}, attempt}]
);
connecting ->
eparch@state_machine:keep_state(Data, []);
streaming ->
eparch@state_machine:keep_state(Data, []);
throttled ->
eparch@state_machine:keep_state(Data, [])
end.
-file("src/pharos/internal/connection.gleam", 250).
?DOC(false).
-spec buffer_with_spill(data(), pharos@metric:metric()) -> nil.
buffer_with_spill(Data, Metric) ->
case erlang:element(4, Data) of
{some, Sp} ->
case pharos@internal@hot_buffer:size(erlang:element(3, Data)) >= erlang:element(
4,
erlang:element(5, Data)
) of
true ->
pharos@internal@spillover:push_all(
Sp,
pharos@internal@hot_buffer:drain(
erlang:element(3, Data)
)
);
false ->
nil
end;
none ->
nil
end,
_ = pharos@internal@hot_buffer:push(erlang:element(3, Data), Metric),
nil.
-file("src/pharos/internal/connection.gleam", 224).
?DOC(false).
-spec on_deliver(connection_state(), data(), pharos@metric:metric()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_deliver(State, Data, Metric) ->
case State of
disconnected ->
buffer_with_spill(Data, Metric),
eparch@state_machine:keep_state(Data, []);
connecting ->
buffer_with_spill(Data, Metric),
eparch@state_machine:keep_state(Data, []);
throttled ->
buffer_with_spill(Data, Metric),
eparch@state_machine:keep_state(Data, []);
streaming ->
_ = pharos@internal@hot_buffer:push(erlang:element(3, Data), Metric),
case flush(Data) of
{ok, nil} ->
maybe_throttle(Data);
{error, _} ->
go_disconnected(Data)
end
end.
-file("src/pharos/internal/connection.gleam", 199).
?DOC(false).
-spec handle_event(
eparch@state_machine:event(connection_state(), message(), JBE),
connection_state(),
data()
) -> eparch@state_machine:step(connection_state(), data(), message(), JBE).
handle_event(Event, State, Data) ->
case Event of
{cast, {deliver, Metric}} ->
on_deliver(State, Data, Metric);
{cast, tick} ->
on_tick(State, Data);
{cast, attempt} ->
on_attempt(State, Data);
{cast, drain_tick} ->
on_drain_tick(State, Data);
{cast, flush_tick} ->
on_flush_tick(State, Data);
{timeout, state_timeout_type, tick} ->
on_tick(State, Data);
{timeout, state_timeout_type, attempt} ->
on_attempt(State, Data);
{timeout, state_timeout_type, drain_tick} ->
on_drain_tick(State, Data);
{timeout, state_timeout_type, flush_tick} ->
on_flush_tick(State, Data);
{timeout, _, _} ->
eparch@state_machine:keep_state(Data, []);
{info, _} ->
eparch@state_machine:keep_state(Data, []);
{call, _, _} ->
eparch@state_machine:keep_state(Data, []);
{enter, _} ->
eparch@state_machine:keep_state(Data, [])
end.
-file("src/pharos/internal/connection.gleam", 147).
?DOC(false).
-spec do_start(
gleam@erlang@process:name(message()),
transport(),
pharos@internal@hot_buffer:hot_buffer(),
gleam@option:option(pharos@internal@spillover:spillover()),
connection_config()
) -> {ok, {gleam@erlang@process:pid_(), connection()}} |
{error, eparch@state_machine:start_error()}.
do_start(Name, Transport, Buffer, Spillover, Config) ->
Initial = {data, Transport, Buffer, Spillover, Config, 0},
Result = begin
_pipe = eparch@state_machine:new(disconnected, Initial),
_pipe@1 = eparch@state_machine:named(_pipe, {local, Name}),
_pipe@2 = eparch@state_machine:on_event(_pipe@1, fun handle_event/3),
eparch@state_machine:start_link(_pipe@2)
end,
case Result of
{ok, Started} ->
Connection = {connection, Name},
schedule_first_tick(Connection),
{ok, {erlang:element(2, Started), Connection}};
{error, Error} ->
{error, Error}
end.
-file("src/pharos/internal/connection.gleam", 118).
?DOC(false).
-spec start_link(
gleam@erlang@process:name(message()),
transport(),
pharos@internal@hot_buffer:hot_buffer(),
gleam@option:option(pharos@internal@spillover:spillover()),
connection_config()
) -> {ok, connection()} | {error, eparch@state_machine:start_error()}.
start_link(Name, Transport, Buffer, Spillover, Config) ->
case do_start(Name, Transport, Buffer, Spillover, Config) of
{ok, {_, Connection}} ->
{ok, Connection};
{error, Error} ->
{error, Error}
end.
-file("src/pharos/internal/connection.gleam", 172).
?DOC(false).
-spec start_error_to_string(eparch@state_machine:start_error()) -> binary().
start_error_to_string(Error) ->
case Error of
init_timeout ->
<<"connection manager init timeout"/utf8>>;
{init_failed, Reason} ->
Reason;
{init_exited, _} ->
<<"connection manager init exited"/utf8>>;
{already_started, _} ->
<<"connection manager name already registered"/utf8>>
end.
-file("src/pharos/internal/connection.gleam", 132).
?DOC(false).
-spec child_spec(
gleam@erlang@process:name(message()),
transport(),
pharos@internal@hot_buffer:hot_buffer(),
gleam@option:option(pharos@internal@spillover:spillover()),
connection_config()
) -> gleam@otp@supervision:child_specification(connection()).
child_spec(Name, Transport, Buffer, Spillover, Config) ->
gleam@otp@supervision:worker(
fun() -> case do_start(Name, Transport, Buffer, Spillover, Config) of
{ok, {Pid, Connection}} ->
{ok, {started, Pid, Connection}};
{error, Error} ->
{error, {init_failed, start_error_to_string(Error)}}
end end
).
-file("src/pharos/internal/connection.gleam", 183).
?DOC(false).
-spec deliver(connection(), pharos@metric:metric()) -> nil.
deliver(Connection, Metric) ->
statem_ffi:cast(server_ref(Connection), {deliver, Metric}).
-file("src/pharos/internal/connection.gleam", 460).
?DOC(false).
-spec resolve_brain_target(binary(), binary()) -> brain_target().
resolve_brain_target(Node, Name) ->
pharos_ffi:brain_target(Node, Name).
-file("src/pharos/internal/connection.gleam", 464).
?DOC(false).
-spec brain_connect(brain_target()) -> {ok, nil} | {error, binary()}.
brain_connect(Target) ->
pharos_ffi:brain_connect(Target).
-file("src/pharos/internal/connection.gleam", 468).
?DOC(false).
-spec brain_deliver(brain_target(), bitstring()) -> {ok, nil} |
{error, binary()}.
brain_deliver(Target, Payload) ->
pharos_ffi:brain_deliver(Target, Payload).
-file("src/pharos/internal/connection.gleam", 475).
?DOC(false).
-spec encode_etf(any()) -> bitstring().
encode_etf(Term) ->
pharos_ffi:encode_etf(Term).
-file("src/pharos/internal/connection.gleam", 480).
?DOC(false).
-spec brain_transport(binary(), binary()) -> transport().
brain_transport(Node, Name) ->
Target = pharos_ffi:brain_target(Node, Name),
{transport,
fun() -> pharos_ffi:brain_connect(Target) end,
fun(Metric) ->
pharos_ffi:brain_deliver(Target, pharos_ffi:encode_etf(Metric))
end}.