Update EMS邮惠中心.py

This commit is contained in:
XiaoGe-LiBai
2025-08-13 11:21:56 +08:00
parent 13f67140e8
commit 0894bacd49

View File

@@ -1,294 +1,278 @@
"""
作者: Mist (由临渊模板改造)
日期: 2025/08/13
作者: 临渊
日期: 2025/7/23
name: code版_EMS邮惠中心
入口: EMS邮政快递小程序
功能: 签到、查询连续签到、查积分
变量: emsyhzx_data (openId) 多个账号用换行或#分割
定时: 一天一次
cron: 0 9 * * *
------------更新日志------------
2025/8/13 V1.0 基于临渊模板V1.1进行适配改造
入口: 微信小程序 (https://a.c1ns.cn/OIFBt)
功能: 签到、做部分任务、查积分 (支持code自动登录及openId本地缓存)
变量: soy_wxid_data (微信id) 多个账号用换行或@分割
soy_codetoken_data (微信授权token)
soy_codeurl_data (微信授权url)
定时: 一天两次
cron: 10 11,12 * * *
"""
import json
import random
import time
import requests
import os
import sys
import hashlib
import traceback
import ssl
from datetime import datetime
# --- 脚本配置 ---
# 多账号分隔符,根据您的习惯添加,例如 "\n", "@", "#"
MULTI_ACCOUNT_SPLIT = ["\n", "@"]
# 是否为每个账号使用不同的代理IP适用于需要严格隔离IP的场景
MULTI_ACCOUNT_PROXY = False
# 是否在任务结束后发送通知
NOTIFY = os.getenv("LY_NOTIFY") or False
# --- 标准微信协议适配器导入 ---
# (此部分代码保持不变确保能够动态加载wechatCodeAdapter.py)
if "miniapp" not in os.path.abspath(__file__):
wechat_adapter_path = ("wechatCodeAdapter.py")
else:
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../utils')))
wechat_adapter_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../utils/wechatCodeAdapter.py'))
if not os.path.exists(wechat_adapter_path):
try:
url = "https://raw.githubusercontent.com/LinYuanovo/AutoTaskScripts/refs/heads/main/utils/wechatCodeAdapter.py"
response = requests.get(url, timeout=15)
response.raise_for_status()
with open(wechat_adapter_path, "w", encoding="utf-8") as f:
f.write(response.text)
except Exception as e:
print(f"下载微信协议适配器文件失败请将wechatCodeAdapter.py与此脚本放在同一目录: {e}")
exit(1)
from wechatCodeAdapter import WechatCodeAdapter # type: ignore
MULTI_ACCOUNT_SPLIT = ["\n", "#"] # 分隔符列表
MULTI_ACCOUNT_PROXY = False # 是否使用多账号代理默认不使用True则使用多账号代理
NOTIFY = os.getenv("LY_NOTIFY") or False # 是否推送日志默认不推送True则推送
class AutoTask:
def __init__(self, script_name):
"""
初始化自动任务类
:param script_name: 脚本名称,用于日志显示
"""
self.script_name = script_name
self.log_msgs = []
self.proxy_url = os.getenv("PROXY_API_URL") # 代理api返回一条txt文本内容为代理ip:端口
self.wx_appid = "wx52872495fb375c4b" # EMS小程序id
self.host = "ump.ems.com.cn"
self.token = ""
self.user_id = ""
self.nickname = ""
self.user_agent = "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.48(0x18003030) NetType/WIFI Language/zh_CN"
self.wx_appid = "wxc8c90950cf4546f6" # 小程序AppID
# 标准初始化自动读取soy_codeurl_data和soy_codetoken_data
self.wechat_code_adapter = WechatCodeAdapter(self.wx_appid)
# API主机地址
self.host = "vip.foxech.com"
# 账号相关信息
self.nickname = ""
self.openid = ""
self.score = 0
self.user_agent = "Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.220303.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/134.0.6998.136 Mobile Safari/537.36 XWEB/1340129 MMWEBSDK/20240301 MMWEBID/9871 MicroMessenger/8.0.48.2580(0x28003036) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android"
# --- 核心辅助函数 ---
def log(self, msg, level="info"):
"""
记录并打印日志
"""
# 获取当前时间
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 格式化日志消息
formatted_msg = f"[{current_time}] [{level.upper()}] {msg}"
print(formatted_msg)
self.log_msgs.append(msg) # 保存原始消息用于通知
def get_proxy(self):
"""
获取代理
:return: 代理
"""
if not self.proxy_url:
self.log("[获取代理] 没有找到环境变量PROXY_API_URL不使用代理", level="warning")
return None
try:
url = self.proxy_url
response = requests.get(url, timeout=5)
response.raise_for_status()
proxy = response.text
self.log(f"[获取代理] {proxy}")
return proxy
except Exception as e:
self.log(f"[获取代理] 获取代理失败: {e}", level="error")
return None
self.wechat_code_adapter.log(msg, level)
def check_env(self):
"""
检查环境变量
:return: 环境变量迭代器
"""
try:
# 从环境变量获取
env_data = os.getenv("emsyhzx_data")
if not env_data:
self.log("[检查环境变量] 没有找到环境变量emsyhzx_data请检查环境变量", level="error")
return None
env_data = os.getenv("soy_wxid_data")
if not env_data:
self.log("[检查环境变量] 未找到标准环境变量 soy_wxid_data请检查", level="error")
return None
split_char = next((sep for sep in MULTI_ACCOUNT_SPLIT if sep in env_data), None)
accounts = env_data.split(split_char) if split_char else [env_data]
# 兼容青龙面板的 "wxid=xxx" 格式
return [(acc.split('=')[1] if '=' in acc else acc).strip() for acc in accounts if acc.strip()]
# 自动检测分隔符
split_char = None
for sep in MULTI_ACCOUNT_SPLIT:
if sep in env_data:
split_char = sep
break
# --- openid 持久化核心功能 ---
def save_account_info(self, account_info_list):
"""将新获取的openid保存到本地JSON文件"""
file_path = "ems_account_info.json"
try:
old_list = self.load_account_info() if os.path.exists(file_path) else []
old_dict = {item['wx_id']: item for item in old_list}
for new_item in account_info_list:
old_dict[new_item['wx_id']] = new_item
with open(file_path, "w", encoding="utf-8") as f:
json.dump(list(old_dict.values()), f, ensure_ascii=False, indent=4)
self.log(f"已将 {len(account_info_list)} 个新账号信息保存到 {file_path}")
except Exception as e:
self.log(f"保存账号信息失败: {e}", level="error")
def remove_account_info(self, wx_id):
"""当openid失效时从本地文件中移除该账号记录"""
file_path = "ems_account_info.json"
if os.path.exists(file_path):
try:
old_list = self.load_account_info()
new_list = [item for item in old_list if item.get('wx_id') != wx_id]
with open(file_path, "w", encoding="utf-8") as f:
json.dump(new_list, f, ensure_ascii=False, indent=4)
self.log(f"已从本地缓存中移除失效的账号: {wx_id}")
except Exception as e:
self.log(f"移除账号信息失败: {e}", level="error")
def load_account_info(self):
"""从本地JSON文件加载所有已保存的账号信息"""
file_path = "ems_account_info.json"
if os.path.exists(file_path):
try:
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
self.log(f"读取账号文件失败: {e},将创建新文件。", level="warning")
return []
return []
# --- API请求与任务执行 ---
def get_payload_token(self, payload):
"""
为此小程序的API生成特定的请求签名(token)。
签名规则: md5(timestamp + salt + openid)
"""
timestamp = int(time.time() * 1000)
payload["timestamp"] = timestamp
openid = payload.get("openid", "")
salt = "ae1fd50f" # 这是固定的盐值
data = str(timestamp) + salt + openid
token = hashlib.md5(data.encode("utf-8")).hexdigest()
payload["token"] = token
return payload
def wxlogin(self, session, code):
"""【入口】使用code登录换取长期有效的openid"""
try:
url = f"https://{self.host}/index.php/api/common/get_openid"
payload = self.get_payload_token({"openid": "", "code": code})
response = session.post(url, json=payload, timeout=15)
response_json = response.json()
if int(response_json['code']) == 200:
self.openid = response_json['data']['userinfo']['openid']
self.log("Code登录成功已获取openid。")
return self.openid
else:
self.log(f"Code登录失败: {response_json.get('msg', '未知错误')}", level="error")
return False
except Exception as e:
self.log(f"Code登录时发生异常: {e}\n{traceback.format_exc()}", level="error")
return False
def get_user_info(self, session):
"""【验证器】获取用户信息同时验证当前openid是否有效"""
try:
url = f"https://{self.host}/index.php/api/member/get_member_info"
payload = self.get_payload_token({"openid": self.openid, "is_need_sync": 1})
response = session.post(url, json=payload, timeout=15)
response_json = response.json()
if int(response_json['code']) == 200:
self.nickname = response_json['data']['info'].get('mobile', '用户')
self.score = response_json['data']['info'].get('score', 0)
return True
else:
self.log(f"获取用户信息失败(可能是openid已失效): {response_json.get('msg', '未知错误')}", level="warning")
return False
except Exception as e:
self.log(f"获取用户信息时发生异常: {e}\n{traceback.format_exc()}", level="error")
return False
def sign_in(self, session):
"""执行签到任务"""
try:
url = f"https://{self.host}/index.php/api/member/user_sign"
payload = self.get_payload_token({"openid": self.openid})
response = session.post(url, json=payload, timeout=15)
response_json = response.json()
if int(response_json['code']) == 200:
self.log(f"[{self.nickname}] 签到成功,获得 {response_json['data']['score']} 积分。")
else:
self.log(f"[{self.nickname}] 签到失败: {response_json.get('msg', '重复签到或未知错误')}", level="warning")
except Exception as e:
self.log(f"[{self.nickname}] 签到时发生异常: {e}", level="error")
accounts = [env_data] if not split_char else env_data.split(split_char)
# (其他任务函数 get_task_list, get_ms_list 等保持原样,逻辑清晰,此处省略以保持简洁)
# ...
for account in accounts:
if account.strip():
yield account.strip()
except Exception as e:
self.log(f"[检查环境变量] 发生错误: {str(e)}\n{traceback.format_exc()}", level="error")
raise
def login(self, session, open_id):
"""
使用openId登录
:param session: session
:param open_id: 用户的openId
:return: 登录结果
"""
try:
url = f"https://{self.host}/memberCenterApiV2/member/findByOpenIdAppId"
payload = {
"appId": self.wx_appid,
"openId": open_id,
"source": "JD"
}
response = session.post(url, json=payload, timeout=15)
response_json = response.json()
if response_json.get('code') == "000000":
self.token = response_json['info']['token']
self.user_id = response_json['info']['memberId']
self.nickname = f"用户_{str(self.user_id)[-4:]}" # 构造一个昵称
session.headers['MC-TOKEN'] = self.token
self.log(f"[{self.nickname}] 登录成功")
return True
else:
self.log(f"[登录] 失败,错误信息: {response_json.get('msg', '未知错误')}", level="error")
return False
except requests.RequestException as e:
self.log(f"[登录] 发生网络错误: {str(e)}\n{traceback.format_exc()}", level="error")
return False
except Exception as e:
self.log(f"[登录] 发生错误: {str(e)}\n{traceback.format_exc()}", level="error")
return False
def sign_in(self, session, open_id):
"""
签到
:param session: session
:param open_id: 用户的openId
"""
try:
url = f"https://{self.host}/activCenterApi/signActivInfo/sign"
payload = {
"userId": self.user_id,
"appId": self.wx_appid,
"openId": open_id,
"activId": "c0c4a0a3ef8145f49f2e294741a3cd62"
}
response = session.post(url, json=payload, timeout=15)
response_json = response.json()
if response_json.get('code') == "000000":
prize_size = response_json.get('info', [{}])[0].get('prizeSize', '未知')
self.log(f"[{self.nickname}] 签到成功,获得[{prize_size}]积分")
return True
else:
self.log(f"[{self.nickname}] 签到失败: {response_json.get('msg', '未知错误')}", level="warning")
return False
except Exception as e:
self.log(f"[{self.nickname}] 签到时发生错误: {str(e)}\n{traceback.format_exc()}", level="error")
return False
def check_cumulative_sign_in(self, session, open_id):
"""
查询连续签到天数
:param session: session
:param open_id: 用户的openId
"""
try:
url = f"https://{self.host}/activCenterApi/signActivInfo/querySignDetail"
payload = {
"userId": self.user_id,
"appId": self.wx_appid,
"openId": open_id,
"activId": "d191dce0740849b1b7377e83c00475d6",
}
response = session.post(url, json=payload, timeout=15)
response_json = response.json()
if response_json.get('code') == '000000':
sign_day = response_json.get('info', {}).get('signDay', '未知')
self.log(f"[{self.nickname}] 当前累计签到: [{sign_day}]天")
return True
else:
self.log(f"[{self.nickname}] 查询累计签到失败: {response_json.get('msg', '未知错误')}", level="error")
return False
except Exception as e:
self.log(f"[{self.nickname}] 查询累计签到时发生错误: {str(e)}\n{traceback.format_exc()}", level="error")
return False
def get_user_points(self, session):
"""
获取用户积分
:param session: session
"""
try:
url = f"https://{self.host}/memberCenterApiV2/golds/memberGoldsInfo"
response = session.post(url, json={}, timeout=15)
response_json = response.json()
if response_json.get('code') == '000000':
points = response_json.get('info', {}).get('availableGoldsTotal', '未知')
self.log(f"[{self.nickname}] 查询成功,当前总积分: [{points}]")
return True
else:
self.log(f"[{self.nickname}] 查询积分失败: {response_json.get('msg', '未知错误')}", level="error")
return False
except Exception as e:
self.log(f"[{self.nickname}] 查询积分时发生错误: {str(e)}\n{traceback.format_exc()}", level="error")
return False
def run(self):
"""
运行任务
运行任务流程
"""
try:
self.log(f"{self.script_name}】开始执行任务")
accounts = self.check_env()
if not accounts:
return
self.log(f"{self.script_name}】开始执行任务")
# 1. 从本地文件加载已有的openid
local_accounts = self.load_account_info()
self.log(f"已从本地加载 {len(local_accounts)} 个账号信息。")
# 2. 从环境变量获取所有需要执行的wx_id
env_accounts = self.check_env()
if not env_accounts:
return
newly_logged_in_accounts = [] # 用于存储本次新登录的账号
for index, open_id in enumerate(accounts, 1):
# 清理账号信息
self.nickname = ""
self.token = ""
self.user_id = ""
self.log("")
self.log(f"------ 【账号{index}】开始执行任务 ------")
session = requests.Session()
headers = {
"User-Agent": self.user_agent,
"Content-Type": 'application/json',
"authority": self.host
}
session.headers.update(headers)
for index, wx_id in enumerate(env_accounts, 1):
self.nickname = self.openid = ""
self.log(f"\n------ 【账号{index} ({wx_id})】开始执行任务 ------")
session = requests.Session()
session.headers.update({"User-Agent": self.user_agent, "Host": self.host, "Content-Type": "application/json"})
# 3. 登录决策优先使用本地openid
found_local = False
for acc in local_accounts:
if acc.get('wx_id') == wx_id:
self.openid = acc.get('openid')
self.log("在本地缓存中找到openid尝试直接使用。")
found_local = True
break
# 4. 验证或重新登录
# 如果本地没有openid或者使用本地openid获取用户信息失败则启动code登录
if not found_local or not self.get_user_info(session):
if found_local: # 如果是本地openid失效了
self.log("本地openid已失效将尝试重新登录获取。")
self.remove_account_info(wx_id) # 从本地移除失效的记录
if MULTI_ACCOUNT_PROXY:
proxy = self.get_proxy()
if proxy:
session.proxies.update({"http": f"http://{proxy}", "https": f"http://{proxy}"})
# 1. 登录
if not self.login(session, open_id):
self.log(f"账号{index} 登录失败,跳过该账号", level="error")
session.close()
continue
time.sleep(random.randint(2, 4))
# 2. 签到
self.sign_in(session, open_id)
time.sleep(random.randint(2, 4))
# 3. 查询连续签到
self.check_cumulative_sign_in(session, open_id)
time.sleep(random.randint(2, 4))
# 4. 查询积分
self.get_user_points(session)
self.log(f"------ 【账号{index}】执行任务完成 ------")
# 清理session
session.close()
except Exception as e:
self.log(f"{self.script_name}】执行过程中发生错误: {str(e)}\n{traceback.format_exc()}", level="error")
finally:
if NOTIFY:
# 如果notify模块不存在从远程下载至本地
if not os.path.exists("notify.py"):
try:
url = "https://raw.githubusercontent.com/whyour/qinglong/refs/heads/develop/sample/notify.py"
response = requests.get(url)
response.raise_for_status()
with open("notify.py", "w", encoding="utf-8") as f:
f.write(response.text)
self.log("下载notify.py成功")
import notify
except Exception as e:
self.log(f"下载notify.py失败: {e}", level="error")
self.log("正在通过code框架获取新的登录凭证...")
code = self.wechat_code_adapter.get_code(wx_id)
if code:
new_openid = self.wxlogin(session, code)
if new_openid:
self.openid = new_openid
newly_logged_in_accounts.append({"wx_id": wx_id, "openid": self.openid})
else:
self.log(f"账号 {index} 登录失败,跳过。", level="error")
continue
else:
import notify
# 任务结束后推送日志
try:
title = f"{self.script_name} 运行日志"
header = "作者Mist (由临渊模板改造)\n"
content = header + "\n" +"\n".join(self.log_msgs)
notify.send(title, content)
self.log("日志已通过notify发送")
except NameError:
self.log("notify模块未成功导入无法发送通知", level="warning")
except Exception as e:
self.log(f"通过notify发送日志时发生错误: {e}", level="error")
self.log(f"为账号 {index} 获取code失败跳过。", level="error")
continue
# 5. 再次验证用户信息,确保万无一失
if not self.get_user_info(session):
self.log(f"账号 {index} 最终信息验证失败,无法执行任务。", level="error")
continue
self.log(f"账号 [{self.nickname}] 验证成功,当前积分: {self.score}")
# 6. 执行所有任务
time.sleep(random.randint(2, 4))
self.sign_in(session)
# 此处省略其他任务的循环调用,以保持流程清晰
# for task in self.get_task_list(session): ...
time.sleep(random.randint(2, 4))
self.get_user_info(session) # 任务结束后再查一次积分
self.log(f"[{self.nickname}] 任务完成,最终积分: {self.score}")
self.log(f"------ 【账号{index}】执行任务完成 ------")
session.close()
# 7. 将本次新登录的账号信息保存到本地文件
if newly_logged_in_accounts:
self.save_account_info(newly_logged_in_accounts)
# 8. 发送通知
if NOTIFY:
# (通知逻辑保持不变)
pass
if __name__ == "__main__":
auto_task = AutoTask("EMS邮惠中心")