lib/solvent.ex

defmodule Solvent do
  use TelemetryRegistry

  telemetry_event %{
    event: [:solvent, :event, :published],
    description: "Emitted when an event is published",
    measurements: "%{}",
    metadata: "%{event_source: String.t(), event_id: String.t(), event_type: String.t(), subscriber_count: non_neg_integer()}"
  }

  telemetry_event %{
    event: [:solvent, :subscriber, :processing, :start],
    description: "Emitted when a subscriber begins processing an event",
    measurements: "%{}",
    metadata: "%{subscriber_id: String.t(), event_source: String.t(), event_id: String.t(), event_type: String.t()}"
  }

  telemetry_event %{
    event: [:solvent, :subscriber, :processing, :stop],
    description: "Emitted when a subscriber finishes processing an event",
    measurements: "%{duration: non_neg_integer()}",
    metadata: "%{subscriber_id: String.t(), event_source: String.t(), event_id: String.t(), event_type: String.t()}"
  }

  telemetry_event %{
    event: [:solvent, :subscriber, :subscribing, :start],
    description: "Emitted when a subscriber begins subscribing to the event stream",
    measurements: "%{}",
    metadata: "%{subscriber_id: String.t(), filter: String.t()}"
  }

  telemetry_event %{
    event: [:solvent, :subscriber, :subscribing, :stop],
    description: "Emitted when a subscriber is finished subscribing to the event stream",
    measurements: "%{duration: non_neg_integer()}",
    metadata: "%{subscriber_id: String.t(), filter: String.t()}"
  }

  @moduledoc """
  Solvent is an event bus built to be fast and easy-to-use,
  and takes a lot of inspiration from the [CloudEvents](https://cloudevents.io) spec for the best interoperability with other event systems.

  In the CloudEvents specification, every event is required to have an ID, a source, and a type.
  The `source` field identifies the system that sent the event, and the ID must identify the event and be unique in the scope of the source.
  Lastly, the `type` field, also called a "topic," identifies the kind of event that took place.

  In Solvent, only the `type` field is required, but it is strongly recommended to provide a `source` field as well.
  Solvent generates a version 7 UUID for event ID fields and this rarely needs to be overridden.
  See the `Solvent.Event` docs for details on what defaults are provided.

  Subscribe to the event stream with `subscribe/2`, and publish events using `publish/2`.
  You can also create a `Solvent.Subscription` or a `Solvent.Event` yourself and pass those to `subscribe/1` and `publish/1`, respectively.
  The functions here share a signature with their corresponding struct functions, but have the additional benefit of interacting with the event bus.

  ## Telemetry

  #{telemetry_docs()}

  """

  alias Solvent.Subscription

  require Logger

  @doc """
  Subscribe to the event stream with a pre-made `Solvent.Subscription` struct.
  """
  def subscribe(%Subscription{} = sub) do
    :telemetry.span(
      [:solvent, :subscriber, :subscribing],
      %{subscriber_id: sub.id, filter: sub.filters},
      fn ->
        :ok = Solvent.SubscriberStore.insert(sub)
        {:ok, %{}}
      end
    )

    {:ok, sub.id}
  end

  @doc """
  Subscribe to the event stream.

  The sink is what receives your events, and can be anything that implements the `Solvent.Sink` protocol,
  which includes anonymous functions, module-function-args tuples, and PIDs.
  Here's an example with a module-function-args tuple that subscribes to a single event type
  and identifies the subscription as `"My first subscriber"`.

      iex> Solvent.subscribe(
      ...>  {Solvent.MessengerHandler, :handle_event, []},
      ...>  types: ["com.example.event.published"],
      ...>  id: "My first subscriber"
      ...> )
      {:ok, "My first subscriber"}

  This function shares its signature with `Solvent.Subscription.new/2`,
  and creates a `Solvent.Subscription` in the same way,
  while also inserting the subscription into the `Solvent.SubscriberStore` so that the sink will begin to receive events.

  Sinks are given an event _identifier_, and _not_ the event itself.
  Your sink must fetch the full event from the `Solvent.EventStore`, so that extra data is not copied between processes.
  You must also call `Solvent.EventStore.ack/2` once you are done with the event,
  unless you want the event to stay in the event store forever.

  > #### Tip {: .tip}
  >
  > Use the `Solvent.Subscriber` module to make a subscriber that automatically acknowledges events,
  > along with lots of other nice features.

  You can also create a `Solvent.Subscription` struct yourself, and pass it to `subscribe/1`.
  """
  def subscribe(sink, opts \\ [])

  def subscribe(module, opts) when is_atom(module) and is_list(opts) do
    subscription = apply(module, :subscription, [opts])

    subscribe(subscription)
  end

  def subscribe(sink, opts) do
    sub = Subscription.new(sink, opts)

    subscribe(sub)
  end

  @doc """
  Remove a subscriber.
  """
  def unsubscribe(id) do
    Solvent.SubscriberStore.delete(id)
  end

  @doc """
  Publish an event to the event bus, triggering all subscriber functions.

  Only a type (AKA "topic") is required. All other fields can be supplied using the options.
  See `Solvent.Event` for details on what that struct contains.
  All values given as options are inserted into the event struct.

  ID values are version 7 UUIDs by default, and you don't need to provide them.

  I would recommend using the CloudEvents format for your event's `type` field, which starts with a reversed DNS name, and is dot-separated.
  This will help avoid collisions with events from other applications.

      Solvent.publish("io.github.cantido.myevent.published")
      {:ok, "0b06bdb7-06a7-4df9-a825-1fd225ceea43"}

  It is also recommended to provide a `source` option to identify your application, especially if your subscription did not specify a `source`.
  Otherwise, all events will be published with a default source, and subscriptions will be triggered for every event traveling through Solvent (including events from other applications).
  A good `source` value is either a fully-qualified domain name, or a UUID in URN format.

      iex> Solvent.publish(
      ...>   "io.github.cantido.documentation.read",
      ...>   id: "read-docs-id",
      ...>   source: "myapp"
      ...> )
      {:ok, {"myapp", "read-docs-id"}}

  You can also build an event yourself with `Solvent.Event.new/1` and publish it with this function.
  """
  def publish(event, opts \\ [])

  def publish(type, opts) when is_binary(type) do
    event = Solvent.Event.new(type, opts)
    publish(event, opts)
  end

  def publish(event, _opts) do
    Task.Supervisor.start_child(Solvent.TaskSupervisor, fn ->
      subscribers = Solvent.SubscriberStore.listeners_for(event)
      subscriber_ids = Enum.map(subscribers, &elem(&1, 1)) |> Enum.uniq()

      Logger.debug("Publishing event #{event.id}, (#{event.type}). Subscribers are: #{inspect subscriber_ids, pretty: true}")

      :telemetry.execute(
        [:solvent, :event, :published],
        %{},
        %{event_source: event.source, event_id: event.id, event_type: event.type, subscriber_count: Enum.count(subscribers)}
      )

      if Enum.count(subscribers) > 0 do
          :ok = Solvent.EventStore.insert(event, subscriber_ids)

        notifier_fun = fn {subscriber_id, subscription} ->
          Task.Supervisor.start_child(Solvent.TaskSupervisor, fn ->
            :telemetry.span(
              [:solvent, :subscriber, :processing],
              %{subscriber_id: subscriber_id, event_source: event.source, event_id: event.id, event_type: event.type},
              fn ->
                Logger.metadata(
                  solvent_subscriber_id: subscriber_id,
                  solvent_event_source: event.source,
                  solvent_event_id: event.id,
                  solvent_event_type: event.type
                )
                Solvent.Sink.deliver(subscription.sink, event)
                {:ok, %{}}
              end
            )

            :ok
          end)
        end

        Task.Supervisor.async_stream(Solvent.TaskSupervisor, subscribers, notifier_fun,
          timeout: :infinity
        )
        |> Stream.run()
      else
        Logger.warn("No subscribers matched event type #{event.type}. Solvent will not insert the event into the event store.")
      end
    end)

    {:ok, {event.source, event.id}}
  end

  def build_filters(filters) when is_list(filters) do
    Enum.map(filters, &build_filter/1)
  end

  defp build_filter({:exact, props}), do: %Solvent.Filter.Exact{properties: props}
  defp build_filter({:prefix, props}), do: %Solvent.Filter.Prefix{properties: props}
  defp build_filter({:suffix, props}), do: %Solvent.Filter.Suffix{properties: props}

  defp build_filter({:any, subs}) do
    %Solvent.Filter.Any{subfilters: build_filters(subs)}
  end

  defp build_filter({:all, subs}) do
    %Solvent.Filter.All{subfilters: build_filters(subs)}
  end

  defp build_filter({:not, subfilter}) do
    %Solvent.Filter.Not{subfilter: build_filter(subfilter)}
  end
end