Skip to main content

lib/ecto/adapters/ex_sql/connection.ex

defmodule Ecto.Adapters.ExSQL.Connection do
  @moduledoc false

  alias Ecto.Migration.{Constraint, Index, Reference, Table}
  alias ExSQL.Ecto.Query

  # Returns the DBConnection child spec for the ExSQL connection module.
  # The :db_connection application is started first; a failure to start it
  # crashes on the assertive match.
  def child_spec(options) do
    {:ok, _started_apps} = Application.ensure_all_started(:db_connection)

    connection_module = ExSQL.Ecto.Connection
    DBConnection.child_spec(connection_module, options)
  end

  # Starts a DBConnection process backed by the ExSQL connection module.
  def start_link(options) do
    connection_module = ExSQL.Ecto.Connection
    DBConnection.start_link(connection_module, options)
  end

  # Builds a named query from `sql` and prepares + executes it in one call.
  def prepare_execute(conn, name, sql, params, options) do
    [name: name, statement: sql]
    |> Query.build()
    |> then(&DBConnection.prepare_execute(conn, &1, params, options))
  end

  # Executes an already-built query as-is.
  def execute(conn, %Query{} = query, params, options),
    do: DBConnection.execute(conn, query, params, options)

  # Executes raw SQL (binary or iodata) through an unnamed query. On success
  # the prepared query is dropped and only the result is returned; any other
  # return value is passed through untouched.
  def execute(conn, sql, params, options) when is_binary(sql) or is_list(sql) do
    statement = IO.iodata_to_binary(sql)
    unnamed = Query.build(name: "", statement: statement)

    with {:ok, %Query{}, result} <- DBConnection.prepare_execute(conn, unnamed, params, options) do
      {:ok, result}
    end
  end

  # Runs `sql` (binary or iodata) and returns `{:ok, result}` on success.
  # Non-success return values from DBConnection are passed through unchanged.
  def query(conn, sql, params, options) do
    built = Query.build(statement: IO.iodata_to_binary(sql))

    with {:ok, %Query{}, result} <- DBConnection.prepare_execute(conn, built, params, options) do
      {:ok, result}
    end
  end

  # Multi-result queries are not available with this adapter; always raises
  # a RuntimeError.
  def query_many(_conn, _sql, _params, _options),
    do: raise("query_many is not supported in the ExSQL adapter")

  # Translates a database error into Ecto constraint tuples by inspecting the
  # error message prefix. Unrecognised errors map to no constraints.
  def to_constraints(%ExSQL.Ecto.Error{message: message} = error, opts) do
    case message do
      "UNIQUE constraint failed: " <> columns ->
        [unique: unique_constraint_name(columns)]

      "CHECK constraint failed: " <> check ->
        [check: check]

      "FOREIGN KEY constraint failed" ->
        # The message does not name the key, so guess from the failed
        # statement and fall back to a name derived from the source alone.
        with [] <- foreign_key_candidates(error.statement, opts) do
          [foreign_key: default_foreign_key_name(opts[:source])]
        end

      _ ->
        []
    end
  end

  def to_constraints(_error, _opts), do: []

  # Guesses foreign key constraint names from a failed statement: every
  # written column ending in "_id" yields "<source>_<column>_fkey".
  # Returns [] when the statement is not a binary or has no such column.
  defp foreign_key_candidates(statement, opts) when is_binary(statement) do
    source = default_foreign_key_name(opts[:source])

    for column <- extract_target_columns(statement), String.ends_with?(column, "_id") do
      {:foreign_key, "#{source}_#{column}_fkey"}
    end
  end

  defp foreign_key_candidates(_statement, _opts), do: []

  # Stringifies the source used as a constraint-name prefix; nil becomes "".
  defp default_foreign_key_name(source) do
    if is_nil(source), do: "", else: to_string(source)
  end

  # Best-effort extraction of the column names written by an INSERT or UPDATE
  # statement, used to guess which foreign key a failed statement violated.
  #
  # Returns the unquoted column names in statement order, or [] when the
  # statement is neither "INSERT INTO ... (...)" nor "UPDATE ... SET ...".
  # Keyword matching is case-sensitive.
  defp extract_target_columns(statement) do
    case String.trim(statement) do
      "INSERT INTO " <> rest -> insert_target_columns(rest)
      "UPDATE " <> rest -> update_target_columns(rest)
      _ -> []
    end
  end

  # Columns listed between the first "(" and the next ")" of an INSERT.
  defp insert_target_columns(rest) do
    case String.split(rest, "(", parts: 2) do
      [_table, cols_and_rest] ->
        cols_and_rest
        |> String.split(")", parts: 2)
        |> hd()
        |> String.split(",")
        |> Enum.map(fn column -> column |> String.trim() |> String.trim("\"") end)

      _ ->
        []
    end
  end

  # Left-hand sides of the `col = value` assignments in an UPDATE's SET list.
  defp update_target_columns(rest) do
    # Split on whitespace-delimited keywords so identifiers that merely
    # contain the letters "SET" or "WHERE" (e.g. "OFFSETS") are not mistaken
    # for the keyword itself.
    case String.split(rest, ~r/\s+SET\s+/, parts: 2) do
      [_table, assignments] ->
        assignments
        |> String.split(~r/\s+WHERE\s+/, parts: 2)
        |> hd()
        |> String.split(",")
        |> Enum.flat_map(fn assignment ->
          case assignment |> String.trim() |> String.split(" = ", parts: 2) do
            [lhs, _rhs] -> [lhs |> String.trim() |> String.trim("\"")]
            _ -> []
          end
        end)

      _ ->
        []
    end
  end

  # Wraps `sql` in a query and hands it to DBConnection.stream/4.
  def stream(conn, sql, params, options) do
    [statement: sql]
    |> Query.build()
    |> then(&DBConnection.stream(conn, &1, params, options))
  end

  # Table DDL. Each clause returns a list of statements (iodata).
  #
  # This first clause must stay ahead of the other three-element table
  # commands: keyword-list table options are rejected for all of them.
  def execute_ddl({_command, %Table{options: table_options}, _rest})
      when is_list(table_options) do
    raise ArgumentError, "ExSQL adapter does not support keyword lists in :options"
  end

  def execute_ddl({:create, %Table{} = table, columns}),
    do: [create_table_sql("CREATE TABLE ", table, columns)]

  def execute_ddl({:create_if_not_exists, %Table{} = table, columns}),
    do: [create_table_sql("CREATE TABLE IF NOT EXISTS ", table, columns)]

  def execute_ddl({:drop, %Table{prefix: prefix, name: name}}),
    do: [["DROP TABLE ", quote_table(prefix, name)]]

  # The drop mode is ignored.
  def execute_ddl({:drop, %Table{} = table, _mode}), do: execute_ddl({:drop, table})

  def execute_ddl({:drop_if_exists, %Table{prefix: prefix, name: name}}),
    do: [["DROP TABLE IF EXISTS ", quote_table(prefix, name)]]

  def execute_ddl({:drop_if_exists, %Table{} = table, _mode}),
    do: execute_ddl({:drop_if_exists, table})

  # One ALTER TABLE statement is produced per column change.
  def execute_ddl({:alter, %Table{prefix: prefix, name: name} = table, changes}) do
    Enum.map(changes, fn change ->
      ["ALTER TABLE ", quote_table(prefix, name), ?\s, column_change(table, change)]
    end)
  end

  # Index options that ExSQL cannot honour. These clauses match any
  # two-element index command, so they must precede the index create/drop
  # clauses.
  def execute_ddl({_command, %Index{concurrently: true}}),
    do: raise(ArgumentError, "`concurrently` is not supported with ExSQL")

  def execute_ddl({_command, %Index{only: true}}),
    do: raise(ArgumentError, "`only` is not supported with ExSQL")

  def execute_ddl({_command, %Index{include: include}})
      when is_list(include) and include != [],
      do: raise(ArgumentError, "`include` is not supported with ExSQL")

  def execute_ddl({_command, %Index{using: using}}) when using != nil,
    do: raise(ArgumentError, "`using` is not supported with ExSQL")

  def execute_ddl({_command, %Index{nulls_distinct: nulls_distinct}})
      when nulls_distinct != nil,
      do: raise(ArgumentError, "`nulls_distinct` is not supported with ExSQL")

  # Index creation and removal. The drop mode, when given, is ignored.
  def execute_ddl({:create, %Index{name: name} = index}),
    do: [create_index_sql("CREATE ", index, quote_name(name))]

  def execute_ddl({:create_if_not_exists, %Index{name: name} = index}),
    do: [create_index_sql("CREATE ", index, ["IF NOT EXISTS ", quote_name(name)])]

  def execute_ddl({:drop, %Index{prefix: prefix, name: name}}),
    do: [["DROP INDEX ", quote_table(prefix, name)]]

  def execute_ddl({:drop, %Index{} = index, _mode}), do: execute_ddl({:drop, index})

  def execute_ddl({:drop_if_exists, %Index{prefix: prefix, name: name}}),
    do: [["DROP INDEX IF EXISTS ", quote_table(prefix, name)]]

  def execute_ddl({:drop_if_exists, %Index{} = index, _mode}),
    do: execute_ddl({:drop_if_exists, index})

  # Renames a table. The new name is emitted without a prefix.
  def execute_ddl({:rename, %Table{prefix: prefix, name: old_name}, %Table{name: new_name}}) do
    statement = [
      "ALTER TABLE ",
      quote_table(prefix, old_name),
      " RENAME TO ",
      quote_table(nil, new_name)
    ]

    [statement]
  end

  # Renames a column.
  def execute_ddl({:rename, %Table{prefix: prefix, name: table_name}, old_col, new_col}) do
    statement = [
      "ALTER TABLE ",
      quote_table(prefix, table_name),
      " RENAME COLUMN ",
      quote_name(old_col),
      " TO ",
      quote_name(new_col)
    ]

    [statement]
  end

  # Renames an index by dropping it and creating it again under the new name.
  def execute_ddl({:rename, %Index{} = index, new_index}) do
    renamed = %Index{index | name: new_index}
    [execute_ddl({:drop, index}), execute_ddl({:create, renamed})]
  end

  # Constraints cannot be added or dropped after table creation.
  def execute_ddl({:create, %Constraint{}}),
    do: raise(ArgumentError, "ExSQL does not support ALTER TABLE ADD CONSTRAINT.")

  def execute_ddl({command, %Constraint{}, _mode}) when command in [:drop, :drop_if_exists],
    do: raise(ArgumentError, "ExSQL does not support ALTER TABLE DROP CONSTRAINT.")

  # Raw SQL strings are passed through as a single statement.
  def execute_ddl(sql) when is_binary(sql), do: [sql]

  def execute_ddl(keyword) when is_list(keyword),
    do: raise(ArgumentError, "ExSQL adapter does not support keyword lists in execute")

  # No DDL log entries are produced for any result.
  def ddl_logs(_result) do
    []
  end

  # Query entry points; each delegates to the matching private SQL builder.
  def all(query) do
    select_sql(query)
  end

  def update_all(query) do
    update_all_sql(query)
  end

  def delete_all(query) do
    delete_all_sql(query)
  end

  # Builds an INSERT statement (iodata).
  def insert(prefix, table, header, rows, on_conflict, returning, placeholders),
    do: insert_sql(prefix, table, header, rows, on_conflict, returning, placeholders)

  # Eight-argument variant; the trailing options are ignored.
  def insert(prefix, table, header, rows, on_conflict, returning, placeholders, _opts),
    do: insert(prefix, table, header, rows, on_conflict, returning, placeholders)

  # Builds `UPDATE <table> SET f1 = ?, ... WHERE <filters> [RETURNING ...]`.
  def update(prefix, table, fields, filters, returning) do
    assignments =
      fields
      |> Enum.map(fn field -> [quote_name(field), " = ?"] end)
      |> Enum.intersperse(", ")

    target = quote_table(prefix, table)

    ["UPDATE ", target, " SET ", assignments, " WHERE ", filters_expr(filters),
     returning_expr(returning)]
  end

  # Builds `DELETE FROM <table> WHERE <filters> [RETURNING ...]`.
  def delete(prefix, table, filters, returning) do
    target = quote_table(prefix, table)
    conditions = filters_expr(filters)

    ["DELETE FROM ", target, " WHERE ", conditions, returning_expr(returning)]
  end

  # Builds the UPDATE statement for an update_all query. Query shapes the
  # adapter cannot express are rejected before any SQL is produced.
  defp update_all_sql(%Ecto.Query{wheres: wheres} = query) do
    assert_simple_update_query!(query)

    target = update_from_expr(query)
    assignments = update_fields(query)
    conditions = update_where_expr(wheres)
    returning = update_returning_expr(query)

    ["UPDATE ", target, " SET ", assignments, conditions, returning]
  end

  # Renders the comma-separated SET assignments from the query's update
  # expressions. Entries that are not `%QueryExpr{}` structs, and operation
  # entries whose payload is not a list, are skipped.
  defp update_fields(%Ecto.Query{updates: updates}) do
    assignments =
      for %Ecto.Query.QueryExpr{expr: operations} <- updates,
          {op, fields} when is_list(fields) <- operations,
          assignment <-
            Enum.map(fields, fn {field, value} -> update_operation(op, field, value) end) do
        assignment
      end

    Enum.intersperse(assignments, ", ")
  end

  # SQLite UPDATE has no source alias, so the right-hand side of SET must
  # reference the target's columns unqualified, e.g. `json_insert("errors", ...)`
  # rather than `json_insert("s0"."errors", ...)`. The alias is stripped the
  # same way the WHERE clause strips it.
  #
  # Only :set and :inc are supported; any other operation raises ArgumentError.
  defp update_operation(operation, field, value) do
    case operation do
      :set ->
        [update_field(field), " = ", update_expr_expr(value)]

      :inc ->
        [update_field(field), " = ", update_field(field), " + ", update_expr_expr(value)]

      other ->
        raise ArgumentError,
              "unsupported update operation for ExSQL adapter: #{inspect(other)}"
    end
  end

  # Quoted, unqualified column name for use in a SET clause.
  defp update_field(field) do
    quote_name(field)
  end

  # Quoted target table of an UPDATE, without an alias.
  defp update_from_expr(%Ecto.Query{from: from}) do
    %{source: {source, _schema}, prefix: prefix} = from
    quote_table(prefix, source)
  end

  # Derives an index name from the column list of a UNIQUE failure message.
  # "users.email, users.name" becomes "users_email_name_index": the table is
  # taken from the first entry, then every column name, then "index".
  defp unique_constraint_name(columns) do
    qualified = String.split(columns, ", ")

    [table | _] = qualified |> hd() |> String.split(".")

    column_names =
      for entry <- qualified do
        entry |> String.split(".") |> List.last()
      end

    Enum.join([table] ++ column_names ++ ["index"], "_")
  end

  # Assembles a SELECT statement clause by clause, in SQL order.
  # Unsupported query shapes raise before any SQL is built.
  defp select_sql(%Ecto.Query{} = query) do
    assert_simple_query!(query)

    head = ["SELECT ", distinct_expr(query.distinct), select_expr(query)]
    sources = [" FROM ", from_expr(query), join_expr(query.joins)]

    tail = [
      where_expr(query.wheres),
      group_by_expr(query.group_bys),
      having_expr(query.havings),
      order_by_expr(query.order_bys),
      limit_expr(query.limit),
      offset_expr(query.offset)
    ]

    head ++ sources ++ tail
  end

  # Builds the DELETE statement for a delete_all query.
  #
  # The statement has only a target table, a WHERE clause and an optional
  # RETURNING clause. Joins, grouping, ordering, limits and offsets cannot be
  # expressed, so they are rejected up front (as update_all does) instead of
  # being silently dropped, which would delete more rows than requested.
  # A `select` on the query is rendered as RETURNING.
  defp delete_all_sql(%Ecto.Query{} = query) do
    assert_simple_update_query!(query)

    [
      "DELETE FROM ",
      delete_from_expr(query),
      delete_where_expr(query.wheres),
      update_returning_expr(query)
    ]
  end

  # Builds an INSERT statement. An empty header produces `DEFAULT VALUES`;
  # otherwise one parenthesised placeholder group is emitted per row.
  # Insert placeholders are not supported and raise ArgumentError.
  defp insert_sql(prefix, table, header, rows, on_conflict, returning, placeholders) do
    placeholders == [] ||
      raise(ArgumentError, "ExSQL adapter does not support insert placeholders yet")

    values =
      if header == [] do
        [" DEFAULT VALUES"]
      else
        column_list = [" (", quote_names(header), ") "]
        row_groups = rows |> Enum.map(&placeholders/1) |> Enum.intersperse(", ")
        [column_list, "VALUES ", row_groups]
      end

    target = quote_table(prefix, table)

    ["INSERT INTO ", target, values, on_conflict_expr(on_conflict, header),
     returning_expr(returning)]
  end

  # Raises ArgumentError for queries using combinations, CTEs or locks.
  defp assert_simple_query!(%Ecto.Query{combinations: combinations, with_ctes: ctes, lock: lock}) do
    unsupported? = combinations != [] or ctes != nil or lock != nil

    if unsupported? do
      raise ArgumentError, "this Ecto query shape is not implemented in Ecto.Adapters.ExSQL yet"
    end
  end

  # Renders the DISTINCT modifier of a SELECT.
  #
  # nil, `distinct: false` and an empty expression list all mean "no
  # DISTINCT" and render nothing. `distinct: true` renders "DISTINCT ".
  # Distinct-on-expression is not supported and raises ArgumentError.
  defp distinct_expr(nil), do: []

  defp distinct_expr(%Ecto.Query.ByExpr{expr: disabled}) when disabled in [false, []], do: []

  defp distinct_expr(%Ecto.Query.ByExpr{expr: true}), do: "DISTINCT "

  defp distinct_expr(%Ecto.Query.ByExpr{}) do
    raise ArgumentError, "distinct expressions are not supported in Ecto.Adapters.ExSQL yet"
  end

  # On top of the generic checks, rejects grouping, having, ordering, limit,
  # offset and joins.
  defp assert_simple_update_query!(%Ecto.Query{} = query) do
    assert_simple_query!(query)

    has_list_clauses? =
      query.group_bys != [] or query.havings != [] or query.order_bys != []

    has_bounds_or_joins? =
      query.limit != nil or query.offset != nil or query.joins != []

    if has_list_clauses? or has_bounds_or_joins? do
      raise ArgumentError,
            "this Ecto query shape is not implemented in Ecto.Adapters.ExSQL yet"
    end
  end

  # Renders the RETURNING clause derived from the query's `select`.
  #
  #   * no select               -> no clause
  #   * a whole source (`&n`)   -> the fields chosen by normalize_selected_fields/3
  #   * a list of field lookups -> those fields, de-duplicated
  #   * anything else           -> ArgumentError
  defp update_returning_expr(%Ecto.Query{select: nil}), do: []

  defp update_returning_expr(
         %Ecto.Query{select: %{expr: {:&, [], [source_index]}} = select} = query
       ) do
    fields =
      select
      |> Map.get(:fields, nil)
      |> normalize_selected_fields(query, source_index)

    [" RETURNING ", quote_names(fields)]
  end

  defp update_returning_expr(%Ecto.Query{select: %{expr: fields, take: take}} = query)
       when is_list(fields) do
    # returning_projection_field/3 returns a list per projection ([field] or
    # []), so the results are flattened to give quote_names/1 a flat list of
    # names, as every other caller does.
    field_names =
      fields
      |> Enum.flat_map(&returning_projection_field(&1, query, take))
      |> Enum.uniq()

    case field_names do
      [] ->
        # A bare "RETURNING" with no columns is not valid SQL.
        raise ArgumentError, "unsupported select in update for ExSQL adapter: query returning"

      _ ->
        [" RETURNING ", quote_names(field_names)]
    end
  end

  defp update_returning_expr(%Ecto.Query{select: _}),
    do: raise(ArgumentError, "unsupported select in update for ExSQL adapter: query returning")

  # Chooses the fields returned for a whole-source select.
  #
  # When the select carries fields and the take entry for the source is
  # `{:any, names}`, those names are used. Otherwise the source's schema
  # fields are used; a source without a schema raises ArgumentError.
  defp normalize_selected_fields(fields, query, source_index) do
    # With no fields on the select the take map is never consulted.
    taken = if is_nil(fields), do: nil, else: Map.get(query.select.take, source_index)

    case taken do
      {:any, field_names} ->
        field_names

      _ ->
        source_schema(query, source_index) ||
          raise(
            ArgumentError,
            "unsupported update returning shape in ExSQL adapter: cannot infer fields without schema"
          )

        source_schema_fields(query, source_index)
    end
  end

  # Maps one select projection to the list of field names it contributes.
  # A field lookup on a source contributes its field, unless the take entry
  # for that source is `{:any, names}` and does not list it. Any other
  # projection contributes nothing.
  defp returning_projection_field(
         {{:., _, [{:&, [], [source_index]}, field]}, [], []},
         _query,
         take
       ) do
    case Map.get(take, source_index) do
      {:any, only_fields} ->
        if field in only_fields, do: [field], else: []

      _no_restriction ->
        [field]
    end
  end

  defp returning_projection_field(_projection, _query, _take), do: []

  # Renders the projection list. Without an explicit select, a schemaless
  # source selects "*" and a schema source selects all of its fields.
  defp select_expr(%Ecto.Query{select: nil, from: %{source: {_source, schema}}}) do
    if is_nil(schema) do
      "*"
    else
      qualified_names(0, schema.__schema__(:fields))
    end
  end

  defp select_expr(%Ecto.Query{select: %{expr: expr}} = query) do
    select_projection(expr, query)
  end

  # Renders a single select expression. Clause order matters: the
  # struct-update map form must be tried before the generic map form.
  defp select_projection({:merge, _, [left, right]}, query) do
    select_merge_projection(left, right, query)
  end

  # `%{source | key: value}`: the source's fields with overrides.
  defp select_projection({:%{}, _, [{:|, _, [{:&, _, [source_index]}, updates]}]}, query) do
    select_source_update_projection(source_index, updates, query)
  end

  defp select_projection({:%{}, _, fields}, _query) do
    map_projection_expr(fields)
  end

  defp select_projection({:{}, _, fields}, query) do
    fields
    |> Enum.map(&select_projection(&1, query))
    |> Enum.intersperse(", ")
  end

  defp select_projection({{:., _, [{:&, _, [source_index]}, field]}, _, []}, _query) do
    qualified_name(source_index, field)
  end

  # Type casts are transparent.
  defp select_projection({:type, _, [expr, _type]}, query) do
    select_projection(expr, query)
  end

  defp select_projection({:&, _, [source_index]}, query) do
    qualified_names(source_index, source_schema_fields(query, source_index))
  end

  defp select_projection({:^, _, [_ix]}, _query), do: "?"
  defp select_projection(nil, _query), do: "NULL"

  defp select_projection(number, _query) when is_integer(number) or is_float(number) do
    to_string(number)
  end

  defp select_projection(string, _query) when is_binary(string) do
    ["'", escape_string(string), "'"]
  end

  defp select_projection({function, _, _args} = call, _query)
       when function in [:coalesce, :count, :max, :sum, :avg, :min] do
    expr_expr(call)
  end

  defp select_projection(unsupported, _query) do
    raise ArgumentError,
          "unsupported select expression for ExSQL adapter: #{inspect(unsupported)}"
  end

  # Renders `merge(left, right)`. A source merged with a map literal lets the
  # map override the source's fields; two map literals are concatenated;
  # anything else renders both sides independently.
  defp select_merge_projection(left, right, query) do
    left_is_map? = map_projection_expr?(left)
    right_is_map? = map_projection_expr?(right)

    cond do
      source_expr?(left) and right_is_map? ->
        source_with_map_projection(left, right, query)

      source_expr?(right) and left_is_map? ->
        source_with_map_projection(right, left, query)

      left_is_map? and right_is_map? ->
        left_sql = map_projection_expr(map_projection_fields(left))
        right_sql = map_projection_expr(map_projection_fields(right))
        [left_sql, ", ", right_sql]

      true ->
        [select_projection(left, query), ", ", select_projection(right, query)]
    end
  end

  # Projects every field of a source except those the map literal overrides,
  # followed by the map's own `value AS key` entries.
  defp source_with_map_projection({:&, _, [source_index]}, map_expr, query) do
    map_fields = map_projection_fields(map_expr)
    overridden = for entry <- map_fields, do: normalize_projection_key(elem(entry, 0))

    selected_fields =
      query
      |> source_schema_fields(source_index)
      |> Enum.reject(&(&1 in overridden))
      |> Enum.map(&qualified_name(source_index, &1))
      |> Enum.intersperse(", ")

    merged_fields = map_projection_expr(map_fields)

    # Only join with a comma when both halves are non-empty.
    cond do
      merged_fields == [] -> selected_fields
      selected_fields == [] -> merged_fields
      true -> [selected_fields, ", ", merged_fields]
    end
  end

  # Projects `%{source | key: value, ...}`: the source's non-overridden
  # fields followed by the overriding `value AS key` entries.
  defp select_source_update_projection(source_index, updates, query) do
    all_fields = source_schema_fields(query, source_index)
    overridden = for update <- updates, do: normalize_projection_key(elem(update, 0))

    base_projection =
      all_fields
      |> Enum.reject(&(&1 in overridden))
      |> Enum.map(&qualified_name(source_index, &1))
      |> Enum.intersperse(", ")

    update_projection = map_projection_expr(updates)

    # Only join with a comma when both halves are non-empty.
    cond do
      update_projection == [] -> base_projection
      base_projection == [] -> update_projection
      true -> [base_projection, ", ", update_projection]
    end
  end

  # True for a map literal AST whose entries are all two-element tuples.
  defp map_projection_expr?({:%{}, _, fields}) when is_list(fields) do
    Enum.all?(fields, fn entry -> is_tuple(entry) and tuple_size(entry) == 2 end)
  end

  defp map_projection_expr?(_expr), do: false

  # Entries of a map literal AST; [] for anything else.
  defp map_projection_fields(expr) do
    case expr do
      {:%{}, _, fields} -> fields
      _ -> []
    end
  end

  # Renders map entries as comma-separated `<value> AS "<key>"` items.
  defp map_projection_expr(fields) do
    fields
    |> Enum.map(fn {key, value} -> [expr_expr(value), " AS ", quote_name(key)] end)
    |> Enum.intersperse(", ")
  end

  # True for a whole-source reference `&n` with an integer index.
  defp source_expr?(expr) do
    match?({:&, _, [index]} when is_integer(index), expr)
  end

  # Field names (as strings) to project for a source.
  #
  # A `{:struct, fields}` take entry on the select wins; otherwise the
  # source's schema fields are used. A source with neither raises
  # ArgumentError.
  defp source_schema_fields(query, source_index) do
    take =
      case Map.get(query, :select) do
        %Ecto.Query.SelectExpr{take: take} -> take
        _ -> %{}
      end

    with {:no_take, _entry} <- taken_struct_fields(take[source_index]),
         nil <- source_schema(query, source_index) do
      raise ArgumentError,
            "unsupported select expression for ExSQL adapter: {:&, _, [#{source_index}]}"
    else
      {:taken, fields} -> Enum.map(fields, &normalize_projection_key/1)
      schema -> Enum.map(schema.__schema__(:fields), &normalize_projection_key/1)
    end
  end

  # Tags a take entry so the caller can tell an explicit struct field list
  # apart from every other shape.
  defp taken_struct_fields({:struct, fields}) when is_list(fields), do: {:taken, fields}
  defp taken_struct_fields(entry), do: {:no_take, entry}

  # Schema module behind a source index: 0 is the `from` source, n > 0 is the
  # n-th join. Returns nil for out-of-range joins, joins whose source is not
  # a `{source, schema}` tuple, and non-integer indexes.
  defp source_schema(%Ecto.Query{from: %{source: source}, joins: joins}, source_index) do
    cond do
      source_index === 0 ->
        {_source, schema} = source
        schema

      is_integer(source_index) and source_index > 0 ->
        case Enum.at(joins, source_index - 1) do
          %{source: {_source, schema}} -> schema
          _missing_or_other -> nil
        end

      true ->
        nil
    end
  end

  # Projection keys are compared as strings. Atoms go through
  # Atom.to_string/1 (so nil becomes "nil"), everything else through
  # to_string/1.
  defp normalize_projection_key(key) do
    if is_atom(key), do: Atom.to_string(key), else: to_string(key)
  end

  # Renders the FROM source as `<table> AS <alias 0>`. Only plain
  # `{source, schema}` sources are supported; the schema does not affect the
  # output.
  defp from_expr(%Ecto.Query{from: %{source: {source, _schema}, prefix: prefix}}) do
    table = quote_table(prefix, source)
    [table, " AS ", source_alias(0)]
  end

  defp from_expr(%Ecto.Query{from: from}) do
    raise ArgumentError, "unsupported from expression for ExSQL adapter: #{inspect(from)}"
  end

  # Quoted target table of a DELETE, without an alias.
  defp delete_from_expr(%Ecto.Query{from: from}) do
    %{source: {source, _schema}, prefix: prefix} = from
    quote_table(prefix, source)
  end

  # Renders WHERE with all conditions AND-ed; nothing when there are none.
  defp where_expr(wheres) do
    case wheres do
      [] ->
        []

      _ ->
        conditions = wheres |> Enum.map(&boolean_expr/1) |> Enum.intersperse(" AND ")
        [" WHERE ", conditions]
    end
  end

  # WHERE clause for DELETE, using the alias-stripping expression renderer.
  defp delete_where_expr(wheres) do
    case wheres do
      [] ->
        []

      _ ->
        conditions = wheres |> Enum.map(&delete_boolean_expr/1) |> Enum.intersperse(" AND ")
        [" WHERE ", conditions]
    end
  end

  # WHERE clause for UPDATE, using the alias-stripping expression renderer.
  defp update_where_expr(wheres) do
    case wheres do
      [] ->
        []

      _ ->
        conditions = wheres |> Enum.map(&update_boolean_expr/1) |> Enum.intersperse(" AND ")
        [" WHERE ", conditions]
    end
  end

  # Renders each join as ` <QUAL> <table> AS <alias n> ON <condition>`.
  # Join aliases are numbered from 1; the `from` source is 0.
  defp join_expr(joins) do
    for {join, index} <- Enum.with_index(joins, 1) do
      [?\s, join_qual(join.qual), ?\s, join_source(join, index), " ON ", boolean_expr(join.on)]
    end
  end

  # SQL keyword for a join qualifier. :left_lateral is rendered as a plain
  # left outer join. Other qualifiers have no clause.
  defp join_qual(:inner), do: "INNER JOIN"
  defp join_qual(qual) when qual in [:left, :left_lateral], do: "LEFT OUTER JOIN"
  defp join_qual(:cross), do: "CROSS JOIN"

  # Renders a joined table as `<table> AS <alias index>`.
  #
  # The join's own prefix is honoured, the same way from_expr/1 honours the
  # prefix of the `from` source. A join without a prefix renders unqualified.
  defp join_source(%Ecto.Query.JoinExpr{source: {source, _schema}, prefix: prefix}, index) do
    [quote_table(prefix, source), " AS ", source_alias(index)]
  end

  # Renders a boolean condition. BooleanExpr carries its subqueries on the
  # side, so they are substituted into the expression before rendering.
  defp boolean_expr(%Ecto.Query.BooleanExpr{expr: expr, subqueries: subqueries}) do
    expr
    |> resolve_subqueries(subqueries)
    |> expr_expr()
  end

  defp boolean_expr(%Ecto.Query.QueryExpr{expr: expr}) do
    expr_expr(expr)
  end

  # Like boolean_expr/1, but renders with source aliases stripped.
  defp update_boolean_expr(%Ecto.Query.BooleanExpr{expr: expr, subqueries: subqueries}) do
    expr
    |> resolve_subqueries(subqueries)
    |> update_expr_expr()
  end

  defp update_boolean_expr(%Ecto.Query.QueryExpr{expr: expr}) do
    update_expr_expr(expr)
  end

  # Condition renderer for DELETE; uses the same alias-stripping renderer as
  # UPDATE.
  defp delete_boolean_expr(%Ecto.Query.BooleanExpr{expr: expr, subqueries: subqueries}) do
    expr
    |> resolve_subqueries(subqueries)
    |> update_expr_expr()
  end

  defp delete_boolean_expr(%Ecto.Query.QueryExpr{expr: expr}) do
    update_expr_expr(expr)
  end

  # Renders an expression and removes source-alias qualifiers from it.
  # Returns a binary, not iodata.
  defp update_expr_expr(expr) do
    rendered = expr_expr(expr)
    strip_update_aliases(rendered)
  end

  # Flattens the iodata and rewrites every `"sN"."column"` to `"column"`.
  defp strip_update_aliases(iodata) do
    sql = IO.iodata_to_binary(iodata)
    Regex.replace(~r/\"s\d+\"\.\"([^\"]+)\"/, sql, "\"\\1\"")
  end

  # Walks an expression and replaces every `{:subquery, ix}` marker with the
  # subquery at position `ix` of `subqueries`. All other terms are rebuilt
  # unchanged.
  defp resolve_subqueries({:subquery, ix}, subqueries) do
    subqueries |> Enum.at(ix) |> maybe_to_ecto_subquery()
  end

  defp resolve_subqueries(%Ecto.SubQuery{} = subquery, _subqueries) do
    subquery
  end

  # Any other struct (tagged values, date/time literals, ...) is a leaf.
  # Structs satisfy is_map/1 but are not Enumerable, so without this clause
  # the generic map clause below would raise Protocol.UndefinedError on them.
  defp resolve_subqueries(%_{} = struct, _subqueries), do: struct

  defp resolve_subqueries({op, meta, args}, subqueries) when is_list(args) do
    {op, meta, Enum.map(args, &resolve_subqueries(&1, subqueries))}
  end

  defp resolve_subqueries({op, meta, args}, subqueries) when is_tuple(args) do
    {op, meta, resolve_subqueries(args, subqueries)}
  end

  defp resolve_subqueries({left, right}, subqueries) do
    {resolve_subqueries(left, subqueries), resolve_subqueries(right, subqueries)}
  end

  defp resolve_subqueries(list, subqueries) when is_list(list) do
    Enum.map(list, &resolve_subqueries(&1, subqueries))
  end

  defp resolve_subqueries(map, subqueries) when is_map(map) do
    Map.new(map, fn {key, value} -> {key, resolve_subqueries(value, subqueries)} end)
  end

  defp resolve_subqueries(expr, _subqueries), do: expr

  # Accepts nil or an `%Ecto.SubQuery{}` and returns it unchanged; anything
  # else raises ArgumentError.
  defp maybe_to_ecto_subquery(candidate) do
    case candidate do
      nil ->
        nil

      %Ecto.SubQuery{} = subquery ->
        subquery

      other ->
        raise ArgumentError, "missing subquery for ExSQL expression: #{inspect(other)}"
    end
  end

  # Renders a subquery as a full SELECT statement.
  defp subquery_expr(%Ecto.SubQuery{query: inner_query}) do
    select_sql(inner_query)
  end

  # Renders an Ecto query expression as SQL iodata. Clause order matters
  # where patterns overlap: the `:in` variants, and structs ahead of the
  # generic map/list literal clause.

  # Binary comparisons. `==` is spelled `=` in SQL; the rest keep their name.
  defp expr_expr({op, _, [left, right]}) when op in [:==, :!=, :>, :<, :>=, :<=] do
    operator = if op == :==, do: " = ", else: " #{op} "
    [expr_expr(left), operator, expr_expr(right)]
  end

  # `x in ^list` with a known element count: one "?" per element, or NULL
  # when the list is empty.
  defp expr_expr({:in, _, [left, {:^, _, [_ix, count]}]}) when is_integer(count) and count >= 0 do
    placeholders =
      case count do
        0 -> "NULL"
        _ -> "?" |> List.duplicate(count) |> Enum.join(",")
      end

    [expr_expr(left), " IN (", placeholders, ")"]
  end

  defp expr_expr({:in, _, [left, {:^, _, [_ix]}]}) do
    [expr_expr(left), " IN (?)"]
  end

  defp expr_expr({:in, _, [left, right]}) when is_list(right) do
    values = right |> Enum.map(&expr_expr/1) |> Enum.intersperse(", ")
    [expr_expr(left), " IN (", values, ")"]
  end

  defp expr_expr({:and, _, [left, right]}) do
    [?(, expr_expr(left), " AND ", expr_expr(right), ?)]
  end

  defp expr_expr({:or, _, [left, right]}) do
    [?(, expr_expr(left), " OR ", expr_expr(right), ?)]
  end

  defp expr_expr({:not, _, [expr]}) do
    ["NOT (", expr_expr(expr), ")"]
  end

  # Fragments: raw parts are emitted verbatim, every other part is rendered.
  defp expr_expr({:fragment, _, fragments}) do
    fragments
    |> Enum.map(fn
      {:raw, raw} when is_binary(raw) -> raw
      {:expr, expr} -> expr_expr(expr)
      {_key, value} -> expr_expr(value)
      value -> expr_expr(value)
    end)
    |> Enum.intersperse("")
  end

  defp expr_expr({:is_nil, _, [expr]}) do
    [expr_expr(expr), " IS NULL"]
  end

  defp expr_expr({:not_nil, _, [expr]}) do
    [expr_expr(expr), " IS NOT NULL"]
  end

  # Tagged values render as their underlying value.
  defp expr_expr(%Ecto.Query.Tagged{value: value}) do
    expr_expr(value)
  end

  defp expr_expr(%Ecto.SubQuery{} = subquery) do
    subquery_expr(subquery)
  end

  defp expr_expr({:in, _, [left, %Ecto.SubQuery{} = subquery]}) do
    [expr_expr(left), " IN (", expr_expr(subquery), ")"]
  end

  # Supported SQL functions share their Ecto name.
  defp expr_expr({function, _, args})
       when function in [:coalesce, :count, :max, :sum, :avg, :min] do
    function_expr(Atom.to_string(function), args)
  end

  # Field access on a source: `<alias>.<column>`.
  defp expr_expr({{:., _, [{:&, _, [source_index]}, field]}, _, []}) do
    [source_alias(source_index), ".", quote_name(field)]
  end

  # Type casts are transparent.
  defp expr_expr({:type, _, [expr, _type]}) do
    expr_expr(expr)
  end

  defp expr_expr({:^, _, [_ix]}), do: "?"
  defp expr_expr(nil), do: "NULL"
  defp expr_expr(true), do: "1"
  defp expr_expr(false), do: "0"

  defp expr_expr(number) when is_integer(number) or is_float(number) do
    to_string(number)
  end

  defp expr_expr(string) when is_binary(string) do
    ["'", escape_string(string), "'"]
  end

  defp expr_expr(%DateTime{} = datetime) do
    ["'", DateTime.to_iso8601(datetime), "'"]
  end

  defp expr_expr(%NaiveDateTime{} = naive) do
    ["'", NaiveDateTime.to_iso8601(naive), "'"]
  end

  # Remaining maps and lists are embedded as JSON string literals.
  defp expr_expr(value) when is_map(value) or is_list(value) do
    json = value |> Jason.encode_to_iodata!() |> IO.iodata_to_binary()
    ["'", escape_string(json), "'"]
  end

  defp expr_expr(unsupported) do
    raise ArgumentError, "unsupported expression for ExSQL adapter: #{inspect(unsupported)}"
  end

  # Renders primary-key style filters joined by AND. A nil value becomes
  # `IS NULL` and binds no parameter; any other value becomes `= ?`.
  defp filters_expr(filters) do
    filters
    |> Enum.map(fn
      {field, nil} -> [quote_name(field), " IS NULL"]
      {field, _value} -> [quote_name(field), " = ?"]
    end)
    |> Enum.intersperse(" AND ")
  end

  # Renders ORDER BY. A single Ecto ordering term may expand into more than
  # one SQL term (see order_expr/1).
  defp order_by_expr([]), do: []

  defp order_by_expr(order_bys) do
    terms =
      for order_by <- order_bys,
          ordering <- order_by.expr,
          term <- order_expr(ordering) do
        term
      end

    [" ORDER BY ", Enum.intersperse(terms, ", ")]
  end

  # Renders GROUP BY from every grouping expression, in order.
  defp group_by_expr([]), do: []

  defp group_by_expr(group_bys) do
    terms =
      for group_by <- group_bys, expr <- group_by.expr do
        expr_expr(expr)
      end

    [" GROUP BY ", Enum.intersperse(terms, ", ")]
  end

  # Renders HAVING with all conditions AND-ed; nothing when there are none.
  defp having_expr(havings) do
    case havings do
      [] ->
        []

      _ ->
        conditions = havings |> Enum.map(&boolean_expr/1) |> Enum.intersperse(" AND ")
        [" HAVING ", conditions]
    end
  end

  # Renders LIMIT. The limit may arrive as a QueryExpr or a LimitExpr; only
  # its `expr` is used.
  defp limit_expr(nil), do: []

  defp limit_expr(%struct{expr: expr})
       when struct in [Ecto.Query.QueryExpr, Ecto.Query.LimitExpr] do
    [" LIMIT ", expr_expr(expr)]
  end

  # Renders OFFSET, or nothing when the query has none.
  defp offset_expr(nil), do: []

  defp offset_expr(%Ecto.Query.QueryExpr{expr: expr}) do
    rendered = expr_expr(expr)
    [" OFFSET ", rendered]
  end

  # SQL keyword for an ordering direction. :asc_nulls_first and
  # :desc_nulls_last render as plain ASC / DESC.
  defp order_direction(direction) when direction in [:asc, :asc_nulls_first], do: "ASC"
  defp order_direction(direction) when direction in [:desc, :desc_nulls_last], do: "DESC"

  # Expands one ordering term into a list of SQL ordering terms.
  # :asc_nulls_last and :desc_nulls_first are emulated with an extra leading
  # `<expr> IS NULL` sort key.
  defp order_expr({direction, expr}) do
    rendered = expr_expr(expr)

    case direction do
      :asc_nulls_last ->
        [[rendered, " IS NULL ASC"], [rendered, " ASC"]]

      :desc_nulls_first ->
        [[rendered, " IS NULL DESC"], [rendered, " DESC"]]

      _ ->
        [[rendered, ?\s, order_direction(direction)]]
    end
  end

  # Renders `name(arg1, arg2, ...)`.
  defp function_expr(name, args) do
    rendered_args = args |> Enum.map(&expr_expr/1) |> Enum.intersperse(", ")
    [name, ?(, rendered_args, ?)]
  end

  # Renders one VALUES group, `(?, ?, ...)`, with one "?" per cell.
  #
  # Only cells that bind exactly one parameter can be rendered. A nil cell
  # marks a column the row does not supply (no parameter is bound for it),
  # and a tuple cell is a nested query or placeholder reference; emitting "?"
  # for either would leave the statement with more placeholders than
  # parameters. Both raise ArgumentError instead.
  defp placeholders(row) do
    cells =
      Enum.map_intersperse(row, ", ", fn
        nil ->
          raise ArgumentError,
                "ExSQL adapter does not support cell-wise default values in INSERT rows; " <>
                  "every row must provide a value for every inserted column"

        cell when is_tuple(cell) ->
          raise ArgumentError,
                "ExSQL adapter does not support queries or placeholders as INSERT values"

        _field ->
          "?"
      end)

    ["(", cells, ")"]
  end

  # Renders the ON CONFLICT clause. Only "raise" (no clause) and "do
  # nothing", with or without conflict targets, are supported.
  defp on_conflict_expr({:raise, _, []}, _header), do: []

  defp on_conflict_expr({:nothing, _, targets}, _header) do
    case targets do
      [] -> " ON CONFLICT DO NOTHING"
      _ -> [" ON CONFLICT (", quote_names(targets), ") DO NOTHING"]
    end
  end

  defp on_conflict_expr({kind, _, _targets}, _header) do
    raise ArgumentError, "ExSQL adapter does not support #{inspect(kind)} conflicts yet"
  end

  # Renders RETURNING for the given column names; nothing for an empty list.
  defp returning_expr(returning) do
    case returning do
      [] -> []
      _ -> [" RETURNING ", quote_names(returning)]
    end
  end

  # Builds a CREATE TABLE statement: the column definitions, then the
  # composite primary key and composite foreign key definitions, then the
  # table options. `prefix` is the leading keyword text.
  defp create_table_sql(prefix, table, columns) do
    {table, composite_pk} = composite_pk_definition(table, columns)
    composite_fks = composite_fk_definitions(table, columns)

    name = quote_table(table.prefix, table.name)
    body = [column_definitions(table, columns), composite_pk, composite_fks]

    [prefix, name, ?\s, ?(] ++ body ++ [?), options_expr(table.options)]
  end

  # Builds `<prefix>[UNIQUE ]INDEX <name> ON <table> (<columns>)[ WHERE ...]`.
  # `name` is passed in already rendered, so callers can prepend
  # `IF NOT EXISTS`.
  defp create_index_sql(prefix, index, name) do
    columns = index.columns |> Enum.map(&index_expr/1) |> Enum.intersperse(", ")
    unique = if_do(index.unique, "UNIQUE ")
    table = quote_table(index.prefix, index.table)
    condition = if_do(index.where, [" WHERE ", to_string(index.where)])

    [prefix, unique, "INDEX ", name, " ON ", table, " (", columns, ?), condition]
  end

  # Comma-separated column definitions for CREATE TABLE.
  defp column_definitions(table, columns) do
    columns
    |> Enum.map(fn column -> column_definition(table, column) end)
    |> Enum.intersperse(", ")
  end

  # Definition of one column. A reference column takes its type from the
  # reference and appends the reference clause.
  defp column_definition(table, {:add, name, %Reference{type: ref_type} = ref, opts}) do
    base = [quote_name(name), ?\s, column_type(ref_type, opts)]
    base ++ [column_options(table, ref_type, opts), reference_expr(ref, table, name)]
  end

  defp column_definition(table, {:add, name, type, opts}) do
    quoted = quote_name(name)
    [quoted, ?\s, column_type(type, opts), column_options(table, type, opts)]
  end

  # Renders a single ALTER TABLE change.
  #
  #   * `{:add, ...}`    -> "ADD COLUMN ..." (with the reference constraint
  #                         appended when the type is a %Reference{})
  #   * `{:modify, ...}` -> ArgumentError
  #   * `{:remove, ...}` -> "DROP COLUMN ..."
  #   * anything else    -> ArgumentError
  defp column_change(table, {:add, name, %Reference{type: type} = ref, opts}) do
    [
      "ADD COLUMN ",
      quote_name(name),
      ?\s,
      column_type(type, opts),
      column_options(table, type, opts),
      reference_expr(ref, table, name)
    ]
  end

  defp column_change(table, {:add, name, type, opts}) do
    # For :utc_datetime and :naive_datetime the :null option is dropped before
    # rendering, so neither NULL nor NOT NULL is emitted for these columns.
    opts =
      if type in [:utc_datetime, :naive_datetime] do
        Keyword.delete(opts, :null)
      else
        opts
      end

    [
      "ADD COLUMN ",
      quote_name(name),
      ?\s,
      column_type(type, opts),
      column_options(table, type, opts)
    ]
  end

  defp column_change(_table, {:modify, _name, _type, _opts}) do
    raise ArgumentError, "ALTER COLUMN not supported by ExSQL"
  end

  # Both removal shapes render identically; type and options are ignored.
  defp column_change(_table, {:remove, name, _type, _opts}) do
    ["DROP COLUMN ", quote_name(name)]
  end

  defp column_change(_table, {:remove, name}) do
    ["DROP COLUMN ", quote_name(name)]
  end

  defp column_change(_table, _unsupported) do
    raise ArgumentError, "Not supported by ExSQL"
  end

  # Renders the per-column option clauses in a fixed order:
  # DEFAULT, NULL / NOT NULL, COLLATE, CHECK, PRIMARY KEY.
  defp column_options(table, type, opts) do
    default_value = Keyword.fetch(opts, :default)
    nullable = Keyword.get(opts, :null)

    # When the table is marked as having a composite primary key, the inline
    # PRIMARY KEY marker is suppressed for every column.
    primary_key? =
      case table.primary_key do
        :composite -> false
        _other -> Keyword.get(opts, :primary_key, false)
      end

    collation = Keyword.get(opts, :collate)
    check_constraint = Keyword.get(opts, :check)

    [
      default_expr(default_value),
      null_expr(nullable),
      collate_expr(collation),
      check_expr(check_constraint),
      pk_expr(primary_key?, type)
    ]
  end

  # Renders an inline named CHECK constraint; `nil` renders nothing.
  # `name` and `expr` are inserted as given, without quoting or escaping.
  defp check_expr(nil) do
    []
  end

  defp check_expr(%{name: constraint_name, expr: condition}) do
    [" CONSTRAINT ", constraint_name, " CHECK (", condition, ")"]
  end

  # Renders " COLLATE <NAME>" with the collation name upper-cased; `nil`
  # renders nothing. Atom names are converted to strings first.
  defp collate_expr(nil), do: []

  defp collate_expr(collation) do
    name = if is_atom(collation), do: Atom.to_string(collation), else: collation
    [" COLLATE ", String.upcase(name)]
  end

  # Maps the :null option to its SQL clause: `false` -> NOT NULL,
  # `true` -> NULL, any other value (including nil) -> nothing.
  defp null_expr(nullable) do
    case nullable do
      false -> " NOT NULL"
      true -> " NULL"
      _unset -> []
    end
  end

  # Renders the DEFAULT clause from the result of `Keyword.fetch(opts, :default)`.

  # No :default key was present: emit nothing.
  defp default_expr(:error), do: []

  # An explicit `default: nil`.
  defp default_expr({:ok, nil}), do: " DEFAULT NULL"

  # Strings become single-quoted literals, escaped via escape_string/1.
  defp default_expr({:ok, text}) when is_binary(text) do
    escaped = escape_string(text)
    [" DEFAULT '", escaped, ?']
  end

  # Numbers and booleans are emitted via to_string/1, unquoted.
  defp default_expr({:ok, scalar}) when is_number(scalar) or is_boolean(scalar) do
    [" DEFAULT ", to_string(scalar)]
  end

  # Fragments are inserted verbatim.
  defp default_expr({:ok, {:fragment, sql}}), do: [" DEFAULT ", sql]

  # Maps and lists are JSON-encoded and emitted as a parenthesised string literal.
  defp default_expr({:ok, term}) when is_map(term) or is_list(term) do
    json = term |> Jason.encode_to_iodata!() |> IO.iodata_to_binary()
    [" DEFAULT ('", escape_string(json), "')"]
  end

  # Maps an Ecto migration column type to the column type name used in DDL.

  # Integer-like types (including booleans and serial ids).
  defp column_type(type, _opts)
       when type in [:id, :serial, :bigserial, :boolean, :integer, :bigint],
       do: "INTEGER"

  defp column_type(:float, _opts), do: "NUMERIC"
  defp column_type(:binary, _opts), do: "BLOB"

  # Strings, temporal types, ids and container types are all declared as TEXT.
  defp column_type(type, _opts)
       when type in [
              :string,
              :text,
              :date,
              :utc_datetime,
              :utc_datetime_usec,
              :naive_datetime,
              :naive_datetime_usec,
              :time,
              :time_usec,
              :timestamp,
              :binary_id,
              :map,
              :array,
              :uuid
            ],
       do: "TEXT"

  # Parameterised containers such as {:array, :string} or {:map, :integer}.
  defp column_type({container, _inner}, _opts) when container in [:map, :array], do: "TEXT"

  defp column_type(:decimal, nil), do: "DECIMAL"

  # DECIMAL carries precision/scale only when :precision is given; the scale
  # defaults to 0.
  defp column_type(:decimal, opts) do
    precision = Keyword.get(opts, :precision)
    scale = Keyword.get(opts, :scale, 0)

    case precision do
      unset when unset in [nil, false] -> "DECIMAL"
      _given -> "DECIMAL(#{precision},#{scale})"
    end
  end

  # Any other atom is passed through upper-cased.
  defp column_type(type, _opts) when is_atom(type), do: String.upcase(Atom.to_string(type))

  # Strings are used verbatim as the type name.
  defp column_type(type, _opts) when is_binary(type), do: type

  defp column_type(type, _opts) do
    raise ArgumentError, "unsupported type `#{inspect(type)}`"
  end

  # Renders one indexed column:
  #   * a string is inserted verbatim,
  #   * `{:asc | :desc, name}` becomes the quoted name followed by ASC / DESC,
  #   * anything else is passed to quote_name/1.
  defp index_expr(column) do
    case column do
      raw when is_binary(raw) ->
        raw

      {direction, name} when direction in [:asc, :desc] ->
        [quote_name(name), ?\s, String.upcase(Atom.to_string(direction))]

      name ->
        quote_name(name)
    end
  end

  # Renders the inline PRIMARY KEY marker. Only a literal `true` flag produces
  # output; :serial / :bigserial columns additionally get AUTOINCREMENT.
  defp pk_expr(true, type) do
    if type in [:serial, :bigserial] do
      " PRIMARY KEY AUTOINCREMENT"
    else
      " PRIMARY KEY"
    end
  end

  defp pk_expr(_not_primary, _type), do: []

  # Renders the trailing table options: nothing for `nil`, an ArgumentError for
  # lists (keyword lists are not supported), otherwise a space followed by the
  # options converted with to_string/1.
  defp options_expr(options) do
    cond do
      is_nil(options) ->
        []

      is_list(options) ->
        raise ArgumentError, "ExSQL adapter does not support keyword lists in :options"

      true ->
        [?\s, to_string(options)]
    end
  end

  # A reference with at least one `:with` pair is a composite foreign key. It
  # is rendered as a table-level FOREIGN KEY clause by
  # composite_fk_definitions/2, so no inline column constraint is emitted here.
  #
  # The match is on a non-empty list (`[_ | _]`), not a one-element list: with
  # `[_]`, a reference carrying two or more `:with` pairs fell through to the
  # single-column clause below and silently lost its extra columns.
  defp reference_expr(%Reference{with: [_ | _]}, _table, _name), do: []

  # Inline single-column foreign key:
  # ` CONSTRAINT <name> REFERENCES <table>(<column>) [ON DELETE ...] [ON UPDATE ...]`.
  defp reference_expr(%Reference{} = ref, table, name) do
    [
      " CONSTRAINT ",
      reference_name(ref, table, name),
      " REFERENCES ",
      quote_table(ref.prefix || table.prefix, ref.table),
      ?(,
      quote_name(ref.column),
      ?),
      reference_on_delete(ref.on_delete),
      reference_on_update(ref.on_update)
    ]
  end

  # Constraint name for an inline reference: the explicit `:name` when given,
  # otherwise `<table>_<column>_fkey`. The result is quoted.
  defp reference_name(%Reference{name: nil}, table, column) do
    quote_name("#{table.name}_#{column}_fkey")
  end

  defp reference_name(%Reference{name: name}, _table, _column), do: quote_name(name)

  # ON DELETE action; any value not listed here renders nothing.
  defp reference_on_delete(:nilify_all), do: " ON DELETE SET NULL"
  defp reference_on_delete(:default_all), do: " ON DELETE SET DEFAULT"
  defp reference_on_delete(:delete_all), do: " ON DELETE CASCADE"
  defp reference_on_delete(:restrict), do: " ON DELETE RESTRICT"
  defp reference_on_delete(_), do: []

  # ON UPDATE action; any value not listed here renders nothing.
  defp reference_on_update(:nilify_all), do: " ON UPDATE SET NULL"
  defp reference_on_update(:default_all), do: " ON UPDATE SET DEFAULT"
  defp reference_on_update(:update_all), do: " ON UPDATE CASCADE"
  defp reference_on_update(:restrict), do: " ON UPDATE RESTRICT"
  defp reference_on_update(_), do: []

  # Collects the columns flagged `primary_key: true`, in declaration order.
  # With two or more such columns, returns the table marked as
  # `primary_key: :composite` together with a table-level
  # `, PRIMARY KEY (...)` clause; otherwise returns the table unchanged and an
  # empty clause.
  defp composite_pk_definition(%Table{} = table, columns) do
    pk_names =
      Enum.flat_map(columns, fn {_op, name, _type, opts} ->
        if Keyword.get(opts, :primary_key, false), do: [name], else: []
      end)

    case pk_names do
      # Two or more key columns; matched structurally instead of via length/1.
      [_, _ | _] ->
        pk_list = Enum.map_join(pk_names, ",", &quote_name/1)
        {%{table | primary_key: :composite}, ", PRIMARY KEY (" <> pk_list <> ")"}

      _zero_or_one ->
        {table, ""}
    end
  end

  # Renders one table-level FOREIGN KEY clause for every column whose reference
  # carries at least one `:with` pair. Columns that are not references, or whose
  # `:with` list is empty, are skipped.
  defp composite_fk_definitions(%Table{} = table, columns) do
    for {_op, _name, %Reference{with: [_ | _]}, _opts} = column <- columns do
      composite_fk_definition(table, column)
    end
  end

  # `, FOREIGN KEY (<local cols>) REFERENCES <table>(<remote cols>) ...`.
  # The column itself and its referenced column come first, followed by every
  # `{local, remote}` pair from `ref.with`.
  defp composite_fk_definition(table, {_op, name, ref, _opts}) do
    {current_columns, reference_columns} = Enum.unzip([{name, ref.column} | ref.with])

    [
      ", FOREIGN KEY (",
      quote_names(current_columns),
      ") REFERENCES ",
      quote_table(ref.prefix || table.prefix, ref.table),
      ?(,
      quote_names(reference_columns),
      ?),
      reference_on_delete(ref.on_delete),
      reference_on_update(ref.on_update)
    ]
  end

  # Quotes every name and joins them with a bare comma (no space).
  defp quote_names(names) do
    names
    |> Enum.map(&quote_name/1)
    |> Enum.intersperse(?,)
  end

  # Renders every name qualified with the alias of `source_index`, joined by ", ".
  defp qualified_names(source_index, names) do
    names
    |> Enum.map(fn name -> qualified_name(source_index, name) end)
    |> Enum.intersperse(", ")
  end

  # Renders `<source alias>.<quoted name>`.
  defp qualified_name(source_index, name) do
    table_alias = source_alias(source_index)
    column = quote_name(name)
    [table_alias, ".", column]
  end
  # The quoted alias for the source at `index`: "s" followed by the index.
  defp source_alias(index) do
    quote_name("s" <> to_string(index))
  end
  # Quotes a column (or other identifier) name; delegates to quote_entity/1.
  defp quote_name(name) do
    quote_entity(name)
  end
  # Quotes a table name. The prefix argument is accepted but never used: the
  # rendered name is the same whether the prefix is nil, false or anything else.
  defp quote_table(_prefix, name), do: quote_entity(name)

  # Wraps an identifier in double quotes. Atoms are converted to strings first.
  defp quote_entity(value) when is_atom(value), do: value |> Atom.to_string() |> quote_entity()

  # An embedded double quote would otherwise terminate the quoted identifier
  # early and yield malformed SQL, so each one is doubled (the standard SQL
  # escape inside a quoted identifier). Names without a double quote render
  # exactly as before.
  defp quote_entity(value) when is_binary(value) do
    [[?", :binary.replace(value, "\"", "\"\"", [:global]), ?"]]
  end

  # Any other input (e.g. iodata) is wrapped unchanged, as before.
  defp quote_entity(value), do: [[?", value, ?"]]

  # Returns `value` when `condition` is truthy, otherwise an empty list, so the
  # result can always be spliced into iodata.
  defp if_do(condition, _value) when condition in [nil, false], do: []
  defp if_do(_condition, value), do: value

  # Escapes a string for use inside a single-quoted SQL literal: every single
  # quote is doubled, then every backslash is doubled.
  #
  # NOTE(review): standard SQL string literals do not treat backslash as an
  # escape character; confirm that ExSQL expects doubled backslashes,
  # otherwise defaults containing `\` are stored with extra backslashes.
  defp escape_string(value) when is_binary(value) do
    quotes_doubled = String.replace(value, "'", "''")
    String.replace(quotes_doubled, "\\", "\\\\")
  end
end