lib/ch.ex

defmodule Ch do
  @moduledoc "Minimal HTTP ClickHouse client."
  alias Ch.{Connection, Query, Result}

  @doc """
  Start the connection process and connect to ClickHouse.

  ## Options

    * `:hostname` - server hostname, defaults to `"localhost"`
    * `:port` - HTTP port, defualts to `8123`
    * `:scheme` - HTTP scheme, defaults to `"http"`
    * `:database` - Database, defaults to `"default"`
    * `:username` - Username
    * `:password` - User password
    * `:settings` - Keyword list of ClickHouse settings
    * `:timeout` - HTTP receive timeout in milliseconds
    * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info

  """
  def start_link(opts \\ []) do
    DBConnection.start_link(Connection, opts)
  end

  @doc """
  Returns a supervisor child specification for a DBConnection pool.
  """
  def child_spec(opts) do
    DBConnection.child_spec(Connection, opts)
  end

  @doc """
  Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
  `{:error, Exception.t()}` if there was a database error.

  ## Options

    * `:timeout` - Query request timeout
    * `:settings` - Keyword list of settings
    * `:database` - Database
    * `:username` - Username
    * `:password` - User password

  """
  @spec query(DBConnection.conn(), iodata, params, Keyword.t()) ::
          {:ok, Result.t()} | {:error, Exception.t()}
        when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
  def query(conn, statement, params \\ [], opts \\ []) do
    query = Query.build(statement, opts)

    with {:ok, _query, result} <- DBConnection.execute(conn, query, params, opts) do
      {:ok, result}
    end
  end

  @doc """
  Runs a query and returns the result or raises `Ch.Error` if
  there was an error. See `query/4`.
  """
  @spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t()
        when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
  def query!(conn, statement, params \\ [], opts \\ []) do
    query = Query.build(statement, opts)
    DBConnection.execute!(conn, query, params, opts)
  end

  @doc false
  @spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t()
  def stream(conn, statement, params \\ [], opts \\ []) do
    query = Query.build(statement, opts)
    DBConnection.stream(conn, query, params, opts)
  end

  @doc false
  @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
  def run(conn, f, opts \\ []) when is_function(f, 1) do
    DBConnection.run(conn, f, opts)
  end

  if Code.ensure_loaded?(Ecto.ParameterizedType) do
    @behaviour Ecto.ParameterizedType

    @impl true
    def type(params), do: {:parameterized, Ch, params}

    @impl true
    def init(opts) do
      clickhouse_type =
        opts[:raw] || opts[:type] ||
          raise ArgumentError, "keys :raw or :type not found in: #{inspect(opts)}"

      Ch.Types.decode(clickhouse_type)
    end

    @impl true
    def load(value, _loader, {:tuple, _types}), do: {:ok, value}
    def load(value, _loader, {:map, _key_type, _value_type}), do: {:ok, value}

    for type <- [:ipv4, :ipv6, :point, :ring, :polygon, :multipolygon] do
      def load(value, _loader, unquote(type)), do: {:ok, value}
    end

    def load(value, _loader, params), do: Ecto.Type.load(base_type(params), value)

    @impl true
    def dump(value, _dumper, {:tuple, types}) do
      process_tuple(types, value, &Ecto.Type.dump/2)
    end

    def dump(value, _dumper, {:map, key_type, value_type}) do
      process_map(value, key_type, value_type, &Ecto.Type.dump/2)
    end

    def dump(value, _dumper, :ipv4) do
      case value do
        {_, _, _, _} -> {:ok, value}
        nil -> {:ok, value}
        _other -> :error
      end
    end

    def dump(value, _loader, :ipv6) do
      case value do
        {_, _, _, _, _, _, _, _} -> {:ok, value}
        nil -> {:ok, value}
        _other -> :error
      end
    end

    def dump(value, _loader, :point) do
      case value do
        {x, y} when is_number(x) and is_number(y) -> {:ok, value}
        nil -> {:ok, value}
        _other -> :error
      end
    end

    def dump(value, _dumper, params), do: Ecto.Type.dump(base_type(params), value)

    @impl true
    def cast(value, {:tuple, types}) do
      with {:ok, value} <- process_tuple(types, value, &Ecto.Type.cast/2) do
        {:ok, List.to_tuple(value)}
      end
    end

    def cast(value, :ipv4) do
      case value do
        {_, _, _, _} -> {:ok, value}
        _ when is_binary(value) -> :inet.parse_ipv4_address(to_charlist(value))
        _ when is_list(value) -> :inet.parse_ipv4_address(value)
        nil -> {:ok, value}
        _ -> :error
      end
    end

    def cast(value, :ipv6) do
      case value do
        {_, _, _, _, _, _, _, _} -> {:ok, value}
        _ when is_binary(value) -> :inet.parse_ipv6_address(to_charlist(value))
        _ when is_list(value) -> :inet.parse_ipv6_address(value)
        nil -> {:ok, value}
        _ -> :error
      end
    end

    def cast(value, :point) do
      case value do
        {x, y} when is_number(x) and is_number(y) -> {:ok, value}
        nil -> {:ok, value}
        _ -> :error
      end
    end

    def cast(value, {:map, key_type, value_type}) do
      with {:ok, value} <- process_map(value, key_type, value_type, &Ecto.Type.cast/2) do
        {:ok, Map.new(value)}
      end
    end

    def cast(value, params), do: Ecto.Type.cast(base_type(params), value)

    @doc false
    def base_type(type)

    def base_type(t) when t in [:string, :boolean, :date], do: t
    def base_type(:date32), do: :date
    def base_type(:datetime), do: :naive_datetime
    def base_type(:uuid), do: Ecto.UUID

    # TODO
    def base_type({:enum8, _mappings}), do: :string
    def base_type({:enum16, _mappings}), do: :string

    for size <- [8, 16, 32, 64, 128, 256] do
      def base_type(unquote(:"i#{size}")), do: :integer
      def base_type(unquote(:"u#{size}")), do: :integer
    end

    for size <- [32, 64] do
      def base_type(unquote(:"f#{size}")), do: :float
    end

    def base_type({:array = a, type}), do: {a, base_type(type)}
    def base_type({:nullable, type}), do: base_type(type)
    def base_type({:low_cardinality, type}), do: base_type(type)
    def base_type({:simple_aggregate_function, _name, type}), do: base_type(type)
    def base_type({:fixed_string, _size}), do: :string
    def base_type({:datetime, "UTC"}), do: :utc_datetime
    def base_type({:datetime64, _precision}), do: :naive_datetime_usec
    def base_type({:datetime64, _precision, "UTC"}), do: :utc_datetime_usec
    def base_type({:decimal = d, _precision, _scale}), do: d

    for size <- [32, 64, 128, 256] do
      def base_type({unquote(:"decimal#{size}"), _scale}), do: :decimal
    end

    def base_type(:point = p), do: {:parameterized, Ch, p}
    def base_type(:ring), do: {:array, base_type(:point)}
    def base_type(:polygon), do: {:array, base_type(:ring)}
    def base_type(:multipolygon), do: {:array, base_type(:polygon)}

    def base_type({:parameterized, Ch, params}), do: base_type(params)

    defp process_tuple(types, values, mapper) when is_tuple(values) do
      process_tuple(types, Tuple.to_list(values), mapper, [])
    end

    defp process_tuple(types, values, mapper) when is_list(values) do
      process_tuple(types, values, mapper, [])
    end

    defp process_tuple(_types, nil = value, _mapper), do: {:ok, value}
    defp process_tuple(_types, _values, _napper), do: :error

    defp process_tuple([t | types], [v | values], mapper, acc) do
      case mapper.(base_type(t), v) do
        {:ok, v} -> process_tuple(types, values, mapper, [v | acc])
        :error = e -> e
      end
    end

    defp process_tuple([], [], _mapper, acc), do: {:ok, :lists.reverse(acc)}
    defp process_tuple(_types, _values, _mapper, _acc), do: :error

    defp process_map(value, key_type, value_type, mapper) when is_map(value) do
      process_map(Map.to_list(value), key_type, value_type, mapper)
    end

    defp process_map(value, key_type, value_type, mapper) when is_list(value) do
      process_map(value, base_type(key_type), base_type(value_type), mapper, [])
    end

    defp process_map(nil = value, _key_type, _value_type, _mapper), do: {:ok, value}

    defp process_map(_value, _key_type, _value_type, _mapper), do: :error

    defp process_map([{k, v} | kvs], key_type, value_type, mapper, acc) do
      with {:ok, k} <- mapper.(key_type, k),
           {:ok, v} <- mapper.(value_type, v) do
        process_map(kvs, key_type, value_type, mapper, [{k, v} | acc])
      else
        :error = e -> e
      end
    end

    defp process_map([], _key_type, _value_type, _mapper, acc), do: {:ok, :lists.reverse(acc)}
    defp process_map(_kvs, _key_type, _value_type, _mapper, _acc), do: :error

    @impl true
    def embed_as(_, _), do: :self

    @impl true
    def equal?(a, b, _), do: a == b
  end
end