-module(lightspeed@pipeline@orchestrator).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/orchestrator.gleam").
-export([new/3, dead_letters/1, retries/1, batches/1, valid/1, start_run/2, complete_run/2, apply_action/3, enqueue_batch/7, start_available/2, ack_batch/3, fail_batch/5, runtime_pipeline/1, runtime_connector_plan/1, runtime_operator/1, in_flight_count/1, queued_count/1, process_outcome_label/1, signature/1]).
-export_type([batch/0, retry_record/0, dead_letter_record/0, process_outcome/0, runtime/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Connector-aware pipeline orchestration runtime for M32.\n").
%% One enqueued batch. enqueue_batch/7 builds it as
%% {batch, Key, Stage, Records, Lag_ms, Idempotency_key, Sequence}.
-type batch() :: {batch,
%% Key: handed to the backpressure queue and used by find_batch/2 and
%% remove_batch/2 to locate the batch again.
binary(),
%% Stage: forwarded to lightspeed@pipeline:process/6 by ack_batch/3.
binary(),
%% Records: enqueue_batch/7 rejects values =< 0 or above the connector maximum.
integer(),
%% Lag_ms: forwarded to lightspeed@pipeline:process/6 by ack_batch/3.
integer(),
%% Idempotency_key: forwarded to lightspeed@pipeline:process/6 by ack_batch/3.
binary(),
%% Sequence: checked against the connector reprocess window on enqueue.
integer()}.
%% One scheduled retry. fail_batch/5 builds it as
%% {retry_record, Key, Next_attempt, Reason, Retry_at_ms}, where Next_attempt
%% and Retry_at_ms come from the connector's failure classification.
-type retry_record() :: {retry_record, binary(), integer(), binary(), integer()}.
%% One dead-lettered batch. fail_batch/5 builds it as
%% {dead_letter_record, Key, Destination, Dead_reason, Timestamp_ms}, where the
%% timestamp is the caller's Now_ms clamped to be non-negative.
-type dead_letter_record() :: {dead_letter_record,
binary(),
binary(),
binary(),
integer()}.
%% Result of settling a batch: `processed` is produced by ack_batch/3, the
%% other three variants by fail_batch/5.
-type process_outcome() :: {processed, lightspeed@pipeline:process_result()} |
{retry_scheduled, retry_record()} |
{dead_lettered, dead_letter_record()} |
{failure_rejected, binary()}.
%% Orchestrator state. Positions 2..8 hold, in order: the pipeline runtime,
%% the connector plan, the backpressure queue, the operator runtime, and three
%% lists (batches, retry records, dead-letter records). New entries are consed
%% onto the head of each list, so the lists are stored newest-first; the public
%% accessors reverse them before returning.
-opaque runtime() :: {runtime,
lightspeed@pipeline:runtime(),
lightspeed@pipeline@connector:connector_plan(),
lightspeed@async@backpressure:runtime(),
lightspeed@pipeline@operator:runtime(),
list(batch()),
list(retry_record()),
list(dead_letter_record())}.
-file("src/lightspeed/pipeline/orchestrator.gleam", 65).
?DOC(" Build one orchestration runtime.\n").
%% Wraps the supplied pipeline runtime and connector plan together with a
%% fresh backpressure queue (created from Boundary) and a fresh operator
%% runtime. The batch, retry, and dead-letter lists all start empty.
-spec new(
    lightspeed@pipeline:runtime(),
    lightspeed@pipeline@connector:connector_plan(),
    lightspeed@async@backpressure:boundary()
) -> runtime().
new(Pipeline_runtime, Connector_plan, Boundary) ->
    Queue = lightspeed@async@backpressure:new(Boundary),
    Operator = lightspeed@pipeline@operator:new(),
    {runtime, Pipeline_runtime, Connector_plan, Queue, Operator, [], [], []}.
-file("src/lightspeed/pipeline/orchestrator.gleam", 392).
?DOC(" Dead-letter records in emit order.\n").
%% The runtime keeps dead-letter records newest-first; reversing the stored
%% list yields the order in which they were emitted.
-spec dead_letters(runtime()) -> list(dead_letter_record()).
dead_letters({runtime, _Pipeline, _Plan, _Queue, _Operator, _Batches, _Retries, Newest_first}) ->
    lists:reverse(Newest_first).
-file("src/lightspeed/pipeline/orchestrator.gleam", 482).
%% True when every dead-letter record has a non-empty key, destination, and
%% reason, and a non-negative timestamp. An empty list is valid.
-spec dead_letters_valid(list(dead_letter_record())) -> boolean().
dead_letters_valid(Entries) ->
    lists:all(
        fun({dead_letter_record, Key, Destination, Reason, Timestamp_ms}) ->
            (Key /= <<""/utf8>>)
                andalso (Destination /= <<""/utf8>>)
                andalso (Reason /= <<""/utf8>>)
                andalso (Timestamp_ms >= 0)
        end,
        Entries
    ).
-file("src/lightspeed/pipeline/orchestrator.gleam", 387).
?DOC(" Retry records in emit order.\n").
%% The runtime keeps retry records newest-first; reversing the stored list
%% yields the order in which they were emitted.
-spec retries(runtime()) -> list(retry_record()).
retries({runtime, _Pipeline, _Plan, _Queue, _Operator, _Batches, Newest_first, _Dead_letters}) ->
    lists:reverse(Newest_first).
-file("src/lightspeed/pipeline/orchestrator.gleam", 471).
%% True when every retry record has a non-empty key, a positive attempt
%% number, and a non-negative retry timestamp. The reason field is not
%% inspected. An empty list is valid.
-spec retries_valid(list(retry_record())) -> boolean().
retries_valid([]) ->
    true;
retries_valid([{retry_record, Key, Attempt, _Reason, Retry_at_ms} | Rest])
        when Key /= <<""/utf8>>, Attempt > 0, Retry_at_ms >= 0 ->
    retries_valid(Rest);
retries_valid([{retry_record, _Key, _Attempt, _Reason, _Retry_at_ms} | _Rest]) ->
    %% A well-formed record that failed one of the checks above.
    false.
-file("src/lightspeed/pipeline/orchestrator.gleam", 382).
?DOC(" Batches in enqueue order.\n").
%% The runtime keeps batches newest-first; reversing the stored list yields
%% the order in which they were enqueued.
-spec batches(runtime()) -> list(batch()).
batches({runtime, _Pipeline, _Plan, _Queue, _Operator, Newest_first, _Retries, _Dead_letters}) ->
    lists:reverse(Newest_first).
-file("src/lightspeed/pipeline/orchestrator.gleam", 458).
%% True when every batch passes batch_valid/1. An empty list is valid.
-spec batches_valid(list(batch())) -> boolean().
batches_valid(Entries) ->
    lists:all(fun batch_valid/1, Entries).

%% A batch is valid when its key, stage, and idempotency key are non-empty
%% and its record count and sequence are positive. Lag is not inspected.
-spec batch_valid(batch()) -> boolean().
batch_valid({batch, Key, Stage, Records, _Lag_ms, Idempotency_key, Sequence}) ->
    (Key /= <<""/utf8>>)
        andalso (Stage /= <<""/utf8>>)
        andalso (Records > 0)
        andalso (Idempotency_key /= <<""/utf8>>)
        andalso (Sequence > 0).
-file("src/lightspeed/pipeline/orchestrator.gleam", 82).
?DOC(" Validate runtime invariants.\n").
%% The runtime is valid when the pipeline runtime, connector plan,
%% backpressure queue, and operator runtime each report valid, and every
%% stored batch, retry record, and dead-letter record passes its own check.
%%
%% The three list checks are conjunctions over all entries, so their result
%% does not depend on list order. The stored (newest-first) lists are therefore
%% checked directly instead of building reversed copies first.
-spec valid(runtime()) -> boolean().
valid({runtime, Pipeline, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev}) ->
    lightspeed@pipeline:valid(Pipeline)
        andalso lightspeed@pipeline@connector:valid(Plan)
        andalso lightspeed@async@backpressure:valid(Queue)
        andalso lightspeed@pipeline@operator:valid(Operator)
        andalso batches_valid(Batches_rev)
        andalso retries_valid(Retries_rev)
        andalso dead_letters_valid(Dead_letters_rev).
-file("src/lightspeed/pipeline/orchestrator.gleam", 93).
?DOC(" Start one pipeline run.\n").
%% Passes the inner pipeline runtime and Now_ms to lightspeed@pipeline:start/2
%% and stores the result back in the pipeline slot. Every other field of the
%% runtime is carried over unchanged.
-spec start_run(runtime(), integer()) -> runtime().
start_run(Runtime, Now_ms) ->
    Started_pipeline = lightspeed@pipeline:start(erlang:element(2, Runtime), Now_ms),
    erlang:setelement(2, Runtime, Started_pipeline).
-file("src/lightspeed/pipeline/orchestrator.gleam", 101).
?DOC(" Complete one pipeline run.\n").
%% Passes the inner pipeline runtime and Now_ms to
%% lightspeed@pipeline:complete/2 and stores the result back in the pipeline
%% slot. Every other field of the runtime is carried over unchanged.
-spec complete_run(runtime(), integer()) -> runtime().
complete_run(Runtime, Now_ms) ->
    Completed_pipeline = lightspeed@pipeline:complete(erlang:element(2, Runtime), Now_ms),
    erlang:setelement(2, Runtime, Completed_pipeline).
-file("src/lightspeed/pipeline/orchestrator.gleam", 109).
?DOC(" Apply one operator control action.\n").
%% A `{replay, _, _}` action first resumes the pipeline from its latest
%% checkpoint; if that fails the runtime is returned unchanged with the
%% reason prefixed by "replay:". Every action (replay included, once resumed)
%% is then handed to the operator runtime. On operator error the original
%% runtime is returned unchanged together with the operator's reason.
-spec apply_action(runtime(), lightspeed@pipeline@operator:action(), integer()) -> {runtime(),
    {ok, nil} | {error, binary()}}.
apply_action(Runtime, {replay, _, _} = Action, Now_ms) ->
    Pipeline = erlang:element(2, Runtime),
    case lightspeed@pipeline:resume_from_latest_checkpoint(Pipeline, Now_ms) of
        {error, Reason} ->
            {Runtime, {error, <<"replay:"/utf8, Reason/binary>>}};
        {ok, Resumed_pipeline} ->
            apply_operator_action(Runtime, Resumed_pipeline, Action, Now_ms)
    end;
apply_action(Runtime, Action, Now_ms) ->
    apply_operator_action(Runtime, erlang:element(2, Runtime), Action, Now_ms).

%% Runs Action through the operator runtime. On success returns a runtime
%% carrying Pipeline and the updated operator; on error returns the runtime
%% exactly as it was passed in, so a resumed pipeline is discarded too.
-spec apply_operator_action(
    runtime(),
    lightspeed@pipeline:runtime(),
    lightspeed@pipeline@operator:action(),
    integer()
) -> {runtime(), {ok, nil} | {error, binary()}}.
apply_operator_action(Runtime, Pipeline, Action, Now_ms) ->
    {runtime, _Old_pipeline, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev} =
        Runtime,
    case lightspeed@pipeline@operator:apply(Operator, Action, Now_ms) of
        {error, Reason} ->
            {Runtime, {error, Reason}};
        {ok, Next_operator} ->
            {{runtime,
                    Pipeline,
                    Plan,
                    Queue,
                    Next_operator,
                    Batches_rev,
                    Retries_rev,
                    Dead_letters_rev},
                {ok, nil}}
    end.
-file("src/lightspeed/pipeline/orchestrator.gleam", 145).
?DOC(" Enqueue one batch with connector, operator, and backpressure checks.\n").
%% Checks run in this order, and the first failure wins:
%%   1. the operator must allow enqueueing   -> "operator_blocked:<state>"
%%   2. 0 < Records =< connector maximum     -> "invalid_batch_records:<n>:max=<max>"
%%   3. Sequence inside the reprocess window -> "outside_reprocess_window:<seq>"
%%   4. the backpressure queue accepts Key   -> "backpressure:<label>"
%% On any failure the runtime is returned unchanged. On success the new batch
%% is consed onto the stored batch list and the updated queue is kept.
-spec enqueue_batch(
    runtime(),
    binary(),
    binary(),
    integer(),
    integer(),
    binary(),
    integer()
) -> {runtime(), {ok, nil} | {error, binary()}}.
enqueue_batch(Runtime, Key, Stage, Records, Lag_ms, Idempotency_key, Sequence) ->
    case enqueue_precheck(Runtime, Records, Sequence) of
        {error, _Message} = Rejected ->
            {Runtime, Rejected};
        ok ->
            Batch = {batch, Key, Stage, Records, Lag_ms, Idempotency_key, Sequence},
            enqueue_admitted(Runtime, Key, Batch)
    end.

%% Operator gate, followed by the connector-plan checks.
-spec enqueue_precheck(runtime(), integer(), integer()) -> ok | {error, binary()}.
enqueue_precheck(Runtime, Records, Sequence) ->
    Operator = erlang:element(5, Runtime),
    case lightspeed@pipeline@operator:can_enqueue(Operator) of
        false ->
            State = lightspeed@pipeline@operator:state(Operator),
            {error,
                <<"operator_blocked:"/utf8,
                    (lightspeed@pipeline@operator:state_label(State))/binary>>};
        true ->
            enqueue_check_plan(erlang:element(3, Runtime), Records, Sequence)
    end.

%% Record-count bound and reprocess-window checks against the connector plan.
%% The record limit is fetched once and reused for the check and the message.
-spec enqueue_check_plan(
    lightspeed@pipeline@connector:connector_plan(),
    integer(),
    integer()
) -> ok | {error, binary()}.
enqueue_check_plan(Plan, Records, Sequence) ->
    Max_records = lightspeed@pipeline@connector:max_batch_records(Plan),
    case (Records > 0) andalso (Records =< Max_records) of
        false ->
            {error,
                <<"invalid_batch_records:"/utf8,
                    (erlang:integer_to_binary(Records))/binary,
                    ":max="/utf8,
                    (erlang:integer_to_binary(Max_records))/binary>>};
        true ->
            case lightspeed@pipeline@connector:within_reprocess_window(Plan, Sequence) of
                false ->
                    {error,
                        <<"outside_reprocess_window:"/utf8,
                            (erlang:integer_to_binary(Sequence))/binary>>};
                true ->
                    ok
            end
    end.

%% Offers Key to the backpressure queue and, if accepted, records Batch.
-spec enqueue_admitted(runtime(), binary(), batch()) ->
    {runtime(), {ok, nil} | {error, binary()}}.
enqueue_admitted(Runtime, Key, Batch) ->
    {runtime, Pipeline, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev} =
        Runtime,
    case lightspeed@async@backpressure:enqueue(Queue, Key) of
        {_, {error, Error}} ->
            {Runtime,
                {error,
                    <<"backpressure:"/utf8,
                        (lightspeed@async@backpressure:error_label(Error))/binary>>}};
        {Next_queue, {ok, _}} ->
            {{runtime,
                    Pipeline,
                    Plan,
                    Next_queue,
                    Operator,
                    [Batch | Batches_rev],
                    Retries_rev,
                    Dead_letters_rev},
                {ok, nil}}
    end.
-file("src/lightspeed/pipeline/orchestrator.gleam", 215).
?DOC(" Start queued work up to backpressure in-flight limit.\n").
%% When the operator does not permit starting work, the runtime is returned
%% unchanged with an empty list. Otherwise the backpressure queue is asked to
%% start what it can; the updated queue is stored and the list it reports as
%% started is returned alongside the runtime.
-spec start_available(runtime(), integer()) -> {runtime(), list(binary())}.
start_available(Runtime, Now_ms) ->
    Operator = erlang:element(5, Runtime),
    case lightspeed@pipeline@operator:can_start_work(Operator) of
        true ->
            Queue = erlang:element(4, Runtime),
            {Next_queue, Started} =
                lightspeed@async@backpressure:start_available(Queue, Now_ms),
            {erlang:setelement(4, Runtime, Next_queue), Started};
        false ->
            {Runtime, []}
    end.
-file("src/lightspeed/pipeline/orchestrator.gleam", 447).
%% Drops the first batch whose key (tuple position 2) equals Key and keeps
%% the remaining batches in their existing order. The list is returned
%% unchanged when no batch carries that key.
%%
%% Delegates to lists:keydelete/3 rather than a hand-written body-recursive
%% scan; both remove only the first match.
-spec remove_batch(list(batch()), binary()) -> list(batch()).
remove_batch(Batches_rev, Key) ->
    lists:keydelete(Key, 2, Batches_rev).
-file("src/lightspeed/pipeline/orchestrator.gleam", 436).
%% Looks up the first batch whose key (tuple position 2) equals Key.
%% Returns `{some, Batch}` when found and `none` otherwise.
%%
%% Delegates to lists:keyfind/3 rather than a hand-written linear scan.
-spec find_batch(list(batch()), binary()) -> gleam@option:option(batch()).
find_batch(Batches_rev, Key) ->
    case lists:keyfind(Key, 2, Batches_rev) of
        false ->
            none;
        Batch ->
            {some, Batch}
    end.
-file("src/lightspeed/pipeline/orchestrator.gleam", 230).
?DOC(" Acknowledge one batch after work completion.\n").
%% Fails with "unknown_batch:<key>" when no stored batch carries Key, and
%% with "backpressure:<label>" when the queue refuses to mark Key as
%% succeeded; in both cases the runtime is returned unchanged. Otherwise the
%% batch's stage, record count, lag, and idempotency key are fed to the
%% pipeline, the batch is dropped from the stored list, and the pipeline's
%% result is returned wrapped as `{processed, Result}`.
-spec ack_batch(runtime(), binary(), integer()) -> {runtime(),
    {ok, process_outcome()} | {error, binary()}}.
ack_batch(Runtime, Key, Now_ms) ->
    case find_batch(erlang:element(6, Runtime), Key) of
        {some, Batch} ->
            ack_found_batch(Runtime, Batch, Key, Now_ms);
        none ->
            {Runtime, {error, <<"unknown_batch:"/utf8, Key/binary>>}}
    end.

%% Settles a batch that is known to be tracked by the runtime.
-spec ack_found_batch(runtime(), batch(), binary(), integer()) -> {runtime(),
    {ok, process_outcome()} | {error, binary()}}.
ack_found_batch(Runtime, Batch, Key, Now_ms) ->
    {runtime, Pipeline, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev} =
        Runtime,
    case lightspeed@async@backpressure:succeed(Queue, Key) of
        {Next_queue, {ok, _}} ->
            {batch, _Key, Stage, Records, Lag_ms, Idempotency_key, _Sequence} = Batch,
            {Next_pipeline, Result} = lightspeed@pipeline:process(
                Pipeline,
                Stage,
                Records,
                Lag_ms,
                Idempotency_key,
                Now_ms
            ),
            Remaining = remove_batch(Batches_rev, Key),
            {{runtime,
                    Next_pipeline,
                    Plan,
                    Next_queue,
                    Operator,
                    Remaining,
                    Retries_rev,
                    Dead_letters_rev},
                {ok, {processed, Result}}};
        {_, {error, Error}} ->
            Label = lightspeed@async@backpressure:error_label(Error),
            {Runtime, {error, <<"backpressure:"/utf8, Label/binary>>}}
    end.
-file("src/lightspeed/pipeline/orchestrator.gleam", 494).
%% Larger of two integers; Left is returned when the two compare equal.
-spec max(integer(), integer()) -> integer().
max(Left, Right) when Left >= Right ->
    Left;
max(_Left, Right) ->
    Right.
-file("src/lightspeed/pipeline/orchestrator.gleam", 268).
?DOC(" Handle one failed batch according to retry/dead-letter policy.\n").
%% Steps, in order:
%%   1. Key must belong to a stored batch, else "unknown_batch:<key>".
%%   2. The backpressure queue must accept the failure, else
%%      "backpressure:<label>".
%%   3. The connector plan classifies the failure (Attempt is clamped to be
%%      non-negative first) into retry, dead_letter, or reject, and the
%%      matching settle_failure/6 clause builds the next runtime.
%% Whenever an error is returned the runtime is the one originally passed in.
-spec fail_batch(runtime(), binary(), integer(), binary(), integer()) -> {runtime(),
    {ok, process_outcome()} | {error, binary()}}.
fail_batch(Runtime, Key, Attempt, Reason, Now_ms) ->
    case find_batch(erlang:element(6, Runtime), Key) of
        none ->
            {Runtime, {error, <<"unknown_batch:"/utf8, Key/binary>>}};
        {some, _} ->
            fail_tracked_batch(Runtime, Key, Attempt, Reason, Now_ms)
    end.

%% Marks Key as failed in the backpressure queue, then classifies the failure
%% and dispatches on the resulting action.
-spec fail_tracked_batch(runtime(), binary(), integer(), binary(), integer()) -> {runtime(),
    {ok, process_outcome()} | {error, binary()}}.
fail_tracked_batch(Runtime, Key, Attempt, Reason, Now_ms) ->
    case lightspeed@async@backpressure:fail(erlang:element(4, Runtime), Key, Reason) of
        {_, {error, Error}} ->
            {Runtime,
                {error,
                    <<"backpressure:"/utf8,
                        (lightspeed@async@backpressure:error_label(Error))/binary>>}};
        {Failed_queue, {ok, _}} ->
            Action = lightspeed@pipeline@connector:classify_failure(
                erlang:element(3, Runtime),
                max(0, Attempt),
                Reason,
                Now_ms
            ),
            settle_failure(Action, Runtime, Failed_queue, Key, Reason, Now_ms)
    end.

%% Builds the outcome for one classified failure.
%%   retry       - re-queues Key; the batch stays tracked, a retry record is
%%                 stored and the pipeline's retry counter is bumped by 1. If
%%                 the queue refuses the retry, the original runtime is
%%                 returned with a "backpressure:<label>" error.
%%   dead_letter - drops the batch, stores a dead-letter record stamped with
%%                 Now_ms clamped to be non-negative, and bumps the pipeline's
%%                 dead-letter counter by 1.
%%   reject      - drops the batch and records nothing else.
-spec settle_failure(
    tuple(),
    runtime(),
    lightspeed@async@backpressure:runtime(),
    binary(),
    binary(),
    integer()
) -> {runtime(), {ok, process_outcome()} | {error, binary()}}.
settle_failure({retry, Next_attempt, Retry_at_ms}, Runtime, Failed_queue, Key, Reason, _Now_ms) ->
    case lightspeed@async@backpressure:retry(Failed_queue, Key) of
        {_, {error, Error}} ->
            {Runtime,
                {error,
                    <<"backpressure:"/utf8,
                        (lightspeed@async@backpressure:error_label(Error))/binary>>}};
        {Retried_queue, {ok, _}} ->
            {runtime, Pipeline, Plan, _Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev} =
                Runtime,
            Record = {retry_record, Key, Next_attempt, Reason, Retry_at_ms},
            Next_pipeline = lightspeed@pipeline:record_retry(Pipeline, 1),
            {{runtime,
                    Next_pipeline,
                    Plan,
                    Retried_queue,
                    Operator,
                    Batches_rev,
                    [Record | Retries_rev],
                    Dead_letters_rev},
                {ok, {retry_scheduled, Record}}}
    end;
settle_failure({dead_letter, Destination, Dead_reason}, Runtime, Failed_queue, Key, _Reason, Now_ms) ->
    {runtime, Pipeline, Plan, _Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev} =
        Runtime,
    Record = {dead_letter_record, Key, Destination, Dead_reason, max(0, Now_ms)},
    Next_pipeline = lightspeed@pipeline:record_dead_letter(Pipeline, 1),
    {{runtime,
            Next_pipeline,
            Plan,
            Failed_queue,
            Operator,
            remove_batch(Batches_rev, Key),
            Retries_rev,
            [Record | Dead_letters_rev]},
        {ok, {dead_lettered, Record}}};
settle_failure({reject, Reject_reason}, Runtime, Failed_queue, Key, _Reason, _Now_ms) ->
    {runtime, Pipeline, Plan, _Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev} =
        Runtime,
    {{runtime,
            Pipeline,
            Plan,
            Failed_queue,
            Operator,
            remove_batch(Batches_rev, Key),
            Retries_rev,
            Dead_letters_rev},
        {ok, {failure_rejected, Reject_reason}}}.
-file("src/lightspeed/pipeline/orchestrator.gleam", 357).
?DOC(" Access inner pipeline runtime.\n").
%% Returns the pipeline runtime stored in the first field after the tag.
-spec runtime_pipeline(runtime()) -> lightspeed@pipeline:runtime().
runtime_pipeline({runtime, Pipeline, _Plan, _Queue, _Operator, _Batches, _Retries, _Dead_letters}) ->
    Pipeline.
-file("src/lightspeed/pipeline/orchestrator.gleam", 362).
?DOC(" Access connector plan.\n").
%% Returns the connector plan stored in the second field after the tag.
-spec runtime_connector_plan(runtime()) -> lightspeed@pipeline@connector:connector_plan().
runtime_connector_plan({runtime, _Pipeline, Plan, _Queue, _Operator, _Batches, _Retries, _Dead_letters}) ->
    Plan.
-file("src/lightspeed/pipeline/orchestrator.gleam", 367).
?DOC(" Access operator runtime.\n").
%% Returns the operator runtime stored in the fourth field after the tag.
-spec runtime_operator(runtime()) -> lightspeed@pipeline@operator:runtime().
runtime_operator({runtime, _Pipeline, _Plan, _Queue, Operator, _Batches, _Retries, _Dead_letters}) ->
    Operator.
-file("src/lightspeed/pipeline/orchestrator.gleam", 372).
?DOC(" Current in-flight count.\n").
%% Asks the stored backpressure queue for its in-flight count.
-spec in_flight_count(runtime()) -> integer().
in_flight_count({runtime, _Pipeline, _Plan, Queue, _Operator, _Batches, _Retries, _Dead_letters}) ->
    lightspeed@async@backpressure:in_flight_count(Queue).
-file("src/lightspeed/pipeline/orchestrator.gleam", 377).
?DOC(" Current queued count.\n").
%% Asks the stored backpressure queue for its queued count.
-spec queued_count(runtime()) -> integer().
queued_count({runtime, _Pipeline, _Plan, Queue, _Operator, _Batches, _Retries, _Dead_letters}) ->
    lightspeed@async@backpressure:queued_count(Queue).
-file("src/lightspeed/pipeline/orchestrator.gleam", 397).
?DOC(" Process-outcome label.\n").
%% Renders an outcome as a colon-separated binary:
%%   processed        -> "processed:<pipeline result label>"
%%   retry_scheduled  -> "retry_scheduled:<key>:<attempt>:<retry_at_ms>"
%%   dead_lettered    -> "dead_lettered:<key>:<destination>:<reason>"
%%   failure_rejected -> "failure_rejected:<reason>"
-spec process_outcome_label(process_outcome()) -> binary().
process_outcome_label({processed, Result}) ->
    Result_label = lightspeed@pipeline:process_result_label(Result),
    <<"processed:"/utf8, Result_label/binary>>;
process_outcome_label({retry_scheduled, {retry_record, Key, Attempt, _Reason, Retry_at_ms}}) ->
    <<"retry_scheduled:"/utf8,
        Key/binary,
        ":"/utf8,
        (erlang:integer_to_binary(Attempt))/binary,
        ":"/utf8,
        (erlang:integer_to_binary(Retry_at_ms))/binary>>;
process_outcome_label({dead_lettered, {dead_letter_record, Key, Destination, Reason, _Timestamp_ms}}) ->
    <<"dead_lettered:"/utf8,
        Key/binary,
        ":"/utf8,
        Destination/binary,
        ":"/utf8,
        Reason/binary>>;
process_outcome_label({failure_rejected, Reason}) ->
    <<"failure_rejected:"/utf8, Reason/binary>>.
-file("src/lightspeed/pipeline/orchestrator.gleam", 419).
?DOC(" Stable runtime signature.\n").
%% Concatenates, separated by "|", the signatures of the pipeline, connector
%% plan, operator, and backpressure queue (in that order), followed by the
%% number of stored batches, retry records, and dead-letter records.
-spec signature(runtime()) -> binary().
signature({runtime, Pipeline, Plan, Queue, Operator, Batches_rev, Retries_rev, Dead_letters_rev}) ->
    Pipeline_sig = lightspeed@pipeline:signature(Pipeline),
    Connector_sig = lightspeed@pipeline@connector:signature(Plan),
    Operator_sig = lightspeed@pipeline@operator:signature(Operator),
    Queue_sig = lightspeed@async@backpressure:signature(Queue),
    Batch_count = erlang:integer_to_binary(erlang:length(Batches_rev)),
    Retry_count = erlang:integer_to_binary(erlang:length(Retries_rev)),
    Dead_letter_count = erlang:integer_to_binary(erlang:length(Dead_letters_rev)),
    <<"pipeline="/utf8,
        Pipeline_sig/binary,
        "|connector="/utf8,
        Connector_sig/binary,
        "|operator="/utf8,
        Operator_sig/binary,
        "|queue="/utf8,
        Queue_sig/binary,
        "|batches="/utf8,
        Batch_count/binary,
        "|retries="/utf8,
        Retry_count/binary,
        "|dead_letters="/utf8,
        Dead_letter_count/binary>>.