defmodule Ecto.Adapters.SQL do
@moduledoc ~S"""
This application provides functionality for working with
SQL databases in `Ecto`.
## Built-in adapters
By default, we support the following adapters:
* `Ecto.Adapters.Postgres` for Postgres
* `Ecto.Adapters.MyXQL` for MySQL
* `Ecto.Adapters.Tds` for SQLServer
## Additional functions
If your `Ecto.Repo` is backed by any of the SQL adapters above,
this module will inject additional functions into your repository:
* `disconnect_all(interval, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.disconnect_all/3`
* `explain(type, query, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.explain/4`
* `query(sql, params, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.query/4`
* `query!(sql, params, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.query!/4`
* `query_many(sql, params, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.query_many/4`
* `query_many!(sql, params, options \\ [])` -
shortcut for `Ecto.Adapters.SQL.query_many!/4`
* `to_sql(type, query)` -
shortcut for `Ecto.Adapters.SQL.to_sql/3`
Generally speaking, you must invoke those functions directly from
your repository, for example: `MyApp.Repo.query("SELECT true")`.
You can also invoke them directly from `Ecto.Adapters.SQL`, but
keep in mind that in such cases features such as "dynamic repositories"
won't be available.
## Migrations
`ecto_sql` supports database migrations. You can generate a migration
with:
$ mix ecto.gen.migration create_posts
This will create a new file inside `priv/repo/migrations` with the
`change` function. Check `Ecto.Migration` for more information.
To interface with migrations, developers typically use mix tasks:
* `mix ecto.migrations` - lists all available migrations and their status
* `mix ecto.migrate` - runs a migration
* `mix ecto.rollback` - rolls back a previously run migration
If you want to run migrations programmatically, see `Ecto.Migrator`.
## SQL sandbox
`ecto_sql` provides a sandbox for testing. The sandbox wraps each
test in a transaction, making sure the tests are isolated and can
run concurrently. See `Ecto.Adapters.SQL.Sandbox` for more information.
## Structure load and dumping
If you have an existing database, you may want to dump its existing
structure and make it reproducible from within Ecto. This can be
achieved with two Mix tasks:
* `mix ecto.load` - loads an existing structure into the database
* `mix ecto.dump` - dumps the existing database structure to the filesystem
For creating and dropping databases, see `mix ecto.create`
and `mix ecto.drop` that are included as part of Ecto.
## Custom adapters
Developers can implement their own SQL adapters by using
`Ecto.Adapters.SQL` and by implementing the callbacks required
by `Ecto.Adapters.SQL.Connection` for handling connections and
performing queries. The connection handling and pooling for SQL
adapters should be built using the `DBConnection` library.
When using `Ecto.Adapters.SQL`, the following options are required:
* `:driver` (required) - the database driver library.
For example: `:postgrex`
"""
require Logger
@doc false
# Injects the shared SQL adapter implementation into the module that calls
# `use Ecto.Adapters.SQL, driver: ...`.
#
# The `:driver` option is mandatory (it is read with `Keyword.fetch!/2`).
# Almost every generated callback simply delegates to the function of the same
# name in `Ecto.Adapters.SQL`, passing along `@conn` and/or `@driver` where
# the shared implementation needs them.
defmacro __using__(opts) do
quote do
@behaviour Ecto.Adapter
@behaviour Ecto.Adapter.Migration
@behaviour Ecto.Adapter.Queryable
@behaviour Ecto.Adapter.Schema
@behaviour Ecto.Adapter.Transaction
opts = unquote(opts)
# The connection module is looked up by naming convention: `<Adapter>.Connection`.
@conn __MODULE__.Connection
@driver Keyword.fetch!(opts, :driver)
@impl true
defmacro __before_compile__(env) do
Ecto.Adapters.SQL.__before_compile__(@driver, env)
end
@impl true
def ensure_all_started(config, type) do
Ecto.Adapters.SQL.ensure_all_started(@driver, config, type)
end
@impl true
def init(config) do
Ecto.Adapters.SQL.init(@conn, @driver, config)
end
@impl true
def checkout(meta, opts, fun) do
Ecto.Adapters.SQL.checkout(meta, opts, fun)
end
@impl true
def checked_out?(meta) do
Ecto.Adapters.SQL.checked_out?(meta)
end
# Loaders: maps go through `Ecto.Type.embedded_load/3` with the `:json`
# format, binary ids are passed through `Ecto.UUID` before the declared type,
# and every other type is loaded as-is.
@impl true
def loaders({:map, _}, type), do: [&Ecto.Type.embedded_load(type, &1, :json)]
def loaders(:binary_id, type), do: [Ecto.UUID, type]
def loaders(_, type), do: [type]
# Dumpers mirror the loaders, with the binary id steps in the opposite order.
@impl true
def dumpers({:map, _}, type), do: [&Ecto.Type.embedded_dump(type, &1, :json)]
def dumpers(:binary_id, type), do: [type, Ecto.UUID]
def dumpers(_, type), do: [type]
## Query
# Each `prepare/2` clause returns `{:cache, {id, sql}}`, where `id` is a
# unique positive integer and `sql` is the statement generated by `@conn`,
# flattened from iodata into a binary.
@impl true
def prepare(:all, query) do
{:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.all(query))}}
end
def prepare(:update_all, query) do
{:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.update_all(query))}}
end
def prepare(:delete_all, query) do
{:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.delete_all(query))}}
end
# Queries are executed in `:named` mode by default; the callback is
# overridable (see `defoverridable` below).
@impl true
def execute(adapter_meta, query_meta, query, params, opts) do
Ecto.Adapters.SQL.execute(:named, adapter_meta, query_meta, query, params, opts)
end
@impl true
def stream(adapter_meta, query_meta, query, params, opts) do
Ecto.Adapters.SQL.stream(adapter_meta, query_meta, query, params, opts)
end
## Schema
# `:id` values are not generated here (nil); the other id kinds are generated
# through `Ecto.UUID`.
@impl true
def autogenerate(:id), do: nil
def autogenerate(:embed_id), do: Ecto.UUID.generate()
def autogenerate(:binary_id), do: Ecto.UUID.bingenerate()
@impl true
def insert_all(adapter_meta, schema_meta, header, rows, on_conflict, returning, placeholders, opts) do
Ecto.Adapters.SQL.insert_all(adapter_meta, schema_meta, @conn, header, rows, on_conflict, returning, placeholders, opts)
end
# Single-row insert: the statement is generated for one row (`[fields]`) and
# executed with the field values followed by the on-conflict parameters.
@impl true
def insert(adapter_meta, %{source: source, prefix: prefix}, params,
{kind, conflict_params, _} = on_conflict, returning, opts) do
{fields, values} = :lists.unzip(params)
sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning, [])
Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
end
# Update: nil filter values are dropped from the parameter list, while the
# full `params` filter list is still handed to `@conn.update/5`.
# NOTE(review): presumably nil filters are rendered without a placeholder
# (e.g. IS NULL) by the connection module - confirm against the adapter.
@impl true
def update(adapter_meta, %{source: source, prefix: prefix}, fields, params, returning, opts) do
{fields, field_values} = :lists.unzip(fields)
filter_values = params |> Keyword.values() |> Enum.reject(&is_nil(&1))
sql = @conn.update(prefix, source, fields, params, returning)
Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :update, source, params, field_values ++ filter_values, :raise, returning, opts)
end
# Delete: same nil-filter handling as update; nothing is returned (`[]`).
@impl true
def delete(adapter_meta, %{source: source, prefix: prefix}, params, opts) do
filter_values = params |> Keyword.values() |> Enum.reject(&is_nil(&1))
sql = @conn.delete(prefix, source, params, [])
Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :delete, source, params, filter_values, :raise, [], opts)
end
## Transaction
@impl true
def transaction(meta, opts, fun) do
Ecto.Adapters.SQL.transaction(meta, opts, fun)
end
@impl true
def in_transaction?(meta) do
Ecto.Adapters.SQL.in_transaction?(meta)
end
@impl true
def rollback(meta, value) do
Ecto.Adapters.SQL.rollback(meta, value)
end
## Migration
@impl true
def execute_ddl(meta, definition, opts) do
Ecto.Adapters.SQL.execute_ddl(meta, @conn, definition, opts)
end
# Adapters may replace any of these generated definitions with their own.
defoverridable [prepare: 2, execute: 5, insert: 6, update: 6, delete: 4, insert_all: 8,
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1,
ensure_all_started: 2, __before_compile__: 1]
end
end
@doc """
Converts the given query to SQL according to its kind and the
adapter in the given repository.
## Examples
The examples below are meant for reference. Each adapter will
return a different result:
iex> Ecto.Adapters.SQL.to_sql(:all, Repo, Post)
{"SELECT p.id, p.title, p.inserted_at, p.created_at FROM posts as p", []}
iex> Ecto.Adapters.SQL.to_sql(:update_all, Repo,
from(p in Post, update: [set: [title: ^"hello"]]))
{"UPDATE posts AS p SET title = $1", ["hello"]}
This function is also available under the repository with name `to_sql`:
iex> Repo.to_sql(:all, Post)
{"SELECT p.id, p.title, p.inserted_at, p.created_at FROM posts as p", []}
"""
@spec to_sql(:all | :update_all | :delete_all, Ecto.Repo.t, Ecto.Queryable.t) ::
        {String.t, [term]}
def to_sql(kind, repo, queryable) do
  # The prepared entry comes back in one of three shapes (`:nocache`, `:cache`
  # or `:cached`) paired with the query parameters. Only an already cached
  # entry needs converting to a string through `String.Chars`.
  kind
  |> Ecto.Adapter.Queryable.prepare_query(repo, queryable)
  |> case do
    {{:nocache, {_id, statement}}, bindings} ->
      {statement, bindings}

    {{:cache, _update, {_id, statement}}, bindings} ->
      {statement, bindings}

    {{:cached, _update, _reset, {_id, cached_query}}, bindings} ->
      {String.Chars.to_string(cached_query), bindings}
  end
end
@doc """
Executes an EXPLAIN statement or similar for the given query according to its kind and the
adapter in the given repository.
## Examples
# Postgres
iex> Ecto.Adapters.SQL.explain(Repo, :all, Post)
"Seq Scan on posts p0 (cost=0.00..12.12 rows=1 width=443)"
# MySQL
iex> Ecto.Adapters.SQL.explain(Repo, :all, from(p in Post, where: p.title == "title")) |> IO.puts()
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
| 1 | SIMPLE | p0 | NULL | ALL | NULL | NULL | NULL | NULL | 1 | 100.0 | Using where |
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
# Shared opts
iex> Ecto.Adapters.SQL.explain(Repo, :all, Post, analyze: true, timeout: 20_000)
"Seq Scan on posts p0 (cost=0.00..11.70 rows=170 width=443) (actual time=0.013..0.013 rows=0 loops=1)\\nPlanning Time: 0.031 ms\\nExecution Time: 0.021 ms"
It's safe to execute it for updates and deletes, no data change will be committed:
iex> Ecto.Adapters.SQL.explain(Repo, :update_all, from(p in Post, update: [set: [title: "new title"]]))
"Update on posts p0 (cost=0.00..11.70 rows=170 width=449)\\n -> Seq Scan on posts p0 (cost=0.00..11.70 rows=170 width=449)"
This function is also available under the repository with name `explain`:
iex> Repo.explain(:all, from(p in Post, where: p.title == "title"))
"Seq Scan on posts p0 (cost=0.00..12.12 rows=1 width=443)\\n Filter: ((title)::text = 'title'::text)"
### Options
Built-in adapters support passing `opts` to the EXPLAIN statement according to the following:
Adapter | Supported opts
---------------- | --------------
Postgrex | `analyze`, `verbose`, `costs`, `settings`, `buffers`, `timing`, `summary`
MyXQL | None
_Postgrex_: Check [PostgreSQL doc](https://www.postgresql.org/docs/current/sql-explain.html)
for version compatibility.
_MyXQL_: `EXTENDED` and `PARTITIONS` opts were [deprecated](https://dev.mysql.com/doc/refman/5.7/en/explain.html)
and are enabled by default.
Also note that:
* Currently `:map`, `:yaml`, and `:text` format options are supported
for PostgreSQL. `:map` is the deserialized JSON encoding. The last two
options return the result as a string;
* Any other value passed to `opts` will be forwarded to the underlying
adapter query function, including Repo shared options such as `:timeout`;
* Non built-in adapters may have specific behavior and you should consult
their own documentation.
"""
@spec explain(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta,
              :all | :update_all | :delete_all,
              Ecto.Queryable.t, opts :: Keyword.t) :: String.t | Exception.t
def explain(repo, operation, queryable, opts \\ [])

# A repo name or pid is first resolved to its adapter metadata.
def explain(repo, operation, queryable, opts) when is_atom(repo) or is_pid(repo) do
  repo
  |> Ecto.Adapter.lookup_meta()
  |> explain(operation, queryable, opts)
end

def explain(%{repo: repo} = adapter_meta, operation, queryable, opts) do
  # First step: build the SQL and hand it to the adapter's `explain_query`.
  run_explain = fn _repo, _changes ->
    {statement, bindings} = to_sql(operation, repo, queryable)
    sql_call(adapter_meta, :explain_query, [statement], bindings, opts)
  end

  # Second step: always fail, so the surrounding transaction is rolled back.
  force_rollback = fn _repo, _changes -> {:error, :forced_rollback} end

  multi =
    Ecto.Multi.new()
    |> Ecto.Multi.run(:explain, run_explain)
    |> Ecto.Multi.run(:rollback, force_rollback)

  case repo.transaction(multi, opts) do
    {:error, :rollback, :forced_rollback, %{explain: result}} -> result
    {:error, :explain, error, _changes} -> raise error
    _other -> raise "unable to execute explain"
  end
end
@doc """
Forces all connections in the repo pool to disconnect within the given interval.
Once this function is called, the pool will disconnect all of its connections
as they are checked in or as they are pinged. Checked in connections will be
randomly disconnected within the given time interval. Pinged connections are
immediately disconnected - as they are idle (according to `:idle_interval`).
If the connection has a backoff configured (which is the case by default),
disconnecting means an attempt at a new connection will be done immediately
after, without starting a new process for each connection. However, if backoff
has been disabled, the connection process will terminate. In such cases,
disconnecting all connections may cause the pool supervisor to restart
depending on the max_restarts/max_seconds configuration of the pool,
so you will want to set those carefully.
For convenience, this function is also available in the repository:
iex> MyRepo.disconnect_all(60_000)
:ok
"""
@spec disconnect_all(pid | Ecto.Repo.t | Ecto.Adapter.adapter_meta, non_neg_integer, opts :: Keyword.t()) :: :ok
def disconnect_all(repo, interval, opts \\ [])

# Adapter metadata already carries the pool pid; hand it to DBConnection.
def disconnect_all(%{pid: pool}, interval, opts) do
  DBConnection.disconnect_all(pool, interval, opts)
end

# A repo name or pid is resolved to its adapter metadata first.
def disconnect_all(repo, interval, opts) when is_atom(repo) or is_pid(repo) do
  repo
  |> Ecto.Adapter.lookup_meta()
  |> disconnect_all(interval, opts)
end
@doc """
Returns a stream that runs a custom SQL query on given repo when reduced.
In case of success it is an enumerable containing maps with at least two keys:
* `:num_rows` - the number of rows affected
* `:rows` - the result set as a list. `nil` may be returned
instead of the list if the command does not yield any row
as result (but still yields the number of affected rows,
like a `delete` command without returning would)
In case of failure it raises an exception.
If the adapter supports a collectable stream, the stream may also be used as
the collectable in `Enum.into/3`. Behaviour depends on the adapter.
## Options
* `:log` - When false, does not log the query
* `:max_rows` - The number of rows to load from the database as we stream
## Examples
iex> Ecto.Adapters.SQL.stream(MyRepo, "SELECT $1::integer + $2", [40, 2]) |> Enum.to_list()
[%{rows: [[42]], num_rows: 1}]
"""
@spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t
def stream(repo, sql, params \\ [], opts \\ []) do
  # Resolve the repo to its adapter metadata, then build the stream from it.
  adapter_meta = Ecto.Adapter.lookup_meta(repo)
  Ecto.Adapters.SQL.Stream.build(adapter_meta, sql, params, opts)
end
@doc """
Same as `query/4` but raises on invalid queries.
On success the result map is returned directly instead of an `{:ok, result}` tuple.
Like `query/4`, the first argument may be a repository name, a pid or the adapter metadata.
"""
@spec query!(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) ::
        %{:rows => nil | [[term] | binary],
          :num_rows => non_neg_integer,
          optional(atom) => any}
def query!(repo, sql, params \\ [], opts \\ []) do
  # `query/4` does the repo resolution; only unwrapping and raising happen here.
  case query(repo, sql, params, opts) do
    {:ok, result} -> result
    {:error, err} -> raise_sql_call_error(err)
  end
end
@doc """
Runs a custom SQL query on the given repo.
In case of success, it must return an `:ok` tuple containing
a map with at least two keys:
* `:num_rows` - the number of rows affected
* `:rows` - the result set as a list. `nil` may be returned
instead of the list if the command does not yield any row
as result (but still yields the number of affected rows,
like a `delete` command without returning would)
## Options
* `:log` - When false, does not log the query
## Examples
iex> Ecto.Adapters.SQL.query(MyRepo, "SELECT $1::integer + $2", [40, 2])
{:ok, %{rows: [[42]], num_rows: 1}}
For convenience, this function is also available under the repository:
iex> MyRepo.query("SELECT $1::integer + $2", [40, 2])
{:ok, %{rows: [[42]], num_rows: 1}}
"""
@spec query(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) ::
        {:ok, %{:rows => nil | [[term] | binary],
                :num_rows => non_neg_integer,
                optional(atom) => any}}
        | {:error, Exception.t}
def query(repo, sql, params \\ [], opts \\ [])

# A repo name or pid is resolved to its adapter metadata before querying.
def query(repo, sql, params, opts) when is_atom(repo) or is_pid(repo) do
  repo
  |> Ecto.Adapter.lookup_meta()
  |> query(sql, params, opts)
end

# Anything else is treated as adapter metadata and dispatched to the
# connection module's `query` callback.
def query(adapter_meta, sql, params, opts),
  do: sql_call(adapter_meta, :query, [sql], params, opts)
@doc """
Same as `query_many/4` but raises on invalid queries.
On success the list of result maps is returned directly instead of an `{:ok, results}` tuple.
Like `query_many/4`, the first argument may be a repository name, a pid or the adapter metadata.
"""
@spec query_many!(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) ::
        [%{:rows => nil | [[term] | binary],
           :num_rows => non_neg_integer,
           optional(atom) => any}]
def query_many!(repo, sql, params \\ [], opts \\ []) do
  # `query_many/4` does the repo resolution; only unwrapping and raising happen here.
  case query_many(repo, sql, params, opts) do
    {:ok, result} -> result
    {:error, err} -> raise_sql_call_error(err)
  end
end
@doc """
Runs a custom SQL query that returns multiple results on the given repo.
In case of success, it must return an `:ok` tuple containing
a list of maps with at least two keys:
* `:num_rows` - the number of rows affected
* `:rows` - the result set as a list. `nil` may be returned
instead of the list if the command does not yield any row
as result (but still yields the number of affected rows,
like a `delete` command without returning would)
## Options
* `:log` - When false, does not log the query
## Examples
iex> Ecto.Adapters.SQL.query_many(MyRepo, "SELECT $1; SELECT $2;", [40, 2])
{:ok, [%{rows: [[40]], num_rows: 1}, %{rows: [[2]], num_rows: 1}]}
For convenience, this function is also available under the repository:
iex> MyRepo.query_many("SELECT $1; SELECT $2;", [40, 2])
{:ok, [%{rows: [[40]], num_rows: 1}, %{rows: [[2]], num_rows: 1}]}
"""
@spec query_many(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) ::
        {:ok, [%{:rows => nil | [[term] | binary],
                 :num_rows => non_neg_integer,
                 optional(atom) => any}]}
        | {:error, Exception.t}
def query_many(repo, sql, params \\ [], opts \\ [])

# A repo name or pid is resolved to its adapter metadata before querying.
def query_many(repo, sql, params, opts) when is_atom(repo) or is_pid(repo) do
  repo
  |> Ecto.Adapter.lookup_meta()
  |> query_many(sql, params, opts)
end

# Anything else is treated as adapter metadata and dispatched to the
# connection module's `query_many` callback.
def query_many(adapter_meta, sql, params, opts),
  do: sql_call(adapter_meta, :query_many, [sql], params, opts)
# Invokes `callback` on the adapter's connection module (`sql`).
#
# The call receives the current connection or pool, then `args`, then the
# query params and finally the options. Caller options take precedence over
# the adapter defaults, and a `:log` function is prepended by `with_log/3`.
defp sql_call(adapter_meta, callback, args, params, opts) do
  %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta

  conn_or_pool = get_conn_or_pool(pool)
  logged_opts = with_log(telemetry, params, opts ++ default_opts)

  apply(sql, callback, [conn_or_pool | args] ++ [params, logged_opts])
end
# Prepends `source: name` to `opts` when the first entry of the query's
# `sources` tuple starts with a binary; otherwise `opts` is returned untouched.
# The guard also rejects (rather than raises on) metadata whose `sources` is
# not a non-empty tuple of tuples.
defp put_source(opts, %{sources: sources}) when is_binary(elem(elem(sources, 0), 0)) do
  {source, _second, _third} = elem(sources, 0)
  [{:source, source} | opts]
end

defp put_source(opts, _query_meta), do: opts
@doc """
Check if the given `table` exists.
Returns `true` if the `table` exists in the `repo`, otherwise `false`.
The table is checked against the current database/schema in the connection.
## Options
Any given `opts` are forwarded to the underlying query, for example
`:timeout` or `:log`. Defaults to `[]`.
"""
@spec table_exists?(Ecto.Repo.t, table :: String.t, opts :: Keyword.t) :: boolean
def table_exists?(repo, table, opts \\ []) when is_atom(repo) do
  %{sql: sql} = adapter_meta = Ecto.Adapter.lookup_meta(repo)
  # The connection module supplies the adapter-specific existence query.
  {query, params} = sql.table_exists_query(table)
  # Any returned row means the table was found.
  query!(adapter_meta, query, params, opts).num_rows != 0
end
# Returns a formatted table for a given query `result`.
#
# ## Examples
#
# iex> Ecto.Adapters.SQL.format_table(query) |> IO.puts()
# +---------------+---------+--------+
# | title | counter | public |
# +---------------+---------+--------+
# | My Post Title | 1 | NULL |
# +---------------+---------+--------+
@doc false
@spec format_table(%{:columns => [String.t] | nil, :rows => [term()] | nil, optional(atom) => any()}) :: String.t
def format_table(result)

# No result, or a result without columns, renders as an empty string.
def format_table(nil), do: ""
def format_table(%{columns: nil}), do: ""
def format_table(%{columns: []}), do: ""

# A result without rows is rendered as a header-only table.
def format_table(%{columns: columns, rows: nil}) do
  format_table(%{columns: columns, rows: []})
end

def format_table(%{columns: columns, rows: rows}) do
  # Width of each column: the widest rendered cell, header included.
  column_widths =
    for column <- List.zip([columns | rows]) do
      column
      |> Tuple.to_list()
      |> Enum.map(&binary_length/1)
      |> Enum.max()
    end

  border = separator(column_widths)
  header = cells(columns, column_widths)
  body = for row <- rows, do: cells(row, column_widths) ++ ["\n"]

  IO.iodata_to_binary([border, "\n", header, "\n", border, "\n", body, border])
end

# Display width of a cell: nil renders as "NULL" (4), binaries are measured
# as-is, everything else is measured after `inspect/1`.
defp binary_length(value) do
  cond do
    is_nil(value) -> 4
    is_binary(value) -> String.length(value)
    true -> String.length(inspect(value))
  end
end

# Horizontal border such as `+-----+---+` for the given column widths.
defp separator(widths) do
  segments = for width <- widths, do: [?+, ?-, String.duplicate("-", width), ?-]
  segments ++ [?+]
end

# One table line: every item padded to its column width between `|` bars.
defp cells(items, widths) do
  row =
    for {item, width} <- List.zip([items, widths]) do
      [?|, " ", format_item(item, width), " "]
    end

  [row, ?|]
end

# Numbers are right-aligned; nil ("NULL"), binaries and any other term
# (rendered with `inspect/1`) are left-aligned.
defp format_item(item, width) do
  cond do
    is_nil(item) -> String.pad_trailing("NULL", width)
    is_binary(item) -> String.pad_trailing(item, width)
    is_number(item) -> String.pad_leading(inspect(item), width)
    true -> String.pad_trailing(inspect(item), width)
  end
end
## Callbacks
@doc false
# Shared implementation behind the `__before_compile__/1` macro generated by
# `__using__/1`.
#
# It first warns when the deprecated `:json_library` key is still configured
# for the `:ecto` application, then returns the quoted convenience functions
# (`query/3`, `query!/3`, `query_many/3`, `query_many!/3`, `to_sql/2`,
# `explain/3` and `disconnect_all/2`). Each of them forwards to the matching
# `Ecto.Adapters.SQL` function, passing `get_dynamic_repo()` as the repo.
def __before_compile__(driver, _env) do
# Deprecation check only: the configured value is reported, never used.
case Application.get_env(:ecto, :json_library) do
nil ->
:ok
Jason ->
IO.warn """
Jason is the default :json_library in Ecto 3.0.
You no longer need to configure it explicitly,
please remove this line from your config files:
config :ecto, :json_library, Jason
"""
value ->
IO.warn """
The :json_library configuration for the :ecto application is deprecated.
Please configure the :json_library in the driver instead:
config #{inspect driver}, :json_library, #{inspect value}
"""
end
# Quoted definitions returned to the caller of the macro.
quote do
@doc """
A convenience function for SQL-based repositories that executes the given query.
See `Ecto.Adapters.SQL.query/4` for more information.
"""
def query(sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.query(get_dynamic_repo(), sql, params, opts)
end
@doc """
A convenience function for SQL-based repositories that executes the given query.
See `Ecto.Adapters.SQL.query!/4` for more information.
"""
def query!(sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.query!(get_dynamic_repo(), sql, params, opts)
end
@doc """
A convenience function for SQL-based repositories that executes the given multi-result query.
See `Ecto.Adapters.SQL.query_many/4` for more information.
"""
def query_many(sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.query_many(get_dynamic_repo(), sql, params, opts)
end
@doc """
A convenience function for SQL-based repositories that executes the given multi-result query.
See `Ecto.Adapters.SQL.query_many!/4` for more information.
"""
def query_many!(sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.query_many!(get_dynamic_repo(), sql, params, opts)
end
@doc """
A convenience function for SQL-based repositories that translates the given query to SQL.
See `Ecto.Adapters.SQL.to_sql/3` for more information.
"""
def to_sql(operation, queryable) do
Ecto.Adapters.SQL.to_sql(operation, get_dynamic_repo(), queryable)
end
@doc """
A convenience function for SQL-based repositories that executes an EXPLAIN statement or similar
depending on the adapter to obtain statistics for the given query.
See `Ecto.Adapters.SQL.explain/4` for more information.
"""
def explain(operation, queryable, opts \\ []) do
Ecto.Adapters.SQL.explain(get_dynamic_repo(), operation, queryable, opts)
end
@doc """
A convenience function for SQL-based repositories that forces all connections in the
pool to disconnect within the given interval.
See `Ecto.Adapters.SQL.disconnect_all/3` for more information.
"""
def disconnect_all(interval, opts \\ []) do
Ecto.Adapters.SQL.disconnect_all(get_dynamic_repo(), interval, opts)
end
end
end
@doc false
# Starts the driver application and its dependencies; the repo config is not
# needed for that and is ignored.
def ensure_all_started(driver, _config, type),
  do: Application.ensure_all_started(driver, type)
# Configuration keys that are copied into the adapter metadata as the default
# options of every SQL call.
@pool_opts [
  :timeout,
  :pool,
  :pool_size,
  :queue_target,
  :queue_interval,
  :ownership_timeout,
  :repo
]

@doc false
# Builds the child spec and adapter metadata for a repo.
#
# Raises when the `connection` module cannot be loaded, and raises a
# `KeyError` when `:telemetry_prefix` is missing from `config`.
def init(connection, driver, config) do
  if not Code.ensure_loaded?(connection) do
    raise """
    could not find #{inspect connection}.
    Please verify you have added #{inspect driver} as a dependency:
    {#{inspect driver}, ">= 0.0.0"}
    And remember to recompile Ecto afterwards by cleaning the current build:
    mix deps.clean --build ecto
    """
  end

  log_level = Keyword.get(config, :log, :debug)
  stacktrace = Keyword.get(config, :stacktrace)
  prefix = Keyword.fetch!(config, :telemetry_prefix)
  # Telemetry is read from the raw config, before `adapter_config/1` runs.
  telemetry = {config[:repo], log_level, prefix ++ [:query]}

  pool_config = adapter_config(config)

  meta = %{
    telemetry: telemetry,
    sql: connection,
    stacktrace: stacktrace,
    opts: Keyword.take(pool_config, @pool_opts)
  }

  {:ok, connection.child_spec(pool_config), meta}
end
# Normalizes the repo configuration before it reaches the connection module:
# warns about the obsolete `:pool_timeout` key, drops `:name`, and makes sure a
# `:pool` is set (an existing value goes through `normalize_pool/1`).
defp adapter_config(config) do
  if Keyword.has_key?(config, :pool_timeout) do
    IO.warn("""
    :pool_timeout option no longer has an effect and has been replaced with an improved queuing system.
    See \"Queue config\" in DBConnection.start_link/2 documentation for more information.
    """)
  end

  without_name = Keyword.delete(config, :name)
  Keyword.update(without_name, :pool, DBConnection.ConnectionPool, &normalize_pool/1)
end
# Swaps a pool module that exports `unboxed_run/2` for `DBConnection.Ownership`;
# any other pool is returned unchanged.
defp normalize_pool(pool) do
  exports_unboxed_run? =
    Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2)

  if exports_unboxed_run?, do: DBConnection.Ownership, else: pool
end
@doc false
# Runs `callback` on a checked-out connection, in `:run` mode rather than
# `:transaction` mode.
def checkout(adapter_meta, opts, callback),
  do: checkout_or_transaction(:run, adapter_meta, opts, callback)
@doc false
# True when `get_conn/1` finds a connection for the repo's pool.
def checked_out?(adapter_meta) do
  %{pid: pool} = adapter_meta
  not is_nil(get_conn(pool))
end
## Query
@doc false
# Shared `insert_all` implementation: builds the INSERT statement through the
# connection module, executes it and returns `{num_rows, rows}`.
def insert_all(adapter_meta, schema_meta, conn, header, rows, on_conflict, returning, placeholders, opts) do
  %{source: source, prefix: prefix} = schema_meta
  {_kind, conflict_params, _} = on_conflict

  # Rows are either a source query with its parameters or a list of field
  # lists. Both paths produce the parameters reversed; the `Enum.reverse/2`
  # below restores the order while appending the conflict parameters.
  {insert_rows, reversed_params} =
    case rows do
      {%Ecto.Query{} = query, query_params} -> {query, Enum.reverse(query_params)}
      field_rows -> unzip_inserts(header, field_rows)
    end

  sql = conn.insert(prefix, source, header, insert_rows, on_conflict, returning, placeholders)

  # Give the statement a default cache name unless the caller set one.
  opts =
    case Keyword.get(opts, :cache_statement) do
      nil -> [{:cache_statement, "ecto_insert_all_#{source}"} | opts]
      _ -> opts
    end

  all_params = placeholders ++ Enum.reverse(reversed_params, conflict_params)
  %{num_rows: num, rows: returned} = query!(adapter_meta, sql, all_params, opts)
  {num, returned}
end
# Walks every row in `header` order and returns `{rows, params}`.
#
# Each cell of the returned rows says how the value is supplied: `nil` when
# the row has no value for the key, the key itself for a plain value,
# `{query, param_count}` for a subquery, or `{:placeholder, index_string}`.
# `params` accumulates the plain values and subquery parameters in reverse
# order.
defp unzip_inserts(header, rows) do
  Enum.map_reduce(rows, [], fn fields, params ->
    Enum.map_reduce(header, params, fn key, acc ->
      case List.keyfind(fields, key, 0) do
        nil ->
          {nil, acc}

        {^key, {%Ecto.Query{} = query, query_params}} ->
          {{query, length(query_params)}, Enum.reverse(query_params, acc)}

        {^key, {:placeholder, placeholder_index}} ->
          {{:placeholder, Integer.to_string(placeholder_index)}, acc}

        {^key, value} ->
          {key, [value | acc]}
      end
    end)
  end)
end
@doc false
# Executes a prepared query and returns `{num_rows, rows}`. The query source,
# when available, is added to the options first.
def execute(prepare, adapter_meta, query_meta, prepared, params, opts) do
  opts_with_source = put_source(opts, query_meta)
  result = execute!(prepare, adapter_meta, prepared, params, opts_with_source)

  %{num_rows: num, rows: rows} = result
  {num, rows}
end
# Runs a query according to its cache state and returns the raw result,
# raising through `raise_sql_call_error/1` on failure.

# Not cached yet: prepare and execute in one step, then store the prepared
# query in the cache (a no-op for `:unnamed` statements).
defp execute!(prepare, adapter_meta, {:cache, update, {id, prepared}}, params, opts) do
  args = [prepare_name(prepare, id), prepared]

  adapter_meta
  |> sql_call(:prepare_execute, args, params, opts)
  |> case do
    {:ok, query, result} ->
      maybe_update_cache(prepare, update, {id, query})
      result

    {:error, err} ->
      raise_sql_call_error(err)
  end
end

# Cached but unnamed: the cached entry is turned back into a string and
# prepared again on every execution.
defp execute!(:unnamed = prepare, adapter_meta, {:cached, _update, _reset, {id, cached}}, params, opts) do
  args = [prepare_name(prepare, id), String.Chars.to_string(cached)]

  adapter_meta
  |> sql_call(:prepare_execute, args, params, opts)
  |> case do
    {:ok, _query, result} -> result
    {:error, err} -> raise_sql_call_error(err)
  end
end

# Cached and named: execute the cached query directly. The cache entry is
# refreshed when the adapter returns a new query, and reset back to the plain
# statement when the adapter asks for it.
defp execute!(:named, adapter_meta, {:cached, update, reset, {id, cached}}, params, opts) do
  adapter_meta
  |> sql_call(:execute, [cached], params, opts)
  |> case do
    {:ok, query, result} ->
      update.({id, query})
      result

    {:ok, result} ->
      result

    {:error, err} ->
      raise_sql_call_error(err)

    {:reset, err} ->
      reset.({id, String.Chars.to_string(cached)})
      raise_sql_call_error(err)
  end
end

# Uncacheable: run it as a plain query.
defp execute!(_prepare, adapter_meta, {:nocache, {_id, prepared}}, params, opts) do
  adapter_meta
  |> sql_call(:query, [prepared], params, opts)
  |> case do
    {:ok, result} -> result
    {:error, err} -> raise_sql_call_error(err)
  end
end
# Statement name handed to the adapter: "ecto_<id>" for named statements and
# the empty string for unnamed ones.
defp prepare_name(:named, id) do
  "ecto_" <> Integer.to_string(id)
end

defp prepare_name(:unnamed, _id) do
  ""
end
# Only named statements are written back to the cache; for unnamed ones the
# update function is never called and `:noop` is returned.
defp maybe_update_cache(:named, update, value) do
  update.(value)
end

defp maybe_update_cache(:unnamed, _update, _value) do
  :noop
end
@doc false
# Adapter-level streaming entry point: adds the query source (when available)
# to the options and dispatches on the prepared query's cache state.
def stream(adapter_meta, query_meta, prepared, params, opts) do
  opts_with_source = put_source(opts, query_meta)
  do_stream(adapter_meta, prepared, params, opts_with_source)
end
# Extracts the statement from the prepared query, whatever its cache state,
# and builds the stream from it. A `:cached` entry has to be converted back to
# a string first.
defp do_stream(adapter_meta, {:cache, _update, {_id, statement}}, params, opts) do
  prepare_stream(adapter_meta, statement, params, opts)
end

defp do_stream(adapter_meta, {:cached, _update, _reset, {_id, cached}}, params, opts) do
  statement = String.Chars.to_string(cached)
  prepare_stream(adapter_meta, statement, params, opts)
end

defp do_stream(adapter_meta, {:nocache, {_id, statement}}, params, opts) do
  prepare_stream(adapter_meta, statement, params, opts)
end
# Builds the SQL stream and lazily maps each emitted result map to a
# `{num_rows, rows}` tuple.
defp prepare_stream(adapter_meta, prepared, params, opts) do
  sql_stream = Ecto.Adapters.SQL.Stream.build(adapter_meta, prepared, params, opts)

  Stream.map(sql_stream, fn %{num_rows: count, rows: rows} -> {count, rows} end)
end
# Raises the given exception. Ownership errors get an extra line pointing to
# the sandbox documentation appended to their message first.
defp raise_sql_call_error(%DBConnection.OwnershipError{} = err) do
  hint = "\nSee Ecto.Adapters.SQL.Sandbox docs for more information."
  raise %{err | message: err.message <> hint}
end

defp raise_sql_call_error(err) do
  raise err
end
@doc false
# Reduces over the adapter's stream for `statement`. Raises unless the current
# connection is in transaction mode.
def reduce(adapter_meta, statement, params, opts, acc, fun) do
  %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
  logged_opts = with_log(telemetry, params, opts ++ default_opts)

  case get_conn(pool) do
    %DBConnection{conn_mode: :transaction} = conn ->
      stream = apply(sql, :stream, [conn, statement, params, logged_opts])
      Enumerable.reduce(stream, acc, fun)

    _ ->
      raise "cannot reduce stream outside of transaction"
  end
end
@doc false
# Returns the `Collectable.into/1` result for the adapter's stream. Raises
# unless the current connection is in transaction mode.
def into(adapter_meta, statement, params, opts) do
  %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
  logged_opts = with_log(telemetry, params, opts ++ default_opts)

  case get_conn(pool) do
    %DBConnection{conn_mode: :transaction} = conn ->
      stream = apply(sql, :stream, [conn, statement, params, logged_opts])
      Collectable.into(stream)

    _ ->
      raise "cannot collect into stream outside of transaction"
  end
end
@doc false
# Executes a single-row insert/update/delete and translates the outcome:
#
#   * one affected row   -> `{:ok, returned_fields}`
#   * no affected rows   -> `{:ok, []}` when `on_conflict` is `:nothing`,
#                           otherwise `{:error, :stale}`
#   * more than one row  -> raises `Ecto.MultiplePrimaryKeyError`
#   * database error     -> `{:invalid, constraints}` when the connection
#                           module maps it to constraints, otherwise raises
def struct(adapter_meta, conn, sql, operation, source, params, values, on_conflict, returning, opts) do
  # Give the statement a default cache name unless the caller set one.
  opts =
    case Keyword.get(opts, :cache_statement) do
      nil -> [{:cache_statement, "ecto_#{operation}_#{source}_#{length(params)}"} | opts]
      _ -> opts
    end

  case query(adapter_meta, sql, values, opts) do
    {:ok, %{rows: nil, num_rows: 1}} ->
      {:ok, []}

    {:ok, %{rows: [row], num_rows: 1}} ->
      {:ok, Enum.zip(returning, row)}

    {:ok, %{num_rows: 0}} when on_conflict == :nothing ->
      {:ok, []}

    {:ok, %{num_rows: 0}} ->
      {:error, :stale}

    {:ok, %{num_rows: count}} when count > 1 ->
      raise Ecto.MultiplePrimaryKeyError,
        source: source, params: params, count: count, operation: operation

    {:error, err} ->
      case conn.to_constraints(err, source: source) do
        [] -> raise_sql_call_error(err)
        constraints -> {:invalid, constraints}
      end
  end
end
## Transactions
@doc false
# Runs `callback` through `checkout_or_transaction/4` in `:transaction` mode.
def transaction(adapter_meta, opts, callback),
  do: checkout_or_transaction(:transaction, adapter_meta, opts, callback)
@doc false
# True only when the current connection for the pool is in transaction mode.
def in_transaction?(%{pid: pool}) do
  case get_conn(pool) do
    %DBConnection{conn_mode: :transaction} -> true
    _ -> false
  end
end
@doc false
# Rolls back the current transaction with `value`. Raises when the current
# connection is not in transaction mode.
def rollback(%{pid: pool}, value) do
  conn = get_conn(pool)

  if match?(%DBConnection{conn_mode: :transaction}, conn) do
    DBConnection.rollback(conn, value)
  else
    raise "cannot call rollback outside of transaction"
  end
end
## Migrations
@doc false
# Turns a DDL definition into one or more statements, runs all of them, and
# only then collects the logs the connection module extracts from the results.
def execute_ddl(meta, conn, definition, opts) do
  statements = definition |> conn.execute_ddl() |> List.wrap()

  # Every statement is executed before any result is inspected for logs.
  results = Enum.map(statements, fn statement -> query!(meta, statement, [], opts) end)
  ddl_logs = Enum.flat_map(results, fn result -> conn.ddl_logs(result) end)

  {:ok, ddl_logs}
end
@doc false
# Raises an `Ecto.MigrationError` explaining that migrations need a pool size
# of at least 2 and how to configure it.
def raise_migration_pool_size_error do
  message = """
  Migrations failed to run because the connection pool size is less than 2.
  Ecto requires a pool size of at least 2 to support concurrent migrators.
  When migrations run, Ecto uses one connection to maintain a lock and
  another to run migrations.
  If you are running migrations with Mix, you can increase the number
  of connections via the pool size option:
  mix ecto.migrate --pool-size 2
  If you are running the Ecto.Migrator programmatically, you can configure
  the pool size via your application config:
  config :my_app, Repo,
  ...,
  pool_size: 2 # at least
  """

  raise Ecto.MigrationError, message
end
## Log
# Prepends a `:log` callback to `opts`. The callback receives a log entry and
# forwards it to `log/4` together with the telemetry tuple, the query params
# and the options as they were before the callback was added.
defp with_log(telemetry, params, opts) do
  logger = fn entry -> log(telemetry, params, entry, opts) end
  [{:log, logger} | opts]
end
# Handles one query log entry: emits the telemetry event and, when logging is
# enabled, writes the query to `Logger`.
#
# Arguments:
#   * `{repo, log, event_name}` - the repo, its default log level (or `false`)
#     and its default telemetry event name
#   * `params` - the query parameters; `Ecto.Query.Tagged` wrappers are unwrapped
#   * `entry` - the log entry map (timings, result and query)
#   * `opts` - per-call options; `:source`, `:stacktrace`, `:telemetry_event`,
#     `:telemetry_options` and `:log` are read here
#
# The per-call `:log` option is resolved as follows:
#   * `false` - never log
#   * `nil` / `true` / absent - use the repo default `log`
#   * anything else - used as the level
#
# Nothing is logged when the resolved level is `false`, so `log: true` on a
# repo whose default is `false` no longer calls `Logger.log/3` with `false`.
#
# Always returns `:ok`.
defp log({repo, log, event_name}, params, entry, opts) do
  %{
    connection_time: query_time,
    decode_time: decode_time,
    pool_time: queue_time,
    idle_time: idle_time,
    result: result,
    query: query
  } = entry

  source = Keyword.get(opts, :source)
  query = String.Chars.to_string(query)
  # Drop the query element from successful results; errors pass through as-is.
  result = with {:ok, _query, res} <- result, do: {:ok, res}
  stacktrace = Keyword.get(opts, :stacktrace)

  params =
    Enum.map(params, fn
      %Ecto.Query.Tagged{value: value} -> value
      value -> value
    end)

  # idle_time is only reported when present and is not part of total_time.
  acc = if idle_time, do: [idle_time: idle_time], else: []

  measurements =
    log_measurements(
      [query_time: query_time, decode_time: decode_time, queue_time: queue_time],
      0,
      acc
    )

  metadata = %{
    type: :ecto_sql_query,
    repo: repo,
    result: result,
    params: params,
    query: query,
    source: source,
    stacktrace: stacktrace,
    options: Keyword.get(opts, :telemetry_options, [])
  }

  if event_name = Keyword.get(opts, :telemetry_event, event_name) do
    :telemetry.execute(event_name, measurements, metadata)
  end

  level =
    case Keyword.get(opts, :log) do
      override when override in [nil, true] -> log
      override -> override
    end

  if level do
    Logger.log(
      level,
      fn -> log_iodata(measurements, repo, source, query, params, result, stacktrace) end,
      ansi_color: sql_color(query)
    )
  end

  :ok
end
# Folds `{key, value}` timings into a measurements map.
#
# Entries whose value is `nil` are dropped. Every other value is added to the
# running `total` and kept under its key. The result is a map made of
# `:total_time`, the kept entries and whatever was already in `acc`.
defp log_measurements(entries, total, acc) do
  {sum, kept} =
    Enum.reduce(entries, {total, acc}, fn
      {_key, nil}, state -> state
      {key, value}, {running, collected} -> {running + value, [{key, value} | collected]}
    end)

  Map.new([{:total_time, sum} | kept])
end
# Assembles the iodata for one query log message: a "QUERY OK|ERROR" header
# with the optional source and timings, then the query text, the inspected
# params and the optional caller line.
defp log_iodata(measurements, repo, source, query, params, result, stacktrace) do
  status = log_ok_error(result)
  origin = log_ok_source(source)
  # "db" and "idle" are forced, so they are shown even when they round to 0.0ms.
  db_time = log_time("db", measurements, :query_time, true)
  decode_time = log_time("decode", measurements, :decode_time, false)
  queue_time = log_time("queue", measurements, :queue_time, false)
  idle_time = log_time("idle", measurements, :idle_time, true)
  shown_params = inspect(params, charlists: false)
  caller = log_stacktrace(stacktrace, repo)

  [
    "QUERY",
    ?\s,
    status,
    origin,
    db_time,
    decode_time,
    queue_time,
    idle_time,
    ?\n,
    query,
    ?\s,
    shown_params,
    caller
  ]
end
# Maps a tagged query result to the status word shown in the log header.
defp log_ok_error({:ok, _value}) do
  "OK"
end

defp log_ok_error({:error, _reason}) do
  "ERROR"
end
# Renders the optional ` source=...` fragment of the log header; a `nil`
# source contributes nothing.
defp log_ok_source(nil) do
  ""
end

defp log_ok_source(source) do
  " source=" <> inspect(source)
end
# Renders one ` label=X.Yms` timing fragment as iodata.
#
# The measurement stored under `key` is in native time units and is shown in
# milliseconds, truncated to one decimal place. Returns `[]` when the key is
# absent, or when the value truncates to zero and `force` is `false`.
defp log_time(label, measurements, key, force) do
  case measurements do
    %{^key => native} ->
      ms =
        native
        |> System.convert_time_unit(:native, :microsecond)
        |> div(100)
        |> Kernel./(10)

      cond do
        force or ms > 0 -> [?\s, label, ?=, :io_lib_format.fwrite_g(ms), ?m, ?s]
        true -> []
      end

    %{} ->
      []
  end
end
# Renders the "↳ Module.function/arity, at: file#line" line for the log message.
#
# Returns `[]` when `stacktrace` is not a non-empty list or when
# `last_non_ecto/3` does not yield a four-element stack entry.
defp log_stacktrace(stacktrace, repo) do
  caller =
    case stacktrace do
      [_ | _] -> last_non_ecto(Enum.reverse(stacktrace), repo, nil)
      _ -> nil
    end

  case caller do
    {module, function, arity, info} ->
      [
        ?\n,
        "↳ ",
        Exception.format_mfa(module, function, arity),
        log_stacktrace_info(info)
      ]

    _ ->
      []
  end
end
# Renders the `, at: file#line` suffix when the location info starts with
# `:file` followed by `:line`; any other shape renders nothing.
defp log_stacktrace_info([{:file, file}, {:line, line} | _rest]),
  do: [", at: ", file, ?#, Integer.to_string(line)]

defp log_stacktrace_info(_info), do: []
# Modules treated as repo internals when looking for the calling frame.
@repo_modules [Ecto.Repo.Queryable, Ecto.Repo.Schema, Ecto.Repo.Transaction]

# Walks `frames` in order and returns the entry seen just before the first
# frame whose module is `repo` or one of `@repo_modules`. When no such frame
# exists, the final entry is returned, or the initial `last` for an empty list.
defp last_non_ecto([], _repo, last), do: last

defp last_non_ecto([frame | rest], repo, last) do
  case frame do
    {mod, _, _, _} when mod == repo or mod in @repo_modules -> last
    _ -> last_non_ecto(rest, repo, frame)
  end
end
## Connection helpers
# Calls `DBConnection.<fun>` (`:transaction` when reached from `transaction/3`)
# against the connection already stored for this pool in the current process,
# or against the pool itself when none is stored.
#
# While `callback` runs, the connection handed over by DBConnection is stored
# in the process dictionary. The previous value is restored afterwards, even
# if `callback` raises.
defp checkout_or_transaction(fun, adapter_meta, opts, callback) do
  %{pid: pool, telemetry: telemetry, opts: default_opts} = adapter_meta
  merged_opts = with_log(telemetry, [], opts ++ default_opts)

  run_with_conn = fn conn ->
    previous_conn = put_conn(pool, conn)

    try do
      callback.()
    after
      reset_conn(pool, previous_conn)
    end
  end

  apply(DBConnection, fun, [get_conn_or_pool(pool), run_with_conn, merged_opts])
end
# Returns the connection stored for `pool` in the process dictionary, falling
# back to `pool` itself when nothing is stored.
defp get_conn_or_pool(pool), do: pool |> key() |> Process.get(pool)
# Returns the connection stored for `pool` in the process dictionary, or `nil`
# when nothing is stored.
defp get_conn(pool), do: pool |> key() |> Process.get()
# Stores `conn` for `pool` in the process dictionary and returns the value that
# was stored before (`nil` when there was none).
defp put_conn(pool, conn), do: pool |> key() |> Process.put(conn)
# Restores the process dictionary entry for `pool`: a falsy previous value
# removes the entry, anything else is stored back.
defp reset_conn(pool, previous) when previous in [nil, false] do
  Process.delete(key(pool))
end

defp reset_conn(pool, previous) do
  put_conn(pool, previous)
end
# Process dictionary key under which the connection for `pool` is stored;
# namespaced by this module.
defp key(pool) do
  {__MODULE__, pool}
end
# Query prefixes (matched case-sensitively) and the ANSI color used for each.
@sql_color_prefixes [
  {"SELECT", :cyan},
  {"ROLLBACK", :red},
  {"LOCK", :white},
  {"INSERT", :green},
  {"UPDATE", :yellow},
  {"DELETE", :red},
  {"begin", :magenta},
  {"commit", :magenta}
]

# Picks the ANSI color for a logged query from its leading keyword. Returns
# `nil` when no prefix matches or the argument is not a binary.
defp sql_color(query) when is_binary(query) do
  Enum.find_value(@sql_color_prefixes, fn {prefix, color} ->
    if String.starts_with?(query, prefix), do: color
  end)
end

defp sql_color(_other), do: nil
end