lib/oban/config.ex

defmodule Oban.Config do
  @moduledoc """
  The Config struct validates and encapsulates Oban instance state.

  Options passed to `Oban.start_link/1` are validated and stored in a config struct. Internal
  modules and plugins are always passed the config with a `:conf` key.
  """

  @type t :: %__MODULE__{
          circuit_backoff: timeout(),
          dispatch_cooldown: pos_integer(),
          engine: module(),
          notifier: module(),
          name: Oban.name(),
          node: binary(),
          plugins: [module() | {module() | Keyword.t()}],
          prefix: binary(),
          queues: [{atom(), Keyword.t()}],
          repo: module(),
          shutdown_grace_period: timeout(),
          log: false | Logger.level(),
          get_dynamic_repo: nil | (() -> pid() | atom())
        }

  @type option :: {:name, module()} | {:conf, t()}

  @enforce_keys [:node, :repo]
  defstruct circuit_backoff: :timer.seconds(30),
            dispatch_cooldown: 5,
            engine: Oban.Queue.BasicEngine,
            notifier: Oban.PostgresNotifier,
            name: Oban,
            node: nil,
            plugins: [],
            prefix: "public",
            queues: [],
            repo: nil,
            shutdown_grace_period: :timer.seconds(15),
            log: false,
            get_dynamic_repo: nil

  defguardp is_pos_integer(interval) when is_integer(interval) and interval > 0

  @doc false
  @spec new(Keyword.t()) :: t()
  def new(opts) when is_list(opts) do
    opts =
      opts
      |> crontab_to_plugin()
      |> poll_interval_to_plugin()
      |> Keyword.put_new(:node, node_name())
      |> Keyword.update(:plugins, [], &(&1 || []))
      |> Keyword.update(:queues, [], &(&1 || []))

    Enum.each(opts, &validate_opt!/1)

    opts =
      opts
      |> Keyword.update!(:queues, &parse_queues/1)
      |> Keyword.update!(:plugins, &normalize_plugins/1)

    struct!(__MODULE__, opts)
  end

  @doc false
  @spec node_name(%{optional(binary()) => binary()}) :: binary()
  def node_name(env \\ System.get_env()) do
    cond do
      Node.alive?() ->
        to_string(node())

      Map.has_key?(env, "DYNO") ->
        Map.get(env, "DYNO")

      true ->
        :inet.gethostname()
        |> elem(1)
        |> to_string()
    end
  end

  @doc false
  @spec to_ident(t()) :: binary()
  def to_ident(%__MODULE__{name: name, node: node}) do
    inspect(name) <> "." <> to_string(node)
  end

  @doc false
  @spec match_ident?(t(), binary()) :: boolean()
  def match_ident?(%__MODULE__{} = conf, ident) when is_binary(ident) do
    to_ident(conf) == ident
  end

  # Helpers

  @cron_keys [:crontab, :timezone]

  defp crontab_to_plugin(opts) do
    case {opts[:plugins], opts[:crontab]} do
      {plugins, [_ | _]} when is_list(plugins) or is_nil(plugins) ->
        {cron_opts, base_opts} = Keyword.split(opts, @cron_keys)

        plugin = {Oban.Plugins.Cron, cron_opts}

        Keyword.update(base_opts, :plugins, [plugin], &[plugin | &1])

      _ ->
        Keyword.drop(opts, @cron_keys)
    end
  end

  defp poll_interval_to_plugin(opts) do
    case {opts[:plugins], opts[:poll_interval]} do
      {plugins, interval} when (is_list(plugins) or is_nil(plugins)) and is_integer(interval) ->
        plugin = {Oban.Plugins.Stager, interval: interval}

        opts
        |> Keyword.delete(:poll_interval)
        |> Keyword.update(:plugins, [plugin], &[plugin | &1])

      {plugins, nil} when is_list(plugins) or is_nil(plugins) ->
        plugin = Oban.Plugins.Stager

        Keyword.update(opts, :plugins, [plugin], &[plugin | &1])

      _ ->
        Keyword.drop(opts, [:poll_interval])
    end
  end

  defp validate_opt!({:circuit_backoff, backoff}) do
    unless is_pos_integer(backoff) do
      raise ArgumentError,
            "expected :circuit_backoff to be a positive integer, got: #{inspect(backoff)}"
    end
  end

  defp validate_opt!({:dispatch_cooldown, cooldown}) do
    unless is_pos_integer(cooldown) do
      raise ArgumentError,
            "expected :dispatch_cooldown to be a positive integer, got: #{inspect(cooldown)}"
    end
  end

  defp validate_opt!({:engine, engine}) do
    unless Code.ensure_loaded?(engine) and function_exported?(engine, :init, 2) do
      raise ArgumentError,
            "expected :engine to be an Oban.Queue.Engine, got: #{inspect(engine)}"
    end
  end

  defp validate_opt!({:notifier, notifier}) do
    unless Code.ensure_loaded?(notifier) and function_exported?(notifier, :listen, 2) do
      raise ArgumentError,
            "expected :notifier to be an Oban.Notifier, got: #{inspect(notifier)}"
    end
  end

  defp validate_opt!({:name, _}), do: :ok

  defp validate_opt!({:node, node}) do
    unless is_binary(node) and String.trim(node) != "" do
      raise ArgumentError,
            "expected :node to be a non-empty binary, got: #{inspect(node)}"
    end
  end

  defp validate_opt!({:plugins, plugins}) do
    unless is_list(plugins) and Enum.all?(plugins, &valid_plugin?/1) do
      raise ArgumentError,
            "expected :plugins to be a list of modules or {module, keyword} tuples " <>
              ", got: #{inspect(plugins)}"
    end
  end

  defp validate_opt!({:prefix, prefix}) do
    unless is_binary(prefix) and Regex.match?(~r/^[a-z0-9_]+$/i, prefix) do
      raise ArgumentError,
            "expected :prefix to be a binary with alphanumeric characters, got: #{inspect(prefix)}"
    end
  end

  defp validate_opt!({:queues, queues}) do
    unless Keyword.keyword?(queues) and Enum.all?(queues, &valid_queue?/1) do
      raise ArgumentError,
            "expected :queues to be a keyword list of {atom, integer} pairs or " <>
              "a list of {atom, keyword} pairs, got: #{inspect(queues)}"
    end
  end

  defp validate_opt!({:repo, repo}) do
    unless Code.ensure_loaded?(repo) and function_exported?(repo, :__adapter__, 0) do
      raise ArgumentError,
            "expected :repo to be an Ecto.Repo, got: #{inspect(repo)}"
    end
  end

  defp validate_opt!({:shutdown_grace_period, period}) do
    unless is_pos_integer(period) do
      raise ArgumentError,
            "expected :shutdown_grace_period to be a positive integer, got: #{inspect(period)}"
    end
  end

  @log_levels ~w(false emergency alert critical error warning warn notice info debug)a

  defp validate_opt!({:log, log}) do
    unless log in @log_levels do
      raise ArgumentError,
            "expected :log to be one of #{inspect(@log_levels)}, got: #{inspect(log)}"
    end
  end

  defp validate_opt!({:get_dynamic_repo, fun}) do
    unless is_nil(fun) or is_function(fun, 0) do
      raise ArgumentError,
            "expected :get_dynamic_repo to be nil or a zero arity function, got: #{inspect(fun)}"
    end
  end

  defp validate_opt!(option) do
    raise ArgumentError, "unknown option provided #{inspect(option)}"
  end

  defp valid_queue?({_name, opts}) do
    is_pos_integer(opts) or Keyword.keyword?(opts)
  end

  defp valid_plugin?({plugin, opts}) do
    is_atom(plugin) and
      Code.ensure_loaded?(plugin) and
      function_exported?(plugin, :init, 1) and
      Keyword.keyword?(opts)
  end

  defp valid_plugin?(plugin), do: valid_plugin?({plugin, []})

  defp parse_queues(queues) do
    for {name, value} <- queues do
      opts = if is_integer(value), do: [limit: value], else: value

      {name, opts}
    end
  end

  # Manually specified plugins will be overwritten by auto-specified plugins unless we reverse the
  # plugin list. The order doesn't matter as they are supervised one-for-one.
  defp normalize_plugins(plugins) do
    plugins
    |> Enum.reverse()
    |> Enum.uniq_by(fn
      {module, _opts} -> module
      module -> module
    end)
  end
end