lib/nsq/conn_info.ex

defmodule NSQ.ConnInfo do
  @doc """
  Given a consumer state object and an nsqd host/port tuple, return the
  connection ID.
  """
  def conn_id(parent, {host, port} = _nsqd) do
    "parent:#{inspect(parent)}:conn:#{host}:#{port}"
  end

  @doc """
  Given a `conn` object return by `Consumer.get_connections` or connection state object, return the
  connection ID.
  """
  def conn_id({conn_id, conn_pid} = _conn) when is_pid(conn_pid) do
    conn_id
  end

  def conn_id(%{parent: parent, nsqd: {host, port}} = _conn_state) do
    conn_id(parent, {host, port})
  end

  @doc """
  Get info for all connections in a map like `%{conn_id: %{... data ...}}`.
  """
  def all(agent_pid) when is_pid(agent_pid) do
    Agent.get(agent_pid, fn data -> data end)
  end

  @doc """
  `func` is passed `conn_info` for each connection.
  """
  def reduce(agent_pid, start_acc, func) do
    Agent.get(agent_pid, fn all_conn_info ->
      Enum.reduce(all_conn_info, start_acc, func)
    end)
  end

  @doc """
  Get info for the connection matching `conn_id`.
  """
  def fetch(agent_pid, conn_id) when is_pid(agent_pid) do
    Agent.get(agent_pid, fn data -> data[conn_id] || %{} end)
  end

  def fetch(%{conn_info_pid: agent_pid}, conn_id) do
    fetch(agent_pid, conn_id)
  end

  @doc """
  Get specific data for the connection, e.g.:

      [rdy_count, last_rdy] = fetch(pid, "conn_id", [:rdy_count, :last_rdy])
      rdy_count = fetch(pid, "conn_id", :rdy_count)
  """
  def fetch(agent_pid, conn_id, keys) when is_pid(agent_pid) do
    Agent.get(agent_pid, fn data ->
      conn_map = data[conn_id] || %{}

      if is_list(keys) do
        Enum.map(keys, &Map.get(conn_map, &1))
      else
        Map.get(conn_map, keys)
      end
    end)
  end

  def fetch(%{conn_info_pid: agent_pid} = _state, {conn_id, _conn_pid} = _conn, keys) do
    fetch(agent_pid, conn_id, keys)
  end

  def fetch(%{conn_info_pid: agent_pid} = _state, conn_id, keys) do
    fetch(agent_pid, conn_id, keys)
  end

  @doc """
  Update the info for a specific connection matching `conn_id`.
  If a function is supplied `conn_info`is passed to `func`, and the result of `func` is saved as the new `conn_info`.
  if a map is supplied, the map is merged into the existing conn_info.
  """
  def update(agent_pid, conn_id, func) when is_pid(agent_pid) and is_function(func) do
    Agent.update(agent_pid, fn data ->
      Map.put(data, conn_id, func.(data[conn_id] || %{}))
    end)
  end

  def update(agent_pid, conn_id, map) when is_pid(agent_pid) and is_map(map) do
    Agent.update(agent_pid, fn data ->
      new_conn_data = Map.merge(data[conn_id] || %{}, map)
      Map.put(data, conn_id, new_conn_data)
    end)
  end

  def update(%{conn_info_pid: agent_pid}, conn_id, func) do
    update(agent_pid, conn_id, func)
  end

  def update(%{conn_info_pid: agent_pid, parent: parent, nsqd: nsqd}, func) do
    update(agent_pid, conn_id(parent, nsqd), func)
  end

  @doc """
  Delete connection info matching `conn_id`. This should be called when a
  connection is terminated.
  """
  def delete(agent_pid, conn_id) when is_pid(agent_pid) do
    Agent.update(agent_pid, fn data -> Map.delete(data, conn_id) end)
  end

  def delete(%{conn_info_pid: agent_pid}, conn_id) do
    delete(agent_pid, conn_id)
  end

  @spec init(map) :: any
  def init(state) do
    update(state, %{
      max_rdy: state.max_rdy,
      rdy_count: 0,
      last_rdy: 0,
      messages_in_flight: 0,
      last_msg_timestamp: now(),
      retry_rdy_pid: nil,
      finished_count: 0,
      requeued_count: 0,
      failed_count: 0,
      backoff_count: 0
    })
  end

  @spec now :: integer
  defp now do
    {megasec, sec, microsec} = :os.timestamp()
    1_000_000 * megasec + sec + microsec / 1_000_000
  end
end