import os import re import sys import glob import json import time import ctypes import base64 import signal import random import shutil import socket import inspect import uvicorn import zipfile import hashlib import tempfile import platform import requests import requests.adapters import importlib.util import threading import subprocess from selenium.webdriver.common.by import By from selenium.webdriver.common.alert import Alert from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.remote.command import Command from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.common.exceptions import NoSuchWindowException from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineSettings, QWebEngineProfile from PyQt5.QtWebEngineCore import QWebEngineUrlRequestInterceptor from PyQt5.QtNetwork import QNetworkProxyFactory from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox, QSystemTrayIcon, QMenu, QAction, QDesktopWidget from PyQt5.QtCore import Qt, QCoreApplication, QTimer, QUrl from PyQt5.QtGui import QIcon from typing import List, Tuple, Any from fastapi import FastAPI, Response, UploadFile, File, HTTPException from pydantic import BaseModel from starlette.responses import JSONResponse, FileResponse from winotify import Notification, audio from func_timeout import func_set_timeout, FunctionTimedOut from pathlib import Path sys.path.append(os.path.join(os.path.dirname(__file__), 'Packages')) def _fd(f): return f.fileno() if hasattr(f, 'fileno') else f if os.name == 'nt': import msvcrt from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref) from ctypes.wintypes import BOOL, DWORD, HANDLE LOCK_SH = 0x0 LOCK_NB = 0x1 LOCK_EX = 0x2 LOCK_UN = 0x9 if sizeof(c_ulong) != sizeof(c_void_p): ULONG_PTR = c_int64 else: ULONG_PTR = c_ulong PVOID = 
c_void_p class _OFFSET(Structure): _fields_ = [ ('Offset', DWORD), ('OffsetHigh', DWORD) ] class _OFFSET_UNION(Union): _fields_ = [ ('_offset', _OFFSET), ('Pointer', PVOID) ] _anonymous_ = ['_offset'] class OVERLAPPED(Structure): _fields_ = [ ('Internal', ULONG_PTR), ('InternalHigh', ULONG_PTR), ('_offset_union', _OFFSET_UNION), ('hEvent', HANDLE) ] _anonymous_ = ['_offset_union'] LPOVERLAPPED = POINTER(OVERLAPPED) LockFileEx = windll.kernel32.LockFileEx LockFileEx.restype = BOOL LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED] UnlockFileEx = windll.kernel32.UnlockFileEx UnlockFileEx.restype = BOOL UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED] def flock(f, flags): hfile = msvcrt.get_osfhandle(_fd(f)) overlapped = OVERLAPPED() if flags == LOCK_UN: ret = UnlockFileEx( hfile, 0, 0, 0xFFFF0000, byref(overlapped) ) else: ret = LockFileEx( hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped) ) return bool(ret) else: try: import fcntl LOCK_SH = fcntl.LOCK_SH LOCK_NB = fcntl.LOCK_NB LOCK_EX = fcntl.LOCK_EX LOCK_UN = fcntl.LOCK_UN except (ImportError, AttributeError): LOCK_EX = LOCK_SH = LOCK_NB = 0 def flock(f, flags): return flags == LOCK_UN else: def flock(f, flags): return fcntl.flock(_fd(f), flags) == 0 try: """ 0: selenium(default), 1: selenium-wire, 2: selenium-wire with undetected driver(only chrome). 
""" seleniumClassesDriver = ['0', '1', '2'].index(os.environ.get('SELENIUM_CLASSES_DRIVER') or '0') match seleniumClassesDriver: case 0: from selenium.webdriver import Chrome as browser_webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service case 1: from seleniumwire.webdriver import Chrome as browser_webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service case 2: from seleniumwire.undetected_chromedriver import Chrome as browser_webdriver from seleniumwire.undetected_chromedriver import ChromeOptions as Options from selenium.webdriver.chrome.service import Service except ValueError: raise SystemError('Not supported driver classes.') def notification_send(app_id=None, title='', message='', reminder=None): app_id = app_id() if callable(app_id) else app_id notify = Notification(app_id=app_id, title=title, msg=message, icon=os.path.join(os.path.dirname(__file__), 'favicon.ico')) notify.set_audio(audio.Reminder, bool(reminder)) notify.show() def import_module(file: str): spec = importlib.util.spec_from_file_location(os.path.splitext(os.path.basename(file))[0], file) module_from_spec = importlib.util.module_from_spec(spec) spec.loader.exec_module(module_from_spec) return module_from_spec class BrowserMobileEmulation(dict): """ Mobile emulation parameters. 
""" def __init__(self, w=540, h=960, user_agent=None): du = base64.b64decode(bytes(''' TW96aWxsYS81LjAgKExpbnV4OyBVOyBBbmRyb2lkIDEzOyB6aC1jbjsgMjEwOTEx OUJDIEJ1aWxkL1RLUTEuMjIwODI5LjAwMikgQXBwbGVXZWJLaXQvNTM3LjM2IChL SFRNTCwgbGlrZSBHZWNrbykgVmVyc2lvbi80LjAgQ2hyb21lLzk4LjAuNDc1OC4x MDIgTVFRQnJvd3Nlci8xMy42IE1vYmlsZSBTYWZhcmkvNTM3LjM2 ''', encoding='utf-8')).decode() user_agent = user_agent or du super().__init__({'w': w, 'h': h, 'user_agent': user_agent}) self.w = self.h = self.user_agent = None def __setattr__(self, key, value): pass def __getitem__(self, item): try: return super().__getitem__(item) except KeyError: return None def __getattr__(self, item): try: return super().__getitem__(item) except KeyError: return None class CustomHTTPAdapter(requests.adapters.HTTPAdapter): def __init__(self, *args, **kwargs): from urllib.parse import urlparse self.urlparse = urlparse self.hosts = {} self.addrs = {} super().__init__(*args, **kwargs) @staticmethod def resolve_host(host): try: hosts = requests.get('http://119.29.29.29/d?dn=%s&ip=208.67.222.222' % host).text.replace(',', ';').split(';') except (requests.exceptions.RequestException, requests.exceptions.ConnectTimeout): hosts = [] return hosts[0] if len(hosts) > 0 else None def send(self, request, **kwargs): req = request connection_pool_kwargs = self.poolmanager.connection_pool_kw url_resolve = self.urlparse(req.url) scheme = url_resolve.scheme domain = url_resolve.netloc.split(':')[0] try: addition_port = ':%s' % url_resolve.netloc.split(':')[1] except IndexError: addition_port = '' ip_address = self.resolve_host(domain) if ip_address: self.hosts[domain] = ip_address self.addrs[ip_address] = domain req.url = req.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.hosts[domain], addition_port)) if scheme == 'https': connection_pool_kwargs['assert_hostname'] = domain connection_pool_kwargs['server_hostname'] = domain req.headers['Host'] = '%s%s' % (domain, addition_port) return super().send(req, 
**kwargs) def build_response(self, *args, **kwargs): res = super().build_response(*args, **kwargs) url_resolve = self.urlparse(res.url) domain = url_resolve.netloc.split(':')[0] try: addition_port = ':%s' % url_resolve.netloc.split(':')[1] except IndexError: addition_port = '' if domain in self.addrs.keys(): res.url = res.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.addrs[domain], addition_port)) return res class BrowserPathManager: def __init__(self): self.webdriver_install_location = tempfile.gettempdir() @staticmethod def resolve_browser_version(file: str): if not os.path.exists(file): raise FileNotFoundError('The executable file does not exist in %s' % file) if file.lower().endswith('.exe'): try: full_version = subprocess.run( ['powershell', '(Get-Item -Path "%s").VersionInfo.ProductVersion' % file], shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=5 ).stdout.decode('utf-8').strip() except Exception: full_version = '' try: main_version = full_version.split('.')[0] except Exception: main_version = '' else: try: full_version = subprocess.run( '%s --version' % file, shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=5 ).stdout.decode('utf-8').strip() full_version = re.findall('[0-9]+[.\\d+]+', full_version)[-1] except Exception: full_version = '' try: main_version = full_version.split('.')[0] except Exception: main_version = '' return file, main_version, full_version @staticmethod def open_remote_resources(url: str, save_file: str = None, auto_redirects=False, retries=3): http = requests.Session() for scheme in ['http://', 'https://']: http.mount(scheme, CustomHTTPAdapter()) for i in range((retries if retries > 0 else 0) + 1): try: with http.get( url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36' }, allow_redirects=auto_redirects, stream=(save_file is not None) ) as response: if save_file: if 200 != 
response.status_code: return bool(0) with open(save_file, 'wb') as filestream: for chunk in response.iter_content(chunk_size=8192): filestream.write(chunk) return bool(1) else: if 200 != response.status_code: return '' else: return response.text except requests.exceptions.ConnectionError: retries > 0 and time.sleep(0.75 + round(random.random(), 2)) continue @staticmethod def find_binary(): plat = sys.platform find_list = [] plats = ['win32', 'linux', 'darwin'] if plat == plats[0]: for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)', 'LOCALAPPDATA', 'PROGRAMW6432']: find_list.append('%s/Google/Chrome/Application/chrome.exe' % os.environ.get(e, '').replace("\\", '/')) if plat == plats[1]: for p in ['/opt/google/chrome', '/usr/bin/google-chrome']: find_list.append('%s/chrome' % p) if plat == plats[2]: for p in ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome']: find_list.append('%s/chrome' % p) for execute_file in find_list: try: if os.path.exists(execute_file): return execute_file except Exception: pass def find_driver(self, main_version: str): if not int(main_version) >= 70: return None location = '%s%s%s' % ( self.webdriver_install_location, os.sep, 'chromedriver_%s%s' % (str(main_version), '.exe' if platform.system().lower() == 'windows' else '') ) return location.replace("\\", '/') if os.path.exists(location) else None def pull_driver(self, main_version: str): if not int(main_version) >= 70: return None chromedriver_site = 'https://chromedriver.storage.googleapis.com' latest_release = self.open_remote_resources('%s/LATEST_RELEASE_%s' % (chromedriver_site, main_version)) if '' == latest_release: return None plat = sys.platform match_assets = [] plats = ['win32', 'linux', 'darwin'] child = ['chromedriver.exe', 'chromedriver'] tails = ['win32', 'linux64', 'mac64', 'mac_arm64', 'mac64_m1'] if plat == plats[0]: match_assets.append([child[0], 'chromedriver_%s.zip' % tails[0]]) if plat == plats[1]: match_assets.append([child[1], 'chromedriver_%s.zip' % 
tails[1]]) if plat == plats[2] and (platform.machine().startswith('arm')) is bool(0): match_assets.append([child[1], 'chromedriver_%s.zip' % tails[2]]) if plat == plats[2] and (platform.machine().startswith('arm')) is bool(1): match_assets.append([child[1], 'chromedriver_%s.zip' % tails[3]]) match_assets.append([child[1], 'chromedriver_%s.zip' % tails[4]]) package_chromedriver = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'chromedriver.zip') destdir_chromedriver = self.webdriver_install_location for assets in match_assets: res_url = '%s/%s/%s' % (chromedriver_site, latest_release, assets[1]) print('Downloading version %s chromedriver %s to %s...' % (latest_release, res_url, destdir_chromedriver), file=sys.stderr) if self.open_remote_resources(res_url, package_chromedriver): dist = zipfile.ZipFile(package_chromedriver).extract(assets[0], destdir_chromedriver) dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0].replace('chromedriver', 'chromedriver_%s' % main_version)) os.path.exists(dist_chan) and os.remove(dist_chan) os.rename(dist, dist_chan) assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777) os.remove(package_chromedriver) return dist_chan.replace("\\", '/') def main(self, binary: str = None, driver: str = None): binary = binary or self.find_binary() if not binary: raise FileNotFoundError('No browser executable file is found on your system, please confirm whether it has been installed') if not os.path.exists(binary): raise FileNotFoundError('The executable file does not exist in %s' % binary) version = self.resolve_browser_version(binary) if not version: raise RuntimeError('Failure to get the browser version number failed in %s' % binary) driver = driver if driver else self.find_driver(version[1]) driver = driver if driver else self.pull_driver(version[1]) if not driver: raise FileNotFoundError('Not specified the driver path, and try the automatic download failure') if not os.path.exists(driver): raise FileNotFoundError('The driver does 
not exist in %s' % driver) return binary, driver class SeleniumClear: def __init__(self): self.last = '%s/.selenium_clear_last' % tempfile.gettempdir() @staticmethod def clear_selenium(): if platform.uname().system.lower() == 'windows': user_home = [os.environ.get('HOMEDRIVE'), os.environ.get('HOMEPATH')] if user_home[0] and user_home[1]: try: shutil.rmtree('%s%s/.cache/selenium' % (user_home[0], user_home[1])) except FileNotFoundError: pass @staticmethod def clear_driver_cache(): for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']: for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)): try: shutil.rmtree(i) except (FileNotFoundError, PermissionError, WindowsError): pass @staticmethod def file_get_contents(file, text=None): if not os.path.exists(file): return text return open(file=file, mode='r', encoding='utf-8').read() @staticmethod def file_put_contents(file, text=None): return open(file=file, mode='w', encoding='utf-8').write(text) def straight_clear(self): self.clear_selenium() self.clear_driver_cache() self.file_put_contents(self.last, str(int(time.time()))) def auto(self): try: int(self.file_get_contents(self.last, '0')) + 86400 < int(time.time()) and self.straight_clear() except ValueError: os.remove(self.last) class PositionTab: """ Position for switch tab. """ Prev = 'Go-Prev' Next = 'Go-Next' class ColorUtils: """ Color utils. """ @staticmethod def hex2rgb(color): color = color[1:].upper() for x in color: if x not in '0123456789ABCDEF': raise Exception('Found invalid hexa character {0}.'.format(x)) if len(color) == 6 or len(color) == 8: color = '#' + color[0:6] elif len(color) == 3: color = '#' + color[0] * 2 + color[1] * 2 + color[2] * 2 else: raise Exception('Hexa string should be 3, 6 or 8 digits. 
if 8 digits, last 2 are ignored.') hexcolor = color[1:] r, g, b = int(hexcolor[0:2], 16), int(hexcolor[2:4], 16), int(hexcolor[4:6], 16) return r, g, b class FetchResult: def __init__(self, data): self.status = 0 self.header = {} self.result = '' data and self._resolve_data(data) def _resolve_data(self, data: dict): self.status = data['status'] self.header = data['header'] self.result = data['result'] def __str__(self): return '%s: %s\n%s: %s\n%s: %s' % ('STATUS', self.status, 'HEADER', self.header, 'RESULT', self.result) class Browser(browser_webdriver): """ Browser web driver. """ def __init__( self, driver: str = None, binary: str = None, debugger_address: str = None, headless: bool = False, lang: str = None, mute: bool = False, no_images: bool = False, user_agent: str = None, http_proxy: str = None, home: str = None, window_size: str = None, window_site: str = None, mobile_emulation: BrowserMobileEmulation = None, option_arguments: list = None, req_interceptor=None, res_interceptor=None, ): self.is_linux = sys.platform.startswith('linux') SeleniumClear().auto() classes = seleniumClassesDriver binary, driver = BrowserPathManager().main(binary, driver) if self.is_linux is bool(1) and not window_size: window_size = '1920x1080' if self.is_linux is bool(0) and headless and not window_size: window_size = '1920x1080' # Initialization settings. if (isinstance(option_arguments, list)) is bool(0): option_arguments = [] cdplist = [] service = Service() options = Options() self.cdplist = cdplist # Delete prompt information of chrome being controlled. exclude_switches = ['enable-automation', 'enable-logging', 'disable-translate'] options.add_experimental_option('debuggerAddress', debugger_address) if debugger_address else options.add_experimental_option('excludeSwitches', exclude_switches) # Mobile emulation parameter setting start. 
if mobile_emulation and not debugger_address: self.w_browser = mobile_emulation.w + 14 self.h_browser = mobile_emulation.h + 0 self.w_inner_window = mobile_emulation.w + 0 self.h_inner_window = mobile_emulation.h - 86 self.mobile_emulation_screen_w = mobile_emulation.w self.mobile_emulation_screen_h = mobile_emulation.h window_size = '%s,%s' % (self.w_browser, self.h_browser) options.add_experimental_option( 'mobileEmulation', { 'deviceMetrics': { 'width': self.w_inner_window, 'height': self.h_inner_window, 'pixelRatio': 2.75, 'touch': True }, 'userAgent': mobile_emulation.user_agent } ) cdplist.append([ 'Emulation.setUserAgentOverride', { 'userAgent': mobile_emulation.user_agent, 'userAgentMetadata': { 'platform': 'Android' if mobile_emulation.user_agent.find('iPhone') == -1 else 'iPhone', 'mobile': True, 'platformVersion': '', 'architecture': '', 'model': '' } } ]) else: self.w_browser = 0 self.h_browser = 0 self.w_inner_window = 0 self.h_inner_window = 0 self.mobile_emulation_screen_w = 0 self.mobile_emulation_screen_h = 0 # Mobile emulation parameter setting end. # Set browser and webdriver path. if driver: service.path = driver if binary: options.binary_location = binary # Add webdriver option arguments. for i in option_arguments: options.add_argument(i) # Set headless mode. if self.is_linux or headless: options.add_argument('--headless=new') # Set no-sandbox mode. if self.is_linux: options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--disable-gpu') # Set language of browser, default is zh-CN. if lang: options.add_argument('--lang=%s' % (lang or 'zh-CN')) hasattr(options, 'set_preference') and options.set_preference('intl.accept_languages', lang or 'zh-CN') # Set mute. if mute: options.add_argument('--mute-audio=true') hasattr(options, 'set_preference') and print('Warning: Do not support mute audio currently.', file=sys.stderr) # Set no images mode. 
if no_images: options.add_argument('--blink-settings=imagesEnabled=false') hasattr(options, 'set_preference') and print('Warning: Do not support disable images currently.', file=sys.stderr) # Set default user agent. if user_agent: options.add_argument('--user-agent=%s' % user_agent) hasattr(options, 'set_preference') and options.set_preference('general.useragent.override', user_agent) # Set http proxy for browser. if http_proxy: options.add_argument('--proxy-server=http://%s' % http_proxy) # Set browser window size before startup. if window_size: debugger_address or options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ',')) else: debugger_address or options.add_argument('--start-maximized') # Start the browser. undetected_kwargs = {'driver_executable_path': driver, 'browser_executable_path': binary, 'version_main': 111} if classes == 2 else {} super().__init__(service=service, options=options, **undetected_kwargs) # Selenium-Wire backend optimization start. try: self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.') self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.') self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.') except AttributeError: pass # Selenium-Wire backend optimization end. if mobile_emulation: cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}]) cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}]) cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}]) # Set the request and response interceptor. 
if req_interceptor: hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr) self.request_interceptor = req_interceptor if res_interceptor: hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr) self.response_interceptor = res_interceptor # Sync set http proxy for Selenium-Wire backend. if http_proxy: self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy} # Set browser window size after startup, by default, there will be full screen display window. if window_size: try: self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(',')) except Exception: pass else: self.maximize_window() if window_site: try: self.set_window_position(*window_site.replace("\x20", '').split(',')) except Exception: pass # Sets a sticky timeout to implicitly wait for an element to be found. self.implicitly_wait(10) # Set the amount of time to wait for a page load to complete. self.set_page_load_timeout(25) # Open the default page. home and self.open(home) @property def title(self) -> str: """ Return current title. """ self._update_tab_auto_switch() if (self.current_alert is None) == 1: return super().title else: return '' @property def current_url(self) -> str: """ Return current url. """ self._update_tab_auto_switch() if (self.current_alert is None) == 1: return super().current_url else: return '' @property def current_alert(self): """ Return current alert object. """ try: alert = Alert(self) self.execute(Command.W3C_GET_ALERT_TEXT) return alert except Exception: return None @property def current_alert_text(self) -> str: """ Return current alert content. """ try: return Alert(self).text except Exception: return '' @property def window_inner_size(self) -> tuple: """ Return the page window inner size. 
""" size = self.execute_script('return [window.innerWidth, window.innerHeight];') return size[0], size[1] def open(self, url=None): """ Open the URL, simulate into the URL in the address bar and jump, the new page has no Referrer. """ self._update_tab_auto_switch() self._update_cdp_command() return self.get(url) def turn(self, url=None): """ Simulation "window.location.href" jumps, the new page has Referrer. """ return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None) def wait(self, secs: int | float = 1): """ Will sleep waiting. """ number_int = int(secs) number_float = secs - number_int try: for i in range(number_int): time.sleep(1) else: time.sleep(number_float) except (KeyboardInterrupt, InterruptedError): print('Interrupted', file=sys.stderr) self.quit() def quit(self): """ Exit the browser. """ try: super().quit() except Exception: pass def find(self, path, wait_for=False, timeout: float = 5.0, freq: float = 0.5, delay: float = 0.0) -> WebElement: """ Use XPath to find an element. """ element = self.webdriver_wait(timeout, freq).until(EC.presence_of_element_located((By.XPATH, path))) if wait_for else self.find_element(By.XPATH, path) delay and self.wait(delay) element = self.find_element(By.XPATH, path) if delay else element self._element_highlight(element, '#F8BE5F') return element def find_mult(self, path) -> list: """ Use XPath to find elements. """ element = self.find_elements(By.XPATH, path) for this_element in element: self._element_highlight(this_element, '#F8BE5F') return element def find_mult_random_choice(self, path) -> WebElement: """ Use XPath to find elements then random choice one. """ element = self.find_elements(By.XPATH, path) element = random.choice(element) self._element_highlight(element, '#F8BE5F') return element def find_element_by(self, sentence): """ Custom find element, pass into a tuple or list. 
""" element = self.find_element(*sentence) self._element_highlight(element, '#F8BE5F') return element def click(self, element): """ Click element. """ self._element_highlight(element, '#FF0000') element.click() def click_simulate(self, element): """ Click element for simulate. """ self._element_click_effect(element) self.action_chains().reset_actions() self.action_chains().click(element).perform() self.wait(0.1) def touch(self, x, y): """ Click on the coordinate. """ self.action_chains().reset_actions() self.action_chains().move_by_offset(x, y).click().perform() self.wait(0.1) def input(self, element, content): """ Enter the content to the element. """ self._element_highlight(element, '#00B6F1') self.action_chains().reset_actions() self.action_chains().send_keys_to_element(element, content).perform() self.wait(0.1) def mouse(self, element): """ Park the mouse here. """ self._element_highlight(element, '#49DC07') self.action_chains().reset_actions() self.action_chains().move_to_element(element).perform() def scroll(self): """ Scroll page. """ self.action_chains().reset_actions() self.action_chains().scroll_by_amount(0, self.execute_script('return document.documentElement.clientHeight;')).perform() self.wait(0.8) def scroll_to(self, pos: int | str): """ Scroll to the specified location. """ if isinstance(pos, int) and pos > 0: self.execute_script('window.scrollTo(0, arguments[0]);', pos) elif pos == 0: self.execute_script('window.scrollTo(0, 0);') elif pos == 0 - 1: self.execute_script('window.scrollTo(0, document.body.scrollHeight);') else: pass self.wait(0.8) def scroll_to_element(self, element): """ Scroll to the specified element location. """ self.action_chains().reset_actions() self.action_chains().scroll_to_element(element).perform() self.wait(0.8) def fetch(self, url: str, options: dict): """ Sending http requests using fetch. 
""" return FetchResult(self.execute_async_script(''' var _callback = arguments[arguments.length - 1]; var _url = arguments[0]; var _options = arguments[1]; var _data = {}; fetch(_url, _options) .then((response) => { _data.status = response.status; let headers = {}; response.headers.forEach((value, name) => { headers[name] = value; }); _data.header = headers; return response.text(); }) .then((result) => { _data.result = result; _callback(_data); }) .catch((error) => { console.error(error); _callback(null); }); ''', url, options or {})) def frame_switch_to(self, element_of_frame): """ Switch frame to the specified frame element. """ self.switch_to.frame(element_of_frame) self.wait(0.2) def frame_switch_to_default(self): """ Switch to the default frame. """ self.switch_to.default_content() self.wait(0.2) def tab_create(self, url=None): """ Create a new tab and open the URL. """ self.switch_to.new_window('tab') self._update_cdp_command() url and self.open(url) def tab_switch(self, tab: int | str): """ Switch the browser tab page. """ handles = self.window_handles lengths = len(handles) current = handles.index(self.current_window_handle) if isinstance(tab, int): handle = tab elif tab == PositionTab.Prev: handle = (current - 1) elif tab == PositionTab.Next: handle = (current + 1) % lengths else: handle = None self.switch_to.window(handles[handle]) self.wait(0.2) self._update_cdp_command() def tab_switch_prev(self): """ Switch to the previous tab. """ self.tab_switch(PositionTab.Prev) def tab_switch_next(self): """ Switch to next tab. """ self.tab_switch(PositionTab.Next) def tab_cancel(self): """ Close the current browser tab page. """ handles = self.window_handles if len(handles): current = handles.index(self.current_window_handle) self.close() current > 0 and self.switch_to.window(handles[current - 1]) self.wait(0.2) def tab_cancel_all(self): """ Close all the browser tab page. 
""" handles = self.window_handles for i in handles: self.tab_cancel() def force_display_element(self, element): """ Make hidden element visible and interactive. """ self.execute_script( 'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.setAttribute("hidden","false");', element ) def screenshot(self) -> bytes: """ Screenshot as bytes. """ return self.get_screenshot_as_png() def action_chains(self) -> ActionChains: """ Return ActionChains object. """ return ActionChains(self) def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None): """ Return WebDriverWait object. """ return WebDriverWait( driver=self, timeout=timeout, poll_frequency=poll_frequency, ignored_exceptions=ignored_exceptions ) def _element_highlight(self, element=None, color='#ff0000', dura=2500): """ Make the element highlight. """ if not element: return False high = ColorUtils.hex2rgb(color) r = high[0] g = high[1] b = high[2] self.execute_script(''' let e=arguments[0]; try{ let o=[e.style.background||null,e.style.border||null]; e.style.border="1px solid %s";e.style.background="rgba(%s,%s,%s,0.2)"; if(!e.prominent){ e.prominent=true; setTimeout(function(args){try{args[0].prominent=null;args[0].style.background=args[1][0];args[0].style.border=args[1][1]}catch(e){}},%s,[e,o]); } }catch(e){} ''' % (color, r, g, b, dura), element ) def _element_click_effect(self, element=None, x: int = 0, y: int = 0): """ Make a coordinate click effect. 
""" self.execute_script(''' let e=arguments[0]; let r; let x; let y; if(e!==null){ r=e.getBoundingClientRect(); x=r.left+r.width/2+"px"; y=r.top+r.height/2+"px"; } else{ x=arguments[1]+"px"; y=arguments[2]+"px"; } let c=document.createElement("div"); c.style="width:%spx;height:%spx;border-radius:50%%;background-color:rgba(255,0,0,0.18);position:absolute;transform:translate(-50%%,-50%%);transition:opacity 0.5s;border:1px solid #ff3c3c;pointer-events:none"; c.style.zIndex=9999; c.style.left=x;c.style.top=y; document.body.appendChild(c); setTimeout(function(){c.style.opacity=0;setTimeout(function(){document.body.removeChild(c)},999)},200); let w=%s; let h=%s; let d=false; let i=setInterval(function(){ if((w>%s||h>%s)||d){ d=true; w-=2; h-=2; } else{ w+=5; h+=5; } c.style.width=w+"px";c.style.height=h+"px"; if((w<=12||h<=12)&&d){clearInterval(i)} },20); ''' % (0, 0, 0, 0, 30, 30), element, x, y ) def _update_cdp_command(self): for cmd in self.cdplist: self.execute_cdp_cmd(*cmd) def _update_tab_auto_switch(self): if (self.current_alert is None) == 1: try: self.execute(Command.GET_TITLE) except Exception: len(self.window_handles) > 0 and self.switch_to.window(self.window_handles[0]) class BrowserPluginParent: id = None name = None requirements = [] def __init__(self): self.thread = None self.thread_exception = None self.app_id = None def _run_task(self, *args, **kwargs): try: self.running(*args, **kwargs) except Exception as e: self.thread_exception = e self.notification_except(e) def notification(self, message=None): notification_send(app_id=self.app_id, title=self.name, message='%s' % (message,)) def notification_except(self, exception_info): notification_send(app_id=self.app_id, title='%s ' % (self.name,), message='%s' % (exception_info,)) def run(self, *args, **kwargs) -> threading.Thread: thread = threading.Thread(target=self._run_task, args=args, kwargs={**kwargs, **{'notification': self.notification}}) self.thread = thread thread.daemon = False thread.start() 
class BrowserManagerDataStorage(dict):
    """A ``dict`` that is loaded from, and can be saved back to, a JSON file.

    Missing keys read as ``None`` instead of raising ``KeyError``.
    """

    def __init__(self, file: str):
        """Load the initial content from ``file`` (empty when absent or invalid)."""
        self._data_file = os.path.abspath(file)
        super().__init__(self._get_json_data(self._data_file, {}))

    @staticmethod
    def _get_json_data(file: str, data=None):
        """Return the parsed JSON content of ``file``.

        ``data`` is returned unchanged when the file does not exist or does
        not contain valid JSON.
        """
        if not os.path.exists(file):
            return data
        try:
            # The context manager closes the handle even when parsing fails.
            with open(file=file, mode='r', encoding='utf-8') as f:
                return json.loads(f.read())
        except json.decoder.JSONDecodeError:
            return data

    @staticmethod
    def _put_json_data(file: str, data=None):
        """Serialize ``data`` as JSON into ``file`` under an exclusive lock.

        Returns the number of characters written.
        """
        with open(file=file, mode='w', encoding='utf-8') as f:
            flock(f, LOCK_EX)
            try:
                res = f.write(json.dumps(data, indent=4, ensure_ascii=True))
                # Push the buffered text to the OS while the lock is still
                # held; otherwise the bytes would only be written on close,
                # i.e. after the lock has already been released.
                f.flush()
            finally:
                flock(f, LOCK_UN)
        return res

    def __getitem__(self, item):
        """Return the stored value, or ``None`` when ``item`` is not present."""
        if item in self:
            return super().__getitem__(item)
        return None

    def __setitem__(self, key, value):
        super().__setitem__(key, value)

    def save(self):
        """Write the current content back to the backing file."""
        self._put_json_data(self._data_file, self)
def chrome_stopped_trigger(self):
    """Reset the cached state once the browser process is no longer running.

    Does nothing when the instance is already marked as not running.
    Otherwise the running flag and the cached page details are cleared, and
    any plugin / driver still attached is interrupted / quit on its own
    thread before the reference is dropped.
    """
    if not self.is_running_property:
        return None

    self.is_running = False
    self.url = self.title = self.alert = self.window_handles = None

    if self.chrome:
        self.chrome = None

    stale_plugin = self.plugin
    if stale_plugin:
        # Run on a separate thread so this call returns immediately.
        threading.Thread(target=stale_plugin.interrupt).start()
        self.plugin = None

    stale_driver = self.driver
    if stale_driver:
        threading.Thread(target=stale_driver.quit).start()
        self.driver = None
browser_init_home: str): if not os.path.exists(driver): raise FileNotFoundError('The driver executable file does not exist.') if not os.path.exists(binary): raise FileNotFoundError('The binary executable file does not exist.') self.driver = driver self.binary = binary self.manager_data_file = manager_data_file self.browser_data_home = browser_data_home self.browser_init_home = browser_init_home self.threading_lock = threading.RLock() self.debugging_port_range = range(60000, 60255) self.data_storage = BrowserManagerDataStorage(manager_data_file) if (not self.data_storage) == 1: self.data_storage['browser_user'] = {} self.data_storage.save() os.path.exists(browser_data_home) or os.makedirs(browser_data_home) self.geometry_config = {} self.user_running = {user_id: self._initialize_user_running(user_id) for user_id in self.data_storage['browser_user'].keys()} self.user_in_operation = [] self.plugins_int = {} self.plugins_ext = {} def _generate_remote_debugging_port(self): exist_ports = [value['remote_debugging_port'] for key, value in self.data_storage['browser_user'].items()] return random.choice([p for p in self.debugging_port_range if p not in exist_ports]) def _update_user_running(self): for user_id, running in self.user_running.items(): if (not running) == 1: continue run = BrowserManagerUserRunning(running) if (not run.chrome or run.chrome.poll() is not None) == 1: run.chrome_stopped_trigger() else: run.chrome_running_trigger() def _get_user_name(self, user_id: str): return str(self.data_storage['browser_user'][user_id]['user_name'] or '') def _get_user_data_dir(self, user_id: str): return os.path.join(self.browser_data_home, self.data_storage['browser_user'][user_id]['user_data_dir']) def _get_user_remote_debugging_port(self, user_id: str): return int(self.data_storage['browser_user'][user_id]['remote_debugging_port']) def _get_user_app_id(self, user_id: str): user_name = self.data_storage['browser_user'][user_id]['user_name'] return '%s%s%s' % (user_id, 
def load_plugins(self, plugins, is_external=0):
    """Replace the internal or external plugin registry with ``plugins``.

    Args:
        plugins: plugin classes to register; ``None`` leaves both
            registries untouched.
        is_external: falsy fills ``self.plugins_int`` (id prefix ``00``),
            truthy fills ``self.plugins_ext`` (id prefix ``01``).

    Each class gets an ``id`` of the form ``<prefix>_<NN>_<md5 of the class
    name>`` where ``NN`` is its 1-based, zero-padded position, and is stored
    in the registry under that id. The chosen registry is cleared first.
    """
    if plugins is None:
        return None

    if is_external:
        registry, prefix = self.plugins_ext, '01'
    else:
        registry, prefix = self.plugins_int, '00'

    registry.clear()
    for position, plugin_class in enumerate(plugins, start=1):
        name_digest = hashlib.md5(str(plugin_class.__name__).encode(encoding='utf-8')).hexdigest()
        plugin_id = '%s_%s_%s' % (prefix, '{:02d}'.format(position), name_digest)
        plugin_class.id = plugin_id
        registry[plugin_id] = plugin_class
def update_geometry_config(self, screen_w: int, screen_h: int, window_w: int, window_h: int, window_x: int, window_y: int):
    """Store the control-window geometry and derive the browser-window geometry.

    The browser window is placed directly to the right of the control
    window and shares its height. Its width is the screen width left over
    after the control window, capped at ``round(window_h * 1.78)`` when the
    leftover width / height ratio exceeds 1.78.

    Raises:
        ZeroDivisionError: when ``window_h`` is 0.
    """
    remaining_w = screen_w - window_w
    if remaining_w / window_h > 1.78:
        browser_w = round(window_h * 1.78)
    else:
        browser_w = remaining_w

    config = {}
    config['screen_w'] = screen_w
    config['screen_h'] = screen_h
    config['control_window_w'] = window_w
    config['control_window_h'] = window_h
    config['control_window_x'] = window_x
    config['control_window_y'] = window_y
    config['browser_window_w'] = browser_w
    config['browser_window_h'] = window_h
    config['browser_window_x'] = window_x + window_w
    config['browser_window_y'] = window_y
    self.geometry_config = config
def plugin_shutdown(self, user_id: str):
    """Interrupt the plugin currently running for ``user_id``.

    Raises:
        RuntimeError: propagated from ``user_operate_starting`` when another
            operation on this user is already in progress.
        FileExistsError: when the user is not among the online users.
        Exception: when no plugin is currently running for the user.
    """
    # Claim the per-user guard BEFORE entering the try block. If the claim is
    # rejected (another operation is in flight) the finally clause must not
    # run, otherwise it would release the guard owned by that other
    # operation.
    self.user_operate_starting(user_id)
    try:
        if user_id not in self.user_ids_online():
            raise FileExistsError('User ID is not running.')
        running = BrowserManagerUserRunning(self.user_running[user_id])
        plugin = running.plugin
        if not (plugin and plugin.state()):
            raise Exception('The plugin is not running.')
        plugin.interrupt()
    finally:
        self.user_operate_complete(user_id)
def user_add(self, user_id: str):
    """Register a new browser user and create its data directory.

    The user is stored with an empty name, the directory name
    ``data_<user_id>`` and a freshly generated debugging port. The data
    directory is copied from ``self.browser_init_home`` when that exists,
    otherwise created empty; an already existing directory is left as is.

    Raises:
        ValueError: when ``user_id`` is not a string of 1-16 characters from
            ``0-9 A-Z a-z _ -``.
        RuntimeError: propagated from ``user_operate_starting`` when another
            operation on this user is already in progress.
        FileExistsError: when the user id is already registered.
    """
    # fullmatch() rather than match('^...$'): '$' also matches just before a
    # trailing newline, which let ids such as 'abc\n' through.
    if not isinstance(user_id, str) or not re.fullmatch(r'[0-9A-Za-z_-]{1,16}', user_id):
        raise ValueError('Invalid user ID format.')

    # Claim the per-user guard outside the try block so that a rejected
    # claim does not run the finally clause and release a guard that belongs
    # to the operation already in flight.
    self.user_operate_starting(user_id)
    try:
        if user_id in self.user_ids():
            raise FileExistsError('User ID already exists.')
        basename = 'data_%s' % (user_id,)
        data_dir = os.path.join(self.browser_data_home, basename)
        self.data_storage['browser_user'][user_id] = {
            'user_name': '',
            'user_data_dir': basename,
            'remote_debugging_port': self._generate_remote_debugging_port()
        }
        self.data_storage.save()
        self.user_running[user_id] = self._initialize_user_running(user_id)
        if not os.path.exists(data_dir):
            if os.path.exists(self.browser_init_home):
                shutil.copytree(self.browser_init_home, data_dir)
            else:
                os.makedirs(data_dir)
    finally:
        self.user_operate_complete(user_id)
def user_run(self, user_id: str):
    """Launch the browser for ``user_id`` and attach a driver to it.

    The browser process is started with the user's data directory and
    debugging port, then a ``Browser`` driver is connected to
    ``127.0.0.1:<port>`` using the window geometry from
    ``self.geometry_config``. Both are stored on the user's running record.

    Raises:
        RuntimeError: propagated from ``user_operate_starting`` when another
            operation on this user is already in progress.
        FileExistsError: when the user id is not registered.
        Exception: when the user's data directory appears to be in use.
    """
    # Claim the per-user guard outside the try block: if the claim is
    # rejected the finally clause must not run, or it would release the
    # guard held by the operation that is already in progress.
    self.user_operate_starting(user_id)
    try:
        if user_id not in self.user_ids():
            raise FileExistsError('User ID not exists.')
        if self.is_user_data_occupied(user_id):
            raise Exception('The instance may be running, please shutdown the instance first.')
        user_data_dir = self._get_user_data_dir(user_id)
        remote_debugging_port = self._get_user_remote_debugging_port(user_id)
        chrome = self.run_browser(user_data_dir=user_data_dir, remote_debugging_port=remote_debugging_port)
        running = BrowserManagerUserRunning(self.user_running[user_id])
        running.active = 1
        running.set_chrome(chrome)
        driver = Browser(
            driver=self.driver,
            binary=self.binary,
            window_size='%s,%s' % (self.geometry_config['browser_window_w'], self.geometry_config['browser_window_h']),
            window_site='%s,%s' % (self.geometry_config['browser_window_x'], self.geometry_config['browser_window_y']),
            debugger_address='127.0.0.1:%s' % (remote_debugging_port,)
        )
        running.set_driver(driver)
    finally:
        self.user_operate_complete(user_id)
driver.current_window_handle tab_current in tab_handles and tab_handles.remove(tab_current) tab_handles.append(tab_current) for tab in tab_handles: try: driver.switch_to.window(tab) for i in range(5): try: driver.current_alert.dismiss() except Exception: break driver.close() except NoSuchWindowException: pass threading.Thread(target=driver.quit).start() while self.is_user_data_occupied(user_id): try: time.sleep(0.35) except KeyboardInterrupt: pass else: raise Exception('The debug port is not in listening.') finally: self.user_operate_complete(user_id) class MainWindow(QMainWindow): def __init__(self, runner, app_name: str, app_version: str, scale_rate: float, web_listen_host: str, web_listen_port: int): super().__init__() self.runner = runner self.scale_rate = scale_rate self.web_listen_host = web_listen_host self.web_listen_port = web_listen_port self.setWindowTitle('%s %s' % (app_name, app_version)) self.setAcceptDrops(True) self.setWindowFlags(self.windowFlags() | Qt.WindowMinMaxButtonsHint) screen = QDesktopWidget().screenGeometry() self.screen_w, self.screen_h = screen.width(), screen.height() self.window_w, self.window_h = round(480 * self.scale_rate), round(868 * self.scale_rate) self.window_x, self.window_y = 0, 0 self.setFixedSize(self.window_w, self.window_h) QNetworkProxyFactory.setUseSystemConfiguration(False) self.webview = WebEngineView() self.webview_cache_dir = os.path.join(tempfile.gettempdir(), hashlib.md5(bytes('%s_cache' % (__file__,), encoding='utf-8')).hexdigest()[:32]) web_settings = self.webview.settings() web_settings.setAttribute(QWebEngineSettings.ScrollAnimatorEnabled, True) web_settings.setAttribute(QWebEngineSettings.Accelerated2dCanvasEnabled, True) web_settings.setAttribute(QWebEngineSettings.WebGLEnabled, True) profile = QWebEngineProfile.defaultProfile() profile.setPersistentStoragePath(self.webview_cache_dir) profile.setCachePath(self.webview_cache_dir) self.webview.setZoomFactor(self.scale_rate or 1.0) 
def closeEvent(self, event):
    """Handle a close request by hiding the window or terminating the app.

    The event itself is always ignored. A visible window is only hidden;
    when the window is already hidden, a notice is printed to stderr and
    ``self.exit()`` is called.
    """
    event.ignore()
    if self.isVisible():
        self.hide()
        return
    print('Terminating...', file=sys.stderr)
    self.exit()
class WebEngineViewUrlRequestInterceptor(QWebEngineUrlRequestInterceptor):
    """Request interceptor that redirects ``file://`` requests to an empty URL."""

    def interceptRequest(self, info):
        """Redirect the request to ``QUrl('')`` when its URL starts with ``file://``."""
        requested = info.requestUrl().toString()
        if requested.startswith('file://'):
            info.redirect(QUrl(''))
class WebServer:
    """FastAPI application exposing the browser manager and the static panel.

    JSON endpoints answer with ``{'code', 'info', 'data'}`` envelopes built
    by :meth:`message`; ``code`` is 0 on success and 1 on failure, with the
    exception text in ``info``.
    """

    def __init__(self, root: str, data: str, default_plugins=None):
        """Create the app, the browser manager and register all routes.

        Args:
            root: application directory; holds ``www``, ``Chrome`` and
                ``initialize``.
            data: writable data directory; holds ``upload``, ``users`` and
                ``manager.json``.
            default_plugins: built-in plugin classes handed to the manager.
        """
        self.app = FastAPI()
        self.www = os.path.join(root, 'www')
        self.upload_dir = os.path.join(data, 'upload')
        self.browser_manager = BrowserManager(
            driver=os.path.join(root, 'Chrome/chromedriver.exe'),
            binary=os.path.join(root, 'Chrome/chrome.exe'),
            manager_data_file=os.path.join(data, 'manager.json'),
            browser_data_home=os.path.join(data, 'users'),
            browser_init_home=os.path.join(root, 'initialize')
        )
        self.browser_manager.load_plugins(default_plugins, is_external=0)
        self.browser_manager.load_plugins_from_external_module()

        # NOTE: request fields are read inside the lambdas so that a missing
        # key is reported through the JSON envelope, as before, rather than
        # escaping as an unhandled error.

        @self.app.api_route(methods=['POST'], path='/plugin_run')
        def api_plugin_run(data: WebServerAPIJSONData):
            return self._guarded(lambda: self.browser_manager.plugin_run(
                data.data['user_id'], data.data['plugin_id'], data.data['requirements']))

        @self.app.api_route(methods=['POST'], path='/plugin_shutdown')
        def api_plugin_shutdown(data: WebServerAPIJSONData):
            return self._guarded(lambda: self.browser_manager.plugin_shutdown(data.data['user_id']))

        @self.app.api_route(methods=['POST'], path='/upload_files')
        def api_upload_files(files: List[UploadFile] = File(...)):
            if len(files) > 25:
                return JSONResponse(status_code=200, content=self.message(1, 'Exceeded the maximum number of files allowed of 25.'))
            # A part without a known size counts as 0 instead of breaking sum().
            if sum((file.size or 0) for file in files) > 50 * 1024 * 1024:
                return JSONResponse(status_code=200, content=self.message(1, 'Total file size exceeds the limit of 50MB.'))
            uploaded_files = []
            for file in files:
                # The client controls file.filename; only its final path
                # component is used so the file cannot be written outside
                # the upload directory.
                stored_name = '%s_%s' % (int(time.time() * 1000), self._safe_upload_name(file.filename))
                file_save_path = os.path.join(self.upload_dir, stored_name)
                with open(file_save_path, 'wb') as f:
                    f.write(file.file.read())
                uploaded_files.append({'filename': file.filename, 'size': file.size, 'type': file.content_type, 'path': file_save_path})
            return JSONResponse(status_code=200, content=self.message(0, '', uploaded_files))

        @self.app.api_route(methods=['GET', 'POST'], path='/user_all_details')
        def api_user_all_details():
            return self.browser_manager.user_all_details()

        @self.app.api_route(methods=['POST'], path='/shutdown')
        def api_shutdown():
            def stop_all():
                for user_id in self.browser_manager.user_ids_online():
                    self.browser_manager.user_die(user_id)
            return self._guarded(stop_all)

        @self.app.api_route(methods=['POST'], path='/user_set_name')
        def api_user_set_name(data: WebServerAPIJSONData):
            return self._guarded(lambda: self.browser_manager.user_set_name(data.data['user_id'], data.data['user_name']))

        @self.app.api_route(methods=['POST'], path='/user_add')
        def api_user_add(data: WebServerAPIJSONData):
            return self._guarded(lambda: self.browser_manager.user_add(data.data['user_id']))

        @self.app.api_route(methods=['POST'], path='/user_del')
        def api_user_del(data: WebServerAPIJSONData):
            return self._guarded(lambda: self.browser_manager.user_del(data.data['user_id']))

        @self.app.api_route(methods=['POST'], path='/user_run')
        def api_user_run(data: WebServerAPIJSONData):
            return self._guarded(lambda: self.browser_manager.user_run(data.data['user_id']))

        @self.app.api_route(methods=['POST'], path='/user_die')
        def api_user_die(data: WebServerAPIJSONData):
            return self._guarded(lambda: self.browser_manager.user_die(data.data['user_id']))

        @self.app.api_route(methods=['POST'], path='/user_focus_window')
        def api_user_focus_window(data: WebServerAPIJSONData):
            try:
                self.browser_manager.user_focus_window(data.data['user_id'])
                return self.message(0)
            except FunctionTimedOut:
                # A timed-out focus request is reported as success.
                return self.message(0)
            except Exception as e:
                return self.message(1, '%s' % (e,))

        @self.app.api_route(methods=['GET'], path='/')
        def index():
            return self.statics(os.path.join(self.www, 'index.html'))

        @self.app.api_route(methods=['GET'], path='/200')
        def state():
            return Response(content='200', media_type='text/plain')

        @self.app.api_route(methods=['GET'], path='/{filename:path}')
        def www(filename):
            # The path segment comes from the request; refuse anything that
            # resolves outside the static directory ('..', absolute paths).
            resolved = self._resolve_within(self.www, filename)
            if resolved is None:
                raise HTTPException(status_code=404)
            return self.statics(resolved)

    def _guarded(self, action):
        """Run ``action()`` and translate the outcome into a JSON envelope."""
        try:
            action()
            return self.message(0)
        except Exception as e:
            return self.message(1, '%s' % (e,))

    @staticmethod
    def _safe_upload_name(filename):
        """Return the last path component of ``filename``.

        Both ``/`` and ``\\`` are treated as separators. ``'upload'`` is
        returned when nothing usable remains (empty, ``.`` or ``..``).
        """
        name = os.path.basename(str(filename or '').replace('\\', '/'))
        return name if name not in ('', '.', '..') else 'upload'

    @staticmethod
    def _resolve_within(base: str, relative: str):
        """Resolve ``relative`` against ``base``; ``None`` if it escapes ``base``."""
        base_real = os.path.realpath(base)
        target = os.path.realpath(os.path.join(base_real, relative))
        try:
            common = os.path.commonpath([base_real, target])
        except ValueError:
            # e.g. paths on different drives
            return None
        if os.path.normcase(common) != os.path.normcase(base_real):
            return None
        return target

    @staticmethod
    def message(code: int, info: str = '', data=None):
        """Build the ``{'code', 'info', 'data'}`` response envelope."""
        return {'code': int(code), 'info': info, 'data': data}

    @staticmethod
    def statics(file: str):
        """Return ``file`` as a file response, or raise HTTP 404 if it is not a file."""
        if not os.path.isfile(file):
            raise HTTPException(status_code=404)
        return FileResponse(file)

    def handle_interrupt(self):
        """Forward the interrupt to the browser manager, then clear uploads."""
        self.browser_manager.handle_interrupt()
        self.upload_clear()

    def upload_clear(self):
        """Delete every regular file directly inside the upload directory."""
        dirs = self.upload_dir
        if not os.path.exists(dirs):
            return
        for filename in os.listdir(dirs):
            filepath = os.path.join(dirs, filename)
            try:
                if os.path.isfile(filepath):
                    os.unlink(filepath)
            except Exception:
                # Best effort: a file that cannot be removed is left behind.
                pass

    def run(self, host='127.0.0.1', port=8080):
        """Ensure the working directories exist, clear old uploads and serve."""
        os.makedirs(self.www, exist_ok=True)
        os.makedirs(self.upload_dir, exist_ok=True)
        self.upload_clear()
        uvicorn.run(self.app, host=host, port=port)
self.window = None self.plugin_list = [ BrowserPluginFileTest, BrowserPluginTextTest, BrowserPluginMultFileTest, BrowserPluginMultTextTest, BrowserPluginFileCommandDebug, BrowserPluginTextCommandDebug ] def _copy_files_and_directories(self, src, dst): function_name = inspect.currentframe().f_code.co_name if (os.path.exists(src)) != 1: return None if (os.path.isdir(src)) == 1: if not os.path.exists(dst): os.makedirs(dst) for item in os.listdir(src): s = os.path.join(src, item) d = os.path.join(dst, item) if os.path.isdir(s): self.__getattribute__(function_name)(s, d) else: shutil.copy(s, d) else: shutil.copy(src, dst) def run(self): os.path.exists(self.app_data) or os.makedirs(self.app_data) self.web_server = WebServer(root=self.app_root, data=self.app_data, default_plugins=self.plugin_list) self.web_server_thread = threading.Thread(target=self.web_server.run, kwargs={'host': self.web_server_host, 'port': self.web_server_port}) self.web_server_thread.daemon = True self.web_server_thread.start() self.application = QApplication(sys.argv) self.application.setAttribute(Qt.AA_EnableHighDpiScaling, True) self.application.setAttribute(Qt.AA_UseHighDpiPixmaps, True) self.application.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough) self.application_scale_rate = self.application.screens()[0].logicalDotsPerInch() / 96 self.window = MainWindow(runner=self, app_name=self.app_name, app_version=self.app_version, scale_rate=self.application_scale_rate, web_listen_host=self.web_server_host, web_listen_port=self.web_server_port) sys.exit(self.application.exec_()) def build(self): if (str(__file__).endswith('.py')) == 0: print('Build is not currently supported.', file=sys.stderr) exit(1) from_home = os.path.dirname(__file__) dist_home = os.path.join(os.path.dirname(__file__), '%s.dist' % (os.path.splitext(os.path.basename(__file__))[0],)) ask = input('%s %s: ' % ('Build?', '[Y/n]')) if ask.lower().strip() == 'y': subprocess.run(['pip', 'install', 
'nuitka', '-q'], shell=True, env=os.environ) subprocess.run([ 'python', '-m', 'nuitka', '--standalone', '--enable-plugin=pyqt5', '--enable-plugin=pylint-warnings', '--include-module=PyQt5', '--include-module=selenium', '--include-module=fastapi', '--include-module=pydantic', '--include-module=starlette', '--windows-console-mode=disable', '--windows-icon-from-ico=favicon.ico', '--product-name=%s' % (self.app_name,), '--file-description=%s' % (self.app_name,), '--product-version=%s' % (self.app_version,), '--copyright=Copyright (C) 2025', '--output-dir=%s' % (os.path.join(os.path.dirname(__file__)),), '%s' % (__file__,) ], shell=True, env=os.environ) for i in ['Packages', 'PyQt5', 'Chrome', 'initialize', 'www', 'favicon.ico']: self._copy_files_and_directories('%s/%s' % (from_home, i), '%s/%s' % (dist_home, i)) else: if (not os.path.exists(dist_home)) == 1: return None ask = input('%s %s: ' % ('Compile setup program?', '[Y/n]')) if ask.lower().strip() == 'y': compile_file = os.path.join(os.path.dirname(__file__), '%s.iss' % (os.path.splitext(os.path.basename(__file__))[0],)) compile_template = os.path.join(os.path.dirname(__file__), 'Compile.iss.template') compiler = 'C:\\Program Files (x86)\\Inno Setup 6\\ISCC.exe' if (os.path.exists(compile_template)) != 1: print('The template file \"%s\" does not exist.' % (compile_template,), file=sys.stderr) return None if (os.path.exists(compiler)) != 1: print('The compiler \"%s\" does not exist. Please check if Inno Setup is installed. 
if __name__ == '__main__':
    # Register the Qt plugin directory; files named '*.int' keep it under
    # 'site-packages'.
    if os.path.basename(__file__).lower().endswith('.int'):
        qt_plugin_dir = os.path.join(os.path.dirname(__file__), 'site-packages/PyQt5/Qt5/plugins')
    else:
        qt_plugin_dir = os.path.join(os.path.dirname(__file__), 'PyQt5/Qt5/plugins')
    QCoreApplication.addLibraryPath(qt_plugin_dir)

    # Single-instance guard: a lock file in the temp directory, named after
    # a hash of this script's path. The handle is kept open on purpose.
    lock_file_name = '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16]
    f_lock = open(file=os.path.join(tempfile.gettempdir(), lock_file_name), mode='w', encoding='utf-8')

    if flock(f_lock, LOCK_EX | LOCK_NB):
        # '--build' as the first argument selects the build workflow.
        if len(sys.argv) > 1 and sys.argv[1] == '--build':
            MainRunner().build()
        else:
            MainRunner().run()
    else:
        # The lock could not be taken: warn the user and exit with status 1.
        app = QApplication(sys.argv)
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setText('The application is already running.')
        msg.setWindowTitle('Warning')
        msg.setStandardButtons(QMessageBox.Cancel)
        msg.exec_()
        app.exit(1)
        sys.exit(1)