import smtplib
from collections import Counter
from datetime import datetime
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path

from core.config.config_manager import ConfigManager
from core.singleton import Singleton
from core.utils.logger import Logger


class EmailNotifier(metaclass=Singleton):
    """Send plain-text e-mails, optionally with attachments, over SMTP.

    Settings are read from the ``email`` section of the project
    configuration. Keys used by this class:

    * ``smtp_server``, ``username``, ``password`` - required for sending.
    * ``smtp_port`` - defaults to 587.
    * ``recipients`` - addresses used by :meth:`send_test_report`.
    * ``email_subject`` - subject template for the test report; may contain
      a ``{date}`` placeholder.

    NOTE(review): the ``Singleton`` metaclass is defined elsewhere;
    presumably it makes all callers share one instance - confirm.
    """

    def __init__(self):
        self.config = ConfigManager()
        self.logger = Logger.get_logger()
        # Snapshot of the "email" configuration section, read once here.
        self.smtp_config = self.config.get('email', {})

    def _build_message(self, sender, recipients, subject, body, attachments):
        """Assemble the multipart message: headers, UTF-8 body, attachments.

        Attachment entries that do not point at a regular file are skipped
        with a warning instead of aborting the whole message.
        """
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = ', '.join(recipients)
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain', 'utf-8'))

        for attachment in attachments or ():
            attachment_path = Path(attachment)
            # is_file() rather than exists(): a directory would pass exists()
            # and then fail on read, losing the entire e-mail.
            if not attachment_path.is_file():
                self.logger.warning(f"附件不存在,已跳过: {attachment_path}")
                continue
            part = MIMEApplication(
                attachment_path.read_bytes(),
                Name=attachment_path.name,
            )
            # add_header() quotes the file name and RFC 2231-encodes it when
            # it contains non-ASCII characters.
            part.add_header(
                'Content-Disposition',
                'attachment',
                filename=attachment_path.name,
            )
            msg.attach(part)

        return msg

    def send_email(self, recipients, subject, body, attachments=None):
        """Send a plain-text e-mail through the configured SMTP server.

        Args:
            recipients: A single address string or an iterable of address
                strings.
            subject: Subject line.
            body: Plain-text body; sent as UTF-8.
            attachments: Optional iterable of file paths (``str`` or
                path-like). Entries that are not regular files are skipped.

        Returns:
            ``True`` if the message was handed to the SMTP server, ``False``
            if the configuration is incomplete, no recipient was given, or
            any exception occurred (the exception is logged, not raised).
        """
        try:
            smtp_server = self.smtp_config.get('smtp_server')
            smtp_port = self.smtp_config.get('smtp_port', 587)
            username = self.smtp_config.get('username')
            password = self.smtp_config.get('password')

            if not all([smtp_server, username, password]):
                self.logger.warning("邮件配置不完整,无法发送邮件")
                return False

            # A bare string would otherwise be joined character by character.
            if isinstance(recipients, str):
                recipients = [recipients]
            recipients = list(recipients or [])
            if not recipients:
                self.logger.warning("未指定收件人,无法发送邮件")
                return False

            msg = self._build_message(
                username, recipients, subject, body, attachments
            )

            # Connect, upgrade the connection with STARTTLS, then authenticate
            # and send.
            with smtplib.SMTP(smtp_server, smtp_port) as server:
                server.starttls()
                server.login(username, password)
                server.sendmail(username, recipients, msg.as_string())

            self.logger.info(f"邮件已成功发送给: {', '.join(recipients)}")
            return True
        except Exception as e:
            # Deliberate best-effort boundary: a failed notification is
            # logged and reported through the return value only.
            self.logger.error(f"发送邮件失败: {str(e)}")
            return False

    def send_test_report(self, report_paths, test_results, execution_time):
        """Send the test-report e-mail to the configured recipients.

        Args:
            report_paths: Report files to attach (passed to
                :meth:`send_email` as ``attachments``).
            test_results: Sequence of result mappings; each is read via
                ``result['status']`` and counted when the value is
                ``'PASS'``, ``'FAIL'`` or ``'ERROR'``. ``None`` is treated
                as an empty sequence.
            execution_time: Run duration in seconds (formatted with two
                decimals).

        Returns:
            ``False`` when no recipients are configured; otherwise the
            result of :meth:`send_email`.
        """
        recipients = self.smtp_config.get('recipients', [])
        if not recipients:
            self.logger.warning("未配置邮件收件人,跳过发送测试报告")
            return False

        subject_template = self.smtp_config.get(
            'email_subject',
            '自动化测试报告 - {date}',
        )
        subject = subject_template.format(
            date=datetime.now().strftime('%Y-%m-%d %H:%M')
        )

        # Tally the statuses in a single pass.
        results = list(test_results or [])
        status_counts = Counter(result['status'] for result in results)
        total_tests = len(results)
        passed = status_counts['PASS']
        failed = status_counts['FAIL']
        error = status_counts['ERROR']
        # Guard against division by zero when nothing was executed.
        success_rate = (passed / total_tests) * 100 if total_tests > 0 else 0

        environment = self.config.get('environment', '未知环境')
        body = "\n".join([
            "",
            "自动化测试执行完成",
            "",
            f"执行时间: {execution_time:.2f} 秒",
            f"总测试数: {total_tests}",
            f"通过: {passed}",
            f"失败: {failed}",
            f"错误: {error}",
            f"通过率: {success_rate:.2f}%",
            "",
            f"测试环境: {environment}",
            "",
            "请查看附件中的详细测试报告。",
            "",
        ])

        return self.send_email(recipients, subject, body, report_paths)