%% src/lake.erl

%%
%% @doc Connect to and use RabbitMQ Streams.
%%
-module(lake).

-export([tls_connect/4,
         tls_connect/5,
         tls_connect/6,
         connect/4,
         connect/5,
         connect/6,
         stop/1]).
-export([declare_publisher/4,
         publish_sync/3,
         publish_async/3,
         query_publisher_sequence/3,
         delete_publisher/2,
         credit_async/3,
         create/3,
         delete/2,
         subscribe/6,
         store_offset/4,
         query_offset/3,
         unsubscribe/2,
         metadata/2,
         route/3,
         partitions/2,
         stream_stats/2,
         consumer_update_response/4]).
-export([chunk_to_messages/1]).

%% @doc Establish a connection via TLS on the default TLS port (5551).
%%
%% Shorthand for `tls_connect/5' with `Port' set to 5551.
%% @see tls_connect/5
%% @see tls_connect/6
tls_connect(Host, User, Password, Vhost) ->
    DefaultTlsPort = 5551,
    tls_connect(Host, DefaultTlsPort, User, Password, Vhost).

%% @doc Establish a connection via TLS on an explicit port.
%%
%% Shorthand for `tls_connect/6' with an empty `Options' list.
%% @see tls_connect/4
%% @see tls_connect/6
tls_connect(Host, Port, User, Password, Vhost) ->
    NoOptions = [],
    tls_connect(Host, Port, User, Password, Vhost, NoOptions).

%% @doc Establish a connection via TLS.
%%
%% `Host' is passed through `lake_utils:normalize_host/1' before the
%% connection is opened by `lake_connection:tls_connect/6'.
%%
%% `Options' supports the options `ssl:connect/2' offers, and also the
%% following:
%%
%% * `{exchange_command_versions, boolean()}': Enable exchanging command
%%   versions. Requires RabbitMQ 3.11. Defaults to `true'.
%%
%% @see tls_connect/4
%% @see tls_connect/5
tls_connect(Host, Port, User, Password, Vhost, Options) ->
    NormalizedHost = lake_utils:normalize_host(Host),
    lake_connection:tls_connect(NormalizedHost, Port, User, Password, Vhost, Options).

%% @doc Establish a connection on the default port (5552).
%%
%% Shorthand for `connect/5' with `Port' set to 5552.
%% @see connect/5
%% @see connect/6
connect(Host, User, Password, Vhost) ->
    DefaultPort = 5552,
    connect(Host, DefaultPort, User, Password, Vhost).

%%
%% @doc Establish a connection on an explicit port.
%%
%% Shorthand for `connect/6' with an empty `Options' list.
%% @see connect/4
%% @see connect/6
connect(Host, Port, User, Password, Vhost) ->
    NoOptions = [],
    connect(Host, Port, User, Password, Vhost, NoOptions).

%%
%% @doc Establish a connection.
%%
%% `Host' is passed through `lake_utils:normalize_host/1' before the
%% connection is opened by `lake_connection:connect/6'.
%%
%% `Options' supports the options `gen_tcp:connect/2' offers, and also the
%% following:
%%
%% * `{exchange_command_versions, boolean()}': Enable exchanging command
%%   versions. Requires RabbitMQ 3.11. Defaults to `true'.
%%
%% @see connect/4
%% @see connect/5
connect(Host, Port, User, Password, Vhost, Options) ->
    NormalizedHost = lake_utils:normalize_host(Host),
    lake_connection:connect(NormalizedHost, Port, User, Password, Vhost, Options).

%% @doc Stop the given connection.
%%
%% Delegates to `lake_connection:stop/1'.
stop(Connection) ->
    lake_connection:stop(Connection).

%%
%% @doc Declare a publisher on `Stream'.
%%
%% `Stream' and `PublisherReference' must be binaries and `PublisherId' an
%% integer; any other input fails with `function_clause'.
%%
declare_publisher(Connection, Stream, PublisherId, PublisherReference) when
    is_binary(Stream),
    is_integer(PublisherId),
    is_binary(PublisherReference)
->
    lake_connection:declare_publisher(Connection, Stream, PublisherId, PublisherReference).

%%
%% @doc Publish a list of messages synchronously.
%%
%% `PublisherId' must be an integer and `Messages' a list; any other input
%% fails with `function_clause'.
%%
publish_sync(Connection, PublisherId, Messages) when
    is_integer(PublisherId),
    is_list(Messages)
->
    lake_connection:publish_sync(Connection, PublisherId, Messages).

%%
%% @doc Publish a list of messages asynchronously.
%%
%% `PublisherId' must be an integer and `Messages' a list; any other input
%% fails with `function_clause'.
%%
publish_async(Connection, PublisherId, Messages) when
    is_integer(PublisherId),
    is_list(Messages)
->
    lake_connection:publish_async(Connection, PublisherId, Messages).

%%
%% @doc Query the sequence of the publisher `PublisherReference' on `Stream'.
%%
%% Both `PublisherReference' and `Stream' must be binaries; any other input
%% fails with `function_clause'.
%%
query_publisher_sequence(Connection, PublisherReference, Stream) when
    is_binary(PublisherReference),
    is_binary(Stream)
->
    lake_connection:query_publisher_sequence(Connection, PublisherReference, Stream).

%%
%% @doc Delete the publisher identified by `PublisherId'.
%%
%% `PublisherId' must be an integer; any other input fails with
%% `function_clause'.
%%
delete_publisher(Connection, PublisherId) when
    is_integer(PublisherId)
->
    lake_connection:delete_publisher(Connection, PublisherId).

%%
%% @doc Asynchronously set the credit of the subscription `SubscriptionId'.
%%
%% `SubscriptionId' and `Credit' must both be integers; any other input
%% fails with `function_clause'.
%%
credit_async(Connection, SubscriptionId, Credit) when
    is_integer(SubscriptionId),
    is_integer(Credit)
->
    lake_connection:credit_async(Connection, SubscriptionId, Credit).

%%
%% @doc Create a new stream named `Stream' with the given `Arguments'.
%%
%% `Stream' must be a binary and `Arguments' a list; any other input fails
%% with `function_clause'.
%%
create(Connection, Stream, Arguments) when
    is_binary(Stream),
    is_list(Arguments)
->
    lake_connection:create(Connection, Stream, Arguments).

%%
%% @doc Delete the stream named `Stream'.
%%
%% `Stream' must be a binary; any other input fails with `function_clause'.
%%
delete(Connection, Stream) when
    is_binary(Stream)
->
    lake_connection:delete(Connection, Stream).

%%
%% @doc Subscribe to a stream.
%%
%% Argument requirements (any other input fails with `function_clause'):
%%
%% * `Stream' must be a binary.
%% * `SubscriptionId' and `Credit' must be integers.
%% * `OffsetDefinition' must be either a tuple or an atom.
%% * `Properties' must be a list.
%%
subscribe(Connection, Stream, SubscriptionId, OffsetDefinition, Credit, Properties) when
    is_binary(Stream),
    is_integer(SubscriptionId),
    %% Short-circuit `orelse' instead of the strict `or'; accepts the same inputs.
    is_tuple(OffsetDefinition) orelse is_atom(OffsetDefinition),
    is_integer(Credit),
    is_list(Properties)
->
    lake_connection:subscribe(
        Connection, Stream, SubscriptionId, OffsetDefinition, Credit, Properties
    ).

%%
%% @doc Cancel the subscription identified by `SubscriptionId'.
%%
%% `SubscriptionId' must be an integer; any other input fails with
%% `function_clause'.
%%
unsubscribe(Connection, SubscriptionId) when
    is_integer(SubscriptionId)
->
    lake_connection:unsubscribe(Connection, SubscriptionId).

%%
%% @doc Store the offset `Offset' of a publisher to the stream.
%%
%% `PublisherReference' and `Stream' must be binaries and `Offset' an
%% integer; any other input fails with `function_clause'.
%%
%% FIXME store_offset/4 is fire-and-forget; maybe the name should indicate that?
%%
store_offset(Connection, PublisherReference, Stream, Offset) when
    is_binary(PublisherReference),
    is_binary(Stream),
    is_integer(Offset)
->
    lake_connection:store_offset(Connection, PublisherReference, Stream, Offset).

%%
%% @doc Query the offset stored for a publisher on the stream.
%%
%% Both `PublisherReference' and `Stream' must be binaries; any other input
%% fails with `function_clause'.
%%
query_offset(Connection, PublisherReference, Stream) when
    is_binary(PublisherReference),
    is_binary(Stream)
->
    lake_connection:query_offset(Connection, PublisherReference, Stream).

%%
%% @doc Retrieve metadata (such as endpoints and replicas) for `Streams'.
%%
%% `Streams' must be a list; any other input fails with `function_clause'.
%%
metadata(Connection, Streams) when
    is_list(Streams)
->
    lake_connection:metadata(Connection, Streams).

%%
%% @doc Turn a delivered osiris chunk into the list of messages it contains.
%%
%% Delegates to `lake_messages:chunk_to_messages/1'.
%%
chunk_to_messages(OsirisChunk) ->
    lake_messages:chunk_to_messages(OsirisChunk).

%%
%% @doc Determine the partition of `SuperStream' for `RoutingKey'.
%%
%% Both `RoutingKey' and `SuperStream' must be binaries; any other input
%% fails with `function_clause'.
%%
route(Connection, RoutingKey, SuperStream) when
    is_binary(RoutingKey),
    is_binary(SuperStream)
->
    lake_connection:route(Connection, RoutingKey, SuperStream).

%%
%% @doc Retrieve every partition of `SuperStream'.
%%
%% `SuperStream' must be a binary; any other input fails with
%% `function_clause'.
%%
partitions(Connection, SuperStream) when
    is_binary(SuperStream)
->
    lake_connection:partitions(Connection, SuperStream).

%%
%% @doc Send the response to a `consumer_update' request.
%%
%% All arguments are forwarded unchanged to
%% `lake_connection:consumer_update_response/4'.
%%
consumer_update_response(Connection, CorrelationId, ResponseCode, OffsetSpecification) ->
    lake_connection:consumer_update_response(
        Connection, CorrelationId, ResponseCode, OffsetSpecification
    ).

%%
%% @doc Retrieve the statistics of `Stream'.
%%
%% `Stream' must be a binary; any other input fails with `function_clause'.
%%
stream_stats(Connection, Stream) when
    is_binary(Stream)
->
    lake_connection:stream_stats(Connection, Stream).