defmodule ProcessHub.Strategy.Redundancy.Replication do
@moduledoc """
The replication strategy allows for multiple instances of a process to be started
across the cluster. The number of instances is determined by the `replication_factor`
option.
The replication strategy also comes with a `replication_model` option, which determines
which instances of the process are active and which are passive.
Imagine a scenario where you have a process that is responsible for handling streams of
data. You may want to have multiple instances of this process running across the cluster,
but only one of them should be active at any given time and insert data into the database.
The passive instances of the process should be ready to take over if the active instance
fails.
> Replication strategy selects the master/active node based on the `child_id` and
> `child_nodes` arguments. The `child_id` is converted to a charlist and summed up.
> The sum is then used to calculate the index of the master node in the `child_nodes` list.
> This ensures that the same master node is selected for the same `child_id` and `child_nodes`
> arguments.
This strategy also allows replication on all nodes in the cluster. This is done by setting
the `replication_factor` option to `:cluster_size`.
`ProcessHub` can notify the process when it is active or passive by sending a `:redundancy_signal`.
> #### `Using the :redundancy_signal option` {: .info}
> When using the `:redundancy_signal` option, make sure that the processes are handling
> the message `{:process_hub, :redundancy_signal, mode}`, where the `mode` variable is either
> `:active` or `:passive`.
Example `GenServer` process handling the `:redundancy_signal`:
def handle_info({:process_hub, :redundancy_signal, mode}, state) do
# Update the state with the new mode and do something differently.
{:noreply, Map.put(state, :replication_mode, mode)}
end
"""
alias ProcessHub.DistributedSupervisor
alias ProcessHub.Utility.Name
alias ProcessHub.Strategy.Redundancy.Base, as: RedundancyStrategy
alias ProcessHub.Constant.Hook
@typedoc """
Replication strategy options.
- `replication_factor` - The number of instances of a single process across the cluster.
This option can be positive integer or `cluster_size` atom to replicate process on all nodes.
Default value is `2`.
- `replication_model` - This option determines the mode in which the instances of processes are started.
Default value is `:active_active`.
- `:active_active` - All instances of a process across the cluster are equal.
- `:active_passive` - Only one instance of a process across the cluster is active; the rest are passive.
Remaining replicas are started as passive processes.
- `redundancy_signal` - This option determines when a process should be notified of its replication mode.
Default value is `:none`.
- `:none` - No notifications are sent.
- `:active_active` - Only active processes are notified.
- `:active_passive` - Only passive processes are notified.
- `:all` - All processes are notified.
"""
@type t :: %__MODULE__{
replication_factor: pos_integer() | :cluster_size,
replication_model: :active_active | :active_passive,
redundancy_signal: :none | :active | :passive | :all
}
defstruct replication_factor: 2, replication_model: :active_active, redundancy_signal: :none
defimpl RedundancyStrategy, for: ProcessHub.Strategy.Redundancy.Replication do
alias ProcessHub.Service.HookManager
alias ProcessHub.Strategy.Redundancy.Replication
@impl true
def init(strategy, hub_id) do
HookManager.register_handler(hub_id, Hook.post_children_start(), %HookManager{
id: :replication_post_start,
m: Replication,
f: :handle_post_start,
a: [strategy, hub_id, :_]
})
HookManager.register_handler(hub_id, Hook.pre_children_redistribution(), %HookManager{
id: :replication_post_update,
m: Replication,
f: :handle_post_update,
a: [strategy, hub_id, :_]
})
end
@impl true
@spec replication_factor(Replication.t()) ::
pos_integer() | :cluster_size
def replication_factor(strategy) do
case strategy.replication_factor do
:cluster_size ->
(Node.list() |> Enum.count()) + 1
_ ->
strategy.replication_factor
end
end
@impl true
@spec master_node(struct(), ProcessHub.hub_id(), ProcessHub.child_id(), [node()]) :: node()
def master_node(_strategy, _hub_id, child_id, child_nodes) do
child_nodes = Enum.sort(child_nodes)
child_total =
cond do
is_binary(child_id) -> child_id
is_atom(child_id) -> Atom.to_string(child_id)
end
|> to_charlist()
|> Enum.sum()
nodes_total = length(child_nodes)
index = rem(child_total, nodes_total)
Enum.at(child_nodes, index)
end
end
@spec handle_post_start(struct(), ProcessHub.hub_id(), [
{ProcessHub.child_id(), pid(), [node()]}
]) :: :ok
def handle_post_start(%__MODULE__{redundancy_signal: :none}, _, _), do: :ok
def handle_post_start(strategy, hub_id, post_start_data) do
Enum.each(post_start_data, fn {child_id, child_pid, child_nodes} ->
mode = process_mode(strategy, hub_id, child_id, child_nodes)
cond do
strategy.redundancy_signal === :all ->
send_redundancy_signal(child_pid, mode)
mode === strategy.redundancy_signal ->
send_redundancy_signal(child_pid, mode)
true ->
:ok
end
end)
:ok
end
@spec handle_post_update(
struct(),
ProcessHub.hub_id(),
{[{ProcessHub.child_id(), [node()], keyword()}], {:up | :down, node()}}
) :: :ok
def handle_post_update(%__MODULE__{redundancy_signal: :none}, _, _), do: :ok
def handle_post_update(
%__MODULE__{replication_model: :active_passive} = strategy,
hub_id,
{processes_data, {node_action, node}}
) do
Enum.each(processes_data, fn {child_id, child_nodes, opts} ->
handle_redundancy_signal(
strategy,
hub_id,
child_id,
child_nodes,
{node_action, node},
opts
)
end)
end
def handle_post_update(_, _, _), do: :ok
defp node_modes(strategy, hub_id, node_action, child_id, nodes, node) do
curr_master = RedundancyStrategy.master_node(strategy, hub_id, child_id, nodes)
prev_master =
case node_action do
:up ->
RedundancyStrategy.master_node(strategy, hub_id, child_id, nodes)
:down ->
[node | nodes]
end
{prev_master, curr_master}
end
defp handle_redundancy_signal(strategy, hub_id, child_id, nodes, {node_action, node}, opts) do
local_node = node()
{prev_master, curr_master} =
node_modes(strategy, hub_id, node_action, child_id, nodes, node)
cond do
prev_master === curr_master ->
# Do nothing because the same node still holds the active process.
:ok
Enum.member?([:all, :active], strategy.redundancy_signal) ->
if curr_master === local_node and prev_master !== curr_master do
# Current node is the new active node.
child_pid(hub_id, child_id, opts) |> send_redundancy_signal(:active)
end
Enum.member?([:all, :passive], strategy.redundancy_signal) ->
if curr_master !== local_node and prev_master === local_node do
# Current node is the new passive node.
child_pid(hub_id, child_id, opts) |> send_redundancy_signal(:passive)
end
true ->
:ok
end
end
defp process_mode(%__MODULE__{replication_model: rp} = strat, hub_id, child_id, child_nodes) do
master_node = RedundancyStrategy.master_node(strat, hub_id, child_id, child_nodes)
cond do
rp === :active_active ->
:active
master_node === node() ->
:active
true ->
:passive
end
end
defp child_pid(hub_id, child_id, opts) do
case Keyword.get(opts, :pid) do
nil ->
Name.distributed_supervisor(hub_id) |> DistributedSupervisor.local_pid(child_id)
pid ->
pid
end
end
defp send_redundancy_signal(pid, mode) when is_pid(pid) do
send(pid, {:process_hub, :redundancy_signal, mode})
end
defp send_redundancy_signal(_pid, _mode), do: nil
end