Skip to main content

lib/commanded/event_store/adapters/eventsourcingdb/event_publisher.ex

defmodule Commanded.EventStore.Adapters.EventSourcingDB.EventPublisher do
  @moduledoc false
  @doc """
  GenServer that observes events from EventSourcingDB and distributes them
  to subscribers via the Registry.

  Responsibilities:
  - Subscribes to ESDB observe_events endpoint
  - Tracks stream_version per stream (transient subscriptions)
  - Converts ESDB Event to RecordedEvent with correct stream_version
  - Publishes to $all registry (all streams)
  - Publishes to stream-specific registry
  """

  use GenServer

  require Logger

  alias Commanded.EventStore.Adapters.EventSourcingDB.EventMapper
  alias Commanded.EventStore.Adapters.EventSourcingDB.StreamMapper
  alias Commanded.EventStore.RecordedEvent
  alias EventSourcingDB.Event
  alias EventSourcingDB.ObserveEventsOptions

  @observer_restart_delay 250

  defmodule State do
    @moduledoc false
    defstruct [
      :client,
      :event_store,
      :pubsub,
      :observer_registry,
      :stream_prefix,
      :subject,
      :observer_pid,
      :observer_ref,
      stream_versions: %{}
    ]
  end

  @spec start_link(
          {EventSourcingDB.Client.t(), atom(), atom(), atom(), String.t()},
          GenServer.options()
        ) ::
          GenServer.on_start()
  def start_link(
        {client, event_store, pubsub_name, observer_registry_name, stream_prefix},
        opts \\ []
      ) do
    subject = StreamMapper.to_subject(stream_prefix)

    state = %State{
      client: client,
      event_store: event_store,
      pubsub: pubsub_name,
      observer_registry: observer_registry_name,
      stream_prefix: stream_prefix,
      subject: subject
    }

    GenServer.start_link(__MODULE__, state, opts)
  end

  @impl true
  def init(%State{} = state) do
    Registry.register(state.observer_registry, state.stream_prefix, self())
    {:ok, state, {:continue, :start_observer}}
  end

  @impl true
  def handle_continue(:start_observer, state) do
    {:noreply, start_observer(state)}
  end

  @impl true
  def handle_cast({:stream_event, %Event{} = event}, state) do
    if matches_stream_prefix?(event, state.stream_prefix) do
      stream_id = StreamMapper.get_stream_id(event.subject, state.stream_prefix)
      new_version = Map.get(state.stream_versions, stream_id, 0) + 1
      recorded_event = EventMapper.to_recorded_event(event, new_version, state.stream_prefix)
      :ok = publish_event(recorded_event, state)

      {:noreply,
       %{state | stream_versions: Map.put(state.stream_versions, stream_id, new_version)}}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %State{observer_ref: ref} = state) do
    Process.send_after(self(), :restart_observer, @observer_restart_delay)
    {:noreply, %{state | observer_pid: nil, observer_ref: nil}}
  end

  def handle_info(:restart_observer, %State{observer_pid: nil} = state) do
    {:noreply, start_observer(state)}
  end

  def handle_info(:restart_observer, state), do: {:noreply, state}

  def handle_info(_msg, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, %State{observer_pid: pid}) when is_pid(pid) do
    if Process.alive?(pid), do: Process.exit(pid, :shutdown)
    :ok
  end

  def terminate(_reason, _state), do: :ok

  defp publish_event(%RecordedEvent{} = event, state) do
    :ok = publish_to_all(event, state)
    :ok = publish_to_stream(event, state)
  end

  defp publish_to_all(%RecordedEvent{} = event, state) do
    Registry.dispatch(state.pubsub, "$all", fn entries ->
      for {pid, _} <- entries, do: send(pid, {:events, [event]})
    end)
  end

  defp publish_to_stream(%RecordedEvent{} = event, state) do
    %RecordedEvent{stream_id: stream_id} = event

    Registry.dispatch(state.pubsub, stream_id, fn entries ->
      for {pid, _} <- entries, do: send(pid, {:events, [event]})
    end)
  end

  defp start_observer(%State{} = state) do
    parent = self()
    client = state.client
    subject = state.subject
    opts = %ObserveEventsOptions{recursive: true}

    {pid, ref} = spawn_monitor(fn -> run_observer(parent, client, subject, opts) end)

    %{state | observer_pid: pid, observer_ref: ref}
  end

  defp run_observer(parent, client, subject, opts) do
    case EventSourcingDB.observe_events(client, subject, opts) do
      {:ok, stream} -> consume_stream(parent, stream)
      {:error, _reason} -> :ok
    end
  end

  defp consume_stream(parent, stream) do
    try do
      Enum.each(stream, fn
        %Event{} = event -> GenServer.cast(parent, {:stream_event, event})
        _other -> :ok
      end)
    rescue
      _ -> :ok
    catch
      _, _ -> :ok
    end
  end

  defp matches_stream_prefix?(%Event{} = event, stream_prefix) do
    subject_prefix = StreamMapper.to_subject(stream_prefix)
    String.starts_with?(event.subject, subject_prefix)
  end
end