% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(erlfdb).
-compile({no_auto_import, [get/1]}).
-export([
open/0,
open/1,
open_tenant/2,
create_transaction/1,
tenant_create_transaction/1,
transactional/2,
snapshot/1,
% Db/Tx configuration
set_option/2,
set_option/3,
% Lifecycle Management
commit/1,
reset/1,
cancel/1,
cancel/2,
% Future Specific functions
is_ready/1,
get/1,
get_error/1,
block_until_ready/1,
wait/1,
wait/2,
wait_for_any/1,
wait_for_any/2,
wait_for_all/1,
wait_for_all/2,
% Data retrieval
get/2,
get_ss/2,
get_key/2,
get_key_ss/2,
get_range/3,
get_range/4,
get_range_startswith/2,
get_range_startswith/3,
get_mapped_range/4,
get_mapped_range/5,
fold_range/5,
fold_range/6,
fold_range_future/4,
fold_range_wait/4,
% Data modifications
set/3,
clear/2,
clear_range/3,
clear_range_startswith/2,
% Atomic operations
add/3,
bit_and/3,
bit_or/3,
bit_xor/3,
min/3,
max/3,
byte_min/3,
byte_max/3,
set_versionstamped_key/3,
set_versionstamped_value/3,
atomic_op/4,
% Watches
watch/2,
get_and_watch/2,
set_and_watch/3,
clear_and_watch/2,
% Conflict ranges
add_read_conflict_key/2,
add_read_conflict_range/3,
add_write_conflict_key/2,
add_write_conflict_range/3,
add_conflict_range/4,
% Transaction versioning
set_read_version/2,
get_read_version/1,
get_committed_version/1,
get_versionstamp/1,
% Transaction size info
get_approximate_size/1,
% Transaction status
get_next_tx_id/1,
is_read_only/1,
has_watches/1,
get_writes_allowed/1,
% Locality and Statistics
get_addresses_for_key/2,
get_estimated_range_size/3,
% Get conflict information
get_conflicting_keys/1,
% Misc
on_error/2,
error_predicate/2,
get_last_error/0,
get_error_string/1
]).
-export_type([
atomic_mode/0,
atomic_operand/0,
cluster_filename/0,
database/0,
database_option/0,
error/0,
error_predicate/0,
fold_future/0,
fold_option/0,
future/0,
key/0,
key_selector/0,
mapper/0,
result/0,
snapshot/0,
tenant/0,
tenant_name/0,
transaction/0,
transaction_option/0,
value/0,
version/0,
wait_option/0
]).
-define(IS_FUTURE, {erlfdb_future, _, _}).
-define(IS_FOLD_FUTURE, {fold_info, _, _}).
-define(IS_DB, {erlfdb_database, _}).
-define(IS_TENANT, {erlfdb_tenant, _}).
-define(IS_TX, {erlfdb_transaction, _}).
-define(IS_SS, {erlfdb_snapshot, _}).
-define(GET_TX(SS), element(2, SS)).
-define(ERLFDB_ERROR, '$erlfdb_error').
-record(fold_st, {
start_key,
end_key,
mapper,
limit,
target_bytes,
streaming_mode,
iteration,
snapshot,
reverse
}).
-type atomic_mode() :: erlfdb_nif:atomic_mode().
-type atomic_operand() :: erlfdb_nif:atomic_operand().
-type cluster_filename() :: binary().
-type database() :: erlfdb_nif:database().
-type database_option() :: erlfdb_nif:database_option().
-type error() :: erlfdb_nif:error().
-type error_predicate() :: erlfdb_nif:error_predicate().
-type fold_future() :: {fold_info, #fold_st{}, future()}.
-type fold_option() ::
{reverse, boolean() | integer()}
| {limit, non_neg_integer()}
| {target_bytes, non_neg_integer()}
| {streaming_mode, atom()}
| {iteration, pos_integer()}
| {snapshot, boolean()}
| {mapper, binary()}.
-type future() :: erlfdb_nif:future().
-type key() :: erlfdb_nif:key().
-type key_selector() :: erlfdb_nif:key_selector().
-type result() :: erlfdb_nif:future_result().
-type mapper() :: tuple().
-type snapshot() :: {erlfdb_snapshot, transaction()}.
-type tenant() :: erlfdb_nif:tenant().
-type tenant_name() :: binary().
-type transaction() :: erlfdb_nif:transaction().
-type transaction_option() :: erlfdb_nif:transaction_option().
-type value() :: erlfdb_nif:value().
-type version() :: erlfdb_nif:version().
-type wait_option() :: {timeout, non_neg_integer() | infinity}.
-spec open() -> database().
open() ->
open(<<>>).
-spec open(cluster_filename()) -> database().
open(ClusterFile) ->
erlfdb_nif:create_database(ClusterFile).
-spec open_tenant(database(), tenant_name()) -> tenant().
open_tenant(?IS_DB = Db, TenantName) ->
erlfdb_nif:database_open_tenant(Db, TenantName).
-spec create_transaction(database()) -> transaction().
create_transaction(?IS_DB = Db) ->
erlfdb_nif:database_create_transaction(Db).
-spec tenant_create_transaction(tenant()) -> transaction().
tenant_create_transaction(?IS_TENANT = Tenant) ->
erlfdb_nif:tenant_create_transaction(Tenant).
-spec transactional(database() | tenant() | transaction() | snapshot(), function()) -> any().
transactional(?IS_DB = Db, UserFun) when is_function(UserFun, 1) ->
clear_erlfdb_error(),
Tx = create_transaction(Db),
do_transaction(Tx, UserFun);
transactional(?IS_TENANT = Tenant, UserFun) when is_function(UserFun, 1) ->
clear_erlfdb_error(),
Tx = tenant_create_transaction(Tenant),
do_transaction(Tx, UserFun);
transactional(?IS_TX = Tx, UserFun) when is_function(UserFun, 1) ->
UserFun(Tx);
transactional(?IS_SS = SS, UserFun) when is_function(UserFun, 1) ->
UserFun(SS).
-spec snapshot(transaction() | snapshot()) -> snapshot().
snapshot(?IS_TX = Tx) ->
{erlfdb_snapshot, Tx};
snapshot(?IS_SS = SS) ->
SS.
-spec set_option(database() | transaction(), database_option() | transaction_option()) -> ok.
set_option(DbOrTx, Option) ->
set_option(DbOrTx, Option, <<>>).
-spec set_option(database() | transaction(), database_option() | transaction_option(), binary()) ->
ok.
set_option(?IS_DB = Db, DbOption, Value) ->
erlfdb_nif:database_set_option(Db, DbOption, Value);
set_option(?IS_TX = Tx, TxOption, Value) ->
erlfdb_nif:transaction_set_option(Tx, TxOption, Value).
-spec commit(transaction()) -> future().
commit(?IS_TX = Tx) ->
erlfdb_nif:transaction_commit(Tx).
-spec reset(transaction()) -> ok.
reset(?IS_TX = Tx) ->
ok = erlfdb_nif:transaction_reset(Tx).
-spec cancel(fold_future() | future() | transaction()) -> ok.
cancel(?IS_FOLD_FUTURE = FoldInfo) ->
cancel(FoldInfo, []);
cancel(?IS_FUTURE = Future) ->
cancel(Future, []);
cancel(?IS_TX = Tx) ->
ok = erlfdb_nif:transaction_cancel(Tx).
-spec cancel(fold_future() | future(), [{flush, boolean()}] | []) -> ok.
cancel(?IS_FOLD_FUTURE = FoldInfo, Options) ->
{fold_info, _St, Future} = FoldInfo,
cancel(Future, Options);
cancel(?IS_FUTURE = Future, Options) ->
ok = erlfdb_nif:future_cancel(Future),
case erlfdb_util:get(Options, flush, false) of
true -> flush_future_message(Future);
false -> ok
end.
-spec is_ready(future()) -> boolean().
is_ready(?IS_FUTURE = Future) ->
erlfdb_nif:future_is_ready(Future).
-spec get_error(future()) -> error().
get_error(?IS_FUTURE = Future) ->
erlfdb_nif:future_get_error(Future).
-spec get(future()) -> any().
get(?IS_FUTURE = Future) ->
erlfdb_nif:future_get(Future).
-spec block_until_ready(future()) -> ok.
block_until_ready(?IS_FUTURE = Future) ->
{erlfdb_future, MsgRef, _FRef} = Future,
receive
{MsgRef, ready} -> ok
end.
-spec wait(future() | result()) -> result().
wait(?IS_FUTURE = Future) ->
wait(Future, []);
wait(Ready) ->
Ready.
-spec wait(future() | any(), [wait_option()]) -> any().
wait(?IS_FUTURE = Future, Options) ->
case is_ready(Future) of
true ->
Result = get(Future),
% Flush ready message if already sent
flush_future_message(Future),
Result;
false ->
Timeout = erlfdb_util:get(Options, timeout, infinity),
{erlfdb_future, MsgRef, _Res} = Future,
receive
{MsgRef, ready} -> get(Future)
after Timeout ->
erlang:error({timeout, Future})
end
end;
wait(Ready, _) ->
Ready.
-spec wait_for_any([future()]) -> any().
wait_for_any(Futures) ->
wait_for_any(Futures, []).
-spec wait_for_any([future()], [wait_option()]) -> any().
wait_for_any(Futures, Options) ->
wait_for_any(Futures, Options, []).
-spec wait_for_any([future()], [wait_option()], list()) -> any().
wait_for_any(Futures, Options, ResendQ) ->
Timeout = erlfdb_util:get(Options, timeout, infinity),
receive
{MsgRef, ready} = Msg ->
case lists:keyfind(MsgRef, 2, Futures) of
?IS_FUTURE = Future ->
lists:foreach(
fun(M) ->
self() ! M
end,
ResendQ
),
Future;
_ ->
wait_for_any(Futures, Options, [Msg | ResendQ])
end
after Timeout ->
lists:foreach(
fun(M) ->
self() ! M
end,
ResendQ
),
erlang:error({timeout, Futures})
end.
-spec wait_for_all([future()]) -> list().
wait_for_all(Futures) ->
wait_for_all(Futures, []).
-spec wait_for_all([future()], [wait_option()]) -> list().
wait_for_all(Futures, Options) ->
% Same as wait for all. We might want to
% handle timeouts here so we have a single
% timeout for all future waiting.
lists:map(
fun(Future) ->
wait(Future, Options)
end,
Futures
).
-spec get(database() | transaction() | snapshot(), key()) -> future().
get(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
wait(get(Tx, Key))
end);
get(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get(Tx, Key, false);
get(?IS_SS = SS, Key) ->
get_ss(?GET_TX(SS), Key).
-spec get_ss(transaction() | snapshot(), key()) -> future().
get_ss(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get(Tx, Key, true);
get_ss(?IS_SS = SS, Key) ->
get_ss(?GET_TX(SS), Key).
-spec get_key(database() | transaction() | snapshot(), key_selector()) -> future().
get_key(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
wait(get_key(Tx, Key))
end);
get_key(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get_key(Tx, Key, false);
get_key(?IS_SS = SS, Key) ->
get_key_ss(?GET_TX(SS), Key).
-spec get_key_ss(transaction(), key_selector()) -> future().
get_key_ss(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get_key(Tx, Key, true).
-spec get_range(database() | transaction(), key(), key()) -> future().
get_range(DbOrTx, StartKey, EndKey) ->
get_range(DbOrTx, StartKey, EndKey, []).
-spec get_range(database() | transaction(), key(), key(), [fold_option()]) -> future().
get_range(?IS_DB = Db, StartKey, EndKey, Options) ->
transactional(Db, fun(Tx) ->
get_range(Tx, StartKey, EndKey, Options)
end);
get_range(?IS_TX = Tx, StartKey, EndKey, Options) ->
Fun = fun(Rows, Acc) -> [Rows | Acc] end,
Chunks = fold_range_int(Tx, StartKey, EndKey, Fun, [], Options),
lists:flatten(lists:reverse(Chunks));
get_range(?IS_SS = SS, StartKey, EndKey, Options) ->
get_range(?GET_TX(SS), StartKey, EndKey, [{snapshot, true} | Options]).
-spec get_range_startswith(database() | transaction(), key()) -> future().
get_range_startswith(DbOrTx, Prefix) ->
get_range_startswith(DbOrTx, Prefix, []).
-spec get_range_startswith(database() | transaction(), key(), [fold_option()]) -> future().
get_range_startswith(DbOrTx, Prefix, Options) ->
StartKey = Prefix,
EndKey = erlfdb_key:strinc(Prefix),
get_range(DbOrTx, StartKey, EndKey, Options).
-spec get_mapped_range(database() | transaction(), key(), key(), mapper()) -> future().
get_mapped_range(DbOrTx, StartKey, EndKey, Mapper) ->
get_mapped_range(DbOrTx, StartKey, EndKey, Mapper, []).
-spec get_mapped_range(database() | transaction(), key(), key(), mapper(), [fold_option()]) ->
future().
get_mapped_range(DbOrTx, StartKey, EndKey, Mapper, Options) ->
get_range(DbOrTx, StartKey, EndKey, [{mapper, erlfdb_tuple:pack(Mapper)} | Options]).
-spec fold_range(database() | transaction(), key(), key(), function(), any()) -> any().
fold_range(DbOrTx, StartKey, EndKey, Fun, Acc) ->
fold_range(DbOrTx, StartKey, EndKey, Fun, Acc, []).
-spec fold_range(database() | transaction(), key(), key(), function(), any(), [fold_option()]) ->
any().
fold_range(?IS_DB = Db, StartKey, EndKey, Fun, Acc, Options) ->
transactional(Db, fun(Tx) ->
fold_range(Tx, StartKey, EndKey, Fun, Acc, Options)
end);
fold_range(?IS_TX = Tx, StartKey, EndKey, Fun, Acc, Options) ->
fold_range_int(
Tx,
StartKey,
EndKey,
fun(Rows, InnerAcc) ->
lists:foldl(Fun, InnerAcc, Rows)
end,
Acc,
Options
);
fold_range(?IS_SS = SS, StartKey, EndKey, Fun, Acc, Options) ->
SSOptions = [{snapshot, true} | Options],
fold_range(?GET_TX(SS), StartKey, EndKey, Fun, Acc, SSOptions).
-spec fold_range_future(transaction() | snapshot(), key(), key(), [fold_option()]) -> fold_future().
fold_range_future(?IS_TX = Tx, StartKey, EndKey, Options) ->
St = options_to_fold_st(StartKey, EndKey, Options),
fold_range_future_int(Tx, St);
fold_range_future(?IS_SS = SS, StartKey, EndKey, Options) ->
SSOptions = [{snapshot, true} | Options],
fold_range_future(?GET_TX(SS), StartKey, EndKey, SSOptions).
-spec fold_range_wait(transaction(), fold_future(), function(), any()) -> any().
fold_range_wait(?IS_TX = Tx, ?IS_FOLD_FUTURE = FI, Fun, Acc) ->
fold_range_int(
Tx,
FI,
fun(Rows, InnerAcc) ->
lists:foldl(Fun, InnerAcc, Rows)
end,
Acc
);
fold_range_wait(?IS_SS = SS, ?IS_FOLD_FUTURE = FI, Fun, Acc) ->
fold_range_wait(?GET_TX(SS), FI, Fun, Acc).
-spec set(database() | transaction() | snapshot(), key(), value()) -> future().
set(?IS_DB = Db, Key, Value) ->
transactional(Db, fun(Tx) ->
set(Tx, Key, Value)
end);
set(?IS_TX = Tx, Key, Value) ->
erlfdb_nif:transaction_set(Tx, Key, Value);
set(?IS_SS = SS, Key, Value) ->
set(?GET_TX(SS), Key, Value).
-spec clear(database() | transaction() | snapshot(), key()) -> ok.
clear(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
clear(Tx, Key)
end);
clear(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_clear(Tx, Key);
clear(?IS_SS = SS, Key) ->
clear(?GET_TX(SS), Key).
-spec clear_range(database() | transaction() | snapshot(), key(), key()) -> ok.
clear_range(?IS_DB = Db, StartKey, EndKey) ->
transactional(Db, fun(Tx) ->
clear_range(Tx, StartKey, EndKey)
end);
clear_range(?IS_TX = Tx, StartKey, EndKey) ->
erlfdb_nif:transaction_clear_range(Tx, StartKey, EndKey);
clear_range(?IS_SS = SS, StartKey, EndKey) ->
clear_range(?GET_TX(SS), StartKey, EndKey).
-spec clear_range_startswith(database() | transaction() | snapshot(), key()) -> ok.
clear_range_startswith(?IS_DB = Db, Prefix) ->
transactional(Db, fun(Tx) ->
clear_range_startswith(Tx, Prefix)
end);
clear_range_startswith(?IS_TX = Tx, Prefix) ->
EndKey = erlfdb_key:strinc(Prefix),
erlfdb_nif:transaction_clear_range(Tx, Prefix, EndKey);
clear_range_startswith(?IS_SS = SS, Prefix) ->
clear_range_startswith(?GET_TX(SS), Prefix).
-spec add(database() | transaction() | snapshot(), key(), atomic_operand()) -> ok.
add(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, add).
-spec bit_and(database() | transaction() | snapshot(), key(), atomic_operand()) -> ok.
bit_and(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, bit_and).
-spec bit_or(database() | transaction() | snapshot(), key(), atomic_operand()) -> ok.
bit_or(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, bit_or).
-spec bit_xor(database() | transaction() | snapshot(), key(), atomic_operand()) -> ok.
bit_xor(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, bit_xor).
-spec min(database() | transaction() | snapshot(), key(), atomic_operand()) -> ok.
min(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, min).
-spec max(database() | transaction() | snapshot(), key(), atomic_operand()) -> ok.
max(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, max).
-spec byte_min(database() | transaction() | snapshot(), key(), atomic_operand()) -> ok.
byte_min(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, byte_min).
-spec byte_max(database() | transaction() | snapshot(), key(), atomic_operand()) -> ok.
byte_max(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, byte_max).
-spec set_versionstamped_key(database() | transaction() | snapshot(), key(), atomic_operand()) ->
ok.
set_versionstamped_key(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, set_versionstamped_key).
-spec set_versionstamped_value(database() | transaction() | snapshot(), key(), atomic_operand()) ->
ok.
set_versionstamped_value(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, set_versionstamped_value).
-spec atomic_op(database() | transaction() | snapshot(), key(), atomic_operand(), atomic_mode()) ->
ok.
atomic_op(?IS_DB = Db, Key, Param, Op) ->
transactional(Db, fun(Tx) ->
atomic_op(Tx, Key, Param, Op)
end);
atomic_op(?IS_TX = Tx, Key, Param, Op) ->
erlfdb_nif:transaction_atomic_op(Tx, Key, Param, Op);
atomic_op(?IS_SS = SS, Key, Param, Op) ->
atomic_op(?GET_TX(SS), Key, Param, Op).
-spec watch(database() | transaction() | snapshot(), key()) -> future().
watch(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
watch(Tx, Key)
end);
watch(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_watch(Tx, Key);
watch(?IS_SS = SS, Key) ->
watch(?GET_TX(SS), Key).
-spec get_and_watch(database(), key()) -> {result(), future()}.
get_and_watch(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
KeyFuture = get(Tx, Key),
WatchFuture = watch(Tx, Key),
{wait(KeyFuture), WatchFuture}
end).
-spec set_and_watch(database(), key(), value()) -> future().
set_and_watch(?IS_DB = Db, Key, Value) ->
transactional(Db, fun(Tx) ->
set(Tx, Key, Value),
watch(Tx, Key)
end).
-spec clear_and_watch(database(), key()) -> future().
clear_and_watch(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
clear(Tx, Key),
watch(Tx, Key)
end).
-spec add_read_conflict_key(transaction() | snapshot(), key()) -> ok.
add_read_conflict_key(TxObj, Key) ->
add_read_conflict_range(TxObj, Key, <<Key/binary, 16#00>>).
-spec add_read_conflict_range(transaction() | snapshot(), key(), key()) -> ok.
add_read_conflict_range(TxObj, StartKey, EndKey) ->
add_conflict_range(TxObj, StartKey, EndKey, read).
-spec add_write_conflict_key(transaction() | snapshot(), key()) -> ok.
add_write_conflict_key(TxObj, Key) ->
add_write_conflict_range(TxObj, Key, <<Key/binary, 16#00>>).
-spec add_write_conflict_range(transaction() | snapshot(), key(), key()) -> ok.
add_write_conflict_range(TxObj, StartKey, EndKey) ->
add_conflict_range(TxObj, StartKey, EndKey, write).
-spec add_conflict_range(transaction() | snapshot(), key(), key(), read | write) -> ok.
add_conflict_range(?IS_TX = Tx, StartKey, EndKey, Type) ->
erlfdb_nif:transaction_add_conflict_range(Tx, StartKey, EndKey, Type);
add_conflict_range(?IS_SS = SS, StartKey, EndKey, Type) ->
add_conflict_range(?GET_TX(SS), StartKey, EndKey, Type).
-spec set_read_version(transaction() | snapshot(), version()) -> ok.
set_read_version(?IS_TX = Tx, Version) ->
erlfdb_nif:transaction_set_read_version(Tx, Version);
set_read_version(?IS_SS = SS, Version) ->
set_read_version(?GET_TX(SS), Version).
-spec get_read_version(transaction() | snapshot()) -> future().
get_read_version(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_read_version(Tx);
get_read_version(?IS_SS = SS) ->
get_read_version(?GET_TX(SS)).
-spec get_committed_version(transaction() | snapshot()) -> version().
get_committed_version(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_committed_version(Tx);
get_committed_version(?IS_SS = SS) ->
get_committed_version(?GET_TX(SS)).
-spec get_versionstamp(transaction() | snapshot()) -> future().
get_versionstamp(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_versionstamp(Tx);
get_versionstamp(?IS_SS = SS) ->
get_versionstamp(?GET_TX(SS)).
-spec get_approximate_size(transaction() | snapshot()) -> non_neg_integer().
get_approximate_size(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_approximate_size(Tx);
get_approximate_size(?IS_SS = SS) ->
get_approximate_size(?GET_TX(SS)).
-spec get_next_tx_id(transaction() | snapshot()) -> non_neg_integer().
get_next_tx_id(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_next_tx_id(Tx);
get_next_tx_id(?IS_SS = SS) ->
get_next_tx_id(?GET_TX(SS)).
-spec is_read_only(transaction() | snapshot()) -> boolean().
is_read_only(?IS_TX = Tx) ->
erlfdb_nif:transaction_is_read_only(Tx);
is_read_only(?IS_SS = SS) ->
is_read_only(?GET_TX(SS)).
-spec has_watches(transaction() | snapshot()) -> boolean().
has_watches(?IS_TX = Tx) ->
erlfdb_nif:transaction_has_watches(Tx);
has_watches(?IS_SS = SS) ->
has_watches(?GET_TX(SS)).
-spec get_writes_allowed(transaction() | snapshot()) -> boolean().
get_writes_allowed(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_writes_allowed(Tx);
get_writes_allowed(?IS_SS = SS) ->
get_writes_allowed(?GET_TX(SS)).
-spec get_addresses_for_key(database() | transaction() | snapshot(), key()) -> future() | result().
get_addresses_for_key(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
wait(get_addresses_for_key(Tx, Key))
end);
get_addresses_for_key(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get_addresses_for_key(Tx, Key);
get_addresses_for_key(?IS_SS = SS, Key) ->
get_addresses_for_key(?GET_TX(SS), Key).
-spec get_estimated_range_size(transaction() | snapshot(), key(), key()) -> future().
get_estimated_range_size(?IS_TX = Tx, StartKey, EndKey) ->
erlfdb_nif:transaction_get_estimated_range_size(Tx, StartKey, EndKey);
get_estimated_range_size(?IS_SS = SS, StartKey, EndKey) ->
erlfdb_nif:transaction_get_estimated_range_size(?GET_TX(SS), StartKey, EndKey).
-spec get_conflicting_keys(transaction()) -> future().
get_conflicting_keys(?IS_TX = Tx) ->
StartKey = <<16#FF, 16#FF, "/transaction/conflicting_keys/">>,
EndKey = <<16#FF, 16#FF, "/transaction/conflicting_keys/", 16#FF>>,
get_range(Tx, StartKey, EndKey).
-spec on_error(transaction() | snapshot(), error() | integer()) -> future().
on_error(?IS_TX = Tx, {erlfdb_error, ErrorCode}) ->
on_error(Tx, ErrorCode);
on_error(?IS_TX = Tx, ErrorCode) ->
erlfdb_nif:transaction_on_error(Tx, ErrorCode);
on_error(?IS_SS = SS, Error) ->
on_error(?GET_TX(SS), Error).
-spec error_predicate(error_predicate(), error() | integer()) -> boolean().
error_predicate(Predicate, {erlfdb_error, ErrorCode}) ->
error_predicate(Predicate, ErrorCode);
error_predicate(Predicate, ErrorCode) ->
erlfdb_nif:error_predicate(Predicate, ErrorCode).
-spec get_last_error() -> error() | undefined.
get_last_error() ->
erlang:get(?ERLFDB_ERROR).
-spec get_error_string(integer()) -> binary().
get_error_string(ErrorCode) when is_integer(ErrorCode) ->
erlfdb_nif:get_error(ErrorCode).
-spec clear_erlfdb_error() -> ok.
clear_erlfdb_error() ->
put(?ERLFDB_ERROR, undefined).
-spec do_transaction(transaction(), function()) -> any().
do_transaction(?IS_TX = Tx, UserFun) ->
try
Ret = UserFun(Tx),
case is_read_only(Tx) andalso not has_watches(Tx) of
true -> ok;
false -> wait(commit(Tx), [{timeout, infinity}])
end,
Ret
catch
error:{erlfdb_error, Code} ->
put(?ERLFDB_ERROR, Code),
wait(on_error(Tx, Code), [{timeout, infinity}]),
do_transaction(Tx, UserFun)
end.
-spec fold_range_int(transaction(), key(), key(), function(), any(), [fold_option()]) -> any().
fold_range_int(?IS_TX = Tx, StartKey, EndKey, Fun, Acc, Options) ->
St = options_to_fold_st(StartKey, EndKey, Options),
fold_range_int(Tx, St, Fun, Acc).
-spec fold_range_int(transaction(), fold_future() | #fold_st{}, function(), any()) -> any().
fold_range_int(Tx, #fold_st{} = St, Fun, Acc) ->
RangeFuture = fold_range_future_int(Tx, St),
fold_range_int(Tx, RangeFuture, Fun, Acc);
fold_range_int(Tx, ?IS_FOLD_FUTURE = FI, Fun, Acc) ->
{fold_info, St, Future} = FI,
#fold_st{
start_key = StartKey,
end_key = EndKey,
limit = Limit,
iteration = Iteration,
reverse = Reverse
} = St,
{RawRows, Count, HasMore} = wait(Future),
Count = length(RawRows),
% If our limit is within the current set of
% rows we need to truncate the list
Rows =
if
Limit == 0 orelse Limit > Count -> RawRows;
true -> lists:sublist(RawRows, Limit)
end,
% Invoke our callback to update the accumulator
NewAcc =
if
Rows == [] -> Acc;
true -> Fun(Rows, Acc)
end,
% Determine if we have more rows to iterate
Recurse = (Rows /= []) and (Limit == 0 orelse Limit > Count) and HasMore,
if
not Recurse ->
NewAcc;
true ->
LastKey = fold_range_last_key_int(Rows, St),
{NewStartKey, NewEndKey} =
case Reverse /= 0 of
true ->
{StartKey, erlfdb_key:first_greater_or_equal(LastKey)};
false ->
{erlfdb_key:first_greater_than(LastKey), EndKey}
end,
NewSt = St#fold_st{
start_key = NewStartKey,
end_key = NewEndKey,
limit =
if
Limit == 0 -> 0;
true -> Limit - Count
end,
iteration = Iteration + 1
},
fold_range_int(Tx, NewSt, Fun, NewAcc)
end.
-spec fold_range_last_key_int(list(), #fold_st{}) -> key().
fold_range_last_key_int(Rows, #fold_st{mapper = undefined}) ->
element(1, lists:last(Rows));
fold_range_last_key_int(Rows, #fold_st{mapper = _Mapper}) ->
element(1, element(1, lists:last(Rows))).
-spec fold_range_future_int(transaction(), #fold_st{}) -> fold_future().
fold_range_future_int(?IS_TX = Tx, #fold_st{mapper = undefined} = St) ->
#fold_st{
start_key = StartKey,
end_key = EndKey,
limit = Limit,
target_bytes = TargetBytes,
streaming_mode = StreamingMode,
iteration = Iteration,
snapshot = Snapshot,
reverse = Reverse
} = St,
Future = erlfdb_nif:transaction_get_range(
Tx,
StartKey,
EndKey,
Limit,
TargetBytes,
StreamingMode,
Iteration,
Snapshot,
Reverse
),
{fold_info, St, Future};
fold_range_future_int(?IS_TX = Tx, #fold_st{} = St) ->
#fold_st{
start_key = StartKey,
end_key = EndKey,
mapper = Mapper,
limit = Limit,
target_bytes = TargetBytes,
streaming_mode = StreamingMode,
iteration = Iteration,
snapshot = Snapshot,
reverse = Reverse
} = St,
Future = erlfdb_nif:transaction_get_mapped_range(
Tx,
StartKey,
EndKey,
Mapper,
Limit,
TargetBytes,
StreamingMode,
Iteration,
Snapshot,
Reverse
),
{fold_info, St, Future}.
-spec options_to_fold_st(key(), key(), [fold_option()]) -> #fold_st{}.
options_to_fold_st(StartKey, EndKey, Options) ->
Reverse =
case erlfdb_util:get(Options, reverse, false) of
true -> 1;
false -> 0;
I when is_integer(I) -> I
end,
#fold_st{
mapper = erlfdb_util:get(Options, mapper),
start_key = erlfdb_key:to_selector(StartKey),
end_key = erlfdb_key:to_selector(EndKey),
limit = erlfdb_util:get(Options, limit, 0),
target_bytes = erlfdb_util:get(Options, target_bytes, 0),
streaming_mode = erlfdb_util:get(Options, streaming_mode, want_all),
iteration = erlfdb_util:get(Options, iteration, 1),
snapshot = erlfdb_util:get(Options, snapshot, false),
reverse = Reverse
}.
-spec flush_future_message(future()) -> ok.
flush_future_message(?IS_FUTURE = Future) ->
erlfdb_nif:future_silence(Future),
{erlfdb_future, MsgRef, _Res} = Future,
receive
{MsgRef, ready} -> ok
after 0 -> ok
end.