"""Automated Bing search runner driven by Selenium and Microsoft Edge.

The script collects search phrases (a built-in list plus "hot list" titles
fetched from several public endpoints), opens Edge in desktop and then in
mobile-emulation mode, performs a fixed number of Bing searches in each mode,
and finally bundles logs/screenshots into a debug zip before cleaning up.
"""
import json
import logging
import os
import pickle
import random
import shutil
import subprocess
import sys
import time
import zipfile
from datetime import datetime

import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.microsoft import EdgeChromiumDriverManager


def setup_logging():
    """Configure logging to a timestamped file under ``logs/`` and to the console.

    Returns:
        tuple: ``(logger, timestamp)`` where ``timestamp`` is the
        ``%Y%m%d_%H%M%S`` string embedded in the log file name.
    """
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f"bing_search_{timestamp}.log")

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            # Messages are Chinese; without an explicit encoding the file
            # handler uses the locale default and can fail to encode them.
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__), timestamp


logger, timestamp = setup_logging()

# Number of searches attempted per browser mode.
DESKTOP_SEARCH_TARGET = 40
MOBILE_SEARCH_TARGET = 30

# Built-in everyday search phrases, grouped by topic.
KEYWORDS = [
    # Shopping
    "Best laptops 2025", "Smartphone deals", "Fashion trends women", "Online shopping discounts",
    "Gaming console prices", "Home appliance reviews", "Sneaker brands", "Luxury watches",
    "Budget headphones", "Furniture sales", "Electronics deals", "Black Friday 2025",
    "Amazon best sellers", "Tech gadgets 2025", "Winter clothing trends", "Jewelry gift ideas",

    # Travel and lifestyle
    "Top travel destinations", "Cheap flights 2025", "Hotel booking tips", "Beach vacation ideas",
    "City break Europe", "Adventure travel packages", "Cruise deals 2025", "Travel insurance comparison",
    "Camping gear reviews", "Best hiking trails", "Family vacation spots", "Solo travel tips",
    "Backpacking destinations", "Luxury resorts Asia", "Travel safety tips", "Road trip ideas",

    # News and current affairs
    "Breaking news today", "World news updates", "US election 2025", "Global economy trends",
    "Climate change solutions", "Political debates 2025", "International conflicts", "Tech industry updates",
    "Stock market predictions", "Health policy news", "Space mission updates", "Energy crisis 2025",

    # Academia and education
    "Online courses free", "Best coding bootcamps", "Study abroad programs", "Scholarship opportunities",
    "Academic research tools", "Math learning apps", "History documentaries", "Science podcasts",
    "University rankings 2025", "Career training programs", "Language learning tips", "STEM resources",

    # Health and fitness
    "Weight loss diets", "Home workout routines", "Mental health tips", "Meditation apps",
    "Healthy meal plans", "Fitness equipment reviews", "Yoga for beginners", "Nutrition supplements",
    "Running shoes reviews", "Stress management techniques", "Sleep improvement tips", "Vegan recipes easy",

    # Entertainment and culture
    "New movie releases", "TV show reviews 2025", "Music festivals 2025", "Book recommendations",
    "Streaming service deals", "Celebrity news today", "Top video games 2025", "Art exhibitions",
    "Theater shows 2025", "Pop music charts", "Comedy specials Netflix", "Cultural events near me",

    # Technology and innovation
    "Smart home devices 2025", "Wearable tech reviews", "Electric car prices", "AI innovations",
    "5G network updates", "Virtual reality headsets", "Drone technology", "Cybersecurity tips",
    "Tech startups 2025", "Cloud storage comparison", "Programming tutorials", "Data privacy laws",

    # Other everyday searches
    "Local weather forecast", "Event planning ideas", "DIY craft projects", "Pet adoption near me",
    "Gardening for beginners", "Car maintenance tips", "Home renovation ideas", "Wedding planning guide",
    "Photography gear reviews", "Best coffee machines", "Restaurant reviews near me", "Online grocery delivery",
    "Real estate trends 2025", "Job search websites", "Personal finance apps", "Charity organizations",
]

# Shared settings for the hot-list HTTP requests.
_REQUEST_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
_REQUEST_TIMEOUT = 10
_MAX_TITLES_PER_SOURCE = 20

# Failures treated as "this source is unavailable": transport errors, invalid
# JSON, and payloads whose shape differs from what the extractors expect.
_FETCH_ERRORS = (requests.RequestException, ValueError, AttributeError,
                 IndexError, KeyError, TypeError)


def _fetch_hot_titles(source_name, url, referer, extract_items, extract_title):
    """Fetch one hot list and return up to 20 titles of 3-29 characters.

    Args:
        source_name: Label interpolated into the log messages.
        url: JSON endpoint to request.
        referer: Value sent as the ``Referer`` header.
        extract_items: Callable mapping the decoded JSON to an iterable of items.
        extract_title: Callable mapping one item to its title (may be empty).

    Returns:
        list: The accepted titles; empty on a non-200 response or any error
        listed in ``_FETCH_ERRORS`` (best effort, never raises those).
    """
    try:
        headers = {'User-Agent': _REQUEST_USER_AGENT, 'Referer': referer}
        response = requests.get(url, headers=headers, timeout=_REQUEST_TIMEOUT)
        if response.status_code != 200:
            logger.warning(f"{source_name}请求失败,状态码: {response.status_code}")
            return []

        titles = []
        for item in extract_items(response.json()):
            title = extract_title(item)
            if title and 2 < len(title) < 30:
                titles.append(title)
            if len(titles) >= _MAX_TITLES_PER_SOURCE:
                break

        logger.info(f"成功获取{source_name} {len(titles)} 条")
        return titles
    except _FETCH_ERRORS as e:
        logger.error(f"获取{source_name}失败: {e}")
        return []


def _baidu_items(data):
    """Return the entries of the first Baidu card, tolerating a missing/empty card list."""
    cards = data.get('data', {}).get('cards') or [{}]
    return cards[0].get('content', [])


def get_zhihu_trending():
    """Return up to 20 titles from the Zhihu hot list (empty list on failure)."""
    return _fetch_hot_titles(
        "知乎热榜",
        'https://www.zhihu.com/api/v3/feed/topstory/hot-lists/total?limit=50',
        'https://www.zhihu.com/hot',
        lambda data: data.get('data', []),
        lambda item: (item.get('target', {}).get('title', '')
                      or item.get('target', {}).get('question', {}).get('title', '')),
    )


def get_baidu_trending():
    """Return up to 20 titles from the Baidu realtime board (empty list on failure)."""
    return _fetch_hot_titles(
        "百度热榜",
        'https://top.baidu.com/api/board?platform=wise&tab=realtime',
        'https://top.baidu.com/board?tab=realtime',
        _baidu_items,
        lambda item: item.get('word', '') or item.get('query', '') or item.get('title', ''),
    )


def get_bilibili_trending():
    """Return up to 20 titles from the Bilibili popular list (empty list on failure)."""
    return _fetch_hot_titles(
        "哔哩哔哩热榜",
        'https://api.bilibili.com/x/web-interface/popular?ps=20',
        'https://www.bilibili.com/v/popular/rank/all',
        lambda data: data.get('data', {}).get('list', []),
        lambda item: item.get('title', ''),
    )


def get_toutiao_trending():
    """Return up to 20 titles from the Toutiao hot board (empty list on failure)."""
    return _fetch_hot_titles(
        "今日头条热榜",
        'https://www.toutiao.com/hot-event/hot-board/?origin=toutiao_pc',
        'https://www.toutiao.com/',
        lambda data: data.get('data', []),
        lambda item: item.get('Title', '') or item.get('title', '') or item.get('Query', ''),
    )


def _github_keywords(item):
    """Build "github <word>" phrases from one repository's name and short description."""
    keywords = []
    name = item.get('name', '')
    description = item.get('description', '')  # the API may return null here

    if name and 2 < len(name) < 30:
        keywords.append(f"github {name}")

    if description and 5 < len(description) < 40:
        keywords.extend(
            f"github {word}"
            for word in description.split()
            if 3 < len(word) < 15 and word.isalpha()
        )
    return keywords


def get_github_trending():
    """Return up to 15 "github ..." phrases built from highly starred repositories.

    Queries the GitHub repository search once per language (plus one
    language-agnostic query) and stops as soon as 15 phrases are collected.

    Returns:
        list: At most 15 phrases; empty if a request or payload error occurs.
    """
    limit = 15
    headers = {'User-Agent': _REQUEST_USER_AGENT}
    # '' stands for "any language".
    languages = ['', 'python', 'javascript', 'java', 'go', 'rust']
    trending_keywords = []

    try:
        for lang in languages:
            # An empty "language:" qualifier is not a valid search query, so
            # the language-agnostic request filters on star count instead.
            query = f'language:{lang}' if lang else 'stars:>1000'
            response = requests.get(
                'https://api.github.com/search/repositories',
                params={'q': query, 'sort': 'stars', 'order': 'desc', 'per_page': 10},
                headers=headers,
                timeout=_REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                for item in response.json().get('items', []):
                    trending_keywords.extend(_github_keywords(item))
                    if len(trending_keywords) >= limit:
                        break
            else:
                logger.warning(f"GitHub热榜请求失败,状态码: {response.status_code}")

            if len(trending_keywords) >= limit:
                break  # enough phrases: skip the remaining languages entirely

            time.sleep(1)  # space out requests to the search API

        trending_keywords = trending_keywords[:limit]
        logger.info(f"成功获取GitHub热榜 {len(trending_keywords)} 条")
        return trending_keywords

    except _FETCH_ERRORS as e:
        logger.error(f"获取GitHub热榜失败: {e}")
        return []


def get_trending_keywords():
    """Collect hot-list keywords from every source and return at most 60 of them.

    Sources are queried one after another; a failing source is logged and
    skipped. If fewer than 60 distinct keywords are found, a preset list is
    appended. The result is de-duplicated and randomly ordered.

    Returns:
        list: Up to 60 distinct keywords.
    """
    logger.info("开始获取多平台热搜关键词...")

    trending_functions = [
        get_zhihu_trending,
        get_baidu_trending,
        get_bilibili_trending,
        get_toutiao_trending,
        get_github_trending,
    ]

    all_keywords = []
    for func in trending_functions:
        try:
            keywords = func()
            if keywords:
                all_keywords.extend(keywords)
                logger.info(f"从 {func.__name__} 获取到 {len(keywords)} 个关键词")
        except Exception as e:
            logger.error(f"获取 {func.__name__} 热榜时出错: {e}")

    # De-duplicate while keeping first-seen order.
    all_keywords = list(dict.fromkeys(all_keywords))

    if len(all_keywords) < 60:
        logger.warning(f"只获取到 {len(all_keywords)} 条热搜关键词,补充预设关键词")
        additional_keywords = [
            "科技创新", "数字化转型", "云计算", "大数据", "物联网", "5G应用",
            "人工智能技术", "机器学习", "深度学习", "自动驾驶", "智能家居", "智慧城市",
            "远程办公", "在线教育", "数字医疗", "电商平台", "社交媒体", "内容创作",
            "短视频平台", "直播经济", "元宇宙概念", "NFT", "数字货币", "区块链技术",
            "碳中和", "绿色发展", "可再生能源", "环境保护", "气候变化", "可持续发展",
            "健康生活", "心理健康", "健身运动", "营养饮食", "疾病预防", "医疗保险",
            "教育改革", "在线学习", "职业培训", "就业市场", "创业机会", "投资方向",
            "房地产市场", "股市行情", "基金理财", "保险产品", "消费趋势", "零售行业",
            "文化旅游", "户外运动", "本地生活", "餐饮美食", "时尚潮流", "美妆个护",
        ]
        all_keywords = list(dict.fromkeys(all_keywords + additional_keywords))

    # Shuffle before truncating so the 60 kept keywords are drawn from all
    # sources rather than only from the ones queried first.
    random.shuffle(all_keywords)

    logger.info(f"成功获取 {len(all_keywords)} 条热搜关键词")
    return all_keywords[:60]


def find_edge_driver():
    """Locate an installed ``msedgedriver`` binary.

    Checks a list of well-known install locations, the current directory, and
    finally the system PATH.

    Returns:
        str | None: Path of the first driver found, or ``None``.
    """
    possible_paths = [
        # Default Windows install locations
        "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedgedriver.exe",
        "C:\\Program Files\\Microsoft\\Edge\\Application\\msedgedriver.exe",
        # Per-user install location
        os.path.expanduser("~\\AppData\\Local\\Microsoft\\Edge\\Application\\msedgedriver.exe"),
        # Current directory
        "msedgedriver.exe",
        # macOS
        "/Applications/Microsoft Edge.app/Contents/MacOS/msedgedriver",
        # Linux
        "/usr/bin/msedgedriver",
        "/usr/local/bin/msedgedriver",
    ]

    for path in possible_paths:
        if os.path.exists(path):
            logger.info(f"找到Edge驱动: {path}")
            return path

    # The error message below tells users PATH is supported, so honor it.
    path_hit = shutil.which("msedgedriver")
    if path_hit:
        logger.info(f"找到Edge驱动: {path_hit}")
        return path_hit

    logger.error("未找到Edge驱动,请手动下载并放置在当前目录或系统PATH中")
    return None


def _create_edge_driver(options, managed_ok_message, local_ok_message):
    """Start Edge via WebDriver Manager, falling back to a locally installed driver.

    Raises:
        RuntimeError: If WebDriver Manager fails and no local driver exists.
        WebDriverException: If the local driver fails to start the browser.
    """
    try:
        service = Service(EdgeChromiumDriverManager().install())
        driver = webdriver.Edge(service=service, options=options)
        logger.info(managed_ok_message)
        return driver
    except Exception as e:
        logger.warning(f"WebDriver Manager初始化失败: {e}")
        logger.info("尝试使用本地Edge驱动...")

    driver_path = find_edge_driver()
    if not driver_path:
        raise RuntimeError("无法找到可用的Edge驱动")

    service = Service(executable_path=driver_path)
    driver = webdriver.Edge(service=service, options=options)
    logger.info(local_ok_message)
    return driver


def _run_startup_script(driver, script):
    """Run *script* in the new session; quit the browser if it fails so it is not leaked."""
    try:
        driver.execute_script(script)
    except WebDriverException:
        driver.quit()
        raise


def setup_desktop_driver():
    """Create and return a desktop-mode Edge WebDriver.

    Returns:
        The started WebDriver instance.

    Raises:
        WebDriverException: If the browser cannot be started or configured.
        RuntimeError: If no usable Edge driver can be found.
    """
    options = Options()
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--start-maximized")

    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    options.add_experimental_option('useAutomationExtension', False)

    # One user agent is picked at random per session.
    desktop_user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36 Edg/96.0.1054.62",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.55",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36 Edg/98.0.1108.51",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36 Edg/99.0.1150.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36 Edg/100.0.1185.29",
    ]
    options.add_argument(f'--user-agent={random.choice(desktop_user_agents)}')

    # Randomized window size.
    width = random.randint(1200, 1920)
    height = random.randint(800, 1080)
    options.add_argument(f"--window-size={width},{height}")

    try:
        driver = _create_edge_driver(
            options,
            "使用WebDriver Manager成功初始化Edge驱动",
            "使用本地Edge驱动成功初始化",
        )
        _run_startup_script(driver, """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });

            // 覆盖chrome对象
            window.chrome = {
                runtime: {},
                // 等等
            };

            // 覆盖权限
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );

            // 覆盖语言
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh', 'en']
            });

            // 覆盖plugins
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        """)
        return driver
    except WebDriverException as e:
        logger.error(f"桌面版WebDriver初始化失败: {e}")
        raise


def setup_mobile_driver():
    """Create and return an Edge WebDriver using mobile emulation.

    Returns:
        The started WebDriver instance.

    Raises:
        WebDriverException: If the browser cannot be started or configured.
        RuntimeError: If no usable Edge driver can be found.
    """
    options = Options()
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    options.add_experimental_option('useAutomationExtension', False)

    # Only the user agent of the chosen profile is used; the viewport
    # metrics below are the same for every profile.
    mobile_devices = [
        {"deviceName": "iPhone X", "userAgent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Mobile/15E148 Safari/604.1 Edg/97.0.1072.55"},
        {"deviceName": "Galaxy S5", "userAgent": "Mozilla/5.0 (Linux; Android 5.0; SM-G900P Build/LRX21T) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Mobile Safari/537.36 Edg/97.0.1072.55"},
        {"deviceName": "Pixel 5", "userAgent": "Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.91 Mobile Safari/537.36 Edg/97.0.1072.55"},
    ]

    device = random.choice(mobile_devices)
    mobile_emulation = {
        "deviceMetrics": {"width": 375, "height": 812, "pixelRatio": 3.0},
        "userAgent": device["userAgent"],
    }
    options.add_experimental_option("mobileEmulation", mobile_emulation)

    try:
        driver = _create_edge_driver(
            options,
            "使用WebDriver Manager成功初始化移动版Edge驱动",
            "使用本地Edge驱动成功初始化移动版",
        )
        _run_startup_script(driver, """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """)
        return driver
    except WebDriverException as e:
        logger.error(f"移动版WebDriver初始化失败: {e}")
        raise


def save_cookies(driver, filename):
    """Pickle the driver's current cookies to *filename*; failures are logged, not raised."""
    try:
        cookies = driver.get_cookies()
        with open(filename, 'wb') as f:
            pickle.dump(cookies, f)
        logger.info(f"Cookies已保存到 {filename}")
    except Exception as e:
        logger.error(f"保存cookies失败: {e}")


def load_cookies(driver, filename):
    """Load pickled cookies from *filename* into the driver.

    The driver must already be showing a page on the cookies' domain; cookies
    the browser rejects are logged and skipped.

    Returns:
        bool: ``True`` if the file was read, ``False`` if it is missing or
        unreadable.
    """
    try:
        # SECURITY NOTE: pickle executes arbitrary code on load. This is only
        # acceptable because the file is written by save_cookies() in this
        # same script; never point this at a file from another source.
        with open(filename, 'rb') as f:
            cookies = pickle.load(f)

        for cookie in cookies:
            try:
                driver.add_cookie(cookie)
            except Exception as e:
                logger.warning(f"添加cookie失败: {e}")

        logger.info(f"已从 {filename} 加载cookies")
        return True
    except FileNotFoundError:
        logger.warning(f"Cookie文件 {filename} 不存在")
        return False
    except Exception as e:
        logger.error(f"加载cookies失败: {e}")
        return False


def microsoft_login(driver):
    """Sign in to a Microsoft account, reusing saved cookies when possible.

    If a cookie file exists, the cookies are applied on bing.com and the
    presence of the element with id ``id_n`` is taken as proof of a valid
    session. Otherwise the login page is opened and the function blocks on
    ``input()`` until the user confirms a manual login, then saves cookies.

    Returns:
        bool: ``True`` on success, ``False`` if any step raised.
    """
    cookie_file = "microsoft_cookies.pkl"
    try:
        logger.info("尝试Microsoft账户登录...")

        if os.path.exists(cookie_file):
            # WebDriver rejects cookies whose domain differs from the current
            # document, so a page must be open before they can be added.
            driver.get("https://www.bing.com")

        if load_cookies(driver, cookie_file):
            driver.refresh()  # re-request the page with the cookies applied
            time.sleep(3)
            try:
                driver.find_element(By.ID, "id_n")
            except WebDriverException:
                logger.warning("cookies已失效,需要重新登录")
            else:
                logger.info("使用cookies登录成功")
                return True

        # Manual login flow.
        driver.get("https://login.live.com")
        time.sleep(3)

        logger.info("请在浏览器中手动登录Microsoft账户,完成后按回车键继续...")
        input()

        save_cookies(driver, cookie_file)
        logger.info("Microsoft账户登录完成")
        return True

    except Exception as e:
        logger.error(f"Microsoft登录失败: {e}")
        return False


def human_like_delay(min_sec=1.0, max_sec=3.0):
    """Sleep for a random duration between *min_sec* and *max_sec* seconds."""
    time.sleep(random.uniform(min_sec, max_sec))


def simulate_human_typing(element, text):
    """Send *text* to *element* one character at a time with 0.05-0.2 s pauses."""
    for character in text:
        element.send_keys(character)
        time.sleep(random.uniform(0.05, 0.2))


def simulate_mouse_movement(driver):
    """Perform 2-5 random pointer moves and, 30% of the time, a click near the body's origin.

    Any failure is logged as a warning and otherwise ignored.
    """
    try:
        actions = ActionChains(driver)

        for _ in range(random.randint(2, 5)):
            x_offset = random.randint(-100, 100)
            y_offset = random.randint(-100, 100)
            actions.move_by_offset(x_offset, y_offset)
            actions.perform()
            time.sleep(random.uniform(0.1, 0.5))

        if random.random() < 0.3:
            body = driver.find_element(By.TAG_NAME, "body")
            actions.move_to_element_with_offset(body, random.randint(10, 100), random.randint(10, 100))
            actions.click()
            actions.perform()

    except Exception as e:
        logger.warning(f"模拟鼠标移动失败: {e}")


def simulate_human_scroll(driver, is_mobile=False):
    """Scroll the page a few times by random amounts with pauses in between.

    Args:
        driver: Active WebDriver.
        is_mobile: Selects the mobile scroll profile (more, shorter scrolls).
    """
    try:
        page_height = driver.execute_script("return document.body.scrollHeight")

        if is_mobile:
            scroll_times = random.randint(2, 5)
            scroll_distance_range = (150, 300)
        else:
            scroll_times = random.randint(1, 4)
            scroll_distance_range = (100, 500)

        current_position = 0

        for _ in range(scroll_times):
            scroll_distance = random.randint(*scroll_distance_range)
            if random.random() < 0.3:  # 30% of scrolls go upwards
                scroll_distance = -scroll_distance

            target = current_position + scroll_distance
            if 0 <= target < page_height:
                driver.execute_script(f"window.scrollBy(0, {scroll_distance});")
                current_position = target
            else:
                # Would leave the page: clamp to the nearest edge and stop.
                driver.execute_script("window.scrollTo(0, arguments[0]);",
                                      0 if target < 0 else page_height)
                break

            human_like_delay(0.5, 2.0)
    except Exception as e:
        logger.error(f"滚动时发生错误: {e}")


def simulate_search_result_interaction(driver, is_mobile=False):
    """With 70% probability open one of the first five results, then return to the results page.

    Handles results that open in the same tab (navigates back) and results
    that open a new tab (closes it and switches back). WebDriver errors are
    logged as warnings and never propagate, so a failed click does not mark
    the search itself as failed.
    """
    selector = "h2 a, .b_algo h2 a, .b_title a" if is_mobile else "h2 a"
    try:
        results = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
        )
        if not results or random.random() >= 0.7:
            return

        result_to_click = random.choice(results[:5])
        logger.info(f"点击搜索结果: {result_to_click.text[:50]}...")

        search_window = driver.current_window_handle
        known_windows = set(driver.window_handles)

        result_to_click.click()
        human_like_delay(3, 8)  # dwell on the opened page

        opened_windows = [h for h in driver.window_handles if h not in known_windows]
        if opened_windows:
            for handle in opened_windows:
                driver.switch_to.window(handle)
                driver.close()
            driver.switch_to.window(search_window)
        else:
            driver.back()

        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "sb_form_q"))
        )
    except WebDriverException as e:
        # TimeoutException, NoSuchElementException, stale/intercepted clicks
        # and similar all derive from WebDriverException.
        logger.warning(f"搜索结果交互失败: {e}")


def take_screenshot(driver, filename):
    """Save a screenshot to *filename*, logging success or failure."""
    try:
        # save_screenshot reports file-write problems through its return
        # value rather than by raising.
        if driver.save_screenshot(filename):
            logger.info(f"截图已保存: {filename}")
        else:
            logger.error(f"截图失败: {filename}")
    except Exception as e:
        logger.error(f"截图失败: {e}")


def bing_search(driver, query, is_mobile=False):
    """Run one Bing search for *query* and browse the results page.

    Returns:
        bool: ``True`` if the search completed, ``False`` if any step raised.
    """
    try:
        logger.info(f"正在{'移动端' if is_mobile else '电脑端'}搜索: {query}")
        driver.get("https://www.bing.com")

        simulate_mouse_movement(driver)

        if is_mobile:
            # Some mobile layouts hide the search box behind an icon.
            try:
                search_icon = WebDriverWait(driver, 5).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, ".search.icon, .scopebar_icon"))
                )
                search_icon.click()
                human_like_delay(0.5, 1.5)
            except TimeoutException:
                logger.info("移动端搜索图标未找到,直接尝试搜索框")

        search_box = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "sb_form_q"))
        )

        search_box.clear()
        human_like_delay(0.5, 1.5)
        simulate_human_typing(search_box, query)
        human_like_delay(0.5, 1.5)

        search_box.send_keys(Keys.RETURN)

        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "h2 a, .b_algo, .b_title"))
        )

        simulate_human_scroll(driver, is_mobile)
        simulate_search_result_interaction(driver, is_mobile)

        human_like_delay(2, 6)

        return True
    except Exception as e:
        logger.error(f"搜索 '{query}' 时发生错误: {e}")
        return False


def _run_searches(setup_driver, target_searches, is_mobile):
    """Shared driver lifecycle and search loop for the desktop and mobile runs.

    Args:
        setup_driver: Zero-argument callable returning a started WebDriver.
        target_searches: Number of searches to attempt.
        is_mobile: Forwarded to ``bing_search``; also selects log labels and
            the screenshot file prefix.

    Returns:
        int: Number of searches that succeeded (also on early failure).
    """
    label = "移动端" if is_mobile else "桌面端"
    file_prefix = "mobile" if is_mobile else "desktop"
    driver = None
    successful_searches = 0

    try:
        driver = setup_driver()
        logger.info(f"{label}浏览器启动成功,开始执行搜索任务")

        microsoft_login(driver)

        all_keywords = KEYWORDS + get_trending_keywords()
        random.shuffle(all_keywords)

        for i in range(target_searches):
            keyword = all_keywords[i % len(all_keywords)]  # wrap around if needed
            logger.info(f"执行第 {i + 1} 次{label}搜索")

            if bing_search(driver, keyword, is_mobile=is_mobile):
                successful_searches += 1

                if random.random() < 0.2:  # screenshot 20% of successful searches
                    screenshot_dir = "screenshots"
                    os.makedirs(screenshot_dir, exist_ok=True)
                    screenshot_file = os.path.join(
                        screenshot_dir,
                        f"{file_prefix}_{i+1}_{datetime.now().strftime('%H%M%S')}.png",
                    )
                    take_screenshot(driver, screenshot_file)
            else:
                # Back off a little after a failed search.
                human_like_delay(5, 10)

            # Longer pause after every fifth search, except after the last one.
            if (i + 1) % 5 == 0 and i + 1 < target_searches:
                logger.info(f"已完成 {i + 1} 次{label}搜索,休息一下...")
                human_like_delay(10, 20)

        logger.info(f"{label}搜索任务完成! 成功执行了 {successful_searches}/{target_searches} 次搜索")

    except Exception as e:
        logger.error(f"{label}搜索执行过程中发生错误: {e}")
    finally:
        if driver:
            save_cookies(driver, "microsoft_cookies.pkl")
            driver.quit()
            logger.info(f"{label}浏览器已关闭")

    return successful_searches


def run_desktop_searches():
    """Run the desktop search session and return the number of successful searches."""
    return _run_searches(setup_desktop_driver, DESKTOP_SEARCH_TARGET, is_mobile=False)


def run_mobile_searches():
    """Run the mobile-emulation search session and return the number of successful searches."""
    return _run_searches(setup_mobile_driver, MOBILE_SEARCH_TARGET, is_mobile=True)


def create_debug_package(timestamp):
    """Zip this run's log file, the cookie file and all screenshots into ``debug/``.

    Args:
        timestamp: Run timestamp used to select log files and name the zip.

    Returns:
        str: Path of the created zip file.
    """
    debug_dir = "debug"
    os.makedirs(debug_dir, exist_ok=True)

    zip_filename = os.path.join(debug_dir, f"debug_{timestamp}.zip")

    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for log_file in os.listdir("logs"):
            if log_file.startswith(f"bing_search_{timestamp}"):
                zipf.write(os.path.join("logs", log_file), log_file)

        # SECURITY NOTE(review): this bundles live session cookies into the
        # debug archive. Do not share the zip; consider excluding this file.
        if os.path.exists("microsoft_cookies.pkl"):
            zipf.write("microsoft_cookies.pkl", "microsoft_cookies.pkl")

        if os.path.exists("screenshots"):
            for screenshot in os.listdir("screenshots"):
                screenshot_path = os.path.join("screenshots", screenshot)
                zipf.write(screenshot_path, screenshot_path)

    logger.info(f"调试包已创建: {zip_filename}")
    return zip_filename


def cleanup():
    """Delete the cookie file and the screenshots directory; failures are logged only."""
    try:
        if os.path.exists("microsoft_cookies.pkl"):
            os.remove("microsoft_cookies.pkl")

        if os.path.exists("screenshots"):
            shutil.rmtree("screenshots")

        logger.info("临时文件清理完成")
    except Exception as e:
        logger.error(f"清理临时文件失败: {e}")


# Driver version requested when the installed Edge version cannot be detected.
_DEFAULT_EDGE_VERSION = "120.0.2210.91"


def _detect_edge_version():
    """Return the installed Edge version, or the default version if detection fails."""
    if sys.platform == "win32":
        import winreg  # Windows-only module

        for hive in (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE):
            try:
                with winreg.OpenKey(hive, r"Software\Microsoft\Edge\BLBeacon") as key:
                    version, _ = winreg.QueryValueEx(key, "version")
            except OSError:
                continue  # key or value missing in this hive; try the next
            logger.info(f"检测到Edge浏览器版本: {version}")
            return version
    else:
        try:
            result = subprocess.run(["microsoft-edge", "--version"],
                                    capture_output=True, text=True, timeout=10)
            version = result.stdout.strip().split()[-1]
        except (OSError, IndexError, subprocess.SubprocessError):
            pass  # binary missing, hung, or printed nothing
        else:
            logger.info(f"检测到Edge浏览器版本: {version}")
            return version

    logger.warning("无法获取Edge浏览器版本,使用默认版本")
    return _DEFAULT_EDGE_VERSION


def download_edge_driver():
    """Download and unpack the Edge driver matching the installed browser.

    The archive is extracted into the current directory.

    Returns:
        str | None: File name of the extracted driver, or ``None`` on failure.
    """
    logger.info("尝试手动下载Edge驱动...")

    zip_path = "edgedriver.zip"
    try:
        version = _detect_edge_version()

        # NOTE(review): confirm this download host is still being served.
        base_url = f"https://msedgedriver.azureedge.net/{version}/edgedriver_"

        if sys.platform == "win32":
            download_url = base_url + "win64.zip"
            driver_name = "msedgedriver.exe"
        elif sys.platform == "darwin":
            if "arm" in os.uname().machine:
                download_url = base_url + "mac64_m1.zip"
            else:
                download_url = base_url + "mac64.zip"
            driver_name = "msedgedriver"
        else:
            download_url = base_url + "linux64.zip"
            driver_name = "msedgedriver"

        logger.info(f"下载URL: {download_url}")

        with requests.get(download_url, stream=True, timeout=30) as response:
            if response.status_code != 200:
                logger.error(f"下载失败,状态码: {response.status_code}")
                return None
            with open(zip_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(".")

        # The extracted binary needs the executable bit outside Windows.
        if sys.platform != "win32":
            os.chmod(driver_name, 0o755)

        logger.info("Edge驱动下载并解压成功")
        return driver_name

    except Exception as e:
        logger.error(f"下载Edge驱动失败: {e}")
        return None
    finally:
        # Remove the archive whether or not extraction succeeded.
        if os.path.exists(zip_path):
            os.remove(zip_path)


def main():
    """Run the desktop then the mobile search session, build the debug zip and clean up."""
    total_successful = 0
    total_target = DESKTOP_SEARCH_TARGET + MOBILE_SEARCH_TARGET

    if not find_edge_driver():
        logger.warning("未找到Edge驱动,尝试下载...")
        if not download_edge_driver():
            logger.error("无法下载Edge驱动,请手动下载并放置在当前目录")
            return

    try:
        total_successful += run_desktop_searches()

        # Longer pause between the desktop and the mobile session.
        logger.info("桌面端搜索完成,等待一段时间后开始移动端搜索...")
        time.sleep(random.uniform(30, 60))

        total_successful += run_mobile_searches()

        logger.info(f"所有搜索任务完成! 总共成功执行了 {total_successful}/{total_target} 次搜索")

        debug_zip = create_debug_package(timestamp)
        cleanup()

        logger.info(f"程序执行完毕,调试包保存在: {debug_zip}")

    except Exception as e:
        logger.error(f"程序执行过程中发生错误: {e}")

        # Still try to leave a debug package behind.
        try:
            debug_zip = create_debug_package(timestamp)
            logger.info(f"已创建调试包: {debug_zip}")
        except Exception:
            logger.error("创建调试包失败")

        cleanup()


if __name__ == "__main__":
    main()