defmodule Ravix.Documents.Session do
@moduledoc """
A stateful session to execute RavenDB commands
"""
use GenServer
require OK
require Logger
alias Ravix.Documents.Session.State, as: SessionState
alias Ravix.Documents.Session.Manager, as: SessionManager
alias Ravix.Documents.Session.Supervisor, as: SessionSupervisor
def init(session_state) do
{:ok, session_state, {:continue, :session_ttl_checker}}
end
@spec start_link(any, SessionState.t()) :: :ignore | {:error, any} | {:ok, pid}
def start_link(_attr, %SessionState{} = initial_state) do
GenServer.start_link(
__MODULE__,
initial_state,
name: session_id(initial_state.session_id)
)
end
@doc """
Loads the document from the database to the local session
## Parameters
- session_id: the session_id
- ids: the document ids to be loaded
- includes: the document includes path
- opts: load options
## Returns
- `{:ok, results}`
- `{:errors, cause}`
## Examples
iex> Session.load(session_id, "entity_id")
{:ok,
%{
"Includes" => %{},
"Results" => [
%{
"@metadata" => %{
"@change-vector" => "A:6450-HJrwf2z3c0G/FHJPm3zK3w",
"@id" => "f13ffb17-ed7d-43b6-a483-23993db70958",
"@last-modified" => "2022-04-23T11:14:16.4277047Z"
},
"cat_name" => "Coco",
"id" => "f13ffb17-ed7d-43b6-a483-23993db70958"
}
],
"already_loaded_ids" => []
}}
"""
@spec load(binary(), list() | bitstring(), any, keyword() | nil) :: any
def load(session_id, ids, includes \\ nil, opts \\ nil)
def load(_session_id, nil, _includes, _opts), do: {:error, :document_ids_not_informed}
def load(session_id, ids, includes, opts) when is_list(ids) do
session_id
|> session_id()
|> GenServer.call({:load, [document_ids: ids, includes: includes, opts: opts]})
end
def load(session_id, id, includes, opts) do
session_id
|> session_id()
|> GenServer.call({:load, [document_ids: [id], includes: includes, opts: opts]})
end
@doc """
Marks the document for deletion
## Parameters
- session_id: the session id
- entity/entity_id: the document to be deleted
## Returns
- `{:ok, Ravix.Documents.Session.State}`
- `{:error, cause}`
"""
@spec delete(binary, map() | binary()) :: any
def delete(session_id, entity) when is_map_key(entity, :id) do
delete(session_id, entity.id)
end
def delete(session_id, id) when is_binary(id) do
session_id
|> session_id()
|> GenServer.call({:delete, id})
end
@doc """
Add a document to the session to be created
## Parameters
- session_id: the session id
- entity: the document to store
- key: the document key to be used
- change_vector: the concurrency change vector
- opts: [
upsert: If the document is already loaded in the session, it should be upserted
]
## Returns
- `{:ok, Ravix.Documents.Session.State}`
- `{:error, cause}`
"""
@spec store(binary(), map(), binary() | nil, binary() | nil, keyword()) :: any
def store(session_id, entity, key \\ nil, change_vector \\ nil, opts \\ [])
def store(_session_id, entity, _key, _change_vector, _opts) when entity == nil,
do: {:error, :null_entity}
def store(session_id, entity, key, change_vector, opts) do
session_id
|> session_id()
|> GenServer.call(
{:store, [entity: entity, key: key, change_vector: change_vector, opts: opts]}
)
end
@doc """
Persists the session changes to the RavenDB database
Returns a [RavenDB batch response](https://ravendb.net/docs/article-page/5.3/csharp/client-api/rest-api/document-commands/batch-commands#response-format)
## Examples
iex> Session.save_changes(session_id)
{:ok,
%{
"Results" => [
%{
"ChangeVector" => nil,
"Deleted" => true,
"Id" => "3421125e-416a-4bce-bb56-56cb4a7991ae",
"Type" => "DELETE"
}
]
}}
"""
@spec save_changes(binary) :: any
def save_changes(session_id) do
session_id
|> session_id()
|> GenServer.call({:save_changes})
end
@doc """
Fetches the current session state
Returns a `{:ok, Ravix.Documents.Session.State}`
"""
@spec fetch_state(binary()) :: {:error, :session_not_found} | {:ok, SessionState.t()}
def fetch_state(session_id) do
try do
{:ok,
session_id
|> session_id()
|> :sys.get_state()}
catch
:exit, _ -> {:error, :session_not_found}
end
end
@doc """
Executes a query into the RavenDB
## Paremeters
- query: The `Ravix.RQL.Query` to be executed
- session_id: the session_id
- method: The http method
Returns a RavenDB query response
"""
@spec execute_query(any, binary, any) :: any
def execute_query(query, session_id, method) do
session_id
|> session_id()
|> GenServer.call({:execute_query, query, method}, :infinity)
end
@doc """
Executes a query in RavenDB and stream the response
## Paremeters
- query: The `Ravix.RQL.Query` to be executed
- session_id: the session_id
- method: The http method
Returns a enumerable with the "Results" field of the RavenDB Query Response
"""
@spec stream_query(any, binary) :: Enumerable.t()
def stream_query(query, session_id) do
session_id
|> session_id()
|> GenServer.call({:stream_query, query, "GET"})
end
@spec session_id(String.t()) :: {:via, Registry, {:sessions, String.t()}}
defp session_id(id) when id != nil, do: {:via, Registry, {:sessions, id}}
####################
# Handlers #
####################
def handle_call(
{:load, [document_ids: ids, includes: includes, opts: opts]},
_from,
%SessionState{} = state
) do
case SessionManager.load_documents(state, ids, includes, opts) do
{:ok, result} -> {:reply, {:ok, result[:response]}, result[:updated_state]}
err -> {:reply, err, state}
end
end
def handle_call(
{:store, [entity: entity, key: key, change_vector: change_vector, opts: opts]},
_from,
%SessionState{} = state
)
when key != nil do
OK.try do
[entity, updated_state] <-
SessionManager.store_entity(state, entity, key, change_vector, opts)
after
{:reply, {:ok, entity}, updated_state}
rescue
err -> {:reply, {:error, err}, state}
end
end
def handle_call(
{:store, [entity: entity, key: _, change_vector: change_vector, opts: opts]},
_from,
%SessionState{} = state
)
when is_map_key(entity, :id) do
OK.try do
[entity, updated_state] <-
SessionManager.store_entity(state, entity, entity.id, change_vector, opts)
after
{:reply, {:ok, entity}, updated_state}
rescue
err -> {:reply, {:error, err}, state}
end
end
def handle_call(
{:store, [entity: _, key: _, change_vector: _, opts: _]},
_from,
%SessionState{} = state
),
do: {:reply, {:error, :no_valid_id_informed}, state}
def handle_call({:save_changes}, _from, %SessionState{} = state) do
case SessionManager.save_changes(state) do
{:ok, response} -> {:reply, {:ok, response[:result]}, response[:updated_state]}
{:error, err} -> {:reply, {:error, err}, state}
end
end
def handle_call({:delete, id}, _from, %SessionState{} = state) do
case SessionManager.delete_document(state, id) do
{:ok, updated_state} -> {:reply, {:ok, updated_state}, updated_state}
{:error, err} -> {:reply, {:error, err}, state}
end
end
def handle_call({:execute_query, query, method}, from, %SessionState{} = state) do
reference = make_ref()
self_pid = self()
Task.start(fn ->
response = SessionManager.execute_query(state, query, method)
GenServer.cast(self_pid, {:query_processed, reference, response})
end)
{:noreply,
%SessionState{state | running_queries: Map.put(state.running_queries, reference, from)}
|> SessionState.update_last_session_call()}
end
def handle_call({:stream_query, query, method}, _from, %SessionState{} = state) do
{
:reply,
SessionManager.stream_query(state, query, method),
state |> SessionState.update_last_session_call()
}
end
def handle_cast({:query_processed, reference, response}, %SessionState{} = state) do
{from, remaining_queries} = Map.pop(state.running_queries, reference)
GenServer.reply(from, response)
{:noreply,
%SessionState{state | running_queries: remaining_queries}
|> SessionState.update_last_session_call()}
end
def handle_info(:check_session, %SessionState{} = state) do
self_pid = self()
Task.start(fn ->
if Timex.diff(Timex.now(), state.last_session_call, :seconds) >
state.conventions.session_idle_ttl do
Logger.warn(
"[RAVIX] The session #{state.session_id} timed-out because it was inactive for more than #{inspect(state.conventions.session_idle_ttl)} seconds"
)
SessionSupervisor.close_session(state.store, self_pid)
end
end)
{:noreply, state}
end
def handle_continue(:session_ttl_checker, %SessionState{} = state) do
Process.send_after(self(), :check_session, 5000)
{:noreply, state}
end
end