lib/ex_aws/dynamo.ex

defmodule ExAws.Dynamo do
  @moduledoc """
  Operations on the AWS DynamoDB service.

  NOTE: When Mix.env in [:test, :dev], Dynamo clients will run by default against
  DynamoDB local.

  ## Basic usage
  ```elixir
  defmodule User do
    @derive [ExAws.Dynamo.Encodable]
    defstruct [:email, :name, :age, :admin]
  end

  alias ExAws.Dynamo

  # Create a provisioned users table with a primary key of email [String]
  # and 1 unit of read and write capacity
  Dynamo.create_table("Users", "email", %{email: :string}, 1, 1)
  |> ExAws.request!

  user = %User{email: "bubba@foo.com", name: "Bubba", age: 23, admin: false}
  # Save the user
  Dynamo.put_item("Users", user) |> ExAws.request!

  # Retrieve the user by email and decode it as a User struct.
  result = Dynamo.get_item("Users", %{email: user.email})
  |> ExAws.request!
  |> Dynamo.decode_item(as: User)

  assert user == result
  ```

  ## General notes
  All options are handled as underscored atoms instead of camelcased binaries as specified
  in the Dynamo API, e.g. `IndexName` would be `:index_name`. Anywhere in the API that requires
  Dynamo type annotation (`{"S":"mystring"}`) is handled for you automatically. For example,

  ```elixir
  ExAws.Dynamo.scan("Users", expression_attribute_values: [api_key: "foo"])
  ```
  transforms into a query of
  ```elixir
  %{"ExpressionAttributeValues" => %{api_key: %{"S" => "foo"}}, "TableName" => "Users"}
  ```

  Consult the function documentation to see precisely which options are handled this way.

  If you wish to avoid this kind of automatic behaviour, you are free to specify the types yourself.
  For example,
  ```elixir
  ExAws.Dynamo.scan("Users", expression_attribute_values: [api_key: %{"B" => "Treated as binary"}])
  ```
  becomes
  ```elixir
  %{"ExpressionAttributeValues" => %{api_key: %{"B" => "Treated as binary"}}, "TableName" => "Users"}
  ```
  Alternatively, if what's being encoded is a struct, you're always free to implement ExAws.Dynamo.Encodable for that struct.

  https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations.html
  """

  import ExAws.Utils, only: [camelize: 1, camelize_keys: 1, camelize_keys: 2, upcase: 1]
  alias __MODULE__
  alias ExAws.Dynamo.{Decoder, Lazy}
  alias ExAws.Operation.JSON

  @nested_opts [:exclusive_start_key, :expression_attribute_values, :expression_attribute_names]
  @upcase_opts [
    :return_values,
    :return_item_collection_metrics,
    :return_values_on_condition_check_failure,
    :select,
    :total_segments
  ]
  @top_level_update_fields [
    :attribute_definitions,
    :billing_mode,
    :global_indexes,
    :global_secondary_index_updates,
    :local_indexes,
    :provisioned_throughput,
    :read_capacity,
    :write_capacity
  ]
  @special_opts @nested_opts ++ @upcase_opts
  @default_billing_mode :provisioned
  @default_read_capacity 10
  @default_write_capacity 10

  @namespace "DynamoDB_20120810"

  ## Tables
  ######################

  @type table_name :: binary
  @type primary_key :: [{atom, binary}] | %{atom => binary}
  @type exclusive_start_key_vals :: [{atom, binary}] | %{atom => binary}
  @type expression_attribute_names_vals :: %{binary => binary}
  @type expression_attribute_values_vals ::
          [{atom, Dynamo.Encodable.t()}] | %{atom => Dynamo.Encodable.t()}
  @type return_consumed_capacity_vals ::
          :none
          | :total
          | :indexes
  @type select_vals ::
          :all_attributes
          | :all_projected_attributes
          | :specific_attributes
          | :count
  @type return_values_vals ::
          :none
          | :all_old
          | :updated_old
          | :all_new
          | :updated_new
  @type return_item_collection_metrics_vals ::
          :size
          | :none
  @type dynamo_type_names ::
          :blob
          | :boolean
          | :blob_set
          | :list
          | :map
          | :number_set
          | :null
          | :number
          | :string
          | :string_set
  @type dynamo_billing_types ::
          :pay_per_request
          | :provisioned
  @type key_schema :: [{atom | binary, :hash | :range}, ...]
  @type key_definitions :: [{atom | binary, dynamo_type_names}, ...]
  @type create_table_opts :: [
          {:global_indexes, [map()]},
          {:local_indexes, [map()]},
          {:read_capacity, pos_integer},
          {:write_capacity, pos_integer},
          {:billing_mode, dynamo_billing_types},
          {:stream_enabled, boolean()},
          {:stream_view_type, stream_view_type()}
        ]
  @type stream_view_type :: :keys_only | :new_image | :old_image | :new_and_old_images
  @type update_table_opts :: create_table_opts()

  @doc """
  Decode an item returned from Dynamo. This will handle items wrapped in the ordinary
  `get_item` response map of `%{"Item" => item}`.

  ## Example
  ```elixir
  Dynamo.get_item("users", %{id: "asdf"})
  |> ExAws.request!
  |> Dynamo.decode_item(as: User)
  ```
  """
  @spec decode_item(map()) :: map() | [map()]
  @spec decode_item(map(), as: atom) :: map() | [map()]
  def decode_item(item, opts \\ [])

  def decode_item(%{"Items" => items}, opts) do
    for item <- items, do: decode_item(item, opts)
  end

  def decode_item(%{"Item" => item}, opts) do
    decode_item(item, opts)
  end

  def decode_item(item, opts) do
    Decoder.decode(item, opts)
  end

  @doc "List tables"
  @spec list_tables() :: JSON.t()
  def list_tables do
    request(:list_tables, %{})
  end

  @doc """
  Create table

  `key_schema` can be a simple binary or atom indicating a simple hash key.

  `billing_mode` may be either `:provisioned` (default) or `:pay_per_request`.
  If you are creating a `:pay-per-request` table, you will still need to provide values for read and write capacities,
  although they will be ignored - you may consider providing `nil` in those cases.
  """
  @spec create_table(
          table_name :: binary,
          key_schema :: binary | atom | key_schema,
          key_definitions :: key_definitions,
          read_capacity :: pos_integer,
          write_capacity :: pos_integer,
          billing_mode :: dynamo_billing_types
        ) :: JSON.t()
  def create_table(
        name,
        primary_key,
        key_definitions,
        read_capacity,
        write_capacity,
        billing_mode \\ @default_billing_mode
      )

  def create_table(
        name,
        primary_key,
        key_definitions,
        read_capacity,
        write_capacity,
        billing_mode
      )
      when is_atom(primary_key) or is_binary(primary_key) do
    create_table(
      name,
      [{primary_key, :hash}],
      key_definitions,
      read_capacity,
      write_capacity,
      billing_mode
    )
  end

  def create_table(
        name,
        key_schema,
        key_definitions,
        read_capacity,
        write_capacity,
        billing_mode
      )
      when is_list(key_schema) do
    create_table(
      name,
      key_schema,
      key_definitions,
      read_capacity,
      write_capacity,
      [],
      [],
      billing_mode
    )
  end

  @doc """
  Create table with secondary indices

  Each index should follow the format outlined here: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html

  For convenience, the keys in each index map are allowed to be atoms. e.g:
  `"KeySchema"` in the aws docs can be `key_schema:`

  Note that both the `global_indexes` and `local_indexes` arguments expect a list of such indices.

  `billing_mode` may be either `:provisioned` (default) or `:pay_per_request`.
  If you are creating a `:pay-per-request` table, you will still need to provide values for read and write capacities,
  although they will be ignored - you may consider providing `nil` in those cases.

  Examples
  ```
  secondary_index = [%{
    index_name: "my-global-index",
    key_schema: [%{
      attribute_name: "email",
      key_type: "HASH",
    }],
    provisioned_throughput: %{
      read_capacity_units: 1,
      write_capacity_units: 1,
    },
    projection: %{
      projection_type: "KEYS_ONLY",
    }
  }]
  create_table("TestUsers", [id: :hash], %{id: :string, email: :string}, 1, 1, secondary_index, [])
  ```
  """
  @spec create_table(
          table_name :: binary,
          key_schema :: key_schema,
          key_definitions :: key_definitions,
          read_capacity :: pos_integer,
          write_capacity :: pos_integer,
          global_indexes :: [map()],
          local_indexes :: [map()],
          billing_mode :: dynamo_billing_types
        ) :: JSON.t()
  def create_table(
        name,
        key_schema,
        key_definitions,
        read_capacity,
        write_capacity,
        global_indexes,
        local_indexes,
        billing_mode \\ @default_billing_mode
      ) do
    create_table(name, key_schema, key_definitions,
      read_capacity: read_capacity,
      write_capacity: write_capacity,
      global_indexes: global_indexes,
      local_indexes: local_indexes,
      billing_mode: billing_mode
    )
  end

  @doc """
  Create table with arbitrary options. This provides a superset of the functionality of other
  `create_table` variants, allowing the specification of any options as per the `t:create_table_opts/0`
  type. If no options are specified, defaults will be used as follows:

  * `billing_mode`: `#{inspect(@default_billing_mode)}`
  * `read_capacity`: `#{@default_read_capacity}`
  * `write_capacity`: `#{@default_write_capacity}`
  * `stream_enabled`: `false`
  """
  @spec create_table(binary, key_schema, key_definitions, create_table_opts) :: JSON.t()
  def create_table(name, key_schema, key_definitions, opts \\ []) do
    data =
      build_billing_mode(
        opts[:read_capacity] || @default_read_capacity,
        opts[:write_capacity] || @default_write_capacity,
        opts[:billing_mode] || @default_billing_mode
      )
      |> Map.merge(%{
        "TableName" => name,
        "AttributeDefinitions" => key_definitions |> encode_key_definitions(),
        "KeySchema" => key_schema |> build_key_schema()
      })

    data =
      %{
        "GlobalSecondaryIndexes" => (opts[:global_indexes] || []) |> Enum.map(&camelize_keys(&1, deep: true)),
        "LocalSecondaryIndexes" => (opts[:local_indexes] || []) |> Enum.map(&camelize_keys(&1, deep: true))
      }
      |> Enum.reduce(data, fn
        {_, []}, data ->
          data

        {name, indices}, data ->
          Map.put(data, name, indices)
      end)

    data = Enum.reduce(opts, data, &add_table_opt/2)

    request(:create_table, data)
  end

  defp build_key_schema(key_schema) do
    Enum.map(key_schema, fn {attr, type} ->
      %{
        "AttributeName" => attr,
        "KeyType" => type |> upcase
      }
    end)
  end

  @spec build_billing_mode(
          read_capacity :: pos_integer,
          write_capacity :: pos_integer,
          billing_mode :: dynamo_billing_types
        ) :: map()
  defp build_billing_mode(read_capacity, write_capacity, :provisioned) do
    %{
      "BillingMode" => "PROVISIONED",
      "ProvisionedThroughput" => %{
        "ReadCapacityUnits" => read_capacity,
        "WriteCapacityUnits" => write_capacity
      }
    }
  end

  # Pay-per-request (AKA on-demand) tables do not have read/write capacities.
  defp build_billing_mode(_read_capacity, _write_capacity, :pay_per_request) do
    %{"BillingMode" => "PAY_PER_REQUEST"}
  end

  @doc "Describe table"
  @spec describe_table(name :: binary) :: JSON.t()
  def describe_table(name) do
    request(:describe_table, %{"TableName" => name})
  end

  @doc "Update Table"
  @spec update_table(name :: binary, opts :: update_table_opts() | map()) ::
          JSON.t()
  def update_table(name, opts) do
    data =
      opts
      |> take_opts(@top_level_update_fields)
      |> maybe_convert_billing_mode()
      |> camelize_keys(deep: true)
      |> Map.merge(%{"TableName" => name})

    data = Enum.reduce(opts, data, &add_table_opt/2)

    request(:update_table, data)
  end

  @spec maybe_convert_billing_mode(attributes :: Keyword.t() | map()) :: Keyword.t() | map()
  defp maybe_convert_billing_mode(attributes) do
    case attributes[:billing_mode] do
      nil -> attributes
      _ -> convert_billing_mode(attributes, attributes[:billing_mode])
    end
  end

  @spec convert_billing_mode(attributes :: Keyword.t() | map(), dynamo_billing_types) ::
          Keyword.t() | map()
  defp convert_billing_mode(attributes, :provisioned),
    do: do_convert_billing_mode(attributes, "PROVISIONED")

  defp convert_billing_mode(attributes, :pay_per_request),
    do: do_convert_billing_mode(attributes, "PAY_PER_REQUEST")

  @spec do_convert_billing_mode(attributes :: Keyword.t() | map(), value :: String.t()) ::
          Keyword.t() | map()
  defp do_convert_billing_mode(attributes, value) when is_map(attributes),
    do: Map.replace!(attributes, :billing_mode, value)

  defp do_convert_billing_mode(attributes, value) when is_list(attributes),
    do: Keyword.replace!(attributes, :billing_mode, value)

  defp add_table_opt({:stream_enabled, enabled}, data) do
    data
    |> ensure_entry("StreamSpecification")
    |> put_in(["StreamSpecification", "StreamEnabled"], enabled)
  end

  defp add_table_opt({:stream_view_type, type}, data) do
    data
    |> ensure_entry("StreamSpecification")
    |> put_in(["StreamSpecification", "StreamViewType"], type |> to_string() |> String.upcase())
  end

  defp add_table_opt(_, data) do
    # Other opts are handled in create_table and update_table
    data
  end

  defp ensure_entry(map, entry), do: Map.put_new(map, entry, %{})

  @doc "Delete Table"
  @spec delete_table(table :: binary) :: JSON.t()
  def delete_table(table) do
    request(:delete_table, %{"TableName" => table})
  end

  @doc "Update time to live"
  @spec update_time_to_live(table :: binary, ttl_attribute :: binary, enabled :: boolean) ::
          JSON.t()
  def update_time_to_live(table, ttl_attribute, enabled) do
    data = build_time_to_live(ttl_attribute, enabled) |> Map.merge(%{"TableName" => table})

    request(:update_time_to_live, data)
  end

  @spec build_time_to_live(ttl_attribute :: binary, enabled :: boolean) :: map()
  defp build_time_to_live("", _enabled) do
    %{}
  end

  defp build_time_to_live(ttl_attribute, enabled) when ttl_attribute != nil do
    %{
      "TimeToLiveSpecification" => %{
        "AttributeName" => ttl_attribute,
        "Enabled" => enabled
      }
    }
  end

  defp build_time_to_live(_ttl_attribute, _enabled) do
    %{}
  end

  @doc "Describe time to live"
  @spec describe_time_to_live(table :: binary) :: JSON.t()
  def describe_time_to_live(table) do
    request(:describe_time_to_live, %{"TableName" => table})
  end

  ## Records
  ######################
  @doc """
  Scan table

  Please read https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html

  ```
  Dynamo.scan("Users"
    limit: 1,
    expression_attribute_values: [desired_api_key: "adminkey"],
    expression_attribute_names: %{"#asdf" => "api_key"},
    filter_expression: "#asdf = :desired_api_key")
  ```

  Generally speaking, you won't need to use `:expression_attribute_names`. It exists
  to alias a column name if one of the columns you want to search against is a reserved Dynamo word,
  like `Percentile`. In this case, it's totally unnecessary as `api_key` is not a reserved word.

  Parameters with keys that are automatically annotated with Dynamo types are:
  `[:exclusive_start_key, :expression_attribute_names]`
  """
  @type scan_opts :: [
          {:consistent_read, boolean}
          | {:exclusive_start_key, exclusive_start_key_vals}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:expression_attribute_values, expression_attribute_values_vals}
          | {:filter_expression, binary}
          | {:index_name, binary}
          | {:limit, pos_integer}
          | {:projection_expression, binary}
          | {:return_consumed_capacity, return_consumed_capacity_vals}
          | {:segment, non_neg_integer}
          | {:select, select_vals}
          | {:total_segments, pos_integer}
        ]
  @spec scan(table_name :: table_name) :: JSON.t()
  @spec scan(table_name :: table_name, opts :: scan_opts) :: JSON.t()
  def scan(name, opts \\ []) do
    data =
      opts
      |> build_opts()
      |> Map.merge(%{"TableName" => name})

    request(:scan, data, %{stream_builder: &Lazy.stream_scan(name, opts, &1)})
  end

  @doc """
  Query Table

  Please read https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html

  ```
  Dynamo.query("Users",
    limit: 1,
    expression_attribute_values: [desired_api_key: "adminkey"],
    key_condition_expression: "api_key = :desired_api_key")
  ```

  Parameters with keys that are automatically annotated with dynamo types are:
  `[:exclusive_start_key, :expression_attribute_names]`
  """
  @type query_opts :: [
          {:consistent_read, boolean}
          | {:exclusive_start_key, exclusive_start_key_vals}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:expression_attribute_values, expression_attribute_values_vals}
          | {:filter_expression, binary}
          | {:index_name, binary}
          | {:key_condition_expression, binary}
          | {:limit, pos_integer}
          | {:projection_expression, binary}
          | {:return_consumed_capacity, return_consumed_capacity_vals}
          | {:scan_index_forward, boolean}
          | {:select, select_vals}
        ]
  @spec query(table_name :: table_name) :: JSON.t()
  @spec query(table_name :: table_name, opts :: query_opts) :: JSON.t()
  def query(name, opts \\ []) do
    data =
      opts
      |> build_opts()
      |> Map.merge(%{"TableName" => name})

    request(:query, data, %{stream_builder: &Lazy.stream_query(name, opts, &1)})
  end

  @doc """
  Batch-get up to 100 items (16 MB total max)

  Map of table names to request parameter maps.
  https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html

  Parameters with keys that are automatically annotated with dynamo types are:
  `[:keys]`

  ```elixir
  Dynamo.batch_get_item(%{
    "Users" => [
      consistent_read: true,
      keys: [
        [api_key: "key1"],
        [api_key: "api_key2"]
      ],
      expression_attribute_names: %{"#api_key" => "api_key"},
      projection_expression: "#api_key"
    ],
    "Subscriptions" => %{
      keys: [
        %{id: "id1"}
      ]
    }
  })
  ```
  As you see, you're largely free to use either keyword args or maps in the body. A map
  is required for the argument itself because the table names are most often binaries, and I refuse
  to inflict proplists on anyone.

  """
  @type batch_get_item_opts :: [
          {:return_consumed_capacity, return_consumed_capacity_vals}
        ]
  @type get_item :: [
          {:consistent_read, boolean}
          | {:keys, [primary_key]}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:projection_expression, binary}
        ]
  @spec batch_get_item(%{table_name => get_item}) :: JSON.t()
  @spec batch_get_item(%{table_name => get_item}, opts :: batch_get_item_opts) ::
          JSON.t()
  def batch_get_item(data, opts \\ []) do
    request_items =
      data
      |> Enum.reduce(%{}, fn {table_name, table_query}, query ->
        keys =
          table_query[:keys]
          |> Enum.map(&encode_values/1)

        mapped_table_query =
          table_query
          |> Map.new()

        dynamized_table_query =
          mapped_table_query
          |> Map.drop(@special_opts ++ [:keys])
          |> camelize_keys
          |> build_expression_attribute_names(mapped_table_query)
          |> Map.put("Keys", keys)

        Map.put(query, table_name, dynamized_table_query)
      end)

    data =
      opts
      |> build_opts()
      |> Map.merge(%{"RequestItems" => request_items})

    request(:batch_get_item, data)
  end

  @doc "Put item in table"
  @type put_item_opts :: [
          {:condition_expression, binary}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:expression_attribute_values, expression_attribute_values_vals}
          | {:return_consumed_capacity, return_consumed_capacity_vals}
          | {:return_item_collection_metrics, return_item_collection_metrics_vals}
          | {:return_values, return_values_vals}
          | {:return_values_on_condition_check_failure, return_values_on_condition_check_failure_vals}
        ]
  @spec put_item(table_name :: table_name, record :: map()) :: JSON.t()
  @spec put_item(table_name :: table_name, record :: map(), opts :: put_item_opts) ::
          JSON.t()
  def put_item(name, record, opts \\ []) do
    data =
      opts
      |> build_opts()
      |> Map.merge(%{
        "TableName" => name,
        "Item" => Dynamo.Encoder.encode_root(record)
      })

    request(:put_item, data)
  end

  @doc """
  Put or delete up to 25 items (16 MB total max)

  Map of table names to request parameter maps.
  https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html

  Parameters with keys that are automatically annotated with Dynamo types are:
  `[:keys]`
  """
  @type write_item :: [
          [delete_request: [key: primary_key]]
          | [put_request: [item: map()]]
        ]
  @type batch_write_item_opts :: [
          {:return_consumed_capacity, return_consumed_capacity_vals}
          | {:return_item_collection_metrics, return_item_collection_metrics_vals}
        ]
  @spec batch_write_item(%{table_name => [write_item]}) :: JSON.t()
  @spec batch_write_item(%{table_name => [write_item]}, opts :: batch_write_item_opts) ::
          JSON.t()
  def batch_write_item(data, opts \\ []) do
    request_items =
      data
      |> Enum.reduce(%{}, fn {table_name, table_queries}, query ->
        queries =
          table_queries
          |> Enum.map(fn
            [delete_request: [key: primary_key]] ->
              %{"DeleteRequest" => %{"Key" => primary_key |> Dynamo.Encoder.encode_root()}}

            [put_request: [item: item]] ->
              %{"PutRequest" => %{"Item" => Dynamo.Encoder.encode_root(item)}}
          end)

        Map.put(query, table_name, queries)
      end)

    data =
      opts
      |> build_opts()
      |> Map.merge(%{"RequestItems" => request_items})

    request(:batch_write_item, data)
  end

  @doc "Get item from table"
  @type get_item_opts :: [
          {:consistent_read, boolean}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:projection_expression, binary}
          | {:return_consumed_capacity, return_consumed_capacity_vals}
        ]
  @spec get_item(table_name :: table_name, primary_key :: primary_key) :: JSON.t()
  @spec get_item(table_name :: table_name, primary_key :: primary_key, opts :: get_item_opts) ::
          JSON.t()
  def get_item(name, primary_key, opts \\ []) do
    data =
      opts
      |> build_opts()
      |> Map.merge(%{
        "TableName" => name,
        "Key" => primary_key |> Map.new() |> Dynamo.Encoder.encode_root()
      })

    request(:get_item, data)
  end

  @doc """
  Update item in table

  For update_args format see
  https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html
  """
  @type update_item_opts :: [
          {:condition_expression, binary}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:expression_attribute_values, expression_attribute_values_vals}
          | {:return_consumed_capacity, return_consumed_capacity_vals}
          | {:return_item_collection_metrics, return_item_collection_metrics_vals}
          | {:return_values, return_values_vals}
          | {:return_values_on_condition_check_failure, return_values_on_condition_check_failure_vals}
          | {:update_expression, binary}
        ]
  @spec update_item(
          table_name :: table_name,
          primary_key :: primary_key,
          opts :: update_item_opts
        ) :: JSON.t()
  def update_item(table_name, primary_key, update_opts) do
    data =
      update_opts
      |> build_opts()
      |> Map.merge(%{
        "TableName" => table_name,
        "Key" => primary_key |> Map.new() |> Dynamo.Encoder.encode_root()
      })

    request(:update_item, data)
  end

  @doc "Delete item in table"
  @type delete_item_opts :: [
          {:condition_expression, binary}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:expression_attribute_values, expression_attribute_values_vals}
          | {:return_consumed_capacity, return_consumed_capacity_vals}
          | {:return_item_collection_metrics, return_item_collection_metrics_vals}
          | {:return_values, return_values_vals}
          | {:return_values_on_condition_check_failure, return_values_on_condition_check_failure_vals}
        ]
  @spec delete_item(table_name :: table_name, primary_key :: primary_key) ::
          JSON.t()
  @spec delete_item(
          table_name :: table_name,
          primary_key :: primary_key,
          opts :: delete_item_opts
        ) :: JSON.t()
  def delete_item(name, primary_key, opts \\ []) do
    data =
      opts
      |> build_opts()
      |> Map.merge(%{
        "TableName" => name,
        "Key" => primary_key |> Map.new() |> Dynamo.Encoder.encode_root()
      })

    request(:delete_item, data)
  end

  @type transact_get_item_opts :: [
          {:expression_attribute_names, expression_attribute_names_vals}
          | {:projection_expression, binary}
        ]

  @type transact_get_item ::
          {table_name :: binary, primary_key :: primary_key}
          | {table_name :: binary, primary_key :: primary_key, transact_get_item_opts}

  @type transact_get_items_opts :: [
          {:return_consumed_capacity, return_consumed_capacity_vals}
        ]

  @spec transact_get_items(items :: [transact_get_item], transact_get_items_opts) ::
          JSON.t()
  @spec transact_get_items(items :: [transact_get_item]) :: JSON.t()

  @doc """
  A synchronous operation that retrieves multiple items from one or more tables (but not from indexes) in a single account and region
  """
  def transact_get_items(items, opts \\ []) do
    data =
      opts
      |> build_opts()
      |> Map.merge(%{
        "TransactItems" => Enum.map(items, &build_transaction_item({:get, &1}))
      })

    request(:transact_get_items, data)
  end

  defp build_transaction_item({method, {table, item}}),
    do: build_transaction_item({method, {table, item, []}})

  defp build_transaction_item({method, {table, item, opts}}) do
    build_transaction_item(method, table, item, opts)
  end

  defp build_transaction_item(method, table_name, item, opts) do
    item = item |> Dynamo.Encoder.encode_root()

    details =
      opts
      |> build_opts()
      |> Map.merge(%{
        "TableName" => table_name,
        transaction_item_key(method) => item
      })

    %{camelize(method) => details}
  end

  defp transaction_item_key(:put), do: "Item"
  defp transaction_item_key(_any), do: "Key"

  @type return_values_on_condition_check_failure_vals :: :all_old | :none

  @type transact_standard_item_opts :: [
          {:condition_expression, binary}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:expression_attribute_values, expression_attribute_values_vals}
          | {:return_values_on_condition_check_failure, return_values_on_condition_check_failure_vals}
        ]

  @type transact_update_item_opts :: [
          {:condition_expression, binary}
          | {:expression_attribute_names, expression_attribute_names_vals}
          | {:expression_attribute_values, expression_attribute_values_vals}
          | {:return_values_on_condition_check_failure, return_values_on_condition_check_failure_vals}
          | {:update_expression, binary}
        ]

  @type transact_write_item ::
          {:condition_check, {table_name :: binary, key :: primary_key, transact_standard_item_opts}}
          | {:delete, {table_name :: binary, key :: primary_key, transact_standard_item_opts}}
          | {:put, {table_name :: binary, item :: map(), transact_standard_item_opts}}
          | {:update, {table_name :: binary, key :: primary_key, transact_update_item_opts}}

  @type transact_write_items_opts :: [
          {:client_request_token, binary}
          | {:return_consumed_capacity, return_consumed_capacity_vals}
          | {:return_item_collection_metrics, return_item_collection_metrics_vals}
        ]

  @doc """
  A synchronous write operation that groups up to 25 action requests
  """
  @spec transact_write_items(items :: [transact_write_item], transact_write_items_opts) ::
          JSON.t()
  @spec transact_write_items(items :: [transact_write_item]) :: JSON.t()
  def transact_write_items(items, opts \\ []) do
    data =
      opts
      |> build_opts()
      |> Map.merge(%{
        "TransactItems" => Enum.map(items, &build_transaction_item/1)
      })

    request(:transact_write_items, data)
  end

  ## Options builder
  ###################

  defp build_opts(opts) do
    opts = opts |> Map.new()

    opts
    |> Map.drop(@special_opts)
    |> add_upcased_opt(opts, :total_segments)
    |> add_upcased_opt(opts, :return_item_collection_metrics)
    |> add_upcased_opt(opts, :select)
    |> add_upcased_opt(opts, :return_values)
    |> add_upcased_opt(opts, :return_values_on_condition_check_failure)
    |> add_upcased_opt(opts, :return_consumed_capacity)
    |> camelize_keys
    |> build_special_opts(opts)
  end

  ## Builders for special options
  ################################

  defp build_special_opts(data, opts) do
    data
    |> build_exclusive_start_key(opts)
    |> build_expression_attribute_names(opts)
    |> build_expression_attribute_values(opts)
  end

  defp build_exclusive_start_key(data, %{exclusive_start_key: start_key}) do
    Map.put(data, "ExclusiveStartKey", start_key |> encode_values)
  end

  defp build_exclusive_start_key(data, _), do: data

  defp build_expression_attribute_names(data, %{expression_attribute_names: names}) do
    Map.put(data, "ExpressionAttributeNames", names |> Map.new())
  end

  defp build_expression_attribute_names(data, _), do: data

  defp build_expression_attribute_values(data, %{expression_attribute_values: values}) do
    values =
      values
      |> encode_values
      |> Enum.reduce(%{}, fn {k, v}, map ->
        Map.put(map, ":#{k}", v)
      end)

    Map.put(data, "ExpressionAttributeValues", values)
  end

  defp build_expression_attribute_values(data, _), do: data

  ## Various other helpers
  #########################

  defp add_upcased_opt(data, opts, key) do
    case Map.fetch(opts, key) do
      :error -> data
      {:ok, nil} -> data
      {:ok, v} -> Map.put(data, key, v |> upcase)
    end
  end

  defp encode_values(dict) do
    Enum.reduce(dict, %{}, fn {attr, value}, attribute_values ->
      Map.put(attribute_values, attr, Dynamo.Encoder.encode(value))
    end)
  end

  defp encode_key_definitions(attrs) do
    attrs
    |> Enum.map(fn {name, type} ->
      %{"AttributeName" => name, "AttributeType" => type |> Dynamo.Encoder.atom_to_dynamo_type()}
    end)
  end

  defp request(op, data, opts \\ %{}) do
    operation =
      op
      |> Atom.to_string()
      |> Macro.camelize()

    JSON.new(
      :dynamodb,
      %{
        data: data,
        error_parser: &error_parser/1,
        headers: [
          {"x-amz-target", "#{@namespace}.#{operation}"},
          {"content-type", "application/x-amz-json-1.0"}
        ]
      }
      |> Map.merge(opts)
    )
  end

  # Cancelled "transactions" are specific to DynamoDB and may include
  # additional information, so include it in the result:
  defp error_parser(
         {:error, {:aws_unhandled, "TransactionCanceledException" = type, message, %{"CancellationReasons" => reasons}}}
       ) do
    {:error, {type, message, reasons}}
  end

  # Condition check failures may include the item that failed the check, when
  # `return_values_on_condition_check_failure` is set to `:all_old`
  defp error_parser({:error, {:aws_unhandled, "ConditionalCheckFailedException" = type, message, %{"Item" => item}}}) do
    {:error, {type, message, item}}
  end

  defp error_parser(otherwise), do: otherwise

  defp take_opts(map, keys) when is_map(map), do: Map.take(map, keys)
  defp take_opts(keyword, keys) when is_list(keyword), do: Keyword.take(keyword, keys)
end