# lib/chimeway/dispatch/oban_worker.ex

if Code.ensure_loaded?(Oban) do
  defmodule Chimeway.Dispatch.UnhandledOutcomeError do
    @moduledoc """
    Loud-failure exception for the BL-02 fix (see Plan 14-10).

    `Chimeway.Dispatch.ObanWorker` raises this when `map_outcome_to_oban_return/4`
    is handed an (outcome, error_class, status) combination that none of the
    documented clauses match and the in-band convergence guard cannot legally
    fire either — the delivery is not in `:failed` status, or the job is not on
    its final attempt.

    Every context field is required when the exception is built, so the struct
    always carries what an operator needs to reproduce the scenario and extend
    either `Executor.classify/1` or the documented worker clauses.
    """

    # Context keys every caller must pass. They are fetched in this order, so the
    # first absent key is the one `Keyword.fetch!/2` reports.
    @context_keys [:delivery_id, :outcome, :error_class, :status, :attempt, :max_attempts]

    defexception [:message | @context_keys]

    @impl true
    def exception(opts) do
      context = Enum.map(@context_keys, &{&1, Keyword.fetch!(opts, &1)})

      struct!(__MODULE__, [{:message, describe(context)} | context])
    end

    # Renders the one-line summary stored in `:message`.
    defp describe(context) do
      "unhandled delivery outcome shape (BL-02): " <>
        "delivery_id=#{inspect(context[:delivery_id])} outcome=#{inspect(context[:outcome])} " <>
        "error_class=#{inspect(context[:error_class])} status=#{inspect(context[:status])} " <>
        "attempt=#{context[:attempt]}/#{context[:max_attempts]}"
    end
  end

  defmodule Chimeway.Dispatch.ObanWorker do
    @moduledoc """
    Oban worker that performs a single Chimeway delivery by delivery_id.

    Job args contain only `delivery_id` (UUID string). Full payload is never
    stored in Oban job args — the delivery row is the source of truth.

    ## Transactional enqueue

    Insert this worker's job inside the same `Ecto.Multi` as delivery row creation:

        alias Chimeway.Dispatch.ObanWorker

        Ecto.Multi.new()
        |> Ecto.Multi.insert(:delivery, delivery_changeset)
        |> Oban.insert(:job, ObanWorker.new(%{delivery_id: delivery.id}))
        |> Chimeway.Repo.transaction()

    Rolling back the `Ecto.Multi` also rolls back the job — no orphaned jobs.

    ## Idempotency and terminal-state short-circuit

    The worker checks delivery terminal states (via `Chimeway.Deliveries.terminal_states/0`)
    on every execution. A delivery already in `:succeeded`, `:suppressed`, or `:cancelled`
    returns `:ok` immediately with no adapter call and no new attempt row. A delivery
    whose `orchestration_state` is not `:ready` is skipped the same way.

    ## Phase 14 retry contract (REL-02 / REL-03)

    OSS Oban 2.21.1 has no exhausted callback. This worker uses an in-band
    `attempt == max_attempts` guard inside `perform/1` to know when it has reached
    the final retry — that is the moment the durable `:cancelled retries_exhausted`
    state is written via `Deliveries.exhaust_delivery/1`.

    Return-value contract:

    - Successful delivery -> `:ok`.
    - Permanent / bounced failure -> `:ok` (record_attempt already converged the
      delivery to `:cancelled` with the appropriate suppression_reason; Oban does
      not retry).
    - Transient failure with retries remaining (`attempt < max_attempts`) ->
      `{:error, reason}`; Oban schedules a retry under its default exponential-
      with-jitter `c:Oban.Worker.backoff/1`.
    - Transient failure on the final attempt (`attempt == max_attempts`) ->
      `Deliveries.exhaust_delivery/1` writes the `:cancelled retries_exhausted`
      terminal state, then this function returns `:ok` so the Oban job is marked
      `:completed` instead of `:discarded` (RESEARCH Pitfall 1: keeps operator
      telemetry dashboards clean — the durable explanation lives on the delivery
      row, not on the Oban job).
    """

    use Oban.Worker,
      queue: :chimeway_delivery,
      max_attempts: 5,
      unique: [fields: [:args], keys: [:delivery_id], period: 60]

    alias Chimeway.{Deliveries, Delivery, DeliveryAttempt, Policy}
    alias Chimeway.Dispatch.Executor
    alias Chimeway.Telemetry

    require Logger

    # Oban entry point. Loads the delivery named by the job's "delivery_id" arg
    # (raising via `get_delivery!/1` semantics if that call raises) and then:
    #
    #   * returns :ok without further work when the delivery status is one of
    #     `Deliveries.terminal_states/0`, or its orchestration_state is not :ready;
    #   * otherwise runs `handle_delivery/3` inside a [:dispatch, :perform]
    #     telemetry span and returns whatever `Telemetry.span/3` returns
    #     (presumably the first element of the callback's tuple — confirm against
    #     `Chimeway.Telemetry`).
    @impl Oban.Worker
    def perform(%Oban.Job{args: %{"delivery_id" => delivery_id}} = job) do
      %Oban.Job{attempt: attempt, max_attempts: max_attempts} = job
      delivery = Deliveries.get_delivery!(delivery_id)

      cond do
        delivery.status in Deliveries.terminal_states() ->
          :ok

        delivery.orchestration_state != :ready ->
          :ok

        true ->
          span_meta =
            Telemetry.safe_meta(%{
              delivery_id: delivery.id,
              channel: delivery.channel,
              # metadata may be nil on the row, hence the empty-map fallback.
              notification_key: Map.get(delivery.metadata || %{}, "notification_key"),
              attempt: attempt,
              max_attempts: max_attempts
            })

          Telemetry.span([:dispatch, :perform], span_meta, fn ->
            {handle_delivery(delivery, attempt, max_attempts), %{}}
          end)
      end
    end

    # Asks `Policy.evaluate/2` what to do with the delivery and routes on the
    # verdict:
    #
    #   {:ok, :proceed}     -> dispatch now
    #   {:defer, decision}  -> apply the planning decision, then continue
    #   {:suppress, reason} -> record the suppression at the :perform checkpoint
    #
    # Any other verdict shape raises CaseClauseError.
    defp handle_delivery(%Delivery{} = delivery, attempt, max_attempts) do
      verdict = Policy.evaluate(delivery, check_read_state: delivery.delay_fallback)

      case verdict do
        {:ok, :proceed} ->
          do_dispatch(delivery, attempt, max_attempts)

        {:defer, decision} ->
          handle_deferred_delivery(delivery, decision, attempt, max_attempts)

        {:suppress, reason} ->
          suppress_at_perform(delivery, reason)
      end
    end

    # Persists the suppression; :ok on success, the {:error, _} tuple unchanged
    # on failure so Oban sees the job as failed.
    defp suppress_at_perform(delivery, reason) do
      case Deliveries.suppress_delivery(delivery, reason, checkpoint: :perform) do
        {:ok, _suppressed} -> :ok
        {:error, _details} = failure -> failure
      end
    end

    # Applies a deferral decision to the delivery and continues from the updated
    # row. Anything other than {:ok, delivery} from
    # `Deliveries.apply_planning_decision/2` is returned to the caller untouched.
    defp handle_deferred_delivery(%Delivery{} = delivery, decision, attempt, max_attempts) do
      case Deliveries.apply_planning_decision(delivery, decision) do
        {:ok, planned} -> continue_planned(planned, attempt, max_attempts)
        not_applied -> not_applied
      end
    end

    # Chooses the next step for a freshly planned delivery:
    #   * orchestration_state :ready -> dispatch it in this job;
    #   * already terminal           -> nothing left to do;
    #   * otherwise                  -> hand it to the configured dispatcher.
    defp continue_planned(planned, attempt, max_attempts) do
      case planned do
        %Delivery{orchestration_state: :ready} ->
          do_dispatch(planned, attempt, max_attempts)

        %Delivery{status: status} ->
          if status in Deliveries.terminal_states(), do: :ok, else: hand_off(planned)
      end
    end

    # Passes the delivery to the configured dispatcher. Both {:ok, _} and
    # {:skip, _} count as a completed job; an error is tagged so the failure
    # source is visible in the Oban job's error.
    defp hand_off(planned) do
      handoff_opts = [pre_planned: true, post_commit: true]

      case configured_dispatcher().dispatch_delivery(planned, handoff_opts) do
        {tag, _delivery} when tag in [:ok, :skip] -> :ok
        {:error, reason} -> {:error, {:deferred_handoff_failed, reason}}
      end
    end

    # Re-reads the delivery by id and, unless that row is already terminal, runs
    # it through the executor. Only the id of the passed struct is used; the
    # status check is made against the re-read row.
    defp do_dispatch(%Delivery{id: id}, attempt, max_attempts) do
      current = Deliveries.get_delivery!(id)

      if current.status not in Deliveries.terminal_states() do
        execute_and_translate(current, attempt, max_attempts)
      else
        :ok
      end
    end

    # Runs the executor and converts its result into an Oban return value:
    #   * {:ok, %{attempt: _, delivery: _}} -> mapped via map_outcome_to_oban_return/4
    #   * {:error, step, reason, _changes}  -> {:error, {step, reason}}
    #   * {:error, _reason}                 -> returned unchanged
    defp execute_and_translate(current, attempt, max_attempts) do
      case Executor.run_delivery(current) do
        {:error, _reason} = failure ->
          failure

        {:error, step, reason, _changes} ->
          {:error, {step, reason}}

        {:ok, %{attempt: %DeliveryAttempt{} = recorded, delivery: %Delivery{} = updated}} ->
          map_outcome_to_oban_return(recorded, updated, attempt, max_attempts)
      end
    end

    # Translates the recorded attempt plus the post-attempt delivery row into the
    # value perform/1 hands back to Oban. Clauses are tried top to bottom:
    #
    #   1. outcome :succeeded                                  -> :ok
    #   2. "permanent"/"bounced" with delivery :cancelled      -> :ok
    #   3. "temporary", delivery :failed, attempt >= max       -> exhaust, then :ok
    #   4. "temporary", delivery :failed, retries remaining    -> {:error, reason}
    #   5. any other shape, delivery :failed, attempt >= max   -> exhaust, then :ok
    #   6. any other shape                                     -> log + raise (BL-02)
    #
    # Clauses 5 and 6 are the defensive catch-all of the BL-02 fix: an unexpected
    # shape is a real bug (Executor.classify/1 grew a return shape the clauses
    # above do not cover, or an adapter path bypassed classification), so it is
    # either converged on the final attempt or surfaced loudly instead of
    # silently leaking a non-terminal delivery row.

    # 1. Delivered.
    defp map_outcome_to_oban_return(
           %DeliveryAttempt{outcome: :succeeded},
           _delivery,
           _attempt,
           _max_attempts
         ) do
      :ok
    end

    # 2. record_attempt already wrote :cancelled with suppression_reason
    #    "permanent_failure" or "bounced"; completing the job prevents a retry.
    defp map_outcome_to_oban_return(
           %DeliveryAttempt{error_class: error_class},
           %Delivery{status: :cancelled},
           _attempt,
           _max_attempts
         )
         when error_class == "permanent" or error_class == "bounced" do
      :ok
    end

    # 3. In-band exhaustion guard (RESEARCH Pattern 2 / Pitfall 1): final attempt
    #    of a transient failure.
    defp map_outcome_to_oban_return(
           %DeliveryAttempt{error_class: "temporary"} = recorded,
           %Delivery{status: :failed} = delivery,
           attempt,
           max_attempts
         )
         when attempt >= max_attempts do
      exhaust_then_complete(delivery, error_reason_from_attempt(recorded))
    end

    # 4. Transient failure with budget left: retry under the documented Oban
    #    budget; the default backoff curve applies.
    defp map_outcome_to_oban_return(
           %DeliveryAttempt{error_class: "temporary"} = recorded,
           %Delivery{status: :failed},
           _attempt,
           _max_attempts
         ) do
      {:error, error_reason_from_attempt(recorded)}
    end

    # 5. Convergence branch of the catch-all: mirror the temporary/exhaustion
    #    path so the delivery still lands in a durable terminal state.
    defp map_outcome_to_oban_return(
           %DeliveryAttempt{outcome: outcome, error_class: error_class},
           %Delivery{status: :failed} = delivery,
           attempt,
           max_attempts
         )
         when attempt >= max_attempts do
      exhaust_then_complete(delivery, {:unhandled_outcome, outcome, error_class, :failed})
    end

    # 6. Loud-failure branch of the catch-all: the convergence helper cannot
    #    legally write from this state, or retries remain but the shape is wrong.
    #    Reaching this clause is itself the bug, so log and raise.
    defp map_outcome_to_oban_return(
           %DeliveryAttempt{outcome: outcome, error_class: error_class},
           %Delivery{id: delivery_id, status: status},
           attempt,
           max_attempts
         ) do
      Logger.error(
        "unhandled delivery outcome (BL-02): delivery_id=#{inspect(delivery_id)} " <>
          "outcome=#{inspect(outcome)} error_class=#{inspect(error_class)} " <>
          "status=#{inspect(status)} attempt=#{attempt}/#{max_attempts}"
      )

      raise Chimeway.Dispatch.UnhandledOutcomeError,
        delivery_id: delivery_id,
        outcome: outcome,
        error_class: error_class,
        status: status,
        attempt: attempt,
        max_attempts: max_attempts
    end

    # Writes the durable retries-exhausted terminal state, then returns :ok so the
    # Oban job is marked :completed rather than :discarded. If the write fails,
    # the error carries both the exhaustion failure and the reason that led here.
    defp exhaust_then_complete(delivery, underlying_reason) do
      case Deliveries.exhaust_delivery(delivery) do
        {:ok, _exhausted} ->
          :ok

        {:error, exhaust_reason} ->
          {:error, {:exhaust_failed, exhaust_reason, underlying_reason}}
      end
    end

    # Builds the retry reason for a transient failure. A non-empty provider
    # response map is attached for context; a missing, empty, or non-map response
    # yields the bare :adapter_temporary atom.
    defp error_reason_from_attempt(%DeliveryAttempt{provider_response: %{} = response})
         when map_size(response) > 0 do
      {:adapter_temporary, response}
    end

    defp error_reason_from_attempt(%DeliveryAttempt{}), do: :adapter_temporary

    # Resolves the dispatcher module at call time from the :chimeway application
    # environment (key :dispatcher), falling back to Chimeway.Dispatch.Sync only
    # when the key is not set at all.
    defp configured_dispatcher do
      case Application.fetch_env(:chimeway, :dispatcher) do
        {:ok, dispatcher} -> dispatcher
        :error -> Chimeway.Dispatch.Sync
      end
    end
  end
end