Skip to main content

lib/recall/migrator.ex

defmodule Recall.Migrator do
  @moduledoc """
  Runs Ecto migrations against the Recall (Mnesia) adapter.

  This is a thin stand-in for `Ecto.Migrator`. The stock migrator tracks which
  versions have run by querying `schema_migrations` with a *string* source
  (`from(m in "schema_migrations", ...)`), and this adapter refuses string-source
  queries — every Mnesia table must be backed by a `Recall.Schema`. So
  this migrator keeps the exact same execution engine (`Ecto.Migration.Runner`,
  which flushes a migration's `create/alter/drop` commands to the adapter's
  `execute_ddl/3`) but does version bookkeeping through the schema-backed
  `Recall.SchemaMigration` (`Repo.all`/`insert`/`delete_all`).

  Use the mix tasks rather than calling this directly:

      mix recall.migrate
      mix recall.rollback --step 1

  ## Options

    * `:all` — run all pending migrations (the default for `:up`)
    * `:step` — run N migrations in the given direction
    * `:to` — run up/down to (and including) a target version
    * `:migrations_paths` — list of directories holding migration files
      (defaults to `Ecto.Migrator.migrations_path(repo)`)
    * `:log` — the `Logger` level for migration logs (defaults to `:info`)
  """

  import Ecto.Query, only: [from: 2]

  alias Ecto.Migration.Runner
  alias Recall.SchemaMigration

  require Logger

  @doc """
  Returns every applied migration version, sorted ascending.

  Versions are read through a query on the schema-backed
  `Recall.SchemaMigration`. Reading it also creates the `schema_migrations`
  table on first touch (via the adapter's first-touch `Table.ensure/2`).

  The second argument is accepted for call-site symmetry but is ignored.
  """
  def migrated_versions(repo, _opts \\ []) do
    query = from(m in SchemaMigration, select: m.version)

    query
    |> repo.all()
    |> Enum.sort()
  end

  @doc """
  Applies one migration in the `:up` direction.

  Returns `:already_up` without touching anything when `version` is already
  recorded; otherwise runs `module` and returns `:ok`.
  """
  def up(repo, version, module, opts \\ []) do
    applied = migrated_versions(repo, opts)

    case version in applied do
      true ->
        :already_up

      false ->
        apply_migration(repo, version, module, :up, opts)
        :ok
    end
  end

  @doc """
  Reverts one migration (the `:down` direction).

  Returns `:already_down` without touching anything when `version` is not
  recorded; otherwise runs `module` backwards and returns `:ok`.
  """
  def down(repo, version, module, opts \\ []) do
    applied = migrated_versions(repo, opts)

    case version in applied do
      false ->
        :already_down

      true ->
        apply_migration(repo, version, module, :down, opts)
        :ok
    end
  end

  @doc """
  Runs migrations in `direction` (`:up` or `:down`) from the default locations.

  The directories searched are `opts[:migrations_paths]` when given, otherwise
  the single directory returned by `Ecto.Migrator.migrations_path(repo)`. The
  work itself is delegated to `run/4`.

  Returns the list of versions that ran.
  """
  def run(repo, direction, opts) when direction in [:up, :down] do
    case opts[:migrations_paths] do
      missing when missing in [nil, false] ->
        run(repo, [Ecto.Migrator.migrations_path(repo)], direction, opts)

      paths ->
        run(repo, paths, direction, opts)
    end
  end

  @doc """
  Runs the migrations found under `paths` in `direction` (`:up` or `:down`).

  Migration files are discovered under `paths`, narrowed to the ones pending
  for `direction` given the recorded versions, and then limited by the `:to`,
  `:step` or `:all` options in `opts`. Each remaining file is compiled and
  applied, one at a time, in that order.

  Returns the versions that ran. When nothing is pending, a
  "Migrations already <direction>" message is logged and `[]` is returned.
  """
  def run(repo, paths, direction, opts) when direction in [:up, :down] do
    versions = migrated_versions(repo, opts)

    pending =
      paths
      |> migrations_in()
      |> pending_in_direction(versions, direction)
      |> within_bounds(direction, opts)

    # The empty case is an explicit branch so the `[]` it yields is the actual
    # return value, rather than a discarded expression ahead of the loop.
    case pending do
      [] ->
        log(opts, "Migrations already #{direction}")
        []

      _ ->
        for {version, _name, file} <- pending do
          # Files are compiled lazily, right before they run.
          module = load_migration!(file)
          apply_migration(repo, version, module, direction, opts)
          version
        end
    end
  end

  # --- running a single migration ---------------------------------------------

  # Runs `module` upwards and then records `version` in `SchemaMigration`.
  # The row is inserted only after `perform/5` has returned, so a migration
  # that raises is never recorded as applied.
  defp apply_migration(repo, version, module, :up, opts) do
    perform(repo, version, module, :up, opts)

    applied_at =
      NaiveDateTime.utc_now()
      |> NaiveDateTime.truncate(:second)

    repo.insert!(%SchemaMigration{version: version, inserted_at: applied_at})
  end

  # Runs `module` downwards and then removes the bookkeeping row for `version`.
  defp apply_migration(repo, version, module, :down, opts) do
    perform(repo, version, module, :down, opts)

    recorded = from(m in SchemaMigration, where: m.version == ^version)
    repo.delete_all(recorded)
  end

  # Dispatches a migration the way `Ecto.Migrator` does: an explicit `up/0` or
  # `down/0` is tried first; if the module lacks it, `change/0` is run forward
  # (for `:up`) or backward (for `:down`). When neither callback exists an
  # `Ecto.MigrationError` is raised. The Runner is what flushes the migration's
  # commands to the adapter's `execute_ddl/3`.
  defp perform(repo, version, module, :up, opts) do
    cond do
      ran = attempt(repo, version, module, :forward, :up, :up, opts) ->
        ran

      ran = attempt(repo, version, module, :forward, :change, :up, opts) ->
        ran

      true ->
        raise Ecto.MigrationError,
          message: "#{inspect(module)} does not implement an `up/0` or `change/0` function"
    end
  end

  defp perform(repo, version, module, :down, opts) do
    cond do
      ran = attempt(repo, version, module, :forward, :down, :down, opts) ->
        ran

      ran = attempt(repo, version, module, :backward, :change, :down, opts) ->
        ran

      true ->
        raise Ecto.MigrationError,
          message: "#{inspect(module)} does not implement a `down/0` or `change/0` function"
    end
  end

  # Runs `module.operation/0` through the Runner when the module is loadable
  # and exports that zero-arity function. Returns `:ok` if it ran and `nil` if
  # the callback is absent, which lets callers fall through to the next option.
  defp attempt(repo, version, module, direction, operation, migrator_direction, opts) do
    runnable? = Code.ensure_loaded?(module) and function_exported?(module, operation, 0)

    if runnable? do
      Runner.run(
        repo,
        repo.config(),
        version,
        module,
        direction,
        operation,
        migrator_direction,
        opts
      )

      :ok
    else
      nil
    end
  end

  # --- discovery & bounds ------------------------------------------------------

  # Going up: the migrations whose version is not yet recorded, oldest first.
  defp pending_in_direction(migrations, versions, :up) do
    unapplied =
      Enum.filter(migrations, fn {version, _name, _file} -> version not in versions end)

    Enum.sort_by(unapplied, fn migration -> elem(migration, 0) end)
  end

  # Going down: the migrations whose version is recorded, newest first.
  defp pending_in_direction(migrations, versions, :down) do
    applied =
      Enum.reject(migrations, fn {version, _name, _file} -> version not in versions end)

    Enum.sort_by(applied, fn migration -> elem(migration, 0) end, :desc)
  end

  # Narrows `pending` according to the bounding options, in priority order:
  # `:to`, then `:step`, then `:all`. With no bound given, `:up` runs everything
  # pending while `:down` runs nothing (matching `Ecto.Migrator`, which won't
  # roll everything back unless asked).
  defp within_bounds(pending, direction, opts) do
    target = opts[:to]
    count = opts[:step]

    cond do
      target -> to_bound(pending, direction, target)
      count -> Enum.take(pending, count)
      opts[:all] || direction == :up -> pending
      true -> []
    end
  end

  # Keeps the migrations up to and including the target version `to`: versions
  # at or below it when going up, versions at or above it when going down.
  defp to_bound(pending, :up, to) do
    Enum.reject(pending, fn {version, _name, _file} -> version > to end)
  end

  defp to_bound(pending, :down, to) do
    Enum.reject(pending, fn {version, _name, _file} -> version < to end)
  end

  # Collects every `<version>_<name>.exs` file directly under each of `paths`
  # as a `{version, name, file}` tuple, in discovery order. Files whose name
  # does not begin with an integer followed by `_` are skipped. Raises when two
  # files share a version.
  defp migrations_in(paths) do
    discovered =
      for dir <- paths,
          file <- Path.wildcard(Path.join(dir, "*.exs")),
          # A non-matching parse result (no leading integer, or no `_` right
          # after it) simply drops the file from the comprehension.
          {version, "_" <> rest} <- [Integer.parse(Path.basename(file))] do
        {version, Path.rootname(rest), file}
      end

    ensure_no_duplicates!(discovered)
  end

  # Raises `Ecto.MigrationError` when two discovered migrations share a
  # version; otherwise returns `migrations` unchanged.
  defp ensure_no_duplicates!(migrations) do
    by_version = Enum.group_by(migrations, fn migration -> elem(migration, 0) end)

    # Only groups holding two or more entries match the pattern.
    for {version, [_first, _second | _rest]} <- by_version do
      raise Ecto.MigrationError, "migration version #{version} is duplicated"
    end

    migrations
  end

  # Compiles `file` and returns the first module in it that is an Ecto
  # migration, raising `Ecto.MigrationError` when the file defines none.
  defp load_migration!(file) do
    # The same migration file can be compiled more than once per VM (for
    # example `:up` followed by `:down` in one session), so redefinition is
    # expected here. The "redefining module" warning is silenced for the
    # duration of the compile and the previous setting restored afterwards,
    # even if compilation raises.
    saved = Code.get_compiler_option(:ignore_module_conflict)
    Code.put_compiler_option(:ignore_module_conflict, true)

    modules =
      try do
        for compiled <- Code.compile_file(file), do: elem(compiled, 0)
      after
        Code.put_compiler_option(:ignore_module_conflict, saved)
      end

    Enum.find(modules, &migration_module?/1) ||
      raise Ecto.MigrationError,
            "file #{Path.relative_to_cwd(file)} does not define an Ecto.Migration"
  end

  # True when `module` can be loaded and exports `__migration__/0`.
  defp migration_module?(module) do
    if Code.ensure_loaded?(module) do
      function_exported?(module, :__migration__, 0)
    else
      false
    end
  end

  # Logs `message` at the level given by `opts[:log]` (default `:info`).
  #
  # The same `opts` are handed to `Ecto.Migration.Runner`, and `Ecto.Migrator`
  # documents `:log` as accepting booleans as well as levels. Booleans are
  # therefore handled here instead of being passed to `Logger.log/2`, which
  # only accepts level atoms: `false` silences the message and `true` means
  # `:info`.
  defp log(opts, message) do
    case Keyword.get(opts, :log, :info) do
      false -> :ok
      true -> Logger.log(:info, message)
      level -> Logger.log(level, message)
    end
  end
end