lib/handler/pool.ex

defmodule Handler.Pool do
  # Pool configuration. The struct is handed to `start_link/1` and kept inside the
  # server state built by `init/1`.
  #
  #   * `delegate_fun`     - optional `{module, function_name, first_argument}` tuple.
  #                          NOTE(review): presumably selects the pool to delegate to at
  #                          run time - confirm against Handler.Pool.State.
  #   * `delegate_to`      - optional name of another pool this pool delegates work to.
  #   * `max_workers`      - maximum number of workers, defaults to 100.
  #   * `max_memory_bytes` - memory budget in bytes, defaults to 10 GiB.
  #   * `name`             - optional name the pool process is registered under.
  defstruct delegate_fun: nil,
            delegate_to: nil,
            max_workers: 100,
            max_memory_bytes: 10 * 1024 * 1024 * 1024,
            name: nil

  alias __MODULE__
  alias Handler.Pool.{NoWorkersAvailable, State}
  import Handler.Opts
  use GenServer

  # Configuration struct for a pool; one entry per `defstruct` field.
  @type t :: %Handler.Pool{
          delegate_fun: nil | {module(), fn_name :: atom(), first_argument :: term()},
          delegate_to: nil | name(),
          max_workers: non_neg_integer(),
          max_memory_bytes: non_neg_integer(),
          name: nil | name()
        }
  # Name a pool process can be registered under.
  @type name :: GenServer.name()
  # Option list accepted by `async/3` and `run/3`.
  @type opts :: list(opt())
  # Everything `Handler.opt()` allows plus the pool-specific options below.
  # `:pool_timeout` bounds how long `async/3` waits for the pool process to answer.
  @type opt ::
          Handler.opt()
          | {:task_name, String.t()}
          | {:delegate_param, term()}
          | {:pool_timeout, Handler.milliseconds()}
  # Anything `GenServer.call/3` accepts as a server reference.
  @type pool :: GenServer.server()
  # Exceptions carried in the `{:reject, _}` results of `async/3` and `run/3`.
  @type exception :: Pool.InsufficientMemory.t() | Pool.NoWorkersAvailable.t() | Pool.Timeout.t()

  # Default exception handed to `kill/3`, `kill_by_ref/3` and `flush/2` when the caller
  # does not supply one. As a module attribute it is built once, at compile time.
  @user_killed_process_exit Handler.ProcessExit.exception(
                              message: "User killed the process",
                              reason: :user_killed
                            )

  @moduledoc """
  Manage a pool of resources used to run dangerous functions

  The Pool provides both a synchronous interface (`run/3`) as well as an asynchronous
  interface (`async/3` and `await/1`) for running functions.

  ## Composing Pools

  This module also provides a way to have limited resources for particular use-cases, that end up
  sharing bigger pools of resources. Take for instance a hosted multi-tenant application where you
  can safely use up to 10GB of memory for a particular task, but you don't want any one customer to
  use the whole pool, so each customer has a limit of just 5GB. You can handle this scenario using
  the `delegate_to` configuration.

      {:ok, _pid} = Handler.Pool.start_link(%Handler.Pool{
        max_memory_bytes: 10 * 1024 * 1024 * 1024,
        name: :shared_pool
      })
      {:ok, _pid} = Handler.Pool.start_link(%Handler.Pool{
        delegate_to: :shared_pool,
        max_memory_bytes: 5 * 1024 * 1024 * 1024,
        name: :customer_pool
      })

      100 = Handler.Pool.run(:customer_pool, fn -> 10 * 10 end, max_heap_bytes: 100 * 1024)
  """

  @doc """
  Asynchronously start a job.

  Takes a potentially dangerous function and asks the pool to run it. The options are first
  checked with `validate_pool_opts!/1`, then the request is sent to the pool with
  `GenServer.call/3`, bounded by the `:pool_timeout` option (default `1_000` ms).

  Returns `{:ok, ref}` with a reference you can pass to `await/1`, or `{:reject, exception}`
  describing why the function couldn't be started:

    * `Handler.Pool.NoWorkersAvailable` - the call exited with `:noproc` (no such pool process)
    * `Handler.Pool.Timeout` - the pool did not answer within `:pool_timeout`
    * whatever rejection the pool itself replied with

  Any other exit raised by the call is not caught and propagates to the caller.
  """
  @spec async(pool(), (() -> any()), opts()) :: {:ok, reference()} | {:reject, exception()}
  def async(pool, fun, opts) do
    validate_pool_opts!(opts)
    timeout = Keyword.get(opts, :pool_timeout, 1_000)

    try do
      GenServer.call(pool, {:run, fun, opts}, timeout)
    catch
      # The pool process does not exist (never started, or already gone).
      :exit, {:noproc, _} ->
        {:reject, NoWorkersAvailable.exception(message: "No Pools Available")}

      # The pool exists but was too busy to answer in time.
      # NOTE(review): the request may still be sitting in the pool's mailbox at this
      # point - confirm whether a job can be started after the caller has given up.
      :exit, {:timeout, {GenServer, :call, _args}} ->
        {:reject, Pool.Timeout.exception(message: "Could not reach pool within #{timeout}ms")}
    end
  end

  @doc """
  Wait for the result of a job kicked off by `async/3`.

  Blocks until a `{ref, result}` message for the given reference arrives in the calling
  process' mailbox and returns `result`. Messages for other references are left untouched.

  `timeout` is the maximum time to wait in milliseconds and defaults to `:infinity`. When it
  elapses the caller exits with `{:timeout, {Handler.Pool, :await, [ref, timeout]}}`,
  mirroring the convention used by `GenServer.call/3`. Waiting timing out does not stop the
  job itself; use `kill_by_ref/3` for that.
  """
  @spec await(reference(), timeout()) :: any() | {:error, Handler.exception()}
  def await(ref, timeout \\ :infinity) when is_reference(ref) do
    receive do
      {^ref, result} ->
        result
    after
      # With the default of :infinity this clause never fires, preserving the old behaviour.
      timeout ->
        exit({:timeout, {__MODULE__, :await, [ref, timeout]}})
    end
  end

  @doc """
  Run a potentially dangerous function in the pool and wait for its result.

  Equivalent to calling `async/3` followed by `await/1`. The interface mirrors
  `Handler.run/2`, with one extra outcome: when the job cannot be started, the
  `{:reject, exception}` tuple produced by `async/3` is returned as-is.
  """
  @spec run(pool(), (() -> any()), opts()) ::
          any() | {:error, Handler.exception()} | {:reject, exception()}
  def run(pool, fun, opts) do
    case async(pool, fun, opts) do
      {:ok, job_ref} ->
        await(job_ref)

      # Anything other than a started job is handed straight back to the caller.
      not_started ->
        not_started
    end
  end

  @doc """
  Kill jobs by their `:task_name`.

  When kicking off a job the `:task_name` option can be set, and later this function can
  be used to kill any jobs in progress whose name matches the `task_name` given here.
  `task_name` must be a binary.

  The client that kicked off the work will receive an `{:error, exception}`
  as the result for the job. If you don't pass an exception, you will get back
  a `%Handler.ProcessExit{reason: :user_killed}` by default.

  Returns `{:ok, number_killed}`.
  """
  @spec kill(pool(), String.t()) :: {:ok, integer()}
  @spec kill(pool(), String.t(), Exception.t()) :: {:ok, integer()}
  def kill(pool, task_name, exception \\ @user_killed_process_exit) when is_binary(task_name) do
    GenServer.call(pool, {:kill, task_name, exception})
  end

  @doc """
  Kill a job by its `ref`.

  When kicking off a job with the `async/3` function a `ref` is returned
  and that job can later be killed by passing the same reference here.

  The client that kicked off the work will receive an `{:error, exception}`
  as the result for the job. If you don't pass an exception, you will get back
  a `%Handler.ProcessExit{reason: :user_killed}` by default.

  Returns `:ok` or `:no_such_worker`.
  """
  @spec kill_by_ref(pool(), reference()) :: :ok | :no_such_worker
  @spec kill_by_ref(pool(), reference(), Exception.t()) :: :ok | :no_such_worker
  def kill_by_ref(pool, ref, exception \\ @user_killed_process_exit) when is_reference(ref) do
    GenServer.call(pool, {:kill_ref, ref, exception})
  end

  @doc """
  Kill all jobs in a pool. Returns `{:ok, number_killed}`.

  The client that kicked off the work will receive an `{:error, exception}`
  as the result for the job. If you don't pass an exception, you will get back
  a `%Handler.ProcessExit{reason: :user_killed}` by default.
  """
  @spec flush(pool()) :: {:ok, non_neg_integer()}
  @spec flush(pool(), Exception.t()) :: {:ok, non_neg_integer()}
  def flush(pool, exception \\ @user_killed_process_exit) do
    GenServer.call(pool, {:flush, exception})
  end

  ## GenServer / OTP callbacks

  # Starts a pool process linked to the caller. A config with a `name` registers the
  # process under that name; a config without one starts it unregistered.
  @spec start_link(Handler.Pool.t()) :: :ignore | {:error, any} | {:ok, pid}
  def start_link(%Pool{name: name} = config) do
    server_opts =
      case name do
        nil -> []
        registered -> [name: registered]
      end

    GenServer.start_link(Pool, config, server_opts)
  end

  # Wraps the pool configuration in a fresh server state.
  @impl GenServer
  def init(%Pool{} = config), do: {:ok, %State{pool: config}}

  # Start a job for the calling process. Replies with the job reference, or with the
  # rejection while leaving the state untouched.
  @impl GenServer
  def handle_call({:run, fun, opts}, {caller, _tag}, state) do
    case State.start_worker(state, fun, opts, caller) do
      {:ok, next_state, job_ref} -> {:reply, {:ok, job_ref}, next_state}
      {:reject, exception} -> {:reply, {:reject, exception}, state}
    end
  end

  # Kill every job matching `task_name`; replies with how many were killed.
  @impl GenServer
  def handle_call({:kill, task_name, exception}, _from, state) do
    {:ok, next_state, killed} = State.kill_worker(state, task_name, exception)
    {:reply, {:ok, killed}, next_state}
  end

  # Kill the single job identified by `ref`; the reply is whatever the state module reports.
  @impl GenServer
  def handle_call({:kill_ref, ref, exception}, _from, state) do
    {:ok, next_state, outcome} = State.kill_worker_by_ref(state, ref, exception)
    {:reply, outcome, next_state}
  end

  # Kill every job in the pool; replies with how many were killed.
  @impl GenServer
  def handle_call({:flush, exception}, _from, state) do
    {:ok, next_state, killed} = State.flush_workers(state, exception)
    {:reply, {:ok, killed}, next_state}
  end

  # A job result arrived: forward it to the client. A delegating pool also releases the
  # job's commitments right away; otherwise that happens on the :DOWN message.
  @impl GenServer
  def handle_info({ref, result}, state) when is_reference(ref) do
    delegating? = delegating_work?(state)
    responded = State.send_response(state, ref, result)

    next_state =
      if delegating? do
        State.cleanup_commitments(responded, ref)
      else
        responded
      end

    {:noreply, next_state}
  end

  # A monitored worker finished normally: release whatever was committed for it.
  @impl GenServer
  def handle_info({:DOWN, ref, :process, pid, :normal}, state)
      when is_pid(pid) and is_reference(ref) do
    {:noreply, State.cleanup_commitments(state, ref)}
  end

  # Tasks killed via :user_killed deliver this message on top of the
  # {:DOWN, ref, :process, pid, :normal} one, so there is nothing left to do here.
  @impl GenServer
  def handle_info({:EXIT, pid, :normal}, state) when is_pid(pid), do: {:noreply, state}

  # Anything else is unexpected: log it and carry on with the state unchanged.
  @impl GenServer
  def handle_info(unexpected, state) do
    require Logger
    Logger.error("#{__MODULE__} received unexpected message #{inspect(unexpected)}")
    {:noreply, state}
  end

  # True when the pool config has a `delegate_to` or a `delegate_fun` set.
  defp delegating_work?(%State{} = state) do
    pool = state.pool
    not (is_nil(pool.delegate_to) and is_nil(pool.delegate_fun))
  end
end