# KafkaExHelpers
Helper functions for setting up Kafka handlers that pass the messages from a topic to specific functions.
## Usage
Set up a topic to consume from:
```elixir
defmodule ConsumeSource do
  alias KafkaExHelpers, as: Helper

  @worker_id :worker_id_atom
  @topic_model "Model"
  @topic_action "EventName"
  @topic_version "V1"
  # The topic name is the three parts joined with dots: "Model.EventName.V1"
  @topic Enum.join([@topic_model, @topic_action, @topic_version], ".")
  @partitions [0]

  @spec kafka_meta() :: %{
          worker_id: atom(),
          partitions: [integer()],
          topic: String.t()
        }
  @doc """
  Returns metadata describing this module as a Kafka worker process.
  """
  def kafka_meta do
    %{worker_id: @worker_id, partitions: @partitions, topic: @topic}
  end
end
```
Consumers and producers should be spawned automatically for each partition listed in `@partitions`.
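The `kafka_meta/0` contract used by the topic module above can be made explicit with a behaviour. The following is a minimal sketch and not part of `kafka_ex_helpers`: the `KafkaMetaSketch` behaviour and the `DemoSource` module are hypothetical names introduced only for illustration.

```elixir
defmodule KafkaMetaSketch do
  # Hypothetical behaviour (not provided by kafka_ex_helpers) describing
  # the metadata map a topic module is expected to return.
  @callback kafka_meta() :: %{
              worker_id: atom(),
              partitions: [non_neg_integer()],
              topic: String.t()
            }
end

defmodule DemoSource do
  @behaviour KafkaMetaSketch

  # Same naming scheme as above: model, action and version joined with dots.
  @topic Enum.join(["Model", "EventName", "V1"], ".")

  @impl true
  def kafka_meta do
    %{worker_id: :demo_worker, partitions: [0], topic: @topic}
  end
end
```

With this in place, the compiler warns if a topic module declares the behaviour but forgets to define `kafka_meta/0`. Here `DemoSource.kafka_meta().topic` evaluates to `"Model.EventName.V1"`.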
Define a message handler (do some work from a set of messages):
```elixir
defmodule MessageHandler do
  # Logger.info/1 is a macro, so Logger must be required first
  require Logger

  def process_batch(message_batch, _options) do
    Logger.info("Processing #{length(message_batch)} messages")

    message_batch
    |> Flow.from_enumerable()
    # Poison.decode/1 returns {:ok, payload} on success, the shape do_work/1 matches
    |> Flow.map(&Poison.decode/1)
    |> Flow.map(&do_work/1)
    |> Flow.run()

    :created
  end

  def do_work({:ok, payload}) do
    {:noop, payload}
  end
end
```
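The handler above only matches successfully decoded messages, so a single malformed message would raise a `FunctionClauseError`. As a sketch, assuming the decoder returns `{:ok, payload}` or `{:error, reason}` tuples, a second clause can skip bad messages instead. `SafeHandlerSketch` and the `:skipped` return value are hypothetical and chosen only for illustration.

```elixir
defmodule SafeHandlerSketch do
  require Logger

  # Successfully decoded message: same shape as do_work/1 above.
  def do_work({:ok, payload}) do
    {:noop, payload}
  end

  # Decode failure: log and skip instead of crashing the whole batch.
  def do_work({:error, reason}) do
    Logger.error("Skipping undecodable message: #{inspect(reason)}")
    {:skipped, reason}
  end
end
```

Whether skipping, retrying, or crashing is the right response depends on the topic; this sketch only shows where that decision would live.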
Set up a worker to process a collection of handlers for a given project:
```elixir
require KafkaExHelpers
require KafkaEx

defmodule KafkaHandlers.Consumers do
  use GenServer

  @moduledoc """
  Kafka workers: creates workers, identified by atoms, for handling different
  controller streams.

  `@topics` is a list of topic configurations. Each `:consume` module must
  implement `kafka_meta/0`.
  """

  alias KafkaExHelpers, as: Helper

  # Each entry pairs a topic module with the function that handles its batches
  @topics [
    %{
      consume: ConsumeSource,
      handler: &MessageHandler.process_batch/2,
      batch_size: 10,
      options: %{}
    }
  ]

  def start_link do
    GenServer.start_link(__MODULE__, [], name: :kafka_handlers_consumers)
  end

  def init(state) do
    # Start the consumers for every configured topic when the worker boots
    Helper.start_consumers(@topics)
    {:ok, state}
  end

  def handle_info(:work, state) do
    {:noreply, state}
  end
end
```
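Because `start_link/0` takes no arguments, the worker can be placed under a supervisor with an explicit child specification. The sketch below uses a stand-in `DemoConsumers` GenServer (a hypothetical module, so that the example runs without Kafka); in a real project the `start` tuple would presumably point at `KafkaHandlers.Consumers` instead.

```elixir
defmodule DemoConsumers do
  # Stand-in for KafkaHandlers.Consumers so this sketch runs without Kafka.
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, [], name: :demo_consumers)
  end

  @impl true
  def init(state) do
    # The real worker would call Helper.start_consumers(@topics) here.
    {:ok, state}
  end
end

children = [
  # Explicit child spec: start_link/0 is invoked with an empty argument list.
  %{id: DemoConsumers, start: {DemoConsumers, :start_link, []}}
]

{:ok, supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
```

With the `:one_for_one` strategy, the supervisor restarts only this worker if it crashes, which in the real module would re-run `init/1` and start the consumers again.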
### Produce Message
Define a topic:
```elixir
defmodule MockTopic do
  def kafka_meta do
    # The topic is a double-quoted string (binary), not a single-quoted charlist
    %{worker_id: :mock_handler, partitions: [0], topic: "Mock.V1"}
  end
end
```
Start the producer workers for the topic using the defined `worker_id`:
```elixir
KafkaExHelpers.create_workers([MockTopic])
```
Produce a payload using the previously created worker:
```elixir
payload = %{hello: "world", id: "d1d19864-ab2e-4b65-a6b2-0368618bb706"}

KafkaExHelpers.encode_and_publish_message(
  payload,
  payload.id,
  MockTopic.kafka_meta()
)
```
## Installation
If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `kafka_ex_helpers` to your list of dependencies in `mix.exs`:
```elixir
def deps do
  [{:kafka_ex_helpers, "~> 0.1.0"}]
end
```
Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/kafka_ex_helpers](https://hexdocs.pm/kafka_ex_helpers).