src/rally_runtime@system.erl

-module(rally_runtime@system).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/rally_runtime/system.gleam").
-export([open/1, log_to_server/7, log_to_client/6, log_broadcast/3, start/1, get_conn/0, start_with_jobs/2, enqueue/3, enqueue_in/3, enqueue_now/2]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " System database: observability and background jobs.\n"
    "\n"
    " Opens a separate SQLite database (system.db) for message logging and\n"
    " the job queue. The connection is stored globally so any WS handler\n"
    " process can log messages. Uses synchronous=OFF because losing a few\n"
    " recent log entries on crash is acceptable for the throughput gain.\n"
).

-file("src/rally_runtime/system.gleam", 63).
-spec ensure_column(sqlight:connection(), binary(), binary(), binary()) -> {ok,
        nil} |
    {error, sqlight:error()}.
ensure_column(Conn, Table, Column, Ddl) ->
    case sqlight:'query'(
        <<"SELECT name FROM pragma_table_info(?1) WHERE name = ?2"/utf8>>,
        Conn,
        [sqlight:text(Table), sqlight:text(Column)],
        begin
            gleam@dynamic@decode:field(
                0,
                {decoder, fun gleam@dynamic@decode:decode_string/1},
                fun(Name) -> gleam@dynamic@decode:success(Name) end
            )
        end
    ) of
        {ok, [_]} ->
            {ok, nil};

        {ok, _} ->
            sqlight:exec(Ddl, Conn);

        {error, E} ->
            {error, E}
    end.

-file("src/rally_runtime/system.gleam", 19).
-spec open(binary()) -> {ok, sqlight:connection()} | {error, sqlight:error()}.
open(Path) ->
    gleam@result:'try'(
        sqlight:open(Path),
        fun(Conn) ->
            gleam@result:'try'(
                sqlight:exec(<<"PRAGMA journal_mode=WAL;"/utf8>>, Conn),
                fun(_) ->
                    gleam@result:'try'(
                        sqlight:exec(<<"PRAGMA synchronous=OFF;"/utf8>>, Conn),
                        fun(_) ->
                            gleam@result:'try'(
                                sqlight:exec(
                                    <<"PRAGMA busy_timeout=5000;"/utf8>>,
                                    Conn
                                ),
                                fun(_) ->
                                    gleam@result:'try'(
                                        sqlight:exec(
                                            <<"CREATE TABLE IF NOT EXISTS messages (
  id INTEGER PRIMARY KEY,
  timestamp INTEGER NOT NULL,
  session_id TEXT NOT NULL,
  user_id INTEGER,
  page TEXT NOT NULL,
  direction TEXT NOT NULL,
  variant TEXT NOT NULL,
  payload BLOB,
  elapsed_ms INTEGER
);
CREATE TABLE IF NOT EXISTS jobs (
  id INTEGER PRIMARY KEY,
  name TEXT NOT NULL,
  payload BLOB NOT NULL,
  run_at INTEGER NOT NULL,
  attempts INTEGER NOT NULL DEFAULT 0,
  status TEXT NOT NULL DEFAULT 'pending',
  last_error TEXT,
  created_at INTEGER NOT NULL DEFAULT (unixepoch()),
  claimed_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id);
CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id);
CREATE INDEX IF NOT EXISTS idx_messages_page ON messages(page);
CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_jobs_status_run_at ON jobs(status, run_at);"/utf8>>,
                                            Conn
                                        ),
                                        fun(_) ->
                                            gleam@result:'try'(
                                                ensure_column(
                                                    Conn,
                                                    <<"jobs"/utf8>>,
                                                    <<"claimed_at"/utf8>>,
                                                    <<"ALTER TABLE jobs ADD COLUMN claimed_at INTEGER;"/utf8>>
                                                ),
                                                fun(_) -> {ok, Conn} end
                                            )
                                        end
                                    )
                                end
                            )
                        end
                    )
                end
            )
        end
    ).

-file("src/rally_runtime/system.gleam", 163).
-spec nullable_int({ok, integer()} | {error, nil}) -> sqlight:value().
nullable_int(Val) ->
    case Val of
        {ok, N} ->
            sqlight:int(N);

        _ ->
            sqlight_ffi:null()
    end.

-file("src/rally_runtime/system.gleam", 170).
-spec unix_seconds() -> integer().
unix_seconds() ->
    {Seconds, _} = gleam@time@timestamp:to_unix_seconds_and_nanoseconds(
        gleam@time@timestamp:system_time()
    ),
    Seconds.

-file("src/rally_runtime/system.gleam", 86).
-spec log_to_server(
    sqlight:connection(),
    binary(),
    {ok, integer()} | {error, nil},
    binary(),
    binary(),
    bitstring(),
    integer()
) -> nil.
log_to_server(
    Db,
    Session_id,
    User_id,
    Page,
    Variant_name,
    Raw_payload,
    Elapsed_ms
) ->
    Now = unix_seconds(),
    _ = sqlight:'query'(
        <<"INSERT INTO messages (timestamp, session_id, user_id, page, direction, variant, payload, elapsed_ms)
       VALUES (?1, ?2, ?3, ?4, 'to_server', ?5, ?6, ?7)"/utf8>>,
        Db,
        [sqlight:int(Now),
            sqlight:text(Session_id),
            nullable_int(User_id),
            sqlight:text(Page),
            sqlight:text(Variant_name),
            sqlight_ffi:coerce_blob(Raw_payload),
            sqlight:int(Elapsed_ms)],
        gleam@dynamic@decode:success(nil)
    ),
    nil.

-file("src/rally_runtime/system.gleam", 115).
-spec log_to_client(
    sqlight:connection(),
    binary(),
    {ok, integer()} | {error, nil},
    binary(),
    binary(),
    integer()
) -> nil.
log_to_client(Db, Session_id, User_id, Page, Variant, Elapsed_ms) ->
    Now = unix_seconds(),
    _ = sqlight:'query'(
        <<"INSERT INTO messages (timestamp, session_id, user_id, page, direction, variant, elapsed_ms)
       VALUES (?1, ?2, ?3, ?4, 'to_client', ?5, ?6)"/utf8>>,
        Db,
        [sqlight:int(Now),
            sqlight:text(Session_id),
            nullable_int(User_id),
            sqlight:text(Page),
            sqlight:text(Variant),
            sqlight:int(Elapsed_ms)],
        gleam@dynamic@decode:success(nil)
    ),
    nil.

-file("src/rally_runtime/system.gleam", 142).
-spec log_broadcast(sqlight:connection(), binary(), binary()) -> nil.
log_broadcast(Db, Page, Variant) ->
    Now = unix_seconds(),
    _ = sqlight:'query'(
        <<"INSERT INTO messages (timestamp, session_id, user_id, page, direction, variant)
       VALUES (?1, '', NULL, ?2, 'broadcast', ?3)"/utf8>>,
        Db,
        [sqlight:int(Now), sqlight:text(Page), sqlight:text(Variant)],
        gleam@dynamic@decode:success(nil)
    ),
    nil.

-file("src/rally_runtime/system.gleam", 269).
-spec message_count(sqlight:connection()) -> integer().
message_count(Db) ->
    case sqlight:'query'(
        <<"SELECT COUNT(*) FROM messages"/utf8>>,
        Db,
        [],
        (gleam@dynamic@decode:at(
            [0],
            {decoder, fun gleam@dynamic@decode:decode_int/1}
        ))
    ) of
        {ok, [Count]} ->
            Count;

        _ ->
            0
    end.

-file("src/rally_runtime/system.gleam", 178).
?DOC(
    " Call during app startup. Opens the system DB and stores the connection\n"
    " globally so any process (WS handlers) can access it.\n"
).
-spec start(binary()) -> nil.
start(Path) ->
    case open(Path) of
        {ok, Conn} ->
            _ = global_value:create_with_unique_name(
                <<"rally_system_db"/utf8>>,
                fun() -> Conn end
            ),
            rally_runtime_ffi:store_system_conn(Conn),
            logging:log(info, <<"System DB opened: "/utf8, Path/binary>>),
            Count = message_count(Conn),
            logging:log(
                info,
                <<<<"System: "/utf8, (erlang:integer_to_binary(Count))/binary>>/binary,
                    " messages recorded"/utf8>>
            ),
            nil;

        {error, Err} ->
            logging:log(
                warning,
                <<"Failed to open system DB: "/utf8,
                    (erlang:element(3, Err))/binary>>
            ),
            nil
    end.

-file("src/rally_runtime/system.gleam", 260).
-spec get_conn() -> {ok, sqlight:connection()} | {error, nil}.
get_conn() ->
    rally_runtime_ffi:get_system_conn().

-file("src/rally_runtime/system.gleam", 205).
?DOC(" Start the system DB with a background job runner.\n").
-spec start_with_jobs(
    binary(),
    fun((binary(), bitstring()) -> {ok, nil} | {error, binary()})
) -> nil.
start_with_jobs(Path, Handler) ->
    start(Path),
    case get_conn() of
        {ok, Conn} ->
            case rally_runtime@jobs:start_runner(Conn, Handler) of
                {ok, _} ->
                    logging:log(info, <<"Job runner started"/utf8>>);

                _ ->
                    logging:log(warning, <<"Failed to start job runner"/utf8>>)
            end;

        _ ->
            logging:log(warning, <<"System DB not available"/utf8>>)
    end.

-file("src/rally_runtime/system.gleam", 222).
?DOC(" Enqueue a job to run at a specific time.\n").
-spec enqueue(binary(), bitstring(), integer()) -> nil.
enqueue(Name, Payload, Run_at) ->
    case get_conn() of
        {ok, Conn} ->
            rally_runtime@jobs:enqueue(Conn, Name, Payload, Run_at);

        _ ->
            nil
    end.

-file("src/rally_runtime/system.gleam", 235).
?DOC(" Enqueue a job to run after a delay.\n").
-spec enqueue_in(binary(), bitstring(), integer()) -> nil.
enqueue_in(Name, Payload, Delay_seconds) ->
    case get_conn() of
        {ok, Conn} ->
            rally_runtime@jobs:enqueue_in(Conn, Name, Payload, Delay_seconds);

        _ ->
            nil
    end.

-file("src/rally_runtime/system.gleam", 253).
?DOC(" Enqueue a job to run immediately.\n").
-spec enqueue_now(binary(), bitstring()) -> nil.
enqueue_now(Name, Payload) ->
    case get_conn() of
        {ok, Conn} ->
            rally_runtime@jobs:enqueue(Conn, Name, Payload, 0);

        _ ->
            nil
    end.