Optimizing selenium-wire

This commit is contained in:
zhaoyafan 2025-02-10 18:25:32 +08:00
parent 904708344d
commit 58866712bc
2 changed files with 79 additions and 52 deletions

View File

@ -21,6 +21,12 @@ import requests.adapters
import importlib.util
import threading
import subprocess
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from seleniumwire.webdriver import InspectRequestsMixin, DriverCommonMixin
from seleniumwire.request import Request as SeleniumWireRequest
from seleniumwire.request import Response as SeleniumWireResponse
from selenium.webdriver.common.by import By
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.action_chains import ActionChains
@ -141,29 +147,6 @@ else:
def flock(f, flags):
return fcntl.flock(_fd(f), flags) == 0
try:
"""
0: selenium(default),
1: selenium-wire,
2: selenium-wire with undetected driver(only chrome).
"""
seleniumClassesDriver = ['0', '1', '2'].index(os.environ.get('SELENIUM_CLASSES_DRIVER') or '0')
match seleniumClassesDriver:
case 0:
from selenium.webdriver import Chrome as browser_webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
case 1:
from seleniumwire.webdriver import Chrome as browser_webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
case 2:
from seleniumwire.undetected_chromedriver import Chrome as browser_webdriver
from seleniumwire.undetected_chromedriver import ChromeOptions as Options
from selenium.webdriver.chrome.service import Service
except ValueError:
raise SystemError('Not supported driver classes.')
def notification_send(app_id=None, title='', message='', reminder=None):
app_id = app_id() if callable(app_id) else app_id
@ -508,7 +491,7 @@ class FetchResult:
return '%s: %s\n%s: %s\n%s: %s' % ('STATUS', self.status, 'HEADER', self.header, 'RESULT', self.result)
class Browser(browser_webdriver):
class Browser(InspectRequestsMixin, DriverCommonMixin, Chrome):
"""
Browser web driver.
"""
@ -517,7 +500,9 @@ class Browser(browser_webdriver):
self,
driver: str = None,
binary: str = None,
driver_classes: int = 0,
debugger_address: str = None,
backends_address: str = None,
headless: bool = False,
lang: str = None,
mute: bool = False,
@ -534,7 +519,6 @@ class Browser(browser_webdriver):
):
self.is_linux = sys.platform.startswith('linux')
SeleniumClear().auto()
classes = seleniumClassesDriver
binary, driver = BrowserPathManager().main(binary, driver)
if self.is_linux is bool(1) and not window_size: window_size = '1920x1080'
if self.is_linux is bool(0) and headless and not window_size: window_size = '1920x1080'
@ -545,7 +529,7 @@ class Browser(browser_webdriver):
options = Options()
self.cdplist = cdplist
# Delete prompt information of chrome being controlled.
exclude_switches = ['enable-automation', 'enable-logging', 'disable-translate']
exclude_switches = ['enable-automation', 'enable-logging', 'disable-translate', 'ignore-certificate-errors']
options.add_experimental_option('debuggerAddress', debugger_address) if debugger_address else options.add_experimental_option('excludeSwitches', exclude_switches)
# Mobile emulation parameter setting start.
if mobile_emulation and not debugger_address:
@ -627,28 +611,35 @@ class Browser(browser_webdriver):
debugger_address or options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ','))
else:
debugger_address or options.add_argument('--start-maximized')
# Start the backend and optimize it.
if (driver_classes == 1 and backends_address is not None) == 1:
for key, value in self._setup_backend({'addr': str(backends_address.split(':')[0]), 'port': int(backends_address.split(':')[1])}).items():
options.set_capability(key, value)
try:
self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
except AttributeError:
pass
# Start the browser.
undetected_kwargs = {'driver_executable_path': driver, 'browser_executable_path': binary, 'version_main': 111} if classes == 2 else {}
super().__init__(service=service, options=options, **undetected_kwargs)
# Selenium-Wire backend optimization start.
try:
self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
except AttributeError:
pass
# Selenium-Wire backend optimization end.
super().__init__(service=service, options=options)
if mobile_emulation:
cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}])
cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}])
cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}])
# Set the request and response interceptor.
if req_interceptor:
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
self.request_interceptor = req_interceptor
try:
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
self.request_interceptor = req_interceptor
except AttributeError:
pass
if res_interceptor:
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
self.response_interceptor = res_interceptor
try:
hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
self.response_interceptor = res_interceptor
except AttributeError:
pass
# Sync set http proxy for Selenium-Wire backend.
if http_proxy:
self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy}
@ -1377,7 +1368,7 @@ class BrowserManagerUserRunning:
class BrowserManager:
def __init__(self, driver: str, binary: str, manager_data_file: str, browser_data_home: str, browser_init_home: str):
def __init__(self, driver: str, binary: str, manager_data_file: str, browser_data_home: str, browser_init_home: str, use_selenium_wire: int = 0):
if not os.path.exists(driver):
raise FileNotFoundError('The driver executable file does not exist.')
if not os.path.exists(binary):
@ -1387,8 +1378,10 @@ class BrowserManager:
self.manager_data_file = manager_data_file
self.browser_data_home = browser_data_home
self.browser_init_home = browser_init_home
self.use_selenium_wire = use_selenium_wire
self.threading_lock = threading.RLock()
self.debugging_port_range = range(60000, 60255)
self.debugging_port_range = range(60000, 60256)
self.mitmproxy_port_range = range(55000, 55512)
self.data_storage = BrowserManagerDataStorage(manager_data_file)
if (not self.data_storage) == 1:
self.data_storage['browser_user'] = {}
@ -1404,6 +1397,17 @@ class BrowserManager:
exist_ports = [value['remote_debugging_port'] for key, value in self.data_storage['browser_user'].items()]
return random.choice([p for p in self.debugging_port_range if p not in exist_ports])
def _generate_random_mitmproxy_port(self):
ports_range = self.mitmproxy_port_range
port = None
for p in random.sample([i for i in ports_range], len(ports_range)):
if (not self.is_port_in_use(p)) == 1:
port = p
break
if (port is None) == 1:
raise Exception('All ports in the specified range are in use.')
return port
def _update_user_running(self):
for user_id, running in self.user_running.items():
if (not running) == 1:
@ -1451,17 +1455,34 @@ class BrowserManager:
except KeyboardInterrupt:
pass
def run_browser(self, user_data_dir: str, remote_debugging_port: int):
@staticmethod
def req_interceptor(req: SeleniumWireRequest):
pass
@staticmethod
def res_interceptor(req: SeleniumWireRequest, res: SeleniumWireResponse):
try:
print('[%s] %s %s %s' % (str(req.method).ljust(7), 'HTTP', res.status_code, req.url), file=sys.stderr)
except Exception as e:
print(e, file=sys.stderr)
def run_browser(self, user_data_dir: str, remote_debugging_port: int, proxy_server: str = None):
options = [
self.binary,
'--disable-background-networking',
'--disable-desktop-notifications',
'--disable-component-update',
'--no-default-browser-check',
'--no-first-run',
'--user-data-dir=\"%s\"' % (user_data_dir,),
'--remote-debugging-port=%s' % (remote_debugging_port,)
'--hide-crash-restore-bubble'
]
return subprocess.Popen('\"%s\" %s' % (self.binary, ' '.join(options)), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
if user_data_dir:
options.append('--user-data-dir=%s' % (user_data_dir,))
if remote_debugging_port:
options.append('--remote-debugging-port=%s' % (remote_debugging_port,))
if proxy_server:
options.append('--proxy-server=%s' % (proxy_server,))
return subprocess.Popen(options, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
def load_plugins(self, plugins, is_external=0):
if (plugins is None) == 1:
@ -1532,10 +1553,10 @@ class BrowserManager:
return False
def is_port_in_use(self, port: int):
if (port in self.debugging_port_range) == 0:
if (port not in self.debugging_port_range and port not in self.mitmproxy_port_range) == 1:
return True
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.5)
s.settimeout(0.08)
try:
return s.connect_ex(('127.0.0.1', port)) == 0
except socket.timeout:
@ -1670,16 +1691,21 @@ class BrowserManager:
raise Exception('The instance may be running, please shutdown the instance first.')
user_data_dir = self._get_user_data_dir(user_id)
remote_debugging_port = self._get_user_remote_debugging_port(user_id)
chrome = self.run_browser(user_data_dir=user_data_dir, remote_debugging_port=remote_debugging_port)
running = BrowserManagerUserRunning(self.user_running[user_id])
running.active = 1
mitmproxy_port = self._generate_random_mitmproxy_port()
chrome = self.run_browser(user_data_dir=user_data_dir, remote_debugging_port=remote_debugging_port, proxy_server='127.0.0.1:%s' % (mitmproxy_port,) if self.use_selenium_wire else None)
running.set_chrome(chrome)
driver = Browser(
driver=self.driver,
binary=self.binary,
driver_classes=self.use_selenium_wire,
window_size='%s,%s' % (self.geometry_config['browser_window_w'], self.geometry_config['browser_window_h']),
window_site='%s,%s' % (self.geometry_config['browser_window_x'], self.geometry_config['browser_window_y']),
debugger_address='127.0.0.1:%s' % (remote_debugging_port,)
debugger_address='127.0.0.1:%s' % (remote_debugging_port,),
backends_address='127.0.0.1:%s' % (mitmproxy_port,),
req_interceptor=self.use_selenium_wire and self.req_interceptor,
res_interceptor=self.use_selenium_wire and self.res_interceptor
)
running.set_driver(driver)
finally:
@ -1887,7 +1913,8 @@ class WebServer:
binary=os.path.join(root, 'Chrome/chrome.exe'),
manager_data_file=os.path.join(data, 'manager.json'),
browser_data_home=os.path.join(data, 'users'),
browser_init_home=os.path.join(root, 'initialize')
browser_init_home=os.path.join(root, 'initialize'),
use_selenium_wire=0
)
self.browser_manager.load_plugins(default_plugins, is_external=0)
self.browser_manager.load_plugins_from_external_module()
@ -2026,7 +2053,7 @@ class WebServer:
os.path.exists(self.www) or os.makedirs(self.www)
os.path.exists(self.upload_dir) or os.makedirs(self.upload_dir)
self.upload_clear()
uvicorn.run(self.app, host=host, port=port)
uvicorn.run(self.app, host=host, port=port, log_level='warning')
class MainRunner:

Binary file not shown.