Skip to main content

lib/continuum/runtime/activity_worker.ex

defmodule Continuum.Runtime.ActivityWorker do
  @moduledoc false

  require Logger

  alias Continuum.{Runtime.Engine, Runtime.Journal, Telemetry}

  def execute(task) do
    started_at = System.monotonic_time(:millisecond)

    case renew_task_lease(task) do
      :ok ->
        heartbeat = start_task_lease_heartbeat(task)

        try do
          fenced(task, fn ->
            if task.attempt > max_attempts(task.retry) do
              # Only crash requeues push attempt past the policy: the previous
              # execution died mid-flight and consumed the final attempt. Fail
              # without re-executing rather than re-running a poison task (or a
              # non-retryable side effect) forever.
              fail(task, :attempts_exhausted, started_at)
            else
              emit_started(task)

              case idempotency_hit(task) do
                {:hit, result} ->
                  complete(task, result, started_at, idempotency_hit?: true)

                :miss ->
                  case run_activity(task) do
                    {:ok, result} -> complete(task, result, started_at)
                    {:error, error} -> fail_or_retry(task, error, started_at)
                  end
              end
            end
          end)
        after
          stop_task_lease_heartbeat(heartbeat)
        end

        :ok

      :lost ->
        Logger.warning(
          "Continuum activity task #{task.id} lease no longer held by #{task.lease_owner}; skipping execution"
        )

        :ok
    end
  end

  # The dispatcher claims tasks with a short TTL that only needs to cover the
  # claim-to-execution window. The activity may legally run for its full
  # configured timeout, and every completion/retry write validates task-lease
  # expiry — so the worker heartbeats the task lease on a short horizon while
  # executing. A crashed worker's task therefore expires within one TTL and
  # the steady-state sweep rescues it promptly, instead of waiting out a
  # one-shot timeout + margin extension (minutes for long activities).
  defp renew_task_lease(task) do
    sql = """
    UPDATE continuum_activity_tasks
    SET lease_expires_at = clock_timestamp() + make_interval(secs => $3)
    WHERE id = $1::text::uuid
      AND state = 'leased'
      AND lease_owner = $2
    """

    case task.instance.repo.query(sql, [task.id, task.lease_owner, task_lease_ttl_seconds()]) do
      {:ok, %{num_rows: 1}} ->
        :ok

      {:ok, %{num_rows: 0}} ->
        :lost

      {:error, reason} ->
        raise "Continuum activity task lease renewal failed: #{inspect(reason)}"
    end
  end

  # Public for tests only: the transient-vs-lost renewal classification is
  # exercised with a scripted repo, without a full claim/execute cycle.
  @doc false
  def start_task_lease_heartbeat(task) do
    worker = self()

    # Unlinked on purpose: a renewal hiccup must not kill the executing
    # worker. The monitor stops the heartbeat if the worker dies, so the
    # lease then expires within one TTL and the sweep rescues the task.
    spawn(fn ->
      ref = Process.monitor(worker)
      task_lease_heartbeat_loop(task, ref, task_lease_renew_ms())
    end)
  end

  defp stop_task_lease_heartbeat(pid) do
    send(pid, :stop)
    :ok
  end

  defp task_lease_heartbeat_loop(task, ref, delay_ms) do
    receive do
      :stop -> :ok
      {:DOWN, ^ref, :process, _pid, _reason} -> :ok
    after
      delay_ms ->
        case safe_renew_task_lease(task) do
          :ok -> task_lease_heartbeat_loop(task, ref, task_lease_renew_ms())
          :retry -> task_lease_heartbeat_loop(task, ref, task_lease_retry_ms())
          :lost -> :ok
        end
    end
  end

  defp safe_renew_task_lease(task) do
    renew_task_lease(task)
  rescue
    error ->
      # Only a renewal CAS miss (`:lost`) is authoritative — the lease is gone
      # and renewing again can never succeed. A raised DB error is transient:
      # stopping the heartbeat on one blip would expire a healthy long
      # activity's lease, and the sweep would then consume an attempt for an
      # execution that is still running. Retry on a shorter horizon instead;
      # the TTL leaves several renewal periods of slack to ride it out.
      Logger.warning(
        "Continuum task lease heartbeat for #{task.id} renewal failed (will retry): " <>
          Exception.message(error)
      )

      :retry
  end

  defp task_lease_ttl_seconds do
    Application.get_env(:continuum, :task_lease_ttl_seconds, 30)
  end

  defp task_lease_renew_ms do
    Application.get_env(:continuum, :task_lease_renew_ms, 10_000)
  end

  defp task_lease_retry_ms do
    max(div(task_lease_renew_ms(), 4), 250)
  end

  # Fencing rejections (run lease rotated, task lease expired or taken over,
  # run already terminal) are expected races, not worker bugs. Raising here
  # would strand the task in 'leased' — instead release it so the next claim
  # re-executes under the current authority, or discard it when the run can
  # no longer use the result. The rejection itself stays authoritative: the
  # journal write was rolled back, nothing of this attempt is visible.
  defp fenced(task, fun) do
    fun.()
  rescue
    error in Continuum.Runtime.JournalError ->
      case classify_fenced(error.reason) do
        :requeue -> release_fenced_task(task, error)
        :discard -> discard_fenced_task(task, error)
        :drop -> log_fenced(task, :dropped, error)
        :reraise -> reraise(error, __STACKTRACE__)
      end
  end

  defp classify_fenced(reason) do
    case reason do
      # Task lease still ours, only expired/incomplete: safe to re-execute.
      {:activity_task_lease_expired, _} -> :requeue
      {:activity_task_lease_missing_expiry, _} -> :requeue
      # Run lease rotated under us: the new engine still needs this activity.
      {:lease_mismatch, _} -> :requeue
      # Task no longer ours: another claimer owns it, leave it alone.
      {:activity_task_lease_mismatch, _} -> :drop
      {:activity_task_attempt_mismatch, _} -> :drop
      {:activity_task_not_leased, _} -> :drop
      {:activity_task_not_found, _} -> :drop
      {:activity_task_run_mismatch, _} -> :drop
      {_op, :task_lease_mismatch} -> :drop
      # Run is terminal or gone: the result has nowhere to land.
      {:run_not_found, _} -> :discard
      {:run_not_active, _} -> :discard
      _other -> :reraise
    end
  end

  defp release_fenced_task(task, error) do
    update_own_task(task, "available", "available_at = now(),")
    log_fenced(task, :requeued, error)
  end

  defp discard_fenced_task(task, error) do
    update_own_task(task, "discarded", "")
    log_fenced(task, :discarded, error)
  end

  # CAS on our own claim: if the task was reclaimed, completed, or cancelled
  # in the meantime, this matches zero rows and the row is left untouched.
  defp update_own_task(task, state, extra_set_sql) do
    sql = """
    UPDATE continuum_activity_tasks
    SET state = $3,
        #{extra_set_sql}
        lease_owner = NULL,
        lease_expires_at = NULL
    WHERE id = $1::text::uuid
      AND state = 'leased'
      AND lease_owner = $2
    """

    case task.instance.repo.query(sql, [task.id, task.lease_owner, state]) do
      {:ok, _} -> :ok
      {:error, reason} -> raise "Continuum fenced task release failed: #{inspect(reason)}"
    end
  end

  defp log_fenced(task, action, error) do
    Logger.warning(
      "Continuum activity task #{task.id} (run #{task.run_id}) write was fenced out; " <>
        "#{action}: #{Exception.message(error)}"
    )

    Telemetry.execute([:continuum, :activity, :fenced], %{}, %{
      run_id: task.run_id,
      task_id: task.id,
      attempt: task.attempt,
      executor: executor(task),
      action: action
    })

    :ok
  end

  defp idempotency_hit(task) do
    case idempotency(task) do
      nil ->
        :miss

      [module: module, key: key] ->
        case Journal.Postgres.get_activity_result(task.instance, module, key) do
          {:ok, result} -> {:hit, result}
          :miss -> :miss
        end
    end
  end

  defp run_activity(%{mfa: {mod, fun, args}, timeout_ms: timeout_ms}) do
    parent = self()
    ref = make_ref()

    {pid, monitor_ref} =
      spawn_monitor(fn ->
        result =
          try do
            {:ok, apply(mod, fun, args)}
          rescue
            exception -> {:error, exception}
          catch
            kind, reason -> {:error, {kind, reason}}
          end

        send(parent, {ref, result})
      end)

    receive do
      {^ref, result} ->
        Process.demonitor(monitor_ref, [:flush])
        result

      {:DOWN, ^monitor_ref, :process, ^pid, reason} ->
        {:error, {:exit, reason}}
    after
      timeout_ms ->
        Process.demonitor(monitor_ref, [:flush])
        Process.exit(pid, :kill)
        {:error, :timeout}
    end
  end

  defp complete(task, result, started_at, opts \\ []) do
    idempotency = idempotency(task)
    duration = System.monotonic_time(:millisecond) - started_at
    metadata = base_metadata(task)

    if compensation?(task) do
      :ok =
        Journal.Postgres.complete_compensation_task!(
          task.instance,
          task,
          result,
          task.run_lease_token,
          idempotency_opts(idempotency)
        )

      Engine.wake(task.instance, task.run_id)

      Telemetry.execute(
        [:continuum, :compensation, :completed],
        %{duration_ms: duration},
        metadata
      )
    else
      :ok =
        Journal.Postgres.complete_activity_task!(
          task.instance,
          task,
          result,
          task.run_lease_token,
          idempotency_opts(idempotency)
        )

      Engine.wake(task.instance, task.run_id)

      if Keyword.get(opts, :idempotency_hit?, false) do
        Telemetry.execute([:continuum, :activity, :idempotency_hit], %{}, %{
          run_id: metadata.run_id,
          task_id: metadata.task_id,
          mfa: task.mfa,
          attempt: metadata.attempt,
          executor: metadata.executor,
          idempotency_key: Keyword.fetch!(idempotency, :key)
        })
      end

      Telemetry.execute(
        [:continuum, :activity, :completed],
        %{duration_ms: duration},
        Map.put(metadata, :mfa, task.mfa)
      )
    end
  end

  defp fail_or_retry(task, error, started_at) do
    if task.attempt < max_attempts(task.retry) do
      retry(task, error, started_at)
    else
      fail(task, error, started_at)
    end
  end

  defp retry(task, error, started_at) do
    backoff_ms = backoff_ms(task.retry, task.attempt)

    # The authoritative available_at is computed on the database clock inside
    # retry_activity_task!; this app-clock timestamp is observability metadata.
    retry_at =
      DateTime.utc_now()
      |> DateTime.add(backoff_ms, :millisecond)
      |> DateTime.truncate(:microsecond)

    :ok =
      Journal.Postgres.retry_activity_task!(
        task.instance,
        task,
        error,
        backoff_ms,
        task.run_lease_token
      )

    Telemetry.execute(
      [:continuum, :activity, :retried],
      %{duration_ms: System.monotonic_time(:millisecond) - started_at},
      %{
        run_id: task.run_id,
        task_id: task.id,
        mfa: task.mfa,
        attempt: task.attempt,
        executor: executor(task),
        next_attempt: task.attempt + 1,
        retry_at: retry_at,
        error: error
      }
    )
  end

  defp fail(task, error, started_at) do
    duration = System.monotonic_time(:millisecond) - started_at
    metadata = base_metadata(task)

    if compensation?(task) do
      :ok =
        Journal.Postgres.fail_compensation_task!(task.instance, task, error, task.run_lease_token)

      Engine.wake(task.instance, task.run_id)

      Telemetry.execute(
        [:continuum, :compensation, :failed],
        %{duration_ms: duration},
        Map.put(metadata, :error, error)
      )
    else
      :ok = Journal.Postgres.fail_activity_task!(task.instance, task, error, task.run_lease_token)
      Engine.wake(task.instance, task.run_id)

      Telemetry.execute([:continuum, :activity, :failed], %{duration_ms: duration}, %{
        run_id: metadata.run_id,
        task_id: metadata.task_id,
        mfa: task.mfa,
        attempt: metadata.attempt,
        executor: metadata.executor,
        error: error
      })
    end
  end

  defp compensation?(task), do: Map.get(task, :kind) == :compensation

  defp emit_started(task) do
    metadata = base_metadata(task)

    if compensation?(task) do
      Telemetry.execute([:continuum, :compensation, :started], %{}, metadata)
    else
      Telemetry.execute([:continuum, :activity, :started], %{}, %{
        run_id: metadata.run_id,
        task_id: metadata.task_id,
        mfa: task.mfa,
        attempt: metadata.attempt,
        executor: metadata.executor
      })
    end
  end

  defp base_metadata(task) do
    metadata =
      %{
        run_id: task.run_id,
        task_id: task.id,
        attempt: task.attempt,
        executor: executor(task)
      }
      |> maybe_put(:oban_job_id, Map.get(task, :oban_job_id))

    if compensation?(task) do
      Map.put(metadata, :target_activity_id, task.target_activity_id)
    else
      metadata
    end
  end

  defp executor(task), do: Map.get(task, :executor, :builtin)

  defp maybe_put(map, _key, nil), do: map
  defp maybe_put(map, key, value), do: Map.put(map, key, value)

  defp max_attempts(retry) do
    Keyword.get(retry || [], :max_attempts, 1)
  end

  defp idempotency(%{idempotency_key: nil}), do: nil

  defp idempotency(%{idempotency_key: key, mfa: {module, _fun, _args}}) do
    [module: module, key: key]
  end

  defp idempotency(_task), do: nil

  defp idempotency_opts(nil), do: []
  defp idempotency_opts(idempotency), do: [idempotency: idempotency]

  defp backoff_ms(retry, attempt) do
    retry = retry || []
    base_ms = Keyword.get(retry, :base_ms, 1_000)

    case Keyword.get(retry, :backoff, :constant) do
      :exponential -> trunc(base_ms * :math.pow(2, max(attempt - 1, 0)))
      _ -> base_ms
    end
  end
end