if Code.ensure_loaded?(NimbleCSV) do
defmodule ClickHouse.Format.TSVWithNamesAndTypes do
@moduledoc """
An implementation of the ClickHouse `TSVWithNamesAndTypes` format.
"""
@behaviour ClickHouse.Format
alias ClickHouse.Format.TSV
@parser TSV.Parser
# Scalar ClickHouse type names (as they appear in the types header row) paired
# with the atoms this module uses for them internally. One `parse_type/1`
# clause is generated per entry further down in this module.
@type_mappings [
{"Int64", :i64},
{"Int32", :i32},
{"Int16", :i16},
{"Int8", :i8},
{"UInt64", :u64},
{"UInt32", :u32},
{"UInt16", :u16},
{"UInt8", :u8},
{"Float64", :f64},
{"Float32", :f32},
{"String", :string},
{"UUID", :uuid},
{"Date", :date},
{"DateTime", :datetime}
]
# Function names accepted inside a `SimpleAggregateFunction(...)` type, paired
# with the atoms used for them internally. One `parse_aggregate_function/1`
# clause is generated per entry further down in this module.
@simple_aggregate_function_mappings [
{"any", :any},
{"anyLast", :any_last},
{"min", :min},
{"max", :max},
{"sum", :sum},
{"sumWithOverflow", :sum_with_overflow},
{"groupBitAnd", :group_bit_and},
{"groupBitOr", :group_bit_or},
{"groupBitXor", :group_bit_xor},
{"groupArrayArray", :group_array_array},
{"groupUniqArrayArray", :group_uniq_array_array},
{"sumMap", :sum_map},
{"minMap", :min_map},
{"maxMap", :max_map}
]
################################
# ClickHouse.Format Callbacks
################################
# Lists the format names this module answers to: the long spelling first,
# followed by its short alias.
@impl ClickHouse.Format
@spec names() :: [binary()]
def names do
  ~w(TabSeparatedWithNamesAndTypes TSVWithNamesAndTypes)
end
# Decodes a raw `TSVWithNamesAndTypes` payload.
#
# The first parsed row holds the column names, the second the ClickHouse type
# of each column, and every following row is data. Returns `{columns, rows}`,
# where `columns` is a list of `{name, parsed_type}` tuples and `rows` is a
# list of rows whose cells were converted according to their column type.
#
# Raises `MatchError` when the payload has fewer than two rows, i.e. when the
# names/types header is missing.
@impl ClickHouse.Format
@spec decode(raw :: iodata()) :: {ClickHouse.Result.columns(), ClickHouse.Result.rows()}
def decode(raw) do
  # The spec accepts iodata, but the parser's `parse_string/2` only takes a
  # binary. Flatten first; this is a no-op when `raw` is already a binary.
  binary = IO.iodata_to_binary(raw)

  # Headers must not be skipped: they carry the names and types rows.
  [names, types | rows] = @parser.parse_string(binary, skip_headers: false)

  columns = build_columns(names, types)
  {columns, decode_rows(columns, rows)}
end
# Encodes `[names, types | rows]` as iodata: the two header rows are dumped
# through the TSV parser module, the data rows are delegated to `TSV.encode/1`.
#
# Raises `ArgumentError` for any argument that does not start with a names row
# and a types row.
@impl ClickHouse.Format
@spec encode(params :: list()) :: iodata()
def encode([names, types | rows]) do
  header = @parser.dump_to_iodata([names, types])
  body = TSV.encode(rows)
  [header, body]
end

def encode(_params), do: raise(ArgumentError, "missing TSV column names and types")
################################
# Private API
################################
# Pairs each column name with its parsed type, producing `{name, type}` tuples
# in header order. Zipping stops at the shorter of the two lists.
defp build_columns(names, types) do
  Enum.zip(names, parse_types(types))
end
# Parses every raw type name in `types`, preserving order. `results` is an
# accumulator of already parsed types held in reverse order; parsed entries are
# prepended to it and the whole list is reversed once at the end.
defp parse_types(types, results \\ []) when is_list(types) do
  types
  |> Enum.reduce(results, fn raw_type, parsed -> [parse_type(raw_type) | parsed] end)
  |> Enum.reverse()
end
# Translates one ClickHouse type name from the types header row into the
# internal representation:
#
#   * wrapper types become tagged tuples around the recursively parsed inner
#     type: `{:nullable, t}`, `{:low_cardinality, t}`, `{:array, t}`
#   * `FixedString(N)` becomes `{:fixed_string, n}`
#   * `Enum8(...)` / `Enum16(...)` become `{:enum8 | :enum16, [label]}`
#   * `SimpleAggregateFunction(f, T)` becomes
#     `{{:simple_aggregate_function, f}, t}`
#   * scalar names listed in `@type_mappings` become their mapped atom
#
# There is no catch-all clause, so an unsupported type name raises
# `FunctionClauseError`.
defp parse_type(<<"Nullable(", inner::binary>>) do
  {:nullable, parse_inner_type(inner)}
end

defp parse_type(<<"LowCardinality(", inner::binary>>) do
  {:low_cardinality, parse_inner_type(inner)}
end

defp parse_type(<<"FixedString(", rest::binary>>) do
  # `rest` looks like "16)"; Integer.parse/1 reads the leading digits and
  # leaves the closing parenthesis in the ignored remainder.
  {length, _rest} = Integer.parse(rest)
  {:fixed_string, length}
end

defp parse_type(<<"Array(", inner::binary>>) do
  {:array, parse_inner_type(inner)}
end

defp parse_type(<<"Enum8(", definitions::binary>>) do
  {:enum8, parse_enum(definitions)}
end

defp parse_type(<<"Enum16(", definitions::binary>>) do
  {:enum16, parse_enum(definitions)}
end

defp parse_type(<<"SimpleAggregateFunction(", rest::binary>>) do
  # Split on the FIRST comma only: the inner type may itself contain commas
  # (for example an Enum with several labels), and splitting on every comma
  # would either fail the match or keep just the last fragment of that type.
  [function_name, inner] = String.split(rest, ",", parts: 2)

  function_type =
    function_name
    |> String.trim()
    |> parse_aggregate_function()

  rest_type =
    inner
    |> String.trim()
    |> parse_inner_type()

  {{:simple_aggregate_function, function_type}, rest_type}
end

# One literal clause per scalar type listed in `@type_mappings`.
for {clickhouse_type, local_type} <- @type_mappings do
  defp parse_type(unquote(clickhouse_type)) do
    unquote(local_type)
  end
end

# Parses the inside of a wrapper type: drops the wrapper's closing parenthesis
# (only the last one, so nested wrappers keep theirs) and parses what is left.
defp parse_inner_type(inner) do
  inner
  |> String.replace_suffix(")", "")
  |> parse_type()
end
# Extracts the enum labels from the text following `Enum8(` / `Enum16(`.
# The closing parenthesis is dropped, the definitions are split on commas, and
# for each one the part before the first `=` is kept with its escaped quotes
# (`\'`) and surrounding whitespace removed.
defp parse_enum(type) do
  definitions =
    type
    |> String.replace_suffix(")", "")
    |> String.split(",")

  Enum.map(definitions, fn definition ->
    [label | _value] = String.split(definition, "=")

    label
    |> String.replace("\\'", "")
    |> String.trim()
  end)
end
# Generates one literal clause per supported aggregate function, mapping the
# ClickHouse function name to its internal atom. Any name that is not listed in
# `@simple_aggregate_function_mappings` raises `FunctionClauseError`.
for {function_name, function_atom} <- @simple_aggregate_function_mappings do
  defp parse_aggregate_function(unquote(function_name)), do: unquote(function_atom)
end
# Decodes every data row against the same column definitions.
defp decode_rows(columns, rows) do
  for row <- rows, do: decode_row(columns, row)
end
# Decodes a single row by pairing each cell with its column definition and
# converting it according to that column's type. The column name is not needed
# for decoding and is ignored.
defp decode_row(columns, row) do
  columns
  |> Enum.zip(row)
  |> Enum.map(fn {{_name, type}, cell} -> decode_value(type, cell) end)
end
# Guards that classify parsed column types for `decode_value/2`.

# Enum types are tuples tagged with :enum8 or :enum16.
defguardp is_enum_type(type)
          when is_tuple(type) and (elem(type, 0) == :enum8 or elem(type, 0) == :enum16)

# Signed and unsigned integer types.
defguardp is_integer_type(type) when type in [:i8, :i16, :i32, :i64, :u8, :u16, :u32, :u64]

# Floating point types.
defguardp is_float_type(type) when type == :f32 or type == :f64

# Types whose raw text is returned without any conversion.
defguardp is_ignore_type(type) when type == :string or type == :uuid or is_enum_type(type)

# Types whose values are wrapped in single quotes when they appear inside an array.
defguardp is_quoted_type(type)
          when is_ignore_type(type) or type == :date or type == :datetime
# Converts one raw TSV cell (`value`) into an Elixir term according to the
# parsed column `type`.
#
# Clause order matters: the guarded scalar clauses come first, and the array
# clauses go from most specific (empty literal, quoted elements, numeric
# elements, nested arrays) to the generic fallback.

# Integers and floats; an empty cell decodes to nil (see to_integer/to_float).
defp decode_value(type, value) when is_integer_type(type), do: to_integer(value)
defp decode_value(type, value) when is_float_type(type), do: to_float(value)

# Strings, UUIDs and enum labels are returned exactly as received.
defp decode_value(type, value) when is_ignore_type(type), do: value

# Removes literal `\0` escape sequences (backslash followed by the digit zero)
# from a FixedString cell, one occurrence per pass, for at most `length` passes.
defp decode_value({:fixed_string, length}, value) do
  Enum.reduce(1..length, value, fn _, string ->
    String.replace(string, "\\0", "", global: false)
  end)
end

defp decode_value(:date, value) do
  Date.from_iso8601!(value)
end

# Expects "YYYY-MM-DD HH:MM:SS". DateTime.new/2 is called without a time zone
# argument, so the result is built in its default zone.
defp decode_value(:datetime, value) do
  [date, time] = String.split(value, " ")
  date = Date.from_iso8601!(date)
  time = Time.from_iso8601!(time)
  {:ok, date_time} = DateTime.new(date, time)
  date_time
end

# Both the escaped `\N` marker and the bare `NULL` word decode to nil.
defp decode_value({:nullable, _type}, "\\N"), do: nil
defp decode_value({:nullable, _type}, "NULL"), do: nil

defp decode_value({:nullable, type}, value) do
  decode_value(type, value)
end

defp decode_value({:low_cardinality, type}, value) do
  decode_value(type, value)
end

# An empty array literal is empty whatever the element type is. Without this
# clause the splitting below yields a single "" element, which decoded to
# [""] or [[]], or raised for Date/DateTime elements.
defp decode_value({:array, _type}, "[]"), do: []

# Arrays of quoted elements, e.g. ['a','b'].
defp decode_value({:array, type}, value) when is_quoted_type(type) do
  value
  |> trim_array()
  # Strip exactly ONE quote on each side. Trimming every leading/trailing
  # quote would eat the quotes of an empty first or last element.
  |> String.replace_prefix("'", "")
  |> String.replace_suffix("'", "")
  |> String.split("','")
  |> Enum.map(&decode_value(type, &1))
end

# Numeric arrays: empty elements decode to nil and are dropped.
defp decode_value({:array, type}, value) when is_integer_type(type) or is_float_type(type) do
  type
  |> to_array(value)
  |> Enum.reject(&is_nil/1)
end

# Arrays of arrays: the outer brackets are trimmed and the inner arrays are
# separated on "],[". An empty fragment stands for an empty inner array.
defp decode_value({:array, {:array, _} = type}, value) do
  value
  |> trim_array()
  |> String.split("],[")
  |> Enum.map(fn
    "" -> []
    v -> decode_value(type, v)
  end)
end

# Any other element type (for example nullable elements): split on commas.
defp decode_value({:array, type}, value) do
  to_array(type, value)
end

# The aggregate function does not affect how the stored value is decoded.
defp decode_value({{:simple_aggregate_function, _}, type}, value) do
  decode_value(type, value)
end
# Parses an integer cell. An empty cell yields nil; anything else must be a
# valid integer literal, otherwise String.to_integer/1 raises.
defp to_integer(value) do
  case value do
    "" -> nil
    digits -> String.to_integer(digits)
  end
end
# Parses a float cell. An empty cell yields nil, the special spellings "nan",
# "inf" and "-inf" map to the atoms :nan, :inf and :"-inf", and every other
# value goes through Float.parse/1 (any trailing remainder is ignored).
defp to_float(value) do
  case value do
    "" ->
      nil

    "nan" ->
      :nan

    "inf" ->
      :inf

    "-inf" ->
      :"-inf"

    number ->
      {parsed, _remainder} = Float.parse(number)
      parsed
  end
end
# Decodes a comma-separated array literal: strips the surrounding brackets,
# splits on "," and decodes each element with the given element type.
defp to_array(type, value) do
  elements =
    value
    |> trim_array()
    |> String.split(",")

  for element <- elements, do: decode_value(type, element)
end
# Strips every leading "[" and every trailing "]" from an array literal.
defp trim_array(value) do
  without_opening = String.trim_leading(value, "[")
  String.trim_trailing(without_opening, "]")
end
end
end