lib/actors/actor/caller_consumer.ex

defmodule Actors.Actor.CallerConsumer do
  @moduledoc """
  An Elixir module representing a GenStage consumer responsible for handling
  events initiated by `CallerProducer` and interacting with actors in the system.
  """
  use GenStage
  use Retry

  require Logger
  require OpenTelemetry.Tracer, as: Tracer

  alias Actors.Actor.CallerProducer
  alias Actors.Config.PersistentTermConfig, as: Config
  alias Actors.Actor.Entity, as: ActorEntity
  alias Actors.Actor.Entity.Supervisor, as: ActorEntitySupervisor
  alias Actors.Actor.InvocationScheduler
  alias Actors.Actor.Pool, as: ActorPool

  alias Actors.Registry.{ActorRegistry, HostActor}

  alias Eigr.Functions.Protocol.Actors.{
    Actor,
    ActorId,
    Metadata,
    ActorSettings,
    ActorSystem,
    Registry
  }

  alias Eigr.Functions.Protocol.{
    InvocationRequest,
    ProxyInfo,
    RegistrationRequest,
    RegistrationResponse,
    RequestStatus,
    ServiceInfo,
    SpawnRequest,
    SpawnResponse
  }

  alias Sidecar.Measurements

  import Spawn.Utils.Common, only: [to_existing_atom_or_new: 1]

  @activate_actors_min_demand 0
  @activate_actors_max_demand 4

  @erpc_timeout 5_000

  @doc """
  Starts the consumer process and subscribes to the `CallerProducer` GenStage.
  """
  def start_link(opts \\ []) do
    id = Keyword.get(opts, :id, 1)
    GenStage.start_link(__MODULE__, opts, name: Module.concat(__MODULE__, "#{id}"))
  end

  @doc """
  Initializes the GenStage consumer.

  It subscribes to the `CallerProducer` GenStage with specified backpressure values.
  """
  @impl true
  def init(opts) do
    {min_demand, max_demand} = get_backpressure_values_allowed(opts)

    {:consumer, :ok,
     subscribe_to: [
       {CallerProducer, min_demand: min_demand, max_demand: max_demand}
     ]}
  end

  defp get_backpressure_values_allowed(opts) do
    index = Keyword.get(opts, :id, 0)
    actual_max_demand = Config.get(:actors_global_backpressure_max_demand)
    actual_min_demand = Config.get(:actors_global_backpressure_min_demand)
    backpressure_options = {actual_min_demand, actual_max_demand}

    Logger.debug(
      "Initialize Actor Event Consumer ID: #{index}. With Backpressure options: #{inspect(backpressure_options)}"
    )

    backpressure_options
  end

  @doc """
  Handles incoming events from the `CallerProducer` GenStage.

  Dispatches events to the appropriate functions for further processing.
  """
  @impl true
  def handle_events(events, _from, state) do
    if length(events) > 1,
      do: Logger.debug("Flushing the Event buffer. Buffer Size: #{inspect(length(events))}")

    Enum.each(events, &dispatch_to_actor/1)

    {:noreply, [], state}
  end

  defp dispatch_to_actor({from, {:register, event, opts}} = _producer_event) do
    reply_to_producer(from, register(event, opts))
  end

  defp dispatch_to_actor({from, {:get_state, event, _opts}} = _producer_event) do
    reply_to_producer(from, get_state(event))
  end

  defp dispatch_to_actor({from, {:spawn_actor, event, opts}} = _producer_event) do
    reply_to_producer(from, spawn_actor(event, opts))
  end

  defp dispatch_to_actor({from, {:invoke, request, opts}} = _producer_event) do
    if request.register_ref != "" and not is_nil(request.register_ref) do
      spawn_req = %SpawnRequest{
        actors: [%ActorId{request.actor.id | parent: request.register_ref}]
      }

      spawn_actor(spawn_req, opts)
    end

    reply_to_producer(from, invoke_with_span(request, opts))
  end

  defp reply_to_producer(:fake_from, response), do: response

  defp reply_to_producer(from, response) do
    GenStage.reply(from, response)
  end

  @doc """
  Registers actors in the system based on the provided registration request.

  Handles registration requests and ensures actors are properly registered.
  """
  def register(
        %RegistrationRequest{
          service_info: %ServiceInfo{} = _service_info,
          actor_system:
            %ActorSystem{name: _name, registry: %Registry{actors: actors} = _registry} =
              actor_system
        } = _registration,
        opts
      ) do
    if Sidecar.GracefulShutdown.running?() do
      actors
      |> Map.values()
      |> Enum.map(fn actor -> ActorPool.create_actor_host_pool(actor, opts) end)
      |> List.flatten()
      |> Enum.filter(&(&1.node == Node.self()))
      |> ActorRegistry.register()
      |> tap(fn _sts -> warmup_actors(actor_system, actors, opts) end)
      |> case do
        :ok ->
          status = %RequestStatus{status: :OK, message: "Accepted"}
          {:ok, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}}

        _ ->
          status = %RequestStatus{
            status: :ERROR,
            message: "Failed to register one or more Actors"
          }

          {:error, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}}
      end
    else
      status = %RequestStatus{
        status: :ERROR,
        message: "You can't register actors when node is stopping"
      }

      {:error, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}}
    end
  end

  defp get_proxy_info() do
    %ProxyInfo{
      protocol_major_version: 1,
      protocol_minor_version: 2,
      proxy_name: "spawn",
      proxy_version: "1.0.0-rc.35"
    }
  end

  @doc """
  Gets the state of the specified actor.

  This function attempts to retrieve the state of the actor identified by the given
  `ActorId`. It uses an exponential backoff strategy for retrying in case of errors
  and logs any failures.

  ## Parameters

  - `id` (%ActorId): The unique identifier of the actor.

  ## Returns

  The state of the actor if successful, otherwise an error is raised.

  ## Retry Strategy

  The function utilizes an exponential backoff strategy with randomized delays and
  a maximum expiry time of 30,000 milliseconds.

  ## Errors

  The function handles errors such as `:error`, `:exit`, `:noproc`, `:erpc`,
  `:noconnection`, and `:timeout`. It also rescues `ErlangError` exceptions and logs
  detailed error messages.

  """
  def get_state(%ActorId{name: actor_name, system: system_name} = id) do
    retry with: exponential_backoff() |> randomize |> expiry(30_000),
          atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout],
          rescue_only: [ErlangError] do
      try do
        do_lookup_action(
          system_name,
          {false, system_name, actor_name, id},
          nil,
          fn actor_ref, _actor_ref_id ->
            ActorEntity.get_state(actor_ref)
          end
        )
      rescue
        e ->
          Logger.error("Failure to make a call to actor #{inspect(actor_name)} #{inspect(e)}")

          reraise e, __STACKTRACE__
      end
    after
      result -> result
    else
      error -> error
    end
  end

  @doc """
  Spawns an actor or a group of actors based on the provided `SpawnRequest`.

  This function is responsible for spawning actors based on the specified `SpawnRequest`.
  It retrieves the hosts associated with the provided actor IDs and registers the actors.
  Additionally, it handles cases where the system is in the process of draining or stopping.

  ## Parameters

  - `spawn` (%SpawnRequest): The request containing information about the actors to spawn.
  - `opts` (Keyword.t): Additional options for spawning the actors. Defaults to an empty keyword list.

  ## Returns

  If successful, it returns `{:ok, %SpawnResponse{status: %RequestStatus{status: :OK, message: "Accepted"}}}`.
  Otherwise, an error is raised.

  ## Actor Spawning Process

  - Retrieves actor hosts based on actor IDs from the `ActorRegistry`.
  - Filters the hosts based on the system's graceful shutdown status.
  - Registers the selected hosts in the `ActorRegistry`.
  - Returns a success response.

  ## Errors

  - Raises an `ArgumentError` if attempting to spawn an unnamed actor that has not been registered before.

  """
  def spawn_actor(spawn, opts \\ [])

  def spawn_actor(%SpawnRequest{actors: actors} = _spawn, opts) do
    hosts =
      Enum.map(actors, fn %ActorId{} = id ->
        case ActorRegistry.get_hosts_by_actor(id, parent: true) do
          {:ok, actor_hosts} ->
            to_spawn_hosts(id, actor_hosts, opts)
            |> then(fn hosts ->
              if Sidecar.GracefulShutdown.get_status() in [:draining, :stopping] do
                Enum.reject(hosts, &(&1.node == Node.self()))
              else
                hosts
              end
            end)

          error ->
            raise ArgumentError,
                  "You are trying to create an actor from an Unamed actor that has never been registered before. ActorId: #{inspect(id)}. Details. #{inspect(error)}"
        end
      end)
      |> List.flatten()
      |> Enum.filter(&(&1.node == Node.self()))

    ActorRegistry.register(hosts)

    status = %RequestStatus{status: :OK, message: "Accepted"}
    {:ok, %SpawnResponse{status: status}}
  end

  @doc """
  Invokes an actor action with distributed tracing using OpenTelemetry.

  This function performs an actor action invocation, incorporating distributed tracing
  with OpenTelemetry. It sets up the tracing context, adds relevant attributes,
  and handles asynchronous and synchronous invocations.

  ## Parameters

  - `request` (%InvocationRequest): The request containing information about the invocation.
  - `opts` (Keyword.t): Additional options for the invocation. Defaults to an empty keyword list.

  ## Returns

  A tuple containing the status and the result of the invocation.
  If the invocation is asynchronous, it returns `{:ok, :async}`.

  ## Tracing Context

  The function sets up the tracing context and adds attributes related to the invocation.
  It uses OpenTelemetry to trace the client invoke with the kind set to `:client`.

  ## Retry Mechanism

  The function incorporates a retry mechanism with backoff, randomization, and timeout
  to handle potential errors during the invocation.

  ## Error Handling

  In case of errors during the invocation, appropriate logging and tracing events are added,
  and the error is re-raised with a stack trace.

  """
  def invoke_with_span(
        %InvocationRequest{
          actor: %Actor{id: %ActorId{name: _name, system: _actor_id_system} = actor_id} = actor,
          system: %ActorSystem{} = system,
          action_name: action_name,
          async: async?,
          metadata: metadata,
          caller: caller,
          pooled: pooled?
        } = request,
        opts
      ) do
    {time, result} =
      :timer.tc(fn ->
        metadata_attributes =
          for {key, value} <- metadata,
              do: {to_existing_atom_or_new(key), value}

        metadata_attributes =
          metadata_attributes ++
            [
              {:async, async?},
              {"from", get_caller(caller)},
              {"target", actor_id.name}
            ]

        {_current, opts} =
          Keyword.get_and_update(opts, :span_ctx, fn span_ctx ->
            maybe_include_span(span_ctx)
          end)

        Tracer.with_span opts[:span_ctx], "client invoke", kind: :client do
          Tracer.set_attributes(metadata_attributes)

          # Instead of using Map.get/3, which performs a lookup twice, we use pattern matching
          timeout =
            case metadata["request-timeout"] do
              nil -> 10_000
              value -> value
            end

          retry_while with: exponential_backoff() |> randomize |> expiry(timeout) do
            try do
              Tracer.add_event("lookup", [{"target", actor.id.name}])

              actor_fqdn =
                if pooled? do
                  case ActorRegistry.get_hosts_by_actor(actor_id) do
                    {:ok, actor_hosts} ->
                      # Here the results are shuffled using Enum.shuffle/1 to introduce randomness.
                      # Then, the first shuffled result is chosen as the random choice.
                      # This approach is more efficient than choosing randomly from a complete list

                      # Shuffle the results to introduce randomness
                      shuffled_actor_hosts = Enum.shuffle(actor_hosts)

                      # Choose the first result (which is now a random result)
                      host = hd(shuffled_actor_hosts)

                      {pooled?, system.name, host.actor.id.parent, actor_id}

                    _ ->
                      fqdn =
                        {pooled?, system.name, "#{actor.id.name}-1",
                         %ActorId{actor_id | name: "#{actor.id.name}-1", parent: actor_id.name}}

                      fqdn
                  end
                else
                  {pooled?, system.name, actor_id.name, actor_id}
                end

              do_lookup_action(system.name, actor_fqdn, system, fn actor_ref, actor_ref_id ->
                %InvocationRequest{
                  actor: %Actor{} = actor
                } = request

                request_params = %InvocationRequest{
                  request
                  | actor: %Actor{actor | id: actor_ref_id}
                }

                if is_nil(request.scheduled_to) || request.scheduled_to == 0 do
                  maybe_invoke_async(async?, actor_ref, request_params, opts)
                else
                  InvocationScheduler.schedule_invoke(request_params)

                  {:ok, :async}
                end
              end)
            rescue
              e ->
                Logger.error(
                  "Failure to make a call to actor #{inspect(actor.id.name)} #{inspect(e)}"
                )

                reraise e, __STACKTRACE__
            end
            |> case do
              result = :error ->
                {:cont, result}

              result = {:error, msg} ->
                {:cont, result}

              result = {:error, :action_not_found, msg} ->
                {:halt, result}

              result ->
                {:halt, result}
            end
          end
        end
      end)

    Measurements.dispatch_invoke_duration(system.name, actor.id.name, action_name, time)
    result
  end

  defp to_spawn_hosts(id, actor_hosts, spawned_opts) do
    Enum.map(actor_hosts, fn %HostActor{
                               node: node,
                               actor: %Actor{} = unamed_actor,
                               opts: opts
                             } = _host ->
      spawned_actor = %Actor{unamed_actor | id: id}

      new_opts =
        if Keyword.has_key?(spawned_opts, :revision) do
          Keyword.put(opts, :revision, Keyword.get(spawned_opts, :revision, 0))
        else
          opts
        end

      %HostActor{node: node, actor: spawned_actor, opts: new_opts}
    end)
  end

  defp maybe_include_span(span_ctx) do
    if is_nil(span_ctx), do: {span_ctx, OpenTelemetry.Ctx.new()}, else: {span_ctx, span_ctx}
  end

  defp get_caller(nil), do: "external"
  defp get_caller(caller), do: caller.name

  @doc """
  Performs the action of looking up or creating an actor based on the given parameters.

  This function is responsible for looking up or creating an actor based on the specified actor
  fully-qualified domain name (FQDN). It incorporates distributed tracing with OpenTelemetry
  to capture relevant events and attributes during the lookup or creation process.

  ## Parameters

  - `system_name` (String): The name of the actor system.
  - `actor_fqdn` (tuple): A tuple representing the fully-qualified domain name (FQDN) of the actor.
  - `system` (%ActorSystem{}): The actor system.
  - `action_fun` (function): The function to be invoked once the actor is looked up or created.
    It receives the actor reference and actor reference ID as parameters.

  ## Tracing Context

  The function sets up a span with the name "actor-lookup" to trace the lookup or creation
  process. It adds relevant attributes, such as the actor FQDN, to the tracing context.

  ## Retry Mechanism

  The function incorporates a retry mechanism with backoff, randomization, and timeout
  to handle potential errors during the lookup or creation process.

  ## Returns

  The result of the `action_fun` function or an error tuple in case of failure.

  ## Error Handling

  In case of errors during the lookup or creation process, appropriate logging and tracing events
  are added, and the error is returned as part of the result tuple.

  """
  def do_lookup_action(
        system_name,
        {pooled, system_name, parent, %ActorId{name: actor_name} = actor_id} = actor_fqdn,
        system,
        action_fun
      ) do
    Tracer.with_span "actor-lookup" do
      Tracer.set_attributes([{:actor_fqdn, actor_fqdn}])

      case Spawn.Cluster.Node.Registry.lookup(Actors.Actor.Entity, parent) do
        [{actor_ref, actor_ref_id}] ->
          Tracer.add_event("actor-status", [{"alive", true}])
          Tracer.set_attributes([{"actor-pid", "#{inspect(actor_ref)}"}])
          Logger.debug("Lookup Actor #{actor_name}. PID: #{inspect(actor_ref)}")

          # Ensures that the name change will not affect the host function call
          if pooled do
            throw("Pooled Actors are not supported yet")
            # action_fun.(actor_ref, %ActorId{actor_ref_id | name: actor_name})
          else
            action_fun.(actor_ref, actor_ref_id)
          end

        _ ->
          Tracer.add_event("actor-status", [{"alive", false}])

          Tracer.with_span "actor-reactivation" do
            Tracer.set_attributes([{:system_name, system_name}])
            Tracer.set_attributes([{:actor_name, actor_name}])

            case ActorRegistry.lookup(actor_id,
                   filter_by_parent: pooled,
                   parent: parent
                 ) do
              {:ok, %HostActor{node: node, actor: actor, opts: opts}} ->
                do_call(
                  system,
                  node,
                  actor,
                  actor_fqdn,
                  action_fun,
                  opts
                )

              {:not_found, _} ->
                Logger.error("Actor #{actor_name} not found on ActorSystem #{system_name}")

                Tracer.add_event("reactivation-failure", [
                  {:cause, "not_found"}
                ])

                {:error, "Actor #{actor_name} not found on ActorSystem #{system_name}"}

              {:erpc, :timeout} ->
                Logger.error(
                  "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}: Node connection timeout"
                )

                Tracer.add_event("reactivation-failure", [
                  {:cause, "timeout"}
                ])

                {:error, "Node connection timeout"}

              {:error, reason} ->
                Logger.error(
                  "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}: #{inspect(reason)}"
                )

                Tracer.add_event("reactivation-failure", [
                  {:cause, "#{inspect(reason)}"}
                ])

                {:error, reason}

              _ ->
                Logger.error("Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}")

                Tracer.add_event("reactivation-failure", [
                  {:cause, "unknown"}
                ])

                {:error, "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}"}
            end
          end
      end
    end
  end

  defp do_call(
         system,
         node,
         actor,
         {pooled, _system_name, _parent, actor_name} = _actor_fqdn,
         action_fun,
         opts
       ) do
    case :erpc.call(
           node,
           __MODULE__,
           :try_reactivate_actor,
           [system, actor, opts],
           @erpc_timeout
         ) do
      {:ok, actor_ref} ->
        Tracer.set_attributes([{"actor-pid", "#{inspect(actor_ref)}"}])

        Tracer.add_event("try-reactivate-actor", [
          {"reactivation-on-node", "#{inspect(node)}"}
        ])

        if pooled,
          # Ensures that the name change will not affect the host function call
          do: action_fun.(actor_ref, %ActorId{actor.id | name: actor_name.name}),
          else: action_fun.(actor_ref, actor.id)

      _ ->
        raise ErlangError
    end
  end

  defp maybe_invoke_async(true, actor_ref, request, opts) do
    ActorEntity.invoke_async(actor_ref, request, opts)
    {:ok, :async}
  end

  defp maybe_invoke_async(false, actor_ref, request, opts) do
    ActorEntity.invoke(actor_ref, request, opts)
  end

  @doc """
  Tries to reactivate an actor.

  Reactivation is attempted by looking up the actor in the registry
  or creating a new actor if not found.
  """
  @spec try_reactivate_actor(ActorSystem.t(), Actor.t(), any()) :: {:ok, any()} | {:error, any()}
  def try_reactivate_actor(system, actor, opts \\ [])

  def try_reactivate_actor(
        %ActorSystem{} = system,
        %Actor{id: %ActorId{name: name} = _id} = actor,
        opts
      ) do
    case ActorEntitySupervisor.lookup_or_create_actor(system, actor, opts) do
      {:ok, actor_ref} ->
        Logger.debug("Actor #{name} reactivated. ActorRef PID: #{inspect(actor_ref)}")
        {:ok, actor_ref}

      reason ->
        Logger.error("Failed to reactivate actor #{name}: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # To lookup all actors
  def try_reactivate_actor(nil, %Actor{id: %ActorId{name: name} = _id} = actor, opts) do
    case ActorEntitySupervisor.lookup_or_create_actor(nil, actor, opts) do
      {:ok, actor_ref} ->
        Logger.debug("Actor #{name} reactivated. ActorRef PID: #{inspect(actor_ref)}")
        {:ok, actor_ref}

      reason ->
        Logger.error("Failed to reactivate actor #{name}: #{inspect(reason)}")
        {:error, reason}
    end
  end

  defp warmup_actors(actor_system, actors, opts) when is_map(actors) do
    spawn(fn ->
      actors
      |> Flow.from_enumerable(
        min_demand: @activate_actors_min_demand,
        max_demand: @activate_actors_max_demand
      )
      |> Flow.filter(&is_selectable?/1)
      |> Flow.map(fn {actor_name, actor} ->
        {time, result} =
          :timer.tc(&lookup_or_create_actor/4, [actor_system, actor_name, actor, opts])

        Logger.info(
          "Actor #{actor_name} Activated on Node #{inspect(Node.self())} in #{inspect(time)}ms"
        )

        result
      end)
      |> Flow.run()
    end)
  end

  @spec lookup_or_create_actor(ActorSystem.t(), String.t(), Actor.t(), any()) ::
          {:ok, pid()} | {:error, String.t()}
  defp lookup_or_create_actor(actor_system, actor_name, actor, opts) do
    case ActorEntitySupervisor.lookup_or_create_actor(actor_system, actor, opts) do
      {:ok, pid} ->
        {:ok, pid}

      _ ->
        Logger.debug("Failed to register Actor #{actor_name}")
        {:error, "Failed to register Actor #{actor_name}"}
    end
  end

  defp is_selectable?(
         {_actor_name,
          %Actor{
            metadata: %Metadata{channel_group: channel_group},
            settings: %ActorSettings{stateful: stateful, kind: kind}
          } = _actor}
       ) do
    cond do
      kind == :POOLED ->
        false

      match?(true, stateful) and kind != :UNAMED ->
        true

      not is_nil(channel_group) and length(channel_group) > 0 ->
        true

      true ->
        false
    end
  end

  defp is_selectable?({_actor_name, %Actor{} = _actor}),
    do: false
end