defmodule Kvasir.Kafka.OffsetTracker do
  @moduledoc """
  Tracks per-partition Kafka offsets for a fixed set of topics.

  On start, the tracker groups the topics' partitions by leader broker. It then sends one
  ListOffsets request per leader with `timestamp: -2`, which the Kafka protocol defines as
  "earliest available offset". The results are refreshed every 5 minutes.

  The results live in the GenServer state and are mirrored into the named, read-concurrent
  ETS table `:kafka_offsets`. This lets `offset/1` and `offset/2` be answered by any process
  without a round-trip through the tracker.

  The tracker process owns the ETS table, so the table disappears if the tracker exits.
  Refreshes run in a process linked to the tracker, so a failing refresh (for example, a
  broker that cannot be connected to) crashes the tracker rather than being ignored.
  """
  use GenServer

  alias Kvasir.Offset

  # Named ETS table owned by the tracker process.
  @table :kafka_offsets
  # Timeout (ms) for a single ListOffsets request to one partition leader.
  @offset_timeout 25_000
  # Timeout (ms) for awaiting one leader's task: the request timeout plus 5s of headroom,
  # presumably to cover connecting/closing.
  @leader_timeout @offset_timeout + 5_000
  # Interval (ms) between offset refreshes.
  @offset_refresh 5 * 60_000

  @doc """
  Returns the cached offsets for `topic`, or `nil` if the topic is not tracked.

  Reads the ETS table directly and does not block on the tracker process.
  Raises `ArgumentError` if the table does not exist, i.e. the tracker is not running.
  """
  def offset(topic) do
    case :ets.lookup(@table, topic) do
      [{^topic, offset}] -> offset
      _ -> nil
    end
  end

  @doc """
  Returns the cached offset of `partition` in `topic`, or `nil` if the topic is not tracked.

  The per-partition lookup is delegated to `Kvasir.Offset.get/2`.
  """
  def offset(topic, partition) do
    if o = offset(topic), do: Offset.get(o, partition)
  end

  @doc """
  Child specification that starts the tracker registered under its module name.

    * `topics` - map whose keys are the topic names to track (values are not read here)
    * `servers` - passed unchanged to `:brod_utils.get_metadata/3` and
      `:kpro.connect_partition_leader/4` (presumably the bootstrap endpoints)
    * `config` - passed unchanged to those same calls
  """
  def child_spec(topics, servers, config) do
    %{
      id: __MODULE__,
      start: {GenServer, :start_link, [__MODULE__, {topics, servers, config}, [name: __MODULE__]]}
    }
  end

  @impl GenServer
  def init({topics, servers, config}) do
    # Protected: only this process writes; any process may read.
    :ets.new(@table, [:set, :protected, :named_table, read_concurrency: true])
    # The initial fetch runs synchronously, so the tracker only starts once offsets are known.
    offsets = fetch_offsets(topics, servers, config)
    Process.send_after(self(), :refresh, @offset_refresh)
    sync_ets(offsets)
    {:ok, {offsets, topics, servers, config}}
  end

  # Served from the GenServer state; this module exposes no client wrapper for these calls.
  @impl GenServer
  def handle_call({:offset, topic}, _from, state = {offsets, _, _, _}) do
    case Map.fetch(offsets, topic) do
      {:ok, o} -> {:reply, o, state}
      _ -> {:reply, nil, state}
    end
  end

  def handle_call({:offset, topic, partition}, _from, state = {offsets, _, _, _}) do
    case Map.fetch(offsets, topic) do
      {:ok, o} -> {:reply, Offset.get(o, partition), state}
      _ -> {:reply, nil, state}
    end
  end

  @impl GenServer
  def handle_info(:refresh, state = {_, topics, servers, config}) do
    # Fetch off the GenServer process so calls are not blocked while brokers respond.
    # The process is linked: if the fetch crashes, the tracker crashes with it.
    pid = self()
    spawn_link(fn -> send(pid, {:refresh, fetch_offsets(topics, servers, config)}) end)
    {:noreply, state}
  end

  def handle_info({:refresh, offsets}, {_, topics, servers, config}) do
    sync_ets(offsets)
    # The next refresh is scheduled only after this one has completed.
    Process.send_after(self(), :refresh, @offset_refresh)
    {:noreply, {offsets, topics, servers, config}}
  end

  # Makes the ETS table mirror `offsets` exactly.
  # New entries are inserted before stale topics are deleted, so a topic that is still
  # tracked never transiently reads as `nil`. Topics absent from `offsets` are removed,
  # keeping `offset/1` consistent with the `handle_call` replies served from state.
  defp sync_ets(offsets) do
    :ets.insert(@table, Map.to_list(offsets))

    @table
    |> :ets.select([{{:"$1", :_}, [], [:"$1"]}])
    |> Enum.reject(&Map.has_key?(offsets, &1))
    |> Enum.each(&:ets.delete(@table, &1))
  end

  # Returns %{topic => Offset} for all partitions of the given topics.
  # Queries each leader broker concurrently. A topic whose partitions are spread over
  # several leaders gets one partial Offset per leader, combined with `Offset.merge/2`.
  # `Task.await/2` exits if a leader does not answer within @leader_timeout.
  defp fetch_offsets(topics, servers, config) do
    topics
    |> fetch_metadata(servers, config)
    |> leaders()
    |> Enum.map(fn {_leader, leader_topics} ->
      Task.async(fn -> fetch_leader_offsets(leader_topics, servers, config) end)
    end)
    |> Enum.map(&Task.await(&1, @leader_timeout))
    |> Enum.reduce(%{}, fn data, acc ->
      Map.merge(acc, data, fn _, v1, v2 -> Offset.merge(v1, v2) end)
    end)
  end

  # Fetches topic metadata, normalizing the old and new brod response shapes.
  # Any other result (e.g. an error tuple) raises CaseClauseError.
  defp fetch_metadata(topics, servers, config) do
    case :brod_utils.get_metadata(servers, Map.keys(topics), config) do
      # Old brod
      {:ok, %{topic_metadata: metadata}} -> metadata
      # New brod
      {:ok, %{topics: metadata}} -> metadata
    end
  end

  # Sends one ListOffsets request to the broker leading `leader_topics`
  # (%{topic => [partition]}) and returns %{topic => Offset}.
  # A failed or unexpected response yields an empty map for this leader. A failed
  # connection raises a MatchError.
  defp fetch_leader_offsets(leader_topics, servers, config) do
    topic_partitions = [{topic, [partition | _]} | _] = Map.to_list(leader_topics)

    # Any partition led by this broker identifies the broker to connect to.
    {:ok, conn} = :kpro.connect_partition_leader(servers, config, topic, partition)

    req =
      :kpro_req_lib.make(:list_offsets, 1,
        # -1: request issued by a client, not a broker replica.
        replica_id: -1,
        # NOTE(review): isolation_level is only part of ListOffsets v2+; with v1 it is
        # presumably ignored by the encoder — confirm.
        isolation_level: :read_committed,
        topics:
          Enum.map(topic_partitions, fn {t, ps} ->
            [topic: t, partitions: Enum.map(ps, &[partition: &1, timestamp: -2])]
          end)
      )

    responses =
      try do
        case :kpro.request_sync(conn, req, @offset_timeout) do
          {:ok, {:kpro_rsp, _, :list_offsets, _, %{responses: r}}} -> r
          _ -> []
        end
      after
        # Close the connection even if the request raises.
        :kpro.close_connection(conn)
      end

    Map.new(responses, fn %{topic: t, partition_responses: prs} ->
      {t, Enum.reduce(prs, Offset.create(), &Offset.set(&2, &1.partition, &1.offset))}
    end)
  end

  # Groups partitions by leader broker: %{leader_id => %{topic => [partition]}}.
  # Accepts both the old and new brod metadata shapes.
  defp leaders(metadata, acc \\ %{})
  defp leaders([], acc), do: acc

  # Old brod
  defp leaders([%{topic: t, partition_metadata: pm} | metadata], acc) do
    acc =
      Enum.reduce(pm, acc, fn %{leader: l, partition: p}, a ->
        put_partition(a, l, t, p)
      end)

    leaders(metadata, acc)
  end

  # New brod
  defp leaders([%{name: t, partitions: pm} | metadata], acc) do
    acc =
      Enum.reduce(pm, acc, fn %{leader_id: l, partition_index: p}, a ->
        put_partition(a, l, t, p)
      end)

    leaders(metadata, acc)
  end

  # Records that `leader` leads `partition` of `topic`; partitions are prepended.
  defp put_partition(acc, leader, topic, partition) do
    Map.update(acc, leader, %{topic => [partition]}, fn by_topic ->
      Map.update(by_topic, topic, [partition], &[partition | &1])
    end)
  end
end