From de8568bc3961a0c6f6d599deb3a5acb895c2464a Mon Sep 17 00:00:00 2001 From: zhaoyafan Date: Wed, 15 Mar 2023 19:40:39 +0800 Subject: [PATCH] Commit. --- .gitignore | 3 + Lib/Base64.py | 29 +++ Lib/Driver.py | 477 ++++++++++++++++++++++++++++++++++++ Lib/Http.py | 104 ++++++++ Lib/Json.py | 9 + Lib/LoadConf.py | 43 ++++ Lib/LoadData.py | 53 ++++ Lib/SeleniumClear.py | 29 +++ Lib/Smtp.py | 247 +++++++++++++++++++ Lib/TestcaseBasicLibrary.py | 4 + Lib/Time.py | 38 +++ Lib/Yaml.py | 9 + Pom/Base/__init__.py | 16 ++ Pom/Home/Signin/__init__.py | 25 ++ Pom/Home/Signup/__init__.py | 0 Pom/Home/__init__.py | 11 + Testcase/test_signin.py | 22 ++ environment.properties | 1 + pytest.ini | 11 +- requirements.txt | Bin 0 -> 1660 bytes 20 files changed, 1130 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 Lib/Base64.py create mode 100644 Lib/Driver.py create mode 100644 Lib/Http.py create mode 100644 Lib/Json.py create mode 100644 Lib/LoadConf.py create mode 100644 Lib/LoadData.py create mode 100644 Lib/SeleniumClear.py create mode 100644 Lib/Smtp.py create mode 100644 Lib/TestcaseBasicLibrary.py create mode 100644 Lib/Time.py create mode 100644 Lib/Yaml.py create mode 100644 Pom/Base/__init__.py create mode 100644 Pom/Home/Signin/__init__.py create mode 100644 Pom/Home/Signup/__init__.py create mode 100644 Pom/Home/__init__.py create mode 100644 Testcase/test_signin.py create mode 100644 environment.properties create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b2c0538 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*pytest_cache/ +*allure*/ +*pycache*/ diff --git a/Lib/Base64.py b/Lib/Base64.py new file mode 100644 index 0000000..b6edb5f --- /dev/null +++ b/Lib/Base64.py @@ -0,0 +1,29 @@ +import base64 + + +def base64_encode(data, output=None): + """ + :param data: Bytes or String or FileStream. + :param output: Output file stream, when the data parameter is the file stream valid. + :return: What type of data entering returns the same data type. + """ + if isinstance(data, bytes): + return base64.b64encode(data) + if isinstance(data, str): + return base64.b64encode(bytes(data, encoding='utf-8')).decode() + else: + return base64.encode(data, output) + + +def base64_decode(data, output=None): + """ + :param data: Bytes or String or FileStream. + :param output: Output file stream, when the data parameter is the file stream valid. + :return: What type of data entering returns the same data type. + """ + if isinstance(data, bytes): + return base64.b64decode(data) + if isinstance(data, str): + return base64.b64decode(bytes(data, encoding='utf-8')).decode() + else: + return base64.decode(data, output) diff --git a/Lib/Driver.py b/Lib/Driver.py new file mode 100644 index 0000000..69eca5a --- /dev/null +++ b/Lib/Driver.py @@ -0,0 +1,477 @@ +from selenium import webdriver as _root_webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.common.alert import Alert +from selenium.webdriver.common.action_chains import ActionChains +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.wait import WebDriverWait +from selenium.common.exceptions import * +import tempfile +import platform +import random +import json +import time +import sys +import os + +# You can set the drive path and browser location through the environment variables, +# and select the webdriver type. +# It has preferential priority through environmental variable settings. +SELENIUM_WIRE = os.environ.get('SELENIUM_WIRE') or '' +# Attention!!! +# Must set the environment before importing selenium. +# Otherwise, these settings will be invalid. +SELENIUM_BROWSER_CHOOSE = os.environ.get('SELENIUM_BROWSER_CHOOSE') or 'Chrome' +SELENIUM_BROWSER_DRIVER = os.environ.get('SELENIUM_BROWSER_DRIVER') or '' +SELENIUM_BROWSER_BINARY = os.environ.get('SELENIUM_BROWSER_BINARY') or '' +# Whether to turn on headless mode. +HEADLESS_BROWSER_ENABLE = True if os.environ.get('HEADLESS') == '1' else False +# Choose Selenium or Selenium-Wire, default: Selenium. +if SELENIUM_WIRE: + exec('from seleniumwire import webdriver as _root_webdriver') +# Choose options and service. +if SELENIUM_BROWSER_CHOOSE: + exec('from selenium.webdriver.%s.options import Options' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').lower()) + exec('from selenium.webdriver.%s.service import Service' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').lower()) +# Use the chrome driver by default, such a code writing method is for grammar prompts. +DriverChoose = _root_webdriver.Chrome +if SELENIUM_BROWSER_CHOOSE: + exec('DriverChoose=_root_webdriver.%s' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').capitalize()) + + +class BrowserMobileEmulation(dict): + """ + Mobile emulation parameters. + """ + def __init__(self, w=540, h=960, user_agent=None): + du = 'Mozilla/5.0 (Linux; U; Android 13; zh-cn; 2109119BC Build/TKQ1.220829.002) ' \ + 'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 ' \ + 'Chrome/98.0.4758.102 MQQBrowser/13.6 Mobile Safari/537.36' + user_agent = user_agent or du + super().__init__({'w': w, 'h': h, 'user_agent': user_agent}) + self.w = self.h = self.user_agent = None + + def __setattr__(self, key, value): + pass + + def __getitem__(self, item): + try: + return super().__getitem__(item) + except KeyError: + return None + + def __getattr__(self, item): + try: + return super().__getitem__(item) + except KeyError: + return None + + +class DefaultChromeLocation: + xDriver = '/opt/google/chrome/chromedriver' + xChrome = '/opt/google/chrome/chrome' + wDriver = 'C:/Program Files/Google/Chrome/Application/chromedriver.exe' + wChrome = 'C:/Program Files/Google/Chrome/Application/chrome.exe' + + +class PositionTab: + """ + Position for switch tab. + """ + Prev = 'Go-Prev' + Next = 'Go-Next' + + +class Browser(DriverChoose): + """ + Browser web driver. + """ + def __init__( + self, + driver: str = None, + binary: str = None, + headless: bool = False, + lang: str = None, + mute: bool = False, + no_images: bool = False, + user_agent: str = None, + http_proxy: str = None, + home: str = None, + window_size: str = None, + mobile_emulation: BrowserMobileEmulation = None, + option_arguments: list = None, + req_interceptor=None, + res_interceptor=None, + ): + choose = SELENIUM_BROWSER_CHOOSE.capitalize() + self.platform = platform.uname().system + default_driver = None + default_binary = None + headless = HEADLESS_BROWSER_ENABLE or headless + if choose == 'Chrome': + if self.platform == 'Linux': + default_driver = DefaultChromeLocation.xDriver + default_binary = DefaultChromeLocation.xChrome + else: + default_driver = DefaultChromeLocation.wDriver + default_binary = DefaultChromeLocation.wChrome + driver = SELENIUM_BROWSER_DRIVER or driver or default_driver + binary = SELENIUM_BROWSER_BINARY or binary or default_binary + if self.platform == 'Linux' and not window_size: window_size = '1920x1080' + if self.platform == 'Windows' and headless and not window_size: window_size = '1920x1080' + # Initialization settings. + if (isinstance(option_arguments, list)) is False: option_arguments = [] + cdplist = [] + service = Service() + options = Options() + self.cdplist = cdplist + # Delete prompt information of chrome being controlled. + hasattr(options, 'add_experimental_option') and options.add_experimental_option("excludeSwitches", ['enable-automation', 'enable-logging']) + # Mobile emulation parameter setting start. + if mobile_emulation: + if hasattr(options, 'add_experimental_option') is False: + raise Exception('Do not support mobile emulation currently.') + self.w_browser = mobile_emulation.w + 14 + self.h_browser = mobile_emulation.h + 0 + self.w_inner_window = mobile_emulation.w + 0 + self.h_inner_window = mobile_emulation.h - 86 + self.mobile_emulation_screen_w = mobile_emulation.w + self.mobile_emulation_screen_h = mobile_emulation.h + window_size = '%s,%s' % (self.w_browser, self.h_browser) + options.add_experimental_option( + 'mobileEmulation', { + 'deviceMetrics': { + 'width': self.w_inner_window, + 'height': self.h_inner_window, + 'pixelRatio': 2.75, + 'touch': True + }, + 'userAgent': mobile_emulation.user_agent + } + ) + cdplist.append([ + 'Emulation.setUserAgentOverride', { + 'userAgent': mobile_emulation.user_agent, + 'userAgentMetadata': { + 'platform': 'Android' if mobile_emulation.user_agent.find('iPhone') == -1 else 'iPhone', + 'mobile': True, + 'platformVersion': '', + 'architecture': '', + 'model': '' + } + } + ]) + else: + self.w_browser = 0 + self.h_browser = 0 + self.w_inner_window = 0 + self.h_inner_window = 0 + self.mobile_emulation_screen_w = 0 + self.mobile_emulation_screen_h = 0 + # Mobile emulation parameter setting end. + # Set browser and webdriver path. + if driver: + service.path = driver + if binary: + options.binary_location = binary + # Add webdriver option arguments. + for i in option_arguments: + options.add_argument(i) + # Set headless mode. + if self.platform == 'Linux' or headless: + options.add_argument('--headless') + # Set no-sandbox mode. + if self.platform == 'Linux': + options.add_argument('--no-sandbox') + options.add_argument('--disable-dev-shm-usage') + options.add_argument('--disable-gpu') + # Set language of browser, default is zh-CN. + if lang: + options.add_argument('--lang=%s' % (lang or 'zh-CN')) + hasattr(options, 'set_preference') and options.set_preference('intl.accept_languages', lang or 'zh-CN') + # Set mute. + if mute: + options.add_argument('--mute-audio=true') + hasattr(options, 'set_preference') and print('Warning: Do not support mute audio currently.', file=sys.stderr) + # Set no images mode. + if no_images: + options.add_argument('--blink-settings=imagesEnabled=false') + hasattr(options, 'set_preference') and print('Warning: Do not support disable images currently.', file=sys.stderr) + # Set default user agent. + if user_agent: + options.add_argument('--user-agent=%s' % user_agent) + hasattr(options, 'set_preference') and options.set_preference('general.useragent.override', user_agent) + # Set http proxy for browser. + if http_proxy: + options.add_argument('--proxy-server=http://%s' % http_proxy) + # Set browser window size before startup. + if window_size: + options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ',')) + else: + options.add_argument('--start-maximized') + # Start the browser. + super().__init__(service=service, options=options) + # Selenium-Wire backend optimization start. + try: + self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.') + self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.') + self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.') + except AttributeError: + pass + # Selenium-Wire backend optimization end. + if mobile_emulation: + cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}]) + cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}]) + cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}]) + # Set the request and response interceptor. + if req_interceptor: + hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr) + self.request_interceptor = req_interceptor + if res_interceptor: + hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr) + self.response_interceptor = res_interceptor + # Sync set http proxy for Selenium-Wire backend. + if http_proxy: + self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy} + # Set browser window size after startup, by default, there will be full screen display window. + if window_size: + self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(',')) + else: + self.maximize_window() + # Sets a sticky timeout to implicitly wait for an element to be found. + self.implicitly_wait(10) + # Set the amount of time to wait for a page load to complete. + self.set_page_load_timeout(25) + # Open the default page. + home and self.open(home) + + @staticmethod + def wait(secs: int | float = 1): + """ + Will sleep waiting. + """ + number_int = int(secs) + number_float = secs - number_int + for i in range(number_int): + time.sleep(1) + else: + time.sleep(number_float) + + def open(self, url=None): + """ + Open the URL, simulate into the URL in the address bar and jump, the new page has no Referrer. + """ + self.update_cdp_command() + return self.get(url) + + def turn(self, url=None): + """ + Simulation "window.location.href" jumps, the new page has Referrer. + """ + return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None) + + def find(self, path): + """ + Use XPath to find an element. + """ + return self.find_element(By.XPATH, path) + + def find_mult(self, path): + """ + Use XPath to find elements. + """ + return self.find_elements(By.XPATH, path) + + def find_element_by(self, sentence): + """ + Custom find element, pass into a tuple or list. + """ + return self.find_element(*sentence) + + def click(self, element): + """ + Click element for desktop version. + """ + self.action_chains().reset_actions() + self.action_chains().click(element).perform() + self.wait(0.1) + + def touch(self, x, y): + """ + Click on the coordinates for Mobile edition. + """ + self.action_chains().reset_actions() + self.action_chains().move_by_offset(x, y).click().perform() + self.wait(0.1) + + def input(self, element, content): + """ + Enter the content to the element. + """ + self.action_chains().reset_actions() + self.action_chains().send_keys_to_element(element, content).perform() + self.wait(0.1) + + def mouse(self, element): + """ + Park the mouse here. + """ + self.action_chains().reset_actions() + self.action_chains().move_to_element(element).perform() + + def tab_create(self, url=None): + """ + Create a new tab and open the URL. + """ + self.switch_to.new_window('tab') + self.update_cdp_command() + url and self.open(url) + + def tab_switch(self, tab: int | str): + """ + Switch the browser tab page + """ + handles = self.window_handles + lengths = len(handles) + current = handles.index(self.current_window_handle) + if isinstance(tab, int): + handle = tab + elif tab == PositionTab.Prev: + handle = (current - 1) + elif tab in PositionTab.Next: + handle = (current + 1) % lengths + else: + handle = None + self.switch_to.window(handles[handle]) + self.wait(0.2) + self.update_cdp_command() + + def tab_cancel(self): + """ + Close the current browser tab page. + """ + handles = self.window_handles + if len(handles): + current = handles.index(self.current_window_handle) + self.close() + current > 0 and self.switch_to.window(handles[current - 1]) + self.wait(0.2) + + def tab_cancel_all(self): + """ + Close all the browser tab page. + """ + handles = self.window_handles + for i in handles: + self.tab_cancel() + + def frame_switch_to(self, element_of_frame): + """ + Switch frame to the specified frame element. + """ + self.switch_to.frame(element_of_frame) + self.wait(0.2) + + def frame_switch_to_default(self): + """ + Switch to the default frame. + """ + self.switch_to.default_content() + self.wait(0.2) + + def scroll(self): + """ + Scroll page. + :return: + """ + self.action_chains().reset_actions() + self.action_chains().scroll_by_amount(0, self.execute_script('return document.documentElement.clientHeight;')).perform() + self.wait(0.8) + + def scroll_to(self, pos: int | str): + """ + Scroll to the specified location. + """ + if isinstance(pos, int) and pos > 0: + self.execute_script('window.scrollTo(0, arguments[0]);', pos) + elif pos == 0: + self.execute_script('window.scrollTo(0, 0);') + elif pos == 0 - 1: + self.execute_script('window.scrollTo(0, document.body.scrollHeight);') + else: + pass + self.wait(0.8) + + def scroll_to_element(self, element): + """ + Scroll to the specified element location. + """ + self.action_chains().reset_actions() + self.action_chains().scroll_to_element(element).perform() + self.wait(0.8) + + def element_force_display(self, element): + """ + Make hidden element visible and interactive. + """ + self.execute_script( + 'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.setAttribute("hidden","false");', element + ) + + def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None): + """ + Return WebDriverWait object. + """ + return WebDriverWait( + driver=self, + timeout=timeout, + poll_frequency=poll_frequency, + ignored_exceptions=ignored_exceptions + ) + + def current_alert(self): + """ + Return current alert object. + """ + return Alert(self) + + def window_inner_size(self): + """ + Get the page window inner size. + """ + size = self.execute_script('return [window.innerWidth, window.innerHeight];') + return {'w': size[0] or 0, 'h': size[1] or 0} + + def action_chains(self): + """ + Return ActionChains object. + """ + return ActionChains(self) + + def screenshot(self) -> bytes: + """ + Screenshot as bytes. + """ + return self.get_screenshot_as_png() + + def update_cdp_command(self) -> None: + for cmd in self.cdplist: + self.execute_cdp_cmd(*cmd) + + +class WebDriver(Browser): + """ + Get a browser driver object. + """ + def __init__(self): + super().__init__(lang='zh-CN') + + +if __name__ == '__main__': + # e.g. Test it can work normally. + driver = WebDriver() + driver.open('https://www.hao123.com/') + driver.wait() + driver.quit() diff --git a/Lib/Http.py b/Lib/Http.py new file mode 100644 index 0000000..5cd7a93 --- /dev/null +++ b/Lib/Http.py @@ -0,0 +1,104 @@ +import requests +import requests.utils + + +class HTTPResponseObject(dict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.url = None + self.status = None + self.reason = None + self.header = None + self.cookie = None + self.binary = None + self.text, self.json = None, None + self.encoding, self.duration = None, None + + def __setattr__(self, key, value): + pass + + def __getitem__(self, item): + try: + return super().__getitem__(item) + except Exception: + return None + + def __getattr__(self, item): + try: + return super().__getitem__(item) + except Exception: + return None + + +class HTTPRequest: + __http__ = requests + __accept_method__ = ('GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 'PATCH') + + def http(self, method='GET', url: str = None, query=None, data=None, json=None, file=None, header=None, cookie=None, + ua: str = None, auth=None, timeout: int | float = None, proxy: str = None, auto_redirect=False, ignore_cert_error=False): + method = method.upper() + if method not in self.__accept_method__: + raise Exception('Unsupported request method.') + if method == 'HEAD': + auto_redirect = False + if ua: + if not isinstance(header, dict): + header = {} + header['User-Agent'] = ua + if isinstance(proxy, str) and len(proxy) >= 5: + proxy = {'http': proxy, 'https': proxy} + else: + proxy = None + try: + response = self.__http__.request( + method=method, + url=url, + params=query, + data=data, + headers=header, + cookies=cookie, + files=file, + auth=auth, + timeout=timeout, + allow_redirects=auto_redirect, + proxies=proxy, + verify=not ignore_cert_error, + json=json + ) + except Exception: + response = None + try: + response_json = response.json() + except Exception: + response_json = None + if response: + return HTTPResponseObject({ + 'url': response.url, + 'status': response.status_code, + 'reason': response.reason, + 'header': response.headers, + 'cookie': requests.utils.dict_from_cookiejar(response.cookies), + 'binary': response.content, + 'text': response.text, + 'json': response_json, + 'encoding': response.encoding, + 'duration': round(response.elapsed.microseconds / 1000, 1) + }) + else: + return HTTPResponseObject({ + 'status': -1 + }) + + +class HTTPSession(HTTPRequest): + __http__ = requests.session() + + +if __name__ == '__main__': + # e.g. Send a get request. + res = HTTPSession().http( + method='GET', url='http://pv.sohu.com/cityjson', query={'ie': 'utf-8'}, + ua='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36', + auto_redirect=True + ) + print(res) diff --git a/Lib/Json.py b/Lib/Json.py new file mode 100644 index 0000000..8d649eb --- /dev/null +++ b/Lib/Json.py @@ -0,0 +1,9 @@ +import json + + +def json_encode(data, indent=None, unicode=True): + return json.dumps(data, indent=indent, ensure_ascii=unicode) + + +def json_decode(data): + return json.loads(data) diff --git a/Lib/LoadConf.py b/Lib/LoadConf.py new file mode 100644 index 0000000..d550b44 --- /dev/null +++ b/Lib/LoadConf.py @@ -0,0 +1,43 @@ +from Lib.Json import json_decode +from Lib.Yaml import yaml_decode +import os + +class LoadConf(dict): + """ + Load the configuration file from the configuration directory. + """ + def __init__(self, name: str): + home = '%s%s%s' % (os.path.dirname(os.path.dirname(__file__)), os.sep, 'Conf') + try: + os.path.exists(home) or os.makedirs(home) + except FileExistsError: + pass + json_extension = '.json' + yaml_extension = '.yaml' + pa_json = '%s/%s%s' % (home, name, json_extension) + pa_yaml = '%s/%s%s' % (home, name, yaml_extension) + data = None + if os.path.exists(pa_json): + file = pa_json + data = json_decode(open(file, encoding='utf-8').read()) + if os.path.exists(pa_yaml): + file = pa_yaml + data = yaml_decode(open(file, encoding='utf-8').read()) + if data is None: + raise FileNotFoundError('Not found json or yaml configuration "%s" in %s.' % (name, home)) + super().__init__(data) + + def __setattr__(self, key, value): + raise AttributeError('Attribute is read only.') + + def __getattr__(self, item): + try: + return super().__getitem__(item) + except KeyError: + return None + + def __setitem__(self, key, value): + return self.__setattr__(key, value) + + def __getitem__(self, item): + return self.__getattr__(item) diff --git a/Lib/LoadData.py b/Lib/LoadData.py new file mode 100644 index 0000000..e149147 --- /dev/null +++ b/Lib/LoadData.py @@ -0,0 +1,53 @@ +from Lib.Json import json_decode +from Lib.Yaml import yaml_decode +import csv +import os + + +class LoadData: + """ + Load the data file from the data directory. + """ + def __init__(self): + home = '%s%s%s' % (os.path.dirname(os.path.dirname(__file__)), os.sep, 'Data') + try: + os.path.exists(home) or os.makedirs(home) + except FileExistsError: + pass + self.home = home + + def fromJson(self, filename: str, from_data_home=True): + return json_decode(open('%s/%s' % (self.home, filename) if from_data_home else filename, encoding='utf-8').read()) + + def fromYaml(self, filename: str, from_data_home=True): + return yaml_decode(open('%s/%s' % (self.home, filename) if from_data_home else filename, encoding='utf-8').read()) + + def fromCsv(self, filename: str, from_data_home=True): + """ + Load data from CSV file, compatible standard format. + :param filename: Filename or path. + :param from_data_home: Whether to load from the data directory, otherwise use the absolute path. + :return: List. + """ + file = '%s/%s' % (self.home, filename) if from_data_home else filename + try: + open(file).read(4096) + encoding = None + except UnicodeDecodeError: + encoding = 'utf-8' + return [row for row in csv.reader(open(file, encoding=encoding))] + + def fromTxt(self, filename: str, from_data_home=True): + """ + Load the data from the txt text file, and one line is used as a data. + :param filename: Filename or path. + :param from_data_home: Whether to load from the data directory, otherwise use the absolute path. + :return: List. + """ + file = '%s/%s' % (self.home, filename) if from_data_home else filename + try: + open(file).read(4096) + encoding = None + except UnicodeDecodeError: + encoding = 'utf-8' + return [row.rstrip("\n") for row in open(file, encoding=encoding).readlines()] diff --git a/Lib/SeleniumClear.py b/Lib/SeleniumClear.py new file mode 100644 index 0000000..d02f673 --- /dev/null +++ b/Lib/SeleniumClear.py @@ -0,0 +1,29 @@ +import tempfile +import platform +import shutil +import glob +import os + + +def clear_selenium(): + if platform.uname().system == 'Windows': + user_home = [os.environ.get('HOMEDRIVE'), os.environ.get('HOMEPATH')] + if user_home[0] and user_home[1]: + try: + shutil.rmtree('%s%s/.cache/selenium' % (user_home[0], user_home[1])) + except FileNotFoundError: + pass + +def clear_driver_cache(): + for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']: + for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)): + try: + shutil.rmtree(i) + except (FileNotFoundError, PermissionError, WindowsError): + pass + + +if __name__ == '__main__': + # e.g. Clear + clear_selenium() + clear_driver_cache() diff --git a/Lib/Smtp.py b/Lib/Smtp.py new file mode 100644 index 0000000..f143311 --- /dev/null +++ b/Lib/Smtp.py @@ -0,0 +1,247 @@ +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.mime.base import MIMEBase +from email.header import Header +from email import encoders +import smtplib +import sys +import os +import re + + +class SMTPMailerSendState(dict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.code = self.message = None + + def __setattr__(self, key, value): + pass + + def __getitem__(self, item): + try: + return super().__getitem__(item) + except Exception: + return None + + def __getattr__(self, item): + try: + return super().__getitem__(item) + except Exception: + return None + + +class SMTPMailer: + __host__ = { + 'host': '', + 'port': 25 + } + __user__ = { + 'user': '', + 'pass': '' + } + __mail__ = { + 'from': [], + 'to': [], + 'cc': [], + 'bc': [], + 'attachs': [], + 'subject': '', + 'content': '' + } + __flag__ = 0 + __addr__ = 0 + __from__ = 0 + + def __init__( + self, + username='', + password='', + host='127.0.0.1', + port=25, + ssl_enable=False, + send_from='', + send_to=None, + send_cc=None, + send_bc=None, + subject='', + content='', + attachs=None + ): + self.ssl_enable = ssl_enable + self.host(host, port) + self.user(username, password) + send_from and self.send_from(send_from) + send_to and self.to(send_to) + send_cc and self.cc(send_cc) + send_bc and self.bc(send_bc) + subject and self.subject(subject) + content and self.content(content) + attachs and self.attachs(attachs) + + @staticmethod + def test_email(email: str, throw_exception=False): + test = re.fullmatch('^([a-zA-Z0-9_.\\-])+@(([a-zA-Z0-9\\-])+\\.)+([a-zA-Z0-9]{2,4})+$', email) is not None + if throw_exception and test is False: + raise Exception('Illegal email address: %s.' % email) + return test + + @staticmethod + def gen_header_of_mailer(e): + try: + return "\"%s\" <%s>" % (Header(e[0]).encode(), Header(e[1]).encode()) + except Exception: + return '' + + def parse_email_address(self, email: str): + if email.find('<') != -1 and email.find('>') != -1: + match_email = re.findall('^(.*?)<(.*?)>$', email) + if len(match_email) == 1: + addr = match_email[0][1].strip() + self.test_email(addr, throw_exception=True) + name = match_email[0][0].strip() or addr.split('@')[0].strip() + return [name, addr] + else: + raise Exception('The wrong email address format: %s.' % email) + else: + if len(email) != 0: + addr = email + self.test_email(addr, throw_exception=True) + name = email.split('@')[0].strip() + return [name, addr] + else: + raise Exception('Empty email address.') + + def parse_email_address_mult(self, data): + if not isinstance(data, (str, list, tuple)): + raise TypeError('Email address list must be string or list or tuple type.') + if isinstance(data, str): + data = [i.strip() for i in data.replace(',', ';').split(';')] + while '' in data: + data.remove('') + return [self.parse_email_address(email_format) for email_format in data] + + def enableSSL(self): + self.ssl_enable = True + return self + + def host(self, host: str, port: int): + self.__host__['host'] = host + self.__host__['port'] = port + return self + + def user(self, username, password): + self.__user__['user'] = username + self.__user__['pass'] = password + return self + + def send_from(self, e): + self.__mail__['from'] = self.parse_email_address(e) + self.__from__ += 1 + return self + + def to(self, receive_list): + self.__mail__['to'] = self.parse_email_address_mult(receive_list) + self.__addr__ += 1 + return self + + def cc(self, receive_list): + self.__mail__['cc'] = self.parse_email_address_mult(receive_list) + self.__addr__ += 1 + return self + + def bc(self, receive_list): + self.__mail__['bc'] = self.parse_email_address_mult(receive_list) + self.__addr__ += 1 + return self + + def subject(self, text: str): + self.__mail__['subject'] = text + return self + + def content(self, text: str): + self.__mail__['content'] = text + return self + + def attachs(self, attachs_path_list: str | list): + if isinstance(attachs_path_list, str): + attachs_path_list = [attachs_path_list] + for path in attachs_path_list: + if not os.path.exists(path): + raise FileNotFoundError('The attachment was not found.') + self.__mail__['attachs'] = attachs_path_list + return self + + def send(self, test_only=False): + if self.__flag__: + return SMTPMailerSendState({'code': 1, 'message': 'Mail has been sent.'}) + if self.__addr__ == 0: + raise Exception('No receiver.') + message = MIMEMultipart() + if self.__from__ == 0 and self.test_email(self.__user__['user']) and self.send_from(self.__user__['user']): + pass + message['From'] = self.gen_header_of_mailer(self.__mail__['from']) + to_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['to']]) + cc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['cc']]) + bc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['bc']]) + message['To'], message['Cc'], message['Bcc'] = to_header, cc_header, bc_header + message['Subject'] = Header(self.__mail__['subject']) + message.attach(MIMEText(self.__mail__['content'], 'html', 'utf-8')) + for file in self.__mail__['attachs']: + mime = MIMEBase('application', 'octet-stream') + mime.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file)) + mime.set_payload(open(file, 'rb').read()) + encoders.encode_base64(mime) + message.attach(mime) + try: + smtp = [smtplib.SMTP, smtplib.SMTP_SSL][self.ssl_enable]( + self.__host__['host'], + self.__host__['port'] + ) + except Exception: + return SMTPMailerSendState({'code': 1, 'message': 'Connection failure.'}) + try: + smtp.login(self.__user__['user'], self.__user__['pass']) + except Exception: + return SMTPMailerSendState({'code': 2, 'message': 'Account authentication failed.'}) + try: + receivers = [] + for rg in [self.__mail__['to'], self.__mail__['cc'], self.__mail__['bc']]: + for rp in rg: + receivers.append(rp[1]) + if test_only: + print(message.as_string(), file=sys.stderr) + return None + smtp.sendmail(self.__user__['user'], receivers, message.as_string()) + smtp.quit() + self.__flag__ = 1 + return SMTPMailerSendState({'code': 0, 'message': 'Mail sent successfully.'}) + except Exception as error: + return SMTPMailerSendState({'code': 3, 'message': str(error)}) + + +if __name__ == '__main__': + # e.g. Send an email example. + print(SMTPMailer().enableSSL().host('smtp.qq.com', 465).user('admin@example.com', '******'). + subject('Email Subject').content('This is some content...').to('user@example.com').send()) + # e.g. Or you can send emails like this. + print(SMTPMailer( + username='admin@example.com', + password='******', + ssl_enable=True, + host='smtp.qq.com', + port=465, + # The recipient format can be "Nickname " or "Email", like this: "Mrs.Lisa " or "lisa@example.com" + # Multiple recipients can be separated by semicolon(;) or comma(,). + # Multiple recipients you can also pass a list. + send_from='Administrator ', + # Add recipients. + send_to='Mr.Tom , Mr.Jack ', + # Add Cc. + send_cc='Lindsay , Nora ', + # Add Bcc. + send_bc='frederica@example.com', + subject='Email Subject', + content='This is some content...', + # You can add attachments, you need to pass the path of a single or more attachment in the form of a list. + attachs=None + ).send()) diff --git a/Lib/TestcaseBasicLibrary.py b/Lib/TestcaseBasicLibrary.py new file mode 100644 index 0000000..ffbb784 --- /dev/null +++ b/Lib/TestcaseBasicLibrary.py @@ -0,0 +1,4 @@ +from Lib.LoadConf import * +from Lib.LoadData import * +import pytest +import allure diff --git a/Lib/Time.py b/Lib/Time.py new file mode 100644 index 0000000..7724a18 --- /dev/null +++ b/Lib/Time.py @@ -0,0 +1,38 @@ +import time + + +def fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=None, utc=False): + return time.strftime(fmt, [time.localtime, time.gmtime][utc](ts)) + + +def par_time(fmt='', dt=''): + return time.mktime(time.strptime(dt, fmt)) + + +def asc_time(ts=None): + return time.asctime(time.localtime(ts)) + + +def int_time(): + return int(time.time()) + + +def mic_time(): + return round(time.time(), 6) + + +if __name__ == '__main__': + # e.g. Get the current formatting time. + print(fmt_time(fmt='%Y-%m-%d %H:%M:%S')) + # e.g. Get the formatting time and specify the timestamp. + print(fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=0)) + # e.g. Get UTC formatting time. + print(fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=0, utc=True)) + # e.g. Convert the formatting time to timestamp. + print(par_time(fmt='%Y-%m-%d %H:%M:%S', dt='2020-12-31 12:00:00')) + # e.g. Get ASC formatting time. + print(asc_time()) + # e.g. Get integer timestamp. + print(int_time()) + # e.g. Get accurate to microsecond timestamp. + print(mic_time()) diff --git a/Lib/Yaml.py b/Lib/Yaml.py new file mode 100644 index 0000000..8233cd8 --- /dev/null +++ b/Lib/Yaml.py @@ -0,0 +1,9 @@ +import yaml + + +def yaml_encode(data, indent=None, unicode=False): + return yaml.dump(data, indent=indent, allow_unicode=not unicode, sort_keys=False) + + +def yaml_decode(data): + return yaml.load(data, Loader=yaml.CFullLoader) diff --git a/Pom/Base/__init__.py b/Pom/Base/__init__.py new file mode 100644 index 0000000..f04e09b --- /dev/null +++ b/Pom/Base/__init__.py @@ -0,0 +1,16 @@ +from Lib.Driver import * + + +class PageBase: + def __init__(self, d: WebDriver): + self.driver = d + + def home(self): + self.driver.open('https://www.fanscloud.net/') + + def exit(self): + self.driver.tab_cancel_all() + self.driver.quit() + + def reload(self): + self.driver.refresh() diff --git a/Pom/Home/Signin/__init__.py b/Pom/Home/Signin/__init__.py new file mode 100644 index 0000000..b5a0061 --- /dev/null +++ b/Pom/Home/Signin/__init__.py @@ -0,0 +1,25 @@ +from Pom.Home import * + + +class PageSignin(PageHome): + box_user = [By.XPATH, '//input[@type="text" and @id="user"]'] + box_pass = [By.XPATH, '//input[@type="password" and @id="pw"]'] + box_persist = [By.XPATH, '//input[@type="checkbox" and @id="persist"]'] + btn_submit = [By.XPATH, '//button[@type="submit"]'] + + def open(self): + self.home() + self.click_signin() + return self + + def input_user(self, value): + return self.driver.input(self.driver.find_element_by(self.box_user), value) + + def input_pass(self, value): + return self.driver.input(self.driver.find_element_by(self.box_pass), value) + + def click_persist(self): + return self.driver.click(self.driver.find_element_by(self.box_persist)) + + def click_submit(self): + return self.driver.click(self.driver.find_element_by(self.btn_submit)) diff --git a/Pom/Home/Signup/__init__.py b/Pom/Home/Signup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Pom/Home/__init__.py b/Pom/Home/__init__.py new file mode 100644 index 0000000..9aa5c18 --- /dev/null +++ b/Pom/Home/__init__.py @@ -0,0 +1,11 @@ +from Pom.Base import * + +class PageHome(PageBase): + btn_signin = [By.LINK_TEXT, '登录'] + + def open(self): + self.home() + return self + + def click_signin(self): + return self.driver.click(self.driver.find_element_by(self.btn_signin)) diff --git a/Testcase/test_signin.py b/Testcase/test_signin.py new file mode 100644 index 0000000..321805b --- /dev/null +++ b/Testcase/test_signin.py @@ -0,0 +1,22 @@ +from Lib.TestcaseBasicLibrary import * +from Pom.Home.Signin import * + + +class TestSignin: + def setup_class(self): + self.page = PageSignin(WebDriver()).open() + + def teardown_class(self): + self.page.exit() + + @pytest.mark.parametrize(('username', 'password'), [['super', 'super'], ['guest', 'guest'], ['users', 'users']]) + def test_errors_signin(self, username, password): + self.page.reload() + self.page.input_user(username) + self.page.input_pass(password) + + @pytest.mark.parametrize(('username', 'password'), [['admin', 'admin']]) + def test_normal_signin(self, username, password): + self.page.reload() + self.page.input_user(username) + self.page.input_pass(password) diff --git a/environment.properties b/environment.properties new file mode 100644 index 0000000..5538b2b --- /dev/null +++ b/environment.properties @@ -0,0 +1 @@ +Tester = User \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index e42f559..6f1f26b 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,12 @@ [pytest] -addopts = -q --alluredir=Report/allure_result testpaths = Testcase +addopts = -vs --strict-markers --reruns=1 --reruns-delay=3 +log_cli = 1 +pythonpath = ./ +cache_dir = pytest_cache +norecursedirs = .* venv po pom library dist build +markers = + p0: P0 level testcase + p1: P1 level testcase + p2: P2 level testcase + p3: P3 level testcase diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..52a2d327adc4b0a312e836a3dabf4517f821d55f GIT binary patch literal 1660 zcmZ{l&2H0B5QS%r#G?e&4sBt<0@fgqx?-88PHZJ{f|JnFhX>9#7}6ZLr2>cCcS|(%Vtb54vi5W0lSIyt3E!%Kqow+TMD_F7pZs?zHol=GyR!6PcKm$go^3{x(!aEJ9?s5@oYcn+`wC8@ zJ>n$WYP8qNS&v+)a69^~CXc{%En79(THyv;qdGI;fWgl7>FjyJgmT0NW)#}G=fZu# z2ClcdZ^pcX6gs#fYh_&Mz10^y=fY1WRG=3Rx}r-|xN#fb8+($%jc4gpt(Vs5an_eq zqhP|jkw3jC*`gBJR>HtEEQJHc50hUYNp7upR-T{lm2(Q#t?*J8mZfehGz(?jOg7>& z0u7jrm20rmI~jsw_Mj7%UO7S=R0JF?U@F*liiqo_`tc*1RvHp`KHpbs2q z26N^*5P-kTU@};!3U#wqIWGr4TWeXOhrvH_aN1C}zfca9-Z>MjOIIo8EKUg3Yx%4*Fn?Oh)+yw%wz1tf{=NL zX=F?8gfMnmOkpx-hOMZJ=%JUOc0VdEW8oyoU<>67Y-A$#Xv8uKny)<9>Zq bj)VNzv4vZy_$)hYdU1=Lfj)tYXYlz8B8BmY literal 0 HcmV?d00001