Skip to main content

src/barrel_compaction_filter.erl

%%%-------------------------------------------------------------------
%%% @doc RocksDB compaction filter for revision tree pruning
%%%
%%% This module implements revision pruning during RocksDB compaction.
%%% Each database has its own compaction filter handler that:
%%%
%%% 1. Processes doc_entity keys during compaction
%%% 2. Decodes the revtree from entity columns
%%% 3. Prunes old revisions based on depth
%%% 4. Deletes body entries for pruned revisions
%%% 5. Returns updated entity with pruned revtree
%%%
%%% The filter is configured on the default column family where
%%% document entities are stored. Body deletions are issued via
%%% normal write_batch and cleaned up by RocksDB in subsequent
%%% compaction cycles.
%%%
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_compaction_filter).

-behaviour(gen_server).

%% API
-export([start_link/1]).
-export([get_stats/1, reset_stats/1]).
-export([set_prune_depth/2, get_prune_depth/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(DEFAULT_PRUNE_DEPTH, 1000).

-record(state, {
    db_name :: binary(),
    prune_depth :: non_neg_integer(),
    %% Stats
    filter_calls = 0 :: non_neg_integer(),
    entities_processed = 0 :: non_neg_integer(),
    revisions_pruned = 0 :: non_neg_integer()
}).

%%====================================================================
%% API
%%====================================================================

%% @doc Start a compaction filter handler for a database
%% Options:
%%   - db_name: The database name (required)
%%   - prune_depth: Max revisions to keep per branch (default: 1000)
%%
%% Note: The db_ref is looked up from persistent_term at runtime since
%% the filter handler must be started before RocksDB is opened (to pass
%% the handler pid to the CF options).
-spec start_link(map()) -> {ok, pid()} | {error, term()}.
start_link(Opts) ->
    gen_server:start_link(?MODULE, Opts, []).

%% @doc Get filter statistics for debugging
-spec get_stats(pid()) -> map().
get_stats(Pid) ->
    gen_server:call(Pid, get_stats).

%% @doc Reset filter statistics
-spec reset_stats(pid()) -> ok.
reset_stats(Pid) ->
    gen_server:call(Pid, reset_stats).

%% @doc Set the prune depth
-spec set_prune_depth(pid(), non_neg_integer()) -> ok.
set_prune_depth(Pid, Depth) ->
    gen_server:call(Pid, {set_prune_depth, Depth}).

%% @doc Get the current prune depth
-spec get_prune_depth(pid()) -> non_neg_integer().
get_prune_depth(Pid) ->
    gen_server:call(Pid, get_prune_depth).

%%====================================================================
%% gen_server callbacks
%%====================================================================

init(Opts) ->
    DbName = maps:get(db_name, Opts),
    PruneDepth = maps:get(prune_depth, Opts, ?DEFAULT_PRUNE_DEPTH),
    {ok, #state{
        db_name = DbName,
        prune_depth = PruneDepth
    }}.

handle_call(get_stats, _From, State) ->
    #state{filter_calls = Calls,
           entities_processed = Entities,
           revisions_pruned = Pruned} = State,
    Stats = #{
        filter_calls => Calls,
        entities_processed => Entities,
        revisions_pruned => Pruned
    },
    {reply, Stats, State};

handle_call(reset_stats, _From, State) ->
    {reply, ok, State#state{
        filter_calls = 0,
        entities_processed = 0,
        revisions_pruned = 0
    }};

handle_call({set_prune_depth, Depth}, _From, State) ->
    {reply, ok, State#state{prune_depth = Depth}};

handle_call(get_prune_depth, _From, #state{prune_depth = Depth} = State) ->
    {reply, Depth, State};

handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.

handle_cast(_Request, State) ->
    {noreply, State}.

%% @doc Handle compaction filter batch requests from RocksDB
%% Message format: {compaction_filter, BatchRef, Keys}
%% where Keys is a list of {Level, Key, Value} tuples
handle_info({compaction_filter, BatchRef, Keys}, State) ->
    {Decisions, NewState} = filter_batch(Keys, State),
    rocksdb:compaction_filter_reply(BatchRef, Decisions),
    {noreply, NewState#state{filter_calls = State#state.filter_calls + 1}};

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

%%====================================================================
%% Internal functions
%%====================================================================

%% @private Filter a batch of keys from compaction
filter_batch(Keys, State) ->
    {Decisions, FinalState} = lists:foldl(
        fun({_Level, Key, Value}, {DecAcc, SAcc}) ->
            {Decision, NewSAcc} = filter_key(Key, Value, SAcc),
            {[Decision | DecAcc], NewSAcc}
        end,
        {[], State},
        Keys
    ),
    {lists:reverse(Decisions), FinalState}.

%% @private Filter a single key-value pair
%% Returns: {keep | remove | {change_value, NewBinary}, State}
filter_key(Key, Value, State) ->
    #state{db_name = DbName} = State,
    case barrel_store_keys:parse_key(Key) of
        {doc_entity, DbName, DocId} ->
            %% This is a document entity for our database - process it
            process_doc_entity(DocId, Value, State);
        {doc_entity, _OtherDb, _DocId} ->
            %% Different database, keep as-is
            {keep, State};
        _ ->
            %% Not a doc_entity key, keep as-is
            {keep, State}
    end.

%% @private Process a document entity, pruning old revisions
process_doc_entity(DocId, EntityBin, State) ->
    #state{db_name = DbName, prune_depth = Depth} = State,

    %% Look up db_ref from persistent_term (registered by barrel_db_server)
    case persistent_term:get({barrel_store, DbName}, undefined) of
        undefined ->
            %% Store not yet registered, skip pruning
            {keep, State};
        DbRef ->
            do_process_doc_entity(DbRef, DbName, DocId, EntityBin, Depth, State)
    end.

%% @private Actually process the document entity
do_process_doc_entity(DbRef, DbName, DocId, EntityBin, Depth, State) ->
    %% Decode entity columns
    Columns = barrel_store_rocksdb:decode_entity(EntityBin),

    %% Find revtree column
    case find_column(<<"revtree">>, Columns) of
        {ok, RevTreeBin} ->
            %% Decode revtree
            RevTree = barrel_revtree_bin:decode(RevTreeBin),

            %% Prune revtree
            {PrunedRT, PrunedRevIds} = barrel_revtree_bin:prune(RevTree, Depth),

            case PrunedRevIds of
                [] ->
                    %% Nothing to prune
                    NewState = State#state{
                        entities_processed = State#state.entities_processed + 1
                    },
                    {keep, NewState};
                _ ->
                    %% Delete body entries for pruned revisions
                    DeleteOps = [{body_delete,
                        barrel_store_keys:doc_body_rev(DbName, DocId, Rev)}
                        || Rev <- PrunedRevIds],
                    ok = barrel_store_rocksdb:write_batch(DbRef, DeleteOps),

                    %% Encode updated entity
                    NewRevTreeBin = barrel_revtree_bin:encode(PrunedRT),
                    UpdatedColumns = update_column(<<"revtree">>, NewRevTreeBin, Columns),
                    UpdatedEntityBin = barrel_store_rocksdb:encode_entity(UpdatedColumns),

                    %% Update stats
                    NewState = State#state{
                        entities_processed = State#state.entities_processed + 1,
                        revisions_pruned = State#state.revisions_pruned + length(PrunedRevIds)
                    },
                    {{change_value, UpdatedEntityBin}, NewState}
            end;
        not_found ->
            %% No revtree column, keep as-is
            {keep, State}
    end.

%% @private Find a column by name
find_column(Name, Columns) ->
    case lists:keyfind(Name, 1, Columns) of
        {Name, Value} -> {ok, Value};
        false -> not_found
    end.

%% @private Update a column value
update_column(Name, NewValue, Columns) ->
    lists:keyreplace(Name, 1, Columns, {Name, NewValue}).