# Galactic/Galactic.py
#
# 2495 lines
# 94 KiB
# Python

import os
import re
import sys
import glob
import json
import time
import ctypes
import base64
import signal
import random
import shutil
import socket
import inspect
import uvicorn
import zipfile
import hashlib
import asyncio
import tempfile
import platform
import requests
import requests.adapters
import importlib.util
import threading
import subprocess
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from seleniumwire.webdriver import InspectRequestsMixin, DriverCommonMixin
from seleniumwire.request import Request as SeleniumWireRequest
from seleniumwire.request import Response as SeleniumWireResponse
from selenium.webdriver.common.by import By
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.remote.command import Command
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchWindowException
from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineSettings, QWebEngineProfile
from PyQt5.QtWebEngineCore import QWebEngineUrlRequestInterceptor
from PyQt5.QtNetwork import QNetworkProxyFactory
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox, QSystemTrayIcon, QMenu, QAction, QDesktopWidget, QShortcut
from PyQt5.QtCore import Qt, QCoreApplication, QTimer, QUrl
from PyQt5.QtGui import QIcon, QKeySequence
from typing import List
from fastapi import FastAPI, Response, WebSocket, UploadFile, File, HTTPException
from pydantic import BaseModel
from starlette.responses import JSONResponse, FileResponse
from starlette.websockets import WebSocketDisconnect
from winotify import Notification, audio
from func_timeout import func_set_timeout, FunctionTimedOut
from pathlib import Path
from urllib.parse import urlparse
def _fd(f):
    """
    Return the file descriptor of ``f``.

    Objects exposing ``fileno()`` are asked for their descriptor; anything
    else (e.g. an integer descriptor) is returned unchanged.
    """
    return f.fileno() if hasattr(f, 'fileno') else f


if os.name == 'nt':
    import msvcrt
    from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref)
    from ctypes.wintypes import BOOL, DWORD, HANDLE

    LOCK_SH = 0x0
    LOCK_NB = 0x1
    LOCK_EX = 0x2
    # Only compared against inside flock() to select UnlockFileEx; it is
    # never handed to the Win32 API as a flag.
    LOCK_UN = 0x9

    # Integer type with the same width as a pointer.
    if sizeof(c_ulong) != sizeof(c_void_p):
        ULONG_PTR = c_int64
    else:
        ULONG_PTR = c_ulong
    PVOID = c_void_p

    class _OFFSET(Structure):
        _fields_ = [
            ('Offset', DWORD),
            ('OffsetHigh', DWORD)
        ]

    class _OFFSET_UNION(Union):
        _fields_ = [
            ('_offset', _OFFSET),
            ('Pointer', PVOID)
        ]
        _anonymous_ = ['_offset']

    class OVERLAPPED(Structure):
        _fields_ = [
            ('Internal', ULONG_PTR),
            ('InternalHigh', ULONG_PTR),
            ('_offset_union', _OFFSET_UNION),
            ('hEvent', HANDLE)
        ]
        _anonymous_ = ['_offset_union']

    LPOVERLAPPED = POINTER(OVERLAPPED)
    LockFileEx = windll.kernel32.LockFileEx
    LockFileEx.restype = BOOL
    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
    UnlockFileEx = windll.kernel32.UnlockFileEx
    UnlockFileEx.restype = BOOL
    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

    def flock(f, flags):
        """
        Lock or unlock ``f`` through LockFileEx / UnlockFileEx.

        :param f: file object or descriptor.
        :param flags: ``LOCK_UN`` to unlock, otherwise the flags passed to LockFileEx.
        :return: True when the Win32 call reports success, else False.
        """
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        if flags == LOCK_UN:
            ret = UnlockFileEx(
                hfile,
                0,
                0,
                0xFFFF0000,
                byref(overlapped)
            )
        else:
            ret = LockFileEx(
                hfile,
                flags,
                0,
                0,
                0xFFFF0000,
                byref(overlapped)
            )
        return bool(ret)
else:
    try:
        import fcntl
        LOCK_SH = fcntl.LOCK_SH
        LOCK_NB = fcntl.LOCK_NB
        LOCK_EX = fcntl.LOCK_EX
        LOCK_UN = fcntl.LOCK_UN
    except (ImportError, AttributeError):
        # File locking is not supported on this platform.
        LOCK_EX = LOCK_SH = LOCK_NB = 0
        # Must exist and differ from the flags above, otherwise the dummy
        # flock() below raises NameError or reports every lock as acquired.
        LOCK_UN = 8

        def flock(f, flags):
            """
            Dummy implementation: unlocking "succeeds", locking never does.
            """
            return flags == LOCK_UN
    else:
        def flock(f, flags):
            """
            Lock or unlock ``f`` with ``fcntl.flock``.

            :param f: file object or descriptor.
            :param flags: any combination of the ``LOCK_*`` constants.
            :return: True on success, False when a non-blocking request
                could not be satisfied. Other OS errors propagate.
            """
            try:
                # fcntl.flock() returns None and signals failure by raising,
                # so its return value must not be compared with 0.
                fcntl.flock(_fd(f), flags)
            except BlockingIOError:
                return False
            return True
def notification_send(app_id=None, title='', message='', reminder=None):
    """
    Build a ``Notification`` and show it.

    :param app_id: application id, or a callable that returns it.
    :param title: notification title.
    :param message: notification body.
    :param reminder: its truthiness is forwarded as the second argument of ``set_audio``.
    """
    if callable(app_id):
        app_id = app_id()
    # The icon is expected next to this source file.
    icon_path = os.path.join(os.path.dirname(__file__), 'favicon.ico')
    toast = Notification(app_id=app_id, title=title, msg=message, icon=icon_path)
    toast.set_audio(audio.IM, bool(reminder))
    toast.show()
def import_module(file: str):
    """
    Execute the Python source at ``file`` and return the resulting module object.

    The module is named after the file's base name without its extension.
    """
    module_name, _extension = os.path.splitext(os.path.basename(file))
    spec = importlib.util.spec_from_file_location(module_name, file)
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    return loaded
class BrowserMobileEmulation(dict):
    """
    Mobile emulation parameters.

    A dict holding ``w``, ``h`` and ``user_agent``. Values can be read as
    items or as attributes; unknown names yield None, and attribute
    assignment is silently ignored.
    """

    def __init__(self, w=540, h=960, user_agent=None):
        # Base64 form of the user agent used when none is supplied.
        encoded_default = bytes('''
TW96aWxsYS81LjAgKExpbnV4OyBVOyBBbmRyb2lkIDEzOyB6aC1jbjsgMjEwOTEx
OUJDIEJ1aWxkL1RLUTEuMjIwODI5LjAwMikgQXBwbGVXZWJLaXQvNTM3LjM2IChL
SFRNTCwgbGlrZSBHZWNrbykgVmVyc2lvbi80LjAgQ2hyb21lLzk4LjAuNDc1OC4x
MDIgTVFRQnJvd3Nlci8xMy42IE1vYmlsZSBTYWZhcmkvNTM3LjM2
''', encoding='utf-8')
        if not user_agent:
            user_agent = base64.b64decode(encoded_default).decode()
        super().__init__({'w': w, 'h': h, 'user_agent': user_agent})
        # No effect at runtime (__setattr__ discards it); it only names the attributes.
        self.w = self.h = self.user_agent = None

    def __setattr__(self, key, value):
        # Attribute assignment is deliberately a no-op.
        return None

    def __getitem__(self, item):
        # Missing keys read as None instead of raising KeyError.
        return dict.get(self, item)

    def __getattr__(self, item):
        # Reached only when normal attribute lookup fails: fall back to the dict content.
        return dict.get(self, item)
class CustomHTTPAdapter(requests.adapters.HTTPAdapter):
    """
    HTTP adapter that resolves host names through an HTTP lookup service.

    Before a request is sent the host part of its URL is replaced by the
    address returned from ``resolve_host``; the original name is kept in the
    ``Host`` header and, for https, in the TLS hostname settings. Response
    URLs are rewritten back in ``build_response``.
    """

    def __init__(self, *args, **kwargs):
        self.urlparse = urlparse
        # host name -> resolved address, and the reverse mapping.
        self.hosts = {}
        self.addrs = {}
        super().__init__(*args, **kwargs)

    @staticmethod
    def _split_netloc(netloc):
        """
        Split ``netloc`` into ``(host, ':port' or '')`` on the first colons.
        """
        pieces = netloc.split(':')
        port_suffix = ':%s' % pieces[1] if len(pieces) > 1 else ''
        return pieces[0], port_suffix

    @staticmethod
    def resolve_host(host, timeout=5.0):
        """
        Ask the lookup endpoint for the address of ``host``.

        :param host: host name to resolve.
        :param timeout: seconds to wait for the lookup; without a limit a
            stalled lookup would block every request going through the adapter.
        :return: the first entry of the ``,``/``;`` separated answer, or None
            when the lookup fails or the answer is empty.
        """
        try:
            reply = requests.get('http://119.29.29.29/d?dn=%s&ip=208.67.222.222' % host, timeout=timeout)
        except requests.exceptions.RequestException:
            return None
        # An error page must not be mistaken for an address.
        if reply.status_code != 200:
            return None
        candidates = reply.text.replace(',', ';').split(';')
        first = candidates[0].strip() if candidates else ''
        return first or None

    def send(self, request, **kwargs):
        """
        Rewrite the request to target the resolved address, then send it.
        """
        req = request
        connection_pool_kwargs = self.poolmanager.connection_pool_kw
        url_resolve = self.urlparse(req.url)
        scheme = url_resolve.scheme
        domain, addition_port = self._split_netloc(url_resolve.netloc)
        ip_address = self.resolve_host(domain)
        if ip_address:
            self.hosts[domain] = ip_address
            self.addrs[ip_address] = domain
            req.url = req.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (ip_address, addition_port))
            if scheme == 'https':
                # The connection goes to an IP, so certificate checks and SNI
                # must still use the original host name.
                connection_pool_kwargs['assert_hostname'] = domain
                connection_pool_kwargs['server_hostname'] = domain
            req.headers['Host'] = '%s%s' % (domain, addition_port)
        else:
            # The URL keeps its own host name; overrides left behind by an
            # earlier request would make verification use the wrong name.
            connection_pool_kwargs.pop('assert_hostname', None)
            connection_pool_kwargs.pop('server_hostname', None)
        return super().send(req, **kwargs)

    def build_response(self, *args, **kwargs):
        """
        Build the response and put the original host name back into its URL.
        """
        res = super().build_response(*args, **kwargs)
        domain, addition_port = self._split_netloc(self.urlparse(res.url).netloc)
        if domain in self.addrs:
            res.url = res.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.addrs[domain], addition_port))
        return res
class BrowserPathManager:
    """
    Locate a Chrome executable and a matching chromedriver.

    Drivers are looked up in, and downloaded to, ``webdriver_install_location``
    under the name ``chromedriver_<major version>`` (plus ``.exe`` on Windows).
    """

    def __init__(self):
        self.webdriver_install_location = tempfile.gettempdir()

    @staticmethod
    def _is_supported_version(main_version) -> bool:
        """
        Return True when ``main_version`` is a major version number of at least 70.
        """
        try:
            return int(main_version) >= 70
        except (TypeError, ValueError):
            # Empty or non-numeric: the version could not be determined.
            return False

    @staticmethod
    def resolve_browser_version(file: str):
        """
        Query the version of the browser executable ``file``.

        :param file: path of the executable.
        :return: ``(file, main_version, full_version)``; both versions are
            empty strings when they cannot be determined.
        :raises FileNotFoundError: when ``file`` does not exist.
        """
        if not os.path.exists(file):
            raise FileNotFoundError('The executable file does not exist in %s' % file)
        is_windows_binary = file.lower().endswith('.exe')
        if is_windows_binary:
            # Single quotes are doubled so the path stays one PowerShell literal.
            command = [
                'powershell', '-NoProfile', '-Command',
                "(Get-Item -LiteralPath '%s').VersionInfo.ProductVersion" % file.replace("'", "''")
            ]
        else:
            # An argument list without a shell keeps paths containing spaces
            # (or shell metacharacters) intact.
            command = [file, '--version']
        try:
            output = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                timeout=5
            ).stdout.decode('utf-8', errors='replace').strip()
        except (OSError, subprocess.SubprocessError):
            output = ''
        if is_windows_binary:
            full_version = output
        else:
            # The first dotted number is the browser version; later ones can
            # belong to the platform (e.g. "... running on Ubuntu 18.04").
            found = re.search(r'\d+(?:\.\d+)+', output)
            full_version = found.group(0) if found else ''
        main_version = full_version.split('.')[0]
        return file, main_version, full_version

    @staticmethod
    def open_remote_resources(url: str, save_file: str = None, auto_redirects=False, retries=3, timeout=30):
        """
        Fetch ``url`` as text, or download it to ``save_file``.

        :param url: resource to fetch.
        :param save_file: when given, the body is streamed into this file.
        :param auto_redirects: whether redirects are followed.
        :param retries: additional attempts after a connection error or timeout.
        :param timeout: seconds allowed for connecting and for each read.
        :return: with ``save_file``: True on success, otherwise False.
            Without it: the response text, or ``''`` on failure.
        """
        failure = False if save_file else ''
        attempts = (retries if retries > 0 else 0) + 1
        with requests.Session() as http:
            for scheme in ['http://', 'https://']:
                http.mount(scheme, CustomHTTPAdapter())
            for attempt in range(attempts):
                try:
                    with http.get(
                        url,
                        headers={
                            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
                        },
                        allow_redirects=auto_redirects,
                        stream=(save_file is not None),
                        timeout=timeout
                    ) as response:
                        if 200 != response.status_code:
                            return failure
                        if not save_file:
                            return response.text
                        with open(save_file, 'wb') as filestream:
                            for chunk in response.iter_content(chunk_size=8192):
                                filestream.write(chunk)
                        return True
                except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                    # Pause only when another attempt follows.
                    if attempt < attempts - 1:
                        time.sleep(0.75 + round(random.random(), 2))
        # Every attempt failed: report it with the same value as a bad status.
        return failure

    @staticmethod
    def find_binary():
        """
        Return the first existing Chrome executable among the well-known locations, or None.
        """
        plat = sys.platform
        find_list = []
        if plat == 'win32':
            for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)', 'LOCALAPPDATA', 'PROGRAMW6432']:
                root = os.environ.get(e, '')
                if root:
                    find_list.append('%s/Google/Chrome/Application/chrome.exe' % root.replace("\\", '/'))
        elif plat == 'linux':
            # These are executables themselves, not directories containing "chrome".
            find_list.extend([
                '/opt/google/chrome/chrome',
                '/usr/bin/google-chrome',
                '/usr/bin/google-chrome-stable'
            ])
        elif plat == 'darwin':
            find_list.append('/Applications/Google Chrome.app/Contents/MacOS/Google Chrome')
        for execute_file in find_list:
            if os.path.isfile(execute_file):
                return execute_file
        return None

    def find_driver(self, main_version: str):
        """
        Return the path of an already installed driver for ``main_version``, or None.
        """
        if not self._is_supported_version(main_version):
            return None
        location = '%s%s%s' % (
            self.webdriver_install_location,
            os.sep,
            'chromedriver_%s%s' % (str(main_version), '.exe' if platform.system().lower() == 'windows' else '')
        )
        return location.replace("\\", '/') if os.path.exists(location) else None

    def pull_driver(self, main_version: str):
        """
        Download the driver for ``main_version`` and return its path, or None on failure.
        """
        if not self._is_supported_version(main_version):
            return None
        # NOTE(review): this bucket may not carry drivers for recent Chrome
        # releases - confirm which versions it still serves.
        chromedriver_site = 'https://chromedriver.storage.googleapis.com'
        latest_release = self.open_remote_resources('%s/LATEST_RELEASE_%s' % (chromedriver_site, main_version))
        if not latest_release:
            return None
        plat = sys.platform
        # (archive member, archive name) candidates for the current platform.
        match_assets = []
        if plat == 'win32':
            match_assets.append(('chromedriver.exe', 'chromedriver_win32.zip'))
        elif plat == 'linux':
            match_assets.append(('chromedriver', 'chromedriver_linux64.zip'))
        elif plat == 'darwin':
            if platform.machine().startswith('arm'):
                match_assets.append(('chromedriver', 'chromedriver_mac_arm64.zip'))
                match_assets.append(('chromedriver', 'chromedriver_mac64_m1.zip'))
            else:
                match_assets.append(('chromedriver', 'chromedriver_mac64.zip'))
        package_chromedriver = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'chromedriver.zip')
        destdir_chromedriver = self.webdriver_install_location
        for member, archive_name in match_assets:
            res_url = '%s/%s/%s' % (chromedriver_site, latest_release, archive_name)
            print('Downloading version %s chromedriver %s to %s...' % (latest_release, res_url, destdir_chromedriver), file=sys.stderr)
            if not self.open_remote_resources(res_url, package_chromedriver):
                continue
            try:
                # Close the archive before deleting it; an open handle blocks removal on Windows.
                with zipfile.ZipFile(package_chromedriver) as archive:
                    dist = archive.extract(member, destdir_chromedriver)
            finally:
                os.remove(package_chromedriver)
            dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, member.replace('chromedriver', 'chromedriver_%s' % main_version))
            # Overwrites a stale driver of the same version.
            os.replace(dist, dist_chan)
            if not member.lower().endswith('.exe'):
                # Executable for everyone, writable by the owner only.
                os.chmod(dist_chan, 0o755)
            return dist_chan.replace("\\", '/')
        return None

    def main(self, binary: str = None, driver: str = None):
        """
        Resolve the browser and driver paths.

        :param binary: browser executable; searched for when omitted.
        :param driver: driver executable; looked up locally, then downloaded, when omitted.
        :return: ``(binary, driver)``.
        :raises FileNotFoundError: when the browser or the driver cannot be found.
        :raises RuntimeError: when a driver must be selected but the browser version is unknown.
        """
        binary = binary or self.find_binary()
        if not binary:
            raise FileNotFoundError('No browser executable file is found on your system, please confirm whether it has been installed')
        if not os.path.exists(binary):
            raise FileNotFoundError('The executable file does not exist in %s' % binary)
        if not driver:
            # The version only matters for choosing a driver automatically.
            main_version = self.resolve_browser_version(binary)[1]
            if not main_version:
                raise RuntimeError('Failure to get the browser version number failed in %s' % binary)
            driver = self.find_driver(main_version) or self.pull_driver(main_version)
        if not driver:
            raise FileNotFoundError('Not specified the driver path, and try the automatic download failure')
        if not os.path.exists(driver):
            raise FileNotFoundError('The driver does not exist in %s' % driver)
        return binary, driver
class SeleniumClear:
    """
    Housekeeping for Selenium and driver leftovers.

    The time of the last clean-up is stored in a marker file inside the
    temporary directory; ``auto`` cleans again once it is older than a day.
    """

    def __init__(self):
        self.last = '%s/.selenium_clear_last' % tempfile.gettempdir()

    @staticmethod
    def clear_selenium():
        """
        Remove ``.cache/selenium`` below the user's home directory (Windows only).
        """
        if platform.uname().system.lower() != 'windows':
            return
        home_drive = os.environ.get('HOMEDRIVE')
        home_path = os.environ.get('HOMEPATH')
        if home_drive and home_path:
            try:
                shutil.rmtree('%s%s/.cache/selenium' % (home_drive, home_path))
            except FileNotFoundError:
                pass

    @staticmethod
    def clear_driver_cache():
        """
        Remove driver/browser scratch directories from the temporary directory (best effort).
        """
        for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']:
            for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)):
                try:
                    shutil.rmtree(i)
                except OSError:
                    # OSError covers missing entries, permission problems and
                    # non-directories. ``WindowsError`` only exists on Windows,
                    # so naming it raises NameError elsewhere.
                    pass

    @staticmethod
    def file_get_contents(file, text=None):
        """
        Return the content of ``file``, or ``text`` when the file does not exist.
        """
        if not os.path.exists(file):
            return text
        with open(file=file, mode='r', encoding='utf-8') as stream:
            return stream.read()

    @staticmethod
    def file_put_contents(file, text=None):
        """
        Write ``text`` to ``file`` and return the number of characters written.
        """
        with open(file=file, mode='w', encoding='utf-8') as stream:
            return stream.write(text)

    def straight_clear(self):
        """
        Clean up now and record the current time in the marker file.
        """
        self.clear_selenium()
        self.clear_driver_cache()
        self.file_put_contents(self.last, str(int(time.time())))

    def auto(self):
        """
        Clean up when the last recorded clean-up is more than 86400 seconds old.

        A marker file with unreadable content is deleted.
        """
        try:
            last_run = int(self.file_get_contents(self.last, '0'))
        except ValueError:
            os.remove(self.last)
            return
        if last_run + 86400 < int(time.time()):
            self.straight_clear()
class PositionTab:
    """
    Symbolic targets for switching to a neighbouring tab.
    """
    # Marker for the tab before the current one.
    Prev = 'Go-Prev'
    # Marker for the tab after the current one.
    Next = 'Go-Next'
class ColorUtils:
    """
    Color utils.
    """

    @staticmethod
    def hex2rgb(color):
        """
        Convert a hexadecimal color to an ``(r, g, b)`` tuple of integers.

        :param color: 3, 6 or 8 hexadecimal digits, with or without a leading
            ``#``. With 8 digits the last two are ignored.
        :return: tuple of three integers in the range 0-255.
        :raises ValueError: on a non-hexadecimal character or an unsupported length.
        """
        # Strip the marker only when it is there, so 'FF0000' does not lose its first digit.
        digits = (color[1:] if color.startswith('#') else color).upper()
        for x in digits:
            if x not in '0123456789ABCDEF':
                raise ValueError('Found invalid hexa character {0}.'.format(x))
        if len(digits) in (6, 8):
            digits = digits[0:6]
        elif len(digits) == 3:
            # Shorthand: every digit is doubled.
            digits = ''.join(digit * 2 for digit in digits)
        else:
            raise ValueError('Hexa string should be 3, 6 or 8 digits. if 8 digits, last 2 are ignored.')
        r, g, b = (int(digits[start:start + 2], 16) for start in (0, 2, 4))
        return r, g, b
class FetchResult:
    """
    Holder for the status, headers and body delivered by ``Browser.fetch``.

    Falsy ``data`` leaves the defaults (0, empty dict, empty string) in place.
    """

    def __init__(self, data):
        self.status = 0
        self.header = {}
        self.result = ''
        if data:
            self._resolve_data(data)

    def _resolve_data(self, data: dict):
        # All three keys are required; a missing one raises KeyError.
        self.status, self.header, self.result = data['status'], data['header'], data['result']

    def __str__(self):
        labelled = ('STATUS', self.status, 'HEADER', self.header, 'RESULT', self.result)
        return '%s: %s\n%s: %s\n%s: %s' % labelled
class EmptyMethod:
    """
    Placeholder object: every attribute or item read yields a do-nothing
    callable, and every attribute or item write is discarded.
    """

    @classmethod
    def no_method(cls, *args, **kwargs):
        """
        Accept anything, do nothing, return None.
        """
        return None

    def __getattr__(self, item):
        return self.no_method

    def __getitem__(self, item):
        return self.no_method

    def __setattr__(self, key, value):
        return None

    def __setitem__(self, key, value):
        return None
class Browser(InspectRequestsMixin, DriverCommonMixin, Chrome):
    """
    Browser web driver.

    A Chrome driver combined with the Selenium-Wire mixins. It adds
    convenience helpers for finding and highlighting elements, simulated
    input, tab handling and scripted ``fetch`` requests.
    """

    def __init__(
        self,
        driver: str = None,
        binary: str = None,
        driver_classes: int = 0,
        debugger_address: str = None,
        backends_address: str = None,
        headless: bool = False,
        lang: str = None,
        mute: bool = False,
        no_images: bool = False,
        user_agent: str = None,
        http_proxy: str = None,
        home: str = None,
        window_size: str = None,
        window_site: str = None,
        mobile_emulation: BrowserMobileEmulation = None,
        option_arguments: list = None,
        req_interceptor=None,
        res_interceptor=None,
    ):
        """
        Configure the options, start the browser and apply the start-up settings.

        :param driver: path of the driver executable.
        :param binary: path of the browser executable.
        :param driver_classes: 1 (together with ``backends_address``) starts the Selenium-Wire backend.
        :param debugger_address: attach to a running browser at this address.
        :param backends_address: ``host:port`` for the Selenium-Wire backend.
        :param headless: run without a visible window (always the case on Linux).
        :param lang: browser language.
        :param mute: mute audio.
        :param no_images: do not load images.
        :param user_agent: user agent override.
        :param http_proxy: ``host:port`` of an HTTP proxy.
        :param home: page opened after start-up.
        :param window_size: ``WxH`` or ``W,H``.
        :param window_site: window position as ``X,Y``.
        :param mobile_emulation: mobile emulation parameters.
        :param option_arguments: extra command line arguments for the browser.
        :param req_interceptor: request interceptor callable.
        :param res_interceptor: response interceptor callable.
        """
        self.exited = None
        self.is_linux = sys.platform.startswith('linux')
        SeleniumClear().auto()
        # binary, driver = BrowserPathManager().main(binary, driver)
        # Headless runs (always on Linux) get a fixed default window size.
        if not window_size and (self.is_linux or headless):
            window_size = '1920x1080'
        # Initialization settings.
        if not isinstance(option_arguments, list):
            option_arguments = []
        cdplist = []
        service = Service()
        options = Options()
        self.cdplist = cdplist
        # Delete prompt information of chrome being controlled.
        exclude_switches = ['enable-automation', 'enable-logging', 'disable-translate']
        if debugger_address:
            options.add_experimental_option('debuggerAddress', debugger_address)
        else:
            options.add_experimental_option('excludeSwitches', exclude_switches)
        # Mobile emulation parameter setting start.
        if mobile_emulation and not debugger_address:
            self.w_browser = mobile_emulation.w + 14
            self.h_browser = mobile_emulation.h + 0
            self.w_inner_window = mobile_emulation.w + 0
            self.h_inner_window = mobile_emulation.h - 86
            self.mobile_emulation_screen_w = mobile_emulation.w
            self.mobile_emulation_screen_h = mobile_emulation.h
            window_size = '%s,%s' % (self.w_browser, self.h_browser)
            options.add_experimental_option(
                'mobileEmulation', {
                    'deviceMetrics': {
                        'width': self.w_inner_window,
                        'height': self.h_inner_window,
                        'pixelRatio': 2.75,
                        'touch': True
                    },
                    'userAgent': mobile_emulation.user_agent
                }
            )
            cdplist.append([
                'Emulation.setUserAgentOverride', {
                    'userAgent': mobile_emulation.user_agent,
                    'userAgentMetadata': {
                        'platform': 'Android' if mobile_emulation.user_agent.find('iPhone') == -1 else 'iPhone',
                        'mobile': True,
                        'platformVersion': '',
                        'architecture': '',
                        'model': ''
                    }
                }
            ])
        else:
            self.w_browser = 0
            self.h_browser = 0
            self.w_inner_window = 0
            self.h_inner_window = 0
            self.mobile_emulation_screen_w = 0
            self.mobile_emulation_screen_h = 0
        # Mobile emulation parameter setting end.
        # Set browser and webdriver path.
        if driver:
            service.path = driver
        if binary:
            options.binary_location = binary
        # Add webdriver option arguments.
        for i in option_arguments:
            options.add_argument(i)
        # Set headless mode.
        if self.is_linux or headless:
            options.add_argument('--headless=new')
        # Set no-sandbox mode.
        if self.is_linux:
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
            options.add_argument('--disable-gpu')
        # Set language of browser (only when one is given).
        if lang:
            options.add_argument('--lang=%s' % lang)
            hasattr(options, 'set_preference') and options.set_preference('intl.accept_languages', lang)
        # Set mute.
        if mute:
            options.add_argument('--mute-audio=true')
            hasattr(options, 'set_preference') and print('Warning: Do not support mute audio currently.', file=sys.stderr)
        # Set no images mode.
        if no_images:
            options.add_argument('--blink-settings=imagesEnabled=false')
            hasattr(options, 'set_preference') and print('Warning: Do not support disable images currently.', file=sys.stderr)
        # Set default user agent.
        if user_agent:
            options.add_argument('--user-agent=%s' % user_agent)
            hasattr(options, 'set_preference') and options.set_preference('general.useragent.override', user_agent)
        # Set http proxy for browser.
        if http_proxy:
            options.add_argument('--proxy-server=http://%s' % http_proxy)
        # Set browser window size before startup.
        if not debugger_address:
            if window_size:
                options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ','))
            else:
                options.add_argument('--start-maximized')
        # Start the backend and optimize it.
        if driver_classes == 1 and backends_address is not None:
            backend_parts = backends_address.split(':')
            for key, value in self._setup_backend({'addr': str(backend_parts[0]), 'port': int(backend_parts[1])}).items():
                options.set_capability(key, value)
            try:
                self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
                self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
                self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
            except AttributeError:
                pass
        else:
            # Stand-in that swallows every backend call.
            self.backend = EmptyMethod()
        # Start the browser.
        super().__init__(service=service, options=options)
        if mobile_emulation:
            cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}])
            cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}])
            cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}])
        # Set the request and response interceptor.
        # ``backend`` is always set by now, so the missing backend is
        # detected by its stand-in type rather than by attribute presence.
        if req_interceptor:
            try:
                isinstance(self.backend, EmptyMethod) and print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
                self.request_interceptor = req_interceptor
            except AttributeError:
                pass
        if res_interceptor:
            try:
                isinstance(self.backend, EmptyMethod) and print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
                self.response_interceptor = res_interceptor
            except AttributeError:
                pass
        # Sync set http proxy for Selenium-Wire backend.
        if http_proxy:
            self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy}
        # Set browser window size after startup, by default, there will be full screen display window.
        if window_size:
            try:
                self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(','))
            except Exception:
                pass
        # else:
        #     self.maximize_window()
        if window_site:
            try:
                self.set_window_position(*window_site.replace("\x20", '').split(','))
            except Exception:
                pass
        # Sets a sticky timeout to implicitly wait for an element to be found.
        self.implicitly_wait(10)
        # Set the amount of time to wait for a page load to complete.
        self.set_page_load_timeout(25)
        # Open the default page.
        if home:
            self.open(home)

    @property
    def title(self) -> str:
        """
        Return current title, or an empty string after quitting or while an alert is open.
        """
        if self.exited:
            return ''
        self._update_tab_auto_switch()
        if self.current_alert is None:
            return super().title
        return ''

    @property
    def current_url(self) -> str:
        """
        Return current url, or an empty string after quitting or while an alert is open.
        """
        if self.exited:
            return ''
        self._update_tab_auto_switch()
        if self.current_alert is None:
            return super().current_url
        return ''

    @property
    def current_alert(self):
        """
        Return current alert object, or None when no alert is open or the browser has quit.
        """
        if self.exited:
            return None
        try:
            alert = Alert(self)
            # Reading the text fails when no alert is open.
            self.execute(Command.W3C_GET_ALERT_TEXT)
            return alert
        except Exception:
            return None

    @property
    def current_alert_text(self) -> str:
        """
        Return current alert content, or an empty string when there is none.
        """
        if self.exited:
            return ''
        try:
            return Alert(self).text
        except Exception:
            return ''

    @property
    def window_inner_size(self) -> tuple:
        """
        Return the page window inner size as ``(width, height)``.
        """
        size = self.execute_script('return [window.innerWidth, window.innerHeight];')
        return size[0], size[1]

    def open(self, url=None):
        """
        Open the URL, simulate into the URL in the address bar and jump, the new page has no Referrer.
        """
        if self.exited:
            return None
        self._update_tab_auto_switch()
        self._update_cdp_command()
        return self.get(url)

    def turn(self, url=None):
        """
        Simulation "window.location.href" jumps, the new page has Referrer.
        """
        if self.exited:
            return None
        return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None)

    def wait(self, secs: int | float = 1):
        """
        Will sleep waiting.

        Sleeps in one-second slices; an interrupt while sleeping quits the browser.
        """
        number_int = int(secs)
        number_float = secs - number_int
        try:
            for i in range(number_int):
                time.sleep(1)
            # time.sleep() rejects negative values.
            if number_float > 0:
                time.sleep(number_float)
        except (KeyboardInterrupt, InterruptedError):
            print('Interrupted', file=sys.stderr)
            self.quit()

    def quit(self):
        """
        Exit the browser.

        Every tab is closed (the current one last) after dismissing up to five
        alerts on it, then the driver session is ended. Errors are ignored.
        """
        if self.exited:
            return None
        self.exited = True
        try:
            tab_handles = list(self.window_handles)
            tab_current = self.current_window_handle
            if tab_current in tab_handles:
                tab_handles.remove(tab_current)
            tab_handles.append(tab_current)
            for tab in tab_handles:
                try:
                    self.switch_to.window(tab)
                    for _ in range(5):
                        try:
                            # ``current_alert`` returns None once ``exited`` is
                            # set, so the alert is addressed directly.
                            Alert(self).dismiss()
                        except Exception:
                            break
                    self.close()
                except NoSuchWindowException:
                    pass
        except Exception:
            pass
        try:
            super().quit()
        except Exception:
            pass

    def quit_backend(self):
        """
        Exit the browser backend.

        Ends the driver session without closing the tabs one by one.
        """
        if self.exited:
            return None
        self.exited = True
        try:
            super().quit()
        except Exception:
            pass

    def find(self, path, wait_for=False, timeout: float = 5.0, freq: float = 0.5, delay: float = 0.0) -> WebElement:
        """
        Use XPath to find an element.

        :param path: XPath expression.
        :param wait_for: wait until the element is present instead of looking it up once.
        :param timeout: maximum wait in seconds (with ``wait_for``).
        :param freq: polling interval in seconds (with ``wait_for``).
        :param delay: pause after the lookup; the element is then looked up again.
        """
        if wait_for:
            element = self.webdriver_wait(timeout, freq).until(EC.presence_of_element_located((By.XPATH, path)))
        else:
            element = self.find_element(By.XPATH, path)
        if delay:
            self.wait(delay)
            element = self.find_element(By.XPATH, path)
        self._element_highlight(element, '#F8BE5F')
        return element

    def find_mult(self, path) -> list:
        """
        Use XPath to find elements.
        """
        element = self.find_elements(By.XPATH, path)
        for this_element in element:
            self._element_highlight(this_element, '#F8BE5F')
        return element

    def find_mult_random_choice(self, path) -> WebElement:
        """
        Use XPath to find elements then random choice one.

        Raises IndexError when nothing matches.
        """
        element = self.find_elements(By.XPATH, path)
        element = random.choice(element)
        self._element_highlight(element, '#F8BE5F')
        return element

    def find_element_by(self, sentence):
        """
        Custom find element, pass into a tuple or list.
        """
        element = self.find_element(*sentence)
        self._element_highlight(element, '#F8BE5F')
        return element

    def click(self, element):
        """
        Click element.
        """
        self._element_highlight(element, '#FF0000')
        element.click()

    def click_simulate(self, element):
        """
        Click element for simulate.
        """
        self._element_click_effect(element)
        self.action_chains().reset_actions()
        self.action_chains().click(element).perform()
        self.wait(0.1)

    def touch(self, x, y):
        """
        Click on the coordinate.
        """
        self.action_chains().reset_actions()
        self.action_chains().move_by_offset(x, y).click().perform()
        self.wait(0.1)

    def input(self, element, content):
        """
        Enter the content to the element.
        """
        self._element_highlight(element, '#00B6F1')
        self.action_chains().reset_actions()
        self.action_chains().send_keys_to_element(element, content).perform()
        self.wait(0.1)

    def mouse(self, element):
        """
        Park the mouse here.
        """
        self._element_highlight(element, '#49DC07')
        self.action_chains().reset_actions()
        self.action_chains().move_to_element(element).perform()

    def scroll(self):
        """
        Scroll page by one viewport height.
        """
        self.action_chains().reset_actions()
        self.action_chains().scroll_by_amount(0, self.execute_script('return document.documentElement.clientHeight;')).perform()
        self.wait(0.8)

    def scroll_to(self, pos: int | str):
        """
        Scroll to the specified location.

        A positive integer is a vertical offset, 0 is the top and -1 the
        bottom of the page; anything else does not scroll.
        """
        if isinstance(pos, int) and pos > 0:
            self.execute_script('window.scrollTo(0, arguments[0]);', pos)
        elif pos == 0:
            self.execute_script('window.scrollTo(0, 0);')
        elif pos == -1:
            self.execute_script('window.scrollTo(0, document.body.scrollHeight);')
        self.wait(0.8)

    def scroll_to_element(self, element):
        """
        Scroll to the specified element location.
        """
        self.action_chains().reset_actions()
        self.action_chains().scroll_to_element(element).perform()
        self.wait(0.8)

    def fetch(self, url: str, options: dict, cover_options: dict = None):
        """
        Sending http requests using fetch.

        :param url: request URL.
        :param options: options object handed to ``fetch``.
        :param cover_options: entries overriding ``options``.
        :return: a FetchResult; it keeps its defaults when the request fails.
        """
        return FetchResult(self.execute_async_script('''
var _callback = arguments[arguments.length - 1];
var _url = arguments[0];
var _options = arguments[1];
var _data = {};
fetch(_url, _options)
.then((response) => {
_data.status = response.status;
let headers = {};
response.headers.forEach((value, name) => {
headers[name] = value;
});
_data.header = headers;
return response.text();
})
.then((result) => {
_data.result = result;
_callback(_data);
})
.catch((error) => {
console.error(error);
_callback(null);
});
''', url, {**options, **(cover_options or {})}))

    def frame_switch_to(self, element_of_frame):
        """
        Switch frame to the specified frame element.
        """
        self.switch_to.frame(element_of_frame)
        self.wait(0.2)

    def frame_switch_to_default(self):
        """
        Switch to the default frame.
        """
        self.switch_to.default_content()
        self.wait(0.2)

    def tab_create(self, url: str = None):
        """
        Create a new tab and open the URL.
        """
        self.switch_to.new_window('tab')
        self._update_cdp_command()
        if url:
            self.open(url)

    def tab_switch(self, tab: int | str = None):
        """
        Switch the browser tab page.

        :param tab: index of the tab, ``PositionTab.Prev`` or ``PositionTab.Next``;
            any other value re-selects the current tab.
        """
        handles = self.window_handles
        lengths = len(handles)
        current = handles.index(self.current_window_handle)
        if isinstance(tab, int):
            handle = tab
        elif tab == PositionTab.Prev:
            # Index -1 wraps around to the last tab.
            handle = current - 1
        elif tab == PositionTab.Next:
            handle = (current + 1) % lengths
        else:
            handle = current
        self.switch_to.window(handles[handle])
        self.wait(0.2)
        self._update_cdp_command()

    def tab_switch_prev(self):
        """
        Switch to the previous tab.
        """
        self.tab_switch(PositionTab.Prev)

    def tab_switch_next(self):
        """
        Switch to next tab.
        """
        self.tab_switch(PositionTab.Next)

    def tab_cancel(self):
        """
        Close the current browser tab page.

        Afterwards the previous tab is selected, or the first remaining one
        when the closed tab was the first.
        """
        handles = self.window_handles
        if not handles:
            return
        current = handles.index(self.current_window_handle)
        self.close()
        remaining = handles[:current] + handles[current + 1:]
        if remaining:
            # Always leave the driver on an open window; otherwise the next
            # command fails with "no such window".
            self.switch_to.window(remaining[max(current - 1, 0)])
        self.wait(0.2)

    def tab_cancel_all(self):
        """
        Close all the browser tab page.
        """
        for _ in self.window_handles:
            self.tab_cancel()

    def force_display_element(self, element):
        """
        Make hidden element visible and interactive.
        """
        # ``hidden`` is a boolean attribute: any value, even "false", keeps
        # it set, so it has to be removed.
        self.execute_script(
            'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.removeAttribute("hidden");', element
        )

    def screenshot(self) -> bytes:
        """
        Screenshot as bytes.
        """
        return self.get_screenshot_as_png()

    def action_chains(self) -> ActionChains:
        """
        Return ActionChains object.
        """
        return ActionChains(self)

    def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None):
        """
        Return WebDriverWait object.
        """
        return WebDriverWait(
            driver=self,
            timeout=timeout,
            poll_frequency=poll_frequency,
            ignored_exceptions=ignored_exceptions
        )

    def _element_highlight(self, element=None, color='#ff0000', dura=2500):
        """
        Make the element highlight.

        :param element: element to decorate; a falsy value does nothing.
        :param color: hexadecimal border color, also used (translucent) as background.
        :param dura: milliseconds until the original style is restored.
        """
        if not element:
            return False
        r, g, b = ColorUtils.hex2rgb(color)
        self.execute_script('''
let e=arguments[0];
try{
let o=[e.style.background||null,e.style.border||null];
e.style.border="1px solid %s";e.style.background="rgba(%s,%s,%s,0.2)";
if(!e.prominent){
e.prominent=true;
setTimeout(function(args){try{args[0].prominent=null;args[0].style.background=args[1][0];args[0].style.border=args[1][1]}catch(e){}},%s,[e,o]);
}
}catch(e){}
''' % (color, r, g, b, dura), element
        )

    def _element_click_effect(self, element=None, x: int = 0, y: int = 0):
        """
        Make a coordinate click effect.

        The effect is centred on ``element`` or, without one, on ``x``/``y``.
        """
        self.execute_script('''
let e=arguments[0];
let r;
let x;
let y;
if(e!==null){
r=e.getBoundingClientRect();
x=r.left+r.width/2+"px";
y=r.top+r.height/2+"px";
}
else{
x=arguments[1]+"px";
y=arguments[2]+"px";
}
let c=document.createElement("div");
c.style="width:%spx;height:%spx;border-radius:50%%;background-color:rgba(255,0,0,0.18);position:absolute;transform:translate(-50%%,-50%%);transition:opacity 0.5s;border:1px solid #ff3c3c;pointer-events:none";
c.style.zIndex=9999;
c.style.left=x;c.style.top=y;
document.body.appendChild(c);
setTimeout(function(){c.style.opacity=0;setTimeout(function(){document.body.removeChild(c)},999)},200);
let w=%s;
let h=%s;
let d=false;
let i=setInterval(function(){
if((w>%s||h>%s)||d){
d=true;
w-=2;
h-=2;
}
else{
w+=5;
h+=5;
}
c.style.width=w+"px";c.style.height=h+"px";
if((w<=12||h<=12)&&d){clearInterval(i)}
},20);
''' % (0, 0, 0, 0, 30, 30), element, x, y
        )

    def _update_cdp_command(self):
        """
        Send every stored CDP command to the current tab.
        """
        for cmd in self.cdplist:
            self.execute_cdp_cmd(*cmd)

    def _update_tab_auto_switch(self):
        """
        Switch to the first tab when the current one no longer answers.
        """
        if self.current_alert is None:
            try:
                self.execute(Command.GET_TITLE)
            except Exception:
                handles = self.window_handles
                if len(handles) > 0:
                    self.switch_to.window(handles[0])
class BrowserPluginRunningParam(dict):
    """
    Read-only parameter bag passed to a plugin's ``running``.

    Entries are readable as items or attributes. A missing item reads as
    None, a missing attribute as a do-nothing callable. Writes are ignored.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @classmethod
    def no_method(cls, *args, **kwargs):
        """
        Accept anything, do nothing, return None.
        """
        return None

    def __setitem__(self, key, value):
        return None

    def __setattr__(self, key, value):
        return None

    def __getitem__(self, item):
        return dict.get(self, item)

    def __getattr__(self, item):
        # Reached only when normal attribute lookup fails.
        if item in self:
            return dict.__getitem__(self, item)
        return self.no_method
class BrowserPluginParent:
    """
    Base class of browser plugins.

    ``run`` executes ``running`` in a separate thread and hands it a
    BrowserPluginRunningParam with the driver, the ``message`` and ``logging``
    callbacks and the requirements. Subclasses override ``running``.
    """
    id = None
    name = None
    requirements = []

    def __init__(self):
        self.app_id = None
        self.thread = None
        self.run_task_exception = None
        self.user_id = None
        self.instant_message = None

    def _run_task(self, *args, **kwargs):
        """
        Thread target: run the plugin, then record and report any exception.
        """
        try:
            self.running(*args, **kwargs)
        except Exception as e:
            self.run_task_exception = e
            self.message_except(e)

    def message(self, message=None):
        """
        Print ``message``, show it as a notification (up to 4096 characters)
        and forward it through ``instant_message`` when that is set.
        """
        print('[%s]: %s' % (str(self.app_id), message), file=sys.stderr)
        if len(str(message)) <= 4096:
            notification_send(app_id=self.app_id, title=self.name, message='%s' % (message,))
            self.instant_message and self.instant_message(json.dumps({
                'time': int(time.time()), 'type': 'message', 'data': {'user_id': str(self.user_id), 'content': str(message), 'title': str(self.app_id)}
            }))

    def message_except(self, exception_info):
        """
        Show ``exception_info`` as a notification.
        """
        notification_send(app_id=self.app_id, title='%s ' % (self.name,), message='%s' % (exception_info,))

    def logging(self, logging=None):
        """
        Print ``logging`` and forward it (up to 1024 characters) through
        ``instant_message`` when that is set.
        """
        print('[%s]: %s' % (str(self.app_id), logging), file=sys.stderr)
        if len(str(logging)) <= 1024:
            self.instant_message and self.instant_message(json.dumps({
                'time': int(time.time()), 'type': 'logging', 'data': {'user_id': str(self.user_id), 'content': str(logging)}
            }))

    def run(self, app_id, user_id, driver, requirements, instant_message) -> threading.Thread:
        """
        Start the plugin in a new non-daemon thread and return that thread.
        """
        self.app_id, self.user_id, self.instant_message = app_id, user_id, instant_message
        params = BrowserPluginRunningParam({'driver': driver, 'message': self.message, 'logging': self.logging, 'requirements': requirements})
        thread = threading.Thread(target=self._run_task, args=(params,))
        self.thread = thread
        thread.daemon = False
        thread.start()
        return thread

    def interrupt(self):
        """
        Ask the running plugin thread to stop by raising SystemExit inside it.

        Best effort: failures are ignored and the thread reference is dropped.
        """
        thread = self.thread
        try:
            if thread and thread.is_alive():
                # The C API takes the thread id as ``unsigned long``.
                thread_id = ctypes.c_ulong(thread.ident)
                affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
                if affected > 1:
                    # More than one thread state was hit: cancel the pending
                    # exception again rather than stop unrelated threads.
                    ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        except Exception:
            pass
        finally:
            self.thread = None

    def state(self):
        """
        Return whether the plugin thread is alive, or None when there is no thread.
        """
        return self.thread.is_alive() if self.thread else None

    @staticmethod
    def running(param: 'BrowserPluginRunningParam'):
        """
        Default plugin body: wait five seconds on the driver.
        """
        param.driver.wait(5)
class BrowserPluginFileTest(BrowserPluginParent):
    """Built-in test plugin with one ``file`` requirement."""

    name = '单个文件上传测试'
    requirements = [{'type': 'file', 'html': '上传文件'}]

    @staticmethod
    def running(param):
        """Report the collected requirement values through ``param.message``."""
        collected = param.requirements
        param.message(collected)
class BrowserPluginMultFileTest(BrowserPluginParent):
    """Built-in test plugin with one ``file-mult`` requirement."""

    name = '多个文件上传测试'
    requirements = [{'type': 'file-mult', 'html': '上传文件'}]

    @staticmethod
    def running(param):
        """Report the collected requirement values through ``param.message``."""
        collected = param.requirements
        param.message(collected)
class BrowserPluginTextTest(BrowserPluginParent):
    """Built-in test plugin with one ``text`` requirement."""

    name = '单行文本输入测试'
    requirements = [{'type': 'text', 'html': '输入文本'}]

    @staticmethod
    def running(param):
        """Report the collected requirement values through ``param.message``."""
        collected = param.requirements
        param.message(collected)
class BrowserPluginMultTextTest(BrowserPluginParent):
    """Built-in test plugin with one ``text-mult`` requirement."""

    name = '多行文本输入测试'
    requirements = [{'type': 'text-mult', 'html': '输入文本'}]

    @staticmethod
    def running(param):
        """Report the collected requirement values through ``param.message``."""
        collected = param.requirements
        param.message(collected)
class BrowserPluginLoggingTest(BrowserPluginParent):
    """Built-in test plugin that sends its first requirement value to the log channel."""

    name = '在线日志输出测试'
    requirements = [{'type': 'text-mult', 'html': '输入内容'}]

    @staticmethod
    def running(param):
        """Pass the first requirement value to ``param.logging``."""
        first_value = param.requirements[0]
        param.logging(first_value)
class BrowserPluginMessageTest(BrowserPluginParent):
    """Built-in test plugin that sends its first requirement value as a message."""

    name = '在线消息弹窗测试'
    requirements = [{'type': 'text-mult', 'html': '输入内容'}]

    @staticmethod
    def running(param):
        """Pass the first requirement value to ``param.message``."""
        first_value = param.requirements[0]
        param.message(first_value)
class BrowserPluginFileCommandDebug(BrowserPluginParent):
    """Debug plugin that executes an uploaded Python script."""

    name = '上传脚本进行调试'
    requirements = [
        {
            'type': 'file',
            'html': '调试'
        }
    ]

    @staticmethod
    def running(param):
        """Read the uploaded script as UTF-8 and execute it.

        The script path is taken from ``param.requirements[0][0]['path']``.
        The executed code sees this function's local names, including
        ``param`` and ``file``.
        """
        file = param.requirements[0][0]['path']
        # Read through a context manager so the handle is closed before the
        # (possibly long-running) script starts.
        with open(file, mode='r', encoding='utf-8') as script:
            source = script.read()
        # NOTE(review): this runs arbitrary uploaded code with the full
        # privileges of the application; keep the endpoint that feeds it
        # restricted to trusted users.
        exec(source)
class BrowserPluginTextCommandDebug(BrowserPluginParent):
    """Debug plugin that executes Python code typed into a text field."""

    name = '输入命令进行调试'
    requirements = [{'type': 'text-mult', 'html': '调试'}]

    @staticmethod
    def running(param):
        """Execute the first requirement value as Python code when it is non-empty."""
        code = param.requirements[0]
        if code:
            exec(code)
class BrowserManagerDataStorage(dict):
    """Dictionary persisted as a JSON file.

    The content is loaded once at construction. Changes live in memory until
    ``save`` writes them back. Reading a missing key yields ``None`` instead
    of raising ``KeyError``.
    """

    def __init__(self, file: str):
        """Load *file* if it exists and holds valid JSON; start empty otherwise."""
        self._data_file = os.path.abspath(file)
        super().__init__(self._get_json_data(self._data_file, {}))

    @staticmethod
    def _get_json_data(file: str, data=None):
        """Return the parsed JSON content of *file*.

        *data* is returned when the file does not exist or is not valid JSON.
        """
        if not os.path.exists(file):
            return data
        try:
            # Context manager so the handle is closed even when parsing fails.
            with open(file=file, mode='r', encoding='utf-8') as f:
                return json.loads(f.read())
        except json.decoder.JSONDecodeError:
            return data

    @staticmethod
    def _put_json_data(file: str, data=None):
        """Serialise *data* to *file* and return the number of characters written.

        Raises whatever ``json.dumps`` raises for non-serialisable data; in
        that case the existing file is left untouched.
        """
        # Serialise before opening: mode 'w' truncates the file immediately,
        # so a serialisation error after the open would destroy stored data.
        payload = json.dumps(data, indent=4, ensure_ascii=True)
        with open(file=file, mode='w', encoding='utf-8') as f:
            flock(f, LOCK_EX)
            try:
                res = f.write(payload)
                # Push the buffer out while the lock is still held.
                f.flush()
            finally:
                flock(f, LOCK_UN)
        return res

    def __getitem__(self, item):
        """Return the stored value, or ``None`` when *item* is absent."""
        if item in self:
            return super().__getitem__(item)
        return None

    def __setitem__(self, key, value):
        """Store *value* in memory; call ``save`` to persist it."""
        super().__setitem__(key, value)

    def save(self):
        """Write the current content to the backing file."""
        self._put_json_data(self._data_file, self)
class BrowserManagerUserRunning:
    """Runtime state of one browser user.

    Holds the user's identity, the attached driver and plugin, and a cached
    snapshot (url, title, alert text, window handles) of the browser.
    Passing an existing instance as *running* creates a second object that
    shares the first one's attribute dictionary.
    """

    def __init__(
            self,
            running=None,
            user_id: str = None,
            user_name: str = None,
            user_data_dir: str = None,
            remote_debugging_port: int = None
    ):
        if isinstance(running, type(self)):
            # Share state with the wrapped record instead of copying it.
            self.__dict__ = running.__dict__
            return
        self.active = 1
        self.update_status_last_time = 0
        self.user_id = user_id
        self.user_name = user_name
        self.user_data_dir = user_data_dir
        self.remote_debugging_port = remote_debugging_port
        self.driver = None
        self.plugin = None
        self.is_running_property = False
        self.url = None
        self.title = None
        self.alert = None
        self.window_handles = None
        self.update_status_details()

    @property
    def is_running(self):
        """Refresh the running flag from the lock file, then return it."""
        self.update_status_running()
        return self.is_running_property

    @is_running.setter
    def is_running(self, value):
        self.is_running_property = value

    @property
    def is_user_running(self):
        """Report whether the profile's ``lockfile`` refuses to be opened.

        ``True`` only when opening it raises ``PermissionError``; a readable
        or missing file yields ``False``.
        """
        lock_path = os.path.join(self.user_data_dir, 'lockfile')
        try:
            open(lock_path, mode='r').close()
        except FileNotFoundError:
            return False
        except PermissionError:
            return True
        return False

    @func_set_timeout(0.05)
    def _get_window_handles(self):
        return list(self.driver.window_handles)

    @func_set_timeout(0.05)
    def _get_current_alert_text(self):
        return str(self.driver.current_alert_text)

    @func_set_timeout(0.05)
    def _get_title(self):
        return str(self.driver.title)

    @func_set_timeout(0.05)
    def _get_current_url(self):
        return str(self.driver.current_url)

    def update_status_running(self):
        """Fire the stopped or running trigger according to the lock file."""
        if self.is_user_running is False:
            self.chrome_stopped_trigger()
        else:
            self.chrome_running_trigger()

    def update_status_details(self):
        """Refresh the running flag and, at random intervals, the cached details.

        The details are re-read only when a randomly drawn 3-12 second
        interval has elapsed since the last refresh and a driver is attached
        to an active record. Timeouts and driver errors are ignored, leaving
        the values read so far in place.
        """
        self.update_status_running()
        elapsed = round(time.time(), 3) - self.update_status_last_time
        if not (elapsed > round(random.uniform(3, 12), 3) and self.driver and self.active):
            return
        try:
            self.window_handles = self._get_window_handles()
            self.alert = self._get_current_alert_text()
            self.title = self._get_title()
            self.url = self._get_current_url()
        except (FunctionTimedOut, Exception):
            pass
        self.update_status_last_time = round(time.time(), 3)

    def chrome_running_trigger(self):
        """Mark the user as running unless already marked."""
        if self.is_running_property:
            return None
        self.is_running = True

    def chrome_stopped_trigger(self):
        """Mark the user as stopped and release everything tied to the browser.

        Does nothing when the user is already marked as stopped. The plugin
        interrupt and the driver backend shutdown each run on their own
        thread.
        """
        if not self.is_running_property:
            return None
        self.is_running = False
        self.url = self.title = self.alert = self.window_handles = None
        if self.plugin:
            stale_plugin = self.plugin
            threading.Thread(target=stale_plugin.interrupt).start()
            self.plugin = None
        if self.driver:
            stale_driver = self.driver
            threading.Thread(target=stale_driver.quit_backend).start()
            self.driver = None

    def status(self):
        """Return a status dictionary and start a background detail refresh."""
        threading.Thread(target=self.update_status_details).start()
        snapshot = {
            'user_id': self.user_id,
            'user_name': self.user_name,
            'user_data_dir': self.user_data_dir,
            'remote_debugging_port': self.remote_debugging_port,
            'is_running': self.is_running,
            'current_plugin_executing': self.plugin.id if self.plugin and self.plugin.state() else None,
            'url': self.url,
            'title': self.title,
            'alert': self.alert,
            'window_handles': self.window_handles
        }
        return snapshot

    def app_id(self):
        """Return ``'<user_id>|<user_name>'``."""
        return '%s|%s' % (self.user_id, self.user_name)

    def set_driver(self, driver: Browser = None):
        self.driver = driver

    def set_plugin(self, plugin: BrowserPluginParent = None):
        self.plugin = plugin

    def set_user_name(self, user_name: str):
        self.user_name = user_name
class BrowserManager:
    """Registry and lifecycle controller for browser users and their plugins.

    User definitions are persisted in ``manager_data_file``; the runtime
    state of each user lives in ``self.user_running``. Public operations on a
    user are guarded by a per-user busy flag: a second operation on the same
    user raises ``RuntimeError('Busy.')`` while the first is in progress.
    """

    def __init__(self, runner, driver: str, binary: str, manager_data_file: str, browser_data_home: str, browser_init_home: str, use_selenium_wire: int = 0):
        """Validate the executables, load the user registry and build runtime state.

        Raises:
            FileNotFoundError: when *driver* or *binary* does not exist.
        """
        self.runner = runner
        if not os.path.exists(driver):
            raise FileNotFoundError('The driver executable file does not exist.')
        if not os.path.exists(binary):
            raise FileNotFoundError('The binary executable file does not exist.')
        self.driver = driver
        self.binary = binary
        self.manager_data_file = manager_data_file
        self.browser_data_home = browser_data_home
        self.browser_init_home = browser_init_home
        self.use_selenium_wire = use_selenium_wire
        self.threading_lock = threading.RLock()
        self.debugging_port_range = range(60000, 60256)
        self.data_storage = BrowserManagerDataStorage(manager_data_file)
        # Also covers a storage file that exists but lacks the user table,
        # which would otherwise crash on ``None.keys()`` below.
        if self.data_storage['browser_user'] is None:
            self.data_storage['browser_user'] = {}
            self.data_storage.save()
        if not os.path.exists(browser_data_home):
            os.makedirs(browser_data_home)
        self.geometry_config = {}
        self.user_running = {user_id: self._initialize_user_running(user_id) for user_id in self.data_storage['browser_user'].keys()}
        self.user_in_operation = []
        self.plugins_int = {}
        self.plugins_ext = {}

    def _generate_remote_debugging_port(self):
        """Pick a random port of ``debugging_port_range`` not assigned to any user.

        Raises ``IndexError`` when every port of the range is taken.
        """
        exist_ports = [entry['remote_debugging_port'] for entry in self.data_storage['browser_user'].values()]
        return random.choice([port for port in self.debugging_port_range if port not in exist_ports])

    def _update_user_running(self):
        """Refresh the running flag of every user."""
        # Snapshot: other threads add and remove users while this iterates.
        for user_id, running in list(self.user_running.items()):
            if not running:
                continue
            run = BrowserManagerUserRunning(running)
            run.update_status_running()

    def _get_user_name(self, user_id: str):
        return str(self.data_storage['browser_user'][user_id]['user_name'] or '')

    def _get_user_data_dir(self, user_id: str):
        return os.path.join(self.browser_data_home, self.data_storage['browser_user'][user_id]['user_data_dir'])

    def _get_user_remote_debugging_port(self, user_id: str):
        return int(self.data_storage['browser_user'][user_id]['remote_debugging_port'])

    def _get_user_app_id(self, user_id: str):
        """Return the user's display name, falling back to the user ID."""
        user_name = self.data_storage['browser_user'][user_id]['user_name']
        return user_name or user_id

    def _initialize_user_running(self, user_id: str):
        """Build the runtime record of *user_id* from the stored definition."""
        return BrowserManagerUserRunning(
            user_id=user_id,
            user_name=self._get_user_name(user_id),
            user_data_dir=self._get_user_data_dir(user_id),
            remote_debugging_port=self._get_user_remote_debugging_port(user_id)
        )

    @property
    def plugins(self) -> dict:
        """All registered plugin classes; external ones win on ID clashes."""
        return {**self.plugins_int, **self.plugins_ext}

    def handle_interrupt(self):
        """Best-effort shutdown of the driver backend of every online user."""
        print('Received interrupt. Ending all backend...', file=sys.stderr)
        online_users = self.user_ids_online()
        for user_id in online_users:
            try:
                driver = BrowserManagerUserRunning(self.user_running[user_id]).driver
                if driver:
                    threading.Thread(target=driver.quit_backend).start()
            except Exception:
                # Shutdown path: one broken user must not block the others.
                pass

    @staticmethod
    def req_interceptor(req: SeleniumWireRequest):
        """Request hook; intentionally does nothing."""
        pass

    @staticmethod
    def res_interceptor(req: SeleniumWireRequest, res: SeleniumWireResponse):
        """Response hook: log method, status code and URL to stderr."""
        try:
            print('[%s] %s %s %s' % (str(req.method).ljust(7), 'HTTP', res.status_code, req.url), file=sys.stderr)
        except Exception as e:
            print(e, file=sys.stderr)

    def run_browser(self, user_data_dir: str, remote_debugging_port: int, proxy_server: str = None):
        """Launch the browser binary and return the ``subprocess.Popen`` object.

        Optional arguments are only added to the command line when truthy.
        """
        options = [
            self.binary,
            '--disable-background-networking',
            '--disable-desktop-notifications',
            '--disable-component-update',
            '--no-default-browser-check',
            '--no-first-run',
            '--hide-crash-restore-bubble'
        ]
        if user_data_dir:
            options.append('--user-data-dir=%s' % (user_data_dir,))
        if remote_debugging_port:
            options.append('--remote-debugging-port=%s' % (remote_debugging_port,))
        if proxy_server:
            options.append('--proxy-server=%s' % (proxy_server,))
        # The argument list is passed straight to the OS. Combining a list
        # with shell=True drops every argument but the first on POSIX and
        # exposes path characters to shell interpretation on Windows.
        return subprocess.Popen(options, shell=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)

    def restore_last_status(self):
        """Re-attach a driver to every user whose browser is already running."""
        for user_id, running in list(self.user_running.items()):
            if not running:
                continue
            run = BrowserManagerUserRunning(running)
            run.update_status_running()
            if run.is_running and run.driver is None:
                threading.Thread(target=self.user_run, args=(user_id,)).start()
                print('Restoring user control: %s' % (user_id,), file=sys.stderr)

    def load_plugins(self, plugins, is_external=0):
        """Replace the internal or external plugin registry with *plugins*.

        Every class gets an ``id`` of the form ``<prefix>_<index>_<md5 of the
        class name>`` with prefix ``00`` (internal) or ``01`` (external).
        ``None`` leaves the registry untouched.
        """
        if plugins is None:
            return None
        if not is_external:
            plugin_dict, plugin_id_prefix = self.plugins_int, '00'
        else:
            plugin_dict, plugin_id_prefix = self.plugins_ext, '01'
        plugin_dict.clear()
        for index, plugin_class in enumerate(plugins, start=1):
            name_digest = hashlib.md5(str(plugin_class.__name__).encode(encoding='utf-8')).hexdigest()
            plugin_id = '%s_%s_%s' % (plugin_id_prefix, '{:02d}'.format(index), name_digest)
            plugin_class.id = plugin_id
            plugin_dict[plugin_id] = plugin_class

    def load_plugins_from_external_module(self):
        """Load plugin classes from ``Plugin*.py`` files of the package directories.

        Classes of all existing directories are collected first and registered
        in a single ``load_plugins`` call. Registering per directory would let
        each call clear what the previous directory contributed. Import
        errors are printed and skipped. When no directory exists the current
        registry is kept.
        """
        module_home_list = [os.path.join(os.path.dirname(__file__), 'Packages'), os.path.join(self.runner.app_data, 'Packages')]
        collected = []
        scanned_any = False
        for module_home in module_home_list:
            if not os.path.exists(module_home):
                continue
            scanned_any = True
            try:
                for module_path in glob.glob(os.path.join(module_home, 'Plugin*.py')):
                    try:
                        print('Load plugins from \"%s\"' % (module_path,), file=sys.stderr)
                        plugins_modules = import_module(module_path)
                        # Build the complete list first so a failing module
                        # contributes nothing at all.
                        plugins_classes = [
                            type(name, (cls, BrowserPluginParent), {})
                            for name, cls in plugins_modules.__dict__.items()
                            if name.startswith('BrowserPlugin') and inspect.isclass(cls)
                        ]
                        collected.extend(plugins_classes)
                    except Exception as e:
                        print(e, file=sys.stderr)
            except Exception as e:
                print(e, file=sys.stderr)
        if not scanned_any:
            return
        try:
            self.load_plugins(collected, is_external=1)
        except Exception as e:
            print(e, file=sys.stderr)

    def update_geometry_config(self, screen_w: int, screen_h: int, window_w: int, window_h: int, window_x: int, window_y: int):
        """Store the control window geometry and derive the browser window geometry.

        The browser window is placed to the right of the control window with
        the same height. Its width is the remaining screen width, capped at
        1.78 times the height.
        """
        remaining_w = screen_w - window_w
        self.geometry_config = {
            'screen_w': screen_w,
            'screen_h': screen_h,
            'control_window_w': window_w,
            'control_window_h': window_h,
            'control_window_x': window_x,
            'control_window_y': window_y,
            'browser_window_w': round(window_h * 1.78) if (remaining_w / window_h) > 1.78 else remaining_w,
            'browser_window_h': window_h,
            'browser_window_x': window_x + window_w,
            'browser_window_y': window_y
        }

    def user_operate_starting(self, user_id: str):
        """Set the busy flag of *user_id*.

        Raises:
            RuntimeError: ``'Busy.'`` when the flag is already set.
        """
        user_id = str(user_id)
        # NOTE(review): the check-and-append is not guarded by
        # ``threading_lock`` (a locked variant existed only as commented-out
        # code). Confirm whether that is deliberate.
        if user_id in self.user_in_operation:
            raise RuntimeError('Busy.')
        self.user_in_operation.append(user_id)

    def user_operate_complete(self, user_id: str):
        """Clear the busy flag of *user_id*."""
        user_id = str(user_id)
        with self.threading_lock:
            while user_id in self.user_in_operation:
                self.user_in_operation.remove(user_id)

    def is_user_data_occupied(self, user_id: str):
        """Report whether the profile's ``lockfile`` refuses to be opened."""
        lock_path = os.path.join(self._get_user_data_dir(user_id), 'lockfile')
        try:
            open(lock_path, mode='r').close()
        except FileNotFoundError:
            return False
        except PermissionError:
            return True
        return False

    def is_port_in_use(self, port: int):
        """Return ``True`` when *port* accepts a local TCP connection.

        Ports outside ``debugging_port_range`` are always reported as in use.
        """
        if port not in self.debugging_port_range:
            return True
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.08)
            try:
                return s.connect_ex(('127.0.0.1', port)) == 0
            except socket.timeout:
                return False

    # In every guarded operation below the busy flag is acquired BEFORE the
    # ``try``. If acquisition sat inside it, a call rejected with 'Busy.'
    # would run the ``finally`` clause and clear the flag owned by the
    # operation that is still in progress.

    def plugin_run(self, user_id: str, plugin_id: str, requirements=None):
        """Start plugin *plugin_id* for the running user *user_id*."""
        self.user_operate_starting(user_id)
        try:
            if user_id not in self.user_ids_online():
                raise FileExistsError('User ID is not running.')
            running = BrowserManagerUserRunning(self.user_running[user_id])
            if running.plugin and running.plugin.state():
                raise Exception('The plugin is running, please stop it first.')
            driver = running.driver
            if driver is None:
                raise Exception('No driver object.')
            plugin = self.plugins[plugin_id]()
            plugin.run(self._get_user_app_id(user_id), user_id, driver, requirements, self.runner.web_server.websocket_connection_manager.send_broadcast_use_sync)
            running.set_plugin(plugin)
        finally:
            self.user_operate_complete(user_id)

    def plugin_shutdown(self, user_id: str):
        """Interrupt the plugin currently running for *user_id*."""
        self.user_operate_starting(user_id)
        try:
            if user_id not in self.user_ids_online():
                raise FileExistsError('User ID is not running.')
            running = BrowserManagerUserRunning(self.user_running[user_id])
            if not (running.plugin and running.plugin.state()):
                raise Exception('The plugin is not running.')
            running.plugin.interrupt()
        finally:
            self.user_operate_complete(user_id)

    def user_ids(self):
        """Return the IDs of all defined users."""
        return list(self.data_storage['browser_user'].keys())

    def user_ids_online(self):
        """Return the IDs of all users whose browser is currently running."""
        self._update_user_running()
        return [user_id for user_id, running in list(self.user_running.items()) if running and running.is_running]

    def user_all_details(self, user_id: str = None):
        """Return the status of one user, or of all users keyed by ID.

        Each status carries the plugin catalogue under ``'plugins'``. An
        unknown or empty *user_id* yields the full mapping.
        """
        plugins = {plugin_id: {'name': plugin_class.name, 'requirements': plugin_class.requirements} for plugin_id, plugin_class in self.plugins.items()}
        details = {uid: self.user_running[uid].status() for uid in self.user_ids() if uid in self.user_running.keys()}
        details = {uid: {**detail, **{'plugins': plugins}} for uid, detail in details.items()}
        if user_id and user_id in details.keys():
            return details[user_id]
        return details

    def user_set_name(self, user_id: str, user_name: str):
        """Store a display name (at most 24 characters) for *user_id*."""
        self.user_operate_starting(user_id)
        try:
            if user_id not in self.user_ids():
                raise FileExistsError('User ID not exists.')
            if not isinstance(user_name, str) or len(user_name) > 24:
                raise ValueError('Illegal Username.')
            user_name = user_name.strip()
            self.data_storage['browser_user'][user_id]['user_name'] = user_name
            self.data_storage.save()
            self.user_running[user_id].set_user_name(user_name)
        finally:
            self.user_operate_complete(user_id)

    @func_set_timeout(0.45)
    def user_focus_window(self, user_id: str):
        """Minimise the user's browser window, then move it to the configured rectangle.

        Driver errors are ignored.
        """
        if user_id not in self.user_ids():
            raise FileExistsError('User ID not exists.')
        if user_id not in self.user_ids_online():
            raise FileExistsError('User ID is not running.')
        try:
            self.user_running[user_id].driver.minimize_window()
            self.user_running[user_id].driver.set_window_rect(
                self.geometry_config['browser_window_x'],
                self.geometry_config['browser_window_y'],
                self.geometry_config['browser_window_w'],
                self.geometry_config['browser_window_h']
            )
        except Exception:
            pass

    def user_add(self, user_id: str):
        """Define a new user and create its profile directory.

        Raises:
            ValueError: when *user_id* is not 1-16 characters of ``0-9A-Za-z_-``.
            FileExistsError: when the ID is already defined.
        """
        self.user_operate_starting(user_id)
        try:
            # fullmatch: ``re.match`` with a trailing ``$`` would also accept
            # an ID that ends in a newline.
            if not isinstance(user_id, str) or not re.fullmatch('[0-9A-Za-z_-]{1,16}', user_id):
                raise ValueError('Invalid user ID format.')
            if user_id in self.user_ids():
                raise FileExistsError('User ID already exists.')
            basename = 'data_%s' % (user_id,)
            data_dir = os.path.join(self.browser_data_home, basename)
            self.data_storage['browser_user'][user_id] = {
                'user_name': '',
                'user_data_dir': basename,
                'remote_debugging_port': self._generate_remote_debugging_port()
            }
            self.data_storage.save()
            self.user_running[user_id] = self._initialize_user_running(user_id)
            if not os.path.exists(data_dir):
                # Seed the profile from the template directory when there is one.
                if os.path.exists(self.browser_init_home):
                    shutil.copytree(self.browser_init_home, data_dir)
                else:
                    os.makedirs(data_dir)
        finally:
            self.user_operate_complete(user_id)

    def user_del(self, user_id: str):
        """Delete a stopped user together with its profile directory."""
        self.user_operate_starting(user_id)
        try:
            if user_id not in self.user_ids():
                raise FileExistsError('User ID not exists.')
            if user_id in self.user_ids_online():
                raise FileExistsError('User ID is running, please stop it first.')
            data_dir = self._get_user_data_dir(user_id)
            data_dir_deleted = '%s_deleted' % (data_dir,)
            # Rename first, then remove the renamed tree.
            if os.path.exists(data_dir):
                os.rename(data_dir, data_dir_deleted)
            if os.path.exists(data_dir_deleted):
                shutil.rmtree(data_dir_deleted)
            self.data_storage['browser_user'].pop(user_id)
            self.data_storage.save()
            self.user_running.pop(user_id)
        finally:
            self.user_operate_complete(user_id)

    def user_run(self, user_id: str):
        """Launch the user's browser if needed and attach a driver to it."""
        self.user_operate_starting(user_id)
        try:
            if user_id not in self.user_ids():
                raise FileExistsError('User ID not exists.')
            user_data_dir = self._get_user_data_dir(user_id)
            remote_debugging_port = self._get_user_remote_debugging_port(user_id)
            mitmproxy_port = remote_debugging_port - 5000
            running = BrowserManagerUserRunning(self.user_running[user_id])
            running.active = 1
            # A held lock file means the browser is already up; only attach.
            if not self.is_user_data_occupied(user_id):
                self.run_browser(user_data_dir=user_data_dir, remote_debugging_port=remote_debugging_port, proxy_server='127.0.0.1:%s' % (mitmproxy_port,) if self.use_selenium_wire else None)
            driver = Browser(
                driver=self.driver,
                binary=self.binary,
                driver_classes=self.use_selenium_wire,
                window_size=('%s,%s' % (self.geometry_config['browser_window_w'], self.geometry_config['browser_window_h'])) if self.geometry_config else None,
                window_site=('%s,%s' % (self.geometry_config['browser_window_x'], self.geometry_config['browser_window_y'])) if self.geometry_config else None,
                debugger_address='127.0.0.1:%s' % (remote_debugging_port,),
                backends_address='127.0.0.1:%s' % (mitmproxy_port,),
                req_interceptor=self.use_selenium_wire and self.req_interceptor,
                res_interceptor=self.use_selenium_wire and self.res_interceptor
            )
            running.set_driver(driver)
        finally:
            self.user_operate_complete(user_id)

    def user_die(self, user_id: str):
        """Stop the user's plugin and browser, waiting until the profile is released."""
        self.user_operate_starting(user_id)
        try:
            if user_id not in self.user_ids_online():
                raise FileExistsError('User ID is not running.')
            debugging_port = self._get_user_remote_debugging_port(user_id)
            running = BrowserManagerUserRunning(self.user_running[user_id])
            running.active = 0
            driver = running.driver
            plugin = running.plugin
            if plugin and plugin.state():
                plugin.interrupt()
            running.set_driver(None)
            if driver is None:
                raise Exception('No driver object.')
            if not self.is_port_in_use(debugging_port):
                raise Exception('The debug port is not in listening.')
            threading.Thread(target=driver.quit).start()
            # Poll until the browser lets go of the profile lock file.
            while self.is_user_data_occupied(user_id):
                try:
                    time.sleep(0.35)
                except KeyboardInterrupt:
                    pass
        finally:
            self.user_operate_complete(user_id)
class MainWindow(QMainWindow):
    """Fixed-size control window hosting the web panel, plus the tray icon and menu."""

    def __init__(self, runner, app_name: str, app_version: str, scale_rate: float, web_listen_host: str, web_listen_port: int):
        super().__init__()
        self.runner = runner
        # A leftover marker file means the previous run did not exit cleanly.
        try:
            if os.path.exists(self.runner.app_running_file):
                QMessageBox.information(self, 'Warning', 'The application was not exited normally last time. Please wait for recovery.')
                self.runner.web_server.browser_manager.restore_last_status()
        except Exception:
            pass
        finally:
            open(self.runner.app_running_file, mode='w').close()

        self.scale_rate = scale_rate
        self.web_listen_host = web_listen_host
        self.web_listen_port = web_listen_port

        # Window frame and geometry.
        self.setWindowTitle('%s %s' % (app_name, app_version))
        self.setAcceptDrops(True)
        self.setWindowFlags(self.windowFlags() | Qt.WindowMinMaxButtonsHint)
        screen = QDesktopWidget().screenGeometry()
        self.screen_w = screen.width()
        self.screen_h = screen.height()
        self.window_w = round(480 * self.scale_rate)
        self.window_h = round(868 * self.scale_rate)
        self.window_x = 0
        self.window_y = 0
        self.setFixedSize(self.window_w, self.window_h)

        # Embedded web view and its storage locations.
        QNetworkProxyFactory.setUseSystemConfiguration(False)
        self.webview = WebEngineView()
        cache_key = hashlib.md5(bytes('%s_cache' % (__file__,), encoding='utf-8')).hexdigest()[:32]
        self.webview_cache_dir = os.path.join(tempfile.gettempdir(), cache_key)
        web_settings = self.webview.settings()
        for attribute in (
                QWebEngineSettings.ScrollAnimatorEnabled,
                QWebEngineSettings.Accelerated2dCanvasEnabled,
                QWebEngineSettings.WebGLEnabled,
        ):
            web_settings.setAttribute(attribute, True)
        profile = QWebEngineProfile.defaultProfile()
        profile.setPersistentStoragePath(self.webview_cache_dir)
        profile.setCachePath(self.webview_cache_dir)
        self.webview.setZoomFactor(self.scale_rate or 1.0)
        self.setCentralWidget(self.webview)

        # Window icon, tray icon and the periodic tooltip refresh.
        icon_path = os.path.join(os.path.dirname(__file__), 'favicon.ico')
        self.setWindowIcon(QIcon(icon_path))
        self.tray_icon = QSystemTrayIcon(QIcon(icon_path), self)
        self.tray_icon_update_timer = QTimer(self)
        self.tray_icon_update_timer.timeout.connect(self.on_tray_icon_update)
        self.tray_icon_update_timer.start(1750)
        self.tray_icon.activated.connect(self.on_tray_icon_activated)

        # Tray context menu: (label, handler) pairs in display order.
        self.tray_menu = QMenu()
        menu_entries = (
            ('调试:全部打开必应搜索', self.on_debug_all_users_open_home),
            ('调试:全部打开新标签页', self.on_debug_all_users_open_none),
            ('调试:打印环境变量信息', self.on_debug_print_environment),
            ('调试:打印模块搜索路径', self.on_debug_print_module_path),
            ('调试:重新加载外部插件', self.on_debug_reload_external_plugins),
            ('重载面板', self.webview.reload),
            ('显示窗口', self.window_show),
            ('退出', self.exit),
        )
        for label, handler in menu_entries:
            action = QAction(label, self)
            action.triggered.connect(handler)
            self.tray_menu.addAction(action)
        self.tray_icon.setContextMenu(self.tray_menu)
        self.tray_icon.show()

        QShortcut(QKeySequence('F5'), self).activated.connect(self.on_debug_reload_external_plugins)
        self.init()

    def closeEvent(self, event):
        """Hide the window when visible; terminate the application otherwise."""
        event.ignore()
        if self.isVisible():
            self.hide()
            return
        print('Terminating...', file=sys.stderr)
        self.exit()

    def init(self):
        """Show the window, publish its geometry to the browser manager and load the panel."""
        self.show()
        self.window_position_reset()
        manager = self.runner.web_server.browser_manager
        manager.update_geometry_config(
            screen_w=round(self.screen_w / self.scale_rate),
            screen_h=round(self.screen_h / self.scale_rate),
            window_w=round(self.geometry().width() / self.scale_rate),
            window_h=round(self.geometry().height() / self.scale_rate) + 38,
            window_x=self.pos().x(),
            window_y=self.pos().y()
        )
        # The panel is served by the local web server; a wildcard bind
        # address is reached through the loopback address.
        if self.web_listen_host == '0.0.0.0':
            panel_host = '127.0.0.1'
        else:
            panel_host = self.web_listen_host
        self.webview.load(QUrl('http://%s:%s/' % (panel_host, self.web_listen_port)))

    def exit(self):
        """Tear down the web view, stop backends, remove the marker file and quit."""
        self.webview.load(QUrl('about:blank'))
        self.webview.deleteLater()
        self.runner.handle_interrupt()
        try:
            os.remove(self.runner.app_running_file)
        except Exception:
            pass
        QApplication.quit()

    def window_show(self):
        """Restore the window and move it back to its home position."""
        self.showNormal()
        self.window_position_reset()

    def window_hide(self):
        self.hide()

    def window_toggle(self):
        """Bring the window back (currently identical to ``window_show``)."""
        self.window_show()

    def window_position_reset(self):
        """Place the window at its home position, offset by the title bar height."""
        title_bar_h = self.frameGeometry().height() - self.geometry().height()
        self.setGeometry(self.window_x, self.window_y + title_bar_h, self.window_w, self.window_h)

    def on_tray_icon_update(self):
        """Show ``online/total`` user counts in the tray tooltip."""
        manager = self.runner.web_server.browser_manager
        online_count = len(manager.user_ids_online())
        total_count = len(manager.user_ids())
        self.tray_icon.setToolTip('活跃用户:%s/%s' % (online_count, total_count))

    def on_tray_icon_activated(self, reason):
        if reason == QSystemTrayIcon.DoubleClick:
            self.window_toggle()

    def on_debug_all_users_open_home(self):
        """Open Bing in the browser of every online user, one thread per user."""
        for user_id in self.runner.web_server.browser_manager.user_ids_online():
            driver = self.runner.web_server.browser_manager.user_running[user_id].driver
            if driver:
                threading.Thread(target=driver.open, args=('https://www.bing.com/',)).start()

    def on_debug_all_users_open_none(self):
        """Open the new-tab page in the browser of every online user, one thread per user."""
        for user_id in self.runner.web_server.browser_manager.user_ids_online():
            driver = self.runner.web_server.browser_manager.user_running[user_id].driver
            if driver:
                threading.Thread(target=driver.open, args=('chrome://new-tab-page',)).start()

    def on_debug_print_environment(self):
        """Show all environment variables in a message box."""
        self.showNormal()
        lines = ['%s=%s' % (key, value) for key, value in os.environ.items()]
        QMessageBox.information(self, '提示', '\n'.join(lines))

    def on_debug_print_module_path(self):
        """Show ``sys.path`` in a message box."""
        self.showNormal()
        lines = ['%s' % (value,) for value in sys.path]
        QMessageBox.information(self, '提示', '\n'.join(lines))

    def on_debug_reload_external_plugins(self):
        """Reload external plugins, then reload the panel."""
        self.runner.web_server.browser_manager.load_plugins_from_external_module()
        self.webview.reload()
class WebEngineViewUrlRequestInterceptor(QWebEngineUrlRequestInterceptor):
    """Request interceptor that redirects ``file://`` requests to an empty URL."""

    def interceptRequest(self, info):
        requested = info.requestUrl().toString()
        if requested.startswith('file://'):
            info.redirect(QUrl(''))
# Single interceptor instance created at import time; WebEngineView installs
# it on its page profile via setUrlRequestInterceptor.
# NOTE(review): presumably kept at module level so the object outlives the
# views that reference it — confirm.
webEngineViewUrlRequestInterceptor = WebEngineViewUrlRequestInterceptor()
class WebEngineView(QWebEngineView):
    """Web view used for the control panel.

    Accepts every download, installs the module-level URL request interceptor
    and reuses itself when a page asks for a new window.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setContextMenuPolicy(0)
        self.setAcceptDrops(True)
        view_settings = self.settings()
        view_settings.setFontFamily(view_settings.StandardFont, 'Microsoft YaHei')
        page_profile = self.page().profile()
        page_profile.downloadRequested.connect(self.onDownloadRequested)
        page_profile.setUrlRequestInterceptor(webEngineViewUrlRequestInterceptor)

    @staticmethod
    def onDownloadRequested(download):
        """Accept the download unconditionally."""
        download.accept()

    def createWindow(self, wt):
        """Return this view so new-window requests open in place."""
        return self
class WebServerAPIJSONData(BaseModel):
    # JSON body model of the POST endpoints; handlers read their arguments
    # from ``data`` (e.g. ``data.data['user_id']``). The field defaults to None.
    data: dict = None
class WebSocketConnectionManager:
    """Keeps track of accepted websockets and broadcasts text to all of them."""

    def __init__(self):
        self.active_connections = []
        # Event loop the sockets were accepted on; broadcasts issued from
        # other threads are scheduled onto it.
        self._server_loop = None

    async def connect(self, websocket: 'WebSocket', disconnect=None):
        """Register *websocket*, or unregister it when *disconnect* is truthy.

        Registering accepts the socket and sends a greeting. Unregistering a
        socket that is not registered is a no-op: a socket can be dropped by
        a failed broadcast before its own handler notices the disconnect.
        """
        if disconnect:
            if websocket in self.active_connections:
                self.active_connections.remove(websocket)
            return
        await websocket.accept()
        await websocket.send_text('Hello, %s:%s' % (websocket.client.host, websocket.client.port))
        self._server_loop = asyncio.get_running_loop()
        self.active_connections.append(websocket)

    async def send_broadcast(self, message: str):
        """Send ``str(message)`` to every registered socket and drop the dead ones."""
        text = str(message)
        dead_connections = []
        # Snapshot: the list can change while a send is awaited.
        for conn in list(self.active_connections):
            try:
                await conn.send_text(text)
            except (WebSocketDisconnect, RuntimeError):
                # Starlette raises RuntimeError when sending on a socket that
                # was already closed; treat it like a disconnect so one dead
                # peer does not abort the broadcast for the others.
                dead_connections.append(conn)
        for conn in dead_connections:
            await self.connect(conn, 1)

    def send_broadcast_use_sync(self, text):
        """Blocking broadcast for callers outside the event loop (plugin threads).

        When the loop that accepted the sockets is running in another thread,
        the broadcast is scheduled on that loop and this call waits for it to
        finish, because the sockets must not be driven from a different event
        loop. Otherwise a temporary loop is used via ``asyncio.run``.
        """
        server_loop = self._server_loop
        if server_loop is not None and server_loop.is_running():
            try:
                current_loop = asyncio.get_running_loop()
            except RuntimeError:
                current_loop = None
            if current_loop is not server_loop:
                asyncio.run_coroutine_threadsafe(self.send_broadcast(text), server_loop).result()
                return
        asyncio.run(self.send_broadcast(text))
class WebServer:
def __init__(self, runner, root: str, data: str, default_plugins=None):
self.runner = runner
self.app = FastAPI()
self.www = os.path.join(root, 'www')
self.upload_dir = os.path.join(data, 'upload')
self.browser_manager = BrowserManager(
runner=runner,
driver=os.path.join(data, 'Chrome/chromedriver.exe'),
binary=os.path.join(data, 'Chrome/chrome.exe'),
manager_data_file=os.path.join(data, 'manager.json'),
browser_data_home=os.path.join(data, 'users'),
browser_init_home=os.path.join(root, 'initialize'),
use_selenium_wire=0
)
self.browser_manager.load_plugins(default_plugins, is_external=0)
self.browser_manager.load_plugins_from_external_module()
self.websocket_connection_manager = WebSocketConnectionManager()
@self.app.websocket('/instant_message')
async def websocket_instant_message(websocket: WebSocket):
await self.websocket_connection_manager.connect(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
pass
await self.websocket_connection_manager.connect(websocket, 1)
@self.app.api_route(methods=['POST'], path='/plugin_run')
def api_plugin_run(data: WebServerAPIJSONData):
try:
self.browser_manager.plugin_run(data.data['user_id'], data.data['plugin_id'], data.data['requirements'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/plugin_shutdown')
def api_plugin_shutdown(data: WebServerAPIJSONData):
try:
self.browser_manager.plugin_shutdown(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/upload_files')
def api_upload_files(files: List[UploadFile] = File(...)):
if (len(files) > 25) == 1:
return JSONResponse(status_code=200, content=self.message(1, 'Exceeded the maximum number of files allowed of 25.'))
if (sum(file.size for file in files) > 50 * 1024 * 1024) == 1:
return JSONResponse(status_code=200, content=self.message(1, 'Total file size exceeds the limit of 50MB.'))
else:
uploaded_files = []
for file in files:
file_save_path = os.path.join(self.upload_dir, '%s_%s' % (int(time.time() * 1000), file.filename))
with open(file_save_path, 'wb') as f:
f.write(file.file.read())
uploaded_files.append({'filename': file.filename, 'size': file.size, 'type': file.content_type, 'path': file_save_path})
return JSONResponse(status_code=200, content=self.message(0, '', uploaded_files))
@self.app.api_route(methods=['GET', 'POST'], path='/user_all_details')
def api_user_all_details():
return self.browser_manager.user_all_details()
@self.app.api_route(methods=['POST'], path='/shutdown')
def api_shutdown():
try:
for user_id in self.browser_manager.user_ids_online():
self.browser_manager.user_die(user_id)
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_set_name')
def api_user_set_name(data: WebServerAPIJSONData):
try:
self.browser_manager.user_set_name(data.data['user_id'], data.data['user_name'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_add')
def api_user_add(data: WebServerAPIJSONData):
try:
self.browser_manager.user_add(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_del')
def api_user_del(data: WebServerAPIJSONData):
try:
self.browser_manager.user_del(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_run')
def api_user_run(data: WebServerAPIJSONData):
    # Pass the payload's 'user_id' to browser_manager.user_run.
    # Any failure (including a missing key) becomes a code-1 message.
    try:
        manager = self.browser_manager
        manager.user_run(data.data['user_id'])
        return self.message(0)
    except Exception as error:
        return self.message(1, str(error))
@self.app.api_route(methods=['POST'], path='/user_die')
def api_user_die(data: WebServerAPIJSONData):
    # Pass the payload's 'user_id' to browser_manager.user_die.
    # Any failure (including a missing key) becomes a code-1 message.
    try:
        manager = self.browser_manager
        manager.user_die(data.data['user_id'])
        return self.message(0)
    except Exception as error:
        return self.message(1, str(error))
@self.app.api_route(methods=['POST'], path='/user_focus_window')
def api_user_focus_window(data: WebServerAPIJSONData):
    # Pass the payload's 'user_id' to browser_manager.user_focus_window.
    try:
        manager = self.browser_manager
        manager.user_focus_window(data.data['user_id'])
        return self.message(0)
    except FunctionTimedOut:
        # A timeout is answered with the same code-0 message as success.
        return self.message(0)
    except Exception as error:
        return self.message(1, str(error))
@self.app.api_route(methods=['GET'], path='/')
def index():
    # The root URL serves index.html from the self.www directory.
    index_page = os.path.join(self.www, 'index.html')
    return self.statics(index_page)
@self.app.api_route(methods=['GET'], path='/200')
def state():
    # Fixed plain-text reply with body '200'.
    reply = Response(content='200', media_type='text/plain')
    return reply
@self.app.api_route(methods=['GET'], path='/{filename:path}')
def www(filename):
    # Catch-all static route: serve `filename` from the self.www directory.
    # The value comes straight from the request URL, so normalise it and
    # refuse anything that lands outside the web root. Without this check,
    # '..' segments or an absolute path (which os.path.join honours) would
    # expose arbitrary files.
    web_root = os.path.abspath(self.www)
    target = os.path.abspath(os.path.join(web_root, filename))
    # normcase makes the comparison case-insensitive on Windows.
    root_prefix = os.path.normcase(web_root).rstrip('\\/') + os.sep
    if not os.path.normcase(target).startswith(root_prefix):
        raise HTTPException(status_code=404)
    return self.statics(target)
@staticmethod
def message(code: int, info: str = '', data=None):
return {'code': int(code), 'info': info, 'data': data}
@staticmethod
def statics(file: str):
    """Return a ``FileResponse`` for ``file``.

    :param file: path of the file to send.
    :raises HTTPException: with status 404 when ``file`` is not an existing
        regular file.
    """
    if os.path.isfile(file):
        return FileResponse(file)
    raise HTTPException(status_code=404)
def handle_interrupt(self):
    """Forward the interrupt to the browser manager, then clear uploads."""
    manager = self.browser_manager
    manager.handle_interrupt()
    self.upload_clear()
def upload_clear(self):
    """Delete every regular file directly inside ``self.upload_dir``.

    Sub-directories are left alone. Nothing happens when the upload
    directory is missing or is not a directory. Files that cannot be
    removed are skipped.
    """
    upload_root = self.upload_dir
    # isdir rather than exists: listing a path that is a plain file would raise.
    if not os.path.isdir(upload_root):
        return
    for entry in os.listdir(upload_root):
        candidate = os.path.join(upload_root, entry)
        if not os.path.isfile(candidate):
            continue
        try:
            os.unlink(candidate)
        except OSError:
            # Best-effort cleanup: a locked or vanished file must not abort the sweep.
            pass
def run(self, host='127.0.0.1', port=8080):
    """Prepare the web and upload directories, then serve ``self.app``.

    :param host: interface passed to uvicorn.
    :param port: TCP port passed to uvicorn.
    """
    for required_dir in (self.www, self.upload_dir):
        if not os.path.exists(required_dir):
            os.makedirs(required_dir)
    # Clear the upload directory before serving.
    self.upload_clear()
    uvicorn.run(self.app, host=host, port=port, log_level='warning')
class OutputRedirector:
    """File-like tee that copies everything written to two streams.

    ``sys_fp`` is the original console stream and ``new_fp`` is the log file.
    ``write`` puts a ``[YYYY-mm-dd HH:MM:SS]`` header line into the log each
    time the timestamp differs from the last one written. Attributes not
    defined here are looked up on ``sys_fp``.
    """

    def __init__(self, sys_fp, new_fp):
        # sys_fp may be None; every console access below is guarded for that.
        # NOTE(review): presumably the case in windowed builds without a console.
        self.sys_fp = sys_fp
        self.new_fp = new_fp
        # Last timestamp header written to the log; '' until the first write.
        self.time_format = ''

    @staticmethod
    def time():
        """Return the current local time as a ``'[...]\\n'`` header line."""
        return '[%s]\n' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),)

    def write(self, s):
        """Write ``s`` to both streams and return the log stream's result."""
        if self.sys_fp is not None:
            self.sys_fp.write(s)
        # Take the timestamp once so the compared and stored values cannot differ.
        stamp = self.time()
        if self.time_format != stamp:
            self.time_format = stamp
            self.new_fp.write(stamp)
        written = self.new_fp.write(s)
        self.flush()
        return written

    def writelines(self, lines):
        """Write every item of ``lines`` to both streams (no timestamp header)."""
        # Materialise first: a one-shot iterator would be consumed by the
        # console stream and leave nothing for the log.
        lines = list(lines)
        if self.sys_fp is not None:
            self.sys_fp.writelines(lines)
        self.new_fp.writelines(lines)
        self.flush()

    def flush(self):
        """Flush both underlying streams."""
        if self.sys_fp is not None:
            self.sys_fp.flush()
        self.new_fp.flush()

    def __getattr__(self, item):
        # Only reached when normal lookup fails. If the stream attributes
        # themselves are missing (instance built without __init__), raise
        # instead of recursing through self.sys_fp forever.
        if item in ('sys_fp', 'new_fp'):
            raise AttributeError(item)
        return getattr(self.sys_fp, item)
class MainRunner:
    """Top-level launcher for the application.

    ``run`` prepares the data and log directories, refreshes the Chrome copy
    in the data directory, starts the web server on a daemon thread and enters
    the Qt event loop. ``build`` drives the Nuitka / Inno Setup packaging.
    """

    def __init__(self):
        # Route SIGINT (Ctrl+C) through handle_interrupt.
        signal.signal(signal.SIGINT, self._handle_interrupt)
        self.app_root = os.path.dirname(__file__)
        # Per-user data goes under %APPDATA%\Galactic when APPDATA is set,
        # otherwise into a 'data' folder next to this file.
        appdata = os.getenv('APPDATA')
        self.app_data = os.path.join(appdata, 'Galactic') if appdata else os.path.join(os.path.dirname(__file__), 'data')
        self.app_running_file = os.path.join(self.app_data, 'running')
        self.app_name = 'Galactic'
        self.app_version = '1.0.0.2'
        self.web_server_host = '127.0.0.1'
        self.web_server_port = 8095
        # The attributes below are populated by run().
        self.web_server = None
        self.web_server_thread = None
        self.application = None
        self.application_scale_rate = None
        self.window = None
        # Passed to WebServer as default_plugins; all entries are currently disabled.
        self.plugin_list = [
            # BrowserPluginFileTest,
            # BrowserPluginTextTest,
            # BrowserPluginMultFileTest,
            # BrowserPluginMultTextTest,
            # BrowserPluginLoggingTest,
            # BrowserPluginMessageTest,
            # BrowserPluginFileCommandDebug,
            # BrowserPluginTextCommandDebug
        ]

    def _copy_files_and_directories(self, src, dst):
        """Copy ``src`` to ``dst``; directories are copied recursively.

        A missing ``src`` is ignored. Existing destination directories are
        merged into, and files with the same name are overwritten.
        """
        if not os.path.exists(src):
            return None
        if os.path.isdir(src):
            os.makedirs(dst, exist_ok=True)
            for item in os.listdir(src):
                s = os.path.join(src, item)
                d = os.path.join(dst, item)
                if os.path.isdir(s):
                    self._copy_files_and_directories(s, d)
                else:
                    shutil.copy(s, d)
        else:
            # Create the destination's parent first so copying a single file
            # into a not-yet-existing folder does not fail.
            parent = os.path.dirname(dst)
            if parent:
                os.makedirs(parent, exist_ok=True)
            shutil.copy(src, dst)

    @staticmethod
    def _remove_directory(src):
        """Delete the directory tree at ``src`` if it is a directory."""
        if os.path.isdir(src):
            shutil.rmtree(src)

    @staticmethod
    def _rename_directory(src, dst):
        """Rename ``src`` to ``dst`` if ``src`` is a directory."""
        if os.path.isdir(src):
            os.rename(src, dst)

    @staticmethod
    def _read_file_text(src):
        """Return the UTF-8 text of ``src``, or ``''`` when it does not exist."""
        if not os.path.exists(src):
            return ''
        # Context manager so the handle is closed promptly.
        with open(file=src, mode='r', encoding='utf-8') as fp:
            return fp.read()

    def install_chrome(self):
        """Copy the Chrome folder from the app root into the data directory.

        Nothing is done when the source folder is missing or when the
        ``chrome.version`` files of source and target have equal content.

        :return: ``True`` when a copy was performed, otherwise ``None``.
        :raises PermissionError: propagated from the file operations; the
            caller ``run`` handles it.
        """
        chrome_install_source = os.path.join(self.app_root, 'Chrome')
        chrome_install_source_version = os.path.join(chrome_install_source, 'chrome.version')
        chrome_install_target = os.path.join(self.app_data, 'Chrome')
        chrome_install_target_version = os.path.join(chrome_install_target, 'chrome.version')
        chrome_install_target_newname = '%s.delete' % (chrome_install_target,)
        if not os.path.exists(chrome_install_source):
            return None
        if self._read_file_text(chrome_install_source_version) == self._read_file_text(chrome_install_target_version):
            return None
        if os.path.exists(chrome_install_target):
            # Rename the old install aside before deleting it; a leftover
            # '.delete' folder from an earlier run is removed first.
            self._remove_directory(chrome_install_target_newname)
            self._rename_directory(chrome_install_target, chrome_install_target_newname)
            self._remove_directory(chrome_install_target_newname)
        self._copy_files_and_directories(chrome_install_source, chrome_install_target)
        return True

    def run(self):
        """Start the application; does not return (ends in ``sys.exit``)."""
        os.makedirs(self.app_data, exist_ok=True)
        log_dir = os.path.join(self.app_data, 'log')
        os.makedirs(log_dir, exist_ok=True)
        # Tee stdout/stderr into per-day log files; the files stay open for
        # the lifetime of the process.
        today = time.strftime('%Y%m%d', time.localtime())
        stdout_redirector = OutputRedirector(sys.stdout, open(os.path.join(log_dir, '%s_stdout.log' % (today,)), mode='a'))
        stderr_redirector = OutputRedirector(sys.stderr, open(os.path.join(log_dir, '%s_stderr.log' % (today,)), mode='a'))
        sys.stderr = stderr_redirector
        sys.stdout = stdout_redirector
        # Extra import locations: next to this file first, then the data directory.
        for base in (os.path.dirname(__file__), self.app_data):
            sys.path.append(os.path.join(base, 'Packages'))
            sys.path.append(os.path.join(base, 'site-packages.zip'))
        try:
            if self.install_chrome():
                print('Chrome is now installed.', file=sys.stderr)
        except PermissionError:
            # Warn the user; startup then continues.
            _app = QApplication(sys.argv)
            _msg = QMessageBox()
            _msg.setIcon(QMessageBox.Warning)
            _msg.setText('Chrome needs to be updated, please quit all browser users first.')
            _msg.setWindowTitle('Warning')
            _msg.setStandardButtons(QMessageBox.Cancel)
            _msg.exec_()
            _app.exit(1)
        self.web_server = WebServer(runner=self, root=self.app_root, data=self.app_data, default_plugins=self.plugin_list)
        self.web_server_thread = threading.Thread(target=self.web_server.run, kwargs={'host': self.web_server_host, 'port': self.web_server_port})
        self.web_server_thread.daemon = True
        self.web_server_thread.start()
        # Qt allows only one application object per process: reuse the one
        # created for the warning dialog above, if any.
        self.application = QApplication.instance()
        if self.application is None:
            self.application = QApplication(sys.argv)
        # NOTE(review): Qt documents this policy as something to set before the
        # application object exists - confirm it takes effect when called here.
        self.application.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
        self.application_scale_rate = self.application.screens()[0].logicalDotsPerInch() / 96
        self.window = MainWindow(
            runner=self,
            app_name=self.app_name,
            app_version=self.app_version,
            scale_rate=self.application_scale_rate,
            web_listen_host=self.web_server_host,
            web_listen_port=self.web_server_port
        )
        status = self.application.exec_()
        print('Exit status code: %s' % (status,), file=sys.stderr)
        sys.exit(status)

    def build(self):
        """Interactively build the Nuitka distribution and the Inno Setup installer.

        Only available when running from a ``.py`` source file. Each step is
        run only if the user answers ``y`` to its prompt.
        """
        if not str(__file__).endswith('.py'):
            print('Build is not currently supported.', file=sys.stderr)
            # sys.exit rather than the site-provided exit(), which is not
            # guaranteed to exist in every interpreter setup.
            sys.exit(1)
        from_home = os.path.dirname(__file__)
        base_name = os.path.splitext(os.path.basename(__file__))[0]
        dist_home = os.path.join(from_home, '%s.dist' % (base_name,))
        ask = input('%s %s: ' % ('Build?', '[Y/n]'))
        if ask.lower().strip() == 'y':
            # Use the interpreter running this script so pip installs nuitka
            # into the same environment that then runs it.
            subprocess.run([sys.executable, '-m', 'pip', 'install', 'nuitka', '-q'], env=os.environ)
            subprocess.run([
                sys.executable,
                '-m',
                'nuitka',
                '--standalone',
                '--enable-plugin=pyqt5',
                '--enable-plugin=pylint-warnings',
                '--include-module=PyQt5',
                '--include-module=selenium',
                '--include-module=fastapi',
                '--include-module=pydantic',
                '--include-module=starlette',
                '--windows-console-mode=disable',
                '--windows-icon-from-ico=favicon.ico',
                '--product-name=%s' % (self.app_name,),
                '--file-description=%s' % (self.app_name,),
                '--product-version=%s' % (self.app_version,),
                '--copyright=Copyright (C) 2025',
                '--output-dir=%s' % (from_home,),
                '%s' % (__file__,)
            ], env=os.environ)
            # Copy these resources next to the compiled program.
            for i in ['Packages', 'PyQt5', 'Chrome', 'initialize', 'www', 'favicon.ico']:
                self._copy_files_and_directories('%s/%s' % (from_home, i), '%s/%s' % (dist_home, i))
        elif not os.path.exists(dist_home):
            # Nothing was built and no earlier build exists: nothing to package.
            return None
        ask = input('%s %s: ' % ('Compile setup program?', '[Y/n]'))
        if ask.lower().strip() == 'y':
            compile_file = os.path.join(from_home, '%s.iss' % (base_name,))
            compile_template = os.path.join(from_home, '%s.iss.template' % (base_name,))
            compiler = 'C:\\Program Files (x86)\\Inno Setup 6\\ISCC.exe'
            if not os.path.exists(compile_template):
                print('The template file \"%s\" does not exist.' % (compile_template,), file=sys.stderr)
                return None
            if not os.path.exists(compiler):
                print('The compiler \"%s\" does not exist. Please check if Inno Setup is installed. You can download it at https://www.innosetup.com/' % (compiler,),
                      file=sys.stderr)
                return None
            # Fill the %...% placeholders of the template to produce the .iss script.
            Path(compile_file).write_text(
                Path(compile_template).read_text().replace(
                    '%APPNAME%',
                    self.app_name
                ).replace(
                    '%APPEXEC%',
                    base_name
                ).replace(
                    '%APPVERSION%',
                    self.app_version
                ).replace(
                    '%APPBUILDDATE%',
                    time.strftime('%Y%m%d', time.localtime())
                ).replace(
                    '%DISABLEX64%',
                    '' if platform.architecture()[0] == '64bit' else '; '
                )
            )
            subprocess.run([compiler, compile_file])

    def _handle_interrupt(self, _signal, _frame):
        """Signal-handler adapter: ignore the signal arguments and delegate."""
        self.handle_interrupt()

    def handle_interrupt(self):
        """Forward the interrupt to the web server, if one has been created.

        Errors are printed to stderr instead of being raised.
        """
        try:
            if self.web_server:
                self.web_server.handle_interrupt()
        except Exception as e:
            print(e, file=sys.stderr)
if __name__ == '__main__':
    # Register the Qt plugin directory; its location depends on whether this
    # file's name ends with '.int'.
    if os.path.basename(__file__).lower().endswith('.int'):
        qt_plugin_dir = os.path.join(os.path.dirname(__file__), 'site-packages/PyQt5/Qt5/plugins')
    else:
        qt_plugin_dir = os.path.join(os.path.dirname(__file__), 'PyQt5/Qt5/plugins')
    QCoreApplication.addLibraryPath(qt_plugin_dir)
    # Single-instance guard: a lock file in the temp directory, named after a
    # hash of this file's path. The handle is deliberately left open.
    # NOTE(review): presumably the lock is held for as long as f_lock stays open - confirm against flock().
    lock_name = '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16]
    f_lock = open(file=os.path.join(tempfile.gettempdir(), lock_name), mode='w', encoding='utf-8')
    if not flock(f_lock, LOCK_EX | LOCK_NB):
        # The lock could not be taken: warn and quit.
        app = QApplication(sys.argv)
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setText('The application is already running.')
        msg.setWindowTitle('Warning')
        msg.setStandardButtons(QMessageBox.Cancel)
        msg.exec_()
        app.exit(1)
        sys.exit(1)
    # '--build' as the first argument selects packaging instead of a normal start.
    if len(sys.argv) > 1 and sys.argv[1] == '--build':
        MainRunner().build()
    else:
        MainRunner().run()