src/swirl_reducer.erl

%% TODO: compare the loaded callback module against #flow.module_vsn and re-fetch stale versions from swirl_code_server

-module(swirl_reducer).
-include("swirl.hrl").

-compile({no_auto_import, [
    unregister/1
]}).

%% internal
-export([
    lookup/1,
    register/1,
    start/1,
    unregister/1
]).

-behaviour(gen_server).
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

-define(TABLE_NAME, reducer_rows).
-define(TABLE_OPTS, [public, {write_concurrency, true}]).
-define(SERVER, ?MODULE).

%% Reducer process state.
-record(state, {
    flow,          % the #flow{} passed to init/1
    table_id,      % ETS table of the currently open window (undefined until the first flush)
    window_timer,  % value returned by swirl_utils:new_timer/3 for the next flush
    window_tstamp, % swirl_utils:unix_tstamp_ms() at the last flush, i.e. start of the open window
    hbeat_timer,   % value returned by swirl_utils:new_timer/3 for the next heartbeat
    hbeat_nodes    % nodes that sent {pong, Node} since the last heartbeat (seeded with mapper_nodes in init/1)
}).

%% internal
%% Returns the pid registered in the reducer table for a flow, or undefined.
%% Accepts either a full #flow{} or just the flow id (a binary).
-spec lookup(binary() | flow()) -> undefined | pid().
lookup(#flow {} = Flow) ->
    swirl_tracker:lookup(?TABLE_NAME_REDUCERS, key(Flow));
lookup(FlowId) when is_binary(FlowId) ->
    %% Only the id is needed to build the lookup key.
    lookup(#flow {id = FlowId}).

%% Records the calling process as the reducer for Flow in the reducer table.
-spec register(flow()) -> true.
register(#flow {} = Flow) ->
    Key = key(Flow),
    swirl_tracker:register(?TABLE_NAME_REDUCERS, Key, self()).

%% Starts a reducer gen_server for Flow, linked to the caller (via start_link/1).
%%
%% Returns:
%%   {ok, Pid}                        - a new reducer was started
%%   {error, {already_started, Pid}}  - a reducer for this flow is already
%%                                      registered (gen_server convention)
%%   {error, reducers_max}            - swirl_config:reducers_count() has
%%                                      reached swirl_config:reducers_max()
%%
%% NOTE(review): the limit check and the registration done in init/1 are
%% separate steps, so concurrent start/1 calls can still race past the limit
%% or start duplicates.
-spec start(flow()) ->
    {ok, pid()} | {error, reducers_max | {already_started, pid()}}.
start(#flow {} = Flow) ->
    case lookup(Flow) of
        undefined ->
            ReducersCount = swirl_config:reducers_count(),
            ReducersMax = swirl_config:reducers_max(),
            case ReducersCount < ReducersMax of
                true -> start_link(Flow);
                false -> {error, reducers_max}
            end;
        Pid ->
            {error, {already_started, Pid}}
    end.

%% Removes Flow's entry from the reducer table.
-spec unregister(flow()) -> true.
unregister(#flow {} = Flow) ->
    Key = key(Flow),
    swirl_tracker:unregister(?TABLE_NAME_REDUCERS, Key).

%% gen_server callbacks
%% Initializes a reducer for Flow.
%%
%% The initial flush and heartbeat messages are queued BEFORE this process is
%% published with register/1 (whose key lookup/1 reads) and swirl_flow:register/1.
%% Messages sent to self() are enqueued immediately, so both are handled before
%% any message from a process that discovered this reducer afterwards. In
%% particular, the first flush creates the window ETS table, so a
%% {mapper_window, ...} message is never handled while table_id is undefined.
%%
%% hbeat_nodes starts as the full mapper_nodes list, so the first heartbeat
%% treats every mapper as alive and sends no start_mapper messages.
init(#flow {mapper_nodes = MapperNodes} = Flow) ->
    process_flag(trap_exit, true),

    self() ! flush,
    self() ! heartbeat,

    register(Flow),
    swirl_flow:register(Flow),

    {ok, #state {
        flow = Flow,
        hbeat_nodes = MapperNodes
    }}.
%% No synchronous requests are handled: every call is printed and answered
%% with ok, leaving the state unchanged.
handle_call(Call, _From, S) ->
    io:format("unexpected message: ~p~n", [Call]),
    Reply = ok,
    {reply, Reply, S}.

%% No casts are handled: every cast is printed and ignored.
handle_cast(Cast, S) ->
    io:format("unexpected message: ~p~n", [Cast]),
    {noreply, S}.

%% handle_info/2 carries the reducer protocol:
%%   flush                     - close the open window, open a new one
%%   heartbeat                 - send start_mapper to silent mappers, ping all mappers
%%   {pong, Node}              - remember that Node answered the last ping
%%   stop                      - terminate normally
%%   {mapper_window, P, Rows}  - merge a mapper's rows into the open window
%%   {ping, Node}              - answer Node with pong
%% Anything else is printed and ignored.
handle_info(flush, #state {
        flow = #flow {reducer_window = Window, window_sync = Sync} = Flow,
        table_id = ClosedTable,
        window_tstamp = OpenedAt
    } = State) ->
    ClosedAt = swirl_utils:unix_tstamp_ms(),
    NextFlush = swirl_utils:new_timer(Window, flush, Sync),
    OpenTable = ets:new(?TABLE_NAME, ?TABLE_OPTS),
    Closed = #period {start_at = OpenedAt, end_at = ClosedAt},
    %% Reduce + output of the closed window happens outside the server
    %% process; new rows go to OpenTable in the meantime.
    spawn(fun() -> flush_window(Flow, Closed, ClosedTable) end),
    {noreply, State#state {
        table_id = OpenTable,
        window_tstamp = ClosedAt,
        window_timer = NextFlush
    }};
handle_info(heartbeat, #state {
        flow = #flow {id = FlowId, heartbeat = Interval, mapper_nodes = Mappers} = Flow,
        hbeat_nodes = Responders
    } = State) ->
    NextBeat = swirl_utils:new_timer(Interval, heartbeat, true),
    %% Mappers that did not pong since the previous heartbeat get a
    %% start_mapper message carrying the flow as a proplist.
    Silent = [Node || Node <- Mappers, not lists:member(Node, Responders)],
    StartMapper = {start_mapper, swirl_utils:record_to_proplist(Flow)},
    lists:foreach(fun(Node) ->
        swirl_tracker:message(Node, FlowId, StartMapper)
    end, Silent),
    Ping = {ping, node()},
    lists:foreach(fun(Node) ->
        swirl_tracker:message(Node, FlowId, Ping)
    end, Mappers),
    {noreply, State#state {hbeat_timer = NextBeat, hbeat_nodes = []}};
handle_info({pong, Node}, #state {hbeat_nodes = Responders} = State) ->
    {noreply, State#state {hbeat_nodes = [Node | Responders]}};
handle_info(stop, State) ->
    {stop, normal, State};
handle_info({mapper_window, Period, Rows}, #state {table_id = OpenTable} = State) ->
    %% NOTE(review): the map_rows/3 worker is not synchronised with flushes.
    %% If a flush is handled before it finishes, flush_window/3 may read and
    %% delete OpenTable while rows are still being added, so those rows can be
    %% lost (or the worker may fail on the deleted table). Confirm this is
    %% acceptable.
    spawn(fun() -> map_rows(Period, Rows, OpenTable) end),
    {noreply, State};
handle_info({ping, Node}, #state {flow = #flow {id = FlowId}} = State) ->
    swirl_tracker:message(Node, FlowId, pong),
    {noreply, State};
handle_info(Unexpected, State) ->
    io:format("unexpected message: ~p~n", [Unexpected]),
    {noreply, State}.

%% Deregisters this reducer (swirl_flow first, then the reducer table) and
%% passes window_timer to timer:cancel/1.
%% NOTE(review): this assumes swirl_utils:new_timer/3 returns a timer-module
%% reference; verify. hbeat_timer is not cancelled, and the rows in the open
%% window table are not flushed: the table is owned by this process, so it is
%% deleted when the process exits.
terminate(_Reason, #state {flow = Flow, window_timer = FlushTimer}) ->
    swirl_flow:unregister(Flow),
    unregister(Flow),
    timer:cancel(FlushTimer),
    ok.

%% Hot code upgrades keep the state as-is.
code_change(_Vsn, S, _Ext) -> {ok, S}.

%% private
%% Drains a closed window's table, deletes the table, reduces the rows
%% (unless the flow's reducer_skip is set) and hands them to output/3.
%% The first flush has no previous table (undefined) and does nothing.
flush_window(_Flow, _Period, undefined) ->
    ok;
flush_window(#flow {reducer_skip = Skip} = Flow, Period, Table) ->
    Rows = swirl_utils:tab2list(Table),
    true = ets:delete(Table),
    output(Flow, Period, reduce_rows(Flow, Rows, Skip)).

%% Registry key of a flow: its id.
key(#flow {} = Flow) -> Flow#flow.id.

%% Merges a mapper's rows into Table. Each row is a tuple
%% {Key, Counter1, ..., CounterN}; its counters are passed as a list to
%% swirl_utils:safe_ets_increment/3 under Key. Period is unused. Returns ok.
map_rows(_Period, Rows, Table) ->
    lists:foreach(fun(Row) ->
        [Key | Counters] = tuple_to_list(Row),
        swirl_utils:safe_ets_increment(Table, Key, Counters)
    end, Rows).

%% Hands the reduced rows of one window to the flow's callback module via
%% Module:output(Flow, Period, Rows, OutputOpts).
%%
%% If the call raises error:undef, the module is requested from the flow's
%% start node with swirl_code_server:get_module/3, and this window's rows are
%% not retried. Note that an undef raised from inside the callback itself also
%% takes this path.
%%
%% Always returns ok, as the spec states. The return values of the callback
%% and of get_module/3 are not propagated.
-spec output(flow(), period(), list(row())) -> ok.
output(#flow {
        module = Module,
        module_vsn = ModuleVsn,
        start_node = StartNode,
        output_opts = OutputOpts
    } = Flow, Period, Rows) ->

    try Module:output(Flow, Period, Rows, OutputOpts) of
        _ -> ok
    catch
        error:undef ->
            swirl_code_server:get_module(StartNode, Module, ModuleVsn),
            ok
    end.

%% Runs the flow's Module:reduce(Flow, Row, ReducerOpts) callback on one row.
%% The callback must return ignore or a {Key, Counters} pair; anything else
%% raises try_clause. On error:undef the module is requested from the flow's
%% start node via swirl_code_server:get_module/3 and the row is ignored.
-spec reduce(flow(), row()) -> ignore | update().
reduce(#flow {
        module = Mod,
        module_vsn = Vsn,
        start_node = Origin,
        reducer_opts = Opts
    } = Flow, Row) ->
    try Mod:reduce(Flow, Row, Opts) of
        {_, _} = Update -> Update;
        ignore -> ignore
    catch
        error:undef ->
            swirl_code_server:get_module(Origin, Mod, Vsn),
            ignore
    end.

%% Turns ETS rows {Key, C1, ..., Cn} into {Key, {C1, ..., Cn}} pairs, in order.
%% When ReducerSkip is true the pairs are returned as-is. Otherwise each pair
%% goes through reduce/2, and pairs reduced to ignore are dropped.
reduce_rows(_Flow, [], _ReducerSkip) ->
    [];
reduce_rows(_Flow, Rows, true) ->
    [row_to_update(Row) || Row <- Rows];
reduce_rows(Flow, Rows, false) ->
    lists:filtermap(fun(Row) ->
        case reduce(Flow, row_to_update(Row)) of
            ignore -> false;
            Reduced -> {true, Reduced}
        end
    end, Rows).

%% Splits a stored row tuple into its key and a tuple of its counters.
row_to_update(Row) ->
    [Key | Counters] = tuple_to_list(Row),
    {Key, list_to_tuple(Counters)}.

%% Spawns an unnamed reducer gen_server for Flow, linked to the caller.
start_link(Flow) ->
    Opts = [],
    gen_server:start_link(?MODULE, Flow, Opts).