-module(superstream).
-export([declare/2, delete/2, declare_and_bind_partition/3, delete_partition/3]).
-include_lib("amqp_client/include/amqp_client.hrl").
declare(Channel, SuperStream) ->
ArgumentsExchange = [{<<"x-super-stream">>, bool, true}],
DeclareExchange = #'exchange.declare'{exchange = SuperStream, type = <<"direct">>, durable = true, arguments = ArgumentsExchange},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, DeclareExchange),
ok.
delete(Channel, SuperStream) ->
DeleteExchange = #'exchange.delete'{exchange = SuperStream},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, DeleteExchange),
ok.
declare_and_bind_partition(Channel, SuperStream, RoutingKey) ->
Queue = queue_name(SuperStream, RoutingKey),
PartitionIndex = 1,
ArgumentsQueue = [
{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-stream-partition-order">>, long, PartitionIndex},
{<<"x-queue-leader-locator">>, longstr, <<"balanced">>}
],
DeclareQueue = #'queue.declare'{queue = Queue, durable = true, arguments = ArgumentsQueue},
#'queue.declare_ok'{} = amqp_channel:call(Channel, DeclareQueue),
Binding = #'queue.bind'{queue = Queue,
exchange = SuperStream,
routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
ok.
delete_partition(Channel, SuperStream, RoutingKey) ->
Queue = queue_name(SuperStream, RoutingKey),
DeleteQueue = #'queue.delete'{queue = Queue},
#'queue.delete_ok'{} = amqp_channel:call(Channel, DeleteQueue),
ok.
queue_name(SuperStream, RoutingKey) ->
<<SuperStream/binary, "-", RoutingKey/binary>>.