lib/process_hub/strategy/migration/hot_swap.ex

defmodule ProcessHub.Strategy.Migration.HotSwap do
  @moduledoc """
  The hot swap migration strategy implements the `ProcessHub.Strategy.Migration.Base` protocol.
  It provides a migration strategy where the local process is terminated after the new one is
  started on the remote node.

  In the following text, we will refer to the process that is being terminated after the migration
  as the peer process.

  Hotswap migration can also handle process state migration when nodes are leaving the cluster.
  The process states are stored in the local storage and are sent to the remote node before the
  local process is terminated. This will only work if the node is being shut down **gracefully**.

  The hot swap strategy is useful when we want to ensure that there is no downtime when migrating
  the child process to the remote node. It also provides a way to ensure that the state of the
  process is synchronized before terminating the peer process.

  To pass the process state to the newly started process on the remote node, the
  `:handover` option must be set to `true` in the migration strategy configuration, and
  the necessary messages must be handled in the processes.

  > #### Using the handover option {: .warning}
  >
  > When the `:handover` option is set to `true`, the peer process must handle the following message:
  > `{:process_hub, :handover_start, startup_resp, child_id, from}`.

  Example migration with handover using `GenServer`:

  ```elixir
  def handle_info({:process_hub, :handover_start, startup_resp, child_id, from}, state) do
    case startup_resp do
      {:ok, pid} ->
        # Send the state to the remote process.
        Process.send(pid, {:process_hub, :handover, state}, [])

        # Signal the handler process that the state handover has been handled.
        Process.send(from, {:process_hub, :retention_handled, child_id}, [])

      _ ->
        nil
    end

    {:noreply, state}
  end

  def handle_info({:process_hub, :handover, handover_state}, _state) do
    {:noreply, handover_state}
  end
  ```

  > #### Use HotSwap macro to provide the handover callbacks automatically.  {: .info}
  > It's convenient to use the `HotSwap` macro to provide the handover callbacks automatically.
  >
  > ```elixir
  > use ProcessHub.Strategy.Migration.HotSwap
  > ```
  """

  require Logger

  alias ProcessHub.Strategy.Migration.Base, as: MigrationStrategy
  alias ProcessHub.DistributedSupervisor
  alias ProcessHub.Constant.Hook
  alias ProcessHub.Service.HookManager
  alias ProcessHub.Service.Distributor
  alias ProcessHub.Service.Mailbox
  alias ProcessHub.Utility.Name

  @typedoc """
  Hot-swap migration strategy configuration.

  Available options:
  - `:retention` - An integer value in milliseconds that specifies how long
    the peer process should be kept alive after a new child process has been started on the remote node.
    This option is used to ensure that the peer process has had enough time to perform any cleanup
    or state synchronization before the local process is terminated.
    Keep in mind that the process will be terminated before the retention time has passed if the
    peer process has notified the handler process that the state transfer has been handled.
    The default value is `5000`.

  - `:handover` - A boolean value. If set to `true`, the processes involved in the migration
    must handle the following message:
    `{:process_hub, :handover_start, startup_resp, child_id, from}`.
    This message will be sent to the peer process that will be terminated after the migration.
    The variable `startup_resp` will contain the response from the
    `ProcessHub.DistributedSupervisor.start_child/2` function, and if successful, it will be
    `{:ok, pid}`. The PID can be used to send the state of the process to the remote process.
    If the `retention` option is used, then the peer process has to signal the handler process
    that the state has been handled by sending `{:process_hub, :retention_handled, child_id}`
    to it; otherwise, the handler will wait until the retention time has passed.
    The handler PID is passed in the `from` variable.
    The default value is `false`.

  - `:handover_data_wait` - An integer value in milliseconds that specifies how long the handler
    process should wait for the state of the remote process to be sent to the local node.
    The default value is `3000`.

  - `:child_migration_timeout` - An integer value in milliseconds that specifies the timeout for a
    single child process migration. If the child process migration does not complete within this
    time, the migration of that child process is considered failed, but the migration of the
    other child processes continues.
    The default value is `10000`.
  """
  @type t() :: %__MODULE__{
          retention: pos_integer(),
          handover: boolean(),
          handover_data_wait: pos_integer(),
          child_migration_timeout: pos_integer()
        }
  defstruct retention: 5000,
            handover: false,
            handover_data_wait: 3000,
            child_migration_timeout: 10000

  defimpl MigrationStrategy, for: ProcessHub.Strategy.Migration.HotSwap do
    alias ProcessHub.Constant.StorageKey
    alias ProcessHub.Service.Cluster
    alias ProcessHub.Service.Storage
    alias ProcessHub.Service.ProcessRegistry
    alias ProcessHub.Strategy.Migration.HotSwap
    alias ProcessHub.Strategy.Redundancy.Base, as: RedundancyStrategy
    alias ProcessHub.Strategy.Distribution.Base, as: DistributionStrategy

    # The hot swap strategy performs no work at initialization; the callback
    # ignores both arguments and returns `nil`.
    @impl true
    def init(_struct, _hub_id) do
      nil
    end

    # Migrates `child_specs` to `added_node`.
    #
    # Redistribution is always initiated, with this process registered as the
    # reply target. When there is at least one child spec, the start responses
    # are collected, the local peers are terminated according to the retention
    # rules, and the `children_migrated` hook is dispatched if at least one
    # child went through the handover.
    #
    # Always returns `:ok`.
    @impl true
    def handle_migration(struct, hub_id, child_specs, added_node, sync_strategy) do
      # Start redistribution of the child processes.
      Distributor.children_redist_init(hub_id, child_specs, added_node, reply_to: [self()])

      # Compare against `[]` instead of calling `length/1`, which walks the whole list.
      if child_specs != [] do
        migrated = migration_cids(hub_id, struct, child_specs, added_node)

        handle_retentions(hub_id, struct, sync_strategy, migrated)

        if migrated != [] do
          # Dispatch hook.
          HookManager.dispatch_hook(hub_id, Hook.children_migrated(), {added_node, child_specs})
        end
      end

      :ok
    end

    # Shutdown callback.
    #
    # With `handover: true` the states of the locally registered children are
    # requested, collected (waiting up to `handover_data_wait` ms per reply),
    # grouped by target node and pushed to those nodes. Without handover the
    # callback does nothing. Both clauses return `:ok`.
    @impl true
    def handle_shutdown(%HotSwap{handover: true, handover_data_wait: wait_ms}, hub_id) do
      registry_data = ProcessRegistry.local_data(hub_id)

      # Ask the local children for their state, then gather the replies.
      collected =
        registry_data
        |> get_state_msgs()
        |> get_send_data(wait_ms)

      # Group the states per destination node and ship them.
      collected
      |> format_send_data(hub_id)
      |> send_data(hub_id)

      :ok
    end

    def handle_shutdown(_struct, _hub_id) do
      :ok
    end

    # Called after child processes have been started on this node.
    #
    # With `handover: true`, any state stored under the migration state key is
    # delivered to the matching freshly started process as
    # `{:process_hub, :handover, state}`, and the stored states are then
    # removed. `pids` is an enumerable of `{child_id, pid}` tuples.
    #
    # Without handover the callback does nothing. Both clauses return `:ok`.
    @impl true
    def handle_process_startups(%HotSwap{handover: true} = _struct, hub_id, pids) do
      state_data = Storage.get(hub_id, StorageKey.msk()) || []

      # Nothing stored means nothing to deliver and nothing to clean up. Skipping
      # the cleanup also avoids running the storage update callback against a
      # missing (`nil`) value.
      if state_data != [] do
        Enum.each(pids, fn {cid, pid} ->
          case Keyword.get(state_data, cid) do
            nil -> nil
            pstate -> send(pid, {:process_hub, :handover, pstate})
          end
        end)

        rem_states(hub_id, Keyword.keys(state_data))
      end

      :ok
    end

    def handle_process_startups(_struct, _hub_id, _pids), do: :ok

    # Pushes the collected states to their destination nodes.
    #
    # `send_data` is enumerated as `{node, states}` pairs. Destinations that
    # are not part of the current cluster node list are skipped. On each
    # remaining node the states are prepended to whatever is already stored
    # under the migration state key.
    defp send_data(send_data, hub_id) do
      Enum.each(send_data, fn {target_node, states} ->
        if target_node in Cluster.nodes(hub_id) do
          # The call is synchronous on purpose: the data has to be stored on the
          # remote node before that node starts the new children.
          :erpc.call(target_node, fn ->
            Storage.update(hub_id, StorageKey.msk(), fn
              nil -> states
              existing -> states ++ existing
            end)
          end)
        end
      end)
    end

    # Groups the collected states by the node they should be sent to.
    #
    # For every child the distribution strategy is asked which nodes it belongs
    # to; the first of those nodes that is not already listed for the child is
    # chosen as the destination. Returns a map of `node => [{child_id, state}]`.
    #
    # When no such node exists the entry ends up under the `nil` key, and a
    # child without a collected state is stored with a `nil` state.
    defp format_send_data({local_data, states}, hub_id) do
      dist_strat = Storage.get(hub_id, StorageKey.strdist())

      repl_fact =
        hub_id
        |> Storage.get(StorageKey.strred())
        |> RedundancyStrategy.replication_factor()

      Enum.reduce(local_data, %{}, fn {cid, {_spec, child_nodes}}, grouped ->
        current_nodes = Keyword.keys(child_nodes)

        target =
          dist_strat
          |> DistributionStrategy.belongs_to(hub_id, cid, repl_fact)
          |> Enum.find(&(&1 not in current_nodes))

        entry = {cid, Keyword.get(states, cid)}

        Map.update(grouped, target, [entry], &[entry | &1])
      end)
    end

    # Sends `{:process_hub, :get_state, child_id, requester}` to every child
    # that has a pid registered for the local node, with the calling process as
    # the requester. Returns `local_data` unchanged so it can be piped further.
    defp get_state_msgs(local_data) do
      requester = self()
      this_node = node()

      Enum.each(local_data, fn {child_id, {_spec, child_nodes}} ->
        case Keyword.get(child_nodes, this_node) do
          pid when is_pid(pid) ->
            send(pid, {:process_hub, :get_state, child_id, requester})

          _no_local_pid ->
            nil
        end
      end)

      local_data
    end

    # Collects the `{:process_hub, :process_state, child_id, state}` replies.
    #
    # One receive is performed per entry in `local_data`, each waiting up to
    # `handover_data_wait` milliseconds. A timed-out receive is logged and
    # contributes nothing to the result.
    #
    # Returns `{local_data, [{child_id, state}]}`.
    defp get_send_data(local_data, handover_data_wait) do
      local_node = node()

      states =
        Enum.flat_map(local_data, fn _entry ->
          receive do
            {:process_hub, :process_state, cid, state} ->
              [{cid, state}]
          after
            handover_data_wait ->
              Logger.error("Handover timeout while shutting down the node #{local_node}")
              []
          end
        end)

      {local_data, states}
    end

    # Removes the stored handover states belonging to `cids` from the migration
    # state key.
    #
    # The stored value may be missing (`nil`) when no state has been handed
    # over to this node; that case is normalised to an empty list instead of
    # being passed to `Enum.reject/2`.
    defp rem_states(hub_id, cids) do
      Storage.update(hub_id, StorageKey.msk(), fn
        nil -> []
        states -> Enum.reject(states, fn {cid, _state} -> cid in cids end)
      end)
    end

    # Terminates the local peers of the migrated children, one by one.
    #
    # While the running signal is `:continue`, each child is given the chance to
    # report that its handover is done before it is terminated. As soon as a
    # wait ends with `:kill`, all remaining children are terminated without
    # waiting. Returns the final signal.
    defp handle_retentions(hub_id, strategy, sync_strategy, migration_cids) do
      Enum.reduce(migration_cids, :continue, fn child_id, signal ->
        next_signal =
          case signal do
            # Wait for the response from the peer process first.
            :continue -> handle_retention(strategy, child_id)
            # Retention is over: no more waiting.
            :kill -> :kill
          end

        Distributor.child_terminate(hub_id, child_id, sync_strategy)

        next_signal
      end)

      # TODO: we could kill them in bulk for better performance.
      # Distributor.children_terminate(hub_id, migration_cids, sync_strategy)
    end

    # Collects the ids of the children whose local peer was notified about the
    # migration.
    #
    # One `:child_start_resp` response is awaited per child spec, each for up to
    # `child_migration_timeout` milliseconds. A failed response is logged and
    # skipped; for every other response the handover is started, and the child
    # id is kept only when `start_handover/4` returned a non-nil value.
    #
    # Before returning, the retention timer is armed via `retention_switch/2`.
    defp migration_cids(hub_id, %HotSwap{} = strategy, child_specs, added_node) do
      local_pids =
        hub_id
        |> Name.distributed_supervisor()
        |> DistributedSupervisor.local_children()

      # Iterate over the specs themselves rather than `1..length(child_specs)`:
      # for an empty list that range is `1..0`, which still yields two
      # iterations, each blocking on a receive.
      child_specs
      |> Enum.map(fn _child_spec ->
        start_resp =
          Mailbox.receive_response(
            :child_start_resp,
            receive_handler(),
            strategy.child_migration_timeout
          )

        case start_resp do
          {:error, _reason} ->
            Logger.error("Migration failed to start on node #{inspect(added_node)}")
            nil

          {child_id, result} ->
            # Notify the peer process about the migration.
            case start_handover(strategy, child_id, local_pids, result) do
              nil -> nil
              _ -> child_id
            end
        end
      end)
      |> Enum.reject(&is_nil/1)
      |> retention_switch(strategy)
    end

    # Schedules a `{:process_hub, :retention_over}` message to the calling
    # process after `strategy.retention` milliseconds and returns `child_ids`
    # untouched, so the function can sit at the end of a pipeline.
    #
    # NOTE(review): the timer is not cancelled anywhere in this module, so the
    # message may still arrive after all children have been handled.
    defp retention_switch(child_ids, strategy) do
      _timer_ref =
        Process.send_after(self(), {:process_hub, :retention_over}, strategy.retention)

      child_ids
    end

    # Builds the three-argument callback handed to `Mailbox.receive_response/3`.
    # The callback returns its second argument (the response) unchanged and
    # ignores the other two.
    defp receive_handler() do
      fn _cid, response, _from_node -> response end
    end

    # Notifies the local peer process that its replacement has been started.
    #
    # With handover enabled and a local pid known for `child_id`, the peer
    # receives `{:process_hub, :handover_start, result, child_id, handler}`
    # where the handler is the calling process; the sent message is returned.
    # In every other case the function returns `nil`.
    defp start_handover(strategy, child_id, local_pids, result) do
      case strategy.handover do
        false ->
          nil

        true ->
          case Map.get(local_pids, child_id) do
            peer when is_pid(peer) ->
              send(peer, {:process_hub, :handover_start, result, child_id, self()})

            _no_local_peer ->
              nil
          end
      end
    end

    # Waits for the peer of `child_id` to finish its handover.
    #
    # Returns `:continue` when the peer reports `:retention_handled` for this
    # child, and `:kill` when the retention timer message arrives or nothing
    # matching is received within `strategy.retention` milliseconds.
    defp handle_retention(strategy, child_id) do
      retention_ms = strategy.retention

      receive do
        {:process_hub, :retention_handled, ^child_id} ->
          :continue

        {:process_hub, :retention_over} ->
          :kill
      after
        retention_ms ->
          :kill
      end
    end
  end

  # Injects the handover message handlers into the using module:
  #
  #   * `:handover_start` - forwards the current state to the newly started
  #     process and signals the handler process; failures are logged.
  #   * `:handover` - replaces the current state with the received one.
  #   * `:get_state` - replies to the requester with the current state.
  defmacro __using__(_opts) do
    quote do
      require Logger

      def handle_info(
            {:process_hub, :handover_start, start_result, child_id, handler},
            current_state
          ) do
        case start_result do
          {:ok, new_pid} ->
            send_handover_start(new_pid, child_id, handler, current_state)

          # The process already runs on the remote node; hand the state over to it.
          {:error, {:already_started, running_pid}} ->
            send_handover_start(running_pid, child_id, handler, current_state)

          failure ->
            Logger.error("Handover failed: #{inspect(failure)}")
        end

        {:noreply, current_state}
      end

      def handle_info({:process_hub, :handover, received_state}, _current_state) do
        {:noreply, received_state}
      end

      def handle_info({:process_hub, :get_state, child_id, requester}, current_state) do
        send(requester, {:process_hub, :process_state, child_id, current_state})

        {:noreply, current_state}
      end

      # Sends the state to the new process first, then tells the handler that
      # the handover for this child is done.
      defp send_handover_start(target_pid, child_id, handler, current_state) do
        Process.send(target_pid, {:process_hub, :handover, current_state}, [])
        Process.send(handler, {:process_hub, :retention_handled, child_id}, [])
      end
    end
  end
end