src/lightspeed@pipeline@operations_surface.erl

-module(lightspeed@pipeline@operations_surface).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/operations_surface.gleam").
-export([profile_version_label/0, new/3, default_surface/0, audits/1, conformance_records/1, lineage_records/1, valid/1, start_run/2, pipeline_runtime/1, checkpoints/1, complete_run/2, apply_control/3, enqueue_batch/7, start_available/2, acknowledge_batch/3, fail_batch/5, certify_heavy_data_conformance/1, conformance_passed/1, name/1, adapter_profile_label/1, freshness/1, freshness_label/1, sink_idempotency_keys/1, telemetry_label/1, lifecycle_label/1, tenant_runtime/1, audit_signature/1, conformance_signature/1, lineage_signature/1, signature/1, fixture_snapshots/0, snapshot_signature/0]).
-export_type([adapter_profile/0, freshness_metadata/0, lineage_record/0, conformance_record/0, audit_record/0, surface/0]).

%% Documentation-attribute shims. On OTP 27 and later MODULEDOC/DOC expand to
%% the native -moduledoc/-doc attributes. On older releases they expand to an
%% empty -compile([]) attribute, so the doc text is discarded and the module
%% still compiles.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Integrated data-pipeline and ETL runtime operations surface contracts (M60).\n").

%% Adapter profile. It selects the freshness budget, the backpressure boundary
%% and the connector plan that a surface is built with.
-type adapter_profile() :: standard_profile |
    heavy_data_profile |
    burst_recovery_profile.

%% Freshness snapshot. Positionally, after the tag: maximum allowed lag,
%% observed lag, latest checkpoint sequence, and whether the observed lag is
%% within the maximum.
-type freshness_metadata() :: {freshness_metadata,
        integer(),
        integer(),
        integer(),
        boolean()}.

%% Lineage entry derived from a checkpoint. Positionally, after the tag:
%% checkpoint elements 2 and 3 (copied as-is), connector source label,
%% connector sink label, watermark label, checkpoint element 4 (the value this
%% module treats as the checkpoint sequence), and the checkpoint label.
-type lineage_record() :: {lineage_record,
        binary(),
        binary(),
        binary(),
        binary(),
        binary(),
        integer(),
        binary()}.

%% Conformance entry: adapter profile, four binaries that validation requires
%% to be non-empty, and a boolean read as the pass flag.
%% NOTE(review): the meaning of each of the four binaries is decided by the
%% code that builds these records - confirm there before relying on it.
-type conformance_record() :: {conformance_record,
        adapter_profile(),
        binary(),
        binary(),
        binary(),
        binary(),
        boolean()}.

%% Audit entry: action label, outcome label, and an integer that is clamped to
%% be non-negative when the entry is appended.
-type audit_record() :: {audit_record, binary(), binary(), integer()}.

%% Operations surface. Positionally, after the tag: name, adapter profile,
%% orchestrator runtime, tenant policy runtime, freshness metadata, lineage
%% records, conformance records, audit records. The three record lists are
%% reversed by their accessors before being returned.
-opaque surface() :: {surface,
        binary(),
        adapter_profile(),
        lightspeed@pipeline@orchestrator:runtime(),
        lightspeed@tenant@policy:runtime(),
        freshness_metadata(),
        list(lineage_record()),
        list(conformance_record()),
        list(audit_record())}.

%% Builds the label by appending the profile version number (1) to the fixed
%% "m60.profile.v" prefix.
-file("src/lightspeed/pipeline/operations_surface.gleam", 81).
?DOC(" Stable M60 profile version label.\n").
-spec profile_version_label() -> binary().
profile_version_label() ->
    Version = erlang:integer_to_binary(1),
    <<"m60.profile.v"/utf8, Version/binary>>.

%% Tenant policy runtime used by the default surface: the "ops-bot" actor on
%% "tenant-orders" with the tenant_admin role and an expanded budget.
-file("src/lightspeed/pipeline/operations_surface.gleam", 707).
-spec default_tenant_runtime() -> lightspeed@tenant@policy:runtime().
default_tenant_runtime() ->
    Context = lightspeed@tenant@policy:tenant_context(
        <<"ops-bot"/utf8>>,
        <<"tenant-orders"/utf8>>,
        tenant_admin
    ),
    Budget = lightspeed@tenant@policy:expanded_budget(16, 6, 8, 12, 6, 10),
    lightspeed@tenant@policy:new(Context, Budget).

%% Freshness budget, in milliseconds, for each adapter profile.
-file("src/lightspeed/pipeline/operations_surface.gleam", 714).
-spec freshness_budget_ms(adapter_profile()) -> integer().
freshness_budget_ms(standard_profile) ->
    120000;
freshness_budget_ms(heavy_data_profile) ->
    180000;
freshness_budget_ms(burst_recovery_profile) ->
    240000.

%% Backpressure boundary for each adapter profile. The standard profile uses
%% the library default; the other two build an explicit boundary.
-file("src/lightspeed/pipeline/operations_surface.gleam", 696).
-spec backpressure_boundary_for_profile(adapter_profile()) -> lightspeed@async@backpressure:boundary().
backpressure_boundary_for_profile(standard_profile) ->
    lightspeed@async@backpressure:default_boundary();
backpressure_boundary_for_profile(heavy_data_profile) ->
    lightspeed@async@backpressure:boundary(push_pull, 5, 12, 400);
backpressure_boundary_for_profile(burst_recovery_profile) ->
    lightspeed@async@backpressure:boundary(pull_only, 6, 16, 350).

%% Connector plan for each adapter profile. Every plan is assembled from a
%% source, a sink, a retry policy, a dead-letter policy, an optional reprocess
%% window and a trailing integer setting:
%%   standard_profile       - queue source into a database sink, no window
%%   heavy_data_profile     - database source into a database sink, with window
%%   burst_recovery_profile - file source into a pubsub sink, with window
-file("src/lightspeed/pipeline/operations_surface.gleam", 610).
-spec connector_plan_for_profile(adapter_profile()) -> lightspeed@pipeline@connector:connector_plan().
connector_plan_for_profile(standard_profile) ->
    Source = lightspeed@pipeline@connector:queue_source(
        <<"orders_queue"/utf8>>,
        <<"orders.raw"/utf8>>,
        6
    ),
    Sink = lightspeed@pipeline@connector:database_sink(
        <<"orders_store"/utf8>>,
        <<"orders_projection"/utf8>>,
        <<"order_id"/utf8>>
    ),
    Retry = {retry_policy, 3, 250, 2000},
    Dead_letter = {dead_letter_policy, true, <<"orders.dead_letter"/utf8>>, 65536},
    lightspeed@pipeline@connector:connector_plan(
        <<"orders_standard_plan"/utf8>>,
        Source,
        Sink,
        Retry,
        Dead_letter,
        none,
        500
    );
connector_plan_for_profile(heavy_data_profile) ->
    Source = lightspeed@pipeline@connector:database_source(
        <<"orders_logical_replica"/utf8>>,
        <<"select * from orders_stream where updated_at >= :cursor"/utf8>>,
        1200
    ),
    Sink = lightspeed@pipeline@connector:database_sink(
        <<"orders_analytics_store"/utf8>>,
        <<"orders_projection_v2"/utf8>>,
        <<"order_id"/utf8>>
    ),
    Retry = {retry_policy, 4, 500, 4000},
    Dead_letter = {dead_letter_policy,
        true,
        <<"orders.analytics.dead_letter"/utf8>>,
        131072},
    Window = {some,
        {reprocess_window, 1, 10000, <<"heavy_data_replay_window"/utf8>>}},
    lightspeed@pipeline@connector:connector_plan(
        <<"orders_heavy_data_plan"/utf8>>,
        Source,
        Sink,
        Retry,
        Dead_letter,
        Window,
        2000
    );
connector_plan_for_profile(burst_recovery_profile) ->
    Source = lightspeed@pipeline@connector:file_source(
        <<"orders_compact_snapshot"/utf8>>,
        <<"/var/lib/lightspeed/orders.ndjson"/utf8>>,
        <<"ndjson"/utf8>>,
        1000
    ),
    Sink = lightspeed@pipeline@connector:pubsub_sink(
        <<"orders_recovery_bus"/utf8>>,
        <<"orders.recovery"/utf8>>,
        <<"tenant_order"/utf8>>
    ),
    Retry = {retry_policy, 5, 200, 2500},
    Dead_letter = {dead_letter_policy,
        true,
        <<"orders.recovery.dead_letter"/utf8>>,
        98304},
    Window = {some,
        {reprocess_window, 1, 15000, <<"burst_recovery_window"/utf8>>}},
    lightspeed@pipeline@connector:connector_plan(
        <<"orders_burst_recovery_plan"/utf8>>,
        Source,
        Sink,
        Retry,
        Dead_letter,
        Window,
        1600
    ).

%% Default pipeline runtime: the "orders_pipeline_m60" pipeline on a 1000
%% interval with three stages (extract, normalize, write) chained through the
%% "orders.raw.v2" and "orders.normalized.v2" labels.
-file("src/lightspeed/pipeline/operations_surface.gleam", 592).
-spec default_pipeline_runtime() -> lightspeed@pipeline:runtime().
default_pipeline_runtime() ->
    Extract = lightspeed@pipeline:source_stage(
        <<"extract_orders"/utf8>>,
        <<"orders.raw.v2"/utf8>>
    ),
    Normalize = lightspeed@pipeline:transform_stage(
        <<"normalize_orders"/utf8>>,
        <<"orders.raw.v2"/utf8>>,
        <<"orders.normalized.v2"/utf8>>
    ),
    Write = lightspeed@pipeline:sink_stage(
        <<"write_orders"/utf8>>,
        <<"orders.normalized.v2"/utf8>>
    ),
    Pipeline = lightspeed@pipeline:pipeline(
        <<"orders_pipeline_m60"/utf8>>,
        {interval, 1000},
        [Extract, Normalize, Write]
    ),
    lightspeed@pipeline:new(Pipeline).

%% Assembles a surface for the given profile. The orchestrator runtime is built
%% from the default pipeline plus the profile's connector plan and backpressure
%% boundary. Freshness starts at the profile budget with zero lag, zero
%% sequence and the fresh flag set; the three record lists start empty.
-file("src/lightspeed/pipeline/operations_surface.gleam", 95).
?DOC(" Build a profile-specific surface with default pipeline and connector plans.\n").
-spec new(binary(), adapter_profile(), lightspeed@tenant@policy:runtime()) -> surface().
new(Name, Adapter_profile, Tenant_runtime) ->
    Pipeline = default_pipeline_runtime(),
    Plan = connector_plan_for_profile(Adapter_profile),
    Boundary = backpressure_boundary_for_profile(Adapter_profile),
    Orchestrator = lightspeed@pipeline@orchestrator:new(Pipeline, Plan, Boundary),
    Freshness = {freshness_metadata,
        freshness_budget_ms(Adapter_profile),
        0,
        0,
        true},
    {surface, Name, Adapter_profile, Orchestrator, Tenant_runtime, Freshness, [], [], []}.

%% The default surface is the heavy-data profile, named
%% "orders_etl_operations_surface", with the default tenant runtime.
-file("src/lightspeed/pipeline/operations_surface.gleam", 86).
?DOC(" Build a default M60 operations surface.\n").
-spec default_surface() -> surface().
default_surface() ->
    Tenant_runtime = default_tenant_runtime(),
    new(<<"orders_etl_operations_surface"/utf8>>, heavy_data_profile, Tenant_runtime).

%% The surface keeps audit records newest-first; reversing yields emit order.
-file("src/lightspeed/pipeline/operations_surface.gleam", 453).
?DOC(" Audit records in emit order.\n").
-spec audits(surface()) -> list(audit_record()).
audits({surface, _, _, _, _, _, _, _, Newest_first}) ->
    lists:reverse(Newest_first).

%% True when every audit record has a non-empty action, a non-empty outcome
%% and a non-negative integer field. An empty list is valid.
-file("src/lightspeed/pipeline/operations_surface.gleam", 898).
-spec audits_valid(list(audit_record())) -> boolean().
audits_valid(Records) ->
    lists:all(
        fun({audit_record, Action, Outcome, At_ms}) ->
            Action /= <<""/utf8>>
            andalso Outcome /= <<""/utf8>>
            andalso At_ms >= 0
        end,
        Records
    ).

%% Returns the stored conformance list reversed, i.e. in emit order.
-file("src/lightspeed/pipeline/operations_surface.gleam", 448).
?DOC(" Conformance records in emit order.\n").
-spec conformance_records(surface()) -> list(conformance_record()).
conformance_records({surface, _, _, _, _, _, _, Newest_first, _}) ->
    lists:reverse(Newest_first).

%% True when, for every record, tuple elements 3 through 6 (the four binary
%% fields) are all non-empty. An empty list is valid.
-file("src/lightspeed/pipeline/operations_surface.gleam", 886).
-spec conformance_records_valid(list(conformance_record())) -> boolean().
conformance_records_valid(Records) ->
    lists:all(
        fun(Record) ->
            lists:all(
                fun(Position) ->
                    erlang:element(Position, Record) /= <<""/utf8>>
                end,
                [3, 4, 5, 6]
            )
        end,
        Records
    ).

%% Lineage records are prepended as they are produced; reversing restores
%% emit order.
-file("src/lightspeed/pipeline/operations_surface.gleam", 443).
?DOC(" Lineage records in emit order.\n").
-spec lineage_records(surface()) -> list(lineage_record()).
lineage_records({surface, _, _, _, _, _, Newest_first, _, _}) ->
    lists:reverse(Newest_first).

%% True when every lineage record has non-empty binary fields and a strictly
%% positive integer in position 7. An empty list is valid.
-file("src/lightspeed/pipeline/operations_surface.gleam", 871).
-spec lineage_records_valid(list(lineage_record())) -> boolean().
lineage_records_valid(Records) ->
    Non_empty = fun(Value) -> Value /= <<""/utf8>> end,
    lists:all(
        fun({lineage_record, First, Second, Source, Sink, Watermark, Sequence, Checkpoint}) ->
            lists:all(Non_empty, [First, Second, Source, Sink, Watermark])
            andalso Sequence > 0
            andalso Non_empty(Checkpoint)
        end,
        Records
    ).

%% Freshness metadata is valid when its three integer fields are all
%% non-negative; the boolean flag is not inspected.
-file("src/lightspeed/pipeline/operations_surface.gleam", 865).
-spec freshness_valid(freshness_metadata()) -> boolean().
freshness_valid({freshness_metadata, Max_allowed, Observed_lag, Sequence, _Is_fresh}) ->
    Max_allowed >= 0 andalso Observed_lag >= 0 andalso Sequence >= 0.

%% A surface is valid when its name is non-empty and the orchestrator runtime,
%% tenant policy runtime, freshness metadata and all three record lists each
%% validate. Checks short-circuit on the first failure, in that order.
-file("src/lightspeed/pipeline/operations_surface.gleam", 125).
?DOC(" Validate one operations-surface contract.\n").
-spec valid(surface()) -> boolean().
valid({surface, Name, _, Orchestrator, Policy, Freshness, _, _, _} = Surface) ->
    Name /= <<""/utf8>>
    andalso lightspeed@pipeline@orchestrator:valid(Orchestrator)
    andalso lightspeed@tenant@policy:valid(Policy)
    andalso freshness_valid(Freshness)
    andalso lineage_records_valid(lineage_records(Surface))
    andalso conformance_records_valid(conformance_records(Surface))
    andalso audits_valid(audits(Surface)).

%% Larger of two integers; the left value wins ties. This local definition is
%% what a bare max/2 call resolves to in this module, because the module is
%% compiled with no_auto_import.
-file("src/lightspeed/pipeline/operations_surface.gleam", 926).
-spec max(integer(), integer()) -> integer().
max(Left, Right) when Left >= Right ->
    Left;
max(_Left, Right) ->
    Right.

%% Prepends one audit record to the surface. The integer field is clamped to
%% be non-negative; every other surface field is carried over unchanged.
-file("src/lightspeed/pipeline/operations_surface.gleam", 853).
-spec append_audit(surface(), binary(), binary(), integer()) -> surface().
append_audit(
    {surface, Name, Profile, Orchestrator, Policy, Freshness, Lineage, Conformance, Audits},
    Action,
    Outcome,
    At_ms
) ->
    Entry = {audit_record, Action, Outcome, max(0, At_ms)},
    {surface, Name, Profile, Orchestrator, Policy, Freshness, Lineage, Conformance, [Entry | Audits]}.

%% Evaluates the tenant policy for starting one pipeline run.
%%   allowed - the orchestrator run is started, the updated policy runtime is
%%             stored, an "allowed:<label>" audit is appended, returns {ok, nil}.
%%   denied  - the orchestrator runtime is left untouched, the updated policy
%%             runtime is still stored, a "denied:<label>" audit is appended,
%%             returns {error, <<"policy_denied:<label>">>}.
%% The timestamp is clamped to be non-negative before use.
-file("src/lightspeed/pipeline/operations_surface.gleam", 136).
?DOC(" Start one pipeline run with tenant policy gating.\n").
-spec start_run(surface(), integer()) -> {surface(),
    {ok, nil} | {error, binary()}}.
start_run(Surface, At_ms) ->
    Timestamp = max(0, At_ms),
    {Policy_after, Decision} = lightspeed@tenant@policy:evaluate(
        erlang:element(5, Surface),
        {start_pipeline_run, 1}
    ),
    Label = lightspeed@tenant@policy:outcome_label(Decision),
    %% Element 5 is the tenant policy runtime; it is replaced in both branches.
    With_policy = erlang:setelement(5, Surface, Policy_after),
    case Decision of
        {allowed, _} ->
            Started = lightspeed@pipeline@orchestrator:start_run(
                erlang:element(4, Surface),
                Timestamp
            ),
            %% Element 4 is the orchestrator runtime.
            Running = erlang:setelement(4, With_policy, Started),
            Audited = append_audit(
                Running,
                <<"start_run"/utf8>>,
                <<"allowed:"/utf8, Label/binary>>,
                Timestamp
            ),
            {Audited, {ok, nil}};

        {denied, _} ->
            Audited = append_audit(
                With_policy,
                <<"start_run"/utf8>>,
                <<"denied:"/utf8, Label/binary>>,
                Timestamp
            ),
            {Audited, {error, <<"policy_denied:"/utf8, Label/binary>>}}
    end.

%% Extracts the pipeline runtime from the surface's orchestrator runtime.
-file("src/lightspeed/pipeline/operations_surface.gleam", 480).
?DOC(" Runtime pipeline accessor.\n").
-spec pipeline_runtime(surface()) -> lightspeed@pipeline:runtime().
pipeline_runtime({surface, _, _, Orchestrator, _, _, _, _, _}) ->
    lightspeed@pipeline@orchestrator:runtime_pipeline(Orchestrator).

%% Checkpoints as reported by the surface's pipeline runtime.
-file("src/lightspeed/pipeline/operations_surface.gleam", 458).
?DOC(" Pipeline checkpoints in emit order.\n").
-spec checkpoints(surface()) -> list(lightspeed@pipeline@checkpoint:checkpoint()).
checkpoints(Surface) ->
    Runtime = pipeline_runtime(Surface),
    lightspeed@pipeline:checkpoints(Runtime).

%% Highest value found in element 4 of the given checkpoints, with 0 as the
%% floor (so an empty list yields 0).
-file("src/lightspeed/pipeline/operations_surface.gleam", 781).
-spec latest_sequence(list(lightspeed@pipeline@checkpoint:checkpoint())) -> integer().
latest_sequence(Entries) ->
    lists:foldl(
        fun(Entry, Highest) -> max(erlang:element(4, Entry), Highest) end,
        0,
        Entries
    ).

%% Recomputes the surface's freshness metadata from the pipeline runtime. The
%% maximum allowed lag is kept from the existing metadata, the observed lag is
%% read from element 2 of the runtime telemetry, the sequence is the highest
%% checkpoint sequence, and the fresh flag is set when lag =< maximum.
-file("src/lightspeed/pipeline/operations_surface.gleam", 763).
-spec refresh_freshness_from_runtime(surface()) -> surface().
refresh_freshness_from_runtime(Surface) ->
    Telemetry = lightspeed@pipeline:runtime_telemetry(pipeline_runtime(Surface)),
    Latest = latest_sequence(checkpoints(Surface)),
    Budget = erlang:element(2, erlang:element(6, Surface)),
    Lag = erlang:element(2, Telemetry),
    %% Element 6 of the surface is the freshness metadata.
    erlang:setelement(
        6,
        Surface,
        {freshness_metadata, Budget, Lag, Latest, Lag =< Budget}
    ).

%% Completes the current run in the orchestrator at the clamped timestamp,
%% refreshes freshness metadata from the new runtime, and appends a
%% "complete_run" / "completed" audit record.
-file("src/lightspeed/pipeline/operations_surface.gleam", 169).
?DOC(" Complete one pipeline run.\n").
-spec complete_run(surface(), integer()) -> surface().
complete_run(Surface, At_ms) ->
    Timestamp = max(0, At_ms),
    Completed = lightspeed@pipeline@orchestrator:complete_run(
        erlang:element(4, Surface),
        Timestamp
    ),
    %% Element 4 is the orchestrator runtime.
    Refreshed = refresh_freshness_from_runtime(
        erlang:setelement(4, Surface, Completed)
    ),
    append_audit(
        Refreshed,
        <<"complete_run"/utf8>>,
        <<"completed"/utf8>>,
        Timestamp
    ).

%% Maps an operator action to the tenant policy action that gates it. Pause and
%% drain are both gated as a pause_pipelines mitigation, replay is gated as a
%% pipeline replay, and resume needs no policy check.
-file("src/lightspeed/pipeline/operations_surface.gleam", 816).
-spec control_policy_action(lightspeed@pipeline@operator:action()) -> gleam@option:option(lightspeed@tenant@policy:action()).
control_policy_action({pause, _}) ->
    {some, {apply_mitigation, pause_pipelines, 1}};
control_policy_action({drain, _}) ->
    {some, {apply_mitigation, pause_pipelines, 1}};
control_policy_action({replay, _, _}) ->
    {some, {replay_pipeline_run, 1}};
control_policy_action({resume, _}) ->
    none.

%% Runs the tenant policy check for an operator action, when one applies.
%% Actions with no policy mapping pass through with the runtime unchanged.
%% Otherwise the updated policy runtime is returned with {ok, nil} when the
%% action is allowed, or with {error, Label} carrying the outcome label when
%% it is denied.
-file("src/lightspeed/pipeline/operations_surface.gleam", 797).
-spec evaluate_control_policy(
    lightspeed@tenant@policy:runtime(),
    lightspeed@pipeline@operator:action()
) -> {lightspeed@tenant@policy:runtime(), {ok, nil} | {error, binary()}}.
evaluate_control_policy(Runtime, Action) ->
    case control_policy_action(Action) of
        none ->
            {Runtime, {ok, nil}};

        {some, Policy_action} ->
            case lightspeed@tenant@policy:evaluate(Runtime, Policy_action) of
                {Evaluated, {allowed, _}} ->
                    {Evaluated, {ok, nil}};

                {Evaluated, {denied, _} = Denial} ->
                    {Evaluated,
                        {error, lightspeed@tenant@policy:outcome_label(Denial)}}
            end
    end.

%% Applies one operator action in two gates.
%%   1. Tenant policy: on denial a "denied:<reason>" audit is appended and
%%      {error, <<"policy_denied:<reason>">>} is returned.
%%   2. Orchestrator: on success the new runtime is stored, freshness is
%%      refreshed and an "applied" audit is appended; on rejection the
%%      orchestrator's returned runtime is discarded, a "rejected:<reason>"
%%      audit is appended and {error, Reason} is returned.
%% The policy runtime produced by gate 1 is kept in every outcome.
-file("src/lightspeed/pipeline/operations_surface.gleam", 180).
?DOC(" Apply one operator control action with tenant-aware replay/mitigation checks.\n").
-spec apply_control(surface(), lightspeed@pipeline@operator:action(), integer()) -> {surface(),
    {ok, nil} | {error, binary()}}.
apply_control(Surface, Action, At_ms) ->
    Timestamp = max(0, At_ms),
    {Policy_after, Verdict} = evaluate_control_policy(
        erlang:element(5, Surface),
        Action
    ),
    %% Element 5 is the tenant policy runtime.
    Gated = erlang:setelement(5, Surface, Policy_after),
    Label = <<"control:"/utf8,
        (lightspeed@pipeline@operator:action_label(Action))/binary>>,
    case Verdict of
        {error, Denial} ->
            Audited = append_audit(
                Gated,
                Label,
                <<"denied:"/utf8, Denial/binary>>,
                Timestamp
            ),
            {Audited, {error, <<"policy_denied:"/utf8, Denial/binary>>}};

        {ok, nil} ->
            case lightspeed@pipeline@orchestrator:apply_action(
                erlang:element(4, Gated),
                Action,
                Timestamp
            ) of
                {Applied_runtime, {ok, _}} ->
                    %% Element 4 is the orchestrator runtime.
                    Refreshed = refresh_freshness_from_runtime(
                        erlang:setelement(4, Gated, Applied_runtime)
                    ),
                    Applied = append_audit(
                        Refreshed,
                        Label,
                        <<"applied"/utf8>>,
                        Timestamp
                    ),
                    {Applied, {ok, nil}};

                {_, {error, Rejection}} ->
                    Rejected = append_audit(
                        Gated,
                        Label,
                        <<"rejected:"/utf8, Rejection/binary>>,
                        Timestamp
                    ),
                    {Rejected, {error, Rejection}}
            end
    end.

%% Hands one batch to the orchestrator and records the result. The returned
%% orchestrator runtime is stored whether the batch was queued or rejected.
%% The audit action is "enqueue:<Key>" and the outcome is
%% "queued:<Stage>:<Records>" or "rejected:<Reason>".
%% The audit record's integer field is the clamped Sequence, because this
%% function takes no timestamp argument.
-file("src/lightspeed/pipeline/operations_surface.gleam", 224).
?DOC(" Enqueue one batch into the ETL runtime operations surface.\n").
-spec enqueue_batch(
    surface(),
    binary(),
    binary(),
    integer(),
    integer(),
    binary(),
    integer()
) -> {surface(), {ok, nil} | {error, binary()}}.
enqueue_batch(Surface, Key, Stage, Records, Lag_ms, Idempotency_key, Sequence) ->
    {Runtime_after, Verdict} = lightspeed@pipeline@orchestrator:enqueue_batch(
        erlang:element(4, Surface),
        Key,
        Stage,
        Records,
        Lag_ms,
        Idempotency_key,
        Sequence
    ),
    %% Element 4 is the orchestrator runtime.
    Updated = erlang:setelement(4, Surface, Runtime_after),
    Audit_action = <<"enqueue:"/utf8, Key/binary>>,
    Audit_at = max(0, Sequence),
    case Verdict of
        {ok, _} ->
            Queued = <<"queued:"/utf8,
                Stage/binary,
                ":"/utf8,
                (erlang:integer_to_binary(Records))/binary>>,
            {append_audit(Updated, Audit_action, Queued, Audit_at), {ok, nil}};

        {error, Reason} ->
            Rejected = <<"rejected:"/utf8, Reason/binary>>,
            {append_audit(Updated, Audit_action, Rejected, Audit_at),
                {error, Reason}}
    end.

%% Joins binaries with a separator between neighbours. An empty list yields
%% the empty binary. The values are interleaved with the separator and
%% flattened into a single binary in one step.
-file("src/lightspeed/pipeline/operations_surface.gleam", 916).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).

%% Asks the orchestrator to start queued batches at the clamped timestamp,
%% stores the returned runtime, and appends a "start_available" audit whose
%% outcome is "started=" followed by the comma-joined started keys. Returns
%% the updated surface together with the list of started keys.
-file("src/lightspeed/pipeline/operations_surface.gleam", 268).
?DOC(" Start as many queued batches as allowed by backpressure boundaries.\n").
-spec start_available(surface(), integer()) -> {surface(), list(binary())}.
start_available(Surface, At_ms) ->
    Timestamp = max(0, At_ms),
    {Runtime_after, Started} = lightspeed@pipeline@orchestrator:start_available(
        erlang:element(4, Surface),
        Timestamp
    ),
    %% Element 4 is the orchestrator runtime.
    Updated = erlang:setelement(4, Surface, Runtime_after),
    Outcome = <<"started="/utf8, (join_with(<<","/utf8>>, Started))/binary>>,
    Audited = append_audit(
        Updated,
        <<"start_available"/utf8>>,
        Outcome,
        Timestamp
    ),
    {Audited, Started}.

%% Last element of the checkpoint list wrapped in {some, _}, or none when the
%% list is empty.
-file("src/lightspeed/pipeline/operations_surface.gleam", 788).
-spec latest_checkpoint(list(lightspeed@pipeline@checkpoint:checkpoint())) -> gleam@option:option(lightspeed@pipeline@checkpoint:checkpoint()).
latest_checkpoint([]) ->
    none;
latest_checkpoint([_ | _] = Entries) ->
    {some, lists:last(Entries)}.

%% Adds a lineage record for a processed outcome whose result label starts
%% with "applied:". The record is derived from the last checkpoint and the
%% orchestrator's connector plan. The surface is returned unchanged for any
%% other outcome, for a non-"applied:" result, or when there is no checkpoint.
-file("src/lightspeed/pipeline/operations_surface.gleam", 722).
-spec apply_outcome_metadata(
    surface(),
    lightspeed@pipeline@orchestrator:process_outcome()
) -> surface().
apply_outcome_metadata(Surface, {processed, Result}) ->
    Result_label = lightspeed@pipeline:process_result_label(Result),
    case gleam_stdlib:string_starts_with(Result_label, <<"applied:"/utf8>>) of
        false ->
            Surface;

        true ->
            case latest_checkpoint(checkpoints(Surface)) of
                none ->
                    Surface;

                {some, Entry} ->
                    Plan = lightspeed@pipeline@orchestrator:runtime_connector_plan(
                        erlang:element(4, Surface)
                    ),
                    Source_label = lightspeed@pipeline@connector:source_label(
                        lightspeed@pipeline@connector:source(Plan)
                    ),
                    Sink_label = lightspeed@pipeline@connector:sink_label(
                        lightspeed@pipeline@connector:sink(Plan)
                    ),
                    Watermark_label = lightspeed@pipeline@checkpoint:watermark_label(
                        erlang:element(5, Entry)
                    ),
                    Record = {lineage_record,
                        erlang:element(2, Entry),
                        erlang:element(3, Entry),
                        Source_label,
                        Sink_label,
                        Watermark_label,
                        erlang:element(4, Entry),
                        lightspeed@pipeline@checkpoint:checkpoint_label(Entry)},
                    %% Element 7 is the lineage list, kept newest-first.
                    erlang:setelement(
                        7,
                        Surface,
                        [Record | erlang:element(7, Surface)]
                    )
            end
    end;
apply_outcome_metadata(Surface, _) ->
    Surface.

%% Acknowledges a batch in the orchestrator at the clamped timestamp and stores
%% the returned runtime.
%%   error - appends an "ack:<Key>" / "rejected:<Reason>" audit and returns
%%           {error, Reason}.
%%   ok    - records lineage for the outcome, refreshes freshness, appends an
%%           "ack:<Key>" audit carrying the outcome label, and returns
%%           {ok, Outcome}.
-file("src/lightspeed/pipeline/operations_surface.gleam", 287).
?DOC(" Acknowledge one processed batch and update lineage/freshness metadata.\n").
-spec acknowledge_batch(surface(), binary(), integer()) -> {surface(),
    {ok, lightspeed@pipeline@orchestrator:process_outcome()} | {error, binary()}}.
acknowledge_batch(Surface, Key, At_ms) ->
    Timestamp = max(0, At_ms),
    {Runtime_after, Verdict} = lightspeed@pipeline@orchestrator:ack_batch(
        erlang:element(4, Surface),
        Key,
        Timestamp
    ),
    %% Element 4 is the orchestrator runtime.
    Updated = erlang:setelement(4, Surface, Runtime_after),
    Audit_action = <<"ack:"/utf8, Key/binary>>,
    case Verdict of
        {error, Reason} ->
            Rejected = append_audit(
                Updated,
                Audit_action,
                <<"rejected:"/utf8, Reason/binary>>,
                Timestamp
            ),
            {Rejected, {error, Reason}};

        {ok, Outcome} ->
            With_lineage = apply_outcome_metadata(Updated, Outcome),
            Refreshed = refresh_freshness_from_runtime(With_lineage),
            Outcome_label = lightspeed@pipeline@orchestrator:process_outcome_label(
                Outcome
            ),
            Acknowledged = append_audit(
                Refreshed,
                Audit_action,
                Outcome_label,
                Timestamp
            ),
            {Acknowledged, {ok, Outcome}}
    end.

%% Reports a batch failure to the orchestrator at the clamped timestamp and
%% stores the returned runtime.
%%   error - appends a "fail:<Key>" / "rejected:<Reason>" audit and returns
%%           {error, Reason}.
%%   ok    - refreshes freshness, appends a "fail:<Key>" audit carrying the
%%           outcome label, and returns {ok, Outcome}.
%% No lineage is recorded on this path.
-file("src/lightspeed/pipeline/operations_surface.gleam", 320).
?DOC(" Fail one batch and apply retry/dead-letter policies.\n").
-spec fail_batch(surface(), binary(), integer(), binary(), integer()) -> {surface(),
    {ok, lightspeed@pipeline@orchestrator:process_outcome()} | {error, binary()}}.
fail_batch(Surface, Key, Attempt, Reason, At_ms) ->
    Timestamp = max(0, At_ms),
    {Runtime_after, Verdict} = lightspeed@pipeline@orchestrator:fail_batch(
        erlang:element(4, Surface),
        Key,
        Attempt,
        Reason,
        Timestamp
    ),
    %% Element 4 is the orchestrator runtime.
    Updated = erlang:setelement(4, Surface, Runtime_after),
    Audit_action = <<"fail:"/utf8, Key/binary>>,
    case Verdict of
        {error, Error_reason} ->
            Rejected = append_audit(
                Updated,
                Audit_action,
                <<"rejected:"/utf8, Error_reason/binary>>,
                Timestamp
            ),
            {Rejected, {error, Error_reason}};

        {ok, Outcome} ->
            Refreshed = refresh_freshness_from_runtime(Updated),
            Outcome_label = lightspeed@pipeline@orchestrator:process_outcome_label(
                Outcome
            ),
            Recorded = append_audit(
                Refreshed,
                Audit_action,
                Outcome_label,
                Timestamp
            ),
            {Recorded, {ok, Outcome}}
    end.

%% True when the boolean flag in element 7 is set on every conformance record.
%% An empty list counts as passed.
-file("src/lightspeed/pipeline/operations_surface.gleam", 846).
-spec all_conformance_passed(list(conformance_record())) -> boolean().
all_conformance_passed(Records) ->
    lists:all(fun(Record) -> erlang:element(7, Record) end, Records).

%% Renders a boolean as the binary <<"true">> or <<"false">>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 909).
-spec bool_label(boolean()) -> binary().
bool_label(true) ->
    <<"true"/utf8>>;
bool_label(false) ->
    <<"false"/utf8>>.

%% Returns the first budget result whose second element equals Workload.
%% When no entry matches (or the list is empty) a failing placeholder tagged
%% <<"missing_budget_result">> is returned for that workload instead.
-file("src/lightspeed/pipeline/operations_surface.gleam", 827).
-spec budget_result_for_workload(
    list(lightspeed@data_plane_huge:budget_result()),
    binary()
) -> lightspeed@data_plane_huge:budget_result().
budget_result_for_workload([], Workload) ->
    {budget_result, Workload, false, <<"missing_budget_result"/utf8>>};
budget_result_for_workload([Candidate | Remaining], Workload) ->
    %% Matching against the already-bound Workload is an exact-equality test.
    case erlang:element(2, Candidate) of
        Workload ->
            Candidate;

        _ ->
            budget_result_for_workload(Remaining, Workload)
    end.

%% certify_heavy_data_conformance/1 builds one conformance record per huge
%% benchmark and stores the batch on the surface.
%%
%% Steps, as performed below:
%%   1. Take the connector plan from the orchestrator runtime (slot 4) and
%%      derive the source and sink labels from it.
%%   2. Run the huge benchmarks and evaluate them against the default budgets.
%%   3. Pair each benchmark with the budget result for its workload (element 2
%%      of the benchmark) and emit a conformance record carrying the surface's
%%      adapter profile, both labels, both signatures and the budget pass flag.
%%   4. Replace slot 8 with the records in reverse order and prepend an audit
%%      entry "certify_heavy_data_conformance" / "passed=<bool>" to slot 9.
%% The integer field of that audit entry is the number of records produced.
-file("src/lightspeed/pipeline/operations_surface.gleam", 364).
?DOC(" Certify source/sink conformance across heavy-data adapter profiles.\n").
-spec certify_heavy_data_conformance(surface()) -> surface().
certify_heavy_data_conformance(Surface) ->
    Connector_plan = lightspeed@pipeline@orchestrator:runtime_connector_plan(
        erlang:element(4, Surface)
    ),
    Source_label = lightspeed@pipeline@connector:source_label(
        lightspeed@pipeline@connector:source(Connector_plan)
    ),
    Sink_label = lightspeed@pipeline@connector:sink_label(
        lightspeed@pipeline@connector:sink(Connector_plan)
    ),
    Benchmarks = lightspeed@data_plane_huge:run_huge_benchmarks(),
    Default_budgets = lightspeed@data_plane_huge:default_huge_budgets(),
    Verdicts = lightspeed@data_plane_huge:evaluate_huge_budgets(
        Benchmarks,
        Default_budgets
    ),
    Profile = erlang:element(3, Surface),
    Records = [
        begin
            Verdict = budget_result_for_workload(
                Verdicts,
                erlang:element(2, Benchmark)
            ),
            {conformance_record,
                Profile,
                Source_label,
                Sink_label,
                lightspeed@data_plane_huge:benchmark_signature(Benchmark),
                lightspeed@data_plane_huge:budget_result_signature(Verdict),
                erlang:element(3, Verdict)}
        end
     || Benchmark <- Benchmarks
    ],
    Passed_text = bool_label(all_conformance_passed(Records)),
    Audit_entry = {audit_record,
        <<"certify_heavy_data_conformance"/utf8>>,
        <<"passed="/utf8, Passed_text/binary>>,
        erlang:length(Records)},
    %% Slot 8 holds conformance records, slot 9 the audit list (newest first).
    With_records = erlang:setelement(8, Surface, lists:reverse(Records)),
    erlang:setelement(
        9,
        With_records,
        [Audit_entry | erlang:element(9, Surface)]
    ).

%% A surface with no conformance records never counts as passed; otherwise the
%% answer is whether every stored record carries a true pass flag.
-file("src/lightspeed/pipeline/operations_surface.gleam", 403).
?DOC(" `True` when heavy-data conformance evidence is present and all records pass.\n").
-spec conformance_passed(surface()) -> boolean().
conformance_passed(Surface) ->
    Evidence = conformance_records(Surface),
    Evidence =/= [] andalso all_conformance_passed(Evidence).

%% Returns the second element of the surface tuple, i.e. its name.
-file("src/lightspeed/pipeline/operations_surface.gleam", 412).
?DOC(" Surface name accessor.\n").
-spec name(surface()) -> binary().
name({surface, Surface_name, _, _, _, _, _, _, _}) ->
    Surface_name.

%% Maps each adapter profile atom to its label: the atom name without the
%% trailing "_profile".
-file("src/lightspeed/pipeline/operations_surface.gleam", 417).
?DOC(" Adapter-profile label.\n").
-spec adapter_profile_label(adapter_profile()) -> binary().
adapter_profile_label(standard_profile) ->
    <<"standard"/utf8>>;
adapter_profile_label(heavy_data_profile) ->
    <<"heavy_data"/utf8>>;
adapter_profile_label(burst_recovery_profile) ->
    <<"burst_recovery"/utf8>>.

%% Returns the sixth element of the surface tuple, the freshness metadata.
-file("src/lightspeed/pipeline/operations_surface.gleam", 426).
?DOC(" Freshness metadata accessor.\n").
-spec freshness(surface()) -> freshness_metadata().
freshness({surface, _, _, _, _, Metadata, _, _, _}) ->
    Metadata.

%% Renders freshness metadata as
%%   max_allowed_lag_ms=<int>|observed_lag_ms=<int>|last_sequence=<int>|is_fresh=<bool>
%% with the fields taken in tuple order.
-file("src/lightspeed/pipeline/operations_surface.gleam", 431).
?DOC(" Freshness metadata label.\n").
-spec freshness_label(freshness_metadata()) -> binary().
freshness_label(
    {freshness_metadata, Max_lag_ms, Observed_lag_ms, Last_sequence, Is_fresh}
) ->
    <<"max_allowed_lag_ms="/utf8,
        (erlang:integer_to_binary(Max_lag_ms))/binary,
        "|observed_lag_ms="/utf8,
        (erlang:integer_to_binary(Observed_lag_ms))/binary,
        "|last_sequence="/utf8,
        (erlang:integer_to_binary(Last_sequence))/binary,
        "|is_fresh="/utf8,
        (bool_label(Is_fresh))/binary>>.

%% Obtains the pipeline runtime via pipeline_runtime/1 and asks
%% lightspeed@pipeline for its sink idempotency keys.
-file("src/lightspeed/pipeline/operations_surface.gleam", 463).
?DOC(" Sink idempotency keys in emit order.\n").
-spec sink_idempotency_keys(surface()) -> list(binary()).
sink_idempotency_keys(Surface) ->
    Runtime = pipeline_runtime(Surface),
    lightspeed@pipeline:sink_idempotency_keys(Runtime).

%% Obtains the pipeline runtime via pipeline_runtime/1, reads its telemetry and
%% returns the label produced by lightspeed@pipeline@telemetry:label/1.
-file("src/lightspeed/pipeline/operations_surface.gleam", 468).
?DOC(" Pipeline runtime telemetry label.\n").
-spec telemetry_label(surface()) -> binary().
telemetry_label(Surface) ->
    Runtime = pipeline_runtime(Surface),
    Telemetry = lightspeed@pipeline:runtime_telemetry(Runtime),
    lightspeed@pipeline@telemetry:label(Telemetry).

%% Obtains the pipeline runtime via pipeline_runtime/1, reads its lifecycle and
%% returns the label produced by lightspeed@pipeline:lifecycle_label/1.
-file("src/lightspeed/pipeline/operations_surface.gleam", 475).
?DOC(" Pipeline lifecycle label.\n").
-spec lifecycle_label(surface()) -> binary().
lifecycle_label(Surface) ->
    Runtime = pipeline_runtime(Surface),
    Lifecycle = lightspeed@pipeline:lifecycle(Runtime),
    lightspeed@pipeline:lifecycle_label(Lifecycle).

%% Returns the fifth element of the surface tuple, the tenant policy runtime.
-file("src/lightspeed/pipeline/operations_surface.gleam", 485).
?DOC(" Runtime policy accessor.\n").
-spec tenant_runtime(surface()) -> lightspeed@tenant@policy:runtime().
tenant_runtime({surface, _, _, _, Policy_runtime, _, _, _, _}) ->
    Policy_runtime.

%% Renders an audit record as action=<binary>|outcome=<binary>|at_ms=<int>,
%% using the record's three fields in tuple order.
-file("src/lightspeed/pipeline/operations_surface.gleam", 545).
?DOC(" Stable audit signature.\n").
-spec audit_signature(audit_record()) -> binary().
audit_signature({audit_record, Action, Outcome, At_ms}) ->
    <<"action="/utf8,
        Action/binary,
        "|outcome="/utf8,
        Outcome/binary,
        "|at_ms="/utf8,
        (erlang:integer_to_binary(At_ms))/binary>>.

%% Renders a conformance record as
%%   profile=<label>|source=<bin>|sink=<bin>|benchmark=<bin>|budget=<bin>|passed=<bool>
%% The profile goes through adapter_profile_label/1 and the pass flag through
%% bool_label/1; the remaining fields are emitted verbatim.
-file("src/lightspeed/pipeline/operations_surface.gleam", 529).
?DOC(" Stable conformance signature.\n").
-spec conformance_signature(conformance_record()) -> binary().
conformance_signature(
    {conformance_record, Profile, Source, Sink, Benchmark, Budget, Passed}
) ->
    <<"profile="/utf8,
        (adapter_profile_label(Profile))/binary,
        "|source="/utf8,
        Source/binary,
        "|sink="/utf8,
        Sink/binary,
        "|benchmark="/utf8,
        Benchmark/binary,
        "|budget="/utf8,
        Budget/binary,
        "|passed="/utf8,
        (bool_label(Passed))/binary>>.

%% Renders a lineage record as
%%   run=<bin>|stage=<bin>|source=<bin>|sink=<bin>|watermark=<bin>|sequence=<int>|checkpoint=<bin>
%% with the fields taken in tuple order; only the sequence is an integer.
-file("src/lightspeed/pipeline/operations_surface.gleam", 511).
?DOC(" Stable lineage signature.\n").
-spec lineage_signature(lineage_record()) -> binary().
lineage_signature(
    {lineage_record, Run, Stage, Source, Sink, Watermark, Sequence, Checkpoint}
) ->
    <<"run="/utf8,
        Run/binary,
        "|stage="/utf8,
        Stage/binary,
        "|source="/utf8,
        Source/binary,
        "|sink="/utf8,
        Sink/binary,
        "|watermark="/utf8,
        Watermark/binary,
        "|sequence="/utf8,
        (erlang:integer_to_binary(Sequence))/binary,
        "|checkpoint="/utf8,
        Checkpoint/binary>>.

%% Builds the surface signature from seven "|"-separated key=value parts, in
%% this order: profile, orchestrator, policy, freshness, lineage, conformance,
%% audits. The three record lists are each rendered element-by-element with
%% their own signature function and combined through join_with/2 with ";".
%% The surface name (slot 2) is not part of the signature.
-file("src/lightspeed/pipeline/operations_surface.gleam", 490).
?DOC(" Surface signature.\n").
-spec signature(surface()) -> binary().
signature(Surface) ->
    Separator = <<";"/utf8>>,
    Profile_part = adapter_profile_label(erlang:element(3, Surface)),
    Orchestrator_part = lightspeed@pipeline@orchestrator:signature(
        erlang:element(4, Surface)
    ),
    Policy_part = lightspeed@tenant@policy:signature(
        erlang:element(5, Surface)
    ),
    Freshness_part = freshness_label(erlang:element(6, Surface)),
    Lineage_part = join_with(
        Separator,
        [lineage_signature(Entry) || Entry <- lineage_records(Surface)]
    ),
    Conformance_part = join_with(
        Separator,
        [conformance_signature(Entry) || Entry <- conformance_records(Surface)]
    ),
    Audits_part = join_with(
        Separator,
        [audit_signature(Entry) || Entry <- audits(Surface)]
    ),
    <<"profile="/utf8,
        Profile_part/binary,
        "|orchestrator="/utf8,
        Orchestrator_part/binary,
        "|policy="/utf8,
        Policy_part/binary,
        "|freshness="/utf8,
        Freshness_part/binary,
        "|lineage="/utf8,
        Lineage_part/binary,
        "|conformance="/utf8,
        Conformance_part/binary,
        "|audits="/utf8,
        Audits_part/binary>>.

%% Produces three {Label, Signature} pairs, in this order:
%%   default_surface        - the default surface after heavy-data certification;
%%   strict_policy_surface  - a standard-profile surface whose tenant policy is
%%                            built from the "ops-strict"/"tenant-orders"/editor
%%                            context and expanded_budget(2, 1, 1, 1, 0, 0);
%%   burst_recovery_surface - a burst-recovery-profile surface using the default
%%                            tenant runtime.
%% The last two surfaces are signed as constructed, without certification.
-file("src/lightspeed/pipeline/operations_surface.gleam", 555).
?DOC(" Deterministic fixture snapshots for M60 drift gates.\n").
-spec fixture_snapshots() -> list({binary(), binary()}).
fixture_snapshots() ->
    Certified_default = certify_heavy_data_conformance(default_surface()),
    Default_signature = signature(Certified_default),
    Strict_context = lightspeed@tenant@policy:tenant_context(
        <<"ops-strict"/utf8>>,
        <<"tenant-orders"/utf8>>,
        editor
    ),
    Strict_budget = lightspeed@tenant@policy:expanded_budget(2, 1, 1, 1, 0, 0),
    Strict_policy = lightspeed@tenant@policy:new(Strict_context, Strict_budget),
    Strict_signature = signature(
        new(<<"strict_policy_surface"/utf8>>, standard_profile, Strict_policy)
    ),
    Burst_signature = signature(
        new(
            <<"burst_recovery_surface"/utf8>>,
            burst_recovery_profile,
            default_tenant_runtime()
        )
    ),
    [{<<"default_surface"/utf8>>, Default_signature},
        {<<"strict_policy_surface"/utf8>>, Strict_signature},
        {<<"burst_recovery_surface"/utf8>>, Burst_signature}].

%% Renders every fixture snapshot as Label=Signature, joins the entries through
%% join_with/2 with ";" and prefixes the result with the profile version label
%% and a "|".
-file("src/lightspeed/pipeline/operations_surface.gleam", 582).
?DOC(" Deterministic M60 snapshot signature.\n").
-spec snapshot_signature() -> binary().
snapshot_signature() ->
    Rendered = [
        <<Label/binary, "="/utf8, Value/binary>>
     || {Label, Value} <- fixture_snapshots()
    ],
    Version = profile_version_label(),
    Joined = join_with(<<";"/utf8>>, Rendered),
    <<Version/binary, "|"/utf8, Joined/binary>>.