src/lightspeed@pipeline@orchestrator.erl

-module(lightspeed@pipeline@orchestrator).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/orchestrator.gleam").
-export([new/3, dead_letters/1, retries/1, batches/1, valid/1, start_run/2, complete_run/2, apply_action/3, enqueue_batch/7, start_available/2, ack_batch/3, fail_batch/5, runtime_pipeline/1, runtime_connector_plan/1, runtime_operator/1, in_flight_count/1, queued_count/1, process_outcome_label/1, signature/1]).
-export_type([batch/0, retry_record/0, dead_letter_record/0, process_outcome/0, runtime/0]).

%% Documentation attribute shims.
%% When ?OTP_RELEASE is 27 or higher, ?MODULEDOC(Str) and ?DOC(Str) expand to
%% the `-moduledoc(Str)` and `-doc(Str)` attributes. On older releases both
%% expand to `-compile([])`, i.e. a compile attribute with an empty option
%% list, so each use is still a well-formed attribute and the doc string is
%% simply dropped.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Connector-aware pipeline orchestration runtime for M32.\n").

%% One enqueued batch. Field order, as built by enqueue_batch/7:
%% {batch, Key, Stage, Records, Lag_ms, Idempotency_key, Sequence}.
-type batch() :: {batch,
        binary(),
        binary(),
        integer(),
        integer(),
        binary(),
        integer()}.

%% One scheduled retry. Field order, as built by fail_batch/5:
%% {retry_record, Key, Next_attempt, Reason, Retry_at_ms}.
-type retry_record() :: {retry_record, binary(), integer(), binary(), integer()}.

%% One dead-lettered batch. Field order, as built by fail_batch/5:
%% {dead_letter_record, Key, Destination, Dead_reason, At_ms}, where At_ms is
%% the caller-supplied timestamp clamped to be non-negative.
-type dead_letter_record() :: {dead_letter_record,
        binary(),
        binary(),
        binary(),
        integer()}.

%% Result of acknowledging or failing a batch:
%%   processed        - returned by ack_batch/3 with the pipeline's result;
%%   retry_scheduled  - fail_batch/5 scheduled another attempt;
%%   dead_lettered    - fail_batch/5 routed the batch to a dead-letter destination;
%%   failure_rejected - fail_batch/5 dropped the batch with a reject reason.
-type process_outcome() :: {processed, lightspeed@pipeline:process_result()} |
    {retry_scheduled, retry_record()} |
    {dead_lettered, dead_letter_record()} |
    {failure_rejected, binary()}.

%% Orchestrator state. Positions 2..8 hold, in order: the pipeline runtime,
%% the connector plan, the backpressure queue, the operator runtime, the
%% tracked batches, the retry records, and the dead-letter records.
%% The three lists are stored newest-first (entries are prepended); the
%% batches/1, retries/1 and dead_letters/1 accessors reverse them.
-opaque runtime() :: {runtime,
        lightspeed@pipeline:runtime(),
        lightspeed@pipeline@connector:connector_plan(),
        lightspeed@async@backpressure:runtime(),
        lightspeed@pipeline@operator:runtime(),
        list(batch()),
        list(retry_record()),
        list(dead_letter_record())}.

%% Assemble a fresh orchestration runtime around the supplied pipeline runtime
%% and connector plan. The backpressure queue is created from `Boundary`, the
%% operator runtime comes from its own constructor, and the batch, retry and
%% dead-letter lists start empty.
-file("src/lightspeed/pipeline/orchestrator.gleam", 65).
?DOC(" Build one orchestration runtime.\n").
-spec new(
    lightspeed@pipeline:runtime(),
    lightspeed@pipeline@connector:connector_plan(),
    lightspeed@async@backpressure:boundary()
) -> runtime().
new(Pipeline_runtime, Connector_plan, Boundary) ->
    Queue = lightspeed@async@backpressure:new(Boundary),
    Operator = lightspeed@pipeline@operator:new(),
    {runtime, Pipeline_runtime, Connector_plan, Queue, Operator, [], [], []}.

%% Return the dead-letter records oldest-first. The runtime keeps them
%% newest-first (fail_batch/5 prepends), so the stored list is reversed here.
-file("src/lightspeed/pipeline/orchestrator.gleam", 392).
?DOC(" Dead-letter records in emit order.\n").
-spec dead_letters(runtime()) -> list(dead_letter_record()).
dead_letters({runtime, _, _, _, _, _, _, Dead_letters_rev}) ->
    lists:reverse(Dead_letters_rev).

%% True when every dead-letter record has non-empty binaries in positions 2-4
%% and a non-negative integer in position 5. An empty list is valid.
%% Checking stops at the first record that fails.
-file("src/lightspeed/pipeline/orchestrator.gleam", 482).
-spec dead_letters_valid(list(dead_letter_record())) -> boolean().
dead_letters_valid([]) ->
    true;
dead_letters_valid([Record | Remaining]) ->
    Well_formed =
        (erlang:element(2, Record) /= <<""/utf8>>) andalso
        (erlang:element(3, Record) /= <<""/utf8>>) andalso
        (erlang:element(4, Record) /= <<""/utf8>>) andalso
        (erlang:element(5, Record) >= 0),
    Well_formed andalso dead_letters_valid(Remaining).

%% Return the retry records oldest-first. The runtime keeps them newest-first
%% (fail_batch/5 prepends), so the stored list is reversed here.
-file("src/lightspeed/pipeline/orchestrator.gleam", 387).
?DOC(" Retry records in emit order.\n").
-spec retries(runtime()) -> list(retry_record()).
retries({runtime, _, _, _, _, _, Retries_rev, _}) ->
    lists:reverse(Retries_rev).

%% True when every retry record has a non-empty binary in position 2, a
%% positive integer in position 3 and a non-negative integer in position 5.
%% Position 4 is not inspected. An empty list is valid.
-file("src/lightspeed/pipeline/orchestrator.gleam", 471).
-spec retries_valid(list(retry_record())) -> boolean().
retries_valid([]) ->
    true;
retries_valid([Record | Remaining]) ->
    Well_formed =
        (erlang:element(2, Record) /= <<""/utf8>>) andalso
        (erlang:element(3, Record) > 0) andalso
        (erlang:element(5, Record) >= 0),
    Well_formed andalso retries_valid(Remaining).

%% Return the tracked batches oldest-first. The runtime keeps them newest-first
%% (enqueue_batch/7 prepends), so the stored list is reversed here.
-file("src/lightspeed/pipeline/orchestrator.gleam", 382).
?DOC(" Batches in enqueue order.\n").
-spec batches(runtime()) -> list(batch()).
batches({runtime, _, _, _, _, Batches_rev, _, _}) ->
    lists:reverse(Batches_rev).

%% True when every batch has non-empty binaries in positions 2, 3 and 6 and
%% positive integers in positions 4 and 7. Position 5 is not inspected.
%% An empty list is valid.
-file("src/lightspeed/pipeline/orchestrator.gleam", 458).
-spec batches_valid(list(batch())) -> boolean().
batches_valid([]) ->
    true;
batches_valid([Batch | Remaining]) ->
    Well_formed =
        (erlang:element(2, Batch) /= <<""/utf8>>) andalso
        (erlang:element(3, Batch) /= <<""/utf8>>) andalso
        (erlang:element(4, Batch) > 0) andalso
        (erlang:element(6, Batch) /= <<""/utf8>>) andalso
        (erlang:element(7, Batch) > 0),
    Well_formed andalso batches_valid(Remaining).

%% Check every component of the runtime: the pipeline runtime, connector plan,
%% backpressure queue and operator runtime are validated by their own modules,
%% then the batch, retry and dead-letter lists by the local validators.
%% Evaluation short-circuits on the first failing check.
%%
%% The three lists are validated exactly as stored (newest-first). Each list
%% validator only asks whether all entries are well formed, so the answer does
%% not depend on order and there is no need to reverse (copy) the lists first.
-file("src/lightspeed/pipeline/orchestrator.gleam", 82).
?DOC(" Validate runtime invariants.\n").
-spec valid(runtime()) -> boolean().
valid(Runtime) ->
    lightspeed@pipeline:valid(erlang:element(2, Runtime)) andalso
        lightspeed@pipeline@connector:valid(erlang:element(3, Runtime)) andalso
        lightspeed@async@backpressure:valid(erlang:element(4, Runtime)) andalso
        lightspeed@pipeline@operator:valid(erlang:element(5, Runtime)) andalso
        batches_valid(erlang:element(6, Runtime)) andalso
        retries_valid(erlang:element(7, Runtime)) andalso
        dead_letters_valid(erlang:element(8, Runtime)).

%% Replace the inner pipeline runtime with the result of
%% lightspeed@pipeline:start/2 at `Now_ms`; every other field is carried over
%% unchanged.
-file("src/lightspeed/pipeline/orchestrator.gleam", 93).
?DOC(" Start one pipeline run.\n").
-spec start_run(runtime(), integer()) -> runtime().
start_run(
    {runtime, Pipeline, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev},
    Now_ms
) ->
    Started = lightspeed@pipeline:start(Pipeline, Now_ms),
    {runtime, Started, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev}.

%% Replace the inner pipeline runtime with the result of
%% lightspeed@pipeline:complete/2 at `Now_ms`; every other field is carried
%% over unchanged.
-file("src/lightspeed/pipeline/orchestrator.gleam", 101).
?DOC(" Complete one pipeline run.\n").
-spec complete_run(runtime(), integer()) -> runtime().
complete_run(
    {runtime, Pipeline, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev},
    Now_ms
) ->
    Completed = lightspeed@pipeline:complete(Pipeline, Now_ms),
    {runtime, Completed, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev}.

%% Apply an operator control action to the runtime.
%%
%% A `{replay, _, _}` action first resumes the pipeline from its latest
%% checkpoint; if that fails the runtime is returned untouched with the reason
%% prefixed by "replay:". Every action (replay included) is then handed to the
%% operator runtime. On any error the original runtime is returned unchanged,
%% so a replay whose operator step fails also discards the resumed pipeline.
-file("src/lightspeed/pipeline/orchestrator.gleam", 109).
?DOC(" Apply one operator control action.\n").
-spec apply_action(runtime(), lightspeed@pipeline@operator:action(), integer()) -> {runtime(),
    {ok, nil} | {error, binary()}}.
apply_action(Runtime, {replay, _, _} = Action, Now_ms) ->
    case lightspeed@pipeline:resume_from_latest_checkpoint(
        erlang:element(2, Runtime),
        Now_ms
    ) of
        {ok, Resumed_pipeline} ->
            apply_operator_action(Runtime, Resumed_pipeline, Action, Now_ms);

        {error, Reason} ->
            {Runtime, {error, <<"replay:"/utf8, Reason/binary>>}}
    end;
apply_action(Runtime, Action, Now_ms) ->
    apply_operator_action(Runtime, erlang:element(2, Runtime), Action, Now_ms).

%% Run `Action` through the operator runtime. On success, return a runtime
%% carrying `Pipeline` and the updated operator; on failure, return the
%% original runtime together with the operator's error.
-spec apply_operator_action(
    runtime(),
    lightspeed@pipeline:runtime(),
    lightspeed@pipeline@operator:action(),
    integer()
) -> {runtime(), {ok, nil} | {error, binary()}}.
apply_operator_action(Runtime, Pipeline, Action, Now_ms) ->
    case lightspeed@pipeline@operator:apply(
        erlang:element(5, Runtime),
        Action,
        Now_ms
    ) of
        {ok, Next_operator} ->
            {runtime, _, Plan, Queue, _, Batches_rev, Retries_rev, Dead_letters_rev} = Runtime,
            Updated = {runtime,
                Pipeline,
                Plan,
                Queue,
                Next_operator,
                Batches_rev,
                Retries_rev,
                Dead_letters_rev},
            {Updated, {ok, nil}};

        {error, Reason} ->
            {Runtime, {error, Reason}}
    end.

%% Admit one batch, running the checks in this order and stopping at the first
%% failure (the runtime is returned unchanged on every error):
%%   1. the operator must allow enqueueing     -> "operator_blocked:<state>"
%%   2. 0 < Records =< connector max           -> "invalid_batch_records:<n>:max=<max>"
%%   3. Sequence inside the reprocess window   -> "outside_reprocess_window:<seq>"
%%   4. the backpressure queue accepts the key -> "backpressure:<label>"
%% On success the batch is prepended to the tracked batches and the updated
%% queue is stored.
-file("src/lightspeed/pipeline/orchestrator.gleam", 145).
?DOC(" Enqueue one batch with connector, operator, and backpressure checks.\n").
-spec enqueue_batch(
    runtime(),
    binary(),
    binary(),
    integer(),
    integer(),
    binary(),
    integer()
) -> {runtime(), {ok, nil} | {error, binary()}}.
enqueue_batch(Runtime, Key, Stage, Records, Lag_ms, Idempotency_key, Sequence) ->
    Operator = erlang:element(5, Runtime),
    case lightspeed@pipeline@operator:can_enqueue(Operator) of
        true ->
            Batch = {batch, Key, Stage, Records, Lag_ms, Idempotency_key, Sequence},
            enqueue_check_records(Runtime, Batch);

        false ->
            State_label = lightspeed@pipeline@operator:state_label(
                lightspeed@pipeline@operator:state(Operator)
            ),
            {Runtime, {error, <<"operator_blocked:"/utf8, State_label/binary>>}}
    end.

%% Step 2: reject a batch whose record count is non-positive or above the
%% connector plan's maximum; otherwise continue with the window check.
-spec enqueue_check_records(runtime(), batch()) -> {runtime(),
    {ok, nil} | {error, binary()}}.
enqueue_check_records(Runtime, Batch) ->
    Records = erlang:element(4, Batch),
    Limit = lightspeed@pipeline@connector:max_batch_records(
        erlang:element(3, Runtime)
    ),
    case (Records =< 0) orelse (Records > Limit) of
        false ->
            enqueue_check_window(Runtime, Batch);

        true ->
            Detail = <<"invalid_batch_records:"/utf8,
                (erlang:integer_to_binary(Records))/binary,
                ":max="/utf8,
                (erlang:integer_to_binary(Limit))/binary>>,
            {Runtime, {error, Detail}}
    end.

%% Step 3: reject a batch whose sequence the connector plan reports as outside
%% the reprocess window; otherwise continue with queue admission.
-spec enqueue_check_window(runtime(), batch()) -> {runtime(),
    {ok, nil} | {error, binary()}}.
enqueue_check_window(Runtime, Batch) ->
    Sequence = erlang:element(7, Batch),
    case lightspeed@pipeline@connector:within_reprocess_window(
        erlang:element(3, Runtime),
        Sequence
    ) of
        true ->
            enqueue_admit(Runtime, Batch);

        false ->
            {Runtime,
                {error,
                    <<"outside_reprocess_window:"/utf8,
                        (erlang:integer_to_binary(Sequence))/binary>>}}
    end.

%% Step 4: offer the batch key to the backpressure queue and, if accepted,
%% record the batch at the head of the tracked list.
-spec enqueue_admit(runtime(), batch()) -> {runtime(),
    {ok, nil} | {error, binary()}}.
enqueue_admit(Runtime, Batch) ->
    Key = erlang:element(2, Batch),
    case lightspeed@async@backpressure:enqueue(erlang:element(4, Runtime), Key) of
        {Next_queue, {ok, _}} ->
            {runtime, Pipeline, Plan, _, Operator, Batches_rev, Retries_rev, Dead_letters_rev} = Runtime,
            Updated = {runtime,
                Pipeline,
                Plan,
                Next_queue,
                Operator,
                [Batch | Batches_rev],
                Retries_rev,
                Dead_letters_rev},
            {Updated, {ok, nil}};

        {_, {error, Error}} ->
            Error_label = lightspeed@async@backpressure:error_label(Error),
            {Runtime, {error, <<"backpressure:"/utf8, Error_label/binary>>}}
    end.

%% Ask the backpressure queue to start whatever queued work it can at `Now_ms`
%% and return the updated runtime with the list the queue reports as started.
%% When the operator runtime does not allow starting work, nothing is started
%% and the runtime is returned unchanged with an empty list.
-file("src/lightspeed/pipeline/orchestrator.gleam", 215).
?DOC(" Start queued work up to backpressure in-flight limit.\n").
-spec start_available(runtime(), integer()) -> {runtime(), list(binary())}.
start_available(Runtime, Now_ms) ->
    Operator = erlang:element(5, Runtime),
    case lightspeed@pipeline@operator:can_start_work(Operator) of
        true ->
            {runtime, Pipeline, Plan, Queue, _, Batches_rev, Retries_rev, Dead_letters_rev} = Runtime,
            {Next_queue, Started} = lightspeed@async@backpressure:start_available(
                Queue,
                Now_ms
            ),
            Updated = {runtime,
                Pipeline,
                Plan,
                Next_queue,
                Operator,
                Batches_rev,
                Retries_rev,
                Dead_letters_rev},
            {Updated, Started};

        false ->
            {Runtime, []}
    end.

%% Drop the first batch in `Batches_rev` whose key (tuple position 2) equals
%% `Key`; later batches with the same key are kept. The list is returned
%% unchanged when no batch matches.
%%
%% Delegates to lists:keydelete/3 rather than a hand-written scan. keydelete
%% compares with `==`; batch keys are binaries, for which that is the same as
%% the exact `=:=` match.
-file("src/lightspeed/pipeline/orchestrator.gleam", 447).
-spec remove_batch(list(batch()), binary()) -> list(batch()).
remove_batch(Batches_rev, Key) ->
    lists:keydelete(Key, 2, Batches_rev).

%% Look up the first batch in `Batches_rev` whose key (tuple position 2) equals
%% `Key`. Returns `{some, Batch}` for the first match and `none` otherwise.
%%
%% Delegates to the lists:keyfind/3 BIF rather than a hand-written scan.
%% keyfind compares with `==`; batch keys are binaries, for which that is the
%% same as the exact `=:=` match.
-file("src/lightspeed/pipeline/orchestrator.gleam", 436).
-spec find_batch(list(batch()), binary()) -> gleam@option:option(batch()).
find_batch(Batches_rev, Key) ->
    case lists:keyfind(Key, 2, Batches_rev) of
        false ->
            none;

        Batch ->
            {some, Batch}
    end.

%% Acknowledge the batch tracked under `Key`.
%%
%% Fails with "unknown_batch:<key>" when no such batch is tracked and with
%% "backpressure:<label>" when the queue refuses to mark the key as succeeded;
%% the runtime is unchanged in both cases. Otherwise the batch's stage, record
%% count, lag and idempotency key are passed to lightspeed@pipeline:process/6,
%% the batch is removed from the tracked list, and the pipeline's result is
%% returned wrapped in `{processed, Result}`.
-file("src/lightspeed/pipeline/orchestrator.gleam", 230).
?DOC(" Acknowledge one batch after work completion.\n").
-spec ack_batch(runtime(), binary(), integer()) -> {runtime(),
    {ok, process_outcome()} | {error, binary()}}.
ack_batch(Runtime, Key, Now_ms) ->
    Batches_rev = erlang:element(6, Runtime),
    case find_batch(Batches_rev, Key) of
        {some, Batch} ->
            Queue = erlang:element(4, Runtime),
            case lightspeed@async@backpressure:succeed(Queue, Key) of
                {Next_queue, {ok, _}} ->
                    {Next_pipeline, Result} = lightspeed@pipeline:process(
                        erlang:element(2, Runtime),
                        erlang:element(3, Batch),
                        erlang:element(4, Batch),
                        erlang:element(5, Batch),
                        erlang:element(6, Batch),
                        Now_ms
                    ),
                    Acked = {runtime,
                        Next_pipeline,
                        erlang:element(3, Runtime),
                        Next_queue,
                        erlang:element(5, Runtime),
                        remove_batch(Batches_rev, Key),
                        erlang:element(7, Runtime),
                        erlang:element(8, Runtime)},
                    {Acked, {ok, {processed, Result}}};

                {_, {error, Error}} ->
                    Error_label = lightspeed@async@backpressure:error_label(Error),
                    {Runtime, {error, <<"backpressure:"/utf8, Error_label/binary>>}}
            end;

        none ->
            {Runtime, {error, <<"unknown_batch:"/utf8, Key/binary>>}}
    end.

%% Larger of two integers; returns `Left` when they compare equal.
%% This is a local definition; the module is compiled with `no_auto_import`,
%% so unqualified calls to max/2 in this file resolve here.
-file("src/lightspeed/pipeline/orchestrator.gleam", 494).
-spec max(integer(), integer()) -> integer().
max(Left, Right) when Left >= Right ->
    Left;
max(_Left, Right) ->
    Right.

%% Record a failure for the batch tracked under `Key`.
%%
%% Fails with "unknown_batch:<key>" when no such batch is tracked and with
%% "backpressure:<label>" when the queue refuses the failure; the runtime is
%% unchanged in both cases. Otherwise the connector plan classifies the failure
%% (the attempt number is clamped to be non-negative) and the resulting action
%% is carried out by settle_failed_batch/6.
-file("src/lightspeed/pipeline/orchestrator.gleam", 268).
?DOC(" Handle one failed batch according to retry/dead-letter policy.\n").
-spec fail_batch(runtime(), binary(), integer(), binary(), integer()) -> {runtime(),
    {ok, process_outcome()} | {error, binary()}}.
fail_batch(Runtime, Key, Attempt, Reason, Now_ms) ->
    case find_batch(erlang:element(6, Runtime), Key) of
        {some, _} ->
            Queue = erlang:element(4, Runtime),
            case lightspeed@async@backpressure:fail(Queue, Key, Reason) of
                {Failed_queue, {ok, _}} ->
                    Decision = lightspeed@pipeline@connector:classify_failure(
                        erlang:element(3, Runtime),
                        max(0, Attempt),
                        Reason,
                        Now_ms
                    ),
                    settle_failed_batch(
                        Decision,
                        Runtime,
                        Failed_queue,
                        Key,
                        Reason,
                        Now_ms
                    );

                {_, {error, Error}} ->
                    Error_label = lightspeed@async@backpressure:error_label(Error),
                    {Runtime, {error, <<"backpressure:"/utf8, Error_label/binary>>}}
            end;

        none ->
            {Runtime, {error, <<"unknown_batch:"/utf8, Key/binary>>}}
    end.

%% Carry out the connector's decision for a batch the queue has already marked
%% as failed (`Failed_queue`):
%%   retry       - re-queue the key; on success keep the batch tracked, bump the
%%                 pipeline's retry counter by one and prepend a retry record.
%%                 If the queue refuses the retry, the ORIGINAL runtime is
%%                 returned, so the earlier failure marking is discarded too.
%%   dead_letter - stop tracking the batch, bump the pipeline's dead-letter
%%                 counter by one and prepend a dead-letter record stamped with
%%                 `Now_ms` clamped to be non-negative.
%%   reject      - stop tracking the batch and report the reject reason.
-spec settle_failed_batch(
    {retry, integer(), integer()} |
        {dead_letter, binary(), binary()} |
        {reject, binary()},
    runtime(),
    lightspeed@async@backpressure:runtime(),
    binary(),
    binary(),
    integer()
) -> {runtime(), {ok, process_outcome()} | {error, binary()}}.
settle_failed_batch(
    {retry, Next_attempt, Retry_at_ms},
    Runtime,
    Failed_queue,
    Key,
    Reason,
    _Now_ms
) ->
    case lightspeed@async@backpressure:retry(Failed_queue, Key) of
        {Retried_queue, {ok, _}} ->
            Record = {retry_record, Key, Next_attempt, Reason, Retry_at_ms},
            {runtime, Pipeline, Plan, _, Operator, Batches_rev, Retries_rev, Dead_letters_rev} = Runtime,
            Updated = {runtime,
                lightspeed@pipeline:record_retry(Pipeline, 1),
                Plan,
                Retried_queue,
                Operator,
                Batches_rev,
                [Record | Retries_rev],
                Dead_letters_rev},
            {Updated, {ok, {retry_scheduled, Record}}};

        {_, {error, Error}} ->
            Error_label = lightspeed@async@backpressure:error_label(Error),
            {Runtime, {error, <<"backpressure:"/utf8, Error_label/binary>>}}
    end;
settle_failed_batch(
    {dead_letter, Destination, Dead_reason},
    Runtime,
    Failed_queue,
    Key,
    _Reason,
    Now_ms
) ->
    Record = {dead_letter_record, Key, Destination, Dead_reason, max(0, Now_ms)},
    {runtime, Pipeline, Plan, _, Operator, Batches_rev, Retries_rev, Dead_letters_rev} = Runtime,
    Updated = {runtime,
        lightspeed@pipeline:record_dead_letter(Pipeline, 1),
        Plan,
        Failed_queue,
        Operator,
        remove_batch(Batches_rev, Key),
        Retries_rev,
        [Record | Dead_letters_rev]},
    {Updated, {ok, {dead_lettered, Record}}};
settle_failed_batch(
    {reject, Reject_reason},
    Runtime,
    Failed_queue,
    Key,
    _Reason,
    _Now_ms
) ->
    {runtime, Pipeline, Plan, _, Operator, Batches_rev, Retries_rev, Dead_letters_rev} = Runtime,
    Updated = {runtime,
        Pipeline,
        Plan,
        Failed_queue,
        Operator,
        remove_batch(Batches_rev, Key),
        Retries_rev,
        Dead_letters_rev},
    {Updated, {ok, {failure_rejected, Reject_reason}}}.

%% Return the pipeline runtime stored in position 2 of the runtime tuple.
-file("src/lightspeed/pipeline/orchestrator.gleam", 357).
?DOC(" Access inner pipeline runtime.\n").
-spec runtime_pipeline(runtime()) -> lightspeed@pipeline:runtime().
runtime_pipeline({runtime, Pipeline, _, _, _, _, _, _}) ->
    Pipeline.

%% Return the connector plan stored in position 3 of the runtime tuple.
-file("src/lightspeed/pipeline/orchestrator.gleam", 362).
?DOC(" Access connector plan.\n").
-spec runtime_connector_plan(runtime()) -> lightspeed@pipeline@connector:connector_plan().
runtime_connector_plan({runtime, _, Connector_plan, _, _, _, _, _}) ->
    Connector_plan.

%% Return the operator runtime stored in position 5 of the runtime tuple.
-file("src/lightspeed/pipeline/orchestrator.gleam", 367).
?DOC(" Access operator runtime.\n").
-spec runtime_operator(runtime()) -> lightspeed@pipeline@operator:runtime().
runtime_operator({runtime, _, _, _, Operator, _, _, _}) ->
    Operator.

%% Report the in-flight count of the backpressure queue held by the runtime.
-file("src/lightspeed/pipeline/orchestrator.gleam", 372).
?DOC(" Current in-flight count.\n").
-spec in_flight_count(runtime()) -> integer().
in_flight_count({runtime, _, _, Queue, _, _, _, _}) ->
    lightspeed@async@backpressure:in_flight_count(Queue).

%% Report the queued count of the backpressure queue held by the runtime.
-file("src/lightspeed/pipeline/orchestrator.gleam", 377).
?DOC(" Current queued count.\n").
-spec queued_count(runtime()) -> integer().
queued_count({runtime, _, _, Queue, _, _, _, _}) ->
    lightspeed@async@backpressure:queued_count(Queue).

%% Render a process outcome as a colon-separated label:
%%   processed        -> "processed:<pipeline result label>"
%%   retry_scheduled  -> "retry_scheduled:<key>:<attempt>:<retry_at_ms>"
%%   dead_lettered    -> "dead_lettered:<key>:<destination>:<reason>"
%%   failure_rejected -> "failure_rejected:<reason>"
%% The retry reason and the dead-letter timestamp are not part of the label.
-file("src/lightspeed/pipeline/orchestrator.gleam", 397).
?DOC(" Process-outcome label.\n").
-spec process_outcome_label(process_outcome()) -> binary().
process_outcome_label({processed, Result}) ->
    Result_label = lightspeed@pipeline:process_result_label(Result),
    <<"processed:"/utf8, Result_label/binary>>;
process_outcome_label({retry_scheduled, Record}) ->
    Key = erlang:element(2, Record),
    Attempt = erlang:integer_to_binary(erlang:element(3, Record)),
    Retry_at = erlang:integer_to_binary(erlang:element(5, Record)),
    <<"retry_scheduled:"/utf8,
        Key/binary,
        ":"/utf8,
        Attempt/binary,
        ":"/utf8,
        Retry_at/binary>>;
process_outcome_label({dead_lettered, Record}) ->
    Key = erlang:element(2, Record),
    Destination = erlang:element(3, Record),
    Dead_reason = erlang:element(4, Record),
    <<"dead_lettered:"/utf8,
        Key/binary,
        ":"/utf8,
        Destination/binary,
        ":"/utf8,
        Dead_reason/binary>>;
process_outcome_label({failure_rejected, Reason}) ->
    <<"failure_rejected:"/utf8, Reason/binary>>.

%% Build a one-line summary of the runtime: the signatures reported by the
%% pipeline, connector, operator and queue modules (in that order), followed by
%% the lengths of the batch, retry and dead-letter lists, as
%% "pipeline=..|connector=..|operator=..|queue=..|batches=N|retries=N|dead_letters=N".
-file("src/lightspeed/pipeline/orchestrator.gleam", 419).
?DOC(" Stable runtime signature.\n").
-spec signature(runtime()) -> binary().
signature(Runtime) ->
    Pipeline_sig = lightspeed@pipeline:signature(erlang:element(2, Runtime)),
    Connector_sig = lightspeed@pipeline@connector:signature(
        erlang:element(3, Runtime)
    ),
    Operator_sig = lightspeed@pipeline@operator:signature(
        erlang:element(5, Runtime)
    ),
    Queue_sig = lightspeed@async@backpressure:signature(
        erlang:element(4, Runtime)
    ),
    Batch_count = erlang:integer_to_binary(
        erlang:length(erlang:element(6, Runtime))
    ),
    Retry_count = erlang:integer_to_binary(
        erlang:length(erlang:element(7, Runtime))
    ),
    Dead_letter_count = erlang:integer_to_binary(
        erlang:length(erlang:element(8, Runtime))
    ),
    <<"pipeline="/utf8,
        Pipeline_sig/binary,
        "|connector="/utf8,
        Connector_sig/binary,
        "|operator="/utf8,
        Operator_sig/binary,
        "|queue="/utf8,
        Queue_sig/binary,
        "|batches="/utf8,
        Batch_count/binary,
        "|retries="/utf8,
        Retry_count/binary,
        "|dead_letters="/utf8,
        Dead_letter_count/binary>>.