lib/commanded/event_store/adapters/in_memory.ex

defmodule Commanded.EventStore.Adapters.InMemory do
  @moduledoc """
  An in-memory event store adapter implemented as a `GenServer` process which
  stores events in memory only.

  This is only designed for testing purposes.
  """

  @behaviour Commanded.EventStore.Adapter

  use GenServer

  defmodule State do
    @moduledoc false

    # Internal state held by the in-memory event store `GenServer`.
    defstruct [
      # Name of the event store (taken from the `:name` start option).
      :name,
      # Optional serializer module; when `nil`, events and snapshots are stored as given.
      :serializer,
      # Every recorded event across all streams, most recently appended first.
      persisted_events: [],
      # Map of stream UUID to that stream's recorded events, most recently appended first.
      streams: %{},
      # Map of stream UUID (or `:all`) to the list of transient subscriber pids.
      transient_subscribers: %{},
      # Map of subscription name to its `PersistentSubscription` struct.
      persistent_subscriptions: %{},
      # Map of source UUID to the stored snapshot.
      snapshots: %{},
      # Global event number assigned to the next appended event.
      next_event_number: 1
    ]
  end

  alias Commanded.EventStore.Adapters.InMemory.{PersistentSubscription, State, Subscription}
  alias Commanded.EventStore.{EventData, RecordedEvent, SnapshotData}
  alias Commanded.UUID

  @doc """
  Starts the in-memory event store process.

  `GenServer` start options (`:debug`, `:name`, `:timeout`, `:spawn_opt`,
  `:hibernate_after`) are passed through to `GenServer.start_link/3`. The `:name`
  option is required and is also kept in the server state. An optional
  `:serializer` module may be given.
  """
  def start_link(opts \\ []) do
    genserver_keys = [:debug, :name, :timeout, :spawn_opt, :hibernate_after]
    {genserver_opts, adapter_opts} = Keyword.split(opts, genserver_keys)

    name = Keyword.fetch!(opts, :name)
    serializer = Keyword.get(adapter_opts, :serializer)

    GenServer.start_link(__MODULE__, %State{name: name, serializer: serializer}, genserver_opts)
  end

  # GenServer callback: the struct built by `start_link/1` becomes the server state as-is.
  @impl GenServer
  def init(%State{} = initial_state), do: {:ok, initial_state}

  # Builds the children for this adapter: a `DynamicSupervisor` for subscription
  # processes followed by the event store process itself. The third tuple element
  # is the adapter metadata (just the event store name).
  @impl Commanded.EventStore.Adapter
  def child_spec(application, config) do
    {event_store_name, config} = parse_config(application, config)

    subscriptions_supervisor =
      {DynamicSupervisor,
       strategy: :one_for_one, name: subscriptions_supervisor_name(event_store_name)}

    event_store = %{id: event_store_name, start: {__MODULE__, :start_link, [config]}}

    {:ok, [subscriptions_supervisor, event_store], %{name: event_store_name}}
  end

  # Appends `events` to the stream by calling the event store process; the options
  # argument is accepted but not used by this adapter.
  @impl Commanded.EventStore.Adapter
  def append_to_stream(adapter_meta, stream_uuid, expected_version, events, _opts \\ []) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:append, stream_uuid, expected_version, events})
  end

  @impl Commanded.EventStore.Adapter
  def stream_forward(adapter_meta, stream_uuid, start_version \\ 0, read_batch_size \\ 1_000)

  # Reads the stream's events from `start_version` onwards. The read batch size is
  # ignored: the event store process replies with all matching events at once.
  def stream_forward(adapter_meta, stream_uuid, start_version, _read_batch_size) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:stream_forward, stream_uuid, start_version})
  end

  # Registers the calling process as a transient subscriber of `stream_uuid`.
  @impl Commanded.EventStore.Adapter
  def subscribe(adapter_meta, stream_uuid) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:subscribe, stream_uuid, self()})
  end

  # Requests a persistent subscription named `subscription_name` on `stream_uuid`
  # for `subscriber`. The `:concurrency_limit` and `:partition_by` options are
  # read from `opts` (both default to `nil`).
  @impl Commanded.EventStore.Adapter
  def subscribe_to(adapter_meta, stream_uuid, subscription_name, subscriber, start_from, opts) do
    server = event_store_name(adapter_meta)

    concurrency_limit = Keyword.get(opts, :concurrency_limit)
    partition_by = Keyword.get(opts, :partition_by)

    persistent_subscription = %PersistentSubscription{
      name: subscription_name,
      stream_uuid: stream_uuid,
      start_from: start_from,
      concurrency_limit: concurrency_limit,
      partition_by: partition_by
    }

    GenServer.call(server, {:subscribe_to, persistent_subscription, subscriber})
  end

  # Acknowledges `event` on behalf of the subscription process `subscription`.
  @impl Commanded.EventStore.Adapter
  def ack_event(adapter_meta, subscription, %RecordedEvent{} = event) when is_pid(subscription) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:ack_event, event, subscription})
  end

  # Asks the event store to stop and detach the subscription process `subscription`.
  @impl Commanded.EventStore.Adapter
  def unsubscribe(adapter_meta, subscription) when is_pid(subscription) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:unsubscribe, subscription})
  end

  # Asks the event store to delete the persistent subscription `subscription_name`
  # registered for `stream_uuid`.
  @impl Commanded.EventStore.Adapter
  def delete_subscription(adapter_meta, stream_uuid, subscription_name) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:delete_subscription, stream_uuid, subscription_name})
  end

  # Fetches the snapshot stored for `source_uuid` from the event store process.
  @impl Commanded.EventStore.Adapter
  def read_snapshot(adapter_meta, source_uuid) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:read_snapshot, source_uuid})
  end

  # Stores `snapshot` in the event store process, replacing any existing snapshot
  # recorded under the same source UUID.
  @impl Commanded.EventStore.Adapter
  def record_snapshot(adapter_meta, snapshot) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:record_snapshot, snapshot})
  end

  # Removes the snapshot stored for `source_uuid`, if any.
  @impl Commanded.EventStore.Adapter
  def delete_snapshot(adapter_meta, source_uuid) do
    adapter_meta
    |> event_store_name()
    |> GenServer.call({:delete_snapshot, source_uuid})
  end

  @doc """
  Resets the event store for `application` back to its initial, empty state.

  The event store process is located using the same `:name` resolution as
  `child_spec/2`.
  """
  def reset!(application, config \\ []) do
    {server, _config} = parse_config(application, config)

    server |> GenServer.call(:reset!)
  end

  # Appends events after checking `expected_version` against the current stream.
  #
  # The check is resolved first into either `:append` or an error tuple, and only
  # then are the events persisted. A stream that does not exist yet is treated as
  # an empty list of events when appending.
  @impl GenServer
  def handle_call({:append, stream_uuid, expected_version, events}, _from, %State{} = state) do
    current_events = Map.get(state.streams, stream_uuid)

    decision =
      case {expected_version, current_events} do
        {:any_version, _any} ->
          :append

        {:no_stream, nil} ->
          :append

        {:no_stream, found} when is_list(found) ->
          {:error, :stream_exists}

        {:stream_exists, nil} ->
          {:error, :stream_not_found}

        {:stream_exists, _found} ->
          :append

        {0, nil} ->
          :append

        {version, nil} when is_integer(version) ->
          {:error, :wrong_expected_version}

        {version, found} when is_integer(version) and length(found) == version ->
          :append

        {version, found} when is_integer(version) and length(found) != version ->
          {:error, :wrong_expected_version}
      end

    case decision do
      :append ->
        {reply, updated_state} =
          persist_events(state, stream_uuid, current_events || [], events)

        {:reply, reply, updated_state}

      {:error, _reason} = error ->
        {:reply, error, state}
    end
  end

  # Replies with the stream's events in append order, starting at `start_version`,
  # or `{:error, :stream_not_found}` when the stream is unknown.
  @impl GenServer
  def handle_call({:stream_forward, stream_uuid, start_version}, _from, %State{} = state) do
    reply =
      case Map.get(state.streams, stream_uuid) do
        nil ->
          {:error, :stream_not_found}

        newest_first ->
          # Stream versions start at 1, so version `n` is preceded by `n - 1` events.
          skip_count = max(0, start_version - 1)

          newest_first
          |> Enum.reverse()
          |> Enum.drop(skip_count)
          |> Enum.map(fn stored_event ->
            state
            |> deserialize(stored_event)
            |> set_event_number_from_version(stream_uuid)
          end)
      end

    {:reply, reply, state}
  end

  # Adds `subscriber` to the transient subscribers of `stream_uuid` and monitors it
  # so that it can be removed again when the process goes down.
  @impl GenServer
  def handle_call({:subscribe, stream_uuid, subscriber}, _from, %State{} = state) do
    Process.monitor(subscriber)

    subscribers_by_stream =
      Map.update(state.transient_subscribers, stream_uuid, [subscriber], &[subscriber | &1])

    {:reply, :ok, %State{state | transient_subscribers: subscribers_by_stream}}
  end

  # Connects `subscriber` to a persistent subscription.
  #
  #   * unknown name: the requested subscription is started;
  #   * known name without subscribers: the stored subscription is restarted;
  #   * known name, no concurrency limit: `{:error, :subscription_already_exists}`;
  #   * known name with a concurrency limit: another subscriber is added while the
  #     limit is not reached, otherwise `{:error, :too_many_subscribers}`.
  @impl GenServer
  def handle_call({:subscribe_to, subscription, subscriber}, _from, %State{} = state) do
    %PersistentSubscription{name: subscription_name} = subscription

    existing = Map.get(state.persistent_subscriptions, subscription_name)

    {reply, state} =
      case existing do
        nil ->
          start_persistent_subscription(state, subscription, subscriber)

        %PersistentSubscription{subscribers: []} ->
          start_persistent_subscription(state, existing, subscriber)

        %PersistentSubscription{concurrency_limit: nil} ->
          {{:error, :subscription_already_exists}, state}

        %PersistentSubscription{concurrency_limit: limit, subscribers: active} ->
          if length(active) >= limit do
            {{:error, :too_many_subscribers}, state}
          else
            start_persistent_subscription(state, existing, subscriber)
          end
      end

    {:reply, reply, state}
  end

  # Stops the subscription process `pid` and removes it from the persistent
  # subscription it belongs to. Always replies `:ok`, including when no
  # subscription owns `pid`.
  @impl GenServer
  def handle_call({:unsubscribe, pid}, _from, %State{} = state) do
    stop_and_detach = fn %PersistentSubscription{} = subscription ->
      :ok = stop_subscription(state, pid)

      PersistentSubscription.unsubscribe(subscription, pid)
    end

    {:reply, :ok, update_persistent_subscription(state, pid, stop_and_detach)}
  end

  # Deletes the persistent subscription `subscription_name` registered for
  # `stream_uuid`.
  #
  # Replies with:
  #
  #   * `:ok` when the subscription exists for the stream and has no subscribers;
  #   * `{:error, :subscription_active}` when it still has connected subscribers;
  #   * `{:error, :subscription_not_found}` when the name is unknown or is
  #     registered for a different stream.
  #
  # The last two cases are answered with an error tuple rather than left
  # unmatched, because an unmatched `case` would raise inside the event store
  # process and take all in-memory state down with it.
  @impl GenServer
  def handle_call({:delete_subscription, stream_uuid, subscription_name}, _from, %State{} = state) do
    %State{persistent_subscriptions: persistent_subscriptions} = state

    {reply, state} =
      case Map.get(persistent_subscriptions, subscription_name) do
        %PersistentSubscription{stream_uuid: ^stream_uuid, subscribers: []} ->
          state = %State{
            state
            | persistent_subscriptions: Map.delete(persistent_subscriptions, subscription_name)
          }

          {:ok, state}

        %PersistentSubscription{stream_uuid: ^stream_uuid} ->
          {{:error, :subscription_active}, state}

        _not_found_for_stream ->
          {{:error, :subscription_not_found}, state}
      end

    {:reply, reply, state}
  end

  # Replies with `{:ok, snapshot}` for a stored snapshot (deserialized when a
  # serializer is configured) or `{:error, :snapshot_not_found}`.
  @impl GenServer
  def handle_call({:read_snapshot, source_uuid}, _from, %State{} = state) do
    reply =
      case Map.get(state.snapshots, source_uuid) do
        nil -> {:error, :snapshot_not_found}
        stored -> {:ok, deserialize(state, stored)}
      end

    {:reply, reply, state}
  end

  # Stores the (serialized) snapshot under its source UUID, overwriting any
  # snapshot previously recorded for that source.
  @impl GenServer
  def handle_call({:record_snapshot, %SnapshotData{} = snapshot}, _from, %State{} = state) do
    stored = serialize(state, snapshot)
    snapshots = Map.put(state.snapshots, snapshot.source_uuid, stored)

    {:reply, :ok, %State{state | snapshots: snapshots}}
  end

  # Forgets the snapshot for `source_uuid`; replies `:ok` whether or not one existed.
  @impl GenServer
  def handle_call({:delete_snapshot, source_uuid}, _from, %State{} = state) do
    remaining = Map.delete(state.snapshots, source_uuid)

    {:reply, :ok, %State{state | snapshots: remaining}}
  end

  # Stops every persistent subscription process and replaces the state with a
  # fresh one that keeps only the event store name and serializer.
  @impl GenServer
  def handle_call(:reset!, _from, %State{} = state) do
    for {_name, %PersistentSubscription{subscribers: subscribers}} <-
          state.persistent_subscriptions,
        %{pid: pid} <- subscribers,
        is_pid(pid) do
      stop_subscription(state, pid)
    end

    {:reply, :ok, %State{name: state.name, serializer: state.serializer}}
  end

  # Records the acknowledgement of `event` for the subscription owning `subscriber`.
  @impl GenServer
  def handle_call({:ack_event, event, subscriber}, _from, %State{} = state) do
    {:reply, :ok, ack_persistent_subscription_by_pid(state, event, subscriber)}
  end

  # A monitored process went down: drop it from the persistent subscription it
  # belonged to (if any) and from every transient subscriber list.
  @impl GenServer
  def handle_info({:DOWN, _ref, :process, pid, _reason}, %State{} = state) do
    without_persistent = remove_persistent_subscription_by_pid(state, pid)
    without_transient = remove_transient_subscriber_by_pid(without_persistent, pid)

    {:noreply, without_transient}
  end

  # Appending an empty list of events is a no-op: nothing is stored, no stream is
  # created and no subscriber is notified. Without this clause the lookup of the
  # last appended event below would fail on an empty list.
  defp persist_events(%State{} = state, _stream_uuid, _existing_events, []), do: {:ok, state}

  # Records `new_events` on the stream and notifies subscribers.
  #
  # `existing_events` is the stream's current list of recorded events (newest
  # first). Each new event is given a global event number and a stream version,
  # is serialized for storage, and is prepended to both the stream and the global
  # list of persisted events. The recorded events are then sent to transient
  # subscribers of `:all` and of the stream, and offered to every persistent
  # subscription.
  #
  # Returns `{:ok, state}` with the updated state.
  defp persist_events(%State{} = state, stream_uuid, existing_events, new_events) do
    %State{
      next_event_number: next_event_number,
      persisted_events: persisted_events,
      persistent_subscriptions: persistent_subscriptions,
      streams: streams
    } = state

    initial_stream_version = length(existing_events) + 1
    now = DateTime.utc_now()

    new_events =
      new_events
      |> Enum.with_index(0)
      |> Enum.map(fn {recorded_event, index} ->
        event_number = next_event_number + index
        stream_version = initial_stream_version + index

        map_to_recorded_event(event_number, stream_uuid, stream_version, now, recorded_event)
      end)
      |> Enum.map(&serialize(state, &1))

    stream_events = prepend(existing_events, new_events)

    # Event numbers were assigned consecutively starting at `next_event_number`.
    next_event_number = next_event_number + length(new_events)

    state = %State{
      state
      | streams: Map.put(streams, stream_uuid, stream_events),
        persisted_events: prepend(persisted_events, new_events),
        next_event_number: next_event_number
    }

    publish_all_events = Enum.map(new_events, &deserialize(state, &1))

    # Subscribers to a single stream see the stream version as the event number.
    publish_stream_events =
      Enum.map(publish_all_events, &set_event_number_from_version(&1, stream_uuid))

    state = publish_to_transient_subscribers(state, :all, publish_all_events)
    state = publish_to_transient_subscribers(state, stream_uuid, publish_stream_events)

    persistent_subscriptions =
      Enum.into(persistent_subscriptions, %{}, fn {subscription_name, subscription} ->
        {subscription_name, publish_events(state, subscription)}
      end)

    state = %State{state | persistent_subscriptions: persistent_subscriptions}

    {:ok, state}
  end

  # For a single stream the event number reported to readers equals the stream
  # version; for the `:all` stream the event is returned untouched.
  defp set_event_number_from_version(%RecordedEvent{} = event, stream_uuid) do
    case stream_uuid do
      :all -> event
      _single_stream -> %RecordedEvent{event | event_number: event.stream_version}
    end
  end

  # Pushes each of `items`, in order, onto the front of `list`; the last item
  # therefore ends up as the new head.
  defp prepend(list, items), do: Enum.reverse(items, list)

  # Builds the `RecordedEvent` for an `EventData`, assigning a fresh event id plus
  # the given event number, stream identity, stream version and creation time.
  defp map_to_recorded_event(event_number, stream_uuid, stream_version, now, %EventData{} = event) do
    %RecordedEvent{
      event_id: UUID.uuid4(),
      event_number: event_number,
      stream_id: stream_uuid,
      stream_version: stream_version,
      causation_id: event.causation_id,
      correlation_id: event.correlation_id,
      event_type: event.event_type,
      data: event.data,
      metadata: event.metadata,
      created_at: now
    }
  end

  # Starts a subscription process for `subscriber` under the subscriptions
  # supervisor, monitors it, attaches it to `subscription` and immediately offers
  # it any events after the checkpoint.
  #
  # Returns `{{:ok, pid}, state}` where `pid` is the started subscription process.
  defp start_persistent_subscription(%State{} = state, subscription, subscriber) do
    %PersistentSubscription{name: subscription_name, checkpoint: checkpoint} = subscription

    supervisor = subscriptions_supervisor_name(state.name)
    child_spec = subscriber |> Subscription.child_spec() |> Map.put(:restart, :temporary)

    {:ok, pid} = DynamicSupervisor.start_child(supervisor, child_spec)

    Process.monitor(pid)

    # A subscription that already has a checkpoint resumes from it; otherwise the
    # starting position is derived from its `start_from` setting.
    initial_checkpoint =
      case checkpoint do
        nil -> start_from(state, subscription)
        existing -> existing
      end

    subscribed = PersistentSubscription.subscribe(subscription, pid, initial_checkpoint)
    caught_up = publish_events(state, subscribed)

    subscriptions = Map.put(state.persistent_subscriptions, subscription_name, caught_up)

    {{:ok, pid}, %State{state | persistent_subscriptions: subscriptions}}
  end

  # Offers the subscription every stored event after its checkpoint, one at a
  # time, until there are no more events or no subscriber can take the next one.
  #
  # Returns the subscription as updated by `PersistentSubscription.publish/2`.
  defp publish_events(%State{} = state, %PersistentSubscription{} = subscription) do
    %State{persisted_events: persisted_events, streams: streams} = state
    %PersistentSubscription{checkpoint: checkpoint, stream_uuid: stream_uuid} = subscription

    events =
      case stream_uuid do
        :all -> persisted_events
        stream_uuid -> Map.get(streams, stream_uuid, [])
      end

    # Events are stored newest first, so the event following the checkpoint (the
    # `checkpoint + 1`-th event ever appended) is found by counting back from the
    # end of the list. `Enum.at/2` returns `nil` once the checkpoint has caught up.
    position = -(checkpoint + 1)

    case Enum.at(events, position) do
      %RecordedEvent{} = unseen_event ->
        unseen_event =
          deserialize(state, unseen_event) |> set_event_number_from_version(stream_uuid)

        case PersistentSubscription.publish(subscription, unseen_event) do
          {:ok, subscription} -> publish_events(state, subscription)
          {:error, :no_subscriber_available} -> subscription
        end

      nil ->
        subscription
    end
  end

  # Terminates the subscription process under this event store's subscriptions
  # supervisor and returns the result of `DynamicSupervisor.terminate_child/2`.
  defp stop_subscription(%State{name: event_store}, subscription) do
    event_store
    |> subscriptions_supervisor_name()
    |> DynamicSupervisor.terminate_child(subscription)
  end

  # Acknowledges `event` on the persistent subscription that owns `pid`, then
  # offers that subscription its next pending events.
  defp ack_persistent_subscription_by_pid(%State{} = state, %RecordedEvent{} = event, pid) do
    acknowledge = fn %PersistentSubscription{} = subscription ->
      acknowledged = PersistentSubscription.ack(subscription, event.event_number)

      publish_events(state, acknowledged)
    end

    update_persistent_subscription(state, pid, acknowledge)
  end

  # Detaches `pid` from the persistent subscription that owns it, then offers
  # pending events to the subscription's remaining subscribers.
  defp remove_persistent_subscription_by_pid(%State{} = state, pid) do
    detach = fn %PersistentSubscription{} = subscription ->
      remaining = PersistentSubscription.unsubscribe(subscription, pid)

      publish_events(state, remaining)
    end

    update_persistent_subscription(state, pid, detach)
  end

  # Applies `updater` to the persistent subscription that has `pid` as one of its
  # subscribers and stores the result. The state is returned unchanged when no
  # subscription owns `pid`.
  defp update_persistent_subscription(%State{} = state, pid, updater)
       when is_function(updater, 1) do
    subscriptions = state.persistent_subscriptions

    with {name, %PersistentSubscription{} = subscription} <-
           find_persistent_subscription(subscriptions, pid) do
      updated = updater.(subscription)

      %State{state | persistent_subscriptions: Map.put(subscriptions, name, updated)}
    else
      _no_owner -> state
    end
  end

  # Returns the first `{name, subscription}` entry whose subscription has `pid` as
  # a subscriber, or `nil` when there is none.
  defp find_persistent_subscription(persistent_subscriptions, pid) do
    owns_pid? = fn {_name, %PersistentSubscription{} = candidate} ->
      PersistentSubscription.has_subscriber?(candidate, pid)
    end

    Enum.find(persistent_subscriptions, owns_pid?)
  end

  # Removes one occurrence of `pid` from the transient subscriber list of every
  # stream. Streams left without subscribers keep an empty list.
  defp remove_transient_subscriber_by_pid(%State{} = state, pid) do
    remaining =
      Map.new(state.transient_subscribers, fn {stream_uuid, subscribers} ->
        {stream_uuid, List.delete(subscribers, pid)}
      end)

    %State{state | transient_subscribers: remaining}
  end

  # Resolves a subscription's `start_from` setting to an initial checkpoint:
  # `:origin` is 0, `:current` is the number of events currently stored for the
  # subscribed stream (or for all streams), and an integer is used as given.
  defp start_from(%State{} = state, %PersistentSubscription{} = subscription) do
    %PersistentSubscription{start_from: start_from, stream_uuid: stream_uuid} = subscription

    case start_from do
      :origin ->
        0

      :current when stream_uuid == :all ->
        length(state.persisted_events)

      :current ->
        state.streams |> Map.get(stream_uuid, []) |> length()

      position when is_integer(position) ->
        position
    end
  end

  # Sends `{:events, events}` to every transient subscriber registered for
  # `stream_uuid` (which may be `:all`). Returns the state unchanged.
  defp publish_to_transient_subscribers(%State{} = state, stream_uuid, events) do
    %State{transient_subscribers: transient_subscribers} = state

    subscribers = Map.get(transient_subscribers, stream_uuid, [])

    # The filter must be a boolean expression on the generated value. A bare
    # function capture such as `&is_pid/1` is always truthy and filters nothing.
    for subscriber <- subscribers, is_pid(subscriber) do
      send(subscriber, {:events, events})
    end

    state
  end

  # Serializes the `data` and `metadata` fields of an event or snapshot with the
  # configured serializer. Without a serializer the value is stored as given.
  defp serialize(%State{serializer: nil}, data), do: data

  defp serialize(%State{serializer: serializer}, %RecordedEvent{} = recorded_event) do
    %RecordedEvent{
      recorded_event
      | data: serializer.serialize(recorded_event.data),
        metadata: serializer.serialize(recorded_event.metadata)
    }
  end

  defp serialize(%State{serializer: serializer}, %SnapshotData{} = snapshot) do
    %SnapshotData{
      snapshot
      | data: serializer.serialize(snapshot.data),
        metadata: serializer.serialize(snapshot.metadata)
    }
  end

  # Reverses `serialize/2`: the `data` field is deserialized with a `:type` hint
  # (the event type or snapshot source type) and `metadata` without one. Without
  # a serializer the stored value is returned as-is.
  defp deserialize(%State{serializer: nil}, data), do: data

  defp deserialize(%State{serializer: serializer}, %RecordedEvent{} = recorded_event) do
    %RecordedEvent{
      recorded_event
      | data: serializer.deserialize(recorded_event.data, type: recorded_event.event_type),
        metadata: serializer.deserialize(recorded_event.metadata)
    }
  end

  defp deserialize(%State{serializer: serializer}, %SnapshotData{} = snapshot) do
    %SnapshotData{
      snapshot
      | data: serializer.deserialize(snapshot.data, type: snapshot.source_type),
        metadata: serializer.deserialize(snapshot.metadata)
    }
  end

  # Resolves the event store name from `config`.
  #
  # Returns `{name, config}`. When no `:name` is configured the name defaults to
  # `<application>.EventStore` and is added to the returned config. Raises
  # `ArgumentError` when `:name` is present but is not an atom.
  defp parse_config(application, config) do
    configured = Keyword.get(config, :name)

    cond do
      is_nil(configured) ->
        default_name = Module.concat([application, EventStore])

        {default_name, Keyword.put(config, :name, default_name)}

      is_atom(configured) ->
        {configured, config}

      true ->
        raise ArgumentError,
          message:
            "expected :name option to be an atom but got: " <>
              inspect(configured)
    end
  end

  # Reads the event store name from the adapter metadata map; `nil` when the map
  # has no `:name` key.
  defp event_store_name(adapter_meta) when is_map(adapter_meta) do
    case adapter_meta do
      %{name: name} -> name
      _without_name -> nil
    end
  end

  # Name of the `DynamicSupervisor` that owns this event store's subscription
  # processes: `<event_store>.SubscriptionsSupervisor`.
  defp subscriptions_supervisor_name(event_store) do
    Module.concat(event_store, SubscriptionsSupervisor)
  end
end