lib/nsq/connection/command.ex

defmodule NSQ.Connection.Command do
  @moduledoc """
  Both consumers and producers use Connection, which, at a lower level, kicks
  off message processing and issues commands to NSQD. This module handles some
  of the trickier command queueing, flushing, etc.
  """

  alias NSQ.Connection, as: C
  alias NSQ.Connection.Buffer
  alias NSQ.ConnInfo
  import NSQ.Protocol

  def exec(state, cmd, kind, {_, ref} = from) do
    if state.connected do
      state = send_data_and_queue_resp(state, cmd, from, kind)
      state = update_state_from_cmd(cmd, state)
      {{:ok, ref}, state}
    else
      # Not connected currently; add this call onto a queue to be run as soon
      # as we reconnect.
      state = %{state | cmd_queue: :queue.in({cmd, from, kind}, state.cmd_queue)}
      {{:queued, :no_socket}, state}
    end
  end

  @spec send_data_and_queue_resp(C.state(), tuple, {reference, pid}, atom) :: C.state()
  def send_data_and_queue_resp(state, cmd, from, kind) do
    state |> Buffer.send!(encode(cmd))

    if kind == :noresponse do
      state
    else
      %{state | cmd_resp_queue: :queue.in({cmd, from, kind}, state.cmd_resp_queue)}
    end
  end

  @spec send_response_to_caller(C.state(), binary) :: {:ok, C.state()}
  def send_response_to_caller(state, data) do
    :gen_event.notify(state.event_manager_pid, {:response, data})
    {item, cmd_resp_queue} = :queue.out(state.cmd_resp_queue)

    case item do
      {:value, {_cmd, {pid, ref}, :reply}} ->
        send(pid, {ref, data})

      :empty ->
        :ok
    end

    {:ok, %{state | cmd_resp_queue: cmd_resp_queue}}
  end

  @spec flush_cmd_queue(C.state()) :: C.state()
  def flush_cmd_queue(state) do
    {item, new_queue} = :queue.out(state.cmd_queue)

    case item do
      {:value, {cmd, from, kind}} ->
        state = send_data_and_queue_resp(state, cmd, from, kind)
        flush_cmd_queue(%{state | cmd_queue: new_queue})

      :empty ->
        {:ok, %{state | cmd_queue: new_queue}}
    end
  end

  def flush_cmd_queue!(state) do
    {:ok, state} = flush_cmd_queue(state)
    state
  end

  @spec update_state_from_cmd(tuple, C.state()) :: C.state()
  def update_state_from_cmd(cmd, state) do
    case cmd do
      {:rdy, count} ->
        ConnInfo.update(state, %{rdy_count: count, last_rdy: count})
        state

      _any ->
        state
    end
  end
end