lib/process_hub/service/process_registry.ex

defmodule ProcessHub.Service.ProcessRegistry do
  @moduledoc """
  The process registry service provides API functions for managing the process registry.
  """

  @type registry() :: %{
          ProcessHub.child_id() => {
            ProcessHub.child_spec(),
            [{node(), pid()}]
          }
        }

  alias ProcessHub.Utility.Name
  alias ProcessHub.Constant.Hook
  alias ProcessHub.Service.HookManager

  @doc "Returns the process registry table identifier."
  @spec registry_name(ProcessHub.hub_id()) :: atom()
  def registry_name(hub_id) do
    Name.concat_name([hub_id, :process_registry], ".")
  end

  @doc "Returns information about all registered processes."
  @spec registry(ProcessHub.hub_id()) :: registry()
  def registry(hub_id) do
    registry_name(hub_id)
    |> :ets.tab2list()
    |> Map.new()
  end

  @spec contains_children(ProcessHub.hub_id(), [ProcessHub.child_id()]) :: [ProcessHub.child_id()]
  @doc "Returns a list of child_ids that match the given `child_ids` variable."
  def contains_children(hub_id, child_ids) do
    registry(hub_id)
    |> Enum.reduce([], fn {child_id, _}, acc ->
      case Enum.member?(child_ids, child_id) do
        true -> [child_id | acc]
        false -> acc
      end
    end)
    |> Enum.reverse()
  end

  @doc "Deletes all objects from the process registry."
  @spec clear_all(ProcessHub.hub_id()) :: true
  def clear_all(hub_id) do
    registry_name(hub_id)
    |> :ets.delete_all_objects()
  end

  @doc "Returns information about processes registered under the local node."
  @spec local_children(ProcessHub.hub_id()) :: [
          {ProcessHub.child_id(), {ProcessHub.child_spec(), [{node(), pid()}]}}
        ]
  def local_children(hub_id) do
    local_node = node()

    registry(hub_id)
    |> Enum.filter(fn {_, {_, nodes}} ->
      Enum.member?(Keyword.keys(nodes), local_node)
    end)
  end

  @doc "Returns a list of child specs registered under the local node."
  @spec local_child_specs(ProcessHub.hub_id()) :: [ProcessHub.child_spec()]
  def local_child_specs(hub_id) do
    local_children(hub_id)
    |> Enum.map(fn {_, {child_spec, _}} -> child_spec end)
  end

  @doc "Return the child_spec, nodes, and pids for the given child_id."
  @spec lookup(ProcessHub.hub_id(), ProcessHub.child_id()) ::
          nil | {ProcessHub.child_spec(), [{node(), pid()}]}
  def lookup(hub_id, child_id) do
    res =
      registry_name(hub_id)
      |> :ets.lookup(child_id)
      |> List.first()

    case res do
      nil ->
        nil

      {^child_id, {child_spec, nodes}} ->
        {child_spec, nodes}
    end
  end

  @doc """
  Inserts information about a child process into the registry.

  Calling this function will dispatch the `:registry_pid_insert_hook` hook unless the `:skip_hooks` option is set to `true`.
  """
  @spec insert(ProcessHub.hub_id(), ProcessHub.child_spec(), [{node(), pid()}], keyword() | nil) ::
          :ok
  def insert(hub_id, child_spec, child_nodes, opts \\ []) do
    Keyword.get(opts, :table, registry_name(hub_id))
    |> :ets.insert({child_spec.id, {child_spec, child_nodes}})

    unless Keyword.get(opts, :skip_hooks, false) do
      HookManager.dispatch_hook(hub_id, Hook.registry_pid_inserted(), {child_spec, child_nodes})
    end

    :ok
  end

  @doc """
  Deletes information about a child process from the registry.

  Calling this function will dispatch the `:registry_pid_remove_hook` hook unless the `:skip_hooks` option is set to `true`.
  """
  @spec delete(ProcessHub.hub_id(), ProcessHub.child_id(), keyword() | nil) :: :ok
  def delete(hub_id, child_id, opts \\ []) do
    Keyword.get(opts, :table, registry_name(hub_id))
    |> :ets.delete(child_id)

    unless Keyword.get(opts, :skip_hooks, false) do
      HookManager.dispatch_hook(hub_id, Hook.registry_pid_removed(), child_id)
    end

    :ok
  end

  @doc """
  Inserts information about multiple child processes into the registry.

  Calling this function will dispatch the `:registry_pid_insert_hook` hook unless the `:skip_hooks` option is set to `true`.
  """
  @spec bulk_insert(ProcessHub.hub_id(), %{
          ProcessHub.child_id() => {ProcessHub.child_spec(), [{node(), pid()}]}
        }) :: :ok
  def bulk_insert(hub_id, children) do
    registry_table = registry_name(hub_id)
    process_registry = registry(hub_id)

    hooks =
      Enum.map(children, fn {child_id, {child_spec, child_nodes}} ->
        inserted =
          case process_registry[child_id] do
            nil ->
              insert(hub_id, child_spec, child_nodes, skip_hooks: true, table: registry_table)
              child_nodes

            {_child_spec, nodes} ->
              new_nodes = Enum.uniq(nodes ++ child_nodes)
              insert(hub_id, child_spec, new_nodes, skip_hooks: true, table: registry_table)
              new_nodes
          end

        {Hook.registry_pid_inserted(), {child_spec.id, inserted}}
      end)

    HookManager.dispatch_hooks(hub_id, hooks)

    :ok
  end

  @doc """
  Deletes information about multiple child processes from the registry.

  Calling this function will dispatch the `:registry_pid_remove_hook` hooks unless
  the `:skip_hooks` option is set to `true`.
  """
  @spec bulk_delete(ProcessHub.hub_id(), %{
          ProcessHub.child_id() => {ProcessHub.child_spec(), [{node(), pid()}]}
        }) :: :ok
  def bulk_delete(hub_id, children) do
    registry_table = registry_name(hub_id)
    process_registry = registry(hub_id)

    hooks =
      Enum.map(children, fn {child_id, rem_nodes} ->
        case process_registry[child_id] do
          nil ->
            nil

          {child_spec, nodes} ->
            new_nodes =
              Enum.filter(nodes, fn {node, _pid} ->
                !Enum.member?(rem_nodes, node)
              end)

            if length(new_nodes) > 0 do
              insert(hub_id, child_spec, new_nodes, skip_hooks: true, table: registry_table)
            else
              delete(hub_id, child_id, skip_hooks: true, table: registry_table)
            end

            {Hook.registry_pid_removed(), {child_id, rem_nodes}}
        end
      end)
      |> Enum.reject(&is_nil/1)

    HookManager.dispatch_hooks(hub_id, hooks)
  end
end