Skip to main content

lib/baton/reschedule.ex

defmodule Baton.Reschedule do
  @moduledoc """
  Completion-triggered rescheduling of downstream workflow jobs.

  When a step completes, rather than waiting for snoozing downstream jobs to
  wake on their timer, this promotes any downstream job whose dependencies are
  now all satisfied so Oban picks it up on its next poll (typically within a
  second).

  Dependency information now comes from `workflow_nodes` (native text-array
  membership), so the old jsonb-containment query is gone.

  ## Multi-dependency correctness

  A downstream job with `deps: [:a, :b]` is only promoted once *both* are
  complete. Promoting on the first completion would just make it wake, fail its
  dep check, and snooze again.

  ## Note on Oban's state machine

  The final promotion does `update_all(state: "available")` on `oban_jobs`.
  Oban exposes no public "promote scheduled -> available" call, so this reaches
  into Oban's state column directly. It is the one place the engine depends on
  Oban's internal state names; if that ever breaks on an Oban upgrade, removing
  this module degrades gracefully to the snooze-timer fallback with no loss of
  correctness — only added latency.
  """

  import Ecto.Query
  require Logger

  alias Baton.{Config, Nodes}

  @doc """
  Promote downstream jobs whose deps are now all complete.

  Returns `{:ok, count}` with the number of jobs promoted.
  """
  @spec reschedule_downstream(String.t(), String.t()) :: {:ok, non_neg_integer()}
  def reschedule_downstream(workflow_id, completed_step_name) do
    case Nodes.waiting_dependents(workflow_id, completed_step_name) do
      [] ->
        {:ok, 0}

      candidates ->
        completed =
          workflow_id
          |> Nodes.completed_step_names()
          |> MapSet.new()
          # Include the just-completed step: its job may not be written as
          # `completed` yet at this instant (the timing window).
          |> MapSet.put(completed_step_name)

        unblocked_ids =
          candidates
          |> Enum.filter(fn %{deps: deps} ->
            deps
            |> List.delete(completed_step_name)
            |> Enum.all?(&MapSet.member?(completed, &1))
          end)
          |> Enum.map(& &1.id)

        promote(unblocked_ids, completed_step_name)
    end
  end

  # ── Private ───────────────────────────────────────────────────────────────

  defp promote([], _completed_step), do: {:ok, 0}

  defp promote(ids, completed_step) do
    now = DateTime.utc_now()

    {count, _} =
      from(o in Oban.Job, where: o.id in ^ids and o.state == "scheduled")
      |> Config.repo().update_all(set: [state: "available", scheduled_at: now])

    if count > 0 do
      Logger.debug(
        "[Baton.Reschedule] Promoted #{count} job(s) after '#{completed_step}' completed"
      )
    end

    {:ok, count}
  end
end