if Code.ensure_loaded?(NimbleCSV) do
defmodule ClickHouse.Format.TSVWithNamesAndTypes do
@moduledoc """
An implementation of the ClickHouse `TSVWithNamesAndTypes` format.
"""
@behaviour ClickHouse.Format
alias ClickHouse.Format.TSV
@parser TSV.Parser
################################
# ClickHouse.Format Callbacks
################################
@impl ClickHouse.Format
@spec names() :: [binary()]
def names, do: ["TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"]
@impl ClickHouse.Format
@spec decode(raw :: iodata()) :: {ClickHouse.Result.columns(), ClickHouse.Result.rows()}
def decode(raw) do
[names, types | rows] = @parser.parse_string(raw, skip_headers: false)
columns = build_columns(names, types)
rows = decode_rows(columns, rows)
{columns, rows}
end
@impl ClickHouse.Format
@spec encode(ClickHouse.data_types(), rows :: list()) :: iodata()
def encode(_types, []), do: []
def encode(_types, [names, types | rows]) do
[@parser.dump_to_iodata([names, types]), TSV.encode([], rows)]
end
def encode(_, _) do
raise ArgumentError, "missing TSV column names and types"
end
################################
# Private API
################################
defp build_columns(names, types) do
types = parse_types(types, [])
Enum.zip([names, types])
end
defp parse_types([], results), do: Enum.reverse(results)
defp parse_types([type | types], results) do
type = ClickHouse.DataType.to_internal(type)
parse_types(types, [type | results])
end
defp decode_rows(columns, rows) do
Enum.map(rows, &decode_row(columns, &1))
end
defp decode_row(columns, row) do
[columns, row]
|> Enum.zip()
|> Enum.map(fn {{_name, type}, value} ->
decode_value(type, value)
end)
end
defguardp is_enum_type(type)
when is_tuple(type) and elem(type, 0) in [:enum8, :enum16]
defguardp is_integer_type(type) when type in [:i64, :i32, :i16, :i8, :u64, :u32, :u16, :u8]
defguardp is_float_type(type) when type in [:f64, :f32]
defguardp is_ignore_type(type) when type in [:string, :uuid] or is_enum_type(type)
defguardp is_quoted_type(type)
when type in [:string, :uuid, :date, :datetime] or is_enum_type(type)
defp decode_value(type, value) when is_integer_type(type), do: to_integer(value)
defp decode_value(type, value) when is_float_type(type), do: to_float(value)
defp decode_value(type, value) when is_ignore_type(type), do: value
defp decode_value(:boolean, "true"), do: true
defp decode_value(:boolean, "false"), do: false
defp decode_value({:fixed_string, length}, value) do
Enum.reduce(1..length, value, fn _, string ->
String.replace(string, "\\0", "", global: false)
end)
end
defp decode_value(:date, value) do
Date.from_iso8601!(value)
end
defp decode_value(:datetime, value) do
[date, time] = String.split(value, " ")
date = Date.from_iso8601!(date)
time = Time.from_iso8601!(time)
{:ok, date_time} = DateTime.new(date, time)
date_time
end
defp decode_value({:nullable, _type}, "\\N"), do: nil
defp decode_value({:nullable, _type}, "NULL"), do: nil
defp decode_value({:nullable, type}, value) do
decode_value(type, value)
end
defp decode_value({:low_cardinality, type}, value) do
decode_value(type, value)
end
defp decode_value({:array, :nothing}, _), do: []
defp decode_value({:array, type}, value) when is_quoted_type(type) do
value
|> trim_array()
|> String.trim_leading("'")
|> String.trim_trailing("'")
|> String.split("','")
|> Enum.map(&decode_value(type, &1))
end
defp decode_value({:array, type}, value) when is_integer_type(type) or is_float_type(type) do
type
|> to_array(value)
|> Enum.reject(&is_nil/1)
end
defp decode_value({:array, {:array, _} = type}, value) do
value
|> trim_array()
|> String.split("],[")
|> Enum.map(fn
"" -> []
v -> decode_value(type, v)
end)
end
defp decode_value({:array, type}, value) do
to_array(type, value)
end
defp decode_value({{:simple_aggregate_function, _}, type}, value) do
decode_value(type, value)
end
defp to_integer(""), do: nil
defp to_integer(value), do: String.to_integer(value)
defp to_float(""), do: nil
defp to_float("nan"), do: :nan
defp to_float("inf"), do: :inf
defp to_float("-inf"), do: :"-inf"
defp to_float(value) do
{float, _} = Float.parse(value)
float
end
defp to_array(type, value) do
value
|> trim_array()
|> String.split(",")
|> Enum.map(&decode_value(type, &1))
end
defp trim_array(value) do
value
|> String.trim_leading("[")
|> String.trim_trailing("]")
end
end
end