defmodule Ecto.Adapters.DynamoDB.Query do
@moduledoc """
Some query wrapper functions for helping us query dynamo db. Selects indexes to use, etc.
Not to be confused with `Ecto.Query`.
"""
import Ecto.Adapters.DynamoDB.Info
alias Ecto.Adapters.DynamoDB
alias Ecto.Adapters.DynamoDB.RepoConfig
alias Ecto.Repo
@typep key :: String.t()
@typep table_name :: String.t()
@typep query_op :: :== | :> | :< | :>= | :<= | :is_nil | :between | :begins_with | :in
@typep boolean_op :: :and | :or
@typep match_clause :: {term, query_op}
@typep search_clause :: {key, match_clause} | {boolean_op, [search_clause]}
@typep search :: [search_clause]
@typep dynamo_response :: %{required(String.t()) => term}
@typep query_opts :: [{atom(), any()}]
# DynamoDB will reject an entire batch get query if the query is for more than 100 records, so these need to be batched.
# https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html
@batch_get_item_limit 100
@logical_ops [:and, :or]
# parameters for get_item:
# TABLE_NAME::string,
# [{LOGICAL_OP::atom, [{ATTRIBUTE::string, {VALUE::string, OPERATOR::atom}}]} | {ATTRIBUTE::string, {VALUE::string, OPERATOR::atom}}]
#
# Repo.all(model), provide cached results for tables designated in :cached_tables
# Fetches item(s) from DynamoDB for the given table and search clauses.
# Selects the best available index and dispatches to the most efficient
# operation: get_item / batch_get_item on the primary key, query on a
# secondary (or partial) index, or a scan when no index matches.
@spec get_item(Repo.t(), table_name, search, keyword) :: dynamo_response | no_return
# An empty search can only be served by a scan (possibly from cache).
def get_item(repo, table, search, opts) when search == [] do
maybe_scan(repo, table, search, opts)
end
# Regular queries
def get_item(repo, table, search, opts) do
parsed_index =
case get_best_index!(repo, table, search, opts) do
# Primary key without range
{:primary, [_idx] = idxs} ->
{:primary, idxs}
{:primary, idxs} ->
{_, op2} = deep_find_key(search, Enum.at(idxs, 1))
# Maybe query on composite primary
# A range-style operator on the sort key rules out get_item; fall
# back to a query keyed on the hash part only.
if op2 in [:between, :begins_with, :<, :>, :<=, :>=] do
{:primary_partial, idxs}
else
{:primary, idxs}
end
parsed ->
parsed
end
results =
case parsed_index do
# primary key based lookup uses the efficient 'get_item' operation
{:primary, indexes} = index ->
{hash_values, op} = deep_find_key(search, hd(indexes))
if op == :in do
responses_element = "Responses"
unprocessed_keys_element = "UnprocessedKeys"
# The default format of the response from Dynamo.
response_map = %{responses_element => %{table => []}, unprocessed_keys_element => %{}}
# batch_get_item rejects requests of more than @batch_get_item_limit
# keys, so chunk the hash values and merge the per-batch responses.
Enum.chunk_every(hash_values, @batch_get_item_limit)
|> Enum.reduce(response_map, fn hash_batch, acc ->
# Modify the 'search' arg so that it only contains values from the current hash_batch.
batched_search = make_batched_search(search, hash_batch)
%{
^responses_element => %{^table => results},
^unprocessed_keys_element => unprocessed_key_map
} =
ExAws.Dynamo.batch_get_item(
construct_batch_get_item_query(
table,
indexes,
hash_batch,
batched_search,
construct_opts(:get_item, opts)
)
)
|> ExAws.request!(DynamoDB.ex_aws_config(repo))
# Append this batch's items (and any unprocessed keys) onto the accumulator.
Kernel.put_in(
acc,
[responses_element, table],
acc[responses_element][table] ++ results
)
|> maybe_put_unprocessed_keys(unprocessed_key_map, table, unprocessed_keys_element)
end)
else
# https://hexdocs.pm/ex_aws/ExAws.Dynamo.html#get_item/3
query = construct_search(index, search, opts)
ExAws.Dynamo.get_item(table, query, construct_opts(:get_item, opts))
|> ExAws.request!(DynamoDB.ex_aws_config(repo))
end
# secondary index based lookups need the query functionality.
index when is_tuple(index) ->
index_fields = get_hash_range_key_list(index)
{hash_values, op} = deep_find_key(search, hd(index_fields))
# https://hexdocs.pm/ex_aws/ExAws.Dynamo.html#query/2
query = construct_search(index, search, opts)
if op == :in do
responses_element = "Responses"
response_map = %{responses_element => %{table => []}}
# A query's key condition cannot express IN on the hash key, so run
# one query per hash value and merge the "Items" lists.
Enum.reduce(hash_values, response_map, fn hash_value, acc ->
# When receiving a list of values to query on, construct a custom query for each of those values to pass into do_fetch_recursive/1.
new_query =
Kernel.put_in(query, [:expression_attribute_values, :hash_key], hash_value)
%{"Items" => items} = do_fetch_recursive(repo, new_query, table, opts)
Kernel.put_in(
acc,
[responses_element, table],
acc[responses_element][table] ++ items
)
end)
else
do_fetch_recursive(repo, query, table, opts)
end
# No usable index at all: fall back to a scan (if allowed/configured).
:scan ->
maybe_scan(repo, table, search, opts)
end
results
end
# Runs a DynamoDB query against `table`, following LastEvaluatedKey pages
# according to the :recursive / :page_limit options.
defp do_fetch_recursive(repo, query, table, opts) do
  recursion = parse_recursive_option(:query, opts)

  fetch_recursive(repo, &ExAws.Dynamo.query/2, table, query, recursion, %{})
end
# Extracts the [hash] or [hash, range] key list from a parsed index tuple.
# A partial query on a composite-key secondary index arrives as a
# three-element tuple, e.g. {:secondary_partial, "person_id_entity", ["person_id"]};
# otherwise we get a two-element {index_name, index_fields} tuple.
defp get_hash_range_key_list({_index_type, _index_name, index_fields}), do: index_fields
defp get_hash_range_key_list({_index_name, index_fields}), do: index_fields
# Merges a batch_get_item response's UnprocessedKeys into the accumulated
# response map so they can be surfaced to the caller. An empty map is a no-op;
# keys for a table already present are appended, otherwise the whole map is set.
defp maybe_put_unprocessed_keys(acc, unprocessed_key_map, _table, _unprocessed_keys_element)
     when unprocessed_key_map == %{},
     do: acc

defp maybe_put_unprocessed_keys(acc, unprocessed_key_map, table, unprocessed_keys_element) do
  if Map.has_key?(acc[unprocessed_keys_element], table) do
    keys_element = "Keys"

    merged_keys =
      acc[unprocessed_keys_element][table][keys_element] ++
        unprocessed_key_map[table][keys_element]

    Kernel.put_in(acc, [unprocessed_keys_element, table, keys_element], merged_keys)
  else
    Map.put(acc, unprocessed_keys_element, unprocessed_key_map)
  end
end
# The initial 'search' arg holds the full list of values being queried for;
# during a batched batch_get_item operation, rebuild it so the hash-key :in
# clause contains only the values from the current batch.
defp make_batched_search([and: [range_query, {hash_key, {_all_values, op}}]], hash_batch) do
  [{hash_key, {hash_batch, op}}, range_query]
end

defp make_batched_search([{hash_key, {_all_values, op}}], hash_batch) do
  [{hash_key, {hash_batch, op}}]
end
@doc """
Returns an atom, :scan or :query, specifying whether the current search will
be served by a DynamoDB scan (no matching index) or a query.
"""
def scan_or_query?(repo, table, search) do
  case get_best_index!(repo, table, search) do
    :scan -> :scan
    _index -> :query
  end
end
# we've a list of fields from an index that matches (some of) the search fields, so construct
# a DynamoDB search criteria map with only the given fields and their search objects!
@spec construct_search(
{:primary | :primary_partial | nil | String.t(), [String.t()]},
search,
keyword
) :: keyword
@spec construct_search({:secondary_partial, String.t(), [String.t()]}, search, keyword) ::
keyword
# Full primary-key lookup: build the plain key map consumed by
# ExAws.Dynamo.get_item (see the private accumulator clauses below).
def construct_search({:primary, index_fields}, search, opts),
do: construct_search(%{}, index_fields, search, opts)
def construct_search({:primary_partial, index_fields}, search, opts) do
# do not provide index_name for primary partial
construct_search({nil, index_fields}, search, opts)
end
# Secondary (or partial-primary) lookup: build the keyword options for
# ExAws.Dynamo.query - key conditions, attribute names/values, filter.
def construct_search({index_name, index_fields}, search, opts) do
# Construct a DynamoDB FilterExpression (since it cannot be provided blank but may be,
# we merge it with the full query)
{filter_expression_tuple, expression_attribute_names, expression_attribute_values} =
construct_filter_expression(search, index_fields)
updated_ops = construct_opts(:query, opts)
# :primary_partial might not provide an index name
criteria = if index_name != nil, do: [index_name: index_name], else: []
criteria ++
case index_fields do
# Composite key: hash equality plus a range condition on the sort key.
[hash, range] ->
{hash_val, _op} = deep_find_key(search, hash)
{range_expression, range_attribute_names, range_attribute_values} =
construct_range_params(range, deep_find_key(search, range))
[
# We need ExpressionAttributeNames when field-names are reserved, for example "name" or "role"
key_condition_expression: "##{hash} = :hash_key AND #{range_expression}",
expression_attribute_names:
Enum.reduce(
[%{"##{hash}" => hash}, range_attribute_names, expression_attribute_names],
&Map.merge/2
),
expression_attribute_values:
[hash_key: hash_val] ++ range_attribute_values ++ expression_attribute_values
] ++ filter_expression_tuple ++ updated_ops
# Hash-only key: a single equality key condition.
[hash] ->
{hash_val, _op} = deep_find_key(search, hash)
[
key_condition_expression: "##{hash} = :hash_key",
expression_attribute_names:
Map.merge(%{"##{hash}" => hash}, expression_attribute_names),
expression_attribute_values: [hash_key: hash_val] ++ expression_attribute_values
] ++ filter_expression_tuple ++ updated_ops
end
end
# Partial secondary queries carry the index name in a 3-tuple; delegate to
# the generic {index_name, index_fields} clause above.
def construct_search({:secondary_partial, index_name, index_fields}, search, opts) do
construct_search({index_name, index_fields}, search, opts)
end
# Private accumulator clauses: build the get_item key map one indexed field
# at a time, taking each field's value from the search clauses.
defp construct_search(criteria, [], _, _), do: criteria
defp construct_search(criteria, [index_field | index_fields], search, opts) do
Map.put(criteria, index_field, elem(deep_find_key(search, index_field), 0))
|> construct_search(index_fields, search, opts)
end
# Builds the sort-key portion of a KeyConditionExpression for the given
# operator, returning {expression, expression_attribute_names, expression_attribute_values}.
@spec construct_range_params(key, match_clause) ::
        {String.t(), %{required(String.t()) => key}, keyword()}
defp construct_range_params(range, {[range_start, range_end], :between}) do
  expression = "##{range} between :range_start and :range_end"
  {expression, %{"##{range}" => range}, [range_start: range_start, range_end: range_end]}
end

defp construct_range_params(range, {prefix, :begins_with}) do
  {"begins_with(##{range}, :prefix)", %{"##{range}" => range}, [prefix: prefix]}
end

defp construct_range_params(range, {range_val, :==}) do
  {"##{range} = :range_key", %{"##{range}" => range}, [range_key: range_val]}
end

defp construct_range_params(range, {range_val, op}) when op in [:<, :>, :<=, :>=] do
  {"##{range} #{op} :range_key", %{"##{range}" => range}, [range_key: range_val]}
end
# Narrows caller options down to those supported by the given DynamoDB
# operation, and supplies a :select default unless a :projection_expression
# already restricts the returned attributes.
@spec construct_opts(atom, keyword) :: keyword
defp construct_opts(query_type, opts) do
  allowed =
    case query_type do
      :get_item -> [:consistent_read]
      :query -> [:exclusive_start_key, :limit, :scan_index_forward, :consistent_read]
    end

  selection =
    if opts[:projection_expression] == nil,
      do: [select: opts[:select] || :all_attributes],
      else: Keyword.take(opts, [:projection_expression])

  selection ++ Keyword.take(opts, allowed)
end
# Builds a DynamoDB FilterExpression from the search clauses NOT covered by
# the chosen index's key conditions.
# Returns {filter_expression_keyword, expression_attribute_names, expression_attribute_values};
# the first element is [] when no filter applies, so callers can merge it freely.
@spec construct_filter_expression(search, [String.t()]) ::
        {[filter_expression: String.t()], map, keyword}
defp construct_filter_expression(search, index_fields) do
  # Attributes used in key conditions may not also appear in a FilterExpression.
  case collect_non_indexed_search(search, index_fields, []) do
    [] ->
      {[], %{}, []}

    filters ->
      {expression_list, attribute_names, attribute_values} =
        build_filter_expression_data(filters, {[], %{}, %{}})

      {[filter_expression: Enum.join(expression_list, " and ")], attribute_names,
       Enum.into(attribute_values, [])}
  end
end
# Recursively drops search clauses that refer to key-condition (indexed)
# fields, keeping only clauses usable in a FilterExpression. Logical groups
# (:and/:or) that end up empty after filtering are discarded.
# TODO: this may be redundant - the indexed fields can just be skipped during the expression construction
@spec collect_non_indexed_search(search, [String.t()], search) :: search
defp collect_non_indexed_search([], _index_fields, acc), do: acc

defp collect_non_indexed_search([clause | rest], index_fields, acc) do
  updated_acc =
    case clause do
      {field, {_val, _op}} when field not in @logical_ops ->
        if field in index_fields, do: acc, else: [clause | acc]

      {logical_op, nested_clauses} when logical_op in @logical_ops ->
        case collect_non_indexed_search(nested_clauses, index_fields, []) do
          [] -> acc
          kept -> [{logical_op, kept} | acc]
        end
    end

  collect_non_indexed_search(rest, index_fields, updated_acc)
end
# Recursively reconstruct parentheticals
@type expression_data_acc :: {[String.t()], map, map}
# Walks the (possibly nested) filter clauses, accumulating
# {expression fragments, ExpressionAttributeNames, ExpressionAttributeValues}.
# Fragments are prepended, so the list is in reverse clause order; the
# top-level join with " and " is order-insensitive.
@spec build_filter_expression_data(search, expression_data_acc) :: expression_data_acc
defp build_filter_expression_data([], acc), do: acc
defp build_filter_expression_data([expr | exprs], {filter_exprs, attr_names, attr_values}) do
case expr do
# a list of lookup fields; iterate.
{field, {val, op} = val_op_tuple} = complete_tuple when is_tuple(val_op_tuple) ->
updated_filter_exprs = [construct_conditional_statement(complete_tuple) | filter_exprs]
updated_attr_names = Map.merge(%{"##{field}" => field}, attr_names)
updated_attr_values =
Map.merge(format_expression_attribute_value(field, val, op), attr_values)
build_filter_expression_data(
exprs,
{updated_filter_exprs, updated_attr_names, updated_attr_values}
)
# A logical group: render its children, then wrap them in parentheses
# joined by the group's operator (and/or).
{logical_op, exprs_list} when is_list(exprs_list) ->
{deeper_filter_exprs, deeper_attr_names, deeper_attr_values} =
build_filter_expression_data(exprs_list, {[], %{}, %{}})
# We don't parenthesize only one expression
updated_filter_exprs =
case deeper_filter_exprs do
[one_expression] ->
[one_expression | filter_exprs]
many_expressions ->
[
"(" <> Enum.join(many_expressions, " #{to_string(logical_op)} ") <> ")"
| filter_exprs
]
end
updated_attr_names = Map.merge(deeper_attr_names, attr_names)
updated_attr_values = Map.merge(deeper_attr_values, attr_values)
build_filter_expression_data(
exprs,
{updated_filter_exprs, updated_attr_names, updated_attr_values}
)
end
end
# Builds the ExpressionAttributeValues map for one filter clause; the
# placeholder atoms mirror those produced by construct_conditional_statement/1.
# NOTE(review): String.to_atom/1 here assumes field names come from the Ecto
# schema, never from unbounded user input - confirm before widening usage.
@spec format_expression_attribute_value(key, term, query_op | [query_op]) :: map
defp format_expression_attribute_value(field, _val, :is_nil),
  do: %{String.to_atom(field <> "_val") => nil}

defp format_expression_attribute_value(field, values, :in) do
  values
  |> Enum.with_index(1)
  |> Map.new(fn {value, position} -> {String.to_atom("#{field}_val#{position}"), value} end)
end

# double op
defp format_expression_attribute_value(field, [val1, val2], [_op1, _op2]) do
  %{String.to_atom(field <> "_val1") => val1, String.to_atom(field <> "_val2") => val2}
end

defp format_expression_attribute_value(field, [start_val, end_val], :between) do
  %{
    String.to_atom(field <> "_start_val") => start_val,
    String.to_atom(field <> "_end_val") => end_val
  }
end

defp format_expression_attribute_value(field, val, _op),
  do: %{String.to_atom(field <> "_val") => val}
# Renders a single filter clause as a DynamoDB condition-expression fragment.
# Placeholder names must stay in sync with format_expression_attribute_value/3.
@spec construct_conditional_statement({key, {term, query_op} | {[term], [query_op]}}) ::
        String.t()
# double op (neither of them ought be :==)
defp construct_conditional_statement({field, {[_val1, _val2], [op1, op2]}}) do
  "##{field} #{op1} :#{field}_val1 and ##{field} #{op2} :#{field}_val2"
end

# Ecto's is_nil maps to "attribute is null OR absent".
defp construct_conditional_statement({field, {_val, :is_nil}}) do
  "(##{field} = :#{field}_val or attribute_not_exists(##{field}))"
end

defp construct_conditional_statement({field, {values, :in}}) do
  # Placeholders are accumulated by prepending, so they appear in reverse
  # positional order; DynamoDB's IN is order-insensitive.
  placeholders =
    values
    |> Enum.with_index(1)
    |> Enum.reduce([], fn {_value, position}, acc -> [":#{field}_val#{position}" | acc] end)

  "(##{field} in (#{Enum.join(placeholders, ",")}))"
end

defp construct_conditional_statement({field, {_val, :==}}) do
  "##{field} = :#{field}_val"
end

defp construct_conditional_statement({field, {_val, op}}) when op in [:<, :>, :<=, :>=] do
  "##{field} #{op} :#{field}_val"
end

defp construct_conditional_statement({field, {[_start_val, _end_val], :between}}) do
  "##{field} between :#{field}_start_val and :#{field}_end_val"
end

defp construct_conditional_statement({field, {_val, :begins_with}}) do
  "begins_with(##{field}, :#{field}_val)"
end
# Builds the per-table request map for ExAws.Dynamo.batch_get_item: one key
# map per requested item. For a composite key, the range clause must also be
# an :in list whose values zip 1:1 with the hash values.
defp construct_batch_get_item_query(table, indexes, hash_values, search, opts) do
  passthrough_opts = Keyword.take(opts, [:consistent_read, :projection_expression])

  keys =
    case indexes do
      [hash_key] ->
        hash_attr = String.to_atom(hash_key)
        for hash_value <- hash_values, do: [{hash_attr, hash_value}]

      [hash_key, range_key] ->
        {range_values, :in} = deep_find_key(search, range_key)
        hash_attr = String.to_atom(hash_key)
        range_attr = String.to_atom(range_key)

        for {hash_value, range_value} <- Enum.zip(hash_values, range_values) do
          [{hash_attr, hash_value}, {range_attr, range_value}]
        end
    end

  %{table => [keys: keys] ++ passthrough_opts}
end
@doc """
Given a search criteria, finds the best index to query against it.

Returns a tuple indicating whether it is the primary-key index or a
secondary index. Querying a secondary index in DynamoDB requires its index
name, so secondary indexes are returned as a tuple led by that name, whilst
the primary key is distinguished by the atom :primary:

{:primary, [indexed_fields_list]} | {"index_name", [indexed_fields_list]}

Raises if the table's index information cannot be retrieved.
"""
@spec get_best_index(Repo.t(), table_name, search, query_opts) ::
        :not_found
        | {:primary, [String.t()]}
        | {:primary_partial, [String.t()]}
        | {String.t(), [String.t()]}
        | {:secondary_partial, String.t(), [String.t()]}
        | no_return
def get_best_index(repo, tablename, search, opts) do
  case get_matching_primary_index(repo, tablename, search) do
    # A full hash+range primary-key match is probably the best we can do.
    {:primary, _} = primary ->
      primary

    # Only the primary hash matched; prefer a secondary index that also
    # covers a sort key, otherwise keep the partial primary match.
    {:primary_partial, _primary_hash} = partial ->
      case get_matching_secondary_index(repo, tablename, search, opts) do
        {_, [_, _]} = secondary -> secondary
        _ -> partial
      end

    # No primary match at all: a secondary index is the only option.
    :not_found ->
      get_matching_secondary_index(repo, tablename, search, opts)
  end
end
@doc """
Same as get_best_index/4, but falls back to the atom :scan when no index
matches the search.
"""
def get_best_index!(repo, tablename, search, opts \\ []) do
  with :not_found <- get_best_index(repo, tablename, search, opts) do
    :scan
  end
end
@doc """
Given a search criteria of one or more fields, determines whether the
table's primary key is a good match and can fulfil the search.

Returns {:primary, [hash] | [hash, range]}, {:primary_partial, [hash]} when
only the hash part matches, or :not_found.
"""
def get_matching_primary_index(repo, tablename, search) do
  primary_key = primary_key!(repo, tablename)

  case match_index(primary_key, search) do
    # Full primary-key match (hash, or hash + range).
    {:primary, _} = index ->
      index

    # The hash part alone may still support a range query over all results
    # sharing that hash - e.g. all circle_member records with a specific
    # person_id (all of a user's circles).
    :not_found ->
      match_index_hash_part(primary_key, search)
  end
end
@doc """
Finds the secondary index that best matches the search clauses (which may be
nested under logical operators, e.g. `[{"id", {"franko", :==}}]` or
`[{:and, [{"circle_id", {"123", :==}}, {"person_id", {"abc", :>}}]}]`).

Returns {"index_name", [hash_key]} or {"index_name", [hash_key, range_key]},
or :not_found when no secondary index applies.

TODO: Does not help with range queries. -> The match_index_hash_part function is
beginning to address this.
"""
def get_matching_secondary_index(repo, tablename, search, opts) do
  indexes = secondary_indexes(repo, tablename)

  # A user may provide an :index opt in a query, in which case we prioritize
  # that index; an unknown name raises (with a suggestion when close).
  case opts[:index] do
    nil ->
      find_best_match(indexes, search, :not_found)

    index_option ->
      maybe_select_index_option(index_option, indexes) ||
        index_option_error(index_option, indexes)
  end
end
# Looks up a user-supplied :index option (atom or string) among the table's
# secondary indexes; returns the matching {name, keys} tuple or nil.
defp maybe_select_index_option(index_option, secondary_indexes) when is_atom(index_option) do
  maybe_select_index_option(Atom.to_string(index_option), secondary_indexes)
end

defp maybe_select_index_option(index_option, secondary_indexes) do
  Enum.find(secondary_indexes, fn {name, _keys} -> name == index_option end)
end
# Raises an ArgumentError for an unrecognized :index option. When an existing
# secondary index name is a close match (Jaro distance >= 0.75), it is
# suggested in the message.
@spec index_option_error(String.t() | atom(), [{String.t(), [String.t()]}]) :: no_return
defp index_option_error(_index_option, []) do
  raise ArgumentError,
    message:
      "#{inspect(__MODULE__)}.get_matching_secondary_index/4 error: :index option does not match existing secondary index names."
end

defp index_option_error(index_option, secondary_indexes) do
  option_string = if is_atom(index_option), do: Atom.to_string(index_option), else: index_option

  {nearest_index_name, jaro_distance} =
    secondary_indexes
    |> Enum.map(fn {name, _keys} -> {name, String.jaro_distance(option_string, name)} end)
    |> Enum.max_by(fn {_name, distance} -> distance end)

  if jaro_distance >= 0.75 do
    raise ArgumentError,
      message:
        "#{inspect(__MODULE__)}.get_matching_secondary_index/4 error: :index option does not match existing secondary index names. Did you mean #{nearest_index_name}?"
  else
    index_option_error(option_string, [])
  end
end
# Walks the secondary indexes tracking the best candidate so far: a full
# hash+range match wins immediately; a hash-only match is preferred over a
# partial composite match (see note on ambiguity below).
defp find_best_match([], _search, best), do: best
defp find_best_match([index | indexes], search, best) do
case match_index(index, search) do
# Matching on both hash + sort makes this the best index (as well as we can tell)
{_, [_, _]} ->
index
{_, [hash]} ->
case search do
# If we're only querying on a single field and we find a matching hash-only index, that's the index to use, no need to check others.
[{field, {_, _}}] ->
if field == hash, do: index, else: find_best_match(indexes, search, best)
# we have a candidate for best match, though it's a hash key only. Look for better.
_ ->
find_best_match(indexes, search, index)
end
:not_found ->
case match_index_hash_part(index, search) do
# haven't found anything good, keep looking, retain our previous best match.
:not_found ->
find_best_match(indexes, search, best)
index_partial ->
# If the current best is a hash-only index (formatted like {"idx_name", ["hash"]}), always choose it over a partial secondary.
# Note that this default behavior would cause a hash-only key to be selected over a partial with a different hash
# in cases where a query might be ambiguous - for example, a query on a user's first_name and last_name where
# a hash-only index exists on first_name and a composite index exists on last_name_something_else.
case best do
{_, [_]} -> find_best_match(indexes, search, best)
_ -> find_best_match(indexes, search, index_partial)
end
end
end
end
# Checks whether the given {name, fields} index fully covers the search.
# 'search' is a (possibly nested) list of {field, {value, operator}} clauses.
# The hash part only accepts :== or :in; an :is_nil on the range attribute is
# a filter rather than a key condition, so it disqualifies the index.
defp match_index({_, [hash, range]} = index, search) do
  with {_, hash_op} when hash_op in [:==, :in] <- deep_find_key(search, hash),
       {_, range_op} when range_op != :is_nil <- deep_find_key(search, range) do
    index
  else
    _ -> :not_found
  end
end

defp match_index({_, [hash]} = index, search) do
  case deep_find_key(search, hash) do
    {_, op} when op in [:==, :in] -> index
    _ -> :not_found
  end
end
@spec match_index_hash_part({:primary, [key]}, search) :: {:primary_partial, [key]} | :not_found
@spec match_index_hash_part({String.t(), [key]}, search) ::
        {:secondary_partial, String.t(), [key]} | :not_found
# Checks whether just the hash part of a composite index can serve the
# search; hash-only indexes and anything else yield :not_found.
defp match_index_hash_part(index, search) do
  case index do
    {:primary, [hash, _range]} ->
      if hash_clause_matches?(search, hash), do: {:primary_partial, [hash]}, else: :not_found

    {index_name, [hash, _range]} ->
      if hash_clause_matches?(search, hash),
        do: {:secondary_partial, index_name, [hash]},
        else: :not_found

    _ ->
      :not_found
  end
end

# True when the search constrains `hash` with an equality-style operator
# (the only operators DynamoDB permits on a hash key).
defp hash_clause_matches?(search, hash) do
  case deep_find_key(search, hash) do
    {_, op} when op in [:==, :in] -> true
    _ -> false
  end
end
# TODO: multiple use of deep_find_key could be avoided by using the recursion in the main module to provide a set of indexed attributes in addition to the nested logical clauses.
# Depth-first search of the clause list for `key`, descending into :and/:or
# groups; returns the matching {value, operator} tuple, or nil.
@spec deep_find_key(search, key) :: nil | {term, query_op}
defp deep_find_key([], _key), do: nil

defp deep_find_key([{logical_op, nested_clauses} | rest], key)
     when logical_op in @logical_ops do
  deep_find_key(nested_clauses, key) || deep_find_key(rest, key)
end

defp deep_find_key([{field, {value, operator}} | rest], key) do
  if field == key, do: {value, operator}, else: deep_find_key(rest, key)
end
@doc """
Formats the recursive option according to whether the query is a DynamoDB
scan or query. A positive :page_limit takes precedence; otherwise scans
default to non-recursive fetches and queries to recursive ones.
"""
def parse_recursive_option(scan_or_query, opts) do
  page_limit = opts[:page_limit]

  cond do
    is_integer(page_limit) and page_limit > 0 ->
      page_limit

    is_integer(page_limit) ->
      raise ArgumentError,
        message:
          "#{inspect(__MODULE__)}.parse_recursive_option/2 error: :page_limit option must be greater than 0."

    scan_or_query == :scan ->
      # scan defaults to no recursion, opts[:recursive] must equal true to enable it
      opts[:recursive] == true

    scan_or_query == :query ->
      # query defaults to recursion, opts[:recursive] must equal false to disable it
      opts[:recursive] != false
  end
end
# scan
# Serves a search that cannot use any index, either from the table cache or
# via a DynamoDB scan. Scanning must be explicitly allowed: through the
# :scan option, the repo's :scan_all config, or by listing the table under
# :scan_tables; otherwise we raise.
defp maybe_scan(repo, table, [], opts) do
scan_enabled =
opts[:scan] == true || RepoConfig.config_val(repo, :scan_all) == true ||
scannable_table?(repo, table)
cond do
# TODO: we could use the cached scan and apply the search filters
# ourselves when they are provided.
cached_table?(repo, table) and opts[:scan] != true ->
Ecto.Adapters.DynamoDB.Cache.scan!(repo, table)
scan_enabled ->
# An explicit :limit option wins over the configured :scan_limit.
limit_option = opts[:limit] || RepoConfig.config_val(repo, :scan_limit)
scan_limit = if is_integer(limit_option), do: [limit: limit_option], else: []
updated_opts = Keyword.drop(opts, [:recursive, :limit, :scan]) ++ scan_limit
fetch_recursive(
repo,
&ExAws.Dynamo.scan/2,
table,
updated_opts,
parse_recursive_option(:scan, opts),
%{}
)
true ->
maybe_scan_error(table)
end
end
# Non-empty search: scan with a FilterExpression built from the search
# clauses (no index fields, so every clause becomes part of the filter).
defp maybe_scan(repo, table, search, opts) do
scan_enabled =
opts[:scan] == true || RepoConfig.config_val(repo, :scan_all) == true ||
scannable_table?(repo, table)
limit_option = opts[:limit] || RepoConfig.config_val(repo, :scan_limit)
scan_limit = if is_integer(limit_option), do: [limit: limit_option], else: []
updated_opts = Keyword.drop(opts, [:recursive, :limit, :scan]) ++ scan_limit
if scan_enabled do
{filter_expression_tuple, expression_attribute_names, expression_attribute_values} =
construct_filter_expression(search, [])
expressions =
[
expression_attribute_names: expression_attribute_names,
expression_attribute_values: expression_attribute_values
] ++ updated_opts ++ filter_expression_tuple
fetch_recursive(
repo,
&ExAws.Dynamo.scan/2,
table,
expressions,
parse_recursive_option(:scan, opts),
%{}
)
else
maybe_scan_error(table)
end
end
# Raised when a scan is required but not permitted by options/configuration.
@spec maybe_scan_error(table_name) :: no_return
defp maybe_scan_error(table) do
  # NOTE: maybe_scan takes (repo, table, search, opts) - the previous message
  # referred to maybe_scan/3, an arity that does not exist.
  raise ArgumentError,
    message:
      "#{inspect(__MODULE__)}.maybe_scan/4 error: :scan option or configuration have not been specified, and could not confirm the table, #{inspect(table)}, as listed for scan or caching in the application's configuration. Please see README file for details."
end
@typep fetch_func :: (table_name, keyword -> ExAws.Operation.JSON.t())
# Executes `func` (an ExAws scan or query builder) and, while DynamoDB
# reports a "LastEvaluatedKey" and the recursion setting allows, follows the
# pagination cursor, merging pages via combine_results/2.
# `recursive` is either a boolean or a remaining page count (see
# update_recursive_option/1).
@spec fetch_recursive(Repo.t(), fetch_func, table_name, keyword, boolean | number, map) ::
dynamo_response
defp fetch_recursive(repo, func, table, expressions, recursive, result) do
# A fully recursive fetch must not cap pages with :limit; an integer
# page_limit keeps :limit as the per-page size.
updated_expressions =
if recursive == true, do: Keyword.delete(expressions, :limit), else: expressions
fetch_result =
func.(table, updated_expressions) |> ExAws.request!(DynamoDB.ex_aws_config(repo))
# recursive can be a boolean or a page limit
updated_recursive = update_recursive_option(recursive)
if fetch_result["LastEvaluatedKey"] != nil and updated_recursive.continue do
fetch_recursive(
repo,
func,
table,
updated_expressions ++ [exclusive_start_key: fetch_result["LastEvaluatedKey"]],
updated_recursive.new_value,
combine_results(result, fetch_result)
)
else
combine_results(result, fetch_result)
end
end
@doc """
Updates the recursive option during a recursive fetch. Booleans pass through
unchanged; integers (page limits) are decremented, continuing while more
than one page remains.
"""
def update_recursive_option(flag) when is_boolean(flag), do: %{continue: flag, new_value: flag}

def update_recursive_option(pages_left) when is_integer(pages_left),
  do: %{continue: pages_left > 1, new_value: pages_left - 1}
# Merges two DynamoDB page results by summing "Count"/"ScannedCount" and
# concatenating "Items". The very first page is returned untouched, so extra
# fields such as "LastEvaluatedKey" survive a single-page fetch.
@spec combine_results(map, map) :: map
defp combine_results(result, scan_result) when result == %{}, do: scan_result

defp combine_results(result, scan_result) do
  %{"Count" => count_a, "Items" => items_a, "ScannedCount" => scanned_a} = result
  %{"Count" => count_b, "Items" => items_b, "ScannedCount" => scanned_b} = scan_result

  %{
    "Count" => count_a + count_b,
    "Items" => items_a ++ items_b,
    "ScannedCount" => scanned_a + scanned_b
  }
end
# True when `table` is listed under the repo's :scan_tables configuration.
defp scannable_table?(repo, table), do: RepoConfig.table_in_list?(repo, table, :scan_tables)
# True when `table` is listed under the repo's :cached_tables configuration.
defp cached_table?(repo, table), do: RepoConfig.table_in_list?(repo, table, :cached_tables)
end