defmodule Ecto.Adapters.DynamoDB.Cache do
@moduledoc """
An Elixir agent to cache DynamoDB table schemas and the first page of results for selected tables
"""
@typep table_name_t :: String.t()
@typep dynamo_response_t :: %{required(String.t()) => term}
alias Confex.Resolver
alias Ecto.Adapters.DynamoDB
alias Ecto.Repo
defstruct [
:schemas,
:tables,
:ex_aws_config
]
@type cached_table :: {String.t(), map()}
@type t :: %__MODULE__{
schemas: map(),
tables: [cached_table()]
}
def child_spec([repo]) do
%{
id: repo,
start: {__MODULE__, :start_link, [repo]}
}
end
@spec start_link(Repo.t()) :: Agent.on_start()
def start_link(repo) do
cached_table_list =
repo.config()
|> Resolver.resolve!()
|> Keyword.get(:cached_tables, [])
Agent.start_link(
fn ->
%__MODULE__{
schemas: %{},
tables: for(table_name <- cached_table_list, into: %{}, do: {table_name, nil}),
ex_aws_config: DynamoDB.ex_aws_config(repo)
}
end,
name: agent(repo)
)
end
@doc """
Returns the cached value for a call to DynamoDB, describe-table. Performs a DynamoDB scan if not yet cached and raises any errors as a result of the request. The raw json is presented as an elixir map.
"""
@spec describe_table!(Repo.t(), table_name_t) :: dynamo_response_t | no_return
def describe_table!(repo, table_name) do
case describe_table(repo, table_name) do
{:ok, schema} -> schema
{:error, error} -> raise error.type, message: error.message
end
end
@spec describe_table(Repo.t(), table_name_t) :: {:ok, dynamo_response_t} | {:error, term}
def describe_table(repo, table_name),
do: Agent.get_and_update(agent(repo), &do_describe_table(&1, table_name))
@doc """
Performs a DynamoDB, describe-table, and caches (without returning) the result. Raises any errors as a result of the request
"""
@spec update_table_info!(Repo.t(), table_name_t) :: :ok | no_return
def update_table_info!(repo, table_name) do
case update_table_info(repo, table_name) do
:ok -> :ok
{:error, error} -> raise error.type, message: error.message
end
end
@spec update_table_info(Repo.t(), table_name_t) :: :ok | {:error, term}
def update_table_info(repo, table_name),
do: Agent.get_and_update(agent(repo), &do_update_table_info(&1, table_name))
@doc """
Returns the cached first page of results for a table. Performs a DynamoDB scan if not yet cached and raises any errors as a result of the request
"""
@spec scan!(Repo.t(), table_name_t) :: dynamo_response_t | no_return
def scan!(repo, table_name) do
case scan(repo, table_name) do
{:ok, scan_result} -> scan_result
{:error, error} -> raise error.type, message: error.message
end
end
@spec scan(Repo.t(), table_name_t) :: {:ok, dynamo_response_t} | {:error, term}
def scan(repo, table_name),
do: Agent.get_and_update(agent(repo), &do_scan(&1, table_name))
@doc """
Performs a DynamoDB scan and caches (without returning) the first page of results. Raises any errors as a result of the request
"""
@spec update_cached_table!(Repo.t(), table_name_t) :: :ok | no_return
def update_cached_table!(repo, table_name) do
case update_cached_table(repo, table_name) do
:ok -> :ok
{:error, error} -> raise error.type, message: error.message
end
end
@spec update_cached_table(Repo.t(), table_name_t) :: :ok | {:error, term}
def update_cached_table(repo, table_name),
do: Agent.get_and_update(agent(repo), &do_update_cached_table(&1, table_name))
@doc """
Returns the current cache of table schemas, and cache of first page of results for selected tables, as an Elixir map
"""
# For testing and debugging use only:
def get_cache(repo),
do: Agent.get(agent(repo), & &1)
defp do_describe_table(cache, table_name) do
case cache.schemas[table_name] do
nil ->
result = ExAws.Dynamo.describe_table(table_name) |> ExAws.request(cache.ex_aws_config)
case result do
{:ok, %{"Table" => schema}} ->
updated_cache = put_in(cache.schemas[table_name], schema)
{{:ok, schema}, updated_cache}
{:error, error} ->
{{:error, %{type: ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"}},
cache}
end
schema ->
{{:ok, schema}, cache}
end
end
defp do_update_table_info(cache, table_name) do
result = ExAws.Dynamo.describe_table(table_name) |> ExAws.request(cache.ex_aws_config)
case result do
{:ok, %{"Table" => schema}} ->
updated_cache = put_in(cache.schemas[table_name], schema)
{:ok, updated_cache}
{:error, error} ->
{{:error, %{type: ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"}}, cache}
end
end
defp do_scan(cache, table_name) do
table_name_in_config = Map.has_key?(cache.tables, table_name)
case cache.tables[table_name] do
nil when table_name_in_config ->
result = ExAws.Dynamo.scan(table_name) |> ExAws.request(cache.ex_aws_config)
case result do
{:ok, scan_result} ->
updated_cache = put_in(cache.tables[table_name], scan_result)
{{:ok, scan_result}, updated_cache}
{:error, error} ->
{{:error, %{type: ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"}},
cache}
end
nil ->
{{:error,
%{
type: ArgumentError,
message:
"Could not confirm the table, #{inspect(table_name)}, as listed for caching in the application's configuration. Please see README file for details."
}}, cache}
cached_scan ->
{{:ok, cached_scan}, cache}
end
end
defp do_update_cached_table(cache, table_name) do
table_name_in_config = Map.has_key?(cache.tables, table_name)
case cache.tables[table_name] do
nil when not table_name_in_config ->
{{:error,
%{
type: ArgumentError,
message:
"Could not confirm the table, #{inspect(table_name)}, as listed for caching in the application's configuration. Please see README file for details."
}}, cache}
_ ->
result = ExAws.Dynamo.scan(table_name) |> ExAws.request(cache.ex_aws_config)
case result do
{:ok, scan_result} ->
updated_cache = put_in(cache.tables[table_name], scan_result)
{:ok, updated_cache}
{:error, error} ->
{{:error, %{type: ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"}},
cache}
end
end
end
defp agent(repo), do: Module.concat(repo, Cache)
end