core/core.ex

# Copyright(c) 2015-2023 ACCESS CO., LTD. All rights reserved.

use Croma

defmodule AntikytheraCore do
  @moduledoc """
  `Application` callback module of antikythera core.

  `start/2` initializes this node (logger, `ERL_LIBS`, files/ETS tables, core config,
  connections to other nodes, `RaftFleet`, the HTTP listener) and then starts
  `AntikytheraCore.Supervisor`.
  """

  use Application
  alias Antikythera.GearActionTimeout
  alias AntikytheraEal.ClusterConfiguration
  require AntikytheraCore.Logger, as: L

  @doc """
  Registers `AntikytheraCore.ErlangLogTranslator.translate` as a translator of Elixir's `Logger`.
  """
  defun add_translator_to_logger() :: :ok do
    Logger.add_translator({AntikytheraCore.ErlangLogTranslator, :translate})
  end

  @doc """
  Callback implementation of `Application.start/2`.

  Interdependencies between initialization steps here are crucial.
  See also `AntikytheraCore.StartupManager` for initializations after construction
  of the supervision tree.
  """
  @impl true
  def start(_type, _args) do
    add_translator_to_logger()
    # In dev or local environment, the log level is initially set to `:notice` at mix_common.exs
    # in order to avoid SASL progress reports.
    # The log level is restored to `:info` after loading antikythera.
    Logger.configure(level: :info)
    L.info("starting AntikytheraCore")
    add_gears_dir_to_erl_libs()
    AntikytheraCore.FileSetup.setup_files_and_ets_tables()
    AntikytheraCore.Config.Core.load()
    # Clustering is skipped when `no_listen?` is true; per the original intent this is
    # (at least) to suppress log messages emitted by `:syn.add_node_to_scopes/1`.
    if not Antikythera.Env.no_listen?() do
      calculate_connection_trial_count_from_health_check_grace_period()
      |> establish_connections_to_other_nodes()

      :syn.add_node_to_scopes([:antikythera])
    end

    L.info("activating RaftFleet")

    activate_raft_fleet(fn ->
      if not Antikythera.Env.no_listen?() do
        start_cowboy_http()
      end

      {:ok, pid} = start_sup()
      # `GearManager` and `StartupManager` must be up and running here
      AntikytheraCore.Config.Gear.load_all(0)
      L.info("started AntikytheraCore")
      {:ok, pid}
    end)
  end

  # Prepends the gears directory to the `ERL_LIBS` environment variable in order to load
  # gear's code appropriately. See also: http://www.erlang.org/doc/man/code.html#lib_dir-1
  #
  # Empty entries are dropped (an unset `ERL_LIBS` would otherwise produce a trailing ":"),
  # and an existing occurrence of the gears directory is removed first so that repeated
  # calls (e.g. restarting the application in the same VM) do not accumulate duplicates.
  defp add_gears_dir_to_erl_libs() do
    gears_dir = AntikytheraCore.Version.Artifact.gears_dir()

    other_dirs =
      (System.get_env("ERL_LIBS") || "")
      |> String.split(":", trim: true)
      |> Enum.reject(&(&1 == gears_dir))

    new_value = Enum.join([gears_dir | other_dirs], ":")
    System.put_env("ERL_LIBS", new_value)
  end

  @connection_retrial_interval_in_milliseconds 5_000

  # Number of connection attempts that roughly fits within the health check grace period,
  # given one attempt per `@connection_retrial_interval_in_milliseconds`; at least 1.
  defunpt calculate_connection_trial_count_from_health_check_grace_period() :: pos_integer do
    connection_retrial_interval_in_seconds = @connection_retrial_interval_in_milliseconds / 1000

    (ClusterConfiguration.health_check_grace_period_in_seconds() /
       connection_retrial_interval_in_seconds)
    |> trunc()
    |> max(1)
  end

  # Tries to connect to other nodes at most `tries_remaining` times, waiting
  # `@connection_retrial_interval_in_milliseconds` between attempts.
  # Returns `:ok` on success; raises if every attempt fails (the node must not start
  # without being connected to the cluster). No sleep is done after the final attempt.
  defp establish_connections_to_other_nodes(tries_remaining) when tries_remaining <= 0 do
    raise "cannot establish connections to other nodes!"
  end

  defp establish_connections_to_other_nodes(tries_remaining) do
    case AntikytheraCore.Cluster.connect_to_other_nodes_on_start() do
      {:ok, true} ->
        :ok

      _last_failure when tries_remaining == 1 ->
        raise "cannot establish connections to other nodes!"

      _otherwise ->
        L.info("failed to establish connections to other nodes; retry afterward")
        :timer.sleep(@connection_retrial_interval_in_milliseconds)
        establish_connections_to_other_nodes(tries_remaining - 1)
    end
  end

  # Activates `RaftFleet` in this host's zone, then runs `f` and returns its result.
  # If `f` raises, throws or exits, the failure is logged (with stacktrace), `RaftFleet`
  # is deactivated on this node and `{:error, {kind, reason}}` is returned.
  defp activate_raft_fleet(f) do
    :ok = RaftFleet.activate(ClusterConfiguration.zone_of_this_host())

    try do
      f.()
    catch
      kind, reason ->
        # Only `{kind, reason}` is propagated to the application controller; log the
        # stacktrace here so that the root cause of the startup failure is not lost.
        L.error(
          "failed to start AntikytheraCore; deactivating RaftFleet: " <>
            Exception.format(kind, reason, __STACKTRACE__)
        )

        # When an error occurred in the core part of `start/2`, try to cleanup this node so that
        # existing consensus groups (especially `RaftFleet.Cluster`) are not disturbed by the failing node.
        RaftFleet.deactivate()
        # wait for a moment in the hope that deactivation succeeds...
        :timer.sleep(10_000)
        {:error, {kind, reason}}
    end
  end

  # Starts the cleartext HTTP listener `:antikythera_http_listener` on
  # `Antikythera.Env.port_to_listen()`; crashes (match error) if cowboy fails to start.
  defp start_cowboy_http() do
    dispatch_rules = AntikytheraCore.Handler.CowboyRouting.compiled_routes([], false)

    ranch_transport_opts = %{
      # limit is imposed on a per-executor pool basis
      max_connections: :infinity,
      socket_opts: [port: Antikythera.Env.port_to_listen()]
    }

    cowboy_proto_opts = %{
      # timeout of a request with no data transfer; must be sufficiently longer than the gear action timeout
      idle_timeout: GearActionTimeout.max() + 20_000,
      # timeout of a connection with no requests; this should be longer than LB's idle timeout
      request_timeout: GearActionTimeout.max() + 60_000,
      env: %{dispatch: dispatch_rules},
      stream_handlers: [:cowboy_compress_h, :cowboy_stream_h]
    }

    {:ok, _} =
      :cowboy.start_clear(:antikythera_http_listener, ranch_transport_opts, cowboy_proto_opts)
  end

  # Starts `AntikytheraCore.Supervisor` (`:one_for_one`) with the core children, plus the
  # periodic log writers outside of the `:prod` runtime environment.
  defp start_sup() do
    children = [
      AntikytheraCore.ErrorCountsAccumulator,
      {AntikytheraCore.Alert.Manager, [:antikythera, AntikytheraCore.Alert.Manager]},
      AntikytheraCore.GearManager,
      AntikytheraCore.ClusterHostsPoller,
      AntikytheraCore.ClusterNodesConnector,
      AntikytheraCore.MnesiaNodesCleaner,
      AntikytheraCore.StartupManager,
      AntikytheraCore.TerminationManager,
      AntikytheraCore.CoreConfigPoller,
      AntikytheraCore.GearConfigPoller,
      AntikytheraCore.VersionUpgradeTaskQueue,
      AntikytheraCore.VersionSynchronizer,
      AntikytheraCore.StaleGearArtifactCleaner,
      {AntikytheraCore.MetricsUploader, [:antikythera, AntikytheraCore.MetricsUploader]},
      {AntikytheraCore.SystemMetricsReporter, [AntikytheraCore.MetricsUploader]},
      AntikytheraCore.ExecutorPool.Sup,
      AntikytheraCore.GearExecutorPoolsManager,
      AntikytheraCore.TenantExecutorPoolsManager,
      AntikytheraCore.TmpdirTracker,
      AntikytheraCore.ExecutorPool.AsyncJobLog.Writer
    ]

    opts = [strategy: :one_for_one, name: AntikytheraCore.Supervisor]
    Supervisor.start_link(children ++ children_for_dev(), opts)
  end

  # Periodic log writers (reductions / messages) run only when the runtime env is not `:prod`.
  # Both use the same module, so distinct child ids are required.
  defp children_for_dev() do
    if Antikythera.Env.runtime_env() == :prod do
      []
    else
      [
        Supervisor.child_spec(
          {
            AntikytheraCore.PeriodicLog.Writer,
            [AntikytheraCore.PeriodicLog.ReductionBuilder, "reduction"]
          },
          id: :periodic_log_writer_1
        ),
        Supervisor.child_spec(
          {
            AntikytheraCore.PeriodicLog.Writer,
            [AntikytheraCore.PeriodicLog.MessageBuilder, "message"]
          },
          id: :periodic_log_writer_2
        )
      ]
    end
  end
end