src/brod_kafka_request.erl

%%%
%%%   Copyright (c) 2017-2021 Klarna Bank AB (publ)
%%%
%%%   Licensed under the Apache License, Version 2.0 (the "License");
%%%   you may not use this file except in compliance with the License.
%%%   You may obtain a copy of the License at
%%%
%%%       http://www.apache.org/licenses/LICENSE-2.0
%%%
%%%   Unless required by applicable law or agreed to in writing, software
%%%   distributed under the License is distributed on an "AS IS" BASIS,
%%%   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%%   See the License for the specific language governing permissions and
%%%   limitations under the License.
%%%

%% @doc Helper functions for building request messages.
-module(brod_kafka_request).

-export([ create_topics/3
        , delete_topics/3
        , fetch/8
        , list_groups/1
        , list_offsets/4
        , join_group/2
        , metadata/2
        , offset_commit/2
        , offset_fetch/3
        , produce/7
        , sync_group/2
        ]).

-include("brod_int.hrl").

-type api() :: brod_kafka_apis:api().
-type vsn() :: brod_kafka_apis:vsn().
-type topic() :: brod:topic().
-type topic_config() :: kpro:struct().
-type partition() :: brod:partition().
-type offset() :: brod:offset().
-type conn() :: kpro:connection().


%% @doc Make a produce request, If the first arg is a connection pid, call
%% `brod_kafka_apis:pick_version/2' to resolve version.
-spec produce(conn() | vsn(), topic(), partition(),
              kpro:batch_input(), integer(), integer(),
              brod:compression()) -> kpro:req().
produce(MaybePid, Topic, Partition, BatchInput,
        RequiredAcks, AckTimeout, Compression) ->
  Vsn = pick_version(produce, MaybePid),
  kpro_req_lib:produce(Vsn, Topic, Partition, BatchInput,
                       #{ required_acks => RequiredAcks
                        , ack_timeout => AckTimeout
                        , compression => Compression
                        }).

%% @doc Make a create_topics request.
-spec create_topics(vsn() | conn(), [topic_config()], #{timeout => kpro:int32(),
                    validate_only => boolean()}) -> kpro:req().
create_topics(Connection, TopicConfigs, RequestConfigs)
              when is_pid(Connection) ->
  Vsn = brod_kafka_apis:pick_version(Connection, create_topics),
  create_topics(Vsn, TopicConfigs, RequestConfigs);
create_topics(Vsn, TopicConfigs, RequestConfigs) ->
  kpro_req_lib:create_topics(Vsn, TopicConfigs, RequestConfigs).

%% @doc Make a delete_topics request.
-spec delete_topics(vsn() | conn(), [topic()], pos_integer()) -> kpro:req().
delete_topics(Connection, Topics, Timeout) when is_pid(Connection) ->
  Vsn = brod_kafka_apis:pick_version(Connection, delete_topics),
  delete_topics(Vsn, Topics, Timeout);
delete_topics(Vsn, Topics, Timeout) ->
  kpro_req_lib:delete_topics(Vsn, Topics, #{timeout => Timeout}).

%% @doc Make a fetch request, If the first arg is a connection pid, call
%% `brod_kafka_apis:pick_version/2' to resolve version.
-spec fetch(conn(), topic(), partition(), offset(),
            kpro:wait(), kpro:count(), kpro:count(),
            kpro:isolation_level()) -> kpro:req().
fetch(Pid, Topic, Partition, Offset,
      WaitTime, MinBytes, MaxBytes, IsolationLevel) ->
  Vsn = pick_version(fetch, Pid),
  kpro_req_lib:fetch(Vsn, Topic, Partition, Offset,
                     #{ max_wait_time => WaitTime
                      , min_bytes => MinBytes
                      , max_bytes => MaxBytes
                      , isolation_level => IsolationLevel
                      }).

%% @doc Make a `list_offsets' request message for offset resolution.
%% In kafka protocol, -2 and -1 are semantic 'time' to request for
%% 'earliest' and 'latest' offsets.
%% In brod implementation, -2, -1, 'earliest' and 'latest'
%% are semantic 'offset', this is why often a variable named
%% Offset is used as the Time argument.
-spec list_offsets(conn(), topic(), partition(), brod:offset_time()) ->
        kpro:req().
list_offsets(Connection, Topic, Partition, TimeOrSemanticOffset) ->
  Time = ensure_integer_offset_time(TimeOrSemanticOffset),
  Vsn = pick_version(list_offsets, Connection),
  kpro_req_lib:list_offsets(Vsn, Topic, Partition, Time).

%% @doc Make a metadata request.
-spec metadata(vsn() | conn(), all | [topic()]) -> kpro:req().
metadata(Connection, Topics) when is_pid(Connection) ->
  Vsn = brod_kafka_apis:pick_version(Connection, metadata),
  metadata(Vsn, Topics);
metadata(Vsn, Topics) ->
  kpro_req_lib:metadata(Vsn, Topics).

%% @doc Make a offset fetch request.
%% NOTE: empty topics list only works for kafka 0.10.2.0 or later
-spec offset_fetch(conn(), brod:group_id(), Topics) -> kpro:req()
        when Topics :: [{topic(), [partition()]}].
offset_fetch(Connection, GroupId, Topics0) ->
  Topics =
    lists:map(
      fun({Topic, Partitions}) ->
        [ {name, Topic}
        , {partition_indexes, Partitions}
        ]
      end, Topics0),
  Body = [ {group_id, GroupId}
         , {topics, case Topics of
                      [] -> ?kpro_null;
                      _  -> Topics
                    end}
         ],
  Vsn = pick_version(offset_fetch, Connection),
  kpro:make_request(offset_fetch, Vsn, Body).

%% @doc Make a `list_groups' request.
-spec list_groups(conn()) -> kpro:req().
list_groups(Connection) ->
  Vsn = pick_version(list_groups, Connection),
  case Vsn >= 3 of
    true ->
      kpro:make_request(list_groups, Vsn, #{tagged_fields => []});
    false ->
      kpro:make_request(list_groups, Vsn, [])
  end.

%% @doc Make a `join_group' request.
-spec join_group(conn(), kpro:struct()) -> kpro:req().
join_group(Conn, Fields) ->
  make_req(join_group, Conn, Fields).

%% @doc Make a `sync_group' request.
-spec sync_group(conn(), kpro:struct()) -> kpro:req().
sync_group(Conn, Fields) ->
  make_req(sync_group, Conn, Fields).

%% @doc Make a `offset_commit' request.
-spec offset_commit(conn(), kpro:struct()) -> kpro:req().
offset_commit(Conn, Fields) ->
  make_req(offset_commit, Conn, Fields).

%%%_* Internal Functions =======================================================

make_req(API, Conn, Fields) when is_pid(Conn) ->
  Vsn = pick_version(API, Conn),
  make_req(API, Vsn, Fields);
make_req(API, Vsn, Fields) ->
  kpro:make_request(API, Vsn, Fields).

-spec pick_version(api(), pid()) -> vsn().
pick_version(_API, Vsn) when is_integer(Vsn) -> Vsn;
pick_version(API, Connection) when is_pid(Connection) ->
  brod_kafka_apis:pick_version(Connection, API);
pick_version(API, _) ->
  brod_kafka_apis:default_version(API).

-spec ensure_integer_offset_time(brod:offset_time()) -> integer().
ensure_integer_offset_time(?OFFSET_EARLIEST)     -> -2;
ensure_integer_offset_time(?OFFSET_LATEST)       -> -1;
ensure_integer_offset_time(T) when is_integer(T) -> T.

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End: