lib/phoenix/channel.ex

defmodule Phoenix.Channel do
  @moduledoc ~S"""
  Defines a Phoenix Channel.

  Channels provide a means for bidirectional communication from clients that
  integrate with the `Phoenix.PubSub` layer for soft-realtime functionality.

  For a conceptual overview, see the [Channels guide](channels.html).

  ## Topics & Callbacks

  Every time you join a channel, you need to choose which particular topic you
  want to listen to. The topic is just an identifier, but by convention it is
  often made of two parts: `"topic:subtopic"`. Using the `"topic:subtopic"`
  approach pairs nicely with the `Phoenix.Socket.channel/3` allowing you to
  match on all topics starting with a given prefix by using a splat (the `*`
  character) as the last character in the topic pattern:

      channel "room:*", MyAppWeb.RoomChannel

  Any topic coming into the router with the `"room:"` prefix would dispatch
  to `MyAppWeb.RoomChannel` in the above example. Topics can also be pattern
  matched in your channels' `join/3` callback to pluck out the scoped pattern:

      # handles the special `"lobby"` subtopic
      def join("room:lobby", _payload, socket) do
        {:ok, socket}
      end

      # handles any other subtopic as the room ID, for example `"room:12"`, `"room:34"`
      def join("room:" <> room_id, _payload, socket) do
        {:ok, socket}
      end

  ## Authorization

  Clients must join a channel to send and receive PubSub events on that channel.
  Your channels must implement a `join/3` callback that authorizes the socket
  for the given topic. For example, you could check if the user is allowed to
  join that particular room.

  To authorize a socket in `join/3`, return `{:ok, socket}`.
  To refuse authorization in `join/3`, return `{:error, reply}`.

  ## Incoming Events

  After a client has successfully joined a channel, incoming events from the
  client are routed through the channel's `handle_in/3` callbacks. Within these
  callbacks, you can perform any action. Incoming callbacks must return the
  `socket` to maintain ephemeral state.

  Typically you'll either forward a message to all listeners with
  `broadcast!/3` or reply directly to a client event for request/response style
  messaging.

  General message payloads are received as maps:

      def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
        ...
        {:reply, :ok, socket}
      end

  Binary data payloads are passed as a `{:binary, data}` tuple:

      def handle_in("file_chunk", {:binary, chunk}, socket) do
        ...
        {:reply, :ok, socket}
      end

  ## Broadcasts

  Here's an example of receiving an incoming `"new_msg"` event from one client,
  and broadcasting the message to all topic subscribers for this socket.

      def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
        broadcast!(socket, "new_msg", %{uid: uid, body: body})
        {:noreply, socket}
      end

  ## Replies

  Replies are useful for acknowledging a client's message or responding with
  the results of an operation. A reply is sent only to the client connected to
  the current channel process. Behind the scenes, they include the client
  message `ref`, which allows the client to correlate the reply it receives
  with the message it sent.

  For example, imagine creating a resource and replying with the created record:

      def handle_in("create:post", attrs, socket) do
        changeset = Post.changeset(%Post{}, attrs)

        if changeset.valid? do
          post = Repo.insert!(changeset)
          response = MyAppWeb.PostView.render("show.json", %{post: post})
          {:reply, {:ok, response}, socket}
        else
          response = MyAppWeb.ChangesetView.render("errors.json", %{changeset: changeset})
          {:reply, {:error, response}, socket}
        end
      end

  Or you may just want to confirm that the operation succeeded:

      def handle_in("create:post", attrs, socket) do
        changeset = Post.changeset(%Post{}, attrs)

        if changeset.valid? do
          Repo.insert!(changeset)
          {:reply, :ok, socket}
        else
          {:reply, :error, socket}
        end
      end

  Binary data is also supported with replies via a `{:binary, data}` tuple:

      {:reply, {:ok, {:binary, bin}}, socket}

  If you don't want to send a reply to the client, you can return:

      {:noreply, socket}

  One situation when you might do this is if you need to reply later; see
  `reply/2`.

  ## Pushes

  Calling `push/3` allows you to send a message to the client which is not a
  reply to a specific client message. Because it is not a reply, a pushed
  message does not contain a client message `ref`; there is no prior client
  message to relate it to.

  Possible use cases include notifying a client that:
  - You've auto-saved the user's document
  - The user's game is ending soon
  - The IoT device's settings should be updated

  For example, you could `push/3` a message to the client in `handle_info/3`
  after receiving a `PubSub` message relevant to them.

      alias Phoenix.Socket.Broadcast
      def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
        push(socket, event, payload)
        {:noreply, socket}
      end

  Push data can be given in the form of a map or a tagged `{:binary, data}`
  tuple:

      # client asks for their current rank. reply contains it, and client
      # is also pushed a leader board and a badge image
      def handle_in("current_rank", _, socket) do
        push(socket, "leaders", %{leaders: Game.get_leaders(socket.assigns.game_id)})
        push(socket, "badge", {:binary, File.read!(socket.assigns.badge_path)})
        {:reply, %{val: Game.get_rank(socket.assigns[:user])}, socket}
      end

  Note that in this example, `push/3` is called from `handle_in/3`; in this way
  you can essentially reply N times to a single message from the client. See
  `reply/2` for why this may be desirable.

  ## Intercepting Outgoing Events

  When an event is broadcasted with `broadcast/3`, each channel subscriber can
  choose to intercept the event and have their `handle_out/3` callback triggered.
  This allows the event's payload to be customized on a socket by socket basis
  to append extra information, or conditionally filter the message from being
  delivered. If the event is not intercepted with `Phoenix.Channel.intercept/1`,
  then the message is pushed directly to the client:

      intercept ["new_msg", "user_joined"]

      # for every socket subscribing to this topic, append an `is_editable`
      # value for client metadata.
      def handle_out("new_msg", msg, socket) do
        push(socket, "new_msg", Map.merge(msg,
          %{is_editable: User.can_edit_message?(socket.assigns[:user], msg)}
        ))
        {:noreply, socket}
      end

      # do not send broadcasted `"user_joined"` events if this socket's user
      # is ignoring the user who joined.
      def handle_out("user_joined", msg, socket) do
        unless User.ignoring?(socket.assigns[:user], msg.user_id) do
          push(socket, "user_joined", msg)
        end
        {:noreply, socket}
      end

  ## Broadcasting to an external topic

  In some cases, you will want to broadcast messages without the context of
  a `socket`. This could be for broadcasting from within your channel to an
  external topic, or broadcasting from elsewhere in your application like a
  controller or another process. Such can be done via your endpoint:

      # within channel
      def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
        ...
        broadcast_from!(socket, "new_msg", %{uid: uid, body: body})
        MyAppWeb.Endpoint.broadcast_from!(self(), "room:superadmin",
          "new_msg", %{uid: uid, body: body})
        {:noreply, socket}
      end

      # within controller
      def create(conn, params) do
        ...
        MyAppWeb.Endpoint.broadcast!("room:" <> rid, "new_msg", %{uid: uid, body: body})
        MyAppWeb.Endpoint.broadcast!("room:superadmin", "new_msg", %{uid: uid, body: body})
        redirect(conn, to: "/")
      end

  ## Terminate

  On termination, the channel callback `terminate/2` will be invoked with
  the error reason and the socket.

  If we are terminating because the client left, the reason will be
  `{:shutdown, :left}`. Similarly, if we are terminating because the
  client connection was closed, the reason will be `{:shutdown, :closed}`.

  If any of the callbacks return a `:stop` tuple, it will also
  trigger terminate with the reason given in the tuple.

  `terminate/2`, however, won't be invoked in case of errors nor in
  case of exits. This is the same behaviour as you find in Elixir
  abstractions like `GenServer` and others. Similar to `GenServer`,
  it would also be possible to `:trap_exit` to guarantee that `terminate/2`
  is invoked. This practice is not encouraged though.

  Generally speaking, if you want to clean something up, it is better to
  monitor your channel process and do the clean up from another process.
  All channel callbacks, including `join/3`, are called from within the
  channel process. Therefore, `self()` in any of them returns the PID to
  be monitored.

  ## Exit reasons when stopping a channel

  When the channel callbacks return a `:stop` tuple, such as:

      {:stop, :shutdown, socket}
      {:stop, {:error, :enoent}, socket}

  the second argument is the exit reason, which follows the same behaviour as
  standard `GenServer` exits.

  You have three options to choose from when shutting down a channel:

    * `:normal` - in such cases, the exit won't be logged and linked processes
      do not exit

    * `:shutdown` or `{:shutdown, term}` - in such cases, the exit won't be
      logged and linked processes exit with the same reason unless they're
      trapping exits

    * any other term - in such cases, the exit will be logged and linked
      processes exit with the same reason unless they're trapping exits

  ## Subscribing to external topics

  Sometimes you may need to programmatically subscribe a socket to external
  topics in addition to the internal `socket.topic`. For example,
  imagine you have a bidding system where a remote client dynamically sets
  preferences on products they want to receive bidding notifications on.
  Instead of requiring a unique channel process and topic per
  preference, a more efficient and simple approach would be to subscribe a
  single channel to relevant notifications via your endpoint. For example:

      defmodule MyAppWeb.Endpoint.NotificationChannel do
        use Phoenix.Channel

        def join("notification:" <> user_id, %{"ids" => ids}, socket) do
          topics = for product_id <- ids, do: "product:#{product_id}"

          {:ok, socket
                |> assign(:topics, [])
                |> put_new_topics(topics)}
        end

        def handle_in("watch", %{"product_id" => id}, socket) do
          {:reply, :ok, put_new_topics(socket, ["product:#{id}"])}
        end

        def handle_in("unwatch", %{"product_id" => id}, socket) do
          {:reply, :ok, MyAppWeb.Endpoint.unsubscribe("product:#{id}")}
        end

        defp put_new_topics(socket, topics) do
          Enum.reduce(topics, socket, fn topic, acc ->
            topics = acc.assigns.topics
            if topic in topics do
              acc
            else
              :ok = MyAppWeb.Endpoint.subscribe(topic)
              assign(acc, :topics, [topic | topics])
            end
          end)
        end
      end

  Note: the caller must be responsible for preventing duplicate subscriptions.
  After calling `subscribe/1` from your endpoint, the same flow applies to
  handling regular Elixir messages within your channel. Most often, you'll
  simply relay the `%Phoenix.Socket.Broadcast{}` event and payload:

      alias Phoenix.Socket.Broadcast
      def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
        push(socket, event, payload)
        {:noreply, socket}
      end

  ## Hibernation

  From Erlang/OTP 20, channels automatically hibernate to save memory
  after 15_000 milliseconds of inactivity. This can be customized by
  passing the `:hibernate_after` option to `use Phoenix.Channel`:

      use Phoenix.Channel, hibernate_after: 60_000

  You can also set it to `:infinity` to fully disable it.

  ## Shutdown

  You can configure the shutdown behavior of each channel used when your
  application is shutting down by setting the `:shutdown` value on use:

      use Phoenix.Channel, shutdown: 5_000

  It defaults to 5_000. The supported values are described under the
  in the `Supervisor` module docs.

  ## Logging

  By default, channel `"join"` and `"handle_in"` events are logged, using
  the level `:info` and `:debug`, respectively. Logs can be customized per
  event type or disabled by setting the `:log_join` and `:log_handle_in`
  options when using `Phoenix.Channel`. For example, the following
  configuration logs join events as `:info`, but disables logging for
  incoming events:

      use Phoenix.Channel, log_join: :info, log_handle_in: false

  """
  alias Phoenix.Socket
  alias Phoenix.Channel.Server

  @type payload :: map | {:binary, binary}
  @type reply :: status :: atom | {status :: atom, response :: payload}
  @type socket_ref ::
          {transport_pid :: Pid, serializer :: module, topic :: binary, ref :: binary,
           join_ref :: binary}

  @doc """
  Handle channel joins by `topic`.

  To authorize a socket, return `{:ok, socket}` or `{:ok, reply, socket}`. To
  refuse authorization, return `{:error, reason}`.

  ## Example

      def join("room:lobby", payload, socket) do
        if authorized?(payload) do
          {:ok, socket}
        else
          {:error, %{reason: "unauthorized"}}
        end
      end

  """
  @callback join(topic :: binary, payload :: payload, socket :: Socket.t()) ::
              {:ok, Socket.t()}
              | {:ok, reply :: payload, Socket.t()}
              | {:error, reason :: map}

  @doc """
  Handle incoming `event`s.

  ## Example

      def handle_in("ping", payload, socket) do
        {:reply, {:ok, payload}, socket}
      end

  """
  @callback handle_in(event :: String.t(), payload :: payload, socket :: Socket.t()) ::
              {:noreply, Socket.t()}
              | {:noreply, Socket.t(), timeout | :hibernate}
              | {:reply, reply, Socket.t()}
              | {:stop, reason :: term, Socket.t()}
              | {:stop, reason :: term, reply, Socket.t()}

  @doc """
  Intercepts outgoing `event`s.

  See `intercept/1`.
  """
  @callback handle_out(event :: String.t(), payload :: payload, socket :: Socket.t()) ::
              {:noreply, Socket.t()}
              | {:noreply, Socket.t(), timeout | :hibernate}
              | {:stop, reason :: term, Socket.t()}

  @doc """
  Handle regular Elixir process messages.

  See `c:GenServer.handle_info/2`.
  """
  @callback handle_info(msg :: term, socket :: Socket.t()) ::
              {:noreply, Socket.t()}
              | {:stop, reason :: term, Socket.t()}

  @doc """
  Handle regular GenServer call messages.

  See `c:GenServer.handle_call/3`.
  """
  @callback handle_call(msg :: term, from :: {pid, tag :: term}, socket :: Socket.t()) ::
              {:reply, response :: term, Socket.t()}
              | {:noreply, Socket.t()}
              | {:stop, reason :: term, Socket.t()}

  @doc """
  Handle regular GenServer cast messages.

  See `c:GenServer.handle_cast/2`.
  """
  @callback handle_cast(msg :: term, socket :: Socket.t()) ::
              {:noreply, Socket.t()}
              | {:stop, reason :: term, Socket.t()}

  @doc false
  @callback code_change(old_vsn, Socket.t(), extra :: term) ::
              {:ok, Socket.t()}
              | {:error, reason :: term}
            when old_vsn: term | {:down, term}

  @doc """
  Invoked when the channel process is about to exit.

  See `c:GenServer.terminate/2`.
  """
  @callback terminate(
              reason :: :normal | :shutdown | {:shutdown, :left | :closed | term},
              Socket.t()
            ) ::
              term

  @optional_callbacks handle_in: 3,
                      handle_out: 3,
                      handle_info: 2,
                      handle_call: 3,
                      handle_cast: 2,
                      code_change: 3,
                      terminate: 2

  defmacro __using__(opts \\ []) do
    quote do
      opts = unquote(opts)
      @behaviour unquote(__MODULE__)
      @on_definition unquote(__MODULE__)
      @before_compile unquote(__MODULE__)
      @phoenix_intercepts []
      @phoenix_log_join Keyword.get(opts, :log_join, :info)
      @phoenix_log_handle_in Keyword.get(opts, :log_handle_in, :debug)
      @phoenix_hibernate_after Keyword.get(opts, :hibernate_after, 15_000)
      @phoenix_shutdown Keyword.get(opts, :shutdown, 5000)

      import unquote(__MODULE__)
      import Phoenix.Socket, only: [assign: 3, assign: 2]

      def child_spec(init_arg) do
        %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [init_arg]},
          shutdown: @phoenix_shutdown,
          restart: :temporary
        }
      end

      def start_link(triplet) do
        GenServer.start_link(Phoenix.Channel.Server, triplet,
          hibernate_after: @phoenix_hibernate_after
        )
      end

      def __socket__(:private) do
        %{log_join: @phoenix_log_join, log_handle_in: @phoenix_log_handle_in}
      end
    end
  end

  defmacro __before_compile__(_) do
    quote do
      def __intercepts__, do: @phoenix_intercepts
    end
  end

  @doc """
  Defines which Channel events to intercept for `handle_out/3` callbacks.

  By default, broadcasted events are pushed directly to the client, but
  intercepting events gives your channel a chance to customize the event
  for the client to append extra information or filter the message from being
  delivered.

  *Note*: intercepting events can introduce significantly more overhead if a
  large number of subscribers must customize a message since the broadcast will
  be encoded N times instead of a single shared encoding across all subscribers.

  ## Examples

      intercept ["new_msg"]

      def handle_out("new_msg", payload, socket) do
        push(socket, "new_msg", Map.merge(payload,
          is_editable: User.can_edit_message?(socket.assigns[:user], payload)
        ))
        {:noreply, socket}
      end

  `handle_out/3` callbacks must return one of:

      {:noreply, Socket.t} |
      {:noreply, Socket.t, timeout | :hibernate} |
      {:stop, reason :: term, Socket.t}

  """
  defmacro intercept(events) do
    quote do
      @phoenix_intercepts unquote(events)
    end
  end

  @doc false
  def __on_definition__(env, :def, :handle_out, [event, _payload, _socket], _, _)
      when is_binary(event) do
    unless event in Module.get_attribute(env.module, :phoenix_intercepts) do
      IO.write(
        "#{Path.relative_to(env.file, File.cwd!())}:#{env.line}: [warning] " <>
          "An intercept for event \"#{event}\" has not yet been defined in #{env.module}.handle_out/3. " <>
          "Add \"#{event}\" to your list of intercepted events with intercept/1"
      )
    end
  end

  def __on_definition__(_env, _kind, _name, _args, _guards, _body) do
    :ok
  end

  @doc """
  Broadcast an event to all subscribers of the socket topic.

  The event's message must be a serializable map or a tagged `{:binary, data}`
  tuple where `data` is binary data.

  ## Examples

      iex> broadcast(socket, "new_message", %{id: 1, content: "hello"})
      :ok

      iex> broadcast(socket, "new_message", {:binary, "hello"})
      :ok

  """
  def broadcast(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
    Server.broadcast(pubsub_server, topic, event, message)
  end

  @doc """
  Same as `broadcast/3`, but raises if broadcast fails.
  """
  def broadcast!(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
    Server.broadcast!(pubsub_server, topic, event, message)
  end

  @doc """
  Broadcast event from pid to all subscribers of the socket topic.

  The channel that owns the socket will not receive the published
  message. The event's message must be a serializable map or a tagged
  `{:binary, data}` tuple where `data` is binary data.

  ## Examples

      iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
      :ok

      iex> broadcast_from(socket, "new_message", {:binary, "hello"})
      :ok

  """
  def broadcast_from(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic, channel_pid: channel_pid} =
      assert_joined!(socket)

    Server.broadcast_from(pubsub_server, channel_pid, topic, event, message)
  end

  @doc """
  Same as `broadcast_from/3`, but raises if broadcast fails.
  """
  def broadcast_from!(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic, channel_pid: channel_pid} =
      assert_joined!(socket)

    Server.broadcast_from!(pubsub_server, channel_pid, topic, event, message)
  end

  @doc """
  Sends an event directly to the connected client without requiring a prior
  message from the client.

  The event's message must be a serializable map or a tagged `{:binary, data}`
  tuple where `data` is binary data.

  Note that unlike some in client libraries, this server-side `push/3` does not
  return a reference. If you need to get a reply from the client and to
  correlate that reply with the message you pushed, you'll need to include a
  unique identifier in the message, track it in the Channel's state, have the
  client include it in its reply, and examine the ref when the reply comes to
  `handle_in/3`.

  ## Examples

      iex> push(socket, "new_message", %{id: 1, content: "hello"})
      :ok

      iex> push(socket, "new_message", {:binary, "hello"})
      :ok

  """
  def push(socket, event, message) do
    %{transport_pid: transport_pid, topic: topic} = assert_joined!(socket)
    Server.push(transport_pid, socket.join_ref, topic, event, message, socket.serializer)
  end

  @doc """
  Replies asynchronously to a socket push.

  The usual way of replying to a client's message is to return a tuple from `handle_in/3`
  like:

      {:reply, {status, payload}, socket}

  But sometimes you need to reply to a push asynchronously - that is, after
  your `handle_in/3` callback completes. For example, you might need to perform
  work in another process and reply when it's finished.

  You can do this by generating a reference to the socket with `socket_ref/1`
  and calling `reply/2` with that ref when you're ready to reply.

  *Note*: A `socket_ref` is required so the `socket` itself is not leaked
  outside the channel. The `socket` holds information such as assigns and
  transport configuration, so it's important to not copy this information
  outside of the channel that owns it.

  Technically, `reply/2` will allow you to reply multiple times to the same
  client message, and each reply will include the client message `ref`. But the
  client may expect only one reply; in that case, `push/3` would be preferable
  for the additional messages.

  ## Examples

      def handle_in("work", payload, socket) do
        Worker.perform(payload, socket_ref(socket))
        {:noreply, socket}
      end

      def handle_info({:work_complete, result, ref}, socket) do
        reply(ref, {:ok, result})
        {:noreply, socket}
      end

  """
  @spec reply(socket_ref, reply) :: :ok
  def reply(socket_ref, status) when is_atom(status) do
    reply(socket_ref, {status, %{}})
  end

  def reply({transport_pid, serializer, topic, ref, join_ref}, {status, payload}) do
    Server.reply(transport_pid, join_ref, ref, topic, {status, payload}, serializer)
  end

  @doc """
  Generates a `socket_ref` for an async reply.

  See `reply/2` for example usage.
  """
  @spec socket_ref(Socket.t()) :: socket_ref
  def socket_ref(%Socket{joined: true, ref: ref} = socket) when not is_nil(ref) do
    {socket.transport_pid, socket.serializer, socket.topic, ref, socket.join_ref}
  end

  def socket_ref(_socket) do
    raise ArgumentError, """
    socket refs can only be generated for a socket that has joined with a push ref
    """
  end

  defp assert_joined!(%Socket{joined: true} = socket) do
    socket
  end

  defp assert_joined!(%Socket{joined: false}) do
    raise """
    push/3, reply/2, and broadcast/3 can only be called after the socket has finished joining.
    To push a message on join, send to self and handle in handle_info/2. For example:

        def join(topic, auth_msg, socket) do
          ...
          send(self, :after_join)
          {:ok, socket}
        end

        def handle_info(:after_join, socket) do
          push(socket, "feed", %{list: feed_items(socket)})
          {:noreply, socket}
        end

    """
  end
end