defmodule Hub do
@moduledoc """
Pub-sub hub
Subscription is done with a pattern.
Example:
```
Hub.subscribe("global", %{count: count} when count > 42)
Hub.publish("global", %{count: 45, message: "You rock!"})
```
"""
alias Hub.Channel
alias Hub.ChannelRegistry
alias Hub.ChannelSupervisor
alias Hub.Subscriber
@doc """
Unsubscribes using the reference returned on subscribe.
"""
defdelegate unsubscribe(ref), to: Channel
defdelegate unsubscribe_and_flush(ref), to: Channel
@doc """
Convenience macro for subscribing without the need to unquote the pattern.
Example:
```
Hub.subscribe("global", %{count: count} when count > 42)
```
"""
defmacro subscribe(channel_name, pattern, options \\ []) do
quote do
{bind_quoted, options} = unquote(options) |> Keyword.pop(:bind_quoted, [])
quoted_pattern = unquote(Macro.escape(pattern)) |> Hub.replace_pins(bind_quoted)
Hub.subscribe_quoted(unquote(channel_name), quoted_pattern, options)
end
end
@doc """
Publishes the term to all subscribers that matches it.
Returns the number of subscribers that got the message.
"""
@spec publish(String.t(), any) :: non_neg_integer
def publish(channel_name, term) do
case lookup_channel(channel_name) do
{:ok, channel} ->
Channel.publish(channel, term)
:not_found ->
0
end
end
@doc """
Subscribes to the quoted pattern in the given channel_name.
Example:
```
Hub.subscribe("global", quote do: %{count: count} when count > 42)
```
"""
@spec subscribe_quoted(String.t(), any, Channel.subscribe_options()) ::
{:ok, Channel.subscription_ref()} | {:error, reason :: String.t()}
def subscribe_quoted(channel_name, quoted_pattern, options \\ []) do
channel = upsert_channel(channel_name)
Channel.subscribe_quoted(channel, quoted_pattern, options)
end
@doc """
Get all subscribers from channel.
"""
@spec subscribers(String.t()) :: [Subscriber.t()]
def subscribers(channel_name) do
case lookup_channel(channel_name) do
{:ok, channel} ->
Channel.subscribers(channel)
:not_found ->
[]
end
end
@doc false
def replace_pins(ast, [] = _binding) do
ast
end
def replace_pins(ast, bindings) do
{ast, _acc} =
Macro.traverse(
ast,
nil,
fn ast, _acc ->
ast = traverse_pin(ast, bindings)
{ast, nil}
end,
fn ast, _acc -> {ast, nil} end
)
ast
end
defp traverse_pin({:^, _, [{name, _, atom}]} = term, bindings) when is_atom(atom) do
case Keyword.fetch(bindings, name) do
{:ok, value} -> Macro.escape(value)
:error -> term
end
end
defp traverse_pin(ast, _bindings) do
ast
end
defp upsert_channel(channel_name) do
case lookup_channel(channel_name) do
{:ok, channel} ->
channel
:not_found ->
case ChannelSupervisor.start_child(channel_name) do
{:ok, channel} ->
channel
:ignore ->
# Handle race condition where two processes are creating a channel at the same time
{:ok, channel} = lookup_channel(channel_name)
channel
end
end
end
defp lookup_channel(channel_name) do
ChannelRegistry.lookup(channel_name)
end
end