lib/message_store.ex

defmodule MessageStore do
  @moduledoc """
  A module for interactions with message store
  """

  alias EventStore.RecordedEvent

  defguard is_conn(conn) when is_atom(conn) or is_pid(conn)

  defmacro __using__(opts) do
    quote bind_quoted: [opts: opts] do
      use EventStore, opts

      def fetch(stream_name, projection, opts \\ [])
          when is_binary(stream_name) and is_atom(projection) and is_list(opts) do
        {conn, opts} = parse_fetch_options(opts)

        MessageStore.fetch(conn, stream_name, projection, opts)
      end

      def expected_version(stream_uuid) when is_binary(stream_uuid) do
        MessageStore.expected_version(__MODULE__, stream_uuid)
      end

      defp parse_fetch_options(opts) do
        opts
        |> parse_opts()
        |> put_read_function(opts)
        |> put_project_function(opts)
      end

      defp put_read_function({conn, base}, opts) do
        read = Keyword.get(opts, :read, &MessageStore.Stream.read/3)

        {conn, Keyword.put(base, :read, read)}
      end

      defp put_project_function({conn, base}, opts) do
        project = Keyword.get(opts, :project, &MessageStore.project/2)

        {conn, Keyword.put(base, :project, project)}
      end
    end
  end

  @spec fetch(
          conn,
          String.t(),
          m,
          read: (conn, String.t(), list() -> Result.t(reason, [RecordedEvent.t()])),
          project: ([RecordedEvent.t()], m -> projection)
        ) :: Result.t(reason, projection)
        when conn: module(), m: module(), reason: term(), projection: any()
  def fetch(conn, stream_name, projection, opts)
      when is_conn(conn) and is_binary(stream_name) and is_atom(projection) and is_list(opts) do
    {read, opts} = Keyword.pop!(opts, :read)
    {project, opts} = Keyword.pop!(opts, :project)

    conn
    |> read.(stream_name, opts)
    |> Result.catch_error(:stream_not_found, fn _ -> {:ok, []} end)
    |> Result.map(&project.(&1, projection))
  end

  @spec project([RecordedEvent.t()], module()) :: any()
  def project(messages, projection) when is_list(messages) and is_atom(projection) do
    Enum.reduce(
      messages,
      projection.init(),
      &projection.handle_message/2
    )
  end

  @spec to_result(:ok | {:error, err}, value) :: Result.t(err, value)
        when err: any(), value: any()
  def to_result(:ok, value) do
    {:ok, value}
  end

  def to_result({:error, _err} = error, _value) do
    error
  end

  @spec category(String.t() | nil) :: String.t()
  def category(nil) do
    ""
  end

  def category(stream_name) when is_binary(stream_name) do
    stream_name
    |> String.split("-")
    |> List.first()
  end

  @spec stream_name_to_id(String.t()) :: String.t() | nil
  def stream_name_to_id(stream_name) when is_binary(stream_name) do
    stream_name
    |> String.split("-", parts: 2)
    |> maybe_id()
  end

  @spec expected_version(atom(), String.t()) :: Result.t(term(), integer())
  def expected_version(message_store, stream_uuid)
      when is_atom(message_store) and is_binary(stream_uuid) do
    stream_uuid
    |> message_store.stream_forward()
    |> stream_length()
  end

  defp stream_length({:error, _} = error) do
    error
  end

  defp stream_length(events) do
    events
    |> Enum.to_list()
    |> length()
    |> Result.ok()
  end

  defp maybe_id([_, ""]) do
    nil
  end

  defp maybe_id([_, id]) do
    id
  end

  defp maybe_id(_) do
    nil
  end
end