lib/altworx_book/kafka/raw_topic.ex

defmodule AltworxBook.Kafka.RawTopic do
  @moduledoc """
  Kafka raw topic interaction helper functions.
  """

  use AltworxBook.Altworx

  alias AltworxBook.Altworx
  alias AltworxBook.Kafka
  alias Normalizer.Api.RawTopic

  @type message :: %{timestamp: Altworx.timestamp(), value: String.t()}

  @doc """
  Return list of messages from raw topic.
  """
  def list_messages(topic_name, since, count, config \\ []) do
    rpc_call(Normalizer.inspect_raw_topic(topic_name, since, count, config))
  end

  @doc """
  Return message from topic with given offset.
  """
  def get_message(topic_name, offset) do
    rpc_call(RawTopic.get_message(topic_name, offset))
  end

  @doc """
  List all raw topics.
  """
  @spec list :: [String.t()]
  def list do
    rt_topic_regex = ~r(^N[0-9]+_rt_.*)

    Kafka.list_topics()
    |> Enum.reject(fn topic_name -> Regex.match?(rt_topic_regex, topic_name) end)
  end
end