lib/absinthe/subscription.ex

defmodule Absinthe.Subscription do
  @moduledoc """
  Real time updates via GraphQL

  For a how to guide on getting started with Absinthe.Subscriptions in your phoenix
  project see the `Absinthe.Phoenix` package.

  Define in your schema via `Absinthe.Schema.subscription/2`

  ## Basic Usage

  ## Performance Characteristics

  There are a couple of limitations to the beta release of subscriptions that
  are worth keeping in mind if you want to use this in production:

  By design, all subscription docs triggered by a mutation are run inside the
  mutation process as a form of back pressure.

  At the moment however database batching does not happen across the set of
  subscription docs. Thus if you have a lot of subscription docs and they each
  do a lot of extra DB lookups you're going to delay incoming mutation responses
  by however long it takes to do all that work.

  Before the final version of 1.4.0 we want

  - Batching across subscriptions
  - More user control over back pressure / async balance.
  """

  alias __MODULE__

  alias Absinthe.Subscription.PipelineSerializer

  @doc """
  Add Absinthe.Subscription to your process tree.
  """
  @spec start_link(atom() | [opt()]) :: Supervisor.on_start()
  defdelegate start_link(opts_or_pubsub), to: Subscription.Supervisor

  @type opt() ::
          {:pubsub, atom()} | {:compress_registry?, boolean()} | {:pool_size, pos_integer()}

  @doc """
  Build a child specification for subscriptions.

  In order to use subscriptions in your application, you must add
  `Absinthe.Subscription` to your supervision tree after your endpoint.

  See `guides/subscriptions.md` for more information on how to get up and
  running with subscriptions.

  ## Options

  * `:pubsub` - (Required) The `Phoenix.Pubsub` that should be used to publish
    subscriptions. Typically this will be your `Phoenix.Endpoint`.
  * `:compress_registry?` - (Optional - default `true`) A boolean controlling
    whether the Registry used to keep track of subscriptions will should be
    compressed or not.
  * `:pool_size` - (Optional - default `System.schedulers() * 2`) An integer
    specifying the number of `Absinthe.Subscription.Proxy` processes to start.
  """
  @spec child_spec(atom() | [opt()]) :: Supervisor.child_spec()
  def child_spec(pubsub) when is_atom(pubsub) do
    # child_spec/1 used to take a single argument - the pub-sub - so in order
    # to maintain compatibility for existing users of the library we still
    # accept this argument and transform it into a keyword list.
    child_spec(pubsub: pubsub)
  end

  def child_spec(opts) when is_list(opts) do
    %{
      id: __MODULE__,
      start: {Subscription.Supervisor, :start_link, [opts]},
      type: :supervisor
    }
  end

  @type subscription_field_spec :: {atom, term | (term -> term)}

  @doc """
  Publish a mutation

  This function is generally used when trying to publish to one or more subscription
  fields "out of band" from any particular mutation.

  ## Examples

  Note: As with all subscription examples if you're using Absinthe.Phoenix `pubsub`
  will be `MyAppWeb.Endpoint`.

  ```
  Absinthe.Subscription.publish(pubsub, user, [new_users: user.account_id])
  ```
  ```
  # publish to two subscription fields
  Absinthe.Subscription.publish(pubsub, user, [
    new_users: user.account_id,
    other_user_subscription_field: user.id,
  ])
  ```
  """
  @spec publish(
          Absinthe.Subscription.Pubsub.t(),
          term,
          Absinthe.Resolution.t() | [subscription_field_spec]
        ) :: :ok
  def publish(_pubsub, _mutation_result, []), do: :ok

  def publish(pubsub, mutation_result, %Absinthe.Resolution{} = info) do
    subscribed_fields = get_subscription_fields(info)
    publish(pubsub, mutation_result, subscribed_fields)
  end

  def publish(pubsub, mutation_result, subscribed_fields) do
    _ = publish_remote(pubsub, mutation_result, subscribed_fields)
    _ = Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields)
    :ok
  end

  defp get_subscription_fields(resolution_info) do
    mutation_field = resolution_info.definition.schema_node
    schema = resolution_info.schema
    subscription = Absinthe.Schema.lookup_type(schema, :subscription) || %{fields: []}

    subscription_fields = fetch_fields(subscription.fields, mutation_field.triggers)

    for {sub_field_id, sub_field} <- subscription_fields do
      triggers = Absinthe.Type.function(sub_field, :triggers)
      config = Map.fetch!(triggers, mutation_field.identifier)
      {sub_field_id, config}
    end
  end

  # TODO: normalize the `.fields` type.
  defp fetch_fields(fields, triggers) when is_map(fields) do
    Map.take(fields, triggers)
  end

  defp fetch_fields(_, _), do: []

  @doc false
  def subscribe(pubsub, field_keys, doc_id, doc) do
    field_keys = List.wrap(field_keys)

    registry = pubsub |> registry_name

    doc_value = %{
      initial_phases: PipelineSerializer.pack(doc.initial_phases),
      source: doc.source
    }

    pdict_add_fields(doc_id, field_keys)

    for field_key <- field_keys do
      {:ok, _} = Registry.register(registry, field_key, doc_id)
    end

    {:ok, _} = Registry.register(registry, doc_id, doc_value)
  end

  defp pdict_fields(doc_id) do
    Process.get({__MODULE__, doc_id}, [])
  end

  defp pdict_add_fields(doc_id, field_keys) do
    Process.put({__MODULE__, doc_id}, field_keys ++ pdict_fields(doc_id))
  end

  defp pdict_delete_fields(doc_id) do
    Process.delete({__MODULE__, doc_id})
  end

  @doc false
  def unsubscribe(pubsub, doc_id) do
    registry = pubsub |> registry_name

    for field_key <- pdict_fields(doc_id) do
      Registry.unregister(registry, field_key)
    end

    Registry.unregister(registry, doc_id)

    pdict_delete_fields(doc_id)
    :ok
  end

  @doc false
  def get(pubsub, key) do
    pubsub
    |> registry_name
    |> Registry.lookup(key)
    |> then(fn doc_ids ->
      pubsub
      |> registry_name
      |> Registry.select(
        # We compose a list of match specs that basically mean "lookup all keys
        # in the doc_ids list"
        for {_, doc_id} <- doc_ids,
            do: {{:"$1", :_, :"$2"}, [{:==, :"$1", doc_id}], [{{:"$1", :"$2"}}]}
      )
    end)
    |> Map.new(fn {doc_id, doc} ->
      doc = Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1)

      {doc_id, doc}
    end)
  end

  @doc false
  def registry_name(pubsub) do
    Module.concat([pubsub, :Registry])
  end

  @doc false
  def publish_remote(pubsub, mutation_result, subscribed_fields) do
    {:ok, pool_size} =
      pubsub
      |> registry_name
      |> Registry.meta(:pool_size)

    shard = :erlang.phash2(mutation_result, pool_size)

    proxy_topic = Subscription.Proxy.topic(shard)

    :ok = pubsub.publish_mutation(proxy_topic, mutation_result, subscribed_fields)
  end

  ## Middleware callback
  @doc false
  def call(%{state: :resolved, errors: [], value: value} = res, _) do
    with {:ok, pubsub} <- extract_pubsub(res.context) do
      __MODULE__.publish(pubsub, value, res)
    end

    res
  end

  def call(res, _), do: res

  @doc false
  def extract_pubsub(context) do
    with {:ok, pubsub} <- Map.fetch(context, :pubsub),
         pid when is_pid(pid) <- Process.whereis(registry_name(pubsub)) do
      {:ok, pubsub}
    else
      _ -> :error
    end
  end

  @doc false
  def add_middleware(middleware) do
    middleware ++ [{__MODULE__, []}]
  end
end