This commit is contained in:
zhaoyafan 2023-05-06 15:31:34 +08:00
parent b68faa3da5
commit abed800fb0
1 changed files with 283 additions and 115 deletions

View File

@ -8,8 +8,11 @@ import subprocess
import tempfile import tempfile
import platform import platform
import requests import requests
import requests.adapters
import zipfile import zipfile
import tarfile
import base64 import base64
import random
import shutil import shutil
import glob import glob
import json import json
@ -130,19 +133,70 @@ class BrowserMobileEmulation(dict):
return None return None
class CustomHTTPAdapter(requests.adapters.HTTPAdapter):
    """
    Transport adapter that resolves host names through an HTTP DNS endpoint.

    Before a request is sent, the host part of the URL is looked up via
    ``resolve_host``. When an address is returned, the URL is rewritten to
    target that address while the original name is kept in the ``Host``
    header and, for HTTPS, in the TLS hostname settings of the pool manager.
    ``build_response`` maps the address in the response URL back to the name.
    """

    def __init__(self, *args, **kwargs):
        from urllib.parse import urlparse
        self.urlparse = urlparse
        # host name -> resolved address, and the reverse mapping.
        self.hosts = {}
        self.addrs = {}
        super().__init__(*args, **kwargs)

    @staticmethod
    def resolve_host(host, timeout=5):
        """
        Look up ``host`` through the HTTP DNS endpoint.

        :param host: Host name to resolve.
        :param timeout: Seconds to wait for the lookup; without a limit a
            stalled lookup would block every request going through the adapter.
        :return: The first IPv4 address found in the answer, or ``None`` when
            the lookup fails, is not answered with status 200, or contains no
            IPv4 address.
        """
        import ipaddress
        try:
            answer = requests.get(
                'http://119.29.29.29/d?dn=%s&ip=208.67.222.222' % host,
                timeout=timeout
            )
        except requests.exceptions.RequestException:
            return None
        if 200 != answer.status_code:
            return None
        for candidate in answer.text.replace(',', ';').split(';'):
            candidate = candidate.strip()
            try:
                # Only plain IPv4 literals can be substituted into the URL as-is.
                ipaddress.IPv4Address(candidate)
            except ValueError:
                continue
            return candidate
        return None

    @staticmethod
    def _split_netloc(netloc):
        """Split ``netloc`` into ``(host, ':port' or '')``."""
        pieces = netloc.split(':')
        return pieces[0], (':%s' % pieces[1] if len(pieces) > 1 else '')

    def send(self, request, **kwargs):
        """
        Send ``request``, connecting to the resolved address when one is found.

        :param request: The prepared request; its ``url`` and ``Host`` header
            are modified in place when the host could be resolved.
        :return: Whatever ``HTTPAdapter.send`` returns.
        """
        pool_kwargs = self.poolmanager.connection_pool_kw
        parsed = self.urlparse(request.url)
        domain, port_suffix = self._split_netloc(parsed.netloc)
        ip_address = self.resolve_host(domain)
        if ip_address:
            self.hosts[domain] = ip_address
            self.addrs[ip_address] = domain
            # Replace only the authority part, not a copy of it inside the query.
            request.url = request.url.replace(
                '://%s%s/' % (domain, port_suffix),
                '://%s%s/' % (ip_address, port_suffix),
                1
            )
            request.headers['Host'] = '%s%s' % (domain, port_suffix)
        if ip_address and parsed.scheme == 'https':
            pool_kwargs['assert_hostname'] = domain
            pool_kwargs['server_hostname'] = domain
        else:
            # These kwargs are shared by every request of this adapter; drop
            # the values left behind by an earlier request to another host.
            pool_kwargs.pop('assert_hostname', None)
            pool_kwargs.pop('server_hostname', None)
        return super().send(request, **kwargs)

    def build_response(self, *args, **kwargs):
        """
        Build the response and restore the host name in its URL.

        :return: The response from ``HTTPAdapter.build_response`` whose ``url``
            shows the original host name when the address is a known one.
        """
        res = super().build_response(*args, **kwargs)
        address, port_suffix = self._split_netloc(self.urlparse(res.url).netloc)
        if address in self.addrs:
            res.url = res.url.replace(
                '://%s%s/' % (address, port_suffix),
                '://%s%s/' % (self.addrs[address], port_suffix),
                1
            )
        return res
class BrowserPathManager:
def __init__(self, browser: int):
if browser not in (0, 1, 2): raise Exception('Not supported browser.')
self.browser = browser
self.webdriver_install_location = tempfile.gettempdir()
@staticmethod
def resolve_browser_version(file: str):
if not os.path.exists(file):
raise Exception('The executable file does not exist in %s' % file)
if file.lower().endswith('.exe'):
try: try:
full_version = subprocess.run( full_version = subprocess.run(
['powershell', '(Get-Item -Path "%s").VersionInfo.ProductVersion' % chrome], ['powershell', '(Get-Item -Path "%s").VersionInfo.ProductVersion' % file],
shell=True, shell=True,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
@ -157,131 +211,249 @@ class ChromePathManager:
else: else:
try: try:
full_version = subprocess.run( full_version = subprocess.run(
'%s --version' % chrome, '%s --version' % file,
shell=True, shell=True,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
timeout=5 timeout=5
).stdout.decode('utf-8').strip() ).stdout.decode('utf-8').strip()
full_version = re.findall('[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+', full_version)[0] full_version = re.findall('[0-9]+[.\\d+]+', full_version)[-1]
except Exception: except Exception:
full_version = '' full_version = ''
try: try:
main_version = full_version.split('.')[0] main_version = full_version.split('.')[0]
except Exception: except Exception:
main_version = '' main_version = ''
return chrome, main_version, full_version return file, main_version, full_version
@staticmethod
def open_remote_resources(url: str, save_file: str = None, auto_redirects=False, retries=3):
    """
    Fetch ``url`` and either return its text or store it in ``save_file``.

    :param url: Resource to request.
    :param save_file: When given, the body is streamed into this file.
    :param auto_redirects: Passed to ``requests`` as ``allow_redirects``.
    :param retries: Number of extra attempts after a connection error;
        values below 1 mean a single attempt.
    :return: With ``save_file``: ``True`` once the file is written, ``False``
        when the status is not 200. Without it: the response text, or ``''``
        when the status is not 200. ``None`` when every attempt ended in a
        connection error.
    """
    attempts = (retries if retries > 0 else 0) + 1
    # The session is closed on exit so its pooled connections are released.
    with requests.Session() as http:
        for scheme in ['http://', 'https://']:
            http.mount(scheme, CustomHTTPAdapter())
        for attempt in range(attempts):
            try:
                with http.get(
                    url,
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
                    },
                    allow_redirects=auto_redirects,
                    stream=(save_file is not None)
                ) as response:
                    succeeded = 200 == response.status_code
                    if not save_file:
                        return response.text if succeeded else ''
                    if not succeeded:
                        return bool(0)
                    with open(save_file, 'wb') as filestream:
                        for chunk in response.iter_content(chunk_size=8192):
                            filestream.write(chunk)
                    return bool(1)
            except requests.exceptions.ConnectionError:
                # Back off with a little jitter, but only if another attempt follows.
                if attempt + 1 < attempts:
                    time.sleep(0.75 + round(random.random(), 2))
    return None
def find_chrome(self): def find_binary(self):
plat = sys.platform plat = sys.platform
find_list = [] find_list = []
chrome = ''
plats = ['win32', 'linux', 'darwin'] plats = ['win32', 'linux', 'darwin']
if plat == plats[0]: match self.browser:
for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)', 'LOCALAPPDATA', 'PROGRAMW6432']: case 0:
find_list.append('%s/Google/Chrome/Application/chrome.exe' % os.environ.get(e, '').replace("\\", '/')) if plat == plats[0]:
if plat == plats[1]: for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)', 'LOCALAPPDATA', 'PROGRAMW6432']:
for p in ['/opt/google/chrome', '/usr/bin/google-chrome']: find_list.append('%s/Google/Chrome/Application/chrome.exe' % os.environ.get(e, '').replace("\\", '/'))
find_list.append('%s/chrome' % p) if plat == plats[1]:
if plat == plats[2]: for p in ['/opt/google/chrome', '/usr/bin/google-chrome']:
for p in ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome']: find_list.append('%s/chrome' % p)
find_list.append('%s/chrome' % p) if plat == plats[2]:
for p in ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome']:
find_list.append('%s/chrome' % p)
case 1:
if plat == plats[0]:
for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)']:
find_list.append('%s/Mozilla Firefox/firefox.exe' % os.environ.get(e, '').replace("\\", '/'))
if plat == plats[1]:
for p in ['/usr/bin']:
find_list.append('%s/firefox' % p)
if plat == plats[2]:
for p in ['/Applications/Firefox.app/Contents/MacOS']:
find_list.append('%s/firefox-bin' % p)
case 2:
if plat == plats[0]:
for e in ['PROGRAMFILES', 'PROGRAMFILES(X86)']:
find_list.append('%s/Microsoft/Edge/Application/msedge.exe' % os.environ.get(e, '').replace("\\", '/'))
if plat == plats[1]:
for p in ['/opt/microsoft/msedge']:
find_list.append('%s/msedge' % p)
if plat == plats[2]:
for p in ['/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge']:
find_list.append('%s/msedge' % p)
for execute_file in find_list: for execute_file in find_list:
try: try:
if os.path.exists(execute_file): if os.path.exists(execute_file):
chrome = execute_file return execute_file
break
except Exception: except Exception:
pass pass
return chrome if self.resolve_chrome_binary_version(chrome) else None
def find_chromedriver(self, main_version: str | int): def find_driver(self, main_version: str, full_version: str):
if not int(main_version) >= 70: location = None
return None match self.browser:
location = '%s%s%s' % ( case 0:
self.chromedriver_install_location, if not int(main_version) >= 70:
os.sep, return None
'chromedriver_%s%s' % ( location = '%s%s%s' % (
str(main_version), self.webdriver_install_location,
'.exe' if platform.system().lower() == 'windows' else '' os.sep,
) 'chromedriver_%s%s' % (
) str(main_version),
'.exe' if platform.system().lower() == 'windows' else ''
)
)
case 1:
location = '%s%s%s' % (
self.webdriver_install_location,
os.sep,
'%s%s' % (
'geckodriver',
'.exe' if platform.system().lower() == 'windows' else ''
)
)
case 2:
if not int(main_version) >= 79:
return None
location = '%s%s%s' % (
self.webdriver_install_location,
os.sep,
'msedgedriver_%s%s' % (
str(full_version),
'.exe' if platform.system().lower() == 'windows' else ''
)
)
return location.replace("\\", '/') if os.path.exists(location) else None return location.replace("\\", '/') if os.path.exists(location) else None
def pull_driver(self, main_version: str, full_version: str):
    """
    Download the driver for the selected browser and install it.

    The archive for the current platform is fetched with
    ``open_remote_resources``, the driver binary is extracted into
    ``self.webdriver_install_location``, made executable on non-Windows
    systems, and the downloaded archive is removed afterwards.

    * Chrome (0): release resolved from ``LATEST_RELEASE_<main_version>``;
      installed as ``chromedriver_<main_version>``. Requires version >= 70.
    * Firefox (1): geckodriver 0.33.0 from the GitHub releases page;
      installed as ``geckodriver``.
    * Edge (2): release equal to ``full_version``; installed as
      ``msedgedriver_<full_version>``. Requires version >= 79.

    :param main_version: Major browser version.
    :param full_version: Full browser version.
    :return: Path of the installed driver with forward slashes, or ``None``
        when the version is unsupported or not a number, the platform is not
        covered, or nothing could be downloaded.
    """
    install_dir = self.webdriver_install_location
    plat = sys.platform
    arm_mac = plat == 'darwin' and platform.machine().startswith('arm')
    member_suffix = '.exe' if plat == 'win32' else ''

    def version_at_least(minimum):
        # A failed version detection yields '' upstream; do not crash on it.
        try:
            return int(main_version) >= minimum
        except (TypeError, ValueError):
            return False

    def tails_for_platform(win, linux, mac_intel, mac_arm):
        # Archive name tails to try, in order, for the current platform.
        if plat == 'win32':
            return win
        if plat == 'linux':
            return linux
        if plat == 'darwin':
            return mac_arm if arm_mac else mac_intel
        return []

    def install(res_url, package, member, final_name, **download_options):
        # Download one archive and install ``member`` from it as ``final_name``.
        if not self.open_remote_resources(res_url, package, **download_options):
            return None
        if package.endswith('.zip'):
            archive = zipfile.ZipFile(package)
        else:
            archive = tarfile.open(package, "r:gz")
        # Close the archive before touching the package file again.
        with archive:
            # TarFile.extract returns None, so fall back to the expected path.
            extracted = archive.extract(member, install_dir) or '%s%s%s' % (install_dir, os.sep, member)
        final_path = '%s%s%s' % (os.path.dirname(extracted), os.sep, final_name)
        if final_name != member:
            if os.path.exists(final_path):
                os.remove(final_path)
            os.rename(extracted, final_path)
        if not member.lower().endswith('.exe'):
            os.chmod(final_path, 0o777)
        os.remove(package)
        return final_path.replace("\\", '/')

    if self.browser == 0:
        if not version_at_least(70):
            return None
        chromedriver_site = 'https://chromedriver.storage.googleapis.com'
        latest_release = self.open_remote_resources('%s/LATEST_RELEASE_%s' % (chromedriver_site, main_version))
        # None (connection failure) and '' (non-200) both mean "no release known".
        latest_release = (latest_release or '').strip()
        if not latest_release:
            return None
        member = 'chromedriver%s' % member_suffix
        package = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'chromedriver.zip')
        for tail in tails_for_platform(['win32'], ['linux64'], ['mac64'], ['mac_arm64', 'mac64_m1']):
            res_url = '%s/%s/%s' % (chromedriver_site, latest_release, 'chromedriver_%s.zip' % tail)
            print('Downloading version %s chromedriver %s to %s...' % (latest_release, res_url, install_dir), file=sys.stderr)
            installed = install(res_url, package, member, member.replace('chromedriver', 'chromedriver_%s' % main_version))
            if installed:
                return installed
        return None

    if self.browser == 1:
        site = 'https://github.com/mozilla/geckodriver/releases'
        geckodriver_version = '0.33.0'
        member = 'geckodriver%s' % member_suffix
        for tail in tails_for_platform(['win32.zip'], ['linux64.tar.gz'], ['macos.tar.gz'], ['macos-aarch64.tar.gz']):
            asset = 'geckodriver-v%s-%s' % (geckodriver_version, tail)
            package = '%s%s%s' % (tempfile.gettempdir(), os.sep, asset)
            res_url = '%s/download/v%s/%s' % (site, geckodriver_version, asset)
            print('Downloading geckodriver v%s %s to %s...' % (geckodriver_version, res_url, install_dir), file=sys.stderr)
            installed = install(res_url, package, member, member, auto_redirects=True)
            if installed:
                return installed
        return None

    if self.browser == 2:
        if not version_at_least(79):
            return None
        msedgedriver_site = 'https://msedgedriver.azureedge.net'
        latest_release = full_version
        member = 'msedgedriver%s' % member_suffix
        package = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'msedgedriver.zip')
        for tail in tails_for_platform(['win32'], ['linux64'], ['mac64'], ['mac64_m1']):
            res_url = '%s/%s/%s' % (msedgedriver_site, latest_release, 'edgedriver_%s.zip' % tail)
            print('Downloading version %s msedgedriver %s to %s...' % (latest_release, res_url, install_dir), file=sys.stderr)
            installed = install(res_url, package, member, member.replace('msedgedriver', 'msedgedriver_%s' % full_version))
            if installed:
                return installed
        return None

    return None
def main(self, chrome: str = None, chromedriver: str = None): def main(self, binary: str = None, driver: str = None):
chrome = chrome if chrome else self.find_chrome() binary = binary if binary else self.find_binary()
if not chrome: if not binary:
raise Exception('No chrome executable file is found on your system, please confirm whether it has been installed') raise Exception('No browser executable file is found on your system, please confirm whether it has been installed')
if not os.path.exists(chrome): if not os.path.exists(binary):
raise Exception('Chrome executable file does not exist in %s' % chrome) raise Exception('The executable file does not exist in %s' % binary)
version = self.resolve_chrome_binary_version(chrome) version = self.resolve_browser_version(binary)
if not version: if not version:
raise Exception('Failure to get the local chrome version number failed in %s' % chrome) raise Exception('Failure to get the browser version number failed in %s' % binary)
i_chrome = chrome i_binary = binary
chrome_main_version = version[1] binary_main_version = version[1]
chromedriver = chromedriver if chromedriver else self.find_chromedriver(chrome_main_version) binary_full_version = version[2]
chromedriver = chromedriver if chromedriver else self.pull_chromedriver(chrome_main_version) driver = driver if driver else self.find_driver(binary_main_version, binary_full_version)
if not chromedriver: driver = driver if driver else self.pull_driver(binary_main_version, binary_full_version)
raise Exception('Not specified the chrome driver path, and try the automatic download failure') if not driver:
if not os.path.exists(chromedriver): raise Exception('Not specified the driver path, and try the automatic download failure')
raise Exception('Chrome driver does not exist in %s' % chromedriver) if not os.path.exists(driver):
i_chromedriver = chromedriver raise Exception('The driver does not exist in %s' % driver)
return i_chrome, i_chromedriver i_driver = driver
return i_binary, i_driver
class SeleniumClear: class SeleniumClear:
@ -385,13 +557,7 @@ class Browser(browser_webdriver):
binary = seleniumBrowserBinary or binary binary = seleniumBrowserBinary or binary
browser_choose = seleniumBrowserChoose browser_choose = seleniumBrowserChoose
classes_driver = seleniumClassesDriver classes_driver = seleniumClassesDriver
match browser_choose: binary, driver = BrowserPathManager(browser_choose).main(binary, driver)
case 0:
binary, driver = ChromePathManager().main(chrome=binary, chromedriver=driver)
case _:
"""
Others browser.
"""
if self.is_linux is bool(1) and not window_size: window_size = '1920x1080' if self.is_linux is bool(1) and not window_size: window_size = '1920x1080'
if self.is_linux is bool(0) and headless and not window_size: window_size = '1920x1080' if self.is_linux is bool(0) and headless and not window_size: window_size = '1920x1080'
# Initialization settings. # Initialization settings.
@ -553,11 +719,13 @@ class Browser(browser_webdriver):
""" """
return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None) return self.execute_script('window.location.href=%s;' % json.dumps(url, indent=None, ensure_ascii=True), None)
def find(self, path, wait_for=False, timeout: float = 5.0, freq: float = 0.5, delay: float = 0.0):
    """
    Use XPath to find an element.

    :param path: XPath expression of the element.
    :param wait_for: When true, wait for the element to be present instead
        of looking it up once.
    :param timeout: Passed to ``webdriver_wait`` together with ``freq``
        when ``wait_for`` is set.
    :param freq: See ``timeout``.
    :param delay: When non-zero, ``self.wait(delay)`` is called after the
        first lookup and the element is then looked up again.
    :return: The located element, after it has been highlighted.
    """
    if wait_for:
        waiter = self.webdriver_wait(timeout, freq)
        element = waiter.until(EC.presence_of_element_located((By.XPATH, path)))
    else:
        element = self.find_element(By.XPATH, path)
    if delay:
        self.wait(delay)
        # Look the element up again after the delay.
        element = self.find_element(By.XPATH, path)
    self.element_prominent(element, '#f8be5f')
    return element