lib/gen_registry.ex

defmodule GenRegistry do
  @moduledoc """
  GenRegistry provides a `Registry` like interface for managing processes.
  """

  @behaviour GenRegistry.Behaviour

  use GenServer

  alias :ets, as: ETS
  alias GenRegistry.Types

  # State kept by each registry process; the individual fields are described in the
  # typedoc below.
  defstruct [:worker_module, :worker_type, :workers]

  @typedoc """
  GenRegistry State.

  - worker_module: Module to spawn
  - worker_type: `:supervisor` if the worker_module is a supervisor, `:worker` otherwise
  - workers: ETS table id holding the worker tracking records.
  """
  @type t :: %__MODULE__{
          worker_module: module,
          worker_type: :supervisor | :worker,
          workers: ETS.tab()
        }

  # Module whose `call/2,3` the client functions use to talk to the registry process.
  # Module attributes are evaluated while the module is compiled, so the `:gen_module`
  # application setting is read at build time (falling back to GenServer), not at runtime.
  @gen_module Application.get_env(:gen_registry, :gen_module, GenServer)

  ## Client

  @doc """
  Builds the child specification used to run a registry under a supervisor.

  `opts` must contain `:worker_module`, otherwise a `KeyError` is raised. The remaining
  options are forwarded to `start_link/2`; when no `:name` is given the worker module is used
  as the name. That name is also used as the child id.
  """
  @spec child_spec(opts :: Keyword.t()) :: Supervisor.child_spec()
  def child_spec(opts) do
    worker_module = Keyword.fetch!(opts, :worker_module)

    # `:worker_module` travels as a positional argument, so it is stripped from the options.
    without_module = Keyword.delete(opts, :worker_module)
    start_opts = Keyword.put_new(without_module, :name, worker_module)
    registry_name = Keyword.get(start_opts, :name)

    %{
      id: registry_name,
      start: {__MODULE__, :start_link, [worker_module, start_opts]},
      type: :supervisor
    }
  end

  @doc """
  Start a registry instance.

  GenRegistry should be run under a supervision tree, it is not recommended to call this directly.

  `module` is the worker module the registry spawns. `opts` are passed on to
  `GenServer.start_link/3`; the `:name` option is also used as the name of the ETS table that
  tracks the workers. When `:name` is absent it defaults to `module`, matching `child_spec/1`.
  """
  @spec start_link(module, Keyword.t()) :: {:ok, pid} | {:error, any}
  def start_link(module, opts \\ []) do
    # Without a default the table created in `init/1` would be named `nil`, and a second
    # unnamed registry would crash on the table name clash.
    opts = Keyword.put_new(opts, :name, module)

    GenServer.start_link(__MODULE__, {module, Keyword.fetch!(opts, :name)}, opts)
  end

  @doc """
  Looks up a running process by id.

  This reads the ETS table directly rather than going through the registry process, which
  makes it the fast path. Returns `{:ok, pid}` when the id is present and
  `{:error, :not_found}` otherwise.
  """
  @spec lookup(table :: ETS.tab(), id :: Types.id()) :: {:ok, pid} | {:error, :not_found}
  def lookup(table, id) do
    entries = ETS.lookup(table, id)

    case entries do
      [] -> {:error, :not_found}
      [{^id, pid}] -> {:ok, pid}
    end
  end

  @doc """
  Returns the process registered under `id`, spawning it when it is not running yet.

  When `registry` is an atom, the ETS table of that name is consulted first and the registry
  process is only called on a miss; for any other kind of `registry` the call always goes to
  the registry process.

  `args` is the argument list handed to `start_link` of the `worker_module` when a new process
  has to be spawned, and `timeout` bounds the call to the registry process.
  """
  @spec lookup_or_start(
          registry :: GenServer.server(),
          id :: Types.id(),
          args :: [any],
          timeout :: integer
        ) ::
          {:ok, pid} | {:error, any}
  def lookup_or_start(registry, id, args \\ [], timeout \\ 5_000)

  def lookup_or_start(registry, id, args, timeout) when is_atom(registry) do
    # A hit (`{:ok, pid}`) does not match the pattern and is returned unchanged by `with`.
    with {:error, :not_found} <- lookup(registry, id) do
      @gen_module.call(registry, {:lookup_or_start, id, args}, timeout)
    end
  end

  def lookup_or_start(registry, id, args, timeout) do
    request = {:lookup_or_start, id, args}
    @gen_module.call(registry, request, timeout)
  end

  @doc """
  Starts a process under `id`.

  Returns `{:error, {:already_started, pid}}` when the id is already associated with a running
  process. Otherwise a new process is spawned, with `args` being the argument list handed to
  `start_link` of the `worker_module`.

  When `registry` is an atom, the ETS table of that name is checked before the registry process
  is called; `timeout` bounds the call to the registry process.
  """
  @spec start(
          registry :: GenServer.server(),
          id :: Types.id(),
          args :: [any()],
          timeout :: integer()
        ) :: {:ok, pid()} | {:error, {:already_started, pid()}} | {:error, any()}
  def start(registry, id, args \\ [], timeout \\ 5_000)

  def start(registry, id, args, timeout) when is_atom(registry) do
    with {:error, :not_found} <- lookup(registry, id) do
      @gen_module.call(registry, {:start, id, args}, timeout)
    else
      {:ok, pid} -> {:error, {:already_started, pid}}
    end
  end

  def start(registry, id, args, timeout) do
    request = {:start, id, args}
    @gen_module.call(registry, request, timeout)
  end

  @doc """
  Safely stops a process managed by the GenRegistry.

  The request is handled by the registry process, which stops the worker and removes its id
  from the registry.

  Returns `{:error, :not_found}` when the id is not registered.
  """
  @spec stop(registry :: GenServer.server(), id :: Types.id()) :: :ok | {:error, :not_found}
  def stop(registry, id) do
    request = {:stop, id}
    @gen_module.call(registry, request)
  end

  @doc """
  Returns the number of processes currently tracked by this registry.

  The value is the size of the ETS table, read without going through the registry process.
  """
  @spec count(table :: ETS.tab()) :: non_neg_integer()
  def count(table), do: ETS.info(table, :size)

  @doc """
  Returns one `{id, pid}` entry from the registry.

  Returns `nil` when the registry is empty.
  """
  @spec sample(table :: ETS.tab()) :: {Types.id(), pid()} | nil
  def sample(table), do: sample_key(table, ETS.first(table))

  # Resolves the key returned by `ETS.first/1` into its entry. If the lookup does not yield
  # exactly one entry (for instance because the row vanished in between), sampling restarts.
  defp sample_key(_table, :"$end_of_table"), do: nil

  defp sample_key(table, key) do
    case ETS.lookup(table, key) do
      [entry] -> entry
      _ -> sample(table)
    end
  end

  @doc """
  Folds over every entry of the registry and returns the final accumulator.

  `func` receives a `{id, pid}` tuple and the current accumulator, and must return the new
  accumulator. `acc` is the initial accumulator and is returned as-is for an empty registry.

  There is no ordering guarantee when reducing.
  """
  @spec reduce(table :: ETS.tab(), acc :: any, ({Types.id(), pid()}, any() -> any())) :: any
  def reduce(table, acc, func), do: ETS.foldr(func, acc, table)

  @doc """
  Returns every `{id, pid}` entry of the GenRegistry as a list.

  There is no ordering guarantee for the list.
  """
  @spec to_list(table :: ETS.tab()) :: [{Types.id(), pid()}]
  def to_list(table), do: ETS.tab2list(table)

  ## Server Callbacks

  # GenServer callback that builds the registry state.
  #
  # Receives `{worker_module, name}`; `name` becomes the name of the public ETS table in which
  # the `{id, pid}` rows are tracked, so clients can read it without calling this process.
  def init({worker_module, name}) do
    # Trap exits so the death of a linked worker arrives as an `{:EXIT, pid, reason}` message
    # instead of taking the registry down.
    Process.flag(:trap_exit, true)

    worker_type = detect_worker_type(worker_module)

    state = %__MODULE__{
      workers:
        ETS.new(name, [
          :public,
          :set,
          :named_table,
          {:read_concurrency, true}
        ]),
      worker_module: worker_module,
      worker_type: worker_type
    }

    {:ok, state}
  end

  # Classifies the worker module as `:supervisor` or `:worker` from its declared behaviours.
  #
  # Every behaviour attribute is inspected (a module may declare several, and Erlang accepts
  # both the `behaviour` and `behavior` spellings). Both the Erlang `:supervisor` behaviour
  # and the Elixir `Supervisor` behaviour (declared by `use Supervisor`) count as supervisors.
  defp detect_worker_type(worker_module) do
    attributes = worker_module.module_info(:attributes)

    behaviours =
      List.flatten(
        Keyword.get_values(attributes, :behaviour) ++ Keyword.get_values(attributes, :behavior)
      )

    if Enum.any?(behaviours, &(&1 in [:supervisor, Supervisor])) do
      :supervisor
    else
      :worker
    end
  end

  # GenServer callback run when the registry shuts down.
  #
  # Every pid stored as a key in the process dictionary is unlinked and then killed.
  # Always returns `:ok`.
  def terminate(_reason, _state) do
    tracked_pids = Enum.filter(Process.get_keys(), &is_pid/1)

    Enum.each(tracked_pids, fn worker ->
      Process.unlink(worker)
      Process.exit(worker, :kill)
    end)

    :ok
  end

  # Requests issued by the client functions `lookup_or_start/4`, `start/4` and `stop/2`.
  # The work is delegated to the private helpers; the state itself never changes because all
  # bookkeeping lives in the ETS table and the process dictionary.
  def handle_call({:lookup_or_start, id, args}, _from, state) do
    {:reply, do_lookup_or_start(state, id, args), state}
  end

  def handle_call({:start, id, args}, _from, state) do
    {:reply, do_start(state, id, args), state}
  end

  def handle_call({:stop, id}, _from, state) do
    {:reply, do_stop(state, id), state}
  end

  # Supervisor-style introspection: lists every tracked worker in the same tuple shape a
  # supervisor uses for its children. Tracked workers are the pid keys of the process
  # dictionary.
  def handle_call(
        :which_children,
        _from,
        %__MODULE__{worker_type: worker_type, worker_module: worker_module} = state
      ) do
    children =
      for pid <- Process.get_keys(), is_pid(pid) do
        {:undefined, pid, worker_type, [worker_module]}
      end

    {:reply, children, state}
  end

  # Supervisor-style introspection: reports how many workers are running.
  def handle_call(
        :count_children,
        _from,
        %__MODULE__{worker_type: worker_type, workers: workers} = state
      ) do
    counts = [
      specs: 1,
      # Count the registry's own table. It is named after the registry `:name`, which is not
      # necessarily the worker module.
      active: count(workers),
      supervisors: 0,
      workers: 0
    ]

    # All running children are of the same kind, so the active count is attributed to it.
    counts =
      case worker_type do
        :worker -> Keyword.put(counts, :workers, counts[:active])
        :supervisor -> Keyword.put(counts, :supervisors, counts[:active])
      end

    {:reply, counts, state}
  end

  # Any other request is answered with an error instead of crashing the registry.
  def handle_call(_message, _from, state) do
    {:reply, {:error, __MODULE__}, state}
  end

  # A linked process exited (exits are trapped in `init/1`): forget the worker.
  #
  # `Process.delete/1` returns the id stored for the pid, or `nil` when the pid is not tracked
  # (for example a worker whose `start_link` failed after linking). Only the exact
  # `{id, pid}` row is removed, so an exit from an untracked pid can never evict another
  # worker, not even one registered under the id `nil`.
  def handle_info({:EXIT, pid, _reason}, %__MODULE__{workers: workers} = state) do
    id = Process.delete(pid)
    ETS.delete_object(workers, {id, pid})
    {:noreply, state}
  end

  # Every other message is ignored.
  def handle_info(_message, state) do
    {:noreply, state}
  end

  ## Private

  # Returns the pid registered under `id`, spawning a worker with `args` when there is none.
  @spec do_lookup_or_start(state :: t, id :: Types.id(), args :: [any]) ::
          {:ok, pid} | {:error, any}
  defp do_lookup_or_start(%__MODULE__{workers: workers} = state, id, args) do
    case lookup(workers, id) do
      {:ok, pid} -> {:ok, pid}
      {:error, :not_found} -> spawn_worker(state, id, args)
    end
  end

  # Spawns a worker under `id`, or reports the existing one as `{:already_started, pid}`.
  @spec do_start(state :: t(), id :: Types.id(), args :: [any()]) ::
          {:ok, pid()} | {:error, {:already_started, pid()}} | {:error, any()}
  defp do_start(%__MODULE__{workers: workers} = state, id, args) do
    case lookup(workers, id) do
      {:ok, pid} -> {:error, {:already_started, pid}}
      {:error, :not_found} -> spawn_worker(state, id, args)
    end
  end

  # Starts a worker via `worker_module.start_link(args...)` and records it under `id`.
  #
  # On success the `{id, pid}` row is added to the ETS table and the reverse mapping
  # (pid -> id) is stored in the process dictionary, where the `:EXIT` handler and
  # `terminate/2` find it.
  @spec spawn_worker(state :: t(), id :: Types.id(), args :: [any()]) ::
          {:ok, pid()} | {:error, any()}
  defp spawn_worker(%__MODULE__{worker_module: worker_module, workers: workers}, id, args) do
    case apply(worker_module, :start_link, args) do
      {:ok, pid} ->
        ETS.insert_new(workers, {id, pid})
        Process.put(pid, id)
        {:ok, pid}

      # A worker may decline to start. Report that to the caller instead of crashing the
      # registry (and with it every running worker) on an unmatched return value.
      :ignore ->
        {:error, :ignore}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Stops the worker registered under `id` and removes all bookkeeping for it.
  # Returns `{:error, :not_found}` when nothing is registered under `id`.
  @spec do_stop(state :: t, id :: Types.id()) :: :ok | {:error, :not_found}
  defp do_stop(%__MODULE__{workers: workers}, id) do
    case lookup(workers, id) do
      {:ok, pid} ->
        # Unlink before sending the exit signal so the shutdown is not reported back to the
        # registry through the link.
        Process.unlink(pid)
        Process.exit(pid, :shutdown)

        registered_id = Process.delete(pid)
        ETS.delete(workers, registered_id)
        :ok

      {:error, :not_found} = not_found ->
        not_found
    end
  end
end