解决现状
由于企业抠门,无法直接备份企业邮箱中的邮件,因此只能采用这种效率较低的方式进行备份
实现效果
通过某种API强制修改离职用户的邮箱密码,然后调用这个程序多进程下载邮件信息到存储文件夹
# 企业邮箱批量下载邮件算法
from imapclient import IMAPClient
from imapclient.exceptions import LoginError
from imapclient.exceptions import IMAPClientError
from multiprocessing import Pool
import time
import eml_parser
import random
import re
import os
'''
重构代码,多进程稳定版
'''
class downloadMailEMLClass:
    """Back up one IMAP mailbox to local ``.eml`` files.

    Typical use is either ``run()`` (log in and download everything
    sequentially) or ``init()`` followed by ``mulRun()`` calls, which is how
    the multi-process driver in this file uses it.

    Files are written below ``<script dir>/backUPMailFolder/<username>/``,
    one sub-directory per mailbox folder.
    """

    # Server-side folder names that get a Chinese local directory name.
    # Any other folder keeps its server-side name.
    _FOLDER_DISPLAY_NAMES = {
        "INBOX": "收件箱",
        "Drafts": "草稿箱",
        "Sent Messages": "已发送的邮件",
        "Deleted Messages": "已删除的邮件",
        "Junk": "垃圾邮件",
    }

    def __init__(self, server, port, username, password):
        """Store the connection settings; no network access happens here."""
        self.server = server
        self.port = port
        self.username = username
        self.password = password
        self.user = None  # logged-in IMAPClient, set by _login()
        self.folderNameList = []  # [{"English": ..., "Chinese": ...}, ...]
        self.userFolderPath = None  # local backup root, set by _mkdirFolds()

    def run(self):
        """Log in, prepare the local directories and download every folder."""
        self.init()
        self._downMailCore()

    def _downMailCore(self):
        """Download every message of every known folder, one after another."""
        for folder in self.folderNameList:
            folderPath = os.path.join(self.userFolderPath, folder["Chinese"])
            for mailId in self._getMailIdList(folder["English"]):
                self._saveMail(mailId, folderPath)

    def getIdList(self, dict):
        """Return the message ids of the folder described by *dict*.

        *dict* is one entry of ``getFolderNameList()``; its ``"English"``
        value is the folder that gets selected on the server.
        """
        return self._getMailIdList(dict["English"])

    def getFolderNameList(self):
        """Return the folder list collected by ``_getFolders()``."""
        return self.folderNameList

    def _mulDownMailCore(self, idList, folderChinese):
        """Download every id in *idList* into the *folderChinese* directory.

        The folder must already be selected on the connection (``mulRun``
        does that). An empty or ``None`` *idList* is a no-op.
        """
        if not idList:
            return
        folderPath = os.path.join(self.userFolderPath, folderChinese)
        # Process the whole chunk, not just its first element, so callers
        # may hand over more than one id per task.
        for mailId in idList:
            self._saveMail(mailId, folderPath)

    # Written by Warp Drive (personal blog / CSDN tech blog);
    # please credit the source when reposting.
    # https://blog.csdn.net/siberiaWarpDrive
    # https://www.exp-9.com/

    def mulRun(self, idList, folderDict):
        """Select ``folderDict["English"]`` read-only and download *idList*."""
        self.user.select_folder(folderDict["English"], readonly=True)
        self._mulDownMailCore(idList, folderDict["Chinese"])

    def init(self):
        """Log in, read the folder list and create the local directories."""
        self._login()
        self._getFolders()
        self._mkdirFolds()

    def _saveMail(self, mailId, folderPath):
        """Fetch one message and store it in *folderPath*.

        Messages whose download failed (``_getMailBody`` returned ``None``)
        are skipped instead of crashing the whole run.
        """
        body = self._getMailBody(mailId)
        if body is None:
            return
        subject, createTime = self._pasreBody(body)
        print("[_downMailCore_Save] : {}".format(subject))
        putMailStorage(subject, body, createTime, folderPath)

    def _login(self):
        """Connect and authenticate; the client is kept in ``self.user``.

        Raises:
            SystemExit: with status 1 when the server rejects the login
                (the error is printed first).
        """
        try:
            self.user = IMAPClient(host=self.server, port=self.port)
            self.user.login(username=self.username, password=self.password)
        except LoginError as Error:
            print("[_LOGON_FUN_ERROR] LOGIN FAIL {}".format(Error))
            # Raise SystemExit directly: the ``exit()`` helper comes from
            # the ``site`` module and reports success (status 0).
            raise SystemExit(1) from Error

    def _parseDict(self, english, chinese):
        """Build one folder entry: ``{"English": ..., "Chinese": ...}``."""
        return {"English": english, "Chinese": chinese}

    def _getFolders(self):
        """Fill ``self.folderNameList`` from the server's folder listing.

        The last element of each ``list_folders()`` entry is used as the
        folder name. The list is rebuilt from scratch, so calling this
        twice does not duplicate entries.
        """
        self.folderNameList = []
        for folderInfo in self.user.list_folders():
            folderName = folderInfo[-1]
            displayName = self._FOLDER_DISPLAY_NAMES.get(folderName, folderName)
            self.folderNameList.append(self._parseDict(folderName, displayName))

    def _mkdirFolds(self):
        """Create the local backup directory tree for this user.

        ``exist_ok=True`` is used because several worker processes may run
        this at the same time; it also lets intermediate directories be
        created for folder names that contain a path separator.
        """
        serverPath = os.path.abspath(os.path.dirname(__file__))
        backupPath = os.path.join(serverPath, "backUPMailFolder")
        self.userFolderPath = os.path.join(backupPath, self.username)
        os.makedirs(self.userFolderPath, exist_ok=True)
        for folder in self.folderNameList:
            folderPath = os.path.join(self.userFolderPath, folder["Chinese"])
            os.makedirs(folderPath, exist_ok=True)

    def _getMailIdList(self, folderName):
        """Select *folderName* read-only and return its message ids.

        Returns an empty list (after printing the error) when the folder
        cannot be selected or searched, so callers can iterate the result
        unconditionally.
        """
        try:
            self.user.select_folder(folderName, readonly=True)
            mailIdList = self.user.search()
        except IMAPClientError as CErr:
            print("[_GETMAILIDLIST_ERROR] : {}".format(CErr))
            return []
        print("[_getMailIdList_Length] : {} ".format(len(mailIdList)))
        return mailIdList

    def _getMailBody(self, id):
        """Fetch the raw bytes (``BODY[]``) of message *id*.

        Returns ``None`` after printing the error when the fetch fails or
        the response/parsed header lacks the expected key.
        """
        try:
            body = self.user.fetch(id, [b'BODY[]'])[id][b'BODY[]']
            parser = eml_parser.EmlParser()
            print("[_getMailBody_Download] : {}".format(
                parser.decode_email_bytes(body)["header"]["subject"]))
            return body
        except KeyError as KErr:
            print("[_GetMailList_KEYERROR] : {}".format(KErr))
        except IMAPClientError as IErr:
            print("[_GetMailList_IMAPClientERROR] : {}".format(IErr))
        return None

    def _pasreBody(self, body):
        """Return ``(subject, createTime)`` parsed from raw message bytes.

        ``createTime`` is the header date formatted as
        ``%Y-%m-%d+%H-%M-%S`` so it can be used inside a file name.
        """
        # Decode once and reuse the header for both fields.
        header = eml_parser.EmlParser().decode_email_bytes(body)["header"]
        subject = header["subject"]
        createTime = header["date"].strftime("%Y-%m-%d+%H-%M-%S")
        return subject, createTime
#存储信息
def putMailStorage(mailTitle, mailBody, mailCreateTime, userFolderPath):
    """Write one raw message to ``userFolderPath`` as an ``.eml`` file.

    The file name is ``<title>_<create time>_<6 random chars>_.eml``. The
    random part keeps messages with the same subject and timestamp from
    overwriting each other.

    Args:
        mailTitle: message subject; characters that are not allowed in
            file names are replaced with ``_``.
        mailBody: raw message bytes to write.
        mailCreateTime: timestamp string already safe for file names.
        userFolderPath: existing directory that receives the file.
    """
    baseStr = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
    randomStr = "".join(random.choice(baseStr) for _ in range(6))

    # Replace the characters Windows forbids in file names, plus ASCII
    # control characters (e.g. CR/LF/TAB left over from folded subjects).
    # A translate table avoids regex escaping pitfalls such as a lone "\".
    replaceTable = {ord(ch): "_" for ch in ':/\\*?"<>|'}
    replaceTable.update((code, "_") for code in range(32))
    safeTitle = mailTitle.translate(replaceTable)

    fileName = safeTitle + "_" + mailCreateTime + "_" + randomStr + "_" + ".eml"
    # os.path.join picks the right separator for the current platform.
    file = os.path.join(userFolderPath, fileName)
    with open(file, "wb") as f:
        f.write(mailBody)
#本文由 曲速引擎(Warp Drive)个人博客 曲速引擎(Warp Drive)CSDN技术博客 创作,转载请说明出处谢谢
#https://blog.csdn.net/siberiaWarpDrive
#https://www.exp-9.com/
def mulCore(worker, folderDict, server, port, username, password):
    """Pool task: download the message ids in *worker* from one folder.

    Each task opens its own IMAP connection because the client object is
    created inside the worker process, and closes it again when done.

    Args:
        worker: list of message ids to download.
        folderDict: folder entry with ``"English"`` (server name) and
            ``"Chinese"`` (local directory name) keys.
        server, port, username, password: IMAP connection settings.

    Raises:
        RuntimeError: when initialisation or the download terminated via
            ``SystemExit`` (e.g. a rejected login).
    """
    d = downloadMailEMLClass(server=server, port=port, username=username, password=password)
    try:
        d.init()
        d.mulRun(worker, folderDict)
    except SystemExit as err:
        # SystemExit is not an Exception subclass, so the pool would not
        # report it back as a failed task; convert it to a regular error.
        raise RuntimeError("mail download task aborted: {!r}".format(err)) from err
    finally:
        # Always release the server connection opened for this task.
        if d.user is not None:
            try:
                d.user.logout()
            except (IMAPClientError, OSError) as err:
                print("[mulCore_LOGOUT_ERROR] : {}".format(err))
if __name__ == '__main__':
    # Script entry point: list every folder of the mailbox on one
    # connection, then download the messages in parallel worker processes.
    print("[+] INFO : Foxmail Information Download System")
    server = "imap.exmail.qq.com"
    port = 993
    username = "xxx@xxx.com"
    password = "xxx"
    startTime = time.time()
    d = downloadMailEMLClass(server=server, port=port, username=username, password=password)
    d.init()
    # Number of message ids handed to one pool task.
    length = 1
    pList = []
    try:
        # One pool for the whole run instead of a new pool per folder.
        # Leaving the ``with`` block terminates the pool, which also cleans
        # up the workers if submitting tasks fails half-way.
        with Pool() as p:
            for folderDict in d.getFolderNameList():
                idList = d.getIdList(folderDict)
                if not idList:
                    continue
                for i in range(0, len(idList), length):
                    worker = idList[i:i + length]
                    result = p.apply_async(
                        mulCore,
                        args=(worker, folderDict, server, port, username, password,),
                    )
                    pList.append((folderDict["English"], worker, result))
            # Written by Warp Drive (personal blog / CSDN tech blog);
            # please credit the source when reposting.
            # https://blog.csdn.net/siberiaWarpDrive
            # https://www.exp-9.com/
            p.close()
            p.join()
        # Surface worker failures; without get() they would go unnoticed.
        for folderName, worker, result in pList:
            try:
                result.get()
            except Exception as err:
                print("[Main-ERROR] : folder {} mail id(s) {} failed: {!r}".format(
                    folderName, worker, err))
    finally:
        # Release the connection that was used for listing folders and ids.
        d.user.logout()
    endTime = time.time()
    print("[Main-INFO] : Total usage time {} /s".format(endTime-startTime))
更详细内容查看
独立博客 https://www.dataeast.cn/
CSDN博客 https://blog.csdn.net/siberiaWarpDrive
B站视频空间 https://space.bilibili.com/25871614?spm_id_from=333.1007.0.0
关注 “曲速引擎 Warp Drive” 微信公众号