%% src/superstream.erl

-module(superstream).

-export([declare/2,
         delete/2,
         declare_and_bind_partition/3,
         delete_partition/3]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% @doc Declare the exchange backing a super stream.
%%
%% Sends an `exchange.declare' on `Channel' for a durable exchange of type
%% `direct' named `SuperStream', carrying the `x-super-stream' argument set
%% to `true'.
%%
%% Returns `ok'. A reply that is not `exchange.declare_ok' fails the match
%% below (`badmatch').
declare(Channel, SuperStream) ->
    #'exchange.declare_ok'{} =
        amqp_channel:call(Channel,
                          #'exchange.declare'{exchange = SuperStream,
                                              type = <<"direct">>,
                                              durable = true,
                                              arguments =
                                                  [{<<"x-super-stream">>, bool, true}]}),
    ok.

%% @doc Delete the exchange named `SuperStream'.
%%
%% Sends an `exchange.delete' on `Channel'. Only the exchange is addressed
%% here; partition queues are removed separately via `delete_partition/3'.
%%
%% Returns `ok'. A reply that is not `exchange.delete_ok' fails the match
%% below (`badmatch').
delete(Channel, SuperStream) ->
    #'exchange.delete_ok'{} =
        amqp_channel:call(Channel, #'exchange.delete'{exchange = SuperStream}),
    ok.

%% @doc Declare one partition queue of a super stream and bind it to the
%% super stream exchange.
%%
%% The queue name is derived from `SuperStream' and `RoutingKey' by
%% `queue_name/2'. The queue is declared durable with `x-queue-type' set to
%% `stream', `x-queue-leader-locator' set to `balanced' and
%% `x-stream-partition-order' set to 1. It is then bound to the
%% `SuperStream' exchange under `RoutingKey'.
%%
%% NOTE(review): the partition order is the constant 1 on every call, so all
%% partitions created through this function share the same order value --
%% confirm this is intended.
%%
%% Returns `ok'. A reply other than `queue.declare_ok' / `queue.bind_ok'
%% fails the corresponding match (`badmatch').
declare_and_bind_partition(Channel, SuperStream, RoutingKey) ->
    PartitionQueue = queue_name(SuperStream, RoutingKey),
    Order = 1,
    QueueArgs =
        [{<<"x-queue-type">>, longstr, <<"stream">>},
         {<<"x-stream-partition-order">>, long, Order},
         {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}],

    %% The queue has to exist before it can be bound, so declare first.
    #'queue.declare_ok'{} =
        amqp_channel:call(Channel,
                          #'queue.declare'{queue = PartitionQueue,
                                           durable = true,
                                           arguments = QueueArgs}),

    #'queue.bind_ok'{} =
        amqp_channel:call(Channel,
                          #'queue.bind'{queue = PartitionQueue,
                                        exchange = SuperStream,
                                        routing_key = RoutingKey}),
    ok.

%% @doc Delete the partition queue identified by `SuperStream' and
%% `RoutingKey'.
%%
%% The queue name is derived with `queue_name/2', the same way
%% `declare_and_bind_partition/3' derives it.
%%
%% Returns `ok'. A reply that is not `queue.delete_ok' fails the match below
%% (`badmatch').
delete_partition(Channel, SuperStream, RoutingKey) ->
    Method = #'queue.delete'{queue = queue_name(SuperStream, RoutingKey)},
    #'queue.delete_ok'{} = amqp_channel:call(Channel, Method),
    ok.

%% @doc Build the name of a partition queue: the super stream name and the
%% routing key joined by a single hyphen, e.g. `<<"invoices-0">>'.
%% Both arguments must be binaries; anything else raises `badarg'.
-spec queue_name(binary(), binary()) -> binary().
queue_name(SuperStream, RoutingKey) ->
    Separator = <<"-">>,
    <<SuperStream/binary, Separator/binary, RoutingKey/binary>>.