defmodule EctoUnnest do
@moduledoc """
Bulk insert for Ecto via `unnest(...)`.

A plain `Ecto.Repo.insert_all/3` builds `VALUES ($1,$2),($3,$4),...`, so the SQL
text grows with the number of rows, and every batch size becomes a different
prepared statement. PgBouncer in transaction mode copes poorly with that, and
Postgres caps a single statement at 65535 bind parameters, which limits a
`VALUES` batch to `div(65535, column_count)` rows.

This library generates **constant SQL text, independent of the row count**:

    INSERT INTO "events" ("type","user_id")
    (SELECT f0."type", f0."user_id"
    FROM (SELECT * FROM unnest($1::text[], $2::bigint[]) AS u("type","user_id")) AS f0)

Whether you insert 1 or 10,000 rows, the statement is identical for a given set of
columns. The query is assembled from Ecto building blocks (`fragment`/`dynamic`)
and handed to `Ecto.Repo.insert_all/3`, which renders `ON CONFLICT`/`RETURNING`/`prefix`
and loads structs natively.
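
To make the difference concrete, here is a minimal sketch in plain Elixir. The
`SqlShapeSketch` module is hypothetical and not part of this library, and its SQL
is simplified (unquoted identifiers, no `AS u(...)` aliasing); it only shows that
the `VALUES` text depends on the row count while the `unnest` text does not.

```elixir
defmodule SqlShapeSketch do
  # Hypothetical illustration, not this library's renderer. Identifiers are
  # left unquoted and the SQL is simplified to keep the comparison readable.

  # Postgres accepts at most 65535 bind parameters per statement.
  @max_params 65_535

  # VALUES form: one "($a,$b)" tuple per row, so the text depends on row_count.
  def values_sql(table, cols, row_count) when row_count >= 1 and cols != [] do
    width = length(cols)

    tuples =
      for row <- 0..(row_count - 1) do
        params = for c <- 1..width, do: "$" <> Integer.to_string(row * width + c)
        "(" <> Enum.join(params, ",") <> ")"
      end

    "INSERT INTO " <> table <> " (" <> Enum.join(cols, ",") <> ") VALUES " <>
      Enum.join(tuples, ",")
  end

  # unnest form: one array parameter per column; row_count is deliberately unused.
  def unnest_sql(table, typed_cols, _row_count) do
    names = Enum.map(typed_cols, fn {name, _type} -> name end)

    arrays =
      typed_cols
      |> Enum.with_index(1)
      |> Enum.map(fn {{_name, type}, i} -> "$" <> Integer.to_string(i) <> "::" <> type <> "[]" end)

    "INSERT INTO " <> table <> " (" <> Enum.join(names, ",") <> ") SELECT * FROM unnest(" <>
      Enum.join(arrays, ", ") <> ")"
  end

  # Largest batch the VALUES form can send for a given number of columns.
  def max_values_rows(width), do: div(@max_params, width)
end
```

Going from 1 row to 10,000 rows changes the string returned by `values_sql/3` (and
its parameter count, which `max_values_rows/1` bounds) but not the one returned by
`unnest_sql/3`.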
## API

Two disjoint maps:

* a columns map `%{col => list}`: each column goes into `unnest` as an array,
* `:placeholders` `%{col => value}`: constants broadcast onto every row.

```elixir
EctoUnnest.insert_all(Repo, Event,
  %{user_id: [1, 2, 3], type: ["click", "view", "click"]},
  placeholders: %{inserted_at: ~U[2026-06-17 10:00:00Z]},
  returning: true
)
```
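
Conceptually, the two maps describe rows as follows. This is a hypothetical
sketch (`RowShapeSketch` is not part of the library, which never builds these
row maps; it ships one array per column instead) showing which value each row
receives, including the single placeholders-only row for an empty columns map:

```elixir
defmodule RowShapeSketch do
  # Hypothetical helper, not part of EctoUnnest: expands the columns map and the
  # placeholders map into the logical rows they describe. Assumes all column
  # lists have the same length (EctoUnnest itself raises if they do not).

  # No per-row columns: a single row made of placeholders only.
  def rows(columns, placeholders) when map_size(columns) == 0, do: [placeholders]

  def rows(columns, placeholders) do
    # Unzipping the map keeps every name paired with its own list.
    {names, lists} = Enum.unzip(columns)

    lists
    |> Enum.zip()
    |> Enum.map(fn row ->
      names
      |> Enum.zip(Tuple.to_list(row))
      |> Map.new()
      # Every row receives the same placeholder constants.
      |> Map.merge(placeholders)
    end)
  end
end
```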
## Options

`:returning`, `:prefix`, `:on_conflict`, `:conflict_target` and `:cache_statement`
are passed on to `Ecto.Repo.insert_all/3` (`returning: true` is expanded to the
schema's field list, and `:cache_statement` gets its own default); the rest are
specific to this library.

* `:placeholders`: `%{col => value}` of constant columns (default `%{}`)
* `:returning`: `true | false | [field]` (default `false`)
* `:prefix`: schema prefix (overrides `@schema_prefix`)
* `:on_conflict`: `:raise | :nothing | :replace_all | {:replace, fields} |
  {:replace_all_except, fields} | [set: kw, inc: kw] | Ecto.Query.t()`. A full
  query enables a conditional `DO UPDATE ... WHERE ...` (see "Conflicts" below)
* `:conflict_target`: `[col] | {:unsafe_fragment, binary}`
* `:types`: `%{col => pg_type}` override for type inference, where `pg_type` is
  an atom (recommended, since atoms are app-controlled) or a string. For a
  placeholder it becomes a raw `::pg_type` cast, so it can name a custom PG type
  (e.g. a domain `:kafka_topic_name`). Because the value is rendered into the SQL
  verbatim, the check is fail-closed: Ecto's default PG types (`bigint`, `text`,
  `jsonb`, `timestamptz`, …) are always accepted, but any other spelling (a
  domain, an alias like `int4`, a modifier like `numeric(10,2)`) must be vouched
  for in config (`config :ecto_unnest, :allowed_types, [:int4, "kafka_topic_name"]`),
  or it raises
* `:json`: `[col]` columns to force into JSON mode (see "JSON columns" below)
* `:cache_statement`: prepared-statement cache name. Defaults to
  `"ecto_unnest_all_\#{table}_\#{arity}"`, where `arity` is the number of
  `unnest` (array) columns, so distinct column shapes for one table do not
  share a cache slot (see "Prepared-statement cache" below). Pass a binary to
  override, or `nil` to fall back to Ecto's `"ecto_insert_all_\#{table}"`
* `:require_all_fields`: `true` to assert every schema field is accounted for
  (see "Completeness check" below; schema sources only). Defaults to the
  `config :ecto_unnest, :require_all_fields` value, else `false`
## Completeness check

With `require_all_fields: true`, every field of the schema must appear either in
the columns map or in `:placeholders`; otherwise it raises, listing what is
missing. This catches a column you forgot: without the check, a missing field
silently falls back to its DB default (or `NULL`). The autogenerated primary key
and `timestamps()` fields are exempt. An insert from a query does not generate
them, so they come from a DB default (e.g. a serial primary key) or from
`:placeholders`. A field you mean to leave empty goes in `:placeholders` as `nil`;
note that this writes an explicit `NULL`, so a non-`NULL` DB default is not applied:

    EctoUnnest.insert_all(Repo, Event,
      %{user_id: [1, 2], type: ["a", "b"], score: [1.0, 2.0], tags: [nil, nil]},
      placeholders: %{inserted_at: ts, payload: nil},
      require_all_fields: true)

Only valid for a schema source (a binary source has no field list and raises).
To make it a test-only safety net (off in production), skip the per-call option
and set it in config for that environment:

    # config/test.exs
    config :ecto_unnest, :require_all_fields, true

An explicit `:require_all_fields` in `opts` always overrides the config value.
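
The rule itself is plain set arithmetic. A hypothetical sketch
(`CompletenessSketch` is not part of the library and works on plain field lists
rather than an Ecto schema):

```elixir
defmodule CompletenessSketch do
  # Hypothetical restatement of the rule above:
  #   required = schema fields minus autogenerated fields
  #   missing  = required minus (column keys and placeholder keys)
  def missing(fields, autogenerated, columns, placeholders) do
    required = MapSet.difference(MapSet.new(fields), MapSet.new(autogenerated))
    provided = MapSet.new(Map.keys(columns) ++ Map.keys(placeholders))

    required
    |> MapSet.difference(provided)
    |> MapSet.to_list()
    # Sorted, so the error message is stable.
    |> Enum.sort()
  end
end
```

A `nil` placeholder counts as provided, which is why `payload: nil` above
satisfies the check.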
## JSON columns

A per-row value destined for a `jsonb` column (Ecto type `:map`, `{:map, _}` or
`{:array, :map}`) cannot ride in `unnest` as a `::jsonb[]` array: `unnest`
flattens multi-dimensional arrays, and Postgrex would double-encode the values.
Such columns are shipped instead as a 1-D `text[]` of pre-encoded JSON, with a
`::jsonb` cast in the projection. Schema sources detect this automatically from
the Ecto type; on binary sources use `types: %{col: :jsonb}` or `json: [:col]`.
Raw terms are JSON-encoded with the configured `:postgrex` `:json_library`
(default `JSON`). Binaries pass through untouched because they are treated as
already-encoded JSON, so a plain string meant as a JSON string value must be
encoded first.

    EctoUnnest.insert_all(Repo, Doc,
      %{id: [id1, id2], summary: [[%{"a" => 1}], [%{"b" => 2}]]},
      placeholders: %{created_at: ts})
## Prepared-statement cache

Postgrex caches prepared statements under a name. Ecto's `insert_all` derives it
from the table alone (`"ecto_insert_all_\#{table}"`), which is fine when the SQL
for a table never changes. Here, though, the SQL depends on the **column shape**:
each distinct set of `unnest` (array) columns renders a different statement, so
sharing one cache name per table would make Postgrex re-prepare every time the
shape changes. The default name therefore appends the unnest arity:

    ecto_unnest_all_events_2   # 2 unnest columns
    ecto_unnest_all_events_3   # 3 unnest columns

The row count never enters the name (the SQL is constant across it; that is the
whole point). Arity does not distinguish two same-arity shapes with different
column *names*, nor calls that differ only in their `:placeholders` keys (those
are columns of the statement too). If you insert such shapes into one table, give
each an explicit `:cache_statement`. Pass `cache_statement: nil` to restore
Ecto's default.
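
The default naming rule fits in a few lines. This hypothetical `CacheNameSketch`
module (not part of the library) restates it and reproduces the same-arity
collision described above:

```elixir
defmodule CacheNameSketch do
  # Hypothetical restatement of the default naming rule: the table name plus
  # the number of unnest (array) columns. Neither placeholders nor the row
  # count take part, so same-arity shapes share a name.
  def default_name(table, columns) when is_binary(table) and is_map(columns) do
    "ecto_unnest_all_" <> table <> "_" <> Integer.to_string(map_size(columns))
  end
end
```

`%{a: [1], b: [2]}` and `%{c: [1], d: [2]}` both map to
`ecto_unnest_all_events_2` here, which is exactly the case that needs an explicit
`:cache_statement`.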
## Conflicts

Besides the keyword form, `:on_conflict` accepts a full `Ecto.Query` (as
`Ecto.Repo.insert_all/3` does) for a conditional update. Pass it together with
`:conflict_target`, which PostgreSQL requires for `ON CONFLICT ... DO UPDATE`:

    from(s in Setting, update: [set: [deleted_at: nil]], where: not is_nil(s.deleted_at))

## Reading: virtual table
`table/3` exposes the same `unnest(...)` source as a composable `%Ecto.Query{}`,
so you can `SELECT` from it or join it into an `UPDATE`. See `table/3`.
## Limitations

* Non-JSON array-typed columns (`{:array, _}` other than `{:array, :map}`) that
  vary per row are unsupported, because `unnest` flattens multi-dimensional
  arrays; they raise a clear error. A constant array can go in `:placeholders`,
  and JSON arrays (`{:array, :map}`) are supported via JSON mode (see "JSON
  columns").
* Binary sources (`"table"`) have no `__schema__`, so they require `:types` and
  accept `:returning` only as a field list.
"""
alias EctoUnnest.Query
@type source :: module() | binary()
@type columns :: %{atom() => list()}
@spec insert_all(Ecto.Repo.t(), source(), columns(), keyword()) ::
{non_neg_integer(), [struct() | map()] | nil}
def insert_all(repo, schema, columns, opts \\ []) when is_map(columns) and is_list(opts) do
placeholders = Map.new(opts[:placeholders] || %{})
plan = build_plan!(schema, columns, placeholders, opts)
repo.insert_all(schema, Query.build(plan),
on_conflict: plan.on_conflict,
conflict_target: conflict_target_opt(plan.conflict_target),
returning: returning_opt(plan.returning),
prefix: plan.prefix,
cache_statement: cache_statement(plan, opts)
)
end
# The prepared-statement cache name. Ecto defaults `insert_all` to
# `"ecto_insert_all_#{source}"` — keyed on the table only. But each distinct
# unnest shape (a different set/count of array columns) renders different SQL and
# so is a different prepared statement; collapsing them under one name makes
# Postgrex re-prepare on every shape change. We append the unnest arity (the
# number of `unnest($n::t[])` params) so shapes get distinct cache slots. Pass
# `:cache_statement` to override (a binary used verbatim, or `nil` to fall back to
# Ecto's default). Note that arity alone does not separate same-arity shapes with
# different column names, or calls that differ only in their :placeholders keys;
# give those an explicit `:cache_statement`.
defp cache_statement(plan, opts) do
case Keyword.fetch(opts, :cache_statement) do
{:ok, name} -> name
:error -> "ecto_unnest_all_#{plan.table}_#{map_size(plan.values)}"
end
end
@doc """
Returns `{sql, params}` without executing the query.

Useful for debugging and for tests with no database connection: building the plan
(validation and type inference) needs no database, and the text itself is
rendered by the Postgres adapter's `Connection` module (the same building blocks
`Repo.insert_all/3` uses).
"""
@spec to_sql(source(), columns(), keyword()) :: {String.t(), [term()]}
def to_sql(schema, columns, opts \\ []) when is_map(columns) and is_list(opts) do
placeholders = Map.new(opts[:placeholders] || %{})
plan = build_plan!(schema, columns, placeholders, opts)
Query.to_sql(plan)
end
@doc """
A virtual table built from `unnest(...)` as a composable `%Ecto.Query{}`.

Each entry of the columns map `%{col => list}` becomes an `unnest` column. The
result can be used like any Ecto source (`where`, `order_by`, `select`,
`Repo.all/2`) and, through `subquery/1`, also in a `join` for
`update_all`/`delete_all`.

Types come from the schema (as in `insert_all/4`) or from `:types` for sources
without a schema. The binding is named `:s` by default (override it with `:as`).

    q = EctoUnnest.table(Event, %{user_id: [1, 2, 3], type: ["a", "b", "c"]})

    from([s: s] in q, where: s.user_id > 1, select: {s.user_id, s.type})
    |> Repo.all()

For a join, wrap it in `subquery/1` (which carries the parameters) and give it a
`select`:

    src = from([s: s] in q, select: %{user_id: s.user_id, type: s.type})

    from(e in Event,
      join: s in subquery(src),
      on: e.user_id == s.user_id,
      update: [set: [type: s.type]]
    )
    |> Repo.update_all([])
"""
@spec table(source(), columns(), keyword()) :: Ecto.Query.t()
def table(schema, columns, opts \\ []) when is_map(columns) and is_list(opts) do
validate_lists!(columns)
_ = row_count!(columns)
arrays = classify_columns!(schema, columns, %{}, opts)
Query.virtual(arrays, opts[:as] || :s)
end
# We always hand `Repo.insert_all/3` either `false` or an explicit field list for
# `:returning`; an empty field list means `false`.
defp returning_opt([]), do: false
defp returning_opt(fields), do: fields
# The conflict target is forwarded as-is: `[]` is Ecto's own default, and the
# Postgres adapter raises by itself if an update-style `:on_conflict` has no target.
defp conflict_target_opt(target), do: target
# ── Plan: all validation and classification in one place ──────────────
defp build_plan!(schema, columns, placeholders, opts) do
validate_disjoint!(columns, placeholders)
validate_lists!(columns)
if require_all_fields?(opts), do: validate_complete!(schema, columns, placeholders)
n = row_count!(columns)
cols = classify_columns!(schema, columns, placeholders, opts)
%{
schema: schema,
table: table_name(schema),
prefix: opts[:prefix] || schema_prefix(schema),
columns: cols,
n: n,
values: columns,
placeholders: placeholders,
returning: normalize_returning(opts[:returning] || false, schema),
on_conflict: opts[:on_conflict] || :raise,
conflict_target: opts[:conflict_target] || []
}
end
defp validate_disjoint!(columns, placeholders) do
both = MapSet.intersection(MapSet.new(Map.keys(columns)), MapSet.new(Map.keys(placeholders)))
if MapSet.size(both) > 0 do
raise ArgumentError,
"columns #{inspect(MapSet.to_list(both))} are in both the columns map and :placeholders"
end
end
# Off unless you opt in: an explicit `:require_all_fields` per call wins;
# otherwise fall back to application config, so you can switch it on for one
# environment (e.g. `config :ecto_unnest, :require_all_fields, true` in test) and
# leave it off in production.
defp require_all_fields?(opts) do
Keyword.get(opts, :require_all_fields, Application.get_env(:ecto_unnest, :require_all_fields, false))
end
# `require_all_fields: true`: every schema field must be covered by the columns
# map or :placeholders, so a forgotten column raises instead of silently taking
# the DB default. Fields returned by `autogenerated_fields/1` are exempt. A binary
# source has no field list, so the check cannot run there.
defp validate_complete!(source, _columns, _placeholders) when is_binary(source) do
raise ArgumentError,
"require_all_fields: true needs a schema; source #{inspect(source)} has no field list"
end
defp validate_complete!(schema, columns, placeholders) do
required = MapSet.difference(MapSet.new(schema.__schema__(:fields)), autogenerated_fields(schema))
provided = MapSet.union(MapSet.new(Map.keys(columns)), MapSet.new(Map.keys(placeholders)))
missing = MapSet.difference(required, provided)
if MapSet.size(missing) > 0 do
raise ArgumentError,
"require_all_fields: true — schema fields #{inspect(Enum.sort(MapSet.to_list(missing)))} " <>
"are not provided. Add each to the columns map or to :placeholders (use a nil " <>
"placeholder for a field you intentionally leave NULL)."
end
:ok
end
# Fields exempt from the completeness check: the autogenerated primary key and any
# `timestamps()`/`autogenerate: {m,f,a}` fields. An insert from a query does not
# generate them, so they come from a DB default or from :placeholders.
defp autogenerated_fields(schema) do
id =
case schema.__schema__(:autogenerate_id) do
nil -> []
tuple -> [elem(tuple, 0)]
end
auto = Enum.flat_map(schema.__schema__(:autogenerate), fn {fields, _mfa} -> fields end)
MapSet.new(id ++ auto)
end
defp validate_lists!(columns) do
for {name, value} <- columns, not is_list(value) do
raise ArgumentError,
"column #{inspect(name)} has a scalar value — move it to :placeholders " <>
"or wrap it in a list"
end
:ok
end
# N = length of the column lists; all lists must be equal.
# Empty columns map -> N = 1 (insert a single row of placeholders only).
defp row_count!(columns) do
case columns |> Map.values() |> Enum.map(&length/1) |> Enum.uniq() do
[] -> 1
[n] -> n
lens -> raise ArgumentError, "column lists have different lengths: #{inspect(lens)}"
end
end
# Deterministic column order (sorted by name) -> stable SQL.
defp classify_columns!(schema, columns, placeholders, opts) do
overrides = Map.new(opts[:types] || %{})
json_opt = MapSet.new(opts[:json] || [])
array_cols = for k <- Map.keys(columns), do: {k, :array}
scalar_cols = for k <- Map.keys(placeholders), do: {k, :scalar}
(array_cols ++ scalar_cols)
|> Enum.sort_by(fn {name, _} -> name end)
|> Enum.map(fn {name, kind} ->
override = normalize_type(name, overrides[name])
ecto_type = EctoUnnest.Types.ecto_type!(schema, name, override)
classify_column!(schema, name, kind, ecto_type, override, columns, MapSet.member?(json_opt, name))
end)
end
# An `:array` (per-row) column whose value is itself JSON (jsonb/`{:array,:map}`).
# `unnest` flattens multi-dimensional arrays, so the only shape Postgres accepts
# is a 1-D `text[]` of pre-encoded JSON with a `::jsonb` cast in the projection.
defp classify_column!(schema, name, :array, ecto_type, override, columns, json?) do
base = %{name: name, kind: :array, ecto_type: ecto_type}
if json?(ecto_type, override, json?) do
Map.merge(base, %{
json: true,
pg_type: "text",
values: Enum.map(columns[name], &encode_json/1),
dump_type: :string
})
else
pg_type = resolve_pg_type!(name, ecto_type, override, :array)
# Schema -> dump arrays through the real Ecto type (UUID, Enum, datetime).
# Binary source -> the Ecto type is a `:string` stand-in, so pass values
# through untouched (`:any`) and let the `::pg_type[]` cast do the encoding.
dump_type = if is_binary(schema), do: :any, else: ecto_type
Map.merge(base, %{pg_type: pg_type, values: columns[name], dump_type: dump_type})
end
end
# Placeholders never go through `unnest`, so they need no `::pg_type[]` param
# cast — the value is a scalar parameter cast in the projection. `:types`, when
# given, becomes a raw `::type` cast on that parameter (supports custom PG types
# such as a domain `kafka_topic_name`, and non-string types on binary sources);
# otherwise the Ecto type drives the cast (so `Ecto.Enum`/date/uuid just work).
defp classify_column!(_schema, name, :scalar, ecto_type, override, _columns, _json?) do
cast = if override, do: {:raw, override}, else: {:ecto, ecto_type}
%{name: name, kind: :scalar, ecto_type: ecto_type, cast: cast}
end
defp json?(_ecto_type, _override, true), do: true
defp json?(_ecto_type, override, _json?) when override in ["jsonb", "json"], do: true
defp json?(ecto_type, nil, _json?), do: EctoUnnest.Types.jsonb?(ecto_type)
defp json?(_ecto_type, _override, _json?), do: false
# `:types` may be an atom (recommended — app-controlled, so it is safe to render
# straight into the SQL cast) or a string. Normalize to a string for rendering.
defp normalize_type(_name, nil), do: nil
defp normalize_type(name, t) when is_atom(t), do: validate_type!(name, Atom.to_string(t))
defp normalize_type(name, t) when is_binary(t), do: validate_type!(name, t)
# The override is rendered straight into the SQL cast (`$n::type` / `::type[]`),
# so, as a defence against injection, this check is fail-closed: a spelling is
# allowed only if it is one of Ecto's default PG types or the app has vouched for
# it via `config :ecto_unnest, :allowed_types`. Anything else raises, including
# otherwise valid spellings such as `int4`, `numeric(10,2)` or `public.mytype`
# until they are added to that list.
defp validate_type!(name, type) do
if type in allowed_types() do
type
else
raise ArgumentError,
":types value #{inspect(type)} for column #{inspect(name)} is not allowed — it is not a " <>
"default Ecto type; add it to `config :ecto_unnest, :allowed_types, [...]`"
end
end
# Default PG types (always allowed) plus the configured custom ones. Config
# entries may be atoms or strings; normalized to strings to match the cast.
defp allowed_types do
configured =
case Application.get_env(:ecto_unnest, :allowed_types) do
nil -> []
list -> Enum.map(list, &to_string/1)
end
EctoUnnest.Types.default_pg_types() ++ configured
end
defp encode_json(nil), do: nil
defp encode_json(v) when is_binary(v), do: v
defp encode_json(v), do: json_library().encode!(v)
defp json_library, do: Application.get_env(:postgrex, :json_library, JSON)
defp resolve_pg_type!(name, ecto_type, override, kind) do
case EctoUnnest.Types.resolve_pg_type(ecto_type, override, kind) do
{:ok, pg_type} ->
pg_type
{:error, :array_unsupported} ->
raise ArgumentError,
"column #{inspect(name)} is array-typed and varies per row — " <>
"unnest flattens multi-dimensional arrays. If the value is constant, " <>
"move it to :placeholders (then it goes as a scalar parameter)."
end
end
# ── helpers ────────────────────────────────────────────────────────────
defp table_name(schema) when is_atom(schema), do: schema.__schema__(:source)
defp table_name(source) when is_binary(source), do: source
defp schema_prefix(schema) when is_atom(schema), do: schema.__schema__(:prefix)
defp schema_prefix(_), do: nil
defp normalize_returning(false, _schema), do: []
defp normalize_returning(true, schema) when is_atom(schema), do: schema.__schema__(:fields)
defp normalize_returning(true, source) when is_binary(source),
do: raise(ArgumentError, "for source #{inspect(source)} pass :returning as a field list")
defp normalize_returning(list, _schema) when is_list(list), do: list
end