lib/pgflow/migration.ex

defmodule PgFlow.Migration do
  @moduledoc """
  Installs the pgflow core schema (tables, triggers, functions) into a
  consumer app's database via EctoEvolver.

  ## Usage (consumer side)

  A consumer uses `mix pgflow.setup` to generate a wrapper migration:

      defmodule MyApp.Repo.Migrations.SetupPgflow do
        use Ecto.Migration

        def up do
          PgFlow.Migration.up()
          PgFlow.HelpersMigration.up()
        end

        def down do
          PgFlow.HelpersMigration.down()
          PgFlow.Migration.down()
        end
      end

  ## Prerequisites

  This migration assumes the following are already in place in the target DB:

    * `pgmq` schema and its functions (install via `mix pgflow.gen.pgmq_migration`)
    * `pg_cron` extension registered (`CREATE EXTENSION pg_cron`)
    * Standard Postgres extensions: `citext`, `pg_trgm`, `pgcrypto`

  Ordering: the prerequisites above must be applied first (separate
  migrations). The typical project-level order is
  `postgres_extensions → pgmq → pgflow.setup`, where `setup` bundles a
  wrapper migration that runs `PgFlow.Migration.up/0` then
  `PgFlow.HelpersMigration.up/0` in one transaction.

  ## Schema prefix

  The SQL bundle hardcodes `"pgflow"` as the schema name (matching upstream
  conventions). `default_prefix: "pgflow"` is fixed; runtime prefix override
  via `up(prefix: ...)` is not supported by this version because it would
  require rewriting every qualified reference in the vendored SQL. If the
  library later needs multi-tenant schema prefixes, regenerate the bundle
  with `$SCHEMA$` placeholders.

  ## Versioning

  New upstream pgflow releases become `V02`, `V03`, ... — each a delta from
  the previous version. V01 is never rewritten; existing deployments apply
  only new versions on upgrade.

  ## Legacy install detection

  `up/1` refuses to run V01 DDL against a schema that already contains
  pgflow tables but is missing EctoEvolver's tracking comment — a restored
  dump that lost comments, a manual `psql` seed, or any schema not installed
  by `PgFlow.Migration.up/0` itself. The raised error points the operator at
  `mix pgflow.stamp`, which adopts the existing schema into the tracking
  model without re-running the bundle.
  """

  @default_prefix "pgflow"

  # Core tables installed by V01. If any exist in a prefix with no tracking
  # comment, the schema was installed by something other than
  # `PgFlow.Migration.up/1`.
  @sentinel_tables ~w(flows runs steps step_states step_tasks workers)

  import Ecto.Migration, only: [repo: 0]

  # EctoEvolver's __using__ generates up/1, down/1, current_version/0, and
  # migrated_version/1. Isolating those in a nested module lets the public
  # up/1 run a preflight check before delegating.
  defmodule Evolver do
    @moduledoc false

    # EctoEvolver's `use` macro generates clauses for :table, :view, and
    # :materialized_view tracking objects. Only :view is exercised here.
    @dialyzer :no_match

    use EctoEvolver,
      otp_app: :pgflow,
      default_prefix: "pgflow",
      versions: [PgFlow.Migrations.Core.V01],
      tracking_object: {:view, "pgflow_version"}
  end

  @doc """
  Installs or upgrades the pgflow schema to the latest version.

  Options:

    * `:prefix` — schema prefix (default `"pgflow"`).
    * `:version` — target version (default: latest).

  Raises if the target schema already contains pgflow tables without an
  EctoEvolver tracking comment — see "Legacy install detection" above.
  """
  @spec up(keyword()) :: :ok
  def up(opts \\ []) do
    prefix = Keyword.get(opts, :prefix, @default_prefix)
    :ok = preflight!(prefix)
    Evolver.up(opts)
  end

  defdelegate down(opts \\ []), to: Evolver
  defdelegate current_version(), to: Evolver
  defdelegate migrated_version(opts \\ []), to: Evolver

  defp preflight!(prefix) do
    cond do
      Evolver.migrated_version(prefix: prefix) > 0 -> :ok
      legacy_install?(prefix) -> raise legacy_install_message(prefix)
      true -> :ok
    end
  end

  # Queries pg_class directly (same source EctoEvolver's adapter uses) so the
  # preflight and tracking-version checks see identical catalog state.
  # Fails closed: an unexpected DB error raises rather than silently
  # returning `false`, which would let V01 run against an uninspectable
  # schema and produce the CREATE TABLE conflicts the preflight exists to
  # prevent.
  defp legacy_install?(prefix) do
    query = """
    SELECT COUNT(*)::int
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = $1 AND c.relname = ANY($2::text[]) AND c.relkind = 'r'
    """

    case repo().query(query, [prefix, @sentinel_tables]) do
      {:ok, %{rows: [[count]]}} ->
        count > 0

      {:error, reason} ->
        raise preflight_query_error(prefix, reason)

      other ->
        raise "pgflow preflight: unexpected query result shape: #{inspect(other)}"
    end
  end

  defp preflight_query_error(prefix, reason) do
    detail = if is_exception(reason), do: Exception.message(reason), else: inspect(reason)

    """
    pgflow: preflight query failed in schema "#{prefix}"

    #{detail}

    Refusing to proceed — running V01 against a schema pgflow can't inspect
    risks CREATE TABLE conflicts against existing objects. Resolve the DB
    issue and retry, or inspect the target schema manually.
    """
  end

  defp legacy_install_message(prefix) do
    """
    pgflow: legacy install detected in schema "#{prefix}"

    Tables from an older pgflow install are present, but EctoEvolver's
    tracking comment on "#{prefix}".pgflow_version is missing. Running the
    V01 bundle now would conflict with the existing schema.

    If the installed schema matches PgFlow.Migration V01, adopt it with:

        mix pgflow.stamp --prefix #{prefix}

    then re-run `mix ecto.migrate`. If the installed schema has drifted
    from V01, do NOT stamp — reconcile the drift manually or rebuild from
    a clean schema.
    """
  end
end