%%% @copyright (C) 2016, AdRoll
%%% @doc
%%%
%%% Flusher behavior.
%%%
%%% A flusher is initialized with a ShardID and some arbitrary data:
%%%
%%% FlusherState = my_flusher:init(ShardId, FlusherData)
%%%
%%% The flusher maintains a batch of records. A new record can be added
%%% using 'add_record', together with an opaque token. 'add_record' will
%%% return an error if the current batch is full:
%%%
%%% case my_flusher:add_record(FlusherState, StreamRecord, Token) of
%%% {error, full} -> %% should flush and try again
%%% {ignored, NewFlusherState} -> %% record ignored
%%% {ok, NewFlusherState} -> %% record added
%%% end
%%%
%%% The current batch can be flushed using 'flush'. The caller gets back a
%%% list of tokens identifying which records got flushed. If the second
%%% argument to flush/2 is 'full', all outstanding data must be flushed;
%%% this is to support checkpointing on shards whose processing is being
%%% terminated.
%%%
%%% {ok, NewFlusherState, FlushedTokens} = my_flusher:flush(FlusherState, partial)
%%%
%%% A flusher is meant to be used as part of an 'erlmld_batch_processor'.
%%% The batch processor handles checkpointing and decides when to trigger
%%% flushing.
%%%
%%% heartbeat/1 will be called regardless of whether any records could be
%%% obtained from the stream. It may return the same values as flush/2.
%%% If it returns a non-empty list of tokens as the third tuple element, it
%%% is considered to have just performed a partial flush. This allows a
%%% flusher to flush even if no records were actually available on the
%%% stream (e.g., after a period of time has elapsed), avoiding potential
%%% near-deadlock situations which would only be resolved by additional
%%% stream records appearing (where the batch processor is waiting for
%%% tokens from the flusher before checkpointing, but the flusher is
%%% waiting for more records from the batch processor before producing
%%% tokens via flushing).
%%%
%%% @end
%%% Created : 20 Dec 2016 by Constantin Berzan <constantin.berzan@adroll.com>
-module(erlmld_flusher).
-include("erlmld.hrl").
-export([init/3, add_record/4, flush/3, heartbeat/2]).
%% Behaviour callbacks.  A flusher implementation module must export all of
%% the functions declared below.

%% init/2: build the initial flusher state from a shard id and an arbitrary
%% term supplied by the caller.
-callback init(shard_id(), term()) -> flusher_state().
%% add_record/3: try to add a stream record, tagged with an opaque token, to
%% the current batch.  Returns {ok, State} when the record was added,
%% {ignored, State} when it was ignored, and {error, full} when the current
%% batch is full (the caller should flush and try again).  Other error
%% reasons are allowed by the return type.
-callback add_record(flusher_state(), stream_record(), flusher_token()) ->
{ok, flusher_state()} | {ignored, flusher_state()} | {error, full | term()}.
%% flush/2: flush the current batch.  On success the third tuple element is
%% the list of tokens identifying the records that were flushed.  When the
%% second argument is 'full', all outstanding data must be flushed.
-callback flush(flusher_state(), partial | full) ->
{ok, flusher_state(), [flusher_token()]} | {error, term()}.
%% heartbeat/1: called regardless of whether any records could be obtained
%% from the stream.  Has the same return type as flush/2; a non-empty token
%% list is treated as a partial flush having just been performed.
-callback heartbeat(flusher_state()) ->
{ok, flusher_state(), [flusher_token()]} | {error, term()}.
%% @doc Initialize a flusher through its callback module.
%%
%% Calls init/2 of the given callback module with the shard id and the
%% caller-supplied term, and returns whatever that callback returns.
-spec init(module(), shard_id(), term()) -> flusher_state().
init(CallbackMod, Shard, FlusherData) ->
    CallbackMod:init(Shard, FlusherData).
%% @doc Offer a stream record to a flusher through its callback module.
%%
%% Calls add_record/3 of the given callback module with the flusher state,
%% the record and its opaque token, and returns the callback's result
%% unchanged: {ok, NewState} when the record was added, {ignored, NewState}
%% when it was ignored, or {error, Reason} (Reason is 'full' when the
%% current batch is full).
-spec add_record(module(), flusher_state(), stream_record(), flusher_token()) ->
    {ok, flusher_state()} | {ignored, flusher_state()} | {error, full | term()}.
add_record(CallbackMod, FlusherState, StreamRecord, RecordToken) ->
    CallbackMod:add_record(FlusherState, StreamRecord, RecordToken).
%% @doc Flush a flusher's current batch through its callback module.
%%
%% Calls flush/2 of the given callback module with the flusher state and the
%% flush kind ('partial' or 'full'), and returns the callback's result
%% unchanged: {ok, NewState, FlushedTokens} or {error, Reason}.
-spec flush(module(), flusher_state(), partial | full) ->
    {ok, flusher_state(), [flusher_token()]} | {error, term()}.
flush(CallbackMod, FlusherState, Kind) ->
    CallbackMod:flush(FlusherState, Kind).
%% @doc Deliver a heartbeat to a flusher through its callback module.
%%
%% Calls heartbeat/1 of the given callback module with the flusher state and
%% returns the callback's result unchanged: {ok, NewState, Tokens} or
%% {error, Reason}.
-spec heartbeat(module(), flusher_state()) ->
    {ok, flusher_state(), [flusher_token()]} | {error, term()}.
heartbeat(CallbackMod, FlusherState) ->
    CallbackMod:heartbeat(FlusherState).