defmodule OffBroadwayKlife.Producer do
# NimbleOptions schema for the producer's options. It is validated in
# prepare_for_start/2 and rendered into the moduledoc via NimbleOptions.docs/1.
# `:client` and `:consumer_group` are both optional here because NimbleOptions
# cannot express "exactly one of"; resolve_cg!/1 enforces that pairing.
@producer_opts [
  client: [
    type: {:custom, __MODULE__, :validate_client, []},
    doc:
      "A module that `use`s `Klife.Client`. The producer starts the default consumer group module " <>
        "on this client (a single group membership) and drives it in manual mode. All " <>
        "`:client` pipelines in a node must use the same client. Exactly one of `:client` " <>
        "or `:consumer_group` must be set."
  ],
  consumer_group: [
    type: {:custom, __MODULE__, :validate_consumer_group, []},
    doc:
      "A module that `use`s `Klife.Consumer.ConsumerGroup`, as an alternative to " <>
        "`:client` for when you also want the group's lifecycle callbacks. Started " <>
        "once by the producer (a single group membership) and driven in manual mode. " <>
        "Exactly one of `:client` or `:consumer_group` must be set."
  ],
  group_name: [
    type: :string,
    required: true,
    doc: "The Kafka consumer group name."
  ],
  topics: [
    type: {:list, :keyword_list},
    required: true,
    doc:
      "List of `Klife.Consumer.ConsumerGroup.TopicConfig` keyword lists, e.g. " <>
        "`[[name: \"orders\"], [name: \"events\", fetch_max_bytes: 500_000]]`. " <>
        "Every topic is forced into `mode: :manual`."
  ],
  receive_interval: [
    type: :non_neg_integer,
    default: 1_000,
    doc: "Milliseconds to wait before polling Klife again after a poll returned no records."
  ],
  message_format: [
    type: {:in, [:klife, :broadway_kafka]},
    default: :klife,
    doc: "Shape of the emitted `Broadway.Message`s. See the \"Message format\" section below."
  ],
  # The options below are only validated loosely here and forwarded verbatim to
  # the consumer group's start_link/1 (see @consumer_group_passthrough).
  fetch_strategy: [
    type: :any,
    doc: "Forwarded to the consumer group. See `Klife.Consumer.ConsumerGroup`."
  ],
  committers_count: [
    type: :pos_integer,
    doc: "Forwarded to the consumer group. See `Klife.Consumer.ConsumerGroup`."
  ],
  default_topic_config: [
    type: :keyword_list,
    doc: "Forwarded to the consumer group. See `Klife.Consumer.ConsumerGroup`."
  ],
  instance_id: [
    type: :string,
    doc:
      "Forwarded to the consumer group (static membership). See `Klife.Consumer.ConsumerGroup`."
  ],
  rebalance_timeout_ms: [
    type: :non_neg_integer,
    doc: "Forwarded to the consumer group. See `Klife.Consumer.ConsumerGroup`."
  ]
]
@moduledoc """
A Broadway producer that consumes from Kafka through
[Klife](https://hexdocs.pm/klife)'s consumer group *manual mode*.
Klife runs the full consumer-group machinery (heartbeats, rebalances, fetching and
buffering), while this producer drives delivery: it `pull`s buffered batches on demand,
turns each `Klife.Record` into a `Broadway.Message`, and `commit`s offsets back as
Broadway acknowledges them.
## Usage
Define a `Klife.Client` (see Klife's docs for client configuration) and make
sure it is started in your supervision tree *before* the pipeline:
```elixir
defmodule MyApp.KafkaClient do
  use Klife.Client, otp_app: :my_app
end
```
Then start Broadway with this producer, pointing it at the client:
```elixir
defmodule MyApp.Pipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {OffBroadwayKlife.Producer,
           client: MyApp.KafkaClient,
           group_name: "my-broadway-group",
           topics: [[name: "orders"], [name: "events"]],
           receive_interval: 500},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    IO.inspect(message.data)
    message
  end
end
```
The producer starts its built-in consumer group on the client and supervises it as part
of the pipeline, so there is no consumer group to define or add to your supervision tree.
The built-in group is bound to the client on first start, which means all `:client`
pipelines in a node must use the same Klife client.
### Using a consumer group module
To run Klife's consumer lifecycle callbacks (`handle_consumer_start/3` and
`handle_consumer_stop/4`) alongside Broadway — or to consume through more
than one Klife client — define a consumer group module and pass it as
`:consumer_group` instead of `:client` (the two options are mutually
exclusive):
```elixir
defmodule MyApp.KafkaConsumerGroup do
  use Klife.Consumer.ConsumerGroup, client: MyApp.KafkaClient

  @impl true
  def handle_consumer_start(_topic, _partition, _group_name) do
    # e.g. emit telemetry
    :ok
  end
end
```
Then point the producer at it:
```elixir
producer: [
  module:
    {OffBroadwayKlife.Producer,
     consumer_group: MyApp.KafkaConsumerGroup,
     group_name: "my-broadway-group",
     topics: [[name: "orders"]]}
]
```
Do not implement `handle_record_batch/4`: Broadway drives fetching and
committing through Klife's manual mode, so that callback never runs. The
producer starts the group in either case — do not add it to your supervision
tree.
## Options
#{NimbleOptions.docs(@producer_opts)}
## Message format
`:message_format` controls the shape of each `Broadway.Message`:
- `:klife` (default) - `message.data` is the full `Klife.Record` struct, the
same type used across Klife's produce and fetch APIs. It carries everything
(value, key, headers as maps, `timestamp`, `consumer_attempts`,
`batch_attributes`, ...), and `message.metadata` is empty. Prefer this for
new pipelines: it is lossless and consistent with the rest of Klife.
  ```elixir
  def handle_message(_, %{data: %Klife.Record{value: value}} = msg, _) do
    IO.inspect(value)
    msg
  end
  ```
- `:broadway_kafka` - `message.data` is the raw value and `message.metadata`
mirrors [broadway_kafka](https://hexdocs.pm/broadway_kafka): `%{topic,
partition, offset, key, ts, headers}` with headers as `{key, value}` tuples.
Use this to drop `OffBroadwayKlife.Producer` into an existing broadway_kafka
pipeline without changing `handle_message/3`.
Either way, routing, batching, acknowledgement and offset commits are
identical — only the user-facing message shape changes.
## Producer concurrency
The Klife consumer group is started once and keeps a single membership no
matter how many producers run. You may set producer `concurrency > 1`: each
assigned `{topic, partition}` is claimed by exactly one producer via a stable
hash, so producers share the pull/commit load with no overlap, without adding
group members and without a coordinating process.
For maximum concurrency, raise `concurrency` up to the number of partitions you
expect this group member to be assigned; any producers beyond that stay idle.
Because partitions are spread by hashing rather than round-robin, two partitions
can land on the same producer, so some producers may also be idle below that limit.
## Ordering
Kafka guarantees ordering per topic-partition. The connector preserves it end
to end: each partition is pulled by exactly one producer, which emits its
records in offset order, and it sets Broadway's `:partition_by` so that every
record of a given `{topic, partition}` is always routed to the same processor
(and batcher) stage. Records of different partitions still process
concurrently.
When you use a batcher, each batch also holds a single partition's records:
the connector defaults `:batch_key` to `{topic, partition}`, so a
`handle_batch/4` call maps to one partition's contiguous offset range.
You may override `:batch_key` (or `:batcher`) in `handle_message/3`:
- *Coarsening* it (e.g. to the topic, or leaving it `:default`) packs records
from several partitions into one `handle_batch` call, giving fuller batches
and fewer round-trips when a node owns many low-volume partitions.
Per-partition ordering is still preserved — the batch just interleaves
partitions, so group by `partition` inside `handle_batch` if your logic
needs to.
- *Refining* it (a sub-partition key) or routing to multiple `:batcher`s
re-splits a partition across independently-flushing batches, which **gives
up strict per-partition ordering**. Use it only when per-key (not
per-partition) ordering is enough, or to fan message types out to different
sinks (e.g. a dead-letter batcher).
Offset commits stay correct in every case regardless of how records are batched.
Because the connector manages `:partition_by`, you must not set it yourself —
doing so raises. Scale out across partitions with processor/batcher
concurrency.
## Delivery semantics
Klife provides at-least-once delivery and this producer preserves it: an
offset is only committed once it and every lower delivered offset on the
same partition have been acknowledged by Broadway.
Because Kafka tracks a single committed offset per partition, a failed
message cannot be skipped while committing past it. Both successful and
failed messages therefore advance the offset; handle failures explicitly via
`c:Broadway.handle_failed/2` (for example, by producing to a dead-letter
topic) rather than relying on them blocking the partition.
"""
use GenStage
require Logger
alias Broadway.Message
alias OffBroadwayKlife.OffsetTracker
@behaviour Broadway.Producer
@behaviour Broadway.Acknowledger
# Validated producer options that are handed unchanged to the consumer group's
# start_link/1; Klife performs its own full validation of them there.
@consumer_group_passthrough ~w(
  instance_id
  rebalance_timeout_ms
  fetch_strategy
  committers_count
  default_topic_config
)a
@impl Broadway.Producer
# Validates the producer options and builds the child spec that starts the
# consumer group (in manual mode) next to the pipeline. It then rewrites the
# Broadway options so each producer receives the resolved settings and every
# stage gets this module's :partition_by.
def prepare_for_start(_module, broadway_opts) do
  producer_spec = broadway_opts[:producer]
  {producer_module, raw_opts} = producer_spec[:module]

  validated = NimbleOptions.validate!(raw_opts, @producer_opts)
  group_name = validated[:group_name]
  {cg_mod, client_args} = resolve_cg!(validated)

  start_args =
    Enum.concat([
      [group_name: group_name, topics: force_manual_mode(validated[:topics])],
      client_args,
      Keyword.take(validated, @consumer_group_passthrough)
    ])

  cg_child = %{
    id: {cg_mod, group_name},
    start: {cg_mod, :start_link, [start_args]},
    restart: :permanent,
    type: :worker
  }

  # init/1 needs the producer pool size (to claim partitions via owns?/3) and
  # the resolved consumer group module, on top of the defaults-applied options.
  init_opts =
    validated
    |> Keyword.put(:producer_count, producer_spec[:concurrency] || 1)
    |> Keyword.put(:consumer_group, cg_mod)

  rewritten_producer = Keyword.put(producer_spec, :module, {producer_module, init_opts})

  updated_opts =
    broadway_opts
    |> Keyword.put(:producer, rewritten_producer)
    |> put_partition_by()

  {[cg_child], updated_opts}
end
@impl GenStage
# Builds the producer state from the options assembled in prepare_for_start/2
# plus Broadway's per-producer context found under :broadway.
def init(opts) do
  broadway = Keyword.fetch!(opts, :broadway)
  required = fn key -> Keyword.fetch!(opts, key) end

  state = %{
    consumer_group: required.(:consumer_group),
    group_name: required.(:group_name),
    receive_interval: required.(:receive_interval),
    message_format: required.(:message_format),
    producer_index: Keyword.fetch!(broadway, :index),
    producer_count: required.(:producer_count),
    demand: 0,
    receive_timer: nil,
    # ack/3 sends processed offsets back to this process.
    ack_ref: self(),
    offset_tracker: OffsetTracker.new()
  }

  {:producer, state}
end
@impl GenStage
# Adds the new demand to what is still pending and tries to satisfy it now.
def handle_demand(incoming_demand, state) do
  state
  |> Map.update!(:demand, &(&1 + incoming_demand))
  |> handle_receive_messages()
end
@impl GenStage
# A :receive_messages with no poll recorded as scheduled is ignored.
def handle_info(:receive_messages, %{receive_timer: nil} = state), do: {:noreply, [], state}

# The scheduled poll fired: clear the timer and try to serve pending demand.
def handle_info(:receive_messages, state),
  do: handle_receive_messages(%{state | receive_timer: nil})

# Acknowledgements from ack/3: mark the offsets done and commit whatever
# OffsetTracker reports as now committable.
def handle_info({__MODULE__, :processed, tp_offsets}, state) do
  {tracker, commits} = OffsetTracker.done(state.offset_tracker, tp_offsets)

  for {{topic, partition}, offset} <- commits do
    commit(state, topic, partition, offset)
  end

  {:noreply, [], %{state | offset_tracker: tracker}}
end

def handle_info(_unexpected, state), do: {:noreply, [], state}
@impl Broadway.Acknowledger
# Forwards the {topic, partition} and offset of every acknowledged message to
# the producer process identified by ack_ref.
def ack(ack_ref, successful, failed) do
  # Failed messages advance the partition offset just like successful ones:
  # Kafka keeps one committed offset per partition, so a failure cannot be
  # skipped over (see the moduledoc). Failures surface via handle_failed/2.
  tp_offsets =
    [successful, failed]
    |> Enum.concat()
    |> Enum.map(&message_tp_offset/1)

  send(ack_ref, {__MODULE__, :processed, tp_offsets})
  :ok
end
## Internal
# Pulls from Klife only when there is outstanding demand and no poll is
# already scheduled; otherwise emits nothing.
defp handle_receive_messages(%{receive_timer: timer, demand: demand} = state)
     when is_nil(timer) and demand > 0 do
  {events, new_state} = receive_messages_from_klife(state, demand)
  {:noreply, events, new_state}
end

defp handle_receive_messages(state), do: {:noreply, [], state}
# Runs one pull round over this producer's partitions and decides when to poll
# next: after receive_interval if nothing came back, immediately if demand is
# still pending, or not at all once demand is fully met.
defp receive_messages_from_klife(state, total_demand) do
  # pull_round/5 walks the list in order and stops once demand is met, so a
  # fixed order would always favour the head partitions and could starve the
  # tail. Reshuffling every round keeps fetching fair.
  shuffled = Enum.shuffle(assigned_partitions(state))
  {messages, tracker} = pull_round(shuffled, total_demand, state, [], state.offset_tracker)
  remaining = max(total_demand - length(messages), 0)

  timer =
    cond do
      messages == [] -> schedule_receive_messages(state.receive_interval)
      remaining == 0 -> nil
      true -> schedule_receive_messages(0)
    end

  {messages, %{state | demand: remaining, offset_tracker: tracker, receive_timer: timer}}
end
# Pulls already-buffered records one partition at a time until `demand` is
# covered or the partitions run out. Returns `{messages, tracker}`, where every
# emitted offset has been registered in `tracker` as delivered.
#
# The pulls are sequential on purpose. `pull/3` is a local GenServer.call that
# returns already-buffered records (not a network fetch), so it's cheap, and we
# stop as soon as demand is met, so the number of calls scales with demand, not
# with the assigned partition count. Combined with the shuffle in
# receive_messages_from_klife/2, no partition starves. Parallelizing the pulls
# would not be worth the complexity hit.
#
# If a single producer pumping many partitions ever becomes the bottleneck, raise producer
# `concurrency`: the allocator shards partitions across producers, cutting each
# one's pull count and parallelizing across processes within GenStage's model.
#
# Messages are accumulated as a reversed list of per-partition chunks and
# concatenated once at the end. Appending with `++` on every pull re-copied the
# whole accumulator each time (quadratic in the number of pulls). Each chunk
# keeps the record order returned by pull/2, so per-partition offset order is
# unchanged. Callers pass `[]` as the initial accumulator.
defp pull_round(_partitions, demand, _state, chunks, tracker) when demand <= 0,
  do: {concat_chunks(chunks), tracker}

defp pull_round([], _demand, _state, chunks, tracker), do: {concat_chunks(chunks), tracker}

defp pull_round([tp | rest], demand, state, chunks, tracker) do
  case pull(state, tp) do
    {:ok, [_ | _] = records} ->
      chunk = Enum.map(records, &build_message(&1, state.ack_ref, state.message_format))
      tracker = OffsetTracker.delivered(tracker, tp, Enum.map(records, & &1.offset))
      pull_round(rest, demand - length(chunk), state, [chunk | chunks], tracker)

    # Empty buffers and failed pulls (pull/2 logs exits) are skipped.
    _empty_or_error ->
      pull_round(rest, demand, state, chunks, tracker)
  end
end

# Restores pull order and flattens the per-partition chunks into one list.
defp concat_chunks(chunks), do: chunks |> Enum.reverse() |> Enum.concat()
# Returns the {topic, partition} pairs the consumer group currently has
# assigned and that this producer owns (see owns?/3). Exits or errors from the
# consumer group are logged and treated as "no partitions" for this round.
defp assigned_partitions(state) do
  cg = state.consumer_group
  owned? = fn tp -> owns?(tp, state.producer_index, state.producer_count) end

  Enum.filter(cg.assigned_partitions(state.group_name), owned?)
catch
  kind, reason when kind in [:exit, :error] ->
    Logger.warning(
      "OffBroadwayKlife assigned_partitions failed for #{state.group_name}: #{inspect({kind, reason})}"
    )

    []
end
@doc false
# True when `topic_partition` hashes to this producer's index, so each
# partition is served by exactly one producer among `producer_count`.
def owns?(topic_partition, producer_index, producer_count),
  do: producer_index == :erlang.phash2(topic_partition, producer_count)
# Asks the consumer group for the buffered records of one partition. If the
# call exits, logs a warning and returns a value that pull_round/5 treats as
# "nothing to emit".
defp pull(state, {topic, partition}) do
  try do
    state.consumer_group.pull(state.group_name, topic, partition)
  catch
    :exit, reason ->
      Logger.warning("OffBroadwayKlife pull exited for #{topic}:#{partition}: #{inspect(reason)}")
      {:ok, :empty}
  end
end
# Commits `offset` for one partition through the consumer group. A commit
# exit is logged and swallowed so the producer keeps running.
defp commit(state, topic, partition, offset) do
  try do
    state.consumer_group.commit(state.group_name, topic, partition, offset)
  catch
    :exit, reason ->
      Logger.warning(
        "OffBroadwayKlife commit exited for #{topic}:#{partition}@#{offset}: #{inspect(reason)}"
      )

      :ok
  end
end
# Wraps a Klife record into a Broadway message. The acknowledger carries the
# record's {topic, partition, offset} and the batch key groups by partition,
# whatever the format; only data/metadata depend on `format`.
defp build_message(%Klife.Record{} = record, ack_ref, format)
     when format in [:klife, :broadway_kafka] do
  {data, metadata} = message_payload(record, format)

  %Message{
    data: data,
    metadata: metadata,
    acknowledger: {__MODULE__, ack_ref, {record.topic, record.partition, record.offset}},
    batch_key: {record.topic, record.partition}
  }
end

# :klife hands the whole record to the user and leaves metadata empty.
defp message_payload(record, :klife), do: {record, %{}}

# :broadway_kafka emits the raw value as data plus broadway_kafka-style metadata.
defp message_payload(record, :broadway_kafka) do
  metadata = %{
    topic: record.topic,
    partition: record.partition,
    offset: record.offset,
    key: record.key,
    ts: record.timestamp,
    headers: encode_headers(record.headers)
  }

  {record.value, metadata}
end
# Converts Klife header maps into broadway_kafka-style {key, value} tuples;
# a nil header list becomes [].
defp encode_headers(nil), do: []
defp encode_headers(headers), do: Enum.map(headers, &header_to_tuple/1)

defp header_to_tuple(%{key: key, value: value}), do: {key, value}
# Recovers {{topic, partition}, offset} from the acknowledger set by build_message/3.
defp message_tp_offset(%Message{acknowledger: {_, _, {topic, partition, offset}}}),
  do: {{topic, partition}, offset}
# Sends :receive_messages to this process after `interval` ms; returns the timer ref.
defp schedule_receive_messages(interval),
  do: Process.send_after(self(), :receive_messages, interval)
# Overrides `:mode` to `:manual` on every topic config so Klife only fetches
# and buffers, and never invokes the record-batch callback itself.
defp force_manual_mode(topics) when is_list(topics) do
  for topic_config <- topics, do: Keyword.put(topic_config, :mode, :manual)
end
# Installs partition_by/1 on every processor and (if configured) batcher
# stage. Raises if the user already set :partition_by anywhere.
defp put_partition_by(broadway_opts) do
  ensure_partition_by_unset!(broadway_opts)
  partitioner = &__MODULE__.partition_by/1

  with_processors =
    Keyword.update!(broadway_opts, :processors, &put_stage_partition_by(&1, partitioner))

  case Keyword.fetch(with_processors, :batchers) do
    :error ->
      with_processors

    {:ok, batchers} ->
      Keyword.put(with_processors, :batchers, put_stage_partition_by(batchers, partitioner))
  end
end

# The connector owns :partition_by to keep per-partition ordering, so any user
# setting (top-level or per stage) is rejected.
defp ensure_partition_by_unset!(broadway_opts) do
  if partition_by_set?(broadway_opts) do
    raise ArgumentError,
          "OffBroadwayKlife.Producer manages :partition_by to preserve Kafka per-partition " <>
            "ordering and it must not be set manually. Remove the :partition_by option."
  end

  :ok
end
# Sets `:partition_by` to `fun` in each stage's config, keeping stage order.
defp put_stage_partition_by(stages, fun) do
  Enum.map(stages, fn {stage_name, stage_config} ->
    {stage_name, Keyword.put(stage_config, :partition_by, fun)}
  end)
end
# True when :partition_by appears at the top level or in any processor or
# batcher stage config.
defp partition_by_set?(broadway_opts) do
  Keyword.has_key?(broadway_opts, :partition_by) or
    Enum.any?([:processors, :batchers], &stage_has_partition_by?(broadway_opts[&1]))
end

# An absent stage group (nil) never sets :partition_by.
defp stage_has_partition_by?(stages) when is_nil(stages), do: false

defp stage_has_partition_by?(stages),
  do: Enum.any?(stages, fn {_stage, config} -> Keyword.has_key?(config, :partition_by) end)
@doc false
# Broadway partitioning function installed on every processor and batcher
# stage by put_partition_by/1. It maps all records of one Kafka
# {topic, partition} to the same integer, so that partition's records always
# reach the same stage and per-partition order is preserved.
#
# The key is read from the acknowledger that build_message/3 sets on every
# message. Batcher partitioning runs *after* handle_message/3, where users
# commonly replace `data` (e.g. with a decoded payload via
# Broadway.Message.put_data/2) or `metadata`. Keying only off those fields
# raised FunctionClauseError for such pipelines. The data/metadata clauses stay
# as fallbacks for messages whose acknowledger was swapped out (e.g. by
# Broadway.Message.ack_immediately/1).
def partition_by(%Message{acknowledger: {__MODULE__, _ack_ref, {topic, partition, _offset}}}),
  do: :erlang.phash2({topic, partition})

def partition_by(%Message{data: %Klife.Record{topic: topic, partition: partition}}),
  do: :erlang.phash2({topic, partition})

def partition_by(%Message{metadata: %{topic: topic, partition: partition}}),
  do: :erlang.phash2({topic, partition})
# NimbleOptions cannot express "exactly one of", so both options are optional
# in the schema and the pairing is enforced here. Returns the consumer group
# module plus the extra start_link/1 args it needs:
#
#   * `:client` - the stock `OffBroadwayKlife.ConsumerGroup` is used and gets
#     the client as a start arg; it is bound to that client on its first start
#     (see its moduledoc).
#   * `:consumer_group` - the user's module already carries its client in its
#     `use` options, so no extra args are added.
#
# Raises ArgumentError when neither or both options are set.
defp resolve_cg!(opts) do
  client = Keyword.fetch(opts, :client)
  group_mod = Keyword.fetch(opts, :consumer_group)

  case {client, group_mod} do
    {{:ok, client_mod}, :error} ->
      {OffBroadwayKlife.ConsumerGroup, [client: client_mod]}

    {:error, {:ok, mod}} ->
      {mod, []}

    {{:ok, _}, {:ok, _}} ->
      raise ArgumentError,
            ":client and :consumer_group are mutually exclusive in OffBroadwayKlife.Producer " <>
              "options; set only one of them"

    {:error, :error} ->
      raise ArgumentError,
            "one of :client or :consumer_group is required in OffBroadwayKlife.Producer options"
  end
end
@doc false
# NimbleOptions custom validator for `:client`: accepts a loaded module that
# exports get_default_fetcher/0 (used here as the marker of a Klife client).
def validate_client(mod) when is_atom(mod) and not is_nil(mod) do
  klife_client? = Code.ensure_loaded?(mod) and function_exported?(mod, :get_default_fetcher, 0)

  case klife_client? do
    true ->
      {:ok, mod}

    false ->
      {:error,
       "#{inspect(mod)} is not a Klife client; define it with " <>
         "`use Klife.Client, otp_app: :my_app`"}
  end
end

def validate_client(other),
  do: {:error, "expected a Klife client module, got: #{inspect(other)}"}
@doc false
# NimbleOptions custom validator for `:consumer_group`: accepts a loaded module
# exporting the manual-mode API this producer calls (assigned_partitions/1,
# pull/3 and commit/4).
def validate_consumer_group(mod) when is_atom(mod) do
  required_exports = [assigned_partitions: 1, pull: 3, commit: 4]

  consumer_group? =
    Code.ensure_loaded?(mod) and
      Enum.all?(required_exports, fn {fun, arity} -> function_exported?(mod, fun, arity) end)

  case consumer_group? do
    true ->
      {:ok, mod}

    false ->
      {:error,
       "#{inspect(mod)} is not a Klife consumer group; define it with " <>
         "`use Klife.Consumer.ConsumerGroup, client: MyClient`"}
  end
end

def validate_consumer_group(other),
  do: {:error, "expected a consumer group module, got: #{inspect(other)}"}
end