
# Upgrading to KafkaEx 1.0

## Overview

KafkaEx 1.0 brings a cleaner API, removes legacy code, and uses Kayrock as the sole protocol implementation. 
This guide helps you migrate from KafkaEx 0.x.

## Breaking Changes

### Removed Legacy Servers

The following server implementations have been removed:

- `KafkaEx.Server0P8P0`
- `KafkaEx.Server0P8P2`
- `KafkaEx.Server0P9P0`
- `KafkaEx.Server0P10AndLater`

Kayrock is now the only implementation, providing automatic API version negotiation.

### Configuration Changes

**Removed options:**
- `kafka_version` - No longer needed; the client automatically negotiates versions

**Update your config:**
```elixir
# Before (0.x)
config :kafka_ex,
  kafka_version: "kayrock",
  brokers: [{"localhost", 9092}]

# After (1.0)
config :kafka_ex,
  brokers: [{"localhost", 9092}]
```

### Module Reorganization

Modules have been reorganized by domain:

| Old Module               | New Module                       |
|--------------------------|----------------------------------|
| `KafkaEx.GenConsumer`    | `KafkaEx.Consumer.GenConsumer`   |
| `KafkaEx.ConsumerGroup`  | `KafkaEx.Consumer.ConsumerGroup` |
| `KafkaEx.New.Client`     | `KafkaEx.Client`                 |
| `KafkaEx.New.KafkaExAPI` | `KafkaEx.API`                    |
| `KafkaEx.New.Kafka.*`    | `KafkaEx.Messages.*`             |
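These renames are mechanical, so a small script can handle most call sites. The following is a minimal sketch, not a tool shipped with kafka_ex. The module name `MigrateModuleNames` is hypothetical, and it covers only the rows in the table above. It matches on word boundaries so that names which are already migrated are left alone. Review the resulting diff before committing.

```elixir
defmodule MigrateModuleNames do
  @moduledoc """
  Hypothetical helper (not part of kafka_ex) that rewrites 0.x module
  names to their 1.0 locations, following the table above.
  """

  # Regexes are built at call time rather than stored in a module
  # attribute, which newer Elixir/OTP versions discourage.
  defp renames do
    [
      {~r/\bKafkaEx\.GenConsumer\b/, "KafkaEx.Consumer.GenConsumer"},
      {~r/\bKafkaEx\.ConsumerGroup\b/, "KafkaEx.Consumer.ConsumerGroup"},
      {~r/\bKafkaEx\.New\.Client\b/, "KafkaEx.Client"},
      {~r/\bKafkaEx\.New\.KafkaExAPI\b/, "KafkaEx.API"},
      # `KafkaEx.New.Kafka.*` -> `KafkaEx.Messages.*`; the trailing dot
      # keeps this from matching `KafkaEx.New.KafkaExAPI`.
      {~r/\bKafkaEx\.New\.Kafka\./, "KafkaEx.Messages."}
    ]
  end

  @doc "Rewrites old module names in a source string."
  def rewrite(source) when is_binary(source) do
    Enum.reduce(renames(), source, fn {pattern, replacement}, acc ->
      Regex.replace(pattern, acc, replacement)
    end)
  end

  @doc "Rewrites every .ex/.exs file under `dir` in place; returns the changed paths."
  def rewrite_tree(dir) do
    dir
    |> Path.join("**/*.{ex,exs}")
    |> Path.wildcard()
    |> Enum.flat_map(fn path ->
      original = File.read!(path)
      updated = rewrite(original)

      if updated == original do
        []
      else
        File.write!(path, updated)
        [path]
      end
    end)
  end
end
```

For example, `MigrateModuleNames.rewrite_tree("lib")` followed by `MigrateModuleNames.rewrite_tree("test")` covers a typical project layout.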

### API Changes

**New explicit client API:**
```elixir
# Before (0.x) - implicit worker
KafkaEx.produce("topic", 0, "message")
KafkaEx.fetch("topic", 0, offset: 0)  # offset is a keyword option

# After (1.0) - explicit client
{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])
{:ok, _} = KafkaEx.API.produce(client, "topic", 0, [%{value: "message"}])
{:ok, result} = KafkaEx.API.fetch(client, "topic", 0, 0)  # offset is positional
```

### Headers API — `[%Header{}]` instead of `[{key, value}]`

The `headers:` option on every produce function now takes a list of
`%KafkaEx.Messages.Header{}` structs instead of `{key, value}` tuples.
This is a **runtime breaking change**: existing code still compiles
and fails only at the first produce call, with a `FunctionClauseError`.
Migrate before upgrading in production.

```elixir
# Before (0.x / rc.2)
KafkaEx.API.produce(client, "t", 0, [
  %{value: "v", headers: [{"trace-id", "abc"}, {"tenant", "prod"}]}
])

# After (1.0)
alias KafkaEx.Messages.Header
KafkaEx.API.produce(client, "t", 0, [
  %{value: "v", headers: [
    Header.new("trace-id", "abc"),
    Header.new("tenant", "prod")
  ]}
])
```

Why: the fetch path was already returning `%Header{}` structs. The
produce side was the asymmetric outlier; a single consistent shape
across produce and fetch makes round-trip code cleaner.
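If many call sites still build tuple headers, a small shim can convert them at the produce boundary while you migrate. This is a sketch under assumptions. `MyApp.KafkaHeaders` is a hypothetical module in your own app. The header constructor is injected, defaulting to `KafkaEx.Messages.Header.new/2` as used above, so the conversion can be unit-tested without a broker. Entries that are not `{key, value}` tuples are passed through unchanged, on the assumption that they are already `%Header{}` structs.

```elixir
defmodule MyApp.KafkaHeaders do
  @moduledoc """
  Hypothetical migration shim: converts 0.x-style `{key, value}` header
  tuples on produce messages into 1.0 `%KafkaEx.Messages.Header{}` structs.
  """

  @doc """
  Upgrades the `:headers` of every message map. `new_header` builds one
  header from a key and a value; it defaults to `Header.new/2`.
  """
  def upgrade_messages(messages, new_header \\ &KafkaEx.Messages.Header.new/2)
      when is_list(messages) do
    Enum.map(messages, &upgrade_message(&1, new_header))
  end

  defp upgrade_message(%{headers: headers} = message, new_header) when is_list(headers) do
    %{message | headers: Enum.map(headers, &to_header(&1, new_header))}
  end

  # Messages without a headers list are left untouched.
  defp upgrade_message(message, _new_header), do: message

  defp to_header({key, value}, new_header), do: new_header.(key, value)
  # Anything else is assumed to already be a %Header{} struct.
  defp to_header(header, _new_header), do: header
end
```

Usage: `KafkaEx.API.produce(client, "t", 0, MyApp.KafkaHeaders.upgrade_messages(messages))`. Once every call site builds `Header.new/2` directly, the shim can be deleted.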

### Broker version requirements

- **Minimum: Kafka 0.11.0+** — required for RecordBatch format,
  headers, and timestamps. Earlier brokers will fail at produce.
- **Tested: Kafka 2.1.0 through 3.8.x.**
- **Kafka 2.3+ recommended** — needed for KIP-394 two-step
  JoinGroup semantics with `group.initial.rebalance.delay.ms`.
  kafka_ex auto-handles the two-step dance, but broker support is
  required.
- **Kafka 4.0+** — partial compatibility; tracked in
  [#497](https://github.com/kafkaex/kafka_ex/issues/497). Consumer
  groups may hit protocol changes.

### Optional dependency matrix

Some features require additional deps in your app's `mix.exs`. If
you configure a feature without the backing dep, you'll get an
`UndefinedFunctionError` at runtime (not at startup).

| Feature | Required dep |
|---|---|
| Snappy compression | `{:snappyer, "~> 1.2"}` |
| Zstd compression | `{:ezstd, "~> 1.0"}` |
| LZ4 compression | `{:lz4b, "~> 0.0.13"}` |
| MSK-IAM SASL | `{:jason, "~> 1.0"}`, `{:aws_signature, "~> 0.4"}`, `{:aws_credentials, "~> 1.0"}` |
| OAuth JWT parsing | user's choice (e.g., `{:joken, "~> 2.6"}` — only if your token_provider needs to parse JWTs) |
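Because a missing optional dependency surfaces only as an `UndefinedFunctionError` on first use, you may prefer to fail fast at boot. The sketch below is not a kafka_ex API. `MyApp.KafkaDepsCheck` is hypothetical, and the feature-to-module pairs you pass in are assumptions about each library's entry module (for example `:snappyer` for Snappy and `:ezstd` for Zstd). Confirm them against the versions you install.

```elixir
defmodule MyApp.KafkaDepsCheck do
  @moduledoc """
  Hypothetical boot-time guard: verifies that the modules backing the
  optional kafka_ex features you rely on can actually be loaded.
  """

  @doc "Returns the features whose backing module cannot be loaded."
  def missing(requirements) do
    for {feature, module} <- requirements, not Code.ensure_loaded?(module), do: feature
  end

  @doc "Raises at startup instead of failing on the first produce or fetch."
  def verify!(requirements) do
    case missing(requirements) do
      [] -> :ok
      features -> raise "missing optional deps for kafka_ex features: #{inspect(features)}"
    end
  end
end
```

For example, calling `MyApp.KafkaDepsCheck.verify!(snappy: :snappyer, zstd: :ezstd)` at the top of your application's `start/2` callback turns a late runtime error into a startup crash.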

### 0.x → 1.0 API cheat-sheet

| 0.x | 1.0 |
|---|---|
| `KafkaEx.produce("t", 0, "m")` | `KafkaEx.API.produce_one(client, "t", 0, "m")` |
| `KafkaEx.fetch("t", 0, offset: 0)` | `KafkaEx.API.fetch(client, "t", 0, 0)` |
| `KafkaEx.GenConsumer` | `KafkaEx.Consumer.GenConsumer` |
| `KafkaEx.ConsumerGroup` | `KafkaEx.Consumer.ConsumerGroup` |
| `config :kafka_ex, kafka_version: "kayrock"` | (remove — no longer needed) |
| `headers: [{"k", "v"}]` on produce | `headers: [Header.new("k", "v")]` |

### OffsetCommit error handling (new in 1.0)

In earlier kafka_ex versions, `:illegal_generation` and related errors
were logged and swallowed: the consumer kept running on a stale
generation until a later heartbeat happened to fail as well.

v1.0 sorts OffsetCommit errors into three classes, matching
the reference Kafka clients (Java, librdkafka, brod, kafka-python):

- **Terminal** (`:fenced_instance_id`, `:group_authorization_failed`,
  `:topic_authorization_failed`, `:offset_metadata_too_large`,
  `:invalid_commit_offset_size`) — consumer stops without rejoining.
  Under `restart: :transient` the supervisor does not respawn.
- **Fatal** (`:illegal_generation`, `:unknown_member_id`) — GenConsumer
  casts `{:rejoin_required, reason, stale_gen}` to the group manager
  and self-stops. The manager resets member_id/generation_id and
  runs a rebalance. Duplicate casts from sibling partitions coalesce
  in the manager's mailbox.
- **Retryable** (`:rebalance_in_progress`, `:unstable_offset_commit`,
  `:timeout`, `:coordinator_not_available`, …) — commit is retried
  with exponential backoff.
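For intuition about the retryable path, this is what capped exponential backoff generally looks like. The base delay, cap, and jitter below are illustrative assumptions, not kafka_ex's actual retry parameters, and `BackoffSketch` is a hypothetical module.

```elixir
defmodule BackoffSketch do
  @moduledoc "Illustrative capped exponential backoff; all parameters are assumptions."

  @base_ms 100
  @max_ms 5_000

  @doc "Delay before retry `attempt` (0-based): doubles each time, capped at `max_ms`."
  def delay_ms(attempt, base_ms \\ @base_ms, max_ms \\ @max_ms)
      when is_integer(attempt) and attempt >= 0 do
    min(base_ms * Integer.pow(2, attempt), max_ms)
  end

  @doc "Same delay minus up to 20% random jitter, so concurrent retries spread out."
  def jittered_delay_ms(attempt) do
    delay = delay_ms(attempt)
    # :rand.uniform(n) returns 1..n, so this subtracts between 0 and div(delay, 5).
    delay - :rand.uniform(div(delay, 5) + 1) + 1
  end
end
```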

No user callback is invoked — kafka_ex v1 does not have a synchronous
`handle_commit_failure/3` behaviour (deferred post-1.0). Subscribe
to the new telemetry event to observe failures:

```elixir
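require Logger

:telemetry.attach(
  "my-commit-failure-observer",
  [:kafka_ex, :consumer, :commit_failed],
  # Match measurements loosely: a handler that raises is detached by :telemetry.
  fn _event, _measurements, metadata, _config ->
    # measurements: %{count: 1}
    # metadata: %{group_id, topic, partition, offset, kind, error}
    Logger.warning("Commit failed: #{inspect(metadata)}")
  end,
  nil
)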
```

At-least-once semantics are preserved: any uncommitted messages
since the last successful commit will be redelivered after the
rejoin, so your `handle_message_set/2` must be idempotent (or
tolerate duplicates).
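One common way to tolerate redelivery is to record the last applied offset per partition in the same store (ideally the same transaction) as your side effects, then skip anything at or below it. In-process state is not enough, because the fatal path above stops the GenConsumer and discards its state. The sketch assumes each message exposes an integer `:offset` field and that `last_applied` comes from your own durable store; `MyApp.Dedup` is hypothetical.

```elixir
defmodule MyApp.Dedup do
  @moduledoc """
  Hypothetical helper: drops messages that were already applied, given the
  last offset you durably recorded for this topic/partition (or nil).
  """

  @doc "Keeps only messages whose offset is greater than `last_applied`."
  def filter_new(messages, nil), do: messages

  def filter_new(messages, last_applied) when is_integer(last_applied) do
    Enum.filter(messages, fn %{offset: offset} -> offset > last_applied end)
  end

  @doc "The offset to record after successfully applying `messages`."
  def next_checkpoint([], last_applied), do: last_applied

  def next_checkpoint(messages, _last_applied) do
    messages |> Enum.map(& &1.offset) |> Enum.max()
  end
end
```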

### GenConsumer Changes

```elixir
# Before (0.x)
defmodule MyConsumer do
  use KafkaEx.GenConsumer
  # ...
end

# After (1.0)
defmodule MyConsumer do
  use KafkaEx.Consumer.GenConsumer
  # ...
end
```

### ConsumerGroup Changes

```elixir
# Before (0.x)
KafkaEx.ConsumerGroup.start_link(
  MyConsumer, "my-group", ["topic"],
  # ...
)

# After (1.0)
KafkaEx.Consumer.ConsumerGroup.start_link(
  MyConsumer, "my-group", ["topic"],
  # ...
)
```

## Deprecations

The following functions and modules are deprecated in v1.0 and scheduled for
removal in v2.0. They continue to work in the entire 1.x series — plan migration
at your convenience.

| Deprecated                                         | Replacement                                        | Notes                                       |
|----------------------------------------------------|----------------------------------------------------|---------------------------------------------|
| `KafkaEx.Config.consumer_group/0`                  | `KafkaEx.Config.default_consumer_group/0`          | Function-for-function swap.                 |
| `KafkaEx.Client.State.max_supported_api_version/3` | `KafkaEx.Client.State.max_supported_api_version/2` | Drop the default arg and match on `{:ok, vsn}` / `{:error, :api_not_supported_by_broker}`. |
| `KafkaEx.Producer.Partitioner.Legacy`              | `KafkaEx.Producer.Partitioner.Default`             | See `KafkaEx.Producer.Partitioner` moduledoc. |

Each of these emits a compile-time deprecation warning (via Elixir's `@deprecated` attribute), so `mix compile --warnings-as-errors` fails the build while any call site remains.

## Migration Checklist

- [ ] Remove `kafka_version` from config
- [ ] Update `KafkaEx.GenConsumer` to `KafkaEx.Consumer.GenConsumer` (required - code will not compile)
- [ ] Update `KafkaEx.ConsumerGroup` to `KafkaEx.Consumer.ConsumerGroup` (required - code will not compile)
- [ ] Update code to use `KafkaEx.API` functions (optional but recommended)
- [ ] Update any references to `KafkaEx.New.*` modules
- [ ] If you depend on specific protocol versions, add `api_versions` to config (see API Version Resolution below)
- [ ] Run tests and fix deprecation warnings
- [ ] Verify with your Kafka cluster

**Important:** Old module names (`KafkaEx.GenConsumer`, `KafkaEx.ConsumerGroup`, etc.) are **not aliased**. Code using old module names will fail to compile immediately. All references must be updated.

## New Features in 1.0

### Explicit Client API

The new `KafkaEx.API` module provides explicit, client-based functions:

```elixir
{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])

# Produce
{:ok, metadata} = KafkaEx.API.produce_one(client, "topic", 0, "value")

# Fetch
{:ok, result} = KafkaEx.API.fetch(client, "topic", 0, 0)

# Offsets
{:ok, offset} = KafkaEx.API.latest_offset(client, "topic", 0)
{:ok, _} = KafkaEx.API.commit_offset(client, "group", "topic", [%{partition_num: 0, offset: offset}])

# Topic management
{:ok, _} = KafkaEx.API.create_topic(client, "new-topic", num_partitions: 3)
```

### API Version Resolution

The client now uses the highest protocol version supported by both the broker and the protocol library by default. Previous versions used conservative hardcoded defaults (e.g., fetch v3, produce v3) even when the broker supported higher versions.

If you need to pin specific API versions — for example, to match previous behavior or work around broker-specific issues — use the new `api_versions` application config:

```elixir
config :kafka_ex,
  api_versions: %{
    fetch: 3,
    produce: 3,
    metadata: 1
  }
```

Version selection follows this priority order:

1. Per-request `:api_version` option (highest priority)
2. Application config `api_versions` map
3. Broker-negotiated max (default)
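The priority order reads as a simple fallback chain. The following is an illustrative model of the documented behavior, not kafka_ex's internal code. `VersionResolutionSketch` and its argument shapes are hypothetical, and it does not model whether kafka_ex validates a pinned version against the broker's supported range.

```elixir
defmodule VersionResolutionSketch do
  @moduledoc "Models the documented API version priority order (hypothetical)."

  @doc """
  Picks the version for `api`:
    1. `:api_version` in the per-request options, else
    2. the entry for `api` in the application's `api_versions` map, else
    3. the highest version both the broker and the protocol library support.
  Version 0 is truthy in Elixir, so an explicit pin to 0 is honored.
  """
  def resolve(api, request_opts, app_api_versions, broker_max, library_max) do
    Keyword.get(request_opts, :api_version) ||
      Map.get(app_api_versions, api) ||
      min(broker_max, library_max)
  end
end
```

With the `api_versions` config above, the application-level map would come from `Application.get_env(:kafka_ex, :api_versions, %{})`, and a fetch without a per-request override resolves to 3.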

The `GenConsumer` / `ConsumerGroup` `:api_versions` supervisor option continues to work for per-consumer-group overrides. Application config is no longer read by `GenConsumer` directly — it is handled centrally by the client's request builder.

`latest_offset/4` and `earliest_offset/4` no longer force list_offsets v1. They use the standard version resolution like all other API calls.

### Telemetry & Observability

Built-in telemetry support for monitoring connections, requests, and consumer operations:

```elixir
:telemetry.attach(
  "kafka-handler",
  [:kafka_ex, :request, :stop],
  &MyApp.handle_event/4,
  nil
)
```

See [README.md](./README.md#telemetry--observability) for complete event reference and setup examples.
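The snippet above references `MyApp.handle_event/4` without defining it. A minimal handler might look like the following. `MyApp.KafkaTelemetry` is hypothetical, and the `:duration` measurement (in native time units, as is conventional for `:telemetry` span stop events) is an assumption; check the README event reference for the exact measurements and metadata.

```elixir
defmodule MyApp.KafkaTelemetry do
  @moduledoc "Hypothetical handler for [:kafka_ex, :request, :stop] events."
  require Logger

  @doc "Attach with &MyApp.KafkaTelemetry.handle_event/4."
  def handle_event([:kafka_ex, :request, :stop], measurements, metadata, _config) do
    case duration_ms(measurements) do
      nil -> Logger.debug("kafka_ex request finished: #{inspect(metadata)}")
      ms -> Logger.debug("kafka_ex request finished in #{ms}ms: #{inspect(metadata)}")
    end
  end

  # Ignore any other event this handler might be attached to.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok

  @doc "Converts an assumed native-unit `:duration` measurement to milliseconds."
  def duration_ms(%{duration: native}) when is_integer(native),
    do: System.convert_time_unit(native, :native, :millisecond)

  def duration_ms(_measurements), do: nil
end
```

A module function capture is generally preferred over an anonymous function here, since `:telemetry` can call it without the overhead it warns about for local functions.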

### Compression Support

Support for multiple compression formats on a per-request basis:

```elixir
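# Messages use the same shape as any other produce call
messages = [%{value: "first"}, %{value: "second"}]

# Gzip compression (built-in, no extra dependency)
{:ok, _} = KafkaEx.API.produce(client, "topic", 0, messages, compression: :gzip)

# Supported: :gzip, :snappy, :lz4, :zstd
# (:snappy, :lz4 and :zstd need the optional deps from the dependency matrix)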
```

See [README.md](./README.md#compression) for details on all compression formats.

### SASL Authentication

Full SASL support including PLAIN, SCRAM-SHA-256/512, OAUTHBEARER, and AWS MSK IAM:

```elixir
# SCRAM example
config :kafka_ex,
  brokers: [{"localhost", 9292}],
  use_ssl: true,
  sasl: %{
    mechanism: :scram,
    username: "user",
    password: "pass",
    mechanism_opts: %{algo: :sha256}
  }
```

See [AUTH.md](./AUTH.md) for complete configuration examples for all authentication mechanisms.

## Getting Help

- GitHub Issues: https://github.com/kafkaex/kafka_ex/issues
- Slack: #kafkaex on elixir-lang.slack.com