lib/process_hub.ex

defmodule ProcessHub do
  @moduledoc """
  This is the main public API module for the `ProcessHub` library and it is recommended to use
  only the functions defined in this module to interact with the `ProcessHub` library.

  ProcessHub is a library that distributes processes within the BEAM cluster. It is designed to
  be used as a building block for distributed applications that require process distribution
  and synchronization.
  """

  @typedoc """
  The `hub_id` defines the name of the hub. It is used to identify the hub.
  """
  @type hub_id() :: atom()

  @typedoc """
  The `child_id` defines the name of the child. It is used to identify the child.
  Each child must have a unique `child_id()` in the cluster. A single child may have
  multiple `pid()`s across the cluster.
  """
  @type child_id() :: atom() | binary()

  @typedoc """
  The `reply_to` defines the `pid()`s that will receive the response from the hub
  when a child is started or stopped.
  """
  @type reply_to() :: [pid()]

  @typedoc """
  The `child_spec` defines the specification of a child process.
  """
  @type child_spec() :: %{
          id: child_id(),
          start: {module(), atom(), [any()]}
        }

  @typedoc """
  The `init_opts()` defines the options that can be passed to the `start_children/3`, `start_child/3`,
  `stop_children/3`, and `stop_child/3` functions.

  - `:async_wait` - is optional and is used to define whether the function should return another function that
  can be used to wait for the children to start or stop. The default is `false`.
  - `:timeout` is optional and is used to define the timeout for the function. The timeout option
  should be used with `async_wait: true`. The default is `5000` (5 seconds).
  - `:check_mailbox` - is optional and is used to define whether the function should clear
  the mailbox of any existing messages that may overlap. It is recommended to keep this
  option `true` to avoid any unexpected behavior where `start_child/3` or `start_children/3`
  call timeout but eventually the calling process receives the start responses later. These messages
  will stay in that process's mailbox, and when the same process calls start child functions again with the
  same `child_id()`s, it will receive the old responses.This option should be used with `async_wait: true`.
   The default is `true`.
  - `:check_existing` - is optional and is used to define whether the function should check if the children
  are already started. The default is `true`.
  """
  @type init_opts() :: [
          async_wait: boolean(),
          timeout: non_neg_integer(),
          check_mailbox: boolean(),
          check_existing: boolean()
        ]

  @typedoc """
  The `stop_opts()` defines the options that can be passed to the `stop_children/3` and `stop_child/3` functions.

  - `:async_wait` - is optional and is used to define whether the function should return another function that
  can be used to wait for the children to stop. The default is `false`.
  - `:timeout` is optional and is used to define the timeout for the function. The timeout option
  should be used with `async_wait: true`. The default is `5000` (5 seconds).
  """
  @type stop_opts() :: [
          async_wait: boolean(),
          timeout: non_neg_integer()
        ]

  @typedoc """
  This is the base configuration structure for the hub and has to be passed to the `start_link/1` function.
  - `:hub_id` is the name of the hub and is required.
  - `:child_specs` is optional and is used to define the child processes that will be started with the hub statically.
  - `:hooks` are optional and are used to define the hooks that can be triggered on specific events.
  - `:redundancy_strategy` is optional and is used to define the strategy for redundancy.
  The default is `ProcessHub.Strategy.Redundancy.Singularity`.
  - `:migration_strategy` is optional and is used to define the strategy for migration.
  The default is `ProcessHub.Strategy.Migration.ColdSwap`.
  - `:synchronization_strategy` is optional and is used to define the strategy for synchronization.
  The default is `ProcessHub.Strategy.Synchronization.PubSub`.
  - `:partition_tolerance_strategy` is optional and is used to define the strategy for partition tolerance.
  The default is `ProcessHub.Strategy.PartitionTolerance.Divergence`.
  - `:distribution_strategy` is optional and is used to define the strategy for process distribution.
  The default is `ProcessHub.Strategy.Distribution.ConsistentHashing`.
  - `:hubs_discover_interval` is optional and is used to define the interval in milliseconds
  for hubs to start the discovery process. The default is `60000` (1 minute).
  - `:deadlock_recovery_timeout` is optional and is used to define the timeout in milliseconds
  to recover from locked hub. Hub locking can happen for different reasons
  such as updating internal data, migrating processes or handling network partitions.
  The default is `60000` (1 minute).
  - `:storage_purge_interval` is optional and is used to define the interval in milliseconds
  for the janitor to clean up the old cache records when the TTL expires. The default is `15000` (15 seconds).
  - `:migr_base_timeout` is optional and is used to define the base timeout in milliseconds
  for the migration process to complete before the hub considers it as a failure.
  The default is `15000` (15 seconds).
  - `:dsup_max_restarts` is optional and is used to define the maximum number of restarts
  for the distributed supervisor. See `Supervisor` child specification for more information.
  The default is `100`.
  - `:dsup_max_seconds` is optional and is used to define the maximum number of seconds
  for the distributed supervisor to restart the child process.
  See `Supervisor` child specification for more information. The default is `4`.
  - `:dsup_shutdown_timeout` is optional and is used to define the timeout in milliseconds
  for the distributed supervisor to wait before forcefully terminating itself
   when receiving a shutdown signal.
  """
  @type t() :: %__MODULE__{
          hub_id: hub_id(),
          child_specs: [child_spec()],
          hooks: ProcessHub.Service.HookManager.hook_handlers(),
          redundancy_strategy:
            ProcessHub.Strategy.Redundancy.Singularity.t()
            | ProcessHub.Strategy.Redundancy.Replication.t(),
          migration_strategy:
            ProcessHub.Strategy.Migration.ColdSwap.t() | ProcessHub.Strategy.Migration.HotSwap.t(),
          synchronization_strategy:
            ProcessHub.Strategy.Synchronization.PubSub.t()
            | ProcessHub.Strategy.Synchronization.Gossip.t(),
          partition_tolerance_strategy:
            ProcessHub.Strategy.PartitionTolerance.Divergence.t()
            | ProcessHub.Strategy.PartitionTolerance.StaticQuorum.t()
            | ProcessHub.Strategy.PartitionTolerance.DynamicQuorum.t(),
          distribution_strategy:
            ProcessHub.Strategy.Distribution.ConsistentHashing.t()
            | ProcessHub.Strategy.Distribution.Guided.t(),
          hubs_discover_interval: pos_integer(),
          deadlock_recovery_timeout: pos_integer(),
          storage_purge_interval: pos_integer(),
          migr_base_timeout: pos_integer(),
          dsup_max_restarts: pos_integer(),
          dsup_max_seconds: pos_integer(),
          dsup_shutdown_timeout: pos_integer()
        }

  @enforce_keys [:hub_id]
  defstruct [
    :hub_id,
    child_specs: [],
    hooks: %{},
    redundancy_strategy: %ProcessHub.Strategy.Redundancy.Singularity{},
    migration_strategy: %ProcessHub.Strategy.Migration.ColdSwap{},
    synchronization_strategy: %ProcessHub.Strategy.Synchronization.PubSub{},
    partition_tolerance_strategy: %ProcessHub.Strategy.PartitionTolerance.Divergence{},
    distribution_strategy: %ProcessHub.Strategy.Distribution.ConsistentHashing{},
    hubs_discover_interval: 60000,
    deadlock_recovery_timeout: 60000,
    storage_purge_interval: 15000,
    migr_base_timeout: 15000,
    dsup_max_restarts: 100,
    dsup_max_seconds: 4,
    dsup_shutdown_timeout: 60000
  ]

  alias ProcessHub.Service.Distributor
  alias ProcessHub.Service.ProcessRegistry
  alias ProcessHub.Service.State
  alias ProcessHub.Service.Cluster
  alias ProcessHub.Utility.Name

  # 10 seconds
  @default_init_timeout 10000

  @doc """
  Starts a child process that will be distributed across the cluster.
  The `t:child_spec()` `:id` must be unique.

  ## Example
      iex> child_spec = %{id: :my_child, start: {MyProcess, :start_link, []}}
      iex> ProcessHub.start_child(:my_hub, child_spec)
      {:ok, :start_initiated}

  By default, the `start_child/3` function is **asynchronous** and returns immediately.
  To wait for the child to start, you can pass `async_wait: true` to the `opts` argument.
  When `async_wait: true`, you **must await** the response from the function.

  See `t:init_opts/0` for more options.

  ## Example with synchronous wait
  The synchronous response includes the status code `:ok` or `:error`, a tuple containing the `t:child_id/0` and
  a list of tuples where the first key is the node where the child is started, and the second key is the
  `pid()` of the started child. By default, the list should contain only one tuple, but if the
  redundancy strategy is configured for replicas, it may contain more than one tuple.

      iex> child_spec = %{id: :my_child, start: {MyProcess, :start_link, []}}
      iex> ProcessHub.start_child(:my_hub, child_spec, [async_wait: true]) |> ProcessHub.await()
      {:ok, {:my_child, [{:mynode, #PID<0.123.0>}]}}
  """
  @spec start_child(hub_id(), child_spec(), init_opts()) ::
          (-> {:ok, list})
          | {:error, :no_children | {:already_started, [atom | binary, ...]}}
          | {:ok, :start_initiated}
  def start_child(hub_id, child_spec, opts \\ []) do
    start_children(hub_id, [child_spec], Keyword.put(opts, :return_first, true))
  end

  @doc """
  Starts multiple child processes that will be distributed across the cluster.

  Same as `start_child/3`, except it starts multiple children at once and is more
  efficient than calling `start_child/3` multiple times.

  > #### Warning {: .warning}
  >
  > Using `start_children/3` with `async_wait: true` can lead to timeout errors,
  > especially when the number of children is large.
  > When `async_wait: true`, you **must await** the response from the function.
  """
  @spec start_children(hub_id(), [child_spec()], init_opts()) ::
          (-> {:ok, list})
          | {:ok, :start_initiated}
          | {:error,
             :no_children
             | {:error, :children_not_list}
             | {:already_started, [atom | binary, ...]}}
  def start_children(hub_id, child_specs, opts \\ []) when is_list(child_specs) do
    Distributor.init_children(hub_id, child_specs, default_init_opts(opts))
  end

  @doc """
  Stops a child process in the cluster.

  By default, this function is **asynchronous** and returns immediately.
  You can wait for the child to stop by passing `async_wait: true` in the `opts` argument.
  When `async_wait: true`, you **must await** the response from the function.

  ## Example
      iex> ProcessHub.stop_child(:my_hub, :my_child)
      {:ok, :stop_initiated}

  See `t:stop_opts/0` for more options.

  ## Example with synchronous wait
      iex> ProcessHub.stop_child(:my_hub, :my_child, [async_wait: true]) |> ProcessHub.await()
      {:ok, {:my_child, [:mynode]}}
  """
  @spec stop_child(hub_id(), child_id(), stop_opts()) ::
          (-> {:ok, list}) | {:ok, :stop_initiated}
  def stop_child(hub_id, child_id, opts \\ []) do
    stop_children(hub_id, [child_id], Keyword.put(opts, :return_first, true))
  end

  @doc """
  Stops multiple child processes in the cluster.

  This function is similar to `stop_child/3`, but it stops multiple children at once, making it more
  efficient than calling `stop_child/3` multiple times.

  > #### Warning {: .info}
  >
  > Using `stop_children/3` with `async_wait: true` can lead to timeout errors,
  > especially when stopping a large number of child processes.
  """
  @spec stop_children(hub_id(), [child_id()], stop_opts()) ::
          (-> {:ok, list}) | {:ok, :stop_initiated} | {:error, list}
  def stop_children(hub_id, child_ids, opts \\ []) do
    Distributor.stop_children(hub_id, child_ids, default_init_opts(opts))
  end

  @doc """
  Works similarly to `Supervisor.which_children/1`, but wraps the result in a tuple
  containing the node name and the children.

  > #### Info {: .info}
  >
  > The `Supervisor.which_children/1` function is known to consume a lot of memory
  > and this can affect performance. The problem is even more relevant when
  > using the `:global` option as it will make a network call to all nodes in the cluster.
  >
  > It is highly recommended to use `ProcessHub.process_list/2` instead.

  Available options:
  - `:global` - returns a list of all child processes started by all nodes in the cluster.
    The return result will be in the format of `[{:node, children}]`.
  - `:local` - returns a list of all child processes started by the local node.
    The return result will be in the format of `{:node, children}`.
  """
  @spec which_children(hub_id(), [:global | :local] | nil) ::
          list()
          | {node(),
             [{any, :restarting | :undefined | pid, :supervisor | :worker, :dynamic | list}]}
  def which_children(hub_id, opts \\ []) do
    case Enum.member?(opts, :global) do
      true -> Distributor.which_children_global(hub_id, opts)
      false -> Distributor.which_children_local(hub_id, opts)
    end
  end

  @doc """
  Checks if the `ProcessHub` with the given `t:hub_id/0` is alive.

  A hub is considered alive if the `ProcessHub.Initializer` supervisor process
  is running along with the required child processes for the hub to function.

  ## Example
      iex> ProcessHub.is_alive?(:not_existing)
      false
  """
  @spec is_alive?(hub_id()) :: boolean
  def is_alive?(hub_id) do
    pname = Name.initializer(hub_id)

    case Process.whereis(pname) do
      nil -> false
      _ -> true
    end
  end

  @doc """
  This function can be used to wait for the `ProcessHub` child start or stop
  functions to complete.

  The `await/1` function should be used with the `async_wait: true` option.

  Keep in mind that the `await/1` function will block the calling process until
  the response is received. If the response is not received within the timeout
  period, the function will return `{:error, term()}`.

  ## Example
      iex> ref = ProcessHub.start_child(:my_hub, child_spec, [async_wait: true])
      iex> ProcessHub.await(ref)
      {:ok, {:my_child, [{:mynode, #PID<0.123.0>}]}}
  """
  @spec await(function()) :: term()
  def await(init_func) when is_function(init_func) do
    init_func.()
  end

  def await(init_func), do: init_func

  @doc """
  Returns the child specification for the `ProcessHub.Initializer` supervisor.
  """
  @spec child_spec(t()) :: %{
          id: ProcessHub,
          start: {ProcessHub.Initializer, :start_link, [...]},
          type: :supervisor
        }
  def child_spec(opts) do
    %{
      id: opts.hub_id,
      start: {ProcessHub.Initializer, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc """
  Starts the `ProcessHub` with the given `t:hub_id/0` and settings.

  It is recommended to start the `ProcessHub` under a supervision tree.
  """
  @spec start_link(ProcessHub.t()) :: {:ok, pid()} | {:error, term()}
  defdelegate start_link(hub_settings), to: ProcessHub.Initializer, as: :start_link

  @doc """
  Stops the `ProcessHub` with the given `t:hub_id/0`.
  """
  @spec stop(atom) :: :ok | {:error, :not_alive}
  defdelegate stop(hub_id), to: ProcessHub.Initializer, as: :stop

  @doc """
  Returns information about processes that are registered with the given `t:child_id/0`.

  This function queries results from the local `ets` table and does not make any network calls.

  The return results contain the `t:child_spec/0` and a list of tuples where the first element is the node
  where the child is started, and the second element is the `pid()` of the started child.

  ## Example
      iex> {_child_spec, _node_pid_tuples} = ProcessHub.child_lookup(:my_hub, :my_child)
      {%{id: :my_child, start: {MyProcess, :start_link, []}}, [{:mynode, #PID<0.123.0>}]}
  """
  @spec child_lookup(hub_id(), child_id()) :: {child_spec(), [{node(), pid()}]} | nil
  defdelegate child_lookup(hub_id, child_id), to: ProcessRegistry, as: :lookup

  @doc """
  Returns all information in the registry.

  This function queries results from the local `ets` table and does not make any network calls.
  """
  @spec process_registry(hub_id()) :: ProcessHub.Service.ProcessRegistry.registry()
  defdelegate process_registry(hub_id), to: ProcessRegistry, as: :registry

  @doc """
  Returns a list of pids for the given child_id.

  ## Example
    iex> ProcessHub.get_pids(:my_hub, :my_child)
    [#PID<0.123.0>]
  """
  @spec get_pids(ProcessHub.hub_id(), ProcessHub.child_id()) :: [pid()]
  defdelegate get_pids(hub_id, child_id), to: ProcessRegistry, as: :get_pids

  @doc """
  Returns the first pid for the given child_id.

  Although the function can be handy to quickly get the pid of the child, it is
  not recommended to use with replication strategies as it will return the first pid only.

  ## Example
    iex> ProcessHub.get_pid(:my_hub, :my_child)
    #PID<0.123.0>
  """
  @spec get_pid(ProcessHub.hub_id(), ProcessHub.child_id()) :: pid() | nil
  defdelegate get_pid(hub_id, child_id), to: ProcessRegistry, as: :get_pid

  @doc """
  Returns a list of processes that are registered.

  The process list contains the `t:child_id/0` and depending on the scope
  option, it may contain the node and `pid()` of the child.

  This function queries results from the local `ets` table and does not make any network calls.

  Available options:
  - `:global` - returns a list of all child processes on all nodes in the cluster.
    The return result will be in the format of `[{child_id, [{:node, pid}]}]`.
  - `:local` - returns a list of child processes that belong to the local node.
    The return result will be in the format of `[{child_id, [pid]}]`
    but only the processes that belong to the local node will be returned.

  ## Example
      iex> ProcessHub.process_list(:my_hub, :global)
      [
        {:my_child1, [{:node1, #PID<0.123.0>}, {:node2, #PID<2.123.0>}]},
        {:my_child2, [{:node2, #PID<5.124.0>}]}
      ]
      iex> ProcessHub.process_list(:my_hub, :local)
      [{:my_child1, #PID<0.123.0>}]
  """
  @spec process_list(hub_id(), :global | :local) :: [
          {ProcessHub.child_id(), [{node(), pid()}] | pid()}
        ]
  defdelegate process_list(hub_id, scope), to: ProcessRegistry, as: :process_list

  @doc """
  Checks if the `ProcessHub` with the given `t:hub_id/0` is locked.

  A hub is considered locked if the `ProcessHub` local event queue has a priority level
  greater than or equal to 10. This is used to throttle the hub from processing
  any new events and conserve data integrity.

  ## Example
      iex> ProcessHub.is_locked?(:my_hub)
      false
  """
  @spec is_locked?(hub_id()) :: boolean()
  defdelegate is_locked?(hub_id), to: State

  @doc """
  Checks if the `ProcessHub` with the given `t:hub_id/0` is in a network-partitioned state.

  A hub is considered partitioned if the `ProcessHub.Strategy.PartitionTolerance` strategy
  has detected a network partition. When a network partition is detected, the hub will
  terminate the `ProcessHub.DistributedSupervisor` process along with its children.

  ## Example
      iex> ProcessHub.is_partitioned?(:my_hub)
      false
  """
  @spec is_partitioned?(hub_id()) :: boolean()
  defdelegate is_partitioned?(hub_id), to: State

  @doc """
  Returns a list of nodes where the `ProcessHub` with the given `t:hub_id/0` is running.

  Nodes where the `ProcessHub` is running with the same `t:hub_id/0` are considered
  to be part of the same cluster.

  ## Example
      iex> ProcessHub.nodes(:my_hub, [:include_local])
      [:remote_node]
  """
  @spec nodes(hub_id(), [:include_local] | nil) :: [node()]
  defdelegate nodes(hub_id, opts \\ []), to: Cluster

  defp default_init_opts(opts) do
    Keyword.put_new(opts, :timeout, @default_init_timeout)
    |> Keyword.put_new(:async_wait, false)
    |> Keyword.put_new(:check_mailbox, true)
    |> Keyword.put_new(:check_existing, true)
    |> Keyword.put_new(:return_first, false)
  end
end