lib/spawn/cluster/state_handoff/manager.ex

defmodule Spawn.Cluster.StateHandoff.Manager do
  @moduledoc """
  This handles state handoff in a cluster.

  This module monitors node up and down events as well as node terminate events and triggers `Spawn.Cluster.StateHandoff.ControllerBehaviour` implementations to handle these events.
  """
  use GenServer
  require Logger

  def child_spec(id, opts \\ []) do
    %{
      id: id,
      start: {__MODULE__, :start_link, [opts]},
      restart: :permanent
    }
  end

  defmodule State do
    defstruct data: nil, controller: nil, timer: nil
  end

  @impl true
  def init(config) do
    controller =
      Application.get_env(
        :spawn,
        :state_handoff_controller_adapter,
        Spawn.Cluster.StateHandoff.Controllers.PersistentController
      )

    do_init(config)

    case controller.handle_init(config) do
      {initial_state, {evt, delay} = _scheduler} ->
        timer = Process.send_after(self(), {:timer, evt}, delay)

        {:ok, %State{controller: controller, data: initial_state, timer: timer},
         {:continue, :after_init}}

      initial_state ->
        {:ok, %State{controller: controller, data: initial_state}, {:continue, :after_init}}
    end
  end

  @impl true
  def handle_continue(:after_init, %State{controller: controller, data: data} = state) do
    new_data = controller.handle_after_init(data)

    {:noreply, %State{state | data: new_data}}
  end

  @impl true
  def terminate(_reason, %State{controller: controller, data: data} = state) do
    Logger.debug("Calling StateHandoff Manager terminate")
    node = Node.self()
    new_data = controller.handle_terminate(node, data)

    {:ok, %State{state | data: new_data}}
  end

  @impl true
  def handle_cast({:set, actor_id, host}, state) do
    new_data = state.controller.set(actor_id, Node.self(), host, state.data)

    {:noreply, %State{state | data: new_data}}
  end

  @impl true
  def handle_call({:get_actor_hosts_by_actor_id, actor_id}, _from, state) do
    {_new_data, hosts} = state.controller.get_by_id(actor_id, state.data)

    {:reply, hosts, state}
  end

  def handle_call({:clean, node}, _from, state) do
    new_data = state.controller.handle_terminate(node, state.data)

    {:reply, new_data, %State{state | data: new_data}}
  end

  @impl true
  def handle_info(
        {:timer, event},
        %State{controller: controller, data: data, timer: timer} = state
      ) do
    if !is_nil(timer) do
      Process.cancel_timer(timer)
    end

    case controller.handle_timer(event, data) do
      {new_data, {evt, delay} = _timer} ->
        new_timer = Process.send_after(self(), {:timer, evt}, delay)
        {:noreply, %State{state | data: new_data, timer: new_timer}}

      new_data ->
        {:noreply, %State{state | data: new_data}}
    end
  end

  def handle_info({:nodeup, node, node_type}, %State{controller: controller, data: data} = state) do
    Logger.debug("Received :nodeup event from #{inspect(node)}")
    new_data = controller.handle_nodeup_event(node, node_type, data)

    {:noreply, %State{state | data: new_data}}
  end

  def handle_info(
        {:nodedown, node, node_type},
        %State{controller: controller, data: data} = state
      ) do
    Logger.debug("Received :nodedown event from #{inspect(node)}")
    new_data = controller.handle_nodedown_event(node, node_type, data)

    {:noreply, %State{state | data: new_data}}
  end

  def handle_info(event, state) do
    Logger.debug("Received handle_info event #{inspect(event)}")

    {:noreply, state}
  end

  # Client API
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Store a actor and entity in the lookup store
  """
  def set(actor_id, host), do: GenServer.cast(__MODULE__, {:set, actor_id, host})

  @doc """
  Pickup the stored entity data for a actor
  """
  def get(actor_id),
    do: GenServer.call(__MODULE__, {:get_actor_hosts_by_actor_id, actor_id}, :infinity)

  @doc """
  Cluster HostActor cleanup
  """
  def clean(node) do
    Logger.debug("Received cleanup action from Node #{inspect(node)}")

    GenServer.call(__MODULE__, {:clean, node})

    Logger.debug("Hosts cleaned for node #{inspect(node)}")
  end

  # Private functions
  defp do_init(_config) do
    Process.flag(:trap_exit, true)
    Process.flag(:message_queue_data, :off_heap)
    :net_kernel.monitor_nodes(true, node_type: :visible)
  end
end