lib/mongo/monitor.ex

defmodule Mongo.Monitor do
  @moduledoc """
  Each server has a monitor process. The monitor process is created by the topology process.

  If the network connection is working, then the monitor process reports this and the topology process starts the
  connection pool. Per server we get 1 + pool size connections to each server.

  After waiting for `heartbeat_frequency_ms` milliseconds, the monitor process calls `hello` command and
  reports the result to the topology process.

  The result of the hello command is mapped the `ServerDescription` structure and sent to the topology process, which
  updates it internal data structure.

  see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#monitoring
  """
  require Logger

  use GenServer

  alias Mongo.Events.ServerHeartbeatFailedEvent
  alias Mongo.Events.ServerHeartbeatStartedEvent
  alias Mongo.Events.ServerHeartbeatSucceededEvent
  alias Mongo.Monitor
  alias Mongo.ServerDescription
  alias Mongo.StreamingHelloMonitor
  alias Mongo.Topology

  @monitor_modes [
    :polling_mode,
    :streaming_mode
  ]

  @min_wire_version_streaming_protocol 9

  # this is not configurable because the specification says so
  # see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#minheartbeatfrequencyms
  # not used @min_heartbeat_frequency_ms 500

  def start_link(args) do
    GenServer.start_link(Monitor, args)
  end

  @doc """
  Try to update the server description.
  """
  def force_check(pid) do
    GenServer.cast(pid, :update)
  end

  def stop_streaming_mode(pid) do
    GenServer.cast(pid, :stop_streaming_mode)
  end

  def set_heartbeat_frequency_ms(pid, heartbeat_frequency_ms) do
    GenServer.cast(pid, {:update, heartbeat_frequency_ms})
  end

  def get_state(pid) do
    GenServer.call(pid, :get_state)
  end

  @doc """
  Initialize the monitor process
  """
  def init([address, topology_pid, heartbeat_frequency_ms, connection_opts]) do
    ## Logger.info("Starting monitor process with pid #{inspect(self())}, #{inspect(address)}")

    # monitors don't authenticate and use the "admin" database
    opts =
      connection_opts
      |> Keyword.put(:database, "admin")
      |> Keyword.put(:skip_auth, true)
      |> Keyword.put(:after_connect, {Monitor, :connected, [self(), topology_pid]})
      |> Keyword.put(:backoff_min, 500)
      |> Keyword.put(:backoff_max, 1_000)
      |> Keyword.put(:connection_type, :monitor)
      |> Keyword.put(:topology_pid, topology_pid)
      |> Keyword.put(:pool_size, 1)
      |> Keyword.put(:idle_interval, 5_000)
      |> Keyword.put(:server_pid, self())

    with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do
      {:ok,
       %{
         ## we are starting with the polling mode
         mode: :polling_mode,
         ## our connection pid to the mongodb server
         connection_pid: pid,
         ## the topology_pid to which we report
         topology_pid: topology_pid,
         ## the address of the server, needed to make updates
         address: address,
         ## current round_trip_time, needed to make average value
         round_trip_time: nil,
         ## current heartbeat_frequency_ms
         heartbeat_frequency_ms: heartbeat_frequency_ms,
         ## options
         opts: opts,
         streaming_pid: nil
       }}
    end
  end

  @doc """
  In case of terminating we stop our linked processes as well:
  * connection
  * streaming process
  """
  def terminate(reason, %{connection_pid: connection_pid, streaming_pid: nil}) do
    ## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}")
    GenServer.stop(connection_pid, reason)
  end

  def terminate(reason, %{connection_pid: connection_pid, streaming_pid: streaming_pid}) do
    ## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}, #{inspect(streaming_pid)}")
    GenServer.stop(connection_pid, reason)
    GenServer.stop(streaming_pid, reason)
  end

  @doc """
  Report the connection event, so the topology process can now create the connection pool.
  """
  def connected(_connection, me, topology_pid) do
    ## Logger.info("Monitor #{inspect(me)} connected to server! ")
    Topology.monitor_connected(topology_pid, me)
    GenServer.cast(me, :update)
  end

  def handle_call(:get_state, _from, state) do
    {:reply, Map.put(state, :pid, self()), state}
  end

  def handle_cast(:stop_streaming_mode, %{streaming_pid: streaming_pid} = state) when streaming_pid != nil do
    spawn(fn -> GenServer.stop(streaming_pid) end)
    {:noreply, %{state | mode: :polling_mode, streaming_pid: nil}}
  end

  def handle_cast(:stop_streaming_mode, state) do
    {:noreply, %{state | mode: :polling_mode}}
  end

  ##
  # Update the server description or the rrt value
  ##
  def handle_cast(:update, state) do
    handle_info(:update, state)
  end

  ##
  # Update the server description or the rrt value and set new heartbeat value
  ##
  def handle_cast({:update, heartbeat_frequency_ms}, state) do
    new_state =
      state
      |> update_server_description()
      |> Map.put(:heartbeat_frequency_ms, heartbeat_frequency_ms)

    {:noreply, new_state}
  end

  ##
  # Updates the server description or the rrt value
  ##
  def handle_info(:update, state) do
    new_state = update_server_description(state)

    ## debug info("Calling update: #{inspect new_state.address}, #{inspect new_state.heartbeat_frequency_ms}")
    Process.send_after(self(), :update, new_state.heartbeat_frequency_ms)

    {:noreply, new_state}
  end

  ##
  # Polling mode: get a new server description from the server and new round_trip_time value
  # and send it to the topology process. If possible start the streaming mode.
  ##
  defp update_server_description(%{connection_pid: conn_pid, topology_pid: topology_pid, mode: :polling_mode} = state) do
    case get_server_description(state) do
      %{round_trip_time: round_trip_time, max_wire_version: max_wire_version} = server_description ->
        ## debug info("Updating server description: #{inspect(server_description, pretty: true)}")

        Mongo.Events.notify(%ServerHeartbeatStartedEvent{connection_pid: conn_pid})
        Topology.update_server_description(topology_pid, server_description)
        state = %{state | round_trip_time: round_trip_time}

        case max_wire_version >= @min_wire_version_streaming_protocol do
          true ->
            start_streaming_mode(state, server_description)

          false ->
            state
        end

      error ->
        warning("Unable to update server description because of #{inspect(error)}")
        state
    end
  end

  ##
  # Get a new server description from the server and send it to the Topology process.
  # * Monitors MUST use a dedicated connection for RTT commands
  # * Monitors MUST use the hello or legacy hello command to measure RTT
  # * Why don't clients mark a server unknown when an RTT command fails?
  ##
  defp update_server_description(%{topology_pid: topology_pid, address: address, mode: :streaming_mode} = state) do
    case get_server_description(state) do
      %{round_trip_time: round_trip_time} ->
        ## debug info("Updating round_trip_time: #{inspect round_trip_time}")
        Topology.update_rrt(topology_pid, address, round_trip_time)

        %{state | round_trip_time: round_trip_time}

      error ->
        warning("Unable to round trip time because of #{inspect(error)}")
        state
    end
  end

  ##
  # Starts the streaming mode
  ##
  defp start_streaming_mode(%{address: address, topology_pid: topology_pid, opts: opts} = state, _server_description) do
    args = [self(), topology_pid, address, opts]

    case StreamingHelloMonitor.start_link(args) do
      {:ok, pid} ->
        ## Logger.debug("Starting streaming mode: #{inspect(pid)}")
        %{state | mode: :streaming_mode, streaming_pid: pid, heartbeat_frequency_ms: 10_000}

      error ->
        warning("Unable to start the streaming hello monitor, because of #{inspect(error)}")
        state
    end
  end

  ##
  # Streaming mode: calls hello command and updated the round trip time for the command.
  ##
  defp get_server_description(%{connection_pid: conn_pid, round_trip_time: last_rtt, mode: :streaming_mode, opts: opts}) do
    {rtt, response} = :timer.tc(fn -> Mongo.exec_hello(conn_pid, opts) end)

    case response do
      {:ok, {_flags, _hello_doc}} ->
        %{round_trip_time: average_rtt(last_rtt, div(rtt, 1000))}

      error ->
        error
    end
  end

  ##
  # Polling mode: updating the server description and the round trip time together
  ##
  defp get_server_description(%{connection_pid: conn_pid, address: address, round_trip_time: last_rtt, opts: opts}) do
    {rtt, response} = :timer.tc(fn -> Mongo.exec_hello(conn_pid, opts) end)

    case response do
      {:ok, {_flags, hello_doc}} ->
        notify_success(rtt, hello_doc, conn_pid)

        hello_doc
        |> ServerDescription.parse_hello_response()
        |> Map.put(:round_trip_time, average_rtt(last_rtt, div(rtt, 1000)))
        |> Map.put(:address, address)
        |> Map.put(:last_update_time, DateTime.utc_now())
        |> Map.put(:error, nil)

      {:error, error} ->
        notify_error(rtt, error, conn_pid)

        ServerDescription.new()
        |> Map.put(:address, address)
        |> Map.put(:error, error)
    end
  end

  defp average_rtt(nil, rtt) do
    round(rtt)
  end

  defp average_rtt(last_rtt, rtt) do
    round(0.2 * rtt + 0.8 * last_rtt)
  end

  defp notify_error(rtt, error, conn_pid) do
    Mongo.Events.notify(%ServerHeartbeatFailedEvent{duration: rtt, failure: error, connection_pid: conn_pid})
  end

  defp notify_success(rtt, reply, conn_pid) do
    Mongo.Events.notify(%ServerHeartbeatSucceededEvent{duration: rtt, reply: reply, connection_pid: conn_pid})
  end

  @doc """
  Returns the possible modes of the monitor process.
  """
  def modes() do
    @monitor_modes
  end

  def info(message) do
    Logger.info(IO.ANSI.format([:light_magenta, :bright, message]))
  end

  if macro_exported?(Logger, :warning, 2) do
    defp warning(message) do
      Logger.warning(message)
    end
  else
    defp warning(message) do
      Logger.warn(message)
    end
  end
end