defmodule QuackDB.DDL do
import QuackDB.SQL.Fragment, only: [column: 1, table: 1]
@moduledoc """
Small DuckDB DDL SQL builders.
These helpers return SQL iodata for common analytical setup tasks such as
temporary tables in tests or notebooks. They are not an Ecto migration layer;
execute the generated SQL with `QuackDB.query/4` or `Repo.query/3`.
QuackDB.DDL.create_table("events",
[
id: :integer,
name: :varchar,
payload: :json,
occurred_at: :timestamp
],
temporary: true,
if_not_exists: true
)
"""
@type column_type :: QuackDB.Type.spec()
@type column ::
{atom() | String.t(), column_type() | keyword()}
| {atom() | String.t(), column_type(), keyword()}
@type create_table_option ::
{:temporary, boolean()}
| {:if_not_exists, boolean()}
| {:or_replace, boolean()}
| {:as, iodata()}
@doc "Builds a `CREATE TABLE` statement from an Ecto schema module."
@spec create_table(module()) :: iodata()
def create_table(schema) when is_atom(schema), do: create_table(schema, [])
@spec create_table(module() | String.t() | atom(), [create_table_option()] | [column()]) ::
iodata()
def create_table(schema_or_name, options_or_columns) when is_list(options_or_columns) do
cond do
Keyword.has_key?(options_or_columns, :as) ->
create_table_as_options(schema_or_name, options_or_columns)
is_atom(schema_or_name) and function_exported?(schema_or_name, :__schema__, 1) ->
create_table(
schema_or_name.__schema__(:source),
schema_columns(schema_or_name),
options_or_columns
)
true ->
create_table(schema_or_name, options_or_columns, [])
end
end
@doc """
Builds a `CREATE TABLE` statement.
QuackDB.DDL.create_table("events", id: :integer, name: :varchar)
QuackDB.DDL.create_table("events", [id: :integer], temporary: true)
QuackDB.DDL.create_table("temp_events", EventSchema, temporary: true)
Pass `:as` to build `CREATE TABLE AS` from iodata or an Ecto query without pinned params:
QuackDB.DDL.create_table("docs", as: query, temporary: true)
QuackDB.DDL.create_table("docs", as: query, or_replace: true)
"""
@spec create_table(String.t() | atom(), module() | [column()], [create_table_option()]) ::
iodata()
def create_table(name, schema_or_columns, options \\ [])
def create_table(name, schema, options) when is_atom(schema) and is_list(options) do
if function_exported?(schema, :__schema__, 1) do
create_table(name, schema_columns(schema), options)
else
create_table(name, [{schema, options}], [])
end
end
def create_table(name, columns, options) when is_list(columns) and is_list(options) do
assert_create_options!(options)
[
"CREATE ",
or_replace(options),
temporary(options),
"TABLE ",
if_not_exists(options),
table(name),
" (",
columns(columns),
")"
]
end
defp create_table_as(name, query, options) when is_list(options) do
assert_create_options!(options)
[
"CREATE ",
or_replace(options),
temporary(options),
"TABLE ",
if_not_exists(options),
table(name),
" AS ",
table_query(query)
]
end
defp create_table_as_options(name, options) do
{query, options} = Keyword.pop!(options, :as)
create_table_as(name, query, options)
end
@doc "Builds a `DROP TABLE` statement."
@spec drop_table(String.t() | atom(), keyword()) :: iodata()
def drop_table(name, options \\ []) when is_list(options) do
["DROP TABLE ", if_exists(options), table(name)]
end
defp table_query(%{__struct__: Ecto.Query} = query) do
assert_unparameterized_query!(query)
apply(Ecto.Adapters.QuackDB.Query, :all_literal, [query])
end
defp table_query(query), do: query
defp assert_unparameterized_query!(query) do
if parameterized_query?(query) do
raise ArgumentError,
"QuackDB.DDL.create_table/2 with :as does not support parameterized Ecto queries; use literal query expressions or materialize with Repo.all/query first"
end
end
defp parameterized_query?(%{__struct__: Ecto.Query} = query) do
query
|> Map.take([
:wheres,
:havings,
:order_bys,
:group_bys,
:combinations,
:select,
:joins,
:limit,
:offset
])
|> parameterized_query?()
end
defp parameterized_query?(%{params: [_ | _]}), do: true
defp parameterized_query?(%struct{} = value) when is_atom(struct) do
value
|> Map.from_struct()
|> parameterized_query?()
end
defp parameterized_query?(value) when is_map(value) do
Enum.any?(value, fn {_key, field} -> parameterized_query?(field) end)
end
defp parameterized_query?(value) when is_list(value),
do: Enum.any?(value, ¶meterized_query?/1)
defp parameterized_query?(_value), do: false
defp schema_columns(schema) do
Enum.map(schema.__schema__(:fields), fn field ->
{field, schema_field_type!(schema, field)}
end)
end
defp schema_field_type!(schema, field) do
schema.__schema__(:type, field)
|> then(&apply(QuackDB.Ecto.Type, :column_type!, [&1, :schema]))
rescue
error in ArgumentError ->
raise ArgumentError,
"unsupported Ecto schema type for #{inspect(schema)}.#{field}: #{Exception.message(error)}"
end
defp assert_create_options!(options) do
if Keyword.get(options, :or_replace, false) and Keyword.get(options, :if_not_exists, false) do
raise ArgumentError,
"create_table options :or_replace and :if_not_exists cannot be used together"
end
end
defp or_replace(options) do
if Keyword.get(options, :or_replace, false), do: "OR REPLACE ", else: []
end
defp temporary(options) do
if Keyword.get(options, :temporary, false), do: "TEMP ", else: []
end
defp if_not_exists(options) do
if Keyword.get(options, :if_not_exists, false), do: "IF NOT EXISTS ", else: []
end
defp if_exists(options) do
if Keyword.get(options, :if_exists, false), do: "IF EXISTS ", else: []
end
defp columns([]), do: raise(ArgumentError, "expected at least one column")
defp columns(columns) do
columns
|> Enum.map(&ddl_column/1)
|> Enum.intersperse(", ")
end
defp ddl_column({name, type}) do
[column(name), " ", QuackDB.Type.to_sql(type)]
end
defp ddl_column({name, type, options}) when is_list(options) do
[
column(name),
" ",
QuackDB.Type.to_sql(type),
column_options(options)
]
end
defp ddl_column(other) do
raise ArgumentError, "expected column as {name, type}, got: #{inspect(other)}"
end
defp column_options(options) do
[
nullable(options),
primary_key(options),
default(options)
]
end
defp nullable(options) do
if Keyword.get(options, :null, true), do: [], else: " NOT NULL"
end
defp primary_key(options) do
if Keyword.get(options, :primary_key, false), do: " PRIMARY KEY", else: []
end
defp default(options) do
case Keyword.fetch(options, :default) do
{:ok, value} -> [" DEFAULT ", literal!(value)]
:error -> []
end
end
defp literal!(value) do
case QuackDB.SQL.literal(value) do
{:ok, literal} -> literal
{:error, %QuackDB.Error{} = error} -> raise error
end
end
end