Skip to main content

lib/quack_db/dml.ex

defmodule QuackDB.DML do
  import QuackDB.SQL.Fragment

  @moduledoc """
  Small DuckDB DML SQL builders.

  These helpers return SQL iodata for setup and example insert statements while
  still allowing DuckDB expressions where needed. For large batches, prefer
  `QuackDB.insert_rows/4`, `QuackDB.insert_columns/4`, or
  `QuackDB.Explorer.insert_dataframe/4`.
  """

  @type value :: QuackDB.SQL.parameter() | {:expr, iodata()}
  @type row :: keyword(value()) | %{(atom() | String.t()) => value()}
  @type table :: atom() | String.t() | {atom() | String.t() | nil, atom() | String.t()}
  @type where :: keyword(value())
  @type merge_insert :: {:insert, [atom() | String.t()]}

  @doc """
  Builds a parameterized `DELETE FROM ... WHERE ...` statement.

      {sql, params} =
        QuackDB.DML.delete_from(:events,
          where: [event_type: "session_entry", session_file: session_file]
        )

      QuackDB.query!(conn, sql, params)

  `nil` values generate `IS NULL` predicates. `{:expr, sql}` values are emitted
  directly for cases where a DuckDB expression is required.
  """
  @spec delete_from(table(), keyword()) :: {iodata(), [QuackDB.SQL.parameter()]}
  def delete_from(table, options) when is_list(options) do
    where = Keyword.get(options, :where, :missing)
    {predicates, params} = delete_predicates!(where)

    {
      [
        "DELETE FROM ",
        table(table),
        " WHERE ",
        Enum.intersperse(predicates, " AND ")
      ],
      params
    }
  end

  @doc "Builds an `INSERT INTO ... VALUES ...` statement."
  @spec insert_into(String.t() | atom(), [row()] | row()) :: iodata()
  def insert_into(table, rows) when is_list(rows) do
    rows = normalize_rows(rows)
    columns = columns!(rows)

    [
      "INSERT INTO ",
      table(table),
      " (",
      column_list(columns),
      ") VALUES ",
      rows |> Enum.map(&row_values(&1, columns)) |> Enum.intersperse(", ")
    ]
  end

  def insert_into(table, row) when is_map(row), do: insert_into(table, [row])

  @doc "Builds an `INSERT INTO ... SELECT ... FROM ...` statement."
  @spec insert_into_select(
          table(),
          [atom() | String.t()],
          table(),
          [atom() | String.t()],
          keyword()
        ) ::
          iodata()
  def insert_into_select(table, columns, source, select_columns, options \\ [])
      when is_list(columns) and is_list(select_columns) and is_list(options) do
    [
      "INSERT INTO ",
      table(table),
      insert_columns(columns),
      " SELECT ",
      select_columns(select_columns),
      " FROM ",
      table(source),
      on_conflict(Keyword.get(options, :on_conflict, :raise)),
      returning(Keyword.get(options, :returning, []))
    ]
  end

  @doc "Builds a `MERGE INTO ... USING ...` statement."
  @spec merge_into(table(), keyword()) :: iodata()
  def merge_into(target, options) when is_list(options) do
    source = Keyword.fetch!(options, :using)
    target_alias = Keyword.get(options, :target_as, :target)
    source_alias = Keyword.get(options, :source_as, :source)
    on = Keyword.fetch!(options, :on)
    not_matched = Keyword.get(options, :when_not_matched, :nothing)

    [
      "MERGE INTO ",
      table(target),
      " AS ",
      alias_name(target_alias),
      " USING ",
      table(source),
      " AS ",
      alias_name(source_alias),
      " ON ",
      merge_on(on, target_alias, source_alias),
      merge_not_matched(not_matched, source_alias),
      returning(Keyword.get(options, :returning, []))
    ]
  end

  defp delete_predicates!(:missing) do
    raise ArgumentError, "expected delete where: to be a non-empty keyword list"
  end

  defp delete_predicates!([]) do
    raise ArgumentError, "expected delete where: to include at least one predicate"
  end

  defp delete_predicates!(where) when is_list(where) do
    unless Keyword.keyword?(where) do
      raise ArgumentError, "expected delete where: to be a keyword list, got: #{inspect(where)}"
    end

    where
    |> Enum.map(&delete_predicate/1)
    |> Enum.unzip()
    |> then(fn {predicates, params} -> {predicates, :lists.append(params)} end)
  end

  defp delete_predicates!(where) do
    raise ArgumentError, "expected delete where: to be a keyword list, got: #{inspect(where)}"
  end

  defp delete_predicate({column, nil}) do
    {[column(column), " IS NULL"], []}
  end

  defp delete_predicate({column, {:expr, expression}}) do
    {[column(column), " = ", expression], []}
  end

  defp delete_predicate({column, value}) do
    {[column(column), " = ?"], [value]}
  end

  defp normalize_rows([]), do: []

  defp normalize_rows([{key, _value} | _rest] = row) when is_atom(key) or is_binary(key),
    do: [row]

  defp normalize_rows(rows), do: rows

  defp columns!([]), do: raise(ArgumentError, "expected at least one insert row")

  defp columns!([row | rows]) when is_list(row) do
    columns = Keyword.keys(row)
    validate_columns!(columns, rows)
  end

  defp columns!([row | rows]) when is_map(row) do
    columns = Map.keys(row)
    validate_columns!(columns, rows)
  end

  defp validate_columns!([], _rows),
    do: raise(ArgumentError, "expected at least one insert column")

  defp validate_columns!(columns, rows) do
    Enum.each(rows, fn row ->
      row_columns = row_columns(row)

      if row_columns != columns do
        raise ArgumentError,
              "insert rows must have identical columns, expected #{inspect(columns)}, got #{inspect(row_columns)}"
      end
    end)

    columns
  end

  defp row_columns(row) when is_list(row), do: Keyword.keys(row)
  defp row_columns(row) when is_map(row), do: Map.keys(row)

  defp row_values(row, columns) do
    ["(", columns |> Enum.map(&value(row, &1)) |> Enum.intersperse(", "), ")"]
  end

  defp merge_on(on, target_alias, source_alias) when is_list(on) and on != [] do
    on
    |> Enum.map(fn
      {column, column} ->
        merge_equality(target_alias, column, source_alias, column)

      {target_column, source_column} ->
        merge_equality(target_alias, target_column, source_alias, source_column)

      column ->
        merge_equality(target_alias, column, source_alias, column)
    end)
    |> Enum.intersperse(" AND ")
  end

  defp merge_on(on, _target_alias, _source_alias) do
    raise ArgumentError, "expected MERGE :on to include at least one column, got: #{inspect(on)}"
  end

  defp merge_equality(target_alias, target_column, source_alias, source_column) do
    qualified_equality(target_alias, target_column, source_alias, source_column)
  end

  defp merge_not_matched(:nothing, _source_alias), do: []

  defp merge_not_matched({:insert, columns}, source_alias)
       when is_list(columns) and columns != [] do
    [
      " WHEN NOT MATCHED THEN INSERT",
      insert_columns(columns),
      " VALUES (",
      merge_insert_values(columns, source_alias),
      ")"
    ]
  end

  defp merge_not_matched(other, _source_alias) do
    raise ArgumentError, "unsupported MERGE when_not_matched option: #{inspect(other)}"
  end

  defp merge_insert_values(columns, source_alias) do
    qualified_column_list(columns, source_alias)
  end

  defp value(row, column) do
    row
    |> fetch_value!(column)
    |> sql_value()
  end

  defp fetch_value!(row, column) when is_list(row), do: Keyword.fetch!(row, column)

  defp fetch_value!(row, column) when is_map(row) do
    case Map.fetch(row, column) do
      {:ok, value} -> value
      :error -> Map.fetch!(row, to_string(column))
    end
  end

  defp sql_value({:expr, value}), do: value

  defp sql_value(value) do
    case QuackDB.SQL.literal(value) do
      {:ok, literal} -> literal
      {:error, %QuackDB.Error{} = error} -> raise error
    end
  end
end