lib/actors/actor/caller_consumer.ex

defmodule Actors.Actor.CallerConsumer do
  @moduledoc """

  """
  use GenStage
  use Retry

  require Logger
  require OpenTelemetry.Tracer, as: Tracer

  alias Actors.Actor.CallerProducer
  alias Actors.Config.PersistentTermConfig, as: Config
  alias Actors.Actor.Entity, as: ActorEntity
  alias Actors.Actor.Entity.Supervisor, as: ActorEntitySupervisor
  alias Actors.Actor.InvocationScheduler
  alias Actors.Actor.Pool, as: ActorPool

  alias Actors.Registry.{ActorRegistry, HostActor}

  alias Eigr.Functions.Protocol.Actors.{
    Actor,
    ActorId,
    Metadata,
    ActorSettings,
    ActorSystem,
    Registry
  }

  alias Eigr.Functions.Protocol.{
    InvocationRequest,
    ProxyInfo,
    RegistrationRequest,
    RegistrationResponse,
    RequestStatus,
    ServiceInfo,
    SpawnRequest,
    SpawnResponse
  }

  alias Sidecar.Measurements

  import Spawn.Utils.Common, only: [to_existing_atom_or_new: 1]

  @activate_actors_min_demand 0
  @activate_actors_max_demand 4

  @erpc_timeout 5_000

  def start_link(opts \\ []) do
    id = Keyword.get(opts, :id, 1)
    GenStage.start_link(__MODULE__, opts, name: Module.concat(__MODULE__, "#{id}"))
  end

  @impl true
  def init(opts) do
    {min_demand, max_demand} = get_backpressure_values_allowed(opts)

    {:consumer, :ok,
     subscribe_to: [
       {CallerProducer, min_demand: min_demand, max_demand: max_demand}
     ]}
  end

  defp get_backpressure_values_allowed(opts) do
    index = Keyword.get(opts, :id, 0)
    actual_max_demand = Config.get(:actors_global_backpressure_max_demand)
    actual_min_demand = Config.get(:actors_global_backpressure_min_demand)
    backpressure_options = {actual_min_demand, actual_max_demand}

    Logger.debug(
      "Initialize Actor Event Consumer ID: #{index}. With Backpressure options: #{inspect(backpressure_options)}"
    )

    backpressure_options
  end

  @impl true
  def handle_events(events, _from, state) do
    if length(events) > 1,
      do: Logger.debug("Flushing the Event buffer. Buffer Size: #{inspect(length(events))}")

    Enum.each(events, &dispatch_to_actor/1)

    {:noreply, [], state}
  end

  defp dispatch_to_actor({from, {:register, event, opts}} = _producer_event) do
    reply_to_producer(from, register(event, opts))
  end

  defp dispatch_to_actor({from, {:get_state, event, _opts}} = _producer_event) do
    reply_to_producer(from, get_state(event))
  end

  defp dispatch_to_actor({from, {:spawn_actor, event, opts}} = _producer_event) do
    reply_to_producer(from, spawn_actor(event, opts))
  end

  defp dispatch_to_actor({from, {:invoke, request, opts}} = _producer_event) do
    if request.register_ref != "" and not is_nil(request.register_ref) do
      spawn_req = %SpawnRequest{
        actors: [%ActorId{request.actor.id | parent: request.register_ref}]
      }

      spawn_actor(spawn_req, opts)
    end

    reply_to_producer(from, invoke_with_span(request, opts))
  end

  defp reply_to_producer(:fake_from, response), do: response

  defp reply_to_producer(from, response) do
    GenStage.reply(from, response)
  end

  def register(
        %RegistrationRequest{
          service_info: %ServiceInfo{} = _service_info,
          actor_system:
            %ActorSystem{name: _name, registry: %Registry{actors: actors} = _registry} =
              actor_system
        } = _registration,
        opts
      ) do
    if Sidecar.GracefulShutdown.running?() do
      actors
      |> Map.values()
      |> Enum.map(fn actor -> ActorPool.create_actor_host_pool(actor, opts) end)
      |> List.flatten()
      |> Enum.filter(&(&1.node == Node.self()))
      |> ActorRegistry.register()
      |> tap(fn _sts -> warmup_actors(actor_system, actors, opts) end)
      |> case do
        :ok ->
          status = %RequestStatus{status: :OK, message: "Accepted"}
          {:ok, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}}

        _ ->
          status = %RequestStatus{
            status: :ERROR,
            message: "Failed to register one or more Actors"
          }

          {:error, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}}
      end
    else
      status = %RequestStatus{
        status: :ERROR,
        message: "You can't register actors when node is stopping"
      }

      {:error, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}}
    end
  end

  defp get_proxy_info() do
    %ProxyInfo{
      protocol_major_version: 1,
      protocol_minor_version: 2,
      proxy_name: "spawn",
      proxy_version: "1.0.0-rc.33"
    }
  end

  def get_state(%ActorId{name: actor_name, system: system_name} = id) do
    retry with: exponential_backoff() |> randomize |> expiry(30_000),
          atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout],
          rescue_only: [ErlangError] do
      try do
        do_lookup_action(
          system_name,
          {false, system_name, actor_name, id},
          nil,
          fn actor_ref, _actor_ref_id ->
            ActorEntity.get_state(actor_ref)
          end
        )
      rescue
        e ->
          Logger.error("Failure to make a call to actor #{inspect(actor_name)} #{inspect(e)}")

          reraise e, __STACKTRACE__
      end
    after
      result -> result
    else
      error -> error
    end
  end

  def spawn_actor(spawn, opts \\ [])

  def spawn_actor(%SpawnRequest{actors: actors} = _spawn, opts) do
    hosts =
      Enum.map(actors, fn %ActorId{} = id ->
        case ActorRegistry.get_hosts_by_actor(id, parent: true) do
          {:ok, actor_hosts} ->
            to_spawn_hosts(id, actor_hosts, opts)
            |> then(fn hosts ->
              if Sidecar.GracefulShutdown.get_status() in [:draining, :stopping] do
                Enum.reject(hosts, &(&1.node == Node.self()))
              else
                hosts
              end
            end)

          error ->
            raise ArgumentError,
                  "You are trying to create an actor from an Unamed actor that has never been registered before. ActorId: #{inspect(id)}. Details. #{inspect(error)}"
        end
      end)
      |> List.flatten()
      |> Enum.filter(&(&1.node == Node.self()))

    ActorRegistry.register(hosts)

    status = %RequestStatus{status: :OK, message: "Accepted"}
    {:ok, %SpawnResponse{status: status}}
  end

  def invoke_with_span(
        %InvocationRequest{
          actor: %Actor{id: %ActorId{name: _name, system: _actor_id_system} = actor_id} = actor,
          system: %ActorSystem{} = system,
          action_name: action_name,
          async: async?,
          metadata: metadata,
          caller: caller,
          pooled: pooled?
        } = request,
        opts
      ) do
    {time, result} =
      :timer.tc(fn ->
        metadata_attributes =
          for {key, value} <- metadata,
              do: {to_existing_atom_or_new(key), value}

        metadata_attributes =
          metadata_attributes ++
            [
              {:async, async?},
              {"from", get_caller(caller)},
              {"target", actor_id.name}
            ]

        {_current, opts} =
          Keyword.get_and_update(opts, :span_ctx, fn span_ctx ->
            maybe_include_span(span_ctx)
          end)

        Tracer.with_span opts[:span_ctx], "client invoke", kind: :client do
          Tracer.set_attributes(metadata_attributes)

          # Instead of using Map.get/3, which performs a lookup twice, we use pattern matching
          timeout =
            case metadata["request-timeout"] do
              nil -> 10_000
              value -> value
            end

          retry_while with: exponential_backoff() |> randomize |> expiry(timeout) do
            try do
              Tracer.add_event("lookup", [{"target", actor.id.name}])

              actor_fqdn =
                if pooled? do
                  case ActorRegistry.get_hosts_by_actor(actor_id) do
                    {:ok, actor_hosts} ->
                      # Here the results are shuffled using Enum.shuffle/1 to introduce randomness.
                      # Then, the first shuffled result is chosen as the random choice.
                      # This approach is more efficient than choosing randomly from a complete list

                      # Shuffle the results to introduce randomness
                      shuffled_actor_hosts = Enum.shuffle(actor_hosts)

                      # Choose the first result (which is now a random result)
                      host = hd(shuffled_actor_hosts)

                      {pooled?, system.name, host.actor.id.parent, actor_id}

                    _ ->
                      fqdn =
                        {pooled?, system.name, "#{actor.id.name}-1",
                         %ActorId{actor_id | name: "#{actor.id.name}-1", parent: actor_id.name}}

                      fqdn
                  end
                else
                  {pooled?, system.name, actor_id.name, actor_id}
                end

              do_lookup_action(system.name, actor_fqdn, system, fn actor_ref, actor_ref_id ->
                %InvocationRequest{
                  actor: %Actor{} = actor
                } = request

                request_params = %InvocationRequest{
                  request
                  | actor: %Actor{actor | id: actor_ref_id}
                }

                if is_nil(request.scheduled_to) || request.scheduled_to == 0 do
                  maybe_invoke_async(async?, actor_ref, request_params, opts)
                else
                  InvocationScheduler.schedule_invoke(request_params)

                  {:ok, :async}
                end
              end)
            rescue
              e ->
                Logger.error(
                  "Failure to make a call to actor #{inspect(actor.id.name)} #{inspect(e)}"
                )

                reraise e, __STACKTRACE__
            end
            |> case do
              result = :error ->
                {:cont, result}

              result = {:error, msg} ->
                {:cont, result}

              result = {:error, :action_not_found, msg} ->
                {:halt, result}

              result ->
                {:halt, result}
            end
          end
        end
      end)

    Measurements.dispatch_invoke_duration(system.name, actor.id.name, action_name, time)
    result
  end

  defp to_spawn_hosts(id, actor_hosts, spawned_opts) do
    Enum.map(actor_hosts, fn %HostActor{
                               node: node,
                               actor: %Actor{} = unamed_actor,
                               opts: opts
                             } = _host ->
      spawned_actor = %Actor{unamed_actor | id: id}

      new_opts =
        if Keyword.has_key?(spawned_opts, :revision) do
          Keyword.put(opts, :revision, Keyword.get(spawned_opts, :revision, 0))
        else
          opts
        end

      %HostActor{node: node, actor: spawned_actor, opts: new_opts}
    end)
  end

  defp maybe_include_span(span_ctx) do
    if is_nil(span_ctx), do: {span_ctx, OpenTelemetry.Ctx.new()}, else: {span_ctx, span_ctx}
  end

  defp get_caller(nil), do: "external"
  defp get_caller(caller), do: caller.name

  defp do_lookup_action(
         system_name,
         {pooled, system_name, parent, %ActorId{name: actor_name} = actor_id} = actor_fqdn,
         system,
         action_fun
       ) do
    Tracer.with_span "actor-lookup" do
      Tracer.set_attributes([{:actor_fqdn, actor_fqdn}])

      case Spawn.Cluster.Node.Registry.lookup(Actors.Actor.Entity, parent) do
        [{actor_ref, actor_ref_id}] ->
          Tracer.add_event("actor-status", [{"alive", true}])
          Tracer.set_attributes([{"actor-pid", "#{inspect(actor_ref)}"}])
          Logger.debug("Lookup Actor #{actor_name}. PID: #{inspect(actor_ref)}")

          # Ensures that the name change will not affect the host function call
          if pooled do
            throw("Pooled Actors are not supported yet")
            # action_fun.(actor_ref, %ActorId{actor_ref_id | name: actor_name})
          else
            action_fun.(actor_ref, actor_ref_id)
          end

        _ ->
          Tracer.add_event("actor-status", [{"alive", false}])

          Tracer.with_span "actor-reactivation" do
            Tracer.set_attributes([{:system_name, system_name}])
            Tracer.set_attributes([{:actor_name, actor_name}])

            case ActorRegistry.lookup(actor_id,
                   filter_by_parent: pooled,
                   parent: parent
                 ) do
              {:ok, %HostActor{node: node, actor: actor, opts: opts}} ->
                do_call(
                  system,
                  node,
                  actor,
                  actor_fqdn,
                  action_fun,
                  opts
                )

              {:not_found, _} ->
                Logger.error("Actor #{actor_name} not found on ActorSystem #{system_name}")

                Tracer.add_event("reactivation-failure", [
                  {:cause, "not_found"}
                ])

                {:error, "Actor #{actor_name} not found on ActorSystem #{system_name}"}

              {:erpc, :timeout} ->
                Logger.error(
                  "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}: Node connection timeout"
                )

                Tracer.add_event("reactivation-failure", [
                  {:cause, "timeout"}
                ])

                {:error, "Node connection timeout"}

              {:error, reason} ->
                Logger.error(
                  "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}: #{inspect(reason)}"
                )

                Tracer.add_event("reactivation-failure", [
                  {:cause, "#{inspect(reason)}"}
                ])

                {:error, reason}

              _ ->
                Logger.error("Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}")

                Tracer.add_event("reactivation-failure", [
                  {:cause, "unknown"}
                ])

                {:error, "Failed to invoke Actor #{actor_name} on ActorSystem #{system_name}"}
            end
          end
      end
    end
  end

  defp do_call(
         system,
         node,
         actor,
         {pooled, _system_name, _parent, actor_name} = _actor_fqdn,
         action_fun,
         opts
       ) do
    case :erpc.call(
           node,
           __MODULE__,
           :try_reactivate_actor,
           [system, actor, opts],
           @erpc_timeout
         ) do
      {:ok, actor_ref} ->
        Tracer.set_attributes([{"actor-pid", "#{inspect(actor_ref)}"}])

        Tracer.add_event("try-reactivate-actor", [
          {"reactivation-on-node", "#{inspect(node)}"}
        ])

        if pooled,
          # Ensures that the name change will not affect the host function call
          do: action_fun.(actor_ref, %ActorId{actor.id | name: actor_name.name}),
          else: action_fun.(actor_ref, actor.id)

      _ ->
        raise ErlangError
    end
  end

  defp maybe_invoke_async(true, actor_ref, request, opts) do
    ActorEntity.invoke_async(actor_ref, request, opts)
    {:ok, :async}
  end

  defp maybe_invoke_async(false, actor_ref, request, opts) do
    ActorEntity.invoke(actor_ref, request, opts)
  end

  @spec try_reactivate_actor(ActorSystem.t(), Actor.t(), any()) :: {:ok, any()} | {:error, any()}
  def try_reactivate_actor(system, actor, opts \\ [])

  def try_reactivate_actor(
        %ActorSystem{} = system,
        %Actor{id: %ActorId{name: name} = _id} = actor,
        opts
      ) do
    case ActorEntitySupervisor.lookup_or_create_actor(system, actor, opts) do
      {:ok, actor_ref} ->
        Logger.debug("Actor #{name} reactivated. ActorRef PID: #{inspect(actor_ref)}")
        {:ok, actor_ref}

      reason ->
        Logger.error("Failed to reactivate actor #{name}: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # To lookup all actors
  def try_reactivate_actor(nil, %Actor{id: %ActorId{name: name} = _id} = actor, opts) do
    case ActorEntitySupervisor.lookup_or_create_actor(nil, actor, opts) do
      {:ok, actor_ref} ->
        Logger.debug("Actor #{name} reactivated. ActorRef PID: #{inspect(actor_ref)}")
        {:ok, actor_ref}

      reason ->
        Logger.error("Failed to reactivate actor #{name}: #{inspect(reason)}")
        {:error, reason}
    end
  end

  defp warmup_actors(actor_system, actors, opts) when is_map(actors) do
    spawn(fn ->
      actors
      |> Flow.from_enumerable(
        min_demand: @activate_actors_min_demand,
        max_demand: @activate_actors_max_demand
      )
      |> Flow.filter(&is_selectable?/1)
      |> Flow.map(fn {actor_name, actor} ->
        {time, result} =
          :timer.tc(&lookup_or_create_actor/4, [actor_system, actor_name, actor, opts])

        Logger.info(
          "Actor #{actor_name} Activated on Node #{inspect(Node.self())} in #{inspect(time)}ms"
        )

        result
      end)
      |> Flow.run()
    end)
  end

  @spec lookup_or_create_actor(ActorSystem.t(), String.t(), Actor.t(), any()) ::
          {:ok, pid()} | {:error, String.t()}
  defp lookup_or_create_actor(actor_system, actor_name, actor, opts) do
    case ActorEntitySupervisor.lookup_or_create_actor(actor_system, actor, opts) do
      {:ok, pid} ->
        {:ok, pid}

      _ ->
        Logger.debug("Failed to register Actor #{actor_name}")
        {:error, "Failed to register Actor #{actor_name}"}
    end
  end

  defp is_selectable?(
         {_actor_name,
          %Actor{
            metadata: %Metadata{channel_group: channel_group},
            settings: %ActorSettings{stateful: stateful, kind: kind}
          } = _actor}
       ) do
    cond do
      kind == :POOLED ->
        false

      match?(true, stateful) and kind != :UNAMED ->
        true

      not is_nil(channel_group) and length(channel_group) > 0 ->
        true

      true ->
        false
    end
  end

  defp is_selectable?({_actor_name, %Actor{} = _actor}),
    do: false
end