Skip to main content

lib/quack_db/result.ex

defmodule QuackDB.Result do
  @moduledoc """
  Normalized query result.

  The shape mirrors what `Ecto.Adapters.SQL` expects from DBConnection-backed
  drivers: `rows` and `num_rows` are always present, while `columns`,
  `connection_id`, `messages`, and `metadata` keep Quack-specific result
  information available.
  """

  @type command ::
          :select
          | :insert
          | :update
          | :delete
          | :create
          | :drop
          | :alter
          | :begin
          | :commit
          | :rollback
          | atom()

  @type t :: %__MODULE__{
          command: command() | nil,
          columns: [String.t()] | nil,
          rows: [[term()]] | nil,
          num_rows: non_neg_integer(),
          connection_id: String.t() | nil,
          messages: [map()] | nil,
          metadata: map()
        }

  defstruct command: nil,
            columns: nil,
            rows: nil,
            num_rows: 0,
            connection_id: nil,
            messages: nil,
            metadata: %{}

  @affecting_commands [:insert, :update, :delete]
  @schema_commands [:create, :drop, :alter]

  @spec normalize(t()) :: t()
  def normalize(%__MODULE__{command: command, columns: ["Count"], rows: [[count]]} = result)
      when command in @affecting_commands and is_integer(count) and count >= 0 do
    %{result | columns: nil, rows: nil, num_rows: count, metadata: raw_count_metadata(result)}
  end

  def normalize(%__MODULE__{command: command, columns: ["Count"], rows: []} = result)
      when command in @schema_commands do
    %{result | columns: nil, rows: nil, num_rows: 0, metadata: raw_count_metadata(result)}
  end

  def normalize(%__MODULE__{} = result), do: result

  @doc """
  Converts a row-oriented result into a column-oriented map.

  Duplicate column names are disambiguated with suffixes such as `_2` and `_3`,
  matching `QuackDB.maps/4`.
  """
  @spec to_columns(t()) :: %{String.t() => [term()]}
  def to_columns(%__MODULE__{} = result), do: result |> to_columnar() |> Map.fetch!(:columns)

  @doc """
  Converts a row-oriented result into a `QuackDB.Columns` struct.
  """
  @spec to_columnar(t()) :: QuackDB.Columns.t()
  def to_columnar(%__MODULE__{metadata: %{columnar_chunks: chunks}, columns: columns} = result)
      when is_list(columns) do
    keys = disambiguate_columns(columns)
    column_values = columnar_chunk_values(chunks, keys)

    %QuackDB.Columns{
      names: keys,
      original_names: result.columns,
      columns: column_values,
      num_rows: result.num_rows,
      command: result.command,
      connection_id: result.connection_id,
      messages: result.messages,
      metadata: Map.delete(result.metadata, :columnar_chunks)
    }
  end

  def to_columnar(%__MODULE__{columns: columns, rows: rows} = result)
      when is_list(columns) and is_list(rows) do
    keys = disambiguate_columns(columns)
    initial = Map.new(keys, &{&1, []})

    columns =
      rows
      |> Enum.reduce(initial, fn row, acc ->
        keys
        |> Enum.zip(row)
        |> Enum.reduce(acc, fn {key, value}, acc -> Map.update!(acc, key, &[value | &1]) end)
      end)
      |> Map.new(fn {key, values} -> {key, Enum.reverse(values)} end)

    %QuackDB.Columns{
      names: keys,
      original_names: result.columns,
      columns: columns,
      num_rows: result.num_rows,
      command: result.command,
      connection_id: result.connection_id,
      messages: result.messages,
      metadata: result.metadata
    }
  end

  def to_columnar(%__MODULE__{} = result) do
    %QuackDB.Columns{
      command: result.command,
      connection_id: result.connection_id,
      messages: result.messages,
      metadata: result.metadata
    }
  end

  @doc false
  @spec disambiguate_columns([String.t()]) :: [String.t()]
  def disambiguate_columns(columns) do
    {columns, _counts} =
      Enum.map_reduce(columns, %{}, fn column, counts ->
        counts = Map.update(counts, column, 1, &(&1 + 1))

        case counts[column] do
          1 -> {column, counts}
          count -> {"#{column}_#{count}", counts}
        end
      end)

    columns
  end

  defp columnar_chunk_values([chunk], keys) do
    keys
    |> Enum.zip(chunk.columns)
    |> Map.new(fn {key, column} -> {key, column.values} end)
  end

  defp columnar_chunk_values(chunks, keys) do
    initial = Map.new(keys, &{&1, []})

    chunks
    |> Enum.reduce(initial, fn chunk, acc ->
      keys
      |> Enum.zip(chunk.columns)
      |> Enum.reduce(acc, fn {key, column}, acc ->
        Map.update!(acc, key, &[column.values | &1])
      end)
    end)
    |> Map.new(fn {key, value_groups} ->
      {key, value_groups |> Enum.reverse() |> List.flatten()}
    end)
  end

  defp raw_count_metadata(result) do
    result.metadata
    |> Map.put(:duckdb_columns, result.columns)
    |> Map.put(:duckdb_rows, result.rows)
  end
end

defimpl Inspect, for: QuackDB.Result do
  import Inspect.Algebra

  alias QuackDB.Inspect, as: QuackInspect

  def inspect(result, opts) do
    rows_count = QuackInspect.rows_summary(result.rows)
    preview = QuackInspect.rows_preview(result.rows)
    needs_more_fetch? = result.metadata[:needs_more_fetch]

    fields = [
      command: result.command,
      columns: result.columns,
      rows: rows_count,
      preview: preview,
      connection_id: QuackInspect.short_id(result.connection_id),
      needs_more_fetch?: needs_more_fetch?
    ]

    concat(QuackInspect.container("QuackDB.Result", fields, opts))
  end
end

if Code.ensure_loaded?(Table.Reader) do
  defimpl Table.Reader, for: QuackDB.Result do
    def init(%{columns: columns}) when columns in [nil, []], do: :none

    def init(%{rows: rows} = result) do
      columns = QuackDB.Result.disambiguate_columns(result.columns)
      {:rows, %{columns: columns, count: result.num_rows}, rows || []}
    end
  end
end