Skip to main content

lib/squidie/runtime/journal/dispatch_scheduler.ex

defmodule Squidie.Runtime.Journal.DispatchScheduler do
  @moduledoc false

  alias Squidie.Runtime.DispatchAgent
  alias Squidie.Runtime.WorkflowAgent

  @doc """
  Schedules pending workflow dispatches with optimistic-conflict retry.
  """
  @spec schedule_pending_dispatches(term(), term(), term(), DateTime.t(), non_neg_integer()) ::
          {:ok, map()} | {:error, term()}
  def schedule_pending_dispatches(
        storage,
        workflow_agent,
        dispatch_agent,
        %DateTime{} = now,
        retries
      ) do
    do_schedule_pending_dispatches(storage, workflow_agent, dispatch_agent, now, retries)
  end

  defp do_schedule_pending_dispatches(
         _storage,
         workflow_agent,
         dispatch_agent,
         _now,
         _retries_left
       )
       when workflow_agent.state.projection.terminal_status in [:completed, :failed, :cancelled] do
    {:ok, %{agent: dispatch_agent, runnables: []}}
  end

  defp do_schedule_pending_dispatches(_storage, _workflow_agent, _dispatch_agent, _now, 0),
    do: {:error, :conflict}

  defp do_schedule_pending_dispatches(storage, workflow_agent, dispatch_agent, now, retries_left) do
    case WorkflowAgent.schedule_pending_dispatches(storage, workflow_agent, dispatch_agent,
           now: now
         ) do
      {:ok, _schedule_update} = ok ->
        ok

      {:error, :conflict} ->
        with {:ok, workflow_agent} <- WorkflowAgent.rebuild(storage, workflow_agent.state.run_id),
             {:ok, dispatch_agent} <- DispatchAgent.rebuild(storage, dispatch_agent.state.queue) do
          do_schedule_pending_dispatches(
            storage,
            workflow_agent,
            dispatch_agent,
            now,
            retries_left - 1
          )
        end

      {:error, _reason} = error ->
        error
    end
  end
end