205 lines
6.4 KiB
Python
205 lines
6.4 KiB
Python
import unicodedata

import pymysql.cursors
|
|
|
|
|
|
def char_width(string):
    """Return the number of terminal columns needed to display ``string``.

    Characters whose Unicode East Asian Width class is Wide (``W``) or
    Fullwidth (``F``) -- CJK ideographs, kana, Hangul, fullwidth forms --
    count as two columns; every other character counts as one.

    The class is looked up with ``unicodedata.east_asian_width`` rather than
    guessed from the UTF-8 byte length: many three-byte characters (Thai,
    most symbols) are narrow, and encoding a lone surrogate would raise
    ``UnicodeEncodeError``.

    Args:
        string: The text to measure.

    Returns:
        The display width as an int; 0 for an empty string.
    """
    return sum(
        2 if unicodedata.east_asian_width(char) in ("W", "F") else 1
        for char in string
    )
|
|
|
|
|
|
def char_equal_width(string, length):
    """Fit ``string`` into exactly ``length`` display columns.

    Text that already fits is right-padded with spaces. Text that is too
    wide is cut at a character boundary and ends with a ".." marker; if a
    double-width character does not fit in the last free column, that
    column is filled with a space so the result keeps its width.

    Args:
        string: The text to fit.
        length: Target width in display columns, as measured by
            ``char_width``.

    Returns:
        The padded or truncated text. For ``length`` below 2 the ".."
        marker itself is clipped so the result is never wider than
        ``length``; a ``length`` of zero or less yields an empty string
        when truncation is needed.
    """
    if char_width(string) > length:
        # Reserve room for the truncation marker, clipping the marker
        # itself when the column is narrower than two cells.
        suffix = ".."[:max(length, 0)]
        budget = length - len(suffix)
    else:
        suffix = ""
        budget = length

    kept = []
    used = 0
    for char in string:
        width = char_width(char)
        if used + width > budget:
            break
        kept.append(char)
        used += width

    # A wide character that did not fit can leave one spare column; a
    # negative repeat count simply produces no padding.
    return "".join(kept) + "\x20" * (budget - used) + suffix
|
|
|
|
|
|
def char_line(str_list, len_list):
    """Render one table row as ``| cell | cell |`` followed by a newline.

    Each value is converted with ``str`` and fitted to the width found at
    the same position in ``len_list`` via ``char_equal_width``.

    Args:
        str_list: The cell values of the row.
        len_list: The column widths, indexed like ``str_list``.

    Returns:
        The rendered row, or an empty string when ``str_list`` is empty.
    """
    if not str_list:
        return ""
    cells = [
        " " + char_equal_width(str(value), len_list[index]) + " " + "|"
        for index, value in enumerate(str_list)
    ]
    return "|" + "".join(cells) + "\n"
|
|
|
|
|
|
def show_table(table_head: list, table_body: list, max_char_line=5, max_col_width=20):
    """Render a header and rows as an ASCII table, similar to the mysql CLI.

    The inputs are not modified: the header is combined with the body in a
    new list instead of being inserted into ``table_body``.

    Args:
        table_head: Column titles; its length defines the column count.
        table_body: Rows; each row must have at least as many items as
            ``table_head``.
        max_char_line: Maximum number of body rows to render.
        max_col_width: Upper bound for a column's width in display columns;
            wider cells are truncated by ``char_line``.

    Returns:
        The table text: border, header, border, the rendered body rows and
        a closing border. When rows were left out, a final line reports the
        total number of body rows. A negative ``max_char_line`` yields an
        empty string.
    """
    # Build a new list so the caller's table_body is left untouched.
    rows = [table_head, *table_body]
    column_count = len(table_head)

    # Each column is as wide as its widest cell (header included), capped.
    width_list = [
        min(max(char_width(str(row[index])) for row in rows), max_col_width)
        for index in range(column_count)
    ]

    horizontal = ""
    if column_count:
        horizontal = (
            "+" + "".join("-" * (width + 2) + "+" for width in width_list) + "\n"
        )

    total = len(rows)
    shown = min(total, max_char_line + 1)  # +1 accounts for the header row
    if shown <= 0:
        return ""

    parts = [horizontal, char_line(rows[0], width_list), horizontal]
    if shown > 1:
        parts.extend(char_line(row, width_list) for row in rows[1:shown])
        # Close the table only after body rows; the border under the header
        # already closes a header-only table.
        parts.append(horizontal)
    if total > shown:
        parts.append(
            str(total - 1) + " rows in set, the rest have been omitted." + "\n"
        )
    return "".join(parts)
|
|
|
|
|
|
class MySQL:
    """Small convenience wrapper around one PyMySQL connection and cursor.

    Connections are opened with autocommit enabled, a 10 second connect
    timeout and the multi-statement client flag. ``connect`` and
    ``execute`` return the instance so calls can be chained. The object can
    also be used as a context manager, which closes it on exit.
    """

    # Class-level defaults; instances overwrite them as they are used.
    __conn__ = None  # open pymysql connection, or None
    __curr__ = None  # cursor created from __conn__, or None
    __sqls__ = None  # text of the statement most recently passed to execute()
    __data__ = None  # rows returned by the most recent fetchall()

    def __init__(self, **kwargs):
        """Create the wrapper; connect immediately if any kwargs are given.

        Args:
            **kwargs: Forwarded unchanged to ``connect``.
        """
        if kwargs:
            self.connect(**kwargs)

    def __enter__(self):
        """Return the wrapper itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()
        return False

    def _require_cursor(self):
        """Return the active cursor or raise if there is no connection."""
        if self.__curr__ is None:
            raise Exception('连接不存在 | Connection does not exist.')
        return self.__curr__

    def connect(self, host=None, port=0, user=None, password='', database=None, charset='', cursor='Dict'):
        """Open the connection and create its cursor.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            user: Login user name.
            password: Login password.
            database: Database to select.
            charset: Connection character set.
            cursor: ``'Index'`` for tuple rows; any other value (including
                the default ``'Dict'``) selects dictionary rows.

        Returns:
            This instance.

        Raises:
            Exception: If a connection already exists.
        """
        if self.__conn__:
            raise Exception('连接已存在 | Connection already exists.')
        if cursor == 'Index':
            cursor_class = pymysql.cursors.Cursor
        else:
            cursor_class = pymysql.cursors.DictCursor
        self.__conn__ = pymysql.connect(
            host=host, port=port,
            user=user, password=password,
            database=database, charset=charset,
            autocommit=True,
            connect_timeout=10,
            cursorclass=cursor_class,
            client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS
        )
        self.__curr__ = self.__conn__.cursor()
        return self

    def execute(self, *args):
        """Run a statement on the cursor.

        Args:
            *args: Passed unchanged to the cursor's ``execute`` -- the query
                and, optionally, its parameters.

        Returns:
            This instance.

        Raises:
            Exception: If there is no connection.
        """
        cursor = self._require_cursor()
        # Record only the query text: str(query, params) would make str()
        # treat the parameters as an encoding and raise TypeError.
        self.__sqls__ = str(args[0]) if args else ''
        # Drop rows kept from an earlier statement so brief() never shows
        # them next to this statement.
        self.__data__ = None
        cursor.execute(*args)
        return self

    def count(self):
        """Return the cursor's ``rowcount``.

        Raises:
            Exception: If there is no connection.
        """
        return self._require_cursor().rowcount

    def brief(self):
        """Return a text summary of the last statement and its fetched rows.

        The summary is ``[->]``, the statement text, ``[<-]`` and a table of
        at most five rows from the last ``fetchall``; the table part is
        empty when nothing was fetched.

        Raises:
            Exception: If there is no connection.
        """
        cursor = self._require_cursor()
        send = self.__sqls__ or ''
        data = self.__data__
        if not data:
            recv = ''
        else:
            first = data[0]
            if isinstance(first, dict):
                head = [str(key) for key in first]
                body = [list(row.values()) for row in data]
            else:
                # Tuple rows ('Index' cursor): take column names from the
                # cursor metadata, falling back to positional numbers.
                description = getattr(cursor, 'description', None)
                if description and len(description) == len(first):
                    head = [str(column[0]) for column in description]
                else:
                    head = [str(index) for index in range(len(first))]
                body = [list(row) for row in data]
            recv = show_table(head, body, 5)
        return '[->]' + '\n' + send + '\n' + '[<-]' + '\n' + recv

    def fetchall(self):
        """Fetch all remaining rows and remember them for ``brief``.

        Raises:
            Exception: If there is no connection.
        """
        res = self._require_cursor().fetchall()
        self.__data__ = res
        return res

    def fetchone(self):
        """Fetch the next row from the cursor.

        Raises:
            Exception: If there is no connection.
        """
        return self._require_cursor().fetchone()

    def close(self):
        """Close the cursor and the connection, whichever of them exist.

        Returns:
            Always ``True``; calling it without a connection is a no-op.
        """
        cursor, connection = self.__curr__, self.__conn__
        self.__curr__ = None
        self.__conn__ = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Release the connection even if closing the cursor failed.
            if connection is not None:
                connection.close()
        return True
|
|
|
|
|
|
if __name__ == '__main__':
    # Example: create a table, insert two rows, query them and drop the
    # table again, printing the row count and fetched rows after each step.
    connConfig = {
        "host": "127.0.0.1", "port": 3306, "user": "root", "password": "root", "database": "db", "charset": "utf8",
    }
    exampleStatements = (
        '''
        CREATE TABLE `user` (
        `uid` int(11) NOT NULL AUTO_INCREMENT,
        `username` char(20) DEFAULT NULL,
        `email` varchar(64) DEFAULT NULL,
        `phone` char(11) DEFAULT NULL,
        `password` char(32) DEFAULT NULL,
        PRIMARY KEY (`uid`),
        UNIQUE KEY `username` (`username`),
        UNIQUE KEY `email` (`email`),
        UNIQUE KEY `phone` (`phone`)
        )
        ENGINE=MyISAM AUTO_INCREMENT=10000 DEFAULT CHARSET=utf8;
        ''',
        '''
        INSERT INTO user (username,email,phone,password) VALUES ("刘德华","10000@example.com","13512310000","0123456789abcdef0123456789abcdef");
        ''',
        '''
        INSERT INTO user (username,email,phone,password) VALUES ("周润发","10002@example.com","13512310002","0123456789abcdef0123456789abcdef");
        ''',
        '''
        SELECT * FROM user WHERE uid LIKE "1%";
        ''',
        '''
        DROP TABLE user;
        ''',
    )
    sqlConnect = MySQL(**connConfig, cursor='Dict')
    try:
        for statement in exampleStatements:
            sqlConnect.execute(statement)
            print('RowCounts:', sqlConnect.count(), 'SqlResult:', sqlConnect.fetchall())
    finally:
        # Release the connection even if one of the statements fails.
        sqlConnect.close()
|