lib/nsq/message.ex

defmodule NSQ.Message do
  # ------------------------------------------------------- #
  # Directives                                              #
  # ------------------------------------------------------- #
  import NSQ.Protocol
  alias NSQ.Connection.Buffer
  use GenServer

  # ------------------------------------------------------- #
  # Struct Definition                                       #
  # ------------------------------------------------------- #
  defstruct [
    :id,
    :reader,
    :writer,
    :timestamp,
    :attempts,
    :body,
    :connection,
    :consumer,
    :config,
    :parent,
    :processing_pid,
    :event_manager_pid,
    :msg_timeout
  ]

  # ------------------------------------------------------- #
  # Behaviour Implementation                                #
  # ------------------------------------------------------- #
  def start_link(message, opts \\ []) do
    {:ok, _pid} = GenServer.start_link(__MODULE__, message, opts)
  end

  def init(message) do
    # Process the message asynchronously after init.
    GenServer.cast(self(), :process)
    {:ok, message}
  end

  def handle_cast(:process, message) do
    # `process/1` will send the return value to `message.connection` as part of
    # its standard procedure.
    process(message)
    {:noreply, message}
  end

  # ------------------------------------------------------- #
  # API Definitions                                         #
  # ------------------------------------------------------- #
  @doc """
  Given a raw NSQ message with frame_type = message, return a struct with its
  parsed data.
  """
  def from_data(data) do
    {:ok, message} = decode_as_message(data)
    Map.merge(%NSQ.Message{}, message)
  end

  @doc """
  This is the main entry point when processing a message. It starts the message
  GenServer and immediately kicks of a processing call.
  """
  def process(message) do
    # Kick off processing in a separate process, so we can kill it if it takes
    # too long.
    message = %{message | parent: self()}

    {:ok, pid} =
      Task.start_link(fn ->
        process_without_timeout(message)
      end)

    message = %{message | processing_pid: pid}

    {:ok, ret_val} = wait_for_msg_done(message)

    # Even if we've killed the message processing, we're still "done"
    # processing it for now. That is, we should free up a spot on
    # messages_in_flight.
    result = {:message_done, message, ret_val}
    send(message.connection, result)

    # Nothing more for this process to do.
    Process.exit(self(), :normal)
  end

  @doc """
  Tells NSQD that we're done processing this message. This is called
  automatically when the handler returns successfully, or when all retries have
  been exhausted.
  """
  def fin(message) do
    NSQ.Logger.debug("(#{inspect(message.connection)}) fin msg ID #{message.id}")
    message |> Buffer.send!(encode({:fin, message.id}))
    :gen_event.notify(message.event_manager_pid, {:message_finished, message})
    GenServer.call(message.consumer, {:start_stop_continue_backoff, :resume})
  end

  @doc """
  Tells NSQD to requeue the message, with delay and backoff. According to the
  go-nsq client (but doc'ed nowhere), a delay of -1 is a special value that
  means nsqd will calculate the duration itself based on the default requeue
  timeout. If backoff=true is set, then the connection will go into "backoff"
  mode, where it stops receiving messages for a fixed duration.
  """
  def req(message, delay \\ -1, backoff \\ false) do
    delay =
      if delay == -1 do
        delay =
          calculate_delay(
            message.attempts,
            message.config.max_requeue_delay
          )

        NSQ.Logger.debug(
          "(#{inspect(message.connection)}) requeue msg ID #{message.id}, delay #{delay} (auto-calculated with attempts #{message.attempts}), backoff #{backoff}"
        )

        delay
      else
        NSQ.Logger.debug(
          "(#{inspect(message.connection)}) requeue msg ID #{message.id}, delay #{delay}, backoff #{backoff}"
        )

        delay
      end

    delay =
      if delay > message.config.max_requeue_delay do
        NSQ.Logger.warn(
          "Invalid requeue delay #{delay}. Must be between 0 and #{message.config.max_requeue_delay}. Sending with max delay #{message.config.max_requeue_delay} instead."
        )

        message.config.max_requeue_delay
      else
        delay
      end

    if backoff do
      GenServer.call(message.consumer, {:start_stop_continue_backoff, :backoff})
    else
      GenServer.call(message.consumer, {:start_stop_continue_backoff, :continue})
    end

    message |> Buffer.send!(encode({:req, message.id, delay}))
    :gen_event.notify(message.event_manager_pid, {:message_requeued, message})
  end

  @doc """
  This function is intended to be used by the handler for long-running
  functions. They can set up a separate process that periodically touches the
  message until the process finishes.
  """
  def touch(message) do
    NSQ.Logger.debug("(#{inspect(message.connection)}) touch msg ID #{message.id}")
    message |> Buffer.send!(encode({:touch, message.id}))
    send(message.parent, {:message_touch, message})
  end

  # ------------------------------------------------------- #
  # Private Functions                                       #
  # ------------------------------------------------------- #
  defp process_without_timeout(message) do
    ret_val =
      if should_fail_message?(message) do
        :fail |> respond_to_nsq(message)
      else
        run_handler_safely(message) |> respond_to_nsq(message)
      end

    send(message.parent, {:message_done, message, ret_val})
  end

  defp should_fail_message?(message) do
    message.config.max_attempts > 0 &&
      message.attempts > message.config.max_attempts
  end

  # Handler can be either an anonymous function or a module that implements the
  # `handle_message\2` function.
  defp run_handler(handler, message) do
    if is_function(handler) do
      handler.(message.body, message)
    else
      handler.handle_message(message.body, message)
    end
  end

  defp run_handler_safely(message) do
    Process.flag(:trap_exit, true)

    try do
      result = run_handler(message.config.message_handler, message)
      Process.flag(:trap_exit, false)
      result
    rescue
      e ->
        NSQ.Logger.error("Error running message handler: #{inspect(e)}")
        NSQ.Logger.error(inspect(__STACKTRACE__))
        {:req, -1, true}
    catch
      :exit, b ->
        NSQ.Logger.error("Caught exit running message handler: :exit, #{inspect(b)}")
        NSQ.Logger.error(inspect(__STACKTRACE__))
        {:req, -1, true}

      a, b ->
        NSQ.Logger.error("Caught exception running message handler: #{inspect(a)}, #{inspect(b)}")
        NSQ.Logger.error(inspect(__STACKTRACE__))
        {:req, -1, true}
    end
  end

  defp respond_to_nsq(ret_val, message) do
    case ret_val do
      :ok ->
        fin(message)

      :fail ->
        NSQ.Logger.warn("msg #{message.id} attempted #{message.attempts} times, giving up")
        fin(message)

      :req ->
        req(message)

      {:req, delay} ->
        req(message, delay)

      {:req, delay, backoff} ->
        req(message, delay, backoff)

      _ ->
        NSQ.Logger.error(
          "Unexpected handler result #{inspect(ret_val)}, requeueing message #{message.id}"
        )

        req(message)
    end

    ret_val
  end

  # We expect our function will send us a message when it's done. Block until
  # that happens. If it takes too long, requeue the message and cancel
  # processing.
  defp wait_for_msg_done(message) do
    receive do
      {:message_done, _msg, ret_val} ->
        {:ok, ret_val}

      {:message_touch, _msg} ->
        NSQ.Logger.debug("Msg #{message.id} received TOUCH, starting a new wait...")
        # If NSQ.Message.touch(msg) is called, we will send TOUCH msg_id to
        # NSQD, but we also need to reset our timeout on the client to avoid
        # processes that hang forever.
        wait_for_msg_done(message)
    after
      message.msg_timeout ->
        # If we've waited this long, we can assume NSQD will requeue the
        # message on its own.
        NSQ.Logger.warn(
          "Msg #{message.id} timed out, quit processing it and expect nsqd to requeue"
        )

        :gen_event.notify(message.event_manager_pid, {:message_requeued, message})
        unlink_and_exit(message.parent)
        {:ok, :req}
    end
  end

  defp unlink_and_exit(pid) do
    Process.unlink(pid)
    Process.exit(pid, :kill)
  end

  defp calculate_delay(attempts, max_requeue_delay) do
    exponential_backoff = :math.pow(2, attempts) * 1000
    jitter = round(0.3 * :rand.uniform() * exponential_backoff)

    min(
      exponential_backoff + jitter,
      max_requeue_delay
    )
    |> round
  end
end