README.md

# KafkaExHelpers

Helper functions for using kafka handlers to pass topic messages to specific functions.

## Usage

Set up a topic to consume from:
```elixir
defmodule ConsumeSource do
  alias KafkaExHelpers, as: Helper

  @worker_id :worker_id_atom
  @topic_model "Model"
  @topic_action "EventName"
  @topic_version "V1"
  @topic Enum.join([@topic_model, @topic_action, @topic_version], ".")
  @partitions [0]

  @spec kafka_meta() :: %{
    worker_id: atom(), partitions: Integer.t, topic: String.t
  }
  @doc """
    This function is used to inspect metadata about this defined module as a
    kafka worker process
  """
  def kafka_meta do
    %{worker_id: @worker_id, partitions: @partitions, topic: @topic}
  end
end
```

Consumers/Producers should be auto-spawned for each partition defined

Define a message handler (do some work from a set of messages):
```elixir
defmodule MessageHandler do

  def process_batch(message_batch, _options) do
    Logger.info("Processing #{inspect(length(message_batch))} messages")
    message_batch
      |> Flow.from_enumerable()
      |> Flow.map(&Poison.decode!/1)
      |> Flow.map(&do_work/1)
      |> Flow.run()
    :created
  end

  def do_work({:ok, payload}) do
    {:noop, payload}
  end
end

```

Set up a worker to process a collection of handlers for a given project:
```elixir
require KafkaExHelpers
require KafkaEx

defmodule KafkaHandlers.Consumers do
  use GenServer
  @moduledoc """
  Kafka Workers - create workers with atoms for handling different controller
  streams
  `@workers` list of worker modules that contain interface kafka_meta
  """

  alias KafkaExHelpers, as: Helper

  @topics [
    %{
      consume: ConsumeSource,
      handler: &MessageHandler.process_batch/2,
      batch_size: 10,
      options: %{}
    }
  ]

  def start_link do
    GenServer.start_link(__MODULE__, [], name: :kafka_handlers_consumers)
  end

  def init(state) do
    Helper.start_consumers(@topics)
    {:ok, state}
  end

  def handle_info(:work, state) do
    {:noreply, state}
  end
end
```

### Produce Message


Define a  topic
```elixir
  defmodule MockTopic do

    def kafka_meta do
      %{worker_id: :mock_handler, partitions: [0], topic: 'Mock.V1'}
    end

  end
```
Start the producer workers for the topic using the defined worker_id
```elixir
  KafkaExHelpers.create_workers([MockTopic])
```

Produce a payload using the previously created woker
```elixir
  payload = %{hello: "world", id: "d1d19864-ab2e-4b65-a6b2-0368618bb706"}

  KafkaExHelpers.encode_and_publish_message(
    payload,
    payload.id,
    MockTopic.kafka_meta)
```

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `kafka_ex_helpers` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [{:kafka_ex_helpers, "~> 0.1.0"}]
end
```

Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/kafka_ex_helpers](https://hexdocs.pm/kafka_ex_helpers).