defmodule Actors.Actor.Entity.Invocation do
@moduledoc """
Handles invocation functions for the Actor Entity.
All the public functions here assume they are executing inside a GenServer.
"""
require Logger
require OpenTelemetry.Tracer, as: Tracer
alias Actors.Actor.Entity.{EntityState, Lifecycle}
alias Actors.Exceptions.NotAuthorizedException
alias Actors.Actor.InvocationScheduler
alias Eigr.Functions.Protocol.Actors.{
Actor,
ActorId,
ActorSystem,
ActorState,
Action,
FixedTimerAction
}
alias Eigr.Functions.Protocol.{
ActorInvocation,
ActorInvocationResponse,
Broadcast,
Context,
Forward,
InvocationRequest,
Pipe,
SideEffect,
Workflow,
Noop
}
alias Actors.Actor.Pubsub
import Spawn.Utils.Common, only: [return_and_maybe_hibernate: 1]
@default_actions [
"get",
"Get",
"get_state",
"getState",
"GetState"
]
@default_init_actions [
"init",
"Init",
"setup",
"Setup"
]
@http_host_interface Actors.Actor.Interface.Http
@doc """
Schedules the fixed timer actions of an actor.

`nil` and `[]` are accepted and result in `:ok` without scheduling anything.
For a non-empty list, one `InvocationRequest` is built per `FixedTimerAction`
and the whole batch is handed to `InvocationScheduler.schedule_fixed_invocations/1`
as `{request, scheduled_to, delay}` tuples.

Values thrown while building or scheduling are logged; in that case the
function returns whatever `Logger.error/1` returns.
"""
def handle_timers(nil, _system, _actor), do: :ok
def handle_timers([], _system, _actor), do: :ok

def handle_timers(timers, system, actor) when is_list(timers) do
  actor_system = %ActorSystem{name: system}

  timers
  |> Enum.map(fn %FixedTimerAction{action: %Action{name: action_name}, seconds: delay} ->
    request = %InvocationRequest{
      actor: actor,
      action_name: action_name,
      payload: {:noop, %Noop{}},
      async: true,
      scheduled_to: 0,
      caller: actor.id,
      system: actor_system
    }

    # NOTE(review): the field is called `seconds` but it is added as
    # milliseconds here - confirm the intended unit.
    fire_at = DateTime.add(DateTime.utc_now(), delay, :millisecond)

    {request, fire_at, delay}
  end)
  |> InvocationScheduler.schedule_fixed_invocations()

  :ok
catch
  thrown -> Logger.error("Error on handle timers #{inspect(thrown)}")
end
@doc """
Handles the initialization invocation for an Actor Entity.

## Parameters

- `state` (%EntityState{}): The current state of the Actor Entity.

## Returns

This function never returns an error tuple. It returns one of:

- `{:noreply, state}` passed through `return_and_maybe_hibernate/1` when the
  actor has no registered actions (a warning is logged) or when none of the
  registered actions is an init action (`init`, `Init`, `setup`, `Setup`).
- `{:noreply, new_state}` when the host invocation of the init action succeeds.
- `{:noreply, new_state}` passed through `return_and_maybe_hibernate/1` when the
  host invocation fails. The error reason is discarded.

## Behavior

1. **Action Registration Check:** If the actor has no actions, logs a warning and returns.
2. **Init Action Selection:** Picks the first registered action whose name is one of the init action names.
3. **Interface Invocation:** Invokes that action through the interface configured in the actor opts (`:interface`), sending a `Noop` payload.
4. **Handling Initialization Result:** Keeps the state returned by the interface.

## Example

```elixir
state = %EntityState{
  system: "example_system",
  actor: %Actor{
    id: %ActorId{name: "example_actor"},
    actions: [%Action{name: "init"}, %Action{name: "perform_action"}],
    state: nil
  },
  opts: []
}

# The result is a `:noreply` tuple; `return_and_maybe_hibernate/1` may have
# post-processed it, so match on the first two elements only.
result = Actors.Actor.Entity.Invocation.invoke_init(state)
:noreply = elem(result, 0)
new_state = elem(result, 1)
```
"""
def invoke_init(
      %EntityState{
        system: actor_system,
        actor: %Actor{
          id: %ActorId{name: actor_name, parent: parent} = id,
          state: actor_state,
          actions: actions
        },
        opts: actor_opts
      } = state
    ) do
  if actions == [] do
    Logger.warning("Actor [#{actor_name}] has not registered any Actions")

    return_and_maybe_hibernate({:noreply, state})
  else
    # Enum.find/2 stops at the first init action instead of filtering the whole list.
    case Enum.find(actions, fn cmd -> cmd.name in @default_init_actions end) do
      nil ->
        return_and_maybe_hibernate({:noreply, state})

      init_action ->
        interface = get_interface(actor_opts)

        # The actor may not have any state yet; fall back to empty values.
        current_state = Map.get(actor_state || %{}, :state) || %ActorState{}
        current_tags = Map.get(actor_state || %{}, :tags, %{})

        invocation = %ActorInvocation{
          actor: %ActorId{name: actor_name, system: actor_system, parent: parent},
          action_name: init_action.name,
          payload: {:noop, %Noop{}},
          current_context: %Context{
            metadata: %{},
            caller: id,
            self: %ActorId{name: actor_name, system: actor_system},
            state: current_state,
            tags: current_tags
          },
          caller: id
        }

        case interface.invoke_host(invocation, state, @default_actions) do
          {:ok, _response, new_state} ->
            # NOTE(review): unlike every other branch, this one is not passed
            # through return_and_maybe_hibernate/1 - confirm this is intentional.
            {:noreply, new_state}

          {:error, _reason, new_state} ->
            return_and_maybe_hibernate({:noreply, new_state})
        end
    end
  end
end
@doc """
Handles the invocation of actions on an Actor Entity.

## Parameters

- `{invocation, opts}`: a tuple with the `%InvocationRequest{}` (actor, action
  name, payload, caller) and a keyword list of per-call options. The options
  are merged over the actor's own opts; `:span_ctx` is used as the parent
  tracing context and defaults to `OpenTelemetry.Ctx.new()`.
- `state` (%EntityState{}): the current state of the Actor Entity.

## Returns

- `{:reply, {:ok, response}, new_state}` or `{:noreply, new_state}`, both passed
  through `return_and_maybe_hibernate/1`, when the host invocation succeeds.
- The result of `Lifecycle.checkpoint/2` when the host response asks for a checkpoint.
- `{:reply, {:error, reason}, new_state}` passed through
  `return_and_maybe_hibernate/1` when the host invocation fails.
- `{:reply, {:error, :action_not_found, message}, state, :hibernate}` when the
  action is neither a default action nor registered by the actor or its timers.

## Raises

- `Actors.Exceptions.NotAuthorizedException` when the ACL manager rejects the
  invocation or the actor has no actions and no timer actions at all.

## Behavior

1. **Authorization Check:** Checks the invocation against the ACL policies and requires at least one action or timer.
2. **Span Context Handling:** Opens a server span named after the actor.
3. **Find Request by Action:** Resolves the interface and builds the host request for the action.
4. **Invocation Host Handling:** Delegates to the interface's `invoke_host/3` function.
5. **Handle Response:** Processes the response, including workflow side effects, broadcast and routing.
6. **Checkpoint:** Optionally performs a checkpoint with the current state revision.

## Example

```elixir
invocation = %InvocationRequest{
  actor: %Actor{id: %ActorId{name: "example_actor"}},
  action_name: "perform_action",
  payload: {:data, %{}},
  caller: %ActorId{name: "caller_actor"}
}

opts = [span_ctx: OpenTelemetry.Ctx.new()]

result = Actors.Actor.Entity.Invocation.invoke({invocation, opts}, entity_state)

case elem(result, 0) do
  :reply -> IO.puts("Reply: " <> inspect(elem(result, 1)))
  :noreply -> IO.puts("Invocation successful! No specific reply.")
end
```
"""
def invoke(
      {%InvocationRequest{
         actor: %Actor{id: %ActorId{name: actor_name}},
         action_name: action_name
       } = invocation, opts},
      %EntityState{
        actor: %Actor{state: actor_state, actions: actions, timer_actions: timers},
        opts: actor_opts
      } = state
    ) do
  if is_authorized?(invocation, actions, timers) do
    # Per-call options take precedence over the actor's own options.
    all_opts = Keyword.merge(actor_opts, opts)
    ctx = Keyword.get(opts, :span_ctx, OpenTelemetry.Ctx.new())

    Tracer.with_span ctx, "#{actor_name} invocation handler", kind: :server do
      case find_request_by_action(
             invocation,
             actor_state,
             action_name,
             actions,
             timers,
             all_opts
           ) do
        {true, interface, request} ->
          # handle_invocation/4 opens the "invoke-host" span itself, so no
          # extra span is opened here (it used to be nested twice).
          handle_invocation(interface, request, state, all_opts)

        {false, _} ->
          handle_not_found_action(action_name, actor_name, state)
      end
    end
  else
    raise NotAuthorizedException
  end
end
# Returns true when the ACL manager authorizes the invocation AND the actor
# exposes at least one action or timer action.
#
# The emptiness test compares against `[]` directly instead of concatenating
# both lists and counting the result.
defp is_authorized?(invocation, actions, timers) do
  acl_manager = get_acl_manager()

  acl_manager.get_policies!()
  |> acl_manager.is_authorized?(invocation) and
    (actions != [] or timers != [])
end
# Resolves the interface and host request for `action`.
#
# Returns `{true, interface, request}` when the action is a default action or
# is registered among the actor's actions / timer actions, `{false, nil}`
# otherwise.
defp find_request_by_action(invocation, actor_state, action, actions, timers, actor_opts) do
  timer_actions = Enum.map(timers, fn timer -> timer.action end)

  if member_action?(action, actions ++ timer_actions) do
    {true, get_interface(actor_opts), build_request(invocation, actor_state, actor_opts)}
  else
    {false, nil}
  end
end
# True when `action` is one of the built-in default actions or matches the
# `name` of any entry in `actions`.
defp member_action?(action, actions) do
  action in @default_actions or
    Enum.any?(actions, fn candidate -> candidate.name == action end)
end
# Calls the host through `interface` inside an "invoke-host" span.
# A successful response goes to handle_response/4; a failure becomes an
# `{:error, reason}` reply carrying the state returned by the interface.
defp handle_invocation(interface, request, state, opts) do
  Tracer.with_span "invoke-host" do
    request
    |> interface.invoke_host(state, @default_actions)
    |> case do
      {:ok, host_response, updated_state} ->
        handle_response(request, host_response, updated_state, opts)

      {:error, reason, updated_state} ->
        return_and_maybe_hibernate({:reply, {:error, reason}, updated_state})
    end
  end
end
# Logs a warning for the unknown action and replies with an
# `:action_not_found` error, asking the process to hibernate.
defp handle_not_found_action(action, actor_name, state) do
  message = "Action [#{action}] not found for Actor [#{actor_name}]"

  Logger.warning(message)

  {:reply, {:error, :action_not_found, message}, state, :hibernate}
end
# Builds the `%ActorInvocation{}` sent to the host from an incoming
# `%InvocationRequest{}` and the actor's current state holder.
# Missing metadata becomes an empty map; a missing state holder yields a `nil`
# state and empty tags.
defp build_request(
       %InvocationRequest{
         actor: %Actor{id: %ActorId{} = actor_id},
         metadata: metadata,
         action_name: action,
         payload: payload,
         caller: caller
       },
       actor_state,
       _opts
     ) do
  state_holder = actor_state || %{}

  request_metadata =
    case metadata do
      nil -> %{}
      present -> present
    end

  # TODO: Validate state before invoke
  context = %Context{
    caller: caller,
    self: actor_id,
    state: Map.get(state_holder, :state),
    metadata: request_metadata,
    tags: Map.get(state_holder, :tags, %{})
  }

  %ActorInvocation{
    actor: actor_id,
    action_name: action,
    payload: payload,
    current_context: context,
    caller: caller
  }
end
# Turns the host response into a GenServer return tuple.
# `:noreply` from do_response/4 means the reply is delivered elsewhere; any
# other value is wrapped as `{:ok, value}` and replied. The tuple then goes
# through response_checkpoint/4 with the response's `checkpoint` flag.
defp handle_response(
       request,
       %ActorInvocationResponse{checkpoint: checkpoint} = response,
       %EntityState{revision: revision} = state,
       opts
     ) do
  genserver_return =
    request
    |> do_response(response, state, opts)
    |> case do
      :noreply -> {:noreply, state}
      result -> {:reply, {:ok, result}, state}
    end
    |> return_and_maybe_hibernate()

  response_checkpoint(genserver_return, checkpoint, revision, state)
end
# Without a checkpoint request the prepared return tuple is passed through
# untouched; otherwise the result of Lifecycle.checkpoint/2 is returned.
#
# NOTE(review): when a checkpoint is requested the prepared `response` tuple is
# discarded in favour of whatever Lifecycle.checkpoint/2 returns - confirm the
# caller still receives its reply in that case.
defp response_checkpoint(response, checkpoint, _revision, _state)
     when checkpoint in [nil, false],
     do: response

defp response_checkpoint(_response, _checkpoint, revision, state),
  do: Lifecycle.checkpoint(revision, state)
# Returns the host response as is when it carries no workflow (nil or an empty
# map); otherwise hands it over to do_run_workflow/4.
defp do_response(request, response, state, opts) do
  case response do
    %ActorInvocationResponse{workflow: workflow} when is_nil(workflow) or workflow == %{} ->
      response

    _with_workflow ->
      do_run_workflow(request, response, state, opts)
  end
end
# Runs the workflow attached to a host response inside a "run-workflow" span:
# side effects first, then broadcast, then routing. The value of the routing
# step is the result. Responses without a workflow are returned unchanged.
defp do_run_workflow(
       _request,
       %ActorInvocationResponse{workflow: nil} = response,
       _state,
       _opts
     ),
     do: response

defp do_run_workflow(
       _request,
       %ActorInvocationResponse{workflow: workflow} = response,
       _state,
       _opts
     )
     when workflow == %{},
     do: response

defp do_run_workflow(
       request,
       %ActorInvocationResponse{workflow: %Workflow{} = workflow} = response,
       _state,
       opts
     ) do
  Tracer.with_span "run-workflow" do
    do_side_effects(workflow.effects, opts)
    do_broadcast(request, workflow.broadcast, opts)
    do_handle_routing(request, response, opts)
  end
end
# Applies the routing directive of a workflow.
#
#   * no routing  -> the host response is returned unchanged
#   * `{:pipe, _}`    -> the RESPONSE payload is sent to the target actor
#   * `{:forward, _}` -> the original REQUEST payload is sent to the target actor
#
# For pipe/forward the result is whatever dispatch_routing_to_caller/2 returns:
# the routed invocation result when there is no `:from_pid` in `opts`, or
# `:noreply` when the reply is delivered asynchronously to `:from_pid`.
defp do_handle_routing(
       _request,
       %ActorInvocationResponse{workflow: %Workflow{routing: nil}} = response,
       _opts
     ),
     do: response

defp do_handle_routing(
       %ActorInvocation{actor: %ActorId{name: caller_actor_name, system: system_name}},
       %ActorInvocationResponse{
         payload: payload,
         workflow: %Workflow{routing: {:pipe, %Pipe{actor: actor_name, action_name: cmd}}}
       } = response,
       opts
     ) do
  invocation = %InvocationRequest{
    system: %ActorSystem{name: system_name},
    actor: %Actor{id: %ActorId{name: actor_name, system: system_name}},
    action_name: cmd,
    payload: payload,
    caller: %ActorId{name: caller_actor_name, system: system_name}
  }

  dispatch_routed_invocation(invocation, {"run-pipe-routing", "Pipe"}, response, opts)
end

defp do_handle_routing(
       %ActorInvocation{
         actor: %ActorId{name: caller_actor_name, system: system_name},
         payload: payload
       },
       %ActorInvocationResponse{
         workflow: %Workflow{routing: {:forward, %Forward{actor: actor_name, action_name: cmd}}}
       } = response,
       opts
     ) do
  invocation = %InvocationRequest{
    system: %ActorSystem{name: system_name},
    actor: %Actor{id: %ActorId{name: actor_name, system: system_name}},
    action_name: cmd,
    payload: payload,
    caller: %ActorId{name: caller_actor_name, system: system_name}
  }

  dispatch_routed_invocation(invocation, {"run-forward-routing", "Forward"}, response, opts)
end

# Invokes the routed actor inside its own span and hands the result to
# dispatch_routing_to_caller/2.
#
# If the invocation throws, the error is logged and the ORIGINAL host response
# is returned as `{:ok, fallback_response}`. `fallback_response` is the full
# `%ActorInvocationResponse{}`; previously the `%Workflow{}` was bound to that
# name by mistake and returned instead.
defp dispatch_routed_invocation(
       %InvocationRequest{
         system: %ActorSystem{name: system_name},
         actor: %Actor{id: %ActorId{name: actor_name}}
       } = invocation,
       {span_name, label},
       fallback_response,
       opts
     ) do
  opts
  |> Keyword.get(:from_pid)
  |> dispatch_routing_to_caller(fn ->
    Tracer.with_span span_name do
      try do
        Actors.invoke(invocation, span_ctx: OpenTelemetry.Tracer.current_span_ctx())
      catch
        error ->
          Logger.warning(
            "Error during #{label} request to Actor #{system_name}:#{actor_name}. Error: #{inspect(error)}"
          )

          {:ok, fallback_response}
      end
    end
  end)
end
@doc """
Publishes the broadcast of a workflow, if there is one.

Returns `:ok` without publishing when `broadcast` is `nil` or an empty map.
For a `%Broadcast{}` the payload is published to its channel group through
`Pubsub.publish/3` inside a "run-broadcast" span, and the value of that span
block is returned. `opts` is accepted but not used.
"""
def do_broadcast(_request, broadcast, _opts \\ [])

def do_broadcast(_request, nil, _opts), do: :ok

def do_broadcast(_request, broadcast, _opts) when broadcast == %{}, do: :ok

def do_broadcast(request, %Broadcast{channel_group: channel, payload: payload}, _opts) do
  Tracer.with_span "run-broadcast" do
    Tracer.add_event("publish", [{"channel", channel}])
    Pubsub.publish(channel, payload, request)
  end
end
# Runs `callback` and delivers its result.
# With no `from`, the callback runs inline and its result is returned.
# Otherwise it runs in a spawned process that replies to `from` through
# GenServer.reply/2, and `:noreply` is returned immediately.
defp dispatch_routing_to_caller(nil, callback) when is_function(callback), do: callback.()

defp dispatch_routing_to_caller(from, callback) when is_function(callback) do
  spawn(fn ->
    result = callback.()
    GenServer.reply(from, result)
  end)

  :noreply
end
@doc """
Fires the side effects of a workflow without waiting for them.

Returns `:ok` for an empty list. For a non-empty list, a process is spawned
that invokes every `%SideEffect{}` request through `Actors.invoke/2` using a
`Flow`; the result is the value of the "handle-side-effects" span block, whose
last expression is the `spawn/1` call. `opts` is accepted but not used.

A value thrown by an individual invocation is logged and does not stop the
remaining side effects.
"""
def do_side_effects(effects, opts \\ [])

def do_side_effects([], _opts), do: :ok

def do_side_effects(effects, _opts) when is_list(effects) do
  Tracer.with_span "handle-side-effects" do
    # The current span is process-local, so it has to be captured here: the
    # Flow stages below run in other processes where it is not available.
    span_ctx = Tracer.current_span_ctx()

    spawn(fn ->
      effects
      |> Flow.from_enumerable(min_demand: 1, max_demand: System.schedulers_online())
      |> Flow.map(fn %SideEffect{
                       request:
                         %InvocationRequest{
                           actor: %Actor{id: %ActorId{name: actor_name}},
                           system: %ActorSystem{name: system_name}
                         } = invocation
                     } ->
        try do
          Actors.invoke(invocation, span_ctx: span_ctx)
        catch
          error ->
            Logger.warning(
              "Error during Side Effect request to Actor #{system_name}:#{actor_name}. Error: #{inspect(error)}"
            )

            :ok
        end
      end)
      |> Flow.run()
    end)
  end
end
# Host interface module used for invocations: the `:interface` option when
# given, the HTTP interface otherwise.
defp get_interface(opts) do
  Keyword.get(opts, :interface, @http_host_interface)
end
# ACL manager module, read from the `:spawn` application env at call time
# (key `:acl_manager`), defaulting to the bundled default manager.
defp get_acl_manager do
  Application.get_env(:spawn, :acl_manager, Actors.Security.Acl.DefaultAclManager)
end
end