lib/ash/error/forbidden/policy.ex

defmodule Ash.Error.Forbidden.Policy do
  @moduledoc "Raised when policy authorization for an action fails"

  require Logger
  use Ash.Error.Exception

  alias Ash.Policy.Policy

  def_ash_error(
    [
      scenarios: [],
      facts: %{},
      filter: nil,
      policy_breakdown?: false,
      must_pass_strict_check?: false,
      policies: [],
      resource: nil,
      action: nil
    ],
    class: :forbidden
  )

  def exception(opts) do
    exception =
      super(Keyword.put(opts, :policy_breakdown?, Ash.Policy.Info.show_policy_breakdowns?()))

    case Ash.Policy.Info.log_policy_breakdowns() do
      nil ->
        :ok

      level ->
        Logger.log(level, report(exception, help_text?: false))
    end

    exception
  end

  @help_text """

  A check status of `?` implies that the solver did not need to determine that check.
  Some checks may look like they failed when in reality there was simply no need to check them.
  Look for policies with `✘` and `✓` in check statuses.

  A check with a `⬇` means that it didn't determine if the policy was authorized or forbidden, and so moved on to the next check.
  `🌟` and `⛔` mean that the check was responsible for producing an authorized or forbidden (respectively) status.

  If no check results in a status (they all have `⬇`) then the policy is assumed to have failed. In some cases, however, the policy
  may have just been ignored, as described above.
  """

  @doc """
  Print a report of an authorization failure

  Options:

  - `:help_text?`: Defaults to true. Displays help text at the top of the policy breakdown.
  """
  def report(error, opts \\ []) do
    error
    |> get_errors()
    |> case do
      [] ->
        "No policy errors"

      errors ->
        error_lines =
          errors
          |> Enum.map(fn
            %{
              facts: facts,
              filter: filter,
              policies: policies,
              must_pass_strict_check?: must_pass_strict_check?
            } ->
              must_pass_strict_check? =
                if must_pass_strict_check? do
                  """
                  Scenario must pass strict check only, meaning `runtime` policies cannot be checked.

                  This requirement is generally used for filtering on related resources, when we can't fetch those
                  related resources to run `runtime` policies. For this reason, you generally want your primary read
                  actions on your resources to have standard policies which can be checked statically (like `actor_attribute_equals`)
                  in addition to filter policies, like `expr(foo == :bar)`.
                  """
                else
                  ""
                end

              policy_breakdown_title =
                if Keyword.get(opts, :help_text?, true) do
                  ["Policy Breakdown", @help_text]
                else
                  "Policy Breakdown"
                end

              policy_explanation =
                policies
                |> Enum.filter(&relevant?(&1, facts))
                |> Enum.map(&explain_policy(&1, facts))
                |> Enum.intersperse("\n")
                |> title(policy_breakdown_title, false)

              filter =
                if filter do
                  title(
                    "Did not match filter expression #{inspect(filter)}",
                    "Generated Filter"
                  )
                else
                  ""
                end

              [must_pass_strict_check?, filter, policy_explanation]
              |> Enum.filter(& &1)
              |> Enum.intersperse("\n\n")
          end)
          |> Enum.intersperse("\n\n")

        [title_line(error), "\n", error_lines]
        |> IO.iodata_to_binary()
        |> String.trim()
    end
  end

  defp title_line(error) do
    cond do
      error.resource && error.action ->
        "#{inspect(error.resource)}.#{action_name(error.action)}"

      error.resource ->
        "#{inspect(error.resource)}"

      error.action ->
        "#{action_name(error.action)}"
    end
  end

  defp action_name(%{name: name}), do: name
  defp action_name(name), do: name

  defp relevant?(policy, facts) do
    Enum.all?(policy.condition, fn condition ->
      Policy.fetch_fact(facts, condition) == {:ok, true}
    end)
  end

  defp title(other, title, semicolon \\ true)
  defp title([], _, _), do: []
  defp title(other, title, true), do: [title, ":\n", other]
  defp title(other, title, false), do: [title, "\n", other]

  defp explain_policy(policy, facts) do
    bypass =
      if policy.bypass? do
        "Bypass: "
      else
        ""
      end

    {description, state} = describe_checks(policy.policies, facts)

    tag =
      case state do
        :unknown ->
          "⛔"

        :authorized ->
          "🌟"

        :forbidden ->
          "⛔"
      end

    title(Enum.map(description, &["    ", &1]), [
      "  ",
      bypass,
      policy.description || "Policy",
      " | ",
      tag
    ])
  end

  defp describe_checks(checks, facts) do
    {description, state} =
      Enum.reduce(checks, {[], :unknown}, fn check, {descriptions, state} ->
        new_state =
          case state do
            :unknown ->
              new_state(
                check.type,
                Policy.fetch_fact(facts, check.check)
              )

            other ->
              other
          end

        tag =
          case {state, new_state} do
            {:unknown, :authorized} ->
              "🌟"

            {:unknown, :forbidden} ->
              "⛔"

            {:unknown, :unknown} ->
              "⬇"

            _ ->
              ""
          end

        {[describe_check(check, Policy.fetch_fact(facts, check.check), tag) | descriptions],
         new_state}
      end)

    {Enum.intersperse(Enum.reverse(description), "\n"), state}
  end

  defp describe_check(check, fact_result, tag) do
    fact_result =
      case fact_result do
        {:ok, true} ->
          "✓"

        {:ok, false} ->
          "✘"

        :error ->
          "?"
      end

    [
      check_type(check),
      ": ",
      check.check_module.describe(check.check_opts),
      " | ",
      fact_result,
      " | ",
      tag
    ]
  end

  defp check_type(%{type: :authorize_if}), do: "authorize if"
  defp check_type(%{type: :forbid_if}), do: "forbid if"
  defp check_type(%{type: :authorize_unless}), do: "authorize unless"
  defp check_type(%{type: :forbid_unless}), do: "forbid unless"

  defp new_state(:authorize_if, {:ok, true}), do: :authorized
  defp new_state(:forbid_if, {:ok, true}), do: :forbidden
  defp new_state(:authorize_unless, {:ok, false}), do: :authorized
  defp new_state(:forbid_unless, {:ok, false}), do: :forbidden
  defp new_state(_, _), do: :unknown

  defp get_errors(%Ash.Error.Forbidden{errors: errors}) do
    Enum.flat_map(errors || [], fn error ->
      get_errors(error)
    end)
  end

  defp get_errors(%__MODULE__{} = error) do
    [error]
  end

  defp get_errors(_), do: []

  defimpl Ash.ErrorKind do
    def id(_), do: Ecto.UUID.generate()

    def message(error) do
      if error.policy_breakdown? do
        "forbidden:\n\n#{Ash.Error.Forbidden.Policy.report(error, help_text?: false)}"
      else
        "forbidden"
      end
    end

    def code(_), do: "Forbidden"
  end
end