defmodule Talan.BloomFilter do
@moduledoc """
Bloom filter implementation with **concurrent accessibility**,
powered by [:atomics](http://erlang.org/doc/man/atomics.html) module.
"A Bloom filter is a space-efficient probabilistic data structure,
conceived by Burton Howard Bloom in 1970,
that is used to test whether an element is a member of a set"
[Bloom filter on Wikipedia](https://en.wikipedia.org/wiki/Bloom_filter#CITEREFZhiwangJungangJian2010)
## Credit
Partly inspired by [Blex](https://github.com/gyson/blex)
## Features
* Fixed size Bloom filter
* Concurrent reads & writes
* Custom & default hash functions
* Merge multiple Bloom filters into one
* Intersect multiple Bloom filters into one
* Estimate number of unique elements
* Estimate current false positive probability
## Examples
iex> b = Talan.BloomFilter.new(1000)
iex> b |> Talan.BloomFilter.put("Barna")
iex> b |> Talan.BloomFilter.member?("Barna")
true
iex> b |> Talan.BloomFilter.member?("Kovacs")
false
"""
alias __MODULE__, as: BF
@enforce_keys [:atomics_ref, :filter_length, :hash_functions]
defstruct [:atomics_ref, :filter_length, :hash_functions]
@type t :: %__MODULE__{
atomics_ref: reference,
filter_length: non_neg_integer,
hash_functions: list
}
@doc """
Returns a new `%Talan.BloomFilter{}` for the desired `cardinality`.
`cardinality` is the expected number of unique items. Duplicated items
can be infinite.
## Options
* `:false_positive_probability` - a float, defaults to 0.01
* `:hash_functions` - a list of hash functions, defaults to randomly seeded murmur
## Examples
iex> bloom_filter = Talan.BloomFilter.new(1_000_000)
iex> bloom_filter |> Talan.BloomFilter.put("Barna Kovacs")
:ok
"""
@spec new(pos_integer, list) :: t
def new(cardinality, options \\ []) when is_integer(cardinality) and cardinality > 0 do
false_positive_probability = options |> Keyword.get(:false_positive_probability, 0.01)
hash_functions = options |> Keyword.get(:hash_functions, [])
if false_positive_probability <= 0 || false_positive_probability >= 1 do
raise ArgumentError, """
false_positive_probability must be a float between 0 and 1.
E.g. 0.01
Got: #{inspect(false_positive_probability)}
"""
end
hash_functions =
case hash_functions do
[] ->
hash_count = required_hash_function_count(false_positive_probability)
Talan.seed_n_murmur_hash_fun(hash_count)
list ->
list
end
filter_length = required_filter_length(cardinality, false_positive_probability)
atomics_arity = max(div(filter_length, 64), 1)
atomics_ref = :atomics.new(atomics_arity, signed: false)
%BF{
atomics_ref: atomics_ref,
filter_length: atomics_arity * 64,
hash_functions: hash_functions
}
end
@doc """
Returns count of required hash functions for the
given `false_positive_probability`.
[Wikipedia - Bloom filter - Optimal number of hash functions](https://en.wikipedia.org/wiki/Bloom_filter#Optimal_number_of_hash_functions)
## Examples
iex> Talan.BloomFilter.required_hash_function_count(0.01)
7
iex> Talan.BloomFilter.required_hash_function_count(0.001)
10
iex> Talan.BloomFilter.required_hash_function_count(0.0001)
14
"""
@spec required_hash_function_count(float) :: non_neg_integer
def required_hash_function_count(false_positive_probability) do
-:math.log2(false_positive_probability)
|> Float.ceil()
|> round()
end
@doc """
Returns the required bit count given
* `cardinality` - Number of unique elements that will be inserted
* `false_positive_probability` - Desired false positive probability of membership
[Wikipedia - Bloom filter - Optimal number of hash functions](https://en.wikipedia.org/wiki/Bloom_filter#Optimal_number_of_hash_functions)
## Examples
iex> Talan.BloomFilter.required_filter_length(10_000, 0.01)
95851
"""
@spec required_filter_length(non_neg_integer, float) :: non_neg_integer
def required_filter_length(cardinality, false_positive_probability)
when is_integer(cardinality) and cardinality > 0 and false_positive_probability > 0 and
false_positive_probability < 1 do
import :math, only: [log: 1, pow: 2]
Float.ceil(-cardinality * log(false_positive_probability) / pow(log(2), 2))
|> round()
end
@doc """
Puts `term` into `bloom_filter` a `%Talan.BloomFilter{}` struct.
After this the `member?` function will always return `true`
for the membership of `term`.
Returns `:ok`.
## Examples
iex> b = Talan.BloomFilter.new(1000)
iex> b |> Talan.BloomFilter.put("Chris McCord")
:ok
iex> b |> Talan.BloomFilter.put("Jose Valim")
:ok
"""
@spec put(t, any) :: :ok
def put(%BF{} = bloom_filter, term) do
hashes = hash_term(bloom_filter, term)
put_hashes(bloom_filter, hashes)
:ok
end
@doc false
def put_hashes(%BF{atomics_ref: atomics_ref}, hashes) when is_list(hashes) do
hashes
|> Enum.each(fn hash ->
Abit.set_bit_at(atomics_ref, hash, 1)
end)
end
@doc """
Checks for membership of `term` in `bloom_filter`.
Returns `false` if not a member. (definitely not member)
Returns `true` if maybe a member. (possibly member)
## Examples
iex> b = Talan.BloomFilter.new(1000)
iex> b |> Talan.BloomFilter.member?("Barna Kovacs")
false
iex> b |> Talan.BloomFilter.put("Barna Kovacs")
iex> b |> Talan.BloomFilter.member?("Barna Kovacs")
true
"""
@spec member?(t, any) :: boolean
def member?(%BF{atomics_ref: atomics_ref} = bloom_filter, term) do
hashes = hash_term(bloom_filter, term)
do_member?(atomics_ref, hashes)
end
defp do_member?(atomics_ref, [hash | hashes_tl]) do
if Abit.bit_at(atomics_ref, hash) == 1 do
do_member?(atomics_ref, hashes_tl)
else
false
end
end
defp do_member?(_, []), do: true
@doc """
Hashes `term` with all `hash_functions` of `%Talan.BloomFilter{}`.
Returns a list of hashed values.
## Examples
b = Talan.BloomFilter.new(1000)
Talan.BloomFilter.hash_term(b, :any_term_can_be_hashed)
[9386, 8954, 8645, 4068, 5445, 6914, 2844]
"""
@spec hash_term(t, any) :: list(integer)
def hash_term(%BF{filter_length: filter_length, hash_functions: hash_functions}, term) do
do_hash_term(filter_length, hash_functions, term)
end
defp do_hash_term(filter_length, hash_functions, term, acc \\ [])
defp do_hash_term(filter_length, [hash_fun | tl], term, acc) do
new_acc = [rem(hash_fun.(term), filter_length) | acc]
do_hash_term(filter_length, tl, term, new_acc)
end
defp do_hash_term(_, [], _, acc), do: acc
@doc """
Merge multiple `%Talan.BloomFilter{}` structs's atomics into one new struct.
Note: To work correctly filters with identical size & hash functions must be used.
Returns a new `%Talan.BloomFilter{}` struct which set bits are the merged set bits of
the bloom filters in the `list`.
## Examples
iex> hash_functions = Talan.seed_n_murmur_hash_fun(7)
iex> b1 = Talan.BloomFilter.new(1000, hash_functions: hash_functions)
iex> b1 |> Talan.BloomFilter.put("GitHub")
iex> b2 = Talan.BloomFilter.new(1000, hash_functions: hash_functions)
iex> b2 |> Talan.BloomFilter.put("Octocat")
:ok
iex> b3 = Talan.BloomFilter.merge([b1, b2])
iex> b3 |> Talan.BloomFilter.member?("GitHub")
true
iex> b3 |> Talan.BloomFilter.member?("Octocat")
true
"""
@spec merge(nonempty_list(t)) :: t
def merge(list = [first = %BF{atomics_ref: first_atomics_ref} | _tl]) do
%{size: size} = :atomics.info(first_atomics_ref)
new_atomics_ref = :atomics.new(size, signed: false)
list
|> Enum.reduce(
new_atomics_ref,
fn %BF{atomics_ref: atomics_ref}, acc ->
Abit.merge(acc, atomics_ref)
end
)
%BF{first | atomics_ref: new_atomics_ref}
end
@doc """
Intersection of `%Talan.BloomFilter{}` structs's atomics into one new struct.
Note: To work correctly filters with identical size & hash functions must be used.
Returns a new `%BloomFilter{}` struct which set bits are the intersection
the bloom filters in the `list`.
## Examples
iex> hash_functions = Talan.seed_n_murmur_hash_fun(7)
iex> b1 = Talan.BloomFilter.new(1000, hash_functions: hash_functions)
iex> b1 |> Talan.BloomFilter.put("GitHub")
iex> b2 = Talan.BloomFilter.new(1000, hash_functions: hash_functions)
iex> b2 |> Talan.BloomFilter.put("GitHub")
iex> b2 |> Talan.BloomFilter.put("Octocat")
:ok
iex> b3 = Talan.BloomFilter.intersection([b1, b2])
iex> b3 |> Talan.BloomFilter.member?("GitHub")
true
iex> b3 |> Talan.BloomFilter.member?("Octocat")
false
"""
@spec intersection(nonempty_list(t)) :: t
def intersection(list = [first = %BF{atomics_ref: first_atomics_ref} | _tl]) do
%{size: size} = :atomics.info(first_atomics_ref)
new_atomics_ref = :atomics.new(size, signed: false)
Abit.merge(new_atomics_ref, first_atomics_ref)
list
|> Enum.reduce(
new_atomics_ref,
fn %BF{atomics_ref: atomics_ref}, acc ->
Abit.intersect(acc, atomics_ref)
end
)
%BF{first | atomics_ref: new_atomics_ref}
end
@doc """
Returns an non negative integer representing the
estimated cardinality count of unique elements in the filter.
## Examples
iex> b = Talan.BloomFilter.new(1000)
iex> b |> Talan.BloomFilter.cardinality()
0
iex> b |> Talan.BloomFilter.put("Barna")
iex> b |> Talan.BloomFilter.cardinality()
1
iex> b |> Talan.BloomFilter.put("Barna")
iex> b |> Talan.BloomFilter.cardinality()
1
iex> b |> Talan.BloomFilter.put("Kovacs")
iex> b |> Talan.BloomFilter.cardinality()
2
"""
@spec cardinality(t) :: non_neg_integer
def cardinality(%BF{
atomics_ref: atomics_ref,
filter_length: filter_length,
hash_functions: hash_functions
}) do
set_bits_count = Abit.set_bits_count(atomics_ref)
hash_function_count = length(hash_functions)
cond do
set_bits_count < hash_function_count ->
0
set_bits_count == hash_function_count ->
1
filter_length == set_bits_count ->
round(filter_length / hash_function_count)
true ->
est = :math.log(filter_length - set_bits_count) - :math.log(filter_length)
round(filter_length * -est / hash_function_count)
end
end
@doc """
Returns a float representing current estimated
false positivity probability.
## Examples
iex> b = Talan.BloomFilter.new(1000)
iex> b |> Talan.BloomFilter.false_positive_probability()
0.0 # fpp zero when bloom filter is empty
iex> b |> Talan.BloomFilter.put("Barna") # fpp increase
iex> b |> Talan.BloomFilter.put("Kovacs")
iex> fpp = b |> Talan.BloomFilter.false_positive_probability()
iex> fpp > 0 && fpp < 1
true
"""
@spec false_positive_probability(t()) :: float()
def false_positive_probability(%BF{
atomics_ref: atomics_ref,
filter_length: filter_length,
hash_functions: hash_functions
}) do
bits_not_set_count = filter_length - Abit.set_bits_count(atomics_ref)
hash_function_count = length(hash_functions)
:math.pow(1 - bits_not_set_count / filter_length, hash_function_count)
end
@doc """
Returns a map representing the bit state of the `atomics_ref`.
Use this for debugging purposes.
## Examples
iex> b = Talan.BloomFilter.new(1000)
iex> b |> Talan.BloomFilter.bits_info()
%{total_bits: 9536, set_bits_count: 0, set_ratio: 0.0}
"""
@spec bits_info(t()) :: map()
def bits_info(%BF{atomics_ref: atomics_ref, filter_length: filter_length}) do
set_bits_count = Abit.set_bits_count(atomics_ref)
%{
total_bits: filter_length,
set_bits_count: set_bits_count,
set_ratio: set_bits_count / filter_length
}
end
end