-module(lightspeed@agent@session).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/agent/session.gleam").
-export([start/5, handle/2, id/1, owner/1, lifecycle/1, counter/1, reconnect_policy/1, pending_patches/1, pending_patch_count/1, oldest_pending_patch/1, telemetry/1, telemetry_count/1, telemetry_summary/1, summary_patch_queued/1, summary_patch_acked/1, summary_session_crashed/1, summary_session_rehydrated/1, summary_session_remounted/1, summary_total_events/1, heartbeat_deadline_ms/1, crashed/1, flush_outbox/1, patch_ref/1, patch/1, telemetry_label/1]).
-export_type([reconnect_policy/0, inbox_event/0, inbox_message/0, patch_envelope/0, telemetry_event/0, outbox_message/0, telemetry_summary/0, session/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Minimal typed session actor model for Phase 2.\n").
-type reconnect_policy() :: rehydrate | remount.
-type inbox_event() :: {connect, binary(), binary(), integer()} |
{reconnect, binary(), integer()} |
increment |
decrement |
{ack, binary()} |
{heartbeat, integer()} |
{tick, integer()} |
{crash, binary()} |
{restart, integer()} |
{shutdown, binary()}.
-type inbox_message() :: {inbox_message, binary(), inbox_event()}.
-type patch_envelope() :: {patch_envelope, binary(), lightspeed@diff:patch()}.
-type telemetry_event() :: {session_mounted, binary(), binary()} |
{session_rehydrated, binary(), binary()} |
{session_remounted, binary(), binary()} |
{counter_updated, binary(), integer()} |
{patch_queued, binary(), binary()} |
{patch_acked, binary(), binary()} |
{heartbeat_received, binary(), integer()} |
{heartbeat_timed_out, binary(), integer()} |
{session_crashed, binary(), binary()} |
{session_restarted, binary()} |
{session_shutdown, binary(), binary()} |
{ownership_rejected, binary(), binary()} |
{event_ignored, binary(), binary()}.
-type outbox_message() :: {outbox_patch, patch_envelope()} |
{outbox_telemetry, telemetry_event()}.
-type telemetry_summary() :: {telemetry_summary,
integer(),
integer(),
integer(),
integer(),
integer(),
integer()}.
-opaque session() :: {session,
binary(),
binary(),
lightspeed@agent@typestate:lifecycle(),
reconnect_policy(),
gleam@option:option(binary()),
integer(),
integer(),
list(patch_envelope()),
list(outbox_message()),
list(telemetry_event()),
boolean(),
integer(),
integer(),
boolean()}.
-file("src/lightspeed/agent/session.gleam", 95).
?DOC(" Start a new disconnected session actor state.\n").
-spec start(binary(), binary(), reconnect_policy(), integer(), integer()) -> session().
start(Id, Owner, Reconnect_policy, Now_ms, Timeout_ms) ->
{session,
Id,
Owner,
disconnected_label,
Reconnect_policy,
none,
0,
1,
[],
[],
[],
false,
Timeout_ms,
Now_ms + Timeout_ms,
false}.
-file("src/lightspeed/agent/session.gleam", 467).
-spec emit_telemetry(session(), telemetry_event()) -> session().
emit_telemetry(Session, Event) ->
{session,
erlang:element(2, Session),
erlang:element(3, Session),
erlang:element(4, Session),
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session),
erlang:element(9, Session),
[{outbox_telemetry, Event} | erlang:element(10, Session)],
[Event | erlang:element(11, Session)],
erlang:element(12, Session),
erlang:element(13, Session),
erlang:element(14, Session),
erlang:element(15, Session)}.
-file("src/lightspeed/agent/session.gleam", 475).
-spec ignore_event(session(), binary()) -> session().
ignore_event(Session, Name) ->
emit_telemetry(Session, {event_ignored, erlang:element(2, Session), Name}).
-file("src/lightspeed/agent/session.gleam", 375).
-spec restart_session(session(), integer()) -> session().
restart_session(Session, Now_ms) ->
case erlang:element(5, Session) of
rehydrate ->
{session,
erlang:element(2, Session),
erlang:element(3, Session),
disconnected_label,
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session),
[],
[],
erlang:element(11, Session),
false,
erlang:element(13, Session),
Now_ms + erlang:element(13, Session),
false};
remount ->
{session,
erlang:element(2, Session),
erlang:element(3, Session),
disconnected_label,
erlang:element(5, Session),
erlang:element(6, Session),
0,
erlang:element(8, Session),
[],
[],
erlang:element(11, Session),
false,
erlang:element(13, Session),
Now_ms + erlang:element(13, Session),
false}
end.
-file("src/lightspeed/agent/session.gleam", 479).
-spec remove_patch_ref(list(patch_envelope()), binary()) -> {list(patch_envelope()),
boolean()}.
remove_patch_ref(Patches, Ref) ->
case Patches of
[] ->
{[], false};
[Entry | Rest] ->
case erlang:element(2, Entry) =:= Ref of
true ->
{Rest, true};
false ->
{Next, Removed} = remove_patch_ref(Rest, Ref),
{[Entry | Next], Removed}
end
end.
-file("src/lightspeed/agent/session.gleam", 413).
-spec apply_ack(session(), binary()) -> session().
apply_ack(Session, Ref) ->
{Pending_rev, Removed} = remove_patch_ref(erlang:element(9, Session), Ref),
case Removed of
true ->
_pipe = {session,
erlang:element(2, Session),
erlang:element(3, Session),
erlang:element(4, Session),
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session),
Pending_rev,
erlang:element(10, Session),
erlang:element(11, Session),
erlang:element(12, Session),
erlang:element(13, Session),
erlang:element(14, Session),
erlang:element(15, Session)},
emit_telemetry(
_pipe,
{patch_acked, erlang:element(2, Session), Ref}
);
false ->
ignore_event(Session, <<"ack"/utf8>>)
end.
-file("src/lightspeed/agent/session.gleam", 457).
-spec counter_fingerprint() -> binary().
counter_fingerprint() ->
<<"counter-view/v1"/utf8>>.
-file("src/lightspeed/agent/session.gleam", 461).
-spec counter_static_html() -> binary().
counter_static_html() ->
<<<<"<button data-ls-key=\"counter-button\">"/utf8,
"<span data-ls-slot=\"counter\"></span>"/utf8>>/binary,
"</button>"/utf8>>.
-file("src/lightspeed/agent/session.gleam", 424).
-spec queue_counter_patch(session()) -> session().
queue_counter_patch(Session) ->
Ref = erlang:integer_to_binary(erlang:element(8, Session)),
Patch_payload = case erlang:element(12, Session) of
false ->
{replace_segments,
<<"#app"/utf8>>,
counter_fingerprint(),
counter_static_html(),
[lightspeed@diff:slot(
<<"counter"/utf8>>,
erlang:integer_to_binary(erlang:element(7, Session))
)]};
true ->
{update_segments,
<<"#app"/utf8>>,
counter_fingerprint(),
[lightspeed@diff:slot(
<<"counter"/utf8>>,
erlang:integer_to_binary(erlang:element(7, Session))
)]}
end,
Patch = {patch_envelope, Ref, Patch_payload},
_pipe = {session,
erlang:element(2, Session),
erlang:element(3, Session),
erlang:element(4, Session),
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session) + 1,
[Patch | erlang:element(9, Session)],
[{outbox_patch, Patch} | erlang:element(10, Session)],
erlang:element(11, Session),
true,
erlang:element(13, Session),
erlang:element(14, Session),
erlang:element(15, Session)},
emit_telemetry(_pipe, {patch_queued, erlang:element(2, Session), Ref}).
-file("src/lightspeed/agent/session.gleam", 401).
-spec apply_counter_delta(session(), integer()) -> session().
apply_counter_delta(Session, Delta) ->
case erlang:element(4, Session) of
live_label ->
Counter = erlang:element(7, Session) + Delta,
_pipe = {session,
erlang:element(2, Session),
erlang:element(3, Session),
erlang:element(4, Session),
erlang:element(5, Session),
erlang:element(6, Session),
Counter,
erlang:element(8, Session),
erlang:element(9, Session),
erlang:element(10, Session),
erlang:element(11, Session),
erlang:element(12, Session),
erlang:element(13, Session),
erlang:element(14, Session),
erlang:element(15, Session)},
_pipe@1 = emit_telemetry(
_pipe,
{counter_updated, erlang:element(2, Session), Counter}
),
queue_counter_patch(_pipe@1);
_ ->
ignore_event(Session, <<"counter_update"/utf8>>)
end.
-file("src/lightspeed/agent/session.gleam", 348).
-spec connect_live(session(), binary(), integer(), telemetry_event(), boolean()) -> session().
connect_live(Session, Route, Now_ms, Connect_event, Keep_counter) ->
Counter = case Keep_counter of
true ->
erlang:element(7, Session);
false ->
0
end,
_pipe = {session,
erlang:element(2, Session),
erlang:element(3, Session),
live_label,
erlang:element(5, Session),
{some, Route},
Counter,
erlang:element(8, Session),
[],
[],
erlang:element(11, Session),
false,
erlang:element(13, Session),
Now_ms + erlang:element(13, Session),
false},
_pipe@1 = emit_telemetry(_pipe, Connect_event),
queue_counter_patch(_pipe@1).
-file("src/lightspeed/agent/session.gleam", 271).
-spec apply_event(session(), inbox_event()) -> session().
apply_event(Session, Event) ->
case Event of
{connect, Route, _, Now_ms} ->
connect_live(
Session,
Route,
Now_ms,
{session_mounted, erlang:element(2, Session), Route},
true
);
{reconnect, Route@1, Now_ms@1} ->
case erlang:element(5, Session) of
rehydrate ->
connect_live(
Session,
Route@1,
Now_ms@1,
{session_rehydrated,
erlang:element(2, Session),
Route@1},
true
);
remount ->
connect_live(
Session,
Route@1,
Now_ms@1,
{session_remounted, erlang:element(2, Session), Route@1},
false
)
end;
increment ->
apply_counter_delta(Session, 1);
decrement ->
apply_counter_delta(Session, -1);
{ack, Ref} ->
apply_ack(Session, Ref);
{heartbeat, Now_ms@2} ->
case erlang:element(4, Session) of
live_label ->
Deadline = Now_ms@2 + erlang:element(13, Session),
_pipe = {session,
erlang:element(2, Session),
erlang:element(3, Session),
erlang:element(4, Session),
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session),
erlang:element(9, Session),
erlang:element(10, Session),
erlang:element(11, Session),
erlang:element(12, Session),
erlang:element(13, Session),
Deadline,
erlang:element(15, Session)},
emit_telemetry(
_pipe,
{heartbeat_received,
erlang:element(2, Session),
Deadline}
);
_ ->
ignore_event(Session, <<"heartbeat"/utf8>>)
end;
{tick, Now_ms@3} ->
case erlang:element(4, Session) of
live_label ->
case Now_ms@3 > erlang:element(14, Session) of
true ->
_pipe@1 = {session,
erlang:element(2, Session),
erlang:element(3, Session),
draining_label,
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session),
erlang:element(9, Session),
erlang:element(10, Session),
erlang:element(11, Session),
erlang:element(12, Session),
erlang:element(13, Session),
erlang:element(14, Session),
erlang:element(15, Session)},
emit_telemetry(
_pipe@1,
{heartbeat_timed_out,
erlang:element(2, Session),
Now_ms@3}
);
false ->
Session
end;
_ ->
Session
end;
{crash, Reason} ->
_pipe@2 = {session,
erlang:element(2, Session),
erlang:element(3, Session),
terminated_label,
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session),
erlang:element(9, Session),
erlang:element(10, Session),
erlang:element(11, Session),
erlang:element(12, Session),
erlang:element(13, Session),
erlang:element(14, Session),
true},
emit_telemetry(
_pipe@2,
{session_crashed, erlang:element(2, Session), Reason}
);
{restart, Now_ms@4} ->
case erlang:element(15, Session) of
true ->
_pipe@3 = restart_session(Session, Now_ms@4),
emit_telemetry(
_pipe@3,
{session_restarted, erlang:element(2, Session)}
);
false ->
ignore_event(Session, <<"restart"/utf8>>)
end;
{shutdown, Reason@1} ->
_pipe@4 = {session,
erlang:element(2, Session),
erlang:element(3, Session),
terminated_label,
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session),
erlang:element(9, Session),
erlang:element(10, Session),
erlang:element(11, Session),
erlang:element(12, Session),
erlang:element(13, Session),
erlang:element(14, Session),
erlang:element(15, Session)},
emit_telemetry(
_pipe@4,
{session_shutdown, erlang:element(2, Session), Reason@1}
)
end.
-file("src/lightspeed/agent/session.gleam", 121).
?DOC(" Handle one typed inbox message.\n").
-spec handle(session(), inbox_message()) -> session().
handle(Session, Message) ->
case Message of
{inbox_message, Owner, Event} ->
case Owner =:= erlang:element(3, Session) of
true ->
apply_event(Session, Event);
false ->
emit_telemetry(
Session,
{ownership_rejected, erlang:element(2, Session), Owner}
)
end
end.
-file("src/lightspeed/agent/session.gleam", 132).
?DOC(" Session id.\n").
-spec id(session()) -> binary().
id(Session) ->
erlang:element(2, Session).
-file("src/lightspeed/agent/session.gleam", 137).
?DOC(" Owning server process id.\n").
-spec owner(session()) -> binary().
owner(Session) ->
erlang:element(3, Session).
-file("src/lightspeed/agent/session.gleam", 142).
?DOC(" Lifecycle label.\n").
-spec lifecycle(session()) -> lightspeed@agent@typestate:lifecycle().
lifecycle(Session) ->
erlang:element(4, Session).
-file("src/lightspeed/agent/session.gleam", 147).
?DOC(" Current counter model used by this minimal actor.\n").
-spec counter(session()) -> integer().
counter(Session) ->
erlang:element(7, Session).
-file("src/lightspeed/agent/session.gleam", 152).
?DOC(" Current reconnect policy.\n").
-spec reconnect_policy(session()) -> reconnect_policy().
reconnect_policy(Session) ->
erlang:element(5, Session).
-file("src/lightspeed/agent/session.gleam", 157).
?DOC(" Pending patches waiting for ack, in emit order.\n").
-spec pending_patches(session()) -> list(patch_envelope()).
pending_patches(Session) ->
lists:reverse(erlang:element(9, Session)).
-file("src/lightspeed/agent/session.gleam", 162).
?DOC(" Number of pending patches waiting for ack.\n").
-spec pending_patch_count(session()) -> integer().
pending_patch_count(Session) ->
erlang:length(erlang:element(9, Session)).
-file("src/lightspeed/agent/session.gleam", 496).
-spec oldest_patch(list(patch_envelope())) -> gleam@option:option(patch_envelope()).
oldest_patch(Patches) ->
case Patches of
[] ->
none;
[Patch] ->
{some, Patch};
[_ | Rest] ->
oldest_patch(Rest)
end.
-file("src/lightspeed/agent/session.gleam", 167).
?DOC(" Oldest pending patch waiting for ack, when present.\n").
-spec oldest_pending_patch(session()) -> gleam@option:option(patch_envelope()).
oldest_pending_patch(Session) ->
oldest_patch(erlang:element(9, Session)).
-file("src/lightspeed/agent/session.gleam", 172).
?DOC(" Telemetry records in emit order.\n").
-spec telemetry(session()) -> list(telemetry_event()).
telemetry(Session) ->
lists:reverse(erlang:element(11, Session)).
-file("src/lightspeed/agent/session.gleam", 177).
?DOC(" Number of telemetry events currently held by session state.\n").
-spec telemetry_count(session()) -> integer().
telemetry_count(Session) ->
erlang:length(erlang:element(11, Session)).
-file("src/lightspeed/agent/session.gleam", 504).
-spec summarize_telemetry(list(telemetry_event()), telemetry_summary()) -> telemetry_summary().
summarize_telemetry(Telemetry_rev, Summary) ->
case Telemetry_rev of
[] ->
Summary;
[Event | Rest] ->
Next = case Event of
{patch_queued, _, _} ->
{telemetry_summary,
erlang:element(2, Summary) + 1,
erlang:element(3, Summary),
erlang:element(4, Summary),
erlang:element(5, Summary),
erlang:element(6, Summary),
erlang:element(7, Summary) + 1};
{patch_acked, _, _} ->
{telemetry_summary,
erlang:element(2, Summary),
erlang:element(3, Summary) + 1,
erlang:element(4, Summary),
erlang:element(5, Summary),
erlang:element(6, Summary),
erlang:element(7, Summary) + 1};
{session_crashed, _, _} ->
{telemetry_summary,
erlang:element(2, Summary),
erlang:element(3, Summary),
erlang:element(4, Summary) + 1,
erlang:element(5, Summary),
erlang:element(6, Summary),
erlang:element(7, Summary) + 1};
{session_rehydrated, _, _} ->
{telemetry_summary,
erlang:element(2, Summary),
erlang:element(3, Summary),
erlang:element(4, Summary),
erlang:element(5, Summary) + 1,
erlang:element(6, Summary),
erlang:element(7, Summary) + 1};
{session_remounted, _, _} ->
{telemetry_summary,
erlang:element(2, Summary),
erlang:element(3, Summary),
erlang:element(4, Summary),
erlang:element(5, Summary),
erlang:element(6, Summary) + 1,
erlang:element(7, Summary) + 1};
_ ->
{telemetry_summary,
erlang:element(2, Summary),
erlang:element(3, Summary),
erlang:element(4, Summary),
erlang:element(5, Summary),
erlang:element(6, Summary),
erlang:element(7, Summary) + 1}
end,
summarize_telemetry(Rest, Next)
end.
-file("src/lightspeed/agent/session.gleam", 182).
?DOC(" Aggregated telemetry counters for hot-path extraction.\n").
-spec telemetry_summary(session()) -> telemetry_summary().
telemetry_summary(Session) ->
summarize_telemetry(
erlang:element(11, Session),
{telemetry_summary, 0, 0, 0, 0, 0, 0}
).
-file("src/lightspeed/agent/session.gleam", 197).
?DOC(" `patch_queued` event count.\n").
-spec summary_patch_queued(telemetry_summary()) -> integer().
summary_patch_queued(Summary) ->
erlang:element(2, Summary).
-file("src/lightspeed/agent/session.gleam", 202).
?DOC(" `patch_acked` event count.\n").
-spec summary_patch_acked(telemetry_summary()) -> integer().
summary_patch_acked(Summary) ->
erlang:element(3, Summary).
-file("src/lightspeed/agent/session.gleam", 207).
?DOC(" `session_crashed` event count.\n").
-spec summary_session_crashed(telemetry_summary()) -> integer().
summary_session_crashed(Summary) ->
erlang:element(4, Summary).
-file("src/lightspeed/agent/session.gleam", 212).
?DOC(" `session_rehydrated` event count.\n").
-spec summary_session_rehydrated(telemetry_summary()) -> integer().
summary_session_rehydrated(Summary) ->
erlang:element(5, Summary).
-file("src/lightspeed/agent/session.gleam", 217).
?DOC(" `session_remounted` event count.\n").
-spec summary_session_remounted(telemetry_summary()) -> integer().
summary_session_remounted(Summary) ->
erlang:element(6, Summary).
-file("src/lightspeed/agent/session.gleam", 222).
?DOC(" Total telemetry event count.\n").
-spec summary_total_events(telemetry_summary()) -> integer().
summary_total_events(Summary) ->
erlang:element(7, Summary).
-file("src/lightspeed/agent/session.gleam", 227).
?DOC(" Heartbeat deadline timestamp in milliseconds.\n").
-spec heartbeat_deadline_ms(session()) -> integer().
heartbeat_deadline_ms(Session) ->
erlang:element(14, Session).
-file("src/lightspeed/agent/session.gleam", 232).
?DOC(" True when session has crashed and is waiting for restart.\n").
-spec crashed(session()) -> boolean().
crashed(Session) ->
erlang:element(15, Session).
-file("src/lightspeed/agent/session.gleam", 237).
?DOC(" Drain the outbox and clear it from state.\n").
-spec flush_outbox(session()) -> {session(), list(outbox_message())}.
flush_outbox(Session) ->
Outbox = lists:reverse(erlang:element(10, Session)),
{{session,
erlang:element(2, Session),
erlang:element(3, Session),
erlang:element(4, Session),
erlang:element(5, Session),
erlang:element(6, Session),
erlang:element(7, Session),
erlang:element(8, Session),
erlang:element(9, Session),
[],
erlang:element(11, Session),
erlang:element(12, Session),
erlang:element(13, Session),
erlang:element(14, Session),
erlang:element(15, Session)},
Outbox}.
-file("src/lightspeed/agent/session.gleam", 243).
?DOC(" Extract patch reference.\n").
-spec patch_ref(patch_envelope()) -> binary().
patch_ref(Patch) ->
erlang:element(2, Patch).
-file("src/lightspeed/agent/session.gleam", 248).
?DOC(" Extract patch payload.\n").
-spec patch(patch_envelope()) -> lightspeed@diff:patch().
patch(Patch) ->
erlang:element(3, Patch).
-file("src/lightspeed/agent/session.gleam", 253).
?DOC(" Stable telemetry label for assertions and logs.\n").
-spec telemetry_label(telemetry_event()) -> binary().
telemetry_label(Event) ->
case Event of
{session_mounted, _, _} ->
<<"session_mounted"/utf8>>;
{session_rehydrated, _, _} ->
<<"session_rehydrated"/utf8>>;
{session_remounted, _, _} ->
<<"session_remounted"/utf8>>;
{counter_updated, _, _} ->
<<"counter_updated"/utf8>>;
{patch_queued, _, _} ->
<<"patch_queued"/utf8>>;
{patch_acked, _, _} ->
<<"patch_acked"/utf8>>;
{heartbeat_received, _, _} ->
<<"heartbeat_received"/utf8>>;
{heartbeat_timed_out, _, _} ->
<<"heartbeat_timed_out"/utf8>>;
{session_crashed, _, _} ->
<<"session_crashed"/utf8>>;
{session_restarted, _} ->
<<"session_restarted"/utf8>>;
{session_shutdown, _, _} ->
<<"session_shutdown"/utf8>>;
{ownership_rejected, _, _} ->
<<"ownership_rejected"/utf8>>;
{event_ignored, _, _} ->
<<"event_ignored"/utf8>>
end.