commit 86d46bc1655896b807b04a0bcfef8db3f150e188 Author: zhaoyafan Date: Mon Mar 13 18:49:49 2023 +0800 Commit. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..97c843f --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/Report/* +.pytest_cache/ +__pycache__/ diff --git a/Common/Basic.py b/Common/Basic.py new file mode 100644 index 0000000..6014bfd --- /dev/null +++ b/Common/Basic.py @@ -0,0 +1,4 @@ +from Library.LoadConfig import * +from Library.LoadData import * +import unittest +import ddt diff --git a/Config/default.yaml b/Config/default.yaml new file mode 100644 index 0000000..3eb9437 --- /dev/null +++ b/Config/default.yaml @@ -0,0 +1 @@ +report_format: '%Y-%m-%d-%H-%M-%S' diff --git a/Data/illegal_passwds.csv b/Data/illegal_passwds.csv new file mode 100644 index 0000000..301a7ba --- /dev/null +++ b/Data/illegal_passwds.csv @@ -0,0 +1,20 @@ +123456 +password +12345678 +qwerty +123456789 +12345 +1234 +111111 +1234567 +dragon +123123 +baseball +abc123 +football +monkey +letmein +696969 +shadow +master +666666 diff --git a/Data/illegal_passwds.json b/Data/illegal_passwds.json new file mode 100644 index 0000000..39ca4cb --- /dev/null +++ b/Data/illegal_passwds.json @@ -0,0 +1 @@ +["123456", "password", "12345678", "qwerty", "123456789", "12345", "1234", "111111", "1234567", "dragon", "123123", "baseball", "abc123", "football", "monkey", "letmein", "696969", "shadow", "master", "666666"] diff --git a/Data/illegal_passwds.txt b/Data/illegal_passwds.txt new file mode 100644 index 0000000..301a7ba --- /dev/null +++ b/Data/illegal_passwds.txt @@ -0,0 +1,20 @@ +123456 +password +12345678 +qwerty +123456789 +12345 +1234 +111111 +1234567 +dragon +123123 +baseball +abc123 +football +monkey +letmein +696969 +shadow +master +666666 diff --git a/Data/illegal_passwds.yaml b/Data/illegal_passwds.yaml new file mode 100644 index 0000000..678ebb6 --- /dev/null +++ b/Data/illegal_passwds.yaml @@ -0,0 +1,20 @@ +- '123456' +- 'password' +- '12345678' +- 'qwerty' +- '123456789' +- '12345' +- '1234' +- '111111' +- '1234567' +- 'dragon' +- '123123' +- 'baseball' +- 'abc123' +- 'football' +- 'monkey' +- 'letmein' +- '696969' +- 'shadow' +- 'master' +- '666666' diff --git a/Library/Base64.py b/Library/Base64.py new file mode 100644 index 0000000..f73aafb --- /dev/null +++ b/Library/Base64.py @@ -0,0 +1,45 @@ +import base64 +import os + + +def base64_encode(data, output=None): + """ + :param data: Bytes or String or FileStream. + :param output: Output file stream, when the data parameter is the file stream valid. + :return: What type of data entering returns the same data type. + """ + if isinstance(data, bytes): + return base64.b64encode(data) + if isinstance(data, str): + return base64.b64encode(bytes(data, encoding='utf-8')).decode() + else: + return base64.encode(data, output) + + +def base64_decode(data, output=None): + """ + :param data: Bytes or String or FileStream. + :param output: Output file stream, when the data parameter is the file stream valid. + :return: What type of data entering returns the same data type. + """ + if isinstance(data, bytes): + return base64.b64decode(data) + if isinstance(data, str): + return base64.b64decode(bytes(data, encoding='utf-8')).decode() + else: + return base64.decode(data, output) + + +if __name__ == '__main__': + # e.g. Code bytes type. + print(base64_encode(bytes('Some text content.', encoding='utf-8'))) + # e.g. Code string type. + print(base64_encode('Some text content.')) + # e.g. Read the file and encode and write to another file. + # 1.Create a test example file, + open('./example_of_base64_encode', 'w').write('Some text content.') + # 2.Then test, + print(base64_encode(open('./example_of_base64_encode', 'rb'), open('./example_of_base64_encode.base64', 'wb'))) + # 3.Final cleanup. + os.remove('./example_of_base64_encode') + os.remove('./example_of_base64_encode.base64') diff --git a/Library/Clear.py b/Library/Clear.py new file mode 100644 index 0000000..d02f673 --- /dev/null +++ b/Library/Clear.py @@ -0,0 +1,29 @@ +import tempfile +import platform +import shutil +import glob +import os + + +def clear_selenium(): + if platform.uname().system == 'Windows': + user_home = [os.environ.get('HOMEDRIVE'), os.environ.get('HOMEPATH')] + if user_home[0] and user_home[1]: + try: + shutil.rmtree('%s%s/.cache/selenium' % (user_home[0], user_home[1])) + except FileNotFoundError: + pass + +def clear_driver_cache(): + for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']: + for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)): + try: + shutil.rmtree(i) + except (FileNotFoundError, PermissionError, WindowsError): + pass + + +if __name__ == '__main__': + # e.g. Clear + clear_selenium() + clear_driver_cache() diff --git a/Library/Driver.py b/Library/Driver.py new file mode 100644 index 0000000..3db030e --- /dev/null +++ b/Library/Driver.py @@ -0,0 +1,485 @@ +from selenium import webdriver as _root_webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.common.alert import Alert +from selenium.webdriver.common.action_chains import ActionChains +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.wait import WebDriverWait +from selenium.common.exceptions import * +import tempfile +import platform +import random +import json +import time +import sys +import os + +# You can set the drive path and browser location through the environment variables, +# and select the webdriver type. +# It has preferential priority through environmental variable settings. +SELENIUM_WIRE = os.environ.get('SELENIUM_WIRE') or '' +# Attention!!! +# Must set the environment before importing selenium. +# Otherwise, these settings will be invalid. +SELENIUM_BROWSER_CHOOSE = os.environ.get('SELENIUM_BROWSER_CHOOSE') or 'Chrome' +SELENIUM_BROWSER_DRIVER = os.environ.get('SELENIUM_BROWSER_DRIVER') or '' +SELENIUM_BROWSER_BINARY = os.environ.get('SELENIUM_BROWSER_BINARY') or '' +# Choose Selenium or Selenium-Wire, default: Selenium. +if SELENIUM_WIRE: + exec('from seleniumwire import webdriver as _root_webdriver') +# Choose options and service. +if SELENIUM_BROWSER_CHOOSE: + exec('from selenium.webdriver.%s.options import Options' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').lower()) + exec('from selenium.webdriver.%s.service import Service' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').lower()) +# Use the chrome driver by default, such a code writing method is for grammar prompts. +DriverChoose = _root_webdriver.Chrome +if SELENIUM_BROWSER_CHOOSE: + exec('DriverChoose=_root_webdriver.%s' % (SELENIUM_BROWSER_CHOOSE or 'Chrome').capitalize()) + + +class BrowserMobileEmulation(dict): + """ + Mobile emulation parameters. + """ + def __init__(self, w=540, h=960, user_agent=None): + du = 'Mozilla/5.0 (Linux; U; Android 13; zh-cn; 2109119BC Build/TKQ1.220829.002) ' \ + 'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 ' \ + 'Chrome/98.0.4758.102 MQQBrowser/13.6 Mobile Safari/537.36' + user_agent = user_agent or du + super().__init__({'w': w, 'h': h, 'user_agent': user_agent}) + self.w = self.h = self.user_agent = None + + def __setattr__(self, key, value): + pass + + def __getitem__(self, item): + try: + return super().__getitem__(item) + except KeyError: + return None + + def __getattr__(self, item): + try: + return super().__getitem__(item) + except KeyError: + return None + + +class PositionTab: + """ + Position for switch tab. + """ + Prev = 'Go-Prev' + Next = 'Go-Next' + + +class Browser(DriverChoose): + """ + Browser web driver. + """ + def __init__( + self, + driver: str = None, + binary: str = None, + headless: bool = False, + lang: str = None, + mute: bool = False, + no_images: bool = False, + user_agent: str = None, + http_proxy: str = None, + home: str = None, + window_size: str = None, + mobile_emulation: BrowserMobileEmulation = None, + option_arguments: list = None, + req_interceptor=None, + res_interceptor=None, + ): + choose = SELENIUM_BROWSER_CHOOSE.capitalize() + self.platform = platform.uname().system + default_driver = None + default_binary = None + if choose == 'Chrome': + if self.platform == 'Linux': + default_driver = '/opt/google/chrome/chromedriver' + default_binary = '/opt/google/chrome/chrome' + else: + default_driver = 'C:/Program Files/Google/Chrome/Application/chromedriver.exe' + default_binary = 'C:/Program Files/Google/Chrome/Application/chrome.exe' + driver = driver or SELENIUM_BROWSER_DRIVER or default_driver + binary = binary or SELENIUM_BROWSER_BINARY or default_binary + if self.platform == 'Linux' and not window_size: window_size = '1920x1080' + # Initialization settings. + if (isinstance(option_arguments, list)) is False: option_arguments = [] + cdplist = [] + service = Service() + options = Options() + self.cdplist = cdplist + self.ignore_page_load_timeout = 0 + # Delete prompt information of chrome being controlled. + hasattr(options, 'add_experimental_option') and options.add_experimental_option("excludeSwitches", ['enable-automation']) + # Mobile emulation parameter setting start. + if mobile_emulation: + if hasattr(options, 'add_experimental_option') is False: + raise Exception('Do not support mobile emulation currently.') + self.w_browser = mobile_emulation.w + 14 + self.h_browser = mobile_emulation.h + 0 + self.w_inner_window = mobile_emulation.w + 0 + self.h_inner_window = mobile_emulation.h - 86 + self.mobile_emulation_screen_w = mobile_emulation.w + self.mobile_emulation_screen_h = mobile_emulation.h + window_size = '%s,%s' % (self.w_browser, self.h_browser) + options.add_experimental_option( + 'mobileEmulation', { + 'deviceMetrics': { + 'width': self.w_inner_window, + 'height': self.h_inner_window, + 'pixelRatio': 2.75, + 'touch': True + }, + 'userAgent': mobile_emulation.user_agent + } + ) + cdplist.append([ + 'Emulation.setUserAgentOverride', { + 'userAgent': mobile_emulation.user_agent, + 'userAgentMetadata': { + 'platform': 'Android' if mobile_emulation.user_agent.find('iPhone') == -1 else 'iPhone', + 'mobile': True, + 'platformVersion': '', + 'architecture': '', + 'model': '' + } + } + ]) + else: + self.w_browser = 0 + self.h_browser = 0 + self.w_inner_window = 0 + self.h_inner_window = 0 + self.mobile_emulation_screen_w = 0 + self.mobile_emulation_screen_h = 0 + # Mobile emulation parameter setting end. + # Set browser and webdriver path. + if driver: + service.path = driver + if binary: + options.binary_location = binary + # Add webdriver option arguments. + for i in option_arguments: + options.add_argument(i) + # Set headless mode. + if self.platform == 'Linux' or headless: + options.add_argument('--headless') + # Set no-sandbox mode. + if self.platform == 'Linux': + options.add_argument('--no-sandbox') + # Set language of browser, default is zh-CN. + if lang: + options.add_argument('--lang=%s' % (lang or 'zh-CN')) + hasattr(options, 'set_preference') and options.set_preference('intl.accept_languages', lang or 'zh-CN') + # Set mute. + if mute: + options.add_argument('--mute-audio=true') + hasattr(options, 'set_preference') and print('Warning: Do not support mute audio currently.', file=sys.stderr) + # Set no images mode. + if no_images: + options.add_argument('--blink-settings=imagesEnabled=false') + hasattr(options, 'set_preference') and print('Warning: Do not support disable images currently.', file=sys.stderr) + # Set default user agent. + if user_agent: + options.add_argument('--user-agent=%s' % user_agent) + hasattr(options, 'set_preference') and options.set_preference('general.useragent.override', user_agent) + # Set http proxy for browser. + if http_proxy: + options.add_argument('--proxy-server=http://%s' % http_proxy) + # Set browser window size before startup. + if window_size: + options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ',')) + else: + options.add_argument('--start-maximized') + # Start the browser. + super().__init__(service=service, options=options) + # Selenium-Wire backend optimization start. + try: + self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.') + self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.') + self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.') + except AttributeError: + pass + # Selenium-Wire backend optimization end. + if mobile_emulation: + cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}]) + cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}]) + cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}]) + # Set the request and response interceptor. + if req_interceptor: + hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr) + self.request_interceptor = req_interceptor + if res_interceptor: + hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr) + self.response_interceptor = res_interceptor + # Sync set http proxy for Selenium-Wire backend. + if http_proxy: + self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy} + # Set browser window size after startup, by default, there will be full screen display window. + if window_size: + self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(',')) + else: + self.maximize_window() + # Sets a sticky timeout to implicitly wait for an element to be found. + self.implicitly_wait(10) + # Set the amount of time to wait for a page load to complete. + self.set_page_load_timeout(25) + # Open the default page. + home and self.open(home) + + @staticmethod + def wait(secs: int | float = 1): + """ + Will sleep waiting. + """ + number_int = int(secs) + number_float = secs - number_int + for i in range(number_int): + time.sleep(1) + else: + time.sleep(number_float) + + def open(self, url=None): + """ + Open the URL, simulate into the URL in the address bar and jump, the new page has no Referrer. + """ + self.update_cdp_command() + try: + return self.get(url) + except TimeoutException: + if self.ignore_page_load_timeout: + return False + else: + raise + + def turn(self, url=None): + """ + Simulation "window.location.href" jumps, the new page has Referrer. + """ + return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None) + + def find(self, path): + """ + Use XPath to find an element. + """ + return self.find_element(By.XPATH, path) + + def find_mult(self, path): + """ + Use XPath to find elements. + """ + return self.find_elements(By.XPATH, path) + + def find_element_by(self, sentence): + """ + Custom find element, pass into a tuple or list. + """ + return self.find_element(*sentence) + + def click(self, element): + """ + Click element for desktop version. + """ + self.action_chains().reset_actions() + self.action_chains().click(element).perform() + self.wait(0.1) + + def touch(self, x, y): + """ + Click on the coordinates for Mobile edition. + """ + self.action_chains().reset_actions() + self.action_chains().move_by_offset(x, y).click().perform() + self.wait(0.1) + + def input(self, element, content): + """ + Enter the content to the element. + """ + self.action_chains().reset_actions() + self.action_chains().send_keys_to_element(element, content).perform() + self.wait(0.1) + + def mouse(self, element): + """ + Park the mouse here. + """ + self.action_chains().reset_actions() + self.action_chains().move_to_element(element).perform() + + def tab_create(self, url=None): + """ + Create a new tab and open the URL. + """ + self.switch_to.new_window('tab') + self.update_cdp_command() + url and self.open(url) + + def tab_switch(self, tab: int | str): + """ + Switch the browser tab page + """ + handles = self.window_handles + lengths = len(handles) + current = handles.index(self.current_window_handle) + if isinstance(tab, int): + handle = tab + elif tab == PositionTab.Prev: + handle = (current - 1) + elif tab in PositionTab.Next: + handle = (current + 1) % lengths + else: + handle = None + self.switch_to.window(handles[handle]) + self.wait(0.2) + self.update_cdp_command() + + def tab_cancel(self): + """ + Close the current browser tab page. + """ + handles = self.window_handles + if len(handles): + current = handles.index(self.current_window_handle) + self.close() + current > 0 and self.switch_to.window(handles[current - 1]) + self.wait(0.2) + + def tab_cancel_all(self): + """ + Close all the browser tab page. + """ + handles = self.window_handles + for i in handles: + self.tab_cancel() + + def frame_switch_to(self, element_of_frame): + """ + Switch frame to the specified frame element. + """ + self.switch_to.frame(element_of_frame) + self.wait(0.2) + + def frame_switch_to_default(self): + """ + Switch to the default frame. + """ + self.switch_to.default_content() + self.wait(0.2) + + def scroll(self): + """ + Scroll page. + :return: + """ + self.action_chains().reset_actions() + self.action_chains().scroll_by_amount(0, self.execute_script('return document.documentElement.clientHeight;')).perform() + self.wait(0.8) + + def scroll_to(self, pos: int | str): + """ + Scroll to the specified location. + """ + if isinstance(pos, int) and pos > 0: + self.execute_script('window.scrollTo(0, arguments[0]);', pos) + elif pos == 0: + self.execute_script('window.scrollTo(0, 0);') + elif pos == 0 - 1: + self.execute_script('window.scrollTo(0, document.body.scrollHeight);') + else: + pass + self.wait(0.8) + + def scroll_to_element(self, element): + """ + Scroll to the specified element location. + """ + self.action_chains().reset_actions() + self.action_chains().scroll_to_element(element).perform() + self.wait(0.8) + + def element_force_display(self, element): + """ + Make hidden element visible and interactive. + """ + self.execute_script( + 'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.setAttribute("hidden","false");', element + ) + + def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None): + """ + Return WebDriverWait object. + """ + return WebDriverWait( + driver=self, + timeout=timeout, + poll_frequency=poll_frequency, + ignored_exceptions=ignored_exceptions + ) + + def current_alert(self): + """ + Return current alert object. + """ + return Alert(self) + + def window_inner_size(self): + """ + Get the page window inner size. + """ + size = self.execute_script('return [window.innerWidth, window.innerHeight];') + return {'w': size[0] or 0, 'h': size[1] or 0} + + def action_chains(self): + """ + Return ActionChains object. + """ + return ActionChains(self) + + def ignore_timeout(self): + """ + Page loading timeout will not throw an exception. + """ + self.ignore_page_load_timeout = 1 + + def screenshot_for_report(self): + """ + Screenshot for HTMLTestRunner report, + will print the screenshot path to the to let it analyze it. + """ + root = os.environ.get('HTML_REPORT_ROOT') or tempfile.gettempdir() + basename = '%s%s.png' % (time.strftime('%Y%m%d%H%M%S', time.localtime()), random.randint(1000, 9999)) + fullname = '%s/%s' % (root, basename) + if os.path.exists(root): + if self.get_screenshot_as_file(fullname): + print('[report-screenshot]%s[/report-screenshot]' % fullname, file=sys.stderr) + else: + print('Warning: Screenshot failure.', file=sys.stderr) + + def update_cdp_command(self) -> None: + for cmd in self.cdplist: + self.execute_cdp_cmd(*cmd) + + +class WebDriver(Browser): + """ + Get a browser driver object. + """ + def __init__(self): + super().__init__(lang='zh-CN') + + +if __name__ == '__main__': + # e.g. Test it can work normally. + driver = WebDriver() + driver.open('https://www.hao123.com/') + driver.wait() + driver.quit() diff --git a/Library/HTMLTestRunner.py b/Library/HTMLTestRunner.py new file mode 100644 index 0000000..e17872a --- /dev/null +++ b/Library/HTMLTestRunner.py @@ -0,0 +1,1168 @@ +""" + +这是一个用于unittest框架的TestRunner,用它可以生成HTML测试报告,包含用例执行情况以 + +及图表; + +使用它最简单方法是调用main方法,例如: + +------------------------------------------------------------------------ + +# import unittest +# import HTMLTestRunner + +# 在这里, +# 定义你的测试代码。 + +# if __name__ == '__main__': +# HTMLTestRunner.main() + +------------------------------------------------------------------------ + +""" + +import os, re, sys, io, time, datetime, platform, json, unittest, logging, shutil +from xml.sax import saxutils + + +class OutputRedirector(object): + """ + 重定向stdout和stderr以便于在测试过程中捕获输出 + """ + + def __init__(self, fp): + self.fp = fp + + def write(self, s): + self.fp.write(s) + + def writelines(self, lines): + self.fp.writelines(lines) + + def flush(self): + self.fp.flush() + + +stdout_redirector = OutputRedirector(sys.stdout) +stderr_redirector = OutputRedirector(sys.stderr) + + +class ReportTemplate: + """ + 报告模板 + """ + STATUS = { + 0: '通过', + 1: '失败', + 2: '错误', + } + DEFAULT_TITLE = '自动化测试报告' + DEFAULT_DESCRIPTION = '暂无描述' + # ------------------------------------------------------------------------ + # 网页模板 + # 变量列表 title, generator, styles, header, report, footer + HTML_TMPL = """ + + + + %(title)s + + + + + + + + + %(styles)s + + + + %(header)s + %(report)s + %(footer)s + + +""" + # ------------------------------------------------------------------------ + # 网页样式 + STYLES_TMPL = """ + + """ + # ------------------------------------------------------------------------ + # 报告标题、测试信息、报告描述、截图显示容器、图表容器 + # 变量列表 title, parameters, description, logo, sign + HEADER_TMPL = """ +
+
+
+ +
%(sign)s
+
+
+
+

%(title)s

+ %(parameters)s +

%(description)s

+
+
+
+
+ """ + # ------------------------------------------------------------------------ + # 测试信息 + # 变量列表 name, value + HEADER_ATTRIBUTE_TMPL = """ +

%(name)s : %(value)s

+ """ + # ------------------------------------------------------------------------ + # 报告模板 + # 变量列表 test_list, counts, passed, failed, errors ,passrate + REPORT_TMPL = """ +
+

+ 概要 %(passrate)s + 通过 %(passed)s + 失败 %(failed)s + 错误 %(errors)s + 全部 %(counts)s +

+
+ + + + + + + + + + + + + + + + + + + + + + %(test_list)s + + + + + + + + + +
测试用例说明总计通过失败错误耗时详细
总计%(counts)s%(passed)s%(failed)s%(errors)s%(time_usage)s通过:%(passrate)s
+ """ + # ------------------------------------------------------------------------ + # 变量列表 style, desc, counts, passed, failed, errors, cid + REPORT_CLASS_TMPL = """ + + %(name)s + %(docs)s + %(counts)s + %(passed)s + %(failed)s + %(errors)s + %(time_usage)s + 查看全部 + + """ + # ------------------------------------------------------------------------ + # 失败样式(有截图列) + # 变量列表 tid, Class, style, desc, status + REPORT_TEST_WITH_OUTPUT_TMPL_1 = """ + +
%(name)s
+ %(docs)s + + +
+
%(script)s
+
+ + +
截图信息
+
+ %(screenshot)s +
+ + + """ + # ------------------------------------------------------------------------ + # 失败样式(无截图列) + # 变量列表 tid, Class, style, desc, status + REPORT_TEST_WITH_OUTPUT_TMPL_0 = """ + +
%(name)s
+ %(docs)s + + +
+
%(script)s
+
+ + + + """ + # ------------------------------------------------------------------------ + # 通过样式 + # 变量列表 tid, Class, style, desc, status + REPORT_TEST_NO_OUTPUT_TMPL = """ + +
%(name)s
+ %(docs)s + + + + """ + # ------------------------------------------------------------------------ + # 测试输出内容 + REPORT_TEST_OUTPUT_TMPL = '%(id)s:' + "\n" + '%(output)s' + # ------------------------------------------------------------------------ + # 页面底部、返回顶部 + FOOTER_TMPL = """ + + + """ + # ------------------------------------------------------------------------ + + +def _findMark(mark='', data=''): + return re.findall('\\[' + mark + '](.*?)\\[/' + mark + ']' + '*?', data) + + +def _makeMark(mark='', cont=''): + return '[' + mark + ']' + cont + '[/' + mark + ']' + + +def _Color(fc=0, bc=0, bo=0, text=''): + b = 'PYCHARM_HOSTED' in os.environ.keys() + return "\033[" + str(bo) + ['', ';' + str(fc)][fc > 0] + ['', ';' + str(bc)][bc > 0] + "m" + text + "\033[0m" if b else text + + +class _TestResult(unittest.TestResult): + def __init__(self, verbosity=1, log='', success_log_in_report=False): + super().__init__(verbosity=verbosity) + self.verbosity = verbosity + self.log = log + self.success_log_in_report = success_log_in_report + + self.fh = None + self.lh = None + self.ch = None + + self.loggerStream = None + self.outputBuffer = None + + self.stdout0 = None + self.stderr0 = None + + self.passed_count = 0 + self.failed_count = 0 + self.errors_count = 0 + + self.stime = None + self.etime = None + + self.passrate = float(0) + + # 分类统计数量耗时 + self.casesort = {} + + # 增加失败用例合集 + self.failedCase = '' + self.failedCaseList = [] + + # 增加错误用例合集 + self.errorsCase = '' + self.errorsCaseList = [] + + self.logger = logging.getLogger('test') + + # result is a list of result in 4 tuple: + # 1. result code (0: passed; 1: failed; 2: errors), + # 2. testcase object, + # 3. test output (byte string), + # 4. stack trace. + self.result = [] + + def sortCount(self, cls, res, dur=float(0)): + """ + 分类统计 + """ + s = self.casesort + if cls not in s.keys(): + s[cls] = {'p': 0, 'f': 0, 'e': 0, 'd': 0} + if str(res).upper().startswith('P'): + s[cls]['p'] += 1 + if str(res).upper().startswith('F'): + s[cls]['f'] += 1 + if str(res).upper().startswith('E'): + s[cls]['e'] += 1 + s[cls]['d'] += dur + return True + + def startTest(self, test): + """ + 单条用例执行开始前的动作 + """ + if self.verbosity >= 2: + sys.stderr.write(_Color(fc=39, bc=4, bo=1, text='%-79s' % ('%04d @ Testing...' % (len(self.result) + 1))) + "\n") + # 开始测试 + super().startTest(test) + # 终端日志 + if self.verbosity >= 2: + self.ch = logging.StreamHandler(sys.stderr) + self.ch.setLevel(logging.DEBUG) + self.ch.setFormatter(logging.Formatter(fmt='{asctime} - {levelname[0]}: {message}', style='{')) + self.logger.addHandler(self.ch) + # 报告日志 + if True: + self.loggerStream = io.StringIO() + self.lh = logging.StreamHandler(self.loggerStream) + self.lh.setLevel(logging.DEBUG) + self.lh.setFormatter(logging.Formatter(fmt='{asctime} - {levelname[0]}: {message}', style='{')) + self.logger.addHandler(self.lh) + # 文件日志 + if self.log: + self.fh = logging.FileHandler(filename=self.log, mode='a+', encoding='utf-8') + self.fh.setLevel(logging.DEBUG) + self.fh.setFormatter(logging.Formatter(fmt='{asctime} - {levelname[0]}: {module}.{funcName}\t{message}', style='{')) + self.logger.addHandler(self.fh) + # 用例执行过程中的输出 + # Just one buffer for both stdout and stderr + self.outputBuffer = io.StringIO() + stdout_redirector.fp = self.outputBuffer + stderr_redirector.fp = self.outputBuffer + self.stdout0 = sys.stdout + self.stderr0 = sys.stderr + sys.stdout = stdout_redirector + sys.stderr = stderr_redirector + self.stime = round(time.time(), 3) + + def completeOutput(self): + """ + 单条用例执行结束后,添加结果前的动作; + 添加结果需要调用的方法; + 断开输出重定向; + 返回日志和输出; + """ + self.etime = round(time.time(), 3) + if self.stdout0: + sys.stdout = self.stdout0 + sys.stderr = self.stderr0 + self.stdout0 = None + self.stderr0 = None + return [ + self.loggerStream.getvalue(), + self.outputBuffer.getvalue() + ] + + def stopTest(self, test): + """ + 单条用例执行结束后,添加结果后的动作; + """ + super().stopTest(test) + # 移除日志Handler + for handler in [self.fh, self.lh, self.ch]: + if handler: + self.logger.removeHandler(handler) + + def singleEndConsolePrint(self, test, term_mark, term_head, term_clor, duration): + """ + 将用例执行结果打印到终端显示 + """ + self.verbosity >= 1 and sys.stderr.write('%s %s %s %s.%s.%-18s\t%s %s\n' % ( + _Color(fc=term_clor, bo=1, text='%04d' % (len(self.result)) + ' ' + term_mark + ' ' + term_head + ':'), + _Color(fc=37, bo=0, text=datetime.datetime.utcfromtimestamp(duration).strftime('%H:%M:%S.%f')[0:12]), + _Color(fc=37, bo=0, text='<='), + _Color(fc=37, bo=0, text=str(test.__module__).strip('_')), + _Color(fc=37, bo=0, text=str(test.__class__.__qualname__)), + _Color(fc=37, bo=0, text=str(test.__dict__['_testMethodName'])), + _Color(fc=37, bo=0, text='<='), + _Color(fc=37, bo=0, text=str(test.__dict__['_testMethodDoc'] and test.__dict__['_testMethodDoc'].strip().splitlines()[0].strip() or '')) + )) + + def addSuccess(self, test): + """ + 单条用例执行结束后添加成功结果动作 + """ + term_mark, term_head, term_clor = '=', 'Passed', 32 + self.passed_count += 1 + super().addSuccess(test) + output = self.completeOutput() + duration = round(self.etime - self.stime, 3) + self.sortCount(cls=test.__class__.__qualname__, res=term_head, dur=duration) + + # 添加测试结果 + self.result.append((0, test, ['', output[0]][bool(self.success_log_in_report)] + output[1], '', duration)) + + # 单条用例执行结束后在终端打印结果 + self.singleEndConsolePrint(test, term_mark, term_head, term_clor, duration) + + def addError(self, test, err): + """ + 单条用例执行结束后添加错误结果动作 + """ + term_mark, term_head, term_clor = '?', 'Errors', 33 + self.errors_count += 1 + super().addError(test, err) + _, _exc_str = self.errors[-1] + output = self.completeOutput() + duration = round(self.etime - self.stime, 3) + self.sortCount(cls=test.__class__.__qualname__, res=term_head, dur=duration) + + # 添加测试结果 + self.result.append((2, test, output[0] + output[1], _exc_str, duration)) + + # 用例执行结束后在终端打印结果 + self.singleEndConsolePrint(test, term_mark, term_head, term_clor, duration) + + # 收集错误测试用例名称以在测试报告中显示 + testcase_name = '%s.%s.%s' % (str(test.__module__).strip('_'), test.__class__.__qualname__, test.__dict__['_testMethodName']) + self.errorsCase += '
  • %s
  • ' % testcase_name + self.errorsCaseList.append(testcase_name) + + def addFailure(self, test, err): + """ + 单条用例执行结束后添加失败结果动作 + """ + term_mark, term_head, term_clor = '!', 'Failed', 31 + self.failed_count += 1 + super().addFailure(test, err) + _, _exc_str = self.failures[-1] + output = self.completeOutput() + duration = round(self.etime - self.stime, 3) + self.sortCount(cls=test.__class__.__qualname__, res=term_head, dur=duration) + + # 添加测试结果 + self.result.append((1, test, output[0] + output[1], _exc_str, duration)) + + # 用例执行结束后在终端打印结果 + self.singleEndConsolePrint(test, term_mark, term_head, term_clor, duration) + + # 收集失败测试用例名称以在测试报告中显示 + testcase_name = '%s.%s.%s' % (str(test.__module__).strip('_'), test.__class__.__qualname__, test.__dict__['_testMethodName']) + self.failedCase += '
  • %s
  • ' % testcase_name + self.failedCaseList.append(testcase_name) + + +class HTMLTestRunner(ReportTemplate): + def __init__( + self, + stream=None, + verbosity: int = 1, + title: str = None, + description: str = None, + success_log_in_report=False, + report_home: str = None, + report_home_latest_name: str = None, + report: str = None, + log: str = None, + logo='', + sign='' + ): + """ + This is HTMLTestRunner. + :param stream: HTML report write file stream. + :param verbosity: + 0: Terminal only shows the test results summary. + 1: Terminal shows the results summary and results of each case test. + 2: Terminal shows the results summary and results of each case test, and print log. + :param title: Title of report. + :param description: Description of report. + :param success_log_in_report: The testcase passed is also displayed in the HTML report. + :param report_home: + Set home directory of the report for this test, + all the results related files will be storage inside(including HTML report, log, images), + at this time, + other related parameters will fail. + :param report_home_latest_name: Create directory on the path where the report directory is located. + :param report: HTML report file location(useless when stream parameter inset). + :param log: Log file. + :param logo: Logo of report, pass an image URL. + :param sign: Sign of report, Pass in one piece short text. + """ + if report_home: + try: + os.mkdir(report_home) + except Exception: + raise Exception('Report directory already exists.') + stream = report = log = None + report, log = '%s/index.html' % report_home, '%s/HTMLTestRunner.log' % report_home + self.report_home = report_home + self.result_associate_files = [] + self.verbosity = verbosity + self.success_log_in_report = success_log_in_report + self.report_home_latest_name = report_home_latest_name + self.stream = stream or (report and open(os.path.abspath(report), 'wb')) + self.report = None + if self.stream: + stream_name = os.path.abspath(self.stream.name) + self.result_associate_files.append(stream_name) + os.environ['HTML_REPORT_ROOT'] = os.path.dirname(stream_name) + self.report = stream_name + else: + os.environ['HTML_REPORT_ROOT'] = '' + self.log = log + self.log_location = log and os.path.abspath(log) + self.log_location and self.result_associate_files.append(self.log_location) + self.log_location and open(self.log_location, 'wb').close() + self.title = self.DEFAULT_TITLE if title is None else title + self.description = self.DEFAULT_DESCRIPTION if description is None else description + self.logo = logo or '' + self.sign = sign or '' + + self.passrate = None + self.errormsg = None + + self.runstime = None + self.runetime = None + + def run(self, test): + sys.stderr.write(_Color(fc=38, bo=1, text='* * * * * * * * * * * * * * * * * * 开始测试 * * * * * * * * * * * * * * * * * *') + '\n') + self.verbosity >= 2 and sys.stderr.write("\n") + self.runstime = round(time.time(), 3) + + # 获取测试结果 + result = _TestResult(verbosity=self.verbosity, log=self.log_location, success_log_in_report=self.success_log_in_report) + test(result) + + self.runetime = round(time.time(), 3) + self.verbosity >= 2 and sys.stderr.write("\n") + sys.stderr.write(_Color(fc=38, bo=1, text='* * * * * * * * * * * * * * * * * * 结束测试 * * * * * * * * * * * * * * * * * *') + '\n') + + # 生成测试报告 + self.generateReport(test, result) + if len(result.result) == 0: + raise Exception('No testcases found.') + case_count = { + 't': len(result.result), + 'p': result.passed_count, + 'f': result.failed_count, + 'e': result.errors_count + } + dura = time.strftime('%H:%M:%S', time.gmtime((self.runetime - self.runstime))) + rate_passed = str("%.2f%%" % (float(case_count['p'] / case_count['t'] * 100))) + rate_failed = str("%.2f%%" % (float(case_count['f'] / case_count['t'] * 100))) + rate_errors = str("%.2f%%" % (float(case_count['e'] / case_count['t'] * 100))) + list_failed = result.failedCaseList + list_errors = result.errorsCaseList + casesort = result.casesort + # 终端打印结果 + sys.stderr.write(_Color(fc=36, bo=1, text='结果概要') + "\n") + sys.stderr.write( + _Color(fc=37, bo=0, text='总共' + "\x20" + str(case_count["t"]) + "\x20") + + _Color(fc=37, bo=0, text='通过' + "\x20" + str(case_count["p"]) + "\x20") + + _Color(fc=37, bo=0, text='失败' + "\x20" + str(case_count["f"]) + "\x20") + + _Color(fc=37, bo=0, text='错误' + "\x20" + str(case_count["e"]) + "\x20") + + _Color(fc=37, bo=0, text='耗时' + "\x20" + dura + "\x20") + + _Color(fc=37, bo=0, text='通过率' + "\x20" + rate_passed + "\x20") + + _Color(fc=37, bo=0, text='失败率' + "\x20" + rate_failed + "\x20") + + _Color(fc=37, bo=0, text='错误率' + "\x20" + rate_errors + "\x20") + + "\n" + ) + for value in [['失败用例', list_failed, 31], ['错误用例', list_errors, 33]]: + if len(value[1]): + sys.stderr.write(_Color(fc=value[2], bo=1, text=value[0]) + "\n") + for i in range(len(value[1])): + sys.stderr.write(_Color(fc=37, bo=0, text=str(i + 1) + '. ' + value[1][i]) + "\n") + if len(casesort.keys()): + sys.stderr.write( + _Color(fc=34, bo=1, text='%-22s' % ('用例集合') + "\t" + + '%-4s' % ('总计') + "\t" + + '%-4s' % ('通过') + "\t" + + '%-4s' % ('失败') + "\t" + + '%-4s' % ('错误') + "\t" + + '%-8s' % ('耗时') + "\t") + "\n") + for key, value in casesort.items(): + sys.stderr.write( + _Color(fc=37, bo=0, text='%-22s' % (str(key)) + "\t" + + '%-4s' % (str(value["p"] + value["f"] + value["e"])) + "\t" + + '%-4s' % (str(value["p"])) + "\t" + + '%-4s' % (str(value["f"])) + "\t" + + '%-4s' % (str(value["e"])) + "\t" + + '%-8s' % ('%.3f' % (round(value["d"], 3)) + '秒') + "\t") + "\n") + return result + + @staticmethod + def sortResult(result_list): + rmap = {} + classes = [] + for n, t, o, e, s in result_list: + cls = t.__class__ + if cls not in rmap: + rmap[cls] = [] + classes.append(cls) + rmap[cls].append((n, t, o, e, s)) + return [(cls, rmap[cls]) for cls in classes] + + def getReportAttributes(self, result): + runstime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.runstime)) + duration = time.strftime('%H:%M:%S', time.gmtime((self.runetime - self.runstime))) + status = ','.join([ + '总共 %s' % (result.passed_count + result.failed_count + result.errors_count), + '通过 %s' % (result.passed_count), + '失败 %s' % (result.failed_count), + '错误 %s' % (result.errors_count) + ]) + if (result.passed_count + result.failed_count + result.errors_count) > 0: + self.passrate = str("%.2f%%" % (float(result.passed_count) / float( + result.passed_count + result.failed_count + result.errors_count) * 100)) + else: + self.passrate = '0.00%' + + if len(result.failedCase) > 0: + failedCase = result.failedCase + else: + failedCase = '无' + + if len(result.errorsCase) > 0: + errorsCase = result.errorsCase + else: + errorsCase = '无' + + uname = platform.uname() + return [ + ['开始时间', runstime], + ['合计耗时', duration], + ['主机名称', '%s (%s %s) %s' % (uname.node, uname.system, uname.release, uname.machine.lower())], + ['测试结果', '%s,通过率%s' % (status, self.passrate)], + ['失败用例', failedCase], + ['错误用例', errorsCase], + ] + + def generateReport(self, test, result): + report_attrs = self.getReportAttributes(result) + report_count = self._generate_report(result) + output = self.HTML_TMPL % dict( + title=saxutils.escape(self.title), + generator='HTMLTestRunner', + styles=self._generate_styles(), + passed=report_count['passed'], + failed=report_count['failed'], + errors=report_count['errors'], + casesets=report_count['casesets'], + casesets_passed=report_count['casesets_passed'], + casesets_failed=report_count['casesets_failed'], + casesets_errors=report_count['casesets_errors'], + header=self._generate_header(report_attrs), + report=report_count['report'], + footer=self._generate_footer(), + ) + # 写入报告文件 + self.stream and self.stream.write(output.encode('utf-8')) + self.stream and self.stream.close() + if self.report_home and self.report_home_latest_name: + latest = '%s/%s' % (os.path.dirname(self.report_home), self.report_home_latest_name) + os.path.exists(latest) and shutil.rmtree(latest) + shutil.copytree(self.report_home, latest) + if self.report_home: + open('%s/latest.ini' % os.path.dirname(self.report_home), mode='w').write(os.path.basename(self.report_home)) + + def _generate_styles(self): + return self.STYLES_TMPL + + def _generate_header(self, report_attrs): + line_list = [] + for name, value in report_attrs: + match name: + case '失败用例': + if value == "无": + line = self.HEADER_ATTRIBUTE_TMPL % dict(name=name, value=value) + else: + line = self.HEADER_ATTRIBUTE_TMPL % dict( + name=name, + value="点击查看" + "
      " + value + "
    " + ) + case '错误用例': + if value == "无": + line = self.HEADER_ATTRIBUTE_TMPL % dict(name=name, value=value) + else: + line = self.HEADER_ATTRIBUTE_TMPL % dict( + name=name, + value="点击查看" + "
      " + value + "
    " + ) + case _: + line = self.HEADER_ATTRIBUTE_TMPL % dict(name=saxutils.escape(str(name)), value=saxutils.escape(str(value))) + line_list.append(line) + return self.HEADER_TMPL % dict( + rail_hidden='none' if not self.logo and not self.sign else 'flex', + logo=self.logo, + sign=self.sign, + title=saxutils.escape(self.title), + parameters=''.join(line_list), + description=saxutils.escape(self.description) + ) + + def _generate_report(self, result): + rows = [] + sortedResult = self.sortResult(result.result) + dura_caseset = 0 + for cid, (cls, cls_results) in enumerate(sortedResult): + np = nf = ne = ns = 0 + for n, t, o, e, s in cls_results: + # 遍历每条用例 + match n: + case 0: + np += 1 + case 1: + nf += 1 + case 2: + ne += 1 + ns += s + # 单个用例集合耗时 + ns = round(ns, 3) + dura_caseset += ns + row = self.REPORT_CLASS_TMPL % dict( + style=ne > 0 and 'errorsClass' or nf > 0 and 'failedClass' or 'passedClass', + name=cls.__qualname__, + docs=cls.__doc__ and cls.__doc__.strip().splitlines()[0].strip() or '', + counts=np + nf + ne, + passed=np, + failed=nf, + errors=ne, + cid='c%s' % (cid + 1), + time_usage='%.3f秒' % ns + ) + rows.append(row) + for tid, (n, t, o, e, s) in enumerate(cls_results): + rows.append(self._generate_report_test(rows, cid, tid, n, t, o, e)) + # 全部用例集合耗时 + dura_caseset = round(dura_caseset, 3) + report = self.REPORT_TMPL % dict( + test_list=''.join(rows), + counts=str(result.passed_count + result.failed_count + result.errors_count), + passed=str(result.passed_count), + failed=str(result.failed_count), + errors=str(result.errors_count), + time_usage='%.3f秒' % dura_caseset, + passrate=self.passrate + ) + casesets = list(result.casesort.keys()) + casesets_passed = [] + for value in casesets: + casesets_passed.append(result.casesort[value]["p"]) + casesets_failed = [] + for value in casesets: + casesets_failed.append(result.casesort[value]["f"]) + casesets_errors = [] + for value in casesets: + casesets_errors.append(result.casesort[value]["e"]) + return { + "report": report, + "passed": str(result.passed_count), + "failed": str(result.failed_count), + "errors": str(result.errors_count), + "casesets": json.dumps(casesets, ensure_ascii=False), + "casesets_passed": json.dumps(casesets_passed, ensure_ascii=False), + "casesets_failed": json.dumps(casesets_failed, ensure_ascii=False), + "casesets_errors": json.dumps(casesets_errors, ensure_ascii=False) + } + + def _generate_report_test(self, rows, cid, tid, n, t, o, e): + hasout = bool(o or e) + match n: + case 0: + tid_flag = 'p' + pre_clor = '#119611' + case 1: + tid_flag = 'f' + pre_clor = '#e52000' + case 2: + tid_flag = 'e' + pre_clor = '#e54f00' + case _: + tid_flag = 'u' + pre_clor = '#808080' + # ID修改点为下划线; + # 支持Bootstrap折叠展开特效; + # 例如:'pt1_1', 'ft1_1', 'et1_1' + tid = tid_flag + 't%s_%s' % (cid + 1, tid + 1) + name = t.id().split('.')[-1] + docs = t.shortDescription() or '' + # o and e should be byte string because they are collected from stdout and stderr. + if isinstance(o, str): + # some problem with 'string_escape': it escape \n and mess up formating + # uo = unicode(o.encode('string_escape')) + # uo = o.decode('latin-1') + uo = o + else: + uo = o + if isinstance(e, str): + # some problem with 'string_escape': it escape \n and mess up formating + # ue = unicode(e.encode('string_escape')) + # ue = e.decode('latin-1') + ue = e + else: + ue = e + script = self.REPORT_TEST_OUTPUT_TMPL % dict(id=tid, output=re.compile('\\[[A-Za-z\\-]+].*?\\[/[A-Za-z\\-]+][\r\n]').sub( + '', saxutils.escape(uo + ue))) + + # 截图名称通过抛出异常在 + # 标准错误当中, + # 判断是否包含截图信息 + # 实际检测输出当中是否包含report-screenshot关键字; + output = uo + ue + self.errormsg = output.find('report-screenshot') + if self.errormsg == -1: + # 没有截图信息 + template = hasout and self.REPORT_TEST_WITH_OUTPUT_TMPL_0 or self.REPORT_TEST_NO_OUTPUT_TMPL + row = template % dict( + tid=tid, + Class=n == 0 and 'hiddenRow' or 'none', + style=n == 2 and 'errorsCase' or (n == 1 and 'failedCase' or 'passedCase'), + name=name, + docs=docs, + script=script, + status=self.STATUS[n], + pre_color=pre_clor + ) + else: + # 包含截图信息 + template = hasout and self.REPORT_TEST_WITH_OUTPUT_TMPL_1 or self.REPORT_TEST_NO_OUTPUT_TMPL + screenshot_list = _findMark(mark='report-screenshot', data=output) + screenshot = '' + for image in screenshot_list: + bn = os.path.basename(image) + abs_image = os.path.abspath(image) + self.result_associate_files.append(abs_image) + # 移动图片位置到报告所在目录 + try: + self.report and shutil.move(abs_image, '%s/%s' % (os.path.dirname(self.report), bn)) + except Exception: + pass + screenshot += '' + bn + '
    ' + row = template % dict( + tid=tid, + Class=n == 0 and 'hiddenRow' or 'none', + style=n == 2 and 'errorsCase' or (n == 1 and 'failedCase' or 'passedCase'), + name=name, + docs=docs, + script=script, + status=self.STATUS[n], + pre_color=pre_clor, + screenshot=screenshot + ) + return row + + def _generate_footer(self): + return self.FOOTER_TMPL diff --git a/Library/Http.py b/Library/Http.py new file mode 100644 index 0000000..5cd7a93 --- /dev/null +++ b/Library/Http.py @@ -0,0 +1,104 @@ +import requests +import requests.utils + + +class HTTPResponseObject(dict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.url = None + self.status = None + self.reason = None + self.header = None + self.cookie = None + self.binary = None + self.text, self.json = None, None + self.encoding, self.duration = None, None + + def __setattr__(self, key, value): + pass + + def __getitem__(self, item): + try: + return super().__getitem__(item) + except Exception: + return None + + def __getattr__(self, item): + try: + return super().__getitem__(item) + except Exception: + return None + + +class HTTPRequest: + __http__ = requests + __accept_method__ = ('GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 'PATCH') + + def http(self, method='GET', url: str = None, query=None, data=None, json=None, file=None, header=None, cookie=None, + ua: str = None, auth=None, timeout: int | float = None, proxy: str = None, auto_redirect=False, ignore_cert_error=False): + method = method.upper() + if method not in self.__accept_method__: + raise Exception('Unsupported request method.') + if method == 'HEAD': + auto_redirect = False + if ua: + if not isinstance(header, dict): + header = {} + header['User-Agent'] = ua + if isinstance(proxy, str) and len(proxy) >= 5: + proxy = {'http': proxy, 'https': proxy} + else: + proxy = None + try: + response = self.__http__.request( + method=method, + url=url, + params=query, + data=data, + headers=header, + cookies=cookie, + files=file, + auth=auth, + timeout=timeout, + allow_redirects=auto_redirect, + proxies=proxy, + verify=not ignore_cert_error, + json=json + ) + except Exception: + response = None + try: + response_json = response.json() + except Exception: + response_json = None + if response: + return HTTPResponseObject({ + 'url': response.url, + 'status': response.status_code, + 'reason': response.reason, + 'header': response.headers, + 'cookie': requests.utils.dict_from_cookiejar(response.cookies), + 'binary': response.content, + 'text': response.text, + 'json': response_json, + 'encoding': response.encoding, + 'duration': round(response.elapsed.microseconds / 1000, 1) + }) + else: + return HTTPResponseObject({ + 'status': -1 + }) + + +class HTTPSession(HTTPRequest): + __http__ = requests.session() + + +if __name__ == '__main__': + # e.g. Send a get request. + res = HTTPSession().http( + method='GET', url='http://pv.sohu.com/cityjson', query={'ie': 'utf-8'}, + ua='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36', + auto_redirect=True + ) + print(res) diff --git a/Library/Json.py b/Library/Json.py new file mode 100644 index 0000000..f2953c7 --- /dev/null +++ b/Library/Json.py @@ -0,0 +1,24 @@ +import json + + +def json_encode(data, indent=None, unicode=True): + return json.dumps(data, indent=indent, ensure_ascii=unicode) + + +def json_decode(data): + return json.loads(data) + + +if __name__ == '__main__': + # e.g. The default does not indent, and the string will be coded by the unicode. + print(json_encode({"id": 101, "name": "汤姆", "friends": ["托尼", "杰森"]})) + # e.g. Indent 4 spaces. + print(json_encode({"id": 101, "name": "汤姆", "friends": ["托尼", "杰森"]}, indent=4)) + # e.g. Indent 4 spaces and no coded by the unicode. + print(json_encode({"id": 101, "name": "汤姆", "friends": ["托尼", "杰森"]}, indent=4, unicode=False)) + # e.g. Decode JSON data. + print(json_decode( + ''' + {"id": 101, "name": "汤姆", "friends": ["托尼", "杰森"]} + ''' + )) diff --git a/Library/LoadConfig.py b/Library/LoadConfig.py new file mode 100644 index 0000000..4567587 --- /dev/null +++ b/Library/LoadConfig.py @@ -0,0 +1,48 @@ +from Library.Json import json_decode +from Library.Yaml import yaml_decode +import os + + +def _home_of_conf(): + home = '%s/%s' % (os.path.dirname(os.path.dirname(__file__)), 'Config') + if os.path.exists(home): + return home + else: + raise FileNotFoundError('Directory "%s" does not exist.' % home) + +class TestingConfig(dict): + """ + Load the configuration file from the configuration directory. + """ + def __init__(self, name: str): + home = _home_of_conf() + json_extension = '.json' + yaml_extension = '.yaml' + pa_json = '%s/%s%s' % (home, name, json_extension) + pa_yaml = '%s/%s%s' % (home, name, yaml_extension) + data = None + if os.path.exists(pa_json): + file = pa_json + data = json_decode(open(file, encoding='utf-8').read()) + if os.path.exists(pa_yaml): + file = pa_yaml + data = yaml_decode(open(file, encoding='utf-8').read()) + if data is not None: + super().__init__(data) + else: + raise FileNotFoundError('No json or yaml configuration file "%s" in config directory.' % name) + + def __setattr__(self, key, value): + raise AttributeError('Attribute is read only.') + + def __getattr__(self, item): + try: + return super().__getitem__(item) + except KeyError: + return None + + def __setitem__(self, key, value): + return self.__setattr__(key, value) + + def __getitem__(self, item): + return self.__getattr__(item) diff --git a/Library/LoadData.py b/Library/LoadData.py new file mode 100644 index 0000000..96421a4 --- /dev/null +++ b/Library/LoadData.py @@ -0,0 +1,44 @@ +from Library.Json import json_decode +from Library.Yaml import yaml_decode +import csv +import os + + +def _home_of_data(): + home = '%s/%s' % (os.path.dirname(os.path.dirname(__file__)), 'Data') + if os.path.exists(home): + return home + else: + raise FileNotFoundError('Directory "%s" does not exist.' % home) + +def _checking_data_filename(_s: str): + if _s.replace("\\", '/').find('/') != -1: + raise NameError('The data filename "%s" is illegal in data directory.' % _s) + +def loadDataFromJson(filename: str): + _checking_data_filename(filename) + return json_decode(open('%s/%s' % (_home_of_data(), filename), encoding='utf-8').read()) + +def loadDataFromYaml(filename: str): + _checking_data_filename(filename) + return yaml_decode(open('%s/%s' % (_home_of_data(), filename), encoding='utf-8').read()) + +def loadCsv(filename: str): + _checking_data_filename(filename) + file = '%s/%s' % (_home_of_data(), filename) + try: + open(file).read(4096) + encoding = None + except UnicodeDecodeError: + encoding = 'utf-8' + return [row for row in csv.reader(open(file, encoding=encoding))] + +def loadTxt(filename: str): + _checking_data_filename(filename) + file = '%s/%s' % (_home_of_data(), filename) + try: + open(file).read(4096) + encoding = None + except UnicodeDecodeError: + encoding = 'utf-8' + return [row.rstrip("\n") for row in open(file, encoding=encoding).readlines()] diff --git a/Library/Logger.py b/Library/Logger.py new file mode 100644 index 0000000..c8d66c3 --- /dev/null +++ b/Library/Logger.py @@ -0,0 +1,15 @@ +import logging + +class TestingLogger: + """ + Testing logger. + """ + def __init__(self, name: str = 'test'): + self._logger = logging.getLogger(name) + self._logger.setLevel(logging.DEBUG) + self.d = self._logger.debug + self.i = self._logger.info + self.w = self._logger.warning + self.e = self._logger.error + self.f = self._logger.fatal + self.c = self._logger.critical diff --git a/Library/ProjectHome.py b/Library/ProjectHome.py new file mode 100644 index 0000000..15d53ad --- /dev/null +++ b/Library/ProjectHome.py @@ -0,0 +1,12 @@ +import os + +def project_home(): + path = os.environ.get('PYTHONPATH') + if not path: + raise Exception('No environment variables of PYTHONPATH.') + here = os.path.dirname(__file__).replace("\\", '/') + for j in [i.replace("\\", '/').strip() for i in path.split(os.pathsep)]: + if here.startswith(j): + return j + else: + raise Exception('Project directory is not in PYTHONPATH.') diff --git a/Library/Report.py b/Library/Report.py new file mode 100644 index 0000000..57615c8 --- /dev/null +++ b/Library/Report.py @@ -0,0 +1,6 @@ +from Library.LoadConfig import TestingConfig +from Library.Time import * + + +def generate_report_name(): + return fmt_time(TestingConfig('default').report_format or '%Y-%m-%d-%H-%M-%S') diff --git a/Library/Smtp.py b/Library/Smtp.py new file mode 100644 index 0000000..f143311 --- /dev/null +++ b/Library/Smtp.py @@ -0,0 +1,247 @@ +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.mime.base import MIMEBase +from email.header import Header +from email import encoders +import smtplib +import sys +import os +import re + + +class SMTPMailerSendState(dict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.code = self.message = None + + def __setattr__(self, key, value): + pass + + def __getitem__(self, item): + try: + return super().__getitem__(item) + except Exception: + return None + + def __getattr__(self, item): + try: + return super().__getitem__(item) + except Exception: + return None + + +class SMTPMailer: + __host__ = { + 'host': '', + 'port': 25 + } + __user__ = { + 'user': '', + 'pass': '' + } + __mail__ = { + 'from': [], + 'to': [], + 'cc': [], + 'bc': [], + 'attachs': [], + 'subject': '', + 'content': '' + } + __flag__ = 0 + __addr__ = 0 + __from__ = 0 + + def __init__( + self, + username='', + password='', + host='127.0.0.1', + port=25, + ssl_enable=False, + send_from='', + send_to=None, + send_cc=None, + send_bc=None, + subject='', + content='', + attachs=None + ): + self.ssl_enable = ssl_enable + self.host(host, port) + self.user(username, password) + send_from and self.send_from(send_from) + send_to and self.to(send_to) + send_cc and self.cc(send_cc) + send_bc and self.bc(send_bc) + subject and self.subject(subject) + content and self.content(content) + attachs and self.attachs(attachs) + + @staticmethod + def test_email(email: str, throw_exception=False): + test = re.fullmatch('^([a-zA-Z0-9_.\\-])+@(([a-zA-Z0-9\\-])+\\.)+([a-zA-Z0-9]{2,4})+$', email) is not None + if throw_exception and test is False: + raise Exception('Illegal email address: %s.' % email) + return test + + @staticmethod + def gen_header_of_mailer(e): + try: + return "\"%s\" <%s>" % (Header(e[0]).encode(), Header(e[1]).encode()) + except Exception: + return '' + + def parse_email_address(self, email: str): + if email.find('<') != -1 and email.find('>') != -1: + match_email = re.findall('^(.*?)<(.*?)>$', email) + if len(match_email) == 1: + addr = match_email[0][1].strip() + self.test_email(addr, throw_exception=True) + name = match_email[0][0].strip() or addr.split('@')[0].strip() + return [name, addr] + else: + raise Exception('The wrong email address format: %s.' % email) + else: + if len(email) != 0: + addr = email + self.test_email(addr, throw_exception=True) + name = email.split('@')[0].strip() + return [name, addr] + else: + raise Exception('Empty email address.') + + def parse_email_address_mult(self, data): + if not isinstance(data, (str, list, tuple)): + raise TypeError('Email address list must be string or list or tuple type.') + if isinstance(data, str): + data = [i.strip() for i in data.replace(',', ';').split(';')] + while '' in data: + data.remove('') + return [self.parse_email_address(email_format) for email_format in data] + + def enableSSL(self): + self.ssl_enable = True + return self + + def host(self, host: str, port: int): + self.__host__['host'] = host + self.__host__['port'] = port + return self + + def user(self, username, password): + self.__user__['user'] = username + self.__user__['pass'] = password + return self + + def send_from(self, e): + self.__mail__['from'] = self.parse_email_address(e) + self.__from__ += 1 + return self + + def to(self, receive_list): + self.__mail__['to'] = self.parse_email_address_mult(receive_list) + self.__addr__ += 1 + return self + + def cc(self, receive_list): + self.__mail__['cc'] = self.parse_email_address_mult(receive_list) + self.__addr__ += 1 + return self + + def bc(self, receive_list): + self.__mail__['bc'] = self.parse_email_address_mult(receive_list) + self.__addr__ += 1 + return self + + def subject(self, text: str): + self.__mail__['subject'] = text + return self + + def content(self, text: str): + self.__mail__['content'] = text + return self + + def attachs(self, attachs_path_list: str | list): + if isinstance(attachs_path_list, str): + attachs_path_list = [attachs_path_list] + for path in attachs_path_list: + if not os.path.exists(path): + raise FileNotFoundError('The attachment was not found.') + self.__mail__['attachs'] = attachs_path_list + return self + + def send(self, test_only=False): + if self.__flag__: + return SMTPMailerSendState({'code': 1, 'message': 'Mail has been sent.'}) + if self.__addr__ == 0: + raise Exception('No receiver.') + message = MIMEMultipart() + if self.__from__ == 0 and self.test_email(self.__user__['user']) and self.send_from(self.__user__['user']): + pass + message['From'] = self.gen_header_of_mailer(self.__mail__['from']) + to_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['to']]) + cc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['cc']]) + bc_header = ",\x20".join([self.gen_header_of_mailer(e) for e in self.__mail__['bc']]) + message['To'], message['Cc'], message['Bcc'] = to_header, cc_header, bc_header + message['Subject'] = Header(self.__mail__['subject']) + message.attach(MIMEText(self.__mail__['content'], 'html', 'utf-8')) + for file in self.__mail__['attachs']: + mime = MIMEBase('application', 'octet-stream') + mime.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file)) + mime.set_payload(open(file, 'rb').read()) + encoders.encode_base64(mime) + message.attach(mime) + try: + smtp = [smtplib.SMTP, smtplib.SMTP_SSL][self.ssl_enable]( + self.__host__['host'], + self.__host__['port'] + ) + except Exception: + return SMTPMailerSendState({'code': 1, 'message': 'Connection failure.'}) + try: + smtp.login(self.__user__['user'], self.__user__['pass']) + except Exception: + return SMTPMailerSendState({'code': 2, 'message': 'Account authentication failed.'}) + try: + receivers = [] + for rg in [self.__mail__['to'], self.__mail__['cc'], self.__mail__['bc']]: + for rp in rg: + receivers.append(rp[1]) + if test_only: + print(message.as_string(), file=sys.stderr) + return None + smtp.sendmail(self.__user__['user'], receivers, message.as_string()) + smtp.quit() + self.__flag__ = 1 + return SMTPMailerSendState({'code': 0, 'message': 'Mail sent successfully.'}) + except Exception as error: + return SMTPMailerSendState({'code': 3, 'message': str(error)}) + + +if __name__ == '__main__': + # e.g. Send an email example. + print(SMTPMailer().enableSSL().host('smtp.qq.com', 465).user('admin@example.com', '******'). + subject('Email Subject').content('This is some content...').to('user@example.com').send()) + # e.g. Or you can send emails like this. + print(SMTPMailer( + username='admin@example.com', + password='******', + ssl_enable=True, + host='smtp.qq.com', + port=465, + # The recipient format can be "Nickname " or "Email", like this: "Mrs.Lisa " or "lisa@example.com" + # Multiple recipients can be separated by semicolon(;) or comma(,). + # Multiple recipients you can also pass a list. + send_from='Administrator ', + # Add recipients. + send_to='Mr.Tom , Mr.Jack ', + # Add Cc. + send_cc='Lindsay , Nora ', + # Add Bcc. + send_bc='frederica@example.com', + subject='Email Subject', + content='This is some content...', + # You can add attachments, you need to pass the path of a single or more attachment in the form of a list. + attachs=None + ).send()) diff --git a/Library/TestingBasic.py b/Library/TestingBasic.py new file mode 100644 index 0000000..286178c --- /dev/null +++ b/Library/TestingBasic.py @@ -0,0 +1,9 @@ +from Library.Logger import TestingLogger +from Library.Driver import WebDriver +from Library.LoadConfig import * +from Library.LoadData import * +import unittest +import ddt + + +log = TestingLogger() diff --git a/Library/Time.py b/Library/Time.py new file mode 100644 index 0000000..7724a18 --- /dev/null +++ b/Library/Time.py @@ -0,0 +1,38 @@ +import time + + +def fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=None, utc=False): + return time.strftime(fmt, [time.localtime, time.gmtime][utc](ts)) + + +def par_time(fmt='', dt=''): + return time.mktime(time.strptime(dt, fmt)) + + +def asc_time(ts=None): + return time.asctime(time.localtime(ts)) + + +def int_time(): + return int(time.time()) + + +def mic_time(): + return round(time.time(), 6) + + +if __name__ == '__main__': + # e.g. Get the current formatting time. + print(fmt_time(fmt='%Y-%m-%d %H:%M:%S')) + # e.g. Get the formatting time and specify the timestamp. + print(fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=0)) + # e.g. Get UTC formatting time. + print(fmt_time(fmt='%Y-%m-%d %H:%M:%S', ts=0, utc=True)) + # e.g. Convert the formatting time to timestamp. + print(par_time(fmt='%Y-%m-%d %H:%M:%S', dt='2020-12-31 12:00:00')) + # e.g. Get ASC formatting time. + print(asc_time()) + # e.g. Get integer timestamp. + print(int_time()) + # e.g. Get accurate to microsecond timestamp. + print(mic_time()) diff --git a/Library/Yaml.py b/Library/Yaml.py new file mode 100644 index 0000000..4d91dba --- /dev/null +++ b/Library/Yaml.py @@ -0,0 +1,14 @@ +import yaml + + +def yaml_encode(data, indent=None, unicode=False): + return yaml.dump(data, indent=indent, allow_unicode=not unicode, sort_keys=False) + + +def yaml_decode(data): + return yaml.load(data, Loader=yaml.CFullLoader) + + +if __name__ == '__main__': + # e.g. Yaml encoding and decoding it. + print(yaml_decode(yaml_encode({"id": 101, "name": "Tom", "friends": ["Tony", "Jason"], "score": {"english": 92, "math": 61}}))) diff --git a/Po/Base/__init__.py b/Po/Base/__init__.py new file mode 100644 index 0000000..ad0ae2a --- /dev/null +++ b/Po/Base/__init__.py @@ -0,0 +1,16 @@ +from Library.Driver import * + + +class PageBase: + def __init__(self, d: WebDriver): + self.driver = d + + def home(self): + self.driver.open('https://www.fanscloud.net/') + + def reload(self): + self.driver.refresh() + + def quit(self): + self.driver.tab_cancel_all() + self.driver.quit() diff --git a/Po/Home/Signin/__init__.py b/Po/Home/Signin/__init__.py new file mode 100644 index 0000000..0407726 --- /dev/null +++ b/Po/Home/Signin/__init__.py @@ -0,0 +1,25 @@ +from Po.Home import * + + +class PageSignin(PageHome): + box_user = [By.XPATH, '//input[@type="text" and @id="user"]'] + box_pass = [By.XPATH, '//input[@type="password" and @id="pw"]'] + box_persist = [By.XPATH, '//input[@type="checkbox" and @id="persist"]'] + btn_submit = [By.XPATH, '//button[@type="submit"]'] + + def open(self): + self.home() + self.click_signin() + return self + + def input_user(self, value): + return self.driver.input(self.driver.find_element_by(self.box_user), value) + + def input_pass(self, value): + return self.driver.input(self.driver.find_element_by(self.box_pass), value) + + def click_persist(self): + return self.driver.click(self.driver.find_element_by(self.box_persist)) + + def click_submit(self): + return self.driver.click(self.driver.find_element_by(self.btn_submit)) diff --git a/Po/Home/Signup/__init__.py b/Po/Home/Signup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Po/Home/__init__.py b/Po/Home/__init__.py new file mode 100644 index 0000000..b9dc8bc --- /dev/null +++ b/Po/Home/__init__.py @@ -0,0 +1,11 @@ +from Po.Base import * + +class PageHome(PageBase): + btn_signin = [By.LINK_TEXT, '登录'] + + def open(self): + self.home() + return self + + def click_signin(self): + return self.driver.click(self.driver.find_element_by(self.btn_signin)) diff --git a/Testcase/test_login.py b/Testcase/test_login.py new file mode 100644 index 0000000..9a3b1dc --- /dev/null +++ b/Testcase/test_login.py @@ -0,0 +1,30 @@ +from Common.Basic import * +from Po.Home.Signin import * + + +@ddt.ddt +class TestLoginSite(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.driver = WebDriver() + cls.page = PageSignin(cls.driver).open() + + @classmethod + def tearDownClass(cls) -> None: + cls.driver.quit() + + def tearDown(self) -> None: + self.page.reload() + + @ddt.data(*loadCsv('illegal_passwds.csv')) + @ddt.unpack + def test_signin_error_admin_pass(self, v1): + pass + self.page.input_user('admin') + self.page.input_pass(v1) + self.page.click_submit() + # Please assert in here...... + + +if __name__ == '__main__': + unittest.main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..74eb0f9 Binary files /dev/null and b/requirements.txt differ diff --git a/run_testcases_all.py b/run_testcases_all.py new file mode 100644 index 0000000..69ac2f6 --- /dev/null +++ b/run_testcases_all.py @@ -0,0 +1,11 @@ +from Library.HTMLTestRunner import HTMLTestRunner +from Library.ProjectHome import * +from Library.Report import * +from Library.Time import * +from unittest import TestSuite, TestLoader + + +if __name__ == '__main__': + testsuite = TestSuite() + testsuite.addTest(TestLoader().discover('%s/Testcase' % project_home())) + print(HTMLTestRunner(report_home='%s/Report/%s' % (project_home(), fmt_time(generate_report_name())), report_home_latest_name='latest').run(testsuite))