This commit is contained in:
zhaoyafan 2023-03-15 19:40:39 +08:00
parent e3d811158b
commit de8568bc39
20 changed files with 1130 additions and 1 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*pytest_cache/
*allure*/
*pycache*/

29
Lib/Base64.py Normal file
View File

@ -0,0 +1,29 @@
import base64
def base64_encode(data, output=None):
"""
:param data: Bytes or String or FileStream.
:param output: Output file stream, when the data parameter is the file stream valid.
:return: What type of data entering returns the same data type.
"""
if isinstance(data, bytes):
return base64.b64encode(data)
if isinstance(data, str):
return base64.b64encode(bytes(data, encoding='utf-8')).decode()
else:
return base64.encode(data, output)
def base64_decode(data, output=None):
"""
:param data: Bytes or String or FileStream.
:param output: Output file stream, when the data parameter is the file stream valid.
:return: What type of data entering returns the same data type.
"""
if isinstance(data, bytes):
return base64.b64decode(data)
if isinstance(data, str):
return base64.b64decode(bytes(data, encoding='utf-8')).decode()
else:
return base64.decode(data, output)

477
Lib/Driver.py Normal file
View File

@ -0,0 +1,477 @@
from selenium import webdriver as _root_webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import *
import tempfile
import platform
import random
import json
import time
import sys
import os
# You can set the drive path and browser location through the environment variables,
# and select the webdriver type.
# It has preferential priority through environmental variable settings.
SELENIUM_WIRE = os.environ.get('SELENIUM_WIRE') or ''
# Attention!!!
# Must set the environment before importing selenium.
# Otherwise, these settings will be invalid.
SELENIUM_BROWSER_CHOOSE = os.environ.get('SELENIUM_BROWSER_CHOOSE') or 'Chrome'
SELENIUM_BROWSER_DRIVER = os.environ.get('SELENIUM_BROWSER_DRIVER') or ''
SELENIUM_BROWSER_BINARY = os.environ.get('SELENIUM_BROWSER_BINARY') or ''
# Whether to turn on headless mode.
HEADLESS_BROWSER_ENABLE = True if os.environ.get('HEADLESS') == '1' else False
# Choose Selenium or Selenium-Wire, default: Selenium.
if SELENIUM_WIRE:
exec('from seleniumwire import webdriver as _root_webdriver')
# Choose options and service.
if SELENIUM_BROWSER_CHOOSE:
exec('from selenium.webdriver.%s.options import Options' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').lower())
exec('from selenium.webdriver.%s.service import Service' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').lower())
# Use the chrome driver by default, such a code writing method is for grammar prompts.
DriverChoose = _root_webdriver.Chrome
if SELENIUM_BROWSER_CHOOSE:
exec('DriverChoose=_root_webdriver.%s' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').capitalize())
class BrowserMobileEmulation(dict):
"""
Mobile emulation parameters.
"""
def __init__(self, w=540, h=960, user_agent=None):
du = 'Mozilla/5.0 (Linux; U; Android 13; zh-cn; 2109119BC Build/TKQ1.220829.002) ' \
'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 ' \
'Chrome/98.0.4758.102 MQQBrowser/13.6 Mobile Safari/537.36'
user_agent = user_agent or du
super().__init__({'w': w, 'h': h, 'user_agent': user_agent})
self.w = self.h = self.user_agent = None
def __setattr__(self, key, value):
pass
def __getitem__(self, item):
try:
return super().__getitem__(item)
except KeyError:
return None
def __getattr__(self, item):
try:
return super().__getitem__(item)
except KeyError:
return None
class DefaultChromeLocation:
xDriver = '/opt/google/chrome/chromedriver'
xChrome = '/opt/google/chrome/chrome'
wDriver = 'C:/Program Files/Google/Chrome/Application/chromedriver.exe'
wChrome = 'C:/Program Files/Google/Chrome/Application/chrome.exe'
class PositionTab:
"""
Position for switch tab.
"""
Prev = 'Go-Prev'
Next = 'Go-Next'
class Browser(DriverChoose):
"""
Browser web driver.
"""
def __init__(
self,
driver: str = None,
binary: str = None,
headless: bool = False,
lang: str = None,
mute: bool = False,
no_images: bool = False,
user_agent: str = None,
http_proxy: str = None,
home: str = None,
window_size: str = None,
mobile_emulation: BrowserMobileEmulation = None,
option_arguments: list = None,
req_interceptor=None,
res_interceptor=None,
):
choose = SELENIUM_BROWSER_CHOOSE.capitalize()
self.platform = platform.uname().system
default_driver = None
default_binary = None
headless = HEADLESS_BROWSER_ENABLE or headless
if choose == 'Chrome':
if self.platform == 'Linux':
default_driver = DefaultChromeLocation.xDriver
default_binary = DefaultChromeLocation.xChrome
else:
default_driver = DefaultChromeLocation.wDriver
default_binary = DefaultChromeLocation.wChrome
driver = SELENIUM_BROWSER_DRIVER or driver or default_driver
binary = SELENIUM_BROWSER_BINARY or binary or default_binary
if self.platform == 'Linux' and not window_size: window_size = '1920x1080'
if self.platform == 'Windows' and headless and not window_size: window_size = '1920x1080'
# Initialization settings.
if (isinstance(option_arguments, list)) is False: option_arguments = []
cdplist = []
service = Service()
options = Options()
self.cdplist = cdplist
# Delete prompt information of chrome being controlled.
hasattr(options, 'add_experimental_option') and options.add_experimental_option("excludeSwitches", ['enable-automation', 'enable-logging'])
# Mobile emulation parameter setting start.
if mobile_emulation:
if hasattr(options, 'add_experimental_option') is False:
raise Exception('Do not support mobile emulation currently.')
self.w_browser = mobile_emulation.w + 14
self.h_browser = mobile_emulation.h + 0
self.w_inner_window = mobile_emulation.w + 0
self.h_inner_window = mobile_emulation.h - 86
self.mobile_emulation_screen_w = mobile_emulation.w
self.mobile_emulation_screen_h = mobile_emulation.h
window_size = '%s,%s' % (self.w_browser, self.h_browser)
options.add_experimental_option(
'mobileEmulation', {
'deviceMetrics': {
'width': self.w_inner_window,
'height': self.h_inner_window,
'pixelRatio': 2.75,
'touch': True
},
'userAgent': mobile_emulation.user_agent
}
)
cdplist.append([
'Emulation.setUserAgentOverride', {
'userAgent': mobile_emulation.user_agent,
'userAgentMetadata': {
'platform': 'Android' if mobile_emulation.user_agent.find('iPhone') == -1 else 'iPhone',
'mobile': True,
'platformVersion': '',
'architecture': '',
'model': ''
}
}
])
else:
self.w_browser = 0
self.h_browser = 0
self.w_inner_window = 0
self.h_inner_window = 0
self.mobile_emulation_screen_w = 0
self.mobile_emulation_screen_h = 0
# Mobile emulation parameter setting end.
# Set browser and webdriver path.
if driver:
service.path = driver
if binary:
options.binary_location = binary
# Add webdriver option arguments.
for i in option_arguments:
options.add_argument(i)
# Set headless mode.
if self.platform == 'Linux' or headless:
options.add_argument('--headless')
# Set no-sandbox mode.
if self.platform == 'Linux':
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
# Set language of browser, default is zh-CN.
if lang:
options.add_argument('--lang=%s' % (lang or 'zh-CN'))
hasattr(options, 'set_preference') and options.set_preference('intl.accept_languages', lang or 'zh-CN')
# Set mute.
if mute:
options.add_argument('--mute-audio=true')
hasattr(options, 'set_preference') and print('Warning: Do not support mute audio currently.', file=sys.stderr)
# Set no images mode.
if no_images:
options.add_argument('--blink-settings=imagesEnabled=false')
hasattr(options, 'set_preference') and print('Warning: Do not support disable images currently.', file=sys.stderr)
# Set default user agent.
if user_agent:
options.add_argument('--user-agent=%s' % user_agent)
hasattr(options, 'set_preference') and options.set_preference('general.useragent.override', user_agent)
# Set http proxy for browser.
if http_proxy:
options.add_argument('--proxy-server=http://%s' % http_proxy)
# Set browser window size before startup.
if window_size:
options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ','))
else:
options.add_argument('--start-maximized')
# Start the browser.
super().__init__(service=service, options=options)
# Selenium-Wire backend optimization start.
try:
self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
except AttributeError:
pass
# Selenium-Wire backend optimization end.
if mobile_emulation:
cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}])
cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}])
cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}])
# Set the request and response interceptor.
if req_interceptor:
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
self.request_interceptor = req_interceptor
if res_interceptor:
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
self.response_interceptor = res_interceptor
# Sync set http proxy for Selenium-Wire backend.
if http_proxy:
self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy}
# Set browser window size after startup, by default, there will be full screen display window.
if window_size:
self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(','))
else:
self.maximize_window()
# Sets a sticky timeout to implicitly wait for an element to be found.
self.implicitly_wait(10)
# Set the amount of time to wait for a page load to complete.
self.set_page_load_timeout(25)
# Open the default page.
home and self.open(home)
@staticmethod
def wait(secs: int | float = 1):
"""
Will sleep waiting.
"""
number_int = int(secs)
number_float = secs - number_int
for i in range(number_int):
time.sleep(1)
else:
time.sleep(number_float)
def open(self, url=None):
"""
Open the URL, simulate into the URL in the address bar and jump, the new page has no Referrer.
"""
self.update_cdp_command()
return self.get(url)
def turn(self, url=None):
"""
Simulation "window.location.href" jumps, the new page has Referrer.
"""
return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None)
def find(self, path):
"""
Use XPath to find an element.
"""
return self.find_element(By.XPATH, path)
def find_mult(self, path):
"""
Use XPath to find elements.
"""
return self.find_elements(By.XPATH, path)
def find_element_by(self, sentence):
"""
Custom find element, pass into a tuple or list.
"""
return self.find_element(*sentence)
def click(self, element):
"""
Click element for desktop version.
"""
self.action_chains().reset_actions()
self.action_chains().click(element).perform()
self.wait(0.1)
def touch(self, x, y):
"""
Click on the coordinates for Mobile edition.
"""
self.action_chains().reset_actions()
self.action_chains().move_by_offset(x, y).click().perform()
self.wait(0.1)
def input(self, element, content):
"""
Enter the content to the element.
"""
self.action_chains().reset_actions()
self.action_chains().send_keys_to_element(element, content).perform()
self.wait(0.1)
def mouse(self, element):
"""
Park the mouse here.
"""
self.action_chains().reset_actions()
self.action_chains().move_to_element(element).perform()
def tab_create(self, url=None):
"""
Create a new tab and open the URL.
"""
self.switch_to.new_window('tab')
self.update_cdp_command()
url and self.open(url)
def tab_switch(self, tab: int | str):
"""
Switch the browser tab page
"""
handles = self.window_handles
lengths = len(handles)
current = handles.index(self.current_window_handle)
if isinstance(tab, int):
handle = tab
elif tab == PositionTab.Prev:
handle = (current - 1)
elif tab in PositionTab.Next:
handle = (current + 1) % lengths
else:
handle = None
self.switch_to.window(handles[handle])
self.wait(0.2)
self.update_cdp_command()
def tab_cancel(self):
"""
Close the current browser tab page.
"""
handles = self.window_handles
if len(handles):
current = handles.index(self.current_window_handle)
self.close()
current > 0 and self.switch_to.window(handles[current - 1])
self.wait(0.2)
def tab_cancel_all(self):
"""
Close all the browser tab page.
"""
handles = self.window_handles
for i in handles:
self.tab_cancel()
def frame_switch_to(self, element_of_frame):
"""
Switch frame to the specified frame element.
"""
self.switch_to.frame(element_of_frame)
self.wait(0.2)
def frame_switch_to_default(self):
"""
Switch to the default frame.
"""
self.switch_to.default_content()
self.wait(0.2)
def scroll(self):
"""
Scroll page.
:return:
"""
self.action_chains().reset_actions()
self.action_chains().scroll_by_amount(0, self.execute_script('return document.documentElement.clientHeight;')).perform()
self.wait(0.8)
def scroll_to(self, pos: int | str):
"""
Scroll to the specified location.
"""
if isinstance(pos, int) and pos > 0:
self.execute_script('window.scrollTo(0, arguments[0]);', pos)
elif pos == 0:
self.execute_script('window.scrollTo(0, 0);')
elif pos == 0 - 1:
self.execute_script('window.scrollTo(0, document.body.scrollHeight);')
else:
pass
self.wait(0.8)
def scroll_to_element(self, element):
"""
Scroll to the specified element location.
"""
self.action_chains().reset_actions()
self.action_chains().scroll_to_element(element).perform()
self.wait(0.8)
def element_force_display(self, element):
"""
Make hidden element visible and interactive.
"""
self.execute_script(
'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.setAttribute("hidden","false");', element
)
def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None):
"""
Return WebDriverWait object.
"""
return WebDriverWait(
driver=self,
timeout=timeout,
poll_frequency=poll_frequency,
ignored_exceptions=ignored_exceptions
)
def current_alert(self):
"""
Return current alert object.
"""
return Alert(self)
def window_inner_size(self):
"""
Get the page window inner size.
"""
size = self.execute_script('return [window.innerWidth, window.innerHeight];')
return {'w': size[0] or 0, 'h': size[1] or 0}
def action_chains(self):
"""
Return ActionChains object.
"""
return ActionChains(self)
def screenshot(self) -> bytes:
"""
Screenshot as bytes.
"""
return self.get_screenshot_as_png()
def update_cdp_command(self) -> None:
for cmd in self.cdplist:
self.execute_cdp_cmd(*cmd)
class WebDriver(Browser):
"""
Get a browser driver object.
"""
def __init__(self):
super().__init__(lang='zh-CN')
if __name__ == '__main__':
# e.g. Test it can work normally.
driver = WebDriver()
driver.open('https://www.hao123.com/')
driver.wait()
driver.quit()

104
Lib/Http.py Normal file
View File

@ -0,0 +1,104 @@
import requests
import requests.utils
class HTTPResponseObject(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.url = None
self.status = None
self.reason = None
self.header = None
self.cookie = None
self.binary = None
self.text, self.json = None, None
self.encoding, self.duration = None, None
def __setattr__(self, key, value):
pass
def __getitem__(self, item):
try:
return super().__getitem__(item)
except Exception:
return None
def __getattr__(self, item):
try:
return super().__getitem__(item)
except Exception:
return None
class HTTPRequest:
__http__ = requests
__accept_method__ = ('GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 'PATCH')
def http(self, method='GET', url: str = None, query=None, data=None, json=None, file=None, header=None, cookie=None,
ua: str = None, auth=None, timeout: int | float = None, proxy: str = None, auto_redirect=False, ignore_cert_error=False):
method = method.upper()
if method not in self.__accept_method__:
raise Exception('Unsupported request method.')
if method == 'HEAD':
auto_redirect = False
if ua:
if not isinstance(header, dict):
header = {}
header['User-Agent'] = ua
if isinstance(proxy, str) and len(proxy) >= 5:
proxy = {'http': proxy, 'https': proxy}
else:
proxy = None
try:
response = self.__http__.request(
method=method,
url=url,
params=query,
data=data,
headers=header,
cookies=cookie,
files=file,
auth=auth,
timeout=timeout,
allow_redirects=auto_redirect,
proxies=proxy,
verify=not ignore_cert_error,
json=json
)
except Exception:
response = None
try:
response_json = response.json()
except Exception:
response_json = None
if response:
return HTTPResponseObject({
'url': response.url,
'status': response.status_code,
'reason': response.reason,
'header': response.headers,
'cookie': requests.utils.dict_from_cookiejar(response.cookies),
'binary': response.content,
'text': response.text,
'json': response_json,
'encoding': response.encoding,
'duration': round(response.elapsed.microseconds / 1000, 1)
})
else:
return HTTPResponseObject({
'status': -1
})
class HTTPSession(HTTPRequest):
__http__ = requests.session()
if __name__ == '__main__':
# e.g. Send a get request.
res = HTTPSession().http(
method='GET', url='http://pv.sohu.com/cityjson', query={'ie': 'utf-8'},
ua='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
auto_redirect=True
)
print(res)

9
Lib/Json.py Normal file
View File

@ -0,0 +1,9 @@
import json
def json_encode(data, indent=None, unicode=True):
return json.dumps(data, indent=indent, ensure_ascii=unicode)
def json_decode(data):
return json.loads(data)

43
Lib/LoadConf.py Normal file
View File

@ -0,0 +1,43 @@
from Lib.Json import json_decode
from Lib.Yaml import yaml_decode
import os
class LoadConf(dict):
"""
Load the configuration file from the configuration directory.
"""
def __init__(self, name: str):
home = '%s%s%s' % (os.path.dirname(os.path.dirname(__file__)), os.sep, 'Conf')
try:
os.path.exists(home) or os.makedirs(home)
except FileExistsError:
pass
json_extension = '.json'
yaml_extension = '.yaml'
pa_json = '%s/%s%s' % (home, name, json_extension)
pa_yaml = '%s/%s%s' % (home, name, yaml_extension)
data = None
if os.path.exists(pa_json):
file = pa_json
data = json_decode(open(file, encoding='utf-8').read())
if os.path.exists(pa_yaml):
file = pa_yaml
data = yaml_decode(open(file, encoding='utf-8').read())
if data is None:
raise FileNotFoundError('Not found json or yaml configuration "%s" in %s.' % (name, home))
super().__init__(data)
def __setattr__(self, key, value):
raise AttributeError('Attribute is read only.')
def __getattr__(self, item):
try:
return super().__getitem__(item)
except KeyError:
return None
def __setitem__(self, key, value):
return self.__setattr__(key, value)
def __getitem__(self, item):
return self.__getattr__(item)

53
Lib/LoadData.py Normal file
View File

@ -0,0 +1,53 @@
from Lib.Json import json_decode
from Lib.Yaml import yaml_decode
import csv
import os
class LoadData:
"""
Load the data file from the data directory.
"""
def __init__(self):
home = '%s%s%s' % (os.path.dirname(os.path.dirname(__file__)), os.sep, 'Data')
try:
os.path.exists(home) or os.makedirs(home)
except FileExistsError:
pass
self.home = home
def fromJson(self, filename: str, from_data_home=True):
return json_decode(open('%s/%s' % (self.home, filename) if from_data_home else filename, encoding='utf-8').read())
def fromYaml(self, filename: str, from_data_home=True):
return yaml_decode(open('%s/%s' % (self.home, filename) if from_data_home else filename, encoding='utf-8').read())
def fromCsv(self, filename: str, from_data_home=True):
"""
Load data from CSV file, compatible standard format.
:param filename: Filename or path.
:param from_data_home: Whether to load from the data directory, otherwise use the absolute path.
:return: List.
"""
file = '%s/%s' % (self.home, filename) if from_data_home else filename
try:
open(file).read(4096)
encoding = None
except UnicodeDecodeError:
encoding = 'utf-8'
return [row for row in csv.reader(open(file, encoding=encoding))]
def fromTxt(self, filename: str, from_data_home=True):
"""
Load the data from the txt text file, and one line is used as a data.
:param filename: Filename or path.
:param from_data_home: Whether to load from the data directory, otherwise use the absolute path.
:return: List.
"""
file = '%s/%s' % (self.home, filename) if from_data_home else filename
try:
open(file).read(4096)
encoding = None
except UnicodeDecodeError:
encoding = 'utf-8'
return [row.rstrip("\n") for row in open(file, encoding=encoding).readlines()]

29
Lib/SeleniumClear.py Normal file
View File

@ -0,0 +1,29 @@
import tempfile
import platform
import shutil
import glob
import os
def clear_selenium():
if platform.uname().system == 'Windows':
user_home = [os.environ.get('HOMEDRIVE'), os.environ.get('HOMEPATH')]
if user_home[0] and user_home[1]:
try:
shutil.rmtree('%s%s/.cache/selenium' % (user_home[0], user_home[1]))
except FileNotFoundError:
pass
def clear_driver_cache():
for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']:
for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)):
try:
shutil.rmtree(i)
except (FileNotFoundError, PermissionError, WindowsError):
pass
if __name__ == '__main__':
# e.g. Clear
clear_selenium()
clear_driver_cache()

247
Lib/Smtp.py Normal file
View File

@ -0,0 +1,247 @@
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.header import Header
from email import encoders
import smtplib
import sys
import os
import re
class SMTPMailerSendState(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.code = self.message = None
def __setattr__(self, key, value):
pass
def __getitem__(self, item):
try:
return super().__getitem__(item)
except Exception:
return None
def __getattr__(self, item):
try:
return super().__getitem__(item)
except Exception:
return None
class SMTPMailer:
__host__ = {
'host': '',
'port': 25
}
__user__ = {
'user': '',
'pass': ''
}
__mail__ = {
'from': [],
'to': [],
'cc': [],
'bc': [],
'attachs': [],
'subject': '',
'content': ''
}
__flag__ = 0
__addr__ = 0
__from__ = 0
def __init__(
self,
username='',
password='',
host='127.0.0.1',
port=25,
ssl_enable=False,
send_from='',
send_to=None,
send_cc=None,
send_bc=None,
subject='',
content='',
attachs=None
):
self.ssl_enable = ssl_enable
self.host(host, port)
self.user(username, password)
send_from and self.send_from(send_from)
send_to and self.to(send_to)
send_cc and self.cc(send_cc)
send_bc and self.bc(send_bc)
subject and self.subject(subject)
content and self.content(content)
attachs and self.attachs(attachs)
@staticmethod
def test_email(email: str, throw_exception=False):
test = re.fullmatch('^([a-zA-Z0-9_.\\-])+@(([a-zA-Z0-9\\-])+\\.)+([a-zA-Z0-9]{2,4})+$', email) is not None
if throw_exception and test is False:
raise Exception('Illegal email address: %s.' % email)
return test
@staticmethod
def gen_header_of_mailer(e):
try:
return "\"%s\" <%s>" % (Header(e[0]).encode(), Header(e[1]).encode())
except Exception:
return ''
def parse_email_address(self, email: str):
if email.find('<') != -1 and email.find('>') != -1:
match_email = re.findall('^(.*?)<(.*?)>$', email)
if len(match_email) == 1:
addr = match_email[0][1].strip()
self.test_email(addr, throw_exception=True)
name = match_email[0][0].strip() or addr.split('@')[0].strip()
return [name, addr]
else:
raise Exception('The wrong email address format: %s.' % email)
else:
if len(email) != 0:
addr = email
self.test_email(addr, throw_exception=True)
name = email.split('@')[0].strip()
return [name, addr]
else:
raise Exception('Empty email address.')
def parse_email_address_mult(self, data):
if not isinstance(data, (str, list, tuple)):
raise TypeError('Email address list must be string or list or tuple type.')
if isinstance(data, str):
data = [i.strip() for i in data.replace(',', ';').split(';')]
while '' in data:
data.remove('')
return [self.parse_email_address(email_format) for email_format in data]
def enableSSL(self):
self.ssl_enable = True
return self
def host(self, host: str, port: int):
self.__host__['host'] = host
self.__host__['port'] = port
return self
def user(self, username, password):
self.__user__['user'] = username
self.__user__['pass'] = password
return self
def send_from(self, e):
self.__mail__['from'] = self.parse_email_address(e)
self.__from__ += 1
return self
def to(self, receive_list):
self.__mail__['to'] = self.parse_email_address_mult(receive_list)
self.__addr__ += 1
return self
def cc(self, receive_list):
self.__mail__['cc'] = self.parse_email_address_mult(receive_list)
self.__addr__ += 1
return self
def bc(self, receive_list):
self.__mail__['bc'] = self.parse_email_address_mult(receive_list)
self.__addr__ += 1
return self
def subject(self, text: str):
self.__mail__['subject'] = text
return self
def content(self, text: str):
self.__mail__['content'] = text
return self
def attachs(self, attachs_path_list: str | list):
if isinstance(attachs_path_list, str):
attachs_path_list = [attachs_path_list]
for path in attachs_path_list:
if not os.path.exists(path):
raise FileNotFoundError('The attachment was not found.')
self.__mail__['attachs'] = attachs_path_list
return self
def send(self, test_only=False):
if self.__flag__:
return SMTPMailerSendState({'code': 1, 'message': 'Mail has been sent.'})
if self.__addr__ == 0:
raise Exception('No receiver.')
message = MIMEMultipart()
if self.__from__ == 0 and self.test_email(self.__user__['user']) and self.send_from(self.__user__['user']):
pass
message['From'] = self.gen_header_of_mailer(self.__mail__['from'])
to_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['to']])
cc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['cc']])
bc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['bc']])
message['To'], message['Cc'], message['Bcc'] = to_header, cc_header, bc_header
message['Subject'] = Header(self.__mail__['subject'])
message.attach(MIMEText(self.__mail__['content'], 'html', 'utf-8'))
for file in self.__mail__['attachs']:
mime = MIMEBase('application', 'octet-stream')
mime.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file))
mime.set_payload(open(file, 'rb').read())
encoders.encode_base64(mime)
message.attach(mime)
try:
smtp = [smtplib.SMTP, smtplib.SMTP_SSL][self.ssl_enable](
self.__host__['host'],
self.__host__['port']
)
except Exception:
return SMTPMailerSendState({'code': 1, 'message': 'Connection failure.'})
try:
smtp.login(self.__user__['user'], self.__user__['pass'])
except Exception:
return SMTPMailerSendState({'code': 2, 'message': 'Account authentication failed.'})
try:
receivers = []
for rg in [self.__mail__['to'], self.__mail__['cc'], self.__mail__['bc']]:
for rp in rg:
receivers.append(rp[1])
if test_only:
print(message.as_string(), file=sys.stderr)
return None
smtp.sendmail(self.__user__['user'], receivers, message.as_string())
smtp.quit()
self.__flag__ = 1
return SMTPMailerSendState({'code': 0, 'message': 'Mail sent successfully.'})
except Exception as error:
return SMTPMailerSendState({'code': 3, 'message': str(error)})
if __name__ == '__main__':
# e.g. Send an email example.
print(SMTPMailer().enableSSL().host('smtp.qq.com', 465).user('admin@example.com', '******').
subject('Email Subject').content('This is some content...').to('user@example.com').send())
# e.g. Or you can send emails like this.
print(SMTPMailer(
username='admin@example.com',
password='******',
ssl_enable=True,
host='smtp.qq.com',
port=465,
# The recipient format can be "Nickname <Email>" or "Email", like this: "Mrs.Lisa <lisa@example.com>" or "lisa@example.com"
# Multiple recipients can be separated by semicolon(;) or comma(,).
# Multiple recipients you can also pass a list.
send_from='Administrator <admin@example.com>',
# Add recipients.
send_to='Mr.Tom <tom@example.com>, Mr.Jack <jack@example.com>',
# Add Cc.
send_cc='Lindsay <its-lindsay@example.com>, Nora <nora@example.com>',
# Add Bcc.
send_bc='frederica@example.com',
subject='Email Subject',
content='This is some content...',
# You can add attachments, you need to pass the path of a single or more attachment in the form of a list.
attachs=None
).send())

View File

@ -0,0 +1,4 @@
from Lib.LoadConf import *
from Lib.LoadData import *
import pytest
import allure

38
Lib/Time.py Normal file
View File

@ -0,0 +1,38 @@
import time
def fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=None, utc=False):
return time.strftime(fmt, [time.localtime, time.gmtime][utc](ts))
def par_time(fmt='', dt=''):
return time.mktime(time.strptime(dt, fmt))
def asc_time(ts=None):
return time.asctime(time.localtime(ts))
def int_time():
return int(time.time())
def mic_time():
return round(time.time(), 6)
if __name__ == '__main__':
# e.g. Get the current formatting time.
print(fmt_time(fmt='%Y-%m-%d %H:%M:%S'))
# e.g. Get the formatting time and specify the timestamp.
print(fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=0))
# e.g. Get UTC formatting time.
print(fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=0, utc=True))
# e.g. Convert the formatting time to timestamp.
print(par_time(fmt='%Y-%m-%d %H:%M:%S', dt='2020-12-31 12:00:00'))
# e.g. Get ASC formatting time.
print(asc_time())
# e.g. Get integer timestamp.
print(int_time())
# e.g. Get accurate to microsecond timestamp.
print(mic_time())

9
Lib/Yaml.py Normal file
View File

@ -0,0 +1,9 @@
import yaml
def yaml_encode(data, indent=None, unicode=False):
return yaml.dump(data, indent=indent, allow_unicode=not unicode, sort_keys=False)
def yaml_decode(data):
return yaml.load(data, Loader=yaml.CFullLoader)

16
Pom/Base/__init__.py Normal file
View File

@ -0,0 +1,16 @@
from Lib.Driver import *
class PageBase:
def __init__(self, d: WebDriver):
self.driver = d
def home(self):
self.driver.open('https://www.fanscloud.net/')
def exit(self):
self.driver.tab_cancel_all()
self.driver.quit()
def reload(self):
self.driver.refresh()

View File

@ -0,0 +1,25 @@
from Pom.Home import *
class PageSignin(PageHome):
box_user = [By.XPATH, '//input[@type="text" and @id="user"]']
box_pass = [By.XPATH, '//input[@type="password" and @id="pw"]']
box_persist = [By.XPATH, '//input[@type="checkbox" and @id="persist"]']
btn_submit = [By.XPATH, '//button[@type="submit"]']
def open(self):
self.home()
self.click_signin()
return self
def input_user(self, value):
return self.driver.input(self.driver.find_element_by(self.box_user), value)
def input_pass(self, value):
return self.driver.input(self.driver.find_element_by(self.box_pass), value)
def click_persist(self):
return self.driver.click(self.driver.find_element_by(self.box_persist))
def click_submit(self):
return self.driver.click(self.driver.find_element_by(self.btn_submit))

View File

11
Pom/Home/__init__.py Normal file
View File

@ -0,0 +1,11 @@
from Pom.Base import *
class PageHome(PageBase):
btn_signin = [By.LINK_TEXT, '登录']
def open(self):
self.home()
return self
def click_signin(self):
return self.driver.click(self.driver.find_element_by(self.btn_signin))

22
Testcase/test_signin.py Normal file
View File

@ -0,0 +1,22 @@
from Lib.TestcaseBasicLibrary import *
from Pom.Home.Signin import *
class TestSignin:
def setup_class(self):
self.page = PageSignin(WebDriver()).open()
def teardown_class(self):
self.page.exit()
@pytest.mark.parametrize(('username', 'password'), [['super', 'super'], ['guest', 'guest'], ['users', 'users']])
def test_errors_signin(self, username, password):
self.page.reload()
self.page.input_user(username)
self.page.input_pass(password)
@pytest.mark.parametrize(('username', 'password'), [['admin', 'admin']])
def test_normal_signin(self, username, password):
self.page.reload()
self.page.input_user(username)
self.page.input_pass(password)

1
environment.properties Normal file
View File

@ -0,0 +1 @@
Tester = User

View File

@ -1,3 +1,12 @@
[pytest]
addopts = -q --alluredir=Report/allure_result
testpaths = Testcase
addopts = -vs --strict-markers --reruns=1 --reruns-delay=3
log_cli = 1
pythonpath = ./
cache_dir = pytest_cache
norecursedirs = .* venv po pom library dist build
markers =
p0: P0 level testcase
p1: P1 level testcase
p2: P2 level testcase
p3: P3 level testcase

BIN
requirements.txt Normal file

Binary file not shown.