src/erlmld_worker.erl

%%% @copyright (C) 2016, AdRoll
%%% @doc
%%%
%%%    KCL MultiLangDaemon worker (record processor) behavior.
%%%
%%%    A worker has the following lifecycle:
%%%
%%%        INITIALIZE -> PROCESSING -> SHUTDOWN
%%%
%%%    When a shard lease has been obtained, a worker is initialized to process records
%%%    appearing on that shard.  It is provided the opaque data which was supplied to
%%%    erlmld_sup, the shard name, and initial sequence number(s) (may be undefined for
%%%    new shards or if using V1 protocol), and returns an opaque worker_state() value
%%%    which is passed to the other callbacks (ready/1, process_record/2, checkpointed/3
%%%    and shutdown/2).
%%%
%%%    As records are read from the stream, they are b64decoded and passed to
%%%    process_record/2.  If a record was put on the stream using KPL aggregation, it is
%%%    also deaggregated, with each sub-record provided to the worker as a single record
%%%    along with a subsequence number.
%%%
%%%    After processing each record, a worker returns an updated worker_state().  It may
%%%    also return a checkpoint() (not necessarily the latest) containing a
%%%    sequence_number() from that record or a previous record, which will result in an
%%%    attempt to checkpoint the stream at the associated sequence number.  If the
%%%    supplied checkpoint() has an undefined sequence number, the stream is checkpointed
%%%    at the most recent sequence number.
%%%
%%%    Before starting to process each batch of records, a worker's ready/1 callback is
%%%    called, which should return a possibly-updated worker state and possibly a
%%%    checkpoint.  This can be useful when a record processor is using a watchdog timer
%%%    and is far behind on a stream (and so won't receive any actual records for a
%%%    while), or if a stream has very low volume (records seen less frequently than
%%%    desired checkpoint or flush intervals).
%%%
%%%    When a shard lease has been lost or a shard has been completely processed, a worker
%%%    will be shut down.  If the lease was lost, the worker will receive a reason of
%%%    'zombie', and it should not checkpoint (and any checkpoint response is in error).
%%%    If the shard was closed, the reason will be 'terminate' and the worker should
%%%    return a checkpoint response.  That checkpoint should either have an undefined
%%%    sequence number, or it should be the most recent sequence number which was provided
%%%    to process_record/2.
%%%
%%%    If a worker returns an error response, it is fatal.
%%%
%%%    See also: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
%%%
%%% @end
%%% Created : 18 Nov 2016 by Mike Watters <mike.watters@adroll.com>

-module(erlmld_worker).

-include("erlmld.hrl").

-export([initialize/4, ready/2, process_record/3, checkpointed/4, shutdown/3]).

%% Invoked when a shard lease has been obtained.  Receives the opaque handler
%% data supplied to erlmld_sup, the shard id, and the initial sequence number
%% (undefined for new shards or when using the V1 protocol).  On success,
%% returns the worker state threaded through every other callback.
-callback initialize(HandlerData :: term(),
                     ShardId :: shard_id(),
                     InitialSeq :: sequence_number() | undefined) ->
    {ok, worker_state()} | {error, term()}.

%% Invoked before each batch of records is processed.  May return an updated
%% state and, optionally, a checkpoint request (useful for watchdog timers or
%% low-volume streams).
-callback ready(State :: worker_state()) ->
    {ok, worker_state()}
    | {ok, worker_state(), checkpoint()}
    | {error, term()}.

%% Invoked once per (b64decoded, possibly KPL-deaggregated) record.  May
%% return a checkpoint for this or an earlier record's sequence number; a
%% checkpoint with an undefined sequence number means "the most recent one".
-callback process_record(State :: worker_state(), Record :: stream_record()) ->
    {ok, worker_state()}
    | {ok, worker_state(), checkpoint()}
    | {error, term()}.

%% NOTE(review): the module doc does not say when this is invoked; presumably
%% after a checkpoint attempt completes at SeqNumber for Checkpoint -- confirm
%% against the caller.
-callback checkpointed(State :: worker_state(),
                       SeqNumber :: sequence_number(),
                       Checkpoint :: checkpoint()) ->
    {ok, worker_state()} | {error, term()}.

%% Invoked when the shard lease is lost (Reason = zombie: must not checkpoint)
%% or the shard has been closed (Reason = terminate: should return a checkpoint
%% with an undefined sequence number or the latest one given to
%% process_record/2).
-callback shutdown(State :: worker_state(), Reason :: shutdown_reason()) ->
    ok | {ok, checkpoint()} | {error, term()}.

%% @doc Create the state for a worker that has just obtained a shard lease, by
%% delegating to Mod:initialize/3.  InitialSeq may be undefined (new shards or
%% the V1 protocol).
-spec initialize(Mod :: module(),
                 Data :: term(),
                 Shard :: shard_id(),
                 InitialSeq :: sequence_number() | undefined) ->
    {ok, worker_state()} | {error, term()}.
initialize(Mod, Data, Shard, InitialSeq) ->
    Mod:initialize(Data, Shard, InitialSeq).

%% @doc Delegate to Mod:ready/1, the hook run before a batch of records is
%% processed; the worker may return an updated state and optionally a
%% checkpoint request.
-spec ready(Mod :: module(), State :: worker_state()) ->
    {ok, worker_state()}
    | {ok, worker_state(), checkpoint()}
    | {error, term()}.
ready(Mod, State) ->
    Mod:ready(State).

%% @doc Hand a single stream record to the worker via Mod:process_record/2.
%% The worker returns its new state and may also return a checkpoint.
-spec process_record(Mod :: module(), State :: worker_state(), Rec :: stream_record()) ->
    {ok, worker_state()}
    | {ok, worker_state(), checkpoint()}
    | {error, term()}.
process_record(Mod, State, Rec) ->
    Mod:process_record(State, Rec).

%% @doc Notify the worker, via Mod:checkpointed/3, of a checkpoint at SeqNo.
%% NOTE(review): the exact triggering condition is decided by the caller, not
%% visible here -- presumably after a checkpoint completes; confirm.
-spec checkpointed(Mod :: module(),
                   State :: worker_state(),
                   SeqNo :: sequence_number(),
                   CP :: checkpoint()) ->
    {ok, worker_state()} | {error, term()}.
checkpointed(Mod, State, SeqNo, CP) ->
    Mod:checkpointed(State, SeqNo, CP).

%% @doc Shut the worker down via Mod:shutdown/2.  Per the module doc, a
%% 'zombie' reason means the lease was lost (no checkpoint allowed), while
%% 'terminate' means the shard was closed (a checkpoint should be returned).
-spec shutdown(Mod :: module(), State :: worker_state(), Why :: shutdown_reason()) ->
    ok | {ok, checkpoint()} | {error, term()}.
shutdown(Mod, State, Why) ->
    Mod:shutdown(State, Why).