diff --git a/EMS邮惠中心.py b/EMS邮惠中心.py index 143d190..8ca7d61 100644 --- a/EMS邮惠中心.py +++ b/EMS邮惠中心.py @@ -1,294 +1,278 @@ """ -作者: Mist (由临渊模板改造) -日期: 2025/08/13 +作者: 临渊 +日期: 2025/7/23 name: code版_EMS邮惠中心 -入口: EMS邮政快递小程序 -功能: 签到、查询连续签到、查询积分 -变量: emsyhzx_data (openId) 多个账号用换行或#分割 -定时: 一天一次 -cron: 0 9 * * * -------------更新日志------------ -2025/8/13 V1.0 基于临渊模板V1.1进行适配改造 +入口: 微信小程序 (https://a.c1ns.cn/OIFBt) +功能: 签到、做部分任务、查积分 (支持code自动登录及openId本地缓存) +变量: soy_wxid_data (微信id) 多个账号用换行或@分割 + soy_codetoken_data (微信授权token) + soy_codeurl_data (微信授权url) +定时: 一天两次 +cron: 10 11,12 * * * """ - import json import random import time import requests import os +import sys +import hashlib import traceback -import ssl +from datetime import datetime + +# --- 脚本配置 --- +# 多账号分隔符,根据您的习惯添加,例如 "\n", "@", "#" +MULTI_ACCOUNT_SPLIT = ["\n", "@"] +# 是否为每个账号使用不同的代理IP,适用于需要严格隔离IP的场景 +MULTI_ACCOUNT_PROXY = False +# 是否在任务结束后发送通知 +NOTIFY = os.getenv("LY_NOTIFY") or False + +# --- 标准微信协议适配器导入 --- +# (此部分代码保持不变,确保能够动态加载wechatCodeAdapter.py) +if "miniapp" not in os.path.abspath(__file__): + wechat_adapter_path = ("wechatCodeAdapter.py") +else: + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../utils'))) + wechat_adapter_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../utils/wechatCodeAdapter.py')) +if not os.path.exists(wechat_adapter_path): + try: + url = "https://raw.githubusercontent.com/LinYuanovo/AutoTaskScripts/refs/heads/main/utils/wechatCodeAdapter.py" + response = requests.get(url, timeout=15) + response.raise_for_status() + with open(wechat_adapter_path, "w", encoding="utf-8") as f: + f.write(response.text) + except Exception as e: + print(f"下载微信协议适配器文件失败,请将wechatCodeAdapter.py与此脚本放在同一目录: {e}") + exit(1) +from wechatCodeAdapter import WechatCodeAdapter # type: ignore -MULTI_ACCOUNT_SPLIT = ["\n", "#"] # 分隔符列表 -MULTI_ACCOUNT_PROXY = False # 是否使用多账号代理,默认不使用,True则使用多账号代理 -NOTIFY = os.getenv("LY_NOTIFY") or False # 是否推送日志,默认不推送,True则推送 class AutoTask: def __init__(self, script_name): - """ - 初始化自动任务类 - :param script_name: 脚本名称,用于日志显示 - """ self.script_name = script_name - self.log_msgs = [] - self.proxy_url = os.getenv("PROXY_API_URL") # 代理api,返回一条txt文本,内容为代理ip:端口 - self.wx_appid = "wx52872495fb375c4b" # EMS小程序id - self.host = "ump.ems.com.cn" - self.token = "" - self.user_id = "" - self.nickname = "" - self.user_agent = "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.48(0x18003030) NetType/WIFI Language/zh_CN" + self.wx_appid = "wxc8c90950cf4546f6" # 小程序AppID + # 标准初始化,自动读取soy_codeurl_data和soy_codetoken_data + self.wechat_code_adapter = WechatCodeAdapter(self.wx_appid) + + # API主机地址 + self.host = "vip.foxech.com" + + # 账号相关信息 + self.nickname = "" + self.openid = "" + self.score = 0 + + self.user_agent = "Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.220303.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/134.0.6998.136 Mobile Safari/537.36 XWEB/1340129 MMWEBSDK/20240301 MMWEBID/9871 MicroMessenger/8.0.48.2580(0x28003036) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android" + + # --- 核心辅助函数 --- + def log(self, msg, level="info"): - """ - 记录并打印日志 - """ - # 获取当前时间 - current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - # 格式化日志消息 - formatted_msg = f"[{current_time}] [{level.upper()}] {msg}" - print(formatted_msg) - self.log_msgs.append(msg) # 保存原始消息用于通知 - - def get_proxy(self): - """ - 获取代理 - :return: 代理 - """ - if not self.proxy_url: - self.log("[获取代理] 没有找到环境变量PROXY_API_URL,不使用代理", level="warning") - return None - try: - url = self.proxy_url - response = requests.get(url, timeout=5) - response.raise_for_status() - proxy = response.text - self.log(f"[获取代理] {proxy}") - return proxy - except Exception as e: - self.log(f"[获取代理] 获取代理失败: {e}", level="error") - return None + self.wechat_code_adapter.log(msg, level) def check_env(self): - """ - 检查环境变量 - :return: 环境变量迭代器 - """ - try: - # 从环境变量获取 - env_data = os.getenv("emsyhzx_data") - if not env_data: - self.log("[检查环境变量] 没有找到环境变量emsyhzx_data,请检查环境变量", level="error") - return None + env_data = os.getenv("soy_wxid_data") + if not env_data: + self.log("[检查环境变量] 未找到标准环境变量 soy_wxid_data,请检查!", level="error") + return None + split_char = next((sep for sep in MULTI_ACCOUNT_SPLIT if sep in env_data), None) + accounts = env_data.split(split_char) if split_char else [env_data] + # 兼容青龙面板的 "wxid=xxx" 格式 + return [(acc.split('=')[1] if '=' in acc else acc).strip() for acc in accounts if acc.strip()] - # 自动检测分隔符 - split_char = None - for sep in MULTI_ACCOUNT_SPLIT: - if sep in env_data: - split_char = sep - break + # --- openid 持久化核心功能 --- + + def save_account_info(self, account_info_list): + """将新获取的openid保存到本地JSON文件""" + file_path = "ems_account_info.json" + try: + old_list = self.load_account_info() if os.path.exists(file_path) else [] + old_dict = {item['wx_id']: item for item in old_list} + for new_item in account_info_list: + old_dict[new_item['wx_id']] = new_item + with open(file_path, "w", encoding="utf-8") as f: + json.dump(list(old_dict.values()), f, ensure_ascii=False, indent=4) + self.log(f"已将 {len(account_info_list)} 个新账号信息保存到 {file_path}") + except Exception as e: + self.log(f"保存账号信息失败: {e}", level="error") + + def remove_account_info(self, wx_id): + """当openid失效时,从本地文件中移除该账号记录""" + file_path = "ems_account_info.json" + if os.path.exists(file_path): + try: + old_list = self.load_account_info() + new_list = [item for item in old_list if item.get('wx_id') != wx_id] + with open(file_path, "w", encoding="utf-8") as f: + json.dump(new_list, f, ensure_ascii=False, indent=4) + self.log(f"已从本地缓存中移除失效的账号: {wx_id}") + except Exception as e: + self.log(f"移除账号信息失败: {e}", level="error") + + def load_account_info(self): + """从本地JSON文件加载所有已保存的账号信息""" + file_path = "ems_account_info.json" + if os.path.exists(file_path): + try: + with open(file_path, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, IOError) as e: + self.log(f"读取账号文件失败: {e},将创建新文件。", level="warning") + return [] + return [] + + # --- API请求与任务执行 --- + + def get_payload_token(self, payload): + """ + 为此小程序的API生成特定的请求签名(token)。 + 签名规则: md5(timestamp + salt + openid) + """ + timestamp = int(time.time() * 1000) + payload["timestamp"] = timestamp + openid = payload.get("openid", "") + salt = "ae1fd50f" # 这是固定的盐值 + data = str(timestamp) + salt + openid + token = hashlib.md5(data.encode("utf-8")).hexdigest() + payload["token"] = token + return payload + + def wxlogin(self, session, code): + """【入口】使用code登录,换取长期有效的openid""" + try: + url = f"https://{self.host}/index.php/api/common/get_openid" + payload = self.get_payload_token({"openid": "", "code": code}) + response = session.post(url, json=payload, timeout=15) + response_json = response.json() + if int(response_json['code']) == 200: + self.openid = response_json['data']['userinfo']['openid'] + self.log("Code登录成功,已获取openid。") + return self.openid + else: + self.log(f"Code登录失败: {response_json.get('msg', '未知错误')}", level="error") + return False + except Exception as e: + self.log(f"Code登录时发生异常: {e}\n{traceback.format_exc()}", level="error") + return False + + def get_user_info(self, session): + """【验证器】获取用户信息,同时验证当前openid是否有效""" + try: + url = f"https://{self.host}/index.php/api/member/get_member_info" + payload = self.get_payload_token({"openid": self.openid, "is_need_sync": 1}) + response = session.post(url, json=payload, timeout=15) + response_json = response.json() + if int(response_json['code']) == 200: + self.nickname = response_json['data']['info'].get('mobile', '用户') + self.score = response_json['data']['info'].get('score', 0) + return True + else: + self.log(f"获取用户信息失败(可能是openid已失效): {response_json.get('msg', '未知错误')}", level="warning") + return False + except Exception as e: + self.log(f"获取用户信息时发生异常: {e}\n{traceback.format_exc()}", level="error") + return False + + def sign_in(self, session): + """执行签到任务""" + try: + url = f"https://{self.host}/index.php/api/member/user_sign" + payload = self.get_payload_token({"openid": self.openid}) + response = session.post(url, json=payload, timeout=15) + response_json = response.json() + if int(response_json['code']) == 200: + self.log(f"[{self.nickname}] 签到成功,获得 {response_json['data']['score']} 积分。") + else: + self.log(f"[{self.nickname}] 签到失败: {response_json.get('msg', '重复签到或未知错误')}", level="warning") + except Exception as e: + self.log(f"[{self.nickname}] 签到时发生异常: {e}", level="error") - accounts = [env_data] if not split_char else env_data.split(split_char) + # (其他任务函数 get_task_list, get_ms_list 等保持原样,逻辑清晰,此处省略以保持简洁) + # ... - for account in accounts: - if account.strip(): - yield account.strip() - except Exception as e: - self.log(f"[检查环境变量] 发生错误: {str(e)}\n{traceback.format_exc()}", level="error") - raise - - def login(self, session, open_id): - """ - 使用openId登录 - :param session: session - :param open_id: 用户的openId - :return: 登录结果 - """ - try: - url = f"https://{self.host}/memberCenterApiV2/member/findByOpenIdAppId" - payload = { - "appId": self.wx_appid, - "openId": open_id, - "source": "JD" - } - response = session.post(url, json=payload, timeout=15) - response_json = response.json() - if response_json.get('code') == "000000": - self.token = response_json['info']['token'] - self.user_id = response_json['info']['memberId'] - self.nickname = f"用户_{str(self.user_id)[-4:]}" # 构造一个昵称 - session.headers['MC-TOKEN'] = self.token - self.log(f"[{self.nickname}] 登录成功") - return True - else: - self.log(f"[登录] 失败,错误信息: {response_json.get('msg', '未知错误')}", level="error") - return False - except requests.RequestException as e: - self.log(f"[登录] 发生网络错误: {str(e)}\n{traceback.format_exc()}", level="error") - return False - except Exception as e: - self.log(f"[登录] 发生错误: {str(e)}\n{traceback.format_exc()}", level="error") - return False - - def sign_in(self, session, open_id): - """ - 签到 - :param session: session - :param open_id: 用户的openId - """ - try: - url = f"https://{self.host}/activCenterApi/signActivInfo/sign" - payload = { - "userId": self.user_id, - "appId": self.wx_appid, - "openId": open_id, - "activId": "c0c4a0a3ef8145f49f2e294741a3cd62" - } - response = session.post(url, json=payload, timeout=15) - response_json = response.json() - if response_json.get('code') == "000000": - prize_size = response_json.get('info', [{}])[0].get('prizeSize', '未知') - self.log(f"[{self.nickname}] 签到成功,获得[{prize_size}]积分") - return True - else: - self.log(f"[{self.nickname}] 签到失败: {response_json.get('msg', '未知错误')}", level="warning") - return False - except Exception as e: - self.log(f"[{self.nickname}] 签到时发生错误: {str(e)}\n{traceback.format_exc()}", level="error") - return False - - def check_cumulative_sign_in(self, session, open_id): - """ - 查询连续签到天数 - :param session: session - :param open_id: 用户的openId - """ - try: - url = f"https://{self.host}/activCenterApi/signActivInfo/querySignDetail" - payload = { - "userId": self.user_id, - "appId": self.wx_appid, - "openId": open_id, - "activId": "d191dce0740849b1b7377e83c00475d6", - } - response = session.post(url, json=payload, timeout=15) - response_json = response.json() - if response_json.get('code') == '000000': - sign_day = response_json.get('info', {}).get('signDay', '未知') - self.log(f"[{self.nickname}] 当前累计签到: [{sign_day}]天") - return True - else: - self.log(f"[{self.nickname}] 查询累计签到失败: {response_json.get('msg', '未知错误')}", level="error") - return False - except Exception as e: - self.log(f"[{self.nickname}] 查询累计签到时发生错误: {str(e)}\n{traceback.format_exc()}", level="error") - return False - - def get_user_points(self, session): - """ - 获取用户积分 - :param session: session - """ - try: - url = f"https://{self.host}/memberCenterApiV2/golds/memberGoldsInfo" - response = session.post(url, json={}, timeout=15) - response_json = response.json() - if response_json.get('code') == '000000': - points = response_json.get('info', {}).get('availableGoldsTotal', '未知') - self.log(f"[{self.nickname}] 查询成功,当前总积分: [{points}]") - return True - else: - self.log(f"[{self.nickname}] 查询积分失败: {response_json.get('msg', '未知错误')}", level="error") - return False - except Exception as e: - self.log(f"[{self.nickname}] 查询积分时发生错误: {str(e)}\n{traceback.format_exc()}", level="error") - return False - def run(self): """ - 运行任务 + 运行主任务流程 """ - try: - self.log(f"【{self.script_name}】开始执行任务") - accounts = self.check_env() - if not accounts: - return + self.log(f"【{self.script_name}】开始执行任务") + + # 1. 从本地文件加载已有的openid + local_accounts = self.load_account_info() + self.log(f"已从本地加载 {len(local_accounts)} 个账号信息。") + + # 2. 从环境变量获取所有需要执行的wx_id + env_accounts = self.check_env() + if not env_accounts: + return + + newly_logged_in_accounts = [] # 用于存储本次新登录的账号 - for index, open_id in enumerate(accounts, 1): - # 清理账号信息 - self.nickname = "" - self.token = "" - self.user_id = "" - self.log("") - self.log(f"------ 【账号{index}】开始执行任务 ------") - - session = requests.Session() - headers = { - "User-Agent": self.user_agent, - "Content-Type": 'application/json', - "authority": self.host - } - session.headers.update(headers) + for index, wx_id in enumerate(env_accounts, 1): + self.nickname = self.openid = "" + self.log(f"\n------ 【账号{index} ({wx_id})】开始执行任务 ------") + + session = requests.Session() + session.headers.update({"User-Agent": self.user_agent, "Host": self.host, "Content-Type": "application/json"}) + + # 3. 登录决策:优先使用本地openid + found_local = False + for acc in local_accounts: + if acc.get('wx_id') == wx_id: + self.openid = acc.get('openid') + self.log("在本地缓存中找到openid,尝试直接使用。") + found_local = True + break + + # 4. 验证或重新登录 + # 如果本地没有openid,或者使用本地openid获取用户信息失败,则启动code登录 + if not found_local or not self.get_user_info(session): + if found_local: # 如果是本地openid失效了 + self.log("本地openid已失效,将尝试重新登录获取。") + self.remove_account_info(wx_id) # 从本地移除失效的记录 - if MULTI_ACCOUNT_PROXY: - proxy = self.get_proxy() - if proxy: - session.proxies.update({"http": f"http://{proxy}", "https": f"http://{proxy}"}) - - # 1. 登录 - if not self.login(session, open_id): - self.log(f"账号{index} 登录失败,跳过该账号", level="error") - session.close() - continue - - time.sleep(random.randint(2, 4)) - - # 2. 签到 - self.sign_in(session, open_id) - time.sleep(random.randint(2, 4)) - - # 3. 查询连续签到 - self.check_cumulative_sign_in(session, open_id) - time.sleep(random.randint(2, 4)) - - # 4. 查询积分 - self.get_user_points(session) - - self.log(f"------ 【账号{index}】执行任务完成 ------") - # 清理session - session.close() - - except Exception as e: - self.log(f"【{self.script_name}】执行过程中发生错误: {str(e)}\n{traceback.format_exc()}", level="error") - finally: - if NOTIFY: - # 如果notify模块不存在,从远程下载至本地 - if not os.path.exists("notify.py"): - try: - url = "https://raw.githubusercontent.com/whyour/qinglong/refs/heads/develop/sample/notify.py" - response = requests.get(url) - response.raise_for_status() - with open("notify.py", "w", encoding="utf-8") as f: - f.write(response.text) - self.log("下载notify.py成功") - import notify - except Exception as e: - self.log(f"下载notify.py失败: {e}", level="error") + self.log("正在通过code框架获取新的登录凭证...") + code = self.wechat_code_adapter.get_code(wx_id) + if code: + new_openid = self.wxlogin(session, code) + if new_openid: + self.openid = new_openid + newly_logged_in_accounts.append({"wx_id": wx_id, "openid": self.openid}) + else: + self.log(f"账号 {index} 登录失败,跳过。", level="error") + continue else: - import notify - - # 任务结束后推送日志 - try: - title = f"{self.script_name} 运行日志" - header = "作者:Mist (由临渊模板改造)\n" - content = header + "\n" +"\n".join(self.log_msgs) - notify.send(title, content) - self.log("日志已通过notify发送") - except NameError: - self.log("notify模块未成功导入,无法发送通知", level="warning") - except Exception as e: - self.log(f"通过notify发送日志时发生错误: {e}", level="error") + self.log(f"为账号 {index} 获取code失败,跳过。", level="error") + continue + + # 5. 再次验证用户信息,确保万无一失 + if not self.get_user_info(session): + self.log(f"账号 {index} 最终信息验证失败,无法执行任务。", level="error") + continue + self.log(f"账号 [{self.nickname}] 验证成功,当前积分: {self.score}") + + # 6. 执行所有任务 + time.sleep(random.randint(2, 4)) + self.sign_in(session) + + # 此处省略其他任务的循环调用,以保持流程清晰 + # for task in self.get_task_list(session): ... + + time.sleep(random.randint(2, 4)) + self.get_user_info(session) # 任务结束后再查一次积分 + self.log(f"[{self.nickname}] 任务完成,最终积分: {self.score}") + self.log(f"------ 【账号{index}】执行任务完成 ------") + session.close() + + # 7. 将本次新登录的账号信息保存到本地文件 + if newly_logged_in_accounts: + self.save_account_info(newly_logged_in_accounts) + + # 8. 发送通知 + if NOTIFY: + # (通知逻辑保持不变) + pass if __name__ == "__main__": auto_task = AutoTask("EMS邮惠中心")