# Middleware

Middleware wraps message handling in composable, ordered layers — outermost first.
Every middleware receives the `PhoenixMicro.Message` struct and a `next` function
representing the rest of the chain.

## Composing middleware

```elixir
defmodule MyApp.Payments.CreatedConsumer do
  use PhoenixMicro.Consumer

  topic "payments.created"

  # Middleware runs left-to-right (Logger outermost, Idempotency innermost)
  middleware [
    PhoenixMicro.Middleware.Logger,
    PhoenixMicro.Middleware.Metrics,
    PhoenixMicro.Middleware.Retry,
    {PhoenixMicro.Middleware.CircuitBreaker,
     threshold: 5, window_ms: 60_000, reset_timeout_ms: 30_000},
    {PhoenixMicro.Middleware.Idempotency,
     store: PhoenixMicro.Middleware.Idempotency.ETSStore}
  ]
end
```

## Built-in middleware

### Logger

Emits a structured `debug` log on message receipt and a `debug` or `warning`
log on outcome.

```elixir
middleware [PhoenixMicro.Middleware.Logger]
```

### Metrics

Emits `:telemetry.span` events for every message. Use with `PhoenixMicro.Telemetry.metrics/0`
to wire into `TelemetryMetricsPrometheus` or similar.

```elixir
middleware [PhoenixMicro.Middleware.Metrics]
```

### Retry

Middleware-level retry with exponential backoff and jitter — runs _before_ the
consumer-level retry configured via `retry max_attempts: N`.

```elixir
middleware [
  {PhoenixMicro.Middleware.Retry,
   max: 3,
   base_delay: 200,
   max_delay: 5_000}
]
```
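
To make "exponential backoff and jitter" concrete, here is a minimal sketch of how such a delay could be computed. It is not PhoenixMicro's implementation: the `BackoffSketch` module is hypothetical, and it assumes delays are in milliseconds, that the delay doubles on each attempt, and that "full jitter" (a uniform random value up to the ceiling) is used.

```elixir
defmodule BackoffSketch do
  # Upper bound for the wait before retry number `attempt` (1-based):
  # base_delay * 2^(attempt - 1), capped at max_delay.
  def ceiling(attempt, base_delay, max_delay) when is_integer(attempt) and attempt >= 1 do
    min(base_delay * Integer.pow(2, attempt - 1), max_delay)
  end

  # "Full jitter": a uniformly random wait between 1 and the ceiling, so
  # consumers that failed together do not all retry at the same instant.
  def delay(attempt, base_delay, max_delay) do
    :rand.uniform(ceiling(attempt, base_delay, max_delay))
  end
end

# Print the ceiling for the first six attempts (assumed units: ms).
for attempt <- 1..6 do
  IO.puts("attempt #{attempt}: up to #{BackoffSketch.ceiling(attempt, 200, 5_000)} ms")
end
```

Under these assumptions, a base of 200 and a cap of 5,000 give ceilings of 200, 400, 800, 1600, 3200 and 5000 for attempts one to six.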

### CircuitBreaker

ETS-backed 3-state fuse per consumer topic.

| State        | Behaviour                                                                   |
| ------------ | --------------------------------------------------------------------------- |
| `:closed`    | Normal — all messages processed                                             |
| `:open`      | Rejecting — returns `{:error, :circuit_open}` immediately                   |
| `:half_open` | Probing — allows one message through; closes on success, reopens on failure |

```elixir
middleware [
  {PhoenixMicro.Middleware.CircuitBreaker,
   threshold:        10,       # failures within window before opening
   window_ms:        60_000,   # sliding failure window
   reset_timeout_ms: 30_000}   # how long to stay open before half-open probe
]
```

Emits telemetry when the circuit trips, resets, or rejects a message:

- `[:phoenix_micro, :circuit_breaker, :tripped]`
- `[:phoenix_micro, :circuit_breaker, :reset]`
- `[:phoenix_micro, :circuit_breaker, :rejected]`
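
The state table above can be sketched as a small pure state machine. This is an illustration only, not the library's ETS-backed code: `BreakerSketch` and its functions are hypothetical, timestamps are plain integers in milliseconds, and the options reuse the names `threshold`, `window_ms` and `reset_timeout_ms` from the configuration shown earlier.

```elixir
defmodule BreakerSketch do
  defstruct state: :closed, failures: [], opened_at: nil

  def new, do: %__MODULE__{}

  # Decide whether a message may pass at time `now`.
  def check(%__MODULE__{state: :open, opened_at: opened_at} = b, now, opts) do
    if now - opened_at >= opts[:reset_timeout_ms] do
      # Reset timeout elapsed: let exactly one probe through.
      {:allow, %{b | state: :half_open}}
    else
      {:reject, b}
    end
  end

  # A probe is already in flight, so everything else is rejected.
  def check(%__MODULE__{state: :half_open} = b, _now, _opts), do: {:reject, b}
  def check(%__MODULE__{state: :closed} = b, _now, _opts), do: {:allow, b}

  # Record the outcome (:ok or :error) of a message that was allowed through.
  def record(%__MODULE__{state: :half_open}, :ok, _now, _opts), do: new()

  def record(%__MODULE__{state: :half_open} = b, :error, now, _opts),
    do: %{b | state: :open, opened_at: now}

  def record(%__MODULE__{state: :closed} = b, :ok, _now, _opts), do: b

  def record(%__MODULE__{state: :closed} = b, :error, now, opts) do
    # Keep only failures that still fall inside the sliding window.
    recent = Enum.filter([now | b.failures], &(now - &1 < opts[:window_ms]))

    if length(recent) >= opts[:threshold] do
      %{b | state: :open, opened_at: now, failures: []}
    else
      %{b | failures: recent}
    end
  end
end
```

In this sketch the breaker opens once `threshold` failures land inside `window_ms`, rejects until `reset_timeout_ms` has elapsed, and then lets a single probe decide whether to close or reopen.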

### Idempotency

Deduplicates messages by `message.id`. Pluggable store — use ETS in dev/test,
Redis or PostgreSQL in production.

```elixir
# Dev / test
middleware [
  {PhoenixMicro.Middleware.Idempotency,
   store: PhoenixMicro.Middleware.Idempotency.ETSStore}
]

# Production — custom Redis store
middleware [
  {PhoenixMicro.Middleware.Idempotency,
   store: MyApp.Middleware.RedisIdempotencyStore}
]
```

Custom store (implement the `PhoenixMicro.IdempotencyStore` behaviour):

```elixir
defmodule MyApp.Middleware.RedisIdempotencyStore do
  @behaviour PhoenixMicro.IdempotencyStore

  @impl PhoenixMicro.IdempotencyStore
  def seen?(id) do
    case MyApp.Redis.get("idem:#{id}") do
      {:ok, nil} -> false
      {:ok, _}   -> true
      _          -> false  # Redis error: fail open and reprocess rather than drop
    end
  end

  @impl PhoenixMicro.IdempotencyStore
  def mark_seen(id) do
    MyApp.Redis.set("idem:#{id}", "1", ex: 86_400)
    :ok
  end
end
```
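
To show how a store with these two callbacks might be consulted, here is a rough sketch of the deduplication flow. Everything in it is hypothetical: the `IdempotencySketch` modules, the Agent-based store, the `{:ok, :duplicate}` return value for a skipped message, and the assumption that a successful handler returns `:ok`. The real middleware may differ on each of these points.

```elixir
defmodule IdempotencySketch.MemoryStore do
  # In-memory stand-in exposing the same seen?/1 and mark_seen/1 functions.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> MapSet.new() end, name: __MODULE__)
  end

  def seen?(id), do: Agent.get(__MODULE__, &MapSet.member?(&1, id))
  def mark_seen(id), do: Agent.update(__MODULE__, &MapSet.put(&1, id))
end

defmodule IdempotencySketch do
  # Skips the rest of the chain for an id that was already processed.
  # The id is marked only after the handler succeeds, so a failed
  # message can still be retried.
  def call(message, next, store) do
    if store.seen?(message.id) do
      {:ok, :duplicate}
    else
      with :ok <- next.(message) do
        store.mark_seen(message.id)
        :ok
      end
    end
  end
end
```

Note that checking and marking are two separate steps here, so two concurrent deliveries of the same message could both pass the check. A production store would typically close that gap with an atomic operation.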

### Tracing

OpenTelemetry-compatible. Extracts W3C `traceparent` headers and creates spans.
Degrades gracefully if `:opentelemetry` is not installed.

```elixir
middleware [PhoenixMicro.Middleware.Tracing]
```

To record spans, add these optional dependencies to your app's deps:

```elixir
{:opentelemetry, "~> 1.3"},
{:opentelemetry_api, "~> 1.3"}
```
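
For readers unfamiliar with the header format, the sketch below parses a version `00` W3C `traceparent` value into its parts. It illustrates the header layout only and is not how `PhoenixMicro.Middleware.Tracing` is implemented; the `TraceparentSketch` module is hypothetical.

```elixir
defmodule TraceparentSketch do
  # Layout for version 00: "00-<32 hex trace id>-<16 hex parent id>-<2 hex flags>".
  def parse(<<"00-", trace_id::binary-size(32), "-", parent_id::binary-size(16), "-",
              flags::binary-size(2)>>) do
    if hex?(trace_id) and hex?(parent_id) and hex?(flags) and
         not all_zero?(trace_id) and not all_zero?(parent_id) do
      {:ok,
       %{
         trace_id: trace_id,
         parent_id: parent_id,
         # The lowest bit of the flags byte is the "sampled" flag.
         sampled: Bitwise.band(String.to_integer(flags, 16), 1) == 1
       }}
    else
      :error
    end
  end

  # Anything else (wrong length, other version, non-binary) is rejected.
  def parse(_other), do: :error

  # The spec requires lowercase hex digits.
  defp hex?(s), do: String.match?(s, ~r/\A[0-9a-f]+\z/)

  # All-zero trace and parent ids are invalid.
  defp all_zero?(s), do: String.trim_leading(s, "0") == ""
end

IO.inspect(TraceparentSketch.parse("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"))
```

A middleware could read the value from `message.headers` and fall back to starting a new trace when parsing returns `:error`.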

## Writing custom middleware

Implement the `PhoenixMicro.Middleware` behaviour with a single `call/2` callback:

```elixir
defmodule MyApp.Middleware.TenantContext do
  @behaviour PhoenixMicro.Middleware

  require Logger

  @impl PhoenixMicro.Middleware
  def call(%PhoenixMicro.Message{} = message, next) do
    tenant_id = Map.get(message.headers, "x-tenant-id", "default")

    # Set for the duration of this message
    Process.put(:current_tenant_id, tenant_id)
    Logger.metadata(tenant_id: tenant_id)

    try do
      next.(message)
    after
      # Always clean up, even if a later layer raises
      Process.delete(:current_tenant_id)
      Logger.metadata(tenant_id: nil)
    end
  end
end
```

A middleware can also short-circuit the chain by returning without calling `next`, as this rate limiter does when the limit is exceeded:

```elixir
defmodule MyApp.Middleware.RateLimiter do
  @behaviour PhoenixMicro.Middleware

  @impl PhoenixMicro.Middleware
  def call(%PhoenixMicro.Message{} = message, next) do
    topic = message.topic

    case MyApp.RateLimiter.check(topic) do
      :ok ->
        next.(message)

      {:error, :rate_exceeded} ->
        # Returning an error causes the consumer to retry / DLQ
        {:error, :rate_exceeded}
    end
  end
end
```

## Middleware composition order

```
consumer: middleware [A, B, C]

A.call(message, fn message ->
  B.call(message, fn message ->
    C.call(message, fn message ->
      handler.(message)   # your handle/2
    end)
  end)
end)
```

Errors propagate back outward through each layer, so outer middleware such as
Logger and Metrics see the final outcome, regardless of which inner middleware or
handler caused the failure.
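
One way to picture how such a chain could be assembled is a right fold over the middleware list. The sketch below is illustrative rather than PhoenixMicro's actual code: `ChainSketch` and the `A`, `B`, `C` modules are hypothetical, and a plain map stands in for `PhoenixMicro.Message`.

```elixir
defmodule ChainSketch do
  # Folds the list from the right so the first entry ends up outermost.
  # Accepts bare modules or {module, opts} tuples; opts are ignored here
  # because the sketch only demonstrates ordering.
  def build(middlewares, handler) do
    List.foldr(middlewares, handler, fn mw, next ->
      mod = module_of(mw)
      fn message -> mod.call(message, next) end
    end)
  end

  defp module_of({mod, _opts}), do: mod
  defp module_of(mod) when is_atom(mod), do: mod
end

# Three stand-in middleware that record the order in which they run.
defmodule ChainSketch.A do
  def call(msg, next), do: next.(%{msg | trace: msg.trace ++ [:a]})
end

defmodule ChainSketch.B do
  def call(msg, next), do: next.(%{msg | trace: msg.trace ++ [:b]})
end

defmodule ChainSketch.C do
  def call(msg, next), do: next.(%{msg | trace: msg.trace ++ [:c]})
end

chain =
  ChainSketch.build(
    [ChainSketch.A, {ChainSketch.B, some_option: 1}, ChainSketch.C],
    fn msg -> {:ok, msg.trace} end
  )

IO.inspect(chain.(%{trace: []}))
```

Running the chain yields `{:ok, [:a, :b, :c]}`, which confirms that the first middleware in the list is entered first and is the last to see the result on the way back out.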

## Accessing middleware state

When the chain ultimately fails, its final result is available in `handle_error/3` as `reason`:

```elixir
# Inside your consumer module, which must also `require Logger`
@impl PhoenixMicro.Consumer
def handle_error(message, reason, _ctx) do
  Logger.error("Giving up on #{message.id}: #{inspect(reason)}")
  MyApp.Alerts.notify(:consumer_failure, message, reason)
  :ok
end
```