src/brod.erl

%%%
%%%   Copyright (c) 2014-2021, Klarna Bank AB (publ)
%%%
%%%   Licensed under the Apache License, Version 2.0 (the "License");
%%%   you may not use this file except in compliance with the License.
%%%   You may obtain a copy of the License at
%%%
%%%       http://www.apache.org/licenses/LICENSE-2.0
%%%
%%%   Unless required by applicable law or agreed to in writing, software
%%%   distributed under the License is distributed on an "AS IS" BASIS,
%%%   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%%   See the License for the specific language governing permissions and
%%%   limitations under the License.
%%%

-module(brod).
-behaviour(application).

%% Application
-export([ start/0
        , start/2
        , stop/0
        , stop/1
        ]).

%% Client API
-export([ get_partitions_count/2
        , get_partitions_count_safe/2
        , start_client/1
        , start_client/2
        , start_client/3
        , start_consumer/3
        , start_producer/3
        , stop_client/1
        ]).

-export([ start_link_client/1
        , start_link_client/2
        , start_link_client/3
        ]).

%% Producer API
-export([ get_producer/3
        , produce/2
        , produce/3
        , produce/5
        , produce_cb/4
        , produce_cb/6
        , produce_sync/2
        , produce_sync/3
        , produce_sync/5
        , produce_sync_offset/5
        , produce_no_ack/5
        , sync_produce_request/1
        , sync_produce_request/2
        , sync_produce_request_offset/1
        , sync_produce_request_offset/2
        ]).

%% Simple Consumer API
-export([ consume_ack/2
        , consume_ack/4
        , get_consumer/3
        , subscribe/3
        , subscribe/5
        , unsubscribe/1
        , unsubscribe/2
        , unsubscribe/3
        , unsubscribe/4
        ]).

%% Subscriber API
-export([ start_link_group_subscriber_v2/1
        , start_link_topic_subscriber/1
        ]).

%% Deprecated API
-export([ start_link_group_subscriber/7
        , start_link_group_subscriber/8
        , start_link_topic_subscriber/5
        , start_link_topic_subscriber/6
        , start_link_topic_subscriber/7
        ]).

%% Topic APIs
-export([ create_topics/3
        , create_topics/4
        , delete_topics/3
        , delete_topics/4
        ]).

%% APIs for quick metadata or message inspection and brod_cli
-export([ get_metadata/1
        , get_metadata/2
        , get_metadata/3
        , resolve_offset/3
        , resolve_offset/4
        , resolve_offset/5
        , resolve_offset/6
        , fetch/4
        , fetch/5
        , fold/8
        , connect_leader/4
        , list_all_groups/2
        , list_groups/2
        , describe_groups/3
        , connect_group_coordinator/3
        , fetch_committed_offsets/2
        , fetch_committed_offsets/3
        ]).

-deprecated([ {fetch, 7, next_version}
            , {fetch, 8, next_version}
            ]).

-export([ fetch/7
        , fetch/8
        ]).

-ifdef(build_brod_cli).
-export([main/1]).
-endif.

-export_type([ batch_input/0
             , bootstrap/0
             , call_ref/0
             , cg/0
             , cg_protocol_type/0
             , client/0
             , client_config/0
             , client_id/0
             , compression/0
             , connection/0
             , conn_config/0
             , consumer_config/0
             , endpoint/0
             , error_code/0
             , fetch_opts/0
             , fold_acc/0
             , fold_fun/1
             , fold_limits/0
             , fold_stop_reason/0
             , fold_result/0
             , group_config/0
             , group_generation_id/0
             , group_id/0
             , group_member/0
             , group_member_id/0
             , hostname/0
             , key/0
             , msg_input/0
             , msg_ts/0
             , message/0
             , message_set/0
             , offset/0
             , offset_time/0
             , partition/0
             , partition_assignment/0
             , partition_fun/0
             , partitioner/0
             , portnum/0
             , produce_ack_cb/0
             , producer_config/0
             , produce_reply/0
             , produce_result/0
             , received_assignments/0
             , topic/0
             , topic_partition/0
             , value/0
             ]).

-include("brod_int.hrl").

%%%_* Types ====================================================================

%% basics
-type hostname() :: kpro:hostname().
-type portnum() :: pos_integer().
-type endpoint() :: {hostname(), portnum()}.
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type partition() :: kpro:partition().
-type topic_partition() :: {topic(), partition()}.
-type offset() :: kpro:offset(). %% Physical offset (an integer)
-type key() :: undefined %% no key, transformed to <<>>
             | binary().
-type value() :: undefined %% no value, transformed to <<>>
               | iodata() %% single value
               | {msg_ts(), binary()} %% one message with timestamp
               | [?KV(key(), value())] %% backward compatible
               | [?TKV(msg_ts(), key(), value())] %% backward compatible
               | kpro:msg_input() %% one magic v2 message
               | kpro:batch_input(). %% maybe nested batch

-type msg_input() :: kpro:msg_input().
-type batch_input() :: [msg_input()].

-type msg_ts() :: kpro:msg_ts(). %% Unix time in milliseconds
-type client_id() :: atom().
-type client() :: client_id() | pid().
-type client_config() :: brod_client:config().
-type bootstrap() :: [endpoint()] %% default client config
                   | {[endpoint()], client_config()}.
-type offset_time() :: msg_ts()
                     | ?OFFSET_EARLIEST
                     | ?OFFSET_LATEST.
-type message() :: kpro:message(). %% A record with offset, key, value, ts_type, ts, and headers.
-type message_set() :: #kafka_message_set{}.
%% A record with topic, partition, high_wm_offset (max offset of the partition), and messages.
%%
%% See <a href="https://github.com/kafka4beam/brod/blob/master/include/brod.hrl#L26">
%% the definition</a> for more information.
-type error_code() :: kpro:error_code().

%% producers
-type produce_reply() :: #brod_produce_reply{}.
%% A record with call_ref, base_offset, and result.
%%
%% See the <a href="https://github.com/kafka4beam/brod/blob/master/include/brod.hrl#L49">
%% the definition</a> for more information.
-type producer_config() :: brod_producer:config().
-type partition_fun() :: fun((topic(), pos_integer(), key(), value()) ->
                                {ok, partition()}).
-type partitioner() :: partition_fun() | random | hash.
-type produce_ack_cb() :: fun((partition(), offset()) -> _).
-type compression() :: no_compression | gzip | snappy.
-type call_ref() :: #brod_call_ref{}. %% A record with caller, callee, and ref.
-type produce_result() :: brod_produce_req_buffered
                        | brod_produce_req_acked.


%% consumers
-type consumer_config() :: [ {begin_offset,        offset_time()}
                           | {min_bytes,           non_neg_integer()}
                           | {max_bytes,           non_neg_integer()}
                           | {max_wait_time,       integer()}
                           | {sleep_timeout,       integer()}
                           | {prefetch_count,      integer()}
                           | {prefetch_bytes,      non_neg_integer()}
                           | {offset_reset_policy, brod_consumer:offset_reset_policy()}
                           | {size_stat_window,    non_neg_integer()}
                           | {isolation_level,     brod_consumer:isolation_level()}
                           ].
%% Consumer configuration.
%%
%% The meaning of the options is documented at {@link brod_consumer:start_link/5}.
-type connection() :: kpro:connection().
-type conn_config() :: [{atom(), term()}] | kpro:conn_config().
%% Connection configuration that will be passed to `kpro' calls.
%%
%% For more info, see the {@link kpro_connection:config()} type.

%% consumer groups
-type group_id() :: kpro:group_id().
-type group_member_id() :: binary().
-type group_member() :: {group_member_id(), #kafka_group_member_metadata{}}.
-type group_generation_id() :: non_neg_integer().
-type group_config() :: proplists:proplist().
-type partition_assignment() :: {topic() , [partition()]}.
-type received_assignments() :: [#brod_received_assignment{}].
-type cg() :: #brod_cg{}.
-type cg_protocol_type() :: binary().
-type fetch_opts() :: kpro:fetch_opts().
-type fold_acc() :: term().
-type fold_fun(Acc) :: fun((message(), Acc) -> {ok, Acc} | {error, any()}).
%% `fold' always returns when reaches the high watermark offset. `fold'
%% also returns when any of the limits is hit.
-type fold_limits() :: #{ message_count => pos_integer()
                        , reach_offset => offset()
                        }.
-type fold_stop_reason() :: reached_end_of_partition
                          | reached_message_count_limit
                          | reached_target_offset
                          | {error, any()}.
%% OffsetToContinue: begin offset for the next fold call
-type fold_result() :: ?BROD_FOLD_RET(fold_acc(),
                                      OffsetToContinue :: offset(),
                                      fold_stop_reason()).

%%%_* APIs =====================================================================

%% @doc Start brod application.
-spec start() -> ok | no_return().
start() ->
  {ok, _Apps} = application:ensure_all_started(brod),
  ok.

%% @doc Stop brod application.
-spec stop() -> ok.
stop() ->
  application:stop(brod).

%% @doc Application behaviour callback
start(_StartType, _StartArgs) -> brod_sup:start_link().

%% @doc Application behaviour callback
stop(_State) -> ok.

%% @equiv start_client(BootstrapEndpoints, brod_default_client)
-spec start_client([endpoint()]) -> ok | {error, any()}.
start_client(BootstrapEndpoints) ->
  start_client(BootstrapEndpoints, ?BROD_DEFAULT_CLIENT_ID).

%% @equiv start_client(BootstrapEndpoints, ClientId, [])
-spec start_client([endpoint()], client_id()) -> ok | {error, any()}.
start_client(BootstrapEndpoints, ClientId) ->
  start_client(BootstrapEndpoints, ClientId, []).

%% @doc Start a client ({@link brod_client}).
%%
%% `BootstrapEndpoints':
%%   Kafka cluster endpoints, can be any of the brokers in the cluster,
%%   which does not necessarily have to be the leader of any partition,
%%   e.g. a load-balanced entrypoint to the remote Kafka cluster.
%%
%% `ClientId': Atom to identify the client process.
%%
%% `Config' is a proplist, possible values:
%%  <ul>
%%   <li>`restart_delay_seconds' (optional, default=10)
%%
%%     How long to wait between attempts to restart brod_client
%%     process when it crashes.</li>
%%
%%   <li>`get_metadata_timeout_seconds' (optional, default=5)
%%
%%     Return `{error, timeout}' from `brod_client:get_xxx' calls if
%%     responses for APIs such as `metadata', `find_coordinator'
%%     are not received in time.</li>
%%
%%   <li>`reconnect_cool_down_seconds' (optional, default=1)
%%
%%     Delay this configured number of seconds before retrying to
%%     establish a new connection to the kafka partition leader.</li>
%%
%%   <li>`allow_topic_auto_creation' (optional, default=true)
%%
%%     By default, brod respects what is configured in the broker
%%     about topic auto-creation. i.e. whether
%%     `auto.create.topics.enable' is set in the broker configuration.
%%     However if `allow_topic_auto_creation' is set to `false' in
%%     client config, brod will avoid sending metadata requests that
%%     may cause an auto-creation of the topic regardless of what
%%     broker config is.</li>
%%
%%   <li>`auto_start_producers' (optional, default=false)
%%
%%     If true, brod client will spawn a producer automatically when
%%     user is trying to call `produce' but did not call `brod:start_producer'
%%     explicitly. Can be useful for applications which don't know beforehand
%%     which topics they will be working with.</li>
%%
%%   <li>`default_producer_config' (optional, default=[])
%%
%%     Producer configuration to use when auto_start_producers is true.
%%     See {@link brod_producer:start_link/4} for details about producer config</li>
%%
%% </ul>
%%
%% Connection options can be added to the same proplist. See
%% `kpro_connection.erl' in `kafka_protocol' for the details:
%%
%% <ul>
%%   <li>`ssl' (optional, default=false)
%%
%%     `true | false | ssl:ssl_option()'
%%     `true' is translated to `[]' as `ssl:ssl_option()' i.e. all default.
%%   </li>
%%
%%   <li>`sasl' (optional, default=undefined)
%%
%%     Credentials for SASL/Plain authentication.
%%     `{mechanism(), Filename}' or `{mechanism(), UserName, Password}'
%%     where mechanism can be atoms: `plain' (for "PLAIN"), `scram_sha_256'
%%     (for "SCRAM-SHA-256") or `scram_sha_512' (for SCRAM-SHA-512).
%%     `Filename' should be a file consisting two lines, first line
%%     is the username and the second line is the password.
%%     Both `Username' and `Password' should be `string() | binary()'</li>
%%
%%   <li>`connect_timeout' (optional, default=5000)
%%
%%     Timeout when trying to connect to an endpoint.</li>
%%
%%   <li>`request_timeout' (optional, default=240000, constraint: >= 1000)
%%
%%     Timeout when waiting for a response, connection restart when timed
%%     out.</li>
%%
%%   <li>`query_api_versions' (optional, default=true)
%%
%%     Must be set to false to work with kafka versions prior to 0.10,
%%     When set to `true', at connection start, brod will send a query request
%%     to get the broker supported API version ranges.
%%     When set to 'false', brod will always use the lowest supported API version
%%     when sending requests to kafka.
%%     Supported API version ranges can be found in:
%%     `brod_kafka_apis:supported_versions/1'</li>
%%
%%   <li>`extra_sock_opts' (optional, default=[])
%%
%%     Extra socket options to tune socket performance.
%%     e.g. `[{sndbuf, 1 bsl 20}]'.
%%     <a href="http://erlang.org/doc/man/gen_tcp.html#type-option">More info
%%     </a>
%%   </li>
%% </ul>
%%
%% You can read more about clients in the
%% <a href="https://hexdocs.pm/brod/readme.html#clients">overview</a>.
-spec start_client([endpoint()], client_id(), client_config()) ->
                      ok | {error, any()}.
start_client(BootstrapEndpoints, ClientId, Config) ->
  case brod_sup:start_client(BootstrapEndpoints, ClientId, Config) of
    ok                               -> ok;
    {error, {already_started, _Pid}} -> ok;
    {error, Reason}                  -> {error, Reason}
  end.

%% @equiv start_link_client(BootstrapEndpoints, brod_default_client)
-spec start_link_client([endpoint()]) -> {ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints) ->
  start_link_client(BootstrapEndpoints, ?BROD_DEFAULT_CLIENT_ID).

%% @equiv start_link_client(BootstrapEndpoints, ClientId, [])
-spec start_link_client([endpoint()], client_id()) ->
        {ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints, ClientId) ->
  start_link_client(BootstrapEndpoints, ClientId, []).

-spec start_link_client([endpoint()], client_id(), client_config()) ->
        {ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints, ClientId, Config) ->
  brod_client:start_link(BootstrapEndpoints, ClientId, Config).

%% @doc Stop a client.
-spec stop_client(client()) -> ok.
stop_client(Client) when is_atom(Client) ->
  case brod_sup:find_client(Client) of
    [_Pid] -> brod_sup:stop_client(Client);
    []     -> brod_client:stop(Client)
  end;
stop_client(Client) when is_pid(Client) ->
  brod_client:stop(Client).

%% @doc Dynamically start a per-topic producer and register it in the client.
%%
%% You have to start a producer for each topic you want to produce messages
%% into, unless you have specified `auto_start_producers = true' when starting
%% the client (in that case you don't have to call this function at all).
%%
%% After starting the producer, you can call {@link produce/5} and friends
%% for producing messages.
%%
%% You can read more about producers in the
%% <a href="https://hexdocs.pm/brod/readme.html#producers">overview</a>.
%%
%% A client has to be already started before making this call (e.g. by calling
%% {@link start_client/3}).
%%
%% See {@link brod_producer:start_link/4} for a list of available configuration
%% options.
%%
%% Example:
%% ```
%% > brod:start_producer(my_client, <<"my_topic">>, [{max_retries, 5}]).
%% ok
%% '''
-spec start_producer(client(), topic(), producer_config()) ->
                        ok | {error, any()}.
start_producer(Client, TopicName, ProducerConfig) ->
  brod_client:start_producer(Client, TopicName, ProducerConfig).

%% @doc Dynamically start topic consumer(s) and register it in the client.
%%
%% A {@link brod_consumer} is started for each partition of the given topic.
%% Note that you can have only one consumer per client-topic.
%%
%% See {@link brod_consumer:start_link/5} for details about consumer config.
%%
%% You can read more about consumers in the
%% <a href="https://hexdocs.pm/brod/readme.html#consumers">overview</a>.
-spec start_consumer(client(), topic(), consumer_config()) ->
                        ok | {error, any()}.
start_consumer(Client, TopicName, ConsumerConfig) ->
  brod_client:start_consumer(Client, TopicName, ConsumerConfig).

%% @doc Get number of partitions for a given topic.
%%
%% The higher level producers may need the partition numbers to
%% find the partition producer pid – if the number of partitions
%% is not statically configured for them.
%% It is up to the callers how they want to distribute their data
%% (e.g. random, roundrobin or consistent-hashing) to the partitions.
%% NOTE: The partitions count is cached for 120 seconds.
-spec get_partitions_count(client(), topic()) ->
        {ok, pos_integer()} | {error, any()}.
get_partitions_count(Client, Topic) ->
  brod_client:get_partitions_count(Client, Topic).

%% @doc The same as `get_partitions_count(Client, Topic)'
%% but ensured not to auto-create topics in Kafka even
%% when Kafka has topic auto-creation configured.
-spec get_partitions_count_safe(client(), topic()) ->
        {ok, pos_integer()} | {error, any()}.
get_partitions_count_safe(Client, Topic) ->
  brod_client:get_partitions_count_safe(Client, Topic).

-spec get_consumer(client(), topic(), partition()) ->
        {ok, pid()} | {error, Reason}
          when Reason :: client_down
                       | {client_down, any()}
                       | {consumer_down, any()}
                       | {consumer_not_found, topic()}
                       | {consumer_not_found, topic(), partition()}.
get_consumer(Client, Topic, Partition) ->
  brod_client:get_consumer(Client, Topic, Partition).

%% @equiv brod_client:get_producer(Client, Topic, Partition)
-spec get_producer(client(), topic(), partition()) ->
        {ok, pid()} | {error, Reason}
          when Reason :: client_down
                       | {client_down, any()}
                       | {producer_down, any()}
                       | {producer_not_found, topic()}
                       | {producer_not_found, topic(), partition()}.
get_producer(Client, Topic, Partition) ->
  brod_client:get_producer(Client, Topic, Partition).

%% @equiv produce(Pid, <<>>, Value)
-spec produce(pid(), value()) -> {ok, call_ref()} | {error, any()}.
produce(Pid, Value) ->
  produce(Pid, _Key = <<>>, Value).

%% @doc Produce one or more messages.
%%
%% See {@link produce/5} for information about possible shapes
%% of `Value'.
%%
%% The pid should be a partition producer pid, NOT client pid.
%%
%% The return value is a call reference of type `call_ref()',
%% so the caller can use it to expect (match)
%% a `#brod_produce_reply{result = brod_produce_req_acked}'
%% message after the produce request has been acked by Kafka.
-spec produce(pid(), key(), value()) ->
        {ok, call_ref()} | {error, any()}.
produce(ProducerPid, Key, Value) ->
  brod_producer:produce(ProducerPid, Key, Value).

%% @doc Produce one or more messages.
%%
%% `Value' can have many different forms:
%% <ul>
%%  <li>`binary()': Single message with key from the `Key' argument</li>
%%  <li>`{brod:msg_ts(), binary()}': Single message with
%%       its create-time timestamp and key from `Key'</li>
%%  <li>`#{ts => brod:msg_ts(), value => binary(), headers => [{_, _}]}':
%%       Single message; if this map does not have a `key'
%%       field, `Key' is used instead</li>
%%  <li>`[{K, V} | {T, K, V}]': A batch, where `V' could be
%%       a nested list of such representation</li>
%%  <li>`[#{key => K, value => V, ts => T, headers => [{_, _}]}]':
%%       A batch</li>
%% </ul>
%%
%% When `Value' is a batch, the `Key' argument is only used
%% as partitioner input and all messages are written on the
%% same partition.
%%
%% `ts' field is dropped for kafka prior to version `0.10'
%% (produce API version 0, magic version 0). `headers' field
%% is dropped for kafka prior to version `0.11' (produce API
%% version 0-2, magic version 0-1).
%%
%% `Partition' may be either a concrete partition (an integer)
%% or a partitioner (see {@link partitioner()} for more info).
%%
%% A producer for the particular topic has to be already started
%% (by calling {@link start_producer/3}), unless you have specified
%% `auto_start_producers = true' when starting the client.
%%
%% This function first looks up the producer pid, then calls {@link produce/3}
%% to do the real work.
%%
%% The return value is a call reference of type {@link call_ref()}, so the caller
%% can used it to expect (match)
%% a `#brod_produce_reply{result = brod_produce_req_acked}'
%% (see the {@link produce_reply()} type) message after the
%% produce request has been acked by Kafka.
%%
%% Example:
%% ```
%% > brod:produce(my_client, <<"my_topic">>, 0, "key", <<"Hello from erlang!">>).
%% {ok,{brod_call_ref,<0.83.0>,<0.133.0>,#Ref<0.3024768151.2556690436.92841>}}
%% > flush().
%% Shell got {brod_produce_reply,
%%               {brod_call_ref,<0.83.0>,<0.133.0>,
%%                   #Ref<0.3024768151.2556690436.92841>},
%%               12,brod_produce_req_acked}
%% '''
-spec produce(client(), topic(), partition() | partitioner(),
              key(), value()) -> {ok, call_ref()} | {error, any()}.
produce(Client, Topic, Partition, Key, Value) when is_integer(Partition) ->
  case get_producer(Client, Topic, Partition) of
    {ok, Pid} -> produce(Pid, Key, Value);
    {error, Reason} -> {error, Reason}
  end;
produce(Client, Topic, Partitioner, Key, Value) ->
  PartFun = brod_utils:make_part_fun(Partitioner),
  case brod_client:get_partitions_count(Client, Topic) of
    {ok, PartitionsCnt} ->
      {ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
      produce(Client, Topic, Partition, Key, Value);
    {error, Reason} ->
      {error, Reason}
  end.

%% @doc Same as {@link produce/3}, only the ack is not delivered as a message,
%% instead, the callback is evaluated by producer worker when ack is received
%% from kafka (see the {@link produce_ack_cb()} type).
-spec produce_cb(pid(), key(), value(), produce_ack_cb()) ->
        ok | {error, any()}.
produce_cb(ProducerPid, Key, Value, AckCb) ->
  brod_producer:produce_cb(ProducerPid, Key, Value, AckCb).

%% @doc Same as {@link produce/5} only the ack is not delivered as a message,
%% instead, the callback is evaluated by producer worker when ack is received
%% from kafka (see the {@link produce_ack_cb()} type).
%%
%% Return the partition to caller as `{ok, Partition}' for caller
%% to correlate the callback when the 3rd arg is not a partition number.
-spec produce_cb(client(), topic(), partition() | partitioner(),
                 key(), value(), produce_ack_cb()) ->
        ok | {ok, partition()} | {error, any()}.
produce_cb(Client, Topic, Part, Key, Value, AckCb) when is_integer(Part) ->
  case get_producer(Client, Topic, Part) of
    {ok, Pid} -> produce_cb(Pid, Key, Value, AckCb);
    {error, Reason} -> {error, Reason}
  end;
produce_cb(Client, Topic, Partitioner, Key, Value, AckCb) ->
  PartFun = brod_utils:make_part_fun(Partitioner),
  case brod_client:get_partitions_count(Client, Topic) of
    {ok, PartitionsCnt} ->
      {ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
      case produce_cb(Client, Topic, Partition, Key, Value, AckCb) of
        ok -> {ok, Partition};
        {error, Reason} -> {error, Reason}
      end;
    {error, Reason} ->
      {error, Reason}
  end.

%% @doc Send the message to partition worker without any ack.
%%
%% NOTE: This call has no back-pressure to the caller,
%%       excessive usage may cause BEAM to run out of memory.
-spec produce_no_ack(pid(), key(), value()) -> ok | {error, any()}.
produce_no_ack(ProducerPid, Key, Value) ->
  brod_producer:produce_no_ack(ProducerPid, Key, Value).

%% @doc Find the partition worker and send message without any ack.
%%
%% NOTE: This call has no back-pressure to the caller,
%%       excessive usage may cause BEAM to run out of memory.
-spec produce_no_ack(client(), topic(), partition() | partitioner(),
           key(), value()) -> ok | {error, any()}.
produce_no_ack(Client, Topic, Part, Key, Value) when is_integer(Part) ->
  case get_producer(Client, Topic, Part) of
    {ok, Pid} -> produce_no_ack(Pid, Key, Value);
    {error, Reason} -> {error, Reason}
  end;
produce_no_ack(Client, Topic, Partitioner, Key, Value) ->
  PartFun = brod_utils:make_part_fun(Partitioner),
  case brod_client:get_partitions_count(Client, Topic) of
    {ok, PartitionsCnt} ->
      {ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
      produce_no_ack(Client, Topic, Partition, Key, Value);
    {error, _Reason} ->
      %% error ignored
      ok
  end.

%% @equiv produce_sync(Pid, <<>>, Value)
-spec produce_sync(pid(), value()) -> ok | {error, any()}.
produce_sync(Pid, Value) ->
  produce_sync(Pid, _Key = <<>>, Value).

%% @doc Sync version of {@link produce/3}.
%%
%% This function will not return until the response is received from
%% Kafka. But when producer is started with `required_acks' set to 0,
%% this function will return once the messages are buffered in the
%% producer process.
-spec produce_sync(pid(), key(), value()) ->
        ok | {error, any()}.
produce_sync(Pid, Key, Value) ->
  case produce(Pid, Key, Value) of
    {ok, CallRef} ->
      %% Wait until the request is acked by kafka
      sync_produce_request(CallRef);
    {error, Reason} ->
      {error, Reason}
  end.

%% @doc Sync version of {@link produce/5}.
%%
%% This function will not return until a response is received from kafka,
%% however if producer is started with `required_acks' set to 0, this function
%% will return once the messages are buffered in the producer process.
-spec produce_sync(client(), topic(), partition() | partitioner(),
                   key(), value()) -> ok | {error, any()}.
produce_sync(Client, Topic, Partition, Key, Value) ->
  case produce_sync_offset(Client, Topic, Partition, Key, Value) of
    {ok, _} -> ok;
    Else -> Else
  end.

%% @doc Version of {@link produce_sync/5} that returns the offset assigned by Kafka.
%%
%% If producer is started with `required_acks' set to 0, the offset will be
%% `?BROD_PRODUCE_UNKNOWN_OFFSET'.
-spec produce_sync_offset(client(), topic(), partition() | partitioner(),
                          key(), value()) -> {ok, offset()} | {error, any()}.
produce_sync_offset(Client, Topic, Partition, Key, Value) ->
  case produce(Client, Topic, Partition, Key, Value) of
    {ok, CallRef} ->
      sync_produce_request_offset(CallRef);
    {error, Reason} ->
      {error, Reason}
  end.

%% @equiv sync_produce_request(CallRef, infinity)
-spec sync_produce_request(call_ref()) ->
        ok | {error, Reason :: any()}.
sync_produce_request(CallRef) ->
  sync_produce_request(CallRef, infinity).

%% @doc Block wait for sent produced request to be acked by kafka.
%%
%% This way, you can turn asynchronous requests, made by {@link produce/5}
%% and friends, into synchronous ones.
%%
%% Example:
%% ```
%% {ok, CallRef} = brod:produce(
%%   brod_client_1, <<"my_topic">>, 0, <<"some-key">>, <<"some-value">>)
%% ). % returns immediately
%% % the following call waits and returns after the ack is received or timed out
%% brod:sync_produce_request(CallRef, 5_000).
%% '''
-spec sync_produce_request(call_ref(), timeout()) ->
        ok | {error, Reason :: any()}.
sync_produce_request(CallRef, Timeout) ->
  case sync_produce_request_offset(CallRef, Timeout) of
    {ok, _} -> ok;
    Else -> Else
  end.

%% @equiv sync_produce_request_offset(CallRef, infinity)
-spec sync_produce_request_offset(call_ref()) ->
        {ok, offset()} | {error, Reason :: any()}.
sync_produce_request_offset(CallRef) ->
  sync_produce_request_offset(CallRef, infinity).

%% @doc As {@link sync_produce_request/2}, but also returning assigned offset.
%%
%% See @{link produce_sync_offset/5}.
-spec sync_produce_request_offset(call_ref(), timeout()) ->
        {ok, offset()} | {error, Reason :: any()}.
sync_produce_request_offset(CallRef, Timeout) ->
  brod_producer:sync_produce_request(CallRef, Timeout).

%% @doc Subscribe to a data stream from the given topic-partition.
%%
%% A client has to be already started (by calling {@link start_client/3},
%% one client per multiple topics is enough) and a corresponding consumer
%% for the topic and partition as well (by calling {@link start_consumer/3}),
%% before calling this function.
%%
%% Caller may specify a set of options extending consumer config.
%% See {@link brod_consumer:subscribe/3} for more info on that.
%%
%% If `{error, Reason}' is returned, the caller should perhaps retry later.
%%
%% `{ok, ConsumerPid}' is returned on success. The caller may want to
%% monitor the consumer pid and re-subscribe should the `ConsumerPid' crash.
%%
%% Upon successful subscription the subscriber process should expect messages
%% of pattern:
%% `{ConsumerPid, #kafka_message_set{}}' and
%% `{ConsumerPid, #kafka_fetch_error{}}'.
%%
%% `-include_lib("brod/include/brod.hrl")' to access the records.
%%
%% In case `#kafka_fetch_error{}' is received the subscriber should
%% re-subscribe itself to resume the data stream.
%%
%% To provide a mechanism to handle backpressure, brod requires all messages
%% sent to a subscriber to be acked by calling {@link consume_ack/4} after
%% they are processed. If there are too many not-acked messages received by
%% the subscriber, the consumer will stop to fetch new ones so the subscriber
%% won't get overwhelmed.
%%
%% Only one process can be subscribed to a consumer. This means that if
%% you want to read at different places (or at different paces), you have
%% to create separate consumers (and thus also separate clients).
-spec subscribe(client(), pid(), topic(), partition(),
                consumer_config()) -> {ok, pid()} | {error, any()}.
subscribe(Client, SubscriberPid, Topic, Partition, Options) ->
  case brod_client:get_consumer(Client, Topic, Partition) of
    {ok, ConsumerPid} ->
      case subscribe(ConsumerPid, SubscriberPid, Options) of
        ok    -> {ok, ConsumerPid};
        Error -> Error
      end;
    {error, Reason} ->
      {error, Reason}
  end.

%% @doc Subscribe to a data stream from the given consumer.
%%
%% See {@link subscribe/5} for more information.
-spec subscribe(pid(), pid(), consumer_config()) -> ok | {error, any()}.
subscribe(ConsumerPid, SubscriberPid, Options) ->
  brod_consumer:subscribe(ConsumerPid, SubscriberPid, Options).

%% @doc Unsubscribe the current subscriber.
%%
%% Assuming the subscriber is %% `self()'.
-spec unsubscribe(client(), topic(), partition()) -> ok | {error, any()}.
unsubscribe(Client, Topic, Partition) ->
  unsubscribe(Client, Topic, Partition, self()).

%% @doc Unsubscribe the current subscriber.
-spec unsubscribe(client(), topic(), partition(), pid()) -> ok | {error, any()}.
unsubscribe(Client, Topic, Partition, SubscriberPid) ->
  case brod_client:get_consumer(Client, Topic, Partition) of
    {ok, ConsumerPid} -> unsubscribe(ConsumerPid, SubscriberPid);
    Error             -> Error
  end.

%% @doc Unsubscribe the current subscriber.
%%
%% Assuming the subscriber is %% `self()'.
-spec unsubscribe(pid()) -> ok | {error, any()}.
unsubscribe(ConsumerPid) ->
  unsubscribe(ConsumerPid, self()).

%% @doc Unsubscribe the current subscriber.
-spec unsubscribe(pid(), pid()) -> ok | {error, any()}.
unsubscribe(ConsumerPid, SubscriberPid) ->
  brod_consumer:unsubscribe(ConsumerPid, SubscriberPid).

%% @doc Acknowledge that one or more messages have been processed.
%%
%% {@link brod_consumer} sends message-sets to the subscriber process, and keep
%% the messages in a 'pending' queue.
%% The subscriber may choose to ack any received offset.
%% Acknowledging a greater offset will automatically acknowledge
%% the messages before this offset.
%% For example, if message `[1, 2, 3, 4]' have been sent to (as one or more message-sets)
%% to the subscriber, the subscriber may acknowledge with offset `3' to indicate that
%% the first three messages are successfully processed, leaving behind only message `4'
%% pending.
%%
%%
%% The 'pending' queue has a size limit (see `prefetch_count' consumer config)
%% which is to provide a mechanism to handle back-pressure.
%% If there are too many messages pending on ack, the consumer will stop
%% fetching new ones so the subscriber won't get overwhelmed.
%%
%% Note, there is no range check done for the acknowledging offset, meaning if offset `[M, N]'
%% are pending to be acknowledged, acknowledging with `Offset > N' will cause all offsets to be
%% removed from the pending queue, and acknowledging with `Offset < M' has no effect.
%%
%% Use this function only with plain partition subscribers (i.e., when you
%% manually call {@link subscribe/5}). Behaviours like
%% {@link brod_topic_subscriber} have their own way how to ack messages.
-spec consume_ack(client(), topic(), partition(), offset()) ->
        ok | {error, any()}.
consume_ack(Client, Topic, Partition, Offset) ->
  case brod_client:get_consumer(Client, Topic, Partition) of
    {ok, ConsumerPid} -> consume_ack(ConsumerPid, Offset);
    {error, Reason}   -> {error, Reason}
  end.

%% @equiv brod_consumer:ack(ConsumerPid, Offset)
%% @doc See {@link consume_ack/4} for more information.
-spec consume_ack(pid(), offset()) -> ok | {error, any()}.
consume_ack(ConsumerPid, Offset) ->
  brod_consumer:ack(ConsumerPid, Offset).

%% @see brod_group_subscriber:start_link/7
-spec start_link_group_subscriber(
        client(), group_id(), [topic()],
        group_config(), consumer_config(), module(), term()) ->
          {ok, pid()} | {error, any()}.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig,
                            ConsumerConfig, CbModule, CbInitArg) ->
  brod_group_subscriber:start_link(Client, GroupId, Topics, GroupConfig,
                                   ConsumerConfig, CbModule, CbInitArg).

%% @doc Start group_subscriber_v2.
-spec start_link_group_subscriber_v2(
        brod_group_subscriber_v2:subscriber_config()
       ) -> {ok, pid()} | {error, any()}.
start_link_group_subscriber_v2(Config) ->
  brod_group_subscriber_v2:start_link(Config).

%% @see brod_group_subscriber:start_link/8
-spec start_link_group_subscriber(
        client(), group_id(), [topic()], group_config(),
        consumer_config(), message | message_set,
        module(), term()) ->
          {ok, pid()} | {error, any()}.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig,
                            ConsumerConfig, MessageType,
                            CbModule, CbInitArg) ->
  brod_group_subscriber:start_link(Client, GroupId, Topics, GroupConfig,
                                   ConsumerConfig, MessageType,
                                   CbModule, CbInitArg).

%% @equiv start_link_topic_subscriber(Client, Topic, 'all', ConsumerConfig,
%%                                    CbModule, CbInitArg)
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
        client(), topic(), consumer_config(), module(), term()) ->
          {ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, ConsumerConfig,
                            CbModule, CbInitArg) ->
  start_link_topic_subscriber(Client, Topic, all, ConsumerConfig,
                              CbModule, CbInitArg).

%% @equiv start_link_topic_subscriber(Client, Topic, Partitions,
%%                                    ConsumerConfig, message,
%%                                    CbModule, CbInitArg)
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
        client(), topic(), all | [partition()],
        consumer_config(), module(), term()) ->
          {ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, Partitions,
                            ConsumerConfig, CbModule, CbInitArg) ->
  start_link_topic_subscriber(Client, Topic, Partitions,
                              ConsumerConfig, message, CbModule, CbInitArg).

%% @see brod_topic_subscriber:start_link/7
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
        client(), topic(), all | [partition()],
        consumer_config(), message | message_set,
        module(), term()) ->
          {ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, Partitions,
                            ConsumerConfig, MessageType, CbModule, CbInitArg) ->
  brod_topic_subscriber:start_link(Client, Topic, Partitions,
                                   ConsumerConfig, MessageType,
                                   CbModule, CbInitArg).

%% @see brod_topic_subscriber:start_link/1
-spec start_link_topic_subscriber(
        brod_topic_subscriber:topic_subscriber_config()
       ) -> {ok, pid()} | {error, any()}.
start_link_topic_subscriber(Config) ->
  brod_topic_subscriber:start_link(Config).

%% @equiv create_topics(Hosts, TopicsConfigs, RequestConfigs, [])
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32()}) ->
        ok | {error, any()}.
create_topics(Hosts, TopicConfigs, RequestConfigs) ->
  brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs).

%% @doc Create topic(s) in kafka.
%%
%% `TopicConfigs' is a list of topic configurations.
%% A topic configuration is a map (or tuple list for backward compatibility)
%% with the following keys (all of them are reuired):
%%  <ul>
%%    <li>`name'
%%
%%      The topic name.</li>
%%
%%    <li>`num_partitions'
%%
%%      The number of partitions to create in the topic, or -1 if we are
%%      either specifying a manual partition assignment or using the default
%%      partitions.</li>
%%
%%    <li>`replication_factor'
%%
%%      The number of replicas to create for each partition in the topic,
%%      or -1 if we are either specifying a manual partition assignment
%%      or using the default replication factor.</li>
%%
%%    <li>`assignments'
%%
%%      The manual partition assignment, or the empty list if we let Kafka
%%      automatically assign them. It is a list of maps (or tuple lists) with the
%%      following keys: `partition_index' and `broker_ids' (a list of of brokers to
%%      place the partition on).</li>
%%
%%    <li>`configs'
%%
%%      The custom topic configurations to set. It is a list of of maps (or
%%      tuple lists) with keys `name' and `value'. You can find possible
%%      options in the Kafka documentation.</li>
%% </ul>
%%
%% Example:
%% ```
%% > TopicConfigs = [
%%     #{
%%       name => <<"my_topic">>,
%%       num_partitions => 1,
%%       replication_factor => 1,
%%       assignments => [],
%%       configs => [ #{name  => <<"cleanup.policy">>, value => "compact"}]
%%     }
%%   ].
%% > brod:create_topics([{"localhost", 9092}], TopicConfigs, #{timeout => 1000}, []).
%% ok
%% '''
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32()},
                    conn_config()) ->
        ok | {error, any()}.
create_topics(Hosts, TopicConfigs, RequestConfigs, Options) ->
  brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs, Options).

%% @equiv delete_topics(Hosts, Topics, Timeout, [])
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
        ok | {error, any()}.
delete_topics(Hosts, Topics, Timeout) ->
  brod_utils:delete_topics(Hosts, Topics, Timeout).

%% @doc Delete topic(s) from kafka.
%%
%% Example:
%% ```
%% > brod:delete_topics([{"localhost", 9092}], ["my_topic"], 5000, []).
%% ok
%% '''
-spec delete_topics([endpoint()], [topic()], pos_integer(), conn_config()) ->
        ok | {error, any()}.
delete_topics(Hosts, Topics, Timeout, Options) ->
  brod_utils:delete_topics(Hosts, Topics, Timeout, Options).

%% @doc Fetch broker metadata for all topics.
%%
%% See {@link get_metadata/3} for more information.
-spec get_metadata([endpoint()]) -> {ok, kpro:struct()} | {error, any()}.
get_metadata(Hosts) ->
  brod_utils:get_metadata(Hosts).

%% @doc Fetch broker metadata for the given topics.
%%
%% See {@link get_metadata/3} for more information.
-spec get_metadata([endpoint()], all | [topic()]) ->
        {ok, kpro:struct()} | {error, any()}.
get_metadata(Hosts, Topics) ->
  brod_utils:get_metadata(Hosts, Topics).

%% @doc Fetch broker metadata for the given topics using the given connection options.
%%
%% The response differs in each version of the `Metadata' API call.
%% The last supported `Metadata' API version is 2, so this will be
%% probably used (if your Kafka supports it too). See
%% <a href="https://github.com/kafka4beam/kafka_protocol/blob/master/priv/kafka.bnf">kafka.bnf</a>
%% (search for `MetadataResponseV2') for response schema with comments.
%%
%% Beware that when `auto.create.topics.enable' is set to true in
%% the broker configuration, fetching metadata with a concrete
%% topic specified (in the `Topics' parameter) may cause creation of
%% the topic when it does not exist. If you want a safe `get_metadata'
%% call, always pass `all' as `Topics' and then filter them.
%%
%%
%% ```
%% > brod:get_metadata([{"localhost", 9092}], [<<"my_topic">>], []).
%% {ok,#{brokers =>
%%           [#{host => <<"localhost">>,node_id => 1,port => 9092,
%%              rack => <<>>}],
%%       cluster_id => <<"jTb2faMLRf6p21yD1y3v-A">>,
%%       controller_id => 1,
%%       topics =>
%%           [#{error_code => no_error,is_internal => false,
%%              name => <<"my_topic">>,
%%              partitions =>
%%                  [#{error_code => no_error,
%%                     isr_nodes => [1],
%%                     leader_id => 1,partition_index => 1,
%%                     replica_nodes => [1]},
%%                   #{error_code => no_error,
%%                     isr_nodes => [1],
%%                     leader_id => 1,partition_index => 0,
%%                     replica_nodes => [1]}]}]}}
%% '''
-spec get_metadata([endpoint()], all | [topic()], conn_config()) ->
        {ok, kpro:struct()} | {error, any()}.
get_metadata(Hosts, Topics, Options) ->
  brod_utils:get_metadata(Hosts, Topics, Options).

%% @equiv resolve_offset(Hosts, Topic, Partition, latest, [])
-spec resolve_offset([endpoint()], topic(), partition()) ->
        {ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition) ->
  resolve_offset(Hosts, Topic, Partition, ?OFFSET_LATEST).

%% @equiv resolve_offset(Hosts, Topic, Partition, Time, [])
-spec resolve_offset([endpoint()], topic(), partition(), offset_time()) ->
        {ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time) ->
  resolve_offset(Hosts, Topic, Partition, Time, []).

%% @doc Resolve semantic offset or timestamp to real offset.
%%
%% The same as {@link resolve_offset/6} but the timeout is
%% extracted from connection config.
-spec resolve_offset([endpoint()], topic(), partition(),
                     offset_time(), conn_config()) ->
        {ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg) ->
  brod_utils:resolve_offset(Hosts, Topic, Partition, Time, ConnCfg).

%% @doc Resolve semantic offset or timestamp to real offset.
%%
%% The function returns the offset of the first message
%% with the given timestamp, or of the first message after
%% the given timestamp (in case no message matches the
%% timestamp exactly), or -1 if the timestamp is newer
%% than (>) all messages in the topic.
%%
%% You can also use two semantic offsets instead of
%% a timestamp: `earliest' gives you the offset of the
%% first message in the topic and `latest' gives you
%% the offset of the last message incremented by 1.
%%
%% If the topic is empty, both `earliest' and `latest'
%% return the same value (which is 0 unless some messages
%% were deleted from the topic), and any timestamp returns
%% -1.
%%
%% An example for illustration:
%% ```
%% Messages:
%% offset       0   1   2   3
%% timestamp    10  20  20  30
%%
%% Calls:
%% resolve_offset(Endpoints, Topic, Partition, 5)  0
%% resolve_offset(Endpoints, Topic, Partition, 10)  0
%% resolve_offset(Endpoints, Topic, Partition, 13)  1
%% resolve_offset(Endpoints, Topic, Partition, 20)  1
%% resolve_offset(Endpoints, Topic, Partition, 31)  -1
%% resolve_offset(Endpoints, Topic, Partition, earliest)  0
%% resolve_offset(Endpoints, Topic, Partition, latest)  4
%% '''
-spec resolve_offset([endpoint()], topic(), partition(),
                     offset_time(), conn_config(),
                      #{timeout => kpro:int32()}) ->
        {ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts) ->
  brod_utils:resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts).

%% @doc Fetch a single message set from the given topic-partition.
%%
%% Calls {@link fetch/5} with the default options: `max_wait_time' = 1 second,
%% `min_bytes' = 1 B, and `max_bytes' = 2^20 B (1 MB).
%%
%% See {@link fetch/5} for more information.
-spec fetch(connection() | client_id() | bootstrap(),
            topic(), partition(), integer()) ->
              {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
fetch(ConnOrBootstrap, Topic, Partition, Offset) ->
  Opts = #{ max_wait_time => 1000
          , min_bytes => 1
          , max_bytes => 1 bsl 20
          },
  fetch(ConnOrBootstrap, Topic, Partition, Offset, Opts).

%% @doc Fetch a single message set from the given topic-partition.
%%
%% The first arg can either be an already established connection to leader,
%% or `{Endpoints, ConnConfig}' (or just `Endpoints') so to establish a new
%% connection before fetch.
%%
%% The fourth argument is the start offset of the query. Messages with offset
%% greater or equal will be fetched.
%%
%% You can also pass options for the fetch query.
%% See the {@link kpro_req_lib:fetch_opts()} type for their documentation.
%% Only `max_wait_time', `min_bytes', `max_bytes', and `isolation_level'
%% options are currently supported. The defaults are the same as documented
%% in the linked type, except for `min_bytes' which defaults to 1 in `brod'.
%% Note that `max_bytes' will be rounded up so that full messages are
%% retrieved. For example, if you specify `max_bytes = 42' and there
%% are three messages of size 40 bytes, two of them will be fetched.
%%
%% On success, the function returns the messages along with the <i>last stable
%% offset</i> (when using `read_committed' mode, the last committed offset) or the
%% <i>high watermark offset</i> (offset of the last message that was successfully
%% copied to all replicas, incremented by 1), whichever is lower. In essence, this
%% is the offset up to which it was possible to read the messages at the time of
%% fetching. This is similar to what {@link resolve_offset/6} with `latest'
%% returns. You can use this information to determine how far from the end of the
%% topic you currently are. Note that when you use this offset as the start offset
%% for a subseuqent call, an empty list of messages will be returned (assuming the
%% topic hasn't changed, e.g. no new message arrived). Only when you use an offset
%% greater than this one, `{error, offset_out_of_range}' will be returned.
%%
%% Note also that Kafka batches messages in a message set only up to the end of
%% a topic segment in which the first retrieved message is, so there may actually
%% be more messages behind the last fetched offset even if the fetched size is
%% significantly less than `max_bytes' provided in `fetch_opts()'.
%% See <a href="https://github.com/kafka4beam/brod/issues/251">this issue</a>
%% for more details.
%%
%% Example (the topic has only two messages):
%% ```
%% > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 0, #{max_bytes => 1024}).
%% {ok,{2,
%%      [{kafka_message,0,<<"some_key">>,<<"Hello world!">>,
%%                      create,1663940976473,[]},
%%       {kafka_message,1,<<"another_key">>,<<"This is a message with offset 1.">>,
%%                      create,1663940996335,[]}]}}
%%
%% > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 2, #{max_bytes => 1024}).
%% {ok,{2,[]}}
%%
%% > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 3, #{max_bytes => 1024}).
%% {error,offset_out_of_range}
%% '''
-spec fetch(connection() | client_id() | bootstrap(),
            topic(), partition(), offset(), fetch_opts()) ->
              {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
fetch(ConnOrBootstrap, Topic, Partition, Offset, Opts) ->
  brod_utils:fetch(ConnOrBootstrap, Topic, Partition, Offset, Opts).

%% @doc Fold through messages in a partition.
%%
%% Works like `lists:foldl/2' but with below stop conditions:
%% <ul>
%% <li> Always return after reach high watermark offset </li>
%% <li> Return after the given message count limit is reached </li>
%% <li> Return after the given kafka offset is reached </li>
%% <li> Return if the `FoldFun' returns an `{error, Reason}' tuple </li>
%% </ul>
%%
%% NOTE: Exceptions from evaluating `FoldFun' are not caught.
-spec fold(connection() | client_id() | bootstrap(),
           topic(), partition(), offset(), fetch_opts(),
           Acc, fold_fun(Acc), fold_limits()) ->
             fold_result() when Acc :: fold_acc().
fold(Bootstrap, Topic, Partition, Offset, Opts, Acc, Fun, Limits) ->
  brod_utils:fold(Bootstrap, Topic, Partition, Offset, Opts, Acc, Fun, Limits).

%% @equiv fetch(Hosts, Topic, Partition, Offset, Wait, MinBytes, MaxBytes, [])
%% @deprecated Please use {@link fetch/5} instead
-spec fetch([endpoint()], topic(), partition(), offset(),
            non_neg_integer(), non_neg_integer(), pos_integer()) ->
               {ok, [message()]} | {error, any()}.
fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes) ->
  fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes, []).

%% @doc Fetch a single message set from the given topic-partition.
%% @deprecated Please use {@link fetch/5} instead
-spec fetch([endpoint()], topic(), partition(), offset(),
            non_neg_integer(), non_neg_integer(), pos_integer(),
            conn_config()) -> {ok, [message()]} | {error, any()}.
fetch(Hosts, Topic, Partition, Offset,
      MaxWaitTime, MinBytes, MaxBytes, ConnConfig) ->
  FetchOpts = #{ max_wait_time => MaxWaitTime
               , min_bytes => MinBytes
               , max_bytes => MaxBytes
               },
  case fetch({Hosts, ConnConfig}, Topic, Partition, Offset, FetchOpts) of
    {ok, {_HwOffset, Batch}} -> {ok, Batch}; %% backward compatible
    {error, Reason} -> {error, Reason}
  end.

%% @doc Connect partition leader.
-spec connect_leader([endpoint()], topic(), partition(),
                     conn_config()) -> {ok, pid()}.
connect_leader(Hosts, Topic, Partition, ConnConfig) ->
  KproOptions = brod_utils:kpro_connection_options(ConnConfig),
  kpro:connect_partition_leader(Hosts, ConnConfig, Topic, Partition, KproOptions).

%% @doc List ALL consumer groups in the given kafka cluster.
%%
%% NOTE: Exception if failed to connect any of the coordinator brokers.
-spec list_all_groups([endpoint()], conn_config()) ->
        [{endpoint(), [cg()] | {error, any()}}].
list_all_groups(Endpoints, ConnCfg) ->
  brod_utils:list_all_groups(Endpoints, ConnCfg).

%% @doc List consumer groups in the given group coordinator broker.
-spec list_groups(endpoint(), conn_config()) -> {ok, [cg()]} | {error, any()}.
list_groups(CoordinatorEndpoint, ConnCfg) ->
  brod_utils:list_groups(CoordinatorEndpoint, ConnCfg).

%% @doc Describe consumer groups.
%%
%% The given consumer group IDs should be all
%% managed by the coordinator-broker running at the given endpoint.
%% Otherwise error codes will be returned in the result structs.
%% Return `describe_groups' response body field named `groups'.
%% See `kpro_schema.erl' for struct details.
-spec describe_groups(endpoint(), conn_config(), [group_id()]) ->
        {ok, [kpro:struct()]} | {error, any()}.
describe_groups(CoordinatorEndpoint, ConnCfg, IDs) ->
  brod_utils:describe_groups(CoordinatorEndpoint, ConnCfg, IDs).

%% @doc Connect to consumer group coordinator broker.
%%
%% Done in steps: <ol>
%% <li>Connect to any of the given bootstrap ednpoints</li>
%%
%% <li>Send group_coordinator_request to resolve group coordinator
%% endpoint</li>
%%
%% <li>Connect to the resolved endpoint and return the connection
%% pid</li></ol>
-spec connect_group_coordinator([endpoint()], conn_config(), group_id()) ->
        {ok, pid()} | {error, any()}.
connect_group_coordinator(BootstrapEndpoints, ConnCfg, GroupId) ->
  KproOptions = brod_utils:kpro_connection_options(ConnCfg),
  Args = maps:merge(KproOptions, #{type => group, id => GroupId}),
  kpro:connect_coordinator(BootstrapEndpoints, ConnCfg, Args).

%% @doc Fetch committed offsets for ALL topics in the given consumer group.
%%
%% Return the `responses' field of the `offset_fetch' response.
%% See `kpro_schema.erl' for struct details.
-spec fetch_committed_offsets([endpoint()], conn_config(), group_id()) ->
        {ok, [kpro:struct()]} | {error, any()}.
fetch_committed_offsets(BootstrapEndpoints, ConnCfg, GroupId) ->
  brod_utils:fetch_committed_offsets(BootstrapEndpoints, ConnCfg, GroupId, []).

%% @doc Same as @{link fetch_committed_offsets/3},
%% but works with a started `brod_client'
-spec fetch_committed_offsets(client(), group_id()) ->
        {ok, [kpro:struct()]} | {error, any()}.
fetch_committed_offsets(Client, GroupId) ->
  brod_utils:fetch_committed_offsets(Client, GroupId, []).

-ifdef(build_brod_cli).
main(X) -> brod_cli:main(X).
-endif.

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End: