Galactic/Galactic.py

2853 lines
107 KiB
Python

import os
import re
import sys
import glob
import json
import time
import ctypes
import base64
import signal
import random
import shutil
import socket
import psutil
import inspect
import uvicorn
import zipfile
import hashlib
import asyncio
import tempfile
import platform
import win32gui
import win32process
import win32con
import requests
import requests.adapters
import importlib.util
import threading
import subprocess
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from seleniumwire.webdriver import InspectRequestsMixin, DriverCommonMixin
from seleniumwire.request import Request as SeleniumWireRequest
from seleniumwire.request import Response as SeleniumWireResponse
from selenium.webdriver.common.by import By
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.remote.command import Command
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchWindowException
from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineSettings, QWebEngineProfile
from PyQt5.QtWebEngineCore import QWebEngineUrlRequestInterceptor
from PyQt5.QtNetwork import QNetworkProxyFactory
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox, QSystemTrayIcon, QMenu, QAction, QDesktopWidget, QShortcut, QFileDialog
from PyQt5.QtCore import Qt, QCoreApplication, QTimer, QUrl
from PyQt5.QtGui import QIcon, QKeySequence
from typing import List
from fastapi import FastAPI, Response, WebSocket, UploadFile, File, HTTPException
from pydantic import BaseModel
from starlette.responses import JSONResponse, FileResponse
from starlette.websockets import WebSocketDisconnect
from winotify import Notification, audio
from func_timeout import func_set_timeout, FunctionTimedOut
from pathlib import Path
from urllib.parse import urlparse
def _fd(f):
return f.fileno() if hasattr(f, 'fileno') else f
if os.name == 'nt':
import msvcrt
from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref)
from ctypes.wintypes import BOOL, DWORD, HANDLE
LOCK_SH = 0x0
LOCK_NB = 0x1
LOCK_EX = 0x2
LOCK_UN = 0x9
if sizeof(c_ulong) != sizeof(c_void_p):
ULONG_PTR = c_int64
else:
ULONG_PTR = c_ulong
PVOID = c_void_p
class _OFFSET(Structure):
_fields_ = [
('Offset', DWORD),
('OffsetHigh', DWORD)
]
class _OFFSET_UNION(Union):
_fields_ = [
('_offset', _OFFSET),
('Pointer', PVOID)
]
_anonymous_ = ['_offset']
class OVERLAPPED(Structure):
_fields_ = [
('Internal', ULONG_PTR),
('InternalHigh', ULONG_PTR),
('_offset_union', _OFFSET_UNION),
('hEvent', HANDLE)
]
_anonymous_ = ['_offset_union']
LPOVERLAPPED = POINTER(OVERLAPPED)
LockFileEx = windll.kernel32.LockFileEx
LockFileEx.restype = BOOL
LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
UnlockFileEx = windll.kernel32.UnlockFileEx
UnlockFileEx.restype = BOOL
UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]
def flock(f, flags):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
if flags == LOCK_UN:
ret = UnlockFileEx(
hfile,
0,
0,
0xFFFF0000,
byref(overlapped)
)
else:
ret = LockFileEx(
hfile,
flags,
0,
0,
0xFFFF0000,
byref(overlapped)
)
return bool(ret)
else:
try:
import fcntl
LOCK_SH = fcntl.LOCK_SH
LOCK_NB = fcntl.LOCK_NB
LOCK_EX = fcntl.LOCK_EX
LOCK_UN = fcntl.LOCK_UN
except (ImportError, AttributeError):
LOCK_EX = LOCK_SH = LOCK_NB = 0
def flock(f, flags):
return flags == LOCK_UN
else:
def flock(f, flags):
return fcntl.flock(_fd(f), flags) == 0
def notification_send(app_id=None, title='', message='', reminder=None):
app_id = app_id() if callable(app_id) else app_id
notify = Notification(app_id=app_id, title=title, msg=message, icon=os.path.join(os.path.dirname(__file__), 'favicon.ico'))
notify.set_audio(audio.IM, bool(reminder))
notify.show()
def import_module(file: str):
spec = importlib.util.spec_from_file_location(os.path.splitext(os.path.basename(file))[0], file)
module_from_spec = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module_from_spec)
return module_from_spec
def calculate_execution_time(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
execution_time_ms = (end_time - start_time) * 1000
print('Function \'{}\' with args {} and kwargs {} took {:.3f} ms to execute.'.format(func.__name__, args, kwargs, execution_time_ms), file=sys.stderr)
return result
return wrapper
class BrowserMobileEmulation(dict):
"""
Mobile emulation parameters.
"""
def __init__(self, w=540, h=960, user_agent=None):
du = base64.b64decode(bytes('''
TW96aWxsYS81LjAgKExpbnV4OyBVOyBBbmRyb2lkIDEzOyB6aC1jbjsgMjEwOTEx
OUJDIEJ1aWxkL1RLUTEuMjIwODI5LjAwMikgQXBwbGVXZWJLaXQvNTM3LjM2IChL
SFRNTCwgbGlrZSBHZWNrbykgVmVyc2lvbi80LjAgQ2hyb21lLzk4LjAuNDc1OC4x
MDIgTVFRQnJvd3Nlci8xMy42IE1vYmlsZSBTYWZhcmkvNTM3LjM2
''', encoding='utf-8')).decode()
user_agent = user_agent or du
super().__init__({'w': w, 'h': h, 'user_agent': user_agent})
self.w = self.h = self.user_agent = None
def __setattr__(self, key, value):
pass
def __getitem__(self, item):
try:
return super().__getitem__(item)
except KeyError:
return None
def __getattr__(self, item):
try:
return super().__getitem__(item)
except KeyError:
return None
class CustomHTTPAdapter(requests.adapters.HTTPAdapter):
def __init__(self, *args, **kwargs):
self.urlparse = urlparse
self.hosts = {}
self.addrs = {}
super().__init__(*args, **kwargs)
@staticmethod
def resolve_host(host):
try:
hosts = requests.get('http://119.29.29.29/d?dn=%s&ip=208.67.222.222' % host).text.replace(',', ';').split(';')
except (requests.exceptions.RequestException, requests.exceptions.ConnectTimeout):
hosts = []
return hosts[0] if len(hosts) > 0 else None
def send(self, request, **kwargs):
req = request
connection_pool_kwargs = self.poolmanager.connection_pool_kw
url_resolve = self.urlparse(req.url)
scheme = url_resolve.scheme
domain = url_resolve.netloc.split(':')[0]
try:
addition_port = ':%s' % url_resolve.netloc.split(':')[1]
except IndexError:
addition_port = ''
ip_address = self.resolve_host(domain)
if ip_address:
self.hosts[domain] = ip_address
self.addrs[ip_address] = domain
req.url = req.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.hosts[domain], addition_port))
if scheme == 'https':
connection_pool_kwargs['assert_hostname'] = domain
connection_pool_kwargs['server_hostname'] = domain
req.headers['Host'] = '%s%s' % (domain, addition_port)
return super().send(req, **kwargs)
def build_response(self, *args, **kwargs):
res = super().build_response(*args, **kwargs)
url_resolve = self.urlparse(res.url)
domain = url_resolve.netloc.split(':')[0]
try:
addition_port = ':%s' % url_resolve.netloc.split(':')[1]
except IndexError:
addition_port = ''
if domain in self.addrs.keys():
res.url = res.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.addrs[domain], addition_port))
return res
class BrowserPathManager:
def __init__(self):
self.webdriver_install_location = tempfile.gettempdir()
@staticmethod
def resolve_browser_version(file: str):
if not os.path.exists(file):
raise FileNotFoundError('The executable file does not exist in %s' % file)
if file.lower().endswith('.exe'):
try:
full_version = subprocess.run(
['powershell', '(Get-Item -Path "%s").VersionInfo.ProductVersion' % file],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=5
).stdout.decode('utf-8').strip()
except Exception:
full_version = ''
try:
main_version = full_version.split('.')[0]
except Exception:
main_version = ''
else:
try:
full_version = subprocess.run(
'%s --version' % file,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=5
).stdout.decode('utf-8').strip()
full_version = re.findall('[0-9]+[.\\d+]+', full_version)[-1]
except Exception:
full_version = ''
try:
main_version = full_version.split('.')[0]
except Exception:
main_version = ''
return file, main_version, full_version
@staticmethod
def open_remote_resources(url: str, save_file: str = None, auto_redirects=False, retries=3):
http = requests.Session()
for scheme in ['http://', 'https://']:
http.mount(scheme, CustomHTTPAdapter())
for i in range((retries if retries > 0 else 0) + 1):
try:
with http.get(
url,
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
},
allow_redirects=auto_redirects,
stream=(save_file is not None)
) as response:
if save_file:
if 200 != response.status_code:
return bool(0)
with open(save_file, 'wb') as filestream:
for chunk in response.iter_content(chunk_size=8192):
filestream.write(chunk)
return bool(1)
else:
if 200 != response.status_code:
return ''
else:
return response.text
except requests.exceptions.ConnectionError:
retries > 0 and time.sleep(0.75 + round(random.random(), 2))
continue
@staticmethod
def find_binary():
plat = sys.platform
find_list = []
plats = ['win32', 'linux', 'darwin']
if plat == plats[0]:
for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)', 'LOCALAPPDATA', 'PROGRAMW6432']:
find_list.append('%s/Google/Chrome/Application/chrome.exe' % os.environ.get(e, '').replace("\\", '/'))
if plat == plats[1]:
for p in ['/opt/google/chrome', '/usr/bin/google-chrome']:
find_list.append('%s/chrome' % p)
if plat == plats[2]:
for p in ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome']:
find_list.append('%s/chrome' % p)
for execute_file in find_list:
try:
if os.path.exists(execute_file):
return execute_file
except Exception:
pass
def find_driver(self, main_version: str):
if not int(main_version) >= 70:
return None
location = '%s%s%s' % (
self.webdriver_install_location,
os.sep,
'chromedriver_%s%s' % (str(main_version), '.exe' if platform.system().lower() == 'windows' else '')
)
return location.replace("\\", '/') if os.path.exists(location) else None
def pull_driver(self, main_version: str):
if not int(main_version) >= 70:
return None
chromedriver_site = 'https://chromedriver.storage.googleapis.com'
latest_release = self.open_remote_resources('%s/LATEST_RELEASE_%s' % (chromedriver_site, main_version))
if '' == latest_release:
return None
plat = sys.platform
match_assets = []
plats = ['win32', 'linux', 'darwin']
child = ['chromedriver.exe', 'chromedriver']
tails = ['win32', 'linux64', 'mac64', 'mac_arm64', 'mac64_m1']
if plat == plats[0]:
match_assets.append([child[0], 'chromedriver_%s.zip' % tails[0]])
if plat == plats[1]:
match_assets.append([child[1], 'chromedriver_%s.zip' % tails[1]])
if plat == plats[2] and (platform.machine().startswith('arm')) is bool(0):
match_assets.append([child[1], 'chromedriver_%s.zip' % tails[2]])
if plat == plats[2] and (platform.machine().startswith('arm')) is bool(1):
match_assets.append([child[1], 'chromedriver_%s.zip' % tails[3]])
match_assets.append([child[1], 'chromedriver_%s.zip' % tails[4]])
package_chromedriver = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'chromedriver.zip')
destdir_chromedriver = self.webdriver_install_location
for assets in match_assets:
res_url = '%s/%s/%s' % (chromedriver_site, latest_release, assets[1])
print('Downloading version %s chromedriver %s to %s...' % (latest_release, res_url, destdir_chromedriver), file=sys.stderr)
if self.open_remote_resources(res_url, package_chromedriver):
dist = zipfile.ZipFile(package_chromedriver).extract(assets[0], destdir_chromedriver)
dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0].replace('chromedriver', 'chromedriver_%s' % main_version))
os.path.exists(dist_chan) and os.remove(dist_chan)
os.rename(dist, dist_chan)
assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777)
os.remove(package_chromedriver)
return dist_chan.replace("\\", '/')
def main(self, binary: str = None, driver: str = None):
binary = binary or self.find_binary()
if not binary:
raise FileNotFoundError('No browser executable file is found on your system, please confirm whether it has been installed')
if not os.path.exists(binary):
raise FileNotFoundError('The executable file does not exist in %s' % binary)
version = self.resolve_browser_version(binary)
if not version:
raise RuntimeError('Failure to get the browser version number failed in %s' % binary)
driver = driver if driver else self.find_driver(version[1])
driver = driver if driver else self.pull_driver(version[1])
if not driver:
raise FileNotFoundError('Not specified the driver path, and try the automatic download failure')
if not os.path.exists(driver):
raise FileNotFoundError('The driver does not exist in %s' % driver)
return binary, driver
class SeleniumClear:
def __init__(self):
self.last = '%s/.selenium_clear_last' % tempfile.gettempdir()
@staticmethod
def clear_selenium():
if platform.uname().system.lower() == 'windows':
user_home = [os.environ.get('HOMEDRIVE'), os.environ.get('HOMEPATH')]
if user_home[0] and user_home[1]:
try:
shutil.rmtree('%s%s/.cache/selenium' % (user_home[0], user_home[1]))
except FileNotFoundError:
pass
@staticmethod
def clear_driver_cache():
for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']:
for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)):
try:
shutil.rmtree(i)
except (FileNotFoundError, PermissionError, WindowsError):
pass
@staticmethod
def file_get_contents(file, text=None):
if not os.path.exists(file):
return text
return open(file=file, mode='r', encoding='utf-8').read()
@staticmethod
def file_put_contents(file, text=None):
return open(file=file, mode='w', encoding='utf-8').write(text)
def straight_clear(self):
self.clear_selenium()
self.clear_driver_cache()
self.file_put_contents(self.last, str(int(time.time())))
def auto(self):
try:
int(self.file_get_contents(self.last, '0')) + 86400 < int(time.time()) and self.straight_clear()
except ValueError:
os.remove(self.last)
class PositionTab:
"""
Position for switch tab.
"""
Prev = 'Go-Prev'
Next = 'Go-Next'
class ColorUtils:
"""
Color utils.
"""
@staticmethod
def hex2rgb(color):
color = color[1:].upper()
for x in color:
if x not in '0123456789ABCDEF':
raise Exception('Found invalid hexa character {0}.'.format(x))
if len(color) == 6 or len(color) == 8:
color = '#' + color[0:6]
elif len(color) == 3:
color = '#' + color[0] * 2 + color[1] * 2 + color[2] * 2
else:
raise Exception('Hexa string should be 3, 6 or 8 digits. if 8 digits, last 2 are ignored.')
hexcolor = color[1:]
r, g, b = int(hexcolor[0:2], 16), int(hexcolor[2:4], 16), int(hexcolor[4:6], 16)
return r, g, b
class FetchResult:
def __init__(self, data):
self.status = 0
self.header = {}
self.result = ''
data and self._resolve_data(data)
def _resolve_data(self, data: dict):
self.status = data['status']
self.header = data['header']
self.result = data['result']
def __str__(self):
return '%s: %s\n%s: %s\n%s: %s' % ('STATUS', self.status, 'HEADER', self.header, 'RESULT', self.result)
class EmptyMethod:
@classmethod
def no_method(cls, *args, **kwargs):
pass
def __setitem__(self, key, value):
pass
def __setattr__(self, key, value):
pass
def __getitem__(self, item):
return self.no_method
def __getattr__(self, item):
return self.no_method
class BrowserService(Service):
def __del__(self):
pass
def is_connectable(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.05)
try:
return s.connect_ex(('127.0.0.1', self.port)) == 0
except socket.timeout:
return False
finally:
s.close()
@property
def service_url(self):
return 'http://%s:%s' % ('127.0.0.1', self.port)
class BrowserOptions(Options):
def __del__(self):
pass
class Browser(InspectRequestsMixin, DriverCommonMixin, Chrome):
"""
Browser web driver.
"""
def __init__(
self,
driver: str = None,
binary: str = None,
driver_classes: int = 0,
debugger_address: str = None,
backends_address: str = None,
headless: bool = False,
lang: str = None,
mute: bool = False,
no_images: bool = False,
user_agent: str = None,
http_proxy: str = None,
home: str = None,
window_size: str = None,
window_site: str = None,
mobile_emulation: BrowserMobileEmulation = None,
option_arguments: list = None,
req_interceptor=None,
res_interceptor=None,
):
self.exited = None
self.is_linux = sys.platform.startswith('linux')
SeleniumClear().auto()
# binary, driver = BrowserPathManager().main(binary, driver)
if self.is_linux is bool(1) and not window_size: window_size = '1920x1080'
if self.is_linux is bool(0) and headless and not window_size: window_size = '1920x1080'
# Initialization settings.
if (isinstance(option_arguments, list)) is bool(0): option_arguments = []
cdplist = []
service = BrowserService()
options = BrowserOptions()
self.cdplist = cdplist
# Delete prompt information of chrome being controlled.
exclude_switches = ['enable-automation', 'enable-logging', 'disable-translate']
options.add_experimental_option('debuggerAddress', debugger_address) if debugger_address else options.add_experimental_option('excludeSwitches', exclude_switches)
# Mobile emulation parameter setting start.
if mobile_emulation and not debugger_address:
self.w_browser = mobile_emulation.w + 14
self.h_browser = mobile_emulation.h + 0
self.w_inner_window = mobile_emulation.w + 0
self.h_inner_window = mobile_emulation.h - 86
self.mobile_emulation_screen_w = mobile_emulation.w
self.mobile_emulation_screen_h = mobile_emulation.h
window_size = '%s,%s' % (self.w_browser, self.h_browser)
options.add_experimental_option(
'mobileEmulation', {
'deviceMetrics': {
'width': self.w_inner_window,
'height': self.h_inner_window,
'pixelRatio': 2.75,
'touch': True
},
'userAgent': mobile_emulation.user_agent
}
)
cdplist.append([
'Emulation.setUserAgentOverride', {
'userAgent': mobile_emulation.user_agent,
'userAgentMetadata': {
'platform': 'Android' if mobile_emulation.user_agent.find('iPhone') == -1 else 'iPhone',
'mobile': True,
'platformVersion': '',
'architecture': '',
'model': ''
}
}
])
else:
self.w_browser = 0
self.h_browser = 0
self.w_inner_window = 0
self.h_inner_window = 0
self.mobile_emulation_screen_w = 0
self.mobile_emulation_screen_h = 0
# Mobile emulation parameter setting end.
# Set browser and webdriver path.
if driver:
service.path = driver
if binary:
options.binary_location = binary
# Add webdriver option arguments.
for i in option_arguments:
options.add_argument(i)
# Set headless mode.
if self.is_linux or headless:
options.add_argument('--headless=new')
# Set no-sandbox mode.
if self.is_linux:
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
# Set language of browser, default is zh-CN.
if lang:
options.add_argument('--lang=%s' % (lang or 'zh-CN'))
hasattr(options, 'set_preference') and options.set_preference('intl.accept_languages', lang or 'zh-CN')
# Set mute.
if mute:
options.add_argument('--mute-audio=true')
hasattr(options, 'set_preference') and print('Warning: Do not support mute audio currently.', file=sys.stderr)
# Set no images mode.
if no_images:
options.add_argument('--blink-settings=imagesEnabled=false')
hasattr(options, 'set_preference') and print('Warning: Do not support disable images currently.', file=sys.stderr)
# Set default user agent.
if user_agent:
options.add_argument('--user-agent=%s' % user_agent)
hasattr(options, 'set_preference') and options.set_preference('general.useragent.override', user_agent)
# Set http proxy for browser.
if http_proxy:
options.add_argument('--proxy-server=http://%s' % http_proxy)
# Set browser window size before startup.
if window_size:
debugger_address or options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ','))
else:
debugger_address or options.add_argument('--start-maximized')
# Start the backend and optimize it.
if (driver_classes == 1 and backends_address is not None) == 1:
for key, value in self._setup_backend({'addr': str(backends_address.split(':')[0]), 'port': int(backends_address.split(':')[1])}).items():
options.set_capability(key, value)
try:
self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
except AttributeError:
pass
else:
self.backend = EmptyMethod()
# Start the browser.
super().__init__(service=service, options=options)
if mobile_emulation:
cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}])
cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}])
cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}])
# Set the request and response interceptor.
if req_interceptor:
try:
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
self.request_interceptor = req_interceptor
except AttributeError:
pass
if res_interceptor:
try:
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
self.response_interceptor = res_interceptor
except AttributeError:
pass
# Sync set http proxy for Selenium-Wire backend.
if http_proxy:
self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy}
# Set browser window size after startup, by default, there will be full screen display window.
if window_size:
try:
self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(','))
except Exception:
pass
# else:
# self.maximize_window()
if window_site:
try:
self.set_window_position(*window_site.replace("\x20", '').split(','))
except Exception:
pass
# Sets a sticky timeout to implicitly wait for an element to be found.
self.implicitly_wait(10)
# Set the amount of time to wait for a page load to complete.
self.set_page_load_timeout(25)
# Open the default page.
home and self.open(home)
@property
def title(self) -> str:
"""
Return current title.
"""
if self.exited:
return ''
self._update_tab_auto_switch()
if (self.current_alert is None) == 1:
return super().title
else:
return ''
@property
def current_url(self) -> str:
"""
Return current url.
"""
if self.exited:
return ''
self._update_tab_auto_switch()
if (self.current_alert is None) == 1:
return super().current_url
else:
return ''
@property
def current_alert(self):
"""
Return current alert object.
"""
if self.exited:
return None
try:
alert = Alert(self)
self.execute(Command.W3C_GET_ALERT_TEXT)
return alert
except Exception:
return None
@property
def current_alert_text(self) -> str:
"""
Return current alert content.
"""
if self.exited:
return ''
try:
return Alert(self).text
except Exception:
return ''
@property
def window_titles(self):
"""
Return to title list.
"""
result = []
try:
targets = self.execute_cdp_cmd('Target.getTargets', {})
for target in targets['targetInfos']:
if (target['type'] == 'page') == 1:
result.append(target['title'])
except Exception:
pass
return result[::-1]
@property
def window_urls(self):
"""
Return to url list.
"""
result = []
try:
targets = self.execute_cdp_cmd('Target.getTargets', {})
for target in targets['targetInfos']:
if (target['type'] == 'page') == 1:
result.append(target['url'])
except Exception:
pass
return result[::-1]
@property
def window_handles(self):
"""
Return to window handle list.
"""
result = []
try:
targets = self.execute_cdp_cmd('Target.getTargets', {})
for target in targets['targetInfos']:
if (target['type'] == 'page') == 1:
result.append(target['targetId'])
except Exception:
pass
return result[::-1]
@property
def window_inner_size(self) -> tuple:
"""
Return the page window inner size.
"""
size = self.execute_script('return [window.innerWidth, window.innerHeight];')
return size[0], size[1]
def open(self, url=None):
"""
Open the URL, simulate into the URL in the address bar and jump, the new page has no Referrer.
"""
if self.exited:
return None
self._update_tab_auto_switch()
self._update_cdp_command()
return self.get(url)
def turn(self, url=None):
"""
Simulation "window.location.href" jumps, the new page has Referrer.
"""
if self.exited:
return None
return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None)
def wait(self, secs: int | float = 1):
"""
Will sleep waiting.
"""
number_int = int(secs)
number_float = secs - number_int
try:
for i in range(number_int):
time.sleep(1)
else:
time.sleep(number_float)
except (KeyboardInterrupt, InterruptedError):
print('Interrupted', file=sys.stderr)
self.quit()
def quit(self):
"""
Exit the browser.
"""
if self.exited:
return None
self.exited = True
try:
tab_handles = list(self.window_handles)
tab_current = self.current_window_handle
tab_current in tab_handles and tab_handles.remove(tab_current)
tab_handles.append(tab_current)
for tab in tab_handles:
try:
self.switch_to.window(tab)
for _ in range(5):
try:
self.current_alert.dismiss()
except Exception:
break
self.close()
except NoSuchWindowException:
pass
except Exception:
pass
try:
super().quit()
except Exception:
pass
def quit_backend(self):
"""
Exit the browser backend.
"""
if self.exited:
return None
self.exited = True
try:
super().quit()
except Exception:
pass
def find(self, path, wait_for=False, timeout: float = 5.0, freq: float = 0.5, delay: float = 0.0) -> WebElement:
"""
Use XPath to find an element.
"""
element = self.webdriver_wait(timeout, freq).until(EC.presence_of_element_located((By.XPATH, path))) if wait_for else self.find_element(By.XPATH, path)
delay and self.wait(delay)
element = self.find_element(By.XPATH, path) if delay else element
self._element_highlight(element, '#F8BE5F')
return element
def find_mult(self, path) -> list:
"""
Use XPath to find elements.
"""
element = self.find_elements(By.XPATH, path)
for this_element in element: self._element_highlight(this_element, '#F8BE5F')
return element
def find_mult_random_choice(self, path) -> WebElement:
"""
Use XPath to find elements then random choice one.
"""
element = self.find_elements(By.XPATH, path)
element = random.choice(element)
self._element_highlight(element, '#F8BE5F')
return element
def find_element_by(self, sentence):
"""
Custom find element, pass into a tuple or list.
"""
element = self.find_element(*sentence)
self._element_highlight(element, '#F8BE5F')
return element
def click(self, element):
"""
Click element.
"""
self._element_highlight(element, '#FF0000')
element.click()
def click_simulate(self, element):
"""
Click element for simulate.
"""
self._element_click_effect(element)
self.action_chains().reset_actions()
self.action_chains().click(element).perform()
self.wait(0.1)
def touch(self, x, y):
"""
Click on the coordinate.
"""
self.action_chains().reset_actions()
self.action_chains().move_by_offset(x, y).click().perform()
self.wait(0.1)
def input(self, element, content):
"""
Enter the content to the element.
"""
self._element_highlight(element, '#00B6F1')
self.action_chains().reset_actions()
self.action_chains().send_keys_to_element(element, content).perform()
self.wait(0.1)
def mouse(self, element):
"""
Park the mouse here.
"""
self._element_highlight(element, '#49DC07')
self.action_chains().reset_actions()
self.action_chains().move_to_element(element).perform()
def scroll(self):
"""
Scroll page.
"""
self.action_chains().reset_actions()
self.action_chains().scroll_by_amount(0, self.execute_script('return document.documentElement.clientHeight;')).perform()
self.wait(0.8)
def scroll_to(self, pos: int | str):
"""
Scroll to the specified location.
"""
if isinstance(pos, int) and pos > 0:
self.execute_script('window.scrollTo(0, arguments[0]);', pos)
elif pos == 0:
self.execute_script('window.scrollTo(0, 0);')
elif pos == 0 - 1:
self.execute_script('window.scrollTo(0, document.body.scrollHeight);')
else:
pass
self.wait(0.8)
def scroll_to_element(self, element):
"""
Scroll to the specified element location.
"""
self.action_chains().reset_actions()
self.action_chains().scroll_to_element(element).perform()
self.wait(0.8)
def fetch(self, url: str, options: dict, cover_options: dict = None):
"""
Sending http requests using fetch.
"""
return FetchResult(self.execute_async_script('''
var _callback = arguments[arguments.length - 1];
var _url = arguments[0];
var _options = arguments[1];
var _data = {};
fetch(_url, _options)
.then((response) => {
_data.status = response.status;
let headers = {};
response.headers.forEach((value, name) => {
headers[name] = value;
});
_data.header = headers;
return response.text();
})
.then((result) => {
_data.result = result;
_callback(_data);
})
.catch((error) => {
console.error(error);
_callback(null);
});
''', url, {**options, **(cover_options or {})}))
def frame_switch_to(self, element_of_frame):
"""
Switch frame to the specified frame element.
"""
self.switch_to.frame(element_of_frame)
self.wait(0.2)
def frame_switch_to_default(self):
"""
Switch to the default frame.
"""
self.switch_to.default_content()
self.wait(0.2)
def tab_active(self):
"""
Activate current tab.
"""
self.switch_to.window(self.current_window_handle)
self.wait(0.2)
self._update_cdp_command()
def tab_create(self, url: str = None):
"""
Create a new tab and open the URL.
"""
self.switch_to.new_window('tab')
self._update_cdp_command()
url and self.open(url)
def tab_switch(self, tab: int | str = None):
"""
Switch the browser tab page.
"""
handles = self.window_handles
lengths = len(handles)
current = handles.index(self.current_window_handle)
if isinstance(tab, int):
handle = tab
elif tab == PositionTab.Prev:
handle = (current - 1)
elif tab == PositionTab.Next:
handle = (current + 1) % lengths
else:
handle = (current + 0)
self.switch_to.window(handles[handle])
self.wait(0.2)
self._update_cdp_command()
def tab_switch_prev(self):
"""
Switch to the previous tab.
"""
self.tab_switch(PositionTab.Prev)
def tab_switch_next(self):
"""
Switch to next tab.
"""
self.tab_switch(PositionTab.Next)
def tab_cancel(self):
"""
Close the current browser tab page.
"""
handles = self.window_handles
if len(handles):
current = handles.index(self.current_window_handle)
self.close()
current > 0 and self.switch_to.window(handles[current - 1])
self.wait(0.2)
def tab_cancel_all(self):
"""
Close all the browser tab page.
"""
handles = self.window_handles
for i in handles:
self.tab_cancel()
def force_display_element(self, element):
"""
Make hidden element visible and interactive.
"""
self.execute_script(
'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.setAttribute("hidden","false");', element
)
def screenshot(self) -> bytes:
"""
Screenshot as bytes.
"""
return self.get_screenshot_as_png()
def action_chains(self) -> ActionChains:
"""
Return ActionChains object.
"""
return ActionChains(self)
def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None):
"""
Return WebDriverWait object.
"""
return WebDriverWait(
driver=self,
timeout=timeout,
poll_frequency=poll_frequency,
ignored_exceptions=ignored_exceptions
)
def _element_highlight(self, element=None, color='#ff0000', dura=2500):
"""
Make the element highlight.
"""
if not element:
return False
high = ColorUtils.hex2rgb(color)
r = high[0]
g = high[1]
b = high[2]
self.execute_script('''
let e=arguments[0];
try{
let o=[e.style.background||null,e.style.border||null];
e.style.border="1px solid %s";e.style.background="rgba(%s,%s,%s,0.2)";
if(!e.prominent){
e.prominent=true;
setTimeout(function(args){try{args[0].prominent=null;args[0].style.background=args[1][0];args[0].style.border=args[1][1]}catch(e){}},%s,[e,o]);
}
}catch(e){}
''' % (color, r, g, b, dura), element
)
def _element_click_effect(self, element=None, x: int = 0, y: int = 0):
"""
Make a coordinate click effect.
"""
self.execute_script('''
let e=arguments[0];
let r;
let x;
let y;
if(e!==null){
r=e.getBoundingClientRect();
x=r.left+r.width/2+"px";
y=r.top+r.height/2+"px";
}
else{
x=arguments[1]+"px";
y=arguments[2]+"px";
}
let c=document.createElement("div");
c.style="width:%spx;height:%spx;border-radius:50%%;background-color:rgba(255,0,0,0.18);position:absolute;transform:translate(-50%%,-50%%);transition:opacity 0.5s;border:1px solid #ff3c3c;pointer-events:none";
c.style.zIndex=9999;
c.style.left=x;c.style.top=y;
document.body.appendChild(c);
setTimeout(function(){c.style.opacity=0;setTimeout(function(){document.body.removeChild(c)},999)},200);
let w=%s;
let h=%s;
let d=false;
let i=setInterval(function(){
if((w>%s||h>%s)||d){
d=true;
w-=2;
h-=2;
}
else{
w+=5;
h+=5;
}
c.style.width=w+"px";c.style.height=h+"px";
if((w<=12||h<=12)&&d){clearInterval(i)}
},20);
''' % (0, 0, 0, 0, 30, 30), element, x, y
)
def _update_cdp_command(self):
for cmd in self.cdplist:
self.execute_cdp_cmd(*cmd)
def _update_tab_auto_switch(self):
if (self.current_alert is None) == 1:
try:
self.execute(Command.GET_TITLE)
except Exception:
len(self.window_handles) > 0 and self.switch_to.window(self.window_handles[0])
class BrowserPluginRunningParam(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@classmethod
def no_method(cls, *args, **kwargs):
pass
def __setitem__(self, key, value):
pass
def __setattr__(self, key, value):
pass
def __getitem__(self, item):
try:
return super().__getitem__(item)
except KeyError:
return None
def __getattr__(self, item):
try:
return super().__getitem__(item)
except KeyError:
return self.no_method
class BrowserPluginParent:
id = None
name = None
requirements = []
def __init__(self):
self.app_id = None
self.thread = None
self.result_color = None
self.result = None
self.exception = None
self.user_id = None
self.instant_message = None
self.batch_mode = None
self._rs = 0
self._re = 0
@property
def running_time(self):
return (int(time.time()) - self._rs) if self._rs > self._re else (self._re - self._rs)
def _run_task(self, *args, **kwargs):
try:
self._rs = int(time.time())
res = self.running(*args, **kwargs)
if isinstance(res, (tuple, list)):
self.result_color = res[0]
self.result = res[1]
except Exception as e:
self.exception = e
self.message_except(e)
finally:
self._re = int(time.time())
def message(self, message=None):
print('[%s]: %s' % (str(self.app_id), message), file=sys.stderr)
if len(str(message)) <= 4096:
notification_send(app_id=self.app_id, title=self.name, message='%s' % (message,))
if (self.batch_mode is True) == 1:
return None
self.instant_message and self.instant_message(json.dumps({
'time': int(time.time()), 'type': 'message', 'data': {'user_id': str(self.user_id), 'content': str(message), 'title': str(self.app_id)}
}))
def message_except(self, exception_info):
notification_send(app_id=self.app_id, title='%s ' % (self.name,), message='%s' % (exception_info,))
def logging(self, logging=None):
print('[%s]: %s' % (str(self.app_id), logging), file=sys.stderr)
if len(str(logging)) <= 1024:
self.instant_message and self.instant_message(json.dumps({
'time': int(time.time()), 'type': 'logging', 'data': {'user_id': str(self.user_id), 'content': str(logging)}
}))
def run(self, app_id, user_id, driver, requirements, instant_message) -> threading.Thread:
self.app_id, self.user_id, self.instant_message = app_id, user_id, instant_message
params = BrowserPluginRunningParam({'driver': driver, 'message': self.message, 'logging': self.logging, 'requirements': requirements})
thread = threading.Thread(target=self._run_task, args=(params,))
self.thread = thread
thread.daemon = True
thread.start()
return thread
def interrupt(self):
try:
self.thread and self.thread.is_alive() and ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.thread.ident), ctypes.py_object(SystemExit))
except Exception:
pass
finally:
self.thread = None
def state(self):
return self.thread.is_alive() if self.thread else False
@staticmethod
def running(param: BrowserPluginRunningParam):
param.driver.wait(5)
class BrowserPluginFileTest(BrowserPluginParent):
name = '单个文件上传测试'
requirements = [
{
'type': 'file',
'html': '上传文件'
}
]
@staticmethod
def running(param):
param.message(param.requirements)
return '#FFFFFF', '已完成'
class BrowserPluginMultFileTest(BrowserPluginParent):
name = '多个文件上传测试'
requirements = [
{
'type': 'file-mult',
'html': '上传文件'
}
]
@staticmethod
def running(param):
param.message(param.requirements)
return '#FFFFFF', '已完成'
class BrowserPluginTextTest(BrowserPluginParent):
name = '单行文本输入测试'
requirements = [
{
'type': 'text',
'html': '输入文本'
}
]
@staticmethod
def running(param):
param.message(param.requirements)
return '#FFFFFF', '已完成'
class BrowserPluginMultTextTest(BrowserPluginParent):
name = '多行文本输入测试'
requirements = [
{
'type': 'text-mult',
'html': '输入文本'
}
]
@staticmethod
def running(param):
param.message(param.requirements)
return '#FFFFFF', '已完成'
class BrowserPluginLoggingTest(BrowserPluginParent):
name = '在线日志输出测试'
requirements = [
{
'type': 'text-mult',
'html': '输入内容'
}
]
@staticmethod
def running(param):
param.logging(param.requirements[0])
return '#FFFFFF', '已完成'
class BrowserPluginMessageTest(BrowserPluginParent):
name = '在线消息弹窗测试'
requirements = [
{
'type': 'text-mult',
'html': '输入内容'
}
]
@staticmethod
def running(param):
param.message(param.requirements[0])
return '#FFFFFF', '已完成'
class BrowserPluginFileCommandDebug(BrowserPluginParent):
name = '上传脚本进行调试'
requirements = [
{
'type': 'file',
'html': '调试'
}
]
@staticmethod
def running(param):
file = param.requirements[0][0]['path']
exec(open(file, mode='r', encoding='utf-8').read())
return '#FFFFFF', '已完成'
class BrowserPluginTextCommandDebug(BrowserPluginParent):
name = '输入命令进行调试'
requirements = [
{
'type': 'text-mult',
'html': '调试'
}
]
@staticmethod
def running(param):
code = param.requirements[0]
code and exec(code)
return '#FFFFFF', '已完成'
class BrowserManagerDataStorage(dict):
def __init__(self, file: str):
self._data_file = os.path.abspath(file)
super().__init__(self._get_json_data(self._data_file, {}))
@staticmethod
def _get_json_data(file: str, data=None):
if os.path.exists(file):
try:
return json.loads(open(file=file, mode='r', encoding='utf-8').read())
except json.decoder.JSONDecodeError:
return data
return data
@staticmethod
def _put_json_data(file: str, data=None):
with open(file=file, mode='w', encoding='utf-8') as f:
flock(f, LOCK_EX)
res = f.write(json.dumps(data, indent=4, ensure_ascii=True))
flock(f, LOCK_UN)
return res
def __getitem__(self, item):
if item in self:
return super().__getitem__(item)
else:
return None
def __setitem__(self, key, value):
super().__setitem__(key, value)
def save(self):
self._put_json_data(self._data_file, self)
class BrowserManagerUserRunning:
def __init__(
self,
running=None,
user_id: str = None,
user_name: str = None,
user_data_dir: str = None,
remote_debugging_port: int = None
):
if isinstance(running, type(self)):
self.__dict__ = running.__dict__
else:
self.active = 1
self.update_status_last_time = 0
self.user_id = user_id
self.user_name = user_name
self.user_data_dir = user_data_dir
self.remote_debugging_port = remote_debugging_port
self.driver = None
self.plugin = None
self.is_running_property = False
self.url = None
self.title = None
self.alert = None
self.window_handles = None
self.update_status_details()
@property
def is_running(self):
self.update_status_running()
return self.is_running_property
@is_running.setter
def is_running(self, value):
self.is_running_property = value
@property
def is_user_running(self):
try:
open(os.path.join(self.user_data_dir, 'lockfile'), mode='r').close()
return False
except FileNotFoundError:
return False
except PermissionError:
return True
@func_set_timeout(0.05)
def _get_window_handles(self):
return list(self.driver.window_handles)
@func_set_timeout(0.05)
def _get_current_alert_text(self):
return str(self.driver.current_alert_text)
@func_set_timeout(0.05)
def _get_title(self):
return str(self.driver.title)
@func_set_timeout(0.05)
def _get_current_url(self):
return str(self.driver.current_url)
def update_status_running(self):
if (self.is_user_running is False) == 1:
self.chrome_stopped_trigger()
else:
self.chrome_running_trigger()
def update_status_details(self):
if (self.is_user_running is False) == 1:
self.chrome_stopped_trigger()
else:
self.chrome_running_trigger()
if (round(time.time(), 3) - self.update_status_last_time) > round(random.uniform(3, 12), 3) and self.driver and self.active:
try:
self.window_handles = self._get_window_handles()
self.alert = self._get_current_alert_text()
self.title = self._get_title()
self.url = self._get_current_url()
except FunctionTimedOut:
pass
except Exception:
pass
self.update_status_last_time = round(time.time(), 3)
def chrome_running_trigger(self):
if (not self.is_running_property) == 0:
return None
self.is_running = bool(1)
def chrome_stopped_trigger(self):
if (not self.is_running_property) == 1:
return None
self.is_running = bool(0)
self.url = None
self.title = None
self.alert = None
self.window_handles = None
if (not self.plugin) == 0:
plugin = self.plugin
threading.Thread(target=plugin.interrupt).start()
self.plugin = None
if (not self.driver) == 0:
driver = self.driver
threading.Thread(target=driver.quit_backend).start()
self.driver = None
def status(self):
threading.Thread(target=self.update_status_details).start()
return {
'user_id': self.user_id,
'user_name': self.user_name,
'user_data_dir': self.user_data_dir,
'remote_debugging_port': self.remote_debugging_port,
'is_running': self.is_running,
'url': self.url,
'title': self.title,
'alert': self.alert,
'window_handles': self.window_handles,
'plugin_running': self.plugin.state() if self.plugin else False,
'plugin_running_time': self.plugin.running_time if self.plugin else None,
'plugin_execution_result': self.plugin.result if self.plugin else None,
'plugin_execution_result_color': self.plugin.result_color if self.plugin else None
}
def app_id(self):
return '%s|%s' % (self.user_id, self.user_name)
def set_driver(self, driver: Browser = None):
self.driver = driver
def set_plugin(self, plugin: BrowserPluginParent = None):
self.plugin = plugin
def set_user_name(self, user_name: str):
self.user_name = user_name
class BrowserManagerBusy(Exception):
def __init__(self, message):
self.message = message
super().__init__(self.message)
class BrowserPluginBatchExecution:
def __init__(self):
self.plugin_id = None
self.plugin_name = None
self.plugin_time = None
self.users = None
self.plugins = None
self.results = None
self.running = None
def set_plugin_for_monitor(self, users: list, plugins: list, plugin_id: str, plugin_name: str):
if self.running is True:
raise Exception('Running.')
self.plugin_id = plugin_id
self.plugin_name = plugin_name
self.plugin_time = int(time.time())
self.users = users
self.plugins = plugins
threading.Thread(target=self._monitor_until_all_completed).start()
def _monitor_until_all_completed(self):
if not isinstance(self.plugins, list):
return None
self.results = []
self.running = True
while True:
time.sleep(2.5)
if all(not plugin.state() for plugin in self.plugins):
break
for plugin in self.plugins:
self.results.append([plugin.app_id, plugin.running_time, plugin.result])
self.running = False
class BrowserManager:
def __init__(self, runner, driver: str, binary: str, plugin_result_dir: str, manager_data_file: str, browser_data_home: str, browser_init_home: str, use_selenium_wire: int):
self.runner = runner
if not os.path.exists(driver):
raise FileNotFoundError('The driver executable file does not exist.')
if not os.path.exists(binary):
raise FileNotFoundError('The binary executable file does not exist.')
self.driver = driver
self.binary = binary
self.plugin_result_dir = plugin_result_dir
self.manager_data_file = manager_data_file
self.browser_data_home = browser_data_home
self.browser_init_home = browser_init_home
self.use_selenium_wire = use_selenium_wire
self.threading_lock = threading.RLock()
self.debugging_port_range = range(60000, 60256)
self.data_storage = BrowserManagerDataStorage(manager_data_file)
if (not self.data_storage) == 1:
self.data_storage['browser_user'] = {}
self.data_storage.save()
os.path.exists(plugin_result_dir) or os.makedirs(plugin_result_dir)
os.path.exists(browser_data_home) or os.makedirs(browser_data_home)
self.geometry_config = {}
self.user_running = {user_id: self._initialize_user_running(user_id) for user_id in self.data_storage['browser_user'].keys()}
self.user_in_operation = []
self.plugins_int = {}
self.plugins_ext = {}
self.plugin_batch_execution = BrowserPluginBatchExecution()
def _generate_remote_debugging_port(self):
exist_ports = [value['remote_debugging_port'] for key, value in self.data_storage['browser_user'].items()]
return random.choice([p for p in self.debugging_port_range if p not in exist_ports])
def _update_user_running(self):
for user_id, running in self.user_running.items():
if (not running) == 1:
continue
running.update_status_running()
def _get_user_process_id(self, user_id: str):
try:
return int(self.data_storage['browser_user'][user_id]['process_id'])
except KeyError:
return None
def _get_user_name(self, user_id: str):
try:
return str(self.data_storage['browser_user'][user_id]['user_name'])
except KeyError:
return None
def _get_user_data_dir(self, user_id: str):
try:
return os.path.join(self.browser_data_home, self.data_storage['browser_user'][user_id]['user_data_dir'])
except KeyError:
return None
def _get_user_remote_debugging_port(self, user_id: str):
try:
return int(self.data_storage['browser_user'][user_id]['remote_debugging_port'])
except KeyError:
return None
def _get_user_app_id(self, user_id: str):
try:
return self.data_storage['browser_user'][user_id]['user_name'] or user_id
except KeyError:
return user_id
def _initialize_user_running(self, user_id: str):
return BrowserManagerUserRunning(
user_id=user_id,
user_name=self._get_user_name(user_id),
user_data_dir=self._get_user_data_dir(user_id),
remote_debugging_port=self._get_user_remote_debugging_port(user_id)
)
@property
def plugins(self) -> dict:
return {**self.plugins_int, **self.plugins_ext}
def handle_interrupt(self):
print('Received interrupt. Ending all backend...', file=sys.stderr)
online_users = self.user_ids_online()
for user_id in online_users:
try:
driver = BrowserManagerUserRunning(self.user_running[user_id]).driver
driver and threading.Thread(target=driver.quit_backend).start()
except Exception:
pass
self.clear_dirs(self.plugin_result_dir)
@staticmethod
def req_interceptor(req: SeleniumWireRequest):
pass
@staticmethod
def res_interceptor(req: SeleniumWireRequest, res: SeleniumWireResponse):
try:
print('[%s] %s %s %s' % (str(req.method).ljust(7), 'HTTP', res.status_code, req.url), file=sys.stderr)
except Exception as e:
print(e, file=sys.stderr)
@staticmethod
def get_chrome_hwnd(pid: int):
if pid:
def callback(hwnd, hwnds):
if win32gui.IsWindowVisible(hwnd) and win32gui.GetParent(hwnd) == 0:
_, found_pid = win32process.GetWindowThreadProcessId(hwnd)
if found_pid == pid and 'Chrome_' in win32gui.GetClassName(hwnd):
ws = win32gui.GetWindowLong(hwnd, win32con.GWL_STYLE)
if (ws & win32con.WS_EX_TOOLWINDOW) == 0 and (ws & win32con.WS_EX_PALETTEWINDOW) == 0:
hwnds.append(hwnd)
return False
hwnds = []
win32gui.EnumWindows(callback, hwnds)
return hwnds[0] if hwnds else None
else:
return None
def run_browser(self, user_data_dir: str, remote_debugging_port: int, proxy_server: str = None):
options = [
self.binary,
'--disable-background-networking',
'--disable-desktop-notifications',
'--disable-component-update',
'--no-default-browser-check',
'--no-first-run',
'--hide-crash-restore-bubble'
]
if user_data_dir:
options.append('--user-data-dir=%s' % (user_data_dir,))
if remote_debugging_port:
options.append('--remote-debugging-port=%s' % (remote_debugging_port,))
if proxy_server:
options.append('--proxy-server=%s' % (proxy_server,))
return subprocess.Popen(options, shell=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
@staticmethod
def clear_dirs(dirs):
if (os.path.exists(dirs)) == 1:
for filename in os.listdir(dirs):
filepath = os.path.join(dirs, filename)
try:
os.path.isfile(filepath) and os.unlink(filepath)
except Exception:
pass
def restore_last_status(self):
for user_id, running in self.user_running.items():
if (not running) == 1:
continue
run = BrowserManagerUserRunning(running)
run.update_status_running()
if run.is_running and run.driver is None:
threading.Thread(target=self.user_run, args=(user_id,)).start()
print('Restoring user control: %s' % (user_id,), file=sys.stderr)
def load_plugins(self, plugins, is_external=0):
if (plugins is None) == 1:
return None
if (not is_external) == 1:
plugin_dict = self.plugins_int
plugin_id_prefix = '00'
else:
plugin_dict = self.plugins_ext
plugin_id_prefix = '01'
plugin_dict.clear()
plugin_length = len(plugins)
for i in range(plugin_length):
plugin_class = plugins[i]
plugin_id = '%s_%s_%s' % (plugin_id_prefix, '{:02d}'.format(i + 1), hashlib.md5(str(plugin_class.__name__).encode(encoding='utf-8')).hexdigest())
plugin_class.id = plugin_id
plugin_dict[plugin_id] = plugin_class
def load_plugins_from_external_module(self):
module_home_list = [os.path.join(os.path.dirname(__file__), 'Packages'), os.path.join(self.runner.app_data, 'Packages')]
for module_home in module_home_list:
if (not os.path.exists(module_home)) == 1:
continue
try:
plugins_classes_site = []
module_list = glob.glob(os.path.join(module_home, 'Plugin*.py'))
for module_path in module_list:
try:
print('Load plugins from \"%s\"' % (module_path,), file=sys.stderr)
plugins_modules = import_module(module_path)
plugins_classes = [type(name, (cls, BrowserPluginParent), {}) for name, cls in plugins_modules.__dict__.items() if
name.startswith('BrowserPlugin') and inspect.isclass(cls)]
plugins_classes_site.append(plugins_classes)
except Exception as e:
print(e, file=sys.stderr)
self.load_plugins([element for sublist in plugins_classes_site for element in sublist], is_external=1)
except Exception as e:
print(e, file=sys.stderr)
def update_geometry_config(self, screen_w: int, screen_h: int, window_w: int, window_h: int, window_x: int, window_y: int):
self.geometry_config = {
'screen_w': screen_w,
'screen_h': screen_h,
'control_window_w': window_w,
'control_window_h': window_h,
'control_window_x': window_x,
'control_window_y': window_y,
'browser_window_w': round(window_h * 1.78) if ((screen_w - window_w) / window_h) > 1.78 else (screen_w - window_w),
'browser_window_h': window_h,
'browser_window_x': window_x + window_w,
'browser_window_y': window_y
}
def user_operate_starting(self, user_id: str):
user_id = str(user_id)
if (user_id not in self.user_in_operation) == 1:
self.user_in_operation.append(user_id)
else:
raise BrowserManagerBusy('Busy.')
def user_operate_complete(self, user_id: str):
user_id = str(user_id)
while user_id in self.user_in_operation:
self.user_in_operation.remove(user_id)
def is_user_data_occupied(self, user_id: str):
try:
open(os.path.join(self._get_user_data_dir(user_id), 'lockfile'), mode='r').close()
return False
except FileNotFoundError:
return False
except PermissionError:
return True
def is_port_in_use(self, port: int):
if (port not in self.debugging_port_range) == 1:
return True
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.05)
try:
return s.connect_ex(('127.0.0.1', port)) == 0
except socket.timeout:
return False
finally:
s.close()
@calculate_execution_time
def plugin_run(self, user_id: str, plugin_id: str, requirements=None):
try:
self.user_operate_starting(user_id)
if (user_id in self.user_ids_online()) == 0:
raise FileExistsError('User ID is not running.')
running = BrowserManagerUserRunning(self.user_running[user_id])
if (running.plugin and running.plugin.state()) == 1:
raise Exception('The plugin is running, please stop it first.')
driver = running.driver
if (driver is None) == 1:
raise Exception('No driver object.')
plugin = self.plugins[plugin_id]()
plugin.run(self._get_user_app_id(user_id), user_id, driver, requirements, self.runner.web_server.websocket_connection_manager.send_broadcast_use_sync)
running.set_plugin(plugin)
self.user_operate_complete(user_id)
except BrowserManagerBusy:
raise
except Exception:
self.user_operate_complete(user_id)
raise
@calculate_execution_time
def plugin_shutdown(self, user_id: str):
try:
self.user_operate_starting(user_id)
if (user_id in self.user_ids_online()) == 0:
raise FileExistsError('User ID is not running.')
running = BrowserManagerUserRunning(self.user_running[user_id])
if (running.plugin and running.plugin.state()) != 1:
raise Exception('The plugin is not running.')
plugin = running.plugin
plugin.interrupt()
self.user_operate_complete(user_id)
except BrowserManagerBusy:
raise
except Exception:
self.user_operate_complete(user_id)
raise
def plugin_all_users_run(self, plugin_id: str, requirements=None):
try:
if (self.plugin_batch_execution.running is True) == 1:
raise Exception('Still in batch execution.')
users = []
plugins = []
for user_id in self.user_ids_online():
try:
self.plugin_run(user_id, plugin_id, requirements)
users.append(user_id)
plugin = BrowserManagerUserRunning(self.user_running[user_id]).plugin
plugin.batch_mode = True
plugins.append(plugin)
time.sleep(0.1)
except Exception:
pass
self.plugin_batch_execution.set_plugin_for_monitor(users, plugins, plugin_id, self.plugins[plugin_id].name)
except Exception:
raise
def plugin_all_users_shutdown(self):
try:
for user_id in self.plugin_batch_execution.users:
try:
self.plugin_shutdown(user_id)
except Exception:
pass
except Exception:
pass
def plugin_list(self):
return {plugin_id: {'name': plugin_class.name, 'requirements': plugin_class.requirements} for plugin_id, plugin_class in self.plugins.items()}
def user_ids(self):
return [user_id for user_id in self.data_storage['browser_user'].keys()]
def user_ids_online(self):
self._update_user_running()
return [user_id for user_id, running in self.user_running.items() if running and running.is_running]
@calculate_execution_time
def user_all_details(self):
details = {user_id: self.user_running[user_id].status() for user_id in self.user_ids() if user_id in self.user_running.keys()}
return {
'user_online_count': len(self.user_ids_online()),
'user_details': details,
'plugin_batch_execution_id': self.plugin_batch_execution.plugin_id,
'plugin_batch_execution_name': self.plugin_batch_execution.plugin_name,
'plugin_batch_execution_time': self.plugin_batch_execution.plugin_time,
'plugin_batch_execution_status': self.plugin_batch_execution.running,
'plugin_batch_execution_result': self.plugin_batch_execution.results
}
@calculate_execution_time
def user_set_name(self, user_id: str, user_name: str):
try:
self.user_operate_starting(user_id)
if (user_id in self.user_ids()) == 0:
raise FileExistsError('User ID not exists.')
if (not isinstance(user_name, str) or len(user_name) > 24) == 1:
raise ValueError('Illegal Username.')
user_name = user_name.strip()
self.data_storage['browser_user'][user_id]['user_name'] = user_name
self.data_storage.save()
self.user_running[user_id].set_user_name(user_name)
self.user_operate_complete(user_id)
except BrowserManagerBusy:
raise
except Exception:
self.user_operate_complete(user_id)
raise
@calculate_execution_time
def user_focus_window(self, user_id: str):
if (user_id in self.user_ids()) == 0:
raise FileExistsError('User ID not exists.')
if (user_id in self.user_ids_online()) == 0:
raise FileExistsError('User ID is not running.')
try:
chrome_hwnd = self.get_chrome_hwnd(self._get_user_process_id(user_id))
if chrome_hwnd:
for _ in range(1 + int(bool(win32gui.IsIconic(chrome_hwnd)))):
win32gui.ShowWindow(chrome_hwnd, win32con.SW_RESTORE)
win32gui.ShowWindow(chrome_hwnd, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(chrome_hwnd)
win32gui.SetWindowPos(
chrome_hwnd,
win32con.HWND_NOTOPMOST,
self.geometry_config['browser_window_x'],
self.geometry_config['browser_window_y'],
self.geometry_config['browser_window_w'],
self.geometry_config['browser_window_h'],
win32con.SWP_NOZORDER
)
except Exception:
pass
@calculate_execution_time
def user_add(self, user_id: str):
try:
self.user_operate_starting(user_id)
if (not isinstance(user_id, str) or not re.match('^[0-9A-Za-z_-]{1,16}$', user_id)) == 1:
raise ValueError('Invalid user ID format.')
if (user_id in self.user_ids()) == 1:
raise FileExistsError('User ID already exists.')
basename = 'data_%s' % (user_id,)
data_dir = os.path.join(self.browser_data_home, basename)
self.data_storage['browser_user'][user_id] = {
'user_name': '',
'user_data_dir': basename,
'remote_debugging_port': self._generate_remote_debugging_port()
}
self.data_storage.save()
self.user_running[user_id] = self._initialize_user_running(user_id)
if (os.path.exists(self.browser_init_home)) == 1:
os.path.exists(data_dir) or shutil.copytree(self.browser_init_home, data_dir)
else:
os.path.exists(data_dir) or os.makedirs(data_dir)
self.user_operate_complete(user_id)
except BrowserManagerBusy:
raise
except Exception:
self.user_operate_complete(user_id)
raise
@calculate_execution_time
def user_del(self, user_id: str):
try:
self.user_operate_starting(user_id)
if (user_id in self.user_ids()) == 0:
raise FileExistsError('User ID not exists.')
if (user_id in self.user_ids_online()) == 1:
raise FileExistsError('User ID is running, please stop it first.')
data_dir = self._get_user_data_dir(user_id)
data_dir_deleted = '%s_deleted' % (data_dir,)
os.path.exists(data_dir) and os.rename(data_dir, data_dir_deleted)
os.path.exists(data_dir_deleted) and shutil.rmtree(data_dir_deleted)
self.data_storage['browser_user'].pop(user_id)
self.data_storage.save()
self.user_running.pop(user_id)
self.user_operate_complete(user_id)
except BrowserManagerBusy:
raise
except Exception:
self.user_operate_complete(user_id)
raise
@calculate_execution_time
def user_run(self, user_id: str):
try:
self.user_operate_starting(user_id)
if (user_id in self.user_ids()) == 0:
raise FileExistsError('User ID not exists.')
user_data_dir = self._get_user_data_dir(user_id)
remote_debugging_port = self._get_user_remote_debugging_port(user_id)
mitmproxy_port = remote_debugging_port - 5000
running = BrowserManagerUserRunning(self.user_running[user_id])
running.active = 1
if (self.is_user_data_occupied(user_id)) == 0:
browser_process = self.run_browser(user_data_dir=user_data_dir, remote_debugging_port=remote_debugging_port, proxy_server='127.0.0.1:%s' % (mitmproxy_port,) if self.use_selenium_wire else None)
self.data_storage['browser_user'][user_id]['process_id'] = browser_process.pid
self.data_storage.save()
print('The browser starts, and the process ID is %s.' % (browser_process.pid,), file=sys.stderr)
driver = Browser(
driver=self.driver,
binary=self.binary,
driver_classes=self.use_selenium_wire,
window_size=('%s,%s' % (self.geometry_config['browser_window_w'], self.geometry_config['browser_window_h'])) if self.geometry_config else None,
window_site=('%s,%s' % (self.geometry_config['browser_window_x'], self.geometry_config['browser_window_y'])) if self.geometry_config else None,
debugger_address='127.0.0.1:%s' % (remote_debugging_port,),
backends_address='127.0.0.1:%s' % (mitmproxy_port,),
req_interceptor=self.use_selenium_wire and self.req_interceptor,
res_interceptor=self.use_selenium_wire and self.res_interceptor
)
running.set_driver(driver)
self.user_operate_complete(user_id)
except BrowserManagerBusy:
raise
except Exception:
self.user_operate_complete(user_id)
raise
@calculate_execution_time
def user_die(self, user_id: str):
try:
self.user_operate_starting(user_id)
if (user_id in self.user_ids_online()) == 0:
raise FileExistsError('User ID is not running.')
debugging_port = self._get_user_remote_debugging_port(user_id)
running = BrowserManagerUserRunning(self.user_running[user_id])
running.active = 0
driver = running.driver
plugin = running.plugin
plugin and plugin.state() and plugin.interrupt()
running.set_driver(None)
if (driver is None) == 1:
raise Exception('No driver object.')
if (self.is_port_in_use(debugging_port)) == 1:
threading.Thread(target=driver.quit).start()
while self.is_user_data_occupied(user_id):
try:
time.sleep(0.35)
except KeyboardInterrupt:
pass
else:
raise Exception('The debug port is not in listening.')
self.user_operate_complete(user_id)
except BrowserManagerBusy:
raise
except Exception:
self.user_operate_complete(user_id)
raise
class ApplicationSetting(dict):
def __init__(self, file: str):
self._data_file = os.path.abspath(file)
super().__init__(self._get_json_data(self._data_file, {}))
@staticmethod
def _get_json_data(file: str, data=None):
if os.path.exists(file):
try:
return json.loads(open(file=file, mode='r', encoding='utf-8').read())
except json.decoder.JSONDecodeError:
return data
return data
@staticmethod
def _put_json_data(file: str, data=None):
with open(file=file, mode='w', encoding='utf-8') as f:
flock(f, LOCK_EX)
res = f.write(json.dumps(data, indent=4, ensure_ascii=True))
flock(f, LOCK_UN)
return res
def __getitem__(self, item):
if item in self:
return super().__getitem__(item)
else:
return None
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._put_json_data(self._data_file, self)
class MainWindow(QMainWindow):
def __init__(self, runner, app_name: str, app_version: str, scale_rate: float, web_listen_host: str, web_listen_port: int):
super().__init__()
self.runner = runner
try:
if (os.path.exists(self.runner.app_running_file)) == 1:
QMessageBox.information(self, 'Warning', 'The application was not exited normally last time. Please wait for recovery.')
self.runner.web_server.browser_manager.restore_last_status()
except Exception:
pass
finally:
open(self.runner.app_running_file, mode='w').close()
self.scale_rate = scale_rate
self.web_listen_host = web_listen_host
self.web_listen_port = web_listen_port
self.setWindowTitle('%s %s' % (app_name, app_version))
self.setAcceptDrops(True)
self.setWindowFlags(self.windowFlags() | Qt.WindowMinMaxButtonsHint)
screen = QDesktopWidget().screenGeometry()
self.screen_w, self.screen_h = screen.width(), screen.height()
self.window_w, self.window_h = round(455 * self.scale_rate), round(855 * self.scale_rate)
self.window_x, self.window_y = 0, 0
self.setFixedSize(self.window_w, self.window_h)
QNetworkProxyFactory.setUseSystemConfiguration(False)
self.setting = ApplicationSetting(os.path.join(runner.app_data, 'application.json'))
self.webview = WebEngineView()
self.webview_cache_dir = os.path.join(tempfile.gettempdir(), hashlib.md5(bytes('%s_cache' % (__file__,), encoding='utf-8')).hexdigest()[:32])
web_settings = self.webview.settings()
web_settings.setAttribute(QWebEngineSettings.ScrollAnimatorEnabled, True)
web_settings.setAttribute(QWebEngineSettings.Accelerated2dCanvasEnabled, True)
web_settings.setAttribute(QWebEngineSettings.WebGLEnabled, True)
profile = QWebEngineProfile.defaultProfile()
profile.setPersistentStoragePath(self.webview_cache_dir)
profile.setCachePath(self.webview_cache_dir)
self.webview.setZoomFactor(self.scale_rate or 1.0)
self.setCentralWidget(self.webview)
self.setWindowIcon(QIcon(os.path.join(os.path.dirname(__file__), 'favicon.ico')))
self.tray_icon = QSystemTrayIcon(QIcon(os.path.join(os.path.dirname(__file__), 'favicon.ico')), self)
self.tray_icon_update_timer = QTimer(self)
self.tray_icon_update_timer.timeout.connect(self.on_tray_icon_update)
self.tray_icon_update_timer.start(2250)
self.tray_icon.activated.connect(self.on_tray_icon_activated)
self.tray_menu = QMenu()
action_list = [
['调试:全部打开必应搜索', self.on_debug_all_users_open_home],
['调试:全部打开新标签页', self.on_debug_all_users_open_none],
['调试:打印环境变量信息', self.on_debug_print_environment],
['调试:打印模块搜索路径', self.on_debug_print_module_path],
['调试:重新加载外部插件', self.on_debug_reload_external_plugins],
['配置首选文件', self.on_config_preferred_file],
['重载面板', self.webview.reload],
['退出', self.exit]
]
for action_item in action_list:
action = QAction(action_item[0], self)
action.triggered.connect(action_item[1])
self.tray_menu.addAction(action)
self.tray_icon.setContextMenu(self.tray_menu)
self.tray_icon.show()
# QShortcut(QKeySequence('F5'), self).activated.connect(self.on_debug_reload_external_plugins)
self.init()
def closeEvent(self, event):
event.ignore()
if (self.isVisible()) == 1:
self.hide()
else:
print('Terminating...', file=sys.stderr)
self.exit()
def init(self):
self.show()
self.window_position_reset()
self.runner.web_server.browser_manager.update_geometry_config(
screen_w=round(self.screen_w / self.scale_rate),
screen_h=round(self.screen_h / self.scale_rate),
window_w=round(self.geometry().width() / self.scale_rate),
window_h=round(self.geometry().height() / self.scale_rate) + 38,
window_x=self.pos().x(),
window_y=self.pos().y()
)
self.webview.load(QUrl('http://%s:%s/index.html' % ('127.0.0.1' if self.web_listen_host == '0.0.0.0' else self.web_listen_host, self.web_listen_port)))
self.webview.reload()
def exit(self):
self.webview.deleteLater()
self.runner.handle_interrupt()
try:
os.remove(self.runner.app_running_file)
except Exception:
pass
QApplication.quit()
def window_show(self):
self.showNormal()
self.activateWindow()
self.window_position_reset()
def window_hide(self):
self.hide()
def window_toggle(self):
self.window_show()
def window_position_reset(self):
self.setGeometry(self.window_x, self.window_y + (self.frameGeometry().height() - self.geometry().height()), self.window_w, self.window_h)
def on_config_preferred_file(self):
self.window_show()
filename, _ = QFileDialog.getOpenFileName(self, '选择文件', '/', 'All Files (*)')
if (filename and os.path.exists(filename)) == 1:
filename = os.path.abspath(filename)
self.setting['plugin_preferred_file'] = filename
QMessageBox.information(self, '提示', 'The preferred file has changed to %s' % (filename,))
def on_tray_icon_update(self):
self.tray_icon.setToolTip('活跃用户:%s/%s' % (len(self.runner.web_server.browser_manager.user_ids_online()), len(self.runner.web_server.browser_manager.user_ids())))
def on_tray_icon_activated(self, reason):
if (reason == QSystemTrayIcon.Trigger) == 1:
self.window_toggle()
def on_debug_all_users_open_home(self):
for user_id in self.runner.web_server.browser_manager.user_ids_online():
driver = self.runner.web_server.browser_manager.user_running[user_id].driver
driver and threading.Thread(target=driver.open, args=('https://www.bing.com/',)).start()
def on_debug_all_users_open_none(self):
for user_id in self.runner.web_server.browser_manager.user_ids_online():
driver = self.runner.web_server.browser_manager.user_running[user_id].driver
driver and threading.Thread(target=driver.open, args=('chrome://new-tab-page',)).start()
def on_debug_print_environment(self):
self.showNormal()
QMessageBox.information(self, '提示', '\n'.join(['%s=%s' % (key, value) for key, value in os.environ.items()]))
def on_debug_print_module_path(self):
self.showNormal()
QMessageBox.information(self, '提示', '\n'.join(['%s' % (value,) for value in sys.path]))
def on_debug_reload_external_plugins(self):
self.runner.web_server.browser_manager.load_plugins_from_external_module()
self.webview.reload()
class WebEngineViewUrlRequestInterceptor(QWebEngineUrlRequestInterceptor):
def interceptRequest(self, info):
url = info.requestUrl().toString()
url.startswith('file://') and info.redirect(QUrl(''))
webEngineViewUrlRequestInterceptor = WebEngineViewUrlRequestInterceptor()
class WebEngineView(QWebEngineView):
def __init__(self, parent=None):
super().__init__(parent)
self.setContextMenuPolicy(0)
self.setAcceptDrops(True)
self.settings().setFontFamily(self.settings().StandardFont, 'Microsoft YaHei')
self.page().profile().downloadRequested.connect(self.onDownloadRequested)
self.page().profile().setUrlRequestInterceptor(webEngineViewUrlRequestInterceptor)
@staticmethod
def onDownloadRequested(download):
download.accept()
def createWindow(self, wt):
return self
class WebServerAPIJSONData(BaseModel):
data: dict = None
class WebSocketConnectionManager:
def __init__(self):
self.active_connections = []
async def connect(self, websocket: WebSocket, disconnect=None):
if disconnect:
self.active_connections.remove(websocket)
else:
await websocket.accept()
await websocket.send_text('Hello, %s:%s' % (websocket.client.host, websocket.client.port))
self.active_connections.append(websocket)
async def send_broadcast(self, message: str):
dead_connections = []
for conn in self.active_connections:
try:
await conn.send_text(str(message))
except WebSocketDisconnect:
dead_connections.append(conn)
for conn in dead_connections:
await self.connect(conn, 1)
def send_broadcast_use_sync(self, text):
asyncio.run(self.send_broadcast(text))
class WebServer:
def __init__(self, runner, root: str, data: str, default_plugins=None):
self.runner = runner
self.app = FastAPI()
self.www = os.path.join(root, 'www')
self.upload_dir = os.path.join(data, 'upload')
self.browser_manager = BrowserManager(
runner=runner,
driver=os.path.join(data, 'Chrome/chromedriver.exe'),
binary=os.path.join(data, 'Chrome/chrome.exe'),
plugin_result_dir=os.path.join(data, 'results'),
manager_data_file=os.path.join(data, 'manager.json'),
browser_data_home=os.path.join(data, 'users'),
browser_init_home=os.path.join(root, 'initialize'),
use_selenium_wire=0
)
self.browser_manager.load_plugins(default_plugins, is_external=0)
self.browser_manager.load_plugins_from_external_module()
self.websocket_connection_manager = WebSocketConnectionManager()
@self.app.websocket('/instant_message')
async def websocket_instant_message(websocket: WebSocket):
await self.websocket_connection_manager.connect(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
pass
await self.websocket_connection_manager.connect(websocket, 1)
@self.app.api_route(methods=['GET', 'POST'], path='/plugin_preferred_file')
def api_plugin_preferred_file():
file = self.runner.window and self.runner.window.setting['plugin_preferred_file']
if (file and os.path.exists(file)) == 1:
return JSONResponse(status_code=200, content=self.message(0, '', file))
else:
return JSONResponse(status_code=200, content=self.message(0, '', None))
@self.app.api_route(methods=['POST'], path='/plugin_run')
def api_plugin_run(data: WebServerAPIJSONData):
try:
self.browser_manager.plugin_run(data.data['user_id'], data.data['plugin_id'], data.data['requirements'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/plugin_shutdown')
def api_plugin_shutdown(data: WebServerAPIJSONData):
try:
self.browser_manager.plugin_shutdown(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/plugin_all_users_run')
def api_plugin_all_users_run(data: WebServerAPIJSONData):
try:
self.browser_manager.plugin_all_users_run(data.data['plugin_id'], data.data['requirements'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/plugin_all_users_shutdown')
def api_plugin_all_users_shutdown():
try:
self.browser_manager.plugin_all_users_shutdown()
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['GET', 'POST'], path='/plugin_list')
def api_plugin_list():
return self.browser_manager.plugin_list()
@self.app.api_route(methods=['POST'], path='/upload_files')
def api_upload_files(files: List[UploadFile] = File(...)):
if (len(files) > 25) == 1:
return JSONResponse(status_code=200, content=self.message(1, 'Exceeded the maximum number of files allowed of 25.'))
if (sum(file.size for file in files) > 50 * 1024 * 1024) == 1:
return JSONResponse(status_code=200, content=self.message(1, 'Total file size exceeds the limit of 50MB.'))
else:
uploaded_files = []
for file in files:
file_save_path = os.path.join(self.upload_dir, '%s_%s' % (int(time.time() * 1000), file.filename))
with open(file_save_path, 'wb') as f:
f.write(file.file.read())
uploaded_files.append(file_save_path)
return JSONResponse(status_code=200, content=self.message(0, '', uploaded_files))
@self.app.api_route(methods=['GET', 'POST'], path='/user_all_details')
def api_user_all_details():
return self.browser_manager.user_all_details()
@self.app.api_route(methods=['POST'], path='/shutdown')
def api_shutdown():
try:
for user_id in self.browser_manager.user_ids_online():
self.browser_manager.user_die(user_id)
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_set_name')
def api_user_set_name(data: WebServerAPIJSONData):
try:
self.browser_manager.user_set_name(data.data['user_id'], data.data['user_name'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_add')
def api_user_add(data: WebServerAPIJSONData):
try:
self.browser_manager.user_add(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_del')
def api_user_del(data: WebServerAPIJSONData):
try:
self.browser_manager.user_del(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_run')
def api_user_run(data: WebServerAPIJSONData):
try:
self.browser_manager.user_run(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_die')
def api_user_die(data: WebServerAPIJSONData):
try:
self.browser_manager.user_die(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['POST'], path='/user_focus_window')
def api_user_focus_window(data: WebServerAPIJSONData):
try:
self.browser_manager.user_focus_window(data.data['user_id'])
return self.message(0)
except Exception as e:
return self.message(1, '%s' % (e,))
@self.app.api_route(methods=['GET'], path='/')
def index():
raise HTTPException(status_code=403)
@self.app.api_route(methods=['GET'], path='/200')
def state():
return Response(content='200', media_type='text/plain')
@self.app.api_route(methods=['GET'], path='/{filename:path}')
def www(filename):
return self.statics(os.path.join(self.www, filename))
@staticmethod
def message(code: int, info: str = '', data=None):
return {'code': int(code), 'info': info, 'data': data}
@staticmethod
def statics(file: str):
if (not os.path.isfile(file)) == 1:
raise HTTPException(status_code=404)
return FileResponse(file)
def handle_interrupt(self):
self.browser_manager.handle_interrupt()
self.clear_upload()
def clear_upload(self):
dirs = self.upload_dir
if (os.path.exists(dirs)) == 1:
for filename in os.listdir(dirs):
filepath = os.path.join(dirs, filename)
try:
os.path.isfile(filepath) and os.unlink(filepath)
except Exception:
pass
def run(self, host='127.0.0.1', port=8080):
os.path.exists(self.www) or os.makedirs(self.www)
os.path.exists(self.upload_dir) or os.makedirs(self.upload_dir)
self.clear_upload()
uvicorn.run(self.app, host=host, port=port, log_level='warning')
class OutputRedirector:
def __init__(self, sys_fp, new_fp):
self.sys_fp = sys_fp
self.new_fp = new_fp
self.time_format = ''
@staticmethod
def time():
return '[%s]\n' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),)
def write(self, s):
self.sys_fp.write(s)
if (self.time_format != self.time()) == 1:
self.time_format = self.time()
self.new_fp.write(self.time_format)
self.new_fp.write(s)
self.flush()
def writelines(self, lines):
self.sys_fp.writelines(lines)
self.new_fp.writelines(lines)
self.flush()
def flush(self):
self.sys_fp.flush()
self.new_fp.flush()
def __getattr__(self, item):
if item in ['write', 'writelines', 'flush']:
return getattr(self, item)
else:
return getattr(self.sys_fp, item)
class MainRunner:
def __init__(self):
signal.signal(signal.SIGINT, self._handle_interrupt)
self.app_root = os.path.dirname(__file__)
appdata = os.getenv('APPDATA')
self.app_data = os.path.join(appdata, 'Galactic') if appdata else os.path.join(os.path.dirname(__file__), 'data')
self.app_running_file = os.path.join(self.app_data, 'running')
self.app_name = 'Galactic'
self.app_version = '1.0.0.4'
self.web_server_host = '127.0.0.1'
self.web_server_port = 8095
self.web_server = None
self.web_server_thread = None
self.application = None
self.application_scale_rate = None
self.window = None
self.plugin_list = [
# BrowserPluginFileTest,
# BrowserPluginTextTest,
# BrowserPluginMultFileTest,
# BrowserPluginMultTextTest,
# BrowserPluginLoggingTest,
# BrowserPluginMessageTest,
# BrowserPluginFileCommandDebug,
# BrowserPluginTextCommandDebug
]
def _copy_files_and_directories(self, src, dst):
function_name = inspect.currentframe().f_code.co_name
if (os.path.exists(src)) != 1:
return None
if (os.path.isdir(src)) == 1:
if not os.path.exists(dst):
os.makedirs(dst)
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(dst, item)
if os.path.isdir(s):
self.__getattribute__(function_name)(s, d)
else:
shutil.copy(s, d)
else:
shutil.copy(src, dst)
@staticmethod
def _remove_directory(src):
if (os.path.isdir(src)) == 1:
shutil.rmtree(src)
@staticmethod
def _rename_directory(src, dst):
if (os.path.isdir(src)) == 1:
os.rename(src, dst)
@staticmethod
def _read_file_text(src):
return open(file=src, mode='r', encoding='utf-8').read() if os.path.exists(src) else ''
@staticmethod
def preprocessing():
for proc in psutil.process_iter():
try:
proc.name().lower() == 'chromedriver.exe' and proc.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
def install_chrome(self):
chrome_install_source = os.path.join(self.app_root, 'Chrome')
chrome_install_source_version = os.path.join(chrome_install_source, 'chrome.version')
chrome_install_target = os.path.join(self.app_data, 'Chrome')
chrome_install_target_version = os.path.join(chrome_install_target, 'chrome.version')
chrome_install_target_newname = '%s.delete' % (chrome_install_target,)
if (os.path.exists(chrome_install_source)) != 1:
return None
if (self._read_file_text(chrome_install_source_version) == self._read_file_text(chrome_install_target_version)) == 1:
return None
if (os.path.exists(chrome_install_target)) == 1:
self._remove_directory(chrome_install_target_newname)
self._rename_directory(chrome_install_target, chrome_install_target_newname)
self._remove_directory(chrome_install_target_newname)
self._copy_files_and_directories(chrome_install_source, chrome_install_target)
return True
def run(self):
os.path.exists(self.app_data) or os.makedirs(self.app_data)
log_dir = os.path.join(self.app_data, 'log')
os.path.exists(log_dir) or os.makedirs(log_dir)
stdout_redirector = OutputRedirector(sys.stdout, open(os.path.join(log_dir, '%s_stdout.log' % (time.strftime('%Y%m%d', time.localtime()),)), mode='a'))
stderr_redirector = OutputRedirector(sys.stderr, open(os.path.join(log_dir, '%s_stderr.log' % (time.strftime('%Y%m%d', time.localtime()),)), mode='a'))
sys.stderr = stderr_redirector
sys.stdout = stdout_redirector
sys.path.append(os.path.join(os.path.dirname(__file__), 'Packages'))
sys.path.append(os.path.join(os.path.dirname(__file__), 'site-packages.zip'))
sys.path.append(os.path.join(self.app_data, 'Packages'))
sys.path.append(os.path.join(self.app_data, 'site-packages.zip'))
print('Startup.', file=sys.stderr)
try:
self.preprocessing()
except Exception:
pass
try:
self.install_chrome() and print('Chrome is now installed.', file=sys.stderr)
except PermissionError:
_app = QApplication(sys.argv)
_msg = QMessageBox()
_msg.setIcon(QMessageBox.Warning)
_msg.setText('Chrome needs to be updated, please quit all browser users first.')
_msg.setWindowTitle('Warning')
_msg.setStandardButtons(QMessageBox.Cancel)
_msg.exec_()
_app.exit(1)
self.web_server = WebServer(runner=self, root=self.app_root, data=self.app_data, default_plugins=self.plugin_list)
self.web_server_thread = threading.Thread(target=self.web_server.run, kwargs={'host': self.web_server_host, 'port': self.web_server_port})
self.web_server_thread.daemon = True
self.web_server_thread.start()
self.application = QApplication(sys.argv)
self.application.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
self.application_scale_rate = self.application.screens()[0].logicalDotsPerInch() / 96
self.window = MainWindow(
runner=self,
app_name=self.app_name,
app_version=self.app_version,
scale_rate=self.application_scale_rate,
web_listen_host=self.web_server_host,
web_listen_port=self.web_server_port
)
status = self.application.exec_()
print('Exit status code: %s' % (status,), file=sys.stderr)
sys.exit(status)
def build(self):
if (str(__file__).endswith('.py')) == 0:
print('Build is not currently supported.', file=sys.stderr)
exit(1)
from_home = os.path.dirname(__file__)
dist_home = os.path.join(os.path.dirname(__file__), '%s.dist' % (os.path.splitext(os.path.basename(__file__))[0],))
ask = input('%s %s: ' % ('Build?', '[Y/n]'))
if ask.lower().strip() == 'y':
subprocess.run(['pip', 'install', 'nuitka', '-q'], shell=True, env=os.environ)
subprocess.run([
'python',
'-m',
'nuitka',
'--standalone',
'--enable-plugin=pyqt5',
'--enable-plugin=pylint-warnings',
'--include-module=PyQt5',
'--include-module=selenium',
'--include-module=fastapi',
'--include-module=pydantic',
'--include-module=starlette',
'--windows-console-mode=disable',
'--windows-icon-from-ico=favicon.ico',
'--product-name=%s' % (self.app_name,),
'--file-description=%s' % (self.app_name,),
'--product-version=%s' % (self.app_version,),
'--copyright=Copyright (C) 2025',
'--output-dir=%s' % (os.path.join(os.path.dirname(__file__)),),
'%s' % (__file__,)
], shell=True, env=os.environ)
for i in ['Packages', 'PyQt5', 'Chrome', 'initialize', 'www', 'favicon.ico']:
self._copy_files_and_directories('%s/%s' % (from_home, i), '%s/%s' % (dist_home, i))
else:
if (not os.path.exists(dist_home)) == 1:
return None
ask = input('%s %s: ' % ('Compile setup program?', '[Y/n]'))
if ask.lower().strip() == 'y':
compile_file = os.path.join(os.path.dirname(__file__), '%s.iss' % (os.path.splitext(os.path.basename(__file__))[0],))
compile_template = os.path.join(os.path.dirname(__file__), '%s.iss.template' % (os.path.splitext(os.path.basename(__file__))[0],))
compiler = 'C:\\Program Files (x86)\\Inno Setup 6\\ISCC.exe'
if (os.path.exists(compile_template)) != 1:
print('The template file \"%s\" does not exist.' % (compile_template,), file=sys.stderr)
return None
if (os.path.exists(compiler)) != 1:
print('The compiler \"%s\" does not exist. Please check if Inno Setup is installed. You can download it at https://www.innosetup.com/' % (compiler,),
file=sys.stderr)
return None
Path(compile_file).write_text(
Path(compile_template).read_text().replace(
'%APPNAME%',
self.app_name
).replace(
'%APPEXEC%',
os.path.splitext(os.path.basename(__file__))[0]
).replace(
'%APPVERSION%',
self.app_version
).replace(
'%APPBUILDDATE%',
time.strftime('%Y%m%d', time.localtime())
).replace(
'%DISABLEX64%',
'' if platform.architecture()[0] == '64bit' else '; '
)
)
subprocess.run([compiler, compile_file])
def _handle_interrupt(self, _signal, _frame):
self.handle_interrupt()
def handle_interrupt(self):
try:
self.web_server and self.web_server.handle_interrupt()
except Exception as e:
print(e, file=sys.stderr)
if __name__ == '__main__':
if (os.path.basename(__file__).lower().endswith('.int')) == 1:
QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), 'site-packages/PyQt5/Qt5/plugins'))
else:
QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), 'PyQt5/Qt5/plugins'))
f_lock = open(file=os.path.join(tempfile.gettempdir(), '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16]), mode='w', encoding='utf-8')
if (not flock(f_lock, LOCK_EX | LOCK_NB)) == 1:
app = QApplication(sys.argv)
msg = QMessageBox()
msg.setIcon(QMessageBox.Warning)
msg.setText('The application is already running.')
msg.setWindowTitle('Warning')
msg.setStandardButtons(QMessageBox.Cancel)
msg.exec_()
app.exit(1)
sys.exit(1)
else:
MainRunner().run() if (len(sys.argv) > 1 and sys.argv[1] == '--build') == 0 else MainRunner().build()