lib/carbonite/query.ex

# SPDX-License-Identifier: Apache-2.0

defmodule Carbonite.Query do
  @moduledoc """
  This module provides query functions for retrieving audit trails from the database.
  """

  @moduledoc since: "0.2.0"

  import Ecto.Query
  import Carbonite.Prefix
  alias Carbonite.{Change, Outbox, Transaction, Trigger}

  @type prefix :: binary()
  @type disabled :: nil | false

  @type prefix_option :: {:carbonite_prefix, prefix()}
  @type preload_option :: {:preload, boolean()}

  @type transactions_option :: prefix_option() | preload_option()

  @doc """
  Returns an `t:Ecto.Query.t/0` that can be used to select transactions from the database.

  ## Examples

      Carbonite.Query.transactions()
      |> MyApp.Repo.all()

      # Preload changes
      Carbonite.Query.transactions(preload: true)
      |> MyApp.Repo.all()

  ## Options

  * `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
  * `preload` - can be used to preload the changes, defaults to `false`
  """

  @doc since: "0.3.1"
  @spec transactions() :: Ecto.Query.t()
  @spec transactions([transactions_option()]) :: Ecto.Query.t()
  def transactions(opts \\ []) do
    from_with_prefix(Transaction, opts)
    |> maybe_preload(opts, :changes, from_with_prefix(Change, opts))
  end

  @type current_transaction_option :: prefix_option() | preload_option()

  @doc """
  Returns an `t:Ecto.Query.t/0` that can be used to select or delete the "current" transaction.

  This function is useful when your tests run in a database transaction using Ecto's SQL sandbox.

  ## Example: Asserting on the current transaction

  When you insert your `Carbonite.Transaction` record somewhere inside your domain logic, you do
  not wish to return it to the caller only to be able to assert on its attributes in tests. This
  example shows how you could assert on the metadata inserted.

      # Test running inside Ecto's SQL sandbox.
      test "my test" do
        some_operation_with_a_transaction()

        assert current_transaction_meta() == %{"type" => "some_operation"}
      end

      defp current_transaction_meta do
        Carbonite.Query.current_transaction()
        |> MyApp.Repo.one!()
        |> Map.fetch(:meta)
      end

  ## Options

  * `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
  * `preload` - can be used to preload the changes, defaults to `false`
  """
  @doc since: "0.2.0"
  @spec current_transaction() :: Ecto.Query.t()
  @spec current_transaction([current_transaction_option()]) :: Ecto.Query.t()
  def current_transaction(opts \\ []) do
    carbonite_prefix = Keyword.get(opts, :carbonite_prefix, default_prefix())

    from_with_prefix(Transaction, opts)
    |> where(
      [t],
      t.id == fragment("CURRVAL(CONCAT(?::VARCHAR, '.transactions_id_seq'))", ^carbonite_prefix)
    )
    |> where([t], t.xact_id == fragment("pg_current_xact_id()"))
    |> maybe_preload(opts, :changes, from_with_prefix(Change, opts))
  end

  # Returns all triggers.
  @doc false
  @spec triggers() :: Ecto.Query.t()
  def triggers(opts \\ []) do
    from_with_prefix(Trigger, opts)
  end

  @doc """
  Returns an `t:Ecto.Query.t/0` that selects a outbox by name.

  ## Options

  * `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
  """
  @doc since: "0.4.0"
  @spec outbox(Outbox.name()) :: Ecto.Query.t()
  @spec outbox(Outbox.name(), [prefix_option()]) :: Ecto.Query.t()
  def outbox(outbox_name, opts \\ []) do
    from_with_prefix(Outbox, opts)
    |> where([o], o.name == ^outbox_name)
  end

  @type outbox_queue_option ::
          prefix_option()
          | preload_option()
          | {:min_age, non_neg_integer() | disabled()}
          | {:limit, non_neg_integer() | disabled()}

  @doc """
  Returns an `t:Ecto.Query.t/0` that selects the next batch of transactions for an outbox.

  * Transactions are ordered by their ID ascending, so *roughly* in order of insertion.

  ## Options

  * `min_age` - the minimum age of a record, defaults to 300 seconds (set nil to disable)
  * `limit` - limits the query in size, defaults to 100 (set nil to disable)
  * `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
  * `preload` - can be used to preload the changes, defaults to `true`
  """
  @doc since: "0.4.0"
  @spec outbox_queue(Outbox.t()) :: Ecto.Query.t()
  @spec outbox_queue(Outbox.t(), [outbox_queue_option()]) :: Ecto.Query.t()
  def outbox_queue(%Outbox{last_transaction_id: last_processed_tx_id}, opts \\ []) do
    opts = Keyword.put_new(opts, :preload, true)

    from_with_prefix(Transaction, opts)
    |> where([t], t.id > ^last_processed_tx_id)
    |> maybe_apply(opts, :limit, 100, fn q, bs -> limit(q, ^bs) end)
    |> maybe_apply(opts, :min_age, 300, &where_inserted_at_lt/2)
    |> maybe_preload(opts, :changes, from_with_prefix(Change, opts))
    |> order_by({:asc, :id})
  end

  @type outbox_done_option :: prefix_option() | {:min_age, non_neg_integer() | disabled()}

  @doc """
  Returns an `t:Ecto.Query.t/0` that selects all completely processed transactions.

  * If no outbox exists, this query returns all transactions.
  * If one or more outboxes exist, this query returns all transactions with an ID less than the
    minimum of the `last_transaction_id` attributes of the outboxes.
  * Transactions are not ordered.

  ## Options

  * `min_age` - the minimum age of a record, defaults to 300 seconds (set nil to disable)
  * `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
  """
  @doc since: "0.4.0"
  @spec outbox_done() :: Ecto.Query.t()
  @spec outbox_done([outbox_done_option()]) :: Ecto.Query.t()
  def outbox_done(opts \\ []) do
    # NOTE: The query below has a non-optimal query plan, but expressing it differently makes
    #       it a bit convoluted (e.g., fetching the min `last_transaction_id` or `MAX_INT` if that
    #       does not exist and then filtering by <= that number), so we keep the `ALL()` for now.

    outbox_query =
      Outbox
      |> from_with_prefix(opts)
      |> select([o], o.last_transaction_id)

    Transaction
    |> from_with_prefix(opts)
    |> where([t], t.id <= all(outbox_query))
    |> maybe_apply(opts, :min_age, 300, &where_inserted_at_lt/2)
  end

  @default_table_prefix "public"

  @type changes_option :: prefix_option() | preload_option() | {:table_prefix, prefix()}

  @doc """
  Returns an `t:Ecto.Query.t/0` that can be used to select changes for a single record.

  Given an `t:Ecto.Schema.t/0` struct, this function builds a query that fetches all changes
  recorded for it from the database, ordered ascending by their ID (i.e., roughly by
  insertion date descending).

  ## Example

      %MyApp.Rabbit{id: 1}
      |> Carbonite.Query.changes()
      |> MyApp.Repo.all()

  ## Options

  * `carbonite_prefix` defines the audit trail's schema, defaults to `"carbonite_default"`
  * `table_prefix` allows to override the table prefix, defaults to schema prefix of the record
  * `preload` can be used to preload the transaction
  """
  @doc since: "0.2.0"
  @spec changes(record :: Ecto.Schema.t()) :: Ecto.Query.t()
  @spec changes(record :: Ecto.Schema.t(), [changes_option()]) :: Ecto.Query.t()
  def changes(%schema{__meta__: %Ecto.Schema.Metadata{}} = record, opts \\ []) do
    table_prefix =
      Keyword.get_lazy(opts, :table_prefix, fn ->
        schema.__schema__(:prefix) || @default_table_prefix
      end)

    table_name = schema.__schema__(:source)

    table_pk =
      for pk_col <- Enum.sort(schema.__schema__(:primary_key)) do
        record |> Map.fetch!(pk_col) |> to_string()
      end

    from_with_prefix(Change, opts)
    |> where([c], c.table_prefix == ^table_prefix)
    |> where([c], c.table_name == ^table_name)
    |> where([c], c.table_pk == ^table_pk)
    |> maybe_preload(opts, :transaction, from_with_prefix(Transaction, opts))
    |> order_by({:asc, :id})
  end

  defp maybe_apply(queryable, opts, key, default, fun) do
    if value = Keyword.get(opts, key, default) do
      fun.(queryable, value)
    else
      queryable
    end
  end

  defp maybe_preload(queryable, opts, association, preload_query) do
    case Keyword.get(opts, :preload, false) do
      preload when preload in [false, nil] ->
        queryable

      true ->
        preload(queryable, [{^association, ^preload_query}])
    end
  end

  defp where_inserted_at_lt(queryable, min_age) do
    max_inserted_at = DateTime.add(DateTime.utc_now(), -1 * min_age, :second)

    where(queryable, [t], t.inserted_at <= ^max_inserted_at)
  end
end