# Copyright 2026 Relay, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
defmodule ExCredstash do
@moduledoc """
Elixir implementation of credstash - a utility for managing credentials
in AWS using KMS and DynamoDB.

## Usage as a Library

    # Store a secret
    ExCredstash.put("db_password", "super_secret", region: "us-east-1")

    # Retrieve a secret
    {:ok, "super_secret"} = ExCredstash.get("db_password", region: "us-east-1")

    # List all secrets
    {:ok, secrets} = ExCredstash.list(region: "us-east-1")

    # Delete a secret
    {:ok, count} = ExCredstash.delete("db_password", region: "us-east-1")

## Configuration

Options can be passed to each function or configured globally:

    config :ex_credstash,
      region: "us-east-1",
      table: "credential-store",
      kms_key: "alias/credstash"

## Encryption Scheme

Credstash uses a two-key system:

  - KMS generates a 64-byte data key
  - The first 32 bytes are used for AES-256-CTR encryption
  - The last 32 bytes are used for HMAC integrity verification

This implementation is fully compatible with the Python credstash library.
"""
alias ExCredstash.{Config, Crypto, Dynamo, KMS, Version}
@doc """
Creates the DynamoDB table that backs the credential store.

## Options

  * `:region` - AWS region (required unless configured)
  * `:table` - Table name (default: "credential-store")
  * `:tags` - Map of tags to apply to the table
  * `:access_key_id` - AWS access key ID (optional, falls back to `:aws_credentials`)
  * `:secret_access_key` - AWS secret access key (optional, falls back to `:aws_credentials`)
  * `:session_token` - AWS session token (optional)

## Returns

  * `{:ok, :created}` - Table was created
  * `{:ok, :exists}` - Table already exists
  * `{:error, reason}` - Error occurred

## Examples

    iex> ExCredstash.setup(region: "us-east-1")
    {:ok, :created}

    iex> ExCredstash.setup(region: "us-east-1", table: "my-secrets")
    {:ok, :created}
"""
@spec setup(keyword()) :: {:ok, :created | :exists} | {:error, term()}
def setup(opts \\ []) do
  # Resolved settings come first; explicit credentials (if any) are appended last.
  table_opts = [region: Config.region(opts), table: Config.table(opts), tags: opts[:tags]]

  Dynamo.create_table(table_opts ++ creds(opts))
end
@doc """
Store a secret in credstash.

The secret is encrypted using a KMS data key and stored in DynamoDB.

## Options

  * `:region` - AWS region (required unless configured)
  * `:table` - Table name (default: "credential-store")
  * `:key_id` or `:kms_key` - KMS key ID (default: "alias/credstash")
  * `:version` - Explicit version, either an integer or a string (an already
    padded version string or a plain numeric string). When omitted, the highest
    stored version is looked up and incremented.
  * `:context` - KMS encryption context map
  * `:digest` - Hash algorithm atom (default: :sha256)
  * `:comment` - Optional comment to store with the secret
  * `:access_key_id` - AWS access key ID (optional, falls back to `:aws_credentials`)
  * `:secret_access_key` - AWS secret access key (optional, falls back to `:aws_credentials`)
  * `:session_token` - AWS session token (optional)

## Returns

  * `{:ok, version_string}` - The version that was stored (zero-padded)
  * `{:error, {:already_exists, version}}` - Version already exists
  * `{:error, {:invalid_version, value}}` - `:version` was neither an integer
    nor a string that can be interpreted as a version
  * `{:error, reason}` - Other error

## Examples

    iex> ExCredstash.put("db_password", "super_secret", region: "us-east-1")
    {:ok, "0000000000000000001"}

    iex> ExCredstash.put("api_key", "key123", region: "us-east-1", version: 5)
    {:ok, "0000000000000000005"}
"""
@spec put(String.t(), String.t(), keyword()) :: {:ok, String.t()} | {:error, term()}
def put(name, secret, opts \\ []) do
  region = Config.region(opts)
  table = Config.table(opts)
  key_id = Config.kms_key(opts)
  context = Config.context(opts)
  digest = Config.digest(opts)
  credential_opts = creds(opts)

  with {:ok, version_string} <-
         resolve_put_version(
           opts[:version],
           [region: region, table: table, name: name] ++ credential_opts
         ),
       # KMS returns both the plaintext data key and its encrypted form; only the
       # encrypted form is persisted.
       {:ok, %{plaintext: data_key, ciphertext_blob: encrypted_key}} <-
         KMS.generate_data_key(
           [region: region, key_id: key_id, context: context] ++ credential_opts
         ),
       # The guard keeps an `{:error, reason}` tuple from being mistaken for a
       # `{ciphertext, hmac}` pair; such a value is returned to the caller as-is.
       {ciphertext, hmac_value} when is_binary(ciphertext) and is_binary(hmac_value) <-
         Crypto.encrypt(secret, data_key, digest),
       {:ok, :created} <-
         Dynamo.put_secret(
           [
             region: region,
             table: table,
             name: name,
             version: version_string,
             key: Base.encode64(encrypted_key),
             contents: Base.encode64(ciphertext),
             hmac: Base.encode16(hmac_value, case: :lower),
             digest: Crypto.digest_to_string(digest),
             comment: opts[:comment]
           ] ++ credential_opts
         ) do
    {:ok, version_string}
  end
end

# Resolves the version string a new secret is stored under.
#
# `nil` means auto-increment: the highest stored version is looked up and
# advanced with `Version.next/1`. Integers are padded with `Version.pad/1`.
# Strings follow the same rules as `get/2`: used verbatim when
# `Version.valid?/1` accepts them, otherwise parsed as an integer and padded.
# Anything else yields `{:error, {:invalid_version, value}}` instead of raising.
defp resolve_put_version(nil, lookup_opts) do
  case Dynamo.get_highest_version(lookup_opts) do
    {:ok, current} ->
      {_version_int, version_string} = Version.next(current)
      {:ok, version_string}

    {:error, _reason} = error ->
      error
  end
end

defp resolve_put_version(version, _lookup_opts) when is_integer(version) do
  {:ok, Version.pad(version)}
end

defp resolve_put_version(version, _lookup_opts) when is_binary(version) do
  if Version.valid?(version) do
    {:ok, version}
  else
    case Integer.parse(version) do
      {int, ""} -> {:ok, Version.pad(int)}
      _not_an_integer -> {:error, {:invalid_version, version}}
    end
  end
end

defp resolve_put_version(other, _lookup_opts), do: {:error, {:invalid_version, other}}
@doc """
Store multiple secrets at once.

Takes a map of name => value pairs and stores each entry with `put/3`, using
the same options for every entry. A failure for one name does not stop the
remaining names from being stored.

## Options

Same as `put/3`, including `:access_key_id`, `:secret_access_key`, and `:session_token`.

## Returns

  * `{:ok, %{name => version_string}}` - All secrets stored successfully
  * `{:error, %{name => reason}, %{name => version_string}}` - Some failed; the
    first map holds the failures, the second the secrets that were stored

## Examples

    iex> ExCredstash.put_all(%{"key1" => "val1", "key2" => "val2"}, region: "us-east-1")
    {:ok, %{"key1" => "0000000000000000001", "key2" => "0000000000000000001"}}
"""
@spec put_all(map(), keyword()) :: {:ok, map()} | {:error, map(), map()}
def put_all(secrets, opts \\ []) when is_map(secrets) do
  # Accumulator is {stored, failed}: names mapped to versions / to error reasons.
  {stored, failed} =
    for {name, value} <- secrets, reduce: {%{}, %{}} do
      {stored_acc, failed_acc} ->
        case put(name, value, opts) do
          {:ok, version} -> {Map.put(stored_acc, name, version), failed_acc}
          {:error, reason} -> {stored_acc, Map.put(failed_acc, name, reason)}
        end
    end

  if map_size(failed) == 0 do
    {:ok, stored}
  else
    {:error, failed, stored}
  end
end
@doc """
Retrieve a secret from credstash.

## Options

  * `:region` - AWS region (required unless configured)
  * `:table` - Table name (default: "credential-store")
  * `:version` - Specific version to retrieve, as an integer or a string
    (default: latest). A string is used verbatim when it is already a valid
    padded version; otherwise it is parsed as an integer and padded.
  * `:context` - KMS encryption context map (must match what was used for encryption)
  * `:access_key_id` - AWS access key ID (optional, falls back to `:aws_credentials`)
  * `:secret_access_key` - AWS secret access key (optional, falls back to `:aws_credentials`)
  * `:session_token` - AWS session token (optional)

## Returns

  * `{:ok, secret_string}` - The decrypted secret
  * `{:error, :not_found}` - Secret not found
  * `{:error, :integrity_error}` - HMAC verification failed
  * `{:error, {:invalid_version, value}}` - `:version` was neither an integer
    nor a string that can be interpreted as a version
  * `{:error, reason}` - Other error

## Examples

    iex> ExCredstash.get("db_password", region: "us-east-1")
    {:ok, "super_secret"}

    iex> ExCredstash.get("db_password", region: "us-east-1", version: 1)
    {:ok, "old_secret"}
"""
@spec get(String.t(), keyword()) :: {:ok, String.t()} | {:error, term()}
def get(name, opts \\ []) do
  region = Config.region(opts)
  table = Config.table(opts)
  context = Config.context(opts)
  credential_opts = creds(opts)
  lookup_opts = [region: region, table: table, name: name]

  with {:ok, requested} <- requested_version(opts[:version]),
       {:ok, item} <- fetch_secret_item(requested, lookup_opts, credential_opts) do
    decrypt_item(item, region, context, credential_opts)
  end
end

# Normalizes the `:version` option of `get/2`.
#
# Returns `{:ok, :latest}` when no version was requested, `{:ok, padded_string}`
# for a usable integer/string, and `{:error, {:invalid_version, value}}` for
# anything else (previously such input raised).
defp requested_version(nil), do: {:ok, :latest}

defp requested_version(version) when is_integer(version), do: {:ok, Version.pad(version)}

defp requested_version(version) when is_binary(version) do
  if Version.valid?(version) do
    {:ok, version}
  else
    # Integer.parse/1 with an empty remainder accepts the same inputs
    # String.to_integer/1 would, but reports bad input instead of raising.
    case Integer.parse(version) do
      {int, ""} -> {:ok, Version.pad(int)}
      _not_an_integer -> {:error, {:invalid_version, version}}
    end
  end
end

defp requested_version(other), do: {:error, {:invalid_version, other}}

# Fetches the stored item: the latest one, or the one with the given padded version.
defp fetch_secret_item(:latest, lookup_opts, credential_opts) do
  Dynamo.get_latest_secret(lookup_opts ++ credential_opts)
end

defp fetch_secret_item(padded_version, lookup_opts, credential_opts) do
  Dynamo.get_secret(lookup_opts ++ [version: padded_version] ++ credential_opts)
end
@doc """
Retrieve all secrets from credstash.

The unique secret names are listed first, then each one is fetched and
decrypted with `get/2`. Only the latest version of each secret is retrieved:
a `:version` option is not read or forwarded by this function.

Secrets for which `get/2` returns `{:error, reason}` (for example because the
encryption context does not match) are silently left out of the result.

## Options

  * `:region` - AWS region (required unless configured)
  * `:table` - Table name (default: "credential-store")
  * `:context` - KMS encryption context map
  * `:access_key_id` - AWS access key ID (optional, falls back to `:aws_credentials`)
  * `:secret_access_key` - AWS secret access key (optional, falls back to `:aws_credentials`)
  * `:session_token` - AWS session token (optional)

## Returns

  * `{:ok, %{name => secret_string}}` - Map of the secrets that could be
    decrypted (latest version for each)
  * `{:error, reason}` - Listing the secret names failed

## Examples

    iex> ExCredstash.get_all(region: "us-east-1")
    {:ok, %{"db_password" => "secret1", "api_key" => "secret2"}}
"""
@spec get_all(keyword()) :: {:ok, map()} | {:error, term()}
def get_all(opts \\ []) do
  region = Config.region(opts)
  table = Config.table(opts)
  context = Config.context(opts)
  credential_opts = creds(opts)

  with {:ok, names} <- Dynamo.list_names([region: region, table: table] ++ credential_opts) do
    get_opts = [region: region, table: table, context: context] ++ credential_opts

    secrets =
      Enum.reduce(names, %{}, fn name, acc ->
        case get(name, get_opts) do
          {:ok, secret} ->
            Map.put(acc, name, secret)

          {:error, _reason} ->
            # Skip secrets that fail to decrypt (e.g., wrong context)
            # This matches Python credstash behavior
            acc
        end
      end)

    {:ok, secrets}
  end
end
@doc """
List all credentials in the table.

Nothing is decrypted here; the request is delegated to `Dynamo.list_secrets/1`
and only metadata is returned.

## Options

  * `:region` - AWS region (required unless configured)
  * `:table` - Table name (default: "credential-store")
  * `:access_key_id` - AWS access key ID (optional, falls back to `:aws_credentials`)
  * `:secret_access_key` - AWS secret access key (optional, falls back to `:aws_credentials`)
  * `:session_token` - AWS session token (optional)

## Returns

  * `{:ok, [%{name: String.t(), version: String.t(), comment: String.t() | nil}]}`
  * `{:error, reason}`

## Examples

    iex> ExCredstash.list(region: "us-east-1")
    {:ok, [%{name: "db_password", version: "0000000000000000001", comment: nil}]}
"""
@spec list(keyword()) :: {:ok, list(map())} | {:error, term()}
def list(opts \\ []) do
  [region: Config.region(opts), table: Config.table(opts)]
  |> Kernel.++(creds(opts))
  |> Dynamo.list_secrets()
end
@doc """
List unique credential names.

## Options

  * `:region` - AWS region (required unless configured)
  * `:table` - Table name (default: "credential-store")
  * `:access_key_id` - AWS access key ID (optional, falls back to `:aws_credentials`)
  * `:secret_access_key` - AWS secret access key (optional, falls back to `:aws_credentials`)
  * `:session_token` - AWS session token (optional)

## Returns

  * `{:ok, [String.t()]}` - Sorted list of unique secret names
  * `{:error, reason}`

## Examples

    iex> ExCredstash.keys(region: "us-east-1")
    {:ok, ["api_key", "db_password"]}
"""
@spec keys(keyword()) :: {:ok, list(String.t())} | {:error, term()}
def keys(opts \\ []) do
  table_opts = [region: Config.region(opts), table: Config.table(opts)]
  credential_opts = creds(opts)

  Dynamo.list_names(table_opts ++ credential_opts)
end
@doc """
Delete all versions of a secret.

## Options

  * `:region` - AWS region (required unless configured)
  * `:table` - Table name (default: "credential-store")
  * `:access_key_id` - AWS access key ID (optional, falls back to `:aws_credentials`)
  * `:secret_access_key` - AWS secret access key (optional, falls back to `:aws_credentials`)
  * `:session_token` - AWS session token (optional)

## Returns

  * `{:ok, deleted_count}` - Number of versions deleted
  * `{:error, reason}`

## Examples

    iex> ExCredstash.delete("old_secret", region: "us-east-1")
    {:ok, 3}
"""
@spec delete(String.t(), keyword()) :: {:ok, non_neg_integer()} | {:error, term()}
def delete(name, opts \\ []) do
  target = [region: Config.region(opts), table: Config.table(opts), name: name]

  target
  |> Kernel.++(creds(opts))
  |> Dynamo.delete_secret()
end
# Private Functions
@doc false
# Turns a stored item back into the plaintext secret.
#
# The item's `key` and `contents` are Base64-decoded and its `hmac` is
# hex-decoded. The decoded key blob is then sent to `KMS.decrypt/1` (with
# `region`, `context` and any explicit credentials), and the returned data key
# is passed to `Crypto.decrypt/4` together with the ciphertext, the stored HMAC
# and the digest. Items without a `digest` value fall back to "SHA256".
#
# Returns `{:ok, plaintext}` on success. A bare `:error` from any step is
# reported as `{:error, :invalid_base64}`; any `{:error, reason}` is returned
# unchanged.
defp decrypt_item(item, region, context, credential_opts) do
  with {:ok, wrapped_key} <- Base.decode64(item.key),
       {:ok, encrypted_body} <- Base.decode64(item.contents),
       {:ok, stored_hmac} <- decode_hmac(item.hmac),
       digest_algo = Crypto.digest_to_atom(item.digest || "SHA256"),
       kms_opts =
         [region: region, ciphertext_blob: wrapped_key, context: context] ++ credential_opts,
       {:ok, data_key} <- KMS.decrypt(kms_opts),
       {:ok, _plaintext} = decrypted <-
         Crypto.decrypt(encrypted_body, stored_hmac, data_key, digest_algo) do
    decrypted
  else
    {:error, _reason} = failure ->
      failure

    :error ->
      {:error, :invalid_base64}
  end
end
# Option keys that carry explicit AWS credentials.
@credential_keys [:access_key_id, :secret_access_key, :session_token]

# Picks the explicit AWS credential options out of `opts`, dropping every other
# key and keeping the original order of the remaining entries.
defp creds(opts) do
  Keyword.take(opts, @credential_keys)
end
# Decodes the stored HMAC, which is kept as a hex string (upper, lower or
# mixed case are all accepted).
#
# Returns `{:ok, binary}` on success and `{:error, :invalid_hmac_format}` when
# the value is not valid hex. A value that is not a string at all (e.g. a
# missing attribute) is reported the same way instead of raising, so a single
# malformed item yields an error tuple rather than a crash.
defp decode_hmac(hmac_string) when is_binary(hmac_string) do
  case Base.decode16(hmac_string, case: :mixed) do
    {:ok, _hmac} = decoded -> decoded
    :error -> {:error, :invalid_hmac_format}
  end
end

defp decode_hmac(_not_a_string), do: {:error, :invalid_hmac_format}
end