defmodule ProcessHub.Service.Distributor do
@moduledoc """
The distributor service provides API functions for distributing child processes.
"""
alias ProcessHub.Service.LocalStorage
alias ProcessHub.Service.ProcessRegistry
alias ProcessHub.Utility.Name
alias ProcessHub.Service.Dispatcher
alias ProcessHub.Service.Mailbox
alias ProcessHub.Service.Cluster
alias ProcessHub.DistributedSupervisor
alias ProcessHub.Strategy.Synchronization.Base, as: SynchronizationStrategy
alias ProcessHub.Strategy.Redundancy.Base, as: RedundancyStrategy
alias ProcessHub.Strategy.Distribution.Base, as: DistributionStrategy
@doc "Initiates process redistribution."
@spec children_redist_init(
ProcessHub.hub_id(),
[ProcessHub.child_spec()],
node(),
keyword() | nil
) ::
{:ok, :redistribution_initiated} | {:ok, :no_children_to_redistribute}
def children_redist_init(hub_id, child_specs, node, opts \\ []) do
# Migration expects the `:migration_add` true flag otherwise the
# remote node wont release the lock.
opts = Keyword.put(opts, :migration_add, true)
redist_children =
Enum.map(child_specs, fn child_spec ->
init_data([node], hub_id, child_spec)
|> Map.merge(Map.new(opts))
end)
case length(redist_children) > 0 do
true ->
Dispatcher.children_migrate(hub_id, [{node, redist_children}], opts)
{:ok, :redistribution_initiated}
false ->
{:ok, :no_children_to_redistribute}
end
end
@doc "Initiates processes startup."
@spec init_children(ProcessHub.hub_id(), [ProcessHub.child_spec()], keyword()) ::
{:ok, :start_initiated}
| (-> {:ok, list})
| {:error,
:child_start_timeout
| :no_children
| {:already_started, [ProcessHub.child_id()]}
| any()}
def init_children(_hub_id, [], _opts), do: {:error, :no_children}
def init_children(hub_id, child_specs, opts) do
with {:ok, strategies} <- init_strategies(hub_id),
:ok <- init_distribution(hub_id, child_specs, opts, strategies),
:ok <- init_registry_check(hub_id, child_specs, opts),
{:ok, children_nodes} <- init_attach_nodes(hub_id, child_specs, strategies),
:ok <- init_mailbox_cleanup(children_nodes, opts),
{:ok, composed_data} <- init_compose_data(hub_id, children_nodes, opts) do
pre_start_children(composed_data, hub_id, opts)
else
err -> err
end
end
@doc "Initiates processes shutdown."
@spec stop_children(ProcessHub.hub_id(), [ProcessHub.child_id()], keyword()) ::
(-> {:error, list} | {:ok, list}) | {:ok, :stop_initiated}
def stop_children(hub_id, child_ids, opts) do
redun_strat = LocalStorage.get(hub_id, :redundancy_strategy)
dist_strat = LocalStorage.get(hub_id, :distribution_strategy)
repl_fact = RedundancyStrategy.replication_factor(redun_strat)
Enum.reduce(child_ids, [], fn child_id, acc ->
child_nodes = DistributionStrategy.belongs_to(dist_strat, hub_id, child_id, repl_fact)
child_data = %{nodes: child_nodes, child_id: child_id}
child_data =
case Keyword.get(opts, :async_wait, false) do
true -> Map.put(child_data, :reply_to, [self()])
false -> child_data
end
append_items =
Enum.map(child_nodes, fn child_node ->
existing_children = acc[child_node] || []
{child_node, [child_data | existing_children]}
end)
Keyword.merge(acc, append_items)
end)
|> pre_stop_children(hub_id, opts)
end
@doc """
Terminates child process locally and propagates all nodes in the cluster
to remove the child process from their registry.
"""
@spec child_terminate(
ProcessHub.hub_id(),
ProcessHub.child_id(),
ProcessHub.Strategy.Synchronization.Base
) :: :ok | {:error, :not_found | :restarting | :running}
def child_terminate(hub_id, child_id, sync_strategy) do
supervisor_resp =
Name.distributed_supervisor(hub_id)
|> DistributedSupervisor.terminate_child(child_id)
SynchronizationStrategy.propagate(
sync_strategy,
hub_id,
[{child_id, {supervisor_resp, []}}],
node(),
:rem,
[]
)
supervisor_resp
end
# TODO: we can use this function in the future for performance.
# @spec children_terminate(
# ProcessHub.hub_id(),
# [ProcessHub.child_id()],
# ProcessHub.Strategy.Synchronization.Base
# ) :: :ok
# def children_terminate(hub_id, child_ids, sync_strategy) do
# dist_sup = Name.distributed_supervisor(hub_id)
# propagation_data = Enum.map(child_ids, fn child_id ->
# supervisor_resp = DistributedSupervisor.terminate_child(dist_sup, child_id)
# {child_id, {supervisor_resp, []}}
# end)
# SynchronizationStrategy.propagate(
# sync_strategy,
# hub_id,
# propagation_data,
# node(),
# :rem,
# []
# )
# :ok
# end
@doc """
Returns all child processes started by the local node.
Works similar to `Supervisor.which_children/1` but returns a list of tuple
where the first element is the node name and the second child processes started under the node.
"""
@spec which_children_local(ProcessHub.hub_id(), keyword() | nil) ::
{node(),
[{any, :restarting | :undefined | pid, :supervisor | :worker, :dynamic | list}]}
def which_children_local(hub_id, _opts) do
{node(), Supervisor.which_children(Name.distributed_supervisor(hub_id))}
end
@doc """
Return a list of all child processes started by all nodes in the cluster.
"""
@spec which_children_global(ProcessHub.hub_id(), keyword()) :: list
def which_children_global(hub_id, opts) do
timeout = Keyword.get(opts, :timeout, 5000)
Cluster.nodes(hub_id, [:include_local])
|> :erpc.multicall(fn -> __MODULE__.which_children_local(hub_id, opts) end, timeout)
|> Enum.map(fn {_status, result} -> result end)
end
defp pre_start_children(startup_children, hub_id, opts) do
Dispatcher.children_start(hub_id, startup_children, opts)
case Keyword.get(opts, :async_wait, false) do
false ->
{:ok, :start_initiated}
true ->
fn ->
receiveable(startup_children)
|> Mailbox.receive_start_resp(opts)
end
end
end
defp init_distribution(hub_id, child_specs, opts, %{distribution: strategy}) do
DistributionStrategy.children_init(strategy, hub_id, child_specs, opts)
end
defp init_strategies(hub_id) do
{:ok,
%{
distribution: LocalStorage.get(hub_id, :distribution_strategy),
redundancy: LocalStorage.get(hub_id, :redundancy_strategy)
}}
end
defp pre_stop_children(stop_children, hub_id, opts) do
Dispatcher.children_stop(hub_id, stop_children)
case Keyword.get(opts, :async_wait, false) do
false ->
{:ok, :stop_initiated}
true ->
fn ->
receiveable(stop_children)
|> Mailbox.receive_stop_resp(opts)
end
end
end
defp receiveable(nodes_children) do
Enum.reduce(nodes_children, [], fn {node, children}, acc ->
node_receivable =
Enum.map(children, fn child ->
child.child_id
end)
Keyword.put(acc, node, node_receivable)
end)
end
defp init_data(child_nodes, hub_id, child_spec) do
%{
hub_id: hub_id,
nodes: child_nodes,
child_id: child_spec.id,
child_spec: child_spec
}
end
defp init_compose_data(hub_id, children, opts) do
async_await = Keyword.get(opts, :async_wait, false)
{:ok,
Enum.reduce(children, [], fn {child_spec, child_nodes}, acc ->
child_data = init_data(child_nodes, hub_id, child_spec)
child_data =
case async_await do
true -> Map.put(child_data, :reply_to, [self()])
false -> child_data
end
append_items =
Enum.map(child_nodes, fn child_node ->
existing_children = acc[child_node] || []
{child_node, [child_data | existing_children]}
end)
Keyword.merge(acc, append_items)
end)}
end
defp init_attach_nodes(hub_id, child_specs, %{distribution: dist, redundancy: redun}) do
repl_fact = RedundancyStrategy.replication_factor(redun)
{:ok,
Enum.map(child_specs, fn child_spec ->
{child_spec, DistributionStrategy.belongs_to(dist, hub_id, child_spec.id, repl_fact)}
end)}
end
defp init_mailbox_cleanup(children, opts) do
case Keyword.get(opts, :check_mailbox) do
true ->
Enum.each(children, fn {%{id: child_id}, nodes} ->
Enum.each(nodes, fn node ->
receive do
{:child_start_resp, ^child_id, _, ^node} -> nil
after
0 -> nil
end
end)
end)
false ->
nil
end
:ok
end
defp init_registry_check(hub_id, child_specs, opts) do
case Keyword.get(opts, :check_existing) do
true ->
contains = ProcessRegistry.contains_children(hub_id, Enum.map(child_specs, & &1.id))
case contains do
[] -> :ok
_ -> {:error, {:already_started, contains}}
end
false ->
:ok
end
end
end