defmodule QuackDB do
  @moduledoc """
  Remote DuckDB Quack protocol client.

  The public API is backed by `DBConnection` so it can grow into an Ecto adapter
  without changing the lower-level protocol codec.
  """

  alias QuackDB.Query
  alias QuackDB.Stream

  @type start_option ::
          {:uri, String.t()}
          | {:token, String.t()}
          | {:name, GenServer.name()}
          | {:connect_timeout, timeout()}
          | {:receive_timeout, timeout()}
          | {:shutdown_timeout, timeout()}
          | {:mint_options, keyword()}

  @type insert_row :: map() | Keyword.t()
  @type insert_column :: {atom() | String.t(), [term()]}

  @typedoc "Native append column type specs."
  @type append_type :: QuackDB.Type.spec()

  @doc "Starts a QuackDB connection. Delegates to `QuackDB.DBConnection.start_link/1`."
  @spec start_link([start_option]) :: GenServer.on_start()
  def start_link(options), do: QuackDB.DBConnection.start_link(options)

  @doc "Returns a supervisor child spec. Delegates to `QuackDB.DBConnection.child_spec/1`."
  @spec child_spec([start_option]) :: Supervisor.child_spec()
  def child_spec(options), do: QuackDB.DBConnection.child_spec(options)

  @doc """
  Appends row-oriented values to a DuckDB table through Quack's native append protocol.

  Rows are maps or keywords. Pass `:columns` with type specs when values are empty,
  contain only nils, or need a specific nested DuckDB type.

  Plain Elixir maps infer as DuckDB `STRUCT` values. For explicit
  `{:map, key_type, value_type}` columns, ordinary Elixir maps are encoded as
  DuckDB `MAP` values:

      QuackDB.insert_rows!(conn, "events", [[labels: %{env: "prod"}]],
        columns: [labels: {:map, :varchar, :varchar}]
      )

  DuckDB-style key/value entries are also accepted for explicit MAP columns:

      QuackDB.insert_rows!(conn, "events", [[labels: [%{key: "env", value: "prod"}]]],
        columns: [labels: {:map, :varchar, :varchar}]
      )

  Duplicate MAP keys decode with the later entry winning, matching `Map.put/3`.
  Keys and values are encoded through the declared DuckDB key/value types; for
  example, atom keys in `{:map, :varchar, :varchar}` columns become strings.
  """
  @spec insert_rows(DBConnection.conn(), String.t() | atom(), [insert_row()], Keyword.t()) ::
          {:ok, QuackDB.Result.t()} | {:error, Exception.t()}
  def insert_rows(connection, table, rows, options \\ []) when is_list(rows) do
    query = %Query{
      statement: "APPEND #{table}",
      operation: {:insert_rows, table, rows, options}
    }

    run_query(connection, query, [], options)
  end

  @doc "Same as `insert_rows/4`, but returns the result directly and raises on error."
  @spec insert_rows!(DBConnection.conn(), String.t() | atom(), [insert_row()], Keyword.t()) ::
          QuackDB.Result.t()
  def insert_rows!(connection, table, rows, options \\ []) do
    connection |> insert_rows(table, rows, options) |> unwrap!()
  end

  @doc """
  Appends column-oriented values to a DuckDB table through Quack's native append protocol.

  Column values are provided as `{name, values}` pairs. All columns must have the
  same row count. Pass `:columns` with type specs when values are empty, contain
  only nils, or need a specific nested DuckDB type. Explicit MAP columns accept
  ordinary Elixir maps and DuckDB-style key/value entries.
  """
  @spec insert_columns(DBConnection.conn(), String.t() | atom(), [insert_column()], Keyword.t()) ::
          {:ok, QuackDB.Result.t()} | {:error, Exception.t()}
  def insert_columns(connection, table, columns, options \\ []) when is_list(columns) do
    query = %Query{
      statement: "APPEND #{table}",
      operation: {:insert_columns, table, columns, options}
    }

    run_query(connection, query, [], options)
  end

  @doc "Same as `insert_columns/4`, but returns the result directly and raises on error."
  @spec insert_columns!(DBConnection.conn(), String.t() | atom(), [insert_column()], Keyword.t()) ::
          QuackDB.Result.t()
  def insert_columns!(connection, table, columns, options \\ []) do
    connection |> insert_columns(table, columns, options) |> unwrap!()
  end

  @doc """
  Appends an enumerable of row maps/keywords in batches through native append.

  The enumerable is consumed lazily in chunks of `:chunk_every` rows (falling back
  to `:batch_size`, default `1000`). Each chunk is sent with `insert_rows/4`.
  Insertion stops at the first failing chunk and returns its error. Chunks
  already appended before the failure are not rolled back by this function.

  On success, returns the last chunk's result with `num_rows` replaced by the
  total across all chunks. An empty enumerable performs no append and returns
  `{:ok, nil}`.

  Raises `ArgumentError` when the chunk size is not a positive integer.
  """
  @spec insert_stream(DBConnection.conn(), String.t() | atom(), Enumerable.t(), Keyword.t()) ::
          {:ok, QuackDB.Result.t() | nil} | {:error, Exception.t()}
  def insert_stream(connection, table, rows, options \\ []) do
    chunk_every = Keyword.get(options, :chunk_every, Keyword.get(options, :batch_size, 1000))

    if not (is_integer(chunk_every) and chunk_every > 0) do
      raise ArgumentError,
            "expected :chunk_every to be a positive integer, got: #{inspect(chunk_every)}"
    end

    rows
    |> Elixir.Stream.chunk_every(chunk_every)
    |> Enum.reduce_while({:ok, nil, 0}, fn batch, {:ok, _last_result, total_rows} ->
      case insert_rows(connection, table, batch, options) do
        {:ok, result} -> {:cont, {:ok, result, total_rows + result.num_rows}}
        {:error, _error} = error -> {:halt, error}
      end
    end)
    |> case do
      {:ok, nil, _total_rows} -> {:ok, nil}
      {:ok, result, total_rows} -> {:ok, %{result | num_rows: total_rows}}
      {:error, _error} = error -> error
    end
  end

  @doc """
  Same as `insert_stream/4`, but returns the result directly and raises on error.

  Returns `nil` for an empty enumerable.
  """
  @spec insert_stream!(DBConnection.conn(), String.t() | atom(), Enumerable.t(), Keyword.t()) ::
          QuackDB.Result.t() | nil
  def insert_stream!(connection, table, rows, options \\ []) do
    connection |> insert_stream(table, rows, options) |> unwrap!()
  end

  if Code.ensure_loaded?(Table.Reader) do
    @doc """
    Appends any `Table.Reader` compatible tabular data through native append.

    Columns are read with `Table.to_columns/1`, each column is materialized into a
    list, and the result is passed to `insert_columns/4`.
    """
    @spec insert_table(DBConnection.conn(), String.t() | atom(), term(), Keyword.t()) ::
            {:ok, QuackDB.Result.t()} | {:error, Exception.t()}
    def insert_table(connection, table, tabular, options \\ []) do
      columns =
        tabular
        |> Table.to_columns()
        |> Enum.map(fn {name, values} -> {name, Enum.to_list(values)} end)

      insert_columns(connection, table, columns, options)
    end

    @doc "Same as `insert_table/4`, but returns the result directly and raises on error."
    @spec insert_table!(DBConnection.conn(), String.t() | atom(), term(), Keyword.t()) ::
            QuackDB.Result.t()
    def insert_table!(connection, table, tabular, options \\ []) do
      connection |> insert_table(table, tabular, options) |> unwrap!()
    end
  end

  @doc """
  Prepares and executes `statement` with `params`, returning the full result.

  `connection` may also be an Ecto repo that uses `Ecto.Adapters.QuackDB`. In that
  case the query runs on a connection checked out from the repo.
  """
  @spec query(DBConnection.conn(), iodata(), [term()], Keyword.t()) ::
          {:ok, QuackDB.Result.t()} | {:error, Exception.t()}
  def query(connection, statement, params \\ [], options \\ []) do
    run_query(connection, %Query{statement: statement}, params, options)
  end

  @doc "Same as `query/4`, but returns the result directly and raises on error."
  @spec query!(DBConnection.conn(), iodata(), [term()], Keyword.t()) :: QuackDB.Result.t()
  def query!(connection, statement, params \\ [], options \\ []) do
    connection |> query(statement, params, options) |> unwrap!()
  end

  @doc """
  Runs a query and returns its result as a column-oriented map.

  Duplicate column names are disambiguated with suffixes such as `_2` and `_3`.
  Prefer `columnar/4` when you also need column order and result metadata.
  """
  @spec columns(DBConnection.conn(), iodata(), [term()], Keyword.t()) ::
          {:ok, %{String.t() => [term()]}} | {:error, Exception.t()}
  def columns(connection, statement, params \\ [], options \\ []) do
    case query(connection, statement, params, options) do
      {:ok, result} -> {:ok, QuackDB.Result.to_columns(result)}
      {:error, _error} = error -> error
    end
  end

  @doc "Same as `columns/4`, but returns the map directly and raises on error."
  @spec columns!(DBConnection.conn(), iodata(), [term()], Keyword.t()) ::
          %{String.t() => [term()]}
  def columns!(connection, statement, params \\ [], options \\ []) do
    connection |> columns(statement, params, options) |> unwrap!()
  end

  @doc """
  Runs a query and returns a `QuackDB.Columns` struct.

  This preserves column order, original names, row count, and result metadata in
  addition to the column vectors.
  """
  @spec columnar(DBConnection.conn(), iodata(), [term()], Keyword.t()) ::
          {:ok, QuackDB.Columns.t()} | {:error, Exception.t()}
  def columnar(connection, statement, params \\ [], options \\ []) do
    case query(connection, statement, params, options) do
      {:ok, result} -> {:ok, QuackDB.Result.to_columnar(result)}
      {:error, _error} = error -> error
    end
  end

  @doc "Same as `columnar/4`, but returns the struct directly and raises on error."
  @spec columnar!(DBConnection.conn(), iodata(), [term()], Keyword.t()) :: QuackDB.Columns.t()
  def columnar!(connection, statement, params \\ [], options \\ []) do
    connection |> columnar(statement, params, options) |> unwrap!()
  end

  @doc """
  Streams query results as column-oriented batches.

  Each item is a map from disambiguated column names to the values in that fetch
  batch. Batches with no columns are skipped. This keeps large analytical results
  vector-shaped without materializing the whole result set. Prefer
  `columnar_batches/4` when you also need batch metadata.
  """
  @spec column_batches(DBConnection.conn(), iodata(), [term()], Keyword.t()) :: Enumerable.t()
  def column_batches(connection, statement, params \\ [], options \\ []) do
    connection
    |> columnar_batches(statement, params, options)
    |> Elixir.Stream.map(& &1.columns)
    |> Elixir.Stream.reject(&(&1 == %{}))
  end

  @doc """
  Streams query results as `QuackDB.Columns` batches.

  This forces `result_format: :columnar` so large analytical results can stay
  vector-shaped instead of being materialized as row lists first. Batches with no
  column names are skipped.
  """
  @spec columnar_batches(DBConnection.conn(), iodata(), [term()], Keyword.t()) :: Enumerable.t()
  def columnar_batches(connection, statement, params \\ [], options \\ []) do
    options = Keyword.put(options, :result_format, :columnar)

    connection
    |> stream(statement, params, options)
    |> Elixir.Stream.map(&QuackDB.Result.to_columnar/1)
    |> Elixir.Stream.reject(&(&1.names == []))
  end

  @doc "Checks the connection by running `SELECT 1`. Returns `:ok` or the query error."
  @spec ping(DBConnection.conn(), Keyword.t()) :: :ok | {:error, Exception.t()}
  def ping(connection, options \\ []) do
    case query(connection, "SELECT 1", [], options) do
      {:ok, _result} -> :ok
      {:error, _error} = error -> error
    end
  end

  @doc "Prepares `statement` without executing it. Returns the prepared `QuackDB.Query`."
  @spec prepare(DBConnection.conn(), iodata(), Keyword.t()) ::
          {:ok, Query.t()} | {:error, Exception.t()}
  def prepare(connection, statement, options \\ []) do
    DBConnection.prepare(connection, %Query{statement: statement}, options)
  end

  @doc """
  Prepares and executes `statement`, returning both the prepared query and the result.

  Unlike `query/4`, `connection` is handed to `DBConnection` as-is. Ecto repos are
  not resolved.
  """
  @spec prepare_execute(DBConnection.conn(), iodata(), [term()], Keyword.t()) ::
          {:ok, Query.t(), QuackDB.Result.t()} | {:error, Exception.t()}
  def prepare_execute(connection, statement, params \\ [], options \\ []) do
    DBConnection.prepare_execute(connection, %Query{statement: statement}, params, options)
  end

  @doc """
  Builds a `QuackDB.Stream` over `statement`'s results.

  Building the struct does not execute the query. Enumerating it yields result
  batches.
  """
  @spec stream(DBConnection.conn(), iodata(), [term()], Keyword.t()) :: Stream.t()
  def stream(connection, statement, params \\ [], options \\ []) do
    %Stream{
      conn: connection,
      query: %Query{statement: statement},
      params: params,
      options: options
    }
  end

  @doc "Streams individual result rows, flattening batches. Batches with `nil` rows are skipped."
  @spec rows(DBConnection.conn(), iodata(), [term()], Keyword.t()) :: Enumerable.t()
  def rows(connection, statement, params \\ [], options \\ []) do
    connection
    |> stream(statement, params, options)
    |> Elixir.Stream.flat_map(&(&1.rows || []))
  end

  @doc "Streams result rows as maps keyed by disambiguated column names."
  @spec maps(DBConnection.conn(), iodata(), [term()], Keyword.t()) :: Enumerable.t()
  def maps(connection, statement, params \\ [], options \\ []) do
    connection
    |> stream(statement, params, options)
    |> Elixir.Stream.flat_map(&result_maps/1)
  end

  # Resolves `connection` (see with_connection/3) and runs `query` through
  # DBConnection.prepare_execute/4. Only the result is kept; the prepared query is dropped.
  defp run_query(connection, %Query{} = query, params, options) do
    with_connection(connection, options, fn conn ->
      case DBConnection.prepare_execute(conn, query, params, options) do
        {:ok, _query, result} -> {:ok, result}
        {:error, _error} = error -> error
      end
    end)
  end

  # Shared body of the bang variants: unwrap `{:ok, value}` or raise the exception.
  defp unwrap!({:ok, value}), do: value
  defp unwrap!({:error, error}), do: raise(error)

  # Invokes `fun` with a connection DBConnection can use.
  #
  # Atoms that name a module exporting `__adapter__/0` are treated as Ecto repos.
  # Such a repo must use Ecto.Adapters.QuackDB, otherwise ArgumentError is raised.
  # When Ecto is available, `fun` runs inside `repo.checkout/2` with the checked-out
  # connection. Any other atom, and every non-atom connection, is passed through
  # unchanged.
  defp with_connection(connection, options, fun) when is_atom(connection) do
    cond do
      not ecto_repo?(connection) ->
        fun.(connection)

      # Ecto is an optional dependency; without these modules no checkout can be resolved.
      not Code.ensure_loaded?(Ecto.Repo.Registry) or
          not Code.ensure_loaded?(Ecto.Adapters.SQL) ->
        fun.(connection)

      connection.__adapter__() != Ecto.Adapters.QuackDB ->
        raise ArgumentError,
              "expected a QuackDB connection or Ecto.Adapters.QuackDB repo, got: #{inspect(connection)}"

      true ->
        connection.checkout(
          fn ->
            connection
            |> ecto_adapter_meta()
            |> checked_out_ecto_connection()
            |> fun.()
          end,
          options
        )
    end
  end

  defp with_connection(connection, _options, fun), do: fun.(connection)

  # function_exported?/3 does not load the module. Load it first so a repo module
  # that has not been loaded yet is not mistaken for a plain registered name.
  defp ecto_repo?(module) do
    Code.ensure_loaded?(module) and function_exported?(module, :__adapter__, 0)
  end

  # Looks up the adapter metadata for the repo's current dynamic repo.
  # apply/3 keeps the compiler from warning when Ecto is not a dependency.
  defp ecto_adapter_meta(repo) do
    apply(Ecto.Repo.Registry, :lookup, [repo.get_dynamic_repo()])
  end

  # Reads the connection stored under `{Ecto.Adapters.SQL, pool}` in the process
  # dictionary. Presumably Ecto.Adapters.SQL sets this during `checkout/2`
  # (TODO confirm against the Ecto SQL version in use).
  defp checked_out_ecto_connection(%{pid: pool}) do
    case Process.get({Ecto.Adapters.SQL, pool}) do
      nil -> raise ArgumentError, "Ecto repo checkout did not provide a QuackDB connection"
      :undefined -> raise ArgumentError, "Ecto repo checkout did not provide a QuackDB connection"
      connection -> connection
    end
  end

  # Zips each row with the disambiguated column names. Results without
  # column/row lists yield no maps.
  defp result_maps(%QuackDB.Result{columns: columns, rows: rows})
       when is_list(columns) and is_list(rows) do
    map_keys = QuackDB.Result.disambiguate_columns(columns)
    Enum.map(rows, fn row -> map_keys |> Enum.zip(row) |> Map.new() end)
  end

  defp result_maps(_result), do: []
end