import requests import json from urllib.parse import urljoin from core.config.config_manager import ConfigManager from core.utils.logger import Logger from core.exceptions.security_test_exception import SecurityTestException class SecurityTester: def __init__(self): self.config = ConfigManager() self.logger = Logger.get_logger() self.base_url = self.config.get('api.base_url', 'https://example.com') self.timeout = self.config.get('api.timeout', 30) def run_tests(self): """运行所有安全测试""" self.logger.info("开始执行安全测试") test_results = [] # SQL注入测试 test_results.extend(self.run_sql_injection_tests()) # XSS测试 test_results.extend(self.run_xss_tests()) # CSRF测试 test_results.extend(self.run_csrf_tests()) # 敏感信息泄露测试 test_results.extend(self.run_info_leakage_tests()) # 认证和授权测试 test_results.extend(self.run_auth_tests()) self.logger.info(f"安全测试完成,共执行 {len(test_results)} 个测试用例") return test_results def run_sql_injection_tests(self): """运行SQL注入测试""" self.logger.info("执行SQL注入测试") results = [] # 测试SQL注入 payloads sql_payloads = [ "' OR '1'='1", "' OR '1'='1' --", "' UNION SELECT NULL, username, password FROM users --", "'; DROP TABLE users; --", "' OR 1=1; --", "admin' --", "admin' #", "' OR 'a'='a", "' OR 1=1 LIMIT 1 --", "' OR ''='" ] # 测试端点(根据实际应用调整) test_endpoints = [ "/login", "/search", "/user/profile", "/products" ] for endpoint in test_endpoints: url = urljoin(self.base_url, endpoint) for payload in sql_payloads: test_name = f"SQL注入测试 - {endpoint} - {payload}" try: # 测试GET参数 params = {'q': payload, 'id': payload} response = requests.get(url, params=params, timeout=self.timeout) # 检查响应中是否包含数据库错误信息 db_errors = [ "sql syntax", "mysql_fetch", "ora-01756", "postgresql", "microsoft odbc", "odbc driver", "jdbc", "database error", "syntax error" ] vulnerability_found = any(error in response.text.lower() for error in db_errors) if vulnerability_found: result = { 'name': test_name, 'status': 'FAIL', 'message': f"可能的SQL注入漏洞发现于 {url}", 'payload': payload, 'response_code': response.status_code } else: result = { 'name': test_name, 'status': 'PASS', 'message': f"未发现SQL注入漏洞于 {url}", 'payload': payload, 'response_code': response.status_code } results.append(result) except Exception as e: result = { 'name': test_name, 'status': 'ERROR', 'message': f"测试执行出错: {str(e)}", 'payload': payload } results.append(result) return results def run_xss_tests(self): """运行XSS测试""" self.logger.info("执行XSS测试") results = [] # XSS测试 payloads xss_payloads = [ "", "", "", "javascript:alert('XSS')", "", "