lib/schema_migration/migrator.ex

# SPDX-License-Identifier: Apache-2.0
defmodule EctoSparkles.Migrator do
  require Logger

  @doc """
  Migrates every repo returned by `repos/0`.

  Each repo is handed to `migrate_repo/2` together with `opts`; the per-repo results are
  returned as a list.

  ## Options

    * `:continue_on_error` - when `true`, pending migrations are run one at a time and a
      failing migration is logged instead of aborting the run.
  """
  def migrate(opts \\ []) do
    Enum.map(repos(), fn repo -> migrate_repo(repo, opts) end)
  end

  @doc """
  Migrates a single `repo` up to its latest migration.

  With `continue_on_error: true` the pending migrations are run individually and the list
  of per-migration results is returned. Otherwise all migrations run in one
  `Ecto.Migrator.run/3` call and the `{:ok, result, apps}` tuple from
  `Ecto.Migrator.with_repo/2` is returned (anything else raises a `MatchError`).

  Options: see `migrate/1`.
  """
  def migrate_repo(repo, opts \\ []) do
    Logger.info("Migrate #{inspect(repo)}")

    one_by_one? = Keyword.get(opts, :continue_on_error, false)

    if one_by_one? do
      run_migrations_one_by_one(repo)
    else
      run_all_up = fn started_repo -> Ecto.Migrator.run(started_repo, :up, all: true) end
      {:ok, _migrated, _apps} = Ecto.Migrator.with_repo(repo, run_all_up)
    end
  end

  # Runs every pending (`:down`) migration of `repo` individually, so that one failing
  # migration does not prevent the following ones from being attempted.
  #
  # Returns one tuple per pending migration, in the order reported by
  # `Ecto.Migrator.migrations/2`:
  #
  #   * `{:ok, version, desc}` - the migration ran, or was already up
  #   * `{:skipped, version, desc, reason}` - no migration module could be resolved
  #   * `{:error, version, desc, reason}` - running it raised, or returned an unexpected value
  defp run_migrations_one_by_one(repo) do
    paths = repo_migrations_path(repo)

    # Listing and running share a single `with_repo/3` call: `with_repo/3` stops a repo it
    # had to start as soon as the callback returns, so running `Ecto.Migrator.up/4` after
    # it returned would hit a stopped repo whenever the repo was not already running.
    {:ok, results, _apps} =
      Ecto.Migrator.with_repo(
        repo,
        fn started_repo ->
          started_repo
          |> Ecto.Migrator.migrations(paths)
          |> pending_migrations()
          |> Enum.map(&run_pending_migration(started_repo, paths, &1))
        end,
        mode: :temporary
      )

    results
  end

  # Keeps only the `:down` entries as `{version, desc, module}`. Entries may be 3- or
  # 4-tuples; the module is `nil` when the entry does not carry one. Any other shape
  # raises a `FunctionClauseError`.
  defp pending_migrations(migrations) do
    Enum.flat_map(migrations, fn
      {:down, version, desc, module} -> [{version, desc, module}]
      {:down, version, desc} -> [{version, desc, nil}]
      {_status, _version, _desc, _module} -> []
      {_status, _version, _desc} -> []
    end)
  end

  # Runs one pending migration and converts every outcome (including raised exceptions)
  # into a result tuple, so the caller can carry on with the next migration.
  defp run_pending_migration(repo, paths, {version, desc, module}) do
    Logger.info("Running migration #{version} (#{desc}) for #{inspect(repo)}")

    case resolve_migration_module(module, paths, version, desc) do
      {:ok, mod} ->
        migrate_up(repo, version, desc, mod)

      {:error, reason} ->
        Logger.warning("Skipping migration #{version} (#{desc}): #{reason}")
        {:skipped, version, desc, reason}
    end
  rescue
    # Deliberately broad: this is the "continue on error" boundary.
    e ->
      Logger.error(
        "Migration #{version} (#{desc}) failed for #{inspect(repo)}: #{Exception.message(e)}"
      )

      {:error, version, desc, e}
  end

  # Uses the module attached to the migration entry, or looks one up when there is none.
  defp resolve_migration_module(nil, paths, version, desc),
    do: migration_module_from_file_or_loaded(paths, version, desc)

  defp resolve_migration_module(module, _paths, _version, _desc), do: {:ok, module}

  # Calls `Ecto.Migrator.up/4` and maps its return value to a result tuple.
  defp migrate_up(repo, version, desc, mod) do
    case Ecto.Migrator.up(repo, version, mod, []) do
      :ok ->
        {:ok, version, desc}

      :already_up ->
        Logger.info("Migration #{version} (#{desc}) was already up for #{inspect(repo)}")
        {:ok, version, desc}

      other ->
        Logger.warning(
          "Migration #{version} (#{desc}) for #{inspect(repo)} returned unexpected result: #{inspect(other)}"
        )

        {:error, version, desc, other}
    end
  end

  # Resolves the module implementing the migration identified by `version` and `desc`.
  #
  # First looks for an already loaded module whose name contains both the version number
  # and the camelized description. If none is found, compiles the first file matching
  # `<path>/migrations/<version>_*.exs` under any of `paths` and picks the module in it
  # that exports `__migration__/0`.
  #
  # Returns `{:ok, module}` or `{:error, message}`. Compilation errors in the migration
  # file are raised to the caller.
  defp migration_module_from_file_or_loaded(paths, version, desc) do
    version_str = Integer.to_string(version)

    camelized_desc =
      desc
      |> String.replace(~r/[^a-zA-Z0-9]/, "_")
      |> Macro.camelize()

    already_loaded =
      Enum.find(:code.all_loaded(), fn {mod, _file} ->
        mod_str = Atom.to_string(mod)
        String.contains?(mod_str, version_str) and String.contains?(mod_str, camelized_desc)
      end)

    case already_loaded do
      {mod, _file} -> {:ok, mod}
      nil -> migration_module_from_file(paths, version)
    end
  end

  # Fallback: compile the migration file for `version` and return its migration module.
  defp migration_module_from_file(paths, version) do
    file =
      paths
      |> List.wrap()
      |> Enum.flat_map(&Path.wildcard(Path.join(&1, "migrations/#{version}_*.exs")))
      |> List.first()

    if file do
      # A migration file may define helper modules next to the migration itself, so pick
      # the module that is a migration instead of requiring exactly one module per file.
      # `Code.compile_file/1` replaces the deprecated `Code.load_file/1`.
      file
      |> Code.compile_file()
      |> Enum.map(&elem(&1, 0))
      |> Enum.find(&function_exported?(&1, :__migration__, 0))
      |> case do
        nil -> {:error, "Migration file #{file} does not define an Ecto.Migration module"}
        mod -> {:ok, mod}
      end
    else
      {:error,
       "Could not find a loaded module for version #{version} or a migration file in #{inspect(paths)}"}
    end
  end

  @doc """
  Rolls back migrations by `step` (default `1`).

  Given a repo module, only that repo is rolled back and the `{:ok, result, apps}` tuple
  from `Ecto.Migrator.with_repo/2` is returned. Given `nil` (the default), every repo
  returned by `repos/0` is rolled back by `step` and the per-repo results are returned as
  a list.
  """
  def rollback(repo \\ nil, step \\ 1)

  def rollback(nil, step) do
    Enum.map(repos(), &rollback(&1, step))
  end

  def rollback(repo, step) do
    Logger.info("Rollback #{inspect(repo)} by #{inspect(step)} step")

    run_down = fn started_repo -> Ecto.Migrator.run(started_repo, :down, step: step) end
    {:ok, _migrated, _apps} = Ecto.Migrator.with_repo(repo, run_down)
  end

  @doc """
  Rolls `repo` back down to `version` using `Ecto.Migrator.run/3` with `to: version`.

  Returns the `{:ok, result, apps}` tuple from `Ecto.Migrator.with_repo/2`; any other
  result raises a `MatchError`.
  """
  def rollback_to(repo, version) do
    Logger.info("Rollback #{inspect(repo)} to version #{inspect(version)}")

    run_down_to = fn started_repo -> Ecto.Migrator.run(started_repo, :down, to: version) end
    {:ok, _migrated, _apps} = Ecto.Migrator.with_repo(repo, run_down_to)
  end

  @doc """
  Rolls back every migration of `repo` using `Ecto.Migrator.run/3` with `all: true`.

  Returns the `{:ok, result, apps}` tuple from `Ecto.Migrator.with_repo/2`; any other
  result raises a `MatchError`.
  """
  def rollback_all(repo) do
    Logger.info("Rollback #{inspect(repo)}")

    run_down_all = fn started_repo -> Ecto.Migrator.run(started_repo, :down, all: true) end
    {:ok, _migrated, _apps} = Ecto.Migrator.with_repo(repo, run_down_all)
  end

  @doc """
  Creates the database for `repo` by calling `storage_up/1` on the repo's adapter.

  When `storage_up/1` returns an unexpected result (for instance because the database
  server is not reachable yet), the call is retried after one second, for at most 10
  attempts in total. `attempt` is the zero-based index of the current attempt; callers
  normally leave it at its default.

  Returns `:ok` when the database was created or already existed. Failures (giving up
  after the last attempt, or an exception raised along the way) are only logged; the
  value returned in those cases is the `:ok` of the logging call.
  """
  def create(repo, attempt \\ 0) do
    max_attempts = 10

    case repo.__adapter__().storage_up(repo.config()) do
      :ok ->
        Logger.info("The database for #{inspect(repo)} has been created")
        :ok

      {:error, :already_up} ->
        :ok

      e ->
        Logger.warning("The database for #{inspect(repo)} could not be created: #{inspect(e)}")

        # `attempt` is zero-based, so `attempt + 1` attempts have been made at this point.
        # Comparing `attempt` itself against the limit would allow an 11th attempt.
        if attempt + 1 < max_attempts do
          # wait for the database server to be up
          Process.sleep(1000)
          create(repo, attempt + 1)
        else
          Logger.warning(
            "After #{max_attempts} attempts, the database for #{inspect(repo)} still could not be created: #{inspect(e)}"
          )
        end
    end
  rescue
    e ->
      Logger.error("The database for #{inspect(repo)} failed to be created: #{inspect(e)}")
  end

  @doc """
  Rolls every repo returned by `repos/0` back to `version`.

  Returns the list of `rollback_to/2` results, one per repo.
  """
  def rollback_to(version) do
    Enum.map(repos(), fn repo -> rollback_to(repo, version) end)
  end

  @doc """
  Rolls back all migrations of every repo returned by `repos/0`.

  Returns the list of `rollback_all/1` results, one per repo.
  """
  def rollback_all() do
    Enum.map(repos(), &rollback_all/1)
  end

  @doc """
  Creates the database of every repo returned by `repos/0`.

  Returns the list of `create/1` results, one per repo.
  """
  def create() do
    Enum.map(repos(), fn repo -> create(repo) end)
  end

  @doc """
  Returns the `:ecto_repos` configured for the application named by
  `config :ecto_sparkles, :otp_app`.

  Raises if either configuration key is missing.
  """
  def repos do
    otp_app = Application.fetch_env!(:ecto_sparkles, :otp_app)

    # Load the application first; the result of loading is ignored.
    _ = Application.load(otp_app)

    Application.fetch_env!(otp_app, :ecto_repos)
  end

  @doc """
  Prints the migration status of every repo returned by `repos/0`.

  Returns the list of `status/1` results, one per repo.
  """
  def status do
    Enum.map(repos(), &status/1)
  end

  @doc """
  Prints the migration status table of a single `repo` to standard output.

  The resolved migration path is printed first (labelled "Migration path"), followed by
  one row per migration with its status, ID and name.
  """
  def status(repo) do
    paths = IO.inspect(repo_migrations_path(repo), label: "Migration path")

    list_migrations = fn started_repo -> Ecto.Migrator.migrations(started_repo, paths) end

    {:ok, repo_status, _apps} =
      Ecto.Migrator.with_repo(repo, list_migrations, mode: :temporary)

    header = """
    Repo: #{inspect(repo)}
      Status    Migration ID    Migration Name
    --------------------------------------------------
    """

    rows =
      repo_status
      |> Enum.map(fn {status, number, description} ->
        "  #{pad(status, 10)}#{pad(number, 16)}#{description}"
      end)
      |> Enum.join("\n")

    IO.puts(header <> rows <> "\n")
  end

  # Builds the priv directory path of `repo`.
  #
  # The relative part is the repo's `:priv` config, or `priv/<underscored last segment of
  # the repo module name>` when unset. It is joined onto the repo's `:project_path`
  # config, or onto the application directory of its `:otp_app` when unset.
  defp repo_migrations_path(repo) do
    config = repo.config()

    priv_dir =
      if custom_priv = config[:priv] do
        custom_priv
      else
        repo_name = repo |> Module.split() |> List.last() |> Macro.underscore()
        "priv/#{repo_name}"
      end

    base_dir =
      if project_path = Keyword.get(config, :project_path) do
        project_path
      else
        config |> Keyword.fetch!(:otp_app) |> Application.app_dir()
      end

    Path.join(base_dir, priv_dir)
  end

  # Converts `content` to a string and right-pads it with spaces to `pad` characters.
  defp pad(content, pad) do
    String.pad_trailing(to_string(content), pad)
  end
end