lib/event_store/tasks/migrations.ex

defmodule EventStore.Tasks.Migration do
  defstruct version: nil, state: :pending, migrated_at: nil
end

defmodule EventStore.Tasks.Migrations do
  @moduledoc """
  Task to show the migration status of EventStore
  """

  import EventStore.Tasks.Output

  alias EventStore.Config
  alias EventStore.Tasks.Migration

  @available_migrations [
    "0.13.0",
    "0.14.0",
    "0.17.0",
    "1.1.0",
    "1.2.0",
    "1.3.0",
    "1.3.2"
  ]

  @dialyzer {:no_return, exec: 2, handle_response: 1}

  @doc """
  Run task

  ## Parameters

    - config: the parsed EventStore config

  ## Opts

    - is_mix: set to `true` if running as part of a Mix task

  """
  def exec(config, opts \\ []) do
    opts = Keyword.merge([is_mix: false, quiet: false], opts)
    schema = Keyword.fetch!(config, :schema)
    config = Config.default_postgrex_opts(config)

    migrations = migrations(config, schema)
    event_store = Keyword.get(opts, :eventstore, "default")
    event_store_name = event_store |> to_string() |> String.replace_prefix("Elixir.", "")

    write_info("\nEventStore: #{event_store_name}\n", opts)
    write_table_header(opts)

    Enum.each(migrations, &list_migration(&1, opts))

    write_info("", opts)

    Enum.map(migrations, & &1.state)
  end

  @doc false
  def available_migrations(), do: @available_migrations

  defp list_migration(%Migration{version: version, state: :pending}, opts) do
    write_info("  #{version |> String.pad_trailing(10)}\tpending", opts)
  end

  defp list_migration(%Migration{version: version, state: :completed, migrated_at: time}, opts) do
    write_info("  #{version |> String.pad_trailing(10)}\tcompleted\t#{time}", opts)
  end

  defp write_table_header(opts) do
    """
      migration     state           migrated_at
    -------------------------------------------------------------
    """
    |> String.trim_trailing()
    |> write_info(opts)
  end

  defp migrations(config, schema) do
    completed = query_schema_migrations(config, schema)
    latest_migration = completed |> Enum.reverse() |> Enum.at(0, nil)

    completed ++ pending_migrations(latest_migration)
  end

  defp pending_migrations(nil), do: @available_migrations

  defp pending_migrations(%Migration{state: :completed, version: event_store_version}) do
    @available_migrations
    |> Enum.map(fn version -> %Migration{version: version} end)
    |> Enum.filter(fn migration ->
      Version.compare(migration.version, event_store_version) == :gt
    end)
  end

  defp query_schema_migrations(config, schema) do
    config
    |> run_query("""
      SELECT major_version, minor_version, patch_version, migrated_at
      FROM #{schema}.schema_migrations
      ORDER BY 1, 2, 3
    """)
    |> handle_response()
  end

  defp run_query(config, query) do
    {:ok, conn} = Postgrex.start_link(config)

    try do
      Postgrex.query!(conn, query, [])
    after
      GenServer.stop(conn)
    end
  end

  defp handle_response(%Postgrex.Result{rows: rows}) do
    Enum.map(rows, fn [major_version, minor_version, patch_version, migrated_at] ->
      %Migration{
        version: "#{major_version}.#{minor_version}.#{patch_version}",
        state: :completed,
        migrated_at: migrated_at
      }
    end)
  end
end