defmodule Ecto.Adapters.SQL do
@moduledoc ~S"""
This application provides functionality for working with
SQL databases in `Ecto`.
## Built-in adapters
By default, we support the following adapters:
* `Ecto.Adapters.Postgres` for Postgres
* `Ecto.Adapters.MyXQL` for MySQL
* `Ecto.Adapters.Tds` for SQLServer
## Additional functions
If your `Ecto.Repo` is backed by any of the SQL adapters above,
this module will inject additional functions into your repository:
* `disconnect_all(interval, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.disconnect_all/3`
* `explain(type, query, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.explain/4`
* `query(sql, params, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.query/4`
* `query!(sql, params, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.query!/4`
* `query_many(sql, params, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.query_many/4`
* `query_many!(sql, params, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.query_many!/4`
* `to_sql(type, query)` -
shortcut for `Ecto.Adapters.SQL.to_sql/3`
Generally speaking, you must invoke those functions directly from
your repository, for example: `MyApp.Repo.query("SELECT true")`.
You can also invoke them directly from `Ecto.Adapters.SQL`, but
keep in mind that in such cases features such as "dynamic repositories"
won't be available.
## Migrations
`ecto_sql` supports database migrations. You can generate a migration
with:
$ mix ecto.gen.migration create_posts
This will create a new file inside `priv/repo/migrations` with the
`change` function. Check `Ecto.Migration` for more information.
To interface with migrations, developers typically use mix tasks:
* `mix ecto.migrations` - lists all available migrations and their status
* `mix ecto.migrate` - runs a migration
* `mix ecto.rollback` - rolls back a previously run migration
If you want to run migrations programmatically, see `Ecto.Migrator`.
## SQL sandbox
`ecto_sql` provides a sandbox for testing. The sandbox wraps each
test in a transaction, making sure the tests are isolated and can
run concurrently. See `Ecto.Adapters.SQL.Sandbox` for more information.
## Structure load and dumping
If you have an existing database, you may want to dump its existing
structure and make it reproducible from within Ecto. This can be
achieved with two Mix tasks:
* `mix ecto.load` - loads an existing structure into the database
* `mix ecto.dump` - dumps the existing database structure to the filesystem
For creating and dropping databases, see `mix ecto.create`
and `mix ecto.drop` that are included as part of Ecto.
## Custom adapters
Developers can implement their own SQL adapters by using
`Ecto.Adapters.SQL` and by implementing the callbacks required
by `Ecto.Adapters.SQL.Connection` for handling connections and
performing queries. The connection handling and pooling for SQL
adapters should be built using the `DBConnection` library.
When using `Ecto.Adapters.SQL`, the following options are required:
* `:driver` (required) - the database driver library.
For example: `:postgrex`
"""
require Logger
# Shape of a raw query result as used in the specs of the `query*` functions:
# a map that carries at least `:rows` and `:num_rows`, and may carry any
# additional atom-keyed entries.
@type query_result :: %{
:rows => nil | [[term] | binary],
:num_rows => non_neg_integer,
optional(atom) => any
}
# Query parameters: either a list of terms or a map keyed by atoms or strings.
@type query_params :: [term] | %{(atom | String.t()) => term}
@doc false
# Expands inside a module that calls `use Ecto.Adapters.SQL, driver: ...`.
#
# The quoted code declares the adapter behaviours and defines their callbacks,
# most of which delegate to the functions of `Ecto.Adapters.SQL`, passing along
# the adapter's `Connection` module (`@conn`) and driver (`@driver`) where
# needed. The callbacks listed in `defoverridable` at the bottom can be
# overridden by the adapter.
defmacro __using__(opts) do
quote do
@behaviour Ecto.Adapter
@behaviour Ecto.Adapter.Migration
@behaviour Ecto.Adapter.Queryable
@behaviour Ecto.Adapter.Schema
@behaviour Ecto.Adapter.Transaction
opts = unquote(opts)
# The connection module is looked up by convention under the using module.
@conn __MODULE__.Connection
# `:driver` is mandatory: `Keyword.fetch!/2` raises when it is missing.
@driver Keyword.fetch!(opts, :driver)
@impl true
defmacro __before_compile__(env) do
Ecto.Adapters.SQL.__before_compile__(@driver, env)
end
@impl true
def ensure_all_started(config, type) do
Ecto.Adapters.SQL.ensure_all_started(@driver, config, type)
end
@impl true
def init(config) do
Ecto.Adapters.SQL.init(@conn, @driver, config)
end
@impl true
def checkout(meta, opts, fun) do
Ecto.Adapters.SQL.checkout(meta, opts, fun)
end
@impl true
def checked_out?(meta) do
Ecto.Adapters.SQL.checked_out?(meta)
end
# Default loaders: `{:map, _}` types go through `Ecto.Type.embedded_load/3`
# with the `:json` format, `:binary_id` is passed through `Ecto.UUID` before
# the type itself, everything else uses the type alone.
@impl true
def loaders({:map, _}, type), do: [&Ecto.Type.embedded_load(type, &1, :json)]
def loaders(:binary_id, type), do: [Ecto.UUID, type]
def loaders(_, type), do: [type]
# Default dumpers mirror the loaders; for `:binary_id` the type runs first and
# `Ecto.UUID` second.
@impl true
def dumpers({:map, _}, type), do: [&Ecto.Type.embedded_dump(type, &1, :json)]
def dumpers(:binary_id, type), do: [type, Ecto.UUID]
def dumpers(_, type), do: [type]
## Query
# Each `prepare/2` clause asks `@conn` for the SQL iodata of the operation,
# flattens it to a binary and returns it tagged `:cache` together with a
# unique positive integer id.
@impl true
def prepare(:all, query) do
{:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.all(query))}}
end
def prepare(:update_all, query) do
{:cache,
{System.unique_integer([:positive]), IO.iodata_to_binary(@conn.update_all(query))}}
end
def prepare(:delete_all, query) do
{:cache,
{System.unique_integer([:positive]), IO.iodata_to_binary(@conn.delete_all(query))}}
end
# Queries are executed in `:named` mode by default.
@impl true
def execute(adapter_meta, query_meta, query, params, opts) do
Ecto.Adapters.SQL.execute(:named, adapter_meta, query_meta, query, params, opts)
end
@impl true
def stream(adapter_meta, query_meta, query, params, opts) do
Ecto.Adapters.SQL.stream(adapter_meta, query_meta, query, params, opts)
end
## Schema
# `:id` values are not generated here (returns `nil`); embed and binary ids
# are generated through `Ecto.UUID`.
@impl true
def autogenerate(:id), do: nil
def autogenerate(:embed_id), do: Ecto.UUID.generate()
def autogenerate(:binary_id), do: Ecto.UUID.bingenerate()
@impl true
def insert_all(
adapter_meta,
schema_meta,
header,
rows,
on_conflict,
returning,
placeholders,
opts
) do
Ecto.Adapters.SQL.insert_all(
adapter_meta,
schema_meta,
@conn,
header,
rows,
on_conflict,
returning,
placeholders,
opts
)
end
# Single-row insert: the SQL is built for one row made of the given fields and
# no placeholders; the bound values are the field values followed by the
# conflict parameters. The `on_conflict` kind is forwarded to `struct/10`.
@impl true
def insert(adapter_meta, schema_meta, params, on_conflict, returning, opts) do
%{source: source, prefix: prefix} = schema_meta
{kind, conflict_params, _} = on_conflict
{fields, values} = :lists.unzip(params)
sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning, [])
Ecto.Adapters.SQL.struct(
adapter_meta,
@conn,
sql,
:insert,
source,
[],
values ++ conflict_params,
kind,
returning,
opts
)
end
# Update: bound values are the new field values followed by the filter values.
@impl true
def update(adapter_meta, schema_meta, fields, params, returning, opts) do
%{source: source, prefix: prefix} = schema_meta
{fields, field_values} = :lists.unzip(fields)
filter_values = Keyword.values(params)
sql = @conn.update(prefix, source, fields, params, returning)
Ecto.Adapters.SQL.struct(
adapter_meta,
@conn,
sql,
:update,
source,
params,
field_values ++ filter_values,
:raise,
returning,
opts
)
end
# Delete: only the filter values are bound.
@impl true
def delete(adapter_meta, schema_meta, params, returning, opts) do
%{source: source, prefix: prefix} = schema_meta
filter_values = Keyword.values(params)
sql = @conn.delete(prefix, source, params, returning)
Ecto.Adapters.SQL.struct(
adapter_meta,
@conn,
sql,
:delete,
source,
params,
filter_values,
:raise,
returning,
opts
)
end
## Transaction
@impl true
def transaction(meta, opts, fun) do
Ecto.Adapters.SQL.transaction(meta, opts, fun)
end
@impl true
def in_transaction?(meta) do
Ecto.Adapters.SQL.in_transaction?(meta)
end
@impl true
def rollback(meta, value) do
Ecto.Adapters.SQL.rollback(meta, value)
end
## Migration
@impl true
def execute_ddl(meta, definition, opts) do
Ecto.Adapters.SQL.execute_ddl(meta, @conn, definition, opts)
end
# Callbacks an adapter may redefine after `use Ecto.Adapters.SQL`.
defoverridable prepare: 2,
execute: 5,
insert: 6,
update: 6,
delete: 5,
insert_all: 8,
execute_ddl: 3,
loaders: 2,
dumpers: 2,
autogenerate: 1,
checkout: 3,
ensure_all_started: 2,
__before_compile__: 1
end
end
# Value interpolated into the `:timeout` option documentation of `query/4` and
# `query_many/4` as the default (presumably milliseconds - confirm against
# the DBConnection docs).
@timeout 15_000
@doc """
Converts the given query to SQL according to its kind and the
adapter in the given repository.
## Examples
The examples below are meant for reference. Each adapter will
return a different result:
iex> Ecto.Adapters.SQL.to_sql(:all, Repo, Post)
{"SELECT p.id, p.title, p.inserted_at, p.created_at FROM posts as p", []}
iex> Ecto.Adapters.SQL.to_sql(:update_all, Repo,
from(p in Post, update: [set: [title: ^"hello"]]))
{"UPDATE posts AS p SET title = $1", ["hello"]}
This function is also available under the repository with name `to_sql`:
iex> Repo.to_sql(:all, Post)
{"SELECT p.id, p.title, p.inserted_at, p.created_at FROM posts as p", []}
"""
# Asks `Ecto.Adapter.Queryable.prepare_query/3` for the prepared form of the
# query and reduces whichever cache state comes back to a `{sql, params}` pair.
@spec to_sql(:all | :update_all | :delete_all, Ecto.Repo.t(), Ecto.Queryable.t()) ::
        {String.t(), query_params}
def to_sql(kind, repo, queryable) do
  prepared_query = Ecto.Adapter.Queryable.prepare_query(kind, repo, queryable)

  case prepared_query do
    {{:nocache, {_id, statement}}, bound_params} ->
      {statement, bound_params}

    {{:cache, _update, {_id, statement}}, bound_params} ->
      {statement, bound_params}

    # An already cached entry is converted back to a string via String.Chars.
    {{:cached, _update, _reset, {_id, cached_query}}, bound_params} ->
      {String.Chars.to_string(cached_query), bound_params}
  end
end
@doc """
Executes an EXPLAIN statement or similar for the given query according to its kind and the
adapter in the given repository.
## Examples
# Postgres
iex> Ecto.Adapters.SQL.explain(Repo, :all, Post)
"Seq Scan on posts p0 (cost=0.00..12.12 rows=1 width=443)"
# MySQL
iex> Ecto.Adapters.SQL.explain(Repo, :all, from(p in Post, where: p.title == "title")) |> IO.puts()
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
| 1 | SIMPLE | p0 | NULL | ALL | NULL | NULL | NULL | NULL | 1 | 100.0 | Using where |
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
# Shared opts
iex> Ecto.Adapters.SQL.explain(Repo, :all, Post, analyze: true, timeout: 20_000)
"Seq Scan on posts p0 (cost=0.00..11.70 rows=170 width=443) (actual time=0.013..0.013 rows=0 loops=1)\\nPlanning Time: 0.031 ms\\nExecution Time: 0.021 ms"
It's safe to execute it for updates and deletes, no data change will be committed:
iex> Ecto.Adapters.SQL.explain(Repo, :update_all, from(p in Post, update: [set: [title: "new title"]]))
"Update on posts p0 (cost=0.00..11.70 rows=170 width=449)\\n -> Seq Scan on posts p0 (cost=0.00..11.70 rows=170 width=449)"
This function is also available under the repository with name `explain`:
iex> Repo.explain(:all, from(p in Post, where: p.title == "title"))
"Seq Scan on posts p0 (cost=0.00..12.12 rows=1 width=443)\\n Filter: ((title)::text = 'title'::text)"
### Options
Built-in adapters support passing `opts` to the EXPLAIN statement according to the following:
Adapter | Supported opts
---------------- | --------------
Postgrex | `analyze`, `verbose`, `costs`, `settings`, `buffers`, `timing`, `summary`, `format`, `plan`
MyXQL | `format`
All options except `format` are boolean valued and default to `false`.
The allowed `format` values are `:map`, `:yaml`, and `:text`:
* `:map` is the deserialized JSON encoding.
* `:yaml` and `:text` return the result as a string.
The built-in adapters support the following formats:
* Postgrex: `:map`, `:yaml` and `:text`
* MyXQL: `:map` and `:text`
The `:plan` option in Postgrex can take the values `:custom` or `:fallback_generic`. When `:custom`
is specified, the explain plan generated will consider the specific values of the query parameters
that are supplied. When using `:fallback_generic`, the specific values of the query parameters will
be ignored. `:fallback_generic` does not use PostgreSQL's built-in support for a generic explain
plan (available as of PostgreSQL 16), but instead uses a special implementation that works for PostgreSQL
versions 12 and above. Defaults to `:custom`.
Any other value passed to `opts` will be forwarded to the underlying adapter query function, including
shared Repo options such as `:timeout`. Non built-in adapters may have specific behaviour and you should
consult their documentation for more details.
For version compatibility, please check your database's documentation:
* _Postgrex_: [PostgreSQL doc](https://www.postgresql.org/docs/current/sql-explain.html).
* _MyXQL_: [MySQL doc](https://dev.mysql.com/doc/refman/8.0/en/explain.html).
"""
# Runs the adapter's `:explain_query` callback inside a transaction that is
# always rolled back, then returns the explain result or raises.
@spec explain(
        pid() | Ecto.Repo.t() | Ecto.Adapter.adapter_meta(),
        :all | :update_all | :delete_all,
        Ecto.Queryable.t(),
        opts :: Keyword.t()
      ) :: String.t() | Exception.t() | list(map)
def explain(repo, operation, queryable, opts \\ [])

def explain(repo, operation, queryable, opts) when is_atom(repo) or is_pid(repo) do
  repo
  |> Ecto.Adapter.lookup_meta()
  |> explain(operation, queryable, opts)
end

def explain(%{repo: repo} = adapter_meta, operation, queryable, opts) do
  # First step: build the SQL for the query and hand it to the adapter.
  run_explain = fn _repo, _changes ->
    {statement, statement_params} = to_sql(operation, repo, queryable)
    sql_call(adapter_meta, :explain_query, [statement], statement_params, opts)
  end

  # Second step: always fail, so the surrounding transaction never commits.
  force_rollback = fn _repo, _changes -> {:error, :forced_rollback} end

  multi =
    Ecto.Multi.new()
    |> Ecto.Multi.run(:explain, run_explain)
    |> Ecto.Multi.run(:rollback, force_rollback)

  case repo.transaction(multi, opts) do
    {:error, :rollback, :forced_rollback, %{explain: plan}} -> plan
    {:error, :explain, error, _changes} -> raise error
    _other -> raise "unable to execute explain"
  end
end
@doc """
Forces all connections in the repo pool to disconnect within the given interval.
Once this function is called, the pool will disconnect all of its connections
as they are checked in or as they are pinged. Checked in connections will be
randomly disconnected within the given time interval. Pinged connections are
immediately disconnected - as they are idle (according to `:idle_interval`).
If the connection has a backoff configured (which is the case by default),
disconnecting means an attempt at a new connection will be done immediately
after, without starting a new process for each connection. However, if backoff
has been disabled, the connection process will terminate. In such cases,
disconnecting all connections may cause the pool supervisor to restart
depending on the max_restarts/max_seconds configuration of the pool,
so you will want to set those carefully.
For convenience, this function is also available in the repository:
iex> MyRepo.disconnect_all(60_000)
:ok
"""
# Resolves the repo to its adapter metadata when needed, then asks DBConnection
# to disconnect either every partitioned pool or the single pool.
@spec disconnect_all(
        pid | Ecto.Repo.t() | Ecto.Adapter.adapter_meta(),
        non_neg_integer,
        opts :: Keyword.t()
      ) :: :ok
def disconnect_all(repo, interval, opts \\ [])

def disconnect_all(repo, interval, opts) when is_atom(repo) or is_pid(repo) do
  repo
  |> Ecto.Adapter.lookup_meta()
  |> disconnect_all(interval, opts)
end

def disconnect_all(adapter_meta, interval, opts) do
  case adapter_meta do
    %{partition_supervisor: {name, count}} ->
      # One task per partition; each pool is addressed through the
      # PartitionSupervisor via-tuple.
      tasks =
        for partition <- 1..count do
          pool = {:via, PartitionSupervisor, {name, partition}}
          Task.async(fn -> DBConnection.disconnect_all(pool, interval, opts) end)
        end

      Task.await_many(tasks, :infinity)
      :ok

    %{pid: pool} ->
      DBConnection.disconnect_all(pool, interval, opts)
  end
end
@doc """
Returns a stream that runs a custom SQL query on given repo when reduced.
In case of success it is an enumerable containing maps with at least two keys:
* `:num_rows` - the number of rows affected
* `:rows` - the result set as a list. `nil` may be returned
instead of the list if the command does not yield any row
as result (but still yields the number of affected rows,
like a `delete` command without returning would)
In case of failure it raises an exception.
If the adapter supports a collectable stream, the stream may also be used as
the collectable in `Enum.into/3`. Behaviour depends on the adapter.
## Options
* `:log` - When false, does not log the query
* `:max_rows` - The number of rows to load from the database as we stream
## Examples
iex> Ecto.Adapters.SQL.stream(MyRepo, "SELECT $1::integer + $2", [40, 2]) |> Enum.to_list()
[%{rows: [[42]], num_rows: 1}]
"""
# Looks up the adapter metadata for `repo` and wraps the statement in an
# `Ecto.Adapters.SQL.Stream`.
@spec stream(Ecto.Repo.t(), String.t(), query_params, Keyword.t()) :: Enum.t()
def stream(repo, sql, params \\ [], opts \\ []) do
  adapter_meta = Ecto.Adapter.lookup_meta(repo)
  Ecto.Adapters.SQL.Stream.build(adapter_meta, sql, params, opts)
end
@doc """
Same as `query/4` but raises on invalid queries.
Returns the result map itself rather than an `:ok` tuple.
"""
@spec query!(
        pid() | Ecto.Repo.t() | Ecto.Adapter.adapter_meta(),
        iodata,
        query_params,
        Keyword.t()
      ) ::
        query_result
def query!(repo, sql, params \\ [], opts \\ []) do
  outcome = query(repo, sql, params, opts)

  case outcome do
    {:error, exception} -> raise_sql_call_error(exception)
    {:ok, query_result} -> query_result
  end
end
@doc """
Runs a custom SQL query on the given repo.
In case of success, it must return an `:ok` tuple containing
a map with at least two keys:
* `:num_rows` - the number of rows affected
* `:rows` - the result set as a list. `nil` may be returned
instead of the list if the command does not yield any row
as result (but still yields the number of affected rows,
like a `delete` command without returning would)
## Options
* `:log` - When false, does not log the query
* `:timeout` - Execute request timeout, accepts: `:infinity` (default: `#{@timeout}`);
## Examples
iex> Ecto.Adapters.SQL.query(MyRepo, "SELECT $1::integer + $2", [40, 2])
{:ok, %{rows: [[42]], num_rows: 1}}
For convenience, this function is also available under the repository:
iex> MyRepo.query("SELECT $1::integer + $2", [40, 2])
{:ok, %{rows: [[42]], num_rows: 1}}
"""
# Repo names and pids are first resolved to adapter metadata; the metadata
# clause then dispatches to the adapter's `:query` callback.
@spec query(
        pid() | Ecto.Repo.t() | Ecto.Adapter.adapter_meta(),
        iodata,
        query_params,
        Keyword.t()
      ) ::
        {:ok, query_result} | {:error, Exception.t()}
def query(repo, sql, params \\ [], opts \\ [])

def query(repo, sql, params, opts) when is_atom(repo) or is_pid(repo) do
  repo
  |> Ecto.Adapter.lookup_meta()
  |> query(sql, params, opts)
end

def query(adapter_meta, sql, params, opts),
  do: sql_call(adapter_meta, :query, [sql], params, opts)
@doc """
Same as `query_many/4` but raises on invalid queries.
Returns the list of result maps itself rather than an `:ok` tuple.
"""
# The first argument accepts a pid as well: this function delegates to
# `query_many/4`, which resolves atoms and pids through
# `Ecto.Adapter.lookup_meta/1`, so the spec lists `pid()` like `query!/4` does.
@spec query_many!(
        pid() | Ecto.Repo.t() | Ecto.Adapter.adapter_meta(),
        iodata,
        query_params,
        Keyword.t()
      ) ::
        [query_result]
def query_many!(repo, sql, params \\ [], opts \\ []) do
  case query_many(repo, sql, params, opts) do
    {:ok, result} -> result
    # Re-raises the error, with an extra hint for sandbox ownership errors.
    {:error, err} -> raise_sql_call_error(err)
  end
end
@doc """
Runs a custom SQL query that returns multiple results on the given repo.
In case of success, it must return an `:ok` tuple containing
a list of maps with at least two keys:
* `:num_rows` - the number of rows affected
* `:rows` - the result set as a list. `nil` may be returned
instead of the list if the command does not yield any row
as result (but still yields the number of affected rows,
like a `delete` command without returning would)
## Options
* `:log` - When false, does not log the query
* `:timeout` - Execute request timeout, accepts: `:infinity` (default: `#{@timeout}`);
## Examples
iex> Ecto.Adapters.SQL.query_many(MyRepo, "SELECT $1; SELECT $2;", [40, 2])
{:ok, [%{rows: [[40]], num_rows: 1}, %{rows: [[2]], num_rows: 1}]}
For convenience, this function is also available under the repository:
iex> MyRepo.query_many("SELECT $1; SELECT $2;", [40, 2])
{:ok, [%{rows: [[40]], num_rows: 1}, %{rows: [[2]], num_rows: 1}]}
"""
# Repo names and pids are first resolved to adapter metadata; the metadata
# clause then dispatches to the adapter's `:query_many` callback.
@spec query_many(
        pid() | Ecto.Repo.t() | Ecto.Adapter.adapter_meta(),
        iodata,
        query_params,
        Keyword.t()
      ) :: {:ok, [query_result]} | {:error, Exception.t()}
def query_many(repo, sql, params \\ [], opts \\ [])

def query_many(repo, sql, params, opts) when is_atom(repo) or is_pid(repo) do
  repo
  |> Ecto.Adapter.lookup_meta()
  |> query_many(sql, params, opts)
end

def query_many(adapter_meta, sql, params, opts),
  do: sql_call(adapter_meta, :query_many, [sql], params, opts)
# Invokes `callback` on the adapter's connection module. The call receives the
# current connection (or pool), the given `args`, the params and finally the
# options, which are merged with the repo defaults and carry the log function.
defp sql_call(adapter_meta, callback, args, params, opts) do
  %{pid: pool, telemetry: telemetry, sql: connection_module, opts: default_opts} = adapter_meta
  conn = get_conn_or_pool(pool, adapter_meta)
  call_opts = with_log(telemetry, params, opts ++ default_opts)
  apply(connection_module, callback, [conn | args] ++ [params, call_opts])
end
# Prepends a `:source` option when the first entry of the query's `sources`
# tuple starts with a binary; otherwise the options are returned untouched.
defp put_source(opts, %{sources: sources}) when is_binary(elem(elem(sources, 0), 0)) do
  {source, _, _} = elem(sources, 0)
  [{:source, source} | opts]
end

defp put_source(opts, _query_meta), do: opts
@doc """
Checks if the given `table` exists.
Returns `true` if the `table` exists in the `repo`, otherwise `false`.
The table is checked against the current database/schema in the connection.
"""
# Builds the adapter-specific existence query for `table` and reports whether
# it returned any row.
@spec table_exists?(Ecto.Repo.t(), table :: String.t(), opts :: Keyword.t()) :: boolean
def table_exists?(repo, table, opts \\ []) when is_atom(repo) do
  adapter_meta = Ecto.Adapter.lookup_meta(repo)
  %{sql: connection_module} = adapter_meta
  {statement, statement_params} = connection_module.table_exists_query(table)
  result = query!(adapter_meta, statement, statement_params, opts)
  result.num_rows != 0
end
# Returns a formatted table for a given query `result`.
#
# ## Examples
#
# iex> Ecto.Adapters.SQL.format_table(query) |> IO.puts()
# +---------------+---------+--------+
# | title | counter | public |
# +---------------+---------+--------+
# | My Post Title | 1 | NULL |
# +---------------+---------+--------+
@doc false
@spec format_table(%{
        :columns => [String.t()] | nil,
        :rows => [term()] | nil,
        optional(atom) => any()
      }) :: String.t()
def format_table(result)

# Nothing to render without a result or without columns.
def format_table(nil), do: ""
def format_table(%{columns: columns}) when columns in [nil, []], do: ""

# A missing row set is rendered like an empty one.
def format_table(%{columns: columns, rows: nil}),
  do: format_table(%{columns: columns, rows: []})

def format_table(%{columns: columns, rows: rows}) do
  # Width of each column = widest cell among the header and all rows.
  column_widths =
    for column_cells <- Enum.zip([columns | rows]) do
      column_cells
      |> Tuple.to_list()
      |> Enum.map(&binary_length/1)
      |> Enum.max()
    end

  rule = separator(column_widths)
  header = cells(columns, column_widths)
  body = for row <- rows, do: [cells(row, column_widths), "\n"]

  IO.iodata_to_binary([rule, "\n", header, "\n", rule, "\n", body, rule])
end
# Display width of a cell: `nil` counts as 4 (it is rendered as "NULL"),
# binaries use their own length, anything else the length of its inspected form.
defp binary_length(value) do
  cond do
    is_nil(value) -> 4
    is_binary(value) -> String.length(value)
    true -> value |> inspect() |> String.length()
  end
end
# Builds the iodata for a horizontal rule: one "+-…-" segment per column
# (padded by one dash on each side) and a closing "+".
defp separator(widths) do
  segments = for width <- widths, do: [?+, ?-, String.duplicate("-", width), ?-]
  segments ++ [?+]
end
# Builds the iodata for one table line: every item is formatted to its column
# width and wrapped as "| item ", followed by a closing "|".
defp cells(items, widths) do
  padded =
    for {item, width} <- Enum.zip(items, widths) do
      [?|, " ", format_item(item, width), " "]
    end

  [padded, ?|]
end
# Pads a single cell to `width`: `nil` is shown as "NULL", numbers are
# right-aligned, binaries and every other term are left-aligned (non-binaries
# are shown in their inspected form).
defp format_item(item, width) do
  cond do
    is_nil(item) -> String.pad_trailing("NULL", width)
    is_binary(item) -> String.pad_trailing(item, width)
    is_number(item) -> item |> inspect() |> String.pad_leading(width)
    true -> item |> inspect() |> String.pad_trailing(width)
  end
end
## Callbacks
@doc false
# Returns the quoted definitions of the repository convenience functions
# (`query`, `query!`, `query_many`, `query_many!`, `to_sql`, `explain` and
# `disconnect_all`). Each one forwards to the matching function in
# `Ecto.Adapters.SQL`, passing `get_dynamic_repo()` as the repo argument.
# Both arguments of this function are ignored.
def __before_compile__(_driver, _env) do
quote do
@doc """
A convenience function for SQL-based repositories that executes the given query.
See `Ecto.Adapters.SQL.query/4` for more information.
"""
def query(sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.query(get_dynamic_repo(), sql, params, opts)
end
@doc """
A convenience function for SQL-based repositories that executes the given query.
See `Ecto.Adapters.SQL.query!/4` for more information.
"""
def query!(sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.query!(get_dynamic_repo(), sql, params, opts)
end
@doc """
A convenience function for SQL-based repositories that executes the given multi-result query.
See `Ecto.Adapters.SQL.query_many/4` for more information.
"""
def query_many(sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.query_many(get_dynamic_repo(), sql, params, opts)
end
@doc """
A convenience function for SQL-based repositories that executes the given multi-result query.
See `Ecto.Adapters.SQL.query_many!/4` for more information.
"""
def query_many!(sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.query_many!(get_dynamic_repo(), sql, params, opts)
end
@doc """
A convenience function for SQL-based repositories that translates the given query to SQL.
See `Ecto.Adapters.SQL.to_sql/3` for more information.
"""
def to_sql(operation, queryable) do
Ecto.Adapters.SQL.to_sql(operation, get_dynamic_repo(), queryable)
end
@doc """
A convenience function for SQL-based repositories that executes an EXPLAIN statement or similar
depending on the adapter to obtain statistics for the given query.
See `Ecto.Adapters.SQL.explain/4` for more information.
"""
def explain(operation, queryable, opts \\ []) do
Ecto.Adapters.SQL.explain(get_dynamic_repo(), operation, queryable, opts)
end
@doc """
A convenience function for SQL-based repositories that forces all connections in the
pool to disconnect within the given interval.
See `Ecto.Adapters.SQL.disconnect_all/3` for more information.
"""
def disconnect_all(interval, opts \\ []) do
Ecto.Adapters.SQL.disconnect_all(get_dynamic_repo(), interval, opts)
end
end
end
@doc false
# Starts the driver application (and its dependencies) with the given restart
# type; the repo config is not consulted.
def ensure_all_started(driver, _config, type),
  do: Application.ensure_all_started(driver, type)
# Config keys that `init/3` copies into the adapter metadata under `:opts`;
# `sql_call/5`, `reduce/6` and `into/4` append them to per-call options.
@pool_opts [:timeout, :pool, :pool_size] ++
[:queue_target, :queue_interval, :ownership_timeout, :repo]
# Values accepted for the `:log` repo option; `init/3` raises on anything else.
@valid_log_levels ~w(false debug info notice warning error critical alert emergency)a
@doc false
# Validates the repo configuration and returns `{:ok, child_spec, meta}`.
#
# Raises when the connection module cannot be loaded or when `:log` is not one
# of `@valid_log_levels`. With a `:pool_count` above 1 the connection child
# spec is wrapped in a `PartitionSupervisor` and the metadata gains a
# `:partition_supervisor` entry.
def init(connection, driver, config) do
  if not Code.ensure_loaded?(connection) do
    raise """
    could not find #{inspect(connection)}.
    Please verify you have added #{inspect(driver)} as a dependency:
    {#{inspect(driver)}, ">= 0.0.0"}
    And remember to recompile Ecto afterwards by cleaning the current build:
    mix deps.clean --build ecto
    """
  end

  log_level = Keyword.get(config, :log, :debug)

  if log_level not in @valid_log_levels do
    raise """
    invalid value for :log option in Repo config
    The accepted values for the :log option are:
    #{Enum.map_join(@valid_log_levels, ", ", &inspect/1)}
    See https://hexdocs.pm/ecto/Ecto.Repo.html for more information.
    """
  end

  stacktrace = Keyword.get(config, :stacktrace, nil)
  telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
  telemetry = {config[:repo], log_level, telemetry_prefix ++ [:query]}

  # `:name` and `:pool_count` are consumed here and not passed to the pool.
  {name, config} = Keyword.pop(config, :name, config[:repo])
  {pool_count, config} = Keyword.pop(config, :pool_count, 1)
  {pool, config} = pool_config(config)
  pool_child_spec = connection.child_spec(config)

  meta = %{
    telemetry: telemetry,
    sql: connection,
    stacktrace: stacktrace,
    opts: Keyword.take(config, @pool_opts)
  }

  if pool_count > 1 do
    cond do
      name == nil ->
        raise ArgumentError, "the option :pool_count requires a :name"

      pool == DBConnection.Ownership ->
        raise ArgumentError, "the option :pool_count does not work with the SQL sandbox"

      true ->
        :ok
    end

    supervisor_name = Module.concat(name, PartitionSupervisor)

    partition_opts = [
      name: supervisor_name,
      child_spec: pool_child_spec,
      partitions: pool_count
    ]

    supervisor_spec = Supervisor.child_spec({PartitionSupervisor, partition_opts}, [])
    {:ok, supervisor_spec, Map.put(meta, :partition_supervisor, {supervisor_name, pool_count})}
  else
    {:ok, pool_child_spec, meta}
  end
end
# Picks the pool module for the repo (default `DBConnection.ConnectionPool`).
# A configured pool that exports `unboxed_run/2` is replaced by
# `DBConnection.Ownership`. Returns the chosen pool together with the config,
# with `:pool` re-added at the front.
defp pool_config(config) do
  {configured_pool, remaining} = Keyword.pop(config, :pool, DBConnection.ConnectionPool)

  exports_unboxed_run? =
    Code.ensure_loaded?(configured_pool) && function_exported?(configured_pool, :unboxed_run, 2)

  pool = if exports_unboxed_run?, do: DBConnection.Ownership, else: configured_pool

  {pool, [{:pool, pool} | remaining]}
end
@doc false
# Runs `callback` through `checkout_or_transaction/4` in `:run` mode.
def checkout(adapter_meta, opts, callback),
  do: checkout_or_transaction(:run, adapter_meta, opts, callback)
@doc false
# True when `get_conn/1` returns a connection for the repo's pool.
def checked_out?(adapter_meta) do
  %{pid: pool} = adapter_meta
  current_conn = get_conn(pool)
  not is_nil(current_conn)
end
## Query
@doc false
# Builds and runs a multi-row INSERT, returning `{num_rows, rows}`.
#
# `rows` is either a `{query, params}` pair or a list of field lists. Bound
# parameters are the placeholders, then the row values, then the conflict
# parameters. Unless the caller set `:cache_statement`, a per-source statement
# name is added to the options.
def insert_all(
      adapter_meta,
      schema_meta,
      conn,
      header,
      rows,
      on_conflict,
      returning,
      placeholders,
      opts
    ) do
  %{source: source, prefix: prefix} = schema_meta
  {_kind, conflict_params, _} = on_conflict

  # Row parameters are kept in reverse order until the final assembly below.
  {insert_rows, reversed_params} =
    case rows do
      {%Ecto.Query{} = query, query_params} -> {query, Enum.reverse(query_params)}
      row_list -> unzip_inserts(header, row_list)
    end

  sql = conn.insert(prefix, source, header, insert_rows, on_conflict, returning, placeholders)

  opts =
    case Keyword.get(opts, :cache_statement) do
      nil -> [{:cache_statement, "ecto_insert_all_#{source}"} | opts]
      _ -> opts
    end

  all_params = placeholders ++ Enum.reverse(reversed_params, conflict_params)
  result = query!(adapter_meta, sql, all_params, [{:source, source} | opts])
  %{num_rows: num, rows: returned_rows} = result
  {num, returned_rows}
end
# Walks every row in `header` order. Returns the per-row column descriptors and
# all bound values accumulated in reverse order.
#
# Per header key: a sub-query yields `{query, param_count}` and contributes its
# params; a placeholder yields `{:placeholder, index_string}` and contributes
# nothing; a plain value yields the key and contributes the value; a key absent
# from the row yields `nil`.
defp unzip_inserts(header, rows) do
  Enum.map_reduce(rows, [], fn fields, reversed_params ->
    take_field = fn key, acc ->
      case :lists.keyfind(key, 1, fields) do
        false ->
          {nil, acc}

        {^key, {%Ecto.Query{} = query, query_params}} ->
          {{query, length(query_params)}, Enum.reverse(query_params, acc)}

        {^key, {:placeholder, placeholder_index}} ->
          {{:placeholder, Integer.to_string(placeholder_index)}, acc}

        {^key, value} ->
          {key, [value | acc]}
      end
    end

    Enum.map_reduce(header, reversed_params, take_field)
  end)
end
@doc false
# Executes a prepared query with the source option added and returns
# `{num_rows, rows}` from the result.
def execute(prepare, adapter_meta, query_meta, prepared, params, opts) do
  opts_with_source = put_source(opts, query_meta)
  result = execute!(prepare, adapter_meta, prepared, params, opts_with_source)
  %{num_rows: num, rows: rows} = result
  {num, rows}
end
# Executes a prepared query in one of its cache states and returns the result,
# raising through `raise_sql_call_error/1` on failure.
#
# `{:cache, ...}`: the query has no cached entry yet. It is sent through the
# adapter's `:prepare_execute` callback and, in `:named` mode, the returned
# query is stored via the `update` function.
defp execute!(prepare, adapter_meta, {:cache, update, {id, prepared}}, params, opts) do
name = prepare_name(prepare, id)
case sql_call(adapter_meta, :prepare_execute, [name, prepared], params, opts) do
{:ok, query, result} ->
maybe_update_cache(prepare, update, {id, query})
result
{:error, err} ->
raise_sql_call_error(err)
end
end
# `{:cached, ...}` in `:unnamed` mode: the cached entry is converted back to a
# string and goes through `:prepare_execute` again; the returned query is
# discarded.
defp execute!(
:unnamed = prepare,
adapter_meta,
{:cached, _update, _reset, {id, cached}},
params,
opts
) do
name = prepare_name(prepare, id)
prepared = String.Chars.to_string(cached)
case sql_call(adapter_meta, :prepare_execute, [name, prepared], params, opts) do
{:ok, _query, result} ->
result
{:error, err} ->
raise_sql_call_error(err)
end
end
# `{:cached, ...}` in `:named` mode: the cached query is passed to `:execute`.
# A returned query replaces the cached one; a `{:reset, err}` reply calls
# `reset` with the string form of the cached entry before raising.
defp execute!(
:named = _prepare,
adapter_meta,
{:cached, update, reset, {id, cached}},
params,
opts
) do
case sql_call(adapter_meta, :execute, [cached], params, opts) do
{:ok, query, result} ->
update.({id, query})
result
{:ok, result} ->
result
{:error, err} ->
raise_sql_call_error(err)
{:reset, err} ->
reset.({id, String.Chars.to_string(cached)})
raise_sql_call_error(err)
end
end
# `{:nocache, ...}`: the statement is run through the plain `:query` callback.
defp execute!(_prepare, adapter_meta, {:nocache, {_id, prepared}}, params, opts) do
case sql_call(adapter_meta, :query, [prepared], params, opts) do
{:ok, res} -> res
{:error, err} -> raise_sql_call_error(err)
end
end
# Statement name used for `:prepare_execute`: empty in `:unnamed` mode,
# "ecto_<id>" in `:named` mode.
defp prepare_name(:unnamed, _id), do: ""

defp prepare_name(:named, id) do
  suffix = Integer.to_string(id)
  "ecto_" <> suffix
end
# Calls the `update` function with `value` in `:named` mode only; `:unnamed`
# mode returns `:noop` without calling it.
defp maybe_update_cache(:unnamed, _update, _value), do: :noop

defp maybe_update_cache(:named, update, value) do
  update.(value)
end
@doc false
# Builds a stream for a prepared query, with the source option added.
def stream(adapter_meta, query_meta, prepared, params, opts) do
  opts_with_source = put_source(opts, query_meta)
  do_stream(adapter_meta, prepared, params, opts_with_source)
end
# Extracts the statement from whichever cache state the prepared query is in.
# A cached entry is converted to a string first.
defp do_stream(adapter_meta, {:cached, _update, _reset, {_id, cached}}, params, opts) do
  statement = String.Chars.to_string(cached)
  prepare_stream(adapter_meta, statement, params, opts)
end

defp do_stream(adapter_meta, {:cache, _update, {_id, statement}}, params, opts),
  do: prepare_stream(adapter_meta, statement, params, opts)

defp do_stream(adapter_meta, {:nocache, {_id, statement}}, params, opts),
  do: prepare_stream(adapter_meta, statement, params, opts)
# Wraps the statement in an `Ecto.Adapters.SQL.Stream` and maps every emitted
# result map to a `{num_rows, rows}` tuple.
defp prepare_stream(adapter_meta, prepared, params, opts) do
  sql_stream = Ecto.Adapters.SQL.Stream.build(adapter_meta, prepared, params, opts)
  Stream.map(sql_stream, fn %{num_rows: nrows, rows: rows} -> {nrows, rows} end)
end
# Raises the given error. Ownership errors get a pointer to the sandbox docs
# appended to their message first.
defp raise_sql_call_error(%DBConnection.OwnershipError{} = err) do
  hint = "\nSee Ecto.Adapters.SQL.Sandbox docs for more information."
  raise %{err | message: err.message <> hint}
end

defp raise_sql_call_error(err) do
  raise err
end
@doc false
# Reduces over the adapter's stream for `statement`. This only works while the
# current connection is in transaction mode; otherwise it raises.
def reduce(adapter_meta, statement, params, opts, acc, fun) do
  %{pid: pool, telemetry: telemetry, sql: connection_module, opts: default_opts} = adapter_meta
  opts = with_log(telemetry, params, opts ++ default_opts)
  conn = get_conn(pool)

  if match?(%DBConnection{conn_mode: :transaction}, conn) do
    sql_stream = connection_module.stream(conn, statement, params, opts)
    Enumerable.reduce(sql_stream, acc, fun)
  else
    raise "cannot reduce stream outside of transaction"
  end
end
@doc false
# Returns the `Collectable.into/1` result of the adapter's stream for
# `statement`. This only works while the current connection is in transaction
# mode; otherwise it raises.
def into(adapter_meta, statement, params, opts) do
  %{pid: pool, telemetry: telemetry, sql: connection_module, opts: default_opts} = adapter_meta
  opts = with_log(telemetry, params, opts ++ default_opts)
  conn = get_conn(pool)

  if match?(%DBConnection{conn_mode: :transaction}, conn) do
    sql_stream = connection_module.stream(conn, statement, params, opts)
    Collectable.into(sql_stream)
  else
    raise "cannot collect into stream outside of transaction"
  end
end
@doc false
# Runs a single-row insert/update/delete statement and interprets the result:
#
#   * one affected row - `{:ok, returned_fields}` (empty when no rows come back)
#   * no affected row - `{:ok, []}` when `on_conflict` is `:nothing`, otherwise
#     `{:error, :stale}`
#   * several affected rows - raises `Ecto.MultiplePrimaryKeyError`
#   * a query error - `{:invalid, constraints}` when the connection module maps
#     it to constraints, otherwise the error is raised
#
# Unless the caller set `:cache_statement`, a statement name derived from the
# operation, source and number of params is added to the options.
def struct(
      adapter_meta,
      conn,
      sql,
      operation,
      source,
      params,
      values,
      on_conflict,
      returning,
      opts
    ) do
  opts =
    case Keyword.get(opts, :cache_statement) do
      nil -> [{:cache_statement, "ecto_#{operation}_#{source}_#{length(params)}"} | opts]
      _ -> opts
    end

  case query(adapter_meta, sql, values, [{:source, source} | opts]) do
    {:ok, %{rows: nil, num_rows: 1}} ->
      {:ok, []}

    {:ok, %{rows: [returned_values], num_rows: 1}} ->
      {:ok, Enum.zip(returning, returned_values)}

    {:ok, %{num_rows: 0}} when on_conflict == :nothing ->
      {:ok, []}

    {:ok, %{num_rows: 0}} ->
      {:error, :stale}

    {:ok, %{num_rows: num_rows}} when num_rows > 1 ->
      raise Ecto.MultiplePrimaryKeyError,
        source: source,
        params: params,
        count: num_rows,
        operation: operation

    {:error, err} ->
      constraints = conn.to_constraints(err, source: source)

      if constraints == [] do
        raise_sql_call_error(err)
      else
        {:invalid, constraints}
      end
  end
end
## Transactions
# Runs `callback` through `checkout_or_transaction/4` in `:transaction` mode.
@doc false
def transaction(adapter_meta, opts, callback),
  do: checkout_or_transaction(:transaction, adapter_meta, opts, callback)
# Returns true when the connection stored for this pool in the current process
# is a `DBConnection` struct in transaction mode, false otherwise.
@doc false
def in_transaction?(%{pid: pool}) do
  case get_conn(pool) do
    %DBConnection{conn_mode: :transaction} -> true
    _other -> false
  end
end
# Rolls back the transaction held by the current process for this pool,
# passing `value` to `DBConnection.rollback/2`.
#
# Raises a RuntimeError when the process has no transaction-mode connection.
@doc false
def rollback(%{pid: pool}, value) do
  conn = get_conn(pool)

  if match?(%DBConnection{conn_mode: :transaction}, conn) do
    DBConnection.rollback(conn, value)
  else
    raise "cannot call rollback outside of transaction"
  end
end
## Migrations
# Executes a DDL `definition`.
#
# `conn.execute_ddl/1` turns the definition into one statement or a list of
# statements; every statement is run with `query!/4` first, and only then are
# the results converted with `conn.ddl_logs/1`. Returns `{:ok, ddl_logs}`.
@doc false
def execute_ddl(meta, conn, definition, opts) do
  statements = List.wrap(conn.execute_ddl(definition))
  results = for statement <- statements, do: query!(meta, statement, [], opts)

  {:ok, Enum.flat_map(results, &conn.ddl_logs/1)}
end
# Raises an `Ecto.MigrationError` telling the user that migrations could not
# run because the connection pool size is below 2. The message explains how to
# raise the pool size via the Mix task flag or the application config.
# This function never returns.
@doc false
def raise_migration_pool_size_error do
raise Ecto.MigrationError, """
Migrations failed to run because the connection pool size is less than 2.
Ecto requires a pool size of at least 2 to support concurrent migrators.
When migrations run, Ecto uses one connection to maintain a lock and
another to run migrations.
If you are running migrations with Mix, you can increase the number
of connections via the pool size option:
mix ecto.migrate --pool-size 2
If you are running the Ecto.Migrator programmatically, you can configure
the pool size via your application config:
config :my_app, Repo,
...,
pool_size: 2 # at least
"""
end
## Log
# Prepends a `:log` callback to `opts`.
#
# The callback closes over the options as they were *before* the callback was
# added, so `log/4` reads the caller's own `:log` setting from them.
defp with_log(telemetry, params, opts) do
  log_fun = fn entry -> log(telemetry, params, entry, opts) end
  [{:log, log_fun} | opts]
end
# Handles one log entry: emits the telemetry event and writes the query log.
#
#   * `{repo, log, event_name}` - repo module, repo-level log setting and the
#     default telemetry event name
#   * `params` - the parameters given to the query
#   * `entry` - map carrying the timings, result and query of the call
#   * `opts` - per-call options (`:source`, `:stacktrace`, `:cast_params`,
#     `:telemetry_event`, `:telemetry_options`, `:log`)
#
# Always returns `:ok`.
defp log({repo, log, event_name}, params, entry, opts) do
  %{
    connection_time: query_time,
    decode_time: decode_time,
    pool_time: queue_time,
    idle_time: idle_time,
    result: result,
    query: query
  } = entry

  cast_params = opts[:cast_params]
  source = Keyword.get(opts, :source)
  stacktrace = Keyword.get(opts, :stacktrace)
  query = String.Chars.to_string(query)
  # Cast params, when present, are what gets printed in the log line.
  log_params = cast_params || params

  # Drop the query struct from three-element success tuples.
  result =
    case result do
      {:ok, _query, res} -> {:ok, res}
      other -> other
    end

  # Idle time is reported as a measurement but is not part of the total.
  idle = if idle_time, do: [idle_time: idle_time], else: []
  timings = [query_time: query_time, decode_time: decode_time, queue_time: queue_time]
  measurements = log_measurements(timings, 0, idle)

  metadata = %{
    type: :ecto_sql_query,
    repo: repo,
    result: result,
    params: params,
    cast_params: cast_params,
    query: query,
    source: source,
    stacktrace: stacktrace,
    options: Keyword.get(opts, :telemetry_options, [])
  }

  # A per-call :telemetry_event overrides the default; a falsy name disables it.
  case Keyword.get(opts, :telemetry_event, event_name) do
    disabled when disabled in [nil, false] -> :ok
    event -> :telemetry.execute(event, measurements, metadata)
  end

  write_log = fn level ->
    Logger.log(
      level,
      fn -> log_iodata(measurements, repo, source, query, log_params, result, stacktrace) end,
      ansi_color: sql_color(query)
    )
  end

  # The per-call :log option is checked first, then the repo-level setting.
  case {opts[:log], log} do
    {false, _level} -> :ok
    {opts_level, false} when opts_level in [nil, true] -> :ok
    {true, level} -> write_log.(level)
    {opts_level, args_level} -> write_log.(opts_level || args_level)
  end

  :ok
end
# Builds the measurements map from a keyword list of timings.
#
# Entries whose value is nil are skipped; every other value is added to the
# running `total` and its pair is kept. The result is a map of the kept pairs,
# the pairs already in `acc`, and `:total_time`.
defp log_measurements(timings, total, acc) do
  {total, kept} =
    Enum.reduce(timings, {total, acc}, fn
      {_key, nil}, totals -> totals
      {key, value}, {sum, pairs} -> {sum + value, [{key, value} | pairs]}
    end)

  Map.new([{:total_time, total} | kept])
end
# Assembles the iodata for one query log line: status, optional source, the
# timings, then the query text, the inspected params and the caller location.
defp log_iodata(measurements, repo, source, query, params, result, stacktrace) do
  status = log_ok_error(result)
  origin = log_ok_source(source)
  # The trailing boolean is the `force` flag passed to log_time/4.
  db = log_time("db", measurements, :query_time, true)
  decode = log_time("decode", measurements, :decode_time, false)
  queue = log_time("queue", measurements, :queue_time, false)
  idle = log_time("idle", measurements, :idle_time, true)
  printed_params = inspect(params, charlists: false)
  caller = log_stacktrace(stacktrace, repo)

  [
    "QUERY",
    ?\s,
    status,
    origin,
    db,
    decode,
    queue,
    idle,
    ?\n,
    query,
    ?\s,
    printed_params,
    caller
  ]
end
# Status label for the log line: "OK" for success tuples, "ERROR" for error tuples.
defp log_ok_error({:ok, _res}) do
  "OK"
end

defp log_ok_error({:error, _err}) do
  "ERROR"
end
# Renders the ` source=...` fragment of the log line; empty when there is no source.
defp log_ok_source(source) do
  if is_nil(source) do
    ""
  else
    " source=" <> inspect(source)
  end
end
# Renders ` label=X.Yms` for the timing stored under `key` in `measurements`.
#
# The native-unit value is converted to microseconds and truncated to tenths
# of a millisecond. Returns `[]` when the key is absent, or when the truncated
# value is zero and `force` is false.
defp log_time(label, measurements, key, force) do
  case measurements do
    %{^key => native} ->
      tenths = native |> System.convert_time_unit(:native, :microsecond) |> div(100)
      ms = tenths / 10
      show? = force or ms > 0

      if show?, do: [?\s, label, ?=, :io_lib_format.fwrite_g(ms), ?m, ?s], else: []

    %{} ->
      []
  end
end
# Renders the "↳ Mod.fun/arity, at: file:line" suffix of the log line.
#
# Only a non-empty stacktrace can produce output, and only when
# `last_non_ecto/3` yields a four-element frame; otherwise `[]` is returned.
defp log_stacktrace([_ | _] = stacktrace, repo) do
  case last_non_ecto(Enum.reverse(stacktrace), repo, nil) do
    {module, function, arity, info} ->
      [
        ?\n,
        IO.ANSI.light_black(),
        "↳ ",
        Exception.format_mfa(module, function, arity),
        log_stacktrace_info(info),
        IO.ANSI.reset()
      ]

    _no_frame ->
      []
  end
end

defp log_stacktrace(_stacktrace, _repo), do: []
# Renders ", at: file:line" when the frame info starts with `:file` followed
# by `:line`; any other shape yields `[]`.
defp log_stacktrace_info([{:file, file}, {:line, line} | _rest]),
  do: [", at: ", file, ?:, Integer.to_string(line)]

defp log_stacktrace_info(_info), do: []
@repo_modules [Ecto.Repo.Queryable, Ecto.Repo.Schema, Ecto.Repo.Transaction]

# Walks `stacktrace` in the order given and returns the frame seen just before
# the first four-element frame whose module is `repo` or one of
# `@repo_modules`. Returns the initial `last` when that frame comes first or
# the list is empty, and the final frame when no such frame exists.
defp last_non_ecto(stacktrace, repo, last) do
  Enum.reduce_while(stacktrace, last, fn
    {mod, _, _, _}, found when mod == repo or mod in @repo_modules -> {:halt, found}
    frame, _found -> {:cont, frame}
  end)
end
## Connection helpers
# Calls `DBConnection.<fun>/3` on the process' current connection or the pool.
#
# While `callback` runs, the connection handed over by DBConnection is stored
# for this pool in the process dictionary; whatever was stored before is put
# back afterwards, even when the callback raises.
defp checkout_or_transaction(fun, adapter_meta, opts, callback) do
  %{pid: pool, telemetry: telemetry, opts: default_opts} = adapter_meta
  merged_opts = with_log(telemetry, [], opts ++ default_opts)
  target = get_conn_or_pool(pool, adapter_meta)

  run_with_conn = fn conn ->
    previous = put_conn(pool, conn)

    try do
      callback.()
    after
      reset_conn(pool, previous)
    end
  end

  apply(DBConnection, fun, [target, run_with_conn, merged_opts])
end
# Returns the connection stored for `pool` in the process dictionary.
#
# When nothing is stored, falls back to a `{:via, PartitionSupervisor, ...}`
# name keyed by the calling process if `adapter_meta` has a
# `:partition_supervisor` entry, and to `pool` itself otherwise.
defp get_conn_or_pool(pool, adapter_meta) do
  case {:erlang.get(key(pool)), adapter_meta} do
    {:undefined, %{partition_supervisor: {name, _}}} ->
      {:via, PartitionSupervisor, {name, self()}}

    {:undefined, _meta} ->
      pool

    {conn, _meta} ->
      conn
  end
end
# Reads the connection stored for `pool` in the process dictionary (nil if none).
defp get_conn(pool) do
  pool
  |> key()
  |> Process.get()
end
# Stores `conn` for `pool` in the process dictionary and returns the value
# that was stored before (nil if none).
defp put_conn(pool, conn) do
  pool
  |> key()
  |> Process.put(conn)
end
# Restores the process dictionary entry for `pool`: a truthy `conn` is stored
# again, while nil or false removes the entry.
defp reset_conn(pool, conn) do
  if is_nil(conn) or conn == false do
    Process.delete(key(pool))
  else
    put_conn(pool, conn)
  end
end
# Process dictionary key under which the connection for `pool` is kept.
defp key(pool) do
  {__MODULE__, pool}
end
# Picks the ANSI color for a logged query from its leading keyword.
# Matching is case-sensitive; queries with no matching prefix get no color (nil).
defp sql_color(query) do
  case query do
    "SELECT" <> _ -> :cyan
    "ROLLBACK" <> _ -> :red
    "LOCK" <> _ -> :white
    "INSERT" <> _ -> :green
    "UPDATE" <> _ -> :yellow
    "DELETE" <> _ -> :red
    "begin" <> _ -> :magenta
    "commit" <> _ -> :magenta
    _other -> nil
  end
end
end