defmodule COS.STS do
@moduledoc """
临时密钥生成及使用指引 - [腾讯云文档](https://cloud.tencent.com/document/product/436/14048)
"""
alias COS.Utils
@doc """
Requests temporary federated credentials from Tencent Cloud STS (`GetFederationToken`) -
[Tencent Cloud docs](https://cloud.tencent.com/document/api/1312/48195)

## Parameters

  * `name` - sent as the `Name` field of the request.
  * `policy` - policy map; it is JSON-encoded and then URL-encoded before being sent as `Policy`.
  * `region` - used to build the host `sts.REGION.tencentcloudapi.com` and sent as `x-tc-region`.
  * `opts` - keyword list:
    * `:expired_at` - a `DateTime`; the credential lifetime is the number of seconds between
      now and that moment. Takes precedence over `:expire_in`.
    * `:expire_in` - lifetime, converted to seconds with `COS.Utils.to_seconds/1`.
    * `:tesla_opts` - used as the `opts` of the Tesla request (defaults to `[]`).

When neither `:expired_at` nor `:expire_in` is given, the lifetime is 1800 seconds.

## Returns

  * `{:ok, env}` - the decoded response contains a `"credentials"` entry; `env.body` is replaced
    with the decoded, key-underscored `"response"` map.
  * `{:error, env}` - the request completed but no `"credentials"` entry was found; `env.body` is
    replaced with whatever was found under `"response"`.
  * Any other result of `Tesla.request/2` is returned unchanged.

Raises `Jason.DecodeError` when the response body is not valid JSON.

## Examples

    iex> COS.STS.get_credential(
      "cos-sts-elixir",
      %{
        version: "2.0",
        statement: [
          %{
            effect: "allow",
            action: ["cos:PutObject"],
            resource: ["qcs::cos:ap-beijing:uid/1250000000:bucket-1250000000/*"]
          }
        ]
      },
      "ap-beijing"
    )
    {:ok, %Tesla.Env{
      body: %{
        "credentials" => %{
          "token" => "xxx",
          ...
        }
      },
      ...
    }}

    iex> COS.STS.get_credential(
      "cos-sts-elixir",
      %{
        version: "2.0",
        statement: [
          %{
            effect: "allow",
            action: ["cos:PutObject"],
            resource: ["invalid resource"]
          }
        ]
      },
      "ap-beijing"
    )
    {:error, %Tesla.Env{
      body: %{
        "error" => %{
          "code" => "InvalidParameter.ResouceError",
          "message" => "resource error"
        },
        "request_id" => "6734caac-d3bf-4465-adf3-2e300e5063ce"
      },
      ...
    }}
"""
@spec get_credential(
        name :: binary(),
        policy :: map(),
        region :: binary(),
        opts :: [
          expired_at: DateTime.t(),
          expire_in: Utils.expire_in(),
          tesla_opts: Tesla.Env.opts()
        ]
      ) :: {:ok, Tesla.Env.t()} | {:error, Tesla.Env.t() | any()}
def get_credential(name, policy, region, opts \\ []) do
  host = "sts.#{region}.tencentcloudapi.com"
  now = DateTime.utc_now()

  # Headers that take part in the signature. The timestamp stays an integer here because
  # get_authorization/3 feeds it to DateTime.from_unix!/1 to derive the credential-scope date.
  signing_headers = [
    {"host", host},
    {"content-type", "application/json"},
    {"x-tc-timestamp", DateTime.to_unix(now)}
  ]

  duration_seconds =
    case {opts[:expired_at], opts[:expire_in]} do
      {nil, nil} -> 1800
      # DurationSeconds is a lifetime, not an absolute timestamp: send the distance from now.
      {%DateTime{} = expired_at, _} -> DateTime.diff(expired_at, now)
      {_, expire_in} -> Utils.to_seconds(expire_in)
    end

  # Encoded once so the signed payload and the sent payload are byte-identical.
  body =
    Jason.encode!(%{
      Name: name,
      Policy: policy |> Jason.encode!() |> URI.encode_www_form(),
      DurationSeconds: duration_seconds
    })

  config = COS.config()
  authorization = get_authorization(body, signing_headers, config)

  # Header values are stringified for the wire: Tesla types headers as {binary, binary}.
  request_headers =
    Enum.map(signing_headers, fn {key, value} -> {key, to_string(value)} end) ++
      [
        {"x-tc-action", "GetFederationToken"},
        {"x-tc-region", region},
        {"x-tc-version", "2018-08-13"},
        {"authorization", authorization}
      ]

  options = [
    method: :post,
    url: "https://" <> host,
    body: body,
    headers: request_headers,
    # Fall back to an empty list so Tesla.Env.opts is never nil.
    opts: opts[:tesla_opts] || []
  ]

  client = Tesla.client(config.http_client[:middleware], config.http_client[:adapter])

  case Tesla.request(client, options) do
    {:ok, response} ->
      response.body
      |> Jason.decode!()
      |> Utils.underscore_keys()
      |> Map.get("response")
      |> case do
        %{"credentials" => _} = decoded -> {:ok, %{response | body: decoded}}
        decoded -> {:error, %{response | body: decoded}}
      end

    error ->
      error
  end
end
# Builds the value of the `authorization` header using the TC3-HMAC-SHA256 scheme.
# Reference: https://cloud.tencent.com/document/api/1312/48202
#
# `body` is the request payload, `headers` is the list of `{name, value}` pairs that take part
# in the signature (the `x-tc-timestamp` value is read from it as Unix seconds), and `config`
# supplies `secret_id` and `secret_key`.
defp get_authorization(body, headers, config) do
  algorithm = "TC3-HMAC-SHA256"

  # 1. Build the canonical request: header names lower-cased and sorted by name.
  ordered =
    headers
    |> Enum.map(fn {name, value} -> {String.downcase(name), value} end)
    |> Enum.sort_by(fn {name, _value} -> name end)

  header_lines = Enum.map_join(ordered, fn {name, value} -> "#{name}:#{value}\n" end)
  header_names = Enum.map_join(ordered, ";", fn {name, _value} -> name end)

  # Method, URI, empty query string, canonical headers, signed header names, payload hash.
  # `header_lines` already ends with a newline, which yields the blank separator line.
  canonical_request = "POST\n/\n\n#{header_lines}\n#{header_names}\n#{sha256_hex(body)}"

  # 2. Build the string to sign.
  timestamp =
    Enum.find_value(ordered, fn
      {"x-tc-timestamp", value} -> value
      _other -> nil
    end)

  date =
    timestamp
    |> DateTime.from_unix!()
    |> DateTime.to_date()
    |> Date.to_string()

  scope = "#{date}/sts/tc3_request"

  string_to_sign =
    Enum.join([algorithm, timestamp, scope, sha256_hex(canonical_request)], "\n")

  # 3. Compute the signature: each HMAC result becomes the key of the next round.
  signature =
    [date, "sts", "tc3_request", string_to_sign]
    |> Enum.reduce("TC3" <> config.secret_key, fn data, key -> hmac_sha256(key, data) end)
    |> Base.encode16(case: :lower)

  # 4. Assemble the Authorization header value.
  "#{algorithm} Credential=#{config.secret_id}/#{scope}, " <>
    "SignedHeaders=#{header_names}, Signature=#{signature}"
end

# Lower-case hexadecimal SHA-256 digest of `data`.
defp sha256_hex(data) do
  Base.encode16(:crypto.hash(:sha256, data), case: :lower)
end
# HMAC of `data` keyed with `key`, delegated to `Utils.hmac/3` with the `:sha256` algorithm.
defp hmac_sha256(key, data) do
  Utils.hmac(:sha256, key, data)
end
end