-module(swirl_flow).
-include("swirl.hrl").
%% public
-export([
%% validates the options, builds the flow, then starts its reducer and mappers
start/4,
%% stops the flow's mappers, then its reducer
stop/1
]).
%% internal
-export([
%% looks a flow up in the flows table by id (binary) or by flow record
lookup/1,
%% stores a flow in the flows table under its id
register/1,
%% removes a flow from the flows table by its id
unregister/1
]).
%% callback
%% Callbacks a module implementing the swirl_flow behaviour must export.
%% The argument and result types (stream_name(), event(), update(), row(), ...)
%% are not defined in this file; presumably they come from swirl.hrl.

%% map/3 takes a stream name, an event and the mapper options. It returns a
%% list of updates, a single update, or `ignore'.
%% NOTE(review): presumably invoked once per event on the mapper side, with
%% `ignore' meaning the event is dropped -- confirm against the mapper module.
-callback map(stream_name(), event(), mapper_opts()) ->
list(update()) | update() | ignore.
%% reduce/3 takes the flow, a row and the reducer options. It returns a single
%% update or `ignore'.
-callback reduce(flow(), row(), reducer_opts()) ->
update() | ignore.
%% output/4 takes the flow, a period, a list of rows and the output options.
%% It must return `ok'.
-callback output(flow(), period(), list(row()), output_opts()) ->
ok.
%% public
%% Builds a flow for FlowMod from FlowOpts and starts it: the reducer is
%% started first, then the mappers. Returns {ok, Flow} on success, or the
%% {error, Reason} produced while building the flow (unknown flow module or
%% rejected options). A tracker call that does not return `ok' crashes the
%% caller with a badmatch.
-spec start(atom(), [flow_opts()], [node()], node()) ->
    {ok, flow()} | {error, flow_mod_undef | {bad_flow_opts, list()}}.

start(FlowMod, FlowOpts, MapperNodes, ReducerNode) ->
    case flow(FlowMod, FlowOpts, MapperNodes, ReducerNode) of
        {error, _Reason} = Error ->
            %% nothing has been started yet, so just report the failure
            Error;
        {ok, Flow} = Started ->
            ok = swirl_tracker:start_reducer(Flow),
            ok = swirl_tracker:start_mappers(Flow),
            Started
    end.
%% Stops a running flow: mappers first, then the reducer. Always returns
%% `ok'; a tracker call that does not return `ok' crashes with a badmatch.
-spec stop(flow()) ->
    ok.

stop(#flow {} = Flow) ->
    ok = swirl_tracker:stop_mappers(Flow),
    %% the successful match evaluates to `ok', which is the return value
    ok = swirl_tracker:stop_reducer(Flow).
%% internal
%% Looks a flow up in the flows table, either by its id (a binary) or by a
%% flow record, in which case the record's id is used as the key.
-spec lookup(binary() | flow()) ->
    undefined | flow().

lookup(#flow {} = Flow) ->
    swirl_tracker:lookup(?TABLE_NAME_FLOWS, key(Flow));
lookup(FlowId) when is_binary(FlowId) ->
    %% the id is the table key, so it can be passed to the tracker directly
    swirl_tracker:lookup(?TABLE_NAME_FLOWS, FlowId).
%% Stores the flow record in the flows table, keyed by the flow's id.
-spec register(flow()) ->
    true.

register(#flow {} = Flow) ->
    FlowKey = key(Flow),
    swirl_tracker:register(?TABLE_NAME_FLOWS, FlowKey, Flow).
%% Removes the flow's entry (keyed by the flow's id) from the flows table.
-spec unregister(flow()) ->
    true.

unregister(#flow {} = Flow) ->
    FlowKey = key(Flow),
    swirl_tracker:unregister(?TABLE_NAME_FLOWS, FlowKey).
%% private
%% Resolves the version of the flow module and, when the module is known,
%% validates the options and builds the flow record.
%% Returns {ok, Flow}, {error, flow_mod_undef} when the code server reports
%% the module as undefined, or the error produced by option validation.
flow(Module, Options, MapperNodes, ReducerNode) ->
    case swirl_code_server:version(Module) of
        {error, undef} ->
            {error, flow_mod_undef};
        {ok, ModuleVsn} ->
            validated_flow(Module, ModuleVsn, Options, MapperNodes,
                ReducerNode)
    end.

%% Builds the flow record only if every option passes verification;
%% otherwise hands the verification error back unchanged.
validated_flow(Module, ModuleVsn, Options, MapperNodes, ReducerNode) ->
    case verify_options(Options) of
        {error, _Reason} = Error ->
            Error;
        ok ->
            {ok, new_flow_rec(Module, ModuleVsn, Options, MapperNodes,
                ReducerNode)}
    end.
%% Table key of a flow: its id field.
key(#flow {} = Flow) ->
    Flow#flow.id.
%% Builds a new #flow{} record with a freshly generated id, the local node as
%% start node and the current os:timestamp() as start time. Configurable
%% fields are read from Options through the ?L macro (defined outside this
%% file), falling back to the given default where one is supplied.
new_flow_rec(Module, ModuleVsn, Options, MapperNodes, ReducerNode) ->
    #flow {
        %% identity
        id             = swirl_utils:uuid(),
        module         = Module,
        module_vsn     = ModuleVsn,
        start_node     = node(),

        %% timing
        heartbeat      = ?L(heartbeat, Options, ?DEFAULT_HEARTBEAT),
        window_sync    = ?L(window_sync, Options, ?DEFAULT_WINDOW_SYNC),

        %% mapper side
        mapper_nodes   = MapperNodes,
        mapper_opts    = ?L(mapper_opts, Options, []),
        mapper_window  = ?L(mapper_window, Options, ?DEFAULT_MAPPER_WINDOW),

        %% reducer side
        reducer_node   = ReducerNode,
        reducer_opts   = ?L(reducer_opts, Options, []),
        reducer_skip   = ?L(reducer_skip, Options, ?DEFAULT_REDUCER_SKIP),
        reducer_window = ?L(reducer_window, Options, ?DEFAULT_REDUCER_WINDOW),

        %% output and stream selection
        output_opts    = ?L(output_opts, Options, []),
        %% no explicit default here; whatever ?L/2 yields for a missing key
        stream_filter  = ?L(stream_filter, Options),
        stream_names   = ?L(stream_names, Options, []),

        started_at     = os:timestamp()
    }.
%% Validates a list of flow options.
%% Returns `ok' when every option is recognised and well-typed, otherwise
%% {error, {bad_flow_opts, Rejected}} where Rejected lists the offending
%% options, most recently seen first (i.e. in reverse input order).
verify_options(FlowOpts) ->
    verify_options(FlowOpts, []).

%% Walks the option list, collecting rejected options in the accumulator.
verify_options([], []) ->
    ok;
verify_options([], Rejected) ->
    {error, {bad_flow_opts, Rejected}};
verify_options([Option | Rest], Rejected) ->
    case is_valid_option(Option) of
        true ->
            verify_options(Rest, Rejected);
        false ->
            verify_options(Rest, [Option | Rejected])
    end.

%% True when a single option has a known key and an acceptable value.
%% The *_opts options are opaque and accepted as-is. A stream filter is
%% accepted when it is `undefined' or when swirl_ql:parse/1 can parse it.
%% Anything else (unknown key, wrong type, non-tuple) is rejected.
is_valid_option({heartbeat, Value}) ->
    is_integer(Value);
is_valid_option({mapper_opts, _}) ->
    true;
is_valid_option({mapper_window, Value}) ->
    is_integer(Value);
is_valid_option({output_opts, _}) ->
    true;
is_valid_option({reducer_opts, _}) ->
    true;
is_valid_option({reducer_skip, Value}) ->
    is_boolean(Value);
is_valid_option({reducer_window, Value}) ->
    is_integer(Value);
is_valid_option({stream_filter, undefined}) ->
    true;
is_valid_option({stream_filter, Filter}) ->
    case swirl_ql:parse(Filter) of
        {ok, _ExpTree} -> true;
        {error, _Reason} -> false
    end;
is_valid_option({stream_names, Value}) ->
    is_list(Value);
is_valid_option({window_sync, Value}) ->
    is_boolean(Value);
is_valid_option(_Other) ->
    false.