%% src/barrel_changes_stream.erl

%%%-------------------------------------------------------------------
%%% @doc Changes stream for barrel_docdb
%%%
%%% A gen_statem process that provides a streaming interface to the
%%% changes feed. Supports two modes:
%%% - iterate: Pull-based, client calls next/1 to get changes
%%% - push: Push-based, changes are sent to subscriber
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_changes_stream).

-behaviour(gen_statem).

-include("barrel_docdb.hrl").

%% API
-export([
    start_link/3,
    next/1,
    stop/1
]).

%% Push mode API
-export([
    await/1,
    await/2,
    ack/2
]).

%% gen_statem callbacks
-export([
    init/1,
    callback_mode/0,
    terminate/3
]).

%% State functions
-export([
    iterate/3,
    push/3,
    wait_pending/3
]).

%% Default push-mode poll interval, in milliseconds (passed to
%% erlang:send_after/3 between polls).
-define(CHANGES_INTERVAL, 100).
%% Default maximum number of changes sent in one pushed batch.
-define(BATCH_SIZE, 100).
%% Number of unacknowledged pushed batches at which polling pauses and the
%% stream moves to the `wait_pending' state until acks arrive.
-define(MAX_PENDING, 5).

%%====================================================================
%% Types
%%====================================================================

%% `iterate': pull-based, the client calls next/1.
%% `push': the stream polls and sends batches to `owner'.
-type stream_mode() :: iterate | push.
%% Options accepted by start_link/3. All keys are optional.
-type stream_opts() :: #{
    %% Position to start reading after; defaults to `first'.
    since => seq() | first,
    %% Defaults to `iterate'.
    mode => stream_mode(),
    %% Defaults to `false'. Stored in the stream state.
    %% NOTE(review): not currently consulted when folding changes in this
    %% module - confirm whether it should be passed to barrel_changes.
    include_docs => boolean(),
    %% Push mode only: poll interval in ms (default ?CHANGES_INTERVAL).
    interval => pos_integer(),
    %% Push mode only: max changes per batch (default ?BATCH_SIZE).
    batch_size => pos_integer(),
    %% Push mode only: process that receives `{changes, ReqId, Changes}'
    %% messages. Defaults to `undefined' (batches are not sent anywhere).
    owner => pid()
}.

-export_type([stream_mode/0, stream_opts/0]).

%%====================================================================
%% API
%%====================================================================

%% @doc Start a changes stream linked to the calling process.
%%
%% `StoreRef' and `DbName' identify the database whose changes feed is read;
%% `Opts' selects the mode and its parameters (see stream_opts()). The new
%% process starts in the `iterate' or `push' state depending on `mode'.
-spec start_link(barrel_store_rocksdb:db_ref(), db_name(), stream_opts()) ->
    {ok, pid()} | {error, term()}.
start_link(StoreRef, DbName, Opts) ->
    InitArgs = [StoreRef, DbName, Opts],
    gen_statem:start_link(?MODULE, InitArgs, []).

%% @doc Get the next change (iterate mode).
%%
%% Returns `{ok, Change}' for the next change after the stream's current
%% position, or `done' when there are no further changes (the stream process
%% stops itself after replying `done'). Returns `{error, timeout}' if no reply
%% arrives within 5 seconds. Returns `{error, Reason}' if the stream process
%% is not alive or exits before replying, e.g. `{error, noproc}' when next/1
%% is called again after `done'.
-spec next(pid()) -> {ok, barrel_changes:change()} | done | {error, term()}.
next(Pid) ->
    %% The monitor reference doubles as the unique reply tag. Watching the
    %% stream lets us fail fast instead of waiting out the full timeout when
    %% it is already gone or crashes while handling the request.
    MRef = erlang:monitor(process, Pid),
    ok = gen_statem:cast(Pid, {next, {self(), MRef}}),
    receive
        {MRef, Result} ->
            %% A normal exit right after replying (the `done' case) would
            %% otherwise leave a stray 'DOWN' message in our mailbox.
            erlang:demonitor(MRef, [flush]),
            Result;
        {'DOWN', MRef, process, _Object, Reason} ->
            {error, Reason}
    after 5000 ->
        %% NOTE: a reply that arrives after this point stays in the caller's
        %% mailbox as `{MRef, _}'; nothing else matches on that reference.
        erlang:demonitor(MRef, [flush]),
        {error, timeout}
    end.

%% @doc Wait, without a time limit, for the next pushed batch (push mode).
%%
%% Equivalent to `await(Pid, infinity)': returns `{ReqId, Changes}', or `[]'
%% if the stream process is or becomes dead.
-spec await(pid()) -> {reference(), [barrel_changes:change()]} | [].
await(Pid) ->
    NoLimit = infinity,
    await(Pid, NoLimit).

%% @doc Wait for the next pushed batch with a timeout (push mode).
%%
%% Blocks until a `{changes, ReqId, Changes}' message arrives and returns
%% `{ReqId, Changes}'. Pass `ReqId' to ack/2 afterwards. Returns `[]' if
%% `Timeout' expires first, or if `Pid' is (or becomes) dead.
%% NOTE(review): the receive pattern is not tied to `Pid', so a batch pushed
%% by another stream owned by the same process would also be returned here.
-spec await(pid(), timeout()) -> {reference(), [barrel_changes:change()]} | [].
await(Pid, Timeout) ->
    Watch = erlang:monitor(process, Pid),
    receive
        {'DOWN', Watch, process, _Object, _Info} ->
            %% The stream is gone, so the monitor is already released.
            [];
        {changes, BatchRef, Batch} ->
            erlang:demonitor(Watch, [flush]),
            {BatchRef, Batch}
    after Timeout ->
        erlang:demonitor(Watch, [flush]),
        []
    end.

%% @doc Acknowledge a pushed batch (push mode).
%%
%% `ReqId' is the reference returned by await/1,2 for that batch. This is
%% asynchronous (a gen_statem cast) and always returns `ok'.
-spec ack(pid(), reference()) -> ok.
ack(Pid, ReqId) ->
    AckMsg = {ack, ReqId},
    gen_statem:cast(Pid, AckMsg).

%% @doc Ask the stream to stop.
%%
%% Asynchronous (a gen_statem cast); returns `ok' without waiting for the
%% process to exit. Every state handles the `stop' cast with a normal stop.
-spec stop(pid()) -> ok.
stop(Pid) ->
    Request = stop,
    gen_statem:cast(Pid, Request).

%%====================================================================
%% gen_statem callbacks
%%====================================================================

%% @private
%% Each state (`iterate', `push', `wait_pending') is handled by the exported
%% function of the same name.
callback_mode() ->
    state_functions.

%% @private
%% Build the stream state from the start_link/3 arguments.
%%
%% Both modes keep `store_ref', `db_name', `since' (default `first') and
%% `include_docs' (default `false'). Push mode also keeps `interval',
%% `batch_size', `owner' (default `undefined') and an empty `pending' list.
%% Push mode then immediately injects a `send_changes' info event so the
%% first batch is read without waiting for the first interval. Any other
%% `mode' value crashes with a case_clause.
init([StoreRef, DbName, Opts]) ->
    Opt = fun(Key, Default) -> maps:get(Key, Opts, Default) end,
    Common = #{
        store_ref => StoreRef,
        db_name => DbName,
        since => Opt(since, first),
        include_docs => Opt(include_docs, false)
    },
    case Opt(mode, iterate) of
        iterate ->
            {ok, iterate, Common};
        push ->
            PushData = Common#{
                interval => Opt(interval, ?CHANGES_INTERVAL),
                batch_size => Opt(batch_size, ?BATCH_SIZE),
                owner => Opt(owner, undefined),
                pending => []
            },
            {ok, push, PushData, [{next_event, info, send_changes}]}
    end.

%% @private
%% No cleanup is performed on shutdown.
terminate(_Why, _StateName, _StreamData) -> ok.

%%====================================================================
%% State: iterate (pull mode)
%%====================================================================

%% @private
%% Pull-mode state.
%%
%% `{next, {From, Tag}}' (cast): read the single change after `since' and
%% reply `{Tag, {ok, Change}}' to `From', advancing `since'. If there is no
%% such change, reply `{Tag, done}' and stop normally. `stop' ends the
%% stream; every other event is ignored.
iterate(cast, {next, {ReplyTo, ReplyTag}},
        #{store_ref := Store, db_name := Db, since := Cursor} = Data) ->
    %% Halt the fold on the very first change it yields.
    TakeFirst = fun(First, _Acc) -> {stop, {found, First}} end,
    Reply = fun(Msg) -> ReplyTo ! {ReplyTag, Msg} end,
    case barrel_changes:fold_changes(Store, Db, Cursor, TakeFirst, not_found) of
        {ok, not_found, _} ->
            Reply(done),
            {stop, normal, Data};
        {ok, {found, Found}, NextCursor} ->
            Reply({ok, Found}),
            {keep_state, Data#{since := NextCursor}}
    end;
iterate(cast, stop, Data) ->
    {stop, normal, Data};
iterate(_Type, _Content, Data) ->
    {keep_state, Data}.

%%====================================================================
%% State: push (push mode)
%%====================================================================

%% @private
%% Push-mode state.
%%
%% `send_changes' (info): read up to `batch_size' changes after `since',
%% advance `since', and send the batch to `owner' as
%% `{changes, ReqId, Changes}'. The ReqId of each non-empty batch is
%% recorded in `pending' until it is acked. While fewer than ?MAX_PENDING
%% batches are outstanding, the next poll is scheduled `interval' ms later.
%% Otherwise the stream moves to `wait_pending' without scheduling a poll.
%%
%% `{ack, ReqId}' (cast): forget the acknowledged batch.
%% `stop' (cast): stop normally. Every other event is ignored.
push(info, send_changes, #{store_ref := StoreRef,
                           db_name := DbName,
                           since := Since,
                           batch_size := BatchSize,
                           interval := Interval,
                           owner := Owner,
                           pending := Pending} = State) ->
    %% Carry the batch length in the accumulator so the size check is O(1)
    %% per change instead of calling length/1 on every step.
    FoldFun = fun(Change, {Count, Acc}) ->
        NewCount = Count + 1,
        NewAcc = {NewCount, [Change | Acc]},
        case NewCount >= BatchSize of
            true -> {stop, NewAcc};
            false -> {ok, NewAcc}
        end
    end,
    {ok, {_Count, RevChanges}, NewSeq} =
        barrel_changes:fold_changes(StoreRef, DbName, Since, FoldFun, {0, []}),
    Changes = lists:reverse(RevChanges),

    %% Only track non-empty batches to prevent stall during idle periods
    case Changes of
        [] ->
            %% No changes - don't add to pending, just schedule next poll
            erlang:send_after(Interval, self(), send_changes),
            {keep_state, State#{since => NewSeq}};
        _ ->
            %% Have changes - send and track
            ReqId = make_ref(),
            %% NOTE(review): with no owner nobody can ack, so after
            %% ?MAX_PENDING batches the stream parks in wait_pending.
            %% Confirm whether ownerless push streams are intended.
            _ = case Owner of
                undefined -> ok;
                Pid when is_pid(Pid) -> Pid ! {changes, ReqId, Changes}
            end,

            NewPending = [ReqId | Pending],
            NewState = State#{since => NewSeq, pending => NewPending},

            case length(NewPending) < ?MAX_PENDING of
                true ->
                    erlang:send_after(Interval, self(), send_changes),
                    {keep_state, NewState};
                false ->
                    {next_state, wait_pending, NewState}
            end
    end;

push(cast, {ack, ReqId}, #{pending := Pending} = State) ->
    %% Acks for batches sent while below the limit arrive in this state.
    %% Dropping them (as the catch-all below would) leaves stale ReqIds in
    %% `pending', so the stream enters wait_pending even when the owner has
    %% already acked everything.
    {keep_state, State#{pending => lists:delete(ReqId, Pending)}};

push(cast, stop, State) ->
    {stop, normal, State};

push(_EventType, _Event, State) ->
    {keep_state, State}.



%%====================================================================
%% State: wait_pending (waiting for acks)
%%====================================================================

%% @private
%% Back-pressure state: entered from `push' once ?MAX_PENDING batches are
%% unacknowledged. `push' schedules no poll when moving here, and the
%% catch-all below ignores other events, so polling resumes only via an ack.
%%
%% `{ack, ReqId}' (cast): drop `ReqId' from `pending'.
%% - If nothing remains outstanding, go back to `push' and poll immediately.
%% - If the count is below the limit, go back to `push' and poll after
%%   `interval' ms.
%% - Otherwise (e.g. an unknown or duplicate ack), stay here.
wait_pending(cast, {ack, ReqId}, #{pending := Pending, interval := Interval} = State) ->
    Remaining = Pending -- [ReqId],
    Updated = State#{pending => Remaining},
    if
        Remaining =:= [] ->
            {next_state, push, Updated, [{next_event, info, send_changes}]};
        length(Remaining) < ?MAX_PENDING ->
            erlang:send_after(Interval, self(), send_changes),
            {next_state, push, Updated};
        true ->
            {keep_state, Updated}
    end;
wait_pending(cast, stop, Data) ->
    {stop, normal, Data};
wait_pending(_Type, _Content, Data) ->
    {keep_state, Data}.