Create youzan.py

This commit is contained in:
XiaoGe-LiBai
2025-12-10 15:07:17 +08:00
parent 9198df06c7
commit de0bbe7171

999
youzan/youzan.py Normal file
View File

@@ -0,0 +1,999 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
有赞商城自动签到脚本
【环境变量】
export YOUZAN_ACCOUNT=手机号#密码 (必填)
export CAPTCHA_SERVER=验证码服务器 (可选)
【店铺配置】
在 SHOPS 列表中添加: {"kdt_id": "店铺ID", "checkin_id": "签到ID"}
店铺名称会自动获取,无需手动填写
【定时任务】
cron: 0 8 * * * (每天早上8点)
【功能特性】
✅ 密码登录 + 自动验证码
✅ Token 自动缓存7天有效
✅ Token 失效自动重登
✅ 积分查询 + 商品查询
✅ 自动获取店铺名称
更新: 2025-12-10
"""
import requests
import json
import os
import sys
import time
import re
from datetime import datetime
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
import base64
import hashlib
# 修复 Windows 控制台编码问题
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# ==================== Token 缓存配置 ====================
# Token 缓存文件路径(存储在脚本同目录下)
TOKEN_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".youzan_cache")
# =====================================================
# ==================== 店铺配置区域 ====================
# 在这里添加你需要签到的店铺信息
# 格式:{"kdt_id": "店铺ID", "checkin_id": "签到活动ID"}
SHOPS = [
{"kdt_id": "18739377", "checkin_id": "12063"},
{"kdt_id": "44877243", "checkin_id": "2162835"},
# {"kdt_id": "41067901", "checkin_id": "99"},
{"kdt_id": "129380009", "checkin_id": "3520910"},
# {"kdt_id": "109809208", "checkin_id": "2923467"},
{"kdt_id": "43958855", "checkin_id": "2910869"},
{"kdt_id": "42213767", "checkin_id": "2386563"},
{"kdt_id": "100464643", "checkin_id": "1597464"},
{"kdt_id": "117130552", "checkin_id": "3347128"},
{"kdt_id": "107786737", "checkin_id": "2299510"},
{"kdt_id": "44694253", "checkin_id": "18415"},
# {"kdt_id": "130177909", "checkin_id": "3549859"},
{"kdt_id": "93457151", "checkin_id": "1820214"},
{"kdt_id": "105036832", "checkin_id": "3997371"},
# {"kdt_id": "121810522", "checkin_id": "4804346"},
{"kdt_id": "146288343", "checkin_id": "4296415"},
{"kdt_id": "139827364", "checkin_id": "4806300"},
{"kdt_id": "16365465", "checkin_id": "13736"},
{"kdt_id": "77770507", "checkin_id": "9975"},
{"kdt_id": "43183730", "checkin_id": "24630"},
{"kdt_id": "96543670", "checkin_id": "3721624"},
{"kdt_id": "113745713", "checkin_id": "3800805"},
{"kdt_id": "179778907", "checkin_id": "5428477"},
{"kdt_id": "77093133", "checkin_id": "2947047"},
{"kdt_id": "44519665", "checkin_id": "2682116"},
{"kdt_id": "149536603", "checkin_id": "6287727"},
]
# =====================================================
class TokenCache:
"""Token 缓存管理类"""
def __init__(self, cache_dir: str = TOKEN_CACHE_DIR):
"""
初始化 Token 缓存管理器
:param cache_dir: 缓存目录路径
"""
self.cache_dir = cache_dir
# 确保缓存目录存在
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
@staticmethod
def _get_account_hash(mobile: str) -> str:
"""
生成账号的哈希值(用于文件名)
:param mobile: 手机号
:return: 哈希值
"""
return hashlib.md5(mobile.encode('utf-8')).hexdigest()
def get_cache_file(self, mobile: str) -> str:
"""
获取缓存文件路径
:param mobile: 手机号
:return: 缓存文件路径
"""
account_hash = self._get_account_hash(mobile)
return os.path.join(self.cache_dir, f"token_{account_hash}.json")
def save_token(self, mobile: str, token: str) -> bool:
"""
保存 Token 到缓存
:param mobile: 手机号
:param token: Token
:return: 是否保存成功
"""
try:
cache_file = self.get_cache_file(mobile)
cache_data = {
"mobile": mobile,
"token": token,
"timestamp": int(time.time())
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"⚠️ 保存 Token 缓存失败: {e}")
return False
def load_token(self, mobile: str) -> dict:
"""
从缓存加载 Token
:param mobile: 手机号
:return: Token 信息字典
"""
try:
cache_file = self.get_cache_file(mobile)
if not os.path.exists(cache_file):
return {
"success": False,
"message": "缓存文件不存在"
}
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
# 检查缓存是否过期7天
cache_time = cache_data.get("timestamp", 0)
current_time = int(time.time())
if current_time - cache_time > 7 * 24 * 3600:
return {
"success": False,
"message": "缓存已过期超过7天"
}
return {
"success": True,
"token": cache_data.get("token"),
"mobile": cache_data.get("mobile"),
"cache_time": cache_time
}
except Exception as e:
return {
"success": False,
"message": f"读取缓存失败: {str(e)}"
}
def clear_token(self, mobile: str) -> bool:
"""
清除指定账号的 Token 缓存
:param mobile: 手机号
:return: 是否清除成功
"""
try:
cache_file = self.get_cache_file(mobile)
if os.path.exists(cache_file):
os.remove(cache_file)
return True
except Exception as e:
print(f"⚠️ 清除 Token 缓存失败: {e}")
return False
class YouzanAuth:
"""有赞账号认证类"""
# AES 加密配置(从 JS 脚本中提取)
AES_KEY = b"youzan.com._key_"
AES_IV = b"youzan.com.aesiv"
# 验证码服务器地址(从环境变量读取,默认使用 JS 脚本中的地址)
CAPTCHA_SERVER = os.getenv("CAPTCHA_SERVER", "https://captcha.fuckinghigh.eu.org")
def __init__(self):
self.session = requests.Session()
@staticmethod
def calculate_password_level(password: str) -> int:
"""
计算密码强度等级
:param password: 密码
:return: 密码等级 (-1 到 2)
"""
level = -1
if re.search(r'\d+', password):
level += 1
if re.search(r'[a-zA-Z]+', password):
level += 1
if re.search(r'[^A-Za-z0-9]+', password):
level += 1
return level
@staticmethod
def encrypt_password(password: str) -> str:
"""
使用 AES-CBC 加密密码
:param password: 明文密码
:return: 加密后的密码Base64编码
"""
cipher = AES.new(YouzanAuth.AES_KEY, AES.MODE_CBC, YouzanAuth.AES_IV)
padded_data = pad(password.encode('utf-8'), AES.block_size)
encrypted = cipher.encrypt(padded_data)
return base64.b64encode(encrypted).decode('utf-8')
def get_captcha_ticket(self, aid_encrypted: str) -> dict:
"""
从验证码服务器获取 ticket
:param aid_encrypted: 加密的 aid
:return: 验证码 ticket 信息
"""
try:
captcha_url = f"{self.CAPTCHA_SERVER}/captcha?aidEncrypted={aid_encrypted}"
print(f" 🔐 正在获取验证码...")
response = requests.get(captcha_url, timeout=30)
response.raise_for_status()
result = response.json()
if result.get("ticket"):
print(f" ✅ 验证码获取成功")
return {
"success": True,
"ticket": result.get("ticket"),
"randstr": result.get("randstr"),
"verifyDuration": result.get("verifyDuration", 0),
"actionDuration": result.get("actionDuration", 0),
"sid": result.get("sid", "")
}
else:
return {
"success": False,
"message": "验证码服务器未返回 ticket"
}
except Exception as e:
return {
"success": False,
"message": f"获取验证码失败: {str(e)}"
}
def login_with_password(self, mobile: str, password: str) -> dict:
"""
使用手机号和密码登录(支持验证码)
:param mobile: 手机号
:param password: 密码
:return: 登录结果
"""
url = "https://passport.youzan.com/api/login/password.json"
encrypted_password = self.encrypt_password(password)
password_level = self.calculate_password_level(password)
payload = {
"countryCode": "+86",
"mobile": mobile,
"password": encrypted_password,
"passwordLevel": password_level,
"passwordLength": len(password)
}
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
try:
# 第一次登录尝试
response = self.session.post(url, json=payload, headers=headers, timeout=15)
response.raise_for_status()
result = response.json()
if result.get("code") == 40000310:
print(f" 🔐 检测到需要验证码,正在处理...")
decision_data = result.get("data", {}).get("decisionData", )
aid_encrypted = decision_data.get("aidEncrypted")
captcha_app_id = decision_data.get("captchaAppId")
if not aid_encrypted:
return {
"success": False,
"message": "无法获取验证码参数",
"mobile": mobile
}
# 调用验证码服务器
captcha_result = self.get_captcha_ticket(aid_encrypted)
if not captcha_result.get("success"):
return {
"success": False,
"message": captcha_result.get("message", "验证码获取失败"),
"mobile": mobile
}
print(f" 🔄 正在使用验证码重新登录...")
payload_with_captcha = {
"countryCode": "+86",
"mobile": mobile,
"password": encrypted_password,
"passwordLevel": password_level,
"passwordLength": len(password),
"behavior": {
"decisionCode": "TENCENT_LOW_RISK",
"verifyResult": {
"code": 0,
"data": {
"appid": captcha_app_id,
"ret": 0,
"ticket": captcha_result["ticket"],
"randstr": captcha_result["randstr"],
"verifyDuration": captcha_result["verifyDuration"],
"actionDuration": captcha_result["actionDuration"],
"sid": captcha_result["sid"]
}
}
}
}
response = self.session.post(url, json=payload_with_captcha, headers=headers, timeout=15)
response.raise_for_status()
result = response.json()
# 处理登录结果
if result.get("code") == 0:
# 登录成功,从 Cookie 中提取 KDTSESSIONID
cookies = self.session.cookies.get_dict()
kdtsessionid = cookies.get("KDTSESSIONID", "")
return {
"success": True,
"message": "登录成功",
"token": kdtsessionid,
"mobile": mobile,
"raw_response": result
}
else:
return {
"success": False,
"message": result.get("msg", "登录失败"),
"code": result.get("code"),
"mobile": mobile
}
except Exception as e:
return {
"success": False,
"message": f"登录请求失败: {str(e)}",
"mobile": mobile
}
class YouzanCheckin:
"""有赞签到类"""
def __init__(self, token: str, session: requests.Session = None):
"""
初始化签到器
:param token: KDTSESSIONID
:param session: 可选的 requests.Session 对象(用于复用登录后的 session
"""
self.token = token
self.session = session if session else requests.Session()
self.results = []
def get_shop_info(self, kdt_id: str) -> dict:
"""
获取店铺信息(包括店铺名称)
:param kdt_id: 店铺ID
:return: 店铺信息
"""
try:
url = f"https://h5.youzan.com/wscshopcore/extension/shop-info.json?kdt_id={kdt_id}"
extra_data = {
"is_weapp": 1,
"sid": self.token
}
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
'extra-data': json.dumps(extra_data),
'Accept': 'application/json',
}
response = self.session.get(url, headers=headers, timeout=10)
result = response.json()
if result.get("code") == 0:
shop_data = result.get("data", {}).get("shop", {})
shop_name = shop_data.get("shopName", f"店铺{kdt_id}")
return {
"success": True,
"shop_name": shop_name
}
else:
return {
"success": False,
"shop_name": f"店铺{kdt_id}"
}
except Exception as e:
return {
"success": False,
"shop_name": f"店铺{kdt_id}"
}
def get_points_info(self, kdt_id: str) -> dict:
"""
查询积分信息
:param kdt_id: 店铺ID
:return: 积分信息
"""
try:
# 构建查询参数
params = f"kdt_id={kdt_id}"
# 查询积分名称
points_name_url = f"https://h5.youzan.com/wscuser/membercenter/pointsName.json?{params}"
# 查询积分统计
stats_url = f"https://h5.youzan.com/wscuser/membercenter/stats.json?{params}"
extra_data = {
"is_weapp": 1,
"sid": self.token
}
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
'extra-data': json.dumps(extra_data),
'Accept': 'application/json',
}
# 获取积分名称
name_response = self.session.get(points_name_url, headers=headers, timeout=10)
name_result = name_response.json()
# 获取积分统计
stats_response = self.session.get(stats_url, headers=headers, timeout=10)
stats_result = stats_response.json()
if name_result.get("code") == 0 and stats_result.get("code") == 0:
points_name = name_result.get("data", {}).get("pointsName", "积分")
points_value = stats_result.get("data", {}).get("stats", {}).get("points", 0)
return {
"success": True,
"points_name": points_name,
"points_value": points_value
}
else:
return {
"success": False,
"message": "积分查询失败"
}
except Exception as e:
return {
"success": False,
"message": f"积分查询异常: {str(e)}"
}
def get_redeemable_goods(self, kdt_id: str) -> dict:
"""
查询可兑换商品列表
:param kdt_id: 店铺ID
:return: 商品列表
"""
try:
url = "https://h5.youzan.com/wscump/pointstore/listPointGoods.json"
params = {
"page_size": 1000,
"kdt_id": kdt_id
}
extra_data = {
"is_weapp": 1,
"sid": self.token
}
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
'extra-data': json.dumps(extra_data),
'Accept': 'application/json',
}
response = self.session.get(url, headers=headers, params=params, timeout=15)
result = response.json()
if result.get("code") == 0:
# 注意API 返回的数据结构有两种
# 1. data 是数组(直接包含商品列表)- 新版 API
# 2. data 是对象(包含 items 或 list 字段)- 旧版 API
data = result.get("data", [])
# 判断 data 是数组还是对象
if isinstance(data, list):
goods_list = data
else:
# 尝试从 items 或 list 字段获取
goods_list = data.get("items", data.get("list", []))
formatted_goods = []
for idx, item in enumerate(goods_list):
# 兼容多种数据结构
# 新版 API直接从 item 获取字段
# 旧版 API从嵌套对象中获取字段
# 尝试新版 API 字段(影视飓风等店铺)
name = item.get("goodsName")
stock = item.get("remainNum")
points = item.get("pointsPrice")
cash = item.get("remainPrice", 0)
# 如果新版字段不存在,尝试旧版 API 字段
if name is None:
# 优先检查优惠券类商品couponGroupInfoDTO.groupName
coupon_info = item.get("couponGroupInfoDTO", {})
name = coupon_info.get("groupName")
# 如果不是优惠券,尝试 generalGoodsInfoDTO.generalGoodsTitle
if name is None:
general_goods_info = item.get("generalGoodsInfoDTO", {})
name = general_goods_info.get("generalGoodsTitle")
# 如果还是没有,尝试 generalGoodsTitle 的其他变体
if name is None:
name = general_goods_info.get("title")
# 最后尝试 pointGoodsInfo.title
if name is None:
goods_info = item.get("pointGoodsInfo", {})
name = goods_info.get("title", "未知商品")
if stock is None:
stock_info = item.get("pointGoodsStockDTO", {})
stock = stock_info.get("availableStock", 0)
if points is None:
price_info = item.get("pointGoodsExchangePriceDTO", {})
points = price_info.get("points", 0)
if cash == 0:
cash = price_info.get("cash", 0)
formatted_goods.append({
"name": name if name else "未知商品",
"stock": stock if stock is not None else 0,
"points": points if points is not None else 0,
"cash": cash / 100 # 转换为元
})
return {
"success": True,
"goods": formatted_goods,
"total": len(formatted_goods)
}
else:
return {
"success": False,
"message": result.get("msg", "商品查询失败")
}
except Exception as e:
return {
"success": False,
"message": f"商品查询异常: {str(e)}"
}
def checkin(self, shop_info: dict, show_points: bool = True, show_goods: bool = True) -> dict:
"""
执行签到
:param shop_info: 店铺信息字典
:param show_points: 是否显示积分信息
:param show_goods: 是否显示可兑换商品
:return: 签到结果
"""
kdt_id = shop_info.get("kdt_id")
checkin_id = shop_info.get("checkin_id")
# 检查配置完整性
if not kdt_id or not checkin_id or checkin_id == "未知":
return {
"success": False,
"name": shop_info.get("name", "未命名"),
"kdt_id": kdt_id,
"message": "店铺配置不完整,请补充 checkin_id"
}
# 如果没有提供店铺名称,自动获取
name = shop_info.get("name")
if not name:
shop_info_result = self.get_shop_info(kdt_id)
name = shop_info_result.get("shop_name", f"店铺{kdt_id}")
url = "https://h5.youzan.com/wscump/checkin/checkinV2.json"
params = {
"checkinId": checkin_id,
"kdt_id": kdt_id
}
# 构建 extra-data
extra_data = {
"is_weapp": 1,
"sid": self.token
}
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
'extra-data': json.dumps(extra_data),
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
try:
response = self.session.get(url, headers=headers, params=params, timeout=15)
response.raise_for_status()
result = response.json()
# 解析响应
code = result.get("code")
msg = result.get("msg", "未知状态")
data = result.get("data", {})
checkin_result = {
"name": name,
"kdt_id": kdt_id
}
if code == 0:
# 签到成功
continuous_days = data.get("continuousDays", 0)
today_reward = data.get("todayReward", "")
checkin_result.update({
"success": True,
"message": f"签到成功! 已连续签到 {continuous_days}",
"continuous_days": continuous_days,
"today_reward": today_reward,
"raw_response": result
})
elif code == -1:
# Token 失效
checkin_result.update({
"success": False,
"message": f"Token已失效: {msg}",
"need_refresh_token": True
})
else:
# 其他状态(比如已经签到过)
checkin_result.update({
"success": False,
"message": msg,
"raw_response": result
})
# 查询积分信息
if show_points:
time.sleep(0.5) # 避免请求过快
points_info = self.get_points_info(kdt_id)
checkin_result["points_info"] = points_info
# 查询可兑换商品
if show_goods:
time.sleep(0.5) # 避免请求过快
goods_info = self.get_redeemable_goods(kdt_id)
checkin_result["goods_info"] = goods_info
return checkin_result
except requests.exceptions.Timeout:
return {
"success": False,
"name": name,
"kdt_id": kdt_id,
"message": "请求超时,请检查网络连接"
}
except requests.exceptions.RequestException as e:
return {
"success": False,
"name": name,
"kdt_id": kdt_id,
"message": f"网络请求失败: {str(e)}"
}
except json.JSONDecodeError:
return {
"success": False,
"name": name,
"kdt_id": kdt_id,
"message": f"响应解析失败,原始响应: {response.text[:200]}"
}
except Exception as e:
return {
"success": False,
"name": name,
"kdt_id": kdt_id,
"message": f"未知错误: {str(e)}"
}
def batch_checkin(self, shops: list, show_points: bool = True, show_goods: bool = True) -> list:
"""
批量签到
:param shops: 店铺列表
:param show_points: 是否显示积分信息
:param show_goods: 是否显示可兑换商品
:return: 签到结果列表
"""
results = []
total = len(shops)
print(f"\n🎯 开始批量签到,共 {total} 个店铺\n")
for index, shop in enumerate(shops, 1):
kdt_id = shop.get("kdt_id")
result = self.checkin(shop, show_points, show_goods)
results.append(result)
checkin_id = shop.get("checkin_id")
print(f"🏪 店铺:{result['name']} idcheckinId={checkin_id}&kdt_id={kdt_id}")
status_emoji = "" if result.get("success") else ""
print(f"{status_emoji} 签到结果:{result['message']}")
if show_points and "points_info" in result:
points_info = result["points_info"]
if points_info.get("success"):
print(f"💰 拥有{points_info['points_name']}{points_info['points_value']}")
if show_goods and "goods_info" in result:
goods_info = result["goods_info"]
if goods_info.get("success"):
goods_list = goods_info.get("goods", [])
points_name = result.get("points_info", {}).get("points_name", "积分")
if goods_list:
for goods in goods_list:
cash_str = f" + ¥{goods['cash']:.2f}" if goods['cash'] > 0 else ""
stock_emoji = "🟢" if goods['stock'] > 0 else "🔴"
print(f"{stock_emoji} 商品:{goods['name']} | 库存:{goods['stock']} | 需要{points_name}{goods['points']}{cash_str}")
else:
print("📦 该店铺暂无可兑换商品")
else:
print(f"⚠️ 商品查询失败:{goods_info.get('message', '未知错误')}")
if index < total:
time.sleep(2)
print()
return results
def print_banner():
"""打印横幅"""
print("=" * 60)
print(" " * 15 + "🎉 有赞商城签到通杀")
print("=" * 60)
print(f"⏰ 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
def send_notification(results: list, account_count: int = 1):
"""
发送通知 (适配青龙面板)
"""
try:
import notify
success_count = sum(1 for r in results if r.get("success"))
fail_count = len(results) - success_count
title = "有赞商城签到通知"
content = f"📊 签到完成\n\n"
if account_count > 1:
content += f"👥 账号数: {account_count}\n"
content += f"✅ 成功: {success_count}\n"
content += f"❌ 失败: {fail_count}\n\n"
# 多账号时按账号分组显示
if account_count > 1:
current_mobile = None
for r in results:
if r.get("mobile") != current_mobile:
current_mobile = r.get("mobile")
mobile_tail = r.get("mobile_tail", "****")
content += f"\n📱 账号尾号: {mobile_tail}\n"
status = "" if r.get("success") else ""
content += f"{status} {r['name']}: {r['message']}\n"
else:
# 单账号时直接显示
for r in results:
status = "" if r.get("success") else ""
content += f"{status} {r['name']}: {r['message']}\n"
notify.send(title, content)
except ImportError:
pass
except Exception as e:
print(f"⚠️ 通知发送失败: {e}")
def parse_accounts(account_str: str) -> list:
"""
解析账号配置字符串
:param account_str: 账号配置字符串,支持换行符或&分隔多账号
:return: 账号列表
"""
accounts = []
if not account_str:
return accounts
# 支持多账号,用换行符或 & 分隔
account_str = account_str.replace('&', '\n')
account_parts = account_str.split('\n')
for part in account_parts:
part = part.strip()
if '#' in part:
mobile, password = part.split('#', 1)
accounts.append({
"mobile": mobile.strip(),
"password": password.strip()
})
return accounts
def main():
"""主函数"""
print_banner()
# 从环境变量读取账号配置
account_str = os.getenv("YOUZAN_ACCOUNT", "")
# 默认开启积分和商品查询
show_points = True
show_goods = True
if not account_str:
print("\n❌ 错误: 未找到账号配置")
print("\n📝 请在环境变量中配置:")
print(" 变量名: YOUZAN_ACCOUNT")
print(" 变量值: 手机号#密码")
print("\n💡 示例: 13800138000#yourpassword\n")
return
accounts = parse_accounts(account_str)
if not accounts:
print("❌ 错误: YOUZAN_ACCOUNT 格式不正确")
print("📝 正确格式: 手机号#密码 或 手机号1#密码1\\n手机号2#密码2\n")
return
print(f"\n📋 检测到 {len(accounts)} 个账号")
valid_shops = [s for s in SHOPS if s.get("checkin_id") and s["checkin_id"] != "未知"]
if not valid_shops:
print("\n❌ 错误: 没有可用的店铺配置")
print("\n📝 请在脚本中配置 SHOPS 列表,并确保填写了正确的 checkin_id\n")
return
all_results = []
# 外层循环:遍历所有账号
for idx, account in enumerate(accounts, 1):
mobile = account['mobile']
password = account['password']
mobile_tail = mobile[-4:] # 获取手机号后4位
if len(accounts) > 1:
print(f"\n{'='*60}")
print(f"🔄 账号 {idx}/{len(accounts)}: {mobile}")
print(f"{'='*60}")
else:
print(f"\n📱 账号: {mobile}")
token_cache = TokenCache()
token = None
auth = YouzanAuth()
cache_result = token_cache.load_token(mobile)
if cache_result.get("success"):
token = cache_result["token"]
cache_time = datetime.fromtimestamp(cache_result["cache_time"]).strftime('%Y-%m-%d %H:%M:%S')
print(f"✅ 使用缓存 Token{cache_time}")
# 即使有缓存,也要设置 Cookie 到 session
auth.session.cookies.set("KDTSESSIONID", token)
print()
else:
print(f"🔐 正在登录...\n")
login_result = auth.login_with_password(mobile, password)
if login_result["success"]:
token = login_result["token"]
print(f"{login_result['message']}")
if token_cache.save_token(mobile, token):
print("💾 Token 已缓存\n")
else:
print("⚠️ Token 缓存失败\n")
else:
print(f"❌ 登录失败: {login_result['message']}\n")
continue
if not token:
print("\n❌ 错误: 无法获取有效的 Token\n")
continue
# 始终使用 auth.session确保 Cookie 复用
checker = YouzanCheckin(token, session=auth.session)
results = checker.batch_checkin(valid_shops, show_points, show_goods)
token_expired = any(r.get("need_refresh_token") for r in results)
if token_expired:
print("\n⚠️ 检测到 Token 已失效,正在重新登录...\n")
token_cache.clear_token(mobile)
login_result = auth.login_with_password(mobile, password)
if login_result["success"]:
token = login_result["token"]
print("✅ 重新登录成功")
if token_cache.save_token(mobile, token):
print("💾 Token 已缓存\n")
print("🔄 重新签到中...\n")
checker = YouzanCheckin(token, session=auth.session)
results = checker.batch_checkin(valid_shops, show_points, show_goods)
else:
print(f"❌ 重新登录失败: {login_result['message']}\n")
# 为每个结果添加账号信息
for r in results:
r["mobile"] = mobile
r["mobile_tail"] = mobile_tail
all_results.extend(results)
# 发送通知
if all_results:
send_notification(all_results, len(accounts))
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n用户中断执行")
except Exception as e:
print(f"\n程序执行出错: {e}")
import traceback
traceback.print_exc()