%%%
%%% Copyright (c) 2014-2021, Klarna Bank AB (publ)
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%
-module(brod).
-behaviour(application).
%% Application
-export([ start/0
, start/2
, stop/0
, stop/1
]).
%% Client API
-export([ get_partitions_count/2
, start_client/1
, start_client/2
, start_client/3
, start_consumer/3
, start_producer/3
, stop_client/1
]).
-export([ start_link_client/1
, start_link_client/2
, start_link_client/3
]).
%% Producer API
-export([ get_producer/3
, produce/2
, produce/3
, produce/5
, produce_cb/4
, produce_cb/6
, produce_sync/2
, produce_sync/3
, produce_sync/5
, produce_sync_offset/5
, produce_no_ack/5
, sync_produce_request/1
, sync_produce_request/2
, sync_produce_request_offset/1
, sync_produce_request_offset/2
]).
%% Simple Consumer API
-export([ consume_ack/2
, consume_ack/4
, get_consumer/3
, subscribe/3
, subscribe/5
, unsubscribe/1
, unsubscribe/2
, unsubscribe/3
, unsubscribe/4
]).
%% Subscriber API
-export([ start_link_group_subscriber_v2/1
, start_link_topic_subscriber/1
]).
%% Deprecated API
-export([ start_link_group_subscriber/7
, start_link_group_subscriber/8
, start_link_topic_subscriber/5
, start_link_topic_subscriber/6
, start_link_topic_subscriber/7
]).
%% Topic APIs
-export([ create_topics/3
, create_topics/4
, delete_topics/3
, delete_topics/4
]).
%% APIs for quick metadata or message inspection and brod_cli
-export([ get_metadata/1
, get_metadata/2
, get_metadata/3
, resolve_offset/3
, resolve_offset/4
, resolve_offset/5
, resolve_offset/6
, fetch/4
, fetch/5
, fold/8
, connect_leader/4
, list_all_groups/2
, list_groups/2
, describe_groups/3
, connect_group_coordinator/3
, fetch_committed_offsets/2
, fetch_committed_offsets/3
]).
-deprecated([ {fetch, 7, next_version}
, {fetch, 8, next_version}
]).
-export([ fetch/7
, fetch/8
]).
-ifdef(build_brod_cli).
-export([main/1]).
-endif.
-export_type([ batch_input/0
, bootstrap/0
, call_ref/0
, cg/0
, cg_protocol_type/0
, client/0
, client_config/0
, client_id/0
, compression/0
, connection/0
, conn_config/0
, consumer_config/0
, consumer_option/0
, consumer_options/0
, endpoint/0
, error_code/0
, fetch_opts/0
, fold_acc/0
, fold_fun/1
, fold_limits/0
, fold_stop_reason/0
, fold_result/0
, group_config/0
, group_generation_id/0
, group_id/0
, group_member/0
, group_member_id/0
, hostname/0
, key/0
, msg_input/0
, msg_ts/0
, message/0
, message_set/0
, offset/0
, offset_time/0
, partition/0
, partition_assignment/0
, partition_fun/0
, partitioner/0
, portnum/0
, produce_ack_cb/0
, producer_config/0
, produce_reply/0
, produce_result/0
, received_assignments/0
, topic/0
, topic_partition/0
, value/0
]).
-include("brod_int.hrl").
%%%_* Types ====================================================================
%% basics
-type hostname() :: kpro:hostname().
-type portnum() :: pos_integer().
-type endpoint() :: {hostname(), portnum()}.
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type partition() :: kpro:partition().
-type topic_partition() :: {topic(), partition()}.
-type offset() :: kpro:offset().
-type key() :: undefined %% no key, transformed to <<>>
| binary().
-type value() :: undefined %% no value, transformed to <<>>
| iodata() %% single value
| {msg_ts(), binary()} %% one message with timestamp
| [?KV(key(), value())] %% backward compatible
| [?TKV(msg_ts(), key(), value())] %% backward compatible
| kpro:msg_input() %% one magic v2 message
| kpro:batch_input(). %% maybe nested batch
-type msg_input() :: kpro:msg_input().
-type batch_input() :: [msg_input()].
-type msg_ts() :: kpro:msg_ts().
-type client_id() :: atom().
-type client() :: client_id() | pid().
-type client_config() :: brod_client:config().
-type bootstrap() :: [endpoint()] %% default client config
| {[endpoint()], client_config()}.
-type offset_time() :: integer()
| ?OFFSET_EARLIEST
| ?OFFSET_LATEST.
-type message() :: kpro:message().
-type message_set() :: #kafka_message_set{}.
-type error_code() :: kpro:error_code().
%% producers
-type produce_reply() :: #brod_produce_reply{}.
-type producer_config() :: brod_producer:config().
-type partition_fun() :: fun((topic(), pos_integer(), key(), value()) ->
{ok, partition()}).
-type partitioner() :: partition_fun() | random | hash.
-type produce_ack_cb() :: fun((partition(), offset()) -> _).
-type compression() :: no_compression | gzip | snappy.
-type call_ref() :: #brod_call_ref{}.
-type produce_result() :: brod_produce_req_buffered
| brod_produce_req_acked.
%% consumers
-type consumer_option() :: begin_offset
| min_bytes
| max_bytes
| max_wait_time
| sleep_timeout
| prefetch_count
| prefetch_bytes
| offset_reset_policy
| size_stat_window.
-type consumer_options() :: [{consumer_option(), integer()}].
-type consumer_config() :: brod_consumer:config().
-type connection() :: kpro:connection().
-type conn_config() :: [{atom(), term()}] | kpro:conn_config().
%% consumer groups
-type group_id() :: kpro:group_id().
-type group_member_id() :: binary().
-type group_member() :: {group_member_id(), #kafka_group_member_metadata{}}.
-type group_generation_id() :: non_neg_integer().
-type group_config() :: proplists:proplist().
-type partition_assignment() :: {topic() , [partition()]}.
-type received_assignments() :: [#brod_received_assignment{}].
-type cg() :: #brod_cg{}.
-type cg_protocol_type() :: binary().
-type fetch_opts() :: kpro:fetch_opts().
-type fold_acc() :: term().
-type fold_fun(Acc) :: fun((message(), Acc) -> {ok, Acc} | {error, any()}).
%% `fold' always returns when reaches the high watermark offset. `fold'
%% also returns when any of the limits is hit.
-type fold_limits() :: #{ message_count => pos_integer()
, reach_offset => offset()
}.
-type fold_stop_reason() :: reached_end_of_partition
| reached_message_count_limit
| reached_target_offset
| {error, any()}.
%% OffsetToContinue: begin offset for the next fold call
-type fold_result() :: ?BROD_FOLD_RET(fold_acc(),
OffsetToContinue :: offset(),
fold_stop_reason()).
%%%_* APIs =====================================================================
%% @doc Start brod application.
-spec start() -> ok | no_return().
start() ->
{ok, _Apps} = application:ensure_all_started(brod),
ok.
%% @doc Stop brod application.
-spec stop() -> ok.
stop() ->
application:stop(brod).
%% @doc Application behaviour callback
start(_StartType, _StartArgs) -> brod_sup:start_link().
%% @doc Application behaviour callback
stop(_State) -> ok.
%% @equiv start_client(BootstrapEndpoints, brod_default_client)
-spec start_client([endpoint()]) -> ok | {error, any()}.
start_client(BootstrapEndpoints) ->
start_client(BootstrapEndpoints, ?BROD_DEFAULT_CLIENT_ID).
%% @equiv start_client(BootstrapEndpoints, ClientId, [])
-spec start_client([endpoint()], client_id()) -> ok | {error, any()}.
start_client(BootstrapEndpoints, ClientId) ->
start_client(BootstrapEndpoints, ClientId, []).
%% @doc Start a client.
%%
%% `BootstrapEndpoints':
%% Kafka cluster endpoints, can be any of the brokers in the cluster,
%% which does not necessarily have to be the leader of any partition,
%% e.g. a load-balanced entrypoint to the remote Kafka cluster.
%%
%% `ClientId': Atom to identify the client process.
%%
%% `Config' is a proplist, possible values:
%% <ul>
%% <li>`restart_delay_seconds' (optional, default=10)
%%
%% How long to wait between attempts to restart brod_client
%% process when it crashes.</li>
%%
%% <li>`get_metadata_timeout_seconds' (optional, default=5)
%%
%% Return `{error, timeout}' from `brod_client:get_xxx' calls if
%% responses for APIs such as `metadata', `find_coordinator'
%% are not received in time.</li>
%%
%% <li>`reconnect_cool_down_seconds' (optional, default=1)
%%
%% Delay this configured number of seconds before retrying to
%% establish a new connection to the kafka partition leader.</li>
%%
%% <li>`allow_topic_auto_creation' (optional, default=true)
%%
%% By default, brod respects what is configured in the broker
%% about topic auto-creation. i.e. whether
%% `auto.create.topics.enable' is set in the broker configuration.
%% However if `allow_topic_auto_creation' is set to `false' in
%% client config, brod will avoid sending metadata requests that
%% may cause an auto-creation of the topic regardless of what
%% broker config is.</li>
%%
%% <li>`auto_start_producers' (optional, default=false)
%%
%% If true, brod client will spawn a producer automatically when
%% user is trying to call `produce' but did not call `brod:start_producer'
%% explicitly. Can be useful for applications which don't know beforehand
%% which topics they will be working with.</li>
%%
%% <li>`default_producer_config' (optional, default=[])
%%
%% Producer configuration to use when auto_start_producers is true.
%% See `brod_producer:start_link/4' for details about producer config</li>
%%
%% </ul>
%%
%% Connection options can be added to the same proplist. See
%% `kpro_connection.erl' in `kafka_protocol' for the details:
%%
%% <ul>
%% <li>`ssl' (optional, default=false)
%%
%% `true | false | ssl:ssl_option()'
%% `true' is translated to `[]' as `ssl:ssl_option()' i.e. all default.
%% </li>
%%
%% <li>`sasl' (optional, default=undefined)
%%
%% Credentials for SASL/Plain authentication.
%% `{mechanism(), Filename}' or `{mechanism(), UserName, Password}'
%% where mechanism can be atoms: `plain' (for "PLAIN"), `scram_sha_256'
%% (for "SCRAM-SHA-256") or `scram_sha_512' (for SCRAM-SHA-512).
%% `Filename' should be a file consisting two lines, first line
%% is the username and the second line is the password.
%% Both `Username' and `Password' should be `string() | binary()'</li>
%%
%% <li>`connect_timeout' (optional, default=5000)
%%
%% Timeout when trying to connect to an endpoint.</li>
%%
%% <li>`request_timeout' (optional, default=240000, constraint: >= 1000)
%%
%% Timeout when waiting for a response, connection restart when timed
%% out.</li>
%%
%% <li>`query_api_versions' (optional, default=true)
%%
%% Must be set to false to work with kafka versions prior to 0.10,
%% When set to `true', at connection start, brod will send a query request
%% to get the broker supported API version ranges.
%% When set to 'false', brod will always use the lowest supported API version
%% when sending requests to kafka.
%% Supported API version ranges can be found in:
%% `brod_kafka_apis:supported_versions/1'</li>
%%
%% <li>`extra_sock_opts' (optional, default=[])
%%
%% Extra socket options to tune socket performance.
%% e.g. `[{sndbuf, 1 bsl 20}]'.
%% <a href="http://erlang.org/doc/man/gen_tcp.html#type-option">More info
%% </a>
%% </li>
%% </ul>
-spec start_client([endpoint()], client_id(), client_config()) ->
ok | {error, any()}.
start_client(BootstrapEndpoints, ClientId, Config) ->
case brod_sup:start_client(BootstrapEndpoints, ClientId, Config) of
ok -> ok;
{error, {already_started, _Pid}} -> ok;
{error, Reason} -> {error, Reason}
end.
%% @equiv start_link_client(BootstrapEndpoints, brod_default_client)
-spec start_link_client([endpoint()]) -> {ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints) ->
start_link_client(BootstrapEndpoints, ?BROD_DEFAULT_CLIENT_ID).
%% @equiv start_link_client(BootstrapEndpoints, ClientId, [])
-spec start_link_client([endpoint()], client_id()) ->
{ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints, ClientId) ->
start_link_client(BootstrapEndpoints, ClientId, []).
-spec start_link_client([endpoint()], client_id(), client_config()) ->
{ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints, ClientId, Config) ->
brod_client:start_link(BootstrapEndpoints, ClientId, Config).
%% @doc Stop a client.
-spec stop_client(client()) -> ok.
stop_client(Client) when is_atom(Client) ->
case brod_sup:find_client(Client) of
[_Pid] -> brod_sup:stop_client(Client);
[] -> brod_client:stop(Client)
end;
stop_client(Client) when is_pid(Client) ->
brod_client:stop(Client).
%% @doc Dynamically start a per-topic producer.
%% @see brod_producer:start_link/4
-spec start_producer(client(), topic(), producer_config()) ->
ok | {error, any()}.
start_producer(Client, TopicName, ProducerConfig) ->
brod_client:start_producer(Client, TopicName, ProducerConfig).
%% @doc Dynamically start a topic consumer.
%% @see brod_consumer:start_link/5. for details about consumer config.
-spec start_consumer(client(), topic(), consumer_config()) ->
ok | {error, any()}.
start_consumer(Client, TopicName, ConsumerConfig) ->
brod_client:start_consumer(Client, TopicName, ConsumerConfig).
%% @doc Get number of partitions for a given topic.
%% The higher level producers may need the partition numbers to
%% find the partition producer pid --- if the number of partitions
%% is not statically configured for them.
%% It is up to the callers how they want to distribute their data
%% (e.g. random, roundrobin or consistent-hashing) to the partitions.
-spec get_partitions_count(client(), topic()) ->
{ok, pos_integer()} | {error, any()}.
get_partitions_count(Client, Topic) ->
brod_client:get_partitions_count(Client, Topic).
-spec get_consumer(client(), topic(), partition()) ->
{ok, pid()} | {error, Reason}
when Reason :: client_down
| {client_down, any()}
| {consumer_down, any()}
| {consumer_not_found, topic()}
| {consumer_not_found, topic(), partition()}.
get_consumer(Client, Topic, Partition) ->
brod_client:get_consumer(Client, Topic, Partition).
%% @equiv brod_client:get_producer/3
-spec get_producer(client(), topic(), partition()) ->
{ok, pid()} | {error, Reason}
when Reason :: client_down
| {client_down, any()}
| {producer_down, any()}
| {producer_not_found, topic()}
| {producer_not_found, topic(), partition()}.
get_producer(Client, Topic, Partition) ->
brod_client:get_producer(Client, Topic, Partition).
%% @equiv produce(Pid, <<>>, Value)
-spec produce(pid(), value()) -> {ok, call_ref()} | {error, any()}.
produce(Pid, Value) ->
produce(Pid, _Key = <<>>, Value).
%% @doc Produce one message if `Value' is a binary or an
%% iolist. Otherwise send a batch, if `Value' is a (nested) key-value
%% list, or a list of maps. In this case `Key' is discarded (only the
%% keys in the key-value list are sent to Kafka). The pid should be a
%% partition producer pid, NOT client pid. The return value is a call
%% reference of type `call_ref()', so the caller can use it to expect
%% (match) a `#brod_produce_reply{result = brod_produce_req_acked}'
%% message after the produce request has been acked by Kafka.
-spec produce(pid(), key(), value()) ->
{ok, call_ref()} | {error, any()}.
produce(ProducerPid, Key, Value) ->
brod_producer:produce(ProducerPid, Key, Value).
%% @doc Produce one message if `Value' is a binary or an iolist.
%% Otherwise send a batch if `Value' is a (nested) key-value list, or
%% a list of maps. In this case `Key' is used only for partitioning,
%% or discarded if the 3rd argument is a partition number instead of a
%% partitioner callback. This function first looks up the producer
%% pid, then calls `produce/3' to do the real work. The return value
%% is a call reference of type `call_ref()', so the caller can used it
%% to expect (match) a `#brod_produce_reply{result =
%% brod_produce_req_acked}' message after the produce request has been
%% acked by Kafka.
-spec produce(client(), topic(), partition() | partitioner(),
key(), value()) -> {ok, call_ref()} | {error, any()}.
produce(Client, Topic, Partition, Key, Value) when is_integer(Partition) ->
case get_producer(Client, Topic, Partition) of
{ok, Pid} -> produce(Pid, Key, Value);
{error, Reason} -> {error, Reason}
end;
produce(Client, Topic, Partitioner, Key, Value) ->
PartFun = brod_utils:make_part_fun(Partitioner),
case brod_client:get_partitions_count(Client, Topic) of
{ok, PartitionsCnt} ->
{ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
produce(Client, Topic, Partition, Key, Value);
{error, Reason} ->
{error, Reason}
end.
%% @doc Same as `produce/3', only the ack is not delivered as a message,
%% instead, the callback is evaluated by producer worker when ack is received
%% from kafka.
-spec produce_cb(pid(), key(), value(), produce_ack_cb()) ->
ok | {error, any()}.
produce_cb(ProducerPid, Key, Value, AckCb) ->
brod_producer:produce_cb(ProducerPid, Key, Value, AckCb).
%% @doc Same as `produce/5' only the ack is not delivered as a message,
%% instead, the callback is evaluated by producer worker when ack is received
%% from kafka. Return the partition to caller as `{ok, Partition}' for caller
%% to correlate the callback when the 3rd arg is not a partition number.
-spec produce_cb(client(), topic(), partition() | partitioner(),
key(), value(), produce_ack_cb()) ->
ok | {ok, partition()} | {error, any()}.
produce_cb(Client, Topic, Part, Key, Value, AckCb) when is_integer(Part) ->
case get_producer(Client, Topic, Part) of
{ok, Pid} -> produce_cb(Pid, Key, Value, AckCb);
{error, Reason} -> {error, Reason}
end;
produce_cb(Client, Topic, Partitioner, Key, Value, AckCb) ->
PartFun = brod_utils:make_part_fun(Partitioner),
case brod_client:get_partitions_count(Client, Topic) of
{ok, PartitionsCnt} ->
{ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
case produce_cb(Client, Topic, Partition, Key, Value, AckCb) of
ok -> {ok, Partition};
{error, Reason} -> {error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
%% @doc Send the message to partition worker without any ack.
%% NOTE: This call has no back-pressure to the caller,
%% excessive usage may cause BEAM to run out of memory.
-spec produce_no_ack(pid(), key(), value()) -> ok | {error, any()}.
produce_no_ack(ProducerPid, Key, Value) ->
brod_producer:produce_no_ack(ProducerPid, Key, Value).
%% @doc Find the partition worker and send message without any ack.
%% NOTE: This call has no back-pressure to the caller,
%% excessive usage may cause BEAM to run out of memory.
-spec produce_no_ack(client(), topic(), partition() | partitioner(),
key(), value()) -> ok | {error, any()}.
produce_no_ack(Client, Topic, Part, Key, Value) when is_integer(Part) ->
case get_producer(Client, Topic, Part) of
{ok, Pid} -> produce_no_ack(Pid, Key, Value);
{error, Reason} -> {error, Reason}
end;
produce_no_ack(Client, Topic, Partitioner, Key, Value) ->
PartFun = brod_utils:make_part_fun(Partitioner),
case brod_client:get_partitions_count(Client, Topic) of
{ok, PartitionsCnt} ->
{ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
produce_no_ack(Client, Topic, Partition, Key, Value);
{error, _Reason} ->
%% error ignored
ok
end.
%% @doc Same as `produce/5' only the ack is not d
%% @equiv produce_sync(Pid, <<>>, Value)
-spec produce_sync(pid(), value()) -> ok.
produce_sync(Pid, Value) ->
produce_sync(Pid, _Key = <<>>, Value).
%% @doc Sync version of produce/3
%%
%% This function will not return until the response is received from
%% Kafka. But when producer is started with `required_acks' set to 0,
%% this function will return once the messages are buffered in the
%% producer process.
-spec produce_sync(pid(), key(), value()) ->
ok | {error, any()}.
produce_sync(Pid, Key, Value) ->
case produce(Pid, Key, Value) of
{ok, CallRef} ->
%% Wait until the request is acked by kafka
sync_produce_request(CallRef);
{error, Reason} ->
{error, Reason}
end.
%% @doc Sync version of produce/5
%% This function will not return until a response is received from kafka,
%% however if producer is started with required_acks set to 0, this function
%% will return once the messages are buffered in the producer process.
-spec produce_sync(client(), topic(), partition() | partitioner(),
key(), value()) -> ok | {error, any()}.
produce_sync(Client, Topic, Partition, Key, Value) ->
case produce_sync_offset(Client, Topic, Partition, Key, Value) of
{ok, _} -> ok;
Else -> Else
end.
%% @doc Version of produce_sync/5 that returns the offset assigned by Kafka
%% If producer is started with required_acks set to 0, the offset will be
%% `?BROD_PRODUCE_UNKNOWN_OFFSET'.
-spec produce_sync_offset(client(), topic(), partition() | partitioner(),
key(), value()) -> {ok, offset()} | {error, any()}.
produce_sync_offset(Client, Topic, Partition, Key, Value) ->
case produce(Client, Topic, Partition, Key, Value) of
{ok, CallRef} ->
sync_produce_request_offset(CallRef);
{error, Reason} ->
{error, Reason}
end.
%% @doc Block wait for sent produced request to be acked by kafka.
-spec sync_produce_request(call_ref()) ->
ok | {error, Reason :: any()}.
sync_produce_request(CallRef) ->
sync_produce_request(CallRef, infinity).
-spec sync_produce_request(call_ref(), timeout()) ->
ok | {error, Reason :: any()}.
sync_produce_request(CallRef, Timeout) ->
case sync_produce_request_offset(CallRef, Timeout) of
{ok, _} -> ok;
Else -> Else
end.
%% @doc As sync_produce_request_offset/1, but also returning assigned offset
%% See produce_sync_offset/5.
-spec sync_produce_request_offset(call_ref()) ->
{ok, offset()} | {error, Reason :: any()}.
sync_produce_request_offset(CallRef) ->
sync_produce_request_offset(CallRef, infinity).
-spec sync_produce_request_offset(call_ref(), timeout()) ->
{ok, offset()} | {error, Reason :: any()}.
sync_produce_request_offset(CallRef, Timeout) ->
brod_producer:sync_produce_request(CallRef, Timeout).
%% @doc Subscribe to a data stream from the given topic-partition.
%%
%% If `{error, Reason}' is returned, the caller should perhaps retry later.
%%
%% `{ok, ConsumerPid}' is returned on success. The caller may want to
%% monitor the consumer pid and re-subscribe should the `ConsumerPid' crash.
%%
%% Upon successful subscription the subscriber process should expect messages
%% of pattern:
%% `{ConsumerPid, #kafka_message_set{}}' and
%% `{ConsumerPid, #kafka_fetch_error{}}'.
%%
%% `-include_lib("brod/include/brod.hrl")' to access the records.
%%
%% In case `#kafka_fetch_error{}' is received the subscriber should
%% re-subscribe itself to resume the data stream.
-spec subscribe(client(), pid(), topic(), partition(),
consumer_options()) -> {ok, pid()} | {error, any()}.
subscribe(Client, SubscriberPid, Topic, Partition, Options) ->
case brod_client:get_consumer(Client, Topic, Partition) of
{ok, ConsumerPid} ->
case subscribe(ConsumerPid, SubscriberPid, Options) of
ok -> {ok, ConsumerPid};
Error -> Error
end;
{error, Reason} ->
{error, Reason}
end.
-spec subscribe(pid(), pid(), consumer_options()) -> ok | {error, any()}.
subscribe(ConsumerPid, SubscriberPid, Options) ->
brod_consumer:subscribe(ConsumerPid, SubscriberPid, Options).
%% @doc Unsubscribe the current subscriber. Assuming the subscriber is
%% `self()'.
-spec unsubscribe(client(), topic(), partition()) -> ok | {error, any()}.
unsubscribe(Client, Topic, Partition) ->
unsubscribe(Client, Topic, Partition, self()).
%% @doc Unsubscribe the current subscriber.
-spec unsubscribe(client(), topic(), partition(), pid()) -> ok | {error, any()}.
unsubscribe(Client, Topic, Partition, SubscriberPid) ->
case brod_client:get_consumer(Client, Topic, Partition) of
{ok, ConsumerPid} -> unsubscribe(ConsumerPid, SubscriberPid);
Error -> Error
end.
%% @doc Unsubscribe the current subscriber. Assuming the subscriber is
%% `self()'.
-spec unsubscribe(pid()) -> ok | {error, any()}.
unsubscribe(ConsumerPid) ->
unsubscribe(ConsumerPid, self()).
%% @doc Unsubscribe the current subscriber.
-spec unsubscribe(pid(), pid()) -> ok | {error, any()}.
unsubscribe(ConsumerPid, SubscriberPid) ->
brod_consumer:unsubscribe(ConsumerPid, SubscriberPid).
-spec consume_ack(client(), topic(), partition(), offset()) ->
ok | {error, any()}.
consume_ack(Client, Topic, Partition, Offset) ->
case brod_client:get_consumer(Client, Topic, Partition) of
{ok, ConsumerPid} -> consume_ack(ConsumerPid, Offset);
{error, Reason} -> {error, Reason}
end.
-spec consume_ack(pid(), offset()) -> ok | {error, any()}.
consume_ack(ConsumerPid, Offset) ->
brod_consumer:ack(ConsumerPid, Offset).
%% @see brod_group_subscriber:start_link/7
-spec start_link_group_subscriber(
client(), group_id(), [topic()],
group_config(), consumer_config(), module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig,
ConsumerConfig, CbModule, CbInitArg) ->
brod_group_subscriber:start_link(Client, GroupId, Topics, GroupConfig,
ConsumerConfig, CbModule, CbInitArg).
%% @doc Start group_subscriber_v2
-spec start_link_group_subscriber_v2(
brod_group_subscriber_v2:subscriber_config()
) -> {ok, pid()} | {error, any()}.
start_link_group_subscriber_v2(Config) ->
brod_group_subscriber_v2:start_link(Config).
%% @see brod_group_subscriber:start_link/8
-spec start_link_group_subscriber(
client(), group_id(), [topic()], group_config(),
consumer_config(), message | message_set,
module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig,
ConsumerConfig, MessageType,
CbModule, CbInitArg) ->
brod_group_subscriber:start_link(Client, GroupId, Topics, GroupConfig,
ConsumerConfig, MessageType,
CbModule, CbInitArg).
%% @equiv start_link_topic_subscriber(Client, Topic, 'all', ConsumerConfig,
%% CbModule, CbInitArg)
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
client(), topic(), consumer_config(), module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, ConsumerConfig,
CbModule, CbInitArg) ->
start_link_topic_subscriber(Client, Topic, all, ConsumerConfig,
CbModule, CbInitArg).
%% @equiv start_link_topic_subscriber(Client, Topic, Partitions,
%% ConsumerConfig, message,
%% CbModule, CbInitArg)
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
client(), topic(), all | [partition()],
consumer_config(), module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, Partitions,
ConsumerConfig, CbModule, CbInitArg) ->
start_link_topic_subscriber(Client, Topic, Partitions,
ConsumerConfig, message, CbModule, CbInitArg).
%% @see brod_topic_subscriber:start_link/7
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
client(), topic(), all | [partition()],
consumer_config(), message | message_set,
module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, Partitions,
ConsumerConfig, MessageType, CbModule, CbInitArg) ->
brod_topic_subscriber:start_link(Client, Topic, Partitions,
ConsumerConfig, MessageType,
CbModule, CbInitArg).
%% @see brod_topic_subscriber:start_link/1
-spec start_link_topic_subscriber(
brod_topic_subscriber:topic_subscriber_config()
) -> {ok, pid()} | {error, any()}.
start_link_topic_subscriber(Config) ->
brod_topic_subscriber:start_link(Config).
%% @equiv create_topics(Hosts, TopicsConfigs, RequestConfigs, [])
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32(),
validate_only => boolean()}) ->
ok | {ok, kpro:struct()} | {error, any()}.
create_topics(Hosts, TopicConfigs, RequestConfigs) ->
brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs).
%% @doc Create topic(s) in kafka
%% Return the message body of `create_topics', response.
%% See `kpro_schema.erl' for struct details
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32(),
validate_only => boolean()}, conn_config()) ->
ok | {ok, kpro:struct()} | {error, any()}.
create_topics(Hosts, TopicConfigs, RequestConfigs, Options) ->
brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs, Options).
%% @equiv delete_topics(Hosts, Topics, Timeout, [])
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
ok | {error, any()}.
delete_topics(Hosts, Topics, Timeout) ->
brod_utils:delete_topics(Hosts, Topics, Timeout).
%% @doc Delete topic(s) from kafka
%% Return the message body of `delete_topics', response.
%% See `kpro_schema.erl' for struct details
-spec delete_topics([endpoint()], [topic()], pos_integer(), conn_config()) ->
ok | {error, any()}.
delete_topics(Hosts, Topics, Timeout, Options) ->
brod_utils:delete_topics(Hosts, Topics, Timeout, Options).
%% @doc Fetch broker metadata
%% Return the message body of `metadata' response.
%% See `kpro_schema.erl' for details
-spec get_metadata([endpoint()]) -> {ok, kpro:struct()} | {error, any()}.
get_metadata(Hosts) ->
brod_utils:get_metadata(Hosts).
%% @doc Fetch broker/topic metadata
%% Return the message body of `metadata' response.
%% See `kpro_schema.erl' for struct details
-spec get_metadata([endpoint()], all | [topic()]) ->
{ok, kpro:struct()} | {error, any()}.
get_metadata(Hosts, Topics) ->
brod_utils:get_metadata(Hosts, Topics).
%% @doc Fetch broker/topic metadata
%% Return the message body of `metadata' response.
%% See `kpro_schema.erl' for struct details
-spec get_metadata([endpoint()], all | [topic()], conn_config()) ->
{ok, kpro:struct()} | {error, any()}.
get_metadata(Hosts, Topics, Options) ->
brod_utils:get_metadata(Hosts, Topics, Options).
%% @equiv resolve_offset(Hosts, Topic, Partition, latest, 1)
-spec resolve_offset([endpoint()], topic(), partition()) ->
{ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition) ->
resolve_offset(Hosts, Topic, Partition, ?OFFSET_LATEST).
%% @doc Resolve semantic offset or timestamp to real offset.
-spec resolve_offset([endpoint()], topic(), partition(), offset_time()) ->
{ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time) ->
resolve_offset(Hosts, Topic, Partition, Time, []).
%% @doc Resolve semantic offset or timestamp to real offset.
-spec resolve_offset([endpoint()], topic(), partition(),
offset_time(), conn_config()) ->
{ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg) ->
brod_utils:resolve_offset(Hosts, Topic, Partition, Time, ConnCfg).
%% @doc Resolve semantic offset or timestamp to real offset.
-spec resolve_offset([endpoint()], topic(), partition(),
offset_time(), conn_config(),
#{timeout => kpro:int32()}) ->
{ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts) ->
brod_utils:resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts).
%% @doc Fetch a single message set from the given topic-partition.
%% The first arg can either be an already established connection to leader,
%% or `{Endpoints, ConnConfig}' so to establish a new connection before fetch.
-spec fetch(connection() | client_id() | bootstrap(),
topic(), partition(), integer()) ->
{ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
fetch(ConnOrBootstrap, Topic, Partition, Offset) ->
Opts = #{ max_wait_time => 1000
, min_bytes => 1
, max_bytes => 1 bsl 20
},
fetch(ConnOrBootstrap, Topic, Partition, Offset, Opts).
%% @doc Fetch a single message set from the given topic-partition.
%% The first arg can either be an already established connection to leader,
%% or `{Endpoints, ConnConfig}' so to establish a new connection before fetch.
-spec fetch(connection() | client_id() | bootstrap(),
topic(), partition(), offset(), fetch_opts()) ->
{ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
fetch(ConnOrBootstrap, Topic, Partition, Offset, Opts) ->
brod_utils:fetch(ConnOrBootstrap, Topic, Partition, Offset, Opts).
%% @doc Fold through messages in a partition.
%% Works like `lists:foldl/2' but with below stop conditions:
%% * Always return after reach high watermark offset
%% * Return after the given message count limit is reached
%% * Return after the given kafka offset is reached.
%% * Return if the `FoldFun' returns an `{error, Reason}' tuple.
%% NOTE: Exceptions from evaluating `FoldFun' are not caught.
-spec fold(connection() | client_id() | bootstrap(),
topic(), partition(), offset(), fetch_opts(),
Acc, fold_fun(Acc), fold_limits()) ->
fold_result() when Acc :: fold_acc().
fold(Bootstrap, Topic, Partition, Offset, Opts, Acc, Fun, Limits) ->
brod_utils:fold(Bootstrap, Topic, Partition, Offset, Opts, Acc, Fun, Limits).
%% @deprecated
%% fetch(Hosts, Topic, Partition, Offset, Wait, MinBytes, MaxBytes, [])
-spec fetch([endpoint()], topic(), partition(), offset(),
non_neg_integer(), non_neg_integer(), pos_integer()) ->
{ok, [message()]} | {error, any()}.
fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes) ->
fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes, []).
%% @deprecated Fetch a single message set from the given topic-partition.
-spec fetch([endpoint()], topic(), partition(), offset(),
non_neg_integer(), non_neg_integer(), pos_integer(),
conn_config()) -> {ok, [message()]} | {error, any()}.
fetch(Hosts, Topic, Partition, Offset,
MaxWaitTime, MinBytes, MaxBytes, ConnConfig) ->
FetchOpts = #{ max_wait_time => MaxWaitTime
, min_bytes => MinBytes
, max_bytes => MaxBytes
},
case fetch({Hosts, ConnConfig}, Topic, Partition, Offset, FetchOpts) of
{ok, {_HwOffset, Batch}} -> {ok, Batch}; %% backward compatible
{error, Reason} -> {error, Reason}
end.
%% @doc Connect partition leader.
-spec connect_leader([endpoint()], topic(), partition(),
conn_config()) -> {ok, pid()}.
connect_leader(Hosts, Topic, Partition, ConnConfig) ->
KproOptions = brod_utils:kpro_connection_options(ConnConfig),
kpro:connect_partition_leader(Hosts, ConnConfig, Topic, Partition, KproOptions).
%% @doc List ALL consumer groups in the given kafka cluster.
%% NOTE: Exception if failed to connect any of the coordinator brokers.
-spec list_all_groups([endpoint()], conn_config()) ->
[{endpoint(), [cg()] | {error, any()}}].
list_all_groups(Endpoints, ConnCfg) ->
brod_utils:list_all_groups(Endpoints, ConnCfg).
%% @doc List consumer groups in the given group coordinator broker.
-spec list_groups(endpoint(), conn_config()) -> {ok, [cg()]} | {error, any()}.
list_groups(CoordinatorEndpoint, ConnCfg) ->
brod_utils:list_groups(CoordinatorEndpoint, ConnCfg).
%% @doc Describe consumer groups. The given consumer group IDs should be all
%% managed by the coordinator-broker running at the given endpoint.
%% Otherwise error codes will be returned in the result structs.
%% Return `describe_groups' response body field named `groups'.
%% See `kpro_schema.erl' for struct details
-spec describe_groups(endpoint(), conn_config(), [group_id()]) ->
{ok, [kpro:struct()]} | {error, any()}.
describe_groups(CoordinatorEndpoint, ConnCfg, IDs) ->
brod_utils:describe_groups(CoordinatorEndpoint, ConnCfg, IDs).
%% @doc Connect to consumer group coordinator broker.
%%
%% Done in steps: <ol>
%% <li>Connect to any of the given bootstrap ednpoints</li>
%%
%% <li>Send group_coordinator_request to resolve group coordinator
%% endpoint</li>
%%
%% <li>Connect to the resolved endpoint and return the connection
%% pid</li></ol>
-spec connect_group_coordinator([endpoint()], conn_config(), group_id()) ->
{ok, pid()} | {error, any()}.
connect_group_coordinator(BootstrapEndpoints, ConnCfg, GroupId) ->
KproOptions = brod_utils:kpro_connection_options(ConnCfg),
Args = maps:merge(KproOptions, #{type => group, id => GroupId}),
kpro:connect_coordinator(BootstrapEndpoints, ConnCfg, Args).
%% @doc Fetch committed offsets for ALL topics in the given consumer group.
%% Return the `responses' field of the `offset_fetch' response.
%% See `kpro_schema.erl' for struct details.
-spec fetch_committed_offsets([endpoint()], conn_config(), group_id()) ->
{ok, [kpro:struct()]} | {error, any()}.
fetch_committed_offsets(BootstrapEndpoints, ConnCfg, GroupId) ->
brod_utils:fetch_committed_offsets(BootstrapEndpoints, ConnCfg, GroupId, []).
%% @doc Same as `fetch_committed_offsets/3',
%% but works with a started `brod_client'
-spec fetch_committed_offsets(client(), group_id()) ->
{ok, [kpro:struct()]} | {error, any()}.
fetch_committed_offsets(Client, GroupId) ->
brod_utils:fetch_committed_offsets(Client, GroupId, []).
-ifdef(build_brod_cli).
main(X) -> brod_cli:main(X).
-endif.
%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End: