Skip to main content

lib/quack_db/profile.ex

defmodule QuackDB.Profile.Operator do
  @moduledoc "A DuckDB profiling operator node."

  defstruct [
    :cpu_time,
    :cumulative_cardinality,
    :cumulative_rows_scanned,
    :extra_info,
    :operator_cardinality,
    :operator_name,
    :operator_rows_scanned,
    :operator_timing,
    :operator_type,
    :result_set_size,
    :system_peak_buffer_memory,
    :system_peak_temp_dir_size,
    children: []
  ]

  @type t :: %__MODULE__{
          operator_name: String.t() | nil,
          operator_type: String.t() | nil,
          operator_timing: number() | nil,
          cpu_time: number() | nil,
          operator_cardinality: non_neg_integer() | nil,
          operator_rows_scanned: non_neg_integer() | nil,
          cumulative_cardinality: non_neg_integer() | nil,
          cumulative_rows_scanned: non_neg_integer() | nil,
          result_set_size: non_neg_integer() | nil,
          extra_info: map(),
          children: [t()]
        }
end

defmodule QuackDB.Profile do
  @moduledoc """
  DuckDB query profiling helpers.

  `analyze/4` runs `EXPLAIN (ANALYZE, FORMAT json)` and decodes DuckDB's
  profile with Elixir's built-in `JSON.decode/3`. Known DuckDB profile keys are
  decoded with `String.to_existing_atom/1` and loaded into structs. DuckDB's
  open-ended `extra_info` maps keep their own keys.

  Use `flatten/1`, `slowest/2`, and `report/2` when you want a profiler-style
  operator view.
  """

  alias QuackDB.Profile.Operator

  defstruct [
    :all_optimizers,
    :attach_load_storage_latency,
    :attach_replay_wal_latency,
    :blocked_thread_time,
    :checkpoint_latency,
    :commit_local_storage_latency,
    :cpu_time,
    :cumulative_cardinality,
    :cumulative_optimizer_timing,
    :cumulative_rows_scanned,
    :extra_info,
    :latency,
    :physical_planner,
    :physical_planner_column_binding,
    :physical_planner_create_plan,
    :physical_planner_resolve_types,
    :planner,
    :planner_binding,
    :query_name,
    :result_set_size,
    :rows_returned,
    :system_peak_buffer_memory,
    :system_peak_temp_dir_size,
    :total_bytes_read,
    :total_bytes_written,
    :total_memory_allocated,
    :waiting_to_attach_latency,
    :wal_replay_entry_count,
    :write_to_wal_latency,
    children: [],
    optimizers: %{}
  ]

  @type t :: %__MODULE__{
          query_name: String.t() | nil,
          latency: number() | nil,
          cpu_time: number() | nil,
          rows_returned: non_neg_integer() | nil,
          result_set_size: non_neg_integer() | nil,
          cumulative_cardinality: non_neg_integer() | nil,
          cumulative_rows_scanned: non_neg_integer() | nil,
          children: [Operator.t()],
          optimizers: map()
        }

  @typedoc "A flattened DuckDB operator row with its tree path and timing share."
  @type operator_row :: %{
          path: [non_neg_integer()],
          name: String.t() | nil,
          type: String.t() | nil,
          timing: number() | nil,
          timing_percent: float() | nil,
          cpu_time: number() | nil,
          cardinality: non_neg_integer() | nil,
          rows_scanned: non_neg_integer() | nil,
          cumulative_cardinality: non_neg_integer() | nil,
          cumulative_rows_scanned: non_neg_integer() | nil,
          result_set_size: non_neg_integer() | nil,
          extra_info: map()
        }

  @doc "Runs `EXPLAIN (FORMAT json)` and returns a decoded DuckDB plan profile."
  @spec explain(DBConnection.conn() | module(), iodata() | term(), [term()], keyword()) ::
          {:ok, t()} | {:error, Exception.t()}
  def explain(connection, statement, params \\ [], options \\ []) do
    {statement, params, options} = normalize_arguments(statement, params, options)

    with {:ok, sql, params} <- profile_statement(connection, statement, params, analyze: false),
         {:ok, result} <- QuackDB.query(connection, sql, params, options),
         {:ok, profile} <- decode_result(result) do
      {:ok, build_profile(profile)}
    end
  end

  @doc "Runs `EXPLAIN (FORMAT json)` and returns a decoded DuckDB plan profile, raising on errors."
  @spec explain!(DBConnection.conn() | module(), iodata() | term(), [term()], keyword()) :: t()
  def explain!(connection, statement, params \\ [], options \\ []) do
    case explain(connection, statement, params, options) do
      {:ok, profile} -> profile
      {:error, error} -> raise error
    end
  end

  @doc "Runs `EXPLAIN (ANALYZE, FORMAT json)` and returns a decoded DuckDB query profile."
  @spec analyze(DBConnection.conn() | module(), iodata() | term(), [term()], keyword()) ::
          {:ok, t()} | {:error, Exception.t()}
  def analyze(connection, statement, params \\ [], options \\ []) do
    {statement, params, options} = normalize_arguments(statement, params, options)

    with {:ok, sql, params} <- profile_statement(connection, statement, params, analyze: true),
         {:ok, result} <- QuackDB.query(connection, sql, params, options),
         {:ok, profile} <- decode_result(result) do
      {:ok, build_profile(profile)}
    end
  end

  @doc "Runs `EXPLAIN (ANALYZE, FORMAT json)` and returns a decoded DuckDB query profile, raising on errors."
  @spec analyze!(DBConnection.conn() | module(), iodata() | term(), [term()], keyword()) :: t()
  def analyze!(connection, statement, params \\ [], options \\ []) do
    case analyze(connection, statement, params, options) do
      {:ok, profile} -> profile
      {:error, error} -> raise error
    end
  end

  @doc "Converts DuckDB's profile tree into profiler-style operator rows."
  @spec flatten(t()) :: [operator_row()]
  def flatten(%__MODULE__{} = profile) do
    total = timing_total(profile)

    profile.children
    |> Enum.with_index()
    |> Enum.flat_map(fn {operator, index} -> flatten_operator(operator, [index], total) end)
  end

  @doc "Returns the slowest operators by `operator_timing`."
  @spec slowest(t(), pos_integer()) :: [operator_row()]
  def slowest(%__MODULE__{} = profile, limit \\ 10) when is_integer(limit) and limit > 0 do
    profile
    |> flatten()
    |> Enum.sort_by(&(&1.timing || 0), :desc)
    |> Enum.take(limit)
  end

  @doc "Formats a compact text report for humans."
  @spec report(t(), keyword()) :: String.t()
  def report(%__MODULE__{} = profile, options \\ []) do
    limit = Keyword.get(options, :limit, 10)
    rows = slowest(profile, limit)

    [
      "DuckDB query profile\n\n",
      metric_line("Latency", seconds(profile.latency)),
      metric_line("CPU time", seconds(profile.cpu_time)),
      metric_line("Rows scanned", integer(profile.cumulative_rows_scanned)),
      metric_line("Rows returned", integer(profile.rows_returned)),
      metric_line("Peak memory", bytes(profile.system_peak_buffer_memory)),
      "\n",
      "% time   time      rows      scanned   operator\n",
      Enum.map(rows, &operator_line/1)
    ]
    |> IO.iodata_to_binary()
  end

  defp build_profile(%{} = decoded) do
    profile_fields = Map.keys(%__MODULE__{}) -- [:__struct__, :children, :optimizers]

    decoded
    |> Map.take(profile_fields)
    |> Map.put(:children, Enum.map(Map.get(decoded, :children, []), &build_operator/1))
    |> Map.put(:optimizers, optimizer_metrics(decoded))
    |> then(&struct!(__MODULE__, &1))
  end

  defp build_operator(%{} = decoded) do
    operator_fields = Map.keys(%Operator{}) -- [:__struct__, :children]

    decoded
    |> Map.take(operator_fields)
    |> Map.put(:children, Enum.map(Map.get(decoded, :children, []), &build_operator/1))
    |> then(&struct!(Operator, &1))
  end

  defp normalize_arguments(statement, params, []) when is_list(params) do
    if Keyword.keyword?(params) do
      {statement, [], params}
    else
      {statement, params, []}
    end
  end

  defp normalize_arguments(statement, params, options), do: {statement, params, options}

  defp profile_statement(connection, statement, params, options) do
    case ecto_query_statement(connection, statement) do
      {:ok, sql, ecto_params} ->
        {:ok, QuackDB.SQL.explain(sql, Keyword.put(options, :format, :json)), ecto_params}

      :error ->
        {:ok, QuackDB.SQL.explain(statement, Keyword.put(options, :format, :json)), params}
    end
  end

  defp ecto_query_statement(repo, queryable) when is_atom(repo) and not is_binary(queryable) do
    ecto_sql = Module.concat([Ecto, Adapters, SQL])

    if Code.ensure_loaded?(ecto_sql) and function_exported?(repo, :__adapter__, 0) do
      case apply(ecto_sql, :to_sql, [:all, repo, queryable]) do
        {sql, params} -> {:ok, sql, params}
        _other -> :error
      end
    else
      :error
    end
  rescue
    _error in [ArgumentError, FunctionClauseError, RuntimeError, UndefinedFunctionError] -> :error
  end

  defp ecto_query_statement(_connection, _statement), do: :error

  defp decode_result(%QuackDB.Result{columns: columns, rows: [row | _]}) when is_list(row) do
    case explain_json_value(columns || [], row) do
      value when is_binary(value) -> decode_json(value)
      _value -> {:error, %QuackDB.Error{message: "DuckDB profile output did not include JSON"}}
    end
  end

  defp decode_result(%QuackDB.Result{}) do
    {:error, %QuackDB.Error{message: "DuckDB profile output did not include JSON"}}
  end

  defp explain_json_value(columns, row) do
    case Enum.find_index(columns, &(&1 == "explain_value")) do
      nil -> fallback_explain_value(row)
      index -> Enum.at(row, index)
    end
  end

  defp fallback_explain_value([value]), do: value
  defp fallback_explain_value([_key, value | _rest]), do: value
  defp fallback_explain_value([value | _rest]), do: value
  defp fallback_explain_value([]), do: nil

  defp decode_json(json) do
    case JSON.decode(json, [], profile_decoders()) do
      {[profile], _acc, ""} when is_map(profile) ->
        {:ok, profile}

      {profile, _acc, ""} when is_map(profile) ->
        {:ok, profile}

      {other, _acc, ""} ->
        {:error,
         %QuackDB.Error{message: "expected DuckDB profile JSON object, got: #{inspect(other)}"}}

      {:error, error} ->
        {:error,
         %QuackDB.Error{message: "could not decode DuckDB profile JSON: #{inspect(error)}"}}
    end
  end

  defp profile_decoders do
    [
      object_push: fn key, value, acc -> [{existing_profile_key(key), value} | acc] end,
      object_finish: fn acc, old_acc -> {Map.new(acc), old_acc} end
    ]
  end

  defp existing_profile_key("optimizer_" <> _ = key), do: key

  defp existing_profile_key(key) do
    String.to_existing_atom(key)
  rescue
    ArgumentError -> key
  end

  defp optimizer_metrics(decoded) do
    decoded
    |> Enum.filter(fn {key, value} ->
      is_binary(key) and String.starts_with?(key, "optimizer_") and is_number(value)
    end)
    |> Map.new(fn {key, value} -> {String.replace_prefix(key, "optimizer_", ""), value} end)
  end

  defp timing_total(%__MODULE__{cpu_time: value}) when is_number(value) and value > 0, do: value

  defp timing_total(%__MODULE__{} = profile) do
    total = Enum.reduce(profile.children, 0, &(&2 + total_operator_timing(&1)))
    if total > 0, do: total, else: nil
  end

  defp total_operator_timing(%Operator{} = operator) do
    (operator.operator_timing || 0) +
      Enum.reduce(operator.children, 0, &(&2 + total_operator_timing(&1)))
  end

  defp flatten_operator(%Operator{} = operator, path, total) do
    row = %{
      path: Enum.reverse(path),
      name: operator.operator_name,
      type: operator.operator_type,
      timing: operator.operator_timing,
      timing_percent: timing_percent(operator.operator_timing, total),
      cpu_time: operator.cpu_time,
      cardinality: operator.operator_cardinality,
      rows_scanned: operator.operator_rows_scanned,
      cumulative_cardinality: operator.cumulative_cardinality,
      cumulative_rows_scanned: operator.cumulative_rows_scanned,
      result_set_size: operator.result_set_size,
      extra_info: operator.extra_info || %{}
    }

    child_rows =
      operator.children
      |> Enum.with_index()
      |> Enum.flat_map(fn {child, index} -> flatten_operator(child, [index | path], total) end)

    [row | child_rows]
  end

  defp timing_percent(timing, total) when is_number(timing) and is_number(total) and total > 0 do
    Float.round(timing / total * 100, 1)
  end

  defp timing_percent(_timing, _total), do: nil

  defp metric_line(label, value), do: [String.pad_trailing(label <> ":", 16), value, "\n"]

  defp operator_line(row) do
    [
      String.pad_leading(format_percent(row.timing_percent), 6),
      "   ",
      String.pad_leading(seconds(row.timing), 8),
      "  ",
      String.pad_leading(integer(row.cardinality), 8),
      "  ",
      String.pad_leading(integer(row.rows_scanned), 8),
      "   ",
      row.name || row.type || "?",
      "\n"
    ]
  end

  defp format_percent(nil), do: "-"
  defp format_percent(value), do: :io_lib.format("~.1f", [value]) |> IO.iodata_to_binary()

  defp seconds(nil), do: "-"

  defp seconds(value) when is_number(value) do
    :io_lib.format("~.3fms", [value * 1000]) |> IO.iodata_to_binary()
  end

  defp integer(nil), do: "-"

  defp integer(value) when is_integer(value),
    do: value |> Integer.to_string() |> add_integer_separators()

  defp integer(value), do: to_string(value)

  defp bytes(nil), do: "-"
  defp bytes(value) when is_integer(value) and value < 1024, do: "#{value} B"

  defp bytes(value) when is_integer(value) and value < 1_048_576 do
    :io_lib.format("~.1f KB", [value / 1024]) |> IO.iodata_to_binary()
  end

  defp bytes(value) when is_integer(value) do
    :io_lib.format("~.1f MB", [value / 1_048_576]) |> IO.iodata_to_binary()
  end

  defp add_integer_separators(value) do
    value
    |> String.reverse()
    |> String.graphemes()
    |> Enum.chunk_every(3)
    |> Enum.map_join(",", &Enum.join/1)
    |> String.reverse()
  end
end