security_tester.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. import requests
  2. import json
  3. from urllib.parse import urljoin
  4. from core.config.config_manager import ConfigManager
  5. from core.utils.logger import Logger
  6. from core.exceptions.security_test_exception import SecurityTestException
  7. class SecurityTester:
  8. def __init__(self):
  9. self.config = ConfigManager()
  10. self.logger = Logger.get_logger()
  11. self.base_url = self.config.get('api.base_url', 'https://example.com')
  12. self.timeout = self.config.get('api.timeout', 30)
  13. def run_tests(self):
  14. """运行所有安全测试"""
  15. self.logger.info("开始执行安全测试")
  16. test_results = []
  17. # SQL注入测试
  18. test_results.extend(self.run_sql_injection_tests())
  19. # XSS测试
  20. test_results.extend(self.run_xss_tests())
  21. # CSRF测试
  22. test_results.extend(self.run_csrf_tests())
  23. # 敏感信息泄露测试
  24. test_results.extend(self.run_info_leakage_tests())
  25. # 认证和授权测试
  26. test_results.extend(self.run_auth_tests())
  27. self.logger.info(f"安全测试完成,共执行 {len(test_results)} 个测试用例")
  28. return test_results
  29. def run_sql_injection_tests(self):
  30. """运行SQL注入测试"""
  31. self.logger.info("执行SQL注入测试")
  32. results = []
  33. # 测试SQL注入 payloads
  34. sql_payloads = [
  35. "' OR '1'='1",
  36. "' OR '1'='1' --",
  37. "' UNION SELECT NULL, username, password FROM users --",
  38. "'; DROP TABLE users; --",
  39. "' OR 1=1; --",
  40. "admin' --",
  41. "admin' #",
  42. "' OR 'a'='a",
  43. "' OR 1=1 LIMIT 1 --",
  44. "' OR ''='"
  45. ]
  46. # 测试端点(根据实际应用调整)
  47. test_endpoints = [
  48. "/login",
  49. "/search",
  50. "/user/profile",
  51. "/products"
  52. ]
  53. for endpoint in test_endpoints:
  54. url = urljoin(self.base_url, endpoint)
  55. for payload in sql_payloads:
  56. test_name = f"SQL注入测试 - {endpoint} - {payload}"
  57. try:
  58. # 测试GET参数
  59. params = {'q': payload, 'id': payload}
  60. response = requests.get(url, params=params, timeout=self.timeout)
  61. # 检查响应中是否包含数据库错误信息
  62. db_errors = [
  63. "sql syntax", "mysql_fetch", "ora-01756",
  64. "postgresql", "microsoft odbc", "odbc driver",
  65. "jdbc", "database error", "syntax error"
  66. ]
  67. vulnerability_found = any(error in response.text.lower() for error in db_errors)
  68. if vulnerability_found:
  69. result = {
  70. 'name': test_name,
  71. 'status': 'FAIL',
  72. 'message': f"可能的SQL注入漏洞发现于 {url}",
  73. 'payload': payload,
  74. 'response_code': response.status_code
  75. }
  76. else:
  77. result = {
  78. 'name': test_name,
  79. 'status': 'PASS',
  80. 'message': f"未发现SQL注入漏洞于 {url}",
  81. 'payload': payload,
  82. 'response_code': response.status_code
  83. }
  84. results.append(result)
  85. except Exception as e:
  86. result = {
  87. 'name': test_name,
  88. 'status': 'ERROR',
  89. 'message': f"测试执行出错: {str(e)}",
  90. 'payload': payload
  91. }
  92. results.append(result)
  93. return results
  94. def run_xss_tests(self):
  95. """运行XSS测试"""
  96. self.logger.info("执行XSS测试")
  97. results = []
  98. # XSS测试 payloads
  99. xss_payloads = [
  100. "<script>alert('XSS')</script>",
  101. "<img src=x onerror=alert('XSS')>",
  102. "<svg onload=alert('XSS')>",
  103. "javascript:alert('XSS')",
  104. "<body onload=alert('XSS')>",
  105. "<iframe src=javascript:alert('XSS')>",
  106. "<input onfocus=alert('XSS') autofocus>",
  107. "<details open ontoggle=alert('XSS')>",
  108. "<select onfocus=alert('XSS') autofocus>"
  109. ]
  110. # 测试端点
  111. test_endpoints = [
  112. "/search",
  113. "/comment",
  114. "/contact",
  115. "/profile"
  116. ]
  117. for endpoint in test_endpoints:
  118. url = urljoin(self.base_url, endpoint)
  119. for payload in xss_payloads:
  120. test_name = f"XSS测试 - {endpoint} - {payload}"
  121. try:
  122. # 测试POST数据
  123. data = {'comment': payload, 'name': payload, 'search': payload}
  124. response = requests.post(url, data=data, timeout=self.timeout)
  125. # 检查响应中是否包含未转义的payload
  126. vulnerability_found = payload in response.text
  127. if vulnerability_found:
  128. result = {
  129. 'name': test_name,
  130. 'status': 'FAIL',
  131. 'message': f"可能的XSS漏洞发现于 {url}",
  132. 'payload': payload,
  133. 'response_code': response.status_code
  134. }
  135. else:
  136. result = {
  137. 'name': test_name,
  138. 'status': 'PASS',
  139. 'message': f"未发现XSS漏洞于 {url}",
  140. 'payload': payload,
  141. 'response_code': response.status_code
  142. }
  143. results.append(result)
  144. except Exception as e:
  145. result = {
  146. 'name': test_name,
  147. 'status': 'ERROR',
  148. 'message': f"测试执行出错: {str(e)}",
  149. 'payload': payload
  150. }
  151. results.append(result)
  152. return results
  153. def run_csrf_tests(self):
  154. """运行CSRF测试"""
  155. self.logger.info("执行CSRF测试")
  156. results = []
  157. # 测试需要身份验证的端点
  158. test_endpoints = [
  159. "/user/change-password",
  160. "/user/update-profile",
  161. "/admin/delete-user"
  162. ]
  163. for endpoint in test_endpoints:
  164. url = urljoin(self.base_url, endpoint)
  165. test_name = f"CSRF测试 - {endpoint}"
  166. try:
  167. # 尝试在没有Referer头的情况下发送请求
  168. headers = {'Referer': ''}
  169. response = requests.post(url, headers=headers, timeout=self.timeout)
  170. # 如果请求成功,可能缺乏CSRF保护
  171. if response.status_code < 400:
  172. result = {
  173. 'name': test_name,
  174. 'status': 'FAIL',
  175. 'message': f"可能的CSRF漏洞发现于 {url},请求在没有Referer的情况下成功",
  176. 'response_code': response.status_code
  177. }
  178. else:
  179. result = {
  180. 'name': test_name,
  181. 'status': 'PASS',
  182. 'message': f"未发现CSRF漏洞于 {url},请求被正确拒绝",
  183. 'response_code': response.status_code
  184. }
  185. results.append(result)
  186. except Exception as e:
  187. result = {
  188. 'name': test_name,
  189. 'status': 'ERROR',
  190. 'message': f"测试执行出错: {str(e)}"
  191. }
  192. results.append(result)
  193. return results
  194. def run_info_leakage_tests(self):
  195. """运行敏感信息泄露测试"""
  196. self.logger.info("执行敏感信息泄露测试")
  197. results = []
  198. # 常见敏感文件和目录
  199. sensitive_paths = [
  200. "/.env",
  201. "/.git/config",
  202. "/.htaccess",
  203. "/web.config",
  204. "/phpinfo.php",
  205. "/admin",
  206. "/backup",
  207. "/logs",
  208. "/config.json",
  209. "/database.yml"
  210. ]
  211. for path in sensitive_paths:
  212. url = urljoin(self.base_url, path)
  213. test_name = f"敏感信息泄露测试 - {path}"
  214. try:
  215. response = requests.get(url, timeout=self.timeout)
  216. # 检查响应状态码和内容
  217. if response.status_code == 200:
  218. # 检查响应内容是否包含敏感信息
  219. sensitive_keywords = [
  220. "password", "secret", "key", "token",
  221. "database", "user", "admin", "config"
  222. ]
  223. content = response.text.lower()
  224. info_leakage = any(keyword in content for keyword in sensitive_keywords)
  225. if info_leakage:
  226. result = {
  227. 'name': test_name,
  228. 'status': 'FAIL',
  229. 'message': f"敏感信息可能泄露于 {url}",
  230. 'response_code': response.status_code
  231. }
  232. else:
  233. result = {
  234. 'name': test_name,
  235. 'status': 'PASS',
  236. 'message': f"未发现敏感信息泄露于 {url}",
  237. 'response_code': response.status_code
  238. }
  239. else:
  240. result = {
  241. 'name': test_name,
  242. 'status': 'PASS',
  243. 'message': f"敏感路径 {path} 不可访问",
  244. 'response_code': response.status_code
  245. }
  246. results.append(result)
  247. except Exception as e:
  248. result = {
  249. 'name': test_name,
  250. 'status': 'ERROR',
  251. 'message': f"测试执行出错: {str(e)}"
  252. }
  253. results.append(result)
  254. return results
  255. def run_auth_tests(self):
  256. """运行认证和授权测试"""
  257. self.logger.info("执行认证和授权测试")
  258. results = []
  259. # 需要认证的端点
  260. protected_endpoints = [
  261. "/admin/dashboard",
  262. "/user/profile",
  263. "/api/user/data"
  264. ]
  265. for endpoint in protected_endpoints:
  266. url = urljoin(self.base_url, endpoint)
  267. test_name = f"认证测试 - {endpoint}"
  268. try:
  269. # 尝试未经认证访问
  270. response = requests.get(url, timeout=self.timeout)
  271. # 检查是否重定向到登录页或返回401/403
  272. if response.status_code in [200, 301, 302]:
  273. # 检查是否重定向到登录页面
  274. if 'login' in response.url.lower() or response.history:
  275. result = {
  276. 'name': test_name,
  277. 'status': 'PASS',
  278. 'message': f"认证保护正常工作于 {url}",
  279. 'response_code': response.status_code
  280. }
  281. else:
  282. result = {
  283. 'name': test_name,
  284. 'status': 'FAIL',
  285. 'message': f"认证保护可能失效于 {url},未经认证可以访问",
  286. 'response_code': response.status_code
  287. }
  288. elif response.status_code in [401, 403]:
  289. result = {
  290. 'name': test_name,
  291. 'status': 'PASS',
  292. 'message': f"认证保护正常工作于 {url}",
  293. 'response_code': response.status_code
  294. }
  295. else:
  296. result = {
  297. 'name': test_name,
  298. 'status': 'WARN',
  299. 'message': f"认证测试返回意外状态码: {response.status_code}",
  300. 'response_code': response.status_code
  301. }
  302. results.append(result)
  303. except Exception as e:
  304. result = {
  305. 'name': test_name,
  306. 'status': 'ERROR',
  307. 'message': f"测试执行出错: {str(e)}"
  308. }
  309. results.append(result)
  310. return results