import os
import re
import sys
import ctypes
import logging
import time
import hashlib
import tempfile
import warnings
import subprocess

import win32com.client
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QWidget, QDialog, QLabel, QComboBox, QCheckBox,
    QLineEdit, QAction, QMenu, QMessageBox, QPushButton, QTableWidget,
    QVBoxLayout, QHBoxLayout, QTableWidgetItem,
)
from PyQt5.QtCore import Qt, QCoreApplication, QTimer
from PyQt5.QtGui import QFont


class CustomWarningFormatter:
    """Callable installed as ``warnings.formatwarning``; emits only the message text."""

    def __call__(self, message, category, filename, lineno, line=None):
        return 'Warning: %s\n' % (message,)


warnings.formatwarning = CustomWarningFormatter()


def exception_hook(types, value, traceback):
    """Write uncaught exceptions to stderr as ``TypeName: message`` (no traceback)."""
    sys.stderr.write('%s: %s\n' % (types.__name__, value))


sys.excepthook = exception_hook


class Err(dict):
    """Error-code table; lookups use only the low 16 bits of the code."""

    def __getitem__(self, item: int):
        # Masking (instead of slicing a formatted hex string) yields the same
        # key for 0 <= item < 2**32 and no longer raises on negative codes.
        return super().get(item & 0xFFFF, 'Unknown.')


class Svc(dict):
    """Service-class table; lookups use bits 16-31 of the value."""

    def __getitem__(self, item: int):
        return super().get((item >> 16) & 0xFFFF, 'Unknown.')


class Dev(dict):
    """Device-class table; lookups use only the low 16 bits of the value."""

    def __getitem__(self, item: int):
        return super().get(item & 0xFFFF, 'Unknown.')


class BtSdkRemoteServiceAttrStru(ctypes.Structure):
    """Structure passed to ``Btsdk_RefreshRemoteServiceAttributes``."""

    _fields_ = [
        ('mask', ctypes.c_uint32),
        ('service_class', ctypes.c_uint16),
        ('dev_hdl', ctypes.c_uint32),
        ('svc_name', ctypes.c_char * 256),
        ('ext_attributes', ctypes.c_void_p),
        ('status', ctypes.c_uint16)
    ]


class BtSdkRmtSPPSvcExtAttrStru(ctypes.Structure):
    """Extended attribute structure (size + server channel); not used in this file's visible code."""

    _fields_ = [
        ('size', ctypes.c_uint32),
        ('server_channel', ctypes.c_uint8)
    ]


class BtSdk:
    """Thin ctypes wrapper around the BlueSoleil ``BsSDK.dll``.

    Construction loads the DLL and calls :meth:`init`. The instance can be
    used as a context manager; leaving the ``with`` block calls
    ``Btsdk_Done`` on a best-effort basis.
    """

    def __init__(self):
        """Build the lookup tables, load ``BsSDK.dll`` and initialise the SDK.

        Raises:
            OSError: if ``ctypes.CDLL`` cannot load the DLL (callers in this
                file catch ``FileNotFoundError``).
            Exception: if :meth:`init` reports the SDK is not usable.
        """
        self.errs = Err({
            0X00000000: 'The operation completed successfully.',
            0X000000C0: 'Local service is still active. When the application tries to remove or activate an active service, this error code is returned.',
            0X000000C1: 'No service record with the specified search pattern is found on the remote device.',
            0X000000C2: 'The specified service record does not exist on the remote device.',
            0X00000301: 'The object specified by the handle does not exist in local BlueSoleil SDK database.',
            0X00000302: 'The operation fails for an undefined reason.',
            0X00000303: 'BlueSoleil SDK has not been initialized.',
            0X00000304: 'The parameter value is invalid.',
            0X00000305: 'The pointer value is NULL.',
            0X00000306: 'Not enough storage is available to process this function.',
            0X00000307: 'The specified buffer size is too small to hold the required information.',
            0X00000308: 'The specified function is not supported by the BlueSoleil.',
            0X00000309: 'No fixed PIN code is available.',
            0X0000030A: 'The specified service has been connected already.',
            0X0000030B: 'The request can not be processed since a same request is being processed.',
            0X0000030C: 'The limit of connection number is reached.',
            0X0000030D: 'An object with the specified attribute exists.',
            0X0000030E: 'The specified object is accessed by other process. It can not be removed or modified.',
            0X0000030F: 'The specified remote device is not paired.',
            0X00000401: 'HCI error “Unknown HCI Command (0X000001)” is received.',
            0X00000402: 'HCI error “Unknown Connection Identifier (0X000002)” is received.',
            0X00000403: 'HCI error “Hardware Failure (0X000003)” is received.',
            0X00000404: 'HCI error “Page Timeout (0X000004)” is received.',
            0X00000405: 'HCI error “Authentication Failure (0X000005)” is received.',
            0X00000406: 'HCI error “PIN or Key Missing (0X000006)” is received.',
            0X00000407: 'HCI error “Memory Capacity Exceeded (0X000007)” is received.',
            0X00000408: 'HCI error “Connection Timeout (0X000008)” is received.',
            0X00000409: 'HCI error “Connection Limit Exceeded (0X000009)” is received.',
            0X0000040A: 'HCI error “Synchronous Connection Limit to a Device Exceeded (0X00000A)” is received.',
            0X0000040B: 'HCI error “ACL Connection Already Exists (0X00000B)” is received.',
            0X0000040C: 'HCI error “Command Disallowed (0X00000C)” is received.',
            0X0000040D: 'HCI error “Connection Rejected due to Limited Resources (0X00000D)” is received.',
            0X0000040E: 'HCI error “Connection Rejected due to Security Reasons (0X00000E)” is received.',
            0X0000040F: 'HCI error “Connection Rejected due to Unacceptable BD_ADDR (0X00000F)” is received.',
            0X00000410: 'HCI error “Connection Accept Timeout Exceeded (0X10)” is received.',
            0X00000411: 'HCI error “Unsupported Feature or Parameter Value (0X11)” is received.',
            0X00000412: 'HCI error “Invalid HCI Command parameters (0X12)” is received.',
            0X00000413: 'HCI error “Remote User Terminated Connection (0X13)” is received.',
            0X00000414: 'HCI error “Remote Device Terminated Connection due to Low Resources (0X14)” is received.',
            0X00000415: 'HCI error “Remote Device Terminated Connection due to Power Off (0X15)” is received.',
            0X00000416: 'HCI error “Connection Terminated by Local Host (0X16)” is received.',
            0X00000417: 'HCI error “Repeated Attempts (0X17)” is received.',
            0X00000418: 'HCI error “Pairing Not Allowed (0X18)” is received.',
            0X00000419: 'HCI error “Unknown LMP PDU (0X19)” is received.',
            0X0000041A: 'HCI error “Unsupported Remote Feature / Unsupported LMP Feature (0X1A)” is received.',
            0X0000041B: 'HCI error “SCO Offset Rejected (0X1B)” is received.',
            0X0000041C: 'HCI error “SCO Interval Rejected (0X1C)” is received.',
            0X0000041D: 'HCI error “SCO Air Mode Rejected (0X1D)” is received.',
            0X0000041E: 'HCI error “Invalid LMP Parameters (0X1E)” is received.',
            0X0000041F: 'HCI error “Unspecified Error (0X1F)” is received.',
            0X00000420: 'HCI error “Unsupported LMP Parameter Value (0X20)” is received.',
            0X00000421: 'HCI error “Role Change Not Allowed (0X21)” is received.',
            0X00000422: 'HCI error “LMP Response Timeout (0X22)” is received.',
            0X00000423: 'HCI error “LMP Error Transaction Collision (0X23)” is received.',
            0X00000424: 'HCI error “LMP PDU Not Allowed (0X24)” is received.',
            0X00000425: 'HCI error “Encryption Mode Not Acceptable (0X25)” is received.',
            0X00000426: 'HCI error “Link Key Can not be Changed (0X26)” is received.',
            0X00000427: 'HCI error “Requested QOS Not Supported (0X27)” is received.',
            0X00000428: 'HCI error “Instant Passed (0X28)” is received.',
            0X00000429: 'HCI error “Pairing with Unit Key Not Supported (0X29)” is received.',
            0X0000042A: 'HCI error “Different Transaction Collision (0X2A)” is received.',
            0X0000042C: 'HCI error “QOS Unacceptable Parameter (0X2C)” is received.',
            0X0000042D: 'HCI error “QOS Rejected (0X2D)” is received.',
            0X0000042E: 'HCI error “Channel Classification Not Supported (0X2E)” is received.',
            0X0000042F: 'HCI error “Insufficient Security (0X2F)” is received.',
            0X00000430: 'HCI error “Parameter Out of Mandatory Range (0X30)” is received.',
            0X00000432: 'HCI error “Role Switch Pending (0X32)” is received.',
            0X00000434: 'HCI error “Reserved Slot Violation (0X34)” is received.',
            0X00000435: 'HCI error “Role Switch Failed (0X35)” is received.'
        })
        self.svcs = Svc({
            0X00001101: 'Serial Port service.',
            0X00001102: 'LAN Access service.',
            0X00001103: 'Dial-up Networking service.',
            0X00001104: 'Synchronization service.',
            0X00001105: 'Object Push service.',
            0X00001106: 'File Transfer service.',
            0X00001107: 'IrMC Sync Command service.',
            0X00001108: 'Headset service.',
            0X00001109: 'Cordless Telephony service.',
            0X0000110A: 'Audio Source service.',
            0X0000110B: 'Audio Sink service.',
            0X0000110C: 'A/V Remote Control Target service.',
            0X0000110D: 'Advanced Audio Distribution service.',
            0X0000110E: 'A/V Remote Control service.',
            0X0000110F: 'Video conference service.',
            0X00001110: 'Intercom service.',
            0X00001111: 'Fax service.',
            0X00001112: 'Headset Audio Gateway service.',
            0X00001113: 'WAP service.',
            0X00001114: 'WAP client service.',
            0X00001115: 'PANU service.',
            0X00001116: 'NAP service.',
            0X00001117: 'GN service.',
            0X00001118: 'Direct Print service.',
            0X00001119: 'Referenced Print service.',
            0X0000111A: 'Imaging service.',
            0X0000111B: 'Imaging Responder service.',
            0X0000111C: 'Imaging Automatic Archive service.',
            0X0000111D: 'Imaging Referenced Objects service.',
            0X0000111E: 'Hands-free service.',
            0X0000111F: 'Hands-free Audio Gateway service.',
            0X00001120: 'DPS Referenced Objects service.',
            0X00001121: 'Reflected UI service',
            0X00001122: 'Basic Print service.',
            0X00001123: 'Print Status service.',
            0X00001124: 'Human Interface Device service.',
            0X00001125: 'Hardcopy Cable Replacement service.',
            0X00001126: 'HCRP Print service.',
            0X00001127: 'HCRP Scan service.',
            0X0000112D: 'SIM Card Access service',
            0X0000112E: 'PBAP Phonebook Client Equipment service.',
            0X0000112F: 'PBAP Phonebook Server Equipment service.',
            0X00001130: 'Phonebook Access service.',
            0X00001200: 'Bluetooth Device Identification.'
        })
        self.devs = Dev({
            0X00000100: 'Computer major device class',
            0X00000104: 'Desktop workstation',
            0X00000108: 'Server-class computer',
            0X0000010C: 'Laptop computer',
            0X00000110: 'Handheld PC/PDA (clam shell)',
            0X00000114: 'Palm sized PC/PDA',
            0X00000118: 'Wearable computer (Watch sized)',
            0X00000200: 'Phone major device class',
            0X00000204: 'Cellular phone',
            0X00000208: 'Cordless phone',
            0X0000020C: 'Smart phone',
            0X00000210: 'Wired modem or voice gateway',
            0X00000214: 'Common ISDN Access',
            0X00000218: 'SIM card reader',
            0X00000300: 'Fully available',
            0X00000320: '1 - 17% utilized',
            0X00000340: '17- 33% utilized',
            0X00000360: '33 - 50% utilized',
            0X00000380: '50 - 67% utilized',
            0X000003A0: '67 - 83% utilized',
            0X000003C0: '83 – 99% utilized',
            0X000003E0: 'No service available',
            0X00000400: 'Audio/Video major device class',
            0X00000404: 'Wearable headset device',
            0X00000408: 'Hands-free device',
            0X00000410: 'Microphone',
            0X00000414: 'Loudspeaker',
            0X00000418: 'Headphones',
            0X0000041C: 'Portable Audio',
            0X00000420: 'Car Audio',
            0X00000424: 'Set-top box',
            0X00000428: 'HiFi Audio device',
            0X0000042C: 'Videocassette recorder',
            0X00000430: 'Video camera',
            0X00000434: 'Camcorder',
            0X00000438: 'Video monitor',
            0X0000043C: 'Video display and loudspeaker',
            0X00000440: 'Video conferencing',
            0X00000448: 'Gaming/Toy',
            0X00000500: 'Peripheral major device class',
            0X00000540: 'Keyboard',
            0X00000580: 'Pointing device',
            0X000005C0: 'Combo keyboard/pointing device',
            0X00000600: 'Imaging major device class',
            0X00000610: 'Display',
            0X00000620: 'Camera',
            0X00000640: 'Scanner',
            0X00000680: 'Printer',
            0X00000700: 'Wearable major device class',
            0X00000704: 'Wristwatch',
            0X00000708: 'Pager',
            0X0000070C: 'Jacket',
            0X00000710: 'Helmet',
            0X00000714: 'Glasses'
        })
        self.dll = ctypes.CDLL('BsSDK.dll')
        self.init()

    def __enter__(self):
        """Return ``self`` so the wrapper can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Call ``Btsdk_Done``; failures are deliberately ignored (best-effort cleanup)."""
        try:
            self.dll.Btsdk_Done()
        except Exception:
            pass

    def init(self):
        """Initialise the SDK and verify it is initialised and connected to its server.

        Raises:
            Exception: if any of the three SDK checks fails.
        """
        if (self.dll.Btsdk_Init() != 0
                or not self.dll.Btsdk_IsSDKInitialized()
                or not self.dll.Btsdk_IsServerConnected()):
            raise Exception('SDK has not been initialized or server is not connected.')

    def done(self):
        """Call ``Btsdk_Done`` on the loaded DLL."""
        self.dll.Btsdk_Done()

    def isBluetoothReady(self):
        """Return the raw result of ``Btsdk_IsBluetoothReady``."""
        return self.dll.Btsdk_IsBluetoothReady()

    def enableDeviceDiscovery(self, device_class=0X000400, max_dev_num=30, max_durations=7):
        """Start device discovery.

        Args:
            device_class: class filter passed to the SDK; the default 0x400 is
                listed in ``self.devs`` as 'Audio/Video major device class'.
            max_dev_num: maximum number of devices passed to the SDK.
            max_durations: duration argument passed to the SDK
                (units not visible here — see the SDK documentation).

        Returns:
            True when the SDK returned code 0; otherwise False after emitting
            a warning with the mapped error text.
        """
        code = self.dll.Btsdk_StartDeviceDiscovery(device_class, max_dev_num, max_durations)
        if code != 0:
            warnings.warn(self.errs[code])
        return not code

    def cancelDeviceDiscovery(self):
        """Stop device discovery; returns True on code 0, else warns and returns False."""
        code = self.dll.Btsdk_StopDeviceDiscovery()
        if code != 0:
            warnings.warn(self.errs[code])
        return not code

    def getDeviceHandleByAddress(self, addr):
        """Return the SDK handle for a hex address string (colons optional).

        The six address bytes are passed to the SDK in reversed order.
        """
        bd_addr = reversed(bytes.fromhex(addr.replace(':', '')))
        bd_addr_buffer = (ctypes.c_uint8 * 6)(*bd_addr)
        return self.dll.Btsdk_GetRemoteDeviceHandle(bd_addr_buffer)

    def getDeviceName(self, dev_handle):
        """Return the remote device name as text, or '' when the SDK call fails."""
        name_buffer = (ctypes.c_uint8 * 256)()
        if self.dll.Btsdk_GetRemoteDeviceName(dev_handle, name_buffer, None) != 0:
            return ''
        # Cut at the first NUL. Unlike slicing with find(), this keeps the
        # final byte when the buffer happens to contain no terminator.
        raw = bytes(name_buffer).split(b'\x00', 1)[0]
        return raw.decode('utf-8', 'ignore')

    def getDeviceAddr(self, dev_handle):
        """Return the device address as 'AA:BB:CC:DD:EE:FF', or '' on failure."""
        addr_buffer = (ctypes.c_uint8 * 6)()
        if self.dll.Btsdk_GetRemoteDeviceAddress(dev_handle, addr_buffer) != 0:
            return ''
        return ':'.join('%02X' % byte for byte in reversed(bytes(addr_buffer)))

    def getDeviceCate(self, dev_handle):
        """Return the device-class description from ``self.devs``.

        Returns the integer 0 (not a string) when the SDK call fails; kept
        as-is because callers render the value with '%s'.
        """
        dev_class = ctypes.c_int32()
        if self.dll.Btsdk_GetRemoteDeviceClass(dev_handle, ctypes.byref(dev_class)) != 0:
            return 0
        return self.devs[dev_class.value]

    def getDeviceRSSI(self, dev_handle):
        """Return the signed 8-bit RSSI reported by the SDK, or 0 on failure."""
        rssi = ctypes.c_int8()
        if self.dll.Btsdk_GetRemoteRSSI(dev_handle, ctypes.byref(rssi)) != 0:
            return 0
        return rssi.value

    def getServiceAttributes(self, svc_handle):
        """Refresh and return the attribute structure for a service handle.

        Returns:
            The populated ``BtSdkRemoteServiceAttrStru`` on code 0; otherwise
            None after emitting a warning with the mapped error text.
        """
        attributes = BtSdkRemoteServiceAttrStru()
        attributes.mask = 0x01
        attributes.ext_attributes = 0
        code = self.dll.Btsdk_RefreshRemoteServiceAttributes(svc_handle, ctypes.byref(attributes))
        if code != 0:
            warnings.warn(self.errs[code])
            return None
        return attributes

    def _list_handles(self, query, *leading_args, max_dev_num=30):
        """Call an SDK enumeration function and return the handles it filled in.

        ``query`` is invoked as ``query(*leading_args, buffer, max_dev_num)``
        and is expected to return the number of handles written.
        """
        buffer = (ctypes.c_int * max_dev_num)()
        count = query(*leading_args, buffer, max_dev_num)
        if count <= 0:
            return []
        # Never read past the buffer even if the SDK reports a larger count.
        return list(buffer[:min(count, max_dev_num)])

    def getDevices(self):
        """Return the inquired devices as dicts, strongest RSSI first.

        Each dict has the keys 'dev_handle', 'name', 'addr', 'cate', 'rssi'.
        An empty list is returned when the SDK reports no devices.
        """
        devs = [
            {
                'dev_handle': dev_handle,
                'name': self.getDeviceName(dev_handle),
                'addr': self.getDeviceAddr(dev_handle),
                'cate': self.getDeviceCate(dev_handle),
                'rssi': self.getDeviceRSSI(dev_handle)
            }
            for dev_handle in self._list_handles(self.dll.Btsdk_GetInquiredDevices)
        ]
        devs.sort(key=lambda x: x['rssi'], reverse=True)
        return devs

    def connectAudioService(self, device_handle):
        """Connect to service class 0x110B ('Audio Sink service.' in ``self.svcs``).

        Returns:
            The connection handle value on code 0, otherwise None.
        """
        connection_handle = ctypes.c_uint32()
        code = self.dll.Btsdk_ConnectEx(device_handle, 0X110B, 0, ctypes.byref(connection_handle))
        if code != 0:
            return None
        return connection_handle.value

    def removeAllDevices(self):
        """Disconnect every connection and delete paired, inquired and stored devices.

        Always returns True; individual SDK return codes are not checked.
        """
        self.dll.Btsdk_StopDeviceDiscovery()

        # Disconnect every enumerated connection until the SDK returns a falsy handle.
        enum_handle = self.dll.Btsdk_StartEnumConnection()
        while True:
            conn = self.dll.Btsdk_EnumConnection(enum_handle, 0)
            if not conn:
                break
            self.dll.Btsdk_Disconnect(conn)
        self.dll.Btsdk_EndEnumConnection(enum_handle)

        for dev_handle in self._list_handles(self.dll.Btsdk_GetPairedDevices):
            self.dll.Btsdk_UnPairDevice(dev_handle)
            self.dll.Btsdk_DeleteRemoteDeviceByHandle(dev_handle)
        for dev_handle in self._list_handles(self.dll.Btsdk_GetInquiredDevices):
            self.dll.Btsdk_DeleteRemoteDeviceByHandle(dev_handle)
        for dev_handle in self._list_handles(self.dll.Btsdk_GetStoredDevicesByClass, 0):
            self.dll.Btsdk_DeleteRemoteDeviceByHandle(dev_handle)
        self.dll.Btsdk_DeleteUnpairedDevicesByClass(0)
        return True


def _fd(f):
    """Return ``f.fileno()`` for file objects, or ``f`` itself if it has no ``fileno``."""
    return f.fileno() if hasattr(f, 'fileno') else f


# Cross-platform advisory file locking. ``flock(f, flags)`` returns True on
# success and False on failure on every platform.
if os.name == 'nt':
    import msvcrt
    from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref)
    from ctypes.wintypes import BOOL, DWORD, HANDLE

    LOCK_SH = 0x0
    LOCK_NB = 0x1
    LOCK_EX = 0x2
    LOCK_UN = 0x9  # private marker: routes flock() to UnlockFileEx

    # Pick an integer type as wide as a pointer.
    if sizeof(c_ulong) != sizeof(c_void_p):
        ULONG_PTR = c_int64
    else:
        ULONG_PTR = c_ulong
    PVOID = c_void_p

    class _OFFSET(Structure):
        _fields_ = [
            ('Offset', DWORD),
            ('OffsetHigh', DWORD)
        ]

    class _OFFSET_UNION(Union):
        _fields_ = [
            ('_offset', _OFFSET),
            ('Pointer', PVOID)
        ]
        _anonymous_ = ['_offset']

    class OVERLAPPED(Structure):
        _fields_ = [
            ('Internal', ULONG_PTR),
            ('InternalHigh', ULONG_PTR),
            ('_offset_union', _OFFSET_UNION),
            ('hEvent', HANDLE)
        ]
        _anonymous_ = ['_offset_union']

    LPOVERLAPPED = POINTER(OVERLAPPED)
    LockFileEx = windll.kernel32.LockFileEx
    LockFileEx.restype = BOOL
    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
    UnlockFileEx = windll.kernel32.UnlockFileEx
    UnlockFileEx.restype = BOOL
    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

    def flock(f, flags):
        """Lock or unlock ``f`` via LockFileEx/UnlockFileEx; True on success."""
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        if flags == LOCK_UN:
            ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
        else:
            ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)
else:
    try:
        import fcntl
        LOCK_SH = fcntl.LOCK_SH
        LOCK_NB = fcntl.LOCK_NB
        LOCK_EX = fcntl.LOCK_EX
        LOCK_UN = fcntl.LOCK_UN
    except (ImportError, AttributeError):
        # Locking is unavailable: lock requests report failure, unlock succeeds.
        LOCK_EX = LOCK_SH = LOCK_NB = 0
        # Must be defined (flock() compares against it) and differ from the
        # zeroed lock flags above so that a lock request is not mistaken for unlock.
        LOCK_UN = 8

        def flock(f, flags):
            """Fallback without ``fcntl``: only an unlock request returns True."""
            return flags == LOCK_UN
    else:
        def flock(f, flags):
            """Apply ``fcntl.flock``; True on success, False if the call fails."""
            # fcntl.flock returns None on success and raises OSError on
            # failure (e.g. a contended LOCK_NB request), so comparing its
            # result with 0 would always report failure.
            try:
                fcntl.flock(_fd(f), flags)
            except OSError:
                return False
            return True
class LoggerFileHandler:
    """Plain settings holder describing a file log handler."""

    def __init__(self, log_file: str, mode: str = 'a', level: str = None, fmt: str = None):
        self.log, self.mod = log_file, mode
        self.lev, self.fmt = level, fmt
        self.sty = '%'


class LoggerConsHandler:
    """Plain settings holder describing a console (stream) log handler."""

    def __init__(self, level: str = None, fmt: str = None):
        self.lev, self.fmt = level, fmt
        self.sty = '%'


class Logger:
    """Convenience wrapper that configures a ``logging`` logger with optional handlers.

    After construction the short aliases ``d``, ``i``, ``w``, ``e``, ``f`` and
    ``c`` are bound to the underlying logger's debug/info/warning/error/
    fatal/critical methods.
    """

    logger = None
    levels = {
        'CRITICAL': logging.CRITICAL,
        'FATAL': logging.FATAL,
        'ERROR': logging.ERROR,
        'WARNING': logging.WARNING,
        'WARN': logging.WARN,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
        'NOTSET': logging.NOTSET,
        'D': logging.DEBUG,
        'I': logging.INFO,
        'W': logging.WARNING,
        'E': logging.ERROR,
        'F': logging.FATAL
    }
    default_format = '{asctime} - {name} - {levelname[0]}: {message}'
    default_format_style = '{'
    # Kept for backward compatibility; every instance gets its own list in
    # __init__ so handlers are no longer shared between Logger objects.
    handler_list = []

    def __init__(
            self,
            name: str = 'default',
            default_level='DEBUG',
            fh: LoggerFileHandler = None,
            ch: LoggerConsHandler = None,
            add_default_handler=False
    ):
        """Create/configure the named logger.

        Args:
            name: logger name passed to ``logging.getLogger``.
            default_level: key of ``Logger.levels`` used for the logger and for
                handlers that do not specify their own level.
            fh: optional file handler settings.
            ch: optional console handler settings.
            add_default_handler: when true and ``ch`` is not given, a default
                console handler is added.

        Raises:
            TypeError: if ``fh``/``ch`` are given but of the wrong type.
            KeyError: if a level name is not a key of ``Logger.levels``.
        """
        ch = LoggerConsHandler() if add_default_handler and not ch else ch
        if fh and not isinstance(fh, LoggerFileHandler):
            raise TypeError('The parameter fh must be <LoggerFileHandler> type.')
        if ch and not isinstance(ch, LoggerConsHandler):
            raise TypeError('The parameter ch must be <LoggerConsHandler> type.')
        self.handler_list = []
        self.logger = logging.getLogger(name)
        self.logger.setLevel(self.levels[default_level])
        if fh:
            fhandler = logging.FileHandler(filename=fh.log, mode=fh.mod, encoding='utf-8')
            self.handler_list.append(fhandler)
            fhandler.setLevel(self.levels[fh.lev or default_level])
            fh.fmt = fh.fmt or self.default_format
            # A format containing '%' is treated as %-style, anything else as {-style.
            fh.sty = '{' if '%' not in fh.fmt else '%'
            fhandler.setFormatter(logging.Formatter(fmt=fh.fmt, style=fh.sty))
            self.logger.addHandler(fhandler)
        if ch:
            chandler = logging.StreamHandler()
            self.handler_list.append(chandler)
            chandler.setLevel(self.levels[ch.lev or default_level])
            ch.fmt = ch.fmt or self.default_format
            ch.sty = '{' if '%' not in ch.fmt else '%'
            chandler.setFormatter(logging.Formatter(fmt=ch.fmt, style=ch.sty))
            self.logger.addHandler(chandler)
        self.d = self.logger.debug
        self.i = self.logger.info
        self.w = self.logger.warning
        self.e = self.logger.error
        self.f = self.logger.fatal
        self.c = self.logger.critical


class CustomLineEdit(QLineEdit):
    """Line edit with the app font and a two-entry (copy / paste) context menu."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #000000;')

    def contextMenuEvent(self, event):
        menu = QMenu(self)
        action_c = menu.addAction('复制')
        action_p = menu.addAction('粘贴')
        action_c.triggered.connect(self.copy)
        action_p.triggered.connect(self.paste)
        menu.popup(self.mapToGlobal(event.pos()))


class CustomLineEditNoPopup(QLineEdit):
    """Bold, bordered line edit whose context menu is suppressed."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold;')

    def contextMenuEvent(self, event):
        # Intentionally empty: no context menu for this widget.
        pass


class CustomPushButton(QPushButton):
    """Push button using the app font."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #0C0C0C;')


class CustomLabel(QLabel):
    """Bordered label using the app font."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #0C0C0C; border: 1px solid #0C0C0C;')


class CustomComboBox(QComboBox):
    """Combo box using the app font."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #0C0C0C;')


class CustomCheckBox(QCheckBox):
    """Check box using the app font."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #0C0C0C;')


class CustomHBoxLayout(QHBoxLayout):
    """Horizontal layout aligned to the top-left."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setAlignment(Qt.AlignTop | Qt.AlignLeft)


class CustomVBoxLayout(QVBoxLayout):
    """Vertical layout aligned to the top-left."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setAlignment(Qt.AlignTop | Qt.AlignLeft)


class MainWindow(QMainWindow):
    """Main application window: search for, connect to and disconnect Bluetooth audio devices."""

    def __init__(self, logger: Logger):
        """Build the UI, centre and show the window, then initialise the BlueSoleil SDK."""
        super().__init__()
        self.app_name = '蓝牙音频连接'
        # (version, build, author, email, site)
        self.app_version = ('1.0.3', '20240815', 'zhaoyafan', 'zhaoyafan@foxmail.com', 'https://www.fanscloud.net/')
        self.app_text = {'search': '正在搜索', 'connecting': '连接中...'}
        self.logger = logger
        self.bluesoleil = None
        self.currentDevHandle = None
        self.setWindowTitle(self.app_name)
        self.setGeometry(0, 0, 600, 400)
        self.statusBar()
        self.menubar = self.menuBar()
        moremenu = self.menubar.addMenu('更多')
        self.restartServAction = QAction('重启蓝牙服务', self)
        self.createShortAction = QAction('发送快捷方式到桌面', self)
        self.aboutWindowAction = QAction('关于', self)
        self.restartServAction.triggered.connect(self.restartServActionFunction)
        self.createShortAction.triggered.connect(self.createShortActionFunction)
        self.aboutWindowAction.triggered.connect(self.aboutWindowActionFunction)
        moremenu.addActions(
            [
                self.restartServAction,
                self.createShortAction,
                self.aboutWindowAction
            ]
        )
        # Define the layouts
        m_layout_0 = CustomVBoxLayout()
        m_layout_1 = CustomHBoxLayout()
        m_layout_2 = CustomHBoxLayout()
        m_layout_3 = CustomHBoxLayout()
        m_layout_4 = CustomHBoxLayout()
        m_layout_0.addLayout(m_layout_1)
        m_layout_0.addLayout(m_layout_2)
        m_layout_0.addLayout(m_layout_3)
        m_layout_0.addLayout(m_layout_4)
        # Bluetooth device name (read-only)
        self.edtBtName = CustomLineEditNoPopup('')
        self.edtBtName.setReadOnly(True)
        self.edtBtName.setFixedSize(250, 36)
        m_layout_1.addWidget(self.edtBtName)
        # Disconnect button
        self.butDisconnect = CustomPushButton('断开连接')
        self.butDisconnect.setFixedSize(64, 36)
        self.butDisconnect.clicked.connect(self.on_disconnect)
        m_layout_1.addWidget(self.butDisconnect)
        # Automatic search button
        self.butAutoSearch = CustomPushButton('自动搜索')
        self.butAutoSearch.setFixedSize(122, 36)
        self.butAutoSearch.clicked.connect(self.on_autosearch)
        m_layout_2.addWidget(self.butAutoSearch)
        # Manual search button
        self.butManuSearch = CustomPushButton('手动搜索')
        self.butManuSearch.setFixedSize(122, 36)
        self.butManuSearch.clicked.connect(self.on_manusearch)
        m_layout_2.addWidget(self.butManuSearch)
        # Address input
        self.edtBtAddr = CustomLineEdit('')
        self.edtBtAddr.setStyleSheet('QLineEdit {font-size: 24px; font-family: \'Microsoft YaHei\'; color: #000000; background-color: #FFFFEE;}')
        self.edtBtAddr.setFixedSize(249, 36)
        self.edtBtAddr.returnPressed.connect(self.on_connect_bt_address)
        m_layout_3.addWidget(self.edtBtAddr)
        # Copy-address button
        self.butCopyAddressToClipboard = CustomPushButton('复制地址')
        self.butCopyAddressToClipboard.setFixedSize(64, 36)
        self.butCopyAddressToClipboard.clicked.connect(self.on_copy_address_to_clipboard)
        m_layout_3.addWidget(self.butCopyAddressToClipboard)
        # Show the window, centred on the available screen area
        central_widget = QWidget()
        central_widget.setLayout(m_layout_0)
        self.setCentralWidget(central_widget)
        self.setFixedSize(self.minimumSizeHint())
        screen_rect = QApplication.desktop().availableGeometry()
        window_rect = self.geometry()
        self.move(int((screen_rect.width() - window_rect.width()) * 0.5), int((screen_rect.height() - window_rect.height()) * 0.5))
        self.show()
        self.init_bluesoleil()

    def init_bluesoleil(self):
        """Create the ``BtSdk`` wrapper; on failure show an error and exit with status 1."""
        try:
            self.bluesoleil = BtSdk()
        except FileNotFoundError:
            QMessageBox.critical(self, '错误', '“千月(bluesoleil)”未安装,请安装后再试。')
            self.close()
            sys.exit(1)
        except Exception:
            QMessageBox.critical(self, '警告', '蓝牙未启用或未连接蓝牙适配器')
            self.close()
            sys.exit(1)

    def restartServActionFunction(self):
        """Run ``RestartBlueSoleil.bat`` (next to this file) if BlueSoleil is installed, then re-init the SDK."""
        if os.path.exists('C:\\Program Files (x86)\\IVT Corporation\\BlueSoleil\\BtTray.exe'):
            subprocess.run(os.path.join(os.path.dirname(__file__), 'RestartBlueSoleil.bat'), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            QMessageBox.information(self, '提示', '%s' % ('服务已重新启动',))
            self.init_bluesoleil()

    def createShortActionFunction(self):
        """Create a desktop shortcut to the sibling ``.exe`` of this file, if that exe exists."""
        target = '%s.exe' % (os.path.splitext(__file__)[0],)
        shortcut_path = self.path_expandvars('%%USERPROFILE%%\\Desktop\\%s.lnk' % (self.app_name,))
        if os.path.exists(target):
            self.create_shortcut(target, shortcut_path, 1)
            QMessageBox.information(self, '提示', '%s' % ('已成功发送快捷方式到桌面',))

    def aboutWindowActionFunction(self):
        """Show the About dialog built from ``self.app_version``."""
        _c = self.app_version
        if _c:
            # The site is rendered as a link. The previous code formatted
            # '%s' with two arguments, which raised TypeError before the
            # dialog could open.
            site = '<a href="%s">%s</a>' % (_c[4], _c[4])
            QMessageBox.information(
                self, '关于',
                'version: %s, build: %s, author: %s, email: %s, site: %s' % (_c[0], _c[1], _c[2], _c[3], site)
            )

    @staticmethod
    def path_expandvars(path):
        """Expand environment variables in ``path``.

        Raises:
            Exception: if a ``%NAME%`` placeholder is still present afterwards.
        """
        resolve = os.path.expandvars(path)
        if re.search('%[A-Za-z0-9_]+%', resolve) is not None:
            raise Exception('Variable analysis failed')
        return resolve

    @staticmethod
    def create_shortcut(target, shortcut_path, run_as_admin):
        """Create (replacing any existing) a Windows shortcut via ``WScript.Shell``.

        Args:
            target: path the shortcut points to.
            shortcut_path: where to write the shortcut file.
            run_as_admin: when truthy and the shortcut is a ``.lnk``, bytes at
                offset 21 of the file are patched afterwards.
        """
        shell = win32com.client.Dispatch('WScript.Shell')
        try:
            os.remove(shortcut_path)
        except FileNotFoundError:
            pass
        shortcut = shell.CreateShortCut(shortcut_path)
        shortcut.TargetPath = target
        if shortcut_path.endswith('.lnk'):
            shortcut.Arguments = ''
            shortcut.WorkingDirectory = os.path.dirname(target)
            if target.startswith('\\\\'):
                # UNC targets get no working directory.
                shortcut.WorkingDirectory = ''
        shortcut.save()
        if shortcut_path.endswith('.lnk') and run_as_admin:
            with open(shortcut_path, 'r+b') as f:
                if os.path.isfile(shortcut_path):
                    # NOTE(review): presumably sets the "run as administrator"
                    # flag in the .lnk header — confirm against the format spec.
                    f.seek(21, 0)
                    f.write(b'\x22\x00')

    def closeEvent(self, event):
        """Release the SDK (best effort) and accept the close event."""
        sdk = self.bluesoleil
        if sdk is not None:
            try:
                sdk.done()
                self.bluesoleil = None
            except Exception as exc:
                # Shutdown must not be blocked by an SDK failure; record it and go on.
                try:
                    self.logger.w('Failed to release the Bluetooth SDK: %s', exc)
                except Exception:
                    pass
        event.accept()

    def open_manu_search_dialog(self):
        """Disable this window and run the manual-search dialog modally."""
        self.setEnabled(False)
        ManuWindow(self).exec()

    def exit_manu_search_dialog(self):
        """Re-enable this window; called by the dialog when it closes."""
        self.setEnabled(True)

    def _apply_connect_state(self, background, inputs_enabled):
        """Reset name/address fields and switch the UI between busy and idle looks."""
        self.edtBtName.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: %s' % (background,))
        self.edtBtName.setText('')
        self.edtBtAddr.setText('')
        self.edtBtAddr.setEnabled(inputs_enabled)
        self.edtBtAddr.setFocus()
        self.butAutoSearch.setEnabled(inputs_enabled)
        self.butManuSearch.setEnabled(inputs_enabled)

    def in_connect_process(self):
        """Put the UI in the 'connecting/connected' state (inputs disabled)."""
        self._apply_connect_state('#7BD136', False)

    def un_connect_process(self):
        """Put the UI in the 'not connected' state (inputs enabled)."""
        self._apply_connect_state('#FDD391', True)

    def on_disconnect(self):
        """Stop discovery, remove all devices/connections and reset the UI."""
        self.bluesoleil.cancelDeviceDiscovery()
        self.bluesoleil.removeAllDevices()
        self.un_connect_process()

    def on_autosearch(self):
        """Disconnect, then poll discovery and connect to the strongest device with RSSI >= -65."""
        self.on_disconnect()
        self.setWindowTitle(self.app_text['search'])
        self.bluesoleil.enableDeviceDiscovery()
        devices = None
        # Poll up to 12 times, 0.33 s apart. This blocks the UI thread.
        for _ in range(12):
            time.sleep(0.33)
            devices = [dev for dev in self.bluesoleil.getDevices() if dev['rssi'] >= -65]
            if devices:
                break
        self.setWindowTitle(self.app_name)
        if not devices:
            QMessageBox.information(self, '提示', '未找到任何可用设备')
            return None
        # getDevices() sorts by RSSI descending, so index 0 is the strongest.
        self.connect_action_ui(devices[0]['dev_handle'])

    def on_manusearch(self):
        """Open the manual-search dialog."""
        self.open_manu_search_dialog()

    def on_copy_address_to_clipboard(self):
        """Copy the address field to the clipboard when it is non-empty."""
        text = self.edtBtAddr.text()
        if text:
            QApplication.clipboard().setText(text)

    def on_connect_bt_address(self):
        """Validate the typed address (12 hex digits, ':'/'-' ignored) and connect to it."""
        edit = self.edtBtAddr
        text = edit.text().strip().replace(':', '').replace('-', '').upper()
        if text == '':
            edit.selectAll()
            return None
        if not re.compile('^([0-9A-Fa-f]{2}){6}$').match(text):
            QMessageBox.critical(self, '错误', '非法地址:\n\n%s' % (text,))
            edit.selectAll()
            return None
        return self.connect_action_ui(text)

    def connect_action_ui(self, handle):
        """Connect to a device handle or address string and update the UI.

        Returns:
            True on success; None on failure or when ``handle`` equals 0.
        """
        if handle == 0:
            return None
        self.in_connect_process()
        self.setWindowTitle(self.app_text['connecting'])
        if not self.connect(handle):
            self.un_connect_process()
            self.edtBtAddr.selectAll()
            self.setWindowTitle(self.app_name)
            QMessageBox.critical(self, '提示', '连接失败')
            return None
        self.edtBtAddr.setText(str(self.bluesoleil.getDeviceAddr(self.currentDevHandle)).replace(':', ''))
        self.edtBtName.setText(str(self.bluesoleil.getDeviceName(self.currentDevHandle)))
        self.setWindowTitle(self.app_name)
        return True

    def connect(self, data):
        """Connect the audio service on a device.

        Args:
            data: a device handle, or an address string; for a string,
                discovery is started and the handle is polled up to 9 times.

        Returns:
            True on success (``self.currentDevHandle`` is updated), else None.
        """
        devHandle = data
        if isinstance(data, str):
            self.bluesoleil.enableDeviceDiscovery()
            for _ in range(9):
                time.sleep(0.33)
                devHandle = self.bluesoleil.getDeviceHandleByAddress(data)
                if devHandle:
                    break
        if not devHandle:
            return None
        code = self.bluesoleil.connectAudioService(devHandle)
        if not code:
            return None
        self.currentDevHandle = devHandle
        return True


class ManuWindow(QDialog):
    """Modal dialog listing discovered devices, with a 'connect' button per row."""

    def __init__(self, mainwindow):
        """Build the table UI, start the 1 s refresh timer and kick off discovery."""
        super().__init__()
        self.mainwindow = mainwindow
        self.devices = []
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.task_device_list_update)
        self.timer.start(1000)
        self.setGeometry(0, 0, 510, 295)
        self.setWindowTitle('搜索蓝牙音频设备')
        self.button = QPushButton('刷新列表', self)
        self.button.setFixedSize(508, 32)
        self.button.clicked.connect(self.on_refresh_list)
        self.tableWidget = QTableWidget(self)
        self.tableWidget.setFixedSize(508, 240)
        self.tableWidget.setRowCount(0)
        self.tableWidget.setColumnCount(5)
        self.tableWidget.setHorizontalHeaderLabels(['设备名称', '地址', '设备类型', '信号强度', ''])
        font = QFont('SimSun', 9)
        self.tableWidget.setFont(font)
        layout = QVBoxLayout()
        layout.addWidget(self.button)
        layout.addWidget(self.tableWidget)
        self.setLayout(layout)
        self.tableWidget.verticalHeader().hide()
        self.tableWidget.setColumnWidth(0, 150)
        self.tableWidget.setColumnWidth(1, 120)
        self.tableWidget.setColumnWidth(2, 100)
        self.tableWidget.setColumnWidth(3, 60)
        self.tableWidget.setColumnWidth(4, 50)
        # Show the dialog. Fix the size first so that centring uses the final
        # geometry (same order as MainWindow).
        self.setFixedSize(self.minimumSizeHint())
        screen_rect = QApplication.desktop().availableGeometry()
        window_rect = self.geometry()
        self.move(int((screen_rect.width() - window_rect.width()) * 0.5), int((screen_rect.height() - window_rect.height()) * 0.5))
        self.show()
        self.on_refresh_list()

    def task_device_list_update(self):
        """Timer slot: rebuild the table when the discovered device list changed."""
        try:
            devices = self.mainwindow.bluesoleil.getDevices()
        except AttributeError:
            # The SDK wrapper is gone (e.g. set to None); nothing left to show.
            self.close()
            return None
        if self.devices == devices:
            return None
        self.devices = devices
        self.tableWidget.setRowCount(0)
        for row, device in enumerate(devices):
            self.tableWidget.insertRow(row)
            self.tableWidget.setItem(row, 0, QTableWidgetItem('%s' % device['name']))
            self.tableWidget.setItem(row, 1, QTableWidgetItem('%s' % device['addr']))
            self.tableWidget.setItem(row, 2, QTableWidgetItem('%s' % device['cate']))
            self.tableWidget.setItem(row, 3, QTableWidgetItem('%s' % device['rssi']))
            connectButton = QPushButton('连接', self)
            # Bind the device as a default argument to avoid late binding.
            connectButton.clicked.connect(lambda _, dev=device: self.on_connect(dev))
            cell_widget = QWidget()
            cell_layout = QHBoxLayout(cell_widget)
            cell_layout.addWidget(connectButton)
            cell_layout.setAlignment(connectButton, Qt.AlignHCenter)
            cell_layout.setContentsMargins(0, 0, 0, 0)
            self.tableWidget.setCellWidget(row, 4, cell_widget)
        # Make every text cell read-only but selectable.
        for row in range(self.tableWidget.rowCount()):
            for col in range(self.tableWidget.columnCount()):
                item = self.tableWidget.item(row, col)
                if item:
                    item.setFlags(item.flags() & ~Qt.ItemIsEditable | Qt.ItemIsSelectable | Qt.ItemIsEnabled)

    def task_end_search(self):
        """Single-shot timer slot: stop discovery and re-enable the refresh button."""
        sdk = self.mainwindow.bluesoleil
        if sdk is not None:
            sdk.cancelDeviceDiscovery()
        self.button.setEnabled(True)

    def on_refresh_list(self):
        """Restart discovery and schedule it to stop after 7 seconds."""
        self.button.setEnabled(False)
        try:
            self.mainwindow.bluesoleil.cancelDeviceDiscovery()
            self.mainwindow.bluesoleil.enableDeviceDiscovery()
        except AttributeError:
            # No SDK wrapper available: close and do not arm the stop timer.
            self.close()
            return None
        timer = QTimer(self)
        timer.timeout.connect(self.task_end_search)
        timer.setSingleShot(True)
        timer.start(7000)

    def on_connect(self, data):
        """Stop discovery, connect to the chosen device via the main window, then close."""
        self.mainwindow.bluesoleil.cancelDeviceDiscovery()
        self.mainwindow.connect_action_ui(data['dev_handle'])
        self.close()

    def closeEvent(self, event):
        """Re-enable the main window and accept the close event."""
        self.mainwindow.exit_manu_search_dialog()
        event.accept()

    def keyPressEvent(self, event):
        # Intentionally empty: key presses are ignored by this dialog.
        pass


if __name__ == '__main__':
    if os.path.basename(__file__).lower().endswith('.int'):
        QCoreApplication.addLibraryPath(os.path.abspath(os.path.join(os.path.dirname(__file__), 'site-packages/PyQt5/Qt5/plugins')))
    # Single-instance guard: hold an exclusive, non-blocking lock on a temp
    # file named after a hash of this script's path. The file object stays
    # referenced for the life of the process so the lock is kept.
    f_lock = open(file=os.path.abspath(os.path.join(tempfile.gettempdir(), '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16])), mode='w', encoding='utf-8')
    if not flock(f_lock, LOCK_EX | LOCK_NB):
        app = QApplication(sys.argv)
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setText('程序已经在运行中')
        msg.setWindowTitle('提示')
        msg.setStandardButtons(QMessageBox.Ok)
        msg.exec_()
        app.exit(1)
        sys.exit(1)
    app = QApplication(sys.argv)
    window_main = MainWindow(logger=Logger(name='main', fh=None, ch=LoggerConsHandler()))
    sys.exit(app.exec_())