lib/runbox/deduplicator.ex

defmodule Runbox.Deduplicator do
  @moduledoc """
  Classifies incoming messages as new, old or duplicate by comparing them to
  messages that were already seen.

  Only the messages carrying the greatest timestamp observed so far are kept as
  "seen". An incoming message whose timestamp is lower than that is "old". An
  incoming message with exactly that timestamp is looked up among the "seen"
  messages and is a "duplicate" if it is one of them. Every other message is
  "new".
  """

  defstruct [:extract_timestamp, :last_timestamp, seen: MapSet.new()]

  @opaque t() :: %__MODULE__{
            extract_timestamp: (any() -> non_neg_integer()),
            last_timestamp: non_neg_integer() | nil,
            seen: MapSet.t()
          }

  @doc """
  Builds a deduplicator, pre-seeding it with already-processed messages.

  `stream` is any enumerable (it may be empty) yielding messages ordered by
  descending timestamp, newest first. Every leading message that shares the
  newest timestamp goes into the initial "seen" set. Enumeration stops at the
  first message with an older timestamp, so the remainder of `stream` is never
  consumed.

  `extract_timestamp` is a one-argument function returning a message's
  timestamp.

  Raises `RuntimeError` if a message newer than the current newest timestamp
  shows up before enumeration stops.
  """
  @spec new(Enumerable.t(), (any() -> non_neg_integer())) :: t() | no_return()
  def new(stream, extract_timestamp) do
    empty = %__MODULE__{extract_timestamp: extract_timestamp}
    Enum.reduce_while(stream, empty, &init/2)
  end

  # Reducer for `Enum.reduce_while/3`: absorbs one message of the seeding stream.
  @spec init(any(), t()) :: {:cont, t()} | {:halt, t()} | no_return()
  defp init(
         msg,
         %__MODULE__{extract_timestamp: extract, last_timestamp: newest, seen: seen} = state
       ) do
    timestamp = extract.(msg)

    case position(timestamp, newest) do
      :older ->
        # Reached messages older than the newest batch; nothing more to seed.
        {:halt, state}

      :newer ->
        # The stream was promised to be newest-first; a newer one breaks that.
        raise "non-monotonic timestamps"

      # :unset (very first message) or :same (still in the newest batch)
      _first_or_same ->
        {:cont, %__MODULE__{state | last_timestamp: timestamp, seen: MapSet.put(seen, msg)}}
    end
  end

  @doc """
  Classifies `msg` against the deduplicator state.

  Returns `{msg_condition, deduplicator_state}`, where `msg_condition` is one of:

    * `:new` - the message has not been seen yet; it is recorded in the
      returned state.

    * `:old` - the message is older than the newest messages already seen;
      the state is returned unchanged.

    * `:duplicate` - the message equals one of the newest messages already
      seen; the state is returned unchanged.
  """
  @spec deduplicate(any(), t()) :: {msg_condition, t()}
        when msg_condition: :new | :old | :duplicate
  def deduplicate(
        msg,
        %__MODULE__{extract_timestamp: extract, last_timestamp: newest, seen: seen} = state
      ) do
    timestamp = extract.(msg)

    case position(timestamp, newest) do
      :older ->
        {:old, state}

      :same ->
        case MapSet.member?(seen, msg) do
          true -> {:duplicate, state}
          false -> {:new, %__MODULE__{state | seen: MapSet.put(seen, msg)}}
        end

      # :unset (nothing recorded yet) or :newer: start a fresh "seen" batch.
      _unset_or_newer ->
        {:new, %__MODULE__{state | last_timestamp: timestamp, seen: MapSet.new([msg])}}
    end
  end

  # Places `timestamp` relative to the newest timestamp recorded so far, which
  # is `nil` while nothing has been recorded.
  @spec position(any(), any()) :: :unset | :newer | :older | :same
  defp position(_timestamp, nil), do: :unset
  defp position(timestamp, newest) when timestamp > newest, do: :newer
  defp position(timestamp, newest) when timestamp < newest, do: :older
  defp position(_timestamp, _newest), do: :same
end