
[Apache Kafka]( client for Elixir/Erlang.


Add KafkaEx to your mix.exs:

{:kafka_ex, "~> 0.0.1"}

And run:

mix deps.get

### Configuration

In your config/config.exs add the list of kafka brokers as below:
config KafkaEx,
  brokers: [{HOST, PORT}]

Alternatively from iex:
iex> Application.put_env(KafkaEx, :brokers, [{"localhost", 9092}, {"localhost", 9093}])

### Create KafkaEx worker
iex> KafkaEx.create_worker(:pr) # where :pr is the process name of the created worker
{:ok, #PID<0.171.0>}

### Retrieve kafka metadata
For all metadata

iex> KafkaEx.metadata
%{brokers: %{1 => {"localhost", 9092}},
  topics: %{"foo" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}},
      "bar" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}}}}

For a specific topic

iex> KafkaEx.metadata(topic: "foo")
%{brokers: %{1 => {"localhost", 9092}},
  topics: %{"foo" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}}}}

### Retrieve the latest offset

iex> KafkaEx.latest_offset("foo", 0) # where 0 is the partition
{:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [16]}}}}

### Retrieve the earliest offset

iex> KafkaEx.earliest_offset("foo", 0) # where 0 is the partition
{:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [0]}}}}

### Fetch kafka logs

iex> KafkaEx.fetch("foo", 0, 5) # where 0 is the partition and 5 is the offset we want to start fetching from
 %{"foo" => %{0 => %{error_code: 0, hw_mark_offset: 133,
       message_set: [%{attributes: 0, crc: 4264455069, key: nil, offset: 5,
          value: "hey"},
        %{attributes: 0, crc: 4264455069, key: nil, offset: 6, value: "hey"},

### Produce kafka logs

iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is the message

### Stream kafka logs

iex> KafkaEx.create_worker([{"localhost", 9092}], :stream)
{:ok, #PID<0.196.0>}
iex> KafkaEx.produce("foo", 0, "hey", :stream)
iex> KafkaEx.produce("foo", 0, "hi", :stream)
iex>"foo", 0) |> iex> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
 %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]

### Test

#### Unit tests
mix test --no-start

#### Integration tests
Add the broker config to `config/config.exs` and run:
mix test --only integration

### Static analysis

mix dialyze --unmatched-returns --error-handling --race-conditions --underspecs

### Contributing
Please see [](