defmodule Broadway.Acknowledger do
  @moduledoc """
  A behaviour used to acknowledge that the received messages
  were successfully processed or failed.

  When implementing a new connector for Broadway, you should
  implement this behaviour and consider how the technology
  you're working with handles message acknowledgement.

  The `c:ack/3` callback must be implemented in order to notify
  the origin of the data that a message can be safely removed
  after being successfully processed and published. In case of
  failed messages or messages without acknowledgement, depending
  on the technology chosen, the messages can be either moved back
  in the queue or, alternatively, moved to a *dead-letter queue*.
  """

  alias Broadway.Message

  require Logger

  @doc """
  Invoked to acknowledge successful and failed messages.

    * `ack_ref` is a term that uniquely identifies how messages
      should be grouped and sent for acknowledgement. Imagine
      you have a scenario where messages are coming from
      different producers. Broadway will use this information
      to correctly identify the acknowledger and pass it along
      with the messages so you can properly communicate with
      the source of the data for acknowledgement.

    * `successful` is the list of messages that were
      successfully processed and published.

    * `failed` is the list of messages that, for some reason,
      could not be processed or published.

  """
  @callback ack(ack_ref :: term, successful :: [Message.t()], failed :: [Message.t()]) ::
              :ok

  @doc """
  Configures the acknowledger with new `options`.

  Every acknowledger can decide how to incorporate the given `options` into its
  `ack_data`. The `ack_data` is the current acknowledger's data. The return value
  of this function is `{:ok, new_ack_data}` where `new_ack_data` is the updated
  data for the acknowledger.

  Note that `options` are different for every acknowledger, since each acknowledger
  defines which options it supports. Check the documentation for the
  acknowledger you're using to see the supported options.
  """
  @callback configure(ack_ref :: term, ack_data :: term, options :: keyword) ::
              {:ok, new_ack_data :: term}

  @optional_callbacks [configure: 3]

  # Groups `successful` and `failed` messages by their `{acknowledger, ack_ref}`
  # pair and invokes `ack/3` once per pair. Returns `:ok` (the result of
  # `Enum.each/2`).
  @doc false
  @spec ack_messages([Message.t()], [Message.t()]) :: :ok
  def ack_messages(successful, failed) do
    %{}
    |> group_by_acknowledger(successful, :successful)
    |> group_by_acknowledger(failed, :failed)
    |> Enum.each(&call_ack/1)
  end

  # Stores each message in the process dictionary under `{ack_info, key}`,
  # prepending to whatever is already there, and records `ack_info` in the
  # `ackers` map so that `call_ack/1` later knows which entries to collect.
  defp group_by_acknowledger(ackers, messages, key) do
    Enum.reduce(messages, ackers, fn %{acknowledger: {acknowledger, ack_ref, _}} = msg, acc ->
      ack_info = {acknowledger, ack_ref}
      pdict_key = {ack_info, key}
      Process.put(pdict_key, [msg | Process.get(pdict_key, [])])
      Map.put(acc, ack_info, true)
    end)
  end

  # Collects (and removes) the messages grouped for `ack_info` from the process
  # dictionary. The lists were built by prepending, so they are reversed to
  # restore the original message order before being handed to the acknowledger.
  defp call_ack({{acknowledger, ack_ref} = ack_info, true}) do
    successful = Process.delete({ack_info, :successful}) || []
    failed = Process.delete({ack_info, :failed}) || []
    acknowledger.ack(ack_ref, Enum.reverse(successful), Enum.reverse(failed))
  end

  @doc false
  # Builds a crash reason used in Logger reporting.
  @spec crash_reason(:throw | :error | :exit, term, Exception.stacktrace()) ::
          {term, Exception.stacktrace()}
  def crash_reason(:throw, reason, stack), do: {{:nocatch, reason}, stack}
  def crash_reason(:error, reason, stack), do: {Exception.normalize(:error, reason, stack), stack}
  def crash_reason(:exit, reason, stack), do: {reason, stack}

  # Used by the processor and the batcher to maybe call c:handle_failed/2
  # on failed messages. When `module` does not export `handle_failed/2`, or
  # there are no messages, the messages are returned untouched.
  @doc false
  @spec maybe_handle_failed_messages([Message.t()], module, term) :: [Message.t()]
  def maybe_handle_failed_messages(messages, module, context) do
    if function_exported?(module, :handle_failed, 2) and messages != [] do
      handle_failed_messages(messages, module, context)
    else
      messages
    end
  end

  # Calls `module.handle_failed/2` and validates what it returns:
  #
  #   * if the callback raises, throws or exits, the failure is logged and the
  #     original `messages` are returned;
  #   * if it returns a list, that list is returned (an error is logged when
  #     its length differs from the input);
  #   * any other return value is logged and ignored in favour of the original
  #     `messages`.
  defp handle_failed_messages(messages, module, context) do
    module.handle_failed(messages, context)
  catch
    kind, reason ->
      Logger.error(Exception.format(kind, reason, __STACKTRACE__),
        crash_reason: crash_reason(kind, reason, __STACKTRACE__)
      )

      messages
  else
    return_messages when is_list(return_messages) ->
      size = length(messages)
      return_size = length(return_messages)

      if return_size != size do
        Logger.error(
          "#{inspect(module)}.handle_failed/2 received #{size} messages and " <>
            "returned only #{return_size}. All messages given to handle_failed/2 " <>
            "must be returned"
        )
      end

      return_messages

    _other ->
      Logger.error(
        "#{inspect(module)}.handle_failed/2 didn't return a list of messages, " <>
          "so ignoring its return value"
      )

      messages
  end
end