class CustomHTTPAdapter(requests.adapters.HTTPAdapter):
    """
    Transport adapter that resolves host names through an HTTP lookup endpoint
    instead of the system resolver, then connects to the returned address.

    The request URL is rewritten to the resolved IPv4 address while the original
    host name is kept in the ``Host`` header and, for HTTPS, in the TLS
    ``server_hostname`` / ``assert_hostname`` pool options.  ``build_response``
    maps the address in the response URL back to the original host name.
    """

    def __init__(self, *args, **kwargs):
        from urllib.parse import urlparse
        self.urlparse = urlparse
        # host name -> resolved address, filled in by send().
        self.hosts = {}
        # resolved address -> host name, read by build_response().
        self.addrs = {}
        super().__init__(*args, **kwargs)

    @staticmethod
    def _split_netloc(netloc):
        """
        Split ``host[:port]`` into ``(host, ':port' or '')``.
        NOTE(review): user-info and bracketed IPv6 literals are not handled.
        """
        host, colon, port = netloc.partition(':')
        return host, ('%s%s' % (colon, port) if colon else '')

    @staticmethod
    def resolve_host(host, timeout: float = 5.0):
        """
        Ask the HTTP lookup endpoint for the address of *host*.

        :param host: Host name to resolve.
        :param timeout: Seconds to wait for the lookup before giving up.
        :return: The first IPv4 address found in the answer, or None when the
                 lookup fails or the answer contains no valid address.
        """
        import ipaddress
        # NOTE(review): the lookup travels over plain HTTP, so the answer can be
        # tampered with on the wire; HTTPS certificate checks still apply to the
        # final connection, plain-HTTP downloads have no such protection.
        try:
            answer = requests.get(
                'http://119.29.29.29/d?dn=%s&ip=208.67.222.222' % host,
                timeout=timeout
            ).text
        except requests.exceptions.RequestException:
            return None
        for candidate in answer.replace(',', ';').split(';'):
            candidate = candidate.strip()
            try:
                # Only a literal IPv4 address may be spliced into the URL; an empty
                # answer or an error page must not replace the host name.
                ipaddress.IPv4Address(candidate)
            except ValueError:
                continue
            return candidate
        return None

    def send(self, request, **kwargs):
        """
        Resolve the request host, point the URL at the resolved address and send it.

        When resolution fails the request is sent unchanged.
        """
        pool_kwargs = self.poolmanager.connection_pool_kw
        parsed = self.urlparse(request.url)
        domain, port_suffix = self._split_netloc(parsed.netloc)
        # The pool options are shared by every request of this adapter: drop what a
        # previous request pinned so an un-rewritten request is not verified against
        # another host's name.
        pool_kwargs.pop('assert_hostname', None)
        pool_kwargs.pop('server_hostname', None)
        ip_address = self.resolve_host(domain)
        if ip_address:
            self.hosts[domain] = ip_address
            self.addrs[ip_address] = domain
            # count=1: only the authority part is rewritten, never a look-alike
            # substring inside the path or query string.
            request.url = request.url.replace(
                '://%s%s/' % (domain, port_suffix),
                '://%s%s/' % (ip_address, port_suffix),
                1
            )
            if parsed.scheme == 'https':
                # Keep SNI and certificate matching bound to the real host name.
                pool_kwargs['assert_hostname'] = domain
                pool_kwargs['server_hostname'] = domain
            request.headers['Host'] = '%s%s' % (domain, port_suffix)
        return super().send(request, **kwargs)

    def build_response(self, *args, **kwargs):
        """
        Build the response and restore the original host name in ``response.url``.
        """
        res = super().build_response(*args, **kwargs)
        address, port_suffix = self._split_netloc(self.urlparse(res.url).netloc)
        if address in self.addrs:
            res.url = res.url.replace(
                '://%s%s/' % (address, port_suffix),
                '://%s%s/' % (self.addrs[address], port_suffix),
                1
            )
        return res
def open_remote_resources(url: str, save_file: str = None, auto_redirects=False, retries=3, timeout: float = 30.0):
    """
    Fetch a remote resource, either as text or streamed into a local file.

    :param url: Address of the resource.
    :param save_file: When given, the body is written to this path instead of being returned.
    :param auto_redirects: Whether redirects are followed.
    :param retries: Number of additional attempts after a connection failure (values below 0 count as 0).
    :param timeout: Seconds to wait for the server before an attempt counts as failed.
    :return: With ``save_file``: True when the file was written, False on a non-200 status.
             Without ``save_file``: the response text, or '' on a non-200 status.
             None when every attempt failed to connect.
    """
    attempts = max(retries, 0) + 1
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
    }
    # The session owns pooled connections; the context manager releases them on every exit path.
    with requests.Session() as http:
        for scheme in ('http://', 'https://'):
            http.mount(scheme, CustomHTTPAdapter())
        for attempt in range(attempts):
            try:
                with http.get(
                    url,
                    headers=headers,
                    allow_redirects=auto_redirects,
                    stream=(save_file is not None),
                    timeout=timeout
                ) as response:
                    if save_file:
                        if response.status_code != 200:
                            return False
                        with open(save_file, 'wb') as filestream:
                            for chunk in response.iter_content(chunk_size=8192):
                                filestream.write(chunk)
                        return True
                    return response.text if response.status_code == 200 else ''
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                # Back off with a little jitter, but only when another attempt follows.
                if attempt + 1 < attempts:
                    time.sleep(0.75 + round(random.random(), 2))
    return None
def find_driver(self, main_version: str, full_version: str):
    """
    Look for an already installed driver binary that matches the selected browser.

    The file is searched in ``self.webdriver_install_location`` under the name
    ``chromedriver_<main_version>``, ``geckodriver`` or ``msedgedriver_<full_version>``
    (with ``.exe`` appended on Windows).

    :param main_version: Major version of the browser, e.g. '112'.
    :param full_version: Full version of the browser, e.g. '112.0.1722.48'.
    :return: The driver path with forward slashes, or None when the browser id is
             unknown, the version is unsupported or unreadable, or no such file exists.
    """
    # browser id -> (file stem, minimum supported major version, version tag used in the file name)
    catalog = {
        0: ('chromedriver', 70, main_version),
        1: ('geckodriver', None, None),
        2: ('msedgedriver', 79, full_version),
    }
    entry = catalog.get(self.browser)
    if entry is None:
        return None
    stem, minimum, tag = entry
    if minimum is not None:
        # The version may be '' when it could not be read from the browser binary;
        # treat that as "no matching driver" instead of raising ValueError.
        try:
            major = int(main_version)
        except (TypeError, ValueError):
            return None
        if major < minimum:
            return None
    suffix = '.exe' if platform.system().lower() == 'windows' else ''
    file_name = stem if tag is None else '%s_%s' % (stem, tag)
    location = '%s%s%s%s' % (self.webdriver_install_location, os.sep, file_name, suffix)
    return location.replace("\\", '/') if os.path.exists(location) else None
% (latest_release, distdir_chromedriver), file=sys.stderr) - if self.open_remote_resources('%s/%s/%s' % (chromedriver_site, latest_release, assets[1]), package_chromedriver): - dist = zipfile.ZipFile(package_chromedriver).extract(assets[0], distdir_chromedriver) - dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0].replace('chromedriver', 'chromedriver_%s' % main_version)) - os.path.exists(dist_chan) and os.remove(dist_chan) - os.rename(dist, dist_chan) - assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777) - os.remove(package_chromedriver) - return dist_chan.replace("\\", '/') + def pull_driver(self, main_version: str, full_version: str): + match self.browser: + case 0: + if not int(main_version) >= 70: + return None + chromedriver_site = 'https://chromedriver.storage.googleapis.com' + latest_release = self.open_remote_resources('%s/LATEST_RELEASE_%s' % (chromedriver_site, main_version)) + if '' == latest_release: + return None + plat = sys.platform + match_assets = [] + plats = ['win32', 'linux', 'darwin'] + child = ['chromedriver.exe', 'chromedriver'] + tails = ['win32', 'linux64', 'mac64', 'mac_arm64', 'mac64_m1'] + if plat == plats[0]: + match_assets.append([child[0], 'chromedriver_%s.zip' % tails[0]]) + if plat == plats[1]: + match_assets.append([child[1], 'chromedriver_%s.zip' % tails[1]]) + if plat == plats[2] and (platform.machine().startswith('arm')) is bool(0): + match_assets.append([child[1], 'chromedriver_%s.zip' % tails[2]]) + if plat == plats[2] and (platform.machine().startswith('arm')) is bool(1): + match_assets.append([child[1], 'chromedriver_%s.zip' % tails[3]]) + match_assets.append([child[1], 'chromedriver_%s.zip' % tails[4]]) + package_chromedriver = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'chromedriver.zip') + distdir_chromedriver = self.webdriver_install_location + for assets in match_assets: + res_url = '%s/%s/%s' % (chromedriver_site, latest_release, assets[1]) + print('Downloading version %s chromedriver %s to 
%s...' % (latest_release, res_url, distdir_chromedriver), file=sys.stderr) + if self.open_remote_resources(res_url, package_chromedriver): + dist = zipfile.ZipFile(package_chromedriver).extract(assets[0], distdir_chromedriver) + dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0].replace('chromedriver', 'chromedriver_%s' % main_version)) + os.path.exists(dist_chan) and os.remove(dist_chan) + os.rename(dist, dist_chan) + assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777) + os.remove(package_chromedriver) + return dist_chan.replace("\\", '/') + case 1: + site = 'https://github.com/mozilla/geckodriver/releases' + geckodriver_version = '0.33.0' + plat = sys.platform + match_assets = [] + plats = ['win32', 'linux', 'darwin'] + child = ['geckodriver.exe', 'geckodriver'] + tails = ['win32', 'linux64', 'macos', 'macos-aarch64'] + compr = ['zip', 'tar.gz'] + if plat == plats[0]: + match_assets.append([child[0], 'geckodriver-v%s-%s.%s' % (geckodriver_version, tails[0], compr[0])]) + if plat == plats[1]: + match_assets.append([child[1], 'geckodriver-v%s-%s.%s' % (geckodriver_version, tails[1], compr[1])]) + if plat == plats[2] and (platform.machine().startswith('arm')) is bool(0): + match_assets.append([child[1], 'geckodriver-v%s-%s.%s' % (geckodriver_version, tails[2], compr[1])]) + if plat == plats[2] and (platform.machine().startswith('arm')) is bool(1): + match_assets.append([child[1], 'geckodriver-v%s-%s.%s' % (geckodriver_version, tails[3], compr[1])]) + for assets in match_assets: + package_driver = '%s%s%s' % (tempfile.gettempdir(), os.sep, assets[1]) + distdir_driver = self.webdriver_install_location + res_url = '%s/download/v%s/%s' % (site, geckodriver_version, assets[1]) + print('Downloading geckodriver v%s %s to %s...' 
% (geckodriver_version, res_url, distdir_driver), file=sys.stderr) + if self.open_remote_resources(res_url, package_driver, auto_redirects=True): + compress = zipfile.ZipFile(package_driver) if package_driver.endswith('.%s' % compr[0]) else tarfile.open(package_driver, "r:gz") + dist = compress.extract(assets[0], distdir_driver) or '%s%s%s' % (distdir_driver, os.sep, assets[0]) + compress.close() + dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0]) + assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777) + os.remove(package_driver) + return dist_chan.replace("\\", '/') + case 2: + if not int(main_version) >= 79: + return None + msedgedriver_site = 'https://msedgedriver.azureedge.net' + latest_release = full_version + plat = sys.platform + match_assets = [] + plats = ['win32', 'linux', 'darwin'] + child = ['msedgedriver.exe', 'msedgedriver'] + tails = ['win32', 'linux64', 'mac64', 'mac64_m1'] + if plat == plats[0]: + match_assets.append([child[0], 'edgedriver_%s.zip' % tails[0]]) + if plat == plats[1]: + match_assets.append([child[1], 'edgedriver_%s.zip' % tails[1]]) + if plat == plats[2] and (platform.machine().startswith('arm')) is bool(0): + match_assets.append([child[1], 'edgedriver_%s.zip' % tails[2]]) + if plat == plats[2] and (platform.machine().startswith('arm')) is bool(1): + match_assets.append([child[1], 'edgedriver_%s.zip' % tails[3]]) + package_msedgedriver = '%s%s%s' % (tempfile.gettempdir(), os.sep, 'msedgedriver.zip') + distdir_msedgedriver = self.webdriver_install_location + for assets in match_assets: + res_url = '%s/%s/%s' % (msedgedriver_site, latest_release, assets[1]) + print('Downloading version %s msedgedriver %s to %s...' 
% (latest_release, res_url, distdir_msedgedriver), file=sys.stderr) + if self.open_remote_resources(res_url, package_msedgedriver): + dist = zipfile.ZipFile(package_msedgedriver).extract(assets[0], distdir_msedgedriver) + dist_chan = '%s%s%s' % (os.path.dirname(dist), os.sep, assets[0].replace('msedgedriver', 'msedgedriver_%s' % full_version)) + os.path.exists(dist_chan) and os.remove(dist_chan) + os.rename(dist, dist_chan) + assets[0].lower().endswith('.exe') or os.chmod(dist_chan, 0o777) + os.remove(package_msedgedriver) + return dist_chan.replace("\\", '/') - def main(self, chrome: str = None, chromedriver: str = None): - chrome = chrome if chrome else self.find_chrome() - if not chrome: - raise Exception('No chrome executable file is found on your system, please confirm whether it has been installed') - if not os.path.exists(chrome): - raise Exception('Chrome executable file does not exist in %s' % chrome) - version = self.resolve_chrome_binary_version(chrome) + def main(self, binary: str = None, driver: str = None): + binary = binary if binary else self.find_binary() + if not binary: + raise Exception('No browser executable file is found on your system, please confirm whether it has been installed') + if not os.path.exists(binary): + raise Exception('The executable file does not exist in %s' % binary) + version = self.resolve_browser_version(binary) if not version: - raise Exception('Failure to get the local chrome version number failed in %s' % chrome) - i_chrome = chrome - chrome_main_version = version[1] - chromedriver = chromedriver if chromedriver else self.find_chromedriver(chrome_main_version) - chromedriver = chromedriver if chromedriver else self.pull_chromedriver(chrome_main_version) - if not chromedriver: - raise Exception('Not specified the chrome driver path, and try the automatic download failure') - if not os.path.exists(chromedriver): - raise Exception('Chrome driver does not exist in %s' % chromedriver) - i_chromedriver = chromedriver - return 
def find(self, path, wait_for=False, timeout: float = 5.0, freq: float = 0.5, delay: float = 0.0):
    """
    Use XPath to find an element and highlight it.

    :param path: XPath expression of the element.
    :param wait_for: When true, poll for the element's presence instead of looking it up once.
    :param timeout: Passed to ``self.webdriver_wait`` (presumably the maximum wait in seconds - confirm).
    :param freq: Passed to ``self.webdriver_wait`` (presumably the polling interval in seconds - confirm).
    :param delay: When non-zero, ``self.wait(delay)`` is called and the element is looked up again afterwards.
    :return: The located element.
    """
    locator = (By.XPATH, path)
    if wait_for:
        element = self.webdriver_wait(timeout, freq).until(EC.presence_of_element_located(locator))
    else:
        element = self.find_element(By.XPATH, path)
    if delay:
        self.wait(delay)
        # Look the element up again so the reference returned is the one valid after the pause.
        element = self.find_element(By.XPATH, path)
    self.element_prominent(element, '#f8be5f')
    return element