BlueAudioUI/main.py

1137 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import io
import sys
import ctypes
import logging
import threading
import time
import json
import hashlib
import tempfile
import warnings
import subprocess
import win32com.client
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QDialog, QLabel, QComboBox, QCheckBox, QLineEdit, QAction, QMenu, QMessageBox, QPushButton, QTableWidget, QVBoxLayout, QHBoxLayout, QTableWidgetItem
from PyQt5.QtCore import Qt, QCoreApplication, QPropertyAnimation, QTimer
from PyQt5.QtGui import QFont
class CustomWarningFormatter:
def __call__(self, message, category, filename, lineno, line=None):
return 'Warning: %s\n' % (message,)
warnings.formatwarning = CustomWarningFormatter()
def exception_hook(types, value, traceback):
sys.stderr.write('%s: %s\n' % (types.__name__, value))
sys.excepthook = exception_hook
class Err(dict):
def __getitem__(self, item: int):
return super().get(int('0X%s' % ('{0:0>8X}'.format(item)[-4:],), 16), 'Unknown.')
class Svc(dict):
def __getitem__(self, item: int):
return super().get(int('0X%s' % ('{0:0>8X}'.format(item)[0:4],), 16), 'Unknown.')
class Dev(dict):
def __getitem__(self, item: int):
return super().get(int('0X%s' % ('{0:0>8X}'.format(item)[-4:],), 16), 'Unknown.')
class BtSdkRemoteServiceAttrStru(ctypes.Structure):
_fields_ = [
('mask', ctypes.c_uint32),
('service_class', ctypes.c_uint16),
('dev_hdl', ctypes.c_uint32),
('svc_name', ctypes.c_char * 256),
('ext_attributes', ctypes.c_void_p),
('status', ctypes.c_uint16)
]
class BtSdkRmtSPPSvcExtAttrStru(ctypes.Structure):
_fields_ = [
('size', ctypes.c_uint32),
('server_channel', ctypes.c_uint8)
]
class BtSdk:
def __init__(self):
self.errs = Err({
0X00000000: 'The operation completed successfully.',
0X000000C0: 'Local service is still active. When the application tries to remove or activate an active service, this error code is returned.',
0X000000C1: 'No service record with the specified search pattern is found on the remote device.',
0X000000C2: 'The specified service record does not exist on the remote device.',
0X00000301: 'The object specified by the handle does not exist in local BlueSoleil SDK database.',
0X00000302: 'The operation fails for an undefined reason.',
0X00000303: 'BlueSoleil SDK has not been initialized.',
0X00000304: 'The parameter value is invalid.',
0X00000305: 'The pointer value is NULL.',
0X00000306: 'Not enough storage is available to process this function.',
0X00000307: 'The specified buffer size is too small to hold the required information.',
0X00000308: 'The specified function is not supported by the BlueSoleil.',
0X00000309: 'No fixed PIN code is available.',
0X0000030A: 'The specified service has been connected already.',
0X0000030B: 'The request can not be processed since a same request is being processed.',
0X0000030C: 'The limit of connection number is reached.',
0X0000030D: 'An object with the specified attribute exists.',
0X0000030E: 'The specified object is accessed by other process. It can not be removed or modified.',
0X0000030F: 'The specified remote device is not paired.',
0X00000401: 'HCI error “Unknown HCI Command (0X000001)” is received.',
0X00000402: 'HCI error “Unknown Connection Identifier (0X000002)” is received.',
0X00000403: 'HCI error “Hardware Failure (0X000003)” is received.',
0X00000404: 'HCI error “Page Timeout (0X000004)” is received.',
0X00000405: 'HCI error “Authentication Failure (0X000005)” is received.',
0X00000406: 'HCI error “PIN or Key Missing (0X000006)” is received.',
0X00000407: 'HCI error “Memory Capacity Exceeded (0X000007)” is received.',
0X00000408: 'HCI error “Connection Timeout (0X000008)” is received.',
0X00000409: 'HCI error “Connection Limit Exceeded (0X000009)” is received.',
0X0000040A: 'HCI error “Synchronous Connection Limit to a Device Exceeded (0X00000A)” is received.',
0X0000040B: 'HCI error “ACL Connection Already Exists (0X00000B)” is received.',
0X0000040C: 'HCI error “Command Disallowed (0X00000C)” is received.',
0X0000040D: 'HCI error “Connection Rejected due to Limited Resources (0X00000D)” is received.',
0X0000040E: 'HCI error “Connection Rejected due to Security Reasons (0X00000E)” is received.',
0X0000040F: 'HCI error “Connection Rejected due to Unacceptable BD_ADDR (0X00000F)” is received.',
0X00000410: 'HCI error “Connection Accept Timeout Exceeded (0X10)” is received.',
0X00000411: 'HCI error “Unsupported Feature or Parameter Value (0X11)” is received.',
0X00000412: 'HCI error “Invalid HCI Command parameters (0X12)” is received.',
0X00000413: 'HCI error “Remote User Terminated Connection (0X13)” is received.',
0X00000414: 'HCI error “Remote Device Terminated Connection due to Low Resources (0X14)” is received.',
0X00000415: 'HCI error “Remote Device Terminated Connection due to Power Off (0X15)” is received.',
0X00000416: 'HCI error “Connection Terminated by Local Host (0X16)” is received.',
0X00000417: 'HCI error “Repeated Attempts (0X17)” is received.',
0X00000418: 'HCI error “Pairing Not Allowed (0X18)” is received.',
0X00000419: 'HCI error “Unknown LMP PDU (0X19)” is received.',
0X0000041A: 'HCI error “Unsupported Remote Feature / Unsupported LMP Feature (0X1A)” is received.',
0X0000041B: 'HCI error “SCO Offset Rejected (0X1B)” is received.',
0X0000041C: 'HCI error “SCO Interval Rejected (0X1C)” is received.',
0X0000041D: 'HCI error “SCO Air Mode Rejected (0X1D)” is received.',
0X0000041E: 'HCI error “Invalid LMP Parameters (0X1E)” is received.',
0X0000041F: 'HCI error “Unspecified Error (0X1F)” is received.',
0X00000420: 'HCI error “Unsupported LMP Parameter Value (0X20)” is received.',
0X00000421: 'HCI error “Role Change Not Allowed (0X21)” is received.',
0X00000422: 'HCI error “LMP Response Timeout (0X22)” is received.',
0X00000423: 'HCI error “LMP Error Transaction Collision (0X23)” is received.',
0X00000424: 'HCI error “LMP PDU Not Allowed (0X24)” is received.',
0X00000425: 'HCI error “Encryption Mode Not Acceptable (0X25)” is received.',
0X00000426: 'HCI error “Link Key Can not be Changed (0X26)” is received.',
0X00000427: 'HCI error “Requested QOS Not Supported (0X27)” is received.',
0X00000428: 'HCI error “Instant Passed (0X28)” is received.',
0X00000429: 'HCI error “Pairing with Unit Key Not Supported (0X29)” is received.',
0X0000042A: 'HCI error “Different Transaction Collision (0X2A)” is received.',
0X0000042C: 'HCI error “QOS Unacceptable Parameter (0X2C)” is received.',
0X0000042D: 'HCI error “QOS Rejected (0X2D)” is received.',
0X0000042E: 'HCI error “Channel Classification Not Supported (0X2E)” is received.',
0X0000042F: 'HCI error “Insufficient Security (0X2F)” is received.',
0X00000430: 'HCI error “Parameter Out of Mandatory Range (0X30)” is received.',
0X00000432: 'HCI error “Role Switch Pending (0X32)” is received.',
0X00000434: 'HCI error “Reserved Slot Violation (0X34)” is received.',
0X00000435: 'HCI error “Role Switch Failed (0X35)” is received.'
})
self.svcs = Svc({
0X00001101: 'Serial Port service.',
0X00001102: 'LAN Access service.',
0X00001103: 'Dial-up Networking service.',
0X00001104: 'Synchronization service.',
0X00001105: 'Object Push service.',
0X00001106: 'File Transfer service.',
0X00001107: 'IrMC Sync Command service.',
0X00001108: 'Headset service.',
0X00001109: 'Cordless Telephony service.',
0X0000110A: 'Audio Source service.',
0X0000110B: 'Audio Sink service.',
0X0000110C: 'A/V Remote Control Target service.',
0X0000110D: 'Advanced Audio Distribution service.',
0X0000110E: 'A/V Remote Control service.',
0X0000110F: 'Video conference service.',
0X00001110: 'Intercom service.',
0X00001111: 'Fax service.',
0X00001112: 'Headset Audio Gateway service.',
0X00001113: 'WAP service.',
0X00001114: 'WAP client service.',
0X00001115: 'PANU service.',
0X00001116: 'NAP service.',
0X00001117: 'GN service.',
0X00001118: 'Direct Print service.',
0X00001119: 'Referenced Print service.',
0X0000111A: 'Imaging service.',
0X0000111B: 'Imaging Responder service.',
0X0000111C: 'Imaging Automatic Archive service.',
0X0000111D: 'Imaging Referenced Objects service.',
0X0000111E: 'Hands-free service.',
0X0000111F: 'Hands-free Audio Gateway service.',
0X00001120: 'DPS Referenced Objects service.',
0X00001121: 'Reflected UI service',
0X00001122: 'Basic Print service.',
0X00001123: 'Print Status service.',
0X00001124: 'Human Interface Device service.',
0X00001125: 'Hardcopy Cable Replacement service.',
0X00001126: 'HCRP Print service.',
0X00001127: 'HCRP Scan service.',
0X0000112D: 'SIM Card Access service',
0X0000112E: 'PBAP Phonebook Client Equipment service.',
0X0000112F: 'PBAP Phonebook Server Equipment service.',
0X00001130: 'Phonebook Access service.',
0X00001200: 'Bluetooth Device Identification.'
})
self.devs = Dev({
0X00000100: 'Computer major device class',
0X00000104: 'Desktop workstation',
0X00000108: 'Server-class computer',
0X0000010C: 'Laptop computer',
0X00000110: 'Handheld PC/PDA (clam shell)',
0X00000114: 'Palm sized PC/PDA',
0X00000118: 'Wearable computer (Watch sized)',
0X00000200: 'Phone major device class',
0X00000204: 'Cellular phone',
0X00000208: 'Cordless phone',
0X0000020C: 'Smart phone',
0X00000210: 'Wired modem or voice gateway',
0X00000214: 'Common ISDN Access',
0X00000218: 'SIM card reader',
0X00000300: 'Fully available',
0X00000320: '1 - 17% utilized',
0X00000340: '17- 33% utilized',
0X00000360: '33 - 50% utilized',
0X00000380: '50 - 67% utilized',
0X000003A0: '67 - 83% utilized',
0X000003C0: '83 99% utilized',
0X000003E0: 'No service available',
0X00000400: 'Audio/Video major device class',
0X00000404: 'Wearable headset device',
0X00000408: 'Hands-free device',
0X00000410: 'Microphone',
0X00000414: 'Loudspeaker',
0X00000418: 'Headphones',
0X0000041C: 'Portable Audio',
0X00000420: 'Car Audio',
0X00000424: 'Set-top box',
0X00000428: 'HiFi Audio device',
0X0000042C: 'Videocassette recorder',
0X00000430: 'Video camera',
0X00000434: 'Camcorder',
0X00000438: 'Video monitor',
0X0000043C: 'Video display and loudspeaker',
0X00000440: 'Video conferencing',
0X00000448: 'Gaming/Toy',
0X00000500: 'Peripheral major device class',
0X00000540: 'Keyboard',
0X00000580: 'Pointing device',
0X000005C0: 'Combo keyboard/pointing device',
0X00000600: 'Imaging major device class',
0X00000610: 'Display',
0X00000620: 'Camera',
0X00000640: 'Scanner',
0X00000680: 'Printer',
0X00000700: 'Wearable major device class',
0X00000704: 'Wristwatch',
0X00000708: 'Pager',
0X0000070C: 'Jacket',
0X00000710: 'Helmet',
0X00000714: 'Glasses'
})
self.dll = ctypes.CDLL('BsSDK.dll')
self.init()
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.dll.Btsdk_Done()
except Exception:
pass
def init(self):
if (self.dll.Btsdk_Init() != 0 or not self.dll.Btsdk_IsSDKInitialized() or not self.dll.Btsdk_IsServerConnected()) == 1:
raise Exception('SDK has not been initialized or server is not connected.')
def done(self):
self.dll.Btsdk_Done()
def isBluetoothReady(self):
return self.dll.Btsdk_IsBluetoothReady()
def enableDeviceDiscovery(self):
device_class, max_dev_num, max_durations = 0X000400, 30, 10
code = self.dll.Btsdk_StartDeviceDiscovery(device_class, max_dev_num, max_durations)
code != 0 and warnings.warn(self.errs[code])
return not code
def cancelDeviceDiscovery(self):
code = self.dll.Btsdk_StopDeviceDiscovery()
code != 0 and warnings.warn(self.errs[code])
return not code
def getDeviceHandleByAddress(self, addr):
bd_addr = reversed(bytes.fromhex(addr.replace(':', '')))
bd_addr_buffer = (ctypes.c_uint8 * 6)(*bd_addr)
handle = self.dll.Btsdk_GetRemoteDeviceHandle(bd_addr_buffer)
return handle
def getDeviceName(self, dev_handle):
name_buffer = (ctypes.c_uint8 * 256)()
if (0 == self.dll.Btsdk_GetRemoteDeviceName(dev_handle, name_buffer, None)) == 1:
name_buffer_bytes = bytes(name_buffer)
data = name_buffer_bytes[:name_buffer_bytes.find(0)].decode('utf-8', 'ignore')
return data
else:
return ''
def getDeviceAddr(self, dev_handle):
addr_buffer = (ctypes.c_uint8 * 6)()
if (0 == self.dll.Btsdk_GetRemoteDeviceAddress(dev_handle, addr_buffer)) == 1:
addr_buffer_bytes = bytes(addr_buffer)
data = ':'.join(['%02X' % byte for byte in reversed(addr_buffer_bytes)])
return data
else:
return ''
def getDeviceCate(self, dev_handle):
dev_class = ctypes.c_int32()
if (0 == self.dll.Btsdk_GetRemoteDeviceClass(dev_handle, ctypes.byref(dev_class))) == 1:
return self.devs[dev_class.value]
else:
return 0
def getDeviceRSSI(self, dev_handle):
rssi = ctypes.c_int8()
if (0 == self.dll.Btsdk_GetRemoteRSSI(dev_handle, ctypes.byref(rssi))) == 1:
return rssi.value
else:
return 0
def getServiceAttributes(self, svc_handle):
attributes = BtSdkRemoteServiceAttrStru()
attributes.mask = 0x01
attributes.ext_attributes = 0
code = self.dll.Btsdk_RefreshRemoteServiceAttributes(svc_handle, ctypes.byref(attributes))
if (code == 0) == 1:
return attributes
else:
warnings.warn(self.errs[code])
return None
def getDevices(self):
max_dev_num = 30
dev_handles_buffer = (ctypes.c_int * max_dev_num)()
num_devices = self.dll.Btsdk_GetInquiredDevices(dev_handles_buffer, max_dev_num)
if (num_devices > 0) == 1:
devs = []
dev_handles = [dev_handles_buffer[i] for i in range(num_devices)]
for dev_handle in dev_handles:
devs.append({
'dev_handle': dev_handle,
'name': self.getDeviceName(dev_handle),
'addr': self.getDeviceAddr(dev_handle),
'cate': self.getDeviceCate(dev_handle),
'rssi': self.getDeviceRSSI(dev_handle)
})
devs.sort(key=lambda x: x['rssi'], reverse=True)
return devs
else:
return []
def connectAudioService(self, device_handle):
connection_handle = ctypes.c_uint32()
code = self.dll.Btsdk_ConnectEx(device_handle, 0X110B, 0, ctypes.byref(connection_handle))
if (code == 0) == 1:
return connection_handle.value
else:
return None
def removeAllDevices(self):
self.dll.Btsdk_StopDeviceDiscovery()
enum_handle = self.dll.Btsdk_StartEnumConnection()
while True:
conn = self.dll.Btsdk_EnumConnection(enum_handle, 0)
if (not conn) == 1:
self.dll.Btsdk_EndEnumConnection(enum_handle)
break
self.dll.Btsdk_Disconnect(conn)
max_dev_num = 30
dev_handles_buffer = (ctypes.c_int * max_dev_num)()
num_devices = self.dll.Btsdk_GetPairedDevices(dev_handles_buffer, max_dev_num)
if (num_devices > 0) == 1:
dev_handles = [dev_handles_buffer[i] for i in range(num_devices)]
for dev_handle in dev_handles:
self.dll.Btsdk_UnPairDevice(dev_handle)
self.dll.Btsdk_DeleteRemoteDeviceByHandle(dev_handle)
dev_handles_buffer = (ctypes.c_int * max_dev_num)()
num_devices = self.dll.Btsdk_GetInquiredDevices(dev_handles_buffer, max_dev_num)
if (num_devices > 0) == 1:
dev_handles = [dev_handles_buffer[i] for i in range(num_devices)]
for dev_handle in dev_handles:
self.dll.Btsdk_DeleteRemoteDeviceByHandle(dev_handle)
dev_handles_buffer = (ctypes.c_int * max_dev_num)()
num_devices = self.dll.Btsdk_GetStoredDevicesByClass(0, dev_handles_buffer, max_dev_num)
if (num_devices > 0) == 1:
dev_handles = [dev_handles_buffer[i] for i in range(num_devices)]
for dev_handle in dev_handles:
self.dll.Btsdk_DeleteRemoteDeviceByHandle(dev_handle)
self.dll.Btsdk_DeleteUnpairedDevicesByClass(0)
return True
def isBluetoothActive(self):
data = []
enum_handle = self.dll.Btsdk_StartEnumConnection()
while True:
conn = self.dll.Btsdk_EnumConnection(enum_handle, 0)
if (not conn) == 1:
self.dll.Btsdk_EndEnumConnection(enum_handle)
break
data.append(conn)
return True if len(data) > 0 else False
def _fd(f):
return f.fileno() if hasattr(f, 'fileno') else f
if os.name == 'nt':
import msvcrt
from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref)
from ctypes.wintypes import BOOL, DWORD, HANDLE
LOCK_SH = 0x0
LOCK_NB = 0x1
LOCK_EX = 0x2
LOCK_UN = 0x9
if sizeof(c_ulong) != sizeof(c_void_p):
ULONG_PTR = c_int64
else:
ULONG_PTR = c_ulong
PVOID = c_void_p
class _OFFSET(Structure):
_fields_ = [
('Offset', DWORD),
('OffsetHigh', DWORD)
]
class _OFFSET_UNION(Union):
_fields_ = [
('_offset', _OFFSET),
('Pointer', PVOID)
]
_anonymous_ = ['_offset']
class OVERLAPPED(Structure):
_fields_ = [
('Internal', ULONG_PTR),
('InternalHigh', ULONG_PTR),
('_offset_union', _OFFSET_UNION),
('hEvent', HANDLE)
]
_anonymous_ = ['_offset_union']
LPOVERLAPPED = POINTER(OVERLAPPED)
LockFileEx = windll.kernel32.LockFileEx
LockFileEx.restype = BOOL
LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
UnlockFileEx = windll.kernel32.UnlockFileEx
UnlockFileEx.restype = BOOL
UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]
def flock(f, flags):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
if flags == LOCK_UN:
ret = UnlockFileEx(
hfile,
0,
0,
0xFFFF0000,
byref(overlapped)
)
else:
ret = LockFileEx(
hfile,
flags,
0,
0,
0xFFFF0000,
byref(overlapped)
)
return bool(ret)
else:
try:
import fcntl
LOCK_SH = fcntl.LOCK_SH
LOCK_NB = fcntl.LOCK_NB
LOCK_EX = fcntl.LOCK_EX
LOCK_UN = fcntl.LOCK_UN
except (ImportError, AttributeError):
LOCK_EX = LOCK_SH = LOCK_NB = 0
def flock(f, flags):
return flags == LOCK_UN
else:
def flock(f, flags):
return fcntl.flock(_fd(f), flags) == 0
def getJson(file, data=None):
if os.path.exists(file):
try:
return json.loads(open(file=file, mode='r', encoding='utf-8').read())
except json.decoder.JSONDecodeError:
return data
return data
def putJson(file, data=None):
with open(file=file, mode='w', encoding='utf-8') as f:
flock(f, LOCK_EX)
res = f.write(json.dumps(data, indent=4, ensure_ascii=True))
flock(f, LOCK_UN)
return res
class LoggerFileHandler:
def __init__(self, log_file: str, mode: str = 'a', level: str = None, fmt: str = None):
self.log, self.mod = log_file, mode
self.lev, self.fmt = level, fmt
self.sty = '%'
class LoggerConsHandler:
def __init__(self, level: str = None, fmt: str = None):
self.lev, self.fmt = level, fmt
self.sty = '%'
class Logger:
logger = None
levels = {
'CRITICAL': logging.CRITICAL,
'FATAL': logging.FATAL,
'ERROR': logging.ERROR,
'WARNING': logging.WARNING,
'WARN': logging.WARN,
'INFO': logging.INFO,
'DEBUG': logging.DEBUG,
'NOTSET': logging.NOTSET,
'D': logging.DEBUG,
'I': logging.INFO,
'W': logging.WARNING,
'E': logging.ERROR,
'F': logging.FATAL
}
default_format = '{asctime} - {name} - {levelname[0]}: {message}'
default_format_style = '{'
handler_list = []
def __init__(
self,
name: str = 'default',
default_level='DEBUG',
fh: LoggerFileHandler = None,
ch: LoggerConsHandler = None,
add_default_handler=False
):
ch = LoggerConsHandler() if add_default_handler and not ch else ch
if fh and not isinstance(fh, LoggerFileHandler):
raise TypeError('The parameter fh must be <LoggerFileHandler> type.')
if ch and not isinstance(ch, LoggerConsHandler):
raise TypeError('The parameter ch must be <LoggerConsHandler> type.')
self.logger = logging.getLogger(name)
self.logger.setLevel(self.levels[default_level])
if fh:
fhandler = logging.FileHandler(filename=fh.log, mode=fh.mod, encoding='utf-8')
self.handler_list.append(fhandler)
fhandler.setLevel(self.levels[fh.lev or default_level])
fh.fmt = fh.fmt or self.default_format
fh.sty = '{' if '%' not in fh.fmt else '%'
fhandler.setFormatter(logging.Formatter(fmt=fh.fmt, style=fh.sty))
self.logger.addHandler(fhandler)
if ch:
chandler = logging.StreamHandler()
self.handler_list.append(chandler)
chandler.setLevel(self.levels[ch.lev or default_level])
ch.fmt = ch.fmt or self.default_format
ch.sty = '{' if '%' not in ch.fmt else '%'
chandler.setFormatter(logging.Formatter(fmt=ch.fmt, style=ch.sty))
self.logger.addHandler(chandler)
self.d = self.logger.debug
self.i = self.logger.info
self.w = self.logger.warning
self.e = self.logger.error
self.f = self.logger.fatal
self.c = self.logger.critical
class Setting(dict):
def __init__(self, setting_file: str, setting_default: dict):
self.data_file = os.path.abspath(setting_file)
super().__init__(getJson(self.data_file, setting_default))
def __getitem__(self, item):
if item in self:
return super().__getitem__(item)
else:
return None
def __setitem__(self, key, value):
super().__setitem__(key, value)
putJson(self.data_file, self)
class CustomLineEdit(QLineEdit):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #000000;')
def contextMenuEvent(self, event):
menu = QMenu(self)
action_c = menu.addAction('复制')
action_p = menu.addAction('粘贴')
action_c.triggered.connect(self.copy)
action_p.triggered.connect(self.paste)
menu.popup(self.mapToGlobal(event.pos()))
class CustomLineEditNoPopup(QLineEdit):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold;')
def contextMenuEvent(self, event):
pass
class CustomPushButton(QPushButton):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #0C0C0C;')
class CustomLabel(QLabel):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #0C0C0C; border: 1px solid #0C0C0C;')
class CustomComboBox(QComboBox):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #0C0C0C;')
class CustomCheckBox(QCheckBox):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setStyleSheet('font-size: 12px; font-family: \'Microsoft YaHei\'; color: #0C0C0C;')
class CustomHBoxLayout(QHBoxLayout):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setAlignment(Qt.AlignTop | Qt.AlignLeft)
class CustomVBoxLayout(QVBoxLayout):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setAlignment(Qt.AlignTop | Qt.AlignLeft)
class MainWindow(QMainWindow):
def __init__(self, logger: Logger):
super().__init__()
self.app_name = '蓝牙音频连接'
self.app_version = ('1.1.0', '20241030', 'zhaoyafan', 'zhaoyafan@foxmail.com', 'https://www.fanscloud.net/')
self.app_text = {'search': '正在搜索', 'connecting': '连接中...'}
self.logger = logger
self.toast = ToastNotification()
self.setting = Setting(os.path.abspath(os.path.join(tempfile.gettempdir(), '%s.config' % self.md5(__file__)[:16])), {})
self.bluesoleil = None
self.currentDevHandle = None
self.currentDevRssi = 0
self.setWindowTitle(self.app_name)
self.setGeometry(0, 0, 600, 400)
self.statusBar()
self.menubar = self.menuBar()
moremenu = self.menubar.addMenu('更多')
self.restartServAction = QAction('重启蓝牙服务', self)
self.createShortAction = QAction('发送快捷方式到桌面', self)
self.aboutWindowAction = QAction('关于', self)
self.restartServAction.triggered.connect(self.restartServActionFunction)
self.createShortAction.triggered.connect(self.createShortActionFunction)
self.aboutWindowAction.triggered.connect(self.aboutWindowActionFunction)
moremenu.addActions(
[
self.restartServAction,
self.createShortAction,
self.aboutWindowAction
]
)
# 定义布局
m_layout_0 = CustomVBoxLayout()
m_layout_1 = CustomHBoxLayout()
m_layout_2 = CustomHBoxLayout()
m_layout_3 = CustomHBoxLayout()
m_layout_4 = CustomHBoxLayout()
m_layout_0.addLayout(m_layout_1)
m_layout_0.addLayout(m_layout_2)
m_layout_0.addLayout(m_layout_3)
m_layout_0.addLayout(m_layout_4)
# 蓝牙名称
self.edtBtName = CustomLineEditNoPopup('')
self.edtBtName.setReadOnly(True)
self.edtBtName.setFixedSize(250, 36)
m_layout_1.addWidget(self.edtBtName)
# 信号强度
self.edtBtRssi = CustomLineEditNoPopup('')
self.edtBtRssi.setReadOnly(True)
self.edtBtRssi.setFixedSize(38, 36)
m_layout_1.addWidget(self.edtBtRssi)
# 断开连接
self.butDisconnect = CustomPushButton('断开连接')
self.butDisconnect.setFixedSize(64, 36)
self.butDisconnect.clicked.connect(self.on_disconnect)
m_layout_1.addWidget(self.butDisconnect)
# 自动搜索
self.butAutoSearch = CustomPushButton('自动搜索')
self.butAutoSearch.setFixedSize(122, 36)
self.butAutoSearch.clicked.connect(self.on_autosearch)
m_layout_2.addWidget(self.butAutoSearch)
# 手动搜索
self.butManuSearch = CustomPushButton('手动搜索')
self.butManuSearch.setFixedSize(122, 36)
self.butManuSearch.clicked.connect(self.on_manusearch)
m_layout_2.addWidget(self.butManuSearch)
# 地址输入
self.edtBtAddr = CustomLineEdit('')
self.edtBtAddr.setStyleSheet('QLineEdit {font-size: 24px; font-family: \'Microsoft YaHei\'; color: #000000; background-color: #FFFFEE;}')
self.edtBtAddr.setFixedSize(294, 36)
self.edtBtAddr.returnPressed.connect(self.on_connect_bt_address)
m_layout_3.addWidget(self.edtBtAddr)
# 复制地址
self.butCopyAddressToClipboard = CustomPushButton('复制地址')
self.butCopyAddressToClipboard.setFixedSize(64, 36)
self.butCopyAddressToClipboard.clicked.connect(self.on_copy_address_to_clipboard)
m_layout_3.addWidget(self.butCopyAddressToClipboard)
# 显示界面
central_widget = QWidget()
central_widget.setLayout(m_layout_0)
self.setCentralWidget(central_widget)
self.setFixedSize(self.minimumSizeHint())
screen_rect = QApplication.desktop().availableGeometry()
window_rect = self.geometry()
self.move(int((screen_rect.width() - window_rect.width()) * 0.5), int((screen_rect.height() - window_rect.height()) * 0.5))
self.show()
self.init_bluesoleil()
self.connect_status_update_timer = QTimer()
self.connect_status_update_timer.timeout.connect(self.connect_status_update)
if (self.bluesoleil.isBluetoothActive() and self.setting['deviceHandle'] > 0) == 1:
self.in_connect_process()
self.currentDevHandle = self.setting['deviceHandle'] or 0
self.currentDevRssi = self.setting['deviceRssi'] or 0
self.connect_action_ui_update()
def init_bluesoleil(self):
try:
self.bluesoleil = BtSdk()
except FileNotFoundError:
QMessageBox.critical(self, '错误', '“千月(bluesoleil)”未安装,请安装后再试。')
self.close()
sys.exit(1)
except Exception:
QMessageBox.critical(self, '警告', '蓝牙未启用或未连接蓝牙适配器')
self.close()
sys.exit(1)
def restartServActionFunction(self):
if (os.path.exists('C:\\Program Files (x86)\\IVT Corporation\\BlueSoleil\\BtTray.exe')) == 1:
subprocess.run(os.path.join(os.path.dirname(__file__), 'RestartBlueSoleil.bat'), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
QMessageBox.information(self, '提示', '%s' % ('服务已重新启动',))
self.init_bluesoleil()
def createShortActionFunction(self):
target = '%s.exe' % (os.path.splitext(__file__)[0],)
shortcut_path = self.path_expandvars('%%USERPROFILE%%\\Desktop\\%s.lnk' % (self.app_name,))
if (os.path.exists(target)) == 1:
self.create_shortcut(target, shortcut_path, 1)
QMessageBox.information(self, '提示', '%s' % ('已成功发送快捷方式到桌面',))
def aboutWindowActionFunction(self):
_c = self.app_version
if (not _c) == 0:
QMessageBox.information(self, '关于', "" 'version: %s, build: %s, author: %s, email: %s, site: %s' % (_c[0], _c[1], _c[2], _c[3], '<a href=\'%s\'>%s</a>' % (_c[4], _c[4])))
@staticmethod
def md5(input_data):
if isinstance(input_data, bytes):
return hashlib.md5(input_data).hexdigest()
if isinstance(input_data, str):
return hashlib.md5(bytes(input_data, encoding='utf-8')).hexdigest()
md5_object = hashlib.md5()
while True:
data = input_data.read(io.DEFAULT_BUFFER_SIZE)
if data:
md5_object.update(data)
else:
return md5_object.hexdigest()
@staticmethod
def path_expandvars(path):
resolve = os.path.expandvars(path)
if (re.search('%[A-Za-z0-9_]+%', resolve)) is not None:
raise Exception('Variable analysis failed')
return resolve
@staticmethod
def create_shortcut(target, shortcut_path, run_as_admin):
shell = win32com.client.Dispatch('WScript.Shell')
try:
os.remove(shortcut_path)
except FileNotFoundError:
pass
shortcut = shell.CreateShortCut(shortcut_path)
shortcut.TargetPath = target
if shortcut_path.endswith('.lnk'):
shortcut.Arguments = ''
shortcut.WorkingDirectory = os.path.dirname(target)
if target.startswith('\\\\'):
shortcut.WorkingDirectory = ''
shortcut.save()
if shortcut_path.endswith('.lnk') and run_as_admin:
with open(shortcut_path, 'r+b') as f:
if os.path.isfile(shortcut_path):
f.seek(21, 0)
f.write(b'\x22\x00')
def closeEvent(self, event):
try:
self.bluesoleil.done()
self.bluesoleil = None
except:
pass
event.accept()
def open_manu_search_dialog(self):
self.setEnabled(False)
ManuWindow(self).exec()
def exit_manu_search_dialog(self):
self.setEnabled(True)
def in_connect_process(self):
self.edtBtName.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: #7BD136')
self.edtBtName.setText('')
self.edtBtRssi.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: #7BD136')
self.edtBtRssi.setText('')
self.edtBtAddr.setText('')
self.edtBtAddr.setEnabled(False)
self.edtBtAddr.setFocus()
self.butAutoSearch.setEnabled(False)
self.butManuSearch.setEnabled(False)
def un_connect_process(self):
self.edtBtName.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: #FDD391')
self.edtBtName.setText('')
self.edtBtRssi.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: #F5F5F5')
self.edtBtRssi.setText('')
self.edtBtAddr.setText('')
self.edtBtAddr.setEnabled(True)
self.edtBtAddr.setFocus()
self.butAutoSearch.setEnabled(True)
self.butManuSearch.setEnabled(True)
def on_disconnect(self):
self.connect_status_update_timer.stop()
self.bluesoleil.cancelDeviceDiscovery()
self.bluesoleil.removeAllDevices()
self.currentDevHandle = 0
self.currentDevRssi = 0
self.setting['deviceHandle'] = 0
self.setting['deviceRssi'] = 0
self.un_connect_process()
def on_autosearch(self):
self.on_disconnect()
self.setWindowTitle(self.app_text['search'])
self.bluesoleil.enableDeviceDiscovery()
devices = None
for i in range(12):
time.sleep(0.33)
devices = self.bluesoleil.getDevices()
devices = [dev for dev in devices if dev['rssi'] >= -65]
if len(devices) != 0:
break
self.setWindowTitle(self.app_name)
if (not devices) == 1:
QMessageBox.information(self, '提示', '未找到任何可用设备')
return None
self.connect_action_ui(devices[0]['dev_handle'])
def on_manusearch(self):
self.open_manu_search_dialog()
def on_copy_address_to_clipboard(self):
text = self.edtBtAddr.text()
text and QApplication.clipboard().setText(text)
def on_connect_bt_address(self):
edit = self.edtBtAddr
text = edit.text().strip().replace(':', '').replace('-', '').upper()
if (text == '') == 1:
edit.selectAll()
return None
if (not re.compile('^([0-9A-Fa-f]{2}){6}$').match(text)) == 1:
QMessageBox.critical(self, '错误', '非法地址:\n\n%s' % (text,))
edit.selectAll()
return None
return self.connect_action_ui(text)
def connect_action_ui_update(self):
self.connect_status_update_timer.start(1500)
self.edtBtAddr.setText(str(self.bluesoleil.getDeviceAddr(self.currentDevHandle)).replace(':', ''))
self.edtBtName.setText(str(self.bluesoleil.getDeviceName(self.currentDevHandle)))
self.edtBtRssi.setText(str(self.currentDevRssi if self.currentDevRssi != -3 else ''))
if -3 == self.currentDevRssi:
self.edtBtRssi.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: #FFF3F3')
if -65 < self.currentDevRssi <= -5:
self.edtBtRssi.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: #73CD2A')
if -72 < self.currentDevRssi <= -65:
self.edtBtRssi.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: #F2B32D')
if -99 < self.currentDevRssi <= -72:
self.edtBtRssi.setStyleSheet('font-size: 16px; font-family: \'Microsoft YaHei\'; color: #303030; border: 1px solid #808080; font-weight: bold; background-color: #F25555')
def connect_action_ui(self, handle):
if (handle != 0) == 1:
self.in_connect_process()
self.setWindowTitle(self.app_text['connecting'])
self.currentDevRssi = self.connect(handle) or 0
else:
self.currentDevRssi = 0
if (not self.currentDevRssi) == 1:
self.un_connect_process()
self.edtBtAddr.selectAll()
self.setWindowTitle(self.app_name)
QMessageBox.critical(self, '提示', '连接失败')
return None
else:
self.connect_action_ui_update()
self.setWindowTitle(self.app_name)
return True
def connect_status_update(self):
self.bluesoleil.isBluetoothActive() or self.on_disconnect()
def connect(self, data):
devHandle = data
if (isinstance(data, str)) == 1:
self.bluesoleil.enableDeviceDiscovery()
for i in range(9):
time.sleep(0.33)
devHandle = self.bluesoleil.getDeviceHandleByAddress(data)
if (not devHandle) == 0:
break
if (not devHandle) == 1:
return None
rssi = 0
for i in range(8):
time.sleep(0.25)
rssi = self.bluesoleil.getDeviceRSSI(devHandle)
if (rssi != 0) == 1:
break
code = self.bluesoleil.connectAudioService(devHandle)
if (not code) == 1:
return None
self.currentDevHandle = devHandle
self.setting['deviceHandle'] = devHandle
self.setting['deviceRssi'] = rssi
if (not rssi and code > 0) == 1:
return -3
else:
return rssi
class ManuWindow(QDialog):
def __init__(self, mainwindow):
super().__init__()
self.mainwindow = mainwindow
self.devices = []
self.timer = QTimer(self)
self.timer.timeout.connect(self.task_device_list_update)
self.timer.start(1000)
self.setGeometry(0, 0, 510, 295)
self.setWindowTitle('搜索蓝牙音频设备')
self.button = QPushButton('刷新列表', self)
self.button.setFixedSize(508, 32)
self.button.clicked.connect(self.on_refresh_list)
self.tableWidget = QTableWidget(self)
self.tableWidget.setFixedSize(508, 240)
self.tableWidget.setRowCount(0)
self.tableWidget.setColumnCount(5)
self.tableWidget.setHorizontalHeaderLabels(['设备名称', '地址', '设备类型', '信号强度', ''])
font = QFont('SimSun', 9)
self.tableWidget.setFont(font)
layout = QVBoxLayout()
layout.addWidget(self.button)
layout.addWidget(self.tableWidget)
self.setLayout(layout)
self.tableWidget.verticalHeader().hide()
self.tableWidget.setColumnWidth(0, 150)
self.tableWidget.setColumnWidth(1, 120)
self.tableWidget.setColumnWidth(2, 100)
self.tableWidget.setColumnWidth(3, 60)
self.tableWidget.setColumnWidth(4, 50)
# 显示界面
screen_rect = QApplication.desktop().availableGeometry()
window_rect = self.geometry()
self.setFixedSize(self.minimumSizeHint())
self.move(int((screen_rect.width() - window_rect.width()) * 0.5), int((screen_rect.height() - window_rect.height()) * 0.5))
self.show()
self.on_refresh_list()
def task_device_list_update(self):
try:
devices = self.mainwindow.bluesoleil.getDevices()
except AttributeError:
self.close()
return None
if (self.devices == devices) == 1:
return None
self.devices = devices
self.tableWidget.setRowCount(0)
for row, device in enumerate(devices):
self.tableWidget.insertRow(row)
self.tableWidget.setItem(row, 0, QTableWidgetItem('%s' % device['name']))
self.tableWidget.setItem(row, 1, QTableWidgetItem('%s' % device['addr']))
self.tableWidget.setItem(row, 2, QTableWidgetItem('%s' % device['cate']))
self.tableWidget.setItem(row, 3, QTableWidgetItem('%s' % device['rssi']))
connectButton = QPushButton('连接', self)
connectButton.clicked.connect(lambda _, dev=device: self.on_connect(dev))
cell_widget = QWidget()
cell_layout = QHBoxLayout(cell_widget)
cell_layout.addWidget(connectButton)
cell_layout.setAlignment(connectButton, Qt.AlignHCenter)
cell_layout.setContentsMargins(0, 0, 0, 0)
self.tableWidget.setCellWidget(row, 4, cell_widget)
for row in range(self.tableWidget.rowCount()):
for col in range(self.tableWidget.columnCount()):
item = self.tableWidget.item(row, col)
item and item.setFlags(item.flags() & ~Qt.ItemIsEditable | Qt.ItemIsSelectable | Qt.ItemIsEnabled)
def task_end_search(self):
self.mainwindow.bluesoleil.cancelDeviceDiscovery()
self.button.setEnabled(True)
def on_refresh_list(self):
self.button.setEnabled(False)
try:
self.mainwindow.bluesoleil.cancelDeviceDiscovery()
self.mainwindow.bluesoleil.enableDeviceDiscovery()
except AttributeError:
self.close()
self.mainwindow.toast.show_toast('正在搜索设备')
timer = QTimer(self)
timer.timeout.connect(self.task_end_search)
timer.setSingleShot(True)
timer.start(9999)
def on_connect(self, data):
self.mainwindow.bluesoleil.cancelDeviceDiscovery()
self.mainwindow.connect_action_ui(data['dev_handle'])
self.close()
def closeEvent(self, event):
self.mainwindow.exit_manu_search_dialog()
event.accept()
def keyPressEvent(self, event):
pass
class ToastNotification(QDialog):
def __init__(self):
super().__init__()
self.setWindowFlags(Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint | Qt.WindowType.Tool)
layout = QVBoxLayout()
layout.setContentsMargins(5, 705, 5, 5)
self.label = QLabel("")
self.label.setStyleSheet("font-family: 'Microsoft YaHei'; font-size: 24px; color: #ffffff;")
layout.addWidget(self.label)
self.setLayout(layout)
self.setAttribute(Qt.WA_TranslucentBackground)
self.setStyleSheet("background-color: rgba(65, 65, 65, 180); border-radius: 5px; padding: 10px 16px 10px 16px")
self.adjustSize()
self.timer = QTimer(self)
self.timer.setInterval(2000)
self.timer.timeout.connect(self.close)
self.fade_animation = QPropertyAnimation(self, b"windowOpacity")
self.fade_animation.setDuration(500)
def show_toast(self, message):
self.close()
self.label.setText(message)
self.timer.isActive() and self.timer.stop()
self.timer.start()
self.adjustSize()
self.fade_animation.setStartValue(0.0)
self.fade_animation.setEndValue(1.0)
self.show()
self.fade_animation.start()
def closeEvent(self, event):
if self.timer.isActive():
self.timer.stop()
super().closeEvent(event)
if __name__ == '__main__':
if (os.path.basename(__file__).lower().endswith('.int')) == 1:
QCoreApplication.addLibraryPath(os.path.abspath(os.path.join(os.path.dirname(__file__), 'site-packages/PyQt5/Qt5/plugins')))
f_lock = open(file=os.path.abspath(os.path.join(tempfile.gettempdir(), '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16])), mode='w', encoding='utf-8')
if (not flock(f_lock, LOCK_EX | LOCK_NB)) == 1:
app = QApplication(sys.argv)
msg = QMessageBox()
msg.setIcon(QMessageBox.Warning)
msg.setText('程序已经在运行中')
msg.setWindowTitle('提示')
msg.setStandardButtons(QMessageBox.Ok)
msg.exec_()
app.exit(1)
sys.exit(1)
app = QApplication(sys.argv)
window_main = MainWindow(logger=Logger(name='main', fh=None, ch=LoggerConsHandler()))
sys.exit(app.exec_())