defmodule NSQ.Consumer.RDY do
  @moduledoc """
  Consumers have a lot of logic around calculating and distributing RDY. This
  is where that goes!

  Functions here take the consumer pid (`cons`) and the consumer state
  (`cons_state`) and return `{:ok, cons_state}` (or the bare state for the `!`
  variants). Per-connection values such as `rdy_count`, `last_rdy`, `max_rdy`
  and `retry_rdy_pid` are read and written through `NSQ.ConnInfo`.
  """
  alias NSQ.ConnInfo
  alias NSQ.Consumer, as: C
  alias NSQ.Consumer.Connections
  import NSQ.Consumer.Helpers

  # Message a pending retry task (started by `retry/4`) waits for; receiving it
  # cancels the retry.
  @cancel_retry_msg :cancel_rdy_retry

  @doc """
  Initialized from NSQ.Consumer.Supervisor, sends the consumer a message on a
  fixed interval.

  Each pass asks the consumer to handle `:redistribute_rdy` (a synchronous
  `GenServer.call/2`), then sleeps for `config.rdy_redistribute_interval`
  milliseconds. The interval is read from the consumer's state on every pass.
  Never returns.
  """
  @spec redistribute_loop(pid) :: no_return()
  def redistribute_loop(cons) do
    cons_state = C.get_state(cons)
    GenServer.call(cons, :redistribute_rdy)
    :timer.sleep(cons_state.config.rdy_redistribute_interval)
    redistribute_loop(cons)
  end

  @doc """
  If we're not in backoff mode and we've hit a "trigger point" to update RDY,
  then go ahead and update RDY. Not for external use.

  A trigger point is any of the following:

    * the connection has at most 1 RDY remaining;
    * it has less than a quarter of its last RDY remaining;
    * it has more RDY remaining than the (non-zero) per-connection target from
      `per_conn_max_in_flight/1`.

  When triggered, RDY is set to that target via `update/4`. In backoff
  (`backoff_counter > 0` or `backoff_duration > 0`) nothing is sent.
  """
  @spec maybe_update(pid, C.connection(), C.state()) :: {:ok, C.state()}
  def maybe_update(cons, conn, cons_state) do
    if cons_state.backoff_counter > 0 || cons_state.backoff_duration > 0 do
      # In backoff mode, we only let `start_stop_continue_backoff/3` handle
      # this case.
      NSQ.Logger.debug("""
      (#{inspect(conn)}) skip sending RDY in_backoff:#{cons_state.backoff_counter} || in_backoff_timeout:#{cons_state.backoff_duration}
      """)

      {:ok, cons_state}
    else
      [remain, last_rdy] =
        ConnInfo.fetch(cons_state, ConnInfo.conn_id(conn), [:rdy_count, :last_rdy])

      desired_rdy = per_conn_max_in_flight(cons_state)

      if remain <= 1 || remain < last_rdy / 4 || (desired_rdy > 0 && desired_rdy < remain) do
        NSQ.Logger.debug("""
        (#{inspect(conn)}) sending RDY #{desired_rdy} \
        (#{remain} remain from last RDY #{last_rdy})
        """)

        update(cons, conn, desired_rdy, cons_state)
      else
        NSQ.Logger.debug("""
        (#{inspect(conn)}) skip sending RDY #{desired_rdy} \
        (#{remain} remain out of last RDY #{last_rdy})
        """)

        {:ok, cons_state}
      end
    end
  end

  @doc """
  Same as `maybe_update/3`, but returns the consumer state directly.
  """
  @spec maybe_update!(pid, C.connection(), C.state()) :: C.state()
  def maybe_update!(cons, conn, cons_state) do
    {:ok, cons_state} = maybe_update(cons, conn, cons_state)
    cons_state
  end

  @doc """
  Try to update RDY for a given connection, taking configuration and the
  current state into account. Not for external use.

  Steps:

    1. Cancel any pending RDY retry for the connection.
    2. Cap `new_rdy` at the connection's `max_rdy`, then at what is left of the
       consumer's `max_in_flight`, and round it.
    3. Send it.

  If there is no room left under `max_in_flight` and `new_rdy > 0`, nothing is
  sent. In that case, if the connection currently has RDY 0, a retry is
  scheduled with `retry/4` so the connection is not starved forever.
  """
  @spec update(pid, C.connection(), number, C.state()) :: {:ok, C.state()}
  def update(cons, conn, new_rdy, cons_state) do
    conn_info = ConnInfo.fetch(cons_state, ConnInfo.conn_id(conn))
    cancel_outstanding_retry(cons_state, conn)

    # Cap the given RDY based on the connection config.
    new_rdy = [new_rdy, conn_info.max_rdy] |> Enum.min() |> round()

    # Cap the given RDY based on how much we can actually assign. Unless it's
    # 0 or less, in which case we may be retrying below.
    max_possible_rdy = calc_max_possible(cons_state, conn_info)

    new_rdy =
      if max_possible_rdy > 0 do
        [new_rdy, max_possible_rdy] |> Enum.min() |> round()
      else
        new_rdy
      end

    if max_possible_rdy <= 0 and new_rdy > 0 do
      if conn_info.rdy_count == 0 do
        # We wanted to leave RDY 0 but there's no room. Schedule this update
        # again after `config.rdy_retry_delay` ms to prevent eternal
        # starvation; any successful update in the meantime cancels it.
        retry(cons, conn, new_rdy, cons_state)
      else
        {:ok, cons_state}
      end
    else
      transmit(conn, new_rdy, cons_state)
    end
  end

  @doc """
  Same as `update/4`, but returns the consumer state directly.
  """
  @spec update!(pid, C.connection(), number, C.state()) :: C.state()
  def update!(cons, conn, new_rdy, cons_state) do
    {:ok, cons_state} = update(cons, conn, new_rdy, cons_state)
    cons_state
  end

  @doc """
  Delay for a configured interval, then call RDY.update.

  Starts a task linked to the caller. The task waits `config.rdy_retry_delay`
  milliseconds and then sends `{:update_rdy, conn, count}` to the consumer with
  `GenServer.call/2`. The task's pid is stored as the connection's
  `retry_rdy_pid`, so the next `update/4` for this connection cancels the retry
  before it fires.
  """
  @spec retry(pid, C.connection(), integer, C.state()) :: {:ok, C.state()}
  def retry(cons, conn, count, cons_state) do
    delay = cons_state.config.rdy_retry_delay
    NSQ.Logger.debug("(#{inspect(conn)}) retry RDY in #{delay / 1000} seconds")

    {:ok, retry_pid} =
      Task.start_link(fn ->
        # Wait in `receive` (not `:timer.sleep/1`) so the retry can be
        # cancelled with a plain message. `Process.exit(pid, :normal)` is
        # ignored by a process that isn't trapping exits. Any harsher exit
        # reason would propagate over the link to the consumer.
        receive do
          @cancel_retry_msg -> :ok
        after
          delay -> GenServer.call(cons, {:update_rdy, conn, count})
        end
      end)

    ConnInfo.update(cons_state, ConnInfo.conn_id(conn), %{retry_rdy_pid: retry_pid})
    {:ok, cons_state}
  end

  @doc """
  Send a RDY command for the given connection.

  Skips sending when both `count` and the connection's `last_rdy` are 0, since
  that would be redundant.
  """
  @spec transmit(C.connection(), integer, C.state()) :: {:ok, C.state()}
  def transmit({_id, pid} = conn, count, cons_state) do
    [last_rdy] = ConnInfo.fetch(cons_state, ConnInfo.conn_id(conn), [:last_rdy])

    if count == 0 and last_rdy == 0 do
      {:ok, cons_state}
    else
      # We intentionally don't match this GenServer.call. If the socket isn't
      # set up or is erroring out, we don't want to propagate that connection
      # error to the consumer.
      NSQ.Connection.cmd_noresponse(pid, {:rdy, count})
      {:ok, cons_state}
    end
  end

  @doc """
  This will only be triggered in odd cases where we're in backoff or when there
  are more connections than max in flight. It will randomly change RDY on
  some connections to 0 and 1 so that they're all guaranteed to eventually
  process messages.

  Idle connections that hold RDY are set to RDY 0. The remaining RDY budget is
  then handed out as RDY 1, one connection at a time. The starting connection
  rotates on each run.
  """
  @spec redistribute(pid, C.state()) :: {:ok, C.state()}
  def redistribute(cons, cons_state) do
    if should_redistribute?(cons_state) do
      # NOTE(review): `need_rdy_redistributed` is not cleared here; presumably
      # the caller resets it. If not, redistribution repeats on every
      # interval once it has been set. Confirm.
      conns = Connections.get(cons_state)
      conn_count = length(conns)

      if conn_count > cons_state.max_in_flight do
        NSQ.Logger.debug("""
        redistributing RDY state
        (#{conn_count} conns > #{cons_state.max_in_flight} max_in_flight)
        """)
      end

      if cons_state.backoff_counter > 0 && conn_count > 1 do
        NSQ.Logger.debug("""
        redistributing RDY state (in backoff and #{conn_count} conns > 1)
        """)
      end

      # Free up any connections that are RDY but not processing messages.
      cons_state
      |> Connections.idle_with_rdy()
      |> Enum.each(fn conn ->
        NSQ.Logger.debug("(#{inspect(conn)}) idle connection, giving up RDY")
        {:ok, _cons_state} = update(cons, conn, 0, cons_state)
      end)

      # Determine how much RDY we can distribute. This runs *after* idle
      # connections give up their RDY, so the freed RDY can be handed out
      # again below. This assumes the RDY 0 sent above is already reflected
      # in what total_rdy_count/1 reads. TODO confirm.
      available_max_in_flight = get_available_max_in_flight(cons_state)

      # Distribute it!
      {sorted_conns, cons_state} = sort_conns_for_round_robin(conns, cons_state)
      distribute(cons, sorted_conns, available_max_in_flight, cons_state)
    else
      # Nothing to do. This is the usual path!
      {:ok, cons_state}
    end
  end

  @doc """
  Same as `redistribute/2`, but returns the consumer state directly.
  """
  @spec redistribute!(pid, C.state()) :: C.state()
  def redistribute!(cons, cons_state) do
    {:ok, cons_state} = redistribute(cons, cons_state)
    cons_state
  end

  # Helper for redistribute. Sets RDY to 1 on connections, in list order, until
  # the connections or the available RDY run out. The list is assumed to be
  # already sorted in the order we should distribute.
  @spec distribute(pid, [C.connection()], number, C.state()) :: {:ok, C.state()}
  defp distribute(_cons, [], _available_max_in_flight, cons_state), do: {:ok, cons_state}

  defp distribute(_cons, _possible_conns, available_max_in_flight, cons_state)
       when available_max_in_flight <= 0,
       do: {:ok, cons_state}

  defp distribute(cons, [conn | rest], available_max_in_flight, cons_state) do
    NSQ.Logger.debug("(#{inspect(conn)}) redistributing RDY")
    {:ok, cons_state} = update(cons, conn, 1, cons_state)
    distribute(cons, rest, available_max_in_flight - 1, cons_state)
  end

  # Sorts connections by id, then rotates the list so it starts at
  # `distribution_counter mod length`. Increments `distribution_counter` so the
  # next run starts one connection later (while the connection count is
  # unchanged). An empty list is returned as-is, avoiding `rem(_, 0)`.
  @spec sort_conns_for_round_robin([C.connection()], C.state()) ::
          {[C.connection()], C.state()}
  defp sort_conns_for_round_robin([], cons_state), do: {[], cons_state}

  defp sort_conns_for_round_robin(conns, cons_state) do
    # We sort to ensure consistency of start_index across runs.
    sorted_conns = Enum.sort_by(conns, fn {conn_id, _} -> conn_id end)
    start_index = rem(cons_state.distribution_counter, length(sorted_conns))

    # Rotate so the element at start_index comes first, wrapping around,
    # e.g. [a, b, c, d] with start_index 1 becomes [b, c, d, a].
    {before_start, from_start} = Enum.split(sorted_conns, start_index)

    {
      from_start ++ before_start,
      %{cons_state | distribution_counter: cons_state.distribution_counter + 1}
    }
  end

  # If the connection has a pending RDY retry task, cancel it (if still alive)
  # and clear `retry_rdy_pid` in ConnInfo.
  @spec cancel_outstanding_retry(C.state(), C.connection()) :: any
  defp cancel_outstanding_retry(cons_state, conn) do
    conn_info = ConnInfo.fetch(cons_state, ConnInfo.conn_id(conn))
    retry_pid = conn_info.retry_rdy_pid

    if retry_pid do
      if Process.alive?(retry_pid) do
        NSQ.Logger.debug(
          "(#{inspect(conn)}) rdy retry pid #{inspect(retry_pid)} detected, cancelling"
        )

        # A plain message is used instead of an exit signal. `:normal` would be
        # ignored, and anything harsher could take down the linked consumer.
        # If the task is already inside its GenServer.call, the message is
        # simply left unread and the task finishes normally.
        send(retry_pid, @cancel_retry_msg)
      end

      ConnInfo.update(cons_state, ConnInfo.conn_id(conn), %{retry_rdy_pid: nil})
    end
  end

  # RDY this connection may hold without exceeding the consumer's
  # max_in_flight. This is max_in_flight minus the RDY held by all *other*
  # connections, so it may be 0 or negative.
  @spec calc_max_possible(C.state(), map) :: integer
  defp calc_max_possible(cons_state, conn_info) do
    cons_state.max_in_flight - total_rdy_count(cons_state) + conn_info.rdy_count
  end

  # Redistribution happens only outside a backoff timeout, with at least one
  # connection, and when any of the following holds:
  #   * there are more connections than max_in_flight;
  #   * we're in backoff with more than one connection;
  #   * it was explicitly requested.
  @spec should_redistribute?(C.state()) :: boolean
  defp should_redistribute?(cons_state) do
    conn_count = Connections.count(cons_state)
    in_backoff = cons_state.backoff_counter > 0
    in_backoff_timeout = cons_state.backoff_duration > 0

    !in_backoff_timeout &&
      conn_count > 0 &&
      (conn_count > cons_state.max_in_flight ||
         (in_backoff && conn_count > 1) ||
         cons_state.need_rdy_redistributed)
  end

  # Cap available max in flight based on current RDY/backoff status.
  defp get_available_max_in_flight(cons_state) do
    total_rdy = total_rdy_count(cons_state)

    if cons_state.backoff_counter > 0 do
      # In backoff mode, we only ever want RDY=1 for the whole consumer. This
      # makes sure that available is only 1 if total_rdy is 0.
      1 - total_rdy
    else
      cons_state.max_in_flight - total_rdy
    end
  end
end