defmodule PgRest.QueryPipeline do
@moduledoc """
Composes query execution through a pipeline:
base_query -> scope -> URL filters -> handle_param -> select -> order -> paginate -> execute -> after_load
"""
import Ecto.Query
alias Ecto.Adapters.SQL
alias PgRest.{Filter, Order, Parser, Select, TypeCaster}
@doc """
Executes a read (GET) pipeline: parse params, apply scope/filters/select/order/pagination, fetch records.
Returns `{:ok, records}` or `{:ok, records, range_info}` when count mode is requested.
"""
@spec execute_read(module(), map(), map(), keyword()) ::
        {:ok, [map()]} | {:ok, [map()], map()} | {:error, term()}
def execute_read(resource_module, params, context, pipeline_opts \\ []) when is_map(params) do
  repo = Map.fetch!(context, :repo)
  meta = %{resource: resource_module, operation: :read, repo: repo}

  # Wrap the whole pipeline in a telemetry span so [:pgrest, :query] events
  # carry the resource/operation/repo metadata.
  :telemetry.span([:pgrest, :query], meta, fn ->
    config = resource_module.__pgrest_config__()
    # `max_limit` caps client-requested page sizes (forwarded to the parser);
    # `prefer` carries parsed Prefer-header options such as the count mode.
    max_limit = Keyword.get(pipeline_opts, :max_limit)
    prefer = Keyword.get(pipeline_opts, :prefer, %{})

    result =
      with {:ok, parsed} <-
             Parser.parse(params, allowed_fields: config.fields, max_limit: max_limit),
           {:ok, cast_filters} <- TypeCaster.cast_filters(parsed.filters, resource_module) do
        embed_filters = Map.get(parsed, :embed_filters, %{})
        embed_options = Map.get(parsed, :embed_options, %{})

        # Compose the query: resource scope -> custom (handle_param) params ->
        # URL filters -> select/embeds -> ordering. Pagination is applied
        # separately so `base_query` can be reused for total-count queries.
        base_query =
          resource_module
          |> resource_module.scope(context)
          |> apply_custom_params(resource_module, parsed.custom_params, context)
          |> Filter.apply_all(cast_filters)
          |> Select.apply_select(parsed.select, resource_module, embed_filters, embed_options)
          |> Order.apply_order(parsed.order)

        paginated_query = apply_pagination(base_query, parsed.limit, parsed.offset)

        records =
          repo.all(paginated_query)
          |> Enum.map(&resource_module.after_load(&1, context))

        # When a count mode was requested, build_response attaches range info
        # (offset/count/total) computed against the unpaginated base_query.
        count_mode = Map.get(prefer, :count)
        build_response(records, count_mode, parsed.offset, base_query, repo, max_limit)
      end

    # A failing `with` clause (parse or cast error) falls through here as-is.
    {result, meta}
  end)
end
@doc """
Executes a create (POST) pipeline: build changeset, insert, apply after_load.
"""
@spec execute_create(module(), map(), map()) :: {:ok, map()} | {:error, Ecto.Changeset.t()}
def execute_create(resource_module, attrs, context) when is_map(attrs) do
  repo = Map.fetch!(context, :repo)
  span_meta = %{resource: resource_module, operation: :create, repo: repo}

  :telemetry.span([:pgrest, :query], span_meta, fn ->
    # Build the changeset from an empty struct via the resource's own hook.
    changeset =
      resource_module
      |> struct()
      |> resource_module.changeset(attrs, context)

    outcome =
      case repo.insert(changeset) do
        {:ok, record} -> {:ok, resource_module.after_load(record, context)}
        {:error, invalid_changeset} -> {:error, invalid_changeset}
      end

    {outcome, span_meta}
  end)
end
@doc """
Executes an update (PATCH) pipeline: find by PK within scope, apply changeset, update.
"""
@spec execute_update(module(), term(), map(), map()) ::
        {:ok, map()} | {:error, :not_found | Ecto.Changeset.t()}
def execute_update(resource_module, id, attrs, context) when is_map(attrs) do
  repo = Map.fetch!(context, :repo)
  span_meta = %{resource: resource_module, operation: :update, repo: repo}

  # Thin telemetry wrapper; the actual lookup/update lives in do_update/5.
  :telemetry.span([:pgrest, :query], span_meta, fn ->
    {do_update(resource_module, id, attrs, context, repo), span_meta}
  end)
end
# Finds the record by primary key inside the resource's scope, then applies
# the resource changeset and persists the update. Returns {:error, :not_found}
# when the row is absent or out of scope.
defp do_update(resource_module, id, attrs, context, repo) do
  %{primary_key: [pk]} = resource_module.__pgrest_config__()

  scoped =
    resource_module
    |> resource_module.scope(context)
    |> where([r], field(r, ^pk) == ^id)

  case repo.one(scoped) do
    nil ->
      {:error, :not_found}

    record ->
      changeset = resource_module.changeset(record, attrs, context)

      with {:error, invalid} <- repo.update(changeset) do
        {:error, invalid}
      else
        {:ok, updated} -> {:ok, resource_module.after_load(updated, context)}
      end
  end
end
@doc """
Executes a delete (DELETE) pipeline: find by PK within scope, delete.
"""
@spec execute_delete(module(), term(), map()) ::
        {:ok, map()} | {:error, :not_found | Ecto.Changeset.t()}
def execute_delete(resource_module, id, context) do
  repo = Map.fetch!(context, :repo)
  span_meta = %{resource: resource_module, operation: :delete, repo: repo}

  # Thin telemetry wrapper; the actual lookup/delete lives in do_delete/4.
  :telemetry.span([:pgrest, :query], span_meta, fn ->
    {do_delete(resource_module, id, context, repo), span_meta}
  end)
end
# Finds the record by primary key inside the resource's scope and deletes it.
# Returns {:error, :not_found} when the row is absent or out of scope.
defp do_delete(resource_module, id, context, repo) do
  %{primary_key: [pk]} = resource_module.__pgrest_config__()

  found =
    resource_module
    |> resource_module.scope(context)
    |> where([r], field(r, ^pk) == ^id)
    |> repo.one()

  case found do
    nil ->
      {:error, :not_found}

    record ->
      case repo.delete(record) do
        {:ok, deleted} -> {:ok, deleted}
        {:error, changeset} -> {:error, changeset}
      end
  end
end
@doc """
Bulk insert multiple records using Ecto.Multi for individual changeset validation.
Returns {:ok, records} or {:error, index, changeset}.
"""
@spec execute_bulk_create(module(), [map()], map(), keyword()) ::
        {:ok, [map()]} | {:error, non_neg_integer(), Ecto.Changeset.t()}
def execute_bulk_create(resource_module, attrs_list, context, _opts \\ [])
    when is_list(attrs_list) do
  repo = Map.fetch!(context, :repo)
  span_meta = %{resource: resource_module, operation: :bulk_create, repo: repo}

  # Thin telemetry wrapper; the Multi assembly lives in do_bulk_create/4.
  :telemetry.span([:pgrest, :query], span_meta, fn ->
    {do_bulk_create(resource_module, attrs_list, context, repo), span_meta}
  end)
end
# Builds an Ecto.Multi with one insert step per entry, keyed by {:insert, index},
# so a failed transaction can report which entry's changeset was invalid.
defp do_bulk_create(resource_module, attrs_list, context, repo) do
  multi =
    attrs_list
    |> Enum.with_index()
    |> Enum.reduce(Ecto.Multi.new(), fn {entry, index}, acc ->
      changeset = resource_module.changeset(struct(resource_module), entry, context)
      Ecto.Multi.insert(acc, {:insert, index}, changeset)
    end)

  case repo.transaction(multi) do
    {:ok, changes} ->
      # The transaction result is keyed by step name; restore input order
      # before running each record through after_load.
      ordered =
        changes
        |> Enum.sort_by(fn {{:insert, index}, _record} -> index end)
        |> Enum.map(fn {_step, record} -> resource_module.after_load(record, context) end)

      {:ok, ordered}

    {:error, {:insert, index}, changeset, _applied_changes} ->
      {:error, index, changeset}
  end
end
@doc """
Bulk update records matching query filters.
Returns {:ok, count} or {:ok, count, records} when returning.
"""
@spec execute_bulk_update(module(), map(), map(), map(), keyword()) ::
        {:ok, non_neg_integer()} | {:ok, non_neg_integer(), [map()]} | {:error, term()}
def execute_bulk_update(resource_module, params, attrs, context, opts \\ []) do
  repo = Map.fetch!(context, :repo)
  span_meta = %{resource: resource_module, operation: :bulk_update, repo: repo}

  :telemetry.span([:pgrest, :query], span_meta, fn ->
    config = resource_module.__pgrest_config__()
    want_records? = Keyword.get(opts, :return, false)

    outcome =
      with {:ok, parsed} <- Parser.parse(params, allowed_fields: config.fields),
           {:ok, cast_filters} <- TypeCaster.cast_filters(parsed.filters, resource_module) do
        # Only scoped + filtered rows are updated; parse/cast errors fall
        # through the `with` unchanged.
        scoped =
          resource_module
          |> resource_module.scope(context)
          |> Filter.apply_all(cast_filters)

        changes = normalize_updates(attrs, config.fields)
        run_update_all(repo, scoped, changes, want_records?, resource_module, context)
      end

    {outcome, span_meta}
  end)
end
# Records requested: selecting the full row makes the adapter return the
# updated records, which are then passed through after_load.
defp run_update_all(repo, query, updates, true, resource_module, context) do
  returning_query = select(query, [r], r)

  case repo.update_all(returning_query, set: updates) do
    {count, rows} when is_list(rows) ->
      {:ok, count, Enum.map(rows, &resource_module.after_load(&1, context))}

    {count, nil} ->
      {:ok, count, []}
  end
end

# No records requested: report only how many rows were updated.
defp run_update_all(repo, query, updates, false, _resource_module, _context) do
  {count, _returned} = repo.update_all(query, set: updates)
  {:ok, count}
end
@doc """
Bulk delete records matching query filters.
Returns {:ok, count} or {:ok, count, records} when returning.
"""
@spec execute_bulk_delete(module(), map(), map(), keyword()) ::
        {:ok, non_neg_integer()} | {:ok, non_neg_integer(), [map()]} | {:error, term()}
def execute_bulk_delete(resource_module, params, context, opts \\ []) do
  repo = Map.fetch!(context, :repo)
  span_meta = %{resource: resource_module, operation: :bulk_delete, repo: repo}

  :telemetry.span([:pgrest, :query], span_meta, fn ->
    config = resource_module.__pgrest_config__()
    want_records? = Keyword.get(opts, :return, false)

    outcome =
      with {:ok, parsed} <- Parser.parse(params, allowed_fields: config.fields),
           {:ok, cast_filters} <- TypeCaster.cast_filters(parsed.filters, resource_module) do
        # Only scoped + filtered rows are deleted; parse/cast errors fall
        # through the `with` unchanged.
        scoped =
          resource_module
          |> resource_module.scope(context)
          |> Filter.apply_all(cast_filters)

        run_delete_all(repo, scoped, want_records?, resource_module, context)
      end

    {outcome, span_meta}
  end)
end
# Records requested: selecting the full row makes the adapter return the
# deleted records, which are then passed through after_load.
defp run_delete_all(repo, query, true, resource_module, context) do
  returning_query = select(query, [r], r)

  case repo.delete_all(returning_query) do
    {count, rows} when is_list(rows) ->
      {:ok, count, Enum.map(rows, &resource_module.after_load(&1, context))}

    {count, nil} ->
      {:ok, count, []}
  end
end

# No records requested: report only how many rows were deleted.
defp run_delete_all(repo, query, false, _resource_module, _context) do
  {count, _returned} = repo.delete_all(query)
  {:ok, count}
end
@doc """
Upsert records using Ecto's on_conflict support.
Returns {:ok, count} or {:ok, count, records} when returning.
"""
@spec execute_upsert(module(), map() | [map()], map(), keyword()) ::
        {:ok, non_neg_integer()} | {:ok, non_neg_integer(), [map()]} | {:error, term()}
def execute_upsert(resource_module, attrs_list, context, opts \\ []) do
  repo = Map.fetch!(context, :repo)
  meta = %{resource: resource_module, operation: :upsert, repo: repo}

  :telemetry.span([:pgrest, :query], meta, fn ->
    config = resource_module.__pgrest_config__()
    return_records? = Keyword.get(opts, :return, false)
    # :merge_duplicates (replace non-PK columns) or :ignore_duplicates (DO NOTHING).
    resolution = Keyword.get(opts, :resolution, :merge_duplicates)
    # Optional comma-separated column list overriding the PK as conflict target.
    on_conflict_param = Keyword.get(opts, :on_conflict)
    missing_default? = Keyword.get(opts, :missing_default, false)

    # Accept a single map as a one-element batch.
    attrs_list = if is_map(attrs_list), do: [attrs_list], else: attrs_list

    on_conflict = build_on_conflict(resolution, config)
    conflict_target = build_conflict_target(on_conflict_param, config)

    # Normalize JSON string keys to schema atoms, then (unless
    # missing=default) pad absent fields with explicit nils.
    entries =
      attrs_list
      |> Enum.map(&normalize_keys(&1, config.fields))
      |> maybe_apply_missing_default(missing_default?, config.fields)

    insert_opts =
      [on_conflict: on_conflict, conflict_target: conflict_target]
      |> maybe_add_returning(return_records?)

    result =
      try do
        {count, records} = repo.insert_all(resource_module, entries, insert_opts)

        if return_records? do
          # Guard against a nil second element when nothing was returned.
          records = (records || []) |> Enum.map(&resource_module.after_load(&1, context))
          {:ok, count, records}
        else
          {:ok, count}
        end
      rescue
        # insert_all bypasses changesets, so constraint violations surface as
        # raised exceptions; convert those two kinds into {:error, message}.
        e in [Ecto.ConstraintError, Postgrex.Error] ->
          {:error, Exception.message(e)}
      end

    {result, meta}
  end)
end
@doc """
Converts string-keyed JSON attrs to atom-keyed maps, filtering to known schema fields.
"""
@spec normalize_keys(map(), [atom()]) :: map()
def normalize_keys(attrs, fields) when is_map(attrs) do
  allowed = Enum.map(fields, &Atom.to_string/1)

  # Only whitelisted keys reach to_existing_atom, so arbitrary client input
  # cannot grow the atom table.
  for {key, value} <- attrs, key in allowed, into: %{} do
    {String.to_existing_atom(key), value}
  end
end
# No count mode requested: respond with the bare record list.
defp build_response(records, nil, _offset, _base_query, _repo, _max_limit) do
  {:ok, records}
end

# Count mode requested: attach range info (offset / page count / total) so
# the caller can build a Content-Range style response.
defp build_response(records, count_mode, offset, base_query, repo, max_limit) do
  page_count = length(records)

  range_info = %{
    offset: offset || 0,
    count: page_count,
    total: get_total_count(repo, base_query, count_mode, page_count, max_limit)
  }

  {:ok, records, range_info}
end
# Threads each custom query param through the resource's handle_param/4 hook,
# accumulating onto the query.
defp apply_custom_params(query, resource_module, custom_params, context) do
  for {name, value} <- custom_params, reduce: query do
    acc -> resource_module.handle_param(name, value, acc, context)
  end
end
# Applies limit/offset when present; a nil value omits that clause entirely
# so the database returns the unrestricted side of the range.
defp apply_pagination(query, page_limit, page_offset) do
  query
  |> maybe_limit(page_limit)
  |> maybe_offset(page_offset)
end

defp maybe_limit(query, nil), do: query
defp maybe_limit(query, n) when is_integer(n), do: limit(query, ^n)

defp maybe_offset(query, nil), do: query
defp maybe_offset(query, n) when is_integer(n), do: offset(query, ^n)
# PostgREST count modes:
# - exact: Always runs COUNT(*)
# - planned: Uses EXPLAIN to get planner row estimate
# - estimated: Hybrid — exact if page_count < max_limit, otherwise max(page_count, planned)
defp get_total_count(repo, query, :exact, _page_count, _max_limit) do
  fallback_exact_count(repo, count_base_query(query))
end

defp get_total_count(repo, query, :planned, _page_count, _max_limit) do
  get_planned_count(repo, query)
end

# PostgREST estimated logic: if results fill the page, use planned estimate,
# never reporting fewer rows than were actually fetched.
defp get_total_count(repo, query, :estimated, page_count, max_limit)
     when is_integer(max_limit) and page_count >= max_limit do
  planned = get_planned_count(repo, query)
  max(page_count, planned)
end

# Estimated with a partial page: the result set fits within max_limit, so an
# exact count is affordable.
defp get_total_count(repo, query, :estimated, _page_count, _max_limit) do
  fallback_exact_count(repo, count_base_query(query))
end

# Strips order_by/select so the filtered query can be wrapped in COUNT(*)
# or EXPLAIN. Extracted to remove the triplicated exclude pipeline.
defp count_base_query(query) do
  query
  |> exclude(:order_by)
  |> exclude(:select)
end

defp get_planned_count(repo, query) do
  # PostgREST uses EXPLAIN (FORMAT JSON) and extracts "Plan Rows"
  count_query = count_base_query(query)

  try do
    {sql, params} = SQL.to_sql(:all, repo, count_query)
    explain_sql = "EXPLAIN (FORMAT JSON) #{sql}"

    case repo.query(explain_sql, params) do
      {:ok, %{rows: [[json]]}} when is_list(json) ->
        extract_plan_rows(json, repo, count_query)

      _ ->
        fallback_exact_count(repo, count_query)
    end
  rescue
    # Best-effort by design: the planner estimate is only an optimization, so
    # any failure (SQL generation, EXPLAIN errors) degrades to an exact count
    # instead of failing the request.
    _ -> fallback_exact_count(repo, count_query)
  end
end
# EXPLAIN (FORMAT JSON) yields a list of plans; "Plan Rows" on the first
# plan is the planner's row estimate. Any other shape falls back to an
# exact count.
defp extract_plan_rows(json, repo, count_query) do
  with [first_plan | _rest] <- json,
       rows when is_number(rows) <- get_in(first_plan, ["Plan", "Plan Rows"]) do
    trunc(rows)
  else
    _ -> fallback_exact_count(repo, count_query)
  end
end
# Exact COUNT(*) used whenever the EXPLAIN-based estimate is unavailable or
# unusable; callers pass a query with order_by/select already excluded.
defp fallback_exact_count(repo, count_query) do
  repo.aggregate(count_query, :count)
end
# Filters string-keyed attrs down to known schema fields and converts them to
# a keyword list suitable for update_all's `set:` option. Only whitelisted
# keys reach to_existing_atom, so client input cannot grow the atom table.
defp normalize_updates(attrs, fields) when is_map(attrs) do
  allowed = Enum.map(fields, &Atom.to_string/1)

  for {key, value} <- attrs, key in allowed do
    {String.to_existing_atom(key), value}
  end
end
# :merge_duplicates overwrites every non-primary-key column with the
# incoming values on conflict.
defp build_on_conflict(:merge_duplicates, config) do
  {:replace, config.fields -- config.primary_key}
end

# :ignore_duplicates leaves conflicting rows untouched (DO NOTHING).
defp build_on_conflict(:ignore_duplicates, _config) do
  :nothing
end
# Without an explicit on_conflict param, conflicts are detected on the
# primary key.
defp build_conflict_target(nil, config), do: config.primary_key

# Parses a comma-separated column list into atoms; to_existing_atom guards
# against atom-table growth from arbitrary client input.
defp build_conflict_target(columns, _config) when is_binary(columns) do
  columns
  |> String.split(",")
  |> Enum.map(fn column -> column |> String.trim() |> String.to_existing_atom() end)
end
# With missing=default, entries pass through untouched so the database can
# fill unspecified columns with their defaults.
defp maybe_apply_missing_default(entries, true, _fields), do: entries

# Without missing=default, pad each entry so every schema field is present,
# with absent fields set to an explicit nil.
defp maybe_apply_missing_default(entries, false, fields) do
  for entry <- entries do
    Enum.reduce(fields, entry, &Map.put_new(&2, &1, nil))
  end
end
# Adds `returning: true` to the insert_all opts only when the caller asked
# for the upserted records back.
defp maybe_add_returning(opts, true), do: Keyword.put(opts, :returning, true)
defp maybe_add_returning(opts, false), do: opts
end