# lib/metacredo/check/security/missing_csrf_protection.ex

defmodule MetaCredo.Check.Security.MissingCSRFProtection do
  use MetaCredo.Check,
    category: :security,
    base_priority: :high,
    explanations: [
      check: """
      Detects state-changing endpoints without CSRF protection (CWE-352).

      Identifies code patterns where state-changing HTTP operations (POST, PUT,
      PATCH, DELETE) are handled without CSRF token validation.
      """,
      params: [],
      examples: [
        elixir: [
          wrong: """
          # POST handler with no CSRF token validation
          def create(conn, params) do
            Accounts.create_user(params)
            redirect(conn, to: ~p"/users")
          end
          """,
          correct: """
          # Phoenix includes Plug.CSRFProtection by default in the browser pipeline;
          # ensure it is not bypassed and include the CSRF meta tag in your layout.
          # In forms, use Phoenix.HTML.Tag.csrf_meta_tag/0 or the built-in form helper.
          def create(conn, params) do
            # CSRF is validated automatically by the browser pipeline plug
            Accounts.create_user(params)
            redirect(conn, to: ~p"/users")
          end
          """
        ]
      ]
    ]

  # Call names (compared case-insensitively) that are reported as
  # state-changing route handlers.
  @state_changing_methods ~W[post put patch delete]

  # Substrings that mark a call name as CSRF-related. The list is lowercased
  # once at compile time so `csrf_indicator?/1` can compare against it directly
  # instead of downcasing every entry on every call.
  @csrf_indicators Enum.map(
                     ~W[
                       csrf token protect_from_forgery
                       verify_authenticity_token antiforgery
                       ValidateAntiForgeryToken csurf
                       csrf_protect csrf_token csrf_exempt
                       x-csrf-token _csrf
                     ],
                     &String.downcase/1
                   )

  # Substrings that mark a function definition name as a state-changing action.
  @state_changing_actions ~W[
    create update delete destroy
    save insert remove edit
    post put patch
  ]

  # Walks the source file's AST and returns the collected issues.
  # Issues are prepended while walking, so they come back in reverse
  # traversal order.
  @impl true
  def run(%SourceFile{} = source_file, _params) do
    {_, issues} =
      source_file
      |> SourceFile.ast()
      |> AST.prewalk([], fn node, acc -> traverse(node, acc, source_file) end)

    issues
  end

  # Function definitions: report when the name looks like a state-changing
  # action and nothing inside the body looks like a CSRF check.
  defp traverse({:function_def, meta, body} = node, issues, source_file)
       when is_list(meta) do
    func_name = Keyword.get(meta, :name, "")

    if state_changing_action?(func_name) and not has_csrf_check?(List.wrap(body)) do
      issue =
        format_issue(source_file,
          message: "Potential missing CSRF protection: state-changing action '#{func_name}'",
          trigger: func_name,
          line_no: Keyword.get(meta, :line),
          metadata: %{cwe: 352, function: func_name}
        )

      {node, [issue | issues]}
    else
      {node, issues}
    end
  end

  # Function calls: report calls whose name is one of the state-changing
  # HTTP methods (route definitions such as `post`, `put`, ...).
  defp traverse({:function_call, meta, _args} = node, issues, source_file)
       when is_list(meta) do
    func_name = Keyword.get(meta, :name, "")

    # The predicate only accepts binaries, so a node carrying a nil or atom
    # name is skipped instead of crashing in String.downcase/1.
    if state_changing_method?(func_name) do
      issue =
        format_issue(source_file,
          message:
            "Potential missing CSRF protection: #{String.upcase(func_name)} route handler",
          trigger: func_name,
          line_no: Keyword.get(meta, :line),
          metadata: %{cwe: 352, method: func_name}
        )

      {node, [issue | issues]}
    else
      {node, issues}
    end
  end

  # Every other node is passed through untouched.
  defp traverse(node, issues, _source_file), do: {node, issues}

  # --- Private Helpers ---

  # True when `func_name` is a binary that equals, ignoring case, one of the
  # state-changing HTTP method names. Non-binary names are never a match.
  defp state_changing_method?(func_name) when is_binary(func_name) do
    String.downcase(func_name) in @state_changing_methods
  end

  defp state_changing_method?(_), do: false

  # True when `func_name` is a binary containing, ignoring case, any of the
  # state-changing action substrings. Non-binary names are never a match.
  defp state_changing_action?(func_name) when is_binary(func_name) do
    func_lower = String.downcase(func_name)
    Enum.any?(@state_changing_actions, &String.contains?(func_lower, &1))
  end

  defp state_changing_action?(_), do: false

  # True when any node in the body list contains a CSRF-looking call.
  defp has_csrf_check?(body) when is_list(body) do
    Enum.any?(body, &contains_csrf_check?/1)
  end

  # Recursively searches a node for a call whose name looks CSRF-related.
  #
  # A function call matches on its own name, or on anything nested in its
  # arguments, so a CSRF call wrapped in another call is still found.
  defp contains_csrf_check?({:function_call, meta, args}) when is_list(meta) do
    csrf_indicator?(Keyword.get(meta, :name, "")) or contains_csrf_check?(args)
  end

  # Blocks: only the statements are searched, the metadata is skipped.
  defp contains_csrf_check?({:block, _meta, statements}) when is_list(statements) do
    Enum.any?(statements, &contains_csrf_check?/1)
  end

  # Any other tuple: search every element.
  defp contains_csrf_check?(tuple) when is_tuple(tuple) do
    tuple |> Tuple.to_list() |> Enum.any?(&contains_csrf_check?/1)
  end

  defp contains_csrf_check?(list) when is_list(list) do
    Enum.any?(list, &contains_csrf_check?/1)
  end

  # Leaves (atoms, binaries, numbers, ...) never count as a CSRF check.
  defp contains_csrf_check?(_), do: false

  # True when `name` is a binary containing, ignoring case, any CSRF indicator.
  defp csrf_indicator?(name) when is_binary(name) do
    name_lower = String.downcase(name)
    Enum.any?(@csrf_indicators, &String.contains?(name_lower, &1))
  end

  defp csrf_indicator?(_), do: false
end