src/cuckoo_filter.erl

%%%-------------------------------------------------------------------
%% @doc High-performance, concurrent, and mutable Cuckoo Filter
%% implemented using atomics for Erlang and Elixir.
%% @end
%%%-------------------------------------------------------------------

-module(cuckoo_filter).

-export([
    new/1, new/2,
    add/2, add/3,
    contains/2,
    delete/2,
    hash/2,
    add_hash/2, add_hash/3,
    contains_hash/2,
    contains_fingerprint/3,
    delete_hash/2,
    capacity/1,
    size/1,
    whereis/1,
    export/1,
    import/2
]).

-include("cuckoo_filter.hrl").

%% A filter instance; the record itself is declared in cuckoo_filter.hrl.
-type cuckoo_filter() :: #cuckoo_filter{}.
%% Hash value of an element, as accepted by the `*_hash' functions.
-type hash() :: non_neg_integer().

-export_type([cuckoo_filter/0, hash/0]).

%% Name under which a filter is stored in persistent_term.
-type filter_name() :: term().
%% Value stored in a bucket slot; 0 is never a fingerprint because it marks
%% an empty slot.
-type fingerprint() :: pos_integer().
%% Zero-based bucket index.
-type index() :: non_neg_integer().
%% Option list accepted by new/2.
-type options() :: [option()].
-type option() ::
    {name, filter_name()}
    | {fingerprint_size, 4 | 8 | 16 | 32 | 64}
    | {bucket_size, pos_integer()}
    | {max_evictions, non_neg_integer()}
    | {hash_function, fun((binary()) -> hash())}.

%% Default configurations
%% Fingerprint width in bits used when the `fingerprint_size' option is absent.
-define(DEFAULT_FINGERPRINT_SIZE, 16).
%% Slots per bucket used when the `bucket_size' option is absent.
-define(DEFAULT_BUCKET_SIZE, 4).
%% Relocation limit used when the `max_evictions' option is absent.
-define(DEFAULT_EVICTIONS, 100).

%% Fetches a named filter from persistent_term, where it is stored under the
%% key {?MODULE, FilterName}.
-define(FILTER(FilterName), persistent_term:get({?MODULE, FilterName})).
%% Evaluates Expr on behalf of Filter.
%% When the filter has max_evictions = 0, Expr is evaluated directly and no
%% lock is taken. Otherwise the filter's spinlock is acquired first and
%% released in the `after' section, so it is released even when Expr raises.
%% The expansion binds Lock, LockId and Res in the calling function, so the
%% surrounding code must not use those variable names.
-define(TRANSACTION(Filter, Expr),
    case Filter of
        #cuckoo_filter{max_evictions = 0} ->
            Expr;
        #cuckoo_filter{lock = Lock} ->
            LockId = spinlock:acquire(Lock),
            try Expr of
                Res -> Res
            after
                spinlock:release(Lock, LockId)
            end
    end
).

%% @equiv new(Capacity, [])
-spec new(pos_integer()) -> cuckoo_filter().
new(Capacity) ->
    %% An empty option list means every option takes its default value and the
    %% filter is not registered under a name.
    new(Capacity, []).

%% @doc Creates a new cuckoo filter with the given capacity and options
%%
%% Note that the actual capacity might be higher than the given capacity,
%% because internally number of buckets in a cuckoo filter must be a power of 2.
%%
%% Possible options are:
%% <ul>
%% <li>`{name, Name}'
%% <p>If you give it a name, created filter instance will be stored in
%% persistent_term, and later you can access the filter by its name.</p>
%% </li>
%% <li>`{fingerprint_size, FingerprintSize}'
%% <p>FingerprintSize can be one of 4, 8, 16, 32, and 64 bits. Default fingerprint
%% size is 16 bits.</p>
%% </li>
%% <li>`{bucket_size, BucketSize}'
%% <p>BucketSize must be a positive integer, and the default value is 4.
%% Higher bucket sizes can reduce insert time considerably since it reduces the number
%% of relocations of existing fingerprints in occupied buckets, but it increases the
%% lookup time, and false positive rate.</p>
%% </li>
%% <li>`{max_evictions, MaxEvictions}'
%% <p>MaxEvictions indicates the maximum number of relocation attempts before giving up
%% when inserting a new element. It must be a non-negative integer, and the
%% default value is 100.</p>
%% </li>
%% <li>`{hash_function, HashFunction}'
%% <p>You can specify a custom hash function of arity 1 that returns the hash
%% value as a non-negative integer. It is called with the element exactly as it
%% was passed to the filter, and also with integer fingerprints when the
%% alternative bucket of a fingerprint is computed. When this option is absent,
%% a default is chosen from the number of hash bits the filter needs (bucket
%% index bits plus fingerprint bits): `erlang:phash2' is used for up to 32 bits
%% and xxh3 hash functions are used beyond that, in which case you need to
%% manually add `xxh3' to the list of your project dependencies.</p>
%% </li>
%% </ul>
%%
%% Raises `badarg' when the capacity or any of the option values is invalid.
-spec new(pos_integer(), options()) -> cuckoo_filter().
new(Capacity, Opts) ->
    is_integer(Capacity) andalso Capacity > 0 orelse error(badarg),
    BucketSize = proplists:get_value(bucket_size, Opts, ?DEFAULT_BUCKET_SIZE),
    is_integer(BucketSize) andalso BucketSize > 0 orelse error(badarg),
    FingerprintSize = proplists:get_value(fingerprint_size, Opts, ?DEFAULT_FINGERPRINT_SIZE),
    lists:member(FingerprintSize, [4, 8, 16, 32, 64]) orelse error(badarg),
    MaxEvictions = proplists:get_value(max_evictions, Opts, ?DEFAULT_EVICTIONS),
    is_integer(MaxEvictions) andalso MaxEvictions >= 0 orelse error(badarg),
    %% The bucket count is rounded up to a power of two so that a bucket index
    %% is exactly IndexBits wide.
    IndexBits = ceil(math:log2(ceil(Capacity / BucketSize))),
    NumBuckets = 1 bsl IndexBits,
    %% A hash is split into IndexBits of bucket index followed by
    %% FingerprintSize low bits, so MaxHash masks IndexBits + FingerprintSize bits.
    MaxHash = NumBuckets bsl FingerprintSize - 1,
    %% The default hash function must produce at least as many bits as MaxHash
    %% covers; otherwise the high bucket indexes can never be selected.
    HashFunction = proplists:get_value(
        hash_function,
        Opts,
        default_hash_function(IndexBits + FingerprintSize)
    ),
    is_function(HashFunction, 1) orelse error(badarg),
    %% Bucket data starts at atomics index 4 and index 3 holds the item count,
    %% hence the 3 extra slots. The first two slots are presumably used by the
    %% spinlock, which shares the same atomics reference.
    AtomicsSize = ceil(NumBuckets * BucketSize * FingerprintSize / 64) + 3,
    AtomicsRef = atomics:new(AtomicsSize, [{signed, false}]),
    Filter = #cuckoo_filter{
        buckets = AtomicsRef,
        lock = spinlock:new([
            {atomics_ref, AtomicsRef},
            {max_retry, max(100_000, MaxEvictions * 10_000)}
        ]),
        num_buckets = NumBuckets,
        max_hash = MaxHash,
        bucket_size = BucketSize,
        fingerprint_size = FingerprintSize,
        max_evictions = MaxEvictions,
        hash_function = HashFunction
    },
    case proplists:get_value(name, Opts) of
        undefined ->
            Filter;
        FilterName ->
            persistent_term:put({?MODULE, FilterName}, Filter),
            Filter
    end.

%% @equiv add(Filter, Element, false)
-spec add(cuckoo_filter() | filter_name(), term()) -> ok | {error, not_enough_space}.
add(#cuckoo_filter{} = Filter, Element) ->
    %% Hash with the filter's own hash function, then insert without forcing.
    Hash = hash(Filter, Element),
    add_hash(Filter, Hash, false);
add(Name, Element) ->
    %% Any other first argument is treated as the name of a stored filter.
    Filter = ?FILTER(Name),
    add(Filter, Element).

%% @equiv add_hash(Filter, Hash, false)
-spec add_hash(cuckoo_filter() | filter_name(), hash()) -> ok | {error, not_enough_space}.
add_hash(#cuckoo_filter{} = Filter, Hash) ->
    add_hash(Filter, Hash, false);
add_hash(Name, Hash) ->
    %% Any other first argument is treated as the name of a stored filter.
    Filter = ?FILTER(Name),
    add_hash(Filter, Hash, false).

%% @doc Adds an element to a filter.
%%
%% A regular insertion (`Forced' is `false') returns `ok' on success, and
%% `{error, not_enough_space}' when no slot could be found for the element,
%% which happens when the filter is nearing its capacity.
%%
%% A forced insertion (`Forced' is `true') never returns `not_enough_space'.
%% When both candidate buckets are full, a randomly chosen fingerprint is
%% overwritten instead, and it is reported to the caller as
%% `{ok, {Index, Fingerprint}}'. No fingerprints are relocated and no lock is
%% acquired in this mode.
%%
%% Forced insertion can only be used with `max_evictions' set to 0.
-spec add
    (cuckoo_filter() | filter_name(), term(), false) ->
        ok | {error, not_enough_space};
    (cuckoo_filter() | filter_name(), term(), true) ->
        ok | {ok, Evicted :: {index(), fingerprint()}}.
add(#cuckoo_filter{} = Filter, Element, Forced) ->
    Hash = hash(Filter, Element),
    add_hash(Filter, Hash, Forced);
add(Name, Element, Forced) ->
    %% Any other first argument is treated as the name of a stored filter.
    Filter = ?FILTER(Name),
    add(Filter, Element, Forced).

%% @doc Adds an element to a filter by its hash.
%%
%% Behaves like {@link add/3}, but takes the hash of the element rather than
%% the element itself.
-spec add_hash
    (cuckoo_filter() | filter_name(), hash(), false) ->
        ok | {error, not_enough_space};
    (cuckoo_filter() | filter_name(), hash(), true) ->
        ok | {ok, Evicted :: {index(), fingerprint()}}.
add_hash(#cuckoo_filter{fingerprint_size = FpBits} = Filter, Hash, Forced) ->
    {Primary, Fp} = index_and_fingerprint(Hash, FpBits),
    %% Fast path: a free slot in the primary bucket.
    case insert_at_index(Filter, Primary, Fp) of
        ok -> ok;
        {error, full} -> add_hash_alternate(Filter, Primary, Fp, Forced)
    end;
add_hash(Name, Hash, Forced) ->
    Filter = ?FILTER(Name),
    add_hash(Filter, Hash, Forced).

%% Tries the alternative bucket; when that one is full as well, starts either
%% a forced insertion or an eviction chain from one of the two buckets picked
%% at random.
add_hash_alternate(
    #cuckoo_filter{num_buckets = NumBuckets, hash_function = HashFunction} = Filter,
    Primary,
    Fp,
    Forced
) ->
    Alternate = alt_index(Primary, Fp, NumBuckets, HashFunction),
    case insert_at_index(Filter, Alternate, Fp) of
        ok ->
            ok;
        {error, full} ->
            Seed = rand:mwc59_seed(),
            %% The top bit of the 32-bit random value selects the bucket.
            Choice = rand:mwc59_value32(Seed) bsr 31 + 1,
            Start = element(Choice, {Primary, Alternate}),
            case Forced of
                true -> force_insert(Filter, Start, Fp, Seed);
                false -> try_insert(Filter, Start, Fp, Seed)
            end
    end.

%% @doc Checks if an element is in a filter.
-spec contains(cuckoo_filter() | filter_name(), term()) -> boolean().
contains(#cuckoo_filter{} = Filter, Element) ->
    Hash = hash(Filter, Element),
    contains_hash(Filter, Hash);
contains(Name, Element) ->
    %% Any other first argument is treated as the name of a stored filter.
    Filter = ?FILTER(Name),
    contains(Filter, Element).

%% @doc Checks if an element is in a filter by its hash.
-spec contains_hash(cuckoo_filter() | filter_name(), hash()) -> boolean().
contains_hash(#cuckoo_filter{fingerprint_size = FpBits} = Filter, Hash) ->
    %% Split the hash into its bucket index and fingerprint, then look both
    %% candidate buckets up.
    {Primary, Fp} = index_and_fingerprint(Hash, FpBits),
    contains_fingerprint(Filter, Primary, Fp);
contains_hash(Name, Hash) ->
    Filter = ?FILTER(Name),
    contains_hash(Filter, Hash).

%% @doc Checks whether a filter contains a fingerprint at the given index or its alternative index.
-spec contains_fingerprint(cuckoo_filter() | filter_name(), index(), fingerprint()) -> boolean().
contains_fingerprint(
    #cuckoo_filter{
        max_evictions = MaxEvictions,
        num_buckets = NumBuckets,
        hash_function = HashFunction
    } = Filter,
    Index,
    Fingerprint
) ->
    %% With max_evictions = 0 one extra bucket (the alternative) is checked;
    %% otherwise two extra checks are made, which revisits the first bucket.
    Retry =
        if
            MaxEvictions =:= 0 -> 1;
            true -> 2
        end,
    %% The alternative index is wrapped in a fun so it is only computed when
    %% the first bucket does not hold the fingerprint.
    LazyAltIndex = fun() -> alt_index(Index, Fingerprint, NumBuckets, HashFunction) end,
    contains_fingerprint(Filter, {Index, LazyAltIndex}, Fingerprint, Retry);
contains_fingerprint(Name, Index, Fingerprint) ->
    Filter = ?FILTER(Name),
    contains_fingerprint(Filter, Index, Fingerprint).

%% @doc Deletes an element from a filter.
%%
%% Returns `ok' when a matching fingerprint was removed, and
%% `{error, not_found}' when the element could not be found in the filter.
%%
%% <b>Note:</b> Only delete items that are known to have been inserted
%% before. Deleting an item that was never inserted might remove another
%% element in case of a hash collision.
-spec delete(cuckoo_filter() | filter_name(), term()) -> ok | {error, not_found}.
delete(#cuckoo_filter{} = Filter, Element) ->
    Hash = hash(Filter, Element),
    delete_hash(Filter, Hash);
delete(Name, Element) ->
    %% Any other first argument is treated as the name of a stored filter.
    Filter = ?FILTER(Name),
    delete(Filter, Element).

%% @doc Deletes an element from a filter by its hash.
%%
%% Behaves like {@link delete/2}, but takes the hash of the element rather
%% than the element itself.
-spec delete_hash(cuckoo_filter() | filter_name(), hash()) -> ok | {error, not_found}.
delete_hash(
    #cuckoo_filter{
        num_buckets = NumBuckets,
        hash_function = HashFunction,
        fingerprint_size = FpBits
    } = Filter,
    Hash
) ->
    {Primary, Fp} = index_and_fingerprint(Hash, FpBits),
    %% The primary bucket is searched first; the alternative bucket is only
    %% computed and searched when the fingerprint is not there.
    ?TRANSACTION(
        Filter,
        case delete_fingerprint(Filter, Fp, Primary) of
            {error, not_found} ->
                delete_fingerprint(
                    Filter,
                    Fp,
                    alt_index(Primary, Fp, NumBuckets, HashFunction)
                );
            ok ->
                ok
        end
    );
delete_hash(Name, Hash) ->
    Filter = ?FILTER(Name),
    delete_hash(Filter, Hash).

%% @doc Returns the hash value of an element using the hash function of the filter.
%%
%% The raw hash is masked with the filter's `max_hash', so the result never
%% exceeds it.
-spec hash(cuckoo_filter() | filter_name(), term()) -> hash().
hash(#cuckoo_filter{hash_function = HashFunction, max_hash = Mask}, Element) ->
    RawHash = HashFunction(Element),
    RawHash band Mask;
hash(Name, Element) ->
    Filter = ?FILTER(Name),
    hash(Filter, Element).

%% @doc Returns the maximum capacity of a filter.
%%
%% This is the total number of slots: buckets times slots per bucket.
-spec capacity(cuckoo_filter() | filter_name()) -> pos_integer().
capacity(#cuckoo_filter{num_buckets = NumBuckets, bucket_size = SlotsPerBucket}) ->
    SlotsPerBucket * NumBuckets;
capacity(Name) ->
    Filter = ?FILTER(Name),
    capacity(Filter).

%% @doc Returns number of items in a filter.
-spec size(cuckoo_filter() | filter_name()) -> non_neg_integer().
size(#cuckoo_filter{buckets = Buckets}) ->
    %% The item counter is kept in slot 3 of the atomics array.
    atomics:get(Buckets, 3);
size(FilterName) ->
    %% Called fully qualified, presumably so the call targets this module's
    %% size/1 rather than the auto-imported BIF of the same name.
    ?MODULE:size(?FILTER(FilterName)).

%% @doc Retrieves a cuckoo_filter from persistent_term by its name.
-spec whereis(filter_name()) -> cuckoo_filter().
whereis(FilterName) ->
    %% The lookup is a plain persistent_term:get/1 with no default value, so
    %% an unknown name raises instead of returning a value.
    ?FILTER(FilterName).

%% @doc Exports a filter as a binary.
%%
%% The binary holds the item counter followed by the bucket data, and can be
%% loaded into a filter again using the {@link import/2} function.
-spec export(cuckoo_filter() | filter_name()) -> binary().
export(#cuckoo_filter{buckets = Atomics} = Filter) ->
    #{size := LastIndex} = atomics:info(Atomics),
    %% Slots 3..LastIndex are dumped as 64-bit big-endian words; the slots
    %% before the item counter are left out.
    ?TRANSACTION(
        Filter,
        iolist_to_binary([
            <<(atomics:get(Atomics, Pos)):64/big-unsigned-integer>>
         || Pos <- lists:seq(3, LastIndex)
        ])
    );
export(Name) ->
    Filter = ?FILTER(Name),
    export(Filter).

%% @doc Imports filter data from a binary created using {@link export/1}.
%%
%% Returns `ok' if the import was successful, and returns
%% `{error, invalid_data_size}' if the size of the given binary does not match
%% the size of the filter.
-spec import(cuckoo_filter() | filter_name(), binary()) -> ok | {error, invalid_data_size}.
import(#cuckoo_filter{buckets = Atomics} = Filter, Data) when is_binary(Data) ->
    #{size := AtomicsSize} = atomics:info(Atomics),
    %% Slots 3..AtomicsSize are imported, i.e. AtomicsSize - 2 words of 8 bytes.
    ExpectedBytes = (AtomicsSize - 2) * 8,
    if
        byte_size(Data) =:= ExpectedBytes ->
            ?TRANSACTION(Filter, import(Atomics, Data, 3));
        true ->
            {error, invalid_data_size}
    end;
import(Name, Data) ->
    Filter = ?FILTER(Name),
    import(Filter, Data).

%%%-------------------------------------------------------------------
%% Internal functions
%%%-------------------------------------------------------------------

-dialyzer({nowarn_function, default_hash_function/1}).

%% Picks a hash function for the given size threshold:
%% above 64 uses xxh3:hash128, above 32 uses xxh3:hash64 (both on the
%% external term format of the element), above 27 uses erlang:phash2/2 with a
%% 2^32 range, and anything else uses erlang:phash2/1.
default_hash_function(Size) ->
    if
        Size > 64 ->
            fun(Term) -> xxh3:hash128(term_to_binary(Term)) end;
        Size > 32 ->
            fun(Term) -> xxh3:hash64(term_to_binary(Term)) end;
        Size > 27 ->
            fun(Term) -> erlang:phash2(Term, 1 bsl 32) end;
        true ->
            fun erlang:phash2/1
    end.

%% Writes Data into the atomics array as consecutive 64-bit big-endian words,
%% starting at slot Index, and returns ok once the data is exhausted.
import(Atomics, <<Word:64/big-unsigned-integer, Remaining/binary>>, Pos) ->
    atomics:put(Atomics, Pos, Word),
    import(Atomics, Remaining, Pos + 1);
import(_Atomics, <<>>, _Pos) ->
    ok.

%% Looks for Fingerprint in the bucket at Index. While Retry is positive and
%% the fingerprint is not found, the search continues in the bucket produced
%% by calling AltIndex, with the roles of the two buckets swapped.
contains_fingerprint(Filter, {Index, AltIndex}, Fingerprint, Retry) ->
    Slots = read_bucket(Index, Filter),
    case lists:member(Fingerprint, Slots) of
        true ->
            true;
        false when Retry > 0 ->
            %% The current index becomes the (already known) alternative of
            %% the next round.
            Swapped = {AltIndex(), fun() -> Index end},
            contains_fingerprint(Filter, Swapped, Fingerprint, Retry - 1);
        false ->
            false
    end.

%% Clears the first slot of the bucket at Index that holds Fingerprint.
%% Returns ok, or {error, not_found} when the bucket does not contain it.
delete_fingerprint(Filter, Fingerprint, Index) ->
    Slots = read_bucket(Index, Filter),
    case find_in_bucket(Slots, Fingerprint) of
        {error, not_found} ->
            {error, not_found};
        {ok, Pos} ->
            case update_in_bucket(Filter, Index, Pos, Fingerprint, 0) of
                {error, outdated} ->
                    %% The slot changed after the bucket was read; start over
                    %% with a fresh read.
                    delete_fingerprint(Filter, Fingerprint, Index);
                ok ->
                    ok
            end
    end.

%% Splits a hash into {BucketIndex, Fingerprint}.
%% The index is the hash shifted right by the fingerprint width. The
%% fingerprint is mapped into 1..2^FingerprintSize - 1, so it is never 0.
index_and_fingerprint(Hash, FingerprintSize) ->
    MaxFingerprint = (1 bsl FingerprintSize) - 1,
    {Hash bsr FingerprintSize, (Hash rem MaxFingerprint) + 1}.

%% Computes the other candidate bucket of a fingerprint: the hash of the
%% fingerprint, reduced to the bucket index range, is XOR-ed into Index.
alt_index(Index, Fingerprint, NumBuckets, HashFunction) ->
    FingerprintHash = HashFunction(Fingerprint),
    Mask = NumBuckets - 1,
    Index bxor (FingerprintHash band Mask).

%% Maps a bit offset within the bucket data to the atomics slot holding it:
%% 64 bits per slot, with the first data slot at position 4.
atomic_index(BitIndex) ->
    4 + (BitIndex bsr 6).

%% Stores Fingerprint in the first empty slot of the bucket at Index.
%% Returns ok, or {error, full} when the bucket has no empty slot.
insert_at_index(Filter, Index, Fingerprint) ->
    Slots = read_bucket(Index, Filter),
    case find_in_bucket(Slots, 0) of
        {error, not_found} ->
            {error, full};
        {ok, FreePos} ->
            case update_in_bucket(Filter, Index, FreePos, 0, Fingerprint) of
                {error, outdated} ->
                    %% The slot was taken after the bucket was read; retry
                    %% with a fresh read.
                    insert_at_index(Filter, Index, Fingerprint);
                ok ->
                    ok
            end
    end.

%% Writes Fingerprint into a randomly chosen slot of the bucket at Index.
%% Returns ok when the slot was empty, and {ok, {Index, Previous}} when it
%% held a fingerprint. A slot that already holds Fingerprint is left as is and
%% reported as the evicted one. Raises badarg unless max_evictions is 0.
force_insert(
    #cuckoo_filter{bucket_size = SlotsPerBucket, max_evictions = MaxEvictions} = Filter,
    Index,
    Fingerprint,
    RState
) ->
    MaxEvictions == 0 orelse error(badarg),
    Slots = read_bucket(Index, Filter),
    NextRState = rand:mwc59(RState),
    %% Scale the 32-bit random value into 0..SlotsPerBucket - 1.
    Pos = (rand:mwc59_value32(NextRState) * SlotsPerBucket) bsr 32,
    case lists:nth(Pos + 1, Slots) of
        0 ->
            case update_in_bucket(Filter, Index, Pos, 0, Fingerprint) of
                {error, outdated} -> force_insert(Filter, Index, Fingerprint, NextRState);
                ok -> ok
            end;
        Fingerprint ->
            {ok, {Index, Fingerprint}};
        Previous ->
            case update_in_bucket(Filter, Index, Pos, Previous, Fingerprint) of
                {error, outdated} -> force_insert(Filter, Index, Fingerprint, NextRState);
                ok -> {ok, {Index, Previous}}
            end
    end.

%% Runs an eviction-based insertion inside the filter's transaction, starting
%% with no planned evictions and a retry budget equal to the bucket size.
try_insert(#cuckoo_filter{bucket_size = RetryBudget} = Filter, Index, Fingerprint, RState) ->
    NoEvictions = #{},
    ?TRANSACTION(
        Filter,
        try_insert(Filter, Index, Fingerprint, RState, NoEvictions, [], RetryBudget)
    ).

%% Plans a chain of evictions that makes room for Fingerprint, and applies it
%% once an empty slot is reached.
%%
%% Evictions maps {Index, SubIndex} to the fingerprint that will take over
%% that slot; EvictionsList holds the same keys, most recent first. Retry is
%% the number of remaining attempts to pick a usable slot in the current
%% bucket. No slot is overwritten until an empty slot has been found.
%%
%% Returns ok, or {error, not_enough_space}.

%% The retry budget for the current bucket is used up.
try_insert(_Filter, _Index, _Fingerprint, _RState, _Evictions, _EvictionsList, 0) ->
    {error, not_enough_space};
%% More evictions have been planned than max_evictions allows.
try_insert(
    #cuckoo_filter{max_evictions = MaxEvictions},
    _Index,
    _Fingerprint,
    _RState,
    Evictions,
    _EvictionsList,
    _Retry
) when map_size(Evictions) > MaxEvictions ->
    {error, not_enough_space};
try_insert(
    Filter = #cuckoo_filter{
        bucket_size = BucketSize,
        num_buckets = NumBuckets,
        hash_function = HashFunction
    },
    Index,
    Fingerprint,
    RState,
    Evictions,
    EvictionsList,
    Retry
) ->
    Bucket = read_bucket(Index, Filter),
    case find_in_bucket(Bucket, 0) of
        {ok, SubIndex} ->
            %% End of the chain: store the last displaced fingerprint in the
            %% empty slot, then write the planned evictions back.
            case update_in_bucket(Filter, Index, SubIndex, 0, Fingerprint) of
                ok ->
                    persist_evictions(Filter, Evictions, EvictionsList, Fingerprint);
                {error, outdated} ->
                    try_insert(Filter, Index, Fingerprint, RState, Evictions, EvictionsList, Retry)
            end;
        {error, not_found} ->
            %% The bucket is full: pick a random slot as eviction candidate.
            UpdatedRState = rand:mwc59(RState),
            SubIndex = (rand:mwc59_value32(UpdatedRState) * BucketSize) bsr 32,
            Evicted = lists:nth(SubIndex + 1, Bucket),
            Key = {Index, SubIndex},
            if
                %% Swapping equal fingerprints achieves nothing, and a slot
                %% that is already part of the plan cannot be evicted again;
                %% pick another slot, spending one retry.
                Fingerprint == Evicted orelse is_map_key(Key, Evictions) ->
                    try_insert(
                        Filter,
                        Index,
                        Fingerprint,
                        UpdatedRState,
                        Evictions,
                        EvictionsList,
                        Retry - 1
                    );
                true ->
                    %% Record that Fingerprint takes this slot and continue
                    %% with the displaced fingerprint at its alternative
                    %% bucket, with a fresh retry budget.
                    AltIndex = alt_index(Index, Evicted, NumBuckets, HashFunction),
                    try_insert(
                        Filter,
                        AltIndex,
                        Evicted,
                        UpdatedRState,
                        Evictions#{Key => Fingerprint},
                        [Key | EvictionsList],
                        BucketSize
                    )
            end
    end.

%% Applies the planned evictions, most recent first. Each listed slot
%% currently holds the fingerprint that was just re-homed (Evicted), and it is
%% overwritten with the fingerprint recorded for that slot in Evictions. Every
%% write is asserted to succeed.
persist_evictions(Filter, Evictions, [{Index, SubIndex} = Slot | Older], Evicted) ->
    Replacement = maps:get(Slot, Evictions),
    ok = update_in_bucket(Filter, Index, SubIndex, Evicted, Replacement),
    %% The fingerprint written here is the one the next (older) slot still
    %% holds.
    persist_evictions(Filter, Evictions, Older, Replacement);
persist_evictions(_Filter, _Evictions, [], _Evicted) ->
    ok.

%% Returns {ok, Position} for the first slot of Bucket equal to Fingerprint
%% (positions are zero-based), or {error, not_found}.
find_in_bucket(Bucket, Fingerprint) ->
    find_in_bucket(Bucket, Fingerprint, 0).

%% Same search, counting positions from the given starting value.
find_in_bucket([Fingerprint | _Rest], Fingerprint, Pos) ->
    {ok, Pos};
find_in_bucket([_Other | Rest], Fingerprint, Pos) ->
    find_in_bucket(Rest, Fingerprint, Pos + 1);
find_in_bucket([], _Fingerprint, _Pos) ->
    {error, not_found}.

%% Reads the bucket at Index and returns its slots as a list of integers,
%% one per fingerprint slot, in slot order.
read_bucket(
    Index,
    #cuckoo_filter{
        buckets = Atomics,
        bucket_size = SlotsPerBucket,
        fingerprint_size = FpBits
    }
) ->
    BucketBits = SlotsPerBucket * FpBits,
    FirstBit = Index * BucketBits,
    LastBit = FirstBit + BucketBits - 1,
    %% Fetch every 64-bit word the bucket touches and join them big-endian.
    Words = <<
        <<(atomics:get(Atomics, Pos)):64/big-unsigned-integer>>
     || Pos <- lists:seq(atomic_index(FirstBit), atomic_index(LastBit))
    >>,
    %% Drop the bits that precede the bucket inside its first word.
    Offset = FirstBit band 63,
    <<_:Offset, BucketData:BucketBits/bitstring, _/bitstring>> = Words,
    [Slot || <<Slot:FpBits/big-unsigned-integer>> <= BucketData].

%% Replaces OldValue with Value in slot SubIndex of the bucket at Index.
%%
%% Returns {error, outdated} when the slot does not hold OldValue. Otherwise
%% the 64-bit word containing the slot is swapped with compare_exchange; when
%% the exchange fails the word is read again and the update is retried.
%% After a successful exchange the item counter in atomics slot 3 is
%% incremented when an empty slot (0) was filled, decremented when a slot was
%% cleared (set to 0), and left alone when one fingerprint replaced another.
update_in_bucket(
    Filter = #cuckoo_filter{
        buckets = Buckets,
        bucket_size = BucketSize,
        fingerprint_size = FingerprintSize
    },
    Index,
    SubIndex,
    OldValue,
    Value
) ->
    BitIndex = Index * BucketSize * FingerprintSize + SubIndex * FingerprintSize,
    AtomicIndex = atomic_index(BitIndex),
    %% Offset of the slot inside its 64-bit word.
    SkipBits = BitIndex band 63,
    AtomicValue = atomics:get(Buckets, AtomicIndex),
    case <<AtomicValue:64/big-unsigned-integer>> of
        %% OldValue is already bound, so this clause only matches while the
        %% slot still holds the expected value.
        <<Prefix:SkipBits/bitstring, OldValue:FingerprintSize/big-unsigned-integer,
            Suffix/bitstring>> ->
            <<UpdatedAtomic:64/big-unsigned-integer>> =
                <<Prefix/bitstring, Value:FingerprintSize/big-unsigned-integer, Suffix/bitstring>>,
            case atomics:compare_exchange(Buckets, AtomicIndex, AtomicValue, UpdatedAtomic) of
                ok ->
                    case {OldValue, Value} of
                        {0, _} -> atomics:add(Buckets, 3, 1);
                        {_, 0} -> atomics:sub(Buckets, 3, 1);
                        {_, _} -> ok
                    end;
                _ ->
                    %% The word changed since it was read (possibly in another
                    %% slot of the same word); re-read and try again.
                    update_in_bucket(Filter, Index, SubIndex, OldValue, Value)
            end;
        _ ->
            {error, outdated}
    end.