Skip to main content

lib/mix/tasks/continuum.audit.ex

defmodule Mix.Tasks.Continuum.Audit do
  @moduledoc """
  Audits loaded Continuum workflows and durable patch markers.

      mix continuum.audit --repo MyApp.Repo
      mix continuum.audit --repo MyApp.Repo --format json
      mix continuum.audit --repo MyApp.Repo --strict
  """
  use Mix.Task

  import Ecto.Query

  alias Continuum.Schema.{ActivityTask, Event, Run}

  @shortdoc "Audits determinism metadata and stale patch markers"
  @non_terminal_states ~w(running suspended stuck_unknown_version)

  @impl true
  def run(args) do
    {opts, _, _} =
      OptionParser.parse(args, switches: [repo: :string, format: :string, strict: :boolean])

    Mix.Task.run("app.start")

    repo = parse_repo(opts)
    report = build_report(repo)

    case Keyword.get(opts, :format, "text") do
      "json" -> Mix.shell().info(Jason.encode!(report))
      _ -> print_text(report)
    end

    if Keyword.get(opts, :strict, false) and strict_failure?(report) do
      System.halt(1)
    end
  end

  defp build_report(repo) do
    workflows =
      Continuum.VersionRegistry.entries()
      |> Enum.sort_by(&{&1.workflow_string, Base.encode16(&1.version_hash)})
      |> Enum.map(&audit_workflow(repo, &1))

    %{
      workflows: workflows,
      stuck_unknown_version_runs: stuck_unknown_version_count(repo),
      expired_leased_activity_tasks: expired_leased_task_count(repo)
    }
  end

  defp audit_workflow(repo, entry) do
    metadata = entry.entrypoint.__continuum_workflow__()
    patch_sites = Map.get(metadata, :patch_sites, [])

    %{
      workflow: entry.workflow_string,
      version_hash: Base.encode16(entry.version_hash, case: :lower),
      patch_sites: Enum.map(patch_sites, &audit_patch_site(repo, entry, &1))
    }
  end

  defp audit_patch_site(repo, entry, site) do
    runs =
      repo.all(
        from(r in Run,
          where:
            r.workflow == ^entry.workflow_string and
              r.version_hash == ^entry.version_hash and
              r.state in ^@non_terminal_states,
          select: r.id
        )
      )

    patched_events = patched_events(repo, runs)

    pre_patch_count =
      Enum.count(runs, fn run_id ->
        not Enum.any?(Map.get(patched_events, run_id, []), &matches_site?(&1, site.command_id))
      end)

    first_seen_at =
      patched_events
      |> Map.values()
      |> List.flatten()
      |> Enum.filter(&matches_site?(&1, site.command_id))
      |> Enum.map(& &1.inserted_at)
      |> Enum.min(DateTime, fn -> nil end)

    %{
      name: inspect(site.name),
      file: site.file,
      line: site.line,
      verdict: if(pre_patch_count == 0, do: "safe-to-remove", else: "still-in-use"),
      in_flight_pre_patch: pre_patch_count,
      first_seen_at: first_seen_at
    }
  end

  defp patched_events(_repo, []), do: %{}

  defp patched_events(repo, run_ids) do
    repo.all(
      from(e in Event,
        where: e.run_id in ^run_ids and e.event_type == "patched",
        order_by: [asc: e.inserted_at]
      )
    )
    |> Enum.map(fn event ->
      payload = :erlang.binary_to_term(event.payload)
      %{run_id: event.run_id, inserted_at: event.inserted_at, command_id: payload.command_id}
    end)
    |> Enum.group_by(& &1.run_id)
  end

  defp matches_site?(%{command_id: command_id}, base) when is_tuple(command_id) do
    tuple_size(command_id) == tuple_size(base) + 1 and
      Tuple.delete_at(command_id, tuple_size(command_id) - 1) == base
  end

  defp matches_site?(_event, _base), do: false

  defp stuck_unknown_version_count(repo) do
    repo.one(from(r in Run, where: r.state == "stuck_unknown_version", select: count(r.id)))
  end

  # Tasks still 'leased' past their lease expiry are stranded between worker
  # death and the steady-state sweep — a persistent non-zero count means the
  # sweep is not running or workers are dying faster than it rescues.
  defp expired_leased_task_count(repo) do
    repo.one(
      from(t in ActivityTask,
        where: t.state == "leased" and t.lease_expires_at < fragment("clock_timestamp()"),
        select: count(t.id)
      )
    )
  end

  defp strict_failure?(%{workflows: workflows, stuck_unknown_version_runs: stuck}) do
    stuck > 0 or
      Enum.any?(workflows, fn workflow ->
        Enum.any?(workflow.patch_sites, &(&1.verdict == "safe-to-remove"))
      end)
  end

  defp print_text(report) do
    Mix.shell().info("Continuum audit")
    Mix.shell().info("stuck_unknown_version_runs: #{report.stuck_unknown_version_runs}")

    Mix.shell().info("expired_leased_activity_tasks: #{report.expired_leased_activity_tasks}")

    Enum.each(report.workflows, fn workflow ->
      Mix.shell().info("#{workflow.workflow} #{workflow.version_hash}")

      Enum.each(workflow.patch_sites, fn site ->
        Mix.shell().info(
          "  patch #{site.name}: #{site.verdict}, pre_patch=#{site.in_flight_pre_patch}, first_seen=#{format_seen(site.first_seen_at)}"
        )
      end)
    end)
  end

  defp format_seen(nil), do: "-"
  defp format_seen(%DateTime{} = datetime), do: DateTime.to_iso8601(datetime)

  defp parse_repo(opts) do
    case opts[:repo] do
      nil ->
        Application.get_env(:continuum, :repo) ||
          Mix.raise("no repo configured. Pass --repo MyApp.Repo or set :continuum, :repo")

      repo ->
        Module.concat([repo])
    end
  end
end