include/swirl.hrl

%% macros
-define(DEFAULT_HEARTBEAT, timer:seconds(5)).
-define(DEFAULT_MAPPERS_MAX, 100).
-define(DEFAULT_MAPPER_WINDOW, timer:seconds(1)).
-define(DEFAULT_REDUCERS_MAX, 100).
-define(DEFAULT_REDUCER_SKIP, false).
-define(DEFAULT_REDUCER_WINDOW, timer:seconds(1)).
-define(DEFAULT_WINDOW_SYNC, false).

-define(TABLE_NAME_CODE_SERVER, swirl_code_server).
-define(TABLE_NAME_FLOWS, swirl_flows).
-define(TABLE_NAME_MAPPERS, swirl_mappers).
-define(TABLE_NAME_REDUCERS, swirl_reducers).
-define(TABLE_NAME_STREAMS, swirl_streams).

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(L(Key, List), swirl_utils:lookup(Key, List)).
-define(L(Key, List, Default), swirl_utils:lookup(Key, List, Default)).
-define(NULL, undefined).

%% records
-record(flow, {
    id             :: binary(),
    module         :: module(),
    module_vsn     :: undefined | module_vsn(),
    stream_filter  :: undefined | string(),
    stream_names   :: undefined | stream_names(),
    mapper_window  :: undefined | pos_integer(),
    mapper_nodes   :: undefined | [node()],
    mapper_opts    :: mapper_opts(),
    reducer_window :: undefined | pos_integer(),
    reducer_node   :: node(),
    reducer_opts   :: reducer_opts(),
    reducer_skip   :: undefined | boolean(),
    output_opts    :: output_opts(),
    heartbeat      :: undefined | pos_integer(),
    window_sync    :: undefined | boolean(),
    started_at     :: undefined | erlang:timestamp(),
    start_node     :: node()
}).

-record(stream, {
    flow_id      :: binary(),
    flow_mod     :: module(),
    flow_mod_vsn :: module_vsn(),
    start_node   :: node(),
    exp_tree     :: undefined | exp_tree(),
    mapper_opts  :: mapper_opts(),
    table_id     :: ets:tab()
}).

-record(period, {
    start_at :: pos_integer(),
    end_at   :: pos_integer()
}).

%% types
-type flow()         :: #flow {}.
-type flow_opts()    :: {heartbeat, pos_integer()} |
                        {mapper_opts, mapper_opts()} |
                        {mapper_window, pos_integer()} |
                        {output_opts, output_opts()} |
                        {reducer_opts, reducer_opts()} |
                        {reducer_skip, boolean()} |
                        {reducer_window, pos_integer()} |
                        {stream_filter, string()} |
                        {stream_names, stream_names()} |
                        {window_sync, boolean()}.
-type mapper_opts()  :: term().
-type module_vsn()   :: pos_integer().
-type output_opts()  :: term().
-type period()       :: #period {}.
-type reducer_opts() :: term().
-type row()          :: tuple().
-type stream()       :: #stream {}.
-type stream_name()  :: atom().
-type stream_names() :: [stream_name()].
-type update()       :: {tuple(), tuple()}.

-type event() :: [{atom(), value()}].

-type boolean_op()    :: 'and' | 'or'.
-type comparison_op() :: '<' | '<=' | '=' | '>=' | '>' | '<>'.
-type inclusion_op()  :: in | notin.
-type null_op()       :: null | notnull.
-type variable()      :: atom().
-type value()         :: integer() | float() | binary().

-type exp_tree() :: {boolean_op(), exp_tree(), exp_tree()} |
                    {comparison_op(), variable(), value()} |
                    {inclusion_op(), variable(), [value(), ...]} |
                    {null_op(), variable()}.