lib/prima_auth0_ex/token.ex

defmodule PrimaAuth0Ex.Token do
  @moduledoc """
  Module to verify the integrity and validate the claims of tokens.
  """

  use Joken.Config

  add_hook JokenJwks, strategy: PrimaAuth0Ex.JwksStrategy
  add_hook Joken.Hooks.RequiredClaims, [:aud, :iat, :exp]

  @impl true
  def token_config do
    [skip: [:audience], iss: issuer()]
    |> default_claims()
    |> add_claim("aud", nil, &validate_audience/3)
    |> add_claim("permissions", nil, &validate_permissions/3)
  end

  @impl Joken.Hooks
  def after_validate(_hook_options, {:ok, claims} = result, {_token_config, _claims, context} = input) do
    if missing_required_permissions_claim?(claims, context) do
      {:halt, {:error, [message: "Invalid token", missing_claims: "permissions"]}}
    else
      {:cont, result, input}
    end
  end

  def after_validate(_, result, input), do: {:cont, result, input}

  @spec verify_and_validate_token(String.t(), String.t(), list(String.t()), boolean()) ::
          {:ok, Joken.claims()} | {:error, atom | Keyword.t()}
  def verify_and_validate_token(token, audience, required_permissions, ignore_signature) do
    context = %{audience: audience, required_permissions: required_permissions}

    if ignore_signature do
      validate_token(token, context)
    else
      verify_and_validate(token, __default_signer__(), context)
    end
  end

  @doc """
  Returns the list of permissions held by a token.

  In case of missing permissions claim or malformed token it defaults to an empty list.
  Note that this function does not verify the signature of the token.
  """
  @spec peek_permissions(String.t()) :: [String.t()]
  def peek_permissions(token) do
    with {:ok, claims} <- Joken.peek_claims(token),
         permissions <- Map.get(claims, "permissions", []) do
      permissions
    else
      _any_error -> []
    end
  end

  defp validate_token(token, context) do
    with {:ok, claims} <- Joken.peek_claims(token),
         do: validate(claims, context)
  end

  defp issuer, do: Application.fetch_env!(:prima_auth0_ex, :server)[:issuer]

  defp validate_audience(token_audience, _claims, context) do
    expected_audience = context[:audience]

    unless expected_audience do
      raise ArgumentError, "It is required to set an expected audience in order to validate tokens"
    end

    do_validate_audience(token_audience, expected_audience)
  end

  defp do_validate_audience(found, expected) when is_list(found), do: expected in found
  defp do_validate_audience(found, expected), do: found == expected

  defp validate_permissions(token_permissions, _claims, context) do
    required_permissions = context[:required_permissions]

    Enum.all?(required_permissions, &(&1 in token_permissions))
  end

  defp missing_required_permissions_claim?(claims, context) do
    permissions_required? =
      context
      |> Map.get(:required_permissions, [])
      |> Enum.count()
      |> Kernel.>(0)

    permissions_required? and not Map.has_key?(claims, "permissions")
  end
end