Files
XiaoGe-LiBai-yangmao/youzan/youzan.py
2025-12-10 15:07:17 +08:00

1000 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
有赞商城自动签到脚本
【环境变量】
export YOUZAN_ACCOUNT=手机号#密码 (必填)
export CAPTCHA_SERVER=验证码服务器 (可选)
【店铺配置】
在 SHOPS 列表中添加: {"kdt_id": "店铺ID", "checkin_id": "签到ID"}
店铺名称会自动获取,无需手动填写
【定时任务】
cron: 0 8 * * * (每天早上8点)
【功能特性】
✅ 密码登录 + 自动验证码
✅ Token 自动缓存7天有效
✅ Token 失效自动重登
✅ 积分查询 + 商品查询
✅ 自动获取店铺名称
更新: 2025-12-10
"""
import requests
import json
import os
import sys
import time
import re
from datetime import datetime
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
import base64
import hashlib
# Work around Windows console encoding problems: re-wrap the standard streams
# as UTF-8 and replace characters that cannot be encoded instead of raising.
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# ==================== Token cache configuration ====================
# Directory holding the token cache files: a ".youzan_cache" folder located
# next to this script.
TOKEN_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".youzan_cache")
# =====================================================
# ==================== Shop configuration ====================
# Add the shops that should be checked in here.
# Format: {"kdt_id": "<shop id>", "checkin_id": "<check-in activity id>"}
# Entries that are commented out are skipped.
SHOPS = [
    {"kdt_id": "18739377", "checkin_id": "12063"},
    {"kdt_id": "44877243", "checkin_id": "2162835"},
    # {"kdt_id": "41067901", "checkin_id": "99"},
    {"kdt_id": "129380009", "checkin_id": "3520910"},
    # {"kdt_id": "109809208", "checkin_id": "2923467"},
    {"kdt_id": "43958855", "checkin_id": "2910869"},
    {"kdt_id": "42213767", "checkin_id": "2386563"},
    {"kdt_id": "100464643", "checkin_id": "1597464"},
    {"kdt_id": "117130552", "checkin_id": "3347128"},
    {"kdt_id": "107786737", "checkin_id": "2299510"},
    {"kdt_id": "44694253", "checkin_id": "18415"},
    # {"kdt_id": "130177909", "checkin_id": "3549859"},
    {"kdt_id": "93457151", "checkin_id": "1820214"},
    {"kdt_id": "105036832", "checkin_id": "3997371"},
    # {"kdt_id": "121810522", "checkin_id": "4804346"},
    {"kdt_id": "146288343", "checkin_id": "4296415"},
    {"kdt_id": "139827364", "checkin_id": "4806300"},
    {"kdt_id": "16365465", "checkin_id": "13736"},
    {"kdt_id": "77770507", "checkin_id": "9975"},
    {"kdt_id": "43183730", "checkin_id": "24630"},
    {"kdt_id": "96543670", "checkin_id": "3721624"},
    {"kdt_id": "113745713", "checkin_id": "3800805"},
    {"kdt_id": "179778907", "checkin_id": "5428477"},
    {"kdt_id": "77093133", "checkin_id": "2947047"},
    {"kdt_id": "44519665", "checkin_id": "2682116"},
    {"kdt_id": "149536603", "checkin_id": "6287727"},
]
# =====================================================
class TokenCache:
    """File-backed cache of login tokens, one JSON file per account.

    Each cache file is named after the MD5 hex digest of the phone number and
    stores the phone number, the token and the time it was saved. Entries are
    considered valid for ``MAX_AGE_SECONDS`` (7 days).
    """

    # Maximum age of a cached token, in seconds (7 days).
    MAX_AGE_SECONDS = 7 * 24 * 3600

    def __init__(self, cache_dir: str = TOKEN_CACHE_DIR):
        """
        Create the cache manager and make sure the cache directory exists.

        :param cache_dir: directory in which cache files are stored
        """
        self.cache_dir = cache_dir
        # exist_ok avoids failing when the directory already exists or is
        # created by another process between a check and the creation.
        os.makedirs(self.cache_dir, exist_ok=True)

    @staticmethod
    def _get_account_hash(mobile: str) -> str:
        """
        Return the MD5 hex digest of the phone number (used in the file name).

        :param mobile: phone number
        :return: hexadecimal digest string
        """
        return hashlib.md5(mobile.encode('utf-8')).hexdigest()

    def get_cache_file(self, mobile: str) -> str:
        """
        Return the path of the cache file for an account.

        :param mobile: phone number
        :return: path of ``token_<hash>.json`` inside the cache directory
        """
        account_hash = self._get_account_hash(mobile)
        return os.path.join(self.cache_dir, f"token_{account_hash}.json")

    def save_token(self, mobile: str, token: str) -> bool:
        """
        Write a token to the cache.

        An empty token is never stored: caching it would make every later
        ``load_token`` report a hit that carries no usable credential.

        :param mobile: phone number
        :param token: token to store
        :return: True if the token was written, False otherwise
        """
        if not token:
            print("⚠️ 保存 Token 缓存失败: Token 为空")
            return False
        cache_data = {
            "mobile": mobile,
            "token": token,
            "timestamp": int(time.time())
        }
        try:
            with open(self.get_cache_file(mobile), 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
        except (OSError, TypeError, ValueError) as e:
            print(f"⚠️ 保存 Token 缓存失败: {e}")
            return False
        return True

    def load_token(self, mobile: str) -> dict:
        """
        Load a token from the cache.

        :param mobile: phone number
        :return: ``{"success": True, "token", "mobile", "cache_time"}`` on a
                 valid hit, otherwise ``{"success": False, "message"}``
        """
        try:
            with open(self.get_cache_file(mobile), 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
        except FileNotFoundError:
            return {
                "success": False,
                "message": "缓存文件不存在"
            }
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            return {
                "success": False,
                "message": f"读取缓存失败: {str(e)}"
            }

        if not isinstance(cache_data, dict):
            return {
                "success": False,
                "message": "读取缓存失败: 缓存格式无效"
            }

        cache_time = cache_data.get("timestamp", 0)
        if not isinstance(cache_time, (int, float)):
            return {
                "success": False,
                "message": "读取缓存失败: 缓存时间戳无效"
            }

        # Reject entries older than the maximum age (7 days).
        if int(time.time()) - cache_time > self.MAX_AGE_SECONDS:
            return {
                "success": False,
                "message": "缓存已过期超过7天"
            }

        token = cache_data.get("token")
        if not token:
            # A file without a usable token must not count as a cache hit,
            # otherwise the caller would skip logging in.
            return {
                "success": False,
                "message": "读取缓存失败: 缓存中没有有效的 Token"
            }

        return {
            "success": True,
            "token": token,
            "mobile": cache_data.get("mobile"),
            "cache_time": cache_time
        }

    def clear_token(self, mobile: str) -> bool:
        """
        Delete the cached token of an account.

        :param mobile: phone number
        :return: True if the file was removed or did not exist, False on error
        """
        try:
            os.remove(self.get_cache_file(mobile))
        except FileNotFoundError:
            # Nothing cached for this account: already in the desired state.
            pass
        except OSError as e:
            print(f"⚠️ 清除 Token 缓存失败: {e}")
            return False
        return True
class YouzanAuth:
    """Password login against the Youzan passport API, with captcha support.

    The instance owns a ``requests.Session``; after a successful login the
    session cookies contain ``KDTSESSIONID``, which is returned as the token.
    """

    # AES parameters used to encrypt the password (per the original author's
    # note, extracted from a JS script).
    AES_KEY = b"youzan.com._key_"
    AES_IV = b"youzan.com.aesiv"
    # Base URL of the captcha server; read from the CAPTCHA_SERVER environment
    # variable, falling back to the address used by the original JS script.
    CAPTCHA_SERVER = os.getenv("CAPTCHA_SERVER", "https://captcha.fuckinghigh.eu.org")
    # Password login endpoint.
    LOGIN_URL = "https://passport.youzan.com/api/login/password.json"
    # Response code that login_with_password treats as "captcha required".
    CAPTCHA_REQUIRED_CODE = 40000310

    def __init__(self):
        """Create the HTTP session used for all login requests."""
        self.session = requests.Session()

    @staticmethod
    def calculate_password_level(password: str) -> int:
        """
        Compute the password strength level.

        Starts at -1 and adds one for each character class present:
        digits, ASCII letters, and anything else.

        :param password: plain-text password
        :return: level between -1 and 2
        """
        character_classes = (r'\d+', r'[a-zA-Z]+', r'[^A-Za-z0-9]+')
        return -1 + sum(1 for pattern in character_classes if re.search(pattern, password))

    @staticmethod
    def encrypt_password(password: str) -> str:
        """
        Encrypt the password with AES-CBC using the class key and IV.

        :param password: plain-text password
        :return: Base64-encoded ciphertext
        """
        cipher = AES.new(YouzanAuth.AES_KEY, AES.MODE_CBC, YouzanAuth.AES_IV)
        padded_data = pad(password.encode('utf-8'), AES.block_size)
        return base64.b64encode(cipher.encrypt(padded_data)).decode('utf-8')

    @staticmethod
    def _extract_decision_data(result: dict) -> dict:
        """Return the ``data.decisionData`` dict of a login response, or ``{}``."""
        data = result.get("data") if isinstance(result, dict) else None
        if not isinstance(data, dict):
            return {}
        decision_data = data.get("decisionData")
        return decision_data if isinstance(decision_data, dict) else {}

    def get_captcha_ticket(self, aid_encrypted: str) -> dict:
        """
        Ask the captcha server for a ticket.

        :param aid_encrypted: encrypted aid taken from the login response
        :return: ``{"success": True, "ticket", "randstr", "verifyDuration",
                 "actionDuration", "sid"}`` or ``{"success": False, "message"}``
        """
        try:
            captcha_url = f"{self.CAPTCHA_SERVER}/captcha?aidEncrypted={aid_encrypted}"
            print(" 🔐 正在获取验证码...")
            response = requests.get(captcha_url, timeout=30)
            response.raise_for_status()
            result = response.json()
            if not result.get("ticket"):
                return {
                    "success": False,
                    "message": "验证码服务器未返回 ticket"
                }
            print(" ✅ 验证码获取成功")
            return {
                "success": True,
                "ticket": result.get("ticket"),
                "randstr": result.get("randstr"),
                "verifyDuration": result.get("verifyDuration", 0),
                "actionDuration": result.get("actionDuration", 0),
                "sid": result.get("sid", "")
            }
        except Exception as e:
            # Best-effort boundary: any failure is reported to the caller as a
            # result dict rather than raised.
            return {
                "success": False,
                "message": f"获取验证码失败: {str(e)}"
            }

    def login_with_password(self, mobile: str, password: str) -> dict:
        """
        Log in with phone number and password, solving a captcha if requested.

        :param mobile: phone number
        :param password: plain-text password
        :return: ``{"success": True, "message", "token", "mobile",
                 "raw_response"}`` on success, otherwise
                 ``{"success": False, "message", "mobile"}`` (plus ``"code"``
                 when the server returned one)
        """
        encrypted_password = self.encrypt_password(password)
        payload = {
            "countryCode": "+86",
            "mobile": mobile,
            "password": encrypted_password,
            "passwordLevel": self.calculate_password_level(password),
            "passwordLength": len(password)
        }
        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        try:
            # First attempt, without captcha data.
            response = self.session.post(self.LOGIN_URL, json=payload, headers=headers, timeout=15)
            response.raise_for_status()
            result = response.json()

            if result.get("code") == self.CAPTCHA_REQUIRED_CODE:
                print(" 🔐 检测到需要验证码,正在处理...")
                # Tolerate a missing or null "data"/"decisionData" instead of
                # failing with an AttributeError on None.
                decision_data = self._extract_decision_data(result)
                aid_encrypted = decision_data.get("aidEncrypted")
                captcha_app_id = decision_data.get("captchaAppId")
                if not aid_encrypted:
                    return {
                        "success": False,
                        "message": "无法获取验证码参数",
                        "mobile": mobile
                    }

                # Ask the captcha server to solve the challenge.
                captcha_result = self.get_captcha_ticket(aid_encrypted)
                if not captcha_result.get("success"):
                    return {
                        "success": False,
                        "message": captcha_result.get("message", "验证码获取失败"),
                        "mobile": mobile
                    }

                print(" 🔄 正在使用验证码重新登录...")
                # Same payload as the first attempt plus the captcha result.
                payload_with_captcha = dict(payload, behavior={
                    "decisionCode": "TENCENT_LOW_RISK",
                    "verifyResult": {
                        "code": 0,
                        "data": {
                            "appid": captcha_app_id,
                            "ret": 0,
                            "ticket": captcha_result["ticket"],
                            "randstr": captcha_result["randstr"],
                            "verifyDuration": captcha_result["verifyDuration"],
                            "actionDuration": captcha_result["actionDuration"],
                            "sid": captcha_result["sid"]
                        }
                    }
                })
                response = self.session.post(self.LOGIN_URL, json=payload_with_captcha, headers=headers, timeout=15)
                response.raise_for_status()
                result = response.json()

            if result.get("code") != 0:
                return {
                    "success": False,
                    "message": result.get("msg", "登录失败"),
                    "code": result.get("code"),
                    "mobile": mobile
                }

            # Login accepted: the token is the KDTSESSIONID session cookie.
            kdtsessionid = self.session.cookies.get_dict().get("KDTSESSIONID", "")
            if not kdtsessionid:
                # Without the cookie there is no usable token, so do not
                # report success with an empty credential.
                return {
                    "success": False,
                    "message": "登录成功但未获取到 KDTSESSIONID",
                    "code": result.get("code"),
                    "mobile": mobile
                }
            return {
                "success": True,
                "message": "登录成功",
                "token": kdtsessionid,
                "mobile": mobile,
                "raw_response": result
            }
        except Exception as e:
            # Best-effort boundary: network, HTTP and parsing errors are all
            # reported as a failed login instead of being raised.
            return {
                "success": False,
                "message": f"登录请求失败: {str(e)}",
                "mobile": mobile
            }
class YouzanCheckin:
    """Runs shop check-ins and the related point and goods queries.

    Every request carries an ``extra-data`` header containing the session id
    (``sid``) supplied as ``token``.
    """

    _USER_AGENT = 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN'

    def __init__(self, token: str, session: "requests.Session" = None):
        """
        Create the check-in helper.

        :param token: KDTSESSIONID value
        :param session: optional ``requests.Session`` to reuse (for example the
                        one that performed the login); a new one is created
                        when omitted
        """
        self.token = token
        self.session = session if session is not None else requests.Session()
        self.results = []

    def _build_headers(self, extra: dict = None) -> dict:
        """Return the request headers shared by all endpoints, plus ``extra``."""
        headers = {
            'User-Agent': self._USER_AGENT,
            'extra-data': json.dumps({"is_weapp": 1, "sid": self.token}),
            'Accept': 'application/json',
        }
        if extra:
            headers.update(extra)
        return headers

    @staticmethod
    def _as_dict(value) -> dict:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def get_shop_info(self, kdt_id: str) -> dict:
        """
        Fetch shop information (the shop name).

        :param kdt_id: shop id
        :return: ``{"success": bool, "shop_name": str}``; on any failure the
                 name falls back to ``店铺<kdt_id>``
        """
        fallback_name = f"店铺{kdt_id}"
        try:
            response = self.session.get(
                "https://h5.youzan.com/wscshopcore/extension/shop-info.json",
                params={"kdt_id": kdt_id},
                headers=self._build_headers(),
                timeout=10,
            )
            result = response.json()
            if result.get("code") != 0:
                return {"success": False, "shop_name": fallback_name}
            shop_data = self._as_dict(self._as_dict(result.get("data")).get("shop"))
            # A null or empty shopName also falls back to the placeholder.
            return {
                "success": True,
                "shop_name": shop_data.get("shopName") or fallback_name
            }
        except Exception:
            # Best effort: the name is cosmetic, so never fail the check-in.
            return {"success": False, "shop_name": fallback_name}

    def get_points_info(self, kdt_id: str) -> dict:
        """
        Query the name and current balance of the shop's points.

        :param kdt_id: shop id
        :return: ``{"success": True, "points_name", "points_value"}`` or
                 ``{"success": False, "message"}``
        """
        try:
            params = {"kdt_id": kdt_id}
            headers = self._build_headers()

            # Display name of the points.
            name_result = self.session.get(
                "https://h5.youzan.com/wscuser/membercenter/pointsName.json",
                params=params, headers=headers, timeout=10,
            ).json()
            # Member statistics, which include the points balance.
            stats_result = self.session.get(
                "https://h5.youzan.com/wscuser/membercenter/stats.json",
                params=params, headers=headers, timeout=10,
            ).json()

            if name_result.get("code") != 0 or stats_result.get("code") != 0:
                return {
                    "success": False,
                    "message": "积分查询失败"
                }

            points_name = self._as_dict(name_result.get("data")).get("pointsName") or "积分"
            stats = self._as_dict(self._as_dict(stats_result.get("data")).get("stats"))
            return {
                "success": True,
                "points_name": points_name,
                "points_value": stats.get("points", 0)
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"积分查询异常: {str(e)}"
            }

    @classmethod
    def _format_goods_item(cls, item: dict) -> dict:
        """Normalise one goods entry to ``name``/``stock``/``points``/``cash``.

        Flat fields (``goodsName``, ``remainNum``, ``pointsPrice``,
        ``remainPrice``) are tried first, then the nested ``*DTO`` objects.
        """
        name = item.get("goodsName")
        stock = item.get("remainNum")
        points = item.get("pointsPrice")
        cash = item.get("remainPrice") or 0

        if name is None:
            # Coupon-type goods.
            name = cls._as_dict(item.get("couponGroupInfoDTO")).get("groupName")
        if name is None:
            # General goods, with "title" as an alternative key.
            general_goods_info = cls._as_dict(item.get("generalGoodsInfoDTO"))
            name = general_goods_info.get("generalGoodsTitle")
            if name is None:
                name = general_goods_info.get("title")
        if name is None:
            name = cls._as_dict(item.get("pointGoodsInfo")).get("title")

        if stock is None:
            stock = cls._as_dict(item.get("pointGoodsStockDTO")).get("availableStock", 0)

        # Looked up unconditionally so the cash fallback below never reads an
        # unbound (or stale, from a previous item) price object.
        price_info = cls._as_dict(item.get("pointGoodsExchangePriceDTO"))
        if points is None:
            points = price_info.get("points", 0)
        if not cash:
            cash = price_info.get("cash") or 0

        return {
            "name": name if name else "未知商品",
            "stock": stock if stock is not None else 0,
            "points": points if points is not None else 0,
            "cash": cash / 100  # the result is labelled as yuan
        }

    def get_redeemable_goods(self, kdt_id: str) -> dict:
        """
        Query the goods that can be redeemed with points.

        :param kdt_id: shop id
        :return: ``{"success": True, "goods": [...], "total": int}`` or
                 ``{"success": False, "message"}``
        """
        try:
            response = self.session.get(
                "https://h5.youzan.com/wscump/pointstore/listPointGoods.json",
                headers=self._build_headers(),
                params={"page_size": 1000, "kdt_id": kdt_id},
                timeout=15,
            )
            result = response.json()
            if result.get("code") != 0:
                return {
                    "success": False,
                    "message": result.get("msg", "商品查询失败")
                }

            # "data" is either the goods list itself or an object holding the
            # list under "items" or "list"; anything else means no goods.
            data = result.get("data")
            if isinstance(data, list):
                goods_list = data
            elif isinstance(data, dict):
                goods_list = data.get("items", data.get("list", [])) or []
            else:
                goods_list = []

            formatted_goods = [
                self._format_goods_item(item)
                for item in goods_list
                if isinstance(item, dict)
            ]
            return {
                "success": True,
                "goods": formatted_goods,
                "total": len(formatted_goods)
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"商品查询异常: {str(e)}"
            }

    def checkin(self, shop_info: dict, show_points: bool = True, show_goods: bool = True) -> dict:
        """
        Check in at one shop.

        :param shop_info: shop dict with ``kdt_id``, ``checkin_id`` and an
                          optional ``name``
        :param show_points: also query the points balance
        :param show_goods: also query the redeemable goods
        :return: result dict with ``success``, ``name``, ``kdt_id`` and
                 ``message``; ``need_refresh_token`` is set when the response
                 code is -1, and ``points_info`` / ``goods_info`` are attached
                 when requested
        """
        kdt_id = shop_info.get("kdt_id")
        checkin_id = shop_info.get("checkin_id")

        # Reject incomplete configuration before doing any request.
        if not kdt_id or not checkin_id or checkin_id == "未知":
            return {
                "success": False,
                "name": shop_info.get("name", "未命名"),
                "kdt_id": kdt_id,
                "message": "店铺配置不完整,请补充 checkin_id"
            }

        # Look the shop name up when the configuration does not provide one.
        name = shop_info.get("name")
        if not name:
            name = self.get_shop_info(kdt_id).get("shop_name", f"店铺{kdt_id}")

        def failure(message: str) -> dict:
            return {
                "success": False,
                "name": name,
                "kdt_id": kdt_id,
                "message": message
            }

        headers = self._build_headers({
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.9',
        })
        try:
            response = self.session.get(
                "https://h5.youzan.com/wscump/checkin/checkinV2.json",
                headers=headers,
                params={"checkinId": checkin_id, "kdt_id": kdt_id},
                timeout=15,
            )
            response.raise_for_status()
        except requests.exceptions.Timeout:
            return failure("请求超时,请检查网络连接")
        except requests.exceptions.RequestException as e:
            return failure(f"网络请求失败: {str(e)}")
        except Exception as e:
            return failure(f"未知错误: {str(e)}")

        # Parsed separately from the request: the JSON error raised by
        # response.json() can also be a RequestException subclass, and must be
        # reported as a parse failure rather than a network failure.
        try:
            result = response.json()
        except ValueError:
            return failure(f"响应解析失败,原始响应: {response.text[:200]}")
        if not isinstance(result, dict):
            return failure(f"响应解析失败,原始响应: {response.text[:200]}")

        code = result.get("code")
        msg = result.get("msg", "未知状态")
        data = self._as_dict(result.get("data"))
        checkin_result = {
            "name": name,
            "kdt_id": kdt_id
        }
        if code == 0:
            # Check-in succeeded.
            continuous_days = data.get("continuousDays", 0)
            checkin_result.update({
                "success": True,
                "message": f"签到成功! 已连续签到 {continuous_days}",
                "continuous_days": continuous_days,
                "today_reward": data.get("todayReward", ""),
                "raw_response": result
            })
        elif code == -1:
            # Treated as an invalid token; the caller re-logs in.
            checkin_result.update({
                "success": False,
                "message": f"Token已失效: {msg}",
                "need_refresh_token": True
            })
        else:
            # Any other status (for example: already checked in).
            checkin_result.update({
                "success": False,
                "message": msg,
                "raw_response": result
            })

        if show_points:
            time.sleep(0.5)  # avoid sending requests too quickly
            checkin_result["points_info"] = self.get_points_info(kdt_id)
        if show_goods:
            time.sleep(0.5)  # avoid sending requests too quickly
            checkin_result["goods_info"] = self.get_redeemable_goods(kdt_id)
        return checkin_result

    def batch_checkin(self, shops: list, show_points: bool = True, show_goods: bool = True) -> list:
        """
        Check in at every shop in order, printing a report for each.

        :param shops: list of shop dicts
        :param show_points: also query and print the points balance
        :param show_goods: also query and print the redeemable goods
        :return: list of check-in result dicts, one per shop
        """
        results = []
        total = len(shops)
        print(f"\n🎯 开始批量签到,共 {total} 个店铺\n")
        for index, shop in enumerate(shops, 1):
            kdt_id = shop.get("kdt_id")
            checkin_id = shop.get("checkin_id")
            result = self.checkin(shop, show_points, show_goods)
            results.append(result)

            print(f"🏪 店铺:{result['name']} idcheckinId={checkin_id}&kdt_id={kdt_id}")
            # NOTE(review): both markers are empty strings in the source;
            # presumably status symbols were lost - confirm.
            status_emoji = "" if result.get("success") else ""
            print(f"{status_emoji} 签到结果:{result['message']}")

            if show_points and "points_info" in result:
                points_info = result["points_info"]
                if points_info.get("success"):
                    print(f"💰 拥有{points_info['points_name']}{points_info['points_value']}")

            if show_goods and "goods_info" in result:
                goods_info = result["goods_info"]
                if goods_info.get("success"):
                    goods_list = goods_info.get("goods", [])
                    points_name = result.get("points_info", {}).get("points_name", "积分")
                    if goods_list:
                        for goods in goods_list:
                            cash_str = f" + ¥{goods['cash']:.2f}" if goods['cash'] > 0 else ""
                            stock_emoji = "🟢" if goods['stock'] > 0 else "🔴"
                            print(f"{stock_emoji} 商品:{goods['name']} | 库存:{goods['stock']} | 需要{points_name}{goods['points']}{cash_str}")
                    else:
                        print("📦 该店铺暂无可兑换商品")
                else:
                    print(f"⚠️ 商品查询失败:{goods_info.get('message', '未知错误')}")

            # Pause between shops, but not after the last one.
            if index < total:
                time.sleep(2)
            print()
        return results
def print_banner():
    """Print the start-up banner together with the current local time."""
    rule = "=" * 60
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    banner_lines = (
        rule,
        " " * 15 + "🎉 有赞商城签到通杀",
        rule,
        f"⏰ 执行时间: {started_at}",
        rule,
    )
    for line in banner_lines:
        print(line)
def send_notification(results: list, account_count: int = 1):
    """
    Send a summary notification through the QingLong panel ``notify`` module.

    Does nothing when ``notify`` cannot be imported. With more than one
    account, results are grouped under a header per account (last digits of
    the phone number).

    :param results: check-in result dicts; ``success``, ``name``, ``message``
                    and, for multiple accounts, ``mobile`` / ``mobile_tail``
                    are read
    :param account_count: number of accounts that were processed
    """
    try:
        import notify
    except ImportError:
        # Not running under QingLong (or notify is unavailable): skip quietly.
        return

    try:
        success_count = sum(1 for r in results if r.get("success"))
        fail_count = len(results) - success_count
        multi_account = account_count > 1

        title = "有赞商城签到通知"
        parts = ["📊 签到完成\n\n"]
        if multi_account:
            parts.append(f"👥 账号数: {account_count}\n")
        parts.append(f"✅ 成功: {success_count}\n")
        parts.append(f"❌ 失败: {fail_count}\n\n")

        current_mobile = None
        for r in results:
            # Start a new group whenever the account changes.
            if multi_account and r.get("mobile") != current_mobile:
                current_mobile = r.get("mobile")
                mobile_tail = r.get("mobile_tail", "****")
                parts.append(f"\n📱 账号尾号: {mobile_tail}\n")
            # NOTE(review): both markers are empty strings in the source;
            # presumably status symbols were lost - confirm.
            status = "" if r.get("success") else ""
            # .get with defaults so one incomplete result cannot abort the
            # whole notification.
            name = r.get("name", "未命名")
            message = r.get("message", "未知状态")
            parts.append(f"{status} {name}: {message}\n")

        notify.send(title, "".join(parts))
    except Exception as e:
        print(f"⚠️ 通知发送失败: {e}")
def parse_accounts(account_str: str) -> list:
    """
    Parse the account configuration string.

    Accounts are separated by newlines or ``&``; each one is written as
    ``phone#password``. Only the first ``#`` separates the two fields, so a
    password may itself contain ``#``. Entries without ``#`` or with an empty
    phone number or password are skipped, since they can never log in.

    :param account_str: raw configuration string
    :return: list of ``{"mobile": ..., "password": ...}`` dicts
    """
    accounts = []
    if not account_str:
        return accounts

    for part in account_str.replace('&', '\n').split('\n'):
        mobile, separator, password = part.strip().partition('#')
        mobile = mobile.strip()
        password = password.strip()
        if not separator or not mobile or not password:
            continue
        accounts.append({
            "mobile": mobile,
            "password": password
        })
    return accounts
def main():
    """
    Entry point: check every configured account in at every configured shop.

    Reads accounts from the ``YOUZAN_ACCOUNT`` environment variable. For each
    account it reuses a cached token when one is available, otherwise logs in;
    if any check-in reports an invalid token it logs in again and repeats the
    batch once. A notification is sent at the end when there are results.
    """
    print_banner()

    # Account configuration comes from the environment.
    account_str = os.getenv("YOUZAN_ACCOUNT", "")
    # Point and goods queries are enabled by default.
    show_points = True
    show_goods = True

    if not account_str:
        print("\n❌ 错误: 未找到账号配置")
        print("\n📝 请在环境变量中配置:")
        print(" 变量名: YOUZAN_ACCOUNT")
        print(" 变量值: 手机号#密码")
        print("\n💡 示例: 13800138000#yourpassword\n")
        return

    accounts = parse_accounts(account_str)
    if not accounts:
        print("❌ 错误: YOUZAN_ACCOUNT 格式不正确")
        print("📝 正确格式: 手机号#密码 或 手机号1#密码1\\n手机号2#密码2\n")
        return
    print(f"\n📋 检测到 {len(accounts)} 个账号")

    valid_shops = [s for s in SHOPS if s.get("checkin_id") and s["checkin_id"] != "未知"]
    if not valid_shops:
        print("\n❌ 错误: 没有可用的店铺配置")
        print("\n📝 请在脚本中配置 SHOPS 列表,并确保填写了正确的 checkin_id\n")
        return

    token_cache = TokenCache()
    all_results = []
    for idx, account in enumerate(accounts, 1):
        mobile = account['mobile']
        password = account['password']
        mobile_tail = mobile[-4:]  # last four digits of the phone number

        if len(accounts) > 1:
            print(f"\n{'='*60}")
            print(f"🔄 账号 {idx}/{len(accounts)}: {mobile}")
            print(f"{'='*60}")
        else:
            print(f"\n📱 账号: {mobile}")

        token = None
        auth = YouzanAuth()
        cache_result = token_cache.load_token(mobile)
        if cache_result.get("success") and cache_result.get("token"):
            token = cache_result["token"]
            cache_time = datetime.fromtimestamp(cache_result["cache_time"]).strftime('%Y-%m-%d %H:%M:%S')
            print(f"✅ 使用缓存 Token{cache_time}")
            # The cached token must also be placed in the session cookies.
            auth.session.cookies.set("KDTSESSIONID", token)
            print()
        else:
            if cache_result.get("success"):
                # A cache entry without a token is useless; drop it so it
                # cannot keep blocking the login on later runs.
                token_cache.clear_token(mobile)
            print(f"🔐 正在登录...\n")
            login_result = auth.login_with_password(mobile, password)
            if not login_result["success"]:
                print(f"❌ 登录失败: {login_result['message']}\n")
                continue
            token = login_result.get("token")
            print(f"{login_result['message']}")
            # Only a non-empty token is worth caching.
            if token:
                if token_cache.save_token(mobile, token):
                    print("💾 Token 已缓存\n")
                else:
                    print("⚠️ Token 缓存失败\n")

        if not token:
            print("\n❌ 错误: 无法获取有效的 Token\n")
            continue

        # Always reuse auth.session so the cookies are shared.
        checker = YouzanCheckin(token, session=auth.session)
        results = checker.batch_checkin(valid_shops, show_points, show_goods)

        token_expired = any(r.get("need_refresh_token") for r in results)
        if token_expired:
            print("\n⚠️ 检测到 Token 已失效,正在重新登录...\n")
            token_cache.clear_token(mobile)
            login_result = auth.login_with_password(mobile, password)
            # Re-run only when the new login actually produced a token.
            if login_result["success"] and login_result.get("token"):
                token = login_result["token"]
                print("✅ 重新登录成功")
                if token_cache.save_token(mobile, token):
                    print("💾 Token 已缓存\n")
                print("🔄 重新签到中...\n")
                checker = YouzanCheckin(token, session=auth.session)
                results = checker.batch_checkin(valid_shops, show_points, show_goods)
            else:
                print(f"❌ 重新登录失败: {login_result['message']}\n")

        # Tag every result with the account it belongs to.
        for r in results:
            r["mobile"] = mobile
            r["mobile_tail"] = mobile_tail
        all_results.extend(results)

    if all_results:
        send_notification(all_results, len(accounts))
# Script entry point: run main() and report interruptions or unexpected errors.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C: print a short notice instead of a traceback.
        print("\n\n用户中断执行")
    except Exception as e:
        # Last-resort handler: print the error followed by the full traceback.
        print(f"\n程序执行出错: {e}")
        import traceback
        traceback.print_exc()