# lib/documents/session/session_manager.ex

defmodule Ravix.Documents.Session.Manager do
  @moduledoc """
  Functions to manage session changes
  """
  require OK

  alias Ravix.Connection.Executor.RavenResponse
  alias Ravix.Documents.Session.State, as: SessionState
  alias Ravix.Documents.Session.{SaveChangesData, Validations}

  alias Ravix.Documents.Commands.{
    BatchCommand,
    GetDocumentsCommand,
    ExecuteQueryCommand,
    ExecuteStreamQueryCommand
  }

  alias Ravix.Documents.Metadata
  alias Ravix.Connection
  alias Ravix.Connection.State, as: ConnectionState
  alias Ravix.Connection.RequestExecutor
  alias Ravix.RQL.Query

  @doc """
  Loads the documents identified by `document_ids` into the session.

  Ids that are already stored in the session (as reported by
  `Validations.document_not_stored/2`) are echoed back under the
  `"already_loaded_ids"` key of the response. A `nil` `opts` is treated as an
  empty option list; the `:start`, `:page_size` and `:metadata_only` options are
  forwarded to the load request.

  Returns `{:ok, [response: response, updated_state: state]}` on success and
  `{:error, reason}` otherwise. When a step fails with
  `:all_ids_already_loaded`, an empty response is returned together with the
  unchanged session state instead of an error.
  """
  @spec load_documents(SessionState.t(), list, any, any) ::
          {:error, any} | {:ok, [{any, any}, ...]}
  def load_documents(%SessionState{} = state, document_ids, includes, nil) do
    load_documents(state, document_ids, includes, [])
  end

  def load_documents(%SessionState{} = state, document_ids, includes, opts) do
    OK.try do
      _ <- Validations.load_documents_limit_reached(state, document_ids)
      known_ids = fetch_loaded_documents(state, document_ids)
      missing_ids <- Validations.all_ids_are_not_already_loaded(document_ids, known_ids)
      network_state <- Connection.fetch_state(state.store)
      body <- execute_load_request(network_state, missing_ids, includes, opts)
      parsed = GetDocumentsCommand.parse_response(state, body)

      # Results are merged first, then the included documents, then the
      # last-call marker is refreshed.
      new_state =
        state
        |> SessionState.update_session(parsed[:results])
        |> SessionState.update_session(parsed[:includes])
        |> SessionState.update_last_session_call()
    after
      {:ok,
       [
         response: Map.put(body, "already_loaded_ids", known_ids),
         updated_state: new_state
       ]}
    rescue
      :all_ids_already_loaded ->
        empty_response = %{
          "Results" => [],
          "Includes" => [],
          "already_loaded_ids" => document_ids
        }

        {:ok, [response: empty_response, updated_state: state]}

      reason ->
        {:error, reason}
    end
  end

  @doc """
  Registers `entity` in the session under `key`.

  Structs are stored as given. Plain maps are first passed through
  `Morphix.atomorphiform!/1` before being stored.

  Supported `opts`:

    * `:upsert` - defaults to `true`; when `false`, the key is first checked
      with `Validations.document_not_stored/2`.

  Returns `{:ok, [entity, updated_state]}` or `{:error, reason}`.
  """
  @spec store_entity(SessionState.t(), map, any, String.t(), keyword()) ::
          {:error, any} | {:ok, [...]}
  def store_entity(%SessionState{} = state, entity, key, change_vector, opts)
      when is_struct(entity) do
    do_store_entity(entity, key, change_vector, state, opts)
  end

  def store_entity(%SessionState{} = state, entity, key, change_vector, opts)
      when is_map(entity) do
    atomized_entity = Morphix.atomorphiform!(entity)
    do_store_entity(atomized_entity, key, change_vector, state, opts)
  end

  # Validates the request, attaches the default metadata to `entity` and
  # registers it in the session state under the (possibly temporary) key.
  #
  # Returns `{:ok, [entity_with_metadata, updated_state]}` or `{:error, reason}`.
  defp do_store_entity(entity, key, change_vector, %SessionState{} = state, opts) do
    OK.try do
      _ <- Validations.session_request_limit_reached(state)
      _ <- validate_upsert(state, key, opts)

      stored_change_vector = effective_change_vector(state, change_vector)
      document_key <- ensure_key(key)

      default_metadata = Metadata.build_default_metadata(entity)
      entity_with_metadata = Metadata.add_metadata(entity, default_metadata)

      # Whatever is currently registered under this key (or nil) is handed to
      # `register_document/5` as the original document.
      previous_document = Map.get(state.documents_by_id, document_key)

      touched_state =
        state
        |> SessionState.increment_request_count()
        |> SessionState.update_last_session_call()

      updated_state <-
        SessionState.register_document(
          touched_state,
          document_key,
          entity_with_metadata,
          stored_change_vector,
          previous_document
        )
    after
      {:ok, [entity_with_metadata, updated_state]}
    rescue
      reason -> {:error, reason}
    end
  end

  # With `upsert: false` the key must not be stored in the session yet; with the
  # default (`true`) no check is performed.
  defp validate_upsert(%SessionState{} = state, key, opts) do
    case Keyword.get(opts, :upsert, true) do
      true -> {:ok, nil}
      false -> Validations.document_not_stored(state, key)
    end
  end

  # The change vector is only kept when the session conventions enable
  # optimistic concurrency; otherwise it is dropped.
  defp effective_change_vector(%SessionState{} = state, change_vector) do
    case state.conventions.use_optimistic_concurrency do
      true -> change_vector
      false -> nil
    end
  end

  @doc """
  Sends the pending session changes to the server in a single batch.

  The `"Results"` entry of the batch response is parsed with
  `BatchCommand.parse_batch_response/2` and applied to the session state.

  Returns `{:ok, [result: batch_response, updated_state: state]}` or
  `{:error, reason}`.
  """
  @spec save_changes(SessionState.t()) :: {:error, any} | {:ok, keyword()}
  def save_changes(%SessionState{} = state) do
    OK.for do
      network_state <- Connection.fetch_state(state.store)
      saved <- execute_save_request(state, network_state)

      batch_response = saved[:request_response]
      state_after_save = saved[:updated_state]

      parsed_updates =
        BatchCommand.parse_batch_response(batch_response["Results"], state_after_save)

      updated_session = SessionState.update_session(state_after_save, parsed_updates)
    after
      {:ok, [result: batch_response, updated_state: updated_session]}
    end
  end

  @doc """
  Marks the document `document_id` for exclusion in the session.

  The request counter and the last-call marker of the session are updated
  before delegating to `SessionState.mark_document_for_exclusion/2`.

  Returns `{:ok, updated_state}` or `{:error, reason}`.
  """
  @spec delete_document(SessionState.t(), bitstring()) ::
          {:error, atom()} | {:ok, SessionState.t()}
  def delete_document(%SessionState{} = state, document_id) do
    touched_state =
      state
      |> SessionState.increment_request_count()
      |> SessionState.update_last_session_call()

    OK.for do
      updated_state <- SessionState.mark_document_for_exclusion(touched_state, document_id)
    after
      updated_state
    end
  end

  @doc """
  Executes `query` against the store of the session using the given `method`.

  The connection conventions decide whether the response is served from or
  written to the query cache.

  Returns `{:ok, response_body}`; any other result of fetching the connection
  state or executing the request is returned unchanged.
  """
  @spec execute_query(SessionState.t(), Query.t(), any) ::
          {:error, any} | {:ok, map()}
  def execute_query(%SessionState{} = session_state, %Query{} = query, method) do
    case Connection.fetch_state(session_state.store) do
      {:ok, network_state} ->
        command = %ExecuteQueryCommand{
          Query: query.query_string,
          QueryParameters: query.query_params,
          method: method
        }

        network_state
        |> query_caching_plan(command)
        |> execute_with_caching_plan(command, network_state)

      failure ->
        failure
    end
  end

  # Decides how a query interacts with the cache.
  #
  # When the connection conventions have `enable_agressive_cache: true`, the
  # command is hashed into a cache key and looked up:
  #   * miss (`nil`) -> `{:execute_and_cache, cache_key}`
  #   * hit          -> `{:return_from_cache_if_matched, cache_key, cached_entry}`
  # Any other connection state yields `:no_cache`.
  defp query_caching_plan(
         %ConnectionState{
           conventions: %{caching: %{enable_agressive_cache: true, cache: cache}}
         },
         %ExecuteQueryCommand{} = command
       ) do
    cache_key = ExecuteQueryCommand.hash_query(command)

    case cache.get(cache_key) do
      nil -> {:execute_and_cache, cache_key}
      cached_entry -> {:return_from_cache_if_matched, cache_key, cached_entry}
    end
  end

  defp query_caching_plan(_network_state, _command), do: :no_cache

  # Executes `command` following the plan produced by `query_caching_plan/2`.
  #
  # Every clause returns `{:ok, body}` on success and hands any other result of
  # `RequestExecutor.execute` back to the caller unchanged.

  # `:no_cache` - the request is executed and the cache is not touched.
  defp execute_with_caching_plan(:no_cache, command, network_state) do
    case RequestExecutor.execute(command, network_state) do
      {:ok, response} -> {:ok, response.body}
      err -> err
    end
  end

  # `{:execute_and_cache, cache_key}` - nothing is cached yet for this query, so
  # the request is executed and the response is stored when it carries an ETag.
  defp execute_with_caching_plan({:execute_and_cache, cache_key}, command, network_state) do
    with {:ok, response} <- RequestExecutor.execute(command, network_state) do
      cache_response(response, cache_key, network_state)
    end
  end

  # `{:return_from_cache_if_matched, cache_key, entry}` - a cached entry exists,
  # so the request is sent with the cached ETag in `If-None-Match`. A 304 status
  # means the cached body is still valid; any other successful response replaces
  # the cached entry.
  defp execute_with_caching_plan(
         {
           :return_from_cache_if_matched,
           cache_key,
           etag: cached_etag, cached_response: cached_response
         },
         command,
         network_state
       ) do
    headers = [{"If-None-Match", cached_etag}]

    case RequestExecutor.execute(command, network_state, headers) do
      {:ok, response} ->
        if response.status_code == 304 do
          {:ok, cached_response.body}
        else
          # Previously this path hard-matched `{:ok, etag}` and crashed with a
          # MatchError when the fresh response had no ETag.
          cache_response(response, cache_key, network_state)
        end

      err ->
        err
    end
  end

  # Stores `response` in the query cache under `cache_key` when it carries an
  # ETag and returns `{:ok, response.body}`. A response without an ETag is
  # returned without being cached; any other result of
  # `RavenResponse.response_etag/1` is passed through unchanged.
  defp cache_response(response, cache_key, network_state) do
    case RavenResponse.response_etag(response) do
      {:ok, etag} ->
        cache = network_state.conventions.caching.cache
        cache_duration = network_state.conventions.caching.cache_duration
        cache.put(cache_key, [etag: etag, cached_response: response], ttl: cache_duration)
        {:ok, response.body}

      {:no_etag, response} ->
        {:ok, response.body}

      err ->
        err
    end
  end

  @doc """
  Executes `query` as a streamed query.

  Only the `"GET"` method is accepted by this function head; the command is
  built with `is_stream: true`.

  Returns `{:ok, body}` where `body` is the body of the executed request, or
  `{:error, reason}`.
  """
  @spec stream_query(SessionState.t(), Query.t(), any) :: {:error, any} | {:ok, Enumerable.t()}
  def stream_query(%SessionState{} = session_state, %Query{} = query, "GET") do
    command = %ExecuteStreamQueryCommand{
      Query: query.query_string,
      QueryParameters: query.query_params,
      method: :get,
      is_stream: true
    }

    OK.for do
      network_state <- Connection.fetch_state(session_state.store)
      response <- RequestExecutor.execute(command, network_state)
    after
      response.body
    end
  end

  # Returns the keys of the documents in `document_ids` that are already stored
  # in the session, in the order of `document_ids`. Ids that are not stored, and
  # stored documents whose key is `nil`, contribute nothing.
  defp fetch_loaded_documents(%SessionState{} = state, document_ids) do
    Enum.flat_map(document_ids, fn document_id ->
      case Validations.document_not_stored(state, document_id) do
        {:ok, _} ->
          []

        {:error, {:document_already_stored, stored_document}} ->
          case stored_document.key do
            nil -> []
            loaded_key -> [loaded_key]
          end
      end
    end)
  end

  # Builds a `GetDocumentsCommand` for `ids` and executes it, returning
  # `{:ok, response_body}` or the `{:error, reason}` of the executor.
  #
  # `:start`, `:page_size` and `:metadata_only` are read from `opts` and default
  # to `nil` when absent.
  defp execute_load_request(%ConnectionState{} = network_state, ids, includes, opts)
       when is_list(ids) do
    command = %GetDocumentsCommand{
      ids: ids,
      includes: includes,
      start: Keyword.get(opts, :start),
      page_size: Keyword.get(opts, :page_size),
      metadata_only: Keyword.get(opts, :metadata_only)
    }

    case RequestExecutor.execute(command, network_state) do
      {:ok, response} -> {:ok, response.body}
      {:error, _reason} = failure -> failure
    end
  end

  # Collects the deferred, delete and put commands of the session into a single
  # `BatchCommand` and executes it.
  #
  # On success returns `{:ok, [request_response: body, updated_state: state]}`,
  # where the state has its last-call marker refreshed and its deferred commands,
  # deleted entities and temporary keys cleared.
  defp execute_save_request(%SessionState{} = state, %ConnectionState{} = network_state) do
    pending_changes =
      %SaveChangesData{}
      |> SaveChangesData.add_deferred_commands(state.defer_commands)
      |> SaveChangesData.add_delete_commands(state.deleted_entities)
      |> SaveChangesData.add_put_commands(state.documents_by_id)

    batch = %BatchCommand{Commands: pending_changes.commands}

    OK.for do
      response <- RequestExecutor.execute(batch, network_state)

      cleared_state =
        state
        |> SessionState.update_last_session_call()
        |> SessionState.clear_deferred_commands()
        |> SessionState.clear_deleted_entities()
        |> SessionState.clear_tmp_keys()
    after
      [request_response: response.body, updated_state: cleared_state]
    end
  end

  # Normalises the key a document is stored under.
  #
  #   * `nil` is rejected with `{:error, :no_valid_id_informed}`.
  #   * A string key ending in "/" or "|" is turned into a temporary key of the
  #     form "tmp_" <> key <> uuid.
  #   * Every other key is returned unchanged.
  defp ensure_key(nil), do: {:error, :no_valid_id_informed}

  defp ensure_key(key) when is_bitstring(key) do
    if String.last(key) in ["/", "|"] do
      {:ok, "tmp_" <> key <> UUID.uuid4()}
    else
      {:ok, key}
    end
  end

  defp ensure_key(key), do: {:ok, key}
end