%%% File: src/barrel_query_cursor.erl

%%%-------------------------------------------------------------------
%%% @doc Query cursor management for chunked query execution
%%%
%%% Manages cursor state for paginated queries using ETS.
%%% Each cursor holds:
%%% - Snapshot reference for consistent reads across chunks
%%% - Last key position for resumption
%%% - Query metadata for validation
%%% - Expiry time for cleanup
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_query_cursor).
-behaviour(gen_server).

%% API
-export([start_link/0]).
-export([create/5, lookup/1, update/2, release/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(TABLE, barrel_query_cursors).
-define(CLEANUP_INTERVAL_MS, 10000).  %% Cleanup every 10 seconds
-define(DEFAULT_TTL_MS, 60000).       %% Cursors expire after 60 seconds

%% State of one paginated query. Stored in the ?TABLE ETS table as the
%% second element of a {CursorId, #cursor{}} tuple.
-record(cursor, {
    %% Token produced by generate_cursor_id/0; also used as the ETS key.
    id :: binary(),
    %% Supplied by the caller of create/5; stored as-is, never interpreted here.
    store_ref :: term(),
    %% Supplied by the caller of create/5; stored as-is, never interpreted here.
    db_name :: binary(),
    %% Supplied by the caller of create/5. When not `undefined' it is handed to
    %% barrel_store_rocksdb:safe_release_snapshot/1 on release and on shutdown.
    snapshot :: term() | undefined,
    %% Initial value comes from create/5; replaced by update/2.
    last_key :: binary(),
    %% Supplied by the caller of create/5; only stored by this module.
    query_type :: atom(),
    %% erlang:system_time(millisecond) at the time create/5 ran.
    created_at :: integer(),
    %% Wall-clock deadline in milliseconds: set to "now + ?DEFAULT_TTL_MS" by
    %% create/5 and pushed forward by lookup/1 and update/2. Entries whose
    %% deadline has passed are removed by the periodic cleanup.
    expires_at :: integer()
}).

%%====================================================================
%% API
%%====================================================================

%% @doc Launch the cursor manager and link it to the calling process.
%% The server is registered locally under this module's name, so only one
%% instance can run per node.
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    ServerName = {local, ?MODULE},
    InitArg = [],
    StartOpts = [],
    gen_server:start_link(ServerName, ?MODULE, InitArg, StartOpts).

%% @doc Register a new cursor and hand back its token.
%% The cursor records the store reference, database name, query type, the
%% last key reached so far and the snapshot to read from. It is given a
%% lifetime of ?DEFAULT_TTL_MS milliseconds counted from the moment of creation.
%% Returns the freshly generated token under which the cursor is stored.
-spec create(term(), binary(), atom(), binary(), term()) -> binary().
create(StoreRef, DbName, QueryType, LastKey, Snapshot) ->
    Token = generate_cursor_id(),
    CreatedAt = erlang:system_time(millisecond),
    ExpiresAt = CreatedAt + ?DEFAULT_TTL_MS,
    Entry = {Token, #cursor{
        id = Token,
        store_ref = StoreRef,
        db_name = DbName,
        snapshot = Snapshot,
        last_key = LastKey,
        query_type = QueryType,
        created_at = CreatedAt,
        expires_at = ExpiresAt
    }},
    true = ets:insert(?TABLE, Entry),
    Token.

%% @doc Look up a cursor by token.
%%
%% Returns `{ok, Cursor}' when the cursor exists and its deadline has not
%% passed; in that case the deadline is pushed forward by ?DEFAULT_TTL_MS and
%% the returned record carries the new deadline. Returns `{error, expired}'
%% (after releasing the cursor) when the deadline has passed, and
%% `{error, not_found}' when no cursor is stored under the token.
%%
%% The table is public and is modified concurrently by update/2, release/1 and
%% the periodic cleanup, so the deadline refresh is written with a
%% compare-and-swap instead of a blind insert. A blind insert could re-create
%% an entry that was released after we read it (leaving a cursor that points
%% at an already released snapshot) or overwrite a newer last_key.
-spec lookup(binary()) -> {ok, #cursor{}} | {error, not_found | expired}.
lookup(CursorId) ->
    case ets:lookup(?TABLE, CursorId) of
        [Stored] ->
            touch_or_expire(Stored, erlang:system_time(millisecond));
        [] ->
            {error, not_found}
    end.

%% Refresh the deadline of a live cursor, or release an expired one.
-spec touch_or_expire({binary(), #cursor{}}, integer()) ->
    {ok, #cursor{}} | {error, not_found | expired}.
touch_or_expire({CursorId, #cursor{expires_at = ExpiresAt} = Cursor} = Old, Now)
  when ExpiresAt > Now ->
    NewCursor = Cursor#cursor{expires_at = Now + ?DEFAULT_TTL_MS},
    %% Replace the entry only if it is still exactly the object we read.
    Swap = [{Old, [], [{const, {CursorId, NewCursor}}]}],
    case ets:select_replace(?TABLE, Swap) of
        1 ->
            {ok, NewCursor};
        0 ->
            %% Changed or removed by another process in the meantime:
            %% start over from the current table contents.
            lookup(CursorId)
    end;
touch_or_expire({CursorId, _Cursor}, _Now) ->
    %% Deadline has passed - clean up
    release(CursorId),
    {error, expired}.

%% @doc Record the new position of a cursor after a chunk has been served.
%%
%% Stores `NewLastKey' as the cursor's last key and pushes its deadline
%% forward by ?DEFAULT_TTL_MS. Returns `ok' on success and
%% `{error, not_found}' when no cursor is stored under the token.
%% The current deadline is not checked: an entry that is still in the table is
%% updated even if its deadline has already passed.
%%
%% The write is a compare-and-swap against the object that was read. A plain
%% insert could re-create an entry that release/1 or the periodic cleanup
%% removed in the meantime, leaving a cursor whose snapshot is already
%% released. If the entry changed underneath us the operation is retried
%% against the current table contents.
-spec update(binary(), binary()) -> ok | {error, not_found}.
update(CursorId, NewLastKey) ->
    case ets:lookup(?TABLE, CursorId) of
        [{CursorId, Cursor} = Old] ->
            Now = erlang:system_time(millisecond),
            NewCursor = Cursor#cursor{
                last_key = NewLastKey,
                expires_at = Now + ?DEFAULT_TTL_MS
            },
            %% Replace the entry only if it is still exactly the object we read.
            Swap = [{Old, [], [{const, {CursorId, NewCursor}}]}],
            case ets:select_replace(?TABLE, Swap) of
                1 -> ok;
                0 -> update(CursorId, NewLastKey)
            end;
        [] ->
            {error, not_found}
    end.

%% @doc Release a cursor and its snapshot.
%%
%% Removes the cursor stored under `CursorId' (if any) and, when it holds a
%% snapshot, passes that snapshot to
%% barrel_store_rocksdb:safe_release_snapshot/1. Always returns `ok', including
%% when no such cursor exists.
%%
%% The entry is removed with ets:take/2, which returns and deletes the object
%% in one atomic step. When several processes release the same cursor at once
%% (for example a client and the periodic cleanup) only one of them obtains
%% the entry, so the snapshot is released exactly once, and the entry is no
%% longer visible to lookups by the time its snapshot is released.
-spec release(binary()) -> ok.
release(CursorId) ->
    case ets:take(?TABLE, CursorId) of
        [{CursorId, #cursor{snapshot = undefined}}] ->
            ok;
        [{CursorId, #cursor{snapshot = Snapshot}}] ->
            barrel_store_rocksdb:safe_release_snapshot(Snapshot),
            ok;
        [] ->
            ok
    end.

%%====================================================================
%% gen_server callbacks
%%====================================================================

%% gen_server init callback: creates the cursor table and starts the periodic
%% cleanup timer. The server keeps no state of its own beyond the table, so the
%% state is an empty map.
init([]) ->
    %% Trap exits so that terminate/2 is also invoked when the supervisor shuts
    %% this process down; without it the snapshots held by live cursors would
    %% never be released on shutdown.
    process_flag(trap_exit, true),
    %% Create ETS table for cursor storage
    ?TABLE = ets:new(?TABLE, [
        named_table,
        public,           %% Allow direct access from query processes
        set,
        {read_concurrency, true},
        {write_concurrency, true}
    ]),
    %% Schedule periodic cleanup
    _ = erlang:send_after(?CLEANUP_INTERVAL_MS, self(), cleanup),
    {ok, #{}}.

%% No synchronous requests are supported; every call is answered with an
%% error and the state is left untouched.
handle_call(_Unsupported, _Caller, ServerState) ->
    Reply = {error, unknown_request},
    {reply, Reply, ServerState}.

%% No casts are supported; anything received is dropped without changing
%% the state.
handle_cast(_Ignored, ServerState) ->
    {noreply, ServerState}.

%% Handles raw messages. The `cleanup' tick removes expired cursors and then
%% schedules the next tick ?CLEANUP_INTERVAL_MS milliseconds later; every other
%% message is dropped. The state is never changed.
handle_info(Message, ServerState) ->
    case Message of
        cleanup ->
            cleanup_expired_cursors(),
            erlang:send_after(?CLEANUP_INTERVAL_MS, self(), cleanup);
        _Other ->
            ok
    end,
    {noreply, ServerState}.

%% On shutdown, walk the whole cursor table and hand every snapshot that is
%% still held to barrel_store_rocksdb:safe_release_snapshot/1. Entries are not
%% deleted here; only their snapshots are released.
terminate(_Reason, _State) ->
    ReleaseHeld =
        fun({_Id, #cursor{snapshot = undefined}}, Done) ->
                Done;
           ({_Id, #cursor{snapshot = Held}}, Done) ->
                barrel_store_rocksdb:safe_release_snapshot(Held),
                Done
        end,
    ets:foldl(ReleaseHeld, ok, ?TABLE),
    ok.

%%====================================================================
%% Internal functions
%%====================================================================

%% Build a cursor token: 16 cryptographically strong random bytes, encoded
%% as standard base64 (a 24-byte binary).
generate_cursor_id() ->
    base64:encode(crypto:strong_rand_bytes(16)).

%% Remove every cursor whose deadline has passed and release its snapshot.
%%
%% Works in two steps: first collect the candidates (id and snapshot of every
%% entry with expires_at =< now), then delete each candidate with a
%% conditional delete that re-checks the deadline inside ETS. The re-check
%% matters because lookup/1 and update/2 run in other processes and may push a
%% cursor's deadline forward between the two steps; such a cursor is in use
%% again and must be left alone. A snapshot is released only by the process
%% whose delete actually removed the entry.
cleanup_expired_cursors() ->
    Now = erlang:system_time(millisecond),
    Candidates = ets:foldl(
        fun({Id, #cursor{expires_at = ExpiresAt, snapshot = Snapshot}}, Acc)
              when ExpiresAt =< Now ->
                [{Id, Snapshot} | Acc];
           (_StillLive, Acc) ->
                Acc
        end, [], ?TABLE),
    lists:foreach(fun(Candidate) -> reap_if_still_expired(Candidate, Now) end,
                  Candidates).

%% Delete the entry for Id only if its stored deadline is still =< Now, and
%% release its snapshot when the delete succeeded.
reap_if_still_expired({Id, Snapshot}, Now) ->
    StillExpired = [{{Id, '$1'},
                     [{'=<', {element, #cursor.expires_at, '$1'}, Now}],
                     [true]}],
    case ets:select_delete(?TABLE, StillExpired) of
        1 when Snapshot =/= undefined ->
            barrel_store_rocksdb:safe_release_snapshot(Snapshot),
            ok;
        _NothingToRelease ->
            %% Either the entry held no snapshot, or it was refreshed or
            %% released by another process since it was collected.
            ok
    end.