lib/oban/config.ex

defmodule Oban.Config do
  @moduledoc """
  The Config struct validates and encapsulates Oban instance state.

  Typically, you won't use the Config module directly. Oban automatically creates a Config struct
  on initialization and passes it through to all supervised children with the `:conf` key.

  To fetch a running Oban supervisor's config, see `Oban.config/1`.
  """

  alias Oban.Validation

  @type t :: %__MODULE__{
          dispatch_cooldown: pos_integer(),
          engine: module(),
          get_dynamic_repo: nil | (-> pid() | atom()),
          insert_trigger: boolean(),
          log: false | Logger.level(),
          name: Oban.name(),
          node: String.t(),
          notifier: {module(), Keyword.t()},
          peer: {module(), Keyword.t()},
          plugins: [module() | {module() | Keyword.t()}],
          prefix: false | String.t(),
          queues: Keyword.t(Keyword.t()),
          repo: module(),
          shutdown_grace_period: non_neg_integer(),
          stage_interval: timeout(),
          testing: :disabled | :inline | :manual
        }

  defstruct dispatch_cooldown: 5,
            engine: Oban.Engines.Basic,
            get_dynamic_repo: nil,
            insert_trigger: true,
            log: false,
            name: Oban,
            node: nil,
            notifier: {Oban.Notifiers.Postgres, []},
            peer: {Oban.Peers.Postgres, []},
            plugins: [],
            prefix: "public",
            queues: [],
            repo: nil,
            shutdown_grace_period: :timer.seconds(15),
            stage_interval: :timer.seconds(1),
            testing: :disabled

  @cron_keys ~w(crontab timezone)a
  @log_levels ~w(false emergency alert critical error warning warn notice info debug)a
  @renamed [{:engine, Oban.Queue.BasicEngine}, {:notifier, {Oban.PostgresNotifier, []}}]
  @testing_modes ~w(manual inline disabled)a

  @doc """
  Generate a Config struct after normalizing and verifying Oban options.

  See `Oban.start_link/1` for a comprehensive description of available options.

  ## Example

  Generate a minimal config with only a `:repo`:

      Oban.Config.new(repo: Oban.Test.Repo)
  """
  @spec new([Oban.option()]) :: t()
  def new(opts) when is_list(opts) do
    opts = normalize(opts)

    opts =
      if opts[:engine] == Oban.Engines.Lite do
        opts
        |> Keyword.put(:prefix, false)
        |> Keyword.put_new(:notifier, {Oban.Notifiers.PG, []})
        |> Keyword.put_new(:peer, {Oban.Peers.Isolated, []})
      else
        opts
      end

    opts =
      if opts[:testing] in [:manual, :inline] do
        opts
        |> Keyword.put(:peer, {Oban.Peers.Isolated, [leader?: false]})
        |> Keyword.put(:plugins, [])
        |> Keyword.put(:queues, [])
        |> Keyword.put(:stage_interval, :infinity)
      else
        opts
      end

    with {:error, reason} <- validate(opts) do
      raise ArgumentError, reason
    end

    struct!(__MODULE__, opts)
  end

  @doc """
  Verify configuration options.

  This helper is used by `new/1`, and therefore by `Oban.start_link/1`, to verify configuration
  options when an Oban supervisor starts. It is provided publicly to aid in configuration testing,
  as `test` config may differ from `prod` config.

  # Example

  Validating top level options:

      iex> Oban.Config.validate(name: Oban)
      :ok

      iex> Oban.Config.validate(name: Oban, log: false)
      :ok

      iex> Oban.Config.validate(node: {:not, :binary})
      {:error, "expected :node to be a binary, got: {:not, :binary}"}

      iex> Oban.Config.validate(plugins: true)
      {:error, "invalid value for :plugins, expected :plugins to be a list, got: true"}

  Validating plugin options:

      iex> Oban.Config.validate(plugins: [{Oban.Plugins.Pruner, max_age: 60}])
      :ok

      iex> Oban.Config.validate(plugins: [{Oban.Plugins.Pruner, max_age: 0}])
      {:error, "invalid value for :plugins, expected :max_age to be a positive integer, got: 0"}
  """
  @spec validate([Oban.option()]) :: :ok | {:error, String.t()}
  def validate(opts) when is_list(opts) do
    opts = normalize(opts)

    Validation.validate_schema(opts,
      dispatch_cooldown: :pos_integer,
      engine: {:behaviour, Oban.Engine},
      get_dynamic_repo: {:or, [:falsy, {:function, 0}]},
      insert_trigger: :boolean,
      log: {:enum, @log_levels},
      name: :any,
      node: {:pattern, ~r/^\S+$/},
      notifier: {:behaviour, Oban.Notifier},
      peer: {:or, [:falsy, {:behaviour, Oban.Peer}]},
      plugins: {:custom, &validate_plugins/1},
      prefix: {:or, [:falsy, :string]},
      queues: {:custom, &validate_queues(opts, &1)},
      repo: {:module, [config: 0]},
      shutdown_grace_period: :non_neg_integer,
      stage_interval: :timeout,
      testing: {:enum, @testing_modes}
    )
  end

  @doc false
  @spec get_engine(t()) :: module()
  def get_engine(%__MODULE__{engine: engine, testing: :disabled}), do: engine

  def get_engine(%__MODULE__{engine: engine, testing: testing}) do
    pids = [self() | Process.get(:"$callers", [])]

    if Enum.any?(pids, &inline_testing?(&1, testing)) do
      Oban.Engines.Inline
    else
      engine
    end
  end

  defp inline_testing?(pid, default) do
    case Process.info(pid, :dictionary) do
      {:dictionary, dictionary} -> Keyword.get(dictionary, :oban_testing, default) == :inline
      _ -> false
    end
  end

  @doc false
  @spec node_name(%{optional(binary()) => binary()}) :: binary()
  def node_name(env \\ System.get_env()) do
    cond do
      Node.alive?() ->
        to_string(node())

      Map.has_key?(env, "DYNO") ->
        Map.get(env, "DYNO")

      true ->
        :inet.gethostname()
        |> elem(1)
        |> to_string()
    end
  end

  @doc false
  @spec to_ident(t()) :: binary()
  def to_ident(%__MODULE__{name: name, node: node}) do
    inspect(name) <> "." <> to_string(node)
  end

  @doc false
  @spec match_ident?(t(), binary()) :: boolean()
  def match_ident?(%__MODULE__{} = conf, ident) when is_binary(ident) do
    to_ident(conf) == ident
  end

  # Validation

  defp validate_plugins(plugins) do
    Validation.validate(:plugins, plugins, &validate_plugin/1)
  end

  defp validate_plugin(plugin) when not is_tuple(plugin), do: validate_plugin({plugin, []})

  defp validate_plugin({plugin, opts}) do
    name = inspect(plugin)

    cond do
      not is_atom(plugin) ->
        {:error, "plugin #{name} is not a valid module"}

      not Code.ensure_loaded?(plugin) ->
        {:error, "plugin #{name} could not be loaded"}

      not function_exported?(plugin, :init, 1) ->
        {:error, "plugin #{name} is invalid because it's missing an `init/1` function"}

      not Keyword.keyword?(opts) ->
        {:error, "expected #{name} options to be a keyword list, got: #{inspect(opts)}"}

      function_exported?(plugin, :validate, 1) ->
        plugin.validate(opts)

      true ->
        :ok
    end
  end

  defp validate_queues(opts, queues) do
    if Keyword.keyword?(queues) do
      # Queue validation requires an engine and partial configuration. Only the engine matters,
      # but the other values are required for the struct.
      conf_opts =
        opts
        |> Keyword.take([:engine, :name, :node, :repo])
        |> Keyword.put_new(:engine, Oban.Engines.Basic)
        |> Keyword.put_new(:repo, None)

      conf = struct!(__MODULE__, conf_opts)

      Validation.validate(queues, &validate_queue(conf, &1))
    else
      {:error, "expected :queues to be a keyword list, got: #{inspect(queues)}"}
    end
  end

  defp validate_queue(conf, {name, opts}) do
    cond do
      is_integer(opts) and opts > 0 ->
        :ok

      Keyword.keyword?(opts) ->
        opts =
          opts
          |> Keyword.delete(:dispatch_cooldown)
          |> Keyword.put(:validate, true)

        case conf.engine.init(conf, opts) do
          {:ok, _meta} ->
            :ok

          {:error, error} ->
            {:error, "queue #{inspect(name)}, " <> Exception.message(error)}
        end

      true ->
        {:error,
         "expected queue #{inspect(name)} opts to be a positive integer limit or a " <>
           "keyword list, got: #{inspect(opts)}"}
    end
  end

  # Normalization

  defp normalize(opts) do
    opts
    |> crontab_to_plugin()
    |> normalize_notifier()
    |> normalize_peer()
    |> Keyword.put_new(:node, node_name())
    |> Keyword.update(:queues, [], &normalize_queues/1)
    |> Keyword.update(:plugins, [], &normalize_plugins/1)
    |> Keyword.delete(:circuit_backoff)
    |> stager_to_interval()
    |> Enum.reject(&(&1 in @renamed))
  end

  defp crontab_to_plugin(opts) do
    case {opts[:plugins], opts[:crontab]} do
      {plugins, [_ | _]} when is_list(plugins) or is_nil(plugins) ->
        {cron_opts, base_opts} = Keyword.split(opts, @cron_keys)

        plugin = {Oban.Plugins.Cron, cron_opts}

        Keyword.update(base_opts, :plugins, [plugin], &[plugin | &1])

      _ ->
        Keyword.drop(opts, @cron_keys)
    end
  end

  defp stager_to_interval(opts) do
    cond do
      Keyword.has_key?(opts, :poll_interval) ->
        opts
        |> Keyword.put_new(:stage_interval, opts[:poll_interval])
        |> Keyword.delete(:poll_interval)

      Keyword.keyword?(opts[:plugins]) ->
        {stager_opts, opts} = pop_in(opts, [:plugins, Oban.Plugins.Stager])

        if is_list(stager_opts) and Keyword.has_key?(stager_opts, :interval) do
          Keyword.put_new(opts, :stage_interval, stager_opts[:interval])
        else
          opts
        end

      true ->
        opts
    end
  end

  defp normalize_notifier(opts) do
    case Keyword.get(opts, :notifier) do
      module when is_atom(module) and not is_nil(module) ->
        Keyword.put(opts, :notifier, {module, []})

      _ ->
        opts
    end
  end

  defp normalize_peer(opts) do
    peer = opts[:peer]

    cond do
      peer == false or opts[:plugins] == false ->
        Keyword.put(opts, :peer, {Oban.Peers.Isolated, [leader?: false]})

      is_atom(peer) and not is_nil(peer) ->
        Keyword.put(opts, :peer, {peer, []})

      true ->
        opts
    end
  end

  defp normalize_queues(queues) when is_list(queues) do
    for {name, value} <- queues do
      opts = if is_integer(value), do: [limit: value], else: value

      {name, opts}
    end
  end

  defp normalize_queues(queues), do: queues || []

  # Manually specified plugins will be overwritten by auto-specified plugins unless we reverse the
  # plugin list. The order doesn't matter as they are supervised one-for-one.
  defp normalize_plugins(plugins) when is_list(plugins) do
    plugins
    |> Enum.map(&if is_atom(&1), do: {&1, []}, else: &1)
    |> Enum.reverse()
    |> Enum.uniq()
  end

  defp normalize_plugins(plugins), do: plugins || []
end