lib/elsa/group/acknowledger.ex

defmodule Elsa.Group.Acknowledger do
  @moduledoc """
  Handles acknowledgement of messages to the
  group coordinator to prevent the group manager
  from queuing up messages for acknowledgement
  when events such as a rebalance occur.
  """
  use GenServer
  import Elsa.ElsaSupervisor, only: [registry: 1]
  alias Elsa.ElsaRegistry
  require Logger

  @doc """
  Trigger acknowledgement of processed messages back to the cluster.
  """
  @spec ack(Elsa.connection(), Elsa.topic(), Elsa.partition(), Elsa.Group.Manager.generation_id(), integer()) :: :ok
  def ack(connection, topic, partition, generation_id, offset) do
    acknowledger = {:via, ElsaRegistry, {registry(connection), __MODULE__}}
    GenServer.cast(acknowledger, {:ack, topic, partition, generation_id, offset})
  end

  @doc """
  Trigger acknowledgement of processed messages back to the cluster.
  """
  @spec ack(Elsa.connection(), %{
          topic: Elsa.topic(),
          partition: Elsa.partition(),
          generation_id: Elsa.Group.Manager.generation_id(),
          offset: integer()
        }) :: :ok
  def ack(connection, %{topic: topic, partition: partition, generation_id: generation_id, offset: offset}) do
    ack(connection, topic, partition, generation_id, offset)
  end

  @doc """
  Sync the group generation ID back to the acknowledger state for validation.
  """
  @spec update_generation_id(GenServer.server(), Elsa.Group.Manager.generation_id()) :: :ok
  def update_generation_id(acknowledger, generation_id) do
    GenServer.cast(acknowledger, {:update_generation, generation_id})
  end

  @doc """
  Retrieve the latest offset for a topic and partition. Primarily used for reinitializing
  consumer workers to the latest unacknowledged offset after a rebalance or other disruption.
  """
  @spec get_latest_offset(GenServer.server(), Elsa.topic(), Elsa.partition()) :: Elsa.Group.Manager.begin_offset() | nil
  def get_latest_offset(acknowledger, topic, partition) do
    GenServer.call(acknowledger, {:get_latest_offset, topic, partition})
  end

  @doc """
  Update the latest offset for a topic and partition. Primarily used when a consumer worker
  gets a partition assignments with an offset, so that future calls to get_latest_offset will
  return an up-to-date result.

  Failing to set this on partition assignment can cause a rewind to occur if the worker
  restarts before acking anything.
  """
  @spec set_latest_offset(GenServer.server(), Elsa.topic(), Elsa.partition(), Elsa.Group.Manager.begin_offset()) :: :ok
  def set_latest_offset(acknowledger, topic, partition, offset) do
    GenServer.call(acknowledger, {:set_latest_offset, topic, partition, offset})
  end

  @doc """
  Instantiate an acknowledger process and register it to the Elsa registry.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts) do
    connection = Keyword.fetch!(opts, :connection)
    GenServer.start_link(__MODULE__, opts, name: {:via, ElsaRegistry, {registry(connection), __MODULE__}})
  end

  @impl GenServer
  def init(opts) do
    connection = Keyword.fetch!(opts, :connection)

    state = %{
      connection: connection,
      current_offsets: %{},
      generation_id: nil,
      group_coordinator_pid: nil
    }

    {:ok, state, {:continue, :get_coordinator}}
  end

  @impl GenServer
  @spec handle_continue(:get_coordinator, %{
          :connection => atom() | binary(),
          :group_coordinator_pid => any(),
          optional(any()) => any()
        }) ::
          {:noreply,
           %{:connection => atom() | binary(), :group_coordinator_pid => :undefined | pid(), optional(any()) => any()}}
  def handle_continue(:get_coordinator, state) do
    group_coordinator_pid = ElsaRegistry.whereis_name({registry(state.connection), :brod_group_coordinator})

    {:noreply, %{state | group_coordinator_pid: group_coordinator_pid}}
  end

  @impl GenServer
  def handle_call({:get_latest_offset, topic, partition}, _pid, %{current_offsets: offsets} = state) do
    latest_offset = Map.get(offsets, {topic, partition})

    {:reply, latest_offset, state}
  end

  @impl GenServer
  def handle_call({:set_latest_offset, topic, partition, offset}, _pid, state) do
    new_offsets =
      Map.update(state.current_offsets, {topic, partition}, offset, fn existing_offset ->
        if offset < existing_offset do
          # If this was called with an older offset, ignore it and log a warning.
          # This should never happen, but if it does this check will avoid causing a rewind.
          Logger.warn(
            "#{__MODULE__} Ignoring :set_latest_offset - \
          it was called for topic #{topic}, partition #{partition} with an offset (#{offset}) less than the existing one (#{existing_offset})."
          )

          existing_offset
        else
          offset
        end
      end)

    {:reply, :ok, %{state | current_offsets: new_offsets}}
  end

  @impl GenServer
  def handle_cast({:update_generation, generation_id}, state) do
    {:noreply, %{state | generation_id: generation_id}}
  end

  @impl GenServer
  def handle_cast({:ack, topic, partition, generation_id, offset}, state) do
    case state.generation_id == generation_id do
      true ->
        :ok = :brod_group_coordinator.ack(state.group_coordinator_pid, generation_id, topic, partition, offset)
        :ok = Elsa.Consumer.ack(state.connection, topic, partition, offset)

        new_offsets = update_offset(state.current_offsets, topic, partition, offset)
        {:noreply, %{state | current_offsets: new_offsets}}

      false ->
        Logger.warn(
          "#{__MODULE__}: Invalid generation_id #{state.generation_id} == #{generation_id}, ignoring ack - topic #{topic} partition #{partition} offset #{offset}"
        )

        {:noreply, state}
    end
  end

  defp update_offset(offsets, topic, partition, offset) do
    Map.put(offsets, {topic, partition}, offset + 1)
  end
end