lib/gnat/services/server.ex

defmodule Gnat.Services.Server do
  require Logger
  alias Gnat.Services.{Service, ServiceResponder}

  @moduledoc """
  A behavior for acting as a NATS service

  Creating a service with this behavior works almost exactly the same as `Gnat.Server`,
  with the bonus that this service keeps track of requests, errors, processing time, and
  participates in service discovery and monitoring as defined by
  the [NATS service protocol](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-32.md).


  ## Example

      defmodule MyApp.Service do
        use Gnat.Services.Server

        # Classic subject matching
        def request(%{body: _body, topic: "myservice.req"}, _, _) do
          {:reply, "handled request"}
        end

        # Can also match on endpoint or group
        def request(msg, "add", "calculator") do
          {:reply, "42"}
        end

        # defining an error handler is optional, the default one will just call Logger.error for you
        def error(%{gnat: gnat, reply_to: reply_to}, _error) do
          Gnat.pub(gnat, reply_to, "Something went wrong and I can't handle your request")
        end
      end
  """

  @doc """
  Called when a message is received from the broker. The endpoint on which the message arrived
  is always supplied. If the endpoint is a member of a group, the group name will also be
  provided.

  Automatically increments the request time and processing time stats for this service.
  """
  @callback request(message::Gnat.message(), endpoint :: String.t(), group :: String.t() | nil) :: :ok | {:reply, iodata()} | {:error, term()}

  @doc """
  Called when an error occured during the `request/1`. Automatically increments the error count
  and processing time stats for this service.

  If your `request/1` function returned `{:error, term}`, then the `term` you returned will be passed as the second argument.
  If an exception was raised during your `request/1` function, then the exception will be passed as the second argument.
  If your `request/1` function returned something other than the supported return types, then its return value will be passed as the second argument.
  """
  @callback error(message::Gnat.message(), error::term()) :: :ok | {:reply, iodata()}

  defmacro __using__(_opts) do
    quote do
      @behaviour Gnat.Services.Server

      def error(_message, error) do
        require Logger
        Logger.error(
          "Gnat.Server encountered an error while handling a request: #{inspect(error)}",
          type: :gnat_server_error,
          error: error
        )
      end

      defoverridable error: 2
    end
  end


  @typedoc """
  Service configuration is provided as part of the consumer supervisor settings in the `service_definition` field.
  You can specify _either_ the `subscription_topics` field for a regluar server or the `service_definition` field
  for a new NATS service.

  * `name` - The name of the service. Needs to conform to the rules for NATS service names
  * `version` - A required version number (w/out "v" prefix) conforming to semver rules
  * `queue_group` - An optional queue group for service subscriptions. If left off, "q" will be used.
  * `description` - An optional description of the service
  * `metadata` - An optional string->string map of service metadata
  * `endpoints` - A required list of service endpoints. All services must have at least one endpoint
  """
  @type service_configuration :: %{
    required(:name) =>         binary(),
    required(:version) =>      binary(),
    required(:endpoints) =>    [endpoint_configuration()],
    optional(:description) => binary(),
    optional(:metadata) =>     map(),
  }

  @typedoc """
  Each service configuration must contain at least one endpoint. Endpoints can manually specify their
  subscription subjects or they can be derived from the endpoint name.

  * `subject` - A specific subject for this endpoint to listen on. If this is not provided, then the endpoint name will be used.
  * `name` - The required name of the endpoint
  * `group_name` - An optional group to which this endpoint belongs
  * `queue_group` - A queue group for this endpoint's subscription. If not supplied, "q" will be used (indicated by protocol spec).
  * `metadata` - An optional string->string map containing metadata for this endpoint
  """
  @type endpoint_configuration :: %{
    required(:name) => binary(),
    optional(:subject) => binary(),
    optional(:group_name) => binary(),
    optional(:queue_group) => binary(),
    optional(:metadata) => map()
  }


  @doc false
  def execute(_module, %{topic: "$SRV" <> _} = message, service) do
    ServiceResponder.maybe_respond(message, service)
  end

  def execute(module, message, service) do
    try do
      endpoint = Map.get(service.subjects, message.topic)
      %{group_name: group_name, name: endpoint_name} = endpoint
      telemetry_tags = %{topic: message.topic, endpoint: endpoint_name, group: group_name}

      case :timer.tc(fn -> apply(module, :request, [message, endpoint_name, group_name]) end) do
        {_elapsed, :ok} -> :done
        {elapsed_micros, {:reply, data}} ->
          send_reply(message, data)
          :telemetry.execute([:gnat, :service_request], %{latency: elapsed_micros}, telemetry_tags)
          Service.record_request(endpoint, elapsed_micros)

        {elapsed_micros, {:error, error}} ->
          execute_error(module, message, error)
          :telemetry.execute([:gnat, :service_error], %{latency: elapsed_micros}, telemetry_tags)
          Service.record_error(endpoint, elapsed_micros)

        other -> execute_error(module, message, other)
      end

    rescue e ->
      execute_error(module, message, e)
    end
  end

  @doc false
  defp execute_error(module, message, error) do
    try do
      case apply(module, :error, [message, error]) do
        :ok -> :done
        {:reply, data} -> send_reply(message, data)
        other ->
          Logger.error(
            "error handler for #{module} returned something unexpected: #{inspect(other)}",
            type: :gnat_server_error
          )
      end

    rescue e ->
      Logger.error(
        "error handler for #{module} encountered an error: #{inspect(e)}",
        type: :gnat_server_error
      )
    end
  end

  @doc false
  def send_reply(%{gnat: gnat, reply_to: return_address}, iodata) when is_binary(return_address) do
    Gnat.pub(gnat, return_address, iodata)
  end
  def send_reply(_other, _iodata) do
    Logger.error(
      "Could not send reply because no reply_to was provided with the original message",
      type: :gnat_server_error
    )
  end
end