lib/volley/in_order_subscription.ex

defmodule Volley.InOrderSubscription do
  @moduledoc """
  A subscription which guarantees ordering

  An in-order subscription consumes an EventStoreDB stream in order, as if
  subscribed via `Spear.subscribe/4`. In-order subscriptions are simpler than
  persistent subscriptions and can be used in cases where unordered processing
  is too complicated or undesirable.

  ## Back-pressure

  This producer primarily makes use of `GenStage`'s buffering capabilities to
  provide back-pressure.

  A plain subscription through `Spear.subscribe/4` has no back-pressure. For
  very large streams, a subscriber process may become overwhelmed as the
  process mailbox fills up with events as fast as they can be read from
  the EventStoreDB.

  This producer has two modes:

  - subscription mode, where the stream is subscribed with `Spear.subscribe/4`
    and events are emitted as soon as available
  - reading mode, in which events are emitted on demand from the consumer

  This producer starts up in reading mode and emits events on demand as long
  as there are more events to be read. Once the producer reaches the current
  end of the stream, it subscribes with `Spear.subscribe/4`, switching to
  subscription mode.

  Once the producer has caught up to the end of the stream, it only receives
  newly appended events and so is less likely to become overwhelmed. Sustained
  bursts of appends to the stream may still eventually overfill the `GenStage`
  buffer.
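
  Options not consumed by this producer are forwarded to the
  `{:producer, state, opts}` tuple returned from `c:GenStage.init/1` (see the
  Configuration section below), so a larger `:buffer_size` can be passed
  alongside the other settings if sustained bursts are a concern. A minimal
  sketch, using the hypothetical names from the examples below:

  ```elixir
  in_order_subscription_settings = [
    name: MyProducer,
    connection: MySpearClient,
    stream_name: "some_stream",
    # forwarded to GenStage's {:producer, state, opts} tuple: raises the
    # number of events the producer will hold for a slow consumer
    buffer_size: 20_000
  ]
  ```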

  ## Writing handlers for in-order subscriptions

  Special care must be taken when writing a consumer for in-order subscriptions.
  Consumers must implement blocking in order to preserve correct ordering
  of events.

  To implement blocking, a consumer must meet these three requirements:

  - only one consumer may subscribe to each producer
  - the consumer must `Process.link/1` itself to the producer process
      - the `c:GenStage.init/1` callback is a suitable place to perform this
        linking
  - the consumer must curate its stream position

  Let's build a basic event handler for an in-order subscription using the
  `GenStage` basics:

  ```elixir
  defmodule MyHandler do
    use GenStage

    def start_link(_) do
      GenStage.start_link(__MODULE__, :ok)
    end

    @impl GenStage
    def init(:ok) do
      {:consumer, :ok, subscribe_to: [MyProducer]}
    end

    @impl GenStage
    def handle_events(events, _from, state) do
      IO.inspect(events, label: "events")

      {:noreply, [], state}
    end
  end
  ```

  This is a very minimal consumer that starts up, subscribes to `MyProducer`,
  and handles events by outputting them with `IO.inspect/2`. We can start
  up this handler and the producer like so:

  ```elixir
  in_order_subscription_settings = [
    name: MyProducer,
    connection: MySpearClient,
    stream_name: "some_stream",
    ..
  ]

  [
    {Volley.InOrderSubscription, in_order_subscription_settings},
    MyHandler
  ]
  |> Supervisor.start_link(strategy: :one_for_one)
  ```

  This consumer doesn't follow our rules, though. If we start it up, we'll see
  that it handles multiple events at once, and if we restart it, we'll see
  it start over from the beginning of the stream. Let's restrict the number of
  events it can process at once by tuning `:max_demand` down to `1`:

  ```elixir
  @impl GenStage
  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{MyProducer, max_demand: 1}]}
  end
  ```

  Now we're handling events one-by-one in our handler, so we can match on a
  single event in `c:GenStage.handle_events/3`:

  ```elixir
  def handle_events([event], _from, state) do
    IO.inspect(event.metadata.stream_revision, label: "handling event no.")
    ..
  ```

  Note that a `:max_demand` of 1 is simpler but not strictly necessary: to
  handle events in order, the consumer must update its stream position after
  every successfully handled event, for example with a routine like this:

  ```elixir
  def handle_events(events, _from, state) do
    Enum.each(events, fn event ->
      :ok = handle_one_event(event)

      :ok = update_stream_position(event)
    end)

    {:noreply, [], state}
  end
  ```

  A consumer can break ordering by attempting to handle all events in parallel
  or without updating the stream position on every successful handle. Consider a
  scenario where a consumer attempts to handle events `[1, 2, 3]`. If the
  consumer successfully handles 1 and 3 but fails to handle 2, the consumer
  cannot write a stream position number that fully describes its position in the
  stream. This may not be a concern if whatever side-effects the handler is
  committing are idempotent.
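
  To make that hazard concrete, here is a sketch of a handler that processes a
  batch concurrently (reusing the hypothetical `handle_one_event/1` and
  `update_stream_position/1` helpers from above) and therefore cannot record an
  accurate position when only part of the batch succeeds:

  ```elixir
  # an example of what *not* to do: if event 2 of [1, 2, 3] fails while
  # events 1 and 3 succeed, no single stream position fully describes the
  # consumer's progress
  def handle_events(events, _from, state) do
    events
    |> Task.async_stream(&handle_one_event/1)
    |> Stream.run()

    events |> List.last() |> update_stream_position()

    {:noreply, [], state}
  end
  ```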

  This producer reads events in chunks at least as large as the demand from
  the consumer, so setting a very low `:max_demand` does not necessarily
  increase the number of network calls.

  Now if we start up our handler, we'll see it churning through each event
  in order. Let's introduce a bit of failure into the handler, though.
  Defensive programming is typically discouraged in OTP applications, so let's
  `raise/1` in our handling code to simulate a failure such as a failing
  bang (`!`) function or a bad match:

  ```elixir
  def handle_events([event], _from, state) do
    revision = event.metadata.stream_revision

    IO.inspect(revision, label: "handling event no.")

    if revision == 42 do
      raise "aaaaah!"
    end

    {:noreply, [], state}
  end
  ```

  Now if we run our pipeline on a stream longer than 42 events, the handler
  will crash. Since producers and consumers are not linked by default in
  `GenStage`, the exit of the consumer will leave the producer running,
  so we will see output like this:

  ```
  handling event no.: 41
  handling event no.: 42
  21:03:07.107 [error] GenServer #PID<0.266.0> terminating
  ** (RuntimeError) aaaaah!
  handling event no.: 43
  ```

  The default restart strategy of our consumer will start a new process which
  will subscribe to the producer and handle the next event. This means that
  event 42 is effectively skipped, which breaks ordered processing. To remedy
  this, we need to notify the producer that processing of an event has failed,
  which we do by linking the producer and consumer together:

  ```elixir
  @impl GenStage
  def init(:ok) do
    MyProducer |> GenServer.whereis() |> Process.link()

    {:consumer, :ok, subscribe_to: [{MyProducer, max_demand: 1}]}
  end
  ```

  Now when the handler exits on event 42, it will take the producer down with
  it. With the producer and consumer that we have so far, this will result in
  the consumer restarting processing from the beginning of the stream.
  Again, this breaks ordering. We need the consumer to curate its position
  in the stream in order to keep processing consistent.

  For this example, we'll use `:ets` to hold the stream position of our handler
  in memory. This is easy to set up for an example, but `:ets` tables are
  in-memory and are cleared out when the service stops. Production storage for
  stream positions should be more durable: e.g. a PostgreSQL row or an mnesia
  record.

  Outside of our supervision tree for the producer and consumer we'll create
  an `:ets` table:

  ```elixir
  :ets.new(:stream_positions, [:set, :public, :named_table])
  ```

  And now we'll add a function to our handler so the producer can restore a
  stream position from this table:

  ```elixir
  def fetch_stream_position! do
    case :ets.lookup(:stream_positions, __MODULE__) do
      [{__MODULE__, position}] -> position
      [] -> :start
    end
  end
  ```

  And add that MFA to the producer's options:

  ```elixir
  in_order_subscription_settings = [
    name: MyProducer,
    connection: MySpearClient,
    stream_name: "some_stream",
    restore_stream_position!: {MyHandler, :fetch_stream_position!, []},
    subscribe_on_init?: {Function, :identity, [true]}
  ]
  ```

  Now the producer will fetch the current stream position on start-up, so
  even if the processes crash and need to be restarted, the handler will
  keep a consistent position in the subscription.

  Finally, we'll store the stream position in the consumer. This should
  only occur after the consumer has done any side-effects or processing
  prone to failure. Ideally, the stream position should be persisted in a
  transaction with any side-effects.

  ```elixir
  @impl GenStage
  def handle_events([event], _from, state) do
    revision = event.metadata.stream_revision

    IO.inspect(revision, label: "handling event no.")

    if revision == 42 do
      raise "aaaaah!"
    end

    :ets.insert(:stream_positions, {__MODULE__, revision})

    {:noreply, [], state}
  end
  ```

  With this final change our consumer will read each event in the stream
  in order, reach event 42, raise, retry event 42, raise again, and then the
  supervisor process will shut down. This is the essence of a blocking
  subscription: once the pipeline reaches an event which it cannot process,
  the entire pipeline is halted. This is generally undesirable behavior: a
  code change or manual intervention is usually needed to resolve the
  blockage. Persistent subscriptions (see `Volley.PersistentSubscription`)
  offer much more flexibility around batching and concurrency thanks to the
  asynchronous ack and nack workflow and the EventStoreDB's parking system,
  but they do not guarantee ordered processing.
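
  As noted above, a production handler would ideally persist its stream
  position in the same transaction as its side-effects so the two can never
  drift apart. Here is a sketch of that idea using `Ecto.Multi`, assuming a
  hypothetical `MyApp.Repo` along with hypothetical `project_event/1` and
  `position_changeset/1` functions that build changesets for a read model and
  a stream-position row:

  ```elixir
  @impl GenStage
  def handle_events([event], _from, state) do
    revision = event.metadata.stream_revision

    {:ok, _changes} =
      Ecto.Multi.new()
      # hypothetical side-effect: project the event into a read model
      |> Ecto.Multi.insert(:projection, project_event(event))
      # the position row is written in the same transaction, so a failure in
      # either step rolls both back and the event is retried after a restart
      |> Ecto.Multi.insert(:position, position_changeset(revision),
        on_conflict: [set: [position: revision]],
        conflict_target: :handler
      )
      |> MyApp.Repo.transaction()

    {:noreply, [], state}
  end
  ```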

  Altogether our handler looks like this:

  ```elixir
  defmodule MyHandler do
    use GenStage

    def start_link(_) do
      GenStage.start_link(__MODULE__, :ok)
    end

    @impl GenStage
    def init(:ok) do
      MyProducer |> GenServer.whereis() |> Process.link()

      {:consumer, :ok, subscribe_to: [{MyProducer, max_demand: 1}]}
    end

    @impl GenStage
    def handle_events([event], _from, state) do
      revision = event.metadata.stream_revision

      IO.inspect(revision, label: "handling event no.")

      if revision == 42 do
        raise "aaaaah!"
      end

      :ets.insert(:stream_positions, {__MODULE__, revision})

      {:noreply, [], state}
    end

    def fetch_stream_position! do
      case :ets.lookup(:stream_positions, __MODULE__) do
        [{__MODULE__, position}] -> position
        [] -> :start
      end
    end
  end
  ```

  ## Configuration

  * `:connection` - (required) a `t:Spear.Connection.t/0` to use for connecting
    to the EventStoreDB

  * `:stream_name` - (required) the EventStoreDB stream to read

  * `:restore_stream_position!` - (required) a 0-arity function to invoke
    to retrieve the stream position on start-up of the subscription.
    This function should read from the source to which the consumer is writing
    the stream position. A positive integer, a `t:Spear.Event.t/0`, or the
    atoms `:start` or `:end` may be returned. `:start` starts the subscription
    at the first event in the stream, while `:end` immediately subscribes the
    producer to the end of the stream. This function may be either a function
    capture (or anonymous function) or an MFA tuple.

  * `:subscribe_on_init?` - (default: `fn -> true end`) a 0-arity function to
    invoke which determines whether this producer should start producing events
    after starting up. If this function returns `false`, the producer must
    be subscribed manually by sending it a `:subscribe` message. This function
    may be either a function capture (or anonymous function) or an MFA tuple.

  * `:subscribe_after` - (default: `0`) a period in ms to wait until the
    producer should query the `:subscribe_on_init?` function. This can be useful
    if the `:subscribe_on_init?` function reaches out to an external service
    which may not be immediately available on start-up.

  * `:read_opts` - (default: `[]`) options to pass to `Spear.read_stream/3`.
    The `:max_count` option may be worth tuning to achieve good performance:
    a stream of very small events may benefit from batch-reading with a large
    max-count, while for a stream of very large events a large max-count may
    overwhelm the reader and smaller read sizes may work better.

  Remaining options are passed to `GenStage.start_link/3` and the
  `{:producer, state, opts}` tuple in `c:GenStage.init/1`.
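
  As an illustration, a fuller configuration pulling several of these options
  together (reusing the hypothetical names from the examples above) might look
  like this:

  ```elixir
  in_order_subscription_settings = [
    name: MyProducer,
    connection: MySpearClient,
    stream_name: "some_stream",
    restore_stream_position!: {MyHandler, :fetch_stream_position!, []},
    # hold off on producing events until told to
    subscribe_on_init?: fn -> false end,
    # wait 5s before invoking :subscribe_on_init?
    subscribe_after: 5_000,
    # read in chunks of 500 events while catching up
    read_opts: [max_count: 500]
  ]

  # because :subscribe_on_init? returns false above, the producer is later
  # started manually by sending it the :subscribe message
  send(MyProducer, :subscribe)
  ```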
  """

  @default_read_size 100

  use GenStage
  import Volley
  require Logger

  defstruct [
    :connection,
    :subscription,
    :stream_name,
    :restore_stream_position!,
    :self,
    demand: 0,
    subscribe_after: 0,
    subscribe_on_init?: {Volley, :yes, []},
    producing?: false,
    read_opts: []
  ]

  @doc false
  def start_link(opts) do
    {start_link_opts, opts} = pop_genserver_opts(opts)

    GenStage.start_link(__MODULE__, opts, start_link_opts)
  end

  @impl GenStage
  def init(opts) do
    self = Keyword.get(opts, :name, self())
    {producer_opts, opts} = pop_producer_opts(opts)

    state =
      struct(__MODULE__, opts)
      |> Map.put(:self, self)

    Process.send_after(self(), :check_auto_subscribe, subscribe_after(state))

    {:producer, state, producer_opts}
  end

  @impl GenStage
  def handle_demand(demand, state) do
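    # In reading mode (no subscription yet), demand is satisfied by reading a
    # chunk from the stream. Once a read comes up short of the requested size,
    # the producer has caught up and schedules the switch to subscription mode.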
    with true <- state.producing?,
         nil <- state.subscription,
         {:ok, events} <- read_stream(state, demand) do
      {:noreply, put_self(events, state), save_position(state, events)}
    else
      false ->
        {:noreply, [], update_in(state.demand, &(&1 + demand))}

      subscription when is_reference(subscription) ->
        {:noreply, [], update_in(state.demand, &(&1 + demand))}

      {:done, events} ->
        GenStage.async_info(self(), :switch_to_subscription)

        {:noreply, put_self(events, state), save_position(state, events)}

      # coveralls-ignore-start
      {:error, reason} ->
        {:stop, reason, state}
        # coveralls-ignore-stop
    end
  end

  @impl GenStage
  def handle_info(:subscribe, state) do
    handle_demand(state.demand, %__MODULE__{state | producing?: true})
  end

  def handle_info(:check_auto_subscribe, state) do
    identifier = "#{inspect(__MODULE__)} (#{inspect(state.self)})"

    if do_function(state.subscribe_on_init?) do
      Logger.info("#{identifier} subscribing to '#{state.stream_name}'")

      GenStage.async_info(self(), :subscribe)
    else
      # coveralls-ignore-start
      Logger.info("#{identifier} did not subscribe to '#{state.stream_name}'")

      # coveralls-ignore-stop
    end

    {:noreply, [], state}
  end

  def handle_info(:switch_to_subscription, state) do
    case subscribe(state) do
      {:ok, sub} ->
        {:noreply, [], put_in(state.subscription, sub)}

      # coveralls-ignore-start
      {:error, reason} ->
        {:stop, reason, state}
        # coveralls-ignore-stop
    end
  end

  def handle_info(%Spear.Event{} = event, state) do
    {:noreply, [put_self(event, state)], save_position(state, event)}
  end

  # coveralls-ignore-start
  def handle_info(%Spear.Filter.Checkpoint{}, state) do
    {:noreply, [], state}
  end

  def handle_info({:eos, reason}, state) do
    {:stop, reason, state}
  end

  # coveralls-ignore-stop

  defp read_stream(state, demand) do
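    # Reads up to max(demand, :max_count) events from the current position.
    # Returns {:ok, events} when a full chunk was read and {:done, events}
    # once the end of the stream has been reached.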
    read_size = Keyword.get(state.read_opts, :max_count, @default_read_size)

    read_size = max(demand, read_size)
    position = position(state)
    # number of messages to drop because reading is inclusive on the :from
    drop_count = if position == :start, do: 0, else: 1

    opts =
      Map.get(state, :read_opts, [])
      |> Keyword.merge(
        from: position,
        max_count: read_size + drop_count
      )

    with position when position != :end <- position,
         {:ok, events} <-
           Spear.read_stream(state.connection, state.stream_name, opts),
         events when length(events) < read_size <-
           events |> Enum.drop(drop_count) do
      {:done, events}
    else
      :end ->
        {:done, []}

      events when is_list(events) ->
        {:ok, events}

      # coveralls-ignore-start
      error ->
        error
        # coveralls-ignore-stop
    end
  end

  defp subscribe(state) do
    opts =
      Map.get(state, :read_opts, [])
      |> Keyword.merge(from: position(state))

    Spear.subscribe(state.connection, self(), state.stream_name, opts)
  end

  # The current position is stored under a :position key added by
  # save_position/2 below (via Map.put/3); before any events have been emitted
  # the key is absent and the consumer-provided restore function is used.
  defp position(%{position: position}), do: position

  defp position(%{restore_stream_position!: restore_function}) do
    do_function(restore_function)
  end

  defp save_position(state, []), do: state

  defp save_position(state, events) when is_list(events) do
    save_position(state, List.last(events))
  end

  defp save_position(state, event) do
    Map.put(state, :position, event)
  end

  defp put_self(events, state) when is_list(events) do
    Enum.map(events, &put_self(&1, state))
  end

  defp put_self(%Spear.Event{} = event, state) do
    put_in(event.metadata[:producer], state.self)
  end

  # coveralls-ignore-start
  defp subscribe_after(%__MODULE__{subscribe_after: nil}),
    do: Enum.random(3_000..5_000)

  defp subscribe_after(%__MODULE__{subscribe_after: subscribe_after}),
    do: subscribe_after

  defp do_function(function) when is_function(function, 0) do
    function.()
  end

  defp do_function({m, f, a}) do
    apply(m, f, a)
  end

  # coveralls-ignore-stop
end