lib/event_streams.ex

defmodule Wiki.EventStreams do
  @moduledoc """
  This module reads from an infinite stream of [server-sent events](https://en.wikipedia.org/wiki/Server-sent_events)
  annotating actions such as editing or patrolling, as they happen on Wikimedia projects.

  For more about the public wiki streams and their format, see
  [EventStreams on Wikitech](https://wikitech.wikimedia.org/wiki/EventStreams)

  ## Examples

  Start reading the page-creation feed and expose it as a `GenStage.stream/2`:

  ```elixir
  Wiki.EventStreams.start_link(streams: "page-create")
  Wiki.EventStreams.stream()
  |> Stream.take(6)
  |> Enum.to_list
  ```

  Combine multiple feeds:

  ```elixir
  Wiki.EventStreams.start_link(streams: ["revision-create", "revision-score"])
  Wiki.EventStreams.stream()
  |> Stream.take(6)
  |> Enum.to_list
  ```

  ## TODO

  * Currently only a single supervisor tree is supported, so calling applications can only read from one stream.
  * Track the restart ID, disconnect from the feed at some maximum queue size, and reconnect as demand resumes.
    Provide application-lifetime or persistent storage for the restart ID, for consumers that need an
    at-least-once guarantee.
  """

  defmodule Relay do
    @moduledoc false

    # GenStage producer that buffers JSON-decoded server-sent events and
    # dispatches them, oldest first, as consumers signal demand.

    use GenStage

    # `{buffered_events, unmet_demand}`; the front of the queue holds the oldest event.
    @type state :: {:queue.queue(), integer}

    @type reply :: {:noreply, [map], state}

    @doc """
    Start the relay producer (registered under this module's name), then start a
    supervisor for the `Wiki.EventStreams.Source` that feeds it.

    ## Arguments

    * Client configuration.  `:streams` is required.  When `:stream_to` is absent,
      the source is pointed at this relay.
    """
    @spec start_link(Wiki.EventStreams.client_options()) :: GenServer.on_start()
    def start_link(opts) do
      {:ok, relay_pid} = GenStage.start_link(__MODULE__, opts, name: __MODULE__)
      # FIXME: Rationalize the supervision tree.  The source supervisor below is
      # linked to the caller of start_link/1 rather than being a child of any
      # supervisor spec alongside the relay.
      source_opts = Keyword.put_new(opts, :stream_to, relay_pid)

      {:ok, _} =
        Supervisor.start_link([{Wiki.EventStreams.Source, source_opts}], strategy: :one_for_one)

      {:ok, relay_pid}
    end

    @impl true
    @spec init(any) :: {:producer, state}
    def init(_) do
      {:producer, {:queue.new(), 0}}
    end

    # Each message from the source is JSON-decoded, appended to the back of the
    # queue, and dispatched immediately if any demand is outstanding.
    @impl true
    @spec handle_info(EventsourceEx.Message.t(), state) :: reply
    def handle_info(message, {queue, pending_demand}) do
      message
      |> decode_message_data()
      |> :queue.in(queue)
      # FIXME: Suppress reply until above min_demand or periodic timeout has elapsed.
      |> dispatch_events(pending_demand)
    end

    @doc """
    Dispatch up to the requested number of buffered events.

    Any demand that cannot be met from the buffer is carried forward and
    satisfied as new events arrive.

    ## Arguments

    - `demand` - Number of new events requested by the consumer.
    - `state` - queue and pending demand
    """
    @impl true
    @spec handle_demand(pos_integer, state) :: reply
    def handle_demand(demand, {queue, pending_demand}) do
      dispatch_events(queue, demand + pending_demand)
    end

    # Emit up to `demand` buffered events in arrival order, and keep any unmet
    # demand in the state.
    @spec dispatch_events(:queue.queue(), integer) :: reply
    defp dispatch_events(queue, demand) do
      available = min(demand, :queue.len(queue))
      {retrieved, remaining} = :queue.split(available, queue)
      # :queue.to_list/1 yields front-to-rear, i.e. oldest first, which is the
      # order the events arrived in.  Reversing it would emit batches newest-first.
      {:noreply, :queue.to_list(retrieved), {remaining, demand - available}}
    end

    # Jason.decode!/1 raises on malformed payloads, which crashes the relay.
    @spec decode_message_data(EventsourceEx.Message.t()) :: map
    defp decode_message_data(message) do
      Jason.decode!(message.data)
    end
  end

  defmodule Source do
    @moduledoc false

    alias Wiki.{EventStreams, Util}

    @default_endpoint "https://stream.wikimedia.org/v2/stream/"

    # Build a child spec that starts `EventsourceEx.new/2` against the URL
    # `endpoint <> streams`, forwarding each received event to `:stream_to`.
    # Raises `ArgumentError` when `:streams` is missing.
    @doc false
    @spec child_spec(EventStreams.client_options()) :: map
    def child_spec(opts \\ []) do
      adapter = opts[:adapter]
      endpoint = opts[:endpoint] || @default_endpoint
      sink = opts[:stream_to]
      user_agent = opts[:user_agent] || Util.default_user_agent()
      url = endpoint <> normalize_streams(opts[:streams])

      %{
        id: Source,
        # FIXME: nicer if we could get the Relay sibling's specific PID each time,
        #  to allow an app to use multiple stream listeners.
        start: {
          EventsourceEx,
          :new,
          [
            url,
            [
              adapter: adapter,
              headers: [{"user-agent", user_agent}],
              stream_to: sink
            ]
          ]
        }
      }
    end

    # Render the `:streams` option as the string appended to the endpoint URL;
    # multiple streams are joined with commas.
    @spec normalize_streams(atom | String.t() | [atom | String.t()]) :: String.t()
    defp normalize_streams(nil),
      do: raise(ArgumentError, "the :streams option is required")

    defp normalize_streams(streams) when is_list(streams), do: Enum.join(streams, ",")

    defp normalize_streams(stream) when is_binary(stream), do: stream

    # A bare atom must be stringified, otherwise `endpoint <> stream` raises.
    defp normalize_streams(stream) when is_atom(stream), do: Atom.to_string(stream)
  end

  defmodule RelaySupervisor do
    @moduledoc false

    # TODO: Should this logic be moved to init/1?
    @doc """
    Start a one-for-one supervisor whose only child is the `Relay` producer.

    ## Arguments

    * `opts` - Client keyword options, handed unchanged to the relay's child spec.
    """
    @spec start_link(Wiki.EventStreams.client_options()) :: GenServer.on_start()
    def start_link(opts) do
      children = [{Relay, opts}]
      # Assert a successful start; any other result raises a MatchError in the caller.
      {:ok, _supervisor} = started = Supervisor.start_link(children, strategy: :one_for_one)
      started
    end
  end

  @type client_option ::
          {:adapter, module()}
          | {:endpoint, binary()}
          | {:stream_to, GenServer.server()}
          | {:streams, atom | String.t() | [atom | String.t()]}
          | {:user_agent, binary()}
  @typedoc """
  Keyword options accepted when starting the event stream.

  * `:adapter` - Override the HTTP adapter.
  * `:endpoint` - Override the default endpoint URL.
  * `:stream_to` - Optional process which will receive the events directly.  When
    omitted, events go to the built-in relay and can be read with `stream/0`.
  * `:streams` - One or more stream names (atoms or strings) to subscribe to.  Required.
  * `:user_agent` - Custom user-agent header string.
  """
  @type client_options :: [client_option()]

  # TODO: Is only run as a genserver from tests, so this should go away or be
  # replaced by a supervisor.
  use GenServer

  # Placeholder GenServer callback: ignores its argument and starts with an
  # empty tuple as state.
  @impl true
  @spec init(term) :: {:ok, any}
  def init(_args), do: {:ok, {}}

  @doc """
  Start a supervisor tree to receive and relay server-sent events.

  ## Arguments

  - `options` - Keyword list,
    - `{:adapter, module}` - Override HTTPoison adapter.
    - `{:endpoint, url}` - Override default endpoint.
    - `{:stream_to, pid | module}` - Instead of using the built-in streaming relay,
      send the events directly to your own process.
    - `{:streams, atom | String.t() | [atom | String.t()]}` - Select which streams to
      listen to.  An updated list can be
      [found here](https://stream.wikimedia.org/?doc#/Streams).  Required.
    - `{:user_agent, binary}` - Override the default user-agent header.
  """
  @spec start_link(client_options()) :: GenServer.on_start()
  def start_link(args) do
    RelaySupervisor.start_link(args)
  end

  @doc """
  Indefinitely capture subscribed events and relay them as a `Stream`.

  Subscribes to the relay registered by `start_link/1`, so that must be running
  before the returned stream is enumerated.

  ## Arguments

  - `options` - Passed through unchanged to `GenStage.stream/2`.  These are
    stream options, not client options; choose the subscribed feeds with
    `start_link/1` instead.
  """
  @spec stream(keyword()) :: Enumerable.t()
  def stream(options \\ []) do
    GenStage.stream([Relay], options)
  end
end