lib/util/aws/cloudfront_signed_url.ex

# Copyright(c) 2015-2023 ACCESS CO., LTD. All rights reserved.

use Croma

defmodule Antikythera.Aws.CloudfrontSignedUrl do
  @moduledoc """
  This module provides functions to generate [a signed URL for CloudFront](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-signed-urls.html)
  """

  @doc """
  Generates a signed URL to access a file via CloudFront.

  ## Parameters

  - `resource_url` (string): CloudFront URL used for accessing the file, including query string parameters, if any.
  - `lifetime_in_seconds` (positive integer): Expiration time is determined as the sum of the current time (in seconds) and this value.
  - `key_pair_id` (string): ID for an active CloudFront key pair used for generating the signature.
  - `private_key` (string): RSA private key for the key pair specified by `key_pair_id`.
  - `url_encoded?` (boolean): Whether `resource_url` is encoded or not (optional, default is `false`).

  ## Return value

  A generated signed URL (string).
  """
  defun generate_signed_url(
          resource_url :: v[String.t()],
          lifetime_in_seconds :: v[pos_integer],
          key_pair_id :: v[String.t()],
          private_key :: v[String.t()],
          url_encoded? :: v[boolean] \\ false
        ) :: String.t() do
    encoded_url = if url_encoded?, do: resource_url, else: URI.encode(resource_url)
    expires_in_seconds = System.system_time(:second) + lifetime_in_seconds
    joiner = if URI.parse(encoded_url) |> Map.get(:query) |> is_nil(), do: "?", else: "&"

    encoded_url <>
      joiner <>
      URI.encode_query(
        make_query_params_for_canned_policy(
          encoded_url,
          expires_in_seconds,
          key_pair_id,
          private_key
        )
      )
  end

  @doc """
  Generates a signed URL to access a file via CloudFront using a custom policy.

  ## Parameters

  - `resource_url` (string): CloudFront URL used for accessing the file, including query string parameters, if any.
  - `lifetime_in_seconds` (positive integer): Expiration time is determined as the sum of the current time (in seconds) and this value.
  - `key_pair_id` (string): ID for an active CloudFront key pair used for generating the signature.
  - `private_key` (string): RSA private key for the key pair specified by `key_pair_id`.
  - `url_encoded?` (boolean): Whether `resource_url` is encoded or not (optional, default is `false`).
  - `optional_policy` (Keyword): Optional policy conditions to be added to a custom policy (default is `[]`). Currently, supports only the following keywords:
    - `:date_greater_than`(integer >= 0): Seconds in `AWS:EpochTime`. Specified to `DateGreaterThan`
    - `:ip_address` (list of strings): Specified to `IpAddress`.

  ## Return value

  A generated signed URL (string).
  """
  defun generate_signed_url_using_custom_policy(
          resource_url :: v[String.t()],
          lifetime_in_seconds :: v[pos_integer],
          key_pair_id :: v[String.t()],
          private_key :: v[String.t()],
          url_encoded? :: v[boolean] \\ false,
          optional_policy :: Keyword.t() \\ []
        ) :: String.t() do
    encoded_url = if url_encoded?, do: resource_url, else: URI.encode(resource_url)
    expires_in_seconds = System.system_time(:second) + lifetime_in_seconds
    joiner = if URI.parse(encoded_url) |> Map.get(:query) |> is_nil(), do: "?", else: "&"

    encoded_url <>
      joiner <>
      URI.encode_query(
        make_query_params_for_custom_policy(
          encoded_url,
          expires_in_seconds,
          key_pair_id,
          private_key,
          optional_policy
        )
      )
  end

  defunpt make_query_params_for_canned_policy(
            encoded_url :: v[String.t()],
            expires_in_seconds :: v[pos_integer],
            key_pair_id :: v[String.t()],
            private_key :: v[String.t()]
          ) :: [{String.t(), String.t()}] do
    policy_statement =
      ~s/{"Statement":[{"Resource":"#{encoded_url}","Condition":{"DateLessThan":{"AWS:EpochTime":#{expires_in_seconds}}}}]}/

    signature = create_signature(policy_statement, private_key)

    [
      {"Expires", expires_in_seconds},
      {"Signature", signature},
      {"Key-Pair-Id", key_pair_id}
    ]
  end

  defunpt generate_ip_address_string(addresses :: v[[String.t()]]) :: v[String.t()] do
    case addresses do
      [address] ->
        ~s/"#{String.trim(address)}"/

      addresses ->
        "[" <>
          Enum.map_join(addresses, ",", fn address -> ~s/"#{String.trim(address)}"/ end) <> "]"
    end
  end

  defunpt generate_custom_policy(
            encoded_url :: v[String.t()],
            expires_in_seconds :: v[pos_integer],
            optional_policy :: Keyword.t()
          ) :: v[String.t()] do
    date_greater_than =
      case Keyword.fetch(optional_policy, :date_greater_than) do
        {:ok, t} when is_integer(t) and t >= 0 -> ~s/,"DateGreaterThan":{"AWS:EpochTime":#{t}}/
        _ -> ""
      end

    ip_address =
      case Keyword.fetch(optional_policy, :ip_address) do
        {:ok, []} ->
          ""

        {:ok, addresses} when is_list(addresses) ->
          ~s/,"IpAddress":{"AWS:SourceIp":#{generate_ip_address_string(addresses)}}/

        # It is important to restrict a connection using IpAddress, so this raises a RuntimeError instead of just ignoring :ip_address.
        {:ok, _} ->
          raise "Type of :ip_address is not list"

        _ ->
          ""
      end

    ~s/{"Statement":[{"Resource":"#{encoded_url}","Condition":{"DateLessThan":{"AWS:EpochTime":#{expires_in_seconds}}#{date_greater_than}#{ip_address}}}]}/
  end

  defunpt make_query_params_for_custom_policy(
            encoded_url :: v[String.t()],
            expires_in_seconds :: v[pos_integer],
            key_pair_id :: v[String.t()],
            private_key :: v[String.t()],
            optional_policy :: Keyword.t()
          ) :: [{String.t(), String.t()}] do
    policy_statement = generate_custom_policy(encoded_url, expires_in_seconds, optional_policy)
    signature = create_signature(policy_statement, private_key)

    [
      {"Policy", encode_for_aws(policy_statement)},
      {"Signature", signature},
      {"Key-Pair-Id", key_pair_id}
    ]
  end

  defunp encode_for_aws(string :: v[String.t()]) :: v[String.t()] do
    string
    |> Base.encode64()
    # Replace characters that are invalid in a URL query string with characters that are valid.
    # https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-canned-policy.html
    |> String.replace("+", "-")
    |> String.replace("=", "_")
    |> String.replace("/", "~")
  end

  defunp create_signature(policy_statement :: v[String.t()], private_key :: v[String.t()]) ::
           String.t() do
    :public_key.sign(policy_statement, :sha, decode_rsa_key(private_key))
    |> encode_for_aws()
  end

  defp decode_rsa_key(rsa_key) do
    [pem_entry] = :public_key.pem_decode(rsa_key)
    :public_key.pem_entry_decode(pem_entry)
  end
end