# lib/phoenix_micro/outbox.ex

defmodule PhoenixMicro.Outbox do
  @moduledoc """
  Transactional outbox pattern for guaranteed message delivery.

  ## The problem this solves

  A naive `Repo.insert(order) + PhoenixMicro.publish(event)` has a race:

  ```
  1. Repo.insert(order)   ← succeeds
  2. publish(event)       ← crashes / network error
     └── event is LOST
  ```

  The outbox pattern eliminates the race by writing the event *inside the
  same database transaction* as the business record, then relaying it to
  the broker in a separate background process:

  ```
  Transaction:
    1. Repo.insert(order)
    2. Outbox.enqueue(event)   ← writes to outbox_messages table
    COMMIT ──────────────────── both succeed or both roll back

  Relay (background):
    3. Poll outbox_messages WHERE relayed_at IS NULL
    4. publish(event) to broker
    5. UPDATE outbox_messages SET relayed_at = now()
  ```

  ## Setup

  ### 1. Generate and run the migration

      mix ecto.gen.migration create_outbox_messages

  Add to the migration:

      def change do
        create table(:outbox_messages, primary_key: false) do
          add :id,           :uuid,    primary_key: true, default: fragment("gen_random_uuid()")
          add :topic,        :string,  null: false
          add :payload,      :map,     null: false
          add :headers,      :map,     default: %{}
          add :attempt,      :integer, default: 1
          add :relayed_at,   :utc_datetime_usec
          add :failed_at,    :utc_datetime_usec
          add :last_error,   :string
          timestamps(type: :utc_datetime_usec)
        end

        create index(:outbox_messages, [:relayed_at, :inserted_at])
        create index(:outbox_messages, [:failed_at])
      end

  ### 2. Configure

      config :phoenix_micro,
        outbox: [
          repo: MyApp.Repo,
          poll_interval_ms: 1_000,
          batch_size: 100,
          max_attempts: 5
        ]

  ### 3. Add the relay to your supervision tree

      children = [
        MyApp.Repo,
        PhoenixMicro.Outbox.Relay
      ]

  ### 4. Use inside transactions

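      Repo.transaction(fn ->
        order = Repo.insert!(Order.changeset(%Order{}, params))
        # Use the raising variant: `enqueue/3` returns `{:error, reason}` instead
        # of raising, which would let the transaction commit without the event.
        Outbox.enqueue!("orders.placed", %{order_id: order.id})
      end)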

  ## Guarantees

  - **At-least-once delivery** — if the relay crashes mid-flight, the
    message is still in the database and will be retried.
  - **No phantom events** — if the outer transaction rolls back, the
    outbox row rolls back too.
  - **Idempotent** — the relay sets `relayed_at` only after a successful
    broker publish, so duplicate delivery is bounded.

  ## Deduplication

  Consumers should handle duplicates using `PhoenixMicro.Middleware.Idempotency`
  or their own deduplication logic. The message `id` is stable across retries.
  """

  alias PhoenixMicro.{Config, Message}

  @doc """
  Writes a message to the outbox table.

  Call this inside the same `Repo.transaction/1` block as the business write,
  so the outbox row commits or rolls back together with it.

  ## Options

    * `:id` - id for the outbox row (defaults to `PhoenixMicro.Message.generate_id/0`)
    * `:headers` - headers stored with the message (defaults to `%{}`)

  Returns `{:ok, outbox_record}` when the row is inserted and
  `{:error, changeset}` when the repo rejects it. Anything raised while
  enqueueing (for example a missing `:repo` setting) is rescued and returned
  as `{:error, exception}`.
  """
  @spec enqueue(String.t(), term(), keyword()) :: {:ok, map()} | {:error, term()}
  def enqueue(topic, payload, opts \\ []) do
    repo = outbox_repo!()
    message_id = Keyword.get(opts, :id, Message.generate_id())
    headers = Keyword.get(opts, :headers, %{})

    # NOTE(review): the default schema, `PhoenixMicro.Outbox.Message`, is defined
    # in this file with `defstruct` rather than `use Ecto.Schema`. Confirm that
    # `repo.insert/2` accepts it: an exception raised there is rescued below and
    # only surfaces as `{:error, exception}`.
    row =
      struct!(outbox_schema(),
        id: message_id,
        topic: topic,
        payload: payload,
        headers: headers,
        attempt: 1,
        relayed_at: nil,
        failed_at: nil,
        last_error: nil
      )

    case repo.insert(row, returning: true) do
      {:ok, inserted} -> {:ok, inserted}
      {:error, changeset} -> {:error, changeset}
    end
  rescue
    exception -> {:error, exception}
  end

  @doc """
  Same as `enqueue/3`, but returns the outbox record directly and raises a
  `RuntimeError` when the message cannot be enqueued.
  """
  @spec enqueue!(String.t(), term(), keyword()) :: map()
  def enqueue!(topic, payload, opts \\ []) do
    topic
    |> enqueue(payload, opts)
    |> unwrap_enqueue!()
  end

  # Unwraps the tagged result of `enqueue/3`, raising on the error tuple.
  defp unwrap_enqueue!({:ok, row}), do: row

  defp unwrap_enqueue!({:error, reason}) do
    raise "Outbox.enqueue! failed: #{inspect(reason)}"
  end

  # Returns the repo module configured under `outbox: [repo: ...]`.
  # Raises a `RuntimeError` when the setting is missing (or set to `nil`/`false`).
  @doc false
  def outbox_repo! do
    case Keyword.get(outbox_config(), :repo) do
      missing when missing in [nil, false] ->
        raise "PhoenixMicro.Outbox requires `config :phoenix_micro, outbox: [repo: MyApp.Repo]`"

      repo ->
        repo
    end
  end

  # Returns the `:outbox` configuration, or an empty list when none is set.
  @doc false
  def outbox_config do
    Config.get(:outbox, [])
  end

  # Returns the module used for outbox rows: the configured `:schema`, falling
  # back to `PhoenixMicro.Outbox.Message`.
  @doc false
  def outbox_schema do
    outbox_config()
    |> Keyword.get(:schema, PhoenixMicro.Outbox.Message)
  end
end

# ---------------------------------------------------------------------------
# Default struct for outbox_messages rows — always compiled; Ecto functions
# are only looked up at runtime
# ---------------------------------------------------------------------------

defmodule PhoenixMicro.Outbox.Message do
  @moduledoc """
  Default struct for rows of the `outbox_messages` table.

  This is a plain struct rather than a module that calls `use Ecto.Schema`,
  so it compiles whether or not Ecto is a dependency. The transactional
  outbox feature itself needs `ecto` and `ecto_sql` in your application's
  deps:

      {:ecto_sql, "~> 3.11"}
      {:postgrex, ">= 0.0.0"}

  You can replace this schema by setting:

      config :phoenix_micro, outbox: [schema: MyApp.OutboxMessage]
  """

  # `use Ecto.Schema` is a macro and would be expanded at compile time even
  # when Ecto is not installed, so this module only defines a struct that
  # mirrors the table columns. Ecto functions are reached at runtime through
  # `apply/3`.
  defstruct [
    :id,
    :topic,
    :payload,
    :headers,
    :attempt,
    :relayed_at,
    :failed_at,
    :last_error,
    :inserted_at,
    :updated_at
  ]

  @type t :: %__MODULE__{}

  # Field types for the schemaless changeset built by `changeset/2`. A plain
  # struct has no `__changeset__/0`, so the types must be given to `cast`
  # explicitly as `{data, types}`.
  @changeset_types %{
    topic: :string,
    payload: :map,
    headers: :map,
    attempt: :integer,
    relayed_at: :utc_datetime_usec,
    failed_at: :utc_datetime_usec,
    last_error: :string
  }

  @permitted_fields [:topic, :payload, :headers, :attempt, :relayed_at, :failed_at, :last_error]

  @doc "Returns true when Ecto is available in the host application."
  @spec ecto_available?() :: boolean()
  def ecto_available?, do: Code.ensure_loaded?(Ecto.Schema)

  # Builds a changeset that casts the permitted fields from `attrs` and
  # requires `:topic` and `:payload`. Returns `{:error, :ecto_not_available}`
  # when `Ecto.Changeset` cannot be loaded.
  @doc false
  @spec changeset(struct(), map()) :: struct() | {:error, :ecto_not_available}
  def changeset(record, attrs) do
    if Code.ensure_loaded?(Ecto.Changeset) do
      record
      |> ecto_cast(attrs)
      |> ecto_validate_required([:topic, :payload])
    else
      {:error, :ecto_not_available}
    end
  end

  # This module's own struct is not an Ecto schema, so it is cast as a
  # schemaless changeset. Passing the bare struct makes `Ecto.Changeset.cast`
  # call the undefined `__changeset__/0` and raise.
  defp ecto_cast(%__MODULE__{} = record, attrs) do
    apply(Ecto.Changeset, :cast, [{record, @changeset_types}, attrs, @permitted_fields])
  end

  # Anything else (an existing changeset, a real schema struct) is passed to
  # `cast` unchanged.
  defp ecto_cast(record, attrs) do
    apply(Ecto.Changeset, :cast, [record, attrs, @permitted_fields])
  end

  defp ecto_validate_required(changeset, fields) do
    apply(Ecto.Changeset, :validate_required, [changeset, fields])
  end

  @doc """
  Returns the source table name, mirroring the `__schema__(:source)` call an
  Ecto schema answers.
  Used by the Relay when building queries.
  """
  @spec __schema__(:source) :: String.t()
  def __schema__(:source), do: "outbox_messages"
end

# ---------------------------------------------------------------------------
# Relay — background process that polls and publishes
# ---------------------------------------------------------------------------

defmodule PhoenixMicro.Outbox.Relay do
  @moduledoc """
  GenServer that polls the `outbox_messages` table and relays undelivered
  messages to the configured transport.

  Requires `ecto_sql` in your application's deps:

      {:ecto_sql, "~> 3.11"}

  ## Behaviour

  Every `:poll_interval_ms` (default 1000ms) the relay:

  1. Selects up to `:batch_size` rows where `relayed_at IS NULL`
     and `failed_at IS NULL`, ordered by `inserted_at ASC`.
  2. For each row, calls `PhoenixMicro.publish_sync/3`.
  3. On success: marks `relayed_at = now()`.
  4. On failure: increments `attempt`, sets `last_error`.
     Once `attempt > max_attempts`, sets `failed_at = now()` and gives up.
  """

  use GenServer

  require Logger

  alias PhoenixMicro.{Outbox, Telemetry}

  @default_poll_interval_ms 1_000
  @default_batch_size 100
  @default_max_attempts 5

  defstruct [:repo, :poll_interval_ms, :batch_size, :max_attempts, :schema]

  # ---------------------------------------------------------------------------
  # Public API
  # ---------------------------------------------------------------------------

  @doc """
  Starts the relay, registered under this module's name.

  `opts` are merged over the `:outbox` application config in `init/1`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @doc """
  Supervisor child specification: a permanent worker, identified by this
  module, started through `start_link/1` with `opts`.
  """
  def child_spec(opts) do
    start = {__MODULE__, :start_link, [opts]}

    %{id: __MODULE__, start: start, restart: :permanent, type: :worker}
  end

  # ---------------------------------------------------------------------------
  # GenServer
  # ---------------------------------------------------------------------------

  # Builds the relay state from the `:outbox` config merged with the start
  # options and arms the first poll. Returns `:ignore` (after logging a
  # warning) when `Ecto.Query` cannot be loaded, and raises when no `:repo`
  # is configured.
  @impl GenServer
  def init(opts) do
    case Code.ensure_loaded?(Ecto.Query) do
      false ->
        Logger.warning(
          "[Outbox.Relay] ecto_sql not loaded — relay disabled. " <>
            "Add {:ecto_sql, \"~> 3.11\"} to your application's deps."
        )

        :ignore

      true ->
        # Start options take precedence over application config.
        settings = Keyword.merge(Outbox.outbox_config(), opts)

        repo =
          Keyword.get(settings, :repo) ||
            raise "PhoenixMicro.Outbox.Relay requires `config :phoenix_micro, outbox: [repo: MyApp.Repo]`"

        interval = Keyword.get(settings, :poll_interval_ms, @default_poll_interval_ms)

        state = %__MODULE__{
          repo: repo,
          schema: Keyword.get(settings, :schema, PhoenixMicro.Outbox.Message),
          poll_interval_ms: interval,
          batch_size: Keyword.get(settings, :batch_size, @default_batch_size),
          max_attempts: Keyword.get(settings, :max_attempts, @default_max_attempts)
        }

        schedule_poll(interval)
        Logger.info("[Outbox.Relay] Started, polling every #{interval}ms")
        {:ok, state}
    end
  end

  # Poll tick: relay whatever is pending, then arm the next tick. The next
  # poll is scheduled only after this batch has been processed.
  @impl GenServer
  def handle_info(:poll, state) do
    _ = relay_pending(state)
    _timer = schedule_poll(state.poll_interval_ms)

    {:noreply, state}
  end

  # Every other message is ignored and the state is left untouched.
  def handle_info(_other, state) do
    {:noreply, state}
  end

  # ---------------------------------------------------------------------------
  # Relay logic — all Ecto calls are via apply/3 to avoid compile-time errors
  # ---------------------------------------------------------------------------

  # Fetches one batch of pending rows and relays them one at a time, in the
  # order they were fetched. Does nothing when the batch is empty.
  defp relay_pending(state) do
    case fetch_pending(state) do
      [] ->
        :ok

      rows ->
        Logger.debug("[Outbox.Relay] Relaying #{length(rows)} pending messages")
        Enum.each(rows, fn row -> relay_one(row, state) end)
    end
  end

  # Loads up to `state.batch_size` rows that are neither relayed nor failed,
  # oldest first, and converts them to `state.schema` structs.
  #
  # Returns `[]` when the query (or the conversion) raises. The failure is
  # logged so a database problem cannot pass for an empty outbox.
  defp fetch_pending(state) do
    # All Ecto.Query macros (from, where, order_by, limit, field) expand at
    # compile time and cannot be used here. Raw SQL is the only approach that
    # is truly runtime-only and works regardless of whether Ecto is a dep.
    # The table name comes from the configured schema module, not from user
    # input; the batch size is passed as a bound parameter.
    table = state.schema.__schema__(:source)

    sql = """
    SELECT * FROM #{table}
    WHERE relayed_at IS NULL AND failed_at IS NULL
    ORDER BY inserted_at ASC
    LIMIT $1
    """

    state.repo
    |> apply(:query!, [sql, [state.batch_size]])
    |> rows_to_structs(state.schema)
  rescue
    e ->
      Logger.error(
        "[Outbox.Relay] Failed to fetch pending messages: " <> Exception.message(e)
      )

      []
  end

  # Converts a raw query result (`columns` as strings, `rows` as lists of
  # values in column order) into a list of `schema` structs.
  #
  # Column names are resolved against the struct's own fields; columns with
  # no matching field are skipped. Previously an unrecognised column name
  # raised in `String.to_existing_atom/1` and the rescue discarded the whole
  # batch. Returns `[]` (and logs) if the conversion raises.
  defp rows_to_structs(%{columns: cols, rows: rows}, schema) do
    fields_by_name =
      schema.__struct__()
      |> Map.from_struct()
      |> Map.new(fn {field, _default} -> {Atom.to_string(field), field} end)

    # `nil` marks a column that has no corresponding struct field.
    keys = Enum.map(cols, &Map.get(fields_by_name, &1))

    Enum.map(rows, fn row ->
      attrs = for {key, value} <- Enum.zip(keys, row), key != nil, do: {key, value}
      struct(schema, attrs)
    end)
  rescue
    e ->
      Logger.error(
        "[Outbox.Relay] Failed to convert outbox rows to structs: " <> Exception.message(e)
      )

      []
  end

  # Publishes a single outbox row and records the outcome.
  #
  # On `:ok` the row is marked as relayed, a publish telemetry event is
  # emitted and the relay is logged. On `{:error, reason}` the failure is
  # handed to `handle_relay_failure/3`.
  defp relay_one(row, state) do
    started_at = System.monotonic_time()

    publish_opts = [
      headers: row.headers || %{},
      # NOTE(review): rows come from a raw SQL query, so `row.id` is whatever
      # the database driver returns for the id column — confirm it stringifies
      # to the same id format that was passed to `Outbox.enqueue/3`.
      correlation_id: to_string(row.id)
    ]

    case PhoenixMicro.publish_sync(row.topic, row.payload, publish_opts) do
      :ok ->
        # Measure the publish only, not the bookkeeping UPDATE that follows.
        duration = System.monotonic_time() - started_at
        mark_relayed(row, state)

        Telemetry.message_published(row.topic, %{
          transport: :outbox,
          duration: duration,
          outbox_id: row.id
        })

        # Id and topic were previously concatenated with no separator.
        Logger.debug("[Outbox.Relay] Relayed #{row.id} -> #{row.topic}")

      {:error, reason} ->
        handle_relay_failure(row, reason, state)
    end
  end

  # Stamps `relayed_at` on the row so later polls skip it.
  #
  # A failure here is deliberately not fatal, but it is logged: the row still
  # has `relayed_at IS NULL`, so it is selected again and published again.
  defp mark_relayed(row, state) do
    table = state.schema.__schema__(:source)
    sql = "UPDATE #{table} SET relayed_at = $1 WHERE id = $2"
    apply(state.repo, :query!, [sql, [DateTime.utc_now(), row.id]])
  rescue
    e ->
      Logger.error(
        "[Outbox.Relay] Published #{inspect(Map.get(row, :id))} but could not mark it " <>
          "relayed; it will be published again: " <> Exception.message(e)
      )

      :ok
  end

  # Upper bound on the text stored in `last_error`. Ecto's `:string` migration
  # type defaults to a 255-character varchar, so an unbounded `inspect/1` of
  # the failure reason could make the UPDATE fail and the attempt counter
  # never advance.
  @max_last_error_length 255

  # Records a failed publish for `row`.
  #
  # Increments `attempt` and stores the (truncated) inspected reason in
  # `last_error`. When the incremented attempt exceeds `state.max_attempts`
  # the row is also stamped with `failed_at`, which removes it from later
  # polls, and a final failure telemetry event is emitted.
  #
  # A failure to record the outcome is logged and swallowed so one bad row
  # does not stop the rest of the batch.
  defp handle_relay_failure(row, reason, state) do
    # A NULL attempt column is treated as the first attempt.
    attempt = row.attempt || 1
    next_attempt = attempt + 1
    error_msg = inspect(reason)
    stored_error = truncate_error(error_msg)

    # The log keeps the full reason; only the stored copy is truncated.
    Logger.warning(
      "[Outbox.Relay] Failed to relay #{row.id} (attempt #{attempt}): #{error_msg}"
    )

    table = state.schema.__schema__(:source)

    if next_attempt > state.max_attempts do
      Logger.error("[Outbox.Relay] Giving up on #{row.id} after #{attempt} attempts")

      sql = "UPDATE #{table} SET failed_at = $1, last_error = $2, attempt = $3 WHERE id = $4"

      apply(state.repo, :query!, [
        sql,
        [DateTime.utc_now(), stored_error, next_attempt, row.id]
      ])

      Telemetry.message_failed(row.topic, %{reason: reason, final: true, outbox_id: row.id})
    else
      sql = "UPDATE #{table} SET attempt = $1, last_error = $2 WHERE id = $3"
      apply(state.repo, :query!, [sql, [next_attempt, stored_error, row.id]])
    end
  rescue
    e ->
      Logger.error(
        "[Outbox.Relay] Could not record relay failure for " <>
          "#{inspect(Map.get(row, :id))}: " <> Exception.message(e)
      )

      :ok
  end

  # Limits `message` to `@max_last_error_length` codepoints.
  defp truncate_error(message) when byte_size(message) <= @max_last_error_length, do: message

  defp truncate_error(message) do
    message
    |> String.codepoints()
    |> Enum.take(@max_last_error_length)
    |> Enum.join()
  end

  # Arms a timer that delivers `:poll` to this process after `interval`
  # milliseconds. Returns the timer reference.
  defp schedule_poll(interval), do: Process.send_after(self(), :poll, interval)
end