lib/polyn.ex

defmodule Polyn do
  @moduledoc """
  Polyn is a dead simple service framework designed to be language agnostic while
  providing a simple, yet powerful, abstraction layer for building reactive events
  based services.
  """

  use Polyn.Tracing

  alias Polyn.Event
  alias Polyn.Serializers.JSON

  @typedoc """
  Options you can pass to most `Polyn` module functions

  * `:source` - The `source` of the event. By default will be the `domain` combined with the
  `source_root`
  """
  @type polyn_options ::
          {:store_name, binary()}
          | {:source, binary()}

  @typedoc """
  Options for publishing events. See `Gnat.pub/4` for more info

  * `:headers` - Headers to include in the message
  * `:reply_to` - Subject to send a response to
  """
  @type pub_options ::
          polyn_options()
          | {:headers, Gnat.headers()}
          | {:reply_to, binary()}

  @typedoc """
  Options for publishing events. See `Gnat.request/4` for more info

  * `:headers` - Headers to include in the message
  * `:receive_timeout` - How long to wait for a response
  """
  @type req_options ::
          polyn_options()
          | {:headers, Gnat.headers()}
          | {:receive_timeout, non_neg_integer()}

  @doc """
  Publish an event to the message bus. Will validate the data against an existing schema
  added by Polyn CLI.

  ## Options

  * `:source` - The `source` of the event. By default will be the `domain` combined with the
  `source_root`
  * See `Gnat.pub/4` for other options

  ## Examples

      iex>Polyn.pub(:gnat, "user.created.v1", %{name: "Mary"})
      :ok
      iex>Polyn.pub(:gnat, "user.created.v1", %{name: "Mary"}, source: "admin")
      :ok
  """
  @spec pub(conn :: Gnat.t(), event_type :: binary(), data :: any(), opts :: list(pub_options())) ::
          :ok
  def pub(conn, event_type, data, opts \\ []) do
    Polyn.Tracing.publish_span event_type do
      event = build_event(event_type, data, opts)

      json = JSON.serialize!(event, opts)

      Polyn.Tracing.span_attributes(conn: conn, type: event_type, event: event, payload: json)

      opts =
        add_nats_msg_id_header(opts, event)
        |> inject_trace_header()

      nats().pub(conn, event_type, json, remove_polyn_opts(opts))
    end
  end

  @doc """
  Issue a request in a psuedo-synchronous fashion. Requests still require an event be defined in
  the schema store. The event you send and receive will both be validated

  ## Options

  * `:source` - The `source` of the event. By default will be the `domain` combined with the
  `source_root`
  * See `Gnat.request/4` for other options

  ## Examples

      iex>Polyn.request(:gnat, "user.created.v1", %{name: "Mary"})
      {:ok, %{body: %Event{}}}
      iex>Polyn.request(:gnat, "user.created.v1", %{name: "Mary"}, source: "admin")
      {:ok, %{body: %Event{}}}
  """
  @spec request(
          conn :: Gnat.t(),
          event_type :: binary(),
          data :: any(),
          opts :: list(req_options())
        ) :: {:ok, Gnat.message()} | {:error, :timeout}
  def request(conn, event_type, data, opts \\ []) do
    Polyn.Tracing.publish_span event_type do
      event = build_event(event_type, data, opts)

      opts =
        add_nats_msg_id_header(opts, event)
        |> inject_trace_header()

      json = JSON.serialize!(event, opts)

      Polyn.Tracing.span_attributes(conn: conn, type: event_type, event: event, payload: json)

      case nats().request(
             conn,
             event_type,
             json,
             remove_polyn_opts(opts)
           ) do
        {:ok, message} ->
          handle_reponse_success(conn, message, opts)

        error ->
          Polyn.Tracing.record_timeout_exception(event_type, json)
          error
      end
    end
  end

  defp handle_reponse_success(conn, message, opts) do
    # The :reply_to subject is a temporarily generated "inbox"
    # https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/messaging/#span-name
    Polyn.Tracing.subscribe_span "(temporary)", message.headers do
      event = JSON.deserialize!(message.body, opts)

      Polyn.Tracing.span_attributes(
        conn: conn,
        type: "(temporary)",
        event: event,
        payload: message.body
      )

      {:ok, Map.put(message, :body, event)}
    end
  end

  @doc """
  Reply to an event you've subscribed to that included a `reply_to` option.

  ## Options

  * `:source` - The `source` of the event. By default will be the `domain` combined with the
  `source_root`
  * See `Gnat.pub/4` for other options

  ## Examples

      iex>Polyn.reply(:gnat, "INBOX.me", "user.created.v1", %{name: "Mary"})
      :ok
      iex>Polyn.reply(:gnat, "INBOX.me", "user.created.v1", %{name: "Mary"}, source: "admin")
      :ok
  """
  @spec reply(
          conn :: Gnat.t(),
          reply_to :: binary(),
          event_type :: binary(),
          data :: any(),
          opts :: list(pub_options())
        ) ::
          :ok
  def reply(conn, reply_to, event_type, data, opts \\ []) do
    Polyn.Tracing.publish_span "(temporary)" do
      event = build_event(event_type, data, opts)

      json = JSON.serialize!(event, opts)

      Polyn.Tracing.span_attributes(
        conn: conn,
        type: "(temporary)",
        event: event,
        payload: json
      )

      opts =
        add_nats_msg_id_header(opts, event)
        |> inject_trace_header()

      nats().pub(conn, reply_to, json, remove_polyn_opts(opts))
    end
  end

  defp build_event(event_type, data, opts) do
    Event.new(
      type: Event.full_type(event_type),
      data: data,
      specversion: "1.0.1",
      source: Event.full_source(Keyword.get(opts, :source)),
      datacontenttype: "application/json"
    )
  end

  defp remove_polyn_opts(opts) do
    Keyword.drop(opts, [:source, :store_name])
  end

  defp add_nats_msg_id_header(opts, event) do
    # Ensure accidental message duplication doesn't happen
    # https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication
    headers =
      Keyword.get(opts, :headers, [])
      |> Enum.concat([{"Nats-Msg-Id", event.id}])

    Keyword.put(opts, :headers, headers)
  end

  defp inject_trace_header(opts) do
    headers = Polyn.Tracing.add_trace_header(opts[:headers])
    Keyword.put(opts, :headers, headers)
  end

  defp nats do
    if sandbox(), do: Polyn.MockNats, else: Gnat
  end

  defp sandbox do
    Application.get_env(:polyn, :sandbox, false)
  end
end