Files
XiaoGe-LiBai-yangmao/新疆联通1104.py
2025-11-10 16:12:48 +08:00

388 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 解析 ZY100_USER_TOKEN (格式: TOKEN1&REMARK1@TOKEN2@TOKEN3 或 多行输入)
# SCRIPT_NAME = "新疆联通开盒子 V1.2 (by:转变)"
# 变量值抓取”userToken“字段userToken有效期 3天
import requests
import json
import os
import sys
import time
import random
from datetime import datetime
from typing import List, Dict, Any
import base64 # <-- 新增: 用于解码 JWT Token
import re # <-- 新增: 用于检查手机号格式
# 引入青龙环境下的通用通知模块
try:
from notify import send
except ImportError:
def send(title, content):
sys.stdout.write(f"\n【通知模块缺失,仅打印结果】{title}:\n{content}\n")
# --- 辅助函数 ---
def get_phone_from_token(token: str) -> str:
"""尝试从JWT token中提取手机号用于生成默认备注 (脱敏格式: ****)"""
try:
# JWT 结构: header.payload.signature, 提取 payload 部分
parts = token.split('.')
if len(parts) != 3:
return ""
# Base64URL 解码需要手动添加 padding
payload_b64 = parts[1]
padding = '=' * (4 - (len(payload_b64) % 4))
# 使用 urlsafe_b64decode 处理 Base64URL 编码
decoded_payload = base64.urlsafe_b64decode(payload_b64 + padding).decode('utf-8')
# 解析 JSON 查找 'user_phone' 或 'sub'
payload_data = json.loads(decoded_payload)
phone = payload_data.get('user_phone') or payload_data.get('sub')
# 确保提取的是一个有效的手机号格式
if phone and re.match(r'^\d{11}$', str(phone)):
# 返回脱敏后的手机号
return str(phone)[:3] + '****' + str(phone)[7:]
except Exception:
# 如果解析失败不是JWT或格式错误返回空字符串
return ""
return ""
# --- 全局配置 ---
SCRIPT_NAME = "新疆联通开盒子 V1.2 (by:转变)"
# 关键配置:单账号尝试次数
# 用户请求将默认值修改为 '1'
ATTEMPT_COUNT = int(os.environ.get("UNICOM_ATTEMPT_COUNT", "1"))
# API配置
DRAW_URL = "https://zy100.xj169.com/touchpoint/openapi/marchAct/draw_Nov2025Act"
DRAW_PAYLOAD = {
'activityId': "Nov2025Act",
'prizeId': ""
}
# 获取奖品详情的API配置
PRIZE_DETAIL_URL = "https://zy100.xj169.com/touchpoint/openapi/drawAct/getMyPrize"
PRIZE_DETAIL_PAYLOAD = {
'activityId': "Nov2025Act"
}
# --- 账号解析逻辑 (支持备注/灵活格式) ---
ACCOUNT_LIST: List[Dict[str, str]] = []
# 1. 解析 ZY100_USER_TOKEN (格式: TOKEN1&REMARK1@TOKEN2@TOKEN3 或 多行输入)
ALL_TOKENS_RAW = os.environ.get("ZY100_USER_TOKEN", "")
# V15.3 改进: 统一分隔符。将所有换行符替换为 '@',并清理多余的 '@'。
ALL_TOKENS_RAW = ALL_TOKENS_RAW.replace('\n', '@').replace('\r', '').replace('@@', '@')
if ALL_TOKENS_RAW.startswith('@'):
ALL_TOKENS_RAW = ALL_TOKENS_RAW[1:]
for t_r in ALL_TOKENS_RAW.split('@'):
t_r = t_r.strip()
if not t_r:
continue
parts = t_r.split('&', 1) # 只分割一次,以支持备注中包含&
token = parts[0]
# 尝试从 token 提取手机号作为默认备注的基础
phone_suffix = get_phone_from_token(token)
# 如果提取到手机号使用脱敏手机号作为默认备注否则使用通用Account
default_remark = phone_suffix if phone_suffix else f"Account {len(ACCOUNT_LIST) + 1} (Main)"
# 如果提供了 & 分隔的备注则使用否则使用基于手机号或通用Account的默认备注
remark = parts[1] if len(parts) > 1 else default_remark
# 避免重复的 token
if token and token not in [acc['token'] for acc in ACCOUNT_LIST]:
ACCOUNT_LIST.append({"token": token, "remark": remark})
# 2. 解析 indexed 变量 (ZY100_USER_TOKEN_1, ZY100_REMARK_1)
i = 1
while os.environ.get(f"ZY100_USER_TOKEN_{i}"):
# 获取 token并检查是否内联了备注
raw_token_value = os.environ.get(f"ZY100_USER_TOKEN_{i}", "").strip()
# V15.3 改进: 确保 indexed 变量也支持内嵌换行符
raw_token_value = raw_token_value.replace('\n', '@').replace('\r', '').replace('@@', '@')
parts = raw_token_value.split('&', 1)
token = parts[0].split('@')[0] # 只需要第一个 token避免多行 token 影响
# 尝试从 token 提取手机号作为默认备注的基础
phone_suffix = get_phone_from_token(token)
default_remark = phone_suffix if phone_suffix else f"Account {len(ACCOUNT_LIST) + 1} (Indexed {i})"
# 优先使用独立的 ZY100_REMARK_i 变量
explicit_remark = os.environ.get(f"ZY100_REMARK_{i}", "").strip()
if explicit_remark:
remark = explicit_remark
elif len(parts) > 1: # 其次使用内联在 token 中的备注
remark = parts[1]
else: # 最后使用基于手机号的默认备注或通用默认备注
remark = default_remark
# 避免重复的 token
if token and token not in [acc['token'] for acc in ACCOUNT_LIST]:
ACCOUNT_LIST.append({"token": token, "remark": remark})
i += 1
# --- 脚本函数 (以下内容与 V15.1 保持一致) ---
def write_log(text):
"""自定义日志写入函数"""
sys.stdout.write(text + "\n")
def get_prize_details(user_token, remark, activity_id="Nov2025Act"):
"""
获取奖品详情信息
返回: (是否成功, 日志字符串, 奖品详情字典)
"""
log_messages = [f"--- 👤 账号备注: {remark} 的奖品详情 ---"]
prize_details = {}
headers = {
'User-Agent': "Mozilla/5.0 (iPhone; CPU iPhone OS 16_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 unicom{version:iphone_c@12.0701};ltst;OSVersion/16.2",
'Origin': "https://zy100.xj169.com",
'Referer': "https://zy100.xj169.com/touchpoint/openapi/jumpHandRoom1G?source=...",
'X-Requested-With': "XMLHttpRequest",
'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8',
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
'userToken': user_token,
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
payload = {
'activityId': activity_id
}
try:
response = requests.post(PRIZE_DETAIL_URL, data=payload, headers=headers, timeout=10)
if response.status_code != 200:
log_messages.append(f"❌ 获取奖品详情失败HTTP 状态码: {response.status_code}")
return False, "\n".join(log_messages), {}
try:
res_json = response.json()
except json.JSONDecodeError:
log_messages.append(f"⚠️ 奖品详情响应不是有效的 JSON 格式。")
return False, "\n".join(log_messages), {}
if res_json.get("code") == 0 or res_json.get("code") == "SUCCESS":
data = res_json.get('data', [])
# 确保 data 是列表,即使只返回一个奖品
if isinstance(data, dict):
data = [data]
if data:
prize_details = data
log_messages.append(f"🎁 成功查询到 {len(data)} 个奖品记录:")
# 格式化奖品详情信息只显示prizeId和drawDate
for i, prize in enumerate(data, 1):
prize_id = prize.get('prizeId', '未知奖品')
draw_date_timestamp = prize.get('drawDate', 0)
# 转换时间戳为可读格式
draw_date = '未知时间'
if draw_date_timestamp:
try:
draw_date = datetime.fromtimestamp(draw_date_timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S')
except:
draw_date = str(draw_date_timestamp)
log_messages.append(f" ├── 奖品 {i}: {prize_id}")
log_messages.append(f" └── 获得时间: {draw_date}")
else:
log_messages.append("📝 当前暂无奖品记录。")
else:
log_messages.append(f"⚠️ 获取奖品详情失败: {res_json.get('msg', '未知错误')}")
# 如果失败,显示原始响应以便调试
if res_json.get('code') not in [0, "SUCCESS"]:
log_messages.append(json.dumps(res_json, indent=2, ensure_ascii=False))
except requests.exceptions.RequestException as e:
log_messages.append(f"❌ 获取奖品详情时网络请求异常: {e}")
return True, "\n".join(log_messages), prize_details
def perform_single_draw(user_token):
"""
单个抽奖请求逻辑,只执行一次
返回: (是否需要终止所有尝试的布尔值, 日志字符串, 状态码)
"""
log_messages = []
status = 'continue'
headers = {
'User-Agent': "Mozilla/5.0 (iPhone; CPU iPhone OS 16_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 unicom{version:iphone_c@12.0701};ltst;OSVersion/15.2",
'Origin': "https://zy100.xj169.com",
'Referer': "https://zy100.xj169.com/touchpoint/openapi/jumpHandRoom1G?source=...",
'X-Requested-With': "XMLHttpRequest",
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json, text/plain, */*',
'userToken': user_token,
'Accept-Language': "zh-CN,zh-Hans;q=0.9"
}
try:
response = requests.post(DRAW_URL, data=DRAW_PAYLOAD, headers=headers, timeout=10)
if response.status_code != 200:
log_messages.append(f"❌ 请求失败HTTP 状态码: {response.status_code}")
else:
try:
res_json = response.json()
except json.JSONDecodeError:
log_messages.append(f"⚠️ 响应不是有效的 JSON 格式。")
return False, "\n".join(log_messages), status
# 检查:请求频率过高 (立即终止)
if res_json.get("code") == 500 and "频率过高" in res_json.get("msg", ""):
log_messages.append("❌ 错误代码 500: 请求频率过高,强制终止!")
status = 'rate_limit'
# 检查缺少参数错误Token 失效)
elif res_json.get("code") == 500 and "缺少参数" in res_json.get("msg", ""):
log_messages.append("⚠️ 错误代码 500: Token 无效或已过期!终止尝试。")
status = 'invalid_token'
# 匹配:今日抽奖机会已用完
elif res_json.get("code") == "ERROR" and res_json.get("msg") == "thanks1":
info = res_json.get('data', '未知信息')
log_messages.append(f"✅ 抽奖机会状态: 今日已抽完。信息: {info}")
status = 'done' # 标记为完成,以便停止后续尝试
# 匹配:成功中奖
elif res_json.get("code") == "SUCCESS":
prize_name = res_json.get('data', '恭喜中奖')
log_messages.append(f"🎉 抽奖中奖!结果: {prize_name}")
status = 'won'
# 其他非预期响应
else:
log_messages.append("⚠️ 收到其他非预期响应:")
log_messages.append(json.dumps(res_json, indent=2, ensure_ascii=False))
except requests.exceptions.RequestException as e:
log_messages.append(f"❌ 网络请求异常发生: {e}")
return False, "\n".join(log_messages), status
def run_sign_in():
"""主执行逻辑:多账号串行处理,单账号中延迟多次尝试"""
all_results = [f"{SCRIPT_NAME}\n执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
if not ACCOUNT_LIST:
error_msg = "❌ 错误:环境变量 ZY100_USER_TOKEN(s) 未配置或为空。"
all_results.append(error_msg)
write_log(error_msg)
return "\n".join(all_results)
write_log(f"✅ 成功检测到 {len(ACCOUNT_LIST)} 个账号,每个账号将被串行尝试 {ATTEMPT_COUNT} 次...")
# 当尝试次数为 1 时,此行日志无意义
if ATTEMPT_COUNT > 1:
write_log(f"每次尝试之间将随机延迟 0.5 秒到 1.5 秒。")
final_push_messages = []
for idx, acc in enumerate(ACCOUNT_LIST, 1):
token = acc['token']
remark = acc['remark']
account_log_buffer = [] # 存储当前账号的详细日志
account_final_status = "未尝试"
account_header = f"\n\n================ 👤 账号 {idx}/{len(ACCOUNT_LIST)} [备注: {remark}] 开始执行 ================"
write_log(account_header)
account_log_buffer.append(account_header)
for attempt in range(ATTEMPT_COUNT):
log_header = f"--- 尝试次数 {attempt + 1}/{ATTEMPT_COUNT} ---"
write_log(log_header)
account_log_buffer.append(log_header)
_, result_log, status = perform_single_draw(token)
write_log(result_log)
account_log_buffer.append(result_log)
# 更新最终状态,如果成功中奖/已抽完/致命错误,则中断
if status == 'done':
account_final_status = "今日机会已用完"
break
elif status == 'won':
account_final_status = "✅ 中奖成功"
break
elif status == 'invalid_token':
account_final_status = "❌ Token 无效或过期"
break
elif status == 'rate_limit':
account_final_status = "❌ 请求频率过高"
break
else:
account_final_status = "抽奖失败/未中奖"
# 如果不是最后一次尝试(并且尝试次数大于 1引入延迟
if ATTEMPT_COUNT > 1 and attempt < ATTEMPT_COUNT - 1 and status not in ['done', 'won']:
wait_time = random.uniform(0.5, 1.5)
write_log(f"等待 {wait_time:.3f} 秒后进行下一次尝试...")
time.sleep(wait_time)
# --- 任务结束:获取奖品详情并汇总 ---
write_log(f"\n--- 账号 {idx} 抽奖完成,获取奖品详情 ---")
detail_success, detail_log, prize_details = get_prize_details(token, remark)
write_log(detail_log)
account_log_buffer.append(detail_log)
write_log(f"================ 👤 账号 {idx} [备注: {remark}] 执行完毕 (状态: {account_final_status}) ================")
account_log_buffer.append(f"================ 状态: {account_final_status} ================")
# 将当前账号的详细日志加入总日志
all_results.extend(account_log_buffer)
# 构造推送消息摘要
detail_summary_lines = [line for line in detail_log.splitlines() if line.startswith((' ├──', ' └──'))]
detail_summary = "\n".join(detail_summary_lines)
if not detail_summary_lines:
# 检查是否有“暂无奖品记录”的提示
if "暂无奖品记录" in detail_log:
detail_summary = "📝 当前暂无奖品记录。"
else:
detail_summary = "❌ 奖品查询失败,请检查日志。"
final_push_messages.append(f"👤 [备注: {remark}] 状态: {account_final_status}\n{detail_summary}\n" + "-"*30)
# 账号切换间歇休息 1-2 秒
time.sleep(random.uniform(1.0, 2.0))
final_log_content = "\n".join(all_results)
final_notification = f"{SCRIPT_NAME}\n执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n--- 🛎️ 执行摘要 (按备注区分) 🛎️ ---\n" + "\n".join(final_push_messages)
write_log("\n--- 推送日志开始 ---")
write_log(final_notification)
write_log("--- 推送日志结束 ---")
return final_notification
if __name__ == "__main__":
# 1. 执行任务并获取推送内容
push_content = run_sign_in()
# 2. 使用通用的 send 函数进行推送
send(SCRIPT_NAME, push_content)