1025 lines
42 KiB
Python
1025 lines
42 KiB
Python
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.common.alert import Alert
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
from selenium.webdriver.remote.webelement import WebElement
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.support.wait import WebDriverWait
|
|
import subprocess
|
|
import tempfile
|
|
import platform
|
|
import requests
|
|
import requests.adapters
|
|
import zipfile
|
|
import tarfile
|
|
import base64
|
|
import random
|
|
import shutil
|
|
import glob
|
|
import json
|
|
import time
|
|
import sys
|
|
import os
|
|
import re
|
|
|
|
# Runtime configuration comes from environment variables; an unset or empty
# variable falls back to the default on the right-hand side.
seleniumBrowserChoose = os.environ.get('SELENIUM_BROWSER_CHOOSE') or 'Chrome'
seleniumClassesDriver = os.environ.get('SELENIUM_CLASSES_DRIVER') or '0'
seleniumBrowserDriver = os.environ.get('SELENIUM_BROWSER_DRIVER') or ''
seleniumBrowserBinary = os.environ.get('SELENIUM_BROWSER_BINARY') or ''

# Turn the browser name into its numeric id: 0 Chrome, 1 Firefox, 2 Edge.
try:
    seleniumBrowserChoose = ('Chrome', 'Firefox', 'Edge').index(seleniumBrowserChoose.capitalize())
except ValueError:
    raise SystemError('Not supported browser "%s"' % seleniumBrowserChoose)

# Turn the driver class selector into its numeric id:
#   0: default,
#   1: selenium-wire,
#   2: selenium-wire with undetected driver(only chrome).
try:
    seleniumClassesDriver = ('0', '1', '2').index(seleniumClassesDriver)
except ValueError:
    raise SystemError('Not supported classes "%s"' % seleniumClassesDriver)
|
|
|
|
# Bind browser_webdriver / Options / Service for the configured combination.
#   seleniumBrowserChoose: 0 Chrome, 1 Firefox, 2 Edge
#   seleniumClassesDriver: 0 selenium, 1 selenium-wire, 2 selenium-wire + undetected driver
if seleniumBrowserChoose == 0:
    if seleniumClassesDriver == 0:
        from selenium.webdriver import Chrome as browser_webdriver
        from selenium.webdriver.chrome.options import Options
        from selenium.webdriver.chrome.service import Service
    elif seleniumClassesDriver == 1:
        from seleniumwire.webdriver import Chrome as browser_webdriver
        from selenium.webdriver.chrome.options import Options
        from selenium.webdriver.chrome.service import Service
    elif seleniumClassesDriver == 2:
        from seleniumwire.undetected_chromedriver import Chrome as browser_webdriver
        from seleniumwire.undetected_chromedriver import ChromeOptions as Options
        from selenium.webdriver.chrome.service import Service
elif seleniumBrowserChoose == 1:
    if seleniumClassesDriver == 0:
        from selenium.webdriver import Firefox as browser_webdriver
        from selenium.webdriver.firefox.options import Options
        from selenium.webdriver.firefox.service import Service
    elif seleniumClassesDriver == 1:
        from seleniumwire.webdriver import Firefox as browser_webdriver
        from selenium.webdriver.firefox.options import Options
        from selenium.webdriver.firefox.service import Service
    elif seleniumClassesDriver == 2:
        # The undetected driver flavour is only wired up for Chrome above.
        raise SystemError('No support this classes')
elif seleniumBrowserChoose == 2:
    if seleniumClassesDriver == 0:
        from selenium.webdriver import Edge as browser_webdriver
        from selenium.webdriver.edge.options import Options
        from selenium.webdriver.edge.service import Service
    elif seleniumClassesDriver == 1:
        from seleniumwire.webdriver import Edge as browser_webdriver
        from selenium.webdriver.edge.options import Options
        from selenium.webdriver.edge.service import Service
    elif seleniumClassesDriver == 2:
        # The undetected driver flavour is only wired up for Chrome above.
        raise SystemError('No support this classes')
|
|
|
|
|
|
class BrowserMobileEmulation(dict):
    """
    Mobile emulation parameters.

    A dict holding the keys ``w``, ``h`` and ``user_agent``. The values can be
    read as items or as attributes; a missing key or attribute reads as None.
    Attribute assignment is ignored.
    """

    def __init__(self, w=540, h=960, user_agent=None):
        # Base64 text of the default user agent; b64decode skips the
        # whitespace that the multi-line literal carries.
        encoded_default = bytes('''
            TW96aWxsYS81LjAgKExpbnV4OyBVOyBBbmRyb2lkIDEzOyB6aC1jbjsgMjEwOTEx
            OUJDIEJ1aWxkL1RLUTEuMjIwODI5LjAwMikgQXBwbGVXZWJLaXQvNTM3LjM2IChL
            SFRNTCwgbGlrZSBHZWNrbykgVmVyc2lvbi80LjAgQ2hyb21lLzk4LjAuNDc1OC4x
            MDIgTVFRQnJvd3Nlci8xMy42IE1vYmlsZSBTYWZhcmkvNTM3LjM2
        ''', encoding='utf-8')
        if not user_agent:
            user_agent = base64.b64decode(encoded_default).decode()
        super().__init__(w=w, h=h, user_agent=user_agent)
        # Swallowed by __setattr__ below, so this stores nothing; presumably
        # kept so tooling knows the attribute names.
        self.w = self.h = self.user_agent = None

    def __setattr__(self, key, value):
        # Attribute writes are deliberately dropped.
        pass

    def __getitem__(self, item):
        # Missing keys read as None instead of raising KeyError.
        return self.get(item)

    def __getattr__(self, item):
        # Only called when normal attribute lookup fails: serve the dict entry.
        return self.get(item)
|
|
|
|
|
|
class CustomHTTPAdapter(requests.adapters.HTTPAdapter):
    """
    Transport adapter that resolves host names through an HTTP DNS service.

    Before a request is sent its host is looked up with ``resolve_host``. When
    an address comes back, the request URL is rewritten to that address while
    the ``Host`` header and the TLS host-name settings keep the original
    domain. ``build_response`` maps the address in the response URL back to
    the domain. When no address is returned the request is sent unchanged.
    """

    def __init__(self, *args, **kwargs):
        from urllib.parse import urlparse
        self.urlparse = urlparse
        # domain -> resolved address, and address -> domain for build_response.
        self.hosts = {}
        self.addrs = {}
        super().__init__(*args, **kwargs)

    @staticmethod
    def resolve_host(host, timeout=5):
        """
        Ask the HTTP DNS endpoint for the address of ``host``.

        :param host: Host name to resolve.
        :param timeout: Seconds to wait for the lookup before giving up.
        :return: The first address of the answer, or None when the lookup
            fails or the answer is not an IP address.
        """
        import ipaddress
        try:
            answer = requests.get(
                'http://119.29.29.29/d?dn=%s&ip=208.67.222.222' % host,
                timeout=timeout
            ).text
        except requests.exceptions.RequestException:
            return None
        first = answer.replace(',', ';').split(';')[0].strip()
        try:
            # Anything that is not an address (empty body, error page) must
            # not be spliced into the request URL.
            ipaddress.ip_address(first)
        except ValueError:
            return None
        return first

    def _split_netloc(self, url):
        """Return ``(scheme, domain, port_suffix)``; port_suffix is '' or ':<port>'."""
        parts = self.urlparse(url)
        pieces = parts.netloc.split(':')
        port_suffix = ':%s' % pieces[1] if len(pieces) > 1 else ''
        return parts.scheme, pieces[0], port_suffix

    def send(self, request, **kwargs):
        """
        Send ``request``, pointing it at the resolved address when one is found.

        :param request: The prepared request; its URL and headers are modified in place.
        :return: Whatever the parent adapter's ``send`` returns.
        """
        req = request
        connection_pool_kwargs = self.poolmanager.connection_pool_kw
        scheme, domain, addition_port = self._split_netloc(req.url)
        ip_address = self.resolve_host(domain)
        pin_tls_name = False
        if ip_address:
            self.hosts[domain] = ip_address
            self.addrs[ip_address] = domain
            # Only the authority part is rewritten; a matching '://host/' text
            # later in the URL (e.g. inside the query) stays untouched.
            req.url = req.url.replace(
                '://%s%s/' % (domain, addition_port),
                '://%s%s/' % (ip_address, addition_port),
                1
            )
            pin_tls_name = scheme == 'https'
            req.headers['Host'] = '%s%s' % (domain, addition_port)
        if pin_tls_name:
            # The connection targets the address, so verification and SNI are
            # told which name to use.
            connection_pool_kwargs['assert_hostname'] = domain
            connection_pool_kwargs['server_hostname'] = domain
        else:
            # These kwargs are shared by every pool of this adapter; a name
            # left over from an earlier request must not leak into this one.
            connection_pool_kwargs.pop('assert_hostname', None)
            connection_pool_kwargs.pop('server_hostname', None)
        return super().send(req, **kwargs)

    def build_response(self, *args, **kwargs):
        """Build the response and put the original domain back into its URL."""
        res = super().build_response(*args, **kwargs)
        _, address, addition_port = self._split_netloc(res.url)
        if address in self.addrs:
            res.url = res.url.replace(
                '://%s%s/' % (address, addition_port),
                '://%s%s/' % (self.addrs[address], addition_port),
                1
            )
        return res
|
|
|
|
|
|
class BrowserPathManager:
    """
    Locate the browser executable and a matching WebDriver binary.

    ``main`` is the entry point: it finds the browser, reads its version,
    reuses a driver already present in ``webdriver_install_location`` and
    otherwise downloads one.

    Browser ids: 0 Chrome, 1 Firefox, 2 Edge.
    """

    def __init__(self, browser: int):
        if browser not in (0, 1, 2):
            raise Exception('Not supported browser.')
        self.browser = browser
        # Drivers are looked up in and installed to this directory.
        self.webdriver_install_location = tempfile.gettempdir()

    @staticmethod
    def _version_at_least(main_version, minimum: int) -> bool:
        """Tell whether ``main_version`` is a number >= ``minimum``; non-numeric input is False."""
        try:
            return int(main_version) >= minimum
        except (TypeError, ValueError):
            return False

    @staticmethod
    def _driver_suffix() -> str:
        """Return the executable suffix of driver files on this system."""
        return '.exe' if platform.system().lower() == 'windows' else ''

    @staticmethod
    def _platform_assets(windows, linux, mac_intel, mac_arm):
        """Pick the (archive member, archive name) pairs for the running platform."""
        plat = sys.platform
        if plat == 'win32':
            return list(windows)
        if plat == 'linux':
            return list(linux)
        if plat == 'darwin':
            return list(mac_arm if platform.machine().startswith('arm') else mac_intel)
        return []

    def _install_driver(self, package: str, member: str, final_name: str):
        """Extract ``member`` from the archive ``package`` into the install directory as ``final_name``."""
        install_dir = self.webdriver_install_location
        try:
            # The archive is closed before it is deleted; an open handle makes
            # the removal fail on Windows.
            if package.endswith('.zip'):
                with zipfile.ZipFile(package) as archive:
                    archive.extract(member, install_dir)
            else:
                with tarfile.open(package, "r:gz") as archive:
                    archive.extract(member, install_dir)
        finally:
            os.remove(package)
        extracted = os.path.join(install_dir, member)
        target = os.path.join(install_dir, final_name)
        if target != extracted:
            # os.replace overwrites a driver left by an earlier download.
            os.replace(extracted, target)
        if not final_name.lower().endswith('.exe'):
            os.chmod(target, 0o777)
        return target.replace("\\", '/')

    @staticmethod
    def resolve_browser_version(file: str):
        """
        Read the version of the browser executable ``file``.

        :return: ``(file, main_version, full_version)``; both versions are ''
            when the version cannot be determined.
        :raises Exception: When ``file`` does not exist.
        """
        if not os.path.exists(file):
            raise Exception('The executable file does not exist in %s' % file)
        is_windows_binary = file.lower().endswith('.exe')
        try:
            if is_windows_binary:
                # The version is read from the file's metadata via PowerShell.
                full_version = subprocess.run(
                    ['powershell', '(Get-Item -Path "%s").VersionInfo.ProductVersion' % file],
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.DEVNULL,
                    timeout=5
                ).stdout.decode('utf-8').strip()
            else:
                # An argument list (no shell) keeps paths with spaces intact,
                # e.g. ".../MacOS/Google Chrome".
                output = subprocess.run(
                    [file, '--version'],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.DEVNULL,
                    timeout=5
                ).stdout.decode('utf-8').strip()
                # The last version-like token of the output is taken.
                full_version = re.findall('[0-9]+[.\\d+]+', output)[-1]
        except (OSError, subprocess.SubprocessError, ValueError, IndexError):
            full_version = ''
        main_version = full_version.split('.')[0]
        return file, main_version, full_version

    @staticmethod
    def open_remote_resources(url: str, save_file: str = None, auto_redirects=False, retries=3):
        """
        Fetch ``url`` through a session that uses ``CustomHTTPAdapter``.

        :param url: Resource to fetch.
        :param save_file: When given, the body is streamed into this file.
        :param auto_redirects: Whether redirects are followed.
        :param retries: Extra attempts after a connection failure or timeout.
        :return: With ``save_file``: True on success, False otherwise.
            Without it: the response text, or '' on failure.
        """
        failure = False if save_file else ''
        attempts = max(retries, 0) + 1
        with requests.Session() as http:
            for scheme in ('http://', 'https://'):
                http.mount(scheme, CustomHTTPAdapter())
            for attempt in range(attempts):
                try:
                    with http.get(
                        url,
                        headers={
                            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
                        },
                        allow_redirects=auto_redirects,
                        stream=(save_file is not None),
                        # (connect, read) seconds, so a dead server cannot hang the caller.
                        timeout=(10, 30)
                    ) as response:
                        if 200 != response.status_code:
                            return failure
                        if not save_file:
                            return response.text
                        with open(save_file, 'wb') as filestream:
                            for chunk in response.iter_content(chunk_size=8192):
                                filestream.write(chunk)
                        return True
                except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                    # Pause before the next attempt, but not after the last one.
                    if attempt + 1 < attempts:
                        time.sleep(0.75 + round(random.random(), 2))
        # Every attempt failed: report it in the same shape as a non-200 answer.
        return failure

    def find_binary(self):
        """
        Search the usual install locations for the browser executable.

        :return: The first existing candidate path, or None.
        """
        def windows_dirs(*variables):
            # Unset variables are skipped instead of producing a root-relative path.
            found = (os.environ.get(name, '').replace("\\", '/') for name in variables)
            return [path for path in found if path]

        plat = sys.platform
        find_list = []
        if self.browser == 0:
            if plat == 'win32':
                find_list = [
                    '%s/Google/Chrome/Application/chrome.exe' % d
                    for d in windows_dirs('PROGRAMFILES', 'PROGRAMFILES(X86)', 'LOCALAPPDATA', 'PROGRAMW6432')
                ]
            elif plat == 'linux':
                # /usr/bin/google-chrome is the launcher itself, not a directory.
                find_list = ['/opt/google/chrome/chrome', '/usr/bin/google-chrome']
            elif plat == 'darwin':
                find_list = ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome']
        elif self.browser == 1:
            if plat == 'win32':
                find_list = [
                    '%s/Mozilla Firefox/firefox.exe' % d
                    for d in windows_dirs('PROGRAMFILES', 'PROGRAMFILES(X86)')
                ]
            elif plat == 'linux':
                find_list = ['/usr/bin/firefox']
            elif plat == 'darwin':
                find_list = ['/Applications/Firefox.app/Contents/MacOS/firefox-bin']
        elif self.browser == 2:
            if plat == 'win32':
                find_list = [
                    '%s/Microsoft/Edge/Application/msedge.exe' % d
                    for d in windows_dirs('PROGRAMFILES', 'PROGRAMFILES(X86)')
                ]
            elif plat == 'linux':
                find_list = ['/opt/microsoft/msedge/msedge']
            elif plat == 'darwin':
                find_list = ['/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge']
        return next((path for path in find_list if os.path.isfile(path)), None)

    def find_driver(self, main_version: str, full_version: str):
        """
        Look for an already installed driver that fits the browser version.

        :return: The driver path with forward slashes, or None when there is
            none or the browser version is unsupported or unknown.
        """
        suffix = self._driver_suffix()
        if self.browser == 0:
            if not self._version_at_least(main_version, 70):
                return None
            filename = 'chromedriver_%s%s' % (str(main_version), suffix)
        elif self.browser == 1:
            filename = 'geckodriver%s' % suffix
        else:
            if not self._version_at_least(main_version, 79):
                return None
            filename = 'msedgedriver_%s%s' % (str(full_version), suffix)
        location = '%s%s%s' % (self.webdriver_install_location, os.sep, filename)
        return location.replace("\\", '/') if os.path.exists(location) else None

    def pull_driver(self, main_version: str, full_version: str):
        """
        Download and install the driver for the browser version.

        :return: The installed driver path with forward slashes, or None when
            the version is unsupported or unknown, or the download fails.
        """
        distdir = self.webdriver_install_location
        if self.browser == 0:
            if not self._version_at_least(main_version, 70):
                return None
            # NOTE(review): this endpoint is believed to stop at Chrome 114;
            # newer browsers may need another source - confirm.
            chromedriver_site = 'https://chromedriver.storage.googleapis.com'
            latest_release = self.open_remote_resources('%s/LATEST_RELEASE_%s' % (chromedriver_site, main_version))
            if not latest_release:
                return None
            latest_release = latest_release.strip()
            match_assets = self._platform_assets(
                windows=[('chromedriver.exe', 'chromedriver_win32.zip')],
                linux=[('chromedriver', 'chromedriver_linux64.zip')],
                mac_intel=[('chromedriver', 'chromedriver_mac64.zip')],
                # Two archive names are tried in turn on ARM Macs.
                mac_arm=[('chromedriver', 'chromedriver_mac_arm64.zip'), ('chromedriver', 'chromedriver_mac64_m1.zip')]
            )
            package = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'chromedriver.zip')
            for member, archive in match_assets:
                res_url = '%s/%s/%s' % (chromedriver_site, latest_release, archive)
                print('Downloading version %s chromedriver %s to %s...' % (latest_release, res_url, distdir), file=sys.stderr)
                if self.open_remote_resources(res_url, package):
                    # The file name carries the major version so find_driver can match it.
                    return self._install_driver(package, member, member.replace('chromedriver', 'chromedriver_%s' % main_version))
            return None
        if self.browser == 1:
            site = 'https://github.com/mozilla/geckodriver/releases'
            geckodriver_version = '0.33.0'
            match_assets = self._platform_assets(
                windows=[('geckodriver.exe', 'geckodriver-v%s-%s.%s' % (geckodriver_version, 'win32', 'zip'))],
                linux=[('geckodriver', 'geckodriver-v%s-%s.%s' % (geckodriver_version, 'linux64', 'tar.gz'))],
                mac_intel=[('geckodriver', 'geckodriver-v%s-%s.%s' % (geckodriver_version, 'macos', 'tar.gz'))],
                mac_arm=[('geckodriver', 'geckodriver-v%s-%s.%s' % (geckodriver_version, 'macos-aarch64', 'tar.gz'))]
            )
            for member, archive in match_assets:
                package = '%s%s%s' % (tempfile.gettempdir(), os.sep, archive)
                res_url = '%s/download/v%s/%s' % (site, geckodriver_version, archive)
                print('Downloading geckodriver v%s %s to %s...' % (geckodriver_version, res_url, distdir), file=sys.stderr)
                if self.open_remote_resources(res_url, package, auto_redirects=True):
                    return self._install_driver(package, member, member)
            return None
        if not self._version_at_least(main_version, 79):
            return None
        msedgedriver_site = 'https://msedgedriver.azureedge.net'
        latest_release = full_version
        match_assets = self._platform_assets(
            windows=[('msedgedriver.exe', 'edgedriver_win32.zip')],
            linux=[('msedgedriver', 'edgedriver_linux64.zip')],
            mac_intel=[('msedgedriver', 'edgedriver_mac64.zip')],
            mac_arm=[('msedgedriver', 'edgedriver_mac64_m1.zip')]
        )
        package = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'msedgedriver.zip')
        for member, archive in match_assets:
            res_url = '%s/%s/%s' % (msedgedriver_site, latest_release, archive)
            print('Downloading version %s msedgedriver %s to %s...' % (latest_release, res_url, distdir), file=sys.stderr)
            if self.open_remote_resources(res_url, package):
                # The file name carries the full version so find_driver can match it.
                return self._install_driver(package, member, member.replace('msedgedriver', 'msedgedriver_%s' % full_version))
        return None

    def main(self, binary: str = None, driver: str = None):
        """
        Resolve the browser executable and the driver to use.

        :param binary: Browser executable; searched for when omitted.
        :param driver: Driver executable; found or downloaded when omitted.
        :return: ``(binary, driver)``.
        :raises Exception: When the browser or the driver cannot be provided.
        """
        binary = binary if binary else self.find_binary()
        if not binary:
            raise Exception('No browser executable file is found on your system, please confirm whether it has been installed')
        if not os.path.exists(binary):
            raise Exception('The executable file does not exist in %s' % binary)
        _, binary_main_version, binary_full_version = self.resolve_browser_version(binary)
        # The version only matters when a Chrome/Edge driver has to be chosen;
        # the geckodriver lookup does not use it.
        if not driver and self.browser != 1 and not binary_main_version:
            raise Exception('Failure to get the browser version number failed in %s' % binary)
        driver = driver if driver else self.find_driver(binary_main_version, binary_full_version)
        driver = driver if driver else self.pull_driver(binary_main_version, binary_full_version)
        if not driver:
            raise Exception('Not specified the driver path, and try the automatic download failure')
        if not os.path.exists(driver):
            raise Exception('The driver does not exist in %s' % driver)
        return binary, driver
|
|
|
|
|
|
class SeleniumClear:
    """
    Best-effort cleanup of Selenium and browser driver leftovers.

    ``auto`` runs the cleanup when the time stamp stored in ``self.last`` is
    more than 86400 seconds old or missing.
    """

    def __init__(self):
        # File that stores the time stamp of the last cleanup.
        self.last = '%s/.selenium_clear_last' % tempfile.gettempdir()

    @staticmethod
    def clear_selenium():
        """Remove the ``.cache/selenium`` directory of the user's home; only acts on Windows."""
        if platform.uname().system.lower() != 'windows':
            return
        home_drive = os.environ.get('HOMEDRIVE')
        home_path = os.environ.get('HOMEPATH')
        if home_drive and home_path:
            try:
                shutil.rmtree('%s%s/.cache/selenium' % (home_drive, home_path))
            except OSError:
                # Missing or locked cache: cleanup is optional, carry on.
                pass

    @staticmethod
    def clear_driver_cache():
        """Remove the temporary directories matching the driver cache patterns."""
        for cache in ['scoped_dir*', 'chrome_BITS*', 'chrome_url_fetcher*']:
            for i in glob.glob('%s/%s' % (tempfile.gettempdir(), cache)):
                try:
                    shutil.rmtree(i)
                except OSError:
                    # OSError covers missing, locked and non-directory entries
                    # on every platform; the WindowsError name only exists on
                    # Windows.
                    pass

    @staticmethod
    def file_get_contents(file, text=None):
        """Return the content of ``file``, or ``text`` when the file does not exist."""
        if not os.path.exists(file):
            return text
        with open(file=file, mode='r', encoding='utf-8') as stream:
            return stream.read()

    @staticmethod
    def file_put_contents(file, text=None):
        """Write ``text`` to ``file`` and return the number of characters written."""
        with open(file=file, mode='w', encoding='utf-8') as stream:
            return stream.write(text)

    def straight_clear(self):
        """Run both cleanups now and record the current time."""
        self.clear_selenium()
        self.clear_driver_cache()
        self.file_put_contents(self.last, str(int(time.time())))

    def auto(self):
        """Run the cleanup when the last recorded one is older than a day."""
        try:
            if int(self.file_get_contents(self.last, '0')) + 86400 < int(time.time()):
                self.straight_clear()
        except ValueError:
            # Unreadable time stamp: drop the file so the next call starts fresh.
            os.remove(self.last)
|
|
|
|
|
|
class PositionTab:
    """
    Symbolic positions for switching tabs.
    """

    # Marker for the previous tab.
    Prev = 'Go-Prev'

    # Marker for the next tab.
    Next = 'Go-Next'
|
|
|
|
|
|
class ColorUtils:
    """
    Color utils.
    """

    @staticmethod
    def hex2rgb(color):
        """
        Convert a hexadecimal color string to an ``(r, g, b)`` tuple.

        :param color: 3, 6 or 8 hexadecimal digits with an optional leading
            ``#``; of 8 digits the last two are ignored.
        :return: Tuple of three integers in the range 0-255.
        :raises ValueError: On a non-hexadecimal character or a wrong length.
        """
        # Only a real '#' prefix is removed, so 'FF0000' works like '#FF0000'.
        digits = color.removeprefix('#').upper()
        for x in digits:
            if x not in '0123456789ABCDEF':
                raise ValueError('Found invalid hexa character {0}.'.format(x))
        if len(digits) == 3:
            # Short form: every digit stands for a doubled digit.
            digits = ''.join(x * 2 for x in digits)
        elif len(digits) not in (6, 8):
            raise ValueError('Hexa string should be 3, 6 or 8 digits. if 8 digits, last 2 are ignored.')
        return int(digits[0:2], 16), int(digits[2:4], 16), int(digits[4:6], 16)
|
|
|
|
|
|
class Browser(browser_webdriver):
    """
    Browser web driver.

    Extends the driver class selected at import time with start-up
    configuration (headless mode, language, proxy, window size, mobile
    emulation, interceptors) and helpers for finding elements, input, tabs,
    frames and scrolling.
    """

    def __init__(
        self,
        driver: str = None,
        binary: str = None,
        headless: bool = False,
        lang: str = None,
        mute: bool = False,
        no_images: bool = False,
        user_agent: str = None,
        http_proxy: str = None,
        home: str = None,
        window_size: str = None,
        mobile_emulation: BrowserMobileEmulation = None,
        option_arguments: list = None,
        req_interceptor=None,
        res_interceptor=None,
    ):
        """
        Configure and start the browser.

        :param driver: Driver executable; the environment setting wins when present.
        :param binary: Browser executable; the environment setting wins when present.
        :param headless: Run without a visible window (always on under Linux).
        :param lang: Browser language.
        :param mute: Mute audio.
        :param no_images: Do not load images.
        :param user_agent: User agent override.
        :param http_proxy: Proxy as ``host:port``.
        :param home: URL opened right after start-up.
        :param window_size: ``WxH`` or ``W,H``; the window is maximized when omitted.
        :param mobile_emulation: Mobile emulation parameters.
        :param option_arguments: Extra command line arguments for the browser.
        :param req_interceptor: Request interceptor (needs Selenium-Wire).
        :param res_interceptor: Response interceptor (needs Selenium-Wire).
        """
        self.is_linux = sys.platform.startswith('linux')
        SeleniumClear().auto()
        driver = seleniumBrowserDriver or driver
        binary = seleniumBrowserBinary or binary
        browser_choose = seleniumBrowserChoose
        classes_driver = seleniumClassesDriver
        binary, driver = BrowserPathManager(browser_choose).main(binary, driver)
        # Linux always runs headless (see below); headless runs get a fixed
        # window size unless one was requested.
        if not window_size and (self.is_linux or headless):
            window_size = '1920x1080'
        # Initialization settings.
        if not isinstance(option_arguments, list):
            option_arguments = []
        cdplist = []
        service = Service()
        options = Options()
        # CDP commands replayed by update_cdp_command().
        self.cdplist = cdplist
        # Delete prompt information of chrome being controlled.
        if classes_driver != 2 and hasattr(options, 'add_experimental_option'):
            options.add_experimental_option("excludeSwitches", ['enable-automation', 'enable-logging'])
        # Mobile emulation parameter setting start.
        if mobile_emulation:
            if not hasattr(options, 'add_experimental_option'):
                raise Exception('Do not support mobile emulation currently.')
            self.w_browser = mobile_emulation.w + 14
            self.h_browser = mobile_emulation.h + 0
            self.w_inner_window = mobile_emulation.w + 0
            self.h_inner_window = mobile_emulation.h - 86
            self.mobile_emulation_screen_w = mobile_emulation.w
            self.mobile_emulation_screen_h = mobile_emulation.h
            window_size = '%s,%s' % (self.w_browser, self.h_browser)
            options.add_experimental_option(
                'mobileEmulation', {
                    'deviceMetrics': {
                        'width': self.w_inner_window,
                        'height': self.h_inner_window,
                        'pixelRatio': 2.75,
                        'touch': True
                    },
                    'userAgent': mobile_emulation.user_agent
                }
            )
            cdplist.append([
                'Emulation.setUserAgentOverride', {
                    'userAgent': mobile_emulation.user_agent,
                    'userAgentMetadata': {
                        'platform': 'iPhone' if 'iPhone' in mobile_emulation.user_agent else 'Android',
                        'mobile': True,
                        'platformVersion': '',
                        'architecture': '',
                        'model': ''
                    }
                }
            ])
        else:
            self.w_browser = 0
            self.h_browser = 0
            self.w_inner_window = 0
            self.h_inner_window = 0
            self.mobile_emulation_screen_w = 0
            self.mobile_emulation_screen_h = 0
        # Mobile emulation parameter setting end.
        # Set browser and webdriver path.
        if driver:
            service.path = driver
        if binary:
            options.binary_location = binary
        # Add webdriver option arguments.
        for i in option_arguments:
            options.add_argument(i)
        # Set headless mode.
        if self.is_linux or headless:
            options.add_argument('--headless=new')
        # Set no-sandbox mode.
        if self.is_linux:
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
            options.add_argument('--disable-gpu')
        # Set language of browser; nothing is set when no language is given.
        if lang:
            options.add_argument('--lang=%s' % lang)
            if hasattr(options, 'set_preference'):
                options.set_preference('intl.accept_languages', lang)
        # Set mute.
        if mute:
            options.add_argument('--mute-audio=true')
            if hasattr(options, 'set_preference'):
                print('Warning: Do not support mute audio currently.', file=sys.stderr)
        # Set no images mode.
        if no_images:
            options.add_argument('--blink-settings=imagesEnabled=false')
            if hasattr(options, 'set_preference'):
                print('Warning: Do not support disable images currently.', file=sys.stderr)
        # Set default user agent.
        if user_agent:
            options.add_argument('--user-agent=%s' % user_agent)
            if hasattr(options, 'set_preference'):
                options.set_preference('general.useragent.override', user_agent)
        # Set http proxy for browser.
        if http_proxy:
            options.add_argument('--proxy-server=http://%s' % http_proxy)
        # Set browser window size before startup.
        if window_size:
            options.add_argument('--window-size=%s' % window_size.replace("\x20", '').replace('x', ','))
        else:
            options.add_argument('--start-maximized')
        # Start the browser.
        # NOTE(review): version_main is fixed at 111 regardless of the
        # installed browser - confirm whether it should follow the detected version.
        undetected_kwargs = {'driver_executable_path': driver, 'browser_executable_path': binary, 'version_main': 111} if classes_driver == 2 else {}
        super().__init__(service=service, options=options, **undetected_kwargs)
        # Selenium-Wire backend optimization start.
        try:
            self.backend.master.options.add_option('ssl_insecure', bool, True, 'Do not verify upstream server SSL/TLS certificates.')
            self.backend.master.options.add_option('upstream_cert', bool, False, 'Connect to upstream server to look up certificate details.')
            self.backend.master.options.add_option('http2', bool, False, 'Enable/disable HTTP/2 support.')
        except AttributeError:
            # No backend attribute: the driver class is not a Selenium-Wire one.
            pass
        # Selenium-Wire backend optimization end.
        if mobile_emulation:
            cdplist.append(['Emulation.setFocusEmulationEnabled', {'enabled': True}])
            cdplist.append(['Emulation.setTouchEmulationEnabled', {'enabled': True, 'maxTouchPoints': 5}])
            cdplist.append(['Emulation.setEmitTouchEventsForMouse', {'enabled': True, 'configuration': 'mobile'}])
        # Set the request and response interceptor.
        if req_interceptor:
            if not hasattr(self, 'backend'):
                print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
            self.request_interceptor = req_interceptor
        if res_interceptor:
            if not hasattr(self, 'backend'):
                print('Warning: Can not use the interceptor, because not extends Seleniun-Wire.', file=sys.stderr)
            self.response_interceptor = res_interceptor
        # Sync set http proxy for Selenium-Wire backend.
        if http_proxy:
            self.proxy = {'http': 'http://%s' % http_proxy, 'https': 'https://%s' % http_proxy}
        # Set browser window size after startup, by default, there will be full screen display window.
        if window_size:
            self.set_window_size(*window_size.replace("\x20", '').replace('x', ',').split(','))
        else:
            self.maximize_window()
        # Sets a sticky timeout to implicitly wait for an element to be found.
        self.implicitly_wait(10)
        # Set the amount of time to wait for a page load to complete.
        self.set_page_load_timeout(25)
        # Open the default page.
        if home:
            self.open(home)

    @staticmethod
    def wait(secs: int | float = 1):
        """
        Will sleep waiting.

        Sleeps in one-second steps followed by the fractional rest; values
        that are not positive return immediately.
        """
        number_int = int(secs)
        number_float = secs - number_int
        for _ in range(number_int):
            time.sleep(1)
        # time.sleep rejects negative values, so the rest is only slept when positive.
        if number_float > 0:
            time.sleep(number_float)

    def quit(self):
        """
        Quit the browser; errors raised while quitting are ignored.
        """
        try:
            super().quit()
        except Exception:
            # Deliberate best effort: quitting must not fail the caller.
            pass

    def open(self, url=None):
        """
        Open the URL, simulate into the URL in the address bar and jump, the new page has no Referrer.
        """
        self.update_cdp_command()
        return self.get(url)

    def turn(self, url=None):
        """
        Simulation "window.location.href" jumps, the new page has Referrer.
        """
        # json.dumps yields a safely quoted JavaScript string literal.
        return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None)

    def find(self, path, wait_for=False, timeout: float = 5.0, freq: float = 0.5, delay: float = 0.0) -> WebElement:
        """
        Use XPath to find an element.

        :param path: XPath expression.
        :param wait_for: Wait until the element is present instead of looking once.
        :param timeout: Maximum seconds to wait when ``wait_for`` is set.
        :param freq: Polling interval in seconds when ``wait_for`` is set.
        :param delay: Seconds to pause after the element was found; the
            element is then looked up again.
        :return: The element, highlighted on the page.
        """
        if wait_for:
            element = self.webdriver_wait(timeout, freq).until(EC.presence_of_element_located((By.XPATH, path)))
        else:
            element = self.find_element(By.XPATH, path)
        if delay:
            self.wait(delay)
            element = self.find_element(By.XPATH, path)
        self.element_appear(element, '#F8BE5F')
        return element

    def find_mult(self, path) -> list:
        """
        Use XPath to find elements.
        """
        element = self.find_elements(By.XPATH, path)
        for this_element in element:
            self.element_appear(this_element, '#F8BE5F')
        return element

    def find_mult_random_choice(self, path) -> WebElement:
        """
        Use XPath to find elements then random_choice one.

        Raises IndexError when nothing matches (random.choice on an empty list).
        """
        element = self.find_elements(By.XPATH, path)
        element = random.choice(element)
        self.element_appear(element, '#F8BE5F')
        return element

    def find_element_by(self, sentence):
        """
        Custom find element, pass into a tuple or list.
        """
        element = self.find_element(*sentence)
        self.element_appear(element, '#F8BE5F')
        return element

    def click(self, element):
        """
        Click element.
        """
        self.element_appear(element, '#FF0000')
        element.click()

    def click_simu(self, element):
        """
        Click element for simulate.
        """
        self.element_effect(element)
        self.action_chains().reset_actions()
        self.action_chains().click(element).perform()
        self.wait(0.1)

    def touch(self, x, y):
        """
        Click on the coordinate.
        """
        self.action_chains().reset_actions()
        self.action_chains().move_by_offset(x, y).click().perform()
        self.wait(0.1)

    def input(self, element, content):
        """
        Enter the content to the element.
        """
        self.element_appear(element, '#00B6F1')
        self.action_chains().reset_actions()
        self.action_chains().send_keys_to_element(element, content).perform()
        self.wait(0.1)

    def mouse(self, element):
        """
        Park the mouse here.
        """
        self.element_appear(element, '#49DC07')
        self.action_chains().reset_actions()
        self.action_chains().move_to_element(element).perform()

    def tab_create(self, url=None):
        """
        Create a new tab and open the URL.
        """
        self.switch_to.new_window('tab')
        self.update_cdp_command()
        if url:
            self.open(url)

    def tab_switch(self, tab: int | str):
        """
        Switch the browser tab page

        :param tab: Index into the window handles, or ``PositionTab.Prev`` /
            ``PositionTab.Next``; both positions wrap around.
        :raises ValueError: When ``tab`` is neither an index nor a known position.
        """
        handles = self.window_handles
        lengths = len(handles)
        current = handles.index(self.current_window_handle)
        if isinstance(tab, int):
            handle = tab
        elif tab == PositionTab.Prev:
            handle = (current - 1) % lengths
        elif tab == PositionTab.Next:
            handle = (current + 1) % lengths
        else:
            raise ValueError('Not supported tab position "%s"' % tab)
        self.switch_to.window(handles[handle])
        self.wait(0.2)
        self.update_cdp_command()

    def tab_switch_prev(self):
        """
        Switch to the previous tab.
        """
        self.tab_switch(PositionTab.Prev)

    def tab_switch_next(self):
        """
        Switch to the next tab.
        """
        self.tab_switch(PositionTab.Next)

    def tab_cancel(self):
        """
        Close the current browser tab page.

        Afterwards the previous tab becomes current, or the new first tab
        when the first one was closed.
        """
        handles = self.window_handles
        if handles:
            current = handles.index(self.current_window_handle)
            self.close()
            remaining = handles[:current] + handles[current + 1:]
            if remaining:
                # Always leave the driver on a live window, otherwise the next
                # command fails on the closed one.
                self.switch_to.window(remaining[current - 1] if current > 0 else remaining[0])
            self.wait(0.2)

    def tab_cancel_all(self):
        """
        Close all the browser tab page.
        """
        handles = self.window_handles
        for _ in handles:
            self.tab_cancel()

    def frame_switch_to(self, element_of_frame):
        """
        Switch frame to the specified frame element.
        """
        self.switch_to.frame(element_of_frame)
        self.wait(0.2)

    def frame_switch_to_default(self):
        """
        Switch to the default frame.
        """
        self.switch_to.default_content()
        self.wait(0.2)

    def scroll(self):
        """
        Scroll page down by the height of the viewport.
        """
        self.action_chains().reset_actions()
        self.action_chains().scroll_by_amount(0, self.execute_script('return document.documentElement.clientHeight;')).perform()
        self.wait(0.8)

    def scroll_to(self, pos: int | str):
        """
        Scroll to the specified location.

        :param pos: Positive offset from the top, 0 for the top, -1 for the
            bottom; any other value only waits.
        """
        if isinstance(pos, int) and pos > 0:
            self.execute_script('window.scrollTo(0, arguments[0]);', pos)
        elif pos == 0:
            self.execute_script('window.scrollTo(0, 0);')
        elif pos == -1:
            self.execute_script('window.scrollTo(0, document.body.scrollHeight);')
        self.wait(0.8)

    def scroll_to_element(self, element):
        """
        Scroll to the specified element location.
        """
        self.action_chains().reset_actions()
        self.action_chains().scroll_to_element(element).perform()
        self.wait(0.8)

    def element_force_display(self, element):
        """
        Make hidden element visible and interactive.
        """
        self.execute_script(
            'let e=arguments[0];e.style.display="inline-block";e.style.visibility="visible";e.setAttribute("hidden","false");', element
        )

    def element_appear(self, element=None, color='#ff0000', dura=2500):
        """
        Make the element highlight.

        :param element: Element to highlight; nothing happens without one.
        :param color: Hexadecimal color of border and background tint.
        :param dura: Milliseconds until the previous style is restored.
        :return: False when no element was given, otherwise None.
        """
        if not element:
            return False
        r, g, b = ColorUtils.hex2rgb(color)
        self.execute_script('''
            let e=arguments[0];
            try{
                let o=[e.style.background||null,e.style.border||null];
                e.style.border="1px solid %s";e.style.background="rgba(%s,%s,%s,0.2)";
                if(!e.prominent){
                    e.prominent=true;
                    setTimeout(function(args){try{args[0].prominent=null;args[0].style.background=args[1][0];args[0].style.border=args[1][1]}catch(e){}},%s,[e,o]);
                }
            }catch(e){}
            ''' % (color, r, g, b, dura), element
        )

    def element_effect(self, element=None, x: int = 0, y: int = 0):
        """
        Make a coordinate click effect.

        The effect is centred on ``element`` when one is given, otherwise on
        the coordinate ``x``, ``y``.
        """
        self.execute_script('''
            let e=arguments[0];
            let r;
            let x;
            let y;
            if(e!==null){
                r=e.getBoundingClientRect();
                x=r.left+r.width/2+"px";
                y=r.top+r.height/2+"px";
            }
            else{
                x=arguments[1]+"px";
                y=arguments[2]+"px";
            }
            let c=document.createElement("div");
            c.style="width:%spx;height:%spx;border-radius:50%%;background-color:rgba(255,0,0,0.18);position:absolute;transform:translate(-50%%,-50%%);transition:opacity 0.5s;border:1px solid #ff3c3c;pointer-events:none";
            c.style.zIndex=9999;
            c.style.left=x;c.style.top=y;
            document.body.appendChild(c);
            setTimeout(function(){c.style.opacity=0;setTimeout(function(){document.body.removeChild(c)},999)},200);
            let w=%s;
            let h=%s;
            let d=false;
            let i=setInterval(function(){
                if((w>%s||h>%s)||d){
                    d=true;
                    w-=2;
                    h-=2;
                }
                else{
                    w+=5;
                    h+=5;
                }
                c.style.width=w+"px";c.style.height=h+"px";
                if((w<=12||h<=12)&&d){clearInterval(i)}
            },20);
            ''' % (0, 0, 0, 0, 30, 30), element, x, y
        )

    def webdriver_wait(self, timeout: float, poll_frequency: float = 0.5, ignored_exceptions=None):
        """
        Return WebDriverWait object.
        """
        return WebDriverWait(
            driver=self,
            timeout=timeout,
            poll_frequency=poll_frequency,
            ignored_exceptions=ignored_exceptions
        )

    def current_alert(self):
        """
        Return current alert object.
        """
        return Alert(self)

    def window_inner_size(self):
        """
        Get the page window inner size.

        :return: Dict with the keys ``w`` and ``h``.
        """
        size = self.execute_script('return [window.innerWidth, window.innerHeight];')
        return {'w': size[0] or 0, 'h': size[1] or 0}

    def action_chains(self):
        """
        Return ActionChains object.
        """
        return ActionChains(self)

    def screenshot(self) -> bytes:
        """
        Screenshot as bytes.
        """
        return self.get_screenshot_as_png()

    def update_cdp_command(self) -> None:
        """
        Run every CDP command collected in ``self.cdplist``.
        """
        for cmd in self.cdplist:
            self.execute_cdp_cmd(*cmd)
|
|
|
|
|
|
class WebDriver(Browser):
    """
    Ready-to-use browser driver: a Browser started with the language zh-CN.
    """

    def __init__(self):
        # Every other Browser option keeps its default value.
        default_language = 'zh-CN'
        super().__init__(lang=default_language)
|
|
|
|
|
|
if __name__ == '__main__':
    # e.g. Test it can work normally.
    this_driver = WebDriver()
    try:
        this_driver.open('https://www.hao123.com/')
        this_driver.wait()
    finally:
        # The browser is closed even when opening the page fails.
        this_driver.quit()
|