defmodule MetaCredo.Check.Security.UnrestrictedFileUpload do
use MetaCredo.Check,
category: :security,
base_priority: :higher,
explanations: [
check: """
Detects unrestricted file upload vulnerabilities (CWE-434).
Identifies code patterns where file uploads are processed without proper
validation of file type, size, or content, potentially allowing attackers
to upload executable files or web shells.
""",
params: [],
examples: [
elixir: [
wrong: """
# No validation -- attacker can upload .php, .exe, or oversized files
def upload(conn, %{"file" => upload}) do
File.write!("/var/uploads/" <> upload.filename, upload.binary)
json(conn, %{ok: true})
end
""",
correct: """
@allowed_extensions ~w(.jpg .jpeg .png .gif .webp)
@max_size_bytes 5 * 1024 * 1024 # 5 MB
def upload(conn, %{"file" => upload}) do
ext = Path.extname(upload.filename) |> String.downcase()
cond do
ext not in @allowed_extensions ->
send_resp(conn, 422, "File type not allowed")
upload.size > @max_size_bytes ->
send_resp(conn, 422, "File too large")
true ->
safe_name = Ecto.UUID.generate() <> ext
File.write!("/var/uploads/" <> safe_name, upload.binary)
json(conn, %{path: safe_name})
end
end
"""
]
]
]
@file_save_functions ~W[
write write! copy copy! stream!
save save! store store!
File.write File.copy File.stream
move_uploaded_file transferTo attach
saveAs SaveAs DownloadTo
put_object upload_file
move rename
]
@validation_indicators ~W[
extname extension content_type mime_type
file_type allowed_types valid_extension
file_size size max_size limit
validate_upload check_file verify_file
allowed? valid? acceptable?
]
@upload_patterns ~W[
upload uploaded file attachment
multipart form_data formdata
plug_upload phoenix_upload
]
@impl true
def run(%SourceFile{} = source_file, _params) do
{_, issues} =
source_file
|> SourceFile.ast()
|> AST.prewalk([], fn node, acc -> traverse(node, acc, source_file) end)
issues
end
# Detect function definitions handling uploads without validation
defp traverse({:function_def, meta, body} = node, issues, source_file)
when is_list(meta) do
func_name = Keyword.get(meta, :name, "")
params = Keyword.get(meta, :params, [])
if handles_file_upload?(func_name, params) do
body_list = if is_list(body), do: body, else: [body]
has_validation? = has_upload_validation?(body_list)
has_save_op? = has_file_save_operation?(body_list)
if has_save_op? and not has_validation? do
line = Keyword.get(meta, :line)
{node,
[
format_issue(source_file,
message: "Unrestricted file upload: '#{func_name}' saves files without validation",
trigger: func_name,
line_no: line,
severity: :error,
metadata: %{cwe: 434, function: func_name}
)
| issues
]}
else
{node, issues}
end
else
{node, issues}
end
end
# Detect direct file saves with upload-like variables
defp traverse({:function_call, meta, args} = node, issues, source_file)
when is_list(meta) do
func_name = Keyword.get(meta, :name, "")
if file_save_function?(func_name) and involves_upload?(args) do
line = Keyword.get(meta, :line)
{node,
[
format_issue(source_file,
message: "Potential unrestricted file upload: '#{func_name}' with uploaded content",
trigger: func_name,
line_no: line,
metadata: %{cwe: 434, function: func_name}
)
| issues
]}
else
{node, issues}
end
end
defp traverse(node, issues, _source_file), do: {node, issues}
# --- Private Helpers ---
defp handles_file_upload?(func_name, params) when is_binary(func_name) do
func_lower = String.downcase(func_name)
String.contains?(func_lower, "upload") or
String.contains?(func_lower, "import") or
String.contains?(func_lower, "attach") or
has_upload_param?(params)
end
defp handles_file_upload?(_, _), do: false
defp has_upload_param?(params) when is_list(params) do
Enum.any?(params, fn
{:param, _, name} when is_binary(name) ->
name_lower = String.downcase(name)
Enum.any?(@upload_patterns, &String.contains?(name_lower, &1))
_ ->
false
end)
end
defp has_upload_param?(_), do: false
defp file_save_function?(func_name) when is_binary(func_name) do
func_lower = String.downcase(func_name)
Enum.any?(@file_save_functions, fn pattern ->
String.contains?(func_lower, String.downcase(pattern))
end)
end
defp file_save_function?(_), do: false
defp has_upload_validation?(body) when is_list(body) do
Enum.any?(body, &contains_validation?/1)
end
defp contains_validation?(node) do
case node do
{:function_call, meta, _args} when is_list(meta) ->
func_name = Keyword.get(meta, :name, "")
validation_function?(func_name)
{:conditional, _meta, [condition | _branches]} ->
contains_validation?(condition) or involves_size_or_type_check?(condition)
{:binary_op, meta, [left, right]} when is_list(meta) ->
operator = Keyword.get(meta, :operator)
if operator in [:in, :==, :===, :>, :<, :>=, :<=] do
involves_validation_variable?(left) or involves_validation_variable?(right)
else
contains_validation?(left) or contains_validation?(right)
end
{:case, _meta, _children} ->
true
{:block, _meta, statements} when is_list(statements) ->
Enum.any?(statements, &contains_validation?/1)
{:pipe, _meta, stages} when is_list(stages) ->
Enum.any?(stages, &contains_validation?/1)
tuple when is_tuple(tuple) ->
tuple |> Tuple.to_list() |> Enum.any?(&contains_validation?/1)
list when is_list(list) ->
Enum.any?(list, &contains_validation?/1)
_ ->
false
end
end
defp validation_function?(func_name) when is_binary(func_name) do
func_lower = String.downcase(func_name)
Enum.any?(@validation_indicators, fn ind ->
String.contains?(func_lower, String.downcase(ind))
end)
end
defp validation_function?(_), do: false
defp involves_size_or_type_check?(node) do
case node do
{:function_call, meta, _} when is_list(meta) ->
func_name = Keyword.get(meta, :name, "")
func_lower = String.downcase(func_name)
String.contains?(func_lower, "size") or
String.contains?(func_lower, "type") or
String.contains?(func_lower, "ext")
{:attribute_access, _meta, children} when is_list(children) ->
Enum.any?(children, &involves_validation_variable?/1)
_ ->
false
end
end
defp involves_validation_variable?(node) do
case node do
{:variable, _meta, name} when is_binary(name) ->
name_lower = String.downcase(name)
Enum.any?(@validation_indicators, &String.contains?(name_lower, &1))
{:function_call, meta, _} when is_list(meta) ->
func_name = Keyword.get(meta, :name, "")
validation_function?(func_name)
{:attribute_access, _meta, children} when is_list(children) ->
Enum.any?(children, fn
{:literal, _, attr} when is_binary(attr) or is_atom(attr) ->
attr_lower = to_string(attr) |> String.downcase()
Enum.any?(@validation_indicators, &String.contains?(attr_lower, &1))
other ->
involves_validation_variable?(other)
end)
_ ->
false
end
end
defp has_file_save_operation?(body) when is_list(body) do
Enum.any?(body, &contains_file_save?/1)
end
defp contains_file_save?(node) do
case node do
{:function_call, meta, _args} when is_list(meta) ->
func_name = Keyword.get(meta, :name, "")
file_save_function?(func_name)
{:pipe, _meta, stages} when is_list(stages) ->
Enum.any?(stages, &contains_file_save?/1)
{:block, _meta, statements} when is_list(statements) ->
Enum.any?(statements, &contains_file_save?/1)
tuple when is_tuple(tuple) ->
tuple |> Tuple.to_list() |> Enum.any?(&contains_file_save?/1)
list when is_list(list) ->
Enum.any?(list, &contains_file_save?/1)
_ ->
false
end
end
defp involves_upload?(args) when is_list(args) do
Enum.any?(args, fn
{:variable, _meta, name} when is_binary(name) ->
name_lower = String.downcase(name)
Enum.any?(@upload_patterns, &String.contains?(name_lower, &1))
{:attribute_access, _meta, children} when is_list(children) ->
Enum.any?(children, fn
{:variable, _, name} when is_binary(name) ->
name_lower = String.downcase(name)
Enum.any?(@upload_patterns, &String.contains?(name_lower, &1))
_ ->
false
end)
_ ->
false
end)
end
defp involves_upload?(_), do: false
end