lib/cubdb/transaction.ex

defmodule CubDB.Tx do
  @moduledoc """
  The `CubDB.Tx` module contains functions to read or write within atomic
  transactions created with `CubDB.transaction/2`.

  The functions in this module mirror the ones with the same name in the `CubDB`
  module, but work on transactions instead of on the live database.
  """

  alias CubDB.Btree
  alias CubDB.Reader
  alias CubDB.Snapshot
  alias CubDB.Tx

  @enforce_keys [:btree, :compacting, :owner, :db]
  defstruct [
    :btree,
    :compacting,
    :owner,
    :db,
    recompact: false
  ]

  @typep transaction :: %Tx{
           btree: Btree.t(),
           compacting: boolean,
           owner: GenServer.from(),
           db: GenServer.server(),
           recompact: boolean
         }

  @type t :: transaction

  @spec get(Tx.t(), CubDB.key(), any) :: CubDB.value()

  @doc """
  Gets the value associated to `key` from the transaction.

  If no value is associated with `key`, `default` is returned (which is `nil`,
  unless specified otherwise).

  It works the same as `CubDB.get/3`, but reads from a transaction instead of
  the live database.
  """
  def get(tx = %Tx{btree: btree}, key, default \\ nil) do
    validate_transaction!(tx)
    Reader.get(btree, key, default)
  end

  @spec fetch(Tx.t(), CubDB.key()) :: {:ok, CubDB.value()} | :error

  @doc """
  Fetches the value for the given `key` from the transaction, or returns
  `:error` if `key` is not present.

  If the snapshot contains an entry with the given `key` and value `value`, it
  returns `{:ok, value}`. If `key` is not found, it returns `:error`.

  It works the same as `CubDB.fetch/2`, but reads from a transaction instead of
  the live database.
  """
  def fetch(tx = %Tx{btree: btree}, key) do
    validate_transaction!(tx)
    Reader.fetch(btree, key)
  end

  @spec has_key?(Tx.t(), CubDB.key()) :: boolean

  @doc """
  Returns whether an entry with the given `key` exists in the transaction.

  It works the same as `CubDB.has_key?/2`, but reads from a transaction instead
  of the live database.
  """
  def has_key?(tx = %Tx{btree: btree}, key) do
    validate_transaction!(tx)
    Reader.has_key?(btree, key)
  end

  @spec select(Tx.t(), [CubDB.select_option()]) :: Enumerable.t()

  @doc """
  Selects a range of entries from the transaction, returning a lazy stream.

  The lazy stream should be evaluated within the transaction scope, or a
  `RuntimeError` will be raised.

  It works the same and accepts the same options as `CubDB.select/2`, but reads
  from a transaction instead of the live database.
  """
  def select(%Tx{btree: btree} = tx, options \\ []) when is_list(options) do
    Stream.resource(
      fn ->
        validate_transaction!(tx)
        stream = Reader.select(btree, options)
        step = fn val, _acc -> {:suspend, val} end
        &Enumerable.reduce(stream, &1, step)
      end,
      fn next ->
        case next.({:cont, nil}) do
          {:done, _} ->
            {:halt, nil}

          {:suspended, value, next} ->
            {[value], next}
        end
      end,
      fn _ -> nil end
    )
  end

  @spec refetch(Tx.t(), CubDB.key(), Snapshot.t()) :: :unchanged | {:ok, CubDB.value()} | :error

  @doc """
  If `key` was not written since the point in time represented by `snapshot`,
  returns `:unchanged`. Otherwise, behaves like `fetch/2`.

  Checking if the key was written is done without fetching the whole entry, but
  instead only checking index pages. This makes `refetch/3` useful as a
  performance optimization in cases when one needs to verify if an entry changed
  with respect to a snapshot: using `refetch/3` can save some disk access if
  `CubDB` is able to determine that the key was not written since the snapshot.

  In some situations, such as when the entry is not present in the database, or
  when a compaction completed after `snapshot`, the function cannot determine if
  the entry was written or not since `snapshot`, and therefore fetches it. In
  other words, `refetch/3` is a performance optimization to save some disk reads
  when possible, but it might fetch an entry even if it technically did not
  change.

  ## Example

  The function `refetch/3` can be useful when implementing optimistic
  concurrency control. Suppose, for example, that computing the updated value of
  some entry is a slow operation. In order to avoid holding a transaction open
  for too long, one could compute the update outside of the transaction, and
  then check if the value on which the update was computed is still the same: if
  so, commit the update, otherwise perform the computation again. This kind of
  optimistic concurrency control can use `refetch/3` to avoid reading the value
  from disk when nothing has changed:

      def update_optimistically(db, key) do
        outcome = CubDB.with_snapshot(db, fn snap ->
          {:ok, value} = CubDB.Snapshot.fetch(snap, key)

          # Perform the slow calculation outside of the transaction
          new_value = some_slow_calculation(value)

          # In a transaction, check if the value changed, and update it if not
          write_if_unchanged(db, key, value, snap)
        end)

        # Depending on the outcome, return or recompute
        case outcome do
          :recompute ->
            update_optimistically(db, key)

          :ok ->
            :ok
        end
      end

      defp write_if_unchanged(db, key, value, snap) do
        CubDB.transaction(db, fn tx ->
          # Check if the value changed in the meanwhile
          case CubDB.Tx.refetch(tx, key, snap) do
            :unchanged ->
              # The entry was not written since we last read it. Commit the
              # new value and return :ok
              {:commit, CubDB.Tx.put(tx, key, new_value), :ok}

            {:ok, ^value} ->
              # The entry was written, but its value did not change. Commit the
              # new value and return :ok
              {:commit, CubDB.Tx.put(tx, key, new_value), :ok}

            _ ->
              # The entry changed since we last read it, cancel the
              # transaction and return :recompute
              {:cancel, :recompute}
          end
        end)
      end
  """
  def refetch(tx, key, snapshot)

  def refetch(%Tx{db: db}, _key, %Snapshot{db: other_db}) when db != other_db do
    raise "Invalid snapshot from another CubDB process"
  end

  def refetch(tx = %Tx{btree: btree}, key, %Snapshot{btree: reference_btree}) do
    validate_transaction!(tx)

    case Btree.written_since?(btree, key, reference_btree) do
      false ->
        :unchanged

      _ ->
        Reader.fetch(btree, key)
    end
  end

  @spec size(Tx.t()) :: non_neg_integer

  @doc """
  Returns the number of entries present in the transaction.

  It works the same as `CubDB.size/1`, but works on a transaction instead of the
  live database.
  """
  def size(tx = %Tx{btree: btree}) do
    validate_transaction!(tx)
    Reader.size(btree)
  end

  @spec put(Tx.t(), CubDB.key(), CubDB.value()) :: Tx.t()

  @doc """
  Writes an entry in the transaction, associating `key` to `value`.

  If `key` was already present, it is overwritten.

  Like other functions in this module, it does *not* mutate the transaction in
  place, and instead returns a modified transaction.
  """
  def put(tx = %Tx{btree: btree}, key, value) do
    validate_transaction!(tx)
    %Tx{tx | btree: Btree.insert(btree, key, value)}
  end

  @spec put_new(Tx.t(), CubDB.key(), CubDB.value()) :: Tx.t() | {:error, :exists}

  @doc """
  Writes an entry in the transaction, associating `key` to `value`, only if
  `key` is not yet present.

  If `key` is already present, it does not change it, and returns `{:error,
  :exists}`.

  Like other functions in this module, it does *not* mutate the transaction in
  place, and instead returns a modified transaction.
  """
  def put_new(tx = %Tx{btree: btree}, key, value) do
    validate_transaction!(tx)

    case Btree.insert_new(btree, key, value) do
      {:error, :exists} = reply ->
        reply

      btree ->
        %Tx{tx | btree: btree}
    end
  end

  @spec delete(Tx.t(), CubDB.key()) :: Tx.t()

  @doc """
  Deletes the entry associated to `key` from the transaction.

  If `key` was not present, nothing is done.

  Like other functions in this module, it does *not* mutate the transaction in
  place, and instead returns a modified transaction.
  """
  def delete(tx = %Tx{btree: btree, compacting: compacting}, key) do
    validate_transaction!(tx)

    if compacting do
      %Tx{tx | btree: Btree.mark_deleted(btree, key)}
    else
      %Tx{tx | btree: Btree.delete(btree, key)}
    end
  end

  @spec clear(Tx.t()) :: Tx.t()

  @doc """
  Deletes all entries, resulting in an empty database.

  The deletion is much more performant than deleating each entry manually.

  It works like `CubDB.clear/1`, but on a transaction.

  If a compaction is in progress when `clear/1` is called, and the transaction
  is committed, the compaction is halted, and a new one started immediately
  after. The new compaction should be very fast, as the database is empty as a
  result of the `clear/1` call.

  Like other functions in this module, it does *not* mutate the transaction in
  place, and instead returns a modified transaction.
  """
  def clear(tx = %Tx{btree: btree, compacting: compacting}) do
    validate_transaction!(tx)
    %Tx{tx | btree: Btree.clear(btree), recompact: compacting}
  end

  defp validate_transaction!(tx = %Tx{db: db}) do
    case GenServer.call(db, {:validate_transaction, tx}, :infinity) do
      :ok ->
        :ok

      :error ->
        raise "Invalid transaction, likely because it was used outside of its scope"
    end
  end
end