-module(rally_runtime@jobs).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/rally_runtime/jobs.gleam").
-export([enqueue/4, enqueue_in/4, start_runner/2, run_once/2, run_once_at/3]).
-export_type([job/0, msg/0, state/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" Durable background job runner backed by SQLite.\n"
"\n"
" Jobs are rows in the system DB's `jobs` table. A poller actor claims\n"
" ready jobs with UPDATE...RETURNING, runs the handler, and marks them\n"
" completed or schedules a retry. Failed jobs get quadratic backoff\n"
" (5s, 20s, 45s, 80s) and are dead-lettered after max_attempts.\n"
" Running jobs have a claimed_at lease so they can be reclaimed if the\n"
" process crashes. Use run_once for deterministic testing without the\n"
" polling actor.\n"
).
-type job() :: {job, integer(), binary(), bitstring(), integer()}.
-type msg() :: poll.
-type state() :: {state,
sqlight:connection(),
fun((binary(), bitstring()) -> {ok, nil} | {error, binary()}),
gleam@erlang@process:subject(msg())}.
-file("src/rally_runtime/jobs.gleam", 40).
-spec enqueue(sqlight:connection(), binary(), bitstring(), integer()) -> nil.
enqueue(Db, Name, Payload, Run_at) ->
_ = sqlight:'query'(
<<"INSERT INTO jobs (name, payload, run_at, attempts, status) VALUES (?1, ?2, ?3, 0, 'pending')"/utf8>>,
Db,
[sqlight:text(Name),
sqlight_ffi:coerce_blob(Payload),
sqlight:int(Run_at)],
gleam@dynamic@decode:success(nil)
),
nil.
-file("src/rally_runtime/jobs.gleam", 254).
-spec unix_seconds() -> integer().
unix_seconds() ->
{Seconds, _} = gleam@time@timestamp:to_unix_seconds_and_nanoseconds(
gleam@time@timestamp:system_time()
),
Seconds.
-file("src/rally_runtime/jobs.gleam", 56).
-spec enqueue_in(sqlight:connection(), binary(), bitstring(), integer()) -> nil.
enqueue_in(Db, Name, Payload, Delay_seconds) ->
Run_at = unix_seconds() + Delay_seconds,
enqueue(Db, Name, Payload, Run_at).
-file("src/rally_runtime/jobs.gleam", 232).
-spec mark_retry(
sqlight:connection(),
integer(),
integer(),
integer(),
binary()
) -> nil.
mark_retry(Db, Job_id, Attempts, Retry_at, Reason) ->
_ = sqlight:'query'(
<<"UPDATE jobs SET status = 'pending', attempts = ?2, run_at = ?3, last_error = ?4, claimed_at = NULL WHERE id = ?1"/utf8>>,
Db,
[sqlight:int(Job_id),
sqlight:int(Attempts),
sqlight:int(Retry_at),
sqlight:text(Reason)],
gleam@dynamic@decode:success(nil)
),
nil.
-file("src/rally_runtime/jobs.gleam", 217).
-spec mark_dead(sqlight:connection(), integer(), binary()) -> nil.
mark_dead(Db, Job_id, Reason) ->
_ = sqlight:'query'(
<<"UPDATE jobs SET status = 'dead', last_error = ?2 WHERE id = ?1"/utf8>>,
Db,
[sqlight:int(Job_id), sqlight:text(Reason)],
gleam@dynamic@decode:success(nil)
),
nil.
-file("src/rally_runtime/jobs.gleam", 206).
-spec mark_completed(sqlight:connection(), integer()) -> nil.
mark_completed(Db, Job_id) ->
_ = sqlight:'query'(
<<"UPDATE jobs SET status = 'completed' WHERE id = ?1"/utf8>>,
Db,
[sqlight:int(Job_id)],
gleam@dynamic@decode:success(nil)
),
nil.
-file("src/rally_runtime/jobs.gleam", 166).
-spec run_single_job(
sqlight:connection(),
fun((binary(), bitstring()) -> {ok, nil} | {error, binary()}),
job()
) -> nil.
run_single_job(Db, Handler, Job) ->
case Handler(erlang:element(3, Job), erlang:element(4, Job)) of
{ok, _} ->
mark_completed(Db, erlang:element(2, Job));
{error, Reason} ->
Next_attempts = erlang:element(5, Job) + 1,
case Next_attempts >= 5 of
true ->
mark_dead(Db, erlang:element(2, Job), Reason),
logging:log(
warning,
<<<<<<<<<<"Job "/utf8, (erlang:element(3, Job))/binary>>/binary,
" dead-lettered after "/utf8>>/binary,
(erlang:integer_to_binary(5))/binary>>/binary,
" attempts: "/utf8>>/binary,
Reason/binary>>
);
false ->
Backoff_seconds = (Next_attempts * Next_attempts) * 5,
Retry_at = unix_seconds() + Backoff_seconds,
mark_retry(
Db,
erlang:element(2, Job),
Next_attempts,
Retry_at,
Reason
)
end
end.
-file("src/rally_runtime/jobs.gleam", 152).
-spec run_jobs(
sqlight:connection(),
fun((binary(), bitstring()) -> {ok, nil} | {error, binary()}),
list(job())
) -> nil.
run_jobs(Db, Handler, Jobs) ->
case Jobs of
[] ->
nil;
[Job | Rest] ->
run_single_job(Db, Handler, Job),
run_jobs(Db, Handler, Rest)
end.
-file("src/rally_runtime/jobs.gleam", 119).
-spec fetch_ready_jobs(sqlight:connection(), integer()) -> list(job()).
fetch_ready_jobs(Db, Now) ->
Stale_before = Now - 60,
case sqlight:'query'(
<<"UPDATE jobs
SET status = 'running', claimed_at = ?1
WHERE id IN (
SELECT id FROM jobs
WHERE run_at <= ?1
AND (
status = 'pending'
OR (status = 'running' AND (claimed_at IS NULL OR claimed_at <= ?2))
)
ORDER BY run_at
LIMIT 10
)
RETURNING id, name, payload, attempts"/utf8>>,
Db,
[sqlight:int(Now), sqlight:int(Stale_before)],
begin
gleam@dynamic@decode:field(
0,
{decoder, fun gleam@dynamic@decode:decode_int/1},
fun(Id) ->
gleam@dynamic@decode:field(
1,
{decoder, fun gleam@dynamic@decode:decode_string/1},
fun(Name) ->
gleam@dynamic@decode:field(
2,
{decoder,
fun gleam@dynamic@decode:decode_bit_array/1},
fun(Payload) ->
gleam@dynamic@decode:field(
3,
{decoder,
fun gleam@dynamic@decode:decode_int/1},
fun(Attempts) ->
gleam@dynamic@decode:success(
{job,
Id,
Name,
Payload,
Attempts}
)
end
)
end
)
end
)
end
)
end
) of
{ok, Jobs} ->
Jobs;
_ ->
[]
end.
-file("src/rally_runtime/jobs.gleam", 108).
-spec process_pending_jobs_at(
sqlight:connection(),
fun((binary(), bitstring()) -> {ok, nil} | {error, binary()}),
integer()
) -> nil.
process_pending_jobs_at(Db, Handler, Now) ->
case fetch_ready_jobs(Db, Now) of
[] ->
nil;
Jobs ->
run_jobs(Db, Handler, Jobs)
end.
-file("src/rally_runtime/jobs.gleam", 66).
-spec start_runner(
sqlight:connection(),
fun((binary(), bitstring()) -> {ok, nil} | {error, binary()})
) -> {ok, gleam@otp@actor:started(nil)} | {error, gleam@otp@actor:start_error()}.
start_runner(Db, Handler) ->
_pipe@3 = gleam@otp@actor:new_with_initialiser(
1000,
fun(Subject) ->
Selector = begin
_pipe = gleam_erlang_ffi:new_selector(),
gleam@erlang@process:select_map(
_pipe,
Subject,
fun(Msg) -> Msg end
)
end,
State = {state, Db, Handler, Subject},
gleam@erlang@process:send(Subject, poll),
_pipe@1 = gleam@otp@actor:initialised(State),
_pipe@2 = gleam@otp@actor:selecting(_pipe@1, Selector),
{ok, _pipe@2}
end
),
_pipe@4 = gleam@otp@actor:on_message(
_pipe@3,
fun(State@1, Msg@1) ->
poll = Msg@1,
process_pending_jobs_at(
erlang:element(2, State@1),
erlang:element(3, State@1),
unix_seconds()
),
_ = gleam@erlang@process:send_after(
erlang:element(4, State@1),
1000,
poll
),
gleam@otp@actor:continue(State@1)
end
),
gleam@otp@actor:start(_pipe@4).
-file("src/rally_runtime/jobs.gleam", 96).
-spec run_once(
sqlight:connection(),
fun((binary(), bitstring()) -> {ok, nil} | {error, binary()})
) -> nil.
run_once(Db, Handler) ->
process_pending_jobs_at(Db, Handler, unix_seconds()).
-file("src/rally_runtime/jobs.gleam", 100).
-spec run_once_at(
sqlight:connection(),
integer(),
fun((binary(), bitstring()) -> {ok, nil} | {error, binary()})
) -> nil.
run_once_at(Db, Now, Handler) ->
process_pending_jobs_at(Db, Handler, Now).