defmodule Phoenix.Presence do
@moduledoc """
Provides Presence tracking to processes and channels.
This behaviour provides presence features such as fetching
presences for a given topic, as well as handling diffs of
join and leave events as they occur in real-time. Using this
module defines a supervisor and a module that implements the
`Phoenix.Tracker` behaviour that uses `Phoenix.PubSub` to
broadcast presence updates.
In case you want to use only a subset of the functionality
provided by `Phoenix.Presence`, such as tracking processes
but without broadcasting updates, we recommend that you look
at the `Phoenix.Tracker` functionality from the `phoenix_pubsub`
project.
## Example Usage
Start by defining a presence module within your application
which uses `Phoenix.Presence` and provide the `:otp_app` which
holds your configuration, as well as the `:pubsub_server`.
defmodule MyAppWeb.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
end
The `:pubsub_server` must point to an existing pubsub server
running in your application, which is included by default as
`MyApp.PubSub` for new applications.
Next, add the new supervisor to your supervision tree in
`lib/my_app/application.ex`. It must be after the PubSub child
and before the endpoint:
children = [
...
{Phoenix.PubSub, name: MyApp.PubSub},
MyAppWeb.Presence,
MyAppWeb.Endpoint
]
Once added, presences can be tracked in your channel after joining:
defmodule MyAppWeb.MyChannel do
use MyAppWeb, :channel
alias MyAppWeb.Presence
def join("some:topic", _params, socket) do
send(self(), :after_join)
{:ok, assign(socket, :user_id, ...)}
end
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second))
})
push(socket, "presence_state", Presence.list(socket))
{:noreply, socket}
end
end
In the example above, `Presence.track` is used to register this channel's process as a
presence for the socket's user ID, with a map of metadata.
Next, the current presence information for
the socket's topic is pushed to the client as a `"presence_state"` event.
Finally, a diff of presence join and leave events will be sent to the
client as they happen in real-time with the "presence_diff" event.
The diff structure will be a map of `:joins` and `:leaves` of the form:
%{
joins: %{"123" => %{metas: [%{status: "away", phx_ref: ...}]}},
leaves: %{"456" => %{metas: [%{status: "online", phx_ref: ...}]}}
},
See `c:list/1` for more information on the presence data structure.
## Fetching Presence Information
Presence metadata should be minimized and used to store small,
ephemeral state, such as a user's "online" or "away" status.
More detailed information, such as user details that need to be fetched
from the database, can be achieved by overriding the `c:fetch/2` function.
The `c:fetch/2` callback is triggered when using `c:list/1` and on
every update, and it serves as a mechanism to fetch presence information
a single time, before broadcasting the information to all channel subscribers.
This prevents N query problems and gives you a single place to group
isolated data fetching to extend presence metadata.
The function must return a map of data matching the outlined Presence
data structure, including the `:metas` key, but can extend the map of
information to include any additional information. For example:
def fetch(_topic, presences) do
users = presences |> Map.keys() |> Accounts.get_users_map()
for {key, %{metas: metas}} <- presences, into: %{} do
{key, %{metas: metas, user: users[String.to_integer(key)]}}
end
end
Where `Account.get_users_map/1` could be implemented like:
def get_users_map(ids) do
query =
from u in User,
where: u.id in ^ids,
select: {u.id, u}
query |> Repo.all() |> Enum.into(%{})
end
The `fetch/2` function above fetches all users from the database who
have registered presences for the given topic. The presences
information is then extended with a `:user` key of the user's
information, while maintaining the required `:metas` field from the
original presence data.
## Using Elixir as a Presence Client
Presence is great for external clients, such as JavaScript applications, but
it can also be used from an Elixir client process to keep track of presence
changes as they happen on the server. This can be accomplished by implementing
the optional [`init/1`](`c:init/1`) and [`handle_metas/4`](`c:handle_metas/4`)
callbacks on your presence module. For example, the following callback
receives presence metadata changes, and broadcasts to other Elixir processes
about users joining and leaving:
defmodule MyApp.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
def init(_opts) do
{:ok, %{}} # user-land state
end
def handle_metas(topic, %{joins: joins, leaves: leaves}, presences, state) do
# fetch existing presence information for the joined users and broadcast the
# event to all subscribers
for {user_id, presence} <- joins do
user_data = %{user: presence.user, metas: Map.fetch!(presences, user_id)}
msg = {MyApp.PresenceClient, {:join, user_data}}
Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
end
# fetch existing presence information for the left users and broadcast the
# event to all subscribers
for {user_id, presence} <- leaves do
metas =
case Map.fetch(presences, user_id) do
{:ok, presence_metas} -> presence_metas
:error -> []
end
user_data = %{user: presence.user, metas: metas}
msg = {MyApp.PresenceClient, {:leave, user_data}}
Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
end
{:ok, state}
end
end
The `handle_metas/4` callback receives the topic, presence diff, current presences
for the topic with their metadata, and any user-land state accumulated from init and
subsequent `handle_metas/4` calls. In our example implementation, we walk the `:joins` and
`:leaves` in the diff, and populate a complete presence from our known presence information.
Then we broadcast to the local node subscribers about user joins and leaves.
## Testing with Presence
Every time the `fetch` callback is invoked, it is done from a separate
process. Given those processes run asynchronously, it is often necessary
to guarantee they have been shutdown at the end of every test. This can
be done by using ExUnit's `on_exit` hook plus `fetchers_pids` function:
on_exit(fn ->
for pid <- MyAppWeb.Presence.fetchers_pids() do
ref = Process.monitor(pid)
assert_receive {:DOWN, ^ref, _, _, _}, 1000
end
end)
"""
@type presences :: %{String.t() => %{metas: [map()]}}
@type presence :: %{key: String.t(), meta: map()}
@type topic :: String.t()
@doc """
Track a channel's process as a presence.
Tracked presences are grouped by `key`, cast as a string. For example, to
group each user's channels together, use user IDs as keys. Each presence can
be associated with a map of metadata to store small, ephemeral state, such as
a user's online status. To store detailed information, see `c:fetch/2`.
## Example
alias MyApp.Presence
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second))
})
{:noreply, socket}
end
"""
@callback track(socket :: Phoenix.Socket.t(), key :: String.t(), meta :: map()) ::
{:ok, ref :: binary()}
| {:error, reason :: term()}
@doc """
Track an arbitrary process as a presence.
Same with `track/3`, except track any process by `topic` and `key`.
"""
@callback track(pid, topic, key :: String.t(), meta :: map()) ::
{:ok, ref :: binary()}
| {:error, reason :: term()}
@doc """
Stop tracking a channel's process.
"""
@callback untrack(socket :: Phoenix.Socket.t(), key :: String.t()) :: :ok
@doc """
Stop tracking a process.
"""
@callback untrack(pid, topic, key :: String.t()) :: :ok
@doc """
Update a channel presence's metadata.
Replace a presence's metadata by passing a new map or a function that takes
the current map and returns a new one.
"""
@callback update(
socket :: Phoenix.Socket.t(),
key :: String.t(),
meta :: map() | (map() -> map())
) ::
{:ok, ref :: binary()}
| {:error, reason :: term()}
@doc """
Update a process presence's metadata.
Same as `update/3`, but with an arbitrary process.
"""
@callback update(pid, topic, key :: String.t(), meta :: map() | (map() -> map())) ::
{:ok, ref :: binary()}
| {:error, reason :: term()}
@doc """
Returns presences for a socket/topic.
## Presence data structure
The presence information is returned as a map with presences grouped
by key, cast as a string, and accumulated metadata, with the following form:
%{key => %{metas: [%{phx_ref: ..., ...}, ...]}}
For example, imagine a user with id `123` online from two
different devices, as well as a user with id `456` online from
just one device. The following presence information might be returned:
%{"123" => %{metas: [%{status: "away", phx_ref: ...},
%{status: "online", phx_ref: ...}]},
"456" => %{metas: [%{status: "online", phx_ref: ...}]}}
The keys of the map will usually point to a resource ID. The value
will contain a map with a `:metas` key containing a list of metadata
for each resource. Additionally, every metadata entry will contain a
`:phx_ref` key which can be used to uniquely identify metadata for a
given key. In the event that the metadata was previously updated,
a `:phx_ref_prev` key will be present containing the previous
`:phx_ref` value.
"""
@callback list(Phoenix.Socket.t() | topic) :: presences
@doc """
Returns the map of presence metadata for a socket/topic-key pair.
## Examples
Uses the same data format as each presence in `c:list/1`, but only
returns metadata for the presences under a topic and key pair. For example,
a user with key `"user1"`, connected to the same chat room `"room:1"` from two
devices, could return:
iex> MyPresence.get_by_key("room:1", "user1")
[%{name: "User 1", metas: [%{device: "Desktop"}, %{device: "Mobile"}]}]
Like `c:list/1`, the presence metadata is passed to the `fetch`
callback of your presence module to fetch any additional information.
"""
@callback get_by_key(Phoenix.Socket.t() | topic, key :: String.t()) :: [presence]
@doc """
Extend presence information with additional data.
When `c:list/1` is used to list all presences of the given `topic`, this
callback is triggered once to modify the result before it is broadcasted to
all channel subscribers. This avoids N query problems and provides a single
place to extend presence metadata. You must return a map of data matching the
original result, including the `:metas` key, but can extend the map to include
any additional information.
The default implementation simply passes `presences` through unchanged.
## Example
def fetch(_topic, presences) do
query =
from u in User,
where: u.id in ^Map.keys(presences),
select: {u.id, u}
users = query |> Repo.all() |> Enum.into(%{})
for {key, %{metas: metas}} <- presences, into: %{} do
{key, %{metas: metas, user: users[key]}}
end
end
"""
@callback fetch(topic, presences) :: presences
@doc """
Initializes the presence client state.
Invoked when your presence module starts, allows dynamically
providing initial state for handling presence metadata.
"""
@callback init(state :: term) :: {:ok, new_state :: term}
@doc """
Receives presence metadata changes.
"""
@callback handle_metas(topic :: String.t(), diff :: map(), presences :: map(), state :: term) ::
{:ok, term}
@optional_callbacks init: 1, handle_metas: 4
defmacro __using__(opts) do
quote location: :keep, bind_quoted: [opts: opts] do
@behaviour Phoenix.Presence
@opts opts
@task_supervisor Module.concat(__MODULE__, "TaskSupervisor")
_ = opts[:otp_app] || raise "use Phoenix.Presence expects :otp_app to be given"
# User defined
def fetch(_topic, presences), do: presences
defoverridable fetch: 2
# Private
def child_spec(opts) do
opts = Keyword.merge(@opts, opts)
%{
id: __MODULE__,
start: {Phoenix.Presence, :start_link, [__MODULE__, @task_supervisor, opts]},
type: :supervisor
}
end
# API
def track(%Phoenix.Socket{} = socket, key, meta) do
track(socket.channel_pid, socket.topic, key, meta)
end
def track(pid, topic, key, meta) do
Phoenix.Tracker.track(__MODULE__, pid, topic, key, meta)
end
def untrack(%Phoenix.Socket{} = socket, key) do
untrack(socket.channel_pid, socket.topic, key)
end
def untrack(pid, topic, key) do
Phoenix.Tracker.untrack(__MODULE__, pid, topic, key)
end
def update(%Phoenix.Socket{} = socket, key, meta) do
update(socket.channel_pid, socket.topic, key, meta)
end
def update(pid, topic, key, meta) do
Phoenix.Tracker.update(__MODULE__, pid, topic, key, meta)
end
def list(%Phoenix.Socket{topic: topic}), do: list(topic)
def list(topic), do: Phoenix.Presence.list(__MODULE__, topic)
def get_by_key(%Phoenix.Socket{topic: topic}, key), do: get_by_key(topic, key)
def get_by_key(topic, key), do: Phoenix.Presence.get_by_key(__MODULE__, topic, key)
def fetchers_pids(), do: Task.Supervisor.children(@task_supervisor)
end
end
defmodule Tracker do
@moduledoc false
use Phoenix.Tracker
def start_link({module, task_supervisor, opts}) do
pubsub_server =
opts[:pubsub_server] || raise "use Phoenix.Presence expects :pubsub_server to be given"
Phoenix.Tracker.start_link(
__MODULE__,
{module, task_supervisor, pubsub_server},
opts
)
end
def init(state), do: Phoenix.Presence.init(state)
def handle_diff(diff, state), do: Phoenix.Presence.handle_diff(diff, state)
def handle_info(msg, state),
do: Phoenix.Presence.handle_info(msg, state)
end
@doc false
def start_link(module, task_supervisor, opts) do
otp_app = opts[:otp_app]
opts =
opts
|> Keyword.merge(Application.get_env(otp_app, module, []))
|> Keyword.put(:name, module)
children = [
{Task.Supervisor, name: task_supervisor},
{Tracker, {module, task_supervisor, opts}}
]
sup_opts = [
strategy: :rest_for_one,
name: Module.concat(module, "Supervisor")
]
Supervisor.start_link(children, sup_opts)
end
@doc false
def init({module, task_supervisor, pubsub_server}) do
state = %{
module: module,
task_supervisor: task_supervisor,
pubsub_server: pubsub_server,
topics: %{},
tasks: :queue.new(),
current_task: nil,
client_state: nil
}
client_state =
if function_exported?(module, :handle_metas, 4) do
unless function_exported?(module, :init, 1) do
raise ArgumentError, """
missing #{inspect(module)}.init/1 callback for client state
When you implement the handle_metas/4 callback, you must also
implement init/1. For example, add the following to
#{inspect(module)}:
def init(_opts), do: {:ok, %{}}
"""
end
case module.init(%{}) do
{:ok, client_state} ->
client_state
other ->
raise ArgumentError, """
expected #{inspect(module)}.init/1 to return {:ok, state}, got: #{inspect(other)}
"""
end
end
{:ok, %{state | client_state: client_state}}
end
@doc false
def handle_diff(diff, state) do
{:ok, async_merge(state, diff)}
end
@doc false
def handle_info({task_ref, {:phoenix, ref, computed_diffs}}, state) do
%{current_task: current_task} = state
{^ref, %Task{ref: ^task_ref} = task} = current_task
Task.shutdown(task)
Enum.each(computed_diffs, fn {topic, presence_diff} ->
Phoenix.Channel.Server.local_broadcast(
state.pubsub_server,
topic,
"presence_diff",
presence_diff
)
end)
new_state =
if function_exported?(state.module, :handle_metas, 4) do
do_handle_metas(state, computed_diffs)
else
state
end
{:noreply, next_task(new_state)}
end
@doc false
def list(module, topic) do
grouped =
module
|> Phoenix.Tracker.list(topic)
|> group()
module.fetch(topic, grouped)
end
@doc false
def get_by_key(module, topic, key) do
string_key = to_string(key)
case Phoenix.Tracker.get_by_key(module, topic, key) do
[] ->
[]
[_ | _] = pid_metas ->
metas = Enum.map(pid_metas, fn {_pid, meta} -> meta end)
%{^string_key => fetched_metas} = module.fetch(topic, %{string_key => %{metas: metas}})
fetched_metas
end
end
@doc false
def group(presences) do
presences
|> Enum.reverse()
|> Enum.reduce(%{}, fn {key, meta}, acc ->
Map.update(acc, to_string(key), %{metas: [meta]}, fn %{metas: metas} ->
%{metas: [meta | metas]}
end)
end)
end
defp send_continue(%Task{} = task, ref), do: send(task.pid, {ref, :continue})
defp next_task(state) do
case :queue.out(state.tasks) do
{{:value, {ref, %Task{} = next}}, remaining_tasks} ->
send_continue(next, ref)
%{state | current_task: {ref, next}, tasks: remaining_tasks}
{:empty, _} ->
%{state | current_task: nil, tasks: :queue.new()}
end
end
defp do_handle_metas(state, computed_diffs) do
Enum.reduce(computed_diffs, state, fn {topic, presence_diff}, acc ->
updated_topics = merge_diff(acc.topics, topic, presence_diff)
topic_presences =
case Map.fetch(updated_topics, topic) do
{:ok, presences} -> presences
:error -> %{}
end
case acc.module.handle_metas(topic, presence_diff, topic_presences, acc.client_state) do
{:ok, updated_client_state} ->
%{acc | topics: updated_topics, client_state: updated_client_state}
other ->
raise ArgumentError, """
expected #{inspect(acc.module)}.handle_metas/4 to return {:ok, new_state}.
got: #{inspect(other)}
"""
end
end)
end
defp async_merge(state, diff) do
%{module: module} = state
ref = make_ref()
new_task =
Task.Supervisor.async(state.task_supervisor, fn ->
computed_diffs =
Enum.map(diff, fn {topic, {joins, leaves}} ->
joins = module.fetch(topic, Phoenix.Presence.group(joins))
leaves = module.fetch(topic, Phoenix.Presence.group(leaves))
{topic, %{joins: joins, leaves: leaves}}
end)
receive do
{^ref, :continue} -> {:phoenix, ref, computed_diffs}
end
end)
if state.current_task do
%{state | tasks: :queue.in({ref, new_task}, state.tasks)}
else
send_continue(new_task, ref)
%{state | current_task: {ref, new_task}}
end
end
defp merge_diff(topics, topic, %{leaves: leaves, joins: joins} = _diff) do
# add new topic if needed
updated_topics =
if Map.has_key?(topics, topic) do
topics
else
add_new_topic(topics, topic)
end
# merge diff into topics
{updated_topics, _topic} = Enum.reduce(joins, {updated_topics, topic}, &handle_join/2)
{updated_topics, _topic} = Enum.reduce(leaves, {updated_topics, topic}, &handle_leave/2)
# if no more presences for given topic, remove topic
if topic_presences_count(updated_topics, topic) == 0 do
remove_topic(updated_topics, topic)
else
updated_topics
end
end
defp handle_join({joined_key, presence}, {topics, topic}) do
joined_metas = Map.get(presence, :metas, [])
{add_new_presence_or_metas(topics, topic, joined_key, joined_metas), topic}
end
defp handle_leave({left_key, presence}, {topics, topic}) do
{remove_presence_or_metas(topics, topic, left_key, presence), topic}
end
defp add_new_presence_or_metas(
topics,
topic,
key,
new_metas
) do
topic_presences = topics[topic]
updated_topic =
case Map.fetch(topic_presences, key) do
# existing presence, add new metas
{:ok, existing_metas} ->
remaining_metas = new_metas -- existing_metas
updated_metas = existing_metas ++ remaining_metas
Map.put(topic_presences, key, updated_metas)
# there are no presences for that key
:error ->
Map.put_new(topic_presences, key, new_metas)
end
Map.put(topics, topic, updated_topic)
end
defp remove_presence_or_metas(
topics,
topic,
key,
deleted_metas
) do
topic_presences = topics[topic]
presence_metas = Map.get(topic_presences, key, [])
remaining_metas = presence_metas -- Map.get(deleted_metas, :metas, [])
updated_topic =
case remaining_metas do
[] -> Map.delete(topic_presences, key)
_ -> Map.put(topic_presences, key, remaining_metas)
end
Map.put(topics, topic, updated_topic)
end
defp add_new_topic(topics, topic) do
Map.put_new(topics, topic, %{})
end
defp remove_topic(topics, topic) do
Map.delete(topics, topic)
end
defp topic_presences_count(topics, topic) do
map_size(topics[topic])
end
end