web-automation-pytest-demo/Lib/Driver.py

1029 lines
42 KiB
Python
Raw Permalink Normal View History

2023-03-15 19:40:39 +08:00
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.action_chains import ActionChains
2023-05-07 13:39:25 +08:00
from selenium.webdriver.remote.webelement import WebElement
2023-03-15 19:40:39 +08:00
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
2023-04-18 19:19:01 +08:00
import subprocess
2023-03-15 19:40:39 +08:00
import tempfile
import platform
2023-04-18 19:19:01 +08:00
import requests
2023-05-06 15:31:34 +08:00
import requests.adapters
2023-04-18 19:19:01 +08:00
import zipfile
2023-05-06 15:31:34 +08:00
import tarfile
2023-04-18 19:19:01 +08:00
import base64
2023-05-06 15:31:34 +08:00
import random
2023-04-18 19:19:01 +08:00
import shutil
import glob
2023-03-15 19:40:39 +08:00
import json
import time
import sys
import os
2023-04-18 19:19:01 +08:00
import re
# You can set the drive path and browser location through the environment variable
seleniumBrowserChoose = os.environ.get('SELENIUM_BROWSER_CHOOSE') or 'Chrome'
seleniumClassesDriver = os.environ.get('SELENIUM_CLASSES_DRIVER') or '0'
seleniumBrowserDriver = os.environ.get('SELENIUM_BROWSER_DRIVER') or ''
seleniumBrowserBinary = os.environ.get('SELENIUM_BROWSER_BINARY') or ''
try:
seleniumBrowserChoose = ['Chrome', 'Firefox', 'Edge'].index(seleniumBrowserChoose.capitalize())
except Exception:
raise SystemError('Not supported browser "%s"' % seleniumBrowserChoose)
try:
"""
0: default,
1: selenium-wire,
2: selenium-wire with undetected driver(only chrome).
"""
seleniumClassesDriver = ['0', '1', '2'].index(seleniumClassesDriver)
except Exception:
raise SystemError('Not supported classes "%s"' % seleniumClassesDriver)
match seleniumBrowserChoose:
case 0:
match seleniumClassesDriver:
case 0:
from selenium.webdriver \
import Chrome as browser_webdriver
from selenium.webdriver.chrome.options \
import Options
from selenium.webdriver.chrome.service \
import Service
case 1:
from seleniumwire.webdriver \
import Chrome as browser_webdriver
from selenium.webdriver.chrome.options \
import Options
from selenium.webdriver.chrome.service \
import Service
case 2:
from seleniumwire.undetected_chromedriver \
import Chrome as browser_webdriver
from seleniumwire.undetected_chromedriver \
import ChromeOptions as Options
from selenium.webdriver.chrome.service \
import Service
case 1:
match seleniumClassesDriver:
case 0:
from selenium.webdriver \
import Firefox as browser_webdriver
from selenium.webdriver.firefox.options \
import Options
from selenium.webdriver.firefox.service \
import Service
case 1:
from seleniumwire.webdriver \
import Firefox as browser_webdriver
from selenium.webdriver.firefox.options \
import Options
from selenium.webdriver.firefox.service \
import Service
case 2:
raise SystemError('No support this classes')
case 2:
match seleniumClassesDriver:
case 0:
from selenium.webdriver \
import Edge as browser_webdriver
from selenium.webdriver.edge.options \
import Options
from selenium.webdriver.edge.service \
import Service
case 1:
from seleniumwire.webdriver \
import Edge as browser_webdriver
from selenium.webdriver.edge.options \
import Options
from selenium.webdriver.edge.service \
import Service
case 2:
raise SystemError('No support this classes')
2023-03-15 19:40:39 +08:00
class BrowserMobileEmulation(dict):
    """
    Mobile emulation parameters.

    A dict holding the keys ``w``, ``h`` and ``user_agent``. The values can be
    read either as items or as attributes; unknown keys/attributes read as
    None, and attribute assignment is silently ignored.
    """

    def __init__(self, w=540, h=960, user_agent=None):
        if not user_agent:
            # Built-in default user agent, stored base64-encoded.
            user_agent = base64.b64decode(bytes('''
TW96aWxsYS81LjAgKExpbnV4OyBVOyBBbmRyb2lkIDEzOyB6aC1jbjsgMjEwOTEx
OUJDIEJ1aWxkL1RLUTEuMjIwODI5LjAwMikgQXBwbGVXZWJLaXQvNTM3LjM2IChL
SFRNTCwgbGlrZSBHZWNrbykgVmVyc2lvbi80LjAgQ2hyb21lLzk4LjAuNDc1OC4x
MDIgTVFRQnJvd3Nlci8xMy42IE1vYmlsZSBTYWZhcmkvNTM3LjM2
''', encoding='utf-8')).decode()
        super().__init__({'w': w, 'h': h, 'user_agent': user_agent})

    def __setattr__(self, key, value):
        # Attribute writes are dropped on purpose; values live in the dict only.
        pass

    def __getitem__(self, item):
        # Missing keys read as None instead of raising KeyError.
        return dict.get(self, item)

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails: fall back to the dict.
        return dict.get(self, item)
2023-05-06 15:31:34 +08:00
class CustomHTTPAdapter(requests.adapters.HTTPAdapter):
    """
    Transport adapter that resolves host names through an HTTP DNS endpoint.

    Before a request is sent, the host name in the URL is replaced by the
    address returned from ``resolve_host`` while the original name is kept in
    the ``Host`` header (and, for https, in the TLS host-name settings). The
    response URL is mapped back to the original host name.
    """

    def __init__(self, *args, **kwargs):
        from urllib.parse import urlparse
        self.urlparse = urlparse
        self.hosts = {}  # host name -> resolved address
        self.addrs = {}  # resolved address -> host name
        super().__init__(*args, **kwargs)

    @staticmethod
    def _split_netloc(netloc):
        """
        Split "host[:port]" into (host, ":port" or "").
        """
        parts = netloc.split(':')
        return parts[0], (':%s' % parts[1]) if len(parts) > 1 else ''

    @staticmethod
    def resolve_host(host):
        """
        Return the first address reported for ``host``, or None on any failure.
        """
        try:
            # A timeout is required here: without one a stalled lookup would
            # block every request sent through this adapter.
            answer = requests.get('http://119.29.29.29/d?dn=%s&ip=208.67.222.222' % host, timeout=5)
        except requests.exceptions.RequestException:
            return None
        if answer.status_code != 200:
            # An error page is not an address list.
            return None
        hosts = answer.text.replace(',', ';').split(';')
        return hosts[0].strip() or None

    def send(self, request, **kwargs):
        """
        Send ``request`` to the resolved address of its host when one is available.
        """
        req = request
        connection_pool_kwargs = self.poolmanager.connection_pool_kw
        url_resolve = self.urlparse(req.url)
        scheme = url_resolve.scheme
        domain, addition_port = self._split_netloc(url_resolve.netloc)
        ip_address = self.resolve_host(domain)
        if ip_address:
            self.hosts[domain] = ip_address
            self.addrs[ip_address] = domain
            req.url = req.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.hosts[domain], addition_port))
            req.headers['Host'] = '%s%s' % (domain, addition_port)
        if ip_address and scheme == 'https':
            # The URL now carries an address, so tell TLS which name to expect.
            connection_pool_kwargs['assert_hostname'] = domain
            connection_pool_kwargs['server_hostname'] = domain
        else:
            # Drop pins left by an earlier request; they belong to another host.
            connection_pool_kwargs.pop('assert_hostname', None)
            connection_pool_kwargs.pop('server_hostname', None)
        return super().send(req, **kwargs)

    def build_response(self, *args, **kwargs):
        """
        Build the response and restore the original host name in its URL.
        """
        res = super().build_response(*args, **kwargs)
        url_resolve = self.urlparse(res.url)
        domain, addition_port = self._split_netloc(url_resolve.netloc)
        if domain in self.addrs:
            res.url = res.url.replace('://%s%s/' % (domain, addition_port), '://%s%s/' % (self.addrs[domain], addition_port))
        return res
class BrowserPathManager:
def __init__(self, browser: int):
if browser not in (0, 1, 2): raise Exception('Not supported browser.')
self.browser = browser
self.webdriver_install_location = tempfile.gettempdir()
2023-04-18 19:19:01 +08:00
@staticmethod
2023-05-06 15:31:34 +08:00
def resolve_browser_version(file: str):
if not os.path.exists(file):
raise Exception('The executable file does not exist in %s' % file)
if file.lower().endswith('.exe'):
2023-04-18 19:19:01 +08:00
try:
full_version = subprocess.run(
2023-05-06 15:31:34 +08:00
['powershell', '(Get-Item -Path "%s").VersionInfo.ProductVersion' % file],
2023-04-18 19:19:01 +08:00
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=5
).stdout.decode('utf-8').strip()
except Exception:
full_version = ''
try:
main_version = full_version.split('.')[0]
except Exception:
main_version = ''
else:
try:
full_version = subprocess.run(
2023-05-06 15:31:34 +08:00
'%s --version' % file,
2023-04-18 19:19:01 +08:00
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=5
).stdout.decode('utf-8').strip()
2023-05-06 15:31:34 +08:00
full_version = re.findall('[0-9]+[.\\d+]+', full_version)[-1]
2023-04-18 19:19:01 +08:00
except Exception:
full_version = ''
try:
main_version = full_version.split('.')[0]
except Exception:
main_version = ''
2023-05-06 15:31:34 +08:00
return file, main_version, full_version
2023-04-18 19:19:01 +08:00
@staticmethod
def open_remote_resources(url: str, save_file: str = None, auto_redirects=False, retries=3):
    """
    Fetch ``url`` through a session that uses CustomHTTPAdapter.

    Without ``save_file`` the response text is returned, or '' on failure.
    With ``save_file`` the body is streamed to that path and True is
    returned, or False on failure. Connection errors and timeouts are
    retried up to ``retries`` times; a non-200 status fails immediately.
    """
    # The falsy value matches the mode so callers can compare against ''/False.
    failure = False if save_file else ''
    attempts = (retries if retries > 0 else 0) + 1
    with requests.Session() as http:
        for scheme in ['http://', 'https://']:
            http.mount(scheme, CustomHTTPAdapter())
        for attempt in range(attempts):
            try:
                with http.get(
                    url,
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
                    },
                    allow_redirects=auto_redirects,
                    stream=(save_file is not None),
                    timeout=(10, 30)
                ) as response:
                    if 200 != response.status_code:
                        return failure
                    if not save_file:
                        return response.text
                    with open(save_file, 'wb') as filestream:
                        for chunk in response.iter_content(chunk_size=8192):
                            filestream.write(chunk)
                    return True
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                # Pause only when another attempt follows.
                if attempt + 1 < attempts:
                    time.sleep(0.75 + round(random.random(), 2))
    # Every attempt failed: report it explicitly instead of returning None.
    return failure
def find_binary(self):
2023-04-18 19:19:01 +08:00
plat = sys.platform
find_list = []
plats = ['win32', 'linux', 'darwin']
2023-05-06 15:31:34 +08:00
match self.browser:
case 0:
if plat == plats[0]:
for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)', 'LOCALAPPDATA', 'PROGRAMW6432']:
find_list.append('%s/Google/Chrome/Application/chrome.exe' % os.environ.get(e, '').replace("\\", '/'))
if plat == plats[1]:
for p in ['/opt/google/chrome', '/usr/bin/google-chrome']:
find_list.append('%s/chrome' % p)
if plat == plats[2]:
for p in ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome']:
find_list.append('%s/chrome' % p)
case 1:
if plat == plats[0]:
for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)']:
find_list.append('%s/Mozilla Firefox/firefox.exe' % os.environ.get(e, '').replace("\\", '/'))
if plat == plats[1]:
for p in ['/usr/bin']:
find_list.append('%s/firefox' % p)
if plat == plats[2]:
for p in ['/Applications/Firefox.app/Contents/MacOS']:
find_list.append('%s/firefox-bin' % p)
case 2:
if plat == plats[0]:
for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)']:
find_list.append('%s/Microsoft/Edge/Application/msedge.exe' % os.environ.get(e, '').replace("\\", '/'))
if plat == plats[1]:
for p in ['/opt/microsoft/msedge']:
find_list.append('%s/msedge' % p)
if plat == plats[2]:
for p in ['/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge']:
find_list.append('%s/msedge' % p)
2023-04-18 19:19:01 +08:00
for execute_file in find_list:
try:
if os.path.exists(execute_file):
2023-05-06 15:31:34 +08:00
return execute_file
2023-04-18 19:19:01 +08:00
except Exception:
pass
2023-05-06 15:31:34 +08:00
def find_driver(self, main_version: str, full_version: str):
location = None
match self.browser:
case 0:
if not int(main_version) >= 70:
return None
location = '%s%s%s' % (
self.webdriver_install_location,
os.sep,
'chromedriver_%s%s' % (
str(main_version),
'.exe' if platform.system().lower() == 'windows' else ''
)
)
case 1:
location = '%s%s%s' % (
self.webdriver_install_location,
os.sep,
'%s%s' % (
'geckodriver',
'.exe' if platform.system().lower() == 'windows' else ''
)
)
case 2:
if not int(main_version) >= 79:
return None
location = '%s%s%s' % (
self.webdriver_install_location,
os.sep,
'msedgedriver_%s%s' % (
str(full_version),
'.exe' if platform.system().lower() == 'windows' else ''
)
)
2023-04-18 19:19:01 +08:00
return location.replace("\\", '/') if os.path.exists(location) else None
2023-05-06 15:31:34 +08:00
def pull_driver(self, main_version: str, full_version: str):
match self.browser:
case 0:
if not int(main_version) >= 70:
return None
chromedriver_site = 'https://chromedriver.storage.googleapis.com'
latest_release = self.open_remote_resources('%s/LATEST_RELEASE_%s' % (chromedriver_site, main_version))
if '' == latest_release:
return None
plat = sys.platform
match_assets = []
plats = ['win32', 'linux', 'darwin']
child = ['chromedriver.exe', 'chromedriver']
tails = ['win32', 'linux64', 'mac64', 'mac_arm64', 'mac64_m1']
if plat == plats[0]:
match_assets.append([child[0], 'chromedriver_%s.zip' % tails[0]])
if plat == plats[1]:
match_assets.append([child[1], 'chromedriver_%s.zip' % tails[1]])
if plat == plats[2] and (platform.machine().startswith('arm')) is bool(0):
match_assets.append([child[1], 'chromedriver_%s.zip' % tails[2]])
if plat == plats[2] and (platform.machine().startswith('arm')) is bool(1):
match_assets.append([child[1], 'chromedriver_%s.zip' % tails[3]])
match_assets.append([child[1], 'chromedriver_%s.zip' % tails[4]])
package_chromedriver = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'chromedriver.zip')
distdir_chromedriver = self.webdriver_install_location
for assets in match_assets:
res_url = '%s/%s/%s' % (chromedriver_site, latest_release, assets[1])
print('Downloading version %s chromedriver %s to %s...' % (latest_release, res_url, distdir_chromedriver), file=sys.stderr)
if self.open_remote_resources(res_url, package_chromedriver):
dist = zipfile.ZipFile(package_chromedriver).extract(assets[0], distdir_chromedriver)
dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0].replace('chromedriver', 'chromedriver_%s' % main_version))
os.path.exists(dist_chan) and os.remove(dist_chan)
os.rename(dist, dist_chan)
assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777)
os.remove(package_chromedriver)
return dist_chan.replace("\\", '/')
case 1:
site = 'https://github.com/mozilla/geckodriver/releases'
geckodriver_version = '0.33.0'
plat = sys.platform
match_assets = []
plats = ['win32', 'linux', 'darwin']
child = ['geckodriver.exe', 'geckodriver']
tails = ['win32', 'linux64', 'macos', 'macos-aarch64']
compr = ['zip', 'tar.gz']
if plat == plats[0]:
match_assets.append([child[0], 'geckodriver-v%s-%s.%s' % (geckodriver_version, tails[0], compr[0])])
if plat == plats[1]:
match_assets.append([child[1], 'geckodriver-v%s-%s.%s' % (geckodriver_version, tails[1], compr[1])])
if plat == plats[2] and (platform.machine().startswith('arm')) is bool(0):
match_assets.append([child[1], 'geckodriver-v%s-%s.%s' % (geckodriver_version, tails[2], compr[1])])
if plat == plats[2] and (platform.machine().startswith('arm')) is bool(1):
match_assets.append([child[1], 'geckodriver-v%s-%s.%s' % (geckodriver_version, tails[3], compr[1])])
for assets in match_assets:
package_driver = '%s%s%s' % (tempfile.gettempdir(), os.sep, assets[1])
distdir_driver = self.webdriver_install_location
res_url = '%s/download/v%s/%s' % (site, geckodriver_version, assets[1])
print('Downloading geckodriver v%s %s to %s...' % (geckodriver_version, res_url, distdir_driver), file=sys.stderr)
if self.open_remote_resources(res_url, package_driver, auto_redirects=True):
compress = zipfile.ZipFile(package_driver) if package_driver.endswith('.%s' % compr[0]) else tarfile.open(package_driver, "r:gz")
dist = compress.extract(assets[0], distdir_driver) or '%s%s%s' % (distdir_driver, os.sep, assets[0])
compress.close()
dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0])
assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777)
os.remove(package_driver)
return dist_chan.replace("\\", '/')
case 2:
if not int(main_version) >= 79:
return None
msedgedriver_site = 'https://msedgedriver.azureedge.net'
latest_release = full_version
plat = sys.platform
match_assets = []
plats = ['win32', 'linux', 'darwin']
child = ['msedgedriver.exe', 'msedgedriver']
tails = ['win32', 'linux64', 'mac64', 'mac64_m1']
if plat == plats[0]:
match_assets.append([child[0], 'edgedriver_%s.zip' % tails[0]])
if plat == plats[1]:
match_assets.append([child[1], 'edgedriver_%s.zip' % tails[1]])
if plat == plats[2] and (platform.machine().startswith('arm')) is bool(0):
match_assets.append([child[1], 'edgedriver_%s.zip' % tails[2]])
if plat == plats[2] and (platform.machine().startswith('arm')) is bool(1):
match_assets.append([child[1], 'edgedriver_%s.zip' % tails[3]])
package_msedgedriver = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'msedgedriver.zip')
distdir_msedgedriver = self.webdriver_install_location
for assets in match_assets:
res_url = '%s/%s/%s' % (msedgedriver_site, latest_release, assets[1])
print('Downloading version %s msedgedriver %s to %s...' % (latest_release, res_url, distdir_msedgedriver), file=sys.stderr)
if self.open_remote_resources(res_url, package_msedgedriver):
dist = zipfile.ZipFile(package_msedgedriver).extract(assets[0], distdir_msedgedriver)
dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0].replace('msedgedriver', 'msedgedriver_%s' % full_version))
os.path.exists(dist_chan) and os.remove(dist_chan)
os.rename(dist, dist_chan)
assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777)
os.remove(package_msedgedriver)
return dist_chan.replace("\\", '/')
def main(self, binary: str = None, driver: str = None):
binary = binary if binary else self.find_binary()
if not binary:
raise Exception('No browser executable file is found on your system, please confirm whether it has been installed')
if not os.path.exists(binary):
raise Exception('The executable file does not exist in %s' % binary)
version = self.resolve_browser_version(binary)
2023-04-18 19:19:01 +08:00
if not version:
2023-05-06 15:31:34 +08:00
raise Exception('Failure to get the browser version number failed in %s' % binary)
i_binary = binary
binary_main_version = version[1]
binary_full_version = version[2]
driver = driver if driver else self.find_driver(binary_main_version, binary_full_version)
driver = driver if driver else self.pull_driver(binary_main_version, binary_full_version)
if not driver:
raise Exception('Not specified the driver path, and try the automatic download failure')
if not os.path.exists(driver):
raise Exception('The driver does not exist in %s' % driver)
i_driver = driver
return i_binary, i_driver
2023-04-18 19:19:01 +08:00
class SeleniumClear:
    """
    Housekeeping for caches left behind by Selenium and browser drivers.

    The time of the last cleanup is stored in a marker file in the temp
    directory so that ``auto`` cleans at most once per 24 hours.
    """

    def __init__(self):
        self.last = '%s/.selenium_clear_last' % tempfile.gettempdir()

    @staticmethod
    def clear_selenium():
        """
        Remove ``<home>/.cache/selenium`` (only attempted on Windows).
        """
        if platform.uname().system.lower() == 'windows':
            user_home = [os.environ.get('HOMEDRIVE'), os.environ.get('HOMEPATH')]
            if user_home[0] and user_home[1]:
                try:
                    shutil.rmtree('%s%s/.cache/selenium' % (user_home[0], user_home[1]))
                except OSError:
                    # Best effort: a missing or locked cache is not an error.
                    pass

    @staticmethod
    def clear_driver_cache():
        """
        Remove leftover driver/browser scratch directories from the temp directory.
        """
        for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']:
            for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)):
                try:
                    shutil.rmtree(i)
                except OSError:
                    # OSError covers missing, locked and non-directory entries.
                    # "WindowsError" only exists on Windows and must not be named.
                    pass

    @staticmethod
    def file_get_contents(file, text=None):
        """
        Return the content of ``file``, or ``text`` when it does not exist.
        """
        if not os.path.exists(file):
            return text
        with open(file=file, mode='r', encoding='utf-8') as stream:
            return stream.read()

    @staticmethod
    def file_put_contents(file, text=None):
        """
        Write ``text`` to ``file`` and return the number of characters written.
        """
        with open(file=file, mode='w', encoding='utf-8') as stream:
            return stream.write(text)

    def straight_clear(self):
        """
        Clean everything now and record the current time in the marker file.
        """
        self.clear_selenium()
        self.clear_driver_cache()
        self.file_put_contents(self.last, str(int(time.time())))

    def auto(self):
        """
        Clean when the last recorded cleanup is more than 24 hours old.
        """
        try:
            last_run = int(self.file_get_contents(self.last, '0'))
        except ValueError:
            # Unreadable marker: drop it so the next call starts from scratch.
            os.remove(self.last)
            return
        if last_run + 86400 < int(time.time()):
            self.straight_clear()
2023-03-15 19:40:39 +08:00
class PositionTab:
    """
    Symbolic directions used when switching between browser tabs.
    """
    Prev, Next = 'Go-Prev', 'Go-Next'
2023-03-16 00:17:21 +08:00
class ColorUtils:
    """
    Color helpers.
    """

    @staticmethod
    def hex2rgb(color):
        """
        Convert a hex color such as "#1A2B3C" into an ``(r, g, b)`` tuple.

        The first character is dropped without being checked. 3, 6 and 8
        hex digits are accepted; with 8 digits the last two are ignored.
        Raises Exception on a non-hex character or any other length.
        """
        digits = color[1:].upper()
        for char in digits:
            if char not in '0123456789ABCDEF':
                raise Exception('Found invalid hexa character {0}.'.format(char))
        if len(digits) == 3:
            # Short form: every digit is doubled.
            digits = ''.join(char * 2 for char in digits)
        elif len(digits) in (6, 8):
            digits = digits[0:6]
        else:
            raise Exception('Hexa string should be 3, 6 or 8 digits. if 8 digits, last 2 are ignored.')
        return tuple(int(digits[start:start + 2], 16) for start in (0, 2, 4))
2023-04-18 19:19:01 +08:00
class Browser(browser_webdriver):
2023-03-15 19:40:39 +08:00
"""
Browser web driver.
"""
def __init__(
        self,
        driver: str = None,
        binary: str = None,
        headless: bool = False,
        lang: str = None,
        mute: bool = False,
        no_images: bool = False,
        user_agent: str = None,
        http_proxy: str = None,
        home: str = None,
        window_size: str = None,
        mobile_emulation: BrowserMobileEmulation = None,
        option_arguments: list = None,
        req_interceptor=None,
        res_interceptor=None,
):
    """
    Configure the browser options and start the browser session.

    :param driver: driver path; overridden by SELENIUM_BROWSER_DRIVER when that is set.
    :param binary: browser path; overridden by SELENIUM_BROWSER_BINARY when that is set.
    :param headless: start headless (headless is always used on Linux).
    :param lang: browser language, applied only when given.
    :param mute: mute audio.
    :param no_images: disable image loading.
    :param user_agent: user agent override.
    :param http_proxy: proxy as "host:port".
    :param home: URL opened right after startup.
    :param window_size: "WxH" or "W,H"; without it the window is maximized.
    :param mobile_emulation: mobile emulation parameters.
    :param option_arguments: extra command line arguments for the browser.
    :param req_interceptor: assigned to ``request_interceptor``.
    :param res_interceptor: assigned to ``response_interceptor``.
    """
    self.is_linux = sys.platform.startswith('linux')
    # Periodic cleanup of driver caches before a new session starts.
    SeleniumClear().auto()
    # Environment settings take precedence over the arguments.
    driver = seleniumBrowserDriver or driver
    binary = seleniumBrowserBinary or binary
    browser_choose = seleniumBrowserChoose
    classes_driver = seleniumClassesDriver
    binary, driver = BrowserPathManager(browser_choose).main(binary, driver)
    # Default window size: always on Linux, elsewhere only when headless.
    if self.is_linux is bool(1) and not window_size: window_size = '1920x1080'
    if self.is_linux is bool(0) and headless and not window_size: window_size = '1920x1080'
    # Initialization settings.
    if (isinstance(option_arguments, list)) is bool(0): option_arguments = []
    # CDP commands collected here are replayed by update_cdp_command().
    cdplist = []
    service = Service()
    options = Options()
    self.cdplist = cdplist
    # Delete prompt information of chrome being controlled.
    exclude_switches = ['enable-logging', 'disable-translate']
    if browser_choose != 2:
        exclude_switches.append('enable-automation')
    if classes_driver != 2:
        hasattr(options, 'add_experimental_option') and options.add_experimental_option('excludeSwitches', exclude_switches)
    # Mobile emulation parameter setting start.
    if mobile_emulation:
        if hasattr(options, 'add_experimental_option') is False:
            raise Exception('Do not support mobile emulation currently.')
        # Outer window and inner viewport sizes derived from the emulated screen.
        self.w_browser = mobile_emulation.w + 14
        self.h_browser = mobile_emulation.h + 0
        self.w_inner_window = mobile_emulation.w + 0
        self.h_inner_window = mobile_emulation.h - 86
        self.mobile_emulation_screen_w = mobile_emulation.w
        self.mobile_emulation_screen_h = mobile_emulation.h
        # Overrides any window_size given by the caller.
        window_size = '%s,%s' % (self.w_browser, self.h_browser)
        options.add_experimental_option(
            'mobileEmulation', {
                'deviceMetrics': {
                    'width': self.w_inner_window,
                    'height': self.h_inner_window,
                    'pixelRatio': 2.75,
                    'touch': True
                },
                'userAgent': mobile_emulation.user_agent
            }
        )
        cdplist.append([
            'Emulation.setUserAgentOverride', {
                'userAgent': mobile_emulation.user_agent,
                'userAgentMetadata': {
                    'platform': 'Android' if mobile_emulation.user_agent.find('iPhone') == -1 else 'iPhone',
                    'mobile': True,
                    'platformVersion': '',
                    'architecture': '',
                    'model': ''
                }
            }
        ])
    else:
        self.w_browser = 0
        self.h_browser = 0
        self.w_inner_window = 0
        self.h_inner_window = 0
        self.mobile_emulation_screen_w = 0
        self.mobile_emulation_screen_h = 0
    # Mobile emulation parameter setting end.
    # Set browser and webdriver path.
    if driver:
        service.path = driver
    if binary:
        options.binary_location = binary
    # Add webdriver option arguments.
    for i in option_arguments:
        options.add_argument(i)
    # Set headless mode (Firefox, index 1, gets the plain "--headless" flag).
    if self.is_linux or headless:
        options.add_argument('--headless=new' if browser_choose != 1 else '--headless')
    # Set no-sandbox mode.
    if self.is_linux:
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
    if self.is_linux and browser_choose != 1:
        options.add_argument('--disable-gpu')
    # Set language of browser.
    # NOTE(review): the "or 'zh-CN'" fallbacks below never apply because this
    # branch only runs when lang is truthy - confirm the intended default.
    if lang:
        options.add_argument('--lang=%s' % (lang or 'zh-CN'))
        hasattr(options, 'set_preference') and options.set_preference('intl.accept_languages', lang or 'zh-CN')
    # Set mute.
    if mute:
        options.add_argument('--mute-audio=true')
        hasattr(options, 'set_preference') and print('Warning: Do not support mute audio currently.', file=sys.stderr)
    # Set no images mode.
    if no_images:
        options.add_argument('--blink-settings=imagesEnabled=false')
        hasattr(options, 'set_preference') and print('Warning: Do not support disable images currently.', file=sys.stderr)
    # Set default user agent.
    if user_agent:
        options.add_argument('--user-agent=%s' % user_agent)
        hasattr(options, 'set_preference') and options.set_preference('general.useragent.override', user_agent)
    # Set http proxy for browser.
    if http_proxy:
        options.add_argument('--proxy-server=http://%s' % http_proxy)
    # Set browser window size before startup.
    if window_size:
        options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ','))
    else:
        options.add_argument('--start-maximized')
    # Start the browser.
    # NOTE(review): version_main is hard-coded to 111 rather than taken from
    # the detected browser version - confirm this is intended.
    undetected_kwargs = {'driver_executable_path': driver, 'browser_executable_path': binary, 'version_main': 111} if classes_driver == 2 else {}
    super().__init__(service=service, options=options, **undetected_kwargs)
    # Selenium-Wire backend optimization start.
    try:
        self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
        self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
        self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
    except AttributeError:
        # No "backend" attribute: the driver is not a Selenium-Wire driver.
        pass
    # Selenium-Wire backend optimization end.
    if mobile_emulation:
        cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}])
        cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}])
        cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}])
    # Set the request and response interceptor (assigned even when the warning is printed).
    if req_interceptor:
        hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
        self.request_interceptor = req_interceptor
    if res_interceptor:
        hasattr(self, 'backend') or print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
        self.response_interceptor = res_interceptor
    # Sync set http proxy for Selenium-Wire backend.
    if http_proxy:
        self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy}
    # Set browser window size after startup, by default, there will be full screen display window.
    if window_size:
        self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(','))
    else:
        self.maximize_window()
    # Sets a sticky timeout to implicitly wait for an element to be found.
    self.implicitly_wait(10)
    # Set the amount of time to wait for a page load to complete.
    self.set_page_load_timeout(25)
    # Open the default page.
    home and self.open(home)
@staticmethod
def wait(secs: int | float = 1):
"""
Will sleep waiting.
"""
number_int = int(secs)
number_float = secs - number_int
for i in range(number_int):
time.sleep(1)
else:
time.sleep(number_float)
2023-04-18 19:19:01 +08:00
def quit(self):
    """
    Quit the browser session, ignoring any error raised while doing so.
    """
    try:
        super().quit()
    except Exception:
        # Shutdown is best effort; a failed quit is not reported.
        return None
2023-03-15 19:40:39 +08:00
def open(self, url=None):
    """
    Load the URL as if it was typed into the address bar (the new page has no Referrer).
    """
    # Replay the collected CDP commands before navigating.
    self.update_cdp_command()
    result = self.get(url)
    return result
def turn(self, url=None):
    """
    Navigate by assigning "window.location.href" (the new page has a Referrer).
    """
    # json.dumps turns the URL into a safely quoted JavaScript string literal.
    quoted_url = json.dumps(url, indent=None, ensure_ascii=True)
    script = 'window.location.href=%s;' % quoted_url
    return self.execute_script(script, None)
2023-05-07 13:39:25 +08:00
def find(self, path, wait_for=False, timeout: float = 5.0, freq: float = 0.5, delay: float = 0.0) -> WebElement:
    """
    Find one element by XPath and highlight it.

    With ``wait_for`` the lookup polls every ``freq`` seconds for up to
    ``timeout`` seconds until the element is present. With ``delay`` the
    method sleeps that long and then looks the element up again.
    """
    if wait_for:
        waiter = self.webdriver_wait(timeout, freq)
        element = waiter.until(EC.presence_of_element_located((By.XPATH, path)))
    else:
        element = self.find_element(By.XPATH, path)
    if delay:
        self.wait(delay)
        # Look the element up again after the delay.
        element = self.find_element(By.XPATH, path)
    self.element_appear(element, '#F8BE5F')
    return element
2023-03-15 19:40:39 +08:00
2023-05-07 13:39:25 +08:00
def find_mult(self, path) -> list:
    """
    Find every element matching the XPath and highlight each of them.
    """
    matches = self.find_elements(By.XPATH, path)
    for match_element in matches:
        self.element_appear(match_element, '#F8BE5F')
    return matches
def find_mult_random_choice(self, path) -> WebElement:
    """
    Find every element matching the XPath, pick one at random and highlight it.
    """
    candidates = self.find_elements(By.XPATH, path)
    chosen = random.choice(candidates)
    self.element_appear(chosen, '#F8BE5F')
    return chosen
2023-03-15 19:40:39 +08:00
def find_element_by(self, sentence):
    """
    Find one element with a custom locator given as a tuple or list, and highlight it.
    """
    found = self.find_element(*sentence)
    self.element_appear(found, '#F8BE5F')
    return found
2023-03-15 19:40:39 +08:00
def click(self, element):
    """
    Highlight the element, then click it.
    """
    highlight_color = '#FF0000'
    self.element_appear(element, highlight_color)
    element.click()
def click_simu(self, element):
    """
    Click the element through an action chain, showing a click effect first.
    """
    self.element_effect(element)
    self.action_chains().reset_actions()
    pointer = self.action_chains()
    pointer.click(element)
    pointer.perform()
    self.wait(0.1)
def touch(self, x, y):
    """
    Move the pointer by the offset (x, y) and click there.
    """
    self.action_chains().reset_actions()
    pointer = self.action_chains()
    pointer.move_by_offset(x, y)
    pointer.click()
    pointer.perform()
    self.wait(0.1)
def input(self, element, content):
    """
    Highlight the element and send the content to it as key strokes.
    """
    self.element_appear(element, '#00B6F1')
    self.action_chains().reset_actions()
    typist = self.action_chains()
    typist.send_keys_to_element(element, content)
    typist.perform()
    self.wait(0.1)
def mouse(self, element):
    """
    Highlight the element and move the mouse pointer onto it.
    """
    self.element_appear(element, '#49DC07')
    self.action_chains().reset_actions()
    pointer = self.action_chains()
    pointer.move_to_element(element)
    pointer.perform()
def tab_create(self, url=None):
    """
    Open a new tab, switch to it and load the URL when one is given.
    """
    self.switch_to.new_window('tab')
    # Replay the collected CDP commands in the new tab.
    self.update_cdp_command()
    if url:
        self.open(url)
def tab_switch(self, tab: int | str):
    """
    Switch the browser tab page.

    ``tab`` is either an index into the window handle list or one of
    ``PositionTab.Prev`` / ``PositionTab.Next``; both directions wrap
    around. Raises TypeError for any other value.
    """
    handles = self.window_handles
    lengths = len(handles)
    current = handles.index(self.current_window_handle)
    if isinstance(tab, int):
        handle = tab
    elif tab == PositionTab.Prev:
        # Index -1 wraps from the first tab to the last one.
        handle = (current - 1)
    elif tab == PositionTab.Next:
        # Compare by equality: "in" on a string is a substring test.
        handle = (current + 1) % lengths
    else:
        raise TypeError('Unsupported tab position %r' % (tab,))
    self.switch_to.window(handles[handle])
    self.wait(0.2)
    self.update_cdp_command()
2023-04-18 19:19:01 +08:00
def tab_switch_prev(self):
    """
    Switch to the previous tab.
    """
    direction = PositionTab.Prev
    self.tab_switch(direction)
def tab_switch_next(self):
    """
    Switch to the next tab.
    """
    direction = PositionTab.Next
    self.tab_switch(direction)
2023-03-15 19:40:39 +08:00
def tab_cancel(self):
    """
    Close the current browser tab page and focus a remaining one.

    The previous tab gets the focus; when the first tab was closed, the tab
    that followed it does.
    """
    handles = self.window_handles
    if len(handles):
        current = handles.index(self.current_window_handle)
        self.close()
        if current > 0:
            self.switch_to.window(handles[current - 1])
        elif len(handles) > 1:
            # The first tab was closed: move to the next one so the driver
            # is not left pointing at a closed window.
            self.switch_to.window(handles[1])
        self.wait(0.2)
def tab_cancel_all(self):
    """
    Close all the browser tab page.
    """
    # Focus each tab explicitly before closing it, so the result does not
    # depend on which tab was focused when the method was called.
    for handle in list(self.window_handles):
        self.switch_to.window(handle)
        self.close()
        self.wait(0.2)
def frame_switch_to(self, element_of_frame):
    """
    Enter the frame given by its frame element, then pause briefly.
    """
    target = self.switch_to
    target.frame(element_of_frame)
    self.wait(0.2)
def frame_switch_to_default(self):
    """
    Leave any frame and return to the top-level document, then pause briefly.
    """
    target = self.switch_to
    target.default_content()
    self.wait(0.2)
def scroll(self):
    """
    Scroll the page down by one viewport height.
    """
    self.action_chains().reset_actions()
    wheel = self.action_chains()
    viewport_height = self.execute_script('return document.documentElement.clientHeight;')
    wheel.scroll_by_amount(0, viewport_height)
    wheel.perform()
    self.wait(0.8)
def scroll_to(self, pos: int | str):
    """
    Scroll vertically to a position.

    A positive int scrolls to that offset, 0 scrolls to the top and -1 to
    the bottom of the page; any other value only waits.
    """
    script_call = None
    if isinstance(pos, int) and pos > 0:
        script_call = ('window.scrollTo(0, arguments[0]);', pos)
    elif pos == 0:
        script_call = ('window.scrollTo(0, 0);',)
    elif pos == 0 - 1:
        script_call = ('window.scrollTo(0, document.body.scrollHeight);',)
    if script_call is not None:
        self.execute_script(*script_call)
    self.wait(0.8)
def scroll_to_element(self, element):
    """
    Scroll until the given element is reached.
    """
    self.action_chains().reset_actions()
    wheel = self.action_chains()
    wheel.scroll_to_element(element)
    wheel.perform()
    self.wait(0.8)
def element_force_display(self, element):
    """
    Make hidden element visible and interactive.
    """
    # "hidden" is a boolean attribute: its presence hides the element
    # whatever its value, so it has to be removed rather than set to "false".
    self.execute_script(
        'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.removeAttribute("hidden");', element
    )
2023-05-07 13:39:25 +08:00
def element_appear(self, element=None, color='#ff0000', dura=2500):
    """
    Highlight the element with a border and a translucent background.

    The previous border and background are restored after ``dura``
    milliseconds. Returns False when no element is given.
    """
    if not element:
        return False
    red, green, blue = ColorUtils.hex2rgb(color)
    # The "prominent" flag keeps a second highlight from saving the
    # highlight colors as the values to restore.
    script = '''
let e=arguments[0];
try{
let o=[e.style.background||null,e.style.border||null];
e.style.border="1px solid %s";e.style.background="rgba(%s,%s,%s,0.2)";
if(!e.prominent){
e.prominent=true;
setTimeout(function(args){try{args[0].prominent=null;args[0].style.background=args[1][0];args[0].style.border=args[1][1]}catch(e){}},%s,[e,o]);
}
}catch(e){}
''' % (color, red, green, blue, dura)
    self.execute_script(script, element)
def element_effect(self, element=None, x: int = 0, y: int = 0):
    """
    Draw a short click animation on the page.

    The circle is centered on the element, or on the coordinate (x, y)
    when no element is given.
    """
    start_size = 0  # initial width/height of the circle in px
    peak_size = 30  # size above which the circle starts shrinking
    script = '''
let e=arguments[0];
let r;
let x;
let y;
if(e!==null){
r=e.getBoundingClientRect();
x=r.left+r.width/2+"px";
y=r.top+r.height/2+"px";
}
else{
x=arguments[1]+"px";
y=arguments[2]+"px";
}
let c=document.createElement("div");
c.style="width:%spx;height:%spx;border-radius:50%%;background-color:rgba(255,0,0,0.18);position:absolute;transform:translate(-50%%,-50%%);transition:opacity 0.5s;border:1px solid #ff3c3c;pointer-events:none";
c.style.zIndex=9999;
c.style.left=x;c.style.top=y;
document.body.appendChild(c);
setTimeout(function(){c.style.opacity=0;setTimeout(function(){document.body.removeChild(c)},999)},200);
let w=%s;
let h=%s;
let d=false;
let i=setInterval(function(){
if((w>%s||h>%s)||d){
d=true;
w-=2;
h-=2;
}
else{
w+=5;
h+=5;
}
c.style.width=w+"px";c.style.height=h+"px";
if((w<=12||h<=12)&&d){clearInterval(i)}
},20);
''' % (start_size, start_size, start_size, start_size, peak_size, peak_size)
    self.execute_script(script, element, x, y)
2023-03-15 19:40:39 +08:00
def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None):
    """
    Build a WebDriverWait bound to this browser with the given settings.
    """
    settings = {
        'driver': self,
        'timeout': timeout,
        'poll_frequency': poll_frequency,
        'ignored_exceptions': ignored_exceptions,
    }
    return WebDriverWait(**settings)
def current_alert(self):
    """
    Wrap this browser in an Alert object and return it.
    """
    alert = Alert(self)
    return alert
def window_inner_size(self):
    """
    Return the inner size of the page window as ``{'w': ..., 'h': ...}``.

    A missing value is reported as 0.
    """
    size = self.execute_script('return [window.innerWidth, window.innerHeight];')
    width = size[0] or 0
    height = size[1] or 0
    return {'w': width, 'h': height}
def action_chains(self):
    """
    Create a new ActionChains object bound to this browser.
    """
    chain = ActionChains(self)
    return chain
def screenshot(self) -> bytes:
    """
    Take a screenshot and return it as PNG bytes.
    """
    png_bytes = self.get_screenshot_as_png()
    return png_bytes
def update_cdp_command(self) -> None:
    """
    Execute every CDP command stored in ``self.cdplist``, in order.
    """
    for command in self.cdplist:
        # Each entry holds the arguments for execute_cdp_cmd.
        self.execute_cdp_cmd(*command)
class WebDriver(Browser):
    """
    Ready-to-use browser driver: a Browser started with the language "zh-CN".
    """

    def __init__(self):
        super(WebDriver, self).__init__(lang='zh-CN')
if __name__ == '__main__':
    # Smoke test: start a browser, load a page, wait one second and quit.
    demo_browser = WebDriver()
    demo_browser.open('https://www.hao123.com/')
    demo_browser.wait()
    demo_browser.quit()