lib/data_migration/runner.ex

defmodule EctoSparkles.DataMigration.Runner do
  @moduledoc """
  Runs an `EctoSparkles.DataMigration` in throttled batches.

  Rows are fetched with keyset pagination on `id`: each batch selects at most
  `config.batch_size` rows of the migration's `base_query/0` whose `id` is
  strictly greater than the last id seen, ordered by ascending `id`. Each batch
  is passed to the migration's `migrate/1` callback, after which the runner
  sleeps for `config.throttle_ms` milliseconds before fetching the next batch.
  The run ends when a batch comes back empty.
  """
  import Ecto.Query

  @doc """
  Runs `migration_module` to completion.

  Reads `migration_module.config()` and starts paginating just above
  `config.first_id`. When `config.async` is truthy, the batches run in a
  process started with `Task.start/1` and `:ok` is returned immediately.
  Otherwise the batches run in the calling process and `:ok` is returned once
  they are finished.
  """
  @spec run(module()) :: :ok | no_return()
  def run(migration_module) do
    config = migration_module.config()

    if config.async do
      # Fire-and-forget: Task.start/1 does not link the task to the caller,
      # and its result is never awaited.
      Task.start(fn ->
        throttle_change_in_batches(migration_module, config, config.first_id)
      end)

      :ok
    else
      throttle_change_in_batches(migration_module, config, config.first_id)
    end
  end

  # Processes batches starting just above `last_id` until the query returns no
  # rows. `batch_i` is a 1-based counter used only in the progress output.
  # A `nil` cursor ends the run immediately without querying.
  defp throttle_change_in_batches(migration_module, config, last_id, batch_i \\ 1)
  defp throttle_change_in_batches(_migration_module, _config, nil, _batch_i), do: :ok

  defp throttle_change_in_batches(migration_module, config, last_id, batch_i) do
    query = batch_query(migration_module.base_query(), last_id, config.batch_size)

    case config.repo.all(query, log: :info, timeout: :infinity) do
      [] ->
        # No more rows match the query; the migration is done.
        IO.puts("DataMigration: Done")
        :ok

      query_results ->
        IO.puts("DataMigration: Start batch #{batch_i} - above ID #{last_id}")

        migration_module.migrate(query_results)
        Process.sleep(config.throttle_ms)

        # Results are ordered by ascending id only (see batch_query/3), so the
        # last row holds this batch's highest id and becomes the next cursor.
        last_processed_id = List.last(query_results).id
        throttle_change_in_batches(migration_module, config, last_processed_id, batch_i + 1)
    end
  end

  # Builds the query for one batch: rows of `base_query` with `id > last_id`,
  # ordered by ascending `id`, limited to `batch_size` rows.
  # Any ordering already on `base_query` is dropped first. Ecto appends
  # order_by expressions, so a pre-existing ordering would take precedence
  # over `id`. The last row of a batch would then not carry the highest id,
  # and rows would be skipped or processed twice.
  defp batch_query(base_query, last_id, batch_size) do
    base_query
    |> exclude(:order_by)
    |> where([i], i.id > ^last_id)
    |> order_by([i], asc: i.id)
    |> limit(^batch_size)
  end
end