lib/eventize/persistence/event_store.ex

defmodule Eventize.Persistence.EventStore do
  @moduledoc """
  EventStore is a `GenServer` process used to store
  events for `Eventize.EventSourcedProcess` instances.
  """

  defmodule EventData do
    @moduledoc """
    Represents a event with payload, meta data and a sequence number.
    """

    @type t :: %__MODULE__{
            payload: term(),
            meta_data: map(),
            sequence_number: non_neg_integer()
          }

    @enforce_keys [:payload, :meta_data, :sequence_number]

    defstruct [:payload, :meta_data, :sequence_number]
  end

  defmodule SnapshotData do
    @moduledoc """
    Represents a snapshot with payload, meta data and version.
    """

    @type t :: %__MODULE__{
            payload: term(),
            meta_data: map(),
            version: non_neg_integer()
          }

    @enforce_keys [:payload, :meta_data, :version]

    defstruct [:payload, :meta_data, :version]
  end

  @typedoc """
  A map containing all data needed to get events from the event store.
  """
  @type load_events_query :: %{
          stream_name: String.t(),
          start: :start | non_neg_integer(),
          max_count: :all | non_neg_integer()
        }

  @typedoc """
  A map containing all data needed to append events to the event store.
  """
  @type append_events_command :: %{
          stream_name: String.t(),
          events: list({term(), map()}),
          expected_version: :any | :empty | non_neg_integer()
        }

  @typedoc """
  A map containing all data needed to delete events from the event store.
  """
  @type delete_events_command :: %{stream_name: String.t(), version: non_neg_integer() | :all}

  @typedoc """
  A map containing all data needed to load a snapshot from the event store.
  """
  @type load_snapshot_query :: %{
          stream_name: String.t(),
          max_version: :max | non_neg_integer()
        }

  @typedoc """
  A map containing all data needed to append a snapshot to the event store.
  """
  @type append_snapshot_command :: %{
          stream_name: String.t(),
          snapshot: {term(), map()},
          version: non_neg_integer()
        }

  @typedoc """
  A map containing all data needed to delete snapshots from the event store.
  """
  @type delete_snapshots_command :: %{stream_name: String.t(), version: non_neg_integer() | :all}

  @typedoc """
  Represents the response that a caller will receive when reading or appending events.
  """
  @type events_response :: {:ok, list(EventData), :empty | non_neg_integer()} | {:error, term()}

  @typedoc """
  Represents the response that a caller will receive when reading or appending a snapshot.
  """
  @type snapshot_response :: {:ok, %SnapshotData{} | nil} | {:error, term()}

  @typedoc """
  Represents the response a caller will receive when deleting events or snapshots.
  """
  @type delete_response :: :ok | {:error, term()}

  @type event_bus :: %{
          load_events:
            (String.t(), :start | non_neg_integer(), :all | non_neg_integer() ->
               events_response()),
          append_events:
            (String.t(), list({term(), map()}), :any | :empty | non_neg_integer() ->
               events_response()),
          delete_events: (String.t(), non_neg_integer() -> delete_response()),
          load_snapshot: (String.t(), :max | non_neg_integer() -> snapshot_response()),
          append_snapshot:
            (String.t(), {term(), map()}, non_neg_integer() ->
               snapshot_response()),
          delete_snapshots: (String.t(), non_neg_integer() -> delete_response())
        }

  @callback load_events(
              load_events_query(),
              GenServer.from(),
              term()
            ) ::
              {:reply, events_response(), term()}
              | {:reply, events_response(), term(),
                 timeout() | :hibernate | {:continue, continue_arg :: term()}}
              | {:stop, term(), term(), term()}
              | {:stop, term(), term()}

  @callback append_events(append_events_command(), GenServer.from(), term()) ::
              {:reply, events_response(), term()}
              | {:reply, events_response(), term(),
                 timeout() | :hibernate | {:continue, continue_arg :: term()}}
              | {:stop, term(), term(), term()}
              | {:stop, term(), term()}

  @callback delete_events(delete_events_command(), GenServer.from(), term()) ::
              {:reply, delete_response(), term()}
              | {:reply, delete_response(), term(),
                 timeout() | :hibernate | {:continue, continue_arg :: term()}}
              | {:stop, term(), term(), term()}
              | {:stop, term(), term()}

  @callback load_snapshot(load_snapshot_query(), GenServer.from(), term()) ::
              {:reply, snapshot_response(), term()}
              | {:reply, snapshot_response(), term(),
                 timeout() | :hibernate | {:continue, continue_arg :: term()}}
              | {:stop, term(), term(), term()}
              | {:stop, term(), term()}

  @callback append_snapshot(append_snapshot_command(), GenServer.from(), term()) ::
              {:reply, snapshot_response(), term()}
              | {:reply, snapshot_response(), term(),
                 timeout() | :hibernate | {:continue, continue_arg :: term()}}
              | {:stop, term(), term(), term()}
              | {:stop, term(), term()}

  @callback delete_snapshots(
              delete_snapshots_command(),
              GenServer.from(),
              term()
            ) ::
              {:reply, delete_response(), term()}
              | {:reply, delete_response(), term(),
                 timeout() | :hibernate | {:continue, continue_arg :: term()}}
              | {:stop, term(), term(), term()}
              | {:stop, term(), term()}

  defguardp is_event_bus(event_bus)
            when is_map(event_bus) and is_map_key(event_bus, :load_events) and
                   is_map_key(event_bus, :append_events) and
                   is_map_key(event_bus, :delete_events) and
                   is_map_key(event_bus, :load_snapshot) and
                   is_map_key(event_bus, :append_snapshot) and
                   is_map_key(event_bus, :delete_snapshots) and
                   is_function(event_bus.load_events, 3) and
                   is_function(event_bus.append_events, 3) and
                   is_function(event_bus.delete_events, 2) and
                   is_function(event_bus.load_snapshot, 2) and
                   is_function(event_bus.append_snapshot, 4) and
                   is_function(event_bus.delete_snapshots, 2)

  @spec parse_event_bus(any) :: event_bus()
  def parse_event_bus(bus) do
    case bus do
      nil ->
        Eventize.Persistence.NilEventBus.get()

      eb when is_event_bus(eb) ->
        eb

      pid ->
        %{
          load_events: fn stream_name, start, max_count ->
            GenServer.call(
              pid,
              {:load_events,
               %{
                 stream_name: stream_name,
                 start: start,
                 max_count: max_count
               }}
            )
          end,
          append_events: fn stream_name, events, expected_version ->
            GenServer.call(
              pid,
              {:append_events,
               %{
                 stream_name: stream_name,
                 events: events,
                 expected_version: expected_version
               }}
            )
          end,
          delete_events: fn stream_name, version ->
            GenServer.call(
              pid,
              {:delete_events, %{stream_name: stream_name, version: version}}
            )
          end,
          load_snapshot: fn stream_name, max_version ->
            GenServer.call(
              pid,
              {:load_snapshot, %{stream_name: stream_name, max_version: max_version}}
            )
          end,
          append_snapshot: fn stream_name, snapshot, version ->
            GenServer.call(
              pid,
              {:append_snapshot,
               %{
                 stream_name: stream_name,
                 snapshot: snapshot,
                 version: version
               }}
            )
          end,
          delete_snapshots: fn stream_name, version ->
            GenServer.call(
              pid,
              {:delete_snapshots, %{stream_name: stream_name, version: version}}
            )
          end
        }
    end
  end

  defmacro __using__(_) do
    quote do
      use GenServer

      @behaviour Eventize.Persistence.EventStore

      alias Eventize.Persistence.EventStore.EventData

      def handle_call({:load_events, query}, from, state),
        do: load_events(query, from, state)

      def handle_call({:append_events, cmd}, from, state),
        do: append_events(cmd, from, state)

      def handle_call({:delete_events, cmd}, from, state),
        do: delete_events(cmd, from, state)

      def handle_call({:load_snapshot, query}, from, state),
        do: load_snapshot(query, from, state)

      def handle_call({:append_snapshot, cmd}, from, state),
        do: append_snapshot(cmd, from, state)

      def handle_call({:delete_snapshots, cmd}, from, state),
        do: delete_snapshots(cmd, from, state)
    end
  end
end