# BulkUpsert
> #### Warning
>
> This is a very early release. It works but has some rough edges, and shouldn't be considered
> production-ready for most use cases.
Upsert multiple Ecto schema structs, along with their nested associations, to the database with a single function call.
Unlike a plain `insert_all/3`, this package passes each list of attrs through Ecto changesets. This lets it validate your data and upsert a parent **and its children across multiple tables** in one call.
Supported features:
- Nested associations: upsert a parent and its `has_many`, `has_one`, and `many_to_many` associations across multiple tables from a single list of attrs (embedded schemas are stored inline on the parent)
- Validation and data processing (via Ecto changesets)
- Custom values for autogenerated fields (e.g. insert/update timestamps)
For more information, see [this project's documentation](https://hexdocs.pm/bulk_upsert/BulkUpsert.html).
---
## Getting started
### Installation
Add this package to your list of dependencies in `mix.exs`, then run `mix deps.get`:
```elixir
def deps do
[
{:bulk_upsert, "0.2.0"}
]
end
```
### Usage
After the package has been installed, you may call `BulkUpsert.bulk_upsert/4` function directly, or create a wrapper function to use in your context modules:
`lib/your_project/repo.ex`
```elixir
defmodule YourProject.Repo do
use Ecto.Repo,
otp_app: :your_project,
adapter: Ecto.Adapters.Postgres
@doc "Wraps `BulkUpsert.bulk_upsert/4`."
def bulk_upsert(schema_module, attrs_list, opts \\ []),
do: BulkUpsert.bulk_upsert(__MODULE__, schema_module, attrs_list, opts)
end
```
#### Basic working example
Here is a contrived migration and schema that we can work with:
`priv/repo/migrations/0001_create_persons.exs`
```elixir
defmodule YourProject.Repo.Migrations.CreatePersons do
use Ecto.Migration
def change do
create table(:persons) do
add :name, :string
end
end
end
```
`lib/your_project/persons/person.ex`
```elixir
defmodule YourProject.Persons.Person do
use Ecto.Schema
import Ecto.Changeset
schema "persons" do
field :name, :string
end
def changeset(person \\ %__MODULE__{}, attrs) do
person
|> cast(attrs, [:id, :name])
|> validate_required([:id, :name])
end
end
```
Now, after running the migrations with `mix ecto.reset`, we can enter an IEx shell with `iex -S mix` and make sure everything works:
```text
Interactive Elixir (1.18.3) - press Ctrl+C to exit (type h() ENTER for help)
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Person)
[
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 1,
name: "Alice"
},
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 2,
name: "Bob"
}
]
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [%{id: 1, name: "Alicia"}, %{id: 2, name: "Bobby"}]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Person)
[
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 1,
name: "Alicia"
},
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 2,
name: "Bobby"
}
]
```
#### Working with nested associations
The main reason to reach for this package over a plain `insert_all/3` is that it can upsert a parent and its children at the same time, from a single list of attrs. The parent and each association are upserted into their own tables, all within one transaction.
Here we extend the `Person` example with a `has_many :pets` association:
`priv/repo/migrations/0002_create_pets.exs`
```elixir
defmodule YourProject.Repo.Migrations.CreatePets do
use Ecto.Migration
def change do
create table(:pets) do
add :person_id, references(:persons)
add :name, :string
end
end
end
```
`lib/your_project/persons/pet.ex`
```elixir
defmodule YourProject.Persons.Pet do
use Ecto.Schema
import Ecto.Changeset
schema "pets" do
field :person_id, :integer
field :name, :string
end
def changeset(pet \\ %__MODULE__{}, attrs) do
pet
|> cast(attrs, [:id, :person_id, :name])
|> validate_required([:id, :person_id, :name])
end
end
```
`lib/your_project/persons/person.ex`
```elixir
defmodule YourProject.Persons.Person do
use Ecto.Schema
import Ecto.Changeset
schema "persons" do
field :name, :string
has_many :pets, YourProject.Persons.Pet
end
def changeset(person \\ %__MODULE__{}, attrs) do
person
|> cast(attrs, [:id, :name])
|> validate_required([:id, :name])
|> cast_assoc(:pets)
end
end
```
> #### Note
>
> Each child's foreign key (here, `person_id`) must be present in its own attrs. Associations are upserted via `insert_all/3`, so the foreign key is not inferred from the parent.
Now a single call upserts both the persons and their pets across both tables:
```text
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [
...> %{id: 1, name: "Alice", pets: [
...> %{id: 10, person_id: 1, name: "Rex"},
...> %{id: 11, person_id: 1, name: "Whiskers"}
...> ]},
...> %{id: 2, name: "Bob", pets: [
...> %{id: 20, person_id: 2, name: "Buddy"}
...> ]}
...> ]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Pet)
[
%YourProject.Persons.Pet{id: 10, person_id: 1, name: "Rex"},
%YourProject.Persons.Pet{id: 11, person_id: 1, name: "Whiskers"},
%YourProject.Persons.Pet{id: 20, person_id: 2, name: "Buddy"}
]
```
Running the same call again with changed pet names upserts the existing rows in place, exactly like the top-level structs.
`has_one` and `many_to_many` associations work the same way: cast them in the changeset and include them in the attrs. For `has_many` and `has_one`, each child must carry its own foreign key (as shown above with `person_id`). For `many_to_many`, the associated records and the join table rows are both upserted for you, and duplicate records and links are removed automatically. Embedded schemas (`embeds_one`, `embeds_many`) have no table of their own, so they are stored inline on the parent row.
#### Working with autogenerated timestamps
Ecto's built-in `insert_all/3` function does not support autogenerated fields such as timestamps. Therefore, if your project has Ecto schemas that use autogenerated timestamp fields, you will need to ensure that these values are present during the bulk upsert process.
The simplest way is the `:placeholders` option, which sets fields from shared values (sent to the database once) after changeset validation:
```elixir
YourProject.Repo.bulk_upsert(
YourProject.Persons.Person,
[%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}],
placeholders: %{
YourProject.Persons.Person => %{inserted_at: DateTime.utc_now(), updated_at: DateTime.utc_now()}
}
)
```
Placeholder fields bypass the changeset, so they are not cast or validated. Do not mark a placeholder field as required in your changeset (its value is absent during validation, which would mark the row invalid and skip it).
If you need more control, you can instead provide custom logic via a custom `insert_all` function. Here is an example that shows how this can be accomplished:
`lib/your_project/repo.ex`
```elixir
defmodule YourProject.Repo do
use Ecto.Repo,
otp_app: :your_project,
adapter: Ecto.Adapters.Postgres
require Logger
@doc "Wraps `BulkUpsert.bulk_upsert/4`."
def bulk_upsert(schema_module, attrs_list, opts \\ []) do
base_opts = [
# Use a custom function that accepts the same arguments as `insert_all/3`
insert_all_function_atom: :insert_all_with_autogenerated_timestamps,
# Do not overwrite the initial insert timestamp
replace_all_except: [:inserted_at]
]
opts = Keyword.merge(base_opts, opts)
BulkUpsert.bulk_upsert(__MODULE__, schema_module, attrs_list, opts)
end
@doc """
Extend `YourProject.Repo.insert_all/3` to automatically generate current insert and update
timestamps when performing bulk insert operations.
> #### Info {: .info}
>
> Unlike `YourProject.Repo.insert_all/3`, this function will only accept a schema as the first
> argument (not a source), and a list of entries as the second argument (not a query).
## Examples
iex> YourProject.Repo.insert_all_with_autogenerated_timestamps(
...> YourProject.Persons.Person,
...> _attrs_list = [%{...}, %{...}]
...> )
{2, nil}
"""
def insert_all_with_autogenerated_timestamps(schema_module, entries, opts \\ []) do
placeholders = Keyword.get(opts, :placeholders, %{})
# Build timestamp placeholders and attrs
inserted_at_field = :inserted_at
updated_at_field = :updated_at
current_timestamp = DateTime.utc_now()
timestamp_placeholders =
Map.new([{inserted_at_field, current_timestamp}, {updated_at_field, current_timestamp}])
# Reject timestamp fields that are not defined in the given `schema_module`
|> Map.reject(fn {field, _value} -> schema_module.__schema__(:type, field) |> is_nil() end)
if Enum.empty?(timestamp_placeholders) do
Logger.debug("""
The #{inspect(schema_module)} schema does not use any configured insert or update \
timestamp fields. Falling back to `#{inspect(__MODULE__)}.insert_all/3`...\
""")
__MODULE__.insert_all(schema_module, entries, opts)
else
Logger.debug("Performing bulk insert with autogenerated timestamps...")
timestamp_placeholder_attrs =
timestamp_placeholders
|> Map.new(fn {field, _value} -> {field, {:placeholder, field}} end)
# Merge the timestamp attrs and placeholders into an `insert_all/3` function call
entries = entries |> Enum.map(fn attrs -> Map.merge(attrs, timestamp_placeholder_attrs) end)
placeholders = placeholders |> Map.merge(timestamp_placeholders)
opts = opts |> Keyword.put(:placeholders, placeholders)
__MODULE__.insert_all(schema_module, entries, opts)
end
end
end
```
`priv/repo/migrations/0001_create_persons.exs`
```elixir
defmodule YourProject.Repo.Migrations.CreatePersons do
use Ecto.Migration
def change do
create table(:persons) do
add :name, :string
timestamps(type: :utc_datetime_usec)
end
end
end
```
`lib/your_project/persons/person.ex`
```elixir
defmodule YourProject.Persons.Person do
use Ecto.Schema
import Ecto.Changeset
schema "persons" do
field :name, :string
timestamps(type: :utc_datetime_usec)
end
def changeset(person \\ %__MODULE__{}, attrs) do
person
|> cast(attrs, [:id, :name])
|> validate_required([:id, :name])
end
end
```
Now, after running the migrations with `mix ecto.reset`, we can enter an IEx shell with `iex -S mix` and make sure everything still works:
```text
Interactive Elixir (1.18.3) - press Ctrl+C to exit (type h() ENTER for help)
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Person)
[
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 1,
name: "Alice",
inserted_at: ~U[2025-04-29 07:01:10.180490Z],
updated_at: ~U[2025-04-29 07:01:10.180490Z]
},
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 2,
name: "Bob",
inserted_at: ~U[2025-04-29 07:01:10.180490Z],
updated_at: ~U[2025-04-29 07:01:10.180490Z]
}
]
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [%{id: 1, name: "Alicia"}, %{id: 2, name: "Bobby"}]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Person)
[
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 1,
name: "Alicia",
inserted_at: ~U[2025-04-29 07:01:10.180490Z],
updated_at: ~U[2025-04-29 07:01:19.549929Z]
},
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 2,
name: "Bobby",
inserted_at: ~U[2025-04-29 07:01:10.180490Z],
updated_at: ~U[2025-04-29 07:01:19.549929Z]
}
]
```
As you can see, our new custom logic ensures that the correct timestamps are generated.
---
For more information, see [this project's documentation on HexDocs](https://hexdocs.pm/bulk_upsert/BulkUpsert.html).
---
This project made possible by Interline Travel and Tour Inc.
https://www.perx.com/
https://www.touchdown.co.uk/
https://www.touchdownfrance.com/