import unittest from typing import Any, Text, Dict, List from Business.Class.HTMLTestRunner import HTMLTestRunner from Business.Class.ExcelUtils import * from Business.Class.JsonOrYaml import * from Base.Class.Http import * from Base.Class.Database import * from Base.Class.Logger import * log = Logger(name='test', level='DEBUG') class TestCase: _g = {} _l = {} def __init__( self, workbook: Text, testlves: List, testcase: Text, testdata: Text, database: Text, httpconf: Text ): tabs = { "HTTPConf": { "fixed": {}, "views": "A4:C*", "field": [ "Name", "HostWithScheme", "DefaultHeader" ] }, "DataBase": { "fixed": {}, "views": "E4:L*", "field": [ "Name", "Type", "Host", "Port", "User", "Password", "Database", "Charset" ] }, "TestData": { "fixed": {}, "views": "A4:E*", "field": [ "Name", "Delimiter", "Data", "FileData", "VarNameList" ] }, "TestCase": { "fixed": { "ItemName": "B3", "SessMode": "E3" }, "views": "A7:AG*", "field": [ "Flag", "PreExecCase", "PreExecRule", "Module", "CaseId", "CaseTitle", "CaseDesc", "CaseLevel", "HTTPChannel", "HTTPMethod", "HTTPUri", "HTTPQuery", "HTTPRedirect", "HTTPHeaders", "HTTPCookies", "HTTPParamType", "HTTPParamContent", "HTTPParamOfFile", "HTTPExtract", "HTTPAssertStatus", "HTTPAssertHeaders", "HTTPAssertCookies", "HTTPAssertBody", "HTTPAssertJson", "DatabaseCh", "DatabaseSqlQuery", "DatabaseExtract", "DatabaseAssertData", "DatabaseAssertCount", "DataSet", "Author", "TestTime", "TestResult" ] } } import os from pandas.core.frame import DataFrame init = Excel().open(workbook, read_only=True) data = {} for k, v in tabs.items(): match k: case 'TestCase': sheet = testcase case 'TestData': sheet = testdata case 'DataBase': sheet = database case 'HTTPConf': sheet = httpconf case _: continue init.select(sheet) data[k] = {} if 'fixed' in v.keys(): data[k]['fixed'] = {} for key, value in v['fixed'].items(): data[k]['fixed'][key] = init.cellGet(cell=value) if 'views' in v.keys() and 'field' in v.keys(): data[k]['views'] = DataFrame(read_view_dict( filename=workbook, sheet=sheet, area=v['views'], fields=v['field'], auto_truncate=True)) else: init.exit() self.case = testcase self.http = Request self.dirs = os.path.dirname(workbook) self.i = data['TestCase']['fixed']['ItemName'] self.m = data['TestCase']['fixed']['SessMode'] self.h = data['HTTPConf']['views'] self.b = data['DataBase']['views'] self.d = data['TestData']['views'] self.c = data['TestCase']['views'] self._setLevels(testlves) self._setRequestMode(self.m) def _setRequestMode(self, value=None): self.http = [Request, Session][value in ['会话模式', 'Session', 'session']]() def _setLevels(self, level_list=None): if isinstance(level_list, (list, tuple, str)): self.leve_list = level_list @staticmethod def _matchCaseType(value=None): match value: case '正常用例': return 10 case '前置用例': return 20 case _: return 50 @staticmethod def _matchParmType(value=None): match value: case 'Json': return 10 case 'Form': return 20 case _: return 50 def _matchLevelRun(self, value=None): if self.leve_list is None: return True return value in self.leve_list def _merge_dict(self, dict_ori, dict_add): if isinstance(dict_ori, dict) and isinstance(dict_add, dict): for k, v in dict_ori.items(): if k in dict_add.keys(): if isinstance(v, dict): dict_ori[k] = self._merge_dict(dict_ori[k], dict_add[k]) else: dict_ori[k] = dict_add[k] for k, v in dict_add.items(): if k not in dict_ori.keys(): dict_ori[k] = dict_add[k] return dict_ori @staticmethod def _sub_variable(text='', vars_dict=None): if not isinstance(text, str): return text for variable_name in re.findall('\${(.*?)}', text): try: value = vars_dict[variable_name] except: value = '' text = text.replace('${%s}' % variable_name, str(value)) return text def _sub_variable_auto(self, data=None, vars_dict=None): if isinstance(vars_dict, list): d = {} for v in vars_dict: d.update(v) vars_dict = d if isinstance(data, list): for i in range(len(data)): if not isinstance(data[i], str): continue data[i] = self._sub_variable(text=data[i], vars_dict=vars_dict) return data if isinstance(data, dict): for k in data.keys(): if not isinstance(data[k], str): continue data[k] = self._sub_variable(text=data[k], vars_dict=vars_dict) return data if isinstance(data, str): return self._sub_variable(data, vars_dict=vars_dict) return data def test_unit(self, data): # 开始测试 log.d('正在执行:%s' % data['CaseTitle']) # 请求构造 locals().setdefault('HTTPChannel', self.h.where((self.h['Name'] == data['HTTPChannel']), inplace=False).dropna(how='all').loc[0].to_dict()) resKwArgs = { 'method': data['HTTPMethod'] or 'GET', 'url': self._sub_variable_auto((locals().get('HTTPChannel')['HostWithScheme'] or '') + (data['HTTPUri'] or '/'), [self._g, self._l]), 'query': self._sub_variable_auto(auto_decode(data['HTTPQuery']), [self._g, self._l]), 'data': None, 'data_json': None, 'data_file': None, 'header': {}, 'cookie': self._sub_variable_auto(auto_decode(data['HTTPCookies']), [self._g, self._l]), 'auth': None, 'timeout': 60, 'proxy': None, 'auto_redirect': data['HTTPRedirect'] in ['是', 'True', 'true'], 'ignore_cert_error': False, 'debug': True } resKwArgs['header'].update(auto_decode(locals().get('HTTPChannel')['DefaultHeader']) or {}) resKwArgs['header'].update(self._sub_variable_auto(auto_decode(data['HTTPHeaders']), [self._g, self._l]) or {}) match self._matchParmType(data['HTTPParamType']): case 10: resKwArgs['data_json'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent'])) case 20: resKwArgs['data'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent'])) case _: resKwArgs['data'] = self._sub_variable_auto(data['HTTPParamContent']) # httpParamOfFile = self._sub_variable_auto(auto_decode(data['HTTPParamOfFile'])) resKwArgs['data_file'] = locals().setdefault('httpParamOfFile', self._sub_variable_auto(auto_decode( data['HTTPParamOfFile']))) and {k: open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb') for k, v in locals().get('httpParamOfFile').items()} res = self.http.http(**resKwArgs) log.d('\n%s' % (res['brief'] or '')) def test(self, index): # print(view_case.loc[index]) main_case = self.c.loc[index].to_dict() self.test_unit(data=main_case) def main(self): class ClassTestCase(unittest.TestCase): pass ClassTestCase.__qualname__ = self.case ClassTestCase._test_ = self.test serial = 0 for i in range(len(self.c)): thisCase = self.c.loc[i].to_dict() if not self._matchCaseType(thisCase['Flag']) == 10 and self._matchLevelRun(thisCase['CaseLevel']): continue serial += 1 def foo(self, index=i): return self._test_(index) foo.__doc__ = thisCase['CaseTitle'] exec('ClassTestCase.test%s_%s = foo' % (str('%04d' % serial), str(thisCase['CaseId']))) return ClassTestCase if __name__ == '__main__': test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(testCaseClass=TestCase( workbook='D:/Desktop/接口自动化测试用例.xlsx', testlves=['P0'], testcase='测试用例', testdata='测试数据', httpconf='请求配置', database='请求配置' ).main(), prefix='test')) HTMLTestRunner(verbosity=5, report=os.path.abspath(os.path.join(os.path.dirname(__file__), './1.html'))).run( test_suite)