Skip to main content

lib/wait_strategy/log_wait_strategy.ex

# SPDX-License-Identifier: MIT
defmodule TestcontainerEx.LogWaitStrategy do
  @moduledoc """
  Considers the container as ready as soon as a specific log message is detected in the container's log stream.
  """
  require Logger

  @retry_delay 200
  defstruct [:log_regex, :timeout, retry_delay: @retry_delay]

  # Public interface

  @doc """
  Builds a `LogWaitStrategy` struct.

  The resulting strategy treats a container as ready once its log output matches
  `log_regex`.

    * `log_regex` - regular expression tested against the container's log output.
    * `timeout` - stored in the `:timeout` field; compared against elapsed
      milliseconds while waiting. Defaults to `5000`.
    * `retry_delay` - stored in the `:retry_delay` field; milliseconds slept between
      polls. Defaults to the module's `@retry_delay`.
  """
  def new(log_regex, timeout \\ 5000, retry_delay \\ @retry_delay) do
    struct!(__MODULE__, log_regex: log_regex, timeout: timeout, retry_delay: retry_delay)
  end

  # Private functions and implementations

  defimpl TestcontainerEx.WaitStrategy do
    alias TestcontainerEx.Engine

    # Protocol callback: records the monotonic start time and begins waiting for
    # the container's logs to match the strategy's regex.
    @impl true
    def wait_until_container_is_ready(wait_strategy, container, conn) do
      started_at_ms = get_current_time_millis()
      container_id = container.container_id

      wait_for_log_message(wait_strategy, container_id, conn, started_at_ms)
    end

    # Entry point of the wait: when the timeout has already been exceeded it returns
    # `{:error, error, wait_strategy}`; otherwise it hands off to `process_log/4`.
    defp wait_for_log_message(wait_strategy, container_id, conn, start_time) do
      case reached_timeout?(start_time, wait_strategy.timeout) do
        true ->
          failure = fail_with_logs(wait_strategy, container_id, conn, start_time)
          {:error, failure, wait_strategy}

        false ->
          process_log(wait_strategy, container_id, conn, start_time)
      end
    end

    # Starts polling with the attempt counter at zero.
    defp process_log(wait_strategy, container_id, conn, start_time),
      do: do_process_log(wait_strategy, container_id, conn, start_time, 0)

    # Polls the container's stdout once and decides what happens next.
    #
    # Every poll emits a `[:testcontainer_ex, :wait_strategy, :poll]` telemetry event
    # carrying the 1-based attempt number, the elapsed milliseconds and the poll
    # result (`:ok` or `{:error, :log_not_matched}`).
    #
    # Returns `:ok` as soon as the logs match `wait_strategy.log_regex`. On a miss
    # the timeout is re-checked before sleeping, so the loop ends with
    # `{:error, error, wait_strategy}` once `wait_strategy.timeout` is exceeded
    # instead of retrying forever.
    defp do_process_log(wait_strategy, container_id, conn, start_time, attempt) do
      matched = log_matches?(container_id, wait_strategy.log_regex, conn)
      elapsed_ms = get_current_time_millis() - start_time
      result = if matched, do: :ok, else: {:error, :log_not_matched}

      :telemetry.execute(
        [:testcontainer_ex, :wait_strategy, :poll],
        %{attempt: attempt + 1, elapsed_ms: elapsed_ms},
        %{strategy: :log_wait, container_id: container_id, result: result}
      )

      cond do
        matched ->
          :ok

        # Without this check the timeout would only be tested before the first poll.
        reached_timeout?(start_time, wait_strategy.timeout) ->
          error = fail_with_logs(wait_strategy, container_id, conn, start_time)
          {:error, error, wait_strategy}

        true ->
          log_retry_message(container_id, wait_strategy.log_regex, wait_strategy.retry_delay)
          :timer.sleep(wait_strategy.retry_delay)
          do_process_log(wait_strategy, container_id, conn, start_time, attempt + 1)
      end
    end

    # Fetches the container's stdout and tests it against `log_regex`.
    # Any reply other than `{:ok, output}` counts as "no match".
    defp log_matches?(container_id, log_regex, conn) do
      with {:ok, log_output} <- Engine.Api.stdout_logs(container_id, conn) do
        Regex.match?(log_regex, log_output)
      else
        _ -> false
      end
    catch
      # If the log stream crashes (e.g. hackney chunked encoding error),
      # report "not matched" so the caller simply retries.
      :error, _ -> false
    end

    # Current reading of the monotonic clock, in milliseconds.
    defp get_current_time_millis do
      System.monotonic_time(:millisecond)
    end

    # True once strictly more than `timeout` milliseconds have elapsed since
    # `start_time` (a monotonic millisecond reading).
    defp reached_timeout?(start_time, timeout) do
      elapsed_ms = get_current_time_millis() - start_time
      elapsed_ms > timeout
    end

    # Builds the error value returned when waiting fails.
    #
    # Computes the elapsed milliseconds since `start_time`, collects up to the last
    # 50 non-empty stdout lines of the container, and passes both to
    # `TestcontainerEx.Error.wait_strategy_failed/3`.
    #
    # Collecting the logs is best-effort: a non-`{:ok, _}` reply or a raised error
    # from the log stream yields an empty list, so the caller still receives the
    # structured failure instead of an unrelated exception.
    defp fail_with_logs(_wait_strategy, container_id, conn, start_time) do
      elapsed_ms = get_current_time_millis() - start_time

      last_logs =
        try do
          case Engine.Api.stdout_logs(container_id, conn) do
            {:ok, log_output} ->
              log_output
              |> String.split("\n", trim: true)
              |> Enum.take(-50)

            _ ->
              []
          end
        catch
          # Mirrors log_matches?/3: the log stream itself may crash
          # (e.g. hackney chunked encoding error).
          :error, _ -> []
        end

      TestcontainerEx.Error.wait_strategy_failed(:log_wait, elapsed_ms, last_logs)
    end

    # Writes a debug-level note that the logs have not matched yet and how long the
    # loop will wait before the next poll.
    defp log_retry_message(container_id, log_regex, delay) do
      Logger.debug(fn ->
        "Logs in container #{container_id} didn't match regex #{inspect(log_regex)}, retrying in #{delay}ms."
      end)
    end
  end
end