lib/nsq/consumer/helpers.ex

defmodule NSQ.Consumer.Helpers do
  alias NSQ.Consumer, as: C
  alias NSQ.Consumer.Connections
  alias NSQ.ConnInfo

  @doc """
  Each connection is responsible for maintaining its own rdy_count in ConnInfo.
  This function sums all the values of rdy_count for each connection, which
  lets us get an accurate picture of a consumer's total RDY count. Not for
  external use.
  """
  @spec total_rdy_count(pid) :: integer
  def total_rdy_count(agent_pid) when is_pid(agent_pid) do
    ConnInfo.reduce(agent_pid, 0, fn {_, conn_info}, acc ->
      acc + conn_info.rdy_count
    end)
  end

  # Convenience function; uses the consumer state to get the conn info pid.
  # Not for external use.
  @spec total_rdy_count(C.state()) :: integer
  def total_rdy_count(%{conn_info_pid: agent_pid} = _cons_state) do
    total_rdy_count(agent_pid)
  end

  @doc """
  Returns how much `max_in_flight` should be distributed to each connection.
  If `max_in_flight` is less than the number of connections, then this always
  returns 1 and they are randomly distributed via `redistribute_rdy`. Not for
  external use.
  """
  @spec per_conn_max_in_flight(C.state()) :: integer
  def per_conn_max_in_flight(cons_state) do
    max_in_flight = cons_state.max_in_flight
    conn_count = Connections.count(cons_state)

    if conn_count == 0 do
      0
    else
      min(max(1, max_in_flight / conn_count), max_in_flight) |> round
    end
  end

  @spec now() :: integer
  def now do
    :calendar.datetime_to_gregorian_seconds(:calendar.universal_time())
  end

  @spec conn_from_nsqd(pid, C.host_with_port(), C.state()) :: C.connection()
  def conn_from_nsqd(cons, nsqd, cons_state) do
    needle = ConnInfo.conn_id(cons, nsqd)

    Enum.find(Connections.get(cons_state), fn {conn_id, _} ->
      needle == conn_id
    end)
  end
end