src/swirl_stream.erl

-module(swirl_stream).
-include("swirl.hrl").
-compile([native]).

%% public
-export([
    emit/2
]).

%% internal
-export([
    lookup/1,
    register/2,
    unregister/1
]).

%% public
-spec emit(stream_name(), event()) -> ok.
emit(StreamName, Event) ->
    evaluate(StreamName, Event, lookup(StreamName)).

%% internal
-spec lookup(stream_name()) -> [tuple()].
lookup(StreamName) ->
    LookupSpec = match_lookup_spec(StreamName),
    ets:select(?TABLE_NAME_STREAMS, LookupSpec).

-spec register(flow(), ets:tab()) -> true.
register(#flow {
        id = FlowId,
        module = FlowMod,
        module_vsn = FlowModVsn,
        start_node = StartNode,
        stream_filter = StreamFilter,
        stream_names = StreamNames,
        mapper_opts = MapperOpts
    } = Flow, TableId) ->

    ok =:= lists:foreach(fun (StreamName) ->
        Key = key(Flow, StreamName),
        Stream = #stream {
            flow_id = FlowId,
            flow_mod = FlowMod,
            flow_mod_vsn = FlowModVsn,
            start_node = StartNode,
            exp_tree = expession_tree(StreamFilter),
            mapper_opts = MapperOpts,
            table_id = TableId
        },
        KeyValue = {Key, Stream},
        ets:insert(?TABLE_NAME_STREAMS, KeyValue)
    end, StreamNames).

-spec unregister(flow()) -> true.
unregister(#flow {stream_names = StreamNames} = Flow) ->
    ok =:= lists:foreach(fun (StreamName) ->
        DeleteSpec = match_delete_spec(Flow, StreamName),
        ets:match_delete(?TABLE_NAME_STREAMS, DeleteSpec)
    end, StreamNames).

%% private
evaluate(_StreamName, _Event, []) ->
    ok;
evaluate(StreamName, Event, [#stream {
        exp_tree = undefined
    } = Stream | T]) ->

    swirl_mapper:map(StreamName, Event, Stream),
    evaluate(StreamName, Event, T);
evaluate(StreamName, Event, [#stream {
        exp_tree = ExpTree
    } = Stream | T]) ->

    case swirl_ql:evaluate(ExpTree, Event) of
        true ->
            swirl_mapper:map(StreamName, Event, Stream);
        false -> ok
    end,
    evaluate(StreamName, Event, T).

expession_tree(undefined) ->
    undefined;
expession_tree(StreamFilter) ->
    {ok, ExpTree} = swirl_ql:parse(StreamFilter),
    ExpTree.

key(#flow {id = FlowId}, StreamName) ->
    {FlowId, StreamName}.

match_lookup_spec(StreamName) ->
    [{{{'$1', '$2'}, '$3'}, [{'orelse', {'=:=', '$2', StreamName},
        {'=:=', '$2', undefined}}], ['$3']}].

match_delete_spec(#flow {id = FlowId}, StreamName) ->
    {{FlowId, StreamName}, '_'}.