%%
%% @doc Connect to and use RabbitMQ Streams.
%%
-module(lake).
-export([
tls_connect/4,
tls_connect/5,
tls_connect/6,
connect/4,
connect/5,
connect/6,
stop/1
]).
-export([
declare_publisher/4,
publish_sync/3,
publish_async/3,
query_publisher_sequence/3,
delete_publisher/2,
credit_async/3,
create/3,
delete/2,
subscribe/6,
store_offset/4,
query_offset/3,
unsubscribe/2,
metadata/2,
route/3,
partitions/2,
stream_stats/2,
consumer_update_response/4
]).
-export([chunk_to_messages/1]).
%% @doc Establish a connection via TLS.
%% @see tls_connect/5
%% @see tls_connect/6
tls_connect(Host, User, Password, Vhost) ->
tls_connect(Host, 5551, User, Password, Vhost).
%% @doc Establish a connection via TLS.
%% @see tls_connect/4
%% @see tls_connect/6
tls_connect(Host, Port, User, Password, Vhost) ->
tls_connect(Host, Port, User, Password, Vhost, []).
%% @doc Establish a connection via TLS.
%%
%% `Options' supports the options `ssl:connect/6' offers, and also the
%% following:
%%
%% * `{exchange_command_versions, boolean()}': Enable exchanging command
%% versions. Requires RabbitMQ 3.11. Defaults to `true'.
%%
%% @see tls_connect/4
%% @see tls_connect/5
tls_connect(Host, Port, User, Password, Vhost, Options) ->
lake_connection:tls_connect(lake_utils:normalize_host(Host), Port, User, Password, Vhost, Options).
%% @doc Establish a connection.
%% @see connect/5
%% @see connect/6
connect(Host, User, Password, Vhost) ->
connect(Host, 5552, User, Password, Vhost).
%%
%% @doc Establish a connection.
%% @see connect/4
%% @see connect/6
connect(Host, Port, User, Password, Vhost) ->
connect(Host, Port, User, Password, Vhost, []).
%%
%% @doc Establish a connection.
%%
%% `Options' supports the options `ssl:connect/6' offers, and also the
%% following:
%%
%% * `{exchange_command_versions, boolean()}': Enable exchanging command
%% versions. Requires RabbitMQ 3.11. Defaults to `true'.
%% @see connect/4
%% @see connect/5
connect(Host, Port, User, Password, Vhost, Options) ->
lake_connection:connect(lake_utils:normalize_host(Host), Port, User, Password, Vhost, Options).
%% @doc Stop a connection.
stop(Connection) ->
lake_connection:stop(Connection).
%%
%% @doc Declare a publisher.
%%
declare_publisher(Connection, Stream, PublisherId, PublisherReference) when
is_binary(Stream), is_integer(PublisherId), is_binary(PublisherReference)
->
lake_connection:declare_publisher(Connection, Stream, PublisherId, PublisherReference).
%%
%% @doc Publish a message synchronously.
%%
publish_sync(Connection, PublisherId, Messages) when is_integer(PublisherId), is_list(Messages) ->
lake_connection:publish_sync(Connection, PublisherId, Messages).
%%
%% @doc Publish a message asynchronously.
%%
publish_async(Connection, PublisherId, Messages) when is_integer(PublisherId), is_list(Messages) ->
lake_connection:publish_async(Connection, PublisherId, Messages).
%%
%% @doc Query a publisher's sequence.
%%
query_publisher_sequence(Connection, PublisherReference, Stream) when
is_binary(PublisherReference), is_binary(Stream)
->
lake_connection:query_publisher_sequence(Connection, PublisherReference, Stream).
%%
%% @doc Delete a publisher.
%%
delete_publisher(Connection, PublisherId) when is_integer(PublisherId) ->
lake_connection:delete_publisher(Connection, PublisherId).
%%
%% @doc Set a subscription's credit asynchronously.
%%
credit_async(Connection, SubscriptionId, Credit) when
is_integer(SubscriptionId), is_integer(Credit)
->
lake_connection:credit_async(Connection, SubscriptionId, Credit).
%%
%% @doc Create a new stream.
%%
create(Connection, Stream, Arguments) when is_binary(Stream), is_list(Arguments) ->
lake_connection:create(Connection, Stream, Arguments).
%%
%% @doc Delete a stream.
%%
delete(Connection, Stream) when is_binary(Stream) ->
lake_connection:delete(Connection, Stream).
%%
%% @doc Subscribe to a stream.
%%
subscribe(Connection, Stream, SubscriptionId, OffsetDefinition, Credit, Properties) when
is_binary(Stream),
is_integer(SubscriptionId),
(is_tuple(OffsetDefinition) or is_atom(OffsetDefinition)),
is_integer(Credit),
is_list(Properties)
->
lake_connection:subscribe(
Connection,
Stream,
SubscriptionId,
OffsetDefinition,
Credit,
Properties
).
%%
%% @doc Unsubscribe from a stream.
%%
unsubscribe(Connection, SubscriptionId) when is_integer(SubscriptionId) ->
lake_connection:unsubscribe(Connection, SubscriptionId).
%%
%% @doc Store a publisher's offset to the stream.
%%
%% FIXME store_offset/4 is fire-and-forget; maybe the name should indicate that?
%%
store_offset(Connection, PublisherReference, Stream, Offset) when
is_binary(PublisherReference), is_binary(Stream), is_integer(Offset)
->
lake_connection:store_offset(Connection, PublisherReference, Stream, Offset).
%%
%% @doc Query a publisher's stored offset from the stream.
%%
query_offset(Connection, PublisherReference, Stream) when
is_binary(PublisherReference), is_binary(Stream)
->
lake_connection:query_offset(Connection, PublisherReference, Stream).
%%
%% @doc Retrieve stream metadata such as endpoints and replicas.
%%
metadata(Connection, Streams) when is_list(Streams) ->
lake_connection:metadata(Connection, Streams).
%%
%% @doc Convert a delivered osiris chunk into a list of messages.
%%
chunk_to_messages(OsirisChunk) ->
lake_messages:chunk_to_messages(OsirisChunk).
%%
%% @doc Determine the partition for `RoutingKey'.
%%
route(Connection, RoutingKey, SuperStream) when is_binary(RoutingKey), is_binary(SuperStream) ->
lake_connection:route(Connection, RoutingKey, SuperStream).
%%
%% @doc Retrieve all partitions of `SuperStream'.
%%
partitions(Connection, SuperStream) when is_binary(SuperStream) ->
lake_connection:partitions(Connection, SuperStream).
%%
%% @doc Respond to a `consumer_update' request.
%%
consumer_update_response(Connection, CorrelationId, ResponseCode, OffsetSpecification) ->
lake_connection:consumer_update_response(Connection, CorrelationId, ResponseCode, OffsetSpecification).
%%
%% @doc Retrieve statistics for `Stream'.
%%
stream_stats(Connection, Stream) when is_binary(Stream) ->
lake_connection:stream_stats(Connection, Stream).