defmodule Fluvio.Consumer do
@moduledoc """
PartitionConsumer functionalities. This module is backed by a GenServer.
"""
use GenServer
alias Fluvio.Record
defmodule Config do
@moduledoc false
@enforce_keys [:topic]
defstruct topic: nil,
partition: 0,
offset: [from_end: 0],
max_bytes: nil,
smartmodule_path: nil,
smartmodule_context_data: nil,
smartmodule_context_data_acc: nil
@typedoc """
Fluvio.Consumer.Config
"""
@type t() :: %__MODULE__{
topic: bitstring(),
partition: integer(),
offset: [{:from_beginning | :from_end | :absolute, integer()}],
max_bytes: integer(),
smartmodule_path: bitstring(),
smartmodule_context_data: list(),
smartmodule_context_data_acc: integer()
}
end
defmodule GenState do
@moduledoc false
defstruct native_mod: nil,
fluvio_ref: nil,
consumer_ref: nil,
config: nil
end
@impl true
def init(%GenState{} = gen_state) do
{:ok, gen_state}
end
@impl true
def handle_call({:stream_unfold, timeout_iter_ms}, _from, state) do
consumer_pid = self()
unfold =
Stream.unfold(nil, fn _value ->
do_unfold(state.native_mod, state.consumer_ref, timeout_iter_ms, consumer_pid)
end)
{:reply, unfold, state}
end
@impl true
def handle_call({:stream_next, timeout_iter_ms}, _from, state) do
result = do_stream_next(state.native_mod, state.consumer_ref, timeout_iter_ms)
{:reply, result, state}
end
defp new_consumer(
native_mod,
fluvio_ref,
config = %Config{}
)
when is_reference(fluvio_ref) do
native_mod.new_consumer(
fluvio_ref,
config.topic,
config.partition,
config.offset |> hd |> elem(0),
config.offset |> hd |> elem(1),
config.max_bytes,
if is_bitstring(config.smartmodule_path) do
Path.expand(config.smartmodule_path)
else
nil
end,
config.smartmodule_context_data,
config.smartmodule_context_data_acc
)
end
@doc """
Starts a Fluvio PartitionConsumer GenServer process linked to the current process.
"""
@spec start_link(map(), list(), any()) :: GenServer.on_start()
def start_link(config, gen_opts \\ [], native_mod \\ Fluvio.Native) do
config = struct!(Config, config)
with {:ok, fluvio_ref} <- native_mod.connect(),
{:ok, consumer_ref} <- new_consumer(native_mod, fluvio_ref, config) do
GenServer.start_link(
__MODULE__,
%GenState{
native_mod: native_mod,
fluvio_ref: fluvio_ref,
consumer_ref: consumer_ref,
config: config
},
gen_opts
)
end
end
defp do_unfold(native_mod, consumer_ref, timeout_iter_ms, consumer_pid) do
case do_stream_next(native_mod, consumer_ref, timeout_iter_ms) do
{:stop_next, :stop_next} ->
do_unfold(native_mod, consumer_ref, timeout_iter_ms, consumer_pid)
{:ok, record} ->
{record, record}
{:error, _msg} ->
nil
end
end
@doc """
Unfolds the infinite stream lazily.
timeout_iter_ms is a low-level iteration timeout in ms to avoid blocking the
DirtyIO Rust NIF for too long.
"""
@spec stream_unfold(pid(), integer()) :: Stream
def stream_unfold(pid, timeout_iter_ms \\ 2000) when is_pid(pid) do
GenServer.call(pid, {:stream_unfold, timeout_iter_ms})
end
defp do_stream_next(native_mod, consumer_ref, timeout_iter_ms) do
case native_mod.next(consumer_ref, timeout_iter_ms) do
{offset, partition, key, value, timestamp} ->
{:ok,
%Record{
offset: offset,
partition: partition,
key: key,
value: value,
timestamp: timestamp
}}
:stop_next ->
{:stop_next, :stop_next}
{:error, msg} ->
{:error, msg}
end
end
@doc """
Gets next record of the stream. If you want to traverse continuously the stream, you
can call `stream_each/3`, or alternatively recursively call this function.
"""
@spec stream_next(pid(), integer()) ::
{:ok, %Record{}} | {:stop_next, :stop_next} | {:error, bitstring()}
def stream_next(pid, timeout_iter_ms \\ 2000) when is_pid(pid) do
GenServer.call(pid, {:stream_next, timeout_iter_ms})
end
defp loop_fn({:stop_next, :stop_next}, _func), do: nil
defp loop_fn(result, func), do: func.(result)
@doc """
Invokes the given func/1 for each result in the stream continuously.
"""
@spec stream_each(pid(), fun(), integer()) :: :ok
def stream_each(pid, func, timeout_iter \\ 2) do
loop_fn(stream_next(pid, timeout_iter), func)
stream_each(pid, func)
end
end