# lib/kelvin/in_order_subscription.ex

defmodule Kelvin.InOrderSubscription do
  @moduledoc """
  A subscription producer which processes events in the order in which they
  appear in the EventStoreDB.

  ## Options

  * `:name` - (optional) the GenServer name for this producer
  * `:stream_name` - (required) the stream name to which to subscribe
  * `:connection` - (required) the Extreme client module to use as a
    connection to the EventStoreDB. This may either be the name of the
    Extreme client module or its pid.
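  * `:restore_stream_position!` - (required) a function which determines
    the stream position from which this listener should resume after
    initializing or restarting. Values may be either an MFA tuple or a 0-arity
    anonymous function. The function must return an integer: the producer adds
    one to the returned value before requesting the subscription, so it should
    return the position of the last event already handled rather than the
    position of the next event to read.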
  * `:subscribe_on_init?` - (required) a function which determines whether
    the producer should subscribe immediately after starting up. Values may
    be either an MFA tuple or a 0-arity anonymous function. The function
    should return either `true` to subscribe immediately on initialization or
    `false` if the author intends on manually subscribing the producer. This
    producer can be manually subscribed by `send/2`ing a message of
    `:subscribe` to the process.
  * `:subscribe_after` - (default: `Enum.random(3_000..5_000)`) the amount of
    time to wait after initializing to query the `:subscribe_on_init?` option.
    This can be useful to prevent all producers from trying to subscribe at
    the same time and to await an active connection to the EventStoreDB.
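  * `:catch_up_chunk_size` - (default: `256`) the number of events to query
    for each read chunk while catching up. When the option is not given, the
    `:catch_up_chunk_size` value from the `:kelvin` application environment is
    used, falling back to `256`. This option presents a trade-off between the
    number of network queries and the duration of each query over the network.
    The same value also bounds how many events the producer buffers while
    there is no consumer demand.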
  """

  use GenStage
  require Logger

  # Producer state.
  #
  # * `config` - the start options converted to a map in `init/1`
  # * `subscription` - the value returned by a successful `subscribe/1` call
  #   (it is linked with `Process.link/1`); `nil` until subscribed
  # * `self` - the `:name` option, or the producer pid when no name was given;
  #   every emitted event is tagged with it as `{self, event}`
  # * `max_buffer_size` - the `:catch_up_chunk_size` setting; the buffer-size
  #   threshold at which `handle_call/3` withholds its reply
  # * `demand` - consumer demand that has not been satisfied yet
  # * `buffer` - an Erlang `:queue` of events received while `demand` was 0;
  #   an entry is `{event, from}` when the caller is still awaiting its reply
  # * `buffer_size` - the number of entries currently held in `buffer`
  defstruct [
    :config,
    :subscription,
    :self,
    :max_buffer_size,
    demand: 0,
    buffer: :queue.new(),
    buffer_size: 0
  ]

  @doc """
  Starts the producer and links it to the calling process.

  `opts` is handed to `init/1` unchanged. Only its `:name` entry, when present,
  is additionally passed on as a `GenStage.start_link/3` option.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    start_opts = Keyword.take(opts, [:name])

    GenStage.start_link(__MODULE__, opts, start_opts)
  end

  # Builds the initial producer state and schedules the `:check_auto_subscribe`
  # message which later decides whether to subscribe automatically.
  @impl GenStage
  def init(opts) do
    # The per-producer option wins over the application-wide setting, which in
    # turn wins over the hard-coded default of 256.
    app_chunk_size = Application.get_env(:kelvin, :catch_up_chunk_size, 256)
    chunk_size = Keyword.get(opts, :catch_up_chunk_size, app_chunk_size)

    initial_state = %__MODULE__{
      config: Map.new(opts),
      self: Keyword.get(opts, :name, self()),
      max_buffer_size: chunk_size
    }

    # Delay in milliseconds; a random delay is only drawn when the option is
    # absent (or falsy).
    delay = opts[:subscribe_after] || Enum.random(3_000..5_000)
    Process.send_after(self(), :check_auto_subscribe, delay)

    {:producer, initial_state}
  end

  # Handles plain messages sent to the producer. None of the clauses emit
  # events, so every `:noreply` tuple carries an empty event list.
  #
  # * `:check_auto_subscribe` - scheduled by `init/1`; evaluates the
  #   `:subscribe_on_init?` option and, when it returns a truthy value, queues
  #   a `:subscribe` message for this process.
  # * `:subscribe` - opens the subscription unless one is already stored in
  #   the state. Stops the producer with `reason` when subscribing returns
  #   `{:error, reason}`.
  # * anything else - ignored.
  @impl GenStage
  def handle_info(:check_auto_subscribe, state) do
    identifier = "#{inspect(__MODULE__)} (#{inspect(state.self)})"

    if do_function(state.config.subscribe_on_init?) do
      Logger.info("#{identifier} subscribing to '#{state.config.stream_name}'")

      GenStage.async_info(self(), :subscribe)
    else
      # coveralls-ignore-start
      Logger.info(
        "#{identifier} did not subscribe to '#{state.config.stream_name}'"
      )

      # coveralls-ignore-stop
    end

    {:noreply, [], state}
  end

  def handle_info(:subscribe, state) do
    if state.subscription do
      # coveralls-ignore-start
      Logger.warn("#{inspect(__MODULE__)} is already subscribed.")

      # The logger call returns `:ok`, which is not a valid callback return
      # value, so the unchanged state must be returned explicitly.
      {:noreply, [], state}
      # coveralls-ignore-stop
    else
      case subscribe(state) do
        {:ok, sub} ->
          # Link to the subscription so that a crash on either side takes the
          # other one down as well.
          Process.link(sub)
          {:noreply, [], put_in(state.subscription, sub)}

        # coveralls-ignore-start
        {:error, reason} ->
          {:stop, reason, state}

          # coveralls-ignore-stop
      end
    end
  end

  def handle_info(_info, state), do: {:noreply, [], state}

  # Receives a single event through a synchronous `{:on_event, event}` call
  # (presumably issued by the subscription process opened in `subscribe/1`).
  #
  # * No demand and the buffer is one entry short of `max_buffer_size`: the
  #   event is buffered together with `from` and the reply is withheld, which
  #   keeps the caller waiting until the entry is dequeued.
  # * No demand otherwise: the event is buffered and the caller is released
  #   immediately with `:ok`.
  # * Outstanding demand: the event is emitted right away as
  #   `{state.self, event}` and the demand is decremented by one.
  @impl GenStage
  def handle_call(
        {:on_event, event},
        from,
        %{demand: 0, buffer_size: size, max_buffer_size: max} = state
      )
      when size + 1 == max do
    {:noreply, [], enqueue(state, {event, from})}
  end

  def handle_call({:on_event, event}, _from, %{demand: 0} = state) do
    {:reply, :ok, [], enqueue(state, event)}
  end

  def handle_call({:on_event, event}, _from, %{demand: pending} = state) do
    {:reply, :ok, [{state.self, event}], %{state | demand: pending - 1}}
  end

  # Serves consumer demand from the buffer.
  #
  # `demand` is only the newly requested amount, so it is added to whatever
  # demand is still unmet in the state. Passing `demand` alone would discard
  # the unmet remainder whenever consumers ask again before an earlier request
  # has been filled. Any demand left after draining the buffer is stored back
  # in the state by `dequeue_events/3`.
  @impl GenStage
  def handle_demand(demand, state) do
    dequeue_events(state, demand + state.demand, [])
  end

  # Moves buffered events into the list of events to emit until either the
  # buffer or the demand is exhausted. Returns the `:noreply` tuple for
  # `handle_demand/2`, with the events in buffer order and the leftover demand
  # stored in the state.
  defp dequeue_events(%{buffer_size: size} = state, demand, events)
       when demand == 0 or size == 0 do
    # Events were accumulated by prepending, so restore buffer order.
    {:noreply, Enum.reverse(events), %{state | demand: demand}}
  end

  defp dequeue_events(state, demand, events) do
    {popped, rest} = dequeue(state)

    event =
      case popped do
        # The entry was stored together with a caller whose reply was
        # withheld: release that caller now that the entry leaves the buffer.
        # NOTE(review): any 2-tuple entry is treated this way, which assumes
        # events themselves are never 2-tuples - confirm.
        {:value, {event, from}} ->
          GenStage.reply(from, :ok)
          event

        {:value, event} ->
          event
      end

    dequeue_events(rest, demand - 1, [{rest.self, event} | events])
  end

  # Pops the oldest entry from the buffer. Returns `{{:value, entry}, state}`
  # with the size decremented, or `{:empty, state}` with the size reset to 0
  # when the buffer holds nothing.
  defp dequeue(%{buffer: buffer, buffer_size: size} = state) do
    case :queue.out(buffer) do
      # coveralls-ignore-start
      {:empty, remaining} ->
        # coveralls-ignore-stop
        {:empty, %{state | buffer: remaining, buffer_size: 0}}

      {popped, remaining} ->
        {popped, %{state | buffer: remaining, buffer_size: size - 1}}
    end
  end

  # Asks the connection's request manager to read the configured stream and
  # stay subscribed, with this process as the subscriber. Returns whatever the
  # request manager replies; callers match on `{:ok, subscription}` and
  # `{:error, reason}`.
  defp subscribe(state) do
    request_manager = Extreme.RequestManager._name(state.config.connection)
    stream_name = state.config.stream_name

    # The restore function's result is incremented by one before it is used
    # as the position to read from.
    start_position = do_function(state.config.restore_stream_position!) + 1

    # NOTE(review): the meaning of the trailing `true, false, :infinity`
    # arguments is defined by Extreme and not visible here - confirm against
    # its documentation before changing them.
    read_params =
      {stream_name, start_position, state.max_buffer_size, true, false,
       :infinity}

    GenServer.call(
      request_manager,
      {:read_and_stay_subscribed, self(), read_params},
      :infinity
    )
  end

  # Invokes a configured callback, given either as a `{module, function, args}`
  # tuple or as a 0-arity anonymous function, and returns its result. Any
  # other shape raises a `FunctionClauseError`.
  defp do_function({module, function, args})
       when is_atom(module) and is_atom(function) and is_list(args),
       do: apply(module, function, args)

  defp do_function(callback) when is_function(callback, 0) do
    callback.()
  end

  # Appends `element` to the back of the buffer and bumps the tracked size.
  defp enqueue(%{buffer: buffer, buffer_size: size} = state, element) do
    %{state | buffer: :queue.in(element, buffer), buffer_size: size + 1}
  end
end