-module(lightspeed@ops@operational_autopilot).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/ops/operational_autopilot.gleam").
-export([default_policy/0, strict_policy/0, blocked_policy/0, surface_label/1, run_drill/2, run_default_drills/0, drill_scenario_label/1, control_label/1, decision_label/1, policy_label/1, action_label/1, correlation_label/1, latency_reduced/1, drill_result_signature/1, actions/1, correlation/1, action_count/1, recovery_latency_ms/1, actions_bounded/2, actions_reversible/1, actions_auditable/1, drill_result_valid/2, all_drills_valid/2, requires_override/1, blocked_by_policy/1, audit_metric/1]).
-export_type([surface/0, control/0, override_policy/0, decision/0, drill_scenario/0, control_action/0, correlation/0, drill_result/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Cross-surface operational autopilot and SLO-aware controls (M54).\n").
%% Operational surfaces the autopilot can act on. `surface_label/1` renders
%% them as "runtime", "transport", "tenant" and "etl" respectively.
-type surface() :: runtime_surface |
transport_surface |
tenant_surface |
etl_surface.
%% Mitigation controls emitted by the drills. The payload names noted on each
%% line are the variable names `control_label/1` binds them to.
-type control() :: {shed_runtime_events, integer()} | % Percent
{throttle_transport, integer()} | % Max_events_per_sec
{isolate_tenant, binary()} | % Tenant_id
pause_etl_pipelines.
%% Override policy tuple. Element meanings, as used by `policy_label/1`,
%% `decide/2` and `actions_bounded/2`:
%%   2     policy name
%%   3..6  enable flags for the runtime, transport, tenant and etl surfaces,
%%         in that order
%%   7     override flag: when true, `decide/2` answers `requires_override`
%%         for the surfaces `high_impact_surface/1` marks as high impact
%%   8     maximum number of actions accepted by `actions_bounded/2`
-type override_policy() :: {override_policy,
binary(),
boolean(),
boolean(),
boolean(),
boolean(),
boolean(),
integer()}.
%% Outcome of a policy decision for one control. The binary payloads are the
%% override id and the blocking reason (names taken from `decision_label/1`).
%% NOTE(review): `decide/2` in this module only produces `applied`,
%% `requires_override` and `blocked_by_policy`; no visible code here emits
%% `rolled_back`.
-type decision() :: applied |
{requires_override, binary()} |
{blocked_by_policy, binary()} |
rolled_back.
%% Drill scenarios accepted by `run_drill/2`; each one maps to exactly one
%% `run_*` function in this module.
-type drill_scenario() :: runtime_latency_spike |
transport_ack_storm |
tenant_hotspot_isolation |
etl_replay_lag.
%% One control action. Element meanings, following `control_action/7` and
%% `action_label/1`:
%%   2 surface, 3 control, 4 decision, 5 reason,
%%   6 detected-at ms, 7 applied-at ms, 8 recovered-at ms,
%%   9 reversible flag (`control_action/7` always sets it to true)
-type control_action() :: {control_action,
surface(),
control(),
decision(),
binary(),
integer(),
integer(),
integer(),
boolean()}.
%% Incident correlation attached to a drill result. Element meanings,
%% following `drill_result/10` and `correlation_label/1`:
%%   2 incident id, 3 surface, 4 trace reference,
%%   5 detected ms (the autopilot detection value),
%%   6 mitigated ms (the autopilot mitigation value),
%%   7 recovered ms (taken from the first action, or 0 without actions)
-type correlation() :: {correlation,
binary(),
surface(),
binary(),
integer(),
integer(),
integer()}.
%% Result of one drill. Element meanings, following `drill_result/10` and
%% `drill_result_signature/1`:
%%   2 scenario, 3 surface, 4 SLO status,
%%   5 baseline detection ms, 6 autopilot detection ms,
%%   7 baseline mitigation ms, 8 autopilot mitigation ms,
%%   9 list of control actions, 10 correlation
-type drill_result() :: {drill_result,
drill_scenario(),
surface(),
lightspeed@ops@slo_autopilot:status(),
integer(),
integer(),
integer(),
integer(),
list(control_action()),
correlation()}.
-file("src/lightspeed/ops/operational_autopilot.gleam", 97).
?DOC(" Default policy allows bounded automation on every M54 surface.\n").
-spec default_policy() -> override_policy().
%% Policy named "default": all four surface flags are on, the override flag
%% is off, and the action limit is 2.
default_policy() ->
    SurfaceEnabled = true,
    OverrideRequired = false,
    MaxActions = 2,
    {override_policy,
        <<"default"/utf8>>,
        SurfaceEnabled,
        SurfaceEnabled,
        SurfaceEnabled,
        SurfaceEnabled,
        OverrideRequired,
        MaxActions}.
-file("src/lightspeed/ops/operational_autopilot.gleam", 110).
?DOC(" Strict policy requires explicit override semantics for high-impact controls.\n").
-spec strict_policy() -> override_policy().
%% Policy named "strict": all four surface flags are on and the override flag
%% is also on; the action limit is 2.
strict_policy() ->
    SurfaceEnabled = true,
    OverrideRequired = true,
    MaxActions = 2,
    {override_policy,
        <<"strict"/utf8>>,
        SurfaceEnabled,
        SurfaceEnabled,
        SurfaceEnabled,
        SurfaceEnabled,
        OverrideRequired,
        MaxActions}.
-file("src/lightspeed/ops/operational_autopilot.gleam", 123).
?DOC(" Blocked policy used by tests and operators to verify policy bounds.\n").
-spec blocked_policy() -> override_policy().
%% Policy named "blocked": runtime and transport flags are on, tenant and etl
%% flags are off, the override flag is on, and the action limit is 1.
blocked_policy() ->
    RuntimeEnabled = true,
    TransportEnabled = true,
    TenantEnabled = false,
    EtlEnabled = false,
    OverrideRequired = true,
    MaxActions = 1,
    {override_policy,
        <<"blocked"/utf8>>,
        RuntimeEnabled,
        TransportEnabled,
        TenantEnabled,
        EtlEnabled,
        OverrideRequired,
        MaxActions}.
-file("src/lightspeed/ops/operational_autopilot.gleam", 603).
-spec first_recovery(list(control_action())) -> integer().
%% Recovered-at value (element 8) of the first action in the list, or 0 when
%% the list is empty.
first_recovery([]) ->
    0;
first_recovery([Head | _Tail]) ->
    erlang:element(8, Head).
-file("src/lightspeed/ops/operational_autopilot.gleam", 568).
-spec drill_result(
drill_scenario(),
surface(),
lightspeed@ops@slo_autopilot:status(),
integer(),
integer(),
integer(),
integer(),
list(control_action()),
binary(),
binary()
) -> drill_result().
%% Assembles a drill_result tuple together with its correlation. The
%% correlation reuses the surface plus the autopilot detection and mitigation
%% values, and takes its recovered value from the first action (0 when there
%% are no actions).
drill_result(
    Scenario,
    Surface,
    Status,
    Baseline_detection_ms,
    Autopilot_detection_ms,
    Baseline_mitigation_ms,
    Autopilot_mitigation_ms,
    Actions,
    Incident_id,
    Trace_ref
) ->
    Recovered_at_ms = first_recovery(Actions),
    {drill_result,
        Scenario,
        Surface,
        Status,
        Baseline_detection_ms,
        Autopilot_detection_ms,
        Baseline_mitigation_ms,
        Autopilot_mitigation_ms,
        Actions,
        {correlation,
            Incident_id,
            Surface,
            Trace_ref,
            Autopilot_detection_ms,
            Autopilot_mitigation_ms,
            Recovered_at_ms}}.
-file("src/lightspeed/ops/operational_autopilot.gleam", 171).
?DOC(" Surface label.\n").
-spec surface_label(surface()) -> binary().
%% Short text label for a surface atom.
surface_label(runtime_surface) -> <<"runtime"/utf8>>;
surface_label(transport_surface) -> <<"transport"/utf8>>;
surface_label(tenant_surface) -> <<"tenant"/utf8>>;
surface_label(etl_surface) -> <<"etl"/utf8>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 540).
-spec high_impact_surface(surface()) -> boolean().
%% True for the tenant and etl surfaces, false for runtime and transport.
high_impact_surface(tenant_surface) -> true;
high_impact_surface(etl_surface) -> true;
high_impact_surface(runtime_surface) -> false;
high_impact_surface(transport_surface) -> false.
-file("src/lightspeed/ops/operational_autopilot.gleam", 518).
-spec decide(override_policy(), surface()) -> decision().
%% Decides what happens to a control on `Surface` under `Policy`.
%%   * surface flag off -> {blocked_by_policy, "surface_disabled:<surface>"}
%%   * surface flag on, override flag (element 7) on and the surface is high
%%     impact -> {requires_override, "override:<policy name>:<surface>"}
%%   * otherwise -> applied
decide(Policy, Surface) ->
    %% Position of this surface's enable flag inside the policy tuple.
    FlagIndex =
        case Surface of
            runtime_surface -> 3;
            transport_surface -> 4;
            tenant_surface -> 5;
            etl_surface -> 6
        end,
    case erlang:element(FlagIndex, Policy) of
        false ->
            DisabledLabel = surface_label(Surface),
            {blocked_by_policy,
                <<"surface_disabled:"/utf8, DisabledLabel/binary>>};
        true ->
            case erlang:element(7, Policy) andalso high_impact_surface(Surface) of
                false ->
                    applied;
                true ->
                    PolicyName = erlang:element(2, Policy),
                    OverrideLabel = surface_label(Surface),
                    {requires_override,
                        <<"override:"/utf8,
                            PolicyName/binary,
                            ":"/utf8,
                            OverrideLabel/binary>>}
            end
    end.
-file("src/lightspeed/ops/operational_autopilot.gleam", 547).
-spec control_action(
surface(),
control(),
decision(),
binary(),
integer(),
integer(),
integer()
) -> control_action().
%% Builds a control_action tuple from its parts. The trailing reversible flag
%% is always set to true.
control_action(
    Surface,
    Control,
    Decision,
    Reason,
    Detected_at_ms,
    Applied_at_ms,
    Recovered_at_ms
) ->
    Reversible = true,
    {control_action, Surface, Control, Decision, Reason,
        Detected_at_ms, Applied_at_ms, Recovered_at_ms, Reversible}.
-file("src/lightspeed/ops/operational_autopilot.gleam", 459).
-spec run_etl_replay_lag(override_policy()) -> drill_result().
%% ETL replay-lag drill. Evaluates one "orders_replay" observation against one
%% budget via lightspeed@pipeline@slo, derives the status from element 3 of the
%% first budget result (true -> healthy, false -> critical), and records a
%% single pause_etl_pipelines action. If the evaluation returns no result, a
%% "missing_budget" result with a false flag is substituted.
run_etl_replay_lag(Policy) ->
    Pipeline = <<"orders_replay"/utf8>>,
    %% These two values are used both on the action and as the drill's
    %% autopilot detection / mitigation figures.
    Detected_at_ms = 22000,
    Applied_at_ms = 58000,
    Observed = lightspeed@pipeline@slo:observation(Pipeline, 380, 180, 11, 100, 5200),
    Limits = lightspeed@pipeline@slo:budget(Pipeline, 320, 220, 8, 4500),
    Verdict =
        case lightspeed@pipeline@slo:evaluate([Observed], [Limits]) of
            [] ->
                {budget_result, Pipeline, false, <<"missing_budget"/utf8>>};
            [Head | _] ->
                Head
        end,
    Status =
        case erlang:element(3, Verdict) of
            true -> healthy;
            false -> critical
        end,
    Decision = decide(Policy, etl_surface),
    VerdictLabel = lightspeed@pipeline@slo:budget_result_label(Verdict),
    Action = control_action(
        etl_surface,
        pause_etl_pipelines,
        Decision,
        <<"etl_slo:"/utf8, VerdictLabel/binary>>,
        Detected_at_ms,
        Applied_at_ms,
        96000
    ),
    drill_result(
        etl_replay_lag,
        etl_surface,
        Status,
        150000,
        Detected_at_ms,
        210000,
        Applied_at_ms,
        [Action],
        <<"m54-etl-replay"/utf8>>,
        <<"etl:replay:lag"/utf8>>
    ).
-file("src/lightspeed/ops/operational_autopilot.gleam", 497).
-spec tenant_policy_reason() -> binary().
%% Reason text for the tenant isolation action: evaluates an
%% {apply_mitigation, isolate_tenant, 1} request through
%% lightspeed@tenant@policy for operator "operator-a" / tenant "tenant-a" and
%% prefixes the outcome label with "tenant_policy:".
tenant_policy_reason() ->
    Operator = lightspeed@tenant@policy:tenant_context(
        <<"operator-a"/utf8>>,
        <<"tenant-a"/utf8>>,
        tenant_admin
    ),
    Budget = lightspeed@tenant@policy:expanded_budget(10, 10, 10, 10, 10, 10),
    Runtime = lightspeed@tenant@policy:new(Operator, Budget),
    Request = {apply_mitigation, isolate_tenant, 1},
    %% Only the second element of the evaluation pair is used.
    {_, Verdict} = lightspeed@tenant@policy:evaluate(Runtime, Request),
    VerdictLabel = lightspeed@tenant@policy:outcome_label(Verdict),
    <<"tenant_policy:"/utf8, VerdictLabel/binary>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 428).
-spec run_tenant_hotspot_isolation(override_policy()) -> drill_result().
%% Tenant hotspot drill. Classifies a "tenant"/"hotspot_budget" sample for
%% "tenant-a" with the default thresholds and records a single isolate_tenant
%% action whose reason comes from tenant_policy_reason/0.
run_tenant_hotspot_isolation(Policy) ->
    Tenant = <<"tenant-a"/utf8>>,
    %% These two values are used both on the action and as the drill's
    %% autopilot detection / mitigation figures.
    Detected_at_ms = 18000,
    Applied_at_ms = 49000,
    Reading = lightspeed@ops@slo_autopilot:sample(
        <<"tenant"/utf8>>,
        <<"hotspot_budget"/utf8>>,
        2600,
        1900,
        Tenant
    ),
    Thresholds = lightspeed@ops@slo_autopilot:default_thresholds(),
    Status = lightspeed@ops@slo_autopilot:classify(Reading, Thresholds),
    Reason = tenant_policy_reason(),
    Decision = decide(Policy, tenant_surface),
    Action = control_action(
        tenant_surface,
        {isolate_tenant, Tenant},
        Decision,
        Reason,
        Detected_at_ms,
        Applied_at_ms,
        84000
    ),
    drill_result(
        tenant_hotspot_isolation,
        tenant_surface,
        Status,
        130000,
        Detected_at_ms,
        190000,
        Applied_at_ms,
        [Action],
        <<"m54-tenant-hotspot"/utf8>>,
        <<"tenant:policy:isolation"/utf8>>
    ).
-file("src/lightspeed/ops/operational_autopilot.gleam", 399).
-spec run_transport_ack_storm(override_policy()) -> drill_result().
%% Transport ack-storm drill. Classifies a "transport"/"ack_latency" sample
%% with the default thresholds and records a single {throttle_transport, 220}
%% action.
run_transport_ack_storm(Policy) ->
    %% These two values are used both on the action and as the drill's
    %% autopilot detection / mitigation figures.
    Detected_at_ms = 12000,
    Applied_at_ms = 34000,
    Reading = lightspeed@ops@slo_autopilot:sample(
        <<"transport"/utf8>>,
        <<"ack_latency"/utf8>>,
        2200,
        1600,
        <<""/utf8>>
    ),
    Thresholds = lightspeed@ops@slo_autopilot:default_thresholds(),
    Status = lightspeed@ops@slo_autopilot:classify(Reading, Thresholds),
    Decision = decide(Policy, transport_surface),
    Action = control_action(
        transport_surface,
        {throttle_transport, 220},
        Decision,
        <<"transport_ack_storm:cadence_guard"/utf8>>,
        Detected_at_ms,
        Applied_at_ms,
        68000
    ),
    drill_result(
        transport_ack_storm,
        transport_surface,
        Status,
        100000,
        Detected_at_ms,
        150000,
        Applied_at_ms,
        [Action],
        <<"m54-transport-ack"/utf8>>,
        <<"transport:ack:storm"/utf8>>
    ).
-file("src/lightspeed/ops/operational_autopilot.gleam", 370).
-spec run_runtime_latency_spike(override_policy()) -> drill_result().
%% Runtime latency-spike drill. Classifies a "runtime"/"availability" sample
%% with the default thresholds and records a single {shed_runtime_events, 40}
%% action.
run_runtime_latency_spike(Policy) ->
    %% These two values are used both on the action and as the drill's
    %% autopilot detection / mitigation figures.
    Detected_at_ms = 15000,
    Applied_at_ms = 42000,
    Reading = lightspeed@ops@slo_autopilot:sample(
        <<"runtime"/utf8>>,
        <<"availability"/utf8>>,
        2500,
        1700,
        <<""/utf8>>
    ),
    Thresholds = lightspeed@ops@slo_autopilot:default_thresholds(),
    Status = lightspeed@ops@slo_autopilot:classify(Reading, Thresholds),
    Decision = decide(Policy, runtime_surface),
    Action = control_action(
        runtime_surface,
        {shed_runtime_events, 40},
        Decision,
        <<"slo_burn_rate:runtime_availability"/utf8>>,
        Detected_at_ms,
        Applied_at_ms,
        72000
    ),
    drill_result(
        runtime_latency_spike,
        runtime_surface,
        Status,
        120000,
        Detected_at_ms,
        180000,
        Applied_at_ms,
        [Action],
        <<"m54-runtime-latency"/utf8>>,
        <<"runtime:slo:burn-rate"/utf8>>
    ).
-file("src/lightspeed/ops/operational_autopilot.gleam", 136).
?DOC(" Run one deterministic M54 drill.\n").
-spec run_drill(drill_scenario(), override_policy()) -> drill_result().
%% Dispatches a scenario atom to its dedicated drill function.
run_drill(runtime_latency_spike, Policy) ->
    run_runtime_latency_spike(Policy);
run_drill(transport_ack_storm, Policy) ->
    run_transport_ack_storm(Policy);
run_drill(tenant_hotspot_isolation, Policy) ->
    run_tenant_hotspot_isolation(Policy);
run_drill(etl_replay_lag, Policy) ->
    run_etl_replay_lag(Policy).
-file("src/lightspeed/ops/operational_autopilot.gleam", 149).
?DOC(" Run all default M54 drills.\n").
-spec run_default_drills() -> list(drill_result()).
%% Runs the four scenarios under default_policy/0 and returns their results in
%% the order runtime, transport, tenant, etl.
run_default_drills() ->
    Policy = default_policy(),
    Scenarios = [
        runtime_latency_spike,
        transport_ack_storm,
        tenant_hotspot_isolation,
        etl_replay_lag
    ],
    [run_drill(Scenario, Policy) || Scenario <- Scenarios].
-file("src/lightspeed/ops/operational_autopilot.gleam", 161).
?DOC(" Drill scenario label.\n").
-spec drill_scenario_label(drill_scenario()) -> binary().
%% Text label for a drill scenario; the label spells the atom's own name.
drill_scenario_label(runtime_latency_spike) ->
    <<"runtime_latency_spike"/utf8>>;
drill_scenario_label(transport_ack_storm) ->
    <<"transport_ack_storm"/utf8>>;
drill_scenario_label(tenant_hotspot_isolation) ->
    <<"tenant_hotspot_isolation"/utf8>>;
drill_scenario_label(etl_replay_lag) ->
    <<"etl_replay_lag"/utf8>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 181).
?DOC(" Control label.\n").
-spec control_label(control()) -> binary().
%% Text label for a control: the control name, followed by ":" and its payload
%% when the control carries one.
control_label({shed_runtime_events, Percent}) ->
    PercentText = erlang:integer_to_binary(Percent),
    <<"shed_runtime_events:"/utf8, PercentText/binary>>;
control_label({throttle_transport, Max_events_per_sec}) ->
    RateText = erlang:integer_to_binary(Max_events_per_sec),
    <<"throttle_transport:"/utf8, RateText/binary>>;
control_label({isolate_tenant, Tenant_id}) ->
    <<"isolate_tenant:"/utf8, Tenant_id/binary>>;
control_label(pause_etl_pipelines) ->
    <<"pause_etl_pipelines"/utf8>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 193).
?DOC(" Decision label.\n").
-spec decision_label(decision()) -> binary().
%% Text label for a decision: the decision name, followed by ":" and its
%% payload for the two decisions that carry one.
decision_label(applied) ->
    <<"applied"/utf8>>;
decision_label({requires_override, Override_id}) ->
    <<"requires_override:"/utf8, Override_id/binary>>;
decision_label({blocked_by_policy, Reason}) ->
    <<"blocked_by_policy:"/utf8, Reason/binary>>;
decision_label(rolled_back) ->
    <<"rolled_back"/utf8>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 610).
-spec bool_label(boolean()) -> binary().
%% Renders a boolean as the binary "true" or "false".
bool_label(true) -> <<"true"/utf8>>;
bool_label(false) -> <<"false"/utf8>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 203).
?DOC(" Override policy label.\n").
-spec policy_label(override_policy()) -> binary().
%% Renders a policy as
%% "policy=<name>:runtime=<b>:transport=<b>:tenant=<b>:etl=<b>:override=<b>:max_actions=<n>",
%% reading elements 2 through 8 of the policy tuple in order.
policy_label(Policy) ->
    Name = erlang:element(2, Policy),
    Runtime = bool_label(erlang:element(3, Policy)),
    Transport = bool_label(erlang:element(4, Policy)),
    Tenant = bool_label(erlang:element(5, Policy)),
    Etl = bool_label(erlang:element(6, Policy)),
    Override = bool_label(erlang:element(7, Policy)),
    MaxActions = erlang:integer_to_binary(erlang:element(8, Policy)),
    <<"policy="/utf8, Name/binary,
        ":runtime="/utf8, Runtime/binary,
        ":transport="/utf8, Transport/binary,
        ":tenant="/utf8, Tenant/binary,
        ":etl="/utf8, Etl/binary,
        ":override="/utf8, Override/binary,
        ":max_actions="/utf8, MaxActions/binary>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 221).
?DOC(" Control action label.\n").
-spec action_label(control_action()) -> binary().
%% Renders a control action as
%% "surface=..:control=..:decision=..:reason=..:detected=..:applied=..:recovered=..:reversible=..",
%% reading elements 2 through 9 of the action tuple in order.
action_label(Action) ->
    Surface = surface_label(erlang:element(2, Action)),
    Control = control_label(erlang:element(3, Action)),
    Decision = decision_label(erlang:element(4, Action)),
    Reason = erlang:element(5, Action),
    Detected = erlang:integer_to_binary(erlang:element(6, Action)),
    Applied = erlang:integer_to_binary(erlang:element(7, Action)),
    Recovered = erlang:integer_to_binary(erlang:element(8, Action)),
    Reversible = bool_label(erlang:element(9, Action)),
    <<"surface="/utf8, Surface/binary,
        ":control="/utf8, Control/binary,
        ":decision="/utf8, Decision/binary,
        ":reason="/utf8, Reason/binary,
        ":detected="/utf8, Detected/binary,
        ":applied="/utf8, Applied/binary,
        ":recovered="/utf8, Recovered/binary,
        ":reversible="/utf8, Reversible/binary>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 241).
?DOC(" Correlation label for incident triage.\n").
-spec correlation_label(correlation()) -> binary().
%% Renders a correlation as
%% "incident=..:surface=..:trace=..:detected=..:mitigated=..:recovered=..",
%% reading elements 2 through 7 of the correlation tuple in order.
correlation_label(Correlation) ->
    Incident = erlang:element(2, Correlation),
    Surface = surface_label(erlang:element(3, Correlation)),
    Trace = erlang:element(4, Correlation),
    Detected = erlang:integer_to_binary(erlang:element(5, Correlation)),
    Mitigated = erlang:integer_to_binary(erlang:element(6, Correlation)),
    Recovered = erlang:integer_to_binary(erlang:element(7, Correlation)),
    <<"incident="/utf8, Incident/binary,
        ":surface="/utf8, Surface/binary,
        ":trace="/utf8, Trace/binary,
        ":detected="/utf8, Detected/binary,
        ":mitigated="/utf8, Mitigated/binary,
        ":recovered="/utf8, Recovered/binary>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 617).
-spec join_with(binary(), list(binary())) -> binary().
%% Joins the binaries in `Values`, placing `Separator` between consecutive
%% elements. An empty list yields the empty binary; a single-element list
%% yields that element.
%%
%% Built on the standard library: lists:join/2 interleaves the separator and
%% erlang:iolist_to_binary/1 produces the final binary in one step, replacing
%% the hand-rolled fold that appended to an accumulator element by element.
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).
-file("src/lightspeed/ops/operational_autopilot.gleam", 296).
?DOC(" True when autopilot detection and mitigation beat the manual baseline.\n").
-spec latency_reduced(drill_result()) -> boolean().
%% True when the autopilot detection value (element 6) is below the baseline
%% detection value (element 5) and the autopilot mitigation value (element 8)
%% is below the baseline mitigation value (element 7).
latency_reduced(Result) ->
    DetectionFaster = erlang:element(6, Result) < erlang:element(5, Result),
    %% Mitigation is only compared when detection already improved.
    DetectionFaster andalso
        (erlang:element(8, Result) < erlang:element(7, Result)).
-file("src/lightspeed/ops/operational_autopilot.gleam", 257).
?DOC(" Stable drill result signature.\n").
-spec drill_result_signature(drill_result()) -> binary().
%% Renders a drill result as a "|"-separated signature: scenario, surface,
%% status, the latency_reduced/1 verdict, the four timing values, the
%% comma-joined action labels, and finally the correlation label.
drill_result_signature(Result) ->
    Scenario = drill_scenario_label(erlang:element(2, Result)),
    Surface = surface_label(erlang:element(3, Result)),
    Status = lightspeed@ops@slo_autopilot:status_label(erlang:element(4, Result)),
    Reduced = bool_label(latency_reduced(Result)),
    BaselineDetection = erlang:integer_to_binary(erlang:element(5, Result)),
    AutopilotDetection = erlang:integer_to_binary(erlang:element(6, Result)),
    BaselineMitigation = erlang:integer_to_binary(erlang:element(7, Result)),
    AutopilotMitigation = erlang:integer_to_binary(erlang:element(8, Result)),
    ActionLabels = gleam@list:map(erlang:element(9, Result), fun action_label/1),
    Actions = join_with(<<","/utf8>>, ActionLabels),
    Correlation = correlation_label(erlang:element(10, Result)),
    <<"drill="/utf8, Scenario/binary,
        "|surface="/utf8, Surface/binary,
        "|status="/utf8, Status/binary,
        "|latency_reduced="/utf8, Reduced/binary,
        "|baseline_detection_ms="/utf8, BaselineDetection/binary,
        "|autopilot_detection_ms="/utf8, AutopilotDetection/binary,
        "|baseline_mitigation_ms="/utf8, BaselineMitigation/binary,
        "|autopilot_mitigation_ms="/utf8, AutopilotMitigation/binary,
        "|actions="/utf8, Actions/binary,
        "|"/utf8, Correlation/binary>>.
-file("src/lightspeed/ops/operational_autopilot.gleam", 281).
?DOC(" Drill actions accessor.\n").
-spec actions(drill_result()) -> list(control_action()).
%% Returns the action list stored as element 9 of the drill result.
actions(Result) ->
    ActionList = erlang:element(9, Result),
    ActionList.
-file("src/lightspeed/ops/operational_autopilot.gleam", 286).
?DOC(" Drill correlation accessor.\n").
-spec correlation(drill_result()) -> correlation().
%% Returns the correlation stored as element 10 of the drill result.
correlation(Result) ->
    Stored = erlang:element(10, Result),
    Stored.
-file("src/lightspeed/ops/operational_autopilot.gleam", 291).
?DOC(" Count actions in a drill result.\n").
-spec action_count(drill_result()) -> integer().
%% Number of actions stored in element 9 of the drill result.
action_count(Result) ->
    ActionList = erlang:element(9, Result),
    erlang:length(ActionList).
-file("src/lightspeed/ops/operational_autopilot.gleam", 302).
?DOC(" Recovery latency measured from detection to recovery.\n").
-spec recovery_latency_ms(drill_result()) -> integer().
%% Difference between the recovered value (element 7) and the detected value
%% (element 5) of the result's correlation (element 10).
recovery_latency_ms(Result) ->
    Recovered = erlang:element(7, erlang:element(10, Result)),
    Detected = erlang:element(5, erlang:element(10, Result)),
    Recovered - Detected.
-file("src/lightspeed/ops/operational_autopilot.gleam", 307).
?DOC(" True when actions are bounded by policy and not silently unbounded.\n").
-spec actions_bounded(drill_result(), override_policy()) -> boolean().
%% True when the result's action count does not exceed the policy's action
%% limit (element 8 of the policy tuple).
actions_bounded(Result, Policy) ->
    Count = action_count(Result),
    Limit = erlang:element(8, Policy),
    Count =< Limit.
-file("src/lightspeed/ops/operational_autopilot.gleam", 312).
?DOC(" True when every emitted action is reversible.\n").
-spec actions_reversible(drill_result()) -> boolean().
%% True when the reversible flag (element 9) is set on every action of the
%% drill result.
actions_reversible(Result) ->
    IsReversible = fun(Action) -> erlang:element(9, Action) end,
    gleam@list:all(erlang:element(9, Result), IsReversible).
-file("src/lightspeed/ops/operational_autopilot.gleam", 317).
?DOC(" True when every action is auditable and bounded by explicit policy outcomes.\n").
-spec actions_auditable(drill_result()) -> boolean().
%% True when every action of the drill result has a non-empty reason
%% (element 5) and a non-empty decision label.
actions_auditable(Result) ->
    HasAuditTrail = fun(Action) ->
        Reason = erlang:element(5, Action),
        (Reason /= <<""/utf8>>) andalso
            (decision_label(erlang:element(4, Action)) /= <<""/utf8>>)
    end,
    gleam@list:all(erlang:element(9, Result), HasAuditTrail).
-file("src/lightspeed/ops/operational_autopilot.gleam", 324).
?DOC(" True when a drill has acceptable certification evidence.\n").
-spec drill_result_valid(drill_result(), override_policy()) -> boolean().
%% True when the drill passes every check, evaluated in this order and
%% stopping at the first failure: latency reduced, actions bounded by the
%% policy, actions reversible, actions auditable, positive recovery latency.
drill_result_valid(Result, Policy) ->
    latency_reduced(Result) andalso
        actions_bounded(Result, Policy) andalso
        actions_reversible(Result) andalso
        actions_auditable(Result) andalso
        (recovery_latency_ms(Result) > 0).
-file("src/lightspeed/ops/operational_autopilot.gleam", 333).
?DOC(" True when every drill passes the same certification criteria.\n").
-spec all_drills_valid(list(drill_result()), override_policy()) -> boolean().
%% True when drill_result_valid/2 holds for every result under the same
%% policy.
all_drills_valid(Results, Policy) ->
    PassesPolicy = fun(Result) -> drill_result_valid(Result, Policy) end,
    gleam@list:all(Results, PassesPolicy).
-file("src/lightspeed/ops/operational_autopilot.gleam", 341).
?DOC(" True when a control action requested explicit operator override.\n").
-spec requires_override(control_action()) -> boolean().
%% True only when the action's decision (element 4) is a requires_override
%% tuple; every other decision yields false.
requires_override(Action) ->
    Decision = erlang:element(4, Action),
    case Decision of
        applied -> false;
        rolled_back -> false;
        {blocked_by_policy, _Reason} -> false;
        {requires_override, _OverrideId} -> true
    end.
-file("src/lightspeed/ops/operational_autopilot.gleam", 349).
?DOC(" True when a control action was blocked by policy.\n").
-spec blocked_by_policy(control_action()) -> boolean().
%% True only when the action's decision (element 4) is a blocked_by_policy
%% tuple; every other decision yields false.
blocked_by_policy(Action) ->
    Decision = erlang:element(4, Action),
    case Decision of
        applied -> false;
        rolled_back -> false;
        {requires_override, _OverrideId} -> false;
        {blocked_by_policy, _Reason} -> true
    end.
-file("src/lightspeed/ops/operational_autopilot.gleam", 357).
?DOC(" Telemetry metric for one autopilot action.\n").
-spec audit_metric(control_action()) -> lightspeed@ops@telemetry:metric().
%% Builds a counter metric of value 1 named
%% "lightspeed.operational_autopilot.action_total", tagged with the action's
%% surface, control and decision labels and its raw reason text.
audit_metric(Action) ->
    SurfaceTag = {tag, <<"surface"/utf8>>, surface_label(erlang:element(2, Action))},
    ControlTag = {tag, <<"control"/utf8>>, control_label(erlang:element(3, Action))},
    DecisionTag = {tag, <<"decision"/utf8>>, decision_label(erlang:element(4, Action))},
    ReasonTag = {tag, <<"reason"/utf8>>, erlang:element(5, Action)},
    {counter,
        <<"lightspeed.operational_autopilot.action_total"/utf8>>,
        1,
        [SurfaceTag, ControlTag, DecisionTag, ReasonTag]}.