lib/slipstream.ex

defmodule Slipstream do
  @moduledoc """
  A websocket client for Phoenix channels

  Slipstream is a websocket client for connection to `Phoenix.Channel`s.
  Slipstream is a bit different from existing websocket implementations in that:

  - it's backed by `Mint.WebSocket`
  - it has an `await_*` interface for performing actions synchronously
  - smart retry strategies for reconnection and rejoining work out-of-the-box
  - a testing framework for clients
  - high-level and low-level instrumentation with `:telemetry`

  ## Basic Usage

  The intended use for Slipstream is to write asynchronous, callback-oriented
  GenServer-like modules that define socket clients. This approach makes it
  easy to write socket clients that resemble state-machines. A minimalistic
  example usage might be like so:

      defmodule MyApp.MySocketClient do
        @moduledoc \"\"\"
        A socket client for connecting to that other Phoenix server

        Periodically sends pings and asks the other server for its metrics.
        \"\"\"

        use Slipstream,
          restart: :temporary

        require Logger

        @topic "backend-service:money-server"

        def start_link(args) do
          Slipstream.start_link(__MODULE__, args, name: __MODULE__)
        end

        @impl Slipstream
        def init(config) do
          {:ok, connect!(config), {:continue, :start_ping}}
        end

        @impl Slipstream
        def handle_continue(:start_ping, socket) do
          timer = :timer.send_interval(1000, self(), :request_metrics)

          {:noreply, assign(socket, :ping_timer, timer)}
        end

        @impl Slipstream
        def handle_connect(socket) do
          {:ok, join(socket, @topic)}
        end

        @impl Slipstream
        def handle_join(@topic, _join_response, socket) do
          # an asynchronous push with no reply:
          push(socket, @topic, "hello", %{})

          {:ok, socket}
        end

        @impl Slipstream
        def handle_info(:request_metrics, socket) do
          # we will asynchronously receive a reply and handle it in the
          # handle_reply/3 implementation below
          {:ok, ref} = push(socket, @topic, "get_metrics", %{format: "json"})

          {:noreply, assign(socket, :metrics_request, ref)}
        end

        @impl Slipstream
        def handle_reply(ref, metrics, socket) do
          if ref == socket.assigns.metrics_request do
            :ok = MyApp.MetricsPublisher.publish(metrics)
          end

          {:ok, socket}
        end

        @impl Slipstream
        def handle_message(@topic, event, message, socket) do
          Logger.error(
            "Was not expecting a push from the server. Heard: " <>
              inspect({@topic, event, message})
          )

          {:ok, socket}
        end

        @impl Slipstream
        def handle_disconnect(_reason, socket) do
          :timer.cancel(socket.assigns.ping_timer)

          {:stop, :normal, socket}
        end
      end

  ## Synchronicity

  Slipstream is designed to work asynchronously by default. Requests such
  as `connect/2`, `join/3`, and `push/4` are asynchronous requests. When
  the remote server replies, the associated callback will be invoked
  (`c:handle_connect/1`, `c:handle_join/3`, and `c:handle_reply/3` in cases
  of success, respectively). For all of these operations, though, you may
  await the outcome of the asynchronous request with `await_*` functions. E.g.

      iex> {:ok, ref} = push(socket, "room:lobby", "msg:new", %{user: 1, msg: "foo"})
      iex> {:ok, %{"created" => true}} = await_reply(ref)

  Note that all `await_*` functions must be called from the slipstream process
  that emitted the request, or else they will timeout.

  While Slipstream provides a rich toolset for synchronicity, the asynchronous,
  callback-based workflow is recommended.

  ## GenServer operations

  Note that Slipstream is in many ways a simple wrapper around a GenServer.
  As such, all GenServer functionality is possible with Slipstream clients,
  such as `Kernel.send/2` or `GenServer.call/3`. For example, assume you have
  a slipstream client written like so:

      defmodule MyClient do
        use Slipstream

        require Logger

        def start_link(args) do
          Slipstream.start_link(__MODULE__, args, name: __MODULE__)
        end

        @impl Slipstream
        def init(config), do: connect(config)

        @impl Slipstream
        def handle_cast(:ping, socket) do
          Logger.info("pong")

          {:noreply, socket}
        end

        @impl Slipstream
        def handle_info(:hello, socket) do
          Logger.info("hello")

          {:noreply, socket}
        end

        @impl Slipstream
        def handle_call(:foo, _from, socket) do
          {:reply, {:ok, :bar}, socket}
        end

        ..
      end

  This `MyClient` client is a GenServer, so the following are valid ways to
  interact with `MyClient`:

      iex> GenServer.cast(MyClient, :ping)
      [info] pong
      :ok
      iex> MyClient |> GenServer.whereis |> send(:hello)
      [info] hello
      :hello
      iex> GenServer.call(MyClient, :foo)
      {:ok, :bar}

  ## Retry Mechanisms

  Slipstream emulates the official `phoenix.js` package with its reconnection
  and re-join features. `Slipstream.Configuration` allows configuration of the
  back-off times with the `:reconnect_after_msec` and `:rejoin_after_msec`
  lists, respectively.

  To take advantage of these built-in mechanisms, a client must be written
  in the asynchronous GenServer-like manner and must use the `reconnect/1` and
  `rejoin/3` functions in its `c:Slipstream.handle_disconnect/2` and
  `c:Slipstream.handle_topic_close/3` callbacks, respectively. Note that the
  default implementation of these callbacks invokes these functions, so a client
  which does not explicitly define these callbacks will retry connection and
  joins.

  Take care to handle the `:left` case of `c:Slipstream.handle_topic_close/3`.
  In the case that a client attempts to leave a topic with `leave/2`, the
  callback will be invoked with a `reason` of `:left`. The default
  implementation of `c:Slipstream.handle_topic_close/3` makes this distinction
  and simply no-ops on channel leaves.

      defmodule MyClientWithRetry do
        use Slipstream

        def start_link(config) do
          Slipstream.start_link(__MODULE__, config, name: __MODULE__)
        end

        @impl Slipstream
        def init(config), do: connect(config)

        @impl Slipstream
        def handle_connect(socket) do
          {:ok, join(socket, "rooms:lobby", %{user_id: 1})}
        end

        @impl Slipstream
        def handle_disconnect(_reason, socket) do
          case reconnect(socket) do
            {:ok, socket} -> {:ok, socket}
            {:error, reason} -> {:stop, reason, socket}
          end
        end

        @impl Slipstream
        def handle_topic_close(topic, _reason, socket) do
          rejoin(socket, topic)
        end
      end
  """

  alias Slipstream.{Commands, Events, Socket, TelemetryHelper}
  import Slipstream.CommandRouter, only: [route_command: 1]
  import Slipstream.Signatures, only: [event: 1, command: 1]

  # 5s default await timeout, same as GenServer calls
  @default_timeout 5_000

  @typedoc """
  Any data structure capable of being serialized as JSON

  Any argument typed as `t:Slipstream.json_serializable/0` must be able to
  be encoded with the JSON parser passed in configuration. See
  `Slipstream.Configuration`.
  """
  @typedoc since: "0.1.0"
  @type json_serializable :: term()

  @typedoc """
  A reference to a message pushed by the client

  These references are returned by calls to `push/4` and may be matched on
  in `c:handle_reply/3`. They are also used to match messages
  for `await_reply/2`.

  ## Synchronicity

  This approach treats the websocket connection as an RPC: some other process
  in the service does a `GenServer.call/3` to the slipstream client process,
  which sends a push to the remote websocket server, waits for a reply
  (synchronously) and then sends that back to the caller. All-in-all, this
  appears completely synchronous for the caller.

      @impl Slipstream
      def handle_call({:new_message, params}, _from, socket) do
        {:ok, ref} = push(socket, "rooms:lobby", "msg:new", params)

        {:reply, await_reply(ref), socket}
      end

  This approach is written in a more asynchronous fashion. An info message
  arriving from any other process triggers the slipstream client to push a
  work message to the remote websocket server. When the remote websocket server
  replies with the result, the slipstream client sends off the result to be
  dealt with else-where. No process in this scenario blocks, so they are all
  capable of receiving other messages while the work is being completed.

      @impl Slipstream
      def handle_info(:do_work, socket) do
        ref = push(socket, "worker_queue:foo", "do_work", %{})

        {:noreply, assign(socket, :work_ref, ref)}
      end

      @impl Slipstream
      def handle_reply(ref, result, %{assigns: %{work_ref: ref}} = socket) do
        IO.inspect(result, label: "work complete!")

        {:ok, socket}
      end
  """
  @typedoc since: "0.1.0"
  @type push_reference() :: String.t()

  @typedoc """
  A reply from a remote server to a push from the client

  Replies may be any of

  - `:ok`
  - `:error`
  - `{:ok, any()}`
  - `{:error, any()}`

  depending on how the remote server's reply is written.

  Note that the empty map is removed in ok and error tuples, so a reply written
  like so on the server-side:

      def handle_in(_event, _params, socket) do
        {:reply, {:ok, %{}}, socket}
      end

  will translate to a reply of `:ok` (and the same for `{:error, %{}}`).

  ## Examples

      # on the Phoenix.Channel (server) side:
      def handle_in(_event, _params, socket) do
        {:reply, {:ok, %{created?: true}}, socket}
      end

      # on the Slipstream (client) side:
      def handle_reply(_ref, {:ok, %{"created?" => true}} = _reply, socket) do
        ..
  """
  @typedoc since: "0.1.0"
  @type reply() ::
          :ok
          | :error
          | {:ok, json_serializable()}
          | {:error, json_serializable()}

  # the family of GenServer-wrapping callbacks

  @doc ~S"""
  Invoked when the slipstream process in started

  Behaves the same as `c:GenServer.init/1`, but the return state must be a
  new `t:Slipstream.Socket.t/0`. Values from `c:init/1` that you'd
  like to keep in state can be stored with `Slipstream.Socket.assign/3`.

  This callback is a good place to request connection with `connect/2`. Note
  that `connect/2` is an asynchronous request for connection. Awaiting
  connection with `await_connect/2` is unwise in many scenarios, however,
  because failure to connect may result in an exit from the process, crashing
  the supervision tree that started the process. If you wish to connect
  synchronously upon init, a better approach could be:

      @impl Slipstream
      def init(_args) do
        config = Application.fetch_env!(:my_app, __MODULE__)
        socket = new_socket() |> assign(:connect_config, config)

        {:ok, socket, {:continue, :connect}}
      end

      @impl Slipstream
      def handle_continue(:connect, socket) do
        {:ok, socket} = connect(socket, socket.assigns.connect_config)

        {:ok, socket} = await_connect(socket)

        {:noreply, socket}
      end

  But a more minimalistic approach that still provides safety in cases of
  configuration validation failures would be:

      defmodule MySocketClient do
        use Slipstream

        def start_link(args) do
          Slipstream.start_link(__MODULE__, args, name: __MODULE__)
        end

        @impl Slipstream
        def init(_args) do
          config = Application.fetch_env!(:my_app, __MODULE__)

          case connect(config) do
            {:ok socket} ->
              {:ok, socket}

            {:error, reason} ->
              Logger.error("Could not start #{__MODULE__} because of " <>
                "validation failure: #{inspect(reason)}")

              :ignore
          end
        end

        ..
      end

  The configuration could be stored in application config:

      # config/<env>.exs
      config :my_app, MySocketClient,
        uri: "ws://example.org/socket/websocket",
        reconnect_after_msec: [200, 500, 1_000, 2_000]

  And in cases where the configuration validation fails, the `MySocketClient`
  process will not crash the application's supervision tree.

  ## Examples

      @impl Slipstream
      def init(config) do
        connect(config)
      end
  """
  @doc since: "0.1.0"
  @callback init(init_arg :: any()) ::
              {:ok, state}
              | {:ok, state, timeout() | :hibernate | {:continue, term()}}
              | :ignore
              | {:stop, reason :: any()}
            when state: Socket.t()

  @doc """
  Invoked as a continuation of another GenServer callback

  GenServer callbacks may end with signatures that declare that the next
  function invoked should be a continuation. E.g.

      def init(state) do
        {:ok, state, {:continue, :my_continue}}
      end

      # this will be invoked immediately after `init/1`
      def handle_continue(:my_continue, state) do
        # do something with state

        {:norelpy, state}
      end

  This provides a way to schedule work to occur immediately after
  successful initialization or to break work across multiple callbacks, which
  can be useful for clients which are state-machine-like.

  See `c:GenServer.handle_continue/2` for more information.
  """
  @doc since: "0.1.0"
  @callback handle_continue(continue :: term(), state :: Socket.t()) ::
              {:noreply, new_state}
              | {:noreply, new_state,
                 timeout() | :hibernate | {:continue, term()}}
              | {:stop, reason :: term(), new_state}
            when new_state: Socket.t()

  @doc """
  Invoked when a slipstream process receives a message

  Behaves the same as `c:GenServer.handle_info/2`
  """
  @doc since: "0.1.0"
  @callback handle_info(msg :: term(), socket :: Socket.t()) ::
              {:noreply, new_socket}
              | {:noreply, new_socket,
                 timeout() | :hibernate | {:continue, term()}}
              | {:stop, reason :: term(), new_socket}
            when new_socket: Socket.t()

  @doc """
  Invoked when a slipstream process receives a GenServer cast

  Behaves the same as `c:GenServer.handle_cast/2`
  """
  @doc since: "0.1.0"
  @callback handle_cast(msg :: term(), socket :: Socket.t()) ::
              {:noreply, new_socket}
              | {:noreply, new_socket,
                 timeout() | :hibernate | {:continue, term()}}
              | {:stop, reason :: term(), new_socket}
            when new_socket: Socket.t()

  @doc """
  Invoked when a slipstream process receives a GenServer call

  Behaves the same as `c:GenServer.handle_call/3`
  """
  @doc since: "0.1.0"
  @callback handle_call(
              request :: term(),
              from :: GenServer.from(),
              socket :: Socket.t()
            ) ::
              {:reply, reply, new_socket}
              | {:reply, reply, new_socket,
                 timeout() | :hibernate | {:continue, term()}}
              | {:noreply, new_socket}
              | {:noreply, new_socket,
                 timeout() | :hibernate | {:continue, term()}}
              | {:stop, reason, new_socket}
              | {:stop, reason, reply, new_socket}
            when new_socket: Socket.t(), reply: term(), reason: term()

  @doc """
  Invoked when a slipstream process is terminated

  Note that this callback is not always invoked as the process shuts down.
  See `c:GenServer.terminate/2` for more information.

  It is wise to `disconnect/1` in this callback (and such is the default
  implementation). This will gracefully end the websocket connection.
  This is the behavior of the default implementation of `c:terminate/2`.

  ## Examples

      @impl Slipstream
      def terminate(reason, socket) do
        Logger.debug("shutting down: " <> inspect(reason))

        disconnect(socket)
      end
  """
  @doc since: "0.1.0"
  @callback terminate(reason :: term(), socket :: Socket.t()) :: term()

  # callbacks unique to Slipstream ('novel' callbacks)

  @doc """
  Invoked when a connection has been established to a websocket server

  This callback provides a good place to `join/3`.

  ## Examples

      @impl Slipstream
      def handle_connect(socket) do
        {:noreply, socket}
      end
  """
  @doc since: "0.1.0"
  @callback handle_connect(socket :: Socket.t()) ::
              {:ok, new_socket}
              | {:stop, reason :: term(), new_socket}
            when new_socket: Socket.t()

  @doc """
  Invoked when a connection has been terminated

  The default implementation of this callback requests reconnection

  ## Examples

      @impl Slipstream
      def handle_disconnect(_reason, socket) do
        case reconnect(socket) do
          {:ok, socket} -> {:ok, socket}
          {:error, reason} -> {:stop, reason, socket}
        end
      end
  """
  @doc since: "0.1.0"
  @callback handle_disconnect(reason :: term(), socket :: Socket.t()) ::
              {:ok, new_socket}
              | {:stop, stop_reason :: term(), new_socket}
            when new_socket: Socket.t()

  @doc """
  Invoked when the websocket server replies to the request to join

  ## Examples

      @impl Slipstream
      def handle_join("rooms:echo", %{}, socket) do
        push(socket, "rooms:echo", "echo", %{"ping" => 1})

        {:ok, socket}
      end
  """
  @doc since: "0.1.0"
  @callback handle_join(
              topic :: String.t(),
              response :: json_serializable(),
              socket :: Socket.t()
            ) ::
              {:ok, new_socket}
              | {:stop, reason :: term(), new_socket}
            when new_socket: Socket.t()

  @doc """
  Invoked when a message is received on the websocket connection

  This callback will not be invoked for a message which is a reply. Those
  messages will be handled in `c:handle_reply/3`.

  Note that while replying is supported on the server-side of the Phoenix
  Channel protocol, it is not supported by a client. Messages sent from
  the server cannot be directly replied to.

  ## Examples

      @impl Slipstream
      def handle_message("room:lobby", "new:msg", params, socket) do
        MyApp.Msg.create(params)

        {:ok, socket}
      end
  """
  @doc since: "0.1.0"
  @callback handle_message(
              topic :: String.t(),
              event :: String.t(),
              message :: any(),
              socket :: Socket.t()
            ) ::
              {:ok, new_socket}
              | {:stop, reason :: term(), new_socket}
            when new_socket: Socket.t()

  @doc """
  Invoked when a message is received on the websocket connection which
  references a push from this client process.

  `ref` is the string reference returned from the `push/2` which resulted in
  this reply.

  ## Examples

      @impl Slipstream
      def handle_join(topic, _params, socket) do
        my_req = push(socket, topic, "msg:new", %{"foo" => "bar"})

        {:ok, assign(socket, :request, my_req)}
      end

      @impl Slipstream
      def handle_reply(ref, reply, %{assigns: %{request: ref}} = socket) do
        IO.inspect(reply, label: "reply to my request")

        {:ok, socket}
      end
  """
  @doc since: "0.1.0"
  @callback handle_reply(
              ref :: push_reference(),
              message :: reply(),
              socket :: Socket.t()
            ) ::
              {:ok, new_socket}
              | {:stop, reason :: term(), new_socket}
            when new_socket: Socket.t()

  @doc """
  Invoked when a join has concluded

  This callback will be invoked in the case that the remote channel crashes.
  `reason` is an error tuple `{:error, params :: json_serializable()}` where
  `params` is the message sent from the remote channel on crash.

  The default implementation of this callback attempts to re-join the
  last-joined topic.

  ## Examples

      @impl Slipstream
      def handle_topic_close(topic, _reason, socket) do
        {:ok, socket} = rejoin(socket, topic)
      end
  """
  @doc since: "0.1.0"
  @callback handle_topic_close(
              topic :: String.t(),
              reason :: term(),
              socket :: Socket.t()
            ) ::
              {:ok, new_socket}
              | {:stop, stop_reason :: term(), new_socket}
            when new_socket: Socket.t()

  @doc """
  Invoked when a join has concluded by a leave request

  This callback is invoked when the remote server acknowledges that the client
  has disconnected as a result of calling `leave/2`.

  The default implementation of this callback performs a no-op.

  ## Examples

      @impl Slipstream
      def handle_leave(topic, socket) do
        Logger.info("Successfully left topic " <> topic)

        {:ok, socket}
      end
  """
  @doc since: "0.7.0"
  @callback handle_leave(
              topic :: String.t(),
              socket :: Socket.t()
            ) ::
              {:ok, new_socket}
              | {:stop, stop_reason :: term(), new_socket}
            when new_socket: Socket.t()

  @optional_callbacks init: 1,
                      handle_info: 2,
                      handle_cast: 2,
                      handle_call: 3,
                      handle_continue: 2,
                      terminate: 2,
                      handle_connect: 1,
                      handle_disconnect: 2,
                      handle_join: 3,
                      handle_message: 4,
                      handle_reply: 3,
                      handle_topic_close: 3,
                      handle_leave: 2

  # --- core functionality

  @doc """
  Starts a slipstream client process

  This function delegates to `GenServer.start_link/3`, so all options passable
  to that function are passable here as well. Most notably, you may name your
  slipstream clients using the same naming rules as GenServers, as in the
  example below.

  ## Examples

      defmodule MySlipstreamClient do
        use Slipstream

        def start_link(args) do
          Slipstream.start_link(__MODULE__, args, name: __MODULE__)
        end

        ..
      end
  """
  @spec start_link(module(), any()) :: GenServer.on_start()
  @spec start_link(module(), any(), GenServer.options()) :: GenServer.on_start()
  defdelegate start_link(module, init_arg, options \\ []), to: GenServer

  @doc """
  Creates a new socket without immediately connecting to a remote websocket

  This can be useful if you do not wish to request connection with `connect/2`
  during the `c:init/1` callback (because the `c:init/1` callback requires that
  you return a `t:Slipstream.Socket.t/0`).

  ## Examples

      defmodule MySocketClient do
        use Slipstream

        ..

        @impl Slipstream
        def init(args) do
          {:ok, new_socket() |> assign(:init_args, args)}
        end

        ..
      end

      iex> new_socket()
      #Slipstream.Socket<assigns: %{}, ...>
  """
  @doc since: "0.1.0"
  @spec new_socket() :: Socket.t()
  defdelegate new_socket(), to: Socket, as: :new

  @doc """
  Requests connection to the remote endpoint

  `opts` are passed to `Slipstream.Configuration.validate/1` before sending.

  Note that this request for connection is asynchronous. A return value of
  `{:ok, socket}` does not mean that a connection has successfully been
  established.

  ## Examples

      {:ok, socket} = connect(uri: "ws://localhost:4000/socket/websocket")
  """
  @doc since: "0.1.0"
  @spec connect(opts :: Keyword.t()) ::
          {:ok, Socket.t()} | {:error, NimbleOptions.ValidationError.t()}
  @spec connect(socket :: Socket.t(), opts :: Keyword.t()) ::
          {:ok, Socket.t()} | {:error, NimbleOptions.ValidationError.t()}
  def connect(socket \\ new_socket(), opts) do
    case Slipstream.Configuration.validate(opts) do
      {:ok, config} ->
        socket = TelemetryHelper.begin_connect(socket, config)

        route_command %Commands.OpenConnection{config: config, socket: socket}

        {:ok, %Socket{socket | channel_config: config}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc """
  Same as `connect/2` but raises on configuration validation error

  Note that `connect!/2` will not necessarily raise an error on failure to
  connect. The `!` only pertains to the potential for raising when the
  configuration is invalid.
  """
  @doc since: "0.1.0"
  @spec connect!(opts :: Keyword.t()) :: Socket.t()
  @spec connect!(socket :: Socket.t(), opts :: Keyword.t()) :: Socket.t()
  def connect!(socket \\ new_socket(), opts) do
    config = Slipstream.Configuration.validate!(opts)
    socket = TelemetryHelper.begin_connect(socket, config)

    route_command %Commands.OpenConnection{config: config, socket: socket}

    %Socket{socket | channel_config: config}
  end

  @doc """
  Request reconnection given the last-used connection configuration

  Note that when `reconnect/1` is used to re-connect instead of `connect/2`
  (or `connect!/2`), the slipstream process will attempt to reconnect with
  a retry mechanism with backoff. The process will wait an interval between
  reconnection attempts following the list of milliseconds provided in the
  `:reconnect_after_msec` key of configuration passed to `connect/2` (or
  `connect!/2`).

  The `c:handle_disconnect/2` callback will be invoked for each
  failure to re-connect, however, so an implementation of that callback which
  will simply retry with backoff can be achieved like so:

      @impl Slipstream
      def handle_disconnect(_reason, socket) do
        case reconnect(socket) do
          {:ok, socket} -> {:ok, socket}
          {:error, reason} -> {:stop, reason, socket}
        end
      end

  `reconnect/1` may return an `:error` tuple in the case that the socket passed
  does not contain any connection information (which is added to the socket
  with `connect/2` or `connect!/2`), or if the socket is currently connected.
  For a `reconnect/1` call without configuration, the return pattern is
  `{:error, :no_config}`, and for a socket that is already connected, the
  pattern is `{:error, :connected}`.

  A reconnect may be awaited with `await_connect/2`.
  """
  @doc since: "0.1.0"
  @spec reconnect(socket :: Socket.t()) ::
          {:ok, Socket.t()} | {:error, :no_config | :not_connected}
  def reconnect(socket) do
    with false <- Socket.connected?(socket),
         %Slipstream.Configuration{} = config <- socket.channel_config,
         {time, socket} <- Socket.next_reconnect_time(socket) do
      command = %Commands.OpenConnection{socket: socket, config: config}

      Process.send_after(
        socket.socket_pid,
        command(command),
        time
      )

      {:ok, socket}
    else
      nil -> {:error, :no_config}
      true -> {:error, :connected}
    end
  end

  @doc """
  Requests that a topic be joined in the current connection

  Multiple topics may be joined by one Slipstream client, but each topic
  may only be joined once. Despite this, `join/3` may be called on the same
  topic multiple times, but the result will be idempotent. The client will
  not request to join unless it has not yet joined that topic. In cases where
  you wish to begin a new session with a topic, you must first `leave/2` and
  then `join/3` again.

  The request to join will not error-out if the client is not connected to a
  remote server. In that case, the `join/3` function will act as a no-op.

  A join can be awaited in a blocking fashion with `await_join/3`.

  ## Examples

      @impl Slipstream
      def handle_connect(socket) do
        {:ok, join(socket, "rooms:lobby", %{user: 1})}
      end
  """
  @doc since: "0.1.0"
  @spec join(socket :: Socket.t(), topic :: String.t()) :: Socket.t()
  @spec join(
          socket :: Socket.t(),
          topic :: String.t(),
          params :: json_serializable()
        ) :: Socket.t()
  def join(%Socket{} = socket, topic, params \\ %{}) when is_binary(topic) do
    if Socket.connected?(socket) and
         Socket.join_status(socket, topic) in [nil, :closed] do
      socket = TelemetryHelper.begin_join(socket, topic, params)

      route_command %Commands.JoinTopic{
        socket: socket,
        topic: topic,
        payload: params
      }

      Socket.put_join_config(socket, topic, params)
    else
      socket
    end
  end

  @doc """
  Requests that the specified topic be joined again

  If `params` is not provided, the previously used value will be sent.

  In the case that the specified topic has not been joined before, `rejoin/3`
  will return `{:error, :never_joined}`.

  Note that a rejoin may be awaited with `await_join/3`.

  ## Dealing with crashes

  When attempting to re-join a disconnected topic with `rejoin/3`, the
  Slipstream process will attempt to use backoff governed by the
  `:rejoin_after_msec` list in configuration passed to `connect/2` (or
  `connect!/2`).

  The `c:handle_topic_close/3` callback will be invoked with
  the for each crash, however, so a minimal implementation of that callback
  which achieves the backoff retry is like so:

  ## Examples

      @impl Slipstream
      def handle_topic_close(topic, _reason, socket) do
        {:ok, _socket} = rejoin(socket, topic)
      end
  """
  @doc since: "0.1.0"
  @spec rejoin(socket :: Socket.t(), topic :: String.t()) ::
          {:ok, Socket.t()} | {:error, :never_joined}
  @spec rejoin(
          socket :: Socket.t(),
          topic :: String.t(),
          params :: json_serializable()
        ) :: {:ok, Socket.t()} | {:error, :never_joined}
  def rejoin(%Socket{} = socket, topic, params \\ nil) when is_binary(topic) do
    with {:ok, %{status: :closed} = prior_join} <-
           Map.fetch(socket.joins, topic),
         {time, socket} <- Socket.next_rejoin_time(socket, topic) do
      command = %Commands.JoinTopic{
        socket: socket,
        topic: topic,
        payload: params || prior_join.params
      }

      Process.send_after(socket.socket_pid, command(command), time)

      {:ok, socket}
    else
      {:ok, %{status: already_joined}}
      when already_joined in [:requested, :joined] ->
        {:ok, socket}

      :error ->
        {:error, :never_joined}
    end
  end

  @doc """
  Requests that the given topic be left

  Note that like joining, leaving is an asynchronous request and can be awaited
  with `await_leave/3`.

  Also similar to `join/3`, `leave/2` is idempotent and will not raise an error
  if the provided topic is not currently joined.

  ## Examples

      iex> {:ok, socket} = leave(socket, "room:lobby") |> await_leave("rooms:lobby")
      iex> join(socket, "rooms:specific")
  """
  @doc since: "0.1.0"
  @spec leave(socket :: Socket.t(), topic :: String.t()) :: Socket.t()
  def leave(%Socket{} = socket, topic) when is_binary(topic) do
    if Socket.joined?(socket, topic) do
      route_command %Commands.LeaveTopic{socket: socket, topic: topic}
    end

    socket
  end

  @doc """
  Requests that a message be pushed on the websocket connection

  A channel has the ability to reply directly to a message, but this reply
  is asynchronous. Handle replies using the `c:handle_reply/3`
  callback or by awaiting them synchronously with `await_reply/2`.

  Although this request to the remote server is asynchronous, the call to the
  transport process to transmit the push is synchronous and will exert
  back-pressure on calls to `push/4`, as `push/4` blocks until the message
  has been sent by the transport.

  If you are pushing especially large messages, you may need to adjust the
  `timeout` argument so that the GenServer call does not exit with `:timeout`.
  The default value is `5_000` msec (5 seconds).

  A phoenix channel may decide to reply to a message sent with `push/2`. In
  order to link push requests to their replies, store the `ref` string returned
  from the call to `push/4` and match on it in `c:handle_reply/3`.

  ## Examples

      @impl Slipstream
      def handle_join(:success, _response, state) do
        {:ok, hello_request} = push(socket, "rooms:lobby", "new:msg", %{user: 1, body: "hello"})

        {:ok, Map.put(state, :hello_request, hello_request)}
      end

      @impl Slipstream
      def handle_reply(ref, reply, %{hello_request: ref} = state) do
        IO.inspect(reply, label: "nice, a response.")

        {:ok, state}
      end
  """
  @doc since: "0.1.0"
  @spec push(
          socket :: Socket.t(),
          topic :: String.t(),
          event :: String.t(),
          params :: json_serializable() | {:binary, binary()},
          timeout :: timeout()
        ) :: {:ok, push_reference()} | {:error, reason :: term()}
  def push(
        %Socket{} = socket,
        topic,
        event,
        params,
        timeout \\ @default_timeout
      )
      when is_binary(topic) and is_binary(event) do
    command = %Commands.PushMessage{
      socket: socket,
      topic: topic,
      event: event,
      payload: params,
      timeout: timeout
    }

    with true <- Socket.joined?(socket, topic),
         {:ok, ref} <- route_command(command) do
      {:ok, ref}
    else
      false ->
        {:error, :not_joined}

      # e.g. if the genserver call fails by timeout
      # we're not gonna test that though. imagine how big the message would have
      # to be
      # coveralls-ignore-start
      other_return ->
        other_return

        # coveralls-ignore-stop
    end
  end

  @doc """
  Pushes, raising if the topic is not joined or if the channel is dead

  Same as `push/4`, but raises in cases of failure.

  This can be useful for pipeing into `await_reply/2`

  ## Examples

      iex> {:ok, result} = push!(socket, "rooms:lobby", "msg:new", params) |> await_reply()
      {:ok, %{"created?" => true}}
  """
  @doc since: "0.1.0"
  @spec push!(
          socket :: Socket.t(),
          topic :: String.t(),
          event :: String.t(),
          params :: json_serializable()
        ) :: push_reference()
  def push!(socket, topic, event, params) do
    case push(socket, topic, event, params) do
      {:ok, ref} ->
        ref

      # coveralls-ignore-start
      {:error, reason} ->
        raise "could not push!/4 message: #{inspect(reason)}"

        # coveralls-ignore-stop
    end
  end

  @doc """
  Requests that the connection process undergoe garbage collection

  If you're using Slipstream to send large messages, you may wish to flush
  the process memory of the connection process between large messages. This
  can be achieved through garbage collection.

  ## Examples

      iex> collect_garbage(socket)
      :ok
  """
  @doc since: "0.1.0"
  @spec collect_garbage(socket :: Socket.t()) :: :ok
  def collect_garbage(socket) do
    route_command %Commands.CollectGarbage{socket: socket}

    :ok
  end

  @doc """
  Requests that an open connection be closed

  This function will no-op when the socket is not currently connected to
  any remote websocket server.

  Note that you do not need to use `disconnect/1` to clean up a connection.
  The connection process monitors the slipstream client process and will shut
  down when it detects that the process has terminated.

  Disconnection may be awaited synchronously with `await_disconnect/2`

  ## Examples

      @impl Slipstream
      def handle_info(:chaos_monkey, socket) do
        {:ok, socket} =
          socket
          |> disconnect()
          |> await_disconnect()

        {:noreply, reconnect(socket)}
      end
  """
  @doc since: "0.1.0"
  @spec disconnect(socket :: Socket.t()) :: Socket.t()
  def disconnect(socket) do
    route_command %Commands.CloseConnection{socket: socket}

    socket
  end

  # --- await functionality

  @doc """
  Awaits a pending connection request synchronously
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_connect(socket :: Socket.t(), timeout()) ::
          {:ok, Socket.t()} | {:error, term()}
  def await_connect(socket, timeout \\ @default_timeout) do
    receive do
      event(%Events.ChannelConnected{} = event) ->
        {:ok, Socket.apply_event(socket, event)}

      event(%Events.ChannelConnectFailed{} = event) ->
        {:error, event.reason}
    after
      # coveralls-ignore-start
      timeout ->
        {:error, :timeout}
        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits a pending connection request synchronously, raising on failure
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_connect!(socket :: Socket.t(), timeout()) :: Socket.t()
  def await_connect!(socket, timeout \\ @default_timeout) do
    case await_connect(socket, timeout) do
      {:ok, socket} ->
        socket

      # coveralls-ignore-start
      {:error, reason} when is_atom(reason) ->
        exit(reason)

      {:error, reason} ->
        raise "Could not await connection: #{inspect(reason)}"

        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits a pending disconnection request synchronously
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_disconnect(socket :: Socket.t(), timeout()) ::
          {:ok, Socket.t()} | {:error, term()}
  def await_disconnect(socket, timeout \\ @default_timeout) do
    receive do
      event(%Events.ChannelClosed{} = event) ->
        {:ok, Socket.apply_event(socket, event)}
    after
      # coveralls-ignore-start
      timeout ->
        {:error, :timeout}
        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits a pending disconnection request synchronously, raising on failure
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_disconnect!(socket :: Socket.t(), timeout()) :: Socket.t()
  def await_disconnect!(socket, timeout \\ @default_timeout) do
    case await_disconnect(socket, timeout) do
      {:ok, socket} ->
        socket

      # coveralls-ignore-start
      {:error, reason} when is_atom(reason) ->
        exit(reason)

        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits a pending join request synchronously
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_join(socket :: Socket.t(), topic :: String.t(), timeout()) ::
          {:ok, Socket.t()} | {:error, term()}
  def await_join(socket, topic, timeout \\ @default_timeout)
      when is_binary(topic) do
    receive do
      event(%Events.TopicJoinSucceeded{topic: ^topic} = event) ->
        {:ok, Socket.apply_event(socket, event)}

      # coveralls-ignore-start
      event(%Events.TopicJoinFailed{topic: ^topic} = event) ->
        {:error, Events.TopicJoinFailed.to_reason(event)}
    after
      timeout ->
        {:error, :timeout}
        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits a join request synchronously, raising on failure
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_join!(socket :: Socket.t(), topic :: String.t(), timeout()) ::
          Socket.t()
  def await_join!(socket, topic, timeout \\ @default_timeout) do
    case await_join(socket, topic, timeout) do
      {:ok, socket} ->
        socket

      # coveralls-ignore-start
      {:error, reason} when is_atom(reason) ->
        exit(reason)

      {:error, reason} ->
        raise "Could not await join: #{inspect(reason)}"

        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits a leave request synchronously
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_leave(socket :: Socket.t(), topic :: String.t(), timeout()) ::
          {:ok, Socket.t()} | {:error, term()}
  def await_leave(socket, topic, timeout \\ @default_timeout)
      when is_binary(topic) do
    receive do
      event(%Events.TopicLeft{topic: ^topic} = event) ->
        {:ok, Socket.apply_event(socket, event)}
    after
      # coveralls-ignore-start
      timeout ->
        {:error, :timeout}
        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits a leave request synchronously, raising on failure
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_leave!(socket :: Socket.t(), topic :: String.t(), timeout()) ::
          Socket.t()
  def await_leave!(socket, topic, timeout \\ @default_timeout) do
    case await_leave(socket, topic, timeout) do
      {:ok, socket} ->
        socket

      # coveralls-ignore-start
      {:error, reason} when is_atom(reason) ->
        exit(reason)

        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits the server's message push

  Note that unlike the other `await_*` functions, `await_message/4` is a
  macro. This allows an author to match on patterns in the topic, event, and/or
  payload parts of a message.

  ## Examples

      iex> event = "msg:new"
      iex> await_message("rooms:lobby", ^event, %{"user_id" => 5})
      {:ok, "rooms:lobby", "msg:new", %{"user_id" => 5, body: "hello"}}
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_message(
          topic_expr :: Macro.t(),
          event_expr :: Macro.t(),
          payload_expr :: Macro.t(),
          timeout()
        ) ::
          {:ok, topic :: String.t(), event :: String.t(),
           payload :: json_serializable()}
          | {:error, :timeout}
  defmacro await_message(
             topic_expr,
             event_expr,
             payload_expr,
             timeout \\ @default_timeout
           ) do
    quote do
      receive do
        event(
          %Events.MessageReceived{
            topic: unquote(topic_expr),
            event: unquote(event_expr),
            payload: unquote(payload_expr)
          } = event
        ) ->
          {:ok, event.topic, event.event, event.payload}
      after
        unquote(timeout) -> {:error, :timeout}
      end
    end
  end

  @doc """
  Awaits the server's message push, raising on failure

  See `await_message/4`
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_message!(
          topic_expr :: Macro.t(),
          event_expr :: Macro.t(),
          payload_expr :: Macro.t(),
          timeout()
        ) ::
          {topic :: String.t(), event :: String.t(),
           payload :: json_serializable()}
  defmacro await_message!(
             topic_expr,
             event_expr,
             payload_expr,
             timeout \\ @default_timeout
           ) do
    quote do
      receive do
        event(
          %Events.MessageReceived{
            topic: unquote(topic_expr),
            event: unquote(event_expr),
            payload: unquote(payload_expr)
          } = event
        ) ->
          {event.topic, event.event, event.payload}
      after
        unquote(timeout) -> exit(:timeout)
      end
    end
  end

  @doc """
  Awaits the server's response to a message
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_reply(push_reference(), timeout()) :: reply() | {:error, :timeout}
  def await_reply(push_reference, timeout \\ @default_timeout)

  def await_reply(ref, timeout) do
    receive do
      event(%Events.ReplyReceived{ref: ^ref} = event) -> event.reply
    after
      # coveralls-ignore-start
      timeout ->
        {:error, :timeout}
        # coveralls-ignore-stop
    end
  end

  @doc """
  Awaits the server's response to a message, exiting on timeout

  See `await_reply/2` for more information.
  """
  @doc since: "0.1.0"
  @doc synchronicity: :synchronous
  @spec await_reply!(push_reference(), timeout()) :: reply()
  def await_reply!(push_reference, timeout \\ @default_timeout) do
    case await_reply(push_reference, timeout) do
      # coveralls-ignore-start
      {:error, :timeout} -> exit(:timeout)
      # coveralls-ignore-stop
      reply -> reply
    end
  end

  @doc """
  Declares that a module is a Slipstream socket client

  Slipstream provides a `use Slipstream` macro that behaves similar to
  GenServer's `use GenServer`. This does a few things:

  - a default implementation of `child_spec/1`, which is used to start the
    module as a GenServer. This may be overridden.
  - imports for all documented functions in `Slipstream` and `Slipstream.Socket`
  - a `c:GenServer.handle_info/2` function clause which matches incoming events
    from the connection process and dispatches them to the various Slipstream
    callbacks
  - a `c:GenServer.handle_info/2` function clause which matches Slipstream
    commands. This is used to implement back-off retry mechanisms for
    `reconnect/1` and `rejoin/3`.

  This provides a familiar and sleek interface for the common case of using
  Slipstream: an asynchronous callback-based GenServer module.

  It's not required to use this macro, though. Slipstream can be used in
  synchronous mode (via the `await_*` family of functions).

  ## Options

  Any options passed as the `opts` argument to `__using__/1` are passed along
  to `Supervisor.child_spec/2`, as is done in the default `child_spec/1`
  implementation for GenServer. Most notably, you may control the restart
  strategy of the client with the `:restart` option. See the example below.

  ## Examples

      defmodule MyApp.MySocketClient do
        # one crash/shutdown/exit will permanently terminate the server
        use Slipstream, restart: :temporary

        def start_link(args) do
          Slipstream.start_link(__MODULE__, args)
        end

        ..
      end
  """
  @doc since: "0.1.0"
  @spec __using__(Keyword.t()) :: Macro.t()
  defmacro __using__(opts) do
    quote location: :keep do
      def child_spec(init_arg) do
        %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [init_arg]}
        }
        |> Supervisor.child_spec(unquote(Macro.escape(opts)))
      end

      defoverridable child_spec: 1

      require Slipstream.Signatures

      import Slipstream

      import Slipstream.Socket,
        only: [
          assign: 2,
          assign: 3,
          channel_pid: 1,
          connected?: 1,
          join_status: 2,
          joined?: 2,
          update: 3
        ]

      @behaviour Slipstream

      @impl Slipstream
      def handle_info(Slipstream.Signatures.event(event), socket) do
        Slipstream.Callback.dispatch(__MODULE__, event, socket)
      end

      # this matches on time-delay commands like those emitted from
      # reconnect/1 and rejoin/3
      def handle_info(
            Slipstream.Signatures.command(
              %Slipstream.Commands.OpenConnection{} = cmd
            ),
            socket
          ) do
        socket = TelemetryHelper.begin_connect(socket, cmd.config)

        _ = Slipstream.CommandRouter.route_command(cmd)

        {:noreply, socket}
      end

      def handle_info(
            Slipstream.Signatures.command(
              %Slipstream.Commands.JoinTopic{} = cmd
            ),
            socket
          ) do
        socket = TelemetryHelper.begin_join(socket, cmd.topic, cmd.payload)

        _ = Slipstream.CommandRouter.route_command(cmd)

        {:noreply, socket}
      end
    end
  end
end