Skip to main content

lib/drizzle.ex

defmodule Drizzle do
  @moduledoc """
  A server for second-granularity execution of jobs from a cron tab
  """
  use GenServer

  require Logger

  alias Drizzle.{Config, Parser, Time}

  @execute_fun_every 30

  defmodule Record do
    @moduledoc """
    A single job: a cron expression together with the function call to run when it matches.
    """

    @typedoc """
    A job definition.

    ## Fields
    - `:crontab` – cron expression that decides when the job fires.
    - `:time_zone` – time zone the cron expression is evaluated in, either a zone name
      such as `"Europe/Berlin"` or the atom `:utc`.
    - `:module` – module that holds the function to run.
    - `:function` – name of the function to run, given as an atom.
    - `:args` – list of arguments handed to the function.
    """
    @type t :: %__MODULE__{
            crontab: String.t(),
            time_zone: String.t() | :utc,
            module: module(),
            function: atom(),
            args: [any()]
          }

    # every field defaults to nil
    defstruct [:crontab, :time_zone, :module, :function, :args]
  end

  @typedoc """
  State and configuration of the Drizzle GenServer.

  ## Fields
  - `:records` – the `Drizzle.Record` jobs that are checked against every evaluated second.
  - `:last_evaluation` – timestamp (in seconds) of the last evaluated second. Evaluation
    resumes with the following second, which is how time spent offline is caught up.
    In the configuration this may also be `nil` or a zero-arity function returning the
    timestamp.
  - `:evaluation_time_fun` – one-argument function that receives the current timestamp
    (in seconds) whenever a job was started during an evaluation cycle and whenever the
    timestamp is a multiple of 30. Intended for persisting the evaluation timestamp.
  - `:wait_for_update` – when `true`, nothing is evaluated or caught up before `update/1`
    has been called. Useful for dynamic configuration.
  """
  @type t :: %__MODULE__{
          records: [Record.t()],
          last_evaluation: integer() | nil | (() -> integer() | nil),
          evaluation_time_fun: (integer() -> any()),
          wait_for_update: boolean() | nil
        }

  defstruct records: [],
            last_evaluation: nil,
            evaluation_time_fun: nil,
            wait_for_update: nil

  # Server functions
  @doc """
  Starts the Drizzle server and registers it under the module name.

  Accepts either

    * `[]` – the configuration is fetched with `Drizzle.Config.get/0`, or
    * a map or struct with the keys `:records` (must be a list), `:last_evaluation`,
      `:evaluation_time_fun` and `:wait_for_update`.

  Returns the result of `GenServer.start_link/3`, or `{:error, :invalid_config}` when the
  argument has any other shape.
  """
  @spec start_link([] | t() | term()) :: GenServer.on_start() | {:error, :invalid_config}
  def start_link([]) do
    Config.get() |> start_link()
  end

  def start_link(%{
        records: records,
        last_evaluation: last_evaluation,
        evaluation_time_fun: evaluation_time_fun,
        wait_for_update: wait_for_update
      })
      when is_list(records) do
    # The timestamp is resolved here, in the calling process; init/1 resolves it again,
    # which is a no-op for an integer.
    init_args = [
      records,
      last_evaluation_to_time(last_evaluation),
      evaluation_time_fun,
      wait_for_update
    ]

    GenServer.start_link(__MODULE__, init_args, name: __MODULE__)
  end

  def start_link(_), do: {:error, :invalid_config}

  # GenServer callback. Builds the initial state from the argument list assembled in
  # start_link/1 and, unless the server has to wait for update/1, triggers the first
  # evaluation immediately. Raises if Parser.parse_records!/1 rejects the records.
  def init([records, last_evaluation, evaluation_time_fun, wait_for_update]) do
    initial_state = %__MODULE__{
      records: Parser.parse_records!(records),
      # a missing timestamp (nil) resolves to one second in the past,
      # so the first evaluation starts with the current second
      last_evaluation: last_evaluation_to_time(last_evaluation),
      # persisting the timestamp is optional; default to a no-op
      evaluation_time_fun: evaluation_time_fun || fn _timestamp -> :noop end,
      # anything other than an explicit `true` means "start evaluating right away"
      wait_for_update: wait_for_update == true
    }

    if not initial_state.wait_for_update, do: schedule_evaluation(0)

    {:ok, initial_state}
  end

  # API
  @doc """
  Replaces the crontab of the running server.

  The records are handed to the server asynchronously, so `:ok` is returned before they
  have been parsed. Records that fail to parse leave the current crontab untouched.
  """
  @spec update([Record.t()]) :: :ok
  def update(records) when is_list(records),
    do: GenServer.cast(__MODULE__, {:update_records, records})

  # Callbacks
  # Handles the cast sent by update/1: swaps in the new crontab if it parses and keeps
  # the current one otherwise.
  def handle_cast({:update_records, records}, state) do
    case Parser.parse_records(records) do
      {:ok, parsed} ->
        # a server that was waiting for its first crontab has not scheduled any
        # evaluation yet, so the evaluation loop is kicked off here
        if state.wait_for_update, do: schedule_evaluation(0)
        {:noreply, %{state | records: parsed, wait_for_update: false}}

      {:error, reason} ->
        # a cast has no caller to report to, so the rejection is at least logged
        Logger.error("crontab update rejected, keeping current records: #{inspect(reason)}")
        {:noreply, state}
    end
  end

  # Periodic tick. Schedules the next tick for the start of the next second and runs all
  # jobs that became due since the last evaluated second.
  def handle_info(:evaluate, %Drizzle{last_evaluation: last_evaluation} = state) do
    {now, microseconds} = Time.now()

    # Milliseconds until the next full second, rounded UP. Truncating instead lets the
    # timer fire slightly before the boundary; the remaining sub-millisecond delay then
    # truncates to 0 and :evaluate is re-sent in a tight loop until the second changes.
    # NOTE(review): assumes `microseconds` is the offset into the current second
    # (0..999_999) - confirm against Drizzle.Time.now/0.
    schedule_evaluation(Float.ceil((1_000_000 - microseconds) / 1_000))

    case {last_evaluation, now} do
      {same, same} ->
        # we are still in the same second, so nothing to do
        {:noreply, state}

      {last, now} when last > now ->
        # for some reason the last evaluation is in the future
        # we reset it to now
        Logger.error("last evaluation is #{last - now}s in the future - resetting.")
        {:noreply, %Drizzle{state | last_evaluation: now}}

      {last, now} ->
        # at least one second has passed: run everything that became due in between
        evaluate(last, now, state)
        {:noreply, %Drizzle{state | last_evaluation: now}}
    end
  end

  # Internal
  # Runs every job that is due in the seconds after `last` up to and including `now`.
  # Afterwards the timestamp callback is invoked in its own process if at least one job
  # was started or if `now` falls on the @execute_fun_every interval.
  defp evaluate(last, now, %Drizzle{records: jobs, evaluation_time_fun: persist}) do
    # `last` itself has already been evaluated, so the interval starts one second later
    started = execute_for_interval((last + 1)..now, jobs)
    on_interval? = rem(now, @execute_fun_every) == 0

    if Enum.any?(started) or on_interval? do
      spawn(fn -> persist.(now) end)
    end
  end

  # Sends :evaluate to this process after `delay` milliseconds; a fractional delay is
  # cut down to whole milliseconds first.
  defp schedule_evaluation(delay) do
    delay_ms = trunc(delay)
    Process.send_after(self(), :evaluate, delay_ms)
  end

  # Checks every record against every second in `times` and starts the ones whose cron
  # expression matches. Returns one entry per checked pair: the result of execute/1 on a
  # match, nil otherwise. Entries of `records` that are not %Record{} structs are skipped.
  defp execute_for_interval(times, records) do
    for second <- times, %Record{} = job <- records do
      moment = Time.from_seconds(second, job.time_zone)
      if Cron.match?(job.crontab, moment), do: execute(job)
    end
  end

  # Runs the job's function with its arguments in a freshly spawned process
  # and returns that process' pid.
  defp execute(%Record{} = job) do
    %Record{module: mod, function: fun, args: args} = job
    spawn(fn -> apply(mod, fun, args) end)
  end

  # Normalises the configured last-evaluation value to an integer timestamp:
  #   * nil                 -> the second before the current one
  #   * an integer          -> returned unchanged
  #   * zero-arity function -> called, and its result is normalised in turn
  defp last_evaluation_to_time(nil) do
    {current_second, _microseconds} = Time.now()
    current_second - 1
  end

  defp last_evaluation_to_time(timestamp) when is_integer(timestamp), do: timestamp

  defp last_evaluation_to_time(provider) when is_function(provider, 0),
    do: last_evaluation_to_time(provider.())

end