239 lines
8.8 KiB
Python
239 lines
8.8 KiB
Python
import unittest
|
|
from Business.Class.HTMLTestRunner import HTMLTestRunner
|
|
from Business.Class.ExcelUtils import *
|
|
from Business.Class.JsonOrYaml import *
|
|
from Base.Class.Http import *
|
|
from Base.Class.Database import *
|
|
from Base.Class.Logger import *
|
|
log = Logger(name='test', level='DEBUG')
|
|
|
|
class TestCase:
|
|
_g = {}
|
|
_l = {}
|
|
|
|
def __init__(
|
|
self,
|
|
workbook,
|
|
envs,
|
|
level_list: list = None,
|
|
workbook_config: dict = None,
|
|
testcase: str = None,
|
|
testdata: str = None,
|
|
database: str = None,
|
|
httpconf: str = None
|
|
):
|
|
if workbook_config is None: workbook_config = {}
|
|
tabs = self._merge_dict(
|
|
{
|
|
'HTTPConf': {
|
|
'sheet': httpconf
|
|
},
|
|
'DataBase': {
|
|
'sheet': database
|
|
},
|
|
'TestData': {
|
|
'sheet': testdata
|
|
},
|
|
'TestCase': {
|
|
'sheet': testcase
|
|
}
|
|
}
|
|
, workbook_config)
|
|
# tabs = {
|
|
# 'HTTPConf': {
|
|
# 'sheet': '数据配置'
|
|
# },
|
|
# 'DataBase': {
|
|
# 'sheet': '数据配置'
|
|
# },
|
|
# 'TestData': {
|
|
# 'sheet': '数据配置'
|
|
# },
|
|
# 'TestCase': {
|
|
# 'sheet': '测试用例'
|
|
# }
|
|
# }
|
|
import os
|
|
from pandas.core.frame import DataFrame
|
|
init = Excel().open(workbook, read_only=True)
|
|
file = os.path.splitext(workbook)[0] + '.json'
|
|
conf = self._merge_dict((os.path.exists(file) and auto_decode(open(file, 'r', encoding='utf-8').read())) or {}, tabs)
|
|
# print(json_encode(conf, indent=4, unicode=False))
|
|
# exit()
|
|
data = {}
|
|
for k, v in conf.items():
|
|
sheet_name = ('sheet' in v and v['sheet'] or v['default_sheet'])
|
|
init.select(sheet_name)
|
|
data[k] = {}
|
|
if 'fixed' in v.keys():
|
|
data[k]['fixed'] = {}
|
|
for key, value in v['fixed'].items():
|
|
data[k]['fixed'][key] = init.cellGet(cell=value)
|
|
if 'views' in v.keys() and 'field' in v.keys():
|
|
data[k]['views'] = DataFrame(read_view_dict(
|
|
filename=workbook, sheet=sheet_name, area=v['views'], fields=v['field'], auto_truncate=True))
|
|
else:
|
|
init.exit()
|
|
# print(json_encode(data['DataBase'], indent=4, unicode=False))
|
|
# exit()
|
|
|
|
self.envs = envs
|
|
self.case = testcase
|
|
self.http = Request
|
|
self.dirs = os.path.dirname(workbook)
|
|
|
|
self.i = data['TestCase']['fixed']['ItemName']
|
|
self.m = data['TestCase']['fixed']['SessMode']
|
|
self.h = data['HTTPConf']['views']
|
|
self.b = data['DataBase']['views']
|
|
self.d = data['TestData']['views']
|
|
self.c = data['TestCase']['views']
|
|
|
|
self._setLevels(level_list)
|
|
self._setRequestMode(self.m)
|
|
|
|
|
|
def _setRequestMode(self, value=None):
|
|
self.http = [Request, Session][value in ['会话模式', 'Session', 'session']]()
|
|
return True
|
|
|
|
def _setLevels(self, level_list=None):
|
|
if isinstance(level_list, (list, tuple, str)):
|
|
self.leve_list = level_list
|
|
|
|
@staticmethod
|
|
def _matchCaseType(value=None):
|
|
match value:
|
|
case '正常用例':
|
|
return 10
|
|
case '前置用例':
|
|
return 20
|
|
case _:
|
|
return 50
|
|
|
|
@staticmethod
|
|
def _matchParmType(value=None):
|
|
match value:
|
|
case 'Json':
|
|
return 10
|
|
case 'Form':
|
|
return 20
|
|
case _:
|
|
return 50
|
|
|
|
def _matchLevelRun(self, value=None):
|
|
if self.leve_list is None:
|
|
return True
|
|
return value in self.leve_list
|
|
|
|
def _merge_dict(self, dict_ori, dict_add):
|
|
if isinstance(dict_ori, dict) and isinstance(dict_add, dict):
|
|
for k, v in dict_ori.items():
|
|
if k in dict_add.keys():
|
|
if isinstance(v, dict):
|
|
dict_ori[k] = self._merge_dict(dict_ori[k], dict_add[k])
|
|
else:
|
|
dict_ori[k] = dict_add[k]
|
|
for k, v in dict_add.items():
|
|
if k not in dict_ori.keys():
|
|
dict_ori[k] = dict_add[k]
|
|
return dict_ori
|
|
|
|
@staticmethod
|
|
def _sub_variable(text='', vars_dict=None):
|
|
if not isinstance(text, str):
|
|
return text
|
|
for variable_name in re.findall('\${(.*?)}', text):
|
|
try:
|
|
value = vars_dict[variable_name]
|
|
except:
|
|
value = ''
|
|
text = text.replace('${%s}' % variable_name, str(value))
|
|
return text
|
|
|
|
def _sub_variable_auto(self, data=None, vars_dict=None):
|
|
if isinstance(vars_dict, list):
|
|
d = {}
|
|
for v in vars_dict:
|
|
d.update(v)
|
|
vars_dict = d
|
|
if isinstance(data, list):
|
|
for i in range(len(data)):
|
|
if not isinstance(data[i], str):
|
|
continue
|
|
data[i] = self._sub_variable(text=data[i], vars_dict=vars_dict)
|
|
return data
|
|
if isinstance(data, dict):
|
|
for k in data.keys():
|
|
if not isinstance(data[k], str):
|
|
continue
|
|
data[k] = self._sub_variable(text=data[k], vars_dict=vars_dict)
|
|
return data
|
|
if isinstance(data, str): return self._sub_variable(data, vars_dict=vars_dict)
|
|
return data
|
|
|
|
def test_unit(self, data):
|
|
# 开始测试
|
|
log.d('正在执行:%s' % data['CaseTitle'])
|
|
# 请求构造
|
|
HTTPCh = self.h.where((self.h['Env'] == self.envs) & (self.h['Name'] == data['HTTPCh']), inplace=False).dropna(how='all').loc[0].to_dict()
|
|
resKwArgs = {
|
|
'debug': True,
|
|
'method': data['HTTPMethod'],
|
|
'url': self._sub_variable_auto((HTTPCh['HostWithScheme'] or '') + (data['HTTPUri'] or '/'), [self._g, self._l]),
|
|
'query': self._sub_variable_auto(auto_decode(data['HTTPQuery']), [self._g, self._l]),
|
|
'header': self._sub_variable_auto(auto_decode(data['HTTPHeaders']), [self._g, self._l]),
|
|
'cookie': self._sub_variable_auto(auto_decode(data['HTTPCookies']), [self._g, self._l]),
|
|
'auto_redirect': data['HTTPRedirect'] in ['是', 'True', 'true']
|
|
}
|
|
match self._matchParmType(data['HTTPParamType']):
|
|
case 10:
|
|
resKwArgs['data_json'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent']))
|
|
case 20:
|
|
resKwArgs['data'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent']))
|
|
case _:
|
|
resKwArgs['data'] = self._sub_variable_auto(data['HTTPParamContent'])
|
|
HTTPParamOfFile = self._sub_variable_auto(auto_decode(data['HTTPParamOfFile']))
|
|
if isinstance(HTTPParamOfFile, dict):
|
|
resKwArgs['data_file'] = {k: open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb') for k, v in HTTPParamOfFile.items()}
|
|
# for k, v in HTTPParamOfFile:
|
|
# HTTPParamOfFile[k] = open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb')
|
|
# resKwArgs['data_file'] = HTTPParamOfFile
|
|
|
|
res = self.http.http(**resKwArgs)
|
|
log.d('\n%s' % (res['brief'] or ''))
|
|
# print(res)
|
|
# print(HTTPCh)
|
|
|
|
def test(self, index):
|
|
# print(view_case.loc[index])
|
|
main_case = self.c.loc[index].to_dict()
|
|
self.test_unit(data=main_case)
|
|
|
|
def main(self):
|
|
class ClassTestCase(unittest.TestCase):
|
|
pass
|
|
ClassTestCase.__qualname__ = self.case
|
|
ClassTestCase._test_ = self.test
|
|
serial = 0
|
|
for i in range(len(self.c)):
|
|
thisCase = self.c.loc[i].to_dict()
|
|
if not self._matchCaseType(thisCase['Flag']) == 10 and self._matchLevelRun(thisCase['CaseLevel']): continue
|
|
serial += 1
|
|
def foo(self, index=i): return self._test_(index)
|
|
foo.__doc__ = thisCase['CaseTitle']
|
|
exec('ClassTestCase.test%s_%s = foo' % (str('%04d' % serial), str(thisCase['CaseId'])))
|
|
return ClassTestCase
|
|
|
|
|
|
if __name__ == '__main__':
|
|
test_suite = unittest.TestSuite()
|
|
test_suite.addTest(unittest.makeSuite(testCaseClass=TestCase(
|
|
workbook='D:/Desktop/接口自动化测试用例.xlsx',
|
|
envs='测试环境',
|
|
level_list=['P0'],
|
|
testcase='测试用例'
|
|
).main(), prefix='test'))
|
|
HTMLTestRunner(verbosity=5, report=os.path.abspath(os.path.join(os.path.dirname(__file__), './1.html'))).run(
|
|
test_suite)
|