import pymysql.cursors def char_width(string): iChineseChar = 0 iEnglishChar = 0 for char in string: if len(char.encode("utf-8")) > 2: iChineseChar += 1 else: iEnglishChar += 1 return (iChineseChar * 2) + iEnglishChar def char_equal_width(string, length): endadd = "" if char_width(string) > length: length = length - 2 endadd = ".." new_char = "" new_char_len = 0 for i in range(len(string)): this = string[i] this_len = char_width(this) if new_char_len + this_len > length: break else: new_char += this new_char_len += this_len a = length - new_char_len new_char += a * "\x20" new_char_len += a new_char += endadd new_char_len += char_width(endadd) return new_char def char_line(str_list, len_list): o = "" l = len(str_list) for i in range(l): this = str(str_list[i]) if i == 0: o += "|" if i >= 0: o += " " + char_equal_width(this, len_list[i]) + " " + "|" if i == (l - 1): o += "\n" return o def show_table(table_head: list, table_body: list, max_char_line=5): table_body.insert(0, table_head) o = "" width_list = [ max(m) if max(m) <= 20 else 20 for m in [ [char_width(str(value[i])) for value in table_body] for i in range(len(table_head)) ] ] horizontal = "" l = len(table_head) for i in range(l): if i == 0: horizontal += "+" if i >= 0: horizontal += "-" * (width_list[i] + 2) + "+" if i == (l - 1): horizontal += "\n" h = len(table_body) l = h if h <= (max_char_line + 1) else (max_char_line + 1) for i in range(l): this = table_body[i] if i == 0: o += horizontal if i >= 0: o += char_line(this, width_list) if i == 0: o += horizontal if i == (l - 1): o += horizontal if i == (l - 1) and h > l: o += str(h - 1) + " rows in set, the rest have been omitted." + "\n" return o class MySQL: __conn__ = None __curr__ = None __sqls__ = None __data__ = None def __init__(self, **kwargs): if kwargs: self.connect(**kwargs) def connect(self, host=None, port=0, user=None, password='', database=None, charset='', cursor='Dict'): if self.__conn__: raise Exception('连接已存在 | Connection already exists.') match cursor: case 'Index': cursorClass = pymysql.cursors.Cursor case 'Dict': cursorClass = pymysql.cursors.DictCursor case _: cursorClass = pymysql.cursors.DictCursor self.__conn__ = pymysql.connect( host=host, port=port, user=user, password=password, database=database, charset=charset, autocommit=True, connect_timeout=10, cursorclass=cursorClass, client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS ) self.__curr__ = self.__conn__.cursor() return self def execute(self, *args): if self.__curr__ is None: raise Exception('连接不存在 | Connection does not exist.') self.__sqls__ = str(*args) self.__curr__.execute(*args) return self def count(self): if self.__curr__ is None: raise Exception('连接不存在 | Connection does not exist.') return self.__curr__.rowcount def brief(self): if self.__curr__ is None: raise Exception('连接不存在 | Connection does not exist.') if not self.__sqls__: send = '' else: send = self.__sqls__ if not self.__data__: recv = '' else: data = self.__data__ recv = show_table([str(k) for k in data[0].keys()], [[v for k, v in data[i].items()] for i in range(len(data))], 5) return '[->]' + '\n' + send + '\n' + '[<-]' + '\n' + recv def fetchall(self): if self.__curr__ is None: raise Exception('连接不存在 | Connection does not exist.') res = self.__curr__.fetchall() self.__data__ = res return res def fetchone(self): if self.__curr__ is None: raise Exception('连接不存在 | Connection does not exist.') return self.__curr__.fetchone() def close(self): if self.__curr__ or self.__conn__: self.__curr__.close() self.__conn__.close() self.__curr__ = None self.__conn__ = None return True if __name__ == '__main__': # Example. connConfig = { "host": "127.0.0.1", "port": 3306, "user": "root", "password": "root", "database": "db", "charset": "utf8", } sqlConnect = MySQL(**connConfig, cursor='Dict') sqlConnect.execute(''' CREATE TABLE `user` ( `uid` int(11) NOT NULL AUTO_INCREMENT, `username` char(20) DEFAULT NULL, `email` varchar(64) DEFAULT NULL, `phone` char(11) DEFAULT NULL, `password` char(32) DEFAULT NULL, PRIMARY KEY (`uid`), UNIQUE KEY `username` (`username`), UNIQUE KEY `email` (`email`), UNIQUE KEY `phone` (`phone`) ) ENGINE=MyISAM AUTO_INCREMENT=10000 DEFAULT CHARSET=utf8; ''') print('RowCounts:', sqlConnect.count(), 'SqlResult:', sqlConnect.fetchall()) sqlConnect.execute(''' INSERT INTO user (username,email,phone,password) VALUES ("刘德华","10000@example.com","13512310000","0123456789abcdef0123456789abcdef"); ''') print('RowCounts:', sqlConnect.count(), 'SqlResult:', sqlConnect.fetchall()) sqlConnect.execute(''' INSERT INTO user (username,email,phone,password) VALUES ("周润发","10002@example.com","13512310002","0123456789abcdef0123456789abcdef"); ''') print('RowCounts:', sqlConnect.count(), 'SqlResult:', sqlConnect.fetchall()) sqlConnect.execute(''' SELECT * FROM user WHERE uid LIKE "1%"; ''') print('RowCounts:', sqlConnect.count(), 'SqlResult:', sqlConnect.fetchall()) sqlConnect.execute(''' DROP TABLE user; ''') print('RowCounts:', sqlConnect.count(), 'SqlResult:', sqlConnect.fetchall()) sqlConnect.close()