defmodule Mosql.FullExport do
@moduledoc """
Broadway pipeline that performs a full export of MongoDB collections into Postgres for a given namespace.
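
A minimal usage sketch (the namespace "mydb" is hypothetical; its collections are assumed to have saved schemas):

    Mosql.FullExport.trigger("mydb")
    Mosql.FullExport.export_status("mydb")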
"""
use Broadway
alias Broadway.Message
alias Mosql.FullExportProducer
alias Mosql.Mongo
alias Mosql.Schema
alias Mosql.SQL
alias Mosql.Postgres
require Logger
# Load the full export config values
# [coll_batch_size: 60, bulk_insert_size: 20, skip_collections: []]
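#
# A hypothetical entry in config/config.exs (values are illustrative):
#
#   config :mosql, :full_export,
#     coll_batch_size: 60,
#     bulk_insert_size: 20,
#     skip_collections: []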
@full_export_config Application.compile_env!(:mosql, :full_export)
@doc """
Kick off the full export process for the given namespace.
"""
def trigger(ns) do
FullExportProducer.trigger(ns)
end
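@doc """
Report the status of the full export producer for the given namespace
(delegates to `FullExportProducer.info_producer_status/1`).
"""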
def export_status(ns) do
FullExportProducer.info_producer_status(ns)
end
def start_link(_opts) do
{:ok, pid} =
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module:
{FullExportProducer,
%{
namespace: "",
export_triggered: false,
collections: [],
exported_collections: [],
demand_filled: 0,
pending_demand: 0
}},
transformer: {__MODULE__, :transform, []}
],
processors: [
# explicit concurrency and demand settings for the default processor
default: [concurrency: System.schedulers_online() * 2, max_demand: 100, min_demand: 20]
],
batchers: [
default: [batch_size: 10]
],
partition_by: &partition/1
)
Logger.info("full export pipeline pid #{inspect(pid)}")
{:ok, pid}
end
# Partition messages by collection so that all documents from the same collection are handled by the same processor and batch processor
defp partition(message) do
:erlang.phash2(message.data[:collection])
end
# Producer transformer that wraps each event emitted by the producer
# in a %Broadway.Message{}
def transform(event, opts) do
IO.puts("Transform event: #{inspect(event)}, opts: #{inspect(opts)}")
%Message{
data: event,
acknowledger: {__MODULE__, :ack_id, :ack_data}
}
end
# Prepare the messages that contain the mongo collection names for the export.
# Attach the Mongo.Stream cursor to fetch all the documents for each collection
# on message data.
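# Each producer event is expected to carry :namespace and :collection keys;
# after this step message.data has the shape %{namespace: ns, collection: coll, rows: cursor}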
@impl true
def prepare_messages(messages, _context) do
Logger.info("Handling callback `prepare_messages`. messages: #{inspect(messages)}}")
# Update the message to include Mongo cursor to fetch all the reocrds lazily
Enum.map(messages, fn message ->
coll = message.data[:collection]
ns = message.data[:namespace]
Logger.info("Fetching documents for the collection #{coll} in namespace #{ns}")
cursor = Mongo.find_all(coll, @full_export_config[:coll_batch_size])
Message.update_data(message, fn _ ->
%{namespace: ns, collection: coll, rows: cursor}
end)
end)
end
# Build the bulk upsert SQL statements for each chunk of documents in the collection
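# The resulting message data has the shape %{namespace: ns, collection: coll, sqls: [sql, ...]};
# the statements are executed later in handle_batch/4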
@impl true
def handle_message(_processor, message, _context) do
%{data: %{namespace: namespace, collection: collection, rows: cursor}} = message
Logger.info("Handling callback 'handle_message' for collection '#{collection}'")
schema = Schema.saved_schema(namespace, collection)
sqls =
cursor
|> Stream.chunk_every(@full_export_config[:bulk_insert_size])
|> Enum.map(&build_upsert_sql(schema, &1))
message
|> Message.put_data(%{namespace: namespace, collection: collection, sqls: sqls})
|> Message.put_batcher(:default)
|> Message.put_batch_key(collection)
end
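# Build a single bulk upsert statement from a chunk of Mongo documents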
defp build_upsert_sql(schema, docs) do
csv_values = Enum.map(docs, &to_csv_values(schema, &1))
SQL.bulk_upsert_sql(schema, csv_values)
end
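# Flatten a nested Mongo document and map it to insert values for the schema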
defp to_csv_values(schema, doc) do
flat_doc = Mongo.flat_document_map(doc)
SQL.to_insert_values(schema, flat_doc)
end
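# Execute the generated upsert statements for every message in the default batch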
@impl true
def handle_batch(:default, messages, batch_info, _context) do
Logger.info("Handling callback 'handle_batch' for key '#{batch_info.batch_key}'")
Enum.map(messages, &handle_batch_message(&1))
end
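# Run each upsert for a single message; mark the message as failed if any query errors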
defp handle_batch_message(message) do
%{data: %{sqls: sqls}} = message
results =
Enum.map(sqls, fn sql ->
Logger.info("Executing upsert query: #{sql}")
case Postgres.query(sql) do
{:ok, _} -> :ok
{:error, reason} -> {:error, reason}
end
end)
errors = for {:error, reason} <- results, do: reason
if Enum.count(errors) > 0 do
Message.failed(message, errors)
else
message
end
end
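# Acknowledger callback configured in transform/2 via {__MODULE__, :ack_id, :ack_data}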
def ack(:ack_id, _successful, failed) do
# TODO: add real ack handling; failed messages are only logged for now
if failed != [] do
Logger.error("full export failed messages: #{inspect(failed)}")
end
:ok
end
end