| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- import requests
- import json
- from urllib.parse import urljoin
- from core.config.config_manager import ConfigManager
- from core.utils.logger import Logger
- from core.exceptions.security_test_exception import SecurityTestException
- class SecurityTester:
- def __init__(self):
- self.config = ConfigManager()
- self.logger = Logger.get_logger()
- self.base_url = self.config.get('api.base_url', 'https://example.com')
- self.timeout = self.config.get('api.timeout', 30)
- def run_tests(self):
- """运行所有安全测试"""
- self.logger.info("开始执行安全测试")
- test_results = []
- # SQL注入测试
- test_results.extend(self.run_sql_injection_tests())
- # XSS测试
- test_results.extend(self.run_xss_tests())
- # CSRF测试
- test_results.extend(self.run_csrf_tests())
- # 敏感信息泄露测试
- test_results.extend(self.run_info_leakage_tests())
- # 认证和授权测试
- test_results.extend(self.run_auth_tests())
- self.logger.info(f"安全测试完成,共执行 {len(test_results)} 个测试用例")
- return test_results
- def run_sql_injection_tests(self):
- """运行SQL注入测试"""
- self.logger.info("执行SQL注入测试")
- results = []
- # 测试SQL注入 payloads
- sql_payloads = [
- "' OR '1'='1",
- "' OR '1'='1' --",
- "' UNION SELECT NULL, username, password FROM users --",
- "'; DROP TABLE users; --",
- "' OR 1=1; --",
- "admin' --",
- "admin' #",
- "' OR 'a'='a",
- "' OR 1=1 LIMIT 1 --",
- "' OR ''='"
- ]
- # 测试端点(根据实际应用调整)
- test_endpoints = [
- "/login",
- "/search",
- "/user/profile",
- "/products"
- ]
- for endpoint in test_endpoints:
- url = urljoin(self.base_url, endpoint)
- for payload in sql_payloads:
- test_name = f"SQL注入测试 - {endpoint} - {payload}"
- try:
- # 测试GET参数
- params = {'q': payload, 'id': payload}
- response = requests.get(url, params=params, timeout=self.timeout)
- # 检查响应中是否包含数据库错误信息
- db_errors = [
- "sql syntax", "mysql_fetch", "ora-01756",
- "postgresql", "microsoft odbc", "odbc driver",
- "jdbc", "database error", "syntax error"
- ]
- vulnerability_found = any(error in response.text.lower() for error in db_errors)
- if vulnerability_found:
- result = {
- 'name': test_name,
- 'status': 'FAIL',
- 'message': f"可能的SQL注入漏洞发现于 {url}",
- 'payload': payload,
- 'response_code': response.status_code
- }
- else:
- result = {
- 'name': test_name,
- 'status': 'PASS',
- 'message': f"未发现SQL注入漏洞于 {url}",
- 'payload': payload,
- 'response_code': response.status_code
- }
- results.append(result)
- except Exception as e:
- result = {
- 'name': test_name,
- 'status': 'ERROR',
- 'message': f"测试执行出错: {str(e)}",
- 'payload': payload
- }
- results.append(result)
- return results
- def run_xss_tests(self):
- """运行XSS测试"""
- self.logger.info("执行XSS测试")
- results = []
- # XSS测试 payloads
- xss_payloads = [
- "<script>alert('XSS')</script>",
- "<img src=x onerror=alert('XSS')>",
- "<svg onload=alert('XSS')>",
- "javascript:alert('XSS')",
- "<body onload=alert('XSS')>",
- "<iframe src=javascript:alert('XSS')>",
- "<input onfocus=alert('XSS') autofocus>",
- "<details open ontoggle=alert('XSS')>",
- "<select onfocus=alert('XSS') autofocus>"
- ]
- # 测试端点
- test_endpoints = [
- "/search",
- "/comment",
- "/contact",
- "/profile"
- ]
- for endpoint in test_endpoints:
- url = urljoin(self.base_url, endpoint)
- for payload in xss_payloads:
- test_name = f"XSS测试 - {endpoint} - {payload}"
- try:
- # 测试POST数据
- data = {'comment': payload, 'name': payload, 'search': payload}
- response = requests.post(url, data=data, timeout=self.timeout)
- # 检查响应中是否包含未转义的payload
- vulnerability_found = payload in response.text
- if vulnerability_found:
- result = {
- 'name': test_name,
- 'status': 'FAIL',
- 'message': f"可能的XSS漏洞发现于 {url}",
- 'payload': payload,
- 'response_code': response.status_code
- }
- else:
- result = {
- 'name': test_name,
- 'status': 'PASS',
- 'message': f"未发现XSS漏洞于 {url}",
- 'payload': payload,
- 'response_code': response.status_code
- }
- results.append(result)
- except Exception as e:
- result = {
- 'name': test_name,
- 'status': 'ERROR',
- 'message': f"测试执行出错: {str(e)}",
- 'payload': payload
- }
- results.append(result)
- return results
- def run_csrf_tests(self):
- """运行CSRF测试"""
- self.logger.info("执行CSRF测试")
- results = []
- # 测试需要身份验证的端点
- test_endpoints = [
- "/user/change-password",
- "/user/update-profile",
- "/admin/delete-user"
- ]
- for endpoint in test_endpoints:
- url = urljoin(self.base_url, endpoint)
- test_name = f"CSRF测试 - {endpoint}"
- try:
- # 尝试在没有Referer头的情况下发送请求
- headers = {'Referer': ''}
- response = requests.post(url, headers=headers, timeout=self.timeout)
- # 如果请求成功,可能缺乏CSRF保护
- if response.status_code < 400:
- result = {
- 'name': test_name,
- 'status': 'FAIL',
- 'message': f"可能的CSRF漏洞发现于 {url},请求在没有Referer的情况下成功",
- 'response_code': response.status_code
- }
- else:
- result = {
- 'name': test_name,
- 'status': 'PASS',
- 'message': f"未发现CSRF漏洞于 {url},请求被正确拒绝",
- 'response_code': response.status_code
- }
- results.append(result)
- except Exception as e:
- result = {
- 'name': test_name,
- 'status': 'ERROR',
- 'message': f"测试执行出错: {str(e)}"
- }
- results.append(result)
- return results
- def run_info_leakage_tests(self):
- """运行敏感信息泄露测试"""
- self.logger.info("执行敏感信息泄露测试")
- results = []
- # 常见敏感文件和目录
- sensitive_paths = [
- "/.env",
- "/.git/config",
- "/.htaccess",
- "/web.config",
- "/phpinfo.php",
- "/admin",
- "/backup",
- "/logs",
- "/config.json",
- "/database.yml"
- ]
- for path in sensitive_paths:
- url = urljoin(self.base_url, path)
- test_name = f"敏感信息泄露测试 - {path}"
- try:
- response = requests.get(url, timeout=self.timeout)
- # 检查响应状态码和内容
- if response.status_code == 200:
- # 检查响应内容是否包含敏感信息
- sensitive_keywords = [
- "password", "secret", "key", "token",
- "database", "user", "admin", "config"
- ]
- content = response.text.lower()
- info_leakage = any(keyword in content for keyword in sensitive_keywords)
- if info_leakage:
- result = {
- 'name': test_name,
- 'status': 'FAIL',
- 'message': f"敏感信息可能泄露于 {url}",
- 'response_code': response.status_code
- }
- else:
- result = {
- 'name': test_name,
- 'status': 'PASS',
- 'message': f"未发现敏感信息泄露于 {url}",
- 'response_code': response.status_code
- }
- else:
- result = {
- 'name': test_name,
- 'status': 'PASS',
- 'message': f"敏感路径 {path} 不可访问",
- 'response_code': response.status_code
- }
- results.append(result)
- except Exception as e:
- result = {
- 'name': test_name,
- 'status': 'ERROR',
- 'message': f"测试执行出错: {str(e)}"
- }
- results.append(result)
- return results
- def run_auth_tests(self):
- """运行认证和授权测试"""
- self.logger.info("执行认证和授权测试")
- results = []
- # 需要认证的端点
- protected_endpoints = [
- "/admin/dashboard",
- "/user/profile",
- "/api/user/data"
- ]
- for endpoint in protected_endpoints:
- url = urljoin(self.base_url, endpoint)
- test_name = f"认证测试 - {endpoint}"
- try:
- # 尝试未经认证访问
- response = requests.get(url, timeout=self.timeout)
- # 检查是否重定向到登录页或返回401/403
- if response.status_code in [200, 301, 302]:
- # 检查是否重定向到登录页面
- if 'login' in response.url.lower() or response.history:
- result = {
- 'name': test_name,
- 'status': 'PASS',
- 'message': f"认证保护正常工作于 {url}",
- 'response_code': response.status_code
- }
- else:
- result = {
- 'name': test_name,
- 'status': 'FAIL',
- 'message': f"认证保护可能失效于 {url},未经认证可以访问",
- 'response_code': response.status_code
- }
- elif response.status_code in [401, 403]:
- result = {
- 'name': test_name,
- 'status': 'PASS',
- 'message': f"认证保护正常工作于 {url}",
- 'response_code': response.status_code
- }
- else:
- result = {
- 'name': test_name,
- 'status': 'WARN',
- 'message': f"认证测试返回意外状态码: {response.status_code}",
- 'response_code': response.status_code
- }
- results.append(result)
- except Exception as e:
- result = {
- 'name': test_name,
- 'status': 'ERROR',
- 'message': f"测试执行出错: {str(e)}"
- }
- results.append(result)
- return results
|