defmodule CandleClock do
require Logger
import Ecto.Query
@type argument :: any
@type arguments :: [argument]
@type mf_args :: {module, atom, arguments}
@type interval :: non_neg_integer
@moduledoc """
CandleClock manages timers that persist even across restarts.
## Core concepts
### Timer
A **timer** is a function call that will happen at some point in the future.
It always contains an [MFA](#module-mfa) and
either a duration, an interval or a crontab expression.
### MFA
An **MFA** is a tuple with 3 elements: the module, the function and the
arguments to pass to the function. The 3 elements of the tuple are the same as
the 3 arguments you'd pass to `Kernel.apply/3`.
### Crontab
A **crontab** expression can be used when you want the timer to trigger at
regular dates or times, such as
* Every Monday at 12 PM (noon): `0 12 * * 1`
* Every 5 minutes: `*/5 * * * *`
* Every hour: `0 * * * *`
* Every day at 5 PM: `0 17 * * *`
Timers with crontabs can be given an additional time zone so `0 17 * * *` for
`Europe/Berlin` is 16:00 UTC during winter time but 15:00 UTC during summer
time.
### Startup behaviour
At startup, all timers that have an `expires_at` date in the past will be
executed.
When planning the next execution time of a single timer, it will take the last
execution time as the basis. However, that might lead to a timer being called
multiple times directly after a long downtime.
For that reason, if `:skip_if_offline` is true, interval and cron timers will
instead be planned to the next time it would've happened as if the earlier
intervals would've been triggered as well, but they won't be called more than
once at startup.
`:skip_if_offline` is enabled by default.
### Duration and intervals
All durations and intervals are integers in the unit millisecond.
## Common options
These are the common options that can be passed to `call_after/3`,
`call_interval/4` and `call_crontab/4`:
* `:name` (string) A name that makes this timer unique. Unique timers will
be replaced when a new timer with the same name is created. That way, a
defer can be implemented simply by always giving the same name.
* `:if_not_exists` (bool, default: `false`) If set to true, will not create
the timer if one already exists with that name. Requires a `name` to be set.
* `:skip_if_offline` (bool, default: `true`) If set to false, interval and
cron timers behave differently after a long downtime of the system. See
also: [Startup behaviour](#module-startup-behaviour)
* `:max_calls` (int, default: `nil` or `1` for duration timers) Controls the
maximum number of times this timer can be called before it is cancelled. Not
supported in duration timers started with `call_after/3`.
"""
@doc false
@spec timer_schema() :: module
def timer_schema do
  # Looked up on every call so the schema module can be swapped through the
  # `:candle_clock` application environment; falls back to the bundled schema.
  fallback = CandleClock.Timer
  Application.get_env(:candle_clock, :timer_schema, fallback)
end
@doc false
@spec repo() :: module
def repo do
  # Read at runtime from the `:candle_clock` application environment (key
  # `:repo`); `CandleClock.Repo` is used when nothing is configured.
  Application.get_env(:candle_clock, :repo, CandleClock.Repo)
end
@doc """
Creates a one-shot timer that calls `mfa` after `duration` milliseconds.

`opts` are the common options described in the module documentation.

Returns `{:ok, timer}` if the timer was created, otherwise an error tuple.
"""
@spec call_after(mf_args, interval, keyword) :: {:ok, struct} | {:error, any}
def call_after(mfa, duration, opts \\ []) do
  # Duration timers are created with a call limit of one.
  params = %{duration: duration, max_calls: 1}
  create(mfa, params, opts)
end
@doc """
Creates a one-shot timer that calls `mfa` at the given `date`.

`opts` are the common options described in the module documentation.

Returns `{:ok, timer}` if the timer was created, otherwise an error tuple.
"""
@spec call_at(mf_args, DateTime.t(), keyword) :: {:ok, struct} | {:error, any}
def call_at(mfa, date, opts \\ []) do
  # The requested date is stored directly as the timer's expiry and the timer
  # is created with a call limit of one.
  params = %{expires_at: date, max_calls: 1}
  create(mfa, params, opts)
end
@doc """
Creates a timer that is executed every `interval` ms.

The delay until the first trigger can be passed with the `duration` argument;
when it is omitted or `nil`, `interval` is used for the first trigger as well.

Because `duration` is an optional argument placed before `interval`, the
accepted call shapes are:

    call_interval(mfa, interval)
    call_interval(mfa, duration, interval)
    call_interval(mfa, duration, interval, opts)

Pass `nil` as `duration` when only `opts` need to be given.

Returns the timer in an ok-tuple if successful.
"""
@spec call_interval(mf_args, interval | nil, interval, keyword) ::
        {:ok, struct} | {:error, any}
def call_interval(mfa, duration \\ nil, interval, opts \\ []) do
  # Without an explicit initial delay the first call happens one interval in.
  params = %{duration: duration || interval, interval: interval}
  create(mfa, params, opts)
end
@doc """
Creates a timer that is executed according to the given crontab expression.

`crontab` is parsed with `Crontab.CronExpression.Parser.parse/1`; if parsing
does not yield `{:ok, expression}`, the parser's result is returned unchanged
and no timer is created. `timezone` is stored alongside the expression and
defaults to `"Etc/UTC"`.

Returns the timer in an ok-tuple if successful.
"""
@spec call_crontab(mf_args, String.t, String.t, keyword) :: {:ok, struct} | {:error, any}
def call_crontab(mfa, crontab, timezone \\ "Etc/UTC", opts \\ []) do
  case Crontab.CronExpression.Parser.parse(crontab) do
    {:ok, expression} ->
      create(mfa, %{crontab: expression, crontab_timezone: timezone}, opts)

    parse_failure ->
      parse_failure
  end
end
# Builds a timer struct from the MFA, the type-specific `params` and the
# caller's `opts`, computes its first expiry, inserts it and notifies the
# workers about the new expiry.
@spec create(mf_args, map, keyword) :: {:ok, struct} | {:error, term}
defp create({m, f, a}, params, opts) do
  now = DateTime.utc_now()

  defaults = %{module: m, function: f, arguments: a, inserted_at: now, updated_at: now}

  # Effective precedence, highest first: opts, defaults, params.
  # NOTE(review): this means a key in `opts` (e.g. `:max_calls`) overrides the
  # value the public constructor put into `params` - confirm this is intended.
  attrs =
    params
    |> Map.merge(defaults)
    |> Map.merge(Enum.into(opts, %{}))

  timer = struct(timer_schema(), attrs)
  on_conflict = on_conflict_strategy(opts)

  with {:ok, expires_at} <- next_expiry(timer, now),
       {:ok, inserted} <-
         repo().insert(Map.put(timer, :expires_at, expires_at),
           on_conflict: on_conflict,
           conflict_target: [:name],
           returning: true
         ) do
    refresh_next_timer(inserted.expires_at)
    {:ok, inserted}
  end
end

# Chooses the `:on_conflict` option for the insert: with `if_not_exists: true`
# only `:updated_at` is replaced on a name conflict, otherwise every field is.
# Raises `ArgumentError` when `:if_not_exists` is set without a `:name`.
defp on_conflict_strategy(opts) do
  case Keyword.get(opts, :if_not_exists, false) do
    true ->
      if not Keyword.has_key?(opts, :name) do
        raise ArgumentError, "if_not_exists requires a name to be set"
      end

      {:replace, [:updated_at]}

    false ->
      :replace_all
  end
end
@doc """
Creates a list of timers at once (using `insert_all`).

Every element of `timers` is a map describing one timer. The entries of
`opts` are merged into every timer as defaults (values given in the timer map
itself win). `:inserted_at` and `:updated_at` default to the current UTC time
when neither the timer map nor `opts` provide them.

A timer whose `:name` conflicts with an existing timer replaces that timer.

Returns the list of inserted timers, or `[]` when `timers` is empty.

Raises if a crontab string cannot be parsed or if the first expiry of a timer
cannot be calculated.

## Schemas for the different timer types

### For all timer types

    %{
      module: module(),
      function: atom(),
      arguments: list(),
      inserted_at: DateTime.t(),
      updated_at: DateTime.t()
    }

as well as any custom fields defined in the candle clock timer schema.

### Duration

    %{
      duration: non_neg_integer(), # milliseconds
      max_calls: 1
    }

### Interval

    %{
      duration: non_neg_integer(), # milliseconds
      interval: non_neg_integer(), # milliseconds
      max_calls: nil | non_neg_integer() # nil for unlimited calls
    }

### Once, at a specific date and time

    %{
      expires_at: DateTime.t(),
      max_calls: 1
    }

### Cron

    %{
      crontab: Crontab.CronExpression.t() | binary(), # binaries are parsed
      crontab_timezone: binary() # "Europe/Berlin" for example.
    }
"""
@type cron_timer_spec :: %{
        crontab: Crontab.CronExpression.t() | binary(),
        crontab_timezone: binary()
      }
@type interval_timer_spec :: %{
        duration: non_neg_integer(),
        interval: non_neg_integer(),
        max_calls: nil | non_neg_integer()
      }
@type duration_timer_spec :: %{duration: non_neg_integer(), max_calls: 1}
@type alarm_spec :: %{expires_at: DateTime.t(), max_calls: 1}
@type timer_spec :: cron_timer_spec | duration_timer_spec | interval_timer_spec | alarm_spec
@spec create_many([timer_spec], keyword) :: [struct()]
def create_many(timers, opts \\ [])
def create_many([], _), do: []

def create_many(timers, opts) do
  # Shared defaults for every timer; values inside a timer map take precedence.
  opts_map =
    opts
    |> Enum.into(%{})
    |> Map.put_new_lazy(:inserted_at, fn -> DateTime.utc_now() end)
    |> Map.put_new_lazy(:updated_at, fn -> DateTime.utc_now() end)

  id_type = timer_schema().__schema__(:type, :id)
  schema_fields = timer_schema().__schema__(:fields)

  timers =
    Enum.map(timers, fn params ->
      # Crontabs may be given as strings or as already parsed expressions.
      params =
        if Map.has_key?(params, :crontab) do
          Map.update!(params, :crontab, fn
            bin when is_binary(bin) -> Crontab.CronExpression.Parser.parse!(bin)
            %Crontab.CronExpression{} = expr -> expr
          end)
        else
          params
        end

      params = Map.merge(opts_map, params)

      # Go through the schema struct so next_expiry/2 sees the schema defaults.
      timer = struct(timer_schema(), params)
      {:ok, expires_at} = next_expiry(timer, DateTime.utc_now())

      # For these ID types a UUID is generated here; for every other type the
      # :id key is dropped so that no ID value is sent with the insert.
      timer =
        case id_type do
          uuid when uuid in [:binary, Ecto.UUID] -> Map.put(timer, :id, Ecto.UUID.generate())
          _ -> Map.drop(timer, [:id])
        end

      # insert_all expects plain maps that contain schema fields only.
      timer
      |> Map.put(:expires_at, expires_at)
      |> Map.take(schema_fields)
      |> Map.drop([:__struct__, :__meta__])
    end)

  # Only the earliest expiry matters to the workers' "next timer" bookkeeping.
  earliest = Enum.min_by(timers, &(&1.expires_at), &(DateTime.compare(&1, &2) == :lt))

  {_, timers} =
    repo().insert_all(timer_schema(), timers,
      on_conflict: :replace_all,
      conflict_target: [:name],
      returning: true
    )

  refresh_next_timer(earliest.expires_at)
  timers
end
@doc """
Cancels the timer with the given ID.

Returns `{:ok, count}` where `count` is the number of deleted timers, i.e.
`{:ok, 1}` if the ID matched.
"""
@spec cancel_by_id(any) :: {:ok, non_neg_integer}
def cancel_by_id(id) do
  id
  |> id_query()
  |> cancel_by_query()
end
@doc """
Cancels the timer that was registered under `name`.

Returns `{:ok, count}` where `count` is the number of deleted timers, i.e.
`{:ok, 1}` if a timer with that name was found.
"""
@spec cancel_by_name(String.t) :: {:ok, non_neg_integer}
def cancel_by_name(name) do
  name
  |> name_query()
  |> cancel_by_query()
end
@doc """
Cancels every timer that calls the given `module` and `function`.

Returns `{:ok, amount}`, where `amount` is the number of timers that were
cancelled.
"""
@spec cancel_all(module, atom) :: {:ok, non_neg_integer}
def cancel_all(module, function) do
  module
  |> mf_query(function)
  |> cancel_by_query()
end
# Deletes every timer matched by `query`, logs how many were removed, asks the
# workers to re-read their next timer and returns `{:ok, deleted_count}`.
defp cancel_by_query(query) do
  {num, _rows} = repo().delete_all(query)
  Logger.debug("Cancelled #{num} timers")

  # A deleted timer may have been the next one due, so the workers must refresh.
  refresh_next_timer()

  {:ok, num}
end
# Invokes `CandleClock.Worker.refresh/0` through `:rpc.multicall/3`, so the
# call is not limited to the local node.
defp refresh_next_timer, do: :rpc.multicall(CandleClock.Worker, :refresh, [])
# Passes the expiry of a newly written timer to
# `CandleClock.Worker.set_next_expiry/1` through `:rpc.multicall/3`.
defp refresh_next_timer(expires_at),
  do: :rpc.multicall(CandleClock.Worker, :set_next_expiry, [expires_at])
@doc """
Returns `true` if a timer with the given name exists, `false` otherwise.
"""
@spec name_exists?(String.t) :: boolean
def name_exists?(name) do
  query = name_query(name)
  repo().exists?(query)
end
@doc """
Returns `true` if a timer with the given ID exists, `false` otherwise.
"""
@spec id_exists?(any) :: boolean
def id_exists?(id) do
  query = id_query(id)
  repo().exists?(query)
end
# Query selecting the timers whose `name` column equals `name`.
defp name_query(name) do
  from(t in timer_schema(), where: t.name == ^name)
end
# Query selecting the timers that call `module`.`function`, whatever their
# arguments are.
defp mf_query(module, function) do
  from(t in timer_schema(),
    where: t.module == ^module,
    where: t.function == ^function
  )
end
# Query selecting the timer whose primary key equals `id`.
defp id_query(id) do
  from(t in timer_schema(), where: t.id == ^id)
end
@doc """
Calculates the next expiry date for the given timer from the given date
onwards.

The rules are applied in this order:

  * A timer whose `expires_at` is set and not before `date` keeps that expiry.
    An `expires_at` that lies before `date` is discarded and the expiry is
    recalculated.
  * With `skip_if_offline: true`:
    * a timer with a `duration` that has not been called yet expires
      `duration` ms after `inserted_at`,
    * an interval timer that has been called at least once expires at the
      first interval step that is not before `date`,
    * a crontab timer expires at the next run date of its expression,
      evaluated in `crontab_timezone` and returned in UTC with the
      sub-second part set to zero. If the local run date is ambiguous the
      later occurrence is used; if it falls into a gap the first instant
      after the gap is used.
  * With `skip_if_offline: false` the same rules are applied, but relative to
    the timer's `inserted_at` instead of `date`.

Returns `{:ok, datetime}` or an error-tuple
"""
@spec next_expiry(struct, DateTime.t) :: {:ok, DateTime.t} | {:error, any}
def next_expiry(timer, date \\ DateTime.utc_now())

def next_expiry(%{expires_at: expires_at} = timer, date) when not is_nil(expires_at) do
  if DateTime.compare(expires_at, date) == :lt do
    # The stored expiry is already in the past: drop it and recalculate.
    next_expiry(%{timer | expires_at: nil}, date)
  else
    {:ok, expires_at}
  end
end

def next_expiry(%{skip_if_offline: true} = timer, date) do
  # NOTE(review): a timer that matches none of the clauses below (for example
  # one without duration, interval and crontab) raises a CaseClauseError
  # instead of returning an error-tuple - confirm that callers expect this.
  case timer do
    %{duration: duration, calls: 0} when not is_nil(duration) ->
      {:ok, DateTime.add(timer.inserted_at, duration, :millisecond)}

    %{interval: interval, calls: calls} when not is_nil(interval) and calls >= 1 ->
      # The interval grid starts at the first trigger (inserted_at + duration).
      start_at = DateTime.add(timer.inserted_at, timer.duration, :millisecond)
      {:ok, get_next_interval_after(start_at, date, interval)}

    %{crontab: crontab} when not is_nil(crontab) ->
      next_cron_expiry(crontab, timer.crontab_timezone, date)
  end
end

def next_expiry(%{skip_if_offline: false} = timer, _date) do
  # The first clause handles every timer with a non-nil expires_at, so
  # expires_at is nil whenever this clause runs and inserted_at is the basis.
  # NOTE(review): the module docs describe the last execution time as the
  # basis - confirm whether discarding expires_at beforehand is intended.
  date = timer.expires_at || timer.inserted_at
  next_expiry(%{timer | skip_if_offline: true}, date)
end

# Evaluates the cron expression in the timer's own time zone and converts the
# resulting wall-clock time back to UTC.
defp next_cron_expiry(crontab, timezone, date) do
  with {:ok, local} <- DateTime.shift_zone(date, timezone),
       {:ok, next_naive} <-
         Crontab.Scheduler.get_next_run_date(crontab, DateTime.to_naive(local)) do
    case DateTime.from_naive(next_naive, timezone) do
      {:ok, with_tz} ->
        to_utc_expiry(with_tz)

      # The wall-clock time exists twice (clocks turned back): use the later one.
      {:ambiguous, _first_dt, second_dt} ->
        to_utc_expiry(second_dt)

      # The wall-clock time does not exist (clocks turned forward): use the
      # first valid instant after the gap.
      {:gap, _just_before, just_after} ->
        to_utc_expiry(just_after)

      other_error ->
        other_error
    end
  end
end

# Normalises a zoned expiry: zeroes the sub-second part and shifts it to UTC,
# so every crontab result has the same shape. Returns the result of
# `DateTime.shift_zone/2`.
defp to_utc_expiry(datetime) do
  DateTime.shift_zone(%{datetime | microsecond: {0, 6}}, "Etc/UTC")
end

# Steps forward from `start_at` in increments of `interval` ms and returns the
# first step that is not before `date`. At least one step is always taken.
# NOTE(review): with an interval of 0 and `date` after `start_at` this
# recursion never terminates.
defp get_next_interval_after(start_at, date, interval) do
  next = DateTime.add(start_at, interval, :millisecond)

  case DateTime.compare(date, next) do
    :gt -> get_next_interval_after(next, date, interval)
    _lt_or_eq -> next
  end
end
end