lib/fly_rpc.ex

defmodule Fly.RPC do
  @moduledoc """
  Provides an RPC interface for executing an MFA on a node within a region.

  ## Configuration

  Assumes each node is running the `Fly.RPC` server in its supervision tree and
  exports `FLY_REGION` environment variable to identify the fly region.

  To run code on a specific region call `rpc_region/4`. A node found within the
  given region will be chosen at random. Raises if no nodes exist on the given
  region.

  The special `:primary` region may be passed to run the rpc against the region
  identified by the `PRIMARY_REGION` environment variable.

  ## Examples

      > rpc_region("hkg", String, :upcase, ["fly"])
      "FLY"

      > rpc_region(Fly.primary_region(), String, :upcase, ["fly"])
      "FLY"

      > rpc_region(:primary, String, :upcase, ["fly"])
      "FLY"

  ## Server

  The GenServer's responsibility is just to monitor other nodes as they enter
  and leave the cluster. It maintains a list of nodes and the Fly.io region
  where they are deployed in an ETS table that other processes can use to find
  and initiate their own RPC calls to.
  """
  use GenServer
  require Logger

  @type region :: String.t() | :primary
  # reference: tid() "table identifier"
  @type tab :: atom() | reference()

  @tab :fly_regions

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Returns the Elixir OTP nodes registered the region. Reads from a local cache.
  """
  # @spec region_nodes(tab, region) :: [node]
  def region_nodes(tab \\ @tab, region) do
    case :ets.lookup(tab, region) do
      [{^region, nodes}] -> nodes
      [] -> []
    end
  end

  @doc """
  Asks a node what Fly region it's running in.

  Returns `:error` if RPC is not supported on remote node.
  """
  @spec region(node) :: {:ok, any()} | :error
  def region(node) do
    if is_rpc_supported?(node) do
      {:ok, rpc(node, Fly, :my_region, [])}
    else
      Logger.info("Detected Fly RPC support is not available on node #{inspect(node)}")
      :error
    end
  end

  @doc """
  Executes the MFA on an available node in the desired region.

  If the region is the "primary" region or the "local" region then execute the
  function immediately. Supports the string name of the region or `:primary` for
  the current configured primary region.

  Otherwise find an available node and select one at random to execute the
  function.

  Raises `ArgumentError` when no available nodes.

  ## Example

      > RPC.rpc_region("hkg", Kernel, :+, [1, 2])
      3

      > RPC.rpc_region(:primary, Kernel, :+, [1, 2])
      3

  """
  @spec rpc_region(region(), module(), func :: atom(), [any()], keyword()) :: any()
  def rpc_region(region, module, func, args, opts \\ [])

  def rpc_region(:primary, module, func, args, opts) do
    rpc_region(Fly.primary_region(), module, func, args, opts)
  end

  def rpc_region(region, module, func, args, opts) when is_binary(region) do
    if region == Fly.my_region() do
      apply(module, func, args)
    else
      timeout = Keyword.get(opts, :timeout, 5_000)
      available_nodes = region_nodes(region)

      if Enum.empty?(available_nodes),
        do: raise(ArgumentError, "no node found running in region #{inspect(region)}")

      node = Enum.random(available_nodes)

      rpc(node, module, func, args, timeout)
    end
  end

  @doc """
  Executes the function on the remote node and waits for the response.

  Exits after `timeout` milliseconds.
  """
  @spec rpc(node, module, func :: atom(), args :: [any], non_neg_integer()) :: any()
  def rpc(node, module, func, args, timeout \\ 5000) do
    verbose_log(:info, fn ->
      "RPC REQ from #{Fly.my_region()} for #{Fly.mfa_string(module, func, args)}"
    end)

    caller = self()
    ref = make_ref()

    # Perform the RPC call to the remote node and wait for the response
    _pid =
      Node.spawn_link(node, __MODULE__, :__local_rpc__, [
        [caller, ref, module, func | args]
      ])

    receive do
      {^ref, result} ->
        verbose_log(:info, fn ->
          "RPC RECV response in #{Fly.my_region()} for #{Fly.mfa_string(module, func, args)}"
        end)

        result
    after
      timeout ->
        verbose_log(:error, fn ->
          "RPC TIMEOUT in #{Fly.my_region()} calling #{Fly.mfa_string(module, func, args)}"
        end)

        exit(:timeout)
    end
  end

  @doc false
  # Private function that can be executed on a remote node in the cluster. Used
  # to execute arbitrary function from a trusted caller.
  def __local_rpc__([caller, ref, module, func | args]) do
    result = apply(module, func, args)
    send(caller, {ref, result})
  end

  @doc """
  Executes a function on the remote node to determine if the RPC API support is
  available.

  Support may not exist on the remote node in a "first roll out" scenario.
  """
  @spec is_rpc_supported?(node) :: boolean()
  def is_rpc_supported?(node) do
    # note: use :erpc.call once erlang 23+ is required
    case :rpc.call(node, Kernel, :function_exported?, [Fly, :my_region, 0], 5000) do
      result when is_boolean(result) ->
        result

      {:badrpc, reason} ->
        Logger.warn("Failed RPC supported test on node #{inspect(node)}, got: #{inspect(reason)}")
        false
    end
  end

  ## RPC calls run on local node

  def init(_opts) do
    tab = :ets.new(@tab, [:named_table, :public, read_concurrency: true])
    # monitor new node up/down activity
    :global_group.monitor_nodes(true)
    {:ok, %{nodes: MapSet.new(), tab: tab}, {:continue, :get_node_regions}}
  end

  def handle_continue(:get_node_regions, state) do
    new_state =
      Enum.reduce(Node.list(), state, fn node_name, acc ->
        put_node(acc, node_name)
      end)

    {:noreply, new_state}
  end

  def handle_info({:nodeup, node_name}, state) do
    Logger.debug("nodeup #{node_name}")

    # Only react/track visible nodes (hidden ones are for IEx, etc)
    new_state =
      if node_name in Node.list(:visible) do
        put_node(state, node_name)
      else
        state
      end

    {:noreply, new_state}
  end

  def handle_info({:nodedown, node_name}, state) do
    Logger.debug("nodedown #{node_name}")
    {:noreply, drop_node(state, node_name)}
  end

  # Executed when a new node shows up in the cluster. Asks the node what region
  # it's running in. If the request isn't supported by the node, do nothing.
  # This happens when this node is the first node with this new code. It reaches
  # out to the other nodes (they show up as having just appeared) but they don't
  # yet have the new code. So this ignores that node until it gets new code,
  # restarts and will then again show up as a new node.
  @doc false
  def put_node(state, node_name) do
    case region(node_name) do
      {:ok, region} ->
        Logger.info("Discovered node #{inspect(node_name)} in region #{region}")
        region_nodes = region_nodes(state.tab, region)
        :ets.insert(state.tab, {region, [node_name | region_nodes]})

        %{state | nodes: MapSet.put(state.nodes, {node_name, region})}

      :error ->
        state
    end
  end

  @doc false
  def drop_node(state, node_name) do
    # find the node information for the node going down.
    case get_node(state, node_name) do
      {^node_name, region} ->
        Logger.info("Dropping node #{inspect(node_name)} for region #{region}")
        # get the list of nodes currently registered in that region
        region_nodes = region_nodes(state.tab, region)
        # Remove the node from the known regions and update the local cache
        new_regions = Enum.reject(region_nodes, fn n -> n == node_name end)
        :ets.insert(state.tab, {region, new_regions})

        # Remove the node entry from the GenServer's state
        new_nodes =
          Enum.reduce(state.nodes, state.nodes, fn
            {^node_name, ^region}, acc -> MapSet.delete(acc, {node_name, region})
            {_node, _region}, acc -> acc
          end)

        # Return the new state
        %{state | nodes: new_nodes}

      # Node is not known to us. Ignore it.
      nil ->
        state
    end
  end

  defp get_node(state, name) do
    Enum.find(state.nodes, fn {n, _region} -> n == name end)
  end

  defp verbose_log(kind, func) do
    if Application.get_env(:fly_rpc, :verbose_logging) do
      Logger.log(kind, func)
    end
  end
end