defmodule ExAliyunOts do
@moduledoc ~S"""
The `ExAliyunOts` module provides a tablestore-based API as a client for working with Alibaba TableStore product servers.
Here are links to official documents in [Chinese](https://help.aliyun.com/document_detail/27280.html) | [English](https://www.alibabacloud.com/help/product/27278.html)
## Configuration
config :ex_aliyun_ots, :my_instance
name: "MyInstanceName",
endpoint: "MyInstanceEndpoint",
access_key_id: "MyAliyunRAMKeyID",
access_key_secret: "MyAliyunRAMKeySecret"
config :ex_aliyun_ots,
instances: [:my_instance],
debug: false,
enable_tunnel: false
* `debug`, optional, specifies whether to enable debug logger, by default it's false, and please DO NOT use debug mode in production.
* `enable_tunnel`, optional, specifies whether to enable tunnel functions, there will startup tunnel related `Supervisor` and `Registry` when enable it, by default it's false.
## Using ExAliyunOts
To use `ExAliyunOts`, a module that calls `use ExAliyunOts` has to be defined:
defmodule MyApp.TableStore do
use ExAliyunOts, instance: :my_instance
end
This automatically defines some macros and functions in the `MyApp.TableStore` module, here are some examples:
import MyApp.TableStore
# Create table
create_table "table",
[{"pk1", :integer}, {"pk2", :string}]
# Put row
put_row "table",
[{"pk1", "id1"}],
[{"attr1", 10}, {"attr2", "attr2_value"}],
condition: condition(:expect_not_exist),
return_type: :pk
# Search index
search "table", "index_name",
search_query: [
query: match_query("age", 28),
sort: [
field_sort("age", order: :desc)
]
]
# Local transaction
start_local_transaction "table", {"partition_key", "partition_value"}
## ExAliyunOts API
There are two ways to use ExAliyunOts:
* using macros and functions from your own ExAliyunOts module, like `MyApp.TableStore`.
* using macros and functions from the `ExAliyunOts` module.
All defined functions and macros in `ExAliyunOts` are available and referable for your own ExAliyunOts module as well, except that the given arity of functions may
different, because the `instance` parameter of each invoke request is NOT needed from your own ExAliyunOts module although the `ExAliyunOts` module defines it.
"""
require ExAliyunOts.Const.OperationType, as: OperationType
alias ExAliyunOts.{Var, Client, Utils}
alias ExAliyunOts.TableStore.{ReturnType, Direction}
@before_compile ExAliyunOts.MergeCompiler
@type instance :: atom
@type table_name :: String.t()
@type primary_keys :: list
@type inclusive_start_primary_keys :: list
@type exclusive_end_primary_keys :: list
@type index_name :: String.t()
@type options :: Keyword.t()
@type result :: {:ok, map()} | {:error, ExAliyunOts.Error.t()}
require Logger
defmacro __using__(opts \\ []) do
opts = Macro.prewalk(opts, &Macro.expand(&1, __CALLER__))
quote do
@instance Keyword.get(unquote(opts), :instance)
use ExAliyunOts.Constants
import ExAliyunOts.DSL
@before_compile ExAliyunOts.Compiler
end
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27312.html) | [English](https://www.alibabacloud.com/help/doc-detail/27312.html)
## Example
create_table "table_name2",
[{"key1", :string}, {"key2", :auto_increment}]
create_table "table_name3",
[{"key1", :string}],
reserved_throughput_write: 1,
reserved_throughput_read: 1,
time_to_live: 100_000,
max_versions: 3,
deviation_cell_version_in_sec: 6_400,
stream_spec: [is_enabled: true, expiration_time: 2]
create_table "table_name",
[{"key1", :string}],
defined_columns: [
{"attr1", :string},
{"attr2", :integer},
{"attr3", :boolean},
{"attr4", :double},
{"attr5", :binary}
]
create_table "table_name",
[{"key1", :string}],
index_metas: [
{"indexname1", ["key1"], ["attr1", "attr2"]},
{"indexname2", ["key1"], ["attr4"]}
]
## Options
* `:reserved_throughput_write`, optional, the reserved throughput write of table, by default it is 0.
* `:reserved_throughput_read`, optional, the reserved throughput read of table, by default it is 0.
* `time_to_live`, optional, the data storage time to live in seconds, the minimum settable value is 864_000 seconds (one day), by default it is -1 (for permanent).
* `:max_versions`, optional, the version of table, by default it is 1 that specifies there is only one version for columns.
* `:deviation_cell_version_in_sec`, optional, maximum version deviation, by default it is 864_000 seconds (one day).
* `:stream_spec`, specifies whether enable stream, by default it is not enable stream feature.
- `:is_enabled`, enable or not enable stream, use `true` or `false`;
- `:expiration_time`, the expiration time of stream.
* `:index_metas`, optional, the index meta of table, each item of `:index_metas` is in {String.t(), list(), list()} format, by default it is [].
* `:defined_columns`, optional, the indexed attribute column, which is a combination of predefined columns of the base table, each item of `:defined_columns`
is in {String.t(), :integer | :double | :boolean | :string | :binary} format, by default it is [].
"""
@doc table: :table
@spec create_table(instance, table_name, primary_keys, options) ::
:ok | {:error, ExAliyunOts.Error.t()}
def create_table(instance, table_name, primary_keys, options \\ []) do
var_create_table = %Var.CreateTable{
table_name: table_name,
primary_keys: primary_keys
}
prepared_var = map_options(var_create_table, options)
Client.create_table(instance, prepared_var)
end
@doc """
Create global secondary indexes. Official document in [Chinese](https://help.aliyun.com/document_detail/91947.html) | [English](https://www.alibabacloud.com/help/doc-detail/91947.html)
## Example
create_index "table_name",
"table_index_name1"
["pk1", "pk2", "col0"],
["col1", "col2"]
create_index "table_name",
"table_index_name2"
["col0", "pk1"],
["col1", "col2", "col3"],
include_base_data: false
## Options
* `:index_update_mode`, the update mode of the index table, optional, currently only support `:IUM_ASYNC_INDEX`,
by default it is `:IUM_ASYNC_INDEX`;
* `:index_type`, the type of the index table, optional, currently only support `:IT_GLOBAL_INDEX`,
by default it is `:IT_GLOBAL_INDEX`;
* `:include_base_data`, specifies whether the index table includes the existing data in the base table, if set it to
`true` means the index includes the existing data, if set it to `false` means the index excludes the existing data,
optional, by default it is `true`.
"""
@doc table: :table
@spec create_index(
instance,
table_name,
index_name,
primary_keys :: [String.t()],
defined_columns :: [String.t()],
options
) :: :ok | {:error, ExAliyunOts.Error.t()}
def create_index(
instance,
table_name,
index_name,
primary_keys,
defined_columns,
options \\ []
) do
Client.create_index(
instance,
table_name,
index_name,
primary_keys,
defined_columns,
options
)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/94558.html) | [English](https://www.alibabacloud.com/help/doc-detail/94558.html)
## Example
import MyApp.TableStore
delete_index("table_name", "index_name")
"""
@doc table: :table
@spec delete_index(instance, table_name, index_name) :: :ok | {:error, ExAliyunOts.Error.t()}
defdelegate delete_index(instance, table_name, index_name), to: Client
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27314.html) | [English](https://www.alibabacloud.com/help/doc-detail/27314.html)
## Example
import MyApp.TableStore
delete_table("table_name")
"""
@doc table: :table
@spec delete_table(instance, table_name) :: :ok | {:error, ExAliyunOts.Error.t()}
defdelegate delete_table(instance, table_name), to: Client
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27313.html) | [English](https://www.alibabacloud.com/help/doc-detail/27313.html)
## Example
import MyApp.TableStore
list_table()
"""
@doc table: :table
@spec list_table(instance) :: result
defdelegate list_table(instance), to: Client
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27315.html) | [English](https://www.alibabacloud.com/help/doc-detail/27315.html)
## Example
import MyApp.TableStore
update_table "table_name",
reserved_throughput_write: 10,
time_to_live: 200_000,
stream_spec: [is_enabled: false]
## Options
Please see options of `create_table/4`.
"""
@doc table: :table
@spec update_table(instance, table_name, options) :: result
def update_table(instance, table_name, options \\ []) do
var_update_table = %Var.UpdateTable{
table_name: table_name
}
prepared_var = map_options(var_update_table, options)
Client.update_table(instance, prepared_var)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27307.html) | [English](https://www.alibabacloud.com/help/doc-detail/27307.html)
## Example
import MyApp.TableStore
describe_table(table_name)
"""
@doc table: :table
@spec describe_table(instance, table_name) :: result
defdelegate describe_table(instance, table_name), to: Client
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/53813.html) | [English](https://www.alibabacloud.com/help/doc-detail/53813.html)
"""
@doc table: :table
@spec compute_split_points_by_size(instance, table_name, splits_size :: integer()) ::
result
defdelegate compute_split_points_by_size(instance, table_name, splits_size), to: Client
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27310.html) | [English](https://www.alibabacloud.com/help/doc-detail/27310.html)
## Example
import MyApp.TableStore
batch_get [
get(table_name1, [[{"key1", 1}, {"key2", "1"}]]),
get(
table_name2,
[{"key1", "key1"}],
columns_to_get: ["name", "age"],
filter: filter "age" >= 10
)
]
The batch get operation can be considered as a collection of mulitple `get/3` operations.
"""
@doc row: :row
@spec batch_get(instance, requests :: list()) :: result
defdelegate batch_get(instance, requests), to: Client, as: :batch_get_row
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27311.html) | [English](https://www.alibabacloud.com/help/doc-detail/27311.html)
## Example
import MyApp.TableStore
batch_write [
{"table1", [
write_delete([{"key1", 5}, {"key2", "5"}],
return_type: :pk,
condition: condition(:expect_exist, "attr1" == 5)),
write_put([{"key1", 6}, {"key2", "6"}],
[{"new_put_val1", "val1"}, {"new_put_val2", "val2"}],
condition: condition(:expect_not_exist),
return_type: :pk)
]},
{"table2", [
write_update([{"key1", "new_tab3_id2"}],
put: [{"new_put1", "u1"}, {"new_put2", 2.5}],
condition: condition(:expect_not_exist)),
write_put([{"key1", "new_tab3_id3"}],
[{"new_put1", "put1"}, {"new_put2", 10}],
condition: condition(:expect_not_exist))
]}
]
The batch write operation can be considered as a collection of multiple `write_put/3`, `write_update/2` and `write_delete/2` operations.
## Options
* `:transaction_id`, optional, batch write operation within local transaction.
* `:is_atomic`, optional, defaults to false, whether set this batch write request be with an atomic operation, if this option is `true`,
keep the partition key of each table in the batch write operation is unique, or the corresponding write operation of the table will fail.
"""
@doc row: :row
@spec batch_write(instance, requests :: list(), options) :: result
def batch_write(instance, requests, options \\ [])
def batch_write(instance, requests, options) when is_list(requests) do
batch_write_requests =
Enum.map(requests, fn {table_name, write_rows} ->
%Var.BatchWriteRequest{
table_name: table_name,
rows: write_rows
}
end)
Client.batch_write_row(instance, batch_write_requests, options)
end
def batch_write(instance, {table_name, write_rows}, options) do
batch_write_request = %Var.BatchWriteRequest{
table_name: table_name,
rows: write_rows
}
Client.batch_write_row(instance, batch_write_request, options)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27305.html) | [English](https://www.alibabacloud.com/help/doc-detail/27305.html)
## Example
import MyApp.TableStore
get_row "table1",
[{"key1", "id1"}, {"key2", "id2"}],
columns_to_get: ["name", "level"],
filter: filter(("name[ignore_if_missing: true, latest_version_only: true]" == var_name and "age" > 1) or ("class" == "1"))
get_row "table2",
[{"key", "1"}],
start_column: "room",
filter: pagination(offset: 0, limit: 3)
get_row "table3",
[{"key", "1"}],
transaction_id: "transaction_id"
## Options
* `:columns_to_get`, optional, fetch the special fields, by default it returns all fields, pass a field list to specify the expected return fields
e.g. `["field1", "field2"]`.
* `:start_column`, optional, specifies the start column when using for wide-row-read, the returned result contains this `:start_column`.
* `:end_column`, optional, specifies the end column when using for wide-row-read, the returned result does not contain this `:end_column`.
* `:filter`, optional, filter the return results in the server side, please see `filter/1` for details.
* `:max_versions`, optional, how many versions need to return in results, by default it is 1.
* `:time_range`, optional, read data by timestamp range, support two ways to use it:
- `time_range: {start_timestamp, end_timestamp}`, the timestamp in the range (include `start_timestamp` but exclude `end_timestamp`)
and then will return in the results.
- `time_range: special_timestamp`, exactly match and then will return in the results.
- `:time_range` and `:max_versions` are mutually exclusive, by default use `max_versions: 1` and `time_range: nil`.
* `:transaction_id`, optional, read operation within local transaction.
"""
@doc row: :row
@spec get_row(instance, table_name, primary_keys, options) :: result
def get_row(instance, table_name, primary_keys, options \\ []) do
prepared_var = get(table_name, primary_keys, options)
Client.get_row(instance, prepared_var)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27306.html) | [English](https://www.alibabacloud.com/help/doc-detail/27306.html)
## Example
import MyApp.TableStore
put_row "table1",
[{"key1", "id1"}],
[{"name", "name1"}, {"age", 20}],
condition: condition(:expect_not_exist),
return_type: :pk
put_row "table2",
[{"key1", "id1"}],
[{"name", "name1"}, {"age", 20}],
condition: condition(:expect_not_exist),
transaction_id: "transaction_id"
return_type: :pk
## Options
* `:condition`, required, please see `condition/1` or `condition/2` for details.
* `:return_type`, optional, whether return the primary keys after put row, available options are `:pk` | `:none`, by default it is `:none`.
* `:transaction_id`, optional, write operation within local transaction.
"""
@doc row: :row
@spec put_row(instance, table_name, primary_keys, options) :: result
def put_row(instance, table_name, primary_keys, attrs, options \\ []) do
prepared_var =
%Var.PutRow{
table_name: table_name,
primary_keys: primary_keys,
attribute_columns: attrs
}
|> map_options(options)
Client.put_row(instance, prepared_var)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27307.html) | [English](https://www.alibabacloud.com/help/doc-detail/27307.html)
## Example
import MyApp.TableStore
value = "1"
update_row "table1",
[{"key1", 2}, {"key2", "2"}],
delete: [{"attr2", nil, 1524464460}],
delete_all: ["attr1"],
put: [{"attr3", "put_attr3"}],
return_type: :pk,
condition: condition(:expect_exist, "attr2" == value)
update_row "table2",
[{"key1", 1}],
put: [{"attr1", "put_attr1"}],
increment: [{"count", 1}],
return_type: :after_modify,
return_columns: ["count"],
condition: condition(:ignore)
update_row "table3",
[partition_key],
put: [{"new_attr1", "a1"}],
delete_all: ["level", "size"],
condition: condition(:ignore),
transaction_id: "transaction_id"
## Options
* `:put`, optional, require to be valid value, e.g. `[{"field1", "value"}, {...}]`, insert a new column if this field is not existed, or overwrite this field if existed.
* `:delete`, optional, delete the special version of a column or columns, please pass the column's version (timestamp) in `:delete` option, e.g. [{"field1", nil, 1524464460}, ...].
* `:delete_all`, optional, delete all versions of a column or columns, e.g. ["field1", "field2", ...].
* `:increment`, optional, attribute column(s) base on atomic counters for increment or decrement, require the value of column is integer.
- for increment, `increment: [{"count", 1}]`;
- for decrement, `increment: [{"count", -1}]`.
* `:return_type`, optional, whether return the primary keys after update row, available options are `:pk` | `:none` | `:after_modify`, by default it is `:none`.
- if use atomic counters, must set `return_type: :after_modify`.
* `:condition`, required, please see `condition/1` or `condition/2` for details.
* `:transaction_id`, optional, write operation within local transaction.
"""
@doc row: :row
@spec update_row(instance, table_name, primary_keys, options) :: result
def update_row(instance, table_name, primary_keys, options \\ []) do
prepared_var =
%Var.UpdateRow{
table_name: table_name,
primary_keys: primary_keys
}
|> map_options(options)
|> Map.put(:updates, map_updates(options))
Client.update_row(instance, prepared_var)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27308.html) | [English](https://www.alibabacloud.com/help/doc-detail/27308.html)
## Example
import MyApp.TableStore
delete_row "table1",
[{"key1", 3}, {"key2", "3"}],
condition: condition(:expect_exist, "attr2" == "value2")
delete_row "table1",
[{"key1", 3}, {"key2", "3"}],
condition: condition(:expect_exist, "attr2" == "value2"),
transaction_id: "transaction_id"
## Options
* `:condition`, required, please see `condition/1` or `condition/2` for details.
* `:transaction_id`, optional, write operation within local transaction.
"""
@doc row: :row
@spec delete_row(instance, table_name, primary_keys, options) :: result
def delete_row(instance, table_name, primary_keys, options \\ []) do
prepared_var =
%Var.DeleteRow{
table_name: table_name,
primary_keys: primary_keys
}
|> map_options(options)
Client.delete_row(instance, prepared_var)
end
@doc """
Used in batch get operation, please see `batch_get/2` for details.
## Options
The available options are same as `get_row/4`.
"""
@doc row: :row
@spec get(table_name, primary_keys, options) :: map()
def get(table_name, primary_keys, options \\ []) do
%Var.GetRow{table_name: table_name, primary_keys: primary_keys}
|> map_options(options)
end
@doc """
Used in batch write operation, please see `batch_write/2` for details.
## Options
The available options are same as `put_row/5`.
"""
@doc row: :row
@spec write_put(primary_keys, attrs :: list(), options) :: map()
def write_put(primary_keys, attrs, options \\ []) do
%Var.RowInBatchWriteRequest{
type: :PUT,
primary_keys: primary_keys,
updates: attrs
}
|> map_options(options)
end
@doc """
Used in batch write operation, please see `batch_write/2` for details.
## Options
The available options are same as `update_row/4`.
"""
@doc row: :row
@spec write_update(primary_keys, options) :: map()
def write_update(primary_keys, options \\ []) do
%Var.RowInBatchWriteRequest{
type: :UPDATE,
primary_keys: primary_keys,
updates: map_updates(options)
}
|> map_options(options)
end
@doc """
Used in batch write operation, please see `batch_write/2` for details.
## Options
The available operation same as `delete_row/4`.
"""
@doc row: :row
@spec write_delete(primary_keys, options) :: map()
def write_delete(primary_keys, options \\ []) do
%Var.RowInBatchWriteRequest{type: :DELETE, primary_keys: primary_keys}
|> map_options(options)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/27309.html) | [English](https://www.alibabacloud.com/help/doc-detail/27309.html)
## Example
import MyApp.TableStore
get_range "table_name",
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
direction: :forward
get_range "table_name",
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
time_range: {1525922253224, 1525923253224},
direction: :forward
get_range "table_name",
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
time_range: 1525942123224,
direction: :forward
Also, there is an alternative `stream_range/5` to iteratively get range of rows in stream.
## Options
* `:direction`, required, the order of fetch data, available options are `:forward` | `:backward`, by it is `:forward`.
- `:forward`, this query is performed in the order of primary key in ascending, in this case, input `inclusive_start_primary_keys` should less
than `exclusive_end_primary_keys`;
- `:backward`, this query is performed in the order of primary key in descending, in this case, input `inclusive_start_primary_keys` should greater
than `exclusive_end_primary_keys`.
* `:columns_to_get`, optional, fetch the special fields, by default it returns all fields, pass a field list to specify the expected return fields,
e.g. `["field1", "field2"]`.
* `:start_column`, optional, specifies the start column when using for wide-row-read, the returned result contains this `:start_column`.
* `:end_column`, optional, specifies the end column when using for wide-row-read, the returned result does not contain this `:end_column`.
* `:filter`, optional, filter the return results in the server side, please see `filter/1` for details.
* `:max_versions`, optional, how many versions need to return in results, by default it is 1.
* `:transaction_id`, optional, read operation within local transaction.
* `:limit`, optional, the maximum number of rows of data to be returned, this value must be greater than 0, whether this option is set or not, there
returns a maximum of 5,000 data rows and the total data size never exceeds 4 MB.
* `:time_range`, optional, read data by timestamp range, support two ways to use it:
- `time_range: {start_timestamp, end_timestamp}`, the timestamp in the range (include `start_timestamp` but exclude `end_timestamp`)
and then will return in the results.
- `time_range: special_timestamp`, exactly match and then will return in the results.
- `:time_range` and `:max_versions` are mutually exclusive, by default use `max_versions: 1` and `time_range: nil`.
"""
@doc row: :row
@spec get_range(
instance,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
) :: result
def get_range(
instance,
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options \\ []
)
def get_range(
instance,
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
)
when is_list(inclusive_start_primary_keys) do
prepared_var =
prepared_get_range(
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
)
Client.get_range(instance, prepared_var, nil)
end
def get_range(
instance,
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
)
when is_binary(inclusive_start_primary_keys) do
prepared_var =
%Var.GetRange{
table_name: table_name,
exclusive_end_primary_keys: exclusive_end_primary_keys
}
|> map_options(options)
Client.get_range(instance, prepared_var, inclusive_start_primary_keys)
end
@doc """
As a wrapper built on `get_range/5` to fetch a full matched data set by iterate, if process a large items,
recommend to use `stream_range/5`.
## Example
import MyApp.TableStore
iterate_all_range table_name1,
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
direction: :forward
## Options
Please see options of `get_range/5` for details.
"""
@doc row: :row
@spec iterate_all_range(
instance,
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
) :: result
def iterate_all_range(
instance,
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options \\ []
) do
prepared_var =
prepared_get_range(
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
)
Client.iterate_get_all_range(instance, prepared_var)
end
@doc """
As a wrapper built on `get_range/5` to create composable and lazy enumerable stream for iteration.
## Example
import MyApp.TableStore
stream =
stream_range table_name1,
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
direction: :forward
Enum.to_list(stream, fn
{:ok, %{rows: rows} = response} ->
# process rows
{:error, error} ->
# occur error
end)
## Options
Please see options of `get_range/5` for details.
"""
@doc row: :row
@spec stream_range(
instance,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
) :: Enumerable.t()
def stream_range(
instance,
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options \\ []
) do
prepared_var =
prepared_get_range(
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
)
Client.stream_range(instance, prepared_var)
end
@compile {:inline, prepared_get_range: 4}
defp prepared_get_range(
table_name,
inclusive_start_primary_keys,
exclusive_end_primary_keys,
options
) do
map_options(
%Var.GetRange{
table_name: table_name,
inclusive_start_primary_keys: inclusive_start_primary_keys,
exclusive_end_primary_keys: exclusive_end_primary_keys
},
options
)
end
@doc """
The one entrance to use search index functions, please see `ExAliyunOts.Search` module for details.
Official document in [Chinese](https://help.aliyun.com/document_detail/91974.html) | [English](https://www.alibabacloud.com/help/doc-detail/91974.html)
## Options
* `:search_query`, required, the main option to use query and sort.
- `:query`, required, bind to the query functions:
- `ExAliyunOts.Search.bool_query/1`
- `ExAliyunOts.Search.exists_query/1`
- `ExAliyunOts.Search.geo_bounding_box_query/3`
- `ExAliyunOts.Search.geo_distance_query/3`
- `ExAliyunOts.Search.geo_polygon_query/2`
- `ExAliyunOts.Search.match_all_query/0`
- `ExAliyunOts.Search.match_phrase_query/2`
- `ExAliyunOts.Search.match_query/3`
- `ExAliyunOts.Search.nested_query/3`
- `ExAliyunOts.Search.prefix_query/2`
- `ExAliyunOts.Search.range_query/2`
- `ExAliyunOts.Search.term_query/2`
- `ExAliyunOts.Search.terms_query/2`
- `ExAliyunOts.Search.wildcard_query/2`
- `:sort`, optional, by default it is use `pk_sort/1`, bind to the Sort functions:
- `ExAliyunOts.Search.field_sort/2`
- `ExAliyunOts.Search.geo_distance_sort/3`
- `ExAliyunOts.Search.nested_filter/2`
- `ExAliyunOts.Search.pk_sort/1`
- `ExAliyunOts.Search.score_sort/1`
- `:aggs`, optional, please see official document in [Chinese](https://help.aliyun.com/document_detail/132191.html) | [English](https://www.alibabacloud.com/help/doc-detail/132191.html).
- `:group_bys`, optional, please see official document in [Chinese](https://help.aliyun.com/document_detail/132210.html) | [English](https://www.alibabacloud.com/help/doc-detail/132210.html).
- `:limit`, optional, the limited size of query.
- `:offset`, optional, the offset size of query. When the total rows are less or equal than 2000, can both used`:limit` and `:offset` to pagination.
- `:get_total_count`, optional, return the total count of the all matched rows, by default it is `true`.
- `:token`, optional, when do not load all the matched rows in a single request, there will return a `next_token` value in that result,
and then we can pass it to `:token` in the next same search query to continue load the rest rows.
- `:collapse`, optional, duplicate removal by the specified field, please see official document in [Chinese](https://help.aliyun.com/document_detail/154172.html), please NOTICE that currently there does not support use `:collapse` with `:token` together.
* `:columns_to_get`, optional, fetch the special fields, by default it returns all fields, here are available options:
- `:all`, return all attribute column fields;
- `:none`, do not return any attribute column fields;
- `["field1", "field2"]`, specifies the expected return attribute column fields.
"""
@doc search: :search
@spec search(instance, table_name, index_name, options) :: result
def search(instance, table_name, index_name, options) do
prepared_var = prepared_search(table_name, index_name, options)
Client.search(instance, prepared_var)
end
@doc """
As a wrapper built on `search/4` to create composable and lazy enumerable stream for iteration.
## Options
Please see options of `search/4` for details.
"""
@doc search: :search
@spec stream_search(instance, table_name, index_name, options) :: Enumerable.t()
def stream_search(instance, table_name, index_name, options) do
prepared_var = prepared_search(table_name, index_name, options)
Client.stream_search(instance, prepared_var)
end
@doc """
As a wrapper built on `stream_search/4` to fetch a full matched data set as a stream, then use `Enum.reduce/2` to iteratively
format all data into a list, if process a large items, recommend to use `stream_search/4`.
## Options
Please see options of `search/4` for details.
"""
@doc search: :search
@spec iterate_search(instance, table_name, index_name, options) :: result
def iterate_search(instance, table_name, index_name, options) do
prepared_var = prepared_search(table_name, index_name, options)
Client.iterate_search(instance, prepared_var)
end
defp prepared_search(table_name, index_name, options) do
ExAliyunOts.Search.map_search_options(
%Var.Search.SearchRequest{table_name: table_name, index_name: index_name},
options
)
end
@doc """
Query current supported maximum number of concurrent tasks to `parallel_scan/4` request.
Official document in [Chinese](https://help.aliyun.com/document_detail/153862.html) | [English](https://www.alibabacloud.com/help/doc-detail/153862.htm)
"""
@doc search: :search
@spec compute_splits(instance, table_name, index_name) :: result
defdelegate compute_splits(instance, table_name, index_name), to: Client
@doc """
Leverage concurrent tasks to query matched raw data (still be with search function) more quickly, in this use case, this function is improved for speed up
scan query, but no guarantee to the order of query results, and does not support the aggregation of scan query.
In general, recommend to use `iterate_parallel_scan/5` or `iterate_parallel_scan/7` for the common use case of parallel scan.
Official document in [Chinese](https://help.aliyun.com/document_detail/153862.html) | [English](https://www.alibabacloud.com/help/doc-detail/153862.htm)
## Options
* `:scan_query`, required, the main option to use query.
- `:query`, required, bind to the query functions, the same as query option of `search/4`.
- `:limit`, optional, the limited size of query, defaults to 2000, the maximum value of limit is 2000.
- `:token`, optional, when do not load all the matched rows in a single request, there will return a `next_token` value in that result,
and then we can pass it to `:token` in the next same scan query to continue load the rest rows.
- `:max_parallel`, required, the maximum number of concurrent, as the `splits_size` value from the response of `compute_splits/3`.
- `:current_parallel_id`, required, refer the official document, the available value is in [0, max_parallel).
* `:columns_to_get`, optional, fetch the special fields, by default it returns all fields of the search index, here are available options:
- `:all_from_index`, return all attribute column fields of search index;
- `:none`, do not return any attribute column fields;
- `["field1", "field2"]`, specifies the expected return attribute column fields.
* `session_id`, as usual, this option is required from the response of `compute_splits/3`, if not set this option, the query result may contain
duplicate data, refer the official document, once occurs an `OTSSessionExpired` error, must initiate another parallel scan task to re-query data.
"""
@doc search: :search
@spec parallel_scan(instance, table_name, index_name, options) :: result
def parallel_scan(instance, table_name, index_name, options) do
request = ExAliyunOts.Search.map_scan_options(table_name, index_name, options)
Client.parallel_scan(instance, request)
end
@doc """
A simple wrapper of `stream_parallel_scan/4` to take care `OTSSessionExpired` error with retry, make parallel scan
as a stream that applies the given function to the complete result of scan query.
In general, recommend to use this function for the common use case of parallel scan.
## Options
* `:scan_query`, required, the main option to use query.
- `:query`, required, bind to the query functions, the same as query option of `search/5`.
- `:limit`, optional, the limited size of query, defaults to 2000, the maximum value of limit is 2000.
* `:columns_to_get`, optional, fetch the special fields, by default it returns all fields of the search index, here are available options:
- `:all_from_index`, return all attribute column fields of search index;
- `:none`, do not return any attribute column fields;
- `["field1", "field2"]`, specifies the expected return attribute column fields.
* `:timeout`, optional, the `:timeout` option of `Task.async_stream/3`, defaults to `:infinity`.
## Example
def iterate_stream(stream) do
Enum.map(stream, fn
{:ok, response} ->
response
{:error, error} ->
error
end)
end
iterate_parallel_scan(
"table",
"index",
&iterate_stream/1,
scan_query: [
query: match_query("is_actived", "true"),
limit: 1000
],
columns_to_get: ["is_actived", "name", "score"]
)
"""
@doc search: :search
@spec iterate_parallel_scan(instance, table_name, index_name, fun :: (term -> term), options) ::
term()
def iterate_parallel_scan(instance, table_name, index_name, fun, options)
when is_function(fun) do
result =
instance
|> stream_parallel_scan(table_name, index_name, options)
|> fun.()
case result do
{:error, %ExAliyunOts.Error{code: "OTSSessionExpired"}} ->
Logger.info("scan_query session expired, will renew a parallel scan task.")
iterate_parallel_scan(instance, table_name, index_name, fun, options)
other ->
other
end
end
@doc """
A simple wrapper of `stream_parallel_scan/4` to take care `OTSSessionExpired` error with retry, make parallel scan
as a stream that applies the given function from `module` with the list of arguments `args` to the complete result of scan query.
In general, recommend to use this function for the common use case of parallel scan.
## Options
Please see options of `iterate_parallel_scan/5`.
## Example
defmodule StreamHandler do
def iterate_stream(stream) do
Enum.map(stream, fn
{:ok, response} ->
response
{:error, error} ->
error
end)
end
end
iterate_parallel_scan(
"table",
"index",
StreamHandler,
:iterate_stream,
[],
scan_query: [
query: match_query("is_actived", "true"),
limit: 1000
],
columns_to_get: ["field1", "field2"]
)
"""
@doc search: :search
@spec iterate_parallel_scan(
instance,
table_name,
index_name,
mod :: module(),
fun :: atom(),
args :: [term],
options
) :: term()
def iterate_parallel_scan(instance, table_name, index_name, mod, fun, args, options) do
value = stream_parallel_scan(instance, table_name, index_name, options)
case apply(mod, fun, [value | args]) do
{:error, %ExAliyunOts.Error{code: "OTSSessionExpired"}} ->
Logger.info("scan_query session expired, will renew a parallel scan task.")
iterate_parallel_scan(instance, table_name, index_name, mod, fun, args, options)
other ->
other
end
end
@doc """
Integrate `parallel_scan/4` with `compute_splits/3` as a complete use, base on the response of `compute_splits/3` to create the corresponding
number of concurrency task(s), use `Task.async_stream/3` to make parallel scan as a stream which properly process `token`
in every request of the internal, when use this function need to consider the possibility of the `OTSSessionExpired` error in the external.
## Options
Please see options of `iterate_parallel_scan/5`.
"""
@doc search: :search
@spec stream_parallel_scan(instance, table_name, index_name, options) :: Enumerable.t()
defdelegate stream_parallel_scan(instance, table_name, index_name, options),
to: ExAliyunOts.Search
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/117477.html) | [English](https://www.alibabacloud.com/help/doc-detail/117477.html)
## Example
import MyApp.TableStore
list_search_index("table")
"""
@doc search: :search
@spec list_search_index(instance, table_name) :: result
defdelegate list_search_index(instance, table_name), to: Client
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/117452.html) | [English](https://www.alibabacloud.com/help/doc-detail/117452.html)
## Example
import MyApp.TableStore
create_search_index "table", "index_name",
field_schemas: [
field_schema_keyword("name"),
field_schema_integer("age")
]
create_search_index "table", "index_name",
field_schemas: [
field_schema_keyword("name"),
field_schema_geo_point("location"),
field_schema_integer("value")
]
create_search_index "table", "index_name",
field_schemas: [
field_schema_nested(
"content",
field_schemas: [
field_schema_keyword("header"),
field_schema_keyword("body")
]
)
]
## Options
* `:field_schemas`, required, a list of predefined search-index schema fields, please see the following helper functions:
- `ExAliyunOts.Search.field_schema_integer/2`
- `ExAliyunOts.Search.field_schema_float/2`
- `ExAliyunOts.Search.field_schema_boolean/2`
- `ExAliyunOts.Search.field_schema_keyword/2`
- `ExAliyunOts.Search.field_schema_text/2`
- `ExAliyunOts.Search.field_schema_nested/2`
- `ExAliyunOts.Search.field_schema_geo_point/2`
* `:index_sorts`, optional, a list of predefined sort-index schema fields, please see the following helper functions:
- `ExAliyunOts.Search.pk_sort/1`
- `ExAliyunOts.Search.field_sort/2`
- `ExAliyunOts.Search.geo_distance_sort/3`
"""
@doc search: :search
@spec create_search_index(instance, table_name, index_name, options) :: result
def create_search_index(instance, table_name, index_name, options) do
var_request = %Var.Search.CreateSearchIndexRequest{
table_name: table_name,
index_name: index_name,
index_schema: %Var.Search.IndexSchema{
field_schemas: Keyword.fetch!(options, :field_schemas),
index_sorts: Keyword.get(options, :index_sorts)
}
}
Client.create_search_index(instance, var_request)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/117478.html) | [English](https://www.alibabacloud.com/help/doc-detail/117478.html)
## Example
import MyApp.TableStore
delete_search_index("table", "index_name")
"""
@doc search: :search
@spec delete_search_index(instance, table_name, index_name) :: result
def delete_search_index(instance, table_name, index_name) do
var_delete_request = %Var.Search.DeleteSearchIndexRequest{
table_name: table_name,
index_name: index_name
}
Client.delete_search_index(instance, var_delete_request)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/117475.html) | [English](https://www.alibabacloud.com/help/doc-detail/117475.html)
## Example
import MyApp.TableStore
describe_search_index("table", "index_name")
"""
@doc search: :search
@spec describe_search_index(instance, table_name, index_name) :: result
def describe_search_index(instance, table_name, index_name) do
var_describe_request = %Var.Search.DescribeSearchIndexRequest{
table_name: table_name,
index_name: index_name
}
Client.describe_search_index(instance, var_describe_request)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/93819.html) | [English](https://www.alibabacloud.com/help/doc-detail/93819.html)
## Example
import MyApp.TableStore
partition_key = {"key", "key1"}
start_local_transaction("table", partition_key)
"""
@doc local_transaction: :local_transaction
@spec start_local_transaction(instance, table_name, partition_key :: tuple()) :: result
def start_local_transaction(instance, table_name, partition_key) do
var_start_local_transaction = %Var.Transaction.StartLocalTransactionRequest{
table_name: table_name,
partition_key: partition_key
}
Client.start_local_transaction(instance, var_start_local_transaction)
end
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/93819.html) | [English](https://www.alibabacloud.com/help/doc-detail/93819.html)
## Example
import MyApp.TableStore
commit_transaction("transaction_id")
"""
@doc local_transaction: :local_transaction
@spec commit_transaction(instance, transaction_id :: String.t()) :: result
defdelegate commit_transaction(instance, transaction_id), to: Client
@doc """
Official document in [Chinese](https://help.aliyun.com/document_detail/93819.html) | [English](https://www.alibabacloud.com/help/doc-detail/93819.html)
## Example
import MyApp.TableStore
abort_transaction("transaction_id")
"""
@doc local_transaction: :local_transaction
defdelegate abort_transaction(instance, transaction_id), to: Client
defp map_options(var, nil), do: var
defp map_options(var, options) do
options
|> Keyword.keys()
|> Enum.reduce(var, fn key, acc ->
value = Keyword.get(options, key)
if value != nil and Map.has_key?(var, key) do
case key do
:return_type ->
Map.put(acc, key, map_return_type(value))
:direction ->
Map.put(acc, key, map_direction(value))
:stream_spec ->
Map.put(acc, key, struct(Var.StreamSpec, value))
:time_range ->
Map.put(acc, key, map_time_range(value))
_ ->
Map.put(acc, key, value)
end
else
acc
end
end)
end
defp map_return_type(nil), do: :RT_NONE
ReturnType.constants()
|> Enum.map(fn {_value, type} ->
downcase_type = type |> to_string() |> String.slice(3..-1) |> Utils.downcase_atom()
defp map_return_type(unquote(downcase_type)), do: unquote(type)
defp map_return_type(unquote(type)), do: unquote(type)
end)
defp map_return_type(invalid_return_type) do
raise ExAliyunOts.RuntimeError, "invalid return_type: #{inspect(invalid_return_type)}"
end
Direction.constants()
|> Enum.map(fn {_value, type} ->
defp map_direction(unquote(Utils.downcase_atom(type))), do: unquote(type)
defp map_direction(unquote(type)), do: unquote(type)
end)
defp map_direction(invalid_direction) do
raise ExAliyunOts.RuntimeError, "invalid direction: #{inspect(invalid_direction)}"
end
defp map_time_range(specific_time) when is_integer(specific_time) do
%Var.TimeRange{specific_time: specific_time}
end
defp map_time_range({start_time, end_time})
when is_integer(start_time) and is_integer(end_time) do
%Var.TimeRange{start_time: start_time, end_time: end_time}
end
@operation_type_mapping OperationType.updates_supported()
|> Enum.map(fn type -> {Utils.downcase_atom(type), type} end)
defp map_updates(options) do
Enum.reduce(@operation_type_mapping, %{}, fn {update_operation, operation_type}, acc ->
{matched_update, _rest_opts} = Keyword.pop(options, update_operation)
if matched_update != nil do
Map.put(acc, operation_type, matched_update)
else
acc
end
end)
end
end