lib/volley/persistent_subscription.ex

defmodule Volley.PersistentSubscription do
  @moduledoc """
  A GenStage/Broadway producer for persistent subscriptions

  Persistent subscriptions are a feature of EventStoreDB which offload the
  responsibilities of tracking processed events, back-pressure, and subscriber
  dispatch to the EventStoreDB server. This allows subscribers to more easily
  implement complex subscription strategies, such as allowing multiple
  subscribers across services without gossip, avoiding head-of-line blocking,
  and enabling concurrent and batch processing schemes and rate-limiting.

  Broadway features an acknowledgement interface which integrates well with
  the persistent subscription `Spear.ack/3` and `Spear.nack/4` system. Consumers
  intending to use this producer should prefer writing handlers with
  `Broadway` (over `GenStage`) where possible.

  ## Configuration

  * `:broadway?` - (default: `false`) whether to emit events as
    `t:Broadway.Message.t/0` messages or as `t:Spear.Event.t/0` events.
    `true` should be set for this option if this producer is being used
    in a Broadway topology, `false` for use in a `GenStage` pipeline.
    When set as `true`, this producer will set the ack interface for
    each message to `Volley.PersistentSubscription.Acknowledger` with the
    proper connection details for sending acks and nacks. When `false`, the
    `Spear.Event.metadata.subscription` field will be replaced with a
    `t:Volley.PersistentSubscription.Subscription.t/0` struct with any necessary
    connection details.

  * `:subscriptions` - (default: `[]`) a list of subscription configurations.
    Broadway does not currently allow more than one producer in a topology,
    however one may wish to subscribe to multiple persistent subscriptions,
    potentially across EventStoreDBs. Since back-pressure is controlled
    on the EventStoreDB side, a handler may specify multiple subscriptions
    in a single producer without any special considerations. The schema of
    each subscription is as follows

    * `:connection` - (required) a `t:Spear.Connection.t/0` process which
      can either be specified as a PID or any valid `t:GenServer.name/0`

    * `:stream_name` - (required) the EventStoreDB stream

    * `:group_name` - (required) the EventStoreDB group name

    * `:opts` - (default: `[]`) options to pass to
      `Spear.connect_to_persistent_subscription/5`. The main use of this
      options field is to configure the `:buffer_size` option (default: `1`).
      The `:buffer_size` option controls the number of events allowed to be
      sent to this producer before any events are acknowledged.

  Remaining options are passed to `GenStage.start_link/3` and the
  `{:producer, state, opts}` tuple in `c:GenStage.init/1`.

  ## Examples

  ```elixir
  defmodule MyHandler do
    use Broadway

    alias Broadway.Message

    def start_link(_opts) do
      subscription_opts = [
        broadway?: true,
        subscriptions: [
          [
            stream_name: "MyStream",
            group_name: inspect(__MODULE__),
            connection: MyApp.SpearClient,
            opts: [
              # 10 events allowed in-flight at a time
              buffer_size: 10
            ]
          ]
        ]
      ]

      Broadway.start_link(__MODULE__,
        name: __MODULE__,
        producer: [
          module: {Volley.PersistentSubscription, subscription_opts}
        ],
        processors: [
          default: [concurrency: 2]
        ],
        batchers: [
          default: [concurrency: 1, batch_size: 5]
        ]
      )
    end

    @impl Broadway
    def handle_message(:default, %Message{} = message, _context) do
      message
      |> Message.put_batcher(:default)
    end

    @impl Broadway
    def handle_batch(:default, messages, _batch_info, context) do
      # do something batchy with messages...
    end
  end
  ```
  """

  use GenStage
  import Volley
  alias __MODULE__.Subscription

  defstruct [:config, subscriptions: %{}]

  # coveralls-ignore-start
  @doc false
  def start_link(opts) do
    {start_link_opts, opts} = pop_genserver_opts(opts)

    GenStage.start_link(__MODULE__, opts, start_link_opts)
  end

  # coveralls-ignore-stop

  @impl GenStage
  def init(opts) do
    {producer_opts, opts} = pop_producer_opts(opts)

    subscriptions =
      opts
      |> Keyword.get(:subscriptions, [])
      |> Enum.map(&Subscription.from_config/1)

    Enum.each(subscriptions, fn sub -> send(self(), {:connect, sub}) end)

    config =
      opts
      |> Map.new()
      |> Map.put(:subscriptions, subscriptions)

    {:producer, %__MODULE__{config: config}, producer_opts}
  end

  @impl GenStage
  def handle_info({:connect, subscription}, state) do
    state =
      case Subscription.connect(subscription) do
        {:ok, %Subscription{ref: ref} = subscription} ->
          put_in(state.subscriptions[ref], subscription)

        :error ->
          state
      end

    {:noreply, [], state}
  end

  def handle_info({:eos, subscription, _reason}, state) do
    {%Subscription{} = sub, state} = pop_in(state.subscriptions[subscription])
    Subscription.reconnect(sub)

    {:noreply, [], state}
  end

  def handle_info(%Spear.Event{} = event, state) do
    {:noreply, [map_event(event, state)], state}
  end

  @impl GenStage
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end

  if_broadway do
    defp map_event(event, %__MODULE__{config: %{broadway?: true}} = state) do
      subscription = state.subscriptions[event.metadata.subscription]

      %Broadway.Message{
        data: event,
        acknowledger:
          {Volley.PersistentSubscription.Acknowledger, subscription, %{}}
      }
    end
  end

  defp map_event(event, state) do
    update_in(event.metadata.subscription, &state.subscriptions[&1])
  end
end