%%%-------------------------------------------------------------------
%%% @doc Changes feed API for barrel_docdb
%%%
%%% Provides functions to track and query document changes in a
%%% database. Changes are ordered by HLC (Hybrid Logical Clock)
%%% timestamps for distributed ordering.
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_changes).
-include("barrel_docdb.hrl").
%% API
-export([
fold_changes/5,
fold_changes_compact/5,
get_changes/4,
get_changes_chunked/4, %% Returns continuation token for pagination
get_last_seq/2, %% Returns opaque sequence (encoded HLC binary)
get_last_hlc/2, %% Returns decoded HLC timestamp
count_changes_since/3,
has_changes_since/3 %% Fast O(1) check using buckets
]).
%% Internal - for use by barrel_db_writer
-export([
write_change/4,
write_change_ops/3,
update_change_bucket_ops/3,
delete_old_change/4
]).
%% Path-indexed changes API
-export([
write_path_index_ops/3,
update_path_index_ops/5,
get_changes_by_path/5
]).
%%====================================================================
%% Types
%%====================================================================
-type changes_result() :: #{
changes := [change()],
last_hlc := barrel_hlc:timestamp(),
pending := non_neg_integer()
}.
-type fold_fun() :: fun((change(), Acc :: term()) ->
{ok, Acc :: term()} | {stop, Acc :: term()} | stop).
-type compact_change() :: {docid(), barrel_hlc:timestamp(), binary(), boolean(), non_neg_integer()}.
-type compact_fold_fun() :: fun((compact_change(), Acc :: term()) ->
{ok, Acc :: term()} | {stop, Acc :: term()} | stop).
-type changes_opts() :: #{
include_docs => boolean(),
limit => non_neg_integer(),
descending => boolean(),
style => main_only | all_docs,
doc_ids => [docid()],
paths => [binary()], % MQTT-style path patterns to filter by
query => barrel_query:query_spec() % Query to filter by
}.
-type continuation_info() :: #{
last_hlc := barrel_hlc:timestamp(),
has_more := boolean(),
continuation => binary() %% Only present when has_more=true
}.
-export_type([changes_result/0, fold_fun/0, compact_change/0, compact_fold_fun/0,
changes_opts/0, continuation_info/0]).
%%====================================================================
%% API
%%====================================================================
%% @doc Fold over changes since a given HLC timestamp (exclusive)
%% Changes returned are strictly after the given HLC.
%% Use 'first' to get all changes from the beginning.
-spec fold_changes(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp() | first, fold_fun(), term()) ->
{ok, term(), barrel_hlc:timestamp()}.
fold_changes(StoreRef, DbName, Since, Fun, Acc) ->
fold_changes_internal(StoreRef, DbName, Since, Fun, Acc, normal).
%% @doc Fold over changes optimized for full sequential scans.
%% Uses readahead and avoids cache pollution for large scans.
%% Best for scanning all changes from the beginning without limit.
-spec fold_changes_long_scan(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp() | first, fold_fun(), term()) ->
{ok, term(), barrel_hlc:timestamp()}.
fold_changes_long_scan(StoreRef, DbName, Since, Fun, Acc) ->
fold_changes_internal(StoreRef, DbName, Since, Fun, Acc, long_scan).
%% @doc Fold over changes with compact tuple format for efficiency.
%% Callback receives {DocId, Hlc, Rev, Deleted, NumConflicts} tuples.
%% Use for internal iteration; convert to maps at API boundary.
%% Always uses long_scan mode for optimal sequential read performance.
-spec fold_changes_compact(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp() | first, compact_fold_fun(), term()) ->
{ok, term(), barrel_hlc:timestamp()}.
fold_changes_compact(StoreRef, DbName, Since, Fun, Acc) ->
{StartHlc, StartKey} = case Since of
first ->
Min = barrel_hlc:min(),
{Min, barrel_store_keys:doc_hlc(DbName, Min)};
SinceHlc ->
{SinceHlc, barrel_store_keys:doc_hlc(DbName, SinceHlc)}
end,
EndKey = barrel_store_keys:doc_hlc_end(DbName),
FoldFun = fun(Key, Value, {CurrentHlc, AccIn}) ->
ChangeHlc = barrel_store_keys:decode_hlc_key(DbName, Key),
case Since =/= first andalso barrel_hlc:equal(ChangeHlc, Since) of
true ->
{ok, {CurrentHlc, AccIn}};
false ->
CompactChange = decode_change_compact(Value, ChangeHlc),
case Fun(CompactChange, AccIn) of
{ok, AccOut} ->
{ok, {ChangeHlc, AccOut}};
{stop, AccOut} ->
{stop, {ChangeHlc, AccOut}};
stop ->
{stop, {CurrentHlc, AccIn}}
end
end
end,
{LastHlc, FinalAcc} = barrel_store_rocksdb:fold_range_long_scan(
StoreRef, StartKey, EndKey, FoldFun, {StartHlc, Acc}),
{ok, FinalAcc, LastHlc}.
%% @private Internal fold with scan mode selection
fold_changes_internal(StoreRef, DbName, Since, Fun, Acc, ScanMode) ->
{StartHlc, StartKey} = case Since of
first ->
%% Start from the very beginning
Min = barrel_hlc:min(),
{Min, barrel_store_keys:doc_hlc(DbName, Min)};
SinceHlc ->
%% Exclusive: start at SinceHlc, we'll skip matching entries
{SinceHlc, barrel_store_keys:doc_hlc(DbName, SinceHlc)}
end,
EndKey = barrel_store_keys:doc_hlc_end(DbName),
FoldFun = fun(Key, Value, {CurrentHlc, AccIn}) ->
ChangeHlc = barrel_store_keys:decode_hlc_key(DbName, Key),
%% Skip if we're at the exact Since HLC (exclusive)
case Since =/= first andalso barrel_hlc:equal(ChangeHlc, Since) of
true ->
{ok, {CurrentHlc, AccIn}};
false ->
Change = decode_change(Value, ChangeHlc),
case Fun(Change, AccIn) of
{ok, AccOut} ->
{ok, {ChangeHlc, AccOut}};
{stop, AccOut} ->
{stop, {ChangeHlc, AccOut}};
stop ->
{stop, {CurrentHlc, AccIn}}
end
end
end,
{LastHlc, FinalAcc} = case ScanMode of
long_scan ->
barrel_store_rocksdb:fold_range_long_scan(
StoreRef, StartKey, EndKey, FoldFun, {StartHlc, Acc});
normal ->
barrel_store_rocksdb:fold_range(
StoreRef, StartKey, EndKey, FoldFun, {StartHlc, Acc})
end,
{ok, FinalAcc, LastHlc}.
%% @doc Get a list of changes since an HLC timestamp
-spec get_changes(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp() | first, changes_opts()) ->
{ok, [change()], barrel_hlc:timestamp()}.
get_changes(StoreRef, DbName, Since, Opts) ->
DocIds = maps:get(doc_ids, Opts, undefined),
PathPatterns = maps:get(paths, Opts, undefined),
QuerySpec = maps:get(query, Opts, undefined),
%% Determine if we can use path index for more efficient query
%% We can use path index if:
%% - Path patterns are specified
%% - No doc_ids filter (can't combine with path index efficiently)
%% - No query filter (path index entries don't include doc body for query matching)
case {PathPatterns, DocIds, QuerySpec} of
{[SinglePath], undefined, undefined} when is_binary(SinglePath) ->
%% Single path pattern, no doc_ids, no query - use path index directly
get_changes_with_path_index(StoreRef, DbName, SinglePath, Since, Opts, QuerySpec);
{Paths, undefined, undefined} when is_list(Paths), length(Paths) > 1 ->
%% Multiple path patterns, no query - merge results from path indexes
get_changes_with_multiple_paths(StoreRef, DbName, Paths, Since, Opts, QuerySpec);
_ ->
%% Fall back to full scan with filtering when:
%% - doc_ids is specified (can't use path index)
%% - query is specified (path index entries don't have doc body)
%% - no paths specified
get_changes_full_scan(StoreRef, DbName, Since, Opts)
end.
%% @doc Get changes with continuation support for pagination.
%% Accepts either an HLC timestamp, 'first', or a continuation token (binary).
%% Returns changes along with continuation info for fetching the next batch.
%%
%% Example usage:
%% ```
%% %% First request
%% {ok, Changes1, Info1} = get_changes_chunked(Store, Db, first, #{limit => 100}),
%% %% Check if more changes available
%% case maps:get(has_more, Info1) of
%% true ->
%% Continuation = maps:get(continuation, Info1),
%% {ok, Changes2, Info2} = get_changes_chunked(Store, Db, Continuation, #{limit => 100});
%% false ->
%% done
%% end.
%% '''
-spec get_changes_chunked(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp() | first | binary(), changes_opts()) ->
{ok, [change()], continuation_info()}.
get_changes_chunked(StoreRef, DbName, ContinuationOrSince, Opts) ->
%% Decode continuation token if binary
Since = case ContinuationOrSince of
first -> first;
Bin when is_binary(Bin), byte_size(Bin) =:= 12 ->
%% This is an encoded HLC (continuation token)
barrel_hlc:decode(Bin);
Hlc when is_tuple(Hlc) ->
%% Already an HLC timestamp
Hlc
end,
%% Get one extra to determine if there are more
Limit = maps:get(limit, Opts, 100),
OptsWithExtra = Opts#{limit => Limit + 1},
{ok, AllChanges, _LastHlc} = get_changes(StoreRef, DbName, Since, OptsWithExtra),
%% Check if we got the extra one (meaning there are more)
{Changes, HasMore} = case length(AllChanges) > Limit of
true ->
{lists:sublist(AllChanges, Limit), true};
false ->
{AllChanges, false}
end,
%% Build continuation info
ResultLastHlc = case Changes of
[] ->
case Since of
first -> barrel_hlc:min();
_ -> Since
end;
_ ->
LastChange = lists:last(Changes),
maps:get(hlc, LastChange)
end,
ContinuationInfo = case HasMore of
true ->
#{
last_hlc => ResultLastHlc,
has_more => true,
continuation => barrel_hlc:encode(ResultLastHlc)
};
false ->
#{
last_hlc => ResultLastHlc,
has_more => false
}
end,
{ok, Changes, ContinuationInfo}.
%% @private Get changes using path index for a single path pattern
get_changes_with_path_index(StoreRef, DbName, PathPattern, Since, Opts, QuerySpec) ->
%% Use path index directly - no fallback to full scan
{ok, PathChanges, PathLastHlc} = get_changes_by_path(StoreRef, DbName, PathPattern, Since,
#{limit => maps:get(limit, Opts, infinity)}),
Limit = maps:get(limit, Opts, infinity),
Style = maps:get(style, Opts, all_docs),
CompiledQuery = compile_query(QuerySpec),
%% Apply query filter and style if needed
FilteredChanges = lists:filtermap(
fun(Change) ->
case maybe_match_query(CompiledQuery, Change) of
true ->
%% Apply style filter
StyledChange = apply_style(Style, Change),
{true, StyledChange};
false ->
false
end
end,
PathChanges
),
%% Apply limit if query filter was used (might have reduced count)
LimitedChanges = case {CompiledQuery, Limit} of
{undefined, _} -> FilteredChanges;
{_, infinity} -> FilteredChanges;
{_, N} -> lists:sublist(FilteredChanges, N)
end,
Changes = case maps:get(descending, Opts, false) of
true -> lists:reverse(LimitedChanges);
false -> LimitedChanges
end,
{ok, Changes, PathLastHlc}.
%% @private Get changes from multiple path patterns and merge by HLC
get_changes_with_multiple_paths(StoreRef, DbName, Paths, Since, Opts, QuerySpec) ->
%% Get changes from each path index - no fallback to full scan
AllChanges = lists:flatmap(
fun(PathPattern) ->
{ok, Changes, _} = get_changes_by_path(StoreRef, DbName, PathPattern, Since, #{}),
Changes
end,
Paths
),
Limit = maps:get(limit, Opts, infinity),
Style = maps:get(style, Opts, all_docs),
CompiledQuery = compile_query(QuerySpec),
%% Remove duplicates (same doc_id) keeping latest HLC
Deduped = dedupe_changes_by_id(AllChanges),
%% Sort by HLC
Sorted = lists:sort(
fun(A, B) ->
barrel_hlc:compare(maps:get(hlc, A), maps:get(hlc, B)) =/= gt
end,
Deduped
),
%% Apply query filter and limit
{FilteredChanges, _Count} = lists:foldl(
fun(Change, {Acc, Count}) ->
case Limit =/= infinity andalso Count >= Limit of
true ->
{Acc, Count};
false ->
case maybe_match_query(CompiledQuery, Change) of
true ->
StyledChange = apply_style(Style, Change),
{[StyledChange | Acc], Count + 1};
false ->
{Acc, Count}
end
end
end,
{[], 0},
Sorted
),
Changes = case maps:get(descending, Opts, false) of
true -> FilteredChanges;
false -> lists:reverse(FilteredChanges)
end,
LastHlc = case FilteredChanges of
[] -> case Since of first -> barrel_hlc:min(); _ -> Since end;
_ -> maps:get(hlc, hd(FilteredChanges))
end,
{ok, Changes, LastHlc}.
%% @private Full scan with filtering
get_changes_full_scan(StoreRef, DbName, Since, Opts) ->
DocIds = maps:get(doc_ids, Opts, undefined),
PathPatterns = maps:get(paths, Opts, undefined),
QuerySpec = maps:get(query, Opts, undefined),
IncludeDocs = maps:get(include_docs, Opts, false),
%% Fast path: no filters and no include_docs - use compact format for efficiency
case {DocIds, PathPatterns, QuerySpec, IncludeDocs} of
{undefined, undefined, undefined, false} ->
get_changes_fast(StoreRef, DbName, Since, Opts);
_ ->
get_changes_filtered(StoreRef, DbName, Since, Opts)
end.
%% @private Fast path using compact format (no filtering needed)
get_changes_fast(StoreRef, DbName, Since, Opts) ->
Limit = maps:get(limit, Opts, infinity),
Style = maps:get(style, Opts, all_docs),
StartHlc = case Since of first -> barrel_hlc:min(); H -> H end,
%% Accumulator: {Count, LastProcessedHlc, Changes}
FoldFun = fun({DocId, Hlc, Rev, Deleted, NumConflicts}, {Count, _LastHlc, Acc}) ->
Change = compact_to_change(DocId, Hlc, Rev, Deleted, NumConflicts, Style),
NewCount = Count + 1,
NewAcc = {NewCount, Hlc, [Change | Acc]},
case Limit =/= infinity andalso NewCount >= Limit of
true -> {stop, NewAcc};
false -> {ok, NewAcc}
end
end,
{ok, {_Count, LastHlc, RevChanges}, _FoldHlc} = fold_changes_compact(
StoreRef, DbName, Since, FoldFun, {0, StartHlc, []}),
Changes = case maps:get(descending, Opts, false) of
true -> RevChanges;
false -> lists:reverse(RevChanges)
end,
{ok, Changes, LastHlc}.
%% @private Convert compact tuple to change map
compact_to_change(DocId, Hlc, Rev, Deleted, NumConflicts, _Style) ->
Change = #{
id => DocId,
hlc => Hlc,
rev => Rev,
changes => [#{rev => Rev}],
num_conflicts => NumConflicts
},
case Deleted of
true -> Change#{deleted => true};
false -> Change
end.
%% Batch size for chunked doc fetching
-define(DOC_FETCH_BATCH_SIZE, 100).
%% @private Filtered scan (needs full change format)
%% Uses chunked batch processing for efficient doc fetching.
get_changes_filtered(StoreRef, DbName, Since, Opts) ->
DocIds = maps:get(doc_ids, Opts, undefined),
Style = maps:get(style, Opts, all_docs),
PathPatterns = maps:get(paths, Opts, undefined),
QuerySpec = maps:get(query, Opts, undefined),
%% If path filter specified, prepare a match trie for efficient matching
PathMatcher = case PathPatterns of
undefined ->
undefined;
Patterns when is_list(Patterns) ->
Trie = match_trie:new(public),
lists:foreach(fun(P) -> match_trie:insert(Trie, P) end, Patterns),
Trie
end,
%% If query filter specified, compile it
CompiledQuery = compile_query(QuerySpec),
%% Do we need doc body for filtering or include_docs?
IncludeDocs = maps:get(include_docs, Opts, false),
NeedsDoc = PathMatcher =/= undefined orelse CompiledQuery =/= undefined orelse IncludeDocs,
%% Use chunked processing with batch doc fetching for efficiency
case NeedsDoc of
true ->
%% Chunked batch processing when docs are needed
get_changes_filtered_chunked(StoreRef, DbName, Since, Opts,
DocIds, Style, PathMatcher, CompiledQuery);
false ->
%% Simple fold when no doc fetching needed (doc_ids filter only)
get_changes_filtered_simple(StoreRef, DbName, Since, Opts,
DocIds, Style)
end.
%% @private Chunked processing with batch doc fetching
get_changes_filtered_chunked(StoreRef, DbName, Since, Opts,
DocIds, Style, PathMatcher, CompiledQuery) ->
Limit = maps:get(limit, Opts, infinity),
%% Collect changes in chunks, batch fetch docs, apply filters
FoldFun = fun(Change, {Chunk, ChunkSize, Results, ResultCount, _LastHlc}) ->
DocId = maps:get(id, Change),
%% Pre-filter by doc_ids before adding to chunk
IncludeByDocId = case DocIds of
undefined -> true;
Ids -> lists:member(DocId, Ids)
end,
case IncludeByDocId of
false ->
%% Skip this change entirely
{ok, {Chunk, ChunkSize, Results, ResultCount, maps:get(hlc, Change)}};
true ->
NewChunk = [{DocId, Change} | Chunk],
NewChunkSize = ChunkSize + 1,
NewLastHlc = maps:get(hlc, Change),
%% Process chunk when it reaches batch size
case NewChunkSize >= ?DOC_FETCH_BATCH_SIZE of
true ->
{NewResults, NewResultCount, Done} =
process_change_chunk(StoreRef, DbName, lists:reverse(NewChunk),
PathMatcher, CompiledQuery, Style,
Results, ResultCount, Limit),
case Done of
true ->
{stop, {[], 0, NewResults, NewResultCount, NewLastHlc}};
false ->
{ok, {[], 0, NewResults, NewResultCount, NewLastHlc}}
end;
false ->
{ok, {NewChunk, NewChunkSize, Results, ResultCount, NewLastHlc}}
end
end
end,
StartHlc = case Since of first -> barrel_hlc:min(); H -> H end,
%% Use long_scan optimization for full unbounded scans
FoldChanges = case {Since, Limit} of
{first, infinity} -> fun fold_changes_long_scan/5;
_ -> fun fold_changes/5
end,
{ok, {FinalChunk, _, RevResults, FinalCount, LastHlc}, _FoldHlc} =
FoldChanges(StoreRef, DbName, Since, FoldFun, {[], 0, [], 0, StartHlc}),
%% Process any remaining chunk
{AllResults, _FinalResultCount, _} = case FinalChunk of
[] ->
{RevResults, FinalCount, false};
_ ->
process_change_chunk(StoreRef, DbName, lists:reverse(FinalChunk),
PathMatcher, CompiledQuery, Style,
RevResults, FinalCount, Limit)
end,
%% Cleanup the trie if we created one
case PathMatcher of
undefined -> ok;
CleanupTrie -> match_trie:delete(CleanupTrie)
end,
Changes = case maps:get(descending, Opts, false) of
true -> AllResults;
false -> lists:reverse(AllResults)
end,
{ok, Changes, LastHlc}.
%% @private Process a chunk of changes with batch doc fetching
process_change_chunk(StoreRef, DbName, DocChangePairs,
PathMatcher, CompiledQuery, Style,
Results, ResultCount, Limit) ->
%% Batch fetch doc bodies for all changes in chunk
ChangesWithDocs = batch_fetch_doc_bodies(StoreRef, DbName, DocChangePairs),
%% Apply filters to each change
lists:foldl(
fun(ChangeWithDoc, {Acc, Count, Done}) ->
case Done of
true ->
{Acc, Count, true};
false ->
%% Check path patterns filter
IncludeByPath = case PathMatcher of
undefined ->
true;
MatchTrie ->
case maps:get(doc, ChangeWithDoc, undefined) of
undefined ->
false;
DocBody when is_map(DocBody) ->
DocPaths = barrel_ars:analyze(DocBody),
Topics = barrel_ars:paths_to_topics(DocPaths),
matches_any_pattern(MatchTrie, Topics);
_ ->
false
end
end,
%% Check query filter
IncludeByQuery = maybe_match_query(CompiledQuery, ChangeWithDoc),
case IncludeByPath andalso IncludeByQuery of
false ->
{Acc, Count, false};
true ->
FilteredChange = apply_style(Style, ChangeWithDoc),
NewCount = Count + 1,
NewAcc = [FilteredChange | Acc],
IsDone = Limit =/= infinity andalso NewCount >= Limit,
{NewAcc, NewCount, IsDone}
end
end
end,
{Results, ResultCount, false},
ChangesWithDocs
).
%% @private Simple filtered scan (doc_ids filter only, no doc fetching)
get_changes_filtered_simple(StoreRef, DbName, Since, Opts, DocIds, Style) ->
Limit = maps:get(limit, Opts, infinity),
FoldFun = fun(Change, {Count, Changes}) ->
DocId = maps:get(id, Change),
IncludeByDocId = case DocIds of
undefined -> true;
Ids -> lists:member(DocId, Ids)
end,
case IncludeByDocId of
false ->
{ok, {Count, Changes}};
true ->
FilteredChange = apply_style(Style, Change),
NewCount = Count + 1,
NewChanges = [FilteredChange | Changes],
case Limit of
infinity ->
{ok, {NewCount, NewChanges}};
N when NewCount >= N ->
{stop, {NewCount, NewChanges}};
_ ->
{ok, {NewCount, NewChanges}}
end
end
end,
FoldChanges = case {Since, Limit} of
{first, infinity} -> fun fold_changes_long_scan/5;
_ -> fun fold_changes/5
end,
{ok, {_Count, RevChanges}, LastHlc} = FoldChanges(StoreRef, DbName, Since, FoldFun, {0, []}),
Changes = case maps:get(descending, Opts, false) of
true -> RevChanges;
false -> lists:reverse(RevChanges)
end,
{ok, Changes, LastHlc}.
%% @private Compile query spec if provided
compile_query(undefined) -> undefined;
compile_query(QuerySpec) when is_map(QuerySpec) ->
case barrel_query:compile(QuerySpec) of
{ok, Plan} -> Plan;
{error, _} -> undefined
end.
%% @private Check if change matches query
maybe_match_query(undefined, _Change) -> true;
maybe_match_query(QueryPlan, Change) ->
case maps:get(doc, Change, undefined) of
undefined -> false;
Doc when is_map(Doc) -> barrel_query:match(QueryPlan, Doc);
_ -> false
end.
%% @private Apply style filter to change
apply_style(main_only, Change) ->
case maps:get(changes, Change, undefined) of
undefined ->
%% Path index changes don't have changes list, create one from rev
Rev = maps:get(rev, Change, <<>>),
Change#{changes => [#{rev => Rev}]};
[_ | _] = Changes ->
Change#{changes => [hd(Changes)]}
end;
apply_style(all_docs, Change) ->
case maps:get(changes, Change, undefined) of
undefined ->
%% Path index changes don't have changes list, create one from rev
Rev = maps:get(rev, Change, <<>>),
Change#{changes => [#{rev => Rev}]};
_ ->
Change
end.
%% @private Deduplicate changes by doc ID, keeping latest HLC
dedupe_changes_by_id(Changes) ->
%% Build map of doc_id -> change (keeping latest by HLC)
ById = lists:foldl(
fun(Change, Acc) ->
DocId = maps:get(id, Change),
case maps:get(DocId, Acc, undefined) of
undefined ->
Acc#{DocId => Change};
Existing ->
ExistingHlc = maps:get(hlc, Existing),
ChangeHlc = maps:get(hlc, Change),
case barrel_hlc:compare(ChangeHlc, ExistingHlc) of
gt -> Acc#{DocId => Change};
_ -> Acc
end
end
end,
#{},
Changes
),
maps:values(ById).
%% @private Check if any topic matches any pattern in the trie
matches_any_pattern(_Trie, []) ->
false;
matches_any_pattern(Trie, [Topic | Rest]) ->
case match_trie:match(Trie, Topic) of
[] -> matches_any_pattern(Trie, Rest);
[_ | _] -> true
end.
%% @doc Get the last sequence (opaque encoded HLC) for a database
%% The sequence is an opaque binary that can be used for ordering.
-spec get_last_seq(barrel_store_rocksdb:db_ref(), db_name()) -> binary().
get_last_seq(StoreRef, DbName) ->
Hlc = get_last_hlc(StoreRef, DbName),
barrel_hlc:encode(Hlc).
%% @doc Get the last HLC timestamp for a database
%% First tries the fast path (metadata key), falls back to reverse iteration
-spec get_last_hlc(barrel_store_rocksdb:db_ref(), db_name()) -> barrel_hlc:timestamp().
get_last_hlc(StoreRef, DbName) ->
%% Try fast path: read from metadata key (O(1))
LastHlcKey = barrel_store_keys:db_last_hlc(DbName),
case barrel_store_rocksdb:get(StoreRef, LastHlcKey) of
{ok, EncodedHlc} ->
barrel_hlc:decode(EncodedHlc);
not_found ->
%% Fallback for databases created before this optimization
get_last_hlc_slow(StoreRef, DbName)
end.
%% @private Get last HLC by reverse iteration (slow path, O(n) worst case)
get_last_hlc_slow(StoreRef, DbName) ->
StartKey = barrel_store_keys:doc_hlc_prefix(DbName),
EndKey = barrel_store_keys:doc_hlc_end(DbName),
barrel_store_rocksdb:fold_range_reverse(
StoreRef, StartKey, EndKey,
fun(Key, _Value, _Acc) ->
Hlc = barrel_store_keys:decode_hlc_key(DbName, Key),
%% Stop immediately after finding the first (last in order) entry
{stop, Hlc}
end,
barrel_hlc:min()
).
%% @doc Count changes since a given HLC timestamp (exclusive)
-spec count_changes_since(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp()) -> non_neg_integer().
count_changes_since(StoreRef, DbName, Since) ->
StartKey = barrel_store_keys:doc_hlc(DbName, Since),
EndKey = barrel_store_keys:doc_hlc_end(DbName),
barrel_store_rocksdb:fold_range(
StoreRef, StartKey, EndKey,
fun(Key, _Value, Count) ->
ChangeHlc = barrel_store_keys:decode_hlc_key(DbName, Key),
%% Skip if we're at the exact Since HLC (exclusive)
case barrel_hlc:equal(ChangeHlc, Since) of
true -> {ok, Count};
false -> {ok, Count + 1}
end
end,
0
).
%% Bucket granularity: 1 minute
-define(BUCKET_GRANULARITY_SECS, 60).
%% Number of recent buckets to check
-define(BUCKET_CHECK_COUNT, 5).
%% @doc Fast O(1) check if there are changes since a given HLC.
%% Uses time-bucketed hints to avoid scanning the change log.
%% Returns true if there might be changes, false if definitely no changes.
%% Note: May return true even if no changes (false positive OK, false negative not OK).
-spec has_changes_since(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp()) -> boolean().
has_changes_since(StoreRef, DbName, Since) ->
%% Get current bucket and check recent buckets
NowSecs = erlang:system_time(second),
CurrentBucket = NowSecs div ?BUCKET_GRANULARITY_SECS,
%% Check the last N buckets for any max_hlc > Since
check_recent_buckets(StoreRef, DbName, Since, CurrentBucket, ?BUCKET_CHECK_COUNT).
%% @private Check recent buckets for changes
check_recent_buckets(_StoreRef, _DbName, _Since, _Bucket, 0) ->
%% No buckets had changes > Since, but bucket data might be stale
%% Fall back to true to be safe (will do full scan)
true;
check_recent_buckets(StoreRef, DbName, Since, Bucket, Count) when Bucket >= 0 ->
BucketKey = barrel_store_keys:change_bucket(DbName, Bucket),
case barrel_store_rocksdb:get(StoreRef, BucketKey) of
{ok, <<_MinBin:12/binary, MaxBin:12/binary, BucketCount:32>>} ->
MaxHlc = barrel_hlc:decode(MaxBin),
case BucketCount > 0 andalso barrel_hlc:compare(MaxHlc, Since) =:= gt of
true -> true; %% Found changes after Since
false -> check_recent_buckets(StoreRef, DbName, Since, Bucket - 1, Count - 1)
end;
not_found ->
%% No bucket data, check older bucket
check_recent_buckets(StoreRef, DbName, Since, Bucket - 1, Count - 1)
end;
check_recent_buckets(_StoreRef, _DbName, _Since, _Bucket, _Count) ->
%% Bucket < 0, shouldn't happen in practice
true.
%% @private Batch fetch doc bodies using multi_get for efficiency.
%% Takes a list of {DocId, Change} pairs and returns changes with doc bodies added.
%% Uses multi_get: first for doc_current (to get revs), then for doc_body from body store.
-spec batch_fetch_doc_bodies(barrel_store_rocksdb:db_ref(), db_name(),
[{docid(), change()}]) -> [change()].
batch_fetch_doc_bodies(_StoreRef, _DbName, []) ->
[];
batch_fetch_doc_bodies(StoreRef, DbName, DocChangePairs) ->
%% Step 1: Build keys for doc_current lookup
DocIds = [DocId || {DocId, _Change} <- DocChangePairs],
CurrentKeys = [barrel_store_keys:doc_current(DbName, DocId) || DocId <- DocIds],
%% Step 2: Batch fetch doc_current entries
CurrentResults = barrel_store_rocksdb:multi_get(StoreRef, CurrentKeys),
%% Step 3: Parse results and build {DocId, Rev} pairs for non-deleted docs
{DocIdRevPairs, DeletedSet} = lists:foldl(
fun({DocId, Result}, {Pairs, Deleted}) ->
case Result of
{ok, CurrentBin} ->
{Rev, IsDeleted, _Hlc} = binary_to_term(CurrentBin),
case IsDeleted of
true ->
{Pairs, sets:add_element(DocId, Deleted)};
false ->
{[{DocId, Rev} | Pairs], Deleted}
end;
not_found ->
{Pairs, Deleted};
{error, _} ->
{Pairs, Deleted}
end
end,
{[], sets:new()},
lists:zip(DocIds, CurrentResults)
),
%% Step 4: Batch fetch doc bodies from body store (BlobDB)
DocBodies = case DocIdRevPairs of
[] ->
#{};
_ ->
ReversedPairs = lists:reverse(DocIdRevPairs),
BodyResults = barrel_doc_body_store:multi_get_bodies(DbName, ReversedPairs, #{}),
lists:foldl(
fun({{DocId, _Rev}, Result}, Acc) ->
case Result of
{ok, CborBin} ->
DocBody = barrel_docdb_codec_cbor:decode_any(CborBin),
maps:put(DocId, DocBody, Acc);
_ ->
Acc
end
end,
#{},
lists:zip(ReversedPairs, BodyResults)
)
end,
%% Step 5: Merge doc bodies back into changes
[case sets:is_element(DocId, DeletedSet) of
true ->
Change; %% Deleted doc, no body to add
false ->
case maps:get(DocId, DocBodies, undefined) of
undefined -> Change;
DocBody -> Change#{doc => DocBody}
end
end || {DocId, Change} <- DocChangePairs].
%%====================================================================
%% Internal API - for barrel_db_writer
%%====================================================================
%% @doc Write a change entry for a document.
%% Also updates change buckets for idle poll optimization.
-spec write_change(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp(), doc_info()) -> ok.
write_change(StoreRef, DbName, Hlc, DocInfo) ->
ChangeOps = write_change_ops(DbName, Hlc, DocInfo),
BucketOps = update_change_bucket_ops(StoreRef, DbName, Hlc),
PathOps = write_path_index_ops(DbName, Hlc, DocInfo),
barrel_store_rocksdb:write_batch(StoreRef, ChangeOps ++ BucketOps ++ PathOps).
%% @doc Return batch operation to write a change entry.
%% Use this to combine with other operations in a single write_batch.
%% Also updates the last_hlc metadata for efficient get_last_seq lookups.
%% Note: Does NOT update change buckets - call write_change/4 for full update.
-spec write_change_ops(db_name(), barrel_hlc:timestamp(),
#{id := binary(), rev := binary(), deleted := boolean(), _ => _}) ->
[{put, binary(), binary()}].
write_change_ops(DbName, Hlc, DocInfo) ->
Key = barrel_store_keys:doc_hlc(DbName, Hlc),
Value = encode_change(DocInfo),
%% Also update last_hlc metadata for O(1) get_last_seq
LastHlcKey = barrel_store_keys:db_last_hlc(DbName),
LastHlcValue = barrel_hlc:encode(Hlc),
[{put, Key, Value}, {put, LastHlcKey, LastHlcValue}].
%% @doc Return batch operation to update change bucket.
%% Buckets store {MinHlc, MaxHlc, Count} in compact binary format.
%% Format: `<<MinHlc:12/binary, MaxHlc:12/binary, Count:32>>'
-spec update_change_bucket_ops(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp()) -> [{put, binary(), binary()}].
update_change_bucket_ops(StoreRef, DbName, Hlc) ->
NowSecs = erlang:system_time(second),
BucketTs = NowSecs div ?BUCKET_GRANULARITY_SECS,
BucketKey = barrel_store_keys:change_bucket(DbName, BucketTs),
HlcBin = barrel_hlc:encode(Hlc),
%% Read current bucket value and update
NewValue = case barrel_store_rocksdb:get(StoreRef, BucketKey) of
{ok, <<MinBin:12/binary, MaxBin:12/binary, Count:32>>} ->
MinHlc = barrel_hlc:decode(MinBin),
MaxHlc = barrel_hlc:decode(MaxBin),
NewMinBin = case barrel_hlc:compare(Hlc, MinHlc) of
lt -> HlcBin;
_ -> MinBin
end,
NewMaxBin = case barrel_hlc:compare(Hlc, MaxHlc) of
gt -> HlcBin;
_ -> MaxBin
end,
<<NewMinBin/binary, NewMaxBin/binary, (Count + 1):32>>;
not_found ->
<<HlcBin/binary, HlcBin/binary, 1:32>>
end,
[{put, BucketKey, NewValue}].
%% @doc Delete an old HLC entry (when document is updated)
-spec delete_old_change(barrel_store_rocksdb:db_ref(), db_name(),
barrel_hlc:timestamp(), docid()) -> ok.
delete_old_change(StoreRef, DbName, OldHlc, _DocId) ->
Key = barrel_store_keys:doc_hlc(DbName, OldHlc),
barrel_store_rocksdb:delete(StoreRef, Key).
%%====================================================================
%% Internal Functions
%%====================================================================
%% @doc Encode change to compact binary format.
%% Format: `<<DocIdLen:16, DocId/binary, RevLen:16, Rev/binary, Deleted:8, NumConflicts:16, HasDoc:8, [DocCbor/binary]>>'
%% Stores conflict count for quick check; optionally includes doc body for filtering.
-spec encode_change(#{id := binary(), rev := binary(), deleted => boolean(), _ => _}) -> binary().
encode_change(DocInfo) ->
DocId = maps:get(id, DocInfo),
Rev = maps:get(rev, DocInfo),
Deleted = case maps:get(deleted, DocInfo, false) of true -> 1; false -> 0 end,
NumConflicts = count_conflicts(DocInfo),
DocIdLen = byte_size(DocId),
RevLen = byte_size(Rev),
Base = <<DocIdLen:16, DocId/binary, RevLen:16, Rev/binary, Deleted:8, NumConflicts:16>>,
case maps:get(doc, DocInfo, undefined) of
undefined ->
<<Base/binary, 0:8>>;
Doc when is_map(Doc) ->
DocCbor = barrel_docdb_codec_cbor:encode(Doc),
<<Base/binary, 1:8, DocCbor/binary>>;
_ ->
<<Base/binary, 0:8>>
end.
count_conflicts(#{revtree := RevTree}) when is_map(RevTree) ->
length(barrel_revtree_bin:conflicts_from_map(RevTree));
count_conflicts(_) ->
0.
%% @doc Decode change to compact tuple format for efficient iteration.
%% Returns {DocId, Hlc, Rev, Deleted, NumConflicts} - minimal allocation.
-spec decode_change_compact(binary(), barrel_hlc:timestamp()) ->
{docid(), barrel_hlc:timestamp(), binary(), boolean(), non_neg_integer()}.
decode_change_compact(<<DocIdLen:16, DocId:DocIdLen/binary,
RevLen:16, Rev:RevLen/binary,
Deleted:8, NumConflicts:16, _Rest/binary>>, Hlc) ->
{DocId, Hlc, Rev, Deleted =:= 1, NumConflicts}.
%% @doc Decode change to full map format for API responses.
%% Includes doc body if present in the change record.
-spec decode_change(binary(), barrel_hlc:timestamp()) -> change().
decode_change(<<DocIdLen:16, DocId:DocIdLen/binary,
RevLen:16, Rev:RevLen/binary,
Deleted:8, NumConflicts:16, HasDoc:8, Rest/binary>>, Hlc) ->
Change = #{
id => DocId,
hlc => Hlc,
rev => Rev,
changes => [#{rev => Rev}],
num_conflicts => NumConflicts
},
Change1 = case Deleted =:= 1 of
true -> Change#{deleted => true};
false -> Change
end,
case HasDoc of
1 ->
Doc = barrel_docdb_codec_cbor:decode(Rest),
Change1#{doc => Doc};
0 ->
Change1
end.
%%====================================================================
%% Path-Indexed Changes API
%%====================================================================
%% @doc Generate operations to index a change by all its paths.
%% Creates entries under path_hlc/{db}/{topic}/{hlc} for exact matches,
%% and prefix_changes/{db}/{prefix}/{bucket} posting lists for wildcard queries.
-spec write_path_index_ops(db_name(), barrel_hlc:timestamp(),
#{id := binary(), rev := binary(), deleted := boolean(), _ => _}) ->
[{put, binary(), binary()} | {posting_append, binary(), binary()}].
write_path_index_ops(DbName, Hlc, DocInfo) ->
#{id := DocId, rev := Rev, deleted := Deleted} = DocInfo,
%% Extract topics from document paths
Topics = case Deleted of
true ->
%% For deleted docs, just use the doc ID as a topic
[DocId];
false ->
Doc = maps:get(doc, DocInfo, #{}),
case Doc of
#{} ->
Paths = barrel_ars:analyze(Doc),
barrel_ars:paths_to_topics(Paths);
_ ->
[DocId]
end
end,
%% Create index entry value using compact binary format
%% Format: DocIdLen:16, DocId, RevLen:16, Rev, Deleted:8, NumConflicts:16, HasDoc:8
ChangeValue = encode_change(#{id => DocId, rev => Rev, deleted => Deleted}),
%% Index each topic and all its prefixes
AllPrefixes = lists:usort(lists:flatmap(fun topic_prefixes/1, Topics)),
%% Old path_hlc index: topic + hlc (for exact matches)
PathHlcOps = [{put, barrel_store_keys:path_hlc(DbName, Prefix, Hlc), ChangeValue}
|| Prefix <- AllPrefixes],
%% New prefix_changes posting list: sharded by time bucket
%% Entry format: << HLC:12/binary, Change/binary >> - sorted by HLC in posting list
HlcBin = barrel_hlc:encode(Hlc),
Bucket = barrel_store_keys:hlc_to_bucket(Hlc),
PrefixChangeOps = [{posting_append,
barrel_store_keys:prefix_changes_key(DbName, Prefix, Bucket),
<<HlcBin/binary, ChangeValue/binary>>}
|| Prefix <- AllPrefixes],
PathHlcOps ++ PrefixChangeOps.
%% @doc Generate operations to update path index (remove old entries + add new).
%% Used when a document is updated to maintain a current path index without stale entries.
-spec update_path_index_ops(db_name(), barrel_hlc:timestamp(),
#{id := binary(), rev := binary(), deleted := boolean(), _ => _},
barrel_hlc:timestamp() | undefined, map() | undefined) ->
[tuple()].
update_path_index_ops(DbName, NewHlc, NewDocInfo, OldHlc, OldDoc) ->
%% Generate new path entries
NewOps = write_path_index_ops(DbName, NewHlc, NewDocInfo),
%% Generate delete ops for old entries if document existed before
DeleteOps = case {OldHlc, OldDoc} of
{undefined, _} ->
[];
{_, undefined} ->
[];
{_, _} ->
#{id := DocId, rev := OldRev} = NewDocInfo,
OldDeleted = false, %% If we're updating, old wasn't deleted
OldChangeValue = encode_change(#{id => DocId, rev => OldRev, deleted => OldDeleted}),
%% Extract old topics and delete their path_hlc entries
OldPaths = barrel_ars:analyze(OldDoc),
OldTopics = barrel_ars:paths_to_topics(OldPaths),
OldPrefixes = lists:usort(lists:flatmap(fun topic_prefixes/1, OldTopics)),
%% Delete from old path_hlc index
OldPathHlcDeletes = [{delete, barrel_store_keys:path_hlc(DbName, P, OldHlc)}
|| P <- OldPrefixes],
%% Remove from prefix_changes posting list (tombstone marker)
OldHlcBin = barrel_hlc:encode(OldHlc),
OldBucket = barrel_store_keys:hlc_to_bucket(OldHlc),
OldPrefixRemoves = [{posting_remove,
barrel_store_keys:prefix_changes_key(DbName, P, OldBucket),
<<OldHlcBin/binary, OldChangeValue/binary>>}
|| P <- OldPrefixes],
OldPathHlcDeletes ++ OldPrefixRemoves
end,
DeleteOps ++ NewOps.
%% @doc Get changes for a specific path pattern since HLC.
%% Scans the path_hlc index directly for efficient filtered queries.
-spec get_changes_by_path(barrel_store_rocksdb:db_ref(), db_name(),
binary(), barrel_hlc:timestamp() | first, map()) ->
{ok, [change()], barrel_hlc:timestamp()}.
get_changes_by_path(StoreRef, DbName, PathPattern, Since, Opts) ->
Limit = maps:get(limit, Opts, infinity),
%% Parse the path pattern
case parse_path_pattern(PathPattern) of
{exact, Topic} ->
%% Exact match: scan path_hlc/{db}/{topic}/{since}..{end}
scan_path_hlc(StoreRef, DbName, Topic, Since, Limit);
{prefix, TopicPrefix} ->
%% Prefix match (ends with #): scan all topics with prefix
%% For prefix match, we scan the prefix and filter by HLC
scan_path_hlc_prefix(StoreRef, DbName, TopicPrefix, Since, Limit)
end.
%% @private Parse path pattern to determine scan type
%% - Exact: "users/123/name" -> {exact, <<"users/123/name">>}
%% - Prefix: "users/#" -> {prefix, <<"users/">>}
parse_path_pattern(Pattern) ->
case binary:last(Pattern) of
$# ->
%% Remove # and trailing /# to get prefix
Len = byte_size(Pattern) - 1,
<<Prefix:Len/binary, _/binary>> = Pattern,
%% Also remove trailing / if present
Prefix2 = case binary:last(Prefix) of
$/ -> binary:part(Prefix, 0, byte_size(Prefix) - 1);
_ -> Prefix
end,
{prefix, Prefix2};
_ ->
{exact, Pattern}
end.
%% @private Scan path_hlc index for exact topic
scan_path_hlc(StoreRef, DbName, Topic, Since, Limit) ->
{StartHlc, StartKey} = case Since of
first ->
Min = barrel_hlc:min(),
{Min, barrel_store_keys:path_hlc(DbName, Topic, Min)};
SinceHlc ->
{SinceHlc, barrel_store_keys:path_hlc(DbName, Topic, SinceHlc)}
end,
EndKey = barrel_store_keys:path_hlc_end(DbName, Topic),
%% Use fold_range_limit to select read profile based on expected result size
FoldFun = fun(Key, Value, {CurrentHlc, Count, Acc}) ->
case Limit =/= infinity andalso Count >= Limit of
true ->
{stop, {CurrentHlc, Count, Acc}};
false ->
{_KeyTopic, ChangeHlc} = barrel_store_keys:decode_path_hlc_key(DbName, Key),
%% Skip if we're at the exact Since HLC (exclusive)
case Since =/= first andalso barrel_hlc:equal(ChangeHlc, Since) of
true ->
{ok, {CurrentHlc, Count, Acc}};
false ->
%% Decode from compact binary format
Change = decode_path_hlc_value(Value, ChangeHlc),
{ok, {ChangeHlc, Count + 1, [Change | Acc]}}
end
end
end,
{LastHlc, _Count, Changes} = barrel_store_rocksdb:fold_range_limit(
StoreRef, StartKey, EndKey, FoldFun, {StartHlc, 0, []}, Limit
),
{ok, lists:reverse(Changes), LastHlc}.
%% @private Scan prefix_changes posting lists for wildcard queries (# pattern)
%% Uses sharded posting lists: one posting list per (prefix, time_bucket)
%% Each entry in the posting list is << HLC:12, Change/binary >> sorted by HLC.
%% Uses range scan on posting_cf to find actual bucket keys (avoids scanning empty buckets).
scan_path_hlc_prefix(StoreRef, DbName, TopicPrefix, Since, Limit) ->
SinceHlc = case Since of
first -> barrel_hlc:min();
Hlc -> Hlc
end,
%% Calculate bucket range for key bounds
StartBucket = barrel_store_keys:hlc_to_bucket(SinceHlc),
%% Use max bucket (0xFFFFFFFF) as upper bound - range scan will stop at actual keys
MaxBucket = 16#FFFFFFFF,
%% Range scan to find all bucket keys for this prefix
StartKey = barrel_store_keys:prefix_changes_start(DbName, TopicPrefix, StartBucket),
EndKey = barrel_store_keys:prefix_changes_end(DbName, TopicPrefix, MaxBucket),
%% Fold over bucket keys in posting_cf
%% fold_range_posting already decodes entries via rocksdb:posting_list_keys
FoldFun = fun(_BucketKey, Entries, {CurrentLastHlc, Count, Acc}) ->
case Limit =/= infinity andalso Count >= Limit of
true ->
{stop, {CurrentLastHlc, Count, Acc}};
false ->
%% Filter entries by HLC and decode
{NewLastHlc, NewAcc} = filter_posting_entries(
Entries, SinceHlc, Since, CurrentLastHlc, Acc, Limit
),
{ok, {NewLastHlc, length(NewAcc), NewAcc}}
end
end,
{LastHlc, _Count, Changes} = barrel_store_rocksdb:fold_range_posting(
StoreRef, StartKey, EndKey, FoldFun, {SinceHlc, 0, []}
),
{ok, lists:reverse(Changes), LastHlc}.
%% @private Filter and decode posting entries
filter_posting_entries([], _SinceHlc, _Since, LastHlc, Acc, _Limit) ->
{LastHlc, Acc};
filter_posting_entries(_Entries, _SinceHlc, _Since, LastHlc, Acc, Limit)
when Limit =/= infinity, length(Acc) >= Limit ->
{LastHlc, Acc};
filter_posting_entries([Entry | Rest], SinceHlc, Since, LastHlc, Acc, Limit) ->
%% Entry format: << HLC:12/binary, Change/binary >>
<<HlcBin:12/binary, ChangeValue/binary>> = Entry,
EntryHlc = barrel_hlc:decode(HlcBin),
%% Check if entry is after SinceHlc
case barrel_hlc:compare(EntryHlc, SinceHlc) of
gt ->
%% Entry is after since - include it
Change = decode_path_hlc_value(ChangeValue, EntryHlc),
filter_posting_entries(Rest, SinceHlc, Since, EntryHlc, [Change | Acc], Limit);
eq when Since =:= first ->
%% Equal to min HLC and since is first - include it
Change = decode_path_hlc_value(ChangeValue, EntryHlc),
filter_posting_entries(Rest, SinceHlc, Since, EntryHlc, [Change | Acc], Limit);
_ ->
%% Entry is at or before since - skip it
filter_posting_entries(Rest, SinceHlc, Since, LastHlc, Acc, Limit)
end.
%% @private Decode path_hlc index value to change map.
%% Uses same compact binary format as change feed.
decode_path_hlc_value(<<DocIdLen:16, DocId:DocIdLen/binary,
RevLen:16, Rev:RevLen/binary,
Deleted:8, _Rest/binary>>, Hlc) ->
Change = #{
id => DocId,
rev => Rev,
hlc => Hlc
},
case Deleted =:= 1 of
true -> Change#{deleted => true};
false -> Change
end.
%% @private Generate topic prefixes for hierarchical matching
%% "users/123/name" -> ["users", "users/123", "users/123/name"]
-spec topic_prefixes(binary()) -> [binary()].
topic_prefixes(Topic) ->
Parts = binary:split(Topic, <<"/">>, [global]),
build_prefixes(Parts, <<>>, []).
build_prefixes([], _Current, Acc) ->
lists:reverse(Acc);
build_prefixes([Part | Rest], <<>>, Acc) ->
build_prefixes(Rest, Part, [Part | Acc]);
build_prefixes([Part | Rest], Current, Acc) ->
New = <<Current/binary, "/", Part/binary>>,
build_prefixes(Rest, New, [New | Acc]).