lib/dragonfly/pool.ex

defmodule Dragonfly.Pool.RunnerState do
  defstruct count: nil, pid: nil, monitor_ref: nil
end

defmodule Dragonfly.Pool.WaitingState do
  defstruct from: nil, monitor_ref: nil
end

defmodule Dragonfly.Pool do
  @moduledoc """
  Manages a pool of `Dragonfly.Runner` processes.

  Pools support elastic growth and shrinking of the number of runners.

  ## Examples

      children = [
        ...,
        {Dragonfly.Pool, name: MyRunner, min: 1, max: 10, max_concurrency: 100}
      ]

  See `start_link/1` for supported options.
  """
  use GenServer

  alias Dragonfly.{Pool, Runner}
  alias Dragonfly.Pool.{RunnerState, WaitingState}

  @default_max_concurrency 100
  @boot_timeout 30_000
  @idle_shutdown_after 30_000

  defstruct name: nil,
            dynamic_sup: nil,
            terminator_sup: nil,
            boot_timeout: nil,
            idle_shutdown_after: nil,
            min: nil,
            max: nil,
            max_concurrency: nil,
            callers: %{},
            waiting: [],
            runners: %{},
            runner_opts: []

  def child_spec(opts) do
    %{
      id: {__MODULE__, Keyword.fetch!(opts, :name)},
      start: {Dragonfly.Pool.Supervisor, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc """
  Starts a pool of runners.

  ## Options

    * `:name` - The name of the pool, for example: `MyApp.FFMPegRunner`

    * `:min` - The minimum number of runners to keep in the pool at all times.
      For "scale to zero" behavior you may pass `0`. When the pool is started as a
      Dragonfly child, `:min` is forced to zero to avoid recursively booting backend resources.

    * `:max` - The maximum number of runners to elastically grow to in the pool.

    * `:max_concurrency` - The maximum number of concurrent executions per runner before
      booting new runners or queueing calls. Defaults to `100`.

    * `:single_use` - If `true`, runners will be terminated after each call completes.
      Defaults to `false`.

    * `:backend` - The backend to use. Defaults to the configured `:dragonfly, :backend` or
      `Dragonfly.LocalBackend` if not configured.

    * `:log` - The log level to use for verbose logging. Defaults to `false`.

    * `:timeout` - The time to allow functions to execute on a remote node. Defaults to 30 seconds.
      This value is also used as the default `Dragonfly.call/3` timeout for the caller.

    * `:boot_timeout` - The time to allow for booting and connecting to a remote node.
      Defaults to 30 seconds.

    * `:shutdown_timeout` - The time to allow for graceful shutdown on the remote node.
      Defaults to 30 seconds.

    * `:idle_shutdown_after` - The amount of inactivity time after which a remote node
      is idled down. Defaults to 30 seconds. A `{timeout, check_fun}` tuple may also be
      passed to idle down only when a specific condition holds, for example:

          {10_000, fn -> Supervisor.which_children(MySup) == [] end}
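
  ## Examples

  An illustrative child spec combining the options above (`MyApp.FFMPegRunner` and
  `MySup` are placeholder names):

      children = [
        {Dragonfly.Pool,
         name: MyApp.FFMPegRunner,
         min: 0,
         max: 10,
         max_concurrency: 100,
         boot_timeout: 30_000,
         idle_shutdown_after: {10_000, fn -> Supervisor.which_children(MySup) == [] end}}
      ]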
  """
  def start_link(opts) do
    Keyword.validate!(opts, [
      :name,
      :dynamic_sup,
      :terminator_sup,
      :idle_shutdown_after,
      :min,
      :max,
      :max_concurrency,
      :backend,
      :log,
      :single_use,
      :timeout,
      :boot_timeout,
      :shutdown_timeout
    ])

    GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name))
  end

  @doc """
  Checks out a runner in the pool `name` and calls `func` on it.

  The runner is checked back in once the call completes. Accepts a `:timeout` option,
  which defaults to the pool's configured `:boot_timeout`. The time spent waiting for
  the checkout is subtracted from the timeout given to the remote call.
  """
  def call(name, func, opts \\ []) do
    opts = Keyword.put_new_lazy(opts, :timeout, fn -> lookup_boot_timeout(name) end)
    {{ref, runner_pid}, opts} =
      with_elapsed_timeout(opts, fn -> GenServer.call(name, :checkout, opts[:timeout]) end)

    result = Runner.call(runner_pid, func, opts[:timeout])
    :ok = GenServer.call(name, {:checkin, ref})
    result
  end

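  # Runs `func`, then subtracts the elapsed time from the `:timeout` option so the time
  # spent waiting for a checkout is not also granted to the remote call.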
  defp with_elapsed_timeout(opts, func) do
    {micro, result} = :timer.tc(func)
    elapsed_ms = div(micro, 1000)

    opts =
      case Keyword.fetch(opts, :timeout) do
        {:ok, :infinity} -> opts
        {:ok, ms} when is_integer(ms) -> Keyword.put(opts, :timeout, ms - elapsed_ms)
        {:ok, nil} -> opts
        :error -> opts
      end

    {result, opts}
  end

  defp lookup_boot_timeout(name) do
    :ets.lookup_element(name, :boot_timeout, 2)
  end

  @impl true
  def init(opts) do
    name = Keyword.fetch!(opts, :name)
    boot_timeout = Keyword.get(opts, :boot_timeout, @boot_timeout)
    :ets.new(name, [:set, :public, :named_table, read_concurrency: true])
    :ets.insert(name, {:boot_timeout, boot_timeout})
    terminator_sup = Keyword.fetch!(opts, :terminator_sup)
    runner_opts = runner_opts(opts, terminator_sup)
    min = Keyword.fetch!(opts, :min)

    # we must avoid recursively booting remote runners if we are a child
    min =
      if Dragonfly.Parent.get() do
        0
      else
        min
      end

    state = %Pool{
      dynamic_sup: Keyword.fetch!(opts, :dynamic_sup),
      terminator_sup: terminator_sup,
      name: name,
      min: min,
      max: Keyword.fetch!(opts, :max),
      boot_timeout: boot_timeout,
      idle_shutdown_after: Keyword.get(opts, :idle_shutdown_after, @idle_shutdown_after),
      max_concurrency: Keyword.get(opts, :max_concurrency, @default_max_concurrency),
      runner_opts: runner_opts
    }

    {:ok, boot_runners(state)}
  end

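  # Builds the options forwarded to each runner, injecting the terminator supervisor
  # and log level into the backend options.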
  defp runner_opts(opts, terminator_sup) do
    defaults = [terminator_sup: terminator_sup, log: Keyword.get(opts, :log, false)]

    runner_opts =
      Keyword.take(
        opts,
        [
          :backend,
          :log,
          :single_use,
          :timeout,
          :boot_timeout,
          :shutdown_timeout,
          :idle_shutdown_after
        ]
      )

    case Keyword.fetch(opts, :backend) do
      {:ok, {backend, backend_opts}} ->
        merged_opts = Keyword.merge(backend_opts, defaults)
        Keyword.replace!(runner_opts, :backend, {backend, merged_opts})

      {:ok, backend} ->
        Keyword.replace!(runner_opts, :backend, {backend, defaults})

      :error ->
        backend = Dragonfly.Backend.impl()
        backend_opts = Application.get_env(:dragonfly, backend) || []
        Keyword.put(runner_opts, :backend, {backend, Keyword.merge(backend_opts, defaults)})
    end
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason} = msg, %Pool{} = state) do
    {:noreply, handle_down(state, msg)}
  end

  @impl true
  def handle_call(:checkout, from, state) do
    {:noreply, checkout_runner(state, from)}
  end

  def handle_call({:checkin, ref}, _from, state) do
    {:reply, :ok, checkin_runner(state, ref)}
  end

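  # Returns the runner with the fewest active callers, or nil if no runners are booted.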
  defp min_runner(state) do
    if map_size(state.runners) == 0 do
      nil
    else
      {_ref, min} = Enum.min_by(state.runners, fn {_, %RunnerState{count: count}} -> count end)
      min
    end
  end

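  # Checking in demonitors the caller, frees its slot on the runner, and hands the
  # slot to the next waiting caller, if any.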
  defp checkin_runner(state, ref) do
    %{^ref => {_from, runner_ref}} = state.callers
    Process.demonitor(ref, [:flush])

    drop_caller(state, ref, runner_ref)
  end

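  # A checkout either boots a new runner (none running yet, or the least-loaded runner
  # is saturated and we are under :max), leases the least-loaded runner, or queues the
  # caller until a slot frees up.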
  defp checkout_runner(%Pool{} = state, from, monitor_ref \\ nil) do
    min_runner = min_runner(state)
    runner_count = map_size(state.runners)

    cond do
      runner_count == 0 || (min_runner.count == state.max_concurrency && runner_count < state.max) ->
        case start_child_runner(state) do
          {:ok, %RunnerState{} = runner} ->
            state
            |> put_runner(runner)
            |> reply_runner_checkout(runner, from, monitor_ref)

          {:error, reason} ->
            GenServer.reply(from, {:error, reason})
            state
        end

      min_runner && min_runner.count < state.max_concurrency ->
        reply_runner_checkout(state, min_runner, from, monitor_ref)

      true ->
        waiting_in(state, from)
    end
  end

  defp reply_runner_checkout(state, %RunnerState{} = runner, from, monitor_ref) do
    # we pass monitor_ref down from waiting so we don't need to remonitor if already monitoring
    ref =
      if monitor_ref do
        monitor_ref
      else
        {from_pid, _tag} = from
        Process.monitor(from_pid)
      end

    GenServer.reply(from, {ref, runner.pid})
    new_state = %Pool{state | callers: Map.put(state.callers, ref, {from, runner.monitor_ref})}
    inc_runner_count(new_state, runner.monitor_ref)
  end

  defp waiting_in(%Pool{} = state, {pid, _tag} = from) do
    ref = Process.monitor(pid)
    waiting = %WaitingState{from: from, monitor_ref: ref}
    %Pool{state | waiting: state.waiting ++ [waiting]}
  end

  defp boot_runners(%Pool{} = state) do
    if state.min > 0 do
      # start min runners, and do not idle them down regardless of idle configuration
      0..(state.min - 1)
      |> Task.async_stream(fn _ -> start_child_runner(state, idle_shutdown_after: nil) end,
        max_concurrency: 10,
        timeout: state.boot_timeout
      )
      |> Enum.reduce(state, fn
        {:ok, {:ok, %RunnerState{} = runner}}, acc -> put_runner(acc, runner)
        {:exit, reason}, _acc -> raise "failed to boot runner: #{inspect(reason)}"
      end)
    else
      state
    end
  end

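  # Starts a runner under the pool's DynamicSupervisor, monitors it, and waits for its
  # remote boot to complete.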
  defp start_child_runner(%Pool{} = state, runner_opts \\ []) do
    opts = Keyword.merge(state.runner_opts, runner_opts)

    name = Module.concat(state.name, "Runner#{map_size(state.runners) + 1}")

    spec = %{
      id: name,
      start: {Dragonfly.Runner, :start_link, [opts]},
      restart: :temporary
    }

    {:ok, pid} = DynamicSupervisor.start_child(state.dynamic_sup, spec)
    ref = Process.monitor(pid)

    try do
      :ok = Runner.remote_boot(pid)
      runner = %RunnerState{count: 0, pid: pid, monitor_ref: ref}
      {:ok, runner}
    catch
      :exit, reason ->
        Process.demonitor(ref, [:flush])
        {:error, {:exit, reason}}
    end
  end

  defp put_runner(%Pool{} = state, %RunnerState{} = runner) do
    %Pool{state | runners: Map.put(state.runners, runner.monitor_ref, runner)}
  end

  defp inc_runner_count(%Pool{} = state, ref) do
    new_runners =
      Map.update!(state.runners, ref, fn %RunnerState{} = runner ->
        %RunnerState{runner | count: runner.count + 1}
      end)

    %Pool{state | runners: new_runners}
  end

  defp dec_runner_count(%Pool{} = state, ref) do
    new_runners =
      Map.update!(state.runners, ref, fn %RunnerState{} = runner ->
        %RunnerState{runner | count: runner.count - 1}
      end)

    %Pool{state | runners: new_runners}
  end

  defp drop_child_runner(%Pool{} = state, ref) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    %Pool{state | runners: Map.delete(state.runners, ref)}
  end

  defp drop_caller(%Pool{} = state, caller_ref, runner_ref) do
    new_state = %Pool{state | callers: Map.delete(state.callers, caller_ref)}

    new_state
    |> dec_runner_count(runner_ref)
    |> call_next_waiting_caller()
  end

  defp call_next_waiting_caller(%Pool{} = state) do
    # we flush DOWN's so we don't send a lease to a waiting caller that is already down
    new_state = flush_downs(state)

    case new_state.waiting do
      [] ->
        new_state

      [%WaitingState{} = first | rest] ->
        # checkout_runner will borrow already running monitor
        checkout_runner(%Pool{new_state | waiting: rest}, first.from, first.monitor_ref)
    end
  end

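  # A DOWN may come from a waiting caller, an active caller, or a runner; prune
  # whichever of the three the monitor ref belongs to.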
  defp handle_down(%Pool{} = state, {:DOWN, ref, :process, _pid, _reason}) do
    new_waiting =
      Enum.filter(state.waiting, fn %WaitingState{} = waiting ->
        waiting.monitor_ref != ref
      end)

    state = %Pool{state | waiting: new_waiting}

    state =
      case state.callers do
        %{^ref => {_from, runner_ref}} ->
          drop_caller(state, ref, runner_ref)

        %{} ->
          state
      end

    case state.runners do
      %{^ref => _} -> drop_child_runner(state, ref)
      %{} -> state
    end
  end

  defp flush_downs(%Pool{} = state) do
    receive do
      {:DOWN, _ref, :process, _pid, _reason} = msg ->
        state
        |> handle_down(msg)
        |> flush_downs()
    after
      0 -> state
    end
  end
end