lib/ecto/adapters/mnesia/migrator.ex

defmodule Ecto.Adapters.Mnesia.Migrator do
  @moduledoc """
  Lower level API for managing migrations
  """
  require Logger

  alias Ecto.Adapters.Mnesia.Connection
  alias Ecto.Adapters.Mnesia.Migration

  @type table_copy :: :ram | :disc
  @type default_copy_opt :: {:default_copy, table_copy()}
  @type schemas_opt :: [module() | {module(), table_copy()} | {module, Keyword.t()}]
  @type migrations_opts :: [default_copy_opt() | [schemas_opt()]]
  @type options :: [{:sync, boolean()}]

  @doc false
  @spec run(module(), [Migration.t()], options()) :: [module()]
  def run(repo, migrations, options \\ []) do
    repo.checkout(fn ->
      tables = do_run_migrations(repo, migrations)

      if Keyword.get(options, :sync, false) do
        Connection.add_waited_schemas(tables)
      end

      tables
    end)
  end

  @doc false
  def with_repo(repo, fun, opts \\ []) do
    config = repo.config()
    mode = Keyword.get(opts, :mode, :permanent)
    apps = [:ecto3_mnesia | config[:start_apps_before_migration] || []]

    extra_started =
      Enum.flat_map(apps, fn app ->
        {:ok, started} = Application.ensure_all_started(app, mode)
        started
      end)

    {:ok, repo_started} = repo.__adapter__().ensure_all_started(config, mode)
    started = extra_started ++ repo_started
    migration_repo = config[:migration_repo] || repo

    case ensure_repo_started(repo, config) do
      {:ok, repo_after} ->
        case ensure_migration_repo_started(migration_repo, repo) do
          {:ok, migration_repo_after} ->
            try do
              {:ok, fun.(repo), started}
            after
              after_action(repo, repo_after)
              after_action(migration_repo, migration_repo_after)
            end

          {:error, _} = error ->
            after_action(repo, repo_after)
            error
        end

      {:error, _} = error ->
        error
    end
  end

  @doc """
  Compile migrations configuration into migrations usable by `run/2`
  """
  @spec compile(migrations_opts()) :: [Migration.t()]
  def compile(opts) do
    default_opts =
      opts
      |> Keyword.get(:default_copy, :disc)
      |> case do
        :disc -> [disc_copies: [node()], ram_copies: []]
        :ram -> [disc_copies: [], ram_copies: [node()]]
      end

    opts
    |> Keyword.get(:schemas, [])
    |> Enum.map(fn
      {schema, storage} when storage in [:ram] ->
        opts =
          default_opts
          |> Keyword.drop([:ram_copies, :disc_copies])
          |> Keyword.merge(ram_copies: [node()], disc_copies: [])

        {schema, opts}

      {schema, storage} when storage in [:disc] ->
        opts =
          default_opts
          |> Keyword.drop([:ram_copies, :disc_copies])
          |> Keyword.merge(ram_copies: [], disc_copies: [node()])

        {schema, opts}

      {schema, opts} when is_list(opts) ->
        {schema, Keyword.merge(default_opts, opts)}

      schema ->
        {schema, default_opts}
    end)
    |> Enum.map(fn {schema, opts} ->
      _ =
        schema
        |> ensure_schema!()
        |> ensure_source!()

      {schema, opts}
    end)
  end

  defp do_run_migrations(repo, migrations) do
    Enum.reduce(migrations, [], fn {schema, opts}, acc ->
      case create_table(repo, schema, opts) do
        {:ok, table} ->
          Logger.info("Creates DB table #{table}")
          [schema | acc]

        {:ignore, table} ->
          Logger.info("DB table already exists #{table}")
          [schema | acc]

        {:error, error} ->
          Logger.error("Coud not create DB table for #{schema}, error: #{inspect(error)}")
          raise "Error running migrations"
      end
    end)
  end

  defp ensure_schema!(schema) do
    case Code.ensure_compiled(schema) do
      {:module, _} ->
        if function_exported?(schema, :__schema__, 2) do
          schema
        else
          Mix.raise("Module #{inspect(schema)} is not an Ecto.Schema.")
        end

      {:error, error} ->
        Mix.raise("Could not load #{inspect(schema)}, error: #{inspect(error)}.")
    end
  end

  defp ensure_source!(schema) do
    case schema.__schema__(:source) do
      nil ->
        Mix.raise(
          "Module #{inspect(schema)} do not define a `:source`, probably an embedded schema."
        )

      _ ->
        schema
    end
  end

  defp ensure_repo_started(repo, config) do
    case repo.start_link(config) do
      {:ok, _} ->
        {:ok, :stop}

      {:error, {:already_started, _pid}} ->
        {:ok, :restart}

      {:error, _} = error ->
        error
    end
  end

  defp ensure_migration_repo_started(repo, repo) do
    {:ok, :noop}
  end

  defp ensure_migration_repo_started(migration_repo, _repo) do
    case migration_repo.start_link(migration_repo.config()) do
      {:ok, _} ->
        {:ok, :stop}

      {:error, {:already_started, _pid}} ->
        {:ok, :noop}

      {:error, _} = error ->
        error
    end
  end

  defp after_action(repo, :restart) do
    if Process.whereis(repo) do
      %{pid: pid} = Ecto.Adapter.lookup_meta(repo)
      Supervisor.restart_child(repo, pid)
    end
  end

  defp after_action(repo, :stop) do
    repo.stop()
  end

  defp after_action(_repo, :noop) do
    :noop
  end

  defp create_table(repo, model, opts) do
    mnesia_opts = Keyword.merge(opts, index: indices(model))

    table = model.__schema__(:source)

    model
    |> constraints()
    |> Enum.each(fn {:foreign_key, from, assoc, opts} ->
      :ok = Migration.references(from, assoc, opts)
    end)

    case Migration.create_table(repo, model, mnesia_opts) do
      {:ok, table} -> {:ok, table}
      :ignore -> {:ignore, table}
      {:error, reason} -> {:error, reason}
    end
  end

  defp indices(model) do
    if function_exported?(model, :__mnesia__, 1) do
      model.__mnesia__(:indices)
    else
      []
    end
  end

  defp constraints(model) do
    if function_exported?(model, :__mnesia__, 1) do
      model.__mnesia__(:constraints)
    else
      []
    end
  end
end