if Code.ensure_loaded?(Statestores.Supervisor) do
defmodule Actors.Actor.StateManager do
@moduledoc """
`StateManager` implements the behaviour that allows an Actor's state to be saved
to persistent storage using database drivers.
"""
require Logger
alias Eigr.Functions.Protocol.Actors.{ActorId, ActorState}
alias Google.Protobuf.Any
alias Statestores.Schemas.Snapshot
alias Statestores.Manager.StateManager, as: StateStoreManager
@doc """
Tells whether `new_state` differs from the state summarized by `old_hash`.

`new_state` is serialized with `Any.encode/1`, hashed with SHA-256 and the
resulting digest is compared with `old_hash`.

Returns `false` when `new_state` is `nil`; otherwise `true` when the digests
differ and `false` when they are equal. Anything raised, thrown or exited
while encoding or hashing is caught and returned as `{:error, reason}`.
"""
@spec is_new?(term(), term()) :: boolean() | {:error, term()}
def is_new?(_old_hash, nil), do: false

def is_new?(old_hash, new_state) do
  # Plain bindings instead of `with`: a bare-variable `<-` clause always
  # matches, so the former `else` branch could never run.
  new_hash = :crypto.hash(:sha256, Any.encode(new_state))

  old_hash != new_hash
catch
  _kind, error ->
    {:error, error}
end
@doc """
Loads the snapshot stored for `actor_id`.

The storage key is derived from `actor_id` with `generate_key/1` and passed to
`StateStoreManager.load/1`.

Returns:

  * `{:ok, actor_state, revision, status, node}` when a `%Snapshot{}` is
    returned by the store. A `nil` revision is reported as `0`.
  * `{:not_found, %{}, 0}` for any other value returned by the store.
  * `{:error, reason}` when anything is raised, thrown or exited meanwhile.
"""
@spec load(ActorId.t()) ::
        {:ok, ActorState.t(), number(), term(), term()}
        | {:not_found, map(), 0}
        | {:error, term()}
def load(%ActorId{} = actor_id) do
  key = generate_key(actor_id)

  case StateStoreManager.load(key) do
    %Snapshot{
      status: status,
      node: node,
      revision: rev,
      tags: tags,
      data_type: type,
      data: data
    } ->
      # Snapshots stored without a revision are treated as revision 0.
      revision = if is_nil(rev), do: 0, else: rev
      actor_state = %ActorState{tags: tags, state: %Any{type_url: type, value: data}}

      {:ok, actor_state, revision, status, node}

    _ ->
      {:not_found, %{}, 0}
  end
catch
  _kind, error ->
    {:error, error}
end
@doc """
Loads the snapshot stored for `actor_id` at the requested `revision`.

The storage key is derived from `actor_id` with `generate_key/1` and passed,
together with `revision`, to `StateStoreManager.load/2`.

Returns:

  * `{:ok, actor_state, revision, status, node}` when a `%Snapshot{}` is
    returned by the store. The revision in the tuple is the one recorded in
    the snapshot (`0` when it is `nil`), not the requested one.
  * `{:not_found, %{}, 0}` for any other value returned by the store.
  * `{:error, reason}` when anything is raised, thrown or exited meanwhile.
"""
@spec load(ActorId.t(), number()) ::
        {:ok, ActorState.t(), number(), term(), term()}
        | {:not_found, map(), 0}
        | {:error, term()}
def load(%ActorId{} = actor_id, revision) do
  key = generate_key(actor_id)

  case StateStoreManager.load(key, revision) do
    %Snapshot{
      status: status,
      node: node,
      revision: rev,
      tags: tags,
      data_type: type,
      data: data
    } ->
      # Named differently from the `revision` argument to avoid shadowing it.
      stored_revision = if is_nil(rev), do: 0, else: rev
      actor_state = %ActorState{tags: tags, state: %Any{type_url: type, value: data}}

      {:ok, actor_state, stored_revision, status, node}

    _ ->
      {:not_found, %{}, 0}
  end
catch
  _kind, error ->
    {:error, error}
end
@doc """
Loads every snapshot the store returns for `actor_id`.

Each `%Snapshot{}` is converted to an
`{actor_state, revision, status, node}` tuple, with a `nil` revision reported
as `0`.

Returns `{:ok, entries}` when at least one snapshot was returned, `:not_found`
when there were none, and `{:error, reason}` when anything is raised, thrown
or exited along the way.
"""
@spec load_all(ActorId.t()) :: {:ok, term()} | :not_found | {:error, term()}
def load_all(%ActorId{} = actor_id) do
  actor_id
  |> generate_key()
  |> StateStoreManager.load_all()
  |> Enum.map(fn %Snapshot{} = snapshot ->
    revision =
      case snapshot.revision do
        nil -> 0
        rev -> rev
      end

    state = %Any{type_url: snapshot.data_type, value: snapshot.data}

    {%ActorState{tags: snapshot.tags, state: state}, revision, snapshot.status, snapshot.node}
  end)
  |> case do
    [] -> :not_found
    entries -> {:ok, entries}
  end
catch
  _kind, reason ->
    {:error, reason}
end
@doc """
Loads the snapshots the store returns for `actor_id` between `time_start` and
`time_end`.

Both bounds are forwarded untouched to `StateStoreManager.load_by_interval/3`.
Each `%Snapshot{}` is converted to an
`{actor_state, revision, status, node}` tuple, with a `nil` revision reported
as `0`.

Returns `{:ok, entries}` when at least one snapshot was returned, `:not_found`
when there were none, and `{:error, reason}` when anything is raised, thrown
or exited along the way.
"""
@spec load_by_interval(ActorId.t(), String.t(), String.t()) ::
        {:ok, term()} | :not_found | {:error, term()}
def load_by_interval(%ActorId{} = actor_id, time_start, time_end) do
  to_entry = fn %Snapshot{} = snapshot ->
    revision =
      case snapshot.revision do
        nil -> 0
        rev -> rev
      end

    actor_state = %ActorState{
      tags: snapshot.tags,
      state: %Any{type_url: snapshot.data_type, value: snapshot.data}
    }

    {actor_state, revision, snapshot.status, snapshot.node}
  end

  found =
    actor_id
    |> generate_key()
    |> StateStoreManager.load_by_interval(time_start, time_end)
    |> Enum.map(to_entry)

  case found do
    [] -> :not_found
    _ -> {:ok, found}
  end
catch
  _kind, reason ->
    {:error, reason}
end
@doc """
Persists the state of the actor identified by `actor_id`.

## Options

  * `:revision` - revision stored with the snapshot (default `0`).
  * `:status` - status stored with the snapshot (default `"ACTIVATED"`).
  * `:node` - node stored with the snapshot (default: the current node name
    as a string).

## Returns

  * `{:ok, nil}` when `state` is `nil`; nothing is written.
  * `{:ok, inner_state}` when the inner state is `nil` or `%{}`; nothing is
    written.
  * `{:ok, inner_state, hash}` when the store answers `{:ok, _}`; `hash` is
    the SHA-256 digest of the encoded inner state.
  * `{:error, changeset, inner_state, hash}` when the store answers
    `{:error, changeset}`.
  * `{:error, other, inner_state}` for any other answer from the store, or
    when anything is raised, thrown or exited while saving.
"""
@spec save(ActorId.t(), ActorState.t() | nil, Keyword.t()) ::
        {:ok, term()}
        | {:ok, term(), binary()}
        | {:error, term(), term()}
        | {:error, term(), term(), binary()}
def save(actor_id, state, opts \\ [])

def save(_actor_id, nil, _opts), do: {:ok, nil}

def save(_actor_id, %ActorState{state: actor_state} = _state, _opts)
    when is_nil(actor_state) or actor_state == %{},
    do: {:ok, actor_state}

def save(
      %ActorId{name: name, system: system} = actor_id,
      %ActorState{tags: tags, state: actor_state} = _state,
      opts
    ) do
  Logger.debug("Saving state for actor #{name}")

  revision = Keyword.get(opts, :revision, 0)
  status = Keyword.get(opts, :status, "ACTIVATED")
  node = Keyword.get(opts, :node, Atom.to_string(Node.self()))

  # The hash is computed before writing so an encoding failure is reported
  # through the `catch` below without touching the store.
  hash = :crypto.hash(:sha256, Any.encode(actor_state))
  key = generate_key(actor_id)

  snapshot = %Snapshot{
    id: key,
    actor: name,
    system: system,
    status: status,
    node: node,
    revision: revision,
    tags: tags,
    data_type: actor_state.type_url,
    data: actor_state.value
  }

  case StateStoreManager.save(snapshot) do
    {:ok, _event} ->
      {:ok, actor_state, hash}

    {:error, changeset} ->
      {:error, changeset, actor_state, hash}

    other ->
      {:error, other, actor_state}
  end
catch
  _kind, error ->
    {:error, error, actor_state}
end
@doc """
Persists the state of the actor identified by `actor_id` from a separate task,
waiting at most `:timeout` milliseconds for it to finish.

## Options

  * `:revision` - revision stored with the snapshot (default `0`).
  * `:timeout` - how long to wait for the task, in milliseconds (default
    `5000`).
  * `:status` - status stored with the snapshot (default `"ACTIVATED"`).
  * `:node` - node stored with the snapshot (default: the current node name
    as a string).

## Returns

  * `{:ok, %{}}` when `state` is `nil`; nothing is written.
  * `{:ok, inner_state}` when the inner state is `nil` or `%{}`; nothing is
    written.
  * `{:ok, inner_state, hash}` when the store answers `{:ok, _}`; `hash` is
    the SHA-256 digest of the encoded inner state.
  * `{:error, changeset, inner_state, hash}` when the store answers
    `{:error, changeset}`.
  * `{:error, other, inner_state}` for any other answer from the store, or
    when waiting for the task fails (for example on timeout); in that case the
    task is killed.
"""
@spec save_async(ActorId.t(), ActorState.t() | nil, Keyword.t()) ::
        {:ok, term()}
        | {:ok, term(), binary()}
        | {:error, term(), term()}
        | {:error, term(), term(), binary()}
def save_async(actor_id, state, opts \\ [])

def save_async(_actor_id, nil, _opts), do: {:ok, %{}}

def save_async(_actor_id, %ActorState{state: actor_state} = _state, _opts)
    when is_nil(actor_state) or actor_state == %{},
    do: {:ok, actor_state}

def save_async(
      %ActorId{name: name, system: system} = actor_id,
      %ActorState{tags: tags, state: actor_state} = _state,
      opts
    ) do
  revision = Keyword.get(opts, :revision, 0)
  timeout = Keyword.get(opts, :timeout, 5000)
  status = Keyword.get(opts, :status, "ACTIVATED")
  node = Keyword.get(opts, :node, Atom.to_string(Node.self()))

  persist_data_task =
    Task.async(fn ->
      Logger.debug("Saving state for actor #{name}")
      key = generate_key(actor_id)

      StateStoreManager.save(%Snapshot{
        id: key,
        actor: name,
        system: system,
        status: status,
        node: node,
        revision: revision,
        tags: tags,
        data_type: actor_state.type_url,
        data: actor_state.value
      })
    end)

  try do
    # `Task.await/2` already hands back whatever the store returned, so the
    # outcome is read from that value. The previous extra `receive` waited
    # (with no timeout) for a `{self(), _}` message that nothing sends: it
    # either blocked forever or, for callers trapping exits, mistook the
    # task's normal `{:EXIT, pid, :normal}` for a failed insert.
    result = Task.await(persist_data_task, timeout)
    discard_task_exit(persist_data_task.pid)

    hash = :crypto.hash(:sha256, Any.encode(actor_state))

    case result do
      {:ok, _event} ->
        {:ok, actor_state, hash}

      {:error, changeset} ->
        {:error, changeset, actor_state, hash}

      other ->
        {:error, other, actor_state}
    end
  catch
    _kind, error ->
      Task.shutdown(persist_data_task, :brutal_kill)
      {:error, error, actor_state}
  end
end

defp generate_key(id), do: :erlang.phash2(id)

# Removes the link to an already-awaited task and drops its `:EXIT` message,
# if one was queued because the caller traps exits. Never blocks.
defp discard_task_exit(pid) do
  Process.unlink(pid)

  receive do
    {:EXIT, ^pid, _reason} -> :ok
  after
    0 -> :ok
  end
end
end
else
defmodule Actors.Actor.StateManager do
  @moduledoc false

  # Fallback compiled when `Statestores.Supervisor` is not available: every
  # function raises with a hint on how to enable persistence.

  @not_loaded_message """
  Statestores not loaded properly
  If you are creating actors with flag `persistent: true` consider adding :spawn_statestores to your deps list
  """

  def is_new?(_old_hash, _new_state), do: raise(@not_loaded_message)

  def load(_actor_id), do: raise(@not_loaded_message)
  def load(_actor_id, _revision), do: raise(@not_loaded_message)

  def load_all(_actor_id), do: raise(@not_loaded_message)

  def load_by_interval(_actor_id, _time_start, _time_end), do: raise(@not_loaded_message)

  def save(_actor_id, _state), do: raise(@not_loaded_message)
  def save(_actor_id, _state, _opts), do: raise(@not_loaded_message)

  # `save_async/2` exists in the real module through its `opts \\ []` default,
  # so it is mirrored here to fail with the same explanatory message instead
  # of an `UndefinedFunctionError`.
  def save_async(_actor_id, _state), do: raise(@not_loaded_message)
  def save_async(_actor_id, _state, _opts), do: raise(@not_loaded_message)
end
end