%% src/lake.erl

%%
%% @doc Connect to and use RabbitMQ Streams.
%%
-module(lake).

-export([
    tls_connect/4,
    tls_connect/5,
    tls_connect/6,
    connect/4,
    connect/5,
    connect/6,
    stop/1
]).

-export([
    declare_publisher/4,
    publish_sync/3,
    publish_async/3,
    query_publisher_sequence/3,
    delete_publisher/2,
    credit_async/3,
    create/3,
    delete/2,
    subscribe/6,
    store_offset/4,
    query_offset/3,
    unsubscribe/2,
    metadata/2,
    route/3,
    partitions/2,
    stream_stats/2,
    consumer_update_response/4
]).

-export([chunk_to_messages/1]).

%% @doc Establish a connection via TLS on the default TLS port.
%%
%% Shorthand for `tls_connect/5' with `Port' fixed to `5551'.
%% @see tls_connect/5
%% @see tls_connect/6
tls_connect(Host, User, Password, Vhost) ->
    DefaultTlsPort = 5551,
    tls_connect(Host, DefaultTlsPort, User, Password, Vhost).

%% @doc Establish a connection via TLS on an explicit port.
%%
%% Shorthand for `tls_connect/6' with an empty `Options' list.
%% @see tls_connect/4
%% @see tls_connect/6
tls_connect(Host, Port, User, Password, Vhost) ->
    NoOptions = [],
    tls_connect(Host, Port, User, Password, Vhost, NoOptions).

%% @doc Establish a connection via TLS with caller-supplied options.
%%
%% `Host' is first passed through `lake_utils:normalize_host/1'; all other
%% arguments are handed to `lake_connection:tls_connect/6' unchanged.
%%
%% `Options' supports the options `ssl:connect' offers, and also the
%% following:
%%
%% * `{exchange_command_versions, boolean()}': Enable exchanging command
%%   versions. Requires RabbitMQ 3.11. Defaults to `true'.
%%
%% @see tls_connect/4
%% @see tls_connect/5
tls_connect(Host, Port, User, Password, Vhost, Options) ->
    %% NOTE(review): which `ssl:connect' arity ultimately receives `Options'
    %% is decided inside lake_connection -- confirm there.
    NormalizedHost = lake_utils:normalize_host(Host),
    lake_connection:tls_connect(NormalizedHost, Port, User, Password, Vhost, Options).

%% @doc Establish a connection on the default port.
%%
%% Shorthand for `connect/5' with `Port' fixed to `5552'.
%% @see connect/5
%% @see connect/6
connect(Host, User, Password, Vhost) ->
    DefaultPort = 5552,
    connect(Host, DefaultPort, User, Password, Vhost).

%%
%% @doc Establish a connection on an explicit port.
%%
%% Shorthand for `connect/6' with an empty `Options' list.
%% @see connect/4
%% @see connect/6
connect(Host, Port, User, Password, Vhost) ->
    NoOptions = [],
    connect(Host, Port, User, Password, Vhost, NoOptions).

%%
%% @doc Establish a connection with caller-supplied options.
%%
%% `Host' is first passed through `lake_utils:normalize_host/1'; all other
%% arguments are handed to `lake_connection:connect/6' unchanged.
%%
%% `Options' supports the options `ssl:connect' offers, and also the
%% following:
%%
%% * `{exchange_command_versions, boolean()}': Enable exchanging command
%%   versions. Requires RabbitMQ 3.11. Defaults to `true'.
%% @see connect/4
%% @see connect/5
connect(Host, Port, User, Password, Vhost, Options) ->
    %% NOTE(review): the doc mentions `ssl:connect' options for this non-TLS
    %% entry point, which looks copied from tls_connect/6 -- confirm against
    %% lake_connection:connect/6 which socket options are really accepted.
    NormalizedHost = lake_utils:normalize_host(Host),
    lake_connection:connect(NormalizedHost, Port, User, Password, Vhost, Options).

%% @doc Stop a connection.
%%
%% Hands `Connection' to `lake_connection:stop/1' and returns its result.
stop(Connection) ->
    lake_connection:stop(Connection).

%%
%% @doc Declare a publisher on `Stream'.
%%
%% `Stream' and `PublisherReference' must be binaries and `PublisherId' an
%% integer; any other argument types fail with `function_clause'. The call is
%% delegated to `lake_connection:declare_publisher/4'.
%%
declare_publisher(Connection, Stream, PublisherId, PublisherReference) when
    is_binary(Stream) andalso is_integer(PublisherId) andalso is_binary(PublisherReference)
->
    lake_connection:declare_publisher(
        Connection,
        Stream,
        PublisherId,
        PublisherReference
    ).

%%
%% @doc Publish a list of messages synchronously.
%%
%% `PublisherId' must be an integer and `Messages' a list; anything else
%% fails with `function_clause'. Delegates to `lake_connection:publish_sync/3'.
%%
publish_sync(Connection, PublisherId, Messages) when
    is_integer(PublisherId) andalso is_list(Messages)
->
    lake_connection:publish_sync(Connection, PublisherId, Messages).

%%
%% @doc Publish a list of messages asynchronously.
%%
%% `PublisherId' must be an integer and `Messages' a list; anything else
%% fails with `function_clause'. Delegates to `lake_connection:publish_async/3'.
%%
publish_async(Connection, PublisherId, Messages) when
    is_integer(PublisherId) andalso is_list(Messages)
->
    lake_connection:publish_async(Connection, PublisherId, Messages).

%%
%% @doc Query the sequence of the publisher known as `PublisherReference'.
%%
%% Both `PublisherReference' and `Stream' must be binaries, otherwise the call
%% fails with `function_clause'. Delegates to
%% `lake_connection:query_publisher_sequence/3'.
%%
query_publisher_sequence(Connection, PublisherReference, Stream) when
    is_binary(PublisherReference) andalso is_binary(Stream)
->
    lake_connection:query_publisher_sequence(
        Connection,
        PublisherReference,
        Stream
    ).

%%
%% @doc Delete the publisher identified by `PublisherId'.
%%
%% `PublisherId' must be an integer, otherwise the call fails with
%% `function_clause'. Delegates to `lake_connection:delete_publisher/2'.
%%
delete_publisher(Connection, PublisherId) when
    is_integer(PublisherId)
->
    lake_connection:delete_publisher(Connection, PublisherId).

%%
%% @doc Set the credit of a subscription asynchronously.
%%
%% `SubscriptionId' and `Credit' must both be integers, otherwise the call
%% fails with `function_clause'. Delegates to `lake_connection:credit_async/3'.
%%
credit_async(Connection, SubscriptionId, Credit) when
    is_integer(SubscriptionId) andalso is_integer(Credit)
->
    lake_connection:credit_async(Connection, SubscriptionId, Credit).

%%
%% @doc Create a new stream named `Stream'.
%%
%% `Stream' must be a binary and `Arguments' a list, otherwise the call fails
%% with `function_clause'. Delegates to `lake_connection:create/3'.
%%
create(Connection, Stream, Arguments) when
    is_binary(Stream) andalso is_list(Arguments)
->
    lake_connection:create(Connection, Stream, Arguments).

%%
%% @doc Delete the stream named `Stream'.
%%
%% `Stream' must be a binary, otherwise the call fails with
%% `function_clause'. Delegates to `lake_connection:delete/2'.
%%
delete(Connection, Stream) when
    is_binary(Stream)
->
    lake_connection:delete(Connection, Stream).

%%
%% @doc Subscribe to `Stream'.
%%
%% Argument types are checked before delegating to
%% `lake_connection:subscribe/6': `Stream' is a binary, `SubscriptionId' and
%% `Credit' are integers, `OffsetDefinition' is either a tuple or an atom, and
%% `Properties' is a list. Any other combination fails with `function_clause'.
%%
subscribe(Connection, Stream, SubscriptionId, OffsetDefinition, Credit, Properties) when
    is_binary(Stream) andalso
        is_integer(SubscriptionId) andalso
        (is_tuple(OffsetDefinition) orelse is_atom(OffsetDefinition)) andalso
        is_integer(Credit) andalso
        is_list(Properties)
->
    lake_connection:subscribe(
        Connection, Stream, SubscriptionId, OffsetDefinition, Credit, Properties
    ).

%%
%% @doc Unsubscribe the subscription identified by `SubscriptionId'.
%%
%% `SubscriptionId' must be an integer, otherwise the call fails with
%% `function_clause'. Delegates to `lake_connection:unsubscribe/2'.
%%
unsubscribe(Connection, SubscriptionId) when
    is_integer(SubscriptionId)
->
    lake_connection:unsubscribe(Connection, SubscriptionId).

%%
%% @doc Store `Offset' under `PublisherReference' for `Stream'.
%%
%% `PublisherReference' and `Stream' must be binaries and `Offset' an integer,
%% otherwise the call fails with `function_clause'. Delegates to
%% `lake_connection:store_offset/4'.
%%
%% FIXME store_offset/4 is fire-and-forget; maybe the name should indicate that?
%%
store_offset(Connection, PublisherReference, Stream, Offset) when
    is_binary(PublisherReference) andalso is_binary(Stream) andalso is_integer(Offset)
->
    lake_connection:store_offset(
        Connection,
        PublisherReference,
        Stream,
        Offset
    ).

%%
%% @doc Query the offset stored under `PublisherReference' for `Stream'.
%%
%% Both `PublisherReference' and `Stream' must be binaries, otherwise the call
%% fails with `function_clause'. Delegates to `lake_connection:query_offset/3'.
%%
query_offset(Connection, PublisherReference, Stream) when
    is_binary(PublisherReference) andalso is_binary(Stream)
->
    lake_connection:query_offset(
        Connection,
        PublisherReference,
        Stream
    ).

%%
%% @doc Retrieve metadata, such as endpoints and replicas, for `Streams'.
%%
%% `Streams' must be a list, otherwise the call fails with `function_clause'.
%% Delegates to `lake_connection:metadata/2'.
%%
metadata(Connection, Streams) when
    is_list(Streams)
->
    lake_connection:metadata(Connection, Streams).

%%
%% @doc Convert a delivered osiris chunk into a list of messages.
%%
%% Delegates to `lake_messages:chunk_to_messages/1' without inspecting
%% `OsirisChunk'.
%%
chunk_to_messages(OsirisChunk) ->
    lake_messages:chunk_to_messages(OsirisChunk).

%%
%% @doc Determine the partition of `SuperStream' for `RoutingKey'.
%%
%% Both `RoutingKey' and `SuperStream' must be binaries, otherwise the call
%% fails with `function_clause'. Delegates to `lake_connection:route/3'.
%%
route(Connection, RoutingKey, SuperStream) when
    is_binary(RoutingKey) andalso is_binary(SuperStream)
->
    lake_connection:route(Connection, RoutingKey, SuperStream).

%%
%% @doc Retrieve all partitions of `SuperStream'.
%%
%% `SuperStream' must be a binary, otherwise the call fails with
%% `function_clause'. Delegates to `lake_connection:partitions/2'.
%%
partitions(Connection, SuperStream) when
    is_binary(SuperStream)
->
    lake_connection:partitions(Connection, SuperStream).

%%
%% @doc Respond to a `consumer_update' request.
%%
%% All four arguments are forwarded unchecked to
%% `lake_connection:consumer_update_response/4'.
%%
consumer_update_response(Connection, CorrelationId, ResponseCode, OffsetSpecification) ->
    lake_connection:consumer_update_response(
        Connection,
        CorrelationId,
        ResponseCode,
        OffsetSpecification
    ).

%%
%% @doc Retrieve statistics for `Stream'.
%%
%% `Stream' must be a binary, otherwise the call fails with
%% `function_clause'. Delegates to `lake_connection:stream_stats/2'.
%%
stream_stats(Connection, Stream) when
    is_binary(Stream)
->
    lake_connection:stream_stats(Connection, Stream).