# Kafka Transport Guide
This guide covers everything you need to connect `phoenix_micro` to Apache
Kafka — from a local Docker setup to production multi-broker clusters.
## Why no Kafka dep in phoenix_micro?
`phoenix_micro` lists **zero** Kafka dependencies in its own `mix.exs`.
This is intentional: the mainstream Kafka clients for Elixir each pull in native
C or C++ code that requires a compiler toolchain:
| Client | Native dep | Needed for |
| ------------------ | ----------------- | ------------------ |
| `kafka_ex ~> 0.13` | `:crc32cer` (C) | CRC32 checksums |
| `brod ~> 3.17` | `:snappyer` (C++) | Snappy compression |
On **Windows** without Visual Studio Build Tools, these cause the `cl.exe not
found` error you may have seen. By keeping them out of `phoenix_micro`'s deps,
the library compiles cleanly everywhere. You add the Kafka client only to _your_
application, only when you actually need Kafka.
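The result is that your own application's `deps/0` lists both `phoenix_micro` and your chosen client side by side. A minimal sketch (the `phoenix_micro` version requirement is illustrative):

```elixir
# mix.exs of YOUR application, not of phoenix_micro itself
defp deps do
  [
    # version requirement shown here is illustrative
    {:phoenix_micro, "~> 0.1"},
    # exactly one Kafka client, and only if you use the :kafka transport
    {:kafka_ex, "~> 0.13"}
  ]
end
```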
---
## Step-by-step setup
### Step 1 — Choose a client
Add exactly one of these to **your application's** `mix.exs`:
```elixir
# Option A — kafka_ex (works on all platforms with build tools)
{:kafka_ex, "~> 0.13"}

# Option B — brod (Linux/macOS only)
{:brod, "~> 3.17"}
```
> #### Windows + kafka_ex {: .tip}
>
> `kafka_ex` needs `:crc32cer` which is written in C. To compile it on Windows:
>
> 1. Download [Visual Studio Build Tools 2022](https://visualstudio.microsoft.com/downloads/#build-tools-for-visual-studio-2022)
> 2. Run the installer and select **"Desktop development with C++"**
> 3. Restart your terminal
> 4. Run `mix deps.compile crc32cer --force`
>
> Alternatively, use **RabbitMQ**, **NATS**, or **Redis Streams** — they
> compile on Windows with no C toolchain at all.
### Step 2 — Configure kafka_ex
```elixir
# config/config.exs
config :kafka_ex,
  brokers: [{"localhost", 9092}],
  consumer_group: "my_app_consumers",
  use_ssl: false,
  required_acks: 1,
  ack_timeout_ms: 3_000,
  # Use "kayrock" for Kafka 0.11+ (recommended)
  # Use "0.9.0" for older Kafka clusters
  kafka_version: "kayrock",
  # How long to wait for messages when polling (ms)
  fetch_wait_max_ms: 100
```
### Step 3 — Configure PhoenixMicro
```elixir
# config/config.exs
config :phoenix_micro,
  transport: :kafka,
  transports: [
    kafka: [
      # URL form — easiest to read
      url: "kafka://localhost:9092",
      # Or explicit list (choose one, not both)
      # brokers: [{"localhost", 9092}],
      group_id: "my_app_consumers", # consumer group
      client_id: "my_app",          # identifies this app in Kafka
      begin_offset: :latest         # :latest | :earliest | integer
    ]
  ]
```
### Step 4 — Start a local Kafka
```yaml
# docker-compose.yml
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LOG_RETENTION_HOURS: 24
```
```bash
docker-compose up -d
# Check it's running:
docker-compose logs kafka | grep "started"
```
### Step 5 — Define a consumer
```elixir
defmodule MyApp.Consumers.PaymentConsumer do
  use PhoenixMicro.Consumer

  topic "payments.created"
  transport :kafka
  concurrency 5

  retry max_attempts: 3,
        base_delay: 1_000,
        max_delay: 30_000,
        jitter: true

  middleware [
    PhoenixMicro.Middleware.Logger,
    PhoenixMicro.Middleware.Metrics
  ]

  dead_letter_topic "payments.created.dlq"

  def handle(message, _ctx) do
    %{"payment_id" => id, "amount_cents" => amount} = message.payload
    MyApp.Payments.record(id, amount)
    :ok
  end
end
```
### Step 6 — Register in config
```elixir
# config/config.exs
config :phoenix_micro,
  consumers: [MyApp.Consumers.PaymentConsumer]
```
Or register dynamically at runtime:
```elixir
{:ok, _pid} = PhoenixMicro.register_consumer(MyApp.Consumers.PaymentConsumer)
```
---
## Publishing to Kafka
```elixir
# Async (default)
PhoenixMicro.publish("payments.created", %{
  payment_id: "pay_abc123",
  amount_cents: 9999,
  currency: "USD"
})

# Sync — waits for broker acknowledgment
:ok = PhoenixMicro.publish_sync("payments.created", %{payment_id: "pay_abc123"})

# With options
PhoenixMicro.publish("payments.created", payload,
  partition: 2, # target partition (default: 0)
  sync: true    # wait for ack
)

# Batch publish
PhoenixMicro.publish_batch([
  {"payments.created", %{payment_id: "pay_1", amount_cents: 100}},
  {"payments.created", %{payment_id: "pay_2", amount_cents: 200}},
  {"orders.placed", %{order_id: "ord_1"}}
])
```
---
## Multi-broker production setup
```elixir
# config/runtime.exs
config :kafka_ex,
  brokers: [
    {System.get_env("KAFKA_BROKER_1", "b1.example.com"), 9092},
    {System.get_env("KAFKA_BROKER_2", "b2.example.com"), 9092},
    {System.get_env("KAFKA_BROKER_3", "b3.example.com"), 9092}
  ],
  consumer_group: System.get_env("KAFKA_GROUP_ID", "my_app_prod"),
  use_ssl: System.get_env("KAFKA_USE_SSL", "false") == "true",
  required_acks: 1,
  ack_timeout_ms: 5_000,
  kafka_version: "kayrock"

config :phoenix_micro,
  transport: :kafka,
  transports: [
    kafka: [
      # Multi-broker URL (comma-separated)
      url: System.get_env("KAFKA_URL", "kafka://b1.example.com:9092,b2.example.com:9092"),
      group_id: System.get_env("KAFKA_GROUP_ID", "my_app_prod"),
      client_id: System.get_env("KAFKA_CLIENT_ID", "my_app"),
      begin_offset: :latest,
      pool_size: String.to_integer(System.get_env("KAFKA_POOL_SIZE", "10"))
    ]
  ]
```
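When `KAFKA_USE_SSL` is true, `kafka_ex` also needs certificate material. Its `ssl_options` key takes standard Erlang `:ssl` options; the certificate paths and env var names below are illustrative:

```elixir
# config/runtime.exs, only when use_ssl is true
config :kafka_ex,
  ssl_options: [
    cacertfile: System.get_env("KAFKA_CA_CERT", "/etc/kafka/ca.crt"),
    certfile: System.get_env("KAFKA_CLIENT_CERT", "/etc/kafka/client.crt"),
    keyfile: System.get_env("KAFKA_CLIENT_KEY", "/etc/kafka/client.key")
  ]
```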
---
## Dead-letter queue
Messages that fail after all retry attempts are published to
`<original_topic>.dlq` automatically:
```elixir
defmodule MyApp.Consumers.PaymentDLQ do
  use PhoenixMicro.Consumer

  topic "payments.created.dlq"
  transport :kafka
  concurrency 2

  def handle(message, _ctx) do
    # Inspect why it failed
    reason = message.headers["x-nack-reason"]
    MyApp.Alerting.notify(:dlq_message, %{topic: message.topic, reason: reason})
    MyApp.Repo.insert!(%MyApp.DeadLetter{message_id: message.id, reason: reason})
    :ok
  end
end
```
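Once the root cause is fixed, stored dead letters can be replayed through the same publish API. A hypothetical helper, assuming you also persisted the original topic and payload on `MyApp.DeadLetter` (neither field is defined above):

```elixir
defmodule MyApp.DeadLetters do
  # Hypothetical replay helper: `dl.topic` and `dl.payload` are assumed
  # to have been stored alongside the failure reason.
  def replay(%MyApp.DeadLetter{} = dl) do
    original_topic = String.replace_suffix(dl.topic, ".dlq", "")
    PhoenixMicro.publish_sync(original_topic, dl.payload)
  end
end
```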
---
## Schema validation with Kafka
Combine the Schema DSL with Kafka to enforce contracts at the consumer:
```elixir
defmodule MyApp.Events.PaymentCreated do
  use PhoenixMicro.Schema

  schema_version 2
  topic "payments.created"

  field :payment_id, :string, required: true
  field :amount_cents, :integer, required: true
  field :currency, :string, required: true, default: "USD"

  compatible_with [1]

  def migrate(1, payload) do
    amount = Map.get(payload, "amount", 0)
    payload |> Map.delete("amount") |> Map.put("amount_cents", round(amount * 100))
  end
end

defmodule MyApp.Consumers.PaymentConsumer do
  use PhoenixMicro.Consumer
  require Logger

  topic "payments.created"
  transport :kafka

  def handle(message, _ctx) do
    # Auto-migrates v1 payloads to v2 before validation
    case PhoenixMicro.Schema.decode(MyApp.Events.PaymentCreated, message.payload, message.headers) do
      {:ok, event} ->
        process(event)
        :ok

      {:error, {:schema_validation_failed, errors}} ->
        Logger.warning("Schema invalid: #{inspect(errors)}")
        :nack # → DLQ immediately, no retry
    end
  end
end
```
---
## Testing with Kafka
Use Memory transport in tests — no Kafka broker needed:
```elixir
# config/test.exs
config :phoenix_micro,
  transport: :memory,
  consumers: []
```
```elixir
defmodule MyApp.PaymentConsumerTest do
  use ExUnit.Case, async: false

  alias PhoenixMicro.{Consumer, Message}

  setup do
    start_supervised!({PhoenixMicro.Transport.Memory, []})
    :ok
  end

  test "handles valid payment" do
    msg =
      Message.new("payments.created", %{
        "payment_id" => "pay_1",
        "amount_cents" => 999,
        "currency" => "USD"
      })

    ctx = %{
      transport: :memory,
      topic: msg.topic,
      attempt: 1,
      transport_mod: PhoenixMicro.Transport.Memory,
      message: msg
    }

    assert :ok = Consumer.dispatch(MyApp.Consumers.PaymentConsumer, msg, ctx)
  end

  test "routes invalid schema to DLQ" do
    msg = Message.new("payments.created", %{"bad" => "payload"})

    ctx = %{
      transport: :memory,
      topic: msg.topic,
      attempt: 1,
      transport_mod: PhoenixMicro.Transport.Memory,
      message: msg
    }

    # Consumer should nack invalid schema
    assert :nack = Consumer.dispatch(MyApp.Consumers.PaymentConsumer, msg, ctx)

    # Verify DLQ received it
    dlq_msgs = PhoenixMicro.Transport.Memory.dlq_messages()
    assert length(dlq_msgs) == 1
  end
end
```
---
## Troubleshooting
### `cl.exe not found` (Windows)
```
** (Mix) Could not compile dependency :crc32cer
```
**Fix:** Install [Visual Studio Build Tools 2022](https://visualstudio.microsoft.com/downloads/#build-tools-for-visual-studio-2022),
select **"Desktop development with C++"**, restart terminal, run:
```bash
mix deps.compile crc32cer --force
```
**Easier fix:** Use RabbitMQ, NATS, or Redis Streams instead:
```elixir
config :phoenix_micro, transport: :rabbitmq
```
### `No Kafka client found` warning at startup
```
[PhoenixMicro.Transport.Kafka] No Kafka client found.
```
You configured `:kafka` as the transport but didn't add `kafka_ex` or `brod`
to your app's `mix.exs`. Add one:
```elixir
{:kafka_ex, "~> 0.13"}
```
### Connection timeouts
```
[Kafka] Connect failed: :timeout, retrying in 3142ms
```
Kafka is not reachable. Check:
```bash
# Is Kafka running?
docker ps | grep kafka
# Can you reach port 9092?
telnet localhost 9092
# Check advertised listeners match what you're connecting to
docker logs kafka | grep ADVERTISED
```
### Messages not being received
1. Set `begin_offset: :earliest` to read from the start of the topic
2. Verify the consumer group ID matches your `config :kafka_ex, consumer_group:`
3. Ensure the topic exists: `docker exec kafka kafka-topics --list --bootstrap-server localhost:9092`
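For point 1, the offset is set per transport. Note that once a consumer group has committed offsets, clients typically resume from those rather than from `begin_offset`, so `:earliest` mainly matters for a fresh group:

```elixir
# config/config.exs, re-read the topic from the beginning
config :phoenix_micro,
  transports: [
    kafka: [
      url: "kafka://localhost:9092",
      begin_offset: :earliest
    ]
  ]
```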
### Creating topics manually
```bash
docker exec kafka kafka-topics \
  --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 3 \
  --topic payments.created