lib/carbonite/migrations.ex

# SPDX-License-Identifier: Apache-2.0

defmodule Carbonite.Migrations do
  @moduledoc """
  Functions to setup Carbonite audit trails in your migrations.
  """

  @moduledoc since: "0.1.0"

  use Ecto.Migration
  import Carbonite, only: [default_prefix: 0]
  import Carbonite.Migrations.Helper

  @type patch :: non_neg_integer()
  @type prefix :: binary()
  @type table_name :: binary() | atom()

  # --------------------------------- patch levels ---------------------------------

  @initial_patch 1
  @current_patch 4

  @doc false
  @spec initial_patch :: non_neg_integer()
  def initial_patch, do: @initial_patch

  @doc false
  @spec current_patch :: non_neg_integer()
  def current_patch, do: @current_patch

  # --------------------------------- main schema ----------------------------------

  @type up_option :: {:carbonite_prefix, prefix()}

  @doc """
  Runs one of Carbonite's migrations.

  ## Migration patchlevels

  Make sure that you run all migrations in your host application.

  * Initial patch: #{@initial_patch}
  * Current patch: #{@current_patch}

  ## Options

  * `carbonite_prefix` defines the audit trail's schema, defaults to `"carbonite_default"`
  """
  @doc since: "0.4.0"
  @spec up(patch()) :: :ok
  @spec up(patch(), [up_option()]) :: :ok
  def up(patch, opts \\ []) when is_integer(patch) and is_list(opts) do
    change(:up, patch, opts)
  end

  @type down_option :: {:carbonite_prefix, prefix()} | {:drop_schema, boolean()}

  @doc """
  Rollback a migration.

  ## Options

  * `carbonite_prefix` defines the audit trail's schema, defaults to `"carbonite_default"`
  * `drop_schema` controls whether the initial migration deletes the schema during rollback
  """
  @doc since: "0.4.0"
  @spec down(patch()) :: :ok
  @spec down(patch(), [down_option()]) :: :ok
  def down(patch, opts \\ []) when is_integer(patch) and is_list(opts) do
    change(:down, patch, opts)
  end

  defp change(direction, patch, opts) when is_integer(patch) do
    module = Module.concat([__MODULE__, "V#{patch}"])

    apply(module, direction, [opts])
  end

  # ------------------------------- trigger setup ----------------------------------

  @default_table_prefix "public"

  @type trigger_option :: {:table_prefix, prefix()} | {:carbonite_prefix, prefix()}

  @doc """
  Installs a change capture trigger on a table.

  ## Options

  * `table_prefix` is the name of the schema the table lives in
  * `carbonite_prefix` is the schema of the audit trail, defaults to `"carbonite_default"`
  """
  @doc since: "0.4.0"
  @spec create_trigger(table_name()) :: :ok
  @spec create_trigger(table_name(), [trigger_option()]) :: :ok
  def create_trigger(table_name, opts \\ []) do
    table_prefix = Keyword.get(opts, :table_prefix, @default_table_prefix)
    carbonite_prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

    """
    CREATE TRIGGER capture_changes_into_#{carbonite_prefix}_trigger
    AFTER INSERT OR UPDATE OR DELETE
    ON #{table_prefix}.#{table_name}
    FOR EACH ROW
    EXECUTE PROCEDURE #{carbonite_prefix}.capture_changes();
    """
    |> squish_and_execute()

    """
    INSERT INTO #{carbonite_prefix}.triggers (
      id,
      table_prefix,
      table_name,
      inserted_at,
      updated_at
    ) VALUES (
      NEXTVAL('#{carbonite_prefix}.triggers_id_seq'),
      '#{table_prefix}',
      '#{table_name}',
      NOW(),
      NOW()
    );
    """
    |> squish_and_execute()

    :ok
  end

  @doc """
  Removes a change capture trigger from a table.

  ## Options

  * `table_prefix` is the name of the schema the table lives in
  * `carbonite_prefix` is the schema of the audit trail, defaults to `"carbonite_default"`
  """
  @doc since: "0.4.0"
  @spec drop_trigger(table_name()) :: :ok
  @spec drop_trigger(table_name(), [trigger_option()]) :: :ok
  def drop_trigger(table_name, opts \\ []) do
    table_prefix = Keyword.get(opts, :table_prefix, @default_table_prefix)
    carbonite_prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

    """
    DELETE FROM #{carbonite_prefix}.triggers
    WHERE table_prefix = '#{table_prefix}'
    AND table_name = '#{table_name}';
    """
    |> squish_and_execute()

    """
    DROP TRIGGER capture_changes_into_#{carbonite_prefix}_trigger
    ON #{table_prefix}.#{table_name};
    """
    |> squish_and_execute()

    :ok
  end

  @type trigger_config_key :: :table_prefix | :primary_key_columns | :excluded_columns | :mode

  @doc """
  Allows to update a trigger configuration option for a given table.

  This function builds an SQL UPDATE statement that can be used within a database migration to
  update a setting stored in Carbonite's `triggers` table without using the `Carbonite.Trigger`
  schema or other application-level code that is prone to change over time. This helps to ensure
  that your data migrations continue to function, regardless of future updates to Carbonite.

  ## Configuration values

  * `primary_key_columns` is a list of columns that form the primary key of the table
                          (defaults to `["id"]`, set to `[]` to disable)
  * `excluded_columns` is a list of columns to exclude from change captures
  * `filtered_columns` is a list of columns that appear as '[FILTERED]' in the data
  * `mode` is either `:capture` or `:ignore` and defines the default behaviour of the trigger

  ## Example

      Carbonite.Migrations.put_trigger_config("rabbits", :excluded_columns, ["name"])

  ## Options

  * `table_prefix` is the name of the schema the table lives in
  * `carbonite_prefix` is the schema of the audit trail, defaults to `"carbonite_default"`
  """
  @doc since: "0.4.0"
  @spec put_trigger_config(table_name(), trigger_config_key(), any(), [trigger_option()]) :: :ok
  def put_trigger_config(table_name, key, value, opts \\ [])

  def put_trigger_config(table_name, key, value, opts)
      when key in [:primary_key_columns, :excluded_columns, :filtered_columns] do
    do_put_trigger_config(table_name, key, column_list(value), opts)
  end

  def put_trigger_config(table_name, :mode, value, opts) when value in [:capture, :ignore] do
    do_put_trigger_config(table_name, :mode, "'#{value}'", opts)
  end

  defp do_put_trigger_config(table_name, field, value, opts) do
    table_prefix = Keyword.get(opts, :table_prefix, @default_table_prefix)
    carbonite_prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

    """
    UPDATE #{carbonite_prefix}.triggers
    SET #{field} = #{value}, updated_at = NOW()
    WHERE table_prefix = '#{table_prefix}'
    AND table_name = '#{table_name}';
    """
    |> squish_and_execute()

    :ok
  end

  # ------------------------------- outbox setup -----------------------------------

  @type outbox_name :: String.t()
  @type outbox_option :: {:carbonite_prefix, prefix()}

  @doc """
  Inserts an outbox record into the database.

  ## Options

  * `carbonite_prefix` is the schema of the audit trail, defaults to `"carbonite_default"`
  """
  @doc since: "0.4.0"
  @spec create_outbox(outbox_name()) :: :ok
  @spec create_outbox(outbox_name(), [outbox_option()]) :: :ok
  def create_outbox(outbox_name, opts \\ []) do
    carbonite_prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

    """
    INSERT INTO #{carbonite_prefix}.outboxes (
      name,
      inserted_at,
      updated_at
    ) VALUES (
      '#{outbox_name}',
      NOW(),
      NOW()
    );
    """
    |> squish_and_execute()

    :ok
  end

  @doc """
  Removes an outbox record.

  ## Options

  * `carbonite_prefix` is the schema of the audit trail, defaults to `"carbonite_default"`
  """
  @doc since: "0.4.0"
  @spec drop_outbox(outbox_name()) :: :ok
  @spec drop_outbox(outbox_name(), [outbox_option()]) :: :ok
  def drop_outbox(outbox_name, opts \\ []) do
    carbonite_prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

    """
    DELETE FROM #{carbonite_prefix}.outboxes
    WHERE name = '#{outbox_name}';
    """
    |> squish_and_execute()

    :ok
  end

  # ------------------------------ data migration s---------------------------------

  @type insert_migration_transaction_option :: {:carbonite_prefix, prefix()} | {:meta, map()}

  @doc """
  Inserts a transaction for a data migration.

  The transaction's `meta` attribute is populated with

      {"type": "migration", direction: "up"}

  ... and additionally the `name` from the parameters.

  ## Example

      defmodule MyApp.Repo.Migrations.SomeDataMigration do
        use Ecto.Migration

        import Carbonite.Migrations

        def change do
          insert_migration_transaction()

          execute("UPDATE ...", "...")
        end
      end

  This works the same for `up/0`/`down/0`-style migrations:

      defmodule MyApp.Repo.Migrations.SomeDataMigration do
        use Ecto.Migration

        import Carbonite.Migrations

        def up do
          insert_migration_transaction("some-data-migrated")

          execute("INSERT ...")
        end

        def down do
          insert_migration_transaction("some-data-rolled-back")

          execute("DELETE ...")
        end
      end

  ## Options

  * `carbonite_prefix` is the schema of the audit trail, defaults to `"carbonite_default"`
  * `meta` is a map with additional meta information to store on the transaction
  """
  @spec insert_migration_transaction(name :: String.t(), [insert_migration_transaction_option()]) ::
          :ok
  def insert_migration_transaction(name, opts \\ []) do
    carbonite_prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())
    meta = Keyword.get(opts, :meta, %{})

    meta =
      %{type: "migration", direction: direction(), name: to_string(name)}
      |> Map.merge(meta)
      |> Jason.encode!()

    statement =
      """
      INSERT INTO #{carbonite_prefix}.transactions (meta, inserted_at)
      VALUES ('#{meta}'::jsonb, NOW());
      """
      |> squish()

    # Needed because `change/0` requires a rollback statement. In our case irrelevant, as we're
    # inserting a transaction in both directions (and the `direction` value is set dynamically).
    execute(statement, statement)
  end

  @doc """
  Defines a `c:Ecto.Migration.after_begin/0` implementation for a data migration.

  See `insert_migration_transaction/1` for options.

  This determines the `name` of the transaction from the migration's module name.

       {"direction": "up","name": "my_app/repo/migrations/example", "type": "migration"}

  ## Example

      defmodule MyApp.Repo.Migrations.SomeDataMigration do
        use Ecto.Migration

        import Carbonite.Migrations
        insert_migration_transaction_after_begin()

        def change do
          execute("UPDATE ...")
        end
      end

  Alternatively, if you define your own migration template module:

      defmodule MyApp.Migration do
        defmacro __using__ do
          quote do
            use Ecto.Migration

            import Carbonite.Migrations
            insert_migration_transaction_after_begin()
          end
        end
      end
  """
  @doc since: "0.5.0"
  defmacro insert_migration_transaction_after_begin(opts \\ []) do
    opts = Keyword.take(opts, [:carbonite_prefix, :meta])

    quote do
      @behaviour Ecto.Migration

      @impl Ecto.Migration
      def after_begin do
        __MODULE__
        |> Macro.underscore()
        |> Carbonite.Migrations.insert_migration_transaction(unquote(opts))
      end
    end
  end
end