src/erlmld_runner.erl

%%% @copyright (C) 2016, AdRoll
%%% @doc
%%%
%%%     Given a path corresponding to an MLD .properties file, run a script with that path
%%%     as its argument using erlexec, assuming it should run forever.  That script should
%%%     launch an instance of the MultiLangDaemon, which interprets the properties file
%%%     and connects back to this node (one TCP connection for each shard owned by that
%%%     MLD instance).
%%%
%%% @end
%%% Created : 18 Nov 2016 by Mike Watters <mike.watters@adroll.com>

-module(erlmld_runner).

-export([start_link/3, run/3, build_properties/1]).

%% Spawn a linked process that executes run/3 with the given arguments.
%%
%% Returns {ok, Pid} immediately after spawning; any failure inside run/3
%% (registering the name, launching the script) happens in the new process and
%% reaches the caller only through the link.
start_link(Regname, Pathname, StreamType) ->
    RunnerPid = spawn_link(?MODULE, run, [Regname, Pathname, StreamType]),
    {ok, RunnerPid}.

%% Body of the runner process.
%%
%% Registers the calling process as Regname, resolves the runner script for
%% StreamType, launches it through exec:run_link/2 with Pathname as its only
%% argument, logs the launch, and then enters loop/4 (which does not return).
%%
%% Exit trapping is enabled before the script is launched so that the exit of the
%% linked exec process is delivered to loop/4 as an 'EXIT' message.
run(Regname, Pathname, StreamType) ->
    register(Regname, self()),
    {Script, WorkDir} = runner_params(StreamType),
    process_flag(trap_exit, true),
    Command = [Script, Pathname],
    %% the script runs from its own directory; stdout/stderr are requested because
    %% loop/4 matches {stdout, ...} / {stderr, ...} messages for the OS pid.
    ExecOpts = [{cd, WorkDir}, {group, 0}, kill_group, stdout, stderr],
    {ok, ExecPid, OsPid} = exec:run_link(Command, ExecOpts),
    error_logger:info_msg("~p launched ~p (pid ~p)~n", [Regname, Script, OsPid]),
    {ok, SpamMP} = spam_mp(),
    loop(Regname, ExecPid, OsPid, SpamMP).

%% Main receive loop of the runner process.
%%
%%   Regname - name this runner is registered under; used as a log prefix.
%%   ErlPid  - Erlang pid returned by exec:run_link/2 for the launched program.
%%   OsPid   - OS pid of the launched program.
%%   SpamMP  - compiled regex (see spam_mp/0) identifying lines not worth logging.
%%
%% Stdout data from the program is discarded; stderr data is passed to
%% handle_stderr/3.  This function never returns normally.  It terminates with:
%%
%%   exit({child_exited, OsPid, Reason})  when ErlPid exits, or
%%   exit(Reason)                         when any other linked process exits
%%                                        abnormally.
%%
%% The caller traps exits, so without the last two 'EXIT' clauses an exit signal
%% from any linked process other than ErlPid (for example a shutdown request from
%% the process that spawn_link'ed this one) would be ignored and left in the
%% mailbox forever.  Those clauses restore ordinary link semantics: abnormal exits
%% propagate, 'normal' exits are dropped.
loop(Regname, ErlPid, OsPid, SpamMP) ->
    receive
        {stdout, OsPid, _Data} ->
            ok;
        {stderr, OsPid, Data} ->
            handle_stderr(Regname, Data, SpamMP);
        {'EXIT', ErlPid, Reason} ->
            exit({child_exited, OsPid, Reason});
        {'EXIT', _OtherPid, normal} ->
            ok;
        {'EXIT', _OtherPid, Reason} ->
            exit(Reason)
    end,
    loop(Regname, ErlPid, OsPid, SpamMP).

%% Log a chunk of stderr output according to the erlmld 'log_kcl_spam' setting:
%%
%%   true             - print the whole chunk, unfiltered, with io:format/2.
%%   false, undefined - drop the chunk.
%%   {Mod, Sink}      - split the chunk on newlines and log every line that
%%                      is_spam/2 does not reject at debug level via Mod:log/5
%%                      (presumably a lager-compatible module; confirm).
%%
%% Any other setting value crashes with case_clause, as before.
handle_stderr(Regname, Data, SpamMP) ->
    case application:get_env(erlmld, log_kcl_spam, undefined) of
        true ->
            io:format("~p: ~s", [Regname, Data]);
        false ->
            ok;
        undefined ->
            ok;
        {LagerMod, LagerSink} ->
            lists:foreach(fun(Line) ->
                             case is_spam(SpamMP, Line) of
                                 true ->
                                     ok;
                                 false ->
                                     LagerMod:log(LagerSink,
                                                  debug,
                                                  LagerMod:md(),
                                                  "~p: ~s",
                                                  [Regname, Line])
                             end
                          end,
                          binary:split(Data, <<"\n">>, [global]))
    end.

%% Return {ScriptPath, ScriptDir} for the runner script of the given stream type.
%% The script is "run_<StreamType>.sh" (StreamType formatted with ~p) inside the
%% erlmld priv directory; ScriptDir is the directory containing it.
runner_params(StreamType) ->
    Script = priv_path(io_lib:format("run_~p.sh", [StreamType])),
    {Script, filename:dirname(Script)}.

%% Path of the properties template to populate: the erlmld application
%% environment value 'properties_file_path' when it is set, otherwise
%% "mld.properties.in" in the erlmld priv directory.
input_properties_file_path() ->
    case application:get_env(erlmld, properties_file_path) of
        undefined ->
            priv_path("mld.properties.in");
        {ok, Configured} ->
            Configured
    end.

%% Join Filename onto the erlmld priv directory and return the result as a flat
%% string.  Filename may be a deep list (e.g. the result of io_lib:format/2).
priv_path(Filename) ->
    PrivDir = code:priv_dir(erlmld),
    Joined = filename:join(PrivDir, Filename),
    lists:flatten(Joined).

%% Return "<TMPDIR>/erlmld/<Filename>", where <TMPDIR> is the value of the TMPDIR
%% environment variable, or "/tmp" when that variable is not set.
tempdir_path(Filename) ->
    BaseDir = os:getenv("TMPDIR", "/tmp"),
    %% deep list understood by filename:join/2: the atom, a separator, the name.
    Relative = [erlmld, $/, Filename],
    filename:join(BaseDir, Relative).

%% Populate the MLD properties template from a map of option values.
%%
%% The template is read from input_properties_file_path/0, its variable references
%% are filled in from Opts by apply_substitutions/2, and the result is written to
%% "$TMPDIR/erlmld/erlmld.X.properties", where X is "default" when the app_suffix
%% option is undefined and the app_suffix atom otherwise.
%%
%% Opts must contain the app_suffix key.  Returns {ok, OutputPath}.  Any failing
%% step (reading the template, unknown template variables, creating the directory,
%% writing the file) crashes with a badmatch.
build_properties(#{app_suffix := AppSuffix} = Opts) ->
    TemplatePath = input_properties_file_path(),
    Suffix =
        case AppSuffix of
            undefined ->
                "default";
            _ ->
                atom_to_list(AppSuffix)
        end,
    OutputPath = tempdir_path(lists:append(["erlmld.", Suffix, ".properties"])),
    {ok, Template} = file:read_file(TemplatePath),
    {ok, Populated} = apply_substitutions(Template, Opts),
    ok = filelib:ensure_dir(OutputPath),
    ok = file:write_file(OutputPath, Populated),
    {ok, OutputPath}.

%% Fill in shell-like variable references in a binary template from a map of options.
%%
%% Each option key (an atom) is upper-cased and wrapped as "${KEY}"; every occurrence
%% of that reference in the template is replaced by the option's value.  Example:
%%
%%   options:
%%
%%     #{xyzzy => <<"asdf">>, foo => undefined}
%%
%%   template contents:
%%
%%     someProperty = ${XYZZY}
%%     otherProperty = ${FOO}
%%
%%   result:
%%
%%     someProperty = asdf
%%     otherProperty =
%%
%% Value handling:
%%   - undefined substitutes an empty value;
%%   - integers, other atoms, lists (iodata) and binaries are converted to binaries;
%%   - tuples and maps are skipped entirely, so references to them stay in the
%%     template and are reported as unknown;
%%   - anything else (e.g. a float) substitutes an empty value.
%%
%% Returns {ok, Binary} when no "${...}" reference remains after substitution, or
%% {error, {unknown_variables, Names}} where Names lists each remaining variable
%% name once.
apply_substitutions(Template, Opts) ->
    Substituted = maps:fold(fun substitute_option/3, Template, Opts),
    case re:run(Substituted, <<"\\${([^}]+)}">>, [global]) of
        nomatch ->
            {ok, Substituted};
        {match, Matches} ->
            %% each match is [WholeMatch, NameGroup]; keep only the distinct names.
            Names = [binary:part(Substituted, NamePos) || [_, NamePos] <- Matches],
            {error, {unknown_variables, sets:to_list(sets:from_list(Names))}}
    end.

%% maps:fold/3 callback: replace every "${KEY}" reference for one option in Acc.
substitute_option(_Key, Value, Acc) when is_tuple(Value); is_map(Value) ->
    Acc;
substitute_option(Key, Value, Acc) ->
    Reference = iolist_to_binary(["${", string:to_upper(atom_to_list(Key)), "}"]),
    binary:replace(Acc, Reference, substitution_value(Value), [global]).

%% Convert an option value to the binary that is inserted into the template.
substitution_value(undefined) ->
    <<>>;
substitution_value(Value) when is_integer(Value) ->
    integer_to_binary(Value);
substitution_value(Value) when is_atom(Value) ->
    atom_to_binary(Value, utf8);
substitution_value(Value) when is_list(Value) ->
    iolist_to_binary(Value);
substitution_value(Value) when is_binary(Value) ->
    Value;
substitution_value(_Unsupported) ->
    <<>>.

%% Compile a single regex that matches any of the known noisy MLD log lines.
%% The individual patterns are combined with "|".  Returns the result of
%% re:compile/1, i.e. {ok, MP} on success.
spam_mp() ->
    Patterns =
        [<<"^INFO: Received response ">>,
         <<"^INFO: Starting: Reading next message from STDIN ">>,
         <<"^INFO: Writing ProcessRecordsMessage to child process ">>,
         <<"^INFO: Message size == (40|63) bytes for shard ">>,
         <<"com.amazonaws.services.kinesis.multilang.MultiLangProtocol "
           "validateStatusMessage$">>,
         <<"com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage$">>,
         <<"com.amazonaws.services.kinesis.multilang.MessageWriter call$">>,
         <<"com.amazonaws.services.kinesis.multilang.LineReaderTask call$">>],
    Alternation = lists:join(<<"|">>, Patterns),
    re:compile(Alternation).

%% Decide whether a log line should be suppressed.  Empty lines always count as
%% spam; any other line is spam when it matches the compiled regex MP.
is_spam(_MP, <<>>) ->
    true;
is_spam(MP, Line) ->
    case re:run(Line, MP) of
        nomatch ->
            false;
        _Matched ->
            true
    end.