lib/ecto_adapters_dynamodb.ex

defmodule Ecto.Adapters.DynamoDB do
  @moduledoc """
  Ecto adapter for Amazon DynamoDB.
  """
  @behaviour Ecto.Adapter
  @behaviour Ecto.Adapter.Schema
  @behaviour Ecto.Adapter.Queryable
  @behaviour Ecto.Adapter.Migration

  @impl Ecto.Adapter
  defmacro __before_compile__(_env) do
    # Nothing to see here, yet...
  end

  use Bitwise, only_operators: true

  alias Confex.Resolver
  alias Ecto.Adapters.DynamoDB.Cache
  alias Ecto.Adapters.DynamoDB.DynamoDBSet
  alias Ecto.Adapters.DynamoDB.RepoConfig
  alias Ecto.Query.BooleanExpr
  alias ExAws.Dynamo

  require Logger

  @pool_opts [:timeout, :pool_size, :migration_lock]
  @max_transaction_conflict_retries Application.get_env(
                                      :ecto_adapters_dynamodb,
                                      :max_transaction_conflict_retries,
                                      10
                                    )

  # DynamoDB will reject attempts to batch write more than 25 records at once
  # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
  @batch_write_item_limit 25

  @impl Ecto.Adapter
  def init(config) do
    log = Keyword.get(config, :log, :debug)
    telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)

    meta = %{
      opts: Keyword.take(config, @pool_opts),
      telemetry: {config[:repo], log, telemetry_prefix},
      migration_source: Keyword.get(config, :migration_source, "schema_migrations")
    }

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.init", %{
      "#{inspect(__MODULE__)}.init-params" => %{config: config}
    })

    {:ok, Cache.child_spec([config[:repo]]), meta}
  end

  @doc """
  Ensure all applications necessary to run the adapter are started.
  """
  @impl Ecto.Adapter
  def ensure_all_started(config, type) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.ensure_all_started", %{
      "#{inspect(__MODULE__)}.ensure_all_started-params" => %{type: type, config: config}
    })

    with {:ok, _} = Application.ensure_all_started(:ecto_adapters_dynamodb) do
      {:ok, [config]}
    end
  end

  @impl Ecto.Adapter.Migration
  def supports_ddl_transaction?, do: false

  @impl Ecto.Adapter.Migration
  def execute_ddl(adapter_meta, command, options) do
    Ecto.Adapters.DynamoDB.Migration.execute_ddl(adapter_meta, command, options)
  end

  @impl Ecto.Adapter.Migration
  def lock_for_migrations(%{opts: adapter_opts} = _meta, _opts, callback) do
    # TODO - consider adding support for this? See https://github.com/circles-learning-labs/ecto_adapters_dynamodb/issues/34
    if Keyword.get(adapter_opts, :migration_lock) do
      raise "#{inspect(__MODULE__)}.lock_for_migrations error: #{inspect(__MODULE__)} does not currently support migration table lock; please remove the :migration_lock option from your repo configuration or set it to nil"
    else
      callback.()
    end
  end

  @impl Ecto.Adapter
  def checkout(_meta, _opts, _fun) do
    # TODO - consider adding support for this? See https://github.com/circles-learning-labs/ecto_adapters_dynamodb/issues/33
    raise "#{inspect(__MODULE__)}.checkout: #{inspect(__MODULE__)} does not currently support checkout"
  end

  @impl Ecto.Adapter
  def checked_out?(_meta), do: false

  @impl Ecto.Adapter.Queryable
  def stream(_adapter_meta, _query_meta, _query, _params, _opts) do
    # TODO - consider adding support for this? See https://github.com/circles-learning-labs/ecto_adapters_dynamodb/issues/32
    raise "#{inspect(__MODULE__)}.stream: #{inspect(__MODULE__)} does not currently support stream"
  end

  @doc """
  Called to autogenerate a value for id/embed_id/binary_id.

  Returns the autogenerated value, or nil if it must be
  autogenerated inside the storage or raise if not supported.

  For the Ecto type, `:id`, the adapter autogenerates a 128-bit integer

  For the Ecto type, `:embed_id`, the adapter autogenerates a string, using `Ecto.UUID.generate()`

  For the Ecto type, `:binary_id`, the adapter autogenerates a string, using `Ecto.UUID.generate()`
  """

  # biggest possible int in 128 bits
  @max_id (1 <<< 128) - 1
  @impl Ecto.Adapter.Schema
  def autogenerate(:id), do: Enum.random(1..@max_id)
  def autogenerate(:embed_id), do: Ecto.UUID.generate()
  def autogenerate(:binary_id), do: Ecto.UUID.generate()

  @doc """
  Returns the loaders for a given type.

  Rather than use the Ecto adapter loaders callback, the adapter builds on ExAws' decoding functionality, please see ExAws's `ExAws.Dynamo.Decoder`, in this module, which at this time only loads :utc_datetime and :naive_datetime.
  """
  @impl Ecto.Adapter
  def loaders(_primitive, type), do: [type]

  @doc """
  Returns the dumpers for a given type.

  We rely on ExAws encoding functionality during insertion and update to properly format types for DynamoDB. Please see ExAws `ExAws.Dynamo.update_item` and `ExAws.Dynamo.put_item` for specifics. Currently, we only modify :utc_datetime and :naive_datetime, appending the UTC offset, "Z", to the datetime string before passing to ExAws.
  """
  @impl Ecto.Adapter
  def dumpers(type, datetime)
      when type in [:naive_datetime, :naive_datetime_usec, :utc_datetime, :utc_datetime_usec],
      do: [datetime, &to_iso_string/1]

  def dumpers(_primitive, type), do: [type]

  # Add UTC offset
  # We are adding the offset here also for the :naive_datetime, this
  # assumes we are getting a UTC date (which does correspond with the
  # timestamps() macro but not necessarily with :naive_datetime in general)
  defp to_iso_string(datetime) do
    iso_string =
      case datetime do
        %NaiveDateTime{} ->
          (datetime |> NaiveDateTime.to_iso8601()) <> "Z"

        %DateTime{} ->
          datetime |> DateTime.to_iso8601()

        nil ->
          nil
      end

    {:ok, iso_string}
  end

  @doc """
  Commands invoked to prepare a query for `all`, `update_all` and `delete_all`.

  The returned result is given to `execute/6`.
  """
  # @callback prepare(atom :: :all | :update_all | :delete_all, query :: Ecto.Query.t) ::
  #          {:cache, prepared} | {:nocache, prepared}
  @impl Ecto.Adapter.Queryable
  def prepare(:all, query) do
    # 'preparing' is more a SQL concept - Do we really need to do anything here or just pass the params through?
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.prepare: :all", %{
      "#{inspect(__MODULE__)}.prepare-params" => %{query: inspect(query, structs: false)}
    })

    {:nocache, {:all, query}}
  end

  def prepare(:update_all, query) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.prepare: :update_all", %{
      "#{inspect(__MODULE__)}.prepare-params" => %{query: inspect(query, structs: false)}
    })

    {:nocache, {:update_all, query}}
  end

  def prepare(:delete_all, query) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.prepare: :delete_all", %{
      "#{inspect(__MODULE__)}.prepare-params" => %{query: inspect(query, structs: false)}
    })

    {:nocache, {:delete_all, query}}
  end

  @doc """
  Executes a previously prepared query.

  It must return a tuple containing the number of entries and
  the result set as a list of lists. The result set may also be
  `nil` if a particular operation does not support them.

  The `meta` field is a map containing some of the fields found
  in the `Ecto.Query` struct.

  It receives a process function that should be invoked for each
  selected field in the query result in order to convert them to the
  expected Ecto type. The `process` function will be nil if no
  result set is expected from the query.
  """
  # @callback execute(repo, query_meta, query, params :: list(), process | nil, options) :: result when
  #          result: {integer, [[term]] | nil} | no_return,
  #          query: {:nocache, prepared} |
  #                 {:cached, (prepared -> :ok), cached} |
  #                 {:cache, (cached -> :ok), prepared}
  @impl Ecto.Adapter.Queryable
  def execute(
        %{repo: repo, migration_source: migration_source},
        query_meta,
        {:nocache, {func, prepared}},
        params,
        opts
      ) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.execute", %{
      "#{inspect(__MODULE__)}.execute-params" => %{
        repo: repo,
        query_meta: query_meta,
        prepared: prepared,
        params: params,
        opts: opts
      }
    })

    # table and model are now nested under .from.source
    {table, model} = prepared.from.source
    validate_where_clauses!(prepared)
    lookup_fields = extract_lookup_fields(prepared.wheres, params, [])

    limit_option = opts[:scan_limit]
    scan_limit = if is_integer(limit_option), do: [limit: limit_option], else: []

    updated_opts =
      if table == migration_source do
        ecto_dynamo_log(
          :debug,
          "#{inspect(__MODULE__)}.execute: table name corresponds with migration source: #{inspect(migration_source)}. Setting options for recursive scan.",
          %{}
        )

        Keyword.drop(opts, [:timeout, :log, :telemetry_options]) ++ [recursive: true]
      else
        Keyword.drop(opts, [:scan_limit, :limit, :telemetry_options]) ++ scan_limit
      end

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.execute: local variables", %{
      "#{inspect(__MODULE__)}.execute-vars" => %{
        table: table,
        lookup_fields: lookup_fields,
        scan_limit: scan_limit
      }
    })

    case func do
      :delete_all ->
        delete_all(repo, table, lookup_fields, updated_opts)

      :update_all ->
        update_all(repo, table, lookup_fields, updated_opts, prepared.updates, params)

      :all ->
        ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.execute: :all", %{
          "#{inspect(__MODULE__)}.execute-all-vars" => %{
            table: table,
            lookup_fields: lookup_fields,
            updated_opts: updated_opts
          }
        })

        result = Ecto.Adapters.DynamoDB.Query.get_item(repo, table, lookup_fields, updated_opts)

        ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.execute: all: result", %{
          "#{inspect(__MODULE__)}.execute-all-result" => inspect(result)
        })

        if opts[:query_info_key],
          do:
            Ecto.Adapters.DynamoDB.QueryInfo.put(
              opts[:query_info_key],
              extract_query_info(result)
            )

        if result == %{} do
          # Empty map means "not found"
          {0, []}
        else
          case query_meta do
            %{select: %{from: {_, {_, _, _, types}}}} ->
              types = types_to_source_fields(model, types)
              handle_type_decode(table, result, types, repo, opts)

            _ ->
              if table == migration_source do
                decoded = Enum.map(result["Items"], &decode_item(&1, repo, opts))

                {length(decoded), decoded}
              else
                # Queries with a :select clause will not have the types available in the query_meta,
                # instead construct them from prepared.select
                types = construct_types_from_select_fields(prepared.select)

                handle_type_decode(table, result, types, repo, opts)
              end
          end
        end
    end
  end

  def max_transaction_conflict_retries, do: @max_transaction_conflict_retries

  defp handle_type_decode(table, result, types, repo, opts) do
    if !result["Count"] and !result["Responses"] do
      decoded = decode_item(result["Item"], types, repo, opts)

      {1, [decoded]}
    else
      # batch_get_item returns "Responses" rather than "Items"
      results_to_decode =
        if result["Items"], do: result["Items"], else: result["Responses"][table]

      decoded = Enum.map(results_to_decode, &decode_item(&1, types, repo, opts))

      {length(decoded), decoded}
    end
  end

  defp types_to_source_fields(model, types) do
    types
    |> Enum.into([], fn {field, type} ->
      {model.__schema__(:field_source, field), type}
    end)
  end

  # delete_all allows for the recursive option, scanning through multiple pages
  defp delete_all(repo, table, lookup_fields, opts) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.delete_all", %{
      "#{inspect(__MODULE__)}.delete_all-params" => %{
        table: table,
        lookup_fields: lookup_fields,
        opts: opts
      }
    })

    # select only the key
    {:primary, key_list} = Ecto.Adapters.DynamoDB.Info.primary_key!(repo, table)
    scan_or_query = Ecto.Adapters.DynamoDB.Query.scan_or_query?(repo, table, lookup_fields)
    recursive = Ecto.Adapters.DynamoDB.Query.parse_recursive_option(scan_or_query, opts)

    updated_opts =
      prepare_recursive_opts(opts ++ [projection_expression: Enum.join(key_list, ", ")])

    delete_all_recursive(repo, table, lookup_fields, updated_opts, recursive, %{}, 0)
  end

  defp delete_all_recursive(
         repo,
         table,
         lookup_fields,
         opts,
         recursive,
         query_info,
         total_processed
       ) do
    # query the table for which records to delete
    fetch_result = Ecto.Adapters.DynamoDB.Query.get_item(repo, table, lookup_fields, opts)

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.delete_all_recursive: fetch_result", %{
      "#{inspect(__MODULE__)}.delete_all_recursive-fetch_result" => inspect(fetch_result)
    })

    items =
      case fetch_result do
        %{"Items" => fetch_items} -> fetch_items
        %{"Item" => item} -> [item]
        %{"Responses" => table_map} -> table_map[table]
        _ -> []
      end

    prepared_data =
      for key_list <- Enum.map(items, &Map.to_list/1) do
        key_map =
          for {key, val_map} <- key_list, into: %{}, do: {key, Dynamo.Decoder.decode(val_map)}

        [delete_request: [key: key_map]]
      end

    unprocessed_items =
      if prepared_data != [] do
        batch_delete(repo, table, prepared_data)
      else
        %{}
      end

    num_processed =
      length(prepared_data) -
        if !unprocessed_items[table], do: 0, else: length(unprocessed_items[table])

    updated_query_info =
      Enum.reduce(fetch_result, query_info, fn {key, val}, acc ->
        case key do
          "Count" ->
            Map.update(acc, key, val, fn x -> x + val end)

          "ScannedCount" ->
            Map.update(acc, key, val, fn x -> x + val end)

          "LastEvaluatedKey" ->
            Map.update(acc, key, val, fn _ -> fetch_result["LastEvaluatedKey"] end)

          _ ->
            acc
        end
      end)
      |> Map.update("UnprocessedItems", unprocessed_items, fn map ->
        if map == %{}, do: %{}, else: %{table => map[table] ++ unprocessed_items[table]}
      end)

    updated_recursive = Ecto.Adapters.DynamoDB.Query.update_recursive_option(recursive)

    if fetch_result["LastEvaluatedKey"] != nil and updated_recursive.continue do
      opts_with_offset = opts ++ [exclusive_start_key: fetch_result["LastEvaluatedKey"]]

      delete_all_recursive(
        repo,
        table,
        lookup_fields,
        opts_with_offset,
        updated_recursive.new_value,
        updated_query_info,
        total_processed + num_processed
      )
    else
      # We're not retrying unprocessed items yet, but we are providing the relevant info in the QueryInfo agent if :query_info_key is supplied
      if opts[:query_info_key],
        do: Ecto.Adapters.DynamoDB.QueryInfo.put(opts[:query_info_key], updated_query_info)

      {num_processed + total_processed, nil}
    end
  end

  # Returns unprocessed_items
  # Similarly to a batch insert, batch delete is also restricted by DDB's batch write limit of 25 records - these requests will be chunked as well.
  defp batch_delete(repo, table, prepared_data) do
    Enum.chunk_every(prepared_data, @batch_write_item_limit)
    |> Enum.reduce(%{}, fn batch, unprocessed_items ->
      batch_write_attempt =
        Dynamo.batch_write_item(%{table => batch})
        |> ExAws.request(ex_aws_config(repo))
        |> handle_error!(repo, %{table: table, records: []})

      case batch_write_attempt do
        %{"UnprocessedItems" => %{^table => items}} ->
          Map.update(unprocessed_items, table, items, &(&1 ++ items))

        _ ->
          unprocessed_items
      end
    end)
  end

  defp update_all(repo, table, lookup_fields, opts, updates, params) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update_all", %{
      "#{inspect(__MODULE__)}.update_all-params" => %{
        table: table,
        lookup_fields: lookup_fields,
        opts: opts
      }
    })

    scan_or_query = Ecto.Adapters.DynamoDB.Query.scan_or_query?(repo, table, lookup_fields)
    recursive = Ecto.Adapters.DynamoDB.Query.parse_recursive_option(scan_or_query, opts)

    key_list = Ecto.Adapters.DynamoDB.Info.primary_key!(repo, table)

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update_all: key_list", %{
      "#{inspect(__MODULE__)}.update_all-key_list" => inspect(key_list)
    })

    # The remove statement must be constructed after finding pull-indexes, but it
    # also includes possibly removing nil fields, and since we have one handler for
    # both set and remove, we call it during the batch update process
    {update_expression, update_fields_sans_set_remove, set_remove_fields} =
      construct_update_expression(repo, updates, params, opts)

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update_all: update fields", %{
      "#{inspect(__MODULE__)}.update_all-update_fields" => %{
        update_fields_sans_set_remove: inspect(update_fields_sans_set_remove),
        set_remove_fields: inspect(set_remove_fields)
      }
    })

    attribute_names = construct_expression_attribute_names(update_fields_sans_set_remove)

    attribute_values =
      construct_expression_attribute_values(repo, update_fields_sans_set_remove, opts)

    base_update_options = [
      expression_attribute_names: attribute_names,
      update_expression: update_expression,
      return_values: :all_new
    ]

    updated_opts = prepare_recursive_opts(opts)
    update_options = maybe_add_attribute_values(base_update_options, attribute_values)

    pull_actions_without_index =
      Keyword.keys(set_remove_fields[:pull])
      |> Enum.any?(fn x -> !Enum.member?(Keyword.keys(maybe_list(opts[:pull_indexes])), x) end)

    {new_update_options, new_set_remove_fields} =
      if pull_actions_without_index do
        {update_options, set_remove_fields}
      else
        merged_pull_indexes =
          Keyword.merge(set_remove_fields[:pull], maybe_list(opts[:pull_indexes]))

        opts_with_pull_indexes =
          Keyword.update(opts, :pull_indexes, merged_pull_indexes, fn _ -> merged_pull_indexes end)

        {update_batch_update_options(
           repo,
           update_options,
           set_remove_fields,
           opts_with_pull_indexes
         ), []}
      end

    update_all_recursive(
      repo,
      table,
      lookup_fields,
      updated_opts,
      new_update_options,
      key_list,
      new_set_remove_fields,
      recursive,
      %{},
      0
    )
  end

  defp update_all_recursive(
         repo,
         table,
         lookup_fields,
         opts,
         update_options,
         key_list,
         set_remove_fields,
         recursive,
         query_info,
         total_updated
       ) do
    fetch_result = Ecto.Adapters.DynamoDB.Query.get_item(repo, table, lookup_fields, opts)

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update_all_recursive: fetch_result", %{
      "#{inspect(__MODULE__)}.update_all_recursive-fetch_result" => inspect(fetch_result)
    })

    updated_query_info =
      case fetch_result do
        %{"Count" => last_count, "ScannedCount" => last_scanned_count} ->
          %{
            "Count" => last_count + Map.get(query_info, "Count", 0),
            "ScannedCount" => last_scanned_count + Map.get(query_info, "ScannedCount", 0),
            "LastEvaluatedKey" => Map.get(fetch_result, "LastEvaluatedKey")
          }

        _ ->
          query_info
      end

    items =
      case fetch_result do
        %{"Items" => fetch_items} -> fetch_items
        %{"Item" => item} -> [item]
        %{"Responses" => table_map} -> table_map[table]
        _ -> []
      end

    num_updated =
      if items != [] do
        batch_update(repo, table, items, key_list, update_options, set_remove_fields, opts)
      else
        0
      end

    updated_recursive = Ecto.Adapters.DynamoDB.Query.update_recursive_option(recursive)

    if fetch_result["LastEvaluatedKey"] != nil and updated_recursive.continue do
      opts_with_offset = opts ++ [exclusive_start_key: fetch_result["LastEvaluatedKey"]]

      update_all_recursive(
        repo,
        table,
        lookup_fields,
        opts_with_offset,
        update_options,
        key_list,
        set_remove_fields,
        updated_recursive.new_value,
        updated_query_info,
        total_updated + num_updated
      )
    else
      if opts[:query_info_key],
        do: Ecto.Adapters.DynamoDB.QueryInfo.put(opts[:query_info_key], updated_query_info)

      {total_updated + num_updated, []}
    end
  end

  defp batch_update(repo, table, items, key_list, update_options, set_remove_fields, opts) do
    Enum.reduce(items, 0, fn result_to_update, acc ->
      filters = get_key_values_dynamo_map(result_to_update, key_list)

      # we only update this on a case-by-case basis if pull actions
      # without specific indexes are specified
      options_with_set_and_remove =
        case set_remove_fields do
          [] ->
            update_options

          _ ->
            pull_fields_with_indexes =
              Enum.map(set_remove_fields[:pull], fn {field_atom, val} ->
                list = result_to_update[to_string(field_atom)]
                {field_atom, find_all_indexes_in_dynamodb_list(list, val)}
              end)

            merged_pull_indexes =
              Keyword.merge(pull_fields_with_indexes, maybe_list(opts[:pull_indexes]))

            opts_with_pull_indexes =
              Keyword.update(opts, :pull_indexes, merged_pull_indexes, fn _ ->
                merged_pull_indexes
              end)

            update_batch_update_options(
              repo,
              update_options,
              set_remove_fields,
              opts_with_pull_indexes
            )
        end

      # 'options_with_set_and_remove' might not have the key, ':expression_attribute_values',
      # when there are only removal statements.
      record =
        if options_with_set_and_remove[:expression_attribute_values],
          do: [options_with_set_and_remove[:expression_attribute_values] |> Enum.into(%{})],
          else: []

      if options_with_set_and_remove[:update_expression] |> String.trim() != "" do
        Dynamo.update_item(table, filters, options_with_set_and_remove)
        |> ExAws.request(ex_aws_config(repo))
        |> handle_error!(repo, %{table: table, records: record ++ []})

        acc + 1
      else
        acc
      end
    end)
  end

  defp update_batch_update_options(repo, update_options, set_remove_fields, opts) do
    attribute_names =
      construct_expression_attribute_names(Keyword.values(set_remove_fields) |> List.flatten())

    set_and_push_fields =
      maybe_list(set_remove_fields[:set]) ++ maybe_list(set_remove_fields[:push])

    opts_with_push = opts ++ Keyword.take(set_remove_fields, [:push])

    attribute_values =
      construct_expression_attribute_values(repo, set_and_push_fields, opts_with_push)

    set_statement = construct_set_statement(repo, set_remove_fields[:set], opts_with_push)

    opts_for_construct_remove =
      Keyword.take(set_remove_fields, [:pull]) ++
        Keyword.take(opts, [:pull_indexes, :remove_nil_fields])

    remove_statement =
      construct_remove_statement(repo, set_remove_fields[:set], opts_for_construct_remove)

    base_update_options = [
      expression_attribute_names:
        Map.merge(attribute_names, update_options[:expression_attribute_names]),
      update_expression:
        (set_statement <> " " <> remove_statement <> " " <> update_options[:update_expression])
        |> String.trim(),
      return_values: :all_new
    ]

    maybe_add_attribute_values(
      base_update_options,
      attribute_values ++ maybe_list(update_options[:expression_attribute_values])
    )
  end

  # find indexes to remove for update :pull action
  defp find_all_indexes_in_dynamodb_list(dynamodb_list, target) do
    Dynamo.Decoder.decode(dynamodb_list)
    |> Enum.with_index()
    |> Enum.filter(fn {x, _} -> x == target end)
    |> Enum.map(fn {_, i} -> i end)
  end

  # During delete_all's and update_all's recursive
  # procedure, we want to keep the recursion in
  # the top-level, between actions, rather than
  # load all the results into memory and then act;
  # so we disable the recursion on get_item
  defp prepare_recursive_opts(opts) do
    opts |> Keyword.delete(:page_limit) |> Keyword.update(:recursive, false, fn _ -> false end)
  end

  @doc """
  Inserts a single new struct in the data store.

  ## Autogenerate

  The primary key will be automatically included in `returning` if the
  field has type `:id` or `:binary_id` and no value was set by the
  developer or none was autogenerated by the adapter.
  """
  # @callback insert(repo, schema_meta, fields, on_conflict, returning, options) ::
  #                  {:ok, fields} | {:invalid, constraints} | no_return
  #  def insert(_,_,_,_,_) do
  @impl Ecto.Adapter.Schema
  def insert(repo_meta, schema_meta, fields, on_conflict, returning, opts) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.insert", %{
      "#{inspect(__MODULE__)}.insert-params" => %{
        repo_meta: repo_meta,
        schema_meta: schema_meta,
        fields: fields,
        on_conflict: on_conflict,
        returning: returning,
        opts: opts
      }
    })

    do_insert(repo_meta, schema_meta, fields, on_conflict, opts, 0)
  end

  defp do_insert(_repo_meta, _schema_meta, _fields, _on_conflict, _opts, retries)
       when retries >= @max_transaction_conflict_retries do
    raise(
      "#{inspect(__MODULE__)}.insert error: reached maximum transaction conflict retries without success"
    )
  end

  defp do_insert(repo_meta, schema_meta, fields, on_conflict, opts, retries) do
    table = schema_meta.source

    model = schema_meta.schema
    fields_map = Enum.into(fields, %{})

    record = maybe_replace_empty_mapsets_for_insert(fields_map, repo_meta.repo, opts)

    insert_nil_fields = opt_config(:insert_nil_fields, repo_meta.repo, opts, true)
    record = unless insert_nil_fields, do: record, else: build_record_map(model, record)

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.insert: local variables", %{
      "#{inspect(__MODULE__)}.insert-vars" => %{table: table, record: record}
    })

    {:primary, key_list} = Ecto.Adapters.DynamoDB.Info.primary_key!(repo_meta.repo, table)
    hash_key = hd(key_list)

    replace_all =
      case on_conflict do
        {l, _, _} when is_list(l) ->
          # All fields being set are to be replaced
          Enum.all?(Keyword.keys(fields), &(&1 in l))

        _ ->
          false
      end

    options =
      if replace_all do
        # All fields being replaced: don't use a condition
        []
      else
        attribute_names = for k <- key_list, into: %{}, do: {"##{k}", k}
        conditions = for k <- key_list, do: "attribute_not_exists(##{k})"
        condition_expression = Enum.join(conditions, " and ")

        [
          expression_attribute_names: attribute_names,
          condition_expression: condition_expression
        ]
      end

    case Dynamo.put_item(table, record, options)
         |> ExAws.request(ex_aws_config(repo_meta.repo))
         |> handle_error!(repo_meta.repo, %{table: table, records: [record]}) do
      {:error, "ConditionalCheckFailedException"} ->
        case on_conflict do
          # Per discussion with Jose Valim (https://github.com/elixir-ecto/ecto/issues/2378)
          # clarifying the adapter should return nothing if there is no `:returning` specified,
          # and what we thought was to be returned as a `nil` id, is only for cases where
          # "the field is autogenerated by the database" (https://hexdocs.pm/ecto/Ecto.Repo.html)
          {:nothing, _, _} ->
            {:ok, []}

          {:raise, _, _} ->
            # This constraint name yields the correct behavior in the case the user
            # has specified a unique constraint on the primary key in their schema:
            constraint_name = "#{table}_#{hash_key}_index"
            {:invalid, [unique: constraint_name]}
        end

      {:error, "TransactionConflictException"} ->
        do_insert(repo_meta, schema_meta, fields, on_conflict, opts, retries + 1)

      %{} ->
        {:ok, []}
    end
  end

  @impl Ecto.Adapter.Schema
  def insert_all(
        %{repo: repo},
        schema_meta,
        field_list,
        rows,
        on_conflict,
        return_sources,
        _placeholders,
        opts
      ) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.insert_all", %{
      "#{inspect(__MODULE__)}.insert_all-params" => %{
        repo: repo,
        schema_meta: schema_meta,
        field_list: field_list,
        rows: rows,
        on_conflict: on_conflict,
        return_sources: return_sources,
        opts: opts
      }
    })

    insert_nil_field_option = Keyword.get(opts, :insert_nil_fields, true)

    do_not_insert_nil_fields =
      insert_nil_field_option == false ||
        RepoConfig.config_val(repo, :insert_nil_fields) == false

    table = schema_meta.source
    model = schema_meta.schema

    prepared_rows =
      Enum.map(rows, fn row ->
        mapped_fields =
          row
          |> Enum.into(%{})
          |> maybe_replace_empty_mapsets_for_insert(repo, opts)

        record =
          if do_not_insert_nil_fields,
            do: mapped_fields,
            else: build_record_map(model, mapped_fields)

        [put_request: [item: record]]
      end)

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.insert_all: local variables", %{
      "#{inspect(__MODULE__)}.insert_all-vars" => %{
        table: table,
        records: get_records_from_fields(prepared_rows)
      }
    })

    batch_write(repo, table, prepared_rows, opts)
  end

  # DynamoDB will reject an entire batch of insert_all() records if there are more than 25 requests.
  # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
  # batch_write/4 will break the list into chunks of 25 items and insert each separately.
  defp batch_write(repo, table, prepared_fields, opts) do
    unprocessed_items_element = "UnprocessedItems"
    grouped_records = Enum.chunk_every(prepared_fields, @batch_write_item_limit)
    num_batches = length(grouped_records)

    # Break the prepared_fields into chunks of at most 25 elements to be batch inserted, accumulating
    # the total count of records and appropriate results as it loops through the reduce.
    {total_processed, results} =
      grouped_records
      |> Stream.with_index()
      |> Enum.reduce({0, []}, fn {field_group, i},
                                 {running_total_processed, batch_write_results} ->
        {total_batch_processed, batch_write_attempt} =
          handle_batch_write(repo, field_group, table, unprocessed_items_element)

        # Log depth of 11 will capture the full data structure returned in any UnprocessedItems - https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
        ecto_dynamo_log(
          :debug,
          "#{inspect(__MODULE__)}.batch_write #{i + 1} of #{num_batches}: local variables",
          %{
            "#{inspect(__MODULE__)}.insert_all-batch_write" => %{
              table: table,
              field_group: field_group,
              results: batch_write_attempt
            }
          },
          depth: 11
        )

        # We're not retrying unprocessed items yet, but we are providing the relevant info in the QueryInfo agent if :query_info_key is supplied
        if opts[:query_info_key] do
          query_info = extract_query_info(batch_write_attempt)

          Ecto.Adapters.DynamoDB.QueryInfo.update(opts[:query_info_key], [query_info], fn list ->
            list ++ [query_info]
          end)
        end

        {running_total_processed + total_batch_processed,
         batch_write_results ++ [batch_write_attempt]}
      end)

    result_body_for_log = %{
      table => Enum.flat_map(results, fn res -> res[unprocessed_items_element][table] || [] end)
    }

    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.batch_write: batch_write_attempt result", %{
      "#{inspect(__MODULE__)}.insert_all-batch_write" =>
        inspect(%{
          unprocessed_items_element =>
            if(result_body_for_log[table] == [], do: %{}, else: result_body_for_log)
        })
    })

    {total_processed, nil}
  end

  defp handle_batch_write(repo, field_group, table, unprocessed_items_element) do
    results =
      Dynamo.batch_write_item(%{table => field_group})
      |> ExAws.request(ex_aws_config(repo))
      |> handle_error!(repo, %{table: table, records: get_records_from_fields(field_group)})

    if results[unprocessed_items_element] == %{} do
      {length(field_group), results}
    else
      {length(field_group) - length(results[unprocessed_items_element][table]), results}
    end
  end

  defp get_records_from_fields(fields),
    do: Enum.map(fields, fn [put_request: [item: record]] -> record end)

  defp build_record_map(model, fields_to_insert) do
    # Ecto does not convert empty strings to nil before passing them
    # to Repo.insert_all, and ExAws will remove empty strings (as well as empty lists)
    # when building the insertion query but not nil values. We don't mind the removal
    # of empty lists since those cannot be inserted to indexed fields, but we'd like to
    # catch the removal of fields with empty strings by ExAws to support our option, :remove_nil_fields,
    # so we convert these to nil.
    fields = model.__schema__(:fields)
    sources = fields |> Enum.into(%{}, fn f -> {f, model.__schema__(:field_source, f)} end)

    empty_strings_to_nil =
      fields_to_insert
      |> Enum.map(fn {field, val} -> {field, if(val == "", do: nil, else: val)} end)
      |> Enum.into(%{})

    model.__struct__
    |> Map.delete(:__meta__)
    |> Map.from_struct()
    |> Enum.reduce(%{}, fn {k, v}, acc ->
      Map.put(acc, Map.get(sources, k), v)
    end)
    |> Map.merge(empty_strings_to_nil)
  end

  @impl Ecto.Adapter.Schema
  def delete(adapter_meta = %{repo: repo}, schema_meta, filters, _returning, opts) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.delete", %{
      "#{inspect(__MODULE__)}.delete-params" => %{
        adapter_meta: adapter_meta,
        schema_meta: schema_meta,
        filters: filters,
        opts: opts
      }
    })

    do_delete(repo, schema_meta, filters, opts, 0)
  end

  defp do_delete(_repo, _schema_meta, _filters, _opts, retries)
       when retries >= @max_transaction_conflict_retries do
    raise(
      "#{inspect(__MODULE__)}.delete error: reached maximum transaction conflict retries without success"
    )
  end

  defp do_delete(repo, schema_meta, filters, opts, retries) do
    table = schema_meta.source

    updated_filters =
      maybe_update_filters_for_range_key(repo, table, schema_meta, filters, opts, "delete")

    attribute_names = construct_expression_attribute_names(keys_to_atoms(filters))

    base_options = [expression_attribute_names: attribute_names]
    condition_expression = construct_condition_expression(filters)
    options = base_options ++ [condition_expression: condition_expression]

    # 'options' might not have the key, ':expression_attribute_values', when there are only removal statements
    record =
      if options[:expression_attribute_values],
        do: [options[:expression_attribute_values] |> Enum.into(%{})],
        else: []

    case Dynamo.delete_item(table, updated_filters, options)
         |> ExAws.request(ex_aws_config(repo))
         |> handle_error!(repo, %{table: table, records: record ++ []}) do
      %{} ->
        {:ok, []}

      {:error, "ConditionalCheckFailedException"} ->
        {:error, :stale}

      {:error, "TransactionConflictException"} ->
        do_delete(repo, schema_meta, filters, opts, retries + 1)
    end
  end

  @impl Ecto.Adapter.Schema
  def update(adapter_meta = %{repo: repo}, schema_meta, fields, filters, returning, opts) do
    ecto_dynamo_log(:debug, "#{inspect(__MODULE__)}.update", %{
      "#{inspect(__MODULE__)}.update-params" => %{
        adapter_meta: adapter_meta,
        schema_meta: schema_meta,
        fields: fields,
        filters: filters,
        returning: returning,
        opts: opts
      }
    })

    do_update(repo, schema_meta, fields, filters, returning, opts, 0)
  end

  defp do_update(_repo, _schema_meta, _fields, _filters, _returning, _opts, retries)
       when retries >= @max_transaction_conflict_retries do
    raise(
      "#{inspect(__MODULE__)}.update error: reached maximum transaction conflict retries without success"
    )
  end

  defp do_update(repo, schema_meta, fields, filters, returning, opts, retries) do
    table = schema_meta.source

    updated_filters =
      maybe_update_filters_for_range_key(repo, table, schema_meta, filters, opts, "update")

    fields = maybe_replace_empty_mapsets_for_update(fields, repo, opts)

    update_expression = construct_update_expression(repo, fields, opts)
    # add updated_filters to attribute_ names and values for condition_expression
    attribute_names = construct_expression_attribute_names(fields ++ keys_to_atoms(filters))
    attribute_values = construct_expression_attribute_values(repo, fields, opts)

    base_options = [
      expression_attribute_names: attribute_names,
      update_expression: update_expression
    ]

    condition_expression = construct_condition_expression(filters)

    options =
      maybe_add_attribute_values(base_options, attribute_values) ++
        [condition_expression: condition_expression]

    # 'options' might not have the key, ':expression_attribute_values', when there are only removal statements
    record =
      if options[:expression_attribute_values],
        do: [options[:expression_attribute_values] |> Enum.into(%{})],
        else: []

    case Dynamo.update_item(table, updated_filters, options)
         |> ExAws.request(ex_aws_config(repo))
         |> handle_error!(repo, %{table: table, records: record ++ []}) do
      %{} ->
        {:ok, []}

      {:error, "ConditionalCheckFailedException"} ->
        {:error, :stale}

      {:error, "TransactionConflictException"} ->
        do_update(repo, schema_meta, fields, filters, returning, opts, retries + 1)
    end
  end

  # Support for tables with a hash+range key.
  #
  #  * If the schema has both keys declared (using the `primary_key: true`) the filters are already correct
  #  * If :range_key is specified with a value it is added to filters
  #  * If :range_key is not specified, and the table does have a range key, attempt to find it with a DynamoDB query
  #
  defp maybe_update_filters_for_range_key(repo, table, schema_meta, filters, opts, action) do
    with primary_key_length <- length(schema_meta.schema.__schema__(:primary_key)) do
      case opts[:range_key] do
        # Use primary keys declared in schema
        nil when primary_key_length == 2 ->
          filters

        nil ->
          {:primary, key_list} = Ecto.Adapters.DynamoDB.Info.primary_key!(repo, table)

          if length(key_list) > 1 do
            updated_opts = opts ++ [projection_expression: Enum.join(key_list, ", ")]

            filters_as_strings =
              for {field, val} <- filters, do: {Atom.to_string(field), {val, :==}}

            fetch_result =
              Ecto.Adapters.DynamoDB.Query.get_item(repo, table, filters_as_strings, updated_opts)

            items =
              case fetch_result do
                %{"Items" => fetch_items} -> fetch_items
                %{"Item" => item} -> [item]
                _ -> []
              end

            if items == [],
              do:
                raise(
                  "#{inspect(__MODULE__)}.#{action} error: no results found for record: #{inspect(filters)}"
                )

            if length(items) > 1,
              do:
                raise(
                  "#{inspect(__MODULE__)}.#{action} error: more than one result found for record: #{inspect(filters)} Please consider using the adapter's :range_key custom inline option (see README)."
                )

            for {field, key_map} <- Map.to_list(hd(items)) do
              [{_field_type, val}] = Map.to_list(key_map)
              {field, val}
            end
          else
            filters
          end

        range_key ->
          [range_key | filters]
      end
    end
  end

  defp keys_to_atoms(list),
    do: for({k, v} <- list, do: {maybe_string_to_atom(k), v})

  defp maybe_string_to_atom(s),
    do: if(is_binary(s), do: String.to_atom(s), else: s)

  defp construct_condition_expression(filters) when is_list(filters) do
    Keyword.keys(filters)
    |> Enum.map(fn field -> "attribute_exists(##{to_string(field)})" end)
    |> Enum.join(" AND ")
  end

  defp extract_query_info(result),
    do:
      result
      |> Map.take([
        "Count",
        "ScannedCount",
        "LastEvaluatedKey",
        "UnprocessedItems",
        "UnprocessedKeys"
      ])

  # Used in update_all
  defp extract_update_params([], _action_atom, _params), do: []

  defp extract_update_params([%{expr: key_list}], action_atom, params) do
    case key_list[action_atom] do
      nil ->
        []

      action_list ->
        for s <- action_list do
          {field_atom, {:^, _, [idx]}} = s
          {field_atom, Enum.at(params, idx)}
        end
    end
  end

  defp extract_update_params([a], _action_atom, _params),
    do:
      error(
        "#{inspect(__MODULE__)}.extract_update_params: Updates is either missing the :expr key or does not contain a struct or map: #{inspect(a)}"
      )

  defp extract_update_params(unsupported, _action_atom, _params),
    do:
      error(
        "#{inspect(__MODULE__)}.extract_update_params: unsupported parameter construction. #{inspect(unsupported)}"
      )

  # Ecto does not support push pull for types other than array.
  # Therefore, we enable add and delete via opts
  defp extract_update_params(key_list, action_atom) do
    case key_list[action_atom] do
      nil -> []
      action_list -> action_list
    end
  end

  # used in :update_all
  defp get_key_values_dynamo_map(dynamo_map, {:primary, keys}) do
    for k <- keys, do: {String.to_atom(k), Dynamo.Decoder.decode(dynamo_map[k])}
  end

  defp construct_expression_attribute_names(fields) do
    for {f, _} <- fields, into: %{}, do: {"##{Atom.to_string(f)}", Atom.to_string(f)}
  end

  defp construct_expression_attribute_values(repo, fields, opts) do
    remove_rather_than_set_to_null =
      opts[:remove_nil_fields] || opts[:remove_nil_fields_on_update] ||
        RepoConfig.config_val(repo, :remove_nil_fields_on_update) == true

    # If the value is nil and the :remove_nil_fields option is set,
    # we're removing this attribute, not updating it, so filter out any such fields:

    fields
    |> Enum.reduce([], &format_update_field(&1, &2, remove_rather_than_set_to_null, opts))
    |> Enum.filter(fn {x, _} -> not Keyword.has_key?(maybe_list(opts[:pull]), x) end)
  end

  defp format_update_field({_k, nil}, acc, true, _opts), do: acc
  defp format_update_field({k, v}, acc, true, opts), do: [{k, format_val(k, v, opts)} | acc]

  defp format_update_field({k, v}, acc, false, opts),
    do: [{k, format_nil_or_val(k, v, opts)} | acc]

  defp maybe_list(l) when is_list(l), do: l
  defp maybe_list(_), do: []

  defp format_nil_or_val(_k, nil, _opts), do: %{"NULL" => "true"}
  defp format_nil_or_val(k, v, opts), do: format_val(k, v, opts)

  defp format_val(k, v, opts) do
    case opts[:push][k] do
      nil -> v
      _ -> [v]
    end
  end

  # DynamoDB throws an error if we pass in an empty list for attribute values,
  # so we have to implement this stupid little helper function to avoid hurting its feelings:
  defp maybe_add_attribute_values(options, []) do
    options
  end

  defp maybe_add_attribute_values(options, attribute_values) do
    [expression_attribute_values: attribute_values] ++ options
  end

  defp construct_update_expression(_repo, updates, params, opts) do
    to_set = extract_update_params(updates, :set, params)
    to_push = extract_update_params(updates, :push, params)
    to_pull = extract_update_params(updates, :pull, params)
    to_add = extract_update_params(opts, :add) ++ extract_update_params(updates, :inc, params)
    to_delete = extract_update_params(opts, :delete)

    {(construct_add_statement(to_add, opts) <>
        " " <>
        construct_delete_statement(to_delete, opts))
     |> String.trim(), to_add ++ to_delete, [set: to_set, push: to_push, pull: to_pull]}
  end

  # The update callback supplies fields in the parameters
  # whereas update_all includes a more complicated updates structure
  defp construct_update_expression(repo, fields, opts) do
    set_statement = construct_set_statement(repo, fields, opts)
    rem_statement = construct_remove_statement(repo, fields, opts)

    String.trim("#{set_statement} #{rem_statement}")
  end

  # fields::[{:field, val}]
  defp construct_set_statement(repo, fields, opts) do
    remove_rather_than_set_to_null =
      opts[:remove_nil_fields] || opts[:remove_nil_fields_on_update] ||
        RepoConfig.config_val(repo, :remove_nil_fields_on_update) == true

    set_clauses =
      for {key, val} <- fields, not (is_nil(val) and remove_rather_than_set_to_null) do
        key_str = Atom.to_string(key)
        "##{key_str}=:#{key_str}"
      end ++
        case opts[:push] do
          nil ->
            []

          push_list ->
            for {key, _val} <- push_list do
              key_str = Atom.to_string(key)

              if Enum.member?(maybe_list(opts[:prepend_to_list]), key),
                do: "##{key_str} = list_append(:#{key_str}, ##{key_str})",
                else: "##{key_str} = list_append(##{key_str}, :#{key_str})"
            end
        end

    case set_clauses do
      [] ->
        ""

      _ ->
        "SET " <> Enum.join(set_clauses, ", ")
    end
  end

  defp construct_remove_statement(repo, fields, opts) do
    remove_rather_than_set_to_null =
      opts[:remove_nil_fields] || opts[:remove_nil_fields_on_update] ||
        RepoConfig.config_val(repo, :remove_nil_fields_on_update) == true

    # Ecto :pull update can be emulated provided
    # we are given an index to remove in opts[:pull_indexes]
    remove_clauses =
      if remove_rather_than_set_to_null do
        for {key, val} <- fields, is_nil(val), do: "##{Atom.to_string(key)}"
      else
        []
      end ++
        cond do
          !opts[:pull_indexes] or Keyword.values(opts[:pull_indexes]) |> List.flatten() == [] ->
            []

          opts[:pull] == nil ->
            []

          true ->
            for {key, _val} <- opts[:pull] do
              key_str = Atom.to_string(key)

              Enum.map(opts[:pull_indexes][key], fn index -> "##{key_str}[#{index}]" end)
              |> Enum.join(", ")
            end
        end

    case remove_clauses do
      [] ->
        ""

      _ ->
        "REMOVE " <> Enum.join(remove_clauses, ", ")
    end
  end

  # fields::[{:field, val}]
  defp construct_add_statement(fields, _opts) do
    add_clauses =
      for {key, _val} <- fields do
        key_str = Atom.to_string(key)
        "##{key_str} :#{key_str}"
      end

    case add_clauses do
      [] ->
        ""

      _ ->
        "ADD " <> Enum.join(add_clauses, ", ")
    end
  end

  defp construct_delete_statement(fields, _opts) do
    delete_clauses =
      for {key, _val} <- fields do
        key_str = Atom.to_string(key)
        "##{key_str} :#{key_str}"
      end

    case delete_clauses do
      [] ->
        ""

      _ ->
        "DELETE " <> Enum.join(delete_clauses, ", ")
    end
  end

  defp validate_where_clauses!(query) do
    for w <- query.wheres do
      validate_where_clause!(w)
    end
  end

  defp validate_where_clause!(%BooleanExpr{expr: {op, _, _}})
       when op in [:==, :<, :>, :<=, :>=, :in],
       do: :ok

  defp validate_where_clause!(%BooleanExpr{expr: {logical_op, _, _}})
       when logical_op in [:and, :or],
       do: :ok

  defp validate_where_clause!(%BooleanExpr{expr: {:is_nil, _, _}}), do: :ok
  defp validate_where_clause!(%BooleanExpr{expr: {:fragment, _, _}}), do: :ok

  defp validate_where_clause!(unsupported),
    do: error("unsupported where clause: #{inspect(unsupported)}")

  # We are parsing a nested, recursive structure of the general type:
  # %{:logical_op, list_of_clauses} | %{:conditional_op, field_and_value}
  defp extract_lookup_fields([], _params, lookup_fields), do: lookup_fields

  defp extract_lookup_fields([query | queries], params, lookup_fields) do
    # A logical operator tuple does not always have a parent 'expr' key.
    maybe_extract_from_expr =
      case query do
        %BooleanExpr{expr: expr} -> expr
        # TODO: could there be other cases?
        _ -> query
      end

    case maybe_extract_from_expr do
      # A logical operator points to a list of conditionals
      {op, _, [left, right]} when op in [:==, :<, :>, :<=, :>=, :in] ->
        {field, value} = get_op_clause(left, right, params)

        updated_lookup_fields =
          case List.keyfind(lookup_fields, field, 0) do
            # we assume the most ops we can apply to one field is two, otherwise this might throw an error
            {field, {old_val, old_op}} ->
              List.keyreplace(lookup_fields, field, 0, {field, {[value, old_val], [op, old_op]}})

            _ ->
              [{field, {value, op}} | lookup_fields]
          end

        extract_lookup_fields(queries, params, updated_lookup_fields)

      # Logical operator expressions have more than one op clause
      # We are matching queries of the type: 'from(p in Person, where: p.email == "g@email.com" and p.first_name == "George")'
      # But not of the type: 'from(p in Person, where: [email: "g@email.com", first_name: "George"])'
      #
      # A logical operator is a member of a list
      {logical_op, _, clauses} when logical_op in [:and, :or] ->
        deeper_lookup_fields = extract_lookup_fields(clauses, params, [])

        extract_lookup_fields(queries, params, [
          {logical_op, deeper_lookup_fields} | lookup_fields
        ])

      {:fragment, _, raw_expr_mixed_list} ->
        parsed_fragment = parse_raw_expr_mixed_list(raw_expr_mixed_list, params)
        extract_lookup_fields(queries, params, [parsed_fragment | lookup_fields])

      # We perform a post-query is_nil filter on indexed fields and have DynamoDB filter
      # for nil non-indexed fields (although post-query nil-filters on (missing) indexed
      # attributes could only find matches when the attributes are not the range part of
      # a queried partition key (hash part) since those would not return the sought records).
      {:is_nil, _, [arg]} ->
        {{:., _, [_, field_name]}, _, _} = arg

        # We give the nil value a string, "null", since it will be mapped as a DynamoDB attribute_expression_value
        extract_lookup_fields(queries, params, [
          {to_string(field_name), {"null", :is_nil}} | lookup_fields
        ])

      _ ->
        extract_lookup_fields(queries, params, lookup_fields)
    end
  end

  # Specific (as opposed to generalized) parsing for Ecto :fragment - the only use for it
  # so far is 'between' which is the only way to query 'between' on an indexed field since
  # those accept only single conditions.
  #
  # Example with values as strings: [raw: "", expr: {{:., [], [{:&, [], [0]}, :person_id]}, [], []}, raw: " between ", expr: "person:a", raw: " and ", expr: "person:f", raw: ""]
  #
  # Example with values as part of the string itself: [raw: "", expr: {{:., [], [{:&, [], [0]}, :person_id]}, [], []}, raw: " between person:a and person:f"]
  #
  # Example with values in params: [raw: "", expr: {{:., [], [{:&, [], [0]}, :person_id]}, [], []}, raw: " between ", expr: {:^, [], [0]}, raw: " and ", expr: {:^, [], [1]}, raw: ""]
  #
  defp parse_raw_expr_mixed_list(raw_expr_mixed_list, params) do
    # group the expression into fields, values, and operators,
    # only supporting the example with values in params
    case raw_expr_mixed_list do
      # between
      [
        raw: _,
        expr: {{:., [], [{:&, [], [0]}, field_atom]}, [], []},
        raw: between_str,
        expr: {:^, [], [idx1]},
        raw: and_str,
        expr: {:^, [], [idx2]},
        raw: _
      ] ->
        if not Regex.match?(~r/^\s*between\s*and\s*$/i, between_str <> and_str),
          do: parse_raw_expr_mixed_list_error(raw_expr_mixed_list)

        {to_string(field_atom), {[Enum.at(params, idx1), Enum.at(params, idx2)], :between}}

      # begins_with
      [
        raw: begins_with_str,
        expr: {{:., [], [{:&, [], [0]}, field_atom]}, [], []},
        raw: comma_str,
        expr: {:^, [], [idx]},
        raw: closing_parenthesis_str
      ] ->
        if not Regex.match?(
             ~r/^\s*begins_with\(\s*,\s*\)\s*$/i,
             begins_with_str <> comma_str <> closing_parenthesis_str
           ),
           do: parse_raw_expr_mixed_list_error(raw_expr_mixed_list)

        {to_string(field_atom), {Enum.at(params, idx), :begins_with}}

      _ ->
        parse_raw_expr_mixed_list_error(raw_expr_mixed_list)
    end
  end

  defp parse_raw_expr_mixed_list_error(raw_expr_mixed_list),
    do:
      raise(
        "#{inspect(__MODULE__)}.parse_raw_expr_mixed_list parse error. We currently only support the Ecto fragments of the form, 'where: fragment(\"? between ? and ?\", FIELD_AS_VARIABLE, VALUE_AS_VARIABLE, VALUE_AS_VARIABLE)'; and 'where: fragment(\"begins_with(?, ?)\", FIELD_AS_VARIABLE, VALUE_AS_VARIABLE)'. Received: #{inspect(raw_expr_mixed_list)}"
      )

  defp get_op_clause(left, right, params) do
    field = left |> get_field |> Atom.to_string()
    value = get_value(right, params)
    {field, value}
  end

  defp get_field({{:., _, [{:&, _, [0]}, field]}, _, []}), do: field

  defp get_field(other_clause) do
    error("Unsupported where clause, left hand side: #{other_clause}")
  end

  defp get_value({:^, _, [idx]}, params), do: Enum.at(params, idx)
  # Handle queries with variable values, ex. Repo.all from i in Item, where: i.id in ^item_ids
  # The last element of the tuple (first arg) will be a list with two numbers;
  # the first number will be the number of attributes to be updated (in the event of an update_all query with a variable list)
  # and the second will be a count of the number of elements in the variable list being queried. For example:
  #
  # query = from p in Person, where: p.id in ^ids
  # TestRepo.update_all(query, set: [password: "cheese", last_name: "Smith"])
  #
  # assuming that ids contains 4 values, the last element would be [2, 4].
  # Use this data to modify the params, which would otherwise include the values to be updated as well, which we don't want to query on.
  defp get_value({:^, _, [num_update_terms, _num_query_terms]}, params),
    do: Enum.drop(params, num_update_terms)

  # Seems to be necessary for handling running a batch of migrations down.
  defp get_value(%{value: right}, params), do: get_value(right, params)
  # Handle .all(query) queries
  defp get_value(other_clause, _params), do: other_clause

  defp error(msg) do
    raise ArgumentError, message: msg
  end

  defp construct_types_from_select_fields(%Ecto.Query.SelectExpr{expr: expr}) do
    case expr do
      {:{}, [], clauses = [{{:., [type: _type], [{:&, [], [0]}, _field]}, [], []} | _]} ->
        for {{:., [type: type], [{:&, [], [0]}, field]}, [], []} <- clauses, do: {field, type}

      {_, _, [0]} ->
        []

      {{:., [type: type], [{_, _, _}, field]}, _, _} ->
        [{field, type}]

      clauses = [_ | _] ->
        for {{_, [type: type], [{_, _, _}, field]}, _, _} <- clauses, do: {field, type}
    end
  end

  def decode_item(item, types, repo, opts) do
    types
    |> Enum.map(fn {field, type} ->
      Map.get(item, Atom.to_string(field), %{"NULL" => true})
      |> Dynamo.Decoder.decode()
      |> decode_type(type, repo, opts)
    end)
  end

  def decode_item(%{"version" => version}, _repo, _opts) do
    [version |> Dynamo.Decoder.decode()]
  end

  # Decodes datetime, seemingly unhandled by ExAws Dynamo decoder
  defp decode_type(nil, DynamoDBSet, repo, opts), do: maybe_replace_nil_mapset(repo, opts)

  defp decode_type(nil, _type, _repo, _opts), do: nil

  defp decode_type(val, type, _repo, _opts) when type in [:utc_datetime_usec, :utc_datetime] do
    {:ok, dt, _offset} = DateTime.from_iso8601(val)
    dt
  end

  defp decode_type(val, type, _repo, _opts) when type in [:naive_datetime_usec, :naive_datetime],
    do: NaiveDateTime.from_iso8601!(val)

  # Support for Ecto >= 3.5
  defp decode_type(val, {:parameterized, Ecto.Embedded, _} = type, _repo, _opts),
    do: decode_embed(val, type)

  # Support for Ecto 3.0-3.4
  defp decode_type(val, {:embed, _} = type, _repo, _opts), do: decode_embed(val, type)

  defp decode_type(val, _type, _repo, _opts), do: val

  defp decode_embed(val, type) do
    case Ecto.Type.embedded_load(type, val, :json) do
      {:ok, decoded_value} ->
        handle_decoded_embeded(decoded_value)

      :error ->
        ecto_dynamo_log(
          :debug,
          "#{inspect(__MODULE__)}.decode_embed: failed to decode embedded value: #{inspect(val)}"
        )

        nil
    end
  end

  defp handle_decoded_embeded(embedded) when is_list(embedded),
    do: Enum.map(embedded, &unload_parameterized_fields/1)

  defp handle_decoded_embeded(embedded), do: unload_parameterized_fields(embedded)

  # Rebuilds the embedded schema unloading the parameterized fields, so the parameterized
  # type can load it in the ecto schema.
  defp unload_parameterized_fields(%schema{} = embedded) do
    fields = schema.__schema__(:fields)

    Enum.reduce(fields, embedded, fn field, acc ->
      field_type = schema.__schema__(:type, field)
      field_value = Map.get(embedded, field)

      Map.put(acc, field, maybe_dump_field(field_value, field_type))
    end)
  end

  defp maybe_dump_field(val, {:parameterized, _type, _params} = field_type) do
    {:ok, unloaded_value} = Ecto.Type.dump(field_type, val)
    unloaded_value
  end

  defp maybe_dump_field(val, _field_type), do: val

  # We found one instance where DynamoDB's error message could
  # be more instructive - when trying to set an indexed field to something
  # other than a string or number - so we're adding a more helpful message.
  # The parameter, 'params', has the type %{table: :string, records: [:map]}
  defp handle_error!({:ok, result}, _repo, _params), do: result

  defp handle_error!({:error, {error_name, _} = error}, repo, params) do
    # Check for inappropriate insert into indexed field
    indexed_fields = Ecto.Adapters.DynamoDB.Info.indexed_attributes(repo, params.table)

    # Repo.insert_all can present multiple records at once
    forbidden_insert_on_indexed_field =
      Enum.reduce(params.records, false, fn record, acc ->
        acc ||
          Enum.any?(record, fn {field, val} ->
            [type] = Dynamo.Encoder.encode(val) |> Map.keys()

            # Ecto does not convert Empty strings to nil before passing them to Repo.update_all or
            # Repo.insert_all DynamoDB provides an instructive message during an update (forwarded by ExAws),
            # but less so for batch_write_item, so we catch the empty string as well.
            # Dynamo does not allow insertion of empty strings in any case.
            (Enum.member?(indexed_fields, to_string(field)) and type not in ["S", "N"]) ||
              val == ""
          end)
      end)

    cond do
      # we use this error to check if an update or delete record does not exist
      error_name in ["ConditionalCheckFailedException", "TransactionConflictException"] ->
        {:error, error_name}

      forbidden_insert_on_indexed_field ->
        raise "The following request error could be related to attempting to insert an empty string or attempting to insert a type other than a string or number on an indexed field. Indexed fields: #{inspect(indexed_fields)}. Records: #{inspect(params.records)}.\n\nExAws Request Error! #{inspect(error)}"

      true ->
        raise ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"
    end
  end

  @doc """
  Logs message to console and optionally to file. Log levels, colours and file path may be set in configuration (details in README.md).
  """
  def ecto_dynamo_log(level, message, attributes \\ %{}, opts \\ []) do
    if Confex.get_env(:ecto_adapters_dynamodb, :use_logger) do
      Logger.log(level, message, attributes)
    else
      write_console_log(level, message, attributes, opts)
    end
  end

  defp write_console_log(level, message, attributes, opts) do
    log_levels = Confex.get_env(:ecto_adapters_dynamodb, :log_levels) || [:info]

    if level in log_levels do
      log_path = Confex.get_env(:ecto_adapters_dynamodb, :log_path)
      depth = opts[:depth] || 4
      colours = Confex.get_env(:ecto_adapters_dynamodb, :log_colours)
      d = DateTime.utc_now()

      formatted_message =
        "#{d.year}-#{d.month}-#{d.day} #{d.hour}:#{d.minute}:#{d.second} UTC [Ecto dynamo #{level}] #{inspect(message)}"

      {:ok, log_message} =
        Jason.encode(%{message: formatted_message, attributes: chisel(attributes, depth)})

      if Confex.get_env(:ecto_adapters_dynamodb, :log_in_colour) do
        IO.ANSI.format([colours[level] || :normal, log_message], true) |> IO.puts()
      else
        log_message |> IO.puts()
      end

      if String.valid?(log_path) and Regex.match?(~r/\S/, log_path),
        do: log_pipe(log_path, log_message)
    end
  end

  def ex_aws_config(repo) do
    config = Resolver.resolve!(repo.config())

    config
    |> Keyword.take([:debug_requests, :access_key_id, :secret_access_key, :region])
    |> Keyword.merge(Keyword.get(config, :dynamodb, []))
  end

  defp chisel(str, _depth) when is_binary(str),
    do: if(String.valid?(str), do: str, else: Base.encode64(str))

  defp chisel(num, _depth) when is_number(num), do: num
  defp chisel(any, _depth) when not is_map(any) and not is_list(any), do: inspect(any)
  defp chisel(_, 0), do: "beyond_log_depth"
  defp chisel(%{__struct__: _} = struct, _depth), do: inspect(struct)

  defp chisel(map, depth) when is_map(map) do
    for {k, v} <- map, into: %{}, do: {k, chisel(v, depth - 1)}
  end

  defp chisel(list, depth) when is_list(list) do
    for e <- list, do: chisel(e, depth - 1)

    # Stream.with_index(list) |> Enum.reduce(%{}, fn({v,k}, acc)-> Map.put(acc, k, chisel(v, depth - 1)) end)
  end

  defp log_pipe(path, str) do
    {:ok, file} = File.open(path, [:append])
    IO.binwrite(file, str)
    File.close(file)
  end

  defp opt_config(key, repo, opts, default \\ false) do
    case Keyword.get(opts, key) do
      nil -> RepoConfig.config_val(repo, key, default)
      x -> x
    end
  end

  defp maybe_replace_empty_mapsets_for_insert(record, repo, opts) do
    empty_mapset_to_nil = opt_config(:empty_mapset_to_nil, repo, opts)
    insert_nil = opt_config(:insert_nil_fields, repo, opts, true)

    cond do
      empty_mapset_to_nil and insert_nil ->
        record
        |> Enum.map(fn {k, v} -> {k, empty_mapset_to_nil(v)} end)
        |> Enum.into(%{})

      empty_mapset_to_nil ->
        record
        |> Enum.reject(fn {_k, v} -> v == MapSet.new() end)
        |> Enum.into(%{})

      true ->
        record
    end
  end

  defp maybe_replace_empty_mapsets_for_update(record, repo, opts) do
    if opt_config(:empty_mapset_to_nil, repo, opts) do
      record
      |> Enum.map(fn {k, v} -> {k, empty_mapset_to_nil(v)} end)
    else
      record
    end
  end

  defp empty_mapset_to_nil(%MapSet{} = m), do: if(MapSet.size(m) == 0, do: nil, else: m)

  defp empty_mapset_to_nil(x), do: x

  defp maybe_replace_nil_mapset(repo, opts) do
    if opt_config(:nil_to_empty_mapset, repo, opts) do
      MapSet.new()
    else
      nil
    end
  end
end