# lib/commanded/event_store/adapters/eventsourcingdb.ex

defmodule Commanded.EventStore.Adapters.EventSourcingDB do
  alias Commanded.EventStore.EventData
  alias Commanded.EventStore.Adapters.EventSourcingDB.Config
  alias Commanded.EventStore.Adapters.EventSourcingDB.EventMapper
  alias Commanded.EventStore.Adapters.EventSourcingDB.StreamMapper
  alias Commanded.EventStore.Adapters.EventSourcingDB.Subscription
  alias Commanded.EventStore.Adapters.EventSourcingDB.SubscriptionSupervisor

  @behaviour Commanded.EventStore.Adapter

  @doc """
  Builds the supervisor child spec and the adapter metadata for an event store.

  The event store name is `<name>.EventSourcingDB`, where `<name>` is the
  `:name` option from `config`, or `application` when that option is absent.

  The returned metadata map contains:

    * `:event_store` - the derived event store name
    * `:client` - the result of `Config.client/1` for the `:client` option
      (an empty list when not configured)
    * `:stream_prefix` - the `:stream_prefix` option, `""` by default
    * `:source` - the `:source` option, `nil` when not configured
    * `:observer_registry` - `<event_store>.ObserverRegistry`
  """
  @impl Commanded.EventStore.Adapter
  @spec child_spec(Commanded.Application.t(), Keyword.t()) ::
          {:ok, [:supervisor.child_spec() | {module, term} | module], map()}
  def child_spec(application, config) do
    base_name =
      case Keyword.get(config, :name) do
        nil -> application
        name -> name
      end

    event_store = Module.concat([base_name, EventSourcingDB])
    client_opts = Keyword.get(config, :client, [])

    # The supervisor receives the full adapter config plus the resolved name.
    supervisor_opts = Keyword.merge(config, event_store: event_store)

    supervisor_spec =
      Supervisor.child_spec(
        {Commanded.EventStore.Adapters.EventSourcingDB.Supervisor, supervisor_opts},
        id: event_store
      )

    adapter_meta = %{
      event_store: event_store,
      client: Config.client(client_opts),
      stream_prefix: Keyword.get(config, :stream_prefix, ""),
      source: Keyword.get(config, :source),
      observer_registry: Module.concat([event_store, ObserverRegistry])
    }

    {:ok, [supervisor_spec], adapter_meta}
  end

  @doc """
  Calls `EventSourcingDB.ping/1` with the client stored in `adapter_meta` and
  returns its result.

  Raises `KeyError` when `adapter_meta` has no `:client` entry.
  """
  def ping(adapter_meta) do
    adapter_meta
    |> client()
    |> EventSourcingDB.ping()
  end

  @doc """
  Appends `events` to the stream identified by `subject`.

  Delegates to `append_to_stream/5` with an empty option list.
  """
  def append_to_stream(adapter_meta, subject, expected_version, events) do
    append_to_stream(adapter_meta, subject, expected_version, events, [])
  end

  @doc """
  Appends `events` to the stream identified by `subject`.

  The stream is mapped to a subject with `StreamMapper.to_subject/2`, every
  event becomes an `EventSourcingDB.EventCandidate`, and `expected_version` is
  translated into write preconditions. The candidate type is the `"type"`
  entry of the event metadata when present, otherwise the event's
  `event_type`. The trailing options argument is ignored.

  Returns `:ok` when the write succeeds; a failed write is translated by
  `handle_write_error/2` into an `{:error, reason}` tuple.
  """
  @impl Commanded.EventStore.Adapter
  @spec append_to_stream(
          map(),
          String.t(),
          Commanded.EventStore.Adapter.expected_version(),
          list(EventData.t()),
          Keyword.t()
        ) ::
          :ok
          | {:error, :wrong_expected_version}
          | {:error, term()}
  def append_to_stream(adapter_meta, subject, expected_version, events, _opts) do
    esdb_client = client(adapter_meta)
    prefix = stream_prefix(adapter_meta)
    event_source = source(adapter_meta)
    esdb_subject = StreamMapper.to_subject(prefix, subject)

    candidates =
      for event_data <- events do
        # Events without metadata are serialized with an empty map.
        metadata = event_data.metadata || %{}

        payload =
          EventMapper.serialize_event_data(
            event_data.data,
            event_data.correlation_id,
            event_data.causation_id,
            metadata
          )

        %EventSourcingDB.EventCandidate{
          source: event_source,
          subject: esdb_subject,
          type: Map.get(metadata, "type", event_data.event_type),
          data: payload
        }
      end

    preconditions = expected_version_to_preconditions(esdb_subject, expected_version)
    write_result = EventSourcingDB.write_events(esdb_client, candidates, preconditions)

    case write_result do
      {:ok, _written} -> :ok
      {:error, reason} -> handle_write_error(reason, expected_version)
    end
  end

  @doc """
  Streams all events of `subject` from the beginning.

  Delegates to `stream_forward/4` with a start version of `0` and a batch size
  of `1_000`.
  """
  def stream_forward(adapter_meta, subject) do
    stream_forward(adapter_meta, subject, 0, 1_000)
  end

  @doc """
  Streams the events of `subject` starting at `start_version`.

  Delegates to `stream_forward/4` with a batch size of `1_000`.
  """
  def stream_forward(adapter_meta, subject, start_version) do
    stream_forward(adapter_meta, subject, start_version, 1_000)
  end

  @doc """
  Streams the events of `subject` starting at `start_version`.

  All events for the subject are read eagerly inside a task that is awaited
  for at most five seconds. Events are numbered from `1` in read order and
  only those numbered `max(start_version, 1)` or higher are emitted, each
  converted with `EventMapper.to_recorded_event/3`. The batch size argument is
  ignored.

  Returns `{:error, :stream_not_found}` when no events were read or when the
  read failed for any reason.
  """
  @impl Commanded.EventStore.Adapter
  @spec stream_forward(
          map(),
          String.t(),
          non_neg_integer(),
          non_neg_integer()
        ) ::
          Enumerable.t()
          | {:error, :stream_not_found}
          | {:error, term()}
  def stream_forward(adapter_meta, subject, start_version, _read_batch_size) do
    esdb_client = client(adapter_meta)
    prefix = stream_prefix(adapter_meta)
    esdb_subject = StreamMapper.to_subject(prefix, subject)

    read_task =
      Task.async(fn ->
        case EventSourcingDB.read_events(esdb_client, esdb_subject) do
          {:ok, events_stream} -> {:ok, Enum.to_list(events_stream)}
          {:error, reason} -> {:error, reason}
        end
      end)

    case Task.await(read_task, 5_000) do
      {:ok, [_ | _] = events} ->
        recorded_events_from(events, max(start_version, 1), prefix)

      {:ok, []} ->
        {:error, :stream_not_found}

      # NOTE(review): every read failure is reported as a missing stream, so the
      # underlying reason never reaches the caller - confirm this is intended.
      {:error, _reason} ->
        {:error, :stream_not_found}
    end
  end

  # Lazily numbers `events` from 1 and converts those at or after
  # `first_version` into recorded events.
  defp recorded_events_from(events, first_version, prefix) do
    events
    |> Stream.with_index(1)
    |> Stream.filter(fn {_event, stream_version} -> stream_version >= first_version end)
    |> Stream.map(fn {event, stream_version} ->
      EventMapper.to_recorded_event(event, stream_version, prefix)
    end)
  end

  @doc """
  Registers the calling process for the given stream in the event store's
  `PubSub` registry (`<event_store>.PubSub`).

  `:all` is registered under the `"$all"` key. Returns `:ok` when the
  registration succeeds or the process is already registered, and
  `{:error, reason}` for any other registry error.
  """
  @impl Commanded.EventStore.Adapter
  @spec subscribe(map(), :all | String.t()) :: :ok | {:error, term()}
  def subscribe(adapter_meta, :all), do: subscribe(adapter_meta, "$all")

  def subscribe(adapter_meta, stream_uuid) do
    pubsub_name =
      adapter_meta
      |> server_name()
      |> List.wrap()
      |> Kernel.++([PubSub])
      |> Module.concat()

    case Registry.register(pubsub_name, stream_uuid, []) do
      {:ok, _owner} ->
        :ok

      # Already subscribed, which is fine.
      {:error, {:already_registered, _pid}} ->
        :ok

      {:error, _reason} = error ->
        error
    end
  end

  @doc """
  Starts a named subscription for `subscriber` through the
  `SubscriptionSupervisor`.

  `:all` is translated to the `"$all"` stream name; every other argument is
  passed on unchanged together with the event store name from `adapter_meta`.
  Returns the result of `SubscriptionSupervisor.start_subscription/6`.
  """
  @impl Commanded.EventStore.Adapter
  @spec subscribe_to(
          map(),
          :all | String.t(),
          String.t(),
          pid(),
          Commanded.EventStore.Adapter.start_from(),
          Keyword.t()
        ) ::
          {:ok, pid()}
          | {:error, :subscription_already_exists}
          | {:error, :too_many_subscribers}
          | {:error, term()}
  def subscribe_to(adapter_meta, stream, subscription_name, subscriber, start_from, opts) do
    stream_name =
      case stream do
        :all -> "$all"
        other -> other
      end

    adapter_meta
    |> server_name()
    |> SubscriptionSupervisor.start_subscription(
      stream_name,
      subscription_name,
      subscriber,
      start_from,
      opts
    )
  end

  @doc """
  Acknowledges `event` on the subscription process.

  The acknowledgement is only attempted while the subscription process is
  alive. Returns `:ok` when the process is already dead or when the
  acknowledgement call exits.
  """
  @impl Commanded.EventStore.Adapter
  @spec ack_event(map(), pid(), Commanded.EventStore.RecordedEvent.t()) :: :ok
  def ack_event(_adapter_meta, subscription_pid, event) do
    case Process.alive?(subscription_pid) do
      true -> Subscription.ack(subscription_pid, event)
      false -> :ok
    end
  catch
    # The subscription may terminate between the liveness check and the ack.
    :exit, _reason -> :ok
  end

  @doc """
  Stops the subscription process via `SubscriptionSupervisor.stop_subscription/2`,
  using the event store name from `adapter_meta`.
  """
  @impl Commanded.EventStore.Adapter
  @spec unsubscribe(map(), pid()) :: :ok
  def unsubscribe(adapter_meta, subscription_pid) when is_pid(subscription_pid) do
    adapter_meta
    |> server_name()
    |> SubscriptionSupervisor.stop_subscription(subscription_pid)
  end

  @doc """
  Deletes the named subscription through the `SubscriptionSupervisor`.

  `:all` is translated to the `"$all"` stream name. The event store name and
  the stream prefix are taken from `adapter_meta`. Returns the result of
  `SubscriptionSupervisor.delete_subscription/4`.
  """
  @impl Commanded.EventStore.Adapter
  @spec delete_subscription(map(), String.t() | :all, String.t()) ::
          :ok | {:error, :subscription_not_found} | {:error, term()}
  def delete_subscription(adapter_meta, stream, subscription_name) do
    stream_name =
      case stream do
        :all -> "$all"
        other -> other
      end

    event_store = server_name(adapter_meta)
    prefix = stream_prefix(adapter_meta)

    SubscriptionSupervisor.delete_subscription(event_store, prefix, stream_name, subscription_name)
  end

  @doc """
  Snapshots are not supported by this adapter.

  Always returns `{:error, :snapshots_not_supported}`; both arguments are
  ignored.
  """
  # NOTE(review): the spec is widened to admit the value this function actually
  # returns. The previous spec only listed `{:error, :snapshot_not_found}` as an
  # error - confirm which atom the Commanded behaviour expects.
  @impl Commanded.EventStore.Adapter
  @spec read_snapshot(map(), String.t()) ::
          {:ok, Commanded.EventStore.SnapshotData.t()}
          | {:error, :snapshot_not_found | :snapshots_not_supported}
  def read_snapshot(_adapter_meta, _source_uuid) do
    {:error, :snapshots_not_supported}
  end

  @doc """
  Snapshots are not supported by this adapter.

  Always returns `{:error, :snapshots_not_supported}`; both arguments are
  ignored.
  """
  @impl Commanded.EventStore.Adapter
  @spec record_snapshot(map(), Commanded.EventStore.SnapshotData.t()) ::
          :ok | {:error, term()}
  def record_snapshot(_adapter_meta, _snapshot_data), do: {:error, :snapshots_not_supported}

  @doc """
  Snapshots are not supported by this adapter.

  Always returns `{:error, :snapshots_not_supported}`; both arguments are
  ignored.
  """
  @impl Commanded.EventStore.Adapter
  @spec delete_snapshot(map(), String.t()) ::
          :ok | {:error, term()}
  def delete_snapshot(_adapter_meta, _source_uuid), do: {:error, :snapshots_not_supported}

  # Returns the client stored under `:client`; raises `KeyError` when absent.
  defp client(adapter_meta) do
    Map.fetch!(adapter_meta, :client)
  end

  # Returns the event store name stored under `:event_store`; raises `KeyError`
  # when absent.
  defp server_name(adapter_meta), do: Map.fetch!(adapter_meta, :event_store)

  # Returns the configured stream prefix, or "" when none is present.
  defp stream_prefix(adapter_meta), do: Map.get(adapter_meta, :stream_prefix, "")

  # Returns the value stored under `:source`; raises `KeyError` when the key is
  # absent from the metadata map.
  defp source(adapter_meta), do: Map.fetch!(adapter_meta, :source)

  # Translates an expected version into the list of write preconditions:
  #
  #   * `:no_stream` or `0`  - the subject must be pristine
  #   * `:stream_exists`     - the subject must be populated
  #   * `:any_version`       - no precondition
  #   * a positive integer   - an EventQL query comparing the subject's event
  #                            count with the expected version
  defp expected_version_to_preconditions(subject, expected_version)
       when expected_version in [:no_stream, 0] do
    [%EventSourcingDB.IsSubjectPristine{subject: subject}]
  end

  defp expected_version_to_preconditions(subject, :stream_exists),
    do: [%EventSourcingDB.IsSubjectPopulated{subject: subject}]

  defp expected_version_to_preconditions(_subject, :any_version) do
    []
  end

  defp expected_version_to_preconditions(subject, expected_version)
       when is_integer(expected_version) and expected_version > 0 do
    # NOTE(review): the subject is interpolated into the query unescaped; a
    # subject containing a single quote would alter the query - confirm that
    # subjects can never contain one.
    count_query =
      "FROM e IN events WHERE e.subject == '#{subject}' PROJECT INTO COUNT() == #{expected_version}"

    [%EventSourcingDB.IsEventQLQueryTrue{query: count_query}]
  end

  # API error reasons that signal a failed write precondition.
  @state_conflict_reasons [
    "state conflict: precondition failed\n",
    "state conflict: subject not found\n"
  ]

  # Translates a failed write into an error tuple, based on the expected
  # version the write was attempted with:
  #
  #   * `:no_stream` + "precondition failed"      -> `{:error, :stream_exists}`
  #   * `:no_stream` / `:stream_exists` conflicts -> `{:error, :stream_not_found}`
  #   * integer version conflicts                 -> `{:error, :wrong_expected_version}`
  #
  # Any other API error under those expected versions yields the error's
  # `reason` field; everything else is wrapped unchanged.
  defp handle_write_error(
         %EventSourcingDB.Errors.ApiError{reason: "state conflict: precondition failed\n"},
         :no_stream
       ),
       do: {:error, :stream_exists}

  defp handle_write_error(%EventSourcingDB.Errors.ApiError{reason: reason}, expected_version)
       when reason in @state_conflict_reasons and
              expected_version in [:no_stream, :stream_exists],
       do: {:error, :stream_not_found}

  defp handle_write_error(%EventSourcingDB.Errors.ApiError{reason: reason}, expected_version)
       when reason in @state_conflict_reasons and is_integer(expected_version),
       do: {:error, :wrong_expected_version}

  defp handle_write_error(%EventSourcingDB.Errors.ApiError{reason: reason}, expected_version)
       when expected_version in [:no_stream, :stream_exists] or is_integer(expected_version),
       do: {:error, reason}

  defp handle_write_error(error, _expected_version) do
    {:error, error}
  end
end