README.md

Cafex
=====

Cafex is a pure Elixir implementation of [Kafka][kafka] client with [ZooKeeper][zookeeper] and [Consul][consul.io] intergration.

Cafex support Kafka 0.8 and 0.9 group membership APIs.

Cafex provides all kafka APIs encapsulation, producer implementation and high-level consumer implementation.

## Producer

### Example

```elixir
iex> Application.start :cafex
iex> topic_name = "test_topic"
iex> brokers = [{"127.0.0.1", 9092}]
iex> {:ok, producer} = Cafex.start_producer topic_name, client_id: "myproducer",
                                                        brokers: brokers,
                                                        partitioner: MyPartitioner,
                                                        acks: 1,
                                                        batch_num: 100,
                                                        linger_ms: 10
iex> Cafex.produce producer, "message", key: "key"
iex> Cafex.async_produce producer, "message", key: "key"
```

### Producer options

#### `partitioner`

The partitioner for partitioning messages amongst sub-topics.
The default partitioner is `Cafex.Partitioner.Random`.

#### `client_id`

The client id is a user-specified string sent in each request to help trace
calls.  It should logically identify the application making the request.

Default `cafex_producer`.

#### `acks`

The number of acknowledgments the producer requires the leader to have received
before considering a request complete. This controls the durability of records
that are sent.

Default value is `1`.

#### `batch_num`

The number of messages to send in one batch when `linger_ms` is not zero.
The producer will wait until either this number of messages are ready to send.

#### `linger_ms`
This setting is the same as `linger.ms` config in the new official producer configs.
This setting defaults to 0 (i.e. no delay).

> NOTE: If `linger_ms` is set to `0`, the `batch_num` will not take effect.

## Consumer

### Example

```elixir
defmodule MyConsumer do
  use Cafex.Consumer

  def consume(msg, state) do
    # handle the msg
    {:ok, state}
  end
end

iex> Application.start :cafex
iex> topic_name = "test_topic"
iex> brokers = [{"127.0.0.1", 9092}]
iex> options = [client_id: "myconsumer",
                topic: topic_name,
                brokers: brokers,
                offset_storage: :kafka,
                group_manager: :kafka,
                lock: :consul,
                group_session_timeout: 7000, # ms
                auto_commit: true,
                auto_commit_interval: 500,   # ms
                auto_commit_max_buffers: 50,
                fetch_wait_time: 100,        # ms
                fetch_min_bytes: 32 * 1024,
                fetch_max_bytes: 64 * 1024,
                handler: {MyConsumer, []}]
iex> {:ok, consumer} = Cafex.start_consumer :myconsumer, options
```

The `options` argument of the function `start_consumer` can be put in the
`config/config.exs`:

```elixir
config :cafex, :myconsumer,
  client_id: "cafex",
  topic: "test_topic",
  brokers: [
    {"192.168.99.100", 9092},
    {"192.168.99.101", 9092}
  ],
  offset_storage: :kafka,
  group_manager: :kafka,
  lock: :consul,
  group_session_timeout: 7000, # ms
  auto_commit: true,
  auto_commit_interval: 500,   # ms
  auto_commit_max_buffers: 50,
  fetch_wait_time: 100,        # ms
  fetch_min_bytes: 32 * 1024,
  fetch_max_bytes: 64 * 1024,
  handler: {MyConsumer, []}
```

By default, cafex will use `:kafka` as the offset storage, use the new kafka
group membership API, which was added in the 0.9.x, as the group manager,
and use the `:consul` as the worker lock. Make suer your Kafka server is 0.9.x
or above.

But `:zookeeper` is another option for these. If you use zookeeper, the starting
options of `:erlzk` must be specified under the `:zookeeper` key:

```elixir
config :cafex, :myconsumer,
  client_id: "cafex",
  topic: "test_topic",
  brokers: [...],
  offset_storage: :zookeeper,
  group_manager: :zookeeper,
  lock: :zookeeper,
  zookeeper: [
    timeout: 5000,
    servers: [{"192.168.99.100", 2181}],
    chroot: "/cafex"
  ],
  ...
```

## TODO

* Support kafka 0.10.x.x
* Add tests

[kafka]: http://kafka.apache.org
[zookeeper]: http://zookeeper.apache.org
[consul.io]: https://consul.io