lib/ch.ex

defmodule Ch do
  @moduledoc "Minimal HTTP ClickHouse client."
  alias Ch.{Connection, Query, Result}

  @typedoc """
  Options shared by connection start-up (`t:start_option/0`) and
  individual queries (`t:query_option/0`).
  """
  @type common_option ::
          {:database, String.t()}
          | {:username, String.t()}
          | {:password, String.t()}
          | {:settings, keyword()}
          | {:timeout, timeout()}

  @typedoc """
  Options accepted by `start_link/1` and `child_spec/1`.

  `:transport_opts` is a *list* of transport options, see
  `Mint.HTTP1.connect/4`.
  """
  @type start_option ::
          common_option
          | {:scheme, String.t()}
          | {:hostname, String.t()}
          | {:port, :inet.port_number()}
          | {:transport_opts, [:gen_tcp.connect_option()]}
          | DBConnection.start_option()

  @doc """
  Start the connection process and connect to ClickHouse.

  ## Options

    * `:scheme` - HTTP scheme, defaults to `"http"`
    * `:hostname` - server hostname, defaults to `"localhost"`
    * `:port` - HTTP port, defaults to `8123`
    * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
    * `:database` - Database, defaults to `"default"`
    * `:username` - Username
    * `:password` - User password
    * `:settings` - Keyword list of ClickHouse settings
    * `:timeout` - HTTP receive timeout in milliseconds
    * [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)

  """
  @spec start_link([start_option]) :: GenServer.on_start()
  def start_link(opts \\ []) do
    # All options are handed to DBConnection unchanged, with Ch.Connection
    # as the connection module.
    DBConnection.start_link(Connection, opts)
  end

  @doc """
  Builds the child specification for running a DBConnection pool under a
  supervisor.

  Accepts the same options as `start_link/1`.
  """
  @spec child_spec([start_option]) :: :supervisor.child_spec()
  def child_spec(opts), do: DBConnection.child_spec(Connection, opts)

  @typedoc """
  Options accepted by `query/4`, `query!/4` and `stream/4`.
  """
  @type query_option ::
          common_option
          | {:command, Query.command()}
          | {:headers, [{String.t(), String.t()}]}
          | {:format, String.t()}
          # TODO remove
          | {:encode, boolean()}
          | {:decode, boolean()}
          | DBConnection.connection_option()

  @doc """
  Executes `statement` on `conn`.

  Returns `{:ok, %Ch.Result{}}` on success or `{:error, Exception.t()}`
  if there was a database error.

  ## Options

    * `:database` - Database
    * `:username` - Username
    * `:password` - User password
    * `:settings` - Keyword list of settings
    * `:timeout` - Query request timeout
    * `:command` - Command tag for the query
    * `:headers` - Custom HTTP headers for the request
    * `:format` - Custom response format for the request
    * `:decode` - Whether to automatically decode the response
    * [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)

  """
  @spec query(DBConnection.conn(), iodata, params, [query_option]) ::
          {:ok, Result.t()} | {:error, Exception.t()}
        when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
  def query(conn, statement, params \\ [], opts \\ []) do
    built = Query.build(statement, opts)

    case DBConnection.execute(conn, built, params, opts) do
      # the query echoed back by DBConnection is of no use to the caller
      {:ok, _query, result} -> {:ok, result}
      # anything else is handed back untouched
      failure -> failure
    end
  end

  @doc """
  Same as `query/4`, but returns the result directly and raises
  `Ch.Error` if there was an error.
  """
  @spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t()
        when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
  def query!(conn, statement, params \\ [], opts \\ []) do
    DBConnection.execute!(conn, Query.build(statement, opts), params, opts)
  end

  @doc false
  @spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t()
  def stream(conn, statement, params \\ [], opts \\ []) do
    # Builds the query and bundles it with the connection, params and
    # options into a `Ch.Stream` struct.
    %Ch.Stream{conn: conn, params: params, opts: opts, query: Query.build(statement, opts)}
  end

  # TODO drop
  # Thin pass-through to `DBConnection.run/3`; `f` must be a 1-arity function.
  @doc false
  @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
  def run(conn, f, opts \\ []) when is_function(f, 1), do: DBConnection.run(conn, f, opts)

  if Code.ensure_loaded?(Ecto.ParameterizedType) do
    @behaviour Ecto.ParameterizedType

    # Wraps `params` in the parameterized-type tuple that names this module.
    @impl true
    def type(params) do
      {:parameterized, Ch, params}
    end

    # Resolves the ClickHouse type from `opts` and decodes it with
    # `Ch.Types.decode/1`. Raises `ArgumentError` when neither `:raw` nor
    # `:type` holds a truthy value.
    @impl true
    def init(opts) do
      # `:raw` takes precedence over `:type` when both are present
      if clickhouse_type = opts[:raw] || opts[:type] do
        Ch.Types.decode(clickhouse_type)
      else
        raise ArgumentError, "keys :raw or :type not found in: #{inspect(opts)}"
      end
    end

    # Loaded values are passed through unchanged.
    @impl true
    def load(value, _loader, _params) do
      {:ok, value}
    end

    # Dumped values are passed through unchanged.
    @impl true
    def dump(value, _dumper, _params) do
      {:ok, value}
    end

    # Scalar ClickHouse types are delegated to the matching Ecto type.
    @impl true
    def cast(value, :string), do: Ecto.Type.cast(:string, value)
    def cast(value, :boolean), do: Ecto.Type.cast(:boolean, value)
    def cast(value, :uuid), do: Ecto.Type.cast(Ecto.UUID, value)
    def cast(value, :date), do: Ecto.Type.cast(:date, value)
    def cast(value, :date32), do: Ecto.Type.cast(:date, value)
    def cast(value, :datetime), do: Ecto.Type.cast(:naive_datetime, value)
    def cast(value, {:datetime, "UTC"}), do: Ecto.Type.cast(:utc_datetime, value)
    def cast(value, {:datetime64, _p}), do: Ecto.Type.cast(:naive_datetime_usec, value)
    def cast(value, {:datetime64, _p, "UTC"}), do: Ecto.Type.cast(:utc_datetime_usec, value)
    def cast(value, {:fixed_string, _size}), do: Ecto.Type.cast(:string, value)

    # :i8, :u8, :i16, :u16, ... :i256, :u256
    for size <- [8, 16, 32, 64, 128, 256], prefix <- ["i", "u"] do
      def cast(value, unquote(:"#{prefix}#{size}")), do: Ecto.Type.cast(:integer, value)
    end

    for float <- [:f32, :f64] do
      def cast(value, unquote(float)), do: Ecto.Type.cast(:float, value)
    end

    def cast(value, {:decimal, _precision, _scale}), do: Ecto.Type.cast(:decimal, value)

    for decimal <- [:decimal32, :decimal64, :decimal128, :decimal256] do
      def cast(value, {unquote(decimal), _scale}), do: Ecto.Type.cast(:decimal, value)
    end

    # Arrays are cast as Ecto arrays of this parameterized type.
    def cast(value, {:array, type}), do: Ecto.Type.cast({:array, type(type)}, value)

    # Wrapper types are transparent: cast as the wrapped type.
    def cast(value, {:nullable, type}), do: cast(value, type)
    def cast(value, {:low_cardinality, type}), do: cast(value, type)
    def cast(value, {:simple_aggregate_function, _name, type}), do: cast(value, type)

    # Geo types are nested arrays that bottom out in :point.
    def cast(value, :ring), do: Ecto.Type.cast({:array, type(:point)}, value)
    def cast(value, :polygon), do: Ecto.Type.cast({:array, type(:ring)}, value)
    def cast(value, :multipolygon), do: Ecto.Type.cast({:array, type(:polygon)}, value)

    # Must stay after the clauses above (they hand nil to Ecto.Type.cast/2)
    # and before the remaining cast/2 clauses, which it shields from nil.
    def cast(nil, _params), do: {:ok, nil}

    # Enum8/Enum16: an integer is looked up in the second element of each
    # mapping, a binary in the first. The value is returned unchanged when
    # a mapping is found.
    def cast(value, {enum, mappings}) when enum in [:enum8, :enum16] do
      found =
        cond do
          is_integer(value) -> List.keyfind(mappings, value, 1)
          is_binary(value) -> List.keyfind(mappings, value, 0)
          true -> nil
        end

      case found do
        {_, _} -> {:ok, value}
        nil -> :error
      end
    end

    # IPv4: accepts an `{a, b, c, d}` tuple of octets (returned as-is), or a
    # binary / charlist that `:inet.parse_ipv4_address/1` can parse (the
    # parsed `{:ok, tuple}` is returned). Everything else is `:error`.
    def cast(value, :ipv4) do
      case value do
        # every octet must be an integer in 0..255; `in` on a range also
        # rejects floats, which `is_number/1` would let through
        {a, b, c, d} when a in 0..255 and b in 0..255 and c in 0..255 and d in 0..255 ->
          {:ok, value}

        _ when is_binary(value) ->
          # byte-wise conversion: unlike `to_charlist/1` it does not raise on
          # invalid UTF-8, so malformed input is simply rejected by the parser
          chars = :erlang.binary_to_list(value)
          with {:error = e, _reason} <- :inet.parse_ipv4_address(chars), do: e

        _ when is_list(value) ->
          with {:error = e, _reason} <- :inet.parse_ipv4_address(value), do: e

        _ ->
          :error
      end
    end

    # IPv6: accepts an 8-tuple of 16-bit segments (returned as-is), or a
    # binary / charlist that `:inet.parse_ipv6_address/1` can parse (the
    # parsed `{:ok, tuple}` is returned). Everything else is `:error`.
    def cast(value, :ipv6) do
      case value do
        # every segment must be an integer in 0..65535; `in` on a range also
        # rejects floats, which `is_number/1` would let through
        {a, s, d, f, g, h, j, k}
        when a in 0..65_535 and s in 0..65_535 and d in 0..65_535 and f in 0..65_535 and
               g in 0..65_535 and h in 0..65_535 and j in 0..65_535 and k in 0..65_535 ->
          {:ok, value}

        _ when is_binary(value) ->
          # byte-wise conversion: unlike `to_charlist/1` it does not raise on
          # invalid UTF-8, so malformed input is simply rejected by the parser
          chars = :erlang.binary_to_list(value)
          with {:error = e, _reason} <- :inet.parse_ipv6_address(chars), do: e

        _ when is_list(value) ->
          with {:error = e, _reason} <- :inet.parse_ipv6_address(value), do: e

        _ ->
          :error
      end
    end

    # Point: a 2-tuple of numbers, returned unchanged; anything else is `:error`.
    def cast(value, :point) do
      if match?({x, y} when is_number(x) and is_number(y), value) do
        {:ok, value}
      else
        :error
      end
    end

    # Composite types are cast element by element by `cast_tuple/2` and
    # `cast_map/3`.
    def cast(value, {:tuple, types}) do
      cast_tuple(types, value)
    end

    def cast(value, {:map, key_type, value_type}) do
      cast_map(value, key_type, value_type)
    end

    # Entry point for tuple casting: accepts the values as a tuple or a
    # list and hands them to the accumulating `cast_tuple/3`.
    defp cast_tuple(types, values) do
      cond do
        is_tuple(values) -> cast_tuple(types, Tuple.to_list(values), [])
        is_list(values) -> cast_tuple(types, values, [])
        true -> :error
      end
    end

    # Walks types and values in lockstep; cast elements are accumulated in
    # reverse and turned into a tuple once both lists run out together.
    defp cast_tuple([], [], acc), do: {:ok, acc |> Enum.reverse() |> List.to_tuple()}

    defp cast_tuple([type | rest_types], [value | rest_values], acc) do
      case cast(value, type) do
        :error -> :error
        {:ok, casted} -> cast_tuple(rest_types, rest_values, [casted | acc])
      end
    end

    # one side ran out before the other, or an argument is not a list
    defp cast_tuple(_types, _values, _acc), do: :error

    # Entry point for map casting: accepts a map or a list of pairs and
    # hands the pairs to the accumulating `cast_map/4`.
    defp cast_map(value, key_type, value_type) do
      cond do
        is_map(value) -> cast_map(Map.to_list(value), key_type, value_type, [])
        is_list(value) -> cast_map(value, key_type, value_type, [])
        true -> :error
      end
    end

    # Casts every key and value; the first result that is not `{:ok, _}`
    # is returned as-is and stops the traversal.
    defp cast_map([], _key_type, _value_type, acc), do: {:ok, Map.new(acc)}

    defp cast_map([{key, value} | rest], key_type, value_type, acc) do
      with {:ok, casted_key} <- cast(key, key_type),
           {:ok, casted_value} <- cast(value, value_type) do
        cast_map(rest, key_type, value_type, [{casted_key, casted_value} | acc])
      end
    end

    # the head is not a `{key, value}` pair, or the input is not a list
    defp cast_map(_kvs, _key_type, _value_type, _acc), do: :error

    # Always `:self`, whatever the format or params.
    @impl true
    def embed_as(_format, _params) do
      :self
    end

    # Plain `==` comparison; params are ignored.
    @impl true
    def equal?(a, b, _params) do
      a == b
    end

    # Renders the type as "#Ch<...>", with `Ch.Types.encode/1` supplying
    # the inner text.
    @impl true
    def format(params) do
      encoded = Ch.Types.encode(params)
      "#Ch<" <> to_string(encoded) <> ">"
    end
  end
end