# lib/quack_db/db_connection.ex

defmodule QuackDB.DBConnection do
  @moduledoc """
  DBConnection implementation for the remote DuckDB Quack protocol.

  This module owns connection lifecycle, request execution, cursor-based streaming, transaction callbacks, and result normalization. HTTP transport and binary protocol encoding are delegated so the codec remains independent from DBConnection.
  """

  use DBConnection

  alias QuackDB.Error
  alias QuackDB.Protocol.Codec
  alias QuackDB.Protocol.DataChunk
  alias QuackDB.Protocol.Message.ConnectionRequest
  alias QuackDB.Protocol.Message.ConnectionResponse
  alias QuackDB.Protocol.Message.Disconnect
  alias QuackDB.Protocol.Message.ErrorResponse
  alias QuackDB.Protocol.Message.AppendRequest
  alias QuackDB.Protocol.Message.FetchRequest
  alias QuackDB.Protocol.Message.FetchResponse
  alias QuackDB.Protocol.Message.PrepareRequest
  alias QuackDB.Protocol.Message.PrepareResponse
  alias QuackDB.Query
  alias QuackDB.Result
  alias QuackDB.Telemetry

  # Timeout passed to the best-effort disconnect request and, in milliseconds, to
  # `GenServer.stop/3` on the transport owner (see `disconnect/2`).
  @disconnect_timeout 1_000
  # SQL keywords, as atoms, that name a statement's command.
  # NOTE(review): presumably consumed by a `command/1` helper outside this section — confirm.
  @sql_command_atoms ~w(
    alter analyze attach begin call checkpoint commit copy create delete detach drop explain insert
    install load pragma rollback select set summarize truncate update use vacuum
  )a
  # Compile-time lookup from each keyword's string form to its atom.
  @sql_command_map Map.new(@sql_command_atoms, &{Atom.to_string(&1), &1})
  # Extra keyword strings that map to `:select`.
  @sql_command_aliases %{"from" => :select, "values" => :select}

  # Connection state threaded through every DBConnection callback.
  #
  #   * uri               - endpoint, as returned by `QuackDB.URI.normalize/1` in `build_state/1`
  #   * token             - sent as `auth_string` in the connection request (defaults to "")
  #   * connection_id     - taken from the connection response header; nil until connected
  #   * server            - the `ConnectionResponse` body received on connect
  #   * transport         - function invoked as `transport.(uri, request, options)`
  #   * transport_owner   - pid started by `QuackDB.Transport.start_link/2` when no `:transport`
  #                         option was supplied; nil for a caller-provided transport
  #   * transport_options - subset of the connect options selected by `transport_options/1`
  #   * client_version    - sent as `client_duckdb_version` in the connection request
  #   * telemetry_prefix  - prefix handed to `Telemetry.span/4`
  #   * next_query_id     - counter starting at 1, advanced by `next_query_id/1`
  #   * status            - value reported by `handle_status/2`
  #   * cursors           - per-cursor bookkeeping maps keyed by the cursor's `ref`
  defstruct [
    :uri,
    :token,
    :connection_id,
    :server,
    :transport,
    :transport_owner,
    :transport_options,
    :client_version,
    :telemetry_prefix,
    next_query_id: 1,
    status: :idle,
    cursors: %{}
  ]

  # Typespec mirror of the struct above.
  @type state :: %__MODULE__{
          uri: URI.t(),
          token: String.t(),
          connection_id: String.t() | nil,
          server: ConnectionResponse.t() | nil,
          transport: function(),
          transport_owner: pid() | nil,
          transport_options: keyword(),
          client_version: String.t(),
          telemetry_prefix: [atom()],
          next_query_id: pos_integer(),
          status: DBConnection.status(),
          cursors: map()
        }

  @doc """
  Starts a connection process that uses this module as its DBConnection implementation.

  `options` is handed to `DBConnection.start_link/2` unchanged.
  """
  @spec start_link(Keyword.t()) :: GenServer.on_start()
  def start_link(options), do: DBConnection.start_link(__MODULE__, options)

  @doc """
  Returns a supervisor child specification for a connection using this module.

  `options` is handed to `DBConnection.child_spec/2` unchanged.
  """
  @spec child_spec(Keyword.t()) :: Supervisor.child_spec()
  def child_spec(options), do: DBConnection.child_spec(__MODULE__, options)

  @impl true
  def connect(options) do
    # Build the client-side state first; only then start the transport and perform the
    # handshake. Whatever either step returns on failure is passed through untouched.
    with {:ok, initial} <- build_state(options) do
      connect_quack(initial)
    end
  end

  @impl true
  def checkout(state) do
    # Nothing to prepare on checkout; the state is handed back as-is.
    {:ok, state}
  end

  @impl true
  def ping(state) do
    # A trivial round trip is used as the liveness probe; any failure requests a disconnect.
    probe = %Query{statement: "SELECT 1"}

    with {:ok, _query, _result, alive} <- execute_statement(probe, [], [], state) do
      {:ok, alive}
    else
      {:error, reason, failed} -> {:disconnect, reason, failed}
    end
  end

  @impl true
  def handle_prepare(%Query{operation: operation} = query, _options, state) do
    # Only queries without an operation have their statement flattened from iodata to a
    # binary; queries carrying an operation are returned unchanged.
    prepared =
      if is_nil(operation) do
        %{query | statement: IO.iodata_to_binary(query.statement)}
      else
        query
      end

    {:ok, prepared, state}
  end

  @impl true
  def handle_execute(
        %Query{operation: {kind, table, data, insert_options}} = query,
        params,
        options,
        state
      )
      when kind in [:insert_rows, :insert_columns] do
    # Append operations carry their data in the query itself, so bound params are rejected.
    # Per-call options override the options stored with the operation.
    cond do
      params != [] ->
        {:error,
         Error.new(:unsupported_params, "append queries do not accept params", source: :client),
         state}

      kind == :insert_rows ->
        append_rows(query, table, data, Keyword.merge(insert_options, options), state)

      true ->
        append_columns(query, table, data, Keyword.merge(insert_options, options), state)
    end
  end

  # Every other query is executed as a SQL statement.
  def handle_execute(%Query{} = query, params, options, state),
    do: execute_statement(query, params, options, state)

  @impl true
  def handle_close(_query, _options, state) do
    # No request is sent here; an empty :close result is returned and the state is untouched.
    closed = empty_result(:close)
    {:ok, closed, state}
  end

  @impl true
  def handle_begin(_options, state) do
    # A transaction can only be opened from :idle; any other status is reported back as-is.
    case state.status do
      :idle -> transaction_statement("BEGIN", :begin, :transaction, state)
      other -> {other, state}
    end
  end

  @impl true
  def handle_commit(_options, state) do
    # COMMIT is only issued while a transaction is open; otherwise the status is reported back.
    case state.status do
      :transaction -> transaction_statement("COMMIT", :commit, :idle, state)
      other -> {other, state}
    end
  end

  @impl true
  def handle_rollback(_options, state) do
    # ROLLBACK applies to both an open and a failed transaction; otherwise the current
    # status is reported back.
    if state.status in [:transaction, :error] do
      transaction_statement("ROLLBACK", :rollback, :idle, state)
    else
      {state.status, state}
    end
  end

  @impl true
  def handle_status(_options, state) do
    # The status is tracked locally in the state; no request is made.
    {state.status, state}
  end

  @impl true
  # Declaring a cursor is delegated entirely to declare_query/4.
  def handle_declare(%Query{} = query, params, options, state),
    do: declare_query(query, params, options, state)

  @impl true
  def handle_fetch(_query, %QuackDB.Cursor{} = cursor, options, state) do
    # The cursor must have been registered by a prior declare; Map.fetch!/2 raises otherwise.
    cursor_state = Map.fetch!(state.cursors, cursor.ref)

    case cursor_state.mode do
      :columnar -> handle_fetch_columnar(cursor_state, cursor, options, state)
      _rows -> handle_fetch_rows(cursor_state, cursor, options, state)
    end
  end

  @impl true
  def handle_deallocate(_query, %QuackDB.Cursor{} = cursor, _options, state) do
    # Only the local bookkeeping entry for the cursor is dropped.
    result = cursor_result(cursor, [])
    remaining = Map.delete(state.cursors, cursor.ref)
    {:ok, result, %{state | cursors: remaining}}
  end

  @impl true
  # Never connected: there is no server-side connection to close.
  def disconnect(_error, %{connection_id: nil}), do: :ok

  # Best-effort teardown: tells the server the connection is going away, then stops the
  # transport owner if this module started one. Always returns :ok.
  def disconnect(_error, state) do
    request = Codec.encode(%Disconnect{}, connection_id: state.connection_id)

    # The goodbye message is advisory and its result is ignored. An exit raised by the
    # transport call must not prevent the transport owner from being stopped below.
    _ignored =
      try do
        state.transport.(state.uri, request, timeout: @disconnect_timeout)
      catch
        :exit, reason -> {:error, reason}
      end

    owner = state.transport_owner

    if owner do
      try do
        GenServer.stop(owner, :normal, @disconnect_timeout)
      catch
        # GenServer.stop/3 exits the caller when the process is already gone or does not
        # terminate in time. Unlink before killing so the kill cannot propagate back here.
        :exit, _reason ->
          Process.unlink(owner)
          Process.exit(owner, :kill)
      end
    end

    :ok
  end

  # Interpolates params into the statement, then opens a cursor for the formatted SQL.
  # A formatting error is returned with the state unchanged.
  defp declare_query(%Query{} = query, params, options, state) do
    case QuackDB.SQL.format(query.statement, params) do
      {:ok, statement} -> do_declare_query(query, statement, options, state)
      {:error, error} -> {:error, error, state}
    end
  end

  # Sends the prepare request for a cursor and registers the resulting cursor state.
  # Any transport, decode or server error is annotated with the query and marks the
  # connection status as failed.
  defp do_declare_query(%Query{} = query, statement, options, state) do
    {query_id, state} = next_query_id(state)
    request_options = Keyword.put(options, :client_query_id, query_id)

    encoded =
      Codec.encode(%PrepareRequest{sql_query: statement},
        connection_id: state.connection_id,
        client_query_id: query_id
      )

    with {:ok, payload} <- state.transport.(state.uri, encoded, request_options),
         {:ok, message} <- Codec.decode(payload),
         {:ok, declared, cursor, cursor_state} <-
           normalize_declare_response(message, query, request_options, state) do
      {:ok, declared, cursor, put_cursor_state(state, cursor.ref, cursor_state)}
    else
      {:error, error} ->
        annotated = annotate_error(error, query, state)
        {:error, annotated, %{state | status: failed_status(state.status)}}
    end
  end

  # Turns a decoded prepare reply into `{:ok, query, cursor, cursor_state}` or an error.
  defp normalize_declare_response(
         {_header, %ErrorResponse{} = failure},
         _query,
         _options,
         _state
       ) do
    {:error, Error.new(:server_error, failure.message, source: :server)}
  end

  defp normalize_declare_response({_header, %PrepareResponse{} = response}, query, options, state) do
    %PrepareResponse{
      result_names: names,
      result_types: types,
      result_uuid: uuid,
      results: chunks,
      needs_more_fetch: more?
    } = response

    declared = %{query | columns: names, result_types: types, result_uuid: uuid}

    cursor = %QuackDB.Cursor{
      ref: make_ref(),
      result_uuid: uuid,
      columns: names,
      result_types: types,
      connection_id: state.connection_id,
      statement: declared.statement
    }

    # The chunks that arrived with the prepare reply are queued immediately, either raw
    # (columnar mode) or already materialized into rows.
    cursor_state =
      case Keyword.get(options, :result_format) do
        :columnar ->
          %{mode: :columnar, queued_chunks: chunks, done?: not more?}

        _rows ->
          %{mode: :rows, queued_rows: materialize_rows(chunks, names), done?: not more?}
      end

    {:ok, declared, cursor, cursor_state}
  end

  defp normalize_declare_response({header, _body}, _query, _options, _state) do
    message = "expected prepare response, got #{header.type}"
    {:error, Error.new(:unexpected_message, message, source: :protocol)}
  end

  # Appends `rows` to `table` in one or more batches inside an :append telemetry span.
  #
  # Returns `{:ok, query, result, state}` on success or `{:error, error, state}` on failure;
  # in both cases the state's status is updated via successful_status/1 or failed_status/1.
  # NOTE(review): assumes Telemetry.span/4 returns the first element of the tuple produced
  # by the closure — confirm against QuackDB.Telemetry.
  defp append_rows(%Query{} = query, table, rows, options, state) do
    # Column inference and batch splitting are validated up front, before any request is sent.
    with {:ok, columns} <- DataChunk.columns_from_rows(rows, options),
         {:ok, batches} <- append_batches(rows, options) do
      options = Keyword.put(options, :columns, columns)
      # One client query id is shared by every batch of this append.
      {query_id, state} = next_query_id(state)
      options = Keyword.put(options, :client_query_id, query_id)
      metadata = append_metadata(query, table, rows, batches, options, state)

      Telemetry.span(state.telemetry_prefix, :append, metadata, fn ->
        append_started_at = System.monotonic_time()

        {result, append_metrics} =
          case append_batches(table, batches, options, state) do
            {:ok, metrics} ->
              result = append_result(rows, state)

              {{:ok, query, result, %{state | status: successful_status(state.status)}}, metrics}

            # A failing batch stops the append; empty metrics are reported rather than the
            # totals accumulated by earlier batches.
            {:error, error} ->
              {{:error, annotate_error(error, query, state),
                %{state | status: failed_status(state.status)}}, empty_append_metrics()}
          end

        append_duration = System.monotonic_time() - append_started_at
        {result, append_stop_metadata(result, append_metrics, append_duration)}
      end)
    else
      # Client-side validation failure: no telemetry span is opened for it.
      {:error, error} ->
        {:error, annotate_error(error, query, state),
         %{state | status: failed_status(state.status)}}
    end
  end

  # Splits `rows` into batches of at most `:batch_size` rows.
  #
  # Without a `:batch_size` option all rows form a single batch. An empty row list still
  # yields one (empty) batch, provided any supplied `:batch_size` is a positive integer.
  defp append_batches(rows, options) do
    row_count = length(rows)
    # For an empty list the default must still be a valid size, hence the floor of 1.
    batch_size = Keyword.get(options, :batch_size, max(row_count, 1))

    cond do
      not is_integer(batch_size) or batch_size < 1 -> invalid_batch_size()
      batch_size >= row_count -> {:ok, [rows]}
      true -> {:ok, Enum.chunk_every(rows, batch_size)}
    end
  end

  # Error tuple shared by the row and column batch splitters.
  defp invalid_batch_size do
    message = "append batch_size must be a positive integer"
    {:error, Error.new(:invalid_batch_size, message, source: :client)}
  end

  # Appends column-oriented data to `table` in one or more batches inside an :append
  # telemetry span.
  #
  # Returns `{:ok, query, result, state}` on success or `{:error, error, state}` on failure;
  # in both cases the state's status is updated via successful_status/1 or failed_status/1.
  # NOTE(review): assumes Telemetry.span/4 returns the first element of the tuple produced
  # by the closure — confirm against QuackDB.Telemetry.
  defp append_columns(%Query{} = query, table, columns, options, state) do
    # The row count is derived from the column value lists, which must all have equal length.
    with {:ok, row_count} <- column_row_count(columns, options),
         {:ok, batches} <- append_column_batches(columns, row_count, options) do
      # One client query id is shared by every batch of this append.
      {query_id, state} = next_query_id(state)
      options = Keyword.put(options, :client_query_id, query_id)
      metadata = append_metadata(query, table, row_count, batches, options, state)

      Telemetry.span(state.telemetry_prefix, :append, metadata, fn ->
        append_started_at = System.monotonic_time()

        {result, append_metrics} =
          case append_column_batches(table, batches, options, state) do
            {:ok, metrics} ->
              result = append_result(row_count, state)

              {{:ok, query, result, %{state | status: successful_status(state.status)}}, metrics}

            # A failing batch stops the append; empty metrics are reported rather than the
            # totals accumulated by earlier batches.
            {:error, error} ->
              {{:error, annotate_error(error, query, state),
                %{state | status: failed_status(state.status)}}, empty_append_metrics()}
          end

        append_duration = System.monotonic_time() - append_started_at
        {result, append_stop_metadata(result, append_metrics, append_duration)}
      end)
    else
      # Client-side validation failure: no telemetry span is opened for it.
      {:error, error} ->
        {:error, annotate_error(error, query, state),
         %{state | status: failed_status(state.status)}}
    end
  end

  # Measures every column's value list and reduces the lengths to a single row count.
  # Each entry must be a `{name, values}` pair whose values are a list.
  defp column_row_count(columns, _options) do
    measured =
      Enum.map(columns, fn column ->
        case column do
          {_name, values} when is_list(values) ->
            {:ok, length(values)}

          {name, values} ->
            message = "column #{inspect(name)} values must be a list, got #{inspect(values)}"
            {:error, Error.new(:invalid_append_column, message, source: :client)}

          value ->
            message = "invalid append column values #{inspect(value)}"
            {:error, Error.new(:invalid_append_column, message, source: :client)}
        end
      end)

    collect_column_counts(measured)
  end

  # Reduces per-column `{:ok, count} | {:error, error}` results to one row count.
  #
  # The first error wins. Otherwise all counts must agree; an empty column set and
  # mismatched counts are both reported as client errors.
  defp collect_column_counts(counts) do
    case Enum.find(counts, &match?({:error, _error}, &1)) do
      {:error, _error} = failure ->
        failure

      nil ->
        # Counts are de-duplicated in reverse column order, which is the order shown in
        # the mismatch message.
        distinct =
          counts
          |> Enum.reverse()
          |> Enum.map(fn {:ok, count} -> count end)
          |> Enum.uniq()

        case distinct do
          [] ->
            {:error,
             Error.new(
               :missing_append_columns,
               "cannot infer append row count from an empty column set",
               source: :client
             )}

          [row_count] ->
            {:ok, row_count}

          mismatched ->
            {:error,
             Error.new(
               :invalid_vector_size,
               "append columns have mismatched row counts #{inspect(mismatched)}",
               source: :client
             )}
        end
    end
  end

  # Splits column-oriented data into batches of at most `:batch_size` rows.
  #
  # Without a `:batch_size` option everything forms a single batch. Zero rows still yield
  # one batch, provided any supplied `:batch_size` is a positive integer.
  defp append_column_batches(columns, row_count, options) do
    # With zero rows the default must still be a valid size, hence the floor of 1.
    batch_size = Keyword.get(options, :batch_size, max(row_count, 1))

    cond do
      not is_integer(batch_size) or batch_size < 1 ->
        invalid_batch_size()

      batch_size >= row_count ->
        {:ok, [columns]}

      true ->
        chunked = Enum.map(columns, &chunk_column(&1, batch_size))
        {:ok, transpose_column_batches(chunked)}
    end
  end

  # Splits one column's values into consecutive slices of at most `batch_size` entries.
  defp chunk_column({name, values}, batch_size) do
    {name, Enum.chunk_every(values, batch_size)}
  end

  # Turns per-column slice lists into per-batch column lists: the n-th slice of every
  # column is paired with its column name to form the n-th batch.
  defp transpose_column_batches(chunked_columns) do
    {names, slices_per_column} = Enum.unzip(chunked_columns)

    for batch <- Enum.zip(slices_per_column) do
      Enum.zip(names, Tuple.to_list(batch))
    end
  end

  # Sends the row batches one after another, summing their metrics.
  # Stops at the first failing batch and returns its error.
  defp append_batches(table, batches, options, state) do
    Enum.reduce_while(batches, {:ok, empty_append_metrics()}, fn batch, {:ok, totals} ->
      with {:ok, batch_metrics} <- append_batch(table, batch, options, state) do
        {:cont, {:ok, merge_append_metrics(totals, batch_metrics)}}
      else
        {:error, _error} = failure -> {:halt, failure}
      end
    end)
  end

  # Sends the column batches one after another, summing their metrics.
  # Stops at the first failing batch and returns its error.
  defp append_column_batches(table, batches, options, state) do
    Enum.reduce_while(batches, {:ok, empty_append_metrics()}, fn batch, {:ok, totals} ->
      with {:ok, batch_metrics} <- append_column_batch(table, batch, options, state) do
        {:cont, {:ok, merge_append_metrics(totals, batch_metrics)}}
      else
        {:error, _error} = failure -> {:halt, failure}
      end
    end)
  end

  # Encodes one batch of rows as an append request and sends it.
  # The encode duration covers both chunk construction and message encoding.
  defp append_batch(table, rows, options, state) do
    started_at = System.monotonic_time()

    case DataChunk.from_rows(rows, options) do
      {:ok, chunk} ->
        encoded =
          Codec.encode(
            %AppendRequest{
              schema_name: Keyword.get(options, :schema, ""),
              table_name: to_string(table),
              append_chunk: chunk
            },
            connection_id: state.connection_id,
            client_query_id: Keyword.get(options, :client_query_id)
          )

        append_encoded(encoded, options, state, System.monotonic_time() - started_at)

      failure ->
        failure
    end
  end

  # Maps a decoded append reply to `:ok` or an error tuple.
  defp normalize_append_response({header, body}) do
    case body do
      %ErrorResponse{message: message} ->
        {:error, Error.new(:server_error, message, source: :server)}

      %QuackDB.Protocol.Message.SuccessResponse{} ->
        :ok

      _unexpected ->
        message = "expected success response, got #{header.type}"
        {:error, Error.new(:unexpected_message, message, source: :protocol)}
    end
  end

  # Sends an already encoded append request, decodes the reply and returns the timing and
  # size metrics for this batch. Any failing step is returned as-is.
  defp append_encoded(encoded, options, state, encode_duration) do
    {transport_duration, sent} =
      timed(fn -> state.transport.(state.uri, encoded, options) end)

    case sent do
      {:ok, response} ->
        {decode_duration, decoded} = timed(fn -> Codec.decode(response) end)

        with {:ok, message} <- decoded,
             :ok <- normalize_append_response(message) do
          metrics =
            append_batch_metrics(
              encode_duration,
              transport_duration,
              decode_duration,
              encoded,
              response
            )

          {:ok, metrics}
        end

      failure ->
        failure
    end
  end

  # Metrics for a single successfully appended batch: the three phase durations plus the
  # request (iodata) and response (binary) sizes in bytes.
  defp append_batch_metrics(
         encode_duration,
         transport_duration,
         decode_duration,
         request,
         response
       ) do
    timings = %{
      encode_duration: encode_duration,
      transport_duration: transport_duration,
      decode_duration: decode_duration
    }

    sizes = %{request_bytes: IO.iodata_length(request), response_bytes: byte_size(response)}

    timings
    |> Map.merge(sizes)
    |> Map.put(:batches, 1)
  end

  # Zeroed metrics map; the starting accumulator when summing batch metrics.
  defp empty_append_metrics do
    keys = [
      :batches,
      :encode_duration,
      :transport_duration,
      :decode_duration,
      :request_bytes,
      :response_bytes
    ]

    Map.new(keys, &{&1, 0})
  end

  # Initial metrics for a request: how long encoding took and the encoded size in bytes.
  defp query_metrics(encode_duration, request) do
    request_bytes = IO.iodata_length(request)
    %{encode_duration: encode_duration, request_bytes: request_bytes}
  end

  # Adds the transport duration and the response size in bytes to `metrics`.
  defp put_transport_metrics(metrics, transport_duration, response) do
    Map.merge(metrics, %{
      transport_duration: transport_duration,
      response_bytes: byte_size(response)
    })
  end

  # Sums two metrics maps key by key; keys present on only one side are kept unchanged.
  defp merge_append_metrics(left, right) do
    add = fn _key, total, increment -> total + increment end
    Map.merge(left, right, add)
  end

  # Runs `fun` and returns `{elapsed, result}`, with `elapsed` measured via
  # System.monotonic_time/0 (native time units).
  defp timed(fun) do
    t0 = System.monotonic_time()
    value = fun.()
    elapsed = System.monotonic_time() - t0
    {elapsed, value}
  end

  # Encodes one batch of columns as an append request and sends it.
  # The encode duration covers both chunk construction and message encoding.
  defp append_column_batch(table, columns, options, state) do
    started_at = System.monotonic_time()

    case DataChunk.from_columns(columns, options) do
      {:ok, chunk} ->
        encoded =
          Codec.encode(
            %AppendRequest{
              schema_name: Keyword.get(options, :schema, ""),
              table_name: to_string(table),
              append_chunk: chunk
            },
            connection_id: state.connection_id,
            client_query_id: Keyword.get(options, :client_query_id)
          )

        append_encoded(encoded, options, state, System.monotonic_time() - started_at)

      failure ->
        failure
    end
  end

  # Builds the :insert result for an append. Accepts either the appended rows (a list,
  # whose length is used) or a precomputed row count.
  defp append_result(rows_or_count, state) do
    row_count = if is_list(rows_or_count), do: length(rows_or_count), else: rows_or_count

    %Result{
      command: :insert,
      columns: [],
      rows: nil,
      num_rows: row_count,
      connection_id: state.connection_id,
      messages: [],
      metadata: %{}
    }
  end

  # Interpolates params into the statement, then executes the formatted SQL.
  # A formatting error is returned with the state unchanged.
  defp execute_statement(%Query{} = query, params, options, state) do
    case QuackDB.SQL.format(query.statement, params) do
      {:ok, statement} -> do_execute_statement(query, statement, params, options, state)
      {:error, error} -> {:error, error, state}
    end
  end

  # Runs a formatted statement inside a :query telemetry span. The closure returns the
  # callback result together with the stop metadata derived from the collected metrics.
  defp do_execute_statement(%Query{} = query, statement, params, options, state) do
    {query_id, state} = next_query_id(state)
    options = Keyword.put(options, :client_query_id, query_id)
    span_metadata = query_metadata(query, params, options, state)

    run = fn ->
      {encode_duration, request} =
        timed(fn ->
          Codec.encode(%PrepareRequest{sql_query: statement},
            connection_id: state.connection_id,
            client_query_id: query_id
          )
        end)

      initial_metrics = query_metrics(encode_duration, request)

      {result, metrics} =
        execute_query_request(query, request, options, state, initial_metrics)

      {result, query_stop_metadata(result, metrics)}
    end

    Telemetry.span(state.telemetry_prefix, :query, span_metadata, run)
  end

  # Sends the encoded query and hands the raw response to the decode step.
  # Returns `{callback_result, metrics}`; a transport failure marks the status as failed.
  defp execute_query_request(query, request, options, state, metrics) do
    {transport_duration, sent} =
      timed(fn -> state.transport.(state.uri, request, options) end)

    with {:ok, response} <- sent do
      with_transport = put_transport_metrics(metrics, transport_duration, response)
      decode_query_response(query, response, options, state, with_transport)
    else
      {:error, error} ->
        annotated = annotate_error(error, query, state)
        failed = %{state | status: failed_status(state.status)}
        {{:error, annotated, failed}, Map.put(metrics, :transport_duration, transport_duration)}
    end
  end

  # Decodes the raw response, records the decode duration and hands the message to the
  # normalize step. A decode failure marks the status as failed.
  defp decode_query_response(query, response, options, state, metrics) do
    {decode_duration, decoded} = timed(fn -> Codec.decode(response) end)
    timed_metrics = Map.put(metrics, :decode_duration, decode_duration)

    with {:ok, message} <- decoded do
      normalize_query_result(query, message, options, state, timed_metrics)
    else
      {:error, error} ->
        annotated = annotate_error(error, query, state)
        failed = %{state | status: failed_status(state.status)}
        {{:error, annotated, failed}, timed_metrics}
    end
  end

  # Normalizes the decoded message into the final callback result, records how long that
  # took, and updates the status according to the outcome.
  defp normalize_query_result(query, decoded, options, state, metrics) do
    {normalize_duration, outcome} =
      timed(fn -> normalize_query_response(decoded, query, state, options) end)

    reply =
      case outcome do
        {:ok, normalized_query, result} ->
          {:ok, normalized_query, result, %{state | status: successful_status(state.status)}}

        {:error, error} ->
          {:error, annotate_error(error, query, state),
           %{state | status: failed_status(state.status)}}
      end

    {reply, Map.put(metrics, :normalize_duration, normalize_duration)}
  end

  # Turns a decoded prepare reply into `{:ok, query, result}` or an error tuple.
  defp normalize_query_response(
         {_header, %ErrorResponse{} = failure},
         _query,
         _state,
         _options
       ) do
    {:error, Error.new(:server_error, failure.message, source: :server)}
  end

  defp normalize_query_response({_header, %PrepareResponse{} = response}, query, state, options) do
    # All remaining chunks are fetched eagerly so the result holds every row.
    case fetch_remaining_chunks(response, state, options) do
      {:ok, fetched} ->
        names = response.result_names
        rows = materialize_rows(response.results ++ fetched, names)

        updated = %{
          query
          | columns: names,
            result_types: response.result_types,
            result_uuid: response.result_uuid
        }

        # The metadata reports `needs_more_fetch` exactly as the prepare reply stated it.
        result =
          Result.normalize(%Result{
            command: command(updated.statement),
            columns: names,
            rows: rows,
            num_rows: length(rows),
            connection_id: state.connection_id,
            messages: [],
            metadata: %{
              needs_more_fetch: response.needs_more_fetch,
              result_uuid: response.result_uuid
            }
          })

        {:ok, updated, result}

      failure ->
        failure
    end
  end

  defp normalize_query_response({header, _body}, _query, _state, _options) do
    message = "expected prepare response, got #{header.type}"
    {:error, Error.new(:unexpected_message, message, source: :protocol)}
  end

  # Fetches the chunks not included in the prepare reply; nothing is requested when the
  # reply says no further fetch is needed.
  defp fetch_remaining_chunks(%PrepareResponse{} = response, state, options) do
    case response.needs_more_fetch do
      false -> {:ok, []}
      _more -> fetch_chunks(response.result_uuid, state, options, [])
    end
  end

  # Performs one fetch round trip inside a :fetch telemetry span. `chunks` is the
  # accumulator of chunks gathered so far, held in reverse order.
  defp fetch_chunks(result_uuid, state, options, chunks) do
    span_metadata = fetch_metadata(result_uuid, options, state)

    Telemetry.span(state.telemetry_prefix, :fetch, span_metadata, fn ->
      {encode_duration, request} =
        timed(fn ->
          Codec.encode(%FetchRequest{uuid: result_uuid}, connection_id: state.connection_id)
        end)

      initial_metrics = query_metrics(encode_duration, request)

      {result, metrics} =
        execute_fetch_request(result_uuid, request, options, state, chunks, initial_metrics)

      {result, fetch_stop_metadata(result, metrics)}
    end)
  end

  # Sends the encoded fetch request and hands the raw response to the decode step.
  # Returns `{result, metrics}`.
  defp execute_fetch_request(result_uuid, request, options, state, chunks, metrics) do
    {transport_duration, sent} =
      timed(fn -> state.transport.(state.uri, request, options) end)

    with {:ok, response} <- sent do
      with_transport = put_transport_metrics(metrics, transport_duration, response)
      decode_fetch_response(result_uuid, response, options, state, chunks, with_transport)
    else
      {:error, error} ->
        {{:error, error}, Map.put(metrics, :transport_duration, transport_duration)}
    end
  end

  # Decodes a fetch response and normalizes it, recording both durations in the metrics.
  defp decode_fetch_response(result_uuid, response, options, state, chunks, metrics) do
    {decode_duration, decoded} = timed(fn -> Codec.decode(response) end)
    timed_metrics = Map.put(metrics, :decode_duration, decode_duration)

    with {:ok, message} <- decoded do
      {normalize_duration, result} =
        timed(fn -> normalize_fetch_response(message, result_uuid, state, options, chunks) end)

      {result, Map.put(timed_metrics, :normalize_duration, normalize_duration)}
    else
      {:error, error} -> {{:error, error}, timed_metrics}
    end
  end

  # Interprets one decoded fetch reply while draining a result set.
  #
  # An empty fetch response ends the loop and returns the accumulated chunks in arrival
  # order; a non-empty one is pushed onto the reversed accumulator and another fetch is
  # issued.
  defp normalize_fetch_response({header, body}, result_uuid, state, options, chunks) do
    case body do
      %FetchResponse{results: []} ->
        {:ok, Enum.reverse(chunks)}

      %FetchResponse{results: results} ->
        fetch_chunks(result_uuid, state, options, Enum.reverse(results, chunks))

      %ErrorResponse{message: message} ->
        {:error, Error.new(:server_error, message, source: :server)}

      _unexpected ->
        message = "expected fetch response, got #{header.type}"
        {:error, Error.new(:unexpected_message, message, source: :protocol)}
    end
  end

  # Serves a row-mode fetch: hands out up to `:max_rows` (default 500) queued rows, halts
  # once the cursor is exhausted, or requests more rows from the server when the queue is
  # empty but the cursor is not done.
  defp handle_fetch_rows(cursor_state, cursor, options, state) do
    case cursor_state.queued_rows do
      [] ->
        if cursor_state.done? do
          {:halt, cursor_result(cursor, []), state}
        else
          fetch_more_cursor_rows(cursor_state, cursor, options, state)
        end

      _queued ->
        limit = Keyword.get(options, :max_rows, 500)
        {rows, drained} = take_cursor_rows(cursor_state, limit)
        status = if drained.done? and drained.queued_rows == [], do: :halt, else: :cont
        updated = put_cursor_state(state, cursor.ref, drained)
        {status, cursor_result(cursor, rows), updated}
    end
  end

  # Requests the next batch for a cursor whose local queue is empty, stores the refreshed
  # cursor state and re-enters handle_fetch/4 to serve from it. Used by both the row and
  # the columnar fetch paths.
  #
  # On failure the error is annotated with the cursor's statement and connection id, and
  # the connection status is marked as failed — matching the declare, execute and append
  # error paths so that handle_status/2 reflects a fetch that failed inside a transaction.
  defp fetch_more_cursor_rows(cursor_state, cursor, options, state) do
    with {:ok, cursor_state} <- fetch_cursor_state(cursor_state, cursor, options, state) do
      state = put_cursor_state(state, cursor.ref, cursor_state)
      handle_fetch(nil, cursor, options, state)
    else
      {:error, error} ->
        {:error, annotate_cursor_error(error, cursor),
         %{state | status: failed_status(state.status)}}
    end
  end

  # Serves a columnar fetch: hands out queued chunks covering up to `:max_rows` (default
  # 500) rows, halts once the cursor is exhausted, or requests more chunks from the server
  # when the queue is empty but the cursor is not done.
  defp handle_fetch_columnar(cursor_state, cursor, options, state) do
    case cursor_state.queued_chunks do
      [] ->
        if cursor_state.done? do
          {:halt, cursor_result(cursor, nil, []), state}
        else
          fetch_more_cursor_rows(cursor_state, cursor, options, state)
        end

      _queued ->
        limit = Keyword.get(options, :max_rows, 500)
        {chunks, drained} = take_cursor_chunks(cursor_state, limit)
        status = if drained.done? and drained.queued_chunks == [], do: :halt, else: :cont
        updated = put_cursor_state(state, cursor.ref, drained)
        {status, cursor_result(cursor, nil, chunks), updated}
    end
  end

  # Performs one fetch round trip for a cursor and folds the reply into its state.
  # A transport or decode failure is returned as-is.
  defp fetch_cursor_state(cursor_state, cursor, options, state) do
    fetch = %FetchRequest{uuid: cursor.result_uuid}
    encoded = Codec.encode(fetch, connection_id: state.connection_id)

    case state.transport.(state.uri, encoded, options) do
      {:ok, payload} ->
        with {:ok, message} <- Codec.decode(payload) do
          update_cursor_from_fetch(message, cursor_state, cursor)
        end

      failure ->
        failure
    end
  end

  # Applies one decoded fetch reply to a cursor's state.
  #
  # An empty fetch response marks the cursor as done. A non-empty one is appended to the
  # queue, either as raw chunks (columnar mode) or materialized into rows.
  defp update_cursor_from_fetch({header, body}, cursor_state, cursor) do
    case body do
      %FetchResponse{results: []} ->
        {:ok, %{cursor_state | done?: true}}

      %FetchResponse{results: chunks} ->
        case cursor_state do
          %{mode: :columnar} ->
            {:ok, %{cursor_state | queued_chunks: cursor_state.queued_chunks ++ chunks}}

          _rows ->
            rows = materialize_rows(chunks, cursor.columns)
            {:ok, %{cursor_state | queued_rows: cursor_state.queued_rows ++ rows}}
        end

      %ErrorResponse{message: message} ->
        {:error, Error.new(:server_error, message, source: :server)}

      _unexpected ->
        message = "expected fetch response, got #{header.type}"
        {:error, Error.new(:unexpected_message, message, source: :protocol)}
    end
  end

  # Stores (or replaces) the bookkeeping entry for the cursor identified by `ref`.
  defp put_cursor_state(state, ref, cursor_state) do
    cursors = Map.put(state.cursors, ref, cursor_state)
    %{state | cursors: cursors}
  end

  # Attaches the query's statement and the connection id to an error.
  defp annotate_error(%Error{} = error, %Query{statement: statement}, state) do
    %{error | query: statement, connection_id: state.connection_id}
  end

  # Returns the current query id together with the state advanced to the next one.
  defp next_query_id(state) do
    current = state.next_query_id
    {current, %{state | next_query_id: current + 1}}
  end

  # Attaches the statement and connection id recorded on the cursor to an error.
  defp annotate_cursor_error(%Error{} = error, %QuackDB.Cursor{} = cursor) do
    %{error | query: cursor.statement, connection_id: cursor.connection_id}
  end

  # Concatenates the rows of every chunk, in chunk order.
  defp materialize_rows(chunks, columns) do
    for chunk <- chunks, row <- DataChunk.rows(chunk, columns), do: row
  end

  # Executes a transaction control statement. On success the result is tagged with
  # `command` and the status becomes `next_status`; any failure requests a disconnect.
  defp transaction_statement(statement, command, next_status, state) do
    control = %Query{statement: statement}

    with {:ok, _query, result, state} <- execute_statement(control, [], [], state) do
      {:ok, %{result | command: command}, %{state | status: next_status}}
    else
      {:error, error, state} -> {:disconnect, error, state}
    end
  end

  # Builds the initial, not yet connected state from the connect options.
  # Whatever `QuackDB.URI.normalize/1` returns on failure is passed through.
  defp build_state(options) do
    raw_uri = Keyword.get(options, :uri, "http://localhost:9494")

    case QuackDB.URI.normalize(raw_uri) do
      {:ok, uri} ->
        state = %__MODULE__{
          uri: uri,
          token: Keyword.get(options, :token, ""),
          transport: Keyword.get(options, :transport),
          transport_owner: nil,
          transport_options: transport_options(options),
          client_version: Keyword.get(options, :client_version, client_version()),
          telemetry_prefix: Keyword.get(options, :telemetry_prefix, Telemetry.default_prefix())
        }

        {:ok, state}

      failure ->
        failure
    end
  end

  # Starts the transport (when none was supplied) and performs the connection handshake.
  #
  # Returns `{:ok, state}` with the connection id set, or the failing step's result. If
  # the handshake fails after this function started a transport owner, that process is
  # stopped before returning: the state holding its pid is discarded on failure, so it
  # would otherwise stay alive, and every further connect attempt would start another.
  defp connect_quack(state) do
    with {:ok, state} <- start_transport(state) do
      case do_connect_quack(state) do
        {:ok, _connected} = ok ->
          ok

        failure ->
          owner = state.transport_owner

          # `owner` is nil for a caller-supplied transport, which is not ours to stop.
          if owner do
            try do
              GenServer.stop(owner, :normal, @disconnect_timeout)
            catch
              # GenServer.stop/3 exits the caller if the process is gone or does not
              # terminate in time. Unlink before killing so the kill cannot propagate.
              :exit, _reason ->
                Process.unlink(owner)
                Process.exit(owner, :kill)
            end
          end

          failure
      end
    end
  end

  # Sends the connection request and folds the reply into the state.
  # A transport or decode failure is returned as-is.
  defp do_connect_quack(state) do
    handshake = %ConnectionRequest{
      auth_string: state.token,
      client_duckdb_version: state.client_version,
      client_platform: client_platform()
    }

    encoded = Codec.encode(handshake)

    with {:ok, payload} <- state.transport.(state.uri, encoded, []),
         {:ok, message} <- Codec.decode(payload) do
      normalize_connect_response(message, state)
    end
  end

  # No transport supplied: start the default transport process and call it through a
  # three-argument function that closes over its pid.
  defp start_transport(%{transport: nil, uri: uri} = state) do
    with {:ok, owner} <- QuackDB.Transport.start_link(uri, state.transport_options) do
      post = fn target, request, options ->
        QuackDB.Transport.post(owner, target, request, options)
      end

      {:ok, %{state | transport: post, transport_owner: owner}}
    else
      {:error, reason} ->
        {:error, Error.new(:transport_error, inspect(reason), source: :transport)}
    end
  end

  # A caller-supplied three-argument transport function is used as-is.
  defp start_transport(%{transport: transport} = state) when is_function(transport, 3) do
    {:ok, state}
  end

  # Keeps only the start options that are forwarded to the transport.
  defp transport_options(options) do
    forwarded_keys = [:connect_timeout, :receive_timeout, :shutdown_timeout, :mint_options]
    Keyword.take(options, forwarded_keys)
  end

  # Turns the decoded `{header, body}` handshake reply into a connect result.
  #
  # A ConnectionResponse stores the header's connection id and the response in the
  # state. An ErrorResponse becomes a `:server_error`. Any other body becomes an
  # `:unexpected_message` error naming the header type.
  defp normalize_connect_response({header, body}, state) do
    case body do
      %ConnectionResponse{} = response ->
        {:ok, %{state | connection_id: header.connection_id, server: response}}

      %ErrorResponse{message: message} ->
        {:error, Error.new(:server_error, message, source: :server)}

      _other ->
        description = "expected connection response, got #{header.type}"
        {:error, Error.new(:unexpected_message, description, source: :protocol)}
    end
  end

  # Removes up to `max_rows` rows from the front of the cursor's queued rows.
  # Returns `{taken_rows, cursor_with_the_rest_still_queued}`.
  defp take_cursor_rows(cursor, max_rows) do
    {batch, leftover} = Enum.split(cursor.queued_rows, max_rows)
    updated_cursor = %{cursor | queued_rows: leftover}
    {batch, updated_cursor}
  end

  # Removes chunks worth up to `max_rows` rows from the cursor's queued chunks.
  #
  # Returns `{taken_chunks, cursor}` where the cursor keeps the leftover chunks and has
  # `:row_count` set to the number of rows just taken. The keys are written with map
  # put semantics, so they are added when the cursor does not have them yet.
  defp take_cursor_chunks(cursor, max_rows) do
    {taken, leftover, taken_rows} = take_chunks(cursor.queued_chunks, max_rows, [], 0)
    updated_cursor = Map.merge(cursor, %{queued_chunks: leftover, row_count: taken_rows})
    {taken, updated_cursor}
  end

  # Takes whole chunks from the front of the queue until `remaining_rows` is used up,
  # splitting the last chunk when it holds more rows than are still wanted.
  #
  # Returns `{selected_chunks_in_queue_order, leftover_chunks, selected_row_count}`.
  # `selected` is accumulated in reverse and flipped once on exit.
  defp take_chunks([], _remaining_rows, selected, row_count) do
    {Enum.reverse(selected), [], row_count}
  end

  defp take_chunks(chunks, remaining_rows, selected, row_count) when remaining_rows <= 0 do
    {Enum.reverse(selected), chunks, row_count}
  end

  defp take_chunks([chunk | rest], remaining_rows, selected, row_count) do
    if chunk.row_count > remaining_rows do
      # Only the first `remaining_rows` rows of this chunk fit; the tail stays queued.
      {head, tail} = split_chunk(chunk, remaining_rows)
      {Enum.reverse([head | selected]), [tail | rest], row_count + remaining_rows}
    else
      take_chunks(
        rest,
        remaining_rows - chunk.row_count,
        [chunk | selected],
        row_count + chunk.row_count
      )
    end
  end

  # Splits a columnar chunk after `row_count` rows.
  #
  # Every column's `values` are split at the same position, so the two returned chunks
  # keep the column order of the input. Returns `{first_rows_chunk, remaining_chunk}`.
  defp split_chunk(chunk, row_count) do
    {head_columns, tail_columns} =
      chunk.columns
      |> Enum.map(fn column ->
        {head_values, tail_values} = Enum.split(column.values, row_count)
        {%{column | values: head_values}, %{column | values: tail_values}}
      end)
      |> Enum.unzip()

    head = %{chunk | row_count: row_count, columns: head_columns}
    tail = %{chunk | row_count: chunk.row_count - row_count, columns: tail_columns}

    {head, tail}
  end

  # Builds the `:fetch` result for a batch of materialized rows taken from `cursor`.
  defp cursor_result(cursor, rows) do
    columns = cursor.columns
    row_total = length(rows)

    %Result{
      command: :fetch,
      connection_id: cursor.connection_id,
      columns: columns,
      rows: rows,
      num_rows: row_total,
      messages: []
    }
  end

  # Builds the `:fetch` result for a columnar batch: `rows` stays nil, the chunks are
  # exposed under `metadata.columnar_chunks`, and `num_rows` is the sum of the chunks'
  # row counts.
  defp cursor_result(cursor, nil, chunks) do
    columns = cursor.columns
    row_total = chunks |> Enum.map(& &1.row_count) |> Enum.sum()

    %Result{
      command: :fetch,
      connection_id: cursor.connection_id,
      columns: columns,
      rows: nil,
      num_rows: row_total,
      messages: [],
      metadata: %{columnar_chunks: chunks}
    }
  end

  # Builds a result for `command` that carries no columns, no rows and no messages.
  defp empty_result(command) do
    %Result{
      command: command,
      columns: nil,
      rows: nil,
      num_rows: 0,
      messages: []
    }
  end

  # Telemetry metadata for a query: the shared fields, the statement text, and the
  # params when maybe_put_params/3 decides to include them.
  defp query_metadata(query, params, options, state) do
    base = metadata_from_options(options, state)
    with_statement = Map.put(base, :query, query.statement)
    maybe_put_params(with_statement, params, options)
  end

  # Telemetry metadata for an append: the shared fields plus statement, table, schema,
  # batch size, number of batches and number of rows.
  #
  # The third argument is either the list of values (its length becomes the row count)
  # or an already known row count.
  defp append_metadata(query, table, values, batches, options, state) when is_list(values) do
    row_count = length(values)
    append_metadata(query, table, row_count, batches, options, state)
  end

  defp append_metadata(query, table, row_count, batches, options, state) do
    base = metadata_from_options(options, state)

    append_fields = %{
      query: query.statement,
      table: to_string(table),
      schema: Keyword.get(options, :schema, ""),
      batch_size: Keyword.get(options, :batch_size),
      batches: length(batches),
      rows: row_count
    }

    Map.merge(base, append_fields)
  end

  # Telemetry metadata for a fetch: the shared fields plus the result UUID.
  defp fetch_metadata(result_uuid, options, state) do
    base = metadata_from_options(options, state)
    Map.put(base, :result_uuid, result_uuid)
  end

  # Telemetry fields shared by every event: the connection id from the state, the
  # caller's `:client_query_id` (nil when absent) and `:telemetry_options` (default []).
  defp metadata_from_options(options, state) do
    connection_id = state.connection_id
    client_query_id = Keyword.get(options, :client_query_id)
    telemetry_options = Keyword.get(options, :telemetry_options, [])

    %{
      connection_id: connection_id,
      client_query_id: client_query_id,
      options: telemetry_options
    }
  end

  # Adds `:params` to the metadata only when the `:telemetry_params` option is truthy;
  # the option defaults to false.
  defp maybe_put_params(metadata, params, options) do
    case Keyword.get(options, :telemetry_params, false) do
      disabled when disabled in [nil, false] -> metadata
      _enabled -> Map.put(metadata, :params, params)
    end
  end

  # Stop-event fields derived from an execution outcome: command and row count on
  # success, the error on failure, each tagged with `:result`.
  defp result_stop_metadata({:ok, _query, %Result{command: command, num_rows: rows}, _state}) do
    %{result: :ok, command: command, rows: rows}
  end

  defp result_stop_metadata({:error, %Error{} = reason, _state}) do
    %{result: :error, error: reason}
  end

  # Stop metadata for a query: the outcome fields merged with `metrics`; on key
  # clashes the value from `metrics` wins.
  defp query_stop_metadata(result, metrics) do
    outcome_fields = result_stop_metadata(result)
    Map.merge(outcome_fields, metrics)
  end

  # Stop metadata for an append: the outcome fields merged with `metrics`, the append
  # duration, and the throughput when maybe_put_rows_per_second/3 adds it.
  defp append_stop_metadata(result, metrics, duration) do
    outcome_fields = result_stop_metadata(result)

    timed_fields =
      outcome_fields
      |> Map.merge(metrics)
      |> Map.put(:append_duration, duration)

    maybe_put_rows_per_second(timed_fields, result, duration)
  end

  # Adds `:rows_per_second` to the stop metadata of a successful append.
  #
  # `duration` is interpreted in `:native` time units. The throughput is added only for
  # an `{:ok, _, %Result{}, _}` outcome with a non-negative integer `num_rows` and a
  # positive `duration`; in every other case the metadata is returned untouched.
  defp maybe_put_rows_per_second(
         metadata,
         {:ok, _query, %Result{num_rows: rows}, _state},
         duration
       )
       when is_integer(rows) and rows >= 0 and duration > 0 do
    # Divide by the number of native units per second in floating point. Converting to
    # whole microseconds first truncates a positive sub-microsecond duration to 0, and
    # the following `rows / 0.0` raises ArithmeticError.
    seconds = duration / System.convert_time_unit(1, :second, :native)
    Map.put(metadata, :rows_per_second, rows / seconds)
  end

  defp maybe_put_rows_per_second(metadata, _result, _duration), do: metadata

  # Stop metadata for a fetch: the chunk count on success or the error on failure,
  # merged with `metrics`; on key clashes the value from `metrics` wins.
  defp fetch_stop_metadata({:ok, chunks}, metrics) do
    Map.merge(%{chunks: length(chunks), result: :ok}, metrics)
  end

  defp fetch_stop_metadata({:error, %Error{} = reason}, metrics) do
    Map.merge(%{error: reason, result: :error}, metrics)
  end

  # Classifies a SQL statement (iodata) by its leading keyword.
  #
  # Returns the atom that sql_command_atom/1 maps the lowercased keyword to.
  defp command(statement) do
    first_word =
      statement
      |> IO.iodata_to_binary()
      |> first_sql_word()

    # first_sql_word/1 only stops at whitespace, so the token can carry punctuation
    # ("COMMIT;", "SELECT(1)", "select*"). Classify on the leading keyword characters
    # only; the pattern can match the empty string, so it always matches.
    [keyword] = Regex.run(~r/\A[A-Za-z_]*/, first_word)

    keyword
    |> String.downcase()
    |> sql_command_atom()
  end

  # Maps a lowercased keyword to its command atom: first through the known commands,
  # then through the aliases, and finally to `:unknown`.
  defp sql_command_atom(word) do
    case Map.get(@sql_command_map, word) do
      missing when missing in [nil, false] -> Map.get(@sql_command_aliases, word, :unknown)
      command -> command
    end
  end

  # Returns the first whitespace-delimited token of the statement, ignoring leading
  # whitespace.
  defp first_sql_word(statement) do
    trimmed = skip_leading_whitespace(statement)
    take_until_whitespace(trimmed)
  end

  # Drops leading space, tab, newline, carriage return and form feed bytes. Anything
  # that does not start with one of them is returned unchanged.
  defp skip_leading_whitespace(<<?\s, rest::binary>>), do: skip_leading_whitespace(rest)
  defp skip_leading_whitespace(<<?\t, rest::binary>>), do: skip_leading_whitespace(rest)
  defp skip_leading_whitespace(<<?\n, rest::binary>>), do: skip_leading_whitespace(rest)
  defp skip_leading_whitespace(<<?\r, rest::binary>>), do: skip_leading_whitespace(rest)
  defp skip_leading_whitespace(<<?\f, rest::binary>>), do: skip_leading_whitespace(rest)
  defp skip_leading_whitespace(rest), do: rest

  # Returns the leading bytes of the binary up to, but not including, the first space,
  # tab, newline, carriage return or form feed (or the whole binary when none occurs).
  defp take_until_whitespace(statement) do
    take_until_whitespace(statement, [])
  end

  # `acc` collects the consumed bytes in reverse order.
  defp take_until_whitespace(<<char, rest::binary>>, acc)
       when char not in [?\s, ?\t, ?\n, ?\r, ?\f] do
    take_until_whitespace(rest, [char | acc])
  end

  # Reached the end of the input or a whitespace byte: emit what was collected.
  defp take_until_whitespace(remaining, acc) when is_binary(remaining) do
    IO.iodata_to_binary(Enum.reverse(acc))
  end

  # Status after a statement succeeded: the status is kept as it is, so an `:error`
  # status stays `:error`.
  defp successful_status(status), do: status

  # Status after a statement failed: `:transaction` becomes `:error`, every other
  # status is kept.
  defp failed_status(status) do
    if status == :transaction, do: :error, else: status
  end

  # Default client version string: "quackdb/<vsn>" from the :quackdb application spec,
  # or "quackdb/dev" when the spec has no version.
  defp client_version do
    suffix =
      case Application.spec(:quackdb, :vsn) do
        nil -> "dev"
        vsn -> to_string(vsn)
      end

    "quackdb/" <> suffix
  end

  # The runtime's system architecture, converted from a charlist to a string.
  defp client_platform do
    architecture = :erlang.system_info(:system_architecture)
    List.to_string(architecture)
  end
end