lib/mongo/streaming_hello_monitor.ex

defmodule Mongo.StreamingHelloMonitor do
  @moduledoc """
  See https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#streaming-protocol

  The streaming protocol is used to monitor MongoDB 4.4+ servers and optimally reduces the time it takes for a client to discover server state changes.
  Multi-threaded or asynchronous drivers MUST use the streaming protocol when connected to a server that supports the awaitable hello or legacy hello commands.
  This protocol requires an extra thread and an extra socket for each monitor to perform RTT calculations.

  This module implements the streaming protocol. The GenServer is started and maintained by the Monitor process. The streaming hello monitor uses the
  more to come flag while updating the current server description.
  """
  require Logger

  use GenServer
  import Bitwise

  alias Mongo.Events.ServerHeartbeatFailedEvent
  alias Mongo.Events.ServerHeartbeatStartedEvent
  alias Mongo.Events.ServerHeartbeatSucceededEvent
  alias Mongo.ServerDescription
  alias Mongo.StreamingHelloMonitor
  alias Mongo.Topology

  @more_to_come_mask 0x2

  def start_link(args) do
    GenServer.start_link(StreamingHelloMonitor, args)
  end

  @doc """
  Initialize the monitor process
  """
  def init([topology_pid, address, opts]) do
    heartbeat_frequency_ms = 10_000

    opts =
      opts
      |> Keyword.drop([:after_connect])
      |> Keyword.put(:after_connect, {StreamingHelloMonitor, :connected, [self()]})
      |> Keyword.put(:connection_type, :stream_monitor)

    ## debug info("Starting stream hello monitor with options #{inspect(opts, pretty: true)}")

    with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do
      {:ok,
       %{
         ## our connection pid to the mongodb server
         connection_pid: pid,
         ## the topology_pid to which we report
         topology_pid: topology_pid,
         ## the address of the server, needed to make updates
         address: address,
         ## current heartbeat_frequency_ms
         heartbeat_frequency_ms: heartbeat_frequency_ms,
         max_await_time_ms: heartbeat_frequency_ms,
         more_to_come: false,
         topology_version: nil,
         ## options
         opts: opts
       }}
    end
  end

  @doc """
  In this case we stop the DBConnection.
  """
  def terminate(reason, %{connection_pid: connection_pid}) do
    ## debug info("Terminating streaming hello monitor for reason #{inspect reason}")
    GenServer.stop(connection_pid, reason)
  end

  @doc """
  Report the connection event, so the topology process can now create the connection pool.
  """
  def connected(_connection, me) do
    GenServer.cast(me, :update)
  end

  @doc """
  Time to update the topology. Calling `hello` and updating the server description
  """
  def handle_cast(:update, state) do
    handle_info(:update, state)
  end

  def handle_info({:EXIT, _pid, reason}, state) do
    ## debug Logger.warn("Stopped with reason #{inspect reason}")
    {:stop, reason, state}
  end

  def handle_info(:update, state) do
    state = update_server_description(state)

    case state.more_to_come do
      true ->
        send(self(), :update)

      false ->
        :noop
    end

    {:noreply, state}
  end

  ##
  # Get a new server description from the server and send it to the Topology process.
  #
  defp update_server_description(%{topology_pid: topology_pid} = state) do
    with {topology_version, flags, server_description} <- get_server_description(state) do
      ## debug info("Updating server description")
      Topology.update_server_description(topology_pid, server_description)
      state = %{state | topology_version: topology_version}

      case flags &&& @more_to_come_mask do
        @more_to_come_mask ->
          ## debug info("More to come...")
          %{state | more_to_come: true}

        _other ->
          Process.send_after(self(), :update, state.heartbeat_frequency_ms)
          %{state | more_to_come: false}
      end
    end
  end

  # see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#network-error-when-calling-ismaster
  ##
  # Calls hello command and parses the result to update the server description
  #
  defp get_server_description(%{connection_pid: conn_pid, address: address, topology_version: topology_version, max_await_time_ms: max_await_time_ms, opts: opts} = state) do
    Mongo.Events.notify(%ServerHeartbeatStartedEvent{connection_pid: conn_pid})

    opts = Keyword.merge(opts, timeout: max_await_time_ms * 2)

    {duration, result} =
      case state do
        %{more_to_come: true} ->
          :timer.tc(fn -> Mongo.exec_more_to_come(conn_pid, opts) end)

        _other ->
          opts = Keyword.put(opts, :max_await_time_ms, max_await_time_ms)
          :timer.tc(fn -> hello_command(conn_pid, topology_version, opts) end)
      end

    case result do
      {:ok, {flags, hello_doc}} ->
        server_description =
          hello_doc
          |> ServerDescription.parse_hello_response()
          |> Map.put(:address, address)
          |> Map.put(:last_update_time, DateTime.utc_now())
          |> Map.put(:error, nil)

        notify_success(duration, hello_doc, conn_pid)
        {hello_doc["topologyVersion"], flags, server_description}

      {:error, error} ->
        notify_error(duration, error, conn_pid)

        server_description =
          ServerDescription.new()
          |> Map.put(:address, address)
          |> Map.put(:error, error)

        {nil, 0x0, server_description}
    end
  end

  defp hello_command(conn_pid, %{"counter" => counter, "processId" => process_id}, opts) do
    max_await_time_ms = Keyword.get(opts, :max_await_time_ms, 10_000)

    opts =
      opts
      |> Keyword.merge(flags: [:exhaust_allowed])
      |> Keyword.merge(timeout: max_await_time_ms * 2)

    cmd = [
      maxAwaitTimeMS: max_await_time_ms,
      topologyVersion: %{
        counter: %BSON.LongNumber{value: counter},
        processId: process_id
      }
    ]

    Mongo.exec_hello(conn_pid, cmd, opts)
  end

  defp hello_command(conn_pid, _topology_version, opts) do
    Mongo.exec_hello(conn_pid, opts)
  end

  defp notify_error(duration, error, conn_pid) do
    Mongo.Events.notify(%ServerHeartbeatFailedEvent{duration: duration, failure: error, connection_pid: conn_pid})
  end

  defp notify_success(duration, reply, conn_pid) do
    Mongo.Events.notify(%ServerHeartbeatSucceededEvent{duration: duration, reply: reply, connection_pid: conn_pid})
  end

  def info(message) do
    Logger.info(IO.ANSI.format([:blue, :bright, message]))
  end
end