import requests
import json
from urllib.parse import urljoin
from core.config.config_manager import ConfigManager
from core.utils.logger import Logger
from core.exceptions.security_test_exception import SecurityTestException
class SecurityTester:
def __init__(self):
self.config = ConfigManager()
self.logger = Logger.get_logger()
self.base_url = self.config.get('api.base_url', 'https://example.com')
self.timeout = self.config.get('api.timeout', 30)
def run_tests(self):
"""运行所有安全测试"""
self.logger.info("开始执行安全测试")
test_results = []
# SQL注入测试
test_results.extend(self.run_sql_injection_tests())
# XSS测试
test_results.extend(self.run_xss_tests())
# CSRF测试
test_results.extend(self.run_csrf_tests())
# 敏感信息泄露测试
test_results.extend(self.run_info_leakage_tests())
# 认证和授权测试
test_results.extend(self.run_auth_tests())
self.logger.info(f"安全测试完成,共执行 {len(test_results)} 个测试用例")
return test_results
def run_sql_injection_tests(self):
"""运行SQL注入测试"""
self.logger.info("执行SQL注入测试")
results = []
# 测试SQL注入 payloads
sql_payloads = [
"' OR '1'='1",
"' OR '1'='1' --",
"' UNION SELECT NULL, username, password FROM users --",
"'; DROP TABLE users; --",
"' OR 1=1; --",
"admin' --",
"admin' #",
"' OR 'a'='a",
"' OR 1=1 LIMIT 1 --",
"' OR ''='"
]
# 测试端点(根据实际应用调整)
test_endpoints = [
"/login",
"/search",
"/user/profile",
"/products"
]
for endpoint in test_endpoints:
url = urljoin(self.base_url, endpoint)
for payload in sql_payloads:
test_name = f"SQL注入测试 - {endpoint} - {payload}"
try:
# 测试GET参数
params = {'q': payload, 'id': payload}
response = requests.get(url, params=params, timeout=self.timeout)
# 检查响应中是否包含数据库错误信息
db_errors = [
"sql syntax", "mysql_fetch", "ora-01756",
"postgresql", "microsoft odbc", "odbc driver",
"jdbc", "database error", "syntax error"
]
vulnerability_found = any(error in response.text.lower() for error in db_errors)
if vulnerability_found:
result = {
'name': test_name,
'status': 'FAIL',
'message': f"可能的SQL注入漏洞发现于 {url}",
'payload': payload,
'response_code': response.status_code
}
else:
result = {
'name': test_name,
'status': 'PASS',
'message': f"未发现SQL注入漏洞于 {url}",
'payload': payload,
'response_code': response.status_code
}
results.append(result)
except Exception as e:
result = {
'name': test_name,
'status': 'ERROR',
'message': f"测试执行出错: {str(e)}",
'payload': payload
}
results.append(result)
return results
def run_xss_tests(self):
"""运行XSS测试"""
self.logger.info("执行XSS测试")
results = []
# XSS测试 payloads
xss_payloads = [
"",
"
",
"