-module(rally_runtime@system).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/rally_runtime/system.gleam").
-export([open/1, log_to_server/7, log_to_client/6, log_broadcast/3, start/1, get_conn/0, start_with_jobs/2, enqueue/3, enqueue_in/3, enqueue_now/2]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" System database: observability and background jobs.\n"
"\n"
" Opens a separate SQLite database (system.db) for message logging and\n"
" the job queue. The connection is stored globally so any WS handler\n"
" process can log messages. Uses synchronous=OFF because losing a few\n"
" recent log entries on crash is acceptable for the throughput gain.\n"
).
-file("src/rally_runtime/system.gleam", 63).
%% Makes sure `Table` has a column called `Column`.
%%
%% The table's column list is read through `pragma_table_info`, filtered to
%% the requested name. When exactly one row comes back the column is already
%% there and nothing is done; for any other successful result the supplied
%% `Ddl` statement is executed on `Conn`. Query errors are returned unchanged.
-spec ensure_column(sqlight:connection(), binary(), binary(), binary()) ->
    {ok, nil} | {error, sqlight:error()}.
ensure_column(Conn, Table, Column, Ddl) ->
    Sql = <<"SELECT name FROM pragma_table_info(?1) WHERE name = ?2"/utf8>>,
    Params = [sqlight:text(Table), sqlight:text(Column)],
    %% Decode the first field of each row as a string and hand it back as-is.
    NameDecoder = gleam@dynamic@decode:field(
        0,
        {decoder, fun gleam@dynamic@decode:decode_string/1},
        fun gleam@dynamic@decode:success/1
    ),
    case sqlight:'query'(Sql, Conn, Params, NameDecoder) of
        {ok, [_]} ->
            {ok, nil};
        {ok, _} ->
            sqlight:exec(Ddl, Conn);
        {error, _} = Failure ->
            Failure
    end.
-file("src/rally_runtime/system.gleam", 19).
%% Opens the SQLite database at `Path` and prepares it for use as the system
%% database.
%%
%% After the connection is opened the following steps run in order, stopping
%% at the first failure:
%%   1. `PRAGMA journal_mode=WAL`
%%   2. `PRAGMA synchronous=OFF`
%%   3. `PRAGMA busy_timeout=5000`
%%   4. creation of the `messages` and `jobs` tables and their indexes
%%   5. a migration that adds `jobs.claimed_at` when the column is missing
%%
%% Returns `{ok, Conn}` when every step succeeds. On failure the error of the
%% failing step is returned; if the connection had already been opened it is
%% closed first so that a failed setup does not leak an open handle.
-spec open(binary()) -> {ok, sqlight:connection()} | {error, sqlight:error()}.
open(Path) ->
    case sqlight:open(Path) of
        {ok, Conn} ->
            case open_configure(Conn) of
                {ok, _} ->
                    {ok, Conn};
                {error, _} = Failure ->
                    %% The setup error is what the caller needs to see; the
                    %% result of closing is deliberately ignored.
                    _ = sqlight:close(Conn),
                    Failure
            end;
        {error, _} = Failure ->
            Failure
    end.

%% Runs the pragmas, the schema script and the `claimed_at` migration on an
%% already opened connection, returning the first error encountered.
-spec open_configure(sqlight:connection()) ->
    {ok, nil} | {error, sqlight:error()}.
open_configure(Conn) ->
    Statements = [
        <<"PRAGMA journal_mode=WAL;"/utf8>>,
        <<"PRAGMA synchronous=OFF;"/utf8>>,
        <<"PRAGMA busy_timeout=5000;"/utf8>>,
        <<"CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY,
timestamp INTEGER NOT NULL,
session_id TEXT NOT NULL,
user_id INTEGER,
page TEXT NOT NULL,
direction TEXT NOT NULL,
variant TEXT NOT NULL,
payload BLOB,
elapsed_ms INTEGER
);
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
payload BLOB NOT NULL,
run_at INTEGER NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'pending',
last_error TEXT,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
claimed_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id);
CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id);
CREATE INDEX IF NOT EXISTS idx_messages_page ON messages(page);
CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_jobs_status_run_at ON jobs(status, run_at);"/utf8>>
    ],
    case open_exec_all(Conn, Statements) of
        {ok, _} ->
            %% `CREATE TABLE IF NOT EXISTS` does not alter a `jobs` table that
            %% already exists, so the column is added separately when absent.
            ensure_column(
                Conn,
                <<"jobs"/utf8>>,
                <<"claimed_at"/utf8>>,
                <<"ALTER TABLE jobs ADD COLUMN claimed_at INTEGER;"/utf8>>
            );
        {error, _} = Failure ->
            Failure
    end.

%% Executes each SQL script in order on `Conn`, stopping at the first error.
-spec open_exec_all(sqlight:connection(), [binary()]) ->
    {ok, nil} | {error, sqlight:error()}.
open_exec_all(_Conn, []) ->
    {ok, nil};
open_exec_all(Conn, [Sql | Rest]) ->
    case sqlight:exec(Sql, Conn) of
        {ok, _} ->
            open_exec_all(Conn, Rest);
        {error, _} = Failure ->
            Failure
    end.
-file("src/rally_runtime/system.gleam", 163).
%% Converts an optional integer into a SQL parameter value: `{ok, N}` becomes
%% an integer value, anything else becomes SQL NULL.
-spec nullable_int({ok, integer()} | {error, nil}) -> sqlight:value().
nullable_int({ok, N}) ->
    sqlight:int(N);
nullable_int(_) ->
    sqlight_ffi:null().
-file("src/rally_runtime/system.gleam", 170).
%% Current system time as whole Unix seconds; the nanosecond part of the
%% timestamp is discarded.
-spec unix_seconds() -> integer().
unix_seconds() ->
    Now = gleam@time@timestamp:system_time(),
    {WholeSeconds, _Nanoseconds} =
        gleam@time@timestamp:to_unix_seconds_and_nanoseconds(Now),
    WholeSeconds.
-file("src/rally_runtime/system.gleam", 86).
%% Records one `to_server` row in the `messages` table, stamped with the
%% current Unix time in seconds.
%%
%% `User_id` is stored as NULL unless it is `{ok, Id}`; `Raw_payload` is
%% stored in the `payload` column. The outcome of the insert is ignored, so
%% this always returns `nil` whether or not the row was written.
-spec log_to_server(
    sqlight:connection(),
    binary(),
    {ok, integer()} | {error, nil},
    binary(),
    binary(),
    bitstring(),
    integer()
) -> nil.
log_to_server(
    Db,
    Session_id,
    User_id,
    Page,
    Variant_name,
    Raw_payload,
    Elapsed_ms
) ->
    Timestamp = unix_seconds(),
    Sql = <<"INSERT INTO messages (timestamp, session_id, user_id, page, direction, variant, payload, elapsed_ms)
VALUES (?1, ?2, ?3, ?4, 'to_server', ?5, ?6, ?7)"/utf8>>,
    %% Parameter order follows the ?1..?7 placeholders above.
    Params = [
        sqlight:int(Timestamp),
        sqlight:text(Session_id),
        nullable_int(User_id),
        sqlight:text(Page),
        sqlight:text(Variant_name),
        sqlight_ffi:coerce_blob(Raw_payload),
        sqlight:int(Elapsed_ms)
    ],
    %% An INSERT yields no rows, so the decoder just produces `nil`.
    _ = sqlight:'query'(Sql, Db, Params, gleam@dynamic@decode:success(nil)),
    nil.
-file("src/rally_runtime/system.gleam", 115).
%% Records one `to_client` row in the `messages` table, stamped with the
%% current Unix time in seconds. No payload is stored for this direction.
%%
%% `User_id` is stored as NULL unless it is `{ok, Id}`. The outcome of the
%% insert is ignored, so this always returns `nil`.
-spec log_to_client(
    sqlight:connection(),
    binary(),
    {ok, integer()} | {error, nil},
    binary(),
    binary(),
    integer()
) -> nil.
log_to_client(Db, Session_id, User_id, Page, Variant, Elapsed_ms) ->
    Timestamp = unix_seconds(),
    Sql = <<"INSERT INTO messages (timestamp, session_id, user_id, page, direction, variant, elapsed_ms)
VALUES (?1, ?2, ?3, ?4, 'to_client', ?5, ?6)"/utf8>>,
    %% Parameter order follows the ?1..?6 placeholders above.
    Params = [
        sqlight:int(Timestamp),
        sqlight:text(Session_id),
        nullable_int(User_id),
        sqlight:text(Page),
        sqlight:text(Variant),
        sqlight:int(Elapsed_ms)
    ],
    _ = sqlight:'query'(Sql, Db, Params, gleam@dynamic@decode:success(nil)),
    nil.
-file("src/rally_runtime/system.gleam", 142).
%% Records one `broadcast` row in the `messages` table. Broadcast rows carry
%% an empty session id and a NULL user id; only the page and variant vary.
%%
%% The outcome of the insert is ignored, so this always returns `nil`.
-spec log_broadcast(sqlight:connection(), binary(), binary()) -> nil.
log_broadcast(Db, Page, Variant) ->
    Timestamp = unix_seconds(),
    Sql = <<"INSERT INTO messages (timestamp, session_id, user_id, page, direction, variant)
VALUES (?1, '', NULL, ?2, 'broadcast', ?3)"/utf8>>,
    Params = [sqlight:int(Timestamp), sqlight:text(Page), sqlight:text(Variant)],
    _ = sqlight:'query'(Sql, Db, Params, gleam@dynamic@decode:success(nil)),
    nil.
-file("src/rally_runtime/system.gleam", 269).
%% Number of rows in the `messages` table. Any outcome other than a single
%% decoded row (including a query error) is reported as 0.
-spec message_count(sqlight:connection()) -> integer().
message_count(Db) ->
    %% The count is the first (and only) field of the result row.
    CountDecoder = gleam@dynamic@decode:at(
        [0],
        {decoder, fun gleam@dynamic@decode:decode_int/1}
    ),
    Outcome = sqlight:'query'(
        <<"SELECT COUNT(*) FROM messages"/utf8>>,
        Db,
        [],
        CountDecoder
    ),
    case Outcome of
        {ok, [Total]} ->
            Total;
        _ ->
            0
    end.
-file("src/rally_runtime/system.gleam", 178).
?DOC(
    " Application-startup entry point. Opens the system DB at the given path\n"
    " and publishes the connection globally so that any process (such as a\n"
    " WS handler) can reach it.\n"
).
%% On success the connection is registered under the global value name
%% `rally_system_db` and handed to `rally_runtime_ffi:store_system_conn/1`,
%% then the path and the current message count are logged at `info` level.
%% On failure a warning is logged and nothing is stored. Returns `nil` in
%% both cases.
-spec start(binary()) -> nil.
start(Path) ->
    Opened = open(Path),
    case Opened of
        {error, Reason} ->
            %% The third element of the error tuple is used as the log text.
            Detail = erlang:element(3, Reason),
            logging:log(
                warning,
                <<"Failed to open system DB: "/utf8, Detail/binary>>
            ),
            nil;
        {ok, Db} ->
            _ = global_value:create_with_unique_name(
                <<"rally_system_db"/utf8>>,
                fun() -> Db end
            ),
            rally_runtime_ffi:store_system_conn(Db),
            logging:log(info, <<"System DB opened: "/utf8, Path/binary>>),
            Recorded = erlang:integer_to_binary(message_count(Db)),
            logging:log(
                info,
                <<"System: "/utf8, Recorded/binary, " messages recorded"/utf8>>
            ),
            nil
    end.
-file("src/rally_runtime/system.gleam", 260).
%% Looks up the system DB connection by delegating to
%% `rally_runtime_ffi:get_system_conn/0`.
%%
%% Per the spec the result is `{ok, Conn}` or `{error, nil}`.
%% NOTE(review): presumably this is the connection saved by
%% `rally_runtime_ffi:store_system_conn/1` in `start/1` -- confirm in the FFI.
-spec get_conn() -> {ok, sqlight:connection()} | {error, nil}.
get_conn() ->
rally_runtime_ffi:get_system_conn().
-file("src/rally_runtime/system.gleam", 205).
?DOC(" Start the system DB and then launch the background job runner on it.\n").
%% Calls `start/1` with `Path`, then fetches the connection via `get_conn/0`
%% and passes it with `Handler` to `rally_runtime@jobs:start_runner/2`.
%% Exactly one line is logged describing the outcome: runner started, runner
%% failed to start, or no system DB connection available.
-spec start_with_jobs(
    binary(),
    fun((binary(), bitstring()) -> {ok, nil} | {error, binary()})
) -> nil.
start_with_jobs(Path, Handler) ->
    start(Path),
    %% Work out what to report first, then emit a single log call.
    {Level, Message} =
        case get_conn() of
            {ok, Db} ->
                case rally_runtime@jobs:start_runner(Db, Handler) of
                    {ok, _} ->
                        {info, <<"Job runner started"/utf8>>};
                    _ ->
                        {warning, <<"Failed to start job runner"/utf8>>}
                end;
            _ ->
                {warning, <<"System DB not available"/utf8>>}
        end,
    logging:log(Level, Message).
-file("src/rally_runtime/system.gleam", 222).
?DOC(" Schedule a job to run at a specific time.\n").
%% Forwards to `rally_runtime@jobs:enqueue/4` using the connection returned
%% by `get_conn/0`. When no connection is available the job is dropped and
%% `nil` is returned.
-spec enqueue(binary(), bitstring(), integer()) -> nil.
enqueue(Name, Payload, Run_at) ->
    Lookup = get_conn(),
    case Lookup of
        {ok, Db} ->
            rally_runtime@jobs:enqueue(Db, Name, Payload, Run_at);
        _ ->
            nil
    end.
-file("src/rally_runtime/system.gleam", 235).
?DOC(" Schedule a job to run once a delay has elapsed.\n").
%% Forwards to `rally_runtime@jobs:enqueue_in/4` using the connection
%% returned by `get_conn/0`. When no connection is available the job is
%% dropped and `nil` is returned.
-spec enqueue_in(binary(), bitstring(), integer()) -> nil.
enqueue_in(Name, Payload, Delay_seconds) ->
    Lookup = get_conn(),
    case Lookup of
        {ok, Db} ->
            rally_runtime@jobs:enqueue_in(Db, Name, Payload, Delay_seconds);
        _ ->
            nil
    end.
-file("src/rally_runtime/system.gleam", 253).
?DOC(" Schedule a job to run immediately.\n").
%% Forwards to `rally_runtime@jobs:enqueue/4` with a run-at value of 0, using
%% the connection returned by `get_conn/0`. When no connection is available
%% the job is dropped and `nil` is returned.
-spec enqueue_now(binary(), bitstring()) -> nil.
enqueue_now(Name, Payload) ->
    Lookup = get_conn(),
    case Lookup of
        {ok, Db} ->
            rally_runtime@jobs:enqueue(Db, Name, Payload, 0);
        _ ->
            nil
    end.