-module(lattice_presence@presence_state).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lattice_presence/presence_state.gleam").
-export([new/1, join/5, leave/4, leave_by_pid/2, online_list/1, get_by_topic/2, get_by_key/3, compact/1, merge_with_diff/2, merge/2, extract_full_state/1, replica/1, compacted_clocks/1, entry_count/1, cloud_count/1, internal_values/1, internal_clouds/1, replica_down/2, replica_up/2, remove_down_replica/2, replicated_parts/1, from_replicated_parts/4]).
-export_type([tag/0, entry/0, replica_status/0, state/0, diff/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" Presence State - Pure CRDT for distributed presence tracking\n"
"\n"
" A causal-context add-wins observed-remove set, inspired by\n"
" Phoenix.Tracker.State. This module is a pure data structure with no\n"
" actors or side effects.\n"
"\n"
" Each node (replica) tracks its own presences authoritatively. State is\n"
" replicated by extracting deltas and merging them at remote replicas.\n"
" Conflicts are resolved causally: adds win over concurrent removes.\n"
"\n"
" ## Example\n"
"\n"
" ```gleam\n"
" import gleam/json\n"
" import lattice_presence/presence_state as state\n"
"\n"
" let a = state.new(\"node-a\")\n"
" |> state.join(\"pid-1\", \"room:lobby\", \"alice\", json.object([]))\n"
" let b = state.new(\"node-b\")\n"
" |> state.join(\"pid-2\", \"room:lobby\", \"bob\", json.object([]))\n"
" let merged = state.merge(a, b)\n"
" state.get_by_topic(merged, \"room:lobby\")\n"
" // -> [#(\"pid-1\", \"alice\", _), #(\"pid-2\", \"bob\", _)]\n"
" ```\n"
).
%% Identifier of a single join: {tag, Replica, Clock} - the replica that
%% created the entry and the clock value it was given.
-type tag() :: {tag, binary(), integer()}.
%% A tracked presence: {entry, Topic, Key, Pid, Meta}.
-type entry() :: {entry, binary(), binary(), binary(), gleam@json:json()}.
%% This node's liveness view of a replica.
-type replica_status() :: up | down.
%% {state, Replica, Context, Clouds, Values, Replicas}
%%   Replica  - name of the replica that owns this state
%%   Context  - compacted clock per replica
%%   Clouds   - per-replica set of clock values not yet folded into Context
%%   Values   - presence entries keyed by their tag
%%   Replicas - up/down status per replica
-opaque state() :: {state,
binary(),
gleam@dict:dict(binary(), integer()),
gleam@dict:dict(binary(), gleam@set:set(integer())),
gleam@dict:dict(tag(), entry()),
gleam@dict:dict(binary(), replica_status())}.
%% {diff, Joins, Leaves}; each side maps a topic to a list of
%% {Key, Pid, Meta} tuples.
-type diff() :: {diff,
gleam@dict:dict(binary(), list({binary(), binary(), gleam@json:json()})),
gleam@dict:dict(binary(), list({binary(), binary(), gleam@json:json()}))}.
-file("src/lattice_presence/presence_state.gleam", 119).
?DOC(" Create a new empty state for this replica\n").
-spec new(binary()) -> state().
%% Context, clouds and values start out empty; the only thing recorded is
%% that `Replica` itself is `up`.
new(Replica) ->
    {state, Replica, #{}, #{}, #{}, #{Replica => up}}.
-file("src/lattice_presence/presence_state.gleam", 516).
-spec next_clock(state(), binary()) -> integer().
%% Next clock value for `Replica`: one more than the largest clock known for
%% it, looking at both the compacted context and the replica's cloud.
%% A replica missing from either structure contributes 0.
next_clock({state, _Self, Context, Clouds, _Values, _Replicas}, Replica) ->
    Compacted =
        case gleam_stdlib:map_get(Context, Replica) of
            {error, _} -> 0;
            {ok, Clock} -> Clock
        end,
    CloudPeak =
        case gleam_stdlib:map_get(Clouds, Replica) of
            {error, _} -> 0;
            {ok, Cloud} -> gleam@set:fold(Cloud, 0, fun gleam@int:max/2)
        end,
    gleam@int:max(Compacted, CloudPeak) + 1.
-file("src/lattice_presence/presence_state.gleam", 130).
?DOC(" Add a tracked presence. Increments the local clock.\n").
-spec join(state(), binary(), binary(), binary(), gleam@json:json()) -> state().
%% Takes the next clock for the local replica, records it in the context and
%% stores the entry under the tag {tag, Self, Clock}. Clouds and the
%% liveness map are carried over untouched.
join({state, Self, Context, Clouds, Values, Replicas} = State, Pid, Topic, Key, Meta) ->
    Clock = next_clock(State, Self),
    {state,
        Self,
        gleam@dict:insert(Context, Self, Clock),
        Clouds,
        gleam@dict:insert(Values, {tag, Self, Clock}, {entry, Topic, Key, Pid, Meta}),
        Replicas}.
-file("src/lattice_presence/presence_state.gleam", 151).
?DOC(
" Remove a specific presence by pid, topic, and key.\n"
"\n"
" Only entries owned by this replica are removable — leaving a foreign\n"
" replica's entry would not be causally observed (this node's context\n"
" doesn't cover the foreign tag), so it would silently reappear on the\n"
" next merge. Foreign entries are filtered out at the source instead.\n"
).
-spec leave(state(), binary(), binary(), binary()) -> state().
%% Drops every locally-owned entry whose pid, topic and key all match.
%% Context, clouds and the liveness map are not modified.
leave({state, Self, Context, Clouds, Values, Replicas}, Pid, Topic, Key) ->
    IsTarget =
        fun({tag, Owner, _Clock}, {entry, EntryTopic, EntryKey, EntryPid, _Meta}) ->
            Owner =:= Self andalso
                EntryPid =:= Pid andalso
                EntryTopic =:= Topic andalso
                EntryKey =:= Key
        end,
    Kept = gleam@dict:filter(Values, fun(Tag, Entry) -> not IsTarget(Tag, Entry) end),
    {state, Self, Context, Clouds, Kept, Replicas}.
-file("src/lattice_presence/presence_state.gleam", 166).
?DOC(
" Remove all presences for a pid owned by this replica.\n"
"\n"
" As with `leave`, only locally-owned entries are eligible — see that\n"
" function's docs for the rationale.\n"
).
-spec leave_by_pid(state(), binary()) -> state().
%% Keeps an entry unless it is both owned by the local replica and
%% registered under `Pid`.
leave_by_pid({state, Self, Context, Clouds, Values, Replicas}, Pid) ->
    Keep =
        fun({tag, Owner, _Clock}, {entry, _Topic, _Key, EntryPid, _Meta}) ->
            not (Owner =:= Self andalso EntryPid =:= Pid)
        end,
    {state, Self, Context, Clouds, gleam@dict:filter(Values, Keep), Replicas}.
-file("src/lattice_presence/presence_state.gleam", 532).
-spec is_replica_up(state(), binary()) -> boolean().
%% A replica counts as up unless the liveness map explicitly says `down`;
%% replicas that are not in the map at all are therefore treated as up.
is_replica_up({state, _Self, _Context, _Clouds, _Values, Replicas}, Replica) ->
    gleam_stdlib:map_get(Replicas, Replica) =/= {ok, down}.
-file("src/lattice_presence/presence_state.gleam", 177).
?DOC(" Collect entries from non-down replicas that satisfy `predicate`.\n").
-spec visible_entries(state(), fun((entry()) -> boolean())) -> list(entry()).
%% Folds over the value dict, prepending each entry whose owning replica is
%% up and for which `Predicate` returns true. The predicate is only invoked
%% for entries of replicas that are up.
visible_entries({state, _Self, _Context, _Clouds, Values, _Replicas} = State, Predicate) ->
    Collect =
        fun(Found, {tag, Owner, _Clock}, Entry) ->
            Wanted = is_replica_up(State, Owner) andalso Predicate(Entry),
            case Wanted of
                true -> [Entry | Found];
                false -> Found
            end
        end,
    gleam@dict:fold(Values, [], Collect).
-file("src/lattice_presence/presence_state.gleam", 187).
?DOC(" List all online presences across all topics (from non-down replicas)\n").
-spec online_list(state()) -> list({binary(),
        binary(),
        binary(),
        gleam@json:json()}).
%% Every visible entry, reshaped to {Pid, Topic, Key, Meta}.
online_list(State) ->
    Everything = visible_entries(State, fun(_Entry) -> true end),
    [{Pid, Topic, Key, Meta} || {entry, Topic, Key, Pid, Meta} <- Everything].
-file("src/lattice_presence/presence_state.gleam", 193).
?DOC(" Get all presences for a topic (from non-down replicas)\n").
-spec get_by_topic(state(), binary()) -> list({binary(),
        binary(),
        gleam@json:json()}).
%% Visible entries whose topic equals `Topic`, reshaped to {Pid, Key, Meta}.
get_by_topic(State, Topic) ->
    InTopic = fun({entry, EntryTopic, _Key, _Pid, _Meta}) -> EntryTopic =:= Topic end,
    [{Pid, Key, Meta} || {entry, _Topic, Key, Pid, Meta} <- visible_entries(State, InTopic)].
-file("src/lattice_presence/presence_state.gleam", 202).
?DOC(" Get presences for a specific key within a topic\n").
-spec get_by_key(state(), binary(), binary()) -> list({binary(),
        gleam@json:json()}).
%% Visible entries matching both `Topic` and `Key`, reshaped to {Pid, Meta}.
get_by_key(State, Topic, Key) ->
    Matches =
        fun({entry, EntryTopic, EntryKey, _Pid, _Meta}) ->
            EntryTopic =:= Topic andalso EntryKey =:= Key
        end,
    [{Pid, Meta} || {entry, _Topic, _Key, Pid, Meta} <- visible_entries(State, Matches)].
-file("src/lattice_presence/presence_state.gleam", 342).
?DOC(" Compact a single cloud: advance base clock through contiguous values\n").
-spec compact_cloud(integer(), gleam@set:set(integer())) -> {integer(),
        gleam@set:set(integer())}.
%% Returns {NewBase, Remaining}.
%%
%% While `Base + 1` is a member of the cloud it is removed and the base is
%% advanced by one. Once the run of contiguous values ends, any value that is
%% still in the cloud but not greater than the final base is dropped as well:
%% such values are already covered by the base clock (a merge can raise the
%% context past values that another replica's cloud still carried), so
%% keeping them would only make the cloud grow without ever being compacted.
compact_cloud(Base, Cloud) ->
    Next = Base + 1,
    case gleam@set:contains(Cloud, Next) of
        true ->
            compact_cloud(Next, gleam@set:delete(Cloud, Next));
        false ->
            {Base, gleam@set:filter(Cloud, fun(Clock) -> Clock > Base end)}
    end.
-file("src/lattice_presence/presence_state.gleam", 314).
?DOC(
" Compact clouds into context where possible\n"
"\n"
" If context[replica] + 1 is in the cloud, advance context and remove from\n"
" cloud. Repeat until no more compaction possible.\n"
).
-spec compact(state()) -> state().
%% Walks every per-replica cloud once. For each one the replica's context
%% clock (0 when absent) is pushed forward via compact_cloud/2; the context
%% is only rewritten when the clock actually moved, and a cloud that ends up
%% empty is removed from the cloud dict instead of being stored.
compact({state, Self, Context, Clouds, Values, Replicas}) ->
    Step =
        fun({CtxAcc, CloudAcc}, Replica, Cloud) ->
            Base =
                case gleam_stdlib:map_get(CtxAcc, Replica) of
                    {error, _} -> 0;
                    {ok, Known} -> Known
                end,
            {Reached, Leftover} = compact_cloud(Base, Cloud),
            NextCtx =
                if
                    Reached > Base -> gleam@dict:insert(CtxAcc, Replica, Reached);
                    true -> CtxAcc
                end,
            NextClouds =
                case gleam@set:size(Leftover) of
                    0 -> gleam@dict:delete(CloudAcc, Replica);
                    _ -> gleam@dict:insert(CloudAcc, Replica, Leftover)
                end,
            {NextCtx, NextClouds}
        end,
    {CompactedCtx, CompactedClouds} = gleam@dict:fold(Clouds, {Context, Clouds}, Step),
    {state, Self, CompactedCtx, CompactedClouds, Values, Replicas}.
-file("src/lattice_presence/presence_state.gleam", 350).
?DOC(" Group entries by topic for diff reporting\n").
-spec entries_to_topic_diff(list(entry())) -> gleam@dict:dict(binary(), list({binary(),
        binary(),
        gleam@json:json()})).
%% Builds a dict from topic to a list of {Key, Pid, Meta}; each entry is
%% prepended to whatever has been gathered for its topic so far.
entries_to_topic_diff(Entries) ->
    Group =
        fun(ByTopic, {entry, Topic, Key, Pid, Meta}) ->
            Gathered =
                case gleam_stdlib:map_get(ByTopic, Topic) of
                    {error, _} -> [];
                    {ok, Items} -> Items
                end,
            gleam@dict:insert(ByTopic, Topic, [{Key, Pid, Meta} | Gathered])
        end,
    gleam@list:fold(Entries, maps:new(), Group).
-file("src/lattice_presence/presence_state.gleam", 303).
?DOC(" Merge cloud sets\n").
-spec merge_clouds(
    gleam@dict:dict(binary(), gleam@set:set(integer())),
    gleam@dict:dict(binary(), gleam@set:set(integer()))
) -> gleam@dict:dict(binary(), gleam@set:set(integer())).
%% Combines the two cloud dicts, resolving a replica present in both by
%% taking the union of its two sets.
merge_clouds(Ours, Theirs) ->
    gleam@dict:combine(Ours, Theirs, fun gleam@set:union/2).
-file("src/lattice_presence/presence_state.gleam", 295).
?DOC(" Merge two vector clocks (take max per replica)\n").
-spec merge_contexts(
    gleam@dict:dict(binary(), integer()),
    gleam@dict:dict(binary(), integer())
) -> gleam@dict:dict(binary(), integer()).
%% Combines the two context dicts, resolving a replica present in both by
%% keeping the larger clock.
merge_contexts(Ours, Theirs) ->
    Larger = fun(ClockA, ClockB) -> gleam@int:max(ClockA, ClockB) end,
    gleam@dict:combine(Ours, Theirs, Larger).
-file("src/lattice_presence/presence_state.gleam", 278).
?DOC(" Check if a tag is \"in\" a causal context (either compacted or in clouds)\n").
-spec tag_is_in(
    gleam@dict:dict(binary(), integer()),
    gleam@dict:dict(binary(), gleam@set:set(integer())),
    tag()
) -> boolean().
%% True when the context clock for the tag's replica has reached the tag's
%% clock, or, failing that, when the tag's clock is a member of that
%% replica's cloud. The cloud is only consulted if the context check fails.
tag_is_in(Context, Clouds, {tag, Replica, Clock}) ->
    InContext =
        case gleam_stdlib:map_get(Context, Replica) of
            {ok, Compacted} -> Compacted >= Clock;
            {error, _} -> false
        end,
    case InContext of
        true ->
            true;
        false ->
            case gleam_stdlib:map_get(Clouds, Replica) of
                {ok, Cloud} -> gleam@set:contains(Cloud, Clock);
                {error, _} -> false
            end
    end.
-file("src/lattice_presence/presence_state.gleam", 223).
?DOC(" Merge remote state into local state and return a diff of what changed.\n").
-spec merge_with_diff(state(), state()) -> {state(), diff()}.
%% Joins:   remote entries whose tag the local context/clouds do not cover.
%% Removes: local entries owned by another replica whose tag the remote
%%          context/clouds cover but which the remote no longer holds.
%% The merged values are the local values minus Removes plus Joins; contexts
%% and clouds are merged and the result is compacted. The local replica name
%% and the local liveness map are kept as they are.
merge_with_diff(
    {state, Self, LocalCtx, LocalClouds, LocalValues, LocalReplicas},
    {state, _RemoteName, RemoteCtx, RemoteClouds, RemoteValues, _RemoteReplicas}
) ->
    Joins = [
        Pair
     || {Tag, _Entry} = Pair <- maps:to_list(RemoteValues),
        not tag_is_in(LocalCtx, LocalClouds, Tag)
    ],
    Removes = [
        Pair
     || {{tag, Owner, _Clock} = Tag, _Entry} = Pair <- maps:to_list(LocalValues),
        Owner =/= Self,
        tag_is_in(RemoteCtx, RemoteClouds, Tag),
        not gleam@dict:has_key(RemoteValues, Tag)
    ],
    Pruned = gleam@list:fold(
        Removes,
        LocalValues,
        fun(Acc, {Tag, _Entry}) -> gleam@dict:delete(Acc, Tag) end
    ),
    Merged = gleam@list:fold(
        Joins,
        Pruned,
        fun(Acc, {Tag, Entry}) -> gleam@dict:insert(Acc, Tag, Entry) end
    ),
    Diff = {diff,
        entries_to_topic_diff([Entry || {_Tag, Entry} <- Joins]),
        entries_to_topic_diff([Entry || {_Tag, Entry} <- Removes])},
    Uncompacted = {state,
        Self,
        merge_contexts(LocalCtx, RemoteCtx),
        merge_clouds(LocalClouds, RemoteClouds),
        Merged,
        LocalReplicas},
    {compact(Uncompacted), Diff}.
-file("src/lattice_presence/presence_state.gleam", 217).
?DOC(
" Merge remote state into local state.\n"
"\n"
" `replicas` (per-node liveness view) is **not** merged because it is\n"
" local-only view state, not part of the replicated CRDT payload.\n"
).
-spec merge(state(), state()) -> state().
%% Same as merge_with_diff/2 with the diff thrown away.
merge(Local, Remote) ->
    erlang:element(1, merge_with_diff(Local, Remote)).
-file("src/lattice_presence/presence_state.gleam", 377).
?DOC(
" Extract state for sending to a remote replica.\n"
"\n"
" Currently returns the full local state. Remote's `merge` handles\n"
" deduplication of entries it already has, and absence of an entry\n"
" combined with coverage in `context` represents an observed removal.\n"
"\n"
" A future delta-extraction variant will use the remote's known\n"
" `context` to filter to only the tags the remote hasn't seen — that\n"
" will be exposed as a separate function rather than retrofitted onto\n"
" this one.\n"
).
-spec extract_full_state(state()) -> state().
%% Identity: the argument is handed back as-is.
extract_full_state(Snapshot) ->
    Snapshot.
-file("src/lattice_presence/presence_state.gleam", 384).
?DOC(" Get the name of the replica that owns this state\n").
-spec replica(state()) -> binary().
%% Returns the replica name stored in the state (the value given to new/1 or
%% from_replicated_parts/4) - not a clock.
replica({state, Replica, _Context, _Clouds, _Values, _Replicas}) ->
    Replica.
-file("src/lattice_presence/presence_state.gleam", 389).
?DOC(" Get the compacted vector clock.\n").
-spec compacted_clocks(state()) -> gleam@dict:dict(binary(), integer()).
%% Returns the context dict (replica name -> clock) exactly as stored.
compacted_clocks({state, _Self, Context, _Clouds, _Values, _Replicas}) ->
    Context.
-file("src/lattice_presence/presence_state.gleam", 394).
?DOC(" Return the number of entries retained by the CRDT state.\n").
-spec entry_count(state()) -> integer().
%% Size of the value dict. Entries of replicas marked down are included,
%% since no liveness filtering happens here.
entry_count({state, _Self, _Context, _Clouds, Values, _Replicas}) ->
    maps:size(Values).
-file("src/lattice_presence/presence_state.gleam", 399).
?DOC(" Return the number of uncompacted cloud entries retained by the state.\n").
-spec cloud_count(state()) -> integer().
%% Size of the cloud dict, i.e. how many replicas currently have a cloud.
%% NOTE(review): this is not the total number of clock values held across
%% those clouds - confirm which of the two callers expect.
cloud_count({state, _Self, _Context, Clouds, _Values, _Replicas}) ->
    maps:size(Clouds).
-file("src/lattice_presence/presence_state.gleam", 404).
?DOC(false).
-spec internal_values(state()) -> gleam@dict:dict(tag(), entry()).
%% Exposes the raw tag -> entry dict; hidden from generated docs.
internal_values({state, _Self, _Context, _Clouds, Values, _Replicas}) ->
    Values.
-file("src/lattice_presence/presence_state.gleam", 409).
?DOC(false).
-spec internal_clouds(state()) -> gleam@dict:dict(binary(), gleam@set:set(integer())).
%% Exposes the raw per-replica cloud dict; hidden from generated docs.
internal_clouds({state, _Self, _Context, Clouds, _Values, _Replicas}) ->
    Clouds.
-file("src/lattice_presence/presence_state.gleam", 416).
?DOC(" Collect all entries currently owned by `replica`.\n").
-spec entries_for_replica(state(), binary()) -> list(entry()).
%% Folds over the value dict and prepends every entry whose tag names
%% `Replica` as its owner. Liveness is not taken into account.
entries_for_replica({state, _Self, _Context, _Clouds, Values, _Replicas}, Replica) ->
    Pick =
        fun
            (Owned, {tag, Owner, _Clock}, Entry) when Owner =:= Replica ->
                [Entry | Owned];
            (Owned, _Tag, _Entry) ->
                Owned
        end,
    gleam@dict:fold(Values, [], Pick).
-file("src/lattice_presence/presence_state.gleam", 429).
?DOC(
" Mark a replica as down. Returns entries that are now invisible (leaves).\n"
"\n"
" Idempotent: if the replica is already `Down`, the state is unchanged\n"
" and the returned diff is empty.\n"
).
-spec replica_down(state(), binary()) -> {state(), diff()}.
%% For a replica that is up or not yet known, the liveness map is updated to
%% `down` and every entry owned by that replica is reported on the leave
%% side of the diff. The entries themselves stay in the value dict.
replica_down({state, Self, Context, Clouds, Values, Replicas} = State, Replica) ->
    case gleam_stdlib:map_get(Replicas, Replica) of
        {ok, down} ->
            {State, {diff, maps:new(), maps:new()}};
        _ ->
            Leaves = entries_to_topic_diff(entries_for_replica(State, Replica)),
            Marked = {state,
                Self,
                Context,
                Clouds,
                Values,
                gleam@dict:insert(Replicas, Replica, down)},
            {Marked, {diff, maps:new(), Leaves}}
    end.
-file("src/lattice_presence/presence_state.gleam", 447).
?DOC(
" Mark a replica as up. Returns entries that are now visible again (joins).\n"
"\n"
" Idempotent: if the replica is already `Up`, the state is unchanged. An\n"
" unknown replica is already treated as up for visibility, so it is only\n"
" recorded as `Up` in the liveness map. In both cases the returned diff\n"
" is empty.\n"
).
-spec replica_up(state(), binary()) -> {state(), diff()}.
%% Three cases, keyed on the replica's current liveness record:
%%   down    -> flip to up and report all of its entries as joins;
%%   up      -> nothing to do, state returned untouched;
%%   unknown -> record it as up; nothing was hidden, so the diff is empty.
replica_up({state, Self, Context, Clouds, Values, Replicas} = State, Replica) ->
    case gleam_stdlib:map_get(Replicas, Replica) of
        {ok, down} ->
            Joins = entries_to_topic_diff(entries_for_replica(State, Replica)),
            Marked = {state,
                Self,
                Context,
                Clouds,
                Values,
                gleam@dict:insert(Replicas, Replica, up)},
            {Marked, {diff, Joins, maps:new()}};
        {ok, up} ->
            {State, {diff, maps:new(), maps:new()}};
        {error, _} ->
            Recorded = {state,
                Self,
                Context,
                Clouds,
                Values,
                gleam@dict:insert(Replicas, Replica, up)},
            {Recorded, {diff, maps:new(), maps:new()}}
    end.
-file("src/lattice_presence/presence_state.gleam", 471).
?DOC(" Permanently remove all entries and context for a downed replica\n").
-spec remove_down_replica(state(), binary()) -> state().
%% Deletes `Replica` from the context, the clouds and the liveness map, and
%% drops every entry whose tag is owned by it.
%% NOTE(review): nothing here checks that the replica is actually marked
%% down, or that it is not the local replica - presumably callers guarantee
%% both; confirm.
remove_down_replica({state, Self, Context, Clouds, Values, Replicas}, Replica) ->
    Survivors = gleam@dict:filter(
        Values,
        fun({tag, Owner, _Clock}, _Entry) -> Owner =/= Replica end
    ),
    {state,
        Self,
        gleam@dict:delete(Context, Replica),
        gleam@dict:delete(Clouds, Replica),
        Survivors,
        gleam@dict:delete(Replicas, Replica)}.
-file("src/lattice_presence/presence_state.gleam", 487).
?DOC(false).
-spec replicated_parts(state()) -> {binary(),
        gleam@dict:dict(binary(), integer()),
        gleam@dict:dict(binary(), gleam@set:set(integer())),
        gleam@dict:dict(tag(), entry())}.
%% Splits a state into {Replica, Context, Clouds, Values}. The liveness map
%% is left out of the result.
replicated_parts({state, Replica, Context, Clouds, Values, _Replicas}) ->
    {Replica, Context, Clouds, Values}.
-file("src/lattice_presence/presence_state.gleam", 499).
?DOC(false).
-spec from_replicated_parts(
    binary(),
    gleam@dict:dict(binary(), integer()),
    gleam@dict:dict(binary(), gleam@set:set(integer())),
    gleam@dict:dict(tag(), entry())
) -> state().
%% Rebuilds a state from the four replicated parts. The liveness map is
%% started fresh with only `Replica` itself marked `up`.
from_replicated_parts(Replica, Context, Clouds, Values) ->
    Liveness = #{Replica => up},
    {state, Replica, Context, Clouds, Values, Liveness}.