import unittest from Business.Class.HTMLTestRunner import HTMLTestRunner from Business.Class.ExcelUtils import * from Business.Class.JsonOrYaml import * from Base.Class.Http import * from Base.Class.Database import * from Base.Class.Logger import * log = Logger(name='test', level='DEBUG') class TestCase: _g = {} _l = {} def __init__( self, workbook, envs, level_list: list = None, workbook_config: dict = None, testcase: str = None, testdata: str = None, database: str = None, httpconf: str = None ): if workbook_config is None: workbook_config = {} tabs = self._merge_dict( { 'HTTPConf': { 'sheet': httpconf }, 'DataBase': { 'sheet': database }, 'TestData': { 'sheet': testdata }, 'TestCase': { 'sheet': testcase } } , workbook_config) # tabs = { # 'HTTPConf': { # 'sheet': '数据配置' # }, # 'DataBase': { # 'sheet': '数据配置' # }, # 'TestData': { # 'sheet': '数据配置' # }, # 'TestCase': { # 'sheet': '测试用例' # } # } import os from pandas.core.frame import DataFrame init = Excel().open(workbook, read_only=True) file = os.path.splitext(workbook)[0] + '.json' conf = self._merge_dict((os.path.exists(file) and auto_decode(open(file, 'r', encoding='utf-8').read())) or {}, tabs) # print(json_encode(conf, indent=4, unicode=False)) # exit() data = {} for k, v in conf.items(): sheet_name = ('sheet' in v and v['sheet'] or v['default_sheet']) init.select(sheet_name) data[k] = {} if 'fixed' in v.keys(): data[k]['fixed'] = {} for key, value in v['fixed'].items(): data[k]['fixed'][key] = init.cellGet(cell=value) if 'views' in v.keys() and 'field' in v.keys(): data[k]['views'] = DataFrame(read_view_dict( filename=workbook, sheet=sheet_name, area=v['views'], fields=v['field'], auto_truncate=True)) else: init.exit() # print(json_encode(data['DataBase'], indent=4, unicode=False)) # exit() self.envs = envs self.case = testcase self.http = Request self.dirs = os.path.dirname(workbook) self.i = data['TestCase']['fixed']['ItemName'] self.m = data['TestCase']['fixed']['SessMode'] self.h = data['HTTPConf']['views'] self.b = data['DataBase']['views'] self.d = data['TestData']['views'] self.c = data['TestCase']['views'] self._setLevels(level_list) self._setRequestMode(self.m) def _setRequestMode(self, value=None): self.http = [Request, Session][value in ['会话模式', 'Session', 'session']]() return True def _setLevels(self, level_list=None): if isinstance(level_list, (list, tuple, str)): self.leve_list = level_list @staticmethod def _matchCaseType(value=None): match value: case '正常用例': return 10 case '前置用例': return 20 case _: return 50 @staticmethod def _matchParmType(value=None): match value: case 'Json': return 10 case 'Form': return 20 case _: return 50 def _matchLevelRun(self, value=None): if self.leve_list is None: return True return value in self.leve_list def _merge_dict(self, dict_ori, dict_add): if isinstance(dict_ori, dict) and isinstance(dict_add, dict): for k, v in dict_ori.items(): if k in dict_add.keys(): if isinstance(v, dict): dict_ori[k] = self._merge_dict(dict_ori[k], dict_add[k]) else: dict_ori[k] = dict_add[k] for k, v in dict_add.items(): if k not in dict_ori.keys(): dict_ori[k] = dict_add[k] return dict_ori @staticmethod def _sub_variable(text='', vars_dict=None): if not isinstance(text, str): return text for variable_name in re.findall('\${(.*?)}', text): try: value = vars_dict[variable_name] except: value = '' text = text.replace('${%s}' % variable_name, str(value)) return text def _sub_variable_auto(self, data=None, vars_dict=None): if isinstance(vars_dict, list): d = {} for v in vars_dict: d.update(v) vars_dict = d if isinstance(data, list): for i in range(len(data)): if not isinstance(data[i], str): continue data[i] = self._sub_variable(text=data[i], vars_dict=vars_dict) return data if isinstance(data, dict): for k in data.keys(): if not isinstance(data[k], str): continue data[k] = self._sub_variable(text=data[k], vars_dict=vars_dict) return data if isinstance(data, str): return self._sub_variable(data, vars_dict=vars_dict) return data def test_unit(self, data): # 开始测试 log.d('正在执行:%s' % data['CaseTitle']) # 请求构造 HTTPCh = self.h.where((self.h['Env'] == self.envs) & (self.h['Name'] == data['HTTPCh']), inplace=False).dropna(how='all').loc[0].to_dict() resKwArgs = { 'debug': True, 'method': data['HTTPMethod'], 'url': self._sub_variable_auto((HTTPCh['HostWithScheme'] or '') + (data['HTTPUri'] or '/'), [self._g, self._l]), 'query': self._sub_variable_auto(auto_decode(data['HTTPQuery']), [self._g, self._l]), 'header': self._sub_variable_auto(auto_decode(data['HTTPHeaders']), [self._g, self._l]), 'cookie': self._sub_variable_auto(auto_decode(data['HTTPCookies']), [self._g, self._l]), 'auto_redirect': data['HTTPRedirect'] in ['是', 'True', 'true'] } match self._matchParmType(data['HTTPParamType']): case 10: resKwArgs['data_json'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent'])) case 20: resKwArgs['data'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent'])) case _: resKwArgs['data'] = self._sub_variable_auto(data['HTTPParamContent']) HTTPParamOfFile = self._sub_variable_auto(auto_decode(data['HTTPParamOfFile'])) if isinstance(HTTPParamOfFile, dict): resKwArgs['data_file'] = {k: open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb') for k, v in HTTPParamOfFile.items()} # for k, v in HTTPParamOfFile: # HTTPParamOfFile[k] = open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb') # resKwArgs['data_file'] = HTTPParamOfFile res = self.http.http(**resKwArgs) log.d('\n%s' % (res['brief'] or '')) # print(res) # print(HTTPCh) def test(self, index): # print(view_case.loc[index]) main_case = self.c.loc[index].to_dict() self.test_unit(data=main_case) def main(self): class ClassTestCase(unittest.TestCase): pass ClassTestCase.__qualname__ = self.case ClassTestCase._test_ = self.test serial = 0 for i in range(len(self.c)): thisCase = self.c.loc[i].to_dict() if not self._matchCaseType(thisCase['Flag']) == 10 and self._matchLevelRun(thisCase['CaseLevel']): continue serial += 1 def foo(self, index=i): return self._test_(index) foo.__doc__ = thisCase['CaseTitle'] exec('ClassTestCase.test%s_%s = foo' % (str('%04d' % serial), str(thisCase['CaseId']))) return ClassTestCase if __name__ == '__main__': test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(testCaseClass=TestCase( workbook='D:/Desktop/接口自动化测试用例.xlsx', envs='测试环境', level_list=['P0'], testcase='测试用例' ).main(), prefix='test')) HTMLTestRunner(verbosity=5, report=os.path.abspath(os.path.join(os.path.dirname(__file__), './1.html'))).run( test_suite)