README.md

KafkaEx
========

[Apache Kafka](http://kafka.apache.org/) client for Elixir/Erlang.

Usage
-----

Add KafkaEx to your mix.exs:

```elixir
{:kafka_ex, "~> 0.0.1"}
```

And run:

```
mix deps.get
```

### Configuration

In your config/config.exs add the list of kafka brokers as below:
```elixir
config KafkaEx,
  brokers: [{HOST, PORT}]
```

Alternatively from iex:
```elixir
iex> Application.put_env(KafkaEx, :brokers, [{"localhost", 9092}, {"localhost", 9093}])
:ok
```

### Create KafkaEx worker
```elixir
iex> KafkaEx.create_worker(:pr) # where :pr is the process name of the created worker
{:ok, #PID<0.171.0>}
```

### Retrieve kafka metadata
For all metadata

```elixir
iex> KafkaEx.metadata
%{brokers: %{1 => {"localhost", 9092}},
  topics: %{"foo" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}},
      "bar" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}}}}
```

For a specific topic

```elixir
iex> KafkaEx.metadata(topic: "foo")
%{brokers: %{1 => {"localhost", 9092}},
  topics: %{"foo" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}}}}
```

### Retrieve the latest offset

```elixir
iex> KafkaEx.latest_offset("foo", 0) # where 0 is the partition
{:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [16]}}}}
```

### Retrieve the earliest offset

```elixir
iex> KafkaEx.earliest_offset("foo", 0) # where 0 is the partition
{:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [0]}}}}
```

### Fetch kafka logs

```elixir
iex> KafkaEx.fetch("foo", 0, 5) # where 0 is the partition and 5 is the offset we want to start fetching from
{:ok,
 %{"foo" => %{0 => %{error_code: 0, hw_mark_offset: 133,
       message_set: [%{attributes: 0, crc: 4264455069, key: nil, offset: 5,
          value: "hey"},
        %{attributes: 0, crc: 4264455069, key: nil, offset: 6, value: "hey"},
...]}}}}
```

### Produce kafka logs

```elixir
iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is the message
:ok
```

### Stream kafka logs

```elixir
iex> KafkaEx.create_worker([{"localhost", 9092}], :stream)
{:ok, #PID<0.196.0>}
iex> KafkaEx.produce("foo", 0, "hey", :stream)
:ok
iex> KafkaEx.produce("foo", 0, "hi", :stream)
:ok
iex> KafkaEx.stream("foo", 0) |> iex> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
 %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
```


### Test

#### Unit tests
```
mix test --no-start
```

#### Integration tests
Add the broker config to `config/config.exs` and run:
```
mix test --only integration
```

### Static analysis

```
mix dialyze --unmatched-returns --error-handling --race-conditions --underspecs
```

### Contributing
Please see [CONTRIBUTING.md](CONTRIBUTING.md)