# AutoFramework/Runner/API/DefaultRunner.py
#
# 430 lines
# 15 KiB
# Python

import sys
import unittest
from typing import Any, Text, Dict, List
from Business.Class.HTMLTestRunner import HTMLTestRunner
from Business.Class.ExcelUtils import *
from Business.Class.JsonOrYaml import *
from Base.Class.Http import *
from Base.Class.Database import *
from Base.Class.Logger import *
def reparse(regexp, string):
    """Run a regex query (or the special "trim" directive) against *string*.

    Args:
        regexp: Either a pattern, or a ``(pattern, flag_name)`` tuple where
            ``flag_name`` is the name of an attribute of the :mod:`re` module
            (looked up upper-cased, e.g. ``"i"`` -> ``re.I``). A one-element
            tuple is treated as "no flags". The patterns ``""`` and
            ``"trim"`` mean "strip surrounding whitespace".
        string: The text to search.

    Returns:
        ``ReText`` holding the stripped text for the trim directive,
        otherwise a ``ReList`` with the ``re.findall`` results.

    Raises:
        AttributeError: if ``flag_name`` is not an attribute of :mod:`re`.
        re.error: if the pattern does not compile.
    """
    import re

    # The previous "[a, b][cond]" selector evaluated both alternatives, so a
    # plain string pattern was still indexed and its second character was
    # eval'ed as a flag name; most ordinary patterns raised before matching.
    if isinstance(regexp, tuple):
        pattern = str(regexp[0])
        flags = getattr(re, str(regexp[1]).upper()) if len(regexp) > 1 else 0
    else:
        pattern = str(regexp)
        flags = 0

    if pattern in ('', 'trim'):
        return ReText(string.strip())
    return ReList(re.findall(pattern, string, flags))
class ReText(str):
    """A ``str`` whose subscript operator doubles as a regex query.

    * ``text[int]`` and ``text[slice]`` behave like ``str`` indexing but
      return a ``ReText`` again.
    * Any other subscript is handed to ``reparse`` together with the text.
    """

    def __getitem__(self, item):
        # Slices used to fall through to the regex branch and raise; treat
        # them as ordinary string slicing instead.
        if isinstance(item, (int, slice)):
            return __class__(super().__getitem__(item))
        return reparse(item, str(self))
class ReList(list):
    """A ``list`` with forgiving indexing and regex-query subscripts.

    * ``lst[int]`` returns the element wrapped as ``ReList`` / ``ReDict`` /
      ``ReText`` depending on its type; an invalid index yields an empty
      ``ReList`` instead of raising.
    * ``lst[slice]`` returns the sliced elements as a ``ReList``.
    * Any other subscript is handed to ``reparse`` together with
      ``str(self)``; failures yield an empty ``ReText``.
    * ``str(lst)`` is the text of the first element, or ``''`` when empty.
    """

    def __getitem__(self, item):
        if isinstance(item, slice):
            return __class__(super().__getitem__(item))
        if isinstance(item, int):
            try:
                value = super().__getitem__(item)
                if isinstance(value, (list, tuple, set)):
                    return __class__(value)
                if isinstance(value, dict):
                    return ReDict(value)
                return ReText(value)
            except Exception:
                # Best effort by design: a bad index (or an element that
                # cannot be wrapped) degrades to an empty list.
                return __class__()
        try:
            return reparse(item, str(self))
        except Exception:
            return ReText('')

    def __str__(self):
        # Without this guard an empty list recursed through self[0] ->
        # ReList() -> str(...) until RecursionError was swallowed.
        if not self:
            return ReText('')
        try:
            return ReText(self[0])
        except Exception:
            return ReText('')
class ReDict(dict):
    """A ``dict`` with attribute access, wrapped values and regex subscripts.

    * ``d[key]`` / ``d.key`` for an existing key returns the value wrapped as
      ``ReDict`` / ``ReList`` / ``ReText`` depending on its type.
    * A subscript that is not a key is handed to ``reparse`` together with
      ``str(self)``; failures yield an empty ``ReText``.
    * ``str(d)`` is the value stored under ``'_'`` when present, otherwise
      the result of ``json_encode``.
    * ``update`` returns ``self`` so calls can be chained.
    """

    def __getattr__(self, item):
        # Dunder probes (hasattr / copy / pickle protocol lookups) must see
        # AttributeError; answering them with a value breaks those protocols.
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        return self.__getitem__(item)

    def __getitem__(self, item):
        if item in self:
            value = super().__getitem__(item)
            if isinstance(value, dict):
                return __class__(value)
            if isinstance(value, (list, tuple, set)):
                return ReList(value)
            return ReText(value)
        try:
            return reparse(item, str(self))
        except Exception:
            # Best effort by design: an unusable query yields empty text.
            return ReText('')

    def __str__(self):
        try:
            if '_' in self:
                return ReText(self.__getitem__('_'))
            return json_encode(self, indent=None, unicode=False)
        except Exception:
            return ReText('')

    def update(self, *args, **kwargs):
        """Update in place like ``dict.update`` and return ``self``."""
        super().update(*args, **kwargs)
        return self
class TestCase:
_g = {}
_l = {}
def __init__(
self,
workbook: Text,
testlves: List,
testcase: Text,
testdata: Text,
database: Text,
httpconf: Text
):
tabs = {
"HTTPConf": {
"fixed": {},
"views": "A4:C*",
"field": [
"Name",
"HostWithScheme",
"DefaultHeader"
]
},
"DataBase": {
"fixed": {},
"views": "E4:L*",
"field": [
"Name",
"Type",
"Host",
"Port",
"User",
"Password",
"Database",
"Charset"
]
},
"TestData": {
"fixed": {},
"views": "A4:E*",
"field": [
"Name",
"Delimiter",
"Data",
"FileData",
"VarNameList"
]
},
"TestCase": {
"fixed": {
"ItemName": "B3",
"SessMode": "E3"
},
"views": "A7:AH*",
"field": [
"Flag",
"PreExecCase",
"PreExecRule",
"Module",
"CaseId",
"CaseTitle",
"CaseDesc",
"CaseLevel",
"HTTPChannel",
"HTTPMethod",
"HTTPUri",
"HTTPQuery",
"HTTPRedirect",
"HTTPHeader",
"HTTPCookie",
"HTTPParamType",
"HTTPParamContent",
"HTTPParamOfFile",
"HTTPExtract",
"HTTPAssertStatus",
"HTTPAssertReason",
"HTTPAssertHeader",
"HTTPAssertCookie",
"HTTPAssertBody",
"HTTPAssertJson",
"DatabaseChannel",
"DatabaseSqlQuery",
"DatabaseExtract",
"DatabaseAssertData",
"DatabaseAssertCount",
"DataSet",
"Author",
"TestTime",
"TestResult"
]
}
}
import os
from pandas.core.frame import DataFrame
init = Excel().open(workbook, read_only=True)
data = {}
for k, v in tabs.items():
match k:
case 'TestCase':
sheet = testcase
case 'TestData':
sheet = testdata
case 'DataBase':
sheet = database
case 'HTTPConf':
sheet = httpconf
case _:
continue
init.select(sheet)
data[k] = {}
if 'fixed' in v.keys():
data[k]['fixed'] = {}
for key, value in v['fixed'].items():
data[k]['fixed'][key] = init.cellGet(cell=value)
if 'views' in v.keys() and 'field' in v.keys():
data[k]['views'] = DataFrame(read_view_dict(
filename=workbook, sheet=sheet, area=v['views'], fields=v['field'], auto_truncate=True))
else:
init.exit()
self.case = testcase
self.http = Request
self.dirs = os.path.dirname(workbook)
self.i = data['TestCase']['fixed']['ItemName']
self.m = data['TestCase']['fixed']['SessMode']
self.h = data['HTTPConf']['views']
self.b = data['DataBase']['views']
self.d = data['TestData']['views']
self.c = data['TestCase']['views']
self._set_levels(testlves)
self._set_request_mode(self.m)
def _set_request_mode(self, value=None):
self.http = [Request, Session][self._match_mode(value)]()
def _set_levels(self, level_list=None):
if isinstance(level_list, (list, tuple, str)):
self.leve_list = level_list
@staticmethod
def _match_case_type(value=None):
match value:
case '正常用例':
return 10
case '前置用例':
return 20
case _:
return 50
@staticmethod
def _match_parm_type(value=None):
match value:
case 'Json':
return 10
case 'Form':
return 20
case _:
return 50
@staticmethod
def _match_bool(value=None):
return value in ['', '开启', '打开', 'True', 'true', 'TRUE', True, 'Yes', 'yes', 'YES', 'Y', 'y', '1', 1, 1.0]
@staticmethod
def _match_mode(value=None):
return value in ['会话模式', 'Session', 'session']
def _match_leve_run(self, value=None):
if self.leve_list is None:
return True
return value in self.leve_list
def _merge_dict(self, dict_ori, dict_add):
if isinstance(dict_ori, dict) and isinstance(dict_add, dict):
for k, v in dict_ori.items():
if k in dict_add.keys():
if isinstance(v, dict):
dict_ori[k] = self._merge_dict(dict_ori[k], dict_add[k])
else:
dict_ori[k] = dict_add[k]
for k, v in dict_add.items():
if k not in dict_ori.keys():
dict_ori[k] = dict_add[k]
return dict_ori
@staticmethod
def _sub_variable(text='', vars_dict=None):
if not isinstance(text, str):
return text
for variable_name in re.findall('\${(.*?)}', text):
try:
value = vars_dict[variable_name]
except:
value = ''
text = text.replace('${%s}' % variable_name, str(value))
return text
def _sub_variable_auto(self, data=None, vars_dict=None):
if isinstance(vars_dict, list):
d = {}
for v in vars_dict:
d.update(v)
vars_dict = d
if isinstance(data, list):
for i in range(len(data)):
if not isinstance(data[i], str):
continue
data[i] = self._sub_variable(text=data[i], vars_dict=vars_dict)
return data
if isinstance(data, dict):
for k in data.keys():
if not isinstance(data[k], str):
continue
data[k] = self._sub_variable(text=data[k], vars_dict=vars_dict)
return data
if isinstance(data, str):
return self._sub_variable(data, vars_dict=vars_dict)
return data
@staticmethod
def _to_string(text):
import math
if text is None:
return ''
try:
if math.isnan(text):
return ''
except:
pass
if isinstance(text, float) and math.modf(text)[0] == 0:
return str(int(text))
return str(text)
def test_unit(self, data):
http = self.h.where((self.h['Name'] == data['HTTPChannel']), inplace=False).dropna(how='all').reset_index(drop=True, inplace=False).loc[0].to_dict()
res_kwargs = {
'method': data['HTTPMethod'],
'url': self._sub_variable_auto((http['HostWithScheme'] or '') + (data['HTTPUri'] or '/'), [self._g, self._l]),
'query': self._sub_variable_auto(auto_decode(data['HTTPQuery']), [self._g, self._l]),
'data': None,
'json': None,
'file': None,
'header': {},
'cookie': self._sub_variable_auto(
auto_decode(data['HTTPCookie']),
[self._g, self._l]
),
'auth': None,
'timeout': 60,
'proxy': None,
'auto_redirect': self._match_bool(data['HTTPRedirect']),
'ignore_cert_error': False,
'debug': True
}
res_kwargs['header'].update((auto_decode(http['DefaultHeader']) or {}))
res_kwargs['header'].update((self._sub_variable_auto(auto_decode(data['HTTPHeader']), [self._g, self._l]) or {}))
match self._match_parm_type(data['HTTPParamType']):
case 10:
res_kwargs['json'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent']))
case 20:
res_kwargs['data'] = self._sub_variable_auto(auto_decode(data['HTTPParamContent']))
case _:
res_kwargs['data'] = self._sub_variable_auto(data['HTTPParamContent'])
res_kwargs['file'] = locals().setdefault('f', self._sub_variable_auto(auto_decode(
data['HTTPParamOfFile']))) and {k: open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb') for k, v in locals().get('f').items()}
res = self.http.http(**res_kwargs)
for var_name, var_value in {
'_Status': ReText(res['status'] or ''),
'_Reason': ReText(res['reason'] or ''),
'_Header': ReDict(res['header'] or {}).update({'_': yaml_encode(dict(res['header'] or {}))}),
'_Cookie': ReDict(res['cookie'] or {}).update({'_': '; '.join([k + '=' + (v or '') for k, v in dict(res['cookie'] or {}).items()])}),
'_Body': ReText(res['text'] or ''),
'_Json': ReDict(res['json'] or {}),
json_encode(None): None,
json_encode(bool(0)): bool(0),
json_encode(bool(1)): bool(1)
}.items():
locals().setdefault(var_name, var_value)
for k, v in (auto_decode(data['HTTPExtract']) or {}).items():
d = re.findall('^([0-9A-Za-z_]+).*?', v)[0]
self._g[str(k)] = str(eval('_' + v.replace(d, d.title(), 1)))
expect = self._to_string(data['HTTPAssertStatus'])
if expect:
opers = [" == ", " != ", " >= ", " <= ", " > ", " < ", " in ", " not in "]
opers_flag = 0
for v in opers:
if v in expect:
opers_flag = 1
break
if opers_flag:
contrast = list(filter(lambda x: x, expect.split("\n")))
for contr in contrast:
exec('assert ' + self._sub_variable_auto(contr).replace('$', str(res['status'])))
else:
actual = str(res['status'])
try:
assert expect == actual
except AssertionError as e:
raise AssertionError(str(expect) + ' == ' + str(actual))
# sys.stderr.write()
def test(self, index):
# print(view_case.loc[index])
main_case = dict(self.c.loc[index].to_dict())
self.test_unit(data=main_case)
def main(self):
class ClassTestCase(unittest.TestCase):
pass
ClassTestCase.__qualname__ = self.case
ClassTestCase._test_ = self.test
serial = 0
for i in range(len(self.c)):
thisCase = self.c.loc[i].to_dict()
if not self._match_case_type(thisCase['Flag']) == 10 and self._match_leve_run(thisCase['CaseLevel']): continue
serial += 1
def foo(self, index=i):
return self._test_(index)
foo.__doc__ = thisCase['CaseTitle']
exec('ClassTestCase.test%s_%s = foo' % (str('%04d' % serial), str(thisCase['CaseId'])))
return ClassTestCase
if __name__ == '__main__':
    # Script entry: build the generated test class from the workbook and run
    # it through HTMLTestRunner, writing the report next to this file.
    import os

    # NOTE(review): workbook path and sheet names are hard-coded here.
    case_class = TestCase(
        workbook='D:/Desktop/接口自动化测试用例.xlsx',
        testlves=['P0'],
        testcase='测试用例',
        testdata='测试数据',
        httpconf='请求配置',
        database='请求配置'
    ).main()
    test_suite = unittest.TestSuite()
    # unittest.makeSuite was deprecated in Python 3.11 and removed in 3.13;
    # the default loader collects the same "test"-prefixed methods.
    test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(case_class))
    report_path = os.path.abspath(os.path.join(os.path.dirname(__file__), './1.html'))
    HTMLTestRunner(verbosity=5, report=report_path).run(test_suite)