# Source: Purelineawa-Asuku-Rewards/main.py
# Last change: Pureline 16dee0108e base, 2025-08-22 00:00:51 +08:00
# Size: 982 lines, 39 KiB, Python
#
# Hosting-page notice: this file contains ambiguous Unicode characters, i.e.
# characters that might be confused with other characters. If this is
# intentional, the warning can safely be ignored.
import os
import time
import random
import logging
import pickle
import zipfile
import requests
import json
import subprocess
import sys
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from selenium.webdriver.edge.service import Service
from selenium.webdriver.edge.options import Options
from bs4 import BeautifulSoup
# Configure logging
def setup_logging():
    """Configure logging to a timestamped file plus the console.

    Creates the ``logs`` directory when it is missing and calls
    ``logging.basicConfig`` with two handlers: a file handler writing to
    ``logs/bing_search_<timestamp>.log`` and a stream handler.

    Returns:
        A ``(logger, timestamp)`` tuple: the logger named after this module
        and the ``%Y%m%d_%H%M%S`` timestamp embedded in the log file name.
    """
    log_dir = "logs"
    # exist_ok removes the check-then-create race of a separate exists() test.
    os.makedirs(log_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f"bing_search_{timestamp}.log")
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            # The log messages in this file are Chinese; pin UTF-8 so writing
            # them does not depend on the platform's default encoding.
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__), timestamp


# Module-level logger and run timestamp shared by the rest of the file.
logger, timestamp = setup_logging()
# Preset keyword list: 108 everyday-style search phrases grouped by topic.
KEYWORDS = [
    # Shopping
    "Best laptops 2025", "Smartphone deals", "Fashion trends women", "Online shopping discounts",
    "Gaming console prices", "Home appliance reviews", "Sneaker brands", "Luxury watches",
    "Budget headphones", "Furniture sales", "Electronics deals", "Black Friday 2025",
    "Amazon best sellers", "Tech gadgets 2025", "Winter clothing trends", "Jewelry gift ideas",
    # Travel and lifestyle
    "Top travel destinations", "Cheap flights 2025", "Hotel booking tips", "Beach vacation ideas",
    "City break Europe", "Adventure travel packages", "Cruise deals 2025", "Travel insurance comparison",
    "Camping gear reviews", "Best hiking trails", "Family vacation spots", "Solo travel tips",
    "Backpacking destinations", "Luxury resorts Asia", "Travel safety tips", "Road trip ideas",
    # News and current affairs
    "Breaking news today", "World news updates", "US election 2025", "Global economy trends",
    "Climate change solutions", "Political debates 2025", "International conflicts", "Tech industry updates",
    "Stock market predictions", "Health policy news", "Space mission updates", "Energy crisis 2025",
    # Academia and education
    "Online courses free", "Best coding bootcamps", "Study abroad programs", "Scholarship opportunities",
    "Academic research tools", "Math learning apps", "History documentaries", "Science podcasts",
    "University rankings 2025", "Career training programs", "Language learning tips", "STEM resources",
    # Health and fitness
    "Weight loss diets", "Home workout routines", "Mental health tips", "Meditation apps",
    "Healthy meal plans", "Fitness equipment reviews", "Yoga for beginners", "Nutrition supplements",
    "Running shoes reviews", "Stress management techniques", "Sleep improvement tips", "Vegan recipes easy",
    # Entertainment and culture
    "New movie releases", "TV show reviews 2025", "Music festivals 2025", "Book recommendations",
    "Streaming service deals", "Celebrity news today", "Top video games 2025", "Art exhibitions",
    "Theater shows 2025", "Pop music charts", "Comedy specials Netflix", "Cultural events near me",
    # Technology and innovation
    "Smart home devices 2025", "Wearable tech reviews", "Electric car prices", "AI innovations",
    "5G network updates", "Virtual reality headsets", "Drone technology", "Cybersecurity tips",
    "Tech startups 2025", "Cloud storage comparison", "Programming tutorials", "Data privacy laws",
    # Other everyday searches
    "Local weather forecast", "Event planning ideas", "DIY craft projects", "Pet adoption near me",
    "Gardening for beginners", "Car maintenance tips", "Home renovation ideas", "Wedding planning guide",
    "Photography gear reviews", "Best coffee machines", "Restaurant reviews near me", "Online grocery delivery",
    "Real estate trends 2025", "Job search websites", "Personal finance apps", "Charity organizations",
]
def get_zhihu_trending():
    """Fetch trending titles from the Zhihu hot list.

    Returns:
        Up to 20 titles whose length is strictly between 2 and 30 characters.
        An empty list is returned on a non-200 response or on any exception;
        both cases are logged.
    """
    endpoint = 'https://www.zhihu.com/api/v3/feed/topstory/hot-lists/total?limit=50'
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Referer': 'https://www.zhihu.com/hot',
    }
    try:
        resp = requests.get(endpoint, headers=request_headers, timeout=10)
        if resp.status_code != 200:
            logger.warning(f"知乎热榜请求失败,状态码: {resp.status_code}")
            return []
        titles = []
        for entry in resp.json().get('data', []):
            # The title sits on the target itself or on its nested question.
            title = entry.get('target', {}).get('title', '') \
                or entry.get('target', {}).get('question', {}).get('title', '')
            if title and 2 < len(title) < 30:
                titles.append(title)
            if len(titles) >= 20:  # stop after 20 entries
                break
        logger.info(f"成功获取知乎热榜 {len(titles)}")
        return titles
    except Exception as e:
        logger.error(f"获取知乎热榜失败: {e}")
        return []
def get_baidu_trending():
    """Fetch trending titles from the Baidu realtime board.

    Reads the ``content`` list of the first card in the response and takes
    each entry's ``word``, ``query`` or ``title`` field (first non-empty).

    Returns:
        Up to 20 titles whose length is strictly between 2 and 30 characters.
        An empty list is returned on a non-200 response or on any exception;
        both cases are logged.
    """
    endpoint = 'https://top.baidu.com/api/board?platform=wise&tab=realtime'
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Referer': 'https://top.baidu.com/board?tab=realtime',
    }
    try:
        resp = requests.get(endpoint, headers=request_headers, timeout=10)
        if resp.status_code != 200:
            logger.warning(f"百度热榜请求失败,状态码: {resp.status_code}")
            return []
        payload = resp.json()
        first_card = payload.get('data', {}).get('cards', [{}])[0]
        titles = []
        for entry in first_card.get('content', []):
            title = entry.get('word', '') or entry.get('query', '') or entry.get('title', '')
            if title and 2 < len(title) < 30:
                titles.append(title)
            if len(titles) >= 20:  # stop after 20 entries
                break
        logger.info(f"成功获取百度热榜 {len(titles)}")
        return titles
    except Exception as e:
        logger.error(f"获取百度热榜失败: {e}")
        return []
def get_bilibili_trending():
    """Fetch trending video titles from the Bilibili popular feed.

    Returns:
        Up to 20 titles whose length is strictly between 2 and 30 characters.
        An empty list is returned on a non-200 response or on any exception;
        both cases are logged.
    """
    endpoint = 'https://api.bilibili.com/x/web-interface/popular?ps=20'
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Referer': 'https://www.bilibili.com/v/popular/rank/all',
    }
    try:
        resp = requests.get(endpoint, headers=request_headers, timeout=10)
        if resp.status_code != 200:
            logger.warning(f"哔哩哔哩热榜请求失败,状态码: {resp.status_code}")
            return []
        titles = []
        for entry in resp.json().get('data', {}).get('list', []):
            title = entry.get('title', '')
            if title and 2 < len(title) < 30:
                titles.append(title)
            if len(titles) >= 20:  # stop after 20 entries
                break
        logger.info(f"成功获取哔哩哔哩热榜 {len(titles)}")
        return titles
    except Exception as e:
        logger.error(f"获取哔哩哔哩热榜失败: {e}")
        return []
def get_toutiao_trending():
    """Fetch trending titles from the Toutiao hot board.

    Each entry's ``Title``, ``title`` or ``Query`` field is used (first
    non-empty).

    Returns:
        Up to 20 titles whose length is strictly between 2 and 30 characters.
        An empty list is returned on a non-200 response or on any exception;
        both cases are logged.
    """
    endpoint = 'https://www.toutiao.com/hot-event/hot-board/?origin=toutiao_pc'
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Referer': 'https://www.toutiao.com/',
    }
    try:
        resp = requests.get(endpoint, headers=request_headers, timeout=10)
        if resp.status_code != 200:
            logger.warning(f"今日头条热榜请求失败,状态码: {resp.status_code}")
            return []
        titles = []
        for entry in resp.json().get('data', []):
            title = entry.get('Title', '') or entry.get('title', '') or entry.get('Query', '')
            if title and 2 < len(title) < 30:
                titles.append(title)
            if len(titles) >= 20:  # stop after 20 entries
                break
        logger.info(f"成功获取今日头条热榜 {len(titles)}")
        return titles
    except Exception as e:
        logger.error(f"获取今日头条热榜失败: {e}")
        return []
def get_github_trending():
    """Collect search keywords from highly starred GitHub repositories.

    Queries the GitHub repository search API once per entry of a fixed
    language list (the empty entry standing for "any language"), sorted by
    stars. Keywords have the form ``"github <name>"`` and come from
    repository names (3-29 characters) and from purely alphabetic words
    (4-14 characters) of short descriptions (6-39 characters).

    Requests stop as soon as 15 keywords have been collected; a non-200
    response is logged and the next language is tried.

    Returns:
        At most 15 keywords. An empty list is returned if any exception
        occurs (it is logged).
    """
    max_keywords = 15
    api_url = 'https://api.github.com/search/repositories'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    # Trends are gathered for several programming languages.
    languages = ['', 'python', 'javascript', 'java', 'go', 'rust']
    try:
        trending_keywords = []
        for index, lang in enumerate(languages):
            if index:
                time.sleep(1)  # space out requests to avoid hammering the API
            # NOTE(review): a bare "language:" qualifier is presumably rejected
            # by the search API, so the any-language case uses a star
            # threshold instead — confirm against the API documentation.
            query = f'language:{lang}' if lang else 'stars:>1000'
            params = {'q': query, 'sort': 'stars', 'order': 'desc', 'per_page': 10}
            response = requests.get(api_url, headers=headers, params=params, timeout=10)
            if response.status_code != 200:
                # Same reporting style as the other trending fetchers.
                logger.warning(f"GitHub热榜请求失败,状态码: {response.status_code}")
                continue
            for item in response.json().get('items', []):
                name = item.get('name', '')
                description = item.get('description', '')
                if name and 2 < len(name) < 30:
                    trending_keywords.append(f"github {name}")
                if description and 5 < len(description) < 40:
                    # Extract keyword candidates from the description.
                    trending_keywords.extend(
                        f"github {word}"
                        for word in description.split()
                        if 3 < len(word) < 15 and word.isalpha()
                    )
                if len(trending_keywords) >= max_keywords:
                    break
            if len(trending_keywords) >= max_keywords:
                # Enough collected: skip the remaining languages entirely.
                break
        trending_keywords = trending_keywords[:max_keywords]
        logger.info(f"成功获取GitHub热榜 {len(trending_keywords)}")
        return trending_keywords
    except Exception as e:
        logger.error(f"获取GitHub热榜失败: {e}")
        return []
def get_trending_keywords():
    """Gather trending keywords from several platforms.

    Calls each platform fetcher in turn (sequentially, not in parallel);
    a failure of one fetcher is logged and does not stop the others.
    Duplicates are removed while keeping first-seen order, so fetched
    keywords always take priority over the preset fallback list that is
    appended when fewer than 60 keywords were fetched.

    Returns:
        At most 60 unique keywords. Fewer are returned when the fetched
        keywords plus the 54 preset ones do not reach 60.
    """
    logger.info("开始获取多平台热搜关键词...")
    trending_functions = [
        get_zhihu_trending,
        get_baidu_trending,
        get_bilibili_trending,
        get_toutiao_trending,
        get_github_trending,
    ]
    all_keywords = []
    for func in trending_functions:
        try:
            keywords = func()
            if keywords:
                all_keywords.extend(keywords)
                logger.info(f"{func.__name__} 获取到 {len(keywords)} 个关键词")
        except Exception as e:
            logger.error(f"获取 {func.__name__} 热榜时出错: {e}")
    # dict.fromkeys de-duplicates while preserving order; a set would make
    # the final [:60] slice keep an arbitrary subset.
    all_keywords = list(dict.fromkeys(all_keywords))
    if len(all_keywords) < 60:
        logger.warning(f"只获取到 {len(all_keywords)} 条热搜关键词,补充预设关键词")
        # Top up with common preset keywords.
        additional_keywords = [
            "科技创新", "数字化转型", "云计算", "大数据", "物联网", "5G应用",
            "人工智能技术", "机器学习", "深度学习", "自动驾驶", "智能家居", "智慧城市",
            "远程办公", "在线教育", "数字医疗", "电商平台", "社交媒体", "内容创作",
            "短视频平台", "直播经济", "元宇宙概念", "NFT", "数字货币", "区块链技术",
            "碳中和", "绿色发展", "可再生能源", "环境保护", "气候变化", "可持续发展",
            "健康生活", "心理健康", "健身运动", "营养饮食", "疾病预防", "医疗保险",
            "教育改革", "在线学习", "职业培训", "就业市场", "创业机会", "投资方向",
            "房地产市场", "股市行情", "基金理财", "保险产品", "消费趋势", "零售行业",
            "文化旅游", "户外运动", "本地生活", "餐饮美食", "时尚潮流", "美妆个护",
        ]
        # Fetched keywords come first, so they survive the truncation below.
        all_keywords = list(dict.fromkeys(all_keywords + additional_keywords))
    logger.info(f"成功获取 {len(all_keywords)} 条热搜关键词")
    return all_keywords[:60]
def find_edge_driver():
    """Locate an already installed Edge WebDriver executable.

    Checks a list of common install locations and the current directory
    (both ``msedgedriver.exe`` and ``msedgedriver``, the latter being the
    name used on macOS/Linux), then falls back to searching the system PATH.

    Returns:
        The absolute path of the first driver found, or ``None`` when no
        driver could be located (an error is logged in that case).
    """
    import shutil  # local import, same pattern as cleanup() in this file

    possible_paths = [
        # Default Windows install locations
        "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedgedriver.exe",
        "C:\\Program Files\\Microsoft\\Edge\\Application\\msedgedriver.exe",
        # Location a user may have installed to manually
        os.path.expanduser("~\\AppData\\Local\\Microsoft\\Edge\\Application\\msedgedriver.exe"),
        # Current directory (Windows name, then the macOS/Linux name)
        "msedgedriver.exe",
        "msedgedriver",
        # macOS location
        "/Applications/Microsoft Edge.app/Contents/MacOS/msedgedriver",
        # Linux locations
        "/usr/bin/msedgedriver",
        "/usr/local/bin/msedgedriver",
    ]
    for path in possible_paths:
        if os.path.isfile(path):
            # Return an absolute path so a driver in the current directory is
            # not looked up on PATH when the process is spawned.
            resolved = os.path.abspath(path)
            logger.info(f"找到Edge驱动: {resolved}")
            return resolved
    # The error message below promises PATH support, so honour it.
    on_path = shutil.which("msedgedriver")
    if on_path:
        logger.info(f"找到Edge驱动: {on_path}")
        return on_path
    logger.error("未找到Edge驱动请手动下载并放置在当前目录或系统PATH中")
    return None
def setup_desktop_driver():
    """Create and return an Edge WebDriver configured for desktop browsing.

    The driver gets a randomly chosen desktop user agent and a random
    window size. The driver binary comes from WebDriver Manager; if that
    fails, a locally installed driver found by ``find_edge_driver`` is used.
    A script overriding several ``navigator`` properties is executed once
    right after start-up.

    Returns:
        The initialised Edge WebDriver instance.

    Raises:
        WebDriverException: logged and re-raised when start-up fails.
        Exception: when no usable driver binary can be found.
    """
    edge_options = Options()
    for flag in (
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--start-maximized",  # maximise the window
        # automation-related switches
        "--disable-blink-features=AutomationControlled",
    ):
        edge_options.add_argument(flag)
    edge_options.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    edge_options.add_experimental_option('useAutomationExtension', False)
    # Random user agent
    desktop_user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36 Edg/96.0.1054.62",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.55",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36 Edg/98.0.1108.51",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36 Edg/99.0.1150.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36 Edg/100.0.1185.29",
    ]
    chosen_agent = random.choice(desktop_user_agents)
    edge_options.add_argument(f'--user-agent={chosen_agent}')
    # Random window size
    width = random.randint(1200, 1920)
    height = random.randint(800, 1080)
    edge_options.add_argument(f"--window-size={width},{height}")
    # The script text is kept exactly as in the original, including layout.
    navigator_script = """
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// 覆盖chrome对象
window.chrome = {
runtime: {},
// 等等
};
// 覆盖权限
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
// 覆盖语言
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh', 'en']
});
// 覆盖plugins
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
"""
    try:
        try:
            # First choice: let WebDriver Manager download the driver.
            service = Service(EdgeChromiumDriverManager().install())
            driver = webdriver.Edge(service=service, options=edge_options)
            logger.info("使用WebDriver Manager成功初始化Edge驱动")
        except Exception as e:
            logger.warning(f"WebDriver Manager初始化失败: {e}")
            logger.info("尝试使用本地Edge驱动...")
            # Fallback: look for a locally installed driver.
            driver_path = find_edge_driver()
            if not driver_path:
                raise Exception("无法找到可用的Edge驱动")
            service = Service(executable_path=driver_path)
            driver = webdriver.Edge(service=service, options=edge_options)
            logger.info("使用本地Edge驱动成功初始化")
        driver.execute_script(navigator_script)
        return driver
    except WebDriverException as e:
        logger.error(f"桌面版WebDriver初始化失败: {e}")
        raise
def setup_mobile_driver():
    """Create and return an Edge WebDriver that emulates a mobile device.

    One of three device entries is picked at random; only its user agent is
    used, together with fixed 375x812 metrics at pixel ratio 3.0 (the
    ``deviceName`` field is not passed to the browser). The driver binary
    comes from WebDriver Manager; if that fails, a locally installed driver
    found by ``find_edge_driver`` is used.

    Returns:
        The initialised Edge WebDriver instance.

    Raises:
        WebDriverException: logged and re-raised when start-up fails.
        Exception: when no usable driver binary can be found.
    """
    edge_options = Options()
    for flag in ("--disable-gpu", "--no-sandbox", "--disable-dev-shm-usage"):
        edge_options.add_argument(flag)
    edge_options.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    edge_options.add_experimental_option('useAutomationExtension', False)
    # Mobile device emulation settings
    mobile_devices = [
        {"deviceName": "iPhone X", "userAgent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Mobile/15E148 Safari/604.1 Edg/97.0.1072.55"},
        {"deviceName": "Galaxy S5", "userAgent": "Mozilla/5.0 (Linux; Android 5.0; SM-G900P Build/LRX21T) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Mobile Safari/537.36 Edg/97.0.1072.55"},
        {"deviceName": "Pixel 5", "userAgent": "Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.91 Mobile Safari/537.36 Edg/97.0.1072.55"},
    ]
    picked_device = random.choice(mobile_devices)
    edge_options.add_experimental_option(
        "mobileEmulation",
        {
            "deviceMetrics": {"width": 375, "height": 812, "pixelRatio": 3.0},
            "userAgent": picked_device["userAgent"],
        },
    )
    # The script text is kept exactly as in the original, including layout.
    navigator_script = """
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
"""
    try:
        try:
            # First choice: let WebDriver Manager download the driver.
            service = Service(EdgeChromiumDriverManager().install())
            driver = webdriver.Edge(service=service, options=edge_options)
            logger.info("使用WebDriver Manager成功初始化移动版Edge驱动")
        except Exception as e:
            logger.warning(f"WebDriver Manager初始化失败: {e}")
            logger.info("尝试使用本地Edge驱动...")
            # Fallback: look for a locally installed driver.
            driver_path = find_edge_driver()
            if not driver_path:
                raise Exception("无法找到可用的Edge驱动")
            service = Service(executable_path=driver_path)
            driver = webdriver.Edge(service=service, options=edge_options)
            logger.info("使用本地Edge驱动成功初始化移动版")
        driver.execute_script(navigator_script)
        return driver
    except WebDriverException as e:
        logger.error(f"移动版WebDriver初始化失败: {e}")
        raise
def save_cookies(driver, filename):
    """Pickle the driver's current cookies into ``filename``.

    Any failure is logged and swallowed; nothing is returned.
    """
    try:
        cookie_jar = driver.get_cookies()
        with open(filename, 'wb') as sink:
            pickle.dump(cookie_jar, sink)
    except Exception as e:
        logger.error(f"保存cookies失败: {e}")
    else:
        logger.info(f"Cookies已保存到 {filename}")
def load_cookies(driver, filename):
    """Load pickled cookies from ``filename`` and add them to the driver.

    Cookies the driver refuses are skipped with a warning.

    Returns:
        ``True`` when the file was read (even if individual cookies were
        rejected), ``False`` when the file is missing or unreadable.
    """
    try:
        # NOTE(review): pickle executes code on load; this is only safe while
        # the cookie file is written exclusively by save_cookies().
        with open(filename, 'rb') as source:
            stored_cookies = pickle.load(source)
    except FileNotFoundError:
        logger.warning(f"Cookie文件 {filename} 不存在")
        return False
    except Exception as e:
        logger.error(f"加载cookies失败: {e}")
        return False
    try:
        for entry in stored_cookies:
            try:
                driver.add_cookie(entry)
            except Exception as e:
                logger.warning(f"添加cookie失败: {e}")
        logger.info(f"已从 {filename} 加载cookies")
        return True
    except FileNotFoundError:
        logger.warning(f"Cookie文件 {filename} 不存在")
        return False
    except Exception as e:
        logger.error(f"加载cookies失败: {e}")
        return False
def microsoft_login(driver):
    """Sign in to a Microsoft account, reusing saved cookies when possible.

    Opens Bing, restores cookies from ``microsoft_cookies.pkl`` and checks
    the page's ``id_n`` element to decide whether the session is signed in.
    Otherwise opens ``https://login.live.com``, waits for the user to log in
    manually and press Enter in the console, then saves the cookies.

    Args:
        driver: The WebDriver instance to log in with.

    Returns:
        ``True`` when the login flow finished, ``False`` when an exception
        occurred (it is logged).
    """
    try:
        logger.info("尝试Microsoft账户登录...")
        cookie_file = "microsoft_cookies.pkl"
        # WebDriver only accepts cookies for the domain of the currently
        # loaded document, so a page must be open before restoring them.
        driver.get("https://www.bing.com")
        if load_cookies(driver, cookie_file):
            # Reload so the restored cookies take effect.
            driver.refresh()
            time.sleep(3)
            try:
                account_name = driver.find_element(By.ID, "id_n")
                # NOTE(review): presumably the element also exists, hidden or
                # empty, when signed out, so require visible text — confirm.
                if account_name.is_displayed() and account_name.text.strip():
                    logger.info("使用cookies登录成功")
                    return True
                logger.warning("cookies已失效需要重新登录")
            except NoSuchElementException:
                logger.warning("cookies已失效需要重新登录")
        # Manual login flow
        driver.get("https://login.live.com")
        time.sleep(3)
        # Block until the user confirms the manual login in the console.
        logger.info("请在浏览器中手动登录Microsoft账户完成后按回车键继续...")
        input()
        # Save cookies for the next run.
        save_cookies(driver, cookie_file)
        logger.info("Microsoft账户登录完成")
        return True
    except Exception as e:
        logger.error(f"Microsoft登录失败: {e}")
        return False
def human_like_delay(min_sec=1.0, max_sec=3.0):
    """Sleep for a random duration drawn uniformly between the two bounds."""
    pause_seconds = random.uniform(min_sec, max_sec)
    time.sleep(pause_seconds)
def simulate_human_typing(element, text):
    """Send ``text`` to ``element`` one character at a time.

    After every character the function sleeps for a random 0.05-0.2 s.
    """
    for index in range(len(text)):
        element.send_keys(text[index])
        keystroke_gap = random.uniform(0.05, 0.2)  # random typing interval
        time.sleep(keystroke_gap)
def simulate_mouse_movement(driver):
    """Perform a few random pointer moves and, sometimes, a click.

    Moves the pointer 2-5 times by random offsets of up to 100 px per axis,
    pausing 0.1-0.5 s after each move. With 30% probability it then moves
    to a random offset inside ``<body>`` and clicks. Any exception is logged
    as a warning and swallowed.
    """
    try:
        # NOTE(review): one ActionChains object is reused across perform()
        # calls; whether earlier queued moves are replayed depends on the
        # Selenium version — confirm.
        chain = ActionChains(driver)
        move_count = random.randint(2, 5)
        for _ in range(move_count):
            dx = random.randint(-100, 100)
            dy = random.randint(-100, 100)
            chain.move_by_offset(dx, dy)
            chain.perform()
            time.sleep(random.uniform(0.1, 0.5))
        # Occasionally click on the page (30% probability).
        if random.random() >= 0.3:
            return
        page_body = driver.find_element(By.TAG_NAME, "body")
        click_x = random.randint(10, 100)
        click_y = random.randint(10, 100)
        chain.move_to_element_with_offset(page_body, click_x, click_y)
        chain.click()
        chain.perform()
    except Exception as e:
        logger.warning(f"模拟鼠标移动失败: {e}")
def simulate_human_scroll(driver, is_mobile=False):
    """Scroll the page a few times by random distances.

    Mobile: 2-5 scrolls of 150-300 px; desktop: 1-4 scrolls of 100-500 px.
    Each scroll goes upwards with 30% probability. A scroll that would leave
    the range ``[0, page height)`` jumps to the top or bottom instead and
    ends the sequence. Every in-range scroll is followed by a 0.5-2 s pause.
    Any exception is logged and swallowed.
    """
    try:
        # Total page height as reported by the document body.
        page_height = driver.execute_script("return document.body.scrollHeight")
        if is_mobile:
            scroll_times, distance_bounds = random.randint(2, 5), (150, 300)
        else:
            scroll_times, distance_bounds = random.randint(1, 4), (100, 500)
        position = 0
        for _ in range(scroll_times):
            step = random.randint(*distance_bounds)
            if random.random() < 0.3:  # 30% chance to scroll upwards
                step = -step
            target = position + step
            if not (0 <= target < page_height):
                # Out of range: snap to the top or bottom and stop.
                edge = 0 if target < 0 else page_height
                driver.execute_script("window.scrollTo(0, arguments[0]);", edge)
                break
            driver.execute_script(f"window.scrollBy(0, {step});")
            position = target
            # Random pause imitating reading time.
            human_like_delay(0.5, 2.0)
    except Exception as e:
        logger.error(f"滚动时发生错误: {e}")
def simulate_search_result_interaction(driver, is_mobile=False):
    """Optionally open one of the first five search results and come back.

    Waits up to 10 s for result links, then with 70% probability clicks a
    random one of the first five, stays 3-8 s and returns to the results.
    If the click opened new windows/tabs they are closed and focus returns
    to the original window; otherwise the browser navigates back and waits
    for the search box.

    This step is best-effort: every WebDriver error (timeout, stale or
    intercepted element, ...) is logged as a warning instead of propagating,
    so a failed click does not make the surrounding search count as failed.
    """
    try:
        # Mobile result pages may use different markup, so try more selectors.
        selector = "h2 a, .b_algo h2 a, .b_title a" if is_mobile else "h2 a"
        results = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
        )
        # Click only some of the time (70% probability).
        if not results or random.random() >= 0.7:
            return
        result_to_click = random.choice(results[:5])  # first five results only
        logger.info(f"点击搜索结果: {result_to_click.text[:50]}...")
        origin_handle = driver.current_window_handle
        known_handles = set(driver.window_handles)
        result_to_click.click()
        human_like_delay(3, 8)  # stay on the opened page for a while
        opened_handles = [h for h in driver.window_handles if h not in known_handles]
        if opened_handles:
            # The result opened in a new window/tab: back() would not help.
            try:
                for handle in opened_handles:
                    driver.switch_to.window(handle)
                    driver.close()
            finally:
                driver.switch_to.window(origin_handle)
        else:
            driver.back()  # return to the results page
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, "sb_form_q"))
            )
    except WebDriverException as e:
        # WebDriverException is the common base of Selenium's errors,
        # including TimeoutException and NoSuchElementException.
        logger.warning(f"搜索结果交互失败: {e}")
def take_screenshot(driver, filename):
    """Save a screenshot of the current window to ``filename``.

    ``save_screenshot`` reports an I/O failure by returning ``False`` rather
    than raising, so its result is checked before logging success. Failures
    of either kind are logged and swallowed.
    """
    try:
        if driver.save_screenshot(filename):
            logger.info(f"截图已保存: {filename}")
        else:
            logger.error(f"截图失败: {filename}")
    except Exception as e:
        logger.error(f"截图失败: {e}")
def bing_search(driver, query, is_mobile=False):
    """Run one Bing search for ``query`` and browse the result page.

    Opens bing.com, types the query into the search box character by
    character, submits it, waits for results, then scrolls and optionally
    opens a result.

    Returns:
        ``True`` when all steps completed, ``False`` when any exception
        occurred (it is logged).
    """
    platform_label = '移动端' if is_mobile else '电脑端'
    try:
        logger.info(f"正在{platform_label}搜索: {query}")
        driver.get("https://www.bing.com")
        simulate_mouse_movement(driver)
        if is_mobile:
            # On mobile the search box may only appear after tapping an icon.
            icon_locator = (By.CSS_SELECTOR, ".search.icon, .scopebar_icon")
            try:
                search_icon = WebDriverWait(driver, 5).until(
                    EC.element_to_be_clickable(icon_locator)
                )
                search_icon.click()
                human_like_delay(0.5, 1.5)
            except TimeoutException:
                logger.info("移动端搜索图标未找到,直接尝试搜索框")
        # Wait for the search box.
        box_locator = (By.ID, "sb_form_q")
        search_box = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(box_locator)
        )
        # Clear the box, type the query and submit it.
        search_box.clear()
        human_like_delay(0.5, 1.5)
        simulate_human_typing(search_box, query)
        human_like_delay(0.5, 1.5)
        search_box.send_keys(Keys.RETURN)
        # Wait for the results to show up.
        results_locator = (By.CSS_SELECTOR, "h2 a, .b_algo, .b_title")
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(results_locator)
        )
        simulate_human_scroll(driver, is_mobile)
        simulate_search_result_interaction(driver, is_mobile)
        # Extra pause standing in for browsing time.
        human_like_delay(2, 6)
    except Exception as e:
        logger.error(f"搜索 '{query}' 时发生错误: {e}")
        return False
    return True
def run_desktop_searches():
    """Run the desktop search session (40 searches).

    Starts a desktop driver, performs the Microsoft login, merges the preset
    and trending keywords in random order and searches them one by one,
    cycling through the list if needed. After a successful search a
    screenshot is taken with 20% probability; after a failed one the
    function waits 5-10 s. Every fifth search is followed by a 10-20 s
    break. Cookies are saved and the browser is closed at the end.

    Returns:
        The number of successful searches, also when an exception aborted
        the session (it is logged).
    """
    driver = None
    successful_searches = 0
    target_searches = 40
    try:
        driver = setup_desktop_driver()
        logger.info("桌面端浏览器启动成功,开始执行搜索任务")
        microsoft_login(driver)
        # Preset keywords first, trending ones appended, then shuffled.
        all_keywords = KEYWORDS + get_trending_keywords()
        random.shuffle(all_keywords)
        for round_no in range(1, target_searches + 1):
            # Wrap around when there are fewer keywords than searches.
            keyword = all_keywords[(round_no - 1) % len(all_keywords)]
            logger.info(f"执行第 {round_no} 次桌面端搜索")
            if not bing_search(driver, keyword, is_mobile=False):
                # Failed search: wait a little before continuing.
                human_like_delay(5, 10)
            else:
                successful_searches += 1
                if random.random() < 0.2:  # screenshot with 20% probability
                    screenshot_dir = "screenshots"
                    if not os.path.exists(screenshot_dir):
                        os.makedirs(screenshot_dir)
                    time_tag = datetime.now().strftime('%H%M%S')
                    screenshot_file = os.path.join(screenshot_dir, f"desktop_{round_no}_{time_tag}.png")
                    take_screenshot(driver, screenshot_file)
            if round_no % 5 == 0:
                # Short break after every fifth search.
                logger.info(f"已完成 {round_no} 次桌面端搜索,休息一下...")
                human_like_delay(10, 20)
        logger.info(f"桌面端搜索任务完成! 成功执行了 {successful_searches}/{target_searches} 次搜索")
        return successful_searches
    except Exception as e:
        logger.error(f"桌面端搜索执行过程中发生错误: {e}")
        return successful_searches
    finally:
        if driver:
            # Persist cookies before shutting the browser down.
            save_cookies(driver, "microsoft_cookies.pkl")
            driver.quit()
            logger.info("桌面端浏览器已关闭")
def run_mobile_searches():
    """Run the mobile search session (30 searches).

    Starts a mobile-emulating driver, performs the Microsoft login, merges
    the preset and trending keywords in random order and searches them one
    by one, cycling through the list if needed. After a successful search a
    screenshot is taken with 20% probability; after a failed one the
    function waits 5-10 s. Every fifth search is followed by a 10-20 s
    break. Cookies are saved and the browser is closed at the end.

    Returns:
        The number of successful searches, also when an exception aborted
        the session (it is logged).
    """
    driver = None
    successful_searches = 0
    target_searches = 30
    try:
        driver = setup_mobile_driver()
        logger.info("移动端浏览器启动成功,开始执行搜索任务")
        microsoft_login(driver)
        # Preset keywords first, trending ones appended, then shuffled.
        all_keywords = KEYWORDS + get_trending_keywords()
        random.shuffle(all_keywords)
        for round_no in range(1, target_searches + 1):
            # Wrap around when there are fewer keywords than searches.
            keyword = all_keywords[(round_no - 1) % len(all_keywords)]
            logger.info(f"执行第 {round_no} 次移动端搜索")
            if not bing_search(driver, keyword, is_mobile=True):
                # Failed search: wait a little before continuing.
                human_like_delay(5, 10)
            else:
                successful_searches += 1
                if random.random() < 0.2:  # screenshot with 20% probability
                    screenshot_dir = "screenshots"
                    if not os.path.exists(screenshot_dir):
                        os.makedirs(screenshot_dir)
                    time_tag = datetime.now().strftime('%H%M%S')
                    screenshot_file = os.path.join(screenshot_dir, f"mobile_{round_no}_{time_tag}.png")
                    take_screenshot(driver, screenshot_file)
            if round_no % 5 == 0:
                # Short break after every fifth search.
                logger.info(f"已完成 {round_no} 次移动端搜索,休息一下...")
                human_like_delay(10, 20)
        logger.info(f"移动端搜索任务完成! 成功执行了 {successful_searches}/{target_searches} 次搜索")
        return successful_searches
    except Exception as e:
        logger.error(f"移动端搜索执行过程中发生错误: {e}")
        return successful_searches
    finally:
        if driver:
            # Persist cookies before shutting the browser down.
            save_cookies(driver, "microsoft_cookies.pkl")
            driver.quit()
            logger.info("移动端浏览器已关闭")
def create_debug_package(timestamp):
    """Bundle this run's artefacts into ``debug/debug_<timestamp>.zip``.

    The archive contains the log files whose names start with
    ``bing_search_<timestamp>``, the cookie file if present, and every file
    in the ``screenshots`` directory if it exists.

    Args:
        timestamp: Run timestamp used in the log and archive file names.

    Returns:
        The path of the created zip file.
    """
    debug_dir = "debug"
    if not os.path.exists(debug_dir):
        os.makedirs(debug_dir)
    zip_filename = os.path.join(debug_dir, f"debug_{timestamp}.zip")
    log_prefix = f"bing_search_{timestamp}"
    cookie_file = "microsoft_cookies.pkl"
    shots_dir = "screenshots"
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as archive:
        # Log files of this run, stored at the archive root.
        for log_name in os.listdir("logs"):
            if log_name.startswith(log_prefix):
                archive.write(os.path.join("logs", log_name), log_name)
        # NOTE(review): the cookie file holds live session cookies; anyone
        # receiving this archive could reuse them — confirm this is intended.
        if os.path.exists(cookie_file):
            archive.write(cookie_file, cookie_file)
        # Screenshots keep their directory prefix inside the archive.
        if os.path.exists(shots_dir):
            for shot_name in os.listdir(shots_dir):
                shot_path = os.path.join(shots_dir, shot_name)
                archive.write(shot_path, shot_path)
    logger.info(f"调试包已创建: {zip_filename}")
    return zip_filename
def cleanup():
    """Delete the cookie file and the screenshots directory, if present.

    Any failure is logged and swallowed.
    """
    cookie_file = "microsoft_cookies.pkl"
    shots_dir = "screenshots"
    try:
        if os.path.exists(cookie_file):
            os.remove(cookie_file)
        if os.path.exists(shots_dir):
            import shutil
            shutil.rmtree(shots_dir)
    except Exception as e:
        logger.error(f"清理临时文件失败: {e}")
    else:
        logger.info("临时文件清理完成")
def download_edge_driver():
    """Download and unpack the Edge WebDriver into the current directory.

    The installed Edge version is read from the registry on Windows or from
    ``microsoft-edge --version`` elsewhere; when that fails a default
    version is used. The matching archive for the current platform is
    downloaded, extracted into the current directory and, on non-Windows
    systems, the driver is made executable. The downloaded zip is removed
    afterwards, also when extraction fails.

    Returns:
        The driver file name (``msedgedriver.exe`` or ``msedgedriver``) on
        success, ``None`` when the download or any other step failed (the
        failure is logged).
    """
    logger.info("尝试手动下载Edge驱动...")
    default_version = "120.0.2210.91"  # used when detection fails
    zip_path = "edgedriver.zip"
    try:
        # Detect the installed Edge browser version.
        version = None
        if sys.platform == "win32":
            import winreg
            # Per-user install first, then machine-wide.
            for hive in (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE):
                try:
                    with winreg.OpenKey(hive, r"Software\Microsoft\Edge\BLBeacon") as key:
                        version, _ = winreg.QueryValueEx(key, "version")
                    break
                except OSError:
                    continue
        else:
            # macOS or Linux
            try:
                result = subprocess.run(
                    ["microsoft-edge", "--version"],
                    capture_output=True,
                    text=True,
                    timeout=10,  # never hang on a misbehaving binary
                )
                version = result.stdout.strip().split()[-1]
            except (OSError, subprocess.SubprocessError, IndexError):
                # Binary missing, timed out, or printed nothing.
                version = None
        if version:
            logger.info(f"检测到Edge浏览器版本: {version}")
        else:
            logger.warning("无法获取Edge浏览器版本使用默认版本")
            version = default_version
        # Build the download URL for this platform.
        base_url = f"https://msedgedriver.azureedge.net/{version}/edgedriver_"
        if sys.platform == "win32":
            download_url = base_url + "win64.zip"
            driver_name = "msedgedriver.exe"
        elif sys.platform == "darwin":
            if "arm" in os.uname().machine:
                download_url = base_url + "mac64_m1.zip"
            else:
                download_url = base_url + "mac64.zip"
            driver_name = "msedgedriver"
        else:
            download_url = base_url + "linux64.zip"
            driver_name = "msedgedriver"
        logger.info(f"下载URL: {download_url}")
        try:
            # (connect, read) timeouts so a stalled server cannot block forever.
            with requests.get(download_url, stream=True, timeout=(10, 60)) as response:
                if response.status_code != 200:
                    logger.error(f"下载失败,状态码: {response.status_code}")
                    return None
                with open(zip_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            # Unpack into the current directory.
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(".")
        finally:
            # Remove the archive whether or not download/extraction succeeded.
            if os.path.exists(zip_path):
                os.remove(zip_path)
        # Make the driver executable on non-Windows systems.
        if sys.platform != "win32":
            os.chmod(driver_name, 0o755)
        logger.info("Edge驱动下载并解压成功")
        return driver_name
    except Exception as e:
        logger.error(f"下载Edge驱动失败: {e}")
        return None
def main():
    """Run the desktop and mobile search sessions and package the results.

    Makes sure an Edge driver is available (downloading one if none is
    found, and aborting when that fails), runs the desktop session, waits
    30-60 s, runs the mobile session, then creates the debug package and
    removes temporary files. If anything raises, the error is logged and
    the debug package and cleanup are still attempted.
    """
    total_successful = 0
    # Make sure an Edge driver exists before starting.
    if not find_edge_driver():
        logger.warning("未找到Edge驱动尝试下载...")
        if not download_edge_driver():
            logger.error("无法下载Edge驱动请手动下载并放置在当前目录")
            return
    try:
        # Desktop session
        desktop_success = run_desktop_searches()
        total_successful += desktop_success
        # Longer pause between the desktop and the mobile session.
        logger.info("桌面端搜索完成,等待一段时间后开始移动端搜索...")
        time.sleep(random.uniform(30, 60))
        # Mobile session
        mobile_success = run_mobile_searches()
        total_successful += mobile_success
        logger.info(f"所有搜索任务完成! 总共成功执行了 {total_successful}/70 次搜索")
        debug_zip = create_debug_package(timestamp)
        cleanup()
        logger.info(f"程序执行完毕,调试包保存在: {debug_zip}")
    except Exception as e:
        logger.error(f"程序执行过程中发生错误: {e}")
        # Still try to produce a debug package after a failure.
        try:
            debug_zip = create_debug_package(timestamp)
            logger.info(f"已创建调试包: {debug_zip}")
        except Exception as package_error:
            # Narrower than a bare except: lets KeyboardInterrupt/SystemExit
            # through and records why packaging failed.
            logger.error(f"创建调试包失败: {package_error}")
        cleanup()


if __name__ == "__main__":
    main()