defmodule Ash.Error.Forbidden.Policy do
@moduledoc "Raised when policy authorization for an action fails"
require Logger
use Ash.Error.Exception
alias Ash.Policy.Policy
def_ash_error(
[
scenarios: [],
facts: %{},
filter: nil,
policy_breakdown?: false,
must_pass_strict_check?: false,
for_fields: nil,
context_description: nil,
policies: [],
resource: nil,
action: nil,
changeset_doesnt_match_filter: false
],
class: :forbidden
)
def exception(opts) do
exception =
super(Keyword.put(opts, :policy_breakdown?, Ash.Policy.Info.show_policy_breakdowns?()))
case Ash.Policy.Info.log_policy_breakdowns() do
nil ->
:ok
level ->
Logger.log(level, report(exception, help_text?: false))
end
exception
end
@help_text """
A check status of `?` implies that the solver did not need to determine that check.
Some checks may look like they failed when in reality there was no need to check them.
Look for policies with `✘` and `✓` in check statuses.
A check with a `⬇` means that it didn't determine if the policy was authorized or forbidden, and so moved on to the next check.
`🌟` and `⛔` mean that the check was responsible for producing an authorized or forbidden (respectively) status.
When viewing successful authorization breakdowns, a `🔎` means that the policy or check was enforced via a filter.
If no check results in a status (they all have `⬇`) then the policy is assumed to have failed. In some cases, however, the policy
may have just been ignored, as described above.
"""
@doc """
Print a report of an authorization failure from a forbidden error
Options:
- `:help_text?`: Defaults to true. Displays help text at the top of the policy breakdown.
"""
def report(error, opts \\ []) do
error
|> get_errors()
|> case do
[] ->
"No policy errors"
errors ->
error_lines =
errors
|> Enum.map(fn
%{
facts: facts,
filter: filter,
policies: policies,
must_pass_strict_check?: must_pass_strict_check?
} ->
get_breakdown(
facts,
filter,
policies,
Keyword.merge(opts,
must_pass_strict_check?: must_pass_strict_check?,
context_description: error.context_description,
for_fields: error.for_fields
)
)
end)
|> Enum.intersperse("\n\n")
[title_line(error), "\n", error_lines]
|> IO.iodata_to_binary()
|> String.trim()
end
end
@doc """
Print a report of an authorization failure from authorization information.
Options:
- `:help_text?`: Defaults to true. Displays help text at the top of the policy breakdown.
- `:success?`: Defaults to false. Changes the messaging/graphics around to indicate successful policy authorization.
- `:must_pass_strict_check?`: Defaults to false. Adds a message about this authorization requiring passing strict check.
"""
def get_breakdown(facts, filter, policies, opts \\ []) do
must_pass_strict_check? =
if opts[:must_pass_strict_check?] do
"""
Scenario must pass strict check only, meaning `runtime` policies cannot be checked.
This requirement is generally used for filtering on related resources, when we can't fetch those
related resources to run `runtime` policies. For this reason, you generally want your primary read
actions on your resources to have standard policies which can be checked statically (like `actor_attribute_equals`)
in addition to filter policies, like `expr(foo == :bar)`.
"""
else
""
end
policy_context_description =
cond do
opts[:for_fields] && opts[:context_description] ->
" for #{opts[:context_description]} fields: #{inspect(opts[:for_fields])} "
opts[:for_fields] ->
" for fields: #{inspect(opts[:for_fields])} "
opts[:context_description] ->
" for #{opts[:context_description]}:"
true ->
""
end
policy_breakdown_title =
if Keyword.get(opts, :help_text?, true) do
["Policy Breakdown#{policy_context_description}", @help_text]
else
"Policy Breakdown#{policy_context_description}"
end
policy_explanation =
policies
|> Enum.filter(&relevant?(&1, facts))
|> Enum.map(&explain_policy(&1, facts, opts[:success?] || false))
|> Enum.intersperse("\n")
|> title(policy_breakdown_title, false)
filter =
if filter do
title(
"#{nicely_formatted_filter(filter)}",
"Generated Filter"
)
else
""
end
[must_pass_strict_check?, filter, policy_explanation]
|> Enum.filter(& &1)
|> Enum.intersperse("\n\n")
end
defp nicely_formatted_filter([{:or, list}]) when is_list(list) do
"(" <> Enum.map_join(list, " or ", &nicely_formatted_filter/1) <> ")"
end
defp nicely_formatted_filter([{:and, list}]) when is_list(list) do
"(" <> Enum.map_join(list, " and ", &nicely_formatted_filter/1) <> ")"
end
defp nicely_formatted_filter(value) do
inspect(value)
end
defp title_line(error) do
cond do
error.resource && error.action ->
"#{inspect(error.resource)}.#{action_name(error.action)}"
error.resource ->
"#{inspect(error.resource)}"
error.action ->
"#{action_name(error.action)}"
end
end
defp action_name(%{name: name}), do: name
defp action_name(name), do: name
defp relevant?(policy, facts) do
Enum.all?(policy.condition || [], fn condition ->
Policy.fetch_fact(facts, condition) == {:ok, true}
end)
end
defp title(other, title, semicolon \\ true)
defp title([], _, _), do: []
defp title(other, title, true), do: [title, ":\n", other]
defp title(other, title, false), do: [title, "\n", other]
defp explain_policy(policy, facts, success?) do
bypass =
if policy.bypass? do
"Bypass: "
else
""
end
{condition_description, applies} = describe_conditions(policy.condition, facts)
if applies == true do
{description, state} = describe_checks(policy.policies, facts, success?)
tag =
case state do
:unknown ->
# In successful cases, this means we must have filtered
"🔎"
:authorized ->
"🌟"
:forbidden ->
"⛔"
end
title(
[Enum.map(condition_description, &[" ", &1]), Enum.map(description, &[" ", &1])],
[
" ",
bypass,
policy.description || "Policy",
" | ",
tag
]
)
else
tag =
if applies == false do
"⬇️"
else
"?"
end
title(
Enum.map(condition_description, &[" ", &1]),
[" ", bypass, policy.description || "Policy", " | ", tag]
)
end
end
defp describe_conditions(condition, facts) do
condition
|> List.wrap()
|> case do
[{Ash.Policy.Check.Static, opts}] ->
{[], opts[:result]}
conditions ->
conditions
|> Enum.reduce({[], true}, fn condition, {conditions, status} ->
{mod, opts} =
case condition do
%{module: module, opts: opts} ->
{module, opts}
{module, opts} ->
{module, opts}
end
new_status =
if status in [false, :unknown] do
false
else
case Policy.fetch_fact(facts, {mod, opts}) do
{:ok, true} ->
true
{:ok, false} ->
false
_ ->
:unknown
end
end
{[["condition: ", mod.describe(opts)] | conditions], new_status}
end)
|> then(fn {conditions, status} ->
conditions =
conditions
|> Enum.reverse()
|> case do
[] ->
[]
conditions ->
[
conditions
|> Enum.intersperse("\n"),
"\n"
]
end
{conditions, status}
end)
end
end
defp describe_checks(checks, facts, success?) do
{description, state} =
Enum.reduce(checks, {[], :unknown}, fn check, {descriptions, state} ->
new_state =
case state do
:unknown ->
new_state(
check.type,
Policy.fetch_fact(facts, check.check)
)
other ->
other
end
filter_check? = function_exported?(elem(check.check, 0), :auto_filter, 3)
tag =
case {state, new_state} do
{:unknown, :authorized} ->
"🌟"
{:unknown, :forbidden} ->
"⛔"
{:unknown, :unknown} ->
if (success? && filter_check? && Policy.fetch_fact(facts, check.check) == :error) ||
{:ok, :unknown} do
"🔎"
else
"⬇"
end
_ ->
""
end
{[
describe_check(
check,
Policy.fetch_fact(facts, check.check),
tag,
success?,
filter_check?
)
| descriptions
], new_state}
end)
{Enum.intersperse(Enum.reverse(description), "\n"), state}
end
defp describe_check(check, fact_result, tag, success?, filter_check?) do
fact_result =
case fact_result do
{:ok, true} ->
"✓"
{:ok, false} ->
"✘"
{:ok, :unknown} ->
unknown_or_error_glyph(success?, filter_check?)
:error ->
unknown_or_error_glyph(success?, filter_check?)
end
[
check_type(check),
": ",
check.check_module.describe(check.check_opts),
" | ",
fact_result,
" | ",
tag
]
end
defp unknown_or_error_glyph(success?, filter_check?) when success? and filter_check?, do: "✓"
defp unknown_or_error_glyph(_, _), do: "?"
defp check_type(%{type: :authorize_if}), do: "authorize if"
defp check_type(%{type: :forbid_if}), do: "forbid if"
defp check_type(%{type: :authorize_unless}), do: "authorize unless"
defp check_type(%{type: :forbid_unless}), do: "forbid unless"
defp new_state(:authorize_if, {:ok, true}), do: :authorized
defp new_state(:forbid_if, {:ok, true}), do: :forbidden
defp new_state(:authorize_unless, {:ok, false}), do: :authorized
defp new_state(:forbid_unless, {:ok, false}), do: :forbidden
defp new_state(_, _), do: :unknown
defp get_errors(%Ash.Error.Forbidden{errors: errors}) do
Enum.flat_map(errors || [], fn error ->
get_errors(error)
end)
end
defp get_errors(%__MODULE__{} = error) do
[error]
end
defp get_errors(_), do: []
defimpl Ash.ErrorKind do
def id(_), do: Ecto.UUID.generate()
def message(error) do
if error.policy_breakdown? do
"forbidden:\n\n#{Ash.Error.Forbidden.Policy.report(error, help_text?: false)}"
else
"forbidden"
end
end
def code(_), do: "forbidden_by_policy"
end
end