lib/absinthe/subscription/local.ex

defmodule Absinthe.Subscription.Local do
  @moduledoc """
  This module handles broadcasting documents that are local to this node
  """

  require Logger

  alias Absinthe.Pipeline.BatchResolver

  # This module handles running and broadcasting documents that are local to this
  # node.

  @doc """
  Publish a mutation to the local node only.

  See also `Absinthe.Subscription.publish/3`
  """
  @spec publish_mutation(
          Absinthe.Subscription.Pubsub.t(),
          term,
          [Absinthe.Subscription.subscription_field_spec()]
        ) :: :ok
  def publish_mutation(pubsub, mutation_result, subscribed_fields) do
    docs_and_topics =
      for {field, key_strategy} <- subscribed_fields,
          {topic, doc} <- get_docs(pubsub, field, mutation_result, key_strategy) do
        {topic, key_strategy, doc}
      end

    run_docset_fn =
      if function_exported?(pubsub, :run_docset, 3), do: &pubsub.run_docset/3, else: &run_docset/3

    run_docset_fn.(pubsub, docs_and_topics, mutation_result)

    :ok
  end

  alias Absinthe.{Phase, Pipeline}

  defp run_docset(pubsub, docs_and_topics, mutation_result) do
    for {topic, key_strategy, doc} <- docs_and_topics do
      try do
        pipeline = pipeline(doc, mutation_result)

        {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline)

        Logger.debug("""
        Absinthe Subscription Publication
        Field Topic: #{inspect(key_strategy)}
        Subscription id: #{inspect(topic)}
        Data: #{inspect(data)}
        """)

        :ok = pubsub.publish_subscription(topic, data)
      rescue
        e ->
          BatchResolver.pipeline_error(e, __STACKTRACE__)
      end
    end
  end

  def pipeline(doc, mutation_result) do
    pipeline =
      doc.initial_phases
      |> Pipeline.replace(
        Phase.Telemetry,
        {Phase.Telemetry, event: [:subscription, :publish, :start]}
      )
      |> Pipeline.without(Phase.Subscription.SubscribeSelf)
      |> Pipeline.insert_before(
        Phase.Document.Execution.Resolution,
        {Phase.Document.OverrideRoot, root_value: mutation_result}
      )
      |> Pipeline.upto(Phase.Document.Execution.Resolution)

    pipeline = [
      pipeline,
      [
        result_phase(doc),
        {Absinthe.Phase.Telemetry, event: [:subscription, :publish, :stop]}
      ]
    ]

    pipeline
  end

  defp get_docs(pubsub, field, mutation_result, topic: topic_fun)
       when is_function(topic_fun, 1) do
    do_get_docs(pubsub, field, topic_fun.(mutation_result))
  end

  defp get_docs(pubsub, field, _mutation_result, key) do
    do_get_docs(pubsub, field, key)
  end

  defp do_get_docs(pubsub, field, keys) do
    keys
    |> List.wrap()
    |> Enum.map(&to_string/1)
    |> Enum.flat_map(&Absinthe.Subscription.get(pubsub, {field, &1}))
  end

  defp result_phase(doc) do
    # use the configured result phase from the initial pipeline
    # this will allow the result of the subscription data to match
    # the output of query/mutation. An example of result phase is
    # Absinthe.Phoenix.Controller.Result where the output will have
    # atom keys and allow struct to be returned

    doc.initial_phases
    |> Pipeline.from(Phase.Blueprint)
    |> case do
      [{Phase.Blueprint, opts} | _] ->
        Keyword.get(opts, :result_phase, Phase.Document.Result)

      _ ->
        Phase.Document.Result
    end
  end
end