-module(pharos@sink).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/sink.gleam").
-export([json_body/1, render/1, handler_for/1, attach_all/2]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" The pluggable sink dispatcher\n"
"\n"
" Each `AlertSink` configured on the `Config` becomes one handler on the\n"
" alert event bus. The collection/detection logic stays decoupled from the\n"
" transport: it only ever `notify`s the bus, and every configured sink\n"
" receives the `AlertEvent` independently. A slow or failing sink cannot\n"
" block the others (handlers are fire-and-forget).\n"
"\n"
" Sinks:\n"
" - **Console** logs locally via `logging` (NixOS `journalctl`).\n"
" - **Webhook** POSTs a JSON body (e.g. to Slack), the direct-notification\n"
" path when the Brain is unreachable.\n"
" - **Brain** ETF-encodes the alert and sends it to a registered process on\n"
" a (possibly remote) node for native BEAM-to-BEAM delivery.\n"
" - **Otlp** is a documented stub until the Protobuf/gRPC exporter lands.\n"
).
-file("src/pharos/sink.gleam", 98).
-spec level_to_string(pharos@alert:alert_level()) -> binary().
level_to_string(Level) ->
case Level of
warning ->
<<"warning"/utf8>>;
critical ->
<<"critical"/utf8>>
end.
-file("src/pharos/sink.gleam", 84).
-spec fields(pharos@alert:alert_event()) -> gleam@dict:dict(binary(), binary()).
fields(Event) ->
case Event of
{alert_firing, Id, Level, Diagnostic} ->
maps:from_list(
[{<<"status"/utf8>>, <<"firing"/utf8>>},
{<<"id"/utf8>>, Id},
{<<"level"/utf8>>, level_to_string(Level)},
{<<"diagnostic"/utf8>>, Diagnostic}]
);
{alert_resolved, Id@1} ->
maps:from_list(
[{<<"status"/utf8>>, <<"resolved"/utf8>>},
{<<"id"/utf8>>, Id@1}]
)
end.
-file("src/pharos/sink.gleam", 80).
?DOC(
" JSON body for the webhook sink. Built from a string->string map and encoded\n"
" with Erlang's built-in `json` module (OTP 27+), which handles escaping.\n"
).
-spec json_body(pharos@alert:alert_event()) -> binary().
json_body(Event) ->
pharos_ffi:json_encode(fields(Event)).
-file("src/pharos/sink.gleam", 70).
?DOC(" Human-readable one-line rendering for the console sink.\n").
-spec render(pharos@alert:alert_event()) -> binary().
render(Event) ->
case Event of
{alert_firing, Id, Level, Diagnostic} ->
<<<<<<<<<<"FIRING ["/utf8, (level_to_string(Level))/binary>>/binary,
"] "/utf8>>/binary,
Id/binary>>/binary,
" - "/utf8>>/binary,
Diagnostic/binary>>;
{alert_resolved, Id@1} ->
<<"RESOLVED "/utf8, Id@1/binary>>
end.
-file("src/pharos/sink.gleam", 40).
?DOC(" Build the bus handler closure for a single sink.\n").
-spec handler_for(pharos@config:alert_sink()) -> fun((pharos@alert:alert_event()) -> nil).
handler_for(Sink) ->
case Sink of
{console, Level} ->
fun(Event) -> logging:log(Level, render(Event)) end;
{webhook, Url} ->
fun(Event@1) ->
_ = pharos_ffi:webhook_post(Url, json_body(Event@1)),
nil
end;
{brain, Node, Name} ->
Target = pharos_ffi:brain_target(Node, Name),
fun(Event@2) ->
_ = pharos_ffi:brain_deliver(
Target,
pharos_ffi:encode_etf(Event@2)
),
nil
end;
{otlp, Endpoint} ->
fun(_) ->
logging:log(
debug,
<<"pharos otlp sink not yet implemented; dropping alert for "/utf8,
Endpoint/binary>>
)
end
end.
-file("src/pharos/sink.gleam", 32).
?DOC(
" Attach one bus handler per configured sink. Returns the handler ids of the\n"
" sinks that attached successfully (a failed attach is skipped, not fatal:\n"
" the agent still runs with the sinks that did attach).\n"
).
-spec attach_all(pharos@event_bus:event_bus(), list(pharos@config:alert_sink())) -> list(pharos@event_bus:handler_id()).
attach_all(Bus, Sinks) ->
gleam@list:filter_map(
Sinks,
fun(Sink) ->
_pipe = pharos@event_bus:add_handler(Bus, handler_for(Sink)),
gleam@result:replace_error(_pipe, nil)
end
).