%% src/pharos.erl

%% Public API module of pharos: starting, stopping and subscribing to events.
-module(pharos).
%% Auto-import of BIFs is disabled, so code in this module calls them with an
%% explicit `erlang:` prefix. Several warning classes are silenced and
%% inlining is enabled.
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
%% Same path as the `-file` attributes below; presumably the Gleam source this
%% module was generated from (not referenced elsewhere in the visible code).
-define(FILEPATH, "src/pharos.gleam").
%% Exported functions.
-export([start_link/1, subscribe/2, unsubscribe/2, stop/1]).
%% Exported types.
-export_type([pharos/0, start_error/0]).

%% Documentation helpers. On OTP 27 and later ?MODULEDOC/?DOC expand to the
%% `-moduledoc`/`-doc` attributes; on older releases they expand to an empty
%% `-compile([])` attribute, so the doc text is dropped but the source still
%% compiles.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

%% Module overview and usage example. The example is Gleam code, not Erlang.
?MODULEDOC(
    " Public API.\n"
    "\n"
    " `start_link/1` boots the entire pharos supervision tree from a\n"
    " `Config`. The returned `Pharos` handle lets callers `subscribe` to\n"
    " alert events, `unsubscribe` later, and `stop` everything when done.\n"
    "\n"
    " ## Example\n"
    "\n"
    " ```gleam\n"
    " import pharos\n"
    " import pharos/config\n"
    " import pharos/statistic\n"
    "\n"
    " let assert Ok(started) =\n"
    "   pharos.start_link(\n"
    "     config.new()\n"
    "     |> config.with_statistics([\n"
    "       statistic.poll(statistic.BeamMemory),\n"
    "       statistic.poll_every(statistic.BeamRunQueues, 500),\n"
    "     ])\n"
    "     |> config.with_thresholds([config.TotalMemory(above: 500.0)]),\n"
    "   )\n"
    "\n"
    " let assert Ok(_handler) =\n"
    "   pharos.subscribe(started.data, fn(event) { handle(event) })\n"
    " ```\n"
).

%% Handle returned by start_link/1. After the `pharos` tag it holds, in order:
%%   - the pid taken from the started supervisor (shut down by stop/1),
%%   - the event bus used by subscribe/2 and unsubscribe/2,
%%   - the telemetry handler id that stop/1 detaches.
-opaque pharos() :: {pharos,
        gleam@erlang@process:pid_(),
        pharos@event_bus:event_bus(),
        gleam@erlang@atom:atom_()}.

%% Reasons start_link/1 can fail. Each variant carries a binary message
%% produced by format_actor_error/1 or format_attach_error/1 respectively.
-type start_error() :: {supervisor_start_failed, binary()} |
    {telemetry_attach_failed, binary()}.

%% Turn a telemetry attach failure into the binary message carried by the
%% `telemetry_attach_failed` start error. An `attach_failed` reason is passed
%% through unchanged.
-file("src/pharos.gleam", 278).
-spec format_attach_error(pharos@internal@telemetry:attach_error()) -> binary().
format_attach_error(handler_already_attached) ->
    <<"telemetry handler already attached"/utf8>>;
format_attach_error({attach_failed, Detail}) ->
    Detail.

%% Turn an actor start failure into the binary message carried by the
%% `supervisor_start_failed` start error. An `init_failed` reason is passed
%% through unchanged; the exit reason of `init_exited` is not included.
-file("src/pharos.gleam", 270).
-spec format_actor_error(gleam@otp@actor:start_error()) -> binary().
format_actor_error(init_timeout) ->
    <<"supervisor initialisation timed out"/utf8>>;
format_actor_error({init_failed, Detail}) ->
    Detail;
format_actor_error({init_exited, _Exit_reason}) ->
    <<"supervisor exited during initialisation"/utf8>>.

-file("src/pharos.gleam", 255).
?DOC(
    " Derive the runtime `WindowSpec` for a threshold. A windowed threshold\n"
    " yields `Some` spec holding its `window_ms`, the limit of the threshold\n"
    " it wraps, and its mode; every other threshold yields `None`.\n"
).
-spec window_spec(pharos@config:threshold()) -> gleam@option:option(pharos@alert:window_spec()).
window_spec({windowed, Inner, Window_ms, Mode}) ->
    %% The limit comes from the wrapped (inner) threshold.
    Limit = pharos@threshold_eval:threshold_limit(Inner),
    {some, {window_spec, Window_ms, Limit, Mode}};
window_spec(_Per_tick) ->
    none.

-file("src/pharos.gleam", 69).
?DOC(
    " Start pharos from `config`.\n"
    "\n"
    " Creates process names for the event bus, the hot buffer and one alert\n"
    " manager per configured threshold, starts the supervision tree, and\n"
    " then attaches one telemetry handler that is given the alert manager\n"
    " handles. If attaching fails, the supervisor is shut down again and\n"
    " the attach error is returned.\n"
).
-spec start_link(pharos@config:config()) -> {ok,
        gleam@otp@actor:started(pharos())} |
    {error, start_error()}.
start_link(Config) ->
    Bus_name = gleam_erlang_ffi:new_name(<<"pharos_event_bus"/utf8>>),
    %% Pair every threshold with a new name for its alert manager. The two
    %% lanes read different config fields and use different name prefixes.
    Beam_lane = [
        begin
            Beam_id = pharos@config:threshold_id(Beam_threshold),
            {Beam_threshold,
                gleam_erlang_ffi:new_name(
                    <<"pharos_alert_"/utf8, Beam_id/binary>>
                )}
        end
     || Beam_threshold <- erlang:element(3, Config)
    ],
    Probe_lane = [
        begin
            Probe_id = pharos@probe:threshold_id(Probe_threshold),
            {Probe_threshold,
                gleam_erlang_ffi:new_name(
                    <<"pharos_probe_"/utf8, Probe_id/binary>>
                )}
        end
     || Probe_threshold <- erlang:element(5, Config)
    ],
    %% BEAM manager specs come first, probe manager specs after them.
    Manager_specs =
        [beam_manager_spec(Config, Beam_entry) || Beam_entry <- Beam_lane] ++
            [probe_manager_spec(Config, Probe_entry) || Probe_entry <- Probe_lane],
    Table = pharos_ffi:name_to_atom(
        gleam_erlang_ffi:new_name(<<"pharos_hot_buffer"/utf8>>)
    ),
    Buffer = pharos@internal@hot_buffer:from_table(Table),
    Buffer_child = pharos@internal@hot_buffer:child_spec(
        Table,
        erlang:element(11, Config)
    ),
    Brain_option = erlang:element(12, Config),
    {Spillover_handle, Spillover_child} = spillover_parts(
        Brain_option,
        erlang:element(13, Config)
    ),
    Connection_child = connection_child(
        Brain_option,
        Config,
        Buffer,
        Spillover_handle
    ),
    case pharos@internal@supervisor:start_link(
        Config,
        Bus_name,
        Manager_specs,
        Buffer_child,
        Spillover_child,
        Connection_child
    ) of
        {ok, Started} ->
            attach_telemetry(
                Config,
                Started,
                Bus_name,
                Beam_lane,
                Probe_lane,
                Buffer
            );

        {error, Start_error} ->
            {error,
                {supervisor_start_failed, format_actor_error(Start_error)}}
    end.

%% Alert manager spec for one `{Threshold, Name}` entry of the BEAM lane.
beam_manager_spec(Config, {Threshold, Name}) ->
    %% A windowed threshold gets a soak period of 0; any other threshold
    %% takes the soak period from the config.
    {Window, Soak_period_ms} =
        case window_spec(Threshold) of
            {some, _} = Spec ->
                {Spec, 0};

            none ->
                {none, erlang:element(8, Config)}
        end,
    {alert_manager_spec,
        {alert_data,
            pharos@config:threshold_id(Threshold),
            erlang:element(10, Config),
            Soak_period_ms,
            erlang:element(9, Config),
            Window},
        Name}.

%% Alert manager spec for one `{Threshold, Name}` entry of the probe lane.
%% Probe thresholds never carry a window spec, and the second data field is
%% read from the threshold itself rather than from the config.
probe_manager_spec(Config, {Threshold, Name}) ->
    {alert_manager_spec,
        {alert_data,
            pharos@probe:threshold_id(Threshold),
            erlang:element(5, Threshold),
            erlang:element(8, Config),
            erlang:element(9, Config),
            none},
        Name}.

%% Spillover handle and child spec, each wrapped in an option. Both are
%% `none` unless a brain and a spillover path are both configured.
spillover_parts({some, _Brain}, {some, Path}) ->
    Spillover_table = pharos_ffi:name_to_atom(
        gleam_erlang_ffi:new_name(<<"pharos_spillover"/utf8>>)
    ),
    Handle = pharos@internal@spillover:from_table(Spillover_table),
    {{some, Handle},
        {some, pharos@internal@spillover:child_spec(Spillover_table, Path)}};
spillover_parts(_Brain_option, _Path_option) ->
    {none, none}.

%% Optional connection child spec; `none` when no brain is configured.
connection_child(none, _Config, _Buffer, _Spillover_handle) ->
    none;
connection_child({some, Brain}, Config, Buffer, Spillover_handle) ->
    Connection_name = gleam_erlang_ffi:new_name(<<"pharos_connection"/utf8>>),
    Transport = pharos@internal@connection:brain_transport(
        erlang:element(2, Brain),
        erlang:element(3, Brain)
    ),
    %% NOTE(review): the meaning of the literal fields is defined by
    %% `connection_config` elsewhere; the fourth field is half of the value
    %% that is also passed to the hot buffer child spec.
    Settings = {connection_config,
        1000,
        30000,
        erlang:element(11, Config) div 2,
        1000},
    {some,
        pharos@internal@connection:child_spec(
            Connection_name,
            Transport,
            Buffer,
            Spillover_handle,
            Settings
        )}.

%% Second half of start_link/1, run once the supervisor is up: attach the
%% telemetry handler and build the public handle, or shut the supervisor
%% down again when attaching fails.
attach_telemetry(Config, Started, Bus_name, Beam_lane, Probe_lane, Buffer) ->
    Bus = pharos@event_bus:from_name(Bus_name),
    Handler_id = erlang:binary_to_atom(<<"pharos_handler"/utf8>>),
    Manager_handles = [
        {Beam_threshold, pharos@alert_manager:from_name(Beam_name)}
     || {Beam_threshold, Beam_name} <- Beam_lane
    ],
    Probe_handles = [
        {Probe_threshold, pharos@alert_manager:from_name(Probe_name)}
     || {Probe_threshold, Probe_name} <- Probe_lane
    ],
    Supervisor_pid = erlang:element(2, Started),
    case pharos@internal@telemetry:attach(
        Config,
        Manager_handles,
        Probe_handles,
        erlang:element(4, Config),
        Buffer,
        Handler_id
    ) of
        {ok, nil} ->
            %% The result of attaching the sinks is deliberately discarded.
            _ = pharos@sink:attach_all(Bus, erlang:element(6, Config)),
            {ok,
                {started,
                    Supervisor_pid,
                    {pharos, Supervisor_pid, Bus, Handler_id}}};

        {error, Attach_error} ->
            pharos_ffi:shutdown_supervisor(Supervisor_pid),
            {error,
                {telemetry_attach_failed, format_attach_error(Attach_error)}}
    end.

-file("src/pharos.gleam", 226).
?DOC(
    " Register `on_event` as a handler on the alert event bus. On success\n"
    " the result carries a handler id that `unsubscribe` accepts.\n"
).
-spec subscribe(pharos(), fun((pharos@alert:alert_event()) -> nil)) -> {ok,
        pharos@event_bus:handler_id()} |
    {error, eparch@event_manager:add_error(nil, nil)}.
subscribe(Pharos, On_event) ->
    %% The event bus is the third element of the handle tuple.
    Bus = erlang:element(3, Pharos),
    pharos@event_bus:add_handler(Bus, On_event).

-file("src/pharos.gleam", 234).
?DOC(" Remove a handler that was registered earlier with `subscribe`.\n").
-spec unsubscribe(pharos(), pharos@event_bus:handler_id()) -> {ok, nil} |
    {error, eparch@event_manager:remove_error(nil, nil)}.
unsubscribe(Pharos, Handler) ->
    %% The event bus is the third element of the handle tuple.
    Bus = erlang:element(3, Pharos),
    pharos@event_bus:remove_handler(Bus, Handler).

-file("src/pharos.gleam", 244).
?DOC(
    " Stop pharos: the telemetry handler is detached first, then the\n"
    " supervision tree is shut down. Safe to call multiple times - extra\n"
    " calls become no-ops once the supervisor is already gone.\n"
).
-spec stop(pharos()) -> nil.
stop(Pharos) ->
    Handler_id = erlang:element(4, Pharos),
    Supervisor_pid = erlang:element(2, Pharos),
    %% The detach result is deliberately discarded.
    _ = pharos@internal@telemetry:detach(Handler_id),
    pharos_ffi:shutdown_supervisor(Supervisor_pid).