# lib/quack_db/protocol/vector.ex

defmodule QuackDB.Protocol.Vector do
  @moduledoc """
  DuckDB vector encoding and decoding helpers.
  """

  import Bitwise

  alias QuackDB.Error
  alias QuackDB.Protocol.LogicalType
  alias QuackDB.Protocol.Reader
  alias QuackDB.Protocol.Value
  alias QuackDB.Protocol.Writer

  defstruct [:type, vector_type: :flat, values: []]

  @type t :: %__MODULE__{type: LogicalType.t(), vector_type: atom(), values: [term()]}

  @doc """
  Encodes `values` as a flat vector of logical type `type`.

  The list must contain exactly `row_count` values; otherwise an
  `:invalid_vector_size` error is raised. The result is the flat vector body
  followed by the object terminator from `Writer.end_object/0`.
  """
  @spec encode(LogicalType.t(), [term()], non_neg_integer()) :: iodata()
  def encode(type, values, row_count) when is_list(values) do
    case length(values) do
      actual when actual == row_count ->
        [encode_flat(type, values, row_count), Writer.end_object()]

      actual ->
        raise Error.new(
                :invalid_vector_size,
                "vector has #{actual} values, expected #{row_count}",
                source: :protocol
              )
    end
  end

  @doc """
  Decodes one serialized vector of logical type `type` holding `row_count` rows.

  Decoding starts from an empty vector struct (whose `vector_type` defaults to
  `:flat`) and is driven by the field ids found in `binary`.
  """
  @spec decode(binary(), LogicalType.t(), non_neg_integer()) :: Reader.read_result(t())
  def decode(binary, type, row_count) do
    decode_object(binary, type, row_count, %__MODULE__{type: type})
  end

  # Assembles the body of a flat vector: optional geometry prefix, validity
  # information, then the value payload.
  defp encode_flat(type, values, row_count) do
    validity_fields = validity_field(values, row_count)
    geometry_prefix = maybe_geometry_version(type)
    payload = encode_values(type, values, row_count)

    [geometry_prefix, validity_fields, payload]
  end

  # Geometry vectors are prefixed with field 99 carrying the ULEB128 value 1;
  # every other logical type contributes nothing.
  # NOTE(review): the name suggests field 99 is a geometry format version — confirm.
  defp maybe_geometry_version(%LogicalType{name: :geometry}),
    do: Writer.field(99, Writer.uleb128(1))

  defp maybe_geometry_version(_type), do: []

  # Field 100 flags whether any value is nil. When it is true, field 101
  # follows with the validity bitmask as a blob.
  defp validity_field(values, row_count) do
    has_nulls? = Enum.any?(values, &is_nil/1)
    flag = Writer.field(100, Writer.bool(has_nulls?))

    if has_nulls? do
      [flag, Writer.field(101, Writer.blob(validity_mask(values, row_count)))]
    else
      flag
    end
  end

  # Writes the value payload. Fixed-size physical types become a single blob in
  # field 102 whose byte size must equal fixed_size * row_count; everything else
  # is delegated to encode_variable_values/4.
  defp encode_values(type, values, row_count) do
    physical_type = LogicalType.physical_type(type)

    if LogicalType.fixed_size?(physical_type) do
      payload = encode_fixed_blob(type, physical_type, values)
      expected_size = LogicalType.fixed_size(physical_type) * row_count
      actual_size = byte_size(payload)

      if actual_size != expected_size do
        raise Error.new(
                :invalid_vector_size,
                "encoded vector has #{actual_size} bytes, expected #{expected_size}",
                source: :protocol
              )
      end

      Writer.field(102, Writer.blob(payload))
    else
      encode_variable_values(type, values, physical_type, row_count)
    end
  end

  # Concatenates the fixed-width encoding of every value into one binary.
  #
  # The first clause covers int64 columns whose logical type needs no value
  # conversion (not a decimal or temporal type): when no value is nil the
  # integers are written directly, otherwise each value goes through
  # encode_fixed_value/3 so that nil rows are zero-filled.
  defp encode_fixed_blob(%LogicalType{name: name} = type, :int64, values)
       when name not in [
              :decimal,
              :time,
              :time_ns,
              :time_tz,
              :timestamp,
              :timestamp_ms,
              :timestamp_ns,
              :timestamp_sec,
              :timestamp_tz
            ] do
    if nil in values do
      for value <- values, into: <<>>, do: encode_fixed_value(type, :int64, value)
    else
      for value <- values, into: <<>>, do: <<value::little-signed-64>>
    end
  end

  defp encode_fixed_blob(type, physical_type, values) do
    for value <- values, into: <<>>, do: encode_fixed_value(type, physical_type, value)
  end

  # VARCHAR-backed types: field 102 holds a list with one blob per row.
  defp encode_variable_values(type, values, :varchar, row_count) do
    encode_item = fn value -> Writer.blob(encode_string_like(type, value)) end

    values
    |> Writer.list(row_count, encode_item)
    |> then(&Writer.field(102, &1))
  end

  # STRUCT: field 103 holds one nested vector per child field, each built from
  # that field's value in every row.
  defp encode_variable_values(type, values, :struct, _row_count) do
    encode_child = fn child ->
      column = for value <- values, do: struct_child_value(value, child.name)
      encode(child.type, column, length(column))
    end

    type
    |> LogicalType.struct_children()
    |> Writer.list(encode_child)
    |> then(&Writer.field(103, &1))
  end

  # MAP: Elixir maps are first turned into lists of %{key: _, value: _} entries
  # (nil rows and values that are already lists are kept as-is), then written
  # with the list layout: field 104 child count, field 105 per-row entries,
  # field 106 the flattened child vector.
  defp encode_variable_values(%LogicalType{name: :map} = type, values, :list, _row_count) do
    entry_type = LogicalType.child_type(type)

    rows =
      for value <- values do
        if is_map(value), do: map_to_entries(value), else: value
      end

    {list_entries, flattened} = list_entries_and_child_values(rows)
    total = length(flattened)

    size_field = Writer.field(104, Writer.uleb128(total))
    entries_field = Writer.field(105, Writer.list(list_entries, &encode_list_entry/1))
    child_field = Writer.field(106, encode(entry_type, flattened, total))

    [size_field, entries_field, child_field]
  end

  # LIST: field 104 is the total number of child values, field 105 the per-row
  # offset/length entries, field 106 the flattened child vector.
  defp encode_variable_values(type, values, :list, _row_count) do
    element_type = LogicalType.child_type(type)
    {list_entries, flattened} = list_entries_and_child_values(values)
    total = length(flattened)

    size_field = Writer.field(104, Writer.uleb128(total))
    entries_field = Writer.field(105, Writer.list(list_entries, &encode_list_entry/1))
    child_field = Writer.field(106, encode(element_type, flattened, total))

    [size_field, entries_field, child_field]
  end

  # ARRAY: every row contributes exactly array_size child values (nil rows are
  # expanded to array_size nils). Field 103 is the array size, field 104 the
  # flattened child vector.
  defp encode_variable_values(type, values, :array, _row_count) do
    element_type = LogicalType.child_type(type)
    array_size = LogicalType.array_size(type)

    flattened =
      Enum.flat_map(values, fn
        nil ->
          List.duplicate(nil, array_size)

        row when is_list(row) ->
          if length(row) != array_size, do: invalid_array_value!(row, array_size)
          row

        other ->
          invalid_array_value!(other, array_size)
      end)

    [
      Writer.field(103, Writer.uleb128(array_size)),
      Writer.field(104, encode(element_type, flattened, length(flattened)))
    ]
  end

  # Fallback for physical types with no encoder: raises :unsupported_physical_type.
  defp encode_variable_values(_type, _values, physical_type, _row_count) do
    raise Error.new(:unsupported_physical_type, "#{physical_type} vectors are not encodable yet",
            source: :protocol
          )
  end

  # Turns an Elixir map into a list of %{key: k, value: v} entry maps.
  defp map_to_entries(value) do
    for {key, entry_value} <- value, do: %{key: key, value: entry_value}
  end

  # Raises :invalid_array_value for a row that is not a list of exactly
  # `array_size` elements. Never returns.
  defp invalid_array_value!(value, array_size) do
    raise Error.new(
            :invalid_array_value,
            "ARRAY values must be lists of #{array_size} elements, got #{inspect(value)}",
            source: :protocol
          )
  end

  # Reads the next field id of a serialized vector object and dispatches on it:
  #   * the end marker finishes the object and returns the vector built so far;
  #   * field 90 carries the vector type id, after which decoding continues;
  #   * any other field belongs to the body, so the untouched input (field id
  #     included) is handed to decode_body/4 for the current vector type.
  defp decode_object(binary, type, row_count, vector) do
    case Reader.read_field_id(binary) do
      {:ok, field_id, rest} ->
        end_marker = QuackDB.Protocol.field_end()

        cond do
          field_id == end_marker ->
            {:ok, vector, rest}

          field_id == 90 ->
            with {:ok, type_id, rest} <- Reader.read_uleb128(rest),
                 {:ok, kind} <- vector_type(type_id) do
              decode_object(rest, type, row_count, %{vector | vector_type: kind})
            end

          true ->
            decode_body(binary, type, row_count, vector.vector_type)
        end

      other ->
        other
    end
  end

  # Flat vectors: decode the body into a fresh vector struct for `type`.
  defp decode_body(binary, type, row_count, :flat) do
    decode_flat(binary, type, row_count, %__MODULE__{type: type})
  end

  # Constant vectors are decoded as a flat vector of at most one row, whose
  # single value (nil when there is none) is then repeated row_count times.
  defp decode_body(binary, type, row_count, :constant) do
    case decode_body(binary, type, min(row_count, 1), :flat) do
      {:ok, vector, rest} ->
        repeated = List.duplicate(List.first(vector.values), row_count)
        {:ok, %{vector | vector_type: :constant, values: repeated}, rest}

      other ->
        other
    end
  end

  # Dictionary vectors: field 91 is the selection (one little-endian 32-bit
  # index per row, see read_selection/2), field 92 the number of dictionary
  # entries, followed by the dictionary itself as a nested vector object. The
  # row values are the dictionary values picked by the selection indexes.
  defp decode_body(binary, type, row_count, :dictionary) do
    with {:ok, selection, rest} <- read_required(binary, 91, &read_selection(&1, row_count)),
         {:ok, dictionary_count, rest} <- read_required(rest, 92, &Reader.read_uleb128/1),
         {:ok, dictionary, rest} <-
           decode_object(rest, type, dictionary_count, %__MODULE__{type: type}),
         {:ok, values} <- select_dictionary_values(dictionary.values, selection) do
      {:ok, %__MODULE__{type: type, vector_type: :dictionary, values: values}, rest}
    end
  end

  # Sequence vectors: field 91 is the start, field 92 the increment (both
  # SLEB128), then the object end marker. Row i decodes start + increment * i.
  defp decode_body(binary, type, row_count, :sequence) do
    with {:ok, first, rest} <- read_required(binary, 91, &Reader.read_sleb128/1),
         {:ok, step, rest} <- read_required(rest, 92, &Reader.read_sleb128/1),
         {:ok, terminator, rest} <- Reader.read_field_id(rest),
         :ok <- expect_vector_end(terminator) do
      values =
        Enum.map(0..(row_count - 1)//1, fn row_index ->
          Value.decode_sequence(type, first + step * row_index)
        end)

      {:ok, %__MODULE__{type: type, vector_type: :sequence, values: values}, rest}
    end
  end

  # Any other vector type (for example :fsst) is reported as unsupported.
  defp decode_body(_binary, _type, _row_count, vector_type) do
    error(:unsupported_vector_type, "#{vector_type} vectors are not implemented yet")
  end

  # Decodes a flat vector body, in this order: optional geometry prefix,
  # field 100 (has-validity flag), field 101 (validity mask, only when the flag
  # is true), the value payload, and finally the object end marker.
  # Any failing step short-circuits and is returned unchanged.
  defp decode_flat(binary, type, row_count, vector) do
    with {:ok, rest} <- maybe_skip_geometry_version(binary, type),
         {:ok, has_validity?, rest} <- read_required(rest, 100, &Reader.read_bool/1),
         {:ok, validity, rest} <- maybe_read_validity(rest, has_validity?, row_count),
         {:ok, values, rest} <- read_values(rest, type, row_count, validity),
         {:ok, field_end, rest} <- Reader.read_field_id(rest),
         :ok <- expect_vector_end(field_end) do
      {:ok, %{vector | values: values}, rest}
    end
  end

  # For geometry vectors, consumes field 99 and its ULEB128 payload when it is
  # the next field; the payload value is discarded. If the next field is
  # anything else, the input is returned untouched so the caller re-reads that
  # field id. Non-geometry types never consume anything.
  defp maybe_skip_geometry_version(binary, %{name: :geometry}) do
    case Reader.read_field_id(binary) do
      {:ok, 99, rest} ->
        with {:ok, _version, rest} <- Reader.read_uleb128(rest), do: {:ok, rest}

      {:ok, _field_id, _rest} ->
        {:ok, binary}

      {:error, _error} = error ->
        error
    end
  end

  defp maybe_skip_geometry_version(binary, _type), do: {:ok, binary}

  # Reads the value payload. Fixed-size physical types arrive as one blob in
  # field 102 that must be exactly fixed_size * row_count bytes long; all other
  # physical types are handled by decode_variable_values/5.
  defp read_values(binary, type, row_count, validity) do
    physical_type = LogicalType.physical_type(type)

    if LogicalType.fixed_size?(physical_type) do
      expected_bytes = LogicalType.fixed_size(physical_type) * row_count
      read_blob = &Reader.read_blob/1

      with {:ok, payload, rest} <- read_required(binary, 102, read_blob),
           :ok <- expect_blob_size(payload, expected_bytes),
           {:ok, values} <-
             decode_fixed_values(payload, type, physical_type, row_count, validity) do
        {:ok, values, rest}
      end
    else
      decode_variable_values(binary, type, physical_type, row_count, validity)
    end
  end

  # VARCHAR-backed types: field 102 is a list of blobs, one per row. Rows marked
  # invalid by the validity mask decode to nil regardless of their blob.
  defp decode_variable_values(binary, type, :varchar, row_count, validity) do
    read_blobs = fn input -> Reader.read_list(input, &Reader.read_blob/1) end

    with {:ok, blobs, rest} <- read_required(binary, 102, read_blobs),
         :ok <- expect_value_count(blobs, row_count, :varchar) do
      decoded =
        for {blob, row_index} <- Enum.with_index(blobs) do
          if valid?(validity, row_index), do: decode_string_like(type, blob), else: nil
        end

      {:ok, decoded, rest}
    end
  end

  # STRUCT: field 103 holds one child vector per struct field; the columns are
  # then recombined into one value per row.
  defp decode_variable_values(binary, type, :struct, row_count, validity) do
    children = LogicalType.struct_children(type)
    read_children = fn input -> read_child_vectors(input, children, row_count) end

    case read_required(binary, 103, read_children) do
      {:ok, child_vectors, rest} ->
        {:ok, struct_values(children, child_vectors, row_count, validity), rest}

      other ->
        other
    end
  end

  # LIST/MAP: field 104 is the child vector size, field 105 the per-row
  # offset/length entries, field 106 the child vector. Entries are bounds-checked
  # against the child size before rows are sliced out of the child values.
  defp decode_variable_values(binary, type, :list, row_count, validity) do
    read_entries = fn input -> read_list_entries(input, row_count) end

    with {:ok, child_count, rest} <- read_required(binary, 104, &Reader.read_uleb128/1),
         {:ok, entries, rest} <- read_required(rest, 105, read_entries),
         {:ok, child_vector, rest} <-
           read_required(rest, 106, &decode(&1, LogicalType.child_type(type), child_count)),
         :ok <- validate_list_entries(entries, child_count),
         {:ok, values} <- list_values(type, entries, child_vector.values, validity) do
      {:ok, values, rest}
    end
  end

  # ARRAY: field 103 is the serialized array size (which must match the logical
  # type), field 104 the child vector holding array_size * row_count values.
  # Each row takes the next array_size child values; rows marked invalid by the
  # validity mask become nil.
  #
  # The child values are consumed in a single left-to-right pass. Slicing from
  # the head of the list for every row would make decoding quadratic in the
  # number of rows.
  defp decode_variable_values(binary, type, :array, row_count, validity) do
    with {:ok, array_size, rest} <- read_required(binary, 103, &Reader.read_uleb128/1),
         :ok <- expect_array_size(type, array_size),
         {:ok, child_vector, rest} <-
           read_required(
             rest,
             104,
             &decode(&1, LogicalType.child_type(type), array_size * row_count)
           ) do
      {values, _remaining} =
        Enum.map_reduce(0..(row_count - 1)//1, child_vector.values, fn row_index, remaining ->
          {row, remaining} = Enum.split(remaining, array_size)
          {if(valid?(validity, row_index), do: row, else: nil), remaining}
        end)

      {:ok, values, rest}
    end
  end

  # Fallback for physical types with no decoder: returns an error tuple.
  defp decode_variable_values(_binary, _type, physical_type, _row_count, _validity) do
    error(:unsupported_physical_type, "#{physical_type} vectors are not implemented yet")
  end

  # Encodes one value of a fixed-size physical type as a little-endian binary.
  # Clause order matters: the nil clause and the logical-type-specific clauses
  # (date, decimal, temporal) must precede the generic clause for the same
  # physical type.

  # nil rows occupy the same number of bytes as a real value, all zero.
  defp encode_fixed_value(_type, physical_type, nil) do
    :binary.copy(<<0>>, LogicalType.fixed_size(physical_type))
  end

  # Any truthy value is written as 1, false as 0.
  defp encode_fixed_value(_type, :bool, value), do: <<if(value, do: 1, else: 0)>>
  defp encode_fixed_value(_type, :int8, value), do: <<value::little-signed-8>>
  defp encode_fixed_value(_type, :uint8, value), do: <<value::little-unsigned-8>>
  defp encode_fixed_value(_type, :int16, value), do: <<value::little-signed-16>>
  defp encode_fixed_value(_type, :uint16, value), do: <<value::little-unsigned-16>>

  # DATE given as a calendar date map: stored as days since 1970-01-01.
  defp encode_fixed_value(
         %LogicalType{name: :date},
         :int32,
         %{calendar: _, year: _, month: _, day: _} = value
       ),
       do: <<date_unscaled(value)::little-signed-32>>

  # DECIMAL stored in 32 bits: write the unscaled integer.
  defp encode_fixed_value(%LogicalType{name: :decimal} = type, :int32, value),
    do: <<decimal_unscaled(type, value)::little-signed-32>>

  defp encode_fixed_value(_type, :int32, value), do: <<value::little-signed-32>>
  defp encode_fixed_value(_type, :uint32, value), do: <<value::little-unsigned-32>>

  # DECIMAL stored in 64 bits: write the unscaled integer.
  defp encode_fixed_value(%LogicalType{name: :decimal} = type, :int64, value),
    do: <<decimal_unscaled(type, value)::little-signed-64>>

  # Temporal types stored in 64 bits are converted to their raw integer first.
  defp encode_fixed_value(%LogicalType{name: name}, :int64, value)
       when name in [
              :time,
              :time_ns,
              :time_tz,
              :timestamp,
              :timestamp_ms,
              :timestamp_ns,
              :timestamp_sec,
              :timestamp_tz
            ],
       do: <<temporal_unscaled(name, value)::little-signed-64>>

  defp encode_fixed_value(_type, :int64, value), do: <<value::little-signed-64>>
  defp encode_fixed_value(_type, :uint64, value), do: <<value::little-unsigned-64>>
  defp encode_fixed_value(_type, :float, value), do: <<value::little-float-32>>
  defp encode_fixed_value(_type, :double, value), do: <<value::little-float-64>>

  # DECIMAL stored in 128 bits: write the unscaled integer.
  defp encode_fixed_value(%LogicalType{name: :decimal} = type, :int128, value),
    do: encode_int128(decimal_unscaled(type, value))

  defp encode_fixed_value(_type, :int128, value), do: encode_int128(value)
  defp encode_fixed_value(_type, :uint128, value), do: encode_uint128(value)

  # INTERVAL is accepted either as a QuackDB.Interval struct or as an
  # {:interval, months, days, micros} tuple.
  defp encode_fixed_value(_type, :interval, %QuackDB.Interval{} = interval),
    do: encode_interval(interval.months, interval.days, interval.microseconds)

  defp encode_fixed_value(_type, :interval, {:interval, months, days, micros}),
    do: encode_interval(months, days, micros)

  # Converts a Decimal into the unscaled integer for the column's scale, i.e.
  # decimal * 10^scale rounded to zero decimal places. Any value that is not a
  # Decimal struct is passed through unchanged.
  defp decimal_unscaled(%LogicalType{type_info: %{scale: scale}}, %Decimal{} = decimal) do
    scale_factor = Decimal.new(1, 1, scale)
    scaled = Decimal.mult(decimal, scale_factor)

    Decimal.to_integer(Decimal.round(scaled, 0))
  end

  defp decimal_unscaled(_type, value), do: value

  # Packs an interval into 16 bytes: months and days as little-endian signed
  # 32-bit integers followed by microseconds as a little-endian signed 64-bit.
  defp encode_interval(months, days, micros) do
    <<months::little-signed-32, days::little-signed-32, micros::little-signed-64>>
  end

  # Number of days between 1970-01-01 and the given date, after converting the
  # date to Calendar.ISO.
  defp date_unscaled(value) do
    iso_date = convert_date!(value)
    Date.diff(iso_date, ~D[1970-01-01])
  end

  # Converts a temporal value into the raw integer written for the logical type
  # named by the first argument.
  #
  # Integers are taken as already-unscaled raw values for every temporal type.
  # This clause is deliberately first: the :timestamp, :timestamp_ms and
  # :timestamp_sec clauses accept any value and delegate to timestamp_unscaled/2,
  # which only matches calendar maps. An integer reaching them would raise
  # FunctionClauseError instead of passing through as it does for the other
  # temporal types.
  defp temporal_unscaled(_type, value) when is_integer(value), do: value

  # TIME: microseconds since midnight.
  defp temporal_unscaled(
         :time,
         %{calendar: _, hour: _, minute: _, second: _, microsecond: _} = value
       ) do
    value
    |> convert_time!()
    |> Time.diff(~T[00:00:00], :microsecond)
  end

  # TIMESTAMP: microseconds since 1970-01-01 00:00:00.
  defp temporal_unscaled(:timestamp, value), do: timestamp_unscaled(value, :microsecond)

  # TIMESTAMP WITH TIME ZONE: microseconds since 1970-01-01 00:00:00 UTC.
  defp temporal_unscaled(:timestamp_tz, %DateTime{} = value) do
    value
    |> convert_datetime!()
    |> DateTime.diff(~U[1970-01-01 00:00:00Z], :microsecond)
  end

  defp temporal_unscaled(:timestamp_ms, value), do: timestamp_unscaled(value, :millisecond)
  defp temporal_unscaled(:timestamp_sec, value), do: timestamp_unscaled(value, :second)

  # The wrapper structs below already carry the raw representation.
  defp temporal_unscaled(:time_ns, %QuackDB.NanosecondTime{nanoseconds: nanoseconds}),
    do: nanoseconds

  defp temporal_unscaled(:time_tz, %QuackDB.TimeWithTimeZone{} = value),
    do: QuackDB.TimeWithTimeZone.to_bits(value)

  defp temporal_unscaled(:timestamp_ns, %QuackDB.NanosecondTimestamp{nanoseconds: nanoseconds}),
    do: nanoseconds

  # Difference between a naive date-time (converted to Calendar.ISO) and
  # 1970-01-01 00:00:00, expressed in `unit`.
  defp timestamp_unscaled(
         %{calendar: _, year: _, month: _, day: _, hour: _, minute: _, second: _, microsecond: _} =
           value,
         unit
       ) do
    iso_datetime = convert_naive_datetime!(value)
    NaiveDateTime.diff(iso_datetime, ~N[1970-01-01 00:00:00], unit)
  end

  # Calendar.ISO conversion wrappers. The atom names the DuckDB target type and
  # is only used in the error message raised when conversion fails.
  defp convert_date!(value), do: convert_calendar!(Date, value, :date)
  defp convert_time!(value), do: convert_calendar!(Time, value, :time)
  defp convert_naive_datetime!(value), do: convert_calendar!(NaiveDateTime, value, :timestamp)
  defp convert_datetime!(value), do: convert_calendar!(DateTime, value, :timestamp_tz)

  # Converts `value` to Calendar.ISO using `module.convert/2` and returns the
  # converted value. Raises :unsupported_calendar when the conversion fails.
  defp convert_calendar!(module, value, target) do
    case module.convert(value, Calendar.ISO) do
      {:error, reason} ->
        message = "cannot encode #{inspect(value)} as DuckDB #{target}: #{inspect(reason)}"
        raise Error.new(:unsupported_calendar, message, source: :protocol)

      {:ok, converted} ->
        converted
    end
  end

  # Encodes an integer as 16 little-endian bytes in two's complement.
  #
  # The bit syntax handles 128-bit segments natively, so there is no need to
  # split the value into 64-bit halves by hand. Out-of-range integers are
  # truncated to their low 128 bits, exactly as the manual split did.
  defp encode_int128(value) do
    <<value::little-signed-128>>
  end

  # Encodes a non-negative integer as 16 little-endian bytes.
  #
  # The bit syntax handles 128-bit segments natively, so there is no need to
  # split the value into 64-bit halves by hand. Out-of-range integers are
  # truncated to their low 128 bits, exactly as the manual split did.
  defp encode_uint128(value) do
    <<value::little-unsigned-128>>
  end

  # Produces the blob payload for one VARCHAR-backed value:
  #   * nil becomes an empty payload (the validity mask marks the row as null);
  #   * binaries for :blob, :bit and :geometry are written unchanged;
  #   * integers for :bignum use the BIGNUM encoding;
  #   * everything else is converted with to_string/1.
  defp encode_string_like(_type, nil), do: ""

  defp encode_string_like(%LogicalType{name: name}, value)
       when name in [:blob, :bit, :geometry] and is_binary(value),
       do: value

  defp encode_string_like(%LogicalType{name: :bignum}, value) when is_integer(value),
    do: encode_bignum(value)

  defp encode_string_like(_type, value), do: to_string(value)

  # Builds the validity bitmask for `values`: bit i is set when the i-th value
  # is not nil, least-significant bit first within each byte.
  #
  # The accumulator is {bit_index, current_byte, completed_bytes}; completed
  # bytes are prepended, so they are in reverse order until
  # finish_validity_mask/2 flushes, pads and reverses them.
  defp validity_mask(values, row_count) do
    values
    |> Enum.reduce({0, 0, []}, fn value, {bit_index, byte, bytes} ->
      byte = if is_nil(value), do: byte, else: byte ||| 1 <<< bit_index
      bit_index = bit_index + 1

      # A full byte has been collected: emit it and start a new one.
      if bit_index == 8 do
        {0, 0, [<<byte>> | bytes]}
      else
        {bit_index, byte, bytes}
      end
    end)
    |> finish_validity_mask(row_count)
  end

  # Finalizes the reducer state of validity_mask/2: flushes a partially filled
  # byte (if any), pads the mask to its required length and returns it as a
  # binary in forward order.
  defp finish_validity_mask({bit_index, byte, bytes}, row_count) do
    completed = if bit_index == 0, do: bytes, else: [<<byte>> | bytes]

    completed
    |> pad_validity_mask(row_count)
    |> Enum.reverse()
    |> IO.iodata_to_binary()
  end

  # Pads a reversed byte list with zero bytes so the mask spans a whole number
  # of 8-byte words (one word per 64 rows). The padding is prepended because
  # the list is still reversed, so it ends up at the end of the final mask.
  defp pad_validity_mask(bytes, row_count) do
    required = 8 * div(row_count + 63, 64)

    case required - length(bytes) do
      missing when missing > 0 -> List.duplicate(<<0>>, missing) ++ bytes
      _none -> bytes
    end
  end

  # Splits list rows into per-row offset/length entries and the flattened list
  # of all child values, both in row order.
  defp list_entries_and_child_values(values) do
    {reversed_entries, reversed_groups, _next_offset} =
      Enum.reduce(values, {[], [], 0}, fn value, acc -> append_list_entry(value, acc) end)

    flattened = reversed_groups |> Enum.reverse() |> Enum.concat()

    {Enum.reverse(reversed_entries), flattened}
  end

  # Reducer step for list_entries_and_child_values/1. The accumulator is
  # {entries, child_value_groups, next_offset}, with both lists in reverse order.

  # nil rows get an empty entry at offset 0 and do not advance the offset.
  defp append_list_entry(nil, {entries, child_value_groups, offset}) do
    {[%{offset: 0, length: 0} | entries], child_value_groups, offset}
  end

  # List rows record the current offset and their length, then advance the offset.
  defp append_list_entry(value, {entries, child_value_groups, offset}) when is_list(value) do
    value_length = length(value)

    {[%{offset: offset, length: value_length} | entries], [value | child_value_groups],
     offset + value_length}
  end

  # Anything else is not a valid LIST/MAP row.
  defp append_list_entry(value, _acc) do
    raise Error.new(:invalid_list_value, "LIST/MAP values must be lists, got #{inspect(value)}",
            source: :protocol
          )
  end

  # Serializes one list entry as an object: field 100 is the offset, field 101
  # the length (both ULEB128), followed by the object end marker.
  defp encode_list_entry(%{offset: offset, length: length}) do
    [
      Writer.field(100, Writer.uleb128(offset)),
      Writer.field(101, Writer.uleb128(length)),
      Writer.end_object()
    ]
  end

  # Looks up the value of struct field `name` in one row via QuackDB.KeyLookup.
  # NOTE(review): behaviour for nil rows and for atom vs string keys depends on
  # QuackDB.KeyLookup.fetch/2, which is not visible here — confirm.
  defp struct_child_value(value, name), do: QuackDB.KeyLookup.fetch(value, name)

  # Reads the ULEB128 child-vector count, checks that it equals the number of
  # struct child types, then decodes that many child vectors.
  defp read_child_vectors(binary, children, row_count) do
    with {:ok, count, rest} <- Reader.read_uleb128(binary),
         :ok <- expect_struct_child_count(children, count) do
      read_child_vectors(rest, children, row_count, count, [])
    end
  end

  # Recursive worker: decodes `remaining` child vectors, pairing each with the
  # next child type. Decoded vectors accumulate in reverse and are put back in
  # order once the counter reaches zero.
  defp read_child_vectors(rest, _children, _row_count, 0, vectors),
    do: {:ok, Enum.reverse(vectors), rest}

  defp read_child_vectors(binary, [child | children], row_count, remaining, vectors) do
    with {:ok, vector, rest} <- decode(binary, child.type, row_count) do
      read_child_vectors(rest, children, row_count, remaining - 1, [vector | vectors])
    end
  end

  # Vectors remain to be read but no child types are left.
  defp read_child_vectors(_binary, [], _row_count, _remaining, _vectors) do
    error(:struct_child_mismatch, "struct has more child vectors than child types")
  end

  # Decodes `row_count` fixed-size values from `blob`.
  #
  # With no validity mask (nil) every row is decoded. With a mask, rows marked
  # invalid are skipped and returned as nil. Both workers accumulate in reverse,
  # so the result is reversed here. The `<<>>` in the match requires the blob to
  # be fully consumed.
  defp decode_fixed_values(blob, type, physical_type, row_count, nil) do
    with {:ok, values, <<>>} <-
           decode_present_fixed_values(blob, type, physical_type, row_count, []) do
      {:ok, Enum.reverse(values)}
    end
  end

  defp decode_fixed_values(blob, type, physical_type, row_count, validity) do
    size = LogicalType.fixed_size(physical_type)

    with {:ok, values, <<>>} <-
           decode_nullable_fixed_values(
             blob,
             type,
             physical_type,
             row_count,
             validity,
             size,
             0,
             []
           ) do
      {:ok, Enum.reverse(values)}
    end
  end

  # Decodes `remaining` consecutive values with Value.decode_fixed/3 when there
  # is no validity mask. Values are returned in reverse order together with the
  # unconsumed input.
  defp decode_present_fixed_values(rest, _type, _physical_type, 0, values),
    do: {:ok, values, rest}

  defp decode_present_fixed_values(binary, type, physical_type, remaining, values) do
    with {:ok, value, rest} <- Value.decode_fixed(binary, type, physical_type) do
      decode_present_fixed_values(rest, type, physical_type, remaining - 1, [value | values])
    end
  end

  # Decodes `remaining` fixed-size values while honouring the validity mask.
  # `index` is the current row and `size` the byte width of one value. Values
  # are returned in reverse order together with the unconsumed input.
  defp decode_nullable_fixed_values(
         rest,
         _type,
         _physical_type,
         0,
         _validity,
         _size,
         _index,
         values
       ),
       do: {:ok, values, rest}

  defp decode_nullable_fixed_values(
         binary,
         type,
         physical_type,
         remaining,
         validity,
         size,
         index,
         values
       ) do
    if valid?(validity, index) do
      with {:ok, value, rest} <- Value.decode_fixed(binary, type, physical_type) do
        decode_nullable_fixed_values(
          rest,
          type,
          physical_type,
          remaining - 1,
          validity,
          size,
          index + 1,
          [
            value | values
          ]
        )
      end
    else
      # Null rows still occupy `size` bytes in the blob; skip them undecoded.
      <<_ignored::binary-size(^size), rest::binary>> = binary

      decode_nullable_fixed_values(
        rest,
        type,
        physical_type,
        remaining - 1,
        validity,
        size,
        index + 1,
        [
          nil | values
        ]
      )
    end
  end

  # Recombines per-field child vectors into one value per row: each child type
  # is paired with its decoded values, then rows are assembled column by column.
  defp struct_values(children, child_vectors, row_count, validity) do
    children
    |> Enum.zip_with(child_vectors, fn %{name: name}, vector -> {name, vector.values} end)
    |> build_struct_values(row_count, validity, 0, [])
  end

  # Builds `remaining` struct rows starting at row `index`. Each step takes the
  # head value off every column. Rows accumulate in reverse and are reversed
  # once the counter reaches zero.
  defp build_struct_values(_columns, 0, _validity, _index, values), do: Enum.reverse(values)

  defp build_struct_values(columns, remaining, validity, index, values) do
    {row, columns} = take_struct_row(columns, valid?(validity, index), %{}, [])
    build_struct_values(columns, remaining - 1, validity, index + 1, [row | values])
  end

  # Pops the head value of every column and returns {row, advanced_columns}.
  # For a valid row the result is a map of field name to value; for an invalid
  # row it is nil, but the columns are still advanced so later rows stay aligned.
  defp take_struct_row([], true, row, columns), do: {row, Enum.reverse(columns)}
  defp take_struct_row([], false, _row, columns), do: {nil, Enum.reverse(columns)}

  defp take_struct_row([{name, [value | rest]} | columns], valid?, row, advanced) do
    row = if valid?, do: Map.put(row, name, value), else: row
    take_struct_row(columns, valid?, row, [{name, rest} | advanced])
  end

  # Reads the validity mask from field 101 when the has-validity flag is true.
  # Otherwise returns nil as the mask, which valid?/2 treats as "all rows valid".
  defp maybe_read_validity(binary, false, _row_count), do: {:ok, nil, binary}

  defp maybe_read_validity(binary, true, row_count),
    do: read_required(binary, 101, &read_validity(&1, row_count))

  # Reads the validity blob and checks its size: 8 bytes for every started
  # group of 64 rows.
  defp read_validity(binary, row_count) do
    expected_size = div(row_count + 63, 64) * 8

    with {:ok, blob, rest} <- Reader.read_blob(binary),
         :ok <- expect_blob_size(blob, expected_size) do
      {:ok, blob, rest}
    end
  end

  # Reads a dictionary selection blob (4 bytes per row) and decodes it into a
  # list of indexes.
  defp read_selection(binary, row_count) do
    expected_size = row_count * 4

    with {:ok, blob, rest} <- Reader.read_blob(binary),
         :ok <- expect_blob_size(blob, expected_size) do
      {:ok, decode_selection(blob, []), rest}
    end
  end

  # Decodes consecutive little-endian unsigned 32-bit indexes, in input order.
  # The caller has already checked that the blob length is a multiple of 4.
  defp decode_selection(<<>>, indexes), do: Enum.reverse(indexes)

  defp decode_selection(<<index::little-unsigned-32, rest::binary>>, indexes),
    do: decode_selection(rest, [index | indexes])

  # Reads the list of offset/length entries and requires exactly one entry per row.
  defp read_list_entries(binary, row_count) do
    with {:ok, entries, rest} <- Reader.read_list(binary, &read_list_entry/1) do
      case length(entries) do
        count when count == row_count ->
          {:ok, entries, rest}

        count ->
          error(
            :list_entry_count_mismatch,
            "list vector serialized #{count} entries for #{row_count} rows"
          )
      end
    end
  end

  # Reads one list entry object into a map. Field 100 sets :offset, field 101
  # sets :length (both ULEB128), the end marker finishes the entry, and any
  # other field id is an error. Missing fields are not detected here; see
  # list_entry_bounds/1.
  defp read_list_entry(binary), do: read_list_entry(binary, %{})

  defp read_list_entry(binary, entry) do
    with {:ok, field_id, rest} <- Reader.read_field_id(binary) do
      cond do
        field_id == QuackDB.Protocol.field_end() ->
          {:ok, entry, rest}

        field_id == 100 ->
          with {:ok, offset, rest} <- Reader.read_uleb128(rest),
               do: read_list_entry(rest, Map.put(entry, :offset, offset))

        field_id == 101 ->
          with {:ok, length, rest} <- Reader.read_uleb128(rest),
               do: read_list_entry(rest, Map.put(entry, :length, length))

        true ->
          error(:unknown_list_entry_field, "unknown list entry field #{field_id}")
      end
    end
  end

  # Checks that every entry has both an offset and a length, and that
  # offset + length stays within the child vector size. Stops at the first
  # failing entry and returns its error; returns :ok otherwise.
  defp validate_list_entries(entries, list_size) do
    Enum.reduce_while(entries, :ok, fn entry, :ok ->
      with {:ok, offset, length} <- list_entry_bounds(entry) do
        if offset + length <= list_size do
          {:cont, :ok}
        else
          {:halt,
           error(
             :list_entry_out_of_bounds,
             "list entry offset #{offset} with length #{length} exceeds child vector size #{list_size}"
           )}
        end
      else
        {:error, _error} = error -> {:halt, error}
      end
    end)
  end

  # Extracts offset and length from a decoded list entry, or returns an
  # :invalid_list_entry error when either field is missing.
  defp list_entry_bounds(%{offset: offset, length: length}), do: {:ok, offset, length}

  defp list_entry_bounds(entry) do
    error(
      :invalid_list_entry,
      "LIST entry must include offset and length fields, got #{inspect(entry)}"
    )
  end

  # Builds one value per list entry by slicing the child values at the entry's
  # offset and length. Rows marked invalid become nil. For MAP types each slice
  # is further converted by list_value/2, whose error aborts the whole decode.
  #
  # The child values are turned into a tuple so each element access during
  # slicing is constant time. Entries must have been bounds-checked already.
  defp list_values(type, entries, child_values, validity) do
    child_values = List.to_tuple(child_values)

    entries
    |> Enum.with_index()
    |> Enum.reduce_while({:ok, []}, fn {%{offset: offset, length: length}, row_index},
                                       {:ok, values} ->
      if valid?(validity, row_index) do
        value = tuple_slice(child_values, offset, length)

        case list_value(type, value) do
          {:ok, value} -> {:cont, {:ok, [value | values]}}
          {:error, _error} = error -> {:halt, error}
        end
      else
        {:cont, {:ok, [nil | values]}}
      end
    end)
    |> case do
      {:ok, values} -> {:ok, Enum.reverse(values)}
      {:error, _error} = error -> error
    end
  end

  # Post-processes one decoded row. MAP rows are validated against the map's
  # child type and folded into an Elixir map; plain LIST rows are returned as-is.
  defp list_value(%{name: :map} = type, entries) do
    with :ok <- validate_map_child_type(type) do
      map_entries(entries)
    end
  end

  defp list_value(_type, entries), do: {:ok, entries}

  # Returns `length` consecutive elements of `tuple`, starting at the zero-based
  # `offset`, as a list. A length of zero yields an empty list.
  defp tuple_slice(tuple, offset, length) do
    for index <- offset..(offset + length - 1)//1, do: elem(tuple, index)
  end

  # Checks that a MAP's child type is a STRUCT with fields named "key" and
  # "value"; returns :ok or an :invalid_map_type error.
  defp validate_map_child_type(type) do
    child_type = LogicalType.child_type(type)

    if child_type.name == :struct do
      children = LogicalType.struct_children(child_type)
      child_names = MapSet.new(children, & &1.name)

      cond do
        not MapSet.member?(child_names, "key") ->
          error(:invalid_map_type, "MAP child struct must include a key field")

        not MapSet.member?(child_names, "value") ->
          error(:invalid_map_type, "MAP child struct must include a value field")

        true ->
          :ok
      end
    else
      error(:invalid_map_type, "MAP child type must be STRUCT, got #{inspect(child_type.name)}")
    end
  end

  # Folds decoded key/value entries into an Elixir map. A later entry with the
  # same key overwrites an earlier one. Stops at the first malformed entry and
  # returns its error.
  defp map_entries(entries) do
    Enum.reduce_while(entries, {:ok, %{}}, fn entry, {:ok, map} ->
      with {:ok, key, value} <- map_key_value(entry) do
        {:cont, {:ok, Map.put(map, key, value)}}
      else
        {:error, _error} = error -> {:halt, error}
      end
    end)
  end

  # Extracts key and value from a MAP entry, accepting either string or atom
  # field names. Anything else is an invalid entry.
  defp map_key_value(%{"key" => key, "value" => value}), do: {:ok, key, value}
  defp map_key_value(%{key: key, value: value}), do: {:ok, key, value}

  defp map_key_value(other), do: invalid_map_entry(other)

  # Builds the :invalid_map_entry error tuple for a malformed MAP entry.
  defp invalid_map_entry(entry) do
    error(
      :invalid_map_entry,
      "MAP entry must include key and value fields, got #{inspect(entry)}"
    )
  end

  # Resolves every selection index against the dictionary values, preserving
  # selection order. Returns {:ok, values}, or a
  # :dictionary_index_out_of_range error for the first index outside the
  # dictionary.
  #
  # The dictionary is converted to a tuple once so each lookup is constant
  # time; fetching from the list per index would cost O(rows * dictionary size).
  # Indexes are bounds-checked explicitly, so a negative index is reported as
  # out of range rather than counting from the end.
  defp select_dictionary_values(values, selection) do
    dictionary = List.to_tuple(values)
    dictionary_size = tuple_size(dictionary)

    selection
    |> Enum.reduce_while({:ok, []}, fn
      index, {:ok, selected} when index >= 0 and index < dictionary_size ->
        {:cont, {:ok, [elem(dictionary, index) | selected]}}

      index, _acc ->
        {:halt,
         error(:dictionary_index_out_of_range, "dictionary index #{index} is out of range")}
    end)
    |> case do
      {:ok, selected} -> {:ok, Enum.reverse(selected)}
      error -> error
    end
  end

  # Tells whether row `index` is valid (non-null). A nil mask means every row
  # is valid; otherwise bit `index` of the mask is tested, least-significant
  # bit first within each byte.
  defp valid?(nil, _index), do: true

  defp valid?(validity, index) do
    byte = :binary.at(validity, div(index, 8))
    (byte &&& 1 <<< rem(index, 8)) != 0
  end

  # Converts one VARCHAR-backed blob into its Elixir value:
  #   * :blob and :geometry payloads are returned unchanged;
  #   * :bit payloads become a string of "0"/"1" characters;
  #   * :bignum payloads become integers;
  #   * everything else must be valid UTF-8 and is returned as a string.
  # Unlike most of the decode path, an invalid string raises instead of
  # returning an error tuple.
  defp decode_string_like(%{name: name}, value) when name in [:blob, :geometry], do: value
  defp decode_string_like(%{name: :bit}, value), do: decode_bitstring(value)

  defp decode_string_like(%{name: :bignum}, value), do: decode_bignum(value)

  defp decode_string_like(_type, value) do
    if String.valid?(value) do
      value
    else
      raise Error.new(:invalid_string, "expected valid UTF-8 string vector value",
              source: :protocol
            )
    end
  end

  # Encodes an integer in the BIGNUM layout: a 3-byte big-endian header equal
  # to 0x800000 plus the magnitude's byte count, followed by the big-endian
  # magnitude. A negative number is the encoding of its absolute value with
  # every byte (header included) bit-inverted.
  defp encode_bignum(value) when value < 0 do
    positive = encode_bignum(abs(value))
    for <<byte <- positive>>, into: <<>>, do: <<bnot(byte) &&& 0xFF>>
  end

  defp encode_bignum(value) do
    magnitude = encode_unsigned_big_endian(value)
    <<0x80_0000 + byte_size(magnitude)::unsigned-24, magnitude::binary>>
  end

  # Minimal big-endian byte representation of a non-negative integer. Zero is
  # encoded as a single zero byte.
  #
  # Uses :binary.encode_unsigned/1 from the standard library instead of a
  # hand-rolled rem/div loop.
  defp encode_unsigned_big_endian(0), do: <<0>>

  defp encode_unsigned_big_endian(value) do
    :binary.encode_unsigned(value)
  end

  # Decodes a BIGNUM payload. A leading 1 bit marks a non-negative number,
  # decoded directly. Otherwise every byte is bit-inverted, the result is
  # decoded as a positive number, and the sign is flipped.
  defp decode_bignum(<<1::1, _rest::bitstring>> = value) do
    decode_positive_bignum(value)
  end

  defp decode_bignum(value) when is_binary(value) do
    value
    |> :binary.bin_to_list()
    |> Enum.map(&(bnot(&1) &&& 0xFF))
    |> :binary.list_to_bin()
    |> decode_positive_bignum()
    |> Kernel.*(-1)
  end

  # Decodes a non-negative BIGNUM: a 24-bit header of 0x800000 plus the
  # magnitude byte count, then the big-endian magnitude. Raises :invalid_bignum
  # when the header is malformed or the payload length does not match it.
  defp decode_positive_bignum(<<header::unsigned-24, magnitude::binary>>)
       when header >= 0x80_0000 do
    size = header - 0x80_0000

    if byte_size(magnitude) == size do
      decode_unsigned_big_endian(magnitude)
    else
      raise Error.new(:invalid_bignum, "BIGNUM payload size does not match header",
              source: :protocol
            )
    end
  end

  defp decode_positive_bignum(_value) do
    raise Error.new(:invalid_bignum, "expected DuckDB BIGNUM payload", source: :protocol)
  end

  # Interprets a binary as a big-endian unsigned integer. An empty binary
  # decodes to 0.
  #
  # Uses :binary.decode_unsigned/1 from the standard library instead of a
  # hand-rolled byte-by-byte fold.
  defp decode_unsigned_big_endian(value) do
    :binary.decode_unsigned(value)
  end

  # Decodes a BIT payload into a string of "0"/"1" characters. The first byte
  # is the number of leading padding bits (0..7), which are dropped from the
  # front of the rendered bits. Any other payload raises :invalid_bitstring.
  defp decode_bitstring(<<padding, payload::binary>>) when padding in 0..7 do
    rendered = for <<bit::1 <- payload>>, into: "", do: Integer.to_string(bit)
    String.slice(rendered, padding..-1//1)
  end

  defp decode_bitstring(_value) do
    raise Error.new(:invalid_bitstring, "expected DuckDB BIT payload", source: :protocol)
  end

  # Reads the next field id, requires it to equal `expected_field_id`, then
  # applies `read_value` to the remaining input. Any failure is returned
  # unchanged.
  defp read_required(binary, expected_field_id, read_value) do
    with {:ok, field_id, rest} <- Reader.read_field_id(binary),
         :ok <- expect_field(field_id, expected_field_id) do
      read_value.(rest)
    end
  end

  # :ok when the field id read equals the expected one (the first clause
  # matches only when both arguments are identical), otherwise an
  # :unexpected_field error.
  defp expect_field(field_id, field_id), do: :ok

  defp expect_field(field_id, expected_field_id),
    do: error(:unexpected_field, "expected field #{expected_field_id}, got #{field_id}")

  # :ok when `field_id` is the object end marker, otherwise an
  # :unexpected_vector_field error.
  defp expect_vector_end(field_id) do
    if field_id == QuackDB.Protocol.field_end(),
      do: :ok,
      else: error(:unexpected_vector_field, "unexpected vector field #{field_id}")
  end

  # :ok when the serialized array size equals the size declared by the logical
  # type, otherwise an :array_size_mismatch error.
  defp expect_array_size(type, size) do
    expected = LogicalType.array_size(type)

    case size == expected do
      true ->
        :ok

      false ->
        message = "array vector serialized size #{size}, expected #{expected}"
        error(:array_size_mismatch, message)
    end
  end

  # :ok when the number of serialized values equals the row count, otherwise a
  # :vector_value_count_mismatch error naming the physical type.
  defp expect_value_count(values, row_count, physical_type) do
    count = length(values)

    case count == row_count do
      true ->
        :ok

      false ->
        message = "#{physical_type} vector serialized #{count} values for #{row_count} rows"
        error(:vector_value_count_mismatch, message)
    end
  end

  # :ok when the serialized child-vector count equals the number of struct
  # child types, otherwise a :struct_child_mismatch error.
  defp expect_struct_child_count(children, count) do
    expected = length(children)

    case count == expected do
      true ->
        :ok

      false ->
        message = "struct vector serialized #{count} child vectors for #{expected} child types"
        error(:struct_child_mismatch, message)
    end
  end

  # :ok when the blob is exactly `size` bytes long, otherwise an
  # :invalid_blob_size error.
  defp expect_blob_size(blob, size) when byte_size(blob) == size, do: :ok

  defp expect_blob_size(blob, size),
    do: error(:invalid_blob_size, "expected #{size} bytes, got #{byte_size(blob)}")

  # Maps the serialized vector type id (field 90) to its atom. Unknown ids
  # yield an :unknown_vector_type error. :fsst is recognised here but has no
  # decode_body/4 clause, so decoding it reports an unsupported vector type.
  defp vector_type(0), do: {:ok, :flat}
  defp vector_type(1), do: {:ok, :fsst}
  defp vector_type(2), do: {:ok, :constant}
  defp vector_type(3), do: {:ok, :dictionary}
  defp vector_type(4), do: {:ok, :sequence}
  defp vector_type(id), do: error(:unknown_vector_type, "unknown vector type #{id}")

  # Builds the {:error, %Error{}} tuple used by the decode path, tagged with
  # source: :protocol.
  defp error(code, message), do: {:error, Error.new(code, message, source: :protocol)}
end