Commit.
This commit is contained in:
commit
86d46bc165
|
@ -0,0 +1,3 @@
|
|||
/Report/*
|
||||
.pytest_cache/
|
||||
__pycache__/
|
|
@ -0,0 +1,4 @@
|
|||
from Library.LoadConfig import *
|
||||
from Library.LoadData import *
|
||||
import unittest
|
||||
import ddt
|
|
@ -0,0 +1 @@
|
|||
report_format: '%Y-%m-%d-%H-%M-%S'
|
|
@ -0,0 +1,20 @@
|
|||
123456
|
||||
password
|
||||
12345678
|
||||
qwerty
|
||||
123456789
|
||||
12345
|
||||
1234
|
||||
111111
|
||||
1234567
|
||||
dragon
|
||||
123123
|
||||
baseball
|
||||
abc123
|
||||
football
|
||||
monkey
|
||||
letmein
|
||||
696969
|
||||
shadow
|
||||
master
|
||||
666666
|
|
|
@ -0,0 +1 @@
|
|||
["123456", "password", "12345678", "qwerty", "123456789", "12345", "1234", "111111", "1234567", "dragon", "123123", "baseball", "abc123", "football", "monkey", "letmein", "696969", "shadow", "master", "666666"]
|
|
@ -0,0 +1,20 @@
|
|||
123456
|
||||
password
|
||||
12345678
|
||||
qwerty
|
||||
123456789
|
||||
12345
|
||||
1234
|
||||
111111
|
||||
1234567
|
||||
dragon
|
||||
123123
|
||||
baseball
|
||||
abc123
|
||||
football
|
||||
monkey
|
||||
letmein
|
||||
696969
|
||||
shadow
|
||||
master
|
||||
666666
|
|
@ -0,0 +1,20 @@
|
|||
- '123456'
|
||||
- 'password'
|
||||
- '12345678'
|
||||
- 'qwerty'
|
||||
- '123456789'
|
||||
- '12345'
|
||||
- '1234'
|
||||
- '111111'
|
||||
- '1234567'
|
||||
- 'dragon'
|
||||
- '123123'
|
||||
- 'baseball'
|
||||
- 'abc123'
|
||||
- 'football'
|
||||
- 'monkey'
|
||||
- 'letmein'
|
||||
- '696969'
|
||||
- 'shadow'
|
||||
- 'master'
|
||||
- '666666'
|
|
@ -0,0 +1,45 @@
|
|||
import base64
|
||||
import os
|
||||
|
||||
|
||||
def base64_encode(data, output=None):
|
||||
"""
|
||||
:param data: Bytes or String or FileStream.
|
||||
:param output: Output file stream, when the data parameter is the file stream valid.
|
||||
:return: What type of data entering returns the same data type.
|
||||
"""
|
||||
if isinstance(data, bytes):
|
||||
return base64.b64encode(data)
|
||||
if isinstance(data, str):
|
||||
return base64.b64encode(bytes(data, encoding='utf-8')).decode()
|
||||
else:
|
||||
return base64.encode(data, output)
|
||||
|
||||
|
||||
def base64_decode(data, output=None):
|
||||
"""
|
||||
:param data: Bytes or String or FileStream.
|
||||
:param output: Output file stream, when the data parameter is the file stream valid.
|
||||
:return: What type of data entering returns the same data type.
|
||||
"""
|
||||
if isinstance(data, bytes):
|
||||
return base64.b64decode(data)
|
||||
if isinstance(data, str):
|
||||
return base64.b64decode(bytes(data, encoding='utf-8')).decode()
|
||||
else:
|
||||
return base64.decode(data, output)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# e.g. Code bytes type.
|
||||
print(base64_encode(bytes('Some text content.', encoding='utf-8')))
|
||||
# e.g. Code string type.
|
||||
print(base64_encode('Some text content.'))
|
||||
# e.g. Read the file and encode and write to another file.
|
||||
# 1.Create a test example file,
|
||||
open('./example_of_base64_encode', 'w').write('Some text content.')
|
||||
# 2.Then test,
|
||||
print(base64_encode(open('./example_of_base64_encode', 'rb'), open('./example_of_base64_encode.base64', 'wb')))
|
||||
# 3.Final cleanup.
|
||||
os.remove('./example_of_base64_encode')
|
||||
os.remove('./example_of_base64_encode.base64')
|
|
@ -0,0 +1,29 @@
|
|||
import tempfile
|
||||
import platform
|
||||
import shutil
|
||||
import glob
|
||||
import os
|
||||
|
||||
|
||||
def clear_selenium():
|
||||
if platform.uname().system == 'Windows':
|
||||
user_home = [os.environ.get('HOMEDRIVE'), os.environ.get('HOMEPATH')]
|
||||
if user_home[0] and user_home[1]:
|
||||
try:
|
||||
shutil.rmtree('%s%s/.cache/selenium' % (user_home[0], user_home[1]))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def clear_driver_cache():
|
||||
for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']:
|
||||
for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)):
|
||||
try:
|
||||
shutil.rmtree(i)
|
||||
except (FileNotFoundError, PermissionError, WindowsError):
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# e.g. Clear
|
||||
clear_selenium()
|
||||
clear_driver_cache()
|
|
@ -0,0 +1,485 @@
|
|||
from selenium import webdriver as _root_webdriver
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
from selenium.webdriver.common.alert import Alert
|
||||
from selenium.webdriver.common.action_chains import ActionChains
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.support.wait import WebDriverWait
|
||||
from selenium.common.exceptions import *
|
||||
import tempfile
|
||||
import platform
|
||||
import random
|
||||
import json
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
|
||||
# You can set the drive path and browser location through the environment variables,
|
||||
# and select the webdriver type.
|
||||
# It has preferential priority through environmental variable settings.
|
||||
SELENIUM_WIRE = os.environ.get('SELENIUM_WIRE') or ''
|
||||
# Attention!!!
|
||||
# Must set the environment before importing selenium.
|
||||
# Otherwise, these settings will be invalid.
|
||||
SELENIUM_BROWSER_CHOOSE = os.environ.get('SELENIUM_BROWSER_CHOOSE') or 'Chrome'
|
||||
SELENIUM_BROWSER_DRIVER = os.environ.get('SELENIUM_BROWSER_DRIVER') or ''
|
||||
SELENIUM_BROWSER_BINARY = os.environ.get('SELENIUM_BROWSER_BINARY') or ''
|
||||
# Choose Selenium or Selenium-Wire, default: Selenium.
|
||||
if SELENIUM_WIRE:
|
||||
exec('from seleniumwire import webdriver as _root_webdriver')
|
||||
# Choose options and service.
|
||||
if SELENIUM_BROWSER_CHOOSE:
|
||||
exec('from selenium.webdriver.%s.options import Options' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').lower())
|
||||
exec('from selenium.webdriver.%s.service import Service' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').lower())
|
||||
# Use the chrome driver by default, such a code writing method is for grammar prompts.
|
||||
DriverChoose = _root_webdriver.Chrome
|
||||
if SELENIUM_BROWSER_CHOOSE:
|
||||
exec('DriverChoose=_root_webdriver.%s' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').capitalize())
|
||||
|
||||
|
||||
class BrowserMobileEmulation(dict):
|
||||
"""
|
||||
Mobile emulation parameters.
|
||||
"""
|
||||
def __init__(self, w=540, h=960, user_agent=None):
|
||||
du = 'Mozilla/5.0 (Linux; U; Android 13; zh-cn; 2109119BC Build/TKQ1.220829.002) ' \
|
||||
'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 ' \
|
||||
'Chrome/98.0.4758.102 MQQBrowser/13.6 Mobile Safari/537.36'
|
||||
user_agent = user_agent or du
|
||||
super().__init__({'w': w, 'h': h, 'user_agent': user_agent})
|
||||
self.w = self.h = self.user_agent = None
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
pass
|
||||
|
||||
def __getitem__(self, item):
|
||||
try:
|
||||
return super().__getitem__(item)
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def __getattr__(self, item):
|
||||
try:
|
||||
return super().__getitem__(item)
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
|
||||
class PositionTab:
|
||||
"""
|
||||
Position for switch tab.
|
||||
"""
|
||||
Prev = 'Go-Prev'
|
||||
Next = 'Go-Next'
|
||||
|
||||
|
||||
class Browser(DriverChoose):
|
||||
"""
|
||||
Browser web driver.
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
driver: str = None,
|
||||
binary: str = None,
|
||||
headless: bool = False,
|
||||
lang: str = None,
|
||||
mute: bool = False,
|
||||
no_images: bool = False,
|
||||
user_agent: str = None,
|
||||
http_proxy: str = None,
|
||||
home: str = None,
|
||||
window_size: str = None,
|
||||
mobile_emulation: BrowserMobileEmulation = None,
|
||||
option_arguments: list = None,
|
||||
req_interceptor=None,
|
||||
res_interceptor=None,
|
||||
):
|
||||
choose = SELENIUM_BROWSER_CHOOSE.capitalize()
|
||||
self.platform = platform.uname().system
|
||||
default_driver = None
|
||||
default_binary = None
|
||||
if choose == 'Chrome':
|
||||
if self.platform == 'Linux':
|
||||
default_driver = '/opt/google/chrome/chromedriver'
|
||||
default_binary = '/opt/google/chrome/chrome'
|
||||
else:
|
||||
default_driver = 'C:/Program Files/Google/Chrome/Application/chromedriver.exe'
|
||||
default_binary = 'C:/Program Files/Google/Chrome/Application/chrome.exe'
|
||||
driver = driver or SELENIUM_BROWSER_DRIVER or default_driver
|
||||
binary = binary or SELENIUM_BROWSER_BINARY or default_binary
|
||||
if self.platform == 'Linux' and not window_size: window_size = '1920x1080'
|
||||
# Initialization settings.
|
||||
if (isinstance(option_arguments, list)) is False: option_arguments = []
|
||||
cdplist = []
|
||||
service = Service()
|
||||
options = Options()
|
||||
self.cdplist = cdplist
|
||||
self.ignore_page_load_timeout = 0
|
||||
# Delete prompt information of chrome being controlled.
|
||||
hasattr(options, 'add_experimental_option') and options.add_experimental_option("excludeSwitches", ['enable-automation'])
|
||||
# Mobile emulation parameter setting start.
|
||||
if mobile_emulation:
|
||||
if hasattr(options, 'add_experimental_option') is False:
|
||||
raise Exception('Do not support mobile emulation currently.')
|
||||
self.w_browser = mobile_emulation.w + 14
|
||||
self.h_browser = mobile_emulation.h + 0
|
||||
self.w_inner_window = mobile_emulation.w + 0
|
||||
self.h_inner_window = mobile_emulation.h - 86
|
||||
self.mobile_emulation_screen_w = mobile_emulation.w
|
||||
self.mobile_emulation_screen_h = mobile_emulation.h
|
||||
window_size = '%s,%s' % (self.w_browser, self.h_browser)
|
||||
options.add_experimental_option(
|
||||
'mobileEmulation', {
|
||||
'deviceMetrics': {
|
||||
'width': self.w_inner_window,
|
||||
'height': self.h_inner_window,
|
||||
'pixelRatio': 2.75,
|
||||
'touch': True
|
||||
},
|
||||
'userAgent': mobile_emulation.user_agent
|
||||
}
|
||||
)
|
||||
cdplist.append([
|
||||
'Emulation.setUserAgentOverride', {
|
||||
'userAgent': mobile_emulation.user_agent,
|
||||
'userAgentMetadata': {
|
||||
'platform': 'Android' if mobile_emulation.user_agent.find('iPhone') == -1 else 'iPhone',
|
||||
'mobile': True,
|
||||
'platformVersion': '',
|
||||
'architecture': '',
|
||||
'model': ''
|
||||
}
|
||||
}
|
||||
])
|
||||
else:
|
||||
self.w_browser = 0
|
||||
self.h_browser = 0
|
||||
self.w_inner_window = 0
|
||||
self.h_inner_window = 0
|
||||
self.mobile_emulation_screen_w = 0
|
||||
self.mobile_emulation_screen_h = 0
|
||||
# Mobile emulation parameter setting end.
|
||||
# Set browser and webdriver path.
|
||||
if driver:
|
||||
service.path = driver
|
||||
if binary:
|
||||
options.binary_location = binary
|
||||
# Add webdriver option arguments.
|
||||
for i in option_arguments:
|
||||
options.add_argument(i)
|
||||
# Set headless mode.
|
||||
if self.platform == 'Linux' or headless:
|
||||
options.add_argument('--headless')
|
||||
# Set no-sandbox mode.
|
||||
if self.platform == 'Linux':
|
||||
options.add_argument('--no-sandbox')
|
||||
# Set language of browser, default is zh-CN.
|
||||
if lang:
|
||||
options.add_argument('--lang=%s' % (lang or 'zh-CN'))
|
||||
hasattr(options, 'set_preference') and options.set_preference('intl.accept_languages', lang or 'zh-CN')
|
||||
# Set mute.
|
||||
if mute:
|
||||
options.add_argument('--mute-audio=true')
|
||||
hasattr(options, 'set_preference') and print('Warning: Do not support mute audio currently.', file=sys.stderr)
|
||||
# Set no images mode.
|
||||
if no_images:
|
||||
options.add_argument('--blink-settings=imagesEnabled=false')
|
||||
hasattr(options, 'set_preference') and print('Warning: Do not support disable images currently.', file=sys.stderr)
|
||||
# Set default user agent.
|
||||
if user_agent:
|
||||
options.add_argument('--user-agent=%s' % user_agent)
|
||||
hasattr(options, 'set_preference') and options.set_preference('general.useragent.override', user_agent)
|
||||
# Set http proxy for browser.
|
||||
if http_proxy:
|
||||
options.add_argument('--proxy-server=http://%s' % http_proxy)
|
||||
# Set browser window size before startup.
|
||||
if window_size:
|
||||
options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ','))
|
||||
else:
|
||||
options.add_argument('--start-maximized')
|
||||
# Start the browser.
|
||||
super().__init__(service=service, options=options)
|
||||
# Selenium-Wire backend optimization start.
|
||||
try:
|
||||
self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
|
||||
self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
|
||||
self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
|
||||
except AttributeError:
|
||||
pass
|
||||
# Selenium-Wire backend optimization end.
|
||||
if mobile_emulation:
|
||||
cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}])
|
||||
cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}])
|
||||
cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}])
|
||||
# Set the request and response interceptor.
|
||||
if req_interceptor:
|
||||
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
|
||||
self.request_interceptor = req_interceptor
|
||||
if res_interceptor:
|
||||
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
|
||||
self.response_interceptor = res_interceptor
|
||||
# Sync set http proxy for Selenium-Wire backend.
|
||||
if http_proxy:
|
||||
self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy}
|
||||
# Set browser window size after startup, by default, there will be full screen display window.
|
||||
if window_size:
|
||||
self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(','))
|
||||
else:
|
||||
self.maximize_window()
|
||||
# Sets a sticky timeout to implicitly wait for an element to be found.
|
||||
self.implicitly_wait(10)
|
||||
# Set the amount of time to wait for a page load to complete.
|
||||
self.set_page_load_timeout(25)
|
||||
# Open the default page.
|
||||
home and self.open(home)
|
||||
|
||||
@staticmethod
|
||||
def wait(secs: int | float = 1):
|
||||
"""
|
||||
Will sleep waiting.
|
||||
"""
|
||||
number_int = int(secs)
|
||||
number_float = secs - number_int
|
||||
for i in range(number_int):
|
||||
time.sleep(1)
|
||||
else:
|
||||
time.sleep(number_float)
|
||||
|
||||
def open(self, url=None):
|
||||
"""
|
||||
Open the URL, simulate into the URL in the address bar and jump, the new page has no Referrer.
|
||||
"""
|
||||
self.update_cdp_command()
|
||||
try:
|
||||
return self.get(url)
|
||||
except TimeoutException:
|
||||
if self.ignore_page_load_timeout:
|
||||
return False
|
||||
else:
|
||||
raise
|
||||
|
||||
def turn(self, url=None):
|
||||
"""
|
||||
Simulation "window.location.href" jumps, the new page has Referrer.
|
||||
"""
|
||||
return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None)
|
||||
|
||||
def find(self, path):
|
||||
"""
|
||||
Use XPath to find an element.
|
||||
"""
|
||||
return self.find_element(By.XPATH, path)
|
||||
|
||||
def find_mult(self, path):
|
||||
"""
|
||||
Use XPath to find elements.
|
||||
"""
|
||||
return self.find_elements(By.XPATH, path)
|
||||
|
||||
def find_element_by(self, sentence):
|
||||
"""
|
||||
Custom find element, pass into a tuple or list.
|
||||
"""
|
||||
return self.find_element(*sentence)
|
||||
|
||||
def click(self, element):
|
||||
"""
|
||||
Click element for desktop version.
|
||||
"""
|
||||
self.action_chains().reset_actions()
|
||||
self.action_chains().click(element).perform()
|
||||
self.wait(0.1)
|
||||
|
||||
def touch(self, x, y):
|
||||
"""
|
||||
Click on the coordinates for Mobile edition.
|
||||
"""
|
||||
self.action_chains().reset_actions()
|
||||
self.action_chains().move_by_offset(x, y).click().perform()
|
||||
self.wait(0.1)
|
||||
|
||||
def input(self, element, content):
|
||||
"""
|
||||
Enter the content to the element.
|
||||
"""
|
||||
self.action_chains().reset_actions()
|
||||
self.action_chains().send_keys_to_element(element, content).perform()
|
||||
self.wait(0.1)
|
||||
|
||||
def mouse(self, element):
|
||||
"""
|
||||
Park the mouse here.
|
||||
"""
|
||||
self.action_chains().reset_actions()
|
||||
self.action_chains().move_to_element(element).perform()
|
||||
|
||||
def tab_create(self, url=None):
|
||||
"""
|
||||
Create a new tab and open the URL.
|
||||
"""
|
||||
self.switch_to.new_window('tab')
|
||||
self.update_cdp_command()
|
||||
url and self.open(url)
|
||||
|
||||
def tab_switch(self, tab: int | str):
|
||||
"""
|
||||
Switch the browser tab page
|
||||
"""
|
||||
handles = self.window_handles
|
||||
lengths = len(handles)
|
||||
current = handles.index(self.current_window_handle)
|
||||
if isinstance(tab, int):
|
||||
handle = tab
|
||||
elif tab == PositionTab.Prev:
|
||||
handle = (current - 1)
|
||||
elif tab in PositionTab.Next:
|
||||
handle = (current + 1) % lengths
|
||||
else:
|
||||
handle = None
|
||||
self.switch_to.window(handles[handle])
|
||||
self.wait(0.2)
|
||||
self.update_cdp_command()
|
||||
|
||||
def tab_cancel(self):
|
||||
"""
|
||||
Close the current browser tab page.
|
||||
"""
|
||||
handles = self.window_handles
|
||||
if len(handles):
|
||||
current = handles.index(self.current_window_handle)
|
||||
self.close()
|
||||
current > 0 and self.switch_to.window(handles[current - 1])
|
||||
self.wait(0.2)
|
||||
|
||||
def tab_cancel_all(self):
|
||||
"""
|
||||
Close all the browser tab page.
|
||||
"""
|
||||
handles = self.window_handles
|
||||
for i in handles:
|
||||
self.tab_cancel()
|
||||
|
||||
def frame_switch_to(self, element_of_frame):
|
||||
"""
|
||||
Switch frame to the specified frame element.
|
||||
"""
|
||||
self.switch_to.frame(element_of_frame)
|
||||
self.wait(0.2)
|
||||
|
||||
def frame_switch_to_default(self):
|
||||
"""
|
||||
Switch to the default frame.
|
||||
"""
|
||||
self.switch_to.default_content()
|
||||
self.wait(0.2)
|
||||
|
||||
def scroll(self):
|
||||
"""
|
||||
Scroll page.
|
||||
:return:
|
||||
"""
|
||||
self.action_chains().reset_actions()
|
||||
self.action_chains().scroll_by_amount(0, self.execute_script('return document.documentElement.clientHeight;')).perform()
|
||||
self.wait(0.8)
|
||||
|
||||
def scroll_to(self, pos: int | str):
|
||||
"""
|
||||
Scroll to the specified location.
|
||||
"""
|
||||
if isinstance(pos, int) and pos > 0:
|
||||
self.execute_script('window.scrollTo(0, arguments[0]);', pos)
|
||||
elif pos == 0:
|
||||
self.execute_script('window.scrollTo(0, 0);')
|
||||
elif pos == 0 - 1:
|
||||
self.execute_script('window.scrollTo(0, document.body.scrollHeight);')
|
||||
else:
|
||||
pass
|
||||
self.wait(0.8)
|
||||
|
||||
def scroll_to_element(self, element):
|
||||
"""
|
||||
Scroll to the specified element location.
|
||||
"""
|
||||
self.action_chains().reset_actions()
|
||||
self.action_chains().scroll_to_element(element).perform()
|
||||
self.wait(0.8)
|
||||
|
||||
def element_force_display(self, element):
|
||||
"""
|
||||
Make hidden element visible and interactive.
|
||||
"""
|
||||
self.execute_script(
|
||||
'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.setAttribute("hidden","false");', element
|
||||
)
|
||||
|
||||
def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None):
|
||||
"""
|
||||
Return WebDriverWait object.
|
||||
"""
|
||||
return WebDriverWait(
|
||||
driver=self,
|
||||
timeout=timeout,
|
||||
poll_frequency=poll_frequency,
|
||||
ignored_exceptions=ignored_exceptions
|
||||
)
|
||||
|
||||
def current_alert(self):
|
||||
"""
|
||||
Return current alert object.
|
||||
"""
|
||||
return Alert(self)
|
||||
|
||||
def window_inner_size(self):
|
||||
"""
|
||||
Get the page window inner size.
|
||||
"""
|
||||
size = self.execute_script('return [window.innerWidth, window.innerHeight];')
|
||||
return {'w': size[0] or 0, 'h': size[1] or 0}
|
||||
|
||||
def action_chains(self):
|
||||
"""
|
||||
Return ActionChains object.
|
||||
"""
|
||||
return ActionChains(self)
|
||||
|
||||
def ignore_timeout(self):
|
||||
"""
|
||||
Page loading timeout will not throw an exception.
|
||||
"""
|
||||
self.ignore_page_load_timeout = 1
|
||||
|
||||
def screenshot_for_report(self):
|
||||
"""
|
||||
Screenshot for HTMLTestRunner report,
|
||||
will print the screenshot path to the <stderr> to let it analyze it.
|
||||
"""
|
||||
root = os.environ.get('HTML_REPORT_ROOT') or tempfile.gettempdir()
|
||||
basename = '%s%s.png' % (time.strftime('%Y%m%d%H%M%S', time.localtime()), random.randint(1000, 9999))
|
||||
fullname = '%s/%s' % (root, basename)
|
||||
if os.path.exists(root):
|
||||
if self.get_screenshot_as_file(fullname):
|
||||
print('[report-screenshot]%s[/report-screenshot]' % fullname, file=sys.stderr)
|
||||
else:
|
||||
print('Warning: Screenshot failure.', file=sys.stderr)
|
||||
|
||||
def update_cdp_command(self) -> None:
|
||||
for cmd in self.cdplist:
|
||||
self.execute_cdp_cmd(*cmd)
|
||||
|
||||
|
||||
class WebDriver(Browser):
|
||||
"""
|
||||
Get a browser driver object.
|
||||
"""
|
||||
def __init__(self):
|
||||
super().__init__(lang='zh-CN')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# e.g. Test it can work normally.
|
||||
driver = WebDriver()
|
||||
driver.open('https://www.hao123.com/')
|
||||
driver.wait()
|
||||
driver.quit()
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,104 @@
|
|||
import requests
|
||||
import requests.utils
|
||||
|
||||
|
||||
class HTTPResponseObject(dict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.url = None
|
||||
self.status = None
|
||||
self.reason = None
|
||||
self.header = None
|
||||
self.cookie = None
|
||||
self.binary = None
|
||||
self.text, self.json = None, None
|
||||
self.encoding, self.duration = None, None
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
pass
|
||||
|
||||
def __getitem__(self, item):
|
||||
try:
|
||||
return super().__getitem__(item)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def __getattr__(self, item):
|
||||
try:
|
||||
return super().__getitem__(item)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class HTTPRequest:
|
||||
__http__ = requests
|
||||
__accept_method__ = ('GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 'PATCH')
|
||||
|
||||
def http(self, method='GET', url: str = None, query=None, data=None, json=None, file=None, header=None, cookie=None,
|
||||
ua: str = None, auth=None, timeout: int | float = None, proxy: str = None, auto_redirect=False, ignore_cert_error=False):
|
||||
method = method.upper()
|
||||
if method not in self.__accept_method__:
|
||||
raise Exception('Unsupported request method.')
|
||||
if method == 'HEAD':
|
||||
auto_redirect = False
|
||||
if ua:
|
||||
if not isinstance(header, dict):
|
||||
header = {}
|
||||
header['User-Agent'] = ua
|
||||
if isinstance(proxy, str) and len(proxy) >= 5:
|
||||
proxy = {'http': proxy, 'https': proxy}
|
||||
else:
|
||||
proxy = None
|
||||
try:
|
||||
response = self.__http__.request(
|
||||
method=method,
|
||||
url=url,
|
||||
params=query,
|
||||
data=data,
|
||||
headers=header,
|
||||
cookies=cookie,
|
||||
files=file,
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
allow_redirects=auto_redirect,
|
||||
proxies=proxy,
|
||||
verify=not ignore_cert_error,
|
||||
json=json
|
||||
)
|
||||
except Exception:
|
||||
response = None
|
||||
try:
|
||||
response_json = response.json()
|
||||
except Exception:
|
||||
response_json = None
|
||||
if response:
|
||||
return HTTPResponseObject({
|
||||
'url': response.url,
|
||||
'status': response.status_code,
|
||||
'reason': response.reason,
|
||||
'header': response.headers,
|
||||
'cookie': requests.utils.dict_from_cookiejar(response.cookies),
|
||||
'binary': response.content,
|
||||
'text': response.text,
|
||||
'json': response_json,
|
||||
'encoding': response.encoding,
|
||||
'duration': round(response.elapsed.microseconds / 1000, 1)
|
||||
})
|
||||
else:
|
||||
return HTTPResponseObject({
|
||||
'status': -1
|
||||
})
|
||||
|
||||
|
||||
class HTTPSession(HTTPRequest):
|
||||
__http__ = requests.session()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# e.g. Send a get request.
|
||||
res = HTTPSession().http(
|
||||
method='GET', url='http://pv.sohu.com/cityjson', query={'ie': 'utf-8'},
|
||||
ua='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
|
||||
auto_redirect=True
|
||||
)
|
||||
print(res)
|
|
@ -0,0 +1,24 @@
|
|||
import json
|
||||
|
||||
|
||||
def json_encode(data, indent=None, unicode=True):
|
||||
return json.dumps(data, indent=indent, ensure_ascii=unicode)
|
||||
|
||||
|
||||
def json_decode(data):
|
||||
return json.loads(data)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# e.g. The default does not indent, and the string will be coded by the unicode.
|
||||
print(json_encode({"id": 101, "name": "汤姆", "friends": ["托尼", "杰森"]}))
|
||||
# e.g. Indent 4 spaces.
|
||||
print(json_encode({"id": 101, "name": "汤姆", "friends": ["托尼", "杰森"]}, indent=4))
|
||||
# e.g. Indent 4 spaces and no coded by the unicode.
|
||||
print(json_encode({"id": 101, "name": "汤姆", "friends": ["托尼", "杰森"]}, indent=4, unicode=False))
|
||||
# e.g. Decode JSON data.
|
||||
print(json_decode(
|
||||
'''
|
||||
{"id": 101, "name": "汤姆", "friends": ["托尼", "杰森"]}
|
||||
'''
|
||||
))
|
|
@ -0,0 +1,48 @@
|
|||
from Library.Json import json_decode
|
||||
from Library.Yaml import yaml_decode
|
||||
import os
|
||||
|
||||
|
||||
def _home_of_conf():
|
||||
home = '%s/%s' % (os.path.dirname(os.path.dirname(__file__)), 'Config')
|
||||
if os.path.exists(home):
|
||||
return home
|
||||
else:
|
||||
raise FileNotFoundError('Directory "%s" does not exist.' % home)
|
||||
|
||||
class TestingConfig(dict):
|
||||
"""
|
||||
Load the configuration file from the configuration directory.
|
||||
"""
|
||||
def __init__(self, name: str):
|
||||
home = _home_of_conf()
|
||||
json_extension = '.json'
|
||||
yaml_extension = '.yaml'
|
||||
pa_json = '%s/%s%s' % (home, name, json_extension)
|
||||
pa_yaml = '%s/%s%s' % (home, name, yaml_extension)
|
||||
data = None
|
||||
if os.path.exists(pa_json):
|
||||
file = pa_json
|
||||
data = json_decode(open(file, encoding='utf-8').read())
|
||||
if os.path.exists(pa_yaml):
|
||||
file = pa_yaml
|
||||
data = yaml_decode(open(file, encoding='utf-8').read())
|
||||
if data is not None:
|
||||
super().__init__(data)
|
||||
else:
|
||||
raise FileNotFoundError('No json or yaml configuration file "%s" in config directory.' % name)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
raise AttributeError('Attribute is read only.')
|
||||
|
||||
def __getattr__(self, item):
|
||||
try:
|
||||
return super().__getitem__(item)
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
return self.__setattr__(key, value)
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.__getattr__(item)
|
|
@ -0,0 +1,44 @@
|
|||
from Library.Json import json_decode
|
||||
from Library.Yaml import yaml_decode
|
||||
import csv
|
||||
import os
|
||||
|
||||
|
||||
def _home_of_data():
|
||||
home = '%s/%s' % (os.path.dirname(os.path.dirname(__file__)), 'Data')
|
||||
if os.path.exists(home):
|
||||
return home
|
||||
else:
|
||||
raise FileNotFoundError('Directory "%s" does not exist.' % home)
|
||||
|
||||
def _checking_data_filename(_s: str):
|
||||
if _s.replace("\\", '/').find('/') != -1:
|
||||
raise NameError('The data filename "%s" is illegal in data directory.' % _s)
|
||||
|
||||
def loadDataFromJson(filename: str):
|
||||
_checking_data_filename(filename)
|
||||
return json_decode(open('%s/%s' % (_home_of_data(), filename), encoding='utf-8').read())
|
||||
|
||||
def loadDataFromYaml(filename: str):
|
||||
_checking_data_filename(filename)
|
||||
return yaml_decode(open('%s/%s' % (_home_of_data(), filename), encoding='utf-8').read())
|
||||
|
||||
def loadCsv(filename: str):
|
||||
_checking_data_filename(filename)
|
||||
file = '%s/%s' % (_home_of_data(), filename)
|
||||
try:
|
||||
open(file).read(4096)
|
||||
encoding = None
|
||||
except UnicodeDecodeError:
|
||||
encoding = 'utf-8'
|
||||
return [row for row in csv.reader(open(file, encoding=encoding))]
|
||||
|
||||
def loadTxt(filename: str):
|
||||
_checking_data_filename(filename)
|
||||
file = '%s/%s' % (_home_of_data(), filename)
|
||||
try:
|
||||
open(file).read(4096)
|
||||
encoding = None
|
||||
except UnicodeDecodeError:
|
||||
encoding = 'utf-8'
|
||||
return [row.rstrip("\n") for row in open(file, encoding=encoding).readlines()]
|
|
@ -0,0 +1,15 @@
|
|||
import logging
|
||||
|
||||
class TestingLogger:
|
||||
"""
|
||||
Testing logger.
|
||||
"""
|
||||
def __init__(self, name: str = 'test'):
|
||||
self._logger = logging.getLogger(name)
|
||||
self._logger.setLevel(logging.DEBUG)
|
||||
self.d = self._logger.debug
|
||||
self.i = self._logger.info
|
||||
self.w = self._logger.warning
|
||||
self.e = self._logger.error
|
||||
self.f = self._logger.fatal
|
||||
self.c = self._logger.critical
|
|
@ -0,0 +1,12 @@
|
|||
import os
|
||||
|
||||
def project_home():
|
||||
path = os.environ.get('PYTHONPATH')
|
||||
if not path:
|
||||
raise Exception('No environment variables of PYTHONPATH.')
|
||||
here = os.path.dirname(__file__).replace("\\", '/')
|
||||
for j in [i.replace("\\", '/').strip() for i in path.split(os.pathsep)]:
|
||||
if here.startswith(j):
|
||||
return j
|
||||
else:
|
||||
raise Exception('Project directory is not in PYTHONPATH.')
|
|
@ -0,0 +1,6 @@
|
|||
from Library.LoadConfig import TestingConfig
|
||||
from Library.Time import *
|
||||
|
||||
|
||||
def generate_report_name():
|
||||
return fmt_time(TestingConfig('default').report_format or '%Y-%m-%d-%H-%M-%S')
|
|
@ -0,0 +1,247 @@
|
|||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.base import MIMEBase
|
||||
from email.header import Header
|
||||
from email import encoders
|
||||
import smtplib
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
|
||||
|
||||
class SMTPMailerSendState(dict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.code = self.message = None
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
pass
|
||||
|
||||
def __getitem__(self, item):
|
||||
try:
|
||||
return super().__getitem__(item)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def __getattr__(self, item):
|
||||
try:
|
||||
return super().__getitem__(item)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class SMTPMailer:
|
||||
__host__ = {
|
||||
'host': '',
|
||||
'port': 25
|
||||
}
|
||||
__user__ = {
|
||||
'user': '',
|
||||
'pass': ''
|
||||
}
|
||||
__mail__ = {
|
||||
'from': [],
|
||||
'to': [],
|
||||
'cc': [],
|
||||
'bc': [],
|
||||
'attachs': [],
|
||||
'subject': '',
|
||||
'content': ''
|
||||
}
|
||||
__flag__ = 0
|
||||
__addr__ = 0
|
||||
__from__ = 0
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
username='',
|
||||
password='',
|
||||
host='127.0.0.1',
|
||||
port=25,
|
||||
ssl_enable=False,
|
||||
send_from='',
|
||||
send_to=None,
|
||||
send_cc=None,
|
||||
send_bc=None,
|
||||
subject='',
|
||||
content='',
|
||||
attachs=None
|
||||
):
|
||||
self.ssl_enable = ssl_enable
|
||||
self.host(host, port)
|
||||
self.user(username, password)
|
||||
send_from and self.send_from(send_from)
|
||||
send_to and self.to(send_to)
|
||||
send_cc and self.cc(send_cc)
|
||||
send_bc and self.bc(send_bc)
|
||||
subject and self.subject(subject)
|
||||
content and self.content(content)
|
||||
attachs and self.attachs(attachs)
|
||||
|
||||
@staticmethod
|
||||
def test_email(email: str, throw_exception=False):
|
||||
test = re.fullmatch('^([a-zA-Z0-9_.\\-])+@(([a-zA-Z0-9\\-])+\\.)+([a-zA-Z0-9]{2,4})+$', email) is not None
|
||||
if throw_exception and test is False:
|
||||
raise Exception('Illegal email address: %s.' % email)
|
||||
return test
|
||||
|
||||
@staticmethod
|
||||
def gen_header_of_mailer(e):
|
||||
try:
|
||||
return "\"%s\" <%s>" % (Header(e[0]).encode(), Header(e[1]).encode())
|
||||
except Exception:
|
||||
return ''
|
||||
|
||||
def parse_email_address(self, email: str):
|
||||
if email.find('<') != -1 and email.find('>') != -1:
|
||||
match_email = re.findall('^(.*?)<(.*?)>$', email)
|
||||
if len(match_email) == 1:
|
||||
addr = match_email[0][1].strip()
|
||||
self.test_email(addr, throw_exception=True)
|
||||
name = match_email[0][0].strip() or addr.split('@')[0].strip()
|
||||
return [name, addr]
|
||||
else:
|
||||
raise Exception('The wrong email address format: %s.' % email)
|
||||
else:
|
||||
if len(email) != 0:
|
||||
addr = email
|
||||
self.test_email(addr, throw_exception=True)
|
||||
name = email.split('@')[0].strip()
|
||||
return [name, addr]
|
||||
else:
|
||||
raise Exception('Empty email address.')
|
||||
|
||||
def parse_email_address_mult(self, data):
|
||||
if not isinstance(data, (str, list, tuple)):
|
||||
raise TypeError('Email address list must be string or list or tuple type.')
|
||||
if isinstance(data, str):
|
||||
data = [i.strip() for i in data.replace(',', ';').split(';')]
|
||||
while '' in data:
|
||||
data.remove('')
|
||||
return [self.parse_email_address(email_format) for email_format in data]
|
||||
|
||||
def enableSSL(self):
|
||||
self.ssl_enable = True
|
||||
return self
|
||||
|
||||
def host(self, host: str, port: int):
|
||||
self.__host__['host'] = host
|
||||
self.__host__['port'] = port
|
||||
return self
|
||||
|
||||
def user(self, username, password):
|
||||
self.__user__['user'] = username
|
||||
self.__user__['pass'] = password
|
||||
return self
|
||||
|
||||
def send_from(self, e):
|
||||
self.__mail__['from'] = self.parse_email_address(e)
|
||||
self.__from__ += 1
|
||||
return self
|
||||
|
||||
def to(self, receive_list):
|
||||
self.__mail__['to'] = self.parse_email_address_mult(receive_list)
|
||||
self.__addr__ += 1
|
||||
return self
|
||||
|
||||
def cc(self, receive_list):
|
||||
self.__mail__['cc'] = self.parse_email_address_mult(receive_list)
|
||||
self.__addr__ += 1
|
||||
return self
|
||||
|
||||
def bc(self, receive_list):
|
||||
self.__mail__['bc'] = self.parse_email_address_mult(receive_list)
|
||||
self.__addr__ += 1
|
||||
return self
|
||||
|
||||
def subject(self, text: str):
|
||||
self.__mail__['subject'] = text
|
||||
return self
|
||||
|
||||
def content(self, text: str):
|
||||
self.__mail__['content'] = text
|
||||
return self
|
||||
|
||||
def attachs(self, attachs_path_list: str | list):
|
||||
if isinstance(attachs_path_list, str):
|
||||
attachs_path_list = [attachs_path_list]
|
||||
for path in attachs_path_list:
|
||||
if not os.path.exists(path):
|
||||
raise FileNotFoundError('The attachment was not found.')
|
||||
self.__mail__['attachs'] = attachs_path_list
|
||||
return self
|
||||
|
||||
def send(self, test_only=False):
|
||||
if self.__flag__:
|
||||
return SMTPMailerSendState({'code': 1, 'message': 'Mail has been sent.'})
|
||||
if self.__addr__ == 0:
|
||||
raise Exception('No receiver.')
|
||||
message = MIMEMultipart()
|
||||
if self.__from__ == 0 and self.test_email(self.__user__['user']) and self.send_from(self.__user__['user']):
|
||||
pass
|
||||
message['From'] = self.gen_header_of_mailer(self.__mail__['from'])
|
||||
to_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['to']])
|
||||
cc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['cc']])
|
||||
bc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['bc']])
|
||||
message['To'], message['Cc'], message['Bcc'] = to_header, cc_header, bc_header
|
||||
message['Subject'] = Header(self.__mail__['subject'])
|
||||
message.attach(MIMEText(self.__mail__['content'], 'html', 'utf-8'))
|
||||
for file in self.__mail__['attachs']:
|
||||
mime = MIMEBase('application', 'octet-stream')
|
||||
mime.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file))
|
||||
mime.set_payload(open(file, 'rb').read())
|
||||
encoders.encode_base64(mime)
|
||||
message.attach(mime)
|
||||
try:
|
||||
smtp = [smtplib.SMTP, smtplib.SMTP_SSL][self.ssl_enable](
|
||||
self.__host__['host'],
|
||||
self.__host__['port']
|
||||
)
|
||||
except Exception:
|
||||
return SMTPMailerSendState({'code': 1, 'message': 'Connection failure.'})
|
||||
try:
|
||||
smtp.login(self.__user__['user'], self.__user__['pass'])
|
||||
except Exception:
|
||||
return SMTPMailerSendState({'code': 2, 'message': 'Account authentication failed.'})
|
||||
try:
|
||||
receivers = []
|
||||
for rg in [self.__mail__['to'], self.__mail__['cc'], self.__mail__['bc']]:
|
||||
for rp in rg:
|
||||
receivers.append(rp[1])
|
||||
if test_only:
|
||||
print(message.as_string(), file=sys.stderr)
|
||||
return None
|
||||
smtp.sendmail(self.__user__['user'], receivers, message.as_string())
|
||||
smtp.quit()
|
||||
self.__flag__ = 1
|
||||
return SMTPMailerSendState({'code': 0, 'message': 'Mail sent successfully.'})
|
||||
except Exception as error:
|
||||
return SMTPMailerSendState({'code': 3, 'message': str(error)})
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# e.g. Send an email example.
|
||||
print(SMTPMailer().enableSSL().host('smtp.qq.com', 465).user('admin@example.com', '******').
|
||||
subject('Email Subject').content('This is some content...').to('user@example.com').send())
|
||||
# e.g. Or you can send emails like this.
|
||||
print(SMTPMailer(
|
||||
username='admin@example.com',
|
||||
password='******',
|
||||
ssl_enable=True,
|
||||
host='smtp.qq.com',
|
||||
port=465,
|
||||
# The recipient format can be "Nickname <Email>" or "Email", like this: "Mrs.Lisa <lisa@example.com>" or "lisa@example.com"
|
||||
# Multiple recipients can be separated by semicolon(;) or comma(,).
|
||||
# Multiple recipients you can also pass a list.
|
||||
send_from='Administrator <admin@example.com>',
|
||||
# Add recipients.
|
||||
send_to='Mr.Tom <tom@example.com>, Mr.Jack <jack@example.com>',
|
||||
# Add Cc.
|
||||
send_cc='Lindsay <its-lindsay@example.com>, Nora <nora@example.com>',
|
||||
# Add Bcc.
|
||||
send_bc='frederica@example.com',
|
||||
subject='Email Subject',
|
||||
content='This is some content...',
|
||||
# You can add attachments, you need to pass the path of a single or more attachment in the form of a list.
|
||||
attachs=None
|
||||
).send())
|
|
@ -0,0 +1,9 @@
|
|||
from Library.Logger import TestingLogger
|
||||
from Library.Driver import WebDriver
|
||||
from Library.LoadConfig import *
|
||||
from Library.LoadData import *
|
||||
import unittest
|
||||
import ddt
|
||||
|
||||
|
||||
log = TestingLogger()
|
|
@ -0,0 +1,38 @@
|
|||
import time
|
||||
|
||||
|
||||
def fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=None, utc=False):
|
||||
return time.strftime(fmt, [time.localtime, time.gmtime][utc](ts))
|
||||
|
||||
|
||||
def par_time(fmt='', dt=''):
|
||||
return time.mktime(time.strptime(dt, fmt))
|
||||
|
||||
|
||||
def asc_time(ts=None):
|
||||
return time.asctime(time.localtime(ts))
|
||||
|
||||
|
||||
def int_time():
|
||||
return int(time.time())
|
||||
|
||||
|
||||
def mic_time():
|
||||
return round(time.time(), 6)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# e.g. Get the current formatting time.
|
||||
print(fmt_time(fmt='%Y-%m-%d %H:%M:%S'))
|
||||
# e.g. Get the formatting time and specify the timestamp.
|
||||
print(fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=0))
|
||||
# e.g. Get UTC formatting time.
|
||||
print(fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=0, utc=True))
|
||||
# e.g. Convert the formatting time to timestamp.
|
||||
print(par_time(fmt='%Y-%m-%d %H:%M:%S', dt='2020-12-31 12:00:00'))
|
||||
# e.g. Get ASC formatting time.
|
||||
print(asc_time())
|
||||
# e.g. Get integer timestamp.
|
||||
print(int_time())
|
||||
# e.g. Get accurate to microsecond timestamp.
|
||||
print(mic_time())
|
|
@ -0,0 +1,14 @@
|
|||
import yaml
|
||||
|
||||
|
||||
def yaml_encode(data, indent=None, unicode=False):
|
||||
return yaml.dump(data, indent=indent, allow_unicode=not unicode, sort_keys=False)
|
||||
|
||||
|
||||
def yaml_decode(data):
|
||||
return yaml.load(data, Loader=yaml.CFullLoader)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# e.g. Yaml encoding and decoding it.
|
||||
print(yaml_decode(yaml_encode({"id": 101, "name": "Tom", "friends": ["Tony", "Jason"], "score": {"english": 92, "math": 61}})))
|
|
@ -0,0 +1,16 @@
|
|||
from Library.Driver import *
|
||||
|
||||
|
||||
class PageBase:
|
||||
def __init__(self, d: WebDriver):
|
||||
self.driver = d
|
||||
|
||||
def home(self):
|
||||
self.driver.open('https://www.fanscloud.net/')
|
||||
|
||||
def reload(self):
|
||||
self.driver.refresh()
|
||||
|
||||
def quit(self):
|
||||
self.driver.tab_cancel_all()
|
||||
self.driver.quit()
|
|
@ -0,0 +1,25 @@
|
|||
from Po.Home import *
|
||||
|
||||
|
||||
class PageSignin(PageHome):
|
||||
box_user = [By.XPATH, '//input[@type="text" and @id="user"]']
|
||||
box_pass = [By.XPATH, '//input[@type="password" and @id="pw"]']
|
||||
box_persist = [By.XPATH, '//input[@type="checkbox" and @id="persist"]']
|
||||
btn_submit = [By.XPATH, '//button[@type="submit"]']
|
||||
|
||||
def open(self):
|
||||
self.home()
|
||||
self.click_signin()
|
||||
return self
|
||||
|
||||
def input_user(self, value):
|
||||
return self.driver.input(self.driver.find_element_by(self.box_user), value)
|
||||
|
||||
def input_pass(self, value):
|
||||
return self.driver.input(self.driver.find_element_by(self.box_pass), value)
|
||||
|
||||
def click_persist(self):
|
||||
return self.driver.click(self.driver.find_element_by(self.box_persist))
|
||||
|
||||
def click_submit(self):
|
||||
return self.driver.click(self.driver.find_element_by(self.btn_submit))
|
|
@ -0,0 +1,11 @@
|
|||
from Po.Base import *
|
||||
|
||||
class PageHome(PageBase):
|
||||
btn_signin = [By.LINK_TEXT, '登录']
|
||||
|
||||
def open(self):
|
||||
self.home()
|
||||
return self
|
||||
|
||||
def click_signin(self):
|
||||
return self.driver.click(self.driver.find_element_by(self.btn_signin))
|
|
@ -0,0 +1,30 @@
|
|||
from Common.Basic import *
|
||||
from Po.Home.Signin import *
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestLoginSite(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
cls.driver = WebDriver()
|
||||
cls.page = PageSignin(cls.driver).open()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
cls.driver.quit()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.page.reload()
|
||||
|
||||
@ddt.data(*loadCsv('illegal_passwds.csv'))
|
||||
@ddt.unpack
|
||||
def test_signin_error_admin_pass(self, v1):
|
||||
pass
|
||||
self.page.input_user('admin')
|
||||
self.page.input_pass(v1)
|
||||
self.page.click_submit()
|
||||
# Please assert in here......
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Binary file not shown.
|
@ -0,0 +1,11 @@
|
|||
from Library.HTMLTestRunner import HTMLTestRunner
|
||||
from Library.ProjectHome import *
|
||||
from Library.Report import *
|
||||
from Library.Time import *
|
||||
from unittest import TestSuite, TestLoader
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
testsuite = TestSuite()
|
||||
testsuite.addTest(TestLoader().discover('%s/Testcase' % project_home()))
|
||||
print(HTMLTestRunner(report_home='%s/Report/%s' % (project_home(), fmt_time(generate_report_name())), report_home_latest_name='latest').run(testsuite))
|
Loading…
Reference in New Issue