src/lightspeed@cluster@durable_session.erl

-module(lightspeed@cluster@durable_session).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/cluster/durable_session.gleam").
-export([supported_backends/0, expected_token/3, start/8, append_counter_delta/3, crash/2, restart/2, request_takeover/5, reconnect/4, journal/1, continuity_report/3, backend_label/1, resume_policy_label/1, lifecycle_label/1, journal_entry_label/1, ownership_error_label/1, fence_signature/1, continuity_signature/1, snapshot_signature/1, signature/1, valid/1, backend/1, fence/1, snapshot/1]).
-export_type([persistence_backend/0, resume_policy/0, lifecycle_state/0, journal_entry/0, snapshot/0, ownership_fence/0, durable_session/0, ownership_error/0, continuity_report/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Durable cluster-session contracts and ownership fencing for M27.\n").

-type persistence_backend() :: {snapshot_only, binary(), integer()} |
    {journal_only, binary()} |
    {snapshot_journal, binary(), binary(), integer()}.

-type resume_policy() :: resume | rehydrate | remount.

-type lifecycle_state() :: live | disconnected | crashed.

-type journal_entry() :: {counter_delta, integer()} |
    {crash_marked, binary()} |
    {restarted, integer()} |
    {reconnected, resume_policy(), binary(), integer()}.

-type snapshot() :: {snapshot,
        binary(),
        binary(),
        integer(),
        lifecycle_state(),
        integer(),
        integer()}.

-type ownership_fence() :: {ownership_fence, binary(), integer(), binary()}.

-type durable_session() :: {durable_session,
        persistence_backend(),
        resume_policy(),
        ownership_fence(),
        snapshot(),
        list(journal_entry()),
        integer(),
        integer()}.

-type ownership_error() :: {stale_owner, integer(), integer()} |
    {split_brain, binary(), binary(), integer()} |
    {invalid_token, binary(), integer()}.

-type continuity_report() :: {continuity_report,
        integer(),
        integer(),
        boolean()}.

-file("src/lightspeed/cluster/durable_session.gleam", 86).
?DOC(" Supported backend matrix for M27 certification.\n").
-spec supported_backends() -> list(persistence_backend()).
supported_backends() ->
    [{snapshot_only, <<"snapshot_store_a"/utf8>>, 2},
        {journal_only, <<"journal_store_a"/utf8>>},
        {snapshot_journal,
            <<"snapshot_store_b"/utf8>>,
            <<"journal_store_b"/utf8>>,
            2}].

-file("src/lightspeed/cluster/durable_session.gleam", 274).
?DOC(" Expected deterministic fence token.\n").
-spec expected_token(binary(), binary(), integer()) -> binary().
expected_token(Session_id, Owner, Epoch) ->
    <<<<<<<<<<"fence:"/utf8, Session_id/binary>>/binary, ":"/utf8>>/binary,
                Owner/binary>>/binary,
            ":"/utf8>>/binary,
        (erlang:integer_to_binary(Epoch))/binary>>.

-file("src/lightspeed/cluster/durable_session.gleam", 99).
?DOC(" Start a durable session contract.\n").
-spec start(
    binary(),
    binary(),
    binary(),
    persistence_backend(),
    resume_policy(),
    integer(),
    integer(),
    integer()
) -> durable_session().
start(
    Session_id,
    Owner,
    Route,
    Backend,
    Resume_policy,
    Now_ms,
    Continuity_slo_ms,
    Max_data_loss_events
) ->
    Fence = {ownership_fence, Owner, 1, expected_token(Session_id, Owner, 1)},
    {durable_session,
        Backend,
        Resume_policy,
        Fence,
        {snapshot, Session_id, Route, 0, disconnected, 1, Now_ms},
        [],
        Continuity_slo_ms,
        Max_data_loss_events}.

-file("src/lightspeed/cluster/durable_session.gleam", 135).
?DOC(" Append one counter-delta entry and update the snapshot.\n").
-spec append_counter_delta(durable_session(), integer(), integer()) -> durable_session().
append_counter_delta(Session, Delta, Now_ms) ->
    {durable_session,
        erlang:element(2, Session),
        erlang:element(3, Session),
        erlang:element(4, Session),
        begin
            _record = erlang:element(5, Session),
            {snapshot,
                erlang:element(2, _record),
                erlang:element(3, _record),
                erlang:element(4, erlang:element(5, Session)) + Delta,
                erlang:element(5, _record),
                erlang:element(6, _record),
                Now_ms}
        end,
        [{counter_delta, Delta} | erlang:element(6, Session)],
        erlang:element(7, Session),
        erlang:element(8, Session)}.

-file("src/lightspeed/cluster/durable_session.gleam", 152).
?DOC(" Mark a crash in snapshot + journal.\n").
-spec crash(durable_session(), binary()) -> durable_session().
crash(Session, Reason) ->
    {durable_session,
        erlang:element(2, Session),
        erlang:element(3, Session),
        erlang:element(4, Session),
        begin
            _record = erlang:element(5, Session),
            {snapshot,
                erlang:element(2, _record),
                erlang:element(3, _record),
                erlang:element(4, _record),
                crashed,
                erlang:element(6, _record),
                erlang:element(7, _record)}
        end,
        [{crash_marked, Reason} | erlang:element(6, Session)],
        erlang:element(7, Session),
        erlang:element(8, Session)}.

-file("src/lightspeed/cluster/durable_session.gleam", 161).
?DOC(" Mark a restart in snapshot + journal.\n").
-spec restart(durable_session(), integer()) -> durable_session().
restart(Session, Now_ms) ->
    {durable_session,
        erlang:element(2, Session),
        erlang:element(3, Session),
        erlang:element(4, Session),
        begin
            _record = erlang:element(5, Session),
            {snapshot,
                erlang:element(2, _record),
                erlang:element(3, _record),
                erlang:element(4, _record),
                disconnected,
                erlang:element(6, _record),
                Now_ms}
        end,
        [{restarted, Now_ms} | erlang:element(6, Session)],
        erlang:element(7, Session),
        erlang:element(8, Session)}.

-file("src/lightspeed/cluster/durable_session.gleam", 174).
?DOC(" Attempt ownership takeover under fencing rules.\n").
-spec request_takeover(
    durable_session(),
    binary(),
    integer(),
    binary(),
    integer()
) -> {ok, durable_session()} | {error, ownership_error()}.
request_takeover(Session, Owner, Epoch, Token, Now_ms) ->
    Current_owner = erlang:element(2, erlang:element(4, Session)),
    Current_epoch = erlang:element(3, erlang:element(4, Session)),
    Expected = expected_token(
        erlang:element(2, erlang:element(5, Session)),
        Owner,
        Epoch
    ),
    case Epoch < Current_epoch of
        true ->
            {error, {stale_owner, Current_epoch, Epoch}};

        false ->
            case (Epoch =:= Current_epoch) andalso (Owner /= Current_owner) of
                true ->
                    {error, {split_brain, Current_owner, Owner, Epoch}};

                false ->
                    case Token =:= Expected of
                        false ->
                            {error, {invalid_token, Owner, Epoch}};

                        true ->
                            {ok,
                                {durable_session,
                                    erlang:element(2, Session),
                                    erlang:element(3, Session),
                                    {ownership_fence, Owner, Epoch, Token},
                                    begin
                                        _record = erlang:element(5, Session),
                                        {snapshot,
                                            erlang:element(2, _record),
                                            erlang:element(3, _record),
                                            erlang:element(4, _record),
                                            disconnected,
                                            Epoch,
                                            Now_ms}
                                    end,
                                    erlang:element(6, Session),
                                    erlang:element(7, Session),
                                    erlang:element(8, Session)}}
                    end
            end
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 222).
?DOC(" Reconnect using one recovery policy.\n").
-spec reconnect(durable_session(), resume_policy(), binary(), integer()) -> durable_session().
reconnect(Session, Policy, Route, Now_ms) ->
    Counter = case Policy of
        resume ->
            erlang:element(4, erlang:element(5, Session));

        rehydrate ->
            erlang:element(4, erlang:element(5, Session));

        remount ->
            0
    end,
    {durable_session,
        erlang:element(2, Session),
        erlang:element(3, Session),
        erlang:element(4, Session),
        begin
            _record = erlang:element(5, Session),
            {snapshot,
                erlang:element(2, _record),
                Route,
                Counter,
                live,
                erlang:element(6, _record),
                Now_ms}
        end,
        [{reconnected, Policy, Route, Now_ms} | erlang:element(6, Session)],
        erlang:element(7, Session),
        erlang:element(8, Session)}.

-file("src/lightspeed/cluster/durable_session.gleam", 397).
?DOC(" Journal entries in append order.\n").
-spec journal(durable_session()) -> list(journal_entry()).
journal(Session) ->
    lists:reverse(erlang:element(6, Session)).

-file("src/lightspeed/cluster/durable_session.gleam", 439).
-spec estimate_data_loss_events(persistence_backend(), list(journal_entry())) -> integer().
estimate_data_loss_events(Backend, Entries) ->
    case Backend of
        {snapshot_only, _, Checkpoint_every} ->
            case erlang:length(Entries) > Checkpoint_every of
                true ->
                    1;

                false ->
                    0
            end;

        {journal_only, _} ->
            0;

        {snapshot_journal, _, _, _} ->
            0
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 251).
?DOC(" Deterministic continuity SLO report for one failover timeline.\n").
-spec continuity_report(durable_session(), integer(), integer()) -> continuity_report().
continuity_report(Session, Failover_at_ms, Reconnect_at_ms) ->
    Reconnect_latency_ms = case Reconnect_at_ms < Failover_at_ms of
        true ->
            0;

        false ->
            Reconnect_at_ms - Failover_at_ms
    end,
    Estimated_data_loss_events = estimate_data_loss_events(
        erlang:element(2, Session),
        journal(Session)
    ),
    Met = (Reconnect_latency_ms =< erlang:element(7, Session)) andalso (Estimated_data_loss_events
    =< erlang:element(8, Session)),
    {continuity_report, Reconnect_latency_ms, Estimated_data_loss_events, Met}.

-file("src/lightspeed/cluster/durable_session.gleam", 279).
?DOC(" Backend label.\n").
-spec backend_label(persistence_backend()) -> binary().
backend_label(Backend) ->
    case Backend of
        {snapshot_only, Store, Checkpoint_every} ->
            <<<<<<"snapshot_only:"/utf8, Store/binary>>/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(Checkpoint_every))/binary>>;

        {journal_only, Store@1} ->
            <<"journal_only:"/utf8, Store@1/binary>>;

        {snapshot_journal, Snapshot_store, Journal_store, Checkpoint_every@1} ->
            <<<<<<<<<<"snapshot_journal:"/utf8, Snapshot_store/binary>>/binary,
                            ":"/utf8>>/binary,
                        Journal_store/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Checkpoint_every@1))/binary>>
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 295).
?DOC(" Resume policy label.\n").
-spec resume_policy_label(resume_policy()) -> binary().
resume_policy_label(Policy) ->
    case Policy of
        resume ->
            <<"resume"/utf8>>;

        rehydrate ->
            <<"rehydrate"/utf8>>;

        remount ->
            <<"remount"/utf8>>
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 304).
?DOC(" Lifecycle label.\n").
-spec lifecycle_label(lifecycle_state()) -> binary().
lifecycle_label(Lifecycle) ->
    case Lifecycle of
        live ->
            <<"live"/utf8>>;

        disconnected ->
            <<"disconnected"/utf8>>;

        crashed ->
            <<"crashed"/utf8>>
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 313).
?DOC(" Journal entry label.\n").
-spec journal_entry_label(journal_entry()) -> binary().
journal_entry_label(Entry) ->
    case Entry of
        {counter_delta, Delta} ->
            <<"counter_delta:"/utf8, (erlang:integer_to_binary(Delta))/binary>>;

        {crash_marked, Reason} ->
            <<"crash_marked:"/utf8, Reason/binary>>;

        {restarted, Now_ms} ->
            <<"restarted:"/utf8, (erlang:integer_to_binary(Now_ms))/binary>>;

        {reconnected, Policy, Route, Now_ms@1} ->
            <<<<<<<<<<"reconnected:"/utf8,
                                (resume_policy_label(Policy))/binary>>/binary,
                            ":"/utf8>>/binary,
                        Route/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Now_ms@1))/binary>>
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 329).
?DOC(" Ownership error label.\n").
-spec ownership_error_label(ownership_error()) -> binary().
ownership_error_label(Error) ->
    case Error of
        {stale_owner, Current_epoch, Requested_epoch} ->
            <<<<<<"stale_owner:"/utf8,
                        (erlang:integer_to_binary(Current_epoch))/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Requested_epoch))/binary>>;

        {split_brain, Current_owner, Requested_owner, Epoch} ->
            <<<<<<<<<<"split_brain:"/utf8, Current_owner/binary>>/binary,
                            ":"/utf8>>/binary,
                        Requested_owner/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Epoch))/binary>>;

        {invalid_token, Owner, Epoch@1} ->
            <<<<<<"invalid_token:"/utf8, Owner/binary>>/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(Epoch@1))/binary>>
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 349).
?DOC(" Ownership-fence signature.\n").
-spec fence_signature(ownership_fence()) -> binary().
fence_signature(Fence) ->
    <<<<<<<<(erlang:element(2, Fence))/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(erlang:element(3, Fence)))/binary>>/binary,
            ":"/utf8>>/binary,
        (erlang:element(4, Fence))/binary>>.

-file("src/lightspeed/cluster/durable_session.gleam", 454).
-spec bool_label(boolean()) -> binary().
bool_label(Value) ->
    case Value of
        true ->
            <<"true"/utf8>>;

        false ->
            <<"false"/utf8>>
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 354).
?DOC(" Continuity-report signature.\n").
-spec continuity_signature(continuity_report()) -> binary().
continuity_signature(Report) ->
    <<<<<<<<<<"latency_ms="/utf8,
                        (erlang:integer_to_binary(erlang:element(2, Report)))/binary>>/binary,
                    "|data_loss_events="/utf8>>/binary,
                (erlang:integer_to_binary(erlang:element(3, Report)))/binary>>/binary,
            "|met="/utf8>>/binary,
        (bool_label(erlang:element(4, Report)))/binary>>.

-file("src/lightspeed/cluster/durable_session.gleam", 461).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    case Values of
        [] ->
            <<""/utf8>>;

        [Value] ->
            Value;

        [Value@1 | Rest] ->
            <<<<Value@1/binary, Separator/binary>>/binary,
                (join_with(Separator, Rest))/binary>>
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 382).
?DOC(" Snapshot signature.\n").
-spec snapshot_signature(snapshot()) -> binary().
snapshot_signature(Snapshot) ->
    <<<<<<<<<<<<<<<<<<<<(erlang:element(2, Snapshot))/binary, ":"/utf8>>/binary,
                                        (erlang:element(3, Snapshot))/binary>>/binary,
                                    ":counter="/utf8>>/binary,
                                (erlang:integer_to_binary(
                                    erlang:element(4, Snapshot)
                                ))/binary>>/binary,
                            ":lifecycle="/utf8>>/binary,
                        (lifecycle_label(erlang:element(5, Snapshot)))/binary>>/binary,
                    ":epoch="/utf8>>/binary,
                (erlang:integer_to_binary(erlang:element(6, Snapshot)))/binary>>/binary,
            ":checkpoint_ms="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(7, Snapshot)))/binary>>.

-file("src/lightspeed/cluster/durable_session.gleam", 364).
?DOC(" Deterministic durable-session signature.\n").
-spec signature(durable_session()) -> binary().
signature(Session) ->
    <<<<<<<<<<<<<<<<<<<<<<<<<<"backend="/utf8,
                                                        (backend_label(
                                                            erlang:element(
                                                                2,
                                                                Session
                                                            )
                                                        ))/binary>>/binary,
                                                    "|resume_policy="/utf8>>/binary,
                                                (resume_policy_label(
                                                    erlang:element(3, Session)
                                                ))/binary>>/binary,
                                            "|fence="/utf8>>/binary,
                                        (fence_signature(
                                            erlang:element(4, Session)
                                        ))/binary>>/binary,
                                    "|snapshot="/utf8>>/binary,
                                (snapshot_signature(erlang:element(5, Session)))/binary>>/binary,
                            "|journal="/utf8>>/binary,
                        (join_with(
                            <<","/utf8>>,
                            gleam@list:map(
                                journal(Session),
                                fun journal_entry_label/1
                            )
                        ))/binary>>/binary,
                    "|continuity_slo_ms="/utf8>>/binary,
                (erlang:integer_to_binary(erlang:element(7, Session)))/binary>>/binary,
            "|max_data_loss_events="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(8, Session)))/binary>>.

-file("src/lightspeed/cluster/durable_session.gleam", 430).
-spec valid_backend(persistence_backend()) -> boolean().
valid_backend(Backend) ->
    case Backend of
        {snapshot_only, Store, Checkpoint_every} ->
            (Store /= <<""/utf8>>) andalso (Checkpoint_every > 0);

        {journal_only, Store@1} ->
            Store@1 /= <<""/utf8>>;

        {snapshot_journal, Snapshot_store, Journal_store, Checkpoint_every@1} ->
            ((Snapshot_store /= <<""/utf8>>) andalso (Journal_store /= <<""/utf8>>))
            andalso (Checkpoint_every@1 > 0)
    end.

-file("src/lightspeed/cluster/durable_session.gleam", 402).
?DOC(" Validate one durable-session contract.\n").
-spec valid(durable_session()) -> boolean().
valid(Session) ->
    (((valid_backend(erlang:element(2, Session)) andalso (erlang:element(
        3,
        erlang:element(5, Session)
    )
    /= <<""/utf8>>))
    andalso (erlang:element(7, Session) > 0))
    andalso (erlang:element(8, Session) >= 0))
    andalso (erlang:element(4, erlang:element(4, Session)) =:= expected_token(
        erlang:element(2, erlang:element(5, Session)),
        erlang:element(2, erlang:element(4, Session)),
        erlang:element(3, erlang:element(4, Session))
    )).

-file("src/lightspeed/cluster/durable_session.gleam", 416).
?DOC(" Durable-state backend accessor.\n").
-spec backend(durable_session()) -> persistence_backend().
backend(Session) ->
    erlang:element(2, Session).

-file("src/lightspeed/cluster/durable_session.gleam", 421).
?DOC(" Durable-state fence accessor.\n").
-spec fence(durable_session()) -> ownership_fence().
fence(Session) ->
    erlang:element(4, Session).

-file("src/lightspeed/cluster/durable_session.gleam", 426).
?DOC(" Durable-state snapshot accessor.\n").
-spec snapshot(durable_session()) -> snapshot().
snapshot(Session) ->
    erlang:element(5, Session).