lib/mosql/full_export.ex

defmodule Mosql.FullExport do
  @moduledoc """
  Documentation for `FullExport`.
  """
  use Broadway

  alias Broadway.Message
  alias Mosql.FullExportProducer
  alias Mosql.Mongo
  alias Mosql.Schema
  alias Mosql.SQL
  alias Mosql.Postgres

  require Logger

  # Load the full export config values
  # [coll_batch_size: 60, bulk_insert_size: 20, skip_collections: []]
  @full_export_config Application.compile_env!(:mosql, :full_export)
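
  # The value above is read from the application config; a matching entry is
  # assumed in config/config.exs, for example (values are only illustrative):
  #
  #   config :mosql, :full_export,
  #     coll_batch_size: 60,
  #     bulk_insert_size: 20,
  #     skip_collections: []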

  @doc """
  Kick off the full export process for the given namespace.
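
  ## Example

  The namespace "store" below is only illustrative; pass whichever namespace
  has a saved schema:

      Mosql.FullExport.trigger("store")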
  """
  def trigger(ns) do
    FullExportProducer.trigger(ns)
  end

  def export_status(ns) do
    FullExportProducer.info_producer_status(ns)
  end

  def start_link(_opts) do
    {:ok, pid} =
      Broadway.start_link(__MODULE__,
        name: __MODULE__,
        producer: [
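          # Initial producer state: no namespace selected, export not yet
          # triggered, no collections queued or exported, and no demand tracked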
          module:
            {FullExportProducer,
             %{
               namespace: "",
               export_triggered: false,
               collections: [],
               exported_collections: [],
               demand_filled: 0,
               pending_demand: 0
             }},
          transformer: {__MODULE__, :transform, []}
        ],
        processors: [
          # processor concurrency and demand settings
          default: [concurrency: System.schedulers_online() * 2, max_demand: 100, min_demand: 20]
        ],
        batchers: [
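          # deliver up to 10 messages per batch (grouped by batch key) to handle_batch/4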
          default: [batch_size: 10]
        ],
        partition_by: &partition/1
      )

    Logger.info("full export pipeline pid #{inspect(pid)}")
    {:ok, pid}
  end

  # Partition messages by collection so that messages for the same collection are
  # handled by the same processor and batch processor
  defp partition(message) do
    :erlang.phash2(message.data[:collection])
  end

  # Producer transformer to transform the event data generated by the producer
  # into a %Broadway.Message{}
  def transform(event, opts) do
    IO.puts("Transform event: #{inspect(event)}, opts: #{inspect(opts)}")

    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  # Prepare the messages that contain the mongo collection names for the export.
  # Attach a Mongo.Stream cursor to each message's data so that all documents of
  # the collection can be fetched lazily.
  @impl true
  def prepare_messages(messages, _context) do
    Logger.info("Handling callback `prepare_messages`. messages: #{inspect(messages)}}")

    # Update the message to include a Mongo cursor to fetch all the records lazily
    messages =
      Enum.map(messages, fn message ->
        coll = message.data[:collection]
        ns = message.data[:namespace]

        Logger.info("Fetching documents for the collection #{coll} in namespace #{ns}")

        cursor = Mongo.find_all(coll, @full_export_config[:coll_batch_size])

        Message.update_data(message, fn _ ->
          %{namespace: ns, collection: coll, rows: cursor}
        end)
      end)

    messages
  end

  # Build the bulk upsert SQL statements for the documents of the collection and
  # route the message to the default batcher
  @impl true
  def handle_message(_processor, message, _context) do
    %{data: %{namespace: namespace, collection: collection, rows: cursor}} = message

    Logger.info("Handling callback 'handle_message' for collection '#{collection}'")

    schema = Schema.saved_schema(namespace, collection)

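    # Chunk the lazy cursor into groups of bulk_insert_size documents and build
    # one bulk upsert SQL statement per chunk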
    sqls =
      cursor
      |> Stream.chunk_every(@full_export_config[:bulk_insert_size])
      |> Enum.map(&build_upsert_sql(schema, &1))

    message
    |> Message.put_data(%{namespace: namespace, collection: collection, sqls: sqls})
    |> Message.put_batcher(:default)
    |> Message.put_batch_key(collection)
  end

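  # Convert a chunk of Mongo documents into a single bulk upsert SQL statement
  # for the collection's schema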
  defp build_upsert_sql(schema, docs) do
    csv_values = Enum.map(docs, &to_csv_values(schema, &1))
    SQL.bulk_upsert_sql(schema, csv_values)
  end

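  # Flatten a nested Mongo document and map it to the column values expected by
  # the insert statement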
  defp to_csv_values(schema, doc) do
    flat_doc = Mongo.flat_document_map(doc)
    SQL.to_insert_values(schema, flat_doc)
  end

  @impl true
  def handle_batch(:default, messages, batch_info, _context) do
    Logger.info("Handling callback 'handle_batch' for key '#{batch_info.batch_key}'")
    Enum.map(messages, &handle_batch_message(&1))
  end

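  # Execute each upsert statement of the message; collect any error reasons and
  # mark the message as failed when at least one statement errors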
  defp handle_batch_message(message) do
    %{data: %{sqls: sqls}} = message

    results =
      Enum.map(sqls, fn sql ->
        Logger.info("Executing upsert query: #{sql}")

        case Postgres.query(sql) do
          {:ok, _} -> ""
          {:error, reason} -> reason
        end
      end)

    errors = Enum.filter(results, fn r -> r != "" end)

    if Enum.count(errors) > 0 do
      Message.failed(message, errors)
    else
      message
    end
  end

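  # Acknowledger callback referenced by transform/2; Broadway invokes it with the
  # successful and failed messages once processing completes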
  def ack(:ack_id, _successful, failed) do
    # Write ack code here
    IO.puts("FAILED: #{inspect(failed)}")
    :ok
  end
end