lib/ecto/adapters/clickhouse.ex

defmodule Ecto.Adapters.ClickHouse do
  @moduledoc """
  Adapter module for ClickHouse.

  It uses `Ch` for communicating to the database.

  ## Options

  All options can be given via the repository
  configuration:

      config :your_app, YourApp.Repo,
        ...

    * `:hostname` - Server hostname (default: `"localhost"`)
    * `:username` - Username
    * `:password` - User password
    * `:port` - HTTP Server port (default: `8123`)
    * `:scheme` - HTTP scheme (default: `"http"`)
    * `:database` - the database to connect to (default: `"default"`)
    * `:settings` - Keyword list of connection settings
    * `:transport_opts` - Options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info

  """

  @behaviour Ecto.Adapter
  @behaviour Ecto.Adapter.Migration
  @behaviour Ecto.Adapter.Queryable
  @behaviour Ecto.Adapter.Schema
  @behaviour Ecto.Adapter.Storage
  @behaviour Ecto.Adapter.Structure

  @conn __MODULE__.Connection
  @driver :ch

  @impl Ecto.Adapter
  defmacro __before_compile__(_env) do
    quote do
      @doc """
      A convenience function for SQL-based repositories that executes the given query.

      See `Ecto.Adapters.SQL.query/4` for more information.
      """
      def query(sql, params \\ [], opts \\ []) do
        Ecto.Adapters.SQL.query(get_dynamic_repo(), sql, params, opts)
      end

      @doc """
      A convenience function for SQL-based repositories that executes the given query.

      See `Ecto.Adapters.SQL.query!/4` for more information.
      """
      def query!(sql, params \\ [], opts \\ []) do
        Ecto.Adapters.SQL.query!(get_dynamic_repo(), sql, params, opts)
      end

      @doc """
      A convenience function for SQL-based repositories that translates the given query to SQL.

      See `Ecto.Adapters.SQL.to_sql/3` for more information.
      """
      def to_sql(operation, queryable) do
        Ecto.Adapters.ClickHouse.to_sql(operation, queryable)
      end

      @doc """
      A convenience function for SQL-based repositories that forces all connections in the
      pool to disconnect within the given interval.

      See `Ecto.Adapters.SQL.disconnect_all/3` for more information.
      """
      def disconnect_all(interval, opts \\ []) do
        Ecto.Adapters.SQL.disconnect_all(get_dynamic_repo(), interval, opts)
      end

      @doc """
      Similar to `insert_all/2` but with the following differences:

        - accepts rows as streams or lists
        - sends rows as a chunked request
        - doesn't autogenerate ids or does any other preprocessing

      Example:

          Repo.query!("create table ecto_ch_demo(a UInt64, b String) engine Null")

          defmodule Demo do
            use Ecto.Schema

            @primary_key false
            schema "ecto_ch_demo" do
              field :a, Ch, type: "UInt64"
              field :b, :string
            end
          end

          rows = Stream.map(1..100_000, fn i -> %{a: i, b: to_string(i)} end)
          {100_000, nil} = Repo.insert_stream(Demo, rows)

          # schemaless
          {100_000, nil} = Repo.insert_stream("ecto_ch_demo", rows, types: [a: Ch.Types.u64(), b: :string])

      """
      def insert_stream(source_or_schema, rows, opts \\ []) do
        repo = get_dynamic_repo()
        # TODO need it?
        # opts = Ecto.Repo.Supervisor.tuplet(repo, prepare_opts(:insert_all, opts))
        Ecto.Adapters.ClickHouse.Schema.insert_stream(repo, source_or_schema, rows, opts)
      end
    end
  end

  @impl Ecto.Adapter
  def ensure_all_started(config, type) do
    Ecto.Adapters.SQL.ensure_all_started(@driver, config, type)
  end

  @impl Ecto.Adapter
  def init(config) do
    Ecto.Adapters.SQL.init(@conn, @driver, config)
  end

  @impl Ecto.Adapter
  def checkout(meta, opts, fun) do
    Ecto.Adapters.SQL.checkout(meta, opts, fun)
  end

  @impl Ecto.Adapter
  def checked_out?(meta) do
    Ecto.Adapters.SQL.checked_out?(meta)
  end

  @impl Ecto.Adapter
  # TODO cleanup
  def dumpers(:boolean, type), do: [type, &bool_encode/1]
  def dumpers(:uuid, Ecto.UUID), do: [&uuid_encode/1]
  def dumpers(:uuid, type), do: [type, &uuid_encode/1]
  def dumpers(_primitive, type), do: [type]

  defp bool_encode(1), do: {:ok, true}
  defp bool_encode(0), do: {:ok, false}
  defp bool_encode(x), do: {:ok, x}

  defp uuid_encode(uuid), do: Ecto.UUID.cast(uuid)

  @impl Ecto.Adapter
  # TODO cleanup
  def loaders(:binary_id, type), do: [Ecto.UUID, type]
  def loaders(:boolean, type), do: [&bool_decode/1, type]
  def loaders(:float, type), do: [&float_decode/1, type]
  def loaders(_primitive, type), do: [type]

  defp bool_decode(1), do: {:ok, true}
  defp bool_decode(0), do: {:ok, false}

  defp float_decode(%Decimal{} = decimal), do: {:ok, Decimal.to_float(decimal)}

  @impl Ecto.Adapter.Migration
  def supports_ddl_transaction?, do: false

  @impl Ecto.Adapter.Migration
  def lock_for_migrations(_meta, _options, f), do: f.()

  @impl Ecto.Adapter.Migration
  def execute_ddl(meta, definition, opts) do
    Ecto.Adapters.SQL.execute_ddl(meta, @conn, definition, opts)
  end

  @impl Ecto.Adapter.Storage
  defdelegate storage_up(opts), to: Ecto.Adapters.ClickHouse.Storage

  @impl Ecto.Adapter.Storage
  defdelegate storage_down(opts), to: Ecto.Adapters.ClickHouse.Storage

  @impl Ecto.Adapter.Storage
  defdelegate storage_status(opts), to: Ecto.Adapters.ClickHouse.Storage

  @impl Ecto.Adapter.Structure
  defdelegate structure_dump(default, config), to: Ecto.Adapters.ClickHouse.Structure

  @impl Ecto.Adapter.Structure
  defdelegate structure_load(default, config), to: Ecto.Adapters.ClickHouse.Structure

  @impl Ecto.Adapter.Structure
  def dump_cmd(_args, _opts, _config) do
    raise "not implemented"
  end

  @impl Ecto.Adapter.Schema
  def autogenerate(:id), do: nil
  def autogenerate(:embed_id), do: Ecto.UUID.generate()
  def autogenerate(:binary_id), do: Ecto.UUID.bingenerate()

  @impl Ecto.Adapter.Schema
  def insert_all(
        adapter_meta,
        schema_meta,
        header,
        rows,
        on_conflict,
        returning,
        placeholders,
        opts
      ) do
    Ecto.Adapters.ClickHouse.Schema.insert_all(
      adapter_meta,
      schema_meta,
      header,
      rows,
      on_conflict,
      returning,
      placeholders,
      opts
    )
  end

  @impl Ecto.Adapter.Schema
  def insert(adapter_meta, schema_meta, params, _, _, opts) do
    Ecto.Adapters.ClickHouse.Schema.insert(adapter_meta, schema_meta, params, opts)
  end

  @dialyzer {:no_return, update: 6}
  @impl Ecto.Adapter.Schema
  def update(adapter_meta, %{source: source, prefix: prefix}, fields, params, returning, opts) do
    {fields, field_values} = :lists.unzip(fields)
    filter_values = Keyword.values(params)
    sql = @conn.update(prefix, source, fields, params, returning)

    Ecto.Adapters.SQL.struct(
      adapter_meta,
      @conn,
      sql,
      :update,
      source,
      params,
      field_values ++ filter_values,
      :raise,
      returning,
      opts
    )
  end

  @impl Ecto.Adapter.Schema
  def delete(adapter_meta, schema_meta, params, opts) do
    Ecto.Adapters.ClickHouse.Schema.delete(adapter_meta, schema_meta, params, opts)
  end

  @impl Ecto.Adapter.Queryable
  def stream(adapter_meta, query_meta, query, params, opts) do
    Ecto.Adapters.SQL.stream(adapter_meta, query_meta, query, params, opts)
  end

  @impl Ecto.Adapter.Queryable
  def prepare(operation, query), do: {:nocache, {operation, query}}

  @impl Ecto.Adapter.Queryable
  def execute(adapter_meta, query_meta, {:nocache, {operation, query}}, params, opts) do
    sql = prepare_sql(operation, query, params)

    opts =
      case operation do
        :all ->
          [{:command, :select} | put_setting(opts, :readonly, 1)]

        :delete_all ->
          [{:command, :delete} | opts]
      end

    result = Ecto.Adapters.SQL.query!(adapter_meta, sql, params, put_source(opts, query_meta))

    case operation do
      :all ->
        %{num_rows: num_rows, rows: rows} = result
        {num_rows, rows}

      :delete_all ->
        # clickhouse doesn't give us any info on how many rows have been deleted
        {0, nil}
    end
  end

  @doc false
  def to_sql(operation, queryable) do
    queryable =
      queryable
      |> Ecto.Queryable.to_query()
      |> Ecto.Query.Planner.ensure_select(operation == :all)

    {query, _cast_params, dump_params} =
      Ecto.Adapter.Queryable.plan_query(operation, Ecto.Adapters.ClickHouse, queryable)

    sql = Ecto.Adapters.ClickHouse.prepare_sql(operation, query, dump_params)
    {IO.iodata_to_binary(sql), dump_params}
  end

  defp put_setting(opts, key, value) do
    setting = {key, value}
    Keyword.update(opts, :settings, [setting], fn settings -> [setting | settings] end)
  end

  @doc false
  def prepare_sql(:all, query, params), do: @conn.all(query, params)
  def prepare_sql(:update_all, query, params), do: @conn.update_all(query, params)
  def prepare_sql(:delete_all, query, params), do: @conn.delete_all(query, params)

  defp put_source(opts, %{sources: sources}) when is_binary(elem(elem(sources, 0), 0)) do
    {source, _, _} = elem(sources, 0)
    [source: source] ++ opts
  end

  defp put_source(opts, _) do
    opts
  end
end