README.md

<div align="center">
  <h1><code>fluvio-ex</code></h1>

  <strong>💧Elixir client for <a href="https://www.fluvio.io/">Fluvio</a> streaming platform</strong>

  <p></p>
  <p>
    <a href="https://github.com/viniarck/fluvio-ex/actions/workflows/unit_tests.yml"><img src="https://github.com/viniarck/fluvio-ex/actions/workflows/unit_tests.yml/badge.svg" alt="unit tests status" /></a>
    <a href="https://coveralls.io/r/viniarck/fluvio-ex?branch=master"><img src="https://coveralls.io/repos/viniarck/fluvio-ex/badge.svg?branch=master" alt="test coverage" /></a>
    <a href="https://hex.pm/packages/fluvio"><img src="https://img.shields.io/hexpm/v/fluvio.svg" alt="hex.pm version" /></a>
    <a href="https://hex.pm/packages/fluvio"><img src="https://img.shields.io/hexpm/dt/fluvio.svg" alt="hex.pm downloads" /></a>
  </p>


  <h3>
    <a href="https://hexdocs.pm/fluvio/readme.html">Docs</a>
  </h3>
</div>


## Installation

You can add `fluvio` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:fluvio, "~> 0.2"}
  ]
end
```
## Features

The following Fluvio Rust interfaces are supported on Elixir:

| **Rust**                    | **Elixir**        |
|-----------------------------|-------------------|
| `fluvio::PartitionConsumer` | `Fluvio.Consumer` |
| `fluvio::TopicProducer`     | `Fluvio.Producer` |
| `fluvio::FluvioAdmin`       | `Fluvio.Admin`    |

Each Elixir module tends to provide equivalent functionalities that [`fluvio` crate](https://docs.rs/fluvio/latest/fluvio/) exposes, although Elixir-oriented to be well integrated with Elixir ecosystem and OTP. 

> `fluvio::FluvioAdmin` is minimally supported since provisioning cluster resources is typically done upfront and not at the application level, but for experimentation it's useful to have `Fluvio.Admin`.

## Examples

### Consumer

[This snippet](./examples/consumer_smartmodule.exs) illustrates a `Fluvio.Consumer` connected to a `"lobby"` topic and using an optional SmartModule filter. Initially, it lazily unfolds the infinite stream, taking 4 records and chunking every 2. Finally, it keeps consuming it.

```elixir
alias Fluvio.Consumer

{:ok, pid} =
  Consumer.start_link(%{
    topic: "lobby",
    offset: [from_beginning: 0],
    smartmodule_path: "./examples/wasm/map_reverse.wasm"
  })

Consumer.stream_unfold(pid)
|> Stream.take(4)
|> Stream.chunk_every(2)
|> Enum.to_list()
|> IO.inspect()

Consumer.stream_each(pid, fn result ->
  case result do
    {:ok, record} -> IO.inspect(record)
    {:error, msg} -> IO.inspect("Error: #{msg}")
  end
end)
```

```console
MIX_ENV=prod mix run examples/consumer_smartmodule.exs
```

### Producer

[This snippet](./examples/producer.exs) illustrates a `Fluvio.Producer` connected to a `"lobby"` topic. Initially, a string value `"hello"` is sent and flushed synchronously. Also, 20 values are sent asynchronously and flushed in chunks of 10.

```elixir
alias Fluvio.Producer

{:ok, pid} = Producer.start_link(%{topic: "lobby"})

{:ok, _} = Producer.send(pid, "hello")
{:ok, _} = Producer.flush(pid)

[] =
  1..20
  |> Stream.chunk_every(10)
  |> Stream.flat_map(fn chunk ->
    [
      chunk
      |> Enum.map(fn value ->
        Task.async(fn -> {Producer.send(pid, to_string(value)), value} end)
      end)
      |> Task.await_many()
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1)),
      [{Producer.flush(pid), :flush}]
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1))
    ]
  end)
  |> Stream.concat()
  |> Enum.to_list()
```

```console
MIX_ENV=prod mix run examples/producer.exs
```

### Supervised Producer and Consumer

[This example](./examples/ping_pong.exs) demonstrates a ping pong application that uses two pairs of a `Fluvio.Producer` and `Fluvio.Consumer`, which have been linked to an Elixir `Supervisor` (`Task.Supervisor`) to provide process fault-tolerance inside your app. You could also strategically restart and init with a different `Fluvio.Consumer` offset.

```elixir
alias Fluvio.Producer
alias Fluvio.Consumer
alias Fluvio.Record

defmodule App do
  def start(topic_1 \\ "ping", topic_2 \\ "pong", initial_value \\ "1") do
    children = [{Task.Supervisor, name: TaskSup}]
    {:ok, sup_pid} = Supervisor.start_link(children, strategy: :one_for_one)

    {:ok, pid_one} =
      Task.Supervisor.start_child(
        TaskSup,
        fn ->
          {:ok, p1_pid} = Producer.start_link(%{topic: topic_1})
          {:ok, c2_pid} = Consumer.start_link(%{topic: topic_2, offset: [from_end: 0]})
          IO.inspect("Bootstrapping '#{topic_1}' by sending '#{initial_value}'")
          {:ok, _} = Producer.send(p1_pid, initial_value)
          {:ok, _} = Producer.flush(p1_pid)
          App.PingPong.keep_consuming(c2_pid, p1_pid)
        end,
        restart: :permanent
      )

    {:ok, pid_two} =
      Task.Supervisor.start_child(
        TaskSup,
        fn ->
          {:ok, p2_pid} = Producer.start_link(%{topic: topic_2})
          {:ok, c1_pid} = Consumer.start_link(%{topic: topic_1, offset: [from_end: 0]})
          App.PingPong.keep_consuming(c1_pid, p2_pid)
        end,
        restart: :permanent
      )

    {sup_pid, pid_one, pid_two}
  end

  defmodule PingPong do
    defp produce(p_pid, value) do
      IO.inspect("Producing value: '#{value}'")
      {:ok, _} = Producer.send(p_pid, value)
      {:ok, _} = Producer.flush(p_pid)
    end

    defp do_consume({:ok, %Record{value: "4"}}, _c_pid, _p_pid) do
      raise("simulating an unrecoverable error")
    end

    defp do_consume({:ok, record}, _c_pid, p_pid) do
      IO.inspect("Consumed value: '#{record.value}'")
      produce(p_pid, to_string(String.to_integer(record.value) + 1))
    end

    defp do_consume({:error, _msg}, c_pid, _p_pid), do: Process.exit(c_pid, :kill)
    defp do_consume({:stop_next, _}, _c_pid, _p_pid), do: nil

    def keep_consuming(c_pid, p_pid, min_freq_ms \\ 1000) do
      Consumer.stream_each(c_pid, fn result ->
        Process.sleep(min_freq_ms)
        do_consume(result, c_pid, p_pid)
      end)
    end
  end
end
```

If you were to run this example, you'd see that once the pairs of producer and consumer start, the initial value `"1"` is sent, and each pair will keep incrementing the value and sending it. Once the value `"4"` is reached, the ping consumer simulates an unrecoverable error, which will crash this process and the supervisor will restart it:

```console
MIX_ENV=prod iex -S mix
iex(1)> c "examples/ping_pong.exs"
[App, App.PingPong]
iex(2)> App.start()
{#PID<0.220.0>, #PID<0.222.0>, #PID<0.223.0>}


"Bootstrapping 'ping' by sending '1'"
"Consumed value: '1'"
"Producing value: '2'"
"Consumed value: '2'"
"Producing value: '3'"
"Consumed value: '3'"
"Producing value: '4'"
iex(3)>
01:31:46.911 [error] Task #PID<0.222.0> started from #PID<0.207.0> terminating
** (RuntimeError) simulating an unrecoverable error
    examples/ping_pong.exs:46: App.PingPong.do_consume/3
    (fluvio 0.2.0) lib/fluvio/consumer.ex:172: Fluvio.Consumer.stream_each/3
    (elixir 1.14.0) lib/task/supervised.ex:89: Task.Supervised.invoke_mfa/2
    (stdlib 4.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: #Function<0.131083353/0 in App.start/3>
    Args: []
"Bootstrapping 'ping' by sending '1'"
"Consumed value: '1'"
"Producing value: '2'"
"Consumed value: '2'"
```