Galactic/Galactic.py

2756 lines
104 KiB
Python
Raw Normal View History

2025-02-09 19:56:41 +08:00
import os
import re
import sys
import glob
import json
import time
import ctypes
import base64
import signal
import random
import shutil
import socket
2025-02-24 02:05:25 +08:00
import psutil
2025-02-09 19:56:41 +08:00
import inspect
import uvicorn
import zipfile
import hashlib
2025-02-11 01:01:14 +08:00
import asyncio
2025-02-09 19:56:41 +08:00
import tempfile
import platform
import requests
import requests.adapters
import importlib.util
import threading
import subprocess
2025-02-10 18:25:32 +08:00
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from seleniumwire.webdriver import InspectRequestsMixin, DriverCommonMixin
from seleniumwire.request import Request as SeleniumWireRequest
from seleniumwire.request import Response as SeleniumWireResponse
2025-02-09 19:56:41 +08:00
from selenium.webdriver.common.by import By
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.remote.command import Command
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchWindowException
from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineSettings, QWebEngineProfile
from PyQt5.QtWebEngineCore import QWebEngineUrlRequestInterceptor
from PyQt5.QtNetwork import QNetworkProxyFactory
2025-03-01 01:05:40 +08:00
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox, QSystemTrayIcon, QMenu, QAction, QDesktopWidget, QShortcut, QFileDialog
2025-02-09 19:56:41 +08:00
from PyQt5.QtCore import Qt, QCoreApplication, QTimer, QUrl
2025-02-15 22:54:21 +08:00
from PyQt5.QtGui import QIcon, QKeySequence
2025-02-10 00:06:03 +08:00
from typing import List
2025-02-11 01:01:14 +08:00
from fastapi import FastAPI, Response, WebSocket, UploadFile, File, HTTPException
2025-02-09 19:56:41 +08:00
from pydantic import BaseModel
from starlette.responses import JSONResponse, FileResponse
2025-02-11 01:01:14 +08:00
from starlette.websockets import WebSocketDisconnect
2025-02-09 19:56:41 +08:00
from winotify import Notification, audio
from func_timeout import func_set_timeout, FunctionTimedOut
from pathlib import Path
2025-02-15 22:54:21 +08:00
from urllib.parse import urlparse
2025-02-09 19:56:41 +08:00
def _fd(f):
return f.fileno() if hasattr(f, 'fileno') else f
if os.name == 'nt':
import msvcrt
from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref)
from ctypes.wintypes import BOOL, DWORD, HANDLE
LOCK_SH = 0x0
LOCK_NB = 0x1
LOCK_EX = 0x2
LOCK_UN = 0x9
if sizeof(c_ulong) != sizeof(c_void_p):
ULONG_PTR = c_int64
else:
ULONG_PTR = c_ulong
PVOID = c_void_p
class _OFFSET(Structure):
_fields_ = [
('Offset', DWORD),
('OffsetHigh', DWORD)
]
class _OFFSET_UNION(Union):
_fields_ = [
('_offset', _OFFSET),
('Pointer', PVOID)
]
_anonymous_ = ['_offset']
class OVERLAPPED(Structure):
_fields_ = [
('Internal', ULONG_PTR),
('InternalHigh', ULONG_PTR),
('_offset_union', _OFFSET_UNION),
('hEvent', HANDLE)
]
_anonymous_ = ['_offset_union']
LPOVERLAPPED = POINTER(OVERLAPPED)
LockFileEx = windll.kernel32.LockFileEx
LockFileEx.restype = BOOL
LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
UnlockFileEx = windll.kernel32.UnlockFileEx
UnlockFileEx.restype = BOOL
UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]
def flock(f, flags):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
if flags == LOCK_UN:
ret = UnlockFileEx(
hfile,
0,
0,
0xFFFF0000,
byref(overlapped)
)
else:
ret = LockFileEx(
hfile,
flags,
0,
0,
0xFFFF0000,
byref(overlapped)
)
return bool(ret)
else:
try:
import fcntl
LOCK_SH = fcntl.LOCK_SH
LOCK_NB = fcntl.LOCK_NB
LOCK_EX = fcntl.LOCK_EX
LOCK_UN = fcntl.LOCK_UN
except (ImportError, AttributeError):
LOCK_EX = LOCK_SH = LOCK_NB = 0
def flock(f, flags):
return flags == LOCK_UN
else:
def flock(f, flags):
return fcntl.flock(_fd(f), flags) == 0
2025-02-09 23:58:06 +08:00
2025-02-09 19:56:41 +08:00
def notification_send(app_id=None, title='', message='', reminder=None):
    """
    Show a desktop notification through winotify.

    app_id may be a value or a zero-argument callable producing the value.
    The icon is the favicon.ico located next to this file. The truthiness of
    reminder is passed as the second argument of set_audio().
    """
    if callable(app_id):
        app_id = app_id()
    icon_file = os.path.join(os.path.dirname(__file__), 'favicon.ico')
    toast = Notification(app_id=app_id, title=title, msg=message, icon=icon_file)
    toast.set_audio(audio.IM, bool(reminder))
    toast.show()
2025-02-09 23:58:06 +08:00
2025-02-09 19:56:41 +08:00
def import_module(file: str):
    """
    Load the Python source at the given path and return the executed module.

    The module name is the file's base name without its extension. The module
    is not registered in sys.modules by this function.
    """
    module_name, _extension = os.path.splitext(os.path.basename(file))
    spec = importlib.util.spec_from_file_location(module_name, file)
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    return loaded
2025-02-24 01:21:42 +08:00
def calculate_execution_time(func):
    """
    Decorator that reports how long each call of func takes.

    The elapsed time in milliseconds, together with the call arguments, is
    printed to stderr after func returns; the return value is passed through.
    Nothing is printed when func raises.
    """
    import functools

    # wraps() keeps the decorated function's __name__ and __doc__ intact.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic, so the measurement cannot be skewed by
        # system clock adjustments.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time_ms = (time.perf_counter() - start_time) * 1000
        print('Function \'{}\' with args {} and kwargs {} took {:.3f} ms to execute.'.format(func.__name__, args, kwargs, execution_time_ms), file=sys.stderr)
        return result
    return wrapper
2025-02-09 23:58:06 +08:00
2025-02-09 19:56:41 +08:00
class BrowserMobileEmulation(dict):
    """
    Mobile emulation parameters.

    A read-only mapping with the keys 'w', 'h' and 'user_agent'; every key is
    also readable as an attribute. Unknown keys and attributes yield None.
    """

    def __init__(self, w=540, h=960, user_agent=None):
        # Built-in user agent, stored base64-encoded; used when none is given.
        encoded_default = '''
TW96aWxsYS81LjAgKExpbnV4OyBVOyBBbmRyb2lkIDEzOyB6aC1jbjsgMjEwOTEx
OUJDIEJ1aWxkL1RLUTEuMjIwODI5LjAwMikgQXBwbGVXZWJLaXQvNTM3LjM2IChL
SFRNTCwgbGlrZSBHZWNrbykgVmVyc2lvbi80LjAgQ2hyb21lLzk4LjAuNDc1OC4x
MDIgTVFRQnJvd3Nlci8xMy42IE1vYmlsZSBTYWZhcmkvNTM3LjM2
'''
        default_agent = base64.b64decode(encoded_default.encode('utf-8')).decode()
        if not user_agent:
            user_agent = default_agent
        super().__init__({'w': w, 'h': h, 'user_agent': user_agent})
        # __setattr__ ignores assignments, so this only declares the names.
        self.w = self.h = self.user_agent = None

    def __setattr__(self, key, value):
        # Attribute assignment is deliberately a no-op.
        pass

    def __getitem__(self, item):
        return dict.get(self, item)

    def __getattr__(self, item):
        return dict.get(self, item)
class CustomHTTPAdapter(requests.adapters.HTTPAdapter):
    """
    Transport adapter that resolves host names through an HTTP DNS service.

    The request URL is rewritten to the resolved IP address while the original
    name is kept in the Host header and, for HTTPS, in the TLS host name
    settings. The URL of the response is rewritten back to the host name.
    """

    def __init__(self, *args, **kwargs):
        self.urlparse = urlparse
        # host name -> resolved address, and the reverse mapping.
        self.hosts = {}
        self.addrs = {}
        super().__init__(*args, **kwargs)

    @staticmethod
    def _split_netloc(netloc):
        """
        Split a netloc into (host, ':port' or '').
        """
        pieces = netloc.split(':')
        try:
            addition_port = ':%s' % pieces[1]
        except IndexError:
            addition_port = ''
        return pieces[0], addition_port

    @staticmethod
    def resolve_host(host):
        """
        Return the first address reported for host, or None on request failure.
        """
        try:
            # A timeout keeps an unreachable resolver from blocking every request.
            answer = requests.get('http://119.29.29.29/d?dn=%s&ip=208.67.222.222' % host, timeout=5).text
        except requests.exceptions.RequestException:
            return None
        hosts = answer.replace(',', ';').split(';')
        return hosts[0] if len(hosts) > 0 else None

    def send(self, request, **kwargs):
        """
        Send the request to the resolved address when resolution succeeds.
        """
        req = request
        connection_pool_kwargs = self.poolmanager.connection_pool_kw
        url_resolve = self.urlparse(req.url)
        scheme = url_resolve.scheme
        domain, addition_port = self._split_netloc(url_resolve.netloc)
        ip_address = self.resolve_host(domain)
        tls_name_set = False
        if ip_address:
            self.hosts[domain] = ip_address
            self.addrs[ip_address] = domain
            req.url = req.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.hosts[domain], addition_port))
            if scheme == 'https':
                connection_pool_kwargs['assert_hostname'] = domain
                connection_pool_kwargs['server_hostname'] = domain
                tls_name_set = True
            req.headers['Host'] = '%s%s' % (domain, addition_port)
        if not tls_name_set:
            # The pool kwargs are shared by all requests of this adapter, so a
            # name left over from an earlier HTTPS request must not be applied
            # to a request for a different (or unresolved) host.
            connection_pool_kwargs.pop('assert_hostname', None)
            connection_pool_kwargs.pop('server_hostname', None)
        return super().send(req, **kwargs)

    def build_response(self, *args, **kwargs):
        """
        Build the response and map an address in its URL back to the host name.
        """
        res = super().build_response(*args, **kwargs)
        domain, addition_port = self._split_netloc(self.urlparse(res.url).netloc)
        if domain in self.addrs:
            res.url = res.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.addrs[domain], addition_port))
        return res
class BrowserPathManager:
    """
    Locate a Chrome binary and a matching chromedriver.

    The driver is looked up in (and downloaded to) the system temp directory
    under the name chromedriver_<major version>.
    """

    def __init__(self):
        self.webdriver_install_location = tempfile.gettempdir()

    @staticmethod
    def _is_supported_version(main_version) -> bool:
        """
        Return True when main_version is a number of at least 70.
        """
        try:
            return int(main_version) >= 70
        except (TypeError, ValueError):
            return False

    @staticmethod
    def resolve_browser_version(file: str):
        """
        Return (file, major version, full version) of the browser executable.

        Both version strings are empty when the version cannot be determined.
        Raises FileNotFoundError when the file does not exist.
        """
        if not os.path.exists(file):
            raise FileNotFoundError('The executable file does not exist in %s' % file)
        if file.lower().endswith('.exe'):
            try:
                full_version = subprocess.run(
                    ['powershell', '(Get-Item -Path "%s").VersionInfo.ProductVersion' % file],
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.DEVNULL,
                    timeout=5
                ).stdout.decode('utf-8').strip()
            except Exception:
                full_version = ''
        else:
            try:
                # An argument list without a shell keeps paths containing
                # spaces or shell metacharacters intact.
                full_version = subprocess.run(
                    [file, '--version'],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.DEVNULL,
                    timeout=5
                ).stdout.decode('utf-8').strip()
                full_version = re.findall('[0-9]+[.\\d+]+', full_version)[-1]
            except Exception:
                full_version = ''
        main_version = full_version.split('.')[0]
        return file, main_version, full_version

    @staticmethod
    def open_remote_resources(url: str, save_file: str = None, auto_redirects=False, retries=3):
        """
        Fetch url, retrying on connection errors.

        With save_file the body is streamed to that path and a bool is
        returned; otherwise the response text is returned. A non-200 status or
        exhausted retries yield False (download mode) or '' (text mode).
        """
        attempts = (retries if retries > 0 else 0) + 1
        failure = False if save_file else ''
        with requests.Session() as http:
            for scheme in ['http://', 'https://']:
                http.mount(scheme, CustomHTTPAdapter())
            for attempt in range(attempts):
                try:
                    with http.get(
                        url,
                        headers={
                            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
                        },
                        allow_redirects=auto_redirects,
                        stream=(save_file is not None)
                    ) as response:
                        if 200 != response.status_code:
                            return failure
                        if not save_file:
                            return response.text
                        with open(save_file, 'wb') as filestream:
                            for chunk in response.iter_content(chunk_size=8192):
                                filestream.write(chunk)
                        return True
                except requests.exceptions.ConnectionError:
                    # Back off before the next attempt; no pause after the last one.
                    if attempt + 1 < attempts:
                        time.sleep(0.75 + round(random.random(), 2))
        # Every attempt failed: report it the same way as a non-200 status
        # instead of falling through with None.
        return failure

    @staticmethod
    def find_binary():
        """
        Return the first existing well-known Chrome path, or None.
        """
        plat = sys.platform
        find_list = []
        if plat == 'win32':
            for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)', 'LOCALAPPDATA', 'PROGRAMW6432']:
                base = os.environ.get(e, '')
                # Skip unset variables so no path relative to the drive root is probed.
                if base:
                    find_list.append('%s/Google/Chrome/Application/chrome.exe' % base.replace("\\", '/'))
        elif plat == 'linux':
            find_list.append('/opt/google/chrome/chrome')
            # This entry is the launcher itself, not a directory containing "chrome".
            find_list.append('/usr/bin/google-chrome')
        elif plat == 'darwin':
            # Likewise the bundle path already names the executable.
            find_list.append('/Applications/Google Chrome.app/Contents/MacOS/Google Chrome')
        for execute_file in find_list:
            if os.path.exists(execute_file):
                return execute_file
        return None

    def find_driver(self, main_version: str):
        """
        Return the path of an already installed driver for main_version, or None.
        """
        if not self._is_supported_version(main_version):
            return None
        location = '%s%s%s' % (
            self.webdriver_install_location,
            os.sep,
            'chromedriver_%s%s' % (str(main_version), '.exe' if platform.system().lower() == 'windows' else '')
        )
        return location.replace("\\", '/') if os.path.exists(location) else None

    def pull_driver(self, main_version: str):
        """
        Download the driver for main_version and return its path, or None.
        """
        if not self._is_supported_version(main_version):
            return None
        # NOTE(review): this storage bucket is believed to stop at Chrome 114;
        # newer drivers are published elsewhere - confirm before relying on it.
        chromedriver_site = 'https://chromedriver.storage.googleapis.com'
        latest_release = self.open_remote_resources('%s/LATEST_RELEASE_%s' % (chromedriver_site, main_version))
        if not latest_release:
            return None
        plat = sys.platform
        # Each entry is [member name inside the archive, archive file name].
        match_assets = []
        if plat == 'win32':
            match_assets.append(['chromedriver.exe', 'chromedriver_win32.zip'])
        elif plat == 'linux':
            match_assets.append(['chromedriver', 'chromedriver_linux64.zip'])
        elif plat == 'darwin':
            if platform.machine().startswith('arm'):
                match_assets.append(['chromedriver', 'chromedriver_mac_arm64.zip'])
                match_assets.append(['chromedriver', 'chromedriver_mac64_m1.zip'])
            else:
                match_assets.append(['chromedriver', 'chromedriver_mac64.zip'])
        package_chromedriver = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'chromedriver.zip')
        destdir_chromedriver = self.webdriver_install_location
        for assets in match_assets:
            res_url = '%s/%s/%s' % (chromedriver_site, latest_release, assets[1])
            print('Downloading version %s chromedriver %s to %s...' % (latest_release, res_url, destdir_chromedriver), file=sys.stderr)
            if self.open_remote_resources(res_url, package_chromedriver):
                # Close the archive before it is deleted below.
                with zipfile.ZipFile(package_chromedriver) as archive:
                    dist = archive.extract(assets[0], destdir_chromedriver)
                dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0].replace('chromedriver', 'chromedriver_%s' % main_version))
                if os.path.exists(dist_chan):
                    os.remove(dist_chan)
                os.rename(dist, dist_chan)
                if not assets[0].lower().endswith('.exe'):
                    os.chmod(dist_chan, 0o777)
                os.remove(package_chromedriver)
                return dist_chan.replace("\\", '/')
        return None

    def main(self, binary: str = None, driver: str = None):
        """
        Return (binary, driver) paths, locating or downloading what is missing.

        Raises FileNotFoundError when the browser or driver cannot be found and
        RuntimeError when the browser version cannot be determined.
        """
        binary = binary or self.find_binary()
        if not binary:
            raise FileNotFoundError('No browser executable file is found on your system, please confirm whether it has been installed')
        if not os.path.exists(binary):
            raise FileNotFoundError('The executable file does not exist in %s' % binary)
        version = self.resolve_browser_version(binary)
        # The returned tuple is always truthy; the major version is what
        # tells whether detection worked.
        if not version[1]:
            raise RuntimeError('Failure to get the browser version number failed in %s' % binary)
        driver = driver if driver else self.find_driver(version[1])
        driver = driver if driver else self.pull_driver(version[1])
        if not driver:
            raise FileNotFoundError('Not specified the driver path, and try the automatic download failure')
        if not os.path.exists(driver):
            raise FileNotFoundError('The driver does not exist in %s' % driver)
        return binary, driver
class SeleniumClear:
    """
    Remove Selenium and chromedriver leftovers, at most once per day.

    The time of the last cleanup is stored as a Unix timestamp in the file
    .selenium_clear_last inside the system temp directory.
    """

    def __init__(self):
        self.last = '%s/.selenium_clear_last' % tempfile.gettempdir()

    @staticmethod
    def clear_selenium():
        """
        Delete the user's .cache/selenium directory (Windows only).
        """
        if platform.uname().system.lower() == 'windows':
            user_home = [os.environ.get('HOMEDRIVE'), os.environ.get('HOMEPATH')]
            if user_home[0] and user_home[1]:
                try:
                    shutil.rmtree('%s%s/.cache/selenium' % (user_home[0], user_home[1]))
                except OSError:
                    # Best-effort cleanup: a missing or busy cache is not an error.
                    pass

    @staticmethod
    def clear_driver_cache():
        """
        Delete temporary directories left behind in the temp directory.
        """
        for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']:
            for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)):
                try:
                    shutil.rmtree(i)
                except OSError:
                    # OSError covers the not-found and permission cases; the
                    # name WindowsError only exists on Windows and made this
                    # handler itself fail elsewhere.
                    pass

    @staticmethod
    def file_get_contents(file, text=None):
        """
        Return the UTF-8 content of file, or text when it does not exist.
        """
        if not os.path.exists(file):
            return text
        with open(file=file, mode='r', encoding='utf-8') as stream:
            return stream.read()

    @staticmethod
    def file_put_contents(file, text=None):
        """
        Write text to file as UTF-8 and return the number of characters written.
        """
        with open(file=file, mode='w', encoding='utf-8') as stream:
            return stream.write(text)

    def straight_clear(self):
        """
        Run both cleanups now and record the current time.
        """
        self.clear_selenium()
        self.clear_driver_cache()
        self.file_put_contents(self.last, str(int(time.time())))

    def auto(self):
        """
        Run the cleanup when the last recorded one is older than 86400 seconds.
        """
        try:
            if int(self.file_get_contents(self.last, '0')) + 86400 < int(time.time()):
                self.straight_clear()
        except ValueError:
            # The marker file holds something that is not a number; drop it.
            os.remove(self.last)
class PositionTab:
    """
    Symbolic directions accepted when switching tabs.
    """
    # Move to the tab before the current one.
    Prev = 'Go-Prev'
    # Move to the tab after the current one.
    Next = 'Go-Next'
class ColorUtils:
    """
    Color helpers.
    """

    @staticmethod
    def hex2rgb(color):
        """
        Convert a hex color such as '#F8BE5F' to an (r, g, b) tuple.

        The first character is dropped without being inspected. The remainder
        must be 3, 6 or 8 hex digits; with 8 digits the last two are ignored.
        Raises Exception for other lengths or non-hex characters.
        """
        digits = color[1:].upper()
        for symbol in digits:
            if symbol not in '0123456789ABCDEF':
                raise Exception('Found invalid hexa character {0}.'.format(symbol))
        size = len(digits)
        if size in (6, 8):
            digits = digits[:6]
        elif size == 3:
            # Shorthand form: every digit is doubled.
            digits = ''.join(symbol * 2 for symbol in digits)
        else:
            raise Exception('Hexa string should be 3, 6 or 8 digits. if 8 digits, last 2 are ignored.')
        return tuple(int(digits[start:start + 2], 16) for start in (0, 2, 4))
2025-02-09 23:58:06 +08:00
class FetchResult:
    """
    Result of a fetch request: status, header and result.

    Built from a dict with the keys 'status', 'header' and 'result'; falsy
    data leaves the defaults 0, {} and ''.
    """

    def __init__(self, data):
        self.status = 0
        self.header = {}
        self.result = ''
        if data:
            self._resolve_data(data)

    def _resolve_data(self, data: dict):
        # Copy the three fields in order; a missing key raises KeyError.
        for field in ('status', 'header', 'result'):
            setattr(self, field, data[field])

    def __str__(self):
        rows = (
            ('STATUS', self.status),
            ('HEADER', self.header),
            ('RESULT', self.result),
        )
        return '\n'.join('%s: %s' % row for row in rows)
class EmptyMethod:
    """
    Placeholder object on which every access is harmless.

    Any attribute or item lookup returns a callable that accepts arbitrary
    arguments and returns None; attribute and item assignments are ignored.
    """

    @classmethod
    def no_method(cls, *args, **kwargs):
        return None

    def __setitem__(self, key, value):
        return None

    def __setattr__(self, key, value):
        return None

    def __getitem__(self, item):
        return self.no_method

    def __getattr__(self, item):
        return self.no_method
2025-02-24 01:21:42 +08:00
class BrowserService(Service):
    """
    Driver service that talks to 127.0.0.1 and uses a short connect probe.
    """

    def __del__(self):
        # Deliberately empty: replaces whatever the base class finalizer does.
        pass

    def is_connectable(self):
        """
        Return True when a TCP connection to the service port succeeds.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.05)
            try:
                status = probe.connect_ex(('127.0.0.1', self.port))
            except socket.timeout:
                return False
            return status == 0

    @property
    def service_url(self):
        """
        Return the service URL on the loopback address.
        """
        return 'http://%s:%s' % ('127.0.0.1', self.port)
2025-02-24 01:21:42 +08:00
class BrowserOptions(Options):
    """
    Chrome options whose finalizer does nothing.
    """

    def __del__(self):
        # Deliberately empty: replaces whatever the base class finalizer does.
        return None
2025-02-10 18:25:32 +08:00
class Browser(InspectRequestsMixin, DriverCommonMixin, Chrome):
2025-02-09 19:56:41 +08:00
"""
Browser web driver.
"""
def __init__(
    self,
    driver: str = None,
    binary: str = None,
    driver_classes: int = 0,
    debugger_address: str = None,
    backends_address: str = None,
    headless: bool = False,
    lang: str = None,
    mute: bool = False,
    no_images: bool = False,
    user_agent: str = None,
    http_proxy: str = None,
    home: str = None,
    window_size: str = None,
    window_site: str = None,
    mobile_emulation: BrowserMobileEmulation = None,
    option_arguments: list = None,
    req_interceptor=None,
    res_interceptor=None,
):
    """
    Configure Chrome options from the arguments and start the browser.

    window_size is 'WxH' or 'W,H'; window_site is 'X,Y'. The Selenium-Wire
    backend is only set up when driver_classes is 1 and backends_address
    ('host:port') is given; otherwise self.backend is an EmptyMethod.
    """
    self.exited = None
    self.is_linux = sys.platform.startswith('linux')
    SeleniumClear().auto()
    # binary, driver = BrowserPathManager().main(binary, driver)
    # Linux and headless runs get a fixed window size unless one is given.
    if not window_size and (self.is_linux or headless):
        window_size = '1920x1080'
    # Initialization settings.
    if not isinstance(option_arguments, list):
        option_arguments = []
    cdp_commands = []
    service = BrowserService()
    options = BrowserOptions()
    self.cdplist = cdp_commands
    # Delete prompt information of chrome being controlled.
    exclude_switches = ['enable-automation', 'enable-logging', 'disable-translate']
    if debugger_address:
        options.add_experimental_option('debuggerAddress', debugger_address)
    else:
        options.add_experimental_option('excludeSwitches', exclude_switches)
    # Mobile emulation parameter setting start.
    if mobile_emulation and not debugger_address:
        self.w_browser = mobile_emulation.w + 14
        self.h_browser = mobile_emulation.h + 0
        self.w_inner_window = mobile_emulation.w + 0
        self.h_inner_window = mobile_emulation.h - 86
        self.mobile_emulation_screen_w = mobile_emulation.w
        self.mobile_emulation_screen_h = mobile_emulation.h
        window_size = '%s,%s' % (self.w_browser, self.h_browser)
        device_metrics = {
            'width': self.w_inner_window,
            'height': self.h_inner_window,
            'pixelRatio': 2.75,
            'touch': True
        }
        options.add_experimental_option(
            'mobileEmulation', {
                'deviceMetrics': device_metrics,
                'userAgent': mobile_emulation.user_agent
            }
        )
        if mobile_emulation.user_agent.find('iPhone') == -1:
            emulated_platform = 'Android'
        else:
            emulated_platform = 'iPhone'
        cdp_commands.append([
            'Emulation.setUserAgentOverride', {
                'userAgent': mobile_emulation.user_agent,
                'userAgentMetadata': {
                    'platform': emulated_platform,
                    'mobile': True,
                    'platformVersion': '',
                    'architecture': '',
                    'model': ''
                }
            }
        ])
    else:
        self.w_browser = 0
        self.h_browser = 0
        self.w_inner_window = 0
        self.h_inner_window = 0
        self.mobile_emulation_screen_w = 0
        self.mobile_emulation_screen_h = 0
    # Mobile emulation parameter setting end.
    # Set browser and webdriver path.
    if driver:
        service.path = driver
    if binary:
        options.binary_location = binary
    # Add webdriver option arguments.
    for argument in option_arguments:
        options.add_argument(argument)
    # Set headless mode.
    if self.is_linux or headless:
        options.add_argument('--headless=new')
    # Set no-sandbox mode.
    if self.is_linux:
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-gpu')
    supports_preferences = hasattr(options, 'set_preference')
    # Set language of browser; nothing is added when lang is not given.
    if lang:
        options.add_argument('--lang=%s' % (lang or 'zh-CN'))
        if supports_preferences:
            options.set_preference('intl.accept_languages', lang or 'zh-CN')
    # Set mute.
    if mute:
        options.add_argument('--mute-audio=true')
        if supports_preferences:
            print('Warning: Do not support mute audio currently.', file=sys.stderr)
    # Set no images mode.
    if no_images:
        options.add_argument('--blink-settings=imagesEnabled=false')
        if supports_preferences:
            print('Warning: Do not support disable images currently.', file=sys.stderr)
    # Set default user agent.
    if user_agent:
        options.add_argument('--user-agent=%s' % user_agent)
        if supports_preferences:
            options.set_preference('general.useragent.override', user_agent)
    # Set http proxy for browser.
    if http_proxy:
        options.add_argument('--proxy-server=http://%s' % http_proxy)
    # Set browser window size before startup (skipped when attaching to a debugger).
    if not debugger_address:
        if window_size:
            options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ','))
        else:
            options.add_argument('--start-maximized')
    # Start the backend and optimize it.
    if driver_classes == 1 and backends_address is not None:
        backend_options = {
            'addr': str(backends_address.split(':')[0]),
            'port': int(backends_address.split(':')[1])
        }
        for key, value in self._setup_backend(backend_options).items():
            options.set_capability(key, value)
        try:
            self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
            self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
            self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
        except AttributeError:
            pass
    else:
        self.backend = EmptyMethod()
    # Start the browser.
    super().__init__(service=service, options=options)
    if mobile_emulation:
        cdp_commands.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}])
        cdp_commands.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}])
        cdp_commands.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}])
    # Set the request and response interceptor.
    if req_interceptor:
        try:
            if not hasattr(self, 'backend'):
                print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
            self.request_interceptor = req_interceptor
        except AttributeError:
            pass
    if res_interceptor:
        try:
            if not hasattr(self, 'backend'):
                print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
            self.response_interceptor = res_interceptor
        except AttributeError:
            pass
    # Sync set http proxy for Selenium-Wire backend.
    if http_proxy:
        self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy}
    # Set browser window size after startup; failures are ignored.
    if window_size:
        try:
            self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(','))
        except Exception:
            pass
    if window_site:
        try:
            self.set_window_position(*window_site.replace("\x20", '').split(','))
        except Exception:
            pass
    # Sets a sticky timeout to implicitly wait for an element to be found.
    self.implicitly_wait(10)
    # Set the amount of time to wait for a page load to complete.
    self.set_page_load_timeout(25)
    # Open the default page.
    if home:
        self.open(home)
@property
def title(self) -> str:
    """
    Return current title, or '' after exit or while an alert is open.
    """
    if self.exited:
        return ''
    self._update_tab_auto_switch()
    if self.current_alert is not None:
        return ''
    return super().title
@property
def current_url(self) -> str:
    """
    Return current url, or '' after exit or while an alert is open.
    """
    if self.exited:
        return ''
    self._update_tab_auto_switch()
    if self.current_alert is not None:
        return ''
    return super().current_url
2025-02-09 23:58:06 +08:00
@property
def current_alert(self):
    """
    Return current alert object, or None when there is none or after exit.
    """
    if self.exited:
        return None
    try:
        alert = Alert(self)
        # Reading the alert text fails when no alert is open.
        self.execute(Command.W3C_GET_ALERT_TEXT)
    except Exception:
        return None
    return alert
2025-02-09 19:56:41 +08:00
@property
def current_alert_text(self) -> str:
    """
    Return current alert content, or '' when unavailable or after exit.
    """
    if not self.exited:
        try:
            return Alert(self).text
        except Exception:
            pass
    return ''
2025-02-09 23:58:06 +08:00
@property
def window_inner_size(self) -> tuple:
    """
    Return the page window inner size as (width, height).
    """
    dimensions = self.execute_script('return [window.innerWidth, window.innerHeight];')
    width, height = dimensions[0], dimensions[1]
    return width, height
def open(self, url=None):
    """
    Open the URL as if typed into the address bar; the new page has no Referrer.

    Returns None without doing anything once the browser has exited.
    """
    if not self.exited:
        self._update_tab_auto_switch()
        self._update_cdp_command()
        return self.get(url)
    return None
def turn(self, url=None):
    """
    Navigate by assigning "window.location.href"; the new page has Referrer.

    Returns None without doing anything once the browser has exited.
    """
    if self.exited:
        return None
    # JSON encoding yields a safely quoted JavaScript string literal.
    quoted_url = json.dumps(url, indent=None, ensure_ascii=True)
    return self.execute_script('window.location.href=%s;' % quoted_url, None)
2025-02-09 19:56:41 +08:00
def wait(self, secs: int | float = 1):
"""
Will sleep waiting.
"""
number_int = int(secs)
number_float = secs - number_int
try:
for i in range(number_int):
time.sleep(1)
else:
time.sleep(number_float)
except (KeyboardInterrupt, InterruptedError):
print('Interrupted', file=sys.stderr)
self.quit()
def quit(self):
    """
    Exit the browser.

    Every tab is visited (the current one last), pending alerts are dismissed
    and the tab is closed before the driver itself is quit. All errors are
    ignored. Does nothing when the browser has already exited.
    """
    if self.exited:
        return None
    self.exited = True
    try:
        tab_handles = list(self.window_handles)
        tab_current = self.current_window_handle
        if tab_current in tab_handles:
            tab_handles.remove(tab_current)
        tab_handles.append(tab_current)
        for tab in tab_handles:
            try:
                self.switch_to.window(tab)
                for _ in range(5):
                    try:
                        # The current_alert property returns None once
                        # self.exited is set, so the alert is addressed
                        # directly; dismiss() raises when none is open.
                        Alert(self).dismiss()
                    except Exception:
                        break
                self.close()
            except NoSuchWindowException:
                pass
    except Exception:
        pass
    try:
        super().quit()
    except Exception:
        pass
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def quit_backend(self):
    """
    Exit the browser backend without closing the tabs one by one first.

    Errors raised by the underlying quit are ignored. Does nothing when the
    browser has already exited.
    """
    if not self.exited:
        self.exited = True
        try:
            super().quit()
        except Exception:
            pass
    return None
2025-02-09 19:56:41 +08:00
def find(self, path, wait_for=False, timeout: float = 5.0, freq: float = 0.5, delay: float = 0.0) -> WebElement:
    """
    Use XPath to find an element.

    With wait_for the lookup polls every freq seconds for up to timeout
    seconds. With delay the method sleeps that long and looks the element up
    again. The returned element is highlighted.
    """
    locator = (By.XPATH, path)
    if wait_for:
        element = self.webdriver_wait(timeout, freq).until(EC.presence_of_element_located(locator))
    else:
        element = self.find_element(*locator)
    if delay:
        self.wait(delay)
        element = self.find_element(*locator)
    self._element_highlight(element, '#F8BE5F')
    return element
def find_mult(self, path) -> list:
    """
    Use XPath to find elements; every match is highlighted.
    """
    matches = self.find_elements(By.XPATH, path)
    for match in matches:
        self._element_highlight(match, '#F8BE5F')
    return matches
def find_mult_random_choice(self, path) -> WebElement:
    """
    Use XPath to find elements then random choice one.

    The chosen element is highlighted. random.choice raises IndexError when
    nothing matches.
    """
    candidates = self.find_elements(By.XPATH, path)
    chosen = random.choice(candidates)
    self._element_highlight(chosen, '#F8BE5F')
    return chosen
def find_element_by(self, sentence):
    """
    Custom find element; sentence is a tuple or list unpacked into find_element.

    The found element is highlighted.
    """
    found = self.find_element(*sentence)
    self._element_highlight(found, '#F8BE5F')
    return found
def click(self, element):
    """
    Highlight the element, then click it through its own click().
    """
    highlight_color = '#FF0000'
    self._element_highlight(element, highlight_color)
    element.click()
2025-02-09 23:58:06 +08:00
def click_simulate(self, element):
    """
    Click the element with an action chain after showing the click effect.
    """
    self._element_click_effect(element)
    self.action_chains().reset_actions()
    chain = self.action_chains()
    chain.click(element).perform()
    self.wait(0.1)
def touch(self, x, y):
    """
    Move the pointer by the offset (x, y) and click there.
    """
    self.action_chains().reset_actions()
    chain = self.action_chains()
    chain.move_by_offset(x, y).click().perform()
    self.wait(0.1)
def input(self, element, content):
    """
    Highlight the element and send the content to it as key strokes.
    """
    self._element_highlight(element, '#00B6F1')
    self.action_chains().reset_actions()
    chain = self.action_chains()
    chain.send_keys_to_element(element, content).perform()
    self.wait(0.1)
def mouse(self, element):
    """
    Highlight the element and park the mouse on it.
    """
    self._element_highlight(element, '#49DC07')
    self.action_chains().reset_actions()
    chain = self.action_chains()
    chain.move_to_element(element).perform()
2025-02-09 23:58:06 +08:00
def scroll(self):
    """
    Scroll the page down by the height of the document element's client area.
    """
    self.action_chains().reset_actions()
    chain = self.action_chains()
    viewport_height = self.execute_script('return document.documentElement.clientHeight;')
    chain.scroll_by_amount(0, viewport_height).perform()
    self.wait(0.8)
def scroll_to(self, pos: int | str):
"""
Scroll to the specified location.
"""
if isinstance(pos, int) and pos > 0:
self.execute_script('window.scrollTo(0, arguments[0]);', pos)
elif pos == 0:
self.execute_script('window.scrollTo(0, 0);')
elif pos == 0 - 1:
self.execute_script('window.scrollTo(0, document.body.scrollHeight);')
else:
pass
self.wait(0.8)
def scroll_to_element(self, element):
    """
    Scroll until the specified element is reached, then wait 0.8 seconds.
    """
    self.action_chains().reset_actions()
    chain = self.action_chains()
    chain.scroll_to_element(element).perform()
    self.wait(0.8)
2025-02-17 01:21:28 +08:00
def fetch(self, url: str, options: dict, cover_options: dict = None):
    """
    Send an http request from inside the page using the JavaScript fetch API.

    Entries of cover_options override entries of options. Returns a
    FetchResult; when the in-page request fails the script reports null and
    the result keeps its default values.
    """
    script = '''
var _callback = arguments[arguments.length - 1];
var _url = arguments[0];
var _options = arguments[1];
var _data = {};
fetch(_url, _options)
.then((response) => {
_data.status = response.status;
let headers = {};
response.headers.forEach((value, name) => {
headers[name] = value;
});
_data.header = headers;
return response.text();
})
.then((result) => {
_data.result = result;
_callback(_data);
})
.catch((error) => {
console.error(error);
_callback(null);
});
'''
    overrides = cover_options or {}
    merged_options = {**options, **overrides}
    payload = self.execute_async_script(script, url, merged_options)
    return FetchResult(payload)
2025-02-09 23:58:06 +08:00
def frame_switch_to(self, element_of_frame):
    """
    Switch into the specified frame element, then wait 0.2 seconds.
    """
    switcher = self.switch_to
    switcher.frame(element_of_frame)
    self.wait(0.2)
def frame_switch_to_default(self):
    """
    Switch back to the default content, then wait 0.2 seconds.
    """
    switcher = self.switch_to
    switcher.default_content()
    self.wait(0.2)
2025-02-17 01:21:28 +08:00
def tab_create(self, url: str = None):
    """
    Create a new tab and, when a URL is given, open it there.
    """
    self.switch_to.new_window('tab')
    self._update_cdp_command()
    if url:
        self.open(url)
2025-02-17 01:21:28 +08:00
def tab_switch(self, tab: int | str = None):
2025-02-09 19:56:41 +08:00
"""
2025-02-09 23:58:06 +08:00
Switch the browser tab page.
2025-02-09 19:56:41 +08:00
"""
handles = self.window_handles
lengths = len(handles)
current = handles.index(self.current_window_handle)
if isinstance(tab, int):
handle = tab
elif tab == PositionTab.Prev:
handle = (current - 1)
elif tab == PositionTab.Next:
handle = (current + 1) % lengths
else:
2025-02-17 01:21:28 +08:00
handle = (current + 0)
2025-02-09 19:56:41 +08:00
self.switch_to.window(handles[handle])
self.wait(0.2)
2025-02-09 23:58:06 +08:00
self._update_cdp_command()
2025-02-09 19:56:41 +08:00
def tab_switch_prev(self):
    """
    Switch to the previous tab.
    """
    direction = PositionTab.Prev
    self.tab_switch(direction)
def tab_switch_next(self):
    """
    Switch to next tab.
    """
    direction = PositionTab.Next
    self.tab_switch(direction)
def tab_cancel(self):
    """
    Close the current browser tab page.

    After closing, focus moves to the tab on the left. When the closed tab
    was the first one and other tabs remain, focus moves to the tab that
    followed it, so the driver is not left attached to a closed window
    (which would make the next window query fail, e.g. during
    ``tab_cancel_all``). ``self.wait(0.2)`` is called afterwards.
    """
    handles = self.window_handles
    if not handles:
        return
    current = handles.index(self.current_window_handle)
    self.close()
    if current > 0:
        self.switch_to.window(handles[current - 1])
    elif len(handles) > 1:
        # The first tab was closed: pick its right-hand neighbour instead of
        # staying on the handle that no longer exists.
        self.switch_to.window(handles[1])
    self.wait(0.2)
def tab_cancel_all(self):
    """
    Close every browser tab page.

    ``tab_cancel`` is invoked once per window handle that existed when this
    method was entered.
    """
    remaining = len(self.window_handles)
    while remaining > 0:
        self.tab_cancel()
        remaining -= 1
2025-02-09 23:58:06 +08:00
def force_display_element(self, element):
    """
    Make a hidden element visible and interactive.

    Runs a script in the page that sets the element's inline ``display`` to
    ``inline-block`` and ``visibility`` to ``visible``, and removes its
    ``hidden`` attribute.
    """
    # ``hidden`` is a boolean attribute: its mere presence hides the element,
    # so it has to be removed rather than set to the string "false".
    self.execute_script(
        'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.removeAttribute("hidden");', element
    )
2025-02-09 19:56:41 +08:00
2025-02-09 23:58:06 +08:00
def screenshot(self) -> bytes:
    """
    Capture the current page and return the PNG data as bytes.
    """
    png_data = self.get_screenshot_as_png()
    return png_data
2025-02-09 19:56:41 +08:00
2025-02-09 23:58:06 +08:00
def action_chains(self) -> ActionChains:
    """
    Build a new ``ActionChains`` object bound to this driver.
    """
    chain = ActionChains(self)
    return chain
2025-02-09 19:56:41 +08:00
2025-02-09 23:58:06 +08:00
def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None):
    """
    Build a ``WebDriverWait`` object bound to this driver.

    ``timeout``, ``poll_frequency`` and ``ignored_exceptions`` are passed
    through unchanged.
    """
    wait_options = {
        'driver': self,
        'timeout': timeout,
        'poll_frequency': poll_frequency,
        'ignored_exceptions': ignored_exceptions,
    }
    return WebDriverWait(**wait_options)
2025-02-09 23:58:06 +08:00
def _element_highlight(self, element=None, color='#ff0000', dura=2500):
    """
    Temporarily highlight an element in the page.

    The element gets a 1px border in ``color`` and a translucent background
    of the same colour; the injected script restores the previous inline
    border/background after ``dura`` (passed as the ``setTimeout`` delay).
    Returns ``False`` when ``element`` is falsy, otherwise ``None``.
    """
    if not element:
        return False
    rgb = ColorUtils.hex2rgb(color)
    script = '''
        let e=arguments[0];
        try{
        let o=[e.style.background||null,e.style.border||null];
        e.style.border="1px solid %s";e.style.background="rgba(%s,%s,%s,0.2)";
        if(!e.prominent){
        e.prominent=true;
        setTimeout(function(args){try{args[0].prominent=null;args[0].style.background=args[1][0];args[0].style.border=args[1][1]}catch(e){}},%s,[e,o]);
        }
        }catch(e){}
        ''' % (color, rgb[0], rgb[1], rgb[2], dura)
    self.execute_script(script, element)
2025-02-09 23:58:06 +08:00
def _element_click_effect(self, element=None, x: int = 0, y: int = 0):
    """
    Draw a short-lived click marker in the page.

    The marker is a circular ``div`` centred on ``element`` when one is
    given, otherwise at the page coordinates ``x``/``y``. The injected
    script grows the circle, shrinks it again, fades it out and removes it.
    """
    initial_size = 0  # starting width/height of the marker, in px
    size_limit = 30   # size above which the animation starts shrinking
    script = '''
        let e=arguments[0];
        let r;
        let x;
        let y;
        if(e!==null){
        r=e.getBoundingClientRect();
        x=r.left+r.width/2+"px";
        y=r.top+r.height/2+"px";
        }
        else{
        x=arguments[1]+"px";
        y=arguments[2]+"px";
        }
        let c=document.createElement("div");
        c.style="width:%spx;height:%spx;border-radius:50%%;background-color:rgba(255,0,0,0.18);position:absolute;transform:translate(-50%%,-50%%);transition:opacity 0.5s;border:1px solid #ff3c3c;pointer-events:none";
        c.style.zIndex=9999;
        c.style.left=x;c.style.top=y;
        document.body.appendChild(c);
        setTimeout(function(){c.style.opacity=0;setTimeout(function(){document.body.removeChild(c)},999)},200);
        let w=%s;
        let h=%s;
        let d=false;
        let i=setInterval(function(){
        if((w>%s||h>%s)||d){
        d=true;
        w-=2;
        h-=2;
        }
        else{
        w+=5;
        h+=5;
        }
        c.style.width=w+"px";c.style.height=h+"px";
        if((w<=12||h<=12)&&d){clearInterval(i)}
        },20);
        ''' % (initial_size, initial_size, initial_size, initial_size, size_limit, size_limit)
    self.execute_script(script, element, x, y)
2025-02-09 23:58:06 +08:00
def _update_cdp_command(self):
    """
    Replay every entry of ``self.cdplist`` through ``execute_cdp_cmd``.

    Each entry is unpacked into the positional arguments of the call.
    """
    for cdp_call in self.cdplist:
        self.execute_cdp_cmd(*cdp_call)
2025-02-09 23:58:06 +08:00
def _update_tab_auto_switch(self):
    """
    Re-attach the driver to the first window when the current one is unusable.

    Nothing happens while ``self.current_alert`` is set. Otherwise the page
    title is requested as a probe; if that raises and window handles exist,
    the driver switches to the first handle.
    """
    if self.current_alert is not None:
        return
    try:
        self.execute(Command.GET_TITLE)
    except Exception:
        if len(self.window_handles) > 0:
            self.switch_to.window(self.window_handles[0])
2025-02-11 01:01:14 +08:00
class BrowserPluginRunningParam(dict):
    """
    Parameter bundle handed to a plugin's ``running`` function.

    Values supplied at construction are readable by key or by attribute.
    Item and attribute assignment are silently ignored. A missing key reads
    as ``None``; a missing attribute reads as ``no_method``, a callable that
    accepts anything and returns ``None``.
    """

    @classmethod
    def no_method(cls, *args, **kwargs):
        """Accept any arguments and do nothing."""
        return None

    def __setitem__(self, key, value):
        # Assignment by key is deliberately a no-op.
        return None

    def __setattr__(self, key, value):
        # Assignment by attribute is deliberately a no-op.
        return None

    def __getitem__(self, item):
        if item in self:
            return dict.__getitem__(self, item)
        return None

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails.
        if item in self:
            return dict.__getitem__(self, item)
        return self.no_method
2025-02-11 01:01:14 +08:00
2025-02-09 19:56:41 +08:00
class BrowserPluginParent:
id = None
name = None
requirements = []
def __init__(self):
self.app_id = None
2025-02-11 01:01:14 +08:00
self.thread = None
2025-03-01 01:05:40 +08:00
self.result_color = None
2025-02-25 00:09:58 +08:00
self.result = None
2025-03-01 01:05:40 +08:00
self.exception = None
2025-02-11 01:01:14 +08:00
self.user_id = None
self.instant_message = None
2025-03-01 01:05:40 +08:00
self.batch_mode = None
self._rs = 0
self._re = 0
@property
def running_time(self):
return (int(time.time()) - self._rs) if self._rs > self._re else (self._re - self._rs)
2025-02-09 19:56:41 +08:00
def _run_task(self, *args, **kwargs):
try:
2025-03-01 01:05:40 +08:00
self._rs = int(time.time())
res = self.running(*args, **kwargs)
if isinstance(res, (tuple, list)):
self.result_color = res[0]
self.result = res[1]
2025-02-09 19:56:41 +08:00
except Exception as e:
2025-03-01 01:05:40 +08:00
self.exception = e
2025-02-11 01:01:14 +08:00
self.message_except(e)
2025-03-01 01:05:40 +08:00
finally:
self._re = int(time.time())
2025-02-09 19:56:41 +08:00
2025-02-11 01:01:14 +08:00
def message(self, message=None):
2025-02-17 15:34:47 +08:00
print('[%s]: %s' % (str(self.app_id), message), file=sys.stderr)
2025-02-15 22:54:21 +08:00
if len(str(message)) <= 4096:
notification_send(app_id=self.app_id, title=self.name, message='%s' % (message,))
2025-03-01 01:05:40 +08:00
if (self.batch_mode is True) == 1:
return None
2025-02-15 22:54:21 +08:00
self.instant_message and self.instant_message(json.dumps({
'time': int(time.time()), 'type': 'message', 'data': {'user_id': str(self.user_id), 'content': str(message), 'title': str(self.app_id)}
}))
2025-02-09 19:56:41 +08:00
2025-02-11 01:01:14 +08:00
def message_except(self, exception_info):
2025-02-09 19:56:41 +08:00
notification_send(app_id=self.app_id, title='%s ' % (self.name,), message='%s' % (exception_info,))
2025-02-11 01:01:14 +08:00
def logging(self, logging=None):
2025-02-17 15:34:47 +08:00
print('[%s]: %s' % (str(self.app_id), logging), file=sys.stderr)
2025-02-15 22:54:21 +08:00
if len(str(logging)) <= 1024:
self.instant_message and self.instant_message(json.dumps({
'time': int(time.time()), 'type': 'logging', 'data': {'user_id': str(self.user_id), 'content': str(logging)}
}))
2025-02-11 01:01:14 +08:00
def run(self, app_id, user_id, driver, requirements, instant_message) -> threading.Thread:
self.app_id, self.user_id, self.instant_message = app_id, user_id, instant_message
params = BrowserPluginRunningParam({'driver': driver, 'message': self.message, 'logging': self.logging, 'requirements': requirements})
thread = threading.Thread(target=self._run_task, args=(params,))
2025-02-09 19:56:41 +08:00
self.thread = thread
2025-03-01 01:05:40 +08:00
thread.daemon = True
2025-02-09 19:56:41 +08:00
thread.start()
return thread
def interrupt(self):
try:
self.thread and self.thread.is_alive() and ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.thread.ident), ctypes.py_object(SystemExit))
except Exception:
pass
finally:
self.thread = None
def state(self):
2025-03-01 01:05:40 +08:00
return self.thread.is_alive() if self.thread else False
2025-02-09 19:56:41 +08:00
@staticmethod
2025-02-11 01:01:14 +08:00
def running(param: BrowserPluginRunningParam):
param.driver.wait(5)
2025-02-09 19:56:41 +08:00
class BrowserPluginFileTest(BrowserPluginParent):
    """Sample plugin declaring one ``file`` requirement; it reports the received requirements via ``message``."""
    name = '单个文件上传测试'
    requirements = [{'type': 'file', 'html': '上传文件'}]

    @staticmethod
    def running(param):
        received = param.requirements
        param.message(received)
        return ('#FFFFFF', '已完成')
2025-02-09 19:56:41 +08:00
class BrowserPluginMultFileTest(BrowserPluginParent):
    """Sample plugin declaring one ``file-mult`` requirement; it reports the received requirements via ``message``."""
    name = '多个文件上传测试'
    requirements = [{'type': 'file-mult', 'html': '上传文件'}]

    @staticmethod
    def running(param):
        received = param.requirements
        param.message(received)
        return ('#FFFFFF', '已完成')
2025-02-09 19:56:41 +08:00
class BrowserPluginTextTest(BrowserPluginParent):
    """Sample plugin declaring one ``text`` requirement; it reports the received requirements via ``message``."""
    name = '单行文本输入测试'
    requirements = [{'type': 'text', 'html': '输入文本'}]

    @staticmethod
    def running(param):
        received = param.requirements
        param.message(received)
        return ('#FFFFFF', '已完成')
2025-02-09 19:56:41 +08:00
class BrowserPluginMultTextTest(BrowserPluginParent):
    """Sample plugin declaring one ``text-mult`` requirement; it reports the received requirements via ``message``."""
    name = '多行文本输入测试'
    requirements = [{'type': 'text-mult', 'html': '输入文本'}]

    @staticmethod
    def running(param):
        received = param.requirements
        param.message(received)
        return ('#FFFFFF', '已完成')
2025-02-09 19:56:41 +08:00
2025-02-11 22:45:42 +08:00
class BrowserPluginLoggingTest(BrowserPluginParent):
    """Sample plugin declaring one ``text-mult`` requirement; it passes the first received value to ``logging``."""
    name = '在线日志输出测试'
    requirements = [{'type': 'text-mult', 'html': '输入内容'}]

    @staticmethod
    def running(param):
        first_value = param.requirements[0]
        param.logging(first_value)
        return ('#FFFFFF', '已完成')
2025-02-09 19:56:41 +08:00
2025-02-11 22:45:42 +08:00
class BrowserPluginMessageTest(BrowserPluginParent):
    """Sample plugin declaring one ``text-mult`` requirement; it passes the first received value to ``message``."""
    name = '在线消息弹窗测试'
    requirements = [{'type': 'text-mult', 'html': '输入内容'}]

    @staticmethod
    def running(param):
        first_value = param.requirements[0]
        param.message(first_value)
        return ('#FFFFFF', '已完成')
2025-02-09 19:56:41 +08:00
2025-02-11 22:45:42 +08:00
class BrowserPluginFileCommandDebug(BrowserPluginParent):
    """
    Debug plugin that executes an uploaded Python script.

    Declares one ``file`` requirement. The script is read from the ``path``
    entry of the first uploaded file and executed with ``exec`` inside
    ``running``, so it can see the local name ``param``.
    """
    name = '上传脚本进行调试'
    requirements = [
        {
            'type': 'file',
            'html': '调试'
        }
    ]

    @staticmethod
    def running(param):
        file = param.requirements[0][0]['path']
        # Read under a context manager so the handle is closed before the
        # script runs (the bare open(...).read() never closed it).
        with open(file, mode='r', encoding='utf-8') as script_file:
            source = script_file.read()
        # SECURITY NOTE(review): this executes arbitrary uploaded code with
        # the application's privileges. Intended for debugging only; confirm
        # that only trusted users can reach this plugin.
        exec(source)
        return '#FFFFFF', '已完成'
2025-02-11 22:45:42 +08:00
class BrowserPluginTextCommandDebug(BrowserPluginParent):
    """
    Debug plugin that executes Python source typed by the user.

    Declares one ``text-mult`` requirement; the first received value, when
    non-empty, is run with ``exec`` inside ``running``.
    """
    name = '输入命令进行调试'
    requirements = [{'type': 'text-mult', 'html': '调试'}]

    @staticmethod
    def running(param):
        code = param.requirements[0]
        if code:
            # NOTE(review): executes user-supplied code as-is.
            exec(code)
        return ('#FFFFFF', '已完成')
2025-02-09 19:56:41 +08:00
class BrowserManagerDataStorage(dict):
    """
    Dictionary backed by a JSON file.

    The file is read once at construction; ``save`` writes the current
    content back. Reading a missing key yields ``None`` instead of raising.
    """

    def __init__(self, file: str):
        """Load ``file`` (stored as an absolute path); start empty if it is missing or not valid JSON."""
        self._data_file = os.path.abspath(file)
        super().__init__(self._get_json_data(self._data_file, {}))

    @staticmethod
    def _get_json_data(file: str, data=None):
        """Return the parsed JSON content of ``file``, or ``data`` when the file is absent or malformed."""
        if not os.path.exists(file):
            return data
        try:
            # Context manager closes the handle even when parsing fails.
            with open(file=file, mode='r', encoding='utf-8') as f:
                return json.loads(f.read())
        except json.decoder.JSONDecodeError:
            return data

    @staticmethod
    def _put_json_data(file: str, data=None):
        """Write ``data`` to ``file`` as indented ASCII JSON; return the number of characters written."""
        with open(file=file, mode='w', encoding='utf-8') as f:
            # flock / LOCK_EX / LOCK_UN are defined elsewhere in this file
            # (presumably a file-locking helper; not visible here).
            flock(f, LOCK_EX)
            try:
                res = f.write(json.dumps(data, indent=4, ensure_ascii=True))
            finally:
                # Release the lock even if serialisation or the write fails.
                flock(f, LOCK_UN)
        return res

    def __getitem__(self, item):
        if item in self:
            return super().__getitem__(item)
        return None

    def __setitem__(self, key, value):
        super().__setitem__(key, value)

    def save(self):
        """Persist the current content to the backing file."""
        self._put_json_data(self._data_file, self)
class BrowserManagerUserRunning:
    """
    Runtime state of one browser user: identity, driver, plugin and the
    last observed page details.

    Passing an existing instance as ``running`` makes the new object share
    that instance's ``__dict__``, so both refer to the same state.
    """

    def __init__(
            self,
            running=None,
            user_id: str = None,
            user_name: str = None,
            user_data_dir: str = None,
            remote_debugging_port: int = None
    ):
        if isinstance(running, type(self)):
            # Adopt the other object's attribute dictionary instead of copying it.
            self.__dict__ = running.__dict__
            return
        self.user_id = user_id
        self.user_name = user_name
        self.user_data_dir = user_data_dir
        self.remote_debugging_port = remote_debugging_port
        self.active = 1
        self.update_status_last_time = 0
        self.is_running_property = False
        self.driver = None
        self.plugin = None
        self.url = None
        self.title = None
        self.alert = None
        self.window_handles = None
        self.update_status_details()

    @property
    def is_running(self):
        """Refresh the running flag from the lock file, then return it."""
        self.update_status_running()
        return self.is_running_property

    @is_running.setter
    def is_running(self, value):
        self.is_running_property = value

    @property
    def is_user_running(self):
        """
        Probe ``<user_data_dir>/lockfile``: a ``PermissionError`` on open is
        reported as running; a readable or missing file as not running.
        """
        lock_path = os.path.join(self.user_data_dir, 'lockfile')
        try:
            handle = open(lock_path, mode='r')
        except FileNotFoundError:
            return False
        except PermissionError:
            return True
        handle.close()
        return False

    @func_set_timeout(0.05)
    def _get_window_handles(self):
        handles = self.driver.window_handles
        return list(handles)

    @func_set_timeout(0.05)
    def _get_current_alert_text(self):
        text = self.driver.current_alert_text
        return str(text)

    @func_set_timeout(0.05)
    def _get_title(self):
        title = self.driver.title
        return str(title)

    @func_set_timeout(0.05)
    def _get_current_url(self):
        url = self.driver.current_url
        return str(url)

    def update_status_running(self):
        """Fire the stopped or running trigger according to the lock-file probe."""
        if self.is_user_running is False:
            self.chrome_stopped_trigger()
        else:
            self.chrome_running_trigger()

    def update_status_details(self):
        """
        Update the running flag and, at a randomised interval of 3 to 12
        seconds, refresh window handles, alert text, title and URL from the
        driver. Failures and timeouts while reading are ignored.
        """
        if self.is_user_running is False:
            self.chrome_stopped_trigger()
            return
        self.chrome_running_trigger()
        elapsed = round(time.time(), 3) - self.update_status_last_time
        interval = round(random.uniform(3, 12), 3)
        if not (elapsed > interval and self.driver and self.active):
            return
        try:
            self.window_handles = self._get_window_handles()
            self.alert = self._get_current_alert_text()
            self.title = self._get_title()
            self.url = self._get_current_url()
        except FunctionTimedOut:
            pass
        except Exception:
            pass
        self.update_status_last_time = round(time.time(), 3)

    def chrome_running_trigger(self):
        """Mark the user as running unless already marked."""
        if self.is_running_property:
            return None
        self.is_running = True

    def chrome_stopped_trigger(self):
        """
        Mark the user as stopped (unless already so), clear the cached page
        details, and release plugin and driver on background threads.
        """
        if not self.is_running_property:
            return None
        self.is_running = False
        self.url = None
        self.title = None
        self.alert = None
        self.window_handles = None
        plugin = self.plugin
        if plugin:
            threading.Thread(target=plugin.interrupt).start()
            self.plugin = None
        driver = self.driver
        if driver:
            threading.Thread(target=driver.quit_backend).start()
            self.driver = None

    def status(self):
        """Kick off a background detail refresh and return a snapshot of the current state."""
        threading.Thread(target=self.update_status_details).start()
        plugin = self.plugin
        return {
            'user_id': self.user_id,
            'user_name': self.user_name,
            'user_data_dir': self.user_data_dir,
            'remote_debugging_port': self.remote_debugging_port,
            'is_running': self.is_running,
            'url': self.url,
            'title': self.title,
            'alert': self.alert,
            'window_handles': self.window_handles,
            'plugin_running': self.plugin.state() if self.plugin else False,
            'plugin_running_time': self.plugin.running_time if self.plugin else None,
            'plugin_execution_result': self.plugin.result if self.plugin else None,
            'plugin_execution_result_color': self.plugin.result_color if self.plugin else None
        }

    def app_id(self):
        """Return ``'<user_id>|<user_name>'``."""
        return '%s|%s' % (self.user_id, self.user_name)

    def set_driver(self, driver: Browser = None):
        self.driver = driver

    def set_plugin(self, plugin: BrowserPluginParent = None):
        self.plugin = plugin

    def set_user_name(self, user_name: str):
        self.user_name = user_name
2025-03-01 01:05:40 +08:00
class BrowserManagerBusy(Exception):
    """Raised when an operation is requested for a user that already has one in progress."""

    def __init__(self, message):
        super().__init__(message)
        self.message = message
class BrowserPluginBatchExecution:
    """
    Tracks one batch run of a plugin across several users.

    ``set_plugin_for_monitor`` records the batch and starts a thread that
    polls the plugin instances until none is running, then stores one
    ``[app_id, running_time, result]`` row per plugin in ``results``.
    """

    def __init__(self, poll_interval: float = 2.5):
        """
        :param poll_interval: seconds to sleep between checks of the plugin
            states (defaults to the previously hard-coded 2.5).
        """
        self.poll_interval = poll_interval
        self.plugin_id = None
        self.plugin_name = None
        self.plugin_time = None
        self.users = None
        self.plugins = None
        self.results = None
        self.running = None

    def set_plugin_for_monitor(self, users: list, plugins: list, plugin_id: str, plugin_name: str):
        """
        Register a batch and start monitoring it.

        :raises Exception: when a previous batch is still being monitored.
        """
        if self.running is True:
            raise Exception('Running.')
        self.plugin_id = plugin_id
        self.plugin_name = plugin_name
        self.plugin_time = int(time.time())
        self.users = users
        self.plugins = plugins
        if isinstance(plugins, list):
            # Mark the batch as running before the thread starts, so a second
            # call cannot slip past the guard above while the monitor thread
            # has not been scheduled yet.
            self.results = []
            self.running = True
        threading.Thread(target=self._monitor_until_all_completed).start()

    def _monitor_until_all_completed(self):
        """Poll until every plugin has stopped, then collect the results."""
        if not isinstance(self.plugins, list):
            return None
        self.results = []
        self.running = True
        try:
            while True:
                time.sleep(self.poll_interval)
                if all(not plugin.state() for plugin in self.plugins):
                    break
            for plugin in self.plugins:
                self.results.append([plugin.app_id, plugin.running_time, plugin.result])
        finally:
            # Always clear the flag; otherwise an error while polling would
            # block every later batch with 'Running.'.
            self.running = False
2025-02-09 19:56:41 +08:00
class BrowserManager:
2025-02-22 17:53:36 +08:00
def __init__(self, runner, driver: str, binary: str, plugin_result_dir: str, manager_data_file: str, browser_data_home: str, browser_init_home: str, use_selenium_wire: int):
    """
    Set up the manager: validate the executables, load the persisted user
    table, make sure the working directories exist and build the runtime
    state for every stored user.

    :raises FileNotFoundError: when ``driver`` or ``binary`` does not exist.
    """
    self.runner = runner
    for executable, problem in (
            (driver, 'The driver executable file does not exist.'),
            (binary, 'The binary executable file does not exist.'),
    ):
        if not os.path.exists(executable):
            raise FileNotFoundError(problem)
    self.driver = driver
    self.binary = binary
    self.plugin_result_dir = plugin_result_dir
    self.manager_data_file = manager_data_file
    self.browser_data_home = browser_data_home
    self.browser_init_home = browser_init_home
    self.use_selenium_wire = use_selenium_wire
    self.threading_lock = threading.RLock()
    self.debugging_port_range = range(60000, 60256)
    self.data_storage = BrowserManagerDataStorage(manager_data_file)
    if not self.data_storage:
        # Nothing persisted yet: start with an empty user table and save it.
        self.data_storage['browser_user'] = {}
        self.data_storage.save()
    for directory in (plugin_result_dir, browser_data_home):
        if not os.path.exists(directory):
            os.makedirs(directory)
    self.geometry_config = {}
    self.user_running = {}
    for user_id in self.data_storage['browser_user'].keys():
        self.user_running[user_id] = self._initialize_user_running(user_id)
    self.user_in_operation = []
    self.plugins_int = {}
    self.plugins_ext = {}
    self.plugin_batch_execution = BrowserPluginBatchExecution()
2025-02-09 19:56:41 +08:00
def _generate_remote_debugging_port(self):
    """
    Pick a random port from ``self.debugging_port_range`` that no stored
    user has been assigned yet.

    :raises IndexError: when every port of the range is already assigned.
    """
    taken = {value['remote_debugging_port'] for value in self.data_storage['browser_user'].values()}
    free_ports = [port for port in self.debugging_port_range if port not in taken]
    if not free_ports:
        # random.choice would raise a bare "empty sequence" IndexError here;
        # keep the exception type but say what actually ran out.
        raise IndexError('No free remote debugging port is available.')
    return random.choice(free_ports)
def _update_user_running(self):
    """
    Refresh the running flag of every tracked user; falsy entries are skipped.
    """
    for running in self.user_running.values():
        if running:
            running.update_status_running()
2025-02-09 19:56:41 +08:00
def _get_user_name(self, user_id: str):
    """
    Return the stored display name of ``user_id`` as a string (empty when unset).
    """
    record = self.data_storage['browser_user'][user_id]
    stored_name = record['user_name']
    return str(stored_name or '')
def _get_user_data_dir(self, user_id: str):
    """
    Return the data directory of ``user_id``: the stored directory name
    joined onto ``self.browser_data_home``.
    """
    record = self.data_storage['browser_user'][user_id]
    return os.path.join(self.browser_data_home, record['user_data_dir'])
def _get_user_remote_debugging_port(self, user_id: str):
    """
    Return the stored remote debugging port of ``user_id`` as an ``int``.
    """
    record = self.data_storage['browser_user'][user_id]
    return int(record['remote_debugging_port'])
def _get_user_app_id(self, user_id: str):
    """
    Return the label used for ``user_id``: the stored user name when it is
    set, otherwise the user ID itself.
    """
    stored_name = self.data_storage['browser_user'][user_id]['user_name']
    if stored_name:
        return stored_name
    return user_id
2025-02-09 19:56:41 +08:00
def _initialize_user_running(self, user_id: str):
    """
    Build the runtime-state object for ``user_id`` from the stored name,
    data directory and remote debugging port.
    """
    settings = {
        'user_id': user_id,
        'user_name': self._get_user_name(user_id),
        'user_data_dir': self._get_user_data_dir(user_id),
        'remote_debugging_port': self._get_user_remote_debugging_port(user_id),
    }
    return BrowserManagerUserRunning(**settings)
@property
def plugins(self) -> dict:
    """
    Return a new dict combining internal and external plugins; on an ID
    clash the external entry wins.
    """
    combined = dict(self.plugins_int)
    combined.update(self.plugins_ext)
    return combined
def handle_interrupt(self):
    """
    React to an interrupt: ask every online user's driver to quit its
    backend on a separate thread, then clear the plugin result directory.
    Errors for individual users are ignored.
    """
    print('Received interrupt. Ending all backend...', file=sys.stderr)
    for user_id in self.user_ids_online():
        try:
            driver = BrowserManagerUserRunning(self.user_running[user_id]).driver
            if driver:
                threading.Thread(target=driver.quit_backend).start()
        except Exception:
            pass
    self.clear_dirs(self.plugin_result_dir)
2025-02-09 19:56:41 +08:00
2025-02-10 18:25:32 +08:00
@staticmethod
def req_interceptor(req: SeleniumWireRequest):
    """
    Request interceptor hook; currently leaves every request untouched.
    """
    return None
@staticmethod
def res_interceptor(req: SeleniumWireRequest, res: SeleniumWireResponse):
    """
    Response interceptor hook: print method, status code and URL of each
    response to stderr. Any error while formatting is printed instead.
    """
    try:
        method_column = str(req.method).ljust(7)
        line = '[%s] %s %s %s' % (method_column, 'HTTP', res.status_code, req.url)
        print(line, file=sys.stderr)
    except Exception as e:
        print(e, file=sys.stderr)
def run_browser(self, user_data_dir: str, remote_debugging_port: int, proxy_server: str = None):
    """
    Launch the browser binary as a detached child process.

    :param user_data_dir: profile directory, passed as ``--user-data-dir`` when truthy.
    :param remote_debugging_port: passed as ``--remote-debugging-port`` when truthy.
    :param proxy_server: passed as ``--proxy-server`` when truthy.
    :return: the ``subprocess.Popen`` object of the started process.
    """
    command = [
        self.binary,
        '--disable-background-networking',
        '--disable-desktop-notifications',
        '--disable-component-update',
        '--no-default-browser-check',
        '--no-first-run',
        '--hide-crash-restore-bubble'
    ]
    if user_data_dir:
        command.append('--user-data-dir=%s' % (user_data_dir,))
    if remote_debugging_port:
        command.append('--remote-debugging-port=%s' % (remote_debugging_port,))
    if proxy_server:
        command.append('--proxy-server=%s' % (proxy_server,))
    # Run the argument list directly. With shell=True a list is only safe on
    # Windows; on POSIX every item after the first is handed to the shell
    # itself, so the browser would start without any of these flags. Skipping
    # the shell also keeps paths with special characters from being
    # interpreted by it.
    return subprocess.Popen(command, shell=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
2025-02-09 19:56:41 +08:00
2025-02-22 17:53:36 +08:00
@staticmethod
def clear_dirs(dirs):
if (os.path.exists(dirs)) == 1:
for filename in os.listdir(dirs):
filepath = os.path.join(dirs, filename)
try:
os.path.isfile(filepath) and os.unlink(filepath)
except Exception:
pass
def restore_last_status(self):
    """
    Re-attach to browsers that are already running.

    For every tracked user whose browser is detected as running but has no
    driver object yet, ``user_run`` is started on a new thread.
    """
    for user_id, running in self.user_running.items():
        if not running:
            continue
        run = BrowserManagerUserRunning(running)
        run.update_status_running()
        if not (run.is_running and run.driver is None):
            continue
        threading.Thread(target=self.user_run, args=(user_id,)).start()
        print('Restoring user control: %s' % (user_id,), file=sys.stderr)
2025-02-09 19:56:41 +08:00
def load_plugins(self, plugins, is_external=0):
    """
    Replace the internal or external plugin registry with ``plugins``.

    Each class gets an ID of the form ``<prefix>_<NN>_<md5 of class name>``
    (prefix ``00`` for internal, ``01`` for external; ``NN`` is the 1-based
    position), stored on the class as ``id`` and used as the registry key.
    ``None`` leaves the registries untouched.
    """
    if plugins is None:
        return None
    if is_external:
        registry, prefix = self.plugins_ext, '01'
    else:
        registry, prefix = self.plugins_int, '00'
    registry.clear()
    for position in range(1, len(plugins) + 1):
        plugin_class = plugins[position - 1]
        name_digest = hashlib.md5(str(plugin_class.__name__).encode(encoding='utf-8')).hexdigest()
        plugin_id = '%s_%s_%s' % (prefix, '{:02d}'.format(position), name_digest)
        plugin_class.id = plugin_id
        registry[plugin_id] = plugin_class
def load_plugins_from_external_module(self):
    """
    Discover and register external plugins.

    Looks for ``Plugin*.py`` modules in the ``Packages`` directory next to
    this file and in ``<runner.app_data>/Packages``. Every class in such a
    module whose name starts with ``BrowserPlugin`` is wrapped in a new
    class that also derives from ``BrowserPluginParent``. The classes found
    in all existing directories are registered together with one
    ``load_plugins`` call. Errors are printed to stderr and do not stop the
    scan.
    """
    module_home_list = [os.path.join(os.path.dirname(__file__), 'Packages'), os.path.join(self.runner.app_data, 'Packages')]
    collected_classes = []
    found_home = False
    for module_home in module_home_list:
        if not os.path.exists(module_home):
            continue
        found_home = True
        try:
            module_list = glob.glob(os.path.join(module_home, 'Plugin*.py'))
            for module_path in module_list:
                try:
                    print('Load plugins from \"%s\"' % (module_path,), file=sys.stderr)
                    plugins_modules = import_module(module_path)
                    plugins_classes = [type(name, (cls, BrowserPluginParent), {}) for name, cls in plugins_modules.__dict__.items() if
                                       name.startswith('BrowserPlugin') and inspect.isclass(cls)]
                    collected_classes.extend(plugins_classes)
                except Exception as e:
                    print(e, file=sys.stderr)
        except Exception as e:
            print(e, file=sys.stderr)
    if not found_home:
        # No package directory exists: leave the current registry untouched.
        return
    try:
        # load_plugins clears the external registry on every call, so it has
        # to run once with the classes of all directories; calling it per
        # directory kept only the plugins of the last directory.
        self.load_plugins(collected_classes, is_external=1)
    except Exception as e:
        print(e, file=sys.stderr)
2025-02-09 19:56:41 +08:00
def update_geometry_config(self, screen_w: int, screen_h: int, window_w: int, window_h: int, window_x: int, window_y: int):
    """
    Recompute ``self.geometry_config`` from the screen size and the control
    window's geometry.

    The browser window is placed directly to the right of the control
    window with the same height and top edge. Its width is the remaining
    screen width, capped at 1.78 times the height.
    """
    free_width = screen_w - window_w
    if (free_width / window_h) > 1.78:
        browser_w = round(window_h * 1.78)
    else:
        browser_w = free_width
    config = {
        'screen_w': screen_w,
        'screen_h': screen_h,
        'control_window_w': window_w,
        'control_window_h': window_h,
        'control_window_x': window_x,
        'control_window_y': window_y,
    }
    config['browser_window_w'] = browser_w
    config['browser_window_h'] = window_h
    config['browser_window_x'] = window_x + window_w
    config['browser_window_y'] = window_y
    self.geometry_config = config
def user_operate_starting(self, user_id: str):
    """
    Mark ``user_id`` as having an operation in progress.

    :raises BrowserManagerBusy: when the user is already marked.
    """
    key = str(user_id)
    if key in self.user_in_operation:
        raise BrowserManagerBusy('Busy.')
    self.user_in_operation.append(key)
2025-02-09 19:56:41 +08:00
def user_operate_complete(self, user_id: str):
    """
    Clear the in-progress mark of ``user_id`` (every occurrence; a user that
    is not marked is ignored).
    """
    key = str(user_id)
    pending = self.user_in_operation
    for _ in range(pending.count(key)):
        pending.remove(key)
2025-02-09 19:56:41 +08:00
def is_user_data_occupied(self, user_id: str):
    """
    Tell whether the data directory of ``user_id`` is in use.

    Probes ``lockfile`` in that directory: a ``PermissionError`` on open is
    reported as occupied; a readable or missing file as free.
    """
    try:
        lock_path = os.path.join(self._get_user_data_dir(user_id), 'lockfile')
        probe = open(lock_path, mode='r')
        probe.close()
    except FileNotFoundError:
        return False
    except PermissionError:
        return True
    return False
2025-02-09 19:56:41 +08:00
def is_port_in_use(self, port: int):
    """
    Tell whether ``port`` should be treated as taken.

    Ports outside ``self.debugging_port_range`` always count as in use.
    Otherwise a TCP connection to 127.0.0.1 is attempted with a 0.05 s
    timeout; success means in use.
    """
    if port not in self.debugging_port_range:
        return True
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    probe.settimeout(0.05)
    try:
        status = probe.connect_ex(('127.0.0.1', port))
        return status == 0
    except socket.timeout:
        return False
    finally:
        probe.close()
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def plugin_run(self, user_id: str, plugin_id: str, requirements=None):
    """
    Start plugin ``plugin_id`` for an online user.

    :raises BrowserManagerBusy: another operation is in progress for the user.
    :raises FileExistsError: the user's browser is not running.
    :raises Exception: a plugin is already running, or the user has no driver.
    """
    try:
        self.user_operate_starting(user_id)
        if user_id not in self.user_ids_online():
            raise FileExistsError('User ID is not running.')
        running = BrowserManagerUserRunning(self.user_running[user_id])
        if running.plugin and running.plugin.state():
            raise Exception('The plugin is running, please stop it first.')
        driver = running.driver
        if driver is None:
            raise Exception('No driver object.')
        plugin = self.plugins[plugin_id]()
        broadcast = self.runner.web_server.websocket_connection_manager.send_broadcast_use_sync
        plugin.run(self._get_user_app_id(user_id), user_id, driver, requirements, broadcast)
        running.set_plugin(plugin)
    except BrowserManagerBusy:
        # The mark belongs to the operation already in progress; leave it.
        raise
    except Exception:
        self.user_operate_complete(user_id)
        raise
    else:
        self.user_operate_complete(user_id)
2025-02-09 19:56:41 +08:00
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def plugin_shutdown(self, user_id: str):
    """
    Interrupt the plugin currently running for an online user.

    :raises BrowserManagerBusy: another operation is in progress for the user.
    :raises FileExistsError: the user's browser is not running.
    :raises Exception: no plugin is running for the user.
    """
    try:
        self.user_operate_starting(user_id)
        if user_id not in self.user_ids_online():
            raise FileExistsError('User ID is not running.')
        running = BrowserManagerUserRunning(self.user_running[user_id])
        if not (running.plugin and running.plugin.state()):
            raise Exception('The plugin is not running.')
        running.plugin.interrupt()
    except BrowserManagerBusy:
        # The mark belongs to the operation already in progress; leave it.
        raise
    except Exception:
        self.user_operate_complete(user_id)
        raise
    else:
        self.user_operate_complete(user_id)
def plugin_all_users_run(self, plugin_id: str, requirements=None):
    """
    Start plugin ``plugin_id`` for every online user and hand the started
    instances to the batch monitor.

    Users for which the start fails are skipped silently. Started plugins
    are switched to batch mode.

    :raises Exception: a previous batch is still running.
    """
    if self.plugin_batch_execution.running is True:
        raise Exception('Still in batch execution.')
    started_users = []
    started_plugins = []
    for user_id in self.user_ids_online():
        try:
            self.plugin_run(user_id, plugin_id, requirements)
            started_users.append(user_id)
            plugin = BrowserManagerUserRunning(self.user_running[user_id]).plugin
            plugin.batch_mode = True
            started_plugins.append(plugin)
            time.sleep(0.1)
        except Exception:
            pass
    plugin_name = self.plugins[plugin_id].name
    self.plugin_batch_execution.set_plugin_for_monitor(started_users, started_plugins, plugin_id, plugin_name)
def plugin_all_users_shutdown(self):
    """
    Interrupt the plugin of every user recorded in the current batch.

    Best effort: failures for individual users, or the absence of a user
    list, are ignored.
    """
    try:
        batch_users = self.plugin_batch_execution.users
        for user_id in batch_users:
            try:
                self.plugin_shutdown(user_id)
            except Exception:
                continue
    except Exception:
        return None
def plugin_list(self):
    """
    Return ``{plugin_id: {'name': ..., 'requirements': ...}}`` for every
    registered plugin.
    """
    listing = {}
    for plugin_id, plugin_class in self.plugins.items():
        listing[plugin_id] = {'name': plugin_class.name, 'requirements': plugin_class.requirements}
    return listing
2025-02-09 19:56:41 +08:00
def user_ids(self):
    """
    Return the IDs of all stored users as a new list.
    """
    stored_users = self.data_storage['browser_user']
    return list(stored_users.keys())
def user_ids_online(self):
    """
    Refresh the running flags, then return the IDs of users whose browser
    is currently running.
    """
    self._update_user_running()
    online = []
    for user_id, running in self.user_running.items():
        if running and running.is_running:
            online.append(user_id)
    return online
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def user_all_details(self):
    """
    Return an overview: per-user status, the number of online users and the
    state of the current or last batch execution.
    """
    details = {}
    for user_id in self.user_ids():
        if user_id in self.user_running.keys():
            details[user_id] = self.user_running[user_id].status()
    online_count = len(self.user_ids_online())
    return {
        'user_online_count': online_count,
        'user_details': details,
        'plugin_batch_execution_id': self.plugin_batch_execution.plugin_id,
        'plugin_batch_execution_name': self.plugin_batch_execution.plugin_name,
        'plugin_batch_execution_time': self.plugin_batch_execution.plugin_time,
        'plugin_batch_execution_status': self.plugin_batch_execution.running,
        'plugin_batch_execution_result': self.plugin_batch_execution.results
    }
2025-02-09 19:56:41 +08:00
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def user_set_name(self, user_id: str, user_name: str):
    """
    Store a new display name for a user (leading/trailing whitespace removed).

    :raises BrowserManagerBusy: another operation is in progress for the user.
    :raises FileExistsError: the user does not exist.
    :raises ValueError: ``user_name`` is not a string or is longer than 24 characters.
    """
    try:
        self.user_operate_starting(user_id)
        if user_id not in self.user_ids():
            raise FileExistsError('User ID not exists.')
        if not isinstance(user_name, str) or len(user_name) > 24:
            raise ValueError('Illegal Username.')
        cleaned_name = user_name.strip()
        self.data_storage['browser_user'][user_id]['user_name'] = cleaned_name
        self.data_storage.save()
        self.user_running[user_id].set_user_name(cleaned_name)
    except BrowserManagerBusy:
        # The mark belongs to the operation already in progress; leave it.
        raise
    except Exception:
        self.user_operate_complete(user_id)
        raise
    else:
        self.user_operate_complete(user_id)
2025-02-09 19:56:41 +08:00
2025-02-22 17:53:36 +08:00
@func_set_timeout(0.55)
@calculate_execution_time
def user_focus_window(self, user_id: str):
    """
    Minimise a running user's browser window, then set it to the configured
    browser rectangle. Driver errors are ignored.

    :raises FileExistsError: the user does not exist or is not running.
    """
    if user_id not in self.user_ids():
        raise FileExistsError('User ID not exists.')
    if user_id not in self.user_ids_online():
        raise FileExistsError('User ID is not running.')
    try:
        self.user_running[user_id].driver.minimize_window()
        geometry = self.geometry_config
        self.user_running[user_id].driver.set_window_rect(
            geometry['browser_window_x'],
            geometry['browser_window_y'],
            geometry['browser_window_w'],
            geometry['browser_window_h']
        )
    except Exception:
        pass
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def user_add(self, user_id: str):
    """
    Register a new browser user and create its data directory.

    The user record (empty name, directory ``data_<user_id>``, a free
    debugging port) is saved first. The directory is then copied from
    ``self.browser_init_home`` when that exists, otherwise created empty.

    :raises BrowserManagerBusy: another operation is in progress for the user.
    :raises ValueError: ``user_id`` is not 1-16 characters of ``0-9A-Za-z_-``.
    :raises FileExistsError: the user already exists.
    """
    try:
        self.user_operate_starting(user_id)
        # fullmatch instead of match('^...$'): ``$`` also matches just before
        # a trailing newline, which let IDs such as 'abc\n' through and into
        # the directory name.
        if not isinstance(user_id, str) or not re.fullmatch(r'[0-9A-Za-z_-]{1,16}', user_id):
            raise ValueError('Invalid user ID format.')
        if user_id in self.user_ids():
            raise FileExistsError('User ID already exists.')
        basename = 'data_%s' % (user_id,)
        data_dir = os.path.join(self.browser_data_home, basename)
        self.data_storage['browser_user'][user_id] = {
            'user_name': '',
            'user_data_dir': basename,
            'remote_debugging_port': self._generate_remote_debugging_port()
        }
        self.data_storage.save()
        self.user_running[user_id] = self._initialize_user_running(user_id)
        if not os.path.exists(data_dir):
            if os.path.exists(self.browser_init_home):
                shutil.copytree(self.browser_init_home, data_dir)
            else:
                os.makedirs(data_dir)
        self.user_operate_complete(user_id)
    except BrowserManagerBusy:
        # The mark belongs to the operation already in progress; leave it.
        raise
    except Exception:
        self.user_operate_complete(user_id)
        raise
2025-02-09 19:56:41 +08:00
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def user_del(self, user_id: str):
    """
    Remove a stopped user: delete its data directory and its stored record.

    The directory is first renamed to ``<dir>_deleted`` and then removed.

    :raises BrowserManagerBusy: another operation is in progress for the user.
    :raises FileExistsError: the user does not exist or is still running.
    """
    try:
        self.user_operate_starting(user_id)
        if user_id not in self.user_ids():
            raise FileExistsError('User ID not exists.')
        if user_id in self.user_ids_online():
            raise FileExistsError('User ID is running, please stop it first.')
        data_dir = self._get_user_data_dir(user_id)
        trash_dir = '%s_deleted' % (data_dir,)
        if os.path.exists(data_dir):
            os.rename(data_dir, trash_dir)
        if os.path.exists(trash_dir):
            shutil.rmtree(trash_dir)
        self.data_storage['browser_user'].pop(user_id)
        self.data_storage.save()
        self.user_running.pop(user_id)
    except BrowserManagerBusy:
        # The mark belongs to the operation already in progress; leave it.
        raise
    except Exception:
        self.user_operate_complete(user_id)
        raise
    else:
        self.user_operate_complete(user_id)
2025-02-09 19:56:41 +08:00
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def user_run(self, user_id: str):
    """
    Start (or re-attach to) the browser of a user and store the driver.

    A browser process is launched only when the user's data directory is
    not already occupied; the driver is attached through the user's remote
    debugging port either way. The proxy/backend port is the debugging port
    minus 5000.

    :raises BrowserManagerBusy: another operation is in progress for the user.
    :raises FileExistsError: the user does not exist.
    """
    try:
        self.user_operate_starting(user_id)
        if user_id not in self.user_ids():
            raise FileExistsError('User ID not exists.')
        user_data_dir = self._get_user_data_dir(user_id)
        remote_debugging_port = self._get_user_remote_debugging_port(user_id)
        mitmproxy_port = remote_debugging_port - 5000
        running = BrowserManagerUserRunning(self.user_running[user_id])
        running.active = 1
        if not self.is_user_data_occupied(user_id):
            if self.use_selenium_wire:
                proxy_server = '127.0.0.1:%s' % (mitmproxy_port,)
            else:
                proxy_server = None
            self.run_browser(user_data_dir=user_data_dir, remote_debugging_port=remote_debugging_port, proxy_server=proxy_server)
        geometry = self.geometry_config
        if geometry:
            window_size = '%s,%s' % (geometry['browser_window_w'], geometry['browser_window_h'])
            window_site = '%s,%s' % (geometry['browser_window_x'], geometry['browser_window_y'])
        else:
            window_size = None
            window_site = None
        driver = Browser(
            driver=self.driver,
            binary=self.binary,
            driver_classes=self.use_selenium_wire,
            window_size=window_size,
            window_site=window_site,
            debugger_address='127.0.0.1:%s' % (remote_debugging_port,),
            backends_address='127.0.0.1:%s' % (mitmproxy_port,),
            req_interceptor=self.use_selenium_wire and self.req_interceptor,
            res_interceptor=self.use_selenium_wire and self.res_interceptor
        )
        running.set_driver(driver)
    except BrowserManagerBusy:
        # The mark belongs to the operation already in progress; leave it.
        raise
    except Exception:
        self.user_operate_complete(user_id)
        raise
    else:
        self.user_operate_complete(user_id)
2025-02-09 19:56:41 +08:00
2025-02-24 01:21:42 +08:00
@calculate_execution_time
def user_die(self, user_id: str):
    """Stop the browser of a running user.

    The user is marked inactive, a plugin whose ``state()`` is truthy is
    interrupted, and the stored driver is cleared.  When the debugging port
    is in use, ``driver.quit`` is started on a separate thread and this
    method polls every 0.35 s until ``self.is_user_data_occupied`` turns
    falsy; a ``KeyboardInterrupt`` raised during a sleep is ignored.

    :param user_id: identifier of the user to stop.
    :raises FileExistsError: if the user is not listed as online.
    :raises Exception: if no driver object was stored, or the debugging port
        is not in use.
    :raises BrowserManagerBusy: propagated unchanged; in that case
        ``user_operate_complete`` is not called.
    """
    try:
        self.user_operate_starting(user_id)
        if user_id not in self.user_ids_online():
            raise FileExistsError('User ID is not running.')
        debugging_port = self._get_user_remote_debugging_port(user_id)
        running = BrowserManagerUserRunning(self.user_running[user_id])
        running.active = 0
        browser = running.driver
        active_plugin = running.plugin
        if active_plugin and active_plugin.state():
            active_plugin.interrupt()
        running.set_driver(None)

        if browser is None:
            raise Exception('No driver object.')
        if self.is_port_in_use(debugging_port) == 1:
            threading.Thread(target=browser.quit).start()
            # Wait for the browser to let go of the profile directory.
            while self.is_user_data_occupied(user_id):
                try:
                    time.sleep(0.35)
                except KeyboardInterrupt:
                    pass
        else:
            raise Exception('The debug port is not in listening.')
        self.user_operate_complete(user_id)
    except BrowserManagerBusy:
        raise
    except Exception:
        # Mark the operation as finished before reporting the failure.
        self.user_operate_complete(user_id)
        raise
class ApplicationSetting(dict):
    """Dictionary of application settings persisted to a JSON file.

    The file is read once on construction.  Every item assignment rewrites
    the whole file.  Reading a missing key returns ``None`` instead of
    raising ``KeyError``.
    """

    def __init__(self, file: str):
        """Load settings from ``file``; start empty if it is missing or unusable."""
        self._data_file = os.path.abspath(file)
        loaded = self._get_json_data(self._data_file, {})
        # Valid JSON that is not an object (e.g. ``[]`` or ``null``) cannot
        # seed a dict; treat it like a missing file instead of crashing.
        super().__init__(loaded if isinstance(loaded, dict) else {})

    @staticmethod
    def _get_json_data(file: str, data=None):
        """Return the parsed JSON content of ``file``.

        ``data`` is returned when the file does not exist or cannot be
        decoded as UTF-8 JSON.
        """
        if not os.path.exists(file):
            return data
        try:
            # Context manager so the handle is closed even on a parse error.
            with open(file=file, mode='r', encoding='utf-8') as f:
                return json.loads(f.read())
        except (json.JSONDecodeError, UnicodeDecodeError):
            return data

    @staticmethod
    def _put_json_data(file: str, data=None):
        """Write ``data`` to ``file`` as indented JSON under an exclusive lock.

        :return: the value returned by the underlying ``write`` call.
        """
        # Serialise first: opening with mode 'w' truncates the file, so a
        # value that cannot be serialised must fail before that happens.
        payload = json.dumps(data, indent=4, ensure_ascii=True)
        with open(file=file, mode='w', encoding='utf-8') as f:
            flock(f, LOCK_EX)
            try:
                res = f.write(payload)
                # Push the content out before the lock is released.
                f.flush()
            finally:
                flock(f, LOCK_UN)
        return res

    def __getitem__(self, item):
        """Return the stored value, or ``None`` when ``item`` is absent."""
        if item in self:
            return super().__getitem__(item)
        return None

    def __setitem__(self, key, value):
        """Store ``value`` and immediately persist all settings."""
        super().__setitem__(key, value)
        self._put_json_data(self._data_file, self)
2025-02-09 19:56:41 +08:00
class MainWindow(QMainWindow):
    """Fixed-size main window that hosts the web panel and owns the tray icon."""

    def __init__(self, runner, app_name: str, app_version: str, scale_rate: float, web_listen_host: str, web_listen_port: int):
        """Build the window, the embedded web view and the tray menu, then show it.

        :param runner: object providing ``app_running_file``, ``app_data``,
            ``web_server`` and ``handle_interrupt``.
        :param scale_rate: factor applied to the window size and web zoom.
        :param web_listen_host: host the local web server listens on.
        :param web_listen_port: port the local web server listens on.
        """
        super().__init__()
        self.runner = runner
        # The marker file is removed by exit(); finding it here means the
        # previous session did not shut down through exit().
        try:
            if os.path.exists(self.runner.app_running_file):
                QMessageBox.information(self, 'Warning', 'The application was not exited normally last time. Please wait for recovery.')
                self.runner.web_server.browser_manager.restore_last_status()
        except Exception:
            pass
        finally:
            with open(self.runner.app_running_file, mode='w'):
                pass

        self.scale_rate = scale_rate
        self.web_listen_host = web_listen_host
        self.web_listen_port = web_listen_port
        self.setWindowTitle('%s %s' % (app_name, app_version))
        self.setAcceptDrops(True)
        self.setWindowFlags(self.windowFlags() | Qt.WindowMinMaxButtonsHint)
        screen_rect = QDesktopWidget().screenGeometry()
        self.screen_w = screen_rect.width()
        self.screen_h = screen_rect.height()
        self.window_w = round(455 * self.scale_rate)
        self.window_h = round(855 * self.scale_rate)
        self.window_x = 0
        self.window_y = 0
        self.setFixedSize(self.window_w, self.window_h)
        QNetworkProxyFactory.setUseSystemConfiguration(False)
        self.setting = ApplicationSetting(os.path.join(runner.app_data, 'application.json'))

        self.webview = WebEngineView()
        # Cache directory name is derived from this file's path.
        cache_key = ('%s_cache' % (__file__,)).encode('utf-8')
        self.webview_cache_dir = os.path.join(tempfile.gettempdir(), hashlib.md5(cache_key).hexdigest()[:32])
        web_settings = self.webview.settings()
        for attribute in (
            QWebEngineSettings.ScrollAnimatorEnabled,
            QWebEngineSettings.Accelerated2dCanvasEnabled,
            QWebEngineSettings.WebGLEnabled,
        ):
            web_settings.setAttribute(attribute, True)
        profile = QWebEngineProfile.defaultProfile()
        profile.setPersistentStoragePath(self.webview_cache_dir)
        profile.setCachePath(self.webview_cache_dir)
        self.webview.setZoomFactor(self.scale_rate or 1.0)
        self.setCentralWidget(self.webview)

        icon_path = os.path.join(os.path.dirname(__file__), 'favicon.ico')
        self.setWindowIcon(QIcon(icon_path))
        self.tray_icon = QSystemTrayIcon(QIcon(icon_path), self)
        self.tray_icon_update_timer = QTimer(self)
        self.tray_icon_update_timer.timeout.connect(self.on_tray_icon_update)
        self.tray_icon_update_timer.start(2250)
        self.tray_icon.activated.connect(self.on_tray_icon_activated)
        self.tray_menu = QMenu()
        menu_entries = (
            ('调试:全部打开必应搜索', self.on_debug_all_users_open_home),
            ('调试:全部打开新标签页', self.on_debug_all_users_open_none),
            ('调试:打印环境变量信息', self.on_debug_print_environment),
            ('调试:打印模块搜索路径', self.on_debug_print_module_path),
            ('调试:重新加载外部插件', self.on_debug_reload_external_plugins),
            ('配置首选文件', self.on_config_preferred_file),
            ('重载面板', self.webview.reload),
            ('退出', self.exit),
        )
        for label, handler in menu_entries:
            action = QAction(label, self)
            action.triggered.connect(handler)
            self.tray_menu.addAction(action)
        self.tray_icon.setContextMenu(self.tray_menu)
        self.tray_icon.show()
        # QShortcut(QKeySequence('F5'), self).activated.connect(self.on_debug_reload_external_plugins)

        self.init()

    def closeEvent(self, event):
        """Hide the window on close; exit the application if it is already hidden."""
        event.ignore()
        if self.isVisible():
            self.hide()
            return
        print('Terminating...', file=sys.stderr)
        self.exit()

    def init(self):
        """Show the window, report its geometry to the browser manager and load the panel."""
        self.show()
        self.window_position_reset()
        geometry = self.geometry()
        position = self.pos()
        self.runner.web_server.browser_manager.update_geometry_config(
            screen_w=round(self.screen_w / self.scale_rate),
            screen_h=round(self.screen_h / self.scale_rate),
            window_w=round(geometry.width() / self.scale_rate),
            window_h=round(geometry.height() / self.scale_rate) + 38,
            window_x=position.x(),
            window_y=position.y()
        )
        # A server bound to 0.0.0.0 is reached through the loopback address.
        if self.web_listen_host == '0.0.0.0':
            panel_host = '127.0.0.1'
        else:
            panel_host = self.web_listen_host
        self.webview.load(QUrl('http://%s:%s/index.html' % (panel_host, self.web_listen_port)))
        self.webview.reload()

    def exit(self):
        """Tear down the web view, notify the runner, drop the marker file and quit Qt."""
        self.webview.deleteLater()
        self.runner.handle_interrupt()
        try:
            os.remove(self.runner.app_running_file)
        except Exception:
            pass
        QApplication.quit()

    def window_show(self):
        """Restore and activate the window, then move it back to its home position."""
        self.showNormal()
        self.activateWindow()
        self.window_position_reset()

    def window_hide(self):
        """Hide the window."""
        self.hide()

    def window_toggle(self):
        """Bring the window to the front (currently identical to ``window_show``)."""
        self.window_show()

    def window_position_reset(self):
        """Place the window at its configured origin, offset by the title-bar height."""
        frame_extra = self.frameGeometry().height() - self.geometry().height()
        self.setGeometry(self.window_x, self.window_y + frame_extra, self.window_w, self.window_h)

    def on_config_preferred_file(self):
        """Let the user pick a file and store its absolute path as the preferred plugin file."""
        self.window_show()
        chosen, _ = QFileDialog.getOpenFileName(self, '选择文件', '/', 'All Files (*)')
        if not (chosen and os.path.exists(chosen)):
            return
        chosen = os.path.abspath(chosen)
        self.setting['plugin_preferred_file'] = chosen
        QMessageBox.information(self, '提示', 'The preferred file has changed to %s' % (chosen,))

    def on_tray_icon_update(self):
        """Refresh the tray tooltip with the online/total user counts."""
        manager = self.runner.web_server.browser_manager
        online_count = len(manager.user_ids_online())
        total_count = len(manager.user_ids())
        self.tray_icon.setToolTip('活跃用户:%s/%s' % (online_count, total_count))

    def on_tray_icon_activated(self, reason):
        """Bring the window up when the tray icon is triggered."""
        if reason == QSystemTrayIcon.Trigger:
            self.window_toggle()

    def on_debug_all_users_open_home(self):
        """Open Bing in every online user's browser, one thread per user."""
        manager = self.runner.web_server.browser_manager
        for user_id in manager.user_ids_online():
            driver = manager.user_running[user_id].driver
            if driver:
                threading.Thread(target=driver.open, args=('https://www.bing.com/',)).start()

    def on_debug_all_users_open_none(self):
        """Open the new-tab page in every online user's browser, one thread per user."""
        manager = self.runner.web_server.browser_manager
        for user_id in manager.user_ids_online():
            driver = manager.user_running[user_id].driver
            if driver:
                threading.Thread(target=driver.open, args=('chrome://new-tab-page',)).start()

    def on_debug_print_environment(self):
        """Show all environment variables in a message box."""
        self.showNormal()
        lines = ['%s=%s' % (key, value) for key, value in os.environ.items()]
        QMessageBox.information(self, '提示', '\n'.join(lines))

    def on_debug_print_module_path(self):
        """Show the entries of ``sys.path`` in a message box."""
        self.showNormal()
        lines = ['%s' % (value,) for value in sys.path]
        QMessageBox.information(self, '提示', '\n'.join(lines))

    def on_debug_reload_external_plugins(self):
        """Reload external plugins and refresh the panel."""
        self.runner.web_server.browser_manager.load_plugins_from_external_module()
        self.webview.reload()
class WebEngineViewUrlRequestInterceptor(QWebEngineUrlRequestInterceptor):
    """Request interceptor that keeps the web view away from ``file://`` URLs."""

    def interceptRequest(self, info):
        """Redirect any request whose URL starts with ``file://`` to an empty URL."""
        requested = info.requestUrl().toString()
        if requested.startswith('file://'):
            info.redirect(QUrl(''))


# Single shared instance, installed on the profile by WebEngineView.
webEngineViewUrlRequestInterceptor = WebEngineViewUrlRequestInterceptor()
class WebEngineView(QWebEngineView):
    """Web view used for the panel: fixed font, auto-accepted downloads, single window."""

    def __init__(self, parent=None):
        """Configure the view and hook the profile's download and request handling."""
        super().__init__(parent)
        # 0 is the numeric value of Qt.NoContextMenu.
        self.setContextMenuPolicy(0)
        self.setAcceptDrops(True)
        view_settings = self.settings()
        view_settings.setFontFamily(view_settings.StandardFont, 'Microsoft YaHei')
        profile = self.page().profile()
        profile.downloadRequested.connect(self.onDownloadRequested)
        profile.setUrlRequestInterceptor(webEngineViewUrlRequestInterceptor)

    @staticmethod
    def onDownloadRequested(download):
        """Accept every requested download."""
        download.accept()

    def createWindow(self, wt):
        """Return this view so that requested new windows open in place."""
        return self
class WebServerAPIJSONData(BaseModel):
    """Request body model shared by the JSON POST endpoints."""
    # Free-form payload; the handlers index it by key (e.g. 'user_id',
    # 'plugin_id').  Defaults to None when the body carries no "data" field.
    data: dict = None
2025-02-11 01:01:14 +08:00
class WebSocketConnectionManager:
    """Keep track of accepted WebSocket connections and broadcast text to them."""

    def __init__(self):
        # Connections that have been accepted and not yet removed.
        self.active_connections = []

    async def connect(self, websocket: WebSocket, disconnect=None):
        """Register ``websocket``, or forget it when ``disconnect`` is truthy.

        Registering accepts the connection and sends a greeting containing
        the client's host and port.  Forgetting a connection that is not
        registered is a no-op.
        """
        if disconnect:
            # A connection can be dropped twice: once by send_broadcast when a
            # send fails, and once by the caller that notices the disconnect.
            # list.remove raises ValueError for a missing item, so guard it.
            if websocket in self.active_connections:
                self.active_connections.remove(websocket)
        else:
            await websocket.accept()
            await websocket.send_text('Hello, %s:%s' % (websocket.client.host, websocket.client.port))
            self.active_connections.append(websocket)

    async def send_broadcast(self, message: str):
        """Send ``str(message)`` to every registered connection.

        Connections whose send raises ``WebSocketDisconnect`` are removed
        after the loop.
        """
        dead_connections = []
        # Iterate over a snapshot: the list may change while a send is awaited.
        for conn in list(self.active_connections):
            try:
                await conn.send_text(str(message))
            except WebSocketDisconnect:
                dead_connections.append(conn)
        for conn in dead_connections:
            await self.connect(conn, 1)

    def send_broadcast_use_sync(self, text):
        """Run ``send_broadcast`` to completion from synchronous code via ``asyncio.run``."""
        asyncio.run(self.send_broadcast(text))
2025-02-09 19:56:41 +08:00
class WebServer:
    """FastAPI application exposing the browser manager over HTTP and WebSocket.

    All routes are registered in ``__init__``.  The JSON endpoints answer
    with ``message(code, info, data)`` dictionaries where code 0 means
    success and code 1 carries the error text in ``info``.
    """

    def __init__(self, runner, root: str, data: str, default_plugins=None):
        """Create the app, the browser manager and all routes.

        :param runner: owner object; its ``window`` attribute is read by the
            ``/plugin_preferred_file`` route.
        :param root: application directory containing ``www`` and ``initialize``.
        :param data: data directory containing ``Chrome``, ``users``,
            ``results``, ``upload`` and ``manager.json``.
        :param default_plugins: plugins passed to ``load_plugins``.
        """
        self.runner = runner
        self.app = FastAPI()
        self.www = os.path.join(root, 'www')
        self.upload_dir = os.path.join(data, 'upload')
        self.browser_manager = BrowserManager(
            runner=runner,
            driver=os.path.join(data, 'Chrome/chromedriver.exe'),
            binary=os.path.join(data, 'Chrome/chrome.exe'),
            plugin_result_dir=os.path.join(data, 'results'),
            manager_data_file=os.path.join(data, 'manager.json'),
            browser_data_home=os.path.join(data, 'users'),
            browser_init_home=os.path.join(root, 'initialize'),
            use_selenium_wire=0
        )
        self.browser_manager.load_plugins(default_plugins, is_external=0)
        self.browser_manager.load_plugins_from_external_module()
        self.websocket_connection_manager = WebSocketConnectionManager()

        @self.app.websocket('/instant_message')
        async def websocket_instant_message(websocket: WebSocket):
            await self.websocket_connection_manager.connect(websocket)
            try:
                # Incoming messages are read and discarded; the loop only ends
                # when the socket goes away.
                while True:
                    await websocket.receive_text()
            except WebSocketDisconnect:
                pass
            finally:
                # Always unregister, also on unexpected errors.  A broadcast
                # may already have removed the connection, which surfaces as
                # ValueError from list.remove.
                try:
                    await self.websocket_connection_manager.connect(websocket, 1)
                except ValueError:
                    pass

        @self.app.api_route(methods=['GET', 'POST'], path='/plugin_preferred_file')
        def api_plugin_preferred_file():
            file = self.runner.window and self.runner.window.setting['plugin_preferred_file']
            # Only report a configured file that still exists.
            preferred = file if (file and os.path.exists(file)) else None
            return JSONResponse(status_code=200, content=self.message(0, '', preferred))

        @self.app.api_route(methods=['POST'], path='/plugin_run')
        def api_plugin_run(data: WebServerAPIJSONData):
            try:
                self.browser_manager.plugin_run(data.data['user_id'], data.data['plugin_id'], data.data['requirements'])
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/plugin_shutdown')
        def api_plugin_shutdown(data: WebServerAPIJSONData):
            try:
                self.browser_manager.plugin_shutdown(data.data['user_id'])
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/plugin_all_users_run')
        def api_plugin_all_users_run(data: WebServerAPIJSONData):
            try:
                self.browser_manager.plugin_all_users_run(data.data['plugin_id'], data.data['requirements'])
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/plugin_all_users_shutdown')
        def api_plugin_all_users_shutdown():
            try:
                self.browser_manager.plugin_all_users_shutdown()
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['GET', 'POST'], path='/plugin_list')
        def api_plugin_list():
            return self.browser_manager.plugin_list()

        @self.app.api_route(methods=['POST'], path='/upload_files')
        def api_upload_files(files: List[UploadFile] = File(...)):
            if len(files) > 25:
                return JSONResponse(status_code=200, content=self.message(1, 'Exceeded the maximum number of files allowed of 25.'))
            # NOTE(review): a size of None is counted as 0 here on the
            # assumption that the framework may not always know the size.
            if sum(file.size or 0 for file in files) > 50 * 1024 * 1024:
                return JSONResponse(status_code=200, content=self.message(1, 'Total file size exceeds the limit of 50MB.'))
            uploaded_files = []
            for file in files:
                # The file name is client-supplied: keep only its last path
                # component so it cannot point outside the upload directory.
                safe_name = os.path.basename((file.filename or '').replace('\\', '/'))
                file_save_path = os.path.join(self.upload_dir, '%s_%s' % (int(time.time() * 1000), safe_name))
                with open(file_save_path, 'wb') as f:
                    f.write(file.file.read())
                uploaded_files.append(file_save_path)
            return JSONResponse(status_code=200, content=self.message(0, '', uploaded_files))

        @self.app.api_route(methods=['GET', 'POST'], path='/user_all_details')
        def api_user_all_details():
            return self.browser_manager.user_all_details()

        @self.app.api_route(methods=['POST'], path='/shutdown')
        def api_shutdown():
            try:
                # Snapshot the IDs: user_die changes which users are online.
                for user_id in list(self.browser_manager.user_ids_online()):
                    self.browser_manager.user_die(user_id)
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/user_set_name')
        def api_user_set_name(data: WebServerAPIJSONData):
            try:
                self.browser_manager.user_set_name(data.data['user_id'], data.data['user_name'])
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/user_add')
        def api_user_add(data: WebServerAPIJSONData):
            try:
                self.browser_manager.user_add(data.data['user_id'])
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/user_del')
        def api_user_del(data: WebServerAPIJSONData):
            try:
                self.browser_manager.user_del(data.data['user_id'])
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/user_run')
        def api_user_run(data: WebServerAPIJSONData):
            try:
                self.browser_manager.user_run(data.data['user_id'])
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/user_die')
        def api_user_die(data: WebServerAPIJSONData):
            try:
                self.browser_manager.user_die(data.data['user_id'])
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['POST'], path='/user_focus_window')
        def api_user_focus_window(data: WebServerAPIJSONData):
            try:
                self.browser_manager.user_focus_window(data.data['user_id'])
                return self.message(0)
            except FunctionTimedOut:
                # A timeout while focusing is reported as success.
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['GET'], path='/')
        def index():
            raise HTTPException(status_code=403)

        @self.app.api_route(methods=['GET'], path='/200')
        def state():
            return Response(content='200', media_type='text/plain')

        # Catch-all static route; must stay the last registration.
        @self.app.api_route(methods=['GET'], path='/{filename:path}')
        def www(filename):
            return self.statics(self._resolve_static(filename))

    def _resolve_static(self, filename: str) -> str:
        """Map a request path onto a path below ``self.www``.

        The request path is untrusted: absolute paths and ``..`` segments
        would otherwise let a client read files outside the web root.

        :raises HTTPException: 404 when the resolved path is not inside
            ``self.www``.
        """
        root = os.path.abspath(self.www)
        target = os.path.abspath(os.path.join(root, filename))
        root_prefix = os.path.normcase(os.path.join(root, ''))
        if not os.path.normcase(target).startswith(root_prefix):
            raise HTTPException(status_code=404)
        return target

    @staticmethod
    def message(code: int, info: str = '', data=None):
        """Build the JSON envelope used by the API routes."""
        return {'code': int(code), 'info': info, 'data': data}

    @staticmethod
    def statics(file: str):
        """Return a ``FileResponse`` for ``file``.

        :raises HTTPException: 404 when ``file`` is not an existing file.
        """
        if not os.path.isfile(file):
            raise HTTPException(status_code=404)
        return FileResponse(file)

    def handle_interrupt(self):
        """Forward the interrupt to the browser manager and empty the upload directory."""
        self.browser_manager.handle_interrupt()
        self.clear_upload()

    def clear_upload(self):
        """Delete every regular file in the upload directory, ignoring failures."""
        dirs = self.upload_dir
        if not os.path.exists(dirs):
            return
        for filename in os.listdir(dirs):
            filepath = os.path.join(dirs, filename)
            try:
                if os.path.isfile(filepath):
                    os.unlink(filepath)
            except Exception:
                # Best effort: a file that cannot be removed is skipped.
                pass

    def run(self, host='127.0.0.1', port=8080):
        """Ensure the served directories exist, clear old uploads and start uvicorn (blocking)."""
        os.makedirs(self.www, exist_ok=True)
        os.makedirs(self.upload_dir, exist_ok=True)
        self.clear_upload()
        uvicorn.run(self.app, host=host, port=port, log_level='warning')
2025-02-09 19:56:41 +08:00
class OutputRedirector:
    """File-like tee: forwards writes to a system stream and to a log file.

    Text passed to ``write`` is preceded in the log by a ``[YYYY-MM-DD
    HH:MM:SS]`` line whenever the formatted time differs from the last one
    written.  Attributes not defined here are looked up on the system stream.
    """

    def __init__(self, sys_fp, new_fp):
        """
        :param sys_fp: original stream (e.g. ``sys.stdout``); may be ``None``
            when the process has no console.
        :param new_fp: writable log file object.
        """
        self.sys_fp = sys_fp
        self.new_fp = new_fp
        # Last timestamp line written to the log.
        self.time_format = ''

    @staticmethod
    def time():
        """Return the current local time as a ``[...]`` line ending in a newline."""
        return '[%s]\n' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),)

    def write(self, s):
        """Write ``s`` to both targets and flush; return ``len(s)``."""
        if self.sys_fp is not None:
            self.sys_fp.write(s)
        # Sample the clock once so the value compared is the value stored.
        stamp = self.time()
        if stamp != self.time_format:
            self.time_format = stamp
            self.new_fp.write(stamp)
        self.new_fp.write(s)
        self.flush()
        return len(s)

    def writelines(self, lines):
        """Write ``lines`` to both targets (no timestamp) and flush."""
        # Materialise once: a generator could otherwise only feed one target.
        lines = list(lines)
        if self.sys_fp is not None:
            self.sys_fp.writelines(lines)
        self.new_fp.writelines(lines)
        self.flush()

    def flush(self):
        """Flush both targets."""
        if self.sys_fp is not None:
            self.sys_fp.flush()
        self.new_fp.flush()

    def __getattr__(self, item):
        """Delegate unknown attributes (``encoding``, ``fileno`` ...) to the system stream."""
        # __getattr__ only runs when normal lookup failed.  If one of our own
        # fields is missing (object not initialised yet), delegating would
        # look up self.sys_fp again and recurse without end.
        if item in ('sys_fp', 'new_fp', 'time_format'):
            raise AttributeError(item)
        return getattr(self.sys_fp, item)
2025-02-09 19:56:41 +08:00
class MainRunner:
    """Application entry object.

    ``run`` prepares the data directory and logging, installs the bundled
    Chrome, starts the web server thread and the Qt main window.  ``build``
    drives the interactive Nuitka / Inno Setup packaging.
    """

    def __init__(self):
        """Install the SIGINT handler and set paths, version and defaults."""
        signal.signal(signal.SIGINT, self._handle_interrupt)
        self.app_root = os.path.dirname(__file__)
        appdata = os.getenv('APPDATA')
        # Data lives under %APPDATA%\Galactic when APPDATA is set, otherwise
        # in a "data" directory next to this file.
        self.app_data = os.path.join(appdata, 'Galactic') if appdata else os.path.join(os.path.dirname(__file__), 'data')
        # Marker file that exists while the application is running.
        self.app_running_file = os.path.join(self.app_data, 'running')
        self.app_name = 'Galactic'
        self.app_version = '1.0.0.3'
        self.web_server_host = '127.0.0.1'
        self.web_server_port = 8095
        self.web_server = None
        self.web_server_thread = None
        self.application = None
        self.application_scale_rate = None
        self.window = None
        # Built-in plugins handed to the web server; all test/debug plugins
        # are currently switched off.
        self.plugin_list = [
            # BrowserPluginFileTest,
            # BrowserPluginTextTest,
            # BrowserPluginMultFileTest,
            # BrowserPluginMultTextTest,
            # BrowserPluginLoggingTest,
            # BrowserPluginMessageTest,
            # BrowserPluginFileCommandDebug,
            # BrowserPluginTextCommandDebug
        ]

    def _copy_files_and_directories(self, src, dst):
        """Copy ``src`` to ``dst``; directories are merged recursively.

        Does nothing when ``src`` does not exist.
        """
        if not os.path.exists(src):
            return None
        if not os.path.isdir(src):
            shutil.copy(src, dst)
            return None
        if not os.path.exists(dst):
            os.makedirs(dst)
        for item in os.listdir(src):
            s = os.path.join(src, item)
            d = os.path.join(dst, item)
            if os.path.isdir(s):
                self._copy_files_and_directories(s, d)
            else:
                shutil.copy(s, d)
        return None

    @staticmethod
    def _remove_directory(src):
        """Remove the directory tree ``src`` if it is a directory."""
        if os.path.isdir(src):
            shutil.rmtree(src)

    @staticmethod
    def _rename_directory(src, dst):
        """Rename ``src`` to ``dst`` if ``src`` is a directory."""
        if os.path.isdir(src):
            os.rename(src, dst)

    @staticmethod
    def _read_file_text(src):
        """Return the UTF-8 text of ``src``, or ``''`` when it does not exist."""
        if not os.path.exists(src):
            return ''
        # Context manager so the handle is closed right away.
        with open(file=src, mode='r', encoding='utf-8') as f:
            return f.read()

    @staticmethod
    def preprocessing():
        """Terminate every process named ``chromedriver.exe``."""
        for proc in psutil.process_iter():
            try:
                if proc.name().lower() == 'chromedriver.exe':
                    proc.terminate()
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass

    def install_chrome(self):
        """Copy the bundled Chrome into the data directory when versions differ.

        The versions are the contents of the ``chrome.version`` files on both
        sides.  An existing installation is renamed to ``Chrome.delete`` and
        removed before the copy.

        :return: ``True`` after an installation, ``None`` when there is no
            bundled Chrome or the versions match.
        """
        chrome_install_source = os.path.join(self.app_root, 'Chrome')
        chrome_install_source_version = os.path.join(chrome_install_source, 'chrome.version')
        chrome_install_target = os.path.join(self.app_data, 'Chrome')
        chrome_install_target_version = os.path.join(chrome_install_target, 'chrome.version')
        chrome_install_target_newname = '%s.delete' % (chrome_install_target,)
        if not os.path.exists(chrome_install_source):
            return None
        if self._read_file_text(chrome_install_source_version) == self._read_file_text(chrome_install_target_version):
            return None
        if os.path.exists(chrome_install_target):
            # Clear a leftover from an earlier attempt, then move the old
            # installation out of the way before deleting it.
            self._remove_directory(chrome_install_target_newname)
            self._rename_directory(chrome_install_target, chrome_install_target_newname)
            self._remove_directory(chrome_install_target_newname)
        self._copy_files_and_directories(chrome_install_source, chrome_install_target)
        return True

    def run(self):
        """Start the application and block until the Qt event loop ends.

        Exits the process with the status returned by the event loop.
        """
        os.makedirs(self.app_data, exist_ok=True)
        log_dir = os.path.join(self.app_data, 'log')
        os.makedirs(log_dir, exist_ok=True)
        day = time.strftime('%Y%m%d', time.localtime())
        # The log files stay open for the lifetime of the process.
        stdout_redirector = OutputRedirector(sys.stdout, open(os.path.join(log_dir, '%s_stdout.log' % (day,)), mode='a'))
        stderr_redirector = OutputRedirector(sys.stderr, open(os.path.join(log_dir, '%s_stderr.log' % (day,)), mode='a'))
        sys.stderr = stderr_redirector
        sys.stdout = stdout_redirector
        sys.path.append(os.path.join(os.path.dirname(__file__), 'Packages'))
        sys.path.append(os.path.join(os.path.dirname(__file__), 'site-packages.zip'))
        sys.path.append(os.path.join(self.app_data, 'Packages'))
        sys.path.append(os.path.join(self.app_data, 'site-packages.zip'))
        print('Startup.', file=sys.stderr)
        try:
            self.preprocessing()
        except Exception:
            # Best effort: stale chromedriver processes are not fatal.
            pass
        try:
            if self.install_chrome():
                print('Chrome is now installed.', file=sys.stderr)
        except PermissionError:
            _app = QApplication(sys.argv)
            _msg = QMessageBox()
            _msg.setIcon(QMessageBox.Warning)
            _msg.setText('Chrome needs to be updated, please quit all browser users first.')
            _msg.setWindowTitle('Warning')
            _msg.setStandardButtons(QMessageBox.Cancel)
            _msg.exec_()
            _app.exit(1)
        self.web_server = WebServer(runner=self, root=self.app_root, data=self.app_data, default_plugins=self.plugin_list)
        self.web_server_thread = threading.Thread(target=self.web_server.run, kwargs={'host': self.web_server_host, 'port': self.web_server_port})
        self.web_server_thread.daemon = True
        self.web_server_thread.start()
        # Qt allows a single application object per process; reuse the one
        # created for the warning dialog above instead of constructing another.
        self.application = QApplication.instance() or QApplication(sys.argv)
        self.application.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
        self.application_scale_rate = self.application.screens()[0].logicalDotsPerInch() / 96
        self.window = MainWindow(
            runner=self,
            app_name=self.app_name,
            app_version=self.app_version,
            scale_rate=self.application_scale_rate,
            web_listen_host=self.web_server_host,
            web_listen_port=self.web_server_port
        )
        status = self.application.exec_()
        print('Exit status code: %s' % (status,), file=sys.stderr)
        sys.exit(status)

    def build(self):
        """Interactively build the standalone distribution and the installer.

        Step 1 (optional) installs Nuitka, compiles this file and copies the
        resource directories into ``<name>.dist``.  Step 2 (optional) renders
        ``<name>.iss`` from ``<name>.iss.template`` and runs the Inno Setup
        compiler on it.
        """
        if not str(__file__).endswith('.py'):
            print('Build is not currently supported.', file=sys.stderr)
            # sys.exit rather than the interactive-only ``exit`` helper.
            sys.exit(1)
        from_home = os.path.dirname(__file__)
        script_name = os.path.splitext(os.path.basename(__file__))[0]
        dist_home = os.path.join(from_home, '%s.dist' % (script_name,))
        ask = input('%s %s: ' % ('Build?', '[Y/n]'))
        if ask.lower().strip() == 'y':
            # NOTE(review): shell=True is kept so that shim/batch launchers
            # for pip and python on PATH keep working; all arguments are
            # constants or values from this object, not external input.
            subprocess.run(['pip', 'install', 'nuitka', '-q'], shell=True, env=os.environ)
            subprocess.run([
                'python',
                '-m',
                'nuitka',
                '--standalone',
                '--enable-plugin=pyqt5',
                '--enable-plugin=pylint-warnings',
                '--include-module=PyQt5',
                '--include-module=selenium',
                '--include-module=fastapi',
                '--include-module=pydantic',
                '--include-module=starlette',
                '--windows-console-mode=disable',
                '--windows-icon-from-ico=favicon.ico',
                '--product-name=%s' % (self.app_name,),
                '--file-description=%s' % (self.app_name,),
                '--product-version=%s' % (self.app_version,),
                '--copyright=Copyright (C) 2025',
                '--output-dir=%s' % (os.path.join(os.path.dirname(__file__)),),
                '%s' % (__file__,)
            ], shell=True, env=os.environ)
            for i in ['Packages', 'PyQt5', 'Chrome', 'initialize', 'www', 'favicon.ico']:
                self._copy_files_and_directories('%s/%s' % (from_home, i), '%s/%s' % (dist_home, i))
        elif not os.path.exists(dist_home):
            # Nothing was built now and no earlier build exists.
            return None
        ask = input('%s %s: ' % ('Compile setup program?', '[Y/n]'))
        if ask.lower().strip() != 'y':
            return None
        compile_file = os.path.join(from_home, '%s.iss' % (script_name,))
        compile_template = os.path.join(from_home, '%s.iss.template' % (script_name,))
        compiler = 'C:\\Program Files (x86)\\Inno Setup 6\\ISCC.exe'
        if not os.path.exists(compile_template):
            print('The template file \"%s\" does not exist.' % (compile_template,), file=sys.stderr)
            return None
        if not os.path.exists(compiler):
            print('The compiler \"%s\" does not exist. Please check if Inno Setup is installed. You can download it at https://www.innosetup.com/' % (compiler,),
                  file=sys.stderr)
            return None
        placeholders = [
            ('%APPNAME%', self.app_name),
            ('%APPEXEC%', script_name),
            ('%APPVERSION%', self.app_version),
            ('%APPBUILDDATE%', time.strftime('%Y%m%d', time.localtime())),
            # The template line behind this marker is commented out with
            # "; " unless the interpreter is 64-bit.
            ('%DISABLEX64%', '' if platform.architecture()[0] == '64bit' else '; '),
        ]
        # Plain open() with the default encoding for both files.
        with open(compile_template, mode='r') as f:
            script = f.read()
        for marker, value in placeholders:
            script = script.replace(marker, value)
        with open(compile_file, mode='w') as f:
            f.write(script)
        subprocess.run([compiler, compile_file])
        return None

    def _handle_interrupt(self, _signal, _frame):
        """Signal-handler adapter for ``handle_interrupt``."""
        self.handle_interrupt()

    def handle_interrupt(self):
        """Forward the interrupt to the web server; errors are printed to stderr."""
        try:
            if self.web_server:
                self.web_server.handle_interrupt()
        except Exception as e:
            print(e, file=sys.stderr)
if __name__ == '__main__':
    # Point Qt at its plugin directory; a file name ending in ".int" uses the
    # site-packages layout, anything else the layout next to this file.
    if os.path.basename(__file__).lower().endswith('.int'):
        qt_plugin_dir = 'site-packages/PyQt5/Qt5/plugins'
    else:
        qt_plugin_dir = 'PyQt5/Qt5/plugins'
    QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), qt_plugin_dir))

    # Single-instance guard: a lock file in the temp directory, named after a
    # hash of this file's path.  The handle is kept at module level so it
    # stays open while the program runs.
    lock_name = '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16]
    f_lock = open(file=os.path.join(tempfile.gettempdir(), lock_name), mode='w', encoding='utf-8')
    if flock(f_lock, LOCK_EX | LOCK_NB):
        wants_build = len(sys.argv) > 1 and sys.argv[1] == '--build'
        runner = MainRunner()
        if wants_build:
            runner.build()
        else:
            runner.run()
    else:
        # The lock could not be taken: tell the user and exit with status 1.
        app = QApplication(sys.argv)
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setText('The application is already running.')
        msg.setWindowTitle('Warning')
        msg.setStandardButtons(QMessageBox.Cancel)
        msg.exec_()
        app.exit(1)
        sys.exit(1)