% This file is licensed to you under the Apache License,
% Version 2.0 (the "License"); you may not use this file
% except in compliance with the License. You may obtain
% a copy of the License at
%
% https://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
% KIND, either express or implied. See the License for the
% specific language governing permissions and limitations
% under the License.
%%% @author Fernando Benavides <elbrujohalcon@inaka.net>
%%% @doc Worker pool main interface.
%%% Use functions provided by this module to manage your pools of workers
-module(wpool).
-behaviour(application).
%% Copied from gen.erl
-type debug_flag() :: trace | log | statistics | debug | {logfile, string()}.
-type gen_option() ::
{timeout, timeout()} | {debug, [debug_flag()]} | {spawn_opt, [proc_lib:spawn_option()]}.
-type gen_options() :: [gen_option()].
-type name() :: atom().
-type supervisor_strategy() :: {supervisor:strategy(), non_neg_integer(), pos_integer()}.
-type option() ::
{overrun_warning, infinity | pos_integer()} |
{max_overrun_warnings, infinity | pos_integer()} |
{overrun_handler, {Module :: atom(), Fun :: atom()}} |
{workers, pos_integer()} |
{worker_opt, gen_options()} |
{worker, {Module :: atom(), InitArg :: term()}} |
{strategy, supervisor_strategy()} |
{worker_type, gen_server} |
{pool_sup_intensity, non_neg_integer()} |
{pool_sup_shutdown, brutal_kill | timeout()} |
{pool_sup_period, non_neg_integer()} |
{queue_type, wpool_queue_manager:queue_type()} |
{enable_callbacks, boolean()} |
{callbacks, [module()]}.
-type custom_strategy() :: fun(([atom()]) -> Atom :: atom()).
-type strategy() ::
best_worker |
random_worker |
next_worker |
available_worker |
next_available_worker |
{hash_worker, term()} |
custom_strategy().
-type worker_stats() ::
[{messsage_queue_len, non_neg_integer()} | {memory, pos_integer()}].
-type stats() ::
[{pool, name()} |
{supervisor, pid()} |
{options, [option()]} |
{size, non_neg_integer()} |
{next_worker, pos_integer()} |
{total_message_queue_len, non_neg_integer()} |
{workers, [{pos_integer(), worker_stats()}]}].
-export_type([name/0, option/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).
-export([start/0, start/2, stop/0, stop/1]).
-export([start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
-export([stop_pool/1, stop_sup_pool/1]).
-export([call/2, cast/2, call/3, cast/3, call/4, broadcast/2]).
-export([send_request/2, send_request/3, send_request/4]).
-export([stats/0, stats/1, get_workers/1]).
-export([default_strategy/0]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% ADMIN API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Starts the application
-spec start() -> ok | {error, {already_started, ?MODULE}}.
start() ->
application:start(worker_pool).
%% @doc Stops the application
-spec stop() -> ok.
stop() ->
application:stop(worker_pool).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% BEHAVIOUR CALLBACKS
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @private
-spec start(any(), any()) -> {ok, pid()} | {error, term()}.
start(_StartType, _StartArgs) ->
wpool_sup:start_link().
%% @private
-spec stop(any()) -> ok.
stop(_State) ->
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% PUBLIC API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @equiv start_pool(Name, [])
-spec start_pool(name()) -> {ok, pid()}.
start_pool(Name) ->
start_pool(Name, []).
%% @doc Starts (and links) a pool of N wpool_processes.
%% The result pid belongs to a supervisor (in case you want to add it to a
%% supervisor tree)
-spec start_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_pool(Name, Options) ->
wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)).
%% @doc Stops the pool
-spec stop_pool(name()) -> true.
stop_pool(Name) ->
case whereis(Name) of
undefined ->
true;
Pid ->
exit(Pid, normal)
end.
%% @equiv start_sup_pool(Name, [])
-spec start_sup_pool(name()) -> {ok, pid()} | {error, {already_started, pid()} | term()}.
start_sup_pool(Name) ->
start_sup_pool(Name, []).
%% @doc Starts a pool of N wpool_processes supervised by `wpool_sup'
-spec start_sup_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_sup_pool(Name, Options) ->
wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)).
%% @doc Stops the pool
-spec stop_sup_pool(name()) -> ok.
stop_sup_pool(Name) ->
wpool_sup:stop_pool(Name).
%% @doc Default strategy
-spec default_strategy() -> strategy().
default_strategy() ->
case application:get_env(worker_pool, default_strategy) of
undefined ->
available_worker;
{ok, Strategy} ->
Strategy
end.
%% @equiv call(Sup, Call, default_strategy())
-spec call(name(), term()) -> term().
call(Sup, Call) ->
call(Sup, Call, default_strategy()).
%% @equiv call(Sup, Call, Strategy, 5000)
-spec call(name(), term(), strategy()) -> term().
call(Sup, Call, Strategy) ->
call(Sup, Call, Strategy, 5000).
%% @doc Picks a server and issues the call to it.
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual call to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
-spec call(name(), term(), strategy(), timeout()) -> term().
call(Sup, Call, available_worker, Timeout) ->
wpool_pool:call_available_worker(Sup, Call, Timeout);
call(Sup, Call, {hash_worker, HashKey}, Timeout) ->
wpool_process:call(
wpool_pool:hash_worker(Sup, HashKey), Call, Timeout);
call(Sup, Call, Fun, Timeout) when is_function(Fun) ->
wpool_process:call(Fun(Sup), Call, Timeout);
call(Sup, Call, Strategy, Timeout) ->
wpool_process:call(
wpool_pool:Strategy(Sup), Call, Timeout).
%% @equiv cast(Sup, Cast, default_strategy())
-spec cast(name(), term()) -> ok.
cast(Sup, Cast) ->
cast(Sup, Cast, default_strategy()).
%% @doc Picks a server and issues the cast to it
-spec cast(name(), term(), strategy()) -> ok.
cast(Sup, Cast, available_worker) ->
wpool_pool:cast_to_available_worker(Sup, Cast);
cast(Sup, Cast, {hash_worker, HashKey}) ->
wpool_process:cast(
wpool_pool:hash_worker(Sup, HashKey), Cast);
cast(Sup, Cast, Fun) when is_function(Fun) ->
wpool_process:cast(Fun(Sup), Cast);
cast(Sup, Cast, Strategy) ->
wpool_process:cast(
wpool_pool:Strategy(Sup), Cast).
%% @equiv send_request(Sup, Call, default_strategy(), 5000)
-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id().
send_request(Sup, Call) ->
send_request(Sup, Call, default_strategy()).
%% @equiv send_request(Sup, Call, Strategy, 5000)
-spec send_request(name(), term(), strategy()) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, Strategy) ->
send_request(Sup, Call, Strategy, 5000).
%% @doc Picks a server and issues the call to it.
%% Timeout applies only for the time used choosing a worker in the available_worker strategy
-spec send_request(name(), term(), strategy(), timeout()) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, available_worker, Timeout) ->
wpool_pool:send_request_available_worker(Sup, Call, Timeout);
send_request(Sup, Call, {hash_worker, HashKey}, _Timeout) ->
wpool_process:send_request(
wpool_pool:hash_worker(Sup, HashKey), Call);
send_request(Sup, Call, Fun, _Timeout) when is_function(Fun) ->
wpool_process:send_request(Fun(Sup), Call);
send_request(Sup, Call, Strategy, _Timeout) ->
wpool_process:send_request(
wpool_pool:Strategy(Sup), Call).
%% @doc Retrieves a snapshot of the pool stats
-spec stats() -> [stats()].
stats() ->
wpool_pool:stats().
%% @doc Retrieves a snapshot of a given pool stats
-spec stats(name()) -> stats().
stats(Sup) ->
wpool_pool:stats(Sup).
%% @doc Retrieves the list of worker registered names.
%% This can be useful to manually inspect the workers or do custom work on them.
-spec get_workers(name()) -> [atom()].
get_workers(Sup) ->
wpool_pool:get_workers(Sup).
%% @doc Casts a message to all the workers within the given pool.
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Sup, Cast) ->
wpool_pool:broadcast(Sup, Cast).