<div align="center">
  <h1><code>fluvio-ex</code></h1>

  <strong>💧Elixir client for the <a href="https://www.fluvio.io/">Fluvio</a> streaming platform</strong>

  <p></p>
  <p>
    <a href="https://github.com/viniarck/fluvio-ex/actions/workflows/unit_tests.yml"><img src="https://github.com/viniarck/fluvio-ex/actions/workflows/unit_tests.yml/badge.svg" alt="unit tests status" /></a>
    <a href="https://coveralls.io/r/viniarck/excoveralls?branch=master"><img src="https://coveralls.io/repos/viniarck/fluvio-ex/badge.svg?branch=master" alt="test coverage" /></a>
    <a href="https://hex.pm/packages/fluvio"><img src="https://img.shields.io/hexpm/v/fluvio.svg" alt="hex.pm version" /></a>
    <a href="https://hex.pm/packages/fluvio"><img src="https://img.shields.io/hexpm/dt/fluvio.svg" alt="hex.pm downloads" /></a>
  </p>


  <h3>
    <a href="https://hexdocs.pm/fluvio/Fluvio.html">Docs</a>
  </h3>
</div>


## Installation

You can add `fluvio` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:fluvio, "~> 0.2.0"}
  ]
end
```

## Features

The following Fluvio Rust abstractions are supported in Elixir:

| **Rust**                    | **Elixir**        |
|-----------------------------|-------------------|
| `fluvio::PartitionConsumer` | `Fluvio.Consumer` |
| `fluvio::TopicProducer`     | `Fluvio.Producer` |
| `fluvio::FluvioAdmin`       | `Fluvio.Admin`    |

Each Elixir abstraction tends to provide functionality equivalent to what the [`fluvio` crate exposes](https://docs.rs/fluvio/latest/fluvio/), but in a more Elixir-oriented form so that it integrates well with the Elixir ecosystem and OTP. `fluvio::FluvioAdmin` is only minimally supported, since cluster resources are typically provisioned upfront rather than at the application level; `Fluvio.Admin` is still useful for experimentation.
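
As a rough illustration of the OTP integration mentioned above, a consumer could be described with a plain child spec map. This is only a sketch: it relies solely on the `start_link/1` call shown in the examples of this README, and the `:lobby_consumer` id, the restart strategy, and the idea of supervising the process this way are assumptions rather than documented behaviour of the library.

```elixir
# Hypothetical child spec; only `Fluvio.Consumer.start_link/1` comes from this README.
consumer_spec = %{
  id: :lobby_consumer,
  start: {Fluvio.Consumer, :start_link, [%{topic: "lobby", offset: [from_beginning: 0]}]},
  restart: :permanent,
  type: :worker
}

children = [consumer_spec]

# In an application, this list would typically be handed to a supervisor, e.g.:
# Supervisor.start_link(children, strategy: :one_for_one)
```

Building the spec does not start anything; the process is only started once a supervisor receives the list.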

## Examples

### Consumer

This snippet illustrates a `Consumer` connected to a `"lobby"` topic and using an optional SmartModule filter. Initially, the consumer calls `Consumer.stream_unfold/2` to lazily take 4 records in chunks of 2 and print them. After that, it keeps consuming the stream with `Consumer.stream_each/2`, a higher-level function that recursively calls `Consumer.stream_next/2`.

```elixir
alias Fluvio.Consumer

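# Start a consumer on the "lobby" topic, reading from the beginning
# and applying the optional SmartModule found at the given path.
{:ok, pid} =
  Consumer.start_link(%{
    topic: "lobby",
    offset: [from_beginning: 0],
    smartmodule_path: "./examples/wasm/map_reverse.wasm"
  })

# Lazily take 4 records, group them in chunks of 2, and print the result.
Consumer.stream_unfold(pid)
|> Stream.take(4)
|> Stream.chunk_every(2)
|> Enum.to_list()
|> IO.inspect()

# Keep consuming the stream, handling each result as it arrives.
Consumer.stream_each(pid, fn result ->
  case result do
    {:ok, record} -> IO.inspect(record)
    {:error, msg} -> IO.inspect("Error: #{msg}")
  end
end)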
```
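
To see how the lazy `Stream.take/2` and `Stream.chunk_every/2` pipeline behaves without a running cluster, the sketch below swaps `Consumer.stream_unfold/2` for a plain `Stream.unfold/2`. The `fake_stream` name and the `"record-N"` strings are hypothetical stand-ins; the shape of real records returned by the consumer may differ.

```elixir
# Stand-in for the consumer stream: an infinite, lazy stream of fake records.
fake_stream =
  Stream.unfold(0, fn offset ->
    {"record-#{offset}", offset + 1}
  end)

# Nothing is produced until Enum.to_list/1 forces the stream,
# and only 4 elements are ever generated.
chunks =
  fake_stream
  |> Stream.take(4)
  |> Stream.chunk_every(2)
  |> Enum.to_list()

IO.inspect(chunks)
# prints [["record-0", "record-1"], ["record-2", "record-3"]]
```

Because the stream is lazy, taking 4 elements from an infinite source terminates as soon as the fourth element has been produced.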

### Producer

This snippet illustrates a `Producer` connected to a `"lobby"` topic. First, the string `"hello"` is sent and flushed. Then three more strings, `"are"`, `"you"`, and `"there?"`, are sent and each result is asserted. Finally, the integers from 1 to 20, converted to strings, are sent asynchronously in chunks of 10, with a flush after each chunk. If you're going to send and await records asynchronously, make sure that doing so is suitable and beneficial for your application:


```elixir
alias Fluvio.Producer

{:ok, pid} = Producer.start_link(%{topic: "lobby"})

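# Send a single record and flush it.
{:ok, _} = Producer.send(pid, "hello")
{:ok, _} = Producer.flush(pid)

# Send a list of records and assert that every result is {:ok, _}.
Producer.send(pid, ["are", "you", "there?"])
|> Enum.each(&({:ok, _} = &1))

{:ok, _} = Producer.flush(pid)

# Send 1..20 as strings, 10 at a time, flushing after each chunk.
# Only {:error, _} results are kept, so matching on [] asserts that
# every send and every flush succeeded.
[] =
  1..20
  |> Stream.chunk_every(10)
  |> Stream.flat_map(fn chunk ->
    [
      chunk
      |> Enum.map(fn value ->
        Task.async(fn -> {Producer.send(pid, to_string(value)), value} end)
      end)
      |> Task.await_many()
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1)),
      [{Producer.flush(pid), :flush}]
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1))
    ]
  end)
  |> Stream.concat()
  |> Enum.to_list()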
```
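
The error-collecting pattern above can be tried without a cluster by substituting stand-in functions for the producer calls. In this sketch, `fake_send` and `fake_flush` are hypothetical: `fake_send` rejects multiples of 7 purely to make failures visible, and only the `{:ok, _}` / `{:error, _}` result shapes are taken from the example above.

```elixir
# Hypothetical stand-in for Producer.send/2: rejects multiples of 7.
fake_send = fn value ->
  if rem(String.to_integer(value), 7) == 0 do
    {:error, "rejected"}
  else
    {:ok, value}
  end
end

# Hypothetical stand-in for Producer.flush/1 that always succeeds.
fake_flush = fn -> {:ok, :flushed} end

failures =
  1..20
  |> Stream.chunk_every(10)
  |> Stream.flat_map(fn chunk ->
    # Send the whole chunk concurrently and keep only the failed sends.
    send_failures =
      chunk
      |> Enum.map(fn value ->
        Task.async(fn -> {fake_send.(to_string(value)), value} end)
      end)
      |> Task.await_many()
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1))

    # Flush once per chunk and keep the result only if it failed.
    flush_failures =
      [{fake_flush.(), :flush}]
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1))

    send_failures ++ flush_failures
  end)
  |> Enum.to_list()

IO.inspect(failures)
# prints [{{:error, "rejected"}, 7}, {{:error, "rejected"}, 14}]
```

Pairing each result with the value that produced it makes it possible to retry or log exactly the records that failed, instead of only learning that something went wrong.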