defmodule AltworxBook.Kafka.RawTopic do
  @moduledoc """
  Helpers for interacting with Kafka raw topics.

  Message lookups are wrapped in `rpc_call`, which is not defined in this module
  (presumably brought into scope by `use AltworxBook.Altworx` - confirm there).
  """

  use AltworxBook.Altworx

  alias AltworxBook.Altworx
  alias AltworxBook.Kafka
  alias Normalizer.Api.RawTopic

  @type message :: %{timestamp: Altworx.timestamp(), value: String.t()}

  @doc """
  Returns a list of messages from the raw topic `topic_name`.

  `since`, `count` and the optional `config` (defaults to `[]`) are handed unchanged
  to `Normalizer.inspect_raw_topic/4`; the result is whatever `rpc_call` yields for
  that call.
  """
  def list_messages(topic_name, since, count, config \\ []) do
    rpc_call(Normalizer.inspect_raw_topic(topic_name, since, count, config))
  end

  @doc """
  Returns the message stored at `offset` in the topic `topic_name`.

  Delegates to `Normalizer.Api.RawTopic.get_message/2` through `rpc_call`.
  """
  def get_message(topic_name, offset) do
    rpc_call(RawTopic.get_message(topic_name, offset))
  end

  @doc """
  Lists all raw topics.

  Takes every topic name reported by `AltworxBook.Kafka.list_topics/0` and drops the
  ones whose name matches `^N[0-9]+_rt_.*`; the remaining names are returned.
  """
  @spec list :: [String.t()]
  def list do
    # Topic names matching this pattern are filtered out of the result.
    excluded_pattern = ~r(^N[0-9]+_rt_.*)
    all_topics = Kafka.list_topics()

    Enum.reject(all_topics, &Regex.match?(excluded_pattern, &1))
  end
end