"""Tray application that watches for a connected device and shows its product info.

A background thread polls for a device over USB (libusb), fastboot and adb.
When the detected device changes, its identifier is looked up through an HTTP
API and the result is displayed in a small always-on-top floating window.

Run with ``--build`` to compile a standalone distribution with Nuitka and,
optionally, an Inno Setup installer.
"""

import datetime
import hashlib
import http.client
import inspect
import json
import os
import platform
import re
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
import urllib.parse
import winreg
from pathlib import Path

import psutil
import usb.backend.libusb0
import usb.core
import usb.util
from PyQt5.QtCore import Qt, QCoreApplication, QPropertyAnimation, pyqtSignal, QEasingCurve, QPoint
from PyQt5.QtGui import QPixmap, QFont, QIcon, QPalette, QColor
from PyQt5.QtWidgets import (
    QApplication, QSystemTrayIcon, QMessageBox, QMenu, QAction, QWidget,
    QVBoxLayout, QTableWidget, QTableWidgetItem, QLabel, QFrame,
)


def _fd(f):
    """Return the file descriptor of *f*, or *f* itself if it has no ``fileno``."""
    return f.fileno() if hasattr(f, 'fileno') else f


# ---------------------------------------------------------------------------
# Cross-platform advisory file locking: flock(f, flags) -> bool
# ---------------------------------------------------------------------------
if os.name == 'nt':
    import msvcrt
    from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union,
                        POINTER, windll, byref)
    from ctypes.wintypes import BOOL, DWORD, HANDLE

    LOCK_SH = 0x0
    LOCK_NB = 0x1  # same bit as LOCKFILE_FAIL_IMMEDIATELY
    LOCK_EX = 0x2  # same bit as LOCKFILE_EXCLUSIVE_LOCK
    LOCK_UN = 0x9  # private marker: tells flock() to unlock instead of lock

    if sizeof(c_ulong) != sizeof(c_void_p):
        ULONG_PTR = c_int64
    else:
        ULONG_PTR = c_ulong
    PVOID = c_void_p

    class _OFFSET(Structure):
        _fields_ = [
            ('Offset', DWORD),
            ('OffsetHigh', DWORD)
        ]

    class _OFFSET_UNION(Union):
        _fields_ = [
            ('_offset', _OFFSET),
            ('Pointer', PVOID)
        ]
        _anonymous_ = ['_offset']

    class OVERLAPPED(Structure):
        _fields_ = [
            ('Internal', ULONG_PTR),
            ('InternalHigh', ULONG_PTR),
            ('_offset_union', _OFFSET_UNION),
            ('hEvent', HANDLE)
        ]
        _anonymous_ = ['_offset_union']

    LPOVERLAPPED = POINTER(OVERLAPPED)

    LockFileEx = windll.kernel32.LockFileEx
    LockFileEx.restype = BOOL
    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
    UnlockFileEx = windll.kernel32.UnlockFileEx
    UnlockFileEx.restype = BOOL
    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

    def flock(f, flags):
        """Lock (or, with ``LOCK_UN``, unlock) the file *f*.

        :param f: file object or file descriptor.
        :param flags: ``LOCK_UN`` or a combination of ``LOCK_SH``/``LOCK_EX``/``LOCK_NB``.
        :return: ``True`` if the Win32 call succeeded, ``False`` otherwise.
        """
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        if flags == LOCK_UN:
            ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
        else:
            ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)
else:
    try:
        import fcntl
        LOCK_SH = fcntl.LOCK_SH
        LOCK_NB = fcntl.LOCK_NB
        LOCK_EX = fcntl.LOCK_EX
        LOCK_UN = fcntl.LOCK_UN
    except (ImportError, AttributeError):
        # File locking is not supported on this platform.
        LOCK_EX = LOCK_SH = LOCK_NB = 0
        # LOCK_UN must exist and differ from the lock flags so that the dummy
        # below can tell "unlock" from "lock" (it was previously undefined).
        LOCK_UN = 0x8

        def flock(f, flags):
            """Dummy lock: unlocking "succeeds", locking never does."""
            return flags == LOCK_UN
    else:
        def flock(f, flags):
            """Apply ``fcntl.flock`` to *f*.

            :return: ``True`` on success, ``False`` if the call failed (for
                example the lock is held elsewhere and ``LOCK_NB`` was given).
            """
            # fcntl.flock() returns None and signals failure by raising OSError.
            try:
                fcntl.flock(_fd(f), flags)
            except OSError:
                return False
            return True


class ShellResult(dict):
    """Read-only result of a shell command with keys ``code``/``stdout``/``stderr``.

    Keys are readable both as items and as attributes; a missing key yields
    ``None`` instead of raising. Attribute assignment is silently ignored.
    """

    def __setattr__(self, key, value):
        # Deliberately a no-op: the result is populated only through dict init.
        pass

    def __getitem__(self, item):
        try:
            return super().__getitem__(item)
        except Exception:
            return None

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails.
        try:
            return super().__getitem__(item)
        except Exception:
            return None


class ShellExecution:
    """Helper for running shell commands and capturing their output."""

    @staticmethod
    def _decode(raw):
        """Decode captured process output as UTF-8, replacing undecodable bytes."""
        if not raw:
            return ''
        return bytes(raw).decode('utf-8', errors='replace')

    @classmethod
    def exec(cls, command: str = '', stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=0.0, env=None):
        """Run *command* through the shell.

        :param command: command line passed to the shell.
        :param stdout: stdout target; a falsy value discards the output.
        :param stderr: stderr target; a falsy value discards the output.
        :param timeout: seconds to wait; ``0`` means no timeout.
        :param env: environment for the child; defaults to ``os.environ``.
        :return: :class:`ShellResult`. On timeout ``code`` is ``1`` and
            ``stderr`` is ``'Execution timeout.'``.
        """
        stdout = stdout or subprocess.DEVNULL
        stderr = stderr or subprocess.DEVNULL
        env = env or os.environ
        try:
            result = subprocess.run(
                command,
                shell=True,
                stdout=stdout,
                stderr=stderr,
                timeout=timeout or None,
                env=env,
                # Avoid a console window popping up for every poll when the
                # application itself runs without a console (Windows only;
                # the attribute is absent, hence 0, elsewhere).
                creationflags=getattr(subprocess, 'CREATE_NO_WINDOW', 0),
            )
        except subprocess.TimeoutExpired:
            return ShellResult({
                'code': 1,
                'stdout': '',
                'stderr': 'Execution timeout.'
            })
        return ShellResult({
            'code': result.returncode,
            'stdout': cls._decode(result.stdout),
            'stderr': cls._decode(result.stderr)
        })


class HTTPResponse:
    """Minimal HTTP response holder: status code, reason phrase and body text."""

    status: int
    reason: str
    result: str

    def __init__(self, response):
        """:param response: a ``(status, reason, body_text)`` sequence."""
        self.status = response[0]
        self.reason = response[1]
        self.result = response[2]

    def json(self):
        """Return the body parsed as JSON, or ``None`` if it is not valid JSON."""
        try:
            return json.loads(self.result)
        except json.decoder.JSONDecodeError:
            return None


class HTTPRequest:
    """Tiny HTTP client built on ``http.client``."""

    @classmethod
    def _request(cls, method, url, body, header, timeout) -> HTTPResponse:
        """Send one request and return the decoded response.

        The connection is always closed, including when the request fails.
        """
        url = str(url)
        headers = {**{'User-Agent': 'Python'}, **(header or {})}
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme == 'https':
            conn = http.client.HTTPSConnection(parsed.netloc, timeout=timeout)
        else:
            conn = http.client.HTTPConnection(parsed.netloc, timeout=timeout)
        try:
            conn.request(method, url, body, headers)
            response = conn.getresponse()
            response_data = response.read().decode('utf-8')
            return HTTPResponse((response.status, response.reason, response_data))
        finally:
            conn.close()

    @classmethod
    def get(cls, url='', header=None, timeout=15.0) -> HTTPResponse:
        """Perform a GET request.

        :param url: absolute ``http``/``https`` URL.
        :param header: extra request headers (override the default User-Agent).
        :param timeout: socket timeout in seconds.
        """
        return cls._request('GET', url, None, header, timeout)

    @classmethod
    def post(cls, url='', data=None, json_data=None, header=None, timeout=15.0) -> HTTPResponse:
        """Perform a POST request.

        :param url: absolute ``http``/``https`` URL.
        :param data: raw body; used only when *json_data* is falsy. ``bytes``
            are sent as-is, anything else is converted with ``str()``.
        :param json_data: object serialised with ``json.dumps`` as the body.
        :param header: extra request headers (override the default User-Agent).
        :param timeout: socket timeout in seconds.
        """
        if json_data:
            # Encode explicitly: http.client would encode a str body as ISO-8859-1.
            body = json.dumps(json_data).encode('utf-8')
        elif isinstance(data, (bytes, bytearray)):
            body = bytes(data)
        elif data:
            body = str(data).encode('utf-8')
        else:
            body = data
        return cls._request('POST', url, body, header, timeout)


def calculate_md5(input_string):
    """Return the hexadecimal MD5 digest of *input_string* (default str encoding)."""
    return hashlib.md5(input_string.encode()).hexdigest()


def get_date_timestamp():
    """Return the current local time formatted as ``YYYYmmddHHMMSS``."""
    return datetime.datetime.now().strftime('%Y%m%d%H%M%S')


class FloatingWindow(QWidget):
    """Frameless always-on-top window showing a key/value table, an image and an alert line."""

    _STYLE_SHEET = """
        QMainWindow {
            border: 1px solid #626262;
        }
        QDialog {
            border: 1px solid #626262;
        }
        QLabel {
            color: #FCFCFC;
        }
        QLineEdit {
            background: #2A2A2A;
            border: 2px solid #444;
            border-radius: 5px;
            padding: 5px;
            color: white;
            selection-color: rgba(255, 255, 255, 0.95);
            selection-background-color: rgba(245, 245, 245, 0.15);
        }
        QTableWidget {
            background: #3f3f3f;
            border: 2px solid #6c6c6c;
            border-radius: 5px;
            padding: 5px;
            color: white;
        }
        QHeaderView::section {
            background-color: #444;
            color: white;
            padding: 4px;
            border: 1px solid #555;
        }
        QTableWidget::item {
            padding: 5px;
        }
        QPushButton {
            background: #444;
            padding: 5px 10px;
            border-radius: 4px;
            color: white;
        }
        QPushButton:hover {
            background: #555;
        }
    """

    def __init__(self):
        super().__init__()
        dark_palette = QPalette()
        dark_palette.setColor(QPalette.Window, QColor(62, 62, 62))
        self.setPalette(dark_palette)
        self.setStyleSheet(self._STYLE_SHEET)

        self.animation = None
        self.setWindowFlags(Qt.WindowStaysOnTopHint | Qt.FramelessWindowHint | Qt.Tool)
        self.setAttribute(Qt.WA_TranslucentBackground, False)
        self.setWindowTitle('Product Information')
        self.setMinimumWidth(380)

        # NOTE(review): this attribute shadows QWidget.layout(); kept under
        # its existing name so any code reading `.layout` keeps working.
        self.layout = QVBoxLayout()
        self.layout.setContentsMargins(4, 4, 4, 4)
        self.setLayout(self.layout)

        # Two-column key/value table.
        self.table = QTableWidget(0, 2)
        self.table.setEditTriggers(QTableWidget.NoEditTriggers)
        self.table.horizontalHeader().setVisible(False)
        self.table.verticalHeader().setVisible(False)
        self.table.setShowGrid(True)
        self.table.horizontalHeader().setStretchLastSection(True)
        self.table.setFixedHeight(125)
        self.table.setFont(QFont('Consolas', 9))
        self.table.verticalHeader().setDefaultSectionSize(20)
        self.table.setColumnWidth(0, 64)
        self.table.setColumnWidth(1, 280)
        self.layout.addWidget(self.table)

        # Framed product image, vertically centred.
        self.image_frame = QFrame()
        self.image_frame.setFrameShape(QFrame.Box)
        self.image_frame.setLineWidth(1)
        self.image_frame.setStyleSheet('border: 1px solid gray;')
        self.image_frame.setFixedSize(380, 190)
        self.image_label = QLabel()
        self.image_label.setAlignment(Qt.AlignCenter)
        self.image_image = QVBoxLayout()
        self.image_image.addStretch()
        self.image_image.addWidget(self.image_label)
        self.image_image.addStretch()
        self.image_frame.setLayout(self.image_image)
        self.layout.addWidget(self.image_frame, 0, Qt.AlignCenter)

        # Single-line red alert text.
        self.alert_label = QLabel()
        self.alert_label.setAlignment(Qt.AlignLeft)
        self.alert_label.setStyleSheet('padding: 0px 2px 0px 2px; color: red; font-family: "Microsoft YaHei";')
        self.alert_label.setFixedSize(380, 16)
        self.layout.addWidget(self.alert_label)

        self.setContextMenuPolicy(Qt.CustomContextMenu)
        self.customContextMenuRequested.connect(self.show_context_menu)
        self.adjustSize()
        self.hide()

    def show_context_menu(self, pos):
        """Show a context menu at *pos* (widget coordinates) with a close action."""
        menu = QMenu(self)
        close_action = QAction('关闭窗口', self)
        close_action.triggered.connect(self.hide_window)
        menu.addAction(close_action)
        menu.exec_(self.mapToGlobal(pos))

    def show(self):
        return super().show()

    def hide(self):
        """Clear table, image and alert, then hide the window."""
        self.set_table_data(None)
        self.set_image(None)
        self.set_alert(None)
        return super().hide()

    def set_table_data(self, data):
        """Fill the table from *data*, an iterable of ``(key, value)`` string pairs.

        ``None`` or an empty sequence clears the table.
        """
        data = data or []
        self.table.clearContents()
        self.table.setRowCount(len(data))
        for row, (k, v) in enumerate(data):
            for column, text in enumerate((k, v)):
                item = QTableWidgetItem(text)
                item.setTextAlignment(Qt.AlignLeft | Qt.AlignVCenter)
                item.setFlags(Qt.ItemIsEnabled | Qt.ItemIsSelectable)
                self.table.setItem(row, column, item)

    def set_image(self, path):
        """Show the image at *path* scaled to the frame; clear it if the path is missing."""
        if path and os.path.exists(path):
            pixmap = QPixmap(path).scaled(
                self.image_frame.size(),
                aspectRatioMode=Qt.KeepAspectRatio,
                transformMode=Qt.SmoothTransformation
            )
            self.image_frame.show()
            self.image_label.setPixmap(pixmap)
        else:
            self.image_label.setPixmap(QPixmap())
        self.adjustSize()

    def set_alert(self, text):
        """Show *text* in the alert line; a falsy value clears it."""
        if text:
            self.alert_label.show()
            self.alert_label.setText(text)
        else:
            self.alert_label.setText('')
        self.adjustSize()

    def show_window(self):
        """Slide the window up from the bottom of the screen if it is not visible."""
        if self.isVisible():
            return
        screen_geometry = QApplication.desktop().screenGeometry()
        x = (screen_geometry.width() - self.width()) // 2
        s_pos = QPoint(x, screen_geometry.height() - 40)
        e_pos = QPoint(x, screen_geometry.height() - self.height() - 40)
        self.move(s_pos)
        self.show()
        self.animation = QPropertyAnimation(self, b'pos')
        self.animation.setDuration(450)
        self.animation.setStartValue(s_pos)
        self.animation.setEndValue(e_pos)
        self.animation.setEasingCurve(QEasingCurve.OutQuad)
        self.animation.start()

    def hide_window(self):
        """Slide the window off the bottom of the screen, then hide it."""
        if not self.isVisible():
            self.hide()
            return
        c_pos = self.pos()
        e_pos = QPoint(c_pos.x(), QApplication.desktop().screenGeometry().height())
        self.animation = QPropertyAnimation(self, b'pos')
        self.animation.setDuration(520)
        self.animation.setStartValue(c_pos)
        self.animation.setEndValue(e_pos)
        self.animation.setEasingCurve(QEasingCurve.InQuad)
        self.animation.finished.connect(self.hide)
        self.animation.start()


class MainWindow(QSystemTrayIcon):
    """System tray icon that polls for a device and drives the floating window.

    The current device is a ``(type, id)`` tuple where type is ``0`` for no
    device, ``1`` for USB, ``2`` for fastboot and ``3`` for adb.
    """

    signal_on_current_device_changed = pyqtSignal(int, str)

    # Label shown in the "connecting" alert for each device type.
    _TRANSPORT_NAMES = {1: 'USB', 2: 'FASTBOOT', 3: 'ADB'}

    def __init__(self, app_name, app_version):
        # Initialise the Qt base class before touching the instance.
        super().__init__(QIcon(os.path.join(os.path.dirname(__file__), 'favicon.ico')), None)
        self.app_name = app_name
        self.app_version = app_version
        self.running = True
        self.floating_window = None
        self.current_device_no_connection = (0, '')
        self.current_device_property = self.current_device_no_connection

        self.setToolTip('正在监听设备连接')
        self.tray_menu = QMenu()
        action_list = [
            ['退出程序', self.exit]
        ]
        for title, handler in action_list:
            action = QAction(title, self)
            action.triggered.connect(handler)
            self.tray_menu.addAction(action)
        self.setContextMenu(self.tray_menu)
        self.show()
        self.init()
        self.signal_on_current_device_changed.connect(self.on_current_device_changed)
        threading.Thread(target=self.run_task, daemon=True).start()

    def init(self):
        """Create the floating window on first use."""
        if self.floating_window is None:
            self.floating_window = FloatingWindow()

    def exit(self):
        """Stop polling, close the floating window, clean up and quit the application."""
        self.running = False
        if self.floating_window:
            self.floating_window.close()
        self.exit_cleanup()
        QApplication.quit()

    @staticmethod
    def exit_cleanup():
        """Terminate every running process named ``adb.exe``."""
        for proc in psutil.process_iter():
            try:
                if proc.name().lower() == 'adb.exe':
                    proc.terminate()
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass

    @property
    def current_device(self):
        """The current ``(type, id)`` device tuple."""
        return self.current_device_property

    @current_device.setter
    def current_device(self, value):
        # Emit the change signal only when the device actually changed.
        if value == self.current_device_property:
            return
        try:
            self.current_device_property = value
            self.signal_on_current_device_changed.emit(value[0], value[1])
        except Exception as e:
            print(e, file=sys.stderr)

    @staticmethod
    def get_barcode() -> str:
        """Return the barcode parsed from ``adb shell ggec_ft read -main_pcbid``, or ``''``."""
        finds = re.findall(
            'Converted string: 02.Y.GG.([0-9A-Z.]+[0-9]{6})',
            ShellExecution.exec(command='adb shell ggec_ft read -main_pcbid', timeout=1.5).stdout
        )
        return finds[0] if finds else ''

    @staticmethod
    def get_chip_id() -> str:
        """Return the first serial listed by ``fastboot devices``, or ``''``."""
        finds = re.findall(
            r'([0-9A-Za-z]{16,32})\s*fastboot',
            ShellExecution.exec(command='fastboot devices', timeout=1.0).stdout
        )
        return finds[0] if finds else ''

    @staticmethod
    def get_chip_id_usb() -> str:
        """Return the USB serial string of the first 1B8E:C004 device, or ``''``.

        Any error (missing libusb0 DLL, access failure, ...) yields ``''``.
        """
        try:
            backend = usb.backend.libusb0.get_backend(find_library=lambda x: 'C:\\Windows\\System32\\libusb0.dll')
            devices = usb.core.find(find_all=True, backend=backend)
            for d in devices:
                if d.idVendor == 0x1B8E and d.idProduct == 0xC004 and d.iSerialNumber:
                    return usb.util.get_string(d, d.iSerialNumber)
            return ''
        except Exception:
            return ''

    def update_device(self):
        """Probe USB, then fastboot, then adb, and publish the first device found."""
        probes = (
            (self.get_chip_id_usb, 1),
            (self.get_chip_id, 2),
            (self.get_barcode, 3),
        )
        device = self.current_device_no_connection
        for method, value in probes:
            d = method()
            if d:
                device = (value, d)
                break
        try:
            self.current_device = device
        except Exception as e:
            print(e, file=sys.stderr)

    @staticmethod
    def gsc_api_fetch(url: str, app_type: str, app_code: str, app_sign: str, data):
        """POST a signed request to the API at *url* and return the response's ``data`` field.

        :param url: API endpoint.
        :param app_type: value sent as ``type``.
        :param app_code: value sent as ``code``.
        :param app_sign: value sent as ``sign`` and mixed into the MD5 token.
        :param data: JSON-serialisable parameters, sent as the ``param`` string.
        :raises Exception: with the server's ``info``/``error`` text, or with
            ``'Failed to obtain data.'``, ``'Access timeout.'`` or
            ``'Failed to get data from the server.'``.
        """
        date_timestamp = get_date_timestamp()
        send_data = json.dumps(data)
        try:
            response = HTTPRequest.post(
                url,
                json_data={
                    'type': app_type,
                    'code': app_code,
                    'sign': app_sign,
                    'time': date_timestamp,
                    'param': send_data,
                    'token': calculate_md5('%s%s%s' % (send_data, app_sign, date_timestamp))
                },
                header={
                    'Content-Type': 'application/json'
                },
                timeout=1.5
            )
        except TypeError as err:
            raise Exception('Failed to obtain data.') from err
        except TimeoutError as err:  # must precede OSError, its base class
            raise Exception('Access timeout.') from err
        except (OSError, http.client.HTTPException) as err:
            raise Exception('Failed to get data from the server.') from err

        # Parse once; json() returns None for a non-JSON body.
        payload = response.json()
        if not isinstance(payload, dict):
            raise Exception('Failed to obtain data.')
        info = payload.get('info') or payload.get('error')
        if info:
            raise Exception(info)
        if 'data' not in payload:
            raise Exception('Failed to obtain data.')
        return payload['data']

    def lookup_info_from_mes(self, code: str):
        """Look up product information for *code*.

        The lookup key is chosen by the length of *code*: 38 characters ->
        ``barcode``, 14 -> ``deviceId``, anything else -> ``chipId``.

        :return: dict with keys ``MAIN_BARCODE``, ``DID``, ``CHIP_ID``,
            ``FW_VER``, ``MODEL`` and ``COLOR``.
        :raises Exception: propagated from :meth:`gsc_api_fetch`, or a
            ``KeyError``/``TypeError`` if the response lacks expected fields.
        """
        request_info = 'lcm08GetBsInfoByQrCode'
        request_area = 'lcm08Area'
        request_code = 'LCM08'
        request_sign = '3CA08BA06C6C2EC70A3E7834CE8A127B'
        request_path = 'http://10.130.97.102:8814/itf/api'
        code = code.strip()
        key_name = {38: 'barcode', 14: 'deviceId'}.get(len(code), 'chipId')
        info = self.gsc_api_fetch(request_path, request_info, request_code, request_sign, {key_name: code})
        area = self.gsc_api_fetch(request_path, request_area, request_code, request_sign, {'qrCode': info['qrCode']})
        data = {
            **info,
            **{
                'model': area['model'],
                'color': area['color'],
                'region': area['region'],
                'localization': area['localization']
            }
        }
        return {
            'MAIN_BARCODE': data['qrCode'],
            'DID': data['deviceId'],
            'CHIP_ID': data['chipId'],
            'FW_VER': data['firmwareVersion1'],
            'MODEL': data['model'],
            'COLOR': data['color']
        }

    def on_current_device_changed(self, device_type: int, device_id: str):
        """React to a device change: show product info on connect, hide on disconnect.

        NOTE(review): the lookup performs blocking HTTP requests; if this slot
        runs on the GUI thread the UI is frozen for their duration.
        """
        window = self.floating_window

        if device_type <= 0:
            try:
                if device_type == 0:
                    print('Device disconnected')
                    if window:
                        window.hide_window()
            except Exception as e:
                print(e)
            return

        data = None  # stays None for unknown device types or failed lookups
        try:
            if device_type in (1, 2, 3, 4, 5):
                print('Device connected')
                if window:
                    window.show_window()
                    transport = self._TRANSPORT_NAMES.get(device_type)
                    if transport:
                        window.set_alert('正在通过%s连接...' % (transport,))
                data = self.lookup_info_from_mes(device_id)
        except Exception as e:
            data = None
            print(e)
            if window:
                window.set_alert('%s' % (e,))

        if data is None:
            print('获取数据失败')
            return

        table_data = [
            ['BARCODE', data['MAIN_BARCODE']],
            ['DID', data['DID']],
            ['MODEL', data['MODEL']],
            ['COLOR', data['COLOR']],
            ['FIRMWARE', data['FW_VER']]
        ]
        if not window:
            return
        home = os.path.dirname(__file__)
        try:
            window.set_table_data(table_data)
            window.set_image(os.path.join(home, '%s.PNG' % (data['MODEL'],)))
            # <MODEL>.REGEXP holds the pattern a valid DID of that model must match.
            with open(os.path.join(home, '%s.REGEXP' % (data['MODEL'],)), 'r', encoding='utf-8') as pattern_file:
                pattern = pattern_file.read()
            if not re.match(pattern, data['DID']):
                window.set_alert('DID:%s与产品型号:%s不匹配,请检查~' % (data['DID'], data['MODEL'],))
                window.set_image(os.path.join(home, 'UNKNOWN.PNG'))
        except Exception as e:
            # Best effort: a display problem must not break device monitoring.
            print(e, file=sys.stderr)

    def run_task(self):
        """Polling loop run in the background thread until ``running`` is cleared."""
        try:
            while self.running:
                time.sleep(0.55)
                try:
                    self.update_device()
                except Exception as e:
                    # Keep polling even if a single probe fails unexpectedly.
                    print(e, file=sys.stderr)
        except KeyboardInterrupt:
            print('Monitor has exited.')
            self.exit()
            sys.exit(0)


class MainRunner:
    """Application entry point: runs the tray application or builds a distribution."""

    def __init__(self):
        signal.signal(signal.SIGINT, self._handle_interrupt)
        self.app_name = 'MangoSift'
        self.app_version = '1.0.0.5'
        self.app_publisher = 'zhaoyafan'
        self.app_publisher_url = 'https://www.fanscloud.net/'
        self.application = None
        self.window = None

    def _copy_files_and_directories(self, src, dst):
        """Copy the file or directory tree *src* to *dst*; do nothing if *src* is missing."""
        if not os.path.exists(src):
            return None
        if os.path.isdir(src):
            os.makedirs(dst, exist_ok=True)
            for item in os.listdir(src):
                s = os.path.join(src, item)
                d = os.path.join(dst, item)
                if os.path.isdir(s):
                    self._copy_files_and_directories(s, d)
                else:
                    shutil.copy(s, d)
        else:
            # Make sure the destination directory exists before copying a file.
            parent = os.path.dirname(dst)
            if parent:
                os.makedirs(parent, exist_ok=True)
            shutil.copy(src, dst)

    @staticmethod
    def add_startup(entry_name, exe_path):
        """Register *exe_path* under the current user's ``Run`` registry key.

        :return: ``None`` if *exe_path* does not exist, ``True`` on success,
            ``False`` on any registry error.
        """
        if not os.path.exists(exe_path):
            return None
        try:
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER,
                                'Software\\Microsoft\\Windows\\CurrentVersion\\Run',
                                0, winreg.KEY_SET_VALUE) as key:
                winreg.SetValueEx(key, entry_name, 0, winreg.REG_SZ, exe_path)
            return True
        except Exception:
            return False

    @staticmethod
    def del_startup(entry_name):
        """Remove *entry_name* from the current user's ``Run`` registry key.

        :return: ``True`` on success, ``False`` on any registry error.
        """
        try:
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER,
                                'Software\\Microsoft\\Windows\\CurrentVersion\\Run',
                                0, winreg.KEY_SET_VALUE) as key:
                winreg.DeleteValue(key, entry_name)
            return True
        except Exception:
            return False

    def run(self):
        """Register autostart, put ``platform-tools`` on PATH and start the Qt event loop."""
        exec_home = os.path.dirname(__file__)
        exec_name = os.path.splitext(os.path.basename(__file__))[0]
        self.add_startup(exec_name, os.path.join(exec_home, '%s.exe' % (exec_name,)))
        os.environ['PATH'] = '%s%s%s' % (
            os.path.abspath('%s%s' % (os.path.dirname(__file__), '/platform-tools')),
            os.pathsep,
            os.environ['PATH']
        )
        QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
        QApplication.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
        self.application = QApplication(sys.argv)
        self.window = MainWindow(app_name=self.app_name, app_version=self.app_version)
        sys.exit(self.application.exec_())

    def build(self):
        """Interactively build the Nuitka distribution and, optionally, the Inno Setup installer."""
        if not str(__file__).endswith('.py'):
            print('Build is not currently supported.', file=sys.stderr)
            sys.exit(1)
        script_home = os.path.dirname(__file__)
        script_stem = os.path.splitext(os.path.basename(__file__))[0]
        from_home = os.path.dirname(os.path.abspath(__file__))
        dist_home = os.path.join(script_home, '%s.dist' % (script_stem,))

        ask = input('%s %s: ' % ('Build?', '[Y/n]'))
        if ask.lower().strip() == 'y':
            # Use the running interpreter and no shell, so the tools are
            # installed into and run from the same environment.
            subprocess.run([sys.executable, '-m', 'pip', 'install', 'nuitka', '-q'], env=os.environ)
            subprocess.run([
                sys.executable, '-m', 'nuitka',
                '--standalone',
                '--enable-plugin=pyqt5',
                '--include-module=PyQt5',
                '--windows-console-mode=disable',
                '--windows-icon-from-ico=favicon.ico',
                '--product-name=%s' % (self.app_name,),
                '--file-description=%s' % (self.app_name,),
                '--product-version=%s' % (self.app_version,),
                '--copyright=Copyright (C) 2025',
                '--output-dir=%s' % (os.path.join(script_home),),
                '%s' % (__file__,)
            ], env=os.environ)
            for i in ['PyQt5', 'platform-tools', 'favicon.ico',
                      'PSY02355001.PNG', 'PSY02355002.PNG', 'PSY02355003.PNG',
                      'PSY02355004.PNG', 'PSY02355005.PNG', 'UNKNOWN.PNG',
                      'PSY02355001.REGEXP', 'PSY02355002.REGEXP', 'PSY02355003.REGEXP',
                      'PSY02355004.REGEXP', 'PSY02355005.REGEXP']:
                self._copy_files_and_directories('%s/%s' % (from_home, i), '%s/%s' % (dist_home, i))
        elif not os.path.exists(dist_home):
            # Nothing was built and no earlier distribution exists to package.
            return None

        ask = input('%s %s: ' % ('Compile setup program?', '[Y/n]'))
        if ask.lower().strip() == 'y':
            compile_file = os.path.join(script_home, '%s.iss' % (script_stem,))
            compile_template = os.path.join(script_home, '%s.iss.template' % (script_stem,))
            compiler = 'C:\\Program Files (x86)\\Inno Setup 6\\ISCC.exe'
            if not os.path.exists(compile_template):
                print('The template file "%s" does not exist.' % (compile_template,), file=sys.stderr)
                return None
            if not os.path.exists(compiler):
                print('The compiler "%s" does not exist. Please check if Inno Setup is installed. You can download it at https://www.innosetup.com/' % (compiler,), file=sys.stderr)
                return None
            # Fill the installer script template placeholders.
            replacements = [
                ('%APPNAME%', self.app_name),
                ('%APPEXEC%', script_stem),
                ('%APPVERSION%', self.app_version),
                ('%APPBUILDDATE%', time.strftime('%Y%m%d', time.localtime())),
                ('%APPPUBLISHER%', self.app_publisher),
                ('%APPPUBLISHERURL%', self.app_publisher_url),
                ('%DISABLEX64%', '' if platform.architecture()[0] == '64bit' else '; '),
            ]
            script_text = Path(compile_template).read_text()
            for placeholder, value in replacements:
                script_text = script_text.replace(placeholder, value)
            Path(compile_file).write_text(script_text)
            subprocess.run([compiler, compile_file])

    def _handle_interrupt(self, _signal, _frame):
        """SIGINT handler: announce and shut the application down."""
        print('Exit.', file=sys.stderr)
        self.handle_interrupt()

    def handle_interrupt(self):
        """Ask the tray window to exit; errors (e.g. no window yet) are only logged."""
        try:
            self.window.exit()
        except Exception as e:
            print(e, file=sys.stderr)


if __name__ == '__main__':
    if os.path.basename(__file__).lower().endswith('.int'):
        QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), 'site-packages/PyQt5/Qt5/plugins'))
    else:
        QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), 'PyQt5/Qt5/plugins'))

    # Single-instance guard: the lock file is intentionally kept open (and
    # therefore locked) for the lifetime of the process.
    f_lock = open(
        file=os.path.join(
            tempfile.gettempdir(),
            '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16]
        ),
        mode='w',
        encoding='utf-8'
    )
    if not flock(f_lock, LOCK_EX | LOCK_NB):
        app = QApplication(sys.argv)
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setText('The application is already running.')
        msg.setWindowTitle('Warning')
        msg.setStandardButtons(QMessageBox.Cancel)
        msg.exec_()
        app.exit(1)
        sys.exit(1)
    elif len(sys.argv) > 1 and sys.argv[1] == '--build':
        MainRunner().build()
    else:
        MainRunner().run()