README.md

# KafkaExHelpers

Helper functions for using kafka handlers to pass topic messages to specific functions.

## Usage

Set up a topic to consume from:
```elixir
defmodule ConsumeSource do
  alias KafkaExHelpers, as: Helper

  @worker_id :worker_id_atom
  @topic_model "Model"
  @topic_action "EventName"
  @topic_version "V1"
  @topic Enum.join([@topic_model, @topic_action, @topic_version], ".")
  @partitions [0]

  @spec kafka_meta() :: %{
    worker_id: atom(), partitions: Integer.t, topic: String.t
  }
  @doc """
    This function is used to inspect metadata about this defined module as a
    kafka worker process
  """
  def kafka_meta do
    %{worker_id: @worker_id, partitions: @partitions, topic: @topic}
  end
end
```

Consumers/Producers should be auto-spawned for each partition defined

Define a message handler (do some work from a set of messages):
```elixir
defmodule MessageHandler do

  def process_batch(message_batch, _options) do
    Logger.info("Processing #{inspect(length(message_batch))} messages")
    message_batch
      |> Flow.from_enumerable()
      |> Flow.map(&Poison.decode!/1)
      |> Flow.map(&do_work/1)
      |> Flow.run()
    :created
  end

  def do_work({:ok, payload}) do
    {:noop, payload}
  end
end

```

Set up a worker to process a collection of handlers for a given project:
```elixir
require KafkaExHelpers
require KafkaEx

defmodule KafkaHandlers.Consumers do
  use GenServer
  @moduledoc """
  Kafka Workers - create workers with atoms for handling different controller
  streams
  `@workers` list of worker modules that contain interface kafka_meta
  """

  alias KafkaExHelpers, as: Helper

  @topics [
    %{
      consume: ConsumeSource,
      handler: &MessageHandler.process_batch/2,
      batch_size: 10,
      options: %{}
    }
  ]

  def start_link do
    GenServer.start_link(__MODULE__, [], name: :kafka_handlers_consumers)
  end

  def init(state) do
    Helper.start_consumers(@topics)
    {:ok, state}
  end

  def handle_info(:work, state) do
    {:noreply, state}
  end
end
```

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `kafka_ex_helpers` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [{:kafka_ex_helpers, "~> 0.1.0"}]
end
```

Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/kafka_ex_helpers](https://hexdocs.pm/kafka_ex_helpers).