# lib/message_store/stream.ex

defmodule MessageStore.Stream do
  @moduledoc """
  Operations on a single event stream.

  Events are fetched with a plain SQL query over a Postgrex connection and
  then deserialized with `EventStore.RecordedEvent.deserialize/2`.
  """

  alias EventStore.RecordedEvent
  alias EventStore.Storage.Reader

  import MessageStore, only: [is_conn: 1]

  require Logger

  @doc """
  Reads the events of the stream named `stream_name`.

  The rows are selected with the SQL produced by `query_read_stream/1`, so the
  events come back in the order defined there.

  ## Options

    * `:serializer` - (required) handed to `EventStore.RecordedEvent.deserialize/2`
      for every event that was read.
    * `:schema` - database schema that holds the tables; defaults to `"public"`.

  The complete `opts` list is also passed on to `Postgrex.query/4`.

  ## Return values

    * `{:ok, events}` - `events` is a list; it is empty when the query yields
      no rows.
    * `{:error, reason}` - for a `Postgrex.Error` carrying a Postgres message
      and for a `DBConnection.ConnectionError`, `reason` is that message; any
      other error term is returned unchanged.

  A failed read is logged exactly once, as a warning that names the stream.

  Raises `KeyError` when `:serializer` is missing from `opts` - this includes
  calling `read/2`, whose default options are empty.
  """
  @spec read(term(), String.t(), keyword()) :: {:ok, [term()]} | {:error, term()}
  def read(conn, stream_name, opts \\ [])
      when is_conn(conn) and is_binary(stream_name) and is_list(opts) do
    schema = Keyword.get(opts, :schema, "public")
    # Fetched eagerly so a missing serializer fails before the database is hit.
    serializer = Keyword.fetch!(opts, :serializer)

    conn
    |> Postgrex.query(query_read_stream(schema), [stream_name], opts)
    |> normalize_postgrex_result()
    |> Result.map(&Reader.EventAdapter.to_event_data/1)
    |> Result.catch_all_errors(&failed_to_read(&1, stream_name))
    |> Result.map(&deserialize_recorded_events(&1, serializer))
  end

  @doc """
  Builds the SQL statement used by `read/3` for the given `schema`.

  The statement takes one parameter (`$1`), the stream's `stream_uuid`. It
  first collects the ids of the events linked to that stream and then selects
  those events through the `stream_events` rows whose `stream_id` is `0`,
  ordered by ascending `stream_version`.

  NOTE(review): `stream_id = 0` is presumably the store-wide "all events"
  stream, which would make `stream_version` a global position - confirm
  against the database schema.

  `schema` is interpolated into the statement verbatim, so it must come from
  trusted configuration and never from user input.
  """
  @spec query_read_stream(String.t()) :: String.t()
  def query_read_stream(schema) do
    """
    with event_ids as (
      select event_id
      from #{schema}.stream_events join #{schema}.streams using(stream_id)
      where #{schema}.streams.stream_uuid = $1
    )
    select se.stream_version,
           e.event_id,
           $1 as stream_uuid,
           se.original_stream_version,
           e.event_type,
           e.correlation_id,
           e.causation_id,
           e.data,
           e.metadata,
           e.created_at
    from event_ids
      join #{schema}.stream_events as se using(event_id)
      join #{schema}.events as e using(event_id)
    where se.stream_id=0
    order by se.stream_version asc;
    """
  end

  # Reduces a `Postgrex.query/4` reply to `{:ok, rows}` or `{:error, reason}`.
  #
  # Nothing is logged here: every error flows on to `failed_to_read/2`, which
  # emits the single warning (including the stream name) for a failed read.
  defp normalize_postgrex_result({:ok, %Postgrex.Result{num_rows: 0}}), do: {:ok, []}
  defp normalize_postgrex_result({:ok, %Postgrex.Result{rows: rows}}), do: {:ok, rows}

  # Errors reported by Postgres itself are reduced to their message.
  defp normalize_postgrex_result({:error, %Postgrex.Error{postgres: %{message: message}}}) do
    {:error, message}
  end

  # Connection-level failures are reduced to their message as well.
  defp normalize_postgrex_result({:error, %DBConnection.ConnectionError{message: message}}) do
    {:error, message}
  end

  # Any other error term is passed through untouched.
  defp normalize_postgrex_result({:error, _reason} = reply), do: reply

  # Logs a failed read of `stream_name` and returns the error tuple unchanged.
  # The message is built lazily, so it costs nothing when warnings are disabled.
  defp failed_to_read(reason, stream_name) do
    Logger.warn(fn ->
      "Failed to read events from stream #{stream_name} due to: #{inspect(reason)}"
    end)

    {:error, reason}
  end

  # Deserializes every recorded event with the given serializer.
  defp deserialize_recorded_events(recorded_events, serializer) do
    Enum.map(recorded_events, &RecordedEvent.deserialize(&1, serializer))
  end
end