248 lines
8.6 KiB
Python
248 lines
8.6 KiB
Python
import os
import re
import smtplib
import sys
from email import encoders
from email.header import Header
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
|
|
|
|
|
|
class SMTPMailerSendState(dict):
    """Result of a send attempt, readable by key or by attribute.

    Behaves like a ``dict`` (typically holding ``code`` and ``message``) with
    two conveniences:

    * a missing key reads as ``None`` instead of raising ``KeyError``;
    * every key is also readable as an attribute, e.g. ``state.code``.

    Attribute assignment is silently ignored, so an instance attribute can
    never shadow a dictionary key. Use item assignment
    (``state['code'] = 0``) to change a value.
    """

    def __setattr__(self, key, value):
        """Ignore attribute assignment; values live only in the dictionary."""

    def __getitem__(self, item):
        """Return the value stored under *item*, or ``None`` when absent."""
        try:
            return super().__getitem__(item)
        except (KeyError, TypeError):
            # TypeError: an unhashable key can never be present in the dict.
            return None

    def __getattr__(self, item):
        """Expose dictionary keys as attributes; unknown names read as ``None``.

        Only invoked when normal attribute lookup fails.

        Raises:
            AttributeError: for dunder names. Answering ``None`` for those
                makes ``hasattr()`` always true and hands protocol probes
                (``__getstate__``, ``__wrapped__``, ...) a non-callable value.
        """
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        return self[item]
|
|
|
|
|
|
class SMTPMailer:
|
|
__host__ = {
|
|
'host': '',
|
|
'port': 25
|
|
}
|
|
__user__ = {
|
|
'user': '',
|
|
'pass': ''
|
|
}
|
|
__mail__ = {
|
|
'from': [],
|
|
'to': [],
|
|
'cc': [],
|
|
'bc': [],
|
|
'attachs': [],
|
|
'subject': '',
|
|
'content': ''
|
|
}
|
|
__flag__ = 0
|
|
__addr__ = 0
|
|
__from__ = 0
|
|
|
|
def __init__(
|
|
self,
|
|
username='',
|
|
password='',
|
|
host='127.0.0.1',
|
|
port=25,
|
|
ssl_enable=False,
|
|
send_from='',
|
|
send_to=None,
|
|
send_cc=None,
|
|
send_bc=None,
|
|
subject='',
|
|
content='',
|
|
attachs=None
|
|
):
|
|
self.ssl_enable = ssl_enable
|
|
self.host(host, port)
|
|
self.user(username, password)
|
|
send_from and self.send_from(send_from)
|
|
send_to and self.to(send_to)
|
|
send_cc and self.cc(send_cc)
|
|
send_bc and self.bc(send_bc)
|
|
subject and self.subject(subject)
|
|
content and self.content(content)
|
|
attachs and self.attachs(attachs)
|
|
|
|
@staticmethod
|
|
def test_email(email: str, throw_exception=False):
|
|
test = re.fullmatch('^([a-zA-Z0-9_.\\-])+@(([a-zA-Z0-9\\-])+\\.)+([a-zA-Z0-9]{2,4})+$', email) is not None
|
|
if throw_exception and test is False:
|
|
raise Exception('Illegal email address: %s.' % email)
|
|
return test
|
|
|
|
@staticmethod
|
|
def gen_header_of_mailer(e):
|
|
try:
|
|
return "\"%s\" <%s>" % (Header(e[0]).encode(), Header(e[1]).encode())
|
|
except Exception:
|
|
return ''
|
|
|
|
def parse_email_address(self, email: str):
|
|
if email.find('<') != -1 and email.find('>') != -1:
|
|
match_email = re.findall('^(.*?)<(.*?)>$', email)
|
|
if len(match_email) == 1:
|
|
addr = match_email[0][1].strip()
|
|
self.test_email(addr, throw_exception=True)
|
|
name = match_email[0][0].strip() or addr.split('@')[0].strip()
|
|
return [name, addr]
|
|
else:
|
|
raise Exception('The wrong email address format: %s.' % email)
|
|
else:
|
|
if len(email) != 0:
|
|
addr = email
|
|
self.test_email(addr, throw_exception=True)
|
|
name = email.split('@')[0].strip()
|
|
return [name, addr]
|
|
else:
|
|
raise Exception('Empty email address.')
|
|
|
|
def parse_email_address_mult(self, data):
|
|
if not isinstance(data, (str, list, tuple)):
|
|
raise TypeError('Email address list must be string or list or tuple type.')
|
|
if isinstance(data, str):
|
|
data = [i.strip() for i in data.replace(',', ';').split(';')]
|
|
while '' in data:
|
|
data.remove('')
|
|
return [self.parse_email_address(email_format) for email_format in data]
|
|
|
|
def enableSSL(self):
|
|
self.ssl_enable = True
|
|
return self
|
|
|
|
def host(self, host: str, port: int):
|
|
self.__host__['host'] = host
|
|
self.__host__['port'] = port
|
|
return self
|
|
|
|
def user(self, username, password):
|
|
self.__user__['user'] = username
|
|
self.__user__['pass'] = password
|
|
return self
|
|
|
|
def send_from(self, e):
|
|
self.__mail__['from'] = self.parse_email_address(e)
|
|
self.__from__ += 1
|
|
return self
|
|
|
|
def to(self, receive_list):
|
|
self.__mail__['to'] = self.parse_email_address_mult(receive_list)
|
|
self.__addr__ += 1
|
|
return self
|
|
|
|
def cc(self, receive_list):
|
|
self.__mail__['cc'] = self.parse_email_address_mult(receive_list)
|
|
self.__addr__ += 1
|
|
return self
|
|
|
|
def bc(self, receive_list):
|
|
self.__mail__['bc'] = self.parse_email_address_mult(receive_list)
|
|
self.__addr__ += 1
|
|
return self
|
|
|
|
def subject(self, text: str):
|
|
self.__mail__['subject'] = text
|
|
return self
|
|
|
|
def content(self, text: str):
|
|
self.__mail__['content'] = text
|
|
return self
|
|
|
|
def attachs(self, attachs_path_list: str | list):
|
|
if isinstance(attachs_path_list, str):
|
|
attachs_path_list = [attachs_path_list]
|
|
for path in attachs_path_list:
|
|
if not os.path.exists(path):
|
|
raise FileNotFoundError('The attachment was not found.')
|
|
self.__mail__['attachs'] = attachs_path_list
|
|
return self
|
|
|
|
def send(self, test_only=False):
|
|
if self.__flag__:
|
|
return SMTPMailerSendState({'code': 1, 'message': 'Mail has been sent.'})
|
|
if self.__addr__ == 0:
|
|
raise Exception('No receiver.')
|
|
message = MIMEMultipart()
|
|
if self.__from__ == 0 and self.test_email(self.__user__['user']) and self.send_from(self.__user__['user']):
|
|
pass
|
|
message['From'] = self.gen_header_of_mailer(self.__mail__['from'])
|
|
to_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['to']])
|
|
cc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['cc']])
|
|
bc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['bc']])
|
|
message['To'], message['Cc'], message['Bcc'] = to_header, cc_header, bc_header
|
|
message['Subject'] = Header(self.__mail__['subject'])
|
|
message.attach(MIMEText(self.__mail__['content'], 'html', 'utf-8'))
|
|
for file in self.__mail__['attachs']:
|
|
mime = MIMEBase('application', 'octet-stream')
|
|
mime.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file))
|
|
mime.set_payload(open(file, 'rb').read())
|
|
encoders.encode_base64(mime)
|
|
message.attach(mime)
|
|
try:
|
|
smtp = [smtplib.SMTP, smtplib.SMTP_SSL][self.ssl_enable](
|
|
self.__host__['host'],
|
|
self.__host__['port']
|
|
)
|
|
except Exception:
|
|
return SMTPMailerSendState({'code': 1, 'message': 'Connection failure.'})
|
|
try:
|
|
smtp.login(self.__user__['user'], self.__user__['pass'])
|
|
except Exception:
|
|
return SMTPMailerSendState({'code': 2, 'message': 'Account authentication failed.'})
|
|
try:
|
|
receivers = []
|
|
for rg in [self.__mail__['to'], self.__mail__['cc'], self.__mail__['bc']]:
|
|
for rp in rg:
|
|
receivers.append(rp[1])
|
|
if test_only:
|
|
print(message.as_string(), file=sys.stderr)
|
|
return None
|
|
smtp.sendmail(self.__user__['user'], receivers, message.as_string())
|
|
smtp.quit()
|
|
self.__flag__ = 1
|
|
return SMTPMailerSendState({'code': 0, 'message': 'Mail sent successfully.'})
|
|
except Exception as error:
|
|
return SMTPMailerSendState({'code': 3, 'message': str(error)})
|
|
|
|
|
|
if __name__ == '__main__':
    # Example 1: configure the mailer one call at a time, then send.
    step_by_step = SMTPMailer()
    step_by_step.enableSSL()
    step_by_step.host('smtp.qq.com', 465)
    step_by_step.user('admin@example.com', '******')
    step_by_step.subject('Email Subject')
    step_by_step.content('This is some content...')
    step_by_step.to('user@example.com')
    print(step_by_step.send())

    # Example 2: hand every setting to the constructor instead.
    #
    # An address is written either as "Nickname <Email>" or as a bare
    # "Email", e.g. "Mrs.Lisa <lisa@example.com>" or "lisa@example.com".
    # Several recipients go into one string separated by semicolons (;) or
    # commas (,), or into a list.
    constructor_options = {
        'username': 'admin@example.com',
        'password': '******',
        'ssl_enable': True,
        'host': 'smtp.qq.com',
        'port': 465,
        'send_from': 'Administrator <admin@example.com>',
        # Recipients.
        'send_to': 'Mr.Tom <tom@example.com>, Mr.Jack <jack@example.com>',
        # Carbon copy.
        'send_cc': 'Lindsay <its-lindsay@example.com>, Nora <nora@example.com>',
        # Blind carbon copy.
        'send_bc': 'frederica@example.com',
        'subject': 'Email Subject',
        'content': 'This is some content...',
        # Attachments: the path of one file, or a list of paths.
        'attachs': None,
    }
    print(SMTPMailer(**constructor_options).send())
|