# lake

An Erlang client for RabbitMQ's [stream plugin](https://www.rabbitmq.com/stream.html).

A stream feeds into a lake, and a lake feeds into a stream.

## Installation

### Rebar3

```erlang
%% rebar.config
{deps, [lake]}.
```

### Mix

```elixir
defp deps do
  [
    {:lake, "~> 0.1"}
  ]
end
```

## Usage

```erlang
example() ->
    %% host() and port() are placeholders for the broker's address and stream port.
    {ok, Connection} = lake:connect(host(), port(), <<"guest">>, <<"guest">>, <<"/">>),
    Stream = <<"my-stream">>,
    ok = lake:create(Connection, Stream, []),
    ok = lake:subscribe(Connection, Stream, SubscriptionId = 1, first, 1000, []),
    ok = lake:declare_publisher(Connection, Stream, PublisherId = 1, <<"my-publisher">>),
    [{1, ok}] = lake:publish_sync(Connection, PublisherId, [{_PublishingId = 1, <<"Hello, World!">>}]),
    {ok, {[Message], _}} =
        receive
            {deliver_v2, _ResponseCode, _CommittedChunkId, OsirisChunk} ->
                lake:chunk_to_messages(OsirisChunk)
        after 5000 ->
            exit(timeout)
        end,
    io:format("Received: ~p~n", [Message]),
    ok = lake:unsubscribe(Connection, SubscriptionId),
    ok = lake:delete_publisher(Connection, PublisherId),
    ok = lake:delete(Connection, Stream),
    ok = lake:stop(Connection),
    ok.
```
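
The example above waits for a single delivery. To collect several deliveries, the same `receive` can be wrapped in a loop. The following is a minimal sketch, not part of lake: the module name `consume_sketch` and the function `collect/3` are hypothetical. It assumes that `deliver_v2` messages arrive in the mailbox of the calling process, as in the example, and that the decoder returns `{ok, {Messages, _}}` like `lake:chunk_to_messages/1` does there. It does not grant additional credit to the subscription.

```erlang
-module(consume_sketch).
-export([collect/3]).

%% Wait for up to N deliver_v2 messages in the caller's mailbox, decode each
%% chunk with Decode, and return all decoded messages in arrival order.
%% Returns {timeout, MessagesSoFar} if no delivery arrives within Timeout ms.
collect(N, Decode, Timeout) ->
    collect(N, Decode, Timeout, []).

collect(0, _Decode, _Timeout, Acc) ->
    {ok, lists:append(lists:reverse(Acc))};
collect(N, Decode, Timeout, Acc) ->
    receive
        {deliver_v2, _ResponseCode, _CommittedChunkId, OsirisChunk} ->
            %% Decode is expected to behave like lake:chunk_to_messages/1
            %% in the example above (assumption).
            {ok, {Messages, _Other}} = Decode(OsirisChunk),
            collect(N - 1, Decode, Timeout, [Messages | Acc])
    after Timeout ->
        {timeout, lists:append(lists:reverse(Acc))}
    end.
```

After subscribing, it could be called as `consume_sketch:collect(10, fun lake:chunk_to_messages/1, 5000)`. Passing the decoder as a fun keeps the loop testable without a running broker.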

## Build

```
$ rebar3 compile
```

## Running Tests

The tests need a reachable RabbitMQ broker. Set `RABBITMQ_HOST` to the address of the container started by `docker-compose`; the address below is an example and may differ on your machine.

```
$ docker-compose up -d
$ RABBITMQ_HOST=172.18.0.2 rebar3 eunit ct
```

## Benchmark

A very simple benchmark is available in the `benchmark/` directory. It runs
one publisher and one subscriber at a fixed rate with a fixed payload. To
inspect the benchmark, use RabbitMQ's management interface with statistics
enabled. The `Dockerfile` provided with this repository can be used for this.

```
$ rebar3 escriptize && _build/default/bin/benchmark streams://172.24.0.2:5552 guest guest "/"
===> Verifying dependencies...
===> App lake is a checkout dependency and cannot be locked.
===> Analyzing applications...
===> Compiling lake
===> Analyzing applications...
===> Compiling benchmark
===> Building escript for benchmark...
Running benchmark with messages of size 350B and one publisher, one subscriber. Hit enter to stop
Read measured rate: 0.0
Will publish 25000 messages at 350 bytes each.
Read measured rate: 2.5e4
Will publish 25000 messages at 350 bytes each.
...
```
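
The output above reports batches of 25000 messages at 350 bytes each. As an illustration of what such a fixed-payload batch could look like in the `{PublishingId, Payload}` shape used by `lake:publish_sync/3` in the Usage section, here is a minimal sketch. It is not the benchmark's actual code: the module `batch_sketch` and the function `batch/3` are hypothetical, and the zero-filled payload is an assumption.

```erlang
-module(batch_sketch).
-export([batch/3]).

%% Build Count {PublishingId, Payload} tuples. Publishing ids start at
%% FirstId and increase by one; every payload is Size zero bytes.
batch(FirstId, Count, Size) ->
    Payload = binary:copy(<<0>>, Size),
    [{Id, Payload} || Id <- lists:seq(FirstId, FirstId + Count - 1)].
```

A batch built this way could be passed on as `lake:publish_sync(Connection, PublisherId, batch_sketch:batch(1, 25000, 350))`. Whether a batch of that size should be sent in a single call is not covered here.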

## Implemented Messages

Checked messages from the [protocol documentation](https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc) are implemented:

* [x] DeclarePublisher
* [x] Publish
* [x] PublishConfirm
* [x] PublishError
* [x] QueryPublisherSequence
* [x] DeletePublisher
* [x] Subscribe
* [x] Deliver
* [x] Credit
* [x] StoreOffset
* [x] QueryOffset
* [x] Unsubscribe
* [x] Create
* [x] Delete
* [x] Metadata
* [x] MetadataUpdate
* [x] PeerProperties
* [x] SaslHandshake
* [x] SaslAuthenticate
* [x] Tune
* [x] Open
* [x] Close
* [x] Heartbeat
* [x] Route
* [x] Partitions
* [x] ConsumerUpdate
* [x] ExchangeCommandVersions
* [x] StreamStats
* [ ] CreateSuperStream
* [ ] DeleteSuperStream

## Random TODOs

* [ ] A `superstream_publisher` should be implemented to hide the SuperStream's partitions
* [ ] _Single Active Consumer_ should be supported (see the [RabbitMQ 3.11 feature preview](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams))
* [ ] Encoding should not happen in the Connection process; an encoding crash must only crash the caller

## License

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.