%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
-module(barrel_p2p_ormap).
%% A last-write-wins map with HLC-tagged tombstones, suitable for
%% replicating the service registry.
%%
%% Each key holds either a `value' entry (a payload plus the set of
%% dots that have written it) or a `tombstone' entry (an HLC marking
%% the time of removal). Merge keeps the entry with the greater HLC.
%% Ties between two values are broken deterministically by the node atom
%% of the newest dot, so the merge is commutative and replicas never
%% diverge. A tombstone wins only when it is strictly newer than every
%% dot of the value it meets; on an exact HLC tie the value wins. The rule
%% is applied identically whichever side each entry arrives on, so delayed
%% gossip cannot resurrect a removed entry, nor can a delayed remove
%% silently drop a fresher add.
%%
%% This is not a textbook OR-Map CRDT: dot history is not carried
%% through removes (a winning tombstone discards the value's dots). For
%% the service-registry use case it still gives the property that matters:
%% register/unregister ordering converges under reorder, partition, and
%% replay.
-include_lib("hlc/include/hlc.hrl").
-export([new/0, add/3, remove/2, get/2, keys/1, to_list/1]).
-export([merge/2, is_empty/1]).
-export([get_entry/2]).
-export([absorb_clock/1, gc_tombstones/2]).
-type dot() :: {node(), barrel_p2p_hlc:timestamp()}.
-type value_entry() :: {value, term(), #{dot() => true}}.
-type tombstone_entry() :: {tombstone, barrel_p2p_hlc:timestamp()}.
-type entry() :: value_entry() | tombstone_entry().
-type ormap() :: #{term() => entry()}.
-export_type([ormap/0, dot/0, entry/0]).
%%====================================================================
%% API
%%====================================================================
%% Return an empty OR-Map, holding neither live values nor tombstones.
-spec new() -> ormap().
new() ->
    maps:new().
%% Add (or overwrite) Key with Value under a fresh dot {node(), now()}.
%%
%% Against a tombstone, the add follows the same rule merge/2 applies:
%% it is ignored only when the tombstone's HLC is strictly greater than
%% the new dot's HLC. On an exact HLC tie the value wins, matching
%% merge_entry/2, so a local add and a merge of the same two entries
%% agree. Against an existing value, the new dot joins the dot set and
%% Value replaces the previous payload.
-spec add(term(), term(), ormap()) -> ormap().
add(Key, Value, Map) ->
    Stamp = barrel_p2p_hlc:now(),
    Dot = {node(), Stamp},
    Fresh = {value, Value, #{Dot => true}},
    case maps:get(Key, Map, undefined) of
        undefined ->
            Map#{Key => Fresh};
        {tombstone, TombStamp} ->
            %% Only a strictly newer tombstone suppresses the add; an equal
            %% HLC resolves in favour of the value, as it does in merge.
            case barrel_p2p_hlc:compare(TombStamp, Stamp) of
                gt -> Map;
                _ -> Map#{Key => Fresh}
            end;
        {value, _OldValue, Dots} ->
            %% Assumes now() is later than every dot already in Dots (which
            %% absorb_clock/1 is meant to guarantee for merged-in dots), so
            %% Value is also the payload merge/2 would pick by max dot.
            Map#{Key => {value, Value, Dots#{Dot => true}}}
    end.
%% Mark Key as removed by storing a tombstone stamped with the current
%% HLC. The tombstone replaces whatever entry Key held and is written even
%% when Key is absent. On merge it loses to a value whose newest dot is
%% at least as recent, and beats any older value, so a stale delayed add
%% never resurrects the entry.
-spec remove(term(), ormap()) -> ormap().
remove(Key, Map) ->
    Stamp = barrel_p2p_hlc:now(),
    maps:put(Key, {tombstone, Stamp}, Map).
%% Look up the live payload stored under Key. Returns {ok, Value} for a
%% value entry, and not_found both for a tombstone and for an absent key.
-spec get(term(), ormap()) -> {ok, term()} | not_found.
get(Key, Map) ->
    case maps:find(Key, Map) of
        {ok, {value, Payload, _Dots}} -> {ok, Payload};
        _NotLive -> not_found
    end.
%% Return the raw entry stored under Key, live value or tombstone, as
%% {ok, Entry}; not_found when Key is absent. Intended for callers that
%% need to inspect dot history. Plain lookups should use get/2.
-spec get_entry(term(), ormap()) -> {ok, entry()} | not_found.
get_entry(Key, Map) ->
    case maps:find(Key, Map) of
        {ok, Entry} when Entry =/= undefined -> {ok, Entry};
        _ -> not_found
    end.
%% Keys currently holding a live value, in maps:to_list/1 order.
%% Tombstoned keys are left out.
-spec keys(ormap()) -> [term()].
keys(Map) ->
    lists:filtermap(
        fun
            ({Key, {value, _, _}}) -> {true, Key};
            (_) -> false
        end,
        maps:to_list(Map)
    ).
%% {Key, Value} pairs for every live entry, in maps:to_list/1 order.
%% Tombstones are omitted.
-spec to_list(ormap()) -> [{term(), term()}].
to_list(Map) ->
    lists:foldr(
        fun
            ({Key, {value, Payload, _Dots}}, Acc) -> [{Key, Payload} | Acc];
            (_Other, Acc) -> Acc
        end,
        [],
        maps:to_list(Map)
    ).
%% True when Map holds no live value entry. A map containing only
%% tombstones counts as empty.
-spec is_empty(ormap()) -> boolean().
is_empty(Map) ->
    HasLiveEntry = lists:any(
        fun
            ({_Key, {value, _, _}}) -> true;
            (_) -> false
        end,
        maps:to_list(Map)
    ),
    not HasLiveEntry.
%% Feed every HLC carried by an incoming map (each dot of each value
%% entry, and each tombstone stamp) to barrel_p2p_hlc:update/1. This
%% ensures a value merged from a peer cannot later be out-ranked by a
%% locally generated timestamp that lags behind it.
%%
%% Callers merging a whole received map must call this BEFORE merge/2.
%% Callers that reject some entries (e.g. on a freshness/skew check)
%% must filter first and absorb only the accepted sub-map:
%% barrel_p2p_hlc:update/1 accepts future timestamps, so absorbing a
%% rejected far-future dot would still move the clock forward.
%% Always returns ok.
-spec absorb_clock(ormap()) -> ok.
absorb_clock(Map) ->
    Absorb = fun(Stamp) -> barrel_p2p_hlc:update(Stamp) end,
    maps:fold(
        fun
            (_Key, {tombstone, Stamp}, Acc) ->
                Absorb(Stamp),
                Acc;
            (_Key, {value, _Payload, Dots}, Acc) ->
                lists:foreach(
                    fun({_Node, Stamp}) -> Absorb(Stamp) end,
                    maps:keys(Dots)
                ),
                Acc
        end,
        ok,
        Map
    ).
%% Remove tombstones whose wall-clock component is strictly below
%% CutoffWallMs. Value entries are always kept. This bounds the map for
%% high-churn callers (e.g. reminders), where every remove leaves a
%% tombstone behind.
%%
%% It is a best-effort shrink, not a correctness operation. A re-arriving
%% tombstone is idempotent, and the cutoff must be chosen so that no add
%% older than a dropped tombstone can still be in flight.
-spec gc_tombstones(ormap(), non_neg_integer()) -> ormap().
gc_tombstones(Map, CutoffWallMs) ->
    IsExpired = fun(Stamp) ->
        barrel_p2p_hlc:wall_time(Stamp) < CutoffWallMs
    end,
    maps:filter(
        fun
            (_Key, {tombstone, Stamp}) -> not IsExpired(Stamp);
            (_Key, _Entry) -> true
        end,
        Map
    ).
%% Merge two OR-Maps key by key.
%%
%% Keys present in only one map are kept unchanged. Keys present in both
%% are resolved with merge_entry/2.
%%
%% The merge is commutative and idempotent. It is associative as far as
%% the live value and its newest dot are concerned. The set of older,
%% superseded dots a surviving value retains can depend on grouping,
%% because a winning tombstone discards a value's dots.
%%
%% maps:merge_with/3 matches keys exactly (=:=). A ==-based dedup such as
%% lists:usort/1 would conflate distinct keys like 1 and 1.0 and silently
%% drop one of the entries.
-spec merge(ormap(), ormap()) -> ormap().
merge(Map1, Map2) ->
    maps:merge_with(
        fun(_Key, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
        Map1,
        Map2
    ).
%%====================================================================
%% Internal
%%====================================================================
%% Resolve one key's entries coming from two replicas.
%% - An absent side (undefined) yields the other side unchanged.
%% - Two tombstones: keep the first stamp if it compares gt, else the
%%   second.
%% - Tombstone vs value, in either argument order: the tombstone wins only
%%   when its HLC is strictly greater than the newest HLC among the
%%   value's dots; otherwise the value wins, so an exact tie favours the
%%   value.
%% - Two values: the dot sets are unioned, and the payload comes from the
%%   side whose maximum dot is greater under dot_compare/2. When both
%%   maxima compare equal, the second payload is taken.
merge_entry(undefined, Other) ->
    Other;
merge_entry(Other, undefined) ->
    Other;
merge_entry({tombstone, StampA}, {tombstone, StampB}) ->
    Latest =
        case barrel_p2p_hlc:compare(StampA, StampB) of
            gt -> StampA;
            _ -> StampB
        end,
    {tombstone, Latest};
merge_entry({value, _, _} = Live, {tombstone, _} = Tomb) ->
    %% Same rule as the tombstone-first clause below; swap the arguments
    %% so the rule is written once.
    merge_entry(Tomb, Live);
merge_entry({tombstone, Stamp} = Tomb, {value, _, Dots} = Live) ->
    case barrel_p2p_hlc:compare(Stamp, max_hlc(Dots)) of
        gt -> Tomb;
        _ -> Live
    end;
merge_entry({value, PayloadA, DotsA}, {value, PayloadB, DotsB}) ->
    %% Compare full {Node, HLC} dots, not bare HLCs. Two concurrent writes
    %% can land on the same HLC (same host, same millisecond), and the
    %% node atom then breaks the tie identically on every replica. This
    %% keeps the merge commutative instead of depending on argument order.
    Winner =
        case dot_compare(max_dot(DotsA), max_dot(DotsB)) of
            gt -> PayloadA;
            _ -> PayloadB
        end,
    {value, Winner, maps:merge(DotsA, DotsB)}.
%% The HLC component of a {Node, HLC} dot. Anything that is not a
%% 2-tuple fails with function_clause.
dot_hlc(Dot) when tuple_size(Dot) =:= 2 ->
    element(2, Dot).
%% Total order on dots. HLCs are compared first. Only when they compare
%% eq does the node atom decide, via compare_node/2, so two writes with an
%% equal HLC resolve identically on every replica and the merge stays
%% commutative.
dot_compare({NodeA, StampA}, {NodeB, StampB}) ->
    ByClock = barrel_p2p_hlc:compare(StampA, StampB),
    case ByClock =:= eq of
        true -> compare_node(NodeA, NodeB);
        false -> ByClock
    end.
%% Order two node identifiers by Erlang term order. Returns eq only for
%% exactly equal terms, gt when the first is greater, and lt otherwise.
compare_node(NodeA, NodeB) ->
    if
        NodeA =:= NodeB -> eq;
        NodeA > NodeB -> gt;
        true -> lt
    end.
%% Greatest dot of a non-empty dot set under dot_compare/2. An empty set
%% fails with badmatch. On a tie, the dot met earlier in maps:keys/1
%% order is kept.
max_dot(Dots) ->
    [Seed | Others] = maps:keys(Dots),
    KeepGreater = fun(Candidate, Best) ->
        case dot_compare(Candidate, Best) =:= gt of
            true -> Candidate;
            false -> Best
        end
    end,
    lists:foldl(KeepGreater, Seed, Others).
%% Greatest HLC among the dots of a non-empty dot set, using
%% barrel_p2p_hlc:compare/2. An empty set fails with badmatch. On a tie,
%% the HLC met earlier in maps:keys/1 order is kept.
max_hlc(Dots) ->
    [Seed | Others] = maps:keys(Dots),
    lists:foldl(
        fun({_Node, Stamp}, Best) ->
            Verdict = barrel_p2p_hlc:compare(Stamp, Best),
            if
                Verdict =:= gt -> Stamp;
                true -> Best
            end
        end,
        dot_hlc(Seed),
        Others
    ).