defmodule Selecto do
@derive {Inspect, only: [:postgrex_opts, :adapter, :connection, :set, :tenant]}
defstruct [:postgrex_opts, :adapter, :connection, :domain, :config, :set, :extensions, :tenant]
# import Selecto.Types - removed to avoid circular dependency
@type t :: Selecto.Types.t()
@named_query_member_kinds [:ctes, :values, :subqueries, :laterals, :unnests]
@named_query_member_key_map %{
"name" => :name,
"type" => :type,
"query" => :query,
"query_builder" => :query_builder,
"base_query" => :base_query,
"recursive_query" => :recursive_query,
"columns" => :columns,
"dependencies" => :dependencies,
"join" => :join,
"rows" => :rows,
"data" => :data,
"as" => :as,
"alias" => :alias,
"alias_name" => :alias_name,
"join_id" => :join_id,
"on" => :on,
"kind" => :kind,
"source" => :source,
"lateral_source" => :lateral_source,
"join_type" => :join_type,
"field" => :field,
"array_field" => :array_field,
"ordinality" => :ordinality,
"options" => :options,
"max_depth" => :max_depth,
"cycle_detection" => :cycle_detection
}
@moduledoc """
Selecto is a query builder for Elixir that uses Postgrex to execute queries.
It is designed to be a flexible and powerful tool for building complex SQL queries
without writing SQL by hand.
## Domain Configuration
Selecto is configured using a domain map. This map defines the database schema,
including tables, columns, and associations. Here is an example of a domain map:
%{
source: %{
source_table: "users",
primary_key: :id,
fields: [:id, :name, :email, :age, :active, :created_at, :updated_at],
redact_fields: [],
columns: %{
id: %{type: :integer},
name: %{type: :string},
email: %{type: :string},
age: %{type: :integer},
active: %{type: :boolean},
created_at: %{type: :utc_datetime},
updated_at: %{type: :utc_datetime}
},
associations: %{
posts: %{
queryable: :posts,
field: :posts,
owner_key: :id,
related_key: :user_id
}
}
},
schemas: %{
posts: %{
source_table: "posts",
primary_key: :id,
fields: [:id, :title, :body, :user_id, :created_at, :updated_at],
redact_fields: [],
columns: %{
id: %{type: :integer},
title: %{type: :string},
body: %{type: :string},
user_id: %{type: :integer},
created_at: %{type: :utc_datetime},
updated_at: %{type: :utc_datetime}
},
associations: %{
tags: %{
queryable: :post_tags,
field: :tags,
owner_key: :id,
related_key: :post_id
}
}
},
post_tags: %{
source_table: "post_tags",
primary_key: :id,
fields: [:id, :name, :post_id],
redact_fields: [],
columns: %{
id: %{type: :integer},
name: %{type: :string},
post_id: %{type: :integer}
}
}
},
name: "User",
default_selected: ["name", "email"],
default_aggregate: [{"id", %{"format" => "count"}}],
required_filters: [{"active", true}],
joins: %{
posts: %{
type: :left,
name: "posts",
parameters: [
{:tag, :name}
],
joins: %{
tags: %{
type: :left,
name: "tags"
}
}
}
},
filters: %{
"active" => %{
name: "Active",
type: "boolean",
default: true
}
}
}
## Query Execution
Selecto provides two execution patterns for better error handling and control flow:
### Safe Execution (Non-raising)
Use `execute/2` and `execute_one/2` for applications that prefer explicit error handling:
# Multiple rows
case Selecto.execute(selecto) do
{:ok, {rows, columns, aliases}} ->
# Process successful results
Enum.map(rows, &process_row/1)
{:error, %Selecto.Error{} = error} ->
# Handle database errors gracefully
Logger.error("Query failed: \#{inspect(error)}")
{:error, :database_error}
end
# Single row (useful for COUNT, aggregate queries, or lookups)
case Selecto.execute_one(selecto) do
{:ok, {row, aliases}} ->
# Process single row
extract_values(row, aliases)
{:error, :no_results} ->
# Handle empty result set
{:error, :not_found}
{:error, :multiple_results} ->
# Handle unexpected multiple rows
{:error, :ambiguous_result}
end
### Error Types
All execution functions return structured `Selecto.Error` for consistent error handling:
- `{:error, %Selecto.Error{type: :connection_error}}` - Database connection failures
- `{:error, %Selecto.Error{type: :query_error}}` - SQL execution errors
- `{:error, %Selecto.Error{type: :no_results}}` - execute_one/2 when 0 rows returned
- `{:error, %Selecto.Error{type: :multiple_results}}` - execute_one/2 when >1 rows returned
- `{:error, %Selecto.Error{type: :timeout_error}}` - Query timeout failures
"""
@doc """
Generate a selecto structure from a domain configuration and connection input.
## Parameters
- `domain` - Domain configuration map (see domain configuration docs)
- `postgrex_opts` - Connection input retained for backward compatibility.
This may be adapter-specific connection options, an Ecto repo, a live
connection pid/name, or a pooled connection reference.
- `opts` - Configuration options
## Options
- `:validate` - (boolean, default: true) Whether to validate the domain configuration
before processing. When `true`, will raise `Selecto.DomainValidator.ValidationError`
if the domain has structural issues like missing schemas, circular join dependencies,
or invalid advanced join configurations.
- `:pool` - (boolean, default: false) Whether to enable connection pooling
- `:pool_options` - Connection pool configuration options
- `:adapter` - (module, default: `SelectoDBPostgreSQL.Adapter`) Database adapter module
## Validation
Domain validation checks for:
- Required top-level keys (source, schemas)
- Schema structural integrity (required keys, column definitions)
- Association references to valid schemas
- Join references to existing associations
- Join dependency cycles that would cause infinite recursion
- Advanced join type requirements (dimension keys, hierarchy parameters, etc.)
- Field reference validity in filters and selectors
## Examples
# Basic usage (validation enabled by default)
selecto = Selecto.configure(domain, connection_input)
# With connection pooling
selecto = Selecto.configure(domain, connection_input, pool: true)
# Custom pool configuration
pool_opts = [pool_size: 20, max_overflow: 10]
selecto = Selecto.configure(domain, connection_input, pool: true, pool_options: pool_opts)
# Using existing pooled connection
{:ok, pool} = Selecto.ConnectionPool.start_pool(connection_input)
selecto = Selecto.configure(domain, {:pool, pool})
# Disable validation for performance-critical scenarios
selecto = Selecto.configure(domain, connection_input, validate: false)
# With Ecto repository and schema
selecto = Selecto.from_ecto(MyApp.Repo, MyApp.User)
# Validation can also be called explicitly
:ok = Selecto.DomainValidator.validate_domain!(domain)
selecto = Selecto.configure(domain, connection_input)
"""
@spec configure(Selecto.Types.domain(), term(), keyword()) :: t()
def configure(domain, postgrex_opts, opts \\ []) do
Selecto.OptionsValidator.validate_configure_opts!(opts)
Selecto.Configuration.configure(domain, postgrex_opts, opts)
end
@doc """
Configure Selecto from an Ecto repository and schema.
This convenience function automatically introspects the Ecto schema
and configures Selecto with the appropriate domain and database connection.
## Parameters
- `repo` - The Ecto repository module (e.g., MyApp.Repo)
- `schema` - The Ecto schema module to use as the source table
- `opts` - Configuration options (passed to EctoAdapter.configure/3)
## Examples
# Basic usage
selecto = Selecto.from_ecto(MyApp.Repo, MyApp.User)
# With joins and options
selecto = Selecto.from_ecto(MyApp.Repo, MyApp.User,
joins: [:posts, :profile],
redact_fields: [:password_hash]
)
# With validation
selecto = Selecto.from_ecto(MyApp.Repo, MyApp.User, validate: true)
"""
def from_ecto(repo, schema, opts \\ []) do
Selecto.Configuration.from_ecto(repo, schema, opts)
end
### Delegate to Selecto.Fields module
@spec filters(t()) :: %{String.t() => term()}
defdelegate filters(selecto), to: Selecto.Fields
@spec columns(t()) :: map()
defdelegate columns(selecto), to: Selecto.Fields
@spec joins(t()) :: map()
defdelegate joins(selecto), to: Selecto.Fields
@spec source_table(t()) :: Selecto.Types.table_name() | nil
defdelegate source_table(selecto), to: Selecto.Fields
@spec domain(t()) :: Selecto.Types.domain()
defdelegate domain(selecto), to: Selecto.Fields
@spec domain_data(t()) :: term()
defdelegate domain_data(selecto), to: Selecto.Fields
@spec detail_actions(t()) :: %{optional(atom() | String.t()) => map()}
defdelegate detail_actions(selecto), to: Selecto.Fields
@spec field(t(), Selecto.Types.field_name()) :: map() | nil
defdelegate field(selecto, field_name), to: Selecto.Fields
@spec extensions(t()) :: [{module(), keyword()}]
defdelegate extensions(selecto), to: Selecto.Fields
@doc """
Enhanced field resolution with disambiguation and error handling.
Provides detailed field information and helpful error messages.
"""
@spec resolve_field(t(), Selecto.Types.field_name()) :: {:ok, map()} | {:error, term()}
defdelegate resolve_field(selecto, field), to: Selecto.Fields
@doc """
Get all available fields across all joins and the source table.
"""
@spec available_fields(t()) :: [String.t()]
defdelegate available_fields(selecto), to: Selecto.Fields
@doc """
Infer the SQL type of an expression.
Returns the type of a field, function, literal, or complex expression.
Useful for type checking, validation, and UI components that need type information.
## Examples
# Field type lookup
{:ok, :string} = Selecto.infer_type(selecto, "product_name")
# Aggregate function
{:ok, :bigint} = Selecto.infer_type(selecto, {:count, "*"})
# Numeric aggregate
{:ok, :decimal} = Selecto.infer_type(selecto, {:sum, "price"})
# Literal
{:ok, :integer} = Selecto.infer_type(selecto, {:literal, 42})
"""
@spec infer_type(t(), term()) :: {:ok, Selecto.TypeSystem.sql_type()} | {:error, term()}
defdelegate infer_type(selecto, expression), to: Selecto.TypeSystem
@doc """
Build a registered scalar or predicate UDF expression.
This is a convenience wrapper around the normalized `{:udf, function_id, args}`
shape used by selectors and filters.
## Examples
Selecto.udf("similarity", ["name", "Acme"])
Selecto.udf(:matches_name, ["name", "Acme%"])
"""
@spec udf(atom() | String.t(), [term()] | term()) :: tuple()
def udf(function_id, args \\ []) do
Selecto.Expr.udf(function_id, args)
end
@doc """
Build a registered table-UDF expression for lateral joins and named laterals.
## Examples
Selecto.udf_table("nearby_points", ["location", 500])
Selecto.udf_table(:nearby_points, ["location", 500])
"""
@spec udf_table(atom() | String.t(), [term()] | term()) :: tuple()
def udf_table(function_id, args \\ []) do
{:udf_table, Selecto.UDF.normalize_id(function_id), List.wrap(args)}
end
@doc """
Check if two SQL types are compatible for comparisons or assignments.
## Examples
true = Selecto.types_compatible?(:integer, :decimal)
false = Selecto.types_compatible?(:string, :boolean)
"""
@spec types_compatible?(Selecto.TypeSystem.sql_type(), Selecto.TypeSystem.sql_type()) ::
boolean()
defdelegate types_compatible?(type1, type2), to: Selecto.TypeSystem, as: :compatible?
@doc """
Get the type category for a given SQL type.
Categories: :numeric, :string, :boolean, :datetime, :json, :array, :binary, :uuid, :unknown
"""
@spec type_category(Selecto.TypeSystem.sql_type()) :: Selecto.TypeSystem.type_category()
defdelegate type_category(type), to: Selecto.TypeSystem
@doc """
Get field suggestions for autocomplete or error recovery.
"""
@spec field_suggestions(t(), String.t()) :: [String.t()]
defdelegate field_suggestions(selecto, partial_name), to: Selecto.Fields
@spec set(t()) :: Selecto.Types.query_set()
defdelegate set(selecto), to: Selecto.Fields
@doc """
Enable a join from the domain configuration or add a custom join dynamically.
This allows adding joins at runtime that either:
- Enable predefined joins from the domain configuration
- Add completely custom joins not in the domain
## Parameters
- `join_id` - The join identifier (atom)
- `options` - Optional configuration overrides
## Options
- `:type` - Join type (:left, :inner, :right, :full). Default: :left
- `:source` - Source table name (required for custom joins)
- `:on` - Join conditions as list of maps with :left and :right keys
- `:owner_key` - The key on the parent table
- `:related_key` - The key on the joined table
- `:fields` - Map of field configurations to expose from the joined table
## Examples
# Enable domain-configured join
selecto |> Selecto.join(:category)
# Custom join with explicit configuration
selecto |> Selecto.join(:audit_log,
source: "audit_logs",
on: [%{left: "id", right: "record_id"}],
type: :left,
fields: %{
action: %{type: :string},
created_at: %{type: :naive_datetime}
}
)
"""
@spec join(t(), atom(), keyword()) :: t()
defdelegate join(selecto, join_id, options \\ []), to: Selecto.DynamicJoin
@doc """
Create a parameterized instance of an existing join.
Parameterized joins allow the same association to be joined multiple times
with different filter conditions. The parameter creates a unique instance
that can be referenced using dot notation: `join_name:parameter.field_name`
## Parameters
- `join_id` - Base join identifier to parameterize
- `parameter` - Unique parameter value to identify this instance
- `options` - Filter conditions and options
## Examples
# Create parameterized join for electronics products
selecto
|> Selecto.join_parameterize(:products, "electronics", category_id: 1)
|> Selecto.select(["products:electronics.product_name"])
# Multiple parameterized instances for comparison
selecto
|> Selecto.join_parameterize(:orders, "active", status: "active")
|> Selecto.join_parameterize(:orders, "completed", status: "completed")
|> Selecto.select([
"orders:active.total as active_total",
"orders:completed.total as completed_total"
])
"""
@spec join_parameterize(t(), atom(), String.t() | atom(), keyword()) :: t()
defdelegate join_parameterize(selecto, join_id, parameter, options \\ []),
to: Selecto.DynamicJoin
@doc """
Join with another Selecto query as a subquery.
This creates a join using a separate Selecto query as the right side,
enabling complex subquery joins for aggregations and derived tables.
## Parameters
- `join_id` - Identifier for this join
- `join_selecto` - The Selecto struct to use as subquery
- `options` - Join configuration
## Options
- `:type` - Join type (:left, :inner, :right, :full). Default: :left
- `:on` - Join conditions referencing the subquery alias
## Examples
# Create a subquery for aggregated data
order_totals = Selecto.configure(order_domain, connection)
|> Selecto.select(["customer_id", {:sum, "total", as: "total_spent"}])
|> Selecto.group_by(["customer_id"])
# Join aggregated subquery to main query
selecto
|> Selecto.join_subquery(:customer_totals, order_totals,
on: [%{left: "customer_id", right: "customer_id"}]
)
|> Selecto.select(["name", "customer_totals.total_spent"])
"""
@spec join_subquery(t(), atom(), t(), keyword()) :: t()
defdelegate join_subquery(selecto, join_id, join_selecto, options \\ []),
to: Selecto.DynamicJoin
@doc """
Apply a named subquery preset from `domain.query_members.subqueries`.
This resolves a preconfigured subquery member and applies
`Selecto.join_subquery/4` with optional overrides.
## Examples
selecto
|> Selecto.with_subquery(:high_value_orders)
selecto
|> Selecto.with_subquery(:high_value_orders, type: :left)
"""
@spec with_subquery(t(), atom() | String.t(), keyword() | map()) :: t()
def with_subquery(selecto, member_id, opts \\ [])
def with_subquery(selecto, member_id, opts)
when (is_atom(member_id) or is_binary(member_id)) and (is_list(opts) or is_map(opts)) do
normalized_overrides = normalize_named_query_member_opts(opts)
{member_name, raw_spec} = fetch_named_query_member!(selecto, :subqueries, member_id)
spec = normalize_named_query_member_spec(raw_spec)
kind = Map.get(spec, :kind, :join)
if kind != :join do
raise ArgumentError,
"Named subquery '#{member_name}' has unsupported kind #{inspect(kind)}. Only :join is currently supported."
end
query_source =
resolve_override_or_spec_value(
normalized_overrides,
spec,
[:query, :query_builder],
[:query, :query_builder]
)
join_selecto =
evaluate_named_query_member_query!(query_source, selecto, :subqueries, member_name)
join_id =
normalized_overrides
|> Keyword.get(:join_id, Map.get(spec, :join_id, member_name))
|> normalize_values_join_id()
default_options =
[]
|> maybe_put_keyword(:type, Map.get(spec, :type))
|> maybe_put_keyword(:on, normalize_subquery_on(Map.get(spec, :on)))
|> Keyword.merge(ensure_keyword_opts(Map.get(spec, :options, []), :subqueries, member_name))
override_options =
merge_named_member_options(
normalized_overrides,
:subqueries,
member_name,
[:query, :query_builder, :join_id, :kind]
)
override_options =
if Keyword.has_key?(override_options, :on) do
Keyword.update!(override_options, :on, &normalize_subquery_on/1)
else
override_options
end
final_options =
default_options
|> Keyword.merge(override_options)
Selecto.join_subquery(selecto, join_id, join_selecto, final_options)
end
@doc """
Add fields to the select list.
For macro-free query composition, prefer importing `Selecto.Expr` and using
string field paths plus runtime helper constructors.
## Examples
import Selecto.Expr
selecto
|> Selecto.select(["order_number", "customer.name", as(sum("total"), "customer_total")])
|> Selecto.select(count_distinct("customer.id"))
"""
@spec select(t(), [Selecto.Types.selector()]) :: t()
@spec select(t(), Selecto.Types.selector()) :: t()
defdelegate select(selecto, fields_or_field), to: Selecto.Query
@doc """
Compile a nested selection shape and attach it to the query.
This is an opt-in structured selection API. Use `execute_shape/2` to
materialize results into the same list/tuple structure.
Nested lists/tuples that only reference a single joined schema are treated as
subselect nodes.
"""
@spec select_shape(t(), list() | tuple()) :: t()
defdelegate select_shape(selecto, shape), to: Selecto.SelectionShape
@doc """
Execute a query configured with `select_shape/2` and return shaped rows.
Returns `{:ok, shaped_rows}` where each row mirrors the selection shape.
"""
@spec execute_shape(t(), Selecto.Types.execute_options()) ::
{:ok, list()} | {:error, Selecto.Error.t()}
defdelegate execute_shape(selecto, opts \\ []), to: Selecto.SelectionShape
@doc """
Add filters to the query.
For macro-free query composition, prefer importing `Selecto.Expr` and using
runtime filter helpers.
## Examples
import Selecto.Expr
selecto
|> Selecto.filter(eq("status", "delivered"))
|> Selecto.filter(compact_and([not_null("customer.id"), gte("total", 100)]))
"""
@spec filter(t(), [Selecto.Types.filter()]) :: t()
@spec filter(t(), Selecto.Types.filter()) :: t()
defdelegate filter(selecto, filters_or_filter), to: Selecto.Query
@doc """
Return required filters currently attached to the query.
This includes domain-level required filters and tenant-scope required filters
attached at runtime.
"""
@spec required_filters(t()) :: [Selecto.Types.filter()]
defdelegate required_filters(selecto), to: Selecto.Query
@doc """
Attach tenant context to the query state.
"""
@spec with_tenant(t(), map() | keyword() | String.t() | atom() | nil) :: t()
defdelegate with_tenant(selecto, tenant_context), to: Selecto.Tenant
@doc """
Read tenant context from query state.
"""
@spec tenant(t()) :: map() | nil
defdelegate tenant(selecto), to: Selecto.Tenant
@doc """
Apply tenant scope as required filters.
"""
@spec apply_tenant_scope(t(), keyword()) :: t()
defdelegate apply_tenant_scope(selecto, opts \\ []), to: Selecto.Tenant
@doc """
Add a required tenant filter to query state.
"""
@spec require_tenant_filter(t(), Selecto.Types.filter()) :: t()
@spec require_tenant_filter(t(), atom() | String.t(), term()) :: t()
def require_tenant_filter(selecto, {_, _} = filter) do
Selecto.Tenant.require_tenant_filter(selecto, filter)
end
def require_tenant_filter(selecto, tenant_field, tenant_id) do
Selecto.Tenant.require_tenant_filter(selecto, tenant_field, tenant_id)
end
@doc """
Append filters explicitly to the pre-retarget filter list.
These filters stay attached to the original root side when using `retarget/3`.
"""
@spec pre_retarget_filter(t(), [Selecto.Types.filter()]) :: t()
@spec pre_retarget_filter(t(), Selecto.Types.filter()) :: t()
defdelegate pre_retarget_filter(selecto, filters_or_filter), to: Selecto.Query
@doc """
Append filters explicitly to the post-retarget filter list.
These filters apply to the retargeted target root.
"""
@spec post_retarget_filter(t(), [Selecto.Types.filter()]) :: t()
@spec post_retarget_filter(t(), Selecto.Types.filter()) :: t()
defdelegate post_retarget_filter(selecto, filters_or_filter), to: Selecto.Query
@doc """
Read pre-retarget filters from the query set (`set.filtered`).
"""
@spec pre_retarget_filters(t()) :: [Selecto.Types.filter()]
defdelegate pre_retarget_filters(selecto), to: Selecto.Query
@doc """
Read post-retarget filters from the query set (`set.post_retarget_filters`).
"""
@spec post_retarget_filters(t()) :: [Selecto.Types.filter()]
defdelegate post_retarget_filters(selecto), to: Selecto.Query
@doc false
@deprecated "Use pre_retarget_filter/2 instead."
@spec pre_pivot_filter(t(), [Selecto.Types.filter()]) :: t()
@spec pre_pivot_filter(t(), Selecto.Types.filter()) :: t()
def pre_pivot_filter(selecto, filters_or_filter),
do: Selecto.Query.pre_retarget_filter(selecto, filters_or_filter)
@doc false
@deprecated "Use post_retarget_filter/2 instead."
@spec post_pivot_filter(t(), [Selecto.Types.filter()]) :: t()
@spec post_pivot_filter(t(), Selecto.Types.filter()) :: t()
def post_pivot_filter(selecto, filters_or_filter),
do: Selecto.Query.post_retarget_filter(selecto, filters_or_filter)
@doc false
@deprecated "Use pre_retarget_filters/1 instead."
@spec pre_pivot_filters(t()) :: [Selecto.Types.filter()]
def pre_pivot_filters(selecto), do: Selecto.Query.pre_retarget_filters(selecto)
@doc false
@deprecated "Use post_retarget_filters/1 instead."
@spec post_pivot_filters(t()) :: [Selecto.Types.filter()]
def post_pivot_filters(selecto), do: Selecto.Query.post_retarget_filters(selecto)
@doc """
Return query filters from current filter buckets.
Options:
- `:include_post_retarget` (default: `true`)
"""
@spec query_filters(t(), keyword()) :: [Selecto.Types.filter()]
defdelegate query_filters(selecto, opts \\ []), to: Selecto.Query
@doc """
Return whether tenant scope is required for this query.
"""
@spec tenant_required?(t(), keyword()) :: boolean()
defdelegate tenant_required?(selecto, opts \\ []), to: Selecto.Tenant
@doc """
Validate tenant scope and return `:ok` or structured validation error.
"""
@spec validate_tenant_scope(t(), keyword()) :: :ok | {:error, Selecto.Error.t()}
defdelegate validate_tenant_scope(selecto, opts \\ []), to: Selecto.Tenant, as: :validate_scope
@doc """
Raise if tenant scope is required and missing.
"""
@spec ensure_tenant_scope!(t(), keyword()) :: :ok
defdelegate ensure_tenant_scope!(selecto, opts \\ []), to: Selecto.Tenant, as: :ensure_scope!
@doc """
Add to the Order By clause.
## Examples
import Selecto.Expr
selecto
|> Selecto.order_by([asc("inserted_at"), desc("priority")])
"""
@spec order_by(t(), [Selecto.Types.order_spec()]) :: t()
@spec order_by(t(), Selecto.Types.order_spec()) :: t()
defdelegate order_by(selecto, orders), to: Selecto.Query
@doc """
Add to the Group By clause.
## Examples
import Selecto.Expr
selecto
|> Selecto.group_by(["customer.name"])
|> Selecto.group_by(rollup(["status"]))
"""
@spec group_by(t(), [Selecto.Types.field_name()]) :: t()
@spec group_by(t(), Selecto.Types.field_name()) :: t()
defdelegate group_by(selecto, groups), to: Selecto.Query
@doc """
Limit the number of rows returned by the query.
## Examples
# Limit to 10 rows
selecto |> Selecto.limit(10)
# Limit with offset for pagination
selecto |> Selecto.limit(10) |> Selecto.offset(20)
"""
@spec limit(t(), non_neg_integer()) :: t()
defdelegate limit(selecto, limit_value), to: Selecto.Query
@doc """
Set the offset for the query results.
## Examples
# Skip first 20 rows
selecto |> Selecto.offset(20)
# Pagination: page 3 with 10 items per page
selecto |> Selecto.limit(10) |> Selecto.offset(20)
"""
@spec offset(t(), non_neg_integer()) :: t()
defdelegate offset(selecto, offset_value), to: Selecto.Query
@doc """
Retarget the query to focus on a different table while preserving existing context.
This allows you to retarget a Selecto query from the source table to any joined
table, while preserving existing filters through subqueries.
## Examples
# Retarget from events to orders while preserving event filters
selecto
|> Selecto.filter([{"event_id", 123}])
|> Selecto.retarget(:orders)
|> Selecto.select(["product_name", "quantity"])
## Options
- `:preserve_filters` - Whether to preserve existing filters (default: true)
- `:subquery_strategy` - How to generate the subquery (`:in`, `:exists`, `:join`)
See `Selecto.Retarget` module for more details.
"""
def retarget(selecto, target_schema, opts \\ []) do
Selecto.Retarget.retarget(selecto, target_schema, opts)
end
@doc false
@deprecated "Use retarget/3 instead."
def pivot(selecto, target_schema, opts \\ []) do
Selecto.Retarget.retarget(selecto, target_schema, opts)
end
@doc """
Add subselect fields to return related data as aggregated arrays.
This prevents result set denormalization while maintaining relational context
by returning related data as JSON arrays, PostgreSQL arrays, or other formats.
## Examples
# Basic subselect - get orders as JSON for each attendee
selecto
|> Selecto.select(["attendee.name"])
|> Selecto.subselect(["order.product_name", "order.quantity"])
# With custom configuration
selecto
|> Selecto.subselect([
%{
fields: ["product_name", "quantity"],
target_schema: :order,
format: :json_agg,
alias: "order_items"
}
])
## Options
- `:format` - Aggregation format (`:json_agg`, `:array_agg`, `:string_agg`, `:count`)
- `:alias_prefix` - Prefix for generated field aliases
See `Selecto.Subselect` module for more details.
"""
def subselect(selecto, field_specs, opts \\ []) do
Selecto.Subselect.subselect(selecto, field_specs, opts)
end
@spec gen_sql(t(), keyword()) :: {String.t(), map(), list()}
def gen_sql(selecto, opts) do
Selecto.Builder.Sql.build(selecto, opts)
end
@doc """
Generate and run the query, returning {:ok, result} or {:error, reason}.
Non-raising version that returns tagged tuples for better error handling.
Result format: {:ok, {rows, columns, aliases}} | {:error, reason}
## Examples
case Selecto.execute(selecto) do
{:ok, {rows, columns, aliases}} ->
# Handle successful query
process_results(rows, columns)
{:error, reason} ->
# Handle database error
Logger.error("Query failed: \#{inspect(reason)}")
end
"""
@spec execute(Selecto.Types.t(), Selecto.Types.execute_options()) ::
Selecto.Types.safe_execute_result()
def execute(selecto, opts \\ []) do
Selecto.OptionsValidator.validate_execute_opts!(opts)
# Delegate to the extracted Executor module
Selecto.Executor.execute(selecto, Selecto.Tenant.merge_execution_opts(selecto, opts))
end
@doc """
Execute a query and return results with metadata including SQL, params, and execution time.
## Parameters
- `selecto` - The Selecto struct containing connection and query info
- `opts` - Execution options
## Returns
- `{:ok, result, metadata}` - Successful execution with results and metadata
- `{:error, error}` - Execution failure with detailed error
The metadata map includes:
- `:sql` - The generated SQL query string
- `:params` - The query parameters
- `:execution_time` - Query execution time in milliseconds
## Examples
case Selecto.execute_with_metadata(selecto) do
{:ok, {rows, columns, aliases}, _metadata} ->
# Process successful results with metadata
handle_results(rows, columns, aliases)
{:error, error} ->
# Handle database error
Logger.error("Query failed: \#{inspect(error)}")
end
"""
@spec execute_with_metadata(Selecto.Types.t(), Selecto.Types.execute_options()) ::
{:ok, Selecto.Types.execute_result(), map()} | {:error, Selecto.Error.t()}
def execute_with_metadata(selecto, opts \\ []) do
Selecto.OptionsValidator.validate_execute_opts!(opts)
# Delegate to the extracted Executor module
Selecto.Executor.execute_with_metadata(
selecto,
Selecto.Tenant.merge_execution_opts(selecto, opts)
)
end
@doc """
Execute a query as a database-backed stream.
Returns a stream of `{row, columns, aliases}` tuples for incremental result
consumption.
## Options
- `:max_rows` - PostgreSQL cursor batch size (default `500`)
- `:receive_timeout` - stream consumer wait timeout in ms (default `60000`)
- `:queue_timeout` - internal task yield timeout in ms (default `100`)
- `:stream_timeout` - transaction timeout for cursor execution (default `30000`)
## Notes
- Direct PostgreSQL connections use cursor-backed streaming.
- Adapter-backed streaming requires `adapter.stream/4` support.
- Ecto repo and pooled PostgreSQL stream paths currently return structured
`:validation_error` responses.
"""
@spec execute_stream(Selecto.Types.t(), keyword()) :: Selecto.Types.safe_execute_stream_result()
def execute_stream(selecto, opts \\ []) do
Selecto.OptionsValidator.validate_execute_opts!(opts)
Selecto.Executor.execute_stream(selecto, Selecto.Tenant.merge_execution_opts(selecto, opts))
end
@doc """
Execute a query expecting exactly one row, returning {:ok, row} or {:error, reason}.
Useful for queries that should return a single record (e.g., with LIMIT 1 or aggregate functions).
Returns an error if zero rows or multiple rows are returned.
## Examples
case Selecto.execute_one(selecto) do
{:ok, row} ->
# Handle single row result
process_single_result(row)
{:error, :no_results} ->
# Handle case where no rows were found
{:error, :multiple_results} ->
# Handle case where multiple rows were found
{:error, error} ->
# Handle database or other errors
end
"""
@spec execute_one(Selecto.Types.t(), Selecto.Types.execute_options()) ::
Selecto.Types.safe_execute_one_result()
def execute_one(selecto, opts \\ []) do
Selecto.OptionsValidator.validate_execute_opts!(opts)
# Delegate to the extracted Executor module
Selecto.Executor.execute_one(selecto, Selecto.Tenant.merge_execution_opts(selecto, opts))
end
@doc """
Generate SQL without executing - useful for debugging and caching.
Supports optional readability controls:
- `pretty: true`
- `highlight: :ansi | :markdown`
"""
@spec to_sql(t(), keyword()) :: {String.t(), list()}
def to_sql(selecto, opts \\ []) do
Selecto.OptionsValidator.validate_to_sql_opts!(opts)
Selecto.Query.to_sql(selecto, opts)
end
@doc """
Format SQL output for readability.
"""
@spec format_sql(String.t(), keyword()) :: String.t()
defdelegate format_sql(sql, opts \\ []), to: Selecto.SQL.Formatter, as: :format
@doc """
Apply optional highlighting to SQL (`:ansi` or `:markdown`).
"""
@spec highlight_sql(String.t(), :ansi | :markdown | nil) :: String.t()
defdelegate highlight_sql(sql, style), to: Selecto.SQL.Formatter, as: :highlight
@doc """
Run EXPLAIN for a query and return plan details.
"""
@spec explain(t(), keyword()) :: {:ok, map()} | {:error, Selecto.Error.t()}
def explain(selecto, opts \\ []) do
Selecto.OptionsValidator.validate_diagnostic_opts!(opts)
Selecto.Diagnostics.explain(selecto, opts)
end
@doc """
Run EXPLAIN ANALYZE for a query and return plan details.
"""
@spec explain_analyze(t(), keyword()) :: {:ok, map()} | {:error, Selecto.Error.t()}
def explain_analyze(selecto, opts \\ []) do
Selecto.OptionsValidator.validate_diagnostic_opts!(opts)
Selecto.Diagnostics.explain_analyze(selecto, opts)
end
@doc """
Add a window function to the query.
Window functions provide analytical capabilities over a set of rows related to
the current row, without grouping rows into a single result.
## Examples
# Add row numbers within each category
selecto |> Selecto.window_function(:row_number,
over: [partition_by: ["category"], order_by: ["created_at"]])
# Calculate running total
selecto |> Selecto.window_function(:sum, ["amount"],
over: [partition_by: ["user_id"], order_by: ["date"]],
as: "running_total")
# Get previous value for comparison
selecto |> Selecto.window_function(:lag, ["amount", 1],
over: [partition_by: ["user_id"], order_by: ["date"]],
as: "prev_amount")
"""
def window_function(selecto, function, arguments \\ [], options) do
Selecto.Window.add_window_function(selecto, function, arguments, options)
end
@doc """
Add an UNNEST operation to expand array columns into rows.
UNNEST transforms array values into a set of rows, one for each array element.
Can optionally include ordinality to track position in the array.
## Examples
# Basic unnest
selecto |> Selecto.unnest("tags", as: "tag")
# Unnest with ordinality (includes position)
selecto |> Selecto.unnest("tags", as: "tag", ordinality: "tag_position")
# Multiple unnests (will be cross-joined)
selecto
|> Selecto.unnest("tags", as: "tag")
|> Selecto.unnest("categories", as: "category")
"""
def unnest(selecto, array_field, opts \\ []) do
Selecto.QueryValidator.validate_unnest_source!(selecto, array_field)
alias_name = Keyword.get(opts, :as, "unnested_#{array_field}")
ordinality = Keyword.get(opts, :ordinality)
unnest_spec = %{
field: array_field,
alias: alias_name,
ordinality: ordinality
}
current_unnests = Map.get(selecto.set, :unnest, [])
updated_selecto = put_in(selecto.set[:unnest], current_unnests ++ [unnest_spec])
# Register the unnested field in the configuration
# This allows it to be selected and used in GROUP BY
current_columns = Map.get(updated_selecto.config, :columns, %{})
# Add the unnested field as a column
unnested_column = %{
name: alias_name,
field: alias_name,
requires_join: nil,
# Default type for unnested array elements
type: :text
}
# Also add ordinality column if requested
columns_to_add =
if ordinality do
%{
alias_name => unnested_column,
"#{alias_name}_ordinality" => %{
name: "#{alias_name}_ordinality",
field: "#{alias_name}_ordinality",
requires_join: nil,
type: :integer
}
}
else
%{alias_name => unnested_column}
end
put_in(updated_selecto.config[:columns], Map.merge(current_columns, columns_to_add))
end
@doc """
Add a MySQL `JSON_TABLE` rowset expansion to the query.
This helper projects JSON-derived rows as a joinable rowset and registers the
projected columns into the query config so they can be selected and filtered
with qualified field references such as `item_rows.sku`.
"""
@spec json_table(t(), atom() | String.t(), keyword() | map()) :: t()
def json_table(selecto, source_field, opts \\ [])
def json_table(selecto, source_field, opts) when is_map(opts) do
json_table(selecto, source_field, Enum.into(opts, []))
end
def json_table(selecto, source_field, opts) when is_list(opts) do
Selecto.QueryValidator.validate_table_source!(selecto, source_field)
alias_name = opts |> Keyword.fetch!(:as) |> to_string()
path = Keyword.get(opts, :path, "$[*]")
join_type = Keyword.get(opts, :join_type, :inner)
columns =
opts
|> Keyword.fetch!(:columns)
|> Enum.map(&normalize_json_table_column!/1)
table_function = {:json_table, normalize_json_table_source(source_field), path, columns}
selecto
|> Selecto.lateral_join(join_type, table_function, alias_name)
|> register_json_table_columns(alias_name, columns)
|> upsert_lateral_by_alias(alias_name)
end
@doc """
Add a SQLite JSON rowset expansion using `json_each` or `json_tree`.
This helper registers the standard SQLite JSON rowset columns so they can be
selected and filtered with qualified references such as `item_rows.value`.
"""
@spec json_rowset(t(), atom() | String.t(), keyword() | map()) :: t()
def json_rowset(selecto, source_field, opts \\ [])
def json_rowset(selecto, source_field, opts) when is_map(opts) do
json_rowset(selecto, source_field, Enum.into(opts, []))
end
def json_rowset(selecto, source_field, opts) when is_list(opts) do
Selecto.QueryValidator.validate_table_source!(selecto, source_field)
alias_name = opts |> Keyword.fetch!(:as) |> to_string()
path = Keyword.get(opts, :path)
join_type = Keyword.get(opts, :join_type, :inner)
function_name = Keyword.get(opts, :function, :json_each)
table_function =
case function_name do
:json_each ->
{:json_each, normalize_json_table_source(source_field), path}
:json_tree ->
{:json_tree, normalize_json_table_source(source_field), path}
other ->
raise ArgumentError,
"SQLite JSON rowset function must be :json_each or :json_tree. Got: #{inspect(other)}"
end
selecto
|> Selecto.lateral_join(join_type, table_function, alias_name)
|> register_sqlite_json_rowset_columns(alias_name)
|> upsert_lateral_by_alias(alias_name)
end
@doc """
Add an adapter-aware text-search rank selector.
This is the shared entry point for ranking support. Adapters can map it onto
their native ranking primitives where available.
"""
@spec text_search_rank(t(), atom() | String.t() | [atom() | String.t()], keyword() | map()) ::
t()
def text_search_rank(selecto, fields, opts \\ [])
def text_search_rank(selecto, fields, opts) when is_map(opts) do
text_search_rank(selecto, fields, Enum.into(opts, []))
end
def text_search_rank(selecto, fields, opts) when is_list(opts) do
case Selecto.AdapterSupport.adapter_name(Map.get(selecto, :adapter)) do
:mysql ->
mysql_text_search_rank(selecto, fields, opts)
:postgresql ->
postgresql_text_search_rank(selecto, fields, opts)
:sqlite ->
sqlite_fts_rank(selecto, fields, opts)
adapter_name ->
raise ArgumentError,
"text_search_rank/3 is not yet implemented for adapter #{inspect(adapter_name)}"
end
end
@doc false
def mysql_text_search_rank(selecto, fields, opts) when is_list(opts) do
normalized_fields = Enum.map(List.wrap(fields), &to_string/1)
if normalized_fields == [] do
raise ArgumentError, "mysql text_search_rank/3 requires at least one field"
end
alias_name = Keyword.get(opts, :as, "fts_rank")
query = Keyword.get(opts, :query)
mode = Keyword.get(opts, :mode, :natural)
if is_nil(query) do
raise ArgumentError, "mysql text_search_rank/3 requires a :query option"
end
if Keyword.has_key?(opts, :weights) do
raise ArgumentError, "mysql text_search_rank/3 does not support :weights yet"
end
match_args =
normalized_fields
|> Enum.map(fn field -> "selecto_root.#{field}" end)
|> Enum.join(", ")
selector =
{:custom_sql,
"MATCH(#{match_args}) AGAINST ('#{escape_sql_literal(query)}'#{mysql_rank_mode_sql(selecto, mode)}) AS \"#{alias_name}\"",
%{}}
put_in(selecto.set[:selected], Enum.uniq(selecto.set.selected ++ [selector]))
end
@doc false
def postgresql_text_search_rank(selecto, fields, opts) when is_list(opts) do
normalized_fields = List.wrap(fields)
if length(normalized_fields) != 1 do
raise ArgumentError,
"postgresql text_search_rank/3 currently requires exactly one tsvector field"
end
[field] = normalized_fields
conf = postgresql_text_search_rank_field_conf!(selecto, field)
alias_name = Keyword.get(opts, :as, "fts_rank")
query = Keyword.get(opts, :query)
mode = Keyword.get(opts, :mode, :websearch)
if is_nil(query) do
raise ArgumentError, "postgresql text_search_rank/3 requires a :query option"
end
if Keyword.has_key?(opts, :weights) do
raise ArgumentError, "postgresql text_search_rank/3 does not support :weights yet"
end
query_function = postgresql_text_search_query_function!(mode)
field_ref = Map.get(conf, :field, field)
selector =
{:field,
{:func, :ts_rank, [to_string(field_ref), {:func, query_function, [{:literal, query}]}]},
to_string(alias_name)}
put_in(selecto.set[:selected], Enum.uniq(selecto.set.selected ++ [selector]))
end
@doc """
Add a SQLite FTS5 ranking selector using `bm25(...)`.
This helper is intentionally narrow: all referenced fields must be configured
as SQLite FTS5 fields on the same source alias.
"""
@spec sqlite_fts_rank(t(), atom() | String.t() | [atom() | String.t()], keyword() | map()) ::
t()
def sqlite_fts_rank(selecto, fields, opts \\ [])
def sqlite_fts_rank(selecto, fields, opts) when is_map(opts) do
sqlite_fts_rank(selecto, fields, Enum.into(opts, []))
end
def sqlite_fts_rank(selecto, fields, opts) when is_list(opts) do
adapter = Map.get(selecto, :adapter)
if Selecto.AdapterSupport.adapter_name(adapter) != :sqlite do
raise ArgumentError, "sqlite_fts_rank/3 requires the SQLite adapter"
end
normalized_fields = List.wrap(fields)
if normalized_fields == [] do
raise ArgumentError, "sqlite_fts_rank/3 requires at least one FTS field"
end
alias_name = Keyword.get(opts, :as, "fts_rank")
weights = Keyword.get(opts, :weights, [])
source_table = sqlite_fts_rank_source_table!(selecto, normalized_fields)
bm25_args =
case weights do
[] -> source_table
list when is_list(list) -> Enum.join([source_table | Enum.map(list, &to_string/1)], ", ")
end
selector = {:custom_sql, "bm25(#{bm25_args}) AS \"#{alias_name}\"", %{}}
put_in(selecto.set[:selected], Enum.uniq(selecto.set.selected ++ [selector]))
end
@doc """
Apply a named UNNEST preset from `domain.query_members.unnests`.
## Examples
selecto
|> Selecto.with_unnest(:product_tags)
"""
@spec with_unnest(t(), atom() | String.t(), keyword() | map()) :: t()
def with_unnest(selecto, member_id, opts \\ [])
def with_unnest(selecto, member_id, opts)
when (is_atom(member_id) or is_binary(member_id)) and (is_list(opts) or is_map(opts)) do
normalized_overrides = normalize_named_query_member_opts(opts)
{member_name, raw_spec} = fetch_named_query_member!(selecto, :unnests, member_id)
spec = normalize_named_query_member_spec(raw_spec)
array_field =
Keyword.get(
normalized_overrides,
:array_field,
Keyword.get(normalized_overrides, :field, map_get(spec, :array_field, :field))
)
if is_nil(array_field) do
raise ArgumentError,
"Named UNNEST '#{member_name}' requires :array_field (or :field)."
end
alias_name =
normalized_overrides
|> Keyword.get(
:as,
Keyword.get(normalized_overrides, :alias, Keyword.get(normalized_overrides, :alias_name))
) || values_member_alias(spec) || "unnested_#{array_field}"
ordinality = Keyword.get(normalized_overrides, :ordinality, Map.get(spec, :ordinality))
default_options = ensure_keyword_opts(Map.get(spec, :options, []), :unnests, member_name)
override_options =
normalized_overrides
|> Keyword.drop([
:array_field,
:field,
:as,
:alias,
:alias_name,
:ordinality,
:options
])
override_nested_options =
ensure_keyword_opts(
Keyword.get(normalized_overrides, :options, :__missing__),
:unnests,
member_name
)
unnest_opts =
default_options
|> Keyword.put(:as, to_string(alias_name))
|> maybe_put_keyword(:ordinality, ordinality)
|> Keyword.merge(override_options)
|> Keyword.merge(override_nested_options)
selecto
|> Selecto.unnest(array_field, unnest_opts)
|> upsert_unnest_by_alias(to_string(alias_name))
end
defp upsert_unnest_by_alias(selecto, alias_name) do
unnests = Map.get(selecto.set, :unnest, [])
{matching, others} = Enum.split_with(unnests, &(Map.get(&1, :alias) == alias_name))
case matching do
[] ->
selecto
_ ->
put_in(selecto.set[:unnest], others ++ [List.last(matching)])
end
end
# DUPLICATE REMOVED - Consolidated with the version below that uses Advanced.CTE
# This version accepted (selecto, cte_name, base_fn, recursive_fn, opts)
# The consolidated version below now handles both parameter formats
# DUPLICATE REMOVED - Consolidated with the version below that uses Advanced.LateralJoin
# This version accepted (selecto, lateral_source, opts)
# The consolidated version below now handles both parameter formats
@doc """
Create a UNION set operation between two queries.
Combines results from multiple queries using UNION or UNION ALL.
All queries must have compatible column counts and types.
## Options
- `:all` - Use UNION ALL to include duplicates (default: false)
- `:column_mapping` - Map columns between incompatible schemas
## Examples
# Basic UNION (removes duplicates)
query1 |> Selecto.union(query2)
# UNION ALL (includes duplicates, faster)
query1 |> Selecto.union(query2, all: true)
# UNION with column mapping
customers |> Selecto.union(vendors,
column_mapping: [
{"name", "company_name"},
{"email", "contact_email"}
]
)
"""
def union(left_query, right_query, opts \\ []) do
Selecto.SetOperations.union(left_query, right_query, opts)
end
@doc """
Create an INTERSECT set operation between two queries.
Returns only rows that appear in both queries.
## Options
- `:all` - Use INTERSECT ALL to include duplicate intersections (default: false)
- `:column_mapping` - Map columns between incompatible schemas
## Examples
# Find users who are both active and premium
active_users |> Selecto.intersect(premium_users)
# Include duplicate intersections
query1 |> Selecto.intersect(query2, all: true)
"""
def intersect(left_query, right_query, opts \\ []) do
Selecto.SetOperations.intersect(left_query, right_query, opts)
end
@doc """
Create an EXCEPT set operation between two queries.
Returns rows from the first query that don't appear in the second query.
## Options
- `:all` - Use EXCEPT ALL to include duplicates in difference (default: false)
- `:column_mapping` - Map columns between incompatible schemas
## Examples
# Find free users (all users except premium)
all_users |> Selecto.except(premium_users)
# Include duplicates in difference
query1 |> Selecto.except(query2, all: true)
"""
def except(left_query, right_query, opts \\ []) do
Selecto.SetOperations.except(left_query, right_query, opts)
end
@doc """
Add a LATERAL join to the query.
LATERAL joins allow the right side of the join to reference columns from the
left side, enabling powerful correlated subquery patterns.
## Parameters
- `join_type` - Type of join (:left, :inner, :right, :full)
- `subquery_builder_or_function` - Function that builds correlated subquery or table function tuple
- `alias_name` - Alias for the LATERAL join results
- `opts` - Additional options
## Examples
# LATERAL join with correlated subquery
selecto
|> Selecto.lateral_join(
:left,
fn base_query ->
Selecto.configure(rental_domain, connection)
|> Selecto.select([{:func, "COUNT", ["*"], as: "rental_count"}])
|> Selecto.filter([{"customer_id", {:ref, "customer.customer_id"}}])
|> Selecto.limit(5)
end,
"recent_rentals"
)
# LATERAL join with table function
selecto
|> Selecto.lateral_join(
:inner,
{:unnest, "film.special_features"},
"features"
)
# LATERAL join with generate_series
selecto
|> Selecto.lateral_join(
:inner,
{:function, :generate_series, [1, 10]},
"numbers"
)
"""
# Consolidated version that handles both parameter formats:
# 1. (selecto, lateral_source, opts) - where opts contains :as and :type
# 2. (selecto, join_type, subquery_builder_or_function, alias_name, opts)
def lateral_join(selecto, arg2, arg3 \\ [], arg4 \\ nil, arg5 \\ []) do
# Determine which parameter format is being used
{join_type, subquery_builder_or_function, alias_name, opts} =
case {arg2, arg3, arg4, arg5} do
# Format 1: (selecto, lateral_source, opts)
{lateral_source, opts, nil, []} when is_list(opts) ->
alias_name = Keyword.fetch!(opts, :as)
join_type = Keyword.get(opts, :type, :left)
{join_type, lateral_source, alias_name, opts}
# Format 2: (selecto, join_type, subquery_builder_or_function, alias_name, opts)
{join_type, subquery_builder_or_function, alias_name, opts} ->
{join_type, subquery_builder_or_function, alias_name, opts}
end
# Handle different lateral source types
lateral_spec =
case subquery_builder_or_function do
# Handle inline spec format from old version
{:function, func_name, args, returns: returns} ->
%{
type: :table_function,
function: func_name,
arguments: args,
returns: returns,
alias: alias_name,
join_type: join_type
}
{:udf_table, function_id, _args} = udf_table ->
case Selecto.UDF.fetch(selecto, function_id) do
{:ok, spec} ->
kind = Map.get(spec, :kind) || Map.get(spec, "kind")
allowed_in = Map.get(spec, :allowed_in) || Map.get(spec, "allowed_in") || []
if kind != :table do
raise ArgumentError,
"UDF '#{Selecto.UDF.normalize_id(function_id)}' must be kind :table to be used in lateral joins"
end
if :lateral not in allowed_in and :query_member not in allowed_in do
raise ArgumentError,
"UDF '#{Selecto.UDF.normalize_id(function_id)}' is not allowed in :lateral. Allowed: #{inspect(allowed_in)}"
end
Selecto.Advanced.LateralJoin.create_lateral_join(
join_type,
udf_table,
alias_name,
opts
)
:error ->
raise ArgumentError, "Unknown UDF '#{Selecto.UDF.normalize_id(function_id)}'"
end
# For other cases, use Advanced.LateralJoin
_ ->
Selecto.Advanced.LateralJoin.create_lateral_join(
join_type,
subquery_builder_or_function,
alias_name,
opts
)
end
# Validate correlations
case Selecto.Advanced.LateralJoin.validate_correlations(lateral_spec, selecto) do
{:ok, validated_spec} ->
selecto =
maybe_register_lateral_source_columns(
selecto,
subquery_builder_or_function,
to_string(alias_name),
opts
)
# Add to selecto set
current_lateral_joins = Map.get(selecto.set, :lateral_joins, [])
updated_lateral_joins = current_lateral_joins ++ [validated_spec]
put_in(selecto.set[:lateral_joins], updated_lateral_joins)
{:error, correlation_error} ->
raise correlation_error
end
end
@doc """
Apply a LATERAL source directly or resolve a named LATERAL preset from
`domain.query_members.laterals`.
## Examples
selecto
|> Selecto.with_lateral(:recent_rentals)
selecto
|> Selecto.with_lateral(:recent_rentals, join_type: :inner)
selecto
|> Selecto.with_lateral(Selecto.udf_table("nearby_points", ["location", 500]),
as: "nearby_points",
join_type: :left
)
"""
@spec with_lateral(t(), atom() | String.t(), keyword() | map()) :: t()
def with_lateral(selecto, member_id, opts \\ [])
def with_lateral(selecto, %Selecto{} = lateral_source, opts)
when is_list(opts) or is_map(opts) do
apply_direct_lateral(selecto, lateral_source, opts)
end
def with_lateral(selecto, lateral_source, opts)
when (is_tuple(lateral_source) or is_function(lateral_source)) and
(is_list(opts) or is_map(opts)) do
apply_direct_lateral(selecto, lateral_source, opts)
end
def with_lateral(selecto, member_id, opts)
when (is_atom(member_id) or is_binary(member_id)) and (is_list(opts) or is_map(opts)) do
normalized_overrides = normalize_named_query_member_opts(opts)
{member_name, raw_spec} = fetch_named_query_member!(selecto, :laterals, member_id)
spec = normalize_named_query_member_spec(raw_spec)
lateral_source =
Keyword.get(
normalized_overrides,
:query,
Keyword.get(
normalized_overrides,
:source,
Keyword.get(
normalized_overrides,
:lateral_source,
map_get(spec, :query, :source, :lateral_source)
)
)
)
lateral_source = normalize_lateral_source!(lateral_source, selecto, member_name)
alias_name =
resolve_alias_name(normalized_overrides, values_member_alias(spec) || member_name)
join_type =
Keyword.get(
normalized_overrides,
:join_type,
Keyword.get(normalized_overrides, :type, map_get(spec, :join_type, :type) || :left)
)
join_type = normalize_lateral_join_type!(join_type, member_name)
default_options = ensure_keyword_opts(Map.get(spec, :options, []), :laterals, member_name)
override_options =
merge_named_member_options(
normalized_overrides,
:laterals,
member_name,
[:query, :source, :lateral_source, :as, :alias, :alias_name, :join_type, :type]
)
lateral_opts =
default_options
|> Keyword.merge(override_options)
selecto
|> Selecto.lateral_join(join_type, lateral_source, to_string(alias_name), lateral_opts)
|> maybe_register_lateral_source_columns(lateral_source, to_string(alias_name), lateral_opts)
|> upsert_lateral_by_alias(to_string(alias_name))
end
defp apply_direct_lateral(selecto, lateral_source, opts) do
normalized_overrides = normalize_named_query_member_opts(opts)
alias_name = resolve_alias_name(normalized_overrides)
if is_nil(alias_name) do
raise ArgumentError,
"Direct with_lateral/3 sources require :as, :alias, or :alias_name"
end
join_type =
Keyword.get(
normalized_overrides,
:join_type,
Keyword.get(normalized_overrides, :type, :left)
)
join_type = normalize_lateral_join_type!(join_type, to_string(alias_name))
lateral_opts =
merge_named_member_options(
normalized_overrides,
:laterals,
to_string(alias_name),
[:as, :alias, :alias_name, :join_type, :type]
)
selecto
|> Selecto.lateral_join(join_type, lateral_source, to_string(alias_name), lateral_opts)
|> maybe_register_lateral_source_columns(lateral_source, to_string(alias_name), lateral_opts)
|> upsert_lateral_by_alias(to_string(alias_name))
end
defp upsert_lateral_by_alias(selecto, alias_name) do
laterals = Map.get(selecto.set, :lateral_joins, [])
{matching, others} = Enum.split_with(laterals, &(Map.get(&1, :alias) == alias_name))
case matching do
[] ->
selecto
_ ->
put_in(selecto.set[:lateral_joins], others ++ [List.last(matching)])
end
end
defp normalize_json_table_source(field) when is_atom(field) do
normalize_json_table_source(Atom.to_string(field))
end
defp normalize_json_table_source(field) when is_binary(field) do
case String.split(field, ".", parts: 2) do
[_prefix, _column] -> field
[column] -> "selecto_root." <> column
end
end
defp normalize_json_table_column!({name, :for_ordinality}) do
%{name: to_string(name), for_ordinality: true, type: :integer}
end
defp normalize_json_table_column!({name, path}) when is_binary(path) do
%{name: to_string(name), path: path, type: :string}
end
defp normalize_json_table_column!({name, path, type}) when is_binary(path) do
%{name: to_string(name), path: path, type: type}
end
defp normalize_json_table_column!(%{name: name} = column) do
column
|> Map.put(:name, to_string(name))
|> Map.put_new(:type, if(Map.get(column, :for_ordinality), do: :integer, else: :string))
end
defp normalize_json_table_column!(other) do
raise ArgumentError,
"JSON_TABLE columns must be tuples or maps. Got: #{inspect(other)}"
end
defp register_json_table_columns(selecto, alias_name, columns) do
current_columns = Map.get(selecto.config, :columns, %{})
columns_to_add =
Enum.reduce(columns, %{}, fn %{name: name} = column, acc ->
Map.put(acc, "#{alias_name}.#{name}", %{
name: "#{alias_name}.#{name}",
field: name,
requires_join: alias_name,
type: Map.get(column, :type, :string)
})
end)
put_in(selecto.config[:columns], Map.merge(current_columns, columns_to_add))
end
defp register_sqlite_json_rowset_columns(selecto, alias_name) do
current_columns = Map.get(selecto.config, :columns, %{})
columns_to_add =
Enum.reduce(sqlite_json_rowset_columns(), %{}, fn {name, type}, acc ->
Map.put(acc, "#{alias_name}.#{name}", %{
name: "#{alias_name}.#{name}",
field: name,
requires_join: alias_name,
type: type
})
end)
put_in(selecto.config[:columns], Map.merge(current_columns, columns_to_add))
end
defp register_udf_table_columns(selecto, alias_name, columns) do
current_columns = Map.get(selecto.config, :columns, %{})
columns_to_add =
Enum.reduce(columns, %{}, fn {name, spec}, acc ->
type =
case spec do
%{type: type} -> type
%{"type" => type} -> type
_ -> :unknown
end
Map.put(acc, "#{alias_name}.#{name}", %{
name: "#{alias_name}.#{name}",
field: to_string(name),
requires_join: alias_name,
type: type
})
end)
put_in(selecto.config[:columns], Map.merge(current_columns, columns_to_add))
end
defp maybe_register_lateral_source_columns(
selecto,
{:json_table, _source_ref, _path, columns},
alias_name,
_opts
) do
register_json_table_columns(selecto, alias_name, columns)
end
defp maybe_register_lateral_source_columns(
selecto,
{function_name, _source_ref, _path},
alias_name,
_opts
)
when function_name in [:json_each, :json_tree] do
register_sqlite_json_rowset_columns(selecto, alias_name)
end
defp maybe_register_lateral_source_columns(
selecto,
{:udf_table, function_id, _args},
alias_name,
_opts
) do
case Selecto.UDF.fetch(selecto, function_id) do
{:ok, spec} ->
returns = Map.get(spec, :returns) || Map.get(spec, "returns") || %{}
columns = Map.get(returns, :columns) || Map.get(returns, "columns") || %{}
register_udf_table_columns(selecto, alias_name, columns)
:error ->
raise ArgumentError, "Unknown UDF '#{Selecto.UDF.normalize_id(function_id)}'"
end
end
defp maybe_register_lateral_source_columns(selecto, _lateral_source, _alias_name, _opts),
do: selecto
defp sqlite_json_rowset_columns do
[
{"key", :string},
{"value", :json},
{"type", :string},
{"atom", :string},
{"id", :integer},
{"parent", :integer},
{"fullkey", :string},
{"path", :string}
]
end
defp sqlite_fts_rank_source_table!(selecto, fields) do
fields
|> Enum.map(&sqlite_fts_rank_field_conf!(selecto, &1))
|> Enum.map(fn conf -> Map.get(conf, :requires_join, :selecto_root) end)
|> Enum.map(fn
:selecto_root -> "selecto_root"
value -> to_string(value)
end)
|> Enum.uniq()
|> case do
["selecto_root"] ->
selecto.domain.source.source_table
[alias_name] ->
raise ArgumentError,
"sqlite_fts_rank/3 currently supports only root-source FTS tables, got: #{inspect(alias_name)}"
aliases ->
raise ArgumentError,
"sqlite_fts_rank/3 requires FTS fields from one source alias, got: #{inspect(aliases)}"
end
end
defp sqlite_fts_rank_field_conf!(selecto, field) do
columns = selecto.config[:columns] || %{}
field_key = to_string(field)
conf = Map.get(columns, field_key) || Map.get(columns, safe_existing_atom(field_key))
cond do
is_nil(conf) ->
raise ArgumentError, "sqlite_fts_rank/3 field not found: #{inspect(field)}"
Map.get(conf, :type) == :fts5 or Map.get(conf, :sqlite_fts5) == true or
Map.get(conf, :text_search_backend) == :fts5 ->
conf
true ->
raise ArgumentError,
"sqlite_fts_rank/3 field is not configured for SQLite FTS5: #{inspect(field)}"
end
end
defp postgresql_text_search_rank_field_conf!(selecto, field) do
columns = selecto.config[:columns] || %{}
field_key = to_string(field)
conf = Map.get(columns, field_key) || Map.get(columns, safe_existing_atom(field_key))
cond do
is_nil(conf) ->
raise ArgumentError, "postgresql text_search_rank/3 field not found: #{inspect(field)}"
Map.get(conf, :type) == :tsvector or Map.get(conf, :text_search_backend) == :postgresql ->
conf
true ->
raise ArgumentError,
"postgresql text_search_rank/3 field is not configured for PostgreSQL text search: #{inspect(field)}"
end
end
defp postgresql_text_search_query_function!(:web), do: :websearch_to_tsquery
defp postgresql_text_search_query_function!(:websearch), do: :websearch_to_tsquery
defp postgresql_text_search_query_function!(:plain), do: :plainto_tsquery
defp postgresql_text_search_query_function!(:natural), do: :plainto_tsquery
defp postgresql_text_search_query_function!(:phrase), do: :phraseto_tsquery
defp postgresql_text_search_query_function!(:boolean), do: :to_tsquery
defp postgresql_text_search_query_function!(mode) do
raise ArgumentError, "postgresql text_search_rank/3 does not support mode #{inspect(mode)}"
end
defp mysql_rank_mode_sql(selecto, mode) do
adapter = Map.get(selecto, :adapter)
case mode do
nil ->
" IN NATURAL LANGUAGE MODE"
:web ->
" IN NATURAL LANGUAGE MODE"
:websearch ->
" IN NATURAL LANGUAGE MODE"
:plain ->
" IN NATURAL LANGUAGE MODE"
:natural ->
" IN NATURAL LANGUAGE MODE"
:boolean ->
if Selecto.AdapterSupport.supports_feature?(adapter, :text_search_boolean) do
" IN BOOLEAN MODE"
else
raise ArgumentError, "mysql text_search_rank/3 requires boolean text search support"
end
:query_expansion ->
if Selecto.AdapterSupport.supports_feature?(adapter, :text_search_query_expansion) do
" IN NATURAL LANGUAGE MODE WITH QUERY EXPANSION"
else
raise ArgumentError,
"mysql text_search_rank/3 requires query expansion text search support"
end
:phrase ->
raise ArgumentError, "mysql text_search_rank/3 does not support :phrase"
other ->
raise ArgumentError, "mysql text_search_rank/3 does not support mode #{inspect(other)}"
end
end
defp escape_sql_literal(value) when is_binary(value), do: String.replace(value, "'", "''")
defp safe_existing_atom(value) when is_binary(value) do
try do
String.to_existing_atom(value)
rescue
ArgumentError -> nil
end
end
defp safe_existing_atom(_value), do: nil
@doc """
Add a VALUES clause to create an inline table from literal data.
VALUES clauses allow creating inline tables from literal values, useful for
data transformations, lookup tables, and testing scenarios.
## Parameters
- `selecto` - The Selecto struct
- `data` - List of data rows (lists or maps)
- `opts` - Options including `:columns` (explicit column names), `:as` (table alias),
and optional `:join` configuration to auto-join the VALUES table
## Examples
# Basic VALUES table with explicit columns
selecto
|> Selecto.with_values([
["PG", "Family Friendly", 1],
["PG-13", "Teen", 2],
["R", "Adult", 3]
],
columns: ["rating_code", "description", "sort_order"],
as: "rating_lookup"
)
# Map-based VALUES (columns inferred from keys)
selecto
|> Selecto.with_values([
%{month: 1, name: "January", days: 31},
%{month: 2, name: "February", days: 28},
%{month: 3, name: "March", days: 31}
], as: "months")
# VALUES with auto-join (fields inferred from VALUES columns)
selecto
|> Selecto.with_values([
["processing", "In Progress"],
["shipped", "In Transit"]
],
columns: ["status", "status_label"],
as: "status_labels",
join: [owner_key: :status, related_key: :status]
)
|> Selecto.select(["order_number", "status_labels.status_label"])
# Named VALUES member from domain.query_members.values
selecto
|> Selecto.with_values(:status_labels)
|> Selecto.select(["order_number", "status_labels.status_label"])
# Generated SQL:
# WITH rating_lookup (rating_code, description, sort_order) AS (
# VALUES ('PG', 'Family Friendly', 1),
# ('PG-13', 'Teen', 2),
# ('R', 'Adult', 3)
# )
"""
def with_values(selecto, data_or_member, opts \\ [])
def with_values(selecto, member_id, opts)
when (is_atom(member_id) or is_binary(member_id)) and (is_list(opts) or is_map(opts)) do
normalized_overrides = normalize_named_query_member_opts(opts)
{member_name, raw_spec} = fetch_named_query_member!(selecto, :values, member_id)
spec = normalize_named_query_member_spec(raw_spec)
rows_override =
Keyword.get(
normalized_overrides,
:rows,
Keyword.get(normalized_overrides, :data, :__missing__)
)
rows =
case rows_override do
:__missing__ -> Map.get(spec, :rows, Map.get(spec, :data))
override -> override
end
if not is_list(rows) do
raise ArgumentError,
"Named VALUES '#{member_name}' requires list rows (use :rows or :data). Got: #{inspect(rows)}"
end
alias_name =
normalized_overrides
|> Keyword.get(
:as,
Keyword.get(normalized_overrides, :alias, Keyword.get(normalized_overrides, :alias_name))
) || values_member_alias(spec) || member_name
values_opts =
[]
|> maybe_put_keyword(
:columns,
Keyword.get(normalized_overrides, :columns, Map.get(spec, :columns))
)
|> Keyword.put(:as, to_string(alias_name))
join_opts =
merge_named_join_opts(
Map.get(spec, :join),
if(Keyword.has_key?(normalized_overrides, :join),
do: Keyword.get(normalized_overrides, :join),
else: :__missing__
),
&normalize_values_join_opts/1
)
values_opts =
if join_opts == :__missing__ do
values_opts
else
Keyword.put(values_opts, :join, join_opts)
end
apply_values_clause(selecto, rows, values_opts, upsert: true)
end
def with_values(selecto, data, opts) do
apply_values_clause(selecto, data, opts, upsert: false)
end
defp apply_values_clause(selecto, data, opts, apply_opts) do
values_spec = Selecto.Advanced.ValuesClause.create_values_clause(data, opts)
selecto =
if Keyword.get(apply_opts, :upsert, false) do
upsert_values_clause(selecto, values_spec)
else
append_values_clause(selecto, values_spec)
end
maybe_join_values_clause(selecto, values_spec, Keyword.get(opts, :join))
end
defp append_values_clause(selecto, values_spec) do
current_values_clauses = Map.get(selecto.set, :values_clauses, [])
updated_values_clauses = current_values_clauses ++ [values_spec]
put_in(selecto, [Access.key(:set), :values_clauses], updated_values_clauses)
end
defp upsert_values_clause(selecto, values_spec) do
current_values_clauses = Map.get(selecto.set, :values_clauses, [])
updated_values_clauses =
current_values_clauses
|> Enum.reject(fn existing -> Map.get(existing, :alias) == values_spec.alias end)
|> Kernel.++([values_spec])
put_in(selecto, [Access.key(:set), :values_clauses], updated_values_clauses)
end
defp maybe_join_values_clause(selecto, _values_spec, join_opts)
when join_opts in [nil, false],
do: selecto
defp maybe_join_values_clause(selecto, values_spec, join_opts) do
{join_id, join_options} = build_values_join_options(values_spec, join_opts)
Selecto.join(selecto, join_id, join_options)
end
defp build_values_join_options(values_spec, true),
do: build_values_join_options(values_spec, [])
defp build_values_join_options(values_spec, join_opts) when is_map(join_opts) do
values_spec
|> build_values_join_options(normalize_values_join_opts(join_opts))
end
defp build_values_join_options(values_spec, join_opts) when is_list(join_opts) do
normalized_join_opts = normalize_values_join_opts(join_opts)
default_join_key = default_values_join_key(values_spec)
owner_key = Keyword.get(normalized_join_opts, :owner_key, default_join_key)
related_key = Keyword.get(normalized_join_opts, :related_key, owner_key)
join_id =
normalized_join_opts
|> Keyword.get(:id, values_spec.alias)
|> normalize_values_join_id()
join_options =
normalized_join_opts
|> Keyword.delete(:id)
|> Keyword.put_new(:source, values_spec.alias)
|> Keyword.put_new(:type, :left)
|> Keyword.put_new(:owner_key, owner_key)
|> Keyword.put_new(:related_key, related_key)
|> Keyword.put_new(:fields, inferred_values_join_fields(values_spec))
{join_id, join_options}
end
defp build_values_join_options(_values_spec, invalid_opts) do
raise ArgumentError,
":join option for with_values/3 must be true, false, nil, a keyword list, or a map. Got: #{inspect(invalid_opts)}"
end
defp normalize_values_join_opts(join_opts) when is_map(join_opts),
do: join_opts |> Map.to_list() |> normalize_values_join_opts()
defp normalize_values_join_opts(join_opts) when is_list(join_opts) do
Enum.map(join_opts, fn
{key, value} -> {normalize_values_join_key(key), value}
other -> other
end)
end
defp normalize_values_join_key(key) when is_atom(key), do: key
defp normalize_values_join_key(key) when is_binary(key) do
case key do
"id" -> :id
"source" -> :source
"type" -> :type
"owner_key" -> :owner_key
"related_key" -> :related_key
"on" -> :on
"fields" -> :fields
other -> other
end
end
defp normalize_values_join_key(key), do: key
defp normalize_values_join_id(join_id) when is_atom(join_id), do: join_id
defp normalize_values_join_id(join_id) when is_binary(join_id), do: String.to_atom(join_id)
defp normalize_values_join_id(join_id) do
raise ArgumentError,
"VALUES auto-join id must be an atom or string. Got: #{inspect(join_id)}"
end
defp default_values_join_key(%{columns: [first_column | _]}), do: first_column
defp default_values_join_key(%{columns: []}) do
raise ArgumentError,
"Cannot infer VALUES join key from empty columns. Provide join: [owner_key: ..., related_key: ...]."
end
defp inferred_values_join_fields(values_spec) do
Enum.reduce(values_spec.columns, %{}, fn column, acc ->
type = Map.get(values_spec.column_types, column, :string)
Map.put(acc, column, %{type: type})
end)
end
@doc """
Add JSON operations to SELECT clauses for PostgreSQL JSON/JSONB functionality.
Supports JSON path extraction, aggregation, construction, and manipulation operations.
## Parameters
- `selecto` - The Selecto instance
- `json_operations` - List of JSON operation tuples or single operation
- `opts` - Options (reserved for future use)
## Examples
# JSON path extraction
selecto
|> Selecto.json_select([
{:json_extract, "metadata", "$.category", as: "category"},
{:json_extract, "metadata", "$.specs.weight", as: "weight"}
])
# JSON aggregation
selecto
|> Selecto.json_select([
{:json_agg, "product_name", as: "products"},
{:json_object_agg, "product_id", "price", as: "price_map"}
])
|> Selecto.group_by(["category"])
# Single JSON operation
selecto
|> Selecto.json_select({:json_extract, "data", "$.status", as: "status"})
"""
def json_select(selecto, json_operations, opts \\ [])
def json_select(selecto, json_operations, _opts) when is_list(json_operations) do
# Create JSON operation specifications
json_specs =
json_operations
|> Enum.map(fn
{operation, column, path_or_opts} when is_binary(path_or_opts) ->
Selecto.Advanced.JsonOperations.create_json_operation(operation, column,
path: path_or_opts
)
{operation, column, path, opts} when is_binary(path) ->
Selecto.Advanced.JsonOperations.create_json_operation(
operation,
column,
[path: path] ++ opts
)
{operation, column, opts} when is_list(opts) ->
Selecto.Advanced.JsonOperations.create_json_operation(operation, column, opts)
{operation, column} ->
Selecto.Advanced.JsonOperations.create_json_operation(operation, column)
end)
# Add to selecto set
Selecto.QueryValidator.validate_json_specs!(selecto, json_specs)
current_json_selects = Map.get(selecto.set, :json_selects, [])
updated_json_selects = current_json_selects ++ json_specs
put_in(selecto.set[:json_selects], updated_json_selects)
end
def json_select(selecto, json_operation, opts) do
json_select(selecto, [json_operation], opts)
end
@doc """
Add JSON operations to WHERE clauses for filtering with PostgreSQL JSON/JSONB functionality.
Supports JSON containment, existence, and comparison operations.
## Parameters
- `selecto` - The Selecto instance
- `json_filters` - List of JSON filter tuples or single filter
- `opts` - Options (reserved for future use)
## Examples
# JSON containment and existence
selecto
|> Selecto.json_filter([
{:json_contains, "metadata", %{"category" => "electronics"}},
{:json_path_exists, "metadata", "$.specs.warranty"}
])
# JSON path comparison
selecto
|> Selecto.json_filter([
{:json_extract_text, "settings", "$.theme", {:=, "dark"}},
{:json_extract, "data", "$.priority", {:>, 5}}
])
# Single JSON filter
selecto
|> Selecto.json_filter({:json_exists, "tags", "electronics"})
"""
def json_filter(selecto, json_filters, opts \\ [])
def json_filter(selecto, json_filters, _opts) when is_list(json_filters) do
# Create JSON filter specifications
json_specs =
json_filters
|> Enum.map(fn
{operation, column, path_or_value, comparison} when is_binary(path_or_value) ->
Selecto.Advanced.JsonOperations.create_json_operation(operation, column,
path: path_or_value,
comparison: comparison
)
{operation, column, path}
when operation in [:json_extract, :json_extract_text, :json_exists, :json_path_exists] and
is_binary(path) ->
Selecto.Advanced.JsonOperations.create_json_operation(operation, column, path: path)
{operation, column, value} ->
Selecto.Advanced.JsonOperations.create_json_operation(operation, column, value: value)
{operation, column} ->
Selecto.Advanced.JsonOperations.create_json_operation(operation, column)
end)
# Add to selecto set
Selecto.QueryValidator.validate_json_specs!(selecto, json_specs)
current_json_filters = Map.get(selecto.set, :json_filters, [])
updated_json_filters = current_json_filters ++ json_specs
put_in(selecto.set[:json_filters], updated_json_filters)
end
def json_filter(selecto, json_filter, opts) do
json_filter(selecto, [json_filter], opts)
end
@doc """
Add JSON operations to ORDER BY clauses for sorting with PostgreSQL JSON/JSONB functionality.
## Parameters
- `selecto` - The Selecto instance
- `json_sorts` - List of JSON sort tuples or single sort
- `opts` - Options (reserved for future use)
## Examples
# Sort by JSON path values
selecto
|> Selecto.json_order_by([
{:json_extract, "metadata", "$.priority", :desc},
{:json_extract_text, "data", "$.created_at", :asc}
])
# Single JSON sort
selecto
|> Selecto.json_order_by({:json_extract, "settings", "$.sort_order"})
"""
def json_order_by(selecto, json_sorts, opts \\ [])
def json_order_by(selecto, json_sorts, _opts) when is_list(json_sorts) do
# Create JSON sort specifications
json_specs =
json_sorts
|> Enum.map(fn
{operation, column, path, direction} when is_binary(path) ->
spec =
Selecto.Advanced.JsonOperations.create_json_operation(operation, column, path: path)
{spec, direction || :asc}
{operation, column, direction} when direction in [:asc, :desc] ->
spec = Selecto.Advanced.JsonOperations.create_json_operation(operation, column)
{spec, direction}
{operation, column, path} when is_binary(path) ->
spec =
Selecto.Advanced.JsonOperations.create_json_operation(operation, column, path: path)
{spec, :asc}
{operation, column} ->
spec = Selecto.Advanced.JsonOperations.create_json_operation(operation, column)
{spec, :asc}
end)
# Add to selecto set
Selecto.QueryValidator.validate_json_specs!(
selecto,
Enum.map(json_specs, fn {spec, _direction} -> spec end)
)
current_json_sorts = Map.get(selecto.set, :json_order_by, [])
updated_json_sorts = current_json_sorts ++ json_specs
put_in(selecto.set[:json_order_by], updated_json_sorts)
end
def json_order_by(selecto, json_sort, opts) do
json_order_by(selecto, [json_sort], opts)
end
@doc """
Add a Common Table Expression (CTE) to the query using WITH clause.
CTEs provide a way to create temporary named result sets that can be
referenced within the main query, enabling query modularity and readability.
## Parameters
- `selecto` - The Selecto instance
- `name` - CTE name (must be valid SQL identifier)
- `query_builder` - Function that returns a Selecto query for the CTE
- `opts` - Options including :columns, :dependencies, and optional :join
Join shortcut options (`:join`):
- `true` - infer join keys from first declared CTE column
- keyword/map - any `Selecto.join/3` options (`:type`, `:owner_key`, `:related_key`, `:on`, `:fields`)
- `fields: :infer` - infer join fields from declared CTE columns
## Examples
# Simple CTE for filtering
selecto
|> Selecto.with_cte("active_customers", fn ->
Selecto.configure(customer_domain, connection)
|> Selecto.filter([{"active", true}])
end,
join: [owner_key: :customer_id, related_key: :customer_id]
)
|> Selecto.select(["film.title", "active_customers.first_name"])
# CTE with explicit columns
selecto
|> Selecto.with_cte("customer_stats",
fn ->
Selecto.configure(customer_domain, connection)
|> Selecto.select(["customer_id", {:func, "COUNT", ["rental_id"], as: "rental_count"}])
|> Selecto.join(:left, "rental", on: "customer.customer_id = rental.customer_id")
|> Selecto.group_by(["customer_id"])
end,
columns: ["customer_id", "rental_count"]
)
# Named CTE member from domain.query_members.ctes
selecto
|> Selecto.with_cte(:active_customers)
|> Selecto.select(["film.title", "active_customers.first_name"])
# Generated SQL:
# WITH active_customers AS (
# SELECT * FROM customer WHERE active = true
# )
# SELECT film.title, active_customers.first_name
# FROM film
# INNER JOIN active_customers ON rental.customer_id = active_customers.customer_id
"""
def with_cte(selecto, member_id) when is_atom(member_id) or is_binary(member_id) do
with_cte(selecto, member_id, [])
end
def with_cte(selecto, member_id, opts)
when (is_atom(member_id) or is_binary(member_id)) and (is_list(opts) or is_map(opts)) do
normalized_overrides = normalize_named_query_member_opts(opts)
{member_name, raw_spec} = fetch_named_query_member!(selecto, :ctes, member_id)
spec = normalize_named_query_member_spec(raw_spec)
join_opts =
merge_named_join_opts(
Map.get(spec, :join),
if(Keyword.has_key?(normalized_overrides, :join),
do: Keyword.get(normalized_overrides, :join),
else: :__missing__
),
&normalize_cte_join_opts/1
)
recursive? =
Keyword.get(normalized_overrides, :type, Map.get(spec, :type)) == :recursive or
not is_nil(Map.get(spec, :base_query)) or not is_nil(Map.get(spec, :recursive_query)) or
Keyword.has_key?(normalized_overrides, :base_query) or
Keyword.has_key?(normalized_overrides, :recursive_query)
cte_name = Keyword.get(normalized_overrides, :name, Map.get(spec, :name, member_name))
cte_spec =
if recursive? do
base_query =
Keyword.get(normalized_overrides, :base_query, Map.get(spec, :base_query))
|> wrap_named_base_query!(selecto, member_name)
recursive_query =
Keyword.get(normalized_overrides, :recursive_query, Map.get(spec, :recursive_query))
|> wrap_named_recursive_query!(selecto, member_name)
recursive_opts =
[]
|> maybe_put_keyword(
:columns,
Keyword.get(normalized_overrides, :columns, Map.get(spec, :columns))
)
|> maybe_put_keyword(
:dependencies,
Keyword.get(normalized_overrides, :dependencies, Map.get(spec, :dependencies))
)
|> maybe_put_keyword(
:max_depth,
Keyword.get(normalized_overrides, :max_depth, Map.get(spec, :max_depth))
)
|> maybe_put_keyword(
:cycle_detection,
Keyword.get(normalized_overrides, :cycle_detection, Map.get(spec, :cycle_detection))
)
|> Keyword.put(:base_query, base_query)
|> Keyword.put(:recursive_query, recursive_query)
Selecto.Advanced.CTE.create_recursive_cte(to_string(cte_name), recursive_opts)
else
query_builder =
Keyword.get(
normalized_overrides,
:query,
Keyword.get(normalized_overrides, :query_builder)
) || Map.get(spec, :query, Map.get(spec, :query_builder))
cte_opts =
[]
|> maybe_put_keyword(
:columns,
Keyword.get(normalized_overrides, :columns, Map.get(spec, :columns))
)
|> maybe_put_keyword(
:dependencies,
Keyword.get(normalized_overrides, :dependencies, Map.get(spec, :dependencies))
)
Selecto.Advanced.CTE.create_cte(
to_string(cte_name),
wrap_named_query_builder!(query_builder, selecto, :ctes, member_name),
cte_opts
)
end
join_opts = if join_opts == :__missing__, do: nil, else: join_opts
selecto
|> upsert_cte_spec(cte_spec)
|> maybe_join_cte_spec(cte_spec, join_opts)
end
def with_cte(selecto, name, query_builder, opts \\ []) do
join_opts = Keyword.get(opts, :join)
cte_opts = Keyword.delete(opts, :join)
cte_spec = Selecto.Advanced.CTE.create_cte(name, query_builder, cte_opts)
selecto
|> append_cte_spec(cte_spec)
|> maybe_join_cte_spec(cte_spec, join_opts)
end
@doc """
Add a recursive Common Table Expression (CTE) to the query.
Recursive CTEs enable hierarchical queries by combining an anchor query
with a recursive query that references the CTE itself.
## Parameters
- `selecto` - The Selecto instance
- `name` - CTE name (must be valid SQL identifier)
- `opts` - Options with :base_query, :recursive_query, and optional :join
Join shortcut options (`:join`) follow `with_cte/4`.
## Examples
# Hierarchical employee structure
selecto
|> Selecto.with_recursive_cte("employee_hierarchy",
base_query: fn ->
# Anchor: top-level managers
Selecto.configure(employee_domain, connection)
|> Selecto.select(["employee_id", "name", "manager_id", {:literal, 0, as: "level"}])
|> Selecto.filter([{"manager_id", nil}])
end,
recursive_query: fn cte_ref ->
# Recursive: subordinates
Selecto.configure(employee_domain, connection)
|> Selecto.select(["employee.employee_id", "employee.name", "employee.manager_id",
{:func, "employee_hierarchy.level + 1", as: "level"}])
|> Selecto.join(:inner, cte_ref, on: "employee.manager_id = employee_hierarchy.employee_id")
end,
join: [owner_key: :employee_id, related_key: :employee_id]
)
|> Selecto.select([
"employee_hierarchy.employee_id",
"employee_hierarchy.name",
"employee_hierarchy.level"
])
|> Selecto.order_by([{"employee_hierarchy.level", :asc}, {"employee_hierarchy.name", :asc}])
# Generated SQL:
# WITH RECURSIVE employee_hierarchy AS (
# SELECT employee_id, name, manager_id, 0 as level
# FROM employee
# WHERE manager_id IS NULL
# UNION ALL
# SELECT employee.employee_id, employee.name, employee.manager_id, employee_hierarchy.level + 1
# FROM employee
# INNER JOIN employee_hierarchy ON employee.manager_id = employee_hierarchy.employee_id
# )
# SELECT employee_hierarchy.employee_id, employee_hierarchy.name, employee_hierarchy.level
# FROM employee
# INNER JOIN employee_hierarchy ON employee.employee_id = employee_hierarchy.employee_id
# ORDER BY employee_hierarchy.level ASC, employee_hierarchy.name ASC
"""
# Consolidated version that handles both parameter formats:
# 1. (selecto, cte_name, base_fn, recursive_fn, opts) - original inline format
# 2. (selecto, name, opts) - newer format using Advanced.CTE
def with_recursive_cte(selecto, arg2, arg3, arg4 \\ nil, arg5 \\ []) do
{cte_spec, join_opts} =
case {arg2, arg3, arg4, arg5} do
# Format 1: (selecto, cte_name, base_fn, recursive_fn, opts)
{cte_name, base_fn, recursive_fn, opts}
when is_function(base_fn) and is_function(recursive_fn) ->
normalized_opts = normalize_cte_option_list(opts)
join_opts = Keyword.get(normalized_opts, :join)
cte_opts = Keyword.delete(normalized_opts, :join)
# Inline spec format
{%{
name: cte_name,
type: :recursive,
base_query: base_fn,
recursive_query: recursive_fn,
max_depth: Keyword.get(cte_opts, :max_depth),
cycle_detection: Keyword.get(cte_opts, :cycle_detection, false),
columns: Keyword.get(cte_opts, :columns)
}, join_opts}
# Format 2: (selecto, name, opts)
{name, opts, nil, []} when is_list(opts) or is_map(opts) ->
normalized_opts = normalize_cte_option_list(opts)
join_opts = Keyword.get(normalized_opts, :join)
cte_opts = Keyword.delete(normalized_opts, :join)
# Use Advanced.CTE module
{Selecto.Advanced.CTE.create_recursive_cte(name, cte_opts), join_opts}
end
selecto
|> append_cte_spec(cte_spec)
|> maybe_join_cte_spec(cte_spec, join_opts)
end
@doc """
Add multiple CTEs to the query in a single operation.
Useful for complex queries that require multiple temporary result sets.
CTEs will be automatically ordered based on their dependencies.
## Parameters
- `selecto` - The Selecto instance
- `cte_specs` - List of CTE specifications created with create_cte/3
- `opts` - Options including :joins for auto-joining one or more CTEs
Batch join shortcut options (`:joins`):
- `true` - auto-join every provided CTE (default key inference)
- list of entries where each entry is one of:
- cte name (`"my_cte"` or `:my_cte`) to use inferred defaults
- `{name, join_opts}` tuple
- keyword/map with `:name` plus join options
## Examples
# Multiple related CTEs
active_customers_cte = Selecto.Advanced.CTE.create_cte("active_customers", fn ->
Selecto.configure(customer_domain, connection)
|> Selecto.filter([{"active", true}])
end)
high_value_cte = Selecto.Advanced.CTE.create_cte("high_value_customers", fn ->
Selecto.configure(customer_domain, connection)
|> Selecto.aggregate([{"payment.amount", :sum, as: "total_spent"}])
|> Selecto.join(:inner, "payment", on: "customer.customer_id = payment.customer_id")
|> Selecto.group_by(["customer.customer_id"])
|> Selecto.having([{"total_spent", {:>, 100}}])
end, dependencies: ["active_customers"])
selecto
|> Selecto.with_ctes([active_customers_cte, high_value_cte],
joins: [
[name: "active_customers", owner_key: :customer_id, related_key: :customer_id],
[name: "high_value_customers", owner_key: :customer_id, related_key: :customer_id]
]
)
|> Selecto.select(["film.title", "high_value_customers.total_spent"])
"""
def with_ctes(selecto, cte_specs, opts \\ []) when is_list(cte_specs) do
selecto_with_ctes = append_cte_specs(selecto, cte_specs)
apply_with_ctes_joins(selecto_with_ctes, cte_specs, Keyword.get(opts, :joins))
end
defp append_cte_spec(selecto, cte_spec), do: append_cte_specs(selecto, [cte_spec])
defp append_cte_specs(selecto, cte_specs) when is_list(cte_specs) do
current_ctes = Map.get(selecto.set, :ctes, [])
updated_ctes = current_ctes ++ cte_specs
put_in(selecto.set[:ctes], updated_ctes)
end
defp upsert_cte_spec(selecto, cte_spec) do
cte_name = cte_spec_name!(cte_spec)
current_ctes = Map.get(selecto.set, :ctes, [])
updated_ctes =
current_ctes
|> Enum.reject(fn existing -> cte_spec_name(existing) == cte_name end)
|> Kernel.++([cte_spec])
put_in(selecto.set[:ctes], updated_ctes)
end
defp maybe_join_cte_spec(selecto, _cte_spec, join_opts) when join_opts in [nil, false],
do: selecto
defp maybe_join_cte_spec(selecto, cte_spec, join_opts) do
{join_id, join_options} = build_cte_join_options(cte_spec, join_opts)
Selecto.join(selecto, join_id, join_options)
end
defp build_cte_join_options(cte_spec, true), do: build_cte_join_options(cte_spec, [])
defp build_cte_join_options(cte_spec, join_opts) when is_map(join_opts) do
cte_spec
|> build_cte_join_options(normalize_cte_join_opts(join_opts))
end
defp build_cte_join_options(cte_spec, join_opts) when is_list(join_opts) do
cte_name = cte_spec_name!(cte_spec)
normalized_join_opts = normalize_cte_join_opts(join_opts)
join_id =
normalized_join_opts
|> Keyword.get(:id, cte_name)
|> normalize_values_join_id()
join_options =
normalized_join_opts
|> Keyword.delete(:id)
|> Keyword.delete(:name)
|> Keyword.put_new(:source, cte_name)
|> Keyword.put_new(:type, :left)
|> maybe_add_default_cte_join_keys(cte_spec)
|> maybe_add_default_cte_join_fields(cte_spec)
{join_id, join_options}
end
defp build_cte_join_options(_cte_spec, invalid_opts) do
raise ArgumentError,
":join option for CTE helpers must be true, false, nil, a keyword list, or a map. Got: #{inspect(invalid_opts)}"
end
defp maybe_add_default_cte_join_keys(join_options, cte_spec) do
if Keyword.has_key?(join_options, :on) do
join_options
else
case {Keyword.get(join_options, :owner_key), Keyword.get(join_options, :related_key)} do
{owner_key, related_key} when not is_nil(owner_key) and not is_nil(related_key) ->
join_options
{owner_key, nil} when not is_nil(owner_key) ->
Keyword.put(join_options, :related_key, owner_key)
_ ->
default_join_key = default_cte_join_key(cte_spec)
owner_key = Keyword.get(join_options, :owner_key, default_join_key)
related_key = Keyword.get(join_options, :related_key, owner_key)
join_options
|> Keyword.put_new(:owner_key, owner_key)
|> Keyword.put_new(:related_key, related_key)
end
end
end
defp maybe_add_default_cte_join_fields(join_options, cte_spec) do
case Keyword.get(join_options, :fields, :__missing__) do
:infer ->
Keyword.put(join_options, :fields, inferred_cte_join_fields!(cte_spec))
:__missing__ ->
inferred_fields = inferred_cte_join_fields(cte_spec)
if inferred_fields == %{} do
join_options
else
Keyword.put(join_options, :fields, inferred_fields)
end
_ ->
join_options
end
end
defp default_cte_join_key(cte_spec) do
case cte_columns(cte_spec) do
[first_column | _] ->
first_column
[] ->
cte_name = cte_spec_name!(cte_spec)
raise ArgumentError,
"Cannot infer join keys for CTE '#{cte_name}'. Declare CTE columns or pass :owner_key/:related_key (or :on) in :join options."
end
end
defp inferred_cte_join_fields(cte_spec) do
Enum.reduce(cte_columns(cte_spec), %{}, fn column, acc ->
Map.put(acc, String.to_atom(column), %{type: :any})
end)
end
defp inferred_cte_join_fields!(cte_spec) do
inferred_fields = inferred_cte_join_fields(cte_spec)
if inferred_fields == %{} do
cte_name = cte_spec_name!(cte_spec)
raise ArgumentError,
"Cannot infer fields for CTE '#{cte_name}' because it has no declared columns. Provide fields explicitly or declare CTE columns."
else
inferred_fields
end
end
defp cte_columns(cte_spec) do
cte_spec
|> Map.get(:columns, [])
|> List.wrap()
|> Enum.map(&to_string/1)
end
defp cte_spec_name(%{name: name}) when is_binary(name), do: name
defp cte_spec_name(%{name: name}) when is_atom(name), do: Atom.to_string(name)
defp cte_spec_name(_cte_spec), do: nil
defp cte_spec_name!(%{name: name}) when is_binary(name), do: name
defp cte_spec_name!(%{name: name}) when is_atom(name), do: Atom.to_string(name)
defp cte_spec_name!(cte_spec) do
raise ArgumentError,
"CTE spec must include a string or atom :name. Got: #{inspect(cte_spec)}"
end
defp apply_with_ctes_joins(selecto, _cte_specs, joins) when joins in [nil, false, []],
do: selecto
defp apply_with_ctes_joins(selecto, cte_specs, true) do
Enum.reduce(cte_specs, selecto, fn cte_spec, acc ->
maybe_join_cte_spec(acc, cte_spec, true)
end)
end
defp apply_with_ctes_joins(selecto, cte_specs, joins) when is_list(joins) do
Enum.reduce(joins, selecto, fn join_entry, acc ->
{cte_spec, join_opts} = resolve_with_ctes_join_entry(cte_specs, join_entry)
maybe_join_cte_spec(acc, cte_spec, join_opts)
end)
end
defp apply_with_ctes_joins(_selecto, _cte_specs, invalid_joins) do
raise ArgumentError,
":joins option for with_ctes/3 must be true, false, nil, or a list. Got: #{inspect(invalid_joins)}"
end
defp resolve_with_ctes_join_entry(cte_specs, entry) when is_binary(entry) or is_atom(entry) do
{find_cte_spec!(cte_specs, entry), true}
end
defp resolve_with_ctes_join_entry(cte_specs, {name, join_opts}) do
{find_cte_spec!(cte_specs, name), join_opts}
end
defp resolve_with_ctes_join_entry(cte_specs, join_entry) when is_map(join_entry) do
resolve_with_ctes_join_entry(cte_specs, Map.to_list(join_entry))
end
defp resolve_with_ctes_join_entry(cte_specs, join_entry) when is_list(join_entry) do
normalized_entry = normalize_cte_join_opts(join_entry)
cte_name = Keyword.get(normalized_entry, :name)
if is_nil(cte_name) do
raise ArgumentError,
"Each entry in :joins for with_ctes/3 must include :name, be a cte name, or be a {name, opts} tuple. Got: #{inspect(join_entry)}"
end
join_opts = Keyword.delete(normalized_entry, :name)
{find_cte_spec!(cte_specs, cte_name), join_opts}
end
defp resolve_with_ctes_join_entry(_cte_specs, invalid_entry) do
raise ArgumentError,
"Invalid :joins entry for with_ctes/3: #{inspect(invalid_entry)}"
end
defp find_cte_spec!(cte_specs, cte_name) do
cte_name_string = to_string(cte_name)
Enum.find(cte_specs, fn cte_spec ->
cte_spec_name!(cte_spec) == cte_name_string
end) ||
raise ArgumentError,
"CTE named '#{cte_name_string}' was not found in with_ctes/3 input"
end
defp normalize_cte_option_list(opts) when is_map(opts),
do: opts |> Map.to_list() |> normalize_cte_option_list()
defp normalize_cte_option_list(opts) when is_list(opts) do
Enum.map(opts, fn
{key, value} -> {normalize_cte_option_key(key), value}
other -> other
end)
end
defp normalize_cte_option_key(key) when is_atom(key), do: key
defp normalize_cte_option_key(key) when is_binary(key) do
case key do
"base_query" -> :base_query
"recursive_query" -> :recursive_query
"columns" -> :columns
"dependencies" -> :dependencies
"join" -> :join
"joins" -> :joins
"max_depth" -> :max_depth
"cycle_detection" -> :cycle_detection
other -> other
end
end
defp normalize_cte_option_key(key), do: key
defp normalize_cte_join_opts(join_opts) when is_map(join_opts),
do: join_opts |> Map.to_list() |> normalize_cte_join_opts()
defp normalize_cte_join_opts(join_opts) when is_list(join_opts) do
Enum.map(join_opts, fn
{key, value} -> {normalize_cte_join_key(key), value}
other -> other
end)
end
defp normalize_cte_join_key(key) when is_atom(key), do: key
defp normalize_cte_join_key(key) when is_binary(key) do
case key do
"name" -> :name
"id" -> :id
"source" -> :source
"type" -> :type
"owner_key" -> :owner_key
"related_key" -> :related_key
"on" -> :on
"fields" -> :fields
other -> other
end
end
defp normalize_cte_join_key(key), do: key
defp maybe_put_keyword(keyword, _key, value) when value in [nil, :__missing__], do: keyword
defp maybe_put_keyword(keyword, key, value), do: Keyword.put(keyword, key, value)
defp normalize_named_query_member_opts(opts) when is_map(opts),
do: opts |> Map.to_list() |> normalize_named_query_member_opts()
defp normalize_named_query_member_opts(opts) when is_list(opts) do
Enum.map(opts, fn
{key, value} ->
{normalize_named_query_member_key(key), value}
other ->
raise ArgumentError,
"Named query member options must be a keyword list or map. Invalid option: #{inspect(other)}"
end)
end
defp normalize_named_query_member_opts(invalid_opts) do
raise ArgumentError,
"Named query member options must be a keyword list or map. Got: #{inspect(invalid_opts)}"
end
defp resolve_alias_name(overrides, default \\ nil) do
Keyword.get(
overrides,
:as,
Keyword.get(overrides, :alias, Keyword.get(overrides, :alias_name))
) ||
default
end
defp resolve_override_or_spec_value(overrides, spec, override_keys, spec_keys) do
Enum.find_value(override_keys, fn key -> Keyword.get(overrides, key) end) ||
Enum.find_value(spec_keys, fn key -> Map.get(spec, key) end)
end
defp merge_named_member_options(overrides, kind, member_name, drop_keys) do
override_options = Keyword.drop(overrides, drop_keys ++ [:options])
nested_options =
ensure_keyword_opts(
Keyword.get(overrides, :options, :__missing__),
kind,
member_name
)
Keyword.merge(override_options, nested_options)
end
defp normalize_named_query_member_spec(spec) when is_map(spec) do
spec
|> Enum.map(fn {key, value} -> {normalize_named_query_member_key(key), value} end)
|> Map.new()
end
defp normalize_named_query_member_spec(spec) do
raise ArgumentError,
"Named query member specifications must be maps. Got: #{inspect(spec)}"
end
defp normalize_named_query_member_key(key) when is_atom(key), do: key
defp normalize_named_query_member_key(key) when is_binary(key),
do: Map.get(@named_query_member_key_map, key, key)
defp normalize_named_query_member_key(key), do: key
defp fetch_named_query_member!(selecto, kind, member_id)
when kind in @named_query_member_kinds do
members = query_members_for_kind!(selecto, kind)
member_name = to_string(member_id)
Enum.find_value(members, fn {key, spec} ->
if to_string(key) == member_name do
{member_name, spec}
end
end) ||
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' was not found in domain query_members. Available: #{available_named_members(members)}"
end
defp query_members_for_kind!(selecto, kind) when kind in @named_query_member_kinds do
query_members =
selecto
|> Map.get(:domain, %{})
|> Map.get(:query_members, %{})
if not is_map(query_members) do
raise ArgumentError,
"domain.query_members must be a map to resolve named #{kind_to_label(kind)} members."
end
members = Map.get(query_members, kind, Map.get(query_members, Atom.to_string(kind), %{}))
if is_map(members) do
members
else
raise ArgumentError,
"domain.query_members.#{kind} must be a map of named members. Got: #{inspect(members)}"
end
end
defp available_named_members(members) do
case members |> Map.keys() |> Enum.map(&to_string/1) |> Enum.sort() do
[] -> "none"
ids -> Enum.join(ids, ", ")
end
end
defp kind_to_label(:ctes), do: "CTE"
defp kind_to_label(:values), do: "VALUES"
defp kind_to_label(:subqueries), do: "subquery"
defp kind_to_label(:laterals), do: "lateral"
defp kind_to_label(:unnests), do: "unnest"
defp map_get(map, key, alt_key) when is_map(map) do
Map.get(map, key, Map.get(map, alt_key))
end
defp map_get(map, key, alt_key1, alt_key2) when is_map(map) do
Map.get(map, key, Map.get(map, alt_key1, Map.get(map, alt_key2)))
end
defp normalize_lateral_source!(source, selecto, member_name) do
case source do
source when is_tuple(source) ->
source
%Selecto{} = query ->
fn _base_query -> query end
query_builder when is_function(query_builder, 0) ->
fn _base_query ->
result = query_builder.()
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named lateral '#{member_name}' query function must return a Selecto struct. Got: #{inspect(result)}"
end
end
query_builder when is_function(query_builder, 1) ->
fn base_query ->
result = query_builder.(base_query)
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named lateral '#{member_name}' query function must return a Selecto struct. Got: #{inspect(result)}"
end
end
query_builder when is_function(query_builder, 2) ->
fn base_query ->
result = query_builder.(selecto, base_query)
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named lateral '#{member_name}' query function must return a Selecto struct. Got: #{inspect(result)}"
end
end
nil ->
raise ArgumentError,
"Named lateral '#{member_name}' requires :query, :source, or :lateral_source."
invalid ->
raise ArgumentError,
"Named lateral '#{member_name}' source must be a tuple, Selecto struct, or function with arity 0/1/2. Got: #{inspect(invalid)}"
end
end
defp normalize_lateral_join_type!(join_type, _member_name)
when join_type in [:left, :inner, :right, :full],
do: join_type
defp normalize_lateral_join_type!(join_type, member_name) do
raise ArgumentError,
"Named lateral '#{member_name}' join type must be one of :left, :inner, :right, :full. Got: #{inspect(join_type)}"
end
defp ensure_keyword_opts(nil, _kind, _member_name), do: []
defp ensure_keyword_opts(:__missing__, _kind, _member_name), do: []
defp ensure_keyword_opts(opts, _kind, _member_name) when is_list(opts), do: opts
defp ensure_keyword_opts(opts, _kind, _member_name) when is_map(opts), do: Map.to_list(opts)
defp ensure_keyword_opts(opts, kind, member_name) do
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' expects :options to be a keyword list or map. Got: #{inspect(opts)}"
end
defp evaluate_named_query_member_query!(query_source, selecto, kind, member_name) do
query =
case query_source do
%Selecto{} = query ->
query
query_builder when is_function(query_builder, 0) ->
query_builder.()
query_builder when is_function(query_builder, 1) ->
query_builder.(selecto)
nil ->
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' requires a query source (:query or :query_builder)."
invalid ->
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' query source must be a Selecto struct or function with arity 0/1. Got: #{inspect(invalid)}"
end
if match?(%Selecto{}, query) do
query
else
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' query builder must return a Selecto struct. Got: #{inspect(query)}"
end
end
defp wrap_named_query_builder!(query_source, selecto, kind, member_name) do
case query_source do
%Selecto{} = query ->
fn -> query end
query_builder when is_function(query_builder, 0) ->
fn ->
result = query_builder.()
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' query builder must return a Selecto struct. Got: #{inspect(result)}"
end
end
query_builder when is_function(query_builder, 1) ->
fn ->
result = query_builder.(selecto)
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' query builder must return a Selecto struct. Got: #{inspect(result)}"
end
end
nil ->
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' requires :query or :query_builder."
invalid ->
raise ArgumentError,
"Named #{kind_to_label(kind)} '#{member_name}' query source must be a Selecto struct or function with arity 0/1. Got: #{inspect(invalid)}"
end
end
defp wrap_named_base_query!(base_query, selecto, member_name) do
case base_query do
query_builder when is_function(query_builder, 0) ->
fn ->
result = query_builder.()
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named CTE '#{member_name}' base_query must return a Selecto struct. Got: #{inspect(result)}"
end
end
query_builder when is_function(query_builder, 1) ->
fn ->
result = query_builder.(selecto)
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named CTE '#{member_name}' base_query must return a Selecto struct. Got: #{inspect(result)}"
end
end
invalid ->
raise ArgumentError,
"Named CTE '#{member_name}' requires :base_query function with arity 0 or 1. Got: #{inspect(invalid)}"
end
end
defp wrap_named_recursive_query!(recursive_query, selecto, member_name) do
case recursive_query do
query_builder when is_function(query_builder, 1) ->
fn cte_ref ->
result = query_builder.(cte_ref)
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named CTE '#{member_name}' recursive_query must return a Selecto struct. Got: #{inspect(result)}"
end
end
query_builder when is_function(query_builder, 2) ->
fn cte_ref ->
result = query_builder.(selecto, cte_ref)
if match?(%Selecto{}, result) do
result
else
raise ArgumentError,
"Named CTE '#{member_name}' recursive_query must return a Selecto struct. Got: #{inspect(result)}"
end
end
invalid ->
raise ArgumentError,
"Named CTE '#{member_name}' requires :recursive_query function with arity 1 or 2. Got: #{inspect(invalid)}"
end
end
defp merge_named_join_opts(default_join_opts, :__missing__, normalizer_fun) do
normalize_named_join_opts(default_join_opts, normalizer_fun)
end
defp merge_named_join_opts(_default_join_opts, override_join_opts, _normalizer_fun)
when override_join_opts in [nil, false, true],
do: override_join_opts
defp merge_named_join_opts(default_join_opts, override_join_opts, normalizer_fun)
when is_list(override_join_opts) or is_map(override_join_opts) do
override_join_opts = normalize_named_join_opts(override_join_opts, normalizer_fun)
default_join_opts = normalize_named_join_opts(default_join_opts, normalizer_fun)
if is_list(default_join_opts) do
Keyword.merge(default_join_opts, override_join_opts)
else
override_join_opts
end
end
defp merge_named_join_opts(_default_join_opts, invalid_join_opts, _normalizer_fun) do
raise ArgumentError,
"Named query member :join must be true, false, nil, a keyword list, or a map. Got: #{inspect(invalid_join_opts)}"
end
defp normalize_named_join_opts(join_opts, _normalizer_fun)
when join_opts in [nil, false, true, :__missing__],
do: join_opts
defp normalize_named_join_opts(join_opts, normalizer_fun)
when is_list(join_opts) or is_map(join_opts),
do: normalizer_fun.(join_opts)
defp normalize_named_join_opts(invalid_join_opts, _normalizer_fun) do
raise ArgumentError,
"Named query member :join must be true, false, nil, a keyword list, or a map. Got: #{inspect(invalid_join_opts)}"
end
defp values_member_alias(spec) when is_map(spec) do
Map.get(spec, :as) || Map.get(spec, :alias) || Map.get(spec, :alias_name)
end
defp normalize_subquery_on(nil), do: nil
defp normalize_subquery_on(:__missing__), do: nil
defp normalize_subquery_on(on_conditions) when is_list(on_conditions) do
Enum.map(on_conditions, fn
condition when is_map(condition) ->
normalize_subquery_on_condition(condition)
condition when is_list(condition) ->
condition |> Map.new() |> normalize_subquery_on_condition()
invalid ->
raise ArgumentError, "Invalid subquery :on condition: #{inspect(invalid)}"
end)
end
defp normalize_subquery_on(invalid_on) do
raise ArgumentError,
"Subquery :on option must be a list of conditions. Got: #{inspect(invalid_on)}"
end
defp normalize_subquery_on_condition(condition) do
left = Map.get(condition, :left, Map.get(condition, "left"))
right = Map.get(condition, :right, Map.get(condition, "right"))
operator = Map.get(condition, :operator, Map.get(condition, "operator"))
if is_nil(left) or is_nil(right) do
raise ArgumentError,
"Subquery :on conditions must include :left and :right. Got: #{inspect(condition)}"
end
result = %{left: left, right: right}
if is_nil(operator) do
result
else
Map.put(result, :operator, operator)
end
end
@doc """
Add a simple CASE expression to the select fields.
Simple CASE expressions test a column against specific values and return
corresponding results. This is useful for data transformation and categorization.
## Parameters
- `selecto` - The Selecto instance
- `column` - Column to test against
- `when_clauses` - List of {value, result} tuples for WHEN conditions
- `opts` - Options including :else and :as
## Examples
# Simple CASE for film ratings
selecto
|> Selecto.case_select("film.rating", [
{"G", "General Audience"},
{"PG", "Parental Guidance"},
{"PG-13", "Parents Strongly Cautioned"},
{"R", "Restricted"}
], else: "Not Rated", as: "rating_description")
|> Selecto.select(["film.title", "rating_description"])
# Generated SQL:
# SELECT film.title,
# CASE film.rating
# WHEN 'G' THEN 'General Audience'
# WHEN 'PG' THEN 'Parental Guidance'
# WHEN 'PG-13' THEN 'Parents Strongly Cautioned'
# WHEN 'R' THEN 'Restricted'
# ELSE 'Not Rated'
# END AS rating_description
"""
def case_select(selecto, column, when_clauses, opts \\ []) do
# Create CASE specification
case_spec = Selecto.Advanced.CaseExpression.create_simple_case(column, when_clauses, opts)
# Add to select fields
case_field = {:case, case_spec}
select(selecto, case_field)
end
@doc """
Add a searched CASE expression to the select fields.
Searched CASE expressions evaluate multiple conditions and return results
based on the first true condition. This enables complex conditional logic.
## Parameters
- `selecto` - The Selecto instance
- `when_clauses` - List of {conditions, result} tuples
- `opts` - Options including :else and :as
## Examples
# Customer tier based on payment totals
selecto
|> Selecto.case_when_select([
{[{"payment_total", {:>, 100}}], "Premium"},
{[{"payment_total", {:between, 50, 100}}], "Standard"},
{[{"payment_total", {:>, 0}}], "Basic"}
], else: "No Purchases", as: "customer_tier")
|> Selecto.select(["customer.first_name", "customer_tier"])
# Multiple conditions per WHEN clause
selecto
|> Selecto.case_when_select([
{[{"film.rating", "R"}, {"film.length", {:>, 120}}], "Long Adult Film"},
{[{"film.rating", "G"}, {"film.special_features", {:like, "%Family%"}}], "Family Film"}
], else: "Regular Film", as: "film_category")
# Generated SQL:
# SELECT customer.first_name,
# CASE
# WHEN payment_total > $1 THEN $2
# WHEN payment_total BETWEEN $3 AND $4 THEN $5
# WHEN payment_total > $6 THEN $7
# ELSE $8
# END AS customer_tier
"""
def case_when_select(selecto, when_clauses, opts \\ []) do
# Create CASE specification
case_spec = Selecto.Advanced.CaseExpression.create_searched_case(when_clauses, opts)
# Add to select fields
case_field = {:case_when, case_spec}
select(selecto, case_field)
end
@doc """
Add array aggregation operations to select fields.
Supports ARRAY_AGG, STRING_AGG, and other array aggregation functions
with optional DISTINCT, ORDER BY, and filtering.
## Parameters
- `selecto` - The Selecto instance
- `array_operations` - List of array operation tuples or single operation
- `opts` - Additional options
## Examples
# Simple array aggregation
selecto
|> Selecto.array_select({:array_agg, "film.title", as: "film_titles"})
# Array aggregation with DISTINCT and ORDER BY
selecto
|> Selecto.array_select({:array_agg, "actor.name",
distinct: true,
order_by: [{"actor.last_name", :asc}],
as: "unique_actors"})
# String aggregation with custom delimiter
selecto
|> Selecto.array_select({:string_agg, "tag.name",
delimiter: ", ",
as: "tag_list"})
# Array length operation
selecto
|> Selecto.array_select({:array_length, "tags", 1, as: "tag_count"})
"""
def array_select(selecto, array_operations, opts \\ [])
def array_select(selecto, array_operations, _opts) when is_list(array_operations) do
# Create array operation specifications
array_specs =
array_operations
|> Enum.map(fn
# Aggregation operations
{:array_agg, column, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(:array_agg, column, opts)
{:array_agg_distinct, column, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_agg_distinct,
column,
opts
)
{:string_agg, column, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(:string_agg, column, opts)
# Size operations with dimension
{:array_length, column, dimension, opts} ->
Selecto.Advanced.ArrayOperations.create_array_size(
:array_length,
column,
dimension,
opts
)
# Size operations without dimension
{:cardinality, column, opts} ->
Selecto.Advanced.ArrayOperations.create_array_size(:cardinality, column, nil, opts)
{:array_ndims, column, opts} ->
Selecto.Advanced.ArrayOperations.create_array_size(:array_ndims, column, nil, opts)
{:array_dims, column, opts} ->
Selecto.Advanced.ArrayOperations.create_array_size(:array_dims, column, nil, opts)
# Array construction/manipulation with value
{:array_append, column, value, opts} ->
spec_opts = Keyword.put(opts, :value, value)
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_append,
column,
spec_opts
)
{:array_prepend, column, value, opts} ->
spec_opts = Keyword.put(opts, :value, value)
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_prepend,
column,
spec_opts
)
{:array_remove, column, value, opts} ->
spec_opts = Keyword.put(opts, :value, value)
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_remove,
column,
spec_opts
)
{:array_replace, column, old_value, new_value, opts} ->
spec_opts = opts |> Keyword.put(:value, old_value) |> Keyword.put(:new_value, new_value)
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_replace,
column,
spec_opts
)
{:array_cat, column, value, opts} ->
spec_opts = Keyword.put(opts, :value, value)
Selecto.Advanced.ArrayOperations.create_array_operation(:array_cat, column, spec_opts)
{:array_position, column, value, opts} ->
spec_opts = Keyword.put(opts, :value, value)
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_position,
column,
spec_opts
)
{:array_positions, column, value, opts} ->
spec_opts = Keyword.put(opts, :value, value)
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_positions,
column,
spec_opts
)
# Array transformation operations
{:array_to_string, column, delimiter, opts} ->
spec_opts = Keyword.put(opts, :value, delimiter)
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_to_string,
column,
spec_opts
)
{:string_to_array, column, delimiter, opts} ->
spec_opts = Keyword.put(opts, :value, delimiter)
Selecto.Advanced.ArrayOperations.create_array_operation(
:string_to_array,
column,
spec_opts
)
# Array constructor (no column)
{:array, elements, opts} ->
spec_opts = Keyword.put(opts, :value, elements)
Selecto.Advanced.ArrayOperations.create_array_operation(:array, nil, spec_opts)
# Generic pattern for operations with column and options
{operation, column, opts} when is_atom(operation) and is_list(opts) ->
Selecto.Advanced.ArrayOperations.create_array_operation(operation, column, opts)
_ = spec ->
spec
end)
# Add to selecto set
current_array_ops = Map.get(selecto.set, :array_operations, [])
updated_array_ops = current_array_ops ++ array_specs
put_in(selecto.set[:array_operations], updated_array_ops)
end
def array_select(selecto, array_operation, opts) do
array_select(selecto, [array_operation], opts)
end
@doc """
Add array filtering operations to WHERE clauses.
Supports array containment, overlap, and equality operations.
## Parameters
- `selecto` - The Selecto instance
- `array_filters` - List of array filter tuples or single filter
- `opts` - Additional options
## Examples
# Array contains
selecto
|> Selecto.array_filter({:array_contains, "tags", ["featured", "new"]})
# Array overlap (has any of the elements)
selecto
|> Selecto.array_filter({:array_overlap, "categories", ["electronics", "computers"]})
# Array contained by
selecto
|> Selecto.array_filter({:array_contained, "permissions", ["read", "write", "admin"]})
# Multiple filters
selecto
|> Selecto.array_filter([
{:array_contains, "special_features", ["Trailers"]},
{:array_overlap, "languages", ["English", "Spanish"]}
])
"""
def array_filter(selecto, array_filters, opts \\ [])
def array_filter(selecto, array_filters, _opts) when is_list(array_filters) do
# Create array filter specifications
array_specs =
array_filters
|> Enum.map(fn
{operation, column, value} ->
Selecto.Advanced.ArrayOperations.create_array_filter(operation, column, value)
_ = spec ->
spec
end)
# Add to selecto set filters
Selecto.QueryValidator.validate_array_specs!(selecto, array_specs)
current_filters = Map.get(selecto.set, :array_filters, [])
updated_filters = current_filters ++ array_specs
put_in(selecto.set[:array_filters], updated_filters)
end
def array_filter(selecto, array_filter, opts) do
array_filter(selecto, [array_filter], opts)
end
@doc """
Add array manipulation operations to select fields.
Supports array construction, modification, and transformation operations.
## Parameters
- `selecto` - The Selecto instance
- `array_operations` - List of array manipulation operations
- `opts` - Additional options
## Examples
# Array append
selecto
|> Selecto.array_manipulate({:array_append, "tags", "new-tag", as: "updated_tags"})
# Array remove
selecto
|> Selecto.array_manipulate({:array_remove, "tags", "deprecated", as: "cleaned_tags"})
# Array to string
selecto
|> Selecto.array_manipulate({:array_to_string, "tags", ", ", as: "tag_string"})
"""
def array_manipulate(selecto, array_operations, opts \\ [])
def array_manipulate(selecto, array_operations, _opts) when is_list(array_operations) do
# Create array operation specifications
array_specs =
array_operations
|> Enum.map(fn
{:array_append, column, value, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_append,
column,
Keyword.put(opts, :value, value)
)
{:array_prepend, column, value, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_prepend,
column,
Keyword.put(opts, :value, value)
)
{:array_remove, column, value, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_remove,
column,
Keyword.put(opts, :value, value)
)
{:array_replace, column, old_value, new_value, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_replace,
column,
opts |> Keyword.put(:value, old_value) |> Keyword.put(:new_value, new_value)
)
{:array_to_string, column, delimiter, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(
:array_to_string,
column,
Keyword.put(opts, :value, delimiter)
)
{:string_to_array, column, delimiter, opts} ->
Selecto.Advanced.ArrayOperations.create_array_operation(
:string_to_array,
column,
Keyword.put(opts, :value, delimiter)
)
{operation, column, opts} when is_atom(operation) ->
Selecto.Advanced.ArrayOperations.create_array_operation(operation, column, opts)
_ = spec ->
spec
end)
# Add to selecto set
Selecto.QueryValidator.validate_array_specs!(selecto, array_specs)
current_array_ops = Map.get(selecto.set, :array_operations, [])
updated_array_ops = current_array_ops ++ array_specs
put_in(selecto.set[:array_operations], updated_array_ops)
end
def array_manipulate(selecto, array_operation, opts) do
array_manipulate(selecto, [array_operation], opts)
end
end