Files
smallfawn-QLScriptPublic/bjxd.py
2025-03-24 07:59:20 +08:00

641 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
北京现代 APP 自动任务脚本
功能:自动完成签到、浏览文章、每日答题等任务
new Env("北京现代")
cron: 25 6 * * *
环境变量:
BJXD_DEVICE 安卓写android 苹果IOS写iOS
BJXD: str - 北京现代 APP api token (多个账号用英文逗号分隔,建议每个账号一个变量)
BJXD1/BJXD2/BJXD3: str - 北京现代 APP api token (每个账号一个变量)
BJXD_ANSWER: str - 预设答案 (可选, ABCD 中的一个)
HUNYUAN_API_KEY: str - 腾讯混元AI APIKey (可选)
"""
import os
import random
import time
from datetime import datetime
from typing import List, Dict, Any
import requests
from urllib3.exceptions import InsecureRequestWarning, InsecurePlatformWarning
# 禁用 SSL 警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
requests.packages.urllib3.disable_warnings(InsecurePlatformWarning)
class BeiJingHyundai:
"""北京现代APP自动任务类"""
# 基础配置
NAME = "北京现代 APP 自动任务"
BASE_URL = "https://bm2-api.bluemembers.com.cn"
# API endpoints
API_USER_INFO = "/v1/app/account/users/info"
API_MY_SCORE = "/v1/app/user/my_score"
API_TASK_LIST = "/v1/app/user/task/list"
API_SIGN_LIST = "/v1/app/user/reward_list"
API_SIGN_SUBMIT = "/v1/app/user/reward_report"
API_ARTICLE_LIST = "/v1/app/white/article/list2"
API_ARTICLE_DETAIL = "/v1/app/white/article/detail_app/{}"
API_ARTICLE_SCORE_SUBMIT = "/v1/app/score"
API_QUESTION_INFO = "/v1/app/special/daily/ask_info"
API_QUESTION_SUBMIT = "/v1/app/special/daily/ask_answer"
# 预设的备用 share_user_hid 列表
BACKUP_HIDS = [
"a6688ec1a9ee429fa7b68d50e0c92b1f",
"bb8cd2e44c7b45eeb8cc5f7fa71c3322",
"5f640c50061b400c91be326c8fe0accd",
"55a5d82dacd9417483ae369de9d9b82d",
]
def __init__(self):
"""初始化实例变量"""
self.token: str = "" # 当前用户token
self.user: Dict[str, Any] = {} # 当前用户信息
self.users: List[Dict[str, Any]] = [] # 所有用户信息列表
self.correct_answer: str = "" # 正确答案
self.preset_answer: str = "" # 预设答案
self.ai_api_key: str = "" # 腾讯混元AI APIKey
self.wrong_answers: set = set() # 错误答案集合
self.log_content: str = "" # 日志内容
def log(self, content: str, print_to_console: bool = True) -> None:
"""添加日志"""
if print_to_console:
print(content)
self.log_content += content + "\n"
def push_notification(self) -> None:
"""推送通知"""
try:
QLAPI.notify(self.NAME, self.log_content)
except NameError:
print(f"\n\n🚀 推送通知\n\n{self.NAME}\n\n{self.log_content}")
def make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
"""
发送API请求
Args:
method: 请求方法 (GET/POST)
endpoint: API端点
**kwargs: 请求参数
Returns:
Dict[str, Any]: API响应数据
"""
url = f"{self.BASE_URL}{endpoint}"
headers = {"token": self.token, "device": os.getenv("BJXD_DEVICE", "android")}
if "headers" not in kwargs:
kwargs["headers"] = headers
else:
kwargs["headers"].update(headers)
try:
response = requests.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
self.log(f"❌ API request failed: {str(e)}")
return {"code": -1, "msg": str(e)}
def get_user_info(self) -> Dict[str, Any]:
"""
获取用户信息
Returns:
Dict[str, Any]: 用户信息字典,获取失败返回空字典
"""
response = self.make_request("GET", self.API_USER_INFO)
print(f"get_user_info API response ——> {response}")
if response["code"] == 0:
data = response["data"]
# 直接生成掩码后的手机号
masked_phone = f"{data['phone'][:3]}******{data['phone'][-2:]}"
return {
"token": self.token,
"hid": data["hid"],
"nickname": data["nickname"],
"phone": masked_phone, # 直接存储掩码后的手机号
"score_value": data["score_value"],
"share_user_hid": "",
"task": {"sign": False, "view": False, "question": False},
}
self.log(f"❌ 账号已失效, 请重新获取 token: {self.token}")
return {}
def get_score_details(self) -> None:
"""显示积分详情,包括总积分、今日变动和最近记录"""
params = {"page_no": "1", "page_size": "10"} # 获取最近10条记录
response = self.make_request("GET", self.API_MY_SCORE, params=params)
print(f"get_score_details API response ——> {response}")
if response["code"] == 0:
data = response["data"]
# 先获取今日记录
today = datetime.now().strftime("%Y-%m-%d")
today_records = [
record
for record in data["points_record"]["list"]
if record["created_at"].startswith(today)
]
# 计算今日积分变化
today_score = sum(
int(record["score_str"].strip("+")) for record in today_records
)
today_score_str = f"+{today_score}" if today_score > 0 else str(today_score)
self.log(f"🎉 总积分: {data['score']} | 今日积分变动: {today_score_str}")
# 输出今日积分记录
if today_records:
self.log("今日积分记录:")
for record in today_records:
self.log(
f"{record['created_at']} {record['desc']} {record['score_str']}"
)
else:
self.log("今日暂无积分变动")
# 任务相关
def check_task_status(self, user: Dict[str, Any]) -> None:
"""检查任务状态"""
response = self.make_request("GET", self.API_TASK_LIST)
print(f"get_task_status API response ——> {response}")
if response["code"] != 0:
self.log(f'❌ 获取任务列表失败: {response["msg"]}')
return
actions = response.get("data", {})
# 检查签到任务
if "action4" in actions:
user["task"]["sign"] = actions["action4"].get("status") == 1
else:
self.log("❌ task list action4 签到任务 不存在")
# 检查浏览文章任务
if "action12" in actions:
user["task"]["view"] = actions["action12"].get("status") == 1
else:
self.log("❌ task list action12 浏览文章任务 不存在")
# 检查答题任务
if "action39" in actions:
user["task"]["question"] = actions["action39"].get("status") == 1
else:
self.log("❌ task list action39 答题任务 不存在")
# 签到相关
def get_sign_info(self) -> None:
"""执行签到任务"""
max_attempts = 5 # 最大尝试次数
best_score = 0
best_params = None
for attempt in range(max_attempts):
response = self.make_request("GET", self.API_SIGN_LIST)
print(f"get_sign_info (attempt {attempt + 1}) API response ——> {response}")
if response["code"] != 0:
self.log(f'❌ 获取签到列表失败: {response["msg"]}')
break
data = response["data"]
hid = data["hid"]
reward_hash = data["rewardHash"]
for item in data["list"]:
if item["hid"] == hid:
current_score = item["score"]
print(
f"{attempt + 1}次获取签到列表: score={current_score} hid={hid} rewardHash={reward_hash}"
)
if current_score > best_score:
best_score = current_score
best_params = (hid, reward_hash, current_score)
print(f"当前可获得签到积分: {best_score}")
break
if attempt < max_attempts - 1: # 不是最后一次循环
print(f"继续尝试获取更高积分, 延时5-10s")
time.sleep(random.randint(5, 10))
else: # 最后一次循环 即将提交签到
print(f"即将提交签到, 延时3-4s")
time.sleep(random.randint(3, 4))
if best_params:
self.submit_sign(*best_params)
else:
self.log("❌ 未能获取到有效的签到参数")
def submit_sign(self, hid: str, reward_hash: str, score: int) -> None:
"""提交签到"""
json_data = {
"hid": hid,
"hash": reward_hash,
"sm_deviceId": "",
"ctu_token": None,
}
response = self.make_request("POST", self.API_SIGN_SUBMIT, json=json_data)
print(f"submit_sign API response ——> {response}")
if response["code"] == 0:
self.log(f"✅ 签到成功 | 积分 +{score}")
else:
self.log(f'❌ 签到失败: {response["msg"]}')
# 文章浏览相关
def get_article_list(self) -> List[str]:
"""获取文章列表"""
params = {
"page_no": "1",
"page_size": "20",
"type_hid": "",
}
response = self.make_request("GET", self.API_ARTICLE_LIST, params=params)
print(f"get_article_list API response ——> {response}")
if response["code"] == 0:
# 从文章列表中随机选择3个ID
article_list = [item["data_id"] for item in response["data"]["list"]]
return random.sample(article_list, min(3, len(article_list)))
self.log(f'❌ 获取文章列表失败: {response["msg"]}')
return []
def get_article_detail(self, article_id: str) -> None:
"""浏览文章"""
self.log(f"浏览文章 article_id: {article_id}")
endpoint = self.API_ARTICLE_DETAIL.format(article_id)
self.make_request("GET", endpoint)
def submit_article_score(self) -> None:
"""提交文章积分"""
json_data = {
"ctu_token": "",
"action": 12,
}
response = self.make_request(
"POST", self.API_ARTICLE_SCORE_SUBMIT, json=json_data
)
print(f"submit_article_score API response ——> {response}")
if response["code"] == 0:
score = response["data"]["score"]
self.log(f"✅ 浏览文章成功 | 积分 +{score}")
else:
self.log(f'❌ 浏览文章失败: {response["msg"]}')
# 答题相关
def get_question_info(self, share_user_hid: str) -> None:
"""执行答题任务"""
params = {"date": datetime.now().strftime("%Y%m%d")}
response = self.make_request("GET", self.API_QUESTION_INFO, params=params)
print(f"get_question_info API response ——> {response}")
if response["code"] != 0:
self.log(f'❌ 获取问题失败: {response["msg"]}')
return
# response['data']['state'] 1=表示未答题 2=已答题且正确 3=答错且未有人帮忙答题 4=答错但有人帮忙答题
if response["data"].get("state") == 3:
self.log("今日已答题但回答错误,当前无人帮助答题,跳过")
return
if response["data"].get("state") != 1:
if response["data"].get("answer"):
answer = response["data"]["answer"][0]
if answer in ["A", "B", "C", "D"]:
self.correct_answer = answer
self.log(f"今日已答题,跳过,答案:{answer}")
return
self.log("今日已答题,但未获取到答案,跳过")
return
question_info = response["data"]["question_info"]
questions_hid = question_info["questions_hid"]
# 构建问题字符串,只包含未被标记为错误的选项
question_str = f"{question_info['content']}\n"
valid_options = []
for option in question_info["option"]:
if option["option"] not in self.wrong_answers:
valid_options.append(option)
question_str += f'{option["option"]}. {option["option_content"]}\n'
else:
print(f"跳过错误选项 {option['option']}. {option['option_content']}")
print(f"\n问题详情:\n{question_str}")
# 如果只剩一个选项,直接使用
if len(valid_options) == 1:
answer = valid_options[0]["option"]
self.log(f"仅剩一个选项,使用答案: {answer}")
time.sleep(random.randint(3, 5))
self.submit_question_answer(questions_hid, answer, share_user_hid)
return
# 获取答案并提交
answer = self.get_question_answer(question_str)
time.sleep(random.randint(3, 5))
self.submit_question_answer(questions_hid, answer, share_user_hid)
def get_ai_answer(self, question: str) -> str:
"""获取AI答案"""
headers = {
"Authorization": f"Bearer {self.ai_api_key}",
"Content-Type": "application/json",
}
prompt = f"你是一个专业的北京现代汽车专家,请直接给出这个单选题的答案,并且不要带'答案'等其他内容。\n{question}"
json_data = {
"model": "hunyuan-turbo",
"messages": [{"role": "user", "content": prompt}],
"enable_enhancement": True,
"force_search_enhancement": True,
"enable_instruction_search": True,
}
try:
response = requests.post(
"https://api.hunyuan.cloud.tencent.com/v1/chat/completions",
headers=headers,
json=json_data,
)
response.raise_for_status()
response_json = response.json()
print(f"腾讯混元AI API response ——> {response_json}")
# 获取AI回答内容并转大写
ai_response = response_json["choices"][0]["message"]["content"].upper()
# 使用集合操作找出有效答案
valid_answers = set("ABCD") - self.wrong_answers
found_answers = set(ai_response) & valid_answers
# 如果找到答案则返回其中一个
if found_answers:
return found_answers.pop()
else:
print(f"❌ 没有找到符合的 AI 答案")
return ""
except Exception as e:
print(f"腾讯混元AI API 请求失败: {str(e)}")
return ""
def get_question_answer(self, question: str) -> str:
"""获取答题答案"""
# 1. 存在正确答案时,使用正确答案
if self.correct_answer:
self.log(f"使用历史正确答案: {self.correct_answer}")
return self.correct_answer
# 2. 存在预设答案时,使用预设答案
if self.preset_answer:
self.log(f"使用预设答案: {self.preset_answer}")
return self.preset_answer
# 3. 存在AI APIKey时使用AI答案
if self.ai_api_key:
ai_answer = self.get_ai_answer(question)
if ai_answer:
self.log(f"使用AI答案: {ai_answer}")
return ai_answer
# 4. 随机选择答案(排除错误答案)
answer = self.get_random_answer()
self.log(f"随机答题,答案: {answer}")
return answer
def get_random_answer(self) -> str:
"""获取随机答案,排除已知错误答案"""
available_answers = set(["A", "B", "C", "D"]) - self.wrong_answers
if not available_answers:
self.wrong_answers.clear()
available_answers = set(["A", "B", "C", "D"])
return random.choice(list(available_answers))
def get_answered_question(self) -> None:
"""从已答题账号获取答案"""
params = {"date": datetime.now().strftime("%Y%m%d")}
response = self.make_request("GET", self.API_QUESTION_INFO, params=params)
print(f"get_answered_question API response ——> {response}")
if response["code"] != 0:
self.log(f'❌ 从已答题账号获取问题失败: {response["msg"]}')
return
# response['data']['state'] 1=表示未答题 2=已答题且正确 4=已答题但错误
if response["code"] == 0 and response["data"].get("answer"):
answer = response["data"]["answer"][0]
if answer in ["A", "B", "C", "D"]:
self.correct_answer = answer
self.log(f"从已答题账号获取到答案:{answer}")
return
self.log("从已答题账号获取答案失败")
def submit_question_answer(
self, question_id: str, answer: str, share_user_hid: str
) -> None:
"""提交答题答案"""
json_data = {
"answer": answer,
"questions_hid": question_id,
"ctu_token": "",
}
if share_user_hid:
json_data["date"] = datetime.now().strftime("%Y%m%d")
json_data["share_user_hid"] = share_user_hid
response = self.make_request("POST", self.API_QUESTION_SUBMIT, json=json_data)
print(f"submit_question_answer API response ——> {response}")
if response["code"] == 0:
data = response["data"]
if data["state"] == 3: # 答错
# 记录错误答案
self.wrong_answers.add(answer)
# 如果是正确答案,清除它
if self.correct_answer == answer:
self.correct_answer = ""
# 如果是预设答案,清除它
if self.preset_answer == answer:
self.preset_answer = ""
self.log("❌ 答题错误")
elif data["state"] == 2: # 答对了
if self.correct_answer != answer:
self.correct_answer = answer
score = data["answer_score"]
self.log(f"✅ 答题正确 | 积分 +{score}")
else:
self.log(f'❌ 答题失败: {response["msg"]}')
def get_backup_share_hid(self, user_hid: str) -> str:
"""从备用 hid 列表中获取一个不同于用户自身的 hid"""
available_hids = [hid for hid in self.BACKUP_HIDS if hid != user_hid]
return random.choice(available_hids) if available_hids else ""
def run(self) -> None:
"""运行主程序"""
# 使用列表保持顺序,使用集合实现去重
tokens = []
tokens_set = set()
# 方式1: 从BJXD环境变量获取(逗号分隔的多个token)
token_str = os.getenv("BJXD")
if token_str:
# 过滤空值并保持顺序添加
for token in token_str.split(","):
token = token.strip()
if token and token not in tokens_set:
tokens.append(token)
tokens_set.add(token)
# 方式2: 从BJXD1/BJXD2/BJXD3等环境变量获取
i = 1
empty_count = 0 # 记录连续空值的数量
while empty_count < 5: # 连续5个空值才退出
token = os.getenv(f"BJXD{i}")
if not token:
empty_count += 1
else:
token = token.strip()
if token and token not in tokens_set: # 确保token不是空字符串且未重复
empty_count = 0 # 重置连续空值计数
tokens.append(token)
tokens_set.add(token)
i += 1
if not tokens:
self.log(
"⛔️ 未获取到 tokens, 请检查环境变量 BJXD 或 BJXD1/BJXD2/... 是否填写"
)
self.push_notification()
return
self.log(f"👻 共获取到用户 token {len(tokens)}")
self.ai_api_key = os.getenv("HUNYUAN_API_KEY", "")
self.log(
"💯 已获取到腾讯混元 AI APIKey, 使用腾讯混元 AI 答题"
if self.ai_api_key
else "😭 未设置腾讯混元 AI HUNYUAN_API_KEY 环境变量,使用随机答题"
)
# 获取预设答案
self.preset_answer = os.getenv("BJXD_ANSWER", "").upper()
if self.preset_answer:
if self.preset_answer in ["A", "B", "C", "D"]:
self.log(f"📝 已获取预设答案: {self.preset_answer}")
else:
self.preset_answer = ""
self.log("❌ 预设答案格式错误,仅支持 A/B/C/D")
# 获取所有用户信息
for token in tokens:
self.token = token
user = self.get_user_info()
if user:
self.users.append(user)
if not self.users:
self.log("❌ 未获取到有效用户")
return
# 设置分享用户ID
for i, user in enumerate(self.users):
prev_index = (i - 1) if i > 0 else len(self.users) - 1
# 如果有多个用户且上一个用户不是自己,使用上一个用户的 hid
if len(self.users) > 1 and self.users[prev_index]["hid"] != user["hid"]:
user["share_user_hid"] = self.users[prev_index]["hid"]
else:
# 否则从备用 hid 列表中选择一个
user["share_user_hid"] = self.get_backup_share_hid(user["hid"])
# 执行任务
self.log("\n============ 执行任务 ============")
for i, user in enumerate(self.users, 1):
# 更新当前用户信息
self.token = user["token"]
self.user = user
# 随机延迟
if i > 1:
print("\n进行下一个账号, 等待 5-10 秒...")
time.sleep(random.randint(5, 10))
self.log(f"\n======== ▷ 第 {i} 个账号 ◁ ========")
# 打印用户信息
self.log(
f"👻 用户名: {self.user['nickname']} | "
f"手机号: {self.user['phone']} | "
f"积分: {self.user['score_value']}\n"
f"🆔 用户hid: {self.user['hid']}\n"
f"🆔 分享hid: {self.user['share_user_hid']}"
)
# 检查任务状态
self.check_task_status(self.user)
self.log(f"任务状态: {self.user['task']}")
# 调试使用 设置任务状态
# self.user["task"]["sign"] = False
# self.user["task"]["view"] = False
# self.user["task"]["question"] = False
# 签到
if not self.user["task"]["sign"]:
self.get_sign_info()
time.sleep(random.randint(5, 10))
else:
self.log("✅ 签到任务 已完成,跳过")
# 阅读文章
if not self.user["task"]["view"]:
article_ids = self.get_article_list()
if article_ids:
for article_id in article_ids: # 已经只有3篇了
self.get_article_detail(article_id)
time.sleep(random.randint(10, 15))
self.submit_article_score()
else:
self.log("✅ 浏览文章任务 已完成,跳过")
# 答题
if not self.user["task"]["question"]:
self.get_question_info(self.user["share_user_hid"])
else:
self.log("✅ 答题任务 已完成,跳过")
if not self.correct_answer:
self.get_answered_question()
self.log("\n============ 积分详情 ============")
for i, user in enumerate(self.users, 1):
if i > 1:
print("\n进行下一个账号, 等待 5-10 秒...")
time.sleep(random.randint(5, 10))
# 更新当前用户信息
self.token = user["token"]
self.user = user
self.log(f"\n======== ▷ 第 {i} 个账号 ◁ ========")
# 打印用户信息
self.log(
f"👻 用户名: {self.user['nickname']} | 手机号: {self.user['phone']}"
)
# 显示积分详情
self.get_score_details()
# 最后推送通知
self.push_notification()
if __name__ == "__main__":
BeiJingHyundai().run()