-module(lightspeed@cluster@durable_session).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/cluster/durable_session.gleam").
-export([supported_backends/0, expected_token/3, start/8, append_counter_delta/3, crash/2, restart/2, request_takeover/5, reconnect/4, journal/1, continuity_report/3, backend_label/1, resume_policy_label/1, lifecycle_label/1, journal_entry_label/1, ownership_error_label/1, fence_signature/1, continuity_signature/1, snapshot_signature/1, signature/1, valid/1, backend/1, fence/1, snapshot/1]).
-export_type([persistence_backend/0, resume_policy/0, lifecycle_state/0, journal_entry/0, snapshot/0, ownership_fence/0, durable_session/0, ownership_error/0, continuity_report/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Durable cluster-session contracts and ownership fencing for M27.\n").
-type persistence_backend() :: {snapshot_only, binary(), integer()} |
{journal_only, binary()} |
{snapshot_journal, binary(), binary(), integer()}.
-type resume_policy() :: resume | rehydrate | remount.
-type lifecycle_state() :: live | disconnected | crashed.
-type journal_entry() :: {counter_delta, integer()} |
{crash_marked, binary()} |
{restarted, integer()} |
{reconnected, resume_policy(), binary(), integer()}.
-type snapshot() :: {snapshot,
binary(),
binary(),
integer(),
lifecycle_state(),
integer(),
integer()}.
-type ownership_fence() :: {ownership_fence, binary(), integer(), binary()}.
-type durable_session() :: {durable_session,
persistence_backend(),
resume_policy(),
ownership_fence(),
snapshot(),
list(journal_entry()),
integer(),
integer()}.
-type ownership_error() :: {stale_owner, integer(), integer()} |
{split_brain, binary(), binary(), integer()} |
{invalid_token, binary(), integer()}.
-type continuity_report() :: {continuity_report,
integer(),
integer(),
boolean()}.
-file("src/lightspeed/cluster/durable_session.gleam", 86).
?DOC(" Supported backend matrix for M27 certification.\n").
%% Returns the fixed list of persistence backends, in this order:
%% a snapshot-only store, a journal-only store, and a combined
%% snapshot + journal store. The trailing integer on the snapshot-bearing
%% entries is the field that backend_label/1 names Checkpoint_every.
-spec supported_backends() -> list(persistence_backend()).
supported_backends() ->
    SnapshotOnly = {snapshot_only, <<"snapshot_store_a"/utf8>>, 2},
    JournalOnly = {journal_only, <<"journal_store_a"/utf8>>},
    SnapshotJournal =
        {snapshot_journal, <<"snapshot_store_b"/utf8>>, <<"journal_store_b"/utf8>>, 2},
    [SnapshotOnly, JournalOnly, SnapshotJournal].
-file("src/lightspeed/cluster/durable_session.gleam", 274).
?DOC(" Expected deterministic fence token.\n").
%% Builds the token `fence:<Session_id>:<Owner>:<Epoch>` as a binary.
%% Session_id and Owner are spliced in as-is; Epoch is rendered in decimal.
-spec expected_token(binary(), binary(), integer()) -> binary().
expected_token(Session_id, Owner, Epoch) ->
    EpochText = erlang:integer_to_binary(Epoch),
    <<"fence:"/utf8, Session_id/binary, ":"/utf8, Owner/binary, ":"/utf8, EpochText/binary>>.
-file("src/lightspeed/cluster/durable_session.gleam", 99).
?DOC(" Start a durable session contract.\n").
%% Builds a fresh durable_session tuple:
%%   * the ownership fence names Owner at epoch 1 with the token produced by
%%     expected_token/3 for that session, owner and epoch;
%%   * the snapshot starts with counter 0, owner epoch 1 and Now_ms as its
%%     last field;
%%   * the journal starts empty.
%% NOTE(review): the initial lifecycle is `disconnected`, not `live` --
%% confirm that a freshly started session is meant to begin disconnected.
-spec start(
    binary(),
    binary(),
    binary(),
    persistence_backend(),
    resume_policy(),
    integer(),
    integer(),
    integer()
) -> durable_session().
start(SessionId, Owner, Route, Backend, ResumePolicy, NowMs, ContinuitySloMs, MaxDataLossEvents) ->
    InitialEpoch = 1,
    Token = expected_token(SessionId, Owner, InitialEpoch),
    Snapshot = {snapshot, SessionId, Route, 0, disconnected, InitialEpoch, NowMs},
    {durable_session,
        Backend,
        ResumePolicy,
        {ownership_fence, Owner, InitialEpoch, Token},
        Snapshot,
        [],
        ContinuitySloMs,
        MaxDataLossEvents}.
-file("src/lightspeed/cluster/durable_session.gleam", 135).
?DOC(" Append one counter-delta entry and update the snapshot.\n").
%% Adds Delta to the snapshot counter, stamps the snapshot's last field with
%% Now_ms, and pushes {counter_delta, Delta} onto the head of the journal
%% list (the list is stored newest-first; journal/1 reverses it).
%% All other session and snapshot fields are carried over unchanged.
-spec append_counter_delta(durable_session(), integer(), integer()) -> durable_session().
append_counter_delta(
    {durable_session, Backend, Policy, Fence, Snapshot, Journal, SloMs, MaxLoss},
    Delta,
    NowMs
) ->
    {snapshot, SessionId, Route, Counter, Lifecycle, OwnerEpoch, _OldCheckpointMs} = Snapshot,
    Updated = {snapshot, SessionId, Route, Counter + Delta, Lifecycle, OwnerEpoch, NowMs},
    {durable_session, Backend, Policy, Fence, Updated, [{counter_delta, Delta} | Journal], SloMs, MaxLoss}.
-file("src/lightspeed/cluster/durable_session.gleam", 152).
?DOC(" Mark a crash in snapshot + journal.\n").
%% Sets the snapshot lifecycle to `crashed` and pushes
%% {crash_marked, Reason} onto the head of the journal list.
%% The snapshot's other fields, including its last (timestamp) field,
%% are left as they were.
-spec crash(durable_session(), binary()) -> durable_session().
crash({durable_session, Backend, Policy, Fence, Snapshot, Journal, SloMs, MaxLoss}, Reason) ->
    {snapshot, SessionId, Route, Counter, _OldLifecycle, OwnerEpoch, CheckpointMs} = Snapshot,
    Crashed = {snapshot, SessionId, Route, Counter, crashed, OwnerEpoch, CheckpointMs},
    {durable_session, Backend, Policy, Fence, Crashed, [{crash_marked, Reason} | Journal], SloMs, MaxLoss}.
-file("src/lightspeed/cluster/durable_session.gleam", 161).
?DOC(" Mark a restart in snapshot + journal.\n").
%% Sets the snapshot lifecycle to `disconnected`, stamps the snapshot's last
%% field with Now_ms, and pushes {restarted, Now_ms} onto the head of the
%% journal list. Counter, route and owner epoch are preserved.
-spec restart(durable_session(), integer()) -> durable_session().
restart({durable_session, Backend, Policy, Fence, Snapshot, Journal, SloMs, MaxLoss}, NowMs) ->
    {snapshot, SessionId, Route, Counter, _OldLifecycle, OwnerEpoch, _OldCheckpointMs} = Snapshot,
    Restarted = {snapshot, SessionId, Route, Counter, disconnected, OwnerEpoch, NowMs},
    {durable_session, Backend, Policy, Fence, Restarted, [{restarted, NowMs} | Journal], SloMs, MaxLoss}.
-file("src/lightspeed/cluster/durable_session.gleam", 174).
?DOC(" Attempt ownership takeover under fencing rules.\n").
%% Checks a takeover request against the current fence, in this order:
%%   1. Epoch lower than the current epoch
%%        -> {error, {stale_owner, CurrentEpoch, Epoch}}
%%   2. Epoch equal to the current epoch but a different owner
%%        -> {error, {split_brain, CurrentOwner, Owner, Epoch}}
%%   3. Token different from expected_token(SessionId, Owner, Epoch)
%%        -> {error, {invalid_token, Owner, Epoch}}
%%   4. otherwise -> {ok, Session'} where the fence becomes
%%      {ownership_fence, Owner, Epoch, Token} and the snapshot gets
%%      lifecycle `disconnected`, owner epoch Epoch and Now_ms as its last
%%      field. The journal is not modified.
%% The checks are a flat guard chain rather than nested boolean `case`s;
%% the first matching branch wins, which preserves the precedence above.
-spec request_takeover(
    durable_session(),
    binary(),
    integer(),
    binary(),
    integer()
) -> {ok, durable_session()} | {error, ownership_error()}.
request_takeover(
    {durable_session, Backend, Policy, Fence, Snapshot, Journal, SloMs, MaxLoss},
    Owner,
    Epoch,
    Token,
    NowMs
) ->
    {ownership_fence, CurrentOwner, CurrentEpoch, _CurrentToken} = Fence,
    {snapshot, SessionId, Route, Counter, _Lifecycle, _OwnerEpoch, _CheckpointMs} = Snapshot,
    %% Computed up front, before any check, exactly as the original did.
    Expected = expected_token(SessionId, Owner, Epoch),
    if
        Epoch < CurrentEpoch ->
            {error, {stale_owner, CurrentEpoch, Epoch}};
        Epoch =:= CurrentEpoch, Owner /= CurrentOwner ->
            {error, {split_brain, CurrentOwner, Owner, Epoch}};
        Token =/= Expected ->
            {error, {invalid_token, Owner, Epoch}};
        true ->
            TakenOver = {snapshot, SessionId, Route, Counter, disconnected, Epoch, NowMs},
            {ok,
                {durable_session,
                    Backend,
                    Policy,
                    {ownership_fence, Owner, Epoch, Token},
                    TakenOver,
                    Journal,
                    SloMs,
                    MaxLoss}}
    end.
-file("src/lightspeed/cluster/durable_session.gleam", 222).
?DOC(" Reconnect using one recovery policy.\n").
%% Marks the session `live` on Route at Now_ms and pushes
%% {reconnected, Policy, Route, Now_ms} onto the head of the journal list.
%% The Policy argument (not the policy stored in the session) decides the
%% counter: `resume` and `rehydrate` keep the snapshot counter, `remount`
%% resets it to 0. The stored resume policy and the fence are unchanged.
-spec reconnect(durable_session(), resume_policy(), binary(), integer()) -> durable_session().
reconnect(
    {durable_session, Backend, StoredPolicy, Fence, Snapshot, Journal, SloMs, MaxLoss},
    Policy,
    Route,
    NowMs
) ->
    {snapshot, SessionId, _OldRoute, OldCounter, _OldLifecycle, OwnerEpoch, _OldCheckpointMs} =
        Snapshot,
    Counter =
        case Policy of
            resume -> OldCounter;
            rehydrate -> OldCounter;
            remount -> 0
        end,
    Reconnected = {snapshot, SessionId, Route, Counter, live, OwnerEpoch, NowMs},
    Entry = {reconnected, Policy, Route, NowMs},
    {durable_session, Backend, StoredPolicy, Fence, Reconnected, [Entry | Journal], SloMs, MaxLoss}.
-file("src/lightspeed/cluster/durable_session.gleam", 397).
?DOC(" Journal entries in append order.\n").
%% The session keeps its journal newest-first (entries are consed onto the
%% head); this returns the list reversed, i.e. oldest entry first.
-spec journal(durable_session()) -> list(journal_entry()).
journal({durable_session, _Backend, _Policy, _Fence, _Snapshot, NewestFirst, _SloMs, _MaxLoss}) ->
    lists:reverse(NewestFirst).
-file("src/lightspeed/cluster/durable_session.gleam", 439).
%% Estimates lost events for a backend given its journal entries.
%% For `snapshot_only` the result is 1 when the number of entries is strictly
%% greater than the backend's checkpoint interval, otherwise 0. Both
%% journal-backed variants always yield 0.
-spec estimate_data_loss_events(persistence_backend(), list(journal_entry())) -> integer().
estimate_data_loss_events({snapshot_only, _Store, CheckpointEvery}, Entries) ->
    case erlang:length(Entries) > CheckpointEvery of
        true -> 1;
        false -> 0
    end;
estimate_data_loss_events({journal_only, _Store}, _Entries) ->
    0;
estimate_data_loss_events({snapshot_journal, _SnapshotStore, _JournalStore, _Every}, _Entries) ->
    0.
-file("src/lightspeed/cluster/durable_session.gleam", 251).
?DOC(" Deterministic continuity SLO report for one failover timeline.\n").
%% Produces {continuity_report, LatencyMs, LossEvents, Met} where
%%   * LatencyMs is Reconnect_at_ms - Failover_at_ms, clamped to 0 when the
%%     reconnect timestamp precedes the failover timestamp;
%%   * LossEvents comes from estimate_data_loss_events/2 applied to the
%%     session's backend and its journal in append order;
%%   * Met is true only when LatencyMs does not exceed the session's 7th
%%     field and LossEvents does not exceed its 8th field.
-spec continuity_report(durable_session(), integer(), integer()) -> continuity_report().
continuity_report(Session, FailoverAtMs, ReconnectAtMs) ->
    {durable_session, Backend, _Policy, _Fence, _Snapshot, _Journal, SloMs, MaxLoss} = Session,
    LatencyMs =
        case ReconnectAtMs - FailoverAtMs of
            Gap when Gap < 0 -> 0;
            Gap -> Gap
        end,
    LossEvents = estimate_data_loss_events(Backend, journal(Session)),
    WithinLatency = LatencyMs =< SloMs,
    WithinLoss = LossEvents =< MaxLoss,
    {continuity_report, LatencyMs, LossEvents, WithinLatency andalso WithinLoss}.
-file("src/lightspeed/cluster/durable_session.gleam", 279).
?DOC(" Backend label.\n").
%% Renders a backend as a colon-separated binary: the variant name followed
%% by its store name(s) and, for the snapshot-bearing variants, the
%% checkpoint interval in decimal.
-spec backend_label(persistence_backend()) -> binary().
backend_label({snapshot_only, Store, CheckpointEvery}) ->
    Every = erlang:integer_to_binary(CheckpointEvery),
    <<"snapshot_only:"/utf8, Store/binary, ":"/utf8, Every/binary>>;
backend_label({journal_only, Store}) ->
    <<"journal_only:"/utf8, Store/binary>>;
backend_label({snapshot_journal, SnapshotStore, JournalStore, CheckpointEvery}) ->
    Every = erlang:integer_to_binary(CheckpointEvery),
    <<"snapshot_journal:"/utf8, SnapshotStore/binary, ":"/utf8, JournalStore/binary, ":"/utf8,
        Every/binary>>.
-file("src/lightspeed/cluster/durable_session.gleam", 295).
?DOC(" Resume policy label.\n").
%% Maps each resume policy atom to the binary spelling of its name.
-spec resume_policy_label(resume_policy()) -> binary().
resume_policy_label(resume) -> <<"resume"/utf8>>;
resume_policy_label(rehydrate) -> <<"rehydrate"/utf8>>;
resume_policy_label(remount) -> <<"remount"/utf8>>.
-file("src/lightspeed/cluster/durable_session.gleam", 304).
?DOC(" Lifecycle label.\n").
%% Maps each lifecycle state atom to the binary spelling of its name.
-spec lifecycle_label(lifecycle_state()) -> binary().
lifecycle_label(live) -> <<"live"/utf8>>;
lifecycle_label(disconnected) -> <<"disconnected"/utf8>>;
lifecycle_label(crashed) -> <<"crashed"/utf8>>.
-file("src/lightspeed/cluster/durable_session.gleam", 313).
?DOC(" Journal entry label.\n").
%% Renders one journal entry as `<kind>:<fields...>`, with integers in
%% decimal and the reconnect policy rendered by resume_policy_label/1.
-spec journal_entry_label(journal_entry()) -> binary().
journal_entry_label({counter_delta, Delta}) ->
    <<"counter_delta:"/utf8, (erlang:integer_to_binary(Delta))/binary>>;
journal_entry_label({crash_marked, Reason}) ->
    <<"crash_marked:"/utf8, Reason/binary>>;
journal_entry_label({restarted, AtMs}) ->
    <<"restarted:"/utf8, (erlang:integer_to_binary(AtMs))/binary>>;
journal_entry_label({reconnected, Policy, Route, AtMs}) ->
    PolicyText = resume_policy_label(Policy),
    AtText = erlang:integer_to_binary(AtMs),
    <<"reconnected:"/utf8, PolicyText/binary, ":"/utf8, Route/binary, ":"/utf8, AtText/binary>>.
-file("src/lightspeed/cluster/durable_session.gleam", 329).
?DOC(" Ownership error label.\n").
%% Renders an ownership error as `<kind>:<fields...>` in the same field
%% order as the error tuple, with epochs in decimal.
-spec ownership_error_label(ownership_error()) -> binary().
ownership_error_label({stale_owner, CurrentEpoch, RequestedEpoch}) ->
    Current = erlang:integer_to_binary(CurrentEpoch),
    Requested = erlang:integer_to_binary(RequestedEpoch),
    <<"stale_owner:"/utf8, Current/binary, ":"/utf8, Requested/binary>>;
ownership_error_label({split_brain, CurrentOwner, RequestedOwner, Epoch}) ->
    EpochText = erlang:integer_to_binary(Epoch),
    <<"split_brain:"/utf8, CurrentOwner/binary, ":"/utf8, RequestedOwner/binary, ":"/utf8,
        EpochText/binary>>;
ownership_error_label({invalid_token, Owner, Epoch}) ->
    EpochText = erlang:integer_to_binary(Epoch),
    <<"invalid_token:"/utf8, Owner/binary, ":"/utf8, EpochText/binary>>.
-file("src/lightspeed/cluster/durable_session.gleam", 349).
?DOC(" Ownership-fence signature.\n").
%% Renders a fence as `<owner>:<epoch>:<token>` with the epoch in decimal.
-spec fence_signature(ownership_fence()) -> binary().
fence_signature({ownership_fence, Owner, Epoch, Token}) ->
    EpochText = erlang:integer_to_binary(Epoch),
    <<Owner/binary, ":"/utf8, EpochText/binary, ":"/utf8, Token/binary>>.
-file("src/lightspeed/cluster/durable_session.gleam", 454).
%% Maps a boolean atom to the binary spelling of its name.
-spec bool_label(boolean()) -> binary().
bool_label(true) -> <<"true"/utf8>>;
bool_label(false) -> <<"false"/utf8>>.
-file("src/lightspeed/cluster/durable_session.gleam", 354).
?DOC(" Continuity-report signature.\n").
%% Renders a report as
%% `latency_ms=<n>|data_loss_events=<n>|met=<true|false>`.
-spec continuity_signature(continuity_report()) -> binary().
continuity_signature({continuity_report, LatencyMs, DataLossEvents, Met}) ->
    LatencyText = erlang:integer_to_binary(LatencyMs),
    LossText = erlang:integer_to_binary(DataLossEvents),
    MetText = bool_label(Met),
    <<"latency_ms="/utf8, LatencyText/binary, "|data_loss_events="/utf8, LossText/binary,
        "|met="/utf8, MetText/binary>>.
-file("src/lightspeed/cluster/durable_session.gleam", 461).
%% Joins Values with Separator between consecutive elements.
%% An empty list yields the empty binary; a single element is returned
%% without any separator.
%% Uses lists:join/2 and flattens once, instead of recursing and re-copying
%% the already-joined tail into a new binary at every level.
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).
-file("src/lightspeed/cluster/durable_session.gleam", 382).
?DOC(" Snapshot signature.\n").
%% Renders a snapshot as
%% `<id>:<route>:counter=<n>:lifecycle=<label>:epoch=<n>:checkpoint_ms=<n>`,
%% taking the fields in tuple order and rendering the lifecycle with
%% lifecycle_label/1.
-spec snapshot_signature(snapshot()) -> binary().
snapshot_signature({snapshot, SessionId, Route, Counter, Lifecycle, OwnerEpoch, CheckpointMs}) ->
    CounterText = erlang:integer_to_binary(Counter),
    LifecycleText = lifecycle_label(Lifecycle),
    EpochText = erlang:integer_to_binary(OwnerEpoch),
    CheckpointText = erlang:integer_to_binary(CheckpointMs),
    <<SessionId/binary, ":"/utf8, Route/binary,
        ":counter="/utf8, CounterText/binary,
        ":lifecycle="/utf8, LifecycleText/binary,
        ":epoch="/utf8, EpochText/binary,
        ":checkpoint_ms="/utf8, CheckpointText/binary>>.
-file("src/lightspeed/cluster/durable_session.gleam", 364).
?DOC(" Deterministic durable-session signature.\n").
%% Renders the whole session as `key=value` sections separated by `|`, in
%% this order: backend, resume_policy, fence, snapshot, journal,
%% continuity_slo_ms, max_data_loss_events. The journal section lists the
%% entry labels in append order, separated by commas.
-spec signature(durable_session()) -> binary().
signature(Session) ->
    {durable_session, Backend, Policy, Fence, Snapshot, _NewestFirst, SloMs, MaxLoss} = Session,
    BackendText = backend_label(Backend),
    PolicyText = resume_policy_label(Policy),
    FenceText = fence_signature(Fence),
    SnapshotText = snapshot_signature(Snapshot),
    EntryLabels = [journal_entry_label(Entry) || Entry <- journal(Session)],
    JournalText = join_with(<<","/utf8>>, EntryLabels),
    SloText = erlang:integer_to_binary(SloMs),
    MaxLossText = erlang:integer_to_binary(MaxLoss),
    <<"backend="/utf8, BackendText/binary,
        "|resume_policy="/utf8, PolicyText/binary,
        "|fence="/utf8, FenceText/binary,
        "|snapshot="/utf8, SnapshotText/binary,
        "|journal="/utf8, JournalText/binary,
        "|continuity_slo_ms="/utf8, SloText/binary,
        "|max_data_loss_events="/utf8, MaxLossText/binary>>.
-file("src/lightspeed/cluster/durable_session.gleam", 430).
%% A backend is valid when every store name it carries is non-empty and,
%% for the snapshot-bearing variants, the checkpoint interval is positive.
-spec valid_backend(persistence_backend()) -> boolean().
valid_backend({snapshot_only, Store, CheckpointEvery}) ->
    (Store /= <<""/utf8>>) andalso (CheckpointEvery > 0);
valid_backend({journal_only, Store}) ->
    Store /= <<""/utf8>>;
valid_backend({snapshot_journal, SnapshotStore, JournalStore, CheckpointEvery}) ->
    (SnapshotStore /= <<""/utf8>>) andalso
        (JournalStore /= <<""/utf8>>) andalso
        (CheckpointEvery > 0).
-file("src/lightspeed/cluster/durable_session.gleam", 402).
?DOC(" Validate one durable-session contract.\n").
%% A session is valid when all of the following hold (checked left to right,
%% short-circuiting on the first failure):
%%   * its backend passes valid_backend/1;
%%   * the snapshot's route (third tuple element) is not the empty binary;
%%   * the session's 7th field is strictly positive;
%%   * the session's 8th field is zero or greater;
%%   * the fence token equals expected_token/3 for the snapshot's session
%%     id and the fence's owner and epoch.
%% NOTE(review): the session id itself is never checked for emptiness --
%% confirm that only the route is meant to be required here.
-spec valid(durable_session()) -> boolean().
valid({durable_session, Backend, _Policy, Fence, Snapshot, _Journal, SloMs, MaxLoss}) ->
    {ownership_fence, Owner, Epoch, Token} = Fence,
    {snapshot, SessionId, Route, _Counter, _Lifecycle, _OwnerEpoch, _CheckpointMs} = Snapshot,
    valid_backend(Backend) andalso
        (Route /= <<""/utf8>>) andalso
        (SloMs > 0) andalso
        (MaxLoss >= 0) andalso
        (Token =:= expected_token(SessionId, Owner, Epoch)).
-file("src/lightspeed/cluster/durable_session.gleam", 416).
?DOC(" Durable-state backend accessor.\n").
%% Returns the persistence backend stored in the session.
-spec backend(durable_session()) -> persistence_backend().
backend({durable_session, Backend, _Policy, _Fence, _Snapshot, _Journal, _SloMs, _MaxLoss}) ->
    Backend.
-file("src/lightspeed/cluster/durable_session.gleam", 421).
?DOC(" Durable-state fence accessor.\n").
%% Returns the ownership fence stored in the session.
-spec fence(durable_session()) -> ownership_fence().
fence({durable_session, _Backend, _Policy, Fence, _Snapshot, _Journal, _SloMs, _MaxLoss}) ->
    Fence.
-file("src/lightspeed/cluster/durable_session.gleam", 426).
?DOC(" Durable-state snapshot accessor.\n").
%% Returns the snapshot stored in the session.
-spec snapshot(durable_session()) -> snapshot().
snapshot({durable_session, _Backend, _Policy, _Fence, Snapshot, _Journal, _SloMs, _MaxLoss}) ->
    Snapshot.