src/lightspeed@pipeline.erl

-module(lightspeed@pipeline).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline.gleam").
-export([source_stage/2, transform_stage/3, sink_stage/2, pipeline/3, new/1, checkpoints/1, valid_pipeline/1, valid/1, schedule/2, trigger/3, start/2, process/6, record_retry/2, record_dead_letter/2, fail/2, crash/2, resume_from_latest_checkpoint/2, complete/2, runtime_pipeline/1, lifecycle/1, runtime_telemetry/1, sink_idempotency_keys/1, stage_kind_label/1, schedule_label/1, lifecycle_label/1, process_result_label/1, pipeline_signature/1, signature/1]).
-export_type([stage_kind/0, boundary/0, stage/0, schedule/0, lifecycle/0, pipeline/0, process_result/0, runtime/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Integrated data-pipeline core contracts for M31.\n").

-type stage_kind() :: source | transform | sink.

-type boundary() :: {boundary, binary(), binary()}.

-type stage() :: {stage, binary(), stage_kind(), boundary()}.

-type schedule() :: manual | {interval, integer()} | {cron, binary()}.

-type lifecycle() :: idle |
    {scheduled, integer()} |
    {triggered, binary(), integer()} |
    {running, binary(), integer()} |
    {failed, binary(), binary()} |
    {crashed, binary(), binary()} |
    {replaying, binary(), binary(), integer()} |
    {completed, binary(), integer()}.

-type pipeline() :: {pipeline, binary(), schedule(), list(stage())}.

-type process_result() :: {applied, lightspeed@pipeline@checkpoint:checkpoint()} |
    {duplicate_suppressed, binary(), binary()} |
    {rejected, binary()}.

-opaque runtime() :: {runtime,
        pipeline(),
        lifecycle(),
        integer(),
        list(lightspeed@pipeline@checkpoint:checkpoint()),
        list(binary()),
        lightspeed@pipeline@telemetry:snapshot()}.

-file("src/lightspeed/pipeline.gleam", 70).
?DOC(" Build a source stage.\n").
-spec source_stage(binary(), binary()) -> stage().
source_stage(Name, Output_contract) ->
    {stage, Name, source, {boundary, <<"none"/utf8>>, Output_contract}}.

-file("src/lightspeed/pipeline.gleam", 79).
?DOC(" Build a transform stage.\n").
-spec transform_stage(binary(), binary(), binary()) -> stage().
transform_stage(Name, Input_contract, Output_contract) ->
    {stage, Name, transform, {boundary, Input_contract, Output_contract}}.

-file("src/lightspeed/pipeline.gleam", 95).
?DOC(" Build a sink stage.\n").
-spec sink_stage(binary(), binary()) -> stage().
sink_stage(Name, Input_contract) ->
    {stage, Name, sink, {boundary, Input_contract, <<"none"/utf8>>}}.

-file("src/lightspeed/pipeline.gleam", 104).
?DOC(" Build one pipeline contract.\n").
-spec pipeline(binary(), schedule(), list(stage())) -> pipeline().
pipeline(Name, Schedule, Stages) ->
    {pipeline, Name, Schedule, Stages}.

-file("src/lightspeed/pipeline.gleam", 113).
?DOC(" Build one runtime.\n").
-spec new(pipeline()) -> runtime().
new(Pipeline) ->
    {runtime, Pipeline, idle, 1, [], [], lightspeed@pipeline@telemetry:zero()}.

-file("src/lightspeed/pipeline.gleam", 348).
?DOC(" Stable checkpoints in emit order.\n").
-spec checkpoints(runtime()) -> list(lightspeed@pipeline@checkpoint:checkpoint()).
checkpoints(Runtime) ->
    lists:reverse(erlang:element(5, Runtime)).

-file("src/lightspeed/pipeline.gleam", 538).
-spec checkpoints_valid(list(lightspeed@pipeline@checkpoint:checkpoint())) -> boolean().
checkpoints_valid(Entries) ->
    case Entries of
        [] ->
            true;

        [Entry | Rest] ->
            lightspeed@pipeline@checkpoint:valid(Entry) andalso checkpoints_valid(
                Rest
            )
    end.

-file("src/lightspeed/pipeline.gleam", 450).
-spec stages_valid(list(stage())) -> boolean().
stages_valid(Stages) ->
    case Stages of
        [] ->
            true;

        [Stage | Rest] ->
            (((erlang:element(2, Stage) /= <<""/utf8>>) andalso (erlang:element(
                2,
                erlang:element(4, Stage)
            )
            /= <<""/utf8>>))
            andalso (erlang:element(3, erlang:element(4, Stage)) /= <<""/utf8>>))
            andalso stages_valid(Rest)
    end.

-file("src/lightspeed/pipeline.gleam", 520).
-spec has_key(list(binary()), binary()) -> boolean().
has_key(Keys, Key) ->
    case Keys of
        [] ->
            false;

        [Entry | Rest] ->
            case Entry =:= Key of
                true ->
                    true;

                false ->
                    has_key(Rest, Key)
            end
    end.

-file("src/lightspeed/pipeline.gleam", 461).
-spec stage_names_unique(list(stage()), list(binary())) -> boolean().
stage_names_unique(Stages, Seen) ->
    case Stages of
        [] ->
            true;

        [Stage | Rest] ->
            case has_key(Seen, erlang:element(2, Stage)) of
                true ->
                    false;

                false ->
                    stage_names_unique(Rest, [erlang:element(2, Stage) | Seen])
            end
    end.

-file("src/lightspeed/pipeline.gleam", 442).
-spec schedule_valid(schedule()) -> boolean().
schedule_valid(Schedule) ->
    case Schedule of
        manual ->
            true;

        {interval, Interval_ms} ->
            Interval_ms > 0;

        {cron, Expression} ->
            Expression /= <<""/utf8>>
    end.

-file("src/lightspeed/pipeline.gleam", 479).
-spec last_stage_kind(list(stage())) -> gleam@option:option(stage_kind()).
last_stage_kind(Stages) ->
    case lists:reverse(Stages) of
        [] ->
            none;

        [Stage | _] ->
            {some, erlang:element(3, Stage)}
    end.

-file("src/lightspeed/pipeline.gleam", 472).
-spec first_stage_kind(list(stage())) -> gleam@option:option(stage_kind()).
first_stage_kind(Stages) ->
    case Stages of
        [] ->
            none;

        [Stage | _] ->
            {some, erlang:element(3, Stage)}
    end.

-file("src/lightspeed/pipeline.gleam", 133).
?DOC(" Validate one pipeline contract.\n").
-spec valid_pipeline(pipeline()) -> boolean().
valid_pipeline(Pipeline) ->
    case (erlang:element(2, Pipeline) =:= <<""/utf8>>) orelse (erlang:element(
        4,
        Pipeline
    )
    =:= []) of
        true ->
            false;

        false ->
            First_kind = first_stage_kind(erlang:element(4, Pipeline)),
            Last_kind = last_stage_kind(erlang:element(4, Pipeline)),
            (((schedule_valid(erlang:element(3, Pipeline)) andalso (First_kind
            =:= {some, source}))
            andalso (Last_kind =:= {some, sink}))
            andalso stage_names_unique(erlang:element(4, Pipeline), []))
            andalso stages_valid(erlang:element(4, Pipeline))
    end.

-file("src/lightspeed/pipeline.gleam", 125).
?DOC(" Validate pipeline and runtime invariants.\n").
-spec valid(runtime()) -> boolean().
valid(Runtime) ->
    ((valid_pipeline(erlang:element(2, Runtime)) andalso (erlang:element(
        4,
        Runtime
    )
    > 0))
    andalso checkpoints_valid(checkpoints(Runtime)))
    andalso lightspeed@pipeline@telemetry:valid(erlang:element(7, Runtime)).

-file("src/lightspeed/pipeline.gleam", 553).
-spec max(integer(), integer()) -> integer().
max(Left, Right) ->
    case Left >= Right of
        true ->
            Left;

        false ->
            Right
    end.

-file("src/lightspeed/pipeline.gleam", 150).
?DOC(" Schedule one runtime.\n").
-spec schedule(runtime(), integer()) -> runtime().
schedule(Runtime, At_ms) ->
    {runtime,
        erlang:element(2, Runtime),
        {scheduled, max(0, At_ms)},
        erlang:element(4, Runtime),
        erlang:element(5, Runtime),
        erlang:element(6, Runtime),
        erlang:element(7, Runtime)}.

-file("src/lightspeed/pipeline.gleam", 155).
?DOC(" Trigger one runtime.\n").
-spec trigger(runtime(), binary(), integer()) -> runtime().
trigger(Runtime, Reason, At_ms) ->
    {runtime,
        erlang:element(2, Runtime),
        {triggered, Reason, max(0, At_ms)},
        erlang:element(4, Runtime),
        erlang:element(5, Runtime),
        erlang:element(6, Runtime),
        erlang:element(7, Runtime)}.

-file("src/lightspeed/pipeline.gleam", 505).
-spec next_run_id(integer()) -> binary().
next_run_id(Seq) ->
    <<"run-"/utf8, (erlang:integer_to_binary(Seq))/binary>>.

-file("src/lightspeed/pipeline.gleam", 160).
?DOC(" Start one run.\n").
-spec start(runtime(), integer()) -> runtime().
start(Runtime, Now_ms) ->
    Run_id = next_run_id(erlang:element(4, Runtime)),
    {runtime,
        erlang:element(2, Runtime),
        {running, Run_id, max(0, Now_ms)},
        erlang:element(4, Runtime) + 1,
        erlang:element(5, Runtime),
        erlang:element(6, Runtime),
        erlang:element(7, Runtime)}.

-file("src/lightspeed/pipeline.gleam", 497).
-spec active_started_ms(lifecycle()) -> integer().
active_started_ms(Lifecycle) ->
    case Lifecycle of
        {running, _, Started_ms} ->
            Started_ms;

        {replaying, _, _, Started_ms@1} ->
            Started_ms@1;

        _ ->
            0
    end.

-file("src/lightspeed/pipeline.gleam", 531).
-spec total_offset(list(lightspeed@pipeline@checkpoint:checkpoint())) -> integer().
total_offset(Entries_rev) ->
    case Entries_rev of
        [] ->
            0;

        [Entry | _] ->
            erlang:element(2, erlang:element(5, Entry))
    end.

-file("src/lightspeed/pipeline.gleam", 509).
-spec find_stage(list(stage()), binary()) -> gleam@option:option(stage()).
find_stage(Stages, Name) ->
    case Stages of
        [] ->
            none;

        [Stage | Rest] ->
            case erlang:element(2, Stage) =:= Name of
                true ->
                    {some, Stage};

                false ->
                    find_stage(Rest, Name)
            end
    end.

-file("src/lightspeed/pipeline.gleam", 486).
-spec active_run(lifecycle()) -> gleam@option:option(binary()).
active_run(Lifecycle) ->
    case Lifecycle of
        {running, Run_id, _} ->
            {some, Run_id};

        {replaying, Run_id@1, _, _} ->
            {some, Run_id@1};

        {failed, Run_id@2, _} ->
            {some, Run_id@2};

        {crashed, Run_id@3, _} ->
            {some, Run_id@3};

        {completed, Run_id@4, _} ->
            {some, Run_id@4};

        _ ->
            none
    end.

-file("src/lightspeed/pipeline.gleam", 170).
?DOC(" Process one stage event.\n").
-spec process(runtime(), binary(), integer(), integer(), binary(), integer()) -> {runtime(),
    process_result()}.
process(Runtime, Stage_name, Processed_records, Lag_ms, Idempotency_key, Now_ms) ->
    case active_run(erlang:element(3, Runtime)) of
        none ->
            {Runtime, {rejected, <<"not_running"/utf8>>}};

        {some, Run_id} ->
            case find_stage(
                erlang:element(4, erlang:element(2, Runtime)),
                Stage_name
            ) of
                none ->
                    {Runtime,
                        {rejected, <<"unknown_stage:"/utf8, Stage_name/binary>>}};

                {some, Stage} ->
                    case Processed_records =< 0 of
                        true ->
                            {Runtime,
                                {rejected,
                                    <<"invalid_processed_records:"/utf8,
                                        (erlang:integer_to_binary(
                                            Processed_records
                                        ))/binary>>}};

                        false ->
                            case (erlang:element(3, Stage) =:= sink) andalso has_key(
                                erlang:element(6, Runtime),
                                Idempotency_key
                            ) of
                                true ->
                                    {Runtime,
                                        {duplicate_suppressed,
                                            erlang:element(2, Stage),
                                            Idempotency_key}};

                                false ->
                                    Sequence = erlang:length(
                                        erlang:element(5, Runtime)
                                    )
                                    + 1,
                                    Watermark = lightspeed@pipeline@checkpoint:watermark(
                                        total_offset(erlang:element(5, Runtime))
                                        + Processed_records,
                                        max(0, Now_ms)
                                    ),
                                    Entry = lightspeed@pipeline@checkpoint:checkpoint(
                                        Run_id,
                                        erlang:element(2, Stage),
                                        Sequence,
                                        Watermark,
                                        Idempotency_key,
                                        max(0, Now_ms)
                                    ),
                                    Started_ms = active_started_ms(
                                        erlang:element(3, Runtime)
                                    ),
                                    Snapshot = lightspeed@pipeline@telemetry:apply(
                                        erlang:element(7, Runtime),
                                        lightspeed@pipeline@telemetry:delta(
                                            Processed_records,
                                            max(0, Lag_ms),
                                            0,
                                            0,
                                            max(1, max(0, Now_ms) - Started_ms)
                                        )
                                    ),
                                    Keys = case erlang:element(3, Stage) =:= sink of
                                        true ->
                                            [Idempotency_key |
                                                erlang:element(6, Runtime)];

                                        false ->
                                            erlang:element(6, Runtime)
                                    end,
                                    {{runtime,
                                            erlang:element(2, Runtime),
                                            erlang:element(3, Runtime),
                                            erlang:element(4, Runtime),
                                            [Entry | erlang:element(5, Runtime)],
                                            Keys,
                                            Snapshot},
                                        {applied, Entry}}
                            end
                    end
            end
    end.

-file("src/lightspeed/pipeline.gleam", 257).
?DOC(" Record retries for the active run.\n").
-spec record_retry(runtime(), integer()) -> runtime().
record_retry(Runtime, Count) ->
    {runtime,
        erlang:element(2, Runtime),
        erlang:element(3, Runtime),
        erlang:element(4, Runtime),
        erlang:element(5, Runtime),
        erlang:element(6, Runtime),
        lightspeed@pipeline@telemetry:apply(
            erlang:element(7, Runtime),
            lightspeed@pipeline@telemetry:delta(
                0,
                erlang:element(2, erlang:element(7, Runtime)),
                max(0, Count),
                0,
                1
            )
        )}.

-file("src/lightspeed/pipeline.gleam", 268).
?DOC(" Record dead-letter increments for the active run.\n").
-spec record_dead_letter(runtime(), integer()) -> runtime().
record_dead_letter(Runtime, Count) ->
    {runtime,
        erlang:element(2, Runtime),
        erlang:element(3, Runtime),
        erlang:element(4, Runtime),
        erlang:element(5, Runtime),
        erlang:element(6, Runtime),
        lightspeed@pipeline@telemetry:apply(
            erlang:element(7, Runtime),
            lightspeed@pipeline@telemetry:delta(
                0,
                erlang:element(2, erlang:element(7, Runtime)),
                0,
                max(0, Count),
                1
            )
        )}.

-file("src/lightspeed/pipeline.gleam", 279).
?DOC(" Mark active run as failed.\n").
-spec fail(runtime(), binary()) -> runtime().
fail(Runtime, Reason) ->
    case active_run(erlang:element(3, Runtime)) of
        none ->
            Runtime;

        {some, Run_id} ->
            {runtime,
                erlang:element(2, Runtime),
                {failed, Run_id, Reason},
                erlang:element(4, Runtime),
                erlang:element(5, Runtime),
                erlang:element(6, Runtime),
                erlang:element(7, Runtime)}
    end.

-file("src/lightspeed/pipeline.gleam", 288).
?DOC(" Mark active run as crashed.\n").
-spec crash(runtime(), binary()) -> runtime().
crash(Runtime, Reason) ->
    case active_run(erlang:element(3, Runtime)) of
        none ->
            Runtime;

        {some, Run_id} ->
            {runtime,
                erlang:element(2, Runtime),
                {crashed, Run_id, Reason},
                erlang:element(4, Runtime),
                erlang:element(5, Runtime),
                erlang:element(6, Runtime),
                erlang:element(7, Runtime)}
    end.

-file("src/lightspeed/pipeline.gleam", 297).
?DOC(" Resume from the latest checkpoint.\n").
-spec resume_from_latest_checkpoint(runtime(), integer()) -> {ok, runtime()} |
    {error, binary()}.
resume_from_latest_checkpoint(Runtime, Now_ms) ->
    case lightspeed@pipeline@checkpoint:latest(checkpoints(Runtime)) of
        none ->
            {error, <<"missing_checkpoint"/utf8>>};

        {some, Entry} ->
            Run_id = <<(next_run_id(erlang:element(4, Runtime)))/binary,
                "-replay"/utf8>>,
            {ok,
                {runtime,
                    erlang:element(2, Runtime),
                    {replaying,
                        Run_id,
                        erlang:element(3, Entry),
                        max(0, Now_ms)},
                    erlang:element(4, Runtime) + 1,
                    erlang:element(5, Runtime),
                    erlang:element(6, Runtime),
                    erlang:element(7, Runtime)}}
    end.

-file("src/lightspeed/pipeline.gleam", 321).
?DOC(" Complete the active run.\n").
-spec complete(runtime(), integer()) -> runtime().
complete(Runtime, Now_ms) ->
    case active_run(erlang:element(3, Runtime)) of
        none ->
            Runtime;

        {some, Run_id} ->
            {runtime,
                erlang:element(2, Runtime),
                {completed, Run_id, max(0, Now_ms)},
                erlang:element(4, Runtime),
                erlang:element(5, Runtime),
                erlang:element(6, Runtime),
                erlang:element(7, Runtime)}
    end.

-file("src/lightspeed/pipeline.gleam", 333).
?DOC(" Runtime pipeline accessor.\n").
-spec runtime_pipeline(runtime()) -> pipeline().
runtime_pipeline(Runtime) ->
    erlang:element(2, Runtime).

-file("src/lightspeed/pipeline.gleam", 338).
?DOC(" Runtime lifecycle accessor.\n").
-spec lifecycle(runtime()) -> lifecycle().
lifecycle(Runtime) ->
    erlang:element(3, Runtime).

-file("src/lightspeed/pipeline.gleam", 343).
?DOC(" Runtime telemetry accessor.\n").
-spec runtime_telemetry(runtime()) -> lightspeed@pipeline@telemetry:snapshot().
runtime_telemetry(Runtime) ->
    erlang:element(7, Runtime).

-file("src/lightspeed/pipeline.gleam", 353).
?DOC(" Sink idempotency keys in application order.\n").
-spec sink_idempotency_keys(runtime()) -> list(binary()).
sink_idempotency_keys(Runtime) ->
    lists:reverse(erlang:element(6, Runtime)).

-file("src/lightspeed/pipeline.gleam", 358).
?DOC(" Stage kind label.\n").
-spec stage_kind_label(stage_kind()) -> binary().
stage_kind_label(Kind) ->
    case Kind of
        source ->
            <<"source"/utf8>>;

        transform ->
            <<"transform"/utf8>>;

        sink ->
            <<"sink"/utf8>>
    end.

-file("src/lightspeed/pipeline.gleam", 367).
?DOC(" Schedule label.\n").
-spec schedule_label(schedule()) -> binary().
schedule_label(Schedule) ->
    case Schedule of
        manual ->
            <<"manual"/utf8>>;

        {interval, Interval_ms} ->
            <<"interval:"/utf8, (erlang:integer_to_binary(Interval_ms))/binary>>;

        {cron, Expression} ->
            <<"cron:"/utf8, Expression/binary>>
    end.

-file("src/lightspeed/pipeline.gleam", 376).
?DOC(" Lifecycle label.\n").
-spec lifecycle_label(lifecycle()) -> binary().
lifecycle_label(Lifecycle) ->
    case Lifecycle of
        idle ->
            <<"idle"/utf8>>;

        {scheduled, At_ms} ->
            <<"scheduled:"/utf8, (erlang:integer_to_binary(At_ms))/binary>>;

        {triggered, Reason, At_ms@1} ->
            <<<<<<"triggered:"/utf8, Reason/binary>>/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(At_ms@1))/binary>>;

        {running, Run_id, Started_ms} ->
            <<<<<<"running:"/utf8, Run_id/binary>>/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(Started_ms))/binary>>;

        {failed, Run_id@1, Reason@1} ->
            <<<<<<"failed:"/utf8, Run_id@1/binary>>/binary, ":"/utf8>>/binary,
                Reason@1/binary>>;

        {crashed, Run_id@2, Reason@2} ->
            <<<<<<"crashed:"/utf8, Run_id@2/binary>>/binary, ":"/utf8>>/binary,
                Reason@2/binary>>;

        {replaying, Run_id@3, From_stage, Started_ms@1} ->
            <<<<<<<<<<"replaying:"/utf8, Run_id@3/binary>>/binary, ":"/utf8>>/binary,
                        From_stage/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Started_ms@1))/binary>>;

        {completed, Run_id@4, Finished_ms} ->
            <<<<<<"completed:"/utf8, Run_id@4/binary>>/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(Finished_ms))/binary>>
    end.

-file("src/lightspeed/pipeline.gleam", 399).
?DOC(" Stable process-result label.\n").
-spec process_result_label(process_result()) -> binary().
process_result_label(Result) ->
    case Result of
        {applied, Entry} ->
            <<"applied:"/utf8,
                (lightspeed@pipeline@checkpoint:checkpoint_label(Entry))/binary>>;

        {duplicate_suppressed, Stage, Key} ->
            <<<<<<"duplicate_suppressed:"/utf8, Stage/binary>>/binary,
                    ":"/utf8>>/binary,
                Key/binary>>;

        {rejected, Reason} ->
            <<"rejected:"/utf8, Reason/binary>>
    end.

-file("src/lightspeed/pipeline.gleam", 545).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    case Values of
        [] ->
            <<""/utf8>>;

        [Value] ->
            Value;

        [Value@1 | Rest] ->
            <<<<Value@1/binary, Separator/binary>>/binary,
                (join_with(Separator, Rest))/binary>>
    end.

-file("src/lightspeed/pipeline.gleam", 432).
-spec stage_signature(stage()) -> binary().
stage_signature(Stage) ->
    <<<<<<<<<<<<(erlang:element(2, Stage))/binary, ":"/utf8>>/binary,
                        (stage_kind_label(erlang:element(3, Stage)))/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:element(2, erlang:element(4, Stage)))/binary>>/binary,
            "->"/utf8>>/binary,
        (erlang:element(3, erlang:element(4, Stage)))/binary>>.

-file("src/lightspeed/pipeline.gleam", 423).
?DOC(" Stable pipeline signature.\n").
-spec pipeline_signature(pipeline()) -> binary().
pipeline_signature(Pipeline) ->
    <<<<<<<<<<"name="/utf8, (erlang:element(2, Pipeline))/binary>>/binary,
                    "|schedule="/utf8>>/binary,
                (schedule_label(erlang:element(3, Pipeline)))/binary>>/binary,
            "|stages="/utf8>>/binary,
        (join_with(
            <<","/utf8>>,
            gleam@list:map(erlang:element(4, Pipeline), fun stage_signature/1)
        ))/binary>>.

-file("src/lightspeed/pipeline.gleam", 409).
?DOC(" Stable runtime signature.\n").
-spec signature(runtime()) -> binary().
signature(Runtime) ->
    <<<<<<<<<<<<<<<<<<"pipeline="/utf8,
                                        (pipeline_signature(
                                            erlang:element(2, Runtime)
                                        ))/binary>>/binary,
                                    "|lifecycle="/utf8>>/binary,
                                (lifecycle_label(erlang:element(3, Runtime)))/binary>>/binary,
                            "|checkpoints="/utf8>>/binary,
                        (lightspeed@pipeline@checkpoint:chain_signature(
                            checkpoints(Runtime)
                        ))/binary>>/binary,
                    "|sink_keys="/utf8>>/binary,
                (join_with(<<","/utf8>>, sink_idempotency_keys(Runtime)))/binary>>/binary,
            "|telemetry="/utf8>>/binary,
        (lightspeed@pipeline@telemetry:label(erlang:element(7, Runtime)))/binary>>.