# lib/event_store/tasks/migrate.ex

defmodule EventStore.Tasks.Migrate do
  @moduledoc """
  Task to run any pending EventStore database schema migrations.
  """

  import EventStore.Tasks.Output

  alias EventStore.{Config, Storage}
  alias EventStore.Tasks.Migrations

  @dialyzer {:no_return, exec: 2, handle_response: 1}

  @doc """
  Run the EventStore database migration task.

  Takes the migration lock and then applies every available migration that is
  newer than the latest version recorded for the configured schema. Output is
  written using the given opts.

  ## Parameters

    - config: the parsed EventStore config (must contain the `:schema` key)

  ## Opts

    - is_mix: set to `true` if running as part of a Mix task (default `false`)
    - quiet: set to `true` to silence output (default `false`)

  """
  def exec(config, opts \\ []) do
    defaults = [is_mix: false, quiet: false]
    opts = Keyword.merge(defaults, opts)

    schema = Keyword.fetch!(config, :schema)
    config = Config.default_postgrex_opts(config)

    # The migrations run on their own connection, separate from the one used to
    # hold the lock. The connection is always stopped, even when a migration
    # raises.
    run_migrations = fn ->
      {:ok, conn} = Postgrex.start_link(config)

      try do
        pending = available_migrations(conn, schema)

        migrate(conn, schema, opts, pending)
      after
        GenServer.stop(conn)
      end
    end

    with_migration_lock(config, schema, opts, run_migrations)
  end

  # Prevent database migrations from running concurrently by acquiring a
  # Postgres advisory lock.
  #
  # The lock is requested on a dedicated connection which is stopped once
  # `callback` has returned or raised. When the lock is already taken an
  # informational message is written and `callback` is not invoked. Any other
  # error returned while acquiring the lock is reported through `raise_msg/2`.
  defp with_migration_lock(config, schema, opts, callback) do
    {:ok, lock_conn} = Postgrex.start_link(config)

    try do
      case acquire_migration_lock(lock_conn, schema) do
        :ok ->
          callback.()

        {:error, :lock_already_taken} ->
          write_info("EventStore database migration already in progress.", opts)

        # Matches any error reason (not only `%Postgrex.Error{}`) so that, for
        # example, a connection error is reported with the same descriptive
        # message instead of failing with an unmatched clause error.
        {:error, error} ->
          raise_msg(
            "The EventStore database could not be migrated. Could not acquire migration lock due to: " <>
              inspect(error),
            opts
          )
      end
    after
      GenServer.stop(lock_conn)
    end
  end

  # Attempts to take the exclusive lock with key `-1` for the given schema,
  # returning whatever `Storage.Lock.try_acquire_exclusive_lock/3` returns.
  defp acquire_migration_lock(conn, schema) do
    lock_opts = [schema: schema]

    Storage.Lock.try_acquire_exclusive_lock(conn, -1, lock_opts)
  end

  # Lists the migration versions that still have to be applied: all of them
  # when no schema version has been recorded yet, otherwise only those strictly
  # newer than the recorded version.
  defp available_migrations(conn, schema) do
    case event_store_schema_version(conn, schema) do
      nil ->
        # No recorded version, so every migration must run.
        Migrations.available_migrations()

      %Version{} = current_version ->
        # Keep only the migrations newer than the recorded version.
        for candidate <- Migrations.available_migrations(),
            Version.compare(Version.parse!(candidate), current_version) == :gt do
          candidate
        end
    end
  end

  # Nothing to apply: just report that the database is up to date.
  defp migrate(_conn, _schema, opts, []) do
    write_info("The EventStore database is already migrated.", opts)
  end

  # Applies each migration in the order given. Every migration script is read
  # from the `:eventstore` application's `priv/event_store/migrations` directory
  # and executed in its own transaction, after setting the transaction-local
  # search path to `schema`. A transaction that returns `{:error, reason}` is
  # reported through `raise_msg/2`.
  defp migrate(conn, schema, opts, migrations) do
    Enum.each(migrations, fn migration ->
      write_info("Running migration v#{migration}...", opts)

      path = Application.app_dir(:eventstore, "priv/event_store/migrations/v#{migration}.sql")
      script = File.read!(path)

      statements = [
        ~s(SET LOCAL search_path TO "#{schema}";),
        script
      ]

      case transaction(conn, statements) do
        {:ok, :ok} ->
          :ok

        {:error, error} ->
          raise_msg(
            "The EventStore database couldn't be migrated, reason given: " <>
              format_migration_error(error),
            opts
          )
      end
    end)

    write_info("The EventStore database has been migrated.", opts)
  end

  # Formats a transaction error for display. The reason is not necessarily an
  # exception (a rolled back transaction yields a plain term such as
  # `:rollback`), and `Exception.message/1` only accepts exception structs.
  defp format_migration_error(%{__exception__: true} = error) do
    inspect(Exception.message(error))
  end

  defp format_migration_error(reason), do: inspect(reason)

  # Returns the highest version found by `query_schema_migrations/2`, or `nil`
  # when it returns no versions.
  defp event_store_schema_version(conn, schema) do
    recorded_versions = query_schema_migrations(conn, schema)

    # Sort newest first so the head of the list is the current schema version.
    newest_first = Enum.sort(recorded_versions, &(Version.compare(&1, &2) == :gt))

    List.first(newest_first)
  end

  # Selects the version columns of every row in the `schema_migrations` table of
  # `schema` and converts the result with `handle_response/1`.
  defp query_schema_migrations(conn, schema) do
    sql = """
      SELECT major_version, minor_version, patch_version
      FROM "#{schema}".schema_migrations
    """

    result = Postgrex.query!(conn, sql, [])

    handle_response(result)
  end

  # Executes the given statements, in order, inside a single Postgrex
  # transaction. Both the transaction and each query run with an infinite
  # timeout.
  defp transaction(conn, statements) do
    no_timeout = [timeout: :infinity]

    run_statements = fn tx ->
      Enum.each(statements, fn statement ->
        Postgrex.query!(tx, statement, [], no_timeout)
      end)
    end

    Postgrex.transaction(conn, run_statements, no_timeout)
  end

  # An empty result set yields no versions.
  defp handle_response(%Postgrex.Result{num_rows: 0}), do: []

  # Converts each `[major, minor, patch]` row into a `Version` struct.
  defp handle_response(%Postgrex.Result{rows: rows}) do
    Enum.map(rows, fn [major, minor, patch] ->
      [major, minor, patch]
      |> Enum.join(".")
      |> Version.parse!()
    end)
  end
end