Skip to main content

src/pharos@sink.erl

%% Alert sink dispatcher module.
%% NOTE(review): this looks like compiler output generated from the Gleam
%% source named in ?FILEPATH (the -file directives below point at it) --
%% confirm before hand-editing.
-module(pharos@sink).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/sink.gleam").
-export([json_body/1, render/1, handler_for/1, attach_all/2]).

%% Documentation shim: on OTP 27 and later ?MODULEDOC/?DOC expand to the
%% native -moduledoc/-doc attributes. On older releases they expand to an
%% empty -compile([]) attribute, so every ?MODULEDOC(...)/?DOC(...) call
%% site is still a well-formed attribute but carries no documentation.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

%% Module-level documentation (only emitted as -moduledoc on OTP 27+).
?MODULEDOC(
    " The pluggable sink dispatcher\n"
    "\n"
    " Each `AlertSink` configured on the `Config` becomes one handler on the\n"
    " alert event bus. The collection/detection logic stays decoupled from the\n"
    " transport: it only ever `notify`s the bus, and every configured sink\n"
    " receives the `AlertEvent` independently. A slow or failing sink cannot\n"
    " block the others (handlers are fire-and-forget).\n"
    "\n"
    " Sinks:\n"
    " - **Console** logs locally via `logging` (NixOS `journalctl`).\n"
    " - **Webhook** POSTs a JSON body (e.g. to Slack), the direct-notification\n"
    "   path when the Brain is unreachable.\n"
    " - **Brain** ETF-encodes the alert and sends it to a registered process on\n"
    "   a (possibly remote) node for native BEAM-to-BEAM delivery.\n"
    " - **Otlp** is a documented stub until the Protobuf/gRPC exporter lands.\n"
).

%% Translates an alert level atom into the binary label used in sink output.
%% Handles exactly the two levels `warning` and `critical`; any other term
%% matches no clause.
-file("src/pharos/sink.gleam", 98).
-spec level_to_string(pharos@alert:alert_level()) -> binary().
level_to_string(warning) -> <<"warning"/utf8>>;
level_to_string(critical) -> <<"critical"/utf8>>.

%% Flattens an alert event into a map with binary keys.
%%   {alert_firing, Id, Level, Diagnostic} -> status/id/level/diagnostic
%%   {alert_resolved, Id}                  -> status/id
%% The level is rendered through level_to_string/1; the other values are
%% stored as given.
-file("src/pharos/sink.gleam", 84).
-spec fields(pharos@alert:alert_event()) -> gleam@dict:dict(binary(), binary()).
fields({alert_firing, AlertId, Severity, Detail}) ->
    #{
        <<"status"/utf8>> => <<"firing"/utf8>>,
        <<"id"/utf8>> => AlertId,
        <<"level"/utf8>> => level_to_string(Severity),
        <<"diagnostic"/utf8>> => Detail
    };
fields({alert_resolved, AlertId}) ->
    #{
        <<"status"/utf8>> => <<"resolved"/utf8>>,
        <<"id"/utf8>> => AlertId
    }.

%% Builds the webhook payload: the event is first flattened by fields/1 and
%% the resulting map is handed to pharos_ffi:json_encode/1.
-file("src/pharos/sink.gleam", 80).
?DOC(
    " JSON body for the webhook sink. Built from a string->string map and encoded\n"
    " with Erlang's built-in `json` module (OTP 27+), which handles escaping.\n"
).
-spec json_body(pharos@alert:alert_event()) -> binary().
json_body(Event) ->
    Payload = fields(Event),
    pharos_ffi:json_encode(Payload).

%% Formats an alert event as a single line of text:
%%   firing   -> "FIRING [<level>] <id> - <diagnostic>"
%%   resolved -> "RESOLVED <id>"
-file("src/pharos/sink.gleam", 70).
?DOC(" Human-readable one-line rendering for the console sink.\n").
-spec render(pharos@alert:alert_event()) -> binary().
render({alert_firing, AlertId, Severity, Detail}) ->
    Label = level_to_string(Severity),
    %% One flat bit-syntax expression; segments are concatenated left to right.
    <<"FIRING ["/utf8, Label/binary, "] "/utf8, AlertId/binary, " - "/utf8, Detail/binary>>;
render({alert_resolved, AlertId}) ->
    <<"RESOLVED "/utf8, AlertId/binary>>.

%% Returns a one-argument fun that delivers an alert event to the given sink.
%% One clause per sink shape:
%%   {console, Level}    - logs render(Event) at Level via logging:log/2.
%%   {webhook, Url}      - posts json_body(Event) to Url; the result of the
%%                         post is discarded and the handler returns nil.
%%   {brain, Node, Name} - resolves the delivery target once, when the handler
%%                         is built, then sends the ETF-encoded event per call;
%%                         the delivery result is discarded, returns nil.
%%   {otlp, Endpoint}    - logs a debug line naming Endpoint and drops the event.
%% The console and otlp handlers return whatever logging:log/2 returns.
-file("src/pharos/sink.gleam", 40).
?DOC(" Build the bus handler closure for a single sink.\n").
-spec handler_for(pharos@config:alert_sink()) -> fun((pharos@alert:alert_event()) -> nil).
handler_for({console, Level}) ->
    fun(Event) -> logging:log(Level, render(Event)) end;
handler_for({webhook, Url}) ->
    fun(Event) ->
        Body = json_body(Event),
        %% Fire-and-forget: the outcome of the post is deliberately ignored.
        _ = pharos_ffi:webhook_post(Url, Body),
        nil
    end;
handler_for({brain, Node, Name}) ->
    Target = pharos_ffi:brain_target(Node, Name),
    fun(Event) ->
        Encoded = pharos_ffi:encode_etf(Event),
        %% Fire-and-forget: the outcome of the delivery is deliberately ignored.
        _ = pharos_ffi:brain_deliver(Target, Encoded),
        nil
    end;
handler_for({otlp, Endpoint}) ->
    fun(_Event) ->
        Notice =
            <<"pharos otlp sink not yet implemented; dropping alert for "/utf8,
                Endpoint/binary>>,
        logging:log(debug, Notice)
    end.

%% Registers one handler (built by handler_for/1) on Bus for every sink in
%% Sinks. Each attach result has its error payload replaced with nil and the
%% list is passed through gleam@list:filter_map/2.
%% NOTE(review): presumably filter_map keeps only the successful results, in
%% input order, as the doc string below states -- confirm against gleam_stdlib.
-file("src/pharos/sink.gleam", 32).
?DOC(
    " Attach one bus handler per configured sink. Returns the handler ids of the\n"
    " sinks that attached successfully (a failed attach is skipped, not fatal:\n"
    " the agent still runs with the sinks that did attach).\n"
).
-spec attach_all(pharos@event_bus:event_bus(), list(pharos@config:alert_sink())) -> list(pharos@event_bus:handler_id()).
attach_all(Bus, Sinks) ->
    Attach = fun(Sink) ->
        Handler = handler_for(Sink),
        Outcome = pharos@event_bus:add_handler(Bus, Handler),
        gleam@result:replace_error(Outcome, nil)
    end,
    gleam@list:filter_map(Sinks, Attach).