lib/broadway/acknowledger.ex

defmodule Broadway.Acknowledger do
  @moduledoc """
  A behaviour used to acknowledge that the received messages
  were successfully processed or failed.

  When implementing a new connector for Broadway, you should
  implement this behaviour and consider how the technology
  you're working with handles message acknowledgement.

  The `c:ack/3` callback must be implemented in order to notify
  the origin of the data that a message can be safely removed
  after being successfully processed and published. In case of
  failed messages or messages without acknowledgement, depending
  on the technology chosen, the messages can be either moved back
  in the queue or, alternatively, moved to a *dead-letter queue*.
  """

  alias Broadway.Message

  require Logger

  @doc """
  Invoked to acknowledge successful and failed messages.

    * `ack_ref` is a term that uniquely identifies how messages
      should be grouped and sent for acknowledgement. Imagine
      you have a scenario where messages are coming from
      different producers. Broadway will use this information
      to correctly identify the acknowledger and pass it along
      with the messages so you can properly communicate with
      the source of the data for acknowledgement.

    * `successful` is the list of messages that were
      successfully processed and published.

    * `failed` is the list of messages that, for some reason,
      could not be processed or published.

  """
  @callback ack(ack_ref :: term, successful :: [Message.t()], failed :: [Message.t()]) ::
              :ok

  @doc """
  Configures the acknowledger with new `options`.

  Every acknowledger can decide how to incorporate the given `options` into its
  `ack_data`. The `ack_data` is the current acknowledger's data. The return value
  of this function is `{:ok, new_ack_data}` where `new_ack_data` is the updated
  data for the acknowledger.

  Note that `options` are different for every acknowledger, as the acknowledger
  is what specifies which options are supported. Check the documentation for the
  acknowledger you're using to see the supported options.
  """
  @callback configure(ack_ref :: term, ack_data :: term, options :: keyword) ::
              {:ok, new_ack_data :: term}

  @optional_callbacks [configure: 3]

  # Groups the given messages by `{acknowledger, ack_ref}` and invokes
  # `acknowledger.ack/3` once per group, passing the group's successful and
  # failed messages in the same relative order they were given.
  #
  # Always returns `:ok`; exceptions raised by an acknowledger propagate to
  # the caller.
  @doc false
  @spec ack_messages([Message.t()], [Message.t()]) :: :ok
  def ack_messages(successful, failed) do
    %{}
    |> group_by_acknowledger(successful, :successful)
    |> group_by_acknowledger(failed, :failed)
    |> Enum.each(&call_ack/1)
  end

  # Adds `messages` to `ackers`, a map from `{acknowledger, ack_ref}` to a
  # `{successful, failed}` tuple of lists. `key` selects which of the two lists
  # receives the messages.
  #
  # The groups are kept in this local map (rather than in the process
  # dictionary) so that nothing is left behind for a later call if grouping or
  # an acknowledger raises midway.
  #
  # Messages are prepended, so each list is in reverse order until
  # `call_ack/1` reverses it.
  defp group_by_acknowledger(ackers, messages, key) do
    Enum.reduce(messages, ackers, fn %{acknowledger: {acknowledger, ack_ref, _}} = msg, acc ->
      ack_info = {acknowledger, ack_ref}
      {successful, failed} = Map.get(acc, ack_info, {[], []})
      Map.put(acc, ack_info, put_message(key, msg, successful, failed))
    end)
  end

  # Prepends `msg` to the list selected by the first argument.
  defp put_message(:successful, msg, successful, failed), do: {[msg | successful], failed}
  defp put_message(:failed, msg, successful, failed), do: {successful, [msg | failed]}

  # Calls the acknowledger for one group, restoring the original message order.
  defp call_ack({{acknowledger, ack_ref}, {successful, failed}}) do
    acknowledger.ack(ack_ref, Enum.reverse(successful), Enum.reverse(failed))
  end

  @doc false
  # Builds a crash reason used in Logger reporting.
  #
  #   * `:throw` - the thrown value is wrapped as `{:nocatch, reason}`
  #   * `:error` - the reason is normalized into an exception struct
  #   * `:exit`  - the reason is used as is
  #
  # In every case the result is a `{reason, stacktrace}` tuple.
  @spec crash_reason(:throw | :error | :exit, term, Exception.stacktrace()) ::
          {term, Exception.stacktrace()}
  def crash_reason(:throw, reason, stack), do: {{:nocatch, reason}, stack}
  def crash_reason(:error, reason, stack), do: {Exception.normalize(:error, reason, stack), stack}
  def crash_reason(:exit, reason, stack), do: {reason, stack}

  # Used by the processor and the batcher to maybe call c:handle_failed/2
  # on failed messages.
  #
  # Returns `messages` untouched when the list is empty or when `module` does
  # not export `handle_failed/2`.
  #
  # NOTE(review): `function_exported?/3` does not load `module`; presumably the
  # caller guarantees it is already loaded - confirm.
  @doc false
  @spec maybe_handle_failed_messages([Message.t()], module, term) :: [Message.t()]
  def maybe_handle_failed_messages(messages, module, context) do
    if function_exported?(module, :handle_failed, 2) and messages != [] do
      handle_failed_messages(messages, module, context)
    else
      messages
    end
  end

  # Invokes `module.handle_failed/2` and validates what it returns.
  #
  #   * If the callback raises, throws or exits, the failure is logged and the
  #     original `messages` are returned.
  #   * If it returns a list, that list is returned; a size mismatch with the
  #     input is logged but the returned list is still used.
  #   * Any other return value is logged and ignored in favour of the original
  #     `messages`.
  defp handle_failed_messages(messages, module, context) do
    module.handle_failed(messages, context)
  catch
    kind, reason ->
      Logger.error(Exception.format(kind, reason, __STACKTRACE__),
        crash_reason: crash_reason(kind, reason, __STACKTRACE__)
      )

      messages
  else
    return_messages when is_list(return_messages) ->
      size = length(messages)
      return_size = length(return_messages)

      if return_size != size do
        Logger.error(
          "#{inspect(module)}.handle_failed/2 received #{size} messages and " <>
            "returned only #{return_size}. All messages given to handle_failed/2 " <>
            "must be returned"
        )
      end

      return_messages

    _other ->
      Logger.error(
        "#{inspect(module)}.handle_failed/2 didn't return a list of messages, " <>
          "so ignoring its return value"
      )

      messages
  end
end