defmodule Ecto.Adapters.DynamoDB do
@moduledoc """
Ecto adapter for Amazon DynamoDB.
"""
@behaviour Ecto.Adapter
@behaviour Ecto.Adapter.Schema
@behaviour Ecto.Adapter.Queryable
@behaviour Ecto.Adapter.Migration
@impl Ecto.Adapter
defmacro __before_compile__(_env) do
# Nothing to see here, yet...
end
use Bitwise, only_operators: true
alias Confex.Resolver
alias Ecto.Adapters.DynamoDB.Cache
alias Ecto.Adapters.DynamoDB.DynamoDBSet
alias Ecto.Adapters.DynamoDB.RepoConfig
alias Ecto.Query.BooleanExpr
alias ExAws.Dynamo
require Logger
@pool_opts [:timeout, :pool_size, :migration_lock]
@max_transaction_conflict_retries Application.get_env(
:ecto_adapters_dynamodb,
:max_transaction_conflict_retries,
10
)
# DynamoDB will reject attempts to batch write more than 25 records at once
# https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
@batch_write_item_limit 25
@impl Ecto.Adapter
def init(config) do
log = Keyword.get(config, :log, :debug)
telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
meta = %{
opts: Keyword.take(config, @pool_opts),
telemetry: {config[:repo], log, telemetry_prefix},
migration_source: Keyword.get(config, :migration_source, "schema_migrations")
}
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.init", %{
"#{inspect(__MODULE__)}.init-params" => %{config: config}
})
{:ok, Cache.child_spec([config[:repo]]), meta}
end
@doc """
Ensure all applications necessary to run the adapter are started.
"""
@impl Ecto.Adapter
def ensure_all_started(config, type) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.ensure_all_started", %{
"#{inspect(__MODULE__)}.ensure_all_started-params" => %{type: type, config: config}
})
with {:ok, _} = Application.ensure_all_started(:ecto_adapters_dynamodb) do
{:ok, [config]}
end
end
@impl Ecto.Adapter.Migration
def supports_ddl_transaction?, do: false
@impl Ecto.Adapter.Migration
def execute_ddl(adapter_meta, command, options) do
Ecto.Adapters.DynamoDB.Migration.execute_ddl(adapter_meta, command, options)
end
@impl Ecto.Adapter.Migration
def lock_for_migrations(%{opts: adapter_opts} = _meta, _opts, callback) do
# TODO - consider adding support for this? See https://github.com/circles-learning-labs/ecto_adapters_dynamodb/issues/34
if Keyword.get(adapter_opts, :migration_lock) do
raise "#{inspect(__MODULE__)}.lock_for_migrations error: #{inspect(__MODULE__)} does not currently support migration table lock; please remove the :migration_lock option from your repo configuration or set it to nil"
else
callback.()
end
end
@impl Ecto.Adapter
def checkout(_meta, _opts, _fun) do
# TODO - consider adding support for this? See https://github.com/circles-learning-labs/ecto_adapters_dynamodb/issues/33
raise "#{inspect(__MODULE__)}.checkout: #{inspect(__MODULE__)} does not currently support checkout"
end
@impl Ecto.Adapter
def checked_out?(_meta), do: false
@impl Ecto.Adapter.Queryable
def stream(_adapter_meta, _query_meta, _query, _params, _opts) do
# TODO - consider adding support for this? See https://github.com/circles-learning-labs/ecto_adapters_dynamodb/issues/32
raise "#{inspect(__MODULE__)}.stream: #{inspect(__MODULE__)} does not currently support stream"
end
@doc """
Called to autogenerate a value for id/embed_id/binary_id.
Returns the autogenerated value, or nil if it must be
autogenerated inside the storage or raise if not supported.
For the Ecto type, `:id`, the adapter autogenerates a 128-bit integer
For the Ecto type, `:embed_id`, the adapter autogenerates a string, using `Ecto.UUID.generate()`
For the Ecto type, `:binary_id`, the adapter autogenerates a string, using `Ecto.UUID.generate()`
"""
# biggest possible int in 128 bits
@max_id (1 <<< 128) - 1
@impl Ecto.Adapter.Schema
def autogenerate(:id), do: Enum.random(1..@max_id)
def autogenerate(:embed_id), do: Ecto.UUID.generate()
def autogenerate(:binary_id), do: Ecto.UUID.generate()
@doc """
Returns the loaders for a given type.
Rather than use the Ecto adapter loaders callback, the adapter builds on ExAws' decoding functionality, please see ExAws's `ExAws.Dynamo.Decoder`, in this module, which at this time only loads :utc_datetime and :naive_datetime.
"""
@impl Ecto.Adapter
def loaders(_primitive, type), do: [type]
@doc """
Returns the dumpers for a given type.
We rely on ExAws encoding functionality during insertion and update to properly format types for DynamoDB. Please see ExAws `ExAws.Dynamo.update_item` and `ExAws.Dynamo.put_item` for specifics. Currently, we only modify :utc_datetime and :naive_datetime, appending the UTC offset, "Z", to the datetime string before passing to ExAws.
"""
@impl Ecto.Adapter
def dumpers(type, datetime)
when type in [:naive_datetime, :naive_datetime_usec, :utc_datetime, :utc_datetime_usec],
do: [datetime, &to_iso_string/1]
def dumpers(_primitive, type), do: [type]
# Add UTC offset
# We are adding the offset here also for the :naive_datetime, this
# assumes we are getting a UTC date (which does correspond with the
# timestamps() macro but not necessarily with :naive_datetime in general)
defp to_iso_string(datetime) do
iso_string =
case datetime do
%NaiveDateTime{} ->
(datetime |> NaiveDateTime.to_iso8601()) <> "Z"
%DateTime{} ->
datetime |> DateTime.to_iso8601()
nil ->
nil
end
{:ok, iso_string}
end
@doc """
Commands invoked to prepare a query for `all`, `update_all` and `delete_all`.
The returned result is given to `execute/6`.
"""
# @callback prepare(atom :: :all | :update_all | :delete_all, query :: Ecto.Query.t) ::
# {:cache, prepared} | {:nocache, prepared}
@impl Ecto.Adapter.Queryable
def prepare(:all, query) do
# 'preparing' is more a SQL concept - Do we really need to do anything here or just pass the params through?
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.prepare: :all", %{
"#{inspect(__MODULE__)}.prepare-params" => %{query: inspect(query, structs: false)}
})
{:nocache, {:all, query}}
end
def prepare(:update_all, query) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.prepare: :update_all", %{
"#{inspect(__MODULE__)}.prepare-params" => %{query: inspect(query, structs: false)}
})
{:nocache, {:update_all, query}}
end
def prepare(:delete_all, query) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.prepare: :delete_all", %{
"#{inspect(__MODULE__)}.prepare-params" => %{query: inspect(query, structs: false)}
})
{:nocache, {:delete_all, query}}
end
@doc """
Executes a previously prepared query.
It must return a tuple containing the number of entries and
the result set as a list of lists. The result set may also be
`nil` if a particular operation does not support them.
The `meta` field is a map containing some of the fields found
in the `Ecto.Query` struct.
It receives a process function that should be invoked for each
selected field in the query result in order to convert them to the
expected Ecto type. The `process` function will be nil if no
result set is expected from the query.
"""
# @callback execute(repo, query_meta, query, params :: list(), process | nil, options) :: result when
# result: {integer, [[term]] | nil} | no_return,
# query: {:nocache, prepared} |
# {:cached, (prepared -> :ok), cached} |
# {:cache, (cached -> :ok), prepared}
@impl Ecto.Adapter.Queryable
def execute(
%{repo: repo, migration_source: migration_source},
query_meta,
{:nocache, {func, prepared}},
params,
opts
) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.execute", %{
"#{inspect(__MODULE__)}.execute-params" => %{
repo: repo,
query_meta: query_meta,
prepared: prepared,
params: params,
opts: opts
}
})
# table and model are now nested under .from.source
{table, model} = prepared.from.source
validate_where_clauses!(prepared)
lookup_fields = extract_lookup_fields(prepared.wheres, params, [])
limit_option = opts[:scan_limit]
scan_limit = if is_integer(limit_option), do: [limit: limit_option], else: []
updated_opts =
if table == migration_source do
ecto_dynamo_log(
:debug,
"#{inspect(__MODULE__)}.execute: table name corresponds with migration source: #{inspect(migration_source)}. Setting options for recursive scan.",
%{}
)
Keyword.drop(opts, [:timeout, :log, :telemetry_options]) ++ [recursive: true]
else
Keyword.drop(opts, [:scan_limit, :limit, :telemetry_options]) ++ scan_limit
end
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.execute: local variables", %{
"#{inspect(__MODULE__)}.execute-vars" => %{
table: table,
lookup_fields: lookup_fields,
scan_limit: scan_limit
}
})
case func do
:delete_all ->
delete_all(repo, table, lookup_fields, updated_opts)
:update_all ->
update_all(repo, table, lookup_fields, updated_opts, prepared.updates, params)
:all ->
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.execute: :all", %{
"#{inspect(__MODULE__)}.execute-all-vars" => %{
table: table,
lookup_fields: lookup_fields,
updated_opts: updated_opts
}
})
result = Ecto.Adapters.DynamoDB.Query.get_item(repo, table, lookup_fields, updated_opts)
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.execute: all: result", %{
"#{inspect(__MODULE__)}.execute-all-result" => inspect(result)
})
if opts[:query_info_key],
do:
Ecto.Adapters.DynamoDB.QueryInfo.put(
opts[:query_info_key],
extract_query_info(result)
)
if result == %{} do
# Empty map means "not found"
{0, []}
else
case query_meta do
%{select: %{from: {_, {_, _, _, types}}}} ->
types = types_to_source_fields(model, types)
handle_type_decode(table, result, types, repo, opts)
_ ->
if table == migration_source do
decoded = Enum.map(result["Items"], &decode_item(&1, repo, opts))
{length(decoded), decoded}
else
# Queries with a :select clause will not have the types available in the query_meta,
# instead construct them from prepared.select
types = construct_types_from_select_fields(prepared.select)
handle_type_decode(table, result, types, repo, opts)
end
end
end
end
end
def max_transaction_conflict_retries, do: @max_transaction_conflict_retries
defp handle_type_decode(table, result, types, repo, opts) do
if !result["Count"] and !result["Responses"] do
decoded = decode_item(result["Item"], types, repo, opts)
{1, [decoded]}
else
# batch_get_item returns "Responses" rather than "Items"
results_to_decode =
if result["Items"], do: result["Items"], else: result["Responses"][table]
decoded = Enum.map(results_to_decode, &decode_item(&1, types, repo, opts))
{length(decoded), decoded}
end
end
defp types_to_source_fields(model, types) do
types
|> Enum.into([], fn {field, type} ->
{model.__schema__(:field_source, field), type}
end)
end
# delete_all allows for the recursive option, scanning through multiple pages
defp delete_all(repo, table, lookup_fields, opts) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.delete_all", %{
"#{inspect(__MODULE__)}.delete_all-params" => %{
table: table,
lookup_fields: lookup_fields,
opts: opts
}
})
# select only the key
{:primary, key_list} = Ecto.Adapters.DynamoDB.Info.primary_key!(repo, table)
scan_or_query = Ecto.Adapters.DynamoDB.Query.scan_or_query?(repo, table, lookup_fields)
recursive = Ecto.Adapters.DynamoDB.Query.parse_recursive_option(scan_or_query, opts)
updated_opts =
prepare_recursive_opts(opts ++ [projection_expression: Enum.join(key_list, ", ")])
delete_all_recursive(repo, table, lookup_fields, updated_opts, recursive, %{}, 0)
end
defp delete_all_recursive(
repo,
table,
lookup_fields,
opts,
recursive,
query_info,
total_processed
) do
# query the table for which records to delete
fetch_result = Ecto.Adapters.DynamoDB.Query.get_item(repo, table, lookup_fields, opts)
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.delete_all_recursive: fetch_result", %{
"#{inspect(__MODULE__)}.delete_all_recursive-fetch_result" => inspect(fetch_result)
})
items =
case fetch_result do
%{"Items" => fetch_items} -> fetch_items
%{"Item" => item} -> [item]
%{"Responses" => table_map} -> table_map[table]
_ -> []
end
prepared_data =
for key_list <- Enum.map(items, &Map.to_list/1) do
key_map =
for {key, val_map} <- key_list, into: %{}, do: {key, Dynamo.Decoder.decode(val_map)}
[delete_request: [key: key_map]]
end
unprocessed_items =
if prepared_data != [] do
batch_delete(repo, table, prepared_data)
else
%{}
end
num_processed =
length(prepared_data) -
if !unprocessed_items[table], do: 0, else: length(unprocessed_items[table])
updated_query_info =
Enum.reduce(fetch_result, query_info, fn {key, val}, acc ->
case key do
"Count" ->
Map.update(acc, key, val, fn x -> x + val end)
"ScannedCount" ->
Map.update(acc, key, val, fn x -> x + val end)
"LastEvaluatedKey" ->
Map.update(acc, key, val, fn _ -> fetch_result["LastEvaluatedKey"] end)
_ ->
acc
end
end)
|> Map.update("UnprocessedItems", unprocessed_items, fn map ->
if map == %{}, do: %{}, else: %{table => map[table] ++ unprocessed_items[table]}
end)
updated_recursive = Ecto.Adapters.DynamoDB.Query.update_recursive_option(recursive)
if fetch_result["LastEvaluatedKey"] != nil and updated_recursive.continue do
opts_with_offset = opts ++ [exclusive_start_key: fetch_result["LastEvaluatedKey"]]
delete_all_recursive(
repo,
table,
lookup_fields,
opts_with_offset,
updated_recursive.new_value,
updated_query_info,
total_processed + num_processed
)
else
# We're not retrying unprocessed items yet, but we are providing the relevant info in the QueryInfo agent if :query_info_key is supplied
if opts[:query_info_key],
do: Ecto.Adapters.DynamoDB.QueryInfo.put(opts[:query_info_key], updated_query_info)
{num_processed + total_processed, nil}
end
end
# Returns unprocessed_items
# Similarly to a batch insert, batch delete is also restricted by DDB's batch write limit of 25 records - these requests will be chunked as well.
defp batch_delete(repo, table, prepared_data) do
Enum.chunk_every(prepared_data, @batch_write_item_limit)
|> Enum.reduce(%{}, fn batch, unprocessed_items ->
batch_write_attempt =
Dynamo.batch_write_item(%{table => batch})
|> ExAws.request(ex_aws_config(repo))
|> handle_error!(repo, %{table: table, records: []})
case batch_write_attempt do
%{"UnprocessedItems" => %{^table => items}} ->
Map.update(unprocessed_items, table, items, &(&1 ++ items))
_ ->
unprocessed_items
end
end)
end
defp update_all(repo, table, lookup_fields, opts, updates, params) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update_all", %{
"#{inspect(__MODULE__)}.update_all-params" => %{
table: table,
lookup_fields: lookup_fields,
opts: opts
}
})
scan_or_query = Ecto.Adapters.DynamoDB.Query.scan_or_query?(repo, table, lookup_fields)
recursive = Ecto.Adapters.DynamoDB.Query.parse_recursive_option(scan_or_query, opts)
key_list = Ecto.Adapters.DynamoDB.Info.primary_key!(repo, table)
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update_all: key_list", %{
"#{inspect(__MODULE__)}.update_all-key_list" => inspect(key_list)
})
# The remove statement must be constructed after finding pull-indexes, but it
# also includes possibly removing nil fields, and since we have one handler for
# both set and remove, we call it during the batch update process
{update_expression, update_fields_sans_set_remove, set_remove_fields} =
construct_update_expression(repo, updates, params, opts)
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update_all: update fields", %{
"#{inspect(__MODULE__)}.update_all-update_fields" => %{
update_fields_sans_set_remove: inspect(update_fields_sans_set_remove),
set_remove_fields: inspect(set_remove_fields)
}
})
attribute_names = construct_expression_attribute_names(update_fields_sans_set_remove)
attribute_values =
construct_expression_attribute_values(repo, update_fields_sans_set_remove, opts)
base_update_options = [
expression_attribute_names: attribute_names,
update_expression: update_expression,
return_values: :all_new
]
updated_opts = prepare_recursive_opts(opts)
update_options = maybe_add_attribute_values(base_update_options, attribute_values)
pull_actions_without_index =
Keyword.keys(set_remove_fields[:pull])
|> Enum.any?(fn x -> !Enum.member?(Keyword.keys(maybe_list(opts[:pull_indexes])), x) end)
{new_update_options, new_set_remove_fields} =
if pull_actions_without_index do
{update_options, set_remove_fields}
else
merged_pull_indexes =
Keyword.merge(set_remove_fields[:pull], maybe_list(opts[:pull_indexes]))
opts_with_pull_indexes =
Keyword.update(opts, :pull_indexes, merged_pull_indexes, fn _ -> merged_pull_indexes end)
{update_batch_update_options(
repo,
update_options,
set_remove_fields,
opts_with_pull_indexes
), []}
end
update_all_recursive(
repo,
table,
lookup_fields,
updated_opts,
new_update_options,
key_list,
new_set_remove_fields,
recursive,
%{},
0
)
end
defp update_all_recursive(
repo,
table,
lookup_fields,
opts,
update_options,
key_list,
set_remove_fields,
recursive,
query_info,
total_updated
) do
fetch_result = Ecto.Adapters.DynamoDB.Query.get_item(repo, table, lookup_fields, opts)
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update_all_recursive: fetch_result", %{
"#{inspect(__MODULE__)}.update_all_recursive-fetch_result" => inspect(fetch_result)
})
updated_query_info =
case fetch_result do
%{"Count" => last_count, "ScannedCount" => last_scanned_count} ->
%{
"Count" => last_count + Map.get(query_info, "Count", 0),
"ScannedCount" => last_scanned_count + Map.get(query_info, "ScannedCount", 0),
"LastEvaluatedKey" => Map.get(fetch_result, "LastEvaluatedKey")
}
_ ->
query_info
end
items =
case fetch_result do
%{"Items" => fetch_items} -> fetch_items
%{"Item" => item} -> [item]
%{"Responses" => table_map} -> table_map[table]
_ -> []
end
num_updated =
if items != [] do
batch_update(repo, table, items, key_list, update_options, set_remove_fields, opts)
else
0
end
updated_recursive = Ecto.Adapters.DynamoDB.Query.update_recursive_option(recursive)
if fetch_result["LastEvaluatedKey"] != nil and updated_recursive.continue do
opts_with_offset = opts ++ [exclusive_start_key: fetch_result["LastEvaluatedKey"]]
update_all_recursive(
repo,
table,
lookup_fields,
opts_with_offset,
update_options,
key_list,
set_remove_fields,
updated_recursive.new_value,
updated_query_info,
total_updated + num_updated
)
else
if opts[:query_info_key],
do: Ecto.Adapters.DynamoDB.QueryInfo.put(opts[:query_info_key], updated_query_info)
{total_updated + num_updated, []}
end
end
defp batch_update(repo, table, items, key_list, update_options, set_remove_fields, opts) do
Enum.reduce(items, 0, fn result_to_update, acc ->
filters = get_key_values_dynamo_map(result_to_update, key_list)
# we only update this on a case-by-case basis if pull actions
# without specific indexes are specified
options_with_set_and_remove =
case set_remove_fields do
[] ->
update_options
_ ->
pull_fields_with_indexes =
Enum.map(set_remove_fields[:pull], fn {field_atom, val} ->
list = result_to_update[to_string(field_atom)]
{field_atom, find_all_indexes_in_dynamodb_list(list, val)}
end)
merged_pull_indexes =
Keyword.merge(pull_fields_with_indexes, maybe_list(opts[:pull_indexes]))
opts_with_pull_indexes =
Keyword.update(opts, :pull_indexes, merged_pull_indexes, fn _ ->
merged_pull_indexes
end)
update_batch_update_options(
repo,
update_options,
set_remove_fields,
opts_with_pull_indexes
)
end
# 'options_with_set_and_remove' might not have the key, ':expression_attribute_values',
# when there are only removal statements.
record =
if options_with_set_and_remove[:expression_attribute_values],
do: [options_with_set_and_remove[:expression_attribute_values] |> Enum.into(%{})],
else: []
if options_with_set_and_remove[:update_expression] |> String.trim() != "" do
Dynamo.update_item(table, filters, options_with_set_and_remove)
|> ExAws.request(ex_aws_config(repo))
|> handle_error!(repo, %{table: table, records: record ++ []})
acc + 1
else
acc
end
end)
end
defp update_batch_update_options(repo, update_options, set_remove_fields, opts) do
attribute_names =
construct_expression_attribute_names(Keyword.values(set_remove_fields) |> List.flatten())
set_and_push_fields =
maybe_list(set_remove_fields[:set]) ++ maybe_list(set_remove_fields[:push])
opts_with_push = opts ++ Keyword.take(set_remove_fields, [:push])
attribute_values =
construct_expression_attribute_values(repo, set_and_push_fields, opts_with_push)
set_statement = construct_set_statement(repo, set_remove_fields[:set], opts_with_push)
opts_for_construct_remove =
Keyword.take(set_remove_fields, [:pull]) ++
Keyword.take(opts, [:pull_indexes, :remove_nil_fields])
remove_statement =
construct_remove_statement(repo, set_remove_fields[:set], opts_for_construct_remove)
base_update_options = [
expression_attribute_names:
Map.merge(attribute_names, update_options[:expression_attribute_names]),
update_expression:
(set_statement <> " " <> remove_statement <> " " <> update_options[:update_expression])
|> String.trim(),
return_values: :all_new
]
maybe_add_attribute_values(
base_update_options,
attribute_values ++ maybe_list(update_options[:expression_attribute_values])
)
end
# find indexes to remove for update :pull action
defp find_all_indexes_in_dynamodb_list(dynamodb_list, target) do
Dynamo.Decoder.decode(dynamodb_list)
|> Enum.with_index()
|> Enum.filter(fn {x, _} -> x == target end)
|> Enum.map(fn {_, i} -> i end)
end
# During delete_all's and update_all's recursive
# procedure, we want to keep the recursion in
# the top-level, between actions, rather than
# load all the results into memory and then act;
# so we disable the recursion on get_item
defp prepare_recursive_opts(opts) do
opts |> Keyword.delete(:page_limit) |> Keyword.update(:recursive, false, fn _ -> false end)
end
@doc """
Inserts a single new struct in the data store.
## Autogenerate
The primary key will be automatically included in `returning` if the
field has type `:id` or `:binary_id` and no value was set by the
developer or none was autogenerated by the adapter.
"""
# @callback insert(repo, schema_meta, fields, on_conflict, returning, options) ::
# {:ok, fields} | {:invalid, constraints} | no_return
# def insert(_,_,_,_,_) do
@impl Ecto.Adapter.Schema
def insert(repo_meta, schema_meta, fields, on_conflict, returning, opts) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.insert", %{
"#{inspect(__MODULE__)}.insert-params" => %{
repo_meta: repo_meta,
schema_meta: schema_meta,
fields: fields,
on_conflict: on_conflict,
returning: returning,
opts: opts
}
})
do_insert(repo_meta, schema_meta, fields, on_conflict, opts, 0)
end
defp do_insert(_repo_meta, _schema_meta, _fields, _on_conflict, _opts, retries)
when retries >= @max_transaction_conflict_retries do
raise(
"#{inspect(__MODULE__)}.insert error: reached maximum transaction conflict retries without success"
)
end
defp do_insert(repo_meta, schema_meta, fields, on_conflict, opts, retries) do
table = schema_meta.source
model = schema_meta.schema
fields_map = Enum.into(fields, %{})
record = maybe_replace_empty_mapsets_for_insert(fields_map, repo_meta.repo, opts)
insert_nil_fields = opt_config(:insert_nil_fields, repo_meta.repo, opts, true)
record = unless insert_nil_fields, do: record, else: build_record_map(model, record)
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.insert: local variables", %{
"#{inspect(__MODULE__)}.insert-vars" => %{table: table, record: record}
})
{:primary, key_list} = Ecto.Adapters.DynamoDB.Info.primary_key!(repo_meta.repo, table)
hash_key = hd(key_list)
replace_all =
case on_conflict do
{l, _, _} when is_list(l) ->
# All fields being set are to be replaced
Enum.all?(Keyword.keys(fields), &(&1 in l))
_ ->
false
end
options =
if replace_all do
# All fields being replaced: don't use a condition
[]
else
attribute_names = for k <- key_list, into: %{}, do: {"##{k}", k}
conditions = for k <- key_list, do: "attribute_not_exists(##{k})"
condition_expression = Enum.join(conditions, " and ")
[
expression_attribute_names: attribute_names,
condition_expression: condition_expression
]
end
case Dynamo.put_item(table, record, options)
|> ExAws.request(ex_aws_config(repo_meta.repo))
|> handle_error!(repo_meta.repo, %{table: table, records: [record]}) do
{:error, "ConditionalCheckFailedException"} ->
case on_conflict do
# Per discussion with Jose Valim (https://github.com/elixir-ecto/ecto/issues/2378)
# clarifying the adapter should return nothing if there is no `:returning` specified,
# and what we thought was to be returned as a `nil` id, is only for cases where
# "the field is autogenerated by the database" (https://hexdocs.pm/ecto/Ecto.Repo.html)
{:nothing, _, _} ->
{:ok, []}
{:raise, _, _} ->
# This constraint name yields the correct behavior in the case the user
# has specified a unique constraint on the primary key in their schema:
constraint_name = "#{table}_#{hash_key}_index"
{:invalid, [unique: constraint_name]}
end
{:error, "TransactionConflictException"} ->
do_insert(repo_meta, schema_meta, fields, on_conflict, opts, retries + 1)
%{} ->
{:ok, []}
end
end
@impl Ecto.Adapter.Schema
def insert_all(
%{repo: repo},
schema_meta,
field_list,
rows,
on_conflict,
return_sources,
_placeholders,
opts
) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.insert_all", %{
"#{inspect(__MODULE__)}.insert_all-params" => %{
repo: repo,
schema_meta: schema_meta,
field_list: field_list,
rows: rows,
on_conflict: on_conflict,
return_sources: return_sources,
opts: opts
}
})
insert_nil_field_option = Keyword.get(opts, :insert_nil_fields, true)
do_not_insert_nil_fields =
insert_nil_field_option == false ||
RepoConfig.config_val(repo, :insert_nil_fields) == false
table = schema_meta.source
model = schema_meta.schema
prepared_rows =
Enum.map(rows, fn row ->
mapped_fields =
row
|> Enum.into(%{})
|> maybe_replace_empty_mapsets_for_insert(repo, opts)
record =
if do_not_insert_nil_fields,
do: mapped_fields,
else: build_record_map(model, mapped_fields)
[put_request: [item: record]]
end)
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.insert_all: local variables", %{
"#{inspect(__MODULE__)}.insert_all-vars" => %{
table: table,
records: get_records_from_fields(prepared_rows)
}
})
batch_write(repo, table, prepared_rows, opts)
end
# DynamoDB will reject an entire batch of insert_all() records if there are more than 25 requests.
# https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
# batch_write/4 will break the list into chunks of 25 items and insert each separately.
defp batch_write(repo, table, prepared_fields, opts) do
unprocessed_items_element = "UnprocessedItems"
grouped_records = Enum.chunk_every(prepared_fields, @batch_write_item_limit)
num_batches = length(grouped_records)
# Break the prepared_fields into chunks of at most 25 elements to be batch inserted, accumulating
# the total count of records and appropriate results as it loops through the reduce.
{total_processed, results} =
grouped_records
|> Stream.with_index()
|> Enum.reduce({0, []}, fn {field_group, i},
{running_total_processed, batch_write_results} ->
{total_batch_processed, batch_write_attempt} =
handle_batch_write(repo, field_group, table, unprocessed_items_element)
# Log depth of 11 will capture the full data structure returned in any UnprocessedItems - https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
ecto_dynamo_log(
:debug,
"#{inspect(__MODULE__)}.batch_write #{i + 1} of #{num_batches}: local variables",
%{
"#{inspect(__MODULE__)}.insert_all-batch_write" => %{
table: table,
field_group: field_group,
results: batch_write_attempt
}
},
depth: 11
)
# We're not retrying unprocessed items yet, but we are providing the relevant info in the QueryInfo agent if :query_info_key is supplied
if opts[:query_info_key] do
query_info = extract_query_info(batch_write_attempt)
Ecto.Adapters.DynamoDB.QueryInfo.update(opts[:query_info_key], [query_info], fn list ->
list ++ [query_info]
end)
end
{running_total_processed + total_batch_processed,
batch_write_results ++ [batch_write_attempt]}
end)
result_body_for_log = %{
table => Enum.flat_map(results, fn res -> res[unprocessed_items_element][table] || [] end)
}
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.batch_write: batch_write_attempt result", %{
"#{inspect(__MODULE__)}.insert_all-batch_write" =>
inspect(%{
unprocessed_items_element =>
if(result_body_for_log[table] == [], do: %{}, else: result_body_for_log)
})
})
{total_processed, nil}
end
defp handle_batch_write(repo, field_group, table, unprocessed_items_element) do
results =
Dynamo.batch_write_item(%{table => field_group})
|> ExAws.request(ex_aws_config(repo))
|> handle_error!(repo, %{table: table, records: get_records_from_fields(field_group)})
if results[unprocessed_items_element] == %{} do
{length(field_group), results}
else
{length(field_group) - length(results[unprocessed_items_element][table]), results}
end
end
defp get_records_from_fields(fields),
do: Enum.map(fields, fn [put_request: [item: record]] -> record end)
defp build_record_map(model, fields_to_insert) do
# Ecto does not convert empty strings to nil before passing them
# to Repo.insert_all, and ExAws will remove empty strings (as well as empty lists)
# when building the insertion query but not nil values. We don't mind the removal
# of empty lists since those cannot be inserted to indexed fields, but we'd like to
# catch the removal of fields with empty strings by ExAws to support our option, :remove_nil_fields,
# so we convert these to nil.
fields = model.__schema__(:fields)
sources = fields |> Enum.into(%{}, fn f -> {f, model.__schema__(:field_source, f)} end)
empty_strings_to_nil =
fields_to_insert
|> Enum.map(fn {field, val} -> {field, if(val == "", do: nil, else: val)} end)
|> Enum.into(%{})
model.__struct__
|> Map.delete(:__meta__)
|> Map.from_struct()
|> Enum.reduce(%{}, fn {k, v}, acc ->
Map.put(acc, Map.get(sources, k), v)
end)
|> Map.merge(empty_strings_to_nil)
end
@impl Ecto.Adapter.Schema
def delete(adapter_meta = %{repo: repo}, schema_meta, filters, _returning, opts) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.delete", %{
"#{inspect(__MODULE__)}.delete-params" => %{
adapter_meta: adapter_meta,
schema_meta: schema_meta,
filters: filters,
opts: opts
}
})
do_delete(repo, schema_meta, filters, opts, 0)
end
defp do_delete(_repo, _schema_meta, _filters, _opts, retries)
when retries >= @max_transaction_conflict_retries do
raise(
"#{inspect(__MODULE__)}.delete error: reached maximum transaction conflict retries without success"
)
end
defp do_delete(repo, schema_meta, filters, opts, retries) do
table = schema_meta.source
updated_filters =
maybe_update_filters_for_range_key(repo, table, schema_meta, filters, opts, "delete")
attribute_names = construct_expression_attribute_names(keys_to_atoms(filters))
base_options = [expression_attribute_names: attribute_names]
condition_expression = construct_condition_expression(filters)
options = base_options ++ [condition_expression: condition_expression]
# 'options' might not have the key, ':expression_attribute_values', when there are only removal statements
record =
if options[:expression_attribute_values],
do: [options[:expression_attribute_values] |> Enum.into(%{})],
else: []
case Dynamo.delete_item(table, updated_filters, options)
|> ExAws.request(ex_aws_config(repo))
|> handle_error!(repo, %{table: table, records: record ++ []}) do
%{} ->
{:ok, []}
{:error, "ConditionalCheckFailedException"} ->
{:error, :stale}
{:error, "TransactionConflictException"} ->
do_delete(repo, schema_meta, filters, opts, retries + 1)
end
end
@impl Ecto.Adapter.Schema
def update(adapter_meta = %{repo: repo}, schema_meta, fields, filters, returning, opts) do
ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update", %{
"#{inspect(__MODULE__)}.update-params" => %{
adapter_meta: adapter_meta,
schema_meta: schema_meta,
fields: fields,
filters: filters,
returning: returning,
opts: opts
}
})
do_update(repo, schema_meta, fields, filters, returning, opts, 0)
end
defp do_update(_repo, _schema_meta, _fields, _filters, _returning, _opts, retries)
when retries >= @max_transaction_conflict_retries do
raise(
"#{inspect(__MODULE__)}.update error: reached maximum transaction conflict retries without success"
)
end
defp do_update(repo, schema_meta, fields, filters, returning, opts, retries) do
table = schema_meta.source
updated_filters =
maybe_update_filters_for_range_key(repo, table, schema_meta, filters, opts, "update")
fields = maybe_replace_empty_mapsets_for_update(fields, repo, opts)
update_expression = construct_update_expression(repo, fields, opts)
# add updated_filters to attribute_ names and values for condition_expression
attribute_names = construct_expression_attribute_names(fields ++ keys_to_atoms(filters))
attribute_values = construct_expression_attribute_values(repo, fields, opts)
base_options = [
expression_attribute_names: attribute_names,
update_expression: update_expression
]
condition_expression = construct_condition_expression(filters)
options =
maybe_add_attribute_values(base_options, attribute_values) ++
[condition_expression: condition_expression]
# 'options' might not have the key, ':expression_attribute_values', when there are only removal statements
record =
if options[:expression_attribute_values],
do: [options[:expression_attribute_values] |> Enum.into(%{})],
else: []
case Dynamo.update_item(table, updated_filters, options)
|> ExAws.request(ex_aws_config(repo))
|> handle_error!(repo, %{table: table, records: record ++ []}) do
%{} ->
{:ok, []}
{:error, "ConditionalCheckFailedException"} ->
{:error, :stale}
{:error, "TransactionConflictException"} ->
do_update(repo, schema_meta, fields, filters, returning, opts, retries + 1)
end
end
# Support for tables with a hash+range key.
#
# * If the schema has both keys declared (using the `primary_key: true`) the filters are already correct
# * If :range_key is specified with a value it is added to filters
# * If :range_key is not specified, and the table does have a range key, attempt to find it with a DynamoDB query
#
defp maybe_update_filters_for_range_key(repo, table, schema_meta, filters, opts, action) do
with primary_key_length <- length(schema_meta.schema.__schema__(:primary_key)) do
case opts[:range_key] do
# Use primary keys declared in schema
nil when primary_key_length == 2 ->
filters
nil ->
{:primary, key_list} = Ecto.Adapters.DynamoDB.Info.primary_key!(repo, table)
if length(key_list) > 1 do
updated_opts = opts ++ [projection_expression: Enum.join(key_list, ", ")]
filters_as_strings =
for {field, val} <- filters, do: {Atom.to_string(field), {val, :==}}
fetch_result =
Ecto.Adapters.DynamoDB.Query.get_item(repo, table, filters_as_strings, updated_opts)
items =
case fetch_result do
%{"Items" => fetch_items} -> fetch_items
%{"Item" => item} -> [item]
_ -> []
end
if items == [],
do:
raise(
"#{inspect(__MODULE__)}.#{action} error: no results found for record: #{inspect(filters)}"
)
if length(items) > 1,
do:
raise(
"#{inspect(__MODULE__)}.#{action} error: more than one result found for record: #{inspect(filters)} Please consider using the adapter's :range_key custom inline option (see README)."
)
for {field, key_map} <- Map.to_list(hd(items)) do
[{_field_type, val}] = Map.to_list(key_map)
{field, val}
end
else
filters
end
range_key ->
[range_key | filters]
end
end
end
defp keys_to_atoms(list),
do: for({k, v} <- list, do: {maybe_string_to_atom(k), v})
defp maybe_string_to_atom(s),
do: if(is_binary(s), do: String.to_atom(s), else: s)
defp construct_condition_expression(filters) when is_list(filters) do
Keyword.keys(filters)
|> Enum.map(fn field -> "attribute_exists(##{to_string(field)})" end)
|> Enum.join(" AND ")
end
defp extract_query_info(result),
do:
result
|> Map.take([
"Count",
"ScannedCount",
"LastEvaluatedKey",
"UnprocessedItems",
"UnprocessedKeys"
])
# Used in update_all
defp extract_update_params([], _action_atom, _params), do: []
defp extract_update_params([%{expr: key_list}], action_atom, params) do
case key_list[action_atom] do
nil ->
[]
action_list ->
for s <- action_list do
{field_atom, {:^, _, [idx]}} = s
{field_atom, Enum.at(params, idx)}
end
end
end
defp extract_update_params([a], _action_atom, _params),
do:
error(
"#{inspect(__MODULE__)}.extract_update_params: Updates is either missing the :expr key or does not contain a struct or map: #{inspect(a)}"
)
defp extract_update_params(unsupported, _action_atom, _params),
do:
error(
"#{inspect(__MODULE__)}.extract_update_params: unsupported parameter construction. #{inspect(unsupported)}"
)
# Ecto does not support push pull for types other than array.
# Therefore, we enable add and delete via opts
defp extract_update_params(key_list, action_atom) do
case key_list[action_atom] do
nil -> []
action_list -> action_list
end
end
# used in :update_all
defp get_key_values_dynamo_map(dynamo_map, {:primary, keys}) do
for k <- keys, do: {String.to_atom(k), Dynamo.Decoder.decode(dynamo_map[k])}
end
defp construct_expression_attribute_names(fields) do
for {f, _} <- fields, into: %{}, do: {"##{Atom.to_string(f)}", Atom.to_string(f)}
end
defp construct_expression_attribute_values(repo, fields, opts) do
remove_rather_than_set_to_null =
opts[:remove_nil_fields] || opts[:remove_nil_fields_on_update] ||
RepoConfig.config_val(repo, :remove_nil_fields_on_update) == true
# If the value is nil and the :remove_nil_fields option is set,
# we're removing this attribute, not updating it, so filter out any such fields:
fields
|> Enum.reduce([], &format_update_field(&1, &2, remove_rather_than_set_to_null, opts))
|> Enum.filter(fn {x, _} -> not Keyword.has_key?(maybe_list(opts[:pull]), x) end)
end
defp format_update_field({_k, nil}, acc, true, _opts), do: acc
defp format_update_field({k, v}, acc, true, opts), do: [{k, format_val(k, v, opts)} | acc]
defp format_update_field({k, v}, acc, false, opts),
do: [{k, format_nil_or_val(k, v, opts)} | acc]
defp maybe_list(l) when is_list(l), do: l
defp maybe_list(_), do: []
defp format_nil_or_val(_k, nil, _opts), do: %{"NULL" => "true"}
defp format_nil_or_val(k, v, opts), do: format_val(k, v, opts)
defp format_val(k, v, opts) do
case opts[:push][k] do
nil -> v
_ -> [v]
end
end
# DynamoDB throws an error if we pass in an empty list for attribute values,
# so we have to implement this stupid little helper function to avoid hurting its feelings:
defp maybe_add_attribute_values(options, []) do
options
end
defp maybe_add_attribute_values(options, attribute_values) do
[expression_attribute_values: attribute_values] ++ options
end
defp construct_update_expression(_repo, updates, params, opts) do
to_set = extract_update_params(updates, :set, params)
to_push = extract_update_params(updates, :push, params)
to_pull = extract_update_params(updates, :pull, params)
to_add = extract_update_params(opts, :add) ++ extract_update_params(updates, :inc, params)
to_delete = extract_update_params(opts, :delete)
{(construct_add_statement(to_add, opts) <>
" " <>
construct_delete_statement(to_delete, opts))
|> String.trim(), to_add ++ to_delete, [set: to_set, push: to_push, pull: to_pull]}
end
# The update callback supplies fields in the parameters
# whereas update_all includes a more complicated updates structure
defp construct_update_expression(repo, fields, opts) do
set_statement = construct_set_statement(repo, fields, opts)
rem_statement = construct_remove_statement(repo, fields, opts)
String.trim("#{set_statement} #{rem_statement}")
end
# fields::[{:field, val}]
defp construct_set_statement(repo, fields, opts) do
remove_rather_than_set_to_null =
opts[:remove_nil_fields] || opts[:remove_nil_fields_on_update] ||
RepoConfig.config_val(repo, :remove_nil_fields_on_update) == true
set_clauses =
for {key, val} <- fields, not (is_nil(val) and remove_rather_than_set_to_null) do
key_str = Atom.to_string(key)
"##{key_str}=:#{key_str}"
end ++
case opts[:push] do
nil ->
[]
push_list ->
for {key, _val} <- push_list do
key_str = Atom.to_string(key)
if Enum.member?(maybe_list(opts[:prepend_to_list]), key),
do: "##{key_str} = list_append(:#{key_str}, ##{key_str})",
else: "##{key_str} = list_append(##{key_str}, :#{key_str})"
end
end
case set_clauses do
[] ->
""
_ ->
"SET " <> Enum.join(set_clauses, ", ")
end
end
defp construct_remove_statement(repo, fields, opts) do
remove_rather_than_set_to_null =
opts[:remove_nil_fields] || opts[:remove_nil_fields_on_update] ||
RepoConfig.config_val(repo, :remove_nil_fields_on_update) == true
# Ecto :pull update can be emulated provided
# we are given an index to remove in opts[:pull_indexes]
remove_clauses =
if remove_rather_than_set_to_null do
for {key, val} <- fields, is_nil(val), do: "##{Atom.to_string(key)}"
else
[]
end ++
cond do
!opts[:pull_indexes] or Keyword.values(opts[:pull_indexes]) |> List.flatten() == [] ->
[]
opts[:pull] == nil ->
[]
true ->
for {key, _val} <- opts[:pull] do
key_str = Atom.to_string(key)
Enum.map(opts[:pull_indexes][key], fn index -> "##{key_str}[#{index}]" end)
|> Enum.join(", ")
end
end
case remove_clauses do
[] ->
""
_ ->
"REMOVE " <> Enum.join(remove_clauses, ", ")
end
end
# fields::[{:field, val}]
defp construct_add_statement(fields, _opts) do
add_clauses =
for {key, _val} <- fields do
key_str = Atom.to_string(key)
"##{key_str} :#{key_str}"
end
case add_clauses do
[] ->
""
_ ->
"ADD " <> Enum.join(add_clauses, ", ")
end
end
defp construct_delete_statement(fields, _opts) do
delete_clauses =
for {key, _val} <- fields do
key_str = Atom.to_string(key)
"##{key_str} :#{key_str}"
end
case delete_clauses do
[] ->
""
_ ->
"DELETE " <> Enum.join(delete_clauses, ", ")
end
end
defp validate_where_clauses!(query) do
for w <- query.wheres do
validate_where_clause!(w)
end
end
defp validate_where_clause!(%BooleanExpr{expr: {op, _, _}})
when op in [:==, :<, :>, :<=, :>=, :in],
do: :ok
defp validate_where_clause!(%BooleanExpr{expr: {logical_op, _, _}})
when logical_op in [:and, :or],
do: :ok
defp validate_where_clause!(%BooleanExpr{expr: {:is_nil, _, _}}), do: :ok
defp validate_where_clause!(%BooleanExpr{expr: {:fragment, _, _}}), do: :ok
defp validate_where_clause!(unsupported),
do: error("unsupported where clause: #{inspect(unsupported)}")
# We are parsing a nested, recursive structure of the general type:
# %{:logical_op, list_of_clauses} | %{:conditional_op, field_and_value}
defp extract_lookup_fields([], _params, lookup_fields), do: lookup_fields
defp extract_lookup_fields([query | queries], params, lookup_fields) do
# A logical operator tuple does not always have a parent 'expr' key.
maybe_extract_from_expr =
case query do
%BooleanExpr{expr: expr} -> expr
# TODO: could there be other cases?
_ -> query
end
case maybe_extract_from_expr do
# A logical operator points to a list of conditionals
{op, _, [left, right]} when op in [:==, :<, :>, :<=, :>=, :in] ->
{field, value} = get_op_clause(left, right, params)
updated_lookup_fields =
case List.keyfind(lookup_fields, field, 0) do
# we assume the most ops we can apply to one field is two, otherwise this might throw an error
{field, {old_val, old_op}} ->
List.keyreplace(lookup_fields, field, 0, {field, {[value, old_val], [op, old_op]}})
_ ->
[{field, {value, op}} | lookup_fields]
end
extract_lookup_fields(queries, params, updated_lookup_fields)
# Logical operator expressions have more than one op clause
# We are matching queries of the type: 'from(p in Person, where: p.email == "g@email.com" and p.first_name == "George")'
# But not of the type: 'from(p in Person, where: [email: "g@email.com", first_name: "George"])'
#
# A logical operator is a member of a list
{logical_op, _, clauses} when logical_op in [:and, :or] ->
deeper_lookup_fields = extract_lookup_fields(clauses, params, [])
extract_lookup_fields(queries, params, [
{logical_op, deeper_lookup_fields} | lookup_fields
])
{:fragment, _, raw_expr_mixed_list} ->
parsed_fragment = parse_raw_expr_mixed_list(raw_expr_mixed_list, params)
extract_lookup_fields(queries, params, [parsed_fragment | lookup_fields])
# We perform a post-query is_nil filter on indexed fields and have DynamoDB filter
# for nil non-indexed fields (although post-query nil-filters on (missing) indexed
# attributes could only find matches when the attributes are not the range part of
# a queried partition key (hash part) since those would not return the sought records).
{:is_nil, _, [arg]} ->
{{:., _, [_, field_name]}, _, _} = arg
# We give the nil value a string, "null", since it will be mapped as a DynamoDB attribute_expression_value
extract_lookup_fields(queries, params, [
{to_string(field_name), {"null", :is_nil}} | lookup_fields
])
_ ->
extract_lookup_fields(queries, params, lookup_fields)
end
end
# Specific (as opposed to generalized) parsing for Ecto :fragment - the only use for it
# so far is 'between' which is the only way to query 'between' on an indexed field since
# those accept only single conditions.
#
# Example with values as strings: [raw: "", expr: {{:., [], [{:&, [], [0]}, :person_id]}, [], []}, raw: " between ", expr: "person:a", raw: " and ", expr: "person:f", raw: ""]
#
# Example with values as part of the string itself: [raw: "", expr: {{:., [], [{:&, [], [0]}, :person_id]}, [], []}, raw: " between person:a and person:f"]
#
# Example with values in params: [raw: "", expr: {{:., [], [{:&, [], [0]}, :person_id]}, [], []}, raw: " between ", expr: {:^, [], [0]}, raw: " and ", expr: {:^, [], [1]}, raw: ""]
#
defp parse_raw_expr_mixed_list(raw_expr_mixed_list, params) do
# group the expression into fields, values, and operators,
# only supporting the example with values in params
case raw_expr_mixed_list do
# between
[
raw: _,
expr: {{:., [], [{:&, [], [0]}, field_atom]}, [], []},
raw: between_str,
expr: {:^, [], [idx1]},
raw: and_str,
expr: {:^, [], [idx2]},
raw: _
] ->
if not Regex.match?(~r/^\s*between\s*and\s*$/i, between_str <> and_str),
do: parse_raw_expr_mixed_list_error(raw_expr_mixed_list)
{to_string(field_atom), {[Enum.at(params, idx1), Enum.at(params, idx2)], :between}}
# begins_with
[
raw: begins_with_str,
expr: {{:., [], [{:&, [], [0]}, field_atom]}, [], []},
raw: comma_str,
expr: {:^, [], [idx]},
raw: closing_parenthesis_str
] ->
if not Regex.match?(
~r/^\s*begins_with\(\s*,\s*\)\s*$/i,
begins_with_str <> comma_str <> closing_parenthesis_str
),
do: parse_raw_expr_mixed_list_error(raw_expr_mixed_list)
{to_string(field_atom), {Enum.at(params, idx), :begins_with}}
_ ->
parse_raw_expr_mixed_list_error(raw_expr_mixed_list)
end
end
defp parse_raw_expr_mixed_list_error(raw_expr_mixed_list),
do:
raise(
"#{inspect(__MODULE__)}.parse_raw_expr_mixed_list parse error. We currently only support the Ecto fragments of the form, 'where: fragment(\"? between ? and ?\", FIELD_AS_VARIABLE, VALUE_AS_VARIABLE, VALUE_AS_VARIABLE)'; and 'where: fragment(\"begins_with(?, ?)\", FIELD_AS_VARIABLE, VALUE_AS_VARIABLE)'. Received: #{inspect(raw_expr_mixed_list)}"
)
defp get_op_clause(left, right, params) do
field = left |> get_field |> Atom.to_string()
value = get_value(right, params)
{field, value}
end
defp get_field({{:., _, [{:&, _, [0]}, field]}, _, []}), do: field
defp get_field(other_clause) do
error("Unsupported where clause, left hand side: #{other_clause}")
end
defp get_value({:^, _, [idx]}, params), do: Enum.at(params, idx)
# Handle queries with variable values, ex. Repo.all from i in Item, where: i.id in ^item_ids
# The last element of the tuple (first arg) will be a list with two numbers;
# the first number will be the number of attributes to be updated (in the event of an update_all query with a variable list)
# and the second will be a count of the number of elements in the variable list being queried. For example:
#
# query = from p in Person, where: p.id in ^ids
# TestRepo.update_all(query, set: [password: "cheese", last_name: "Smith"])
#
# assuming that ids contains 4 values, the last element would be [2, 4].
# Use this data to modify the params, which would otherwise include the values to be updated as well, which we don't want to query on.
defp get_value({:^, _, [num_update_terms, _num_query_terms]}, params),
do: Enum.drop(params, num_update_terms)
# Seems to be necessary for handling running a batch of migrations down.
defp get_value(%{value: right}, params), do: get_value(right, params)
# Handle .all(query) queries
defp get_value(other_clause, _params), do: other_clause
defp error(msg) do
raise ArgumentError, message: msg
end
defp construct_types_from_select_fields(%Ecto.Query.SelectExpr{expr: expr}) do
case expr do
{:{}, [], clauses = [{{:., [type: _type], [{:&, [], [0]}, _field]}, [], []} | _]} ->
for {{:., [type: type], [{:&, [], [0]}, field]}, [], []} <- clauses, do: {field, type}
{_, _, [0]} ->
[]
{{:., [type: type], [{_, _, _}, field]}, _, _} ->
[{field, type}]
clauses = [_ | _] ->
for {{_, [type: type], [{_, _, _}, field]}, _, _} <- clauses, do: {field, type}
end
end
def decode_item(item, types, repo, opts) do
types
|> Enum.map(fn {field, type} ->
Map.get(item, Atom.to_string(field), %{"NULL" => true})
|> Dynamo.Decoder.decode()
|> decode_type(type, repo, opts)
end)
end
def decode_item(%{"version" => version}, _repo, _opts) do
[version |> Dynamo.Decoder.decode()]
end
# Decodes datetime, seemingly unhandled by ExAws Dynamo decoder
defp decode_type(nil, DynamoDBSet, repo, opts), do: maybe_replace_nil_mapset(repo, opts)
defp decode_type(nil, _type, _repo, _opts), do: nil
defp decode_type(val, type, _repo, _opts) when type in [:utc_datetime_usec, :utc_datetime] do
{:ok, dt, _offset} = DateTime.from_iso8601(val)
dt
end
defp decode_type(val, type, _repo, _opts) when type in [:naive_datetime_usec, :naive_datetime],
do: NaiveDateTime.from_iso8601!(val)
# Support for Ecto >= 3.5
defp decode_type(val, {:parameterized, Ecto.Embedded, _} = type, _repo, _opts),
do: decode_embed(val, type)
# Support for Ecto 3.0-3.4
defp decode_type(val, {:embed, _} = type, _repo, _opts), do: decode_embed(val, type)
defp decode_type(val, _type, _repo, _opts), do: val
defp decode_embed(val, type) do
case Ecto.Type.embedded_load(type, val, :json) do
{:ok, decoded_value} ->
handle_decoded_embeded(decoded_value)
:error ->
ecto_dynamo_log(
:debug,
"#{inspect(__MODULE__)}.decode_embed: failed to decode embedded value: #{inspect(val)}"
)
nil
end
end
defp handle_decoded_embeded(embedded) when is_list(embedded),
do: Enum.map(embedded, &unload_parameterized_fields/1)
defp handle_decoded_embeded(embedded), do: unload_parameterized_fields(embedded)
# Rebuilds the embedded schema unloading the parameterized fields, so the parameterized
# type can load it in the ecto schema.
defp unload_parameterized_fields(%schema{} = embedded) do
fields = schema.__schema__(:fields)
Enum.reduce(fields, embedded, fn field, acc ->
field_type = schema.__schema__(:type, field)
field_value = Map.get(embedded, field)
Map.put(acc, field, maybe_dump_field(field_value, field_type))
end)
end
defp maybe_dump_field(val, {:parameterized, _type, _params} = field_type) do
{:ok, unloaded_value} = Ecto.Type.dump(field_type, val)
unloaded_value
end
defp maybe_dump_field(val, _field_type), do: val
# We found one instance where DynamoDB's error message could
# be more instructive - when trying to set an indexed field to something
# other than a string or number - so we're adding a more helpful message.
# The parameter, 'params', has the type %{table: :string, records: [:map]}
defp handle_error!({:ok, result}, _repo, _params), do: result
defp handle_error!({:error, {error_name, _} = error}, repo, params) do
# Check for inappropriate insert into indexed field
indexed_fields = Ecto.Adapters.DynamoDB.Info.indexed_attributes(repo, params.table)
# Repo.insert_all can present multiple records at once
forbidden_insert_on_indexed_field =
Enum.reduce(params.records, false, fn record, acc ->
acc ||
Enum.any?(record, fn {field, val} ->
[type] = Dynamo.Encoder.encode(val) |> Map.keys()
# Ecto does not convert Empty strings to nil before passing them to Repo.update_all or
# Repo.insert_all DynamoDB provides an instructive message during an update (forwarded by ExAws),
# but less so for batch_write_item, so we catch the empty string as well.
# Dynamo does not allow insertion of empty strings in any case.
(Enum.member?(indexed_fields, to_string(field)) and type not in ["S", "N"]) ||
val == ""
end)
end)
cond do
# we use this error to check if an update or delete record does not exist
error_name in ["ConditionalCheckFailedException", "TransactionConflictException"] ->
{:error, error_name}
forbidden_insert_on_indexed_field ->
raise "The following request error could be related to attempting to insert an empty string or attempting to insert a type other than a string or number on an indexed field. Indexed fields: #{inspect(indexed_fields)}. Records: #{inspect(params.records)}.\n\nExAws Request Error! #{inspect(error)}"
true ->
raise ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"
end
end
@doc """
Logs message to console and optionally to file. Log levels, colours and file path may be set in configuration (details in README.md).
"""
def ecto_dynamo_log(level, message, attributes \\ %{}, opts \\ []) do
if Confex.get_env(:ecto_adapters_dynamodb, :use_logger) do
Logger.log(level, message, attributes)
else
write_console_log(level, message, attributes, opts)
end
end
defp write_console_log(level, message, attributes, opts) do
log_levels = Confex.get_env(:ecto_adapters_dynamodb, :log_levels) || [:info]
if level in log_levels do
log_path = Confex.get_env(:ecto_adapters_dynamodb, :log_path)
depth = opts[:depth] || 4
colours = Confex.get_env(:ecto_adapters_dynamodb, :log_colours)
d = DateTime.utc_now()
formatted_message =
"#{d.year}-#{d.month}-#{d.day} #{d.hour}:#{d.minute}:#{d.second} UTC [Ecto dynamo #{level}] #{inspect(message)}"
{:ok, log_message} =
Jason.encode(%{message: formatted_message, attributes: chisel(attributes, depth)})
if Confex.get_env(:ecto_adapters_dynamodb, :log_in_colour) do
IO.ANSI.format([colours[level] || :normal, log_message], true) |> IO.puts()
else
log_message |> IO.puts()
end
if String.valid?(log_path) and Regex.match?(~r/\S/, log_path),
do: log_pipe(log_path, log_message)
end
end
def ex_aws_config(repo) do
config = Resolver.resolve!(repo.config())
config
|> Keyword.take([:debug_requests, :access_key_id, :secret_access_key, :region])
|> Keyword.merge(Keyword.get(config, :dynamodb, []))
end
defp chisel(str, _depth) when is_binary(str),
do: if(String.valid?(str), do: str, else: Base.encode64(str))
defp chisel(num, _depth) when is_number(num), do: num
defp chisel(any, _depth) when not is_map(any) and not is_list(any), do: inspect(any)
defp chisel(_, 0), do: "beyond_log_depth"
defp chisel(%{__struct__: _} = struct, _depth), do: inspect(struct)
defp chisel(map, depth) when is_map(map) do
for {k, v} <- map, into: %{}, do: {k, chisel(v, depth - 1)}
end
defp chisel(list, depth) when is_list(list) do
for e <- list, do: chisel(e, depth - 1)
# Stream.with_index(list) |> Enum.reduce(%{}, fn({v,k}, acc)-> Map.put(acc, k, chisel(v, depth - 1)) end)
end
defp log_pipe(path, str) do
{:ok, file} = File.open(path, [:append])
IO.binwrite(file, str)
File.close(file)
end
defp opt_config(key, repo, opts, default \\ false) do
case Keyword.get(opts, key) do
nil -> RepoConfig.config_val(repo, key, default)
x -> x
end
end
defp maybe_replace_empty_mapsets_for_insert(record, repo, opts) do
empty_mapset_to_nil = opt_config(:empty_mapset_to_nil, repo, opts)
insert_nil = opt_config(:insert_nil_fields, repo, opts, true)
cond do
empty_mapset_to_nil and insert_nil ->
record
|> Enum.map(fn {k, v} -> {k, empty_mapset_to_nil(v)} end)
|> Enum.into(%{})
empty_mapset_to_nil ->
record
|> Enum.reject(fn {_k, v} -> v == MapSet.new() end)
|> Enum.into(%{})
true ->
record
end
end
defp maybe_replace_empty_mapsets_for_update(record, repo, opts) do
if opt_config(:empty_mapset_to_nil, repo, opts) do
record
|> Enum.map(fn {k, v} -> {k, empty_mapset_to_nil(v)} end)
else
record
end
end
defp empty_mapset_to_nil(%MapSet{} = m), do: if(MapSet.size(m) == 0, do: nil, else: m)
defp empty_mapset_to_nil(x), do: x
defp maybe_replace_nil_mapset(repo, opts) do
if opt_config(:nil_to_empty_mapset, repo, opts) do
MapSet.new()
else
nil
end
end
end