lib/absinthe/graphql_ws/transport.ex

defmodule Absinthe.GraphqlWS.Transport do
  @moduledoc """
  Handles messages coming into the socket from clients (implemented in `handle_in/2`)
  as well as messages coming from within Elixir/Absinthe (implemented in `handle_info/2`).

  If the optional `c:Absinthe.GraphqlWS.Socket.handle_message/2` callback is implemented on
  the socket, then messages that are not specifically caught by `handle_info/2` in this
  module will be passed through to `c:Absinthe.GraphqlWS.Socket.handle_message/2`.

  **Note:** This module is not intended for use by individuals integrating this library into
  their codebase, but is documented to help understand the intentions of the code.
  """

  alias Absinthe.GraphqlWS.{Message, Socket, Util}
  alias Phoenix.Socket.Broadcast
  require Logger

  @ping "ping"
  @pong "pong"

  @type control :: Socket.control()
  @type reply_inbound() :: Socket.reply_inbound()
  @type reply_message() :: Socket.reply_message()
  @type socket() :: Socket.t()

  defmacrop debug(msg), do: quote(do: Logger.debug("[graph-socket@#{inspect(self())}] #{unquote(msg)}"))
  defmacrop warn(msg), do: quote(do: Logger.warn("[graph-socket@#{inspect(self())}] #{unquote(msg)}"))

  @doc """
  Generally this will only receive `:pong` messages in response to our keepalive
  ping messages. Client-side websocket libraries handle these control frames
  automatically in order to adhere to the spec, so unless a customer is writing their
  own low-level websocket it should be handled for them.
  """
  @spec handle_control({term(), opcode: control()}, socket()) :: reply_inbound()
  def handle_control({_, opcode: :ping}, socket), do: {:reply, :ok, {:pong, @pong}, socket}
  def handle_control({_, opcode: :pong}, socket), do: {:ok, socket}

  def handle_control(message, state) do
    warn(" unhandled control frame #{inspect(message)}")
    {:ok, state}
  end

  @doc """
  Receive messages from clients. We expect all incoming messages to be JSON encoded
  text, so if something else comes in we blow up.
  """
  @spec handle_in({binary(), [opcode: :text]}, socket()) :: reply_inbound()
  def handle_in({text, [opcode: :text]}, socket) do
    Util.json_library().decode(text)
    |> case do
      {:ok, json} ->
        handle_inbound(json, socket)

      {:error, reason} ->
        warn("JSON parse error: #{inspect(reason)}")
        {:reply, :error, {:text, Message.Error.new("4400")}, socket}
    end
  end

  @doc """
  Receive messages from inside the house.

  * `:keepalive` - Regularly send messages with opcode of `0x09`, ie `:ping`. The `graphql-ws`
    library has a strong opinion that it does not want to implement client-side keepalive, so
    in order to keep the websocket from closing we need to send it messages.

  * `subscription:data` - After we subscribe to an Absinthe subscription, we may receive messages
    for the relevant subscription. The `graphql-ws` will have sent us an `id` along with the
    subscription query, so we need to map our internal topic back to that `id` in order for the
    client to figure out what to do with our message.

  * `:complete` - If we get a `query` or a `mutation` on the websocket, we're supposed to reply
    with a `Next` message followed by a `Complete` message. We follow through on the latter by
    putting a message on our process queue.

  * fallthrough - If `c:Absinthe.GraphqlWs.Socket.handle_message/2` is defined on the socket,
    then uncaught messages will be sent there.
  """
  @spec handle_info(term(), socket()) :: reply_message()
  def handle_info(:keepalive, socket) do
    Process.send_after(self(), :keepalive, socket.keepalive)
    {:push, {:ping, @ping}, socket}
  end

  def handle_info(%Broadcast{event: "subscription:data", payload: payload, topic: topic}, socket) do
    subscription_id = socket.subscriptions[topic]
    {:push, {:text, Message.Next.new(subscription_id, payload.result)}, socket}
  end

  def handle_info({:complete, id}, socket) do
    {:push, {:text, Message.Complete.new(id)}, socket}
  end

  def handle_info(message, socket) do
    if function_exported?(socket.handler, :handle_message, 2) do
      socket.handler.handle_message(message, socket)
    else
      {:ok, socket}
    end
  end

  @doc """
  Process was stopped.
  """
  @spec terminate(term(), socket()) :: :ok
  def terminate(reason, _socket) do
    debug("terminated: #{inspect(reason)}")
    :ok
  end

  @doc """
  Callbacks for parsed JSON payloads coming in from a client.

  See:
  https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md
  """
  @spec handle_inbound(map(), socket()) :: reply_inbound()
  def handle_inbound(%{"type" => "connection_init"}, %{initialized?: true} = socket) do
    close(4429, "Too many initialisation requests", socket)
  end

  def handle_inbound(%{"type" => "connection_init"} = message, %{handler: handler} = socket) do
    if function_exported?(handler, :handle_init, 2) do
      case handler.handle_init(Map.get(message, "payload", %{}), socket) do
        {:ok, payload, socket} ->
          {:reply, :ok, {:text, Message.ConnectionAck.new(payload)}, %{socket | initialized?: true}}

        {:error, payload, socket} ->
          {:reply, :ok, {:text, Message.Error.new(payload)}, socket}
      end
    else
      {:reply, :ok, {:text, Message.ConnectionAck.new()}, %{socket | initialized?: true}}
    end
  end

  def handle_inbound(%{"type" => "subscribe"}, %{initialized?: false} = socket) do
    close(4400, "Subscribe message received before ConnectionInit", socket)
  end

  def handle_inbound(%{"id" => id, "type" => "subscribe", "payload" => payload}, socket) do
    payload
    |> handle_subscribe(id, socket)
  end

  def handle_inbound(%{"id" => id, "type" => "complete"}, socket) do
    socket.subscriptions
    |> Enum.find_value(fn
      {topic, ^id} ->
        {:ok, topic}

      _ ->
        false
    end)
    |> case do
      {:ok, topic} ->
        debug("unsubscribing from topic #{topic}")
        Phoenix.PubSub.unsubscribe(socket.pubsub, topic)
        Absinthe.Subscription.unsubscribe(socket.endpoint, topic)

        {:ok, %{socket | subscriptions: Map.delete(socket.subscriptions, id)}}

      _ ->
        {:ok, socket}
    end
  end

  def handle_inbound(%{"type" => "ping"}, socket),
    do: {:reply, :ok, {:text, Message.Pong.new()}, socket}

  def handle_inbound(msg, socket) do
    warn("unhandled message #{inspect(msg)}")
    close(4400, "Unhandled message from client", socket)
  end

  @doc """
  Subscribe messages in graphql-ws may include a subscription, implying a subscription to
  a long term stream of data. These messages may also be queries or mutations, so do not require
  a stream.
  """
  def handle_subscribe(payload, id, socket) do
    with %{schema: schema} <- socket.absinthe,
         {:ok, variables} <- parse_variables(payload),
         {:ok, query} <- parse_query(payload) do
      opts = socket.absinthe.opts |> Keyword.merge(variables: variables)

      Absinthe.Logger.log_run(:debug, {
        query,
        schema,
        [],
        opts
      })

      run_doc(socket, id, query, socket.absinthe, opts)
    else
      _ ->
        {:ok, socket}
    end
  end

  defp close(code, message, socket) do
    {:reply, :ok, {:close, code, message}, socket}
  end

  defp parse_query(%{"query" => query}) when is_binary(query), do: {:ok, query}
  defp parse_query(_), do: {:ok, ""}

  defp parse_variables(%{"variables" => variables}) when is_map(variables), do: {:ok, variables}
  defp parse_variables(_), do: {:ok, %{}}

  def pipeline(schema, options) do
    schema
    |> Absinthe.Pipeline.for_document(options)
  end

  defp run_doc(socket, id, query, config, opts) do
    case run(query, config[:schema], config[:pipeline], opts) do
      {:ok, %{"subscribed" => topic}, context} ->
        debug("subscribed to topic #{topic}")

        :ok =
          Phoenix.PubSub.subscribe(
            socket.pubsub,
            topic,
            # metadata: {:fastlane, self(), @serializer, []},
            link: true
          )

        socket = merge_opts(socket, context: context)
        {:ok, %{socket | subscriptions: Map.put(socket.subscriptions, topic, id)}}

      {:ok, %{data: _} = reply, context} ->
        queue_complete_message(id)
        socket = merge_opts(socket, context: context)
        {:reply, :ok, {:text, Message.Next.new(id, reply)}, socket}

      {:ok, %{errors: errors}, context} ->
        socket = merge_opts(socket, context: context)
        {:reply, :ok, {:text, Message.Error.new(id, errors)}, socket}

      {:error, reply} ->
        {:reply, :error, {:text, Message.Error.new(id, reply)}, socket}
    end
  end

  defp run(document, schema, pipeline, options) do
    {module, fun} = pipeline

    case Absinthe.Pipeline.run(document, apply(module, fun, [schema, options])) do
      {:ok, %{result: result, execution: res}, _phases} ->
        {:ok, result, res.context}

      {:error, msg, _phases} ->
        {:error, msg}
    end
  end

  defp merge_opts(socket, opts) do
    %{socket | absinthe: %{socket.absinthe | opts: opts}}
  end

  defp queue_complete_message(id), do: send(self(), {:complete, id})
end