src/erlmld_noisy_wrk.erl

%%% @copyright (C) 2016, AdRoll
%%% @doc
%%%
%%%     The default example worker, which prints events and checkpoints after every 10
%%%     records.
%%%
%%% @end
%%% Created : 18 Nov 2016 by Mike Watters <mike.watters@adroll.com>

-module(erlmld_noisy_wrk).

-behaviour(erlmld_worker).

-export([initialize/3, ready/1, process_record/2, checkpointed/3, shutdown/2]).

-include("erlmld.hrl").

-record(state, {shard_id, count = 0}).

initialize(_Opaque, ShardId, ISN) ->
    State = #state{shard_id = ShardId},
    io:format("~p initialized for shard ~p at ~p~n", [self(), ShardId, ISN]),
    {ok, State}.

ready(State) ->
    {ok, State}.

process_record(#state{shard_id = ShardId, count = Count} = State,
               #stream_record{sequence_number = SN} = Record) ->
    io:format("~p (~p) got record ~p~n", [ShardId, Count, Record]),
    case Count >= 10 of
        true ->
            {ok, State#state{count = 0}, #checkpoint{sequence_number = SN}};
        false ->
            {ok, State#state{count = Count + 1}}
    end.

checkpointed(#state{shard_id = ShardId, count = Count} = State,
             SequenceNumber,
             Checkpoint) ->
    io:format("~p (~p) checkpointed at ~p (~p)~n",
              [ShardId, Count, Checkpoint, SequenceNumber]),
    {ok, State}.

shutdown(#state{shard_id = ShardId, count = Count}, Reason) ->
    io:format("~p (~p) shutting down, reason: ~p~n", [ShardId, Count, Reason]),
    case Reason of
        terminate ->
            {ok, #checkpoint{}};
        _ ->
            ok
    end.