defmodule Kvasir.Offset do
  @moduledoc ~S"""
  Offsets tracked per partition.

  An offset is a struct holding a `partitions` map of `partition => value`,
  where each value is a non-negative integer or one of the atoms
  `:earliest` / `:latest`.
  """

  @type partition_offset :: :earliest | :latest | non_neg_integer
  @type t :: %__MODULE__{}

  defstruct partitions: %{}

  @doc ~S"""
  Create an empty offset (no partitions).
  """
  def create, do: %__MODULE__{}

  @doc ~S"""
  Create an offset from a single value or from a partitions collection.

  An integer is stored as the offset of partition `0`;
  anything else is stored as-is as the `partitions` of the new offset.
  """
  def create(offset) when is_integer(offset), do: %__MODULE__{partitions: %{0 => offset}}
  def create(partitions), do: %__MODULE__{partitions: partitions}

  @doc ~S"""
  Create an offset holding a single `partition` set to `offset`.
  """
  def create(partition, offset), do: %__MODULE__{partitions: %{partition => offset}}

  @doc ~S"""
  Split an offset into smaller offsets.

  - `chunks` of `0` (the default) returns a list with one offset per partition.
  - `chunks` of `1` returns the given offset itself, *not* wrapped in a list.
  - any other positive number returns a list of offsets,
    each holding at most `ceil(partition_count / chunks)` partitions.

  An offset without partitions yields `[]` for every `chunks` value other than `1`.
  """
  def chunk(offset, chunks \\ 0)
  def chunk(offset, 0), do: chunk_every(offset, 1)
  def chunk(offset, 1), do: offset

  def chunk(offset = %__MODULE__{partitions: p}, chunks) when is_number(chunks) and chunks > 0 do
    # An empty offset would produce a chunk size of 0, which `Enum.chunk_every/2`
    # rejects; clamp to 1 so an empty offset simply yields no chunks.
    every = Kernel.max(trunc(Float.ceil(map_size(p) / chunks)), 1)

    chunk_every(offset, every)
  end

  @doc ~S"""
  Split an offset into a list of offsets holding at most `every` partitions each.
  """
  def chunk_every(%__MODULE__{partitions: partitions}, every) do
    partitions
    |> Enum.chunk_every(every)
    |> Enum.map(&%__MODULE__{partitions: Map.new(&1)})
  end

  @doc ~S"""
  Check whether an offset is empty.
  """
  @spec empty?(t) :: boolean
  def empty?(%__MODULE__{partitions: p}), do: p == %{}

  @doc ~S"""
  Get the offset value stored for `partition`, or `0` when none is stored.
  """
  def get(%__MODULE__{partitions: p}, partition), do: p[partition] || 0

  @doc ~S"""
  Set the value of `partition` to `offset`, replacing any existing value.
  """
  def set(o = %__MODULE__{partitions: p}, partition, offset),
    do: %{o | partitions: Map.put(p, partition, offset)}

  @doc ~S"""
  Merge two offsets; for partitions present in both, the value of the second offset wins.
  """
  def merge(o = %__MODULE__{partitions: p1}, %__MODULE__{partitions: p2}) do
    %{o | partitions: Map.merge(p1, p2)}
  end

  @doc ~S"""
  Increment the value of every partition by one.

  Every value is passed through `+ 1`, so an offset containing
  `:earliest` or `:latest` raises an `ArithmeticError`.
  """
  def bump(o = %__MODULE__{partitions: p}),
    do: %{o | partitions: Map.new(p, fn {k, v} -> {k, v + 1} end)}

  @doc ~S"""
  Merge the second offset into the first, bumping merged values.

  When either argument is `nil`, the other offset is returned bumped (see `bump/1`).

  Otherwise, for every partition of the second offset with value `v`:

  - the partition is missing from the first offset: `v + 1` is stored;
  - the first offset already holds exactly `v`: `v` is kept;
  - the first offset holds any other value: `v + 1` is stored.

  Partitions only present in the first offset are left untouched.
  """
  def bump_merge(o = %__MODULE__{}, nil), do: bump(o)
  def bump_merge(nil, o = %__MODULE__{}), do: bump(o)

  def bump_merge(o = %__MODULE__{partitions: p0}, %__MODULE__{partitions: p1}) do
    p =
      Enum.reduce(p1, p0, fn {k, v}, acc ->
        Map.update(acc, k, v + 1, &if(&1 == v, do: v, else: v + 1))
      end)

    %{o | partitions: p}
  end

  @doc ~S"""
  Compare two offsets (or two plain partition maps, or two single offset values).

  For offsets / maps, every partition of the first argument is compared with the
  same partition of the second argument. Partitions missing from the second
  argument are ignored, as are partitions only present in the second argument.

  Returns:

  - `:eq` when all compared partitions are equal;
  - `:lt` when at least one partition is lower and none is higher;
  - `:gt` when at least one partition is higher and none is lower;
  - `:mixed` when some partitions are lower and others are higher.

  For two single values the result is `:eq`, `:lt`, `:gt`,
  or `nil` when either value is `nil`.

  ## Examples

  ```elixir
  iex> a = create(%{0 => 24, 1 => 37})
  iex> b = create(%{0 => 24, 1 => 37})
  iex> compare(a, b)
  :eq
  ```

  ```elixir
  iex> a = create(%{0 => 12, 1 => 23})
  iex> b = create(%{0 => 24, 1 => 37})
  iex> compare(a, b)
  :lt
  ```

  ```elixir
  iex> a = create(%{0 => 24, 1 => 37})
  iex> b = create(%{0 => 12, 1 => 23})
  iex> compare(a, b)
  :gt
  ```

  Equal partitions do not influence the outcome of the other partitions:

  ```elixir
  iex> a = create(%{0 => 12, 1 => 37})
  iex> b = create(%{0 => 24, 1 => 37})
  iex> compare(a, b)
  :lt
  ```

  Partitions pointing in different directions:

  ```elixir
  iex> a = create(%{0 => 12, 1 => 40})
  iex> b = create(%{0 => 24, 1 => 37})
  iex> compare(a, b)
  :mixed
  ```

  Using `:earliest`:

  ```elixir
  iex> a = create(%{0 => :earliest})
  iex> b = create(%{0 => 0})
  iex> compare(a, b)
  :lt
  iex> a = create(%{0 => :earliest})
  iex> b = create(%{0 => :earliest})
  iex> compare(a, b)
  :eq
  ```
  """
  def compare(partition_a, partition_b) when is_map(partition_a) and is_map(partition_b) do
    p0 = partitions(partition_a)
    p1 = partitions(partition_b)

    Enum.reduce_while(p0, :eq, fn {k, v0}, acc ->
      case compare_value(v0, p1[k]) do
        # Partition unknown on the other side: nothing to compare.
        nil -> {:cont, acc}
        # An equal partition never changes the verdict. Keeping `acc` (instead of
        # only accepting `:eq` while `acc == :eq`) makes the result independent
        # of the order in which partitions are visited.
        :eq -> {:cont, acc}
        :lt when acc != :gt -> {:cont, :lt}
        :gt when acc != :lt -> {:cont, :gt}
        # One partition is lower while another is higher.
        _ -> {:halt, :mixed}
      end
    end)
  end

  def compare(offset_a, offset_b), do: compare_value(offset_a, offset_b)

  # Accept both an offset struct and a bare partitions map.
  defp partitions(%__MODULE__{partitions: p}), do: p
  defp partitions(p), do: p

  # Compare two single offset values.
  # `nil` on either side means "not comparable"; `:earliest` sorts before
  # everything else and `:latest` after everything else.
  defp compare_value(nil, _), do: nil
  defp compare_value(_, nil), do: nil
  defp compare_value(same, same), do: :eq
  defp compare_value(:earliest, _), do: :lt
  defp compare_value(_, :earliest), do: :gt
  defp compare_value(:latest, _), do: :gt
  defp compare_value(_, :latest), do: :lt
  defp compare_value(a, b), do: if(a < b, do: :lt, else: :gt)

  @doc ~S"""
  Compare two partition offset values and return the lowest. (earliest)

  Returns `nil` when either value is `nil`.
  When both values are equal the first one is returned.
  """
  @spec min(partition_offset | nil, partition_offset | nil) :: partition_offset | nil
  def min(offset_a, offset_b) do
    case compare_value(offset_a, offset_b) do
      nil -> nil
      :eq -> offset_a
      :lt -> offset_a
      :gt -> offset_b
    end
  end

  defimpl Jason.Encoder, for: __MODULE__ do
    alias Jason.Encoder.Map

    # An offset is encoded as its bare partitions map.
    def encode(%{partitions: p}, opts), do: Map.encode(p, opts)
  end

  defimpl Inspect, for: __MODULE__ do
    import Inspect.Algebra

    # Renders `#Offset<partition:offset,...>` with entries sorted by partition,
    # or `#Offset<"EMPTY">` for an offset without partitions.
    def inspect(%{partitions: partitions}, opts) do
      parts = Enum.sort_by(partitions, &elem(&1, 0))

      if parts == [] do
        concat(["#Offset<", to_doc("EMPTY", opts), ">"])
      else
        # The document is accumulated in reverse (cheap prepends) and flipped once.
        prepared =
          parts
          |> Enum.intersperse(",")
          |> Enum.reduce(["#Offset<"], fn
            {p, o}, acc -> [to_doc(o, opts), ":", to_doc(p, opts) | acc]
            separator, acc -> [separator | acc]
          end)

        concat(:lists.reverse([">" | prepared]))
      end
    end
  end
end