lib/elsa/topic.ex

defmodule Elsa.Topic do
  @moduledoc """
  Provides functions for managing and interacting with topics in the Kafka cluster.
  """
  import Elsa.Util, only: [with_connection: 3, reformat_endpoints: 1]
  import Record, only: [defrecord: 2, extract: 2]

  alias Elsa.Util

  defrecord :kpro_rsp, extract(:kpro_rsp, from_lib: "kafka_protocol/include/kpro.hrl")

  @doc """
  Returns a list of all topics managed by the cluster as tuple of topic name and
  number of partitions.
  """
  @spec list(keyword) :: {:ok, [{String.t(), integer}]} | {:error, term}
  def list(endpoints) do
    {:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), :all)

    topics =
      metadata.topics
      |> Enum.map(fn topic_metadata ->
        {topic_metadata.name, Enum.count(topic_metadata.partitions)}
      end)

    {:ok, topics}
  rescue
    error -> {:error, error}
  end

  @doc """
  Confirms or denies the existence of a topic managed by the cluster.
  """
  @spec exists?(keyword(), String.t()) :: boolean()
  def exists?(endpoints, topic) do
    with {:ok, topics} <- list(endpoints) do
      Enum.any?(topics, fn {t, _} -> t == topic end)
    end
  end

  @doc """
  Creates the supplied topic within the cluster. Sets the number of desired
  partitions and replication factor for the topic based on the optional
  keyword list. If the optional configs are not specified by the caller, the
  number of partitions and replicas defaults to 1.
  """
  @spec create(keyword(), String.t(), keyword()) :: :ok | {:error, term()}
  def create(endpoints, topic, opts \\ []) do
    with_connection(endpoints, :controller, fn connection ->
      config =
        opts
        |> Keyword.get(:config, [])
        |> Enum.map(fn {key, val} -> %{name: to_string(key), value: val} end)

      create_topic_args = %{
        name: topic,
        num_partitions: Keyword.get(opts, :partitions, 1),
        replication_factor: Keyword.get(opts, :replicas, 1),
        assignments: [],
        configs: config
      }

      version = Util.get_api_version(connection, :create_topics)
      topic_request = :kpro_req_lib.create_topics(version, [create_topic_args], %{timeout: 5_000})

      send_request(connection, topic_request, 5_000)
    end)
  end

  @doc """
  Deletes the supplied topic from the cluster.
  """
  @spec delete(keyword(), String.t()) :: :ok | {:error, term()}
  def delete(endpoints, topic) do
    with_connection(endpoints, :controller, fn connection ->
      version = Util.get_api_version(connection, :delete_topics)
      topic_request = :kpro_req_lib.delete_topics(version, [topic], %{timeout: 5_000})

      send_request(connection, topic_request, 5_000)
    end)
  end

  defp send_request(connection, request, timeout) do
    case :kpro.request_sync(connection, request, timeout) do
      {:ok, response} -> check_response(response)
      result -> result
    end
  end

  defp check_response(response) do
    message = kpro_rsp(response, :msg)

    response_key =
      case Map.has_key?(message, :topics) do
        true -> :topics
        false -> :responses
      end

    case Enum.find(message[response_key], fn response -> response.error_code != :no_error end) do
      nil -> :ok
      response -> {:error, {response.error_code, resp_error_msg(response, response_key)}}
    end
  end

  defp resp_error_msg(response, :topics), do: response.error_message
  defp resp_error_msg(_response, :responses), do: :delete_topic_error
end