defmodule NSQ.Consumer do
@moduledoc """
A consumer is a process that creates connections to NSQD to receive messages
for a specific topic and channel. It has three primary functions:
1. Provide a simple interface for a user to setup and configure message
handlers.
2. Balance RDY across all available connections.
3. Add/remove connections as they are discovered.
## Simple Interface
In standard practice, the only function a user should need to know about is
`NSQ.Consumer.Supervisor.start_link/3`. It takes a topic, a channel, and an
NSQ.Config struct, which has possible values defined and explained in
nsq/config.ex.
{:ok, consumer} = NSQ.Consumer.Supervisor.start_link("my-topic", "my-channel", %NSQ.Config{
nsqlookupds: ["127.0.0.1:6751", "127.0.0.1:6761"],
message_handler: fn(body, msg) ->
# handle the message
:ok
end
})
### Message handler return values
The return value of the message handler determines how we will respond to
NSQ.
#### :ok
The message was handled and should not be requeued. This sends a FIN command
to NSQD.
#### :req
This message should be requeued. With no delay specified, it will calculate
delay exponentially based on the number of attempts. Refer to
Message.calculate_delay for the exact formula.
#### {:req, delay}
This message should be requeued. Use the delay specified. A positive integer
is expected.
#### {:req, delay, backoff}
This message should be requeued. Use the delay specified. If `backoff` is
truthy, the consumer will temporarily set RDY to 0 in order to stop receiving
messages. It will use a standard strategy to resume from backoff mode.
This type of return value is only meant for exceptional cases, such as
internal network partitions, where stopping message handling briefly could be
beneficial. Only use this return value if you know what you're doing.
A message handler that throws an unhandled exception will automatically
requeue and enter backoff mode.
### NSQ.Message.touch(msg)
NSQ.Config has a property called msg_timeout, which configures the NSQD
server to wait that long before assuming the message failed and requeueing
it. If you expect your message handler to take longer than that, you can call
`NSQ.Message.touch(msg)` from the message handler to reset the server-side
timer.
### NSQ.Consumer.change_max_in_flight(consumer, max_in_flight)
If you'd like to manually change the max in flight of a consumer, use this
function. It will cause the consumer's connections to rebalance to the new
value. If the new `max_in_flight` is smaller than the current messages in
flight, it must wait for the existing handlers to finish or requeue before
it can fully rebalance.
"""
# ------------------------------------------------------- #
# Directives #
# ------------------------------------------------------- #
use GenServer
import NSQ.Protocol
import NSQ.Consumer.Helpers
alias NSQ.Consumer.Backoff
alias NSQ.Consumer.Connections
alias NSQ.Consumer.RDY
alias NSQ.ConnInfo, as: ConnInfo
# ------------------------------------------------------- #
# Module Attributes #
# ------------------------------------------------------- #
# Template for the consumer's GenServer state. `start_link/1` copies this map
# and fills in topic, channel, config and max_in_flight; `init/1` then fills in
# the three pid fields before connecting.
@initial_state %{
channel: nil,
config: %NSQ.Config{},
# Set in init/1 to the pid returned by NSQ.Connection.Supervisor.start_link/0.
conn_sup_pid: nil,
# Set in init/1 to an Agent holding a map; read through ConnInfo.all/1.
conn_info_pid: nil,
# Set in init/1: config.event_manager if truthy, otherwise a new :gen_event.
event_manager_pid: nil,
# Placeholder only; start_link/1 and init/1 replace it with
# config.max_in_flight.
max_in_flight: 2500,
topic: nil,
message_handler: nil,
# NOTE(review): the fields below are never read or written in this module;
# presumably they are maintained by NSQ.Consumer.RDY / NSQ.Consumer.Backoff —
# confirm there.
need_rdy_redistributed: false,
stop_flag: false,
backoff_counter: 0,
backoff_duration: 0,
distribution_counter: 0
}
# ------------------------------------------------------- #
# Type Definitions #
# ------------------------------------------------------- #
@typedoc """
A tuple with a host and a port.
"""
@type host_with_port :: {String.t(), integer}

@typedoc """
A tuple with a string ID (used to target the connection in
NSQ.Connection.Supervisor) and a PID of the connection.
"""
@type connection :: {String.t(), pid}

@typedoc """
A map, but we can be more specific by asserting some entries that should be
set for a consumer's state map.
"""
@type cons_state :: %{conn_sup_pid: pid, config: NSQ.Config.t(), conn_info_pid: pid}

@typedoc """
Alias of `t:cons_state/0`, kept so existing references to
`NSQ.Consumer.state` keep working. Defined as an alias so the two types
cannot drift apart.
"""
@type state :: cons_state
# ------------------------------------------------------- #
# Behaviour Implementation #
# ------------------------------------------------------- #
@doc """
Starts a Consumer process, called via the supervisor.

Accepts either `{topic, channel, config}` or `{topic, channel, config, opts}`.
The three-element form uses `[]` for `opts`; `opts` are passed unchanged to
`GenServer.start_link/3`.

The config is run through `NSQ.Config.validate/1` and then
`NSQ.Config.normalize/1`; both must return `{:ok, config}`, otherwise the
match fails. Raises a `RuntimeError` when the topic or channel name is rejected
by `is_valid_topic_name?/1` or `is_valid_channel_name?/1`.

Returns whatever `GenServer.start_link/3` returns.
"""
@spec start_link({String.t(), String.t(), NSQ.Config.t()}) :: GenServer.on_start()
def start_link({topic, channel, config}), do: start_link({topic, channel, config, []})

@spec start_link({String.t(), String.t(), NSQ.Config.t(), GenServer.options()}) ::
        GenServer.on_start()
def start_link({topic, channel, config, opts}) do
  {:ok, config} = NSQ.Config.validate(config)
  {:ok, config} = NSQ.Config.normalize(config)

  unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}")
  unless is_valid_channel_name?(channel), do: raise("Invalid channel name #{channel}")

  # Start from the module-level template so every state key exists up front;
  # the `%{map | ...}` update syntax below only replaces existing keys.
  state = %{
    @initial_state
    | topic: topic,
      channel: channel,
      config: config,
      max_in_flight: config.max_in_flight
  }

  GenServer.start_link(__MODULE__, state, opts)
end
@doc """
GenServer `init/1` callback.

Starts the connection supervisor and an Agent (initially an empty map) for
per-connection info, picks the event manager (the one from the config when
set, otherwise a freshly started `:gen_event`), copies `max_in_flight` from
the config, and finally hands the state to
`Connections.discover_nsqds_and_connect/2`, whose `{:ok, state}` result is
returned.
"""
@spec init(map) :: {:ok, cons_state}
def init(initial) do
  {:ok, sup} = NSQ.Connection.Supervisor.start_link()
  with_sup = %{initial | conn_sup_pid: sup}

  {:ok, info_agent} = Agent.start_link(fn -> %{} end)
  with_info = %{with_sup | conn_info_pid: info_agent}

  # Only start our own event manager when the config does not supply one.
  event_manager =
    case with_info.config.event_manager do
      unset when unset in [nil, false] ->
        {:ok, started} = :gen_event.start_link()
        started

      configured ->
        configured
    end

  with_manager = %{with_info | event_manager_pid: event_manager}
  ready = %{with_manager | max_in_flight: with_manager.config.max_in_flight}

  {:ok, _connected} = Connections.discover_nsqds_and_connect(self(), ready)
end
# Synchronous requests handled by the consumer process. Every clause replies
# immediately; clauses that delegate to RDY, Connections or Backoff store the
# value returned by that helper as the new state.

# The RDY loop periodically calls this to make sure RDY is balanced among our
# connections.
@spec handle_call(:redistribute_rdy, GenServer.from(), cons_state) ::
        {:reply, :ok, cons_state}
def handle_call(:redistribute_rdy, _from, cons_state) do
  {:reply, :ok, RDY.redistribute!(self(), cons_state)}
end

# The discovery loop calls this periodically to add/remove active nsqd
# connections. Called from Consumer.Supervisor.
@spec handle_call(:discover_nsqds, GenServer.from(), cons_state) ::
        {:reply, :ok, cons_state}
def handle_call(:discover_nsqds, _from, cons_state) do
  {:reply, :ok, Connections.refresh!(cons_state)}
end

# Only used for specs.
@spec handle_call(:delete_dead_connections, GenServer.from(), cons_state) ::
        {:reply, :ok, cons_state}
def handle_call(:delete_dead_connections, _from, cons_state) do
  {:reply, :ok, Connections.delete_dead!(cons_state)}
end

# Called from `NSQ.Message.fin/1`. Not for external use.
@spec handle_call({:start_stop_continue_backoff, atom}, GenServer.from(), cons_state) ::
        {:reply, :ok, cons_state}
def handle_call({:start_stop_continue_backoff, backoff_flag}, _from, cons_state) do
  {:reply, :ok, Backoff.start_stop_continue!(self(), backoff_flag, cons_state)}
end

# Delegates to `RDY.update!/4` for the given connection and count.
@spec handle_call({:update_rdy, connection, integer}, GenServer.from(), cons_state) ::
        {:reply, :ok, cons_state}
def handle_call({:update_rdy, conn, count}, _from, cons_state) do
  {:reply, :ok, RDY.update!(self(), conn, count, cons_state)}
end

# Called from tests to assert correct consumer state. Not for external use.
@spec handle_call(:state, GenServer.from(), cons_state) ::
        {:reply, cons_state, cons_state}
def handle_call(:state, _from, cons_state) do
  {:reply, cons_state, cons_state}
end

# Replies `true` when at least one connection has messages in flight and that
# in-flight count has reached 85% of the connection's `last_rdy` value.
@spec handle_call(:starved, GenServer.from(), cons_state) ::
        {:reply, boolean, cons_state}
def handle_call(:starved, _from, cons_state) do
  is_starved =
    cons_state.conn_info_pid
    |> ConnInfo.all()
    |> Enum.any?(fn {_conn_id, info} ->
      info.messages_in_flight > 0 &&
        info.messages_in_flight >= info.last_rdy * 0.85
    end)

  {:reply, is_starved, cons_state}
end

# Called from `NSQ.Consumer.change_max_in_flight(consumer, max_in_flight)`. Not
# for external use. Only stores the new value; no connection is touched here.
@spec handle_call({:max_in_flight, integer}, GenServer.from(), cons_state) ::
        {:reply, :ok, cons_state}
def handle_call({:max_in_flight, new_max_in_flight}, _from, cons_state) do
  {:reply, :ok, %{cons_state | max_in_flight: new_max_in_flight}}
end

# Called from `NSQ.Consumer.close/1`. The result of `Connections.close!/1`
# becomes the new state.
def handle_call(:close, _from, cons_state) do
  {:reply, :ok, Connections.close!(cons_state)}
end

# Called from `NSQ.Consumer.event_manager/1`.
@spec handle_call(:event_manager, GenServer.from(), cons_state) ::
        {:reply, pid, cons_state}
def handle_call(:event_manager, _from, cons_state) do
  {:reply, cons_state.event_manager_pid, cons_state}
end

# Called to observe all connection stats. For debugging or reporting purposes.
@spec handle_call(:conn_info, GenServer.from(), cons_state) :: {:reply, map, cons_state}
def handle_call(:conn_info, _from, cons_state) do
  {:reply, ConnInfo.all(cons_state.conn_info_pid), cons_state}
end
# Asynchronous requests handled by the consumer process.

# Cast by `Backoff.resume_later/3`; the state returned by `Backoff.resume!/2`
# becomes the new consumer state. Not for external use.
@spec handle_cast(:resume, cons_state) :: {:noreply, cons_state}
def handle_cast(:resume, cons_state) do
  resumed = Backoff.resume!(self(), cons_state)
  {:noreply, resumed}
end

# Cast by `NSQ.Connection.handle_cast({:nsq_msg, _}, _)` after each message is
# received. Resolves the `{host, port}` pair to a connection and lets
# `RDY.maybe_update!/3` decide what to do. Not for external use.
@spec handle_cast({:maybe_update_rdy, host_with_port}, cons_state) ::
        {:noreply, cons_state}
def handle_cast({:maybe_update_rdy, nsqd = {_host, _port}}, cons_state) do
  consumer = self()
  conn = conn_from_nsqd(consumer, nsqd, cons_state)
  {:noreply, RDY.maybe_update!(consumer, conn, cons_state)}
end
# ------------------------------------------------------- #
# API Definitions #
# ------------------------------------------------------- #
# Looks up the consumer under `sup_pid` via `get/1` and returns its reply to a
# `:starved` call.
def starved?(sup_pid) do
  sup_pid
  |> get()
  |> GenServer.call(:starved)
end
# Looks up the consumer under `sup_pid` via `get/1` and sends it a synchronous
# `:close` call, which the consumer serves with `Connections.close!/1`.
def close(sup_pid) do
  sup_pid
  |> get()
  |> GenServer.call(:close)
end
@doc """
Called from tests to assert correct consumer state. Not for external use.

Takes the consumer pid itself (not the supervisor pid) and returns the
consumer's state map exactly as the `:state` call replies with it; the result
is not wrapped in an `{:ok, _}` tuple.
"""
@spec get_state(pid) :: cons_state
def get_state(cons) do
  GenServer.call(cons, :state)
end
@doc """
Public function to change `max_in_flight` for a consumer.

`sup_pid` is the consumer supervisor's pid; the consumer is resolved with
`get/1`. The consumer stores the new value and replies `:ok`, which is what
this function returns.

NOTE(review): the call handler only updates the stored value, so balancing
across connections presumably happens on a later RDY redistribution; confirm
in `NSQ.Consumer.RDY`.
"""
@spec change_max_in_flight(pid, integer) :: :ok
def change_max_in_flight(sup_pid, new_max_in_flight) do
  cons = get(sup_pid)
  GenServer.call(cons, {:max_in_flight, new_max_in_flight})
end
@doc """
Returns the event manager used by the consumer under `sup_pid`.

When NSQ.Config does not define an event manager, the consumer starts one
itself, so handlers can be attached on the fly:
`NSQ.Consumer.event_manager(consumer) |> :gen_event.add_handler(MyHandler, [])`
"""
def event_manager(sup_pid) do
  sup_pid
  |> get()
  |> GenServer.call(:event_manager)
end
# Looks up the consumer under `sup_pid` via `get/1` and returns its reply to a
# `:conn_info` call, i.e. the result of `ConnInfo.all/1` on the consumer's
# conn-info agent.
def conn_info(sup_pid) do
  sup_pid
  |> get()
  |> GenServer.call(:conn_info)
end
@doc """
Resolves a consumer supervisor pid to the pid of its consumer child.

NSQ.Consumer.Supervisor.start_link returns the supervisor pid so that we can
effectively recover from consumer crashes. Public facing functions therefore
accept the supervisor pid, e.g.
`NSQ.Consumer.change_max_in_flight(supervisor_pid, 100)`, and call this
function to find the child whose id is `NSQ.Consumer`. Fails with a
`MatchError` when the supervisor has no such child. Not for external use.
"""
@spec get(pid) :: pid
def get(sup_pid) do
  # Supervisor.which_children/1 yields {id, child, type, modules} tuples; the
  # consumer is registered under the id NSQ.Consumer.
  {_id, consumer_pid, _type, _modules} =
    sup_pid
    |> Supervisor.which_children()
    |> List.keyfind(NSQ.Consumer, 0)

  consumer_pid
end
end