Skip to main content

src/pharos@alert_manager.erl

-module(pharos@alert_manager).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/alert_manager.gleam").
-export([from_name/1, start_link/3, breach/1, recover/1, sample/2]).
-export_type([alert_manager/0, message/0, data/0]).

%% Documentation attribute shims.
%% On OTP 27 and later, `?MODULEDOC/1` and `?DOC/1` expand to the native
%% `-moduledoc` / `-doc` attributes. On older releases they expand to an
%% empty `-compile([])` attribute, so the doc text is dropped and the module
%% still compiles.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " Per-threshold alert state machine.\n"
    "\n"
    " Wraps an `eparch/state_machine` running the four-state lifecycle\n"
    " `Clear -> Pending -> Firing -> Cooling -> Clear`.\n"
    " The state machine is registered under a deterministic `Name` so that\n"
    " supervisor restarts keep `AlertManager` handles valid: callers send\n"
    " `Breach` / `Recover` casts to the registered name, which the OTP\n"
    " runtime resolves to the current pid on each call.\n"
    "\n"
    " `breach/1` and `recover/1` map physical signals (threshold crossed\n"
    " up / down) to state transitions. The dispatcher is notified with\n"
    " `AlertFiring` and `AlertResolved` events on the relevant transitions.\n"
).

%% Handle to an alert manager: the atom `alert_manager` tagged onto the
%% registered process name. `from_name/1` and `start_link/3` both build
%% exactly this tuple.
-opaque alert_manager() :: {alert_manager, gleam@erlang@process:name(message())}.

%% Messages the state machine handles as casts (see `handle_event/3`).
-type message() :: breach | recover | {sample, float()}.

%% State-machine data. The code reads fields positionally with
%% `erlang:element/2`, so the order matters. As populated by `start_link/3`:
%%   2. id taken from the `alert_data` tuple
%%   3. alert level
%%   4. soak period in ms (state timeout armed on `clear -> pending`)
%%   5. cool period in ms (state timeout armed on `firing -> cooling`)
%%   6. event bus notified with `alert_firing` / `alert_resolved`
%%   7. optional window spec; `none` makes `sample` casts inert
%%   8. retained samples as `{Timestamp, Value}` pairs, where the timestamp
%%      comes from `pharos_ffi:now_ms/0`; starts as `[]`
-type data() :: {data,
        binary(),
        pharos@alert:alert_level(),
        integer(),
        integer(),
        pharos@event_bus:event_bus(),
        gleam@option:option(pharos@alert:window_spec()),
        list({integer(), float()})}.

-file("src/pharos/alert_manager.gleam", 58).
?DOC(
    " Wrap a registered name as an `AlertManager`. The supervisor uses this\n"
    " to build handles before the underlying state machine has been started,\n"
    " so that the handler-attacher can route breach/recover signals to a\n"
    " stable address.\n"
).
%% Build an `alert_manager()` handle around an already-chosen registered
%% name. No process lookup happens here; the name is only wrapped.
-spec from_name(gleam@erlang@process:name(message())) -> alert_manager().
from_name(RegisteredName) ->
    Handle = {alert_manager, RegisteredName},
    Handle.

-file("src/pharos/alert_manager.gleam", 233).
%% Publish an `alert_firing` event on the event bus stored in `Data`.
%% The event carries the id (field 2), the alert level (field 3) and the
%% result of `pharos_ffi:collect_diagnostic/0`, which is gathered at the
%% moment of firing. Returns the result of `pharos@event_bus:notify/2`.
-spec fire(data()) -> nil.
fire(Data) ->
    Bus = erlang:element(6, Data),
    Id = erlang:element(2, Data),
    Level = erlang:element(3, Data),
    Diagnostic = pharos_ffi:collect_diagnostic(),
    pharos@event_bus:notify(Bus, {alert_firing, Id, Level, Diagnostic}).

-file("src/pharos/alert_manager.gleam", 215).
%% React to an expired state timeout.
%%   pending -> the soak timer ran out: publish `alert_firing`, go `firing`.
%%   cooling -> the cool timer ran out: publish `alert_resolved`, go `clear`.
%%   clear / firing -> nothing to do; keep the current state and data.
-spec handle_state_timeout(pharos@alert:alert_state(), data()) -> eparch@state_machine:step(pharos@alert:alert_state(), data(), message(), any()).
handle_state_timeout(State, Data) ->
    case State of
        pending ->
            fire(Data),
            eparch@state_machine:next_state(firing, Data, []);

        cooling ->
            Bus = erlang:element(6, Data),
            Resolved = {alert_resolved, erlang:element(2, Data)},
            pharos@event_bus:notify(Bus, Resolved),
            eparch@state_machine:next_state(clear, Data, []);

        Idle when Idle =:= clear; Idle =:= firing ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/alert_manager.gleam", 169).
%% Apply a "threshold no longer breached" signal to the current state.
%%   firing  -> move to `cooling` and arm a state timeout for the cool
%%              period (field 5 of `Data`).
%%   pending -> the breach did not last through the soak: back to `clear`.
%%   clear / cooling -> already recovering or idle; keep state and data.
-spec handle_recover(pharos@alert:alert_state(), data()) -> eparch@state_machine:step(pharos@alert:alert_state(), data(), message(), any()).
handle_recover(State, Data) ->
    case State of
        firing ->
            CoolMs = erlang:element(5, Data),
            %% The timeout payload (`breach`) is not inspected by
            %% handle_event/3, which matches it with `_`.
            CoolTimer = {state_timeout, {'after', CoolMs}, breach},
            eparch@state_machine:next_state(cooling, Data, [CoolTimer]);

        pending ->
            eparch@state_machine:next_state(clear, Data, []);

        Settled when Settled =:= clear; Settled =:= cooling ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/alert_manager.gleam", 138).
%% Apply a "threshold breached" signal to the current state.
%%   clear   -> with a soak period of exactly 0, fire at once and go
%%              `firing`; otherwise go `pending` and arm a state timeout
%%              for the soak period (field 4 of `Data`).
%%   cooling -> the breach came back before the cool timer expired:
%%              publish `alert_firing` again and return to `firing`.
%%   pending / firing -> already tracking the breach; keep state and data.
-spec handle_breach(pharos@alert:alert_state(), data()) -> eparch@state_machine:step(pharos@alert:alert_state(), data(), message(), any()).
handle_breach(State, Data) ->
    case State of
        Tracking when Tracking =:= pending; Tracking =:= firing ->
            eparch@state_machine:keep_state(Data, []);

        cooling ->
            fire(Data),
            eparch@state_machine:next_state(firing, Data, []);

        clear ->
            SoakMs = erlang:element(4, Data),
            case SoakMs of
                0 ->
                    fire(Data),
                    eparch@state_machine:next_state(firing, Data, []);

                _ ->
                    %% The timeout payload (`breach`) is not inspected by
                    %% handle_event/3, which matches it with `_`.
                    SoakTimer = {state_timeout, {'after', SoakMs}, breach},
                    eparch@state_machine:next_state(pending, Data, [SoakTimer])
            end
    end.

-file("src/pharos/alert_manager.gleam", 191).
?DOC(
    " Fold a windowed sample into the bounded window, then drive the same\n"
    " soak/cool transitions as a per-tick rule using the aggregate verdict.\n"
    " Inert when no window is configured (a windowed signal to a per-tick rule).\n"
).
%% Record one sample for a windowed rule and act on the window's verdict.
%% With no window configured (field 7 is `none`) the sample is ignored.
%% Otherwise the sample is stamped with `pharos_ffi:now_ms/0`, prepended to
%% the stored samples, pruned by `pharos@alert:prune_samples/3`, and the
%% updated data is passed to handle_breach/2 or handle_recover/2 depending
%% on `pharos@alert:window_breached/3`.
-spec handle_sample(pharos@alert:alert_state(), data(), float()) -> eparch@state_machine:step(pharos@alert:alert_state(), data(), message(), any()).
handle_sample(State, Data, Value) ->
    case erlang:element(7, Data) of
        {some, Spec} ->
            Now = pharos_ffi:now_ms(),
            Previous = erlang:element(8, Data),
            %% NOTE(review): field 2 of the window spec is presumably the
            %% window length used for pruning - confirm in pharos@alert.
            Retained = pharos@alert:prune_samples(
                [{Now, Value} | Previous],
                Now,
                erlang:element(2, Spec)
            ),
            %% Rebuild the data tuple with only the sample list replaced.
            Updated = {data,
                erlang:element(2, Data),
                erlang:element(3, Data),
                erlang:element(4, Data),
                erlang:element(5, Data),
                erlang:element(6, Data),
                erlang:element(7, Data),
                Retained},
            case pharos@alert:window_breached(Retained, Now, Spec) of
                false ->
                    handle_recover(State, Updated);

                true ->
                    handle_breach(State, Updated)
            end;

        none ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/alert_manager.gleam", 115).
%% Single event callback for the state machine (installed by start_link/3).
%% Only casts carrying a `message()` and state timeouts drive transitions;
%% every other event kind keeps the current state and data unchanged.
%% Calls are not replied to here.
-spec handle_event(
    eparch@state_machine:event(pharos@alert:alert_state(), message(), HXU),
    pharos@alert:alert_state(),
    data()
) -> eparch@state_machine:step(pharos@alert:alert_state(), data(), message(), HXU).
handle_event(Event, State, Data) ->
    case Event of
        %% Soak / cool timers armed by handle_breach/2 and handle_recover/2.
        {timeout, state_timeout_type, _} ->
            handle_state_timeout(State, Data);

        %% Signals sent through breach/1, recover/1 and sample/2.
        {cast, {sample, Reading}} ->
            handle_sample(State, Data, Reading);

        {cast, recover} ->
            handle_recover(State, Data);

        {cast, breach} ->
            handle_breach(State, Data);

        %% Everything below is ignored.
        {enter, _} ->
            eparch@state_machine:keep_state(Data, []);

        {call, _, _} ->
            eparch@state_machine:keep_state(Data, []);

        {info, _} ->
            eparch@state_machine:keep_state(Data, []);

        {timeout, {generic_timeout_type, _}, _} ->
            eparch@state_machine:keep_state(Data, []);

        {timeout, event_timeout_type, _} ->
            eparch@state_machine:keep_state(Data, [])
    end.

-file("src/pharos/alert_manager.gleam", 65).
?DOC(
    " Start a state machine for `data`, registering it under `name`. Notifies\n"
    " `bus` with `AlertFiring` / `AlertResolved` events on the relevant\n"
    " transitions.\n"
).
%% Start the alert state machine in state `clear`, registered locally under
%% `Name`, with handle_event/3 as its event callback. The initial data is
%% built from the `alert_data` tuple plus `Bus` and an empty sample list.
%% Returns `{ok, Handle}` wrapping `Name` (the started process itself is not
%% returned), or the start error unchanged.
-spec start_link(
    pharos@alert:alert_data(),
    pharos@event_bus:event_bus(),
    gleam@erlang@process:name(message())
) -> {ok, alert_manager()} | {error, eparch@state_machine:start_error()}.
start_link(Data, Bus, Name) ->
    {alert_data, Id, Level, SoakMs, CoolMs, WindowSpec} = Data,
    InitialData = {data, Id, Level, SoakMs, CoolMs, Bus, WindowSpec, []},
    Builder0 = eparch@state_machine:new(clear, InitialData),
    Builder1 = eparch@state_machine:named(Builder0, {local, Name}),
    Builder2 = eparch@state_machine:on_event(Builder1, fun handle_event/3),
    case eparch@state_machine:start_link(Builder2) of
        {error, Reason} ->
            {error, Reason};

        {ok, _Started} ->
            {ok, {alert_manager, Name}}
    end.

-file("src/pharos/alert_manager.gleam", 111).
?DOC(" Resolve a manager's registered name to a `ServerRef` for casting.\n").
%% Turn a manager handle into the server reference used for casting: unwrap
%% the registered name, build a named subject from it, then convert that
%% subject with `statem_ffi:ref_from_subject/1`.
-spec server_ref(alert_manager()) -> eparch@state_machine:server_ref(message()).
server_ref(Manager) ->
    Name = erlang:element(2, Manager),
    Subject = gleam@erlang@process:named_subject(Name),
    statem_ffi:ref_from_subject(Subject).

-file("src/pharos/alert_manager.gleam", 95).
?DOC(" Signal that the threshold has been breached.\n").
%% Cast `breach` to the state machine registered under the manager's name.
-spec breach(alert_manager()) -> nil.
breach(Manager) ->
    Ref = server_ref(Manager),
    statem_ffi:cast(Ref, breach).

-file("src/pharos/alert_manager.gleam", 100).
?DOC(" Signal that the threshold is no longer breached.\n").
%% Cast `recover` to the state machine registered under the manager's name.
-spec recover(alert_manager()) -> nil.
recover(Manager) ->
    Ref = server_ref(Manager),
    statem_ffi:cast(Ref, recover).

-file("src/pharos/alert_manager.gleam", 106).
?DOC(
    " Feed a sample to a windowed rule. The manager folds it into its window and\n"
    " derives breach/recover from the aggregate.\n"
).
%% Cast `{sample, Value}` to the state machine registered under the
%% manager's name; handle_sample/3 decides what the sample means.
-spec sample(alert_manager(), float()) -> nil.
sample(Manager, Value) ->
    Ref = server_ref(Manager),
    Message = {sample, Value},
    statem_ffi:cast(Ref, Message).