"""Windows system-tray helper that sets the local clock from an HTTP ``Date`` header."""
import os
import re
import sys
import time
import json
import ctypes
import ctypes.wintypes
import shutil
import signal
import winreg
import hashlib
import inspect
import tempfile
import platform
import threading
import subprocess
import http.client
import urllib.parse
import datetime
from pathlib import Path

from wmi import WMI
from tzlocal import get_localzone
from pytz import timezone
from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMessageBox, QMenu, QAction
from PyQt5.QtCore import Qt, QCoreApplication, QTimer
from PyQt5.QtGui import QIcon


def _fd(f):
    """Return ``f.fileno()`` when *f* has a ``fileno`` attribute, else *f* itself."""
    return f.fileno() if hasattr(f, 'fileno') else f


# ---------------------------------------------------------------------------
# Cross-platform ``flock(f, flags) -> bool`` shim.
# ---------------------------------------------------------------------------
if os.name == 'nt':
    import msvcrt
    from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref)
    from ctypes.wintypes import BOOL, DWORD, HANDLE

    # LOCK_SH / LOCK_NB / LOCK_EX are handed to LockFileEx unchanged as its
    # flags argument; LOCK_UN is only compared against inside flock().
    LOCK_SH = 0x0
    LOCK_NB = 0x1
    LOCK_EX = 0x2
    LOCK_UN = 0x9

    # Pick an integer type that is as wide as a pointer.
    if sizeof(c_ulong) != sizeof(c_void_p):
        ULONG_PTR = c_int64
    else:
        ULONG_PTR = c_ulong
    PVOID = c_void_p

    class _OFFSET(Structure):
        _fields_ = [
            ('Offset', DWORD),
            ('OffsetHigh', DWORD)
        ]

    class _OFFSET_UNION(Union):
        _fields_ = [
            ('_offset', _OFFSET),
            ('Pointer', PVOID)
        ]
        _anonymous_ = ['_offset']

    class OVERLAPPED(Structure):
        _fields_ = [
            ('Internal', ULONG_PTR),
            ('InternalHigh', ULONG_PTR),
            ('_offset_union', _OFFSET_UNION),
            ('hEvent', HANDLE)
        ]
        _anonymous_ = ['_offset_union']

    LPOVERLAPPED = POINTER(OVERLAPPED)

    LockFileEx = windll.kernel32.LockFileEx
    LockFileEx.restype = BOOL
    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]

    UnlockFileEx = windll.kernel32.UnlockFileEx
    UnlockFileEx.restype = BOOL
    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

    def flock(f, flags):
        """Lock or unlock *f* through LockFileEx / UnlockFileEx.

        :param f: file object or file descriptor.
        :param flags: ``LOCK_UN`` to unlock, otherwise a combination of
            ``LOCK_SH`` / ``LOCK_EX`` / ``LOCK_NB``.
        :return: ``True`` when the Win32 call reported success.
        """
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        if flags == LOCK_UN:
            ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
        else:
            ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)
else:
    try:
        import fcntl
        LOCK_SH = fcntl.LOCK_SH
        LOCK_NB = fcntl.LOCK_NB
        LOCK_EX = fcntl.LOCK_EX
        LOCK_UN = fcntl.LOCK_UN
    except (ImportError, AttributeError):
        # File locking is unavailable: provide placeholder constants.
        LOCK_EX = LOCK_SH = LOCK_NB = 0
        # LOCK_UN must exist (the dummy flock below compares against it) and
        # must differ from the lock flags so that lock requests report False.
        LOCK_UN = 8

        def flock(f, flags):
            """Dummy lock: only an unlock request is reported as successful."""
            return flags == LOCK_UN
    else:
        def flock(f, flags):
            """Apply ``fcntl.flock`` to *f*.

            :return: ``True`` on success, ``False`` when the lock is held
                elsewhere and ``LOCK_NB`` was requested.
            """
            # fcntl.flock returns None on success and raises on failure, so
            # the result cannot be derived from its return value.
            try:
                fcntl.flock(_fd(f), flags)
            except BlockingIOError:
                return False
            return True


def request_windows_admin():
    """Relaunch the program elevated and exit if the user is not an admin.

    When ``<script name>.exe`` exists next to this file it is relaunched,
    otherwise the current interpreter is. Returns normally (``None``) when
    the process already runs as administrator; otherwise never returns.
    """
    if ctypes.windll.shell32.IsUserAnAdmin():
        return
    exec_home = os.path.dirname(__file__)
    exec_name = os.path.splitext(os.path.basename(__file__))[0]
    exec_path = os.path.join(exec_home, '%s.exe' % (exec_name,))
    executable_path = exec_path if os.path.exists(exec_path) else sys.executable
    # Quote each argument so paths containing spaces survive the relaunch.
    executable_args = subprocess.list2cmdline(sys.argv)
    print('runas admin: %s %s' % (executable_path, executable_args), file=sys.stderr)
    ctypes.windll.shell32.ShellExecuteW(None, 'runas', executable_path, executable_args, None, 1)
    sys.exit(1)


class HTTPResponse:
    """Plain container for the outcome of an :class:`HTTPRequest` call."""

    status: int
    reason: str
    result: str
    header: dict

    def __init__(self, response):
        """Unpack a ``(status, reason, body_text, headers_dict)`` sequence."""
        self.status = response[0]
        self.reason = response[1]
        self.result = response[2]
        self.header = response[3]

    def json(self):
        """Return the body parsed as JSON, or ``None`` if it is not valid JSON."""
        try:
            return json.loads(self.result)
        except json.decoder.JSONDecodeError:
            return None


class HTTPRequest:
    """Minimal HTTP client built on ``http.client``."""

    user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'

    @classmethod
    def _parse_headers(cls, headers):
        """Turn ``(name, value)`` pairs into a dict keyed by lower-cased name."""
        return {key.lower(): value for key, value in headers}

    @classmethod
    def _request(cls, method, url, body, header, timeout) -> HTTPResponse:
        """Send one request and return the fully-read response.

        The connection is always closed, including when the request raises.
        """
        url = str(url)
        header = {**{'User-Agent': cls.user_agent}, **(header or {})}
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme == 'https':
            conn = http.client.HTTPSConnection(parsed.netloc, timeout=timeout)
        else:
            conn = http.client.HTTPConnection(parsed.netloc, timeout=timeout)
        try:
            conn.request(method, url, body, header)
            response = conn.getresponse()
            response_data = response.read().decode('utf-8')
            response_headers = cls._parse_headers(response.getheaders())
        finally:
            conn.close()
        return HTTPResponse((response.status, response.reason, response_data, response_headers))

    @classmethod
    def get(cls, url='', header=None, timeout=15) -> HTTPResponse:
        """Issue a GET request for *url*."""
        return cls._request('GET', url, None, header, timeout)

    @classmethod
    def post(cls, url='', data=None, json_data=None, header=None, timeout=15) -> HTTPResponse:
        """Issue a POST request for *url*.

        A truthy *json_data* is sent JSON-encoded; otherwise a truthy *data*
        is sent as ``str(data)`` encoded to UTF-8.
        """
        data = json.dumps(json_data) if json_data else (data and str(data).encode('utf-8'))
        return cls._request('POST', url, data, header, timeout)

    @classmethod
    def head(cls, url='', header=None, timeout=15) -> HTTPResponse:
        """Issue a HEAD request for *url*."""
        return cls._request('HEAD', url, None, header, timeout)


class MainWindow(QSystemTrayIcon):
    """Tray icon that triggers clock synchronisation and shows its status."""

    def __init__(self, app_name, app_version):
        """Create the tray icon, its timers and menu, then start syncing.

        :param app_name: name shown in the tooltip.
        :param app_version: version shown in the tooltip.
        """
        self.app_name = app_name
        self.app_version = app_version
        self.last_sync_time = 0
        self.last_sync_exception = ''
        self.is_synchronizing = None
        super().__init__(QIcon(os.path.join(os.path.dirname(__file__), 'favicon.ico')), None)
        self.setToolTip('')

        # Refresh the tooltip every 3 seconds.
        self.tray_icon_update_timer = QTimer(self)
        self.tray_icon_update_timer.timeout.connect(self.on_tray_icon_update)
        self.tray_icon_update_timer.start(3000)

        # Re-synchronise once per hour.
        self.time_sync_update_timer = QTimer(self)
        self.time_sync_update_timer.timeout.connect(self.sync)
        self.time_sync_update_timer.start(1000 * 3600)

        self.tray_menu = QMenu()
        action_list = [
            ['立即同步', self.sync],
            ['退出程序', self.exit]
        ]
        for label, handler in action_list:
            action = QAction(label, self)
            action.triggered.connect(handler)
            self.tray_menu.addAction(action)
        self.setContextMenu(self.tray_menu)
        self.show()
        self.init()

    def init(self):
        """Run one sync immediately and schedule the delayed follow-ups."""
        self.sync()
        self.sync_delayed()

    @staticmethod
    def exit():
        """Quit the Qt application."""
        QApplication.quit()

    def _sync_execution(self):
        """Worker body: fetch the server time and apply it to the system clock.

        Records the outcome in ``last_sync_time`` / ``last_sync_exception``
        and always clears ``is_synchronizing`` when done.
        """
        try:
            self.is_synchronizing = 1
            dt = self.get_server_datetime()
            set_date = dt and self.set_date(dt[0])
            set_time = dt and self.set_time(dt[1])
            if set_date and set_time:
                self.last_sync_time = self.get_runtime()
                self.last_sync_exception = ''
            else:
                self.last_sync_exception = '操作失败,可能需要以管理员身份运行'
        except Exception as e:
            # get_server_datetime() failures all mention "server".
            if 'server' in str(e).lower():
                self.last_sync_exception = '网络错误'
            else:
                # Do not swallow unexpected errors silently.
                print(e, file=sys.stderr)
        finally:
            self.is_synchronizing = 0

    def sync(self):
        """Start a background synchronisation unless one is already running."""
        if self.is_synchronizing:
            return
        # Claim the flag before the thread starts so that a second call made
        # before the worker gets scheduled cannot start a second worker.
        self.is_synchronizing = 1
        print('Synchronizing...')
        try:
            threading.Thread(target=self._sync_execution).start()
        except RuntimeError:
            self.is_synchronizing = 0
            raise

    def sync_delayed(self):
        """Schedule extra sync attempts 12, 35 and 65 seconds from now."""
        QTimer.singleShot(1000 * 12, self.sync)
        QTimer.singleShot(1000 * 35, self.sync)
        QTimer.singleShot(1000 * 65, self.sync)

    @staticmethod
    def format_time(secs):
        """Render an age in seconds as "just now" / minutes ago / hours ago."""
        if secs < 60:
            return '刚刚'
        if secs < 3600:
            return '%s分钟前' % ((secs // 60),)
        return '%s小时前' % (round(secs / 3600),)

    @staticmethod
    def get_runtime():
        """Return ``time.perf_counter()``."""
        return time.perf_counter()

    @staticmethod
    def set_date(date_str):
        """Run the shell ``date`` command with *date_str*.

        :return: ``True`` if the command exited with 0, ``False`` for any
            other exit code, ``True`` if it did not finish within 2 seconds.
        """
        try:
            process = subprocess.run('date %s' % (date_str,), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2)
        except subprocess.TimeoutExpired:
            # NOTE(review): a timeout is reported as success here but as
            # failure in set_time(); confirm which one is intended.
            return True
        return process.returncode == 0

    @staticmethod
    def set_time(time_str):
        """Run the shell ``time`` command with *time_str*.

        :return: ``True`` if the command exited with 0, ``False`` for any
            other exit code or if it did not finish within 2 seconds.
        """
        try:
            process = subprocess.run('time %s' % (time_str,), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2)
        except subprocess.TimeoutExpired:
            return False
        return process.returncode == 0

    @staticmethod
    def get_datetime_format():
        """Return the user's ``(short date format, time format)`` strings.

        Both are read with ``GetLocaleInfoW`` for the user's default LCID.
        """
        LOCALE_SSHORTDATE = 0x001F
        LOCALE_STIMEFORMAT = 0x1003

        GetUserDefaultLCID = ctypes.windll.kernel32.GetUserDefaultLCID
        GetUserDefaultLCID.argtypes = []
        GetUserDefaultLCID.restype = ctypes.wintypes.LCID

        GetLocaleInfoW = ctypes.windll.kernel32.GetLocaleInfoW
        GetLocaleInfoW.argtypes = [ctypes.wintypes.LCID, ctypes.wintypes.LCTYPE, ctypes.wintypes.LPWSTR, ctypes.c_int]
        GetLocaleInfoW.restype = ctypes.c_int

        user_lcid = GetUserDefaultLCID()
        # The last argument is a size in characters, so pass len(), not
        # ctypes.sizeof() (which is the size in bytes).
        date_format = ctypes.create_unicode_buffer(1024)
        GetLocaleInfoW(user_lcid, LOCALE_SSHORTDATE, date_format, len(date_format))
        time_format = ctypes.create_unicode_buffer(1024)
        GetLocaleInfoW(user_lcid, LOCALE_STIMEFORMAT, time_format, len(time_format))
        return date_format.value, time_format.value

    @staticmethod
    def map_system_format_to_strftime(system_format):
        """Translate a Windows date/time picture into a ``strftime`` format.

        :param system_format: picture string such as ``yyyy/M/d`` or ``H:mm:ss``.
        :return: the string with each recognised token replaced by its
            ``strftime`` directive.
        """
        format_mapping = {
            'dd': '__DATE__', 'd': '__DATE__',
            'MM': '__MONTH__', 'M': '__MONTH__',
            'yyyy': '__YEAR4__', 'yy': '__YEAR2__',
            'HH': '__HOUR24__', 'H': '__HOUR24__',
            'hh': '__HOUR12__', 'h': '__HOUR12__',
            'mm': '__MINUTE__', 'm': '__MINUTE__',
            'ss': '__SECOND__', 's': '__SECOND__',
            'tt': '__AMPM__', 't': '__AMPM__'
        }
        # Pass 1: tokens -> placeholders, longest tokens first. Placeholders
        # are used because a directive such as "%d" would itself be matched
        # by the later single-letter "d" token.
        for sys_fmt in sorted(format_mapping, key=len, reverse=True):
            system_format = re.sub(r'\b' + re.escape(sys_fmt) + r'\b', format_mapping[sys_fmt], system_format)
        # Pass 2: placeholders -> strftime directives.
        final_mapping = {
            '__DATE__': '%d', '__MONTH__': '%m', '__YEAR4__': '%Y', '__YEAR2__': '%y',
            '__HOUR24__': '%H', '__HOUR12__': '%I', '__MINUTE__': '%M', '__SECOND__': '%S',
            '__AMPM__': '%p'
        }
        for placeholder, str_fmt in final_mapping.items():
            system_format = system_format.replace(placeholder, str_fmt)
        return system_format

    def get_server_datetime(self):
        """Fetch the current time from the ``Date`` header of an HTTP server.

        Tries ``cloud.tencent.com``, ``1.1.1.2`` and then the default gateway
        (when WMI reports one), using the first ``Date`` header obtained.

        :return: ``(date_text, time_text)`` in local time, formatted with the
            user's short-date and time formats.
        :raises Exception: if no server supplied a parseable date, or the
            date is not later than timestamp 1735689600.
        """
        try:
            gate = WMI().Win32_NetworkAdapterConfiguration(IPEnabled=True)[0].DefaultIPGateway[0]
        except Exception:
            gate = None
        server_list = ['cloud.tencent.com', '1.1.1.2', gate]
        date_string = ''
        for server in server_list:
            if server is None:
                continue
            try:
                date_string = HTTPRequest.head(url='http://%s/' % (server,), timeout=1).header.get('date')
            except Exception:
                continue
            # A reply without a Date header is useless; try the next server.
            if date_string:
                break
        if not date_string:
            raise Exception('Failed to get time from server.')
        ori_timezone = timezone('GMT')
        tar_timezone = timezone(str(get_localzone()))
        try:
            parsed = datetime.datetime.strptime(date_string, '%a, %d %b %Y %H:%M:%S %Z')
            date_out = ori_timezone.localize(parsed).astimezone(tar_timezone) + datetime.timedelta(seconds=1)
            # Sanity check: refuse to set the clock to an implausibly old time.
            if int(date_out.timestamp()) > 1735689600:
                dt_fmt = self.get_datetime_format()
                return (
                    str(date_out.strftime(self.map_system_format_to_strftime(dt_fmt[0]))),
                    str(date_out.strftime(self.map_system_format_to_strftime(dt_fmt[1])))
                )
            raise Exception('The time returned by the server is older.')
        except ValueError as err:
            raise Exception('Failed to get time from server.') from err

    def on_tray_icon_update(self):
        """Refresh the tooltip with the last error or the age of the last sync."""
        status = self.last_sync_exception or self.format_time(int(self.get_runtime() - self.last_sync_time))
        self.setToolTip('%s %s\n上次同步:%s' % (self.app_name, self.app_version, status))


class MainRunner:
    """Entry-point object: runs the tray application or builds a distribution."""

    # Registry key (under HKEY_CURRENT_USER) holding per-user startup entries.
    _RUN_KEY = 'Software\\Microsoft\\Windows\\CurrentVersion\\Run'

    def __init__(self):
        """Install the SIGINT handler and set the application metadata."""
        signal.signal(signal.SIGINT, self._handle_interrupt)
        self.app_name = '时间同步助手'
        self.app_version = '1.0.0.1'
        self.app_publisher = 'zhaoyafan'
        self.app_publisher_url = 'https://www.fanscloud.net/'
        self.application = None
        self.window = None

    def _copy_files_and_directories(self, src, dst):
        """Copy file *src* to *dst*, or recursively copy directory *src* into *dst*.

        Does nothing when *src* does not exist.
        """
        if not os.path.exists(src):
            return None
        if not os.path.isdir(src):
            shutil.copy(src, dst)
            return None
        if not os.path.exists(dst):
            os.makedirs(dst)
        for item in os.listdir(src):
            s = os.path.join(src, item)
            d = os.path.join(dst, item)
            if os.path.isdir(s):
                self._copy_files_and_directories(s, d)
            else:
                shutil.copy(s, d)
        return None

    @staticmethod
    def add_startup(entry_name, exe_path):
        """Register *exe_path* under the current user's ``Run`` key.

        :return: ``None`` if *exe_path* does not exist, ``True`` on success,
            ``False`` if the registry operation failed.
        """
        if not os.path.exists(exe_path):
            return None
        try:
            # The context manager closes the key even when SetValueEx fails.
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER, MainRunner._RUN_KEY, 0, winreg.KEY_SET_VALUE) as key:
                winreg.SetValueEx(key, entry_name, 0, winreg.REG_SZ, exe_path)
            return True
        except Exception:
            return False

    @staticmethod
    def del_startup(entry_name):
        """Remove *entry_name* from the current user's ``Run`` key.

        :return: ``True`` on success, ``False`` if the registry operation failed.
        """
        try:
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER, MainRunner._RUN_KEY, 0, winreg.KEY_SET_VALUE) as key:
                winreg.DeleteValue(key, entry_name)
            return True
        except Exception:
            return False

    def run(self):
        """Elevate if needed, register autostart, and run the tray application."""
        request_windows_admin()
        exec_home = os.path.dirname(__file__)
        exec_name = os.path.splitext(os.path.basename(__file__))[0]
        self.add_startup(exec_name, os.path.join(exec_home, '%s.exe' % (exec_name,)))
        QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
        QApplication.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
        self.application = QApplication(sys.argv)
        self.window = MainWindow(app_name=self.app_name, app_version=self.app_version)
        sys.exit(self.application.exec_())

    def build(self):
        """Interactively build with Nuitka and optionally compile an Inno Setup installer.

        Only available when running from a ``.py`` file.
        NOTE(review): the prompts display ``[Y/n]`` but only an explicit
        ``y`` is treated as yes.
        """
        if not str(__file__).endswith('.py'):
            print('Build is not currently supported.', file=sys.stderr)
            sys.exit(1)
        here = os.path.dirname(__file__)
        stem = os.path.splitext(os.path.basename(__file__))[0]
        from_home = os.path.dirname(os.path.abspath(__file__))
        dist_home = os.path.join(here, '%s.dist' % (stem,))

        ask = input('%s %s: ' % ('Build?', '[Y/n]'))
        if ask.lower().strip() == 'y':
            subprocess.run(['pip', 'install', 'nuitka', '-q'], shell=True, env=os.environ)
            subprocess.run([
                'python', '-m', 'nuitka',
                '--standalone',
                '--enable-plugin=pyqt5',
                '--include-module=PyQt5',
                '--windows-console-mode=disable',
                '--windows-icon-from-ico=favicon.ico',
                '--product-name=%s' % (self.app_name,),
                '--file-description=%s' % (self.app_name,),
                '--product-version=%s' % (self.app_version,),
                '--copyright=Copyright (C) 2025',
                '--output-dir=%s' % (here,),
                '%s' % (__file__,)
            ], shell=True, env=os.environ)
            for i in ['PyQt5', 'favicon.ico']:
                self._copy_files_and_directories('%s/%s' % (from_home, i), '%s/%s' % (dist_home, i))
        elif not os.path.exists(dist_home):
            # Build skipped and no earlier build output to package.
            return None

        ask = input('%s %s: ' % ('Compile setup program?', '[Y/n]'))
        if ask.lower().strip() != 'y':
            return None
        compile_file = os.path.join(here, '%s.iss' % (stem,))
        compile_template = os.path.join(here, '%s.iss.template' % (stem,))
        compiler = 'C:\\Program Files (x86)\\Inno Setup 6\\ISCC.exe'
        if not os.path.exists(compile_template):
            print('The template file \"%s\" does not exist.' % (compile_template,), file=sys.stderr)
            return None
        if not os.path.exists(compiler):
            print('The compiler \"%s\" does not exist. Please check if Inno Setup is installed. You can download it at https://www.innosetup.com/' % (compiler,), file=sys.stderr)
            return None
        # Fill the template placeholders, applied in this order.
        substitutions = [
            ('%APPNAME%', self.app_name),
            ('%APPEXEC%', stem),
            ('%APPVERSION%', self.app_version),
            ('%APPBUILDDATE%', time.strftime('%Y%m%d', time.localtime())),
            ('%APPPUBLISHER%', self.app_publisher),
            ('%APPPUBLISHERURL%', self.app_publisher_url),
            ('%DISABLEX64%', '' if platform.architecture()[0] == '64bit' else '; '),
        ]
        script = Path(compile_template).read_text()
        for placeholder, value in substitutions:
            script = script.replace(placeholder, value)
        Path(compile_file).write_text(script)
        subprocess.run([compiler, compile_file])
        return None

    def _handle_interrupt(self, _signal, _frame):
        """SIGINT handler: announce the exit and shut the window down."""
        print('Exit.', file=sys.stderr)
        self.handle_interrupt()

    def handle_interrupt(self):
        """Ask the window to exit, printing any error to stderr."""
        try:
            self.window.exit()
        except Exception as e:
            print(e, file=sys.stderr)


if __name__ == '__main__':
    # Tell Qt where its plugins live; the layout depends on the file suffix.
    if os.path.basename(__file__).lower().endswith('.int'):
        QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), 'site-packages/PyQt5/Qt5/plugins'))
    else:
        QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), 'PyQt5/Qt5/plugins'))
    # Single-instance guard: a lock file named after a hash of this file's
    # path. The handle is kept open on purpose for the life of the process.
    f_lock = open(file=os.path.join(tempfile.gettempdir(), '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16]), mode='w', encoding='utf-8')
    if not flock(f_lock, LOCK_EX | LOCK_NB):
        app = QApplication(sys.argv)
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setText('The application is already running.')
        msg.setWindowTitle('Warning')
        msg.setStandardButtons(QMessageBox.Cancel)
        msg.exec_()
        app.exit(1)
        sys.exit(1)
    elif len(sys.argv) > 1 and sys.argv[1] == '--build':
        MainRunner().build()
    else:
        MainRunner().run()