defmodule BroadwayKafka.Producer do
@moduledoc """
A Kafka connector for Broadway.
BroadwayKafka can subscribe as a consumer to one or more topics and process streams
of records within the same consumer group. Communication is done through Kafka's
[Consumer API](https://kafka.apache.org/documentation.html#consumerapi) using the
[:brod](https://github.com/klarna/brod/) client.
## Options
* `:hosts` - Required. A list of host and port tuples, or a single string of comma-separated
HOST:PORT pairs, to use for establishing the initial connection to Kafka,
e.g. `[localhost: 9092]`. Examples:
# Keyword
["kafka-vm1": 9092, "kafka-vm2": 9092, "kafka-vm3": 9092]
# List of tuples
[{"kafka-vm1", 9092}, {"kafka-vm2", 9092}, {"kafka-vm3", 9092}]
# String
"kafka-vm1:9092,kafka-vm2:9092,kafka-vm3:9092"
* `:group_id` - Required. A unique string that identifies the consumer group the producer
will belong to.
* `:topics` - Required. A list of topics that the producer will subscribe to.
* `:receive_interval` - Optional. The duration (in milliseconds) for which the producer
waits before making a request for more messages. Default is 2000 (2 seconds).
* `:offset_commit_on_ack` - Optional. Tells Broadway whether to send an offset commit
request after each acknowledgement. Default is `true`. Setting this value to `false` can
increase performance, since commit requests will then only follow the
`:offset_commit_interval_seconds` option. However, long commit intervals might lead to a
large number of duplicated records being processed after a server restart or connection
loss. If that's the case, make sure your logic is idempotent when consuming records to
avoid inconsistencies. Also, bear in mind that the negative performance impact might be
insignificant if you're using batchers, since only one commit request will be performed
per batch.
* `:offset_reset_policy` - Optional. Defines the offset to be used when there's no initial
offset in Kafka or if the current offset has expired. Possible values are `:earliest` or
`:latest`. Default is `:latest`.
* `:group_config` - Optional. A list of options used to configure the group
coordinator. See the ["Group config options"](#module-group-config-options) section below for a list of all available
options.
* `:fetch_config` - Optional. A list of options used when fetching messages. See the
["Fetch config options"](#module-fetch-config-options) section below for a list of all available options.
* `:client_config` - Optional. A list of options used when creating the client. See the
["Client config options"](#module-client-config-options) section below for a list of all available options.
## Group config options
The available options that will be passed to `:brod`'s group coordinator.
* `:offset_commit_interval_seconds` - Optional. The time interval between two
OffsetCommitRequest messages. Default is 5.
* `:rejoin_delay_seconds` - Optional. Delay in seconds before rejoining the group. Default is 1.
* `:session_timeout_seconds` - Optional. Time in seconds the group coordinator broker waits
before considering a member 'down' if no heartbeat or any other kind of request is received.
A group member may also consider the coordinator broker 'down' if no heartbeat response
has been received in the past N seconds. Default is 30 seconds.
* `:heartbeat_rate_seconds` - Optional. Time in seconds for member to 'ping' group coordinator.
Heartbeats are used to ensure that the consumer's session stays active and
to facilitate rebalancing when new consumers join or leave the group.
The value must be set lower than `:session_timeout_seconds`, typically equal to or lower than 1/3 of that value.
It can be adjusted even lower to control the expected time for normal rebalances. Default is 5 seconds.
* `:rebalance_timeout_seconds` - Optional. Time in seconds for each worker to join the group once a rebalance has begun.
If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures. Default is 30.
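A sketch of passing these options through `:group_config` (values here are illustrative,
not recommendations):

    group_config: [
      offset_commit_interval_seconds: 5,
      rejoin_delay_seconds: 2,
      session_timeout_seconds: 30,
      heartbeat_rate_seconds: 5
    ]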
## Fetch config options
The available options that will be internally passed to `:brod.fetch/5`.
* `:min_bytes` - Optional. The minimum amount of data to be fetched from the server.
If not enough data is available the request will wait for that much data to accumulate
before answering. Default is 1 byte. Setting this value greater than 1 can improve
server throughput a bit at the cost of additional latency.
* `:max_bytes` - Optional. The maximum amount of data to be fetched at a time from a single
partition. Default is 1048576 (1 MiB). Setting greater values can improve server
throughput at the cost of more memory consumption.
* `:max_wait_time` - Optional. Time in milliseconds. The maximum time the broker may spend
collecting `:min_bytes` of messages in a fetch response. Default is 1000 (1 second).
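For example, to trade a little extra latency for throughput, you might pass
(illustrative values, not recommendations):

    fetch_config: [
      min_bytes: 1_024,
      max_bytes: 10_485_760,
      max_wait_time: 2_000
    ]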
## Client config options
The available options that will be internally passed to `:brod.start_client/3`.
* `:client_id_prefix` - Optional. A string that will be used to build the client id passed to `:brod`. The example
value `client_id_prefix: :"\#{Node.self()} -"` would generate the following connection log from our integration
tests:
20:41:37.717 [info] :supervisor: {:local, :brod_sup}
:started: [
pid: #PID<0.286.0>,
id: :"nonode@nohost - Elixir.BroadwayKafka.ConsumerTest.MyBroadway.Broadway.Producer_0.Client",
mfargs: {:brod_client, :start_link,
[
[localhost: 9092],
:"nonode@nohost - Elixir.BroadwayKafka.ConsumerTest.MyBroadway.Broadway.Producer_0.Client",
[client_id_prefix: :"nonode@nohost - "]
]},
restart_type: {:permanent, 10},
shutdown: 5000,
child_type: :worker
]
* `:sasl` - Optional. A tuple of mechanism, username and password, where the mechanism can
be `:plain`, `:scram_sha_256` or `:scram_sha_512`. See `:brod`'s
[`Authentication Support`](https://github.com/klarna/brod#authentication-support) documentation
for more information. Default is no sasl options.
* `:ssl` - Optional. A boolean or a list of options to use when connecting via SSL/TLS. See the
[`tls_client_option`](http://erlang.org/doc/man/ssl.html#type-tls_client_option) documentation
for more information. Default is no ssl options.
* `:connect_timeout` - Optional. Time in milliseconds to be used as a timeout for `:brod`'s communication with Kafka.
Default is to use `:brod`'s default timeout which is currently 5 seconds.
* `:request_timeout` - Optional. Time in milliseconds to be used as a timeout for waiting response from Kafka.
Default is to use `:brod`'s default timeout which is currently 240 seconds.
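For instance, a client talking to a SASL/SSL-protected cluster might be configured as
follows (credentials and timeout values are placeholders):

    client_config: [
      ssl: true,
      sasl: {:plain, "username", "password"},
      connect_timeout: 10_000
    ]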
> **Note**: Currently, Broadway does not support all options provided by `:brod`. If you
have a scenario where you need any extra option that is not listed above, please open an
issue, so we can consider adding it.
## Example
Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwayKafka.Producer, [
hosts: [localhost: 9092],
group_id: "group_1",
topics: ["test"],
]},
concurrency: 1
],
processors: [
default: [
concurrency: 10
]
]
)
## Concurrency and partitioning
The concurrency model provided by Kafka is based on partitioning, i.e., the more partitions
you have, the more concurrency you get. However, in order to take advantage of this model
you need to set up the `:concurrency` options for your processors and batchers accordingly.
Having fewer processors than assigned topic/partitions will result in individual processors
handling more than one partition, decreasing the overall level of concurrency. Therefore, if
you want to always be able to process messages at maximum concurrency (assuming you have
enough resources to do it), you should increase the concurrency up front to make sure you
have enough processors to handle the extra messages received from newly assigned partitions.
> **Note**: Even if you don't plan to add more partitions to a Kafka topic, your pipeline can still
receive more assignments than planned. For instance, if another consumer crashes, the server
will reassign all of its topic/partitions to other available consumers, including any Broadway
producer subscribed to the same topic.
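For example, assuming a single producer subscribed to a topic with 12 partitions, sizing
the processors at the partition count (an illustrative choice, not a requirement) gives
each partition its own processor:

    processors: [
      default: [
        concurrency: 12
      ]
    ]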
## Handling failed messages
`BroadwayKafka` never stops the flow of the stream, i.e. it will **always ack** the messages,
even when they fail. Unlike queue-based connectors, you cannot mark a single message as failed;
in Kafka that's not possible due to its single offset per topic/partition ack strategy. If you
want to reprocess failed messages, you need to roll your own strategy. A possible way to do that
is to implement `c:Broadway.handle_failed/2` and send failed messages to a separate stream or
queue for later processing.
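As a sketch, assuming a hypothetical `MyApp.DeadLetters.publish/1` helper that forwards
data to a separate topic or queue:

    @impl true
    def handle_failed(messages, _context) do
      for message <- messages do
        # MyApp.DeadLetters.publish/1 is a placeholder for your own
        # dead-letter mechanism.
        MyApp.DeadLetters.publish(message.data)
      end

      messages
    end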
## Message metadata
When producing messages, the following information is set in each
[`Broadway.Message`](`t:Broadway.Message.t/0`)'s metadata:
* `topic` - The topic the message was published to.
* `partition` - The topic partition.
* `offset` - The offset assigned to the message inside the partition.
* `key` - The partition key.
* `ts` - A timestamp associated with the message.
* `headers` - The headers of the message.
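For instance, a processor callback can pattern match on these fields (a minimal sketch):

    @impl true
    def handle_message(_processor, message, _context) do
      %{topic: topic, partition: partition, offset: offset} = message.metadata
      IO.puts("Processing record at offset #{offset} from #{topic}/#{partition}")
      message
    end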
## Telemetry
This producer emits a few [Telemetry](https://github.com/beam-telemetry/telemetry)
events which are listed below.
* `[:broadway_kafka, :assignments_revoked, :start | :stop | :exception]` spans -
these events are emitted in "span style" when the producer receives an assignments
revoked call from the consumer group coordinator. See `:telemetry.span/3`.
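A minimal sketch of consuming these events with `:telemetry.attach_many/4` (the handler
id is arbitrary; in production, prefer a captured named function over an anonymous one):

    :telemetry.attach_many(
      "log-assignments-revoked",
      [
        [:broadway_kafka, :assignments_revoked, :start],
        [:broadway_kafka, :assignments_revoked, :stop],
        [:broadway_kafka, :assignments_revoked, :exception]
      ],
      fn event, _measurements, metadata, _config ->
        IO.inspect({event, metadata.producer})
      end,
      nil
    )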
"""
use GenStage
require Logger
import Record, only: [defrecord: 2, extract: 2]
alias Broadway.{Message, Producer}
alias BroadwayKafka.Allocator
alias BroadwayKafka.Acknowledger
@behaviour Producer
@behaviour :brod_group_member
defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")
defrecord :brod_received_assignment,
extract(:brod_received_assignment, from_lib: "brod/include/brod.hrl")
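# These records mirror the Erlang records defined in brod's include file, so
# we can pattern match on fetched Kafka messages and on received assignments.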
@impl GenStage
def init(opts) do
Process.flag(:trap_exit, true)
client = opts[:client] || BroadwayKafka.BrodClient
case client.init(opts) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message
{:ok, config} ->
{_, producer_name} = Process.info(self(), :registered_name)
draining_after_revoke_flag =
self()
|> drain_after_revoke_table_name!()
|> drain_after_revoke_table_init!()
prefix = get_in(config, [:client_config, :client_id_prefix])
client_id = :"#{prefix}#{Module.concat([producer_name, Client])}"
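# Use the first processor's :max_demand (Broadway defaults it to 10) to cap
# how many messages we emit per poll.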
max_demand =
with [{_first, processor_opts}] <- opts[:broadway][:processors],
max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do
max_demand
else
_ -> 10
end
state = %{
client: client,
client_id: client_id,
group_coordinator: nil,
receive_timer: nil,
receive_interval: config.receive_interval,
reconnect_timeout: config.reconnect_timeout,
acks: Acknowledger.new(),
config: config,
allocator_names: allocator_names(opts[:broadway]),
revoke_caller: nil,
draining_after_revoke_flag: draining_after_revoke_flag,
demand: 0,
shutting_down?: false,
buffer: :queue.new(),
max_demand: max_demand
}
{:producer, connect(state)}
end
end
defp allocator_names(broadway_config) do
broadway_name = broadway_config[:name]
broadway_index = broadway_config[:index]
processors_allocators =
for {name, _} <- broadway_config[:processors] do
Module.concat([broadway_name, "Allocator_processor_#{name}"])
end
batchers_allocators =
for {name, _} <- broadway_config[:batchers] do
Module.concat([broadway_name, "Allocator_batcher_consumer_#{name}"])
end
{broadway_index, processors_allocators, batchers_allocators}
end
@impl GenStage
def handle_demand(incoming_demand, %{demand: demand} = state) do
maybe_schedule_poll(%{state | demand: demand + incoming_demand}, 0)
end
@impl GenStage
def handle_call(:drain_after_revoke, _from, %{group_coordinator: nil} = state) do
set_draining_after_revoke!(state.draining_after_revoke_flag, false)
{:reply, :ok, [], state}
end
@impl GenStage
def handle_call(:drain_after_revoke, from, %{revoke_caller: nil} = state) do
state = reset_buffer(state)
if Acknowledger.all_drained?(state.acks) do
set_draining_after_revoke!(state.draining_after_revoke_flag, false)
{:reply, :ok, [], %{state | acks: Acknowledger.new()}}
else
{:noreply, [], %{state | revoke_caller: from}}
end
end
@impl GenStage
def handle_cast({:update_topics, topics}, state) do
state.client.update_topics(state.group_coordinator, topics)
{:noreply, [], state}
end
@impl GenStage
def handle_info({:poll, key}, %{acks: acks, demand: demand, max_demand: max_demand} = state) do
# We only poll if:
#
# 1. We are not shutting down
# 2. We are not draining after assignments were revoked
# 3. We know the key being acked
#
# Note the key may be out of date when polling has been scheduled and
# assignments were revoked afterwards, which is why check 3 is necessary.
offset = Acknowledger.last_offset(acks, key)
if not state.shutting_down? and
not is_draining_after_revoke?(state.draining_after_revoke_flag) and
offset != nil do
messages = fetch_messages_from_kafka(state, key, offset)
to_send = min(demand, max_demand)
{new_acks, not_sent, messages, pending} = split_demand(messages, acks, key, to_send)
new_buffer = enqueue_many(state.buffer, key, pending)
new_demand = demand - to_send + not_sent
new_state = %{state | acks: new_acks, demand: new_demand, buffer: new_buffer}
{:noreply, messages, new_state}
else
{:noreply, [], state}
end
end
@impl GenStage
def handle_info(:maybe_schedule_poll, state) do
maybe_schedule_poll(%{state | receive_timer: nil}, state.receive_interval)
end
@impl GenStage
def handle_info({:put_assignments, group_generation_id, assignments}, state) do
list =
Enum.map(assignments, fn assignment ->
brod_received_assignment(
topic: topic,
partition: partition,
begin_offset: begin_offset
) = assignment
offset_reset_policy = state.config[:offset_reset_policy]
offset =
state.client.resolve_offset(
topic,
partition,
begin_offset,
offset_reset_policy,
state.config
)
{group_generation_id, topic, partition, offset}
end)
topics_partitions = Enum.map(list, fn {_, topic, partition, _} -> {topic, partition} end)
{broadway_index, processors_allocators, batchers_allocators} = state.allocator_names
for allocator_name <- processors_allocators do
Allocator.allocate(allocator_name, broadway_index, topics_partitions)
end
for allocator_name <- batchers_allocators do
Allocator.allocate(allocator_name, broadway_index, topics_partitions)
end
{:noreply, [], %{state | acks: Acknowledger.add(state.acks, list)}}
end
@impl GenStage
def handle_info({:ack, key, offsets}, state) do
%{group_coordinator: group_coordinator, client: client, acks: acks, config: config} = state
{generation_id, topic, partition} = key
{drained?, new_offset, updated_acks} = Acknowledger.update_current_offset(acks, key, offsets)
if new_offset do
try do
client.ack(
group_coordinator,
generation_id,
topic,
partition,
new_offset,
disable_offset_commit_during_revoke_call(config, state)
)
catch
kind, reason ->
Logger.error(Exception.format(kind, reason, __STACKTRACE__))
end
end
new_state =
if drained? && state.revoke_caller && Acknowledger.all_drained?(updated_acks) do
set_draining_after_revoke!(state.draining_after_revoke_flag, false)
GenStage.reply(state.revoke_caller, :ok)
%{state | revoke_caller: nil, acks: Acknowledger.new()}
else
%{state | acks: updated_acks}
end
{:noreply, [], new_state}
end
def handle_info({:DOWN, _ref, _, {client_id, _}, _reason}, %{client_id: client_id} = state) do
if coord = state.group_coordinator do
Process.exit(coord, :shutdown)
receive do
{:DOWN, _, _, ^coord, _} -> :ok
end
end
state = reset_buffer(state)
schedule_reconnect(state.reconnect_timeout)
{:noreply, [], %{state | group_coordinator: nil}}
end
def handle_info({:DOWN, _ref, _, coord, _reason}, %{group_coordinator: coord} = state) do
state = reset_buffer(state)
schedule_reconnect(state.reconnect_timeout)
{:noreply, [], %{state | group_coordinator: nil}}
end
def handle_info({:EXIT, _pid, _reason}, state) do
{:noreply, [], state}
end
@impl GenStage
def handle_info(:reconnect, state) do
if state.client.connected?(state.client_id) do
{:noreply, [], connect(state)}
else
schedule_reconnect(state.reconnect_timeout)
{:noreply, [], state}
end
end
@impl GenStage
def handle_info(_, state) do
{:noreply, [], state}
end
@impl Producer
def prepare_for_draining(state) do
# On draining, we will continue scheduling the polls, but they will be a no-op.
{:noreply, [], %{state | shutting_down?: true}}
end
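# Starts one BroadwayKafka.Allocator per processor/batcher group and sets
# :partition_by on each stage, so messages from the same topic/partition are
# always routed to the same processor/batcher and their order is preserved.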
@impl Producer
def prepare_for_start(_module, opts) do
broadway_name = opts[:name]
producers_concurrency = opts[:producer][:concurrency]
[first_processor_entry | other_processors_entries] = opts[:processors]
{allocator, updated_processor_entry} =
build_allocator_spec_and_consumer_entry(
broadway_name,
:processors,
"processor",
producers_concurrency,
first_processor_entry
)
{allocators, updated_batchers_entries} =
Enum.reduce(opts[:batchers], {[allocator], []}, fn entry, {allocators, entries} ->
{allocator, updated_entry} =
build_allocator_spec_and_consumer_entry(
broadway_name,
:batchers,
"batcher_consumer",
producers_concurrency,
entry
)
{[allocator | allocators], [updated_entry | entries]}
end)
updated_opts =
opts
|> Keyword.put(:processors, [updated_processor_entry | other_processors_entries])
|> Keyword.put(:batchers, updated_batchers_entries)
{allocators, updated_opts}
end
@impl :brod_group_member
def get_committed_offsets(_pid, _topics_partitions) do
raise "not implemented"
end
@impl :brod_group_member
def assignments_received(pid, _group_member_id, group_generation_id, received_assignments) do
send(pid, {:put_assignments, group_generation_id, received_assignments})
:ok
end
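# Invoked by the group coordinator when the group rebalances. We flag the
# producer's ETS table first so in-flight polls become no-ops, then block
# until all outstanding acknowledgements have been drained.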
@impl :brod_group_member
def assignments_revoked(producer_pid) do
maybe_process_name = fn
pid when is_pid(pid) -> pid
name when is_atom(name) -> Process.whereis(name)
end
producer_pid
|> maybe_process_name.()
|> drain_after_revoke_table_name!()
|> set_draining_after_revoke!(true)
metadata = %{producer: maybe_process_name.(producer_pid)}
:telemetry.span([:broadway_kafka, :assignments_revoked], metadata, fn ->
GenStage.call(producer_pid, :drain_after_revoke, :infinity)
{:ok, metadata}
end)
end
@impl GenStage
def terminate(_reason, state) do
%{client: client, group_coordinator: group_coordinator, client_id: client_id} = state
group_coordinator && Process.exit(group_coordinator, :shutdown)
client.disconnect(client_id)
:ok
end
defp maybe_schedule_poll(%{demand: 0} = state, _interval) do
{:noreply, [], state}
end
defp maybe_schedule_poll(state, interval) do
%{buffer: buffer, demand: demand, acks: acks, receive_timer: receive_timer} = state
case dequeue_many(buffer, acks, demand, []) do
{acks, 0, events, buffer} ->
{:noreply, events, %{state | demand: 0, buffer: buffer, acks: acks}}
{acks, demand, events, buffer} ->
receive_timer = receive_timer || schedule_poll(state, interval)
state = %{
state
| demand: demand,
buffer: buffer,
receive_timer: receive_timer,
acks: acks
}
{:noreply, events, state}
end
end
defp schedule_poll(state, interval) do
for key <- Acknowledger.keys(state.acks) do
Process.send_after(self(), {:poll, key}, interval)
end
Process.send_after(self(), :maybe_schedule_poll, interval)
end
defp fetch_messages_from_kafka(state, key, offset) do
%{
client: client,
client_id: client_id,
config: config
} = state
{generation_id, topic, partition} = key
case client.fetch(client_id, topic, partition, offset, config[:fetch_config], config) do
{:ok, {_watermark_offset, kafka_messages}} ->
Enum.map(kafka_messages, fn k_msg ->
wrap_message(k_msg, topic, partition, generation_id)
end)
{:error, reason} ->
raise "cannot fetch records from Kafka (topic=#{topic} partition=#{partition} " <>
"offset=#{offset}). Reason: #{inspect(reason)}"
end
end
defp wrap_message(kafka_msg, topic, partition, generation_id) do
kafka_message(value: data, offset: offset, key: key, ts: ts, headers: headers) = kafka_msg
ack_data = %{offset: offset}
ack_ref = {self(), {generation_id, topic, partition}}
message = %Message{
data: data,
metadata: %{
topic: topic,
partition: partition,
offset: offset,
key: key,
ts: ts,
headers: headers
},
acknowledger: {Acknowledger, ack_ref, ack_data}
}
Message.put_batch_key(message, {topic, partition})
end
defp connect(state) do
%{client: client, client_id: client_id, config: config} = state
case client.setup(self(), client_id, __MODULE__, config) do
{:ok, coord_pid, _coord_ref} ->
%{state | group_coordinator: coord_pid}
error ->
raise "Cannot connect to Kafka. Reason #{inspect(error)}"
end
end
defp build_allocator_spec_and_consumer_entry(
broadway_name,
group,
prefix,
producers_concurrency,
consumer_entry
) do
{consumer_name, consumer_config} = consumer_entry
validate_partition_by(group, consumer_name, consumer_config)
consumer_concurrency = consumer_config[:concurrency]
allocator_name = Module.concat([broadway_name, "Allocator_#{prefix}_#{consumer_name}"])
partition_by = &Allocator.fetch!(allocator_name, {&1.metadata.topic, &1.metadata.partition})
new_config = Keyword.put(consumer_config, :partition_by, partition_by)
allocator =
{BroadwayKafka.Allocator, {allocator_name, producers_concurrency, consumer_concurrency}}
allocator_spec = Supervisor.child_spec(allocator, id: allocator_name)
{allocator_spec, {consumer_name, new_config}}
end
defp validate_partition_by(group, consumer_name, consumer_config) do
if Keyword.has_key?(consumer_config, :partition_by) do
raise ArgumentError,
"cannot set option :partition_by for #{group} #{inspect(consumer_name)}. " <>
"The option will be set automatically by BroadwayKafka.Producer"
end
end
## Buffer handling
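# split_demand/4 registers the offsets of freshly fetched messages with the
# acknowledger and splits the list into messages to emit now and messages to
# keep as pending. dequeue_many/4 is its counterpart that serves demand from
# the buffered pending messages before any new fetch is made.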
defp split_demand(list, acks, key, demand) do
{rest, demand, reversed, acc} = reverse_split_demand(list, demand, [], [])
acks = update_last_offset(acks, key, reversed)
{acks, demand, Enum.reverse(acc), rest}
end
defp reverse_split_demand(rest, 0, reversed, acc) do
{rest, 0, reversed, acc}
end
defp reverse_split_demand([], demand, reversed, acc) do
{[], demand, reversed, acc}
end
defp reverse_split_demand([head | tail], demand, reversed, acc) do
reverse_split_demand(tail, demand - 1, [head | reversed], [head | acc])
end
defp enqueue_many(queue, _key, []), do: queue
defp enqueue_many(queue, key, list), do: :queue.in({key, list}, queue)
defp dequeue_many(queue, acks, demand, acc) when demand > 0 do
case :queue.out(queue) do
{{:value, {key, list}}, queue} ->
{rest, demand, reversed, acc} = reverse_split_demand(list, demand, [], acc)
acks = update_last_offset(acks, key, reversed)
case {demand, rest} do
{0, []} ->
{acks, demand, Enum.reverse(acc), queue}
{0, _} ->
{acks, demand, Enum.reverse(acc), :queue.in({key, rest}, queue)}
{_, []} ->
dequeue_many(queue, acks, demand, acc)
end
{:empty, queue} ->
{acks, demand, Enum.reverse(acc), queue}
end
end
defp update_last_offset(acks, key, [message | _] = reversed) do
last = message.metadata.offset + 1
offsets = Enum.reduce(reversed, [], &[&1.metadata.offset | &2])
Acknowledger.update_last_offset(acks, key, last, offsets)
end
defp update_last_offset(acks, _key, []) do
acks
end
defp reset_buffer(state) do
put_in(state.buffer, :queue.new())
end
defp schedule_reconnect(timeout) do
Process.send_after(self(), :reconnect, timeout)
end
defp drain_after_revoke_table_name!(pid) do
{_, producer_name} = Process.info(pid, :registered_name)
Module.concat([producer_name, DrainingAfterRevoke])
end
defp drain_after_revoke_table_init!(table_name) do
table_name = :ets.new(table_name, [:named_table, :public, :set])
set_draining_after_revoke!(table_name, false)
table_name
end
defp set_draining_after_revoke!(table_name, value) do
:ets.insert(table_name, {:draining, value})
end
defp is_draining_after_revoke?(table_name) do
:ets.lookup_element(table_name, :draining, 2)
end
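# Returns a copy of the config with :offset_commit_on_ack forced to false
# while the producer is draining after an assignments-revoked call, so acks
# processed mid-rebalance don't trigger offset commit requests.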
defp disable_offset_commit_during_revoke_call(config, state) do
offset_commit_on_ack =
not is_draining_after_revoke?(state.draining_after_revoke_flag) and
state.config.offset_commit_on_ack
%{config | offset_commit_on_ack: offset_commit_on_ack}
end
end