defmodule Runbox.Deduplicator do
  @moduledoc """
  Classifies incoming messages as new, old or duplicate.

  The deduplicator remembers ("has seen") only the messages that carry the
  highest timestamp observed so far. Each incoming message is compared to
  that timestamp:

    * lower - the message is `:old`;
    * equal - the message is a `:duplicate` if it is one of the remembered
      messages, otherwise it is `:new` and becomes remembered too;
    * higher - the message is `:new` and becomes the only remembered message,
      replacing all messages remembered at the previous timestamp.
  """

  defstruct [:extract_timestamp, :last_timestamp, seen: MapSet.new()]

  @opaque t() :: %__MODULE__{
            extract_timestamp: (any() -> non_neg_integer()),
            last_timestamp: non_neg_integer() | nil,
            seen: MapSet.t()
          }

  @doc """
  Creates a deduplicator, optionally pre-seeded with already processed messages.

  `stream` is a (possibly empty) enumerable of already processed messages,
  ordered by descending timestamp, i.e. newest first. Every leading message
  sharing the newest timestamp is remembered as "seen". Enumeration stops at
  the first message with an older timestamp, so the remainder of `stream` is
  never consumed.

  `extract_timestamp` is a one-argument function returning the timestamp of
  a message.

  Raises a `RuntimeError` if a message with a newer timestamp than the first
  one is encountered before enumeration stops.
  """
  @spec new(Enumerable.t(), (any() -> non_neg_integer())) :: t() | no_return()
  def new(stream, extract_timestamp) do
    empty = %__MODULE__{extract_timestamp: extract_timestamp}
    Enum.reduce_while(stream, empty, &seed/2)
  end

  # Reducer used by `new/2`: remembers `msg` while it shares the newest
  # timestamp, halts on the first older message, raises on a newer one.
  @spec seed(any(), t()) :: {:cont, t()} | {:halt, t()} | no_return()
  defp seed(msg, %__MODULE__{extract_timestamp: extract, last_timestamp: last} = state) do
    timestamp = extract.(msg)

    cond do
      # The very first message fixes the newest timestamp of the stream.
      is_nil(last) or timestamp == last ->
        seen = MapSet.put(state.seen, msg)
        {:cont, %__MODULE__{state | last_timestamp: timestamp, seen: seen}}

      timestamp < last ->
        {:halt, state}

      true ->
        # The stream must be ordered newest-first; a newer message breaks
        # that contract.
        raise "non-monotonic timestamps"
    end
  end

  @doc """
  Classifies `msg` against the messages seen so far.

  Returns `{msg_condition, deduplicator}`, where `deduplicator` is the state
  to pass to the next call and `msg_condition` is one of:

    * `:new` - the message was not seen yet; it is now remembered.
    * `:old` - the message is older than the messages already seen.
    * `:duplicate` - the message is one of the newest messages already seen.
  """
  @spec deduplicate(any(), t()) :: {msg_condition, t()}
        when msg_condition: :new | :old | :duplicate
  def deduplicate(msg, %__MODULE__{extract_timestamp: extract, last_timestamp: last} = state) do
    timestamp = extract.(msg)

    cond do
      # Nothing seen yet, or a strictly newer timestamp: start a fresh "seen" set.
      is_nil(last) or timestamp > last ->
        {:new, %__MODULE__{state | last_timestamp: timestamp, seen: MapSet.new([msg])}}

      timestamp < last ->
        {:old, state}

      # From here on the timestamp equals the one of the remembered messages.
      MapSet.member?(state.seen, msg) ->
        {:duplicate, state}

      true ->
        {:new, %__MODULE__{state | seen: MapSet.put(state.seen, msg)}}
    end
  end
end