# TimeMaster/TimeMaster.py
#
# 523 lines
# 20 KiB
# Python

import os
import re
import sys
import time
import json
import ctypes
import ctypes.wintypes
import shutil
import signal
import winreg
import hashlib
import inspect
import tempfile
import platform
import threading
import subprocess
import http.client
import urllib.parse
import datetime
from wmi import WMI
from tzlocal import get_localzone
from pytz import timezone
from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMessageBox, QMenu, QAction
from PyQt5.QtCore import Qt, QCoreApplication, QTimer
from PyQt5.QtGui import QIcon
from pathlib import Path
def _fd(f):
    """Return the OS-level file descriptor for *f*.

    *f* may be a file-like object exposing ``fileno()`` or an already-raw
    descriptor, which is returned unchanged.
    """
    return f.fileno() if hasattr(f, 'fileno') else f


# Cross-platform ``flock(f, flags)`` shim.
#
# Every branch below defines LOCK_SH, LOCK_NB, LOCK_EX, LOCK_UN and a
# ``flock(f, flags)`` function returning True on success and False on failure.
if os.name == 'nt':
    import msvcrt
    from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref)
    from ctypes.wintypes import BOOL, DWORD, HANDLE

    # These values are handed to LockFileEx as its flags argument.
    LOCK_SH = 0x0
    LOCK_NB = 0x1  # LOCKFILE_FAIL_IMMEDIATELY
    LOCK_EX = 0x2  # LOCKFILE_EXCLUSIVE_LOCK
    # Private sentinel: never passed to the Win32 API, only compared in flock().
    LOCK_UN = 0x9

    # ULONG_PTR must be pointer-sized.
    if sizeof(c_ulong) != sizeof(c_void_p):
        ULONG_PTR = c_int64
    else:
        ULONG_PTR = c_ulong
    PVOID = c_void_p

    class _OFFSET(Structure):
        _fields_ = [
            ('Offset', DWORD),
            ('OffsetHigh', DWORD)
        ]

    class _OFFSET_UNION(Union):
        _fields_ = [
            ('_offset', _OFFSET),
            ('Pointer', PVOID)
        ]
        _anonymous_ = ['_offset']

    class OVERLAPPED(Structure):
        _fields_ = [
            ('Internal', ULONG_PTR),
            ('InternalHigh', ULONG_PTR),
            ('_offset_union', _OFFSET_UNION),
            ('hEvent', HANDLE)
        ]
        _anonymous_ = ['_offset_union']

    LPOVERLAPPED = POINTER(OVERLAPPED)

    LockFileEx = windll.kernel32.LockFileEx
    LockFileEx.restype = BOOL
    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
    UnlockFileEx = windll.kernel32.UnlockFileEx
    UnlockFileEx.restype = BOOL
    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

    def flock(f, flags):
        """Lock or unlock *f* (file object or descriptor) according to *flags*.

        Returns True when the Win32 call succeeded, False otherwise.
        """
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        if flags == LOCK_UN:
            ret = UnlockFileEx(
                hfile,
                0,
                0,
                0xFFFF0000,
                byref(overlapped)
            )
        else:
            ret = LockFileEx(
                hfile,
                flags,
                0,
                0,
                0xFFFF0000,
                byref(overlapped)
            )
        return bool(ret)
else:
    try:
        import fcntl
        LOCK_SH = fcntl.LOCK_SH
        LOCK_NB = fcntl.LOCK_NB
        LOCK_EX = fcntl.LOCK_EX
        LOCK_UN = fcntl.LOCK_UN
    except (ImportError, AttributeError):
        # File locking is unavailable. LOCK_UN needs a value distinct from the
        # lock flags so that the dummy flock() can tell "unlock" apart.
        LOCK_EX = LOCK_SH = LOCK_NB = 0
        LOCK_UN = 0x9

        def flock(f, flags):
            """Dummy: locking always fails, unlocking always succeeds."""
            return flags == LOCK_UN
    else:
        def flock(f, flags):
            """Lock or unlock *f* (file object or descriptor) according to *flags*.

            Returns True on success and False when the operation failed,
            e.g. a non-blocking request on a file locked elsewhere.
            """
            # fcntl.flock() returns None on success and raises OSError on
            # failure, so its return value cannot be compared to 0.
            try:
                fcntl.flock(_fd(f), flags)
            except OSError:
                return False
            return True
def request_windows_admin():
    """Relaunch the program elevated when the current user is not an administrator.

    If ``IsUserAnAdmin`` reports that the process already has administrator
    rights, the function returns without doing anything. Otherwise it starts a
    new process through ``ShellExecuteW`` with the ``runas`` verb and exits the
    current process with status 1.

    The relaunched executable is ``<script name>.exe`` next to this file when
    that file exists, and ``sys.executable`` otherwise. The current
    ``sys.argv`` is passed along as the parameter string.
    """
    if ctypes.windll.shell32.IsUserAnAdmin():
        return
    exec_home = os.path.dirname(__file__)
    exec_name = os.path.splitext(os.path.basename(__file__))[0]
    exec_path = os.path.join(exec_home, '%s.exe' % (exec_name,))
    executable_path = exec_path if os.path.exists(exec_path) else sys.executable
    # Quote each argument: a plain space-join splits any argument containing
    # spaces (for example a script path under "Program Files").
    executable_args = subprocess.list2cmdline(sys.argv)
    print('runas admin: %s %s' % (executable_path, executable_args), file=sys.stderr)
    ctypes.windll.shell32.ShellExecuteW(None, 'runas', executable_path, executable_args, None, 1)
    sys.exit(1)
class HTTPResponse:
    """Container for an HTTP reply.

    Built from an indexable ``response`` whose first four items are, in
    order: status code, reason phrase, body text and header mapping.
    """
    status: int
    reason: str
    result: str
    header: dict

    def __init__(self, response):
        # Copy the four positional items onto the matching attributes.
        for position, attribute in enumerate(('status', 'reason', 'result', 'header')):
            setattr(self, attribute, response[position])

    def json(self):
        """Return the body parsed as JSON, or None when it is not valid JSON."""
        try:
            parsed = json.loads(self.result)
        except json.JSONDecodeError:
            parsed = None
        return parsed
class HTTPRequest:
    """Minimal HTTP client built on ``http.client``.

    ``get``, ``post`` and ``head`` each open one connection, send one request
    and return an ``HTTPResponse`` holding status, reason, the body decoded as
    UTF-8 and the response headers keyed by lower-cased name.
    """
    user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'

    @classmethod
    def _parse_headers(cls, headers):
        """Turn an iterable of ``(name, value)`` pairs into a dict with lower-cased names."""
        return {key.lower(): value for key, value in headers}

    @classmethod
    def _encode_body(cls, data=None, json_data=None):
        """Return the request body as bytes, or None when there is nothing to send.

        ``json_data`` wins whenever it is not None, so empty containers such
        as ``{}`` are still serialized. Bytes-like ``data`` is passed through
        untouched; any other truthy ``data`` is stringified and UTF-8 encoded.
        """
        if json_data is not None:
            return json.dumps(json_data).encode('utf-8')
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        if data:
            return str(data).encode('utf-8')
        return None

    @classmethod
    def _request(cls, method, url, body=None, header=None, timeout=15) -> 'HTTPResponse':
        """Send one request and return the wrapped response.

        The caller's headers are merged over the default ``User-Agent``. The
        connection is closed even when sending or reading raises.
        """
        url = str(url)
        header = {**{'User-Agent': cls.user_agent}, **(header or {})}
        parsed = urllib.parse.urlparse(url)
        connection_type = http.client.HTTPSConnection if parsed.scheme == 'https' else http.client.HTTPConnection
        conn = connection_type(parsed.netloc, timeout=timeout)
        try:
            conn.request(method, url, body, header)
            response = conn.getresponse()
            response_data = response.read().decode('utf-8')
            response_headers = cls._parse_headers(response.getheaders())
        finally:
            conn.close()
        return HTTPResponse((response.status, response.reason, response_data, response_headers))

    @classmethod
    def get(cls, url='', header=None, timeout=15) -> 'HTTPResponse':
        """Perform a GET request against *url*."""
        return cls._request('GET', url, None, header, timeout)

    @classmethod
    def post(cls, url='', data=None, json_data=None, header=None, timeout=15) -> 'HTTPResponse':
        """Perform a POST request against *url*.

        The body comes from ``json_data`` (serialized as JSON) or else from
        ``data``. For a JSON body, ``Content-Type: application/json`` is added
        unless the caller already supplied a content type.
        """
        header = dict(header or {})
        if json_data is not None and not any(str(name).lower() == 'content-type' for name in header):
            header['Content-Type'] = 'application/json'
        return cls._request('POST', url, cls._encode_body(data, json_data), header, timeout)

    @classmethod
    def head(cls, url='', header=None, timeout=15) -> 'HTTPResponse':
        """Perform a HEAD request against *url*."""
        return cls._request('HEAD', url, None, header, timeout)
class MainWindow(QSystemTrayIcon):
    """Tray icon that sets the system date and time from an HTTP ``Date`` header.

    A sync runs at start-up, again 12, 35 and 65 seconds later, then once per
    hour, and on demand from the tray menu. The tooltip is refreshed every
    three seconds with either the last error or the age of the last
    successful sync.
    """

    def __init__(self, app_name, app_version):
        """Create the tray icon, its timers and context menu, then start syncing.

        Args:
            app_name: Name shown in the tooltip.
            app_version: Version string shown in the tooltip.
        """
        self.app_name = app_name
        self.app_version = app_version
        self.last_sync_time = 0
        self.last_sync_exception = ''
        self.is_synchronizing = None
        super().__init__(QIcon(os.path.join(os.path.dirname(__file__), 'favicon.ico')), None)
        self.setToolTip('')
        self.tray_icon_update_timer = QTimer(self)
        self.tray_icon_update_timer.timeout.connect(self.on_tray_icon_update)
        self.tray_icon_update_timer.start(3000)
        self.time_sync_update_timer = QTimer(self)
        self.time_sync_update_timer.timeout.connect(self.sync)
        self.time_sync_update_timer.start(1000 * 3600)
        self.tray_menu = QMenu()
        for title, handler in (('立即同步', self.sync), ('退出程序', self.exit)):
            action = QAction(title, self)
            action.triggered.connect(handler)
            self.tray_menu.addAction(action)
        self.setContextMenu(self.tray_menu)
        self.show()
        self.init()

    def init(self):
        """Run an immediate sync and schedule the delayed follow-up syncs."""
        self.sync()
        self.sync_delayed()

    @staticmethod
    def exit():
        """Quit the Qt application."""
        QApplication.quit()

    def _sync_execution(self):
        """Fetch the server time and apply it; runs on a worker thread.

        Updates ``last_sync_time`` / ``last_sync_exception`` with the outcome
        and always clears ``is_synchronizing`` when done.
        """
        self.is_synchronizing = 1
        try:
            dt = self.get_server_datetime()
            date_ok = self.set_date(dt[0])
            time_ok = self.set_time(dt[1])
            if date_ok and time_ok:
                self.last_sync_time = self.get_runtime()
                self.last_sync_exception = ''
            else:
                self.last_sync_exception = '操作失败,可能需要以管理员身份运行'
        except Exception as e:
            if 'server' in str(e).lower():
                self.last_sync_exception = '网络错误'
            else:
                # Surface unexpected failures instead of silently keeping the
                # previous (possibly empty) status.
                print('Sync failed: %r' % (e,), file=sys.stderr)
                self.last_sync_exception = str(e) or type(e).__name__
        finally:
            self.is_synchronizing = 0

    def sync(self):
        """Start a background sync unless one is already in progress."""
        if not self.is_synchronizing:
            print('Synchronizing...')
            threading.Thread(target=self._sync_execution).start()

    def sync_delayed(self):
        """Schedule extra syncs 12, 35 and 65 seconds from now."""
        for delay_seconds in (12, 35, 65):
            QTimer.singleShot(1000 * delay_seconds, self.sync)

    @staticmethod
    def format_time(secs):
        """Format an age in seconds as a short "just now / N minutes / N hours ago" label."""
        if secs < 60:
            return '刚刚'
        if secs < 3600:
            return '%s分钟前' % ((secs // 60),)
        return '%s小时前' % (round(secs / 3600),)

    @staticmethod
    def get_runtime():
        """Return the ``time.perf_counter()`` reading used for sync timestamps."""
        return time.perf_counter()

    @staticmethod
    def _run_clock_command(command, value):
        """Run ``<command> <value>`` through the shell; True only on exit code 0.

        A command still running after two seconds is treated as a failure.
        """
        try:
            process = subprocess.run('%s %s' % (command, value), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2)
        except subprocess.TimeoutExpired:
            return False
        return process.returncode == 0

    @staticmethod
    def set_date(date_str):
        """Set the system date with the shell ``date`` command.

        Returns:
            True when the command exited with code 0, False otherwise
            (including a timeout).
        """
        return MainWindow._run_clock_command('date', date_str)

    @staticmethod
    def set_time(time_str):
        """Set the system time with the shell ``time`` command.

        Returns:
            True when the command exited with code 0, False otherwise
            (including a timeout).
        """
        return MainWindow._run_clock_command('time', time_str)

    @staticmethod
    def get_datetime_format():
        """Return the user's ``(short date format, time format)`` picture strings.

        Both are read with ``GetLocaleInfoW`` for the LCID returned by
        ``GetUserDefaultLCID``.
        """
        LOCALE_SSHORTDATE = 0x001F
        LOCALE_STIMEFORMAT = 0x1003
        GetUserDefaultLCID = ctypes.windll.kernel32.GetUserDefaultLCID
        GetUserDefaultLCID.argtypes = []
        GetUserDefaultLCID.restype = ctypes.wintypes.LCID
        GetLocaleInfoW = ctypes.windll.kernel32.GetLocaleInfoW
        GetLocaleInfoW.argtypes = [ctypes.wintypes.LCID, ctypes.wintypes.LCTYPE, ctypes.wintypes.LPWSTR, ctypes.c_int]
        GetLocaleInfoW.restype = ctypes.c_int
        user_lcid = GetUserDefaultLCID()
        # The buffer size argument is a character count, so pass len() of the
        # wide-character buffer; sizeof() would report bytes (twice as many).
        date_format = ctypes.create_unicode_buffer(1024)
        GetLocaleInfoW(user_lcid, LOCALE_SSHORTDATE, date_format, len(date_format))
        time_format = ctypes.create_unicode_buffer(1024)
        GetLocaleInfoW(user_lcid, LOCALE_STIMEFORMAT, time_format, len(time_format))
        return date_format.value, time_format.value

    @staticmethod
    def map_system_format_to_strftime(system_format):
        """Translate a date/time picture string into a ``strftime`` format.

        Tokens are first swapped for ``__NAME__`` placeholders (longest token
        first, whole words only) so that letters inside an already-replaced
        token cannot match a shorter one; the placeholders are then replaced
        by ``%`` directives.
        """
        format_mapping = {
            'dd': '__DATE__', 'd': '__DATE__', 'MM': '__MONTH__', 'M': '__MONTH__', 'yyyy': '__YEAR4__', 'yy': '__YEAR2__', 'HH': '__HOUR24__', 'H': '__HOUR24__',
            'hh': '__HOUR12__', 'h': '__HOUR12__', 'mm': '__MINUTE__', 'm': '__MINUTE__', 'ss': '__SECOND__', 's': '__SECOND__',
            'tt': '__AMPM__', 't': '__AMPM__'
        }
        for sys_fmt in sorted(format_mapping, key=len, reverse=True):
            system_format = re.sub(r'\b' + re.escape(sys_fmt) + r'\b', format_mapping[sys_fmt], system_format)
        final_mapping = {'__DATE__': '%d', '__MONTH__': '%m', '__YEAR4__': '%Y', '__YEAR2__': '%y', '__HOUR24__': '%H', '__HOUR12__': '%I', '__MINUTE__': '%M', '__SECOND__': '%S', '__AMPM__': '%p'}
        for placeholder, str_fmt in final_mapping.items():
            system_format = system_format.replace(placeholder, str_fmt)
        return system_format

    def get_server_datetime(self):
        """Return ``(date_str, time_str)`` for the current time reported by a server.

        Sends HEAD requests to a fixed server list plus the default gateway
        (when WMI reports one) and uses the first ``Date`` header obtained.
        The timestamp is converted from GMT to the local zone, advanced by one
        second, and formatted with the user's date and time formats.

        Raises:
            Exception: when no server yields a ``Date`` header, the header
                cannot be parsed, or the reported time is not later than
                Unix timestamp 1735689600.
        """
        try:
            gate = WMI().Win32_NetworkAdapterConfiguration(IPEnabled=True)[0].DefaultIPGateway[0]
        except Exception:
            gate = None
        server_list = ['cloud.tencent.com', '1.1.1.2', gate]
        date_string = ''
        for server in server_list:
            if server is None:
                continue
            try:
                date_string = HTTPRequest.head(url='http://%s/' % (server,), timeout=1).header.get('date')
            except Exception:
                continue
            # A reply without a Date header is useless; try the next server.
            if date_string:
                break
        if not date_string:
            raise Exception('Failed to get time from server.')
        ori_timezone = timezone('GMT')
        tar_timezone = timezone(str(get_localzone()))
        try:
            date_out = ori_timezone.localize(datetime.datetime.strptime(date_string, '%a, %d %b %Y %H:%M:%S %Z')).astimezone(tar_timezone) + datetime.timedelta(seconds=1)
            if int(date_out.timestamp()) > 1735689600:
                dt_fmt = self.get_datetime_format()
                return str(date_out.strftime(self.map_system_format_to_strftime(dt_fmt[0]))), str(date_out.strftime(self.map_system_format_to_strftime(dt_fmt[1])))
            raise Exception('The time returned by the server is older.')
        except ValueError:
            raise Exception('Failed to get time from server.')

    def on_tray_icon_update(self):
        """Refresh the tooltip with the last error or the age of the last sync."""
        self.setToolTip('%s %s\n上次同步:%s' % (self.app_name, self.app_version, self.last_sync_exception or self.format_time(int(self.get_runtime() - self.last_sync_time))))
class MainRunner:
    """Entry-point object: runs the tray application or builds a distribution."""

    # Per-user registry key listing programs started at logon.
    _RUN_KEY = 'Software\\Microsoft\\Windows\\CurrentVersion\\Run'

    def __init__(self):
        """Install the SIGINT handler and set the application metadata."""
        signal.signal(signal.SIGINT, self._handle_interrupt)
        self.app_name = '时间同步助手'
        self.app_version = '1.0.0.1'
        self.app_publisher = 'zhaoyafan'
        self.app_publisher_url = 'https://www.fanscloud.net/'
        self.application = None
        self.window = None

    def _copy_files_and_directories(self, src, dst):
        """Copy *src* to *dst*; directories are copied recursively.

        Does nothing when *src* does not exist. Always returns None.
        """
        if not os.path.exists(src):
            return None
        if not os.path.isdir(src):
            shutil.copy(src, dst)
            return None
        os.makedirs(dst, exist_ok=True)
        for item in os.listdir(src):
            s = os.path.join(src, item)
            d = os.path.join(dst, item)
            if os.path.isdir(s):
                self._copy_files_and_directories(s, d)
            else:
                shutil.copy(s, d)
        return None

    @staticmethod
    def add_startup(entry_name, exe_path):
        """Register *exe_path* under the current user's Run key as *entry_name*.

        Returns:
            None when *exe_path* does not exist, True when the value was
            written, False when the registry operation failed.
        """
        if not os.path.exists(exe_path):
            return None
        try:
            # The context manager closes the key even if SetValueEx raises.
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER, MainRunner._RUN_KEY, 0, winreg.KEY_SET_VALUE) as key:
                # Run values are command lines, so the path is quoted to stay
                # unambiguous when it contains spaces.
                winreg.SetValueEx(key, entry_name, 0, winreg.REG_SZ, '"%s"' % (exe_path,))
            return True
        except Exception:
            return False

    @staticmethod
    def del_startup(entry_name):
        """Remove *entry_name* from the current user's Run key.

        Returns:
            True when the value was deleted, False when the registry
            operation failed.
        """
        try:
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER, MainRunner._RUN_KEY, 0, winreg.KEY_SET_VALUE) as key:
                winreg.DeleteValue(key, entry_name)
            return True
        except Exception:
            return False

    def run(self):
        """Ensure elevation, register autostart, and run the tray application.

        Exits the process with the Qt event loop's return code.
        """
        request_windows_admin()
        exec_home = os.path.dirname(__file__)
        exec_name = os.path.splitext(os.path.basename(__file__))[0]
        self.add_startup(exec_name, os.path.join(exec_home, '%s.exe' % (exec_name,)))
        QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
        QApplication.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
        self.application = QApplication(sys.argv)
        self.window = MainWindow(app_name=self.app_name, app_version=self.app_version)
        sys.exit(self.application.exec_())

    def build(self):
        """Interactively build a standalone distribution and an installer.

        Step 1 (on confirmation) installs Nuitka, compiles this script and
        copies ``PyQt5`` and ``favicon.ico`` into ``<name>.dist``. If step 1
        is declined and no ``<name>.dist`` directory exists, the method
        returns. Step 2 (on confirmation) renders ``<name>.iss`` from
        ``<name>.iss.template`` and runs the Inno Setup compiler on it.

        Exits with status 1 when this file is not a ``.py`` source file.
        """
        if not str(__file__).endswith('.py'):
            print('Build is not currently supported.', file=sys.stderr)
            sys.exit(1)
        script_dir = os.path.dirname(__file__)
        script_stem = os.path.splitext(os.path.basename(__file__))[0]
        from_home = os.path.dirname(os.path.abspath(__file__))
        dist_home = os.path.join(script_dir, '%s.dist' % (script_stem,))
        ask = input('%s %s: ' % ('Build?', '[Y/n]'))
        if ask.lower().strip() == 'y':
            # Use the running interpreter so pip and Nuitka target the same
            # environment; an argument list needs no shell.
            subprocess.run([sys.executable, '-m', 'pip', 'install', 'nuitka', '-q'], env=os.environ)
            subprocess.run([
                sys.executable,
                '-m',
                'nuitka',
                '--standalone',
                '--enable-plugin=pyqt5',
                '--include-module=PyQt5',
                '--windows-console-mode=disable',
                '--windows-icon-from-ico=favicon.ico',
                '--product-name=%s' % (self.app_name,),
                '--file-description=%s' % (self.app_name,),
                '--product-version=%s' % (self.app_version,),
                '--copyright=Copyright (C) 2025',
                '--output-dir=%s' % (os.path.join(script_dir),),
                '%s' % (__file__,)
            ], env=os.environ)
            for i in ['PyQt5', 'favicon.ico']:
                self._copy_files_and_directories('%s/%s' % (from_home, i), '%s/%s' % (dist_home, i))
        elif not os.path.exists(dist_home):
            # Nothing was built now and no earlier build exists to package.
            return None
        ask = input('%s %s: ' % ('Compile setup program?', '[Y/n]'))
        if ask.lower().strip() != 'y':
            return None
        compile_file = os.path.join(script_dir, '%s.iss' % (script_stem,))
        compile_template = os.path.join(script_dir, '%s.iss.template' % (script_stem,))
        compiler = 'C:\\Program Files (x86)\\Inno Setup 6\\ISCC.exe'
        if not os.path.exists(compile_template):
            print('The template file \"%s\" does not exist.' % (compile_template,), file=sys.stderr)
            return None
        if not os.path.exists(compiler):
            print('The compiler \"%s\" does not exist. Please check if Inno Setup is installed. You can download it at https://www.innosetup.com/' % (compiler,),
                  file=sys.stderr)
            return None
        # Placeholders are substituted in this order.
        substitutions = [
            ('%APPNAME%', self.app_name),
            ('%APPEXEC%', script_stem),
            ('%APPVERSION%', self.app_version),
            ('%APPBUILDDATE%', time.strftime('%Y%m%d', time.localtime())),
            ('%APPPUBLISHER%', self.app_publisher),
            ('%APPPUBLISHERURL%', self.app_publisher_url),
            # On a non-64-bit interpreter the marker becomes '; ' (presumably
            # commenting out the template line it prefixes).
            ('%DISABLEX64%', '' if platform.architecture()[0] == '64bit' else '; '),
        ]
        script_text = Path(compile_template).read_text()
        for placeholder, value in substitutions:
            script_text = script_text.replace(placeholder, value)
        Path(compile_file).write_text(script_text)
        subprocess.run([compiler, compile_file])
        return None

    def _handle_interrupt(self, _signal, _frame):
        """SIGINT handler: announce the exit and shut the application down."""
        print('Exit.', file=sys.stderr)
        self.handle_interrupt()

    def handle_interrupt(self):
        """Ask the window to quit, printing any error instead of raising."""
        try:
            self.window.exit()
        except Exception as e:
            print(e, file=sys.stderr)
if __name__ == '__main__':
    # Choose the Qt plugin directory from this file's extension.
    script_dir = os.path.dirname(__file__)
    if os.path.basename(__file__).lower().endswith('.int'):
        plugin_subdir = 'site-packages/PyQt5/Qt5/plugins'
    else:
        plugin_subdir = 'PyQt5/Qt5/plugins'
    QCoreApplication.addLibraryPath(os.path.join(script_dir, plugin_subdir))

    # Single-instance guard: a lock file named after a hash of this file's
    # path. The handle stays open so the lock is held while the process runs.
    lock_name = '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16]
    f_lock = open(file=os.path.join(tempfile.gettempdir(), lock_name), mode='w', encoding='utf-8')
    if flock(f_lock, LOCK_EX | LOCK_NB):
        runner = MainRunner()
        if len(sys.argv) > 1 and sys.argv[1] == '--build':
            runner.build()
        else:
            runner.run()
    else:
        # The lock could not be taken: warn the user and exit.
        app = QApplication(sys.argv)
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setText('The application is already running.')
        msg.setWindowTitle('Warning')
        msg.setStandardButtons(QMessageBox.Cancel)
        msg.exec_()
        app.exit(1)
        sys.exit(1)