Skip to main content

lib/container/kafka_container.ex

defmodule TestcontainerEx.KafkaContainer do
  @moduledoc """
  Provides functionality for creating and managing Kafka container configurations.

  This implementation uses the official `apache/kafka` Docker image which runs in KRaft mode
  by default (no Zookeeper required). This makes Kafka deployment significantly simpler.

  ## Example

      config = KafkaContainer.new()
      {:ok, container} = TestcontainerEx.start_container(config)

      # Get the bootstrap server address
      bootstrap_servers = KafkaContainer.bootstrap_servers(container)

  ## With automatic topic creation

      config =
        KafkaContainer.new()
        |> KafkaContainer.with_topics(["my-topic", "other-topic"])

      {:ok, container} = TestcontainerEx.start_container(config)

  ## Note on Port Binding

  This implementation uses a randomly selected fixed host port (between 29000-29999) for
  the Kafka listener. This is necessary because the apache/kafka image requires knowing
  the advertised listener address at startup time, before the container's dynamic port
  mapping is known.

  If you need to use a specific port, you can set it with `with_kafka_port/2`.
  """

  alias TestcontainerEx.Container.Builder
  alias TestcontainerEx.Container.Config
  alias TestcontainerEx.Engine
  alias TestcontainerEx.KafkaContainer
  alias TestcontainerEx.LogWaitStrategy

  use TestcontainerEx.ContainerConfig

  @default_image "apache/kafka"
  @default_tag "3.9.0"
  @default_image_with_tag "#{@default_image}:#{@default_tag}"
  @default_internal_kafka_port 9092
  @default_controller_port 9093
  @default_node_id 1
  @default_wait_timeout 60_000
  @default_cluster_id "4L6g3nShT-eMCtK--X86sw"

  @type t :: %__MODULE__{}

  @enforce_keys [
    :image,
    :kafka_port,
    :internal_kafka_port,
    :controller_port,
    :node_id,
    :cluster_id,
    :wait_timeout
  ]
  defstruct [
    :image,
    :kafka_port,
    :internal_kafka_port,
    :controller_port,
    :node_id,
    :cluster_id,
    :wait_timeout,
    name: nil,
    topics: [],
    check_image: @default_image,
    reuse: false
  ]

  @doc """
  Creates a new `KafkaContainer` struct with default configurations.

  A random port between 29000-29999 is selected for the Kafka listener.
  """
  def new do
    # Select a random port in a high range to minimize conflicts
    kafka_port = Enum.random(29_000..29_999)

    %__MODULE__{
      image: @default_image_with_tag,
      kafka_port: kafka_port,
      internal_kafka_port: @default_internal_kafka_port,
      controller_port: @default_controller_port,
      node_id: @default_node_id,
      cluster_id: @default_cluster_id,
      wait_timeout: @default_wait_timeout,
      topics: []
    }
  end

  @doc """
  Overrides the default image used for the Kafka container.
  """
  def with_image(%__MODULE__{} = config, image) when is_binary(image) do
    %{config | image: image}
  end

  @doc """
  Overrides the host port used for the Kafka container.

  This port will be used on the host machine and also as the advertised listener port.
  """
  def with_kafka_port(%__MODULE__{} = config, kafka_port) when is_integer(kafka_port) do
    %{config | kafka_port: kafka_port}
  end

  @doc """
  Overrides the default controller port used for the Kafka container.
  """
  def with_controller_port(%__MODULE__{} = config, controller_port)
      when is_integer(controller_port) do
    %{config | controller_port: controller_port}
  end

  @doc """
  Overrides the default node id used for the Kafka container.
  """
  def with_node_id(%__MODULE__{} = config, node_id) when is_integer(node_id) do
    %{config | node_id: node_id}
  end

  @doc """
  Overrides the default cluster id used for the Kafka container.
  """
  def with_cluster_id(%__MODULE__{} = config, cluster_id) when is_binary(cluster_id) do
    %{config | cluster_id: cluster_id}
  end

  @doc """
  Overrides the default wait timeout used for the Kafka container.
  """
  def with_wait_timeout(%__MODULE__{} = config, wait_timeout) when is_integer(wait_timeout) do
    %{config | wait_timeout: wait_timeout}
  end

  @doc """
  Sets the topics to be created automatically when the container starts.

  ## Example

      config =
        KafkaContainer.new()
        |> KafkaContainer.with_topics(["my-topic", "other-topic"])
  """
  def with_topics(%__MODULE__{} = config, topics) when is_list(topics) do
    %{config | topics: topics}
  end

  @doc """
  Sets the container name.
  """
  @spec with_name(t(), String.t()) :: t()
  def with_name(%__MODULE__{} = config, name) when is_binary(name) do
    %__MODULE__{config | name: name}
  end

  @doc """
  Returns the bootstrap servers string for connecting to the Kafka container.
  """
  def bootstrap_servers(%Config{} = container) do
    port = TestcontainerEx.get_port(container, @default_internal_kafka_port)
    "#{TestcontainerEx.get_host(container)}:#{port}"
  end

  @doc """
  Returns the port on the host machine where the Kafka container is listening.
  """
  def port(%Config{} = container),
    do: TestcontainerEx.get_port(container, @default_internal_kafka_port)

  defimpl Builder do
    @impl true
    @spec build(KafkaContainer.t()) :: Config.t()
    def build(%KafkaContainer{} = config) do
      host = TestcontainerEx.get_host()

      Config.new(config.image)
      |> Config.with_fixed_port(config.internal_kafka_port, config.kafka_port)
      |> with_kraft_config(config, host)
      |> Config.with_reuse(config.reuse)
      |> Config.with_waiting_strategy(
        LogWaitStrategy.new(
          ~r/Kafka Server started/,
          config.wait_timeout,
          1000
        )
      )
      |> then(fn cfg ->
        if config.name, do: Config.with_name(cfg, config.name), else: cfg
      end)
    end

    @doc """
    After the container starts, create any specified topics.
    """
    @impl true
    def after_start(config, container, conn) do
      # Create topics if specified
      Enum.each(config.topics, fn topic ->
        create_topic(container.container_id, conn, topic, config.internal_kafka_port)
      end)

      :ok
    end

    # KRaft mode environment configuration
    defp with_kraft_config(container, config, host) do
      container
      |> Config.with_environment(:KAFKA_NODE_ID, "#{config.node_id}")
      |> Config.with_environment(:KAFKA_PROCESS_ROLES, "broker,controller")
      |> Config.with_environment(:KAFKA_CONTROLLER_LISTENER_NAMES, "CONTROLLER")
      |> Config.with_environment(:KAFKA_INTER_BROKER_LISTENER_NAME, "PLAINTEXT")
      |> Config.with_environment(
        :KAFKA_LISTENERS,
        "PLAINTEXT://:#{config.internal_kafka_port},CONTROLLER://:#{config.controller_port}"
      )
      |> Config.with_environment(
        :KAFKA_LISTENER_SECURITY_PROTOCOL_MAP,
        "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      )
      |> Config.with_environment(
        :KAFKA_CONTROLLER_QUORUM_VOTERS,
        "#{config.node_id}@localhost:#{config.controller_port}"
      )
      |> Config.with_environment(
        :KAFKA_ADVERTISED_LISTENERS,
        "PLAINTEXT://#{host}:#{config.kafka_port}"
      )
      |> Config.with_environment(:KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR, "1")
      |> Config.with_environment(:KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR, "1")
      |> Config.with_environment(:KAFKA_TRANSACTION_STATE_LOG_MIN_ISR, "1")
      |> Config.with_environment(:KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS, "0")
    end

    defp create_topic(container_id, conn, topic, kafka_port) do
      cmd = [
        "/opt/kafka/bin/kafka-topics.sh",
        "--bootstrap-server",
        "localhost:#{kafka_port}",
        "--create",
        "--topic",
        topic,
        "--partitions",
        "1",
        "--replication-factor",
        "1",
        "--if-not-exists"
      ]

      result =
        case Engine.Api.start_exec(container_id, cmd, conn) do
          {:ok, exec_id} ->
            wait_for_exec(exec_id, conn)

          {:error, reason} ->
            {:error, reason}
        end

      # Wait for leader election to complete
      Process.sleep(2000)
      result
    end

    defp wait_for_exec(exec_id, conn) do
      case Engine.Api.inspect_exec(exec_id, conn) do
        {:ok, %{running: true}} ->
          Process.sleep(100)
          wait_for_exec(exec_id, conn)

        {:ok, %{running: false, exit_code: 0}} ->
          :ok

        {:ok, %{running: false, exit_code: code}} ->
          {:error, {:exec_failed, code}}

        {:error, reason} ->
          {:error, reason}
      end
    end
  end
end