defmodule Bylaw.Ecto.Changeset do
@moduledoc """
Extracts conservative changeset candidates from Ecto schema source files.
A candidate is a function whose source AST directly calls `cast/3`,
`cast/4`, `Ecto.Changeset.cast/3`, `Ecto.Changeset.cast/4`, `change/2`, or
`Ecto.Changeset.change/2`. Literal cast/change fields are extracted for
comparison with database constraints. Dynamic field lists are treated as
unverifiable by returning an empty field list for v1.
"""
defmodule Candidate do
  @moduledoc """
  Describes one function in a schema module that builds a changeset.

  `fields` holds the literal field names extracted from the function body and
  `constraints` holds the constraint helper calls found in the same body.
  """

  defstruct [:module, :function, arity: 0, fields: [], constraints: []]

  @type t :: %__MODULE__{
          module: module(),
          function: atom(),
          arity: non_neg_integer(),
          fields: list(atom()),
          constraints: list(Bylaw.Ecto.Changeset.ConstraintCall.t())
        }
end
defmodule ConstraintCall do
  @moduledoc """
  Describes a single Ecto constraint helper call found in a function body.

  `kind` identifies the helper, `fields` the literal field names passed to it,
  and `name`/`match` mirror the literal `:name` and `:match` options.
  """

  defstruct [{:kind, nil}, {:fields, []}, {:name, nil}, {:match, :exact}]

  @type kind :: :unique | :foreign_key | :check | :exclusion
  @type match :: :exact | :suffix | :prefix

  @type t :: %__MODULE__{
          kind: kind(),
          fields: list(atom()),
          name: String.t() | Regex.t() | nil,
          match: match()
        }
end
# Maps each recognised constraint helper function name to the
# `ConstraintCall` kind recorded for it. Also used in guards to decide
# whether a call node is a constraint helper at all.
@constraint_calls %{
unique_constraint: :unique,
foreign_key_constraint: :foreign_key,
check_constraint: :check,
exclusion_constraint: :exclusion
}
@doc """
Scans `paths` for changeset candidates defined in any of `modules`.

Each path may be a directory (searched recursively for `.ex`/`.exs` files) or
a single file. Files that cannot be read or parsed contribute no candidates.
The result is ordered by inspected module name, then function name, then arity.
"""
@spec candidates(paths :: list(Path.t()), modules :: list(module())) :: list(Candidate.t())
def candidates(paths, modules) when is_list(paths) and is_list(modules) do
  wanted = MapSet.new(modules)
  files = source_files(paths)

  found = Enum.flat_map(files, fn file -> candidates_in_file(file, wanted) end)

  Enum.sort_by(found, fn candidate ->
    {inspect(candidate.module), candidate.function, candidate.arity}
  end)
end
# Expands `paths` into a de-duplicated list of source files: directories are
# globbed recursively for `.ex`/`.exs` files, regular files are kept as-is,
# and anything else is dropped.
defp source_files(paths) do
  expand = fn path ->
    cond do
      File.dir?(path) -> Path.wildcard(Path.join(path, "**/*.{ex,exs}"))
      File.regular?(path) -> [path]
      true -> []
    end
  end

  for path <- paths, file <- expand.(path), uniq: true, do: file
end
# Reads and parses the file at `path`, returning the candidates found in it.
# Read or parse failures are treated as "no candidates" rather than errors.
defp candidates_in_file(path, module_set) do
  case File.read(path) do
    {:ok, source} ->
      case Code.string_to_quoted(source) do
        {:ok, quoted} -> modules_in_ast(quoted, module_set)
        _parse_error -> []
      end

    _read_error ->
      []
  end
end
# Collects candidates from every `defmodule` in `ast` whose name is a member
# of `module_set`.
#
# The traversal keeps a stack of enclosing module names so that a nested
# `defmodule Inner do` is looked up as `Outer.Inner`, which is the name Elixir
# gives the compiled module. Without this, nested schema modules would never
# match the fully qualified names in `module_set`.
defp modules_in_ast(ast, module_set) do
  {_ast, {_parents, candidates}} =
    Macro.traverse(
      ast,
      {[], []},
      fn
        {:defmodule, _meta, [module_ast, [do: body]]} = node, {parents, candidates} ->
          module = qualified_module_name(module_ast, parents)

          candidates =
            if MapSet.member?(module_set, module) do
              function_candidates(module, body) ++ candidates
            else
              candidates
            end

          # Push this module so definitions nested inside it can be qualified.
          {node, {[module | parents], candidates}}

        node, acc ->
          {node, acc}
      end,
      fn
        # Leaving a module: pop the entry pushed by the pre-order callback.
        {:defmodule, _meta, [_module_ast, [do: _body]]} = node,
        {[_module | parents], candidates} ->
          {node, {parents, candidates}}

        node, acc ->
          {node, acc}
      end
    )

  candidates
end

# Resolves a `defmodule` name, prefixing it with the enclosing module when the
# name is a plain alias. Atom names, `Elixir.`-prefixed aliases, and modules
# whose parent name is unknown are returned unqualified.
defp qualified_module_name(
       {:__aliases__, _meta, [first | _rest]} = module_ast,
       [parent | _parents]
     )
     when is_atom(first) and first != :Elixir and is_atom(parent) and not is_nil(parent) do
  Module.concat(parent, module_name(module_ast))
end

defp qualified_module_name(module_ast, _parents), do: module_name(module_ast)
# Converts the name part of a `defmodule` AST node into a module atom.
#
# Returns `nil` when the name cannot be resolved statically. That includes
# aliases with a non-atom segment such as `__MODULE__.Nested`, for which
# `Module.concat/1` would otherwise raise and abort the whole scan.
defp module_name({:__aliases__, _meta, parts}) when is_list(parts) do
  if Enum.all?(parts, &is_atom/1), do: Module.concat(parts), else: nil
end

defp module_name(module) when is_atom(module), do: module
defp module_name(_ast), do: nil
# Returns the candidates defined directly in a module body. The body is
# either a `__block__` of several expressions or a single expression.
defp function_candidates(module, body) do
  expressions =
    case body do
      {:__block__, _meta, expressions} -> expressions
      single -> [single]
    end

  Enum.flat_map(expressions, fn expression -> function_candidate(module, expression) end)
end
# Builds a one-element candidate list for a `def`/`defp` whose body yields at
# least one literal cast/change field; every other expression yields `[]`.
#
# NOTE(review): a function that calls cast/change only with dynamic fields
# produces no candidate here, which seems to differ from the module doc's
# "empty field list" wording. Confirm which is intended.
defp function_candidate(module, {kind, _meta, [head, [do: body]]}) when kind in [:def, :defp] do
  {function, arity} = function_name_arity(head)

  case candidate_fields(body) do
    [] ->
      []

    fields ->
      candidate = %Candidate{
        module: module,
        function: function,
        arity: arity,
        fields: fields,
        constraints: constraint_calls(body)
      }

      [candidate]
  end
end

defp function_candidate(_module, _expression), do: []
# Extracts `{name, arity}` from a function head, looking through a `when`
# guard wrapper. Heads that are not a plain named call yield `{nil, 0}`.
defp function_name_arity(head) do
  case head do
    {:when, _meta, [inner | _guards]} ->
      function_name_arity(inner)

    {name, _meta, args} when is_atom(name) ->
      # A head written without parentheses carries no argument list.
      arity = if args, do: Enum.count(args), else: 0
      {name, arity}

    _other ->
      {nil, 0}
  end
end
# Returns the sorted, de-duplicated literal field names passed to any
# cast/change call inside `ast`.
defp candidate_fields(ast) do
  field_lists = collect_calls(ast, &cast_or_change_fields/1)

  field_lists
  |> List.flatten()
  |> Enum.sort()
  |> Enum.dedup()
end
# Returns every constraint helper call found inside `ast`, in source order.
defp constraint_calls(ast) do
  for call <- collect_calls(ast, &constraint_call/1), not is_nil(call), do: call
end
# Walks `ast` in pre-order, applies `mapper` to every node, and returns the
# results in visit order. Results that are `[]` or `nil` are skipped.
defp collect_calls(ast, mapper) do
  visit = fn node, collected ->
    mapped = mapper.(node)

    if mapped == [] or is_nil(mapped) do
      {node, collected}
    else
      {node, [mapped | collected]}
    end
  end

  {_ast, collected} = Macro.prewalk(ast, [], visit)
  Enum.reverse(collected)
end
# Maps an AST node to the literal fields it casts or changes. Handles local
# `cast`/`change` calls and remote calls on `Ecto.Changeset`; any other node
# yields `[]`.
defp cast_or_change_fields(node) do
  case node do
    {name, _meta, args} when name in [:cast, :change] and is_list(args) ->
      local_changeset_fields(name, args)

    {{:., _meta, [module, name]}, _call_meta, args}
    when name in [:cast, :change] and is_list(args) ->
      case ecto_changeset_module?(module) do
        true -> remote_changeset_fields(name, args)
        false -> []
      end

    _other ->
      []
  end
end
# Extracts literal fields from an unqualified `cast`/`change` call. For
# `change`, the changes are the last of one or two arguments.
defp local_changeset_fields(name, args) do
  case {name, args} do
    {:cast, cast_args} -> cast_fields(cast_args)
    {:change, [changes]} -> literal_change_fields(changes)
    {:change, [_data, changes]} -> literal_change_fields(changes)
    _other -> []
  end
end
# Extracts literal fields from an `Ecto.Changeset.cast`/`change` call. For
# `change`, the changes are the last of one or two arguments.
defp remote_changeset_fields(:cast, args), do: cast_fields(args)

defp remote_changeset_fields(:change, args) when length(args) in [1, 2] do
  args
  |> List.last()
  |> literal_change_fields()
end

defp remote_changeset_fields(_name, _args), do: []
# Picks the literal permitted-field list out of a `cast` call's arguments.
#
# With three arguments the call is ambiguous: it may be a piped
# `cast(params, fields, opts)` or a direct `cast(data, params, fields)`. The
# second argument is tried first, then the third.
defp cast_fields(args) do
  case args do
    [_params, fields] ->
      literal_atom_list(fields)

    [_first, second, third] ->
      with [] <- literal_atom_list(second), do: literal_atom_list(third)

    [_data, _params, fields, _opts] ->
      literal_atom_list(fields)

    _other ->
      []
  end
end
# Returns `fields` unchanged when it is a list made up solely of atoms;
# anything else (non-lists, lists with non-atom elements) yields `[]`.
defp literal_atom_list(fields) do
  if is_list(fields) and Enum.all?(fields, &is_atom/1) do
    fields
  else
    []
  end
end
# Returns the atom keys of a literal changes argument: the sorted atom keys
# of a map literal, or the keys of a keyword list in their written order.
# Any other expression yields `[]`.
defp literal_change_fields(changes) do
  case changes do
    {:%{}, _meta, pairs} when is_list(pairs) ->
      atom_keys = for {key, _value} <- pairs, is_atom(key), do: key
      Enum.sort(atom_keys)

    list when is_list(list) ->
      if Keyword.keyword?(list) do
        for {key, _value} <- list, do: key
      else
        []
      end

    _other ->
      []
  end
end
# Maps an AST node to a `ConstraintCall` when it is a local constraint helper
# call or a remote one on `Ecto.Changeset`; every other node yields `nil`.
defp constraint_call({name, _meta, args})
     when is_map_key(@constraint_calls, name) and is_list(args) do
  @constraint_calls
  |> Map.fetch!(name)
  |> constraint_call(args)
end

defp constraint_call({{:., _meta, [module, name]}, _call_meta, args})
     when is_map_key(@constraint_calls, name) and is_list(args) do
  case ecto_changeset_module?(module) do
    true ->
      @constraint_calls
      |> Map.fetch!(name)
      |> constraint_call(args)

    false ->
      nil
  end
end

defp constraint_call(_node), do: nil
# Builds the `ConstraintCall` for a helper of the given `kind` from the
# call's argument list.
defp constraint_call(kind, args) do
  {field_arg, opts_arg} = constraint_args(args)

  fields = constraint_fields(field_arg)
  name = constraint_name(opts_arg)
  match = constraint_match(opts_arg)

  %ConstraintCall{kind: kind, fields: fields, name: name, match: match}
end
# Splits a constraint helper's arguments into `{field_arg, opts_arg}`.
#
# The helper may be called piped (`field` / `field, opts`) or directly
# (`changeset, field` / `changeset, field, opts`), so a two-argument call is
# ambiguous. An unrecognised argument count yields `{nil, []}`.
defp constraint_args([field]), do: {field, []}

# Direct call with a field list, e.g. `unique_constraint(changeset, [:a, :b])`.
# The first argument is an expression (an AST tuple) and the second starts
# with an atom, so it cannot be a keyword list of options. Without this clause
# the field list would be mistaken for options and the fields lost.
defp constraint_args([changeset, [first | _rest] = fields])
     when is_tuple(changeset) and is_atom(first),
     do: {fields, []}

defp constraint_args([field, opts]) when is_list(opts), do: {field, opts}
defp constraint_args([_changeset, field]), do: {field, []}
defp constraint_args([_changeset, field, opts]), do: {field, opts}
defp constraint_args(_args), do: {nil, []}
# Normalises a constraint helper's field argument into a list of atoms.
#
# `nil` means no field argument could be determined (the fallback of
# `constraint_args/1`). It must be handled before the atom clause because
# `nil` is itself an atom and would otherwise be reported as the field `nil`.
defp constraint_fields(nil), do: []
defp constraint_fields(field) when is_atom(field), do: [field]
defp constraint_fields(fields) when is_list(fields), do: literal_atom_list(fields)
defp constraint_fields(_field), do: []
# Extracts the literal `:name` option of a constraint helper as a string or
# compiled regex. Returns `nil` when the option is absent, not a literal, or
# not a list of options at all.
defp constraint_name(opts) when is_list(opts) do
  case Keyword.fetch(opts, :name) do
    # Ecto falls back to its default constraint name when `:name` is falsy,
    # so a literal `nil`/`false` must not become the string "nil"/"false".
    {:ok, name} when name in [nil, false] -> nil
    {:ok, name} when is_atom(name) -> Atom.to_string(name)
    {:ok, name} when is_binary(name) -> name
    {:ok, name} -> literal_regex(name)
    _other -> nil
  end
end

defp constraint_name(_opts), do: nil
# Returns the literal `:match` option when it is one of the supported modes;
# otherwise (missing, unsupported, or non-list options) returns `:exact`.
defp constraint_match(opts) do
  requested = if is_list(opts), do: Keyword.get(opts, :match), else: nil

  if requested in [:exact, :suffix, :prefix], do: requested, else: :exact
end
# Compiles an un-interpolated `~r` sigil AST node into a `Regex`. Returns
# `nil` for any other expression or when the pattern fails to compile.
defp literal_regex(name) do
  with {:sigil_r, _meta, [{:<<>>, _string_meta, [source]}, modifiers]} <- name,
       true <- is_binary(source) and is_list(modifiers),
       {:ok, regex} <- Regex.compile(source, to_string(modifiers)) do
    regex
  else
    _other -> nil
  end
end
# True when the call target is written as the full `Ecto.Changeset` alias
# (or is already that module atom). Shortened aliases are not recognised.
defp ecto_changeset_module?(module) do
  case module do
    {:__aliases__, _meta, [:Ecto, :Changeset]} -> true
    Ecto.Changeset -> true
    _other -> false
  end
end
end