Sfoglia il codice sorgente

增加邮件功能

BrainZhang 1 mese fa
parent
commit
2c6340593d

+ 14 - 1
Config/Config.py

@@ -1,5 +1,18 @@
 import logging
+from pathlib import Path
+
 
 class Config(object):
     log_level = logging.DEBUG
-    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
+    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
+# 基础路径配置
+BASE_DIR = Path(__file__).resolve().parent.parent
+
+# 测试相关配置
+TESTS_DIR = BASE_DIR / "Tests"
+ALLURE_RESULTS_DIR = BASE_DIR / "Reports" / "allure-results"
+ALLURE_REPORT_DIR = BASE_DIR / "Reports" / "allure-report"
+REPORT_ARCHIVE_DIR = BASE_DIR / "Reports" / "archives"
+# 确保目录存在
+for directory in [ALLURE_RESULTS_DIR, ALLURE_REPORT_DIR, REPORT_ARCHIVE_DIR]:
+    directory.mkdir(parents=True, exist_ok=True)

+ 26 - 0
Config/EmailConfig.py

@@ -0,0 +1,26 @@
+from datetime import datetime
+
+# 邮件配置
+EMAIL_CONFIG = {
+    "recipients": ["recipient1@example.com", "recipient2@example.com"],
+    "subject": f"自动化测试报告 - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
+    "sender_name": "自动化测试系统",
+    "cc_recipients": ["zhangyu1999520@outlook.com"],  # 抄送列表
+    "bcc_recipients": [],  # 密送列表
+}
+
+# SMTP服务器配置
+SMTP_CONFIG = {
+    "smtp_server": "smtp.office365.com",  # Outlook SMTP服务器
+    "smtp_port": 587,  # Outlook使用的端口
+    "sender_email": "zhangyuroot@outlook.com",  # 发件人邮箱
+    "sender_password": "zhangyu923",  # 发件人密码或应用密码
+    "use_tls": True,  # 是否使用TLS加密
+}
+
+# 邮件模板配置
+EMAIL_TEMPLATE_CONFIG = {
+    "template_path": "templates/email_template.html",
+    "company_name": "Your Company",
+    "team_name": "QA Team",
+}

+ 101 - 0
Run.py

@@ -0,0 +1,101 @@
+import sys
+from datetime import datetime
+from pathlib import Path
+
+from TestCore.email_sender import EmailSender
+from TestCore.report_generator import ReportGenerator
+from TestCore.result_parser import TestResultParser
+from TestCore.test_runner import TestRunner
+
+# 添加项目根目录到Python路径
+sys.path.append(str(Path(__file__).parent))
+
+from Config.Config import ALLURE_RESULTS_DIR, ALLURE_REPORT_DIR, REPORT_ARCHIVE_DIR
+from Utils.FileUtils import zip_directory, create_archive_filename, cleanup_old_archives
+from Utils.Log import log_for_api
+logger = log_for_api()
+
+
+class AllureReportSystem:
+    """Allure测试报告系统主类"""
+
+    def __init__(self):
+        self.test_runner = TestRunner()
+        self.report_generator = ReportGenerator()
+        self.result_parser = TestResultParser(ALLURE_RESULTS_DIR)
+        self.email_sender = EmailSender()
+
+        # 报告信息
+        self.report_info = {
+            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        }
+
+    def run(self, send_email=True):
+        """运行完整的测试报告流程"""
+        logger.info("=" * 50)
+        logger.info("开始执行自动化测试流程")
+        logger.info("=" * 50)
+
+        # 1. 运行测试
+        logger.info("步骤 1: 运行测试")
+        returncode, stdout, stderr = self.test_runner.run_tests()
+
+        if returncode not in [0, 1]:  # 0: 全部通过, 1: 有失败用例
+            logger.error("测试执行失败,终止流程")
+            return False
+
+        # 2. 解析测试结果
+        logger.info("步骤 2: 解析测试结果")
+        test_results = self.result_parser.parse_pytest_output(stdout)
+        logger.info(f"测试结果: {test_results}")
+
+        # 3. 生成Allure报告
+        logger.info("步骤 3: 生成Allure报告")
+        if not self.report_generator.is_allure_available():
+            logger.error("Allure命令行工具不可用,跳过报告生成")
+        else:
+            report_generated = self.report_generator.generate_allure_report()
+            if not report_generated:
+                logger.error("生成Allure报告失败")
+
+        # 4. 压缩报告
+        logger.info("步骤 4: 压缩报告")
+        archive_filename = create_archive_filename()
+        archive_path = REPORT_ARCHIVE_DIR / archive_filename
+
+        zip_success = zip_directory(ALLURE_REPORT_DIR, archive_path)
+
+        if not zip_success:
+            logger.error("压缩报告失败")
+            archive_path = None
+
+        # 5. 发送邮件
+        if send_email and archive_path:
+            logger.info("步骤 5: 发送邮件")
+            email_sent = self.email_sender.send_email(test_results, self.report_info, str(archive_path))
+
+            if email_sent:
+                logger.info("邮件发送成功")
+            else:
+                logger.error("邮件发送失败")
+
+        # 6. 清理旧归档
+        logger.info("步骤 6: 清理旧归档")
+        cleanup_old_archives(REPORT_ARCHIVE_DIR)
+
+        logger.info("=" * 50)
+        logger.info("自动化测试流程执行完成")
+        logger.info("=" * 50)
+
+        return True
+
+
+def main():
+    """主函数"""
+    system = AllureReportSystem()
+    success = system.run()
+    sys.exit(0 if success else 1)
+
+
+if __name__ == "__main__":
+    main()

+ 0 - 0
TestCore/__init__.py


+ 123 - 0
TestCore/email_sender.py

@@ -0,0 +1,123 @@
+import smtplib
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email.mime.base import MIMEBase
+from email import encoders
+import os
+from jinja2 import Template
+from pathlib import Path
+from Config.EmailConfig import EMAIL_CONFIG, EMAIL_TEMPLATE_CONFIG, SMTP_CONFIG
+from Utils.Log import log_for_api
+import requests
+import json
+
+logger = log_for_api()
+
+
+class EmailSender:
+    """邮件发送器 - 使用SMTP协议与OAuth 2.0认证"""
+
+    def __init__(self):
+        self.config = EMAIL_CONFIG
+        self.template_config = EMAIL_TEMPLATE_CONFIG
+        self.smtp_config = SMTP_CONFIG
+        self.access_token = None
+
+    def get_oauth2_token(self):
+        """获取OAuth 2.0访问令牌"""
+        try:
+            # 这里需要替换为你的Azure应用注册信息
+            client_id = self.smtp_config.get("client_id", "")
+            client_secret = self.smtp_config.get("client_secret", "")
+            tenant_id = self.smtp_config.get("tenant_id", "")
+            scope = "https://outlook.office365.com/.default"
+
+            token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
+
+            data = {
+                'client_id': client_id,
+                'scope': scope,
+                'client_secret': client_secret,
+                'grant_type': 'client_credentials'
+            }
+
+            response = requests.post(token_url, data=data)
+            response.raise_for_status()
+
+            token_data = response.json()
+            self.access_token = token_data['access_token']
+            return True
+
+        except Exception as e:
+            logger.error(f"获取OAuth 2.0令牌时出错: {e}")
+            return False
+
+    def render_email_template(self, test_results, report_info):
+        """渲染邮件HTML模板"""
+        # 保持不变...
+
+    def send_email(self, test_results, report_info, attachment_path=None):
+        """通过SMTP发送邮件(使用OAuth 2.0认证)"""
+        try:
+            # 获取OAuth 2.0访问令牌
+            if not self.get_oauth2_token():
+                logger.error("无法获取OAuth 2.0访问令牌")
+                return False
+
+            # 创建邮件消息
+            msg = MIMEMultipart()
+            msg['From'] = self.smtp_config["sender_email"]
+            msg['To'] = ', '.join(self.config["recipients"])
+            msg['Subject'] = self.config["subject"]
+
+            # 如果有抄送和密送
+            if self.config["cc_recipients"]:
+                msg['Cc'] = ', '.join(self.config["cc_recipients"])
+
+            # 渲染HTML内容
+            html_body = self.render_email_template(test_results, report_info)
+            msg.attach(MIMEText(html_body, 'html'))
+
+            # 添加附件
+            if attachment_path and os.path.exists(attachment_path):
+                part = MIMEBase('application', 'octet-stream')
+                with open(attachment_path, 'rb') as file:
+                    part.set_payload(file.read())
+                encoders.encode_base64(part)
+                part.add_header(
+                    'Content-Disposition',
+                    f'attachment; filename={os.path.basename(attachment_path)}'
+                )
+                msg.attach(part)
+                logger.info(f"已添加附件: {attachment_path}")
+
+            # 连接SMTP服务器并发送邮件
+            with smtplib.SMTP(self.smtp_config["smtp_server"], self.smtp_config["smtp_port"]) as server:
+                server.ehlo()
+                server.starttls()
+                server.ehlo()
+
+                # 使用OAuth 2.0认证
+                auth_string = f"user={self.smtp_config['sender_email']}\x01auth=Bearer {self.access_token}\x01\x01"
+                auth_string = auth_string.encode()
+                server.docmd("AUTH", "XOAUTH2 " + auth_string.decode())
+
+                # 发送邮件
+                all_recipients = (
+                        self.config["recipients"] +
+                        self.config["cc_recipients"] +
+                        self.config["bcc_recipients"]
+                )
+
+                server.sendmail(
+                    self.smtp_config["sender_email"],
+                    all_recipients,
+                    msg.as_string()
+                )
+
+            logger.info("邮件已成功发送!")
+            return True
+
+        except Exception as e:
+            logger.error(f"发送邮件时出错: {e}")
+            return False

+ 66 - 0
TestCore/report_generator.py

@@ -0,0 +1,66 @@
+import subprocess
+from pathlib import Path
+from Config.Config import ALLURE_RESULTS_DIR, ALLURE_REPORT_DIR
+from Utils.Log import log_for_api
+logger = log_for_api()
+
+
+class ReportGenerator:
+    """报告生成器"""
+
+    def __init__(self, results_dir=None, report_dir=None):
+        self.results_dir = Path(results_dir) if results_dir else ALLURE_RESULTS_DIR
+        self.report_dir = Path(report_dir) if report_dir else ALLURE_REPORT_DIR
+
+        # 确保目录存在
+        self.report_dir.mkdir(parents=True, exist_ok=True)
+
+    def generate_allure_report(self):
+        """生成Allure报告"""
+        # 构建Allure命令
+        allure_cmd = [
+            "allure", "generate",
+            str(self.results_dir),
+            "-o", str(self.report_dir),
+            "--clean"
+        ]
+
+        logger.info(f"生成Allure报告命令: {' '.join(allure_cmd)}")
+
+        # 执行命令
+        try:
+            process = subprocess.Popen(
+                allure_cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                universal_newlines=True,
+                encoding='utf-8',
+                errors='replace'
+            )
+
+            stdout, stderr = process.communicate()
+
+            # 记录输出
+            if stdout:
+                logger.info(f"Allure输出:\n{stdout}")
+            if stderr:
+                logger.error(f"Allure错误:\n{stderr}")
+
+            return process.returncode == 0
+
+        except Exception as e:
+            logger.error(f"生成Allure报告时发生异常: {e}")
+            return False
+
+    def is_allure_available(self):
+        """检查Allure命令行工具是否可用"""
+        try:
+            process = subprocess.Popen(
+                ["allure", "--version"],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE
+            )
+            process.communicate()
+            return process.returncode == 0
+        except:
+            return False

+ 82 - 0
TestCore/result_parser.py

@@ -0,0 +1,82 @@
+import re
+import json
+from pathlib import Path
+from Utils.Log import log_for_api
+logger = log_for_api()
+
+
+class TestResultParser:
+    """测试结果解析器"""
+
+    def __init__(self, results_dir):
+        self.results_dir = Path(results_dir)
+
+    def parse_pytest_output(self, output):
+        """从pytest输出中解析测试结果"""
+        results = {
+            "total": 0,
+            "passed": 0,
+            "failed": 0,
+            "skipped": 0,
+            "errors": 0,
+            "duration": 0,
+            "pass_rate": 0
+        }
+
+        # 尝试从输出中解析结果
+        try:
+            # 匹配 pytest 结果摘要行
+            pattern = r"(\d+) passed.*?(\d+) failed.*?(\d+) skipped"
+            match = re.search(pattern, output)
+
+            if match:
+                results["passed"] = int(match.group(1))
+                results["failed"] = int(match.group(2))
+                results["skipped"] = int(match.group(3))
+                results["total"] = results["passed"] + results["failed"] + results["skipped"]
+
+                # 计算通过率
+                if results["total"] > 0:
+                    results["pass_rate"] = round((results["passed"] / results["total"]) * 100, 2)
+
+            # 尝试解析持续时间
+            duration_pattern = r"in (\d+\.\d+)s"
+            duration_match = re.search(duration_pattern, output)
+            if duration_match:
+                results["duration"] = float(duration_match.group(1))
+
+        except Exception as e:
+            logger.error(f"解析pytest输出时出错: {e}")
+
+        return results
+
+    def parse_allure_results(self):
+        """从Allure结果文件中解析更详细的信息"""
+        results = {
+            "suites": [],
+            "categories": [],
+            "status_counts": {"passed": 0, "failed": 0, "broken": 0, "skipped": 0, "unknown": 0}
+        }
+
+        try:
+            # 查找所有result.json文件
+            result_files = list(self.results_dir.glob("*-result.json"))
+
+            for result_file in result_files:
+                with open(result_file, 'r', encoding='utf-8') as f:
+                    data = json.load(f)
+
+                    # 统计状态
+                    status = data.get("status", "unknown")
+                    if status in results["status_counts"]:
+                        results["status_counts"][status] += 1
+
+                    # 收集套件信息
+                    suite_name = data.get("labels", {}).get("suite", "Unknown Suite")
+                    if suite_name not in results["suites"]:
+                        results["suites"].append(suite_name)
+
+        except Exception as e:
+            logger.error(f"解析Allure结果时出错: {e}")
+
+        return results

+ 58 - 0
TestCore/test_runner.py

@@ -0,0 +1,58 @@
+import subprocess
+import sys
+from pathlib import Path
+from Config.Config import TESTS_DIR, ALLURE_RESULTS_DIR
+from Utils.Log import log_for_api
+logger = log_for_api()
+
+
+class TestRunner:
+    """测试运行器"""
+
+    def __init__(self, tests_dir=None, results_dir=None):
+        self.tests_dir = Path(tests_dir) if tests_dir else TESTS_DIR
+        self.results_dir = Path(results_dir) if results_dir else ALLURE_RESULTS_DIR
+
+        # 确保目录存在
+        self.results_dir.mkdir(parents=True, exist_ok=True)
+
+    def run_tests(self, additional_args=None):
+        """运行测试套件"""
+        if additional_args is None:
+            additional_args = []
+
+        # 构建pytest命令
+        pytest_cmd = [
+                         sys.executable, "-m", "pytest",
+                         str(self.tests_dir),
+                         f"--alluredir={self.results_dir}",
+                         "--clean-alluredir",
+                         "-v"
+                     ] + additional_args
+
+        logger.info(f"运行测试命令: {' '.join(pytest_cmd)}")
+
+        # 执行测试
+        try:
+            process = subprocess.Popen(
+                pytest_cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                universal_newlines=True,
+                encoding='utf-8',
+                errors='replace'
+            )
+
+            stdout, stderr = process.communicate()
+
+            # 记录输出
+            if stdout:
+                logger.info(f"测试输出:\n{stdout}")
+            if stderr:
+                logger.error(f"测试错误:\n{stderr}")
+
+            return process.returncode, stdout, stderr
+
+        except Exception as e:
+            logger.error(f"运行测试时发生异常: {e}")
+            return -1, "", str(e)

+ 40 - 68
Tests/API/test_api_baidu.py

@@ -2,23 +2,12 @@ import pytest
 import allure
 from Base.WebAPI.API.Base import request_handler
 
-@allure.epic("API自动化测试")
-@allure.feature("百度API测试")
 class TestBaiduAPI:
-    """百度接口测试用例"""
-
-    @allure.story("搜索功能测试")
-    @allure.title("测试百度搜索接口 - {search_keyword}")
-    @allure.severity(allure.severity_level.CRITICAL)
-    @allure.description("测试百度搜索功能,验证返回结果包含搜索关键词")
     @pytest.mark.parametrize("search_keyword", ["python接口测试", "自动化测试", "pytest"])
     def test_baidu_search(self, search_keyword):
-        """测试百度搜索接口"""
-        # 添加测试步骤
         with allure.step("准备请求参数"):
             params = {'wd': search_keyword}
             allure.attach(f"搜索关键词: {search_keyword}", name="请求参数")
-
         with allure.step("发送搜索请求"):
             # 添加更多请求头,模拟真实浏览器
             headers = {
@@ -31,16 +20,11 @@ class TestBaiduAPI:
             # 检查是否触发了安全验证
             if '安全验证' in response.text or 'verify' in response.text.lower():
                 pytest.xfail("百度安全验证被触发,跳过此测试")
-
             # 断言
             assert response.status_code == 200
             assert search_keyword in response.text
             allure.attach(f"响应内容包含关键词: {search_keyword}", name="验证结果")
 
-    @allure.story("首页访问测试")
-    @allure.title("测试百度首页访问")
-    @allure.severity(allure.severity_level.NORMAL)
-    @allure.description("测试百度首页访问,验证页面基本元素")
     def test_baidu_homepage(self):
         """测试百度首页"""
         with allure.step("发送首页请求"):
@@ -60,58 +44,46 @@ class TestBaiduAPI:
             allure.attach("首页访问成功", name="验证结果")
 
 
-@allure.epic("API自动化测试")
-@allure.feature("HTTPBin API测试")
-class TestHTTPBin:
-    """HTTPBin接口测试用例"""
-
-    @allure.story("GET请求测试")
-    @allure.title("测试HTTPBin GET接口")
-    @allure.severity(allure.severity_level.CRITICAL)
-    @allure.description("测试HTTPBin GET接口,验证参数传递和响应")
-    def test_httpbin_get(self):
-        """测试HTTPBin GET接口"""
-        with allure.step("准备请求参数"):
-            params = {
-                'test': 'python接口测试',
-                'number': 123
-            }
-            allure.attach(str(params), name="请求参数")
-
-        with allure.step("发送GET请求"):
-            response = request_handler.get('https://httpbin.org/get', params=params)
-            allure.attach(f"状态码: {response.status_code}", name="响应状态")
-
-        with allure.step("验证响应结果"):
-            assert response.status_code == 200
-            data = response.json()
-            assert data['args']['test'] == 'python接口测试'
-            assert data['args']['number'] == '123'
-            allure.attach(str(data), name="响应数据")
-
-    @allure.story("POST请求测试")
-    @allure.title("测试HTTPBin POST接口")
-    @allure.severity(allure.severity_level.CRITICAL)
-    @allure.description("测试HTTPBin POST接口,验证JSON数据传递")
-    def test_httpbin_post(self):
-        """测试HTTPBin POST接口"""
-        with allure.step("准备请求数据"):
-            data = {
-                'test': 'python接口测试',
-                'number': 123
-            }
-            allure.attach(str(data), name="请求数据")
-
-        with allure.step("发送POST请求"):
-            response = request_handler.post('https://httpbin.org/post', json=data)
-            allure.attach(f"状态码: {response.status_code}", name="响应状态")
-
-        with allure.step("验证响应结果"):
-            assert response.status_code == 200
-            response_data = response.json()
-            assert response_data['json']['test'] == 'python接口测试'
-            assert response_data['json']['number'] == 123
-            allure.attach(str(response_data), name="响应数据")
+# class TestHTTPBin:
+#     def test_httpbin_get(self):
+#         """测试HTTPBin GET接口"""
+#         with allure.step("准备请求参数"):
+#             params = {
+#                 'test': 'python接口测试',
+#                 'number': 123
+#             }
+#             allure.attach(str(params), name="请求参数")
+#
+#         with allure.step("发送GET请求"):
+#             response = request_handler.get('https://httpbin.org/get', params=params)
+#             allure.attach(f"状态码: {response.status_code}", name="响应状态")
+#
+#         with allure.step("验证响应结果"):
+#             assert response.status_code == 200
+#             data = response.json()
+#             assert data['args']['test'] == 'python接口测试'
+#             assert data['args']['number'] == '123'
+#             allure.attach(str(data), name="响应数据")
+#
+#     def test_httpbin_post(self):
+#         """测试HTTPBin POST接口"""
+#         with allure.step("准备请求数据"):
+#             data = {
+#                 'test': 'python接口测试',
+#                 'number': 123
+#             }
+#             allure.attach(str(data), name="请求数据")
+#
+#         with allure.step("发送POST请求"):
+#             response = request_handler.post('https://httpbin.org/post', json=data)
+#             allure.attach(f"状态码: {response.status_code}", name="响应状态")
+#
+#         with allure.step("验证响应结果"):
+#             assert response.status_code == 200
+#             response_data = response.json()
+#             assert response_data['json']['test'] == 'python接口测试'
+#             assert response_data['json']['number'] == 123
+#             allure.attach(str(response_data), name="响应数据")
 
 
 if __name__ == '__main__':

+ 41 - 0
Utils/FileUtils.py

@@ -0,0 +1,41 @@
+import os
+import zipfile
+from pathlib import Path
+from datetime import datetime
+from Utils.Log import log_for_api
+logger = log_for_api()
+
+def zip_directory(source_dir, output_path):
+    """将目录压缩为ZIP文件"""
+    try:
+        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
+            for root, dirs, files in os.walk(source_dir):
+                for file in files:
+                    file_path = os.path.join(root, file)
+                    # 在ZIP文件中保持相对路径
+                    arcname = os.path.relpath(file_path, source_dir)
+                    zipf.write(file_path, arcname)
+
+        logger.info(f"目录已成功压缩: {output_path}")
+        return True
+    except Exception as e:
+        logger.error(f"压缩目录时出错: {e}")
+        return False
+
+
+def create_archive_filename(base_name="allure_report"):
+    """创建带时间戳的归档文件名"""
+    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+    return f"{base_name}_{timestamp}.zip"
+
+
+def cleanup_old_archives(directory, max_files=10):
+    """清理旧的归档文件,只保留最近的一些文件"""
+    try:
+        files = sorted(Path(directory).iterdir(), key=os.path.getmtime)
+        if len(files) > max_files:
+            for file in files[:-max_files]:
+                file.unlink()
+                logger.info(f"已删除旧归档文件: {file}")
+    except Exception as e:
+        logger.error(f"清理旧归档文件时出错: {e}")

+ 79 - 103
Utils/Log.py

@@ -1,145 +1,121 @@
 import logging
-import sys
-import traceback
 from datetime import datetime
 from pathlib import Path
+import atexit
+from typing import Optional
 from Config.Config import Config
 
 
 class Logger:
-    def __init__(self, log_path: str = None, log_name: str = None):
+    def __init__(self, log_path: Optional[str] = None, log_name: Optional[str] = None):
         """
         初始化日志记录器
         Args:
             log_path: 日志存储路径,默认为项目根目录下的Log文件夹
             log_name: 日志文件名,默认为当前日期时间
         """
-        # 设置默认日志路径为项目根目录下的Log文件夹
-
+        # 1. 处理日志路径(默认项目根目录/Logs,不存在则创建)
         if log_path is None:
             project_root = self._find_project_root()
-            log_path = project_root / "Log"
-        # 创建日志目录(如果不存在)
-        self.log_path = Path(log_path)
-        self.log_path.mkdir(parents=True, exist_ok=True)
-        # 设置日志文件名
+            self.log_path = project_root / "Logs"
+        else:
+            self.log_path = Path(log_path)
+        self.log_path.mkdir(parents=True, exist_ok=True)  # 递归创建目录
+
+        # 2. 处理日志文件名(默认格式:log_年月日_时分秒.log)
         if log_name is None:
             log_name = f"log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
         self.log_file = self.log_path / log_name
-        # 配置日志记录器
-        self.logger = logging.getLogger(__name__)
+
+        # 3. 初始化日志记录器(避免多实例冲突)
+        self.logger = logging.getLogger(f"{__name__}_{id(self)}")
         self.logger.setLevel(Config.log_level)
-        # 清除现有处理器(避免重复记录)
-        self.logger.handlers.clear()
-        # 创建格式化器
-        formatter = logging.Formatter(Config.log_format)
-        # 文件处理器(记录所有级别日志)
-        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
-        file_handler.setLevel(logging.DEBUG)
-        file_handler.setFormatter(formatter)
-        self.logger.addHandler(file_handler)
-        # 控制台处理器(仅记录INFO及以上级别)
-        console_handler = logging.StreamHandler()
-        console_handler.setLevel(logging.INFO)
-        console_handler.setFormatter(formatter)
-        self.logger.addHandler(console_handler)
-        # 设置异常钩子,捕获未处理的异常
-        sys.excepthook = self._handle_exception
-        # 重定向标准输出和标准错误
-        self._redirect_stdout_stderr()
+        self.logger.handlers.clear()  # 清除默认处理器,防止重复记录
+
+        # 4. 创建文件日志处理器(核心功能,UTF-8编码避免中文乱码)
+        self.file_handler = None
+        try:
+            self.file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
+            self.file_handler.setLevel(logging.DEBUG)  # 文件记录所有级别日志
+            # 应用配置的日志格式
+            formatter = logging.Formatter(Config.log_format)
+            self.file_handler.setFormatter(formatter)
+            self.logger.addHandler(self.file_handler)
+        except Exception as e:
+            # 日志文件创建失败时,仅简单打印(无控制台日志,故用原始print)
+            print(f"创建日志文件失败: {e}")
+
+        # 5. 注册退出清理函数(程序结束时关闭日志文件)
+        atexit.register(self.cleanup)
+
+        # 6. 记录初始化完成日志
+        if self._is_logging_available():
+            self.logger.info(f"日志系统初始化完成,日志文件: {self.log_file}")
 
     def _find_project_root(self):
-        """查找项目根目录"""
-        # 查找包含.git、.project、setup.py或requirements.txt等标记文件的目录
+        """查找项目根目录(基于常见项目标记文件)"""
         current_dir = Path(__file__).resolve().parent
-        # 向上查找直到找到项目根目录标记
+        # 向上遍历目录,直到找到项目根目录标记
         for parent in [current_dir] + list(current_dir.parents):
             if any((parent / marker).exists() for marker in
                    ['.git', '.project', 'setup.py', 'requirements.txt', 'pyproject.toml']):
                 return parent
-        # 如果找不到标记文件,则使用当前工作目录
+        # 未找到标记时,使用当前工作目录
         return Path.cwd()
 
-    def _redirect_stdout_stderr(self):
-        """
-        重定向标准输出和标准错误到日志
-        :return:
-        """   
-        class StreamToLogger:
-            def __init__(self, logger, level):
-                self.logger = logger
-                self.level = level
-                self.linebuf = ''
-
-            def write(self, buf):
-                for line in buf.rstrip().splitlines():
-                    if line.strip():  # 忽略空行
-                        self.logger.log(self.level, line.rstrip())
-
-            def flush(self):
-                pass
-        # 重定向标准输出到INFO级别
-        sys.stdout = StreamToLogger(self.logger, logging.INFO)
-        # 重定向标准错误到ERROR级别
-        sys.stderr = StreamToLogger(self.logger, logging.ERROR)
-
-    def _handle_exception(self, exc_type, exc_value, exc_traceback):
-        """处理未捕获的异常"""
-        # 忽略KeyboardInterrupt,以便控制台可以正常退出
-        if issubclass(exc_type, KeyboardInterrupt):
-            sys.__excepthook__(exc_type, exc_value, exc_traceback)
-            return
-        self.logger.error(
-            "未捕获的异常:",
-            exc_info=(exc_type, exc_value, exc_traceback)
-        )
+    def _is_logging_available(self):
+        """检查日志系统是否可用"""
+        return self.file_handler is not None and self.file_handler in self.logger.handlers
+
+    def cleanup(self):
+        """清理资源:关闭日志文件处理器"""
+        if self.file_handler:
+            try:
+                self.file_handler.close()
+                self.logger.removeHandler(self.file_handler)
+            except Exception as e:
+                print(f"关闭日志文件时出错: {e}")
 
+    # ------------------------------ 核心日志方法------------------------------
     def debug(self, msg):
-        self.logger.debug(msg)
+        if self._is_logging_available():
+            self.logger.debug(msg)
 
     def info(self, msg):
-        self.logger.info(msg)
+        if self._is_logging_available():
+            self.logger.info(msg)
 
     def warning(self, msg):
-        self.logger.warning(msg)
+        if self._is_logging_available():
+            self.logger.warning(msg)
 
     def error(self, msg):
-        self.logger.error(msg)
+        if self._is_logging_available():
+            self.logger.error(msg)
 
     def critical(self, msg):
-        self.logger.critical(msg)
+        if self._is_logging_available():
+            self.logger.critical(msg)
 
     def exception(self, msg, exc_info=True):
-        """记录异常信息,包括堆栈跟踪"""
-        self.logger.exception(msg, exc_info=exc_info)
+        """记录异常(简化版:仅文件日志,含堆栈)"""
+        if self._is_logging_available():
+            self.logger.exception(msg, exc_info=exc_info)
 
     def log_exception(self, e, context=""):
-        """记录异常及其上下文信息"""
-        error_msg = f"{context}: {type(e).__name__}: {str(e)}"
-        self.error(error_msg)
-        self.error("堆栈跟踪:\n" + "".join(traceback.format_tb(e.__traceback__)))
-
-
-# 使用示例
-if __name__ == "__main__":
-    # 创建日志记录器实例(使用默认路径)
-    logger = Logger(log_name="my_app.log")
-    # 记录不同级别的日志
-    logger.debug("这是一条调试信息")
-    logger.info("程序启动成功")
-    logger.warning("磁盘空间不足")
-    # 模拟一个异常
-    try:
-        result = 1 / 0
-    except Exception as e:
-        logger.log_exception(e, "除法运算时发生错误")
-    # 模拟控制台输出
-    print("这是一条普通的控制台输出")
-    # 模拟控制台错误输出
-    sys.stderr.write("这是一条错误输出\n")
-    # 模拟未捕获的异常
-    def cause_exception():
-        raise ValueError("这是一个未捕获的异常")
-    # 调用会抛出异常的函数
-    cause_exception()
-    logger.info("程序执行完成")
+        """记录异常及上下文(简化版:仅文件日志)"""
+        if self._is_logging_available():
+            error_msg = f"{context}: {type(e).__name__}: {str(e)}"
+            self.error(error_msg)
+            # 手动添加堆栈跟踪
+            import traceback
+            self.error("堆栈跟踪:\n" + "".join(traceback.format_tb(e.__traceback__)))
+
+
+# ------------------------------ 快捷日志创建方法------------------------------
+def log_for_api():
+    return Logger(log_path="Logs/API", log_name="api_test.log")
+
+
+def log_for_ui():
+    return Logger(log_path="Logs/UI", log_name="ui_test.log")

+ 3 - 2
requirements.txt

@@ -6,9 +6,10 @@ selenium
 requests
 # 测试框架
 pytest
-pytest-html            # 实在没有办法,才使用pytest自带报告,自定义CSS,JS实现现代化
 # 报告依赖
-# allure-pytest        废弃,网络环境不允许
+# pytest-html            # 废弃 , 实在没有办法,才使用pytest自带报告,自定义CSS,JS实现现代化
+# allure-pytest        # 废弃,网络环境不允许
+# pytest-testreport
 # 其他依赖
 Pillow
 pyyaml

+ 64 - 0
templates/email_template.html

@@ -0,0 +1,64 @@
+<!DOCTYPE html>
+<html>
+<head>
+    <meta charset="UTF-8">
+    <title>自动化测试报告</title>
+    <style>
+        body { font-family: Arial, sans-serif; line-height: 1.6; color: #333; }
+        .container { max-width: 800px; margin: 0 auto; padding: 20px; }
+        .header { background-color: #f8f9fa; padding: 20px; border-radius: 5px; margin-bottom: 20px; }
+        .summary { background-color: #e9ecef; padding: 15px; border-radius: 5px; margin-bottom: 20px; }
+        .stats { display: flex; justify-content: space-between; margin-bottom: 20px; }
+        .stat-box { flex: 1; text-align: center; padding: 15px; border-radius: 5px; margin: 0 5px; }
+        .passed { background-color: #d4edda; color: #155724; }
+        .failed { background-color: #f8d7da; color: #721c24; }
+        .skipped { background-color: #fff3cd; color: #856404; }
+        .total { background-color: #d1ecf1; color: #0c5460; }
+        .footer { margin-top: 30px; padding-top: 20px; border-top: 1px solid #dee2e6; color: #6c757d; }
+    </style>
+</head>
+<body>
+    <div class="container">
+        <div class="header">
+            <h1>自动化测试报告</h1>
+            <p>{{ company_name }} - {{ team_name }}</p>
+        </div>
+
+        <div class="summary">
+            <h2>测试执行摘要</h2>
+            <p>测试运行已完成,以下是执行结果的摘要信息。</p>
+        </div>
+
+        <div class="stats">
+            <div class="stat-box total">
+                <h3>总测试数</h3>
+                <p>{{ test_results.total }}</p>
+            </div>
+            <div class="stat-box passed">
+                <h3>通过</h3>
+                <p>{{ test_results.passed }}</p>
+            </div>
+            <div class="stat-box failed">
+                <h3>失败</h3>
+                <p>{{ test_results.failed }}</p>
+            </div>
+            <div class="stat-box skipped">
+                <h3>跳过</h3>
+                <p>{{ test_results.skipped }}</p>
+            </div>
+        </div>
+
+        <div>
+            <h2>详细结果</h2>
+            <p><strong>通过率:</strong> {{ test_results.pass_rate }}%</p>
+            <p><strong>执行时长:</strong> {{ test_results.duration }} 秒</p>
+            <p><strong>报告生成时间:</strong> {{ report_info.timestamp }}</p>
+        </div>
+
+        <div class="footer">
+            <p>此邮件由自动化测试系统自动生成,请勿直接回复。</p>
+            <p>如有问题,请联系相关测试团队成员。</p>
+        </div>
+    </div>
+</body>
+</html>

+ 96 - 53
testRun.py

@@ -1,66 +1,109 @@
+#!/usr/bin/env python3
+"""
+Allure测试运行主入口文件 - Windows专用版
+用于运行Tests文件夹中的所有测试用例并生成Allure报告
+"""
+
 import os
 import subprocess
 import sys
-from Utils.enhance_report import enhance_report
+from pathlib import Path
+
+# 配置路径
+TESTS_DIR = "Tests"  # 测试用例目录
+ALLURE_RESULTS_DIR = "Reports/allure-results"  # Allure原始数据目录
+ALLURE_REPORT_DIR = "Reports/allure-report"  # Allure报告输出目录
 
-# 添加项目根目录到 Python 路径
-sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+def run_command(cmd, description=""):
+    """运行命令并处理Windows下的编码问题"""
+    if description:
+        print(f"\n{description}")
+        print("-" * 50)
+
+    # 在Windows上使用特定的编码处理
+    if sys.platform == "win32":
+        # 使用shell=True可以避免一些Windows上的路径问题
+        process = subprocess.Popen(
+            cmd,
+            shell=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            universal_newlines=True,  # 以文本模式处理输出
+            encoding='utf-8',  # 强制使用UTF-8编码
+            errors='replace'  # 替换无法解码的字符
+        )
+    else:
+        process = subprocess.Popen(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            universal_newlines=True
+        )
+
+    stdout, stderr = process.communicate()
+    return process.returncode, stdout, stderr
 
 
 def run_tests():
-    """运行测试并生成美化报告"""
-    # 创建报告目录
-    os.makedirs('reports', exist_ok=True)
-    os.makedirs('reports/assets/screenshots', exist_ok=True)
-    os.makedirs('reports/assets/logs', exist_ok=True)
+    """运行测试并生成Allure报告"""
+
+    # 确保目录存在
+    Path(ALLURE_RESULTS_DIR).mkdir(exist_ok=True)
+    Path(ALLURE_REPORT_DIR).mkdir(exist_ok=True)
 
+    print("=" * 50)
     print("开始运行测试...")
+    print("=" * 50)
 
-    # 运行测试 - 使用UTF-8编码避免中文乱码
-    try:
-        result = subprocess.run([
-            'pytest',
-            'Tests/',
-            '--html=reports/report.html',
-            '--self-contained-html',
-            '-v'
-        ], capture_output=True, text=True, encoding='utf-8', errors='replace')
-    except Exception as e:
-        print(f"运行测试时出错: {e}")
-        # 尝试不使用编码参数
-        result = subprocess.run([
-            'pytest',
-            'Tests/',
-            '--html=reports/report.html',
-            '--self-contained-html',
-            '-v'
-        ], capture_output=True, text=True)
-
-    # 打印测试结果
-    print("\n测试输出:")
-    if result.stdout:
-        print(result.stdout)
-    if result.stderr:
-        print("错误输出:")
-        print(result.stderr)
-
-    # 增强报告
-    print("\n生成美化报告...")
-    if os.path.exists('reports/report.html'):
-        enhanced_report = enhance_report()
-        print(f"美化报告已生成: {enhanced_report}")
-    else:
-        print("警告: 未找到原始报告文件")
-        # 尝试直接运行增强报告
-        enhanced_report = enhance_report()
-        if enhanced_report:
-            print(f"美化报告已生成: {enhanced_report}")
+    # 构建pytest命令
+    pytest_cmd = [
+        sys.executable, "-m", "pytest",
+        TESTS_DIR,
+        f"--alluredir={ALLURE_RESULTS_DIR}",
+        "--clean-alluredir",  # 清理之前的测试结果
+        "-v"  # 详细输出
+    ]
+
+    # 运行测试
+    returncode, stdout, stderr = run_command(pytest_cmd, "运行测试用例")
+
+    # 输出测试结果
+    print(stdout)
+    if stderr:
+        print("错误信息:", stderr)
+    # 检查测试是否成功
+    if returncode not in [0, 1]:  # 0: 所有测试通过, 1: 有测试失败
+        print(f"测试执行失败,退出码: {returncode}")
+        return False
+
+    print("=" * 50)
+    print("测试执行完成,正在生成Allure报告...")
+    print("=" * 50)
 
-    # 返回退出码
-    return result.returncode
+    # 生成Allure报告
+    allure_cmd = [
+        "allure", "generate",
+        ALLURE_RESULTS_DIR,
+        "-o", ALLURE_REPORT_DIR,
+        "--clean"
+    ]
+
+    # 运行Allure生成命令
+    returncode, stdout, stderr = run_command(allure_cmd, "生成Allure报告")
+
+    if returncode == 0:
+        print("=" * 50)
+        print("Allure报告生成成功!")
+        print(f"报告位置: {os.path.abspath(ALLURE_REPORT_DIR)}")
+        print("=" * 50)
+    else:
+        print("Allure报告生成失败:")
+        print(stderr)
+        return False
+    return True
 
 
-if __name__ == '__main__':
-    exit_code = run_tests()
-    print(f"\n测试执行完成,退出码: {exit_code}")
-    sys.exit(exit_code)
+if __name__ == "__main__":
+    success = run_tests()
+    sys.exit(0 if success else 1)