Perfect at 06292325.

This commit is contained in:
zhaoyafan 2022-06-29 23:25:57 +08:00
parent 76fcaa9bb1
commit 5701ee0f3f
5 changed files with 164 additions and 75 deletions

View File

@ -1,5 +1,5 @@
import re,chardet,subprocess
def shell(cmd='', stdout='', stderr='', timeout=None):
def shell(cmd='', stdout=None, stderr=None, timeout=None):
match stdout:
case 'NULL' | 'Null' | 'null':
stdoutCh = subprocess.DEVNULL
@ -10,19 +10,38 @@ def shell(cmd='', stdout='', stderr='', timeout=None):
stderrCh = subprocess.DEVNULL
case _:
stderrCh = subprocess.PIPE
execResult = subprocess.run(cmd, shell=True, stdout=stdoutCh, stderr=stderrCh, timeout=timeout)
stdoutContent = ''
stderrContent = ''
if execResult.stdout:
stdoutContent = execResult.stdout.decode(chardet.detect(execResult.stdout)["encoding"] or 'UTF-8')
if execResult.stderr:
stderrContent = execResult.stderr.decode(chardet.detect(execResult.stderr)["encoding"] or 'UTF-8')
return {
"returncode": execResult.returncode,
"stdout": stdoutContent,
"stderr": stderrContent
}
try:
result = subprocess.run(cmd, shell=True, stdout=stdoutCh, stderr=stderrCh, timeout=timeout)
stdoutContent = ''
stderrContent = ''
if result.stdout:
stdoutContent = result.stdout.decode(chardet.detect(result.stdout)["encoding"] or 'UTF-8')
if result.stderr:
stderrContent = result.stderr.decode(chardet.detect(result.stderr)["encoding"] or 'UTF-8')
return {
"code": result.returncode, "stdout": stdoutContent, "stderr": stderrContent
}
except subprocess.TimeoutExpired:
return {
"code": 1,
"stdout": '',
"stderr": 'Execution timeout.'
}
if __name__ == '__main__':
# Example.
print(shell('ping www.baidu.com',timeout=10)['stdout'])
# Example. Normal.
print(shell('ping www.taobao.com'))
# Example. No STDOUT.
print(shell('ping www.taobao.com', stdout='NULL'))
# Example. No STDERR.
print(shell('ping www.taobao.com', stderr='NULL'))
# Example. No STDOUT and STDERR.
print(shell('ping www.taobao.com', stdout='NULL', stderr='NULL'))
# Example. With Timeout.
print(shell('ping www.taobao.com', timeout=3))
# Example. With Timeout Not.
print(shell('ping www.taobao.com', timeout=8))
# Example. Curl.
print(shell('curl "https://www.baidu.com/"'))
# Example. Unknown Command.
print(shell('busybox ls'))

View File

@ -1,5 +1,5 @@
import re,pymysql.cursors
class Mysql():
class MySQL():
__conn__ = None
__curr__ = None
def __init__(self,**kwargs):
@ -62,7 +62,7 @@ if __name__ == '__main__':
connConfig = {
"host": "127.0.0.1", "port": 3306, "user": "root", "password": "root", "database": "db", "charset": "utf8",
}
sqlConnect = Mysql(**connConfig,cursor='Dict')
sqlConnect = MySQL(**connConfig,cursor='Dict')
sqlConnect.execute('''
CREATE TABLE `user` (
`uid` int(11) NOT NULL AUTO_INCREMENT,

View File

@ -5,8 +5,6 @@ from email.mime.base import MIMEBase
from email.header import Header
from email import encoders
class SMTPMailer():
__send__ = 0
__essl__ = False
__host__ = {
"host": '',
"port": 25
@ -24,63 +22,108 @@ class SMTPMailer():
"subject": '',
"content": ''
}
def __init__(self, ssl=False, host='', port=25, user='', password='', from_name='', from_addr='', send_to=None, send_cc=None, send_bc=None, subject='', content='', attachs=None):
self.sendflag = 0
self.ssl = ssl
if host:
self.host(host, port)
if user:
self.user(user, password)
if from_addr:
self.sendFrom(from_name, from_addr)
if send_to:
self.sendTo(send_to)
if send_cc:
self.sendCc(send_cc)
if send_bc:
self.sendBc(send_bc)
if subject:
self.subject(subject)
if content:
self.content(content)
if attachs:
self.attachs(attachs)
def enableSSL(self, ch=True):
self.__essl__ = ch
self.ssl = ch
return self
def host(self, host, port):
self.__host__["host"] = host
self.__host__["port"] = port
return self
def user(self, user, password):
self.__user__["user"] = user
self.__user__["pass"] = password
return self
def sendFrom(self, name='', addr=''):
self.__mail__["fr"]["name"] = name
self.__mail__["fr"]["addr"] = addr
return self
def sendTo(self,recvlist):
if isinstance(recvlist, str):
recvlist = [recvlist]
self.__mail__["to"] = recvlist
def sendTo(self,lst):
if isinstance(lst, str):
lst = [lst]
self.__mail__["to"] = lst
return self
def sendCc(self,recvlist):
if isinstance(recvlist, str):
recvlist = [recvlist]
self.__mail__["cc"] = recvlist
def sendCc(self,lst):
if isinstance(lst, str):
lst = [lst]
self.__mail__["cc"] = lst
return self
def sendBc(self,recvlist):
if isinstance(recvlist, str):
recvlist = [recvlist]
self.__mail__["bc"] = recvlist
def sendBc(self,lst):
if isinstance(lst, str):
lst = [lst]
self.__mail__["bc"] = lst
return self
def subject(self,text):
self.__mail__["subject"] = text
if isinstance(text, str):
self.__mail__["subject"] = text
else:
raise Exception('类型异常 | Type exception.')
return self
def content(self, text):
self.__mail__["content"] = text
if isinstance(text, str):
self.__mail__["content"] = text
else:
raise Exception('类型异常 | Type exception.')
return self
def attachs(self,filelist):
if isinstance(filelist, str):
filelist = [filelist]
for value in filelist:
def attachs(self,lst):
if isinstance(lst, str):
lst = [lst]
for value in lst:
if not os.path.exists(value):
raise Exception('文件未找到 | File not found.')
self.__mail__["attachs"] = filelist
self.__mail__["attachs"] = lst
return self
def send(self):
if self.__send__:
if self.sendflag:
return {"code": 1, "message": 'Mail has been sent.'}
message = MIMEMultipart()
messageFrom = Header()
messageFrom.append(self.__mail__["fr"]["name"])
messageFrom.append('<' + self.__mail__["fr"]["addr"] + '>')
# 发件信息
message["From"] = messageFrom
# 收件列表
message["To"] = ';'.join(self.__mail__["to"])
# 抄送列表
message["Cc"] = ';'.join(self.__mail__["cc"])
# 密送列表
message["Bcc"] = ';'.join(self.__mail__["bc"])
message['Subject'] = Header(self.__mail__["subject"])
# 设置主题
message["Subject"] = Header(self.__mail__["subject"])
# 设置正文
message.attach(MIMEText(self.__mail__["content"], 'html', 'utf-8'))
# 添加附件
for value in self.__mail__["attachs"]:
mime = MIMEBase('application', 'octet-stream')
mime.add_header('Content-Disposition', 'attachment', filename=os.path.basename(value))
@ -88,10 +131,7 @@ class SMTPMailer():
encoders.encode_base64(mime)
message.attach(mime)
try:
if self.__essl__:
smtp = smtplib.SMTP_SSL(self.__host__["host"], self.__host__["port"])
else:
smtp = smtplib.SMTP(self.__host__["host"], self.__host__["port"])
smtp = [smtplib.SMTP,smtplib.SMTP_SSL][self.ssl](self.__host__["host"], self.__host__["port"])
except:
return {"code": 1, "message": 'Connection failure.'}
try:
@ -101,22 +141,40 @@ class SMTPMailer():
try:
smtp.sendmail(self.__user__["user"], self.__mail__["to"] + self.__mail__["cc"] + self.__mail__["bc"], message.as_string())
smtp.quit()
self.__send__ = 1
self.sendflag = 1
return {"code": 0, "message": ''}
except Exception as error:
return {"code": 1, "message": error}
if __name__ == '__main__':
# Example.
result = SMTPMailer(
ssl=True,
host='smtp.qq.com',
port=465,
user='',
password='',
from_name='云凡网络',
from_addr='admin@fanscloud.net',
send_to=['user01@example.com','user02@example.com','user03@example.com'],
send_cc=['user04@example.com','user05@example.com'],
send_bc=['user06@example.com'],
subject='标题',
content='<p>邮件发送测试</p><p><a href="https://www.baidu.com/">链接测试</a></p>',
attachs=['./表格.xlsx', './文档.docx']
).send()
print(result)
# Example.
mailer = SMTPMailer()
mailer.enableSSL(True)
mailer.host('smtp.qq.com', 465)
mailer.user('', '')
mailer.sendFrom('', '')
mailer.sendTo([''])
mailer.sendCc([''])
mailer.sendBc([''])
mailer.subject('这是一封测试邮件')
mailer.content('<p>邮件发送HTML格式文件测试</p><p><a href="http://www.baidu.com/">这是一个链接</a></p>')
mailer.attachs(['./example.xlsx'])
print(mailer.send())
mailer.sendFrom('云凡网络', 'admin@example.com')
mailer.sendTo(['user01@example.com','user02@example.com','user03@example.com'])
mailer.sendCc(['user04@example.com','user05@example.com'])
mailer.sendBc(['user06@example.com'])
mailer.subject('标题')
mailer.content('<p>邮件发送测试</p><p><a href="https://www.baidu.com/">链接测试</a></p>')
mailer.attachs(['./表格.xlsx', './文档.docx'])
result = mailer.send()
print(result)

View File

@ -1,6 +1,6 @@
import re,chardet,platform,subprocess,socket
def ping(host='', version=None):
def sh(cmd='', timeout=None):
def shell_exec(cmd='', timeout=None):
try:
result = subprocess.run(
cmd, shell=True,
@ -12,37 +12,48 @@ def ping(host='', version=None):
return ''
match version:
case 4 | 6:
versionCh = ' -' + str(version)
version_ch = '\x20-' + str(version)
case _:
versionCh = ''
version_ch = ''
match platform.system():
case 'Windows':
result = sh('chcp 65001 && ping -n 1' + versionCh + ' ' + host, timeout=5)
result = shell_exec('chcp 65001 && ping -n 1' + version_ch + ' ' + host, timeout=5)
case _:
result = sh('ping -c 1' + versionCh + ' ' + host, timeout=5)
delayList = re.findall('([0-9.]+)[\s]?ms*',result)
if len(delayList) == 0:
result = shell_exec('ping -c 1' + version_ch + ' ' + host, timeout=5)
delay = re.findall('([0-9.]+)[\s]?ms*',result)
if len(delay) == 0:
return False
else:
return round(float(delayList[0]),1)
return round(float(delay[0]),1)
def resolve(host=''):
def resolve(host='', version=None):
result = []
lst = []
try:
addrLst = socket.getaddrinfo(host, None)
lst = socket.getaddrinfo(host, None)
except:
addrLst = []
inetLst = []
for value in addrLst:
pass
for value in lst:
address = value[4][0]
match value[0]:
case socket.AddressFamily.AF_INET6:
t = 'IPv6'
if version == 6 or version == None:
result.append(address)
case _:
t = 'IPv4'
inetLst.append([t, value[4][0]])
return inetLst
if version == 4 or version == None:
result.append(address)
return result
if __name__ == '__main__':
# Example.
print('PING:',ping('www.baidu.com'),'ms')
# Example.
print(resolve('dns.google'))
# Example. PING IPv4 Test.
print('PING:', ping('www.taobao.com', version=4), 'ms')
# Example. PING IPv6 Test.
print('PING:', ping('www.taobao.com', version=6), 'ms')
# Example. PING Auto Test.
print('PING:', ping('www.taobao.com'), 'ms')
# Example. Resolve IPv4-Only Host.
print(resolve('www.taobao.com', version=4))
# Example. Resolve IPv6-Only Host.
print(resolve('www.taobao.com', version=6))
# Example. Resolve IPv4/6 Host.
print(resolve('www.taobao.com'))

View File

@ -0,0 +1 @@
print('1\x202')