src/eharbor_coordinator.erl

-module(eharbor_coordinator).

-behavior(gen_server).

-export([start_link/1, pause/1, pause/2, get_stats/1]).
-export([init/1, handle_info/2, handle_cast/2, handle_call/3]).

-define(PAUSE_DEFAULT_MS, 100).

-record(state,
        {name,
         dedup,
         ordered,
         backlog,
         %% How many unique tasks
         piers,
         %% Breakwater_limit total number of processes waiting out of buffer
         breakwater_limit,
         breakwater_usage = 0,
         piers_in_use = 0,
         active_work_registry = #{},
         conductor_registry = #{},
         conductor_monitors = #{},
         pobox_pid = nil,
         notify_ready = true,
         total_messages = 0,
         dropped_messages = 0}).

%% API functions
-spec start_link(eharbor:incomplete_config()) -> {ok, pid()}.
start_link(IncompleteConfig) ->
    Config = #{name := Name} = eharbor:merge_with_defaults(IncompleteConfig),

    gen_server:start_link({local, eharbor:coordinator_name(Name)}, ?MODULE, Config, []).

%% For testing only simulate a busy coordinator
pause(Name) ->
    pause(Name, ?PAUSE_DEFAULT_MS).

pause(Name, Sleep) ->
    gen_server:cast(Name, {pause, Sleep}).

%% Get messages vs dropped message count since the start.
get_stats(Name) ->
    gen_server:call(Name, get_stats).

%% gen_server callbacks
init(_Config =
         #{name := Name,
           dedup := Dedup,
           ordered := Ordered,
           piers := Piers,
           breakwater_limit := BreakwaterLimit,
           backlog := Backlog}) ->
    %% Start the pobox load shedding buffer
    {ok, PoboxPid} =
        pobox:start_link({local, eharbor:pobox_name(Name)}, erlang:self(), Backlog, keep_old),
    %% Enter notify mode (tells pobox we are ready to handle new messages)
    ok = pobox:notify(PoboxPid),
    {ok,
     #state{name = Name,
            dedup = Dedup,
            ordered = Ordered,
            piers = Piers,
            breakwater_limit = BreakwaterLimit,
            backlog = Backlog,
            pobox_pid = PoboxPid}}.

handle_call(get_stats,
            _From,
            State = #state{total_messages = TotalMessages, dropped_messages = DroppedMessages}) ->
    {reply,
     #{total_messages => TotalMessages, dropped_messages => DroppedMessages},
     handle_notify(State)};
handle_call(_Msg, _From, State) ->
    {reply, ok, handle_notify(State)}.

handle_cast({pause, Sleep}, State) ->
    timer:sleep(Sleep),
    {noreply, handle_notify(State)};
handle_cast(_Msg, State) ->
    {noreply, handle_notify(State)}.

handle_info({mail, PoboxPid, new_data},
            State =
                #state{pobox_pid = PoboxPid,
                       piers = Piers,
                       piers_in_use = PiersInUse,
                       breakwater_usage = BreakwaterUsage,
                       breakwater_limit = BreakwaterLimit}) ->
    %% New data is available in our buffer determine the maximum we can accept without
    %% going over our limits and switch the pobox to active mode to get a batch.
    AvailableSlots = erlang:min(Piers - PiersInUse, BreakwaterLimit - BreakwaterUsage),
    pobox:active(PoboxPid,
                 fun (_Msg, Count) when Count =:= AvailableSlots ->
                         skip;
                     (Msg, Count) ->
                         {{ok, Msg}, Count + 1}
                 end,
                 0),
    {noreply, State};
handle_info({mail, PoboxPid, Messages, Count, DropCount},
            State =
                #state{pobox_pid = PoboxPid,
                       piers_in_use = PiersInUse,
                       breakwater_usage = BreakwaterUsage,
                       active_work_registry = ActiveWorkRegistry0,
                       conductor_registry = ConductorRegistry0,
                       conductor_monitors = ConductorMonitors0,
                       dedup = Dedup,
                       total_messages = TotalMessages,
                       dropped_messages = DroppedMessages}) ->
    ArgumentGroups =
        case Dedup of
            true ->
                group_by_key([{Args, From} || {'$habor_dock', From, Args} <- Messages]);
            false ->
                maps:from_list([{Args, [From]} || {'$habor_dock', From, Args} <- Messages])
        end,

    ArgumentGroupsKeys = maps:keys(ArgumentGroups),

    {OldArgsList, NewArgsList} =
        lists:partition(fun(ArgumentGroup) -> maps:is_key(ArgumentGroup, ActiveWorkRegistry0) end,
                        ArgumentGroupsKeys),

    [begin
         {ConductorPid, _MRef} =
             erlang:hd(
                 maps:get(OldArgs, ActiveWorkRegistry0)),
         FollowerFroms = maps:get(OldArgs, ArgumentGroups),
         lists:foreach(fun(FollowerFrom) -> gen:reply(FollowerFrom, {follower, ConductorPid}) end,
                       FollowerFroms)
     end
     || OldArgs <- OldArgsList],

    NewConductorMrefs =
        lists:foldl(fun(NewArgs, Acc) ->
                       ConductorFrom = hd(maps:get(NewArgs, ArgumentGroups)),
                       {ConductorPid, _MRef} = ConductorFrom,
                       ConductorMRef = erlang:monitor(process, ConductorPid),
                       Acc#{ConductorPid => {ConductorMRef, ConductorFrom}}
                    end,
                    #{},
                    NewArgsList),

    NewConductors =
        maps:from_list(
            lists:map(fun(NewArgs) ->
                         ConductorFrom = hd(maps:get(NewArgs, ArgumentGroups)),
                         {ConductorPid, _Mref} = ConductorFrom,
                         FollowerFroms = tl(maps:get(NewArgs, ArgumentGroups)),
                         gen:reply(ConductorFrom, conductor),
                         lists:foreach(fun(FollowerFrom) ->
                                          gen:reply(FollowerFrom, {follower, ConductorPid})
                                       end,
                                       FollowerFroms),
                         {ConductorFrom, NewArgs}
                      end,
                      NewArgsList)),
    ConductorRegistry1 = maps:merge(ConductorRegistry0, NewConductors),
    ActiveWorkRegistry1 =
        maps:merge_with(fun(_K, V1, V2) -> V1 ++ V2 end, ActiveWorkRegistry0, ArgumentGroups),

    ConductorMonitors1 = maps:merge(ConductorMonitors0, NewConductorMrefs),

    {noreply,
     handle_notify(State#state{active_work_registry = ActiveWorkRegistry1,
                               conductor_registry = ConductorRegistry1,
                               conductor_monitors = ConductorMonitors1,
                               piers_in_use = PiersInUse + erlang:length(NewArgsList),
                               breakwater_usage = BreakwaterUsage + Count,
                               notify_ready = true,
                               total_messages = TotalMessages + Count,
                               dropped_messages = DroppedMessages + DropCount})};
handle_info({'$harbor_work_completed', ConductorFrom},
            State =
                #state{active_work_registry = ActiveWorkRegistry0,
                       conductor_registry = ConductorRegistry0,
                       conductor_monitors = ConductorMonitors0,
                       breakwater_usage = BreakwaterUsage,
                       piers_in_use = PiersInUse}) ->
    ConductorArgs = maps:get(ConductorFrom, ConductorRegistry0),
    {[ConductorFrom | Followers] = BreakwaterProcs, ActiveWorkRegistry1} =
        maps:take(ConductorArgs, ActiveWorkRegistry0),
    gen:reply(ConductorFrom, {followers, Followers}),
    {ConductorArgs, ConductorRegistry1} = maps:take(ConductorFrom, ConductorRegistry0),
    {ConductorPid, _ConductorMRef} = ConductorFrom,
    {{ConductorMRef, _ConductorFrom}, ConductorMonitors1} =
        maps:take(ConductorPid, ConductorMonitors0),
    erlang:demonitor(ConductorMRef, [flush]),
    {noreply,
     handle_notify(State#state{active_work_registry = ActiveWorkRegistry1,
                               conductor_registry = ConductorRegistry1,
                               conductor_monitors = ConductorMonitors1,
                               breakwater_usage = BreakwaterUsage - erlang:length(BreakwaterProcs),
                               piers_in_use = PiersInUse - 1})};
handle_info({'DOWN', _ConductorMRef, process, ConductorPid, _Reason},
            State =
                #state{active_work_registry = ActiveWorkRegistry0,
                       conductor_registry = ConductorRegistry0,
                       conductor_monitors = ConductorMonitors0,
                       breakwater_usage = BreakwaterUsage,
                       piers_in_use = PiersInUse}) ->
    {{ConductorMRef, ConductorFrom}, ConductorMonitors1} =
        maps:take(ConductorPid, ConductorMonitors0),
    {Args, ConductorRegistry1} = maps:take(ConductorFrom, ConductorRegistry0),
    {Froms, ActiveWorkRegistry1} = maps:take(Args, ActiveWorkRegistry0),
    erlang:demonitor(ConductorMRef, [flush]),
    {noreply,
     handle_notify(State#state{active_work_registry = ActiveWorkRegistry1,
                               conductor_registry = ConductorRegistry1,
                               conductor_monitors = ConductorMonitors1,
                               breakwater_usage = BreakwaterUsage - erlang:length(Froms),
                               piers_in_use = PiersInUse - 1})};
handle_info(ping, State) ->
    {noreply, handle_notify(State)};
handle_info(_Msg, State) ->
    {noreply, handle_notify(State)}.

handle_notify(State =
                  #state{pobox_pid = PoboxPid,
                         breakwater_usage = BreakwaterUsage,
                         breakwater_limit = BreakwaterLimit,
                         piers_in_use = PiersInUse,
                         piers = Piers,
                         notify_ready = true}) ->
    {message_queue_len, QueueLen} =
        erlang:process_info(
            erlang:self(), message_queue_len),
    case QueueLen =:= 0 andalso BreakwaterUsage < BreakwaterLimit andalso PiersInUse < Piers
    of
        true ->
            pobox:notify(PoboxPid),
            State#state{notify_ready = false};
        false ->
            State
    end;
handle_notify(State) ->
    State.

group_by_key(KVs) ->
    lists:foldl(fun({K, V}, Acc) ->
                   case maps:is_key(K, Acc) of
                       true ->
                           Value = maps:get(K, Acc),
                           maps:put(K, [V | Value], Acc);
                       false ->
                           maps:put(K, [V], Acc)
                   end
                end,
                #{},
                KVs).