Skip to main content

src/barrel_rep_alg.erl

%%%-------------------------------------------------------------------
%%% @doc barrel_rep_alg - Replication algorithm
%%%
%%% Implements the core replication algorithm for synchronizing
%%% documents between source and target databases.
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_rep_alg).

-include("barrel_docdb.hrl").

%% API
-export([
    replicate/5,
    replicate_batch/5
]).

%%====================================================================
%% API
%%====================================================================

%% @doc Replicate a list of changes from source to target.
%%
%% Each change is handled in order: its HLC (when present) is pushed to the
%% target via maybe_sync_hlc/3, then sync_change/6 copies any revisions the
%% target is missing. The work runs inside a `replication' trace span tagged
%% with the change count and the two transport module names. Returns
%% `{ok, Stats}' with the counters accumulated over all changes.
-spec replicate(term(), term(), module(), module(), [map()]) ->
    {ok, map()}.
replicate(Source, Target, SourceTransport, TargetTransport, Changes) ->
    SpanAttrs = #{
        <<"replication.changes_count">> => length(Changes),
        <<"replication.source_transport">> => atom_to_binary(SourceTransport, utf8),
        <<"replication.target_transport">> => atom_to_binary(TargetTransport, utf8)
    },
    ProcessChange =
        fun(Change, {ok, StatsAcc}) ->
            %% Keep the target clock causally ahead of the source event
            _ = maybe_sync_hlc(Target, TargetTransport, Change),
            sync_change(Source, Target, SourceTransport, TargetTransport, Change, StatsAcc)
        end,
    barrel_trace:with_db_span(replication, undefined, SpanAttrs, fun() ->
        %% The match asserts the fold ended in {ok, _} and evaluates to it
        {ok, _} = lists:foldl(ProcessChange, {ok, new_stats()}, Changes)
    end).

%% @doc Replicate changes in batches with checkpoint callback.
%%
%% Options read from `Opts':
%% <ul>
%%   <li>`since' - sequence to start reading changes after (default `first')</li>
%%   <li>`batch_size' - `limit' passed to the source's get_changes per batch
%%       (default 100)</li>
%%   <li>`checkpoint_fun' - called with the last sequence of every replicated
%%       batch; it must return `ok' (default: a no-op)</li>
%% </ul>
%%
%% Returns `{ok, Stats}' once the source returns an empty batch, or
%% `{error, Reason}' as soon as fetching changes from the source fails. In the
%% error case the stats of batches already replicated are not returned.
-spec replicate_batch(term(), term(), module(), module(), map()) ->
    {ok, map()} | {error, term()}.
replicate_batch(Source, Target, SourceTransport, TargetTransport, Opts) ->
    Since = maps:get(since, Opts, first),
    BatchSize = maps:get(batch_size, Opts, 100),
    CheckpointFun = maps:get(checkpoint_fun, Opts, fun(_) -> ok end),
    replicate_loop(Source, Target, SourceTransport, TargetTransport,
                   Since, BatchSize, CheckpointFun, new_stats()).

%%====================================================================
%% Internal functions
%%====================================================================

%% @doc Main replication loop.
%%
%% Fetches up to `Limit' changes after `Since' from the source and replicates
%% them with replicate/5. It merges the batch counters into `Stats', then
%% hands the batch's last sequence to `CheckpointFun', which must return `ok'.
%% It then recurses from that sequence. Stops with `{ok, Stats}' when the
%% source returns no changes. A source error is returned unchanged.
replicate_loop(Source, Target, SourceTransport, TargetTransport, Since, Limit, CheckpointFun, Stats) ->
    Fetched = SourceTransport:get_changes(Source, Since, #{limit => Limit}),
    case Fetched of
        {error, _Reason} = Failure ->
            Failure;
        {ok, [], _} ->
            %% Source is drained
            {ok, Stats};
        {ok, Batch, NextSince} ->
            {ok, BatchStats} = replicate(Source, Target, SourceTransport, TargetTransport, Batch),
            Running = merge_stats(Stats, BatchStats),
            %% Persist progress before asking for the next batch
            ok = CheckpointFun(NextSince),
            replicate_loop(Source, Target, SourceTransport, TargetTransport,
                           NextSince, Limit, CheckpointFun, Running)
    end.

%% @doc Sync a single change.
%%
%% Collects the `rev' of every entry in the change's `changes' list. It then
%% asks the target (via `TargetTransport:revsdiff/3') which of them are
%% missing and copies each missing revision from the source with
%% sync_revision/7. Always returns `{ok, Stats}'. A revsdiff error is logged
%% and counted under `doc_read_failures' instead of aborting the batch.
sync_change(Source, Target, SourceTransport, TargetTransport, Change, Stats) ->
    DocId = get_value(id, Change),
    ChangeRevs = get_value(changes, Change, []),

    %% Extract revision IDs from changes (atom or binary `rev' keys)
    RevIds = [get_value(rev, R) || R <- ChangeRevs],

    %% Find which revisions target is missing
    case TargetTransport:revsdiff(Target, DocId, RevIds) of
        {ok, [], _Ancestors} ->
            %% Target has all revisions
            {ok, Stats};
        {ok, MissingRevisions, _Ancestors} ->
            %% Sync each missing revision
            Stats2 = lists:foldl(
                fun(Revision, Acc) ->
                    sync_revision(Source, Target, SourceTransport, TargetTransport, DocId, Revision, Acc)
                end,
                Stats,
                MissingRevisions
            ),
            {ok, Stats2};
        {error, _} = Error ->
            %% ~tp prints any id term and renders UTF-8 binary ids correctly
            %% (~s would mangle non-latin1 ids and fail on non-chardata ids)
            logger:warning("revsdiff failed for ~tp: ~tp", [DocId, Error]),
            {ok, inc_stat(doc_read_failures, Stats, 1)}
    end.

%% @doc Sync a single revision.
%%
%% Reads revision `Revision' of `DocId' from the source together with its
%% history. On a failed read, returns the stats produced by the read, where
%% the failure is already counted. Otherwise it writes the document to the
%% target, with the revision list from parse_revisions/1 and the metadata's
%% `<<"deleted">>' flag (default `false'), and returns the resulting stats.
sync_revision(Source, Target, SourceTransport, TargetTransport, DocId, Revision, Stats) ->
    ReadOutcome = read_doc_with_history(Source, SourceTransport, DocId, Revision, Stats),
    case ReadOutcome of
        {undefined, undefined, AfterRead} ->
            AfterRead;
        {Doc, Meta, AfterRead} ->
            RevHistory = parse_revisions(Meta),
            IsDeleted = maps:get(<<"deleted">>, Meta, false),
            write_doc(Target, TargetTransport, Doc, RevHistory, IsDeleted, AfterRead)
    end.

%% @doc Read document with revision history from source.
%%
%% Calls `SourceTransport:get_doc/3' for revision `Rev' with history enabled.
%% On success, returns `{Doc, Meta, Stats}' with `docs_read' incremented and
%% the elapsed microseconds added to `doc_read_time_us'. On `{error, Reason}',
%% it logs the failure and returns `{undefined, undefined, Stats}' with
%% `doc_read_failures' incremented.
read_doc_with_history(Source, SourceTransport, DocId, Rev, Stats) ->
    StartTime = erlang:monotonic_time(microsecond),
    case SourceTransport:get_doc(Source, DocId, #{rev => Rev, history => true}) of
        {ok, Doc, Meta} ->
            Time = erlang:monotonic_time(microsecond) - StartTime,
            Stats2 = inc_stat(docs_read, Stats, 1),
            Stats3 = update_time_stat(doc_read_time_us, Time, Stats2),
            {Doc, Meta, Stats3};
        {error, Reason} ->
            %% Log so read failures are as diagnosable as revsdiff and write
            %% failures; replication still continues (best-effort).
            logger:warning("replicate read error for ~tp (rev ~tp): ~tp",
                           [DocId, Rev, Reason]),
            Stats2 = inc_stat(doc_read_failures, Stats, 1),
            {undefined, undefined, Stats2}
    end.

%% @doc Write document with history to target.
%%
%% An `undefined' document is a no-op and returns `Stats' unchanged.
%% Otherwise calls `TargetTransport:put_rev/4'. On success it increments
%% `docs_written' and adds the elapsed microseconds to `doc_write_time_us'.
%% On `{error, Reason}' it logs the document's `<<"id">>' and counts the
%% failure in `doc_write_failures'. Returns the updated stats.
write_doc(Target, TargetTransport, Doc, History, Deleted, Stats) when Doc =/= undefined ->
    Begin = erlang:monotonic_time(microsecond),
    Outcome = TargetTransport:put_rev(Target, Doc, History, Deleted),
    case Outcome of
        {error, Reason} ->
            FailedId = maps:get(<<"id">>, Doc, undefined),
            logger:error("replicate write error for ~p: ~p", [FailedId, Reason]),
            inc_stat(doc_write_failures, Stats, 1);
        {ok, _WrittenId, _WrittenRev} ->
            Elapsed = erlang:monotonic_time(microsecond) - Begin,
            update_time_stat(doc_write_time_us, Elapsed, inc_stat(docs_written, Stats, 1))
    end;
write_doc(_Target, _TargetTransport, undefined, _History, _Deleted, Stats) ->
    Stats.

%% @doc Parse revisions from metadata.
%%
%% With a `<<"revisions">>' map, builds `<<"Gen-Hash">>' ids by pairing
%% `<<"ids">>' in order with generation numbers that count down from
%% `<<"start">>'. Without it, a `<<"rev">>' value is returned as a
%% one-element list, and any other input yields `[]'.
parse_revisions(#{<<"revisions">> := Revisions}) ->
    Start = maps:get(<<"start">>, Revisions),
    Hashes = maps:get(<<"ids">>, Revisions),
    Generations = lists:seq(Start, Start - length(Hashes) + 1, -1),
    [iolist_to_binary([integer_to_binary(Gen), "-", Hash])
     || {Gen, Hash} <- lists:zip(Generations, Hashes)];
parse_revisions(#{<<"rev">> := SingleRev}) ->
    [SingleRev];
parse_revisions(_Other) ->
    [].

%%====================================================================
%% Stats management
%%====================================================================

%% Fresh replication counters: every read/write count and accumulated
%% microsecond timing starts at zero.
new_stats() ->
    CounterNames = [
        docs_read,
        doc_read_failures,
        docs_written,
        doc_write_failures,
        doc_read_time_us,
        doc_write_time_us
    ],
    maps:from_list([{Name, 0} || Name <- CounterNames]).

%% Add `N' to the existing counter `Key'; fails with `{badkey, Key}' if absent.
inc_stat(Key, Stats, N) ->
    Current = maps:get(Key, Stats),
    Stats#{Key := Current + N}.

%% Add a measured duration `Time' to the existing timing counter `Key';
%% fails with `{badkey, Key}' if the counter is absent.
update_time_stat(Key, Time, Stats) ->
    maps:put(Key, maps:get(Key, Stats) + Time, Stats).

%% Combine two stats maps: counters present in both are summed, and
%% counters present in only one are carried over unchanged.
merge_stats(Stats1, Stats2) ->
    maps:fold(
        fun(Name, Delta, Acc) ->
            maps:update_with(Name, fun(Base) -> Base + Delta end, Delta, Acc)
        end,
        Stats1,
        Stats2
    ).

%% @doc Get value from map, trying atom and binary keys.
%%
%% Looks up the atom `Key' first. If it is missing, or stored as `undefined',
%% the UTF-8 binary form of the key is tried. If that is also missing,
%% `Default' is returned (`undefined' for the 2-arity version).
get_value(Key, Map) ->
    get_value(Key, Map, undefined).

get_value(Key, Map, Default) when is_atom(Key) ->
    case Map of
        #{Key := Found} when Found =/= undefined ->
            Found;
        _ ->
            maps:get(atom_to_binary(Key, utf8), Map, Default)
    end.

%% @doc Sync HLC from change to target if present.
%% This ensures the target database clock reflects causality with source events.
%%
%% Best-effort: always returns `ok', and the return value of
%% `TargetTransport:sync_hlc/2' is ignored. An `error:undef' is treated as
%% "transport does not support HLC sync" and skipped silently. Any other
%% exception is logged instead of being swallowed, so a broken clock sync is
%% visible, but replication is not interrupted.
maybe_sync_hlc(Target, TargetTransport, Change) ->
    case get_value(hlc, Change) of
        undefined ->
            ok;
        Hlc ->
            try TargetTransport:sync_hlc(Target, Hlc) of
                _ -> ok
            catch
                error:undef ->
                    %% Transport does not export sync_hlc/2. NOTE(review): an
                    %% undef raised deeper inside sync_hlc is also skipped here.
                    ok;
                Class:Reason ->
                    logger:warning("sync_hlc failed on ~p: ~p:~tp",
                                   [TargetTransport, Class, Reason]),
                    ok
            end
    end.