# lib/off_broadway_amqp10/acknowledger.ex

defmodule OffBroadwayAmqp10.Acknowledger do
  @moduledoc """
  Broadway acknowledger for AMQP messages.

  Successful messages are accepted through the client module stored in the
  acknowledger state (`amqp_state.client_module`). Failed messages are left
  untouched: nothing is sent to the client for them.
  """
  alias Broadway.Acknowledger

  @behaviour Acknowledger

  @doc """
  Acknowledges a batch of messages.

  For every message in `successful`, the `ack_data` stored in its
  `:acknowledger` tuple is passed to
  `amqp_state.client_module.accept_msg(amqp_state, ack_data)`.
  Messages in `failed` are ignored.

  Returns `:ok`. Raises `MatchError` if `accept_msg/2` returns anything other
  than `:ok`.
  """
  @impl Acknowledger
  @spec ack(term(), [Broadway.Message.t()], [Broadway.Message.t()]) :: :ok
  def ack(amqp_state, successful, failed) do
    ack_messages(successful, amqp_state, :successful)
    ack_messages(failed, amqp_state, :failed)
  end

  # Accepts each successful message via the configured client module.
  defp ack_messages(successful, amqp_state, :successful) do
    Enum.each(successful, fn
      %{acknowledger: {_module, _ack_ref, ack_data}} ->
        # Assert on :ok so an unexpected client reply crashes loudly instead
        # of being silently dropped.
        :ok = amqp_state.client_module.accept_msg(amqp_state, ack_data)

      _other ->
        # Entries without a 3-element acknowledger tuple are skipped.
        :ok
    end)
  end

  # Failed messages are intentionally a no-op here.
  # NOTE(review): no reject/release call is issued for failed messages —
  # confirm this is the desired behaviour for the client in use.
  defp ack_messages(_failed, _amqp_state, :failed) do
    :ok
  end
end