lib/multiverses/server.ex

defmodule Multiverses.AppSupervisor do
  @moduledoc """
  AppSupervisor is the core `Application` module that supervises the `Multiverses.Server` module,
  which should be running in active `Multiverses` environments.
  """

  use Application

  alias Multiverses.Server

  def start(_type, _args) do
    Supervisor.start_link([Server], strategy: :one_for_one, name: __MODULE__)
  end
end

defmodule Multiverses.Server do
  @moduledoc "core server for managing multiverse partitions"

  alias Multiverses.UnexpectedCallError

  use GenServer

  # API
  @spec shard(module) :: [{module, Multiverse.id()}]
  @spec shards(pid) :: [{module, Multiverse.id()}]
  @spec id(module, keyword) :: Multiverse.id()
  @spec allow(module, pid | Multiverse.id(), term) :: :error | [{{module, pid}, Multiverse.id()}]
  @spec allow([{module, Multiverse.id()}], term) :: :error | [{{module, pid}, Multiverse.id()}]
  @spec all(module) :: [Multiverse.id()]
  @spec clear(module) :: :ok

  # private api
  @spec id_pair(module) :: {pid, Multiverse.id()} | nil
  @spec id_pair(:ets.table(), module, [pid]) :: {pid, Multiverse.id()} | nil

  # STARTUP BOILERPLATE
  @doc false
  def start_link(_) do
    case GenServer.start_link(__MODULE__, [], name: __MODULE__) do
      {:error, {:already_started, _}} -> :ignore
      response -> response
    end
  end

  @doc false
  def init(_) do
    ref = :ets.new(__MODULE__, [:named_table, :set, :protected, read_concurrency: true])
    {:ok, ref}
  end

  # API IMPLEMENTATIONS

  def shard(module_or_modules) when is_list(module_or_modules) or is_atom(module_or_modules) do
    this = self()
    modules = List.wrap(module_or_modules)

    # check to make sure that the shard isn't already assigned.
    Enum.each(modules, fn module ->
      case id_pair(module) do
        nil ->
          :ok

        {pid, _} ->
          raise UnexpectedCallError,
                "Multiverse shard for #{inspect(module)} already exists for pid #{inspect(self())}#{format_pid_name(pid)}"
      end
    end)

    __MODULE__
    |> GenServer.call({:shard, modules, this})
    |> distribute_import
  end

  def shards(pid) do
    :ets.select(__MODULE__, [
      {{{:"$1", :"$2"}, :"$3"}, [{:==, :"$2", {:const, pid}}], [{{:"$1", :"$3"}}]}
    ])
  end

  defp shard_impl(modules, pid, _from, table) do
    tuples = Enum.map(modules, &{{&1, pid}, :erlang.phash2({&1, pid})})
    :ets.insert(table, tuples)
    {:reply, tuples, table}
  end

  def id(module, options) do
    pair = id_pair(module)
    strict = Keyword.get(options, :strict, true)

    unless not strict or pair do
      raise UnexpectedCallError,
            "no shard defined for module #{inspect(module)} in #{format_process()}"
    end

    elem(pair, 1)
  end

  defp id_pair(module) do
    callers = [self() | Process.get(:"$callers", [])]
    id_pair(__MODULE__, module, callers)
  end

  defp id_pair(table, module, callers) do
    table
    |> :ets.select(select(module, callers))
    |> List.first()
  end

  # generates the select matchspec from callers value
  defp select(module, callers) do
    callers_chain =
      callers
      |> Enum.map(&{:==, :"$2", &1})
      |> Enum.reduce(&{:or, &1, &2})

    [{{{:"$1", :"$2"}, :"$3"}, [{:==, :"$1", module}, callers_chain], [{{:"$2", :"$3"}}]}]
  end

  def allow(module, owner, allow) do
    case GenServer.call(__MODULE__, {:allow, module, owner, allow}) do
      :error ->
        raise "Multiverses.allow/3 attempted to find the shard of #{inspect(owner)} but there was none"

      list when is_list(list) ->
        distribute_import(list)
    end
  end

  def allow(modulespec, allow) do
    __MODULE__
    |> GenServer.call({:allow, modulespec, allow})
    |> distribute_import
  end

  def _import(imports) do
    GenServer.call(__MODULE__, {:_import, imports})
  end

  defp _import_impl(imports, _from, table) do
    :ets.insert(table, imports)
    {:reply, :ok, table}
  end

  defp allow_impl(module, owner, allow, _from, table) do
    allow_pid = to_pid(allow)

    result =
      with owner_pid when is_pid(owner_pid) <- owner,
           selection = [
             {{{:"$1", :"$2"}, :"$3"}, [{:==, :"$1", module}, {:==, :"$2", {:const, owner_pid}}],
              [:"$3"]}
           ],
           [id] <- :ets.select(table, selection) do
        insertion = {{module, allow_pid}, id}
        :ets.insert(table, insertion)
        [insertion]
      else
        id when is_integer(id) ->
          insertion = {{module, allow_pid}, id}
          :ets.insert(table, insertion)
          [insertion]

        [] ->
          :error
      end

    {:reply, result, table}
  end

  defp allow_impl(modulespec, allow, _from, table) do
    allow_pid = to_pid(allow)
    insertion = Enum.map(modulespec, fn {module, shard_id} -> {{module, allow_pid}, shard_id} end)
    :ets.insert(table, insertion)
    {:reply, insertion, table}
  end

  defp to_pid(pid) when is_pid(pid), do: pid
  defp to_pid(atom) when is_atom(atom), do: :erlang.whereis(atom)
  defp to_pid({module, key}), do: module.whereis_name(key)

  def all(module) do
    __MODULE__
    |> :ets.select([{{{:"$1", :_}, :"$2"}, [{:==, :"$1", module}], [:"$2"]}])
    |> Enum.uniq()
  end

  def clear(module), do: GenServer.call(__MODULE__, {:clear, module, self()})

  defp clear_impl(module, who, _from, table) do
    :ets.delete(table, {module, who})
    {:reply, :ok, table}
  end

  def _dump, do: GenServer.call(__MODULE__, :dump)

  defp dump_impl(_from, table),
    do: {:reply, :ets.select(table, [{{:"$1", :"$2"}, [], [{{:"$1", :"$2"}}]}]), table}

  # COMMON UTILITIES

  defp distribute_import(list) do
    Node.list()
    |> Task.async_stream(fn node ->
      :rpc.call(node, __MODULE__, :_import, [list])
    end)
    |> Stream.run()

    list
  end

  # ROUTER

  def handle_call({:shard, module, pid}, from, table),
    do: shard_impl(module, pid, from, table)

  def handle_call({:allow, module, owner, allowed}, from, table),
    do: allow_impl(module, owner, allowed, from, table)

  def handle_call({:allow, modulespec, allowed}, from, table),
    do: allow_impl(modulespec, allowed, from, table)

  def handle_call({:_import, imports}, from, table), do: _import_impl(imports, from, table)

  def handle_call(:dump, from, table), do: dump_impl(from, table)

  def handle_call({:clear, module, who}, from, table), do: clear_impl(module, who, from, table)

  # DEBUG UTILITIES

  defp format_process do
    callers =
      :"$callers"
      |> Process.get()
      |> List.wrap()

    "process #{inspect(self())}" <>
      if Enum.empty?(callers) do
        ""
      else
        " (or in its callers #{inspect(callers)})"
      end
  end

  defp format_pid_name(pid) do
    if self() != pid, do: " (in parent #{pid})"
  end
end