defmodule Drizzle do
@moduledoc """
A server for second-granularity execution of jobs from a cron tab
"""
use GenServer
require Logger
alias Drizzle.{Config, Parser, Time}
@execute_fun_every 30
defmodule Record do
@moduledoc """
Represents a job record to be executed by Drizzle.
"""
@typedoc """
## Fields
- `:crontab` – The cron expression defining the job's schedule.
- `:time_zone` – The time zone for the cron expression (e.g., `"UTC"` or `"Europe/Berlin"`).
- `:module` – The module containing the function to execute.
- `:function` – The function to call in the module (as an atom).
- `:args` – A list of arguments to pass to the function.
"""
@type t :: %__MODULE__{
crontab: String.t(),
time_zone: String.t() | :utc,
module: module(),
function: atom(),
args: list(any())
}
defstruct crontab: nil, time_zone: nil, module: nil, function: nil, args: nil
end
@typedoc """
The main struct for the Drizzle GenServer, managing job execution state.
## Fields
- `:records` – A list of `Drizzle.Record` structs, each defining a job to be executed according to its cron schedule.
- `:last_evaluation` – The timestamp (in seconds) of the last evaluation cycle. Used to catch up on time spent offline.
- `:evaluation_time_fun` – A function called after each execution and every 30s, used to perst the evaluation timestamp. Accepts the current timestamp (in seconds) as an argument.
- `:wait_for_update` - Does not evaluate or catch up before `update/1` is called. Useful for dynamic configuration.
"""
@type t :: %__MODULE__{
records: list(Record.t()),
last_evaluation: integer() | nil | (() -> integer() | nil),
evaluation_time_fun: (integer() -> any()),
wait_for_update: nil | true | false
}
defstruct records: [], last_evaluation: nil, evaluation_time_fun: nil, wait_for_update: nil
# Server functions
@spec start_link([]) :: {:ok, pid()}
def start_link([]) do
Config.get() |> start_link()
end
# Initialization
@spec start_link(__MODULE__.t()) :: {:ok, pid()}
def start_link(%{
records: records,
last_evaluation: last_evaluation,
evaluation_time_fun: evaluation_time_fun,
wait_for_update: wait_for_update}) when is_list(records) do
GenServer.start_link(__MODULE__, [records, last_evaluation_to_time(last_evaluation), evaluation_time_fun, wait_for_update], [name: __MODULE__])
end
def start_link(_), do: {:error, :invalid_config}
def init([records, last_evaluation, evaluation_time_fun, wait_for_update]) do
# we are setting the last time to one second in the past
# so we start with the current second
initial_state = %Drizzle{
records: Parser.parse_records!(records),
last_evaluation: last_evaluation_to_time(last_evaluation),
evaluation_time_fun: evaluation_time_fun || fn(_) -> :noop end,
wait_for_update: (wait_for_update == true)
}
unless initial_state.wait_for_update, do: schedule_evaluation(0)
{:ok, initial_state}
end
# API
@spec update([Record.t()]) :: :ok
@doc "Update the crontab during runtime."
def update(records) when is_list(records) do
GenServer.cast(__MODULE__, {:update_records, records})
end
# Callbacks
def handle_cast({:update_records, records}, state) do
case Parser.parse_records(records) do
{:ok, records} ->
if state.wait_for_update, do: schedule_evaluation(0)
next_state = %{state | records: records, wait_for_update: false}
{:noreply, next_state}
{:error, _} ->
{:noreply, state}
end
end
def handle_info(:evaluate, state = %Drizzle{last_evaluation: last_evaluation}) do
{now, microseconds} = Time.now()
schedule_evaluation((1_000_000 - microseconds) / 1_000)
case {last_evaluation, now} do
{same, same} ->
# we are still in the same second, so nothing to do
{:noreply, state}
{last, now} when last > now ->
# for some reason the last evaluation is in the future
# we reset it to now
Logger.error("last evaluation is #{last - now}s in the future - resetting.")
{:noreply, %Drizzle{state | last_evaluation: now}}
{last, now} ->
evaluate(last, now, state)
{:noreply, %Drizzle{state | last_evaluation: now}}
end
end
# Internal
defp evaluate(last, now, %Drizzle{records: records, evaluation_time_fun: evaluation_time_fun}) do
# we start with the first second after the one we already evaluated
executed = last+1..now |> execute_for_interval(records)
# maybe we want to persist the time stamp
if (Enum.any?(executed) or (rem(now, @execute_fun_every) == 0)) do
spawn(fn() -> evaluation_time_fun.(now) end)
end
end
defp schedule_evaluation(delay) do
Process.send_after(self(), :evaluate, trunc(delay))
end
defp execute_for_interval(times, records) do
for time <- times, record = %Record{crontab: crontab, time_zone: time_zone} <- records do
if Cron.match?(crontab, Time.from_seconds(time, time_zone)), do: execute(record)
end
end
defp execute(%Record{module: module, function: function, args: args}) do
spawn(fn() -> apply(module, function, args) end)
end
defp last_evaluation_to_time(time) when is_integer(time) do
time
end
defp last_evaluation_to_time(time_fun) when is_function(time_fun, 0) do
time_fun.() |> last_evaluation_to_time()
end
defp last_evaluation_to_time(nil) do
(Time.now() |> elem(0)) - 1
end
end