src/superstream.erl

-module(superstream).

-export([declare/2, delete/2, declare_and_bind_partition/3, delete_partition/3]).

-include_lib("amqp_client/include/amqp_client.hrl").

declare(Channel, SuperStream) ->
    ArgumentsExchange = [{<<"x-super-stream">>, bool, true}],
    DeclareExchange = #'exchange.declare'{exchange = SuperStream, type = <<"direct">>, durable = true, arguments = ArgumentsExchange},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, DeclareExchange),
    ok.

delete(Channel, SuperStream) ->
    DeleteExchange = #'exchange.delete'{exchange = SuperStream},
    #'exchange.delete_ok'{} = amqp_channel:call(Channel, DeleteExchange),
    ok.

declare_and_bind_partition(Channel, SuperStream, RoutingKey) ->
    Queue = queue_name(SuperStream, RoutingKey),
    PartitionIndex = 1,
    ArgumentsQueue = [
        {<<"x-queue-type">>, longstr, <<"stream">>},
        {<<"x-stream-partition-order">>, long, PartitionIndex},
        {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}
    ],
    DeclareQueue = #'queue.declare'{queue = Queue, durable = true, arguments = ArgumentsQueue},
    #'queue.declare_ok'{} = amqp_channel:call(Channel, DeclareQueue),

    Binding = #'queue.bind'{queue       = Queue,
                            exchange    = SuperStream,
                            routing_key = RoutingKey},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
    ok.

delete_partition(Channel, SuperStream, RoutingKey) ->
    Queue = queue_name(SuperStream, RoutingKey),
    DeleteQueue = #'queue.delete'{queue = Queue},
    #'queue.delete_ok'{} = amqp_channel:call(Channel, DeleteQueue),
    ok.

queue_name(SuperStream, RoutingKey) ->
    <<SuperStream/binary, "-", RoutingKey/binary>>.