mirror of
https://github.com/smallfawn/QLScriptPublic.git
synced 2025-12-17 15:25:10 +08:00
641 lines
24 KiB
Python
641 lines
24 KiB
Python
"""
|
||
北京现代 APP 自动任务脚本
|
||
功能:自动完成签到、浏览文章、每日答题等任务
|
||
new Env("北京现代")
|
||
cron: 25 6 * * *
|
||
|
||
环境变量:
|
||
BJXD_DEVICE 安卓写android 苹果IOS写iOS
|
||
BJXD: str - 北京现代 APP api token (多个账号用英文逗号分隔,建议每个账号一个变量)
|
||
BJXD1/BJXD2/BJXD3: str - 北京现代 APP api token (每个账号一个变量)
|
||
BJXD_ANSWER: str - 预设答案 (可选, ABCD 中的一个)
|
||
HUNYUAN_API_KEY: str - 腾讯混元AI APIKey (可选)
|
||
|
||
"""
|
||
|
||
import os
|
||
import random
|
||
import time
|
||
from datetime import datetime
|
||
from typing import List, Dict, Any
|
||
import requests
|
||
from urllib3.exceptions import InsecureRequestWarning, InsecurePlatformWarning
|
||
|
||
# 禁用 SSL 警告
|
||
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||
requests.packages.urllib3.disable_warnings(InsecurePlatformWarning)
|
||
|
||
|
||
class BeiJingHyundai:
|
||
"""北京现代APP自动任务类"""
|
||
|
||
# 基础配置
|
||
NAME = "北京现代 APP 自动任务"
|
||
BASE_URL = "https://bm2-api.bluemembers.com.cn"
|
||
|
||
# API endpoints
|
||
API_USER_INFO = "/v1/app/account/users/info"
|
||
API_MY_SCORE = "/v1/app/user/my_score"
|
||
API_TASK_LIST = "/v1/app/user/task/list"
|
||
API_SIGN_LIST = "/v1/app/user/reward_list"
|
||
API_SIGN_SUBMIT = "/v1/app/user/reward_report"
|
||
API_ARTICLE_LIST = "/v1/app/white/article/list2"
|
||
API_ARTICLE_DETAIL = "/v1/app/white/article/detail_app/{}"
|
||
API_ARTICLE_SCORE_SUBMIT = "/v1/app/score"
|
||
API_QUESTION_INFO = "/v1/app/special/daily/ask_info"
|
||
API_QUESTION_SUBMIT = "/v1/app/special/daily/ask_answer"
|
||
|
||
# 预设的备用 share_user_hid 列表
|
||
BACKUP_HIDS = [
|
||
"a6688ec1a9ee429fa7b68d50e0c92b1f",
|
||
"bb8cd2e44c7b45eeb8cc5f7fa71c3322",
|
||
"5f640c50061b400c91be326c8fe0accd",
|
||
"55a5d82dacd9417483ae369de9d9b82d",
|
||
]
|
||
|
||
def __init__(self):
|
||
"""初始化实例变量"""
|
||
self.token: str = "" # 当前用户token
|
||
self.user: Dict[str, Any] = {} # 当前用户信息
|
||
self.users: List[Dict[str, Any]] = [] # 所有用户信息列表
|
||
self.correct_answer: str = "" # 正确答案
|
||
self.preset_answer: str = "" # 预设答案
|
||
self.ai_api_key: str = "" # 腾讯混元AI APIKey
|
||
self.wrong_answers: set = set() # 错误答案集合
|
||
self.log_content: str = "" # 日志内容
|
||
|
||
def log(self, content: str, print_to_console: bool = True) -> None:
|
||
"""添加日志"""
|
||
if print_to_console:
|
||
print(content)
|
||
self.log_content += content + "\n"
|
||
|
||
def push_notification(self) -> None:
|
||
"""推送通知"""
|
||
try:
|
||
QLAPI.notify(self.NAME, self.log_content)
|
||
except NameError:
|
||
print(f"\n\n🚀 推送通知\n\n{self.NAME}\n\n{self.log_content}")
|
||
|
||
def make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
|
||
"""
|
||
发送API请求
|
||
Args:
|
||
method: 请求方法 (GET/POST)
|
||
endpoint: API端点
|
||
**kwargs: 请求参数
|
||
Returns:
|
||
Dict[str, Any]: API响应数据
|
||
"""
|
||
url = f"{self.BASE_URL}{endpoint}"
|
||
headers = {"token": self.token, "device": os.getenv("BJXD_DEVICE", "android")}
|
||
if "headers" not in kwargs:
|
||
kwargs["headers"] = headers
|
||
else:
|
||
kwargs["headers"].update(headers)
|
||
try:
|
||
response = requests.request(method, url, **kwargs)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
except requests.exceptions.RequestException as e:
|
||
self.log(f"❌ API request failed: {str(e)}")
|
||
return {"code": -1, "msg": str(e)}
|
||
|
||
def get_user_info(self) -> Dict[str, Any]:
|
||
"""
|
||
获取用户信息
|
||
Returns:
|
||
Dict[str, Any]: 用户信息字典,获取失败返回空字典
|
||
"""
|
||
response = self.make_request("GET", self.API_USER_INFO)
|
||
print(f"get_user_info API response ——> {response}")
|
||
|
||
if response["code"] == 0:
|
||
data = response["data"]
|
||
# 直接生成掩码后的手机号
|
||
masked_phone = f"{data['phone'][:3]}******{data['phone'][-2:]}"
|
||
return {
|
||
"token": self.token,
|
||
"hid": data["hid"],
|
||
"nickname": data["nickname"],
|
||
"phone": masked_phone, # 直接存储掩码后的手机号
|
||
"score_value": data["score_value"],
|
||
"share_user_hid": "",
|
||
"task": {"sign": False, "view": False, "question": False},
|
||
}
|
||
|
||
self.log(f"❌ 账号已失效, 请重新获取 token: {self.token}")
|
||
return {}
|
||
|
||
def get_score_details(self) -> None:
|
||
"""显示积分详情,包括总积分、今日变动和最近记录"""
|
||
params = {"page_no": "1", "page_size": "10"} # 获取最近10条记录
|
||
response = self.make_request("GET", self.API_MY_SCORE, params=params)
|
||
print(f"get_score_details API response ——> {response}")
|
||
|
||
if response["code"] == 0:
|
||
data = response["data"]
|
||
# 先获取今日记录
|
||
today = datetime.now().strftime("%Y-%m-%d")
|
||
today_records = [
|
||
record
|
||
for record in data["points_record"]["list"]
|
||
if record["created_at"].startswith(today)
|
||
]
|
||
|
||
# 计算今日积分变化
|
||
today_score = sum(
|
||
int(record["score_str"].strip("+")) for record in today_records
|
||
)
|
||
today_score_str = f"+{today_score}" if today_score > 0 else str(today_score)
|
||
self.log(f"🎉 总积分: {data['score']} | 今日积分变动: {today_score_str}")
|
||
|
||
# 输出今日积分记录
|
||
if today_records:
|
||
self.log("今日积分记录:")
|
||
for record in today_records:
|
||
self.log(
|
||
f"{record['created_at']} {record['desc']} {record['score_str']}"
|
||
)
|
||
else:
|
||
self.log("今日暂无积分变动")
|
||
|
||
# 任务相关
|
||
def check_task_status(self, user: Dict[str, Any]) -> None:
|
||
"""检查任务状态"""
|
||
response = self.make_request("GET", self.API_TASK_LIST)
|
||
print(f"get_task_status API response ——> {response}")
|
||
|
||
if response["code"] != 0:
|
||
self.log(f'❌ 获取任务列表失败: {response["msg"]}')
|
||
return
|
||
|
||
actions = response.get("data", {})
|
||
|
||
# 检查签到任务
|
||
if "action4" in actions:
|
||
user["task"]["sign"] = actions["action4"].get("status") == 1
|
||
else:
|
||
self.log("❌ task list action4 签到任务 不存在")
|
||
|
||
# 检查浏览文章任务
|
||
if "action12" in actions:
|
||
user["task"]["view"] = actions["action12"].get("status") == 1
|
||
else:
|
||
self.log("❌ task list action12 浏览文章任务 不存在")
|
||
|
||
# 检查答题任务
|
||
if "action39" in actions:
|
||
user["task"]["question"] = actions["action39"].get("status") == 1
|
||
else:
|
||
self.log("❌ task list action39 答题任务 不存在")
|
||
|
||
# 签到相关
|
||
def get_sign_info(self) -> None:
|
||
"""执行签到任务"""
|
||
max_attempts = 5 # 最大尝试次数
|
||
best_score = 0
|
||
best_params = None
|
||
|
||
for attempt in range(max_attempts):
|
||
response = self.make_request("GET", self.API_SIGN_LIST)
|
||
print(f"get_sign_info (attempt {attempt + 1}) API response ——> {response}")
|
||
|
||
if response["code"] != 0:
|
||
self.log(f'❌ 获取签到列表失败: {response["msg"]}')
|
||
break
|
||
|
||
data = response["data"]
|
||
hid = data["hid"]
|
||
reward_hash = data["rewardHash"]
|
||
|
||
for item in data["list"]:
|
||
if item["hid"] == hid:
|
||
current_score = item["score"]
|
||
print(
|
||
f"第{attempt + 1}次获取签到列表: score={current_score} hid={hid} rewardHash={reward_hash}"
|
||
)
|
||
|
||
if current_score > best_score:
|
||
best_score = current_score
|
||
best_params = (hid, reward_hash, current_score)
|
||
print(f"当前可获得签到积分: {best_score}")
|
||
break
|
||
|
||
if attempt < max_attempts - 1: # 不是最后一次循环
|
||
print(f"继续尝试获取更高积分, 延时5-10s")
|
||
time.sleep(random.randint(5, 10))
|
||
else: # 最后一次循环 即将提交签到
|
||
print(f"即将提交签到, 延时3-4s")
|
||
time.sleep(random.randint(3, 4))
|
||
|
||
if best_params:
|
||
self.submit_sign(*best_params)
|
||
else:
|
||
self.log("❌ 未能获取到有效的签到参数")
|
||
|
||
def submit_sign(self, hid: str, reward_hash: str, score: int) -> None:
|
||
"""提交签到"""
|
||
json_data = {
|
||
"hid": hid,
|
||
"hash": reward_hash,
|
||
"sm_deviceId": "",
|
||
"ctu_token": None,
|
||
}
|
||
response = self.make_request("POST", self.API_SIGN_SUBMIT, json=json_data)
|
||
print(f"submit_sign API response ——> {response}")
|
||
|
||
if response["code"] == 0:
|
||
self.log(f"✅ 签到成功 | 积分 +{score}")
|
||
else:
|
||
self.log(f'❌ 签到失败: {response["msg"]}')
|
||
|
||
# 文章浏览相关
|
||
def get_article_list(self) -> List[str]:
|
||
"""获取文章列表"""
|
||
params = {
|
||
"page_no": "1",
|
||
"page_size": "20",
|
||
"type_hid": "",
|
||
}
|
||
response = self.make_request("GET", self.API_ARTICLE_LIST, params=params)
|
||
print(f"get_article_list API response ——> {response}")
|
||
|
||
if response["code"] == 0:
|
||
# 从文章列表中随机选择3个ID
|
||
article_list = [item["data_id"] for item in response["data"]["list"]]
|
||
return random.sample(article_list, min(3, len(article_list)))
|
||
|
||
self.log(f'❌ 获取文章列表失败: {response["msg"]}')
|
||
return []
|
||
|
||
def get_article_detail(self, article_id: str) -> None:
|
||
"""浏览文章"""
|
||
self.log(f"浏览文章 article_id: {article_id}")
|
||
endpoint = self.API_ARTICLE_DETAIL.format(article_id)
|
||
self.make_request("GET", endpoint)
|
||
|
||
def submit_article_score(self) -> None:
|
||
"""提交文章积分"""
|
||
json_data = {
|
||
"ctu_token": "",
|
||
"action": 12,
|
||
}
|
||
response = self.make_request(
|
||
"POST", self.API_ARTICLE_SCORE_SUBMIT, json=json_data
|
||
)
|
||
print(f"submit_article_score API response ——> {response}")
|
||
|
||
if response["code"] == 0:
|
||
score = response["data"]["score"]
|
||
self.log(f"✅ 浏览文章成功 | 积分 +{score}")
|
||
else:
|
||
self.log(f'❌ 浏览文章失败: {response["msg"]}')
|
||
|
||
# 答题相关
|
||
def get_question_info(self, share_user_hid: str) -> None:
|
||
"""执行答题任务"""
|
||
params = {"date": datetime.now().strftime("%Y%m%d")}
|
||
response = self.make_request("GET", self.API_QUESTION_INFO, params=params)
|
||
print(f"get_question_info API response ——> {response}")
|
||
if response["code"] != 0:
|
||
self.log(f'❌ 获取问题失败: {response["msg"]}')
|
||
return
|
||
# response['data']['state'] 1=表示未答题 2=已答题且正确 3=答错且未有人帮忙答题 4=答错但有人帮忙答题
|
||
if response["data"].get("state") == 3:
|
||
self.log("今日已答题但回答错误,当前无人帮助答题,跳过")
|
||
return
|
||
if response["data"].get("state") != 1:
|
||
if response["data"].get("answer"):
|
||
answer = response["data"]["answer"][0]
|
||
if answer in ["A", "B", "C", "D"]:
|
||
self.correct_answer = answer
|
||
self.log(f"今日已答题,跳过,答案:{answer}")
|
||
return
|
||
self.log("今日已答题,但未获取到答案,跳过")
|
||
return
|
||
|
||
question_info = response["data"]["question_info"]
|
||
questions_hid = question_info["questions_hid"]
|
||
|
||
# 构建问题字符串,只包含未被标记为错误的选项
|
||
question_str = f"{question_info['content']}\n"
|
||
valid_options = []
|
||
for option in question_info["option"]:
|
||
if option["option"] not in self.wrong_answers:
|
||
valid_options.append(option)
|
||
question_str += f'{option["option"]}. {option["option_content"]}\n'
|
||
else:
|
||
print(f"跳过错误选项 {option['option']}. {option['option_content']}")
|
||
|
||
print(f"\n问题详情:\n{question_str}")
|
||
|
||
# 如果只剩一个选项,直接使用
|
||
if len(valid_options) == 1:
|
||
answer = valid_options[0]["option"]
|
||
self.log(f"仅剩一个选项,使用答案: {answer}")
|
||
time.sleep(random.randint(3, 5))
|
||
self.submit_question_answer(questions_hid, answer, share_user_hid)
|
||
return
|
||
|
||
# 获取答案并提交
|
||
answer = self.get_question_answer(question_str)
|
||
|
||
time.sleep(random.randint(3, 5))
|
||
self.submit_question_answer(questions_hid, answer, share_user_hid)
|
||
|
||
def get_ai_answer(self, question: str) -> str:
|
||
"""获取AI答案"""
|
||
headers = {
|
||
"Authorization": f"Bearer {self.ai_api_key}",
|
||
"Content-Type": "application/json",
|
||
}
|
||
prompt = f"你是一个专业的北京现代汽车专家,请直接给出这个单选题的答案,并且不要带'答案'等其他内容。\n{question}"
|
||
json_data = {
|
||
"model": "hunyuan-turbo",
|
||
"messages": [{"role": "user", "content": prompt}],
|
||
"enable_enhancement": True,
|
||
"force_search_enhancement": True,
|
||
"enable_instruction_search": True,
|
||
}
|
||
|
||
try:
|
||
response = requests.post(
|
||
"https://api.hunyuan.cloud.tencent.com/v1/chat/completions",
|
||
headers=headers,
|
||
json=json_data,
|
||
)
|
||
response.raise_for_status()
|
||
response_json = response.json()
|
||
print(f"腾讯混元AI API response ——> {response_json}")
|
||
|
||
# 获取AI回答内容并转大写
|
||
ai_response = response_json["choices"][0]["message"]["content"].upper()
|
||
|
||
# 使用集合操作找出有效答案
|
||
valid_answers = set("ABCD") - self.wrong_answers
|
||
found_answers = set(ai_response) & valid_answers
|
||
|
||
# 如果找到答案则返回其中一个
|
||
if found_answers:
|
||
return found_answers.pop()
|
||
else:
|
||
print(f"❌ 没有找到符合的 AI 答案")
|
||
return ""
|
||
|
||
except Exception as e:
|
||
print(f"腾讯混元AI API 请求失败: {str(e)}")
|
||
|
||
return ""
|
||
|
||
def get_question_answer(self, question: str) -> str:
|
||
"""获取答题答案"""
|
||
# 1. 存在正确答案时,使用正确答案
|
||
if self.correct_answer:
|
||
self.log(f"使用历史正确答案: {self.correct_answer}")
|
||
return self.correct_answer
|
||
|
||
# 2. 存在预设答案时,使用预设答案
|
||
if self.preset_answer:
|
||
self.log(f"使用预设答案: {self.preset_answer}")
|
||
return self.preset_answer
|
||
|
||
# 3. 存在AI APIKey时,使用AI答案
|
||
if self.ai_api_key:
|
||
ai_answer = self.get_ai_answer(question)
|
||
if ai_answer:
|
||
self.log(f"使用AI答案: {ai_answer}")
|
||
return ai_answer
|
||
|
||
# 4. 随机选择答案(排除错误答案)
|
||
answer = self.get_random_answer()
|
||
self.log(f"随机答题,答案: {answer}")
|
||
return answer
|
||
|
||
def get_random_answer(self) -> str:
|
||
"""获取随机答案,排除已知错误答案"""
|
||
available_answers = set(["A", "B", "C", "D"]) - self.wrong_answers
|
||
if not available_answers:
|
||
self.wrong_answers.clear()
|
||
available_answers = set(["A", "B", "C", "D"])
|
||
return random.choice(list(available_answers))
|
||
|
||
def get_answered_question(self) -> None:
|
||
"""从已答题账号获取答案"""
|
||
params = {"date": datetime.now().strftime("%Y%m%d")}
|
||
response = self.make_request("GET", self.API_QUESTION_INFO, params=params)
|
||
print(f"get_answered_question API response ——> {response}")
|
||
if response["code"] != 0:
|
||
self.log(f'❌ 从已答题账号获取问题失败: {response["msg"]}')
|
||
return
|
||
# response['data']['state'] 1=表示未答题 2=已答题且正确 4=已答题但错误
|
||
if response["code"] == 0 and response["data"].get("answer"):
|
||
answer = response["data"]["answer"][0]
|
||
if answer in ["A", "B", "C", "D"]:
|
||
self.correct_answer = answer
|
||
self.log(f"从已答题账号获取到答案:{answer}")
|
||
return
|
||
self.log("从已答题账号获取答案失败")
|
||
|
||
def submit_question_answer(
|
||
self, question_id: str, answer: str, share_user_hid: str
|
||
) -> None:
|
||
"""提交答题答案"""
|
||
json_data = {
|
||
"answer": answer,
|
||
"questions_hid": question_id,
|
||
"ctu_token": "",
|
||
}
|
||
if share_user_hid:
|
||
json_data["date"] = datetime.now().strftime("%Y%m%d")
|
||
json_data["share_user_hid"] = share_user_hid
|
||
|
||
response = self.make_request("POST", self.API_QUESTION_SUBMIT, json=json_data)
|
||
print(f"submit_question_answer API response ——> {response}")
|
||
|
||
if response["code"] == 0:
|
||
data = response["data"]
|
||
if data["state"] == 3: # 答错
|
||
# 记录错误答案
|
||
self.wrong_answers.add(answer)
|
||
# 如果是正确答案,清除它
|
||
if self.correct_answer == answer:
|
||
self.correct_answer = ""
|
||
# 如果是预设答案,清除它
|
||
if self.preset_answer == answer:
|
||
self.preset_answer = ""
|
||
self.log("❌ 答题错误")
|
||
elif data["state"] == 2: # 答对了
|
||
if self.correct_answer != answer:
|
||
self.correct_answer = answer
|
||
score = data["answer_score"]
|
||
self.log(f"✅ 答题正确 | 积分 +{score}")
|
||
else:
|
||
self.log(f'❌ 答题失败: {response["msg"]}')
|
||
|
||
def get_backup_share_hid(self, user_hid: str) -> str:
|
||
"""从备用 hid 列表中获取一个不同于用户自身的 hid"""
|
||
available_hids = [hid for hid in self.BACKUP_HIDS if hid != user_hid]
|
||
return random.choice(available_hids) if available_hids else ""
|
||
|
||
def run(self) -> None:
|
||
"""运行主程序"""
|
||
# 使用列表保持顺序,使用集合实现去重
|
||
tokens = []
|
||
tokens_set = set()
|
||
|
||
# 方式1: 从BJXD环境变量获取(逗号分隔的多个token)
|
||
token_str = os.getenv("BJXD")
|
||
if token_str:
|
||
# 过滤空值并保持顺序添加
|
||
for token in token_str.split(","):
|
||
token = token.strip()
|
||
if token and token not in tokens_set:
|
||
tokens.append(token)
|
||
tokens_set.add(token)
|
||
|
||
# 方式2: 从BJXD1/BJXD2/BJXD3等环境变量获取
|
||
i = 1
|
||
empty_count = 0 # 记录连续空值的数量
|
||
while empty_count < 5: # 连续5个空值才退出
|
||
token = os.getenv(f"BJXD{i}")
|
||
if not token:
|
||
empty_count += 1
|
||
else:
|
||
token = token.strip()
|
||
if token and token not in tokens_set: # 确保token不是空字符串且未重复
|
||
empty_count = 0 # 重置连续空值计数
|
||
tokens.append(token)
|
||
tokens_set.add(token)
|
||
i += 1
|
||
|
||
if not tokens:
|
||
self.log(
|
||
"⛔️ 未获取到 tokens, 请检查环境变量 BJXD 或 BJXD1/BJXD2/... 是否填写"
|
||
)
|
||
self.push_notification()
|
||
return
|
||
|
||
self.log(f"👻 共获取到用户 token {len(tokens)} 个")
|
||
|
||
self.ai_api_key = os.getenv("HUNYUAN_API_KEY", "")
|
||
self.log(
|
||
"💯 已获取到腾讯混元 AI APIKey, 使用腾讯混元 AI 答题"
|
||
if self.ai_api_key
|
||
else "😭 未设置腾讯混元 AI HUNYUAN_API_KEY 环境变量,使用随机答题"
|
||
)
|
||
|
||
# 获取预设答案
|
||
self.preset_answer = os.getenv("BJXD_ANSWER", "").upper()
|
||
if self.preset_answer:
|
||
if self.preset_answer in ["A", "B", "C", "D"]:
|
||
self.log(f"📝 已获取预设答案: {self.preset_answer}")
|
||
else:
|
||
self.preset_answer = ""
|
||
self.log("❌ 预设答案格式错误,仅支持 A/B/C/D")
|
||
|
||
# 获取所有用户信息
|
||
for token in tokens:
|
||
self.token = token
|
||
user = self.get_user_info()
|
||
if user:
|
||
self.users.append(user)
|
||
|
||
if not self.users:
|
||
self.log("❌ 未获取到有效用户")
|
||
return
|
||
|
||
# 设置分享用户ID
|
||
for i, user in enumerate(self.users):
|
||
prev_index = (i - 1) if i > 0 else len(self.users) - 1
|
||
# 如果有多个用户且上一个用户不是自己,使用上一个用户的 hid
|
||
if len(self.users) > 1 and self.users[prev_index]["hid"] != user["hid"]:
|
||
user["share_user_hid"] = self.users[prev_index]["hid"]
|
||
else:
|
||
# 否则从备用 hid 列表中选择一个
|
||
user["share_user_hid"] = self.get_backup_share_hid(user["hid"])
|
||
|
||
# 执行任务
|
||
self.log("\n============ 执行任务 ============")
|
||
for i, user in enumerate(self.users, 1):
|
||
# 更新当前用户信息
|
||
self.token = user["token"]
|
||
self.user = user
|
||
|
||
# 随机延迟
|
||
if i > 1:
|
||
print("\n进行下一个账号, 等待 5-10 秒...")
|
||
time.sleep(random.randint(5, 10))
|
||
|
||
self.log(f"\n======== ▷ 第 {i} 个账号 ◁ ========")
|
||
|
||
# 打印用户信息
|
||
self.log(
|
||
f"👻 用户名: {self.user['nickname']} | "
|
||
f"手机号: {self.user['phone']} | "
|
||
f"积分: {self.user['score_value']}\n"
|
||
f"🆔 用户hid: {self.user['hid']}\n"
|
||
f"🆔 分享hid: {self.user['share_user_hid']}"
|
||
)
|
||
|
||
# 检查任务状态
|
||
self.check_task_status(self.user)
|
||
self.log(f"任务状态: {self.user['task']}")
|
||
|
||
# 调试使用 设置任务状态
|
||
# self.user["task"]["sign"] = False
|
||
# self.user["task"]["view"] = False
|
||
# self.user["task"]["question"] = False
|
||
|
||
# 签到
|
||
if not self.user["task"]["sign"]:
|
||
self.get_sign_info()
|
||
time.sleep(random.randint(5, 10))
|
||
else:
|
||
self.log("✅ 签到任务 已完成,跳过")
|
||
|
||
# 阅读文章
|
||
if not self.user["task"]["view"]:
|
||
article_ids = self.get_article_list()
|
||
if article_ids:
|
||
for article_id in article_ids: # 已经只有3篇了
|
||
self.get_article_detail(article_id)
|
||
time.sleep(random.randint(10, 15))
|
||
self.submit_article_score()
|
||
else:
|
||
self.log("✅ 浏览文章任务 已完成,跳过")
|
||
|
||
# 答题
|
||
if not self.user["task"]["question"]:
|
||
self.get_question_info(self.user["share_user_hid"])
|
||
else:
|
||
self.log("✅ 答题任务 已完成,跳过")
|
||
if not self.correct_answer:
|
||
self.get_answered_question()
|
||
|
||
self.log("\n============ 积分详情 ============")
|
||
for i, user in enumerate(self.users, 1):
|
||
if i > 1:
|
||
print("\n进行下一个账号, 等待 5-10 秒...")
|
||
time.sleep(random.randint(5, 10))
|
||
|
||
# 更新当前用户信息
|
||
self.token = user["token"]
|
||
self.user = user
|
||
|
||
self.log(f"\n======== ▷ 第 {i} 个账号 ◁ ========")
|
||
|
||
# 打印用户信息
|
||
self.log(
|
||
f"👻 用户名: {self.user['nickname']} | 手机号: {self.user['phone']}"
|
||
)
|
||
|
||
# 显示积分详情
|
||
self.get_score_details()
|
||
|
||
# 最后推送通知
|
||
self.push_notification()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
BeiJingHyundai().run() |