defmodule Poolex do
  @moduledoc """
  ## Usage

  In the most typical use of Poolex, you only need to start a pool of workers as a child of your application.

  ```elixir
  children = [
    {Poolex,
     pool_id: :worker_pool,
     worker_module: SomeWorker,
     workers_count: 5}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
  ```

  Then you can execute any code on the workers with `run/3`:

  ```elixir
  Poolex.run(:worker_pool, &is_pid/1, checkout_timeout: 1_000)
  {:ok, true}
  ```

  For more information see [Getting Started](https://hexdocs.pm/poolex/getting-started.html)
  """

  use GenServer, shutdown: :infinity

  alias Poolex.Private.BusyWorkers
  alias Poolex.Private.DebugInfo
  alias Poolex.Private.IdleOverflowedWorkers
  alias Poolex.Private.IdleWorkers
  alias Poolex.Private.Metrics
  alias Poolex.Private.Monitoring
  alias Poolex.Private.Options.Parser, as: OptionsParser
  alias Poolex.Private.State
  alias Poolex.Private.WaitingCallers

  require Logger

  @default_checkout_timeout to_timeout(second: 5)

  @poolex_options_table """
  | Option                           | Description                                                          | Example                         | Default value                     |
  |----------------------------------|----------------------------------------------------------------------|---------------------------------|-----------------------------------|
  | `busy_workers_impl`              | Module that describes how to work with busy workers                  | `SomeBusyWorkersImpl`           | `Poolex.Workers.Impl.List`        |
  | `failed_workers_retry_interval`  | Interval in milliseconds between retry attempts for failed workers   | `5_000`                         | `1_000`                           |
  | `idle_overflowed_workers_impl`   | Module that describes how to work with idle overflowed workers       | `SomeIdleOverflowedWorkersImpl` | `Poolex.Workers.Impl.List`        |
  | `idle_workers_impl`              | Module that describes how to work with idle workers                  | `SomeIdleWorkersImpl`           | `Poolex.Workers.Impl.List`        |
  | `max_overflow`                   | How many workers can be created over the limit                       | `2`                             | `0`                               |
  | `pool_id`                        | Identifier by which you will access the pool                         | `:my_pool`                      | `worker_module` value             |
  | `pool_size_metrics`              | Whether to dispatch pool size metrics                                | `true`                          | `false`                           |
  | `waiting_callers_impl`           | Module that describes how to work with the callers queue             | `WaitingCallersImpl`            | `Poolex.Callers.Impl.ErlangQueue` |
  | `worker_args`                    | List of arguments passed to the start function                       | `[:gg, "wp"]`                   | `[]`                              |
  | `worker_module`                  | Name of the module that implements the worker                        | `MyApp.Worker`                  | **option is required**            |
  | `worker_shutdown_delay`          | Delay in milliseconds before an overflow worker is shut down after release | `5_000`                   | `0`                               |
  | `worker_start_fun`               | Name of the function that starts the worker                          | `:run`                          | `:start_link`                     |
  | `workers_count`                  | How many workers should be running in the pool                       | `5`                             | **option is required**            |
  """

  @typedoc """
  Any valid GenServer name. It may be an atom like `:some_pool` or a tuple like
  `{:via, Registry, {MyApp.Registry, "pool"}}` if you want to use `Registry`.
  The pid of a running pool is accepted as well.
  """
  @type pool_id() :: GenServer.name() | pid()

  @typedoc """
  #{@poolex_options_table}
  """
  @type poolex_option() ::
          {:busy_workers_impl, module()}
          | {:failed_workers_retry_interval, timeout()}
          | {:idle_overflowed_workers_impl, module()}
          | {:idle_workers_impl, module()}
          | {:max_overflow, non_neg_integer()}
          | {:pool_id, pool_id()}
          | {:pool_size_metrics, boolean()}
          | {:waiting_callers_impl, module()}
          | {:worker_args, list(any())}
          | {:worker_module, module()}
          | {:worker_shutdown_delay, timeout()}
          | {:worker_start_fun, atom()}
          | {:workers_count, non_neg_integer()}

  @typedoc """
  Process id of a `worker`.

  **Workers** are processes launched in a pool.
  """
  @type worker() :: pid()

  @typedoc """
  | Option           | Description                                        | Example  | Default value                  |
  |------------------|----------------------------------------------------|----------|--------------------------------|
  | checkout_timeout | How long we can wait for a worker on the call site | `60_000` | `#{@default_checkout_timeout}` |
  """
  @type run_option() :: {:checkout_timeout, timeout()}

  @spawn_opts [priority: :high]

  @doc """
  Starts a Poolex process without links (outside of a supervision tree).

  See `start_link/1` for more information.

  ## Examples

      iex> Poolex.start(pool_id: :my_pool, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
      iex> %Poolex.Private.State{worker_module: worker_module} = :sys.get_state(:my_pool)
      iex> worker_module
      Agent
  """
  @spec start(list(poolex_option())) :: GenServer.on_start()
  def start(opts) do
    GenServer.start(__MODULE__, opts, name: OptionsParser.parse_pool_id(opts), spawn_opt: @spawn_opts)
  end

  @doc """
  Starts a Poolex process linked to the current process.

  This is often used to start the Poolex as part of a supervision tree.

  After the process is started, you can access it using the previously specified `pool_id`.

  ## Options

  #{@poolex_options_table}

  ## Examples

      iex> Poolex.start_link(pool_id: :other_pool, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
      iex> %Poolex.Private.State{worker_module: worker_module} = :sys.get_state(:other_pool)
      iex> worker_module
      Agent
  """
  @spec start_link(list(poolex_option())) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: OptionsParser.parse_pool_id(opts), spawn_opt: @spawn_opts)
  end

  @doc """
  Returns a specification to start this module under a supervisor.

  The child `id` is the pool id, so several pools can be started under the same supervisor.

  ## Options

  #{@poolex_options_table}

  ## Examples

      children = [
        Poolex.child_spec(worker_module: SomeWorker, workers_count: 5),
        # or in another way
        {Poolex, worker_module: SomeOtherWorker, workers_count: 5}
      ]

      Supervisor.start_link(children, strategy: :one_for_one)
  """
  @spec child_spec(list(poolex_option())) :: Supervisor.child_spec()
  def child_spec(opts) do
    # `super/1` is the spec generated by `use GenServer, shutdown: :infinity`, so the
    # shutdown strategy declared at the top of this module is kept instead of being
    # silently replaced by the supervisor default.
    spec = super(opts)
    %{spec | id: OptionsParser.parse_pool_id(opts)}
  end

  @doc """
  The main function for working with the pool.

  It takes a pool identifier and a function that receives a worker process id as
  its only argument and may return any value.

  When executed, an attempt is made to check out a free worker within the checkout
  timeout (5 seconds by default). You can set the timeout using the `checkout_timeout` option.

  The worker is returned to the pool after `fun` finishes, even if `fun` raises.
  If the calling process dies while holding a worker, that worker is stopped instead
  of being returned to the pool.

  Returns:

    * `{:ok, result}` if the worker was found and the function was executed successfully.
    * `{:error, :checkout_timeout}` if no free worker was found before the timeout.

  ## Examples

      iex> Poolex.start_link(pool_id: :some_pool, worker_module: Agent, worker_args: [fn -> 5 end], workers_count: 1)
      iex> Poolex.run(:some_pool, fn pid -> Agent.get(pid, &(&1)) end)
      {:ok, 5}
  """
  @spec run(pool_id(), (worker :: pid() -> any()), list(run_option())) ::
          {:ok, any()} | {:error, :checkout_timeout}
  def run(pool_id, fun, options \\ []) do
    checkout_timeout = Keyword.get(options, :checkout_timeout, @default_checkout_timeout)

    case get_idle_worker(pool_id, checkout_timeout) do
      {:ok, worker_pid} ->
        monitor_process = monitor_caller(pool_id, self(), worker_pid)

        try do
          {:ok, fun.(worker_pid)}
        after
          # The caller is still alive here, so the watchdog is no longer needed.
          Process.exit(monitor_process, :kill)
          GenServer.cast(pool_id, {:release_busy_worker, worker_pid})
        end

      {:error, :checkout_timeout} ->
        {:error, :checkout_timeout}
    end
  end

  # Asks the pool for a worker, waiting at most `checkout_timeout`.
  # Only the timeout of this very request (matched by `caller_reference`) is turned into
  # `{:error, :checkout_timeout}`; any other exit propagates to the caller.
  @spec get_idle_worker(pool_id(), timeout()) :: {:ok, worker()} | {:error, :checkout_timeout}
  defp get_idle_worker(pool_id, checkout_timeout) do
    caller_reference = make_ref()

    try do
      GenServer.call(pool_id, {:get_idle_worker, caller_reference}, checkout_timeout)
    catch
      :exit, {:timeout, {GenServer, :call, [_pool_id, {:get_idle_worker, ^caller_reference}, _timeout]}} ->
        {:error, :checkout_timeout}
    after
      # Always sent, so a timed-out caller does not stay in the waiting queue.
      # NOTE(review): if the pool hands a worker to this caller just as the timeout fires,
      # the late reply is dropped (OTP 24+ call aliases) and that worker presumably stays
      # in the busy list, because this cast only removes callers that are still queued.
      GenServer.cast(pool_id, {:cancel_waiting, caller_reference})
    end
  end

  @doc """
  Adds some idle workers to an existing pool.

  If callers are waiting for a worker, the new workers are handed to them first.
  Workers that fail to start are retried periodically.

  Raises `ArgumentError` unless `workers_count` is a positive integer.
  """
  @spec add_idle_workers!(pool_id(), pos_integer()) :: :ok | no_return()
  def add_idle_workers!(pool_id, workers_count) when is_integer(workers_count) and workers_count > 0 do
    GenServer.call(pool_id, {:add_idle_workers, workers_count})
  end

  def add_idle_workers!(_pool_id, workers_count) do
    message = "workers_count must be positive number, received: #{inspect(workers_count)}"
    raise ArgumentError, message
  end

  @doc """
  Removes some idle workers from an existing pool.

  If the number of workers to remove is greater than the number of idle workers, all idle workers will be removed.

  Raises `ArgumentError` unless `workers_count` is a positive integer.
  """
  @spec remove_idle_workers!(pool_id(), pos_integer()) :: :ok | no_return()
  def remove_idle_workers!(pool_id, workers_count) when is_integer(workers_count) and workers_count > 0 do
    GenServer.call(pool_id, {:remove_idle_workers, workers_count})
  end

  def remove_idle_workers!(_pool_id, workers_count) do
    message = "workers_count must be positive number, received: #{inspect(workers_count)}"
    raise ArgumentError, message
  end

  @impl GenServer
  def init(opts) do
    # Trap exits so `terminate/2` runs when the parent shuts the pool down.
    Process.flag(:trap_exit, true)

    parsed_options = OptionsParser.parse(opts)

    {:ok, supervisor} = Poolex.Private.Supervisor.start_link()

    state =
      %State{
        failed_workers_retry_interval: parsed_options.failed_workers_retry_interval,
        max_overflow: parsed_options.max_overflow,
        pool_id: parsed_options.pool_id,
        supervisor: supervisor,
        worker_args: parsed_options.worker_args,
        worker_module: parsed_options.worker_module,
        worker_start_fun: parsed_options.worker_start_fun,
        worker_shutdown_delay: parsed_options.worker_shutdown_delay
      }

    {initial_workers_pids, state} = start_workers(parsed_options.workers_count, state)

    state =
      state
      |> IdleWorkers.init(parsed_options.idle_workers_impl, initial_workers_pids)
      |> BusyWorkers.init(parsed_options.busy_workers_impl)
      |> IdleOverflowedWorkers.init(parsed_options.idle_overflowed_workers_impl)
      |> WaitingCallers.init(parsed_options.waiting_callers_impl)

    {:ok, state, {:continue, opts}}
  end

  @impl GenServer
  def handle_continue(opts, state) do
    Metrics.start_poller(opts)
    schedule_retry_failed_workers(state)

    {:noreply, state}
  end

  @impl GenServer
  def handle_call({:get_idle_worker, caller_reference}, caller, %State{} = state) do
    cond do
      not IdleOverflowedWorkers.empty?(state) ->
        # If there are overflowed idle workers, we can immediately provide one to the caller
        {overflowed_worker_pid, state} = IdleOverflowedWorkers.pop(state)
        {:reply, {:ok, overflowed_worker_pid}, BusyWorkers.add(state, overflowed_worker_pid)}

      not IdleWorkers.empty?(state) ->
        # If there are idle workers, we can immediately provide one to the caller
        {idle_worker_pid, state} = IdleWorkers.pop(state)
        {:reply, {:ok, idle_worker_pid}, BusyWorkers.add(state, idle_worker_pid)}

      state.overflow < state.max_overflow ->
        # We can create a new worker if we are not at the max overflow limit
        checkout_new_overflow_worker(state, caller_reference, caller)

      true ->
        # We can't provide a worker immediately, so the caller joins the waiting list
        {:noreply, enqueue_waiting_caller(state, caller_reference, caller)}
    end
  end

  def handle_call(:get_debug_info, _from, %State{} = state) do
    debug_info = %DebugInfo{
      busy_workers_count: BusyWorkers.count(state),
      busy_workers_impl: state.busy_workers_impl,
      busy_workers_pids: BusyWorkers.to_list(state),
      failed_to_start_workers_count: state.failed_to_start_workers_count,
      idle_overflowed_workers_count: IdleOverflowedWorkers.count(state),
      idle_overflowed_workers_impl: state.idle_overflowed_workers_impl,
      idle_overflowed_workers_pids: IdleOverflowedWorkers.to_list(state),
      idle_workers_count: IdleWorkers.count(state),
      idle_workers_impl: state.idle_workers_impl,
      idle_workers_pids: IdleWorkers.to_list(state),
      max_overflow: state.max_overflow,
      overflow: state.overflow,
      waiting_callers: WaitingCallers.to_list(state),
      waiting_callers_impl: state.waiting_callers_impl,
      worker_args: state.worker_args,
      worker_module: state.worker_module,
      worker_shutdown_delay: state.worker_shutdown_delay,
      worker_start_fun: state.worker_start_fun
    }

    {:reply, debug_info, state}
  end

  def handle_call({:add_idle_workers, workers_count}, _from, %State{} = state) do
    {started_workers, state} = start_workers(workers_count, state)

    {:reply, :ok, distribute_started_workers(state, started_workers)}
  end

  def handle_call({:remove_idle_workers, workers_count}, _from, %State{} = state) do
    # NOTE(review): the selected workers are only dropped from the idle list; they are not
    # terminated and remain monitored, so a later :DOWN for one of them still goes through
    # handle_down_worker/2 (which may start a replacement). Stopping them here would first
    # require dropping their monitors — confirm the intended behavior.
    new_state =
      state
      |> IdleWorkers.to_list()
      |> Enum.take(workers_count)
      |> Enum.reduce(state, fn worker, acc_state ->
        IdleWorkers.remove(acc_state, worker)
      end)

    {:reply, :ok, new_state}
  end

  @impl GenServer
  def handle_cast({:release_busy_worker, worker}, %State{} = state) do
    cond do
      not BusyWorkers.member?(state, worker) ->
        # The worker already left the busy set (e.g. it crashed and its :DOWN message was
        # handled before this cast). Handing it to a waiting caller would give them a dead pid.
        {:noreply, state}

      WaitingCallers.empty?(state) ->
        {:noreply, release_busy_worker(state, worker)}

      true ->
        # The worker stays busy; it simply changes owner.
        {:noreply, provide_worker_to_waiting_caller(state, worker)}
    end
  end

  def handle_cast({:stop_worker, worker_pid}, %State{} = state) do
    stop_worker(state.supervisor, worker_pid)

    {:noreply, state}
  end

  def handle_cast({:cancel_waiting, caller_reference}, %State{} = state) do
    {:noreply, WaitingCallers.remove_by_reference(state, caller_reference)}
  end

  @impl GenServer
  def handle_info({:DOWN, monitoring_reference, _process, dead_process_pid, _reason}, %State{} = state) do
    case Monitoring.remove(state, monitoring_reference) do
      {:worker, state} ->
        {:noreply, handle_down_worker(state, dead_process_pid)}

      {:waiting_caller, state} ->
        {:noreply, handle_down_waiting_caller(state, dead_process_pid)}
    end
  end

  def handle_info(:retry_failed_workers, %State{} = state) do
    # Try to start workers that failed to initialize
    state =
      if state.failed_to_start_workers_count > 0 do
        retry_failed_workers(state)
      else
        state
      end

    schedule_retry_failed_workers(state)

    {:noreply, state}
  end

  def handle_info({:delayed_stop_worker, worker}, %State{} = state) do
    if IdleOverflowedWorkers.expired?(state, worker) do
      # Stop the worker if it has been idle for too long
      stop_worker(state.supervisor, worker)
      {:noreply, IdleOverflowedWorkers.remove(state, worker)}
    else
      # Otherwise, just ignore the message
      {:noreply, state}
    end
  end

  @impl GenServer
  def terminate(reason, %State{} = state) do
    DynamicSupervisor.stop(state.supervisor, reason)

    :ok
  end

  # Starts `workers_count` workers and monitors every one that started.
  # Failures are counted in `failed_to_start_workers_count` so the periodic retry can pick them up.
  @spec start_workers(non_neg_integer(), State.t()) :: {[pid], State.t()}
  defp start_workers(0, state) do
    {[], state}
  end

  defp start_workers(workers_count, state) when is_integer(workers_count) and workers_count >= 1 do
    Enum.reduce(1..workers_count, {[], state}, fn _iterator, {workers_pids, state} ->
      case start_worker(state) do
        {:ok, worker_pid} ->
          state = Monitoring.add(state, worker_pid, :worker)
          {[worker_pid | workers_pids], state}

        {:error, :failed_to_start_worker} ->
          state = %{state | failed_to_start_workers_count: state.failed_to_start_workers_count + 1}
          {workers_pids, state}
      end
    end)
  end

  defp start_workers(workers_count, _state) do
    msg = "workers_count must be non negative integer, received: #{inspect(workers_count)}"
    raise ArgumentError, msg
  end

  # Starts a single temporary worker under the pool's DynamicSupervisor. It does NOT monitor it.
  @spec start_worker(State.t()) :: {:ok, pid()} | {:error, :failed_to_start_worker}
  defp start_worker(%State{} = state) do
    case DynamicSupervisor.start_child(state.supervisor, %{
           id: make_ref(),
           start: {state.worker_module, state.worker_start_fun, state.worker_args},
           restart: :temporary
         }) do
      {:ok, pid} ->
        {:ok, pid}

      {:error, reason} ->
        Logger.error("[Poolex] Failed to start worker. Reason: #{inspect(reason)}")
        {:error, :failed_to_start_worker}
    end
  end

  @spec stop_worker(Supervisor.supervisor(), pid()) :: :ok | {:error, :not_found}
  defp stop_worker(supervisor, worker_pid) do
    DynamicSupervisor.terminate_child(supervisor, worker_pid)
  end

  # Starts an extra (overflow) worker for the caller. If the worker cannot be started,
  # the caller is queued instead of crashing the whole pool; start_worker/1 has already
  # logged the reason.
  @spec checkout_new_overflow_worker(State.t(), reference(), GenServer.from()) ::
          {:reply, {:ok, worker()}, State.t()} | {:noreply, State.t()}
  defp checkout_new_overflow_worker(%State{} = state, caller_reference, caller) do
    case start_worker(state) do
      {:ok, new_worker} ->
        state =
          state
          |> Monitoring.add(new_worker, :worker)
          |> BusyWorkers.add(new_worker)

        {:reply, {:ok, new_worker}, %{state | overflow: state.overflow + 1}}

      {:error, :failed_to_start_worker} ->
        {:noreply, enqueue_waiting_caller(state, caller_reference, caller)}
    end
  end

  # Puts the caller into the waiting queue and monitors it, so it can be dropped if it dies.
  @spec enqueue_waiting_caller(State.t(), reference(), GenServer.from()) :: State.t()
  defp enqueue_waiting_caller(%State{} = state, caller_reference, {from_pid, _tag} = caller) do
    state
    |> Monitoring.add(from_pid, :waiting_caller)
    |> WaitingCallers.add(%Poolex.Caller{reference: caller_reference, from: caller})
  end

  # Gives freshly started workers to waiting callers first; the rest go to the idle list.
  # The workers must already be monitored (start_workers/2 does that), so they are not
  # monitored again here — a second monitor would produce a second :DOWN message.
  @spec distribute_started_workers(State.t(), [worker()]) :: State.t()
  defp distribute_started_workers(%State{} = state, workers) do
    Enum.reduce(workers, state, fn worker, acc_state ->
      if WaitingCallers.empty?(acc_state) do
        IdleWorkers.add(acc_state, worker)
      else
        acc_state
        |> BusyWorkers.add(worker)
        |> provide_worker_to_waiting_caller(worker)
      end
    end)
  end

  @spec release_busy_worker(State.t(), worker()) :: State.t()
  defp release_busy_worker(%State{} = state, worker) do
    if BusyWorkers.member?(state, worker) do
      state = BusyWorkers.remove(state, worker)

      # While the pool is above its base size, any released worker counts as the surplus one.
      if state.overflow > 0 do
        release_overflowed_worker(state, worker)
      else
        IdleWorkers.add(state, worker)
      end
    else
      state
    end
  end

  # Either stops a surplus worker right away or parks it as idle-overflowed until
  # `worker_shutdown_delay` expires. `overflow` is decremented later, when its :DOWN arrives.
  @spec release_overflowed_worker(State.t(), worker()) :: State.t()
  defp release_overflowed_worker(%State{} = state, worker) do
    if state.worker_shutdown_delay > 0 do
      # We add 10 ms to the delay to ensure that message will be processed after the expiration
      Process.send_after(self(), {:delayed_stop_worker, worker}, state.worker_shutdown_delay + 10)
      IdleOverflowedWorkers.add(state, worker)
    else
      stop_worker(state.supervisor, worker)
      state
    end
  end

  # Replies `{:ok, worker}` to the next waiting caller. Assumes the queue is not empty.
  @spec provide_worker_to_waiting_caller(State.t(), worker()) :: State.t()
  defp provide_worker_to_waiting_caller(%State{} = state, worker) do
    {caller, state} = WaitingCallers.pop(state)
    GenServer.reply(caller.from, {:ok, worker})
    state
  end

  # Forgets the dead worker. The overflow count shrinks if nobody is waiting;
  # otherwise a replacement is started.
  @spec handle_down_worker(State.t(), pid()) :: State.t()
  defp handle_down_worker(%State{} = state, dead_process_pid) do
    state =
      state
      |> IdleWorkers.remove(dead_process_pid)
      |> BusyWorkers.remove(dead_process_pid)
      |> IdleOverflowedWorkers.remove(dead_process_pid)

    if WaitingCallers.empty?(state) and state.overflow > 0 do
      %{state | overflow: state.overflow - 1}
    else
      replace_dead_worker(state)
    end
  end

  # Starts one replacement worker and gives it to a waiting caller, or to the idle list.
  # If it cannot be started, it is counted as failed so that `:retry_failed_workers`
  # restores it later, instead of crashing the pool.
  @spec replace_dead_worker(State.t()) :: State.t()
  defp replace_dead_worker(%State{} = state) do
    case start_worker(state) do
      {:ok, new_worker} ->
        state
        |> Monitoring.add(new_worker, :worker)
        |> distribute_started_workers([new_worker])

      {:error, :failed_to_start_worker} ->
        %{state | failed_to_start_workers_count: state.failed_to_start_workers_count + 1}
    end
  end

  @spec handle_down_waiting_caller(State.t(), pid()) :: State.t()
  defp handle_down_waiting_caller(%State{} = state, dead_process_pid) do
    WaitingCallers.remove_by_pid(state, dead_process_pid)
  end

  # Monitor the `caller`. If the caller dies while holding `worker`, ask the pool to stop
  # that worker; its :DOWN is then handled by handle_down_worker/2.
  @spec monitor_caller(pool_id(), caller :: pid(), worker :: pid()) :: monitor_process :: pid()
  defp monitor_caller(pool_id, caller, worker) do
    spawn(fn ->
      reference = Process.monitor(caller)

      receive do
        {:DOWN, ^reference, :process, ^caller, _reason} ->
          GenServer.cast(pool_id, {:stop_worker, worker})
      end
    end)
  end

  @spec schedule_retry_failed_workers(State.t()) :: :ok
  defp schedule_retry_failed_workers(state) do
    Process.send_after(self(), :retry_failed_workers, state.failed_workers_retry_interval)
    :ok
  end

  # Retries every worker counted as failed. Workers that fail again are counted again by start_workers/2.
  @spec retry_failed_workers(State.t()) :: State.t()
  defp retry_failed_workers(%State{failed_to_start_workers_count: workers_to_retry} = state) do
    Logger.info("[Poolex] Attempting to restart #{workers_to_retry} failed workers")

    # Reset the counter first; start_workers/2 increments it again for each new failure.
    {started_workers, state} = start_workers(workers_to_retry, %{state | failed_to_start_workers_count: 0})

    distribute_started_workers(state, started_workers)
  end
end