src/rally_runtime@jobs.erl

-module(rally_runtime@jobs).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/rally_runtime/jobs.gleam").
-export([enqueue/4, enqueue_in/4, start_runner/2, run_once/2, run_once_at/3]).
-export_type([job/0, msg/0, state/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " Durable background job runner backed by SQLite.\n"
    "\n"
    " Jobs are rows in the system DB's `jobs` table. A poller actor claims\n"
    " ready jobs with UPDATE...RETURNING, runs the handler, and marks them\n"
    " completed or schedules a retry. Failed jobs get quadratic backoff\n"
    " (5s, 20s, 45s, 80s) and are dead-lettered after max_attempts.\n"
    " Running jobs have a claimed_at lease so they can be reclaimed if the\n"
    " process crashes. Use run_once for deterministic testing without the\n"
    " polling actor.\n"
).

%% A job row as returned by the claim query, laid out as
%% {job, Id, Name, Payload, Attempts}: row id, job name, opaque payload and
%% the number of attempts recorded so far.
-type job() :: {job, integer(), binary(), bitstring(), integer()}.

%% The only message the runner actor handles: a request to poll for ready jobs.
-type msg() :: poll.

%% Runner actor state, laid out as {state, Db, Handler, Self}: the SQLite
%% connection, the user-supplied job handler (called with the job name and
%% payload), and the actor's own subject, used to schedule the next poll.
-type state() :: {state,
        sqlight:connection(),
        fun((binary(), bitstring()) -> {ok, nil} | {error, binary()}),
        gleam@erlang@process:subject(msg())}.

%% Inserts a new job row with status 'pending' and zero attempts.
%%
%%   Db      - SQLite connection holding the `jobs` table.
%%   Name    - job name; handed to the job handler as its first argument.
%%   Payload - opaque payload, stored as a blob and handed to the handler.
%%   Run_at  - earliest time the job may be claimed, in the same unit as the
%%             `Now` value used when polling (Unix seconds in enqueue_in/4).
%%
%% Always returns nil, so existing callers are unaffected. A failed INSERT is
%% no longer dropped silently: it is reported through the logger, because a
%% lost enqueue would otherwise be invisible.
-file("src/rally_runtime/jobs.gleam", 40).
-spec enqueue(sqlight:connection(), binary(), bitstring(), integer()) -> nil.
enqueue(Db, Name, Payload, Run_at) ->
    Insert_sql = <<"INSERT INTO jobs (name, payload, run_at, attempts, status) VALUES (?1, ?2, ?3, 0, 'pending')"/utf8>>,
    Params = [sqlight:text(Name),
        sqlight_ffi:coerce_blob(Payload),
        sqlight:int(Run_at)],
    case sqlight:'query'(
        Insert_sql,
        Db,
        Params,
        gleam@dynamic@decode:success(nil)
    ) of
        {error, {sqlight_error, _Code, Message, _Offset}} ->
            %% Keep the fire-and-forget contract, but make the failure visible.
            logging:log(
                error,
                <<"Failed to enqueue job "/utf8,
                    Name/binary,
                    ": "/utf8,
                    Message/binary>>
            ),
            nil;

        _ ->
            nil
    end.

%% Returns the current system time as whole Unix seconds; the nanosecond
%% component reported by the timestamp library is discarded.
-file("src/rally_runtime/jobs.gleam", 254).
-spec unix_seconds() -> integer().
unix_seconds() ->
    Now = gleam@time@timestamp:system_time(),
    {Whole_seconds, _Nanoseconds} =
        gleam@time@timestamp:to_unix_seconds_and_nanoseconds(Now),
    Whole_seconds.

%% Enqueues a job that becomes runnable Delay_seconds after the current
%% system time. Delegates to enqueue/4 and returns nil.
-file("src/rally_runtime/jobs.gleam", 56).
-spec enqueue_in(sqlight:connection(), binary(), bitstring(), integer()) -> nil.
enqueue_in(Db, Name, Payload, Delay_seconds) ->
    enqueue(Db, Name, Payload, unix_seconds() + Delay_seconds).

%% Puts a failed job back into the 'pending' state for a later retry.
%%
%% Records the new attempt count, the time of the next run and the failure
%% reason, and clears the claim timestamp. The query result is ignored; the
%% function always returns nil.
-file("src/rally_runtime/jobs.gleam", 232).
-spec mark_retry(
    sqlight:connection(),
    integer(),
    integer(),
    integer(),
    binary()
) -> nil.
mark_retry(Db, Job_id, Attempts, Retry_at, Reason) ->
    Retry_sql = <<"UPDATE jobs SET status = 'pending', attempts = ?2, run_at = ?3, last_error = ?4, claimed_at = NULL WHERE id = ?1"/utf8>>,
    %% Positional parameters ?1..?4, in order.
    Params = [sqlight:int(Job_id),
        sqlight:int(Attempts),
        sqlight:int(Retry_at),
        sqlight:text(Reason)],
    No_rows = gleam@dynamic@decode:success(nil),
    _ = sqlight:'query'(Retry_sql, Db, Params, No_rows),
    nil.

%% Dead-letters a job: sets its status to 'dead' and stores the failure
%% reason in last_error. The query result is ignored; always returns nil.
-file("src/rally_runtime/jobs.gleam", 217).
-spec mark_dead(sqlight:connection(), integer(), binary()) -> nil.
mark_dead(Db, Job_id, Reason) ->
    Dead_sql = <<"UPDATE jobs SET status = 'dead', last_error = ?2 WHERE id = ?1"/utf8>>,
    Params = [sqlight:int(Job_id), sqlight:text(Reason)],
    No_rows = gleam@dynamic@decode:success(nil),
    _ = sqlight:'query'(Dead_sql, Db, Params, No_rows),
    nil.

%% Marks a job as finished by setting its status to 'completed'.
%% The query result is ignored; always returns nil.
-file("src/rally_runtime/jobs.gleam", 206).
-spec mark_completed(sqlight:connection(), integer()) -> nil.
mark_completed(Db, Job_id) ->
    Done_sql = <<"UPDATE jobs SET status = 'completed' WHERE id = ?1"/utf8>>,
    No_rows = gleam@dynamic@decode:success(nil),
    _ = sqlight:'query'(Done_sql, Db, [sqlight:int(Job_id)], No_rows),
    nil.

%% Runs the handler for one claimed job and records the outcome.
%%
%%   {ok, _}         -> the job is marked completed.
%%   {error, Reason} -> the attempt count is bumped; on the 5th failed attempt
%%                      the job is dead-lettered and a warning is logged,
%%                      otherwise it is rescheduled with a quadratic backoff
%%                      of attempt * attempt * 5 seconds.
-file("src/rally_runtime/jobs.gleam", 166).
-spec run_single_job(
    sqlight:connection(),
    fun((binary(), bitstring()) -> {ok, nil} | {error, binary()}),
    job()
) -> nil.
run_single_job(Db, Handler, Job) ->
    {job, Job_id, Job_name, Job_payload, Prior_attempts} = Job,
    case Handler(Job_name, Job_payload) of
        {ok, _} ->
            mark_completed(Db, Job_id);

        {error, Reason} ->
            case Prior_attempts + 1 of
                Attempt when Attempt >= 5 ->
                    %% Retry budget exhausted: park the job and tell the operator.
                    mark_dead(Db, Job_id, Reason),
                    Limit_text = erlang:integer_to_binary(5),
                    logging:log(
                        warning,
                        <<"Job "/utf8,
                            Job_name/binary,
                            " dead-lettered after "/utf8,
                            Limit_text/binary,
                            " attempts: "/utf8,
                            Reason/binary>>
                    );

                Attempt ->
                    %% Quadratic backoff: 5s, 20s, 45s, 80s for attempts 1..4.
                    Delay_seconds = (Attempt * Attempt) * 5,
                    mark_retry(
                        Db,
                        Job_id,
                        Attempt,
                        unix_seconds() + Delay_seconds,
                        Reason
                    )
            end
    end.

%% Runs every job in the list, one after another and in list order, through
%% run_single_job/3. Returns nil, including for an empty list.
-file("src/rally_runtime/jobs.gleam", 152).
-spec run_jobs(
    sqlight:connection(),
    fun((binary(), bitstring()) -> {ok, nil} | {error, binary()}),
    list(job())
) -> nil.
run_jobs(Db, Handler, Jobs) ->
    lists:foreach(
        fun(Job) -> run_single_job(Db, Handler, Job) end,
        Jobs
    ),
    nil.

%% Claims up to 10 runnable jobs in a single UPDATE ... RETURNING statement
%% and returns them as job() tuples.
%%
%% A job is runnable when run_at <= Now and it is either 'pending', or
%% 'running' with no claim timestamp or one at or before Now - 60 (a stale
%% lease). Every claimed row is set to 'running' with claimed_at = Now.
%%
%% Returns [] when nothing is ready. A failing query (including a row that
%% cannot be decoded) also yields [], but the error is now logged instead of
%% being swallowed, so a broken queue is distinguishable from an idle one.
-file("src/rally_runtime/jobs.gleam", 119).
-spec fetch_ready_jobs(sqlight:connection(), integer()) -> list(job()).
fetch_ready_jobs(Db, Now) ->
    %% Leases claimed at or before this instant are considered abandoned.
    Stale_before = Now - 60,
    Claim_sql = <<"UPDATE jobs
       SET status = 'running', claimed_at = ?1
       WHERE id IN (
         SELECT id FROM jobs
         WHERE run_at <= ?1
         AND (
           status = 'pending'
           OR (status = 'running' AND (claimed_at IS NULL OR claimed_at <= ?2))
         )
         ORDER BY run_at
         LIMIT 10
       )
       RETURNING id, name, payload, attempts"/utf8>>,
    Int_decoder = {decoder, fun gleam@dynamic@decode:decode_int/1},
    String_decoder = {decoder, fun gleam@dynamic@decode:decode_string/1},
    Blob_decoder = {decoder, fun gleam@dynamic@decode:decode_bit_array/1},
    %% Columns arrive positionally: 0 = id, 1 = name, 2 = payload, 3 = attempts.
    Row_decoder = gleam@dynamic@decode:field(
        0,
        Int_decoder,
        fun(Id) ->
            gleam@dynamic@decode:field(
                1,
                String_decoder,
                fun(Name) ->
                    gleam@dynamic@decode:field(
                        2,
                        Blob_decoder,
                        fun(Payload) ->
                            gleam@dynamic@decode:field(
                                3,
                                Int_decoder,
                                fun(Attempts) ->
                                    gleam@dynamic@decode:success(
                                        {job, Id, Name, Payload, Attempts}
                                    )
                                end
                            )
                        end
                    )
                end
            )
        end
    ),
    case sqlight:'query'(
        Claim_sql,
        Db,
        [sqlight:int(Now), sqlight:int(Stale_before)],
        Row_decoder
    ) of
        {ok, Jobs} ->
            Jobs;

        {error, {sqlight_error, _Code, Message, _Offset}} ->
            logging:log(
                error,
                <<"Failed to claim ready jobs: "/utf8, Message/binary>>
            ),
            [];

        _ ->
            []
    end.

%% One polling pass: claims the jobs that are ready at time Now and runs each
%% of them through Handler. Returns nil whether or not any job was ready.
-file("src/rally_runtime/jobs.gleam", 108).
-spec process_pending_jobs_at(
    sqlight:connection(),
    fun((binary(), bitstring()) -> {ok, nil} | {error, binary()}),
    integer()
) -> nil.
process_pending_jobs_at(Db, Handler, Now) ->
    %% run_jobs/3 already returns nil for an empty list, so no special case.
    Ready = fetch_ready_jobs(Db, Now),
    run_jobs(Db, Handler, Ready).

%% Starts the polling actor.
%%
%% The initialiser (given a 1000 ms start timeout) builds a selector for the
%% actor's own subject, stores {state, Db, Handler, Self} and immediately
%% sends itself `poll`. Each `poll` message runs one pass over the ready jobs
%% and then schedules the next `poll` 1000 ms later.
%%
%% Returns whatever gleam@otp@actor:start/1 returns.
-file("src/rally_runtime/jobs.gleam", 66).
-spec start_runner(
    sqlight:connection(),
    fun((binary(), bitstring()) -> {ok, nil} | {error, binary()})
) -> {ok, gleam@otp@actor:started(nil)} | {error, gleam@otp@actor:start_error()}.
start_runner(Db, Handler) ->
    Init = fun(Self) ->
        Selector = gleam@erlang@process:select_map(
            gleam_erlang_ffi:new_selector(),
            Self,
            fun(Incoming) -> Incoming end
        ),
        Initial_state = {state, Db, Handler, Self},
        %% Kick off the first poll as soon as the actor is running.
        gleam@erlang@process:send(Self, poll),
        Initialised = gleam@otp@actor:initialised(Initial_state),
        {ok, gleam@otp@actor:selecting(Initialised, Selector)}
    end,
    On_message = fun(State, Message) ->
        poll = Message,
        {state, Conn, Job_handler, Self} = State,
        process_pending_jobs_at(Conn, Job_handler, unix_seconds()),
        %% Re-arm the timer only after the pass finishes, so passes never overlap.
        _ = gleam@erlang@process:send_after(Self, 1000, poll),
        gleam@otp@actor:continue(State)
    end,
    Builder = gleam@otp@actor:new_with_initialiser(1000, Init),
    gleam@otp@actor:start(gleam@otp@actor:on_message(Builder, On_message)).

%% Performs a single polling pass using the current system time, without
%% starting the polling actor. Returns nil.
-file("src/rally_runtime/jobs.gleam", 96).
-spec run_once(
    sqlight:connection(),
    fun((binary(), bitstring()) -> {ok, nil} | {error, binary()})
) -> nil.
run_once(Db, Handler) ->
    Now = unix_seconds(),
    process_pending_jobs_at(Db, Handler, Now).

%% Performs a single polling pass as if the current time were Now, so that
%% callers can control the clock. Note the argument order: Now comes before
%% Handler here, while the internal pass takes Handler first. Returns nil.
-file("src/rally_runtime/jobs.gleam", 100).
-spec run_once_at(
    sqlight:connection(),
    integer(),
    fun((binary(), bitstring()) -> {ok, nil} | {error, binary()})
) -> nil.
run_once_at(Db, Now, Handler) ->
    Result = process_pending_jobs_at(Db, Handler, Now),
    Result.