lib/nebulex/rpc.ex

defmodule Nebulex.RPC do
  @moduledoc """
  RPC utilities for distributed task execution.

  This module uses supervised tasks underneath `Task.Supervisor`.

  > **NOTE:** The approach by using distributed tasks will be deprecated
    in the future in favor of `:erpc`.
  """

  @typedoc "Task supervisor"
  @type task_sup :: Supervisor.supervisor()

  @typedoc "Task callback"
  @type callback :: {module, atom, [term]}

  @typedoc "Group entry: node -> callback"
  @type node_callback :: {node, callback}

  @typedoc "Node group"
  @type node_group :: %{optional(node) => callback} | [node_callback]

  @typedoc "Reducer function spec"
  @type reducer_fun :: ({:ok, term} | {:error, term}, node_callback | node, term -> term)

  @typedoc "Reducer spec"
  @type reducer :: {acc :: term, reducer_fun}

  ## API

  @doc """
  Evaluates `apply(mod, fun, args)` on node `node` and returns the corresponding
  evaluation result, or `{:badrpc, reason}` if the call fails.

  A timeout, in milliseconds or `:infinity`, can be given with a default value
  of `5000`. It uses `Task.await/2` internally.

  ## Example

      iex> Nebulex.RPC.call(:my_task_sup, :node1, Kernel, :to_string, [1])
      "1"

  """
  @spec call(task_sup, node, module, atom, [term], timeout) :: term | {:badrpc, term}
  def call(supervisor, node, mod, fun, args, timeout \\ 5000) do
    rpc_call(supervisor, node, mod, fun, args, timeout)
  end

  @doc """
  In contrast to a regular single-node RPC, a multicall is an RPC that is sent
  concurrently from one client to multiple servers. The function evaluates
  `apply(mod, fun, args)` on each `node_group` entry and collects the answers.
  Then, evaluates the `reducer` function (set in the `opts`) on each answer.

  This function is similar to `:rpc.multicall/5`.

  ## Options

    * `:timeout` - A timeout, in milliseconds or `:infinity`, can be given with
      a default value of `5000`. It uses `Task.yield_many/2` internally.

    * `:reducer` - Reducer function to be executed on each collected result.
      (check out `reducer` type).

  ## Example

      iex> Nebulex.RPC.multi_call(
      ...>   :my_task_sup,
      ...>   %{
      ...>     node1: {Kernel, :to_string, [1]},
      ...>     node2: {Kernel, :to_string, [2]}
      ...>   },
      ...>   timeout: 10_000,
      ...>   reducer: {
      ...>     [],
      ...>     fn
      ...>       {:ok, res}, _node_callback, acc ->
      ...>         [res | acc]
      ...>
      ...>       {:error, _}, _node_callback, acc ->
      ...>         acc
      ...>     end
      ...>   }
      ...> )
      ["1", "2"]

  """
  @spec multi_call(task_sup, node_group, Keyword.t()) :: term
  def multi_call(supervisor, node_group, opts \\ []) do
    rpc_multi_call(supervisor, node_group, opts)
  end

  @doc """
  Similar to `multi_call/3` but the same `node_callback` (given by `module`,
  `fun`, `args`) is executed on all `nodes`; Internally it creates a
  `node_group` with the same `node_callback` for each node.

  ## Options

  Same options as `multi_call/3`.

  ## Example

      iex> Nebulex.RPC.multi_call(
      ...>   :my_task_sup,
      ...>   [:node1, :node2],
      ...>   Kernel,
      ...>   :to_string,
      ...>   [1],
      ...>   timeout: 5000,
      ...>   reducer: {
      ...>     [],
      ...>     fn
      ...>       {:ok, res}, _node_callback, acc ->
      ...>         [res | acc]
      ...>
      ...>       {:error, _}, _node_callback, acc ->
      ...>         acc
      ...>     end
      ...>   }
      ...> )
      ["1", "1"]

  """
  @spec multi_call(task_sup, [node], module, atom, [term], Keyword.t()) :: term
  def multi_call(supervisor, nodes, mod, fun, args, opts \\ []) do
    rpc_multi_call(supervisor, nodes, mod, fun, args, opts)
  end

  ## Helpers

  if Code.ensure_loaded?(:erpc) do
    defp rpc_call(_supervisor, node, mod, fun, args, _timeout) when node == node() do
      apply(mod, fun, args)
    end

    defp rpc_call(_supervisor, node, mod, fun, args, timeout) do
      :erpc.call(node, mod, fun, args, timeout)
    rescue
      e in ErlangError ->
        case e.original do
          {:exception, original, _} when is_struct(original) ->
            reraise original, __STACKTRACE__

          {:exception, original, _} ->
            :erlang.raise(:error, original, __STACKTRACE__)

          other ->
            reraise %Nebulex.RPCError{reason: other, node: node}, __STACKTRACE__
        end
    end

    def rpc_multi_call(_supervisor, node_group, opts) do
      {reducer_acc, reducer_fun} = opts[:reducer] || default_reducer()
      timeout = opts[:timeout] || 5000

      node_group
      |> Enum.map(fn {node, {mod, fun, args}} = group ->
        {:erpc.send_request(node, mod, fun, args), group}
      end)
      |> Enum.reduce(reducer_acc, fn {req_id, group}, acc ->
        try do
          res = :erpc.receive_response(req_id, timeout)
          reducer_fun.({:ok, res}, group, acc)
        rescue
          exception ->
            reducer_fun.({:error, exception}, group, acc)
        catch
          :exit, reason ->
            reducer_fun.({:error, {:exit, reason}}, group, acc)
        end
      end)
    end

    def rpc_multi_call(_supervisor, nodes, mod, fun, args, opts) do
      {reducer_acc, reducer_fun} = opts[:reducer] || default_reducer()

      nodes
      |> :erpc.multicall(mod, fun, args, opts[:timeout] || 5000)
      |> :lists.zip(nodes)
      |> Enum.reduce(reducer_acc, fn {res, node}, acc ->
        reducer_fun.(res, node, acc)
      end)
    end
  else
    # TODO: This approach by using distributed tasks will be deprecated in the
    #       future in favor of `:erpc` which is proven to improve performance
    #       almost by 3x.

    defp rpc_call(_supervisor, node, mod, fun, args, _timeout) when node == node() do
      apply(mod, fun, args)
    rescue
      # FIXME: this is because coveralls does not check this as covered
      # coveralls-ignore-start
      exception ->
        {:badrpc, exception}
        # coveralls-ignore-stop
    end

    defp rpc_call(supervisor, node, mod, fun, args, timeout) do
      {supervisor, node}
      |> Task.Supervisor.async_nolink(
        __MODULE__,
        :call,
        [supervisor, node, mod, fun, args, timeout]
      )
      |> Task.await(timeout)
    end

    defp rpc_multi_call(supervisor, node_group, opts) do
      node_group
      |> Enum.map(fn {node, {mod, fun, args}} ->
        Task.Supervisor.async_nolink({supervisor, node}, mod, fun, args)
      end)
      |> handle_multi_call(node_group, opts)
    end

    defp rpc_multi_call(supervisor, nodes, mod, fun, args, opts) do
      rpc_multi_call(supervisor, Enum.map(nodes, &{&1, {mod, fun, args}}), opts)
    end

    defp handle_multi_call(tasks, node_group, opts) do
      {reducer_acc, reducer_fun} = Keyword.get(opts, :reducer, default_reducer())

      tasks
      |> Task.yield_many(opts[:timeout] || 5000)
      |> :lists.zip(node_group)
      |> Enum.reduce(reducer_acc, fn
        {{_task, {:ok, res}}, group}, acc ->
          reducer_fun.({:ok, res}, group, acc)

        {{_task, {:exit, reason}}, group}, acc ->
          reducer_fun.({:error, {:exit, reason}}, group, acc)

        {{task, nil}, group}, acc ->
          _ = Task.shutdown(task, :brutal_kill)
          reducer_fun.({:error, :timeout}, group, acc)
      end)
    end
  end

  defp default_reducer do
    {
      {[], []},
      fn
        {:ok, res}, _node_callback, {ok, err} ->
          {[res | ok], err}

        {kind, _} = error, node_callback, {ok, err} when kind in [:error, :exit, :throw] ->
          {ok, [{error, node_callback} | err]}
      end
    }
  end
end