# lib/talan/stream.ex

defmodule Talan.Stream do
  @moduledoc """
  Stream helpers that use a `Talan.BloomFilter` for membership checks.
  """

  alias Talan.BloomFilter

  @doc """
  Lazily removes duplicates from `enum`, using `bloom_filter` to remember
  which elements have already been emitted.

  The elements themselves are not stored by this function; membership is
  checked against the bloom filter only.

  Because bloom filter membership checks can report false positives, an
  element that was never emitted may occasionally be treated as a duplicate
  and dropped. The reverse does not happen: an element that has already been
  emitted is never emitted a second time.

  Each emitted element is recorded with `Talan.BloomFilter.put/2` and the
  return value of that call is discarded, so this function relies on the
  filter being updated in place.

  ## Examples

      iex> list = ["a", "b", "c", "a", "b"]
      iex> bloom_filter = Talan.BloomFilter.new(100_000, false_positive_probability: 0.001)
      iex> Talan.Stream.uniq(list, bloom_filter) |> Enum.to_list
      ["a", "b", "c"]
  """
  @spec uniq(Enumerable.t(), BloomFilter.t()) :: Enumerable.t()
  def uniq(enum, bloom_filter) do
    Stream.filter(enum, fn element -> first_sighting?(bloom_filter, element) end)
  end

  # Returns `true` when the filter does not yet report `element` as a member,
  # recording it in the filter before returning. Returns `false` (and leaves
  # the filter untouched) when the filter already reports membership.
  defp first_sighting?(bloom_filter, element) do
    # Strict `not` keeps the boolean-only contract of the membership check.
    unseen? = not BloomFilter.member?(bloom_filter, element)

    if unseen?, do: BloomFilter.put(bloom_filter, element)

    unseen?
  end
end