-module(lightspeed@pipeline@operations_surface).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/operations_surface.gleam").
-export([profile_version_label/0, new/3, default_surface/0, audits/1, conformance_records/1, lineage_records/1, valid/1, start_run/2, pipeline_runtime/1, checkpoints/1, complete_run/2, apply_control/3, enqueue_batch/7, start_available/2, acknowledge_batch/3, fail_batch/5, certify_heavy_data_conformance/1, conformance_passed/1, name/1, adapter_profile_label/1, freshness/1, freshness_label/1, sink_idempotency_keys/1, telemetry_label/1, lifecycle_label/1, tenant_runtime/1, audit_signature/1, conformance_signature/1, lineage_signature/1, signature/1, fixture_snapshots/0, snapshot_signature/0]).
-export_type([adapter_profile/0, freshness_metadata/0, lineage_record/0, conformance_record/0, audit_record/0, surface/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Integrated data-pipeline and ETL runtime operations surface contracts (M60).\n").
%% Adapter profile of a surface. The profile selects the freshness budget,
%% the backpressure boundary and the connector plan used when a surface is
%% built (see freshness_budget_ms/1, backpressure_boundary_for_profile/1 and
%% connector_plan_for_profile/1).
-type adapter_profile() :: standard_profile |
heavy_data_profile |
burst_recovery_profile.
%% Freshness metadata. Fields after the tag, in order (names taken from
%% freshness_label/1):
%%   1. max_allowed_lag_ms
%%   2. observed_lag_ms
%%   3. last_sequence
%%   4. is_fresh
-type freshness_metadata() :: {freshness_metadata,
integer(),
integer(),
integer(),
boolean()}.
%% Lineage record. Fields after the tag, in order (names taken from
%% lineage_signature/1): run, stage, source, sink, watermark, sequence,
%% checkpoint.
-type lineage_record() :: {lineage_record,
binary(),
binary(),
binary(),
binary(),
binary(),
integer(),
binary()}.
%% Conformance record. Fields after the tag, in order (names taken from
%% conformance_signature/1): profile, source, sink, benchmark, budget, passed.
-type conformance_record() :: {conformance_record,
adapter_profile(),
binary(),
binary(),
binary(),
binary(),
boolean()}.
%% Audit record. Fields after the tag, in order (names taken from
%% audit_signature/1): action, outcome, at_ms.
-type audit_record() :: {audit_record, binary(), binary(), integer()}.
%% Operations surface. Fields after the tag, in order:
%%   1. surface name
%%   2. adapter profile
%%   3. pipeline orchestrator runtime
%%   4. tenant policy runtime
%%   5. freshness metadata
%%   6. lineage records
%%   7. conformance records
%%   8. audit records
%% The three record lists are kept newest-first: new entries are prepended
%% and the public accessors reverse them before returning.
-opaque surface() :: {surface,
binary(),
adapter_profile(),
lightspeed@pipeline@orchestrator:runtime(),
lightspeed@tenant@policy:runtime(),
freshness_metadata(),
list(lineage_record()),
list(conformance_record()),
list(audit_record())}.
-file("src/lightspeed/pipeline/operations_surface.gleam", 81).
%% Version label of the M60 profile contract: the fixed prefix
%% "m60.profile.v" followed by the version number (currently 1).
?DOC(" Stable M60 profile version label.\n").
-spec profile_version_label() -> binary().
profile_version_label() ->
    Version = erlang:integer_to_binary(1),
    <<"m60.profile.v"/utf8, Version/binary>>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 707).
%% Tenant policy runtime used by the default and burst-recovery fixtures:
%% actor "ops-bot" in tenant "tenant-orders" with the tenant_admin role and
%% an expanded budget of (16, 6, 8, 12, 6, 10).
-spec default_tenant_runtime() -> lightspeed@tenant@policy:runtime().
default_tenant_runtime() ->
    Context = lightspeed@tenant@policy:tenant_context(
        <<"ops-bot"/utf8>>,
        <<"tenant-orders"/utf8>>,
        tenant_admin
    ),
    Budget = lightspeed@tenant@policy:expanded_budget(16, 6, 8, 12, 6, 10),
    lightspeed@tenant@policy:new(Context, Budget).
-file("src/lightspeed/pipeline/operations_surface.gleam", 714).
%% Maximum allowed lag, in milliseconds, for each adapter profile.
-spec freshness_budget_ms(adapter_profile()) -> integer().
freshness_budget_ms(standard_profile) ->
    120000;
freshness_budget_ms(heavy_data_profile) ->
    180000;
freshness_budget_ms(burst_recovery_profile) ->
    240000.
-file("src/lightspeed/pipeline/operations_surface.gleam", 696).
%% Backpressure boundary for each adapter profile. The standard profile uses
%% the library default; the other two build an explicit boundary.
-spec backpressure_boundary_for_profile(adapter_profile()) -> lightspeed@async@backpressure:boundary().
backpressure_boundary_for_profile(standard_profile) ->
    lightspeed@async@backpressure:default_boundary();
backpressure_boundary_for_profile(heavy_data_profile) ->
    lightspeed@async@backpressure:boundary(push_pull, 5, 12, 400);
backpressure_boundary_for_profile(burst_recovery_profile) ->
    lightspeed@async@backpressure:boundary(pull_only, 6, 16, 350).
-file("src/lightspeed/pipeline/operations_surface.gleam", 610).
%% Connector plan for each adapter profile:
%%   * standard_profile       - queue source into a database sink, no
%%                              reprocess window.
%%   * heavy_data_profile     - database source into a database sink, with a
%%                              reprocess window.
%%   * burst_recovery_profile - file source into a pub/sub sink, with a
%%                              reprocess window.
%% Every plan carries a retry policy and an enabled dead-letter policy.
-spec connector_plan_for_profile(adapter_profile()) -> lightspeed@pipeline@connector:connector_plan().
connector_plan_for_profile(standard_profile) ->
    Source = lightspeed@pipeline@connector:queue_source(
        <<"orders_queue"/utf8>>,
        <<"orders.raw"/utf8>>,
        6
    ),
    Sink = lightspeed@pipeline@connector:database_sink(
        <<"orders_store"/utf8>>,
        <<"orders_projection"/utf8>>,
        <<"order_id"/utf8>>
    ),
    Retry = {retry_policy, 3, 250, 2000},
    Dead_letter = {dead_letter_policy, true, <<"orders.dead_letter"/utf8>>, 65536},
    lightspeed@pipeline@connector:connector_plan(
        <<"orders_standard_plan"/utf8>>,
        Source,
        Sink,
        Retry,
        Dead_letter,
        none,
        500
    );
connector_plan_for_profile(heavy_data_profile) ->
    Source = lightspeed@pipeline@connector:database_source(
        <<"orders_logical_replica"/utf8>>,
        <<"select * from orders_stream where updated_at >= :cursor"/utf8>>,
        1200
    ),
    Sink = lightspeed@pipeline@connector:database_sink(
        <<"orders_analytics_store"/utf8>>,
        <<"orders_projection_v2"/utf8>>,
        <<"order_id"/utf8>>
    ),
    Retry = {retry_policy, 4, 500, 4000},
    Dead_letter = {dead_letter_policy,
        true,
        <<"orders.analytics.dead_letter"/utf8>>,
        131072},
    Reprocess = {some,
        {reprocess_window, 1, 10000, <<"heavy_data_replay_window"/utf8>>}},
    lightspeed@pipeline@connector:connector_plan(
        <<"orders_heavy_data_plan"/utf8>>,
        Source,
        Sink,
        Retry,
        Dead_letter,
        Reprocess,
        2000
    );
connector_plan_for_profile(burst_recovery_profile) ->
    Source = lightspeed@pipeline@connector:file_source(
        <<"orders_compact_snapshot"/utf8>>,
        <<"/var/lib/lightspeed/orders.ndjson"/utf8>>,
        <<"ndjson"/utf8>>,
        1000
    ),
    Sink = lightspeed@pipeline@connector:pubsub_sink(
        <<"orders_recovery_bus"/utf8>>,
        <<"orders.recovery"/utf8>>,
        <<"tenant_order"/utf8>>
    ),
    Retry = {retry_policy, 5, 200, 2500},
    Dead_letter = {dead_letter_policy,
        true,
        <<"orders.recovery.dead_letter"/utf8>>,
        98304},
    Reprocess = {some,
        {reprocess_window, 1, 15000, <<"burst_recovery_window"/utf8>>}},
    lightspeed@pipeline@connector:connector_plan(
        <<"orders_burst_recovery_plan"/utf8>>,
        Source,
        Sink,
        Retry,
        Dead_letter,
        Reprocess,
        1600
    ).
-file("src/lightspeed/pipeline/operations_surface.gleam", 592).
%% Pipeline runtime shared by every surface built by new/3: the
%% "orders_pipeline_m60" pipeline on a 1000 interval trigger with three
%% stages (extract -> normalize -> write).
-spec default_pipeline_runtime() -> lightspeed@pipeline:runtime().
default_pipeline_runtime() ->
    Extract = lightspeed@pipeline:source_stage(
        <<"extract_orders"/utf8>>,
        <<"orders.raw.v2"/utf8>>
    ),
    Normalize = lightspeed@pipeline:transform_stage(
        <<"normalize_orders"/utf8>>,
        <<"orders.raw.v2"/utf8>>,
        <<"orders.normalized.v2"/utf8>>
    ),
    Write = lightspeed@pipeline:sink_stage(
        <<"write_orders"/utf8>>,
        <<"orders.normalized.v2"/utf8>>
    ),
    Definition = lightspeed@pipeline:pipeline(
        <<"orders_pipeline_m60"/utf8>>,
        {interval, 1000},
        [Extract, Normalize, Write]
    ),
    lightspeed@pipeline:new(Definition).
-file("src/lightspeed/pipeline/operations_surface.gleam", 95).
%% Builds a surface for the given name, adapter profile and tenant runtime.
%% The orchestrator is assembled from the default pipeline plus the
%% profile's connector plan and backpressure boundary. Freshness starts at
%% the profile's lag budget with zero observed lag, sequence 0 and
%% is_fresh = true; lineage, conformance and audit lists start empty.
?DOC(" Build a profile-specific surface with default pipeline and connector plans.\n").
-spec new(binary(), adapter_profile(), lightspeed@tenant@policy:runtime()) -> surface().
new(Name, Adapter_profile, Tenant_runtime) ->
    Pipeline = default_pipeline_runtime(),
    Plan = connector_plan_for_profile(Adapter_profile),
    Boundary = backpressure_boundary_for_profile(Adapter_profile),
    Orchestrator = lightspeed@pipeline@orchestrator:new(Pipeline, Plan, Boundary),
    Initial_freshness = {freshness_metadata,
        freshness_budget_ms(Adapter_profile),
        0,
        0,
        true},
    {surface,
        Name,
        Adapter_profile,
        Orchestrator,
        Tenant_runtime,
        Initial_freshness,
        [],
        [],
        []}.
-file("src/lightspeed/pipeline/operations_surface.gleam", 86).
%% The default surface: named "orders_etl_operations_surface", using the
%% heavy-data profile and the default tenant runtime.
?DOC(" Build a default M60 operations surface.\n").
-spec default_surface() -> surface().
default_surface() ->
    Tenant = default_tenant_runtime(),
    new(<<"orders_etl_operations_surface"/utf8>>, heavy_data_profile, Tenant).
-file("src/lightspeed/pipeline/operations_surface.gleam", 453).
%% Audit records are stored newest-first, so the stored list is reversed to
%% return them oldest-first.
?DOC(" Audit records in emit order.\n").
-spec audits(surface()) -> list(audit_record()).
audits({surface, _, _, _, _, _, _, _, Newest_first}) ->
    lists:reverse(Newest_first).
-file("src/lightspeed/pipeline/operations_surface.gleam", 898).
%% true when every audit record has a non-empty action, a non-empty outcome
%% and a non-negative at_ms. An empty list is valid.
-spec audits_valid(list(audit_record())) -> boolean().
audits_valid(Records) ->
    lists:all(
        fun({audit_record, Action, Outcome, At_ms}) ->
            Action =/= <<""/utf8>>
                andalso Outcome =/= <<""/utf8>>
                andalso At_ms >= 0
        end,
        Records
    ).
-file("src/lightspeed/pipeline/operations_surface.gleam", 448).
%% Returns the stored conformance list reversed.
?DOC(" Conformance records in emit order.\n").
-spec conformance_records(surface()) -> list(conformance_record()).
conformance_records({surface, _, _, _, _, _, _, Stored, _}) ->
    lists:reverse(Stored).
-file("src/lightspeed/pipeline/operations_surface.gleam", 886).
%% true when every conformance record has non-empty source, sink, benchmark
%% and budget signatures. The profile and the passed flag are not inspected.
%% An empty list is valid.
-spec conformance_records_valid(list(conformance_record())) -> boolean().
conformance_records_valid(Records) ->
    lists:all(
        fun({conformance_record, _Profile, Source, Sink, Benchmark, Budget, _Passed}) ->
            Source =/= <<""/utf8>>
                andalso Sink =/= <<""/utf8>>
                andalso Benchmark =/= <<""/utf8>>
                andalso Budget =/= <<""/utf8>>
        end,
        Records
    ).
-file("src/lightspeed/pipeline/operations_surface.gleam", 443).
%% Lineage records are stored newest-first, so the stored list is reversed
%% to return them oldest-first.
?DOC(" Lineage records in emit order.\n").
-spec lineage_records(surface()) -> list(lineage_record()).
lineage_records({surface, _, _, _, _, _, Newest_first, _, _}) ->
    lists:reverse(Newest_first).
-file("src/lightspeed/pipeline/operations_surface.gleam", 871).
%% true when every lineage record has non-empty run, stage, source, sink,
%% watermark and checkpoint fields and a strictly positive sequence.
%% An empty list is valid.
-spec lineage_records_valid(list(lineage_record())) -> boolean().
lineage_records_valid(Records) ->
    lists:all(
        fun({lineage_record, Run, Stage, Source, Sink, Watermark, Sequence, Checkpoint}) ->
            Run =/= <<""/utf8>>
                andalso Stage =/= <<""/utf8>>
                andalso Source =/= <<""/utf8>>
                andalso Sink =/= <<""/utf8>>
                andalso Watermark =/= <<""/utf8>>
                andalso Sequence > 0
                andalso Checkpoint =/= <<""/utf8>>
        end,
        Records
    ).
-file("src/lightspeed/pipeline/operations_surface.gleam", 865).
%% true when the lag budget, the observed lag and the last sequence are all
%% non-negative. The is_fresh flag is not inspected.
-spec freshness_valid(freshness_metadata()) -> boolean().
freshness_valid({freshness_metadata, Max_allowed, Observed, Last_sequence, _Is_fresh}) ->
    Max_allowed >= 0
        andalso Observed >= 0
        andalso Last_sequence >= 0.
-file("src/lightspeed/pipeline/operations_surface.gleam", 125).
%% A surface is valid when its name is non-empty, the orchestrator and
%% tenant policy runtimes validate, and the freshness metadata, lineage
%% records, conformance records and audit records all pass their checks.
%% Checks run in that order and stop at the first failure.
?DOC(" Validate one operations-surface contract.\n").
-spec valid(surface()) -> boolean().
valid({surface, Name, _Profile, Orchestrator, Policy, Freshness, Lineage, Conformance, Audits}) ->
    Name =/= <<""/utf8>>
        andalso lightspeed@pipeline@orchestrator:valid(Orchestrator)
        andalso lightspeed@tenant@policy:valid(Policy)
        andalso freshness_valid(Freshness)
        andalso lineage_records_valid(lists:reverse(Lineage))
        andalso conformance_records_valid(lists:reverse(Conformance))
        andalso audits_valid(lists:reverse(Audits)).
-file("src/lightspeed/pipeline/operations_surface.gleam", 926).
%% Larger of two integers; returns Left when they are equal.
-spec max(integer(), integer()) -> integer().
max(Left, Right) when Left >= Right ->
    Left;
max(_Left, Right) ->
    Right.
-file("src/lightspeed/pipeline/operations_surface.gleam", 853).
%% Prepends one audit record to the surface's audit list. A negative At_ms
%% is clamped to 0.
-spec append_audit(surface(), binary(), binary(), integer()) -> surface().
append_audit(Surface, Action, Outcome, At_ms) ->
    Entry = {audit_record, Action, Outcome, max(0, At_ms)},
    Earlier = erlang:element(9, Surface),
    erlang:setelement(9, Surface, [Entry | Earlier]).
-file("src/lightspeed/pipeline/operations_surface.gleam", 136).
%% Asks the tenant policy whether one pipeline run may start.
%%   * allowed - the orchestrator run is started, an "allowed:<label>" audit
%%               is recorded and {ok, nil} is returned.
%%   * denied  - the orchestrator is left untouched, a "denied:<label>" audit
%%               is recorded and {error, <<"policy_denied:<label>">>} is
%%               returned.
%% The updated policy runtime is stored in both cases. A negative At_ms is
%% clamped to 0.
?DOC(" Start one pipeline run with tenant policy gating.\n").
-spec start_run(surface(), integer()) -> {surface(),
    {ok, nil} | {error, binary()}}.
start_run(Surface, At_ms) ->
    Now = max(0, At_ms),
    {Policy, Verdict} = lightspeed@tenant@policy:evaluate(
        erlang:element(5, Surface),
        {start_pipeline_run, 1}
    ),
    Label = lightspeed@tenant@policy:outcome_label(Verdict),
    Gated = erlang:setelement(5, Surface, Policy),
    case Verdict of
        {allowed, _} ->
            Orchestrator = lightspeed@pipeline@orchestrator:start_run(
                erlang:element(4, Surface),
                Now
            ),
            Started = erlang:setelement(4, Gated, Orchestrator),
            Audited = append_audit(
                Started,
                <<"start_run"/utf8>>,
                <<"allowed:"/utf8, Label/binary>>,
                Now
            ),
            {Audited, {ok, nil}};
        {denied, _} ->
            Refused = append_audit(
                Gated,
                <<"start_run"/utf8>>,
                <<"denied:"/utf8, Label/binary>>,
                Now
            ),
            {Refused, {error, <<"policy_denied:"/utf8, Label/binary>>}}
    end.
-file("src/lightspeed/pipeline/operations_surface.gleam", 480).
%% Pipeline runtime held by the surface's orchestrator.
?DOC(" Runtime pipeline accessor.\n").
-spec pipeline_runtime(surface()) -> lightspeed@pipeline:runtime().
pipeline_runtime(Surface) ->
    Orchestrator = erlang:element(4, Surface),
    lightspeed@pipeline@orchestrator:runtime_pipeline(Orchestrator).
-file("src/lightspeed/pipeline/operations_surface.gleam", 458).
%% Checkpoints reported by the surface's pipeline runtime.
?DOC(" Pipeline checkpoints in emit order.\n").
-spec checkpoints(surface()) -> list(lightspeed@pipeline@checkpoint:checkpoint()).
checkpoints(Surface) ->
    Runtime = pipeline_runtime(Surface),
    lightspeed@pipeline:checkpoints(Runtime).
-file("src/lightspeed/pipeline/operations_surface.gleam", 781).
%% Highest value found in the fourth tuple position of the given checkpoint
%% entries, or 0 when the list is empty (0 is also the floor).
-spec latest_sequence(list(lightspeed@pipeline@checkpoint:checkpoint())) -> integer().
latest_sequence(Entries) ->
    lists:foldl(
        fun(Entry, Highest) -> max(erlang:element(4, Entry), Highest) end,
        0,
        Entries
    ).
-file("src/lightspeed/pipeline/operations_surface.gleam", 763).
%% Recomputes the freshness metadata from the current pipeline runtime. The
%% lag budget is kept, the observed lag is read from the second tuple
%% position of the runtime telemetry, the last sequence comes from the
%% checkpoints, and is_fresh is true when the observed lag does not exceed
%% the budget.
-spec refresh_freshness_from_runtime(surface()) -> surface().
refresh_freshness_from_runtime(Surface) ->
    Telemetry = lightspeed@pipeline:runtime_telemetry(pipeline_runtime(Surface)),
    Last_sequence = latest_sequence(checkpoints(Surface)),
    Budget = erlang:element(2, erlang:element(6, Surface)),
    Lag = erlang:element(2, Telemetry),
    Refreshed = {freshness_metadata, Budget, Lag, Last_sequence, Lag =< Budget},
    erlang:setelement(6, Surface, Refreshed).
-file("src/lightspeed/pipeline/operations_surface.gleam", 169).
%% Completes the current run on the orchestrator, refreshes the freshness
%% metadata and records a "complete_run" / "completed" audit. A negative
%% At_ms is clamped to 0.
?DOC(" Complete one pipeline run.\n").
-spec complete_run(surface(), integer()) -> surface().
complete_run(Surface, At_ms) ->
    Now = max(0, At_ms),
    Orchestrator = lightspeed@pipeline@orchestrator:complete_run(
        erlang:element(4, Surface),
        Now
    ),
    Completed = erlang:setelement(4, Surface, Orchestrator),
    Refreshed = refresh_freshness_from_runtime(Completed),
    append_audit(Refreshed, <<"complete_run"/utf8>>, <<"completed"/utf8>>, Now).
-file("src/lightspeed/pipeline/operations_surface.gleam", 816).
%% Maps an operator action to the tenant-policy action that gates it.
%% Pause and drain both map to the pause_pipelines mitigation, replay maps
%% to a pipeline-run replay, and resume needs no policy check (none).
-spec control_policy_action(lightspeed@pipeline@operator:action()) -> gleam@option:option(lightspeed@tenant@policy:action()).
control_policy_action({pause, _}) ->
    {some, {apply_mitigation, pause_pipelines, 1}};
control_policy_action({drain, _}) ->
    {some, {apply_mitigation, pause_pipelines, 1}};
control_policy_action({replay, _, _}) ->
    {some, {replay_pipeline_run, 1}};
control_policy_action({resume, _}) ->
    none.
-file("src/lightspeed/pipeline/operations_surface.gleam", 797).
%% Runs the tenant-policy check, if any, for an operator action. Actions
%% with no gating policy pass through with the runtime unchanged. Otherwise
%% the updated policy runtime is returned together with {ok, nil} when
%% allowed, or {error, Label} carrying the policy outcome label when denied.
-spec evaluate_control_policy(
    lightspeed@tenant@policy:runtime(),
    lightspeed@pipeline@operator:action()
) -> {lightspeed@tenant@policy:runtime(), {ok, nil} | {error, binary()}}.
evaluate_control_policy(Runtime, Action) ->
    case control_policy_action(Action) of
        {some, Gate} ->
            case lightspeed@tenant@policy:evaluate(Runtime, Gate) of
                {Updated, {allowed, _}} ->
                    {Updated, {ok, nil}};
                {Updated, {denied, _} = Denial} ->
                    {Updated,
                        {error, lightspeed@tenant@policy:outcome_label(Denial)}}
            end;
        none ->
            {Runtime, {ok, nil}}
    end.
-file("src/lightspeed/pipeline/operations_surface.gleam", 180).
%% Applies an operator action in two steps.
%%   1. Tenant policy: if denied, a "denied:<reason>" audit is recorded and
%%      {error, <<"policy_denied:<reason>">>} is returned.
%%   2. Orchestrator: on success the new orchestrator runtime is stored,
%%      freshness is refreshed and an "applied" audit is recorded. On
%%      failure the runtime returned by the orchestrator is discarded, a
%%      "rejected:<reason>" audit is recorded and the error is returned
%%      unchanged.
%% The updated policy runtime is stored in every case. The audit action is
%% "control:<action label>". A negative At_ms is clamped to 0.
?DOC(" Apply one operator control action with tenant-aware replay/mitigation checks.\n").
-spec apply_control(surface(), lightspeed@pipeline@operator:action(), integer()) -> {surface(),
    {ok, nil} | {error, binary()}}.
apply_control(Surface, Action, At_ms) ->
    Now = max(0, At_ms),
    {Policy, Verdict} = evaluate_control_policy(
        erlang:element(5, Surface),
        Action
    ),
    Gated = erlang:setelement(5, Surface, Policy),
    Audit_action = <<"control:"/utf8,
        (lightspeed@pipeline@operator:action_label(Action))/binary>>,
    case Verdict of
        {error, Denial} ->
            Refused = append_audit(
                Gated,
                Audit_action,
                <<"denied:"/utf8, Denial/binary>>,
                Now
            ),
            {Refused, {error, <<"policy_denied:"/utf8, Denial/binary>>}};
        {ok, nil} ->
            Applied = lightspeed@pipeline@orchestrator:apply_action(
                erlang:element(4, Gated),
                Action,
                Now
            ),
            case Applied of
                {Orchestrator, {ok, _}} ->
                    Refreshed = refresh_freshness_from_runtime(
                        erlang:setelement(4, Gated, Orchestrator)
                    ),
                    Audited = append_audit(
                        Refreshed,
                        Audit_action,
                        <<"applied"/utf8>>,
                        Now
                    ),
                    {Audited, {ok, nil}};
                {_Discarded, {error, Rejection}} ->
                    Rejected = append_audit(
                        Gated,
                        Audit_action,
                        <<"rejected:"/utf8, Rejection/binary>>,
                        Now
                    ),
                    {Rejected, {error, Rejection}}
            end
    end.
-file("src/lightspeed/pipeline/operations_surface.gleam", 224).
%% Hands one batch to the orchestrator and stores the runtime it returns,
%% whether the batch was accepted or not. The audit action is
%% "enqueue:<Key>" and the outcome is "queued:<Stage>:<Records>" on success
%% or "rejected:<Reason>" on failure.
%% Note: the audit record's at_ms field is filled with Sequence (clamped to
%% be non-negative), not with a wall-clock time.
?DOC(" Enqueue one batch into the ETL runtime operations surface.\n").
-spec enqueue_batch(
    surface(),
    binary(),
    binary(),
    integer(),
    integer(),
    binary(),
    integer()
) -> {surface(), {ok, nil} | {error, binary()}}.
enqueue_batch(Surface, Key, Stage, Records, Lag_ms, Idempotency_key, Sequence) ->
    {Orchestrator, Result} = lightspeed@pipeline@orchestrator:enqueue_batch(
        erlang:element(4, Surface),
        Key,
        Stage,
        Records,
        Lag_ms,
        Idempotency_key,
        Sequence
    ),
    Updated = erlang:setelement(4, Surface, Orchestrator),
    Audit_action = <<"enqueue:"/utf8, Key/binary>>,
    {Audit_outcome, Reply} =
        case Result of
            {ok, _} ->
                {<<"queued:"/utf8,
                        Stage/binary,
                        ":"/utf8,
                        (erlang:integer_to_binary(Records))/binary>>,
                    {ok, nil}};
            {error, Reason} ->
                {<<"rejected:"/utf8, Reason/binary>>, {error, Reason}}
        end,
    {append_audit(Updated, Audit_action, Audit_outcome, max(0, Sequence)), Reply}.
-file("src/lightspeed/pipeline/operations_surface.gleam", 916).
%% Joins binaries with a separator. An empty list gives an empty binary and
%% a single element is returned as-is.
-spec join_with(binary(), list(binary())) -> binary().
join_with(_Separator, []) ->
    <<""/utf8>>;
join_with(Separator, [Head | Tail]) ->
    Append = fun(Joined, Next) ->
        <<Joined/binary, Separator/binary, Next/binary>>
    end,
    gleam@list:fold(Tail, Head, Append).
-file("src/lightspeed/pipeline/operations_surface.gleam", 268).
%% Asks the orchestrator to start whatever queued batches it can, stores
%% the runtime it returns and records a "start_available" audit whose
%% outcome is "started=" followed by the comma-joined started keys.
%% Returns the updated surface together with the started keys. A negative
%% At_ms is clamped to 0.
?DOC(" Start as many queued batches as allowed by backpressure boundaries.\n").
-spec start_available(surface(), integer()) -> {surface(), list(binary())}.
start_available(Surface, At_ms) ->
    Now = max(0, At_ms),
    {Orchestrator, Started} = lightspeed@pipeline@orchestrator:start_available(
        erlang:element(4, Surface),
        Now
    ),
    Updated = erlang:setelement(4, Surface, Orchestrator),
    Audit_outcome = <<"started="/utf8,
        (join_with(<<","/utf8>>, Started))/binary>>,
    Audited = append_audit(
        Updated,
        <<"start_available"/utf8>>,
        Audit_outcome,
        Now
    ),
    {Audited, Started}.
-file("src/lightspeed/pipeline/operations_surface.gleam", 788).
%% Last element of the checkpoint list wrapped in {some, _}, or none when
%% the list is empty.
-spec latest_checkpoint(list(lightspeed@pipeline@checkpoint:checkpoint())) -> gleam@option:option(lightspeed@pipeline@checkpoint:checkpoint()).
latest_checkpoint([]) ->
    none;
latest_checkpoint([_ | _] = Entries) ->
    {some, lists:last(Entries)}.
-file("src/lightspeed/pipeline/operations_surface.gleam", 722).
%% Adds a lineage record for an acknowledged batch. A record is prepended
%% only when the outcome is {processed, Result}, the result label starts
%% with "applied:" and the pipeline has at least one checkpoint. In every
%% other case the surface is returned unchanged.
%% The record is built from the latest checkpoint (its 2nd, 3rd, 5th and
%% 4th tuple positions feed run, stage, watermark and sequence) and from the
%% source and sink labels of the orchestrator's connector plan.
-spec apply_outcome_metadata(
    surface(),
    lightspeed@pipeline@orchestrator:process_outcome()
) -> surface().
apply_outcome_metadata(Surface, {processed, Result}) ->
    Result_label = lightspeed@pipeline:process_result_label(Result),
    Applied = gleam_stdlib:string_starts_with(
        Result_label,
        <<"applied:"/utf8>>
    ),
    %% The checkpoint lookup only runs when the result was applied.
    case Applied andalso latest_checkpoint(checkpoints(Surface)) of
        false ->
            Surface;
        none ->
            Surface;
        {some, Entry} ->
            Plan = lightspeed@pipeline@orchestrator:runtime_connector_plan(
                erlang:element(4, Surface)
            ),
            Source_label = lightspeed@pipeline@connector:source_label(
                lightspeed@pipeline@connector:source(Plan)
            ),
            Sink_label = lightspeed@pipeline@connector:sink_label(
                lightspeed@pipeline@connector:sink(Plan)
            ),
            Watermark = lightspeed@pipeline@checkpoint:watermark_label(
                erlang:element(5, Entry)
            ),
            Checkpoint = lightspeed@pipeline@checkpoint:checkpoint_label(Entry),
            Record = {lineage_record,
                erlang:element(2, Entry),
                erlang:element(3, Entry),
                Source_label,
                Sink_label,
                Watermark,
                erlang:element(4, Entry),
                Checkpoint},
            erlang:setelement(7, Surface, [Record | erlang:element(7, Surface)])
    end;
apply_outcome_metadata(Surface, _Other_outcome) ->
    Surface.
-file("src/lightspeed/pipeline/operations_surface.gleam", 287).
%% Acknowledges a batch on the orchestrator and stores the runtime it
%% returns, whether the acknowledgement succeeded or not.
%%   * {ok, Outcome}   - lineage is updated from the outcome, freshness is
%%                       refreshed and an audit carrying the outcome label
%%                       is recorded.
%%   * {error, Reason} - a "rejected:<Reason>" audit is recorded.
%% The audit action is "ack:<Key>". A negative At_ms is clamped to 0.
?DOC(" Acknowledge one processed batch and update lineage/freshness metadata.\n").
-spec acknowledge_batch(surface(), binary(), integer()) -> {surface(),
    {ok, lightspeed@pipeline@orchestrator:process_outcome()} | {error, binary()}}.
acknowledge_batch(Surface, Key, At_ms) ->
    Now = max(0, At_ms),
    {Orchestrator, Result} = lightspeed@pipeline@orchestrator:ack_batch(
        erlang:element(4, Surface),
        Key,
        Now
    ),
    Updated = erlang:setelement(4, Surface, Orchestrator),
    Audit_action = <<"ack:"/utf8, Key/binary>>,
    case Result of
        {ok, Outcome} ->
            With_lineage = apply_outcome_metadata(Updated, Outcome),
            Refreshed = refresh_freshness_from_runtime(With_lineage),
            Outcome_label = lightspeed@pipeline@orchestrator:process_outcome_label(
                Outcome
            ),
            {append_audit(Refreshed, Audit_action, Outcome_label, Now),
                {ok, Outcome}};
        {error, Reason} ->
            Rejected = append_audit(
                Updated,
                Audit_action,
                <<"rejected:"/utf8, Reason/binary>>,
                Now
            ),
            {Rejected, {error, Reason}}
    end.
-file("src/lightspeed/pipeline/operations_surface.gleam", 320).
%% Reports a batch failure to the orchestrator and stores the runtime it
%% returns, whether the report was accepted or not.
%%   * {ok, Outcome}         - freshness is refreshed and an audit carrying
%%                             the outcome label is recorded.
%%   * {error, Error_reason} - a "rejected:<Error_reason>" audit is recorded.
%% The audit action is "fail:<Key>". A negative At_ms is clamped to 0.
?DOC(" Fail one batch and apply retry/dead-letter policies.\n").
-spec fail_batch(surface(), binary(), integer(), binary(), integer()) -> {surface(),
    {ok, lightspeed@pipeline@orchestrator:process_outcome()} | {error, binary()}}.
fail_batch(Surface, Key, Attempt, Reason, At_ms) ->
    Now = max(0, At_ms),
    {Orchestrator, Result} = lightspeed@pipeline@orchestrator:fail_batch(
        erlang:element(4, Surface),
        Key,
        Attempt,
        Reason,
        Now
    ),
    Updated = erlang:setelement(4, Surface, Orchestrator),
    Audit_action = <<"fail:"/utf8, Key/binary>>,
    case Result of
        {ok, Outcome} ->
            Refreshed = refresh_freshness_from_runtime(Updated),
            Outcome_label = lightspeed@pipeline@orchestrator:process_outcome_label(
                Outcome
            ),
            {append_audit(Refreshed, Audit_action, Outcome_label, Now),
                {ok, Outcome}};
        {error, Error_reason} ->
            Rejected = append_audit(
                Updated,
                Audit_action,
                <<"rejected:"/utf8, Error_reason/binary>>,
                Now
            ),
            {Rejected, {error, Error_reason}}
    end.
-file("src/lightspeed/pipeline/operations_surface.gleam", 846).
%% true when the passed flag of every conformance record is true.
%% An empty list gives true.
-spec all_conformance_passed(list(conformance_record())) -> boolean().
all_conformance_passed(Records) ->
    lists:all(fun(Record) -> erlang:element(7, Record) end, Records).
-file("src/lightspeed/pipeline/operations_surface.gleam", 909).
%% Renders a boolean as the binary "true" or "false".
-spec bool_label(boolean()) -> binary().
bool_label(true) ->
    <<"true"/utf8>>;
bool_label(false) ->
    <<"false"/utf8>>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 827).
%% First budget result whose second tuple position equals Workload. When
%% none matches, a failing placeholder result is returned for that workload
%% with the text "missing_budget_result".
-spec budget_result_for_workload(
    list(lightspeed@data_plane_huge:budget_result()),
    binary()
) -> lightspeed@data_plane_huge:budget_result().
budget_result_for_workload(Results, Workload) ->
    Matches_workload = fun(Result) ->
        erlang:element(2, Result) =:= Workload
    end,
    case lists:search(Matches_workload, Results) of
        {value, Found} ->
            Found;
        false ->
            {budget_result, Workload, false, <<"missing_budget_result"/utf8>>}
    end.
-file("src/lightspeed/pipeline/operations_surface.gleam", 364).
%% Runs the huge-data benchmarks, evaluates them against the default huge
%% budgets and builds one conformance record per benchmark. Each record
%% carries the surface's profile, the connector plan's source and sink
%% labels, the benchmark signature, the signature of the matching budget
%% result and that result's pass flag.
%% The new records REPLACE any conformance records already on the surface.
%% An audit "certify_heavy_data_conformance" / "passed=<true|false>" is
%% recorded; its at_ms field holds the number of records produced, not a
%% time.
?DOC(" Certify source/sink conformance across heavy-data adapter profiles.\n").
-spec certify_heavy_data_conformance(surface()) -> surface().
certify_heavy_data_conformance(Surface) ->
    Plan = lightspeed@pipeline@orchestrator:runtime_connector_plan(
        erlang:element(4, Surface)
    ),
    Source_signature = lightspeed@pipeline@connector:source_label(
        lightspeed@pipeline@connector:source(Plan)
    ),
    Sink_signature = lightspeed@pipeline@connector:sink_label(
        lightspeed@pipeline@connector:sink(Plan)
    ),
    Benchmarks = lightspeed@data_plane_huge:run_huge_benchmarks(),
    Budget_results = lightspeed@data_plane_huge:evaluate_huge_budgets(
        Benchmarks,
        lightspeed@data_plane_huge:default_huge_budgets()
    ),
    Profile = erlang:element(3, Surface),
    Records = [
        begin
            Budget_result = budget_result_for_workload(
                Budget_results,
                erlang:element(2, Benchmark)
            ),
            {conformance_record,
                Profile,
                Source_signature,
                Sink_signature,
                lightspeed@data_plane_huge:benchmark_signature(Benchmark),
                lightspeed@data_plane_huge:budget_result_signature(Budget_result),
                erlang:element(3, Budget_result)}
        end
     || Benchmark <- Benchmarks
    ],
    Passed_label = bool_label(all_conformance_passed(Records)),
    %% Stored newest-first, like the other record lists on the surface.
    Certified = erlang:setelement(8, Surface, lists:reverse(Records)),
    append_audit(
        Certified,
        <<"certify_heavy_data_conformance"/utf8>>,
        <<"passed="/utf8, Passed_label/binary>>,
        erlang:length(Records)
    ).
-file("src/lightspeed/pipeline/operations_surface.gleam", 403).
%% false when the surface has no conformance records at all; otherwise true
%% only if every record passed.
?DOC(" `True` when heavy-data conformance evidence is present and all records pass.\n").
-spec conformance_passed(surface()) -> boolean().
conformance_passed(Surface) ->
    case conformance_records(Surface) of
        [] ->
            false;
        [_ | _] = Evidence ->
            all_conformance_passed(Evidence)
    end.
-file("src/lightspeed/pipeline/operations_surface.gleam", 412).
%% Name given to the surface when it was built.
?DOC(" Surface name accessor.\n").
-spec name(surface()) -> binary().
name({surface, Name, _, _, _, _, _, _, _}) ->
    Name.
-file("src/lightspeed/pipeline/operations_surface.gleam", 417).
%% Text label for an adapter profile, as used in signatures.
?DOC(" Adapter-profile label.\n").
-spec adapter_profile_label(adapter_profile()) -> binary().
adapter_profile_label(standard_profile) ->
    <<"standard"/utf8>>;
adapter_profile_label(heavy_data_profile) ->
    <<"heavy_data"/utf8>>;
adapter_profile_label(burst_recovery_profile) ->
    <<"burst_recovery"/utf8>>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 426).
%% Freshness metadata currently stored on the surface.
?DOC(" Freshness metadata accessor.\n").
-spec freshness(surface()) -> freshness_metadata().
freshness({surface, _, _, _, _, Freshness, _, _, _}) ->
    Freshness.
-file("src/lightspeed/pipeline/operations_surface.gleam", 431).
%% Renders freshness metadata as
%% "max_allowed_lag_ms=N|observed_lag_ms=N|last_sequence=N|is_fresh=true|false".
?DOC(" Freshness metadata label.\n").
-spec freshness_label(freshness_metadata()) -> binary().
freshness_label({freshness_metadata, Max_allowed, Observed, Last_sequence, Is_fresh}) ->
    <<"max_allowed_lag_ms="/utf8,
        (erlang:integer_to_binary(Max_allowed))/binary,
        "|observed_lag_ms="/utf8,
        (erlang:integer_to_binary(Observed))/binary,
        "|last_sequence="/utf8,
        (erlang:integer_to_binary(Last_sequence))/binary,
        "|is_fresh="/utf8,
        (bool_label(Is_fresh))/binary>>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 463).
%% Sink idempotency keys reported by the surface's pipeline runtime.
?DOC(" Sink idempotency keys in emit order.\n").
-spec sink_idempotency_keys(surface()) -> list(binary()).
sink_idempotency_keys(Surface) ->
    Runtime = pipeline_runtime(Surface),
    lightspeed@pipeline:sink_idempotency_keys(Runtime).
-file("src/lightspeed/pipeline/operations_surface.gleam", 468).
%% Label of the telemetry reported by the surface's pipeline runtime.
?DOC(" Pipeline runtime telemetry label.\n").
-spec telemetry_label(surface()) -> binary().
telemetry_label(Surface) ->
    Runtime = pipeline_runtime(Surface),
    Telemetry = lightspeed@pipeline:runtime_telemetry(Runtime),
    lightspeed@pipeline@telemetry:label(Telemetry).
-file("src/lightspeed/pipeline/operations_surface.gleam", 475).
%% Label of the lifecycle state of the surface's pipeline runtime.
?DOC(" Pipeline lifecycle label.\n").
-spec lifecycle_label(surface()) -> binary().
lifecycle_label(Surface) ->
    Runtime = pipeline_runtime(Surface),
    Lifecycle = lightspeed@pipeline:lifecycle(Runtime),
    lightspeed@pipeline:lifecycle_label(Lifecycle).
-file("src/lightspeed/pipeline/operations_surface.gleam", 485).
%% Tenant policy runtime currently stored on the surface.
?DOC(" Runtime policy accessor.\n").
-spec tenant_runtime(surface()) -> lightspeed@tenant@policy:runtime().
tenant_runtime({surface, _, _, _, Policy, _, _, _, _}) ->
    Policy.
-file("src/lightspeed/pipeline/operations_surface.gleam", 545).
%% Renders an audit record as "action=A|outcome=O|at_ms=N".
?DOC(" Stable audit signature.\n").
-spec audit_signature(audit_record()) -> binary().
audit_signature({audit_record, Action, Outcome, At_ms}) ->
    <<"action="/utf8,
        Action/binary,
        "|outcome="/utf8,
        Outcome/binary,
        "|at_ms="/utf8,
        (erlang:integer_to_binary(At_ms))/binary>>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 529).
%% Renders a conformance record as
%% "profile=P|source=S|sink=K|benchmark=B|budget=G|passed=true|false".
?DOC(" Stable conformance signature.\n").
-spec conformance_signature(conformance_record()) -> binary().
conformance_signature({conformance_record, Profile, Source, Sink, Benchmark, Budget, Passed}) ->
    <<"profile="/utf8,
        (adapter_profile_label(Profile))/binary,
        "|source="/utf8,
        Source/binary,
        "|sink="/utf8,
        Sink/binary,
        "|benchmark="/utf8,
        Benchmark/binary,
        "|budget="/utf8,
        Budget/binary,
        "|passed="/utf8,
        (bool_label(Passed))/binary>>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 511).
%% Renders a lineage record as
%% "run=R|stage=S|source=A|sink=B|watermark=W|sequence=N|checkpoint=C".
?DOC(" Stable lineage signature.\n").
-spec lineage_signature(lineage_record()) -> binary().
lineage_signature({lineage_record, Run, Stage, Source, Sink, Watermark, Sequence, Checkpoint}) ->
    <<"run="/utf8,
        Run/binary,
        "|stage="/utf8,
        Stage/binary,
        "|source="/utf8,
        Source/binary,
        "|sink="/utf8,
        Sink/binary,
        "|watermark="/utf8,
        Watermark/binary,
        "|sequence="/utf8,
        (erlang:integer_to_binary(Sequence))/binary,
        "|checkpoint="/utf8,
        Checkpoint/binary>>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 490).
%% Renders the whole surface as one "|"-separated binary with the sections
%% profile, orchestrator, policy, freshness, lineage, conformance and
%% audits, in that order. The three record sections list their entries
%% oldest-first, joined with ";". The surface name is not part of the
%% signature.
?DOC(" Surface signature.\n").
-spec signature(surface()) -> binary().
signature(Surface) ->
    Profile = adapter_profile_label(erlang:element(3, Surface)),
    Orchestrator = lightspeed@pipeline@orchestrator:signature(
        erlang:element(4, Surface)
    ),
    Policy = lightspeed@tenant@policy:signature(erlang:element(5, Surface)),
    Freshness = freshness_label(erlang:element(6, Surface)),
    Lineage = join_with(
        <<";"/utf8>>,
        [lineage_signature(Record) || Record <- lineage_records(Surface)]
    ),
    Conformance = join_with(
        <<";"/utf8>>,
        [conformance_signature(Record) || Record <- conformance_records(Surface)]
    ),
    Audits = join_with(
        <<";"/utf8>>,
        [audit_signature(Record) || Record <- audits(Surface)]
    ),
    <<"profile="/utf8,
        Profile/binary,
        "|orchestrator="/utf8,
        Orchestrator/binary,
        "|policy="/utf8,
        Policy/binary,
        "|freshness="/utf8,
        Freshness/binary,
        "|lineage="/utf8,
        Lineage/binary,
        "|conformance="/utf8,
        Conformance/binary,
        "|audits="/utf8,
        Audits/binary>>.
-file("src/lightspeed/pipeline/operations_surface.gleam", 555).
%% Three labelled surface signatures:
%%   * "default_surface"        - the default surface after heavy-data
%%                                conformance certification.
%%   * "strict_policy_surface"  - a standard-profile surface for actor
%%                                "ops-strict" with the editor role and an
%%                                expanded budget of (2, 1, 1, 1, 0, 0).
%%   * "burst_recovery_surface" - a burst-recovery surface with the default
%%                                tenant runtime.
?DOC(" Deterministic fixture snapshots for M60 drift gates.\n").
-spec fixture_snapshots() -> list({binary(), binary()}).
fixture_snapshots() ->
    Baseline = certify_heavy_data_conformance(default_surface()),
    Default_entry = {<<"default_surface"/utf8>>, signature(Baseline)},
    Strict_context = lightspeed@tenant@policy:tenant_context(
        <<"ops-strict"/utf8>>,
        <<"tenant-orders"/utf8>>,
        editor
    ),
    Strict_budget = lightspeed@tenant@policy:expanded_budget(2, 1, 1, 1, 0, 0),
    Strict_surface = new(
        <<"strict_policy_surface"/utf8>>,
        standard_profile,
        lightspeed@tenant@policy:new(Strict_context, Strict_budget)
    ),
    Strict_entry = {<<"strict_policy_surface"/utf8>>, signature(Strict_surface)},
    Burst_surface = new(
        <<"burst_recovery_surface"/utf8>>,
        burst_recovery_profile,
        default_tenant_runtime()
    ),
    Burst_entry = {<<"burst_recovery_surface"/utf8>>, signature(Burst_surface)},
    [Default_entry, Strict_entry, Burst_entry].
-file("src/lightspeed/pipeline/operations_surface.gleam", 582).
%% One binary covering all fixture snapshots: the profile version label,
%% then "|", then every fixture rendered as "<label>=<signature>" and joined
%% with ";".
?DOC(" Deterministic M60 snapshot signature.\n").
-spec snapshot_signature() -> binary().
snapshot_signature() ->
    Rendered = lists:map(
        fun({Label, Value}) ->
            <<Label/binary, "="/utf8, Value/binary>>
        end,
        fixture_snapshots()
    ),
    Version = profile_version_label(),
    <<Version/binary, "|"/utf8, (join_with(<<";"/utf8>>, Rendered))/binary>>.