# lib/actors.ex

defmodule Actors do
  @moduledoc """
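  Entry points for registering actor systems, spawning actors, reading actor
  state and invoking actors, either locally or on the node that hosts them.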
  """
  require Logger
  use Retry

  alias Actors.Actor.Entity, as: ActorEntity
  alias Actors.Actor.Entity.Supervisor, as: ActorEntitySupervisor

  alias Actors.Registry.{ActorRegistry, HostActor}

  alias Eigr.Functions.Protocol.Actors.{Actor, ActorId, ActorSettings, ActorSystem, Registry}

  alias Eigr.Functions.Protocol.{
    InvocationRequest,
    ProxyInfo,
    RegistrationRequest,
    RegistrationResponse,
    RequestStatus,
    ServiceInfo,
    SpawnRequest,
    SpawnResponse
  }

  # Demand bounds handed to `Flow.from_enumerable/2` when `create_actors/3`
  # activates the actors of a newly registered system.
  @activate_actors_min_demand 0
  @activate_actors_max_demand 4

  # Timeout passed to `:erpc.call/5` when asking the hosting node to reactivate
  # an actor (milliseconds, per the `:erpc` API).
  @erpc_timeout 5_000

  @doc """
  Fetches the current state of the actor `actor_name` in system `system_name`.

  The actor is resolved through `do_lookup_action/4` (with no `ActorSystem`
  struct, i.e. `nil`) and `ActorEntity.get_state/1` is applied to the resolved
  reference. Lookup failures are returned as `{:error, reason}`.
  """
  @spec get_state(String.t(), String.t()) :: {:ok, term()} | {:error, term()}
  def get_state(system_name, actor_name) do
    do_lookup_action(system_name, actor_name, nil, &ActorEntity.get_state/1)
  end

  @doc """
  Registers every actor of the request's actor system as hosted on this node.

  Each actor in the system's registry is wrapped in a `HostActor` tagged with
  `Node.self()` and `opts`, and the list is handed to `ActorRegistry.register/1`
  (which must return `:ok`). Activation of the actors is then started in a
  separate process via `create_actors/3`, and an "Accepted" response carrying
  the proxy information is returned.
  """
  @spec register(RegistrationRequest.t(), any()) :: {:ok, RegistrationResponse.t()}
  def register(registration, opts \\ [])

  def register(
        %RegistrationRequest{
          service_info: %ServiceInfo{},
          actor_system: %ActorSystem{registry: %Registry{actors: actors}} = actor_system
        },
        opts
      ) do
    current_node = Node.self()

    host_actors =
      for actor <- Map.values(actors) do
        %HostActor{node: current_node, actor: actor, opts: opts}
      end

    :ok = ActorRegistry.register(host_actors)

    # Activation runs in its own (unlinked) process; the response below is
    # returned without waiting for it.
    spawn(fn -> create_actors(actor_system, actors, opts) end)

    response =
      RegistrationResponse.new(
        proxy_info:
          ProxyInfo.new(
            protocol_major_version: 1,
            protocol_minor_version: 2,
            proxy_name: "spawn",
            proxy_version: "0.1.0"
          ),
        status: RequestStatus.new(status: :OK, message: "Accepted")
      )

    {:ok, response}
  end

  @doc """
  Registers the actors carried by a `SpawnRequest` as hosted on this node.

  Every actor in the request's registry is wrapped in a `HostActor` tagged with
  `Node.self()` and `opts` and passed to `ActorRegistry.register/1` (which must
  return `:ok`). No actor process is started here; an "Accepted" response is
  returned.
  """
  @spec spawn_actor(SpawnRequest.t(), any()) :: {:ok, SpawnResponse.t()}
  def spawn_actor(registration, opts \\ [])

  def spawn_actor(
        %SpawnRequest{actor_system: %ActorSystem{registry: %Registry{actors: actors}}},
        opts
      ) do
    current_node = Node.self()

    host_actors =
      for actor <- Map.values(actors) do
        %HostActor{node: current_node, actor: actor, opts: opts}
      end

    :ok = ActorRegistry.register(host_actors)

    accepted = RequestStatus.new(status: :OK, message: "Accepted")
    {:ok, SpawnResponse.new(status: accepted)}
  end

  @doc """
  Invokes the actor named in `request`.

  The actor is resolved (and reactivated on its host node when it is not
  currently registered) through `do_lookup_action/4`. When the request's
  `async` flag is `true` the call is dispatched with
  `ActorEntity.invoke_async/3` and `{:ok, :async}` is returned; when it is
  `false` the result of `ActorEntity.invoke/3` is returned.

  The whole lookup-and-invoke step is wrapped in `retry`: results tagged with
  one of the listed atoms, and raised `ErlangError`s, are retried with a
  randomized exponential backoff bounded by `expiry(10_000)` (milliseconds, per
  the Retry library's delay streams). The last result is returned once the
  retries are exhausted.
  """
  @spec invoke(InvocationRequest.t(), any()) ::
          {:ok, :async} | {:ok, term()} | {:error, term()}
  def invoke(
        %InvocationRequest{
          actor: %Actor{} = actor,
          system: %ActorSystem{} = system,
          async: async?
        } = request,
        opts \\ []
      ) do
    retry with: exponential_backoff() |> randomize |> expiry(10_000),
          atoms: [:error, :exit, :noproc, :erpc, :noconnection],
          rescue_only: [ErlangError] do
      do_lookup_action(system.name, actor.id.name, system, fn actor_ref ->
        maybe_invoke_async(async?, actor_ref, request, opts)
      end)
    after
      result -> result
    else
      error -> error
    end
  end

  # Resolves `actor_name` to an actor reference and applies `action_fun` to it.
  #
  # The cluster registry is consulted first; on a single hit the action runs
  # against the registered reference. Otherwise the hosting node is looked up in
  # `ActorRegistry` and asked, through `:erpc.call/5`, to reactivate the actor
  # via `try_reactivate_actor/3` before the action runs.
  #
  # Returns whatever `action_fun` returns, or `{:error, reason}` when the actor
  # is unknown or could not be reactivated.
  defp do_lookup_action(system_name, actor_name, system, action_fun) do
    case Spawn.Cluster.Node.Registry.lookup(ActorEntity, actor_name) do
      [{actor_ref, _}] ->
        Logger.debug("Lookup Actor #{actor_name}. PID: #{inspect(actor_ref)}")

        action_fun.(actor_ref)

      _ ->
        # Both steps use `<-` so that a failed reactivation (anything other than
        # `{:ok, ref}`) is routed to the `else` clauses instead of raising a
        # `MatchError`.
        with {:ok, %HostActor{node: node, actor: actor, opts: opts}} <-
               ActorRegistry.lookup(system_name, actor_name),
             {:ok, actor_ref} <-
               :erpc.call(
                 node,
                 __MODULE__,
                 :try_reactivate_actor,
                 [system, actor, opts],
                 @erpc_timeout
               ) do
          action_fun.(actor_ref)
        else
          {:not_found, _} ->
            Logger.error("Actor #{actor_name} not found on ActorSystem #{system_name}")
            {:error, "Actor #{actor_name} not found on ActorSystem #{system_name}"}

          # NOTE(review): `:erpc.call/5` reports a timeout by raising (an
          # `ErlangError` carrying `{:erpc, :timeout}`), not by returning this
          # tuple, so this clause is only reached if a step returns it as a
          # value. `invoke/2` handles the raised case through `retry`.
          {:erpc, :timeout} ->
            Logger.error(
              "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}: Node connection timeout"
            )

            {:error, "Node connection timeout"}

          {:error, reason} ->
            Logger.error(
              "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}: #{inspect(reason)}"
            )

            {:error, reason}

          _ ->
            Logger.error("Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}")
            {:error, "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}"}
        end
    end
  end

  # Dispatches `request` to the actor behind `actor_ref`.
  #
  # Asynchronous requests go through `ActorEntity.invoke_async/3`; its return
  # value is discarded and `{:ok, :async}` is returned instead.
  defp maybe_invoke_async(true, actor_ref, request, opts) do
    _ = ActorEntity.invoke_async(actor_ref, request, opts)
    {:ok, :async}
  end

  # Synchronous requests return the result of `ActorEntity.invoke/3` unchanged.
  defp maybe_invoke_async(false, actor_ref, request, opts),
    do: ActorEntity.invoke(actor_ref, request, opts)

  @doc """
  Looks up, or starts, the process for `actor` on the current node.

  Delegates to `ActorEntitySupervisor.lookup_or_create_actor/3`. `system` may be
  an `ActorSystem` or `nil`; `get_state/2` resolves actors without a system
  struct, so `nil` reaches this function through `do_lookup_action/4`.

  Returns `{:ok, actor_ref}` on success. An `{:error, reason}` result from the
  supervisor is returned as is; any other result is returned as
  `{:error, result}`.
  """
  @spec try_reactivate_actor(ActorSystem.t() | nil, Actor.t(), any()) ::
          {:ok, any()} | {:error, any()}
  def try_reactivate_actor(system, actor, opts \\ [])

  def try_reactivate_actor(
        %ActorSystem{} = system,
        %Actor{id: %ActorId{}} = actor,
        opts
      ),
      do: reactivate_actor(system, actor, opts)

  # No ActorSystem available (see `get_state/2`, which passes `nil`).
  def try_reactivate_actor(nil, %Actor{id: %ActorId{}} = actor, opts),
    do: reactivate_actor(nil, actor, opts)

  # Shared implementation for both public clauses: asks the entity supervisor
  # for the actor and normalises the result to `{:ok, ref} | {:error, reason}`.
  defp reactivate_actor(system, %Actor{id: %ActorId{name: name}} = actor, opts) do
    case ActorEntitySupervisor.lookup_or_create_actor(system, actor, opts) do
      {:ok, actor_ref} ->
        Logger.debug("Actor #{name} reactivated. ActorRef PID: #{inspect(actor_ref)}")
        {:ok, actor_ref}

      # Already an error tuple: pass it through rather than nesting it inside
      # another `{:error, _}`.
      {:error, reason} ->
        Logger.error("Failed to reactivate actor #{name}: #{inspect(reason)}")
        {:error, reason}

      other ->
        Logger.error("Failed to reactivate actor #{name}: #{inspect(other)}")
        {:error, other}
    end
  end

  # Activates the actors of a registered system on this node.
  #
  # `actors` is a map of actor name => `Actor`. Only actors whose settings have
  # `persistent` set to `true` and `abstract` set to `false` are activated; each
  # one is started through `lookup_actor/4` inside a `Flow` bounded by the
  # module's min/max demand attributes. The flow is run for its side effects.
  defp create_actors(actor_system, actors, opts) when is_map(actors) do
    actors
    |> Flow.from_enumerable(
      min_demand: @activate_actors_min_demand,
      max_demand: @activate_actors_max_demand
    )
    |> Flow.filter(fn {_actor_name,
                       %Actor{
                         settings: %ActorSettings{persistent: persistent, abstract: abstract}
                       }} ->
      persistent === true and abstract === false
    end)
    |> Flow.map(fn {actor_name, actor} ->
      Logger.debug("Registering #{actor_name} #{inspect(actor)} on Node: #{inspect(Node.self())}")

      # `:timer.tc/2` reports elapsed time in microseconds; convert before
      # logging because the message is labelled in milliseconds.
      {elapsed_us, result} = :timer.tc(&lookup_actor/4, [actor_system, actor_name, actor, opts])
      elapsed_ms = Float.round(elapsed_us / 1_000, 3)

      # Only announce success when activation actually succeeded; failures are
      # already logged by `lookup_actor/4`.
      if match?({:ok, _}, result) do
        Logger.info(
          "Registered and Activated the #{actor_name} on Node #{inspect(Node.self())} in #{inspect(elapsed_ms)}ms"
        )
      end

      result
    end)
    |> Flow.run()
  end

  # Looks up or starts `actor` through the entity supervisor.
  #
  # Returns `{:ok, pid}` on success; any other supervisor result is logged at
  # debug level and reported as `{:error, message}`.
  @spec lookup_actor(ActorSystem.t(), String.t(), Actor.t(), any()) ::
          {:ok, pid()} | {:error, String.t()}
  defp lookup_actor(actor_system, actor_name, actor, opts) do
    outcome = ActorEntitySupervisor.lookup_or_create_actor(actor_system, actor, opts)

    case outcome do
      {:ok, _pid} = started ->
        started

      _other ->
        message = "Failed to register Actor #{actor_name}"
        Logger.debug(message)
        {:error, message}
    end
  end
end