defmodule Baton.Reschedule do
@moduledoc """
Completion-triggered rescheduling of downstream workflow jobs.
When a step completes, rather than waiting for snoozing downstream jobs to
wake on their timer, this promotes any downstream job whose dependencies are
now all satisfied so Oban picks it up on its next poll (typically within a
second).
Dependency information now comes from `workflow_nodes` (native text-array
membership), so the old jsonb-containment query is gone.
## Multi-dependency correctness
A downstream job with `deps: [:a, :b]` is only promoted once *both* are
complete. Promoting on the first completion would just make it wake, fail its
dep check, and snooze again.
## Note on Oban's state machine
The final promotion does `update_all(state: "available")` on `oban_jobs`.
Oban exposes no public "promote scheduled -> available" call, so this reaches
into Oban's state column directly. It is the one place the engine depends on
Oban's internal state names; if that ever breaks on an Oban upgrade, removing
this module degrades gracefully to the snooze-timer fallback with no loss of
correctness — only added latency.
"""
import Ecto.Query
require Logger
alias Baton.{Config, Nodes}
@doc """
Promote downstream jobs whose deps are now all complete.
Returns `{:ok, count}` with the number of jobs promoted.
"""
@spec reschedule_downstream(String.t(), String.t()) :: {:ok, non_neg_integer()}
def reschedule_downstream(workflow_id, completed_step_name) do
case Nodes.waiting_dependents(workflow_id, completed_step_name) do
[] ->
{:ok, 0}
candidates ->
completed =
workflow_id
|> Nodes.completed_step_names()
|> MapSet.new()
# Include the just-completed step: its job may not be written as
# `completed` yet at this instant (the timing window).
|> MapSet.put(completed_step_name)
unblocked_ids =
candidates
|> Enum.filter(fn %{deps: deps} ->
deps
|> List.delete(completed_step_name)
|> Enum.all?(&MapSet.member?(completed, &1))
end)
|> Enum.map(& &1.id)
promote(unblocked_ids, completed_step_name)
end
end
# ── Private ───────────────────────────────────────────────────────────────
defp promote([], _completed_step), do: {:ok, 0}
defp promote(ids, completed_step) do
now = DateTime.utc_now()
{count, _} =
from(o in Oban.Job, where: o.id in ^ids and o.state == "scheduled")
|> Config.repo().update_all(set: [state: "available", scheduled_at: now])
if count > 0 do
Logger.debug(
"[Baton.Reschedule] Promoted #{count} job(s) after '#{completed_step}' completed"
)
end
{:ok, count}
end
end