if Code.ensure_loaded?(Statestores.Supervisor) do
defmodule Actors.Actor.StateManager do
@behaviour Actors.Actor.StateManager.Behaviour
require Logger
alias Eigr.Functions.Protocol.Actors.ActorState
alias Google.Protobuf.Any
alias Statestores.Schemas.Event
alias Statestores.Manager.StateManager, as: StateStoreManager
@doc """
Tells whether `new_state` differs from the state whose SHA-256 digest is `old_hash`.

Returns `false` when `new_state` is `nil`. Otherwise `new_state` is encoded with
`Google.Protobuf.Any.encode/1`, the encoded bytes are hashed with SHA-256 and the
digest is compared with `old_hash`; the result is `true` when they differ.

Anything raised, thrown or exited while encoding or hashing is caught and returned
as `{:error, reason}` instead of a boolean. That tuple is truthy, so a caller that
only tests the result with `if` treats a failure like a changed state.
"""
@impl true
@spec is_new?(any(), any()) :: boolean() | {:error, any()}
def is_new?(_old_hash, nil), do: false

def is_new?(old_hash, new_state) do
  # Encoding and hashing either return a value or raise, so there is no
  # "non-matching" outcome to handle: failures are dealt with by `catch` below.
  new_hash = :crypto.hash(:sha256, Any.encode(new_state))
  old_hash != new_hash
catch
  _kind, error ->
    {:error, error}
end
@doc """
Loads the persisted state of the actor called `name`.

Asks `Statestores.Manager.StateManager.load/1` for the stored event and returns:

  * `{:ok, actor_state}` - when an `Event` is returned; `actor_state` is an
    `ActorState` built from the event tags, with the event `data_type`/`data`
    wrapped in a `Google.Protobuf.Any` (`type_url`/`value`).
  * `{:not_found, %{}}` - when the store returns anything other than an `Event`.
  * `{:error, reason}` - when something is raised, thrown or exits during the lookup.
"""
@impl true
@spec load(String.t()) :: {:ok, ActorState.t()} | {:not_found, %{}} | {:error, any()}
def load(name) do
  case StateStoreManager.load(name) do
    %Event{tags: tags, data_type: type, data: data} ->
      {:ok, ActorState.new(tags: tags, state: Any.new(type_url: type, value: data))}

    _ ->
      {:not_found, %{}}
  end
catch
  _kind, error ->
    {:error, error}
end
@doc """
Persists the state of the actor called `name` synchronously.

Nothing is written when there is nothing to persist:

  * `{:ok, nil}` - when the given state is `nil`.
  * `{:ok, inner_state}` - when the `state` field of the `ActorState` is `nil` or `%{}`
    (the field value is returned as-is).

Otherwise an `Event` (always with `revision: 0`) is handed to
`Statestores.Manager.StateManager.save/1` and the result is one of:

  * `{:ok, inner_state, hash}` - the store replied `{:ok, event}`.
  * `{:error, reason, inner_state, hash}` - the store replied `{:error, reason}`.
  * `{:error, other, inner_state}` - the store replied with any other term, or
    something was raised, thrown or exited along the way.

`inner_state` is the `state` field of the given `ActorState` and `hash` is the SHA-256
digest of its `Google.Protobuf.Any` encoding.
"""
@impl true
@spec save(String.t(), ActorState.t() | nil) ::
        {:ok, nil | %{}}
        | {:ok, Any.t(), binary()}
        | {:error, any(), Any.t()}
        | {:error, any(), Any.t(), binary()}
def save(_name, nil), do: {:ok, nil}

def save(_name, %ActorState{state: actor_state} = _state)
    when is_nil(actor_state) or actor_state == %{},
    do: {:ok, actor_state}

def save(name, %ActorState{tags: tags, state: actor_state} = _state) do
  Logger.debug("Saving state for actor #{name}")

  # The digest is computed before the write so that an encoding failure
  # prevents the store from being touched at all.
  hash = :crypto.hash(:sha256, Any.encode(actor_state))

  event = %Event{
    actor: name,
    revision: 0,
    tags: tags,
    data_type: actor_state.type_url,
    data: actor_state.value
  }

  case StateStoreManager.save(event) do
    {:ok, _event} ->
      {:ok, actor_state, hash}

    {:error, changeset} ->
      {:error, changeset, actor_state, hash}

    other ->
      {:error, other, actor_state}
  end
catch
  _kind, error ->
    {:error, error, actor_state}
end
@doc """
Persists the state of the actor called `name` in a separate task, waiting at most
`timeout` milliseconds (default `5000`) for it to finish.

Nothing is written when there is nothing to persist:

  * `{:ok, %{}}` - when the given state is `nil`.
  * `{:ok, inner_state}` - when the `state` field of the `ActorState` is `nil` or `%{}`
    (the field value is returned as-is).

Otherwise an `Event` (always with `revision: 0`) is handed to
`Statestores.Manager.StateManager.save/1` inside a `Task`, and the result is one of:

  * `{:ok, inner_state, hash}` - the store replied `{:ok, event}`.
  * `{:error, reason, inner_state, hash}` - the store replied `{:error, reason}`.
  * `{:error, other, inner_state}` - the store replied with any other term, the task
    did not answer within `timeout`, or something was raised, thrown or exited. In
    these failure cases the task is shut down with `:brutal_kill`.

`inner_state` is the `state` field of the given `ActorState` and `hash` is the SHA-256
digest of its `Google.Protobuf.Any` encoding.
"""
@impl true
@spec save_async(String.t(), ActorState.t() | nil) ::
        {:ok, nil | %{}}
        | {:ok, Any.t(), binary()}
        | {:error, any(), Any.t()}
        | {:error, any(), Any.t(), binary()}
@spec save_async(String.t(), ActorState.t() | nil, timeout()) ::
        {:ok, nil | %{}}
        | {:ok, Any.t(), binary()}
        | {:error, any(), Any.t()}
        | {:error, any(), Any.t(), binary()}
def save_async(name, state, timeout \\ 5000)

def save_async(_name, nil, _timeout), do: {:ok, %{}}

def save_async(_name, %ActorState{state: actor_state} = _state, _timeout)
    when is_nil(actor_state) or actor_state == %{},
    do: {:ok, actor_state}

def save_async(name, %ActorState{tags: tags, state: actor_state} = _state, timeout) do
  persist_data_task =
    Task.async(fn ->
      Logger.debug("Saving state for actor #{name}")

      StateStoreManager.save(%Event{
        actor: name,
        revision: 0,
        tags: tags,
        data_type: actor_state.type_url,
        data: actor_state.value
      })
    end)

  try do
    # `Task.await/2` already hands back the store's answer (or exits on timeout /
    # task crash, which lands in `catch`), so the outcome is taken from `res`.
    # Waiting for an extra confirmation message here would block forever, because
    # nothing sends one.
    res = Task.await(persist_data_task, timeout)
    discard_exit_signal(persist_data_task.pid)

    hash = :crypto.hash(:sha256, Any.encode(actor_state))

    case res do
      {:ok, _event} ->
        {:ok, actor_state, hash}

      {:error, changeset} ->
        {:error, changeset, actor_state, hash}

      other ->
        {:error, other, actor_state}
    end
  catch
    _kind, error ->
      Task.shutdown(persist_data_task, :brutal_kill)
      {:error, error, actor_state}
  end
end

# Removes the link to the finished task and drops the `:EXIT` message it may
# already have queued when the caller traps exits, so that no stray message is
# left in the caller's mailbox. Never blocks.
defp discard_exit_signal(pid) do
  Process.unlink(pid)

  receive do
    {:EXIT, ^pid, _reason} -> :ok
  after
    0 -> :ok
  end
end
end
else
defmodule Actors.Actor.StateManager do
  @moduledoc """
  Placeholder state manager compiled when `Statestores.Supervisor` cannot be loaded.

  Every function raises a `RuntimeError` explaining that the statestores
  dependency is missing. The functions mirror the arities exposed by the real
  implementation, including the optional `timeout` of `save_async`, so that any
  call fails with the explanatory message rather than an `UndefinedFunctionError`.
  """

  @behaviour Actors.Actor.StateManager.Behaviour

  @not_loaded_message """
  Statestores not loaded properly
  If you are creating actors with flag `persistent: true` consider adding :spawn_statestores to your deps list
  """

  @impl true
  def is_new?(_old_hash, _new_state), do: raise(@not_loaded_message)

  @impl true
  def load(_key), do: raise(@not_loaded_message)

  @impl true
  def save(_name, _state), do: raise(@not_loaded_message)

  # The default keeps `save_async/2` available, matching the real implementation.
  @impl true
  def save_async(_name, _state, _timeout \\ 5000), do: raise(@not_loaded_message)
end
end