src/pmap.erl

%%%------------------------------------------------------------------------
%%% File: $Id$
%%%------------------------------------------------------------------------
%%% @doc Parallel map and multicall.
%%%
%%% @author  Serge Aleynikov <saleyn@gmail.com> (multicall)
%%%          Luke Gorries (http://lukego.livejournal.com) (pmap)
%%% @version $Rev$
%%%          $Date: 2008/07/02 03:29:58 $
%%% @end
%%%
%%%------------------------------------------------------------------------
%%% Created: 2008/02/11 10:07:15
%%%------------------------------------------------------------------------
-module(pmap).
-author('saleyn@gmail.com').
-id    ("$Id$").

-export([pmap/2, pmap/3, multicall/2, reply/2]).

-ifdef(DEBUG).
-export([test/0, test1/0, test2/0, test3/0]).
-endif.

%%-------------------------------------------------------------------------
%% @doc Evaluate `F' on every element of `List' concurrently and return the
%%      results in the same order as `List'.  Equivalent to
%%      `pmap(F, List, infinity)'.
%% @see pmap/3
%% @end
%%-------------------------------------------------------------------------
-spec pmap(fun((term()) -> term()), [Args::term()]) -> [Reply::term()].
pmap(F, List) ->
    pmap(F, List, infinity).

%%-------------------------------------------------------------------------
%% @doc Evaluate `F' on every element of `List' concurrently and return the
%%      results in input order.  Same as pmap/2 but has a `Timeout' option.
%%
%%      One monitored worker process is spawned per element.  The result
%%      for an element `E' is:
%%      <ul>
%%      <li>`F(E)' if the worker completes;</li>
%%      <li>`{error, Reason}' if the worker exits with `Reason' before
%%          producing a result (`Reason' is `normal' when e.g. `F' calls
%%          `exit(normal)');</li>
%%      <li>`{error, timeout}' if the worker has not finished within
%%          `Timeout' milliseconds; the worker is then killed.</li>
%%      </ul>
%%      Results are awaited one after another in list order and `Timeout'
%%      bounds each of those waits separately, so the call as a whole may
%%      take longer than `Timeout'.
%% @end
%%-------------------------------------------------------------------------
-spec pmap(fun((term()) -> term()), Args::[term()], timeout()) -> [Reply::term()].
pmap(F, List, Timeout) ->
    Ref = make_ref(),
    Workers = [spawn_worker(self(), Ref, F, E) || E <- List],
    [wait_result(Ref, Worker, Timeout) || Worker <- Workers].

%% Spawn a monitored worker that evaluates `F(E)' and sends the result to
%% `Parent' as `{{WorkerPid, Ref}, Result}'.  Returns `{WorkerPid, MonRef}'.
spawn_worker(Parent, Ref, F, E) ->
    erlang:spawn_monitor(fun() -> reply({Parent, Ref}, F(E)) end).

%% Wait at most `Timeout' for worker `{Pid, MonRef}' to terminate and return
%% its result as described in pmap/3.  In every outcome the worker's 'DOWN'
%% and reply messages are removed from the caller's mailbox.
wait_result(Ref, {Pid, MonRef}, Timeout) ->
    receive
    {'DOWN', MonRef, _, _, normal} ->
        % The worker sends its reply before terminating and signals between
        % a pair of processes are delivered in order, so a reply, if there
        % is one, is already in the mailbox: do not wait for it.
        receive
        {{Pid, Ref}, Result} ->
            Result
        after 0 ->
            {error, normal}
        end;
    {'DOWN', MonRef, _, _, Reason} ->
        % A worker hit by an exit signal right after replying would
        % otherwise leave its reply behind in the caller's mailbox.
        flush_reply(Pid, Ref),
        {error, Reason}
    after Timeout ->
        kill_worker(Pid, MonRef, Ref),
        {error, timeout}
    end.

%% Kill a worker that missed its deadline and drop anything it sent us.
kill_worker(Pid, MonRef, Ref) ->
    exit(Pid, kill),
    % `kill' cannot be trapped, so this 'DOWN' always arrives, and it is
    % ordered after any reply the worker managed to send before dying.
    receive {'DOWN', MonRef, _, _, _} -> ok end,
    flush_reply(Pid, Ref).

%% Discard the reply of worker `Pid' for request `Ref', if present.
flush_reply(Pid, Ref) ->
    receive {{Pid, Ref}, _} -> ok after 0 -> ok end.

%%-------------------------------------------------------------------------
%% @doc Send messages to pids and wait for replies.
%%      Each Pid would get a message in the form:
%%      `{{FromPid, Ref}, Msg}' and would have to reply with:
%%      `FromPid ! {{self(), Ref}, Reply}' (or `pmap:reply({FromPid, Ref}, Reply)').
%%      The function returns `{Success, Error}':
%%      <ul>
%%      <li>`Success' holds `{Pid, Reply}' for every target that answered,
%%          in arrival order;</li>
%%      <li>`Error' holds `{Pid, {error, Reason}}' where `Reason' is
%%          `no_process' (a local target that was not alive),
%%          `{process_disconnected, ExitReason}' (the target terminated, or
%%          its node went down, before replying) or `timeout'.</li>
%%      </ul>
%%      `Timeout' is an inactivity limit: collection stops once no reply or
%%      'DOWN' notification has arrived for `Timeout' milliseconds, and every
%%      target that has not answered by then is reported as `timeout'.
%% @end
%%-------------------------------------------------------------------------
-spec multicall([{pid(), term()}], Timeout::timeout()) -> {[OkReply::term()], [ErrorReply::term()]}.
multicall([], _Timeout) ->
    {[], []};
multicall(PidMsgs, Timeout) when is_list(PidMsgs) ->
    Ref = make_ref(),
    % The exchange runs in a throw-away process, so its monitors and any
    % late replies vanish with it instead of polluting the caller.
    Gather = fun(_) ->
        {Pending, Errors} = lists:foldl(
            fun(PidMsg, Acc) -> send_request(Ref, PidMsg, Acc) end,
            {gb_sets:empty(), []},
            PidMsgs),
        gather_results(Ref, Pending, [], Errors, Timeout)
    end,
    % gather_results/5 enforces Timeout itself; pmap/3 must not kill the
    % helper process while it is still collecting, hence `infinity'.
    case pmap(Gather, [[]], infinity) of
    [{Ok, Err}] when is_list(Ok), is_list(Err) ->
        {Ok, Err};
    [Other] ->
        erlang:error({?MODULE, unexpected_result, Other})
    end.

%% Issue one request of multicall/2: monitor `Pid', send it
%% `{{self(), Ref}, Msg}' and add `{Pid, Ref}' to the pending set.  A local
%% process that is already dead is recorded as an error instead.
send_request(Ref, {Pid, Msg}, {Pending, Errors}) ->
    case node(Pid) =/= node() orelse erlang:is_process_alive(Pid) of
    true ->
        % Monitor before sending, so the target cannot answer and exit
        % before the monitor exists (which would report `noproc').
        erlang:monitor(process, Pid),
        reply({Pid, Ref}, Msg),
        {gb_sets:add({Pid, Ref}, Pending), Errors};
    false ->
        {Pending, [{Pid, {error, no_process}} | Errors]}
    end.

%% Collect replies for the `{Pid, Ref}' keys in the gb_set `Pending'.
%% Returns `{Replies, Errors}' with `Replies' in arrival order.
gather_results(Ref, Pending, Replies, Errors, Timeout) ->
    case gb_sets:is_empty(Pending) of
    true  -> {lists:reverse(Replies), Errors};
    false -> await_reply(Ref, Pending, Replies, Errors, Timeout)
    end.

%% Wait for the next reply or 'DOWN' notification and settle the matching
%% pending request.  Messages about requests that are no longer pending
%% (duplicate replies, the 'DOWN' of a target that already answered) are
%% dropped, so no target is ever reported twice.
await_reply(Ref, Pending, Replies, Errors, Timeout) ->
    receive
    {{Pid, Ref}, Result} ->
        case gb_sets:is_element({Pid, Ref}, Pending) of
        true ->
            gather_results(Ref, gb_sets:delete({Pid, Ref}, Pending),
                           [{Pid, Result} | Replies], Errors, Timeout);
        false ->
            gather_results(Ref, Pending, Replies, Errors, Timeout)
        end;
    {'DOWN', _MonRef, process, Pid, Reason} ->
        % A reply sent by the target itself is delivered before the target's
        % 'DOWN', so a target still pending here terminated without answering.
        case gb_sets:is_element({Pid, Ref}, Pending) of
        true ->
            Error = {Pid, {error, {process_disconnected, Reason}}},
            gather_results(Ref, gb_sets:delete({Pid, Ref}, Pending),
                           Replies, [Error | Errors], Timeout);
        false ->
            gather_results(Ref, Pending, Replies, Errors, Timeout)
        end
    after Timeout ->
        TimedOut = gb_sets:fold(
            fun({P, _}, Acc) -> [{P, {error, timeout}} | Acc] end, [], Pending),
        {lists:reverse(Replies), TimedOut ++ Errors}
    end.

%%-------------------------------------------------------------------------
%% @doc Send a reply back to sender.
%%      `{FromPid, Ref}' is the tag received with the request.  The message
%%      sent is `{{self(), Ref}, Reply}', which is the shape pmap/3 and
%%      multicall/2 wait for.  Always returns `ok', even when the
%%      destination is invalid (the reply is best-effort).
%% @end
%%-------------------------------------------------------------------------
-spec reply({pid(), reference()}, Reply::term()) -> ok.
reply({FromPid, Ref}, Msg) ->
    try
        FromPid ! {{self(), Ref}, Msg}
    catch
        % Sending only fails, with badarg, for an invalid destination
        % (e.g. an unregistered name); the reply is best-effort.
        error:badarg -> ok
    end,
    ok.

%%%------------------------------------------------------------------------
%%% TEST CASES
%%%------------------------------------------------------------------------

-ifdef(DEBUG).
%% multicall/2 round trip: 20 targets echo their message back through
%% pmap:reply/2; all of them must answer and the echoed values must be 1..20.
test() ->
    Expected = lists:seq(1, 20),
    Echo = fun() ->
               receive
               {{_Pid, _Ref} = From, Msg} -> pmap:reply(From, Msg)
               end
           end,
    Targets = [{spawn(Echo), N} || N <- Expected],
    {Replies, []} = pmap:multicall(Targets, 10000),
    Echoed = [N || {_Pid, N} <- Replies],
    Expected = lists:sort(Echoed).
    
%% multicall/2 with answering and crashing targets: a target whose payload
%% is `{_, 3}' exits with reason `killed' after 1 s without replying; every
%% other target replies with its index.  Checks that the result partitions
%% the targets accordingly and returns the multicall result.
test1() ->
    F = fun() ->
            receive
            {_From, {_I, 3}} ->
                timer:sleep(1000),
                exit(killed);
            {{From, Ref}, {I, _X}} ->
                From ! {{self(), Ref}, I},
                timer:sleep(2000)
            end
        end,
    % Deterministic payloads (cycling 1..3) keep the test reproducible and
    % avoid the deprecated erlang:now/0 and random module.
    Msgs = [{spawn(F), {I, I rem 3 + 1}} || I <- lists:seq(1, 20)],
    {Oks, Errors} = Result = pmap:multicall(Msgs, 10000),
    ExpectedOks = lists:sort([{P, I} || {P, {I, X}} <- Msgs, X =/= 3]),
    ExpectedErrors = lists:sort([{P, {error, {process_disconnected, killed}}}
                                 || {P, {_, 3}} <- Msgs]),
    ExpectedOks = lists:sort(Oks),
    ExpectedErrors = lists:sort(Errors),
    Result.

%% pmap/2 preserves input order: the squares of 1..5.
test2() ->
    Input = lists:seq(1, 5),
    Squares = pmap:pmap(fun(N) -> N * N end, Input),
    [1,4,9,16,25] = Squares.

%% multicall/2 timeout path: every target answers only after 2 s while the
%% call allows 1 s, so no reply may be collected and all five targets must
%% be reported as `timeout'.  Returns the multicall result.
test3() ->
    F = fun() ->
            receive
            {{From, Ref}, Msg} ->
                timer:sleep(2000),
                From ! {{self(), Ref}, Msg},
                timer:sleep(1000)
            end
        end,
    Msgs = [{spawn(F), I} || I <- lists:seq(1, 5)],
    {[], Errors} = Result = pmap:multicall(Msgs, 1000),
    Expected = lists:sort([{P, {error, timeout}} || {P, _} <- Msgs]),
    Expected = lists:sort(Errors),
    Result.
    
-endif.