% This file is licensed to you under the Apache License,
% Version 2.0 (the "License"); you may not use this file
% except in compliance with the License. You may obtain
% a copy of the License at
%
% https://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
% KIND, either express or implied. See the License for the
% specific language governing permissions and limitations
% under the License.
%%% @doc Top supervisor for a `worker_pool'.
%%%
%%% This supervisor supervises `wpool_process_sup' (which is the worker's supervisor) together with
%%% auxiliary servers that help keep the whole pool running and in order.
%%%
%%% The strategy of this supervisor must be `one_for_all' but the intensity and period may be
%%% changed from their defaults by the `t:wpool:pool_sup_intensity()' and
%%% `t:wpool:pool_sup_period()' options respectively.
-module(wpool_pool).
-include_lib("kernel/include/logger.hrl").
-behaviour(supervisor).
%% API
-export([start_link/2]).
-export([
best_worker/1,
random_worker/1,
next_worker/1,
hash_worker/2,
next_available_worker/1,
send_request_available_worker/3,
call_available_worker/3,
run_with_available_worker/3
]).
-export([cast_to_available_worker/2, broadcast/2, broadcall/3]).
-export([stats/0, stats/1, get_workers/1]).
-export([worker_name/2, find_wpool/1]).
-export([next/2, wpool_get/2]).
-export([add_callback_module/2, remove_callback_module/2]).
%% Supervisor callbacks
-export([init/1]).
%% Description of a pool. An instance is built by `store_wpool/3', which also saves it in
%% `persistent_term' under the key `{?MODULE, Name}'.
-record(wpool, {
%% Name of the pool.
name :: wpool:name(),
%% Number of workers in the pool.
size :: pos_integer(),
%% Single-slot atomics array holding the index used by the round robin strategy.
next :: atomics:atomics_ref(),
%% Tuple with the registered name of every worker; element I is the name of worker I.
workers :: tuple(),
%% Options associated with the pool.
opts :: wpool:options(),
%% Queue manager of the pool (`store_wpool/3' stores its registered name here).
qmanager :: wpool_queue_manager:queue_mgr(),
%% Value of `erlang:system_time()' at the moment the record was created.
born = erlang:system_time() :: integer()
}).
%% Opaque handle handed to custom strategy functions; read it through `wpool_get/2'.
-opaque wpool() :: #wpool{}.
-export_type([wpool/0]).
%% ===================================================================
%% API functions
%% ===================================================================
%% @doc Starts the top supervisor of a pool, registered locally under the pool name.
%%
%% The pool name and its options are handed to `init/1' as a `{Name, Options}' tuple.
-spec start_link(wpool:name(), wpool:options()) -> supervisor:startlink_ret().
start_link(Name, Options) ->
    SupName = {local, Name},
    InitArg = {Name, Options},
    supervisor:start_link(SupName, ?MODULE, InitArg).
%% @doc Returns the name of the worker with the shortest message queue.
%%
%% Exits with reason `no_workers' when the pool cannot be found.
-spec best_worker(wpool:name()) -> atom().
best_worker(Name) ->
    Wpool = find_wpool(Name),
    Wpool =/= undefined orelse exit(no_workers),
    min_message_queue(Wpool).
%% @doc Returns the name of a randomly chosen worker of the pool.
%%
%% Exits with reason `no_workers' when the pool cannot be found.
-spec random_worker(wpool:name()) -> atom().
random_worker(Name) ->
    case find_wpool(Name) of
        #wpool{size = Size} = Wpool ->
            nth_worker_name(Wpool, fast_rand_uniform(Size));
        undefined ->
            exit(no_workers)
    end.
%% @doc Picks the next worker in a round robin fashion.
%%
%% The index of the worker to use is read from the pool's atomic counter, which is then
%% advanced with a compare-and-exchange whose result is ignored: when the counter no longer
%% holds the value that was read, it is left untouched and the worker at the index that
%% was read is still returned.
%%
%% Exits with reason `no_workers' when the pool cannot be found.
%% @throws no_workers
-spec next_worker(wpool:name()) -> atom().
next_worker(Name) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
Wpool = #wpool{next = Atomic, size = Size} ->
Index = atomics:get(Atomic, 1),
%% Index following the one just read, wrapping around after the last worker.
NextIndex = next_to_check(Index, Size),
%% Only stores NextIndex if the counter still holds Index; the outcome is not checked.
_ = atomics:compare_exchange(Atomic, 1, Index, NextIndex),
nth_worker_name(Wpool, Index)
end.
%% @doc Returns the name of a worker that currently has no task and an empty message queue.
%%
%% Exits with reason `no_workers' when the pool cannot be found and with
%% `no_available_workers' when every worker is busy.
-spec next_available_worker(wpool:name()) -> atom().
next_available_worker(Name) ->
    Wpool =
        case find_wpool(Name) of
            undefined -> exit(no_workers);
            Found -> Found
        end,
    case worker_with_no_task(Wpool) of
        undefined -> exit(no_available_workers);
        Worker -> Worker
    end.
%% @doc Asks the queue manager of the pool to execute `Run' on the first available worker.
%%
%% The timeout covers both the time spent waiting for a worker and the time the worker
%% takes to produce the result.
%%
%% Exits with reason `no_workers' when the pool cannot be found or the queue manager
%% answers `noproc', and with reason `timeout' when the queue manager answers `timeout'.
-spec run_with_available_worker(wpool:name(), wpool:run(Result), timeout()) -> Result.
run_with_available_worker(Name, Run, Timeout) ->
    QManager =
        case find_wpool(Name) of
            undefined -> exit(no_workers);
            #wpool{qmanager = Manager} -> Manager
        end,
    case wpool_queue_manager:run_with_available_worker(QManager, Run, Timeout) of
        noproc -> exit(no_workers);
        timeout -> exit(timeout);
        Result -> Result
    end.
%% @doc Asks the queue manager of the pool to send `Call' to the first available worker.
%%
%% The timeout covers both the time spent waiting for a worker and the time the worker
%% takes to process the call.
%%
%% Exits with reason `no_workers' when the pool cannot be found or the queue manager
%% answers `noproc', and with reason `timeout' when the queue manager answers `timeout'.
-spec call_available_worker(wpool:name(), any(), timeout()) -> any().
call_available_worker(Name, Call, Timeout) ->
    QManager =
        case find_wpool(Name) of
            undefined -> exit(no_workers);
            #wpool{qmanager = Manager} -> Manager
        end,
    case wpool_queue_manager:call_available_worker(QManager, Call, Timeout) of
        noproc -> exit(no_workers);
        timeout -> exit(timeout);
        Reply -> Reply
    end.
%% @doc Sends an asynchronous request to the first available worker through the queue
%% manager of the pool.
%%
%% The timeout only bounds the time spent waiting for a worker to become available.
-spec send_request_available_worker(wpool:name(), any(), timeout()) ->
    noproc | timeout | gen_server:request_id().
send_request_available_worker(Name, Call, Timeout) ->
    QueueManager = queue_manager_name(Name),
    wpool_queue_manager:send_request_available_worker(QueueManager, Call, Timeout).
%% @doc Picks a worker based on the hash of `HashKey', so equal keys map to the same worker.
%%
%% Exits with reason `no_workers' when the pool cannot be found.
-spec hash_worker(wpool:name(), term()) -> atom().
hash_worker(Name, HashKey) ->
    case find_wpool(Name) of
        #wpool{size = Size} = Wpool ->
            %% phash2/2 yields an integer from 0 to Size - 1 while worker indexes start at 1.
            nth_worker_name(Wpool, erlang:phash2(HashKey, Size) + 1);
        undefined ->
            exit(no_workers)
    end.
%% @doc Hands a cast over to the queue manager of the pool, which forwards it to the
%% first worker that becomes available.
%%
%% Delivery may take arbitrarily long, so the caller is not blocked waiting for a worker.
-spec cast_to_available_worker(wpool:name(), term()) -> ok.
cast_to_available_worker(Name, Cast) ->
    QueueManager = queue_manager_name(Name),
    wpool_queue_manager:cast_to_available_worker(QueueManager, Cast).
%% @doc Sends the same cast to every worker of the given pool.
%%
%% Exits with reason `no_workers' when the pool cannot be found.
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Name, Cast) ->
    _ = [ok = wpool_process:cast(Worker, Cast) || Worker <- all_workers(Name)],
    ok.
%% @doc Calls all workers in the pool in parallel
%%
%% Waits for responses in parallel too, and it assumes that if any response times out,
%% all of them did too and therefore exits with reason timeout like a regular `gen_server' does.
%%
%% Returns `{Replies, Errors}'. Both lists are built by prepending, so their order follows
%% the order in which responses were received, most recent first.
%% Note that `Timeout' is passed unchanged to every `gen_server:receive_response/3' call,
%% one call per worker.
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Name, Call, Timeout) ->
Workers = all_workers(Name),
ReqId0 = gen_server:reqids_new(),
%% Every request is added to the same request id collection, labelled with the pool name.
RequestFold = fun(Worker, Acc) -> gen_server:send_request(Worker, Call, Name, Acc) end,
ReqId1 = lists:foldl(RequestFold, ReqId0, Workers),
%% One response is awaited per worker; the third argument (`true') makes
%% receive_response/3 drop the handled request from the returned collection.
WaitFold =
fun(_, {Coll, Replies, Errors}) ->
case gen_server:receive_response(Coll, Timeout, true) of
{{reply, Reply}, _, Coll1} ->
{Coll1, [Reply | Replies], Errors};
{{error, Error}, _, Coll1} ->
{Coll1, Replies, [Error | Errors]};
timeout ->
exit({timeout, {?MODULE, broadcall, [Name, Call, Timeout]}})
end
end,
{_, Replies, Errors} = lists:foldl(WaitFold, {ReqId1, [], []}, Workers),
{Replies, Errors}.
%% Lists the names of all pools that have a record stored in `persistent_term' and that
%% `find_wpool/1' can still resolve.
-spec all() -> [wpool:name()].
all() ->
    KeepLivePool =
        fun
            ({{?MODULE, Name}, _}) when is_atom(Name) ->
                case find_wpool(Name) /= undefined of
                    true -> {true, Name};
                    false -> false
                end;
            (_) ->
                false
        end,
    lists:filtermap(KeepLivePool, persistent_term:get()).
%% @doc Returns the registered names of all the workers in the pool.
%% Handy for inspecting the workers manually or doing custom work on them.
%%
%% Exits with reason `no_workers' when the pool cannot be found.
-spec get_workers(wpool:name()) -> [atom()].
get_workers(PoolName) ->
    all_workers(PoolName).
%% @doc Returns a stats snapshot for every known pool.
-spec stats() -> [wpool:stats()].
stats() ->
    lists:map(fun stats/1, all()).
%% @doc Returns a snapshot of the stats of the given pool.
%%
%% Exits with reason `no_workers' when the pool cannot be found.
-spec stats(wpool:name()) -> wpool:stats().
stats(Name) ->
    case find_wpool(Name) of
        #wpool{} = Wpool ->
            stats(Wpool, Name);
        undefined ->
            exit(no_workers)
    end.
%% Builds the stats proplist of a pool from its record.
%%
%% Every worker that is currently registered contributes its message queue length, memory,
%% current function/location (unless it is idling in the gen_server loop or hibernating)
%% and its running task, if any. Workers that are not registered are skipped.
%% The reported total queue length also includes the tasks pending in the queue manager.
stats(Wpool, Name) ->
    InfoKeys = [message_queue_len, memory, current_function, current_location, dictionary],
    Collect =
        fun(N, {QueuedSoFar, StatsSoFar} = Acc) ->
            case worker_info(Wpool, N, InfoKeys) of
                undefined ->
                    Acc;
                [
                    {message_queue_len, QueueLen} = QueueLenInfo,
                    MemoryInfo,
                    FunctionInfo,
                    LocationInfo,
                    {dictionary, Dict}
                ] ->
                    Where = function_location(FunctionInfo, LocationInfo),
                    Doing = task(proplists:get_value(wpool_task, Dict)),
                    OneWorker = [QueueLenInfo, MemoryInfo | Where ++ Doing],
                    {QueuedSoFar + QueueLen, [{N, OneWorker} | StatsSoFar]}
            end
        end,
    WorkerNumbers = lists:seq(1, Wpool#wpool.size),
    {QueuedInWorkers, PerWorker} = lists:foldl(Collect, {0, []}, WorkerNumbers),
    PendingTasks = wpool_queue_manager:pending_task_count(Wpool#wpool.qmanager),
    [
        {pool, Name},
        {supervisor, erlang:whereis(Name)},
        {options, maps:to_list(Wpool#wpool.opts)},
        {size, Wpool#wpool.size},
        {next_worker, atomics:get(Wpool#wpool.next, 1)},
        {total_message_queue_len, QueuedInWorkers + PendingTasks},
        {workers, PerWorker}
    ].
%% Returns the requested `process_info' items for the N-th worker of the pool, or
%% `undefined' when no process is registered under that worker's name.
worker_info(Wpool, N, Info) ->
    WorkerName = nth_worker_name(Wpool, N),
    case erlang:whereis(WorkerName) of
        undefined -> undefined;
        Pid -> erlang:process_info(Pid, Info)
    end.
%% Returns the function/location items worth reporting in the stats: nothing for a worker
%% that sits in `gen_server:loop' or `erlang:hibernate', both items otherwise.
function_location({current_function, {Mod, Fun, _}}, _Location) when
    Mod =:= gen_server, Fun =:= loop; Mod =:= erlang, Fun =:= hibernate
->
    [];
function_location(Function, Location) ->
    [Function, Location].
%% Turns the `wpool_task' entry of a worker's process dictionary into stats items:
%% the task itself plus the whole seconds elapsed since it started.
%% Yields no items when the worker has no task.
task({_TaskId, Started, Task}) ->
    Elapsed = erlang:system_time() - Started,
    [{task, Task}, {runtime, erlang:convert_time_unit(Elapsed, native, second)}];
task(undefined) ->
    [].
%% @doc Overwrites the round robin counter of the pool with `Next' and returns the same
%% pool record. Meant to be used from custom strategy functions.
-spec next(pos_integer(), wpool()) -> wpool().
next(Next, #wpool{next = Counter} = Wpool) ->
    ok = atomics:put(Counter, 1, Next),
    Wpool.
%% @doc Registers a callback module on the event manager of the pool.
%% The module must implement the `wpool_process_callbacks' behaviour.
-spec add_callback_module(wpool:name(), module()) -> ok | {error, term()}.
add_callback_module(Pool, Module) ->
    wpool_process_callbacks:add_callback_module(event_manager_name(Pool), Module).
%% @doc Unregisters a callback module from the event manager of the pool.
-spec remove_callback_module(wpool:name(), module()) -> ok | {error, term()}.
remove_callback_module(Pool, Module) ->
    wpool_process_callbacks:remove_callback_module(event_manager_name(Pool), Module).
%% @doc Reads one field, or a list of fields, from the pool record.
%% Meant to be used from custom strategy functions.
%%
%% Given a list of field names, the values are returned in the same order.
-spec wpool_get
    (atom(), wpool()) -> any();
    ([atom()], wpool()) -> any().
wpool_get(Fields, Wpool) when is_list(Fields) ->
    lists:map(fun(Field) -> g(Field, Wpool) end, Fields);
wpool_get(Field, Wpool) when is_atom(Field) ->
    g(Field, Wpool).
%% Field accessor behind `wpool_get/2'. For `next' the current value of the round robin
%% counter is returned rather than the atomics reference itself.
g(born, #wpool{born = Born}) ->
    Born;
g(qmanager, #wpool{qmanager = QManager}) ->
    QManager;
g(opts, #wpool{opts = Opts}) ->
    Opts;
g(next, #wpool{next = Counter}) ->
    atomics:get(Counter, 1);
g(size, #wpool{size = Size}) ->
    Size;
g(name, #wpool{name = Name}) ->
    Name.
%% Registered name of the time checker of a pool: `<module>-<pool>-time-checker'.
-spec time_checker_name(wpool:name()) -> atom().
time_checker_name(Name) ->
    PoolPart = atom_to_list(Name),
    list_to_atom(?MODULE_STRING ++ "-" ++ PoolPart ++ "-time-checker").
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
%% @private
%% @doc Supervisor callback: stores the pool record and builds the child specs.
%%
%% Children are, in start order: the time checker, the queue manager (unless
%% `enable_queues' is `false'), the event manager (only when `enable_callbacks' is `true')
%% and the supervisor of the worker processes.
%%
%% Options read here and their defaults: `workers' (100), `queue_type' (`fifo'),
%% `overrun_handler' (`{logger, warning}'), `pool_sup_shutdown' (`brutal_kill'),
%% `pool_sup_intensity' (5) and `pool_sup_period' (60).
-spec init({wpool:name(), wpool:options()}) ->
    {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init({Name, Options}) ->
    Size = maps:get(workers, Options, 100),
    %% Like every other option, `queue_type' gets a default so that a missing key does
    %% not crash the supervisor with `{badkey, queue_type}'.
    QueueType = maps:get(queue_type, Options, fifo),
    OverrunHandler = maps:get(overrun_handler, Options, {logger, warning}),
    SupShutdown = maps:get(pool_sup_shutdown, Options, brutal_kill),
    TimeCheckerName = time_checker_name(Name),
    QueueManagerName = queue_manager_name(Name),
    ProcessSupName = process_sup_name(Name),
    EventManagerName = event_manager_name(Name),
    _Wpool = store_wpool(Name, Size, Options),
    WorkerOpts0 =
        [{time_checker, TimeCheckerName}] ++
            maybe_queue_manager(Options, QueueManagerName) ++
            maybe_event_manager(Options, EventManagerName),
    %% Keys present in Options take precedence over the ones computed above.
    WorkerOpts = maps:merge(maps:from_list(WorkerOpts0), Options),
    TimeCheckerSpec =
        #{
            id => TimeCheckerName,
            start => {wpool_time_checker, start_link, [Name, TimeCheckerName, OverrunHandler]},
            restart => permanent,
            shutdown => brutal_kill,
            type => worker,
            modules => [wpool_time_checker]
        },
    QueueManagerSpec =
        #{
            id => QueueManagerName,
            start =>
                {wpool_queue_manager, start_link, [
                    Name, QueueManagerName, [{queue_type, QueueType}]
                ]},
            restart => permanent,
            shutdown => brutal_kill,
            type => worker,
            modules => [wpool_queue_manager]
        },
    EventManagerSpec =
        #{
            id => EventManagerName,
            start => {gen_event, start_link, [{local, EventManagerName}]},
            restart => permanent,
            shutdown => brutal_kill,
            type => worker,
            modules => dynamic
        },
    %% Same child as the legacy tuple form, written as a map like its siblings.
    ProcessSupSpec =
        #{
            id => ProcessSupName,
            start => {wpool_process_sup, start_link, [Name, ProcessSupName, WorkerOpts]},
            restart => permanent,
            shutdown => SupShutdown,
            type => supervisor,
            modules => [wpool_process_sup]
        },
    Children =
        [TimeCheckerSpec] ++
            maybe_queue_manager_child(Options, QueueManagerSpec) ++
            maybe_event_manager_child(Options, EventManagerSpec) ++
            [ProcessSupSpec],
    SupIntensity = maps:get(pool_sup_intensity, Options, 5),
    SupPeriod = maps:get(pool_sup_period, Options, 60),
    SupStrategy =
        #{
            strategy => one_for_all,
            intensity => SupIntensity,
            period => SupPeriod
        },
    {ok, {SupStrategy, Children}}.
%% @private
%% @doc Returns the registered name of the worker at the given 1-based index of the pool.
-spec nth_worker_name(wpool(), pos_integer()) -> atom().
nth_worker_name(#wpool{workers = Names}, Index) ->
    erlang:element(Index, Names).
%% Registered name of the I-th worker of a pool: `<module>-<pool>-<I>'.
-spec worker_name(wpool:name(), pos_integer()) -> atom().
worker_name(Name, I) ->
    PoolPart = atom_to_list(Name),
    IndexPart = integer_to_list(I),
    list_to_atom(?MODULE_STRING ++ "-" ++ PoolPart ++ "-" ++ IndexPart).
%% ===================================================================
%% Private functions
%% ===================================================================
%% Registered name of the worker supervisor of a pool: `<module>-<pool>-process-sup'.
process_sup_name(Name) ->
    PoolPart = atom_to_list(Name),
    list_to_atom(?MODULE_STRING ++ "-" ++ PoolPart ++ "-process-sup").
%% Registered name of the queue manager of a pool: `<module>-<pool>-queue-manager'.
queue_manager_name(Name) ->
    PoolPart = atom_to_list(Name),
    list_to_atom(?MODULE_STRING ++ "-" ++ PoolPart ++ "-queue-manager").
%% Registered name of the event manager of a pool: `<module>-<pool>-event-manager'.
event_manager_name(Name) ->
    PoolPart = atom_to_list(Name),
    list_to_atom(?MODULE_STRING ++ "-" ++ PoolPart ++ "-event-manager").
%% Looks for a worker with an empty message queue and no task in progress.
%% Returns its registered name, or `undefined' when every worker is busy.
worker_with_no_task(#wpool{size = Size} = Wpool) ->
    %% The scan starts at a random worker so that concurrent callers do not all begin
    %% probing the same processes, which would likely be the busiest ones.
    Start = fast_rand_uniform(Size),
    worker_with_no_task(0, Size, Start, Wpool).
%% Scan loop: `Step' counts the workers probed so far and `ToCheck' is the index of the
%% worker to probe next. Gives up with `undefined' once `Size' workers have been probed.
worker_with_no_task(Size, Size, _, _) ->
    undefined;
worker_with_no_task(Step, Size, ToCheck, Wpool) ->
    Candidate = nth_worker_name(Wpool, ToCheck),
    %% A worker is idle when its message queue is empty and its process dictionary holds
    %% no `wpool_task' entry. Unregistered or dead workers never count as idle.
    Idle =
        case try_process_info(whereis(Candidate), [message_queue_len, dictionary]) of
            [{message_queue_len, 0}, {dictionary, Dict}] ->
                proplists:get_value(wpool_task, Dict) =:= undefined;
            _ ->
                false
        end,
    case Idle of
        true ->
            Candidate;
        false ->
            worker_with_no_task(Step + 1, Size, next_to_check(ToCheck, Size), Wpool)
    end.
%% `erlang:process_info/2' that tolerates the `undefined' returned by `whereis/1' for an
%% unregistered name, answering an empty list in that case.
try_process_info(Pid, Keys) when Pid =/= undefined ->
    erlang:process_info(Pid, Keys);
try_process_info(undefined, _Keys) ->
    [].
%% Returns the registered name of the worker with the shortest message queue.
%%
%% The scan starts at a random worker so that concurrent callers do not all begin probing
%% the same processes, which would likely be the ones with the longest queues.
%% That first worker seeds the search, so the scan continues with the worker right after
%% it and counts one step as already taken. This avoids asking the first worker for its
%% queue length twice.
min_message_queue(#wpool{size = Size} = Wpool) ->
    First = fast_rand_uniform(Size),
    Worker = nth_worker_name(Wpool, First),
    QLength = queue_length(whereis(Worker)),
    min_message_queue(1, Size, next_to_check(First, Size), Wpool, QLength, Worker).
%% Scan loop behind `min_message_queue/1'. Carries the best worker found so far together
%% with its queue length, and stops early as soon as that length is 0 or once `Size' steps
%% have been taken. A candidate only replaces the current best when its queue is strictly
%% shorter.
min_message_queue(_, _, _, _, 0, Worker) ->
    Worker;
min_message_queue(Size, Size, _, _, _QLength, Worker) ->
    Worker;
min_message_queue(Step, Size, ToCheck, Wpool, BestQLength, BestWorker) ->
    Candidate = nth_worker_name(Wpool, ToCheck),
    {NewQLength, NewWorker} =
        case queue_length(whereis(Candidate)) of
            QLength when QLength < BestQLength -> {QLength, Candidate};
            _ -> {BestQLength, BestWorker}
        end,
    Following = next_to_check(ToCheck, Size),
    min_message_queue(Step + 1, Size, Following, Wpool, NewQLength, NewWorker).
%% Index that follows `Next' in a 1-based ring of `Size' elements (after `Size' comes 1).
next_to_check(Next, Size) ->
    Wrapped = Next rem Size,
    Wrapped + 1.
%% Cheap pick of an integer between 1 and `Range', obtained by hashing a unique integer
%% into `Range' buckets.
fast_rand_uniform(Range) ->
    erlang:phash2(erlang:unique_integer(), Range) + 1.
%% Message queue length of a process. Answers `infinity' for an unregistered worker
%% (`undefined') and for a process that is no longer alive, so that such workers never
%% compare as shorter than a live one.
queue_length(Pid) when is_pid(Pid) ->
    case erlang:process_info(Pid, message_queue_len) of
        undefined -> infinity;
        {message_queue_len, Length} -> Length
    end;
queue_length(undefined) ->
    infinity.
%% Registered names of all the workers of the pool, in worker-number order.
%% Exits with reason `no_workers' when the pool cannot be found.
-spec all_workers(wpool:name()) -> [atom()].
all_workers(Name) ->
    case find_wpool(Name) of
        #wpool{workers = Names} ->
            tuple_to_list(Names);
        undefined ->
            exit(no_workers)
    end.
%% ===================================================================
%% Storage functions
%% ===================================================================
%% Builds the record of a pool and saves it in `persistent_term' under `{?MODULE, Name}'.
%% The round robin counter is a fresh unsigned atomics slot initialised to 1.
%% Returns the record that was stored.
store_wpool(Name, Size, Options) ->
    WorkerNames =
        list_to_tuple(lists:map(fun(I) -> worker_name(Name, I) end, lists:seq(1, Size))),
    Counter = atomics:new(1, [{signed, false}]),
    ok = atomics:put(Counter, 1, 1),
    Wpool =
        #wpool{
            name = Name,
            size = Size,
            next = Counter,
            workers = WorkerNames,
            opts = Options,
            qmanager = queue_manager_name(Name)
        },
    ok = persistent_term:put({?MODULE, Name}, Wpool),
    Wpool.
%% @doc Use this function to get the Worker pool record in a custom worker.
%%
%% Returns `undefined' when no process is registered under `Name', even if a record for
%% that name is stored. When either lookup raises `badarg' (`persistent_term:get/1' does
%% so when nothing is stored under the key), the result of `build_wpool/1' is returned.
-spec find_wpool(atom()) -> undefined | wpool().
find_wpool(Name) ->
%% Both lookups are evaluated inside the protected part of the `try'.
try {erlang:whereis(Name), persistent_term:get({?MODULE, Name})} of
{undefined, _} ->
undefined;
{_, Wpool} ->
Wpool
catch
_:badarg ->
build_wpool(Name)
end.
%% @doc Fallback used when the pool record is missing from `persistent_term': instead of
%% reporting an error, the record is rebuilt from the number of active children of the
%% worker supervisor and stored again, with empty options.
%% Returns `undefined' when the worker supervisor cannot be queried.
%% This SHOULDN'T be called too much.
build_wpool(Name) ->
    RebuildReport =
        #{
            what => "Building a #wpool record. Something must have failed.",
            pool => Name
        },
    logger:warning(RebuildReport, ?LOCATION),
    %% Only the supervisor query is protected; failures while storing are not caught.
    try supervisor:count_children(process_sup_name(Name)) of
        Counts ->
            ActiveWorkers = proplists:get_value(active, Counts, 0),
            store_wpool(Name, ActiveWorkers, #{})
    catch
        _:Reason ->
            NotFoundReport =
                #{
                    what => "Wpool not found",
                    pool => Name,
                    reason => Reason
                },
            logger:warning(NotFoundReport, ?LOCATION),
            undefined
    end.
%% Worker option naming the queue manager: `undefined' when queues are explicitly
%% disabled with `enable_queues => false', the given name otherwise.
maybe_queue_manager(Options, QueueManager) ->
    case Options of
        #{enable_queues := false} -> [{queue_manager, undefined}];
        _ -> [{queue_manager, QueueManager}]
    end.
%% Worker option naming the event manager: the given name only when callbacks are
%% explicitly enabled with `enable_callbacks => true', `undefined' otherwise.
maybe_event_manager(Options, EventManager) ->
    case Options of
        #{enable_callbacks := true} -> [{event_manager, EventManager}];
        _ -> [{event_manager, undefined}]
    end.
%% Child specs contributed by the queue manager: none when queues are explicitly disabled
%% with `enable_queues => false', the given spec otherwise.
maybe_queue_manager_child(Options, ChildSpec) ->
    case Options of
        #{enable_queues := false} -> [];
        _ -> [ChildSpec]
    end.
%% Child specs contributed by the event manager: the given spec only when callbacks are
%% explicitly enabled with `enable_callbacks => true', none otherwise.
maybe_event_manager_child(Options, ChildSpec) ->
    case Options of
        #{enable_callbacks := true} -> [ChildSpec];
        _ -> []
    end.