lib/actors/actor/entity/supervisor.ex

defmodule Actors.Actor.Entity.Supervisor do
  @moduledoc """
  `Entity.Supervisor` is the Supervisor of all Host Actors in the system.
  This in turn is Partitioned using a PartitionSupervisor.
  We use a hash function based on each Actor's child_spec to partition the actors
  so that the entire Supervisor is not overloaded and
  lessening the possibility of cascading failures.
  """
  use DynamicSupervisor
  require Logger

  alias Actors.Actor.Entity.EntityState
  alias Actors.Config.PersistentTermConfig, as: Config
  alias Eigr.Functions.Protocol.Actors.{Actor, ActorSystem}

  @default_number_of_partitions 8
  @shutdown_timeout_ms 330_000

  def child_spec(_opts) do
    {
      PartitionSupervisor,
      child_spec: DynamicSupervisor,
      name: __MODULE__,
      max_restarts: Config.get(:actors_max_restarts),
      max_seconds: Config.get(:actors_max_seconds),
      partitions: get_number_of_partitions()
    }
  end

  def start_link(_) do
    DynamicSupervisor.start_link(
      __MODULE__,
      [
        strategy: :one_for_one
      ],
      name: __MODULE__
    )
  end

  @impl true
  def init(args), do: DynamicSupervisor.init(args)

  @doc """
  Adds a Actor to the dynamic supervisor.
  """
  @spec lookup_or_create_actor(ActorSystem.t(), Actor.t(), any()) :: {:ok, any}
  def lookup_or_create_actor(system, actor, opts \\ [])

  def lookup_or_create_actor(
        actor_system,
        %Actor{} = actor,
        opts
      ) do
    revision = Keyword.get(opts, :revision, 0)

    entity_state = %EntityState{
      system: Map.get(actor_system || %{}, :name),
      actor: actor,
      revision: revision,
      opts: opts
    }

    child_spec = %{
      id: Actors.Actor.Entity,
      start: {Actors.Actor.Entity, :start_link, [entity_state]},
      restart: :transient,
      # wait until for 5 and a half minutes
      shutdown: @shutdown_timeout_ms
    }

    case DynamicSupervisor.start_child(via(child_spec), child_spec) do
      {:error, {:already_started, pid}} ->
        {:ok, pid}

      {:ok, pid} ->
        {:ok, pid}

      {:error, {:name_conflict, {{Actors.Actor.Entity, name}, _f}, _registry, pid}} ->
        Logger.warning("Name conflict on start Actor #{name} from PID #{inspect(pid)}.")

        :ignore
    end
  end

  defp get_key(spec), do: :erlang.phash2(Map.drop(spec, [:id]))

  defp via(spec), do: {:via, PartitionSupervisor, {__MODULE__, get_key(spec)}}

  defp get_number_of_partitions() do
    if System.schedulers_online() > 1 do
      System.schedulers_online()
    else
      @default_number_of_partitions
    end
  end
end