lib/nsq/producer.ex

defmodule NSQ.Producer do
  @moduledoc """
  A producer is a process that connects to one or many NSQDs and publishes
  messages.

  ## Interface

  To initialize:

      {:ok, producer} = NSQ.Producer.Supervisor.start_link("the-default-topic", %NSQ.Config{
        nsqds: ["127.0.0.1:6750"]
      })

  The default `topic` argument is _required_, even if you plan on explicitly
  publishing to a different topic. If you don't plan on using, you can set it
  to something like `_default_topic_`.

  If you provide more than one nsqd, each `pub`/`mpub` will choose one
  randomly.

  Note that, unlike consumers, producers _cannot_ be configured to use
  discovery with nsqlookupd. This is because discovery requires a topic and
  channel, and an nsqd will only appear in nsqlookupd if it has already
  published messages on that topic. So there's a chicken-and-egg problem. The
  recommended solution is to run NSQD on the same box where you're publishing,
  so your address is always 127.0.0.1 with a static port.

  ### pub

  Publish a single message to NSQD.

      NSQ.Producer.pub(producer, "a message")
      NSQ.Producer.pub(producer, "different-topic", "a message")

  ### mpub

  Publish a bunch of messages to NSQD atomically.

      NSQ.Producer.mpub(producer, ["one", "two"])
      NSQ.Producer.mpub(producer, "different-topic", ["one", "two"])
  """

  # ------------------------------------------------------- #
  # Directives                                              #
  # ------------------------------------------------------- #
  import NSQ.Protocol
  use GenServer

  # ------------------------------------------------------- #
  # Module Attributes                                       #
  # ------------------------------------------------------- #
  @initial_state %{
    topic: nil,
    channel: nil,
    conn_sup_pid: nil,
    event_manager_pid: nil,
    config: nil,
    conn_info_pid: nil
  }

  # ------------------------------------------------------- #
  # Type Definitions                                        #
  # ------------------------------------------------------- #
  @typedoc """
  A tuple with a host and a port.
  """
  @type host_with_port :: {String.t(), integer}

  @typedoc """
  A tuple with a string ID (used to target the connection in
  NSQ.Connection.Supervisor) and a PID of the connection.
  """
  @type connection :: {String.t(), pid}

  @typedoc """
  A map, but we can be more specific by asserting some entries that should be
  set for a connection's state map.
  """
  @type pro_state :: %{conn_sup_pid: pid, config: NSQ.Config.t()}

  # ------------------------------------------------------- #
  # Behaviour Implementation                                #
  # ------------------------------------------------------- #
  @spec init(pro_state) :: {:ok, pro_state}
  def init(pro_state) do
    {:ok, conn_sup_pid} = NSQ.Connection.Supervisor.start_link()
    pro_state = %{pro_state | conn_sup_pid: conn_sup_pid}

    {:ok, conn_info_pid} = Agent.start_link(fn -> %{} end)
    pro_state = %{pro_state | conn_info_pid: conn_info_pid}

    manager =
      if pro_state.config.event_manager do
        pro_state.config.event_manager
      else
        {:ok, manager} = :gen_event.start_link()
        manager
      end

    pro_state = %{pro_state | event_manager_pid: manager}

    {:ok, _pro_state} = connect_to_nsqds(pro_state.config.nsqds, self(), pro_state)
  end

  @spec handle_call({:pub, binary}, any, pro_state) ::
          {:reply, {:ok, binary}, pro_state}
  def handle_call({:pub, data}, _from, pro_state) do
    do_pub(pro_state.topic, data, pro_state)
  end

  @spec handle_call({:pub, binary, binary}, any, pro_state) ::
          {:reply, {:ok, binary}, pro_state}
  def handle_call({:pub, topic, data}, _from, pro_state) do
    do_pub(topic, data, pro_state)
  end

  @spec handle_call({:mpub, binary}, any, pro_state) ::
          {:reply, {:ok, binary}, pro_state}
  def handle_call({:mpub, data}, _from, pro_state) do
    do_mpub(pro_state.topic, data, pro_state)
  end

  @spec handle_call({:mpub, binary, binary}, any, pro_state) ::
          {:reply, {:ok, binary}, pro_state}
  def handle_call({:mpub, topic, data}, _from, pro_state) do
    do_mpub(topic, data, pro_state)
  end

  @spec handle_call(:state, any, pro_state) :: {:reply, pro_state, pro_state}
  def handle_call(:state, _from, state) do
    {:reply, state, state}
  end

  # ------------------------------------------------------- #
  # API Definitions                                         #
  # ------------------------------------------------------- #
  @spec start_link({binary(), NSQ.Config.t()}) :: {:ok, pid}
  def start_link({topic, config}), do: start_link({topic, config, []})

  @spec start_link({binary, NSQ.Config.t(), GenServer.options()}) :: {:ok, pid}
  def start_link({topic, config, genserver_options}) do
    {:ok, config} = NSQ.Config.validate(config || %NSQ.Config{})
    {:ok, config} = NSQ.Config.normalize(config)
    unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}")
    state = %{@initial_state | topic: topic, config: config}
    GenServer.start_link(__MODULE__, state, genserver_options)
  end

  @spec get_connections(pro_state) :: [connection]
  def get_connections(pro_state) when is_map(pro_state) do
    children = Supervisor.which_children(pro_state.conn_sup_pid)
    Enum.map(children, fn {child_id, pid, _, _} -> {child_id, pid} end)
  end

  @spec get_connections(pid, pro_state) :: [connection]
  def get_connections(pro, pro_state \\ nil) when is_pid(pro) do
    pro_state = pro_state || get_state(pro)
    get_connections(pro_state)
  end

  @spec random_connection_pid(pro_state) :: pid
  def random_connection_pid(pro_state) do
    {_child_id, pid} = Enum.random(get_connections(pro_state))
    pid
  end

  @doc """
  Create supervised connections to NSQD.
  """
  @spec connect_to_nsqds([host_with_port], pid, pro_state) :: {:ok, pro_state}
  def connect_to_nsqds(nsqds, pro, pro_state) do
    Enum.map(nsqds, fn nsqd ->
      {:ok, _conn} =
        NSQ.Connection.Supervisor.start_child(
          pro,
          nsqd,
          pro_state,
          restart: :permanent
        )
    end)

    {:ok, pro_state}
  end

  @doc """
  Get the current state of a producer. Used in tests. Not for external use.
  """
  @spec get_state(pid) :: pro_state
  def get_state(producer) do
    GenServer.call(producer, :state)
  end

  @doc """
  Publish data to whatever topic is the default.
  """
  @spec pub(pid, binary) :: {:ok, binary}
  def pub(sup_pid, data) do
    {:ok, _resp} = GenServer.call(get(sup_pid), {:pub, data})
  end

  @doc """
  Publish data to a specific topic.
  """
  @spec pub(pid, binary, binary) :: {:ok, binary}
  def pub(sup_pid, topic, data) do
    {:ok, _resp} = GenServer.call(get(sup_pid), {:pub, topic, data})
  end

  @doc """
  Publish data to whatever topic is the default.
  """
  @spec mpub(pid, binary) :: {:ok, binary}
  def mpub(sup_pid, data) do
    GenServer.call(get(sup_pid), {:mpub, data})
  end

  @doc """
  Publish data to a specific topic.
  """
  @spec mpub(pid, binary, binary) :: {:ok, binary}
  def mpub(sup_pid, topic, data) do
    {:ok, _resp} = GenServer.call(get(sup_pid), {:mpub, topic, data})
  end

  @doc """
  The end-user will be targeting the supervisor, but it's the producer that
  can actually handle the command.
  """
  @spec get(pid) :: pid
  def get(sup_pid) do
    child =
      Supervisor.which_children(sup_pid)
      |> Enum.find(fn {kind, _, _, _} -> kind == NSQ.Producer end)

    {_, pid, _, _} = child
    pid
  end

  # ------------------------------------------------------- #
  # Private Functions                                       #
  # ------------------------------------------------------- #
  # Used to DRY up handle_call({:pub, ...).
  @spec do_pub(binary, binary, pro_state) :: {:reply, {:ok, binary}, pro_state}
  defp do_pub(topic, data, pro_state) do
    {:ok, resp} =
      random_connection_pid(pro_state)
      |> NSQ.Connection.cmd({:pub, topic, data})

    {:reply, {:ok, resp}, pro_state}
  end

  # Used to DRY up handle_call({:mpub, ...).
  @spec do_mpub(binary, binary, pro_state) :: {:reply, {:ok, binary}, pro_state}
  defp do_mpub(topic, data, pro_state) do
    {:ok, resp} =
      random_connection_pid(pro_state)
      |> NSQ.Connection.cmd({:mpub, topic, data})

    {:reply, {:ok, resp}, pro_state}
  end
end