defmodule Wiki.EventStreams do
@moduledoc """
This module reads from an infinite stream of [server-sent events](https://en.wikipedia.org/wiki/Server-sent_events)
annotating actions such as editing or patrolling, as they happen on Wikimedia projects.
For more about the public wiki streams and their format, see
[EventStreams on Wikitech](https://wikitech.wikimedia.org/wiki/EventStreams)
## Examples
Start reading the page creation feed, and expose as a GenStage.stream:
```elixir
Wiki.EventStreams.start_link(streams: "page-create")
Wiki.EventStreams.stream()
|> Stream.take(6)
|> Enum.to_list
```
Combine multiple feeds:
```elixir
Wiki.EventStreams.start_link(streams: ["revision-create", "revision-score"])
Wiki.EventStreams.stream()
|> Stream.take(6)
|> Enum.to_list
```
## TODO
* Currently only a single supervisor tree is supported, so calling applications can only read from one stream.
* Track the restart ID, disconnect from the feed at some maximum queue size. Reconnect as demand resumes.
Application-lifetime or permanent storage for the restart ID tracking, for consumers that need an at-least-once
guarantee.
"""
defmodule Relay do
  @moduledoc false

  # GenStage producer which buffers decoded events in a FIFO queue and hands
  # them to consumers as demand arrives.

  use GenStage

  # State is the queue of buffered events, plus the consumer demand which
  # could not be satisfied yet because the queue ran empty.
  @type state :: {:queue.queue(), integer}
  @type reply :: {:noreply, [map], state}

  @doc """
  Start the relay, registered under this module's name, and then start a
  one-for-one supervisor holding the event source.

  Unless the caller passed an explicit `:stream_to`, the source is pointed
  at the relay process.

  ## Arguments

  * Client configuration. `:streams` is required.
  """
  @spec start_link(Wiki.EventStreams.client_options()) :: GenServer.on_start()
  def start_link(opts) do
    {:ok, relay_pid} = GenStage.start_link(__MODULE__, opts, name: __MODULE__)

    # FIXME: Rationalize the supervision tree.
    # `put_new` keeps a caller-supplied `:stream_to` and only falls back to the relay.
    source_opts = Keyword.put_new(opts, :stream_to, relay_pid)

    {:ok, _} =
      Supervisor.start_link([{Wiki.EventStreams.Source, source_opts}], strategy: :one_for_one)

    {:ok, relay_pid}
  end

  @impl true
  @spec init(any) :: {:producer, state}
  def init(_) do
    # Begin with an empty buffer and no outstanding demand.
    {:producer, {:queue.new(), 0}}
  end

  # Every message delivered to the process is treated as an event: its `data`
  # field is JSON-decoded, appended to the rear of the queue, and as many
  # buffered events as the pending demand allows are emitted.
  @impl true
  @spec handle_info(EventsourceEx.Message.t(), state) :: reply
  def handle_info(message, {queue, pending_demand}) do
    message
    |> decode_message_data()
    |> :queue.in(queue)
    # FIXME: Suppress reply until above min_demand or periodic timeout has elapsed.
    |> dispatch_events(pending_demand)
  end

  @doc """
  Return up to the requested number of events.

  New demand is added to any demand left over from earlier calls, so that
  events arriving later can still be delivered.

  ## Arguments

  - `demand` - Number of new events requested by the consumer.
  - `state` - queue and pending demand
  """
  @impl true
  @spec handle_demand(pos_integer, state) :: reply
  def handle_demand(demand, {queue, pending_demand}) do
    dispatch_events(queue, demand + pending_demand)
  end

  # Take as many events from the front of the queue as the demand allows, and
  # carry the unmet remainder of the demand forward in the state.
  @spec dispatch_events(:queue.queue(), integer) :: reply
  defp dispatch_events(queue, demand) do
    available = min(demand, :queue.len(queue))
    {retrieved, remaining} = :queue.split(available, queue)

    # `:queue.to_list/1` already returns the front (oldest) item first, so the
    # list must not be reversed or a batch would be emitted newest-first.
    events = :queue.to_list(retrieved)

    {:noreply, events, {remaining, demand - available}}
  end

  # Decode the JSON payload carried in the message's `data` field. Raises if
  # the payload is not valid JSON.
  @spec decode_message_data(EventsourceEx.Message.t()) :: map
  defp decode_message_data(message) do
    message.data
    |> Jason.decode!()
  end
end
defmodule Source do
  @moduledoc false

  # Builds the child specification for the server-sent events client.

  alias Wiki.{EventStreams, Util}

  @default_endpoint "https://stream.wikimedia.org/v2/stream/"

  # Build a child spec which starts `EventsourceEx.new/2` on the stream URL.
  #
  # The URL is the endpoint followed by the comma-separated stream names.
  # `:endpoint` and `:user_agent` fall back to defaults when absent;
  # `:adapter` and `:stream_to` are passed through as given (possibly `nil`).
  #
  # Raises `ArgumentError` when `:streams` is missing.
  @doc false
  @spec child_spec(EventStreams.client_options()) :: map
  def child_spec(opts \\ []) do
    adapter = opts[:adapter]
    endpoint = opts[:endpoint] || @default_endpoint
    sink = opts[:stream_to]
    user_agent = opts[:user_agent] || Util.default_user_agent()
    url = endpoint <> normalize_streams(opts[:streams])

    %{
      id: Source,
      # FIXME: nicer if we could get the Relay sibling's specific PID each time,
      # to allow an app to use multiple stream listeners.
      start: {
        EventsourceEx,
        :new,
        [
          url,
          [
            adapter: adapter,
            headers: [{"user-agent", user_agent}],
            stream_to: sink
          ]
        ]
      }
    }
  end

  # Turn the `:streams` option into the URL path segment. Always returns a
  # binary, because the result is concatenated onto the endpoint with `<>`,
  # which only accepts binaries.
  @spec normalize_streams(atom | String.t() | [atom | String.t()]) :: String.t()
  defp normalize_streams(nil) do
    # `nil` is an atom, so it needs its own clause ahead of the atom clause;
    # a missing option should fail loudly rather than build a bad URL.
    raise ArgumentError, "the :streams option is required"
  end

  defp normalize_streams(streams) when is_list(streams), do: Enum.join(streams, ",")
  defp normalize_streams(streams) when is_binary(streams), do: streams
  defp normalize_streams(streams) when is_atom(streams), do: Atom.to_string(streams)
end
defmodule RelaySupervisor do
  @moduledoc false

  # TODO: Should this logic be moved to init/1?

  @doc """
  Start a one-for-one supervisor whose only child is the `Relay`.

  The supervisor's result is returned unchanged, so a failed start is
  reported as the usual `{:error, reason}` tuple rather than a `MatchError`.

  ## Arguments

  * `opts` - Client keyword options
  """
  @spec start_link(Wiki.EventStreams.client_options()) :: GenServer.on_start()
  def start_link(opts) do
    Supervisor.start_link([{Relay, opts}], strategy: :one_for_one)
  end
end
@type client_option ::
        {:adapter, module()}
        | {:endpoint, binary()}
        | {:stream_to, GenServer.server()}
        | {:streams, atom | binary() | [atom | binary()]}
        | {:user_agent, binary()}

@typedoc """
Options accepted when starting the event stream client.

* `:adapter` - Override the HTTP adapter.
* `:endpoint` - Override the default endpoint URL.
* `:stream_to` - Optional process which will receive the events, otherwise
they go to the built-in relay.
* `:streams` - One or more stream names to subscribe to, given as strings or atoms. Required.
* `:user_agent` - Custom user-agent header string
"""
@type client_options :: [client_option()]
# TODO: Is only run as a genserver from tests, so this should go away or be
# replaced by a supervisor.
use GenServer
# GenServer callback: the argument is ignored and the state is an empty tuple.
@impl true
@spec init(term) :: {:ok, any}
def init(_args), do: {:ok, {}}
@doc """
Start a supervisor tree to receive and relay server-sent events.

## Arguments

- `options` - Keyword list,
  - `{:adapter, module}` - Override the HTTP adapter.
  - `{:endpoint, url}` - Override default endpoint.
  - `{:stream_to, pid | module}` - Instead of using the built-in streaming relay,
  send the events directly to your own process.
  - `{:streams, atom | [atom]}` - Select which streams to listen to. An updated list can be
  [found here](https://stream.wikimedia.org/?doc#/Streams). Required.
  - `{:user_agent, string}` - Custom user-agent header string.
"""
@spec start_link(client_options()) :: GenServer.on_start()
def start_link(args), do: RelaySupervisor.start_link(args)
@doc """
Indefinitely capture subscribed events and relay them as a `Stream`.

The stream subscribes to the relay started by `start_link/1`.

## Arguments

- `options` - Keyword list passed through unchanged as the options of
  `GenStage.stream/2`. These are not the client options taken by `start_link/1`.
"""
@spec stream(keyword()) :: Enumerable.t()
def stream(options \\ []) do
  GenStage.stream([Relay], options)
end
end