lib/nsq/consumer/backoff.ex

defmodule NSQ.Consumer.Backoff do
  @moduledoc """
  When messages fail unexpectedly hard, we go into "backoff mode".

  Backoff is driven by signals passed to `start_stop_continue/3`:

    * `:backoff` increments the consumer's `backoff_counter`;
    * `:resume` decrements it (never below zero).

  While the counter is positive, every signal sets RDY 0 on all connections,
  notifies the consumer's event manager and schedules a `:resume` cast after
  a delay computed by the configured `backoff_strategy`. When that delay
  expires, `resume/2` sends RDY 1 to a single connection, so only one message
  at a time is let through. Once the counter drops back to zero, every
  connection is returned to its per-connection max in flight.

  Setting `max_backoff_duration` to 0 or less disables backoff entirely.
  """

  alias NSQ.Consumer, as: C
  alias NSQ.Consumer.Connections
  alias NSQ.Consumer.RDY
  import NSQ.Consumer.Helpers

  @doc """
  Decision point about whether to start, continue, end or ignore backoff.

  `backoff_signal` is `:backoff` (increment the counter), `:resume`
  (decrement it if positive) or any other atom (leave it unchanged). After
  the counter is updated:

    * if `max_backoff_duration <= 0`, nothing else happens;
    * if the counter was just brought down to 0, backoff is exited and every
      connection is returned to its per-connection max in flight;
    * if the counter is still positive, all connections are set to RDY 0 and
      a resume is scheduled (see `resume/2`).

  Returns `{:ok, cons_state}` with the updated consumer state.
  """
  @spec start_stop_continue(pid, atom, C.state()) :: {:ok, C.state()}
  def start_stop_continue(cons, backoff_signal, cons_state) do
    {backoff_updated, cons_state} = update_backoff_counter(cons_state, backoff_signal)

    cond do
      # Never backoff if max_backoff_duration is <= 0
      cons_state.config.max_backoff_duration <= 0 ->
        {:ok, cons_state}

      cons_state.backoff_counter == 0 && backoff_updated ->
        exit_backoff(cons, cons_state)

      cons_state.backoff_counter > 0 ->
        backoff(cons, cons_state, backoff_signal)

      true ->
        {:ok, cons_state}
    end
  end

  @doc """
  Same as `start_stop_continue/3`, but returns the updated state directly.
  """
  @spec start_stop_continue!(pid, atom, C.state()) :: C.state()
  def start_stop_continue!(cons, backoff_signal, cons_state) do
    {:ok, cons_state} = start_stop_continue(cons, backoff_signal, cons_state)
    cons_state
  end

  @doc """
  Called when a backoff timeout expires (this module schedules it by casting
  `:resume` to the consumer). It will cause one randomly chosen connection to
  have RDY 1. We only fully resume after this if messages succeed a number of
  times == backoff_counter. (That logic is in `start_stop_continue/3`.)

    * If backoff was already left (`backoff_duration` and `backoff_counter`
      both 0), the state is returned unchanged.
    * If the consumer is stopping (`stop_flag: true`), no RDY is sent and
      `backoff_duration` is reset to 0.
    * If there are no connections, another resume is scheduled in one second
      and `backoff_duration` is set to 1000 to reflect that pending timeout.
  """
  @spec resume(pid, C.state()) :: {:ok, C.state()}
  def resume(_cons, %{backoff_duration: 0, backoff_counter: 0} = cons_state),
    # looks like we successfully left backoff mode already
    do: {:ok, cons_state}

  def resume(_cons, %{stop_flag: true} = cons_state),
    do: {:ok, %{cons_state | backoff_duration: 0}}

  def resume(cons, cons_state) do
    if Connections.count(cons_state) == 0 do
      # This could happen if nsqlookupd suddenly stops discovering
      # connections. Maybe a network partition?
      NSQ.Logger.warn("no connection available to resume")
      NSQ.Logger.warn("backing off for 1 second")

      # Return resume_later's state as-is: it records the new 1s timeout in
      # backoff_duration. Resetting it to 0 here (as was done before) made
      # the consumer look as if no backoff timeout were pending.
      resume_later(cons, 1000, cons_state)
    else
      # pick a random connection to test the waters
      conn = random_connection_for_backoff(cons_state)
      NSQ.Logger.warn("(#{inspect(conn)}) backoff timeout expired, sending RDY 1")

      # while in backoff only ever let 1 message at a time through
      {:ok, cons_state} = RDY.update(cons, conn, 1, cons_state)
      {:ok, %{cons_state | backoff_duration: 0}}
    end
  end

  @doc """
  Same as `resume/2`, but returns the updated state directly.
  """
  @spec resume!(pid, C.state()) :: C.state()
  def resume!(cons, cons_state) do
    {:ok, cons_state} = resume(cons, cons_state)
    cons_state
  end

  # Starts or continues backoff: sets RDY 0 on every connection, notifies the
  # event manager with `backoff_signal`, and schedules a resume after the
  # duration returned by calculate_backoff/1.
  @spec backoff(pid, C.state(), atom) :: {:ok, C.state()}
  defp backoff(cons, cons_state, backoff_signal) do
    backoff_duration = calculate_backoff(cons_state)

    NSQ.Logger.warn(
      "backing off for #{backoff_duration / 1000} seconds (backoff level #{cons_state.backoff_counter}), setting all to RDY 0"
    )

    # send RDY 0 immediately (to *all* connections)
    cons_state = update_all_rdy(cons, cons_state, 0)

    :gen_event.notify(cons_state.event_manager_pid, backoff_signal)
    resume_later(cons, backoff_duration, cons_state)
  end

  # Applies a backoff signal to backoff_counter. Returns whether the counter
  # actually changed, along with the updated state. `:resume` never takes
  # the counter below zero; unknown signals leave it untouched.
  @spec update_backoff_counter(C.state(), atom) :: {boolean, C.state()}
  defp update_backoff_counter(%{backoff_counter: counter} = cons_state, backoff_signal) do
    {backoff_updated, counter} =
      case backoff_signal do
        :resume when counter > 0 -> {true, counter - 1}
        :backoff -> {true, counter + 1}
        _ -> {false, counter}
      end

    {backoff_updated, %{cons_state | backoff_counter: counter}}
  end

  # Leaves backoff: returns every connection to the per-connection max in
  # flight and notifies the event manager with :resume.
  @spec exit_backoff(pid, C.state()) :: {:ok, C.state()}
  defp exit_backoff(cons, cons_state) do
    count = per_conn_max_in_flight(cons_state)
    NSQ.Logger.warn("exiting backoff, returning all to RDY #{count}")

    cons_state = update_all_rdy(cons, cons_state, count)

    :gen_event.notify(cons_state.event_manager_pid, :resume)
    {:ok, cons_state}
  end

  # Sends RDY `count` to every connection, threading the consumer state
  # through each RDY.update/4 call in turn.
  @spec update_all_rdy(pid, C.state(), integer) :: C.state()
  defp update_all_rdy(cons, cons_state, count) do
    cons_state
    |> Connections.get()
    |> Enum.reduce(cons_state, fn conn, acc ->
      {:ok, acc} = RDY.update(cons, conn, count, acc)
      acc
    end)
  end

  # Try resuming from backoff after `duration` milliseconds: a linked task
  # sleeps, then casts :resume to the consumer. The pending duration is
  # recorded in backoff_duration.
  @spec resume_later(pid, integer, C.state()) ::
          {:ok, C.state()}
  defp resume_later(cons, duration, cons_state) do
    Task.start_link(fn ->
      :timer.sleep(duration)
      GenServer.cast(cons, :resume)
    end)

    {:ok, %{cons_state | backoff_duration: duration}}
  end

  @spec random_connection_for_backoff(C.state()) :: C.connection()
  defp random_connection_for_backoff(cons_state) do
    if cons_state.config.backoff_strategy == :test do
      # When testing, we're only sending 1 message at a time to a single
      # nsqd. In this mode, instead of a random connection, always use the
      # first one that was defined, which ends up being the last one in our
      # list.
      cons_state |> Connections.get() |> List.last()
    else
      cons_state |> Connections.get() |> Enum.random()
    end
  end

  # Returns the backoff duration in milliseconds. Different strategies can
  # technically be used, but currently there is only `:exponential` in
  # production mode and `:test` for tests. Not for external use.
  @spec calculate_backoff(C.state()) :: integer
  defp calculate_backoff(cons_state) do
    case cons_state.config.backoff_strategy do
      :exponential -> exponential_backoff(cons_state)
      :test -> 200
    end
  end

  # Used to calculate backoff in milliseconds in production:
  # backoff_multiplier * 2^backoff_counter, capped at max_backoff_duration.
  # Not for external use.
  # NOTE(review): the original comment mentions jitter to avoid a thundering
  # herd, but no jitter is applied here — confirm whether that is intended.
  @spec exponential_backoff(C.state()) :: integer
  defp exponential_backoff(cons_state) do
    # :math.pow/2 raises ArithmeticError once its result leaves the float
    # range (around 2^1024), and backoff_counter is not bounded by this
    # module. Clamp the exponent: 2^64 times any realistic multiplier already
    # exceeds any realistic max_backoff_duration, so the cap below still wins.
    attempts = min(cons_state.backoff_counter, 64)
    mult = cons_state.config.backoff_multiplier

    (mult * :math.pow(2, attempts))
    |> min(cons_state.config.max_backoff_duration)
    |> round()
  end
end