diff --git a/youzan/youzan.py b/youzan/youzan.py new file mode 100644 index 0000000..42b8f4a --- /dev/null +++ b/youzan/youzan.py @@ -0,0 +1,999 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +有赞商城自动签到脚本 + +【环境变量】 +export YOUZAN_ACCOUNT=手机号#密码 (必填) +export CAPTCHA_SERVER=验证码服务器 (可选) + +【店铺配置】 +在 SHOPS 列表中添加: {"kdt_id": "店铺ID", "checkin_id": "签到ID"} +店铺名称会自动获取,无需手动填写 + +【定时任务】 +cron: 0 8 * * * (每天早上8点) + +【功能特性】 +✅ 密码登录 + 自动验证码 +✅ Token 自动缓存(7天有效) +✅ Token 失效自动重登 +✅ 积分查询 + 商品查询 +✅ 自动获取店铺名称 + +更新: 2025-12-10 +""" + +import requests +import json +import os +import sys +import time +import re +from datetime import datetime +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad +import base64 +import hashlib + +# 修复 Windows 控制台编码问题 +if sys.platform == 'win32': + import io + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') + sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + + +# ==================== Token 缓存配置 ==================== +# Token 缓存文件路径(存储在脚本同目录下) +TOKEN_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".youzan_cache") +# ===================================================== + + +# ==================== 店铺配置区域 ==================== +# 在这里添加你需要签到的店铺信息 +# 格式:{"kdt_id": "店铺ID", "checkin_id": "签到活动ID"} +SHOPS = [ + {"kdt_id": "18739377", "checkin_id": "12063"}, + {"kdt_id": "44877243", "checkin_id": "2162835"}, + # {"kdt_id": "41067901", "checkin_id": "99"}, + {"kdt_id": "129380009", "checkin_id": "3520910"}, + # {"kdt_id": "109809208", "checkin_id": "2923467"}, + {"kdt_id": "43958855", "checkin_id": "2910869"}, + {"kdt_id": "42213767", "checkin_id": "2386563"}, + {"kdt_id": "100464643", "checkin_id": "1597464"}, + {"kdt_id": "117130552", "checkin_id": "3347128"}, + {"kdt_id": "107786737", "checkin_id": "2299510"}, + {"kdt_id": "44694253", "checkin_id": "18415"}, + # {"kdt_id": "130177909", "checkin_id": "3549859"}, + {"kdt_id": "93457151", "checkin_id": "1820214"}, + {"kdt_id": "105036832", "checkin_id": "3997371"}, + # {"kdt_id": "121810522", "checkin_id": "4804346"}, + {"kdt_id": "146288343", "checkin_id": "4296415"}, + {"kdt_id": "139827364", "checkin_id": "4806300"}, + {"kdt_id": "16365465", "checkin_id": "13736"}, + {"kdt_id": "77770507", "checkin_id": "9975"}, + {"kdt_id": "43183730", "checkin_id": "24630"}, + {"kdt_id": "96543670", "checkin_id": "3721624"}, + {"kdt_id": "113745713", "checkin_id": "3800805"}, + {"kdt_id": "179778907", "checkin_id": "5428477"}, + {"kdt_id": "77093133", "checkin_id": "2947047"}, + {"kdt_id": "44519665", "checkin_id": "2682116"}, + {"kdt_id": "149536603", "checkin_id": "6287727"}, +] +# ===================================================== + + +class TokenCache: + """Token 缓存管理类""" + + def __init__(self, cache_dir: str = TOKEN_CACHE_DIR): + """ + 初始化 Token 缓存管理器 + :param cache_dir: 缓存目录路径 + """ + self.cache_dir = cache_dir + # 确保缓存目录存在 + if not os.path.exists(self.cache_dir): + os.makedirs(self.cache_dir) + + @staticmethod + def _get_account_hash(mobile: str) -> str: + """ + 生成账号的哈希值(用于文件名) + :param mobile: 手机号 + :return: 哈希值 + """ + return hashlib.md5(mobile.encode('utf-8')).hexdigest() + + def get_cache_file(self, mobile: str) -> str: + """ + 获取缓存文件路径 + :param mobile: 手机号 + :return: 缓存文件路径 + """ + account_hash = self._get_account_hash(mobile) + return os.path.join(self.cache_dir, f"token_{account_hash}.json") + + def save_token(self, mobile: str, token: str) -> bool: + """ + 保存 Token 到缓存 + :param mobile: 手机号 + :param token: Token + :return: 是否保存成功 + """ + try: + cache_file = self.get_cache_file(mobile) + cache_data = { + "mobile": mobile, + "token": token, + "timestamp": int(time.time()) + } + + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump(cache_data, f, ensure_ascii=False, indent=2) + + return True + except Exception as e: + print(f"⚠️ 保存 Token 缓存失败: {e}") + return False + + def load_token(self, mobile: str) -> dict: + """ + 从缓存加载 Token + :param mobile: 手机号 + :return: Token 信息字典 + """ + try: + cache_file = self.get_cache_file(mobile) + + if not os.path.exists(cache_file): + return { + "success": False, + "message": "缓存文件不存在" + } + + with open(cache_file, 'r', encoding='utf-8') as f: + cache_data = json.load(f) + + # 检查缓存是否过期(7天) + cache_time = cache_data.get("timestamp", 0) + current_time = int(time.time()) + if current_time - cache_time > 7 * 24 * 3600: + return { + "success": False, + "message": "缓存已过期(超过7天)" + } + + return { + "success": True, + "token": cache_data.get("token"), + "mobile": cache_data.get("mobile"), + "cache_time": cache_time + } + + except Exception as e: + return { + "success": False, + "message": f"读取缓存失败: {str(e)}" + } + + def clear_token(self, mobile: str) -> bool: + """ + 清除指定账号的 Token 缓存 + :param mobile: 手机号 + :return: 是否清除成功 + """ + try: + cache_file = self.get_cache_file(mobile) + if os.path.exists(cache_file): + os.remove(cache_file) + return True + except Exception as e: + print(f"⚠️ 清除 Token 缓存失败: {e}") + return False + + +class YouzanAuth: + """有赞账号认证类""" + + # AES 加密配置(从 JS 脚本中提取) + AES_KEY = b"youzan.com._key_" + AES_IV = b"youzan.com.aesiv" + + # 验证码服务器地址(从环境变量读取,默认使用 JS 脚本中的地址) + CAPTCHA_SERVER = os.getenv("CAPTCHA_SERVER", "https://captcha.fuckinghigh.eu.org") + + def __init__(self): + self.session = requests.Session() + + @staticmethod + def calculate_password_level(password: str) -> int: + """ + 计算密码强度等级 + :param password: 密码 + :return: 密码等级 (-1 到 2) + """ + level = -1 + if re.search(r'\d+', password): + level += 1 + if re.search(r'[a-zA-Z]+', password): + level += 1 + if re.search(r'[^A-Za-z0-9]+', password): + level += 1 + return level + + @staticmethod + def encrypt_password(password: str) -> str: + """ + 使用 AES-CBC 加密密码 + :param password: 明文密码 + :return: 加密后的密码(Base64编码) + """ + cipher = AES.new(YouzanAuth.AES_KEY, AES.MODE_CBC, YouzanAuth.AES_IV) + padded_data = pad(password.encode('utf-8'), AES.block_size) + encrypted = cipher.encrypt(padded_data) + return base64.b64encode(encrypted).decode('utf-8') + + def get_captcha_ticket(self, aid_encrypted: str) -> dict: + """ + 从验证码服务器获取 ticket + :param aid_encrypted: 加密的 aid + :return: 验证码 ticket 信息 + """ + try: + captcha_url = f"{self.CAPTCHA_SERVER}/captcha?aidEncrypted={aid_encrypted}" + + print(f" 🔐 正在获取验证码...") + + response = requests.get(captcha_url, timeout=30) + response.raise_for_status() + result = response.json() + + if result.get("ticket"): + print(f" ✅ 验证码获取成功") + return { + "success": True, + "ticket": result.get("ticket"), + "randstr": result.get("randstr"), + "verifyDuration": result.get("verifyDuration", 0), + "actionDuration": result.get("actionDuration", 0), + "sid": result.get("sid", "") + } + else: + return { + "success": False, + "message": "验证码服务器未返回 ticket" + } + + except Exception as e: + return { + "success": False, + "message": f"获取验证码失败: {str(e)}" + } + + def login_with_password(self, mobile: str, password: str) -> dict: + """ + 使用手机号和密码登录(支持验证码) + :param mobile: 手机号 + :param password: 密码 + :return: 登录结果 + """ + url = "https://passport.youzan.com/api/login/password.json" + + encrypted_password = self.encrypt_password(password) + password_level = self.calculate_password_level(password) + + payload = { + "countryCode": "+86", + "mobile": mobile, + "password": encrypted_password, + "passwordLevel": password_level, + "passwordLength": len(password) + } + + headers = { + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + + try: + # 第一次登录尝试 + response = self.session.post(url, json=payload, headers=headers, timeout=15) + response.raise_for_status() + result = response.json() + + if result.get("code") == 40000310: + print(f" 🔐 检测到需要验证码,正在处理...") + decision_data = result.get("data", {}).get("decisionData", ) + aid_encrypted = decision_data.get("aidEncrypted") + captcha_app_id = decision_data.get("captchaAppId") + + if not aid_encrypted: + return { + "success": False, + "message": "无法获取验证码参数", + "mobile": mobile + } + + # 调用验证码服务器 + captcha_result = self.get_captcha_ticket(aid_encrypted) + + if not captcha_result.get("success"): + return { + "success": False, + "message": captcha_result.get("message", "验证码获取失败"), + "mobile": mobile + } + + print(f" 🔄 正在使用验证码重新登录...") + + payload_with_captcha = { + "countryCode": "+86", + "mobile": mobile, + "password": encrypted_password, + "passwordLevel": password_level, + "passwordLength": len(password), + "behavior": { + "decisionCode": "TENCENT_LOW_RISK", + "verifyResult": { + "code": 0, + "data": { + "appid": captcha_app_id, + "ret": 0, + "ticket": captcha_result["ticket"], + "randstr": captcha_result["randstr"], + "verifyDuration": captcha_result["verifyDuration"], + "actionDuration": captcha_result["actionDuration"], + "sid": captcha_result["sid"] + } + } + } + } + + response = self.session.post(url, json=payload_with_captcha, headers=headers, timeout=15) + response.raise_for_status() + result = response.json() + + # 处理登录结果 + if result.get("code") == 0: + # 登录成功,从 Cookie 中提取 KDTSESSIONID + cookies = self.session.cookies.get_dict() + kdtsessionid = cookies.get("KDTSESSIONID", "") + + return { + "success": True, + "message": "登录成功", + "token": kdtsessionid, + "mobile": mobile, + "raw_response": result + } + else: + return { + "success": False, + "message": result.get("msg", "登录失败"), + "code": result.get("code"), + "mobile": mobile + } + + except Exception as e: + return { + "success": False, + "message": f"登录请求失败: {str(e)}", + "mobile": mobile + } + + +class YouzanCheckin: + """有赞签到类""" + + def __init__(self, token: str, session: requests.Session = None): + """ + 初始化签到器 + :param token: KDTSESSIONID + :param session: 可选的 requests.Session 对象(用于复用登录后的 session) + """ + self.token = token + self.session = session if session else requests.Session() + self.results = [] + + def get_shop_info(self, kdt_id: str) -> dict: + """ + 获取店铺信息(包括店铺名称) + :param kdt_id: 店铺ID + :return: 店铺信息 + """ + try: + url = f"https://h5.youzan.com/wscshopcore/extension/shop-info.json?kdt_id={kdt_id}" + + extra_data = { + "is_weapp": 1, + "sid": self.token + } + + headers = { + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', + 'extra-data': json.dumps(extra_data), + 'Accept': 'application/json', + } + + response = self.session.get(url, headers=headers, timeout=10) + result = response.json() + + if result.get("code") == 0: + shop_data = result.get("data", {}).get("shop", {}) + shop_name = shop_data.get("shopName", f"店铺{kdt_id}") + + return { + "success": True, + "shop_name": shop_name + } + else: + return { + "success": False, + "shop_name": f"店铺{kdt_id}" + } + + except Exception as e: + return { + "success": False, + "shop_name": f"店铺{kdt_id}" + } + + def get_points_info(self, kdt_id: str) -> dict: + """ + 查询积分信息 + :param kdt_id: 店铺ID + :return: 积分信息 + """ + try: + # 构建查询参数 + params = f"kdt_id={kdt_id}" + + # 查询积分名称 + points_name_url = f"https://h5.youzan.com/wscuser/membercenter/pointsName.json?{params}" + + # 查询积分统计 + stats_url = f"https://h5.youzan.com/wscuser/membercenter/stats.json?{params}" + + extra_data = { + "is_weapp": 1, + "sid": self.token + } + + headers = { + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', + 'extra-data': json.dumps(extra_data), + 'Accept': 'application/json', + } + + # 获取积分名称 + name_response = self.session.get(points_name_url, headers=headers, timeout=10) + name_result = name_response.json() + + # 获取积分统计 + stats_response = self.session.get(stats_url, headers=headers, timeout=10) + stats_result = stats_response.json() + + if name_result.get("code") == 0 and stats_result.get("code") == 0: + points_name = name_result.get("data", {}).get("pointsName", "积分") + points_value = stats_result.get("data", {}).get("stats", {}).get("points", 0) + + return { + "success": True, + "points_name": points_name, + "points_value": points_value + } + else: + return { + "success": False, + "message": "积分查询失败" + } + + except Exception as e: + return { + "success": False, + "message": f"积分查询异常: {str(e)}" + } + + def get_redeemable_goods(self, kdt_id: str) -> dict: + """ + 查询可兑换商品列表 + :param kdt_id: 店铺ID + :return: 商品列表 + """ + try: + url = "https://h5.youzan.com/wscump/pointstore/listPointGoods.json" + + params = { + "page_size": 1000, + "kdt_id": kdt_id + } + + extra_data = { + "is_weapp": 1, + "sid": self.token + } + + headers = { + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', + 'extra-data': json.dumps(extra_data), + 'Accept': 'application/json', + } + + response = self.session.get(url, headers=headers, params=params, timeout=15) + result = response.json() + + if result.get("code") == 0: + # 注意:API 返回的数据结构有两种 + # 1. data 是数组(直接包含商品列表)- 新版 API + # 2. data 是对象(包含 items 或 list 字段)- 旧版 API + data = result.get("data", []) + + # 判断 data 是数组还是对象 + if isinstance(data, list): + goods_list = data + else: + # 尝试从 items 或 list 字段获取 + goods_list = data.get("items", data.get("list", [])) + + formatted_goods = [] + for idx, item in enumerate(goods_list): + # 兼容多种数据结构 + # 新版 API:直接从 item 获取字段 + # 旧版 API:从嵌套对象中获取字段 + + # 尝试新版 API 字段(影视飓风等店铺) + name = item.get("goodsName") + stock = item.get("remainNum") + points = item.get("pointsPrice") + cash = item.get("remainPrice", 0) + + # 如果新版字段不存在,尝试旧版 API 字段 + if name is None: + # 优先检查优惠券类商品(couponGroupInfoDTO.groupName) + coupon_info = item.get("couponGroupInfoDTO", {}) + name = coupon_info.get("groupName") + + # 如果不是优惠券,尝试 generalGoodsInfoDTO.generalGoodsTitle + if name is None: + general_goods_info = item.get("generalGoodsInfoDTO", {}) + name = general_goods_info.get("generalGoodsTitle") + + # 如果还是没有,尝试 generalGoodsTitle 的其他变体 + if name is None: + name = general_goods_info.get("title") + + # 最后尝试 pointGoodsInfo.title + if name is None: + goods_info = item.get("pointGoodsInfo", {}) + name = goods_info.get("title", "未知商品") + + if stock is None: + stock_info = item.get("pointGoodsStockDTO", {}) + stock = stock_info.get("availableStock", 0) + + if points is None: + price_info = item.get("pointGoodsExchangePriceDTO", {}) + points = price_info.get("points", 0) + if cash == 0: + cash = price_info.get("cash", 0) + + formatted_goods.append({ + "name": name if name else "未知商品", + "stock": stock if stock is not None else 0, + "points": points if points is not None else 0, + "cash": cash / 100 # 转换为元 + }) + + return { + "success": True, + "goods": formatted_goods, + "total": len(formatted_goods) + } + else: + return { + "success": False, + "message": result.get("msg", "商品查询失败") + } + + except Exception as e: + return { + "success": False, + "message": f"商品查询异常: {str(e)}" + } + + def checkin(self, shop_info: dict, show_points: bool = True, show_goods: bool = True) -> dict: + """ + 执行签到 + :param shop_info: 店铺信息字典 + :param show_points: 是否显示积分信息 + :param show_goods: 是否显示可兑换商品 + :return: 签到结果 + """ + kdt_id = shop_info.get("kdt_id") + checkin_id = shop_info.get("checkin_id") + + # 检查配置完整性 + if not kdt_id or not checkin_id or checkin_id == "未知": + return { + "success": False, + "name": shop_info.get("name", "未命名"), + "kdt_id": kdt_id, + "message": "店铺配置不完整,请补充 checkin_id" + } + + # 如果没有提供店铺名称,自动获取 + name = shop_info.get("name") + if not name: + shop_info_result = self.get_shop_info(kdt_id) + name = shop_info_result.get("shop_name", f"店铺{kdt_id}") + + url = "https://h5.youzan.com/wscump/checkin/checkinV2.json" + + params = { + "checkinId": checkin_id, + "kdt_id": kdt_id + } + + # 构建 extra-data + extra_data = { + "is_weapp": 1, + "sid": self.token + } + + headers = { + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN', + 'extra-data': json.dumps(extra_data), + 'Accept': 'application/json', + 'Accept-Encoding': 'gzip, deflate, br', + 'Accept-Language': 'zh-CN,zh;q=0.9', + } + + try: + response = self.session.get(url, headers=headers, params=params, timeout=15) + response.raise_for_status() + result = response.json() + + # 解析响应 + code = result.get("code") + msg = result.get("msg", "未知状态") + data = result.get("data", {}) + + checkin_result = { + "name": name, + "kdt_id": kdt_id + } + + if code == 0: + # 签到成功 + continuous_days = data.get("continuousDays", 0) + today_reward = data.get("todayReward", "") + + checkin_result.update({ + "success": True, + "message": f"签到成功! 已连续签到 {continuous_days} 天", + "continuous_days": continuous_days, + "today_reward": today_reward, + "raw_response": result + }) + elif code == -1: + # Token 失效 + checkin_result.update({ + "success": False, + "message": f"Token已失效: {msg}", + "need_refresh_token": True + }) + else: + # 其他状态(比如已经签到过) + checkin_result.update({ + "success": False, + "message": msg, + "raw_response": result + }) + + # 查询积分信息 + if show_points: + time.sleep(0.5) # 避免请求过快 + points_info = self.get_points_info(kdt_id) + checkin_result["points_info"] = points_info + + # 查询可兑换商品 + if show_goods: + time.sleep(0.5) # 避免请求过快 + goods_info = self.get_redeemable_goods(kdt_id) + checkin_result["goods_info"] = goods_info + + return checkin_result + + except requests.exceptions.Timeout: + return { + "success": False, + "name": name, + "kdt_id": kdt_id, + "message": "请求超时,请检查网络连接" + } + except requests.exceptions.RequestException as e: + return { + "success": False, + "name": name, + "kdt_id": kdt_id, + "message": f"网络请求失败: {str(e)}" + } + except json.JSONDecodeError: + return { + "success": False, + "name": name, + "kdt_id": kdt_id, + "message": f"响应解析失败,原始响应: {response.text[:200]}" + } + except Exception as e: + return { + "success": False, + "name": name, + "kdt_id": kdt_id, + "message": f"未知错误: {str(e)}" + } + + def batch_checkin(self, shops: list, show_points: bool = True, show_goods: bool = True) -> list: + """ + 批量签到 + :param shops: 店铺列表 + :param show_points: 是否显示积分信息 + :param show_goods: 是否显示可兑换商品 + :return: 签到结果列表 + """ + results = [] + total = len(shops) + + print(f"\n🎯 开始批量签到,共 {total} 个店铺\n") + + for index, shop in enumerate(shops, 1): + kdt_id = shop.get("kdt_id") + + result = self.checkin(shop, show_points, show_goods) + results.append(result) + + checkin_id = shop.get("checkin_id") + print(f"🏪 店铺:{result['name']} id:checkinId={checkin_id}&kdt_id={kdt_id}") + + status_emoji = "✅" if result.get("success") else "❌" + print(f"{status_emoji} 签到结果:{result['message']}") + + if show_points and "points_info" in result: + points_info = result["points_info"] + if points_info.get("success"): + print(f"💰 拥有{points_info['points_name']}:{points_info['points_value']}") + + if show_goods and "goods_info" in result: + goods_info = result["goods_info"] + if goods_info.get("success"): + goods_list = goods_info.get("goods", []) + points_name = result.get("points_info", {}).get("points_name", "积分") + + if goods_list: + for goods in goods_list: + cash_str = f" + ¥{goods['cash']:.2f}" if goods['cash'] > 0 else "" + stock_emoji = "🟢" if goods['stock'] > 0 else "🔴" + print(f"{stock_emoji} 商品:{goods['name']} | 库存:{goods['stock']} | 需要{points_name}:{goods['points']}{cash_str}") + else: + print("📦 该店铺暂无可兑换商品") + else: + print(f"⚠️ 商品查询失败:{goods_info.get('message', '未知错误')}") + + if index < total: + time.sleep(2) + + print() + + return results + + +def print_banner(): + """打印横幅""" + print("=" * 60) + print(" " * 15 + "🎉 有赞商城签到通杀") + print("=" * 60) + print(f"⏰ 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("=" * 60) + + +def send_notification(results: list, account_count: int = 1): + """ + 发送通知 (适配青龙面板) + """ + try: + import notify + + success_count = sum(1 for r in results if r.get("success")) + fail_count = len(results) - success_count + + title = "有赞商城签到通知" + content = f"📊 签到完成\n\n" + + if account_count > 1: + content += f"👥 账号数: {account_count}\n" + + content += f"✅ 成功: {success_count} 个\n" + content += f"❌ 失败: {fail_count} 个\n\n" + + # 多账号时按账号分组显示 + if account_count > 1: + current_mobile = None + for r in results: + if r.get("mobile") != current_mobile: + current_mobile = r.get("mobile") + mobile_tail = r.get("mobile_tail", "****") + content += f"\n📱 账号尾号: {mobile_tail}\n" + + status = "✅" if r.get("success") else "❌" + content += f"{status} {r['name']}: {r['message']}\n" + else: + # 单账号时直接显示 + for r in results: + status = "✅" if r.get("success") else "❌" + content += f"{status} {r['name']}: {r['message']}\n" + + notify.send(title, content) + except ImportError: + pass + except Exception as e: + print(f"⚠️ 通知发送失败: {e}") + + +def parse_accounts(account_str: str) -> list: + """ + 解析账号配置字符串 + :param account_str: 账号配置字符串,支持换行符或&分隔多账号 + :return: 账号列表 + """ + accounts = [] + if not account_str: + return accounts + + # 支持多账号,用换行符或 & 分隔 + account_str = account_str.replace('&', '\n') + account_parts = account_str.split('\n') + + for part in account_parts: + part = part.strip() + if '#' in part: + mobile, password = part.split('#', 1) + accounts.append({ + "mobile": mobile.strip(), + "password": password.strip() + }) + + return accounts + + +def main(): + """主函数""" + print_banner() + + # 从环境变量读取账号配置 + account_str = os.getenv("YOUZAN_ACCOUNT", "") + + # 默认开启积分和商品查询 + show_points = True + show_goods = True + + if not account_str: + print("\n❌ 错误: 未找到账号配置") + print("\n📝 请在环境变量中配置:") + print(" 变量名: YOUZAN_ACCOUNT") + print(" 变量值: 手机号#密码") + print("\n💡 示例: 13800138000#yourpassword\n") + return + + accounts = parse_accounts(account_str) + if not accounts: + print("❌ 错误: YOUZAN_ACCOUNT 格式不正确") + print("📝 正确格式: 手机号#密码 或 手机号1#密码1\\n手机号2#密码2\n") + return + + print(f"\n📋 检测到 {len(accounts)} 个账号") + + valid_shops = [s for s in SHOPS if s.get("checkin_id") and s["checkin_id"] != "未知"] + if not valid_shops: + print("\n❌ 错误: 没有可用的店铺配置") + print("\n📝 请在脚本中配置 SHOPS 列表,并确保填写了正确的 checkin_id\n") + return + + all_results = [] + + # 外层循环:遍历所有账号 + for idx, account in enumerate(accounts, 1): + mobile = account['mobile'] + password = account['password'] + mobile_tail = mobile[-4:] # 获取手机号后4位 + + if len(accounts) > 1: + print(f"\n{'='*60}") + print(f"🔄 账号 {idx}/{len(accounts)}: {mobile}") + print(f"{'='*60}") + else: + print(f"\n📱 账号: {mobile}") + + token_cache = TokenCache() + token = None + auth = YouzanAuth() + + cache_result = token_cache.load_token(mobile) + + if cache_result.get("success"): + token = cache_result["token"] + cache_time = datetime.fromtimestamp(cache_result["cache_time"]).strftime('%Y-%m-%d %H:%M:%S') + print(f"✅ 使用缓存 Token({cache_time})") + + # 即使有缓存,也要设置 Cookie 到 session + auth.session.cookies.set("KDTSESSIONID", token) + print() + else: + print(f"🔐 正在登录...\n") + + login_result = auth.login_with_password(mobile, password) + + if login_result["success"]: + token = login_result["token"] + print(f"✅ {login_result['message']}") + + if token_cache.save_token(mobile, token): + print("💾 Token 已缓存\n") + else: + print("⚠️ Token 缓存失败\n") + else: + print(f"❌ 登录失败: {login_result['message']}\n") + continue + + if not token: + print("\n❌ 错误: 无法获取有效的 Token\n") + continue + + # 始终使用 auth.session,确保 Cookie 复用 + checker = YouzanCheckin(token, session=auth.session) + results = checker.batch_checkin(valid_shops, show_points, show_goods) + + token_expired = any(r.get("need_refresh_token") for r in results) + + if token_expired: + print("\n⚠️ 检测到 Token 已失效,正在重新登录...\n") + + token_cache.clear_token(mobile) + + login_result = auth.login_with_password(mobile, password) + + if login_result["success"]: + token = login_result["token"] + print("✅ 重新登录成功") + + if token_cache.save_token(mobile, token): + print("💾 Token 已缓存\n") + + print("🔄 重新签到中...\n") + checker = YouzanCheckin(token, session=auth.session) + results = checker.batch_checkin(valid_shops, show_points, show_goods) + else: + print(f"❌ 重新登录失败: {login_result['message']}\n") + + # 为每个结果添加账号信息 + for r in results: + r["mobile"] = mobile + r["mobile_tail"] = mobile_tail + + all_results.extend(results) + + # 发送通知 + if all_results: + send_notification(all_results, len(accounts)) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\n用户中断执行") + except Exception as e: + print(f"\n程序执行出错: {e}") + import traceback + traceback.print_exc()