defmodule Quantum do
use TelemetryRegistry
# Telemetry events emitted by Quantum, declared with `telemetry_event/1`.
# NOTE(review): `telemetry_event/1` is presumably provided by `use TelemetryRegistry` above — confirm.
# Every declaration carries the event name, a description and the documented
# shape of its measurements and metadata. The same entries show up in the
# `:telemetry_registry.list_events` example of the moduledoc.

# Job lifecycle events: a job was added, updated or deleted.
telemetry_event(%{
event: [:quantum, :job, :add],
description: "dispatched when a job is added",
measurements: "%{}",
metadata: "%{job: Quantum.Job.t(), scheduler: atom()}"
})
telemetry_event(%{
event: [:quantum, :job, :update],
description: "dispatched when a job is updated",
measurements: "%{}",
metadata: "%{job: Quantum.Job.t(), scheduler: atom()}"
})
telemetry_event(%{
event: [:quantum, :job, :delete],
description: "dispatched when a job is deleted",
measurements: "%{}",
metadata: "%{job: Quantum.Job.t(), scheduler: atom()}"
})
# Job execution events: `:start`, `:stop` and `:exception`. The moduledoc
# example lists these three as one spannable group under `[:quantum, :job]`.
telemetry_event(%{
event: [:quantum, :job, :start],
description: "dispatched on job execution start",
measurements: "%{system_time: integer()}",
metadata:
"%{telemetry_span_context: term(), job: Quantum.Job.t(), node: Node.t(), scheduler: atom()}"
})
telemetry_event(%{
event: [:quantum, :job, :stop],
description: "dispatched on job execution end",
measurements: "%{duration: integer()}",
metadata:
"%{telemetry_span_context: term(), job: Quantum.Job.t(), node: Node.t(), scheduler: atom(), result: term()}"
})
telemetry_event(%{
event: [:quantum, :job, :exception],
description: "dispatched on job execution fail",
measurements: "%{duration: integer()}",
metadata:
"%{telemetry_span_context: term(), job: Quantum.Job.t(), node: Node.t(), scheduler: atom(), kind: :throw | :error | :exit, reason: term(), stacktrace: list()}"
})
@moduledoc """
Defines a quantum Scheduler.
When used, the quantum scheduler expects the `:otp_app` as option.
The `:otp_app` should point to an OTP application that has
the quantum runner configuration. For example, the quantum scheduler:
defmodule MyApp.Scheduler do
use Quantum, otp_app: :my_app
end
Could be configured with:
config :my_app, MyApp.Scheduler,
jobs: [
{"@daily", {Backup, :backup, []}},
]
## Configuration:
* `:clock_broadcaster_name` - GenServer name of clock broadcaster \\
*(unstable, may break without major release until declared stable)*
* `:execution_broadcaster_name` - GenServer name of execution broadcaster \\
*(unstable, may break without major release until declared stable)*
* `:executor_supervisor_name` - GenServer name of execution supervisor \\
*(unstable, may break without major release until declared stable)*
* `:debug_logging` - Turn on debug logging
* `:jobs` - list of cron jobs to execute
* `:job_broadcaster_name` - GenServer name of job broadcaster \\
*(unstable, may break without major release until declared stable)*
* `:name` - GenServer name of scheduler \\
*(unstable, may break without major release until declared stable)*
* `:node_selector_broadcaster_name` - GenServer name of node selector broadcaster \\
*(unstable, may break without major release until declared stable)*
* `:overlap` - Default overlap of new Job
* `:otp_app` - Application where scheduler runs
* `:run_strategy` - Default Run Strategy of new Job
* `:schedule` - Default schedule of new Job
* `:state` - Default state of new Job
* `:storage` - Storage to use for persistence
* `:storage_name` - GenServer name of storage \\
*(unstable, may break without major release until declared stable)*
* `:supervisor_module` - Module to supervise scheduler \\
Can be overwritten to supervise processes differently (for example for clustering) \\
*(unstable, may break without major release until declared stable)*
* `:task_registry_name` - GenServer name of task registry \\
*(unstable, may break without major release until declared stable)*
* `:task_supervisor_name` - GenServer name of task supervisor \\
*(unstable, may break without major release until declared stable)*
* `:timeout` - Timeout in milliseconds for synchronous calls to the scheduler
(for example `find_job/2` and `jobs/1`). Defaults to `5_000`; increase it if you
run into timeout errors with many jobs or under high load.
* `:timezone` - Default timezone of new Job
## Telemetry
#{telemetry_docs()}
### Examples
iex(1)> :telemetry_registry.discover_all(:quantum)
:ok
iex(2)> :telemetry_registry.spannable_events()
[{[:quantum, :job], [:start, :stop, :exception]}]
iex(3)> :telemetry_registry.list_events
[
{[:quantum, :job, :add], Quantum,
%{
description: "dispatched when a job is added",
measurements: "%{}",
metadata: "%{job: Quantum.Job.t(), scheduler: atom()}"
}},
{[:quantum, :job, :delete], Quantum,
%{
description: "dispatched when a job is deleted",
measurements: "%{}",
metadata: "%{job: Quantum.Job.t(), scheduler: atom()}"
}},
{[:quantum, :job, :exception], Quantum,
%{
description: "dispatched on job execution fail",
measurements: "%{duration: integer()}",
metadata: "%{telemetry_span_context: term(), job: Quantum.Job.t(), node: Node.t(), scheduler: atom(), kind: :throw | :error | :exit, reason: term(), stacktrace: list()}"
}},
{[:quantum, :job, :start], Quantum,
%{
description: "dispatched on job execution start",
measurements: "%{system_time: integer()}",
metadata: "%{telemetry_span_context: term(), job: Quantum.Job.t(), node: Node.t(), scheduler: atom()}"
}},
{[:quantum, :job, :stop], Quantum,
%{
description: "dispatched on job execution end",
measurements: "%{duration: integer()}",
metadata: "%{telemetry_span_context: term(), job: Quantum.Job.t(), node: Node.t(), scheduler: atom(), result: term()}"
}},
{[:quantum, :job, :update], Quantum,
%{
description: "dispatched when a job is updated",
measurements: "%{}",
metadata: "%{job: Quantum.Job.t(), scheduler: atom()}"
}}
]
"""
require Logger
alias Quantum.{Job, Normalizer, RunStrategy.Random, Storage.Noop}
@typedoc """
A module implementing the Quantum scheduler behaviour, typically via `use Quantum`.
"""
@type t :: module
# Built-in configuration defaults. `scheduler_config/3` merges the value from
# the application environment and then the caller's options on top of this
# list, so every key can be overridden.
#
# * `:timeout` is read by the generated `find_job` and `jobs` functions and
#   passed to `GenStage.call/3`.
# * `:schedule`, `:overlap`, `:state`, `:timezone` and `:run_strategy` are the
#   keys the generated `__new_job__/2` takes from the configuration when it
#   builds a new job.
@defaults [
timeout: 5_000,
schedule: nil,
overlap: true,
state: :active,
timezone: :utc,
run_strategy: {Random, :cluster},
debug_logging: true,
storage: Noop
]
# Returns the scheduler configuration: the built-in defaults merged with the
# value stored for the scheduler in the `:otp_app` environment and with the
# given options (see `scheduler_config/3`).
@doc false
@callback config(Keyword.t()) :: Keyword.t()
@doc """
Starts supervision and returns `{:ok, pid}`
or just `:ok` if nothing needs to be done.

Returns `{:error, {:already_started, pid}}` if the scheduler is already
started or `{:error, term}` in case anything else goes wrong.

## Options

See the configuration in the moduledoc for options.
"""
# NOTE(review): the doc above mentions a bare `:ok` return, which the type
# below does not list — confirm which of the two is intended.
@callback start_link(opts :: Keyword.t()) ::
{:ok, pid}
| {:error, {:already_started, pid}}
| {:error, term}
@doc """
A callback executed when the quantum starts.

It takes the quantum configuration that is stored in the application
environment, and may change it to suit the needs of the application.

It must return the updated configuration as a keyword list.
"""
@callback init(config :: Keyword.t()) :: Keyword.t()
@doc """
Shuts down the scheduler identified by `server`.

The generated implementation calls `Supervisor.stop/3` with reason `:normal`
and the given `timeout`; `server` defaults to the scheduler module and
`timeout` to `5000`.
"""
@callback stop(server :: GenServer.server(), timeout) :: :ok
@doc """
Creates a new Job. The job can be added by calling `add_job/1`.

The options `overlap`, `run_strategy`, `schedule`, `state` and `timezone`
fall back to the scheduler configuration when they are not given.

## Supported options

* `name` - see `Quantum.Job.set_name/2`
* `overlap` - see `Quantum.Job.set_overlap/2`
* `run_strategy` - see `Quantum.Job.set_run_strategy/2`
* `schedule` - see `Quantum.Job.set_schedule/2`
* `state` - see `Quantum.Job.set_state/2`
* `task` - see `Quantum.Job.set_task/2`
* `timezone` - see `Quantum.Job.set_timezone/2`
"""
@callback new_job(opts :: Keyword.t()) :: Quantum.Job.t()
@doc """
Adds a new job.

The job is either a `Quantum.Job` struct or a `{cron_expression, task}` tuple.
In the generated implementation a tuple is turned into a job with `new_job/0`,
`Quantum.Job.set_schedule/2` and `Quantum.Job.set_task/2` before it is added.
"""
@callback add_job(GenStage.stage(), Quantum.Job.t() | {Crontab.CronExpression.t(), Job.task()}) ::
:ok
# The name-based callbacks below accept `atom | reference`: every generated
# implementation guards its `name` argument with
# `is_atom(name) or is_reference(name)`, so the callback types must allow both.
@doc """
Deactivates a job by name.

The name is an atom or a reference.
"""
@callback deactivate_job(GenStage.stage(), atom | reference) :: :ok
@doc """
Activates a job by name.

The name is an atom or a reference.
"""
@callback activate_job(GenStage.stage(), atom | reference) :: :ok
@doc """
Runs a job by name once.

The name is an atom or a reference.
"""
@callback run_job(GenStage.stage(), atom | reference) :: :ok
@doc """
Resolves a job by name.

The name is an atom or a reference. Returns a `Quantum.Job` struct or `nil`.
"""
@callback find_job(GenStage.stage(), atom | reference) :: Quantum.Job.t() | nil
@doc """
Deletes a job by name.

The name is an atom or a reference.
"""
@callback delete_job(GenStage.stage(), atom | reference) :: :ok
@doc """
Deletes all jobs.
"""
@callback delete_all_jobs(GenStage.stage()) :: :ok
@doc """
Returns the list of currently defined jobs.

In the generated implementation the stage defaults to the configured
`:job_broadcaster_name`.
"""
@callback jobs(GenStage.stage()) :: [Quantum.Job.t()]
@doc false
# Builds the complete configuration keyword list for `scheduler`.
#
# Precedence, lowest to highest: the built-in `@defaults`, the value stored
# under `scheduler` in the `otp_app` application environment, then `opts`.
# Every derived key is added with `Keyword.put_new/3`, so a value supplied by
# the environment or by `opts` always wins over the generated one.
#
# In the result `:schedule` has been passed through
# `Normalizer.normalize_schedule/1` and `:jobs` holds normalized jobs with
# unique names (an empty list when no jobs were configured).
def scheduler_config(opts, scheduler, otp_app) do
  @defaults
  |> Keyword.merge(Application.get_env(otp_app, scheduler, []))
  |> Keyword.merge(opts)
  |> Keyword.put_new(:otp_app, otp_app)
  |> Keyword.put_new(:scheduler, scheduler)
  # `:name` is guaranteed to be present from here on, which is why no later
  # fallback for it is needed.
  |> Keyword.put_new(:name, scheduler)
  |> update_in([:schedule], &Normalizer.normalize_schedule/1)
  |> Keyword.put_new(:task_supervisor_name, Module.concat(scheduler, TaskSupervisor))
  |> Keyword.put_new(:storage_name, Module.concat(scheduler, Storage))
  |> Keyword.put_new(:task_registry_name, Module.concat(scheduler, TaskRegistry))
  |> Keyword.put_new(:clock_broadcaster_name, Module.concat(scheduler, ClockBroadcaster))
  |> Keyword.put_new(:job_broadcaster_name, Module.concat(scheduler, JobBroadcaster))
  |> Keyword.put_new(
    :execution_broadcaster_name,
    Module.concat(scheduler, ExecutionBroadcaster)
  )
  |> Keyword.put_new(
    :node_selector_broadcaster_name,
    Module.concat(scheduler, NodeSelectorBroadcaster)
  )
  |> Keyword.put_new(:executor_supervisor_name, Module.concat(scheduler, ExecutorSupervisor))
  |> then(&normalize_jobs(&1, scheduler))
  |> Keyword.put_new(:supervisor_module, Quantum.Supervisor)
end

# Replaces the raw `:jobs` entries of `config` with normalized jobs and drops
# entries whose name is already taken. A missing `:jobs` key becomes `[]`.
defp normalize_jobs(config, scheduler) do
  Keyword.update(config, :jobs, [], fn jobs ->
    jobs
    # A fresh job template is requested for every entry on purpose.
    # NOTE(review): the template presumably carries per-job state (such as a
    # generated name) — confirm before hoisting this call out of the map.
    |> Enum.map(&Normalizer.normalize(scheduler.__new_job__([], config), &1))
    |> remove_jobs_with_duplicate_names(scheduler)
  end)
end
# Drops every job whose name was already used by an earlier entry of `job_list`.
#
# The first job seen for a name is kept; each later job with the same name is
# skipped and a warning naming the job and the scheduler is logged.
# The result is produced by `Map.values/1`, so it does not necessarily keep
# the order of `job_list`.
defp remove_jobs_with_duplicate_names(job_list, scheduler) do
  job_list
  |> Enum.reduce(%{}, fn %Job{name: name} = job, acc ->
    # Direct key lookup instead of scanning the list of all keys for each job.
    if Map.has_key?(acc, name) do
      Logger.warning(
        "Job with name '#{name}' of scheduler '#{scheduler}' not started due to duplicate job name"
      )

      acc
    else
      Map.put(acc, name, job)
    end
  end)
  |> Map.values()
end
# Injects the scheduler implementation into the module that calls
# `use Quantum, otp_app: ...`.
#
# The generated module:
#   * declares `Quantum` as its behaviour and implements its callbacks,
#   * receives a copy of this module's moduledoc in which the
#     `MyApp.Scheduler` / `:my_app` placeholders are replaced by its own name
#     and OTP application,
#   * gets a `child_spec/1` so that it can be placed in a supervision tree.
#
# `opts` must contain `:otp_app` (fetched with `Keyword.fetch!/2`); `:id`,
# `:start` and `:restart` optionally customize the generated child spec.
defmacro __using__(opts) do
  quote bind_quoted: [behaviour: __MODULE__, opts: opts, moduledoc: @moduledoc],
        location: :keep do
    @otp_app Keyword.fetch!(opts, :otp_app)
    @moduledoc moduledoc
               |> String.replace(~r/MyApp\.Scheduler/, Enum.join(Module.split(__MODULE__), "."))
               |> String.replace(~r/:my_app/, ":" <> Atom.to_string(@otp_app))

    @behaviour behaviour

    @doc false
    @impl behaviour
    def config(opts \\ []) do
      Quantum.scheduler_config(opts, __MODULE__, @otp_app)
    end

    # Name of the job broadcaster stage; the default target of all job functions.
    defp __job_broadcaster__ do
      config() |> Keyword.fetch!(:job_broadcaster_name)
    end

    # Timeout used for the synchronous `GenStage.call/3` requests below.
    defp __timeout__, do: Keyword.fetch!(config(), :timeout)

    @impl behaviour
    def start_link(opts \\ []) do
      opts = config(opts)
      Keyword.fetch!(opts, :supervisor_module).start_link(__MODULE__, opts)
    end

    # Default `init/1` returns the configuration unchanged; it is overridable.
    @impl behaviour
    def init(opts) do
      opts
    end

    @impl behaviour
    def stop(server \\ __MODULE__, timeout \\ 5000) do
      Supervisor.stop(server, :normal, timeout)
    end

    @impl behaviour
    def add_job(server \\ __job_broadcaster__(), job)

    def add_job(server, %Job{} = job) do
      GenStage.cast(server, {:add, job})
    end

    # A `{schedule, task}` tuple is turned into a job built from the scheduler
    # defaults and then added through the clause above.
    def add_job(server, {%Crontab.CronExpression{} = schedule, task})
        when is_tuple(task) or is_function(task, 0) do
      job =
        new_job()
        |> Job.set_schedule(schedule)
        |> Job.set_task(task)

      add_job(server, job)
    end

    @impl behaviour
    def new_job(opts \\ []), do: __new_job__(opts, config())

    # Builds a job from the job-related keys of `config`, overridden by `opts`.
    # `:run_strategy` may be given as `{module, options}`, as a bare module or
    # as an already built struct; the first two forms are normalized through
    # `module.normalize_config!/1`.
    @doc false
    def __new_job__(opts, config) do
      config
      |> Keyword.take([:overlap, :schedule, :state, :timezone, :run_strategy])
      |> Keyword.merge(opts)
      |> Keyword.update!(:run_strategy, fn
        {module, options} when is_atom(module) -> module.normalize_config!(options)
        module when is_atom(module) -> module.normalize_config!(nil)
        %_struct{} = run_strategy -> run_strategy
      end)
      |> Job.new()
    end

    @impl behaviour
    def deactivate_job(server \\ __job_broadcaster__(), name)
        when is_atom(name) or is_reference(name) do
      GenStage.cast(server, {:change_state, name, :inactive})
    end

    @impl behaviour
    def activate_job(server \\ __job_broadcaster__(), name)
        when is_atom(name) or is_reference(name) do
      GenStage.cast(server, {:change_state, name, :active})
    end

    @impl behaviour
    def run_job(server \\ __job_broadcaster__(), name)
        when is_atom(name) or is_reference(name) do
      GenStage.cast(server, {:run_job, name})
    end

    @impl behaviour
    def find_job(server \\ __job_broadcaster__(), name)
        when is_atom(name) or is_reference(name) do
      GenStage.call(server, {:find_job, name}, __timeout__())
    end

    @impl behaviour
    def delete_job(server \\ __job_broadcaster__(), name)
        when is_atom(name) or is_reference(name) do
      GenStage.cast(server, {:delete, name})
    end

    @impl behaviour
    def delete_all_jobs(server \\ __job_broadcaster__()) do
      GenStage.cast(server, :delete_all)
    end

    @impl behaviour
    def jobs(server \\ __job_broadcaster__()) do
      GenStage.call(server, :jobs, __timeout__())
    end

    # `spec` is a plain variable of the using module's body. `bind_quoted`
    # disables unquoting inside this quote, so `unquote_splicing/1` below is
    # evaluated when the using module is compiled and splices these pairs into
    # the map literal of `child_spec/1`.
    # NOTE(review): `start_link/1` starts the `:supervisor_module` and `stop/2`
    # uses `Supervisor.stop/3`, so the child is presumably a supervisor —
    # confirm whether `type: :supervisor` would be more accurate.
    spec = [
      id: opts[:id] || __MODULE__,
      start: Macro.escape(opts[:start]) || quote(do: {__MODULE__, :start_link, [opts]}),
      restart: opts[:restart] || :permanent,
      type: :worker
    ]

    @spec child_spec(Keyword.t()) :: Supervisor.child_spec()
    def child_spec(opts) do
      %{unquote_splicing(spec)}
    end

    defoverridable child_spec: 1, config: 0, config: 1, init: 1
  end
end
end