lib/actors/actor/entity/lifecycle.ex

defmodule Actors.Actor.Entity.Lifecycle do
  @moduledoc """
  Handles lifecycle functions for Actor Entity.

  All the public functions here assume they are executing inside the Actor Entity
  GenServer process: they use `self()`, `Process.info/2` and `Process.send_after/3`
  and return GenServer callback tuples.
  """
  require Logger

  alias Actors.Actor.{Entity.EntityState, Entity.Invocation, StateManager}

  alias Actors.Exceptions.NetworkPartitionException

  alias Eigr.Functions.Protocol.Actors.{
    Actor,
    ActorId,
    ActorDeactivationStrategy,
    ActorSettings,
    ActorState,
    ActorSnapshotStrategy,
    Metadata,
    TimeoutStrategy
  }

  alias Phoenix.PubSub

  alias Sidecar.Measurements

  @deactivated_status "DEACTIVATED"
  @default_deactivate_timeout 10_000
  @default_snapshot_timeout 2_000
  @default_pubsub_group :actor_channel
  @pubsub Application.compile_env(:spawn, :pubsub_group, @default_pubsub_group)
  @min_snapshot_threshold 100
  @timeout_jitter 3000
  @default_split_brain_detector Actors.Node.DefaultSplitBrainDetector

  @doc """
  Initializes an Actor Entity (meant to be called from the GenServer `init/1` callback).

  Traps exits, subscribes to the actor's PubSub channel when its metadata declares one,
  registers the actor's timer actions, updates the node registry entry with this process,
  schedules the first deactivation check and an early snapshot, and stores the split-brain
  detector module (from the `:spawn, :split_brain_detector` config, or the default
  detector) in `opts`.

  Returns `{:ok, state, {:continue, :load_state}}` so state loading is deferred to
  `handle_continue/2`.
  """
  def init(
        %EntityState{
          actor: %Actor{
            id: %ActorId{name: name, parent: parent} = _id,
            metadata: metadata,
            settings:
              %ActorSettings{
                stateful: stateful?,
                snapshot_strategy: snapshot_strategy,
                deactivation_strategy: deactivation_strategy,
                kind: kind
              } = _settings,
            timer_actions: timer_actions
          }
        } = state
      ) do
    # Trap exits so a shutdown signal from the parent goes through `terminate/2`.
    Process.flag(:trap_exit, true)

    # Fall back to the default detector when none is configured: storing `nil` in `opts`
    # would make `load_state/1` call `nil.check_network_partition/2`.
    split_brain_detector_mod =
      Application.get_env(:spawn, :split_brain_detector, @default_split_brain_detector)

    Logger.notice(
      "Activating Actor #{name} with Parent #{parent} in Node #{inspect(Node.self())}. Persistence #{stateful?}."
    )

    # Pooled actors are registered under their parent's name.
    actor_name_key = if kind == :POOLED, do: parent, else: name

    :ok = handle_metadata(name, metadata)
    :ok = Invocation.handle_timers(timer_actions)

    :ok =
      Spawn.Cluster.Node.Registry.update_entry_value(
        Actors.Actor.Entity,
        actor_name_key,
        self(),
        state.actor.id
      )

    # The random jitter offsets the first deactivation check of actors activated together.
    schedule_deactivate(deactivation_strategy, get_jitter())

    extra_opts =
      case maybe_schedule_snapshot_advance(snapshot_strategy) do
        {:ok, timer} -> [timer: timer, split_brain_detector: split_brain_detector_mod]
        _ -> [split_brain_detector: split_brain_detector_mod]
      end

    {:ok, %EntityState{state | opts: Keyword.merge(state.opts, extra_opts)},
     {:continue, :load_state}}
  end

  @doc """
  Loads the persisted state of a stateful actor from the `StateManager`
  (meant to be called from `handle_continue(:load_state, state)`).

  When a stored state is found, the split-brain detector kept in `opts` (or the default
  detector) is asked to check the stored status and node before the state and revision are
  adopted. A detected partition, or any unexpected detector result, is logged and raises
  `NetworkPartitionException`. A missing state or a load error keeps the current state.
  Non-stateful actors skip loading. Otherwise the reply continues with `:call_init_action`.
  """
  def load_state(
        %EntityState{
          actor:
            %Actor{settings: %ActorSettings{stateful: true}, id: %ActorId{name: name} = id} =
              actor,
          opts: opts
        } = state
      ) do
    if is_nil(actor.state) or is_nil(actor.state.state) do
      "Internal state is empty for Actor #{name}. Getting state from state manager."
    else
      # This is currently not in use by any SDK. In other words, nobody starts the Actors via SDK with some initial state.
      "Internal state is not empty for Actor #{name}. Trying to reconcile the state with state manager."
    end
    |> Logger.debug()

    case StateManager.load(id) do
      {:ok, current_state, current_revision, status, node} ->
        # `||` (not a Keyword.get default) so an explicitly stored `nil` also falls back.
        split_brain_detector =
          Keyword.get(opts, :split_brain_detector) || @default_split_brain_detector

        with {:partition_check, {:ok, :continue}} <-
               {:partition_check, split_brain_detector.check_network_partition(status, node)} do
          {:noreply,
           %EntityState{
             state
             | actor: %Actor{actor | state: current_state},
               revisions: current_revision
           }, {:continue, :call_init_action}}
        else
          {:partition_check, {:error, :network_partition_detected}} ->
            Logger.warning(
              "We have detected a possible network partition issue and therefore this actor will not start"
            )

            raise NetworkPartitionException

          error ->
            Logger.warning(
              "We have detected a possible network partition issue and therefore this actor will not start. Details: #{inspect(error)}"
            )

            raise NetworkPartitionException
        end

      {:not_found, %{}, _current_revision} ->
        Logger.debug("Not found state on statestore for Actor #{name}.")
        {:noreply, state, {:continue, :call_init_action}}

      error ->
        Logger.error("Error on load state for Actor #{name}. Error: #{inspect(error)}")
        {:noreply, state, {:continue, :call_init_action}}
    end
  end

  def load_state(state), do: {:noreply, state, {:continue, :call_init_action}}

  @doc """
  Terminates the actor.

  For a stateful actor that holds a state, saves it through `StateManager.save/3` with the
  current revision and status `"DEACTIVATED"`, then logs the termination reason.
  """
  def terminate(reason, %EntityState{
        revisions: revisions,
        actor:
          %Actor{
            id: %ActorId{name: name} = id,
            state: actor_state
          } = actor
      }) do
    if actor_valid?(actor) do
      StateManager.save(id, actor_state, revision: revisions, status: @deactivated_status)
    end

    Logger.debug("Terminating actor #{name} with reason #{inspect(reason)}")
  end

  @doc """
  Handles the periodic `:snapshot` message.

  For a stateful actor with a timeout snapshot strategy, reports the current mailbox length
  via `Measurements.dispatch_actor_inflights/3` and reschedules the next snapshot. When the
  actor has a state, it is saved with `StateManager.save_async/3` only if
  `StateManager.is_new?/2` reports a change against the last saved hash. Any other actor just
  hibernates without rescheduling.
  """
  def snapshot(
        %EntityState{
          system: system,
          actor:
            %Actor{
              id: %ActorId{name: name} = _id,
              state: actor_state,
              settings: %ActorSettings{
                stateful: true,
                snapshot_strategy: %ActorSnapshotStrategy{
                  strategy: {:timeout, %TimeoutStrategy{timeout: _timeout}} = snapshot_strategy
                }
              }
            } = _actor,
          opts: opts
        } = state
      )
      when is_nil(actor_state) or actor_state == %{} do
    {:message_queue_len, size} = Process.info(self(), :message_queue_len)
    Measurements.dispatch_actor_inflights(system, name, size)

    # Nothing to persist yet; only keep the snapshot cycle going.
    {:ok, timer} = schedule_snapshot(snapshot_strategy, opts)

    {:noreply, %EntityState{state | opts: Keyword.merge(opts, timer: timer)}, :hibernate}
  end

  def snapshot(
        %EntityState{
          system: system,
          state_hash: old_hash,
          revisions: revisions,
          actor:
            %Actor{
              id: %ActorId{name: name} = id,
              state: %ActorState{} = actor_state,
              settings: %ActorSettings{
                stateful: true,
                snapshot_strategy: %ActorSnapshotStrategy{
                  strategy: {:timeout, %TimeoutStrategy{timeout: timeout}} = snapshot_strategy
                }
              }
            } = _actor,
          opts: opts
        } = state
      ) do
    {:message_queue_len, size} = Process.info(self(), :message_queue_len)
    Measurements.dispatch_actor_inflights(system, name, size)

    # Persist State only when necessary
    new_state =
      if StateManager.is_new?(old_hash, actor_state.state) do
        Logger.debug("Snapshotting actor #{name}")
        revision = revisions + 1

        # Bound the save by the snapshot interval minus 1 ms to avoid mailbox congestion.
        # A `nil` timeout uses the same default as the snapshot scheduling
        # (`nil - 1` would raise ArithmeticError).
        save_timeout = (timeout || @default_snapshot_timeout) - 1

        case StateManager.save_async(id, actor_state, revision: revision, timeout: save_timeout) do
          {:ok, _, hash} ->
            %{state | state_hash: hash, revisions: revision}

          {:error, _, _, hash} ->
            %{state | state_hash: hash, revisions: revision}

          {:error, :unsuccessfully, hash} ->
            %{state | state_hash: hash, revisions: revision}

          _ ->
            state
        end
      else
        state
      end

    {:ok, timer} = schedule_snapshot(snapshot_strategy, opts)

    {:noreply, %EntityState{new_state | opts: Keyword.merge(opts, timer: timer)}, :hibernate}
  end

  def snapshot(state), do: {:noreply, state, :hibernate}

  @doc """
  Handles the periodic `:deactivate` message.

  Reports the mailbox length via `Measurements.dispatch_actor_inflights/3`. Stops the process
  with `:shutdown` when the mailbox is empty; otherwise schedules another deactivation check
  and hibernates.
  """
  def deactivate(
        %EntityState{
          system: system,
          actor:
            %Actor{
              id: %ActorId{name: name} = _id,
              settings: %ActorSettings{
                deactivation_strategy:
                  %ActorDeactivationStrategy{strategy: deactivation_strategy} =
                    _actor_deactivation_strategy
              }
            } = _actor
        } = state
      ) do
    {:message_queue_len, size} = Process.info(self(), :message_queue_len)
    Measurements.dispatch_actor_inflights(system, name, size)

    if size == 0 do
      Logger.debug("Deactivating actor #{name} for timeout")
      {:stop, :shutdown, state}
    else
      schedule_deactivate(deactivation_strategy)
      {:noreply, state, :hibernate}
    end
  end

  # NOTE(review): when `deactivation_strategy` is nil, `init/1` still schedules a
  # `:deactivate` with the default timeout, but this clause neither stops nor reschedules,
  # so such actors are never deactivated. Confirm whether that is intended.
  def deactivate(state), do: {:noreply, state, :hibernate}

  # Only stateful actors that actually hold a state are persisted on termination.
  defp actor_valid?(%Actor{settings: %ActorSettings{stateful: stateful}, state: actor_state}) do
    stateful && !is_nil(actor_state)
  end

  defp handle_metadata(_actor, metadata) when is_nil(metadata) or metadata == %{} do
    :ok
  end

  defp handle_metadata(actor, %Metadata{channel_group: channel, tags: _tags} = _metadata) do
    :ok = subscribe(actor, channel)
    :ok
  end

  defp subscribe(_actor, channel) when is_nil(channel), do: :ok

  defp subscribe(actor, channel) do
    Logger.debug("Actor [#{actor}] is subscribing to channel [#{channel}]")
    PubSub.subscribe(@pubsub, channel)
  end

  # Timeout private functions

  # Cancels the snapshot timer stored in `opts` (if any) and schedules the next `:snapshot`
  # message after the strategy interval plus the optional `:timeout_factor` from `opts`.
  # Always returns `{:ok, timer_ref}`.
  defp schedule_snapshot(snapshot_strategy, opts) do
    timeout_factor = Keyword.get(opts, :timeout_factor, 0)

    case Keyword.get(opts, :timer) do
      nil -> :ok
      timer -> Process.cancel_timer(timer)
    end

    interval = get_snapshot_interval(snapshot_strategy, timeout_factor)
    {:ok, Process.send_after(self(), :snapshot, interval)}
  end

  # Schedules an early first snapshot (threshold plus jitter) when a snapshot strategy exists.
  defp maybe_schedule_snapshot_advance(%ActorSnapshotStrategy{}) do
    timeout = @min_snapshot_threshold + get_jitter()

    {:ok, Process.send_after(self(), :snapshot, timeout)}
  end

  defp maybe_schedule_snapshot_advance(_), do: :ok

  defp schedule_deactivate(deactivation_strategy, timeout_factor \\ 0) do
    strategy = maybe_get_default_deactivation_strategy(deactivation_strategy)

    Process.send_after(
      self(),
      :deactivate,
      get_deactivate_interval(strategy, timeout_factor)
    )
  end

  # Resolves the `{type, strategy}` tuple used for deactivation. Accepts the tuple itself or a
  # map/struct carrying it under `:strategy`. Anything else falls back to the default timeout
  # strategy, including `nil`, a map without `:strategy`, and a struct whose `:strategy` is
  # `nil`. `Map.get/3` defaults cannot cover the nil-strategy case because struct keys always
  # exist.
  defp maybe_get_default_deactivation_strategy({type, strategy}), do: {type, strategy}

  defp maybe_get_default_deactivation_strategy(%{strategy: {_type, _strategy} = strategy}),
    do: strategy

  defp maybe_get_default_deactivation_strategy(_deactivation_strategy),
    do: {:timeout, %TimeoutStrategy{timeout: @default_deactivate_timeout}}

  defp get_snapshot_interval(
         {:timeout, %TimeoutStrategy{timeout: timeout}} = _timeout_strategy,
         timeout_factor
       ),
       do: (timeout || @default_snapshot_timeout) + timeout_factor

  defp get_deactivate_interval(
         {:timeout, %TimeoutStrategy{timeout: timeout}} = _timeout_strategy,
         timeout_factor
       ),
       do: (timeout || @default_deactivate_timeout) + timeout_factor

  # Random delay in 1..@timeout_jitter milliseconds.
  defp get_jitter(), do: :rand.uniform(@timeout_jitter)
end