lib/manifold.ex

defmodule Manifold do
  use Application

  alias Manifold.Partitioner
  alias Manifold.Sender
  alias Manifold.Utils

  @type pack_mode :: :binary | :etf | nil

  @type pack_mode_option :: {:pack_mode, pack_mode()}
  @type send_mode_option :: {:send_mode, :offload}
  @type option :: pack_mode_option() | send_mode_option()

  @max_partitioners 32
  @partitioners min(Application.get_env(:manifold, :partitioners, 1), @max_partitioners)
  @workers_per_partitioner Application.get_env(:manifold, :workers_per_partitioner, System.schedulers_online)

  @max_senders 128
  @senders min(Application.get_env(:manifold, :senders, System.schedulers_online), @max_senders)

  ## OTP

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    partitioners =
      for partitioner_id <- 0..(@partitioners - 1) do
        Partitioner.child_spec(@workers_per_partitioner, name: partitioner_for(partitioner_id))
      end

    senders =
      for sender_id <- 0..(@senders - 1) do
        Sender.child_spec(name: sender_for(sender_id))
      end

    Supervisor.start_link(partitioners ++ senders,
      strategy: :one_for_one,
      max_restarts: 10,
      name: __MODULE__.Supervisor
    )
  end

  ## Client

  @spec valid_send_options?(Keyword.t()) :: boolean()
  def valid_send_options?(options) when is_list(options) do
    valid_options = [
      {:pack_mode, :binary},
      {:pack_mode, :etf},
      {:send_mode, :offload},
    ]

    # Keywords could have duplicate keys, in which case the first key wins.
    Keyword.keys(options)
    |> Enum.dedup()
    |> Enum.reduce(true, fn key, acc -> acc and {key, options[key]} in valid_options end)
  end

  def valid_send_options?(_options) do
    false
  end

  @spec send([pid() | nil] | pid() | nil, message :: term(), options :: [option()]) :: :ok
  def send(pid, message, options \\ [])
  def send([pid], message, options), do: __MODULE__.send(pid, message, options)

  def send(pids, message, options) when is_list(pids) do
    case options[:send_mode] do
      :offload ->
        Sender.send(current_sender(), current_partitioner(), pids, message, options[:pack_mode])

      nil ->
        message = Utils.pack_message(options[:pack_mode], message)

        partitioner_name = current_partitioner()

        grouped_by =
          Utils.group_by(pids, fn
            nil -> nil
            pid -> node(pid)
          end)

        for {node, pids} <- grouped_by,
            node != nil,
            do: Partitioner.send({partitioner_name, node}, pids, message)

        :ok
    end
  end

  def send(pid, message, options) when is_pid(pid) do
    case options[:send_mode] do
      :offload ->
        # To maintain linearizability guaranteed by send/2, we have to send
        # it to the sender process, even for a single receiving pid.
        #
        # Since we know we are only sending to a single pid, there's no
        # performance benefit to packing the message, so we will always send as
        # raw etf.
        Sender.send(current_sender(), current_partitioner(), [pid], message, :etf)

      nil ->
        Partitioner.send({current_partitioner(), node(pid)}, [pid], message)
    end
  end

  def send(nil, _message, _options), do: :ok

  def set_partitioner_key(key) do
    partitioner = key
    |> Utils.hash()
    |> rem(@partitioners)
    |> partitioner_for()

    Process.put(:manifold_partitioner, partitioner)
  end

  def current_partitioner() do
    case Process.get(:manifold_partitioner) do
      nil ->
        partitioner_for(self())
      partitioner ->
        partitioner
    end
  end

  def partitioner_for(pid) when is_pid(pid) do
    pid
    |> Utils.partition_for(@partitioners)
    |> partitioner_for
  end

  # The 0th partitioner does not have a number in it's process name for backwards compatibility
  # purposes.
  def partitioner_for(0), do: Manifold.Partitioner
  for partitioner_id <- (1..@max_partitioners - 1) do
    def partitioner_for(unquote(partitioner_id)) do
      unquote(:"Manifold.Partitioner_#{partitioner_id}")
    end
  end

  def set_sender_key(key) do
    sender =
      key
      |> Utils.hash()
      |> rem(@senders)
      |> sender_for()

    Process.put(:manifold_sender, sender)
  end

  def current_sender() do
    case Process.get(:manifold_sender) do
      nil ->
        sender_for(self())

      sender ->
        sender
    end
  end

  def sender_for(pid) when is_pid(pid) do
    pid
    |> Utils.partition_for(@senders)
    |> sender_for
  end

  for sender_id <- 0..(@max_senders - 1) do
    def sender_for(unquote(sender_id)) do
      unquote(:"Manifold.Sender_#{sender_id}")
    end
  end
end