defmodule Mosql do
@moduledoc """
Specification for `Mosql`. It is used to export data from MongoDB
database to Postgres database
There are two kind of Mosql exports: full export and change stream
## FullExport
FullExport is used to do a one time full export of MongoDB database
to a destination Postgres database using the schema mapping definitions
that maps each collection and document fields to SQL table and table
attributes. The process to start the export:
1. Create a postgres export type with a namespace. Namespace has to be unique
Mosql.create_postgres_export("my_namespace")
The export can be customized by using the `options` argument
to provide `exclusions` and `exclusives`
* `:exclusions` - the list of collections to exclude from the export
* `:exclusives` - the list of collections that must be included
2. Generate export schema definitions and export to a file system
ex = Mosql.generate_default_schemas(export, [persist: true, export_path: "schema"])
Export.to_json(ex, "schema")
3. Reload the (updated) schema files from the path to the export
Mosql.reload_schema_files(export, schema_path)
3. Trigger full export
Mosql.start_full_export(export)
"""
alias Mosql.FullExport
alias Mosql.Export
alias Mosql.Schema
alias Mosql.SQL
require Logger
@type_postgres "postgres"
@doc """
Perform one time setup operation for MoSQL to initialize export
database and other things
"""
def setup do
Export.setup!()
end
@doc """
creates the complete postgres type export definition for the given namespace
"""
def create_postgres_export(namespace, options \\ []) do
Export.new(namespace, @type_postgres, options)
end
@doc """
Fetch the postgres export based on the namespace and the type
"""
def fetch_postgres_export(namespace) do
Export.fetch(namespace, @type_postgres)
end
@doc """
Loads the saved export to the schema store for the export
"""
def load_postgres_export(namespace) do
case Export.fetch(namespace, @type_postgres) do
{:ok, ex} ->
if Enum.count(ex.schemas) > 0 do
Export.populate_schema_store(ex)
{:ok, ex}
else
{:error, "Missing schema definitions"}
end
{:error, :not_found} ->
{:error, :not_found}
_ ->
IO.puts("Something unexpected happened, please try again.")
end
end
@doc """
Removes the saved export based on the namespace and type
"""
def remove_export(namespace, type) do
Export.delete(namespace, type)
end
@doc """
Generates the default export schema definitions for the given export.
These schema definitions can be modified to customize the export at the
collection and field level. It takes additional opts to provide `export_path`
and `persist`
* `:export_path` - export the schema as JSON files on the given export path
* `:persist` - persist export definition in the MoSQL internal store
MoSQL internal store is a mnesia based disk store. See `Mosql.Export.setup!()` for
more details
"""
def generate_default_schemas(export, opts \\ []) do
schemas = Export.generate_schema_mappings(export)
ex = %{export | schemas: schemas}
if Keyword.has_key?(opts, :export_path) do
Export.to_json(ex, Keyword.get(opts, :export_path))
end
if Keyword.has_key?(opts, :persist) do
Export.save(ex)
end
end
@doc """
Reload the schema definition from the JSON files to the given export from
the file path. This will override the existing in-memory schema definition
associated with the export
"""
def reload_schema_files(export, schema_path) do
Logger.info("Loading schema files from the path #{schema_path}....")
schemas = Schema.load_schema_files(export.ns, schema_path)
# if any schema failed to load return the error
if has_schema_load_errors(schemas) do
{:error, schema_load_errors(schemas)}
else
{:ok, %{export | schemas: loaded_schemas(schemas)}}
end
end
@doc """
Start the full export process for the given export
"""
def start_full_export(export) do
case Enum.count(export.schemas) do
0 ->
{:error, "Missing schema definitions"}
_ ->
Export.populate_schema_store(export)
SQL.prepare(export)
FullExport.trigger(export.ns)
{:ok, "Full export pipeline started...."}
end
end
defp has_schema_load_errors(schemas) do
Enum.any?(schemas, &schema_load_error(&1))
end
defp schema_load_errors(schemas) do
Enum.filter(schemas, &schema_load_error(&1))
end
defp loaded_schemas(schemas) do
Enum.filter(schemas, &no_error_schema(&1)) |> Enum.map(fn {:ok, schema} -> schema end)
end
defp schema_load_error({:error, _}), do: true
defp schema_load_error({:ok, _}), do: false
defp no_error_schema({:ok, _}), do: true
defp no_error_schema({:error, _}), do: false
end