| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- import smtplib
- from email.mime.text import MIMEText
- from email.mime.multipart import MIMEMultipart
- from email.mime.application import MIMEApplication
- from pathlib import Path
- from core.config.config_manager import ConfigManager
- from core.utils.logger import Logger
- from core.singleton import Singleton
class EmailNotifier(metaclass=Singleton):
    """Send plain-text e-mail notifications, optionally with attachments, over SMTP.

    Settings come from the ``email`` section of the project configuration.
    Keys read by this class: ``smtp_server``, ``smtp_port`` (default 587),
    ``username``, ``password``, ``recipients`` and ``email_subject``.

    NOTE(review): the class uses the project's ``Singleton`` metaclass, so the
    configuration snapshot taken in ``__init__`` is presumably shared by all
    callers for the life of the process; confirm against ``core.singleton``.
    """

    def __init__(self):
        self.config = ConfigManager()
        self.logger = Logger.get_logger()
        # Snapshot of the 'email' section, read once at construction time.
        self.smtp_config = self.config.get('email', {})

    def _build_message(self, sender, recipients, subject, body, attachments):
        """Assemble the multipart message: headers, UTF-8 text body, attachments."""
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = ', '.join(recipients)
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain', 'utf-8'))

        if not attachments:
            return msg

        # A single path would otherwise be iterated character by character.
        if isinstance(attachments, (str, Path)):
            attachments = [attachments]

        for raw_path in attachments:
            attachment_path = Path(raw_path)
            # is_file() also rejects directories, which would make the read
            # below raise and abort the whole mail.
            if not attachment_path.is_file():
                self.logger.warning(f"附件不存在,已跳过: {attachment_path}")
                continue
            part = MIMEApplication(
                attachment_path.read_bytes(),
                Name=attachment_path.name,
            )
            # add_header quotes/encodes the file name, so names containing
            # quotes or non-ASCII characters still yield a valid header.
            part.add_header(
                'Content-Disposition',
                'attachment',
                filename=attachment_path.name,
            )
            msg.attach(part)
        return msg

    def send_email(self, recipients, subject, body, attachments=None):
        """Send a plain-text e-mail through the configured SMTP server.

        Args:
            recipients: One address string or an iterable of address strings.
            subject: Subject line.
            body: Plain-text body, sent as UTF-8.
            attachments: Optional path (``str``/``Path``) or iterable of paths.
                Paths that are not existing regular files are skipped with a
                warning.

        Returns:
            ``True`` when the message was handed to the SMTP server, ``False``
            when the configuration is incomplete, no recipient was given, or
            any error occurred. Errors are logged and never propagated.
        """
        try:
            smtp_server = self.smtp_config.get('smtp_server')
            smtp_port = self.smtp_config.get('smtp_port', 587)
            username = self.smtp_config.get('username')
            password = self.smtp_config.get('password')

            if not all([smtp_server, username, password]):
                self.logger.warning("邮件配置不完整,无法发送邮件")
                return False

            # A bare string must be wrapped: ', '.join("a@b.c") would split
            # the address into single characters.
            if isinstance(recipients, str):
                recipients = [recipients]
            else:
                recipients = list(recipients)
            if not recipients:
                self.logger.warning("未指定收件人,无法发送邮件")
                return False

            msg = self._build_message(
                username, recipients, subject, body, attachments
            )

            # Plain connection upgraded with STARTTLS before authenticating.
            with smtplib.SMTP(smtp_server, smtp_port) as server:
                server.starttls()
                server.login(username, password)
                server.sendmail(username, recipients, msg.as_string())

            self.logger.info(f"邮件已成功发送给: {', '.join(recipients)}")
            return True
        except Exception as e:
            # Best-effort notification: report the failure through the return
            # value instead of raising into the caller.
            self.logger.error(f"发送邮件失败: {e}")
            return False

    def send_test_report(self, report_paths, test_results, execution_time):
        """E-mail a summary of a test run with the report files attached.

        Args:
            report_paths: Report file path(s) passed on as attachments.
            test_results: Sized collection of mappings, each with a ``'status'``
                key; the values ``'PASS'``, ``'FAIL'`` and ``'ERROR'`` are
                counted.
            execution_time: Run duration in seconds (formatted with 2 decimals).

        Returns:
            ``False`` when no recipients are configured; otherwise the result
            of :meth:`send_email`.
        """
        # Imported locally because the module-level imports do not provide
        # ``datetime``; without it the subject formatting raised NameError.
        from datetime import datetime

        recipients = self.smtp_config.get('recipients', [])
        if not recipients:
            self.logger.warning("未配置邮件收件人,跳过发送测试报告")
            return False

        # The configured subject may contain a ``{date}`` placeholder.
        subject_template = self.smtp_config.get(
            'email_subject',
            '自动化测试报告 - {date}',
        )
        subject = subject_template.format(
            date=datetime.now().strftime('%Y-%m-%d %H:%M')
        )

        total_tests = len(test_results)
        passed = sum(1 for r in test_results if r['status'] == 'PASS')
        failed = sum(1 for r in test_results if r['status'] == 'FAIL')
        error = sum(1 for r in test_results if r['status'] == 'ERROR')
        # Guard against division by zero when nothing was executed.
        success_rate = (passed / total_tests) * 100 if total_tests > 0 else 0

        report_lines = [
            "自动化测试执行完成",
            f"执行时间: {execution_time:.2f} 秒",
            f"总测试数: {total_tests}",
            f"通过: {passed}",
            f"失败: {failed}",
            f"错误: {error}",
            f"通过率: {success_rate:.2f}%",
            f"测试环境: {self.config.get('environment', '未知环境')}",
            "请查看附件中的详细测试报告。",
        ]
        # Leading and trailing newline mirror the original triple-quoted body.
        body = "\n" + "\n".join(report_lines) + "\n"

        return self.send_email(recipients, subject, body, report_paths)
|