# lib/mix/tasks/pgflow.gen.pgmq_migration.ex

defmodule Mix.Tasks.Pgflow.Gen.PgmqMigration do
  @shortdoc "Downloads pgmq SQL from upstream and generates an Ecto migration"

  @moduledoc """
  Downloads the pgmq SQL-only install bundle from the official pgmq/pgmq
  repository at task-runtime, persists it into the consuming app's `priv/`
  tree, and generates an Ecto migration that applies it.

  pgflow requires pgmq 1.8+ for `LISTEN/NOTIFY` dispatch. This task is the
  recommended way to install pgmq for apps using pgflow on hosts where pgmq
  is not available as a native extension (Neon, Supabase, self-hosted
  without build tools).

  ## Usage

      mix pgflow.gen.pgmq_migration
      mix pgflow.gen.pgmq_migration --version 1.11.0
      mix pgflow.gen.pgmq_migration --url https://... --sha256 <hash>
      mix pgflow.gen.pgmq_migration --migrations-path priv/repo/migrations

  ## Options

    * `--version` - pgmq version tag to fetch. Default: `1.11.0`.
      Must be a key in the internal known-versions map, OR paired with
      `--url` and `--sha256` for custom versions.

    * `--url` - Override the upstream URL. Requires `--sha256`.

    * `--sha256` - Expected sha256 of the downloaded bytes. Required when
      `--url` is used; otherwise looked up from the known-versions map.

    * `--migrations-path` - Where to write the generated migration.
      Default: `priv/repo/migrations`.

    * `--skip-sha256` - Skip integrity check. Emits a loud warning. Not
      recommended outside of local experimentation.

  ## Files written

    * `priv/pgflow/pgmq/pgmq-<version>.sql` — the bundled SQL, committed to
      your repo so `mix ecto.migrate` never needs network access.

    * `priv/repo/migrations/<timestamp>_install_pgmq.exs` — the Ecto
      migration wrapper.

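  ## Generated migration shape

  The pgmq SQL contains many top-level statements, so the migration splits
  it into single statements and executes them one by one:

      defmodule MyApp.Repo.Migrations.InstallPgmq do
        use Ecto.Migration

        @disable_ddl_transaction true
        @disable_migration_lock true

        @otp_app :my_app
        @sql_relpath "pgflow/pgmq/pgmq-1.11.0.sql"

        def up do
          sql_path = Path.join(:code.priv_dir(@otp_app), @sql_relpath)

          sql_path
          |> File.read!()
          |> PgFlow.Sql.Splitter.split()
          |> Enum.each(&execute/1)
        end

        def down do
          execute("DROP SCHEMA IF EXISTS pgmq CASCADE")
        end
      end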

  ## Known versions

  #{inspect(%{"1.11.0" => "89f9d8a3adc43434afcf814c4afc3ef8957a20418eb120801922435238714e7a"}, pretty: true)}

  ## Why fetch at task-runtime

  Bundling the ~2000-line pgmq SQL in pgflow's `priv/` would bloat the dep
  and couple pgflow releases to pgmq releases. Fetching once at generation
  time and persisting to the consuming repo keeps pgflow slim and makes
  the SQL reviewable in the consumer's git history. `mix ecto.migrate`
  never needs network; it reads the persisted file.
  """

  use Mix.Task

  # Version fetched when `--version` is not given.
  @default_version "1.11.0"

  # Upstream location of the SQL-only bundle. The `~s` placeholder is filled
  # with the requested version via `:io_lib.format/2`.
  @upstream_url_template "https://raw.githubusercontent.com/pgmq/pgmq/v~s/pgmq-extension/sql/pgmq.sql"

  # Pinned integrity hashes: pgmq version => lowercase hex sha256 of the
  # downloaded bytes. Used when neither `--url` nor `--sha256` is passed.
  @known_versions %{
    "1.11.0" => "89f9d8a3adc43434afcf814c4afc3ef8957a20418eb120801922435238714e7a"
  }

  # Entry point for `mix pgflow.gen.pgmq_migration`.
  #
  # Parses the CLI switches, resolves the download source, fetches the pgmq
  # SQL, verifies its sha256 (unless `--skip-sha256` is given), writes it to
  # `priv/pgflow/pgmq/pgmq-<version>.sql`, and generates the Ecto migration
  # wrapper under `--migrations-path`.
  #
  # Raises `OptionParser.ParseError` for unknown or malformed switches, and
  # `Mix.Error` when the source cannot be resolved, the app name is unknown,
  # an `install_pgmq` migration already exists, the download fails, or the
  # hash does not match.
  @impl Mix.Task
  def run(args) do
    {:ok, _} = Application.ensure_all_started(:inets)
    {:ok, _} = Application.ensure_all_started(:ssl)

    # Strict parsing: a mistyped switch (e.g. `--sha265`) must fail loudly
    # instead of being silently dropped and falling back to the defaults.
    {opts, _argv} =
      OptionParser.parse!(args,
        strict: [
          version: :string,
          url: :string,
          sha256: :string,
          migrations_path: :string,
          skip_sha256: :boolean
        ]
      )

    version = Keyword.get(opts, :version, @default_version)
    url_override = Keyword.get(opts, :url)
    sha_override = Keyword.get(opts, :sha256)
    skip_sha? = Keyword.get(opts, :skip_sha256, false)
    migrations_path = Keyword.get(opts, :migrations_path, "priv/repo/migrations")

    {url, expected_sha} =
      resolve_source(version, url_override, sha_override, skip_sha?)

    app =
      Mix.Project.config()[:app] ||
        Mix.raise("Could not determine consuming app name from Mix.Project")

    # Checked before any network or filesystem work so a refused run leaves
    # nothing behind.
    ensure_no_existing_migration!(migrations_path)

    # `sql_relpath` is relative to the app's priv dir (what the migration
    # reads); `sql_path` is where it lives in the project tree.
    sql_relpath = "pgflow/pgmq/pgmq-#{version}.sql"
    sql_path = Path.join("priv", sql_relpath)

    Mix.shell().info("Fetching pgmq #{version} from #{url}")
    bytes = fetch!(url)
    # Verification happens before anything is written to disk.
    verify_integrity!(bytes, expected_sha, skip_sha?)

    sql_path |> Path.dirname() |> File.mkdir_p!()
    File.write!(sql_path, bytes)
    Mix.shell().info("  Wrote #{sql_path} (#{byte_size(bytes)} bytes)")

    File.mkdir_p!(migrations_path)
    migration_path = Path.join(migrations_path, "#{generate_timestamp()}_install_pgmq.exs")
    File.write!(migration_path, render_migration(app, sql_relpath))

    Mix.shell().info("""

    Generated migration: #{migration_path}

    Run:
        mix ecto.migrate

    This will apply pgmq #{version} to your database via the bundled SQL.
    """)
  end

  # Refuses to generate a second `*_install_pgmq.exs`. Ecto will not run
  # migrations whose names are duplicated, so a repeated run of this task
  # would otherwise break `mix ecto.migrate`.
  defp ensure_no_existing_migration!(migrations_path) do
    case migrations_path |> Path.join("*_install_pgmq.exs") |> Path.wildcard() do
      [] ->
        :ok

      [existing | _] ->
        Mix.raise("""
        A pgmq install migration already exists: #{existing}

        Ecto refuses to run migrations whose names are duplicated, so a second
        `install_pgmq` migration would break `mix ecto.migrate`. Delete the
        existing file first if you want to regenerate it.
        """)
    end
  end

  # Resolves where to download from and which sha256 to verify against.
  #
  # Arguments: `version`, the `--url` override (or nil), the `--sha256`
  # override (or nil), and the `--skip-sha256` flag. Returns
  # `{url, expected_sha}`; `expected_sha` is nil only when the check is skipped.
  #
  #   * no overrides            - upstream URL + pinned hash; unknown versions raise
  #   * no overrides, skipping  - upstream URL for `version`, no hash
  #   * `--sha256` only         - upstream URL for `version`, caller-supplied hash
  #   * `--url` and `--sha256`  - both taken verbatim
  #   * `--url` only            - raises unless the check is skipped
  defp resolve_source(version, nil, nil, false = _skip?) do
    case Map.fetch(@known_versions, version) do
      {:ok, sha} ->
        {upstream_url(version), sha}

      :error ->
        Mix.raise("""
        Unknown pgmq version: #{version}.

        Either pass `--version` matching a known version (#{@known_versions |> Map.keys() |> Enum.join(", ")}),
        or override the source with `--url <url> --sha256 <hex>`.
        """)
    end
  end

  defp resolve_source(version, nil, nil, true = _skip?) do
    {upstream_url(version), nil}
  end

  # `--sha256` without `--url`: previously no clause matched and the task
  # crashed with a FunctionClauseError. The caller-supplied hash is paired with
  # the upstream URL, so the download is still integrity-checked.
  defp resolve_source(version, nil, sha, _skip?) when is_binary(sha) do
    {upstream_url(version), sha}
  end

  defp resolve_source(_version, url, sha, _skip?) when is_binary(url) and is_binary(sha) do
    {url, sha}
  end

  defp resolve_source(_version, url, _sha, skip?) when is_binary(url) and not skip? do
    Mix.raise("--url requires --sha256 (or --skip-sha256 to bypass, not recommended)")
  end

  defp resolve_source(_version, url, _sha, true = _skip?) when is_binary(url) do
    {url, nil}
  end

  # Builds the upstream URL for `version` from the URL template.
  defp upstream_url(version) do
    @upstream_url_template
    |> :io_lib.format([version])
    |> IO.iodata_to_binary()
  end

  # Downloads `url` with `:httpc` and returns the response body as a binary.
  #
  # Raises `Mix.Error` on any non-200 status or on a transport error
  # (including a timeout).
  defp fetch!(url) do
    headers = [{~c"User-Agent", ~c"pgflow-elixir"}]
    request = {String.to_charlist(url), headers}

    # `:httpc` waits indefinitely by default; bound both the connect phase and
    # the whole request so a stalled connection cannot hang the task.
    http_opts = [timeout: 60_000, connect_timeout: 15_000]

    case :httpc.request(:get, request, http_opts, body_format: :binary) do
      {:ok, {{_, 200, _}, _, body}} when is_binary(body) ->
        body

      {:ok, {{_, status, _}, _, body}} ->
        Mix.raise("Failed to fetch #{url}: HTTP #{status}: #{body}")

      {:error, reason} ->
        Mix.raise("Failed to fetch #{url}: #{inspect(reason)}")
    end
  end

  # Checks the downloaded `bytes` against the `expected` sha256 hex digest.
  #
  # When `skip?` is true nothing is hashed; a warning is printed through the
  # shell's error channel so a skipped check stands out from normal progress
  # output. Otherwise raises `Mix.Error` on a mismatch.
  defp verify_integrity!(_bytes, _expected, true = _skip?) do
    Mix.shell().error("  sha256 check SKIPPED — integrity not verified")
  end

  defp verify_integrity!(bytes, expected, false = _skip?) when is_binary(expected) do
    # Hex digests are case-insensitive and are often pasted in uppercase or
    # with stray whitespace; normalize before comparing with the lowercase
    # digest computed below.
    wanted = expected |> String.trim() |> String.downcase()
    actual = :crypto.hash(:sha256, bytes) |> Base.encode16(case: :lower)

    if actual == wanted do
      Mix.shell().info("  sha256 OK (#{String.slice(wanted, 0, 12)}...)")
    else
      Mix.raise("""
      SHA256 mismatch!

        expected: #{wanted}
        actual:   #{actual}

      The downloaded pgmq SQL does not match the pinned hash. Either the
      upstream file changed or the connection was tampered with. Refusing
      to write the file.
      """)
    end
  end

  # Renders the source of the generated Ecto migration.
  #
  # `app` is the consuming OTP app; its camelized name becomes the module
  # namespace and it is embedded as `@otp_app` so the migration can locate the
  # app's priv dir. `sql_relpath` is the SQL file's path relative to that priv
  # dir. Returns the migration source as a string.
  defp render_migration(app, sql_relpath) do
    namespace = Macro.camelize(to_string(app))

    """
    defmodule #{namespace}.Repo.Migrations.InstallPgmq do
      @moduledoc \"\"\"
      Installs pgmq via the bundled SQL file.

      Generated by: `mix pgflow.gen.pgmq_migration`
      SQL source:   priv/#{sql_relpath}

      The pgmq SQL contains many top-level statements. Postgrex rejects
      multi-statement prepared queries (`42601 syntax_error`), so we use
      `PgFlow.Sql.Splitter` to split the SQL into single statements and
      run each via `execute/1`. This keeps the migration portable — no
      `psql` on PATH required at migrate time, which matters for slim
      release images and managed CI runners.

      The splitter is dollar-quote aware, so plpgsql function bodies with
      internal semicolons (`BEGIN ... END;` inside `$$ ... $$`) split
      correctly.
      \"\"\"
      use Ecto.Migration

      @disable_ddl_transaction true
      @disable_migration_lock true

      @otp_app #{inspect(app)}
      @sql_relpath #{inspect(sql_relpath)}

      def up do
        sql_path = Path.join(:code.priv_dir(@otp_app), @sql_relpath)

        sql_path
        |> File.read!()
        |> PgFlow.Sql.Splitter.split()
        |> Enum.each(&execute/1)
      end

      def down do
        execute("DROP SCHEMA IF EXISTS pgmq CASCADE")
      end
    end
    """
  end

  # Returns the current UTC time as a 14-digit `YYYYMMDDHHMMSS` string, used
  # as the migration filename prefix.
  defp generate_timestamp do
    {{year, month, day}, {hour, minute, second}} = :calendar.universal_time()

    # Each component is zero-padded to its fixed width.
    [{year, 4}, {month, 2}, {day, 2}, {hour, 2}, {minute, 2}, {second, 2}]
    |> Enum.map_join("", fn {value, width} ->
      value |> Integer.to_string() |> String.pad_leading(width, "0")
    end)
  end
end