lib/selecto/set_operations.ex

defmodule Selecto.SetOperations do
  @moduledoc """
  Set operations for combining query results using UNION, INTERSECT, and EXCEPT.

  Set operations allow combining results from multiple Selecto queries using
  standard SQL set operations. All participating queries must have compatible
  column counts and types.

  ## Examples

      # Basic UNION - combine results from two queries
      query1 = Selecto.configure(users_domain, connection)
        |> Selecto.select(["name", "email"])
        |> Selecto.filter([{"active", true}])

      query2 = Selecto.configure(contacts_domain, connection)
        |> Selecto.select(["full_name", "email_address"])
        |> Selecto.filter([{"status", "active"}])

      combined = Selecto.union(query1, query2, all: true)

      # INTERSECT - find common records
      premium_active = Selecto.intersect(premium_users, active_users)

      # EXCEPT - find differences
      free_users = Selecto.except(all_users, premium_users)

      # Chained set operations
      result = query1
        |> Selecto.union(query2)
        |> Selecto.intersect(query3)
        |> Selecto.except(query4)
  """

  alias Selecto.SetOperations.{Spec, Validation}

  defmodule Spec do
    @moduledoc """
    Specification for a set operation between two or more queries.
    """
    defstruct [
      # Unique identifier for the set operation
      :id,
      # :union, :intersect, or :except
      :operation,
      # Left side query (Selecto struct)
      :left_query,
      # Right side query (Selecto struct)
      :right_query,
      # Operation options (all: true/false, etc.)
      :options,
      # Optional column mapping for incompatible schemas
      :column_mapping,
      # Boolean indicating if schemas have been validated
      :validated
    ]

    @type set_operation :: :union | :intersect | :except
    @type operation_options :: %{
            all: boolean(),
            column_mapping: [{String.t(), String.t()}] | nil
          }

    @type t :: %__MODULE__{
            id: String.t(),
            operation: set_operation(),
            left_query: Selecto.t(),
            right_query: Selecto.t(),
            options: operation_options(),
            column_mapping: [{String.t(), String.t()}] | nil,
            validated: boolean()
          }
  end

  defmodule Validation do
    @moduledoc """
    Schema validation for set operations between queries.
    """

    defmodule SchemaError do
      defexception [:type, :message, :query1_info, :query2_info]

      @type t :: %__MODULE__{
              type: :column_count_mismatch | :type_incompatibility | :mapping_error,
              message: String.t(),
              query1_info: map(),
              query2_info: map()
            }
    end

    @doc """
    Validate that two queries are compatible for set operations.

    Returns {:ok, validated_spec} or {:error, validation_error}.
    """
    def validate_compatibility(spec) do
      with {:ok, left_columns} <- extract_query_columns(spec.left_query),
           {:ok, right_columns} <- extract_query_columns(spec.right_query),
           :ok <- validate_column_count(left_columns, right_columns),
           :ok <- validate_column_types(left_columns, right_columns, spec.column_mapping) do
        {:ok, %{spec | validated: true}}
      else
        {:error, reason} -> {:error, reason}
      end
    end

    # Extract column information from a Selecto query
    defp extract_query_columns(selecto) do
      selected = Map.get(selecto.set, :selected, [])

      if Enum.empty?(selected) do
        {:error,
         %SchemaError{
           type: :validation_error,
           message: "Query has no selected columns",
           query1_info: %{selected: selected},
           query2_info: %{}
         }}
      else
        columns = Enum.map(selected, &normalize_column_info(selecto, &1))
        {:ok, columns}
      end
    end

    # Normalize column information for comparison
    defp normalize_column_info(selecto, column_spec) do
      case column_spec do
        column when is_binary(column) ->
          # Basic column name - resolve type from domain
          case Selecto.resolve_field(selecto, column) do
            {:ok, field_info} ->
              %{
                name: column,
                type: Map.get(field_info, :type, :unknown),
                source: :field
              }

            {:error, _} ->
              %{
                name: column,
                type: :unknown,
                source: :field
              }
          end

        {:as, expression, alias_name} ->
          # Aliased expression - try to infer type
          %{
            name: alias_name,
            type: infer_expression_type(expression),
            source: :expression
          }

        {:func, func_name, _args} ->
          # Function call - infer return type
          %{
            name: func_name,
            type: infer_function_return_type(func_name),
            source: :function
          }

        _ ->
          # Complex expression - treat as unknown type
          %{
            name: inspect(column_spec),
            type: :unknown,
            source: :complex
          }
      end
    end

    # Infer type from expression (simplified)
    defp infer_expression_type({:literal, value}) when is_binary(value), do: :string
    defp infer_expression_type({:literal, value}) when is_integer(value), do: :integer
    defp infer_expression_type({:literal, value}) when is_float(value), do: :decimal
    defp infer_expression_type({:literal, value}) when is_boolean(value), do: :boolean
    defp infer_expression_type(_), do: :unknown

    # Infer function return type (simplified mapping)
    defp infer_function_return_type("COUNT"), do: :integer
    defp infer_function_return_type("SUM"), do: :decimal
    defp infer_function_return_type("AVG"), do: :decimal
    # Depends on input
    defp infer_function_return_type("MIN"), do: :unknown
    # Depends on input
    defp infer_function_return_type("MAX"), do: :unknown
    defp infer_function_return_type("CONCAT"), do: :string
    defp infer_function_return_type(_), do: :unknown

    # Validate column counts match
    defp validate_column_count(left_columns, right_columns) do
      left_count = length(left_columns)
      right_count = length(right_columns)

      if left_count == right_count do
        :ok
      else
        {:error,
         %SchemaError{
           type: :column_count_mismatch,
           message: "Query 1 has #{left_count} columns, Query 2 has #{right_count} columns",
           query1_info: %{column_count: left_count, columns: Enum.map(left_columns, & &1.name)},
           query2_info: %{column_count: right_count, columns: Enum.map(right_columns, & &1.name)}
         }}
      end
    end

    # Validate column types are compatible
    defp validate_column_types(left_columns, right_columns, column_mapping) do
      paired_columns = apply_column_mapping(left_columns, right_columns, column_mapping)

      incompatible =
        paired_columns
        |> Enum.with_index()
        |> Enum.filter(fn {{left_col, right_col}, _index} ->
          not types_compatible?(left_col.type, right_col.type)
        end)

      if Enum.empty?(incompatible) do
        :ok
      else
        # Show first incompatible pair
        [{_columns, index}] = Enum.take(incompatible, 1)
        {{left_col, right_col}, _} = Enum.at(paired_columns, index)

        {:error,
         %SchemaError{
           type: :type_incompatibility,
           message:
             "Column #{index + 1}: '#{left_col.name}' (#{left_col.type}) incompatible with '#{right_col.name}' (#{right_col.type})",
           query1_info: %{column: left_col.name, type: left_col.type},
           query2_info: %{column: right_col.name, type: right_col.type}
         }}
      end
    end

    # Apply column mapping if provided, otherwise pair by position
    defp apply_column_mapping(left_columns, right_columns, nil) do
      # No explicit mapping - try intelligent name-based matching first
      case try_name_based_mapping(left_columns, right_columns) do
        {:ok, paired} -> paired
        :fallback -> Enum.zip(left_columns, right_columns)
      end
    end

    defp apply_column_mapping(left_columns, right_columns, mapping) when is_list(mapping) do
      # Explicit column mapping provided: [{left_name, right_name}, ...]
      # Build a lookup map from left column names to right column names
      mapping_lookup =
        Map.new(mapping, fn {left_name, right_name} ->
          {normalize_name(left_name), normalize_name(right_name)}
        end)

      # Create paired columns based on the mapping
      Enum.map(left_columns, fn left_col ->
        left_name = normalize_name(left_col.name)

        case Map.get(mapping_lookup, left_name) do
          nil ->
            # No explicit mapping - try to find by same name or position
            find_matching_right_column(left_col, right_columns)

          right_name ->
            # Find the right column with the mapped name
            right_col =
              Enum.find(right_columns, fn rc ->
                normalize_name(rc.name) == right_name
              end)

            if right_col do
              {left_col, right_col}
            else
              # Mapping target not found - fall back to position
              find_matching_right_column(left_col, right_columns)
            end
        end
      end)
    end

    defp apply_column_mapping(left_columns, right_columns, mapping) when is_map(mapping) do
      # Convert map to list format and recurse
      apply_column_mapping(left_columns, right_columns, Map.to_list(mapping))
    end

    # Try to match columns by name (exact or similar)
    defp try_name_based_mapping(left_columns, right_columns) do
      # Build a lookup of normalized right column names
      right_lookup =
        Map.new(right_columns, fn col ->
          {normalize_name(col.name), col}
        end)

      # Also create a list of alternative name mappings (common patterns)
      alternatives = build_alternative_names(right_columns)

      # Try to pair each left column with a right column by name
      paired =
        Enum.map(left_columns, fn left_col ->
          left_name = normalize_name(left_col.name)

          cond do
            # Exact name match
            Map.has_key?(right_lookup, left_name) ->
              {left_col, Map.get(right_lookup, left_name)}

            # Alternative name match (e.g., full_name -> name, email_address -> email)
            Map.has_key?(alternatives, left_name) ->
              {left_col, Map.get(alternatives, left_name)}

            true ->
              nil
          end
        end)

      # Check if we matched all columns
      if Enum.all?(paired, & &1) do
        {:ok, paired}
      else
        :fallback
      end
    end

    # Build alternative name mappings for common patterns
    defp build_alternative_names(columns) do
      columns
      |> Enum.flat_map(fn col ->
        name = normalize_name(col.name)
        alternatives = generate_name_alternatives(name)
        Enum.map(alternatives, fn alt -> {alt, col} end)
      end)
      |> Map.new()
    end

    # Generate alternative names for a column
    defp generate_name_alternatives(name) do
      base_alternatives = [
        # Remove common prefixes
        String.replace_prefix(name, "full_", ""),
        String.replace_prefix(name, "company_", ""),
        String.replace_prefix(name, "contact_", ""),
        String.replace_prefix(name, "user_", ""),
        String.replace_prefix(name, "customer_", ""),
        # Remove common suffixes
        String.replace_suffix(name, "_address", ""),
        String.replace_suffix(name, "_name", ""),
        String.replace_suffix(name, "_id", ""),
        # Common substitutions
        name |> String.replace("email_address", "email"),
        name |> String.replace("phone_number", "phone"),
        name |> String.replace("full_name", "name"),
        name |> String.replace("company_name", "name"),
        name |> String.replace("product_name", "name")
      ]

      # Filter out unchanged names and empty strings
      base_alternatives
      |> Enum.reject(fn alt -> alt == name or alt == "" end)
      |> Enum.uniq()
    end

    # Normalize column name for comparison
    defp normalize_name(name) when is_atom(name), do: Atom.to_string(name) |> normalize_name()

    defp normalize_name(name) when is_binary(name) do
      name
      |> String.downcase()
      |> String.replace(~r/[^a-z0-9_]/, "_")
      |> String.trim("_")
    end

    # Find a matching right column for a left column
    defp find_matching_right_column(left_col, right_columns) do
      left_name = normalize_name(left_col.name)

      # Try exact match first
      case Enum.find(right_columns, fn rc -> normalize_name(rc.name) == left_name end) do
        nil ->
          # Fall back to position-based (use the first unmatched column)
          # This handles cases where columns are in different orders
          index =
            Enum.find_index(right_columns, fn rc ->
              normalize_name(rc.name) == left_name
            end) || 0

          {left_col, Enum.at(right_columns, index, List.first(right_columns))}

        right_col ->
          {left_col, right_col}
      end
    end

    # Check if two types are compatible for set operations
    defp types_compatible?(:unknown, _), do: true
    defp types_compatible?(_, :unknown), do: true
    defp types_compatible?(type, type), do: true

    # String-like types
    defp types_compatible?(type1, type2)
         when type1 in [:string, :text] and type2 in [:string, :text],
         do: true

    # Numeric types
    defp types_compatible?(type1, type2)
         when type1 in [:integer, :decimal, :float] and type2 in [:integer, :decimal, :float],
         do: true

    # Date/time types
    defp types_compatible?(type1, type2)
         when type1 in [:date, :utc_datetime, :naive_datetime] and
                type2 in [:date, :utc_datetime, :naive_datetime],
         do: true

    # Default: incompatible
    defp types_compatible?(_, _), do: false
  end

  @doc """
  Create a UNION set operation between two queries.

  ## Options

  - `:all` - Use UNION ALL to include duplicates (default: false)
  - `:column_mapping` - Map columns between incompatible schemas

  ## Examples

      # Basic UNION (removes duplicates)
      Selecto.union(query1, query2)

      # UNION ALL (includes duplicates, faster)
      Selecto.union(query1, query2, all: true)

      # UNION with column mapping
      Selecto.union(customers, vendors,
        column_mapping: [
          {"name", "company_name"},
          {"email", "contact_email"}
        ]
      )
  """
  def union(left_query, right_query, opts \\ []) do
    create_set_operation(:union, left_query, right_query, opts)
  end

  @doc """
  Create an INTERSECT set operation between two queries.

  Returns only rows that appear in both queries.

  ## Options

  - `:all` - Use INTERSECT ALL to include duplicate intersections (default: false)
  - `:column_mapping` - Map columns between incompatible schemas
  """
  def intersect(left_query, right_query, opts \\ []) do
    create_set_operation(:intersect, left_query, right_query, opts)
  end

  @doc """
  Create an EXCEPT set operation between two queries.

  Returns rows from the first query that don't appear in the second query.

  ## Options

  - `:all` - Use EXCEPT ALL to include duplicates in difference (default: false)
  - `:column_mapping` - Map columns between incompatible schemas
  """
  def except(left_query, right_query, opts \\ []) do
    create_set_operation(:except, left_query, right_query, opts)
  end

  # Create a set operation specification
  defp create_set_operation(operation, left_query, right_query, opts) do
    options = %{
      all: Keyword.get(opts, :all, false),
      column_mapping: Keyword.get(opts, :column_mapping)
    }

    spec = %Spec{
      id: generate_operation_id(operation, left_query, right_query),
      operation: operation,
      left_query: left_query,
      right_query: right_query,
      options: options,
      column_mapping: options.column_mapping,
      validated: false
    }

    # Validate schema compatibility
    case Validation.validate_compatibility(spec) do
      {:ok, validated_spec} ->
        # Create a new Selecto struct with the set operation
        create_set_operation_selecto(validated_spec)

      {:error, validation_error} ->
        raise validation_error
    end
  end

  # Generate unique ID for set operation
  defp generate_operation_id(operation, left_query, right_query) do
    left_id = inspect(left_query.domain) |> String.slice(0, 8)
    right_id = inspect(right_query.domain) |> String.slice(0, 8)
    unique = :erlang.unique_integer([:positive])

    "#{operation}_#{left_id}_#{right_id}_#{unique}"
  end

  # Create a new Selecto struct representing the set operation
  defp create_set_operation_selecto(spec) do
    # Use the left query as the base for the new struct
    # Set operations inherit the connection and basic configuration from the left side
    sanitized_set =
      spec.left_query.set
      |> Map.put(:order_by, [])
      |> Map.delete(:limit)
      |> Map.delete(:offset)

    base_selecto = %{spec.left_query | set: sanitized_set}

    # Add set operation to the query set
    current_set_ops = Map.get(base_selecto.set, :set_operations, [])
    updated_set_ops = current_set_ops ++ [spec]

    put_in(base_selecto.set[:set_operations], updated_set_ops)
  end
end