lib/elsa/consumer.ex

defmodule Elsa.Consumer do
  @moduledoc """
  Public api to consumer acks asynchronously for simple consumers
  """

  import Elsa.ElsaSupervisor, only: [registry: 1]

  alias Elsa.ElsaRegistry

  @type offset :: non_neg_integer()

  @doc """
  Retrieves a process id of a consumer registered to the
  Elsa Registry and performs a consume-ack of the messages
  ready to be read off the topic.
  """
  @spec ack(Elsa.connection(), Elsa.topic(), Elsa.partition(), offset()) :: :ok
  def ack(connection, topic, partition, offset) do
    pid = get_consumer(connection, topic, partition)
    :brod_consumer.ack(pid, offset)
  end

  defp get_consumer(connection, topic, partition) do
    consumer_name = :"consumer_#{topic}_#{partition}"
    ElsaRegistry.whereis_name({registry(connection), consumer_name})
  end
end