%%%-------------------------------------------------------------------
%%% @doc Query compiler and executor for barrel_docdb
%%%
%%% Provides functions to compile Datalog-style query specifications
%%% into query plans, and execute them against the path index.
%%%
%%% Query Syntax:
%%% ```
%%% #{
%%% where => [
%%% {path, [<<"type">>], <<"user">>}, % equality
%%% {path, [<<"org_id">>], '?Org'}, % bind variable
%%% {compare, [<<"age">>], '>', 18}, % comparison
%%% {'and', [...]}, % conjunction
%%% {'or', [...]} % disjunction
%%% ],
%%% select => ['?Org', '?Name'], % fields/variables to return
%%% order_by => '?Name', % ordering
%%% limit => 100, % max results
%%% offset => 0 % skip first N
%%% }
%%% '''
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_query).
-include("barrel_docdb.hrl").
%% API
-export([
compile/1,
validate_spec/1,
execute/3,
execute/4,
match/2,
matches/2,
explain/1,
extract_paths/1
]).
%% Internal exports for testing
-export([
is_logic_var/1,
normalize_condition/1
]).
%% Profiling (temporary)
-export([get_profile/0, reset_profile/0, dump_profile/0]).
%%====================================================================
%% Types
%%====================================================================
-type logic_var() :: atom(). % Atoms starting with '?'
-type path() :: [binary() | integer()].
-type value() :: binary() | number() | boolean() | null | logic_var().
-type compare_op() :: '>' | '<' | '>=' | '=<' | '=/=' | '=='.
-type condition() ::
{path, path(), value()} |
{compare, path(), compare_op(), value()} |
{'and', [condition()]} |
{'or', [condition()]} |
{'not', condition()} |
{in, path(), [value()]} |
{contains, path(), value()} |
{exists, path()} |
{missing, path()} |
{regex, path(), binary()} |
{prefix, path(), binary()}.
-type projection() :: logic_var() | path() | '*'.
-type order_spec() :: logic_var() | path() | {logic_var() | path(), asc | desc}.
-type query_spec() :: #{
where => [condition()],
select => [projection()],
order_by => order_spec() | [order_spec()],
limit => pos_integer(),
offset => non_neg_integer(),
include_docs => boolean(),
flat => boolean(),
id_prefix => binary(),
id_range => {binary() | undefined, binary() | undefined},
doc_format => binary | map | json,
decoder_fun => fun((binary()) -> term())
}.
-record(query_plan, {
%% Normalized conditions
conditions :: [condition()],
%% Variables bound in conditions
bindings :: #{logic_var() => path()},
%% Fields/variables to project
projections :: [projection()],
%% Ordering specification
order :: [{path() | logic_var(), asc | desc}],
%% Result limit
limit :: pos_integer() | undefined,
%% Result offset
offset :: non_neg_integer(),
%% Include full documents
include_docs :: boolean(),
%% Document format for include_docs: map (default) | binary | json
doc_format :: binary | map | json,
%% Custom decoder function for documents
decoder_fun :: undefined | fun((binary()) -> term()),
%% Index strategy hint
strategy :: index_seek | index_scan | multi_index | full_scan,
%% Return flat documents (Doc#{<<"id">>}) instead of #{id, doc} wrappers
flat = false :: boolean(),
%% Primary-key scan over the entity keyspace by document id:
%% undefined | {prefix, binary()} | {range, Start, End}
%% (Start/End are binary() | undefined). Bypasses the ARS index.
id_scan = undefined :: undefined | {prefix, binary()}
| {range, binary() | undefined, binary() | undefined}
}).
-type query_plan() :: #query_plan{}.
%% Chunked execution options
-type chunk_opts() :: #{
chunk_size => pos_integer(), %% Max results per chunk (default 1000)
continuation => binary() | undefined, %% Resume from previous chunk
eventual_consistency => boolean() %% Skip snapshot for fresh reads (default false)
}.
%% Chunked execution result metadata
-type result_meta() :: #{
last_seq := seq(),
has_more := boolean(),
continuation => binary() %% Only present if has_more = true
}.
-export_type([query_spec/0, query_plan/0, condition/0, logic_var/0, chunk_opts/0, result_meta/0]).
%% Read profile type from storage layer
-type read_profile() :: barrel_store_rocksdb:read_profile().
%%====================================================================
%% Chunked Execution Constants
%%====================================================================
%% Target: ~1MB of doc data per chunk for good cache utilization
-define(TARGET_CHUNK_BYTES, 1048576).
-define(MIN_CHUNK_SIZE, 50).
-define(MAX_CHUNK_SIZE, 1000).
-define(INITIAL_CHUNK_SIZE, 200).
%%====================================================================
%% Adaptive Strategy Constants
%%====================================================================
%% Threshold for choosing between scan vs posting list strategies:
%% - Below threshold: use value_index scan (per-doc keys, early termination)
%% - Above threshold: use posting list decode (one seek, bulk decode)
-define(ADAPTIVE_CARDINALITY_THRESHOLD, 1000).
%% Maximum unbounded results before forcing pagination
%% For include_docs=true queries without limit, auto-paginate at this size
-define(MAX_UNBOUNDED_RESULTS, 1000).
%%====================================================================
%% Streaming Batch Fetch Constants
%%====================================================================
%% Minimum batch size for streaming (too small = overhead from many batches)
-define(STREAM_MIN_BATCH, 50).
%% Maximum batch size for streaming (too large = defeats the purpose)
-define(STREAM_MAX_BATCH, 500).
%% Initial assumed selectivity (50% of docs pass filter)
-define(STREAM_INITIAL_SELECTIVITY, 0.5).
%% Threshold ratio: use streaming when we have Nx more docs than needed
-define(STREAM_THRESHOLD_RATIO, 3).
%% Streaming statistics for adaptive batch sizing
-record(stream_stats, {
fetched = 0 :: non_neg_integer(), %% Total DocIds fetched
decoded = 0 :: non_neg_integer(), %% Total docs decoded
matched = 0 :: non_neg_integer(), %% Docs passing filter
selectivity = ?STREAM_INITIAL_SELECTIVITY :: float() %% Estimated filter pass rate
}).
%%====================================================================
%% Pipelined Execution Constants
%%====================================================================
%% Batch size for pipelined index iteration
-define(PIPELINE_BATCH_SIZE, 200).
%% Maximum in-flight body fetches (limits memory usage)
-define(PIPELINE_MAX_INFLIGHT, 4).
%% Minimum DocIds to use pipelining (below this, sequential is faster)
-define(PIPELINE_MIN_DOCIDS, 500).
%%====================================================================
%% API
%%====================================================================
%% @doc Compile a query specification into a query plan.
%% Returns {ok, QueryPlan} or {error, Reason}.
-spec compile(query_spec()) -> {ok, query_plan()} | {error, term()}.
compile(Spec) when is_map(Spec) ->
barrel_trace:with_db_span(query_compile, undefined, fun() ->
case validate_spec(Spec) of
ok ->
do_compile(Spec);
{error, _} = Error ->
barrel_trace:record_error(Error),
Error
end
end);
compile(_) ->
{error, {invalid_spec, not_a_map}}.
%% @doc Validate a query specification.
%% Returns ok or {error, Reason}.
-spec validate_spec(query_spec()) -> ok | {error, term()}.
validate_spec(Spec) when is_map(Spec) ->
HasIdScan = maps:is_key(id_prefix, Spec) orelse maps:is_key(id_range, Spec),
case maps:get(where, Spec, undefined) of
undefined when HasIdScan ->
%% Standalone id_prefix / id_range scan - no where clause required.
case compile_id_scan(Spec) of
{error, _} = Error -> Error;
_ -> ok
end;
undefined ->
{error, {missing_clause, where}};
Where when is_list(Where) ->
validate_conditions(Where);
_ ->
{error, {invalid_clause, where, must_be_list}}
end;
validate_spec(_) ->
{error, {invalid_spec, not_a_map}}.
%% @doc Execute a compiled query plan against a database.
%% Returns {ok, Results, LastSeq} or {error, Reason}.
%% Uses a snapshot for read consistency across all index and document reads.
%%
%% For unbounded include_docs queries, automatically paginates internally
%% to avoid excessive memory usage from large MultiGet operations.
-spec execute(barrel_store_rocksdb:db_ref(), db_name(), query_plan()) ->
{ok, [map()], seq()} | {error, term()}.
execute(StoreRef, DbName, #query_plan{include_docs = true, limit = undefined} = Plan) ->
%% Unbounded include_docs query - auto-paginate to avoid memory issues
Result = barrel_trace:with_db_span(query_execute, DbName, fun() ->
execute_with_auto_pagination(StoreRef, DbName, Plan)
end),
maybe_flatten_result(Result, Plan);
execute(StoreRef, DbName, #query_plan{} = Plan) ->
%% Bounded query or pure index query - execute directly
Result = barrel_trace:with_db_span(query_execute, DbName, fun() ->
{ok, Snapshot} = barrel_store_rocksdb:snapshot(StoreRef),
try
execute_with_snapshot(StoreRef, DbName, Plan, Snapshot)
after
barrel_store_rocksdb:release_snapshot(Snapshot)
end
end),
maybe_flatten_result(Result, Plan).
%% @private Execute unbounded include_docs query with automatic pagination
%% Collects results in chunks to avoid memory pressure from large MultiGet
execute_with_auto_pagination(StoreRef, DbName, Plan) ->
collect_all_chunks(StoreRef, DbName, Plan, undefined, []).
%% @private Direct execution bypassing auto-pagination
%% Used when chunked execution falls back to non-chunked for needs_body queries
execute_direct(StoreRef, DbName, Plan) ->
{ok, Snapshot} = barrel_store_rocksdb:snapshot(StoreRef),
try
execute_with_snapshot(StoreRef, DbName, Plan, Snapshot)
after
barrel_store_rocksdb:release_snapshot(Snapshot)
end.
%% @private Collect all chunks from paginated query
-spec collect_all_chunks(barrel_store_rocksdb:db_ref(), db_name(), query_plan(),
binary() | undefined, [map()]) ->
{ok, [map()], seq()} | {error, term()}.
collect_all_chunks(StoreRef, DbName, Plan, undefined, AccResults) ->
%% Initial call - no continuation token
%% execute_chunked_fresh always succeeds (no cursor lookup)
Opts = #{chunk_size => ?MAX_UNBOUNDED_RESULTS},
{ok, Results, Meta} = execute_chunked_fresh(StoreRef, DbName, Plan, Opts),
case Meta of
#{has_more := false, last_seq := LastSeq} ->
{ok, AccResults ++ Results, LastSeq};
#{has_more := true, continuation := NextToken} ->
collect_all_chunks(StoreRef, DbName, Plan, NextToken, AccResults ++ Results)
end;
collect_all_chunks(StoreRef, DbName, Plan, Token, AccResults) when is_binary(Token) ->
%% Continuation call - cursor lookup may fail
Opts = #{chunk_size => ?MAX_UNBOUNDED_RESULTS},
case execute_chunked_resume(StoreRef, DbName, Plan, Token, Opts) of
{ok, Results, #{has_more := false, last_seq := LastSeq}} ->
{ok, AccResults ++ Results, LastSeq};
{ok, Results, #{has_more := true, continuation := NextToken}} ->
collect_all_chunks(StoreRef, DbName, Plan, NextToken, AccResults ++ Results);
{error, _} = Error ->
Error
end.
%% @doc Execute a compiled query plan with chunked execution options.
%% Returns {ok, Results, ResultMeta} where ResultMeta includes:
%% - last_seq: sequence number for change tracking
%% - has_more: true if more results available
%% - continuation: opaque token to resume (only if has_more = true)
%%
%% Options:
%% - chunk_size: max results per chunk (default 1000)
%% - continuation: resume token from previous call
%% - eventual_consistency: if true, each chunk sees current data (default false)
%%
%% Example:
%% ```
%% %% First chunk
%% {ok, R1, #{has_more := true, continuation := Token}} =
%% barrel_query:execute(Store, Db, Plan, #{chunk_size => 100}),
%%
%% %% Next chunk
%% {ok, R2, #{has_more := false}} =
%% barrel_query:execute(Store, Db, Plan, #{continuation => Token}).
%% '''
-spec execute(barrel_store_rocksdb:db_ref(), db_name(), query_plan(), chunk_opts()) ->
{ok, [map()], result_meta()} | {error, term()}.
execute(StoreRef, DbName, #query_plan{} = Plan, Opts) when is_map(Opts) ->
Result = barrel_trace:with_db_span(query_execute, DbName, fun() ->
case maps:get(continuation, Opts, undefined) of
undefined ->
%% Fresh query - create new snapshot
execute_chunked_fresh(StoreRef, DbName, Plan, Opts);
Token ->
%% Resume from cursor
execute_chunked_resume(StoreRef, DbName, Plan, Token, Opts)
end
end),
maybe_flatten_result(Result, Plan).
%% @private Apply `flat => true' to a query result: replace #{id, doc} wrappers
%% with the flat document Doc#{<<"id">>}. Other shapes pass through.
maybe_flatten_result({ok, Results, Meta}, #query_plan{flat = true, include_docs = true}) ->
{ok, [flatten_one(R) || R <- Results], Meta};
maybe_flatten_result(Result, _Plan) ->
Result.
flatten_one(#{<<"doc">> := Doc, <<"id">> := Id}) when is_map(Doc) ->
Doc#{<<"id">> => Id};
flatten_one(Other) ->
Other.
%%====================================================================
%% Chunked Execution Implementation
%%====================================================================
%% @private Execute fresh chunked query (no continuation token)
execute_chunked_fresh(StoreRef, DbName, Plan, Opts) ->
ChunkSize = maps:get(chunk_size, Opts, 1000),
EventualConsistency = maps:get(eventual_consistency, Opts, false),
%% Create snapshot for consistent reads (unless eventual consistency)
Snapshot = case EventualConsistency of
true -> undefined;
false ->
{ok, S} = barrel_store_rocksdb:snapshot(StoreRef),
S
end,
try
execute_chunked_with_snapshot(StoreRef, DbName, Plan, ChunkSize, undefined, Snapshot)
catch
Class:Reason:Stack ->
%% Release snapshot on error
maybe_release_snapshot(Snapshot),
erlang:raise(Class, Reason, Stack)
end.
%% @private Resume chunked query from cursor
execute_chunked_resume(StoreRef, DbName, Plan, Token, Opts) ->
case barrel_query_cursor:lookup(Token) of
{ok, Cursor} ->
%% Get cursor fields
#{snapshot := Snapshot, last_key := LastKey, query_type := QueryType} = cursor_to_map(Cursor),
ChunkSize = maps:get(chunk_size, Opts, 1000),
%% Validate cursor matches current query
%% Currently validate_cursor_for_plan always returns ok
ok = validate_cursor_for_plan(QueryType, Plan),
execute_chunked_with_snapshot(StoreRef, DbName, Plan, ChunkSize, LastKey, Snapshot);
{error, expired} ->
{error, cursor_expired};
{error, not_found} ->
{error, cursor_not_found}
end.
%% @private Execute chunked query with snapshot
execute_chunked_with_snapshot(StoreRef, DbName, #query_plan{id_scan = IdScan} = Plan,
ChunkSize, StartKey, Snapshot)
when IdScan =/= undefined ->
%% Primary-key scan over the entity keyspace by document id.
execute_id_scan_chunked(StoreRef, DbName, IdScan, Plan, ChunkSize, StartKey, Snapshot);
execute_chunked_with_snapshot(StoreRef, DbName, Plan, ChunkSize, StartKey, Snapshot) ->
#query_plan{conditions = Conditions} = Plan,
%% Classify query type for pure index execution
QueryType = classify_scan_query(Conditions, Plan),
case QueryType of
{pure_equality, Path, Value} ->
execute_pure_equality_chunked(StoreRef, DbName, Path, Value, Plan, ChunkSize, StartKey, Snapshot);
{pure_exists, Path} ->
execute_pure_exists_chunked(StoreRef, DbName, Path, Plan, ChunkSize, StartKey, Snapshot);
{pure_prefix, Path, Prefix} ->
execute_pure_prefix_chunked(StoreRef, DbName, Path, Prefix, Plan, ChunkSize, StartKey, Snapshot);
{pure_compare, Path, Op, Value} ->
execute_pure_compare_chunked(StoreRef, DbName, Path, Op, Value, Plan, ChunkSize, StartKey, Snapshot);
{multi_index, IndexConditions} ->
execute_multi_index_chunked(StoreRef, DbName, IndexConditions, Plan, ChunkSize, StartKey, Snapshot);
needs_body ->
%% Body-fetch queries require loading full documents, which doesn't
%% support efficient chunked iteration. Fall back to direct execution.
%% This may increase memory usage for large result sets.
maybe_release_snapshot(Snapshot),
{ok, Results, LastSeq} = execute_direct(StoreRef, DbName, Plan),
{ok, Results, #{last_seq => LastSeq, has_more => false}}
end.
%% @private Execute chunked pure equality query
execute_pure_equality_chunked(StoreRef, DbName, Path, Value, Plan, ChunkSize, StartKey, Snapshot) ->
#query_plan{offset = Offset, include_docs = IncludeDocs, decoder_fun = DecoderFun} = Plan,
%% Determine start key for iteration
ActualStartKey = case StartKey of
undefined ->
barrel_store_keys:value_index_prefix(DbName, Value, Path);
Key ->
%% Resume after the last key
<<Key/binary, 0>>
end,
EndKey = barrel_store_keys:value_index_end(DbName, Value, Path),
PrefixLen = byte_size(barrel_store_keys:value_index_prefix(DbName, Value, Path)),
%% Collect ChunkSize + 1 to detect if there are more results
%% Track the last key of the ChunkSize-th result separately
MaxCollect = ChunkSize + 1,
FoldFun = fun(Key, _Val, {Count, ChunkLastK, Acc}) ->
<<_:PrefixLen/binary, DocId/binary>> = Key,
NewCount = Count + 1,
if
NewCount > MaxCollect ->
%% Already collected enough, stop
{stop, {Count, ChunkLastK, Acc}};
NewCount =:= ChunkSize ->
%% This is the ChunkSize-th result - save its key for cursor
{ok, {NewCount, Key, [DocId | Acc]}};
NewCount > ChunkSize ->
%% Extra result to detect has_more - don't update ChunkLastK
{ok, {NewCount, ChunkLastK, [DocId | Acc]}};
true ->
{ok, {NewCount, ChunkLastK, [DocId | Acc]}}
end
end,
{CollectedCount, ChunkLastKey, DocIds} =
case Snapshot of
undefined ->
barrel_store_rocksdb:fold_range(
StoreRef, ActualStartKey, EndKey, FoldFun, {0, undefined, []});
_ ->
barrel_store_rocksdb:fold_range_with_snapshot(
StoreRef, ActualStartKey, EndKey, FoldFun, {0, undefined, []}, Snapshot)
end,
%% Check if we have more results
HasMore = CollectedCount > ChunkSize,
%% Take only ChunkSize doc IDs (drop the extra one if present)
FinalDocIds0 = case HasMore of
true -> tl(DocIds); %% Drop the extra one (results are reversed)
false -> DocIds
end,
FinalDocIds = apply_offset_limit(lists:reverse(FinalDocIds0), Offset, ChunkSize),
%% Fetch docs if include_docs is true, otherwise just return IDs
FinalResults = maybe_fetch_docs_chunked(DbName, FinalDocIds, IncludeDocs, DecoderFun),
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
case HasMore of
true ->
%% Create cursor with the key of the last result we're returning
Token = barrel_query_cursor:create(
StoreRef, DbName, pure_equality, ChunkLastKey, Snapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => true, continuation => Token}};
false ->
%% No more results - release snapshot
maybe_release_snapshot(Snapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => false}}
end.
%% @private Execute chunked pure exists query
execute_pure_exists_chunked(StoreRef, DbName, Path, Plan, ChunkSize, StartKey, Snapshot) ->
#query_plan{offset = Offset, include_docs = IncludeDocs, decoder_fun = DecoderFun} = Plan,
%% Determine start key for iteration
ActualStartKey = case StartKey of
undefined ->
barrel_store_keys:path_posting_prefix(DbName, Path);
Key ->
<<Key/binary, 0>>
end,
EndKey = barrel_store_keys:path_posting_end(DbName, Path),
%% Collect ChunkSize + 1 to detect if there are more
MaxCollect = ChunkSize + 1,
%% Use provided snapshot or create a temporary one
ActualSnapshot = case Snapshot of
undefined ->
{ok, S} = barrel_store_rocksdb:snapshot(StoreRef),
S;
S -> S
end,
%% Fold over posting lists
FoldResult = barrel_store_rocksdb:fold_range_posting_with_snapshot(
StoreRef, ActualStartKey, EndKey,
fun(Key, DocIds0, {Seen, Count, LastK, Acc}) ->
process_exists_docids_chunked(DocIds0, Key, Seen, Count, LastK, Acc, MaxCollect)
end,
{#{}, 0, undefined, []},
ActualSnapshot
),
{_, CollectedCount, LastCollectedKey, DocIds} = FoldResult,
HasMore = CollectedCount > ChunkSize,
FinalDocIds0 = case HasMore of
true -> tl(DocIds);
false -> DocIds
end,
FinalDocIds = apply_offset_limit(lists:reverse(FinalDocIds0), Offset, ChunkSize),
%% Fetch docs if include_docs is true
FinalResults = maybe_fetch_docs_chunked(DbName, FinalDocIds, IncludeDocs, DecoderFun),
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
case HasMore of
true ->
Token = barrel_query_cursor:create(
StoreRef, DbName, pure_exists, LastCollectedKey, ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => true, continuation => Token}};
false ->
maybe_release_snapshot(ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => false}}
end.
%% @private Execute chunked pure prefix query
execute_pure_prefix_chunked(StoreRef, DbName, Path, Prefix, Plan, ChunkSize, StartKey, Snapshot) ->
#query_plan{offset = Offset, include_docs = IncludeDocs, decoder_fun = DecoderFun} = Plan,
%% Build prefix range
FullPath = Path ++ [Prefix],
ActualStartKey = case StartKey of
undefined ->
barrel_store_keys:path_posting_prefix(DbName, FullPath);
Key ->
<<Key/binary, 0>>
end,
%% End key for prefix scan. The value is the last path component and is
%% encoded with a 0x00 0x00 terminator, so we must append 0xFF *inside*
%% the value component (before the terminator) to bracket all values that
%% start with Prefix. Using path_posting_end(FullPath) would place 0xFF
%% after the terminator and wrongly exclude longer values like
%% "Hello World" (see fold_prefix/6).
EndKey = barrel_store_keys:path_posting_prefix(
DbName, Path ++ [<<Prefix/binary, 16#FF>>]),
MaxCollect = ChunkSize + 1,
%% Use provided snapshot or create a temporary one
ActualSnapshot = case Snapshot of
undefined ->
{ok, S} = barrel_store_rocksdb:snapshot(StoreRef),
S;
S -> S
end,
FoldResult = barrel_store_rocksdb:fold_range_posting_with_snapshot(
StoreRef, ActualStartKey, EndKey,
fun(Key, DocIds0, {Seen, Count, LastK, Acc}) ->
process_prefix_docids_chunked(DocIds0, Key, Seen, Count, LastK, Acc, MaxCollect)
end,
{#{}, 0, undefined, []},
ActualSnapshot
),
{_, CollectedCount, LastCollectedKey, DocIds} = FoldResult,
HasMore = CollectedCount > ChunkSize,
FinalDocIds0 = case HasMore of
true -> tl(DocIds);
false -> DocIds
end,
FinalDocIds = apply_offset_limit(lists:reverse(FinalDocIds0), Offset, ChunkSize),
%% Fetch docs if include_docs is true
FinalResults = maybe_fetch_docs_chunked(DbName, FinalDocIds, IncludeDocs, DecoderFun),
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
case HasMore of
true ->
Token = barrel_query_cursor:create(
StoreRef, DbName, pure_prefix, LastCollectedKey, ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => true, continuation => Token}};
false ->
maybe_release_snapshot(ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => false}}
end.
%% @private Execute a primary-key (document id) range scan over the entity
%% keyspace. Skips tombstones; supports include_docs/flat/limit/offset and
%% cursor continuation. O(matches in range), ordered by id.
execute_id_scan_chunked(StoreRef, DbName, IdScan, Plan, ChunkSize, StartKey, Snapshot) ->
#query_plan{offset = Offset, include_docs = IncludeDocs, decoder_fun = DecoderFun} = Plan,
EntityPrefix = barrel_store_keys:doc_entity_prefix(DbName),
PrefixLen = byte_size(EntityPrefix),
{RangeStart, RangeEnd} = id_scan_range(DbName, IdScan),
ActualStartKey = case StartKey of
undefined -> RangeStart;
Key -> <<Key/binary, 0>>
end,
ActualSnapshot = case Snapshot of
undefined ->
{ok, S} = barrel_store_rocksdb:snapshot(StoreRef),
S;
S -> S
end,
MaxCollect = ChunkSize + 1,
FoldFun = fun(Key, _Value, {Count, Acc}) ->
case entity_live(StoreRef, Key) of
true ->
DocId = binary:part(Key, PrefixLen, byte_size(Key) - PrefixLen),
NewCount = Count + 1,
NewAcc = [DocId | Acc],
case NewCount >= MaxCollect of
true -> {stop, {NewCount, NewAcc}};
false -> {ok, {NewCount, NewAcc}}
end;
false ->
{ok, {Count, Acc}}
end
end,
{CollectedCount, DocIdsRev} = barrel_store_rocksdb:fold_range_with_snapshot(
StoreRef, ActualStartKey, RangeEnd, FoldFun, {0, []}, ActualSnapshot),
HasMore = CollectedCount > ChunkSize,
AllDocIds = lists:reverse(DocIdsRev),
ReturnedDocIds = lists:sublist(AllDocIds, ChunkSize),
FinalDocIds = apply_offset_limit(ReturnedDocIds, Offset, ChunkSize),
FinalResults = maybe_fetch_docs_chunked(DbName, FinalDocIds, IncludeDocs, DecoderFun),
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
case HasMore andalso ReturnedDocIds =/= [] of
true ->
LastDocId = lists:last(ReturnedDocIds),
LastKey = barrel_store_keys:doc_entity(DbName, LastDocId),
Token = barrel_query_cursor:create(StoreRef, DbName, id_scan, LastKey, ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => true, continuation => Token}};
false ->
maybe_release_snapshot(ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => false}}
end.
%% @private Compute the entity-keyspace [Start, End) range for an id_scan.
id_scan_range(DbName, {prefix, P}) ->
Prefix = barrel_store_keys:doc_entity_prefix(DbName),
Start = <<Prefix/binary, P/binary>>,
End = case bin_increment(P) of
overflow -> barrel_store_keys:doc_entity_end(DbName);
NextP -> <<Prefix/binary, NextP/binary>>
end,
{Start, End};
id_scan_range(DbName, {range, StartId, EndId}) ->
Prefix = barrel_store_keys:doc_entity_prefix(DbName),
Start = case StartId of
undefined -> Prefix;
_ -> <<Prefix/binary, StartId/binary>>
end,
End = case EndId of
undefined -> barrel_store_keys:doc_entity_end(DbName);
_ -> <<Prefix/binary, EndId/binary>>
end,
{Start, End}.
%% @private Smallest binary greater than all binaries having Bin as a prefix.
%% Returns `overflow' when Bin is empty or all 0xFF (scan to keyspace end).
bin_increment(Bin) ->
case bin_increment_rev(lists:reverse(binary_to_list(Bin))) of
overflow -> overflow;
Rev -> list_to_binary(lists:reverse(Rev))
end.
bin_increment_rev([]) -> overflow;
bin_increment_rev([16#FF | Rest]) -> bin_increment_rev(Rest);
bin_increment_rev([B | Rest]) -> [B + 1 | Rest].
%% @private A document entity is live (not a tombstone) when its `del' column
%% is not <<"true">>.
entity_live(StoreRef, EntityKey) ->
case barrel_store_rocksdb:get_entity(StoreRef, EntityKey) of
{ok, Columns} ->
proplists:get_value(<<"del">>, Columns, <<"false">>) =/= <<"true">>;
_ ->
false
end.
%% @private Process DocIds for chunked exists query
process_exists_docids_chunked([], _Key, Seen, Count, LastKey, Acc, _MaxCollect) ->
{ok, {Seen, Count, LastKey, Acc}};
process_exists_docids_chunked([DocId | Rest], Key, Seen, Count, _LastKey, Acc, MaxCollect) ->
case maps:is_key(DocId, Seen) of
true ->
process_exists_docids_chunked(Rest, Key, Seen, Count, Key, Acc, MaxCollect);
false ->
NewSeen = Seen#{DocId => true},
NewCount = Count + 1,
NewAcc = [DocId | Acc],
case NewCount >= MaxCollect of
true ->
{stop, {NewSeen, NewCount, Key, NewAcc}};
false ->
process_exists_docids_chunked(Rest, Key, NewSeen, NewCount, Key, NewAcc, MaxCollect)
end
end.
%% @private Process DocIds for chunked prefix query
process_prefix_docids_chunked([], _Key, Seen, Count, LastKey, Acc, _MaxCollect) ->
{ok, {Seen, Count, LastKey, Acc}};
process_prefix_docids_chunked([DocId | Rest], Key, Seen, Count, _LastKey, Acc, MaxCollect) ->
case maps:is_key(DocId, Seen) of
true ->
process_prefix_docids_chunked(Rest, Key, Seen, Count, Key, Acc, MaxCollect);
false ->
NewSeen = Seen#{DocId => true},
NewCount = Count + 1,
NewAcc = [DocId | Acc],
case NewCount >= MaxCollect of
true ->
{stop, {NewSeen, NewCount, Key, NewAcc}};
false ->
process_prefix_docids_chunked(Rest, Key, NewSeen, NewCount, Key, NewAcc, MaxCollect)
end
end.
%% @private Execute chunked pure compare query (range scan)
execute_pure_compare_chunked(StoreRef, DbName, Path, Op, Value, Plan, ChunkSize, StartKey, Snapshot) ->
#query_plan{offset = Offset, include_docs = IncludeDocs, decoder_fun = DecoderFun} = Plan,
%% Compute start/end keys for the range based on operator
{RangeStart, RangeEnd} = compare_range_keys(DbName, Path, Op, Value),
%% Use provided StartKey for pagination, otherwise use range start
ActualStartKey = case StartKey of
undefined -> RangeStart;
Key -> <<Key/binary, 0>>
end,
MaxCollect = ChunkSize + 1,
%% Get snapshot if not already present
ActualSnapshot = case Snapshot of
undefined ->
{ok, S} = barrel_store_rocksdb:snapshot(StoreRef),
S;
S -> S
end,
FoldResult = barrel_store_rocksdb:fold_range_posting_with_snapshot(
StoreRef, ActualStartKey, RangeEnd,
fun(Key, DocIds0, {Seen, Count, LastK, Acc}) ->
process_compare_docids_chunked(DocIds0, Key, Seen, Count, LastK, Acc, MaxCollect)
end,
{#{}, 0, undefined, []},
ActualSnapshot
),
{_, CollectedCount, LastCollectedKey, DocIds} = FoldResult,
HasMore = CollectedCount > ChunkSize,
FinalDocIds0 = case HasMore of
true -> tl(DocIds);
false -> DocIds
end,
FinalDocIds = apply_offset_limit(lists:reverse(FinalDocIds0), Offset, ChunkSize),
%% Fetch docs if include_docs is true
FinalResults = maybe_fetch_docs_chunked(DbName, FinalDocIds, IncludeDocs, DecoderFun),
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
case HasMore of
true ->
Token = barrel_query_cursor:create(
StoreRef, DbName, pure_compare, LastCollectedKey, ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => true, continuation => Token}};
false ->
maybe_release_snapshot(ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => false}}
end.
%% @private Compute range keys for compare operators
compare_range_keys(DbName, Path, '>', Value) ->
%% > Value: start AFTER value, end at path end
StartKey = barrel_store_keys:path_posting_end(DbName, Path ++ [Value]),
EndKey = barrel_store_keys:path_posting_end(DbName, Path),
{StartKey, EndKey};
compare_range_keys(DbName, Path, '>=', Value) ->
%% >= Value: start AT value, end at path end
StartKey = barrel_store_keys:path_posting_prefix(DbName, Path ++ [Value]),
EndKey = barrel_store_keys:path_posting_end(DbName, Path),
{StartKey, EndKey};
compare_range_keys(DbName, Path, '<', Value) ->
%% < Value: start at path start, end BEFORE value
StartKey = barrel_store_keys:path_posting_prefix(DbName, Path),
EndKey = barrel_store_keys:path_posting_prefix(DbName, Path ++ [Value]),
{StartKey, EndKey};
compare_range_keys(DbName, Path, '=<', Value) ->
%% =< Value: start at path start, end AFTER value
StartKey = barrel_store_keys:path_posting_prefix(DbName, Path),
EndKey = barrel_store_keys:path_posting_end(DbName, Path ++ [Value]),
{StartKey, EndKey}.
%% @private Process DocIds for chunked compare query
process_compare_docids_chunked([], _Key, Seen, Count, LastKey, Acc, _MaxCollect) ->
{ok, {Seen, Count, LastKey, Acc}};
process_compare_docids_chunked([DocId | Rest], Key, Seen, Count, _LastKey, Acc, MaxCollect) ->
case maps:is_key(DocId, Seen) of
true ->
process_compare_docids_chunked(Rest, Key, Seen, Count, Key, Acc, MaxCollect);
false ->
NewSeen = Seen#{DocId => true},
NewCount = Count + 1,
NewAcc = [DocId | Acc],
case NewCount >= MaxCollect of
true ->
{stop, {NewSeen, NewCount, Key, NewAcc}};
false ->
process_compare_docids_chunked(Rest, Key, NewSeen, NewCount, Key, NewAcc, MaxCollect)
end
end.
%% @private Execute chunked multi-index query (intersection of posting lists)
execute_multi_index_chunked(StoreRef, DbName, Conditions, Plan, ChunkSize, StartKey, Snapshot) ->
#query_plan{offset = Offset, include_docs = IncludeDocs, decoder_fun = DecoderFun} = Plan,
%% Get snapshot if not already present
ActualSnapshot = case Snapshot of
undefined ->
{ok, S} = barrel_store_rocksdb:snapshot(StoreRef),
S;
S -> S
end,
%% Collect DocIds - use lazy intersection when starting from scratch
%% and have multiple conditions (lazy is O(result_count) vs O(total_matching))
MaxToCollect = ChunkSize + Offset + 1, %% ChunkSize + 1 for has_more detection + offset
IntersectedDocIds = case {StartKey, length(Conditions) > 1} of
{undefined, true} ->
%% Fresh query with multiple conditions - use lazy intersection
%% This is much faster when result set is larger than chunk size
intersect_lazy_with_limit(StoreRef, DbName, Conditions, MaxToCollect);
_ ->
%% Continuation - need full sorted list for proper cursor position
%% Or single condition - no benefit from lazy
intersect_docid_sets_v2(StoreRef, DbName, Conditions)
end,
%% Apply pagination using StartKey
PaginatedDocIds = case StartKey of
undefined -> IntersectedDocIds;
Key ->
%% Skip until we pass the start key
lists:dropwhile(fun(DocId) -> DocId =< Key end, IntersectedDocIds)
end,
%% Collect ChunkSize + 1 for has_more detection
MaxCollect = ChunkSize + 1,
{CollectedDocIds, HasMore} = case length(PaginatedDocIds) > MaxCollect of
true ->
{lists:sublist(PaginatedDocIds, MaxCollect), true};
false ->
{PaginatedDocIds, false}
end,
%% Build results
FinalDocIds0 = case HasMore of
true -> lists:droplast(CollectedDocIds);
false -> CollectedDocIds
end,
FinalDocIds = apply_offset_limit(FinalDocIds0, Offset, ChunkSize),
%% Fetch docs if include_docs is true
FinalResults = maybe_fetch_docs_chunked(DbName, FinalDocIds, IncludeDocs, DecoderFun),
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
case HasMore of
true ->
LastDocId = lists:last(CollectedDocIds),
Token = barrel_query_cursor:create(
StoreRef, DbName, multi_index, LastDocId, ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => true, continuation => Token}};
false ->
maybe_release_snapshot(ActualSnapshot),
{ok, FinalResults, #{last_seq => LastSeq, has_more => false}}
end.
%% @private Collect DocIds for a single condition using index
collect_condition_docids(StoreRef, DbName, {path, Path, Value}) ->
FullPath = Path ++ [Value],
barrel_ars_index:get_posting_list(StoreRef, DbName, FullPath);
collect_condition_docids(StoreRef, DbName, {exists, Path}) ->
%% Collect all DocIds that have this path
barrel_ars_index:fold_path(StoreRef, DbName, Path,
fun({_P, DocId}, Acc) -> {ok, [DocId | Acc]} end, [], short_range);
collect_condition_docids(StoreRef, DbName, {prefix, Path, Prefix}) ->
barrel_ars_index:fold_prefix(StoreRef, DbName, Path, Prefix,
fun({_P, DocId}, Acc) -> {ok, [DocId | Acc]} end, [], short_range);
collect_condition_docids(StoreRef, DbName, {compare, Path, Op, Value}) ->
barrel_ars_index:fold_path_values_compare(StoreRef, DbName, Path, Op, Value,
fun({_P, DocId}, Acc) -> {ok, [DocId | Acc]} end, []).
%% @private Intersect multiple DocId sets using optimized intersection
%% First collects from most selective condition, then filters against others
intersect_docid_sets(StoreRef, DbName, Conditions) ->
%% Find the most selective condition (smallest posting list)
{SmallestCond, _SmallestPath} = find_most_selective_condition(StoreRef, DbName, Conditions),
%% Get docids from the smallest posting list
DocIds = collect_condition_docids(StoreRef, DbName, SmallestCond),
%% Verify remaining conditions using posting lists
OtherConditions = Conditions -- [SmallestCond],
verify_conditions(StoreRef, DbName, DocIds, OtherConditions).
%% @private Find the most selective condition based on cardinality
find_most_selective_condition(StoreRef, DbName, Conditions) ->
ConditionsWithCard = lists:map(fun(Cond) ->
{Card, Path} = case Cond of
{path, P, V} ->
FullPath = P ++ [V],
case barrel_ars_index:get_path_cardinality(StoreRef, DbName, FullPath) of
{ok, C} -> {C, FullPath};
_ -> {999999999, FullPath}
end;
{exists, P} ->
case barrel_ars_index:get_path_cardinality(StoreRef, DbName, P) of
{ok, C} -> {C, P};
_ -> {999999999, P}
end;
{compare, P, Op, Value} ->
%% Estimate compare cardinality by sampling the index
%% For '>' or '<', estimate roughly half the values match
%% This is a heuristic - actual count would require full scan
case barrel_ars_index:get_path_cardinality(StoreRef, DbName, P) of
{ok, TotalCard} ->
%% Assume 50% selectivity for range queries
EstCard = case Op of
'>' -> TotalCard div 2;
'<' -> TotalCard div 2;
'>=' -> (TotalCard div 2) + 1;
'=<' -> (TotalCard div 2) + 1
end,
{EstCard, P ++ [Value]};
_ ->
{999999998, P}
end;
{prefix, P, Prefix} ->
%% Estimate prefix matches ~10% of path values
case barrel_ars_index:get_path_cardinality(StoreRef, DbName, P) of
{ok, TotalCard} -> {TotalCard div 10, P ++ [Prefix]};
_ -> {999999997, P}
end;
_ ->
{999999999, []}
end,
{Card, Path, Cond}
end, Conditions),
[{_, BestPath, BestCond} | _] = lists:sort(ConditionsWithCard),
{BestCond, BestPath}.
%% @private Verify that docids match remaining conditions using posting lists
%% Uses point lookups for equality conditions (O(1) per docid) and set membership
%% for other condition types (O(m) to collect, then O(1) lookups).
verify_conditions(_StoreRef, _DbName, DocIds, []) ->
lists:usort(DocIds);
verify_conditions(StoreRef, DbName, DocIds, [{path, Path, Value} | Rest]) ->
%% Use point lookup for equality - O(1) per docid via value-first index
FilteredDocIds = lists:filter(fun(DocId) ->
barrel_ars_index:docid_has_value(StoreRef, DbName, Path, Value, DocId)
end, DocIds),
verify_conditions(StoreRef, DbName, FilteredDocIds, Rest);
verify_conditions(StoreRef, DbName, DocIds, [Cond | Rest]) ->
%% For other conditions (compare, exists, prefix), use set-based approach
CondDocIds = sets:from_list(collect_condition_docids(StoreRef, DbName, Cond)),
FilteredDocIds = lists:filter(fun(DocId) ->
sets:is_element(DocId, CondDocIds)
end, DocIds),
verify_conditions(StoreRef, DbName, FilteredDocIds, Rest).
%%====================================================================
%% Native Postings V2 Query Execution (using rocksdb 2.5.0 postings API)
%%====================================================================
%% @private Intersect DocId sets using native posting list intersection.
%% For equality conditions, uses native postings_intersect_all for O(min(n,m)) performance.
%% For non-equality conditions, uses cardinality-based selection for optimal iteration order.
intersect_docid_sets_v2(StoreRef, DbName, Conditions) ->
%% Separate equality conditions (can use native intersection) from others
{EqConds, OtherConds} = lists:partition(fun
({path, _, _}) -> true;
(_) -> false
end, Conditions),
case {EqConds, OtherConds} of
{[], _} ->
%% No equality conditions - fall back to original approach
intersect_docid_sets(StoreRef, DbName, Conditions);
{_, []} ->
%% Only equality conditions - use native posting intersection
intersect_equality_conditions(StoreRef, DbName, EqConds);
{_, _} ->
%% Mixed conditions - choose best starting condition based on cardinality
intersect_mixed_conditions_v2(StoreRef, DbName, EqConds, OtherConds)
end.
%% @private Intersect multiple equality conditions using native posting list API.
intersect_equality_conditions(StoreRef, DbName, EqConds) ->
PostingBins = lists:filtermap(fun({path, Path, Value}) ->
FullPath = Path ++ [Value],
case barrel_ars_index:get_posting_list_binary(StoreRef, DbName, FullPath) of
{ok, Bin} -> {true, Bin};
not_found -> false;
{error, _} -> false
end
end, EqConds),
case PostingBins of
[] ->
[];
[Single] ->
{ok, Postings} = barrel_postings:open(Single),
barrel_postings:keys(Postings);
_ ->
case barrel_postings:intersect_all(PostingBins) of
{ok, ResultPostings} ->
barrel_postings:keys(ResultPostings);
{error, _} ->
[]
end
end.
%% @private Intersect mixed equality + non-equality conditions.
%% Uses cardinality estimation to choose the most selective starting condition.
intersect_mixed_conditions_v2(StoreRef, DbName, EqConds, OtherConds) ->
%% Estimate cardinality for all conditions
AllConds = EqConds ++ OtherConds,
CondsWithCard = estimate_all_cardinalities(StoreRef, DbName, AllConds),
%% Sort by estimated cardinality (smallest first)
Sorted = lists:sort(CondsWithCard),
%% Debug: io:format("Sorted conditions: ~p~n", [Sorted]),
[{_BestCard, BestCond} | RestWithCard] = Sorted,
RestConds = [C || {_, C} <- RestWithCard],
%% Collect DocIds from the most selective condition
DocIds = collect_condition_docids(StoreRef, DbName, BestCond),
%% Verify remaining conditions
verify_conditions_v2(StoreRef, DbName, DocIds, RestConds).
%% @private Estimate cardinality for all conditions.
estimate_all_cardinalities(StoreRef, DbName, Conditions) ->
lists:map(fun(Cond) ->
Card = estimate_condition_cardinality(StoreRef, DbName, Cond),
{Card, Cond}
end, Conditions).
%% @private Estimate cardinality for a single condition.
estimate_condition_cardinality(StoreRef, DbName, {path, Path, Value}) ->
FullPath = Path ++ [Value],
case barrel_ars_index:get_path_cardinality(StoreRef, DbName, FullPath) of
{ok, C} -> C;
_ -> 999999999
end;
estimate_condition_cardinality(StoreRef, DbName, {compare, Path, Op, _Value}) ->
%% Estimate compare cardinality - assume 50% selectivity for range queries
case barrel_ars_index:get_path_cardinality(StoreRef, DbName, Path) of
{ok, TotalCard} ->
case Op of
'>' -> TotalCard div 2;
'<' -> TotalCard div 2;
'>=' -> (TotalCard div 2) + 1;
'=<' -> (TotalCard div 2) + 1;
_ -> TotalCard
end;
_ -> 999999998
end;
estimate_condition_cardinality(StoreRef, DbName, {exists, Path}) ->
case barrel_ars_index:get_path_cardinality(StoreRef, DbName, Path) of
{ok, C} -> C;
_ -> 999999999
end;
estimate_condition_cardinality(StoreRef, DbName, {prefix, Path, _Prefix}) ->
%% Estimate prefix matches ~10% of path values
case barrel_ars_index:get_path_cardinality(StoreRef, DbName, Path) of
{ok, TotalCard} -> TotalCard div 10;
_ -> 999999997
end;
estimate_condition_cardinality(_, _, _) ->
999999999.
%% @private Verify conditions using batch lookups for efficiency.
%% Uses multi_key_exists for batch verification instead of individual lookups.
verify_conditions_v2(_StoreRef, _DbName, DocIds, []) ->
lists:usort(DocIds);
verify_conditions_v2(StoreRef, DbName, DocIds, [{path, Path, Value} | Rest]) ->
%% Use batch key existence check - much faster than individual lookups
%% First deduplicate to avoid redundant checks
UniqueDocIds = lists:usort(DocIds),
FilteredDocIds = barrel_ars_index:filter_docids_by_value(
StoreRef, DbName, Path, Value, UniqueDocIds),
verify_conditions_v2(StoreRef, DbName, FilteredDocIds, Rest);
verify_conditions_v2(StoreRef, DbName, DocIds, [Cond | Rest]) ->
%% For non-equality conditions (compare, exists, prefix), use set-based approach
%% These conditions need to iterate over ranges, so we can't use single posting list lookup
CondDocIds = sets:from_list(collect_condition_docids(StoreRef, DbName, Cond)),
FilteredDocIds = lists:filter(fun(DocId) ->
sets:is_element(DocId, CondDocIds)
end, DocIds),
verify_conditions_v2(StoreRef, DbName, FilteredDocIds, Rest).
%% @private Release snapshot if present
maybe_release_snapshot(Snapshot) ->
barrel_store_rocksdb:safe_release_snapshot(Snapshot).
%% @private Convert cursor record to map for pattern matching
cursor_to_map(Cursor) ->
%% Cursor is a record, extract fields
#{
snapshot => element(5, Cursor), %% #cursor.snapshot
last_key => element(6, Cursor), %% #cursor.last_key
query_type => element(7, Cursor) %% #cursor.query_type
}.
%% @private Validate cursor matches current query plan
validate_cursor_for_plan(_QueryType, _Plan) ->
%% For now, just accept - could add stricter validation
ok.
%% @doc Execute query with a snapshot for read consistency
execute_with_snapshot(StoreRef, DbName, Plan, Snapshot) ->
#query_plan{order = Order, limit = Limit, conditions = Conditions} = Plan,
%% Check if we can use indexed order for ORDER BY + LIMIT optimization
%% Most beneficial when: no filter conditions, small limit, large dataset
case can_use_indexed_order(Order, Limit) of
{true, OrderPath, Dir} ->
case should_use_indexed_order(OrderPath, Conditions) of
true ->
execute_with_indexed_order(StoreRef, DbName, OrderPath, Dir, Plan, Snapshot);
false ->
execute_by_strategy(StoreRef, DbName, Plan, Snapshot)
end;
false ->
execute_by_strategy(StoreRef, DbName, Plan, Snapshot)
end.
%% @doc Execute based on query strategy
execute_by_strategy(StoreRef, DbName, Plan, Snapshot) ->
case Plan#query_plan.strategy of
index_seek ->
execute_index_seek(StoreRef, DbName, Plan, Snapshot);
index_scan ->
execute_index_scan(StoreRef, DbName, Plan, Snapshot);
multi_index ->
execute_multi_index(StoreRef, DbName, Plan, Snapshot);
full_scan ->
execute_full_scan(StoreRef, DbName, Plan, Snapshot)
end.
%% @doc Decide if indexed order should be used
%% The Top-K optimization is most beneficial when:
%% - No filter conditions (pure ORDER BY + LIMIT)
%% - Conditions can be checked via index (lazy evaluation)
%% This changes complexity from O(matching_docs) to O(limit)
should_use_indexed_order(_OrderPath, []) ->
%% No conditions - pure ORDER BY + LIMIT, definitely use indexed order
%% This is the main use case: "get latest N" without filtering
true;
should_use_indexed_order(_OrderPath, Conditions) ->
%% With filter conditions, use indexed order if all conditions
%% can be checked via index lookups (lazy evaluation)
can_lazy_check_conditions(Conditions).
%% @doc Check if all conditions can be lazily evaluated via index.
%% Returns true if we can avoid body fetch for condition checking.
can_lazy_check_conditions([]) ->
true;
can_lazy_check_conditions([Condition | Rest]) ->
case can_lazy_check_condition(Condition) of
true -> can_lazy_check_conditions(Rest);
false -> false
end.
%% @private Check if a single condition can be lazily evaluated via O(1) index lookup.
%% Only allow conditions that use docid_has_value (point lookup), not doc_paths load.
can_lazy_check_condition({path, _Path, Value}) when not is_atom(Value) ->
%% Equality check - uses docid_has_value (O(1) point lookup)
%% Logic variables need body fetch for binding
true;
can_lazy_check_condition({path, _Path, Value}) ->
%% Logic variables like '?Var' need body fetch
not is_logic_var(Value);
can_lazy_check_condition({compare, _Path, _Op, _Value}) ->
%% Range check - docid_satisfies_compare loads doc_paths which is expensive
%% Disable for now until we have a more efficient range index lookup
false;
can_lazy_check_condition({exists, _Path}) ->
%% Existence check - requires doc_paths load, expensive
false;
can_lazy_check_condition({prefix, _Path, _Prefix}) ->
%% Prefix check - requires doc_paths load, expensive
false;
can_lazy_check_condition({'and', Conditions}) ->
can_lazy_check_conditions(Conditions);
can_lazy_check_condition({'or', Conditions}) ->
can_lazy_check_conditions(Conditions);
can_lazy_check_condition(_) ->
%% Other conditions (in, contains, regex, not, etc.) need body fetch
false.
%% @doc Check if a document matches a compiled query plan.
%% This is useful for filtering documents in-memory without index access.
-spec match(query_plan(), map()) -> boolean().
match(#query_plan{conditions = Conditions, bindings = Bindings}, Doc)
when is_map(Doc) ->
case matches_conditions(Doc, Conditions, Bindings) of
{true, _BoundVars} -> true;
false -> false
end.
%% @doc Check if a document matches a list of conditions.
%% Simpler API for filtering without needing a compiled query plan.
-spec matches(map(), list()) -> boolean().
matches(Doc, Conditions) when is_map(Doc), is_list(Conditions) ->
case matches_conditions(Doc, Conditions, #{}) of
{true, _BoundVars} -> true;
false -> false
end.
%% @doc Explain a query plan (for debugging/optimization).
%% Returns a map describing the execution strategy.
-spec explain(query_plan()) -> map().
explain(#query_plan{} = Plan) ->
#{
strategy => Plan#query_plan.strategy,
conditions => Plan#query_plan.conditions,
bindings => Plan#query_plan.bindings,
projections => Plan#query_plan.projections,
order => Plan#query_plan.order,
limit => Plan#query_plan.limit,
offset => Plan#query_plan.offset,
include_docs => Plan#query_plan.include_docs
}.
%% @doc Extract all paths referenced in a query plan.
%% Used for subscription optimization - only evaluate query when
%% a change affects one of these paths.
%% Returns MQTT-style path patterns for use with barrel_sub.
-spec extract_paths(query_plan()) -> [binary()].
extract_paths(#query_plan{conditions = Conditions}) ->
Paths = extract_paths_from_conditions(Conditions, []),
UniquePathPatterns = lists:usort([path_to_pattern(P) || P <- Paths]),
UniquePathPatterns.
%% @private Extract paths from conditions recursively
extract_paths_from_conditions([], Acc) ->
Acc;
extract_paths_from_conditions([{path, Path, _} | Rest], Acc) ->
extract_paths_from_conditions(Rest, [Path | Acc]);
extract_paths_from_conditions([{compare, Path, _, _} | Rest], Acc) ->
extract_paths_from_conditions(Rest, [Path | Acc]);
extract_paths_from_conditions([{'and', Nested} | Rest], Acc) ->
NestedPaths = extract_paths_from_conditions(Nested, []),
extract_paths_from_conditions(Rest, NestedPaths ++ Acc);
extract_paths_from_conditions([{'or', Nested} | Rest], Acc) ->
NestedPaths = extract_paths_from_conditions(Nested, []),
extract_paths_from_conditions(Rest, NestedPaths ++ Acc);
extract_paths_from_conditions([{'not', Condition} | Rest], Acc) ->
NestedPaths = extract_paths_from_conditions([Condition], []),
extract_paths_from_conditions(Rest, NestedPaths ++ Acc);
extract_paths_from_conditions([{in, Path, _} | Rest], Acc) ->
extract_paths_from_conditions(Rest, [Path | Acc]);
extract_paths_from_conditions([{contains, Path, _} | Rest], Acc) ->
extract_paths_from_conditions(Rest, [Path | Acc]);
extract_paths_from_conditions([{exists, Path} | Rest], Acc) ->
extract_paths_from_conditions(Rest, [Path | Acc]);
extract_paths_from_conditions([{missing, Path} | Rest], Acc) ->
extract_paths_from_conditions(Rest, [Path | Acc]);
extract_paths_from_conditions([{regex, Path, _} | Rest], Acc) ->
extract_paths_from_conditions(Rest, [Path | Acc]);
extract_paths_from_conditions([{prefix, Path, _} | Rest], Acc) ->
extract_paths_from_conditions(Rest, [Path | Acc]);
extract_paths_from_conditions([_ | Rest], Acc) ->
extract_paths_from_conditions(Rest, Acc).
%% @private Convert a path list to an MQTT-style pattern with # wildcard.
%% This allows matching any value at the path.
%% Example: `[<<"type">>]' to `<<"type/#">>'
path_to_pattern([]) ->
<<"#">>;
path_to_pattern(Path) ->
Parts = [to_bin(P) || P <- Path],
BasePath = iolist_to_binary(lists:join(<<"/">>, Parts)),
<<BasePath/binary, "/#">>.
%%====================================================================
%% Internal - Compilation
%%====================================================================
do_compile(Spec) ->
case compile_id_scan(Spec) of
{error, _} = Err -> Err;
IdScan -> do_compile(Spec, IdScan)
end.
%% @private Parse id_prefix / id_range spec options into an id_scan plan field.
compile_id_scan(Spec) ->
case {maps:get(id_prefix, Spec, undefined), maps:get(id_range, Spec, undefined)} of
{undefined, undefined} ->
undefined;
{P, undefined} when is_binary(P) ->
{prefix, P};
{undefined, {S, E}} when (is_binary(S) orelse S =:= undefined),
(is_binary(E) orelse E =:= undefined) ->
{range, S, E};
_ ->
{error, {invalid_id_scan, maps:with([id_prefix, id_range], Spec)}}
end.
do_compile(Spec, IdScan) ->
Where = maps:get(where, Spec, []),
%% id_scan is a standalone primary-key scan; it does not combine with where.
case {IdScan, Where} of
{Scan, [_ | _]} when Scan =/= undefined ->
{error, {id_scan_with_where, Where}};
_ ->
do_compile(Spec, IdScan, Where)
end.
do_compile(Spec, IdScan, Where) ->
Flat = maps:get(flat, Spec, false),
Select = maps:get(select, Spec, ['*']),
OrderBy = maps:get(order_by, Spec, undefined),
Limit = maps:get(limit, Spec, undefined),
Offset = maps:get(offset, Spec, 0),
IncludeDocs = maps:get(include_docs, Spec, false),
%% Default doc_format is 'map' for backwards compatibility
DocFormat = maps:get(doc_format, Spec, map),
%% Convert doc_format to decoder_fun at compile time for simpler execution
DecoderFun = case maps:get(decoder_fun, Spec, undefined) of
undefined -> make_decoder_fun(DocFormat);
CustomFun -> CustomFun
end,
%% Normalize conditions
NormalizedConditions = [normalize_condition(C) || C <- Where],
%% Extract variable bindings from conditions
Bindings = extract_bindings(NormalizedConditions),
%% Normalize projections
Projections = normalize_projections(Select),
%% Normalize order specification
Order = normalize_order(OrderBy),
%% Determine execution strategy
Strategy = determine_strategy(NormalizedConditions),
Plan = #query_plan{
conditions = NormalizedConditions,
bindings = Bindings,
projections = Projections,
order = Order,
limit = Limit,
offset = Offset,
include_docs = IncludeDocs,
doc_format = DocFormat,
decoder_fun = DecoderFun,
strategy = Strategy,
flat = Flat,
id_scan = IdScan
},
{ok, Plan}.
%% @doc Create a decoder function from doc_format option
%% Uses decode_any/1 which handles both indexed CBOR and plain CBOR
-spec make_decoder_fun(binary | map | json) -> fun((binary()) -> term()).
make_decoder_fun(binary) -> fun(Bin) -> Bin end;
make_decoder_fun(map) -> fun barrel_docdb_codec_cbor:decode_any/1;
make_decoder_fun(json) -> fun(Bin) ->
Map = barrel_docdb_codec_cbor:decode_any(Bin),
json:encode(Map)
end.
%% @doc Normalize a condition to canonical form
normalize_condition({path, Path, Value}) when is_list(Path) ->
{path, Path, Value};
normalize_condition({compare, Path, Op, Value}) when is_list(Path) ->
case lists:member(Op, ['>', '<', '>=', '=<', '=/=', '==']) of
true -> {compare, Path, Op, Value};
false -> {error, {invalid_operator, Op}}
end;
normalize_condition({'and', Conditions}) when is_list(Conditions) ->
{'and', [normalize_condition(C) || C <- Conditions]};
normalize_condition({'or', Conditions}) when is_list(Conditions) ->
{'or', [normalize_condition(C) || C <- Conditions]};
normalize_condition({'not', Condition}) ->
{'not', normalize_condition(Condition)};
normalize_condition({in, Path, Values}) when is_list(Path), is_list(Values) ->
{in, Path, Values};
normalize_condition({contains, Path, Value}) when is_list(Path) ->
{contains, Path, Value};
normalize_condition({exists, Path}) when is_list(Path) ->
{exists, Path};
normalize_condition({missing, Path}) when is_list(Path) ->
{missing, Path};
normalize_condition({regex, Path, Pattern}) when is_list(Path), is_binary(Pattern) ->
{regex, Path, Pattern};
normalize_condition({prefix, Path, Prefix}) when is_list(Path), is_binary(Prefix) ->
{prefix, Path, Prefix};
normalize_condition(Other) ->
{error, {invalid_condition, Other}}.
%% @doc Extract variable bindings from conditions
extract_bindings(Conditions) ->
extract_bindings(Conditions, #{}).
extract_bindings([], Acc) ->
Acc;
extract_bindings([{path, Path, Value} | Rest], Acc) ->
case is_logic_var(Value) of
true ->
extract_bindings(Rest, Acc#{Value => Path});
false ->
extract_bindings(Rest, Acc)
end;
extract_bindings([{compare, Path, _Op, Value} | Rest], Acc) ->
case is_logic_var(Value) of
true ->
extract_bindings(Rest, Acc#{Value => Path});
false ->
extract_bindings(Rest, Acc)
end;
extract_bindings([{'and', Nested} | Rest], Acc) ->
NestedBindings = extract_bindings(Nested, Acc),
extract_bindings(Rest, NestedBindings);
extract_bindings([{'or', Branches} | Rest], Acc) ->
%% For OR, we can only safely use bindings that appear in ALL branches
%% Extract bindings from each branch and intersect them
case Branches of
[] ->
extract_bindings(Rest, Acc);
[First | RestBranches] ->
FirstBindings = extract_bindings([First], #{}),
CommonBindings = lists:foldl(
fun(Branch, CommonAcc) ->
BranchBindings = extract_bindings([Branch], #{}),
maps:filter(
fun(Var, Path) ->
maps:get(Var, BranchBindings, undefined) =:= Path
end,
CommonAcc
)
end,
FirstBindings,
RestBranches
),
extract_bindings(Rest, maps:merge(Acc, CommonBindings))
end;
extract_bindings([_ | Rest], Acc) ->
extract_bindings(Rest, Acc).
%% @doc Check if a value is a logic variable (atom starting with '?')
-spec is_logic_var(term()) -> boolean().
is_logic_var(Atom) when is_atom(Atom) ->
case atom_to_list(Atom) of
[$? | _] -> true;
_ -> false
end;
is_logic_var(_) ->
false.
%% @doc Normalize projections
normalize_projections(Select) when is_list(Select) ->
Select;
normalize_projections(Single) ->
[Single].
%% @doc Normalize order specification
normalize_order(undefined) ->
[];
normalize_order(Spec) when is_atom(Spec); is_list(Spec), is_binary(hd(Spec)) ->
[{Spec, asc}];
normalize_order({Spec, Dir}) when Dir =:= asc; Dir =:= desc ->
[{Spec, Dir}];
normalize_order(Specs) when is_list(Specs) ->
[normalize_order_item(S) || S <- Specs].
normalize_order_item({Spec, Dir}) when Dir =:= asc; Dir =:= desc ->
{Spec, Dir};
normalize_order_item(Spec) ->
{Spec, asc}.
%% @doc Determine the best execution strategy for the query
determine_strategy(Conditions) ->
%% Analyze conditions to find indexed access paths
case find_index_conditions(Conditions) of
[] ->
%% No index-friendly conditions - full scan
full_scan;
[_Single] ->
%% Single index condition
case has_range_condition(Conditions) of
true -> index_scan;
false -> index_seek
end;
_Multiple ->
%% Multiple index conditions - intersection
multi_index
end.
%% @doc Find conditions that can use the path index
find_index_conditions(Conditions) ->
find_index_conditions(Conditions, []).
find_index_conditions([], Acc) ->
lists:reverse(Acc);
find_index_conditions([{path, Path, Value} | Rest], Acc) ->
case is_logic_var(Value) of
true ->
%% Variable binding - can't use for initial index lookup
find_index_conditions(Rest, Acc);
false ->
%% Concrete value - good for index
find_index_conditions(Rest, [{path, Path, Value} | Acc])
end;
find_index_conditions([{compare, Path, _Op, _Value} | Rest], Acc) ->
%% Range comparison - can use index scan
find_index_conditions(Rest, [{compare, Path} | Acc]);
find_index_conditions([{prefix, Path, _Prefix} | Rest], Acc) ->
%% Prefix match - can use index scan
find_index_conditions(Rest, [{prefix, Path} | Acc]);
find_index_conditions([{exists, Path} | Rest], Acc) ->
%% Exists check - can use index scan on path prefix
find_index_conditions(Rest, [{exists, Path} | Acc]);
find_index_conditions([{'and', Nested} | Rest], Acc) ->
NestedIndexable = find_index_conditions(Nested),
find_index_conditions(Rest, NestedIndexable ++ Acc);
find_index_conditions([_ | Rest], Acc) ->
find_index_conditions(Rest, Acc).
%% @doc Check if conditions include range comparisons
has_range_condition([]) -> false;
has_range_condition([{compare, _, Op, _} | _]) when Op =/= '==' -> true;
has_range_condition([{prefix, _, _} | _]) -> true;
has_range_condition([{exists, _} | _]) -> true;
has_range_condition([{'and', Nested} | Rest]) ->
has_range_condition(Nested) orelse has_range_condition(Rest);
has_range_condition([{'or', Nested} | Rest]) ->
has_range_condition(Nested) orelse has_range_condition(Rest);
has_range_condition([_ | Rest]) ->
has_range_condition(Rest).
%%====================================================================
%% Internal - Validation
%%====================================================================
validate_conditions([]) ->
ok;
validate_conditions([Condition | Rest]) ->
case validate_condition(Condition) of
ok -> validate_conditions(Rest);
{error, _} = Error -> Error
end.
validate_condition({path, Path, _Value}) ->
validate_path(Path);
validate_condition({compare, Path, Op, _Value}) ->
case lists:member(Op, ['>', '<', '>=', '=<', '=/=', '==']) of
true -> validate_path(Path);
false -> {error, {invalid_operator, Op}}
end;
validate_condition({'and', Conditions}) when is_list(Conditions) ->
validate_conditions(Conditions);
validate_condition({'or', Conditions}) when is_list(Conditions) ->
validate_conditions(Conditions);
validate_condition({'not', Condition}) ->
validate_condition(Condition);
validate_condition({in, Path, Values}) when is_list(Values) ->
validate_path(Path);
validate_condition({in, _Path, _}) ->
{error, {invalid_in_values, must_be_list}};
validate_condition({contains, Path, _Value}) ->
validate_path(Path);
validate_condition({exists, Path}) ->
validate_path(Path);
validate_condition({missing, Path}) ->
validate_path(Path);
validate_condition({regex, Path, Pattern}) when is_binary(Pattern) ->
case re:compile(Pattern) of
{ok, _} -> validate_path(Path);
{error, Reason} -> {error, {invalid_regex, Reason}}
end;
validate_condition({regex, _Path, _}) ->
{error, {invalid_regex, must_be_binary}};
validate_condition({prefix, Path, Prefix}) when is_binary(Prefix) ->
validate_path(Path);
validate_condition({prefix, _Path, _}) ->
{error, {invalid_prefix, must_be_binary}};
validate_condition(Other) ->
{error, {invalid_condition, Other}}.
validate_path(Path) when is_list(Path) ->
case lists:all(fun is_valid_path_component/1, Path) of
true -> ok;
false -> {error, {invalid_path, Path}}
end;
validate_path(Path) ->
{error, {invalid_path, Path, must_be_list}}.
is_valid_path_component(C) when is_binary(C) -> true;
is_valid_path_component(C) when is_integer(C), C >= 0 -> true;
is_valid_path_component('*') -> true; % Wildcard for array any
is_valid_path_component(_) -> false.
%%====================================================================
%% Internal - Execution
%%====================================================================
%% @doc Select read profile based on limit and cardinality.
%% - point: Small result sets (limit =< 10), keep blocks in cache
%% - short_range: Medium scans (limit =< 200 or cardinality =< 200), auto-readahead
%% - long_scan: Large/unbounded scans, prefetch aggressively, avoid cache pollution
-spec select_read_profile(undefined | pos_integer(), non_neg_integer()) -> read_profile().
select_read_profile(Limit, _Cardinality) when is_integer(Limit), Limit =< 10 ->
point;
select_read_profile(Limit, _Cardinality) when is_integer(Limit), Limit =< 200 ->
short_range;
select_read_profile(_Limit, Cardinality) when Cardinality =< 200 ->
short_range;
select_read_profile(_, _) ->
long_scan.
%% @doc Select read profile based on limit only (for collectors)
select_read_profile(MaxCount) when is_integer(MaxCount), MaxCount =< 10 ->
point;
select_read_profile(MaxCount) when is_integer(MaxCount), MaxCount =< 200 ->
short_range;
select_read_profile(_) ->
long_scan.
%% @doc Execute using direct index key lookup (fastest)
%% Uses prefix bloom filters for O(1) skip of non-matching SST blocks.
execute_index_seek(StoreRef, DbName, Plan, Snapshot) ->
#query_plan{conditions = Conditions, include_docs = IncludeDocs,
projections = Projections} = Plan,
%% Find the first equality condition to use for index lookup
case find_first_equality(Conditions) of
{ok, {path, Path, Value} = IndexCond} ->
FullPath = Path ++ [Value],
%% Compute remaining conditions (index condition already satisfied by iteration)
RemainingConds = Conditions -- [IndexCond],
%% Check for pure equality: single condition, no doc body needed
NeedsBody = IncludeDocs orelse needs_body_for_projection(Projections),
case {RemainingConds, NeedsBody} of
{[], false} ->
%% Pure equality - return DocIds directly from posting list
execute_pure_equality(StoreRef, DbName, Path, Value, Plan, Snapshot);
_ ->
%% Need doc fetch - original path
execute_index_seek_with_fetch(StoreRef, DbName, FullPath, RemainingConds, Plan, Snapshot)
end;
not_found ->
%% Fallback to scan
execute_index_scan(StoreRef, DbName, Plan, Snapshot)
end.
%% @doc Execute index seek that requires document fetching
execute_index_seek_with_fetch(StoreRef, DbName, FullPath, RemainingConds, Plan, Snapshot) ->
#query_plan{order = Order, limit = Limit, offset = Offset} = Plan,
%% O(1) cardinality check - skip iteration if no matches
Cardinality = case barrel_ars_index:get_path_cardinality(StoreRef, DbName, FullPath) of
{ok, C} -> C;
{error, _} -> 1 %% Assume at least 1 if error
end,
case Cardinality of
0 ->
%% No matches - skip iteration entirely
filter_and_project(StoreRef, DbName, [], Plan, Snapshot);
_ ->
%% Index condition already satisfied - use remaining conditions only
FilterPlan = Plan#query_plan{conditions = RemainingConds},
case Limit of
undefined ->
%% Unbounded query - use adaptive chunked execution
execute_index_seek_chunked(StoreRef, DbName, FullPath, FilterPlan, Snapshot);
_ when is_integer(Limit) ->
%% Limited query - check streaming vs batch
case should_use_streaming(Limit, Cardinality) of
true ->
%% Streaming: fetch/match documents one-by-one
execute_index_seek_streaming(StoreRef, DbName, FullPath, FilterPlan, Snapshot);
false ->
%% Batch: collect DocIds then fetch/filter
%% PROFILING: Index iteration
T0 = erlang:monotonic_time(microsecond),
EarlyLimitResult = can_use_early_limit(Order, RemainingConds, Limit),
DocIds = case EarlyLimitResult of
{true, MaxCollect} ->
collect_docids_for_path_limited(StoreRef, DbName, FullPath, MaxCollect + Offset);
false ->
collect_docids_for_path(StoreRef, DbName, FullPath)
end,
T1 = erlang:monotonic_time(microsecond),
put(profile_index_iter, pdict_get(profile_index_iter, 0) + (T1 - T0)),
put(profile_doc_count, length(DocIds)),
filter_and_project(StoreRef, DbName, DocIds, FilterPlan, Snapshot)
end
end
end.
%% @doc Execute index seek using streaming approach
%% Iterates index entries and fetches/matches documents one-by-one
%% Stops early when enough results are collected
%% Much faster than batch-fetch for small limits on high-cardinality indexes
execute_index_seek_streaming(StoreRef, DbName, FullPath, Plan, _Snapshot) ->
#query_plan{
conditions = Conditions,
bindings = Bindings,
projections = Projections,
limit = Limit,
offset = Offset,
include_docs = IncludeDocs,
decoder_fun = DecoderFun
} = Plan,
%% Calculate how many results we need
%% Use 1.5x to account for deleted docs (less than batch approach)
MaxCollect = round((Limit + Offset) * 1.5),
%% Iterate index entries and fetch/match documents one-by-one
%% Use point profile since streaming is for small limits
{_Count, Results} = barrel_ars_index:fold_path_reverse(
StoreRef, DbName, FullPath,
fun({_Path, DocId}, {Count, Acc}) ->
case Count >= MaxCollect of
true ->
{stop, {Count, Acc}};
false ->
%% Fetch and filter document
case fetch_and_match_doc(StoreRef, DbName, DocId, Conditions, Bindings, IncludeDocs) of
{ok, CborBin, Doc, BoundVars} ->
Result = project_result(CborBin, Doc, DocId, Projections, BoundVars, IncludeDocs, DecoderFun),
{ok, {Count + 1, [Result | Acc]}};
skip ->
{ok, {Count, Acc}}
end
end
end,
{0, []},
point
),
%% Results are in reverse order due to prepend - reverse them
OrderedResults = lists:reverse(Results),
%% Apply offset and limit
FinalResults = apply_offset_limit(OrderedResults, Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, FinalResults, LastSeq}.
%% @doc Execute index seek with adaptive chunked processing for unbounded queries.
%% Processes index entries in chunks, batch-fetching each chunk for efficiency.
%% Chunk size adapts based on average document size to target ~1MB per batch.
execute_index_seek_chunked(StoreRef, DbName, FullPath, Plan, Snapshot) ->
#query_plan{
conditions = Conditions,
bindings = Bindings,
projections = Projections,
order = Order,
offset = Offset,
include_docs = IncludeDocs,
decoder_fun = DecoderFun
} = Plan,
%% Fold index in chunks, batch-fetch and filter each chunk
%% Use long_scan profile for unbounded queries (prefetch, avoid cache pollution)
{ResultChunks, _TotalBytes, _ChunkCount} = barrel_ars_index:fold_path_chunked(
StoreRef, DbName, FullPath, ?INITIAL_CHUNK_SIZE,
fun(DocIdChunk, {Acc, TotalBytes, ChunkCount}) ->
{ChunkResults, ChunkBytes} = batch_fetch_and_filter_with_size(
StoreRef, DbName, DocIdChunk,
Conditions, Bindings, Projections, IncludeDocs, DecoderFun, Snapshot),
%% Calculate new chunk size based on average doc size
NewTotalBytes = TotalBytes + ChunkBytes,
NewChunkCount = ChunkCount + 1,
TotalDocs = length(DocIdChunk) * NewChunkCount,
AvgDocSize = case TotalDocs > 0 of
true -> NewTotalBytes div TotalDocs;
false -> 0
end,
NewChunkSize = calculate_chunk_size(AvgDocSize),
{ok, {[ChunkResults | Acc], NewTotalBytes, NewChunkCount}, NewChunkSize}
end,
{[], 0, 0},
long_scan
),
%% Flatten result chunks (they're in reverse order)
FlatResults = lists:append(lists:reverse(ResultChunks)),
%% Apply ordering if specified
OrderedResults = apply_order(FlatResults, Order),
%% Apply offset (no limit for unbounded queries)
FinalResults = apply_offset_limit(OrderedResults, Offset, undefined),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, FinalResults, LastSeq}.
%% @doc Calculate optimal chunk size based on average doc size
%% Targets ~1MB of doc data per chunk for good cache utilization
calculate_chunk_size(AvgDocSize) when AvgDocSize > 0 ->
Optimal = ?TARGET_CHUNK_BYTES div AvgDocSize,
max(?MIN_CHUNK_SIZE, min(?MAX_CHUNK_SIZE, Optimal));
calculate_chunk_size(_) ->
?INITIAL_CHUNK_SIZE.
%% @doc Batch fetch with size tracking for adaptive chunking
batch_fetch_and_filter_with_size(StoreRef, DbName, DocIds, Conditions, Bindings, Projections, IncludeDocs, DecoderFun, Snapshot) ->
Results = batch_fetch_and_filter(StoreRef, DbName, DocIds, Conditions, Bindings, Projections, IncludeDocs, DecoderFun, Snapshot),
%% Estimate bytes from doc count (conservative ~500 bytes avg per doc)
EstimatedBytes = length(DocIds) * 500,
{Results, EstimatedBytes}.
%% @private Helper for process dictionary get with default
pdict_get(Key, Default) ->
case erlang:get(Key) of
undefined -> Default;
Value -> Value
end.
%% @doc Check if early limit optimization can be used
%% Returns {true, MaxToCollect} or false
can_use_early_limit([], [], Limit) when is_integer(Limit), Limit > 0 ->
%% No ORDER BY and no remaining conditions - safe to limit early
%% Collect limit + small buffer to account for deleted docs (reduced from 2x)
{true, max(Limit + 5, round(Limit * 1.2))};
can_use_early_limit([], RemainingConds, Limit) when is_integer(Limit), Limit > 0, RemainingConds =/= [] ->
%% No ORDER BY but has remaining conditions - over-collect to handle filter losses
%% Use 3x multiplier to account for filtering (conservative estimate)
{true, Limit * 3};
can_use_early_limit(_, _, _) ->
%% Has ORDER BY - need full scan to sort candidates
false.
%% @doc Check if streaming execution should be used for index seek
%% Streaming is better when: small limit + high cardinality index
%% Returns true if streaming should be used
should_use_streaming(Limit, Cardinality) when is_integer(Limit), Limit > 0, Limit =< 100 ->
%% Use streaming when cardinality is much higher than limit
%% This avoids batch-fetching many documents we won't use
Cardinality > Limit * 10;
should_use_streaming(_, _) ->
false.
%% @doc Check if we can use indexed order for ORDER BY + LIMIT
%% Returns {true, Path, Dir} if ORDER BY can use index iteration order
can_use_indexed_order([], _Limit) ->
false;
can_use_indexed_order(_Order, undefined) ->
false;
can_use_indexed_order([{Path, Dir}], Limit) when is_list(Path), is_integer(Limit), Limit > 0 ->
%% Single ORDER BY on a path - can use index order
{true, Path, Dir};
can_use_indexed_order([{Var, _Dir}], Limit) when is_atom(Var), is_integer(Limit), Limit > 0 ->
%% ORDER BY on a variable - check if it's bound to a path
%% For now, we only support direct path ordering
case is_logic_var(Var) of
true -> false; %% Variable - can't use directly
false -> false
end;
can_use_indexed_order(_, _) ->
false.
%% @doc Execute query using indexed order for ORDER BY + LIMIT
%% Iterates the index in the requested order and stops early
%% Uses CBOR iterator for condition matching and projection
execute_with_indexed_order(StoreRef, DbName, OrderPath, Dir, Plan, _Snapshot) ->
#query_plan{
conditions = Conditions,
bindings = Bindings,
projections = Projections,
limit = Limit,
offset = Offset,
include_docs = IncludeDocs,
decoder_fun = DecoderFun
} = Plan,
%% Calculate how many results we need (accounting for filtering)
%% Collect 3x to handle filtering and deleted docs
MaxCollect = (Limit + Offset) * 3,
%% Select read profile based on limit (ORDER BY + LIMIT is typically small)
Profile = select_read_profile(Limit, MaxCollect),
%% Choose iteration direction based on ORDER BY
FoldFun = case Dir of
desc -> fun barrel_ars_index:fold_path_values_reverse/6;
asc -> fun barrel_ars_index:fold_path_values/6
end,
%% Determine if we should use lazy checking (index-based filtering)
UseLazy = can_lazy_check_conditions(Conditions),
%% Collect matching documents with early termination
{_Count, Results} = FoldFun(
StoreRef, DbName, OrderPath,
fun({_Path, DocId}, {Count, Acc}) ->
case Count >= MaxCollect of
true ->
{stop, {Count, Acc}};
false ->
%% Filter and fetch document
case UseLazy of
true ->
%% Lazy path: check conditions via index BEFORE body fetch
lazy_fetch_and_match_doc(StoreRef, DbName, DocId, Conditions,
Bindings, Projections, IncludeDocs, DecoderFun,
Count, Acc);
false ->
%% Standard path: fetch body first, then check conditions
case fetch_and_match_doc(StoreRef, DbName, DocId, Conditions, Bindings, IncludeDocs) of
{ok, CborBin, Doc, BoundVars} ->
Result = project_result(CborBin, Doc, DocId, Projections, BoundVars, IncludeDocs, DecoderFun),
{ok, {Count + 1, [Result | Acc]}};
skip ->
{ok, {Count, Acc}}
end
end
end
end,
{0, []},
Profile
),
%% Results are in reverse order due to prepend - reverse them
%% Note: For DESC we iterate high-to-low, prepend gives low-to-high, so reverse gives high-to-low
%% For ASC we iterate low-to-high, prepend gives high-to-low, so reverse gives low-to-high
OrderedResults = lists:reverse(Results),
%% Apply offset and limit (no sorting needed - already in order)
FinalResults = apply_offset_limit(OrderedResults, Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, FinalResults, LastSeq}.
%% @doc Lazy fetch and match - check conditions via index BEFORE body fetch.
%% This avoids fetching bodies for documents that don't match filters.
%% Complexity: O(limit) instead of O(matching_docs)
lazy_fetch_and_match_doc(StoreRef, DbName, DocId, Conditions, _Bindings, Projections,
IncludeDocs, DecoderFun, Count, Acc) ->
case lazy_check_conditions(StoreRef, DbName, DocId, Conditions) of
true ->
%% Conditions passed via index lookup - now fetch body if needed
case IncludeDocs of
true ->
case barrel_doc_body_store:get_current_body(DbName, DocId) of
{ok, CborBin} ->
Doc = barrel_docdb_codec_cbor:decode_any(CborBin),
Result = project_result(CborBin, Doc, DocId, Projections, #{}, IncludeDocs, DecoderFun),
{ok, {Count + 1, [Result | Acc]}};
_ ->
%% Doc deleted
{ok, {Count, Acc}}
end;
false ->
%% No body needed - just return DocId
Result = #{<<"_id">> => DocId},
{ok, {Count + 1, [Result | Acc]}}
end;
false ->
%% Conditions failed - skip without body fetch
{ok, {Count, Acc}}
end.
%% @doc Check conditions via index lookups (lazy evaluation).
%% Returns true if all conditions pass, false otherwise.
%% Uses index point lookups instead of body fetch.
lazy_check_conditions(_StoreRef, _DbName, _DocId, []) ->
true;
lazy_check_conditions(StoreRef, DbName, DocId, [Condition | Rest]) ->
case lazy_check_condition(StoreRef, DbName, DocId, Condition) of
true -> lazy_check_conditions(StoreRef, DbName, DocId, Rest);
false -> false
end.
%% @private Check a single condition via index lookup
lazy_check_condition(StoreRef, DbName, DocId, {path, Path, Value}) ->
%% O(1) point lookup in value index
barrel_ars_index:docid_has_value(StoreRef, DbName, Path, Value, DocId);
lazy_check_condition(StoreRef, DbName, DocId, {compare, Path, Op, Value}) ->
%% Load doc_paths (once per DocId) and compare
barrel_ars_index:docid_satisfies_compare(StoreRef, DbName, DocId, Path, Op, Value);
lazy_check_condition(StoreRef, DbName, DocId, {exists, Path}) ->
%% Check if path exists via doc_paths
case barrel_ars_index:docid_get_value(StoreRef, DbName, DocId, Path) of
{ok, _} -> true;
_ -> false
end;
lazy_check_condition(StoreRef, DbName, DocId, {prefix, Path, Prefix}) ->
%% Get value and check prefix
case barrel_ars_index:docid_get_value(StoreRef, DbName, DocId, Path) of
{ok, Value} when is_binary(Value) ->
PrefixLen = byte_size(Prefix),
case Value of
<<Prefix:PrefixLen/binary, _/binary>> -> true;
_ -> false
end;
_ ->
false
end;
lazy_check_condition(StoreRef, DbName, DocId, {'and', Conditions}) ->
lazy_check_conditions(StoreRef, DbName, DocId, Conditions);
lazy_check_condition(StoreRef, DbName, DocId, {'or', Conditions}) ->
lazy_check_any(StoreRef, DbName, DocId, Conditions);
lazy_check_condition(_StoreRef, _DbName, _DocId, _Condition) ->
%% Unsupported condition type - should not happen if can_lazy_check_condition is correct
false.
%% @private Check if any condition passes (for OR)
lazy_check_any(_StoreRef, _DbName, _DocId, []) ->
false;
lazy_check_any(StoreRef, DbName, DocId, [Condition | Rest]) ->
case lazy_check_condition(StoreRef, DbName, DocId, Condition) of
true -> true;
false -> lazy_check_any(StoreRef, DbName, DocId, Rest)
end.
%% @doc Fetch a document and check if it matches conditions (direct body fetch)
%% Returns {ok, CborBin, Doc, BoundVars} or skip
%% Returns both raw binary (for doc_format=binary) and decoded doc (for matching/projections)
%% No entity lookup needed - body is stored at doc_body(DbName, DocId)
fetch_and_match_doc(_StoreRef, DbName, DocId, Conditions, Bindings, _IncludeDocs) ->
%% Direct body fetch - no entity lookup needed!
case barrel_doc_body_store:get_current_body(DbName, DocId) of
{ok, CborBin} ->
%% Decode to map for condition matching
Doc = barrel_docdb_codec_cbor:decode_any(CborBin),
case matches_conditions(Doc, Conditions, Bindings) of
{true, BoundVars} ->
{ok, CborBin, Doc, BoundVars};
false ->
skip
end;
_ ->
%% Doc deleted or not found
skip
end.
%% @doc Execute using index prefix scan with adaptive strategy selection
execute_index_scan(StoreRef, DbName, Plan, Snapshot) ->
#query_plan{conditions = Conditions, limit = Limit} = Plan,
%% Use adaptive classification for equality queries
%% For limited queries, always prefer scan (early termination)
%% For unlimited high-cardinality, prefer bulk decode
case classify_scan_query_adaptive(StoreRef, DbName, Conditions, Plan) of
{scan_equality, Path, Value} ->
%% Low cardinality or has limit: iterate value_index keys
execute_pure_equality(StoreRef, DbName, Path, Value, Plan, Snapshot);
{bulk_equality, Path, Value} ->
%% High cardinality without limit: single posting list decode
case Limit of
undefined ->
execute_bulk_equality(StoreRef, DbName, Path, Value, Plan, Snapshot);
_ ->
%% Has limit, prefer scan for early termination
execute_pure_equality(StoreRef, DbName, Path, Value, Plan, Snapshot)
end;
{pure_exists, Path} ->
execute_pure_exists(StoreRef, DbName, Path, Plan, Snapshot);
{pure_prefix, Path, Prefix} ->
execute_pure_prefix(StoreRef, DbName, Path, Prefix, Plan, Snapshot);
{pure_compare, Path, Op, Value} ->
%% Range query using index scan
execute_pure_compare(StoreRef, DbName, Path, Op, Value, Plan, Snapshot);
{multi_index, IndexConditions} ->
%% Multi-condition with all index-friendly conditions
execute_multi_index(StoreRef, DbName, IndexConditions, Plan, Snapshot);
needs_body ->
%% Extract limit from Plan for early termination
#query_plan{limit = Limit, offset = Offset} = Plan,
MaxCount = case Limit of
undefined -> infinity;
L when is_integer(L) -> (L + Offset) * 3 %% Over-collect to handle filtering
end,
DocIds = collect_scan_docids(StoreRef, DbName, Conditions, Snapshot, MaxCount),
filter_and_project(StoreRef, DbName, DocIds, Plan, Snapshot)
end.
%% @doc Classify scan query for pure index execution.
%% Returns:
%% {pure_equality, Path, Value} - exact match, return DocIds from posting list
%% {pure_exists, Path} - exists check, return DocIds from posting lists
%% {pure_prefix, Path, Prefix} - prefix match, return DocIds from posting lists
%% needs_body - requires doc body fetch for filtering/projection
-spec classify_scan_query([condition()], query_plan()) ->
{pure_equality, [term()], term()} |
{pure_exists, [term()]} |
{pure_prefix, [term()], binary()} |
{pure_compare, [term()], compare_op(), term()} |
{multi_index, [condition()]} |
needs_body.
classify_scan_query(Conditions, Plan) ->
#query_plan{projections = Projections} = Plan,
%% Note: include_docs doesn't affect classification - we can still use
%% pure index scans and fetch docs afterward. Only projections that
%% reference paths require body fetch during query.
NeedsBodyForQuery = needs_body_for_projection(Projections),
case {Conditions, NeedsBodyForQuery} of
{[{path, Path, Value}], false} ->
%% Pure equality - only if value is concrete (not a logic var)
case is_logic_var(Value) of
true -> needs_body;
false -> {pure_equality, Path, Value}
end;
{[{exists, Path}], false} ->
{pure_exists, Path};
{[{prefix, Path, Prefix}], false} ->
{pure_prefix, Path, Prefix};
{[{compare, Path, Op, Value}], false} when Op =:= '>' orelse Op =:= '<'
orelse Op =:= '>=' orelse Op =:= '=<' ->
%% Pure compare - use index range scan
case is_logic_var(Value) of
true -> needs_body;
false -> {pure_compare, Path, Op, Value}
end;
{MultiConds, false} when length(MultiConds) > 1 ->
%% Multiple conditions - check if all are index-friendly
case all_index_conditions(MultiConds) of
true -> {multi_index, MultiConds};
false -> needs_body
end;
_ ->
needs_body
end.
%% @doc Check if all conditions can be evaluated using index
all_index_conditions([]) -> true;
all_index_conditions([{path, _, Value} | Rest]) ->
case is_logic_var(Value) of
true -> false;
false -> all_index_conditions(Rest)
end;
all_index_conditions([{exists, _} | Rest]) ->
all_index_conditions(Rest);
all_index_conditions([{prefix, _, _} | Rest]) ->
all_index_conditions(Rest);
all_index_conditions([{compare, _, Op, Value} | Rest])
when Op =:= '>' orelse Op =:= '<' orelse Op =:= '>=' orelse Op =:= '=<' ->
case is_logic_var(Value) of
true -> false;
false -> all_index_conditions(Rest)
end;
all_index_conditions(_) -> false.
%% @doc Check if projections require document body
needs_body_for_projection(['*']) -> false;
needs_body_for_projection([]) -> false;
needs_body_for_projection(Projections) ->
%% Projections require body if they include path references
lists:any(fun(P) -> is_list(P) end, Projections).
%% @doc Adaptive query classification with cardinality-based strategy selection
%% For equality queries, chooses between:
%% - scan_equality: iterate value_index keys (good for selective queries, early termination)
%% - bulk_equality: decode posting list (good for high-cardinality, one seek)
-spec classify_scan_query_adaptive(barrel_store_rocksdb:db_ref(), db_name(), [condition()], query_plan()) ->
{scan_equality, [term()], term()} |
{bulk_equality, [term()], term()} |
{pure_exists, [term()]} |
{pure_prefix, [term()], binary()} |
{pure_compare, [term()], compare_op(), term()} |
{multi_index, [condition()]} |
needs_body.
classify_scan_query_adaptive(StoreRef, DbName, Conditions, Plan) ->
#query_plan{projections = Projections} = Plan,
%% Note: include_docs doesn't affect classification - we can still use
%% pure index scans and fetch docs afterward.
NeedsBodyForQuery = needs_body_for_projection(Projections),
case {Conditions, NeedsBodyForQuery} of
{[{path, Path, Value}], false} ->
case is_logic_var(Value) of
true ->
needs_body;
false ->
%% Check cardinality to choose strategy
choose_equality_strategy(StoreRef, DbName, Path, Value)
end;
{[{exists, Path}], false} ->
{pure_exists, Path};
{[{prefix, Path, Prefix}], false} ->
{pure_prefix, Path, Prefix};
{[{compare, Path, Op, Value}], false} when Op =:= '>' orelse Op =:= '<'
orelse Op =:= '>=' orelse Op =:= '=<' ->
case is_logic_var(Value) of
true -> needs_body;
false -> {pure_compare, Path, Op, Value}
end;
{MultiConds, false} when length(MultiConds) > 1 ->
%% Multiple conditions - check if all are index-friendly
case all_index_conditions(MultiConds) of
true -> {multi_index, MultiConds};
false -> needs_body
end;
_ ->
needs_body
end.
%% @doc Choose between scan vs bulk strategy based on estimated cardinality
-spec choose_equality_strategy(barrel_store_rocksdb:db_ref(), db_name(), [term()], term()) ->
{scan_equality, [term()], term()} | {bulk_equality, [term()], term()}.
choose_equality_strategy(StoreRef, DbName, Path, Value) ->
case estimate_cardinality(StoreRef, DbName, Path, Value) of
{ok, Count} when Count =< ?ADAPTIVE_CARDINALITY_THRESHOLD ->
%% Low cardinality: use value_index scan with early termination
{scan_equality, Path, Value};
{ok, _HighCount} ->
%% High cardinality: use posting list bulk decode
{bulk_equality, Path, Value};
{error, _} ->
%% Error getting stats, fall back to scan (safer default)
{scan_equality, Path, Value}
end.
%% @doc Estimate cardinality for a path+value combination
%% Uses stored path statistics for O(1) lookup
-spec estimate_cardinality(barrel_store_rocksdb:db_ref(), db_name(), [term()], term()) ->
{ok, non_neg_integer()} | {error, term()}.
estimate_cardinality(StoreRef, DbName, Path, Value) ->
FullPath = Path ++ [Value],
barrel_ars_index:get_path_cardinality(StoreRef, DbName, FullPath).
%% @doc Execute pure exists query - iterate posting lists directly
%% Path index only contains non-deleted docs, so no doc_current check needed
%% Uses fold_posting which gets list of DocIds per key - much more efficient
execute_pure_exists(StoreRef, DbName, Path, Plan, Snapshot) ->
#query_plan{
limit = Limit,
offset = Offset
} = Plan,
%% Calculate how many unique docs we need
MaxCollect = case Limit of
undefined -> undefined;
L -> L + Offset
end,
%% Iterate posting lists directly with snapshot for consistency
%% Each key contains a list of DocIds
{_, _, Results} = barrel_ars_index:fold_posting_with_snapshot(
StoreRef, DbName, Path,
fun(_Key, DocIds, {Seen, Count, Acc}) ->
%% Process all DocIds from this posting list (ignore Key)
process_exists_docids(DocIds, Seen, Count, Acc, MaxCollect)
end,
{#{}, 0, []},
Snapshot
),
%% Apply offset and limit (results are in reverse order)
Results1 = lists:reverse(Results),
Results2 = apply_offset_limit(Results1, Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, Results2, LastSeq}.
%% @private Process DocIds from a posting list for exists query
process_exists_docids([], Seen, Count, Acc, _MaxCollect) ->
{ok, {Seen, Count, Acc}};
process_exists_docids([DocId | Rest], Seen, Count, Acc, MaxCollect) ->
case maps:is_key(DocId, Seen) of
true ->
%% Already seen this doc, skip
process_exists_docids(Rest, Seen, Count, Acc, MaxCollect);
false ->
%% New doc
NewSeen = Seen#{DocId => true},
NewCount = Count + 1,
NewAcc = [#{<<"id">> => DocId} | Acc],
case MaxCollect =/= undefined andalso NewCount >= MaxCollect of
true ->
{stop, {NewSeen, NewCount, NewAcc}};
false ->
process_exists_docids(Rest, NewSeen, NewCount, NewAcc, MaxCollect)
end
end.
%% @doc Execute pure prefix query - iterate posting lists directly
%% No doc_current check needed since path index only contains non-deleted docs
%% Uses fold_prefix_posting which gets raw DocId lists per posting list key
execute_pure_prefix(StoreRef, DbName, Path, Prefix, Plan, Snapshot) ->
#query_plan{
limit = Limit,
offset = Offset
} = Plan,
%% Calculate how many unique docs we need
MaxCollect = case Limit of
undefined -> undefined;
L -> L + Offset
end,
%% Iterate posting lists directly with snapshot for consistency
%% Each key contains a list of DocIds
{_, _, Results} = barrel_ars_index:fold_prefix_posting_with_snapshot(
StoreRef, DbName, Path, Prefix,
fun(_Key, DocIds, {Seen, Count, Acc}) ->
%% Process all DocIds from this posting list
process_prefix_docids(DocIds, Seen, Count, Acc, MaxCollect)
end,
{#{}, 0, []},
Snapshot
),
%% Apply offset and limit (results are in reverse order)
Results1 = lists:reverse(Results),
Results2 = apply_offset_limit(Results1, Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, Results2, LastSeq}.
%% @doc Execute pure compare query - iterates posting lists for range matches
%% Uses optimized fold_compare_docids which skips key decoding overhead
execute_pure_compare(StoreRef, DbName, Path, Op, Value, Plan, Snapshot) ->
#query_plan{
limit = Limit,
offset = Offset
} = Plan,
%% Calculate how many unique docs we need
MaxCollect = case Limit of
undefined -> undefined;
L -> L + Offset
end,
%% Iterate posting lists for values matching the compare condition
%% Uses optimized fold that doesn't decode keys (we don't need paths)
FoldFun = fun(DocId, {Seen, Count, Acc}) ->
case maps:is_key(DocId, Seen) of
true ->
{ok, {Seen, Count, Acc}};
false ->
NewSeen = Seen#{DocId => true},
NewCount = Count + 1,
NewAcc = [#{<<"id">> => DocId} | Acc],
case MaxCollect =/= undefined andalso NewCount >= MaxCollect of
true ->
{stop, {NewSeen, NewCount, NewAcc}};
false ->
{ok, {NewSeen, NewCount, NewAcc}}
end
end
end,
%% Use provided snapshot for consistency
{_, _, Results} = barrel_ars_index:fold_compare_docids_with_snapshot(
StoreRef, DbName, Path, Op, Value, FoldFun, {#{}, 0, []}, Snapshot
),
%% Apply offset and limit (results are in reverse order)
Results1 = lists:reverse(Results),
Results2 = apply_offset_limit(Results1, Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, Results2, LastSeq}.
%% @doc Execute multi-index query - intersects posting lists from all conditions
execute_multi_index(StoreRef, DbName, Conditions, Plan, Snapshot) ->
#query_plan{
limit = Limit,
offset = Offset
} = Plan,
%% Collect DocIds using native postings API intersection (V2)
IntersectedDocIds = intersect_docid_sets_v2(StoreRef, DbName, Conditions),
%% Build results with limit applied
LimitedDocIds = case Limit of
undefined -> IntersectedDocIds;
L -> lists:sublist(IntersectedDocIds, L + Offset)
end,
Results0 = [#{<<"id">> => DocId} || DocId <- LimitedDocIds],
Results = apply_offset_limit(Results0, Offset, Limit),
%% Get last sequence
_ = Snapshot, %% Snapshot not used yet for this path
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, Results, LastSeq}.
%% @doc Execute pure equality query - iterates value-first index with early termination
%% Uses prefix scan on individual keys, stops after collecting enough results
execute_pure_equality(StoreRef, DbName, Path, Value, Plan, Snapshot) ->
#query_plan{
limit = Limit,
offset = Offset
} = Plan,
%% Calculate how many docs we need to collect
MaxCollect = case Limit of
undefined -> undefined;
L -> L + Offset
end,
%% Fold over value-first index with early termination
%% Path is the field path, Value is what we're searching for
FoldFun = fun(DocId, {Count, Acc}) ->
NewCount = Count + 1,
NewAcc = [#{<<"id">> => DocId} | Acc],
case MaxCollect =/= undefined andalso NewCount >= MaxCollect of
true ->
%% Collected enough, stop iteration
{stop, {NewCount, NewAcc}};
false ->
{ok, {NewCount, NewAcc}}
end
end,
{_, Results} = barrel_ars_index:fold_value_index(
StoreRef, DbName, Value, Path, FoldFun, {0, []}, Snapshot),
%% Results are in reverse order, fix and apply offset
FinalResults = apply_offset_limit(lists:reverse(Results), Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, FinalResults, LastSeq}.
%% @doc Execute bulk equality query - single seek + posting list decode
%% Used for high-cardinality queries where iterating individual keys is slower
%% than decoding a posting list in one go.
execute_bulk_equality(StoreRef, DbName, Path, Value, Plan, _Snapshot) ->
#query_plan{
limit = Limit,
offset = Offset
} = Plan,
%% Single O(1) posting list lookup
FullPath = Path ++ [Value],
DocIds = barrel_ars_index:get_posting_list(StoreRef, DbName, FullPath),
%% Build results and apply offset/limit
Results = [#{<<"id">> => Id} || Id <- DocIds],
FinalResults = apply_offset_limit(Results, Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, FinalResults, LastSeq}.
%% @private Process DocIds from a posting list for prefix query
process_prefix_docids([], Seen, Count, Acc, _MaxCollect) ->
{ok, {Seen, Count, Acc}};
process_prefix_docids([DocId | Rest], Seen, Count, Acc, MaxCollect) ->
case maps:is_key(DocId, Seen) of
true ->
%% Already seen this doc, skip
process_prefix_docids(Rest, Seen, Count, Acc, MaxCollect);
false ->
%% New doc
NewSeen = Seen#{DocId => true},
NewCount = Count + 1,
NewAcc = [#{<<"id">> => DocId} | Acc],
case MaxCollect =/= undefined andalso NewCount >= MaxCollect of
true ->
{stop, {NewSeen, NewCount, NewAcc}};
false ->
process_prefix_docids(Rest, NewSeen, NewCount, NewAcc, MaxCollect)
end
end.
%% @doc Collect DocIds for scan-based execution
%% Tries optimized paths: multi-index intersection, exists/prefix, then path prefix scan
%% MaxCount limits collection for early termination (use infinity for no limit)
collect_scan_docids(StoreRef, DbName, Conditions, _Snapshot, MaxCount) ->
%% First check if we have multiple index-friendly conditions for intersection
IndexConditions = [C || C <- Conditions, is_index_condition(C)],
case IndexConditions of
[_, _ | _] ->
%% Multiple index conditions - use native postings intersection (V2)
intersect_docid_sets_v2(StoreRef, DbName, IndexConditions);
_ ->
%% Single or no index conditions - use existing path
case find_exists_condition(Conditions) of
{ok, Path} ->
%% Exists check: collect docs that have any value at this path
collect_docids_for_path_exists(StoreRef, DbName, Path, MaxCount);
not_found ->
case find_prefix_condition(Conditions) of
{ok, Path, Prefix} ->
%% Optimized interval scan for prefix queries
collect_docids_for_value_prefix(StoreRef, DbName, Path, Prefix, MaxCount);
not_found ->
case find_best_scan_path(Conditions) of
{ok, Path} ->
collect_docids_for_prefix(StoreRef, DbName, Path, MaxCount);
not_found ->
collect_all_docids(StoreRef, DbName, MaxCount)
end
end
end
end.
%% @private Check if a condition can use index for intersection
is_index_condition({path, _Path, Value}) ->
not is_logic_var(Value);
is_index_condition({compare, _Path, _Op, Value}) ->
not is_logic_var(Value);
is_index_condition({prefix, _Path, _Prefix}) ->
true;
is_index_condition({exists, _Path}) ->
true;
is_index_condition(_) ->
false.
%% @doc Find an exists condition for optimized path scan
find_exists_condition([]) ->
not_found;
find_exists_condition([{exists, Path} | _]) ->
{ok, Path};
find_exists_condition([{'and', Nested} | Rest]) ->
case find_exists_condition(Nested) of
{ok, _} = Found -> Found;
not_found -> find_exists_condition(Rest)
end;
find_exists_condition([_ | Rest]) ->
find_exists_condition(Rest).
%% @doc Collect all DocIds that have any value at the given path
%% Uses the path index to find docs with the path without fetching full docs
%% MaxCount limits collection for early termination (use infinity for no limit)
collect_docids_for_path_exists(StoreRef, DbName, Path, infinity) ->
%% Use long_scan profile since exists queries typically return many docs
barrel_ars_index:fold_path_values(
StoreRef, DbName, Path,
fun({_FullPath, DocId}, Acc) -> {ok, [DocId | Acc]} end,
[],
long_scan
);
collect_docids_for_path_exists(StoreRef, DbName, Path, MaxCount) when is_integer(MaxCount) ->
%% Limited collection with early termination
Profile = select_read_profile(MaxCount),
{_, DocIds} = barrel_ars_index:fold_path_values(
StoreRef, DbName, Path,
fun({_FullPath, DocId}, {Count, Acc}) ->
case Count >= MaxCount of
true -> {stop, {Count, Acc}};
false -> {ok, {Count + 1, [DocId | Acc]}}
end
end,
{0, []},
Profile
),
DocIds.
%% @doc Collect all document IDs (for full scan fallback)
%% MaxCount limits collection for early termination (use infinity for no limit)
collect_all_docids(StoreRef, DbName, infinity) ->
%% Use long_scan profile for full table scans (prefetch, avoid cache pollution)
barrel_store_rocksdb:fold_range(
StoreRef,
barrel_store_keys:doc_info_prefix(DbName),
barrel_store_keys:doc_info_end(DbName),
fun(Key, _Value, Acc) ->
DocId = barrel_store_keys:decode_doc_info_key(DbName, Key),
{ok, [DocId | Acc]}
end,
[],
long_scan
);
collect_all_docids(StoreRef, DbName, MaxCount) when is_integer(MaxCount) ->
%% Limited full scan with early termination
Profile = select_read_profile(MaxCount),
{_, DocIds} = barrel_store_rocksdb:fold_range(
StoreRef,
barrel_store_keys:doc_info_prefix(DbName),
barrel_store_keys:doc_info_end(DbName),
fun(Key, _Value, {Count, Acc}) ->
case Count >= MaxCount of
true -> {stop, {Count, Acc}};
false ->
DocId = barrel_store_keys:decode_doc_info_key(DbName, Key),
{ok, {Count + 1, [DocId | Acc]}}
end
end,
{0, []},
Profile
),
DocIds.
%% @doc Find a prefix condition for optimized interval scan
find_prefix_condition([]) ->
not_found;
find_prefix_condition([{prefix, Path, Prefix} | _]) ->
{ok, Path, Prefix};
find_prefix_condition([{'and', Nested} | Rest]) ->
case find_prefix_condition(Nested) of
{ok, _, _} = Found -> Found;
not_found -> find_prefix_condition(Rest)
end;
find_prefix_condition([_ | Rest]) ->
find_prefix_condition(Rest).
%% @doc Collect DocIds using optimized prefix interval scan
%% MaxCount limits collection for early termination (use infinity for no limit)
collect_docids_for_value_prefix(StoreRef, DbName, Path, Prefix, infinity) ->
%% Use short_range profile - prefix queries typically have moderate selectivity
barrel_ars_index:fold_prefix(
StoreRef, DbName, Path, Prefix,
fun({_FullPath, DocId}, Acc) -> {ok, [DocId | Acc]} end,
[],
short_range
);
collect_docids_for_value_prefix(StoreRef, DbName, Path, Prefix, MaxCount) when is_integer(MaxCount) ->
%% Limited prefix scan with early termination
Profile = select_read_profile(MaxCount),
{_, DocIds} = barrel_ars_index:fold_prefix(
StoreRef, DbName, Path, Prefix,
fun({_FullPath, DocId}, {Count, Acc}) ->
case Count >= MaxCount of
true -> {stop, {Count, Acc}};
false -> {ok, {Count + 1, [DocId | Acc]}}
end
end,
{0, []},
Profile
),
DocIds.
%% @doc Execute using multiple index lookups with intersection
%% Uses cardinality-ordered intersection for efficient multi-condition queries.
execute_multi_index(StoreRef, DbName, Plan, Snapshot) ->
#query_plan{conditions = Conditions, limit = Limit, offset = Offset,
include_docs = IncludeDocs, projections = Projections} = Plan,
%% Find all indexable conditions (including compare, exists, prefix)
IndexConditions = find_all_index_conditions(Conditions),
case IndexConditions of
[] ->
execute_full_scan(StoreRef, DbName, Plan, Snapshot);
_ ->
%% Check if all conditions are equality (can use bitmap optimization)
AllEquality = lists:all(fun({path, _, _}) -> true; (_) -> false end, IndexConditions),
case AllEquality of
true ->
%% All equality conditions
case order_by_cardinality(StoreRef, DbName, IndexConditions) of
[] ->
filter_and_project(StoreRef, DbName, [], Plan, Snapshot);
OrderedConditions ->
%% Check if LIMIT allows lazy iteration
RemainingConds = Conditions -- IndexConditions,
NeedsBody = IncludeDocs orelse needs_body_for_projection(Projections)
orelse RemainingConds =/= [],
case {Limit, NeedsBody, length(OrderedConditions)} of
{L, false, N} when is_integer(L), L > 0, N > 1 ->
%% Multiple equality conditions with LIMIT, no body fetch
%% Use lazy iteration for early termination
execute_multi_index_lazy(StoreRef, DbName, OrderedConditions, Plan, Snapshot);
{L, true, N} when is_integer(L), L > 0, N > 1 ->
%% Multiple equality conditions with LIMIT, needs body
%% Use lazy intersection then stream body
DocIds = intersect_lazy_with_limit(StoreRef, DbName, OrderedConditions, (L + Offset) * 3),
FilterPlan = Plan#query_plan{conditions = RemainingConds},
filter_and_project(StoreRef, DbName, DocIds, FilterPlan, Snapshot);
_ ->
%% Single condition or no LIMIT - use full intersection
execute_with_postings(StoreRef, DbName, OrderedConditions, Plan, Snapshot)
end
end;
false ->
%% Mixed conditions (equality + compare/exists/prefix)
%% Check if remaining conditions need body fetch
RemainingConds = Conditions -- IndexConditions,
NeedsBody = IncludeDocs orelse needs_body_for_projection(Projections)
orelse RemainingConds =/= [],
%% Optimize: for LIMIT queries with mixed conditions,
%% use lazy iteration with O(1) equality checks
case {Limit, NeedsBody} of
{L, false} when is_integer(L), L > 0 ->
%% Pure index query with LIMIT - use lazy intersection
execute_multi_index_lazy(StoreRef, DbName, IndexConditions, Plan, Snapshot);
{L, true} when is_integer(L), L > 0 ->
%% Body fetch with LIMIT - use lazy intersection then stream body
DocIds = intersect_lazy_with_limit(StoreRef, DbName, IndexConditions, (L + Offset) * 3),
FilterPlan = Plan#query_plan{conditions = RemainingConds},
filter_and_project(StoreRef, DbName, DocIds, FilterPlan, Snapshot);
_ ->
%% No limit - use standard intersection
IntersectedDocIds = intersect_docid_sets_v2(StoreRef, DbName, IndexConditions),
case NeedsBody of
false ->
Results0 = [#{<<"id">> => DocId} || DocId <- IntersectedDocIds],
Results = apply_offset_limit(Results0, Offset, Limit),
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, Results, LastSeq};
true ->
FilterPlan = Plan#query_plan{conditions = RemainingConds},
filter_and_project(StoreRef, DbName, IntersectedDocIds, FilterPlan, Snapshot)
end
end
end
end.
%% @doc Execute multi-condition query using V2 posting list intersection.
%% Uses native postings_intersect_all for O(min(n,m)) intersection performance.
execute_with_postings(StoreRef, DbName, IndexConditions, Plan, Snapshot) ->
%% Remaining conditions not verified by index (e.g., OR, logic vars)
#query_plan{conditions = AllConditions} = Plan,
RemainingConds = AllConditions -- IndexConditions,
FilterPlan = Plan#query_plan{conditions = RemainingConds},
%% Use V2 native posting list intersection
DocIds = intersect_docid_sets_v2(StoreRef, DbName, IndexConditions),
filter_and_project(StoreRef, DbName, DocIds, FilterPlan, Snapshot).
%% @doc Execute multi-index with lazy iteration for LIMIT queries.
%% Iterates the smallest condition and verifies others with O(1) lookups.
%% Complexity: O(limit / selectivity) instead of O(matching_docs)
execute_multi_index_lazy(StoreRef, DbName, Conditions, Plan, Snapshot) ->
#query_plan{limit = Limit, offset = Offset} = Plan,
MaxCollect = Limit + Offset,
DocIds = intersect_lazy_with_limit(StoreRef, DbName, Conditions, MaxCollect),
Results0 = [#{<<"id">> => DocId} || DocId <- DocIds],
Results = apply_offset_limit(Results0, Offset, Limit),
_ = Snapshot,
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, Results, LastSeq}.
%% @doc Lazy intersection with limit - iterates one condition and verifies others.
%% Strategy: ALWAYS prefer iterating non-equality conditions because:
%% - Non-equality (compare/exists/prefix) iteration uses fast fold with early termination
%% - Equality verification uses O(1) docid_has_value lookup
%% - Non-equality verification requires expensive doc_paths load
%%
%% Among non-equality conditions, choose the smallest estimated cardinality.
%% Among equality conditions (if no non-equality), choose smallest cardinality.
intersect_lazy_with_limit(StoreRef, DbName, Conditions, MaxCollect) ->
%% Separate: equality (O(1) verify) vs non-equality (fast iterate, slow verify)
{EqConds, NonEqConds} = lists:partition(fun
({path, _, _}) -> true;
(_) -> false
end, Conditions),
case NonEqConds of
[] when length(EqConds) > 1 ->
%% All equality with multiple conditions - iterate smallest posting list
%% and verify others with O(1) lookups, using lazy iterator (not loading full list)
{IterCond, RestEq} = pick_smallest_equality(StoreRef, DbName, EqConds),
iterate_with_lazy_verify(StoreRef, DbName, IterCond, RestEq, MaxCollect);
[] ->
%% Single equality - just get posting list with limit
[{path, Path, Value}] = EqConds,
collect_docids_for_path_limited(StoreRef, DbName, Path ++ [Value], MaxCollect);
[SingleNonEq] ->
%% One non-equality condition - iterate it, verify equalities
iterate_with_lazy_verify(StoreRef, DbName, SingleNonEq, EqConds, MaxCollect);
_MultipleNonEq ->
%% Two or more non-equality conditions. Per-docid verification of a
%% second range/compare via doc_paths is unreliable, so collect each
%% condition's docid set and intersect (the proven non-lazy path),
%% then truncate to the limit. Returns a sorted list, consistent with
%% the continuation path.
lists:sublist(
intersect_docid_sets_v2(StoreRef, DbName, Conditions), MaxCollect)
end.
%% @private Pick the equality condition with smallest cardinality to iterate
pick_smallest_equality(StoreRef, DbName, EqConds) ->
WithCardinality = lists:map(fun({path, Path, Value} = Cond) ->
FullPath = Path ++ [Value],
Card = case barrel_ars_index:get_path_cardinality(StoreRef, DbName, FullPath) of
{ok, C} -> C;
_ -> 500000
end,
{Card, Cond}
end, EqConds),
Sorted = lists:keysort(1, WithCardinality),
[{_, Smallest} | Rest] = Sorted,
{Smallest, [C || {_, C} <- Rest]}.
%% @private Iterate a condition and lazily verify others with early termination
iterate_with_lazy_verify(StoreRef, DbName, IterCond, VerifyConds, MaxCollect) ->
%% Callback for path iteration (receives {Path, DocId})
PathFoldFun = fun({_Path, DocId}, {Count, Seen, Acc}) ->
process_docid_for_verify(StoreRef, DbName, DocId, VerifyConds, MaxCollect,
Count, Seen, Acc)
end,
%% Callback for value_index iteration (receives DocId directly)
ValueIndexFoldFun = fun(DocId, {Count, Seen, Acc}) ->
process_docid_for_verify(StoreRef, DbName, DocId, VerifyConds, MaxCollect,
Count, Seen, Acc)
end,
{_, _, DocIds} = case IterCond of
{compare, Path, Op, Value} ->
barrel_ars_index:fold_path_values_compare(
StoreRef, DbName, Path, Op, Value, PathFoldFun, {0, #{}, []});
{exists, Path} ->
barrel_ars_index:fold_path(
StoreRef, DbName, Path, PathFoldFun, {0, #{}, []}, short_range);
{prefix, Path, Prefix} ->
barrel_ars_index:fold_prefix(
StoreRef, DbName, Path, Prefix, PathFoldFun, {0, #{}, []}, short_range);
{path, Path, Value} ->
%% Equality - iterate value_index keys (individual DocId per key)
%% Much faster than fold_posting for LIMIT queries since we read
%% one key at a time instead of loading the entire posting list
barrel_ars_index:fold_value_index(
StoreRef, DbName, Value, Path, ValueIndexFoldFun, {0, #{}, []})
end,
lists:reverse(DocIds).
%% @private Process a single DocId for verification
process_docid_for_verify(StoreRef, DbName, DocId, VerifyConds, MaxCollect, Count, Seen, Acc) ->
case maps:is_key(DocId, Seen) of
true ->
{ok, {Count, Seen, Acc}};
false ->
NewSeen = Seen#{DocId => true},
case lazy_verify_all(StoreRef, DbName, DocId, VerifyConds) of
true ->
NewCount = Count + 1,
NewAcc = [DocId | Acc],
case NewCount >= MaxCollect of
true -> {stop, {NewCount, NewSeen, NewAcc}};
false -> {ok, {NewCount, NewSeen, NewAcc}}
end;
false ->
{ok, {Count, NewSeen, Acc}}
end
end.
%% @private Verify all conditions for a DocId using O(1) index lookups
lazy_verify_all(_StoreRef, _DbName, _DocId, []) ->
true;
lazy_verify_all(StoreRef, DbName, DocId, [{path, Path, Value} | Rest]) ->
case barrel_ars_index:docid_has_value(StoreRef, DbName, Path, Value, DocId) of
true -> lazy_verify_all(StoreRef, DbName, DocId, Rest);
false -> false
end;
lazy_verify_all(StoreRef, DbName, DocId, [{compare, Path, Op, Value} | Rest]) ->
%% For compare in verification, we need to check doc_paths (slower)
%% But this is rare - usually compare is the iterator
case barrel_ars_index:docid_satisfies_compare(StoreRef, DbName, DocId, Path, Op, Value) of
true -> lazy_verify_all(StoreRef, DbName, DocId, Rest);
false -> false
end;
lazy_verify_all(StoreRef, DbName, DocId, [{exists, Path} | Rest]) ->
case barrel_ars_index:docid_get_value(StoreRef, DbName, DocId, Path) of
{ok, _} -> lazy_verify_all(StoreRef, DbName, DocId, Rest);
_ -> false
end;
lazy_verify_all(StoreRef, DbName, DocId, [{prefix, Path, Prefix} | Rest]) ->
case barrel_ars_index:docid_get_value(StoreRef, DbName, DocId, Path) of
{ok, Val} when is_binary(Val) ->
PrefixLen = byte_size(Prefix),
case Val of
<<Prefix:PrefixLen/binary, _/binary>> ->
lazy_verify_all(StoreRef, DbName, DocId, Rest);
_ -> false
end;
_ -> false
end;
lazy_verify_all(_StoreRef, _DbName, _DocId, [_ | _]) ->
%% Unknown condition type - fail safe
false.
%% @doc Order conditions by cardinality (smallest first) for optimal intersection.
%% Returns empty list if any condition has 0 cardinality (short-circuit).
order_by_cardinality(StoreRef, DbName, Conditions) ->
%% Build keys for all conditions
Keys = [barrel_store_keys:path_stats_key(DbName, Path ++ [Value])
|| {path, Path, Value} <- Conditions],
%% Batch fetch all cardinalities with multi_get
Results = barrel_store_rocksdb:multi_get(StoreRef, Keys),
%% Parse results and associate with conditions
WithCardinality = lists:zipwith(
fun(Cond, Result) ->
Count = case Result of
{ok, CountBin} -> max(0, binary_to_integer(CountBin));
not_found -> 0
end,
{Count, Cond}
end,
Conditions, Results
),
%% Check for any zero cardinality (short-circuit)
case lists:any(fun({0, _}) -> true; (_) -> false end, WithCardinality) of
true ->
[];
false ->
%% Sort by cardinality ascending and extract conditions
Sorted = lists:keysort(1, WithCardinality),
[Cond || {_, Cond} <- Sorted]
end.
%% @doc Execute full document scan (slowest, last resort, using wide column entity)
execute_full_scan(StoreRef, DbName, Plan, Snapshot) ->
%% Collect all doc IDs by scanning doc_entity keys
%% Use long_scan profile for full table scans (prefetch, avoid cache pollution)
StartKey = barrel_store_keys:doc_entity_prefix(DbName),
EndKey = barrel_store_keys:doc_entity_end(DbName),
PrefixLen = byte_size(StartKey),
DocIds = barrel_store_rocksdb:fold_range(
StoreRef,
StartKey,
EndKey,
fun(Key, _Value, Acc) ->
%% Extract DocId from key (after prefix)
DocId = binary:part(Key, PrefixLen, byte_size(Key) - PrefixLen),
{ok, [DocId | Acc]}
end,
[],
long_scan
),
filter_and_project(StoreRef, DbName, DocIds, Plan, Snapshot).
%% @doc Collect document IDs matching an exact path+value
%% Returns SORTED list for intersection operations.
%% Uses bucketed posting lists for efficient sorted iteration.
collect_docids_for_path(StoreRef, DbName, FullPath) ->
%% Extract value (last element) and field path (rest)
case split_path_value(FullPath) of
{[], _} ->
%% Single element path - fall back to regular posting list
lists:sort(barrel_ars_index:get_posting_list(StoreRef, DbName, FullPath));
{FieldPath, Value} ->
%% Use bucketed posting list for sorted iteration
barrel_ars_index:get_bucketed_posting_list(StoreRef, DbName, Value, FieldPath)
end.
%% @private Split path into field path and value
split_path_value(Path) ->
ReversedPath = lists:reverse(Path),
[Value | ReversedFieldPath] = ReversedPath,
{lists:reverse(ReversedFieldPath), Value}.
%% @doc Collect document IDs with early termination at MaxCount
%% For LIMIT pushdown optimization
%% Uses direct posting list lookup with sublist - O(1) fetch + truncate
collect_docids_for_path_limited(StoreRef, DbName, FullPath, MaxCount) ->
AllDocIds = barrel_ars_index:get_posting_list(StoreRef, DbName, FullPath),
lists:sublist(AllDocIds, MaxCount).
%% @doc Collect document IDs matching a path prefix
%% MaxCount limits collection for early termination (use infinity for no limit)
collect_docids_for_prefix(StoreRef, DbName, PathPrefix, infinity) ->
%% Use short_range profile - prefix scans have unknown but typically moderate size
barrel_ars_index:fold_path(
StoreRef, DbName, PathPrefix,
fun({_Path, DocId}, Acc) -> {ok, [DocId | Acc]} end,
[],
short_range
);
collect_docids_for_prefix(StoreRef, DbName, PathPrefix, MaxCount) when is_integer(MaxCount) ->
%% Limited prefix scan with early termination
Profile = select_read_profile(MaxCount),
{_, DocIds} = barrel_ars_index:fold_path(
StoreRef, DbName, PathPrefix,
fun({_Path, DocId}, {Count, Acc}) ->
case Count >= MaxCount of
true -> {stop, {Count, Acc}};
false -> {ok, {Count + 1, [DocId | Acc]}}
end
end,
{0, []},
Profile
),
DocIds.
%% @doc Filter results by remaining conditions and apply projections.
%% Automatically chooses between streaming, pipelined, or batch fetch based on:
%% - Number of DocIds vs limit (streaming when we have 3x+ more than needed)
%% - Presence of ORDER BY (no streaming with ORDER BY, need all results for sorting)
%% - Large unbounded queries use pipelined execution for better throughput
filter_and_project(StoreRef, DbName, DocIds, Plan, Snapshot) ->
%% Remove duplicates
UniqueDocIds = lists:usort(DocIds),
DocCount = length(UniqueDocIds),
%% Choose strategy: streaming > pipelined > batch
StreamingCheck = should_use_streaming_fetch(DocCount, Plan),
PipelineCheck = should_use_pipelining(DocCount, Plan),
case StreamingCheck of
true ->
%% Streaming: fetch in batches with early termination (for limited queries)
filter_and_project_streamed(StoreRef, DbName, UniqueDocIds, Plan, Snapshot);
false ->
case PipelineCheck of
true ->
%% Pipelined: parallel body fetches for large unbounded queries
filter_and_project_pipelined(StoreRef, DbName, UniqueDocIds, Plan, Snapshot);
false ->
%% Batch: fetch all at once (original behavior)
filter_and_project_batch(StoreRef, DbName, UniqueDocIds, Plan, Snapshot)
end
end.
%% @doc Decide whether to use streaming fetch.
%% Returns true when:
%% 1. Query has a limit (unbounded queries need all results)
%% 2. No ORDER BY (need all results for sorting)
%% 3. DocIds count is 3x+ more than limit+offset
should_use_streaming_fetch(DocCount, #query_plan{limit = Limit, offset = Offset, order = Order}) ->
case {Limit, Order} of
{undefined, _} ->
%% No limit - need all results
false;
{_, [_ | _]} ->
%% Has ORDER BY - need all candidates for sorting
false;
{L, []} when is_integer(L), DocCount > (L + Offset) * ?STREAM_THRESHOLD_RATIO ->
%% Has limit, no ORDER BY, many more docs than needed - use streaming
true;
_ ->
false
end.
%% @doc Original batch fetch strategy - fetches all DocIds at once.
filter_and_project_batch(StoreRef, DbName, UniqueDocIds, Plan, Snapshot) ->
#query_plan{
conditions = Conditions,
bindings = Bindings,
projections = Projections,
order = Order,
limit = Limit,
offset = Offset,
include_docs = IncludeDocs,
decoder_fun = DecoderFun
} = Plan,
%% Batch fetch documents using multi_get with snapshot for consistency
Results0 = batch_fetch_and_filter(StoreRef, DbName, UniqueDocIds,
Conditions, Bindings, Projections, IncludeDocs, DecoderFun, Snapshot),
%% Apply ordering
Results1 = apply_order(Results0, Order),
%% Apply offset and limit
Results2 = apply_offset_limit(Results1, Offset, Limit),
%% Get last sequence (for consistency tracking)
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, Results2, LastSeq}.
%% @doc Streaming batch fetch - fetches DocIds in small batches with early termination.
%% Stops as soon as we have limit + offset valid results.
%% Uses adaptive batch sizing based on observed filter selectivity.
filter_and_project_streamed(StoreRef, DbName, DocIds, Plan, Snapshot) ->
#query_plan{
conditions = Conditions,
bindings = Bindings,
projections = Projections,
limit = Limit,
offset = Offset,
include_docs = IncludeDocs,
decoder_fun = DecoderFun
} = Plan,
%% Calculate target: how many valid results we need
Target = Limit + Offset,
%% Initial batch size with over-collect factor for filter conditions
InitialBatchSize = calculate_initial_batch_size(Target, Conditions),
%% Stream batches until we have enough
{Results, _Stats} = stream_batches(
DbName, DocIds,
Conditions, Bindings, Projections, IncludeDocs, DecoderFun, Snapshot,
Target, InitialBatchSize, [], #stream_stats{}
),
%% Apply offset and limit (no ORDER BY in streaming path)
FinalResults = apply_offset_limit(Results, Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, FinalResults, LastSeq}.
%% @doc Stream batches with early termination.
stream_batches(_DbName, [], _Conds, _Bindings, _Projs, _IncludeDocs, _DecoderFun,
_Snapshot, _Target, _BatchSize, Acc, Stats) ->
%% Exhausted all DocIds
{lists:reverse(Acc), Stats};
stream_batches(_DbName, _DocIds, _Conds, _Bindings, _Projs, _IncludeDocs, _DecoderFun,
_Snapshot, Target, _BatchSize, Acc, Stats) when length(Acc) >= Target ->
%% Reached target - stop early (this is the key optimization!)
{lists:reverse(Acc), Stats};
stream_batches(DbName, DocIds, Conds, Bindings, Projs, IncludeDocs, DecoderFun,
Snapshot, Target, BatchSize, Acc, Stats) ->
%% Take next batch
{Batch, Remaining} = safe_split(BatchSize, DocIds),
%% Select read profile based on batch size
Profile = select_body_read_profile(length(Batch)),
%% Fetch and filter this batch
{BatchResults, DecodedCount} = fetch_and_filter_batch_streaming(
DbName, Batch, Conds, Bindings, Projs, IncludeDocs, DecoderFun, Snapshot, Profile
),
%% Update running stats
MatchedCount = length(BatchResults),
NewStats = update_stream_stats(Stats, length(Batch), DecodedCount, MatchedCount),
%% Calculate next batch size based on observed selectivity
Needed = Target - length(Acc) - MatchedCount,
NextBatchSize = calculate_next_batch_size(Needed, NewStats),
%% Continue with remaining DocIds
stream_batches(DbName, Remaining, Conds, Bindings, Projs, IncludeDocs, DecoderFun,
Snapshot, Target, NextBatchSize, BatchResults ++ Acc, NewStats).
%% @doc Fetch and filter a single batch for streaming.
%% Returns {Results, DecodedCount}.
fetch_and_filter_batch_streaming(DbName, DocIds, Conds, Bindings, Projs,
IncludeDocs, DecoderFun, Snapshot, Profile) ->
case DocIds of
[] ->
{[], 0};
_ ->
%% Use snapshot-aware fetch with read profile
%% Snapshot is always valid in this code path (created by execute/3)
DocBodyResults = barrel_doc_body_store:multi_get_current_bodies_with_snapshot(
DbName, DocIds, Snapshot, Profile
),
%% Process each doc
Pairs = lists:zip(DocIds, DocBodyResults),
FilterFun = fun({DocId, BodyResult}) ->
process_doc_body(DocId, BodyResult, Conds, Bindings, Projs, IncludeDocs, DecoderFun)
end,
Results = lists:filtermap(FilterFun, Pairs),
%% Count successful decodes (not not_found or errors)
DecodedCount = length([ok || {ok, _} <- DocBodyResults]),
{Results, DecodedCount}
end.
%% @doc Calculate initial batch size based on target and conditions.
calculate_initial_batch_size(Target, []) ->
%% No filter conditions - all docs will match
min(?STREAM_MAX_BATCH, max(?STREAM_MIN_BATCH, Target + 10));
calculate_initial_batch_size(Target, _Conditions) ->
%% Has filter conditions - over-collect assuming 50% selectivity
EstimatedNeeded = round(Target / ?STREAM_INITIAL_SELECTIVITY),
min(?STREAM_MAX_BATCH, max(?STREAM_MIN_BATCH, EstimatedNeeded)).
%% @doc Calculate next batch size based on observed selectivity.
calculate_next_batch_size(Needed, _Stats) when Needed =< 0 ->
0;
calculate_next_batch_size(Needed, #stream_stats{selectivity = Selectivity}) ->
%% Adjust for observed selectivity with 20% safety margin
SafeSelectivity = max(0.1, Selectivity), %% Floor at 10%
EstimatedNeeded = round(Needed / SafeSelectivity * 1.2),
min(?STREAM_MAX_BATCH, max(?STREAM_MIN_BATCH, EstimatedNeeded)).
%% @doc Update streaming stats after processing a batch.
update_stream_stats(Stats, FetchedCount, DecodedCount, MatchedCount) ->
#stream_stats{fetched = F, decoded = D, matched = M} = Stats,
TotalDecoded = D + DecodedCount,
TotalMatched = M + MatchedCount,
NewSelectivity = case TotalDecoded > 0 of
true -> TotalMatched / TotalDecoded;
false -> ?STREAM_INITIAL_SELECTIVITY
end,
#stream_stats{
fetched = F + FetchedCount,
decoded = TotalDecoded,
matched = TotalMatched,
selectivity = NewSelectivity
}.
%% @doc Select read profile based on batch size.
select_body_read_profile(BatchSize) when BatchSize =< 50 ->
point;
select_body_read_profile(BatchSize) when BatchSize =< 200 ->
short_range;
select_body_read_profile(_) ->
long_scan.
%% @doc Safe split that handles lists shorter than N.
safe_split(N, List) when length(List) =< N ->
{List, []};
safe_split(N, List) ->
lists:split(N, List).
%% @doc Fetch documents for chunked query results when include_docs is true.
%% Takes a list of doc IDs and returns results with doc bodies included.
maybe_fetch_docs_chunked(_DbName, DocIds, false, _DecoderFun) ->
%% No include_docs - just return ID-only results
[#{<<"id">> => DocId} || DocId <- DocIds];
maybe_fetch_docs_chunked(DbName, DocIds, true, DecoderFun) ->
case DocIds of
[] -> [];
_ ->
%% Fetch document bodies
DocBodyResults = barrel_doc_body_store:multi_get_current_bodies(DbName, DocIds),
Pairs = lists:zip(DocIds, DocBodyResults),
lists:filtermap(
fun({DocId, {ok, CborBin}}) ->
Doc = case DecoderFun of
undefined -> barrel_docdb_codec_cbor:decode_any(CborBin);
Fun -> Fun(CborBin)
end,
{true, #{<<"id">> => DocId, <<"doc">> => Doc}};
({DocId, not_found}) ->
%% Doc deleted - return ID only with deleted flag
{true, #{<<"id">> => DocId, <<"deleted">> => true}};
({_DocId, {error, _}}) ->
false
end,
Pairs)
end.
%% @doc Batch fetch documents using direct body fetch (no entity lookup needed).
%% With current body stored without revision key, we can skip the entity fetch entirely
%% for include_docs queries, providing a major performance improvement.
%%
%% For large result sets (>50 docs), uses parallel decode + condition matching.
batch_fetch_and_filter(_StoreRef, DbName, DocIds, Conditions, Bindings, Projections, IncludeDocs, DecoderFun, _Snapshot) ->
case DocIds of
[] -> [];
_ ->
%% Direct body fetch - no entity lookup needed!
%% Current body is stored at doc_body(DbName, DocId) without revision
T0 = erlang:monotonic_time(microsecond),
DocBodyResults = barrel_doc_body_store:multi_get_current_bodies(DbName, DocIds),
T1 = erlang:monotonic_time(microsecond),
put(profile_docbody_fetch, pdict_get(profile_docbody_fetch, 0) + (T1 - T0)),
%% Decode and condition matching + projection
%% Use parallel processing for large result sets
T2 = erlang:monotonic_time(microsecond),
Pairs = lists:zip(DocIds, DocBodyResults),
FilterFun = fun({DocId, BodyResult}) ->
process_doc_body(DocId, BodyResult, Conditions, Bindings, Projections, IncludeDocs, DecoderFun)
end,
Results = case length(DocIds) of
N when N > 100 ->
%% Parallel decode + filter for batches > 100 docs
%% Pool-based execution gives 1.9-6x speedup
%% Use max 4 workers (like PostgreSQL's conservative default)
barrel_parallel:pfiltermap(FilterFun, Pairs, 4);
_ ->
%% Sequential for small batches
lists:filtermap(FilterFun, Pairs)
end,
T3 = erlang:monotonic_time(microsecond),
put(profile_deser_match, pdict_get(profile_deser_match, 0) + (T3 - T2)),
Results
end.
%% @private Process a single document body for filtermap
process_doc_body(DocId, {ok, CborBin}, Conditions, Bindings, Projections, IncludeDocs, DecoderFun) ->
%% Decode to map (handles both indexed and plain CBOR)
Doc = barrel_docdb_codec_cbor:decode_any(CborBin),
case matches_conditions(Doc, Conditions, Bindings) of
{true, BoundVars} ->
Result = project_result(CborBin, Doc, DocId, Projections, BoundVars, IncludeDocs, DecoderFun),
{true, Result};
false ->
false
end;
process_doc_body(_DocId, not_found, _Conditions, _Bindings, _Projections, _IncludeDocs, _DecoderFun) ->
%% Doc was deleted or doesn't exist
false;
process_doc_body(_DocId, {error, _}, _Conditions, _Bindings, _Projections, _IncludeDocs, _DecoderFun) ->
false.
%% @doc Check if a document matches all conditions
matches_conditions(Doc, Conditions, InitialBindings) ->
matches_conditions(Doc, Conditions, InitialBindings, #{}).
matches_conditions(_Doc, [], _Bindings, BoundVars) ->
{true, BoundVars};
matches_conditions(Doc, [Condition | Rest], Bindings, BoundVars) ->
case match_condition(Doc, Condition, Bindings, BoundVars) of
{true, NewBoundVars} ->
matches_conditions(Doc, Rest, Bindings, NewBoundVars);
false ->
false
end.
match_condition(Doc, {path, Path, Value}, _Bindings, BoundVars) ->
case get_path_value(Doc, Path) of
{ok, DocValue} ->
case is_logic_var(Value) of
true ->
%% Bind the variable
{true, BoundVars#{Value => DocValue}};
false ->
case DocValue =:= Value of
true -> {true, BoundVars};
false -> false
end
end;
not_found ->
false
end;
match_condition(Doc, {compare, Path, Op, Value}, _Bindings, BoundVars) ->
case get_path_value(Doc, Path) of
{ok, DocValue} ->
CompareValue = case is_logic_var(Value) of
true -> maps:get(Value, BoundVars, undefined);
false -> Value
end,
case compare_values(DocValue, Op, CompareValue) of
true -> {true, BoundVars};
false -> false
end;
not_found ->
false
end;
match_condition(Doc, {'and', Conditions}, Bindings, BoundVars) ->
matches_conditions(Doc, Conditions, Bindings, BoundVars);
match_condition(Doc, {'or', Conditions}, Bindings, BoundVars) ->
match_any(Doc, Conditions, Bindings, BoundVars);
match_condition(Doc, {'not', Condition}, Bindings, BoundVars) ->
case match_condition(Doc, Condition, Bindings, BoundVars) of
{true, _} -> false;
false -> {true, BoundVars}
end;
match_condition(Doc, {in, Path, Values}, _Bindings, BoundVars) ->
case get_path_value(Doc, Path) of
{ok, DocValue} ->
case lists:member(DocValue, Values) of
true -> {true, BoundVars};
false -> false
end;
not_found ->
false
end;
match_condition(Doc, {contains, Path, Value}, _Bindings, BoundVars) ->
case get_path_value(Doc, Path) of
{ok, DocValue} when is_list(DocValue) ->
case lists:member(Value, DocValue) of
true -> {true, BoundVars};
false -> false
end;
_ ->
false
end;
match_condition(Doc, {exists, Path}, _Bindings, BoundVars) ->
case get_path_value(Doc, Path) of
{ok, _} -> {true, BoundVars};
not_found -> false
end;
match_condition(Doc, {missing, Path}, _Bindings, BoundVars) ->
case get_path_value(Doc, Path) of
{ok, _} -> false;
not_found -> {true, BoundVars}
end;
match_condition(Doc, {regex, Path, Pattern}, _Bindings, BoundVars) ->
case get_path_value(Doc, Path) of
{ok, DocValue} when is_binary(DocValue) ->
case re:run(DocValue, Pattern) of
{match, _} -> {true, BoundVars};
nomatch -> false
end;
_ ->
false
end;
match_condition(Doc, {prefix, Path, Prefix}, _Bindings, BoundVars) ->
case get_path_value(Doc, Path) of
{ok, DocValue} when is_binary(DocValue) ->
PrefixLen = byte_size(Prefix),
case DocValue of
<<Prefix:PrefixLen/binary, _/binary>> -> {true, BoundVars};
_ -> false
end;
_ ->
false
end;
match_condition(_Doc, {error, _}, _Bindings, _BoundVars) ->
false.
match_any(_Doc, [], _Bindings, _BoundVars) ->
false;
match_any(Doc, [Condition | Rest], Bindings, BoundVars) ->
case match_condition(Doc, Condition, Bindings, BoundVars) of
{true, NewBoundVars} -> {true, NewBoundVars};
false -> match_any(Doc, Rest, Bindings, BoundVars)
end.
%% @doc Get a value from a document at the given path
get_path_value(Doc, []) ->
{ok, Doc};
get_path_value(Doc, [Key | Rest]) when is_map(Doc), is_binary(Key) ->
case maps:find(Key, Doc) of
{ok, Value} -> get_path_value(Value, Rest);
error -> not_found
end;
get_path_value(Doc, [Index | Rest]) when is_list(Doc), is_integer(Index) ->
case Index < length(Doc) of
true ->
Value = lists:nth(Index + 1, Doc), % 0-based to 1-based
get_path_value(Value, Rest);
false ->
not_found
end;
get_path_value(_, _) ->
not_found.
%% @doc Compare two values with an operator
compare_values(A, '>', B) when is_number(A), is_number(B) -> A > B;
compare_values(A, '<', B) when is_number(A), is_number(B) -> A < B;
compare_values(A, '>=', B) when is_number(A), is_number(B) -> A >= B;
compare_values(A, '=<', B) when is_number(A), is_number(B) -> A =< B;
compare_values(A, '=/=', B) -> A =/= B;
compare_values(A, '==', B) -> A =:= B;
compare_values(A, '>', B) when is_binary(A), is_binary(B) -> A > B;
compare_values(A, '<', B) when is_binary(A), is_binary(B) -> A < B;
compare_values(A, '>=', B) when is_binary(A), is_binary(B) -> A >= B;
compare_values(A, '=<', B) when is_binary(A), is_binary(B) -> A =< B;
compare_values(_, _, _) -> false.
%% @doc Project result fields from document
%% CborBin is the raw binary, Doc is decoded map (for projections)
%% DecoderFun formats the doc for include_docs output
project_result(CborBin, Doc, DocId, Projections, BoundVars, IncludeDocs, DecoderFun) ->
Result0 = #{<<"id">> => DocId},
Result1 = case IncludeDocs of
true ->
FormattedDoc = DecoderFun(CborBin),
Result0#{<<"doc">> => FormattedDoc};
false -> Result0
end,
%% Add projected fields/variables
lists:foldl(
fun('*', Acc) ->
%% Include all bound variables
maps:fold(fun(Var, Val, A) ->
VarName = atom_to_binary(Var, utf8),
A#{VarName => Val}
end, Acc, BoundVars);
(Var, Acc) when is_atom(Var) ->
case is_logic_var(Var) of
true ->
VarName = atom_to_binary(Var, utf8),
case maps:find(Var, BoundVars) of
{ok, Val} -> Acc#{VarName => Val};
error -> Acc
end;
false ->
Acc
end;
(Path, Acc) when is_list(Path) ->
case get_path_value(Doc, Path) of
{ok, Val} ->
PathKey = path_to_key(Path),
Acc#{PathKey => Val};
not_found ->
Acc
end
end,
Result1,
Projections
).
path_to_key(Path) ->
iolist_to_binary(lists:join(<<"/">>, [to_bin(P) || P <- Path])).
to_bin(B) when is_binary(B) -> B;
to_bin(N) when is_integer(N) -> integer_to_binary(N).
%% @doc Apply ordering to results
apply_order(Results, []) ->
Results;
apply_order(Results, [{Field, Dir} | _Rest]) ->
%% Simple single-field ordering for now
Sorted = lists:sort(
fun(A, B) ->
ValA = get_sort_value(A, Field),
ValB = get_sort_value(B, Field),
case Dir of
asc -> ValA =< ValB;
desc -> ValA >= ValB
end
end,
Results
),
Sorted.
get_sort_value(Result, Field) when is_atom(Field) ->
FieldKey = atom_to_binary(Field, utf8),
maps:get(FieldKey, Result, null);
get_sort_value(Result, Path) when is_list(Path) ->
PathKey = path_to_key(Path),
maps:get(PathKey, Result, null).
%% @doc Apply offset and limit
apply_offset_limit(Results, Offset, Limit) ->
Results1 = case Offset > 0 of
true -> lists:nthtail(min(Offset, length(Results)), Results);
false -> Results
end,
case Limit of
undefined -> Results1;
N -> lists:sublist(Results1, N)
end.
%% @doc Find first equality condition
find_first_equality([]) ->
not_found;
find_first_equality([{path, Path, Value} | _]) when not is_atom(Value) ->
{ok, {path, Path, Value}};
find_first_equality([{path, Path, Value} | Rest]) ->
case is_logic_var(Value) of
false -> {ok, {path, Path, Value}};
true -> find_first_equality(Rest)
end;
find_first_equality([{'and', Nested} | Rest]) ->
case find_first_equality(Nested) of
{ok, _} = Result -> Result;
not_found -> find_first_equality(Rest)
end;
find_first_equality([_ | Rest]) ->
find_first_equality(Rest).
%% @doc Find all index-friendly conditions (equality, compare, exists, prefix)
find_all_index_conditions(Conditions) ->
find_all_index_conditions(Conditions, []).
find_all_index_conditions([], Acc) ->
lists:reverse(Acc);
find_all_index_conditions([{path, Path, Value} | Rest], Acc) ->
case is_logic_var(Value) of
false ->
find_all_index_conditions(Rest, [{path, Path, Value} | Acc]);
true ->
find_all_index_conditions(Rest, Acc)
end;
find_all_index_conditions([{compare, Path, Op, Value} | Rest], Acc)
when Op =:= '>' orelse Op =:= '<' orelse Op =:= '>=' orelse Op =:= '=<' ->
case is_logic_var(Value) of
false ->
find_all_index_conditions(Rest, [{compare, Path, Op, Value} | Acc]);
true ->
find_all_index_conditions(Rest, Acc)
end;
find_all_index_conditions([{exists, Path} | Rest], Acc) ->
find_all_index_conditions(Rest, [{exists, Path} | Acc]);
find_all_index_conditions([{prefix, Path, Prefix} | Rest], Acc) ->
find_all_index_conditions(Rest, [{prefix, Path, Prefix} | Acc]);
find_all_index_conditions([{'and', Nested} | Rest], Acc) ->
NestedConds = find_all_index_conditions(Nested),
find_all_index_conditions(Rest, NestedConds ++ Acc);
find_all_index_conditions([_ | Rest], Acc) ->
find_all_index_conditions(Rest, Acc).
%% @doc Find best path for scanning
find_best_scan_path([]) ->
not_found;
find_best_scan_path([{path, Path, _} | _]) ->
{ok, Path};
find_best_scan_path([{compare, Path, _, _} | _]) ->
{ok, Path};
find_best_scan_path([{prefix, Path, _} | _]) ->
{ok, Path};
find_best_scan_path([{'and', Nested} | Rest]) ->
case find_best_scan_path(Nested) of
{ok, _} = Result -> Result;
not_found -> find_best_scan_path(Rest)
end;
find_best_scan_path([_ | Rest]) ->
find_best_scan_path(Rest).
%%====================================================================
%% Pipelined Execution
%%====================================================================
%% @doc Execute query with pipelined index iteration and body fetching.
%% Overlaps index iteration with body fetching for better throughput.
%%
%% Architecture:
%% - Main process iterates index in batches
%% - For each batch, spawns async body fetch process
%% - Continues iterating while body fetches are in progress
%% - Collects results at the end
%%
%% Benefits:
%% - Overlaps I/O: index iteration and body fetch happen concurrently
%% - Better resource utilization: multiple RocksDB reads in flight
%% - Reduced total latency for large result sets
%% - Bounded concurrency: limits memory usage with concurrent queries
filter_and_project_pipelined(StoreRef, DbName, DocIds, Plan, Snapshot) ->
#query_plan{
conditions = Conditions,
bindings = Bindings,
projections = Projections,
order = Order,
limit = Limit,
offset = Offset,
include_docs = IncludeDocs,
decoder_fun = DecoderFun
} = Plan,
%% Split DocIds into batches
UniqueDocIds = lists:usort(DocIds),
Batches = split_into_batches(UniqueDocIds, ?PIPELINE_BATCH_SIZE),
NumBatches = length(Batches),
%% Use bounded concurrency to limit memory with concurrent queries
%% Spawn initial batch of fetchers (up to MAX_INFLIGHT)
Parent = self(),
FetchCtx = #{
db_name => DbName,
conditions => Conditions,
bindings => Bindings,
projections => Projections,
include_docs => IncludeDocs,
decoder_fun => DecoderFun,
snapshot => Snapshot,
parent => Parent
},
%% Execute with bounded concurrency
Results0 = execute_bounded_pipeline(Batches, FetchCtx, NumBatches),
%% Results are already in order (collected in batch order)
%% Apply ordering if needed
Results1 = apply_order(Results0, Order),
%% Apply offset and limit
Results2 = apply_offset_limit(Results1, Offset, Limit),
%% Get last sequence
LastSeq = barrel_changes:get_last_seq(StoreRef, DbName),
{ok, Results2, LastSeq}.
%% @doc Execute pipeline with bounded concurrency.
%% Only spawns up to MAX_INFLIGHT fetchers at a time.
%% As each completes, spawns the next batch if available.
execute_bounded_pipeline(Batches, FetchCtx, NumBatches) ->
%% Create indexed batch list
IndexedBatches = lists:zip(lists:seq(1, NumBatches), Batches),
%% Spawn initial fetchers (up to MAX_INFLIGHT)
{InitialBatches, RemainingBatches} = safe_split(?PIPELINE_MAX_INFLIGHT, IndexedBatches),
InitialRefs = spawn_fetchers(InitialBatches, FetchCtx),
%% Collect results with bounded spawning
collect_bounded_results(InitialRefs, RemainingBatches, FetchCtx, #{}, NumBatches).
%% @doc Spawn fetchers for a list of indexed batches.
spawn_fetchers(IndexedBatches, FetchCtx) ->
#{parent := Parent} = FetchCtx,
lists:map(fun({BatchNum, Batch}) ->
Ref = make_ref(),
spawn_link(fun() ->
Results = fetch_batch_for_pipeline_ctx(Batch, FetchCtx),
Parent ! {pipeline_result, Ref, BatchNum, Results}
end),
{Ref, BatchNum}
end, IndexedBatches).
%% @doc Fetch using context map.
fetch_batch_for_pipeline_ctx(DocIds, FetchCtx) ->
#{db_name := DbName, conditions := Conditions, bindings := Bindings,
projections := Projections, include_docs := IncludeDocs,
decoder_fun := DecoderFun, snapshot := Snapshot} = FetchCtx,
fetch_batch_for_pipeline(DbName, DocIds, Conditions, Bindings,
Projections, IncludeDocs, DecoderFun, Snapshot).
%% @doc Collect results with bounded concurrency.
%% When a fetcher completes, spawn next batch if available.
collect_bounded_results([], [], _FetchCtx, Results, NumBatches) ->
%% All done - flatten in order
lists:flatmap(fun(N) ->
maps:get(N, Results, [])
end, lists:seq(1, NumBatches));
collect_bounded_results(InFlight, Remaining, FetchCtx, Results, NumBatches) ->
Timeout = application:get_env(barrel_docdb, query_timeout_ms, 30000),
receive
{pipeline_result, Ref, BatchNum, BatchResults} ->
case lists:keytake(Ref, 1, InFlight) of
{value, _, RestInFlight} ->
NewResults = Results#{BatchNum => BatchResults},
case Remaining of
[] ->
collect_bounded_results(RestInFlight, [], FetchCtx, NewResults, NumBatches);
[NextBatch | RestRemaining] ->
NewRefs = spawn_fetchers([NextBatch], FetchCtx),
collect_bounded_results(NewRefs ++ RestInFlight, RestRemaining, FetchCtx, NewResults, NumBatches)
end;
false ->
collect_bounded_results(InFlight, Remaining, FetchCtx, Results, NumBatches)
end
after Timeout ->
Completed = maps:size(Results),
Missing = NumBatches - Completed,
logger:warning("barrel_query: pipeline timed out after ~pms; "
"~p/~p batches completed, ~p still in flight",
[Timeout, Completed, NumBatches, length(InFlight) + length(Remaining)]),
barrel_metrics:inc_query_timeouts(),
error({query_timeout, #{completed => Completed,
total => NumBatches,
missing_batches => Missing,
timeout_ms => Timeout}})
end.
%% @doc Split list into batches of specified size.
split_into_batches(List, BatchSize) ->
split_into_batches(List, BatchSize, []).
split_into_batches([], _BatchSize, Acc) ->
lists:reverse(Acc);
split_into_batches(List, BatchSize, Acc) ->
case length(List) =< BatchSize of
true ->
lists:reverse([List | Acc]);
false ->
{Batch, Rest} = lists:split(BatchSize, List),
split_into_batches(Rest, BatchSize, [Batch | Acc])
end.
%% @doc Fetch and filter a batch for pipelined execution.
fetch_batch_for_pipeline(DbName, DocIds, Conditions, Bindings,
Projections, IncludeDocs, DecoderFun, Snapshot) ->
Profile = select_body_read_profile(length(DocIds)),
%% Fetch bodies with snapshot (always valid in this code path)
DocBodyResults = barrel_doc_body_store:multi_get_current_bodies_with_snapshot(
DbName, DocIds, Snapshot, Profile
),
%% Process each doc
Pairs = lists:zip(DocIds, DocBodyResults),
lists:filtermap(fun({DocId, BodyResult}) ->
process_doc_body(DocId, BodyResult, Conditions, Bindings,
Projections, IncludeDocs, DecoderFun)
end, Pairs).
%% @doc Decide whether to use pipelined execution.
%% Use pipelining when:
%% - Many DocIds to process (>500)
%% - Include docs is true (need body fetch)
%% - No limit or large limit (for small limits, streaming is better)
should_use_pipelining(DocCount, #query_plan{include_docs = IncludeDocs, limit = Limit}) ->
case IncludeDocs of
false ->
%% No body fetch needed, pipelining doesn't help
false;
true ->
case Limit of
undefined ->
%% Unbounded query with many docs
DocCount >= ?PIPELINE_MIN_DOCIDS;
L when is_integer(L), L > 100 ->
%% Large limit with many docs
DocCount >= ?PIPELINE_MIN_DOCIDS;
_ ->
%% Small limit - streaming is better
false
end
end.
%%====================================================================
%% Profiling Functions (temporary)
%%====================================================================
%% @doc Get current profiling counters
get_profile() ->
#{
index_iter_us => pdict_get(profile_index_iter, 0),
verify_conds_us => pdict_get(profile_verify_conds, 0),
docinfo_fetch_us => pdict_get(profile_docinfo_fetch, 0),
docbody_fetch_us => pdict_get(profile_docbody_fetch, 0),
deser_match_us => pdict_get(profile_deser_match, 0),
doc_count => pdict_get(profile_doc_count, 0)
}.
%% @doc Reset profiling counters
reset_profile() ->
erase(profile_index_iter),
erase(profile_verify_conds),
erase(profile_docinfo_fetch),
erase(profile_docbody_fetch),
erase(profile_deser_match),
erase(profile_doc_count),
ok.
%% @doc Dump profiling data to console
dump_profile() ->
Profile = get_profile(),
Total = maps:get(index_iter_us, Profile) +
maps:get(verify_conds_us, Profile) +
maps:get(docinfo_fetch_us, Profile) +
maps:get(docbody_fetch_us, Profile) +
maps:get(deser_match_us, Profile),
io:format("~n=== Query Profile ===~n"),
io:format(" Index iteration: ~8.B us (~5.1f%)~n",
[maps:get(index_iter_us, Profile),
pct(maps:get(index_iter_us, Profile), Total)]),
io:format(" Verify conditions: ~8.B us (~5.1f%)~n",
[maps:get(verify_conds_us, Profile),
pct(maps:get(verify_conds_us, Profile), Total)]),
io:format(" Doc info fetch: ~8.B us (~5.1f%)~n",
[maps:get(docinfo_fetch_us, Profile),
pct(maps:get(docinfo_fetch_us, Profile), Total)]),
io:format(" Doc body fetch: ~8.B us (~5.1f%)~n",
[maps:get(docbody_fetch_us, Profile),
pct(maps:get(docbody_fetch_us, Profile), Total)]),
io:format(" Deser + matching: ~8.B us (~5.1f%)~n",
[maps:get(deser_match_us, Profile),
pct(maps:get(deser_match_us, Profile), Total)]),
io:format(" --------------------------~n"),
io:format(" Total: ~8.B us~n", [Total]),
io:format(" Docs processed: ~8.B~n", [maps:get(doc_count, Profile)]),
ok.
pct(_, 0) -> 0.0;
pct(Part, Total) -> (Part / Total) * 100.