# Kaffe
[](https://hex.pm/packages/kaffe)
[](https://hexdocs.pm/kaffe/)
[](https://hex.pm/packages/kaffe)
[](https://github.com/spreedly/kaffe/blob/master/LICENSE.md)
[](https://github.com/spreedly/kaffe/commits/master)
An opinionated, highly specific, Elixir wrapper around [Brod](https://github.com/klarna/brod): the Erlang Kafka client. :coffee:
**NOTE**: Although we're using this in production at Spreedly it is still under active development. The API may change and there may be serious bugs we've yet to encounter.
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*
- [Installation](#installation)
- [Kaffe Consumer Usage](#kaffe-consumer-usage)
- [Kaffe GroupMember - Batch Message Consumer](#kaffe-groupmember---batch-message-consumer)
- [Managing how offsets are committed](#managing-how-offsets-are-committed)
- [Kaffe Consumer - Single Message Consumer (Deprecated)](#kaffe-consumer---single-message-consumer-deprecated)
- [async message acknowledgement](#async-message-acknowledgement)
- [Kaffe Producer Usage](#kaffe-producer-usage)
- [Heroku Configuration](#heroku-configuration)
- [Producing to Kafka](#producing-to-kafka)
- [Testing](#testing)
- [Setup](#setup)
- [Running](#running)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
## Installation
1. Add `kaffe` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[{:kaffe, "~> 1.0"}]
end
```
2. Configure a Kaffe Consumer and/or Producer
## Kaffe Consumer Usage
Consumers receive a list of messages and work as part of the `:brod_group_member` behavior. This has a few important benefits:
1. Group members assign a "subscriber" to each partition in the topic. Because Kafka topics scale with partitions, having a worker per partition usually increases throughput.
2. Group members correctly handle partition assignments across multiple clients in a consumer group. This means that this mode of operation will scale horizontally (e.g., multiple dynos on Heroku).
3. Downstream processing that benefits from batching (like writing to another Kafka topic) is more easily supported.
There is also legacy support for single message consumers, which process one message at a time using the `:brod_group_subscriber` behavior. This was the original mode of operation for Kaffe but is slow and does not scale. For this reason it is considered deprecated.
### Kaffe GroupMember - Batch Message Consumer
1. Define a `handle_messages/1` function in the provided module implementing the `Kaffe.MessageHandler` behaviour.
`handle_messages/1` will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
```elixir
defmodule MessageProcessor do
@behaviour Kaffe.MessageHandler
@impl Kaffe.MessageHandler
def handle_messages(messages) do
for %{key: key, value: value} = message <- messages do
IO.inspect message
IO.puts "#{key}: #{value}"
end
:ok # Important!
end
end
```
2. The configuration options for the `GroupMember` consumer are a superset of those for `Kaffe.Consumer`. Additional options can be found in `Kaffe.Config.Consumer`.
```elixir
config :kaffe,
consumers: %{
"subscriber_1" => [
endpoints: [kafka: 9092],
topics: ["interesting-topic"],
consumer_group: "your-app-consumer-group",
message_handler: MessageProcessor,
offset_reset_policy: :reset_to_latest,
max_bytes: 100_000,
min_bytes: 10_000,
max_wait_time: 1_000,
worker_allocation_strategy: :worker_per_topic_partition
# optional
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
],
"subscriber_2" => [
endpoints: [kafka: 9092],
topics: ["topic-2"],
consumer_group: "your-app-consumer-group",
message_handler: AnotherMessageHandler,
offset_reset_policy: :reset_to_latest,
max_bytes: 50_000,
worker_allocation_strategy: :worker_per_topic_partition
]
}
```
3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your supervision tree.
```elixir
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
%{
id: Kaffe.GroupMemberSupervisor.Subscriber1,
start: {Kaffe.GroupMemberSupervisor, :start_link, ["subscriber_1"]},
type: :supervisor
},
%{
id: Kaffe.GroupMemberSupervisor.Subscriber2,
start: {Kaffe.GroupMemberSupervisor, :start_link, ["subscriber_2"]},
type: :supervisor
}
]
opts = [strategy: :one_for_one, name: MyApp.Application.Supervisor]
Supervisor.start_link(children, opts)
end
end
```
#### Managing how offsets are committed
In some cases you may not want to commit back the most recent offset after processing a list of messages. For example, if you're batching messages to be sent elsewhere and want to ensure that a batch can be rebuilt should there be an error further downstream. In that example you might want to keep the offset of the first message in your batch so your consumer can restart back at that point to reprocess and rebatch the messages.
Your message handler can respond in the following ways to manage how offsets are committed back:
`:ok` - commit back the most recent offset and request more messages
`{:ok, :no_commit}` - do _not_ commit back the most recent offset and request more messages from the offset of the last message
`{:ok, offset}` - commit back at the offset specified and request messages from that point forward
Example:
```elixir
defmodule MessageProcessor do
@behaviour Kaffe.MessageHandler
@impl Kaffe.MessageHandler
def handle_messages(messages) do
for %{key: key, value: value} = message <- messages do
IO.inspect message
IO.puts "#{key}: #{value}"
end
{:ok, :no_commit}
end
end
```
### Kaffe Consumer - Single Message Consumer (Deprecated)
_For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended instead!_
1. Add a `handle_message/1` function to a local module (e.g. `MessageProcessor`). This function will be called with each Kafka message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
The module's `handle_message/1` function _must_ return `:ok` or Kaffe will throw an error. In normal (synchronous consumer) operation the Kaffe consumer will block until your `handle_message/1` function returns `:ok`.
### Example
```elixir
defmodule MessageProcessor do
def handle_message(%{key: key, value: value} = message) do
IO.inspect message
IO.puts "#{key}: #{value}"
:ok # The handle_message function MUST return :ok
end
end
```
### Message Structure
```elixir
%{
attributes: 0,
crc: 1914336469,
key: "kafka message key",
magic_byte: 0,
offset: 41,
partition: 17,
topic: "some-kafka-topic",
value: "the actual kafka message value is here",
ts: 1234567890123, # timestamp in milliseconds
ts_type: :append # timestamp type: :undefined | :create | :append
}
```
2. Configure your Kaffe Consumer in your mix config
```elixir
config :kaffe,
consumer: [
endpoints: [kafka: 9092], # that's [hostname: kafka_port]
topics: ["interesting-topic"], # the topic(s) that will be consumed
consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
message_handler: MessageProcessor, # the module from Step 1 that will process messages
# optional
async_message_ack: false, # see "async message acknowledgement" below
start_with_earliest_message: true # default false
],
```
The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.
### Heroku Configuration
To configure a Kaffe Consumer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true`
```elixir
config :kaffe,
consumer: [
heroku_kafka_env: true,
topics: ["interesting-topic"],
consumer_group: "your-app-consumer-group",
message_handler: MessageProcessor
]
```
With that setting in place Kaffe will automatically pull required info from the following ENV variables:
- `KAFKA_URL`
- `KAFKA_CLIENT_CERT`
- `KAFKA_CLIENT_CERT_KEY`
- `KAFKA_TRUSTED_CERT` (not used yet)
3. Add `Kaffe.Consumer` as a worker in your supervision tree
```elixir
worker(Kaffe.Consumer, [])
```
#### async message acknowledgement
If you need asynchronous message consumption:
1. Add a `handle_message/2` function to your processing module. This function will be called with the Consumer `pid` and the Kafka message. When your processing is complete you will need to call `Kaffe.Consumer.ack(pid, message)` to acknowledge the offset.
2. Set `async` to true when you start the Kaffe.Consumer
```elixir
consumer_group = "demo-commitlog-consumer"
topic = "commitlog"
message_handler = MessageProcessor
async = true
worker(Kaffe.Consumer, [consumer_group, topics, message_handler, async])
# … in your message handler module
def handle_message(pid, message) do
spawn_message_processing_worker(pid, message)
:ok # MUST return :ok
end
# … somewhere in your system when the worker is finished processing
Kaffe.Consumer.ack(pid, message)
```
**NOTE**: Asynchronous consumption means your system will no longer provide any backpressure to the Kaffe.Consumer. You will also need to add robust measures to your system to ensure that no messages are lost in processing. I.e., if you spawn 5 workers processing a series of asynchronous messages from Kafka and 1 of them crashes without acknowledgement then it's possible and likely that the message will be skipped entirely.
Kafka only tracks a single numeric offset, not individual messages. If a message fails and a later offset is committed then the failed message will _not_ be sent again.
It's possible that your topic and system are entirely ok with losing some messages (i.e. frequent metrics that aren't individually important).
## Kaffe Producer Usage
`Kaffe.Producer` handles producing messages to Kafka and will automatically select the topic partitions per message or can be given a function to call to determine the partition per message. Kaffe automatically inserts a Kafka timestamp with each message.
Configure your Kaffe Producer in your mix config. For all options, see `Kaffe.Config.Producer`.
```elixir
config :kaffe,
producer: [
endpoints: [kafka: 9092], # [hostname: port]
topics: ["kafka-topic"],
# optional
partition_strategy: :md5,
ssl: true,
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
]
```
The `partition_strategy` setting can be one of:
- `:md5`: (default) provides even and deterministic distribution of the messages over the available partitions based on an MD5 hash of the key
- `:random`: select a random partition for each message
- function: a given function to call to determine the correct partition
You can also set any of the Brod producer configuration options in the `producer` section - see [the Brod sources](https://github.com/klarna/brod/blob/master/src/brod_producer.erl#L90) for a list of keys and their meaning.
If the Kafka broker is configured with `SASL_PLAINTEXT` auth, the `sasl` option can be added.
If using Confluent Hosted Kafka, also add `ssl: true` as shown above.
## Heroku Configuration
To configure a Kaffe Producer for a Heroku Kafka compatible environment, including SSL, omit the `endpoint` and instead set `heroku_kafka_env: true`
```elixir
config :kaffe,
producer: [
heroku_kafka_env: true,
topics: ["kafka-topic"],
# optional
partition_strategy: :md5
]
```
With that setting in place Kaffe will automatically pull required info from the following ENV variables:
- `KAFKA_URL`
- `KAFKA_CLIENT_CERT`
- `KAFKA_CLIENT_CERT_KEY`
- `KAFKA_TRUSTED_CERT`
## Producing to Kafka
Currently only synchronous message production is supported.
There are several ways to produce:
- `topic`/`message_list` - Produce each message in the list to the given `topic`. The messages are produced to the correct partition based on the configured partitioning strategy.
Each item in the list is a tuple of the key and value: `{key, value}`.
```elixir
Kaffe.Producer.produce_sync("topic", [{"key1", "value1"}, {"key2", "value2"}])
```
- `topic`/`partition`/`message_list` - Produce each message in the list to the given `topic`/`partition`.
Each item in the list is a tuple of the key and value: `{key, value}`.
```elixir
Kaffe.Producer.produce_sync("topic", 2, [{"key1", "value1"}, {"key2", "value2"}])
```
- `key`/`value` - The key/value will be produced to the first topic given to the producer when it was started. The partition will be selected with the chosen strategy or given function.
```elixir
Kaffe.Producer.produce_sync("key", "value")
```
- `topic`/`key`/`value` - The key/value will be produced to the given topic.
```elixir
Kaffe.Producer.produce_sync("whitelist", "key", "value")
```
- `topic`/`partition`/`key`/`value` - The key/value will be produced to the given topic/partition.
```elixir
Kaffe.Producer.produce_sync("whitelist", 2, "key", "value")
```
**NOTE**: With this approach Kaffe will not calculate the next partition since it assumes you're taking over that job by giving it a specific partition.
## Testing
### Setup
In order to run the end-to-end tests, a Kafka topic is required. It must:
* be named `kaffe-test`
* have 32 partitions
If using the `kafka-topics.sh` script that comes with the Kafka distribution, you may use something like:
```bash
kafka-topics.sh --zookeeper localhost:2181 --create --partitions 32 --replication-factor 1 --topic kaffe-test
```
### Running
```bash
# unit tests
mix test
# end to end test
mix test --only e2e
```
## Copyright and License
Copyright (c) 2017 Spreedly, Inc.
This software is released under the [MIT License](./LICENSE.md).