lib/nebulex/adapter/transaction.ex

defmodule Nebulex.Adapter.Transaction do
  @moduledoc """
  Specifies the adapter transactions API.

  ## Default implementation

  This module also provides a default implementation which uses the Erlang
  library `:global`.

  This implementation accepts the following options:

    * `:keys` - The list of the keys that will be locked. Since the lock id is
      generated based on the key, if this option is not set, a fixed/constant
      lock id is used to perform the transaction, then all further transactions
      (without this option set) are serialized and the performance is affected
      significantly. For that reason it is recommended to pass the list of keys
      involved in the transaction.

    * `:nodes` - The list of the nodes where the lock will be set, or on
      all nodes if none are specified.

    * `:retries` - If the key has been locked by other process already, and
      `:retries` is not equal to 0, the process sleeps for a while and tries
      to execute the action later. When `:retries` attempts have been made,
      an exception is raised. If `:retries` is `:infinity` (the default),
      the function will eventually be executed (unless the lock is never
      released).

  Let's see an example:

      MyCache.transaction fn ->
        counter = MyCache.get(:counter)
        MyCache.set(:counter, counter + 1)
      end

  Locking only the involved key (recommended):

      MyCache.transaction [keys: [:counter]], fn ->
        counter = MyCache.get(:counter)
        MyCache.set(:counter, counter + 1)
      end

      MyCache.transaction [keys: [:alice, :bob]], fn ->
        alice = MyCache.get(:alice)
        bob = MyCache.get(:bob)
        MyCache.set(:alice, %{alice | balance: alice.balance + 100})
        MyCache.set(:bob, %{bob | balance: bob.balance + 100})
      end

  """

  @doc """
  Runs the given function inside a transaction.

  A successful transaction returns the value returned by the function.

  See `c:Nebulex.Cache.transaction/2`.
  """
  @callback transaction(Nebulex.Adapter.adapter_meta(), Nebulex.Cache.opts(), fun) :: any

  @doc """
  Returns `true` if the given process is inside a transaction.

  See `c:Nebulex.Cache.in_transaction?/0`.
  """
  @callback in_transaction?(Nebulex.Adapter.adapter_meta()) :: boolean

  @doc false
  defmacro __using__(_opts) do
    quote do
      @behaviour Nebulex.Adapter.Transaction

      @impl true
      def transaction(%{cache: cache, pid: pid} = adapter_meta, opts, fun) do
        adapter_meta
        |> in_transaction?()
        |> do_transaction(
          pid,
          adapter_meta[:name] || cache,
          Keyword.get(opts, :keys, []),
          Keyword.get(opts, :nodes, [node()]),
          Keyword.get(opts, :retries, :infinity),
          fun
        )
      end

      @impl true
      def in_transaction?(%{pid: pid}) do
        !!Process.get({pid, self()})
      end

      defoverridable transaction: 3, in_transaction?: 1

      ## Helpers

      defp do_transaction(true, _pid, _name, _keys, _nodes, _retries, fun) do
        fun.()
      end

      defp do_transaction(false, pid, name, keys, nodes, retries, fun) do
        ids = lock_ids(name, keys)

        case set_locks(ids, nodes, retries) do
          true ->
            try do
              _ = Process.put({pid, self()}, %{keys: keys, nodes: nodes})

              fun.()
            after
              _ = Process.delete({pid, self()})

              del_locks(ids, nodes)
            end

          false ->
            raise "transaction aborted"
        end
      end

      defp set_locks(ids, nodes, retries) do
        maybe_set_lock = fn id, {:ok, acc} ->
          case :global.set_lock(id, nodes, retries) do
            true -> {:cont, {:ok, [id | acc]}}
            false -> {:halt, {:error, acc}}
          end
        end

        case Enum.reduce_while(ids, {:ok, []}, maybe_set_lock) do
          {:ok, _} ->
            true

          {:error, locked_ids} ->
            :ok = del_locks(locked_ids, nodes)

            false
        end
      end

      defp del_locks(ids, nodes) do
        Enum.each(ids, &:global.del_lock(&1, nodes))
      end

      defp lock_ids(name, []) do
        [{name, self()}]
      end

      defp lock_ids(name, keys) do
        Enum.map(keys, &{{name, &1}, self()})
      end
    end
  end
end