diff --git a/Base/Class/Command.py b/Base/Class/Command.py index 700eb9d..7bdbff2 100644 --- a/Base/Class/Command.py +++ b/Base/Class/Command.py @@ -1,5 +1,5 @@ import re,chardet,subprocess -def shell(cmd='', stdout='', stderr='', timeout=None): +def shell(cmd='', stdout=None, stderr=None, timeout=None): match stdout: case 'NULL' | 'Null' | 'null': stdoutCh = subprocess.DEVNULL @@ -10,19 +10,38 @@ def shell(cmd='', stdout='', stderr='', timeout=None): stderrCh = subprocess.DEVNULL case _: stderrCh = subprocess.PIPE - execResult = subprocess.run(cmd, shell=True, stdout=stdoutCh, stderr=stderrCh, timeout=timeout) - stdoutContent = '' - stderrContent = '' - if execResult.stdout: - stdoutContent = execResult.stdout.decode(chardet.detect(execResult.stdout)["encoding"] or 'UTF-8') - if execResult.stderr: - stderrContent = execResult.stderr.decode(chardet.detect(execResult.stderr)["encoding"] or 'UTF-8') - return { - "returncode": execResult.returncode, - "stdout": stdoutContent, - "stderr": stderrContent - } + try: + result = subprocess.run(cmd, shell=True, stdout=stdoutCh, stderr=stderrCh, timeout=timeout) + stdoutContent = '' + stderrContent = '' + if result.stdout: + stdoutContent = result.stdout.decode(chardet.detect(result.stdout)["encoding"] or 'UTF-8') + if result.stderr: + stderrContent = result.stderr.decode(chardet.detect(result.stderr)["encoding"] or 'UTF-8') + return { + "code": result.returncode, "stdout": stdoutContent, "stderr": stderrContent + } + except subprocess.TimeoutExpired: + return { + "code": 1, + "stdout": '', + "stderr": 'Execution timeout.' + } if __name__ == '__main__': - # Example. - print(shell('ping www.baidu.com',timeout=10)['stdout']) + # Example. Normal. + print(shell('ping www.taobao.com')) + # Example. No STDOUT. + print(shell('ping www.taobao.com', stdout='NULL')) + # Example. No STDERR. + print(shell('ping www.taobao.com', stderr='NULL')) + # Example. No STDOUT and STDERR. + print(shell('ping www.taobao.com', stdout='NULL', stderr='NULL')) + # Example. With Timeout. + print(shell('ping www.taobao.com', timeout=3)) + # Example. With Timeout Not. + print(shell('ping www.taobao.com', timeout=8)) + # Example. Curl. + print(shell('curl "https://www.baidu.com/"')) + # Example. Unknown Command. + print(shell('busybox ls')) diff --git a/Base/Class/DataBase.py b/Base/Class/DataBase.py index f416cc8..d884388 100644 --- a/Base/Class/DataBase.py +++ b/Base/Class/DataBase.py @@ -1,5 +1,5 @@ import re,pymysql.cursors -class Mysql(): +class MySQL(): __conn__ = None __curr__ = None def __init__(self,**kwargs): @@ -62,7 +62,7 @@ if __name__ == '__main__': connConfig = { "host": "127.0.0.1", "port": 3306, "user": "root", "password": "root", "database": "db", "charset": "utf8", } - sqlConnect = Mysql(**connConfig,cursor='Dict') + sqlConnect = MySQL(**connConfig,cursor='Dict') sqlConnect.execute(''' CREATE TABLE `user` ( `uid` int(11) NOT NULL AUTO_INCREMENT, diff --git a/Base/Class/Mail.py b/Base/Class/Mail.py index 3e1fac1..0850bba 100644 --- a/Base/Class/Mail.py +++ b/Base/Class/Mail.py @@ -5,8 +5,6 @@ from email.mime.base import MIMEBase from email.header import Header from email import encoders class SMTPMailer(): - __send__ = 0 - __essl__ = False __host__ = { "host": '', "port": 25 @@ -24,63 +22,108 @@ class SMTPMailer(): "subject": '', "content": '' } + def __init__(self, ssl=False, host='', port=25, user='', password='', from_name='', from_addr='', send_to=None, send_cc=None, send_bc=None, subject='', content='', attachs=None): + self.sendflag = 0 + self.ssl = ssl + if host: + self.host(host, port) + if user: + self.user(user, password) + if from_addr: + self.sendFrom(from_name, from_addr) + if send_to: + self.sendTo(send_to) + if send_cc: + self.sendCc(send_cc) + if send_bc: + self.sendBc(send_bc) + if subject: + self.subject(subject) + if content: + self.content(content) + if attachs: + self.attachs(attachs) + def enableSSL(self, ch=True): - self.__essl__ = ch + self.ssl = ch + return self def host(self, host, port): self.__host__["host"] = host self.__host__["port"] = port + return self def user(self, user, password): self.__user__["user"] = user self.__user__["pass"] = password + return self def sendFrom(self, name='', addr=''): self.__mail__["fr"]["name"] = name self.__mail__["fr"]["addr"] = addr + return self - def sendTo(self,recvlist): - if isinstance(recvlist, str): - recvlist = [recvlist] - self.__mail__["to"] = recvlist + def sendTo(self,lst): + if isinstance(lst, str): + lst = [lst] + self.__mail__["to"] = lst + return self - def sendCc(self,recvlist): - if isinstance(recvlist, str): - recvlist = [recvlist] - self.__mail__["cc"] = recvlist + def sendCc(self,lst): + if isinstance(lst, str): + lst = [lst] + self.__mail__["cc"] = lst + return self - def sendBc(self,recvlist): - if isinstance(recvlist, str): - recvlist = [recvlist] - self.__mail__["bc"] = recvlist + def sendBc(self,lst): + if isinstance(lst, str): + lst = [lst] + self.__mail__["bc"] = lst + return self def subject(self,text): - self.__mail__["subject"] = text + if isinstance(text, str): + self.__mail__["subject"] = text + else: + raise Exception('类型异常 | Type exception.') + return self def content(self, text): - self.__mail__["content"] = text + if isinstance(text, str): + self.__mail__["content"] = text + else: + raise Exception('类型异常 | Type exception.') + return self - def attachs(self,filelist): - if isinstance(filelist, str): - filelist = [filelist] - for value in filelist: + def attachs(self,lst): + if isinstance(lst, str): + lst = [lst] + for value in lst: if not os.path.exists(value): raise Exception('文件未找到 | File not found.') - self.__mail__["attachs"] = filelist + self.__mail__["attachs"] = lst + return self def send(self): - if self.__send__: + if self.sendflag: return {"code": 1, "message": 'Mail has been sent.'} message = MIMEMultipart() messageFrom = Header() messageFrom.append(self.__mail__["fr"]["name"]) messageFrom.append('<' + self.__mail__["fr"]["addr"] + '>') + # 发件信息 message["From"] = messageFrom + # 收件列表 message["To"] = ';'.join(self.__mail__["to"]) + # 抄送列表 message["Cc"] = ';'.join(self.__mail__["cc"]) + # 密送列表 message["Bcc"] = ';'.join(self.__mail__["bc"]) - message['Subject'] = Header(self.__mail__["subject"]) + # 设置主题 + message["Subject"] = Header(self.__mail__["subject"]) + # 设置正文 message.attach(MIMEText(self.__mail__["content"], 'html', 'utf-8')) + # 添加附件 for value in self.__mail__["attachs"]: mime = MIMEBase('application', 'octet-stream') mime.add_header('Content-Disposition', 'attachment', filename=os.path.basename(value)) @@ -88,10 +131,7 @@ class SMTPMailer(): encoders.encode_base64(mime) message.attach(mime) try: - if self.__essl__: - smtp = smtplib.SMTP_SSL(self.__host__["host"], self.__host__["port"]) - else: - smtp = smtplib.SMTP(self.__host__["host"], self.__host__["port"]) + smtp = [smtplib.SMTP,smtplib.SMTP_SSL][self.ssl](self.__host__["host"], self.__host__["port"]) except: return {"code": 1, "message": 'Connection failure.'} try: @@ -101,22 +141,40 @@ class SMTPMailer(): try: smtp.sendmail(self.__user__["user"], self.__mail__["to"] + self.__mail__["cc"] + self.__mail__["bc"], message.as_string()) smtp.quit() - self.__send__ = 1 + self.sendflag = 1 return {"code": 0, "message": ''} except Exception as error: return {"code": 1, "message": error} if __name__ == '__main__': + # Example. + result = SMTPMailer( + ssl=True, + host='smtp.qq.com', + port=465, + user='', + password='', + from_name='云凡网络', + from_addr='admin@fanscloud.net', + send_to=['user01@example.com','user02@example.com','user03@example.com'], + send_cc=['user04@example.com','user05@example.com'], + send_bc=['user06@example.com'], + subject='标题', + content='
邮件发送测试
', + attachs=['./表格.xlsx', './文档.docx'] + ).send() + print(result) # Example. mailer = SMTPMailer() mailer.enableSSL(True) mailer.host('smtp.qq.com', 465) mailer.user('', '') - mailer.sendFrom('', '') - mailer.sendTo(['']) - mailer.sendCc(['']) - mailer.sendBc(['']) - mailer.subject('这是一封测试邮件') - mailer.content('邮件发送HTML格式文件测试
') - mailer.attachs(['./example.xlsx']) - print(mailer.send()) + mailer.sendFrom('云凡网络', 'admin@example.com') + mailer.sendTo(['user01@example.com','user02@example.com','user03@example.com']) + mailer.sendCc(['user04@example.com','user05@example.com']) + mailer.sendBc(['user06@example.com']) + mailer.subject('标题') + mailer.content('邮件发送测试
') + mailer.attachs(['./表格.xlsx', './文档.docx']) + result = mailer.send() + print(result) diff --git a/Base/Class/Network.py b/Base/Class/Network.py index 1247acf..186e482 100644 --- a/Base/Class/Network.py +++ b/Base/Class/Network.py @@ -1,6 +1,6 @@ import re,chardet,platform,subprocess,socket def ping(host='', version=None): - def sh(cmd='', timeout=None): + def shell_exec(cmd='', timeout=None): try: result = subprocess.run( cmd, shell=True, @@ -12,37 +12,48 @@ def ping(host='', version=None): return '' match version: case 4 | 6: - versionCh = ' -' + str(version) + version_ch = '\x20-' + str(version) case _: - versionCh = '' + version_ch = '' match platform.system(): case 'Windows': - result = sh('chcp 65001 && ping -n 1' + versionCh + ' ' + host, timeout=5) + result = shell_exec('chcp 65001 && ping -n 1' + version_ch + ' ' + host, timeout=5) case _: - result = sh('ping -c 1' + versionCh + ' ' + host, timeout=5) - delayList = re.findall('([0-9.]+)[\s]?ms*',result) - if len(delayList) == 0: + result = shell_exec('ping -c 1' + version_ch + ' ' + host, timeout=5) + delay = re.findall('([0-9.]+)[\s]?ms*',result) + if len(delay) == 0: return False else: - return round(float(delayList[0]),1) + return round(float(delay[0]),1) -def resolve(host=''): +def resolve(host='', version=None): + result = [] + lst = [] try: - addrLst = socket.getaddrinfo(host, None) + lst = socket.getaddrinfo(host, None) except: - addrLst = [] - inetLst = [] - for value in addrLst: + pass + for value in lst: + address = value[4][0] match value[0]: case socket.AddressFamily.AF_INET6: - t = 'IPv6' + if version == 6 or version == None: + result.append(address) case _: - t = 'IPv4' - inetLst.append([t, value[4][0]]) - return inetLst + if version == 4 or version == None: + result.append(address) + return result if __name__ == '__main__': - # Example. - print('PING:',ping('www.baidu.com'),'ms') - # Example. - print(resolve('dns.google')) + # Example. PING IPv4 Test. + print('PING:', ping('www.taobao.com', version=4), 'ms') + # Example. PING IPv6 Test. + print('PING:', ping('www.taobao.com', version=6), 'ms') + # Example. PING Auto Test. + print('PING:', ping('www.taobao.com'), 'ms') + # Example. Resolve IPv4-Only Host. + print(resolve('www.taobao.com', version=4)) + # Example. Resolve IPv6-Only Host. + print(resolve('www.taobao.com', version=6)) + # Example. Resolve IPv4/6 Host. + print(resolve('www.taobao.com')) diff --git a/main4.py b/main4.py index e69de29..e693212 100644 --- a/main4.py +++ b/main4.py @@ -0,0 +1 @@ +print('1\x202') \ No newline at end of file