src/lattice_presence@presence_state.erl

-module(lattice_presence@presence_state).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lattice_presence/presence_state.gleam").
-export([new/1, join/5, leave/4, leave_by_pid/2, online_list/1, get_by_topic/2, get_by_key/3, compact/1, merge_with_diff/2, merge/2, extract_full_state/1, replica/1, compacted_clocks/1, entry_count/1, cloud_count/1, internal_values/1, internal_clouds/1, replica_down/2, replica_up/2, remove_down_replica/2, replicated_parts/1, from_replicated_parts/4]).
-export_type([tag/0, entry/0, replica_status/0, state/0, diff/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " Presence State - Pure CRDT for distributed presence tracking\n"
    "\n"
    " A causal-context add-wins observed-remove set, inspired by\n"
    " Phoenix.Tracker.State. This module is a pure data structure with no\n"
    " actors or side effects.\n"
    "\n"
    " Each node (replica) tracks its own presences authoritatively. State is\n"
    " replicated by extracting deltas and merging them at remote replicas.\n"
    " Conflicts are resolved causally: adds win over concurrent removes.\n"
    "\n"
    " ## Example\n"
    "\n"
    " ```gleam\n"
    " import gleam/json\n"
    " import lattice_presence/presence_state as state\n"
    "\n"
    " let a = state.new(\"node-a\")\n"
    "   |> state.join(\"pid-1\", \"room:lobby\", \"alice\", json.object([]))\n"
    " let b = state.new(\"node-b\")\n"
    "   |> state.join(\"pid-2\", \"room:lobby\", \"bob\", json.object([]))\n"
    " let merged = state.merge(a, b)\n"
    " state.get_by_topic(merged, \"room:lobby\")\n"
    " // -> [#(\"pid-1\", \"alice\", _), #(\"pid-2\", \"bob\", _)]\n"
    " ```\n"
).

-type tag() :: {tag, binary(), integer()}.

-type entry() :: {entry, binary(), binary(), binary(), gleam@json:json()}.

-type replica_status() :: up | down.

-opaque state() :: {state,
        binary(),
        gleam@dict:dict(binary(), integer()),
        gleam@dict:dict(binary(), gleam@set:set(integer())),
        gleam@dict:dict(tag(), entry()),
        gleam@dict:dict(binary(), replica_status())}.

-type diff() :: {diff,
        gleam@dict:dict(binary(), list({binary(), binary(), gleam@json:json()})),
        gleam@dict:dict(binary(), list({binary(), binary(), gleam@json:json()}))}.

-file("src/lattice_presence/presence_state.gleam", 119).
?DOC(" Create a new empty state for this replica\n").
-spec new(binary()) -> state().
new(Replica) ->
    {state,
        Replica,
        maps:new(),
        maps:new(),
        maps:new(),
        maps:from_list([{Replica, up}])}.

-file("src/lattice_presence/presence_state.gleam", 516).
-spec next_clock(state(), binary()) -> integer().
next_clock(State, Replica) ->
    Ctx_clock = case gleam_stdlib:map_get(erlang:element(3, State), Replica) of
        {ok, C} ->
            C;

        {error, _} ->
            0
    end,
    Cloud_max = case gleam_stdlib:map_get(erlang:element(4, State), Replica) of
        {ok, Cloud} ->
            gleam@set:fold(Cloud, 0, fun gleam@int:max/2);

        {error, _} ->
            0
    end,
    gleam@int:max(Ctx_clock, Cloud_max) + 1.

-file("src/lattice_presence/presence_state.gleam", 130).
?DOC(" Add a tracked presence. Increments the local clock.\n").
-spec join(state(), binary(), binary(), binary(), gleam@json:json()) -> state().
join(State, Pid, Topic, Key, Meta) ->
    Clock = next_clock(State, erlang:element(2, State)),
    Tag = {tag, erlang:element(2, State), Clock},
    Entry = {entry, Topic, Key, Pid, Meta},
    New_context = gleam@dict:insert(
        erlang:element(3, State),
        erlang:element(2, State),
        Clock
    ),
    New_values = gleam@dict:insert(erlang:element(5, State), Tag, Entry),
    {state,
        erlang:element(2, State),
        New_context,
        erlang:element(4, State),
        New_values,
        erlang:element(6, State)}.

-file("src/lattice_presence/presence_state.gleam", 151).
?DOC(
    " Remove a specific presence by pid, topic, and key.\n"
    "\n"
    " Only entries owned by this replica are removable — leaving a foreign\n"
    " replica's entry would not be causally observed (this node's context\n"
    " doesn't cover the foreign tag), so it would silently reappear on the\n"
    " next merge. Foreign entries are filtered out at the source instead.\n"
).
-spec leave(state(), binary(), binary(), binary()) -> state().
leave(State, Pid, Topic, Key) ->
    New_values = gleam@dict:filter(
        erlang:element(5, State),
        fun(Tag, Entry) ->
            (((erlang:element(2, Tag) /= erlang:element(2, State)) orelse (erlang:element(
                4,
                Entry
            )
            /= Pid))
            orelse (erlang:element(2, Entry) /= Topic))
            orelse (erlang:element(3, Entry) /= Key)
        end
    ),
    {state,
        erlang:element(2, State),
        erlang:element(3, State),
        erlang:element(4, State),
        New_values,
        erlang:element(6, State)}.

-file("src/lattice_presence/presence_state.gleam", 166).
?DOC(
    " Remove all presences for a pid owned by this replica.\n"
    "\n"
    " As with `leave`, only locally-owned entries are eligible — see that\n"
    " function's docs for the rationale.\n"
).
-spec leave_by_pid(state(), binary()) -> state().
leave_by_pid(State, Pid) ->
    New_values = gleam@dict:filter(
        erlang:element(5, State),
        fun(Tag, Entry) ->
            (erlang:element(2, Tag) /= erlang:element(2, State)) orelse (erlang:element(
                4,
                Entry
            )
            /= Pid)
        end
    ),
    {state,
        erlang:element(2, State),
        erlang:element(3, State),
        erlang:element(4, State),
        New_values,
        erlang:element(6, State)}.

-file("src/lattice_presence/presence_state.gleam", 532).
-spec is_replica_up(state(), binary()) -> boolean().
is_replica_up(State, Replica) ->
    case gleam_stdlib:map_get(erlang:element(6, State), Replica) of
        {ok, up} ->
            true;

        {ok, down} ->
            false;

        {error, _} ->
            true
    end.

-file("src/lattice_presence/presence_state.gleam", 177).
?DOC(" Collect entries from non-down replicas that satisfy `predicate`.\n").
-spec visible_entries(state(), fun((entry()) -> boolean())) -> list(entry()).
visible_entries(State, Predicate) ->
    gleam@dict:fold(
        erlang:element(5, State),
        [],
        fun(Acc, Tag, Entry) ->
            case is_replica_up(State, erlang:element(2, Tag)) andalso Predicate(
                Entry
            ) of
                true ->
                    [Entry | Acc];

                false ->
                    Acc
            end
        end
    ).

-file("src/lattice_presence/presence_state.gleam", 187).
?DOC(" List all online presences across all topics (from non-down replicas)\n").
-spec online_list(state()) -> list({binary(),
    binary(),
    binary(),
    gleam@json:json()}).
online_list(State) ->
    _pipe = visible_entries(State, fun(_) -> true end),
    gleam@list:map(
        _pipe,
        fun(Entry) ->
            {erlang:element(4, Entry),
                erlang:element(2, Entry),
                erlang:element(3, Entry),
                erlang:element(5, Entry)}
        end
    ).

-file("src/lattice_presence/presence_state.gleam", 193).
?DOC(" Get all presences for a topic (from non-down replicas)\n").
-spec get_by_topic(state(), binary()) -> list({binary(),
    binary(),
    gleam@json:json()}).
get_by_topic(State, Topic) ->
    _pipe = visible_entries(
        State,
        fun(Entry) -> erlang:element(2, Entry) =:= Topic end
    ),
    gleam@list:map(
        _pipe,
        fun(Entry@1) ->
            {erlang:element(4, Entry@1),
                erlang:element(3, Entry@1),
                erlang:element(5, Entry@1)}
        end
    ).

-file("src/lattice_presence/presence_state.gleam", 202).
?DOC(" Get presences for a specific key within a topic\n").
-spec get_by_key(state(), binary(), binary()) -> list({binary(),
    gleam@json:json()}).
get_by_key(State, Topic, Key) ->
    _pipe = visible_entries(
        State,
        fun(Entry) ->
            (erlang:element(2, Entry) =:= Topic) andalso (erlang:element(
                3,
                Entry
            )
            =:= Key)
        end
    ),
    gleam@list:map(
        _pipe,
        fun(Entry@1) ->
            {erlang:element(4, Entry@1), erlang:element(5, Entry@1)}
        end
    ).

-file("src/lattice_presence/presence_state.gleam", 342).
?DOC(" Compact a single cloud: advance base clock through contiguous values\n").
-spec compact_cloud(integer(), gleam@set:set(integer())) -> {integer(),
    gleam@set:set(integer())}.
compact_cloud(Base, Cloud) ->
    case gleam@set:contains(Cloud, Base + 1) of
        true ->
            compact_cloud(Base + 1, gleam@set:delete(Cloud, Base + 1));

        false ->
            {Base, Cloud}
    end.

-file("src/lattice_presence/presence_state.gleam", 314).
?DOC(
    " Compact clouds into context where possible\n"
    "\n"
    " If context[replica] + 1 is in the cloud, advance context and remove from\n"
    " cloud. Repeat until no more compaction possible.\n"
).
-spec compact(state()) -> state().
compact(State) ->
    {New_context, New_clouds@1} = gleam@dict:fold(
        erlang:element(4, State),
        {erlang:element(3, State), erlang:element(4, State)},
        fun(Acc, Replica, Cloud) ->
            {Ctx, Clouds} = Acc,
            Base = case gleam_stdlib:map_get(Ctx, Replica) of
                {ok, C} ->
                    C;

                {error, _} ->
                    0
            end,
            {New_base, Remaining} = compact_cloud(Base, Cloud),
            New_ctx = case New_base > Base of
                true ->
                    gleam@dict:insert(Ctx, Replica, New_base);

                false ->
                    Ctx
            end,
            New_clouds = case gleam@set:size(Remaining) of
                0 ->
                    gleam@dict:delete(Clouds, Replica);

                _ ->
                    gleam@dict:insert(Clouds, Replica, Remaining)
            end,
            {New_ctx, New_clouds}
        end
    ),
    {state,
        erlang:element(2, State),
        New_context,
        New_clouds@1,
        erlang:element(5, State),
        erlang:element(6, State)}.

-file("src/lattice_presence/presence_state.gleam", 350).
?DOC(" Group entries by topic for diff reporting\n").
-spec entries_to_topic_diff(list(entry())) -> gleam@dict:dict(binary(), list({binary(),
    binary(),
    gleam@json:json()})).
entries_to_topic_diff(Entries) ->
    gleam@list:fold(
        Entries,
        maps:new(),
        fun(Acc, Entry) ->
            Existing = case gleam_stdlib:map_get(Acc, erlang:element(2, Entry)) of
                {ok, L} ->
                    L;

                {error, _} ->
                    []
            end,
            gleam@dict:insert(
                Acc,
                erlang:element(2, Entry),
                [{erlang:element(3, Entry),
                        erlang:element(4, Entry),
                        erlang:element(5, Entry)} |
                    Existing]
            )
        end
    ).

-file("src/lattice_presence/presence_state.gleam", 303).
?DOC(" Merge cloud sets\n").
-spec merge_clouds(
    gleam@dict:dict(binary(), gleam@set:set(integer())),
    gleam@dict:dict(binary(), gleam@set:set(integer()))
) -> gleam@dict:dict(binary(), gleam@set:set(integer())).
merge_clouds(A, B) ->
    gleam@dict:combine(A, B, fun(Sa, Sb) -> gleam@set:union(Sa, Sb) end).

-file("src/lattice_presence/presence_state.gleam", 295).
?DOC(" Merge two vector clocks (take max per replica)\n").
-spec merge_contexts(
    gleam@dict:dict(binary(), integer()),
    gleam@dict:dict(binary(), integer())
) -> gleam@dict:dict(binary(), integer()).
merge_contexts(A, B) ->
    gleam@dict:combine(A, B, fun gleam@int:max/2).

-file("src/lattice_presence/presence_state.gleam", 278).
?DOC(" Check if a tag is \"in\" a causal context (either compacted or in clouds)\n").
-spec tag_is_in(
    gleam@dict:dict(binary(), integer()),
    gleam@dict:dict(binary(), gleam@set:set(integer())),
    tag()
) -> boolean().
tag_is_in(Context, Clouds, Tag) ->
    case gleam_stdlib:map_get(Context, erlang:element(2, Tag)) of
        {ok, Clock} when Clock >= erlang:element(3, Tag) ->
            true;

        _ ->
            case gleam_stdlib:map_get(Clouds, erlang:element(2, Tag)) of
                {ok, Cloud} ->
                    gleam@set:contains(Cloud, erlang:element(3, Tag));

                {error, _} ->
                    false
            end
    end.

-file("src/lattice_presence/presence_state.gleam", 223).
?DOC(" Merge remote state into local state and return a diff of what changed.\n").
-spec merge_with_diff(state(), state()) -> {state(), diff()}.
merge_with_diff(Local, Remote) ->
    Joins = begin
        _pipe = maps:to_list(erlang:element(5, Remote)),
        gleam@list:filter(
            _pipe,
            fun(Kv) ->
                {Tag, _} = Kv,
                not tag_is_in(
                    erlang:element(3, Local),
                    erlang:element(4, Local),
                    Tag
                )
            end
        )
    end,
    Removes = begin
        _pipe@1 = maps:to_list(erlang:element(5, Local)),
        gleam@list:filter(
            _pipe@1,
            fun(Kv@1) ->
                {Tag@1, _} = Kv@1,
                ((erlang:element(2, Tag@1) /= erlang:element(2, Local)) andalso tag_is_in(
                    erlang:element(3, Remote),
                    erlang:element(4, Remote),
                    Tag@1
                ))
                andalso not gleam@dict:has_key(erlang:element(5, Remote), Tag@1)
            end
        )
    end,
    New_values = gleam@list:fold(
        Removes,
        erlang:element(5, Local),
        fun(Vals, Kv@2) ->
            {Tag@2, _} = Kv@2,
            gleam@dict:delete(Vals, Tag@2)
        end
    ),
    New_values@1 = gleam@list:fold(
        Joins,
        New_values,
        fun(Vals@1, Kv@3) ->
            {Tag@3, Entry} = Kv@3,
            gleam@dict:insert(Vals@1, Tag@3, Entry)
        end
    ),
    New_context = merge_contexts(
        erlang:element(3, Local),
        erlang:element(3, Remote)
    ),
    New_clouds = merge_clouds(
        erlang:element(4, Local),
        erlang:element(4, Remote)
    ),
    Join_diff = entries_to_topic_diff(
        gleam@list:map(Joins, fun(Kv@4) -> erlang:element(2, Kv@4) end)
    ),
    Leave_diff = entries_to_topic_diff(
        gleam@list:map(Removes, fun(Kv@5) -> erlang:element(2, Kv@5) end)
    ),
    Diff = {diff, Join_diff, Leave_diff},
    New_state = {state,
        erlang:element(2, Local),
        New_context,
        New_clouds,
        New_values@1,
        erlang:element(6, Local)},
    {compact(New_state), Diff}.

-file("src/lattice_presence/presence_state.gleam", 217).
?DOC(
    " Merge remote state into local state.\n"
    "\n"
    " `replicas` (per-node liveness view) is **not** merged because it is\n"
    " local-only view state, not part of the replicated CRDT payload.\n"
).
-spec merge(state(), state()) -> state().
merge(Local, Remote) ->
    {Merged, _} = merge_with_diff(Local, Remote),
    Merged.

-file("src/lattice_presence/presence_state.gleam", 377).
?DOC(
    " Extract state for sending to a remote replica.\n"
    "\n"
    " Currently returns the full local state. Remote's `merge` handles\n"
    " deduplication of entries it already has, and absence of an entry\n"
    " combined with coverage in `context` represents an observed removal.\n"
    "\n"
    " A future delta-extraction variant will use the remote's known\n"
    " `context` to filter to only the tags the remote hasn't seen — that\n"
    " will be exposed as a separate function rather than retrofitted onto\n"
    " this one.\n"
).
-spec extract_full_state(state()) -> state().
extract_full_state(State) ->
    State.

-file("src/lattice_presence/presence_state.gleam", 384).
?DOC(" Get the current vector clock\n").
-spec replica(state()) -> binary().
replica(State) ->
    erlang:element(2, State).

-file("src/lattice_presence/presence_state.gleam", 389).
?DOC(" Get the compacted vector clock.\n").
-spec compacted_clocks(state()) -> gleam@dict:dict(binary(), integer()).
compacted_clocks(State) ->
    erlang:element(3, State).

-file("src/lattice_presence/presence_state.gleam", 394).
?DOC(" Return the number of entries retained by the CRDT state.\n").
-spec entry_count(state()) -> integer().
entry_count(State) ->
    maps:size(erlang:element(5, State)).

-file("src/lattice_presence/presence_state.gleam", 399).
?DOC(" Return the number of uncompacted cloud entries retained by the state.\n").
-spec cloud_count(state()) -> integer().
cloud_count(State) ->
    maps:size(erlang:element(4, State)).

-file("src/lattice_presence/presence_state.gleam", 404).
?DOC(false).
-spec internal_values(state()) -> gleam@dict:dict(tag(), entry()).
internal_values(State) ->
    erlang:element(5, State).

-file("src/lattice_presence/presence_state.gleam", 409).
?DOC(false).
-spec internal_clouds(state()) -> gleam@dict:dict(binary(), gleam@set:set(integer())).
internal_clouds(State) ->
    erlang:element(4, State).

-file("src/lattice_presence/presence_state.gleam", 416).
?DOC(" Collect all entries currently owned by `replica`.\n").
-spec entries_for_replica(state(), binary()) -> list(entry()).
entries_for_replica(State, Replica) ->
    gleam@dict:fold(
        erlang:element(5, State),
        [],
        fun(Acc, Tag, Entry) -> case erlang:element(2, Tag) =:= Replica of
                true ->
                    [Entry | Acc];

                false ->
                    Acc
            end end
    ).

-file("src/lattice_presence/presence_state.gleam", 429).
?DOC(
    " Mark a replica as down. Returns entries that are now invisible (leaves).\n"
    "\n"
    " Idempotent: if the replica is already `Down`, the state is unchanged\n"
    " and the returned diff is empty.\n"
).
-spec replica_down(state(), binary()) -> {state(), diff()}.
replica_down(State, Replica) ->
    case gleam_stdlib:map_get(erlang:element(6, State), Replica) of
        {ok, down} ->
            {State, {diff, maps:new(), maps:new()}};

        _ ->
            New_replicas = gleam@dict:insert(
                erlang:element(6, State),
                Replica,
                down
            ),
            New_state = {state,
                erlang:element(2, State),
                erlang:element(3, State),
                erlang:element(4, State),
                erlang:element(5, State),
                New_replicas},
            Hidden = entries_for_replica(State, Replica),
            Diff = {diff, maps:new(), entries_to_topic_diff(Hidden)},
            {New_state, Diff}
    end.

-file("src/lattice_presence/presence_state.gleam", 447).
?DOC(
    " Mark a replica as up. Returns entries that are now visible again (joins).\n"
    "\n"
    " Idempotent: if the replica is already `Up` (or unknown — unknown\n"
    " replicas are assumed up), the state is unchanged and the returned\n"
    " diff is empty.\n"
).
-spec replica_up(state(), binary()) -> {state(), diff()}.
replica_up(State, Replica) ->
    case gleam_stdlib:map_get(erlang:element(6, State), Replica) of
        {ok, down} ->
            New_replicas = gleam@dict:insert(
                erlang:element(6, State),
                Replica,
                up
            ),
            New_state = {state,
                erlang:element(2, State),
                erlang:element(3, State),
                erlang:element(4, State),
                erlang:element(5, State),
                New_replicas},
            Restored = entries_for_replica(State, Replica),
            Diff = {diff, entries_to_topic_diff(Restored), maps:new()},
            {New_state, Diff};

        {ok, up} ->
            {State, {diff, maps:new(), maps:new()}};

        {error, _} ->
            New_replicas@1 = gleam@dict:insert(
                erlang:element(6, State),
                Replica,
                up
            ),
            {{state,
                    erlang:element(2, State),
                    erlang:element(3, State),
                    erlang:element(4, State),
                    erlang:element(5, State),
                    New_replicas@1},
                {diff, maps:new(), maps:new()}}
    end.

-file("src/lattice_presence/presence_state.gleam", 471).
?DOC(" Permanently remove all entries and context for a downed replica\n").
-spec remove_down_replica(state(), binary()) -> state().
remove_down_replica(State, Replica) ->
    New_values = gleam@dict:filter(
        erlang:element(5, State),
        fun(Tag, _) -> erlang:element(2, Tag) /= Replica end
    ),
    New_context = gleam@dict:delete(erlang:element(3, State), Replica),
    New_clouds = gleam@dict:delete(erlang:element(4, State), Replica),
    New_replicas = gleam@dict:delete(erlang:element(6, State), Replica),
    {state,
        erlang:element(2, State),
        New_context,
        New_clouds,
        New_values,
        New_replicas}.

-file("src/lattice_presence/presence_state.gleam", 487).
?DOC(false).
-spec replicated_parts(state()) -> {binary(),
    gleam@dict:dict(binary(), integer()),
    gleam@dict:dict(binary(), gleam@set:set(integer())),
    gleam@dict:dict(tag(), entry())}.
replicated_parts(State) ->
    {erlang:element(2, State),
        erlang:element(3, State),
        erlang:element(4, State),
        erlang:element(5, State)}.

-file("src/lattice_presence/presence_state.gleam", 499).
?DOC(false).
-spec from_replicated_parts(
    binary(),
    gleam@dict:dict(binary(), integer()),
    gleam@dict:dict(binary(), gleam@set:set(integer())),
    gleam@dict:dict(tag(), entry())
) -> state().
from_replicated_parts(Replica, Context, Clouds, Values) ->
    {state, Replica, Context, Clouds, Values, maps:from_list([{Replica, up}])}.