import sys import unittest from typing import Any, Text, Dict, List from Business.Class.HTMLTestRunner import HTMLTestRunner from Business.Class.ExcelUtils import * from Business.Class.JsonOrYaml import * from Base.Class.Http import * from Base.Class.Database import * from Base.Class.Logger import * def reparse(regexp, string): import re mut = isinstance(regexp, tuple) reg = [str(regexp), str(regexp[0])][mut] flg = [0, eval('re.%s' % str(regexp[1]).upper())][mut] if reg in ['', 'trim']: return ReText(string.strip()) else: return ReList(re.findall(reg, string, flg)) class ReText(str): def __getitem__(self, item): if isinstance(item, (int, bool)): return __class__(super().__getitem__(item)) else: return reparse(item, str(self.__str__())) class ReList(list): def __getitem__(self, item): if isinstance(item, (int, bool)): try: r = super().__getitem__(item) if isinstance(r, (list, tuple, set)): return __class__(r) elif isinstance(r, (dict,)): return ReDict(r) else: return ReText(r) except: return __class__() else: try: return reparse(item, str(self.__str__())) except: return ReText('') def __str__(self): try: return ReText(self[0]) except: return ReText('') class ReDict(dict): def __getattr__(self, item): return self.__getitem__(item) def __getitem__(self, item): if item in self: r = super().__getitem__(item) if isinstance(r, (dict,)): return __class__(r) elif isinstance(r, (list, tuple, set)): return ReList(r) else: return ReText(r) else: try: return reparse(item, str(self.__str__())) except: return ReText('') def __str__(self): try: if '_' in self: return ReText(self.__getitem__('_')) else: return json_encode(self, indent=None, unicode=False) except: return ReText('') def update(self, *args, **kwargs): super().update(*args, **kwargs) return self class TestCase: _g = {} _l = {} def __init__( self, workbook: Text, testlves: List, testcase: Text, testdata: Text, database: Text, httpconf: Text ): tabs = { "HTTPConf": { "fixed": {}, "views": "A4:C*", "field": [ "Name", "HostWithScheme", "DefaultHeader" ] }, "DataBase": { "fixed": {}, "views": "E4:L*", "field": [ "Name", "Type", "Host", "Port", "User", "Password", "Database", "Charset" ] }, "TestData": { "fixed": {}, "views": "A4:E*", "field": [ "Name", "Delimiter", "Data", "FileData", "VarNameList" ] }, "TestCase": { "fixed": { "ItemName": "B3", "SessMode": "E3" }, "views": "A7:AH*", "field": [ "Flag", "PreExecCase", "PreExecRule", "Module", "CaseId", "CaseTitle", "CaseDesc", "CaseLevel", "HTTPChannel", "HTTPMethod", "HTTPUri", "HTTPQuery", "HTTPRedirect", "HTTPHeader", "HTTPCookie", "HTTPParamType", "HTTPParamContent", "HTTPParamOfFile", "HTTPExtract", "HTTPAssertStatus", "HTTPAssertReason", "HTTPAssertHeader", "HTTPAssertCookie", "HTTPAssertBody", "HTTPAssertJson", "DatabaseChannel", "DatabaseSqlQuery", "DatabaseExtract", "DatabaseAssertData", "DatabaseAssertCount", "DataSet", "Author", "TestTime", "TestResult" ] } } import os from pandas.core.frame import DataFrame init = Excel().open(workbook, read_only=True) data = {} for k, v in tabs.items(): match k: case 'TestCase': sheet = testcase case 'TestData': sheet = testdata case 'DataBase': sheet = database case 'HTTPConf': sheet = httpconf case _: continue init.select(sheet) data[k] = {} if 'fixed' in v.keys(): data[k]['fixed'] = {} for key, value in v['fixed'].items(): data[k]['fixed'][key] = init.cellGet(cell=value) if 'views' in v.keys() and 'field' in v.keys(): data[k]['views'] = DataFrame(read_view_dict( filename=workbook, sheet=sheet, area=v['views'], fields=v['field'], auto_truncate=True)) else: init.exit() self.case = testcase self.http = Request self.dirs = os.path.dirname(workbook) self.i = data['TestCase']['fixed']['ItemName'] self.m = data['TestCase']['fixed']['SessMode'] self.h = data['HTTPConf']['views'] self.b = data['DataBase']['views'] self.d = data['TestData']['views'] self.c = data['TestCase']['views'] self._set_levels(testlves) self._set_request_mode(self.m) def _set_request_mode(self, value=None): self.http = [Request, Session][self._match_mode(value)]() def _set_levels(self, level_list=None): if isinstance(level_list, (list, tuple, str)): self.leve_list = level_list @staticmethod def _match_case_type(value=None): match value: case '正常用例': return 10 case '前置用例': return 20 case _: return 50 @staticmethod def _match_parm_type(value=None): match value: case 'Json': return 10 case 'Form': return 20 case _: return 50 @staticmethod def _match_bool(value=None): return value in ['是', '开启', '打开', 'True', 'true', 'TRUE', True, 'Yes', 'yes', 'YES', 'Y', 'y', '1', 1, 1.0] @staticmethod def _match_mode(value=None): return value in ['会话模式', 'Session', 'session'] def _match_leve_run(self, value=None): if self.leve_list is None: return True return value in self.leve_list def _merge_dict(self, dict_ori, dict_add): if isinstance(dict_ori, dict) and isinstance(dict_add, dict): for k, v in dict_ori.items(): if k in dict_add.keys(): if isinstance(v, dict): dict_ori[k] = self._merge_dict(dict_ori[k], dict_add[k]) else: dict_ori[k] = dict_add[k] for k, v in dict_add.items(): if k not in dict_ori.keys(): dict_ori[k] = dict_add[k] return dict_ori @staticmethod def _sub_variable(text='', vars_dict=None): if not isinstance(text, str): return text for variable_name in re.findall('\${(.*?)}', text): try: value = vars_dict[variable_name] except: value = '' text = text.replace('${%s}' % variable_name, str(value)) return text def _sub_variable_auto(self, data=None, vars_dict=None): if isinstance(vars_dict, list): d = {} for v in vars_dict: d.update(v) vars_dict = d if isinstance(data, list): for i in range(len(data)): if not isinstance(data[i], str): continue data[i] = self._sub_variable(text=data[i], vars_dict=vars_dict) return data if isinstance(data, dict): for k in data.keys(): if not isinstance(data[k], str): continue data[k] = self._sub_variable(text=data[k], vars_dict=vars_dict) return data if isinstance(data, str): return self._sub_variable(data, vars_dict=vars_dict) return data @staticmethod def _to_string(text): import math if text is None: return '' try: if math.isnan(text): return '' except: pass if isinstance(text, float) and math.modf(text)[0] == 0: return str(int(text)) return str(text) def test_unit(self, data): http = self.h.where((self.h['Name'] == data['HTTPChannel']), inplace=False).dropna(how='all').reset_index(drop=True, inplace=False).loc[0].to_dict() res_kwargs = { 'method': data['HTTPMethod'], 'url': self._sub_variable_auto((http['HostWithScheme'] or '') + (data['HTTPUri'] or '/'), [self._g, self._l]), 'query': self._sub_variable_auto(auto_decode(data['HTTPQuery']), [self._g, self._l]), 'data': None, 'json': None, 'file': None, 'header': {}, 'cookie': self._sub_variable_auto( auto_decode(data['HTTPCookie']), [self._g, self._l] ), 'auth': None, 'timeout': 60, 'proxy': None, 'auto_redirect': self._match_bool(data['HTTPRedirect']), 'ignore_cert_error': False, 'debug': True } res_kwargs['header'].update((auto_decode(http['DefaultHeader']) or {})) res_kwargs['header'].update((self._sub_variable_auto(auto_decode(data['HTTPHeader']), [self._g, self._l]) or {})) match self._match_parm_type(data['HTTPParamType']): case 10: res_kwargs['json'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent'])) case 20: res_kwargs['data'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent'])) case _: res_kwargs['data'] = self._sub_variable_auto(data['HTTPParamContent']) res_kwargs['file'] = locals().setdefault('f', self._sub_variable_auto(auto_decode( data['HTTPParamOfFile']))) and {k: open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb') for k, v in locals().get('f').items()} res = self.http.http(**res_kwargs) for var_name, var_value in { '_Status': ReText(res['status'] or ''), '_Reason': ReText(res['reason'] or ''), '_Header': ReDict(res['header'] or {}).update({'_': yaml_encode(dict(res['header'] or {}))}), '_Cookie': ReDict(res['cookie'] or {}).update({'_': '; '.join([k + '=' + (v or '') for k, v in dict(res['cookie'] or {}).items()])}), '_Body': ReText(res['text'] or ''), '_Json': ReDict(res['json'] or {}), json_encode(None): None, json_encode(bool(0)): bool(0), json_encode(bool(1)): bool(1) }.items(): locals().setdefault(var_name, var_value) for k, v in (auto_decode(data['HTTPExtract']) or {}).items(): d = re.findall('^([0-9A-Za-z_]+).*?', v)[0] self._g[str(k)] = str(eval('_' + v.replace(d, d.title(), 1))) expect = self._to_string(data['HTTPAssertStatus']) if expect: opers = [" == ", " != ", " >= ", " <= ", " > ", " < ", " in ", " not in "] opers_flag = 0 for v in opers: if v in expect: opers_flag = 1 break if opers_flag: contrast = list(filter(lambda x: x, expect.split("\n"))) for contr in contrast: exec('assert ' + self._sub_variable_auto(contr).replace('$', str(res['status']))) else: actual = str(res['status']) try: assert expect == actual except AssertionError as e: raise AssertionError(str(expect) + ' == ' + str(actual)) # sys.stderr.write() def test(self, index): # print(view_case.loc[index]) main_case = dict(self.c.loc[index].to_dict()) self.test_unit(data=main_case) def main(self): class ClassTestCase(unittest.TestCase): pass ClassTestCase.__qualname__ = self.case ClassTestCase._test_ = self.test serial = 0 for i in range(len(self.c)): thisCase = self.c.loc[i].to_dict() if not self._match_case_type(thisCase['Flag']) == 10 and self._match_leve_run(thisCase['CaseLevel']): continue serial += 1 def foo(self, index=i): return self._test_(index) foo.__doc__ = thisCase['CaseTitle'] exec('ClassTestCase.test%s_%s = foo' % (str('%04d' % serial), str(thisCase['CaseId']))) return ClassTestCase if __name__ == '__main__': test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(testCaseClass=TestCase( workbook='D:/Desktop/接口自动化测试用例.xlsx', testlves=['P0'], testcase='测试用例', testdata='测试数据', httpconf='请求配置', database='请求配置' ).main(), prefix='test')) HTMLTestRunner(verbosity=5, report=os.path.abspath(os.path.join(os.path.dirname(__file__), './1.html'))).run( test_suite)