lib/exampple/router/task/monitor.ex

defmodule Exampple.Router.Task.Monitor do
  @moduledoc """
  The monitor starts a task to attend the incoming request and, once
  the task is launched, sets a timer. The timer is cancelled when the
  task terminates. If the task crashes, the monitor receives a message
  and replies with an error. In case of timeout, the request is
  answered with a timeout error and the task is terminated.

  In addition to the logs regarding the stanzas we have the following information to be gathered by telemetry:

  - `[:xmpp, :request, :success]`
  - `[:xmpp, :request, :failure]`
  - `[:xmpp, :request, :timeout]`

  All of them register `duration` in milliseconds so, you can get
  the maximum, minimum, average, percentile and more statistics from
  the duration of the stanzas inside of the system based on if they
  are correct (success), wrong (failure) or was not attended (timeout).
  """
  use GenServer, restart: :temporary
  require Logger

  alias Exampple.Component
  alias Exampple.Router.Conn
  alias Exampple.Router.Task, as: RouterTask
  alias Exampple.Xmpp
  alias Exampple.Xmpp.Stanza

  # Color assigned to each kind of term; only referenced from `@format`.
  @syntax_colors [number: :yellow, atom: :cyan, string: :green, boolean: :magenta, nil: :magenta]

  # Keyword list handed as the second argument to the `Logger` calls below.
  @format [pretty: true, structs: true, syntax_colors: @syntax_colors]

  # Leading segments shared by every telemetry event emitted by this module;
  # the outcome (`:success`, `:failure` or `:timeout`) is appended to it.
  @metric_prefix [:xmpp, :request]

  defmodule Data do
    @moduledoc false

    # State of the monitor process. Every field defaults to `nil` and is
    # filled in by the `init/1` callback of the enclosing module.
    defstruct [:xmlel, :domain, :otp_app, :task_pid, :timer_ref, :timeout]
  end

  @doc """
  Starts the monitor as a server linked to the caller.

  The only argument is a four-element `list` holding, in this order:

  - `xmlel`: the stanza in `xmlel` format;
  - `domain`: the XMPP domain for the component;
  - `otp_app`: the name of the application;
  - `timeout`: the time, in milliseconds, the task is given before it is
    terminated and the request is annotated as a timeout.

  The list is handed untouched to `init/1`.
  """
  @spec start_link(list()) :: GenServer.on_start()
  def start_link([_xmlel, _domain, _otp_app, _timeout] = args) do
    GenServer.start_link(__MODULE__, args)
  end

  @doc false
  @impl GenServer
  def init([xmlel, domain, otp_app, timeout]) do
    Logger.debug("init monitor: #{inspect(xmlel)}")

    # Launch the task attending the request and monitor it, so that its
    # termination reaches this process as a `:DOWN` message.
    {:ok, task_pid} = RouterTask.start(xmlel, domain, otp_app)
    Process.monitor(task_pid)

    # Deadline: `:timeout` is delivered to this process after `timeout` ms.
    # The reference is kept so the timer can be cancelled later on.
    state = %Data{
      xmlel: xmlel,
      domain: domain,
      otp_app: otp_app,
      task_pid: task_pid,
      timer_ref: Process.send_after(self(), :timeout, timeout),
      timeout: timeout
    }

    {:ok, state}
  end

  @doc false
  @impl GenServer
  def handle_info({:DOWN, _ref, :process, pid, reason}, %Data{task_pid: pid} = state) do
    # The monitored task is gone. `:noproc` is what a monitor reports when
    # the process no longer existed at the moment it was monitored, so it
    # is handled like a normal exit; anything else is a failure.
    if reason in [:normal, :noproc] do
      sucess(state)
    else
      failure(state, reason)
    end

    {:stop, :normal, state}
  end

  # The timer set in `init/1` fired before the task finished.
  def handle_info(:timeout, state) do
    timeout(state)
    {:stop, :etimedout, state}
  end

  # Builds the `Conn` for the stanza kept in the state and stores its main
  # attributes, together with the given elapsed-time text, as the logger
  # metadata of this process. Returns the `Conn`.
  defp prepare_logger(%Data{} = state, diff_time) do
    conn = Conn.new(state.xmlel, state.domain)

    metadata = [
      stanza_id: conn.id,
      type: conn.type,
      stanza_type: conn.stanza_type,
      ellapsed_time: diff_time,
      xmlns: conn.xmlns,
      from_jid: to_string(conn.from_jid),
      to_jid: to_string(conn.to_jid)
    ]

    Logger.metadata(metadata)
    conn
  end

  # Logs the request as successful and emits the telemetry event
  # `@metric_prefix ++ [:success]`, carrying the elapsed time as `duration`
  # and the namespace of the request as `request_ns`.
  defp sucess(state) do
    {elapsed_txt, duration} = diff_time(state)
    conn = prepare_logger(state, elapsed_txt)
    Logger.info("success", @format)

    event = @metric_prefix ++ [:success]
    :telemetry.execute(event, %{duration: duration}, %{request_ns: conn.xmlns})
  end

  # Logs the crash of the task, emits the telemetry event
  # `@metric_prefix ++ [:failure]` and sends an error stanza built from the
  # request through the component.
  defp failure(state, reason) do
    {elapsed_txt, duration} = diff_time(state)
    conn = prepare_logger(state, elapsed_txt)
    Logger.error("error: #{inspect(reason)}", @format)

    event = @metric_prefix ++ [:failure]
    :telemetry.execute(event, %{duration: duration}, %{request_ns: conn.xmlns})

    # When the exit reason carries an `Xmpp.Error` its own details are used
    # for the reply; any other reason gets a generic internal error.
    payload =
      case reason do
        {%Xmpp.Error{} = error, _stacktrace} ->
          {error.message, error.lang, error.reason}

        _other ->
          {"internal-server-error", "en", "An error happened"}
      end

    conn
    |> Stanza.error(payload)
    |> Component.send()
  end

  # Handles a request whose task did not finish within `timeout` ms:
  # stops the task, logs the error, emits the telemetry event
  # `@metric_prefix ++ [:timeout]` and replies to the requester with a
  # `remote-server-timeout` error stanza.
  defp timeout(%Data{task_pid: task_pid, timeout: timeout} = state) do
    RouterTask.stop(task_pid)
    msecs = human_readable(timeout)
    conn = prepare_logger(state, msecs)
    Logger.error("error timeout", @format)

    # NOTE(review): `duration` is the raw `timeout` value here, whereas the
    # success/failure events use the value scaled by 1_000 in `diff_time/1`;
    # confirm which unit the consumers of the metric expect.
    :telemetry.execute(
      @metric_prefix ++ [:timeout],
      %{duration: timeout},
      %{request_ns: conn.xmlns}
    )

    # The error stanza has to be sent, exactly as `failure/2` does; building
    # it without sending leaves the requester without any answer.
    conn
    |> Stanza.error(
      {"remote-server-timeout", "en", "silent error or too much time to process the request"}
    )
    |> Component.send()
  end

  # Renders an amount of milliseconds as text: values below one second are
  # shown as `"<n>ms"`, anything else as seconds with exactly three decimal
  # digits, e.g. `1500` becomes `"1.500s"`.
  defp human_readable(msecs) when msecs < 1_000, do: "#{msecs}ms"

  defp human_readable(msecs) do
    whole_secs = div(msecs, 1_000)
    fraction = msecs |> rem(1_000) |> Integer.to_string() |> String.pad_leading(3, "0")

    "#{whole_secs}.#{fraction}s"
  end

  # Cancels the timeout timer and computes how long the task has been
  # running, as the configured `timeout` minus the time left in the timer.
  #
  # returns {human_readable_time, microseconds}
  defp diff_time(%Data{timer_ref: timer_ref, timeout: timeout}) do
    # `Process.cancel_timer/1` returns `false` instead of the remaining time
    # when the timer has already expired, which happens when the task ends
    # at the very moment the deadline is reached. In that case nothing is
    # left, so the whole `timeout` is reported instead of crashing on
    # `timeout - false`.
    remaining =
      case Process.cancel_timer(timer_ref) do
        false -> 0
        left when is_integer(left) -> left
      end

    msecs = timeout - remaining
    {human_readable(msecs), msecs * 1_000}
  end
end