lib/process_hub/handler/cluster_update.ex

defmodule ProcessHub.Handler.ClusterUpdate do
  @moduledoc false

  alias ProcessHub.Constant.Hook
  alias ProcessHub.Constant.Event
  alias ProcessHub.Constant.StorageKey
  alias ProcessHub.Service.HookManager
  alias ProcessHub.Service.Distributor
  alias ProcessHub.Service.Dispatcher
  alias ProcessHub.Service.Synchronizer
  alias ProcessHub.Service.ProcessRegistry
  alias ProcessHub.Service.State
  alias ProcessHub.Service.Storage
  alias ProcessHub.Strategy.Distribution.Base, as: DistributionStrategy
  alias ProcessHub.Strategy.Redundancy.Base, as: RedundancyStrategy
  alias ProcessHub.Strategy.Migration.Base, as: MigrationStrategy
  alias ProcessHub.Strategy.PartitionTolerance.Base, as: PartitionToleranceStrategy
  alias ProcessHub.Strategy.Synchronization.Base, as: SynchronizationStrategy

  defmodule NodeUp do
    @moduledoc """
    Handler for the node up event.
    """
    alias ProcessHub.Constant.PriorityLevel
    use Event

    @type t() :: %__MODULE__{
            hub_id: ProcessHub.hub_id(),
            redun_strat: RedundancyStrategy.t(),
            sync_strat: SynchronizationStrategy.t(),
            migr_strat: MigrationStrategy.t(),
            partition_strat: PartitionToleranceStrategy.t(),
            dist_strat: DistributionStrategy.t(),
            node: node(),
            repl_fact: pos_integer(),
            local_children: list(),
            keep: list(),
            migrate: list(),
            async_tasks: list()
          }

    @enforce_keys [
      :hub_id,
      :node
    ]
    defstruct @enforce_keys ++
                [
                  :repl_fact,
                  :local_children,
                  :keep,
                  :migrate,
                  :redun_strat,
                  :migr_strat,
                  :sync_strat,
                  :partition_strat,
                  :dist_strat,
                  async_tasks: []
                ]

    @spec handle(t()) :: :ok
    def handle(%__MODULE__{} = arg) do
      arg = add_strategies(arg)

      # Dispatch the nodes pre redistribution event.
      HookManager.dispatch_hook(arg.hub_id, Hook.pre_nodes_redistribution(), {:nodeup, arg.node})

      # Handle the redistribution of processes.
      distribute_processes(arg)

      # Propagate the local children to the new node.
      propagate_local_children(arg.hub_id, arg.node)

      # Dispatch the nodes post redistribution event.
      HookManager.dispatch_hook(arg.hub_id, Hook.post_nodes_redistribution(), {:nodeup, arg.node})

      # Unlock the event handler.
      State.unlock_event_handler(arg.hub_id)

      :ok
    end

    defp distribute_processes(arg) do
      arg
      |> Map.put(:repl_fact, RedundancyStrategy.replication_factor(arg.redun_strat))
      |> local_children()
      |> dist_children()
      |> handle_migrate()
      |> handle_keep()
      |> wait_for_tasks()
    end

    defp add_strategies(arg) do
      %__MODULE__{
        arg
        | sync_strat: Storage.get(arg.hub_id, StorageKey.strsyn()),
          redun_strat: Storage.get(arg.hub_id, StorageKey.strred()),
          dist_strat: Storage.get(arg.hub_id, StorageKey.strdist()),
          migr_strat: Storage.get(arg.hub_id, StorageKey.strmigr()),
          partition_strat: Storage.get(arg.hub_id, StorageKey.strpart())
      }
    end

    defp propagate_local_children(hub_id, node) do
      local_processes = Synchronizer.local_sync_data(hub_id)
      local_node = node()

      Dispatcher.propagate_event(
        hub_id,
        @event_sync_remote_children,
        {local_processes, local_node},
        %{members: [node], priority: PriorityLevel.locked()}
      )
    end

    defp wait_for_tasks(%__MODULE__{async_tasks: async_tasks, migr_strat: migr_strat} = _arg) do
      Task.await_many(async_tasks, migration_timeout(migr_strat))

      #  %__MODULE__{arg | async_tasks: []}

      :ok
    end

    defp handle_migrate(
           %__MODULE__{
             hub_id: hub_id,
             migrate: data,
             async_tasks: async_tasks,
             migr_strat: migr_strat,
             sync_strat: sync_strat,
             node: node
           } = arg
         ) do
      task =
        Task.async(fn ->
          child_specs =
            Enum.map(data, fn %{child_spec: child_spec} ->
              child_spec
            end)

          MigrationStrategy.handle_migration(migr_strat, hub_id, child_specs, node, sync_strat)
        end)

      %__MODULE__{arg | async_tasks: [task | async_tasks]}
    end

    defp handle_keep(
           %__MODULE__{
             hub_id: hub_id,
             keep: keep,
             async_tasks: async_tasks,
             node: node
           } = arg
         ) do
      task =
        Task.async(fn ->
          handle_redundancy(hub_id, node, keep)

          Distributor.children_redist_init(
            hub_id,
            node,
            Enum.map(keep, fn %{child_spec: cs} -> cs end)
          )
        end)

      %__MODULE__{arg | async_tasks: [task | async_tasks]}
    end

    defp handle_redundancy(hub_id, node, children) do
      children_pids = ProcessRegistry.local_data(hub_id)

      redun_data =
        Enum.map(children, fn %{child_spec: cs, child_nodes: cn} ->
          {cs.id, cn, {:pid, children_pids[cs.id]}}
        end)

      HookManager.dispatch_hook(
        hub_id,
        Hook.pre_children_redistribution(),
        {redun_data, {:up, node}}
      )
    end

    defp local_children(%__MODULE__{hub_id: hub_id} = arg) do
      local_node = node()

      local_children =
        ProcessRegistry.registry(hub_id)
        |> Enum.filter(fn {_child_id, {_child_spec, child_nodes}} ->
          Enum.member?(Keyword.keys(child_nodes), local_node)
        end)

      %__MODULE__{arg | local_children: local_children}
    end

    defp dist_children(
           %__MODULE__{
             local_children: lc,
             dist_strat: dist_strat,
             hub_id: hub_id,
             repl_fact: rp,
             node: node
           } = arg
         ) do
      local_node = node()

      {keep, migrate} =
        Enum.reduce(lc, {[], []}, fn {child_id, {child_spec, _}}, {keep, migrate} = acc ->
          child_nodes = DistributionStrategy.belongs_to(dist_strat, hub_id, child_id, rp)

          case Enum.member?(child_nodes, node) do
            true ->
              case Enum.member?(child_nodes, local_node) do
                true ->
                  {[%{child_spec: child_spec, child_nodes: child_nodes} | keep], migrate}

                false ->
                  {keep, [%{child_spec: child_spec, child_nodes: child_nodes} | migrate]}
              end

            false ->
              acc
          end
        end)

      %__MODULE__{arg | keep: keep, migrate: migrate}
    end

    defp migration_timeout(migr_strategy) do
      default_timeout = 15000

      if Map.has_key?(migr_strategy, :retention) do
        case Map.get(migr_strategy, :retention, :none) do
          :none -> default_timeout
          timeout -> timeout + default_timeout
        end
      else
        default_timeout
      end
    end
  end

  defmodule NodeDown do
    @moduledoc """
    Handler for the node down event.
    """

    @type t() :: %__MODULE__{
            hub_id: ProcessHub.hub_id(),
            removed_node: node(),
            partition_strat: PartitionToleranceStrategy.t(),
            redun_strat: RedundancyStrategy.t(),
            dist_strat: DistributionStrategy.t(),
            hub_nodes: [node()],
            rem_node_cids: [ProcessHub.child_id()]
          }

    @enforce_keys [
      :hub_id,
      :removed_node,
      :hub_nodes
    ]
    defstruct @enforce_keys ++ [:partition_strat, :redun_strat, :dist_strat, :rem_node_cids]

    @spec handle(t()) :: any()
    def handle(%__MODULE__{} = arg) do
      %__MODULE__{
        arg
        | partition_strat: Storage.get(arg.hub_id, StorageKey.strpart()),
          redun_strat: Storage.get(arg.hub_id, StorageKey.strred()),
          dist_strat: Storage.get(arg.hub_id, StorageKey.strdist())
      }
      |> dispatch_down_hook()
      |> distribute_processes()
      |> clear_registry()
      |> handle_locking()
      |> dispatch_post_hooks()
    end

    defp dispatch_post_hooks(arg) do
      HookManager.dispatch_hook(
        arg.hub_id,
        Hook.post_nodes_redistribution(),
        {:nodedown, arg.removed_node}
      )

      HookManager.dispatch_hook(arg.hub_id, Hook.post_cluster_leave(), arg)
    end

    defp handle_locking(arg) do
      State.unlock_event_handler(arg.hub_id)

      lock_status =
        PartitionToleranceStrategy.toggle_lock?(
          arg.partition_strat,
          arg.hub_id,
          arg.removed_node
        )

      if lock_status do
        State.toggle_quorum_failure(arg.hub_id)
      end

      arg
    end

    defp dispatch_down_hook(arg) do
      HookManager.dispatch_hook(
        arg.hub_id,
        Hook.pre_nodes_redistribution(),
        {:nodedown, arg.removed_node}
      )

      arg
    end

    # Removes all processes from the registry that were running on the removed node.
    defp clear_registry(arg) do
      children_nodes =
        Enum.map(arg.rem_node_cids, fn child_id ->
          {child_id, [arg.removed_node]}
        end)
        |> Map.new()

      ProcessRegistry.bulk_delete(arg.hub_id, children_nodes)

      arg
    end

    defp distribute_processes(%__MODULE__{} = arg) do
      local_node = node()

      {redun, redist, cids} =
        removed_node_processes(arg)
        |> Enum.reduce({[], [], []}, fn {cid, cspec, nlist1, nlist2}, {redun, redist, cids} ->
          case Enum.member?(nlist1, local_node) do
            true ->
              {[{cid, nlist2, []} | redun], redist, [cid | cids]}

            false ->
              case Enum.member?(nlist2, local_node) do
                true ->
                  {redun, [cspec | redist], [cid | cids]}

                false ->
                  {redun, redist, [cid | cids]}
              end
          end
        end)

      handle_redundancy(arg, redun)
      handle_redistribution(arg.hub_id, redist)

      Map.put(arg, :rem_node_cids, cids)
    end

    defp handle_redistribution(hub_id, child_specs) do
      # Local node is part of the new nodes and therefore we need to start
      # the child process locally.
      Distributor.children_redist_init(hub_id, node(), child_specs)
    end

    defp handle_redundancy(arg, children) do
      HookManager.dispatch_hook(
        arg.hub_id,
        Hook.pre_children_redistribution(),
        {children, {:down, arg.removed_node}}
      )
    end

    defp removed_node_processes(arg) do
      repl_fact = RedundancyStrategy.replication_factor(arg.redun_strat)

      ProcessRegistry.registry(arg.hub_id)
      |> Enum.reduce([], fn {child_id, {child_spec, node_pids}}, acc ->
        nodes_orig = Keyword.keys(node_pids)

        if Enum.member?(nodes_orig, arg.removed_node) do
          nodes_updated =
            DistributionStrategy.belongs_to(
              arg.dist_strat,
              arg.hub_id,
              child_id,
              repl_fact
            )

          [{child_id, child_spec, nodes_orig, nodes_updated} | acc]
        else
          acc
        end
      end)
    end
  end
end