import time import unittest from typing import Any, Text, Dict, List from Business.Class.HTMLTestRunner import HTMLTestRunner from Business.Class.ExcelUtils import * from Business.Class.JsonOrYaml import * from Base.Class.Http import * from Base.Class.Time import * from Base.Class.Database import * from Base.Class.Logger import * def reparse(regexp, string): import re mut = isinstance(regexp, tuple) reg = [str(regexp), str(regexp[0])][mut] flg = (mut and eval('re.%s' % str(regexp[1]).upper())) or 0 if reg in ['', 'trim']: return ReText(string.strip()) else: return ReList(re.findall(reg, string, flg)) class ReText(str): def __getitem__(self, item): if isinstance(item, (int, bool)): return __class__(super().__getitem__(item)) else: return reparse(item, str(self.__str__())) class ReList(list): def __getitem__(self, item): if isinstance(item, (int, bool)): try: r = super().__getitem__(item) if isinstance(r, (list, tuple, set)): return __class__(r) elif isinstance(r, (dict,)): return ReDict(r) else: return ReText(r) except: return __class__() else: try: return reparse(item, str(self.__str__())) except: return ReText('') def __str__(self): try: return ReText(self[0]) except: return ReText('') class ReDict(dict): def __getattr__(self, item): return self.__getitem__(item) def __getitem__(self, item): if item in self: r = super().__getitem__(item) if isinstance(r, (dict,)): return __class__(r) elif isinstance(r, (list, tuple, set)): return ReList(r) else: return ReText(r) else: try: return reparse(item, str(self.__str__())) except: return ReText('') def __str__(self): try: if '_' in self: return ReText(self.__getitem__('_')) else: return json_encode(self, indent=None, unicode=False) except: return ReText('') def update(self, *args, **kwargs): super().update(*args, **kwargs) return self class CustomAssert: @staticmethod def assertPreWith(string: str, sub: str): if not string.startswith(sub): raise AssertionError('%s does not starts with %s' % (string, sub)) @staticmethod def assertSufWith(string: str, sub: str): if not string.endswith(sub): raise AssertionError('%s does not ends with %s' % (string, sub)) class TestCase: _g = {} _l = {} def __init__(self, workbook: Text, testlves: List, cases: Text, mysql: Text, httpc: Text): tabs = { "HTTPConf": { "fixed": {}, "views": "A4:H*", "field": [ "Name", "Scheme", "Host", "DefaultHeader", "Timeout", "Session", "Proxies", "IgnoreSSLCertError" ] }, "DataBase": { "fixed": {}, "views": "J4:P*", "field": [ "Name", "Host", "Port", "User", "Password", "Database", "Charset" ] }, "TestCase": { "fixed": {}, "views": "A7:AH*", "field": [ "Flag", "PreExecRule", "PreExecCase", "CaseModule", "CaseId", "CaseTitle", "CaseDesc", "CaseLevel", "HTTPChannel", "HTTPMethod", "HTTPUri", "HTTPQuery", "HTTPRedirect", "HTTPHeader", "HTTPCookie", "HTTPParamType", "HTTPParamContent", "HTTPParamOfFile", "HTTPExtract", "HTTPAssertStatus", "HTTPAssertReason", "HTTPAssertHeader", "HTTPAssertCookie", "HTTPAssertBody", "HTTPAssertJson", "BaseChannel", "BaseSql", "BaseExtract", "BaseAssertData", "BaseAssertRows", "DataFileSet", "TestAuthor", "TestTime", "TestResult" ] } } import os from pandas.core.frame import DataFrame self.init = Excel().open(workbook) data = {} for k, v in tabs.items(): match k: case 'TestCase': sheet = cases case 'DataBase': sheet = mysql case 'HTTPConf': sheet = httpc case _: continue self.init.select(sheet) data[k] = {} if 'fixed' in v.keys(): data[k]['fixed'] = {} for key, value in v['fixed'].items(): data[k]['fixed'][key] = self.init.cellGet(cell=value) if 'views' in v.keys() and 'field' in v.keys(): data[k]['views'] = read_view_dict( filename=workbook, sheet=sheet, area=v['views'], fields=v['field'], auto_truncate=True) data[k]['ocell'] = read_view_dict( filename=workbook, sheet=sheet, area=v['views'], fields=v['field'], auto_truncate=True, object_cell=True) self.case = cases self.dirs = os.path.dirname(workbook) self.request = Request() self.session = Session() self.h = DataFrame(data['HTTPConf']['views']['data']) self.b = DataFrame(data['DataBase']['views']['data']) self.c = DataFrame(data['TestCase']['views']['data']) self.h_ocell = DataFrame(data['HTTPConf']['ocell']['data']) self.b_ocell = DataFrame(data['DataBase']['ocell']['data']) self.c_ocell = DataFrame(data['TestCase']['ocell']['data']) self.h_excel_object = data['HTTPConf']['ocell']['excel_object'] self.b_excel_object = data['DataBase']['ocell']['excel_object'] self.c_excel_object = data['TestCase']['ocell']['excel_object'] self._set_levels(testlves) self.assert_unit = unittest.TestCase() self.assert_cust = CustomAssert() self.assert_list = [ {'sign': ' == ', 'method': self.assert_unit.assertEqual, 'reverse': 0, 'cast': 0}, {'sign': ' != ', 'method': self.assert_unit.assertNotEqual, 'reverse': 0, 'cast': 0}, {'sign': ' >= ', 'method': self.assert_unit.assertGreaterEqual, 'reverse': 0, 'cast': float}, {'sign': ' <= ', 'method': self.assert_unit.assertLessEqual, 'reverse': 0, 'cast': float}, {'sign': ' =~ ', 'method': self.assert_unit.assertIn, 'reverse': 1, 'cast': str}, {'sign': ' !~ ', 'method': self.assert_unit.assertNotIn, 'reverse': 1, 'cast': str}, {'sign': ' > ', 'method': self.assert_unit.assertGreater, 'reverse': 0, 'cast': float}, {'sign': ' < ', 'method': self.assert_unit.assertLess, 'reverse': 0, 'cast': float}, {'sign': ' not in ', 'method': self.assert_unit.assertNotIn, 'reverse': 0, 'cast': 0}, {'sign': ' in ', 'method': self.assert_unit.assertIn, 'reverse': 0, 'cast': 0}, {'sign': ' *= ', 'method': self.assert_cust.assertPreWith, 'reverse': 0, 'cast': str}, {'sign': ' =* ', 'method': self.assert_cust.assertSufWith, 'reverse': 0, 'cast': str}, ] self.pre_executed_list = [] def _set_levels(self, level_list=None): if isinstance(level_list, (list, tuple, str)): self.leve_list = level_list else: self.leve_list = None @staticmethod def _match_case_type(value=None): match value: case '正常用例': return 10 case '前置用例': return 20 case _: return 50 @staticmethod def _match_prex_rule(value=None): match value: case '全局一次': return 10 case _: return 50 @staticmethod def _match_bool(value=None): return value in ['是', '开启', '打开', 'True', 'true', 'TRUE', True, 'Yes', 'yes', 'YES', 'Y', 'y', '1', 1, 1.0] def _match_leve_run(self, value=None): if self.leve_list is None: return True return value in self.leve_list def _merge_dict(self, dict_ori, dict_add): if isinstance(dict_ori, dict) and isinstance(dict_add, dict): for k, v in dict_ori.items(): if k in dict_add.keys(): if isinstance(v, dict): dict_ori[k] = self._merge_dict(dict_ori[k], dict_add[k]) else: dict_ori[k] = dict_add[k] for k, v in dict_add.items(): if k not in dict_ori.keys(): dict_ori[k] = dict_add[k] return dict_ori @staticmethod def _parse_formula(text, operator): locals().__setitem__('quota', 0) none_quota = [] none_range = [] for i in range(len(text)): this = text[i] match this: case '\'': match locals().get('quota'): case 0: locals().__setitem__('quota', 1) case 1: locals().__setitem__('quota', 0) case '\"': match locals().get('quota'): case 0: locals().__setitem__('quota', 2) case 2: locals().__setitem__('quota', 0) case _: match locals().get('quota'): case 0: none_quota.append(i) for i in range(len(none_quota) + 1): last = (i > 0 and none_quota[i - 1]) or -1 this = (i < len(none_quota) and none_quota[i]) or 9999 if (this - last) > 1: none_range.append(last) none_range.append(this) none_range.pop(0) none_range.pop(-1) none_range_list = [] for i in range(int(len(none_range) / 2)): none_range_list.append([none_range[i * 2], none_range[i * 2 + 1]]) for v in none_range_list: p = text.find(operator, v[0], v[1] + 1) if not p == -1: return [text[:p], text[p + len(operator):]] else: return False def _sub_variable(self, text, vars_dict): if not isinstance(text, str): return text for variable_name in re.findall('\${(.*?)}', text): try: value = vars_dict[variable_name] except: value = '' if self._is_nan(value) or value is None: value = '' text = text.replace('${%s}' % variable_name, str(value)) return text def _sub_variable_auto(self, data): vars_dict = [self._g, self._l] if isinstance(vars_dict, list): d = {} for v in vars_dict: d.update(v) vars_dict = d if isinstance(data, list): for i in range(len(data)): if not isinstance(data[i], str): continue data[i] = self._sub_variable(text=data[i], vars_dict=vars_dict) return data if isinstance(data, dict): for k in data.keys(): if not isinstance(data[k], str): continue data[k] = self._sub_variable(text=data[k], vars_dict=vars_dict) return data if isinstance(data, str): return self._sub_variable(data, vars_dict=vars_dict) return data @staticmethod def _to_string(value): import math if value is None: return '' if isinstance(value, (float, int)) and str(value) == str(float('NaN')): return '' if isinstance(value, (float,)) and math.modf(value)[0] == 0: return str(int(value)) return str(value) @staticmethod def _to_number(value): if not value: return 0 if value is True: return 1 if isinstance(value, (float, int)) and str(value) == str(float('NaN')): return 0 return int(float(value)) @staticmethod def _is_nan(value): if isinstance(value, (float, int)) and str(value) == str(float('NaN')): return True else: return False def _extract_variable(self, extract, vars_dict: dict, domain: str | list | dict): if not extract: return None if not isinstance(extract, (list, dict)): extract = auto_decode(extract) if isinstance(domain, str): match domain: case 'locals': domain = self._l case 'global': domain = self._g vars_dict = {k if not isinstance(k, str) else k.lower(): v for k, v in vars_dict.items()} import re for k, v in (extract or {}).items(): d = re.findall('^([0-9A-Za-z_]+).*?', v)[0] locals().__setitem__('r', vars_dict[d.lower()]) domain[str(k)] = str(eval(v.replace(d, 'r', 1))) return True def _assert(self, assert_items, assert_lists): for assert_item in assert_items: expect = self._to_string(assert_item['expect']) if not expect: continue actual = assert_item['actual'] if isinstance(actual, (int, bool, str)): actual = ReText(actual) if isinstance(actual, (tuple, list, set)): actual = ReList(actual) if isinstance(actual, (dict,)): actual = ReDict(actual) assert_rows = 0 assert_real = 0 for name, value in { json_encode(None): None, json_encode(bool(0)): bool(0), json_encode(bool(1)): bool(1) }.items(): locals().__setitem__(name, value) for line in list(filter(lambda x: x, expect.split("\n"))): assert_rows += 1 for oper in assert_lists: form = self._parse_formula(re.compile('(\$)(?!{).*?').sub('actual', line), oper['sign']) if not form: continue for i in range(len(form)): form[i] = self._sub_variable_auto(form[i]).replace("\n", "\\n") assert_real += 1 oper['reverse'] and form.reverse() assert_meth = oper['method'] assert_parm = [eval(form[0]), eval(form[1])] for i in range(len(assert_parm)): try: if assert_parm[i]['_']: assert_parm[i] = str(assert_parm[i]) except TypeError as e: pass cast = oper['cast'] if cast: assert_parm[i] = cast(assert_parm[i]) assert_meth(assert_parm[0], assert_parm[1]) break if 1 <= assert_real != assert_rows: raise Exception('wrong assertion statement') if 0 == assert_real: expect = self._sub_variable_auto(expect) actual = str(actual) assert actual == expect, '%s != %s' % (actual, expect) def _test_unit(self, data, main=None): if data['HTTPUri']: if not data['HTTPChannel']: raise Exception('channel not set') from pandas.core.frame import DataFrame http = self.h.where((self.h['Name'] == data['HTTPChannel']), inplace=False).dropna(how='all').reset_index(drop=True, inplace=False).loc[0].to_dict() res_kwargs = { 'method': data['HTTPMethod'], 'url': self._sub_variable_auto((http['Scheme'] or 'http').lower() + '://' + (http['Host'] or '') + data['HTTPUri']), 'query': self._sub_variable_auto(auto_decode(data['HTTPQuery'])), 'data': None, 'json': None, 'file': None, 'header': {}, 'cookie': self._sub_variable_auto(auto_decode(data['HTTPCookie'])), 'auth': None, 'timeout': self._to_number(http['Timeout']) or 15, 'proxy': self._to_string(http['Proxies']), 'auto_redirect': self._match_bool(data['HTTPRedirect']), 'ignore_cert_error': self._match_bool(http['IgnoreSSLCertError']), 'debug': True } res_kwargs['header'].update((auto_decode(http['DefaultHeader']) or {})) res_kwargs['header'].update((self._sub_variable_auto(auto_decode(data['HTTPHeader'])) or {})) match str(data['HTTPParamType']).strip().title(): case 'Json': res_kwargs['json'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent'])) case 'Form': res_kwargs['data'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent'])) case _: res_kwargs['data'] = self._sub_variable_auto(data['HTTPParamContent']) res_kwargs['file'] = locals().setdefault('f', self._sub_variable_auto(auto_decode( data['HTTPParamOfFile']))) and {k: open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb') for k, v in locals().get('f').items()} res = [self.request, self.session][self._match_bool(http['Session'])].http(**res_kwargs) log.d("\n" + res['brief']) hvar = { 'Status': ReText(res['status'] or ''), 'Reason': ReText(res['reason'] or ''), 'Header': ReDict(res['header'] or {}).update({'_': "\n".join([k + ': ' + (v or '') for k, v in dict(res['header'] or {}).items()])}), 'Cookie': ReDict(res['cookie'] or {}).update({'_': "; ".join([k + '=' + (v or '') for k, v in dict(res['cookie'] or {}).items()])}), 'Body': ReText(res['text'] or ''), 'Json': ReDict(res['json'] or {}), } self._extract_variable(extract=data['HTTPExtract'], vars_dict=hvar, domain='global') self._assert([ {'expect': data['HTTPAssertStatus'], 'actual': hvar['Status']}, {'expect': data['HTTPAssertReason'], 'actual': hvar['Reason']}, {'expect': data['HTTPAssertHeader'], 'actual': hvar['Header']}, {'expect': data['HTTPAssertCookie'], 'actual': hvar['Cookie']}, {'expect': data['HTTPAssertBody'], 'actual': hvar['Body']}, {'expect': data['HTTPAssertJson'], 'actual': hvar['Json']}, ], self.assert_list) if data['BaseSql']: if not data['BaseChannel']: raise Exception('channel not set') base = self.b.where((self.b['Name'] == data['BaseChannel']), inplace=False).dropna(how='all').reset_index(drop=True, inplace=False).loc[0].to_dict() conn = MySQL().connect( host=base['Host'], port=int(base['Port']), user=base['User'], password=base['Password'], database=base['Database'], charset=base['Charset'], cursor='Dict' ) conn.execute(data['BaseSql']) bvar = { 'Data': ReList(conn.fetchall() or []), 'Rows': conn.count() or 0 } log.d("\n" + conn.brief()) self._extract_variable(extract=data['BaseExtract'], vars_dict=bvar, domain='global') self._assert([ {'expect': data['BaseAssertData'], 'actual': bvar['Data']}, {'expect': data['BaseAssertRows'], 'actual': bvar['Rows']}, ], self.assert_list) def test(self, index, update_global=None, update_locals=None): if isinstance(update_global, (dict,)): self._g.update(update_global) if isinstance(update_locals, (dict,)): self._l.update(update_locals) def write_test_result(excel, object_cell, test_result): for cell, value in [[object_cell['TestResult'], test_result], [object_cell['TestTime'], time_strftime(fmt='%Y-%m-%d %H:%M:%S')]]: cell.value = value excel.save() main_case_object_cell = self.c_ocell.loc[index].to_dict() try: main_case = dict(self.c.loc[index].to_dict()) prex_list = list(filter(lambda x: x, [value.strip() for value in (main_case['PreExecCase'] or '').replace(',', "\n").split("\n")])) for case_id in prex_list: if self._match_prex_rule(main_case['PreExecRule']) == 10 and case_id in self.pre_executed_list: continue data = self.c.where((self.c['CaseId'] == case_id), inplace=False).dropna(how='all').reset_index(drop=True, inplace=False).loc[0].to_dict() if data['PreExecCase'] or data['PreExecRule']: raise Exception('Pre-exec case cannot be nested.') if data['DataFileSet']: raise Exception('Pre-exec case cannot use data files set.') self.pre_executed_list.append(case_id) self._test_unit(data=data) self._test_unit(data=main_case, main=True) self._l and self._l.clear() write_test_result(self.c_excel_object, main_case_object_cell, 'Passed') except AssertionError: self._l and self._l.clear() write_test_result(self.c_excel_object, main_case_object_cell, 'Failed') raise except Exception: self._l and self._l.clear() write_test_result(self.c_excel_object, main_case_object_cell, 'Errors') raise def main(self): from pandas import read_csv class ClassTestCase(unittest.TestCase): pass ClassTestCase.__qualname__ = self.case ClassTestCase._test_ = self.test serial = 0 for i in range(len(self.c)): data = self.c.loc[i].to_dict() if not (self._match_case_type(data['Flag']) == 10 and self._match_leve_run(data['CaseLevel'])): continue serial += 1 filename = data['DataFileSet'] or '' if filename: data_file = read_csv(os.path.abspath(os.path.join(self.dirs, './%s' % filename)), keep_default_na=False) data_file_rows = len(data_file) data_file_flag = 1 else: data_file = None data_file_rows = 0 data_file_flag = 0 j = 0 while j < data_file_rows or data_file_flag == 0: match data_file_flag: case 1: vars_data = data_file.loc[j].to_dict() func_docs = str(data['CaseTitle'] or '') + '_' + '_'.join([str(v) for k, v in vars_data.items()])[:32] func_name = 'test%s_%s_%s' % (str('%04d' % serial), str(data['CaseId']), str('%04d' % (j + 1))) once_do_break = 0 case _: vars_data = None func_docs = str(data['CaseTitle'] or '') func_name = 'test%s_%s' % (str('%04d' % serial), str(data['CaseId'])) once_do_break = 1 def func(self, index=i, update_global=None, update_locals=vars_data): return self._test_(index=index, update_global=update_global, update_locals=update_locals) func.__doc__ = func_docs type.__setattr__(ClassTestCase, func_name, func) j += 1 if once_do_break: break # if filename: # data_file = read_csv(os.path.join(self.dirs, './%s' % filename), keep_default_na=False) # for j in range(len(data_file)): # vars_data = data_file.loc[j].to_dict() # # def func(self, index=i, update_global=None, update_locals=vars_data): # return self._test_(index=index, update_global=update_global, update_locals=update_locals) # # func.__doc__ = (data['CaseTitle'] or '') + '_' + '_'.join([str(v) for k, v in vars_data.items()])[:32] # type.__setattr__(ClassTestCase, 'test%s_%s_%s' % (str('%04d' % serial), str(data['CaseId']), str('%04d' % (j+1))), func) # else: # def func(self, index=i, update_global=None, update_locals=None): # return self._test_(index=index, update_global=update_global, update_locals=update_locals) # # func.__doc__ = data['CaseTitle'] # type.__setattr__(ClassTestCase, 'test%s_%s' % (str('%04d' % serial), str(data['CaseId'])), func) return ClassTestCase if __name__ == '__main__': test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(testCaseClass=TestCase( workbook='D:/Desktop/接口自动化测试用例.xlsx', testlves=None, cases='测试用例', httpc='请求配置', mysql='请求配置' ).main(), prefix='test')) HTMLTestRunner(verbosity=5, report=os.path.abspath(os.path.join(os.path.dirname(__file__), './1.html'))).run( test_suite)