%%% src/brod_group_subscriber.erl

%%%
%%%   Copyright (c) 2016-2021 Klarna Bank AB (publ)
%%%
%%%   Licensed under the Apache License, Version 2.0 (the "License");
%%%   you may not use this file except in compliance with the License.
%%%   You may obtain a copy of the License at
%%%
%%%       http://www.apache.org/licenses/LICENSE-2.0
%%%
%%%   Unless required by applicable law or agreed to in writing, software
%%%   distributed under the License is distributed on an "AS IS" BASIS,
%%%   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%%   See the License for the specific language governing permissions and
%%%   limitations under the License.
%%%

%%%=============================================================================
%%% @doc
%%% A group subscriber is a gen_server which subscribes to partition consumers
%%% (poller) and calls the user-defined callback functions for message
%%% processing.
%%%
%%% An overview of what it does behind the scene:
%%% <ol>
%%% <li>Start a consumer group coordinator to manage the consumer group states,
%%%     see {@link brod_group_coordinator:start_link/6}</li>
%%% <li>Start (if not already started) topic-consumers (pollers) and subscribe
%%%     to the partition workers when group assignment is received from the
%%%     group leader, see {@link brod:start_consumer/3}</li>
%%% <li>Call `CallbackModule:handle_message/4' when messages are received from
%%%     the partition consumers.</li>
%%% <li>Send acknowledged offsets to group coordinator which will be committed
%%%     to kafka periodically.</li>
%%% </ol>
%%% @end
%%%=============================================================================

-module(brod_group_subscriber).

-behaviour(gen_server).
-behaviour(brod_group_member).

-export([ ack/4
        , ack/5
        , commit/1
        , commit/4
        , start_link/7
        , start_link/8
        , stop/1
        ]).

%% callbacks for brod_group_coordinator
-export([ get_committed_offsets/2
        , assignments_received/4
        , assignments_revoked/1
        , assign_partitions/3
        , user_data/1
        ]).

-export([ code_change/3
        , handle_call/3
        , handle_cast/2
        , handle_info/2
        , init/1
        , terminate/2
        ]).

-include("brod_int.hrl").

-type cb_state() :: term().
-type member_id() :: brod:group_member_id().

%% Initialize the callback module's state.
%% Called once while the subscriber process initializes, with the group ID
%% and the `CbInitArg' that was given to `start_link/7,8'.
%% Must return `{ok, CbState}'.
-callback init(brod:group_id(), term()) -> {ok, cb_state()}.

%% @doc Handle a message. Return one of:
%%
%% `{ok, NewCallbackState}':
%%   The subscriber has received the message for asynchronous processing.
%%   It should call {@link brod_group_subscriber:ack/4} to acknowledge later.
%%
%% `{ok, ack, NewCallbackState}':
%%   The subscriber has completed processing the message. The offset is
%%   acknowledged and passed on to the group coordinator for commit.
%%
%% `{ok, ack_no_commit, NewCallbackState}':
%%   The subscriber has completed processing the message, but it
%%   is not ready to commit offset yet. It should call
%%   {@link brod_group_subscriber:commit/4} later.
%%
%% Any other return value makes the subscriber process raise a
%% `bad_return_value' error.
%%
%% The third argument is a single message when the subscriber was started
%% with message type `message', and the whole message set when it was
%% started with message type `message_set'.
%%
%% While this callback function is being evaluated, the fetch-ahead
%% partition-consumers are fetching more messages behind the scene
%% unless prefetch_count and prefetch_bytes are set to 0 in consumer config.
%% @end
-callback handle_message(brod:topic(),
                         brod:partition(),
                         brod:message() | brod:message_set(),
                         cb_state()) -> {ok, cb_state()} |
                                        {ok, ack, cb_state()} |
                                        {ok, ack_no_commit, cb_state()}.

%% This callback is called only when the subscriber commits offsets locally
%% (e.g. to local storage or a database) instead of to kafka.
%% Return {ok, Offsets, cb_state()} where Offsets can be [],
%% or only the ones that are found in e.g. local storage or database.
%% For the topic-partitions which have no committed offset found,
%% the consumer will take 'begin_offset' in consumer config as the start point
%% of data stream. If 'begin_offset' is not found in consumer config, the
%% default value -1 (latest) is used.
%
% commented out as it's an optional callback
%-callback get_committed_offsets(brod:group_id(),
%                                [{brod:topic(), brod:partition()}],
%                                cb_state()) ->  {ok,
%                                                 [{{brod:topic(),
%                                                    brod:partition()},
%                                                   brod:offset()}],
%                                                 cb_state()}.
%
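%% This function is called only when 'partition_assignment_strategy' is
%% 'callback_implemented' in group config.
%% The first element in the group member list is guaranteed to be the group
%% leader.
%% The callback may return either the assignment list alone, or
%% {NewCbState, Assignments} to update the callback state as well.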
%
% commented out as it's an optional callback
%-callback assign_partitions([brod:group_member()],
%                            [{brod:topic(), brod:partition()}],
%                            cb_state()) -> [{brod:group_member_id(),
%                                             [brod:partition_assignment()]}].

-define(DOWN(Reason), {down, brod_utils:os_time_utc_str(), Reason}).

%% Bookkeeping for one assigned topic-partition and the partition consumer
%% process this subscriber subscribes to for it.
-record(consumer,
        { topic_partition :: {brod:topic(), brod:partition()}
        , consumer_pid    :: ?undef                  %% initial state
                           | pid()                   %% normal state
                           | {down, string(), any()} %% consumer restarting
          %% Monitor reference on consumer_pid; ?undef while there is no
          %% live subscription.
        , consumer_mref   :: ?undef | reference()
          %% Begin offset that came with the group assignment (if any).
        , begin_offset    :: ?undef | brod:offset()
          %% Last offset that was acknowledged together with a commit.
        , acked_offset    :: ?undef | brod:offset()
          %% Offset of the last message in the most recent delivery from
          %% the partition consumer.
        , last_offset     :: ?undef | brod:offset()
        }).

-type consumer() :: #consumer{}.

-type ack_ref() :: {brod:topic(), brod:partition(), brod:offset()}.

%% gen_server state of the group subscriber.
-record(state,
        { client             :: brod:client()
          %% Monitor on the brod client; the subscriber stops with reason
          %% client_down when it fires.
        , client_mref        :: reference()
        , groupId            :: brod:group_id()
          %% memberId and generationId are set each time new assignments
          %% are received from the group coordinator.
        , memberId           :: ?undef | member_id()
        , generationId       :: ?undef | brod:group_generation_id()
          %% Pid of the group coordinator started in init/1.
        , coordinator        :: pid()
          %% One entry per currently assigned topic-partition.
        , consumers = []     :: [consumer()]
          %% Passed to brod:start_consumer/3 for every assigned topic.
        , consumer_config    :: brod:consumer_config()
          %% true from the moment assignments are revoked until new
          %% assignments arrive; periodic (re)subscription is skipped
          %% while it is set.
        , is_blocked = false :: boolean()
          %% Reference of the pending (re)subscribe timer.
        , subscribe_tref     :: ?undef | reference()
        , cb_module          :: module()
        , cb_state           :: cb_state()
          %% Whether the callback module is given single messages or whole
          %% message sets.
        , message_type       :: message | message_set
        }).

-type state() :: #state{}.

%% Delay (in milliseconds) before retrying failed subscriptions to partition
%% consumer processes.
-define(RESUBSCRIBE_DELAY, 2000).

-define(LO_CMD_SUBSCRIBE_PARTITIONS, '$subscribe_partitions').

%%%_* APIs =====================================================================

%% @equiv start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig,
%%             message, CbModule, CbInitArg)
-spec start_link(brod:client(), brod:group_id(), [brod:topic()],
                 brod:group_config(), brod:consumer_config(),
                 module(), term()) -> {ok, pid()} | {error, any()}.
start_link(Client, GroupId, Topics, GroupConfig,
           ConsumerConfig, CbModule, CbInitArg) ->
  %% No message type given: the callback module is handed one message
  %% at a time.
  MessageType = message,
  start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig,
             MessageType, CbModule, CbInitArg).


%% @doc Start (link) a group subscriber process.
%%
%% Arguments:
%% <ul>
%% <li>`Client': Client ID of the brod client (a pid works too, but is not
%%     recommended).</li>
%% <li>`GroupId': Consumer group ID, should be unique per kafka cluster.</li>
%% <li>`Topics': Predefined set of topic names to join the group with.
%%     NOTE: the group leader member collects the topics of all members and
%%     assigns all collected topic-partitions to the members of the group,
%%     i.e. members can join with arbitrary sets of topics.</li>
%% <li>`GroupConfig': Config for the group coordinator, see
%%     {@link brod_group_coordinator:start_link/6}</li>
%% <li>`ConsumerConfig': Config for the partition consumers, see
%%     {@link brod_consumer:start_link/4}</li>
%% <li>`MessageType': What the callback module is given to handle, either
%%     `message' or `message_set'.</li>
%% <li>`CbModule': Callback module implementing the callback functions for
%%     message processing.</li>
%% <li>`CbInitArg': Any term, passed as the second argument to
%%     `CbModule:init/2' when the subscriber initializes.</li>
%% </ul>
%% @end
-spec start_link(brod:client(), brod:group_id(), [brod:topic()],
                 brod:group_config(), brod:consumer_config(),
                 message | message_set,
                 module(), term()) -> {ok, pid()} | {error, any()}.
start_link(Client, GroupId, Topics, GroupConfig,
           ConsumerConfig, MessageType, CbModule, CbInitArg) ->
  %% Everything is bundled in one tuple which init/1 unpacks.
  InitArgs = { Client, GroupId, Topics, GroupConfig
             , ConsumerConfig, MessageType, CbModule, CbInitArg
             },
  GenServerOpts = [],
  gen_server:start_link(?MODULE, InitArgs, GenServerOpts).

%% @doc Stop the group subscriber and block until its `DOWN' message has
%% been received. There is no timeout on the wait.
-spec stop(pid()) -> ok.
stop(Pid) ->
  %% Monitor first, so the exit cannot be missed.
  MonitorRef = erlang:monitor(process, Pid),
  ok = gen_server:cast(Pid, stop),
  receive
    {'DOWN', MonitorRef, process, Pid, _ExitReason} -> ok
  end.

%% @doc Acknowledge an offset and commit it.
%% Acking a later (greater) offset implicitly acks all earlier (smaller)
%% offsets. As a consequence, acks sent out of order may overwrite offset
%% commits and cause unnecessary message re-delivery after a restart.
%% @end
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
ack(Pid, Topic, Partition, Offset) ->
  Commit = true,
  ack(Pid, Topic, Partition, Offset, Commit).

%% @doc Acknowledge an offset.
%% `Commit' decides whether the acknowledged offset is also committed for
%% the group subscriber. The request is sent to the subscriber as a cast.
%% @end
-spec ack(pid(), brod:topic(), brod:partition(), brod:offset(), boolean()) ->
             ok.
ack(Pid, Topic, Partition, Offset, Commit) ->
  Request = {ack, Topic, Partition, Offset, Commit},
  gen_server:cast(Pid, Request).

%% @doc Ask the subscriber to commit all acked offsets.
%% NOTE: asynchronous, the request is sent as a cast.
-spec commit(pid()) -> ok.
commit(Pid) ->
  Request = commit_offsets,
  gen_server:cast(Pid, Request).

%% @doc Commit the given offset of a topic-partition.
%% NOTE: asynchronous, the request is sent as a cast.
-spec commit(pid(), brod:topic(), brod:partition(), brod:offset()) -> ok.
commit(Pid, Topic, Partition, Offset) ->
  Request = {commit_offset, Topic, Partition, Offset},
  gen_server:cast(Pid, Request).

%% @doc Callback for the group coordinator (exported together with the
%% other coordinator callbacks). The subscriber pid is ignored and an empty
%% binary is always returned.
%% @end
-spec user_data(pid()) -> binary().
user_data(_Pid) -> <<>>.

%%%_* APIs for group coordinator ===============================================

%% @doc Called by the group coordinator when a new assignment has been
%% received. The assignment is forwarded to the subscriber as a cast.
%% @end
-spec assignments_received(pid(), member_id(), integer(),
                           brod:received_assignments()) -> ok.
assignments_received(Pid, MemberId, GenerationId, TopicAssignments) ->
  Cast = {new_assignments, MemberId, GenerationId, TopicAssignments},
  gen_server:cast(Pid, Cast).

%% @doc Called by the group coordinator before it re-joins the consumer
%% group. Synchronous: waits (without timeout) for the subscriber to reply.
%% @end
-spec assignments_revoked(pid()) -> ok.
assignments_revoked(Pid) ->
  Timeout = infinity,
  gen_server:call(Pid, unsubscribe_all_partitions, Timeout).

%% @doc Called only when `partition_assignment_strategy' is set to
%% `callback_implemented' in group config.
%% Synchronous: waits (without timeout) for the subscriber to reply.
%% @end
-spec assign_partitions(pid(), [brod:group_member()],
                        [{brod:topic(), brod:partition()}]) ->
        [{member_id(), [brod:partition_assignment()]}].
assign_partitions(Pid, Members, TopicPartitionList) ->
  Timeout = infinity,
  gen_server:call(Pid,
                  {assign_partitions, Members, TopicPartitionList},
                  Timeout).

%% @doc Called by the group coordinator when it initializes the assignments
%% for the subscriber.
%%
%% NOTE: Only called when `offset_commit_policy' is set to
%%       `consumer_managed' in group config.
%%
%% NOTE: The committed offsets are expected to be the offsets of
%%       successfully processed (acknowledged) messages, not the
%%       `begin_offset' to start fetching from.
%% @end
-spec get_committed_offsets(pid(), [{brod:topic(), brod:partition()}]) ->
        {ok, [{{brod:topic(), brod:partition()}, brod:offset()}]}.
get_committed_offsets(Pid, TopicPartitions) ->
  Request = {get_committed_offsets, TopicPartitions},
  Timeout = infinity,
  gen_server:call(Pid, Request, Timeout).

%%%_* gen_server callbacks =====================================================

%% gen_server callback.
%% Validates the arguments, initializes the callback module, starts the
%% group coordinator with this process as group member, and monitors the
%% brod client.
init({Client, GroupId, Topics, GroupConfig,
      ConsumerConfig, MessageType, CbModule, CbInitArg}) ->
  ok = brod_utils:assert_client(Client),
  ok = brod_utils:assert_group_id(GroupId),
  ok = brod_utils:assert_topics(Topics),
  {ok, CbState} = CbModule:init(GroupId, CbInitArg),
  {ok, Coordinator} =
    brod_group_coordinator:start_link(Client, GroupId, Topics, GroupConfig,
                                      ?MODULE, self()),
  ClientMref = erlang:monitor(process, Client),
  {ok, #state{ client          = Client
             , client_mref     = ClientMref
             , groupId         = GroupId
             , coordinator     = Coordinator
             , consumer_config = ConsumerConfig
             , cb_module       = CbModule
             , cb_state        = CbState
             , message_type    = MessageType
             }}.

%% gen_server callback. Handles, in order:
%%  - message sets delivered by partition consumers;
%%  - 'DOWN' of the monitored brod client (stops the subscriber);
%%  - 'DOWN' of any other process (recorded if it is a known consumer);
%%  - the periodic (re)subscribe timer message;
%%  - anything else, which is logged and dropped.
handle_info({_ConsumerPid, #kafka_message_set{} = MsgSet}, State) ->
  {noreply, handle_consumer_delivery(MsgSet, State)};
handle_info({'DOWN', Mref, process, _Pid, _Reason},
            #state{client_mref = Mref} = State) ->
  %% The client is gone, stop and leave the restart to the supervisor.
  %% The client's exit reason is dropped here, its own crash log is
  %% expected to have it.
  {stop, client_down, State};
handle_info({'DOWN', _Mref, process, Pid, Reason},
            #state{consumers = Consumers} = State) ->
  case get_consumer(Pid, Consumers) of
    false ->
      {noreply, State};
    #consumer{} = Consumer ->
      %% Remember when and why the consumer went down; it is picked up
      %% again by the periodic (re)subscribe.
      Downed = Consumer#consumer{ consumer_pid  = ?DOWN(Reason)
                                , consumer_mref = ?undef
                                },
      {noreply, State#state{consumers = put_consumer(Downed, Consumers)}}
  end;
handle_info(?LO_CMD_SUBSCRIBE_PARTITIONS, State0) ->
  State =
    case State0#state.is_blocked of
      false ->
        {ok, #state{} = Subscribed} = subscribe_partitions(State0),
        Subscribed;
      true ->
        %% Blocked, nothing to subscribe to for now.
        State0
    end,
  %% The timer that got us here has fired, always arm a fresh one.
  NewTref = start_subscribe_timer(?undef, ?RESUBSCRIBE_DELAY),
  {noreply, State#state{subscribe_tref = NewTref}};
handle_info(Info, State) ->
  log(State, info, "discarded message:~p", [Info]),
  {noreply, State}.

%% gen_server callback. Serves the synchronous requests of the group
%% coordinator: fetching committed offsets and assigning partitions (both
%% delegated to the callback module), and unsubscribing from all partition
%% consumers. Unknown calls are answered with `{error, {unknown_call, Call}}'.
handle_call({get_committed_offsets, TopicPartitions}, _From,
            #state{ groupId   = GroupId
                  , cb_module = CbModule
                  , cb_state  = CbState0
                  } = State) ->
  case CbModule:get_committed_offsets(GroupId, TopicPartitions, CbState0) of
    {ok, Offsets, CbState} ->
      {reply, {ok, Offsets}, State#state{cb_state = CbState}};
    Other ->
      erlang:error({bad_return_value,
                    {CbModule, get_committed_offsets, Other}})
  end;
handle_call({assign_partitions, Members, TopicPartitions}, _From,
            #state{ cb_module = CbModule
                  , cb_state  = CbState0
                  } = State) ->
  case CbModule:assign_partitions(Members, TopicPartitions, CbState0) of
    {CbState, Assignments} ->
      {reply, Assignments, State#state{cb_state = CbState}};
    %% Returning an updated callback state is optional: callback modules
    %% written against brod versions prior to 3.7.1 return the bare list,
    %% this clause keeps them working.
    Assignments when is_list(Assignments) ->
      {reply, Assignments, State}
  end;
handle_call(unsubscribe_all_partitions, _From,
            #state{consumers = Consumers} = State) ->
  %% Only entries holding a pid have a subscription and a monitor to undo.
  Subscribed = [C || #consumer{consumer_pid = P} = C <- Consumers, is_pid(P)],
  lists:foreach(
    fun(#consumer{consumer_pid = ConsumerPid, consumer_mref = Mref}) ->
        _ = brod:unsubscribe(ConsumerPid, self()),
        _ = erlang:demonitor(Mref, [flush])
    end, Subscribed),
  %% Stay blocked until new assignments are received.
  {reply, ok, State#state{consumers = [], is_blocked = true}};
handle_call(Call, _From, State) ->
  {reply, {error, {unknown_call, Call}}, State}.

%% gen_server callback. Handles acks and commit requests of the user,
%% new assignments from the group coordinator, and the stop request.
%% Unknown casts are silently dropped.
handle_cast({ack, Topic, Partition, Offset, Commit}, State) ->
  {noreply, handle_ack({Topic, Partition, Offset}, State, Commit)};
handle_cast(commit_offsets, State) ->
  Coordinator = State#state.coordinator,
  ok = brod_group_coordinator:commit_offsets(Coordinator),
  {noreply, State};
handle_cast({commit_offset, Topic, Partition, Offset}, State) ->
  #state{coordinator = Coordinator, generationId = GenerationId} = State,
  do_commit_ack(Coordinator, GenerationId, Topic, Partition, Offset),
  {noreply, State};
handle_cast({new_assignments, MemberId, GenerationId, Assignments},
            #state{ client          = Client
                  , consumer_config = ConsumerConfig
                  , subscribe_tref  = Tref
                  } = State) ->
  %% Make sure there is a consumer started for every assigned topic.
  Topics =
    lists:usort([A#brod_received_assignment.topic || A <- Assignments]),
  lists:foreach(
    fun(Topic) ->
        ok = brod:start_consumer(Client, Topic, ConsumerConfig)
    end, Topics),
  %% Start over with fresh, not yet subscribed entries; the actual
  %% subscription happens when the subscribe timer fires.
  Consumers =
    lists:map(
      fun(#brod_received_assignment{ topic        = Topic
                                   , partition    = Partition
                                   , begin_offset = BeginOffset
                                   }) ->
          #consumer{ topic_partition = {Topic, Partition}
                   , consumer_pid    = ?undef
                   , begin_offset    = BeginOffset
                   , acked_offset    = ?undef
                   }
      end, Assignments),
  NewTref = start_subscribe_timer(Tref, 0),
  {noreply, State#state{ consumers      = Consumers
                       , is_blocked     = false
                       , memberId       = MemberId
                       , generationId   = GenerationId
                       , subscribe_tref = NewTref
                       }};
handle_cast(stop, State) ->
  {stop, normal, State};
handle_cast(_Unknown, State) ->
  {noreply, State}.

%% gen_server callback. The state needs no conversion, it is kept as is.
code_change(_FromVsn, State, _ExtraArgs) ->
  {ok, State}.

%% gen_server callback. Nothing is cleaned up here.
terminate(_ExitReason, #state{}) ->
  ok.

%%%_* Internal Functions =======================================================

%% Handle a message set delivered by a partition consumer.
%% Deliveries for topic-partitions without a consumer entry are dropped.
%% Otherwise the last delivered offset is recorded and the messages are
%% passed to the callback module, one by one or as a whole set depending
%% on the configured message type.
handle_consumer_delivery(#kafka_message_set{ topic     = Topic
                                           , partition = Partition
                                           , messages  = Messages
                                           } = MsgSet,
                         #state{ message_type = MsgType
                               , consumers    = Consumers
                               } = State0) ->
  case get_consumer({Topic, Partition}, Consumers) of
    false ->
      State0;
    #consumer{} = Consumer ->
      Updated = update_last_offset(Messages, Consumer, Consumers),
      State = State0#state{consumers = Updated},
      case MsgType of
        message_set -> handle_message_set(MsgSet, State);
        message     -> handle_messages(Topic, Partition, Messages, State)
      end
  end.

%% Store the offset of the last message of a delivery in the consumer entry
%% and put the entry back into the list.
update_last_offset(Messages, Consumer, Consumers) ->
  %% lists:last/1 is safe here, brod_consumer never delivers an empty
  %% message set.
  Last = lists:last(Messages),
  LastOffset = Last#kafka_message.offset,
  put_consumer(Consumer#consumer{last_offset = LastOffset}, Consumers).

%% Make sure a (re)subscribe timer is pending and return its reference.
%% When no timer reference is given, a new timer is started which sends
%% ?LO_CMD_SUBSCRIBE_PARTITIONS to this process after `Delay' milliseconds.
%% When a reference is given, it is returned unchanged and `Delay' is
%% ignored.
%% `Delay' must be a non-negative integer: erlang:send_after/3 does not
%% accept `infinity'.
-spec start_subscribe_timer(?undef | reference(), non_neg_integer()) ->
        reference().
start_subscribe_timer(?undef, Delay) ->
  erlang:send_after(Delay, self(), ?LO_CMD_SUBSCRIBE_PARTITIONS);
start_subscribe_timer(Ref, _Delay) when is_reference(Ref) ->
  %% The old timer is taken to be still pending, keep waiting for it.
  %% A bit of delay on subscribing to brod_consumer is fine.
  Ref.

%% Pass a whole message set to the callback module and, if the callback
%% asks for it, acknowledge the offset of the last message in the set.
%% Raises `bad_return_value' when the callback returns something unexpected.
handle_message_set(#kafka_message_set{ topic     = Topic
                                     , partition = Partition
                                     , messages  = Messages
                                     } = MessageSet,
                   #state{ cb_module = CbModule
                         , cb_state  = CbState0
                         } = State0) ->
  {CbState, AckMode} =
    case CbModule:handle_message(Topic, Partition, MessageSet, CbState0) of
      {ok, CbState_}                -> {CbState_, no_ack};
      {ok, ack, CbState_}           -> {CbState_, {ack, true}};
      {ok, ack_no_commit, CbState_} -> {CbState_, {ack, false}};
      Other ->
        erlang:error({bad_return_value,
                      {CbModule, handle_message, Other}})
    end,
  State = State0#state{cb_state = CbState},
  case AckMode of
    no_ack ->
      %% The callback module acks later on its own.
      State;
    {ack, CommitNow} ->
      #kafka_message{offset = LastOffset} = lists:last(Messages),
      handle_ack({Topic, Partition, LastOffset}, State, CommitNow)
  end.

%% Pass the messages to the callback module one at a time, in order,
%% threading the subscriber state (including the callback state) through.
%% After each message its offset is acknowledged if the callback asks for it.
%% Raises `bad_return_value' when the callback returns something unexpected.
handle_messages(Topic, Partition, Messages, State0) ->
  HandleOne =
    fun(#kafka_message{offset = Offset} = Msg,
        #state{cb_module = CbModule, cb_state = CbState0} = State) ->
        {CbState, AckMode} =
          case CbModule:handle_message(Topic, Partition, Msg, CbState0) of
            {ok, CbState_}                -> {CbState_, no_ack};
            {ok, ack, CbState_}           -> {CbState_, {ack, true}};
            {ok, ack_no_commit, CbState_} -> {CbState_, {ack, false}};
            Other ->
              erlang:error({bad_return_value,
                           {CbModule, handle_message, Other}})
          end,
        State1 = State#state{cb_state = CbState},
        case AckMode of
          no_ack ->
            State1;
          {ack, CommitNow} ->
            handle_ack({Topic, Partition, Offset}, State1, CommitNow)
        end
    end,
  lists:foldl(HandleOne, State0, Messages).

%% Process the acknowledgement of an offset.
%% The partition consumer is always told about the ack. Only when
%% `CommitNow' is `true' is the offset also sent to the group coordinator
%% and recorded as `acked_offset' of the consumer entry.
%% Acks for topic-partitions without a consumer entry are dropped.
-spec handle_ack(ack_ref(), state(), boolean()) -> state().
handle_ack({Topic, Partition, Offset},
           #state{ generationId = GenerationId
                 , consumers    = Consumers
                 , coordinator  = Coordinator
                 } = State, CommitNow) ->
  case get_consumer({Topic, Partition}, Consumers) of
    false ->
      %% Stale async-ack, discard.
      State;
    #consumer{consumer_pid = ConsumerPid} = Consumer ->
      ok = consume_ack(ConsumerPid, Offset),
      case CommitNow of
        true ->
          ok = do_commit_ack(Coordinator, GenerationId,
                             Topic, Partition, Offset),
          Acked = Consumer#consumer{acked_offset = Offset},
          State#state{consumers = put_consumer(Acked, Consumers)};
        _ ->
          State
      end
  end.

%% Tell the consumer process that it may fetch more (if the pre-fetch
%% count/byte limit allows). A no-op when there is no consumer pid, e.g.
%% while the consumer is down. Always returns ok.
consume_ack(Pid, Offset) when is_pid(Pid) ->
  _ = brod:consume_ack(Pid, Offset),
  ok;
consume_ack(_NotAPid, _Offset) ->
  ok.

%% Hand an acknowledged offset over to the group coordinator for offset
%% commit (an async message to the coordinator).
do_commit_ack(Coordinator, GenerationId, Topic, Partition, Offset) ->
  ok = brod_group_coordinator:ack(Coordinator, GenerationId,
                                  Topic, Partition, Offset).

%% Run subscribe_partition/2 over all consumer entries and store the
%% resulting entries in the state.
subscribe_partitions(#state{client = Client, consumers = Consumers} = State) ->
  Subscribed = [subscribe_partition(Client, C) || C <- Consumers],
  {ok, State#state{consumers = Subscribed}}.

%% (Re)subscribe to the partition consumer of one consumer entry if needed.
%% The entry is returned untouched when its consumer pid is alive, or when
%% the last delivered offset has not been acked yet. Otherwise a
%% subscription is attempted and the entry is updated with either the
%% consumer pid and a monitor on it, or the failure reason.
subscribe_partition(Client, Consumer) ->
  #consumer{ topic_partition = {Topic, Partition}
           , consumer_pid    = Pid
           , begin_offset    = AssignedOffset
           , acked_offset    = AckedOffset
           , last_offset     = LastOffset
           } = Consumer,
  HasUnackedDelivery =
    LastOffset =/= ?undef andalso AckedOffset =/= LastOffset,
  case {brod_utils:is_pid_alive(Pid), HasUnackedDelivery} of
    {true, _} ->
      Consumer;
    {false, true} ->
      %% The last fetched offset is not yet acked,
      %% do not re-subscribe now to keep it simple and slow.
      %% Otherwise if we subscribe with {begin_offset, LastOffset + 1}
      %% we may exceed pre-fetch window size.
      Consumer;
    {false, false} ->
      Options = subscribe_options(AckedOffset, AssignedOffset),
      case brod:subscribe(Client, self(), Topic, Partition, Options) of
        {error, Reason} ->
          Consumer#consumer{ consumer_pid  = ?DOWN(Reason)
                           , consumer_mref = ?undef
                           };
        {ok, ConsumerPid} ->
          Mref = erlang:monitor(process, ConsumerPid),
          Consumer#consumer{ consumer_pid  = ConsumerPid
                           , consumer_mref = Mref
                           }
      end
  end.

%% Subscribe options for a consumer entry: continue right after the last
%% acked offset if there is one, otherwise start at the assigned begin
%% offset, otherwise give no option at all so that 'begin_offset' in
%% consumer config applies.
subscribe_options(?undef, ?undef) ->
  [];
subscribe_options(?undef, AssignedOffset) ->
  [{begin_offset, AssignedOffset}];
subscribe_options(AckedOffset, _AssignedOffset) when AckedOffset >= 0 ->
  [{begin_offset, AckedOffset + 1}].

%% Log `Fmt'/`Args' at `Level', prefixed with a line identifying this
%% subscriber (group ID, member ID, generation and pid).
log(#state{ groupId      = GroupId
          , memberId     = MemberId
          , generationId = GenerationId
          }, Level, Fmt, Args) ->
  Prefix =
    "group subscriber (groupId=~s,memberId=~s,generation=~p,pid=~p):\n",
  ?BROD_LOG(Level,
            Prefix ++ Fmt,
            [GroupId, MemberId, GenerationId, self() | Args]).

%% Find a consumer entry, either by consumer pid or by
%% {Topic, Partition}. Returns `false' when there is no such entry.
get_consumer(ConsumerPid, Consumers) when is_pid(ConsumerPid) ->
  lists:keyfind(ConsumerPid, #consumer.consumer_pid, Consumers);
get_consumer({_Topic, _Partition} = TopicPartition, Consumers) ->
  lists:keyfind(TopicPartition, #consumer.topic_partition, Consumers).

%% Replace the entry that has the same topic-partition as `Consumer'.
%% The list is returned unchanged when there is no such entry.
put_consumer(#consumer{topic_partition = Key} = Consumer, Consumers) ->
  lists:keyreplace(Key, #consumer.topic_partition, Consumers, Consumer).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End: