# SPDX-License-Identifier: Apache-2.0
defmodule Carbonite do
# Path to the project README, located one directory above this file's directory.
@readme Path.join([__DIR__, "../README.md"])
# Registers the README as an external resource of this module, so that changes to it
# trigger a recompilation (the module documentation below is derived from its contents).
@external_resource @readme
# The module documentation is extracted from the README at compile time: the file is split on
# `<!-- MDOC -->` markers, the part before the first marker is dropped, and from the remaining
# parts every second one (starting with the first) is kept and joined with newlines.
# NOTE(review): presumably the markers come in opening/closing pairs, so the kept parts are the
# sections enclosed by each pair - confirm against the README.
@moduledoc @readme
|> File.read!()
|> String.split("<!-- MDOC -->")
|> Enum.drop(1)
|> Enum.take_every(2)
|> Enum.join("\n")
@moduledoc since: "0.1.0"
import Ecto.Query
alias Carbonite.{Outbox, Prefix, Query, Schema, Transaction, Trigger}
require Prefix
require Schema
# Name of the database schema that holds an audit trail.
@type prefix :: binary()
# An Ecto repository.
@type repo :: Ecto.Repo.t()
# Option used by the functions of this module to select the audit trail's schema.
@type prefix_option :: {:carbonite_prefix, prefix()}
@doc """
Returns the audit trail prefix that is used when no `carbonite_prefix` option is given.
"""
@doc since: "0.1.0"
@spec default_prefix() :: prefix()
def default_prefix do
  Prefix.default_prefix()
end
@doc """
Inserts a `t:Carbonite.Transaction.t/0` into the database.
Make sure to run this within a transaction.
## Parameters
* `repo` - the Ecto repository
* `params` - map of params for the `Carbonite.Transaction` (e.g., `:meta`)
* `opts` - optional keyword list
## Options
* `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
## Multiple inserts in the same transaction
Normally, you should have exactly one `insert_transaction/3` call per database transaction. In
practice, there are two scenarios in which this function may be called multiple times:
1. If an operation A, which calls `insert_transaction/3`, sometimes is nested within an outer
operation B, which also calls `insert_transaction/3`.
2. In tests using Ecto's SQL sandbox, subsequent calls to transactional operations (even to the
same operation twice) are wrapped inside the overarching test transaction, and hence also
effectively call `insert_transaction/3` within the same transaction.
While the first scenario can be resolved using appropriate control flow (e.g. by conditionally
disabling the inner `insert_transaction/3` call), the second scenario is quite common and often
unavoidable.
Therefore, `insert_transaction/3` **ignores** subsequent calls within the same database
transaction (equivalent to `ON CONFLICT DO NOTHING`), **discarding metadata** passed to all
calls but the first.
"""
@doc since: "0.4.0"
@spec insert_transaction(repo()) :: {:ok, Transaction.t()} | {:error, Ecto.Changeset.t()}
@spec insert_transaction(repo(), params :: map()) ::
        {:ok, Transaction.t()} | {:error, Ecto.Changeset.t()}
@spec insert_transaction(repo(), params :: map(), [prefix_option()]) ::
        {:ok, Transaction.t()} | {:error, Ecto.Changeset.t()}
def insert_transaction(repo, params \\ %{}, opts \\ []) do
  # NOTE: ON CONFLICT DO NOTHING does not combine with RETURNING, so we're forcing an UPDATE.
  #       Replacing only `id` on a conflict on `id` leaves the existing row's data untouched.
  insert_opts = [
    prefix: Keyword.get(opts, :carbonite_prefix, default_prefix()),
    on_conflict: {:replace, [:id]},
    conflict_target: [:id],
    returning: true
  ]

  changeset = Transaction.changeset(params)

  repo.insert(changeset, insert_opts)
end
@doc """
Fetches all changes of the current transaction from the database.

Make sure to run this within a transaction.

## Parameters

* `repo` - the Ecto repository
* `opts` - optional keyword list

## Options

* `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
"""
@doc since: "0.5.0"
@spec fetch_changes(repo()) :: {:ok, [Carbonite.Change.t()]}
@spec fetch_changes(repo(), [prefix_option()]) :: {:ok, [Carbonite.Change.t()]}
def fetch_changes(repo, opts \\ []) do
  # Changes are preloaded by default; caller-supplied options take precedence.
  query_opts = Keyword.merge([preload: true], opts)
  query = Query.current_transaction(query_opts)

  # Assertive match: the current transaction record is expected to exist.
  %Carbonite.Transaction{changes: changes} = repo.one(query)

  {:ok, changes}
end
@doc """
Sets the current transaction to "override mode" for all tables in the audit log.

## Parameters

* `repo` - the Ecto repository
* `opts` - optional keyword list

## Options

* `to` - specifies the target mode, useful to reset the mode after use
* `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
"""
@doc since: "0.4.0"
@spec override_mode(repo()) :: :ok
@spec override_mode(repo(), [{:to, Trigger.mode()} | prefix_option()]) :: :ok
def override_mode(repo, opts \\ []) do
  # Only the prefix is relevant for selecting the triggers; `:to` is consumed separately below.
  prefix_opts = Keyword.take(opts, [:carbonite_prefix])
  triggers = Query.triggers(prefix_opts)

  overriding = update(triggers, [], set: [override_xact_id: ^override_xact_id(opts)])
  repo.update_all(overriding, [])

  :ok
end
# Builds the dynamic expression that is written to `override_xact_id`.
#
# Without a `:to` option the current transaction id is always used. With a target mode, the
# transaction id is only used for rows whose `mode` differs from the target, and NULL otherwise.
defp override_xact_id(opts) do
  case Keyword.get(opts, :to) do
    unset when unset in [nil, false] ->
      dynamic([], fragment("pg_current_xact_id()"))

    target_mode ->
      dynamic(
        [],
        fragment(
          "CASE WHEN mode != ? THEN pg_current_xact_id() ELSE NULL END",
          ^to_string(target_mode)
        )
      )
  end
end
# Options accepted by `process/4`: the outbox queue query options plus `:filter` and `:chunk`.
@type process_option ::
        Carbonite.Query.outbox_queue_option()
        | {:filter, (Ecto.Query.t() -> Ecto.Query.t())}
        | {:chunk, pos_integer()}

# Options a processor function may return alongside `:cont` / `:halt`. These are exactly the
# keys that are picked from the returned keyword list when the outbox is updated; any other
# key is dropped.
@type process_func_option ::
        {:memo, Outbox.memo()} | {:last_transaction_id, non_neg_integer()}

@typedoc """
This type defines the callback function signature for `Carbonite.process/3`.

The processor function receives the current chunk of transactions and the memo of the last
function application, and must return one of

* `:cont` - continue processing
* `:halt` - stop processing after this chunk
* `{:cont | :halt, opts}` - cont/halt and set some options

After the process function invocation the Outbox is updated with new attributes.

## Options

Returned options can be:

* `memo` - memo to store on Outbox, defaults to previous memo
* `last_transaction_id` - last transaction id to remember as processed, defaults to previous
                          `last_transaction_id` on `:halt`, defaults to last id in current
                          chunk when `:cont` is returned
"""
@type process_func ::
        ([Transaction.t()], Outbox.memo() ->
           :cont | :halt | {:cont | :halt, [process_func_option()]})
@doc """
Processes an outbox queue.
This function sends chunks of persisted transactions to a user-supplied processing function. It
looks up the current "reading position" from a given `Carbonite.Outbox` and yields transactions
matching the given filter criteria (`min_age`, etc.) until either the input source is exhausted
or a processing function application returns `:halt`.
Returns `{:halt, outbox}` if processing was halted explicitly, or `{:ok, outbox}` if it ended
because the input source was exhausted.
See `Carbonite.Query.outbox_queue/2` for query options.
## Examples
Carbonite.process(MyApp.Repo, "rabbit_holes", fn [transaction], _memo ->
# The transaction has its changes preloaded.
transaction
|> MyApp.Foo.serialize()
|> MyApp.Foo.send_to_external_database()
:cont
end)
### Memo passing
The `memo` is useful to carry data between each processor application. Let's say you wanted to
generate a hashsum chain on your processed data:
Carbonite.process(MyApp.Repo, "rabbit_holes", fn [transaction], %{"checksum" => checksum} ->
{payload, checksum} = MyApp.Foo.serialize_and_hash(transaction, checksum)
MyApp.Foo.send_to_external_database(payload)
{:cont, memo: %{"checksum" => checksum}}
end)
### Chunking / Limiting
The examples above received a single-element list as their first parameter: This is because the
transactions are actually processed in "chunks" and the default chunk size is 1. If you would
like to process more transactions in one chunk, set the `chunk` option:
Carbonite.process(MyApp.Repo, "rabbit_holes", [chunk: 50], fn transactions, _memo ->
for transaction <- transactions do
transaction
|> MyApp.Foo.serialize()
|> MyApp.Foo.send_to_external_database()
end
:cont
end)
The query that is executed to fetch the data from the database is controlled with the `limit`
option and is independent of the chunk size.
### Error handling
In case you run into an error midway into processing a batch, you may choose to halt processing
while remembering about the last processed transaction. This is equivalent to raising an
exception from the processing function.
Carbonite.process(MyApp.Repo, "rabbit_holes", fn [transaction], _memo ->
case send_to_external_database(transaction) do
:ok ->
:cont
{:error, _term} ->
:halt
end
end)
You can, however, if you know the first half of a batch has been processed, still update the
`memo` and `last_transaction_id`.
Carbonite.process(MyApp.Repo, "rabbit_holes", fn transactions, _memo ->
case process_transactions(transactions) do
{:error, last_successful_transaction} ->
{:halt, last_transaction_id: last_successful_transaction.id}
:ok ->
:cont
end
end)
## Parameters
* `repo` - the Ecto repository
* `outbox_name` - name of the outbox to process
* `opts` - optional keyword list
* `process_func` - see `t:process_func/0` for details
## Options
* `min_age` - the minimum age of a record, defaults to 300 seconds (set nil to disable)
* `limit` - limits the query in size, defaults to 100 (set nil to disable)
* `filter` - function for refining the batch query, defaults to nil
* `chunk` - defines the size of the chunk passed to the process function, defaults to 1
* `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
"""
@doc since: "0.4.0"
@spec process(repo(), Outbox.name(), process_func()) :: {:ok | :halt, Outbox.t()}
@spec process(repo(), Outbox.name(), [process_option()], process_func()) ::
        {:ok | :halt, Outbox.t()}
def process(repo, outbox_name, opts \\ [], process_func) do
  # The outbox is loaded first and then handed to the processing loop together with the three
  # closures driving it: fetching chunks, applying the processor, and persisting the outbox.
  repo
  |> load_outbox(outbox_name, opts)
  |> process_outbox(%{
    query: query_func(repo, opts),
    process: process_func(process_func),
    update: update_func(repo)
  })
end
# Loads the outbox with the given name via `repo.one!/1`, honouring only the
# `:carbonite_prefix` option.
defp load_outbox(repo, outbox_name, opts) do
  prefix_opts = Keyword.take(opts, [:carbonite_prefix])
  query = Carbonite.Query.outbox(outbox_name, prefix_opts)

  repo.one!(query)
end
# Returns a function that fetches the next batch of the outbox queue and splits it into chunks.
#
# The optional `:filter` function refines the queue query (a missing or nil filter leaves the
# query unchanged); `:chunk` sets the chunk size and defaults to 1.
defp query_func(repo, opts) do
  refine = Keyword.get(opts, :filter) || fn query -> query end
  chunk_size = Keyword.get(opts, :chunk, 1)

  fn outbox ->
    queue = Carbonite.Query.outbox_queue(outbox, opts)
    transactions = repo.all(refine.(queue))

    Enum.chunk_every(transactions, chunk_size)
  end
end
# Wraps the user-supplied processor function so that it is called with the chunk and the
# outbox's current memo, and so that its result is normalized to `{:cont | :halt, opts}`.
#
# Raises `ArgumentError` when the processor returns anything other than `:cont`, `:halt` or
# `{:cont | :halt, keyword}`. Without this check, an accidental return value such as `nil` or
# `:ok` would be treated as a halt and leak out of `process/4` as `{nil, outbox}` or as a
# misleading `{:ok, outbox}`.
defp process_func(process_func) do
  fn chunk, outbox ->
    case process_func.(chunk, outbox.memo) do
      cont_or_halt when cont_or_halt in [:cont, :halt] ->
        {cont_or_halt, []}

      {cont_or_halt, opts} when cont_or_halt in [:cont, :halt] and is_list(opts) ->
        {cont_or_halt, opts}

      other ->
        raise ArgumentError,
              "expected the process function to return :cont, :halt or " <>
                "{:cont | :halt, opts}, got: #{inspect(other)}"
    end
  end
end
# Returns a function that persists new attributes on the outbox via `repo.update!/1` and
# returns the updated outbox.
defp update_func(repo) do
  fn outbox, attrs ->
    changeset = Outbox.changeset(outbox, attrs)

    repo.update!(changeset)
  end
end
# Fetches the next batch of chunks for the outbox. An empty batch means the input source is
# exhausted and ends processing with `{:ok, outbox}`; otherwise the chunks are processed.
defp process_outbox(outbox, functions) do
  chunks = functions.query.(outbox)

  if chunks == [] do
    {:ok, outbox}
  else
    process_chunks(chunks, outbox, functions)
  end
end
# Processes the chunks of one batch in order. Once the batch is used up, the next batch is
# queried; any result other than `{:cont, outbox}` stops processing and is returned as is.
defp process_chunks([], outbox, functions), do: process_outbox(outbox, functions)

defp process_chunks([current | remaining], outbox, functions) do
  with {:cont, updated_outbox} <- process_chunk(current, outbox, functions) do
    process_chunks(remaining, updated_outbox, functions)
  end
end
# Applies the processor to a single chunk and updates the outbox with the outcome.
#
# On `:cont` the id of the chunk's last transaction is remembered by default; on anything else
# nothing is changed by default. `:memo` and `:last_transaction_id` returned by the processor
# take precedence over these defaults.
defp process_chunk(chunk, outbox, functions) do
  {verdict, returned_opts} = functions.process.(chunk, outbox)

  base_attrs =
    case verdict do
      :cont -> %{last_transaction_id: List.last(chunk).id}
      _other -> %{}
    end

  attrs =
    returned_opts
    |> Keyword.take([:memo, :last_transaction_id])
    |> Enum.into(base_attrs)

  {verdict, functions.update.(outbox, attrs)}
end
@type purge_option :: Carbonite.Query.outbox_done_option()

@doc """
Deletes transactions that have been fully processed.

The transactions to delete are selected by `Carbonite.Query.outbox_done/1`, see there for
query options.

Returns the number of deleted transactions.

## Parameters

* `repo` - the Ecto repository
* `opts` - optional keyword list

## Options

* `min_age` - the minimum age of a record, defaults to 300 seconds (set nil to disable)
* `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
"""
@doc since: "0.4.0"
@spec purge(repo()) :: {:ok, non_neg_integer()}
@spec purge(repo(), [purge_option()]) :: {:ok, non_neg_integer()}
def purge(repo, opts \\ []) do
  query = Carbonite.Query.outbox_done(opts)

  # Assertive match: no rows are expected to be returned from the delete.
  {deleted, nil} = repo.delete_all(query)

  {:ok, deleted}
end
end