defmodule Ecto.Adapters.ExSQL.Connection do
@moduledoc false
alias Ecto.Migration.{Constraint, Index, Reference, Table}
alias ExSQL.Ecto.Query
def child_spec(options) do
{:ok, _} = Application.ensure_all_started(:db_connection)
DBConnection.child_spec(ExSQL.Ecto.Connection, options)
end
def start_link(options) do
DBConnection.start_link(ExSQL.Ecto.Connection, options)
end
def prepare_execute(conn, name, sql, params, options) do
query = Query.build(name: name, statement: sql)
DBConnection.prepare_execute(conn, query, params, options)
end
def execute(conn, %Query{} = query, params, options) do
DBConnection.execute(conn, query, params, options)
end
def execute(conn, sql, params, options) when is_binary(sql) or is_list(sql) do
query = Query.build(name: "", statement: IO.iodata_to_binary(sql))
case DBConnection.prepare_execute(conn, query, params, options) do
{:ok, %Query{}, result} -> {:ok, result}
other -> other
end
end
def query(conn, sql, params, options) do
query = Query.build(statement: IO.iodata_to_binary(sql))
case DBConnection.prepare_execute(conn, query, params, options) do
{:ok, %Query{}, result} -> {:ok, result}
other -> other
end
end
def query_many(_conn, _sql, _params, _options) do
raise RuntimeError, "query_many is not supported in the ExSQL adapter"
end
def to_constraints(%ExSQL.Ecto.Error{message: "UNIQUE constraint failed: " <> columns}, _opts) do
[unique: unique_constraint_name(columns)]
end
def to_constraints(
%ExSQL.Ecto.Error{message: "CHECK constraint failed: " <> check} = _error,
_opts
) do
[check: check]
end
def to_constraints(%ExSQL.Ecto.Error{message: "FOREIGN KEY constraint failed"} = error, opts) do
case foreign_key_candidates(error.statement, opts) do
[] -> [foreign_key: default_foreign_key_name(opts[:source])]
candidates -> candidates
end
end
def to_constraints(_error, _opts), do: []
defp foreign_key_candidates(statement, opts) do
with true <- is_binary(statement),
source <- default_foreign_key_name(opts[:source]),
columns <- extract_target_columns(statement),
cols when cols != [] <- Enum.filter(columns, &String.ends_with?(&1, "_id")) do
Enum.map(cols, &{:foreign_key, "#{source}_#{&1}_fkey"})
else
_ -> []
end
end
defp default_foreign_key_name(nil), do: ""
defp default_foreign_key_name(source), do: to_string(source)
defp extract_target_columns(statement) do
statement
|> String.trim()
|> to_string()
|> case do
"INSERT INTO " <> rest ->
rest
|> String.split("(", parts: 2)
|> case do
[_table, cols_and_rest] ->
cols_and_rest
|> String.split(")", parts: 2)
|> case do
[raw_cols | _] ->
raw_cols
|> String.split(",")
|> Enum.map(&String.trim/1)
|> Enum.map(&String.trim(&1, "\""))
_ ->
[]
end
_ ->
[]
end
"UPDATE " <> rest ->
rest
|> String.split("SET", parts: 2)
|> case do
[_table, assignments] ->
assignments
|> String.split("WHERE", parts: 2)
|> List.first()
|> String.split(",")
|> Enum.map(&String.trim/1)
|> Enum.map(&String.split(&1, " = ", parts: 2))
|> Enum.map(fn
[lhs, _rhs] ->
lhs
|> String.trim()
|> String.trim_leading("\"")
|> String.trim_trailing("\"")
_ ->
nil
end)
|> Enum.filter(&is_binary/1)
_ ->
[]
end
_ ->
[]
end
end
def stream(conn, sql, params, options) do
query = Query.build(statement: sql)
DBConnection.stream(conn, query, params, options)
end
def execute_ddl({_command, %Table{options: options}, _}) when is_list(options) do
raise ArgumentError, "ExSQL adapter does not support keyword lists in :options"
end
def execute_ddl({:create, %Table{} = table, columns}) do
[create_table_sql("CREATE TABLE ", table, columns)]
end
def execute_ddl({:create_if_not_exists, %Table{} = table, columns}) do
[create_table_sql("CREATE TABLE IF NOT EXISTS ", table, columns)]
end
def execute_ddl({:drop, %Table{} = table}) do
[["DROP TABLE ", quote_table(table.prefix, table.name)]]
end
def execute_ddl({:drop, %Table{} = table, _mode}), do: execute_ddl({:drop, table})
def execute_ddl({:drop_if_exists, %Table{} = table}) do
[["DROP TABLE IF EXISTS ", quote_table(table.prefix, table.name)]]
end
def execute_ddl({:drop_if_exists, %Table{} = table, _mode}) do
execute_ddl({:drop_if_exists, table})
end
def execute_ddl({:alter, %Table{} = table, changes}) do
Enum.map(changes, fn change ->
["ALTER TABLE ", quote_table(table.prefix, table.name), ?\s, column_change(table, change)]
end)
end
def execute_ddl({_, %Index{concurrently: true}}) do
raise ArgumentError, "`concurrently` is not supported with ExSQL"
end
def execute_ddl({_, %Index{only: true}}) do
raise ArgumentError, "`only` is not supported with ExSQL"
end
def execute_ddl({_, %Index{include: [_ | _]}}) do
raise ArgumentError, "`include` is not supported with ExSQL"
end
def execute_ddl({_, %Index{using: using}}) when not is_nil(using) do
raise ArgumentError, "`using` is not supported with ExSQL"
end
def execute_ddl({_, %Index{nulls_distinct: nulls_distinct}})
when not is_nil(nulls_distinct) do
raise ArgumentError, "`nulls_distinct` is not supported with ExSQL"
end
def execute_ddl({:create, %Index{} = index}) do
[create_index_sql("CREATE ", index, quote_name(index.name))]
end
def execute_ddl({:create_if_not_exists, %Index{} = index}) do
[create_index_sql("CREATE ", index, ["IF NOT EXISTS ", quote_name(index.name)])]
end
def execute_ddl({:drop, %Index{} = index}) do
[["DROP INDEX ", quote_table(index.prefix, index.name)]]
end
def execute_ddl({:drop, %Index{} = index, _mode}), do: execute_ddl({:drop, index})
def execute_ddl({:drop_if_exists, %Index{} = index}) do
[["DROP INDEX IF EXISTS ", quote_table(index.prefix, index.name)]]
end
def execute_ddl({:drop_if_exists, %Index{} = index, _mode}) do
execute_ddl({:drop_if_exists, index})
end
def execute_ddl({:rename, %Table{} = current_table, %Table{} = new_table}) do
[
[
"ALTER TABLE ",
quote_table(current_table.prefix, current_table.name),
" RENAME TO ",
quote_table(nil, new_table.name)
]
]
end
def execute_ddl({:rename, %Table{} = current_table, old_col, new_col}) do
[
[
"ALTER TABLE ",
quote_table(current_table.prefix, current_table.name),
" RENAME COLUMN ",
quote_name(old_col),
" TO ",
quote_name(new_col)
]
]
end
def execute_ddl({:rename, %Index{} = index, new_index}) do
[
execute_ddl({:drop, index}),
execute_ddl({:create, %Index{index | name: new_index}})
]
end
def execute_ddl({:create, %Constraint{}}) do
raise ArgumentError, "ExSQL does not support ALTER TABLE ADD CONSTRAINT."
end
def execute_ddl({:drop, %Constraint{}, _mode}) do
raise ArgumentError, "ExSQL does not support ALTER TABLE DROP CONSTRAINT."
end
def execute_ddl({:drop_if_exists, %Constraint{}, _mode}) do
raise ArgumentError, "ExSQL does not support ALTER TABLE DROP CONSTRAINT."
end
def execute_ddl(string) when is_binary(string), do: [string]
def execute_ddl(keyword) when is_list(keyword) do
raise ArgumentError, "ExSQL adapter does not support keyword lists in execute"
end
def ddl_logs(_result), do: []
def all(query), do: select_sql(query)
def update_all(query), do: update_all_sql(query)
def delete_all(query), do: delete_all_sql(query)
def insert(prefix, table, header, rows, on_conflict, returning, placeholders) do
insert_sql(prefix, table, header, rows, on_conflict, returning, placeholders)
end
def insert(prefix, table, header, rows, on_conflict, returning, placeholders, _opts) do
insert_sql(prefix, table, header, rows, on_conflict, returning, placeholders)
end
def update(prefix, table, fields, filters, returning) do
fields = Enum.map_intersperse(fields, ", ", &[quote_name(&1), " = ?"])
[
"UPDATE ",
quote_table(prefix, table),
" SET ",
fields,
" WHERE ",
filters_expr(filters),
returning_expr(returning)
]
end
def delete(prefix, table, filters, returning) do
[
"DELETE FROM ",
quote_table(prefix, table),
" WHERE ",
filters_expr(filters),
returning_expr(returning)
]
end
defp update_all_sql(%Ecto.Query{} = query) do
assert_simple_update_query!(query)
[
"UPDATE ",
update_from_expr(query),
" SET ",
update_fields(query),
update_where_expr(query.wheres),
update_returning_expr(query)
]
end
defp update_fields(%Ecto.Query{updates: updates}) do
updates
|> Enum.flat_map(fn
%Ecto.Query.QueryExpr{expr: expr} ->
Enum.flat_map(expr, fn
{op, fields} when is_list(fields) ->
Enum.map(fields, fn {field, value} ->
update_operation(op, field, value)
end)
_other ->
[]
end)
_other ->
[]
end)
|> Enum.intersperse(", ")
end
# SQLite UPDATE has no source alias, so the SET right-hand side must reference
# the target's columns unqualified — `json_insert("errors", ...)`, not
# `json_insert("s0"."errors", ...)`. Strip the alias as the WHERE clause does.
defp update_operation(:set, field, value),
do: [update_field(field), " = ", update_expr_expr(value)]
defp update_operation(:inc, field, value),
do: [update_field(field), " = ", update_field(field), " + ", update_expr_expr(value)]
defp update_operation(operation, _field, _value),
do:
raise(
ArgumentError,
"unsupported update operation for ExSQL adapter: #{inspect(operation)}"
)
defp update_field(field), do: quote_name(field)
defp update_from_expr(%Ecto.Query{from: %{source: {source, _schema}, prefix: prefix}}) do
quote_table(prefix, source)
end
defp unique_constraint_name(columns) do
columns
|> String.split(", ")
|> Enum.map(fn column ->
column
|> String.split(".")
|> List.last()
end)
|> then(fn column_names ->
table =
columns
|> String.split(", ")
|> List.first()
|> String.split(".")
|> List.first()
Enum.join([table | column_names] ++ ["index"], "_")
end)
end
defp select_sql(%Ecto.Query{} = query) do
assert_simple_query!(query)
[
"SELECT ",
distinct_expr(query.distinct),
select_expr(query),
" FROM ",
from_expr(query),
join_expr(query.joins),
where_expr(query.wheres),
group_by_expr(query.group_bys),
having_expr(query.havings),
order_by_expr(query.order_bys),
limit_expr(query.limit),
offset_expr(query.offset)
]
end
defp delete_all_sql(%Ecto.Query{} = query) do
assert_simple_query!(query)
[
"DELETE FROM ",
delete_from_expr(query),
delete_where_expr(query.wheres)
]
end
defp insert_sql(prefix, table, header, rows, on_conflict, returning, placeholders) do
if placeholders != [] do
raise ArgumentError, "ExSQL adapter does not support insert placeholders yet"
end
values =
case header do
[] ->
[" DEFAULT VALUES"]
_ ->
columns = [" (", quote_names(header), ") "]
row_placeholders = Enum.map_intersperse(rows, ", ", fn row -> placeholders(row) end)
[columns, "VALUES ", row_placeholders]
end
[
"INSERT INTO ",
quote_table(prefix, table),
values,
on_conflict_expr(on_conflict, header),
returning_expr(returning)
]
end
defp assert_simple_query!(%Ecto.Query{} = query) do
if query.combinations != [] or query.with_ctes != nil or query.lock != nil do
raise ArgumentError, "this Ecto query shape is not implemented in Ecto.Adapters.ExSQL yet"
end
end
defp distinct_expr(nil), do: []
defp distinct_expr(%Ecto.Query.ByExpr{expr: true}) do
"DISTINCT "
end
defp distinct_expr(%Ecto.Query.ByExpr{}) do
raise ArgumentError, "distinct expressions are not supported in Ecto.Adapters.ExSQL yet"
end
defp assert_simple_update_query!(%Ecto.Query{} = query) do
assert_simple_query!(query)
if query.group_bys != [] or query.havings != [] or
query.order_bys != [] or
query.limit != nil or query.offset != nil or query.joins != [] do
raise ArgumentError,
"this Ecto query shape is not implemented in Ecto.Adapters.ExSQL yet"
end
end
defp update_returning_expr(%Ecto.Query{select: nil}), do: []
defp update_returning_expr(
%Ecto.Query{select: %{expr: {:&, [], [source_index]}} = select} = query
) do
fields =
select
|> Map.get(:fields, nil)
|> normalize_selected_fields(query, source_index)
[" RETURNING ", quote_names(fields)]
end
defp update_returning_expr(%Ecto.Query{select: %{expr: fields, take: take}} = query)
when is_list(fields) do
field_names =
fields
|> Enum.map(&returning_projection_field(&1, query, take))
|> Enum.uniq()
[" RETURNING ", quote_names(field_names)]
end
defp update_returning_expr(%Ecto.Query{select: _}),
do: raise(ArgumentError, "unsupported select in update for ExSQL adapter: query returning")
defp normalize_selected_fields(nil, query, source_index) do
if source_schema(query, source_index) do
source_schema_fields(query, source_index)
else
raise ArgumentError,
"unsupported update returning shape in ExSQL adapter: cannot infer fields without schema"
end
end
defp normalize_selected_fields(_fields, query, source_index) do
case Map.get(query.select.take, source_index) do
{:any, field_names} ->
field_names
_ ->
if source_schema(query, source_index) do
source_schema_fields(query, source_index)
else
raise ArgumentError,
"unsupported update returning shape in ExSQL adapter: cannot infer fields without schema"
end
end
end
defp returning_projection_field(
{{:., _, [{:&, [], [source_index]}, field]}, [], []},
_query,
take
) do
query_fields =
case Map.get(take, source_index) do
nil ->
[field]
{:any, only_fields} ->
if field in only_fields, do: [field], else: []
_ ->
[field]
end
query_fields
end
defp returning_projection_field(_, _query, _take), do: []
defp select_expr(%Ecto.Query{select: nil, from: %{source: {_source, nil}}}), do: "*"
defp select_expr(%Ecto.Query{select: nil, from: %{source: {_source, schema}}}) do
qualified_names(0, schema.__schema__(:fields))
end
defp select_expr(%Ecto.Query{select: %{expr: expr}} = query), do: select_projection(expr, query)
defp select_projection({:merge, _, [left, right]}, query),
do: select_merge_projection(left, right, query)
defp select_projection({:%{}, _, [expr]}, query) do
case expr do
{:|, _, [{:&, _, [source_index]}, updates]} ->
select_source_update_projection(source_index, updates, query)
_ ->
map_projection_expr([expr])
end
end
defp select_projection({:%{}, _, fields}, _query), do: map_projection_expr(fields)
defp select_projection({:{}, _, fields}, query),
do: Enum.map_intersperse(fields, ", ", &select_projection(&1, query))
defp select_projection({{:., _, [{:&, _, [source_index]}, field]}, _, []}, _query),
do: qualified_name(source_index, field)
defp select_projection({:type, _, [expr, _type]}, query), do: select_projection(expr, query)
defp select_projection({:&, _, [source_index]}, query) do
source_fields = source_schema_fields(query, source_index)
qualified_names(source_index, source_fields)
end
defp select_projection({:^, _, [_ix]}, _query), do: "?"
defp select_projection(nil, _query), do: "NULL"
defp select_projection(value, _query) when is_integer(value) or is_float(value),
do: to_string(value)
defp select_projection(value, _query) when is_binary(value),
do: ["'", escape_string(value), "'"]
defp select_projection({function, _, _args} = expr, _query)
when function in [:coalesce, :count, :max, :sum, :avg, :min],
do: expr_expr(expr)
defp select_projection(expr, _query) do
raise ArgumentError, "unsupported select expression for ExSQL adapter: #{inspect(expr)}"
end
defp select_merge_projection(left, right, query) do
cond do
source_expr?(left) and map_projection_expr?(right) ->
source_with_map_projection(left, right, query)
source_expr?(right) and map_projection_expr?(left) ->
source_with_map_projection(right, left, query)
map_projection_expr?(left) and map_projection_expr?(right) ->
[
map_projection_expr(map_projection_fields(left)),
", ",
map_projection_expr(map_projection_fields(right))
]
true ->
[select_projection(left, query), ", ", select_projection(right, query)]
end
end
defp source_with_map_projection(
{:&, _, [source_index]},
map_expr,
query
) do
map_fields = map_projection_fields(map_expr)
override_keys = Enum.map(map_fields, &normalize_projection_key(elem(&1, 0)))
selected_fields =
source_schema_fields(query, source_index)
|> Enum.reject(&Enum.member?(override_keys, &1))
|> Enum.map_intersperse(", ", &qualified_name(source_index, &1))
merged_fields = map_projection_expr(map_fields)
case {selected_fields, merged_fields} do
{[], []} -> []
{[], merged_fields} -> merged_fields
{selected_fields, []} -> selected_fields
{selected_fields, merged_fields} -> [selected_fields, ", ", merged_fields]
end
end
defp select_source_update_projection(source_index, updates, query) do
source_fields = source_schema_fields(query, source_index)
override_fields = Enum.map(updates, &normalize_projection_key(elem(&1, 0)))
base_projection =
source_fields
|> Enum.reject(&Enum.member?(override_fields, &1))
|> Enum.map_intersperse(", ", &qualified_name(source_index, &1))
update_projection = map_projection_expr(updates)
case {base_projection, update_projection} do
{[], []} -> []
{[], update_projection} -> update_projection
{base_projection, []} -> base_projection
{base_projection, update_projection} -> [base_projection, ", ", update_projection]
end
end
defp map_projection_expr?(expr) do
case expr do
{:%{}, _, fields} when is_list(fields) ->
Enum.all?(fields, &match?({_, _}, &1))
_ ->
false
end
end
defp map_projection_fields({:%{}, _, fields}), do: fields
defp map_projection_fields(_), do: []
defp map_projection_expr(fields) do
Enum.map_intersperse(fields, ", ", fn
{key, value} -> [expr_expr(value), " AS ", quote_name(key)]
end)
end
defp source_expr?({:&, _, [source_index]}) when is_integer(source_index), do: true
defp source_expr?(_), do: false
defp source_schema_fields(query, source_index) do
take =
query
|> Map.get(:select)
|> case do
%Ecto.Query.SelectExpr{take: take} -> take
_ -> %{}
end
case take[source_index] do
{:struct, fields} when is_list(fields) ->
Enum.map(fields, &normalize_projection_key/1)
_ ->
case source_schema(query, source_index) do
nil ->
raise ArgumentError,
"unsupported select expression for ExSQL adapter: {:&, _, [#{source_index}]}"
schema ->
Enum.map(schema.__schema__(:fields), &normalize_projection_key/1)
end
end
end
defp source_schema(%Ecto.Query{from: %{source: source}, joins: joins}, source_index) do
case source_index do
0 ->
{_source, schema} = source
schema
index when is_integer(index) and index > 0 ->
joins
|> Enum.at(index - 1)
|> case do
nil -> nil
%{source: {_source, schema}} -> schema
_ -> nil
end
_ ->
nil
end
end
defp normalize_projection_key(key) when is_atom(key), do: Atom.to_string(key)
defp normalize_projection_key(key), do: to_string(key)
defp from_expr(%Ecto.Query{from: %{source: {source, nil}, prefix: prefix}}) do
[quote_table(prefix, source), " AS ", source_alias(0)]
end
defp from_expr(%Ecto.Query{from: %{source: {source, _schema}, prefix: prefix}}) do
[quote_table(prefix, source), " AS ", source_alias(0)]
end
defp from_expr(%Ecto.Query{} = query) do
raise ArgumentError, "unsupported from expression for ExSQL adapter: #{inspect(query.from)}"
end
defp delete_from_expr(%Ecto.Query{from: %{source: {source, _schema}, prefix: prefix}}) do
quote_table(prefix, source)
end
defp where_expr([]), do: []
defp where_expr(wheres) do
[" WHERE ", Enum.map_intersperse(wheres, " AND ", &boolean_expr/1)]
end
defp delete_where_expr([]), do: []
defp delete_where_expr(wheres) do
[" WHERE ", Enum.map_intersperse(wheres, " AND ", &delete_boolean_expr/1)]
end
defp update_where_expr([]), do: []
defp update_where_expr(wheres) do
[" WHERE ", Enum.map_intersperse(wheres, " AND ", &update_boolean_expr/1)]
end
defp join_expr([]), do: []
defp join_expr(joins) do
joins
|> Enum.with_index(1)
|> Enum.map(fn {join, index} ->
[
?\s,
join_qual(join.qual),
?\s,
join_source(join, index),
" ON ",
boolean_expr(join.on)
]
end)
end
defp join_qual(:inner), do: "INNER JOIN"
defp join_qual(:left), do: "LEFT OUTER JOIN"
defp join_qual(:left_lateral), do: "LEFT OUTER JOIN"
defp join_qual(:cross), do: "CROSS JOIN"
defp join_source(%Ecto.Query.JoinExpr{source: {source, _schema}}, index) do
[quote_table(nil, source), " AS ", source_alias(index)]
end
defp boolean_expr(%Ecto.Query.BooleanExpr{expr: expr, subqueries: subqueries}),
do: expr_expr(resolve_subqueries(expr, subqueries))
defp boolean_expr(%Ecto.Query.QueryExpr{expr: expr}), do: expr_expr(expr)
defp update_boolean_expr(%Ecto.Query.BooleanExpr{expr: expr, subqueries: subqueries}),
do: update_expr_expr(resolve_subqueries(expr, subqueries))
defp update_boolean_expr(%Ecto.Query.QueryExpr{expr: expr}), do: update_expr_expr(expr)
defp delete_boolean_expr(%Ecto.Query.BooleanExpr{expr: expr, subqueries: subqueries}),
do: update_expr_expr(resolve_subqueries(expr, subqueries))
defp delete_boolean_expr(%Ecto.Query.QueryExpr{expr: expr}),
do: update_expr_expr(expr)
defp update_expr_expr(expr) do
expr_expr(expr)
|> strip_update_aliases()
end
defp strip_update_aliases(iodata) do
iodata
|> IO.iodata_to_binary()
|> String.replace(~r/\"s\d+\"\.\"([^\"]+)\"/, "\"\\1\"")
end
defp resolve_subqueries({:subquery, ix}, subqueries) do
subqueries |> Enum.at(ix) |> maybe_to_ecto_subquery()
end
defp resolve_subqueries(%Ecto.SubQuery{} = subquery, _subqueries) do
subquery
end
defp resolve_subqueries({op, meta, args}, subqueries) when is_list(args) do
{op, meta, Enum.map(args, &resolve_subqueries(&1, subqueries))}
end
defp resolve_subqueries({op, meta, args}, subqueries) when is_tuple(args) do
{op, meta, resolve_subqueries(args, subqueries)}
end
defp resolve_subqueries({left, right}, subqueries) do
{resolve_subqueries(left, subqueries), resolve_subqueries(right, subqueries)}
end
defp resolve_subqueries(list, subqueries) when is_list(list) do
Enum.map(list, &resolve_subqueries(&1, subqueries))
end
defp resolve_subqueries(map, subqueries) when is_map(map) do
map
|> Enum.map(fn {key, value} -> {key, resolve_subqueries(value, subqueries)} end)
|> Enum.into(%{})
end
defp resolve_subqueries(expr, _subqueries), do: expr
defp maybe_to_ecto_subquery(nil), do: nil
defp maybe_to_ecto_subquery(%Ecto.SubQuery{} = subquery), do: subquery
defp maybe_to_ecto_subquery(other) do
raise ArgumentError, "missing subquery for ExSQL expression: #{inspect(other)}"
end
defp subquery_expr(%Ecto.SubQuery{query: query}), do: select_sql(query)
defp expr_expr({:==, _, [left, right]}), do: [expr_expr(left), " = ", expr_expr(right)]
defp expr_expr({:!=, _, [left, right]}), do: [expr_expr(left), " != ", expr_expr(right)]
defp expr_expr({:>, _, [left, right]}), do: [expr_expr(left), " > ", expr_expr(right)]
defp expr_expr({:<, _, [left, right]}), do: [expr_expr(left), " < ", expr_expr(right)]
defp expr_expr({:>=, _, [left, right]}), do: [expr_expr(left), " >= ", expr_expr(right)]
defp expr_expr({:<=, _, [left, right]}), do: [expr_expr(left), " <= ", expr_expr(right)]
defp expr_expr({:in, _, [left, {:^, _, [_ix, count]}]}) when is_integer(count) and count >= 0 do
placeholders =
if count == 0 do
"NULL"
else
String.duplicate("?,", count - 1) |> Kernel.<>("?")
end
[expr_expr(left), " IN (", placeholders, ")"]
end
defp expr_expr({:in, _, [left, {:^, _, [_ix]}]}) do
[expr_expr(left), " IN (?)"]
end
defp expr_expr({:in, _, [left, right]}) when is_list(right) do
values = Enum.map_intersperse(right, ", ", &expr_expr/1)
[expr_expr(left), " IN (", values, ")"]
end
defp expr_expr({:and, _, [left, right]}),
do: [?(, expr_expr(left), " AND ", expr_expr(right), ?)]
defp expr_expr({:or, _, [left, right]}), do: [?(, expr_expr(left), " OR ", expr_expr(right), ?)]
defp expr_expr({:not, _, [expr]}), do: ["NOT (", expr_expr(expr), ")"]
defp expr_expr({:fragment, _, fragments}) do
Enum.map_intersperse(fragments, "", fn
{:raw, raw} when is_binary(raw) -> raw
{:expr, expr} -> expr_expr(expr)
{_key, value} -> expr_expr(value)
value -> expr_expr(value)
end)
end
defp expr_expr({:is_nil, _, [expr]}), do: [expr_expr(expr), " IS NULL"]
defp expr_expr({:not_nil, _, [expr]}), do: [expr_expr(expr), " IS NOT NULL"]
defp expr_expr(%Ecto.Query.Tagged{value: value}), do: expr_expr(value)
defp expr_expr(%Ecto.SubQuery{} = subquery), do: subquery_expr(subquery)
defp expr_expr({:in, _, [left, %Ecto.SubQuery{} = subquery]}) do
[expr_expr(left), " IN (", expr_expr(subquery), ")"]
end
defp expr_expr({:coalesce, _, args}), do: function_expr("coalesce", args)
defp expr_expr({:count, _, args}), do: function_expr("count", args)
defp expr_expr({:max, _, args}), do: function_expr("max", args)
defp expr_expr({:sum, _, args}), do: function_expr("sum", args)
defp expr_expr({:avg, _, args}), do: function_expr("avg", args)
defp expr_expr({:min, _, args}), do: function_expr("min", args)
defp expr_expr({{:., _, [{:&, _, [source_index]}, field]}, _, []}) do
[source_alias(source_index), ".", quote_name(field)]
end
defp expr_expr({:type, _, [expr, _type]}), do: expr_expr(expr)
defp expr_expr({:^, _, [_ix]}), do: "?"
defp expr_expr(nil), do: "NULL"
defp expr_expr(true), do: "1"
defp expr_expr(false), do: "0"
defp expr_expr(value) when is_integer(value) or is_float(value), do: to_string(value)
defp expr_expr(value) when is_binary(value), do: ["'", escape_string(value), "'"]
defp expr_expr(%DateTime{} = value), do: ["'", DateTime.to_iso8601(value), "'"]
defp expr_expr(%NaiveDateTime{} = value), do: ["'", NaiveDateTime.to_iso8601(value), "'"]
defp expr_expr(value) when is_map(value) or is_list(value) do
expression = Jason.encode_to_iodata!(value)
["'", escape_string(IO.iodata_to_binary(expression)), "'"]
end
defp expr_expr(expr) do
raise ArgumentError, "unsupported expression for ExSQL adapter: #{inspect(expr)}"
end
defp filters_expr(filters) do
Enum.map_intersperse(filters, " AND ", fn
{field, nil} -> [quote_name(field), " IS NULL"]
{field, _value} -> [quote_name(field), " = ?"]
end)
end
defp order_by_expr([]), do: []
defp order_by_expr(order_bys) do
fields =
order_bys
|> Enum.flat_map(& &1.expr)
|> Enum.flat_map(&order_expr/1)
|> Enum.intersperse(", ")
[" ORDER BY ", fields]
end
defp group_by_expr([]), do: []
defp group_by_expr(group_bys) do
fields =
group_bys
|> Enum.flat_map(& &1.expr)
|> Enum.map_intersperse(", ", &expr_expr/1)
[" GROUP BY ", fields]
end
defp having_expr([]), do: []
defp having_expr(havings),
do: [" HAVING ", Enum.map_intersperse(havings, " AND ", &boolean_expr/1)]
defp limit_expr(nil), do: []
defp limit_expr(%Ecto.Query.QueryExpr{expr: expr}), do: [" LIMIT ", expr_expr(expr)]
defp limit_expr(%Ecto.Query.LimitExpr{expr: expr}), do: [" LIMIT ", expr_expr(expr)]
defp offset_expr(nil), do: []
defp offset_expr(%Ecto.Query.QueryExpr{expr: expr}), do: [" OFFSET ", expr_expr(expr)]
defp order_direction(:asc), do: "ASC"
defp order_direction(:desc), do: "DESC"
defp order_direction(:asc_nulls_first), do: "ASC"
defp order_direction(:desc_nulls_last), do: "DESC"
defp order_expr({:asc_nulls_last, expr}) do
expr = expr_expr(expr)
[[expr, " IS NULL ASC"], [expr, " ASC"]]
end
defp order_expr({:desc_nulls_first, expr}) do
expr = expr_expr(expr)
[[expr, " IS NULL DESC"], [expr, " DESC"]]
end
defp order_expr({direction, expr}), do: [[expr_expr(expr), ?\s, order_direction(direction)]]
defp function_expr(name, args) do
[name, ?(, Enum.map_intersperse(args, ", ", &expr_expr/1), ?)]
end
defp placeholders(row) do
["(", Enum.map_intersperse(row, ", ", fn _ -> "?" end), ")"]
end
defp on_conflict_expr({:raise, _, []}, _header), do: []
defp on_conflict_expr({:nothing, _, []}, _header), do: " ON CONFLICT DO NOTHING"
defp on_conflict_expr({:nothing, _, targets}, _header) do
[" ON CONFLICT (", quote_names(targets), ") DO NOTHING"]
end
defp on_conflict_expr({kind, _, _targets}, _header) do
raise ArgumentError, "ExSQL adapter does not support #{inspect(kind)} conflicts yet"
end
defp returning_expr([]), do: []
defp returning_expr(returning), do: [" RETURNING ", quote_names(returning)]
defp create_table_sql(prefix, table, columns) do
{table, composite_pk_def} = composite_pk_definition(table, columns)
composite_fk_defs = composite_fk_definitions(table, columns)
[
prefix,
quote_table(table.prefix, table.name),
?\s,
?(,
column_definitions(table, columns),
composite_pk_def,
composite_fk_defs,
?),
options_expr(table.options)
]
end
defp create_index_sql(prefix, index, name) do
fields = Enum.map_intersperse(index.columns, ", ", &index_expr/1)
[
prefix,
if_do(index.unique, "UNIQUE "),
"INDEX ",
name,
" ON ",
quote_table(index.prefix, index.table),
" (",
fields,
?),
if_do(index.where, [" WHERE ", to_string(index.where)])
]
end
defp column_definitions(table, columns) do
Enum.map_intersperse(columns, ", ", &column_definition(table, &1))
end
defp column_definition(table, {:add, name, %Reference{} = ref, opts}) do
[
quote_name(name),
?\s,
column_type(ref.type, opts),
column_options(table, ref.type, opts),
reference_expr(ref, table, name)
]
end
defp column_definition(table, {:add, name, type, opts}) do
[quote_name(name), ?\s, column_type(type, opts), column_options(table, type, opts)]
end
defp column_change(table, {:add, name, %Reference{} = ref, opts}) do
[
"ADD COLUMN ",
quote_name(name),
?\s,
column_type(ref.type, opts),
column_options(table, ref.type, opts),
reference_expr(ref, table, name)
]
end
defp column_change(table, {:add, name, type, opts})
when type in [:utc_datetime, :naive_datetime] do
opts = Keyword.delete(opts, :null)
[
"ADD COLUMN ",
quote_name(name),
?\s,
column_type(type, opts),
column_options(table, type, opts)
]
end
defp column_change(table, {:add, name, type, opts}) do
[
"ADD COLUMN ",
quote_name(name),
?\s,
column_type(type, opts),
column_options(table, type, opts)
]
end
defp column_change(_table, {:modify, _name, _type, _opts}) do
raise ArgumentError, "ALTER COLUMN not supported by ExSQL"
end
defp column_change(table, {:remove, name, _type, _opts}) do
column_change(table, {:remove, name})
end
defp column_change(_table, {:remove, name}), do: ["DROP COLUMN ", quote_name(name)]
defp column_change(_table, _) do
raise ArgumentError, "Not supported by ExSQL"
end
defp column_options(table, type, opts) do
default = Keyword.fetch(opts, :default)
null = Keyword.get(opts, :null)
pk = table.primary_key != :composite and Keyword.get(opts, :primary_key, false)
collate = Keyword.get(opts, :collate)
check = Keyword.get(opts, :check)
[
default_expr(default),
null_expr(null),
collate_expr(collate),
check_expr(check),
pk_expr(pk, type)
]
end
defp check_expr(nil), do: []
defp check_expr(%{name: name, expr: expr}), do: [" CONSTRAINT ", name, " CHECK (", expr, ")"]
defp collate_expr(nil), do: []
defp collate_expr(type) when is_atom(type), do: type |> Atom.to_string() |> collate_expr()
defp collate_expr(type), do: [" COLLATE ", String.upcase(type)]
defp null_expr(false), do: " NOT NULL"
defp null_expr(true), do: " NULL"
defp null_expr(_), do: []
defp default_expr({:ok, nil}), do: " DEFAULT NULL"
defp default_expr({:ok, literal}) when is_binary(literal) do
[" DEFAULT '", escape_string(literal), ?']
end
defp default_expr({:ok, literal}) when is_number(literal) or is_boolean(literal) do
[" DEFAULT ", to_string(literal)]
end
defp default_expr({:ok, {:fragment, expression}}), do: [" DEFAULT ", expression]
defp default_expr({:ok, value}) when is_map(value) or is_list(value) do
expression = Jason.encode_to_iodata!(value)
[" DEFAULT ('", escape_string(IO.iodata_to_binary(expression)), "')"]
end
defp default_expr(:error), do: []
defp column_type(:id, _opts), do: "INTEGER"
defp column_type(:serial, _opts), do: "INTEGER"
defp column_type(:bigserial, _opts), do: "INTEGER"
defp column_type(:boolean, _opts), do: "INTEGER"
defp column_type(:integer, _opts), do: "INTEGER"
defp column_type(:bigint, _opts), do: "INTEGER"
defp column_type(:string, _opts), do: "TEXT"
defp column_type(:text, _opts), do: "TEXT"
defp column_type(:float, _opts), do: "NUMERIC"
defp column_type(:binary, _opts), do: "BLOB"
defp column_type(:date, _opts), do: "TEXT"
defp column_type(:utc_datetime, _opts), do: "TEXT"
defp column_type(:utc_datetime_usec, _opts), do: "TEXT"
defp column_type(:naive_datetime, _opts), do: "TEXT"
defp column_type(:naive_datetime_usec, _opts), do: "TEXT"
defp column_type(:time, _opts), do: "TEXT"
defp column_type(:time_usec, _opts), do: "TEXT"
defp column_type(:timestamp, _opts), do: "TEXT"
defp column_type(:binary_id, _opts), do: "TEXT"
defp column_type(:map, _opts), do: "TEXT"
defp column_type({:map, _}, _opts), do: "TEXT"
defp column_type(:array, _opts), do: "TEXT"
defp column_type({:array, _}, _opts), do: "TEXT"
defp column_type(:uuid, _opts), do: "TEXT"
defp column_type(:decimal, nil), do: "DECIMAL"
defp column_type(:decimal, opts) do
precision = Keyword.get(opts, :precision)
scale = Keyword.get(opts, :scale, 0)
if precision, do: "DECIMAL(#{precision},#{scale})", else: "DECIMAL"
end
defp column_type(type, _opts) when is_atom(type) do
type |> Atom.to_string() |> String.upcase()
end
defp column_type(type, _opts) when is_binary(type), do: type
defp column_type(type, _opts) do
raise ArgumentError, "unsupported type `#{inspect(type)}`"
end
defp index_expr(literal) when is_binary(literal), do: literal
defp index_expr({direction, literal}) when direction in [:asc, :desc] do
[quote_name(literal), ?\s, direction |> Atom.to_string() |> String.upcase()]
end
defp index_expr(literal), do: quote_name(literal)
defp pk_expr(true, type) when type in [:serial, :bigserial],
do: " PRIMARY KEY AUTOINCREMENT"
defp pk_expr(true, _), do: " PRIMARY KEY"
defp pk_expr(_, _), do: []
defp options_expr(nil), do: []
defp options_expr(options) when is_list(options) do
raise ArgumentError, "ExSQL adapter does not support keyword lists in :options"
end
defp options_expr(options), do: [?\s, to_string(options)]
defp reference_expr(%Reference{with: [_]}, _table, _name), do: []
defp reference_expr(%Reference{} = ref, table, name) do
[
" CONSTRAINT ",
reference_name(ref, table, name),
" REFERENCES ",
quote_table(ref.prefix || table.prefix, ref.table),
?(,
quote_name(ref.column),
?),
reference_on_delete(ref.on_delete),
reference_on_update(ref.on_update)
]
end
defp reference_name(%Reference{name: nil}, table, column) do
quote_name("#{table.name}_#{column}_fkey")
end
defp reference_name(%Reference{name: name}, _table, _column), do: quote_name(name)
defp reference_on_delete(:nilify_all), do: " ON DELETE SET NULL"
defp reference_on_delete(:default_all), do: " ON DELETE SET DEFAULT"
defp reference_on_delete(:delete_all), do: " ON DELETE CASCADE"
defp reference_on_delete(:restrict), do: " ON DELETE RESTRICT"
defp reference_on_delete(_), do: []
defp reference_on_update(:nilify_all), do: " ON UPDATE SET NULL"
defp reference_on_update(:default_all), do: " ON UPDATE SET DEFAULT"
defp reference_on_update(:update_all), do: " ON UPDATE CASCADE"
defp reference_on_update(:restrict), do: " ON UPDATE RESTRICT"
defp reference_on_update(_), do: []
defp composite_pk_definition(%Table{} = table, columns) do
pks =
Enum.reduce(columns, [], fn {_, name, _, opts}, pk_acc ->
if Keyword.get(opts, :primary_key, false), do: [name | pk_acc], else: pk_acc
end)
if length(pks) > 1 do
composite_pk_expr = pks |> Enum.reverse() |> Enum.map_join(",", "e_name/1)
{%{table | primary_key: :composite}, ", PRIMARY KEY (" <> composite_pk_expr <> ")"}
else
{table, ""}
end
end
defp composite_fk_definitions(%Table{} = table, columns) do
columns
|> Enum.filter(fn
{_op, _name, %Reference{with: [_]}, _opts} -> true
_ -> false
end)
|> Enum.map(&composite_fk_definition(table, &1))
end
defp composite_fk_definition(table, {_op, name, ref, _opts}) do
{current_columns, reference_columns} = Enum.unzip([{name, ref.column} | ref.with])
[
", FOREIGN KEY (",
quote_names(current_columns),
") REFERENCES ",
quote_table(ref.prefix || table.prefix, ref.table),
?(,
quote_names(reference_columns),
?),
reference_on_delete(ref.on_delete),
reference_on_update(ref.on_update)
]
end
defp quote_names(names), do: Enum.map_intersperse(names, ?,, "e_name/1)
defp qualified_names(source_index, names),
do: Enum.map_intersperse(names, ", ", &qualified_name(source_index, &1))
defp qualified_name(source_index, name), do: [source_alias(source_index), ".", quote_name(name)]
defp source_alias(index), do: quote_name("s#{index}")
defp quote_name(name), do: quote_entity(name)
defp quote_table(nil, name), do: quote_entity(name)
defp quote_table(false, name), do: quote_entity(name)
defp quote_table(_, name), do: quote_entity(name)
defp quote_entity(value) when is_atom(value), do: value |> Atom.to_string() |> quote_entity()
defp quote_entity(value), do: [[?", value, ?"]]
defp if_do(condition, value), do: if(condition, do: value, else: [])
defp escape_string(value) when is_binary(value) do
value
|> :binary.replace("'", "''", [:global])
|> :binary.replace("\\", "\\\\", [:global])
end
end