defmodule QuackDB.DBConnection do
  @moduledoc """
  `DBConnection` implementation for the remote DuckDB Quack protocol.

  This module owns the connection lifecycle, request execution, cursor-based streaming,
  transaction callbacks, and result normalization. HTTP transport and binary protocol
  encoding are delegated to other modules so the codec stays independent from
  `DBConnection`.
  """

  use DBConnection

  alias QuackDB.Error
  alias QuackDB.Protocol.Codec
  alias QuackDB.Protocol.DataChunk
  alias QuackDB.Protocol.Message.AppendRequest
  alias QuackDB.Protocol.Message.ConnectionRequest
  alias QuackDB.Protocol.Message.ConnectionResponse
  alias QuackDB.Protocol.Message.Disconnect
  alias QuackDB.Protocol.Message.ErrorResponse
  alias QuackDB.Protocol.Message.FetchRequest
  alias QuackDB.Protocol.Message.FetchResponse
  alias QuackDB.Protocol.Message.PrepareRequest
  alias QuackDB.Protocol.Message.PrepareResponse
  alias QuackDB.Query
  alias QuackDB.Result
  alias QuackDB.Telemetry

  # Timeout used by the best-effort cleanup steps: it is passed as the `:timeout` option of
  # the Disconnect request and as the GenServer.stop/3 timeout (milliseconds) when stopping
  # the transport process.
  @disconnect_timeout 1_000

  # Leading SQL keywords that map directly onto `Result.command` atoms.
  @sql_command_atoms ~w(
    alter analyze attach begin call checkpoint commit copy create delete detach drop explain insert
    install load pragma rollback select set summarize truncate update use vacuum
  )a
  @sql_command_map Map.new(@sql_command_atoms, &{Atom.to_string(&1), &1})
  # Statements starting with FROM (DuckDB's FROM-first syntax) or VALUES are reported as
  # :select.
  @sql_command_aliases %{"from" => :select, "values" => :select}

  defstruct [
    :uri,
    :token,
    :connection_id,
    :server,
    # 3-arity function called as `transport.(uri, request, options)`.
    :transport,
    # Pid of the QuackDB.Transport process started by this module; nil when the caller
    # supplied its own `:transport` function.
    :transport_owner,
    :transport_options,
    :client_version,
    :telemetry_prefix,
    next_query_id: 1,
    status: :idle,
    # Cursor ref => per-cursor buffer (`mode`, `queued_rows`/`queued_chunks`, `done?`).
    cursors: %{}
  ]

  @type state :: %__MODULE__{
          uri: URI.t(),
          token: String.t(),
          connection_id: String.t() | nil,
          server: ConnectionResponse.t() | nil,
          transport: function(),
          transport_owner: pid() | nil,
          transport_options: keyword(),
          client_version: String.t(),
          telemetry_prefix: [atom()],
          next_query_id: pos_integer(),
          status: DBConnection.status(),
          cursors: map()
        }

  @doc """
  Starts a connection process (or pool) via `DBConnection.start_link/2`.

  Besides the standard `DBConnection` options, the following are read when connecting:

    * `:uri` - server URI, defaults to `"http://localhost:9494"`
    * `:token` - auth string sent in the connection request, defaults to `""`
    * `:transport` - optional 3-arity function `(uri, request, options)`; when omitted a
      `QuackDB.Transport` process is started for each connection
    * `:connect_timeout`, `:receive_timeout`, `:shutdown_timeout`, `:mint_options` -
      forwarded to `QuackDB.Transport.start_link/2`
    * `:client_version` - reported client version, defaults to `"quackdb/<app vsn>"`
    * `:telemetry_prefix` - prefix for telemetry events, defaults to
      `QuackDB.Telemetry.default_prefix/0`
  """
  @spec start_link(Keyword.t()) :: GenServer.on_start()
  def start_link(options) do
    DBConnection.start_link(__MODULE__, options)
  end

  @doc """
  Returns a child specification built by `DBConnection.child_spec/2`.

  Accepts the same options as `start_link/1`.
  """
  @spec child_spec(Keyword.t()) :: Supervisor.child_spec()
  def child_spec(options) do
    DBConnection.child_spec(__MODULE__, options)
  end

  ## DBConnection callbacks

  # Builds the state from `options`, starts the transport if needed and performs the Quack
  # handshake. Returns `{:ok, state}` or `{:error, error}`.
  @impl true
  def connect(options) do
    with {:ok, state} <- build_state(options) do
      connect_quack(state)
    end
  end

  @impl true
  def checkout(state), do: {:ok, state}

  # Liveness check: runs `SELECT 1`. Any failure asks DBConnection to disconnect.
  @impl true
  def ping(state) do
    case execute_statement(%Query{statement: "SELECT 1"}, [], [], state) do
      {:ok, _query, _result, state} -> {:ok, state}
      {:error, error, state} -> {:disconnect, error, state}
    end
  end

  # Plain SQL queries get their statement flattened from iodata to a binary. Append
  # queries (with an `operation`) are passed through untouched.
  @impl true
  def handle_prepare(%Query{operation: nil} = query, _options, state) do
    {:ok, %{query | statement: IO.iodata_to_binary(query.statement)}, state}
  end

  def handle_prepare(%Query{} = query, _options, state), do: {:ok, query, state}

  # Append operations do not take params. Their per-query insert options are merged
  # underneath the per-call `options`, so the call options win on key conflicts.
  @impl true
  def handle_execute(
        %Query{operation: {:insert_rows, table, rows, insert_options}} = query,
        [],
        options,
        state
      ) do
    append_rows(query, table, rows, Keyword.merge(insert_options, options), state)
  end

  def handle_execute(
        %Query{operation: {:insert_columns, table, columns, insert_options}} = query,
        [],
        options,
        state
      ) do
    append_columns(query, table, columns, Keyword.merge(insert_options, options), state)
  end

  def handle_execute(
        %Query{operation: {kind, _table, _values, _insert_options}},
        _params,
        _options,
        state
      )
      when kind in [:insert_rows, :insert_columns] do
    {:error,
     Error.new(:unsupported_params, "append queries do not accept params", source: :client),
     state}
  end

  def handle_execute(%Query{} = query, params, options, state) do
    execute_statement(query, params, options, state)
  end

  # Nothing is held server-side per prepared query, so closing is a local no-op.
  @impl true
  def handle_close(_query, _options, state) do
    {:ok, empty_result(:close), state}
  end

  # BEGIN is only sent from :idle. Otherwise the current status is reported back to
  # DBConnection unchanged.
  @impl true
  def handle_begin(_options, %{status: :idle} = state) do
    transaction_statement("BEGIN", :begin, :transaction, state)
  end

  def handle_begin(_options, state), do: {state.status, state}

  @impl true
  def handle_commit(_options, %{status: :transaction} = state) do
    transaction_statement("COMMIT", :commit, :idle, state)
  end

  def handle_commit(_options, state), do: {state.status, state}

  # ROLLBACK is allowed both from an open transaction and from a failed one (:error).
  @impl true
  def handle_rollback(_options, %{status: status} = state)
      when status in [:transaction, :error] do
    transaction_statement("ROLLBACK", :rollback, :idle, state)
  end

  def handle_rollback(_options, state), do: {state.status, state}

  @impl true
  def handle_status(_options, state), do: {state.status, state}

  @impl true
  def handle_declare(%Query{} = query, params, options, state) do
    declare_query(query, params, options, state)
  end

  # Serves rows (or columnar chunks) from the cursor's local buffer, fetching more from
  # the server when the buffer is empty and the result is not exhausted.
  @impl true
  def handle_fetch(_query, %QuackDB.Cursor{} = cursor, options, state) do
    case Map.fetch!(state.cursors, cursor.ref) do
      %{mode: :columnar} = cursor_state -> handle_fetch_columnar(cursor_state, cursor, options, state)
      cursor_state -> handle_fetch_rows(cursor_state, cursor, options, state)
    end
  end

  # Drops the local cursor buffer. No message is sent to the server.
  @impl true
  def handle_deallocate(_query, %QuackDB.Cursor{} = cursor, _options, state) do
    {:ok, cursor_result(cursor, []), %{state | cursors: Map.delete(state.cursors, cursor.ref)}}
  end

  # Best-effort teardown. It sends Disconnect when a server connection id exists and always
  # stops a transport process this module started, even when no connection id was assigned.
  # Exits from an already-dead or slow transport are ignored so cleanup never crashes.
  @impl true
  def disconnect(_error, state) do
    send_disconnect(state)
    stop_transport(state)
  end

  ## Connection lifecycle

  # Builds the initial state. Fails only when the `:uri` option cannot be normalized.
  defp build_state(options) do
    uri = Keyword.get(options, :uri, "http://localhost:9494")

    with {:ok, uri} <- QuackDB.URI.normalize(uri) do
      {:ok,
       %__MODULE__{
         uri: uri,
         token: Keyword.get(options, :token, ""),
         transport: Keyword.get(options, :transport),
         transport_owner: nil,
         transport_options: transport_options(options),
         client_version: Keyword.get(options, :client_version, client_version()),
         telemetry_prefix: Keyword.get(options, :telemetry_prefix, Telemetry.default_prefix())
       }}
    end
  end

  # Starts the transport and performs the handshake. If the handshake fails, connect/1
  # returns an error without a state, so disconnect/2 never runs for it. The transport is
  # therefore stopped here; otherwise each retried connect would leave one more transport
  # process behind.
  defp connect_quack(state) do
    with {:ok, state} <- start_transport(state) do
      case do_connect_quack(state) do
        {:ok, state} ->
          {:ok, state}

        error ->
          _ = stop_transport(state)
          error
      end
    end
  end

  # Sends the ConnectionRequest (auth token, client version, platform) and records the
  # connection id and server info from the response.
  defp do_connect_quack(state) do
    request =
      Codec.encode(%ConnectionRequest{
        auth_string: state.token,
        client_duckdb_version: state.client_version,
        client_platform: client_platform()
      })

    with {:ok, response} <- state.transport.(state.uri, request, []),
         {:ok, decoded} <- Codec.decode(response) do
      normalize_connect_response(decoded, state)
    end
  end

  # Starts a QuackDB.Transport process when no custom transport was supplied. A supplied
  # 3-arity function is used as-is.
  defp start_transport(%{transport: nil, uri: uri} = state) do
    case QuackDB.Transport.start_link(uri, state.transport_options) do
      {:ok, owner} ->
        {:ok,
         %{state | transport: &QuackDB.Transport.post(owner, &1, &2, &3), transport_owner: owner}}

      {:error, reason} ->
        {:error, Error.new(:transport_error, inspect(reason), source: :transport)}
    end
  end

  defp start_transport(%{transport: transport} = state) when is_function(transport, 3),
    do: {:ok, state}

  # Stops the transport process started by start_transport/1, if any. Exits (already dead,
  # or not stopping within @disconnect_timeout) are ignored. Always returns :ok.
  defp stop_transport(%{transport_owner: nil}), do: :ok

  defp stop_transport(%{transport_owner: owner}) do
    GenServer.stop(owner, :normal, @disconnect_timeout)
  catch
    :exit, _reason -> :ok
  end

  # Sends the Disconnect message when a connection id is known. The reply is ignored, and
  # an exit from the transport is swallowed because this is only called during teardown.
  defp send_disconnect(%{connection_id: nil}), do: :ok

  defp send_disconnect(state) do
    request = Codec.encode(%Disconnect{}, connection_id: state.connection_id)
    _ignored = state.transport.(state.uri, request, timeout: @disconnect_timeout)
    :ok
  catch
    :exit, _reason -> :ok
  end

  defp transport_options(options) do
    Keyword.take(options, [:connect_timeout, :receive_timeout, :shutdown_timeout, :mint_options])
  end

  defp normalize_connect_response({header, %ConnectionResponse{} = response}, state) do
    {:ok, %{state | connection_id: header.connection_id, server: response}}
  end

  defp normalize_connect_response({_header, %ErrorResponse{message: message}}, _state) do
    {:error, Error.new(:server_error, message, source: :server)}
  end

  defp normalize_connect_response({header, _body}, _state) do
    {:error,
     Error.new(:unexpected_message, "expected connection response, got #{header.type}",
       source: :protocol
     )}
  end

  ## Query execution

  # Interpolates params into the statement client-side, then runs it. Formatting errors are
  # returned as-is and do not change the transaction status.
  defp execute_statement(%Query{} = query, params, options, state) do
    case QuackDB.SQL.format(query.statement, params) do
      {:ok, statement} -> do_execute_statement(query, statement, params, options, state)
      {:error, error} -> {:error, error, state}
    end
  end

  # Sends a PrepareRequest inside a `:query` telemetry span and fully materializes the
  # result, fetching any remaining pages.
  defp do_execute_statement(%Query{} = query, statement, params, options, state) do
    {query_id, state} = next_query_id(state)
    options = Keyword.put(options, :client_query_id, query_id)

    Telemetry.span(
      state.telemetry_prefix,
      :query,
      query_metadata(query, params, options, state),
      fn ->
        {encode_duration, request} =
          timed(fn ->
            Codec.encode(%PrepareRequest{sql_query: statement},
              connection_id: state.connection_id,
              client_query_id: query_id
            )
          end)

        metrics = query_metrics(encode_duration, request)
        {result, metrics} = execute_query_request(query, request, options, state, metrics)
        {result, query_stop_metadata(result, metrics)}
      end
    )
  end

  defp execute_query_request(query, request, options, state, metrics) do
    {transport_duration, transport_result} =
      timed(fn -> state.transport.(state.uri, request, options) end)

    case transport_result do
      {:ok, response} ->
        metrics = put_transport_metrics(metrics, transport_duration, response)
        decode_query_response(query, response, options, state, metrics)

      {:error, error} ->
        {query_failed(error, query, state),
         Map.put(metrics, :transport_duration, transport_duration)}
    end
  end

  defp decode_query_response(query, response, options, state, metrics) do
    {decode_duration, decode_result} = timed(fn -> Codec.decode(response) end)
    metrics = Map.put(metrics, :decode_duration, decode_duration)

    case decode_result do
      {:ok, decoded} -> normalize_query_result(query, decoded, options, state, metrics)
      {:error, error} -> {query_failed(error, query, state), metrics}
    end
  end

  defp normalize_query_result(query, decoded, options, state, metrics) do
    {normalize_duration, normalize_result} =
      timed(fn -> normalize_query_response(decoded, query, state, options) end)

    metrics = Map.put(metrics, :normalize_duration, normalize_duration)

    case normalize_result do
      {:ok, query, result} ->
        {{:ok, query, result, %{state | status: successful_status(state.status)}}, metrics}

      {:error, error} ->
        {query_failed(error, query, state), metrics}
    end
  end

  defp normalize_query_response(
         {_header, %ErrorResponse{message: message}},
         _query,
         _state,
         _options
       ) do
    {:error, Error.new(:server_error, message, source: :server)}
  end

  defp normalize_query_response({_header, %PrepareResponse{} = response}, query, state, options) do
    with {:ok, chunks} <- fetch_remaining_chunks(response, state, options) do
      rows = materialize_rows(response.results ++ chunks, response.result_names)

      query = %{
        query
        | columns: response.result_names,
          result_types: response.result_types,
          result_uuid: response.result_uuid
      }

      result =
        Result.normalize(%Result{
          command: command(query.statement),
          columns: response.result_names,
          rows: rows,
          num_rows: length(rows),
          connection_id: state.connection_id,
          messages: [],
          metadata: %{
            needs_more_fetch: response.needs_more_fetch,
            result_uuid: response.result_uuid
          }
        })

      {:ok, query, result}
    end
  end

  defp normalize_query_response({header, _body}, _query, _state, _options) do
    {:error,
     Error.new(:unexpected_message, "expected prepare response, got #{header.type}",
       source: :protocol
     )}
  end

  defp fetch_remaining_chunks(%PrepareResponse{needs_more_fetch: false}, _state, _options),
    do: {:ok, []}

  defp fetch_remaining_chunks(%PrepareResponse{} = response, state, options) do
    fetch_chunks(response.result_uuid, state, options, [])
  end

  # Requests result pages for `result_uuid` until the server answers with an empty
  # FetchResponse, and returns all chunks in server order. Each round trip gets its own
  # `:fetch` telemetry span, so spans do not nest and every span's metrics and `chunks`
  # count describe a single request.
  defp fetch_chunks(result_uuid, state, options, chunks) do
    case fetch_page(result_uuid, state, options) do
      {:ok, []} -> {:ok, Enum.reverse(chunks)}
      {:ok, page} -> fetch_chunks(result_uuid, state, options, Enum.reverse(page, chunks))
      {:error, _error} = error -> error
    end
  end

  # One FetchRequest round trip wrapped in a `:fetch` telemetry span.
  defp fetch_page(result_uuid, state, options) do
    Telemetry.span(
      state.telemetry_prefix,
      :fetch,
      fetch_metadata(result_uuid, options, state),
      fn ->
        {encode_duration, request} =
          timed(fn ->
            Codec.encode(%FetchRequest{uuid: result_uuid}, connection_id: state.connection_id)
          end)

        metrics = query_metrics(encode_duration, request)
        {result, metrics} = execute_fetch_request(request, options, state, metrics)
        {result, fetch_stop_metadata(result, metrics)}
      end
    )
  end

  defp execute_fetch_request(request, options, state, metrics) do
    {transport_duration, transport_result} =
      timed(fn -> state.transport.(state.uri, request, options) end)

    case transport_result do
      {:ok, response} ->
        metrics = put_transport_metrics(metrics, transport_duration, response)
        decode_fetch_response(response, metrics)

      {:error, error} ->
        {{:error, error}, Map.put(metrics, :transport_duration, transport_duration)}
    end
  end

  defp decode_fetch_response(response, metrics) do
    {decode_duration, decode_result} = timed(fn -> Codec.decode(response) end)
    metrics = Map.put(metrics, :decode_duration, decode_duration)

    case decode_result do
      {:ok, decoded} ->
        {normalize_duration, result} = timed(fn -> normalize_fetch_response(decoded) end)
        {result, Map.put(metrics, :normalize_duration, normalize_duration)}

      {:error, error} ->
        {{:error, error}, metrics}
    end
  end

  # Returns the page's chunks; an empty list signals the end of the result.
  defp normalize_fetch_response({_header, %FetchResponse{results: results}}), do: {:ok, results}

  defp normalize_fetch_response({_header, %ErrorResponse{message: message}}) do
    {:error, Error.new(:server_error, message, source: :server)}
  end

  defp normalize_fetch_response({header, _body}) do
    {:error,
     Error.new(:unexpected_message, "expected fetch response, got #{header.type}",
       source: :protocol
     )}
  end

  # Runs BEGIN/COMMIT/ROLLBACK. On success it forces `next_status` and relabels the result
  # command. Any failure disconnects, because the transaction state is then unknown.
  defp transaction_statement(statement, command, next_status, state) do
    case execute_statement(%Query{statement: statement}, [], [], state) do
      {:ok, _query, result, state} ->
        {:ok, %{result | command: command}, %{state | status: next_status}}

      {:error, error, state} ->
        {:disconnect, error, state}
    end
  end

  ## Cursors

  defp declare_query(%Query{} = query, params, options, state) do
    case QuackDB.SQL.format(query.statement, params) do
      {:ok, statement} -> do_declare_query(query, statement, options, state)
      {:error, error} -> {:error, error, state}
    end
  end

  # Sends the PrepareRequest and stores the first page of results (plus a done? flag) as
  # the cursor's local buffer.
  defp do_declare_query(%Query{} = query, statement, options, state) do
    {query_id, state} = next_query_id(state)
    options = Keyword.put(options, :client_query_id, query_id)

    request =
      Codec.encode(%PrepareRequest{sql_query: statement},
        connection_id: state.connection_id,
        client_query_id: query_id
      )

    with {:ok, response} <- state.transport.(state.uri, request, options),
         {:ok, decoded} <- Codec.decode(response),
         {:ok, query, cursor, cursor_state} <-
           normalize_declare_response(decoded, query, options, state) do
      {:ok, query, cursor, put_cursor_state(state, cursor.ref, cursor_state)}
    else
      {:error, error} -> query_failed(error, query, state)
    end
  end

  defp normalize_declare_response(
         {_header, %ErrorResponse{message: message}},
         _query,
         _options,
         _state
       ) do
    {:error, Error.new(:server_error, message, source: :server)}
  end

  # `result_format: :columnar` keeps raw data chunks. Any other format materializes rows.
  defp normalize_declare_response({_header, %PrepareResponse{} = response}, query, options, state) do
    query = %{
      query
      | columns: response.result_names,
        result_types: response.result_types,
        result_uuid: response.result_uuid
    }

    cursor = %QuackDB.Cursor{
      ref: make_ref(),
      result_uuid: response.result_uuid,
      columns: response.result_names,
      result_types: response.result_types,
      connection_id: state.connection_id,
      statement: query.statement
    }

    done? = not response.needs_more_fetch

    cursor_state =
      if Keyword.get(options, :result_format) == :columnar do
        %{mode: :columnar, queued_chunks: response.results, done?: done?}
      else
        %{
          mode: :rows,
          queued_rows: materialize_rows(response.results, response.result_names),
          done?: done?
        }
      end

    {:ok, query, cursor, cursor_state}
  end

  defp normalize_declare_response({header, _body}, _query, _options, _state) do
    {:error,
     Error.new(:unexpected_message, "expected prepare response, got #{header.type}",
       source: :protocol
     )}
  end

  # Emits up to `:max_rows` (default 500) buffered rows. It answers :halt once the buffer
  # is drained and the server reported no more data.
  defp handle_fetch_rows(cursor_state, cursor, options, state) do
    cond do
      cursor_state.queued_rows != [] ->
        {rows, cursor_state} =
          take_cursor_rows(cursor_state, Keyword.get(options, :max_rows, 500))

        status = if cursor_state.done? and cursor_state.queued_rows == [], do: :halt, else: :cont
        {status, cursor_result(cursor, rows), put_cursor_state(state, cursor.ref, cursor_state)}

      cursor_state.done? ->
        {:halt, cursor_result(cursor, []), state}

      true ->
        fetch_more_cursor_rows(cursor_state, cursor, options, state)
    end
  end

  # Columnar counterpart of handle_fetch_rows/4. Chunks are split so that at most
  # `:max_rows` rows are emitted per call.
  defp handle_fetch_columnar(cursor_state, cursor, options, state) do
    cond do
      cursor_state.queued_chunks != [] ->
        {chunks, cursor_state} =
          take_cursor_chunks(cursor_state, Keyword.get(options, :max_rows, 500))

        status =
          if cursor_state.done? and cursor_state.queued_chunks == [], do: :halt, else: :cont

        {status, cursor_result(cursor, nil, chunks),
         put_cursor_state(state, cursor.ref, cursor_state)}

      cursor_state.done? ->
        {:halt, cursor_result(cursor, nil, []), state}

      true ->
        fetch_more_cursor_rows(cursor_state, cursor, options, state)
    end
  end

  # Refills the cursor buffer from the server, then re-enters handle_fetch/4. Failures move
  # an open transaction to :error, like failed executes and declares do.
  defp fetch_more_cursor_rows(cursor_state, cursor, options, state) do
    case fetch_cursor_state(cursor_state, cursor, options, state) do
      {:ok, cursor_state} ->
        handle_fetch(nil, cursor, options, put_cursor_state(state, cursor.ref, cursor_state))

      {:error, error} ->
        {:error, annotate_cursor_error(error, cursor),
         %{state | status: failed_status(state.status)}}
    end
  end

  defp fetch_cursor_state(cursor_state, cursor, options, state) do
    request =
      Codec.encode(%FetchRequest{uuid: cursor.result_uuid}, connection_id: state.connection_id)

    with {:ok, response} <- state.transport.(state.uri, request, options),
         {:ok, decoded} <- Codec.decode(response) do
      update_cursor_from_fetch(decoded, cursor_state, cursor)
    end
  end

  # An empty FetchResponse marks the cursor as exhausted.
  defp update_cursor_from_fetch({_header, %FetchResponse{results: []}}, cursor_state, _cursor) do
    {:ok, %{cursor_state | done?: true}}
  end

  defp update_cursor_from_fetch(
         {_header, %FetchResponse{} = response},
         %{mode: :columnar} = cursor_state,
         _cursor
       ) do
    {:ok, %{cursor_state | queued_chunks: cursor_state.queued_chunks ++ response.results}}
  end

  defp update_cursor_from_fetch({_header, %FetchResponse{} = response}, cursor_state, cursor) do
    rows = materialize_rows(response.results, cursor.columns)
    {:ok, %{cursor_state | queued_rows: cursor_state.queued_rows ++ rows}}
  end

  defp update_cursor_from_fetch(
         {_header, %ErrorResponse{message: message}},
         _cursor_state,
         _cursor
       ) do
    {:error, Error.new(:server_error, message, source: :server)}
  end

  defp update_cursor_from_fetch({header, _body}, _cursor_state, _cursor) do
    {:error,
     Error.new(:unexpected_message, "expected fetch response, got #{header.type}",
       source: :protocol
     )}
  end

  defp put_cursor_state(state, ref, cursor_state) do
    %{state | cursors: Map.put(state.cursors, ref, cursor_state)}
  end

  defp take_cursor_rows(cursor_state, max_rows) do
    {rows, remaining} = Enum.split(cursor_state.queued_rows, max_rows)
    {rows, %{cursor_state | queued_rows: remaining}}
  end

  defp take_cursor_chunks(cursor_state, max_rows) do
    {chunks, remaining} = take_chunks(cursor_state.queued_chunks, max_rows, [])
    {chunks, %{cursor_state | queued_chunks: remaining}}
  end

  # Takes whole chunks while they fit in `remaining_rows`. The first chunk that does not
  # fit is split, so exactly `remaining_rows` rows are selected when enough are queued.
  # Returns `{selected_chunks, remaining_chunks}`.
  defp take_chunks(chunks, remaining_rows, selected)
       when remaining_rows <= 0 or chunks == [] do
    {Enum.reverse(selected), chunks}
  end

  defp take_chunks([chunk | chunks], remaining_rows, selected) do
    if chunk.row_count <= remaining_rows do
      take_chunks(chunks, remaining_rows - chunk.row_count, [chunk | selected])
    else
      {selected_chunk, remaining_chunk} = split_chunk(chunk, remaining_rows)
      {Enum.reverse([selected_chunk | selected]), [remaining_chunk | chunks]}
    end
  end

  # Splits every column of `chunk` after `row_count` values.
  defp split_chunk(chunk, row_count) do
    {selected_columns, remaining_columns} =
      chunk.columns
      |> Enum.map(fn column ->
        {head, tail} = Enum.split(column.values, row_count)
        {%{column | values: head}, %{column | values: tail}}
      end)
      |> Enum.unzip()

    {%{chunk | row_count: row_count, columns: selected_columns},
     %{chunk | row_count: chunk.row_count - row_count, columns: remaining_columns}}
  end

  defp cursor_result(cursor, rows) do
    %Result{
      command: :fetch,
      columns: cursor.columns,
      rows: rows,
      num_rows: length(rows),
      connection_id: cursor.connection_id,
      messages: []
    }
  end

  # Columnar fetch result: rows stay nil and the chunks travel in metadata.
  defp cursor_result(cursor, nil, chunks) do
    %Result{
      command: :fetch,
      columns: cursor.columns,
      rows: nil,
      num_rows: Enum.reduce(chunks, 0, &(&2 + &1.row_count)),
      connection_id: cursor.connection_id,
      messages: [],
      metadata: %{columnar_chunks: chunks}
    }
  end

  ## Appends

  defp append_rows(%Query{} = query, table, rows, options, state) do
    with {:ok, columns} <- DataChunk.columns_from_rows(rows, options),
         {:ok, batches} <- row_batches(rows, options) do
      options = Keyword.put(options, :columns, columns)
      run_append(query, table, length(rows), batches, &DataChunk.from_rows/2, options, state)
    else
      {:error, error} -> query_failed(error, query, state)
    end
  end

  defp append_columns(%Query{} = query, table, columns, options, state) do
    with {:ok, row_count} <- column_row_count(columns, options),
         {:ok, batches} <- column_batches(columns, row_count, options) do
      run_append(query, table, row_count, batches, &DataChunk.from_columns/2, options, state)
    else
      {:error, error} -> query_failed(error, query, state)
    end
  end

  # Shared append driver for row- and column-oriented input. It allocates a client query
  # id, sends every batch inside one `:append` telemetry span and returns the insert result.
  # `build_chunk` converts one batch into a data chunk.
  defp run_append(%Query{} = query, table, row_count, batches, build_chunk, options, state) do
    {query_id, state} = next_query_id(state)
    options = Keyword.put(options, :client_query_id, query_id)
    metadata = append_metadata(query, table, row_count, batches, options, state)

    Telemetry.span(state.telemetry_prefix, :append, metadata, fn ->
      {append_duration, {result, append_metrics}} =
        timed(fn ->
          case send_append_batches(table, batches, build_chunk, options, state) do
            {:ok, metrics} ->
              result = append_result(row_count, state)
              {{:ok, query, result, %{state | status: successful_status(state.status)}}, metrics}

            {:error, error} ->
              {query_failed(error, query, state), empty_append_metrics()}
          end
        end)

      {result, append_stop_metadata(result, append_metrics, append_duration)}
    end)
  end

  # An empty row list still yields one (empty) batch, as long as `:batch_size` is valid.
  defp row_batches([], options) do
    case Keyword.get(options, :batch_size, 1) do
      batch_size when is_integer(batch_size) and batch_size >= 1 -> {:ok, [[]]}
      _batch_size -> invalid_batch_size()
    end
  end

  # Without `:batch_size` all rows go in a single batch.
  defp row_batches(rows, options) do
    row_count = length(rows)
    batch_size = Keyword.get(options, :batch_size, row_count)

    cond do
      not (is_integer(batch_size) and batch_size >= 1) -> invalid_batch_size()
      batch_size >= row_count -> {:ok, [rows]}
      true -> {:ok, Enum.chunk_every(rows, batch_size)}
    end
  end

  defp invalid_batch_size do
    {:error,
     Error.new(:invalid_batch_size, "append batch_size must be a positive integer",
       source: :client
     )}
  end

  # Every column must be a `{name, list}` pair, and all lists must share the same length,
  # which becomes the row count.
  defp column_row_count(columns, _options) do
    columns
    |> Enum.map(fn
      {_name, values} when is_list(values) ->
        {:ok, length(values)}

      {name, values} ->
        {:error,
         Error.new(
           :invalid_append_column,
           "column #{inspect(name)} values must be a list, got #{inspect(values)}",
           source: :client
         )}

      value ->
        {:error,
         Error.new(:invalid_append_column, "invalid append column values #{inspect(value)}",
           source: :client
         )}
    end)
    |> collect_column_counts()
  end

  defp collect_column_counts(counts) do
    counts
    |> Enum.reduce_while({:ok, []}, fn
      {:ok, count}, {:ok, counts} -> {:cont, {:ok, [count | counts]}}
      {:error, _error} = error, {:ok, _counts} -> {:halt, error}
    end)
    |> case do
      {:ok, []} ->
        {:error,
         Error.new(
           :missing_append_columns,
           "cannot infer append row count from an empty column set",
           source: :client
         )}

      {:ok, counts} ->
        case Enum.uniq(counts) do
          [row_count] ->
            {:ok, row_count}

          counts ->
            {:error,
             Error.new(
               :invalid_vector_size,
               "append columns have mismatched row counts #{inspect(counts)}",
               source: :client
             )}
        end

      {:error, _error} = error ->
        error
    end
  end

  # Zero-row column sets are sent as a single batch.
  defp column_batches(columns, 0, options) do
    case Keyword.get(options, :batch_size, 1) do
      batch_size when is_integer(batch_size) and batch_size >= 1 -> {:ok, [columns]}
      _batch_size -> invalid_batch_size()
    end
  end

  defp column_batches(columns, row_count, options) do
    batch_size = Keyword.get(options, :batch_size, row_count)

    cond do
      not (is_integer(batch_size) and batch_size >= 1) ->
        invalid_batch_size()

      batch_size >= row_count ->
        {:ok, [columns]}

      true ->
        {:ok, columns |> Enum.map(&chunk_column(&1, batch_size)) |> transpose_column_batches()}
    end
  end

  defp chunk_column({name, values}, batch_size), do: {name, Enum.chunk_every(values, batch_size)}

  # Turns `[{name, [slice1, slice2, ...]}, ...]` into one `[{name, slice_n}, ...]` column
  # set per batch.
  defp transpose_column_batches(chunked_columns) do
    names = Enum.map(chunked_columns, &elem(&1, 0))

    chunked_columns
    |> Enum.map(&elem(&1, 1))
    |> Enum.zip()
    |> Enum.map(fn chunks -> Enum.zip(names, Tuple.to_list(chunks)) end)
  end

  # Sends batches in order, summing per-batch metrics. Stops at the first failing batch.
  defp send_append_batches(table, batches, build_chunk, options, state) do
    Enum.reduce_while(batches, {:ok, empty_append_metrics()}, fn batch, {:ok, metrics} ->
      case append_batch(table, batch, build_chunk, options, state) do
        {:ok, batch_metrics} -> {:cont, {:ok, merge_append_metrics(metrics, batch_metrics)}}
        {:error, _error} = error -> {:halt, error}
      end
    end)
  end

  # Builds and encodes one AppendRequest. The encode duration covers chunk building too.
  defp append_batch(table, batch, build_chunk, options, state) do
    encode_started_at = System.monotonic_time()

    with {:ok, chunk} <- build_chunk.(batch, options) do
      encoded =
        Codec.encode(
          %AppendRequest{
            schema_name: Keyword.get(options, :schema, ""),
            table_name: to_string(table),
            append_chunk: chunk
          },
          connection_id: state.connection_id,
          client_query_id: Keyword.get(options, :client_query_id)
        )

      encode_duration = System.monotonic_time() - encode_started_at
      append_encoded(encoded, options, state, encode_duration)
    end
  end

  defp append_encoded(encoded, options, state, encode_duration) do
    {transport_duration, transport_result} =
      timed(fn -> state.transport.(state.uri, encoded, options) end)

    with {:ok, response} <- transport_result do
      {decode_duration, decode_result} = timed(fn -> Codec.decode(response) end)

      with {:ok, decoded} <- decode_result,
           :ok <- normalize_append_response(decoded) do
        {:ok,
         append_batch_metrics(
           encode_duration,
           transport_duration,
           decode_duration,
           encoded,
           response
         )}
      end
    end
  end

  defp normalize_append_response({_header, %ErrorResponse{message: message}}) do
    {:error, Error.new(:server_error, message, source: :server)}
  end

  defp normalize_append_response({_header, %QuackDB.Protocol.Message.SuccessResponse{}}), do: :ok

  defp normalize_append_response({header, _body}) do
    {:error,
     Error.new(:unexpected_message, "expected success response, got #{header.type}",
       source: :protocol
     )}
  end

  defp append_result(row_count, state) do
    %Result{
      command: :insert,
      columns: [],
      rows: nil,
      num_rows: row_count,
      connection_id: state.connection_id,
      messages: [],
      metadata: %{}
    }
  end

  ## Metrics and telemetry

  defp append_batch_metrics(
         encode_duration,
         transport_duration,
         decode_duration,
         request,
         response
       ) do
    %{
      batches: 1,
      encode_duration: encode_duration,
      transport_duration: transport_duration,
      decode_duration: decode_duration,
      request_bytes: IO.iodata_length(request),
      response_bytes: byte_size(response)
    }
  end

  defp empty_append_metrics do
    %{
      batches: 0,
      encode_duration: 0,
      transport_duration: 0,
      decode_duration: 0,
      request_bytes: 0,
      response_bytes: 0
    }
  end

  defp merge_append_metrics(left, right) do
    Map.merge(left, right, fn _key, left_value, right_value -> left_value + right_value end)
  end

  defp query_metrics(encode_duration, request) do
    %{encode_duration: encode_duration, request_bytes: IO.iodata_length(request)}
  end

  defp put_transport_metrics(metrics, transport_duration, response) do
    metrics
    |> Map.put(:transport_duration, transport_duration)
    |> Map.put(:response_bytes, byte_size(response))
  end

  # Runs `fun` and returns `{elapsed_native_time, result}`.
  defp timed(fun) do
    started_at = System.monotonic_time()
    result = fun.()
    {System.monotonic_time() - started_at, result}
  end

  defp query_metadata(query, params, options, state) do
    options
    |> metadata_from_options(state)
    |> Map.put(:query, query.statement)
    |> maybe_put_params(params, options)
  end

  defp append_metadata(query, table, row_count, batches, options, state) do
    options
    |> metadata_from_options(state)
    |> Map.merge(%{
      query: query.statement,
      table: to_string(table),
      schema: Keyword.get(options, :schema, ""),
      batch_size: Keyword.get(options, :batch_size),
      batches: length(batches),
      rows: row_count
    })
  end

  defp fetch_metadata(result_uuid, options, state) do
    options
    |> metadata_from_options(state)
    |> Map.put(:result_uuid, result_uuid)
  end

  defp metadata_from_options(options, state) do
    %{
      connection_id: state.connection_id,
      client_query_id: Keyword.get(options, :client_query_id),
      options: Keyword.get(options, :telemetry_options, [])
    }
  end

  # Params are only attached to telemetry when explicitly requested with
  # `telemetry_params: true`.
  defp maybe_put_params(metadata, params, options) do
    if Keyword.get(options, :telemetry_params, false) do
      Map.put(metadata, :params, params)
    else
      metadata
    end
  end

  defp result_stop_metadata({:ok, _query, %Result{} = result, _state}) do
    %{command: result.command, rows: result.num_rows, result: :ok}
  end

  defp result_stop_metadata({:error, %Error{} = error, _state}) do
    %{error: error, result: :error}
  end

  defp query_stop_metadata(result, metrics) do
    result
    |> result_stop_metadata()
    |> Map.merge(metrics)
  end

  defp append_stop_metadata(result, metrics, duration) do
    result
    |> result_stop_metadata()
    |> Map.merge(metrics)
    |> Map.put(:append_duration, duration)
    |> maybe_put_rows_per_second(result, duration)
  end

  # Throughput is only reported when the duration is at least one microsecond. A shorter
  # (but positive) native duration would otherwise convert to 0 and divide by zero.
  defp maybe_put_rows_per_second(
         metadata,
         {:ok, _query, %Result{num_rows: rows}, _state},
         duration
       )
       when is_integer(rows) and rows >= 0 do
    case System.convert_time_unit(duration, :native, :microsecond) do
      microseconds when microseconds > 0 ->
        Map.put(metadata, :rows_per_second, rows / (microseconds / 1_000_000))

      _microseconds ->
        metadata
    end
  end

  defp maybe_put_rows_per_second(metadata, _result, _duration), do: metadata

  defp fetch_stop_metadata({:ok, chunks}, metrics) do
    Map.merge(%{chunks: length(chunks), result: :ok}, metrics)
  end

  defp fetch_stop_metadata({:error, %Error{} = error}, metrics) do
    Map.merge(%{error: error, result: :error}, metrics)
  end

  ## Helpers

  # Standard failure triple for query-shaped callbacks. It annotates `error` with the
  # statement and connection id, and moves an open transaction to the :error status.
  defp query_failed(error, %Query{} = query, state) do
    {:error, annotate_error(error, query, state), %{state | status: failed_status(state.status)}}
  end

  defp annotate_error(%Error{} = error, %Query{} = query, state) do
    %Error{error | query: query.statement, connection_id: state.connection_id}
  end

  defp annotate_cursor_error(%Error{} = error, %QuackDB.Cursor{} = cursor) do
    %Error{error | query: cursor.statement, connection_id: cursor.connection_id}
  end

  defp next_query_id(state) do
    {state.next_query_id, %{state | next_query_id: state.next_query_id + 1}}
  end

  defp materialize_rows(chunks, columns) do
    Enum.flat_map(chunks, &DataChunk.rows(&1, columns))
  end

  defp empty_result(command) do
    %Result{command: command, columns: nil, rows: nil, num_rows: 0, messages: []}
  end

  # Maps the statement's first keyword to a Result command atom, for example
  # "  SELECT*FROM t" -> :select. Unrecognized keywords map to :unknown.
  defp command(statement) do
    statement
    |> IO.iodata_to_binary()
    |> first_sql_word()
    |> String.downcase()
    |> sql_command_atom()
  end

  defp sql_command_atom(word) do
    Map.get(@sql_command_map, word) || Map.get(@sql_command_aliases, word, :unknown)
  end

  # The first keyword is the leading run of ASCII letters after any whitespace or opening
  # parentheses. Punctuation right after the keyword (`;`, `*`, `(`) therefore ends it
  # instead of becoming part of it.
  defp first_sql_word(<<char, rest::binary>>) when char in [?\s, ?\t, ?\n, ?\r, ?\f, ?(] do
    first_sql_word(rest)
  end

  defp first_sql_word(sql), do: binary_part(sql, 0, leading_letters(sql, 0))

  # Counts consecutive ASCII letters at the start of `sql`.
  defp leading_letters(sql, count) do
    case sql do
      <<_::binary-size(count), char, _::binary>> when char in ?a..?z or char in ?A..?Z ->
        leading_letters(sql, count + 1)

      _other ->
        count
    end
  end

  # A failed transaction stays :error until ROLLBACK, even after later successful queries.
  defp successful_status(:error), do: :error
  defp successful_status(status), do: status

  # Only an open transaction is marked failed. :idle connections stay :idle.
  defp failed_status(:transaction), do: :error
  defp failed_status(status), do: status

  defp client_version do
    case Application.spec(:quackdb, :vsn) do
      nil -> "quackdb/dev"
      version -> "quackdb/#{version}"
    end
  end

  defp client_platform do
    :system_architecture
    |> :erlang.system_info()
    |> List.to_string()
  end
end