Skip to main content

lib/quack_db/ddl.ex

defmodule QuackDB.DDL do
  import QuackDB.SQL.Fragment, only: [column: 1, table: 1]

  @moduledoc """
  Small DuckDB DDL SQL builders.

  These helpers return SQL iodata for common analytical setup tasks such as
  temporary tables in tests or notebooks. They are not an Ecto migration layer;
  execute the generated SQL with `QuackDB.query/4` or `Repo.query/3`.

      QuackDB.DDL.create_table("events",
        [
          id: :integer,
          name: :varchar,
          payload: :json,
          occurred_at: :timestamp
        ],
        temporary: true,
        if_not_exists: true
      )

  """

  @type column_type :: QuackDB.Type.spec()
  @type column ::
          {atom() | String.t(), column_type() | keyword()}
          | {atom() | String.t(), column_type(), keyword()}

  @type create_table_option ::
          {:temporary, boolean()}
          | {:if_not_exists, boolean()}
          | {:or_replace, boolean()}
          | {:as, iodata()}

  @doc "Builds a `CREATE TABLE` statement from an Ecto schema module."
  @spec create_table(module()) :: iodata()
  def create_table(schema) when is_atom(schema), do: create_table(schema, [])

  @spec create_table(module() | String.t() | atom(), [create_table_option()] | [column()]) ::
          iodata()
  def create_table(schema_or_name, options_or_columns) when is_list(options_or_columns) do
    cond do
      Keyword.has_key?(options_or_columns, :as) ->
        create_table_as_options(schema_or_name, options_or_columns)

      is_atom(schema_or_name) and function_exported?(schema_or_name, :__schema__, 1) ->
        create_table(
          schema_or_name.__schema__(:source),
          schema_columns(schema_or_name),
          options_or_columns
        )

      true ->
        create_table(schema_or_name, options_or_columns, [])
    end
  end

  @doc """
  Builds a `CREATE TABLE` statement.

      QuackDB.DDL.create_table("events", id: :integer, name: :varchar)
      QuackDB.DDL.create_table("events", [id: :integer], temporary: true)
      QuackDB.DDL.create_table("temp_events", EventSchema, temporary: true)

  Pass `:as` to build `CREATE TABLE AS` from iodata or an Ecto query without pinned params:

      QuackDB.DDL.create_table("docs", as: query, temporary: true)
      QuackDB.DDL.create_table("docs", as: query, or_replace: true)
  """
  @spec create_table(String.t() | atom(), module() | [column()], [create_table_option()]) ::
          iodata()
  def create_table(name, schema_or_columns, options \\ [])

  def create_table(name, schema, options) when is_atom(schema) and is_list(options) do
    if function_exported?(schema, :__schema__, 1) do
      create_table(name, schema_columns(schema), options)
    else
      create_table(name, [{schema, options}], [])
    end
  end

  def create_table(name, columns, options) when is_list(columns) and is_list(options) do
    assert_create_options!(options)

    [
      "CREATE ",
      or_replace(options),
      temporary(options),
      "TABLE ",
      if_not_exists(options),
      table(name),
      " (",
      columns(columns),
      ")"
    ]
  end

  defp create_table_as(name, query, options) when is_list(options) do
    assert_create_options!(options)

    [
      "CREATE ",
      or_replace(options),
      temporary(options),
      "TABLE ",
      if_not_exists(options),
      table(name),
      " AS ",
      table_query(query)
    ]
  end

  defp create_table_as_options(name, options) do
    {query, options} = Keyword.pop!(options, :as)
    create_table_as(name, query, options)
  end

  @doc "Builds a `DROP TABLE` statement."
  @spec drop_table(String.t() | atom(), keyword()) :: iodata()
  def drop_table(name, options \\ []) when is_list(options) do
    ["DROP TABLE ", if_exists(options), table(name)]
  end

  defp table_query(%{__struct__: Ecto.Query} = query) do
    assert_unparameterized_query!(query)
    apply(Ecto.Adapters.QuackDB.Query, :all_literal, [query])
  end

  defp table_query(query), do: query

  defp assert_unparameterized_query!(query) do
    if parameterized_query?(query) do
      raise ArgumentError,
            "QuackDB.DDL.create_table/2 with :as does not support parameterized Ecto queries; use literal query expressions or materialize with Repo.all/query first"
    end
  end

  defp parameterized_query?(%{__struct__: Ecto.Query} = query) do
    query
    |> Map.take([
      :wheres,
      :havings,
      :order_bys,
      :group_bys,
      :combinations,
      :select,
      :joins,
      :limit,
      :offset
    ])
    |> parameterized_query?()
  end

  defp parameterized_query?(%{params: [_ | _]}), do: true

  defp parameterized_query?(%struct{} = value) when is_atom(struct) do
    value
    |> Map.from_struct()
    |> parameterized_query?()
  end

  defp parameterized_query?(value) when is_map(value) do
    Enum.any?(value, fn {_key, field} -> parameterized_query?(field) end)
  end

  defp parameterized_query?(value) when is_list(value),
    do: Enum.any?(value, &parameterized_query?/1)

  defp parameterized_query?(_value), do: false

  defp schema_columns(schema) do
    Enum.map(schema.__schema__(:fields), fn field ->
      {field, schema_field_type!(schema, field)}
    end)
  end

  defp schema_field_type!(schema, field) do
    schema.__schema__(:type, field)
    |> then(&apply(QuackDB.Ecto.Type, :column_type!, [&1, :schema]))
  rescue
    error in ArgumentError ->
      raise ArgumentError,
            "unsupported Ecto schema type for #{inspect(schema)}.#{field}: #{Exception.message(error)}"
  end

  defp assert_create_options!(options) do
    if Keyword.get(options, :or_replace, false) and Keyword.get(options, :if_not_exists, false) do
      raise ArgumentError,
            "create_table options :or_replace and :if_not_exists cannot be used together"
    end
  end

  defp or_replace(options) do
    if Keyword.get(options, :or_replace, false), do: "OR REPLACE ", else: []
  end

  defp temporary(options) do
    if Keyword.get(options, :temporary, false), do: "TEMP ", else: []
  end

  defp if_not_exists(options) do
    if Keyword.get(options, :if_not_exists, false), do: "IF NOT EXISTS ", else: []
  end

  defp if_exists(options) do
    if Keyword.get(options, :if_exists, false), do: "IF EXISTS ", else: []
  end

  defp columns([]), do: raise(ArgumentError, "expected at least one column")

  defp columns(columns) do
    columns
    |> Enum.map(&ddl_column/1)
    |> Enum.intersperse(", ")
  end

  defp ddl_column({name, type}) do
    [column(name), " ", QuackDB.Type.to_sql(type)]
  end

  defp ddl_column({name, type, options}) when is_list(options) do
    [
      column(name),
      " ",
      QuackDB.Type.to_sql(type),
      column_options(options)
    ]
  end

  defp ddl_column(other) do
    raise ArgumentError, "expected column as {name, type}, got: #{inspect(other)}"
  end

  defp column_options(options) do
    [
      nullable(options),
      primary_key(options),
      default(options)
    ]
  end

  defp nullable(options) do
    if Keyword.get(options, :null, true), do: [], else: " NOT NULL"
  end

  defp primary_key(options) do
    if Keyword.get(options, :primary_key, false), do: " PRIMARY KEY", else: []
  end

  defp default(options) do
    case Keyword.fetch(options, :default) do
      {:ok, value} -> [" DEFAULT ", literal!(value)]
      :error -> []
    end
  end

  defp literal!(value) do
    case QuackDB.SQL.literal(value) do
      {:ok, literal} -> literal
      {:error, %QuackDB.Error{} = error} -> raise error
    end
  end
end