use Croma
defmodule PoolSup do
@moduledoc File.read!(Path.join([__DIR__, "..", "README.md"])) |> String.replace_leading("# ", "") # 1st line shouldn't be a header line in exdoc
alias GenServer, as: GS
use GS
alias PoolSup.{PidSet, PidRefSet, ClientQueue, Callback}
alias PoolSup.CustomSupHelper, as: H
require H
@type pool :: pid | GS.name
@type option :: {:name, GS.name} | {:checkout_max_duration, pos_integer}
@typep sup_state :: H.sup_state
require Record
# Private record holding the whole state of a pool process.
Record.defrecordp :state, [
# State of the underlying supervisor, obtained from `:supervisor.init/1` and threaded through child start/terminate calls.
:sup_state,
# Number of workers the pool holds.
:reserved,
# Maximum number of extra workers spawned on checkout when all reserved workers are in use.
:ondemand,
# Set of all child worker pids (its size is reported as `children` by `status/1`).
:all,
# Checked-out workers, each stored with the cancel reference and the term at checkout time.
:working,
# Worker pids that are idle and can be handed out on the next checkout.
:available,
# Clients blocked in `checkout/2`, waiting for a worker.
:waiting,
# `nil` (cleanup disabled) or the checkout duration threshold in seconds.
:checkout_max_duration,
# Counter incremented on every `:increment_term` message.
:current_term,
]
@typep state :: record(:state,
sup_state: sup_state,
reserved: non_neg_integer,
ondemand: non_neg_integer,
all: PidSet.t,
working: PidRefSet.t,
available: [pid],
waiting: ClientQueue.t,
checkout_max_duration: nil | pos_integer,
current_term: non_neg_integer)
#
# client API
#
@doc """
Builds the child specification that is used when the parent supervisor does not fully specify one.

`args` is the argument list applied to `start_link`. The pool is declared as a
`:supervisor` child with an `:infinity` shutdown timeout.
"""
defun child_spec(args :: list) :: Supervisor.child_spec do
  start_mfa = {__MODULE__, :start_link, args}
  %{id: __MODULE__, start: start_mfa, type: :supervisor, shutdown: :infinity}
end
@doc """
Starts a `PoolSup` process and links it to the calling process.

## Arguments

- `worker_module`: the callback module of `PoolSup.Worker`.
- `worker_init_arg`: the value passed to the `worker_module.start_link/1` callback function.
- `reserved`: the number of workers this `PoolSup` process holds.
- `ondemand`: the maximum number of workers that are spawned on checkouts when all reserved processes are in use.
- `options`: a keyword list of the following options:
    - `:name`: used for name registration of the pool process.
    - `:checkout_max_duration`: a threshold (in seconds) of a worker's checkout duration (see below).
      Defaults to `nil` (workers won't be killed).

## Terminating non-returning workers

It is sometimes difficult to guarantee that a checked-out worker pid will eventually be checked in.
For example, the caller of `checkout/2` may be killed while it is waiting for the reply (a worker pid)
from the pool process, which results in a leak of that worker process.

As a safeguard against such leaks `PoolSup` provides the `:checkout_max_duration` option.
If a checked-out worker has not been checked in for longer than `:checkout_max_duration` seconds,
the pool regards the worker process as leaked and kills it.
If `:checkout_max_duration` is `nil`, this cleanup functionality is disabled.

The `:checkout_max_duration` option of a running pool can be changed with `change_checkout_max_duration/2`.
"""
defun start_link(worker_module   :: g[module],
                 worker_init_arg :: term,
                 reserved        :: g[non_neg_integer],
                 ondemand        :: g[non_neg_integer],
                 options         :: g[[option]] \\ []) :: GS.on_start do
  init_arg = {worker_module, worker_init_arg, reserved, ondemand, options}
  server_opts = H.gen_server_opts(options)
  GS.start_link(__MODULE__, init_arg, server_opts)
end
@doc """
Checks out a worker pid that is not currently in use.

When no worker process is available the caller blocks until either

- a process becomes available, or
- the timeout is reached.

A checked-out pid must eventually be checked in or die; otherwise the pool cannot
keep track of working processes correctly and processes leak.
To ensure this it is advisable to either

- link the checked-out process with the process that is going to check it in, or
- implement your worker so that it checks itself in at the end of each job.
"""
defun checkout(pool :: pool, timeout :: timeout \\ 5000) :: pid do
  ref = make_ref()
  request = {:checkout, ref}
  try do
    GenServer.call(pool, request, timeout)
  catch
    :exit, {:timeout, _details} = timeout_reason ->
      # Tell the pool we gave up, identified by our pid and `ref`, then re-raise the
      # original exit with its stacktrace so that the caller still observes the timeout.
      GenServer.cast(pool, {:cancel_waiting, self(), ref})
      :erlang.raise(:exit, timeout_reason, __STACKTRACE__)
  end
end
@doc """
Checks out a worker pid without waiting for one to become available.

Returns `nil` if no available worker is found.
"""
defun checkout_nonblocking(pool :: pool, timeout :: timeout \\ 5000) :: nil | pid do
  request = :checkout_nonblocking
  GenServer.call(pool, request, timeout)
end
@doc """
Checks in a worker pid that is in use, making it available to others.
"""
defun checkin(pool :: pool, pid :: g[pid]) :: :ok do
  # A dead pid is never checked in. This matters especially for `transaction/3`,
  # where running `f` may have killed the worker.
  Process.alive?(pid) && GenServer.cast(pool, {:checkin, pid})
  :ok
end
@doc """
Checks out a worker pid, creates a link to the worker, executes the given function using the pid,
and finally unlinks from the worker and checks it in.

The `timeout` parameter is used only in the checkout step; time elapsed during other steps is not counted.

Returns whatever `f` returns. The cleanup (unlink followed by check-in) runs even when `f` raises,
throws or exits.
"""
defun transaction(pool :: pool, f :: (pid -> a), timeout :: timeout \\ 5000) :: a when a: term do
  pid = checkout(pool, timeout)
  try do
    Process.link(pid)
    f.(pid)
  after
    # Unlink BEFORE checking in. As soon as the pool processes the check-in it may
    # terminate the worker or hand it to another client. If the link still existed
    # at that point, the worker's exit signal would propagate to this process.
    Process.unlink(pid)
    checkin(pool, pid)
  end
end
@doc """
Queries the current status of a pool.

The result is a map with the keys `:reserved`, `:ondemand`, `:children`, `:available`,
`:working` and `:checkout_max_duration`.
"""
defun status(pool :: pool) :: %{reserved: nni, ondemand: nni, children: nni, available: nni, working: nni, checkout_max_duration: nil | pos_integer} when nni: non_neg_integer do
  report = GenServer.call(pool, :status)
  report
end
@doc """
Changes the capacity (number of worker processes) of a pool.

`new_reserved` and/or `new_ondemand` may be `nil`; in that case the corresponding value is kept unchanged
(i.e. `PoolSup.change_capacity(pool, 10, nil)` replaces only the `reserved` value of `pool`).

On receipt of the `change_capacity` message, the pool adjusts the number of children to the new configuration as follows:

- If the current number of workers is less than `reserved`, the pool spawns new workers to ensure `reserved` workers are available.
  Note that, as is the case throughout the OTP framework, spawning processes under a supervisor is a synchronous operation.
  Therefore increasing `reserved` by too many at once may make the pool unresponsive for a while.
- When the maximum capacity (`reserved + ondemand`) is increased and any client process is checking out in a blocking manner,
  the newly-spawned process is returned to that client.
- When the capacity is decreased, the pool tries to shut down extra workers that are not in use.
  Processes currently in use are never interrupted.
  If the number of in-use workers is more than the desired capacity, further termination is delayed until a worker process is checked in.
"""
defun change_capacity(pool :: pool, new_reserved :: nil | non_neg_integer, new_ondemand :: nil | non_neg_integer) :: :ok do
  # Nothing to change: return without sending a message to the pool.
  (_pool, nil, nil) -> :ok
  (target, reserved_or_nil, ondemand_or_nil) when H.is_nil_or_nni(reserved_or_nil) and H.is_nil_or_nni(ondemand_or_nil) ->
    GenServer.cast(target, {:change_capacity, reserved_or_nil, ondemand_or_nil})
end
@doc """
Changes the `:checkout_max_duration` option of the pool.

`new_duration` must be `nil` or a positive integer.
See `start_link/5` for a detailed explanation of the `:checkout_max_duration` option.
"""
defun change_checkout_max_duration(pool :: pool, new_duration :: nil | pos_integer) :: :ok do
  # Validate in the caller's process so that a bad value fails here and is never sent to the pool.
  :ok =
    case new_duration do
      nil -> :ok
      seconds when is_integer(seconds) and seconds > 0 -> :ok
    end
  GenServer.cast(pool, {:change_checkout_max_duration, new_duration})
end
#
# gen_server callbacks
#
# Initializes the pool: sets up the underlying supervisor state, starts the periodic
# term timer when `:checkout_max_duration` is given, and spawns workers up to `reserved`.
def init({mod, init_arg, reserved, ondemand, opts}) do
  {:ok, sup_state} = :supervisor.init(supervisor_init_arg(mod, init_arg, opts))
  max_duration = opts[:checkout_max_duration]
  # Only `nil` or a positive integer is accepted; anything else fails the case.
  case max_duration do
    nil -> :ok
    d when is_integer(d) and d > 0 -> start_term_increment_timer(d)
  end
  initial =
    state(
      sup_state: sup_state,
      reserved: reserved,
      ondemand: ondemand,
      all: PidSet.new(),
      working: PidRefSet.new(),
      available: [],
      waiting: ClientQueue.new(),
      checkout_max_duration: max_duration,
      current_term: 0
    )
  {:ok, restock_children_upto_reserved(initial)}
end
# Builds the argument for `:supervisor.init/1`: a name, the callback module and a
# supervisor spec containing a single `:temporary` worker spec.
defp supervisor_init_arg(mod, init_arg, opts) do
  start_mfa = {mod, :start_link, [init_arg]}
  child = %{id: mod, start: start_mfa, restart: :temporary}
  sup_flags = [max_restarts: 0, max_seconds: 1]
  sup_spec = H.make_sup_spec([child], sup_flags)
  sup_name = H.make_sup_name(opts[:name])
  {sup_name, Callback, [sup_spec]}
end
# Nonblocking checkout: replies with an idle worker, or a freshly spawned one while the
# pool is below `reserved + ondemand`, or `nil` otherwise. The client is never enqueued.
def handle_call(:checkout_nonblocking, {_caller, tag},
                state(reserved: reserved, ondemand: ondemand, all: all, available: available) = s) do
  # The reference in the `from` tuple is reused as the cancel reference (a micro-optimization
  # that avoids creating a new one). No cancellation ever happens for a nonblocking
  # checkout, so any reference is OK here.
  cancel_ref =
    case tag do
      [:alias | inner] -> inner # Erlang 24
      plain -> plain
    end
  below_capacity? = map_size(all) < reserved + ondemand
  case {available, below_capacity?} do
    {[pid | rest], _} -> reply_with_available_worker(pid, rest, cancel_ref, s)
    {[], true} -> reply_with_ondemand_worker(s, cancel_ref)
    {[], false} -> {:reply, nil, s}
  end
end
# Blocking checkout: replies with an idle worker, or a freshly spawned one while the
# pool is below `reserved + ondemand`; otherwise the client is enqueued and gets its
# reply later.
def handle_call({:checkout, cancel_ref}, from,
                state(reserved: reserved, ondemand: ondemand, all: all, available: available) = s) do
  case available do
    [worker | rest] ->
      reply_with_available_worker(worker, rest, cancel_ref, s)
    [] when map_size(all) < reserved + ondemand ->
      reply_with_ondemand_worker(s, cancel_ref)
    [] ->
      {:noreply, enqueue_client(s, from, cancel_ref)}
  end
end
# Replies with a snapshot of the pool's counters; the state is left untouched.
def handle_call(:status, _from,
                state(reserved: reserved, ondemand: ondemand, all: all, available: available,
                      working: working, checkout_max_duration: dur) = s) do
  n_children = map_size(all)
  n_available = length(available)
  n_working = PidRefSet.size(working)
  report = %{
    reserved: reserved,
    ondemand: ondemand,
    children: n_children,
    available: n_available,
    working: n_working,
    checkout_max_duration: dur,
  }
  {:reply, report, s}
end
H.handle_call_default_clauses
# Hands out the idle worker `pid`: records it as working (with the cancel reference and
# the current term) and keeps `pids` as the remaining available workers.
defunp reply_with_available_worker(pid :: pid, pids :: [pid], cancel_ref :: reference, state(working: working, current_term: term) = s :: state) :: {:reply, pid, state} do
  working2 = PidRefSet.put(working, pid, cancel_ref, term)
  s2 = state(s, working: working2, available: pids)
  {:reply, pid, s2}
end
# Starts a new child and hands it out right away: the new pid is added to `all` and
# recorded as working with the cancel reference and the current term.
defunp reply_with_ondemand_worker(state(sup_state: sup_state, all: all, working: working, current_term: term) = s :: state, cancel_ref :: reference) :: {:reply, pid, state} do
  {worker, sup_state2} = H.start_child(sup_state)
  all2 = PidSet.put(all, worker)
  working2 = PidRefSet.put(working, worker, cancel_ref, term)
  {:reply, worker, state(s, sup_state: sup_state2, all: all2, working: working2)}
end
# Check-in of a worker. Pids that are not currently recorded as working are ignored.
def handle_cast({:checkin, pid}, state(working: working) = s) do
  if PidRefSet.member_pid?(working, pid) do
    {:noreply, handle_worker_checkin(s, pid)}
  else
    {:noreply, s}
  end
end
# A client timed out in `checkout/2` and no longer wants a worker.
def handle_cast({:cancel_waiting, pid, cancel_ref}, state(working: working) = s) do
  new_state =
    case PidRefSet.get_pid_by_ref(working, cancel_ref) do
      worker_pid when worker_pid != nil ->
        # The client timed out but `GenServer.reply/2` has already been called for it,
        # so the worker handed out under this reference has to be reclaimed.
        handle_worker_checkin(s, worker_pid)
      nil ->
        cancel_client(s, pid, cancel_ref)
    end
  {:noreply, new_state}
end
# Applies new capacity values and then adjusts the number of children accordingly.
# When `new_reserved` is `nil` only `ondemand` is replaced; otherwise, when
# `new_ondemand` is `nil` only `reserved` is replaced.
def handle_cast({:change_capacity, new_reserved, new_ondemand}, s) do
  updated =
    cond do
      new_reserved == nil -> state(s, ondemand: new_ondemand)
      new_ondemand == nil -> state(s, reserved: new_reserved)
      true -> state(s, reserved: new_reserved, ondemand: new_ondemand)
    end
  {:noreply, handle_capacity_change(updated)}
end
# Replaces `checkout_max_duration`. The term timer is started only on a `nil` -> non-`nil`
# transition.
# NOTE(review): after a non-`nil` -> `nil` change a previously scheduled `:increment_term`
# message may still be pending; switching back to non-`nil` before it arrives looks like
# it would leave two timer chains running. Confirm whether this needs handling.
def handle_cast({:change_checkout_max_duration, dur2}, state(checkout_max_duration: dur1) = s) do
  if is_nil(dur1) and not is_nil(dur2), do: start_term_increment_timer(dur2)
  {:noreply, state(s, checkout_max_duration: dur2)}
end
# Decides what happens to a worker that has just been checked in:
# - more children than `reserved + ondemand`: the worker is terminated;
# - otherwise, if a client is waiting: the worker is handed to that client;
# - no client waiting and more children than `reserved`: the worker is terminated;
# - no client waiting otherwise: the worker goes back to the available list.
defunp handle_worker_checkin(state(reserved: reserved,
                                   ondemand: ondemand,
                                   all: all,
                                   working: working,
                                   available: available) = s :: state,
                             pid :: pid) :: state do
  n_children = map_size(all)
  if n_children > reserved + ondemand do
    terminate_checked_in_child(s, pid)
  else
    case dequeue_client(s) do
      {from, cancel_ref, s2} ->
        send_reply_with_checked_in_child(s2, from, cancel_ref, pid)
      nil when n_children > reserved ->
        terminate_checked_in_child(s, pid)
      nil ->
        state(s, working: PidRefSet.delete_by_pid(working, pid), available: [pid | available])
    end
  end
end
# Terminates a worker that was checked out and removes it from both `all` and `working`.
defunp terminate_checked_in_child(state(sup_state: sup_state, all: all, working: working) = s :: state, pid :: pid) :: state do
  sup_state2 = H.terminate_child(pid, sup_state)
  all2 = PidSet.delete(all, pid)
  working2 = PidRefSet.delete_by_pid(working, pid)
  state(s, sup_state: sup_state2, all: all2, working: working2)
end
# Passes a just-checked-in worker directly to a waiting client and records it as
# working under that client's cancel reference and the current term.
defunp send_reply_with_checked_in_child(state(working: working, current_term: term) = s :: state,
                                        from :: GenServer.from,
                                        cancel_ref :: reference,
                                        pid :: pid) :: state do
  GenServer.reply(from, pid)
  working2 = PidRefSet.put(working, pid, cancel_ref, term)
  state(s, working: working2)
end
# Brings the set of children in line with the current `reserved`/`ondemand` values.
defunp handle_capacity_change(state(available: available) = s :: state) :: state do
  adjusted =
    if available == [] do
      send_reply_to_waiting_clients_by_spawn(s)
    else
      # As an `available` worker exists, no client is currently waiting.
      terminate_extra_children(s)
    end
  restock_children_upto_reserved(adjusted)
end
# While the pool is below `reserved + ondemand` and clients are waiting, spawns a new
# child for each waiting client in turn and replies to it.
defunp send_reply_to_waiting_clients_by_spawn(state(reserved: reserved,
                                                    ondemand: ondemand,
                                                    all: all) = s :: state) :: state do
  with true <- map_size(all) < reserved + ondemand,
       {from, cancel_ref, s2} <- dequeue_client(s) do
    s2
    |> send_reply_with_new_child(from, cancel_ref)
    |> send_reply_to_waiting_clients_by_spawn()
  else
    # Either the pool is at capacity (`false`) or nobody is waiting (`nil`).
    _ -> s
  end
end
# Terminates available workers one by one for as long as there are more children than `reserved`.
defunp terminate_extra_children(state(sup_state: sup_state, reserved: reserved, all: all, available: available) = s :: state) :: state do
  case available do
    [pid | rest] when map_size(all) > reserved ->
      sup_state2 = H.terminate_child(pid, sup_state)
      trimmed = state(s, sup_state: sup_state2, all: PidSet.delete(all, pid), available: rest)
      terminate_extra_children(trimmed)
    [_ | _] ->
      s
    [] ->
      s
  end
end
# Spawns available workers until the number of children reaches `reserved`.
defunp restock_children_upto_reserved(state(reserved: reserved, all: all) = s :: state) :: state do
  case map_size(all) < reserved do
    true -> s |> restock_child() |> restock_children_upto_reserved()
    false -> s
  end
end
# Periodic tick scheduled by `start_term_increment_timer/1`.
def handle_info(:increment_term, s) do
  {:noreply, handle_increment_term(s)}
end
# A monitored process went down: drop it from the waiting clients.
def handle_info({:DOWN, _mref, :process, pid, _reason}, s) do
  {:noreply, cancel_client(s, pid, nil)}
end
# A linked process exited: pass the message to the supervisor callback first, then
# update the pool's own bookkeeping in case the pid was one of its children.
def handle_info({:EXIT, pid, _reason} = msg, s) do
  s2 = H.delegate_info_message_to_supervisor_callback(msg, s)
  {:noreply, handle_exit(s2, pid)}
end
# Every other message is passed to the supervisor callback.
def handle_info(msg, s) do
  {:noreply, H.delegate_info_message_to_supervisor_callback(msg, s)}
end
# Reacts to an exited process only when it is one of this pool's children.
defunp handle_exit(state(all: all) = s :: state, pid :: pid) :: state do
  cond do
    PidSet.member?(all, pid) -> handle_child_exited(s, pid)
    true -> s
  end
end
# Removes an exited child from the bookkeeping and decides whether to replace it:
# - still at or above `reserved + ondemand`: nothing is spawned;
# - otherwise, if a client is waiting: a new child is spawned and handed to it;
# - no client waiting and at or above `reserved`: nothing is spawned;
# - no client waiting and below `reserved`: a new available child is spawned.
defunp handle_child_exited(state(reserved: reserved,
                                 ondemand: ondemand,
                                 all: all,
                                 working: working,
                                 available: available) = s :: state,
                           child_pid :: pid) :: state do
  # The child was either checked out (in `working`) or idle (in `available`).
  s1 =
    case PidRefSet.member_pid?(working, child_pid) do
      true -> state(s, working: PidRefSet.delete_by_pid(working, child_pid))
      false -> state(s, available: List.delete(available, child_pid))
    end
  remaining = PidSet.delete(all, child_pid)
  s2 = state(s1, all: remaining)
  n_children = map_size(remaining)
  if n_children >= reserved + ondemand do
    s2
  else
    case dequeue_client(s2) do
      {from, cancel_ref, s3} -> send_reply_with_new_child(s3, from, cancel_ref)
      nil when n_children >= reserved -> s2
      nil -> restock_child(s2)
    end
  end
end
# Starts a new child, replies to the waiting client with its pid and records the child
# as working under the client's cancel reference and the current term.
defunp send_reply_with_new_child(state(sup_state: sup_state, all: all, working: working, current_term: term) = s :: state,
                                 from :: GenServer.from,
                                 cancel_ref :: reference) :: state do
  {worker, sup_state2} = H.start_child(sup_state)
  GenServer.reply(from, worker)
  all2 = PidSet.put(all, worker)
  working2 = PidRefSet.put(working, worker, cancel_ref, term)
  state(s, sup_state: sup_state2, all: all2, working: working2)
end
# Starts one new child and puts it at the head of the available list.
defunp restock_child(state(sup_state: sup_state, all: all, available: available) = s :: state) :: state do
  {worker, sup_state2} = H.start_child(sup_state)
  all2 = PidSet.put(all, worker)
  state(s, sup_state: sup_state2, all: all2, available: [worker | available])
end
# Adds a blocked client to the waiting queue via `ClientQueue.enqueue_and_monitor/3`.
defunp enqueue_client(state(waiting: waiting) = s :: state, from :: GenServer.from, cancel_ref :: reference) :: state do
  queue = ClientQueue.enqueue_and_monitor(waiting, from, cancel_ref)
  state(s, waiting: queue)
end
# Takes the next waiting client via `ClientQueue.dequeue_and_demonitor/1`.
# Returns `nil` when nobody is waiting, otherwise the client together with the updated state.
defunp dequeue_client(state(waiting: waiting1) = s :: state) :: nil | {GenServer.from, reference, state} do
  with {from, cancel_ref, waiting2} <- ClientQueue.dequeue_and_demonitor(waiting1) do
    {from, cancel_ref, state(s, waiting: waiting2)}
  else
    nil -> nil
  end
end
# Removes a client from the waiting queue via `ClientQueue.cancel/3`.
# `cancel_ref_or_nil` is `nil` when the cancellation comes from a `:DOWN` message.
defunp cancel_client(state(waiting: waiting) = s :: state, pid :: pid, cancel_ref_or_nil :: nil | reference) :: state do
  queue = ClientQueue.cancel(waiting, pid, cancel_ref_or_nil)
  state(s, waiting: queue)
end
# Handles one timer tick. While cleanup is enabled it asks `PidRefSet` to kill workers
# that have been checked out for too long and schedules the next tick.
# The term counter is advanced in either case.
defunp handle_increment_term(state(working: working, checkout_max_duration: dur, current_term: term) = s :: state) :: state do
  # If `dur == nil`, periodic cleanup of too-long-running workers has been disabled.
  if dur != nil do
    PidRefSet.kill_workers_checked_out_too_long(working, term)
    start_term_increment_timer(dur)
  end
  state(s, current_term: term + 1)
end
# Schedules an `:increment_term` message to this process after `duration_seconds` seconds.
defunp start_term_increment_timer(duration_seconds :: pos_integer) :: :ok do
  delay_ms = duration_seconds * 1000
  _timer_ref = :erlang.send_after(delay_ms, self(), :increment_term)
  :ok
end
# Macro provided by `PoolSup.CustomSupHelper`; presumably it defines the gen_server
# `code_change/3` callback — TODO confirm against the helper module.
H.code_change_default_clause
# `terminate/2` and `format_status/2` are delegated to `PoolSup.CustomSupHelper`.
defdelegate terminate(reason, state), to: H
defdelegate format_status(opt, list), to: H
end