% This file is licensed to you under the Apache License,
% Version 2.0 (the "License"); you may not use this file
% except in compliance with the License. You may obtain
% a copy of the License at
%
% https://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
% KIND, either express or implied. See the License for the
% specific language governing permissions and limitations
% under the License.
%%% @doc Top supervisor for a `worker_pool'.
%%%
%%% This supervisor supervises `wpool_process_sup' (which is the worker's supervisor) together with
%%% auxiliary servers that help keep the whole pool running and in order.
%%%
%%% The strategy of this supervisor must be `one_for_all' but the intensity and period may be
%%% changed from their defaults by the `t:wpool:pool_sup_intensity()' and
%%% `t:wpool:pool_sup_intensity()' options respectively.
-module(wpool_pool).
-behaviour(supervisor).
%% API
-export([start_link/2]).
-export([best_worker/1, random_worker/1, next_worker/1, hash_worker/2,
next_available_worker/1, send_request_available_worker/3, call_available_worker/3]).
-export([cast_to_available_worker/2, broadcast/2, broadcall/3]).
-export([stats/0, stats/1, get_workers/1]).
-export([worker_name/2, find_wpool/1]).
-export([next/2, wpool_get/2]).
-export([add_callback_module/2, remove_callback_module/2]).
%% Supervisor callbacks
-export([init/1]).
-record(wpool,
{name :: wpool:name(),
size :: pos_integer(),
next :: atomics:atomics_ref(),
workers :: tuple(),
opts :: [wpool:option()],
qmanager :: wpool_queue_manager:queue_mgr(),
born = erlang:system_time(second) :: integer()}).
-opaque wpool() :: #wpool{}.
-export_type([wpool/0]).
%% ===================================================================
%% API functions
%% ===================================================================
%% @doc Starts a supervisor with several `wpool_process'es as its children
-spec start_link(wpool:name(), [wpool:option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_link(Name, Options) ->
supervisor:start_link({local, Name}, ?MODULE, {Name, Options}).
%% @doc Picks the worker with the smaller queue of messages.
%% @throws no_workers
-spec best_worker(wpool:name()) -> atom().
best_worker(Name) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
Wpool ->
min_message_queue(Wpool)
end.
%% @doc Picks a random worker
%% @throws no_workers
-spec random_worker(wpool:name()) -> atom().
random_worker(Name) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
Wpool = #wpool{size = Size} ->
WorkerNumber = fast_rand_uniform(Size),
nth_worker_name(Wpool, WorkerNumber)
end.
%% @doc Picks the next worker in a round robin fashion
%% @throws no_workers
-spec next_worker(wpool:name()) -> atom().
next_worker(Name) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
Wpool = #wpool{next = Atomic, size = Size} ->
Index = atomics:get(Atomic, 1),
NextIndex = next_to_check(Index, Size),
_ = atomics:compare_exchange(Atomic, 1, Index, NextIndex),
nth_worker_name(Wpool, Index)
end.
%% @doc Picks the first available worker, if any
%% @throws no_workers | no_available_workers
-spec next_available_worker(wpool:name()) -> atom().
next_available_worker(Name) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
Wpool ->
case worker_with_no_task(Wpool) of
undefined ->
exit(no_available_workers);
Worker ->
Worker
end
end.
%% @doc Picks the first available worker and sends the call to it.
%% The timeout provided includes the time it takes to get a worker
%% and for it to process the call.
%% @throws no_workers | timeout
-spec call_available_worker(wpool:name(), any(), timeout()) -> atom().
call_available_worker(Name, Call, Timeout) ->
case wpool_queue_manager:call_available_worker(queue_manager_name(Name), Call, Timeout) of
noproc ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
end.
%% @doc Picks the first available worker and sends the request to it.
%% The timeout provided considers only the time it takes to get a worker
-spec send_request_available_worker(wpool:name(), any(), timeout()) ->
noproc | timeout | gen_server:request_id().
send_request_available_worker(Name, Call, Timeout) ->
wpool_queue_manager:send_request_available_worker(queue_manager_name(Name),
Call,
Timeout).
%% @doc Picks a worker base on a hash result.
%% <pre>phash2(Term, Range)</pre> returns hash = integer,
%% 0 <= hash < Range so <pre>1</pre> must be added
%% @throws no_workers
-spec hash_worker(wpool:name(), term()) -> atom().
hash_worker(Name, HashKey) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
Wpool = #wpool{size = WpoolSize} ->
Index = 1 + erlang:phash2(HashKey, WpoolSize),
nth_worker_name(Wpool, Index)
end.
%% @doc Casts a message to the first available worker.
%% Since we can wait forever for a wpool:cast to be delivered
%% but we don't want the caller to be blocked, this function
%% just forwards the cast when it gets the worker
-spec cast_to_available_worker(wpool:name(), term()) -> ok.
cast_to_available_worker(Name, Cast) ->
wpool_queue_manager:cast_to_available_worker(queue_manager_name(Name), Cast).
%% @doc Casts a message to all the workers within the given pool.
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Name, Cast) ->
lists:foreach(fun(Worker) -> ok = wpool_process:cast(Worker, Cast) end,
all_workers(Name)).
%% @doc Calls all workers in the pool in parallel
%%
%% Waits for responses in parallel too, and it assumes that if any response times out,
%% all of them did too and therefore exits with reason timeout like a regular `gen_server' does.
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Name, Call, Timeout) ->
Workers = all_workers(Name),
ReqId0 = gen_server:reqids_new(),
RequestFold = fun(Worker, Acc) -> gen_server:send_request(Worker, Call, Name, Acc) end,
ReqId1 = lists:foldl(RequestFold, ReqId0, Workers),
WaitFold =
fun(_, {Coll, Replies, Errors}) ->
case gen_server:receive_response(Coll, Timeout, true) of
{{reply, Reply}, _, Coll1} ->
{Coll1, [Reply | Replies], Errors};
{{error, Error}, _, Coll1} ->
{Coll1, Replies, [Error | Errors]};
timeout ->
exit({timeout, {?MODULE, broadcall, [Name, Call, Timeout]}})
end
end,
{_, Replies, Errors} = lists:foldl(WaitFold, {ReqId1, [], []}, Workers),
{Replies, Errors}.
-spec all() -> [wpool:name()].
all() ->
[Name || {{?MODULE, Name}, _} <- persistent_term:get(), find_wpool(Name) /= undefined].
%% @doc Retrieves the list of worker registered names.
%% This can be useful to manually inspect the workers or do custom work on them.
-spec get_workers(wpool:name()) -> [atom()].
get_workers(Name) ->
all_workers(Name).
%% @doc Retrieves the pool stats for all pools
-spec stats() -> [wpool:stats()].
stats() ->
[stats(Name) || Name <- all()].
%% @doc Retrieves a snapshot of the pool stats
%% @throws no_workers
-spec stats(wpool:name()) -> wpool:stats().
stats(Name) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
Wpool ->
stats(Wpool, Name)
end.
stats(Wpool, Name) ->
{Total, WorkerStats} =
lists:foldl(fun(N, {T, L}) ->
case worker_info(Wpool,
N,
[message_queue_len,
memory,
current_function,
current_location,
dictionary])
of
undefined ->
{T, L};
[{message_queue_len, MQL} = MQLT,
Memory,
Function,
Location,
{dictionary, Dictionary}] ->
WS = [MQLT, Memory]
++ function_location(Function, Location)
++ task(proplists:get_value(wpool_task, Dictionary)),
{T + MQL, [{N, WS} | L]}
end
end,
{0, []},
lists:seq(1, Wpool#wpool.size)),
PendingTasks = wpool_queue_manager:pending_task_count(Wpool#wpool.qmanager),
[{pool, Name},
{supervisor, erlang:whereis(Name)},
{options, lists:ukeysort(1, proplists:unfold(Wpool#wpool.opts))},
{size, Wpool#wpool.size},
{next_worker, atomics:get(Wpool#wpool.next, 1)},
{total_message_queue_len, Total + PendingTasks},
{workers, WorkerStats}].
worker_info(Wpool, N, Info) ->
case erlang:whereis(nth_worker_name(Wpool, N)) of
undefined ->
undefined;
Worker ->
erlang:process_info(Worker, Info)
end.
function_location({current_function, {gen_server, loop, _}}, _) ->
[];
function_location({current_function, {erlang, hibernate, _}}, _) ->
[];
function_location(Function, Location) ->
[Function, Location].
task(undefined) ->
[];
task({_TaskId, Started, Task}) ->
Time =
calendar:datetime_to_gregorian_seconds(
calendar:universal_time()),
[{task, Task}, {runtime, Time - Started}].
%% @doc Set next within the worker pool record. Useful when using
%% a custom strategy function.
-spec next(pos_integer(), wpool()) -> wpool().
next(Next, #wpool{next = Atomic} = Wpool) ->
atomics:put(Atomic, 1, Next),
Wpool.
%% @doc Adds a callback module.
%% The module must implement the `wpool_process_callbacks' behaviour.
-spec add_callback_module(wpool:name(), module()) -> ok | {error, term()}.
add_callback_module(Pool, Module) ->
EventManager = event_manager_name(Pool),
wpool_process_callbacks:add_callback_module(EventManager, Module).
%% @doc Removes a callback module.
-spec remove_callback_module(wpool:name(), module()) -> ok | {error, term()}.
remove_callback_module(Pool, Module) ->
EventManager = event_manager_name(Pool),
wpool_process_callbacks:remove_callback_module(EventManager, Module).
%% @doc Get values from the worker pool record. Useful when using a custom
%% strategy function.
-spec wpool_get(atom(), wpool()) -> any();
([atom()], wpool()) -> any().
wpool_get(List, Wpool) when is_list(List) ->
[g(Atom, Wpool) || Atom <- List];
wpool_get(Atom, Wpool) when is_atom(Atom) ->
g(Atom, Wpool).
g(name, #wpool{name = Ret}) ->
Ret;
g(size, #wpool{size = Ret}) ->
Ret;
g(next, #wpool{next = Ret}) ->
atomics:get(Ret, 1);
g(opts, #wpool{opts = Ret}) ->
Ret;
g(qmanager, #wpool{qmanager = Ret}) ->
Ret;
g(born, #wpool{born = Ret}) ->
Ret.
-spec time_checker_name(wpool:name()) -> atom().
time_checker_name(Name) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Name)] ++ "-time-checker").
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
%% @private
-spec init({wpool:name(), [wpool:option()]}) ->
{ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init({Name, Options}) ->
Size = proplists:get_value(workers, Options, 100),
QueueType = proplists:get_value(queue_type, Options),
OverrunHandler =
proplists:get_value(overrun_handler, Options, {error_logger, warning_report}),
TimeChecker = time_checker_name(Name),
QueueManager = queue_manager_name(Name),
ProcessSup = process_sup_name(Name),
EventManagerName = event_manager_name(Name),
_Wpool = store_wpool(Name, Size, Options),
TimeCheckerSpec =
{TimeChecker,
{wpool_time_checker, start_link, [Name, TimeChecker, OverrunHandler]},
permanent,
brutal_kill,
worker,
[wpool_time_checker]},
QueueManagerSpec =
{QueueManager,
{wpool_queue_manager, start_link, [Name, QueueManager, [{queue_type, QueueType}]]},
permanent,
brutal_kill,
worker,
[wpool_queue_manager]},
EventManagerSpec =
{EventManagerName,
{gen_event, start_link, [{local, EventManagerName}]},
permanent,
brutal_kill,
worker,
dynamic},
SupShutdown = proplists:get_value(pool_sup_shutdown, Options, brutal_kill),
WorkerOpts =
[{queue_manager, QueueManager}, {time_checker, TimeChecker} | Options]
++ maybe_event_manager(Options, {event_manager, EventManagerName}),
ProcessSupSpec =
{ProcessSup,
{wpool_process_sup, start_link, [Name, ProcessSup, WorkerOpts]},
permanent,
SupShutdown,
supervisor,
[wpool_process_sup]},
Children =
[TimeCheckerSpec, QueueManagerSpec]
++ maybe_event_manager(Options, EventManagerSpec)
++ [ProcessSupSpec],
SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5),
SupPeriod = proplists:get_value(pool_sup_period, Options, 60),
SupStrategy =
#{strategy => one_for_all,
intensity => SupIntensity,
period => SupPeriod},
{ok, {SupStrategy, Children}}.
%% @private
-spec nth_worker_name(wpool(), pos_integer()) -> atom().
nth_worker_name(#wpool{workers = Workers}, I) ->
element(I, Workers).
-spec worker_name(wpool:name(), pos_integer()) -> atom().
worker_name(Name, I) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Name)] ++ [$- | integer_to_list(I)]).
%% ===================================================================
%% Private functions
%% ===================================================================
process_sup_name(Name) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Name)] ++ "-process-sup").
queue_manager_name(Name) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Name)] ++ "-queue-manager").
event_manager_name(Name) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Name)] ++ "-event-manager").
worker_with_no_task(#wpool{size = Size} = Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
%% do not always start asking for process_info to the processes that are most
%% likely to have bigger message queues
First = fast_rand_uniform(Size),
worker_with_no_task(0, Size, First, Wpool).
worker_with_no_task(Size, Size, _, _) ->
undefined;
worker_with_no_task(Step, Size, ToCheck, Wpool) ->
Worker = nth_worker_name(Wpool, ToCheck),
case try_process_info(whereis(Worker), [message_queue_len, dictionary]) of
[{message_queue_len, 0}, {dictionary, Dictionary}] ->
case proplists:get_value(wpool_task, Dictionary) of
undefined ->
Worker;
_ ->
worker_with_no_task(Step + 1, Size, next_to_check(ToCheck, Size), Wpool)
end;
_ ->
worker_with_no_task(Step + 1, Size, next_to_check(ToCheck, Size), Wpool)
end.
try_process_info(undefined, _) ->
[];
try_process_info(Pid, Keys) ->
erlang:process_info(Pid, Keys).
min_message_queue(#wpool{size = Size} = Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
%% do not always start asking for process_info to the processes that are most
%% likely to have bigger message queues
First = fast_rand_uniform(Size),
Worker = nth_worker_name(Wpool, First),
QLength = queue_length(whereis(Worker)),
min_message_queue(0, Size, First, Wpool, QLength, Worker).
min_message_queue(_, _, _, _, 0, Worker) ->
Worker;
min_message_queue(Size, Size, _, _, _QLength, Worker) ->
Worker;
min_message_queue(Step, Size, ToCheck, Wpool, CurrentQLength, CurrentWorker) ->
Worker = nth_worker_name(Wpool, ToCheck),
QLength = queue_length(whereis(Worker)),
Next = next_to_check(ToCheck, Size),
case QLength < CurrentQLength of
true ->
min_message_queue(Step + 1, Size, Next, Wpool, QLength, Worker);
false ->
min_message_queue(Step + 1, Size, Next, Wpool, CurrentQLength, CurrentWorker)
end.
next_to_check(Next, Size) ->
Next rem Size + 1.
fast_rand_uniform(Range) ->
UI = erlang:unique_integer(),
1 + erlang:phash2(UI, Range).
queue_length(undefined) ->
infinity;
queue_length(Pid) when is_pid(Pid) ->
case erlang:process_info(Pid, message_queue_len) of
{message_queue_len, L} ->
L;
undefined ->
infinity
end.
-spec all_workers(wpool:name()) -> [atom()].
all_workers(Name) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
#wpool{workers = Workers} ->
tuple_to_list(Workers)
end.
%% ===================================================================
%% Storage functions
%% ===================================================================
store_wpool(Name, Size, Options) ->
Atomic = atomics:new(1, [{signed, false}]),
atomics:put(Atomic, 1, 1),
WorkerNames = list_to_tuple([worker_name(Name, I) || I <- lists:seq(1, Size)]),
Wpool =
#wpool{name = Name,
size = Size,
next = Atomic,
workers = WorkerNames,
opts = Options,
qmanager = queue_manager_name(Name)},
persistent_term:put({?MODULE, Name}, Wpool),
Wpool.
%% @doc Use this function to get the Worker pool record in a custom worker.
-spec find_wpool(atom()) -> undefined | wpool().
find_wpool(Name) ->
try {erlang:whereis(Name), persistent_term:get({?MODULE, Name})} of
{undefined, _} ->
undefined;
{_, Wpool} ->
Wpool
catch
_:badarg ->
build_wpool(Name)
end.
%% @doc We use this function not to report an error if for some reason we've
%% lost the record on the persistent_term table. This SHOULDN'T be called too much.
build_wpool(Name) ->
error_logger:warning_msg("Building a #wpool record for ~p. Something must have failed.",
[Name]),
try supervisor:count_children(process_sup_name(Name)) of
Children ->
Size = proplists:get_value(active, Children, 0),
store_wpool(Name, Size, [])
catch
_:Error ->
error_logger:warning_msg("Wpool ~p not found: ~p", [Name, Error]),
undefined
end.
maybe_event_manager(Options, Item) ->
EnableEventManager = proplists:get_value(enable_callbacks, Options, false),
case EnableEventManager of
true ->
[Item];
_ ->
[]
end.