# lib/mix/tasks/continuum.gc_versions.ex

defmodule Mix.Tasks.Continuum.GcVersions do
  @moduledoc """
  Lists or deletes unreferenced workflow-version registry rows.

      mix continuum.gc_versions --repo MyApp.Repo
      mix continuum.gc_versions --repo MyApp.Repo --execute

  The task is a dry run by default. A workflow-version row is deletable only
  when no non-terminal run references it and it is not one of the loaded
  versions registered in the current BEAM.
  """
  use Mix.Task

  import Ecto.Query

  alias Continuum.Schema.{Run, WorkflowVersion}

  @shortdoc "Prunes unreferenced continuum_workflow_versions rows"
  @non_terminal_states ~w(running suspended stuck_unknown_version)

  # Entry point for `mix continuum.gc_versions`.
  #
  # Switches:
  #   * `--repo MyApp.Repo` - repo module to query; when omitted, the
  #     `:continuum, :repo` application env is used.
  #   * `--execute` - delete the candidate rows. Without it the task only
  #     reports what would be deleted.
  #
  # Raises `OptionParser.ParseError` for unknown or malformed switches, and a
  # Mix error (via `parse_repo/1`) when no repo can be resolved.
  @impl true
  def run(args) do
    # Parse strictly, and before starting the app: this task can delete rows, so
    # a mistyped switch or a `--repo` given without a value must abort instead of
    # being dropped silently and falling back to the configured repo.
    {opts, _argv} = OptionParser.parse!(args, strict: [repo: :string, execute: :boolean])
    Mix.Task.run("app.start")

    repo = parse_repo(opts)
    dry_run? = not Keyword.get(opts, :execute, false)
    loaded = loaded_hashes_by_workflow()
    pinned = pinned_hashes_by_workflow(repo)

    versions_query =
      from(v in WorkflowVersion, order_by: [asc: v.workflow, asc: v.registered_at])

    candidates =
      versions_query
      |> repo.all()
      |> Enum.group_by(& &1.workflow)
      # `Enum.group_by/2` returns a map, whose iteration order is not guaranteed
      # to follow the query's ORDER BY; sort the groups so the report is always
      # emitted in workflow order. Rows inside each group keep the query order.
      |> Enum.sort_by(fn {workflow, _versions} -> workflow end)
      |> Enum.flat_map(fn {workflow, versions} ->
        classify_workflow(workflow, versions, loaded, pinned, dry_run?)
      end)

    if dry_run? do
      Mix.shell().info("Dry run: #{length(candidates)} workflow_versions rows would be deleted")
    else
      deleted = delete_candidates(repo, candidates)
      Mix.shell().info("Deleted #{deleted} workflow_versions rows")
    end
  end

  # Picks the deletable rows for a single workflow and reports them on the Mix shell.
  #
  # `loaded` and `pinned` map a workflow to a `MapSet` of version hashes; a missing
  # key is treated as an empty set. When the workflow has no loaded hash at all,
  # every row is kept and `[]` is returned. Otherwise the rows whose hash is in
  # neither set are returned, after one summary line and one line per row have
  # been printed. `dry_run?` only affects the wording of the per-row lines.
  defp classify_workflow(workflow, versions, loaded, pinned, dry_run?) do
    in_memory = Map.get(loaded, workflow, MapSet.new())
    in_use = Map.get(pinned, workflow, MapSet.new())

    case MapSet.size(in_memory) do
      0 ->
        Mix.shell().info(
          "#{workflow}: skipped #{length(versions)} versions because no loaded version is registered"
        )

        []

      loaded_count ->
        deletable =
          Enum.filter(versions, fn row ->
            not MapSet.member?(in_memory, row.version_hash) and
              not MapSet.member?(in_use, row.version_hash)
          end)

        summary =
          "#{workflow}: #{length(versions)} versions, #{loaded_count} loaded, " <>
            "#{MapSet.size(in_use)} pinned by non-terminal runs, " <>
            "#{length(deletable)} candidates"

        Mix.shell().info(summary)

        # The verb is the same for every row, so it is chosen once up front.
        verb = if dry_run?, do: "Would delete", else: "Delete"

        for row <- deletable do
          Mix.shell().info(
            "#{verb} #{row.workflow} #{format_hash(row.version_hash)} -> #{row.entrypoint}"
          )
        end

        deletable
    end
  end

  # Builds a map of workflow string => `MapSet` of version hashes from the
  # entries returned by `Continuum.VersionRegistry.entries/0`.
  defp loaded_hashes_by_workflow do
    for entry <- Continuum.VersionRegistry.entries(), reduce: %{} do
      by_workflow ->
        workflow = entry.workflow_string
        hash = entry.version_hash

        Map.update(by_workflow, workflow, MapSet.new([hash]), fn hashes ->
          MapSet.put(hashes, hash)
        end)
    end
  end

  # Builds a map of workflow => `MapSet` of version hashes referenced by runs
  # whose state is one of `@non_terminal_states`. The distinct
  # `{workflow, version_hash}` pairs are read through `repo`.
  defp pinned_hashes_by_workflow(repo) do
    active_pairs =
      from(r in Run,
        where: r.state in ^@non_terminal_states,
        select: {r.workflow, r.version_hash},
        distinct: true
      )

    active_pairs
    |> repo.all()
    |> Enum.reduce(%{}, fn {workflow, hash}, by_workflow ->
      Map.update(by_workflow, workflow, MapSet.new([hash]), fn hashes ->
        MapSet.put(hashes, hash)
      end)
    end)
  end

  # Deletes the registry rows matching each candidate's workflow and version
  # hash, one `delete_all` per candidate in list order, and returns the total
  # number of rows the repo reported as deleted.
  defp delete_candidates(repo, candidates) do
    removed_counts =
      Enum.map(candidates, fn row ->
        stale_rows =
          from(v in WorkflowVersion,
            where: v.workflow == ^row.workflow and v.version_hash == ^row.version_hash
          )

        {removed, _returned} = repo.delete_all(stale_rows)
        removed
      end)

    Enum.sum(removed_counts)
  end

  # Resolves the repo module to operate on.
  #
  # A `:repo` entry in `opts` wins and is turned into a module alias with
  # `Module.concat/1`. Otherwise the `:continuum, :repo` application env is
  # used, and the task aborts with a Mix error when that is unset too.
  defp parse_repo(opts) do
    with nil <- opts[:repo] do
      configured = Application.get_env(:continuum, :repo)
      configured || Mix.raise("no repo configured. Pass --repo MyApp.Repo or set :continuum, :repo")
    else
      name -> Module.concat([name])
    end
  end

  # Renders a version hash for shell output.
  #
  # Binaries of at most 80 bytes that are printable are shown verbatim; any
  # other binary is shown as lowercase hex. Non-binary terms go through
  # `inspect/1`.
  defp format_hash(hash) do
    cond do
      not is_binary(hash) -> inspect(hash)
      byte_size(hash) <= 80 and String.printable?(hash) -> hash
      true -> Base.encode16(hash, case: :lower)
    end
  end
end