lib/elsa/fetch.ex

defmodule Elsa.Fetch do
  @moduledoc """
  Provides functions for doing one-off retrieval of
  messages from the Kafka cluster.
  """

  @doc """
  A simple interface for quickly retrieving a message set from
  the cluster on the given topic.

  ## Options

    * `:partition` - partition to read from; defaults to `0`.
    * `:offset` - offset to start reading at; defaults to the earliest
      offset of the partition, as resolved by `:brod.resolve_offset/4`.

  Returns `{:ok, partition_offset, messages}`, where `messages` are built
  with `Elsa.Message.new/2`. Returns `{:error, reason}` when either resolving
  the default offset or the fetch itself fails.
  """
  @spec fetch(keyword(), String.t(), keyword()) :: {:ok, integer(), [struct()]} | {:error, term()}
  def fetch(endpoints, topic, opts \\ []) do
    partition = Keyword.get(opts, :partition, 0)

    with {:ok, offset} <- starting_offset(opts, endpoints, topic, partition),
         {:ok, {partition_offset, messages}} <- :brod.fetch(endpoints, topic, partition, offset) do
      transformed_messages =
        Enum.map(messages, &Elsa.Message.new(&1, topic: topic, partition: partition))

      {:ok, partition_offset, transformed_messages}
    end
  end

  @doc """
  Retrieves all messages on a given topic across all partitions by default.
  Evaluates lazily, returning a `Stream` resource containing the messages.

  ## Options

    * `:partition` - restrict the stream to a single partition.
    * `:start_offset` - first offset to read; defaults to the earliest
      offset of each partition.
    * `:end_offset` - exclusive upper bound; defaults to the latest offset
      of each partition at the time the stream is first enumerated.

  Messages at or beyond `:end_offset` are not emitted. When every partition
  is read, partitions are streamed from the highest-numbered down to `0`.
  """
  @spec fetch_stream(keyword(), String.t(), keyword()) :: Enumerable.t()
  def fetch_stream(endpoints, topic, opts \\ []) do
    partitions =
      case Keyword.get(opts, :partition) do
        nil ->
          0..(Elsa.Util.partition_count!(endpoints, topic) - 1)

        partition ->
          [partition]
      end

    # Prepending reverses the partition order; kept as-is so the emitted
    # order stays the same for existing callers.
    Enum.reduce(partitions, [], fn partition, acc ->
      [fetch_partition_stream(endpoints, topic, partition, opts) | acc]
    end)
    |> Stream.concat()
  end

  @doc """
  Retrieves a stream of messages for which the supplied function returns a
  truthy value. All options for `fetch_stream/3` are passed through, so the
  search can be limited by partition and offsets.
  """
  @spec search(keyword(), String.t(), function(), keyword()) :: Enumerable.t()
  def search(endpoints, topic, search_function, opts \\ []) do
    endpoints
    |> fetch_stream(topic, opts)
    |> Stream.filter(search_function)
  end

  @doc """
  Retrieves a stream of messages whose key contains the supplied search
  string. All options for `fetch_stream/3` are passed through, so the search
  can be limited by partition and offsets.
  """
  @spec search_keys(keyword(), String.t(), String.t(), keyword()) :: Enumerable.t()
  def search_keys(endpoints, topic, search_term, opts \\ []) do
    search_by_keys = fn %Elsa.Message{key: key} -> String.contains?(key, search_term) end

    search(endpoints, topic, search_by_keys, opts)
  end

  @doc """
  Retrieves a stream of messages whose value contains the supplied search
  string. All options for `fetch_stream/3` are passed through, so the search
  can be limited by partition and offsets.
  """
  @spec search_values(keyword(), String.t(), String.t(), keyword()) :: Enumerable.t()
  def search_values(endpoints, topic, search_term, opts \\ []) do
    search_by_values = fn %Elsa.Message{value: value} -> String.contains?(value, search_term) end

    search(endpoints, topic, search_by_values, opts)
  end

  # Lazily streams the messages of one partition in [start_offset, end_offset).
  defp fetch_partition_stream(endpoints, topic, partition, opts) do
    Stream.resource(
      fn ->
        start_offset = retrieve_offset(opts, :start_offset, endpoints, topic, partition)
        end_offset = retrieve_offset(opts, :end_offset, endpoints, topic, partition)

        {start_offset, end_offset}
      end,
      fn
        {current_offset, end_offset} when current_offset < end_offset ->
          {:ok, _partition_offset, messages} =
            fetch(endpoints, topic, partition: partition, offset: current_offset)

          # Like the offset bookkeeping below, this assumes one message per
          # offset starting at current_offset, so only the first
          # (end_offset - current_offset) messages fall inside the range.
          # NOTE(review): compacted topics have offset gaps; advancing by the
          # message's own offset would be more precise there.
          case Enum.take(messages, end_offset - current_offset) do
            [] ->
              # Nothing more to read (e.g. an end_offset past the last written
              # offset); halting avoids re-fetching the same offset forever.
              {:halt, {current_offset, end_offset}}

            in_range ->
              {in_range, {current_offset + length(in_range), end_offset}}
          end

        finished ->
          {:halt, finished}
      end,
      fn _state -> :ok end
    )
  end

  # Returns the caller's `:offset` when given, otherwise resolves the
  # partition's earliest offset via brod (passing through its error tuple).
  defp starting_offset(opts, endpoints, topic, partition) do
    case Keyword.fetch(opts, :offset) do
      {:ok, offset} -> {:ok, offset}
      :error -> :brod.resolve_offset(endpoints, topic, partition, :earliest)
    end
  end

  # Returns the caller-supplied `:start_offset` / `:end_offset`, or resolves the
  # partition's earliest / latest offset via brod. A resolve error raises a
  # MatchError when the stream is first enumerated.
  defp retrieve_offset(opts, key, endpoints, topic, partition) do
    Keyword.get_lazy(opts, key, fn ->
      {:ok, offset} = :brod.resolve_offset(endpoints, topic, partition, offset_time(key))
      offset
    end)
  end

  defp offset_time(:start_offset), do: :earliest
  defp offset_time(:end_offset), do: :latest
end