#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 有赞商城自动签到脚本 【环境变量】 export YOUZAN_ACCOUNT=手机号#密码 (必填) export CAPTCHA_SERVER=验证码服务器 (可选) 【店铺配置】 在 SHOPS 列表中添加: {"kdt_id": "店铺ID", "checkin_id": "签到ID"} 店铺名称会自动获取,无需手动填写 【定时任务】 cron: 0 8 * * * (每天早上8点) 【功能特性】 ✅ 密码登录 + 自动验证码 ✅ Token 自动缓存(7天有效) ✅ Token 失效自动重登 ✅ 积分查询 + 商品查询 ✅ 自动获取店铺名称 更新: 2025-12-10 """ import requests import json import os import sys import time import re from datetime import datetime from Crypto.Cipher import AES from Crypto.Util.Padding import pad import base64 import hashlib # 修复 Windows 控制台编码问题 if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') # ==================== Token 缓存配置 ==================== # Token 缓存文件路径(存储在脚本同目录下) TOKEN_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".youzan_cache") # ===================================================== # ==================== 店铺配置区域 ==================== # 在这里添加你需要签到的店铺信息 # 格式:{"kdt_id": "店铺ID", "checkin_id": "签到活动ID"} SHOPS = [ {"kdt_id": "18739377", "checkin_id": "12063"}, {"kdt_id": "44877243", "checkin_id": "2162835"}, # {"kdt_id": "41067901", "checkin_id": "99"}, {"kdt_id": "129380009", "checkin_id": "3520910"}, # {"kdt_id": "109809208", "checkin_id": "2923467"}, {"kdt_id": "43958855", "checkin_id": "2910869"}, {"kdt_id": "42213767", "checkin_id": "2386563"}, {"kdt_id": "100464643", "checkin_id": "1597464"}, {"kdt_id": "117130552", "checkin_id": "3347128"}, {"kdt_id": "107786737", "checkin_id": "2299510"}, {"kdt_id": "44694253", "checkin_id": "18415"}, # {"kdt_id": "130177909", "checkin_id": "3549859"}, {"kdt_id": "93457151", "checkin_id": "1820214"}, {"kdt_id": "105036832", "checkin_id": "3997371"}, # {"kdt_id": "121810522", "checkin_id": "4804346"}, {"kdt_id": "146288343", "checkin_id": "4296415"}, {"kdt_id": "139827364", "checkin_id": "4806300"}, {"kdt_id": "16365465", "checkin_id": "13736"}, {"kdt_id": "77770507", "checkin_id": "9975"}, {"kdt_id": "43183730", "checkin_id": "24630"}, {"kdt_id": "96543670", "checkin_id": "3721624"}, {"kdt_id": "113745713", "checkin_id": "3800805"}, {"kdt_id": "179778907", "checkin_id": "5428477"}, {"kdt_id": "77093133", "checkin_id": "2947047"}, {"kdt_id": "44519665", "checkin_id": "2682116"}, {"kdt_id": "149536603", "checkin_id": "6287727"}, ] # ===================================================== class TokenCache: """Token 缓存管理类""" def __init__(self, cache_dir: str = TOKEN_CACHE_DIR): """ 初始化 Token 缓存管理器 :param cache_dir: 缓存目录路径 """ self.cache_dir = cache_dir # 确保缓存目录存在 if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir) @staticmethod def _get_account_hash(mobile: str) -> str: """ 生成账号的哈希值(用于文件名) :param mobile: 手机号 :return: 哈希值 """ return hashlib.md5(mobile.encode('utf-8')).hexdigest() def get_cache_file(self, mobile: str) -> str: """ 获取缓存文件路径 :param mobile: 手机号 :return: 缓存文件路径 """ account_hash = self._get_account_hash(mobile) return os.path.join(self.cache_dir, f"token_{account_hash}.json") def save_token(self, mobile: str, token: str) -> bool: """ 保存 Token 到缓存 :param mobile: 手机号 :param token: Token :return: 是否保存成功 """ try: cache_file = self.get_cache_file(mobile) cache_data = { "mobile": mobile, "token": token, "timestamp": int(time.time()) } with open(cache_file, 'w', encoding='utf-8') as f: json.dump(cache_data, f, ensure_ascii=False, indent=2) return True except Exception as e: print(f"⚠️ 保存 Token 缓存失败: {e}") return False def load_token(self, mobile: str) -> dict: """ 从缓存加载 Token :param mobile: 手机号 :return: Token 信息字典 """ try: cache_file = self.get_cache_file(mobile) if not os.path.exists(cache_file): return { "success": False, "message": "缓存文件不存在" } with open(cache_file, 'r', encoding='utf-8') as f: cache_data = json.load(f) # 检查缓存是否过期(7天) cache_time = cache_data.get("timestamp", 0) current_time = int(time.time()) if current_time - cache_time > 7 * 24 * 3600: return { "success": False, "message": "缓存已过期(超过7天)" } return { "success": True, "token": cache_data.get("token"), "mobile": cache_data.get("mobile"), "cache_time": cache_time } except Exception as e: return { "success": False, "message": f"读取缓存失败: {str(e)}" } def clear_token(self, mobile: str) -> bool: """ 清除指定账号的 Token 缓存 :param mobile: 手机号 :return: 是否清除成功 """ try: cache_file = self.get_cache_file(mobile) if os.path.exists(cache_file): os.remove(cache_file) return True except Exception as e: print(f"⚠️ 清除 Token 缓存失败: {e}") return False class YouzanAuth: """有赞账号认证类""" # AES 加密配置(从 JS 脚本中提取) AES_KEY = b"youzan.com._key_" AES_IV = b"youzan.com.aesiv" # 验证码服务器地址(从环境变量读取,默认使用 JS 脚本中的地址) CAPTCHA_SERVER = os.getenv("CAPTCHA_SERVER", "https://captcha.fuckinghigh.eu.org") def __init__(self): self.session = requests.Session() @staticmethod def calculate_password_level(password: str) -> int: """ 计算密码强度等级 :param password: 密码 :return: 密码等级 (-1 到 2) """ level = -1 if re.search(r'\d+', password): level += 1 if re.search(r'[a-zA-Z]+', password): level += 1 if re.search(r'[^A-Za-z0-9]+', password): level += 1 return level @staticmethod def encrypt_password(password: str) -> str: """ 使用 AES-CBC 加密密码 :param password: 明文密码 :return: 加密后的密码(Base64编码) """ cipher = AES.new(YouzanAuth.AES_KEY, AES.MODE_CBC, YouzanAuth.AES_IV) padded_data = pad(password.encode('utf-8'), AES.block_size) encrypted = cipher.encrypt(padded_data) return base64.b64encode(encrypted).decode('utf-8') def get_captcha_ticket(self, aid_encrypted: str) -> dict: """ 从验证码服务器获取 ticket :param aid_encrypted: 加密的 aid :return: 验证码 ticket 信息 """ try: captcha_url = f"{self.CAPTCHA_SERVER}/captcha?aidEncrypted={aid_encrypted}" print(f" 🔐 正在获取验证码...") response = requests.get(captcha_url, timeout=30) response.raise_for_status() result = response.json() if result.get("ticket"): print(f" ✅ 验证码获取成功") return { "success": True, "ticket": result.get("ticket"), "randstr": result.get("randstr"), "verifyDuration": result.get("verifyDuration", 0), "actionDuration": result.get("actionDuration", 0), "sid": result.get("sid", "") } else: return { "success": False, "message": "验证码服务器未返回 ticket" } except Exception as e: return { "success": False, "message": f"获取验证码失败: {str(e)}" } def login_with_password(self, mobile: str, password: str) -> dict: """ 使用手机号和密码登录(支持验证码) :param mobile: 手机号 :param password: 密码 :return: 登录结果 """ url = "https://passport.youzan.com/api/login/password.json" encrypted_password = self.encrypt_password(password) password_level = self.calculate_password_level(password) payload = { "countryCode": "+86", "mobile": mobile, "password": encrypted_password, "passwordLevel": password_level, "passwordLength": len(password) } headers = { 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', 'Content-Type': 'application/json', 'Accept': 'application/json', } try: # 第一次登录尝试 response = self.session.post(url, json=payload, headers=headers, timeout=15) response.raise_for_status() result = response.json() if result.get("code") == 40000310: print(f" 🔐 检测到需要验证码,正在处理...") decision_data = result.get("data", {}).get("decisionData", ) aid_encrypted = decision_data.get("aidEncrypted") captcha_app_id = decision_data.get("captchaAppId") if not aid_encrypted: return { "success": False, "message": "无法获取验证码参数", "mobile": mobile } # 调用验证码服务器 captcha_result = self.get_captcha_ticket(aid_encrypted) if not captcha_result.get("success"): return { "success": False, "message": captcha_result.get("message", "验证码获取失败"), "mobile": mobile } print(f" 🔄 正在使用验证码重新登录...") payload_with_captcha = { "countryCode": "+86", "mobile": mobile, "password": encrypted_password, "passwordLevel": password_level, "passwordLength": len(password), "behavior": { "decisionCode": "TENCENT_LOW_RISK", "verifyResult": { "code": 0, "data": { "appid": captcha_app_id, "ret": 0, "ticket": captcha_result["ticket"], "randstr": captcha_result["randstr"], "verifyDuration": captcha_result["verifyDuration"], "actionDuration": captcha_result["actionDuration"], "sid": captcha_result["sid"] } } } } response = self.session.post(url, json=payload_with_captcha, headers=headers, timeout=15) response.raise_for_status() result = response.json() # 处理登录结果 if result.get("code") == 0: # 登录成功,从 Cookie 中提取 KDTSESSIONID cookies = self.session.cookies.get_dict() kdtsessionid = cookies.get("KDTSESSIONID", "") return { "success": True, "message": "登录成功", "token": kdtsessionid, "mobile": mobile, "raw_response": result } else: return { "success": False, "message": result.get("msg", "登录失败"), "code": result.get("code"), "mobile": mobile } except Exception as e: return { "success": False, "message": f"登录请求失败: {str(e)}", "mobile": mobile } class YouzanCheckin: """有赞签到类""" def __init__(self, token: str, session: requests.Session = None): """ 初始化签到器 :param token: KDTSESSIONID :param session: 可选的 requests.Session 对象(用于复用登录后的 session) """ self.token = token self.session = session if session else requests.Session() self.results = [] def get_shop_info(self, kdt_id: str) -> dict: """ 获取店铺信息(包括店铺名称) :param kdt_id: 店铺ID :return: 店铺信息 """ try: url = f"https://h5.youzan.com/wscshopcore/extension/shop-info.json?kdt_id={kdt_id}" extra_data = { "is_weapp": 1, "sid": self.token } headers = { 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', 'extra-data': json.dumps(extra_data), 'Accept': 'application/json', } response = self.session.get(url, headers=headers, timeout=10) result = response.json() if result.get("code") == 0: shop_data = result.get("data", {}).get("shop", {}) shop_name = shop_data.get("shopName", f"店铺{kdt_id}") return { "success": True, "shop_name": shop_name } else: return { "success": False, "shop_name": f"店铺{kdt_id}" } except Exception as e: return { "success": False, "shop_name": f"店铺{kdt_id}" } def get_points_info(self, kdt_id: str) -> dict: """ 查询积分信息 :param kdt_id: 店铺ID :return: 积分信息 """ try: # 构建查询参数 params = f"kdt_id={kdt_id}" # 查询积分名称 points_name_url = f"https://h5.youzan.com/wscuser/membercenter/pointsName.json?{params}" # 查询积分统计 stats_url = f"https://h5.youzan.com/wscuser/membercenter/stats.json?{params}" extra_data = { "is_weapp": 1, "sid": self.token } headers = { 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', 'extra-data': json.dumps(extra_data), 'Accept': 'application/json', } # 获取积分名称 name_response = self.session.get(points_name_url, headers=headers, timeout=10) name_result = name_response.json() # 获取积分统计 stats_response = self.session.get(stats_url, headers=headers, timeout=10) stats_result = stats_response.json() if name_result.get("code") == 0 and stats_result.get("code") == 0: points_name = name_result.get("data", {}).get("pointsName", "积分") points_value = stats_result.get("data", {}).get("stats", {}).get("points", 0) return { "success": True, "points_name": points_name, "points_value": points_value } else: return { "success": False, "message": "积分查询失败" } except Exception as e: return { "success": False, "message": f"积分查询异常: {str(e)}" } def get_redeemable_goods(self, kdt_id: str) -> dict: """ 查询可兑换商品列表 :param kdt_id: 店铺ID :return: 商品列表 """ try: url = "https://h5.youzan.com/wscump/pointstore/listPointGoods.json" params = { "page_size": 1000, "kdt_id": kdt_id } extra_data = { "is_weapp": 1, "sid": self.token } headers = { 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', 'extra-data': json.dumps(extra_data), 'Accept': 'application/json', } response = self.session.get(url, headers=headers, params=params, timeout=15) result = response.json() if result.get("code") == 0: # 注意:API 返回的数据结构有两种 # 1. data 是数组(直接包含商品列表)- 新版 API # 2. data 是对象(包含 items 或 list 字段)- 旧版 API data = result.get("data", []) # 判断 data 是数组还是对象 if isinstance(data, list): goods_list = data else: # 尝试从 items 或 list 字段获取 goods_list = data.get("items", data.get("list", [])) formatted_goods = [] for idx, item in enumerate(goods_list): # 兼容多种数据结构 # 新版 API:直接从 item 获取字段 # 旧版 API:从嵌套对象中获取字段 # 尝试新版 API 字段(影视飓风等店铺) name = item.get("goodsName") stock = item.get("remainNum") points = item.get("pointsPrice") cash = item.get("remainPrice", 0) # 如果新版字段不存在,尝试旧版 API 字段 if name is None: # 优先检查优惠券类商品(couponGroupInfoDTO.groupName) coupon_info = item.get("couponGroupInfoDTO", {}) name = coupon_info.get("groupName") # 如果不是优惠券,尝试 generalGoodsInfoDTO.generalGoodsTitle if name is None: general_goods_info = item.get("generalGoodsInfoDTO", {}) name = general_goods_info.get("generalGoodsTitle") # 如果还是没有,尝试 generalGoodsTitle 的其他变体 if name is None: name = general_goods_info.get("title") # 最后尝试 pointGoodsInfo.title if name is None: goods_info = item.get("pointGoodsInfo", {}) name = goods_info.get("title", "未知商品") if stock is None: stock_info = item.get("pointGoodsStockDTO", {}) stock = stock_info.get("availableStock", 0) if points is None: price_info = item.get("pointGoodsExchangePriceDTO", {}) points = price_info.get("points", 0) if cash == 0: cash = price_info.get("cash", 0) formatted_goods.append({ "name": name if name else "未知商品", "stock": stock if stock is not None else 0, "points": points if points is not None else 0, "cash": cash / 100 # 转换为元 }) return { "success": True, "goods": formatted_goods, "total": len(formatted_goods) } else: return { "success": False, "message": result.get("msg", "商品查询失败") } except Exception as e: return { "success": False, "message": f"商品查询异常: {str(e)}" } def checkin(self, shop_info: dict, show_points: bool = True, show_goods: bool = True) -> dict: """ 执行签到 :param shop_info: 店铺信息字典 :param show_points: 是否显示积分信息 :param show_goods: 是否显示可兑换商品 :return: 签到结果 """ kdt_id = shop_info.get("kdt_id") checkin_id = shop_info.get("checkin_id") # 检查配置完整性 if not kdt_id or not checkin_id or checkin_id == "未知": return { "success": False, "name": shop_info.get("name", "未命名"), "kdt_id": kdt_id, "message": "店铺配置不完整,请补充 checkin_id" } # 如果没有提供店铺名称,自动获取 name = shop_info.get("name") if not name: shop_info_result = self.get_shop_info(kdt_id) name = shop_info_result.get("shop_name", f"店铺{kdt_id}") url = "https://h5.youzan.com/wscump/checkin/checkinV2.json" params = { "checkinId": checkin_id, "kdt_id": kdt_id } # 构建 extra-data extra_data = { "is_weapp": 1, "sid": self.token } headers = { 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', 'extra-data': json.dumps(extra_data), 'Accept': 'application/json', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', } try: response = self.session.get(url, headers=headers, params=params, timeout=15) response.raise_for_status() result = response.json() # 解析响应 code = result.get("code") msg = result.get("msg", "未知状态") data = result.get("data", {}) checkin_result = { "name": name, "kdt_id": kdt_id } if code == 0: # 签到成功 continuous_days = data.get("continuousDays", 0) today_reward = data.get("todayReward", "") checkin_result.update({ "success": True, "message": f"签到成功! 已连续签到 {continuous_days} 天", "continuous_days": continuous_days, "today_reward": today_reward, "raw_response": result }) elif code == -1: # Token 失效 checkin_result.update({ "success": False, "message": f"Token已失效: {msg}", "need_refresh_token": True }) else: # 其他状态(比如已经签到过) checkin_result.update({ "success": False, "message": msg, "raw_response": result }) # 查询积分信息 if show_points: time.sleep(0.5) # 避免请求过快 points_info = self.get_points_info(kdt_id) checkin_result["points_info"] = points_info # 查询可兑换商品 if show_goods: time.sleep(0.5) # 避免请求过快 goods_info = self.get_redeemable_goods(kdt_id) checkin_result["goods_info"] = goods_info return checkin_result except requests.exceptions.Timeout: return { "success": False, "name": name, "kdt_id": kdt_id, "message": "请求超时,请检查网络连接" } except requests.exceptions.RequestException as e: return { "success": False, "name": name, "kdt_id": kdt_id, "message": f"网络请求失败: {str(e)}" } except json.JSONDecodeError: return { "success": False, "name": name, "kdt_id": kdt_id, "message": f"响应解析失败,原始响应: {response.text[:200]}" } except Exception as e: return { "success": False, "name": name, "kdt_id": kdt_id, "message": f"未知错误: {str(e)}" } def batch_checkin(self, shops: list, show_points: bool = True, show_goods: bool = True) -> list: """ 批量签到 :param shops: 店铺列表 :param show_points: 是否显示积分信息 :param show_goods: 是否显示可兑换商品 :return: 签到结果列表 """ results = [] total = len(shops) print(f"\n🎯 开始批量签到,共 {total} 个店铺\n") for index, shop in enumerate(shops, 1): kdt_id = shop.get("kdt_id") result = self.checkin(shop, show_points, show_goods) results.append(result) checkin_id = shop.get("checkin_id") print(f"🏪 店铺:{result['name']} id:checkinId={checkin_id}&kdt_id={kdt_id}") status_emoji = "✅" if result.get("success") else "❌" print(f"{status_emoji} 签到结果:{result['message']}") if show_points and "points_info" in result: points_info = result["points_info"] if points_info.get("success"): print(f"💰 拥有{points_info['points_name']}:{points_info['points_value']}") if show_goods and "goods_info" in result: goods_info = result["goods_info"] if goods_info.get("success"): goods_list = goods_info.get("goods", []) points_name = result.get("points_info", {}).get("points_name", "积分") if goods_list: for goods in goods_list: cash_str = f" + ¥{goods['cash']:.2f}" if goods['cash'] > 0 else "" stock_emoji = "🟢" if goods['stock'] > 0 else "🔴" print(f"{stock_emoji} 商品:{goods['name']} | 库存:{goods['stock']} | 需要{points_name}:{goods['points']}{cash_str}") else: print("📦 该店铺暂无可兑换商品") else: print(f"⚠️ 商品查询失败:{goods_info.get('message', '未知错误')}") if index < total: time.sleep(2) print() return results def print_banner(): """打印横幅""" print("=" * 60) print(" " * 15 + "🎉 有赞商城签到通杀") print("=" * 60) print(f"⏰ 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 60) def send_notification(results: list, account_count: int = 1): """ 发送通知 (适配青龙面板) """ try: import notify success_count = sum(1 for r in results if r.get("success")) fail_count = len(results) - success_count title = "有赞商城签到通知" content = f"📊 签到完成\n\n" if account_count > 1: content += f"👥 账号数: {account_count}\n" content += f"✅ 成功: {success_count} 个\n" content += f"❌ 失败: {fail_count} 个\n\n" # 多账号时按账号分组显示 if account_count > 1: current_mobile = None for r in results: if r.get("mobile") != current_mobile: current_mobile = r.get("mobile") mobile_tail = r.get("mobile_tail", "****") content += f"\n📱 账号尾号: {mobile_tail}\n" status = "✅" if r.get("success") else "❌" content += f"{status} {r['name']}: {r['message']}\n" else: # 单账号时直接显示 for r in results: status = "✅" if r.get("success") else "❌" content += f"{status} {r['name']}: {r['message']}\n" notify.send(title, content) except ImportError: pass except Exception as e: print(f"⚠️ 通知发送失败: {e}") def parse_accounts(account_str: str) -> list: """ 解析账号配置字符串 :param account_str: 账号配置字符串,支持换行符或&分隔多账号 :return: 账号列表 """ accounts = [] if not account_str: return accounts # 支持多账号,用换行符或 & 分隔 account_str = account_str.replace('&', '\n') account_parts = account_str.split('\n') for part in account_parts: part = part.strip() if '#' in part: mobile, password = part.split('#', 1) accounts.append({ "mobile": mobile.strip(), "password": password.strip() }) return accounts def main(): """主函数""" print_banner() # 从环境变量读取账号配置 account_str = os.getenv("YOUZAN_ACCOUNT", "") # 默认开启积分和商品查询 show_points = True show_goods = True if not account_str: print("\n❌ 错误: 未找到账号配置") print("\n📝 请在环境变量中配置:") print(" 变量名: YOUZAN_ACCOUNT") print(" 变量值: 手机号#密码") print("\n💡 示例: 13800138000#yourpassword\n") return accounts = parse_accounts(account_str) if not accounts: print("❌ 错误: YOUZAN_ACCOUNT 格式不正确") print("📝 正确格式: 手机号#密码 或 手机号1#密码1\\n手机号2#密码2\n") return print(f"\n📋 检测到 {len(accounts)} 个账号") valid_shops = [s for s in SHOPS if s.get("checkin_id") and s["checkin_id"] != "未知"] if not valid_shops: print("\n❌ 错误: 没有可用的店铺配置") print("\n📝 请在脚本中配置 SHOPS 列表,并确保填写了正确的 checkin_id\n") return all_results = [] # 外层循环:遍历所有账号 for idx, account in enumerate(accounts, 1): mobile = account['mobile'] password = account['password'] mobile_tail = mobile[-4:] # 获取手机号后4位 if len(accounts) > 1: print(f"\n{'='*60}") print(f"🔄 账号 {idx}/{len(accounts)}: {mobile}") print(f"{'='*60}") else: print(f"\n📱 账号: {mobile}") token_cache = TokenCache() token = None auth = YouzanAuth() cache_result = token_cache.load_token(mobile) if cache_result.get("success"): token = cache_result["token"] cache_time = datetime.fromtimestamp(cache_result["cache_time"]).strftime('%Y-%m-%d %H:%M:%S') print(f"✅ 使用缓存 Token({cache_time})") # 即使有缓存,也要设置 Cookie 到 session auth.session.cookies.set("KDTSESSIONID", token) print() else: print(f"🔐 正在登录...\n") login_result = auth.login_with_password(mobile, password) if login_result["success"]: token = login_result["token"] print(f"✅ {login_result['message']}") if token_cache.save_token(mobile, token): print("💾 Token 已缓存\n") else: print("⚠️ Token 缓存失败\n") else: print(f"❌ 登录失败: {login_result['message']}\n") continue if not token: print("\n❌ 错误: 无法获取有效的 Token\n") continue # 始终使用 auth.session,确保 Cookie 复用 checker = YouzanCheckin(token, session=auth.session) results = checker.batch_checkin(valid_shops, show_points, show_goods) token_expired = any(r.get("need_refresh_token") for r in results) if token_expired: print("\n⚠️ 检测到 Token 已失效,正在重新登录...\n") token_cache.clear_token(mobile) login_result = auth.login_with_password(mobile, password) if login_result["success"]: token = login_result["token"] print("✅ 重新登录成功") if token_cache.save_token(mobile, token): print("💾 Token 已缓存\n") print("🔄 重新签到中...\n") checker = YouzanCheckin(token, session=auth.session) results = checker.batch_checkin(valid_shops, show_points, show_goods) else: print(f"❌ 重新登录失败: {login_result['message']}\n") # 为每个结果添加账号信息 for r in results: r["mobile"] = mobile r["mobile_tail"] = mobile_tail all_results.extend(results) # 发送通知 if all_results: send_notification(all_results, len(accounts)) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n\n用户中断执行") except Exception as e: print(f"\n程序执行出错: {e}") import traceback traceback.print_exc()