-module(lightspeed@pipeline).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline.gleam").
-export([source_stage/2, transform_stage/3, sink_stage/2, pipeline/3, new/1, checkpoints/1, valid_pipeline/1, valid/1, schedule/2, trigger/3, start/2, process/6, record_retry/2, record_dead_letter/2, fail/2, crash/2, resume_from_latest_checkpoint/2, complete/2, runtime_pipeline/1, lifecycle/1, runtime_telemetry/1, sink_idempotency_keys/1, stage_kind_label/1, schedule_label/1, lifecycle_label/1, process_result_label/1, pipeline_signature/1, signature/1]).
-export_type([stage_kind/0, boundary/0, stage/0, schedule/0, lifecycle/0, pipeline/0, process_result/0, runtime/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Integrated data-pipeline core contracts for M31.\n").
-type stage_kind() :: source | transform | sink.
-type boundary() :: {boundary, binary(), binary()}.
-type stage() :: {stage, binary(), stage_kind(), boundary()}.
-type schedule() :: manual | {interval, integer()} | {cron, binary()}.
-type lifecycle() :: idle |
{scheduled, integer()} |
{triggered, binary(), integer()} |
{running, binary(), integer()} |
{failed, binary(), binary()} |
{crashed, binary(), binary()} |
{replaying, binary(), binary(), integer()} |
{completed, binary(), integer()}.
-type pipeline() :: {pipeline, binary(), schedule(), list(stage())}.
-type process_result() :: {applied, lightspeed@pipeline@checkpoint:checkpoint()} |
{duplicate_suppressed, binary(), binary()} |
{rejected, binary()}.
-opaque runtime() :: {runtime,
pipeline(),
lifecycle(),
integer(),
list(lightspeed@pipeline@checkpoint:checkpoint()),
list(binary()),
lightspeed@pipeline@telemetry:snapshot()}.
-file("src/lightspeed/pipeline.gleam", 70).
?DOC(" Build a source stage.\n").
-spec source_stage(binary(), binary()) -> stage().
source_stage(Name, Output_contract) ->
{stage, Name, source, {boundary, <<"none"/utf8>>, Output_contract}}.
-file("src/lightspeed/pipeline.gleam", 79).
?DOC(" Build a transform stage.\n").
-spec transform_stage(binary(), binary(), binary()) -> stage().
transform_stage(Name, Input_contract, Output_contract) ->
{stage, Name, transform, {boundary, Input_contract, Output_contract}}.
-file("src/lightspeed/pipeline.gleam", 95).
?DOC(" Build a sink stage.\n").
-spec sink_stage(binary(), binary()) -> stage().
sink_stage(Name, Input_contract) ->
{stage, Name, sink, {boundary, Input_contract, <<"none"/utf8>>}}.
-file("src/lightspeed/pipeline.gleam", 104).
?DOC(" Build one pipeline contract.\n").
-spec pipeline(binary(), schedule(), list(stage())) -> pipeline().
pipeline(Name, Schedule, Stages) ->
{pipeline, Name, Schedule, Stages}.
-file("src/lightspeed/pipeline.gleam", 113).
?DOC(" Build one runtime.\n").
-spec new(pipeline()) -> runtime().
new(Pipeline) ->
{runtime, Pipeline, idle, 1, [], [], lightspeed@pipeline@telemetry:zero()}.
-file("src/lightspeed/pipeline.gleam", 348).
?DOC(" Stable checkpoints in emit order.\n").
-spec checkpoints(runtime()) -> list(lightspeed@pipeline@checkpoint:checkpoint()).
checkpoints(Runtime) ->
lists:reverse(erlang:element(5, Runtime)).
-file("src/lightspeed/pipeline.gleam", 538).
-spec checkpoints_valid(list(lightspeed@pipeline@checkpoint:checkpoint())) -> boolean().
checkpoints_valid(Entries) ->
case Entries of
[] ->
true;
[Entry | Rest] ->
lightspeed@pipeline@checkpoint:valid(Entry) andalso checkpoints_valid(
Rest
)
end.
-file("src/lightspeed/pipeline.gleam", 450).
-spec stages_valid(list(stage())) -> boolean().
stages_valid(Stages) ->
case Stages of
[] ->
true;
[Stage | Rest] ->
(((erlang:element(2, Stage) /= <<""/utf8>>) andalso (erlang:element(
2,
erlang:element(4, Stage)
)
/= <<""/utf8>>))
andalso (erlang:element(3, erlang:element(4, Stage)) /= <<""/utf8>>))
andalso stages_valid(Rest)
end.
-file("src/lightspeed/pipeline.gleam", 520).
-spec has_key(list(binary()), binary()) -> boolean().
has_key(Keys, Key) ->
case Keys of
[] ->
false;
[Entry | Rest] ->
case Entry =:= Key of
true ->
true;
false ->
has_key(Rest, Key)
end
end.
-file("src/lightspeed/pipeline.gleam", 461).
-spec stage_names_unique(list(stage()), list(binary())) -> boolean().
stage_names_unique(Stages, Seen) ->
case Stages of
[] ->
true;
[Stage | Rest] ->
case has_key(Seen, erlang:element(2, Stage)) of
true ->
false;
false ->
stage_names_unique(Rest, [erlang:element(2, Stage) | Seen])
end
end.
-file("src/lightspeed/pipeline.gleam", 442).
-spec schedule_valid(schedule()) -> boolean().
schedule_valid(Schedule) ->
case Schedule of
manual ->
true;
{interval, Interval_ms} ->
Interval_ms > 0;
{cron, Expression} ->
Expression /= <<""/utf8>>
end.
-file("src/lightspeed/pipeline.gleam", 479).
-spec last_stage_kind(list(stage())) -> gleam@option:option(stage_kind()).
last_stage_kind(Stages) ->
case lists:reverse(Stages) of
[] ->
none;
[Stage | _] ->
{some, erlang:element(3, Stage)}
end.
-file("src/lightspeed/pipeline.gleam", 472).
-spec first_stage_kind(list(stage())) -> gleam@option:option(stage_kind()).
first_stage_kind(Stages) ->
case Stages of
[] ->
none;
[Stage | _] ->
{some, erlang:element(3, Stage)}
end.
-file("src/lightspeed/pipeline.gleam", 133).
?DOC(" Validate one pipeline contract.\n").
-spec valid_pipeline(pipeline()) -> boolean().
valid_pipeline(Pipeline) ->
case (erlang:element(2, Pipeline) =:= <<""/utf8>>) orelse (erlang:element(
4,
Pipeline
)
=:= []) of
true ->
false;
false ->
First_kind = first_stage_kind(erlang:element(4, Pipeline)),
Last_kind = last_stage_kind(erlang:element(4, Pipeline)),
(((schedule_valid(erlang:element(3, Pipeline)) andalso (First_kind
=:= {some, source}))
andalso (Last_kind =:= {some, sink}))
andalso stage_names_unique(erlang:element(4, Pipeline), []))
andalso stages_valid(erlang:element(4, Pipeline))
end.
-file("src/lightspeed/pipeline.gleam", 125).
?DOC(" Validate pipeline and runtime invariants.\n").
-spec valid(runtime()) -> boolean().
valid(Runtime) ->
((valid_pipeline(erlang:element(2, Runtime)) andalso (erlang:element(
4,
Runtime
)
> 0))
andalso checkpoints_valid(checkpoints(Runtime)))
andalso lightspeed@pipeline@telemetry:valid(erlang:element(7, Runtime)).
-file("src/lightspeed/pipeline.gleam", 553).
-spec max(integer(), integer()) -> integer().
max(Left, Right) ->
case Left >= Right of
true ->
Left;
false ->
Right
end.
-file("src/lightspeed/pipeline.gleam", 150).
?DOC(" Schedule one runtime.\n").
-spec schedule(runtime(), integer()) -> runtime().
schedule(Runtime, At_ms) ->
{runtime,
erlang:element(2, Runtime),
{scheduled, max(0, At_ms)},
erlang:element(4, Runtime),
erlang:element(5, Runtime),
erlang:element(6, Runtime),
erlang:element(7, Runtime)}.
-file("src/lightspeed/pipeline.gleam", 155).
?DOC(" Trigger one runtime.\n").
-spec trigger(runtime(), binary(), integer()) -> runtime().
trigger(Runtime, Reason, At_ms) ->
{runtime,
erlang:element(2, Runtime),
{triggered, Reason, max(0, At_ms)},
erlang:element(4, Runtime),
erlang:element(5, Runtime),
erlang:element(6, Runtime),
erlang:element(7, Runtime)}.
-file("src/lightspeed/pipeline.gleam", 505).
-spec next_run_id(integer()) -> binary().
next_run_id(Seq) ->
<<"run-"/utf8, (erlang:integer_to_binary(Seq))/binary>>.
-file("src/lightspeed/pipeline.gleam", 160).
?DOC(" Start one run.\n").
-spec start(runtime(), integer()) -> runtime().
start(Runtime, Now_ms) ->
Run_id = next_run_id(erlang:element(4, Runtime)),
{runtime,
erlang:element(2, Runtime),
{running, Run_id, max(0, Now_ms)},
erlang:element(4, Runtime) + 1,
erlang:element(5, Runtime),
erlang:element(6, Runtime),
erlang:element(7, Runtime)}.
-file("src/lightspeed/pipeline.gleam", 497).
-spec active_started_ms(lifecycle()) -> integer().
active_started_ms(Lifecycle) ->
case Lifecycle of
{running, _, Started_ms} ->
Started_ms;
{replaying, _, _, Started_ms@1} ->
Started_ms@1;
_ ->
0
end.
-file("src/lightspeed/pipeline.gleam", 531).
-spec total_offset(list(lightspeed@pipeline@checkpoint:checkpoint())) -> integer().
total_offset(Entries_rev) ->
case Entries_rev of
[] ->
0;
[Entry | _] ->
erlang:element(2, erlang:element(5, Entry))
end.
-file("src/lightspeed/pipeline.gleam", 509).
-spec find_stage(list(stage()), binary()) -> gleam@option:option(stage()).
find_stage(Stages, Name) ->
case Stages of
[] ->
none;
[Stage | Rest] ->
case erlang:element(2, Stage) =:= Name of
true ->
{some, Stage};
false ->
find_stage(Rest, Name)
end
end.
-file("src/lightspeed/pipeline.gleam", 486).
-spec active_run(lifecycle()) -> gleam@option:option(binary()).
active_run(Lifecycle) ->
case Lifecycle of
{running, Run_id, _} ->
{some, Run_id};
{replaying, Run_id@1, _, _} ->
{some, Run_id@1};
{failed, Run_id@2, _} ->
{some, Run_id@2};
{crashed, Run_id@3, _} ->
{some, Run_id@3};
{completed, Run_id@4, _} ->
{some, Run_id@4};
_ ->
none
end.
-file("src/lightspeed/pipeline.gleam", 170).
?DOC(" Process one stage event.\n").
-spec process(runtime(), binary(), integer(), integer(), binary(), integer()) -> {runtime(),
process_result()}.
process(Runtime, Stage_name, Processed_records, Lag_ms, Idempotency_key, Now_ms) ->
case active_run(erlang:element(3, Runtime)) of
none ->
{Runtime, {rejected, <<"not_running"/utf8>>}};
{some, Run_id} ->
case find_stage(
erlang:element(4, erlang:element(2, Runtime)),
Stage_name
) of
none ->
{Runtime,
{rejected, <<"unknown_stage:"/utf8, Stage_name/binary>>}};
{some, Stage} ->
case Processed_records =< 0 of
true ->
{Runtime,
{rejected,
<<"invalid_processed_records:"/utf8,
(erlang:integer_to_binary(
Processed_records
))/binary>>}};
false ->
case (erlang:element(3, Stage) =:= sink) andalso has_key(
erlang:element(6, Runtime),
Idempotency_key
) of
true ->
{Runtime,
{duplicate_suppressed,
erlang:element(2, Stage),
Idempotency_key}};
false ->
Sequence = erlang:length(
erlang:element(5, Runtime)
)
+ 1,
Watermark = lightspeed@pipeline@checkpoint:watermark(
total_offset(erlang:element(5, Runtime))
+ Processed_records,
max(0, Now_ms)
),
Entry = lightspeed@pipeline@checkpoint:checkpoint(
Run_id,
erlang:element(2, Stage),
Sequence,
Watermark,
Idempotency_key,
max(0, Now_ms)
),
Started_ms = active_started_ms(
erlang:element(3, Runtime)
),
Snapshot = lightspeed@pipeline@telemetry:apply(
erlang:element(7, Runtime),
lightspeed@pipeline@telemetry:delta(
Processed_records,
max(0, Lag_ms),
0,
0,
max(1, max(0, Now_ms) - Started_ms)
)
),
Keys = case erlang:element(3, Stage) =:= sink of
true ->
[Idempotency_key |
erlang:element(6, Runtime)];
false ->
erlang:element(6, Runtime)
end,
{{runtime,
erlang:element(2, Runtime),
erlang:element(3, Runtime),
erlang:element(4, Runtime),
[Entry | erlang:element(5, Runtime)],
Keys,
Snapshot},
{applied, Entry}}
end
end
end
end.
-file("src/lightspeed/pipeline.gleam", 257).
?DOC(" Record retries for the active run.\n").
-spec record_retry(runtime(), integer()) -> runtime().
record_retry(Runtime, Count) ->
{runtime,
erlang:element(2, Runtime),
erlang:element(3, Runtime),
erlang:element(4, Runtime),
erlang:element(5, Runtime),
erlang:element(6, Runtime),
lightspeed@pipeline@telemetry:apply(
erlang:element(7, Runtime),
lightspeed@pipeline@telemetry:delta(
0,
erlang:element(2, erlang:element(7, Runtime)),
max(0, Count),
0,
1
)
)}.
-file("src/lightspeed/pipeline.gleam", 268).
?DOC(" Record dead-letter increments for the active run.\n").
-spec record_dead_letter(runtime(), integer()) -> runtime().
record_dead_letter(Runtime, Count) ->
{runtime,
erlang:element(2, Runtime),
erlang:element(3, Runtime),
erlang:element(4, Runtime),
erlang:element(5, Runtime),
erlang:element(6, Runtime),
lightspeed@pipeline@telemetry:apply(
erlang:element(7, Runtime),
lightspeed@pipeline@telemetry:delta(
0,
erlang:element(2, erlang:element(7, Runtime)),
0,
max(0, Count),
1
)
)}.
-file("src/lightspeed/pipeline.gleam", 279).
?DOC(" Mark active run as failed.\n").
-spec fail(runtime(), binary()) -> runtime().
fail(Runtime, Reason) ->
case active_run(erlang:element(3, Runtime)) of
none ->
Runtime;
{some, Run_id} ->
{runtime,
erlang:element(2, Runtime),
{failed, Run_id, Reason},
erlang:element(4, Runtime),
erlang:element(5, Runtime),
erlang:element(6, Runtime),
erlang:element(7, Runtime)}
end.
-file("src/lightspeed/pipeline.gleam", 288).
?DOC(" Mark active run as crashed.\n").
-spec crash(runtime(), binary()) -> runtime().
crash(Runtime, Reason) ->
case active_run(erlang:element(3, Runtime)) of
none ->
Runtime;
{some, Run_id} ->
{runtime,
erlang:element(2, Runtime),
{crashed, Run_id, Reason},
erlang:element(4, Runtime),
erlang:element(5, Runtime),
erlang:element(6, Runtime),
erlang:element(7, Runtime)}
end.
-file("src/lightspeed/pipeline.gleam", 297).
?DOC(" Resume from the latest checkpoint.\n").
-spec resume_from_latest_checkpoint(runtime(), integer()) -> {ok, runtime()} |
{error, binary()}.
resume_from_latest_checkpoint(Runtime, Now_ms) ->
case lightspeed@pipeline@checkpoint:latest(checkpoints(Runtime)) of
none ->
{error, <<"missing_checkpoint"/utf8>>};
{some, Entry} ->
Run_id = <<(next_run_id(erlang:element(4, Runtime)))/binary,
"-replay"/utf8>>,
{ok,
{runtime,
erlang:element(2, Runtime),
{replaying,
Run_id,
erlang:element(3, Entry),
max(0, Now_ms)},
erlang:element(4, Runtime) + 1,
erlang:element(5, Runtime),
erlang:element(6, Runtime),
erlang:element(7, Runtime)}}
end.
-file("src/lightspeed/pipeline.gleam", 321).
?DOC(" Complete the active run.\n").
-spec complete(runtime(), integer()) -> runtime().
complete(Runtime, Now_ms) ->
case active_run(erlang:element(3, Runtime)) of
none ->
Runtime;
{some, Run_id} ->
{runtime,
erlang:element(2, Runtime),
{completed, Run_id, max(0, Now_ms)},
erlang:element(4, Runtime),
erlang:element(5, Runtime),
erlang:element(6, Runtime),
erlang:element(7, Runtime)}
end.
-file("src/lightspeed/pipeline.gleam", 333).
?DOC(" Runtime pipeline accessor.\n").
-spec runtime_pipeline(runtime()) -> pipeline().
runtime_pipeline(Runtime) ->
erlang:element(2, Runtime).
-file("src/lightspeed/pipeline.gleam", 338).
?DOC(" Runtime lifecycle accessor.\n").
-spec lifecycle(runtime()) -> lifecycle().
lifecycle(Runtime) ->
erlang:element(3, Runtime).
-file("src/lightspeed/pipeline.gleam", 343).
?DOC(" Runtime telemetry accessor.\n").
-spec runtime_telemetry(runtime()) -> lightspeed@pipeline@telemetry:snapshot().
runtime_telemetry(Runtime) ->
erlang:element(7, Runtime).
-file("src/lightspeed/pipeline.gleam", 353).
?DOC(" Sink idempotency keys in application order.\n").
-spec sink_idempotency_keys(runtime()) -> list(binary()).
sink_idempotency_keys(Runtime) ->
lists:reverse(erlang:element(6, Runtime)).
-file("src/lightspeed/pipeline.gleam", 358).
?DOC(" Stage kind label.\n").
-spec stage_kind_label(stage_kind()) -> binary().
stage_kind_label(Kind) ->
case Kind of
source ->
<<"source"/utf8>>;
transform ->
<<"transform"/utf8>>;
sink ->
<<"sink"/utf8>>
end.
-file("src/lightspeed/pipeline.gleam", 367).
?DOC(" Schedule label.\n").
-spec schedule_label(schedule()) -> binary().
schedule_label(Schedule) ->
case Schedule of
manual ->
<<"manual"/utf8>>;
{interval, Interval_ms} ->
<<"interval:"/utf8, (erlang:integer_to_binary(Interval_ms))/binary>>;
{cron, Expression} ->
<<"cron:"/utf8, Expression/binary>>
end.
-file("src/lightspeed/pipeline.gleam", 376).
?DOC(" Lifecycle label.\n").
-spec lifecycle_label(lifecycle()) -> binary().
lifecycle_label(Lifecycle) ->
case Lifecycle of
idle ->
<<"idle"/utf8>>;
{scheduled, At_ms} ->
<<"scheduled:"/utf8, (erlang:integer_to_binary(At_ms))/binary>>;
{triggered, Reason, At_ms@1} ->
<<<<<<"triggered:"/utf8, Reason/binary>>/binary, ":"/utf8>>/binary,
(erlang:integer_to_binary(At_ms@1))/binary>>;
{running, Run_id, Started_ms} ->
<<<<<<"running:"/utf8, Run_id/binary>>/binary, ":"/utf8>>/binary,
(erlang:integer_to_binary(Started_ms))/binary>>;
{failed, Run_id@1, Reason@1} ->
<<<<<<"failed:"/utf8, Run_id@1/binary>>/binary, ":"/utf8>>/binary,
Reason@1/binary>>;
{crashed, Run_id@2, Reason@2} ->
<<<<<<"crashed:"/utf8, Run_id@2/binary>>/binary, ":"/utf8>>/binary,
Reason@2/binary>>;
{replaying, Run_id@3, From_stage, Started_ms@1} ->
<<<<<<<<<<"replaying:"/utf8, Run_id@3/binary>>/binary, ":"/utf8>>/binary,
From_stage/binary>>/binary,
":"/utf8>>/binary,
(erlang:integer_to_binary(Started_ms@1))/binary>>;
{completed, Run_id@4, Finished_ms} ->
<<<<<<"completed:"/utf8, Run_id@4/binary>>/binary, ":"/utf8>>/binary,
(erlang:integer_to_binary(Finished_ms))/binary>>
end.
-file("src/lightspeed/pipeline.gleam", 399).
?DOC(" Stable process-result label.\n").
-spec process_result_label(process_result()) -> binary().
process_result_label(Result) ->
case Result of
{applied, Entry} ->
<<"applied:"/utf8,
(lightspeed@pipeline@checkpoint:checkpoint_label(Entry))/binary>>;
{duplicate_suppressed, Stage, Key} ->
<<<<<<"duplicate_suppressed:"/utf8, Stage/binary>>/binary,
":"/utf8>>/binary,
Key/binary>>;
{rejected, Reason} ->
<<"rejected:"/utf8, Reason/binary>>
end.
-file("src/lightspeed/pipeline.gleam", 545).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
case Values of
[] ->
<<""/utf8>>;
[Value] ->
Value;
[Value@1 | Rest] ->
<<<<Value@1/binary, Separator/binary>>/binary,
(join_with(Separator, Rest))/binary>>
end.
-file("src/lightspeed/pipeline.gleam", 432).
-spec stage_signature(stage()) -> binary().
stage_signature(Stage) ->
<<<<<<<<<<<<(erlang:element(2, Stage))/binary, ":"/utf8>>/binary,
(stage_kind_label(erlang:element(3, Stage)))/binary>>/binary,
":"/utf8>>/binary,
(erlang:element(2, erlang:element(4, Stage)))/binary>>/binary,
"->"/utf8>>/binary,
(erlang:element(3, erlang:element(4, Stage)))/binary>>.
-file("src/lightspeed/pipeline.gleam", 423).
?DOC(" Stable pipeline signature.\n").
-spec pipeline_signature(pipeline()) -> binary().
pipeline_signature(Pipeline) ->
<<<<<<<<<<"name="/utf8, (erlang:element(2, Pipeline))/binary>>/binary,
"|schedule="/utf8>>/binary,
(schedule_label(erlang:element(3, Pipeline)))/binary>>/binary,
"|stages="/utf8>>/binary,
(join_with(
<<","/utf8>>,
gleam@list:map(erlang:element(4, Pipeline), fun stage_signature/1)
))/binary>>.
-file("src/lightspeed/pipeline.gleam", 409).
?DOC(" Stable runtime signature.\n").
-spec signature(runtime()) -> binary().
signature(Runtime) ->
<<<<<<<<<<<<<<<<<<"pipeline="/utf8,
(pipeline_signature(
erlang:element(2, Runtime)
))/binary>>/binary,
"|lifecycle="/utf8>>/binary,
(lifecycle_label(erlang:element(3, Runtime)))/binary>>/binary,
"|checkpoints="/utf8>>/binary,
(lightspeed@pipeline@checkpoint:chain_signature(
checkpoints(Runtime)
))/binary>>/binary,
"|sink_keys="/utf8>>/binary,
(join_with(<<","/utf8>>, sink_idempotency_keys(Runtime)))/binary>>/binary,
"|telemetry="/utf8>>/binary,
(lightspeed@pipeline@telemetry:label(erlang:element(7, Runtime)))/binary>>.