lib/poolex.ex

defmodule Poolex do
  @moduledoc """
  ## Usage

  In the most typical use of Poolex, you only need to start pool of workers as a child of your application.

  ```elixir
  children = [
    {Poolex,
      pool_id: :worker_pool,
      worker_module: SomeWorker,
      workers_count: 5}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
  ```

  Then you can execute any code on the workers with `run/3`:

  ```elixir
  Poolex.run(:worker_pool, &(is_pid?(&1)), checkout_timeout: 1_000)
  {:ok, true}
  ```

  Fore more information see [Getting Started](https://hexdocs.pm/poolex/getting-started.html)
  """

  use GenServer, shutdown: :infinity

  alias Poolex.Private.BusyWorkers
  alias Poolex.Private.DebugInfo
  alias Poolex.Private.IdleWorkers
  alias Poolex.Private.Monitoring
  alias Poolex.Private.State
  alias Poolex.Private.WaitingCallers

  @default_checkout_timeout :timer.seconds(5)
  @poolex_options_table """
  | Option                 | Description                                          | Example               | Default value                     |
  |------------------------|------------------------------------------------------|-----------------------|-----------------------------------|
  | `pool_id`              | Identifier by which you will access the pool         | `:my_pool`            | **option is required**            |
  | `worker_module`        | Name of module that implements our worker            | `MyApp.Worker`        | **option is required**            |
  | `workers_count`        | How many workers should be running in the pool       | `5`                   | **option is required**            |
  | `max_overflow`         | How many workers can be created over the limit       | `2`                   | `0`                               |
  | `worker_args`          | List of arguments passed to the start function       | `[:gg, "wp"]`         | `[]`                              |
  | `worker_start_fun`     | Name of the function that starts the worker          | `:run`                | `:start_link`                     |
  | `busy_workers_impl`    | Module that describes how to work with busy workers  | `SomeBusyWorkersImpl` | `Poolex.Workers.Impl.List`        |
  | `idle_workers_impl`    | Module that describes how to work with idle workers  | `SomeIdleWorkersImpl` | `Poolex.Workers.Impl.List`        |
  | `waiting_callers_impl` | Module that describes how to work with callers queue | `WaitingCallersImpl`  | `Poolex.Callers.Impl.ErlangQueue` |
  """

  @typedoc """
  Any atom naming your pool, e.g. `:my_pool`.
  """
  @type pool_id() :: atom()
  @typedoc """
  #{@poolex_options_table}
  """
  @type poolex_option() ::
          {:pool_id, pool_id()}
          | {:worker_module, module()}
          | {:workers_count, non_neg_integer()}
          | {:max_overflow, non_neg_integer()}
          | {:worker_args, list(any())}
          | {:worker_start_fun, atom()}
          | {:busy_workers_impl, module()}
          | {:idle_workers_impl, module()}
          | {:waiting_callers_impl, module()}

  @typedoc """
  Process id of `worker`.

  **Workers** are processes launched in a pool.
  """
  @type worker() :: pid()

  @typedoc """
  | Option  | Description                                        | Example  | Default value                           |
  |---------|----------------------------------------------------|----------|-----------------------------------------|
  | checkout_timeout | How long we can wait for a worker on the call site | `60_000` | `#{@default_checkout_timeout}` |
  """
  @type run_option() :: {:checkout_timeout, timeout()}

  @doc """
  Starts a Poolex process without links (outside of a supervision tree).

  See start_link/1 for more information.

  ## Examples

      iex> Poolex.start(pool_id: :my_pool, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
      iex> %Poolex.Private.State{worker_module: worker_module} = Poolex.get_state(:my_pool)
      iex> worker_module
      Agent
  """
  @spec start(list(poolex_option())) :: GenServer.on_start()
  def start(opts) do
    pool_id = Keyword.fetch!(opts, :pool_id)
    GenServer.start(__MODULE__, opts, name: pool_id)
  end

  @doc """
  Starts a Poolex process linked to the current process.

  This is often used to start the Poolex as part of a supervision tree.

  After the process is started, you can access it using the previously specified `pool_id`.

  ## Options

  #{@poolex_options_table}

  ## Examples

      iex> Poolex.start_link(pool_id: :other_pool, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
      iex> %Poolex.Private.State{worker_module: worker_module} = Poolex.get_state(:other_pool)
      iex> worker_module
      Agent
  """
  @spec start_link(list(poolex_option())) :: GenServer.on_start()
  def start_link(opts) do
    pool_id = Keyword.fetch!(opts, :pool_id)
    GenServer.start_link(__MODULE__, opts, name: pool_id)
  end

  @doc """
  Returns a specification to start this module under a supervisor.

  ## Options

  #{@poolex_options_table}

  ## Examples

      children = [
        Poolex.child_spec(pool_id: :worker_pool_1, worker_module: SomeWorker, workers_count: 5),
        # or in another way
        {Poolex, [pool_id: :worker_pool_2, worker_module: SomeOtherWorker, workers_count: 5]}
      ]

      Supervisor.start_link(children, strategy: :one_for_one)
  """
  @spec child_spec(list(poolex_option())) :: Supervisor.child_spec()
  def child_spec(opts) do
    pool_id = Keyword.fetch!(opts, :pool_id)
    %{id: pool_id, start: {Poolex, :start_link, [opts]}}
  end

  @doc """
  The main function for working with the pool.

  It takes a pool identifier, a function that takes a worker process id as an argument and returns any value.
  When executed, an attempt is made to find a free worker with specified timeout (5 seconds by default).
  You can set the timeout using the `checkout_timeout` option.

  Returns:
    * `{:ok, result}` if the worker was found and the function was executed successfully.
    * `{:error, :checkout_timeout}` if no free worker was found before the timeout.

  ## Examples

      iex> Poolex.start_link(pool_id: :some_pool, worker_module: Agent, worker_args: [fn -> 5 end], workers_count: 1)
      iex> Poolex.run(:some_pool, fn pid -> Agent.get(pid, &(&1)) end)
      {:ok, 5}
  """
  @spec run(pool_id(), (worker :: pid() -> any()), list(run_option())) ::
          {:ok, any()} | {:error, :checkout_timeout}
  def run(pool_id, fun, options \\ []) do
    checkout_timeout = Keyword.get(options, :checkout_timeout, @default_checkout_timeout)

    case get_idle_worker(pool_id, checkout_timeout) do
      {:ok, worker_pid} ->
        monitor_process = monitor_caller(pool_id, self(), worker_pid)

        try do
          {:ok, fun.(worker_pid)}
        after
          Process.exit(monitor_process, :kill)
          GenServer.cast(pool_id, {:release_busy_worker, worker_pid})
        end

      {:error, :checkout_timeout} ->
        {:error, :checkout_timeout}
    end
  end

  @spec get_idle_worker(pool_id(), timeout()) :: {:ok, worker()} | {:error, :checkout_timeout}
  defp get_idle_worker(pool_id, checkout_timeout) do
    caller_reference = make_ref()

    try do
      GenServer.call(pool_id, {:get_idle_worker, caller_reference}, checkout_timeout)
    catch
      :exit,
      {:timeout, {GenServer, :call, [_pool_id, {:get_idle_worker, ^caller_reference}, _timeout]}} ->
        {:error, :checkout_timeout}
    after
      GenServer.cast(pool_id, {:cancel_waiting, caller_reference})
    end
  end

  @doc """
  Returns current state of started pool.

  Primarily needed to help with debugging. **Avoid using this function in production.**

  ## Examples

      iex> Poolex.start(pool_id: :my_pool_2, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
      iex> state = %Poolex.Private.State{} = Poolex.get_state(:my_pool_2)
      iex> state.worker_module
      Agent
      iex> is_pid(state.supervisor)
      true
  """
  @spec get_state(pool_id()) :: State.t()
  def get_state(pool_id) do
    GenServer.call(pool_id, :get_state)
  end

  @doc """
  Returns detailed information about started pool.

  Primarily needed to help with debugging. **Avoid using this function in production.**

  ## Fields

      * `busy_workers_count` - how many workers are busy right now.
      * `busy_workers_pids` - list of busy workers.
      * `idle_workers_count` - how many workers are ready to work.
      * `idle_workers_pids` - list of idle workers.
      * `max_overflow` - how many workers can be created over the limit.
      * `overflow` - current count of workers launched over limit.
      * `waiting_caller_pids` - list of callers processes.
      * `worker_args` - what parameters are used to start the worker.
      * `worker_module` - name of a module that describes a worker.
      * `worker_start_fun` - what function is used to start the worker.

  ## Examples

      iex> Poolex.start(pool_id: :my_pool_3, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
      iex> debug_info = %Poolex.Private.DebugInfo{} = Poolex.get_debug_info(:my_pool_3)
      iex> debug_info.busy_workers_count
      0
      iex> debug_info.idle_workers_count
      5
  """
  @spec get_debug_info(pool_id()) :: DebugInfo.t()
  def get_debug_info(pool_id) do
    GenServer.call(pool_id, :get_debug_info)
  end

  @impl GenServer
  def init(opts) do
    Process.flag(:trap_exit, true)

    pool_id = Keyword.fetch!(opts, :pool_id)
    worker_module = Keyword.fetch!(opts, :worker_module)
    workers_count = Keyword.fetch!(opts, :workers_count)

    max_overflow = Keyword.get(opts, :max_overflow, 0)
    worker_args = Keyword.get(opts, :worker_args, [])
    worker_start_fun = Keyword.get(opts, :worker_start_fun, :start_link)

    busy_workers_impl = Keyword.get(opts, :busy_workers_impl, Poolex.Workers.Impl.List)
    idle_workers_impl = Keyword.get(opts, :idle_workers_impl, Poolex.Workers.Impl.List)

    waiting_callers_impl =
      Keyword.get(opts, :waiting_callers_impl, Poolex.Callers.Impl.ErlangQueue)

    {:ok, monitor_id} = Monitoring.init(pool_id)
    {:ok, supervisor} = Poolex.Private.Supervisor.start_link()

    state =
      %State{
        max_overflow: max_overflow,
        monitor_id: monitor_id,
        pool_id: pool_id,
        supervisor: supervisor,
        worker_args: worker_args,
        worker_module: worker_module,
        worker_start_fun: worker_start_fun
      }

    initial_workers_pids = start_workers(workers_count, state, monitor_id)

    state =
      state
      |> IdleWorkers.init(idle_workers_impl, initial_workers_pids)
      |> BusyWorkers.init(busy_workers_impl)
      |> WaitingCallers.init(waiting_callers_impl)

    {:ok, state}
  end

  @spec start_workers(non_neg_integer(), State.t(), Monitoring.monitor_id()) :: [pid]
  defp start_workers(0, _state, _monitor_id) do
    []
  end

  defp start_workers(workers_count, _state, _monitor_id) when workers_count < 0 do
    msg = "workers_count must be non negative number, received: #{inspect(workers_count)}"
    raise ArgumentError, msg
  end

  defp start_workers(workers_count, state, monitor_id) do
    Enum.map(1..workers_count, fn _ ->
      {:ok, worker_pid} = start_worker(state)
      Monitoring.add(monitor_id, worker_pid, :worker)

      worker_pid
    end)
  end

  @spec start_worker(State.t()) :: {:ok, pid()}
  defp start_worker(%State{} = state) do
    DynamicSupervisor.start_child(state.supervisor, %{
      id: make_ref(),
      start: {state.worker_module, state.worker_start_fun, state.worker_args},
      restart: :temporary
    })
  end

  @spec stop_worker(Supervisor.supervisor(), pid()) :: :ok | {:error, :not_found}
  defp stop_worker(supervisor, worker_pid) do
    DynamicSupervisor.terminate_child(supervisor, worker_pid)
  end

  @impl GenServer
  def handle_call({:get_idle_worker, caller_reference}, {from_pid, _} = caller, %State{} = state) do
    if IdleWorkers.empty?(state) do
      if state.overflow < state.max_overflow do
        {:ok, new_worker} = start_worker(state)

        Monitoring.add(state.monitor_id, new_worker, :worker)

        state = BusyWorkers.add(state, new_worker)

        {:reply, {:ok, new_worker}, %State{state | overflow: state.overflow + 1}}
      else
        Monitoring.add(state.monitor_id, from_pid, :waiting_caller)

        state =
          WaitingCallers.add(state, %Poolex.Caller{reference: caller_reference, from: caller})

        {:noreply, state}
      end
    else
      {idle_worker_pid, state} = IdleWorkers.pop(state)
      state = BusyWorkers.add(state, idle_worker_pid)

      {:reply, {:ok, idle_worker_pid}, state}
    end
  end

  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  def handle_call(:get_debug_info, _form, %State{} = state) do
    debug_info = %DebugInfo{
      busy_workers_count: BusyWorkers.count(state),
      busy_workers_impl: state.busy_workers_impl,
      busy_workers_pids: BusyWorkers.to_list(state),
      idle_workers_count: IdleWorkers.count(state),
      idle_workers_impl: state.idle_workers_impl,
      idle_workers_pids: IdleWorkers.to_list(state),
      max_overflow: state.max_overflow,
      overflow: state.overflow,
      waiting_callers: WaitingCallers.to_list(state),
      waiting_callers_impl: state.waiting_callers_impl,
      worker_args: state.worker_args,
      worker_module: state.worker_module,
      worker_start_fun: state.worker_start_fun
    }

    {:reply, debug_info, state}
  end

  @impl GenServer
  def handle_cast({:release_busy_worker, worker}, %State{} = state) do
    if WaitingCallers.empty?(state) do
      new_state = release_busy_worker(state, worker)
      {:noreply, new_state}
    else
      new_state = provide_worker_to_waiting_caller(state, worker)
      {:noreply, new_state}
    end
  end

  @impl GenServer
  def handle_cast({:cancel_waiting, caller_reference}, %State{} = state) do
    {:noreply, WaitingCallers.remove_by_reference(state, caller_reference)}
  end

  @impl GenServer
  def handle_info(
        {:DOWN, monitoring_reference, _process, dead_process_pid, _reason},
        %State{} = state
      ) do
    case Monitoring.remove(state.monitor_id, monitoring_reference) do
      :worker ->
        {:noreply, handle_down_worker(state, dead_process_pid)}

      :waiting_caller ->
        {:noreply, handle_down_waiting_caller(state, dead_process_pid)}
    end
  end

  @spec release_busy_worker(State.t(), worker()) :: State.t()
  defp release_busy_worker(%State{} = state, worker) do
    if BusyWorkers.member?(state, worker) do
      state = BusyWorkers.remove(state, worker)

      if state.overflow > 0 do
        stop_worker(state.supervisor, worker)

        state
      else
        IdleWorkers.add(state, worker)
      end
    else
      state
    end
  end

  @spec provide_worker_to_waiting_caller(State.t(), worker()) :: State.t()
  defp provide_worker_to_waiting_caller(%State{} = state, worker) do
    {caller, state} = WaitingCallers.pop(state)

    GenServer.reply(caller.from, {:ok, worker})

    state
  end

  @spec handle_down_worker(State.t(), pid()) :: State.t()
  defp handle_down_worker(%State{} = state, dead_process_pid) do
    state =
      state
      |> IdleWorkers.remove(dead_process_pid)
      |> BusyWorkers.remove(dead_process_pid)

    if WaitingCallers.empty?(state) do
      if state.overflow > 0 do
        %State{state | overflow: state.overflow - 1}
      else
        {:ok, new_worker} = start_worker(state)

        Monitoring.add(state.monitor_id, new_worker, :worker)

        IdleWorkers.add(state, new_worker)
      end
    else
      {:ok, new_worker} = start_worker(state)
      Monitoring.add(state.monitor_id, new_worker, :worker)

      state
      |> BusyWorkers.add(new_worker)
      |> provide_worker_to_waiting_caller(new_worker)
    end
  end

  @spec handle_down_waiting_caller(State.t(), pid()) :: State.t()
  defp handle_down_waiting_caller(%State{} = state, dead_process_pid) do
    WaitingCallers.remove_by_pid(state, dead_process_pid)
  end

  @impl GenServer
  def terminate(reason, %State{} = state) do
    DynamicSupervisor.stop(state.supervisor, reason)
    Monitoring.stop(state.monitor_id)

    :ok
  end

  # Monitor the `caller`. Release attached worker in case of caller's death.
  @spec monitor_caller(pool_id(), caller :: pid(), worker :: pid()) :: monitor_process :: pid()
  defp monitor_caller(pool_id, caller, worker) do
    spawn(fn ->
      reference = Process.monitor(caller)

      receive do
        {:DOWN, ^reference, :process, ^caller, _reason} ->
          GenServer.cast(pool_id, {:release_busy_worker, worker})
      end
    end)
  end
end