Skip to main content

src/pharos@internal@connection.erl

-module(pharos@internal@connection).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/internal/connection.gleam").
-export([from_name/1, backoff_delay/2, jittered_delay/2, start_link/5, child_spec/5, deliver/2, resolve_brain_target/2, brain_connect/1, brain_deliver/2, encode_etf/1, brain_transport/2]).
-export_type([connection_state/0, connection_config/0, transport/0, connection/0, message/0, data/0, brain_target/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(false).

-type connection_state() :: disconnected | connecting | streaming | throttled.

-type connection_config() :: {connection_config,
        integer(),
        integer(),
        integer(),
        integer()}.

-type transport() :: {transport,
        fun(() -> {ok, nil} | {error, binary()}),
        fun((pharos@metric:metric()) -> {ok, nil} | {error, binary()})}.

-opaque connection() :: {connection, gleam@erlang@process:name(message())}.

-type message() :: {deliver, pharos@metric:metric()} |
    tick |
    attempt |
    drain_tick |
    flush_tick.

-type data() :: {data,
        transport(),
        pharos@internal@hot_buffer:hot_buffer(),
        gleam@option:option(pharos@internal@spillover:spillover()),
        connection_config(),
        integer()}.

-type brain_target() :: any().

-file("src/pharos/internal/connection.gleam", 111).
?DOC(false).
-spec from_name(gleam@erlang@process:name(message())) -> connection().
from_name(Name) ->
    {connection, Name}.

-file("src/pharos/internal/connection.gleam", 191).
?DOC(false).
-spec server_ref(connection()) -> eparch@state_machine:server_ref(message()).
server_ref(Connection) ->
    statem_ffi:ref_from_subject(
        gleam@erlang@process:named_subject(erlang:element(2, Connection))
    ).

-file("src/pharos/internal/connection.gleam", 187).
?DOC(false).
-spec schedule_first_tick(connection()) -> nil.
schedule_first_tick(Connection) ->
    statem_ffi:cast(server_ref(Connection), tick).

-file("src/pharos/internal/connection.gleam", 432).
?DOC(false).
-spec backoff_delay(connection_config(), integer()) -> integer().
backoff_delay(Config, Attempt) ->
    Exponent = gleam@int:min(Attempt, 16),
    Raw = erlang:element(2, Config) * erlang:'bsl'(1, Exponent),
    gleam@int:min(Raw, erlang:element(3, Config)).

-file("src/pharos/internal/connection.gleam", 442).
?DOC(false).
-spec jittered_delay(connection_config(), integer()) -> integer().
jittered_delay(Config, Attempt) ->
    gleam@int:random(backoff_delay(Config, Attempt)).

-file("src/pharos/internal/connection.gleam", 354).
?DOC(false).
-spec go_disconnected(data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
go_disconnected(Data) ->
    Bumped = {data,
        erlang:element(2, Data),
        erlang:element(3, Data),
        erlang:element(4, Data),
        erlang:element(5, Data),
        erlang:element(6, Data) + 1},
    Delay = jittered_delay(erlang:element(5, Data), erlang:element(6, Data)),
    eparch@state_machine:next_state(
        disconnected,
        Bumped,
        [{state_timeout, {'after', Delay}, tick}]
    ).

-file("src/pharos/internal/connection.gleam", 409).
?DOC(false).
-spec deliver_all(data(), list(pharos@metric:metric())) -> {ok, nil} |
    {error, binary()}.
deliver_all(Data, Metrics) ->
    case Metrics of
        [] ->
            {ok, nil};

        [Metric | Rest] ->
            case (erlang:element(3, erlang:element(2, Data)))(Metric) of
                {ok, nil} ->
                    deliver_all(Data, Rest);

                {error, Reason} ->
                    gleam@list:each(
                        [Metric | Rest],
                        fun(M) ->
                            _ = pharos@internal@hot_buffer:push(
                                erlang:element(3, Data),
                                M
                            )
                        end
                    ),
                    {error, Reason}
            end
    end.

-file("src/pharos/internal/connection.gleam", 391).
?DOC(false).
-spec deliver_cold(
    data(),
    pharos@internal@spillover:spillover(),
    list(pharos@metric:metric())
) -> {ok, nil} | {error, binary()}.
deliver_cold(Data, Sp, Metrics) ->
    case Metrics of
        [] ->
            {ok, nil};

        [Metric | Rest] ->
            case (erlang:element(3, erlang:element(2, Data)))(Metric) of
                {ok, nil} ->
                    deliver_cold(Data, Sp, Rest);

                {error, Reason} ->
                    pharos@internal@spillover:push_all(Sp, [Metric | Rest]),
                    {error, Reason}
            end
    end.

-file("src/pharos/internal/connection.gleam", 384).
?DOC(false).
-spec replay_cold(data()) -> {ok, nil} | {error, binary()}.
replay_cold(Data) ->
    case erlang:element(4, Data) of
        none ->
            {ok, nil};

        {some, Sp} ->
            deliver_cold(Data, Sp, pharos@internal@spillover:drain(Sp))
    end.

-file("src/pharos/internal/connection.gleam", 373).
?DOC(false).
-spec flush(data()) -> {ok, nil} | {error, binary()}.
flush(Data) ->
    case replay_cold(Data) of
        {error, Reason} ->
            {error, Reason};

        {ok, nil} ->
            deliver_all(
                Data,
                pharos@internal@hot_buffer:drain(erlang:element(3, Data))
            )
    end.

-file("src/pharos/internal/connection.gleam", 315).
?DOC(false).
-spec on_flush_tick(connection_state(), data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_flush_tick(State, Data) ->
    case State of
        streaming ->
            case flush(Data) of
                {error, _} ->
                    go_disconnected(Data);

                {ok, nil} ->
                    eparch@state_machine:keep_state(
                        Data,
                        [{state_timeout,
                                {'after',
                                    erlang:element(5, erlang:element(5, Data))},
                                flush_tick}]
                    )
            end;

        disconnected ->
            eparch@state_machine:keep_state(Data, []);

        connecting ->
            eparch@state_machine:keep_state(Data, []);

        throttled ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/internal/connection.gleam", 336).
?DOC(false).
-spec maybe_throttle(data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
maybe_throttle(Data) ->
    case pharos@internal@hot_buffer:size(erlang:element(3, Data)) > erlang:element(
        4,
        erlang:element(5, Data)
    ) of
        true ->
            eparch@state_machine:next_state(
                throttled,
                Data,
                [{state_timeout,
                        {'after', erlang:element(2, erlang:element(5, Data))},
                        drain_tick}]
            );

        false ->
            eparch@state_machine:next_state(
                streaming,
                Data,
                [{state_timeout,
                        {'after', erlang:element(5, erlang:element(5, Data))},
                        flush_tick}]
            )
    end.

-file("src/pharos/internal/connection.gleam", 300).
?DOC(false).
-spec on_drain_tick(connection_state(), data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_drain_tick(State, Data) ->
    case State of
        throttled ->
            case flush(Data) of
                {error, _} ->
                    go_disconnected(Data);

                {ok, nil} ->
                    maybe_throttle(Data)
            end;

        disconnected ->
            eparch@state_machine:keep_state(Data, []);

        connecting ->
            eparch@state_machine:keep_state(Data, []);

        streaming ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/internal/connection.gleam", 278).
?DOC(false).
-spec on_attempt(connection_state(), data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_attempt(State, Data) ->
    case State of
        connecting ->
            case (erlang:element(2, erlang:element(2, Data)))() of
                {error, _} ->
                    go_disconnected(Data);

                {ok, nil} ->
                    case flush(Data) of
                        {error, _} ->
                            go_disconnected(Data);

                        {ok, nil} ->
                            Reset = {data,
                                erlang:element(2, Data),
                                erlang:element(3, Data),
                                erlang:element(4, Data),
                                erlang:element(5, Data),
                                0},
                            maybe_throttle(Reset)
                    end
            end;

        disconnected ->
            eparch@state_machine:keep_state(Data, []);

        streaming ->
            eparch@state_machine:keep_state(Data, []);

        throttled ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/internal/connection.gleam", 263).
?DOC(false).
-spec on_tick(connection_state(), data()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_tick(State, Data) ->
    case State of
        disconnected ->
            eparch@state_machine:next_state(
                connecting,
                Data,
                [{state_timeout, {'after', 0}, attempt}]
            );

        connecting ->
            eparch@state_machine:keep_state(Data, []);

        streaming ->
            eparch@state_machine:keep_state(Data, []);

        throttled ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/internal/connection.gleam", 250).
?DOC(false).
-spec buffer_with_spill(data(), pharos@metric:metric()) -> nil.
buffer_with_spill(Data, Metric) ->
    case erlang:element(4, Data) of
        {some, Sp} ->
            case pharos@internal@hot_buffer:size(erlang:element(3, Data)) >= erlang:element(
                4,
                erlang:element(5, Data)
            ) of
                true ->
                    pharos@internal@spillover:push_all(
                        Sp,
                        pharos@internal@hot_buffer:drain(
                            erlang:element(3, Data)
                        )
                    );

                false ->
                    nil
            end;

        none ->
            nil
    end,
    _ = pharos@internal@hot_buffer:push(erlang:element(3, Data), Metric),
    nil.

-file("src/pharos/internal/connection.gleam", 224).
?DOC(false).
-spec on_deliver(connection_state(), data(), pharos@metric:metric()) -> eparch@state_machine:step(connection_state(), data(), message(), any()).
on_deliver(State, Data, Metric) ->
    case State of
        disconnected ->
            buffer_with_spill(Data, Metric),
            eparch@state_machine:keep_state(Data, []);

        connecting ->
            buffer_with_spill(Data, Metric),
            eparch@state_machine:keep_state(Data, []);

        throttled ->
            buffer_with_spill(Data, Metric),
            eparch@state_machine:keep_state(Data, []);

        streaming ->
            _ = pharos@internal@hot_buffer:push(erlang:element(3, Data), Metric),
            case flush(Data) of
                {ok, nil} ->
                    maybe_throttle(Data);

                {error, _} ->
                    go_disconnected(Data)
            end
    end.

-file("src/pharos/internal/connection.gleam", 199).
?DOC(false).
-spec handle_event(
    eparch@state_machine:event(connection_state(), message(), JBE),
    connection_state(),
    data()
) -> eparch@state_machine:step(connection_state(), data(), message(), JBE).
handle_event(Event, State, Data) ->
    case Event of
        {cast, {deliver, Metric}} ->
            on_deliver(State, Data, Metric);

        {cast, tick} ->
            on_tick(State, Data);

        {cast, attempt} ->
            on_attempt(State, Data);

        {cast, drain_tick} ->
            on_drain_tick(State, Data);

        {cast, flush_tick} ->
            on_flush_tick(State, Data);

        {timeout, state_timeout_type, tick} ->
            on_tick(State, Data);

        {timeout, state_timeout_type, attempt} ->
            on_attempt(State, Data);

        {timeout, state_timeout_type, drain_tick} ->
            on_drain_tick(State, Data);

        {timeout, state_timeout_type, flush_tick} ->
            on_flush_tick(State, Data);

        {timeout, _, _} ->
            eparch@state_machine:keep_state(Data, []);

        {info, _} ->
            eparch@state_machine:keep_state(Data, []);

        {call, _, _} ->
            eparch@state_machine:keep_state(Data, []);

        {enter, _} ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/internal/connection.gleam", 147).
?DOC(false).
-spec do_start(
    gleam@erlang@process:name(message()),
    transport(),
    pharos@internal@hot_buffer:hot_buffer(),
    gleam@option:option(pharos@internal@spillover:spillover()),
    connection_config()
) -> {ok, {gleam@erlang@process:pid_(), connection()}} |
    {error, eparch@state_machine:start_error()}.
do_start(Name, Transport, Buffer, Spillover, Config) ->
    Initial = {data, Transport, Buffer, Spillover, Config, 0},
    Result = begin
        _pipe = eparch@state_machine:new(disconnected, Initial),
        _pipe@1 = eparch@state_machine:named(_pipe, {local, Name}),
        _pipe@2 = eparch@state_machine:on_event(_pipe@1, fun handle_event/3),
        eparch@state_machine:start_link(_pipe@2)
    end,
    case Result of
        {ok, Started} ->
            Connection = {connection, Name},
            schedule_first_tick(Connection),
            {ok, {erlang:element(2, Started), Connection}};

        {error, Error} ->
            {error, Error}
    end.

-file("src/pharos/internal/connection.gleam", 118).
?DOC(false).
-spec start_link(
    gleam@erlang@process:name(message()),
    transport(),
    pharos@internal@hot_buffer:hot_buffer(),
    gleam@option:option(pharos@internal@spillover:spillover()),
    connection_config()
) -> {ok, connection()} | {error, eparch@state_machine:start_error()}.
start_link(Name, Transport, Buffer, Spillover, Config) ->
    case do_start(Name, Transport, Buffer, Spillover, Config) of
        {ok, {_, Connection}} ->
            {ok, Connection};

        {error, Error} ->
            {error, Error}
    end.

-file("src/pharos/internal/connection.gleam", 172).
?DOC(false).
-spec start_error_to_string(eparch@state_machine:start_error()) -> binary().
start_error_to_string(Error) ->
    case Error of
        init_timeout ->
            <<"connection manager init timeout"/utf8>>;

        {init_failed, Reason} ->
            Reason;

        {init_exited, _} ->
            <<"connection manager init exited"/utf8>>;

        {already_started, _} ->
            <<"connection manager name already registered"/utf8>>
    end.

-file("src/pharos/internal/connection.gleam", 132).
?DOC(false).
-spec child_spec(
    gleam@erlang@process:name(message()),
    transport(),
    pharos@internal@hot_buffer:hot_buffer(),
    gleam@option:option(pharos@internal@spillover:spillover()),
    connection_config()
) -> gleam@otp@supervision:child_specification(connection()).
child_spec(Name, Transport, Buffer, Spillover, Config) ->
    gleam@otp@supervision:worker(
        fun() -> case do_start(Name, Transport, Buffer, Spillover, Config) of
                {ok, {Pid, Connection}} ->
                    {ok, {started, Pid, Connection}};

                {error, Error} ->
                    {error, {init_failed, start_error_to_string(Error)}}
            end end
    ).

-file("src/pharos/internal/connection.gleam", 183).
?DOC(false).
-spec deliver(connection(), pharos@metric:metric()) -> nil.
deliver(Connection, Metric) ->
    statem_ffi:cast(server_ref(Connection), {deliver, Metric}).

-file("src/pharos/internal/connection.gleam", 460).
?DOC(false).
-spec resolve_brain_target(binary(), binary()) -> brain_target().
resolve_brain_target(Node, Name) ->
    pharos_ffi:brain_target(Node, Name).

-file("src/pharos/internal/connection.gleam", 464).
?DOC(false).
-spec brain_connect(brain_target()) -> {ok, nil} | {error, binary()}.
brain_connect(Target) ->
    pharos_ffi:brain_connect(Target).

-file("src/pharos/internal/connection.gleam", 468).
?DOC(false).
-spec brain_deliver(brain_target(), bitstring()) -> {ok, nil} |
    {error, binary()}.
brain_deliver(Target, Payload) ->
    pharos_ffi:brain_deliver(Target, Payload).

-file("src/pharos/internal/connection.gleam", 475).
?DOC(false).
-spec encode_etf(any()) -> bitstring().
encode_etf(Term) ->
    pharos_ffi:encode_etf(Term).

-file("src/pharos/internal/connection.gleam", 480).
?DOC(false).
-spec brain_transport(binary(), binary()) -> transport().
brain_transport(Node, Name) ->
    Target = pharos_ffi:brain_target(Node, Name),
    {transport,
        fun() -> pharos_ffi:brain_connect(Target) end,
        fun(Metric) ->
            pharos_ffi:brain_deliver(Target, pharos_ffi:encode_etf(Metric))
        end}.