# lib/quack_db/source.ex

defmodule QuackDB.Source do
  @moduledoc """
  Safe SQL fragment builders for DuckDB table-producing data sources.

  DuckDB can query files, object stores, and lakehouse tables directly through
  table functions such as `read_parquet/2`, `read_csv/2`, `read_json/2`,
  `read_xlsx/2`, `delta_scan/2`, and `iceberg_scan/2`. This module builds those
  fragments with QuackDB's SQL literal formatting so callers do not need to
  manually interpolate paths or options.

  These helpers return SQL fragments, not executable queries. Use them in raw SQL
  that is sent to `QuackDB.query/4` or `Ecto.Adapters.SQL.query/4`:

      source = QuackDB.Source.parquet("s3://bucket/events/*.parquet", hive_partitioning: true)
      QuackDB.query!(conn, ["SELECT count(*) FROM ", source])

  Options are emitted as DuckDB named parameters:

      QuackDB.Source.csv("events.csv", header: true, columns: %{id: "INTEGER", name: "VARCHAR"})

  Plain Elixir maps are formatted as DuckDB struct literals, which is the shape
  used by options such as `columns`. Use `{:map, map}` when a DuckDB `MAP {...}`
  literal is required.
  """

  alias QuackDB.Error
  alias QuackDB.SQL

  @type path_or_paths :: String.t() | [String.t()]
  @type option_value ::
          SQL.parameter()
          | atom()
          | %{optional(atom() | String.t() | integer()) => option_value()}
          | {:struct, keyword(option_value()) | map()}
          | {:map, map()}

  @doc """
  Returns a `read_parquet(...)` table function fragment.

  Delegates to `table_function/3`, passing `path_or_paths` as the first
  argument of the DuckDB function and `options` as named arguments.
  """
  @spec parquet(path_or_paths(), keyword(option_value())) :: String.t()
  def parquet(path_or_paths, options \\ []) do
    table_function("read_parquet", path_or_paths, options)
  end

  @doc """
  Returns a `read_csv(...)` table function fragment.

  Delegates to `table_function/3`, passing `path_or_paths` as the first
  argument of the DuckDB function and `options` as named arguments.
  """
  @spec csv(path_or_paths(), keyword(option_value())) :: String.t()
  def csv(path_or_paths, options \\ []) do
    table_function("read_csv", path_or_paths, options)
  end

  @doc """
  Returns a `read_json(...)` table function fragment.

  Delegates to `table_function/3`, passing `path_or_paths` as the first
  argument of the DuckDB function and `options` as named arguments.
  """
  @spec json(path_or_paths(), keyword(option_value())) :: String.t()
  def json(path_or_paths, options \\ []) do
    table_function("read_json", path_or_paths, options)
  end

  @doc """
  Returns a `read_xlsx(...)` table function fragment.

  Unlike the other readers in this module, only a single binary `path` is
  accepted; any other value fails the function guard. `options` are passed to
  `table_function/3` as named arguments.
  """
  @spec xlsx(String.t(), keyword(option_value())) :: String.t()
  def xlsx(path, options \\ []) when is_binary(path) do
    table_function("read_xlsx", path, options)
  end

  @doc """
  Returns a `delta_scan(...)` table function fragment.

  Delegates to `table_function/3`, passing `path_or_paths` as the first
  argument of the DuckDB function and `options` as named arguments.
  """
  @spec delta(path_or_paths(), keyword(option_value())) :: String.t()
  def delta(path_or_paths, options \\ []) do
    table_function("delta_scan", path_or_paths, options)
  end

  @doc """
  Returns an `iceberg_scan(...)` table function fragment.

  Delegates to `table_function/3`, passing `path_or_paths` as the first
  argument of the DuckDB function and `options` as named arguments.
  """
  @spec iceberg(path_or_paths(), keyword(option_value())) :: String.t()
  def iceberg(path_or_paths, options \\ []) do
    table_function("iceberg_scan", path_or_paths, options)
  end

  @doc """
  Builds a `histogram_values(...)` table function fragment.

  `source` is inserted into the fragment verbatim, so it must already be
  trusted SQL. `column` may be an atom or a string and must be a valid
  identifier. `options` are appended as `name := value` named arguments.

  Raises `ArgumentError` when `column` is neither an atom nor a string, or when
  `column` or an option name is not a valid identifier. Option values that
  cannot be formatted raise the `QuackDB.Error` returned by the SQL formatter.
  """
  @spec histogram_values(String.t(), String.t() | atom(), keyword(option_value())) :: String.t()
  def histogram_values(source, column, options \\ [])
      when is_binary(source) and is_list(options) do
    column = column_name(column)

    ["histogram_values(", source, ", ", column, options(:duckdb_named, options), ")"]
    |> IO.iodata_to_binary()
  end

  # Normalizes and validates the column argument on its own (rather than through
  # the option-name path) so that failures are reported as column errors
  # instead of misleading "option" errors.
  defp column_name(column) when is_atom(column),
    do: column |> Atom.to_string() |> column_name()

  defp column_name(column) when is_binary(column) do
    validate_identifier!(column, :column)
    column
  end

  defp column_name(column) do
    raise ArgumentError,
          "expected DuckDB column name to be an atom or string, got: #{inspect(column)}"
  end

  @doc """
  Builds a DuckDB table-function fragment for a validated function name.

  The fragment has the shape `function_name(path, name = value, ...)`.
  `path_or_paths` is formatted as a SQL literal and each entry of `options` is
  appended as a `name = value` argument.

  Raises `ArgumentError` when `function_name` or an option name is not a valid
  identifier.
  """
  @spec table_function(String.t(), path_or_paths(), keyword(option_value())) :: String.t()
  def table_function(function_name, path_or_paths, options \\ [])
      when is_binary(function_name) and is_list(options) do
    validate_identifier!(function_name, :function)

    path_literal = literal!(path_or_paths)
    named_arguments = options(:equals, options)

    IO.iodata_to_binary([function_name, "(", path_literal, named_arguments, ")"])
  end

  @doc """
  Wraps a source in a DuckDB `USING SAMPLE` subquery.

  `source` is inserted verbatim after `FROM`. `options` must contain either
  `:rows` or `:percent`; `:rows` wins when both are present. Raises
  `ArgumentError` when neither is given.
  """
  @spec sample(String.t(), keyword()) :: String.t()
  def sample(source, options) when is_binary(source) and is_list(options) do
    clause = sample_clause(options)

    IO.iodata_to_binary(["(SELECT * FROM ", source, " USING SAMPLE ", clause, ")"])
  end

  @doc """
  Returns true when a value looks like a QuackDB source table-function fragment.

  A binary qualifies when it starts with `"(SELECT "` or when the text before
  its first `(` is one of the recognized table function names. Non-binary
  values always return `false`.
  """
  @spec source?(term()) :: boolean()
  def source?("(SELECT " <> _rest), do: true
  def source?(value) when is_binary(value), do: source_table_function?(value)
  def source?(_value), do: false

  # Reports whether the text before the first "(" in `value` is one of the
  # table function names treated as a source. `table_function_name/1` returns
  # nil when no name can be extracted, which is never a member of the list.
  defp source_table_function?(value) do
    table_function_name(value) in [
      "read_parquet",
      "read_csv",
      "read_json",
      "read_xlsx",
      "delta_scan",
      "iceberg_scan",
      "histogram_values",
      "generate_series"
    ]
  end

  # Builds the iodata that follows `USING SAMPLE`. A truthy `:rows` option takes
  # precedence over `:percent`; when neither is truthy an ArgumentError is raised.
  defp sample_clause(options) do
    rows = options[:rows]
    percent = options[:percent]

    cond do
      rows -> [literal!(rows), " ROWS"]
      percent -> [literal!(percent), " PERCENT"]
      true -> raise ArgumentError, "expected :rows or :percent sample option"
    end
  end

  # Renders every `{name, value}` option as iodata of the form
  # ", name<separator>value". An empty option list renders as an empty list.
  defp options(style, options), do: Enum.map(options, &option_fragment(style, &1))

  # Renders one option. The name is validated before the value is formatted.
  defp option_fragment(style, {name, value}) do
    [", ", option_name(name), option_separator(style), literal!(value)]
  end

  # Maps an option style to the text placed between an option name and its
  # value. Only the two known styles are accepted.
  defp option_separator(style) when style in [:equals, :duckdb_named] do
    if style == :equals, do: " = ", else: " := "
  end

  # Normalizes an option name to a validated string. Atoms are converted to
  # their string form first; anything that is neither an atom nor a string
  # raises ArgumentError.
  defp option_name(name) when is_binary(name) do
    validate_identifier!(name, :option)
    name
  end

  defp option_name(name) when is_atom(name), do: option_name(Atom.to_string(name))

  defp option_name(name) do
    raise ArgumentError,
          "expected DuckDB option name to be an atom or string, got: #{inspect(name)}"
  end

  # Formats a value as DuckDB SQL literal iodata, raising on failure.
  #
  #   * `{:struct, keyword_or_map}` and plain maps become `{'key': value}` literals.
  #   * `{:map, map}` becomes a `MAP {'key': value}` literal.
  #   * Atoms other than `true`, `false` and `nil` are formatted as their name.
  #   * Everything else is delegated to `SQL.literal/1`; the `%Error{}` it
  #     returns is raised.
  defp literal!({:struct, values}) when is_list(values) or is_map(values),
    do: struct_literal(values)

  defp literal!({:map, values}) when is_map(values), do: ["MAP ", map_entries(values)]

  # Elixir structs (for example `%Date{}`) satisfy `is_map/1` but are not
  # enumerable, so treating them as plain maps would crash with
  # `Protocol.UndefinedError`. They are excluded here and fall through to
  # `SQL.literal/1`, which decides whether the value can be formatted.
  defp literal!(values) when is_map(values) and not is_struct(values),
    do: struct_literal(values)

  defp literal!(value) when is_atom(value) and not is_boolean(value) and not is_nil(value),
    do: literal!(Atom.to_string(value))

  defp literal!(value) do
    case SQL.literal(value) do
      {:ok, literal} -> literal
      {:error, %Error{} = error} -> raise error
    end
  end

  # A struct literal is the brace-delimited entry list with no prefix.
  defp struct_literal(values) do
    map_entries(values)
  end

  # Renders key/value pairs as `{key: value, key: value}` iodata. Keys go
  # through `literal_key/1` and values through `literal!/1`.
  defp map_entries(values) do
    pairs =
      Enum.map_intersperse(values, ", ", fn {key, value} ->
        [literal_key(key), ": ", literal!(value)]
      end)

    ["{", pairs, "}"]
  end

  # Formats a struct/map key. Strings are single-quoted with embedded quotes
  # doubled, atoms are formatted as their string name, and integers are
  # emitted bare. Any other key type raises ArgumentError.
  defp literal_key(key) when is_binary(key) do
    escaped = String.replace(key, "'", "''")
    ["'", escaped, "'"]
  end

  defp literal_key(key) when is_integer(key), do: Integer.to_string(key)
  defp literal_key(key) when is_atom(key), do: literal_key(Atom.to_string(key))

  defp literal_key(key) do
    raise ArgumentError, "unsupported DuckDB literal key: #{inspect(key)}"
  end

  # Returns the text before the first "(" in `value`, or nil when there is no
  # "(" or when it is the very first character.
  defp table_function_name(value) do
    case String.split(value, "(", parts: 2) do
      [name, _arguments] when name != "" -> name
      _no_name -> nil
    end
  end

  # Returns :ok when `value` passes `QuackDB.Identifier.valid?/1`; otherwise
  # raises ArgumentError naming the `kind` of identifier that was rejected.
  defp validate_identifier!(value, kind) do
    if QuackDB.Identifier.valid?(value) do
      :ok
    else
      raise ArgumentError, "invalid DuckDB #{kind} identifier: #{inspect(value)}"
    end
  end
end