lib/absinthe/graphql_ws/socket.ex

defmodule Absinthe.GraphqlWS.Socket do
  @moduledoc """
  This module is used by a custom websocket, which can then handle connections from a client
  implementing the [GraphQL over WebSocket protocol](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md)

  ## Options

  * `schema` - required - The Absinthe schema for the current application (example: `MyAppWeb.Schema`)
  * `keepalive` - optional - Interval in milliseconds to send `:ping` control frames over the websocket.
    Defaults to `30_000` (30 seconds).
  * `pipeline` - optional - A `{module, function}` tuple defining how to generate an Absinthe pipeline
    for each incoming message. Defaults to `{Absinthe.GraphqlWS.Socket, :absinthe_pipeline}`.

  ## Pipeline modification

  The `:pipeline` option to socket definition defaults to `{Absinthe.GraphqlWS.Socket, :absinthe_pipeline}`.
  This function returns the default pipeline provided by `&Absinthe.Pipeline.for_document/2`. Absinthe query execution
  can be modified by altering the list of phases in this pipeline. See `Absinthe.Pipeline` for more info.

  If an alternate pipeline function is provided, it must accept the arguments `schema` and `options`. These
  options include the current context and any variables that are included with the requested query.

  ## Example

      defmodule MyAppWeb.GraphqlSocket do
        use Absinthe.GraphqlWS.Socket, schema: MyAppWeb.Schema

        def handle_message(_msg, socket) do
          {:ok, socket}
        end
      end
  """

  alias Absinthe.GraphqlWS.Socket
  require Logger

  # Keepalive interval in milliseconds used when the `:keepalive` option is not given.
  @default_keepalive 30_000

  # These keys must be supplied when the struct is built; `__connect__/3` provides all of them.
  @enforce_keys ~w[absinthe connect_info endpoint handler keepalive pubsub]a
  defstruct [
    # Map with `:opts`, `:pipeline` and `:schema` entries, assembled in `__connect__/3`.
    :absinthe,
    # Connect info copied from the transport map passed to `__connect__/3`.
    :connect_info,
    # Endpoint taken from the transport map; it is also placed in the Absinthe context as `:pubsub`.
    :endpoint,
    # The module that called `use Absinthe.GraphqlWS.Socket`.
    :handler,
    # Keepalive interval in milliseconds; the generated `init/1` only schedules `:keepalive` when it is > 0.
    :keepalive,
    # Value of the endpoint's `:pubsub_server` config.
    :pubsub,
    assigns: %{},
    initialized?: false,
    subscriptions: %{}
  ]

  @typedoc """
  A socket that holds information necessary for parsing incoming messages as well as outgoing subscription data.

  Every field of the struct is listed, including the enforced `:handler` and `:pubsub` keys.
  """
  @type t() :: %Socket{
          absinthe: map(),
          assigns: map(),
          connect_info: map(),
          endpoint: module(),
          handler: module(),
          initialized?: boolean(),
          keepalive: integer(),
          pubsub: term(),
          subscriptions: map()
        }
  @type socket() :: t()

  @typedoc """
  Opcode atoms for messages handled by `handle_control/2`. Used by server-side keepalive messages.
  """
  @type control() ::
          :ping
          | :pong

  @typedoc """
  Opcode atoms for messages pushed to the client.
  """
  @type opcode() ::
          :text
          | :binary
          | control()

  @typedoc """
  JSON that conforms to the `graphql-ws` protocol.
  """
  @type message() :: binary()

  @typedoc """
  A websocket frame to send to the client.
  """
  @type frame() :: {opcode(), message()}

  @typedoc """
  Used internally by `Absinthe.GraphqlWS.Transport.handle_in/2`.

  These are the return values for incoming messages from a websocket.

  ## Values

  * `{:ok, socket}` - save new socket state, without sending any data to the client.
  * `{:reply, :ok, {:text, "{}"}, socket}` - send JSON content to the client.
  * `{:reply, :error, {:text, "{}"}, socket}` - send an error with JSON payload to the client.
  * `{:stop, reason, socket}` - shut down the socket process. `reason` may be any term, for example `:normal`.
  """
  @type reply_inbound() ::
          {:ok, socket()}
          | {:reply, :ok, frame(), socket()}
          | {:reply, :error, frame(), socket()}
          | {:stop, term(), socket()}

  @typedoc """
  Valid return values from `c:handle_message/2`.

  These are the return values for messages that have been received from within Elixir.

  ## Values

  * `{:ok, socket}` - save new socket state, without sending any data to the client.
  * `{:push, {:text, Message.Next.new(id, %{})}, socket}` - save new socket state, and send data to the client.
  * `{:stop, reason, socket}` - stop the socket. `reason` may be any term.
  """
  @type reply_message() ::
          {:ok, socket()}
          | {:push, frame(), socket()}
          | {:stop, term(), socket()}

  @typedoc """
  Return values from `c:handle_init/2`.

  The `:ok` and `:error` variants carry a map and the socket; the `:stop` variant carries
  a reason term and the socket.
  """
  @type init() ::
          {:ok, map(), socket()}
          | {:error, map(), socket()}
          | {:stop, term(), socket()}

  @doc """
  Handles messages that are sent to this process through `send/2`, which have not been caught
  by the default implementation. It must return a `t:reply_message/0`.

  If pushing content to the websocket, it must return a tuple in the form
  `{:push, {:text, message}, socket}`, where `message` is JSON that represents a valid `graphql-ws`
  message.

  ## Example

      alias Absinthe.GraphqlWS.Message

      def handle_message({:thing, thing}, socket) do
        {:ok, assign(socket, :thing, thing)}
      end

      def handle_message({:send, id, payload}, socket) do
        {:push, {:text, Message.Next.new(id, payload)}, socket}
      end

      def handle_message(_msg, socket) do
        {:ok, socket}
      end
  """
  @callback handle_message(params :: term(), socket()) :: Socket.reply_message()

  # NOTE(review): "defaulting to an empty map if received from the client" presumably means the
  # payload defaults to an empty map when the client sends none — confirm against
  # Absinthe.GraphqlWS.Transport before rewording the sentence.
  @doc """
  Handle the `connection_init` message sent by the socket implementation. This will receive
  the `payload` from the message, defaulting to an empty map if received from the client.
  It must return a `t:init/0`.

  This can be used for custom authentication/authorization, using
  `Absinthe.GraphqlWS.Util.assign_context/2` to modify the Absinthe context.

  In case the user is authenticated through session cookies, the session data may be accessed in
  the socket's `:connect_info` field. Note that you need to send a `_csrf_token` param in the URL to effectively receive
  the session info (or else the session will be `nil`). For more information, visit the Phoenix Endpoint docs:
  https://hexdocs.pm/phoenix/Phoenix.Endpoint.html#socket/3-common-configuration

  ## Example

      defmodule MySocket do
        use Absinthe.GraphqlWS.Socket, schema: MySchema

        def handle_init(%{"user_id" => user_id}, socket) do
          case find_user(user_id) do
            nil ->
              {:error, %{}, socket}
            user ->
              socket = assign_context(socket, current_user: user)
              {:ok, %{name: user.name}, socket}
          end
        end
      end
  """
  @callback handle_init(payload :: map(), socket()) :: Socket.init()

  # Both callbacks are optional: a using module may omit `handle_message/2` and `handle_init/2`.
  @optional_callbacks handle_message: 2, handle_init: 2

  # Compile-time hook, registered through `@after_compile` by `__using__/1`.
  # Reads the options stored in `@graphql_ws_socket_opts` and emits a compiler
  # warning when the using module did not pass `:schema`. Always returns `:ok`.
  @spec __after_compile__(any(), any()) :: :ok
  def __after_compile__(env, _bytecode) do
    socket_opts = Module.get_attribute(env.module, :graphql_ws_socket_opts)

    if not Keyword.has_key?(socket_opts, :schema) do
      :elixir_errors.erl_warn(env.line, env.file, "#{env.module} must specify `:schema` when using Absinthe.GraphqlWS.Socket")
    end

    :ok
  end

  # Turns the using module into a `Phoenix.Socket.Transport` whose callbacks
  # delegate to this module and to `Absinthe.GraphqlWS.Transport`. The options
  # given to `use` are stored in `@graphql_ws_socket_opts`. Only `terminate/2`
  # is overridable.
  defmacro __using__(opts) do
    quote do
      @graphql_ws_socket_opts unquote(opts)
      @after_compile Absinthe.GraphqlWS.Socket

      import Absinthe.GraphqlWS.Util
      alias Absinthe.GraphqlWS.Socket

      @behaviour Phoenix.Socket.Transport
      @behaviour Absinthe.GraphqlWS.Socket

      @doc false
      @impl Phoenix.Socket.Transport
      def child_spec(child_opts),
        do: Socket.__child_spec__(__MODULE__, child_opts, @graphql_ws_socket_opts)

      @doc false
      @impl Phoenix.Socket.Transport
      def connect(transport),
        do: Socket.__connect__(__MODULE__, transport, @graphql_ws_socket_opts)

      @doc false
      @impl Phoenix.Socket.Transport
      def init(socket) do
        # A keepalive that is not greater than zero leaves the timer unscheduled.
        if socket.keepalive > 0 do
          Process.send_after(self(), :keepalive, socket.keepalive)
        end

        {:ok, socket}
      end

      @doc false
      @impl Phoenix.Socket.Transport
      def handle_control(frame, socket) do
        Absinthe.GraphqlWS.Transport.handle_control(frame, socket)
      end

      @doc false
      @impl Phoenix.Socket.Transport
      def handle_in(frame, socket) do
        Absinthe.GraphqlWS.Transport.handle_in(frame, socket)
      end

      @doc false
      @impl Phoenix.Socket.Transport
      def handle_info(info, socket) do
        Absinthe.GraphqlWS.Transport.handle_info(info, socket)
      end

      @doc false
      @impl Phoenix.Socket.Transport
      def terminate(reason, socket) do
        Absinthe.GraphqlWS.Transport.terminate(reason, socket)
      end

      defoverridable terminate: 2
    end
  end

  # Logs `msg` at debug level, prefixed with a tag that contains the pid of the calling process.
  defmacrop debug(msg) do
    quote do
      Logger.debug("[graph-socket@#{inspect(self())}] #{unquote(msg)}")
    end
  end

  # Builds a socket struct from `attrs`. Goes through `__struct__/1`, so missing
  # enforced keys and unknown keys raise.
  @doc false
  def new(attrs \\ []) do
    struct!(__MODULE__, attrs)
  end

  @doc """
  Returns the child spec that `Phoenix.Socket.Transport` asks for.

  The transport expects a spec that starts a process, so this one starts a `Task`
  whose function does nothing but return `:ok`.
  """
  def __child_spec__(module, _opts, _socket_opts) do
    noop = fn -> :ok end

    %{
      id: {__MODULE__, module},
      start: {Task, :start_link, [noop]},
      restart: :transient
    }
  end

  @doc """
  Builds the initial `t:socket/0` when a client connects to this websocket.

  `:schema` is required in `options`; `:pipeline` and `:keepalive` fall back to their defaults.
  The endpoint and connect info are read from the transport map given as the second argument.
  """
  @spec __connect__(module(), map(), Keyword.t()) :: {:ok, socket()}
  def __connect__(module, socket, options) do
    pipeline = Keyword.get(options, :pipeline, {__MODULE__, :absinthe_pipeline})
    endpoint = socket.endpoint
    pubsub_server = endpoint.config(:pubsub_server)
    schema = Keyword.fetch!(options, :schema)
    interval = Keyword.get(options, :keepalive, @default_keepalive)

    # The endpoint itself is handed to Absinthe as the `:pubsub` context entry.
    absinthe = %{
      opts: [context: %{pubsub: endpoint}],
      pipeline: pipeline,
      schema: schema
    }

    graphql_socket =
      Socket.new(
        absinthe: absinthe,
        connect_info: socket.connect_info,
        endpoint: endpoint,
        handler: module,
        keepalive: interval,
        pubsub: pubsub_server
      )

    debug("connect: #{graphql_socket}")

    {:ok, graphql_socket}
  end

  @doc """
  Returns the default Absinthe pipeline, as built by `Absinthe.Pipeline.for_document/2`.

  ## Params

  * `schema` - An `Absinthe.Schema.t()`
  * `options` - A keyword list with the current context, variables, etc for the
    current query.
  """
  @spec absinthe_pipeline(Absinthe.Schema.t(), Keyword.t()) :: Absinthe.Pipeline.t()
  def absinthe_pipeline(schema, options),
    do: Absinthe.Pipeline.for_document(schema, options)

  defimpl String.Chars do
    # Renders a short, log-friendly summary of the socket: the handler module, the keys of
    # `connect_info` (never its values), and the keepalive interval.
    def to_string(socket) do
      # `inspect/1` prints an Elixir alias the same way `Module.split/1 |> Enum.join(".")` does,
      # but does not raise when the handler is a non-alias atom such as `:my_socket`.
      handler = inspect(socket.handler)
      connect_info = socket.connect_info |> Map.keys() |> inspect()
      "#Socket<handler=#{handler}, connect_info=#{connect_info}, keepalive=#{keepalive(socket.keepalive)}>"
    end

    # The generated `init/1` only schedules a keepalive when the interval is greater than zero,
    # so every non-positive interval is reported as disabled, not only `0`.
    defp keepalive(value) when value <= 0, do: "disabled"
    # Intervals above ten seconds are shown in seconds; `/` yields a float, e.g. "30.0s".
    defp keepalive(value) when value > 10_000, do: "#{value / 1000}s"
    defp keepalive(value), do: "#{value}ms"
  end
end