| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- import smtplib
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from email.mime.base import MIMEBase
- from email import encoders
- import os
- from jinja2 import Template
- from pathlib import Path
- from Config.EmailConfig import EMAIL_CONFIG, EMAIL_TEMPLATE_CONFIG, SMTP_CONFIG
- from Utils.Log import log_for_api
- import requests
- import json
- logger = log_for_api()
- class EmailSender:
- """邮件发送器 - 使用SMTP协议与OAuth 2.0认证"""
- def __init__(self):
- self.config = EMAIL_CONFIG
- self.template_config = EMAIL_TEMPLATE_CONFIG
- self.smtp_config = SMTP_CONFIG
- self.access_token = None
- def get_oauth2_token(self):
- """获取OAuth 2.0访问令牌"""
- try:
- # 这里需要替换为你的Azure应用注册信息
- client_id = self.smtp_config.get("client_id", "")
- client_secret = self.smtp_config.get("client_secret", "")
- tenant_id = self.smtp_config.get("tenant_id", "")
- scope = "https://outlook.office365.com/.default"
- token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
- data = {
- 'client_id': client_id,
- 'scope': scope,
- 'client_secret': client_secret,
- 'grant_type': 'client_credentials'
- }
- response = requests.post(token_url, data=data)
- response.raise_for_status()
- token_data = response.json()
- self.access_token = token_data['access_token']
- return True
- except Exception as e:
- logger.error(f"获取OAuth 2.0令牌时出错: {e}")
- return False
- def render_email_template(self, test_results, report_info):
- """渲染邮件HTML模板"""
- # 保持不变...
- def send_email(self, test_results, report_info, attachment_path=None):
- """通过SMTP发送邮件(使用OAuth 2.0认证)"""
- try:
- # 获取OAuth 2.0访问令牌
- if not self.get_oauth2_token():
- logger.error("无法获取OAuth 2.0访问令牌")
- return False
- # 创建邮件消息
- msg = MIMEMultipart()
- msg['From'] = self.smtp_config["sender_email"]
- msg['To'] = ', '.join(self.config["recipients"])
- msg['Subject'] = self.config["subject"]
- # 如果有抄送和密送
- if self.config["cc_recipients"]:
- msg['Cc'] = ', '.join(self.config["cc_recipients"])
- # 渲染HTML内容
- html_body = self.render_email_template(test_results, report_info)
- msg.attach(MIMEText(html_body, 'html'))
- # 添加附件
- if attachment_path and os.path.exists(attachment_path):
- part = MIMEBase('application', 'octet-stream')
- with open(attachment_path, 'rb') as file:
- part.set_payload(file.read())
- encoders.encode_base64(part)
- part.add_header(
- 'Content-Disposition',
- f'attachment; filename={os.path.basename(attachment_path)}'
- )
- msg.attach(part)
- logger.info(f"已添加附件: {attachment_path}")
- # 连接SMTP服务器并发送邮件
- with smtplib.SMTP(self.smtp_config["smtp_server"], self.smtp_config["smtp_port"]) as server:
- server.ehlo()
- server.starttls()
- server.ehlo()
- # 使用OAuth 2.0认证
- auth_string = f"user={self.smtp_config['sender_email']}\x01auth=Bearer {self.access_token}\x01\x01"
- auth_string = auth_string.encode()
- server.docmd("AUTH", "XOAUTH2 " + auth_string.decode())
- # 发送邮件
- all_recipients = (
- self.config["recipients"] +
- self.config["cc_recipients"] +
- self.config["bcc_recipients"]
- )
- server.sendmail(
- self.smtp_config["sender_email"],
- all_recipients,
- msg.as_string()
- )
- logger.info("邮件已成功发送!")
- return True
- except Exception as e:
- logger.error(f"发送邮件时出错: {e}")
- return False
|