-module(lightspeed@ops@replay_diagnostics).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/ops/replay_diagnostics.gleam").
-export([runtime_fault_trace/1, session_fault_trace/1, pipeline_etl_trace/1, events/1, trace_id/1, flow/1, flow_label/1, snapshot_at/2, latest_snapshot/1, snapshot_label/1, timeline_signature/1, trace_signature/1, fault_investigation_ready/1, etl_investigation_ready/1, investigation_signature/2, first_fault_snapshot/1]).
-export_type([flow/0, investigation_workflow/0, trace_event/0, replay_trace/0, snapshot/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Deterministic replay and time-travel diagnostics contracts for M42.\n").
%% Subsystem a trace was recorded from; rendered as text by flow_label/1.
-type flow() :: runtime_flow | session_flow | pipeline_flow.
%% Workflow selector accepted by investigation_signature/2.
-type investigation_workflow() :: fault_workflow | etl_workflow.
%% One timeline entry, in field order:
%%   {trace_event, Index, At_ms, Label, State_signature}
%% (see event_signature/1, which renders elements 2..5 under those names).
-type trace_event() :: {trace_event, integer(), integer(), binary(), binary()}.
%% A whole recorded trace, in field order:
%%   {replay_trace, Trace_id, Flow, Seed, Events}
%% (see trace_signature/1 and the accessors trace_id/1, flow/1, events/1).
-type replay_trace() :: {replay_trace,
binary(),
flow(),
binary(),
list(trace_event())}.
%% A single event lifted out of its trace, in field order:
%%   {snapshot, Trace_id, Flow, Index, At_ms, Label, State_signature}
%% (built by snapshot_at/2 and latest_snapshot/1).
-type snapshot() :: {snapshot,
binary(),
flow(),
integer(),
integer(),
binary(),
binary()}.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 520).
%% Renders an optional binary for use inside a state signature: the wrapped
%% label when present, the literal text "none" otherwise.
-spec option_label(gleam@option:option(binary())) -> binary().
option_label({some, Label}) ->
    Label;
option_label(none) ->
    <<"none"/utf8>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 367).
%% Summarises a simulation state as one `key=value|key=value...` binary.
%%
%% The mounted route and shutdown reason are rendered through option_label/1;
%% every other field is the length of the corresponding list read from
%% `lightspeed@testing`. Field order is part of the signature and is fixed.
-spec runtime_state_signature(lightspeed@testing:simulation_state()) -> binary().
runtime_state_signature(State) ->
    Count = fun(Items) -> erlang:integer_to_binary(erlang:length(Items)) end,
    Errors = Count(lightspeed@testing:errors(State)),
    Mounted = option_label(lightspeed@testing:mounted_route(State)),
    Views = Count(lightspeed@testing:rendered_views(State)),
    Patches = Count(lightspeed@testing:patches(State)),
    Pushed = Count(lightspeed@testing:pushed_events(State)),
    Subscriptions = Count(lightspeed@testing:subscriptions(State)),
    Shutdown = option_label(lightspeed@testing:shutdown_reason(State)),
    <<"mounted="/utf8, Mounted/binary,
      "|views="/utf8, Views/binary,
      "|patches="/utf8, Patches/binary,
      "|events="/utf8, Pushed/binary,
      "|subscriptions="/utf8, Subscriptions/binary,
      "|shutdown="/utf8, Shutdown/binary,
      "|errors="/utf8, Errors/binary>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 340).
%% Builds the runtime timeline, one trace event per instruction.
%%
%% For each instruction the whole executed prefix (including that instruction)
%% is re-run through lightspeed@testing:run/1, and the resulting state is
%% captured as the event's state signature. Events are accumulated in reverse
%% and flipped once the program is exhausted.
%%
%% Executed_rev: instructions already replayed, most recent first.
%% Index:        zero-based position of the next instruction; the event time is
%%               synthesised as 100 + Index * 5.
-spec runtime_trace_events(
    list(lightspeed@agent@isa:instruction()),
    list(lightspeed@agent@isa:instruction()),
    integer(),
    list(trace_event())
) -> list(trace_event()).
runtime_trace_events([], _Executed_rev, _Index, Events_rev) ->
    lists:reverse(Events_rev);
runtime_trace_events([Instruction | Rest], Executed_rev, Index, Events_rev) ->
    Prefix_rev = [Instruction | Executed_rev],
    State = lightspeed@testing:run(lists:reverse(Prefix_rev)),
    At_ms = 100 + (Index * 5),
    Label = lightspeed@agent@isa:describe(Instruction),
    Event = {trace_event, Index, At_ms, Label, runtime_state_signature(State)},
    runtime_trace_events(Rest, Prefix_rev, Index + 1, [Event | Events_rev]).
-file("src/lightspeed/ops/replay_diagnostics.gleam", 54).
?DOC(" Deterministic runtime fault trace.\n").
%% Replays a fixed five-instruction program and records one event per step.
%%
%% The program issues a render before the mount, then mounts "/orders" with a
%% seed-derived CSRF token, renders again, patches "#counter" and shuts down.
%% Seed only affects the token and the seed field of the returned trace.
-spec runtime_fault_trace(binary()) -> replay_trace().
runtime_fault_trace(Seed) ->
    Render_root = {render, <<"counter_root"/utf8>>},
    Mount_orders = {mount, <<"/orders"/utf8>>, <<"csrf_"/utf8, Seed/binary>>},
    Patch_counter = {patch, <<"#counter"/utf8>>, <<"<div>1</div>"/utf8>>},
    Shutdown = {shutdown, <<"runtime_fault_recovered"/utf8>>},
    Program = [Render_root, Mount_orders, Render_root, Patch_counter, Shutdown],
    Timeline = runtime_trace_events(Program, [], 0, []),
    {replay_trace, <<"runtime_fault_trace"/utf8>>, runtime_flow, Seed, Timeline}.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 553).
%% Renders a boolean as the text "true" or "false" for use in signatures.
-spec bool_label(boolean()) -> binary().
bool_label(true) ->
    <<"true"/utf8>>;
bool_label(false) ->
    <<"false"/utf8>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 411).
%% Summarises a session as one `key=value|key=value...` binary.
%%
%% Values are read from `lightspeed@agent@session` accessors (and the
%% telemetry summary derived from the same session). `latest_telemetry` is the
%% label of the LAST entry of the telemetry list, or "none" when it is empty.
%% Field order is part of the signature and is fixed.
-spec session_state_signature(lightspeed@agent@session:session()) -> binary().
session_state_signature(State) ->
    Summary = lightspeed@agent@session:telemetry_summary(State),
    Latest_telemetry =
        case lightspeed@agent@session:telemetry(State) of
            [] ->
                <<"none"/utf8>>;
            [_ | _] = Telemetry ->
                lightspeed@agent@session:telemetry_label(lists:last(Telemetry))
        end,
    Lifecycle = lightspeed@agent@typestate:lifecycle_to_string(
        lightspeed@agent@session:lifecycle(State)
    ),
    Counter = erlang:integer_to_binary(
        lightspeed@agent@session:counter(State)
    ),
    Pending = erlang:integer_to_binary(
        lightspeed@agent@session:pending_patch_count(State)
    ),
    Telemetry_count = erlang:integer_to_binary(
        lightspeed@agent@session:telemetry_count(State)
    ),
    Summary_total = erlang:integer_to_binary(
        lightspeed@agent@session:summary_total_events(Summary)
    ),
    Summary_crashed = erlang:integer_to_binary(
        lightspeed@agent@session:summary_session_crashed(Summary)
    ),
    Summary_rehydrated = erlang:integer_to_binary(
        lightspeed@agent@session:summary_session_rehydrated(Summary)
    ),
    Heartbeat_deadline = erlang:integer_to_binary(
        lightspeed@agent@session:heartbeat_deadline_ms(State)
    ),
    Crashed = bool_label(lightspeed@agent@session:crashed(State)),
    <<"lifecycle="/utf8, Lifecycle/binary,
      "|counter="/utf8, Counter/binary,
      "|pending="/utf8, Pending/binary,
      "|telemetry_count="/utf8, Telemetry_count/binary,
      "|summary_total="/utf8, Summary_total/binary,
      "|summary_crashed="/utf8, Summary_crashed/binary,
      "|summary_rehydrated="/utf8, Summary_rehydrated/binary,
      "|heartbeat_deadline="/utf8, Heartbeat_deadline/binary,
      "|crashed="/utf8, Crashed/binary,
      "|latest_telemetry="/utf8, Latest_telemetry/binary>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 451).
%% Maps a session inbox event to the label stored in its trace event.
%%
%% Route, ack reference, crash reason and shutdown reason are appended after a
%% colon; the remaining events produce a fixed label.
-spec session_event_label(lightspeed@agent@session:inbox_event()) -> binary().
session_event_label({connect, Route, _Token, _Now_ms}) ->
    <<"session_connect:"/utf8, Route/binary>>;
session_event_label({reconnect, Route, _Now_ms}) ->
    <<"session_reconnect:"/utf8, Route/binary>>;
session_event_label(increment) ->
    <<"session_increment"/utf8>>;
session_event_label(decrement) ->
    <<"session_decrement"/utf8>>;
session_event_label({ack, Ref}) ->
    <<"session_ack:"/utf8, Ref/binary>>;
session_event_label({heartbeat, _Now_ms}) ->
    <<"session_heartbeat"/utf8>>;
session_event_label({tick, _Now_ms}) ->
    <<"session_tick"/utf8>>;
session_event_label({crash, Reason}) ->
    <<"session_crash:"/utf8, Reason/binary>>;
session_event_label({restart, _Now_ms}) ->
    <<"session_restart"/utf8>>;
session_event_label({shutdown, Reason}) ->
    <<"session_shutdown:"/utf8, Reason/binary>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 440).
%% Returns the timestamp carried by an inbox event, or Fallback when the event
%% has none. Only connect, reconnect, heartbeat, tick and restart carry a time;
%% anything else yields Fallback.
-spec session_event_time(lightspeed@agent@session:inbox_event(), integer()) -> integer().
session_event_time({connect, _Route, _Token, Now_ms}, _Fallback) ->
    Now_ms;
session_event_time({reconnect, _Route, Now_ms}, _Fallback) ->
    Now_ms;
session_event_time({heartbeat, Now_ms}, _Fallback) ->
    Now_ms;
session_event_time({tick, Now_ms}, _Fallback) ->
    Now_ms;
session_event_time({restart, Now_ms}, _Fallback) ->
    Now_ms;
session_event_time(_Untimed, Fallback) ->
    Fallback.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 386).
%% Feeds inbox events to a session one at a time and records a trace event
%% after each step.
%%
%% Every input is wrapped as {inbox_message, Owner, Input} and passed to
%% lightspeed@agent@session:handle/2; the returned session becomes the state
%% for the next input and is the state captured in the event. The event time
%% is the input's own timestamp when it has one, else 100 + Index * 5.
-spec session_trace_events(
    lightspeed@agent@session:session(),
    binary(),
    list(lightspeed@agent@session:inbox_event()),
    integer(),
    list(trace_event())
) -> list(trace_event()).
session_trace_events(_State, _Owner, [], _Index, Events_rev) ->
    lists:reverse(Events_rev);
session_trace_events(State, Owner, [Input | Rest], Index, Events_rev) ->
    Message = {inbox_message, Owner, Input},
    Next = lightspeed@agent@session:handle(State, Message),
    At_ms = session_event_time(Input, 100 + (Index * 5)),
    Label = session_event_label(Input),
    Event = {trace_event, Index, At_ms, Label, session_state_signature(Next)},
    session_trace_events(Next, Owner, Rest, Index + 1, [Event | Events_rev]).
-file("src/lightspeed/ops/replay_diagnostics.gleam", 73).
?DOC(" Deterministic session fault/recovery trace.\n").
%% Starts a session owned by "node-a" and drives it through a fixed script:
%% connect, increment, ack, crash, restart, reconnect, tick. One trace event is
%% recorded per scripted input. Seed feeds the session id, the CSRF token and
%% the seed field of the returned trace.
-spec session_fault_trace(binary()) -> replay_trace().
session_fault_trace(Seed) ->
    Owner = <<"node-a"/utf8>>,
    Session_id = <<"session_"/utf8, Seed/binary>>,
    Session = lightspeed@agent@session:start(Session_id, Owner, rehydrate, 100, 30),
    Csrf_token = <<"csrf_"/utf8, Seed/binary>>,
    Script = [
        {connect, <<"/orders"/utf8>>, Csrf_token, 100},
        increment,
        {ack, <<"patch-1"/utf8>>},
        {crash, <<"runtime_fault"/utf8>>},
        {restart, 130},
        {reconnect, <<"/orders"/utf8>>, 132},
        {tick, 170}
    ],
    Timeline = session_trace_events(Session, Owner, Script, 0, []),
    {replay_trace, <<"session_fault_trace"/utf8>>, session_flow, Seed, Timeline}.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 543).
%% Concatenates two event lists, keeping Left's elements ahead of Right's.
-spec append_events(list(trace_event()), list(trace_event())) -> list(trace_event()).
append_events(Left, Right) ->
    Left ++ Right.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 535).
%% Concatenates three event lists in argument order.
-spec concat_three(
    list(trace_event()),
    list(trace_event()),
    list(trace_event())
) -> list(trace_event()).
concat_three(First, Second, Third) ->
    First ++ Second ++ Third.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 466).
%% Builds a new pipeline runtime for the ETL trace.
%%
%% The pipeline is named "orders_pipeline_<Seed>", is declared with an
%% {interval, 60000} schedule, and has three stages in order: a source
%% ("extract_orders"), a transform ("normalize_orders") and a sink
%% ("write_orders").
-spec fresh_pipeline_runtime(binary()) -> lightspeed@pipeline:runtime().
fresh_pipeline_runtime(Seed) ->
    Name = <<"orders_pipeline_"/utf8, Seed/binary>>,
    Extract = lightspeed@pipeline:source_stage(
        <<"extract_orders"/utf8>>,
        <<"raw_order"/utf8>>
    ),
    Normalize = lightspeed@pipeline:transform_stage(
        <<"normalize_orders"/utf8>>,
        <<"raw_order"/utf8>>,
        <<"normalized_order"/utf8>>
    ),
    Write = lightspeed@pipeline:sink_stage(
        <<"write_orders"/utf8>>,
        <<"normalized_order"/utf8>>
    ),
    Definition = lightspeed@pipeline:pipeline(
        Name,
        {interval, 60000},
        [Extract, Normalize, Write]
    ),
    lightspeed@pipeline:new(Definition).
-file("src/lightspeed/ops/replay_diagnostics.gleam", 96).
?DOC(" Deterministic pipeline replay/ETL trace.\n").
%% Drives a fresh three-stage pipeline through a scripted run, a crash, a
%% resume from the latest checkpoint and a replay, and returns the eight
%% recorded events (indices 0..7) as a `pipeline_flow` replay trace.
%%
%% Seed is used for the pipeline name (via fresh_pipeline_runtime/1) and is
%% stored as the trace's seed field.
%%
%% The runtime is threaded strictly in order (Runtime -> Runtime@7, then
%% Resumed -> Resumed@3); each step consumes the value produced by the one
%% before it.
-spec pipeline_etl_trace(binary()) -> replay_trace().
pipeline_etl_trace(Seed) ->
Runtime = fresh_pipeline_runtime(Seed),
%% Phase A: schedule, trigger and start collapse into a single event (index 0)
%% that carries the signature taken after start.
Runtime@1 = lightspeed@pipeline:schedule(Runtime, 100),
Runtime@2 = lightspeed@pipeline:trigger(Runtime@1, <<"etl_tick"/utf8>>, 100),
Runtime@3 = lightspeed@pipeline:start(Runtime@2, 101),
Events_a = [{trace_event,
0,
100,
<<"pipeline_schedule_trigger_start"/utf8>>,
lightspeed@pipeline:signature(Runtime@3)}],
%% Phase B: one process call per stage. The trailing integer of each call
%% (102, 103, 104) matches the at_ms recorded for the corresponding event, so
%% it is presumably the call's timestamp; the meaning of the two integers
%% before the binary is not visible in this module -- TODO confirm against
%% lightspeed@pipeline:process/6.
{Runtime@4, Source_result} = lightspeed@pipeline:process(
Runtime@3,
<<"extract_orders"/utf8>>,
5,
40,
<<"source-a"/utf8>>,
102
),
{Runtime@5, Transform_result} = lightspeed@pipeline:process(
Runtime@4,
<<"normalize_orders"/utf8>>,
5,
25,
<<"transform-a"/utf8>>,
103
),
{Runtime@6, Sink_result} = lightspeed@pipeline:process(
Runtime@5,
<<"write_orders"/utf8>>,
5,
20,
<<"order-1"/utf8>>,
104
),
%% NOTE(review): events 1, 2 and 3 all record the signature of Runtime@6 (the
%% state after the sink step), not Runtime@4 / Runtime@5 / Runtime@6
%% respectively. A snapshot taken at index 1 or 2 therefore shows the
%% post-sink state. Confirm whether this is intended before changing it:
%% doing so alters the deterministic trace signature.
Events_b = [{trace_event,
1,
102,
<<"pipeline_source:"/utf8,
(lightspeed@pipeline:process_result_label(Source_result))/binary>>,
lightspeed@pipeline:signature(Runtime@6)},
{trace_event,
2,
103,
<<"pipeline_transform:"/utf8,
(lightspeed@pipeline:process_result_label(Transform_result))/binary>>,
lightspeed@pipeline:signature(Runtime@6)},
{trace_event,
3,
104,
<<"pipeline_sink:"/utf8,
(lightspeed@pipeline:process_result_label(Sink_result))/binary>>,
lightspeed@pipeline:signature(Runtime@6)}],
%% Phase C: crash, then resume. The crash itself is not recorded as an event.
Runtime@7 = lightspeed@pipeline:crash(
Runtime@6,
<<"etl_runtime_fault"/utf8>>
),
%% If the resume fails, the error is discarded and the script carries on with
%% the crashed runtime, so the trace is still produced.
Resumed = case lightspeed@pipeline:resume_from_latest_checkpoint(
Runtime@7,
110
) of
{ok, Next} ->
Next;
{error, _} ->
Runtime@7
end,
%% "order-1" was already passed to the sink before the crash; the second call
%% passes a new value, "order-2".
{Resumed@1, Duplicate_result} = lightspeed@pipeline:process(
Resumed,
<<"write_orders"/utf8>>,
5,
18,
<<"order-1"/utf8>>,
111
),
{Resumed@2, Replay_sink_result} = lightspeed@pipeline:process(
Resumed@1,
<<"write_orders"/utf8>>,
5,
15,
<<"order-2"/utf8>>,
112
),
Resumed@3 = lightspeed@pipeline:complete(Resumed@2, 118),
%% NOTE(review): as in phase B, events 4..7 all record the signature of the
%% final runtime (Resumed@3) rather than the state at each step.
Events_c = [{trace_event,
4,
110,
<<"pipeline_resume_replay"/utf8>>,
lightspeed@pipeline:signature(Resumed@3)},
{trace_event,
5,
111,
<<"pipeline_duplicate:"/utf8,
(lightspeed@pipeline:process_result_label(Duplicate_result))/binary>>,
lightspeed@pipeline:signature(Resumed@3)},
{trace_event,
6,
112,
<<"pipeline_replay_sink:"/utf8,
(lightspeed@pipeline:process_result_label(Replay_sink_result))/binary>>,
lightspeed@pipeline:signature(Resumed@3)},
{trace_event,
7,
118,
<<"pipeline_completed"/utf8>>,
lightspeed@pipeline:signature(Resumed@3)}],
{replay_trace,
<<"pipeline_etl_trace"/utf8>>,
pipeline_flow,
Seed,
concat_three(Events_a, Events_b, Events_c)}.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 189).
?DOC(" Trace events accessor.\n").
%% Returns the ordered event list stored in the trace's fifth field.
-spec events(replay_trace()) -> list(trace_event()).
events({replay_trace, _Trace_id, _Flow, _Seed, Events}) ->
    Events.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 194).
?DOC(" Trace id accessor.\n").
%% Returns the identifier stored in the trace's second field.
-spec trace_id(replay_trace()) -> binary().
trace_id({replay_trace, Trace_id, _Flow, _Seed, _Events}) ->
    Trace_id.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 199).
?DOC(" Flow accessor.\n").
%% Returns the flow tag stored in the trace's third field.
-spec flow(replay_trace()) -> flow().
flow({replay_trace, _Trace_id, Flow, _Seed, _Events}) ->
    Flow.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 204).
?DOC(" Flow label.\n").
%% Renders a flow tag as the short name used inside signatures and labels.
-spec flow_label(flow()) -> binary().
flow_label(runtime_flow) ->
    <<"runtime"/utf8>>;
flow_label(session_flow) ->
    <<"session"/utf8>>;
flow_label(pipeline_flow) ->
    <<"pipeline"/utf8>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 509).
%% Linear search for the first event whose index field (element 2) is exactly
%% Index. Returns {some, Event} on a hit and none when no event matches.
-spec find_event(list(trace_event()), integer()) -> gleam@option:option(trace_event()).
find_event([], _Index) ->
    none;
find_event([Event | Rest], Index) ->
    case erlang:element(2, Event) of
        Index ->
            {some, Event};
        _Other ->
            find_event(Rest, Index)
    end.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 213).
?DOC(" Extract one time-indexed snapshot.\n").
%% Looks up the event with the given index and combines it with the trace id
%% and flow into a snapshot.
%%
%% Returns {ok, Snapshot} on a hit, or
%% {error, <<"missing_trace_index:<Index>">>} when the trace has no event with
%% that index.
-spec snapshot_at(replay_trace(), integer()) -> {ok, snapshot()} |
    {error, binary()}.
snapshot_at(Trace, Index) ->
    Events = erlang:element(5, Trace),
    case find_event(Events, Index) of
        {some, Event} ->
            Snapshot = {snapshot,
                erlang:element(2, Trace),
                erlang:element(3, Trace),
                erlang:element(2, Event),
                erlang:element(3, Event),
                erlang:element(4, Event),
                erlang:element(5, Event)},
            {ok, Snapshot};
        none ->
            Index_text = erlang:integer_to_binary(Index),
            {error, <<"missing_trace_index:"/utf8, Index_text/binary>>}
    end.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 229).
?DOC(" Latest snapshot when present.\n").
%% Builds a snapshot from the LAST event in the trace's event list.
%% Returns none for a trace with no events, {some, Snapshot} otherwise.
-spec latest_snapshot(replay_trace()) -> gleam@option:option(snapshot()).
latest_snapshot(Trace) ->
    case erlang:element(5, Trace) of
        [] ->
            none;
        [_ | _] = Events ->
            Last = lists:last(Events),
            {some,
                {snapshot,
                    erlang:element(2, Trace),
                    erlang:element(3, Trace),
                    erlang:element(2, Last),
                    erlang:element(3, Last),
                    erlang:element(4, Last),
                    erlang:element(5, Last)}}
    end.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 245).
?DOC(" Snapshot label.\n").
%% Renders a snapshot as
%% `trace=<id>|flow=<flow>|index=<n>|at_ms=<n>|label=<label>`.
%% The snapshot's state signature (its last field) is not part of the label.
-spec snapshot_label(snapshot()) -> binary().
snapshot_label(Snapshot) ->
    Trace_id = erlang:element(2, Snapshot),
    Flow = flow_label(erlang:element(3, Snapshot)),
    Index = erlang:integer_to_binary(erlang:element(4, Snapshot)),
    At_ms = erlang:integer_to_binary(erlang:element(5, Snapshot)),
    Label = erlang:element(6, Snapshot),
    <<"trace="/utf8, Trace_id/binary,
      "|flow="/utf8, Flow/binary,
      "|index="/utf8, Index/binary,
      "|at_ms="/utf8, At_ms/binary,
      "|label="/utf8, Label/binary>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 483).
%% Renders one trace event as
%% `index=<n>|at_ms=<n>|label=<label>|state=<state signature>`.
-spec event_signature(trace_event()) -> binary().
event_signature(Event) ->
    Index = erlang:integer_to_binary(erlang:element(2, Event)),
    At_ms = erlang:integer_to_binary(erlang:element(3, Event)),
    Label = erlang:element(4, Event),
    State = erlang:element(5, Event),
    <<"index="/utf8, Index/binary,
      "|at_ms="/utf8, At_ms/binary,
      "|label="/utf8, Label/binary,
      "|state="/utf8, State/binary>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 527).
%% Joins binaries with Separator between consecutive elements.
%%
%% An empty list yields <<>>; a single element is returned without any
%% separator. The parts are interleaved with lists:join/2 and flattened in a
%% single pass, so the work is linear in the total output size and needs no
%% recursion in this module. The previous hand-written version rebuilt the
%% already-joined suffix at every level of a body recursion.
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).
-file("src/lightspeed/ops/replay_diagnostics.gleam", 259).
?DOC(" Stable timeline signature.\n").
%% Renders every event of the trace with event_signature/1, in order, and
%% joins the results with ";". An empty trace yields an empty binary.
-spec timeline_signature(replay_trace()) -> binary().
timeline_signature(Trace) ->
    Signatures = [event_signature(Event) || Event <- erlang:element(5, Trace)],
    join_with(<<";"/utf8>>, Signatures).
-file("src/lightspeed/ops/replay_diagnostics.gleam", 264).
?DOC(" Stable full trace signature.\n").
%% Renders the whole trace as
%% `trace_id=<id>|flow=<flow>|seed=<seed>|timeline=<timeline signature>`.
-spec trace_signature(replay_trace()) -> binary().
trace_signature(Trace) ->
    Trace_id = erlang:element(2, Trace),
    Flow = flow_label(erlang:element(3, Trace)),
    Seed = erlang:element(4, Trace),
    Timeline = timeline_signature(Trace),
    <<"trace_id="/utf8, Trace_id/binary,
      "|flow="/utf8, Flow/binary,
      "|seed="/utf8, Seed/binary,
      "|timeline="/utf8, Timeline/binary>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 276).
?DOC(" Fault-investigation workflow readiness.\n").
%% A trace is ready for fault investigation when it has at least five events
%% AND its timeline signature contains every marker required for its flow:
%%   runtime_flow  -> "errors=1"
%%   session_flow  -> "session_crashed" and "session_restarted"
%%   pipeline_flow -> "pipeline_resume_replay" and "duplicate_suppressed"
%% Markers are plain substring matches against timeline_signature/1.
-spec fault_investigation_ready(replay_trace()) -> boolean().
fault_investigation_ready(Trace) ->
    Signature = timeline_signature(Trace),
    Has_time_travel_window = erlang:length(erlang:element(5, Trace)) >= 5,
    Required_markers =
        case erlang:element(3, Trace) of
            runtime_flow ->
                [<<"errors=1"/utf8>>];
            session_flow ->
                [<<"session_crashed"/utf8>>, <<"session_restarted"/utf8>>];
            pipeline_flow ->
                [<<"pipeline_resume_replay"/utf8>>, <<"duplicate_suppressed"/utf8>>]
        end,
    Has_marker = fun(Marker) ->
        gleam_stdlib:contains_string(Signature, Marker)
    end,
    Has_time_travel_window andalso lists:all(Has_marker, Required_markers).
-file("src/lightspeed/ops/replay_diagnostics.gleam", 295).
?DOC(" ETL-investigation workflow readiness.\n").
%% Only pipeline traces can be ETL-ready; any other flow yields false without
%% computing a signature. A pipeline trace is ready when its timeline
%% signature contains all five markers listed below (substring matches).
-spec etl_investigation_ready(replay_trace()) -> boolean().
etl_investigation_ready(Trace) ->
    case erlang:element(3, Trace) of
        pipeline_flow ->
            Signature = timeline_signature(Trace),
            Required_markers = [
                <<"pipeline_resume_replay"/utf8>>,
                <<"run-2-replay"/utf8>>,
                <<"checkpoints="/utf8>>,
                <<"duplicate_suppressed"/utf8>>,
                <<"pipeline_completed"/utf8>>
            ],
            lists:all(
                fun(Marker) -> gleam_stdlib:contains_string(Signature, Marker) end,
                Required_markers
            );
        _ ->
            false
    end.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 310).
?DOC(" Investigation signature bound to one trace.\n").
%% Renders `<workflow>:ready=<true|false>|<trace signature>`, where the
%% readiness flag comes from fault_investigation_ready/1 or
%% etl_investigation_ready/1 depending on Workflow.
-spec investigation_signature(investigation_workflow(), replay_trace()) -> binary().
investigation_signature(Workflow, Trace) ->
    {Prefix, Ready} =
        case Workflow of
            fault_workflow ->
                {<<"fault_workflow:ready="/utf8>>, fault_investigation_ready(Trace)};
            etl_workflow ->
                {<<"etl_workflow:ready="/utf8>>, etl_investigation_ready(Trace)}
        end,
    Ready_text = bool_label(Ready),
    Trace_text = trace_signature(Trace),
    <<Prefix/binary, Ready_text/binary, "|"/utf8, Trace_text/binary>>.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 494).
%% Returns the index of the first event that looks like a fault, or none.
%%
%% An event counts as a fault when its label contains "crash", or its state
%% signature contains "errors=1", or its label contains "duplicate". All three
%% are substring matches, checked in that order.
-spec first_fault_index(list(trace_event())) -> gleam@option:option(integer()).
first_fault_index([]) ->
    none;
first_fault_index([Event | Rest]) ->
    Label = erlang:element(4, Event),
    Is_fault =
        gleam_stdlib:contains_string(Label, <<"crash"/utf8>>)
        orelse gleam_stdlib:contains_string(
            erlang:element(5, Event),
            <<"errors=1"/utf8>>
        )
        orelse gleam_stdlib:contains_string(Label, <<"duplicate"/utf8>>),
    case Is_fault of
        true ->
            {some, erlang:element(2, Event)};
        false ->
            first_fault_index(Rest)
    end.
-file("src/lightspeed/ops/replay_diagnostics.gleam", 329).
?DOC(" First fault snapshot for triage.\n").
%% Finds the first fault-looking event (see first_fault_index/1) and returns
%% its snapshot. Yields none when the trace has no such event or when the
%% snapshot lookup reports an error.
-spec first_fault_snapshot(replay_trace()) -> gleam@option:option(snapshot()).
first_fault_snapshot(Trace) ->
    Events = erlang:element(5, Trace),
    case first_fault_index(Events) of
        {some, Index} ->
            snapshot_result_to_option(snapshot_at(Trace, Index));
        none ->
            none
    end.

%% Converts a snapshot_at/2 result into an option, discarding the error reason.
-spec snapshot_result_to_option({ok, snapshot()} | {error, binary()}) ->
    gleam@option:option(snapshot()).
snapshot_result_to_option({ok, Snapshot}) ->
    {some, Snapshot};
snapshot_result_to_option({error, _Reason}) ->
    none.