Skip to main content

lib/outbox.ex

defmodule Outbox do
  @moduledoc """
  Public facade for the Outbox transactional event bus.

  Producers call `publish/2` from inside their domain transactions to
  emit events that subscribers (registered in application config) react
  to via `Outbox.SubscriberJob` after the dispatcher fans them out.

  See `Outbox.Subscriber` for the subscriber contract and the README for
  the full architecture overview.
  """

  alias Outbox.OutboxEvent

  @typedoc "Event name. Convention: `<entity>.<past-tense-verb>` (lowercase, dot-separated)."
  @type name :: String.t()

  @typedoc "JSON-serializable payload. Atom keys are converted to strings on insert."
  @type payload :: map()

  @doc """
  Publish a domain event.

  This function performs a single `Repo.insert/1` and **does not open
  its own transaction**. The caller is responsible for wrapping the
  domain write and the `publish/2` call in `Repo.transaction/1` if
  atomicity-with-the-domain-write is required (it almost always is).

  Atom keys in the payload are converted to strings so subscribers
  always see string keys (consistent with what JSONB round-trips
  produce).

  ## Options

    * `:context` — opaque map merged over the ambient process context
      (see `put_context/1`).
    * `:sample` — float in `0.0..1.0`. The event is kept with this
      probability; when dropped, nothing is persisted or broadcast and
      `{:ok, :sampled_out}` is returned. For high-volume telemetry.
    * `:transient` — when `true`, the event is **not persisted**: it is
      broadcast to the configured PubSub (if any) and `{:ok, :transient}`
      is returned. No row, no Oban fan-out, no delivery guarantee — for
      loss-tolerant signals only, never an audit system of record.

  If a validator is registered for this event name (see
  `Outbox.Config.schemas/0`), the stringified payload is validated first;
  a failure returns `{:error, {:schema, reason}}` and nothing is persisted
  or broadcast.

  ## Examples

      Repo.transaction(fn ->
        {:ok, product} = Repo.insert(changeset)
        {:ok, _event} = Outbox.publish("product.created", %{"id" => product.id})
        product
      end)
  """
  @context_key {__MODULE__, :context}

  @spec publish(name(), payload(), keyword()) ::
          {:ok, OutboxEvent.t()}
          | {:ok, :sampled_out | :transient}
          | {:error, Ecto.Changeset.t()}
          | {:error, {:schema, term()}}
  def publish(name, payload, opts \\ []) when is_binary(name) and is_map(payload) do
    context = effective_context(opts)
    string_payload = stringify_keys(payload)

    with :ok <- validate_schema(name, string_payload) do
      cond do
        sampled_out?(opts) ->
          {:ok, :sampled_out}

        Keyword.get(opts, :transient, false) ->
          broadcast_transient(name, string_payload, context)
          {:ok, :transient}

        true ->
          repo = Outbox.Config.repo()

          %OutboxEvent{}
          |> OutboxEvent.changeset(%{name: name, payload: string_payload, context: context})
          |> repo.insert()
      end
    end
  end

  defp validate_schema(name, payload) do
    case Map.get(Outbox.Config.schemas(), name) do
      nil -> :ok
      fun when is_function(fun, 1) -> wrap_schema_result(fun.(payload))
      {mod, f} -> wrap_schema_result(apply(mod, f, [payload]))
    end
  end

  defp wrap_schema_result(:ok), do: :ok
  defp wrap_schema_result({:ok, _}), do: :ok
  defp wrap_schema_result({:error, reason}), do: {:error, {:schema, reason}}

  defp sampled_out?(opts) do
    case Keyword.get(opts, :sample) do
      nil -> false
      rate when is_number(rate) -> :rand.uniform() > rate
    end
  end

  defp broadcast_transient(name, payload, context) do
    case Outbox.Config.pubsub() do
      nil ->
        :ok

      pubsub ->
        Phoenix.PubSub.broadcast(
          pubsub,
          Outbox.Config.pubsub_topic(),
          {:domain_event, name, payload,
           %{event_id: nil, inserted_at: DateTime.utc_now(), context: context, transient: true}}
        )
    end
  end

  @doc """
  Merge `map` into the ambient context for the current process.

  The ambient context is attached to every event published from this
  process (unless a per-call `:context` overrides a key). Set it once
  per request (e.g. in a plug or LiveView `on_mount`) so callers don't
  thread context through every `publish/3`. The library treats the map
  as opaque — hosts decide its keys (e.g. `actor_id`, `actor_type`).
  Atom keys are stringified.
  """
  @spec put_context(map()) :: :ok
  def put_context(map) when is_map(map) do
    merged = Map.merge(get_context(), stringify_keys(map))
    Process.put(@context_key, merged)
    :ok
  end

  @doc "Returns the current process's ambient context map (defaults to `%{}`)."
  @spec get_context() :: map()
  def get_context, do: Process.get(@context_key, %{})

  @doc "Clears the current process's ambient context."
  @spec clear_context() :: :ok
  def clear_context do
    Process.delete(@context_key)
    :ok
  end

  defp effective_context(opts) do
    per_call = opts |> Keyword.get(:context, %{}) |> stringify_keys()
    Map.merge(get_context(), per_call)
  end

  @doc "Returns a Supervisor child_spec for `Outbox.Application`."
  @spec child_spec(keyword()) :: Supervisor.child_spec()
  def child_spec(opts \\ []) do
    %{
      id: __MODULE__,
      start: {Outbox.Application, :start_link, [opts]},
      type: :supervisor
    }
  end

  defp stringify_keys(map) when is_map(map) do
    for {k, v} <- map, into: %{} do
      key = if is_atom(k), do: Atom.to_string(k), else: k
      {key, stringify_keys(v)}
    end
  end

  defp stringify_keys(list) when is_list(list), do: Enum.map(list, &stringify_keys/1)
  defp stringify_keys(other), do: other
end