Skip to main content

src/barrel_db_server.erl

%%%-------------------------------------------------------------------
%%% @doc barrel_db_server - Individual database server process
%%%
%%% Manages a single database instance. Each database has its own
%%% gen_server process that handles all operations for that database.
%%% Opens both a document store (regular RocksDB) and an attachment
%%% store (RocksDB with BlobDB enabled).
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_db_server).

-behaviour(gen_server).

%% API
-export([start_link/2]).
-export([info/1, stop/1]).
-export([get_store_ref/1, get_att_ref/1]).

%% Document API
-export([
    put_doc/3,
    put_docs/3,
    get_doc/3,
    get_docs/3,
    delete_doc/3,
    fold_docs/3,
    fold_docs/4,
    resolve_conflict/4,
    get_conflicts/2
]).

%% Replication API
-export([
    put_rev/4,
    revsdiff/3,
    revsdiff_batch/2
]).

%% Local document API (for checkpoints, not replicated)
-export([
    put_local_doc/3,
    get_local_doc/2,
    delete_local_doc/2,
    fold_local_docs/4
]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2]).

-record(state, {
    name :: binary(),
    config :: map(),
    db_path :: string(),
    store_ref :: barrel_store_rocksdb:db_ref() | undefined,
    att_ref :: barrel_att_store:att_ref() | undefined,
    filter_pid :: pid() | undefined,  %% Compaction filter handler for this database
    compaction_timer :: reference() | undefined,  %% Timer for periodic compaction checks
    compaction_interval :: pos_integer(),  %% Interval between checks in ms (default: 1 hour)
    compaction_size_threshold :: pos_integer()  %% Size threshold in bytes to trigger compaction
}).

%% Default compaction settings
-define(DEFAULT_COMPACTION_INTERVAL, 3600000).  %% 1 hour in ms
-define(DEFAULT_COMPACTION_SIZE_THRESHOLD, 1073741824).  %% 1 GB in bytes

%%====================================================================
%% API functions
%%====================================================================

%% @doc Start the database server
-spec start_link(binary(), map()) -> {ok, pid()} | {error, term()}.
start_link(Name, Config) ->
    gen_server:start_link(?MODULE, [Name, Config], []).

%% @doc Get database info
-spec info(pid()) -> {ok, map()} | {error, term()}.
info(Pid) ->
    gen_server:call(Pid, info).

%% @doc Get the document store reference
-spec get_store_ref(pid()) -> {ok, barrel_store_rocksdb:db_ref()} | {error, term()}.
get_store_ref(Pid) ->
    gen_server:call(Pid, get_store_ref).

%% @doc Get the attachment store reference
-spec get_att_ref(pid()) -> {ok, barrel_att_store:att_ref()} | {error, term()}.
get_att_ref(Pid) ->
    gen_server:call(Pid, get_att_ref).


%% @doc Stop the database server
-spec stop(pid()) -> ok.
stop(Pid) ->
    gen_server:stop(Pid).

%%====================================================================
%% Document API functions
%%====================================================================

%% @doc Put a document (create or update)
-spec put_doc(pid(), map(), map()) -> {ok, map()} | {error, term()}.
put_doc(Pid, Doc, Opts) ->
    gen_server:call(Pid, {put_doc, Doc, Opts}).

%% @doc Put multiple documents (batch write)
-spec put_docs(pid(), [map()], map()) -> [{ok, map()} | {error, term()}].
put_docs(Pid, Docs, Opts) ->
    gen_server:call(Pid, {put_docs, Docs, Opts}).

%% @doc Get a document
%% When raw_body => true in Opts, returns {ok, CborBin, Meta} for zero-copy responses
-spec get_doc(pid(), binary(), map()) -> {ok, map()} | {ok, binary(), map()} | {error, not_found} | {error, term()}.
get_doc(Pid, DocId, Opts) ->
    gen_server:call(Pid, {get_doc, DocId, Opts}).

%% @doc Get multiple documents (batch read)
-spec get_docs(pid(), [binary()], map()) -> [{ok, map()} | {error, not_found} | {error, term()}].
get_docs(Pid, DocIds, Opts) ->
    gen_server:call(Pid, {get_docs, DocIds, Opts}).

%% @doc Delete a document
-spec delete_doc(pid(), binary(), map()) -> {ok, map()} | {error, term()}.
delete_doc(Pid, DocId, Opts) ->
    gen_server:call(Pid, {delete_doc, DocId, Opts}).

%% @doc Fold over all documents
-spec fold_docs(pid(), fun(), term()) -> {ok, term()}.
fold_docs(Pid, Fun, Acc) ->
    gen_server:call(Pid, {fold_docs, Fun, Acc}, infinity).

%% @doc Fold over all documents with options
-spec fold_docs(pid(), fun(), term(), map()) -> {ok, term()}.
fold_docs(Pid, Fun, Acc, Opts) when is_map(Opts) ->
    gen_server:call(Pid, {fold_docs, Fun, Acc, Opts}, infinity).

%% @doc Get conflicts for a document
%% Returns list of conflicting revision IDs (excluding the winner)
-spec get_conflicts(pid(), binary()) -> {ok, [binary()]} | {error, term()}.
get_conflicts(Pid, DocId) ->
    gen_server:call(Pid, {get_conflicts, DocId}).

%% @doc Resolve a conflict by choosing a winning revision or providing a merged doc
%% Resolution can be:
%%   - {choose, Rev} - keep this revision as winner, delete other branches
%%   - {merge, Doc} - create new revision merging all conflicts
-spec resolve_conflict(pid(), binary(), binary(), {choose, binary()} | {merge, map()}) ->
    {ok, map()} | {error, term()}.
resolve_conflict(Pid, DocId, BaseRev, Resolution) ->
    gen_server:call(Pid, {resolve_conflict, DocId, BaseRev, Resolution}).

%%====================================================================
%% Replication API functions
%%====================================================================

%% @doc Put a document with explicit revision history (for replication)
%% Returns {ok, DocId, RevId} on success.
-spec put_rev(pid(), map(), [binary()], boolean()) -> {ok, binary(), binary()} | {error, term()}.
put_rev(Pid, Doc, History, Deleted) ->
    gen_server:call(Pid, {put_rev, Doc, History, Deleted}).

%% @doc Get revisions difference (for replication)
%% Returns {ok, Missing, PossibleAncestors}
-spec revsdiff(pid(), binary(), [binary()]) -> {ok, [binary()], [binary()]}.
revsdiff(Pid, DocId, RevIds) ->
    gen_server:call(Pid, {revsdiff, DocId, RevIds}).

%% @doc Get revisions difference for multiple documents (batch)
%% Takes a map of DocId => [RevIds] and returns a map of
%% DocId => #{missing => [...], possible_ancestors => [...]}
-spec revsdiff_batch(pid(), #{binary() => [binary()]}) ->
    {ok, #{binary() => #{missing => [binary()], possible_ancestors => [binary()]}}}.
revsdiff_batch(Pid, RevsMap) when is_map(RevsMap) ->
    gen_server:call(Pid, {revsdiff_batch, RevsMap}).

%%====================================================================
%% Local Document API functions
%%====================================================================

%% @doc Put a local document (not replicated)
-spec put_local_doc(pid(), binary(), map()) -> ok | {error, term()}.
put_local_doc(Pid, DocId, Doc) ->
    gen_server:call(Pid, {put_local_doc, DocId, Doc}).

%% @doc Get a local document
-spec get_local_doc(pid(), binary()) -> {ok, map()} | {error, not_found}.
get_local_doc(Pid, DocId) ->
    gen_server:call(Pid, {get_local_doc, DocId}).

%% @doc Delete a local document
-spec delete_local_doc(pid(), binary()) -> ok | {error, not_found}.
delete_local_doc(Pid, DocId) ->
    gen_server:call(Pid, {delete_local_doc, DocId}).

%% @doc Fold over local documents with a given prefix
-spec fold_local_docs(pid(), binary(), fun((binary(), map(), term()) -> term()), term()) ->
    {ok, term()} | {error, term()}.
fold_local_docs(Pid, Prefix, Fun, Acc) ->
    gen_server:call(Pid, {fold_local_docs, Prefix, Fun, Acc}).

%%====================================================================
%% gen_server callbacks
%%====================================================================

%% @doc Initialize the database server
init([Name, Config]) ->
    process_flag(trap_exit, true),

    %% Get data directory from config. Fall back to the `data_dir' app env so a
    %% node can relocate ALL of its dbs (including internal ones created with no
    %% opts) off the shared default - lets two local nodes use distinct dirs.
    DefaultDataDir = application:get_env(barrel_docdb, data_dir, "/tmp/barrel_data"),
    DataDir = maps:get(data_dir, Config, DefaultDataDir),
    DbPath = filename:join([DataDir, binary_to_list(Name)]),

    %% Start compaction filter handler BEFORE opening RocksDB
    %% (handler pid is passed to CF options)
    PruneDepth = maps:get(prune_depth, Config, 1000),
    {ok, FilterPid} = barrel_compaction_filter:start_link(#{
        db_name => Name,
        prune_depth => PruneDepth
    }),

    %% Open document store (RocksDB with column families including body CF with BlobDB)
    DocStorePath = filename:join(DbPath, "docs"),
    StoreOpts0 = maps:get(store_opts, Config, #{}),
    %% Pass the filter handler to the store options
    StoreOpts = StoreOpts0#{compaction_filter_handler => FilterPid},
    case barrel_store_rocksdb:open(DocStorePath, StoreOpts) of
        {ok, StoreRef} ->
            %% Open attachment store (separate RocksDB with BlobDB)
            AttStorePath = filename:join(DbPath, "attachments"),
            AttOpts = maps:get(att_opts, Config, #{}),
            case barrel_att_store:open(AttStorePath, AttOpts) of
                {ok, AttRef} ->
                    %% Register in persistent_term for lookup
                    %% barrel_store is used by barrel_doc_body_store for body CF access
                    %% and by compaction filter handler for body deletion
                    persistent_term:put({barrel_db, Name}, self()),
                    persistent_term:put({barrel_store, Name}, StoreRef),

                    %% Compaction settings from config (or defaults)
                    CompactionInterval = maps:get(compaction_interval, Config,
                                                  ?DEFAULT_COMPACTION_INTERVAL),
                    CompactionThreshold = maps:get(compaction_size_threshold, Config,
                                                   ?DEFAULT_COMPACTION_SIZE_THRESHOLD),

                    %% Start periodic compaction check timer
                    TimerRef = erlang:send_after(CompactionInterval, self(), compaction_check),

                    logger:info("Database ~s started at ~s", [Name, DbPath]),
                    {ok, #state{
                        name = Name,
                        config = Config,
                        db_path = DbPath,
                        store_ref = StoreRef,
                        att_ref = AttRef,
                        filter_pid = FilterPid,
                        compaction_timer = TimerRef,
                        compaction_interval = CompactionInterval,
                        compaction_size_threshold = CompactionThreshold
                    }};
                {error, AttReason} ->
                    %% Close document store if attachment store fails
                    barrel_store_rocksdb:close(StoreRef),
                    exit(FilterPid, shutdown),
                    {stop, {att_store_open_failed, AttReason}}
            end;
        {error, Reason} ->
            exit(FilterPid, shutdown),
            {stop, {store_open_failed, Reason}}
    end.

%% @doc Handle synchronous calls
handle_call(info, _From, #state{name = Name, config = Config, db_path = DbPath} = State) ->
    Info = #{
        name => Name,
        config => Config,
        db_path => DbPath,
        pid => self()
    },
    {reply, {ok, Info}, State};

handle_call(get_store_ref, _From, #state{store_ref = StoreRef} = State) ->
    {reply, {ok, StoreRef}, State};

handle_call(get_att_ref, _From, #state{att_ref = AttRef} = State) ->
    {reply, {ok, AttRef}, State};

%% Document operations
handle_call({put_doc, Doc, Opts}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_put_doc(StoreRef, DbName, Doc, Opts),
    {reply, Result, State};

handle_call({put_docs, Docs, Opts}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_put_docs(StoreRef, DbName, Docs, Opts),
    {reply, Result, State};

handle_call({get_doc, DocId, Opts}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_get_doc(StoreRef, DbName, DocId, Opts),
    {reply, Result, State};

handle_call({get_docs, DocIds, Opts}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_get_docs(StoreRef, DbName, DocIds, Opts),
    {reply, Result, State};

handle_call({delete_doc, DocId, Opts}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_delete_doc(StoreRef, DbName, DocId, Opts),
    {reply, Result, State};

handle_call({fold_docs, Fun, Acc}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_fold_docs(StoreRef, DbName, Fun, Acc, #{}),
    {reply, Result, State};

handle_call({fold_docs, Fun, Acc, Opts}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_fold_docs(StoreRef, DbName, Fun, Acc, Opts),
    {reply, Result, State};

%% Conflict operations
handle_call({get_conflicts, DocId}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_get_conflicts(StoreRef, DbName, DocId),
    {reply, Result, State};

handle_call({resolve_conflict, DocId, BaseRev, Resolution}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_resolve_conflict(StoreRef, DbName, DocId, BaseRev, Resolution),
    {reply, Result, State};

%% Replication operations
handle_call({put_rev, Doc, History, Deleted}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_put_rev(StoreRef, DbName, Doc, History, Deleted),
    {reply, Result, State};

handle_call({revsdiff, DocId, RevIds}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_revsdiff(StoreRef, DbName, DocId, RevIds),
    {reply, Result, State};

handle_call({revsdiff_batch, RevsMap}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_revsdiff_batch(StoreRef, DbName, RevsMap),
    {reply, Result, State};

%% Local document operations
handle_call({put_local_doc, DocId, Doc}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_put_local_doc(StoreRef, DbName, DocId, Doc),
    {reply, Result, State};

handle_call({get_local_doc, DocId}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_get_local_doc(StoreRef, DbName, DocId),
    {reply, Result, State};

handle_call({delete_local_doc, DocId}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_delete_local_doc(StoreRef, DbName, DocId),
    {reply, Result, State};

handle_call({fold_local_docs, Prefix, Fun, Acc}, _From,
            #state{name = DbName, store_ref = StoreRef} = State) ->
    Result = do_fold_local_docs(StoreRef, DbName, Prefix, Fun, Acc),
    {reply, Result, State};

handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.

%% @doc Handle asynchronous casts
handle_cast(_Msg, State) ->
    {noreply, State}.

%% @doc Handle other messages
handle_info(compaction_check, #state{store_ref = StoreRef,
                                      compaction_interval = Interval,
                                      compaction_size_threshold = Threshold} = State) ->
    %% Check database size and trigger compaction if needed
    NewState = do_check_compaction(StoreRef, Threshold, State),
    %% Restart timer for next check
    TimerRef = erlang:send_after(Interval, self(), compaction_check),
    {noreply, NewState#state{compaction_timer = TimerRef}};

handle_info(_Info, State) ->
    {noreply, State}.

%% @doc Clean up when terminating
terminate(_Reason, #state{name = Name, store_ref = StoreRef, att_ref = AttRef,
                          filter_pid = FilterPid,
                          compaction_timer = CompactionTimer}) ->
    %% Cancel compaction timer 
    _ = 
      case CompactionTimer of
          undefined -> ok;
          _ -> erlang:cancel_timer(CompactionTimer)
      end,
    %% Stop compaction filter handler
    _ = 
      case FilterPid of
          undefined -> ok;
          _ ->
              try exit(FilterPid, shutdown)
              catch _:_ -> ok
              end
      end,
    %% Close attachment store
    _ = 
      case AttRef of
          undefined -> ok;
          _ -> barrel_att_store:close(AttRef)
      end,
    %% Close document store (includes body CF)
    _ = 
      case StoreRef of
          undefined -> ok;
          _ -> barrel_store_rocksdb:close(StoreRef)
      end,
    %% Unregister
    persistent_term:erase({barrel_db, Name}),
    persistent_term:erase({barrel_store, Name}),
    logger:info("Database ~s stopped", [Name]),
    ok.

%%====================================================================
%% Internal functions
%%====================================================================

%% @doc Check database size and trigger compaction if threshold exceeded
%% Called periodically by the compaction_check timer.
do_check_compaction(StoreRef, Threshold, State) ->
    #state{name = Name} = State,
    case barrel_store_rocksdb:get_db_size(StoreRef) of
        {ok, Size} when Size >= Threshold ->
            logger:info("Database ~s size ~p bytes exceeds threshold ~p, triggering compaction",
                        [Name, Size, Threshold]),
            case barrel_store_rocksdb:compact_default_cf(StoreRef) of
                ok ->
                    logger:info("Database ~s compaction completed", [Name]),
                    State;
                {error, Reason} ->
                    logger:warning("Database ~s compaction failed: ~p", [Name, Reason]),
                    State
            end;
        {ok, Size} ->
            logger:debug("Database ~s size ~p bytes below threshold ~p, skipping compaction",
                         [Name, Size, Threshold]),
            State;
        {error, Reason} ->
            logger:warning("Failed to get database ~s size: ~p", [Name, Reason]),
            State
    end.

%%====================================================================
%% Document Operations
%%====================================================================

%% Wide column names for document entity
-define(COL_REV, <<"rev">>).
-define(COL_DELETED, <<"del">>).
-define(COL_HLC, <<"hlc">>).
-define(COL_REVTREE, <<"revtree">>).
%% Reserved entity columns (default 0, preserved across writes).
%% The built-in tiering engine was removed; these are kept as on-disk
%% format-stable seams for an external tiering layer to use.
-define(COL_CREATED_AT, <<"created_at">>).
-define(COL_EXPIRES_AT, <<"expires_at">>).
-define(COL_TIER, <<"tier">>).

%% @doc Put a document (create or update)
%% Accepts: Erlang map, indexed CBOR binary, or plain CBOR binary
%% Options:
%%   - sync: boolean() - if true, sync to disk before returning (default: false)
do_put_doc(StoreRef, DbName, Doc, Opts) ->
    %% Normalize input: map, indexed binary, or plain CBOR -> map for processing
    DocMap = barrel_doc:to_map(Doc),
    %% Build document record from input
    DocRecord = barrel_doc:make_doc_record(DocMap),
    #{id := DocId, revs := Revs, deleted := Deleted, doc := DocBody} = DocRecord,
    [NewRev | _] = Revs,

    %% Check for existing document (wide column entity)
    DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),
    {OldHlc, OldRev, OldRevTree, OldDocBody, OldDocBodyCbor} = case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
        {ok, Columns} ->
            ExistingRev = proplists:get_value(?COL_REV, Columns),
            ExistingHlc = barrel_hlc:decode(proplists:get_value(?COL_HLC, Columns)),
            OldTree = decode_revtree(proplists:get_value(?COL_REVTREE, Columns)),
            %% Get old doc body for path index diff from body CF (current body, no rev in key)
            %% Keep both raw CBOR (for archiving) and decoded map (for path indexing)
            {OldBody, OldBodyCbor} = case barrel_store_rocksdb:body_get(StoreRef,
                            barrel_store_keys:doc_body(DbName, DocId)) of
                {ok, OldCborBin} -> {barrel_docdb_codec_cbor:decode_any(OldCborBin), OldCborBin};
                not_found -> {undefined, undefined}
            end,
            {ExistingHlc, ExistingRev, OldTree, OldBody, OldBodyCbor};
        not_found ->
            {undefined, undefined, #{}, undefined, undefined}
    end,

    %% MVCC check: the client must supply the current winning revision
    %% when updating an existing document via the regular put_doc API.
    %% The replication path (bulk_docs with explicit `history`) carries
    %% no `hash` field on the doc record and is exempt here; conflict
    %% detection happens in the revtree merge.
    case mvcc_check(OldRev, Revs, DocRecord) of
        ok ->
            do_put_doc_apply(StoreRef, DbName, DocId, Revs, NewRev, Deleted,
                             DocBody, OldHlc, OldRev, OldRevTree, OldDocBody,
                             OldDocBodyCbor, Opts);
        {error, conflict} ->
            {error, conflict}
    end.

%% @doc MVCC pre-check for the regular (non-replication) put path.
%% - DocRecord without a `hash` key came through the bulk `history`
%%   constructor (replication / put_rev). Skip the check.
%% - No existing doc: accept regardless.
%% - Existing doc + supplied parent rev matches winner: accept.
%% - Otherwise: conflict.
mvcc_check(_OldRev, _Revs, DocRecord) when not is_map_key(hash, DocRecord) -> ok;
mvcc_check(undefined, _Revs, _DocRecord) -> ok;
mvcc_check(OldRev, [_NewRev, ParentRev | _], _DocRecord) when ParentRev =:= OldRev -> ok;
mvcc_check(_OldRev, _Revs, _DocRecord) -> {error, conflict}.

do_put_doc_apply(StoreRef, DbName, DocId, Revs, NewRev, Deleted, DocBody,
                 OldHlc, OldRev, OldRevTree, OldDocBody, OldDocBodyCbor, Opts) ->
    DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),

    %% Build revision tree (merge with existing)
    RevTree = case length(Revs) of
        1 ->
            %% New document
            OldRevTree#{NewRev => #{id => NewRev, parent => undefined, deleted => Deleted}};
        _ ->
            %% Update - build tree with history
            [CurRev, ParentRev | _] = Revs,
            OldRevTree#{
                ParentRev => #{id => ParentRev, parent => undefined, deleted => false},
                CurRev => #{id => CurRev, parent => ParentRev, deleted => Deleted}
            }
    end,

    %% Generate new HLC timestamp for this change
    NextHlc = barrel_hlc:new_hlc(),

    %% Prepare entity columns for wide-column storage.
    %% created_at/expires_at/tier are reserved columns (preserved across
    %% writes); the built-in tiering engine was removed.
    {CreatedAt, ExistingTier, ExistingExpires} = case OldHlc of
        undefined ->
            %% New document - set created_at to now, tier to hot (0)
            {barrel_hlc:encode(NextHlc), 0, 0};
        _ ->
            %% Update - preserve existing tier metadata
            case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
                {ok, OldColumns} ->
                    {proplists:get_value(?COL_CREATED_AT, OldColumns, barrel_hlc:encode(OldHlc)),
                     proplists:get_value(?COL_TIER, OldColumns, 0),
                     proplists:get_value(?COL_EXPIRES_AT, OldColumns, 0)};
                _ ->
                    {barrel_hlc:encode(OldHlc), 0, 0}
            end
    end,
    DocColumns = [
        {?COL_REV, NewRev},
        {?COL_DELETED, deleted_to_bin(Deleted)},
        {?COL_HLC, barrel_hlc:encode(NextHlc)},
        {?COL_REVTREE, encode_revtree(RevTree)},
        {?COL_CREATED_AT, CreatedAt},
        {?COL_EXPIRES_AT, ExistingExpires},
        {?COL_TIER, ExistingTier}
    ],
    DocOps = [{entity_put, DocEntityKey, DocColumns}],

    %% Build DocInfo for change tracking (compatible with existing change feed)
    DocInfo = #{
        id => DocId,
        rev => NewRev,
        deleted => Deleted,
        revtree => RevTree,
        hlc => NextHlc
    },

    %% Delete old HLC entry if exists
    HlcDeleteOps = case OldHlc of
        undefined -> [];
        _ -> [{delete, barrel_store_keys:doc_hlc(DbName, OldHlc)}]
    end,

    %% Path index operations (if not deleted)
    PathIndexOps = case Deleted of
        true when OldDocBody =/= undefined ->
            %% Deleting a document - remove all paths
            case barrel_ars_index:get_doc_paths(StoreRef, DbName, DocId) of
                {ok, OldPaths} -> barrel_ars_index:remove_doc_ops(DbName, DocId, OldPaths);
                not_found -> []
            end;
        true ->
            %% New doc being created as deleted (edge case) - no paths to index
            [];
        false when OldDocBody =:= undefined ->
            %% New document - index all paths
            barrel_ars_index:index_doc_ops(DbName, DocId, DocBody);
        false ->
            case was_tombstone(OldRev, OldRevTree) of
                true ->
                    %% Re-creating a deleted document. Delete only removed the
                    %% path index, not the body CF, so a body diff would see no
                    %% change and never re-add the paths. Index from scratch.
                    barrel_ars_index:index_doc_ops(DbName, DocId, DocBody);
                false ->
                    %% Update document - compute diff and update paths
                    barrel_ars_index:update_doc_ops(DbName, DocId, OldDocBody, DocBody)
            end
    end,

    %% Change entry operations
    ChangeInfo = DocInfo#{doc => DocBody},
    ChangeOps = barrel_changes:write_change_ops(DbName, NextHlc, ChangeInfo),

    %% Path-indexed change operations (for efficient filtered queries)
    PathHlcOps = case OldHlc of
        undefined ->
            %% New document - create path index entries
            barrel_changes:write_path_index_ops(DbName, NextHlc, ChangeInfo);
        _ ->
            %% Update - remove old path entries, add new ones
            barrel_changes:update_path_index_ops(DbName, NextHlc, ChangeInfo,
                                                  OldHlc, OldDocBody)
    end,

    %% Write batch atomically (metadata + indexes + body)
    CborBody = barrel_docdb_codec_cbor:encode_cbor(DocBody),

    %% Archive old body to revision-keyed location if updating
    ArchiveOps = case {OldRev, OldDocBodyCbor} of
        {undefined, _} -> [];  %% New document, no archive needed
        {_, undefined} -> [];  %% No old body to archive
        {_, OldCbor} ->
            %% Move old body to revision-keyed location
            OldBodyKey = barrel_store_keys:doc_body_rev(DbName, DocId, OldRev),
            [{body_put, OldBodyKey, OldCbor}]
    end,

    %% Write new body to current location (no revision in key)
    BodyKey = barrel_store_keys:doc_body(DbName, DocId),
    BodyOp = {body_put, BodyKey, CborBody},
    AllOps = DocOps ++ HlcDeleteOps ++ PathIndexOps ++ ChangeOps ++ PathHlcOps ++ ArchiveOps ++ [BodyOp],
    Sync = maps:get(sync, Opts, false),
    WriteOpts = #{sync => Sync},
    ok = barrel_store_rocksdb:write_batch(StoreRef, AllOps, WriteOpts),

    %% Notify path subscribers
    notify_subscribers(DbName, DocId, NewRev, NextHlc, Deleted, DocBody),

    %% Return result
    Result = #{
        <<"id">> => DocId,
        <<"ok">> => true,
        <<"rev">> => NewRev
    },
    {ok, Result}.

%% @doc Put multiple documents in a single batch (batch write)
%% Options:
%%   - sync: boolean() - if true, sync to disk before returning (default: false)
do_put_docs(StoreRef, DbName, Docs, Opts) ->
    Sync = maps:get(sync, Opts, false),
    %% Process each document to build operations and metadata
    {AllOps, Notifications} = lists:foldl(
        fun(Doc, {OpsAcc, NotifyAcc}) ->
            case prepare_doc_ops(StoreRef, DbName, Doc) of
                {ok, Ops, NotifyInfo} ->
                    {OpsAcc ++ Ops, [NotifyInfo | NotifyAcc]};
                {error, _Reason} = Err ->
                    %% Skip failed docs, include error in notifications
                    {OpsAcc, [{error, Err} | NotifyAcc]}
            end
        end,
        {[], []},
        Docs
    ),

    %% Write all operations in a single batch (includes body CF writes)
    case AllOps of
        [] -> ok;
        _ ->
            WriteOpts = #{sync => Sync},
            ok = barrel_store_rocksdb:write_batch(StoreRef, AllOps, WriteOpts)
    end,

    %% Notify subscribers and build results (reverse to maintain order)
    Results = lists:map(
        fun({error, Err}) ->
            Err;
           ({DocId, NewRev, NextHlc, Deleted, DocBody}) ->
            notify_subscribers(DbName, DocId, NewRev, NextHlc, Deleted, DocBody),
            {ok, #{<<"id">> => DocId, <<"ok">> => true, <<"rev">> => NewRev}}
        end,
        lists:reverse(Notifications)
    ),
    Results.

%% @doc Prepare document operations without writing (using wide column entity)
%% Accepts: Erlang map, indexed CBOR binary, or plain CBOR binary
%% Returns {ok, Ops, NotifyInfo} or {error, Reason}
prepare_doc_ops(StoreRef, DbName, Doc) ->
    try
        %% Normalize input: map, indexed binary, or plain CBOR -> map for processing
        DocMap = barrel_doc:to_map(Doc),
        %% Build document record from input
        DocRecord = barrel_doc:make_doc_record(DocMap),
        #{id := DocId, revs := Revs, deleted := Deleted, doc := DocBody} = DocRecord,
        [NewRev | _] = Revs,

        %% Check for existing document (wide column entity)
        DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),
        {OldHlc, OldRev, OldRevTree, OldDocBody, OldCborBody, OldTierMeta} = case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
            {ok, Columns} ->
                ExistingRev = proplists:get_value(?COL_REV, Columns),
                ExistingHlc = barrel_hlc:decode(proplists:get_value(?COL_HLC, Columns)),
                OldTree = decode_revtree(proplists:get_value(?COL_REVTREE, Columns)),
                %% Extract tier metadata
                TierMeta = {
                    proplists:get_value(?COL_CREATED_AT, Columns, <<>>),
                    proplists:get_value(?COL_EXPIRES_AT, Columns, 0),
                    proplists:get_value(?COL_TIER, Columns, 0)
                },
                %% Get old doc body from body CF (current body, no rev in key)
                {OldBody, OldCbor} = case barrel_store_rocksdb:body_get(StoreRef,
                                barrel_store_keys:doc_body(DbName, DocId)) of
                    {ok, OldCborBin} -> {barrel_docdb_codec_cbor:decode_any(OldCborBin), OldCborBin};
                    not_found -> {#{}, undefined}
                end,
                {ExistingHlc, ExistingRev, OldTree, OldBody, OldCbor, TierMeta};
            not_found ->
                {undefined, undefined, #{}, undefined, undefined, {<<>>, 0, 0}}
        end,

        %% MVCC check: bulk_docs without explicit `history` is treated
        %% as the regular update path and must supply the current winning
        %% _rev when overwriting an existing document. Bulk entries that
        %% carry `history` (replication) are exempt — DocRecord lacks the
        %% `hash` key in that case.
        case mvcc_check(OldRev, Revs, DocRecord) of
            ok -> ok;
            {error, conflict} -> throw(conflict)
        end,

        %% Build revision tree (merge with existing)
        RevTree = case length(Revs) of
            1 ->
                OldRevTree#{NewRev => #{id => NewRev, parent => undefined, deleted => Deleted}};
            _ ->
                [CurRev, ParentRev | _] = Revs,
                OldRevTree#{
                    ParentRev => #{id => ParentRev, parent => undefined, deleted => false},
                    CurRev => #{id => CurRev, parent => ParentRev, deleted => Deleted}
                }
        end,

        %% Generate new HLC timestamp
        NextHlc = barrel_hlc:new_hlc(),

        %% Prepare entity columns for wide-column storage (with tier metadata)
        {CreatedAt, ExpiresAt, Tier} = case OldHlc of
            undefined -> {barrel_hlc:encode(NextHlc), 0, 0};  %% New doc
            _ -> OldTierMeta  %% Preserve existing tier metadata
        end,
        DocColumns = [
            {?COL_REV, NewRev},
            {?COL_DELETED, deleted_to_bin(Deleted)},
            {?COL_HLC, barrel_hlc:encode(NextHlc)},
            {?COL_REVTREE, encode_revtree(RevTree)},
            {?COL_CREATED_AT, CreatedAt},
            {?COL_EXPIRES_AT, ExpiresAt},
            {?COL_TIER, Tier}
        ],
        DocOps = [{entity_put, DocEntityKey, DocColumns}],

        %% Build DocInfo for change tracking
        DocInfo = #{
            id => DocId,
            rev => NewRev,
            deleted => Deleted,
            revtree => RevTree,
            hlc => NextHlc
        },

        HlcDeleteOps = case OldHlc of
            undefined -> [];
            _ -> [{delete, barrel_store_keys:doc_hlc(DbName, OldHlc)}]
        end,

        PathIndexOps = case Deleted of
            true when OldDocBody =/= undefined ->
                case barrel_ars_index:get_doc_paths(StoreRef, DbName, DocId) of
                    {ok, OldPaths} -> barrel_ars_index:remove_doc_ops(DbName, DocId, OldPaths);
                    not_found -> []
                end;
            true -> [];
            false when OldDocBody =:= undefined ->
                barrel_ars_index:index_doc_ops(DbName, DocId, DocBody);
            false ->
                case was_tombstone(OldRev, OldRevTree) of
                    true ->
                        %% Re-creating a deleted document: delete removed the
                        %% path index but not the body CF, so index from scratch.
                        barrel_ars_index:index_doc_ops(DbName, DocId, DocBody);
                    false ->
                        barrel_ars_index:update_doc_ops(DbName, DocId, OldDocBody, DocBody)
                end
        end,

        ChangeInfo = DocInfo#{doc => DocBody},
        ChangeOps = barrel_changes:write_change_ops(DbName, NextHlc, ChangeInfo),

        PathHlcOps = case OldHlc of
            undefined ->
                barrel_changes:write_path_index_ops(DbName, NextHlc, ChangeInfo);
            _ ->
                barrel_changes:update_path_index_ops(DbName, NextHlc, ChangeInfo,
                                                      OldHlc, OldDocBody)
        end,

        CborBody = barrel_docdb_codec_cbor:encode_cbor(DocBody),

        %% Archive old body to revision-keyed location if updating
        ArchiveOps = case {OldRev, OldCborBody} of
            {undefined, _} -> [];
            {_, undefined} -> [];
            _ ->
                OldBodyKey = barrel_store_keys:doc_body_rev(DbName, DocId, OldRev),
                [{body_put, OldBodyKey, OldCborBody}]
        end,

        %% Write new body to current location (no revision in key)
        BodyKey = barrel_store_keys:doc_body(DbName, DocId),
        BodyOp = {body_put, BodyKey, CborBody},
        AllOps = DocOps ++ HlcDeleteOps ++ PathIndexOps ++ ChangeOps ++ PathHlcOps ++ ArchiveOps ++ [BodyOp],
        NotifyInfo = {DocId, NewRev, NextHlc, Deleted, DocBody},
        {ok, AllOps, NotifyInfo}
    catch
        _:Reason ->
            {error, Reason}
    end.

%% @doc Get a document by ID (using wide column entity)
do_get_doc(StoreRef, DbName, DocId, Opts) ->
    DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),
    case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
        {ok, Columns} ->
            %% Get the deterministic winner from revtree (not just stored rev)
            RevTreeBin = proplists:get_value(?COL_REVTREE, Columns),
            Rev = case RevTreeBin of
                undefined ->
                    proplists:get_value(?COL_REV, Columns);
                <<>> ->
                    proplists:get_value(?COL_REV, Columns);
                _ ->
                    #{winner := Winner} = barrel_revtree_bin:decode_winner_leaves(RevTreeBin),
                    case Winner of
                        undefined -> proplists:get_value(?COL_REV, Columns);
                        _ -> Winner
                    end
            end,
            Deleted = bin_to_deleted(proplists:get_value(?COL_DELETED, Columns, <<"false">>)),
            IncludeDeleted = maps:get(include_deleted, Opts, false),

            case {Deleted, IncludeDeleted} of
                {true, false} ->
                    {error, not_found};
                _ ->
                    %% Get document body from body CF (current body, no rev in key)
                    BodyKey = barrel_store_keys:doc_body(DbName, DocId),
                    case barrel_store_rocksdb:body_get(StoreRef, BodyKey) of
                        {ok, CborBin} ->
                            %% Check if raw body requested (for zero-copy CBOR responses)
                            case maps:get(raw_body, Opts, false) of
                                true ->
                                    %% Return raw CBOR body with metadata for zero-copy
                                    Meta = #{
                                        id => DocId,
                                        rev => Rev,
                                        deleted => Deleted
                                    },
                                    Meta2 = maybe_add_conflicts_to_meta(Meta, Columns, Opts),
                                    {ok, CborBin, Meta2};
                                false ->
                                    %% Decode CBOR to get document body
                                    DocBody = barrel_docdb_codec_cbor:decode_any(CborBin),
                                    %% Add metadata
                                    Result = DocBody#{
                                        <<"id">> => DocId,
                                        <<"_rev">> => Rev
                                    },
                                    Result2 = case Deleted of
                                        true -> Result#{<<"_deleted">> => true};
                                        false -> Result
                                    end,
                                    %% Add conflicts if requested
                                    Result3 = maybe_add_conflicts(Result2, Columns, Opts),
                                    {ok, Result3}
                            end;
                        not_found ->
                            {error, not_found}
                    end
            end;
        not_found ->
            {error, not_found}
    end.

%% @doc Get multiple documents by ID (batch read - parallel entity + body fetch)
do_get_docs(StoreRef, DbName, DocIds, Opts) ->
    IncludeDeleted = maps:get(include_deleted, Opts, false),

    %% Build keys for both entity and body (no rev needed for body)
    DocEntityKeys = [barrel_store_keys:doc_entity(DbName, DocId) || DocId <- DocIds],
    BodyKeys = [barrel_store_keys:doc_body(DbName, DocId) || DocId <- DocIds],

    %% Batch fetch entities and bodies in parallel (same batch)
    DocEntityResults = barrel_store_rocksdb:multi_get_entity(StoreRef, DocEntityKeys),
    DocBodyResults = barrel_store_rocksdb:body_multi_get(StoreRef, BodyKeys),

    %% Combine results
    DocBodyMap = lists:foldl(
        fun({DocId, EntityResult, BodyResult}, Map) ->
            case {EntityResult, BodyResult} of
                {{ok, Columns}, {ok, CborBin}} ->
                    Rev = proplists:get_value(?COL_REV, Columns),
                    Deleted = bin_to_deleted(proplists:get_value(?COL_DELETED, Columns, <<"false">>)),
                    case {Deleted, IncludeDeleted} of
                        {true, false} ->
                            Map;  %% Skip deleted
                        _ ->
                            DocBody = barrel_docdb_codec_cbor:decode_any(CborBin),
                            Result = DocBody#{<<"id">> => DocId, <<"_rev">> => Rev},
                            Result2 = case Deleted of
                                true -> Result#{<<"_deleted">> => true};
                                false -> Result
                            end,
                            Map#{DocId => {ok, Result2}}
                    end;
                _ -> Map
            end
        end,
        #{},
        lists:zip3(DocIds, DocEntityResults, DocBodyResults)
    ),
    %% Return results in original order
    [maps:get(DocId, DocBodyMap, {error, not_found}) || DocId <- DocIds].

%% @doc Delete a document (using wide column entity)
%% Options:
%%   - rev: binary() - expected revision (optional, for conflict detection)
%%   - sync: boolean() - if true, sync to disk before returning (default: false)
do_delete_doc(StoreRef, DbName, DocId, Opts) ->
    %% Get current state (wide column entity)
    DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),
    case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
        {ok, Columns} ->
            CurrentRev = proplists:get_value(?COL_REV, Columns),
            OldHlc = barrel_hlc:decode(proplists:get_value(?COL_HLC, Columns)),
            RevTree = decode_revtree(proplists:get_value(?COL_REVTREE, Columns)),

            %% Verify revision if provided
            ExpectedRev = maps:get(rev, Opts, undefined),
            case ExpectedRev of
                undefined -> ok;
                CurrentRev -> ok;
                _ -> throw({error, {conflict, CurrentRev}})
            end,

            %% Create delete revision
            {Gen, _Hash} = barrel_doc:parse_revision(CurrentRev),
            DeleteHash = barrel_doc:revision_hash(#{}, CurrentRev, true),
            NewRev = barrel_doc:make_revision(Gen + 1, DeleteHash),

            %% Generate new HLC
            NextHlc = barrel_hlc:new_hlc(),

            %% Update revision tree
            NewRevTree = RevTree#{NewRev => #{id => NewRev, parent => CurrentRev, deleted => true}},

            %% Prepare entity columns for wide-column storage
            DocColumns = [
                {?COL_REV, NewRev},
                {?COL_DELETED, <<"true">>},
                {?COL_HLC, barrel_hlc:encode(NextHlc)},
                {?COL_REVTREE, encode_revtree(NewRevTree)}
            ],
            DocOps = [{entity_put, DocEntityKey, DocColumns}],

            %% Delete old HLC index entry (HLC always exists for valid documents)
            HlcDeleteOps = [{delete, barrel_store_keys:doc_hlc(DbName, OldHlc)}],

            %% Path index removal operations
            PathIndexOps = case barrel_ars_index:get_doc_paths(StoreRef, DbName, DocId) of
                {ok, Paths} -> barrel_ars_index:remove_doc_ops(DbName, DocId, Paths);
                not_found -> []
            end,

            %% Build DocInfo for change tracking
            NewDocInfo = #{
                id => DocId,
                rev => NewRev,
                deleted => true,
                revtree => NewRevTree,
                hlc => NextHlc
            },

            %% Change entry operations
            ChangeOps = barrel_changes:write_change_ops(DbName, NextHlc, NewDocInfo),

            %% Path-indexed change operations for delete
            %% Get old doc body from body CF (current body, no rev in key)
            OldDocBody = case barrel_store_rocksdb:body_get(StoreRef,
                                barrel_store_keys:doc_body(DbName, DocId)) of
                {ok, OldCborBin} -> barrel_docdb_codec_cbor:decode_any(OldCborBin);
                not_found -> undefined
            end,
            PathHlcOps = barrel_changes:update_path_index_ops(DbName, NextHlc, NewDocInfo,
                                                               OldHlc, OldDocBody),

            %% Write batch atomically
            AllOps = DocOps ++ HlcDeleteOps ++ PathIndexOps ++ ChangeOps ++ PathHlcOps,
            WriteOpts = #{sync => maps:get(sync, Opts, false)},
            ok = barrel_store_rocksdb:write_batch(StoreRef, AllOps, WriteOpts),

            %% Notify path subscribers
            notify_subscribers(DbName, DocId, NewRev, NextHlc, true, #{}),

            {ok, #{<<"id">> => DocId, <<"ok">> => true, <<"rev">> => NewRev}};

        not_found ->
            {error, not_found}
    end.

%% @doc Fold over all documents (using wide column entity)
%% Options:
%%   - include_deleted: boolean() - include deleted documents (default: false)
%% Note: Uses regular key iteration since wide columns are stored per-key
do_fold_docs(StoreRef, DbName, Fun, Acc, Opts) ->
    StartKey = barrel_store_keys:doc_entity_prefix(DbName),
    EndKey = barrel_store_keys:doc_entity_end(DbName),
    PrefixLen = byte_size(StartKey),
    IncludeDeleted = maps:get(include_deleted, Opts, false),

    FoldFun = fun(Key, _Value, AccIn) ->
        %% Extract DocId from key (after prefix)
        DocId = binary:part(Key, PrefixLen, byte_size(Key) - PrefixLen),
        %% Get entity to access columns
        case barrel_store_rocksdb:get_entity(StoreRef, Key) of
            {ok, Columns} ->
                Rev = proplists:get_value(?COL_REV, Columns),
                Deleted = bin_to_deleted(proplists:get_value(?COL_DELETED, Columns, <<"false">>)),
                case {Deleted, IncludeDeleted} of
                    {true, false} ->
                        %% Skip deleted documents
                        {ok, AccIn};
                    _ ->
                        %% Get document body from body CF (current body, no rev in key)
                        BodyKey = barrel_store_keys:doc_body(DbName, DocId),
                        DocBody = case barrel_store_rocksdb:body_get(StoreRef, BodyKey) of
                            {ok, CborBin} -> barrel_docdb_codec_cbor:decode_any(CborBin);
                            not_found -> #{}
                        end,
                        Doc = DocBody#{
                            <<"id">> => DocId,
                            <<"_rev">> => Rev
                        },
                        case Fun(Doc, AccIn) of
                            {ok, AccOut} -> {ok, AccOut};
                            {stop, AccOut} -> {stop, AccOut};
                            stop -> {stop, AccIn}
                        end
                end;
            not_found ->
                {ok, AccIn}
        end
    end,

    FinalAcc = barrel_store_rocksdb:fold_range(StoreRef, StartKey, EndKey, FoldFun, Acc),
    {ok, FinalAcc}.

%%====================================================================
%% Replication Operations
%%====================================================================

%% @doc Put a document with explicit revision history (for replication, using wide column entity)
do_put_rev(StoreRef, DbName, Doc, History, Deleted) ->
    DocId = maps:get(<<"id">>, Doc),
    DocBody = barrel_doc:doc_without_meta(Doc),
    [NewRev | _] = History,

    %% Check for existing document (wide column entity)
    DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),
    {ExistingRevTree, OldRev, OldHlc, OldDocBody, OldCborBody} = case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
        {ok, Columns} ->
            ExistingRev = proplists:get_value(?COL_REV, Columns),
            ExistingHlc = barrel_hlc:decode(proplists:get_value(?COL_HLC, Columns)),
            OldTree = decode_revtree(proplists:get_value(?COL_REVTREE, Columns)),
            %% Get old doc body from body CF (current body, no rev in key)
            {OldBody, OldCbor} = case barrel_store_rocksdb:body_get(StoreRef,
                            barrel_store_keys:doc_body(DbName, DocId)) of
                {ok, OldCborBin} -> {barrel_docdb_codec_cbor:decode_any(OldCborBin), OldCborBin};
                not_found -> {#{}, undefined}
            end,
            {OldTree, ExistingRev, ExistingHlc, OldBody, OldCbor};
        not_found ->
            {#{}, undefined, undefined, undefined, undefined}
    end,
    OldRevTree = ExistingRevTree,

    %% Build revision tree from history
    NewRevTree = build_revtree_from_history(History, Deleted, ExistingRevTree),

    %% Generate new HLC timestamp for this change
    NextHlc = barrel_hlc:new_hlc(),

    %% Prepare entity columns for wide-column storage
    DocColumns = [
        {?COL_REV, NewRev},
        {?COL_DELETED, deleted_to_bin(Deleted)},
        {?COL_HLC, barrel_hlc:encode(NextHlc)},
        {?COL_REVTREE, encode_revtree(NewRevTree)}
    ],
    DocOps = [{entity_put, DocEntityKey, DocColumns}],

    %% Build DocInfo for change tracking
    DocInfo = #{
        id => DocId,
        rev => NewRev,
        deleted => Deleted,
        revtree => NewRevTree,
        hlc => NextHlc
    },

    %% Delete old HLC entry if exists
    HlcDeleteOps = case OldHlc of
        undefined -> [];
        _ -> [{delete, barrel_store_keys:doc_hlc(DbName, OldHlc)}]
    end,

    %% Path index operations
    PathIndexOps = case Deleted of
        true when OldDocBody =/= undefined ->
            %% Deleting a document - remove all paths
            case barrel_ars_index:get_doc_paths(StoreRef, DbName, DocId) of
                {ok, OldPaths} -> barrel_ars_index:remove_doc_ops(DbName, DocId, OldPaths);
                not_found -> []
            end;
        true ->
            %% New doc being created as deleted - no paths to index
            [];
        false when OldDocBody =:= undefined ->
            %% New document - index all paths
            barrel_ars_index:index_doc_ops(DbName, DocId, DocBody);
        false ->
            case was_tombstone(OldRev, OldRevTree) of
                true ->
                    %% Re-creating a deleted document. Delete only removed the
                    %% path index, not the body CF, so a body diff would see no
                    %% change and never re-add the paths. Index from scratch.
                    barrel_ars_index:index_doc_ops(DbName, DocId, DocBody);
                false ->
                    %% Update document - compute diff and update paths
                    barrel_ars_index:update_doc_ops(DbName, DocId, OldDocBody, DocBody)
            end
    end,

    %% Change entry operations
    ChangeInfo = DocInfo#{doc => DocBody},
    ChangeOps = barrel_changes:write_change_ops(DbName, NextHlc, ChangeInfo),

    %% Path-indexed change operations (for efficient filtered queries)
    PathHlcOps = case OldHlc of
        undefined ->
            %% New document - create path index entries
            barrel_changes:write_path_index_ops(DbName, NextHlc, ChangeInfo);
        _ ->
            %% Update - remove old path entries, add new ones
            barrel_changes:update_path_index_ops(DbName, NextHlc, ChangeInfo,
                                                  OldHlc, OldDocBody)
    end,

    %% Write batch atomically (metadata + indexes + body)
    CborBody = barrel_docdb_codec_cbor:encode_cbor(DocBody),

    %% Archive old body to revision-keyed location if updating
    ArchiveOps = case {OldRev, OldCborBody} of
        {undefined, _} -> [];
        {_, undefined} -> [];
        _ ->
            OldBodyKey = barrel_store_keys:doc_body_rev(DbName, DocId, OldRev),
            [{body_put, OldBodyKey, OldCborBody}]
    end,

    %% Write new body to current location (no revision in key)
    BodyKey = barrel_store_keys:doc_body(DbName, DocId),
    BodyOp = {body_put, BodyKey, CborBody},
    AllOps = DocOps ++ HlcDeleteOps ++ PathIndexOps ++ ChangeOps ++ PathHlcOps ++ ArchiveOps ++ [BodyOp],
    ok = barrel_store_rocksdb:write_batch(StoreRef, AllOps),

    %% Notify path subscribers
    notify_subscribers(DbName, DocId, NewRev, NextHlc, Deleted, DocBody),

    {ok, DocId, NewRev}.

%% @doc Build revision tree from history
build_revtree_from_history(History, Deleted, ExistingTree) ->
    build_revtree_from_history(lists:reverse(History), Deleted, ExistingTree, undefined).

build_revtree_from_history([], _Deleted, Tree, _Parent) ->
    Tree;
build_revtree_from_history([Rev], Deleted, Tree, Parent) ->
    %% Last revision (the newest one)
    Tree#{Rev => #{id => Rev, parent => Parent, deleted => Deleted}};
build_revtree_from_history([Rev | Rest], Deleted, Tree, Parent) ->
    %% Intermediate revisions (not deleted)
    NewTree = Tree#{Rev => #{id => Rev, parent => Parent, deleted => false}},
    build_revtree_from_history(Rest, Deleted, NewTree, Rev).

%% @doc Get revisions difference (using wide column entity)
do_revsdiff(StoreRef, DbName, DocId, RevIds) ->
    DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),
    case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
        {ok, Columns} ->
            RevTree = decode_revtree(proplists:get_value(?COL_REVTREE, Columns)),

            %% Find missing revisions and possible ancestors
            {Missing, PossibleAncestors} = lists:foldl(
                fun(RevId, {M, A} = Acc) ->
                    case maps:is_key(RevId, RevTree) of
                        true ->
                            %% Revision exists, not missing
                            Acc;
                        false ->
                            %% Revision is missing
                            M2 = [RevId | M],
                            %% Find possible ancestors in our tree
                            {Gen, _} = barrel_doc:parse_revision(RevId),
                            A2 = maps:fold(
                                fun(LocalRev, _RevInfo, AccA) ->
                                    {LocalGen, _} = barrel_doc:parse_revision(LocalRev),
                                    case LocalGen < Gen of
                                        true -> [LocalRev | AccA];
                                        false -> AccA
                                    end
                                end,
                                A,
                                RevTree
                            ),
                            {M2, A2}
                    end
                end,
                {[], []},
                RevIds
            ),
            {ok, lists:reverse(Missing), lists:usort(PossibleAncestors)};

        not_found ->
            %% Document doesn't exist - all revisions are missing
            {ok, RevIds, []}
    end.

%% @doc Get revisions difference for multiple documents (batch)
%% Returns {ok, ResultMap} where ResultMap is DocId => #{missing => [...], possible_ancestors => [...]}
do_revsdiff_batch(StoreRef, DbName, RevsMap) ->
    Result = maps:fold(
        fun(DocId, RevIds, Acc) ->
            {ok, Missing, PossibleAncestors} =  do_revsdiff(StoreRef, DbName, DocId, RevIds),
            maps:put(DocId, #{
                    missing => Missing,
                    possible_ancestors => PossibleAncestors
            }, Acc)
        end,
        #{},
        RevsMap
    ),
    {ok, Result}.

%%====================================================================
%% Local Document Operations
%%====================================================================
%% Local documents are stored in the dedicated local_cf column family.
%% They use a simple key format: DbName + 0 + DocId (no prefix needed).

%% @doc Put a local document (per-database)
do_put_local_doc(StoreRef, DbName, DocId, Doc) ->
    Key = barrel_store_keys:local_doc_key(DbName, DocId),
    Value = term_to_binary(Doc),
    ok = barrel_store_rocksdb:local_put(StoreRef, Key, Value),
    ok.

%% @doc Get a local document (per-database)
do_get_local_doc(StoreRef, DbName, DocId) ->
    Key = barrel_store_keys:local_doc_key(DbName, DocId),
    case barrel_store_rocksdb:local_get(StoreRef, Key) of
        {ok, Value} ->
            {ok, binary_to_term(Value)};
        not_found ->
            {error, not_found}
    end.

%% @doc Delete a local document (per-database)
do_delete_local_doc(StoreRef, DbName, DocId) ->
    Key = barrel_store_keys:local_doc_key(DbName, DocId),
    case barrel_store_rocksdb:local_get(StoreRef, Key) of
        {ok, _} ->
            ok = barrel_store_rocksdb:local_delete(StoreRef, Key),
            ok;
        not_found ->
            {error, not_found}
    end.

%% @doc Fold over local documents matching a prefix
%% The prefix is applied to the DocId portion (after DbName).
do_fold_local_docs(StoreRef, DbName, DocIdPrefix, Fun, Acc0) ->
    %% Build full key prefix: DbName + 0 + DocIdPrefix
    KeyPrefix = barrel_store_keys:local_doc_key(DbName, DocIdPrefix),
    %% Fold over matching keys, decode values, extract DocId
    DbNameLen = byte_size(DbName),
    Result = barrel_store_rocksdb:local_fold(
        StoreRef,
        KeyPrefix,
        fun(Key, Value, Acc) ->
            %% Extract DocId from key (skip DbName + 0)
            <<_:DbNameLen/binary, 0, DocId/binary>> = Key,
            Doc = binary_to_term(Value),
            Fun(DocId, Doc, Acc)
        end,
        Acc0
    ),
    {ok, Result}.

%%====================================================================
%% Subscription Notifications
%%====================================================================

%% @doc Notify subscribers of document changes
%% Extracts paths from document, matches against subscriptions,
%% and sends notifications to matching subscribers.
notify_subscribers(DbName, DocId, Rev, Hlc, Deleted, DocBody) ->
    %% Extract paths from document body
    Topics = case Deleted of
        true ->
            %% For deleted docs, just use the doc ID as a path
            [DocId];
        false ->
            Paths = barrel_ars:analyze(DocBody),
            barrel_ars:paths_to_topics(Paths)
    end,

    %% Find matching path subscribers
    Pids = barrel_sub:match(DbName, Topics),

    %% Build notification
    Notification = {barrel_change, DbName, #{
        id => DocId,
        rev => Rev,
        hlc => Hlc,
        deleted => Deleted,
        paths => Topics
    }},

    %% Send to each path subscriber
    _ = [Pid ! Notification || Pid <- Pids],

    %% Notify query subscribers (only for non-deleted docs)
    case Deleted of
        true ->
            ok;
        false ->
            barrel_query_sub:notify_change(DbName, DocId, Rev, DocBody)
    end,
    ok.

%%====================================================================
%% Wide Column Helpers
%%====================================================================

%% @doc Convert boolean to binary for wide column storage
-spec deleted_to_bin(boolean()) -> binary().
deleted_to_bin(true) -> <<"true">>;
deleted_to_bin(false) -> <<"false">>.

%% @doc Whether the prior winning revision was a tombstone (deleted). Used
%% on the live put path to decide between a body diff and a full re-index
%% when re-creating a previously deleted document.
-spec was_tombstone(binary() | undefined, map()) -> boolean().
was_tombstone(undefined, _RevTree) -> false;
was_tombstone(OldRev, RevTree) ->
    case maps:find(OldRev, RevTree) of
        {ok, #{deleted := Deleted}} -> Deleted =:= true;
        _ -> false
    end.

%% @doc Convert binary back to boolean from wide column storage
-spec bin_to_deleted(binary()) -> boolean().
bin_to_deleted(<<"true">>) -> true;
bin_to_deleted(<<"false">>) -> false;
bin_to_deleted(_) -> false.

%% @doc Encode revision tree to compact binary format
-spec encode_revtree(map()) -> binary().
encode_revtree(RevTree) when is_map(RevTree) ->
    RT = barrel_revtree_bin:from_map(RevTree),
    barrel_revtree_bin:encode(RT).

%% @doc Decode revision tree from compact binary format
-spec decode_revtree(binary() | undefined) -> map().
decode_revtree(undefined) -> #{};
decode_revtree(<<>>) -> #{};
decode_revtree(Bin) ->
    RT = barrel_revtree_bin:decode(Bin),
    barrel_revtree_bin:to_map(RT).

%% @doc Add conflicts to document if requested and conflicts exist
maybe_add_conflicts(Doc, Columns, Opts) ->
    case maps:get(conflicts, Opts, false) of
        true ->
            RevTreeBin = proplists:get_value(?COL_REVTREE, Columns),
            case RevTreeBin of
                undefined -> Doc;
                <<>> -> Doc;
                _ ->
                    %% Use fast path to get conflicts directly from binary
                    #{conflicts := Conflicts} = barrel_revtree_bin:decode_winner_leaves(RevTreeBin),
                    case Conflicts of
                        [] -> Doc;
                        _ -> Doc#{<<"_conflicts">> => Conflicts}
                    end
            end;
        false ->
            Doc
    end.

%% @doc Add conflicts to metadata map (for raw_body responses)
maybe_add_conflicts_to_meta(Meta, Columns, Opts) ->
    case maps:get(conflicts, Opts, false) of
        true ->
            RevTreeBin = proplists:get_value(?COL_REVTREE, Columns),
            case RevTreeBin of
                undefined -> Meta;
                <<>> -> Meta;
                _ ->
                    #{conflicts := Conflicts} = barrel_revtree_bin:decode_winner_leaves(RevTreeBin),
                    case Conflicts of
                        [] -> Meta;
                        _ -> Meta#{conflicts => Conflicts}
                    end
            end;
        false ->
            Meta
    end.

%%====================================================================
%% Conflict Operations
%%====================================================================

%% @doc Get list of conflicting revisions for a document
do_get_conflicts(StoreRef, DbName, DocId) ->
    DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),
    case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
        {ok, Columns} ->
            RevTreeBin = proplists:get_value(?COL_REVTREE, Columns),
            case RevTreeBin of
                undefined -> {ok, []};
                <<>> -> {ok, []};
                _ ->
                    #{conflicts := Conflicts} = barrel_revtree_bin:decode_winner_leaves(RevTreeBin),
                    {ok, Conflicts}
            end;
        not_found ->
            {error, not_found}
    end.

%% @doc Resolve a conflict
%% Resolution types:
%%   {choose, Rev} - Mark the chosen rev as winner, delete all other leaf branches
%%   {merge, Doc} - Create a new revision that supersedes all conflicting branches
do_resolve_conflict(StoreRef, DbName, DocId, BaseRev, Resolution) ->
    DocEntityKey = barrel_store_keys:doc_entity(DbName, DocId),
    case barrel_store_rocksdb:get_entity(StoreRef, DocEntityKey) of
        {ok, Columns} ->
            CurrentRev = proplists:get_value(?COL_REV, Columns),
            RevTreeBin = proplists:get_value(?COL_REVTREE, Columns),
            case RevTreeBin of
                undefined ->
                    {error, no_conflicts};
                <<>> ->
                    {error, no_conflicts};
                _ ->
                    RevTree = decode_revtree(RevTreeBin),
                    #{winner := Winner, conflicts := Conflicts} =
                        barrel_revtree_bin:decode_winner_leaves(RevTreeBin),
                    case Conflicts of
                        [] ->
                            {error, no_conflicts};
                        _ ->
                            %% Verify base rev matches current winner
                            case BaseRev =:= Winner of
                                false ->
                                    {error, {conflict, CurrentRev}};
                                true ->
                                    do_apply_resolution(StoreRef, DbName, DocId,
                                                       DocEntityKey, Columns,
                                                       RevTree, Winner, Conflicts,
                                                       Resolution)
                            end
                    end
            end;
        not_found ->
            {error, not_found}
    end.

%% Apply the resolution to the document
do_apply_resolution(StoreRef, DbName, DocId, DocEntityKey, Columns,
                    RevTree, Winner, Conflicts, {choose, ChosenRev}) ->
    %% Verify chosen rev is either the winner or one of the conflicts
    AllLeaves = [Winner | Conflicts],
    case lists:member(ChosenRev, AllLeaves) of
        false ->
            {error, {invalid_rev, ChosenRev}};
        true ->
            %% Delete all other branches by marking them as deleted
            RevsToDelete = [R || R <- AllLeaves, R =/= ChosenRev],
            NewRevTree0 = lists:foldl(
                fun(Rev, Tree) ->
                    case maps:get(Rev, Tree, undefined) of
                        undefined -> Tree;
                        Info -> Tree#{Rev => Info#{deleted => true}}
                    end
                end,
                RevTree,
                RevsToDelete
            ),
            %% Delete bodies of resolved conflicting revisions
            DeleteOps = [
                {body_delete, barrel_store_keys:doc_body_rev(DbName, DocId, Rev)}
                || Rev <- RevsToDelete
            ],
            %% If chosen rev is not the current winner, we need to create a new
            %% revision as child of chosen (to make it the deterministic winner)
            case ChosenRev =:= Winner of
                true ->
                    %% Just update the revtree + delete loser bodies
                    UpdatedColumns = lists:keyreplace(?COL_REVTREE, 1, Columns,
                                                      {?COL_REVTREE, encode_revtree(NewRevTree0)}),
                    AllOps = [{entity_put, DocEntityKey, UpdatedColumns} | DeleteOps],
                    case barrel_store_rocksdb:write_batch(StoreRef, AllOps) of
                        ok ->
                            {ok, #{id => DocId, rev => Winner, conflicts_resolved => length(RevsToDelete)}};
                        {error, _} = Err ->
                            Err
                    end;
                false ->
                    %% Create a new rev as child of chosen to make it the winner
                    ChosenGen = case binary:split(ChosenRev, <<"-">>) of
                        [GenBin, _] -> binary_to_integer(GenBin);
                        _ -> 1
                    end,
                    NewGen = ChosenGen + 1,
                    %% Generate a hash that will be higher than any competing branch
                    Hash = crypto:hash(md5, term_to_binary({DocId, ChosenRev, erlang:system_time()})),
                    HashHex = string:lowercase(binary:encode_hex(Hash)),
                    NewRev = <<(integer_to_binary(NewGen))/binary, "-", HashHex/binary>>,

                    %% Add new rev as child of chosen
                    NewRevTree = NewRevTree0#{
                        NewRev => #{id => NewRev, parent => ChosenRev, deleted => false}
                    },

                    UpdatedColumns = lists:keyreplace(?COL_REVTREE, 1, Columns,
                                                      {?COL_REVTREE, encode_revtree(NewRevTree)}),
                    UpdatedColumns2 = lists:keyreplace(?COL_REV, 1, UpdatedColumns,
                                                       {?COL_REV, NewRev}),
                    AllOps = [{entity_put, DocEntityKey, UpdatedColumns2} | DeleteOps],
                    case barrel_store_rocksdb:write_batch(StoreRef, AllOps) of
                        ok ->
                            {ok, #{id => DocId, rev => NewRev, conflicts_resolved => length(RevsToDelete)}};
                        {error, _} = Err ->
                            Err
                    end
            end
    end;

do_apply_resolution(StoreRef, DbName, DocId, DocEntityKey, Columns,
                    RevTree, Winner, Conflicts, {merge, MergedDoc}) ->
    %% Create a new revision that has all conflicting branches as parents
    %% This effectively merges all branches into one
    AllLeaves = [Winner | Conflicts],

    %% Generate new revision - use winner's generation + 1
    WinnerGen = case binary:split(Winner, <<"-">>) of
        [GenBin, _] -> binary_to_integer(GenBin);
        _ -> 1
    end,
    NewGen = WinnerGen + 1,

    %% Create new rev hash from merged content
    DocWithoutMeta = maps:without([<<"id">>, <<"_id">>, <<"_rev">>, <<"_deleted">>], MergedDoc),
    Hash = crypto:hash(md5, term_to_binary({DocWithoutMeta, AllLeaves})),
    HashHex = string:lowercase(binary:encode_hex(Hash)),
    NewRev = <<(integer_to_binary(NewGen))/binary, "-", HashHex/binary>>,

    %% Build new revtree: add new rev as child of winner, mark all conflicts as deleted
    NewRevTree0 = lists:foldl(
        fun(Rev, Tree) ->
            case maps:get(Rev, Tree, undefined) of
                undefined -> Tree;
                Info -> Tree#{Rev => Info#{deleted => true}}
            end
        end,
        RevTree,
        Conflicts
    ),
    %% Add the merged revision as child of winner
    NewRevTree = NewRevTree0#{
        NewRev => #{id => NewRev, parent => Winner, deleted => false}
    },

    %% Generate new HLC timestamp
    NextHlc = barrel_hlc:new_hlc(),

    %% Preserve existing tier metadata
    CreatedAt = proplists:get_value(?COL_CREATED_AT, Columns, barrel_hlc:encode(NextHlc)),
    ExistingTier = proplists:get_value(?COL_TIER, Columns, 0),
    ExistingExpires = proplists:get_value(?COL_EXPIRES_AT, Columns, 0),

    %% Update entity columns with new rev and revtree
    NewColumns = [
        {?COL_REV, NewRev},
        {?COL_DELETED, <<"false">>},
        {?COL_HLC, barrel_hlc:encode(NextHlc)},
        {?COL_REVTREE, encode_revtree(NewRevTree)},
        {?COL_CREATED_AT, CreatedAt},
        {?COL_EXPIRES_AT, ExistingExpires},
        {?COL_TIER, ExistingTier}
    ],

    %% Encode the merged document body
    CborBin = barrel_docdb_codec_cbor:encode(DocWithoutMeta),
    BodyKey = barrel_store_keys:doc_body(DbName, DocId),

    %% Delete bodies of resolved conflict revisions
    DeleteOps = [
        {body_delete, barrel_store_keys:doc_body_rev(DbName, DocId, Rev)}
        || Rev <- Conflicts
    ],

    %% Build batch operations
    EntityOp = {entity_put, DocEntityKey, NewColumns},
    BodyOp = {body_put, BodyKey, CborBin},

    %% Write batch (entity + new body + delete conflict bodies)
    case barrel_store_rocksdb:write_batch(StoreRef, [EntityOp, BodyOp | DeleteOps]) of
        ok ->
            {ok, #{id => DocId, rev => NewRev, conflicts_resolved => length(Conflicts)}};
        {error, _} = Err ->
            Err
    end.