diff --git a/Base/Class/Database.py b/Base/Class/Database.py
index b284234..98de74b 100644
--- a/Base/Class/Database.py
+++ b/Base/Class/Database.py
@@ -1,9 +1,95 @@
-import re, pymysql.cursors
+import pymysql.cursors
+
+
+def char_width(string):
+ iChineseChar = 0
+ iEnglishChar = 0
+ for char in string:
+ if len(char.encode("utf-8")) > 2:
+ iChineseChar += 1
+ else:
+ iEnglishChar += 1
+ return (iChineseChar * 2) + iEnglishChar
+
+
+def char_equal_width(string, length):
+ endadd = ""
+ if char_width(string) > length:
+ length = length - 2
+ endadd = ".."
+ new_char = ""
+ new_char_len = 0
+ for i in range(len(string)):
+ this = string[i]
+ this_len = char_width(this)
+ if new_char_len + this_len > length:
+ break
+ else:
+ new_char += this
+ new_char_len += this_len
+ a = length - new_char_len
+ new_char += a * "\x20"
+ new_char_len += a
+ new_char += endadd
+ new_char_len += char_width(endadd)
+ return new_char
+
+
+def char_line(str_list, len_list):
+ o = ""
+ l = len(str_list)
+ for i in range(l):
+ this = str(str_list[i])
+ if i == 0:
+ o += "|"
+ if i >= 0:
+ o += " " + char_equal_width(this, len_list[i]) + " " + "|"
+ if i == (l - 1):
+ o += "\n"
+ return o
+
+
+def show_table(table_head: list, table_body: list, max_char_line=5):
+ table_body.insert(0, table_head)
+ o = ""
+ width_list = [
+ max(m) if max(m) <= 20 else 20
+ for m in [
+ [char_width(str(value[i])) for value in table_body]
+ for i in range(len(table_head))
+ ]
+ ]
+ horizontal = ""
+ l = len(table_head)
+ for i in range(l):
+ if i == 0:
+ horizontal += "+"
+ if i >= 0:
+ horizontal += "-" * (width_list[i] + 2) + "+"
+ if i == (l - 1):
+ horizontal += "\n"
+ h = len(table_body)
+ l = h if h <= (max_char_line + 1) else (max_char_line + 1)
+ for i in range(l):
+ this = table_body[i]
+ if i == 0:
+ o += horizontal
+ if i >= 0:
+ o += char_line(this, width_list)
+ if i == 0:
+ o += horizontal
+ if i == (l - 1):
+ o += horizontal
+ if i == (l - 1) and h > l:
+ o += str(h - 1) + " rows in set, the rest have been omitted." + "\n"
+ return o
class MySQL:
__conn__ = None
__curr__ = None
+ __sqls__ = None
+ __data__ = None
def __init__(self, **kwargs):
if kwargs:
@@ -34,6 +120,7 @@ class MySQL:
def execute(self, *args):
if self.__curr__ is None:
raise Exception('连接不存在 | Connection does not exist.')
+ self.__sqls__ = str(*args)
self.__curr__.execute(*args)
return self
@@ -42,10 +129,26 @@ class MySQL:
raise Exception('连接不存在 | Connection does not exist.')
return self.__curr__.rowcount
+ def brief(self):
+ if self.__curr__ is None:
+ raise Exception('连接不存在 | Connection does not exist.')
+ if not self.__sqls__:
+ send = ''
+ else:
+ send = self.__sqls__
+ if not self.__data__:
+ recv = ''
+ else:
+ data = self.__data__
+ recv = show_table([str(k) for k in data[0].keys()], [[v for k, v in data[i].items()] for i in range(len(data))], 5)
+ return '[->]' + '\n' + send + '\n' + '[<-]' + '\n' + recv
+
def fetchall(self):
if self.__curr__ is None:
raise Exception('连接不存在 | Connection does not exist.')
- return self.__curr__.fetchall()
+ res = self.__curr__.fetchall()
+ self.__data__ = res
+ return res
def fetchone(self):
if self.__curr__ is None:
diff --git a/Business/Class/HTMLTestRunner.py b/Business/Class/HTMLTestRunner.py
index f92ae6e..1f2e201 100644
--- a/Business/Class/HTMLTestRunner.py
+++ b/Business/Class/HTMLTestRunner.py
@@ -721,7 +721,7 @@ function html_escape(s) {
%(status)s
-
%(script)s
+
%(script)s
@@ -741,7 +741,7 @@ function html_escape(s) {
%(status)s
-
%(script)s
+
%(script)s
diff --git a/Runner/API/1.html b/Runner/API/1.html
index 8166583..360970f 100644
--- a/Runner/API/1.html
+++ b/Runner/API/1.html
@@ -229,7 +229,7 @@
series: [{
name: '通过',
color: '#64bb64',
- data: [5]
+ data: [1]
}, {
name: '失败',
color: '#f16d7e',
@@ -237,7 +237,7 @@
}, {
name: '错误',
color: '#fdc68c',
- data: [1]
+ data: [0]
}]
})
// 增加饼状图
@@ -291,7 +291,7 @@
innerSize: '80%',
name: '比例',
data: [
- ['通过', 5], ['失败', 0], ['错误', 1]
+ ['通过', 1], ['失败', 0], ['错误', 0]
]
}]
}, function(c) {
@@ -438,15 +438,15 @@ function html_escape(s) {
测试报告
-
开始时间 : 2022-08-12 11:13:49
+
开始时间 : 2022-08-15 00:10:16
合计耗时 : 00:00:00
-
测试结果 : 总共 6,通过 5,失败 0,错误 1,通过率 83.33%
+
测试结果 : 总共 1,通过 1,失败 0,错误 0,通过率 100.00%
失败用例 : 无
-
错误用例 : 点击查看
main.测试用例.test0003_None
+
错误用例 : 无
@@ -457,11 +457,11 @@ function html_escape(s) {
@@ -486,121 +486,32 @@ function html_escape(s) {
详细
-
+
测试用例
- 6
- 5
- 0
1
- 0.720秒
- 查看全部
+ 1
+ 0
+ 0
+ 0.442秒
+ 查看全部
- test0001_None
-
-
- 通过
-
-
-
-
-
-
- test0002_None
-
-
- 通过
-
-
-
-
-
-
- test0003_None
-
-
- 错误
-
-
et1_3:
-Traceback (most recent call last):
- File "D:\Project\AutoFramework\Runner\API\DefaultRunner.py", line 579, in foo
- return self._test_(index)
- File "D:\Project\AutoFramework\Runner\API\DefaultRunner.py", line 564, in test
- self.test_unit(data=main_case)
- File "D:\Project\AutoFramework\Runner\API\DefaultRunner.py", line 500, in test_unit
- self._assert([
- File "D:\Project\AutoFramework\Runner\API\DefaultRunner.py", line 437, in _assert
- assert_parm = [eval(form[0]), eval(form[1])]
- File "<string>", line 1, in <module>
- File "C:\Program Files\Python310\lib\_sitebuiltins.py", line 26, in __call__
- raise SystemExit(code)
-SystemExit: None
-
-
-
-
-
-
-
- test0004_None
-
-
- 通过
-
-
-
-
-
-
- test0005_None
-
-
- 通过
-
-
pt1_5:
-{'var2': 'nginx'}
-
-
-
-
-
-
-
- test0006_None
-
-
- 通过
-
-
pt1_6:
-{'var2': 'nginx', 'var3': '101', 'var4': '阿凡\n凡凡', 'var5': '{"name": "小红", "age": 22}'}
-
-
-
+ test0001_t1
+
+ 通过
总计
- 6
- 5
- 0
1
- 0.720秒
- 通过:83.33%
+ 1
+ 0
+ 0
+ 0.442秒
+ 通过:100.00%
diff --git a/Runner/API/DefaultRunner.py b/Runner/API/DefaultRunner.py
index d6ed031..d4d5404 100644
--- a/Runner/API/DefaultRunner.py
+++ b/Runner/API/DefaultRunner.py
@@ -114,19 +114,21 @@ class TestCase:
tabs = {
"HTTPConf": {
"fixed": {},
- "views": "A4:F*",
+ "views": "A4:H*",
"field": [
"Name",
"Scheme",
"Host",
"DefaultHeader",
"Timeout",
- "Session"
+ "Session",
+ "Proxies",
+ "IgnoreSSLCertError"
]
},
"DataBase": {
"fixed": {},
- "views": "H4:N*",
+ "views": "J4:P*",
"field": [
"Name",
"Host",
@@ -155,7 +157,7 @@ class TestCase:
"Flag",
"PreExecCase",
"PreExecRule",
- "Module",
+ "CaseModule",
"CaseId",
"CaseTitle",
"CaseDesc",
@@ -182,7 +184,7 @@ class TestCase:
"BaseExtract",
"BaseAssertData",
"BaseAssertRows",
- "DataSet",
+ "DataFileSet",
"Author",
"TestTime",
"TestResult"
@@ -323,8 +325,7 @@ class TestCase:
else:
return False
- @staticmethod
- def _sub_variable(text, vars_dict):
+ def _sub_variable(self, text, vars_dict):
if not isinstance(text, str):
return text
for variable_name in re.findall('\${(.*?)}', text):
@@ -332,6 +333,8 @@ class TestCase:
value = vars_dict[variable_name]
except:
value = ''
+ if self._is_nan(value) or value is None:
+ value = ''
text = text.replace('${%s}' % variable_name, str(value))
return text
@@ -363,29 +366,29 @@ class TestCase:
import math
if value is None:
return ''
- try:
- if math.isnan(value):
- return ''
- except:
- pass
- if isinstance(value, float) and math.modf(value)[0] == 0:
+ if isinstance(value, (float, int)) and str(value) == str(float('NaN')):
+ return ''
+ if isinstance(value, (float,)) and math.modf(value)[0] == 0:
return str(int(value))
return str(value)
@staticmethod
def _to_number(value):
- import math
if not value:
return 0
if value is True:
return 1
- try:
- if math.isnan(value):
- return 0
- except:
- pass
+ if isinstance(value, (float, int)) and str(value) == str(float('NaN')):
+ return 0
return int(float(value))
+ @staticmethod
+ def _is_nan(value):
+ if isinstance(value, (float, int)) and str(value) == str(float('NaN')):
+ return True
+ else:
+ return False
+
def _extract_variable(self, extract, vars_dict: dict, domain: str | list | dict):
if not extract: return None
if not isinstance(extract, (list, dict)):
@@ -451,11 +454,10 @@ class TestCase:
actual = str(actual)
assert actual == expect, '%s != %s' % (actual, expect)
- def test_unit(self, data):
+ def _test_unit(self, data, main=None):
if data['HTTPUri']:
if not data['HTTPChannel']:
raise Exception('channel not set')
- # 创建请求
http = self.h.where((self.h['Name'] == data['HTTPChannel']), inplace=False).dropna(how='all').reset_index(drop=True, inplace=False).loc[0].to_dict()
res_kwargs = {
'method': data['HTTPMethod'],
@@ -468,9 +470,9 @@ class TestCase:
'cookie': self._sub_variable_auto(auto_decode(data['HTTPCookie'])),
'auth': None,
'timeout': self._to_number(http['Timeout']) or 15,
- 'proxy': None,
+ 'proxy': self._to_string(http['Proxies']),
'auto_redirect': self._match_bool(data['HTTPRedirect']),
- 'ignore_cert_error': False,
+ 'ignore_cert_error': self._match_bool(http['IgnoreSSLCertError']),
'debug': True
}
res_kwargs['header'].update((auto_decode(http['DefaultHeader']) or {}))
@@ -485,7 +487,7 @@ class TestCase:
res_kwargs['file'] = locals().setdefault('f', self._sub_variable_auto(auto_decode(
data['HTTPParamOfFile']))) and {k: open(os.path.abspath(os.path.join(self.dirs, './%s' % v)), 'rb') for k, v in locals().get('f').items()}
res = [self.request, self.session][self._match_bool(http['Session'])].http(**res_kwargs)
- # 创建变量
+ log.d("\n" + res['brief'])
hvar = {
'Status': ReText(res['status'] or ''),
'Reason': ReText(res['reason'] or ''),
@@ -494,9 +496,7 @@ class TestCase:
'Body': ReText(res['text'] or ''),
'Json': ReDict(res['json'] or {}),
}
- # 提取变量
self._extract_variable(extract=data['HTTPExtract'], vars_dict=hvar, domain='global')
- # 开始断言
self._assert([
{'expect': data['HTTPAssertStatus'], 'actual': hvar['Status']},
{'expect': data['HTTPAssertReason'], 'actual': hvar['Reason']},
@@ -509,61 +509,41 @@ class TestCase:
if not data['BaseChannel']:
raise Exception('channel not set')
base = self.b.where((self.b['Name'] == data['BaseChannel']), inplace=False).dropna(how='all').reset_index(drop=True, inplace=False).loc[0].to_dict()
- # 创建请求
conn = MySQL().connect(
host=base['Host'], port=int(base['Port']),
user=base['User'], password=base['Password'], database=base['Database'], charset=base['Charset'], cursor='Dict'
)
conn.execute(data['BaseSql'])
- # 创建变量
bvar = {
'Data': ReList(conn.fetchall() or []),
'Rows': conn.count() or 0
}
- # 提取变量
+ log.d("\n" + conn.brief())
self._extract_variable(extract=data['BaseExtract'], vars_dict=bvar, domain='global')
- # for var_name, var_value in {
- # '_Data': ReList(conn.fetchall() or []),
- # '_Rows': conn.count() or 0
- # }.items():
- # locals().__setitem__(var_name, var_value)
- # for k, v in (auto_decode(data['DatabaseExtract']) or {}).items():
- # d = re.findall('^([0-9A-Za-z_]+).*?', v)[0]
- # self._g[str(k)] = str(eval('_' + v.replace(d, d.title(), 1)))
- # 开始断言
self._assert([
{'expect': data['BaseAssertData'], 'actual': bvar['Data']},
{'expect': data['BaseAssertRows'], 'actual': bvar['Rows']},
], self.assert_list)
- print(self._g)
-
- # expect = self._to_string(data['HTTPAssertStatus'])
- # if expect:
- # opers = [" == ", " != ", " >= ", " <= ", " > ", " < ", " in ", " not in "]
- # opers_flag = 0
- # for v in opers:
- # if v in expect:
- # opers_flag = 1
- # break
- # if opers_flag:
- # contrast = list(filter(lambda x: x, expect.split("\n")))
- # for contr in contrast:
- # exec('assert ' + self._sub_variable_auto(contr).replace('$', str(res['status'])))
- # else:
- # actual = str(res['status'])
- # try:
- # assert expect == actual
- # except AssertionError as e:
- # raise AssertionError(str(expect) + ' == ' + str(actual))
- # # sys.stderr.write()
-
- def test(self, index):
- # print(view_case.loc[index])
+ def test(self, index, update_global=None, update_locals=None):
+ if isinstance(update_global, (dict,)):
+ self._g.update(update_global)
+ if isinstance(update_locals, (dict,)):
+ self._l.update(update_locals)
main_case = dict(self.c.loc[index].to_dict())
- self.test_unit(data=main_case)
+ prex_list = list(filter(lambda x: x, [value.strip() for value in (main_case['PreExecCase'] or '').replace(',', "\n").split("\n")]))
+ for id in prex_list:
+ data = self.c.where((self.c['CaseId'] == id), inplace=False).dropna(how='all').reset_index(drop=True, inplace=False).loc[0].to_dict()
+ if data['PreExecCase'] or data['PreExecRule']:
+ raise Exception('')
+ if data['DataFileSet']:
+ raise Exception('')
+ self._test_unit(data=data)
+ self._test_unit(data=main_case, main=True)
+ self._l.clear()
def main(self):
+ from pandas import read_csv
class ClassTestCase(unittest.TestCase):
pass
@@ -571,15 +551,57 @@ class TestCase:
ClassTestCase._test_ = self.test
serial = 0
for i in range(len(self.c)):
- thisCase = self.c.loc[i].to_dict()
- if not self._match_case_type(thisCase['Flag']) == 10 and self._match_leve_run(thisCase['CaseLevel']): continue
+ data = self.c.loc[i].to_dict()
+ if not self._match_case_type(data['Flag']) == 10 and self._match_leve_run(data['CaseLevel']): continue
serial += 1
+ filename = data['DataFileSet'] or ''
+ if filename:
+ data_file = read_csv(os.path.abspath(os.path.join(self.dirs, './%s' % filename)), keep_default_na=False)
+ data_file_rows = len(data_file)
+ data_file_flag = 1
+ else:
+ data_file = None
+ data_file_rows = 0
+ data_file_flag = 0
+ j = 0
+ while j < data_file_rows or data_file_flag == 0:
+ match data_file_flag:
+ case 1:
+ vars_data = data_file.loc[j].to_dict()
+ func_docs = str(data['CaseTitle'] or '') + '_' + '_'.join([str(v) for k, v in vars_data.items()])[:32]
+ func_name = 'test%s_%s_%s' % (str('%04d' % serial), str(data['CaseId']), str('%04d' % (j + 1)))
+ once_do_break = 0
+ case _:
+ vars_data = None
+ func_docs = str(data['CaseTitle'] or '')
+ func_name = 'test%s_%s' % (str('%04d' % serial), str(data['CaseId']))
+ once_do_break = 1
- def foo(self, index=i):
- return self._test_(index)
+ def func(self, index=i, update_global=None, update_locals=vars_data):
+ return self._test_(index=index, update_global=update_global, update_locals=update_locals)
+
+ func.__doc__ = func_docs
+ type.__setattr__(ClassTestCase, func_name, func)
+ j += 1
+ if once_do_break: break
+
+ # if filename:
+ # data_file = read_csv(os.path.join(self.dirs, './%s' % filename), keep_default_na=False)
+ # for j in range(len(data_file)):
+ # vars_data = data_file.loc[j].to_dict()
+ #
+ # def func(self, index=i, update_global=None, update_locals=vars_data):
+ # return self._test_(index=index, update_global=update_global, update_locals=update_locals)
+ #
+ # func.__doc__ = (data['CaseTitle'] or '') + '_' + '_'.join([str(v) for k, v in vars_data.items()])[:32]
+ # type.__setattr__(ClassTestCase, 'test%s_%s_%s' % (str('%04d' % serial), str(data['CaseId']), str('%04d' % (j+1))), func)
+ # else:
+ # def func(self, index=i, update_global=None, update_locals=None):
+ # return self._test_(index=index, update_global=update_global, update_locals=update_locals)
+ #
+ # func.__doc__ = data['CaseTitle']
+ # type.__setattr__(ClassTestCase, 'test%s_%s' % (str('%04d' % serial), str(data['CaseId'])), func)
- foo.__doc__ = thisCase['CaseTitle']
- exec('ClassTestCase.test%s_%s = foo' % (str('%04d' % serial), str(thisCase['CaseId'])))
return ClassTestCase
@@ -593,5 +615,5 @@ if __name__ == '__main__':
httpconf='请求配置',
database='请求配置'
).main(), prefix='test'))
- HTMLTestRunner(verbosity=5, report=os.path.abspath(os.path.join(os.path.dirname(__file__), './1.html'))).run(
+ HTMLTestRunner(verbosity=1, report=os.path.abspath(os.path.join(os.path.dirname(__file__), './1.html'))).run(
test_suite)
diff --git a/main5.py b/main5.py
index d9e59e2..b1e6935 100644
--- a/main5.py
+++ b/main5.py
@@ -1,11 +1,16 @@
-def assertPreWith(string: str, sub: str):
- if not string.startswith(sub):
- raise AssertionError('%s does not starts with %s' % (string, sub))
+import pandas as pd
+# from Base.Class.Yaml import *
+# from pandas import read_csv
+# df = read_csv("D:/Desktop/新建 XLSX 工作表.csv", keep_default_na=False)
+# print(df.loc[2].to_dict())
+from pandas.core.frame import DataFrame
+from Base.Class.Database import *
+conn = MySQL().connect(host='sdm821105491.my3w.com', port=3306, user='sdm821105491', password='1234ABCDabcd', database='sdm821105491_db', charset='utf8')
+conn.execute('select gid,title,date,content from %s;' % 'emlog_blog')
+data = conn.fetchall()
+# print(data)
+print(conn.brief())
+
-def assertSufWith(string: str, sub: str):
- if not string.endswith(sub):
- raise AssertionError('%s does not ends with %s' % (string, sub))
-assertSufWith('admin 123456 ...', '...')
-assertPreWith('admin 123456 ...', 'admin 1')
\ No newline at end of file
diff --git a/main6.py b/main6.py
new file mode 100644
index 0000000..4b37798
--- /dev/null
+++ b/main6.py
@@ -0,0 +1 @@
+print(list(filter(lambda x: x, [value.strip() for value in ( or '').replace(',',"\n").split("\n")])))
\ No newline at end of file