% This file is licensed to you under the Apache License,
% Version 2.0 (the "License"); you may not use this file
% except in compliance with the License. You may obtain
% a copy of the License at
%
% https://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
% KIND, either express or implied. See the License for the
% specific language governing permissions and limitations
% under the License.
%%% @doc Worker pool main interface.
%%%
%%% Use functions provided by this module to manage your pools of workers.
%%%
%%% <h2>Starting the application</h2>
%%% <b>Worker Pool</b> is an Erlang application that can be started using the functions in the
%%% `application' module. For convenience, `wpool:start/0' and `wpool:stop/0' are also provided.
%%%
%%% <h2>Starting a Pool</h2>
%%%
%%% To start a new worker pool, you can either
%%% <ul>
%%% <li>Use `wpool:child_spec/2' if you want to add the pool under a supervision tree
%%% initialisation;</li>
%%% <li>Use `wpool:start_pool/1' or `wpool:start_pool/2' if you want to supervise it
%%% yourself;</li>
%%% <li>Use `wpool:start_sup_pool/1' or `wpool:start_sup_pool/2' if you want the pool to live
%%% under
%%% wpool's supervision tree.</li>
%%% </ul>
%%%
%%% <h2>Stopping a Pool</h2>
%%% To stop a pool, just use `wpool:stop_pool/1' or `wpool:stop_sup_pool/1' according to how you
%%% started the pool.
%%%
%%% <h2>Using the Workers</h2>
%%%
%%% Since the workers are `gen_server's, messages can be `call'ed or `cast'ed to them. To do that
%%% you can use `wpool:call' and `wpool:cast' as you would use the equivalent functions on
%%% `gen_server'.
%%%
%%% <h3>Choosing a Strategy</h3>
%%%
%%% Beyond the regular parameters for `gen_server', wpool also provides an extra optional parameter
%%% <b>Strategy</b> The strategy used to pick up the worker to perform the task. If not provided,
%%% the result of `wpool:default_strategy/0' is used.
%%%
%%% The available strategies are defined in the `t:wpool:strategy/0' type.
%%%
%%% <h2>Watching a Pool</h2>
%%% Wpool provides a way to get live statistics about a pool. To do that, you can use
%%% `wpool:stats/1'.
-module(wpool).
%% @todo remove this line when https://github.com/AdRoll/rebar3_format/issues/356 is fixed
-format ignore.
-behaviour(application).
-type overrun_warning() :: infinity | pos_integer().
%% The number of milliseconds after which a task is considered <i>overrun</i> i.e., delayed.
%%
%% A warning is emitted using {@link overrun_handler()}.
%%
%% The task is monitored until it is finished,
%% thus more than one warning might be emitted for a single task.
%%
%% The rounds of warnings are not equally timed, an exponential backoff algorithm is used instead:
%% after each warning the overrun time is doubled (i.e. with `overrun_warning = 1000' warnings would
%% be emitted after 1000, 2000, 4000, 8000 ...).
%%
%% The default value for this setting is `infinity', i.e., no warnings are emitted.
-type max_overrun_warnings() :: infinity | pos_integer().
%% The maximum number of overrun warnings emitted before killing the worker with a delayed task.
%%
%% If this parameter is set to a value other than `infinity' the rounds of warnings become equally
%% timed (i.e. with `overrun_warning = 1000' and `max_overrun_warnings = 5' the task would be killed
%% after 5 seconds of execution).
%%
%% The default value for this setting is `infinity', i.e., delayed tasks are not killed.
%%
%% <b>NOTE</b>: As the worker is being killed it might cause worker's messages to be missing if you
%% are using a worker stategy other than `available_worker' (see worker {@link strategy()} below).
-type overrun_handler() :: {Module :: module(), Fun :: atom()}.
%% The module and function to call when a task is <i>overrun</i>
%%
%% The default value for this setting is `{error_logger, warning_report}'. The function must be of
%% arity 1, and it will be called as`Module:Fun(Args)' where `Args' is a proplist with the following
%% reported values:
%% <ul>
%% <li>`{alert, AlertType}': Where `AlertType' is `overrun' on regular warnings, or
%% `max_overrun_limit' when the worker is about to be killed.</li>
%% <li>`{pool, Pool}': The pool name.</li>
%% <li>`{worker, Pid}': Pid of the worker.</li>
%% <li>`{task, Task}': A description of the task.</li>
%% <li>`{runtime, Runtime}': The runtime of the current round.</li>
%% </ul>
-type workers() :: pos_integer().
%% The number of workers in the pool.
%%
%% The default value for this setting is `100'
-type worker() :: {Module :: module(), InitArg :: term()}.
%% The `gen_server' module and the arguments to pass to the `init' callback.
%%
%% This is the module that each worker will run and the `InitArgs' to use on the corresponding
%% `start_link' call used to initiate it.
%%
%% The default value for this setting is `{wpool_worker, undefined}'. That means that if you don't
%% provide a worker implementation, the pool will be generated with this default one.
%% See {@link wpool_worker} for details.
-type worker_opt() :: gen_server:start_opt().
%% Server options that will be passed to each `gen_server' worker.
%%
%% These are the same as described at the `gen_server' documentation.
-type worker_shutdown() :: worker_shutdown().
%% The `shutdown' option to be used over the individual workers.
%%
%% Defaults to `5000'. See {@link wpool_process_sup} for more details.
-type supervisor_strategy() :: supervisor:sup_flags().
%% Supervision strategy to use over the individual workers.
%%
%% Defaults to `{one_for_one, 5, 60}'. See {@link wpool_process_sup} for more details.
-type pool_sup_shutdown() :: brutal_kill | timeout().
%% The `shutdown' option to be used over the supervisor that supervises the workers.
%%
%% Defaults to `brutal_kill'. See {@link wpool_process_sup} for more details.
-type pool_sup_period() :: non_neg_integer().
%% The supervision period to use over the supervisor that supervises the workers.
%%
%% Defaults to `60'. See {@link wpool_pool} for more details.
-type pool_sup_intensity() :: non_neg_integer().
%% The supervision intensity to use over the supervisor that supervises the workers.
%%
%% Defaults to `5'. See {@link wpool_pool} for more details.
-type queue_type() :: wpool_queue_manager:queue_type().
%% Order in which requests will be stored and handled by workers.
%%
%% This option can take values `lifo' or `fifo'. Defaults to `fifo'.
-type enable_callbacks() :: boolean().
%% A boolean value determining if `event_manager' should be started for callback modules.
%%
%% Defaults to `false'.
-type callbacks() :: [module()].
%% Initial list of callback modules implementing `wpool_process_callbacks' to be
%% called on certain worker events.
%%
%% This options will only work if the {@link enable_callbacks()} is set to <b>true</b>.
%% Callbacks can be added and removed later by `wpool_pool:add_callback_module/2' and
%% `wpool_pool:remove_callback_module/2'.
-type name() :: atom().
%% Name of the pool
-type option() ::
{workers, workers()} |
{worker, worker()} |
{worker_opt, [worker_opt()]} |
{strategy, supervisor_strategy()} |
{worker_shutdown, worker_shutdown()} |
{overrun_handler, overrun_handler()} |
{overrun_warning, overrun_warning()} |
{max_overrun_warnings, max_overrun_warnings()} |
{pool_sup_intensity, pool_sup_intensity()} |
{pool_sup_shutdown, pool_sup_shutdown()} |
{pool_sup_period, pool_sup_period()} |
{queue_type, queue_type()} |
{enable_callbacks, enable_callbacks()} |
{callbacks, callbacks()}.
%% Options that can be provided to a new pool.
%%
%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks
%% that take a list of these options as a parameter.
-type custom_strategy() :: fun(([atom()]) -> Atom :: atom()).
%% A callback that gets the pool name and returns a worker's name.
-type strategy() ::
best_worker |
random_worker |
next_worker |
available_worker |
next_available_worker |
{hash_worker, term()} |
custom_strategy().
%% Strategy to use when choosing a worker.
%%
%% <h2>`best_worker'</h2>
%% Picks the worker with the shortest queue of messages. Loosely based on this
%% article: [https://lethain.com/load-balancing-across-erlang-process-groups/].
%%
%% This strategy is usually useful when your workers always perform the same task,
%% or tasks with expectedly similar runtimes.
%%
%% <h2>`random_worker'</h2>
%% Just picks a random worker. This strategy is the fastest one to select a worker.
%% It's ideal if your workers will perform many short tasks.
%%
%% <h2>`next_worker'</h2>
%% Picks the next worker in a round-robin fashion. This ensures an evenly distribution of tasks.
%%
%% <h2>`available_worker'</h2>
%% Instead of just picking one of the workers in the queue and sending the request to it, this
%% strategy queues the request and waits until a worker is available to perform it. That may render
%% the worker selection part of the process much slower (thus generating the need for an additional
%% parameter: `Worker_Timeout' that controls how many milliseconds the client is willing to spend
%% in that, regardless of the global `Timeout' for the call).
%%
%% This strategy ensures that, if a worker crashes, no messages are lost in its message queue.
%% It also ensures that, if a task takes too long, that doesn't block other tasks since, as soon as
%% other worker is free it can pick up the next task in the list.
%%
%% <h2>`next_available_worker'</h2>
%% In a way, this strategy behaves like `available_worker' in the sense that it will pick the first
%% worker that it can find which is not running any task at the moment, but the difference is that
%% it will fail if all workers are busy.
%%
%% <h2>`{hash_worker, Key}'</h2>
%% This strategy takes a `Key' and selects a worker using `erlang:phash2/2'. This ensures that tasks
%% classified under the same key will be delivered to the same worker, which is useful to classify
%% events by key and work on them sequentially on the worker, distributing different keys across
%% different workers.
%%
%% <h2>{@link custom_strategy()}</h2>
%% A callback that gets the pool name and returns a worker's name.
-type worker_stats() ::
[{messsage_queue_len, non_neg_integer()} | {memory, pos_integer()}].
%% Statistics about a worker in a pool.
-type stats() ::
[{pool, name()} |
{supervisor, pid()} |
{options, [option()]} |
{size, non_neg_integer()} |
{next_worker, pos_integer()} |
{total_message_queue_len, non_neg_integer()} |
{workers, [{pos_integer(), worker_stats()}]}].
%% Statistics about a given live pool.
-export_type([name/0, option/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).
-export([start/0, start/2, stop/0, stop/1]).
-export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
-export([stop_pool/1, stop_sup_pool/1]).
-export([call/2, cast/2, call/3, cast/3, call/4, broadcall/3, broadcast/2]).
-export([send_request/2, send_request/3, send_request/4]).
-export([stats/0, stats/1, get_workers/1]).
-export([default_strategy/0]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% ADMIN API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Starts the application
-spec start() -> ok | {error, {already_started, ?MODULE}}.
start() ->
application:start(worker_pool).
%% @doc Stops the application
-spec stop() -> ok.
stop() ->
application:stop(worker_pool).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% BEHAVIOUR CALLBACKS
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @private
-spec start(any(), any()) -> {ok, pid()} | {error, term()}.
start(_StartType, _StartArgs) ->
wpool_sup:start_link().
%% @private
-spec stop(any()) -> ok.
stop(_State) ->
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% PUBLIC API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @equiv start_pool(Name, [])
-spec start_pool(name()) -> {ok, pid()}.
start_pool(Name) ->
start_pool(Name, []).
%% @doc Starts (and links) a pool of N wpool_processes.
%% The result pid belongs to a supervisor (in case you want to add it to a
%% supervisor tree)
-spec start_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_pool(Name, Options) ->
wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)).
%% @doc Builds a child specification to pass to a supervisor.
-spec child_spec(name(), [option()]) -> supervisor:child_spec().
child_spec(Name, Options) ->
FullOptions = wpool_utils:add_defaults(Options),
#{id => Name,
start => {wpool, start_pool, [Name, FullOptions]},
restart => permanent,
shutdown => infinity,
type => supervisor}.
%% @doc Stops a pool that doesn't belong to `wpool_sup'.
-spec stop_pool(name()) -> true.
stop_pool(Name) ->
case whereis(Name) of
undefined ->
true;
Pid ->
exit(Pid, normal)
end.
%% @equiv start_sup_pool(Name, [])
-spec start_sup_pool(name()) -> {ok, pid()} | {error, {already_started, pid()} | term()}.
start_sup_pool(Name) ->
start_sup_pool(Name, []).
%% @doc Starts a pool of N wpool_processes supervised by `wpool_sup'
-spec start_sup_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_sup_pool(Name, Options) ->
wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)).
%% @doc Stops a pool supervised by `wpool_sup' supervision tree.
-spec stop_sup_pool(name()) -> ok.
stop_sup_pool(Name) ->
wpool_sup:stop_pool(Name).
%% @doc Default strategy
-spec default_strategy() -> strategy().
default_strategy() ->
case application:get_env(worker_pool, default_strategy) of
undefined ->
available_worker;
{ok, Strategy} ->
Strategy
end.
%% @equiv call(Sup, Call, default_strategy())
-spec call(name(), term()) -> term().
call(Sup, Call) ->
call(Sup, Call, default_strategy()).
%% @equiv call(Sup, Call, Strategy, 5000)
-spec call(name(), term(), strategy()) -> term().
call(Sup, Call, Strategy) ->
call(Sup, Call, Strategy, 5000).
%% @doc Picks a server and issues the call to it.
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual call to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
-spec call(name(), term(), strategy(), timeout()) -> term().
call(Sup, Call, available_worker, Timeout) ->
wpool_pool:call_available_worker(Sup, Call, Timeout);
call(Sup, Call, {hash_worker, HashKey}, Timeout) ->
wpool_process:call(
wpool_pool:hash_worker(Sup, HashKey), Call, Timeout);
call(Sup, Call, Fun, Timeout) when is_function(Fun) ->
wpool_process:call(Fun(Sup), Call, Timeout);
call(Sup, Call, Strategy, Timeout) ->
wpool_process:call(
wpool_pool:Strategy(Sup), Call, Timeout).
%% @equiv cast(Sup, Cast, default_strategy())
-spec cast(name(), term()) -> ok.
cast(Sup, Cast) ->
cast(Sup, Cast, default_strategy()).
%% @doc Picks a server and issues the cast to it
-spec cast(name(), term(), strategy()) -> ok.
cast(Sup, Cast, available_worker) ->
wpool_pool:cast_to_available_worker(Sup, Cast);
cast(Sup, Cast, {hash_worker, HashKey}) ->
wpool_process:cast(
wpool_pool:hash_worker(Sup, HashKey), Cast);
cast(Sup, Cast, Fun) when is_function(Fun) ->
wpool_process:cast(Fun(Sup), Cast);
cast(Sup, Cast, Strategy) ->
wpool_process:cast(
wpool_pool:Strategy(Sup), Cast).
%% @equiv send_request(Sup, Call, default_strategy(), 5000)
-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id().
send_request(Sup, Call) ->
send_request(Sup, Call, default_strategy()).
%% @equiv send_request(Sup, Call, Strategy, 5000)
-spec send_request(name(), term(), strategy()) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, Strategy) ->
send_request(Sup, Call, Strategy, 5000).
%% @doc Picks a server and issues the call to it.
%% Timeout applies only for the time used choosing a worker in the available_worker strategy
-spec send_request(name(), term(), strategy(), timeout()) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, available_worker, Timeout) ->
wpool_pool:send_request_available_worker(Sup, Call, Timeout);
send_request(Sup, Call, {hash_worker, HashKey}, _Timeout) ->
wpool_process:send_request(
wpool_pool:hash_worker(Sup, HashKey), Call);
send_request(Sup, Call, Fun, _Timeout) when is_function(Fun) ->
wpool_process:send_request(Fun(Sup), Call);
send_request(Sup, Call, Strategy, _Timeout) ->
wpool_process:send_request(
wpool_pool:Strategy(Sup), Call).
%% @doc Retrieves a snapshot of statistics for all pools.
%%
%% See `t:stats/0' for details on the return type.
-spec stats() -> [stats()].
stats() ->
wpool_pool:stats().
%% @doc Retrieves a snapshot of statistics for a a given pool.
%%
%% See `t:stats/0' for details on the return type.
-spec stats(name()) -> stats().
stats(Sup) ->
wpool_pool:stats(Sup).
%% @doc Retrieves the list of worker registered names.
%% This can be useful to manually inspect the workers or do custom work on them.
-spec get_workers(name()) -> [atom()].
get_workers(Sup) ->
wpool_pool:get_workers(Sup).
%% @doc Casts a message to all the workers within the given pool.
%%
%% <b>NOTE:</b> These messages don't get queued, they go straight to the worker's message queues, so
%% if you're using available_worker strategy to balance the charge and you have some tasks queued up
%% waiting for the next available worker, the broadcast will reach all the workers <b>before</b> the
%% queued up tasks.
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Sup, Cast) ->
wpool_pool:broadcast(Sup, Cast).
%% @doc Calls all the workers within the given pool async and waits for the responses synchronously.
%%
%% If one worker times out, the entire call is considered timed-out.
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Sup, Call, Timeout) ->
wpool_pool:broadcall(Sup, Call, Timeout).