lib/ecto/adapters/qlc.ex

defmodule EctoQLC.Adapters.QLC do
  @moduledoc ~S"""
  This application provides functionality for working with Erlang databases in `Ecto`.

  ## Built-in adapters

    * `EctoQLC.Adapters.DETS` for [`dets`](https://www.erlang.org/doc/man/dets.html)
    * `EctoQLC.Adapters.ETS` for [`ets`](https://www.erlang.org/doc/man/ets.html)
    * `EctoQLC.Adapters.Mnesia` for [`mnesia`](https://www.erlang.org/doc/man/mnesia.html)

  ## Migrations

  `ecto_qlc` supports `ecto_sql` database migrations, currently none of the adapters support constraints, unique index or multi column index.
  """
  require Kernel
  require Logger
  alias Ecto.Migration.Table
  alias Ecto.Migration.Index
  alias Ecto.Migration.Constraint

  @doc false
  defmacro __using__(opts) do
    quote do
      @behaviour Ecto.Adapter
      @behaviour Ecto.Adapter.Migration
      @behaviour Ecto.Adapter.Structure
      @behaviour Ecto.Adapter.Queryable
      @behaviour Ecto.Adapter.Schema
      @behaviour Ecto.Adapter.Storage
      @behaviour Ecto.Adapter.Transaction

      alias __MODULE__

      opts = unquote(opts)
      @driver Keyword.fetch!(opts, :driver)

      @impl Ecto.Adapter
      def ensure_all_started(_config, _type) do
        case Application.ensure_all_started(@driver) do
          {:ok, []} -> {:ok, [@driver]}
          {:ok, [@driver]} -> {:ok, [@driver]}
          {:error, _reason} -> {:ok, [@driver]}
        end
      end

      @impl Ecto.Adapter
      def init(config) do
        EctoQLC.Adapters.QLC.init(config, @driver)
      end

      @impl Ecto.Adapter
      def checkout(%{pid: pid}, _opts, fun) do
        Process.put({__MODULE__, pid}, true)
        result = fun.()
        Process.delete({__MODULE__, pid})
        result
      end

      @impl Ecto.Adapter
      def checked_out?(%{pid: pid} = _adapter_meta) do
        Process.get({__MODULE__, pid}) != nil
      end

      @impl Ecto.Adapter
      defmacro __before_compile__(_env), do: :ok

      @impl Ecto.Adapter
      def loaders({:map, _}, type),   do: [&Ecto.Type.embedded_load(type, &1, :json)]
      def loaders(match, type) when match in ~w[binary_id embed_id]a, do: [Ecto.UUID, type]
      def loaders(_, type), do: [type]

      @impl Ecto.Adapter
      def dumpers({:map, _}, type),   do: [&Ecto.Type.embedded_load(type, &1, :json)]
      def dumpers({:array, {:array, value}}, _type), do: [{:in, value}]
      def dumpers(match, type) when match in ~w[binary_id embed_id]a, do: [type, Ecto.UUID]
      def dumpers(_, type), do: [type]

      @impl Ecto.Adapter.Queryable
      def prepare(operation, %Ecto.Query{} = query) do
        {:nocache, {operation, query}}
      end

      @impl Ecto.Adapter.Queryable
      def execute(adapter_meta, query_meta, {:nocache, query}, params, options) do
        EctoQLC.Adapters.QLC.execute(adapter_meta, query_meta, query, params, options)
      end

      @impl Ecto.Adapter.Queryable
      def stream(adapter_meta, query_meta, {:nocache, query}, params, options) do
        EctoQLC.Adapters.QLC.stream(adapter_meta, query_meta, query, params, options)
      end

      @impl Ecto.Adapter.Schema
      def delete(adapter_meta, schema_meta, filters, returning, options) do
        EctoQLC.Adapters.QLC.delete(@driver, adapter_meta, schema_meta, filters, returning, options)
      end

      @impl Ecto.Adapter.Schema
      def insert(adapter_meta, schema_meta, fields, on_conflict, returning, options) do
        EctoQLC.Adapters.QLC.insert(@driver, adapter_meta, schema_meta, fields, on_conflict, returning, options)
      end

      @impl Ecto.Adapter.Schema
      def insert_all(adapter_meta, schema_meta, header, list, on_conflict, returning, placeholders, options) do
        EctoQLC.Adapters.QLC.insert_all(@driver, adapter_meta, schema_meta, header, list, on_conflict, returning, placeholders, options)
      end

      @impl Ecto.Adapter.Schema
      def update(adapter_meta, schema_meta, fields, filters, returning, options) do
        EctoQLC.Adapters.QLC.update(@driver, adapter_meta, schema_meta, fields, filters, returning, options)
      end

      @impl Ecto.Adapter.Schema
      # TODO :id should be based on the table size, since restarting the system can give the same id
      def autogenerate(:id), do: System.unique_integer([:positive, :monotonic])
      def autogenerate(:embed_id), do: Ecto.UUID.generate()
      def autogenerate(:binary_id), do: Ecto.UUID.bingenerate()

      @impl Ecto.Adapter.Migration
      def execute_ddl(adapter_meta, command, options) do
        EctoQLC.Adapters.QLC.execute_ddl(adapter_meta, command, options)
      end

      @impl Ecto.Adapter.Migration
      def lock_for_migrations(adapter_meta, options, fun) do
        EctoQLC.Adapters.QLC.lock_for_migrations(adapter_meta, options, fun)
      end

      @impl Ecto.Adapter.Migration
      def supports_ddl_transaction?(), do: false

      @impl Ecto.Adapter.Transaction
      def transaction(adapter_meta, opts, fun) do
        Process.put({adapter_meta.pid, :transaction}, true)
        v = fun.()
        Process.put({adapter_meta.pid, :transaction}, false)
        {:ok, v}
      end

      @impl Ecto.Adapter.Transaction
      def in_transaction?(adapter_meta), do: Process.get({adapter_meta.pid, :transaction}, false)

      @impl Ecto.Adapter.Transaction
      def rollback(adapter_meta, %_schema{} = value) do
        if in_transaction?(adapter_meta) do
          throw(value)
        else
          raise "cannot call rollback outside of transaction"
        end
      end

      @impl Ecto.Adapter.Structure
      def dump_cmd(args, opts, config) do
        EctoQLC.Adapters.QLC.dump_cmd(args, opts, config, @driver)
      end

      @impl Ecto.Adapter.Structure
      def structure_dump(default, config) do
        EctoQLC.Adapters.QLC.structure_dump(default, config, @driver)
      end

      @impl Ecto.Adapter.Structure
      def structure_load(default, config) do
        EctoQLC.Adapters.QLC.structure_load(default, config, @driver)
      end

      defoverridable [prepare: 2, execute: 5, stream: 5, execute_ddl: 3, loaders: 2, dumpers: 2, checked_out?: 1, checkout: 3, autogenerate: 1, ensure_all_started: 2, __before_compile__: 1, lock_for_migrations: 3, supports_ddl_transaction?: 0, transaction: 3, in_transaction?: 1, rollback: 2]
    end
  end

  @doc false
  def dump_cmd(_args, _opts, _config, _driver) do
    {"not_implemnted", 1}
  end

  @doc false
  def structure_dump(_default, _config, _driver) do
    {:error, "not_implemnted"}
  end

  @doc false
  def structure_load(_default, _config, _driver) do
    {:error, "not_implemnted"}
  end

  @doc false
  def init(config, :mnesia = driver) do
    dir = '#{Keyword.fetch!(config, :dir)}'
    File.mkdir_p!(dir)
    Application.put_env(:mnesia, :dir, dir)
    log = Keyword.get(config, :log, :debug)
    stacktrace = Keyword.get(config, :stacktrace, nil)
    telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
    telemetry = {config[:repo], log, telemetry_prefix ++ [:query]}
    {:ok, DynamicSupervisor.child_spec(strategy: :one_for_one, name: Module.concat([config[:repo], driver])), %{telemetry: telemetry, stacktrace: stacktrace, opts: config}}
  end

  def init(config, driver) do
    log = Keyword.get(config, :log, :debug)
    stacktrace = Keyword.get(config, :stacktrace, nil)
    telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
    telemetry = {config[:repo], log, telemetry_prefix ++ [:query]}
    {:ok, DynamicSupervisor.child_spec(strategy: :one_for_one, name: Module.concat([config[:repo], driver])), %{telemetry: telemetry, stacktrace: stacktrace, opts: config}}
  end

  @aggregates ~w[avg count max min sum]a
  @operators ~w[or and > < >= <= == === != + - * /]a

  @doc false
  def execute(%{adapter: EctoQLC.Adapters.Mnesia} = adapter_meta, query_meta, {operator, query}, params, options) do
    ## TODO Improve error when database does not exists
    prepareed = prepare(adapter_meta, query_meta, query, params, options)
    qlc = to_qlc(operator, prepareed)
    :mnesia.transaction(fn ->
      if lock = elem(prepareed, 2).lock do
        :mnesia.lock(elem(lock, 0), elem(lock, 1))
      end
      query_handle = to_query_handle(operator, prepareed, qlc)
      {query_time, values} = :timer.tc(:qlc, :eval, [query_handle, []])
      {decode_time, value} = :timer.tc(__MODULE__, :select, [values, operator, prepareed])
      log(value, get_source(query.sources), qlc, query_time, decode_time, 0, 0, operator, adapter_meta.telemetry, params, query, options ++ adapter_meta.opts)
    end)
    |> elem(1)
  end
  def execute(adapter_meta, query_meta, {operator, query}, params, options) do
    prepareed = prepare(adapter_meta, query_meta, query, params, options)
    qlc = to_qlc(operator, prepareed)
    query_handle = to_query_handle(operator, prepareed, qlc)
    {query_time, values} = :timer.tc(:qlc, :eval, [query_handle, []])
    {decode_time, value} = :timer.tc(__MODULE__, :select, [values, operator, prepareed])
    log(value, get_source(query.sources), qlc, query_time, decode_time, 0, 0, operator, adapter_meta.telemetry, params, query, options ++ adapter_meta.opts)
  end

  @doc false
  def stream(adapter_meta, query_meta, {operator, query}, params, options) do
    key = :erlang.timestamp
    prepareed = prepare(adapter_meta, query_meta, query, params, options)
    qlc = to_qlc(operator, prepareed)
    query_handle = to_query_handle(operator, prepareed, qlc)
    Stream.resource(
      fn ->
      Process.put(key, :erlang.timestamp)
      :qlc.cursor(query_handle, elem(prepareed, 4))
      end,
      fn cursor ->
        case :qlc.next_answers(cursor, options[:max_rows] || 500) do
          []   -> {:halt, cursor}
          rows ->  {[select(rows, :all, prepareed)], cursor}
        end
      end,
      fn cursor ->
        result = :qlc.delete_cursor(cursor)
        query_time = :timer.now_diff(:erlang.timestamp, Process.get(key))
        log(result, get_source(query.sources), qlc, query_time, 0, 0, 0, operator, adapter_meta.telemetry, params, query, options ++ adapter_meta.opts)
      end
    )
  end

  @doc false
  def get_source({%Ecto.SubQuery{} = subquery}), do: get_source(subquery.query.sources)
  def get_source({source, _module_, _prefix}) when is_binary(source), do: source
  def get_source(source) when is_tuple(source), do: get_source(elem(source, 0))

  @creates [:create, :create_if_not_exists]
  @drops [:drop, :drop_if_exists]

  @doc false
  def execute_ddl(adapter_meta, {_command, %Constraint{}}, _options) do
    {:ok, [{:warn, "#{adapter_meta.adapter} adapter does not support CONSTRAINT commands", []}]}
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.DETS} = adapter_meta, {command, %Table{} = table, _columns}, options) when command in @creates do
    options = Keyword.merge(adapter_meta.opts, List.wrap(table.options) ++ options)
    table = to_table(adapter_meta, table.name, table.prefix, options)
    file = if dir = Application.get_env(:dets, :dir), do: Path.join(dir, "#{table}"), else: table
    options = Keyword.take(Keyword.merge([file: '#{file}'],  options), ~w[access auto_save estimated_no_objects file max_no_slots min_no_slots keypos ram_file repair type]a)
    case :dets.open_file(table, options) do
      {:ok, ^table}   ->
        Enum.map(:dets.all, &:dets.sync/1)
        {:ok, []}
      {:error, reason} -> {:ok, [{:warn, "#{inspect(:mnesia.error_description(reason))}", []}]}
    end
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.ETS} = adapter_meta, {command, %Table{} = table, _columns}, options) when command in @creates do
    options = Keyword.merge([write_concurrency: true, read_concurrency: true], Keyword.merge(adapter_meta.opts, List.wrap(table.options) ++ options))
    options = [:set, :public, :named_table, {:heir, GenServer.whereis(:__ecto_qlc__) || self(), %{}}] ++ Keyword.take(options, ~w[write_concurrency read_concurrency keypos heir heir decentralized_counters compressed]a)
    table = to_table(adapter_meta, table.name, table.prefix, options)
    with :undefined <- :ets.info(table),
         ^table     <- :ets.new(table, options) do
      {:ok, []}
        else
        _ ->
      {:ok, []}
    end
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.Mnesia} = adapter_meta, {command, %Table{} = table, columns}, options) when command in @creates do
    options = if "schema_migrations" == table.name do
      [disc_only_copies: [node() | Node.list]]
    else
      Keyword.take(Keyword.merge(adapter_meta.opts, List.wrap(table.options) ++ options), ~w[disc_copies access_mode disc_only_copies index load_order majority ram_copies record_name snmp storage_properties type local_content]a)
    end
    table = to_table(adapter_meta, table.name, table.prefix, options)
    primary_keys = Enum.count(columns, fn {_command, _column, _type, options} -> options[:primary_key] == true end)
    attributes = Enum.reject(columns, fn {_command, _column, _type, options} -> options[:primary_key] == true end) |> Enum.map(&elem(&1, 1))
    attributes = if primary_keys > 1, do: [:primary_keys | attributes], else: Enum.map(columns, &elem(&1, 1))

    with :ok <- :mnesia.start(),
         {:atomic, :ok} <- :mnesia.create_table(table, [{:attributes, attributes} | options]) do
          {:ok, []}
    else
      {:aborted, {:already_exists, ^table}} -> {:ok, []}

      {status, reason} when status in ~w[error aborted]a  -> {:ok, [{:warn, "#{inspect(:mnesia.error_description(reason))}", []}]}
    end
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.DETS} = adapter_meta, {command, %Table{} = table, _columns}, options) when command in @drops do
    table = to_table(adapter_meta, table.name, table.prefix, options)
    case :dets.close(table) do
      :ok -> {:ok, []}
      {:error, resoan} -> {:ok, [{:warn, "#{inspect(resoan)}", []}]}
    end
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.ETS} = adapter_meta, {command, %Table{} = table, _columns}, options) when command in @drops do
    table = to_table(adapter_meta, table.name, table.prefix, options)
    :ets.delete(table)
    {:ok, []}
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.Mnesia} = adapter_meta, {command, %Table{} = table, _columns}, options) when command in @drops do
    case :mnesia.delete_table(to_table(adapter_meta, table.name, table.prefix, options)) do
      {:atomic, :ok} -> {:ok, []}
      {:aborted, resoan} -> {:ok, [{:warn, "#{inspect(resoan)}", []}]}
    end
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.Mnesia} = adapter_meta, {:alter, %Table{} = table, changes}, options) do
    table = to_table(adapter_meta, table.name, table.prefix, options)
    attributes = :mnesia.table_info(table, :attributes)
    new_attributes = Enum.reduce(changes, attributes, fn change, attributes -> update_attributes(change, attributes) end)
    with true <- attributes != new_attributes,
      {:atomic, :ok} <- :mnesia.transform_table(table, &Enum.reduce(changes, &1, fn change, row -> update_row(adapter_meta, row, change, attributes) end), new_attributes) do
      {:ok, []}
    else
      false -> {:ok, []}
      {status, reason} when status in ~w[error aborted]a -> {:ok, [{:warn, "#{inspect(:mnesia.error_description(reason))}", []}]}
    end
  end
  def execute_ddl(adapter_meta, {:alter, %Table{} = _table, _changes}, _options) do
    {:ok, [{:warn, "#{adapter_meta.adapter} adapter does not support alter", []}]}
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.Mnesia} = adapter_meta, {command, %Index{columns: [column]} = index}, options) when command in @creates do
    case :mnesia.add_table_index(to_table(adapter_meta, index.name, index.prefix, options), column) do
      {:atomic, :ok} -> {:ok, []}
      {:aborted, {:already_exists, _table, _}} when command == :create_if_not_exists -> {:ok, []}
      {:aborted, {:already_exists, _table, _}} -> raise "index already exists"
      {:aborted, resoan} -> {:ok, [{:warn, "#{inspect(resoan)}", []}]}
    end
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.Mnesia} = adapter_meta, {command, %Index{columns: [column]} = index, _mode}, options) when command in @drops do
    case :mnesia.del_table_index(to_table(adapter_meta, index.name, index.prefix, options), column) do
      {:atomic, :ok} -> {:ok, []}
      {:aborted, {:no_exists, _table, _}} when command == :drop_if_exists -> {:ok, []}
      {:aborted, {:no_exists, _table, _}} -> raise "index does not exists"
      {:aborted, resoan} -> {:ok, [{:warn, "#{inspect(resoan)}", []}]}
    end
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.Mnesia}, {_command, %Index{}}, _options) do
    {:ok, [{:warn, "Mnesia adapter does not support index with multiply columns", []}]}
  end
  def execute_ddl(adapter_meta, {_command, %Index{}}, _options) do
    {:ok, [{:warn, "#{adapter_meta.adapter} adapter does not support index", []}]}
  end
  def execute_ddl(adapter_meta, {_command, %Index{}, _mode}, _options) do
    {:ok, [{:warn, "#{adapter_meta.adapter} adapter does not support index", []}]}
  end
  def execute_ddl(adapter_meta, {:rename, %Table{} = _current_table, %Table{} = _new_table}, _options) do
    # Since table name always stays the same then we need to copy the table into a newley created one.
    # There might be some limitations with also being aware of indexes
    # So for now we just gonna warn
    # current_table = to_table(adapter_meta, current_table.name, current_table.prefix, options)
    # new_table = to_table(adapter_meta, new_table.name, new_table.prefix, options)
    # case :mnesia.transform_table(current_table, &:erlang.setelement(1, &1, new_table), :mnesia.table_info(current_table, :attributes), new_table) do
    #   {:atomic, :ok} -> {:ok, []}
    #   {status, reason} when status in ~w[error aborted]a -> {:ok, [{:warn, "#{inspect(:mnesia.error_description(reason))}", []}]}
    # end
    {:ok, [{:warn, "#{adapter_meta.adapter} adapter does not support RENAME table commands", []}]}
  end
  def execute_ddl(%{adapter: EctoQLC.Adapters.Mnesia} = adapter_meta, {:rename, %Table{} = table, current_column, new_column}, options) do
    table = to_table(adapter_meta, table.name, table.prefix, options)
    attributes = Enum.map(:mnesia.table_info(table, :attributes), fn
      ^current_column -> new_column
      column -> column
    end)
    case :mnesia.transform_table(table, &(&1), attributes) do
      {:atomic, :ok} -> {:ok, []}
      {status, reason} when status in ~w[error aborted]a -> {:ok, [{:warn, "#{inspect(:mnesia.error_description(reason))}", []}]}
    end
  end
  def execute_ddl(adapter_meta, {:rename, %Table{} = _table, _current_column, _new_column}, _options) do
    {:ok, [{:warn, "#{adapter_meta.adapter} adapter does not support RENAME column commands", []}]}
  end
  def execute_ddl(adapter_meta, command, _options) when is_binary(command) do
    raise "#{adapter_meta.adapter} adapter does not support binary in execute"
  end
  def execute_ddl(adapter_meta, command, _options) when is_list(command) do
    raise "#{adapter_meta.adapter} adapter does not support keyword lists in execute"
  end

  @doc false
  def lock_for_migrations(%{adapter: EctoQLC.Adapters.Mnesia}, _options, fun) do
    with :ok   <- :global.sync(),
         :ok   <- :mnesia.start(),
         value when value != :aborted <- :global.trans({:lock_for_migrations, __MODULE__}, fun),
         v when v in [:ok, {:error, :no_such_log}] <- :mnesia.sync_log(),
         true  <- :global.del_lock({:lock_for_migrations, __MODULE__}) do
      value
    else
      reason -> {:error, reason}
    end
  end
  def lock_for_migrations(%{adapter: EctoQLC.Adapters.DETS}, _options, fun) do
    with :ok   <- :global.sync(),
      value when value != :aborted <- :global.trans({:lock_for_migrations, __MODULE__}, fun),
      _     <- Enum.map(:dets.all, &:dets.sync/1),
      true  <- :global.del_lock({:lock_for_migrations, __MODULE__}) do
      value
    else
      reason -> {:error, reason}
    end
  end
  def lock_for_migrations(_adapter_meta, _options, fun) do
    with :ok   <- :global.sync(),
      value when value != :aborted <- :global.trans({:lock_for_migrations, __MODULE__}, fun),
      true  <- :global.del_lock({:lock_for_migrations, __MODULE__}) do
      value
    else
      reason -> {:error, reason}
    end
  end

  defp update_attributes({command, column, _type, _options}, attributes) when command in [:remove_if_exists, :remove] do
    attributes -- [column]
  end
  defp update_attributes({command, column, _type, _options}, attributes) when command in [:add_if_not_exists, :add] do
    if column in attributes do
      attributes
    else
      attributes ++ [column]
    end
  end
  defp update_attributes(_change, attributes), do: attributes

  @doc false
  def update_row(_adapter_meta, row, {:add, _column, _type, options}, _attributes) do
    Tuple.append(row, options[:default])
  end
  def update_row(_adapter_meta, row, {:add_if_not_exists, column, _type, options}, attributes) do
    if column not in attributes, do: Tuple.append(row, options[:default]), else: row
  end
  def update_row(_adapter_meta, row, {:remove, column, _type, _options}, attributes) do
    Tuple.delete_at(row, Enum.find_index(attributes, &(&1 == column)))
  end
  def update_row(_adapter_meta, row, {:remove_if_exists, column, _type, _options}, attributes) do
    if column in attributes, do: Tuple.delete_at(row, Enum.find_index(attributes, &(&1 == column))), else: row
  end
  def update_row(_adapter_meta, row, {:modify, column, type, options}, attributes) do
    idx = Enum.find_index(attributes, &(&1 == column))
    :erlang.setelement(idx, row, cast(elem(row, idx), options[:from], type))
  end
  def update_row(adapter_meta, schema, fields, row) do
    Enum.reduce(fields, row, fn {column, value}, row ->
      idx = get_index(adapter_meta, column, schema.__schema__(:fields), schema.__schema__(:primary_key))
      :erlang.setelement(idx, row, value)
    end)
  end

  defp get_key([], [primary_key | _ ], fields), do: fields[primary_key]
  defp get_key([primary_key], _columns, fields), do: fields[primary_key]
  defp get_key(primary_keys, _columns, fields), do: Enum.reduce(primary_keys, {}, &Tuple.insert_at(&2, tuple_size(&2), fields[&1]))

  defp cast(value, _from, :list), do: '#{value}'
  defp cast(value, _from, :string), do: "#{value}"
  defp cast(value, _from, :integer) when is_binary(value) or is_list(value) do
    case Integer.parse("#{value}") do
      {integer, ""} -> integer
      result -> raise "Could no parse #{value} to integer got: #{inspect(result)}"
    end
  end
  defp cast(value, _from, :float) when is_binary(value) or is_list(value) do
    case Float.parse("#{value}") do
      {float, ""} -> float
      result -> raise "Could no parse #{value} to float got: #{inspect(result)}"
    end
  end

  defp bindings(params, bindings \\ :erl_eval.new_bindings()) do
    params
    |> Enum.reduce({length(bindings), bindings}, fn v, {count, bindings} ->
      count = count + 1
      {count, :erl_eval.add_binding(:"PARAM#{count}", v, bindings)}
    end)
    |> elem(1)
  end

  @doc false
  def coalesce(nil, nil), do: nil
  def coalesce(nil, right), do: right
  def coalesce(left, nil), do: left
  def coalesce(left, _), do: left

  @doc false
  def like(left, right) do
    String.match?(left, Regex.compile!(right))
  end

  @doc false
  def ilike(left, right) do
    String.match?(left, Regex.compile!(right, [:caseless]))
  end

  @doc false
  def to_match_spec(adapter_meta, schema, filters) do
    primary_key = schema.__schema__(:primary_key)
    columns = schema.__schema__(:fields) -- primary_key
    key = if length(primary_key) > 1, do: Enum.reduce(primary_key, {}, &Tuple.insert_at(&2, tuple_size(&2), filters[&1])), else: filters[hd(primary_key)]
    row = if adapter_meta.adapter == EctoQLC.Adapters.Mnesia, do: {to_table(adapter_meta, schema.__schema__(:source), schema.__schema__(:prefix), []), key}, else: {key}
    head = Enum.reduce(columns, row, fn column, head -> Tuple.insert_at(head, tuple_size(head), filters[column] || :"$#{tuple_size(head)}") end)
    body = if adapter_meta.adapter == EctoQLC.Adapters.Mnesia, do: [{head}], else: [true]
    conditions = []
    match_spec = [{head, conditions, body}]
    case :ets.test_ms(head, match_spec) do
      {:error, reason} -> raise RuntimeError, "invalid MatchSpec: #{inspect reason}"
      _ -> match_spec
    end
  end

  @doc false
  def prepare(%{adapter: adapter}, _query_meta, %Ecto.Query{lock: lock} = query, _params, _options) when not is_nil(lock) and adapter != EctoQLC.Adapters.Mnesia do
    raise Ecto.QueryError, query: query, message: "#{List.last(Module.split(adapter))} adapter does not support locks"
  end
  def prepare(_adapter_meta, _query_meta, %Ecto.Query{with_ctes: with_ctes} = query, _params, _options) when not is_nil(with_ctes) do
    raise Ecto.QueryError, query: query, message: "QLC adapter does not support CTE"
  end
  def prepare(_adapter_meta, _query_meta, %Ecto.Query{windows: windows} = query, _params, _options) when windows != [] do
    raise Ecto.QueryError, query: query, message: "QLC adapter does not support windows"
  end
  def prepare(_adapter_meta, _query_meta, %Ecto.Query{combinations: combinations} = query, _params, _options) when combinations != [] do
    raise Ecto.QueryError, query: query, message: "QLC adapter does not support combinations like: #{Enum.map_join(combinations, ", ", fn {k, _} -> k end)}"
  end
  def prepare(adapter_meta, query_meta, %Ecto.Query{} = query, params, options) do
    if query.select && Enum.any?(query.select.fields, &has_fragment/1), do: raise(Ecto.QueryError, query: query, message: "QLC adapter does not support fragemnt in select clauses")
    if query.wheres |> Enum.flat_map(&(&1.subqueries)) |> Enum.any?(&has_parent_as/1), do: raise(Ecto.QueryError, query: query, message: "QLC adapter does not support parent_as in a subquery's where clauses")
    options = options(query, options)
    prefix = options[:prefix] || query.from.prefix || query.prefix
    order_bys = if query.distinct && Keyword.keyword?(query.distinct.expr), do: [query.distinct | query.order_bys], else: query.order_bys
    {adapter_meta, query_meta, %{query |
      order_bys: order_bys,
      group_bys: group_bys(query),
      updates: updates(adapter_meta, query, params),
      offset: offset(query, params),
      lock: lock(adapter_meta, query, prefix),
      limit: limit(adapter_meta, query, prefix, params),
     }, params, options}
  end

  defp lock(_adapter_meta, %Ecto.Query{lock: nil}, _prefix), do: nil
  defp lock(adapter_meta, %Ecto.Query{lock: "write", from: %{source: {source, _module}}}, prefix), do: {{:table, to_table(adapter_meta, source, prefix, [])}, :write}
  defp lock(adapter_meta, %Ecto.Query{lock: "read", from: %{source: {source, _module}}}, prefix), do: {{:table, to_table(adapter_meta, source, prefix, [])}, :read}
  defp lock(adapter_meta, %Ecto.Query{lock: "sticky_write", from: %{source: {source, _module}}}, prefix), do: {{:table, to_table(adapter_meta, source, prefix, [])}, :sticky_write}
  defp lock(_adapter_meta, %Ecto.Query{lock: lock} = query, _prefix), do: raise(Ecto.QueryError, query: query, message: "Unsupported lock: #{inspect lock}, supported locks: write, read, stickey_write")

  defp offset(%Ecto.Query{offset: %{expr: {:^, _, [idx]}}}, params), do: Enum.at(params, idx, idx)
  defp offset(%Ecto.Query{offset: %{expr: expr}}, _params), do: expr
  defp offset(%Ecto.Query{}, _params), do: 0

  defp limit(adapter_meta, %Ecto.Query{limit: nil, from: %{source: {source, _module}}}, prefix, _params) do
    mod = :"#{String.downcase(List.last(Module.split(adapter_meta.adapter)))}"
    fun = if mod == :mnesia, do: :table_info, else: :info
    case apply(mod, fun, [to_table(adapter_meta, source, prefix, adapter_meta.opts), :size]) do
      :undefined -> 1
      0 -> 500
      limit -> limit
    end
  end
  defp limit(_adapter_meta, %Ecto.Query{limit: %{expr: {:^, _, [idx]}}}, _prefix, params), do: Enum.at(params, idx, idx)
  defp limit(_adapter_meta, %Ecto.Query{limit: %{expr: expr}}, _prefix, _params), do: expr
  defp limit(_adapter_meta, %Ecto.Query{limit: limit}, _prefix, _params), do: limit || 500

  defp group_bys(%Ecto.Query{group_bys: group_bys, sources: sources}) do
    Enum.reduce(group_bys, [], fn
      %Ecto.Query.QueryExpr{expr: [{:selected_as, [], [:date]}]}, acc -> acc

      %Ecto.Query.QueryExpr{expr: expr}, acc ->
        acc ++  for {{:., _, [{:&, _, [idx]}, column]}, _, _} <- expr do
          module = elem(elem(sources, idx), 1)
          {column, get_index(%{adapter: nil}, column, module.__schema__(:fields), module.__schema__(:primary_key))}
        end
    end)
  end

  defp updates(_adapter_meta, %Ecto.Query{updates: [] = updates}, _params), do: updates
  defp updates(adapter_meta, %Ecto.Query{from: %{source: {_source, module}}, updates: updates}, params) do
    addition = if adapter_meta.adapter == EctoQLC.Adapters.Mnesia, do: 2, else: 1
    columns = module.__schema__(:fields)
    Enum.flat_map(updates, fn %Ecto.Query.QueryExpr{expr: [set: set]} ->
      Enum.map(set, fn
        {column, {:^, _, [idx]}} -> {column, {Enum.find_index(columns, &(&1 == column)) + addition, Enum.at(params, idx)}}
        {column, value} -> {column, {Enum.find_index(columns, &(&1 == column)) + addition, value}}
      end)
    end)
  end

  # Subqurries are currently not allowed to have parent_as due to having to plan which query to execute first, an example would be if the subquery would have to match on the FK from the main query or to evaluate columns from the main query.
  # in that case we would have to execute the main query before we could evaluate the subquery for then filtering the main query based on the result of the subquery
  defp has_parent_as(fields, acc \\ false)
  defp has_parent_as(%Ecto.SubQuery{query: query}, acc) do
    has_parent_as(query.select.fields, acc) || if Enum.find(query.wheres, &has_parent_as(&1.expr, acc)), do: true, else: false
  end
  defp has_parent_as({_op, _meta, children}, acc), do: has_parent_as(children, acc)
  defp has_parent_as(nil, acc), do: acc
  defp has_parent_as([], acc), do: acc
  defp has_parent_as([{{:., _, [{:parent_as, _, _}, _]}, _, _} | _fields], _acc), do: true
  defp has_parent_as([{_op, _meta, children} | fields], acc), do: has_parent_as(children, acc) || has_parent_as(fields, acc)
  defp has_parent_as([_ | fields], acc), do: has_parent_as(fields, acc)

  defp has_fragment(fields, acc \\ false)
  defp has_fragment(%Ecto.SubQuery{query: query}, acc), do: has_fragment(query.select.fields, acc)
  defp has_fragment(%Ecto.Query.Tagged{value: value}, acc), do: has_fragment(value, acc)
  defp has_fragment(nil, acc), do: acc
  defp has_fragment([], acc), do: acc
  defp has_fragment({:fragment, _meta, _children}, _acc), do: true
  defp has_fragment([{{:., _, [{:fragment, _, _}, _]}, _, _} | _fields], _acc), do: true
  defp has_fragment({_op, _meta, children}, acc), do: has_fragment(children, acc)
  defp has_fragment({_op, children}, acc), do: has_fragment(children, acc)
  defp has_fragment([{:fragment, _meta, _children} | _fields], _acc), do: true
  defp has_fragment([{_op, _meta, children} | fields], acc), do: has_fragment(children, acc) || has_fragment(fields, acc)
  defp has_fragment([_ | fields], acc), do: has_fragment(fields, acc)

  defp has_aggregates(fields, acc \\ false)
  defp has_aggregates(nil, acc), do: acc
  defp has_aggregates([], acc), do: acc
  defp has_aggregates([{_, {op, _meta, _children}} | _fields], false) when op in @aggregates, do: true
  defp has_aggregates([{op, _meta, _children} | _fields], false) when op in @aggregates, do: true
  defp has_aggregates([{_op, _meta, children} | fields], _acc) do
    if has_aggregates(children), do: true, else: has_aggregates(fields)
  end
  defp has_aggregates([_ | fields], acc), do: has_aggregates(fields, acc)

  defp options(%Ecto.Query{} = query, options) do
    unique = unique?(query)
    if options[:unique] && unique, do: raise(Ecto.QueryError, query: query, message: "QLC does not support mixing distinct in queries and unique options")
    options
    # |> Keyword.put_new(:unique, unique)
    |> Enum.take_while(fn
      {k, _v} -> k in ~w[max_lookup cache join lookup unique]a
      k -> k in ~w[cache unique]a
    end)
    |> Enum.map(fn
      {:join, join} when join not in ~w[any merge lookup nested_loop]a -> raise(Ecto.QueryError, query: query, message: "QLC only supports: :any, :merge, :lookup or :nested_loop joins, got: `#{inspect(join)}`")
      x -> x
    end)
  end

  defp unique?(%Ecto.Query{distinct: %Ecto.Query.QueryExpr{}}), do: false
  defp unique?(%Ecto.Query{}), do: false


  defp to_qlc(:subquery = operator, query), do: '[#{to_expression(operator, query)} || #{to_qualifiers(query)}]'
  defp to_qlc(operator, query), do: '[#{to_expression(operator, query)} || #{to_qualifiers(query)}].'

  defp to_expression(operator, {adapter_meta, _query_meta, query, _params, options} = q) do
    mod = :"#{String.downcase(List.last(Module.split(adapter_meta.adapter)))}"
    count = tuple_size(query.sources) - 1
    if operator in ~w[delete_all update_all]a do
      if query.select do
        '{#{Enum.map_join(query.select.fields, ", ", &expr(&1, q))}}'
      else
        '#{Enum.map_join(0..count, ", ", fn idx -> "#{String.upcase(String.first(elem(elem(query.sources, idx), 0)))}#{idx}"  end)}'
      end
     else
      '{#{Enum.map_join(0..count, ", ", fn idx ->
        case elem(query.sources, idx) do
          {<<s::binary-size(1), _::binary>> = source, nil, prefix} ->
            table = to_table(adapter_meta, source, prefix, options)
            [primary_keys | fields] = if mod == :mnesia and table in :mnesia.system_info(:tables), do: :mnesia.table_info(table, :attributes), else: [:version, :inserted_at]
            primary_keys = if source == "schema_migrations", do: [:version], else: [primary_keys]
            Enum.map_join(fields, ", ", &to_element(adapter_meta, &1, fields, primary_keys, "#{String.upcase(s)}#{idx}"))

          {<<s::binary-size(1), _::binary>>, module, _} ->
            Enum.map_join(module.__schema__(:fields), ", ", &to_element(adapter_meta, &1, module.__schema__(:fields), module.__schema__(:primary_key), "#{String.upcase(s)}#{idx}"))

          %{query: %{sources: {{<<s::binary-size(1), _::binary>>, module, _}}}} ->
            Enum.map_join(module.__schema__(:fields), ", ", &to_element(adapter_meta, &1, module.__schema__(:fields), module.__schema__(:primary_key), "#{String.upcase(s)}#{idx}"))

          %{query: query} ->
            {<<s::binary-size(1), _::binary>>, module, _} = elem(query.sources, idx)
            Enum.map_join(module.__schema__(:fields), ", ", &to_element(adapter_meta, &1, module.__schema__(:fields), module.__schema__(:primary_key), "#{String.upcase(s)}#{idx}"))

        end
        end)}}'
     end
   end

  defp to_qualifiers({adapter_meta, _query_meta, query, _params, options} = q) do
    prefix = options[:prefix] || query.from.prefix || query.prefix
    options = options(query, options)
    count = tuple_size(query.sources) - 1
    mod = :"#{String.downcase(List.last(Module.split(adapter_meta.adapter)))}"
    options = Keyword.merge(options, [n_objects: query.limit])
    take = if mod == :mnesia, do: ~w[lock traverse n_objects]a, else: ~w[traverse n_objects]a
    table_opts = Keyword.take(options, take)
    generators = Enum.map_join(0..count, ", ", fn idx ->
      case elem(query.sources, idx) do
      {<<s::binary-size(1), _::binary>> = source, _module, _} ->
      "#{String.upcase(s)}#{idx} <- #{mod}:table('#{to_table(adapter_meta, source, prefix, options)}', [#{Enum.map_join(table_opts, ", ", fn {k, v} -> "{#{k}, #{v}}" end)}])"
      %{query: %{sources: {{<<s::binary-size(1), _::binary>> = source, _module, _}}}} ->
        "#{String.upcase(s)}#{idx} <- #{mod}:table('#{to_table(adapter_meta, source, prefix, options)}', [#{Enum.map_join(table_opts, ", ", fn {k, v} -> "{#{k}, #{v}}" end)}])"

      %{query: %{sources: sources}} ->
        {<<s::binary-size(1), _::binary>> = source, _module, _} = elem(sources, idx)
        "#{String.upcase(s)}#{idx} <- #{mod}:table('#{to_table(adapter_meta, source, prefix, options)}', [#{Enum.map_join(table_opts, ", ", fn {k, v} -> "{#{k}, #{v}}" end)}])"
      end
    end)
    filters = Enum.map_join(query.joins, " andalso ", fn %Ecto.Query.JoinExpr{prefix: _prefix, on: %Ecto.Query.QueryExpr{expr: expr}} -> expr(expr, q) end)
    guards = wheres(query.wheres, q, [])
    cond do
      filters == "" and guards == "" -> generators
      guards == "" -> "#{generators}, #{filters}"
      filters == "" -> "#{generators}, #{guards}"
      true -> "#{generators}, #{filters}, #{guards}"
    end
  end

  defp wheres([], _query, [" andalso "]), do: ""
  defp wheres([], _query, acc), do: to_string(acc)
  defp wheres([%Ecto.Query.BooleanExpr{expr: expr} | rest], query, [] = acc), do: wheres(rest, query, [expr(expr, query) | acc])
  defp wheres([%Ecto.Query.BooleanExpr{op: op, expr: expr} | rest], query, acc), do: wheres(rest, query, [acc] ++ "#{to_erlang_term(op)} #{expr(expr, query)}")

  defp to_query_handle(_operator, {_adapter_meta, _query_meta, query, params, options}, qlc) do
    Enum.reduce(query.order_bys, :qlc.string_to_handle(qlc, options, bindings(params)), fn %{expr: expr}, qh ->
      Enum.reduce(expr, qh, fn {k, {{:., _, [{:&, _, [idx]}, column]}, _, _}}, qh when k in ~w[asc desc]a  ->
        module = elem(elem(query.sources, idx), 1)
        columns = module.__schema__(:fields)
        primary_key = module.__schema__(:primary_key)

        key = case primary_key do
        [_primary_key] ->
          Enum.find_index(columns, &(&1 == column)) + 1
        primary_keys ->
          if idx = Enum.find_index(columns -- primary_keys, &(&1 == column)), do: idx + length(primary_keys), else: 1
        end
        :qlc.keysort(key, qh, order: to_order(query, k))

        {k, _v}, qh ->
          :qlc.sort(qh, order: to_order(query, k))
      end)
     end)
  end

  defp to_order(_query, :asc), do: :ascending
  defp to_order(_query, :desc), do: :descending
  defp to_order(query, order), do: raise(Ecto.QueryError, query: query, message: "QLC does not support ordering by: #{inspect order}")

  defp expr({_, _,[{{_, _, [{:parent_as, _, _}, _]}, _, _}, _]}, _query), do: ""
  defp expr({_, _,[_, {{_, _, [{:parent_as, _, _}, _]}, _, _}]}, _query), do: ""
  defp expr({:exists, _, [%Ecto.SubQuery{} = subquery]}, {adapter_meta, _query_meta, query, params, options}) do
    execute(adapter_meta, query, {:all, subquery.query}, params, options)
    |> elem(1)
    |> List.flatten()
    |> Enum.empty?()
    |> Kernel.not()
    |> to_string()
  end
  defp expr({op, _, [left, {o, _, [%Ecto.SubQuery{} = subquery]}]}, {adapter_meta, _query_meta, query, params, options} = q)  when o in ~w[all any]a  do
    values =
     execute(adapter_meta, query, {:all, subquery.query}, params, options)
     |> elem(1)
     |> List.flatten()

     "apply('Elixir.Enum', '#{o}?', [#{expr(values, q)}, fun(Val) -> #{expr(left, q)} #{to_erlang_term(op)} Val end])"
  end
  defp expr({:sum, _, [expr]}, query) do
    "{sum, #{expr(expr, query)}}"
  end
  defp expr({:fragment, _, fragemnt}, query) do
    fragemnt
    |> Enum.reduce([], fn
      {:raw, raw}, acc -> acc ++ [raw]
      {:expr, {expr, _, _}}, acc -> acc ++ [expr(expr, query)]
    end)
    |> to_string()
  end
  defp expr({:not = operator, mdl, [{:in, mdr, [left, %Ecto.SubQuery{} = subquery]}]}, {adapter_meta, _query_meta, query, params, options} = q), do: expr({operator, mdl, [{:in, mdr, [left, elem(execute(adapter_meta, query, {:all, subquery.query}, params, options), 1)]}]}, q)
  defp expr({:not = operator, _, [{:in, _, _} = expr]}, query), do: unroll(expr, query, operator)
  defp expr({:not = operator, [], [{:is_nil, _, [{{:., _, [{:&, _, [index]}, column]}, _, _}]}]}, {adapter_meta, _query_meta, query, _params, _options}) do
    {<<s::binary-size(1), _::binary>>, module, _prefix} = elem(query.sources, index)
    "#{to_element(adapter_meta, column, module.__schema__(:fields), module.__schema__(:primary_key), "#{String.upcase(s)}#{index}")} #{to_erlang_term(operator)} nil"
  end
  defp expr({:not, _, [expr]}, query) do
    "(#{expr(expr, query)}) == false"
  end
  defp expr({operator, _, [{l, _, _} = left, {r, _, _} = right]}, query) when operator in ~w[> < >= <= == === !=]a and l == :datetime_add or r == :datetime_add do
    case operator do
      :>   -> "apply('Elixir.DateTime', compare, [#{expr(left, query)}, #{expr(right, query)}]) == gt"
      :<   -> "apply('Elixir.DateTime', compare, [#{expr(left, query)}, #{expr(right, query)}]) == lt"
      :>=  -> "apply('Elixir.DateTime', compare, [#{expr(left, query)}, #{expr(right, query)}]) == gt or eq"
      :<=  -> "apply('Elixir.DateTime', compare, [#{expr(left, query)}, #{expr(right, query)}]) == lt or eq"
      :==  -> "apply('Elixir.DateTime', compare, [#{expr(left, query)}, #{expr(right, query)}]) == eq"
      :=== -> "apply('Elixir.DateTime', compare, [#{expr(left, query)}, #{expr(right, query)}]) == eq"
      :!=  -> "apply('Elixir.DateTime', compare, [#{expr(left, query)}, #{expr(right, query)}]) /= eq"
    end
  end
  defp expr({operator, _, [left, right]}, query) when operator in ~w[not or and > < >= <= == === != + - * /]a do
    "#{expr(left, query)} #{to_erlang_term(operator)} #{expr(right, query)}"
  end
  defp expr(%Ecto.Query.Tagged{value: expr}, query), do: expr(expr, query)
  defp expr({:json_extract_path, _, [left, right]}, query) do
    "apply('Elixir.EctoQLC.Adapters.QLC', get_in, [#{expr(left, query)}, #{expr(right, query)}])"
  end
  defp expr({:like, _, [left, match]}, query) do
    "apply('Elixir.EctoQLC.Adapters.QLC', 'like', [#{expr(left, query)}, #{expr(match, query)}]) == true"
  end
  defp expr({:ilike, _, [left, match]}, query) do
    "apply('Elixir.EctoQLC.Adapters.QLC', 'ilike', [#{expr(left, query)}, #{expr(match, query)}]) == true"
  end
  defp expr({:datetime_add, _, [left, right, interval]}, query) do
    "apply('Elixir.DateTime', add, [#{expr(left, query)}, #{interval_to_seconds(interval) * expr(right, query)}])"
  end
  defp expr({:date_add, _, [left, right, interval]}, query) do
    "apply('Elixir.Date', add, [#{expr(left, query)}, #{round(interval_to_days(interval) * expr(right, query))}])"
  end
  defp expr({:count, _, [{expr, [], []}, :distinct]}, query), do: expr(expr, query)
  defp expr({:count, _, [{expr, [], []}]}, query), do: expr(expr, query)
  defp expr({:coalesce, _, [left, right]}, query) do
    "apply('Elixir.EctoQLC.Adapters.QLC', coalesce, [#{expr(left, query)}, #{expr(right, query)}])"
  end
  defp expr({:parent_as, _, [key]}, {_adapter_meta, query_meta, _query, _params, _options}), do: query_meta.aliases[key]
  defp expr({:., _, [{:&, _, [idx]}, column]}, {adapter_meta, _query_meta, query, _params, options}) do
    case elem(query.sources, idx) do
      {<<s::binary-size(1), _::binary>> = source, nil, prefix} when adapter_meta.adapter == EctoQLC.Adapters.Mnesia ->
        attributes = :mnesia.table_info(to_table(adapter_meta, source, prefix, options), :attributes)
        to_element(adapter_meta, column, tl(attributes), [hd(attributes)], "#{String.upcase(s)}#{idx}")

      {"schema_migrations", nil, _prefix} ->
        to_element(adapter_meta, column, Ecto.Migration.SchemaMigration.__schema__(:fields), Ecto.Migration.SchemaMigration.__schema__(:primary_key), "S#{idx}")

      {<<s::binary-size(1), _::binary>>, module, _prefix} ->
        to_element(adapter_meta, column, module.__schema__(:fields), module.__schema__(:primary_key), "#{String.upcase(s)}#{idx}")
    end
  end
  defp expr({:., _, [{:parent_as, _, [key]}, column]}, {adapter_meta, query_meta, _query, _params, _options}) do
    idx = query_meta.aliases[key]
    {<<s::binary-size(1), _::binary>>, module, _prefix} = elem(query_meta.sources, query_meta.aliases[key])
    to_element(adapter_meta, column, module.__schema__(:fields), module.__schema__(:primary_key), "#{String.upcase(s)}#{idx}")
  end
  defp expr({:in, metadata, [left, %Ecto.SubQuery{} = subquery]}, {adapter_meta, _query_meta, query, params, options} = q), do: expr({:in, metadata, [left, elem(execute(adapter_meta, query, {:all, subquery.query}, params, options), 1)]}, q)
  defp expr({:in, _, _} = expr, query), do: unroll(expr, query, :==)
  defp expr({:is_nil, _, [expr]}, query), do: "#{expr(expr, query)} == nil"
  defp expr({:^, _, [ix]}, _query), do: "PARAM#{ix + 1}"
  defp expr({expr, [], []}, query), do: expr(expr, query)
  defp expr(expr, _query), do: to_erlang_term(expr)

  defp expr({:filter, _, [expr, filter]}, group, query) do
    expr(expr, Enum.filter(group, &expr(filter, &1, query)), query)
  end
  defp expr({:count, _, []}, group, _query), do: length(group)
  defp expr({:count, _, [expr, :distinct]}, group, query) do
    group
    |> Enum.uniq_by(&expr(expr, &1, query))
    |> Enum.count()
  end
  defp expr({:avg, _, [expr]}, group, query) do
    case Enum.map(group, &expr(expr, &1, query)) do
      [] -> 0
      values -> Enum.sum(values) / length(values)
    end
  end
  defp expr({op, _, [expr]}, group, query) when op in @aggregates do
    case Enum.map(group, &expr(expr, &1, query)) do
      [] -> 0
      values -> apply(Enum, op, [values])
    end
  end
  defp expr({:in, _, [left, right]}, row, query), do: expr(left, row, query) in expr(right, row, query)
  defp expr({:and, _, [left, right]}, row, query), do: expr(left, row, query) and expr(right, row, query)
  defp expr({:or, _, [left, right]}, row, query), do: expr(left, row, query) or expr(right, row, query)
  defp expr({op, _, [left, right]}, row, query) when op in @operators, do: apply(Kernel, op, [expr(left, row, query), expr(right, row, query)])
  defp expr({op, _, _} = expr, [row | _] = group, query) when op not in @aggregates and is_tuple(row), do: expr(expr, List.first(group), query)
  defp expr({:parent_as, _, [key]}, _row, {_adapter_meta, query_meta, _query, _params, _options}), do: query_meta.aliases[key]
  defp expr({:&, _, [idx]}, _row, _query), do: idx
  defp expr({{:., _, [_expr, _column]}, _, _} = expr, row, {adapter_meta, query_meta, %{query: query}, params, options}), do: expr(expr, row, {adapter_meta, query_meta, query, params, options})
  defp expr({{:., _, [expr, column]}, _, _}, row, {_adapter_meta, _query_meta, query, _params, _options} = q) do
    idx = expr(expr, row, q)
    case elem(query.sources, idx) do
      %{query: query} ->
        {_source, module, _prefix} = elem(query.sources, idx)

        base = if idx > 0, do: Enum.reduce(0..idx - 1, 0, fn idx, acc ->
          {_source, module, _prefix} = elem(query.sources, idx)
          length(module.__schema__(:fields)) + acc
        end), else: 0
        elem(row, base + Enum.find_index(module.__schema__(:fields), &(&1 == column)))

      {"schema_migrations", nil, _prefix} -> elem(row, 0)

      {_source, module, _prefix} ->
        base = if idx > 0, do: Enum.reduce(0..idx - 1, 0, fn idx, acc ->
          {_source, module, _prefix} = elem(query.sources, idx)
          length(module.__schema__(:fields)) + acc
        end), else: 0
        elem(row, base + Enum.find_index(module.__schema__(:fields), &(&1 == column)))
    end
  end
  defp expr({:^, _, [idx]}, _row, {_adapter_meta, _query_meta, _query, params, _options}), do: Enum.at(params, idx, idx)
  defp expr({:like, _, [left, right]}, row, query), do: String.match?(expr(left, row, query), Regex.compile!(expr(right, row, query)))
  defp expr({:ilike, _, [left, right]}, row, query), do: String.match?(expr(left, row, query), Regex.compile!(expr(right, row, query), [:caseless]))
  defp expr({:json_extract_path, _, [left, right]}, row, query) do
    Enum.reduce(expr(right, row, query), expr(left, row, query), fn
    _k, nil = data -> data
    k, %_{} = struct -> Map.get(struct, String.to_existing_atom(k))
    k, data when is_integer(k) and is_list(data) -> Enum.at(data, k)
    k, data when is_integer(k) and is_tuple(data) -> elem(data, k)
    k, data -> data[k] || data[String.to_existing_atom(k)]
  end)
  end
  defp expr({operator, md, [%Ecto.Query.Tagged{value: expr}, right, interval]}, row, query) when operator in ~w[datetime_add date_add]a do
    expr({operator, md, [expr, right, interval]}, row, query)
  end
  defp expr({operator, md, [left, %Ecto.Query.Tagged{value: expr}, interval]}, row, query) when operator in ~w[datetime_add date_add]a do
    expr({operator, md, [left, expr, interval]}, row, query)
  end
  defp expr({:datetime_add, _, [left, right, interval]}, row, query) do
    DateTime.add(expr(left, row, query), interval_to_seconds(interval) * expr(right, row, query), :second)
  end
  defp expr({:date_add, _, [left, right, interval]}, row, query) do
    Date.add(expr(left, row, query), round(interval_to_days(interval) * expr(right, row, query)))
  end
  defp expr({op, _, [left, right]}, row, query), do: apply(__MODULE__, op, [expr(left, row, query), expr(right, row, query)])
  defp expr(%Ecto.Query.Tagged{value: expr}, row, query), do: expr(expr, row, query)
  defp expr({_selected_as, expr}, row, query), do: expr(expr, row, query)
  defp expr(%Ecto.SubQuery{} = subquery, _row, {adapter_meta, _query_meta, query, params, options}), do: execute(adapter_meta, query, {:all, subquery.query}, params, options) |> elem(1) |> List.flatten()
  defp expr(expr, _row, _query), do: expr

  @doc false
  def select(rows, :all, {adapter_meta, _query_meta, query, _params, _options} = q) do
    if query.select && has_aggregates(query.select.fields) do
      rows = rows
             |> Enum.group_by(&Enum.map_join(query.group_bys, ":", fn {_column, idx} -> :erlang.element(idx, &1) end))
             |> Map.values()
             |> Enum.map(fn group -> Enum.map(query.select.fields, &expr(&1, group, q)) end)
             |> distinct(query, adapter_meta)
             |> offset(query, 0)
             |> Enum.take(query.limit)

      {length(rows), rows}
    else
      rows = rows
             |> Enum.map(fn row ->
               if query.select do
                 Enum.map(query.select.fields, &expr(&1, row, q))
               else
                 row
               end
              end)
              |> distinct(query, adapter_meta)
              |> offset(query, 0)
              |> Enum.take(query.limit)

      {length(rows), rows}
    end
  end
  def select(rows, :delete_all, {adapter_meta, _query_meta, %Ecto.Query{from: %{source: {source, _module}}} = query, _params, options}) do
    mod = :"#{String.downcase(List.last(Module.split(adapter_meta.adapter)))}"
    prefix = options[:prefix] || query.from.prefix || query.prefix
    table = to_table(adapter_meta, source, prefix, options)
    rows
    |> distinct(query, adapter_meta)
    |> offset(query, 0)
    |> Enum.take(query.limit)
    |> Enum.reduce({0, nil}, fn row, {count, acc} ->
      args = if mod == :mnesia, do: [:erlang.setelement(1, row, table)], else: [table, row]
      if apply(mod, :delete_object, args) do
        {count + 1, acc}
      else
        {count, acc}
      end
    end)
  end
  def select(rows, :update_all, {%{adapter: EctoQLC.Adapters.Mnesia} = adapter_meta, _query_meta, %Ecto.Query{from: %{source: {source, _module}}} = query, _params, options}) do
    {:atomic, value} = :mnesia.transaction(fn ->
      table = to_table(adapter_meta, source, query.from.prefix || query.prefix, options)
      rows
      |> distinct(query, adapter_meta)
      |> offset(query, 0)
      |> Enum.take(query.limit)
      |> Enum.reduce({0, nil}, fn row, {count, acc} ->
        if _row = update(:mnesia, table, row, query) do
          {count + 1, acc}
        else
          {count, acc}
        end
      end)
   end)
   value
  end
  def select(rows, :update_all, {adapter_meta, _query_meta, %Ecto.Query{from: %{source: {source, _module}}} = query, _params, options}) do
    mod = :"#{String.downcase(List.last(Module.split(adapter_meta.adapter)))}"
    table = to_table(adapter_meta, source, query.from.prefix || query.prefix, options)
    rows
    |> distinct(query, adapter_meta)
    |> offset(query, 0)
    |> Enum.take(query.limit)
    |> Enum.reduce({0, nil}, fn row, {count, acc} ->
      if _row = update(mod, table, row, query) do
        {count + 1, acc}
      else
        {count, acc}
      end
    end)
  end

  @doc false
  def get_in(data, keys) do
    Enum.reduce(keys, data, fn
      k, %_{} = struct -> Map.get(struct, String.to_existing_atom(k))
      k, data -> data[k] || data[String.to_existing_atom(k)]
    end)
  end

  defp update(:ets = mod, table, row, query), do: mod.update_element(table, elem(row, 0), Keyword.values(query.updates))
  defp update(:mnesia = mod, _table, row, query), do: mod.write(Enum.reduce(Keyword.values(query.updates), row, fn {idx, value}, row -> :erlang.setelement(idx, row, value) end))
  defp update(:dets = mod, table, row, query), do: mod.insert(table, Enum.reduce(Keyword.values(query.updates), row, fn {idx, value}, row -> :erlang.setelement(idx, row, value) end))

  @doc false
  def delete(mod, adapter_meta, %{source: source, prefix: prefix, schema: schema}, filters, _returning, options) when mod in ~w[dets ets]a do
    table = to_table(adapter_meta, source, prefix, options)
    ms = to_match_spec(adapter_meta, schema, filters)
    {query_time, count} = :timer.tc(mod, :select_delete, [table, ms])
    if 0 < count do
      {:ok, []}
    else
      {:error, :stale}
    end
    |> log(source, "DELETE #{inspect source} #{inspect filters} MATCHSPEC #{inspect ms}", query_time, 0, 0, 0, :delete_all, adapter_meta.telemetry, filters, [], options ++ adapter_meta.opts)
  end
  def delete(:mnesia = mod, adapter_meta, %{source: source, prefix: prefix, schema: schema}, filters, _returning, options) do
    ms = to_match_spec(adapter_meta, schema, filters)
    fun = fn ->
            with table <- to_table(adapter_meta, source, prefix, options),
                 [row] <- mod.select(table, ms),
                   :ok <- mod.delete_object(row) do
              {:ok, []}
            else
              _ -> {:error, :stale}
            end
          end
    {query_time, {:atomic, result}} = :timer.tc(mod, :transaction, [fun])
    log(result, source, "DELETE #{inspect source} #{inspect filters} MATCHSPEC #{inspect ms}", query_time, 0, 0, 0, :delete_all, adapter_meta.telemetry, filters, [], options ++ adapter_meta.opts)
  end

  @doc false
  def insert(:dets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, _on_conflict, returning, options) do
    table = to_table(adapter_meta, source, prefix, options)
    primary_key = schema.__schema__(:primary_key)
    columns = schema.__schema__(:fields)
    key = get_key(primary_key, columns, fields)
    record = to_record({key}, columns, primary_key, fields)
    file = if dir = Application.get_env(mod, :dir), do: Path.join(dir, "#{table}"), else: table
    options = Keyword.take(Keyword.merge([file: '#{file}'],  options), ~w[access auto_save estimated_no_objects file max_no_slots min_no_slots keypos ram_file repair type]a)
    {query_time, result} = with {:ok, ^table} <- mod.open_file(table, options),
                                {query_time, true} <- :timer.tc(mod, :insert_new, [table, record]),
                                :ok <- mod.sync(table) do
                                {query_time, {:ok, Enum.map(returning, &fields[&1])}}
                            else
                              {query_time, false} when is_integer(query_time) -> {query_time, {:invalid, [unique: "primary_key"]}}
                            end
    log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, [], options ++ adapter_meta.opts)
  end
  def insert(:ets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, _on_conflict, returning, options) do
    table = to_table(adapter_meta, source, prefix, options)
    primary_key = schema.__schema__(:primary_key)
    columns = schema.__schema__(:fields)
    key = get_key(primary_key, columns, fields)
    record = to_record({key}, columns, primary_key, fields)

    {query_time, result} = case :timer.tc(mod, :insert_new, [table, record]) do
                              {query_time, true}  -> {query_time, {:ok, Enum.map(returning, &fields[&1])}}
                              {query_time, false} -> {query_time, {:invalid, [unique: "primary_key"]}}
                            end
    log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, [], options ++ adapter_meta.opts)
  end
  def insert(:mnesia = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, _on_conflict, returning, options) do
    table = to_table(adapter_meta, source, prefix, options)
    primary_key = schema.__schema__(:primary_key)
    columns = schema.__schema__(:fields)
    key = get_key(primary_key, columns, fields)
    record = to_record({table, key}, columns, primary_key, fields)
    fun  = fn ->
                with [] <- mod.wread({table, key}),
                    :ok <- mod.write(record) do
                  {:ok, []}
                else
                  [_record] -> {:invalid, [unique: "primary_key"]}

                  {:aborted, {:no_exists, ^table}} -> {:invalid, [no_exists: table]}

                  {:aborted, {:bad_type, _}} -> {:invalid, [bad_type: record]}
                end
            end

   {query_time, result} = case :timer.tc(mod, :transaction, [fun]) do
                            {query_time, {:aborted, {:bad_type, ^record}}} -> {query_time, {:invalid, [bad_type: inspect(record)]}}
                            {query_time, {:atomic, :ok}}                   -> {query_time, {:ok, Enum.map(fields, &Enum.map(returning, fn k -> &1[k] end))}}
                            {query_time, {:atomic, result}}                -> {query_time, result}
                          end

    log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, [], options ++ adapter_meta.opts)
  end

  @doc false
  def insert_all(:dets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, _header, rows, _on_conflict, returning, _placeholders, options) do
    table = to_table(adapter_meta, source, prefix, options)
    {query_time, rows} = if is_list(rows), do: {0, rows}, else: :timer.tc(adapter_meta.repo, :all, [rows])
    primary_key = schema.__schema__(:primary_key)
    columns = schema.__schema__(:fields)
    records = for row <- rows do
      key = get_key(primary_key, columns, row)
      to_record({key}, columns, primary_key, row)
    end
    file = if dir = Application.get_env(mod, :dir), do: Path.join(dir, "#{table}"), else: table
    options = Keyword.take(Keyword.merge([file: '#{file}'],  options), ~w[access auto_save estimated_no_objects file max_no_slots min_no_slots keypos ram_file repair type]a)

    {query_time, result}  = with {open_time, {:ok, ^table}} <- :timer.tc(mod, :open_file, [table, options]),
         {insert_time, true} <- :timer.tc(mod, :insert_new, [table, records]),
         {sync_time, :ok} <- :timer.tc(mod, :sync, [table]) do
         result = unless returning == [], do: Enum.map(rows, &Enum.map(returning, fn k -> &1[k] end))
         {query_time + open_time + insert_time + sync_time, {length(records), result}}
    else
      {insert_time, false} when is_integer(insert_time) ->
        {insert_time, {0, nil}}
    end


    log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect rows}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, [], options ++ adapter_meta.opts)
  end
  def insert_all(:ets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, _header, rows, _on_conflict, returning, _placeholders, options) do
    table = to_table(adapter_meta, source, prefix, options)
    {query_time, rows} = if is_list(rows), do: {0, rows}, else: :timer.tc(adapter_meta.repo, :all, [rows])
    primary_key = schema.__schema__(:primary_key)
    columns = schema.__schema__(:fields)
    records = for row <- rows, do: to_record({get_key(primary_key, columns, row)}, columns, primary_key, row)
    result = unless returning == [], do: Enum.map(rows, &Enum.map(returning, fn k -> &1[k] end))
    {insert_time, value} = :timer.tc(mod, :insert_new, [table, records])
    query_time = query_time + insert_time
    if value do
      {length(records), result}
    else
      {0, result}
    end
    |> log(source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect rows}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, [], options ++ adapter_meta.opts)
  end
  def insert_all(:mnesia = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, _header, rows, _on_conflict, returning, _placeholders, options) do
    table = to_table(adapter_meta, source, prefix, options)
    {query_time, rows} = if is_list(rows), do: {0, rows}, else: :timer.tc(adapter_meta.repo, :all, [rows])
    primary_key = schema.__schema__(:primary_key)
    columns = schema.__schema__(:fields)
    {:atomic, result} = mod.transaction(fn ->
      Enum.reduce(rows, {0, []}, fn row, {count, acc} ->
        key = get_key(primary_key, columns, row)
        record = to_record({table, key}, columns, primary_key, row)
        {insert_time, value} = :timer.tc(mod, :write, [record])
        query_time = query_time + insert_time
        if value == :ok do
          acc = if returning != [], do: acc ++ [Enum.map(returning, fn k -> row[k] end)]
          log({count + 1, acc}, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect row}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, [], options ++ adapter_meta.opts)
        else
          {count, acc}
        end
      end)
    end)
    result
  end

  @doc false
  def update(:dets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, params, returning, options) do
    table = to_table(adapter_meta, source, prefix, options)
    key = to_key(params)
    file = if dir = Application.get_env(mod, :dir), do: Path.join(dir, "#{table}"), else: table
    options = Keyword.take(Keyword.merge([file: '#{file}'],  options), ~w[access auto_save estimated_no_objects file max_no_slots min_no_slots keypos ram_file repair type]a)
    {query_time, result} = with {:ok, ^table} <- mod.open_file(table, options),
                                        [row] <- mod.lookup(table, key),
                            {query_time, :ok} <- :timer.tc(mod, :insert, [table, update_row(adapter_meta, schema, fields, row)]),
                                          :ok <- mod.sync(table),
                           {decode_time, row} <- :timer.tc(Enum, :map, [returning, &fields[&1]]) do
                             {query_time + decode_time, {:ok, row}}
                           else
                             {query_time, _error} when is_integer(query_time) ->  {query_time, {:invalid, [unique: "primary_key"]}}
                           end
    log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, [], options ++ adapter_meta.opts)
  end
  def update(:ets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, params, returning, options) do
    key = to_key(params)
    {records, _count} = Enum.reduce(schema.__schema__(:fields) -- schema.__schema__(:primary_key), {[], 1}, fn k, {acc, count} -> if Keyword.has_key?(fields, k), do: {[{count, fields[k]} | acc], count + 1}, else: {acc, count + 1}  end)
    table = to_table(adapter_meta, source, prefix, options)
    {query_time, result} = with {query_time, true} <- :timer.tc(mod, :update_element, [table, key, records]),
                                {decode_time, row} <- :timer.tc(Enum, :map, [returning, &fields[&1]]) do
                             {query_time + decode_time, {:ok, row}}
                           else
                             {query_time, _} -> {query_time, {:invalid, [unique: "primary_key"]}}
                           end
    log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, [], options ++ adapter_meta.opts)
  end
  def update(:mnesia = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, params, returning, options) do
   table = to_table(adapter_meta, source, prefix, options)
   key = to_key(params)
   fun = fn ->
            with [row] <- mod.wread({table, key}),
            :ok <- mod.write(update_row(adapter_meta, schema, fields, row)) do
              {:ok, []}
            else
            {:error, _resaon} ->
              {:invalid, [unique: "primary_key"]}
            end
          end

   {query_time, result} = case :timer.tc(mod, :transaction, [fun]) do
                            {query_time, {:atomic, :ok}}     -> {query_time, {:ok, Enum.map(fields, &Enum.map(returning, fn k -> &1[k] end))}}
                            {query_time, {:atomic, result}}  -> {query_time, result}
                          end
   log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, [], options ++ adapter_meta.opts)
 end

  defp distinct(rows, %Ecto.Query{distinct: nil}, _adapter_meta), do: rows
  defp distinct(rows, %Ecto.Query{distinct: true}, _adapter_meta), do: Enum.uniq(rows)
  defp distinct(rows, %Ecto.Query{distinct: %Ecto.Query.QueryExpr{expr: true}}, _adapter_meta), do: Enum.uniq(rows)
  defp distinct(rows, %Ecto.Query{distinct: %Ecto.Query.QueryExpr{expr: expr}} = query, adapter_meta) do
    Enum.reduce(expr, rows, fn
      {sorter, {{:., _, [{:&, _, [index]}, column]}, _, _}}, rows ->
        {_source, module, _prefix} = elem(query.sources, index)
        idx = get_index(adapter_meta, column, module.__schema__(:fields), module.__schema__(:primary_key))
        rows
        |> Enum.sort_by(fn
          [v] -> v
          row -> Enum.at(row, idx)
        end, sorter)
        |> Enum.uniq_by(fn
          [v] -> v
          row -> Enum.at(row, idx)
        end)

      {sorter, {:json_extract_path, _, [{{:., _, [{:&, _, [index]}, column]}, _, _}, keys]}}, rows ->
          {_source, module, _prefix} = elem(query.sources, index)
          idx = get_index(adapter_meta, column, module.__schema__(:fields), module.__schema__(:primary_key))

          rows
          |> Enum.sort_by(fn
            [v] -> v
            row ->
              Enum.reduce(keys, Enum.at(row, idx), fn
                k, %_{} = struct -> Map.get(struct, String.to_existing_atom(k))
                k, data -> data[k] || data[String.to_existing_atom(k)]
              end)
          end, sorter)
          |> Enum.uniq_by(fn
            [v] -> v
            row ->
              Enum.reduce(keys, Enum.at(row, idx), fn
                k, %_{} = struct -> Map.get(struct, String.to_existing_atom(k))
                k, data -> data[k] || data[String.to_existing_atom(k)]
              end)
          end)
      end)
  end

  defp offset(rows, %{offset: offset}, offset), do: rows
  defp offset([_ | rows], query, offset), do: offset(rows, query, offset + 1)

  ## TBD There is a cavet here and we should properly use ex_cldr to do proper calculation
  ## E.g months in second is the same as 30 days in seconds, but some months don't have 30 days.
  defp interval_to_seconds("year"), do: 31557600
  defp interval_to_seconds("month"), do: 2629800
  defp interval_to_seconds("week"), do: 604800
  defp interval_to_seconds("day"), do: 86400
  defp interval_to_seconds("hour"), do: 3600
  defp interval_to_seconds("minute"), do: 60
  defp interval_to_seconds("seconds"), do: 1
  defp interval_to_seconds("millisecond"), do: 0.001
  defp interval_to_seconds("microsecond"), do: 0.000001

  defp interval_to_days("year"), do: 365.25
  defp interval_to_days("month"), do: 30.4375
  defp interval_to_days("week"), do: 7
  defp interval_to_days("day"), do: 1
  defp interval_to_days("hour"), do: 0.04166667
  defp interval_to_days("minute"), do: 6.944444e-4
  defp interval_to_days("seconds"), do: 1.157407e-5
  defp interval_to_days("millisecond"), do: 1.157407e-8
  defp interval_to_days("microsecond"), do: 1.157407e-11

  defp unroll({:in, _, [{{:., [], [{:&, [], [index]}, column]}, _, _}, values]}, {adapter_meta, _query_meta, query, _params, _options} = q, operator) do
    [v | values] = unbind(values, q)
    {<<s::binary-size(1), _::binary>>, module, _prefix} = elem(query.sources, index)
    el = to_element(adapter_meta, column, module.__schema__(:fields), module.__schema__(:primary_key), "#{String.upcase(s)}#{index}")
    values
    |> Enum.reduce(["#{el} #{to_erlang_term(operator)} #{v}"], fn v, acc -> acc ++ [" orelse #{el} #{to_erlang_term(operator)} #{v}"] end)
    |> to_string()
  end

  defp unbind({:^, _, [index]}, {_adapter_meta, _query_meta, _query, params, _options}), do: to_erlang_term(Enum.at(params, index))
  defp unbind({:^, _, values}, {_adapter_meta, _query_meta, _query, params, _options}), do: Enum.map(values, &to_erlang_term(Enum.at(params, &1, &1)))
  defp unbind([_ | _] = values, query), do: Enum.map(values, &unbind(&1, query))
  defp unbind(value, _meta), do: to_erlang_term(value)

  defp to_erlang_term(<<value::binary>>), do: '<<"#{value}">>'
  defp to_erlang_term(:or), do: :orelse
  defp to_erlang_term(:and), do: :andalso
  defp to_erlang_term(:<=), do: :"=<"
  defp to_erlang_term(:===), do: :"=:="
  defp to_erlang_term(op) when op in ~w[!= not]a, do: :"/="
  defp to_erlang_term(op) when op in ~w[> < >= == + - * /]a, do: "#{op}"
  defp to_erlang_term(value) when is_list(value), do: "[#{Enum.map_join(value, ", ", &to_erlang_term(&1))}]"
  defp to_erlang_term(value) when is_tuple(value), do: "#{value |> Tuple.to_list() |> Enum.map(&to_erlang_term/1) |> List.to_tuple() |> inspect}"
  defp to_erlang_term(value) when is_atom(value), do: "'#{value}'"
  defp to_erlang_term(value) when is_number(value), do: value
  defp to_erlang_term(value) when is_map(value), do: "#" <> "{" <> Enum.map_join(value, ", ", fn {k, v} -> "#{to_erlang_term(k)} => #{to_erlang_term(v)}" end) <> "}"
  defp to_erlang_term(value), do: value


  defp to_record(tuple, [_ | columns], [], fields), do: Enum.reduce(columns, tuple, &Tuple.insert_at(&2, tuple_size(&2), fields[&1]))
  defp to_record(tuple, columns, primary_key, fields), do: Enum.reduce(columns -- primary_key, tuple, &Tuple.insert_at(&2, tuple_size(&2), fields[&1]))

  defp to_element(adapter_meta, column, columns, [_primary_key] = primary_keys, var) do
    "element(#{get_index(adapter_meta, column, columns, primary_keys)}, #{var})"
  end
  defp to_element(adapter_meta, column, columns, primary_keys, var) do
   idx = get_index(adapter_meta, column, columns, primary_keys)
   if column in primary_keys do
      "element(#{idx}, element(#{if adapter_meta.adapter == EctoQLC.Adapters.Mnesia, do: 2, else: 1}, #{var}))"
   else
     "element(#{idx}, #{var})"
    end
  end

  defp get_index(adapter_meta, column, columns, [_primary_key]) do
    Enum.find_index(columns, &(&1 == column))
    |> Kernel.||(0)
    |> Kernel.+(if(adapter_meta.adapter == EctoQLC.Adapters.Mnesia, do: 2, else: 1))
  end
  defp get_index(adapter_meta, column, columns, primary_keys) do
    if column in primary_keys do
      Enum.find_index(primary_keys, &(&1 == column)) + 1
    else
      Enum.find_index(columns -- primary_keys, &(&1 == column)) + length(primary_keys) + if(adapter_meta.adapter == EctoQLC.Adapters.Mnesia, do: 1, else: 1)
    end
  end

  defp to_key(params) do
    case Keyword.values(params) do
      [k] -> k
      values -> List.to_tuple(values)
    end
  end

  defp to_table(adapter_meta, source, prefix, options) do
    Module.concat([adapter_meta.adapter, options[:prefix] || prefix, source])
  end

  defp log(:ok, source, query, query_time, decode_time, queue_time, idle_time, operator, telemetry, params, columns, opts) do
    log({0, []}, source, query, query_time, decode_time, queue_time, idle_time, operator, telemetry, params, columns, opts)
  end
  defp log({num_rows, rows} = result, source, query, query_time, decode_time, queue_time, idle_time, operator, {repo, log, event_name} = _telemetry, params, columns, opts) do
    columns = if is_struct(columns) and columns.select, do: columns.select.fields, else: []
    query = String.Chars.to_string(query)
    stacktrace = Keyword.get(opts, :stacktrace)
    if event_name = Keyword.get(opts, :telemetry_event, event_name) do
      :telemetry.execute(event_name,
                        %{query_time: query_time, decode_time: decode_time, queue_time: queue_time, idle_time: idle_time, total_time: query_time + decode_time + queue_time + idle_time},
                        %{type: :ecto_qlc_query, repo: repo, result: {:ok, %{command: to_command(operator), rows: rows, num_rows: num_rows, columns: columns}}, params: params, query: query, source: source, stacktrace: stacktrace, options: Keyword.get(opts, :telemetry_options, [])})
    end
    fun = fn -> log_iodata(query_time, decode_time, queue_time, idle_time, repo, source, query, opts[:cast_params] || params, result, stacktrace) end
    case Keyword.get(opts, :log, log) do
      true  ->
        Logger.log(:debug, fun, ansi_color: operator_to_color(operator))
        result
      false ->
        :ok
        result
      level ->
        Logger.log(level, fun, ansi_color: operator_to_color(operator))
        result
    end
  end

  defp to_command(:all), do: :select
  defp to_command(command), do: command

  defp log_iodata(query_time, decode_time, queue_time, idle_time, repo, source, query, params, result, stacktrace) do
    result =  if is_tuple(result) and is_atom(elem(result, 0)), do: String.upcase("#{elem(result, 0)}"), else: "OK"
    stacktrace = case stacktrace do
      [_ | _] = stacktrace ->
        {module, function, arity, info} = last_non_ecto(Enum.reverse(stacktrace), repo, nil)
        [?\n, IO.ANSI.light_black(), "↳ ", Exception.format_mfa(module, function, arity), log_stacktrace_info(info), IO.ANSI.reset()]
        _ -> []
    end
    ['QUERY ',
     result,
     " source=#{inspect(source)}",
     format_time("db", query_time),
     format_time("decode", decode_time),
     format_time("queue", queue_time),
     format_time("idle_time", idle_time),
     ?\n,
     query,
     ?\s,
     inspect(params, charlists: false),
     List.wrap(stacktrace)]
  end

  defp format_time(label, time) when time > 999, do: [?\s, label, ?=, :io_lib_format.fwrite_g(time / 1000), ?m, ?s]
  defp format_time(label, time) when time <= 999, do: [?\s, label, ?=, "#{time}", , ?s]

  defp log_stacktrace_info([file: file, line: line] ++ _rest), do: [", at: ", file, ?:, Integer.to_string(line)]
  defp log_stacktrace_info(_), do: []

  defp last_non_ecto([{mod, _, _, _} | _stacktrace], repo, last) when mod == repo or mod in [Ecto.Repo.Queryable, Ecto.Repo.Schema, Ecto.Repo.Transaction], do: last
  defp last_non_ecto([last | stacktrace], repo, _last), do: last_non_ecto(stacktrace, repo, last)
  defp last_non_ecto([], _repo, last), do: last

  defp operator_to_color(:all), do: :cyan
  defp operator_to_color(:update_all), do: :yellow
  defp operator_to_color(:delete_all), do: :red
  defp operator_to_color(:insert), do: :green
  defp operator_to_color(_op), do: nil
end