lib/broadway/caller_acknowledger.ex

defmodule Broadway.CallerAcknowledger do
  @moduledoc """
  A simple acknowledger that sends a message back to a caller.

  If you want to use this acknowledger in messages produced by your
  `Broadway.Producer`, you can get its configuration by calling
  the `init/0` function. For example, you can use it in
  `Broadway.test_message/3`:

      some_ref = make_ref()

      Broadway.test_message(
        MyPipeline,
        "some data",
        acknowledger: Broadway.CallerAcknowledger.init({self(), some_ref}, :ignored)
      )

  The first parameter is a tuple with the PID to receive the messages
  and a unique identifier (usually a reference). Such unique identifier
  is then included in the messages sent to the PID. The second parameter,
  which is per message, is ignored.

  It sends a message in the format:

      {:ack, ref, successful_messages, failed_messages}

  If `Broadway.Message.configure_ack/2` is called on a message that
  uses this acknowledger, then the following message is sent:

      {:configure, ref, options}

  """

  @behaviour Broadway.Acknowledger

  @doc """
  Returns the acknowledger metadata.

  See the module documentation.
  """
  @spec init({pid, ref :: term}, ignored_term :: term) :: Broadway.Message.acknowledger()
  def init({pid, ref} = _pid_and_ref, ignored_term) when is_pid(pid) do
    {__MODULE__, {pid, ref}, ignored_term}
  end

  @impl true
  def ack({pid, ref}, successful, failed) do
    send(pid, {:ack, ref, successful, failed})
  end

  @impl true
  def configure({pid, ref}, ack_data, options) do
    send(pid, {:configure, ref, options})
    {:ok, ack_data}
  end
end