This commit is contained in:
uan7
2024-04-25 17:03:04 +08:00
committed by GitHub
parent e10c0f3e7d
commit 64a025e447
12 changed files with 3135 additions and 0 deletions

352
dq点单签到.py Normal file
View File

@@ -0,0 +1,352 @@
"""
变量: 手机号码
变量名: dqqdck
例如 export dqqdck='1380013800#备注
多账号 换行/回车
脚本作者: QGh3amllamll
版本 1.0
------更新记录----
1.0 测试版
"""
import os
import requests
from datetime import datetime, timezone, timedelta
import json
import time
import io
import sys
import requests
import base64
enable_notification = 1 #0不发送通知 1发送通知
# 只有在需要发送通知时才尝试导入notify模块
if enable_notification == 1:
try:
from notify import send
except ModuleNotFoundError:
print("警告未找到notify.py模块。它不是一个依赖项请勿错误安装。程序将退出。")
sys.exit(1)
#---------简化的框架 0.41 带通知--------
import sys
import os
def get_python_version():
version = "python" + ".".join(str(i) for i in sys.version_info[:2])
return version
# 使用函数获取Python版本并打印
PYTHON_VERSION = get_python_version()
print("本程序只支持在Python 3.10环境下运行。当前Python版本:", PYTHON_VERSION)
print()
enable_notification = 1 #0不发送通知 1发送通知
# 获取北京日期的函数
def get_beijing_date():
beijing_time = datetime.now(timezone(timedelta(hours=8)))
return beijing_time.date()
def dq_time():
# 获取当前时间戳
dqsj = int(time.time())
# 将时间戳转换为可读的时间格式
dysj = datetime.fromtimestamp(dqsj).strftime('%Y-%m-%d %H:%M:%S')
#print("当前时间戳:", dqsj)
#print("转换后的时间:", dysj)
return dqsj, dysj
def log(message):
print(message)
def print_disclaimer():
log("📢 请认真阅读以下声明")
log(" 【免责声明】 ")
log("✨ 脚本及其中涉及的任何解密分析程序,仅用于测试和学习研究")
log("✨ 禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断")
log("✨ 禁止任何公众号、自媒体进行任何形式的转载、发布")
log("✨ 本人对任何脚本问题概不负责,包括但不限于由任何脚本错误导致的任何损失或损害")
log("✨ 脚本文件请在下载试用后24小时内自行删除")
log("✨ 脚本文件如有不慎被破解或修改由破解或修改者承担")
log("✨ 如不接受此条款请立即删除脚本文件")
log("" * 10)
log(f'-----------DQ 签到 1.0-----------')
log( " .....................阿弥陀佛.......................")
log( " _oo0oo_ ")
log( " o8888888o ")
log( ' 88" . "88 ')
log( " (| -_- |) ")
log( " 0\\ = /0 ")
log( " ___/---\\___ ")
log( " .' \\| |/ '. ")
log( " / \\\\||| : |||// \\ ")
log( " / _||||| -卍-|||||_ \\ ")
log( " | | \\\\\\ - /// | | ")
log( " | \\_| ''\\---/'' |_/ | ")
log( " \\ .-\\__ '-' ___/-. / ")
log( " ___'. .' /--.--\\ '. .'___ ")
log( " ."" < .___\\_<|>_/___.> "". ")
log( " | | : - \\.;\\ _ /;./ - : | | ")
log( " \\ \\ _. \\_ __\\ /__ _/ .- / / ")
log( " =====-.____.___ \\_____/___.-___.-===== ")
log( " =---= ")
log( " ")
log( "...................佛祖保佑 ,永无BUG.................")
log(f'-----------DQ 签到 1.0-----------')
# 获取环境变量
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
print(f'环境变量{var_name}未设置,请检查。')
return None
accounts = value.strip().split('\n')
num_accounts = len(accounts)
print(f'-----------本次账号运行数量:{num_accounts}-----------')
#print(f'-----------DQ 签到 1.0-----------')
print_disclaimer()
return accounts
#-------------------------------封装请求-------------
def create_headers(new_session):
headers = {
"accept": "application/json, text/plain, */*",
"content-length": "2",
"channel": "311",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309092b) XWEB/8555",
"tenant": "1",
"content-type": "application/json;charset=UTF-8",
"host": "wechat.dairyqueen.com.cn",
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"accept-encoding": "gzip, deflate, br",
"accept-language": "zh-CN,zh;q=0.9",
"cookie": f"SESSION={new_session}"
}
return headers
#-------------------------------封装请求---完成----------
def tjhmhqsign(hm): #提交号码 获取sign 1
url = "https://wxxcx.dairyqueen.com.cn/UserXueLi?_actionName=getXueLiSign&serviceId=4&actionId=9&key=30274185e983a6c6"
headers = {
'host': 'wxxcx.dairyqueen.com.cn',
'content-length': '99',
'xweb_xhr': '1',
#'cookie': 'JSESSIONID=',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309092b) XWEB/8555',
'content-type': 'application/json',
'accept': '*/*',
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': 'https://servicewechat.com/wx22e5ce7c766b4b78/131/page-frame.html',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9',
}
current_timestamp = int(time.time() * 1000) # 动态生成时间戳
payload = {
"content": {
"bindingAccount": hm, # 动态传入的手机号码或账号
"tenantId": 1,
"channelId": 311,
"timestamp": current_timestamp
}
}
#print("获取学力成功:", payload)
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status() # 确保请求成功
# 解析响应体中的JSON数据
response_data = response.json()
#print("获取学力成功:", response_data)
# 从响应数据中提取 sign
sign = response_data.get('data', {}).get('sign')
# 从响应头中获取 JSESSIONID
cookid = response.cookies.get('JSESSIONID')
return sign, cookid, current_timestamp # 返回包含时间戳的元组
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
return None, None, None
def tj_signhqck(hm, sign, cookid, current_timestamp): # 提交sign 获取Cookie 2
url = "https://wechat.dairyqueen.com.cn/loginNoLandfall"
jsessionid_base64 = base64.b64encode(cookid.encode()).decode()
payload = {
"bindingAccount": hm,
"tenantId": "1",
"channelId": "311",
"timestamp": current_timestamp,
"sign": sign
}
#print("Payload:", payload)
headers = {
"channel": "311",
"tenant": "1",
"origin": "https://wechat.dairyqueen.com.cn",
"x-requested-with": "com.tencent.mm",
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"content-type": "application/json;charset=UTF-8",
"referer": f"https://wechat.dairyqueen.com.cn/webview/dq/index.html?bindingAccount={hm}&tenantId=1&channelId=311&timestamp={current_timestamp}&sign={sign}",
"accept-encoding": "gzip, deflate",
"Cookie": f"SESSION={jsessionid_base64}"
}
#print("Headers:", headers)
response = requests.post(url, headers=headers, json=payload)
#print("响应头:", response.headers)
new_session = None
set_cookie_header = response.headers.get('Set-Cookie')
if set_cookie_header and "SESSION=" in set_cookie_header:
for part in set_cookie_header.split(';'):
if part.strip().startswith("SESSION="):
new_session = part.strip().split('=')[1]
break
#print("新的 SESSION 值:", new_session)
try:
response.raise_for_status()
response_data = response.json()
#print("登录操作成功:", response_data)
return response_data, new_session
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
return None, new_session
def sign_in(new_session):#执行签到操作
url = "https://wechat.dairyqueen.com.cn/memSignIn/signIn"
headers = create_headers(new_session)
try:
response = requests.post(url, headers=headers, json={})
response.raise_for_status()
response_data = response.json()
if response_data.get('code') == 200:
print("成功签到")
elif response_data.get('code') == 11028:
print(response_data.get('message'))
else:
print("响应内容:", response_data)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
def xz(new_session):
url = "https://wechat.dairyqueen.com.cn/member/info"
headers = create_headers(new_session)
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
response_data = response.json()
if response_data.get('code') == 200:
group_points = response_data.get('data', {}).get('groupPoints')
print("我的积分:", group_points)
else:
print("响应体:", response_data)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
#本地测试用
os.environ['cscs'] = '''
'''
class Tee:
def __init__(self, *files):
self.files = files
def write(self, obj):
for file in self.files:
file.write(obj)
file.flush()
def flush(self):
for file in self.files:
file.flush()
def main():
var_name = 'dqqdck'
tokens = get_env_variable(var_name)
if not tokens:
print(f'环境变量{var_name}未设置,请检查。')
return
captured_output = io.StringIO()
original_stdout = sys.stdout
sys.stdout = Tee(sys.stdout, captured_output)
total_accounts = len(tokens)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 1:
print("令牌格式不正确。跳过处理。")
continue
hm = parts[0]
account_no = parts[1] if len(parts) > 1 else "" # 备注信息
print(f'------账号 {i+1}/{total_accounts} {account_no} -------')
sign, cookid, current_timestamp = tjhmhqsign(hm)
if sign and cookid and current_timestamp:
response_data, new_session = tj_signhqck(hm, sign, cookid, current_timestamp)
if new_session:
sign_in(new_session)
xz(new_session)
else:
print("未能获取新的SESSION值")
else:
print("手机号码获取sign失败")
sys.stdout = original_stdout
output_content = captured_output.getvalue()
captured_output.close()
if enable_notification == 1:
send("dq点单签到 通知", output_content)
#print("通知已发送。输出内容为:")
#print(output_content)
if __name__ == "__main__":
main()

149
mhdy.py Normal file
View File

@@ -0,0 +1,149 @@
'''
new Env('梦幻岛屿');
BY:YourAhTzu
日期:1.10 7:53 (修复多号报错)
注册链接:http://mh.youwanzz.com/#/pages/reg?id=a413
格式:账号&密码
去主页绑定zfb在运行脚本需要签到十天
'''
import os
import requests
import json
import random
import time
def login(tel, pwd):
print(">>>>>开始登录账号<<<<<")
url = "http://mhapi.youwanzz.com:4005/index/Login"
headers = {
"Host": "mhapi.youwanzz.com:4005",
"Connection": "keep-alive",
"Content-Length": "45",
"User-Agent": "Mozilla/5.0 (Linux; Android 12; RMX3562 Build/SP1A.210812.016) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.98 Mobile Safari/537.36",
"custom-header": "hello",
"content-type": "application/x-www-form-urlencoded",
"Accept": "*/*",
"Origin": "http://mh.youwanzz.com",
"X-Requested-With": "mark.via",
"Referer": "http://mh.youwanzz.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
"txyzm": "",
"type": "dl",
"tel": tel,
"pwd": pwd
}
response = requests.post(url, headers=headers, data=data)
data = response.json()
if "token" in data:
token = data["token"]
uid = data["id"]
print(data["msg"])
return token, uid
else:
print("登录失败:", data["msg"])
return None, None
def qian_dao(token, uid):
print(">>>>>开始执行签到<<<<<")
url = "http://mhapi.youwanzz.com:4005/Index/QianDao"
headers = {
"Host": "mhapi.youwanzz.com:4005",
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (Linux; Android 12; RMX3562 Build/SP1A.210812.016) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.98 Mobile Safari/537.36",
"custom-header": "hello",
"content-type": "application/x-www-form-urlencoded",
"Accept": "*/*",
"Origin": "http://mh.youwanzz.com",
"X-Requested-With": "mark.via",
"Referer": "http://mh.youwanzz.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
'id': uid,
'token': token
}
response = requests.post(url, headers=headers, data=data)
result = response.json()
print(result['msg'])
def LingRenWu(token, uid):
print(">>>>>开始领取月卡金币<<<<<")
url = "http://mhapi.youwanzz.com:4005/My/LingRenWu"
headers = {
"Host": "mhapi.youwanzz.com:4005",
"Connection": "keep-alive",
"Content-Length": "52",
"User-Agent": "Mozilla/5.0 (Linux; Android 12; RMX3562 Build/SP1A.210812.016) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.98 Mobile Safari/537.36",
"custom-header": "hello",
"content-type": "application/x-www-form-urlencoded",
"Accept": "*/*",
"Origin": "http://mh.youwanzz.com",
"X-Requested-With": "mark.via",
"Referer": "http://mh.youwanzz.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
'id': uid,
'token': token,
"kid": "1"
}
response = requests.post(url, headers=headers, data=data)
result = response.json()
print(result['msg'])
def GetHome(token, uid):
print(">>>>>开始获取红包和金币<<<<<")
url = 'http://mhapi.youwanzz.com:4005/Index/GetHome'
headers = {
'Host': 'mhapi.youwanzz.com:4005',
'Connection': 'keep-alive',
'Content-Length': '46',
'User-Agent': 'Mozilla/5.0 (Linux; Android 12; RMX3562 Build/SP1A.210812.016) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.98 Mobile Safari/537.36',
'custom-header': 'hello',
'content-type': 'application/x-www-form-urlencoded',
'Accept': '*/*',
'Origin': 'http://mh.youwanzz.com',
'X-Requested-With': 'mark.via',
'Referer': 'http://mh.youwanzz.com/',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7'
}
data = {
'id': uid,
'token': token
}
response = requests.post(url, headers=headers, data=data)
response_data = response.text
data = json.loads(response_data)
jinbi = data["jinbi"]
hongbao = data["hongbao"]
print("当前金币:", jinbi,"当前红包:", hongbao)
if __name__ == '__main__':
mhdy = os.environ.get('mhdy')
if mhdy:
account_list = mhdy.split('@')
for account in account_list:
tel, pwd = account.split('&')
token, uid = login(tel, pwd)
if token is not None and uid is not None:
login_delay = random.randint(15, 25)
time.sleep(login_delay)
qian_dao(token, uid)
LingRenWu(token, uid)
GetHome(token, uid)
task_delay = random.randint(5, 10)
print(f"等待{task_delay}秒后执行下一个任务")
time.sleep(task_delay)
print("-------------------------------------")
else:
print("账号登录失败,请检查账号密码是否正确")
print("-------------------------------------")
else:
print("请设置环境变量 mhdy")

303
mxbc.js Normal file

File diff suppressed because one or more lines are too long

30
nfsq.cookie.js Normal file

File diff suppressed because one or more lines are too long

367
smart汽车.py Normal file
View File

@@ -0,0 +1,367 @@
"""
项目名称 smart汽车+
变量 sadi#saui#请求体
变量 sadi#saui#请求体#备注 可以增加备注
变量名 smartCK
多账号 换行/回车
脚本作者: QGh3amllamll 版本2.0
搜api/smart/web/1.0/oauth/miniapp/quicklogin
全部请求体
{"gbk":"dpe...=","encryptedData":"WVT...==","openid":"oWAvd5.....","unionid":"oiRgW6lL4TtwjaTUwc85rgfa-cms","tongdun_token":"Pb..jV9","extras":{"checkBinding":true}}
例如:
sadi#saui#{"gbk":"dpe...=","encryptedData":"WVT...==","openid":"oWAvd5.....","unionid":"oiRgW6lL4TtwjaTUwc85rgfa-cms","tongdun_token":"Pb..jV9","extras":{"checkBinding":true}}
saui不在一起 任意找一个
---------------------更新说明----------
1.1版本更新 获取token数据的 UTC时间
时间2013.11.19.21
1.2版本更新 打印信息 时间2023.11.20.1:30
1.3版本 更新 通知 重写打印 2023年11月28日01:33:28
1.4版本 更新 30天开宝箱 重写发送通知 2023.12.18 01点
2.0版本 重写代码 2023.12.18 02点
"""
import os
import requests
from datetime import datetime, timezone, timedelta
import json
import time
import random
import sys
from io import StringIO
# 控制变量,用于控制是否发送通知
enable_notification = 1 #0 不发送 1发送通知
# 如果需要发送通知则尝试导入notify模块
if enable_notification:
try:
from notify import send
except ModuleNotFoundError:
print("警告未找到notify.py模块。程序将退出。")
sys.exit(1)
#---------简化的框架0.4--------
# 配置参数
#base_url = "https://hxxxy.gov.cn" # 已修改为实际的基础URL
user_agent = "Mozilla/5.0 (Linux; Android 11; ONEPLUS A6000 Build/RKQ1.201217.002; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/111.0.5563.116 Mobile Safari/537.36 XWEB/1110005 MMWEBSDK/20230405 MMWEBID/2930 MicroMessenger/8.0.35.2360(0x2800235D) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android"
def get_time_info(format="beijing", timestamp_required=False, year_month_only=False):
if format == "beijing":
current_time = datetime.now(timezone(timedelta(hours=8)))
elif format == "utc":
current_time = datetime.now(timezone.utc)
else:
raise ValueError("不支持的时间格式")
if year_month_only:
return current_time.strftime('%Y-%m')
formatted_time = current_time.strftime('%Y%m%dT%H%M%SZ')
date_only = current_time.date()
timestamp = int(current_time.timestamp())
if timestamp_required:
return formatted_time, timestamp
else:
return formatted_time, date_only
# 获取环境变量
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
print(f'环境变量{var_name}未设置,请检查。')
return None
accounts = value.strip().split('\n')
num_accounts = len(accounts)
print(f'-----------本次账号运行数量:{num_accounts}-----------')
print(f'-----------项目 smart汽车 -----------')
return accounts
# 封装请求头
def create_headers(id_token):
headers = {
"X-Channel-Id": "smartapp",
"Accept": "application/json, text/plain, */*",
"X-Auth-Token": id_token,
"User-Agent": user_agent,
"Content-Type": "application/json;charset=UTF-8",
"Origin": "https://app-obs-prod.smart.cn",
"X-Requested-With": "com.tencent.mm",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://app-obs-prod.smart.cn/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
return headers
def hqtoken(sadi, user_agent, json_body): # 获取userId和token
try:
# 获取x-sdk-date
x_sdk_date, _ = get_time_info(format="beijing")
url = "https://cms-api.smart.cn/api/smart/web/1.0/oauth/miniapp/quicklogin"
headers = {
"charset": "utf-8",
"sadi": sadi,
"User-Agent": user_agent,
"Accept-Encoding": "gzip,compress,br,deflate",
"token-time": "[object Undefined]",
"api-key": "90c5f74cb2214ea6927d02addf8333d9",
"x-channel-id": "wechat-applet",
"content-type": "application/json",
"x-sdk-date": x_sdk_date,
"Referer": "https://servicewechat.com/wx7268531cd0569eb5/78/page-frame.html"
}
# 将字符串格式的请求体转换为JSON对象
payload = json.loads(json_body)
# 发送请求
response = requests.post(url, headers=headers, json=payload)
response_data = response.json()
# 提取 userId 和 id_token
userId = response_data['result'].get('userId')
id_token = response_data['result'].get('id_token')
# 打印userId和id_token以供调试
#print(f"userId: {userId}, id_token: {id_token}")
return userId, id_token
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None, None
def sign_in(sadi, saui, user_agent, userId, id_token): # 签到操作
try:
# 使用合并后的函数获取北京时间的格式化字符串和时间戳
formatted_time, timestamp = get_time_info(format="beijing", timestamp_required=True)
url = "https://app-api.smart.cn/smartapp-me/signs/v2"
# 设置请求头
headers = {
"charset": "utf-8",
"sadi": sadi,
"User-Agent": user_agent,
"x-user-id": userId,
"Accept-Encoding": "gzip,compress,br,deflate",
"id-token": id_token,
"token-time": str(timestamp), # 使用时间戳
"x-auth-token": id_token,
"saui": saui,
"x-channel-id": "wechat-applet",
"content-type": "application/json",
"x-sdk-date": formatted_time, # 使用格式化的北京时间
"Referer": "https://servicewechat.com/wx7268531cd0569eb5/78/page-frame.html"
}
#print("签到操作时间戳, token-time:", str(timestamp), "x-sdk-date:", formatted_time)
response = requests.post(url, headers=headers, json={}) # 发送请求
if response.status_code == 200:
response_json = response.json()
if response_json.get('code') == 'err.userprofile.duplicate.sign':
print("签到结果", response_json.get('message'))
elif response_json.get('code') == 'success':
#print("签到成功:", response_json)
# 提取特定信息并打印
sign_count = response_json['data']['signCount']
prize_content = response_json['data']['signPrize']['prizeContent']
total_integral = response_json['data']['totalIntegral']
print(f"签到结果: 签到天数 {sign_count},今日积分 {prize_content},积分总数 {total_integral}")
else:
print("签到状态未知", response_json)
return response_json
else:
print("签到失败, 状态码:", response.status_code)
return None
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
def qdts(id_token): # 获取用户的宝箱签到天数 信息。
url = "https://app-api.smart.cn/smartapp-me/signs/next-prize"
headers = create_headers(id_token)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_data = response.json()
# print("获取用户的签到天数信息-响应:", response_data)
sign_count_by_current_prize = response_data['data']['signCountByCurrentPrize']
left_count = response_data['data']['leftCount']
print(f"开宝箱30/{sign_count_by_current_prize}天数 还需要签到:{left_count}")
if sign_count_by_current_prize == 0 and left_count == 30:
ljpid(id_token)
return sign_count_by_current_prize, left_count
else:
print(f"获取签到天数失败,状态码: {response.status_code}")
return None, None
except requests.RequestException as e:
print(f"请求异常: {e}")
return None, None
def ljpid(id_token): # 获得30天奖品id
url = "https://app-api.smart.cn/smartapp-me/signs/v2/prize-records"
headers = create_headers(id_token)
request_body = {
"pageSize": 999,
"status": 1
}
try:
response = requests.post(url, headers=headers, json=request_body)
# 处理响应
if response.status_code == 200:
response_data = response.json()
# print("获得30天奖品id-响应:", response_data)
if response_data['code'] == 'success':
# 检查是否有有效的数据
if response_data['data']:
# 处理 data 中的奖品信息
for prize in response_data['data']:
# 根据需要处理每个奖品的信息
print("奖品Code:", prize['prizeCode']) # 正确提取prizeCode
kbx(prize['prizeCode'], id_token) # 传递prize_code和id_token
return response_data['data']
else:
print("没有返回有效的奖品数据。")
return None
else:
print("请求未成功,消息:", response_data['message'])
return None
else:
print(f"获取奖品ID失败状态码: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
def kbx(prize_code, id_token): # 开30天宝箱
url = "https://app-api.smart.cn/smartapp-me/signs/v2/prize-receive"
headers = create_headers(id_token)
request_body = {"prizeCode": prize_code}
try:
response = requests.post(url, headers=headers, json=request_body)
if response.status_code == 200:
response_json = response.json()
print("开30天宝箱:", response_json) # 打印完整的响应JSON
# 检查code是否为success
if response_json.get('code') == 'success':
# 提取并打印prizeContent和prizeType
prize_content = response_json.get('data', {}).get('prizeContent')
prize_type = response_json.get('data', {}).get('prizeType')
print("奖励内容:", prize_content, "奖励类型:", prize_type)
return True, response_json # 返回操作成功的标识和响应数据
else:
print("操作失败code不是success")
return False, None # 返回操作失败的标识和空数据
else:
print(f"请求失败,状态码: {response.status_code}")
return False, None # 返回操作失败的标识和空数据
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return False, None # 返回操作失败的标识和空数据
#本地测试用 sadi#saui#请求体
os.environ['cscs'] = '''
'''
class Tee:
def __init__(self, *files):
self.files = files
def write(self, obj):
for file in self.files:
file.write(obj)
file.flush() # 确保及时输出
def flush(self):
for file in self.files:
file.flush()
# 保存原始stdout
original_stdout = sys.stdout
# 主函数 sadi#saui#请求体
def main():
# 创建一个StringIO对象来捕获输出
string_io = StringIO()
sys.stdout = Tee(sys.stdout, string_io)
var_name = 'smartCK'
tokens = get_env_variable(var_name)
if not tokens:
print(f'环境变量{var_name}未设置,请检查。')
return
try:
# 这里是主要的逻辑代码
for index, token in enumerate(tokens, start=1):
parts = token.split('#')
if len(parts) < 3:
print("令牌格式不正确。跳过处理。")
continue
sadi = parts[0]
saui = parts[1]
json_body = parts[2]
remark = parts[3] if len(parts) > 3 else ""
# 使用hqtoken函数获取userId和id_token
userId, id_token = hqtoken(sadi, user_agent, json_body)#获取userId和id_token
if userId and id_token:
print(f"--------账号{index}/{len(tokens)}--{remark}----")
sign_in(sadi, saui, user_agent, userId, id_token) #签到
qdts(id_token) #获取用户的签到天数信息
#ljpid(id_token) # 获得30天奖品id 测试完要删除
else:
print("未能获取userId和id_token")
# 随机停止一段时间10-30秒
account_sleep_duration = random.randint(3, 6)
#print(f"账号 {index} 休息时间:{account_sleep_duration} 秒")
time.sleep(account_sleep_duration)
finally:
# 恢复原始stdout
sys.stdout = original_stdout
output_content = string_io.getvalue() # 获取捕获的输出内容
if enable_notification:
try:
from notify import send
send("smart汽车通知-jie", output_content) # 发送通知
except ModuleNotFoundError:
print("通知模块未找到。")
if __name__ == "__main__":
main()

10
wbh.py Normal file

File diff suppressed because one or more lines are too long

212
yhsh(1).py Normal file
View File

@@ -0,0 +1,212 @@
#''
#new Env('永辉生活');
#抓任意域名的deviceid和access_token(有bug及时反馈)多帐号换行变量yhsh
#果园任务七点之后才刷新
#2.11 4:35(修复浇水任务完整执行跳过)
#2.14 16:00(增加会员成长值任务和推送)
#'''
import requests
import time
import os
# 控制是否启用变量
enable_notification = 1 # 0不发送通知 1发送通知
# 只有在需要发送通知时才尝试导入notify模块
if enable_notification == 1:
try:
from notify import send
except ModuleNotFoundError:
print("警告未找到notify.py模块。它不是一个依赖项请勿错误安装。程序将退出。")
sys.exit(1)
def send_notification(title, content):
if enable_notification == 1:
send(title, content)
def member(device_id, access_token):
timestamp = str(int(time.time() * 1000))
url = f"https://api.yonghuivip.com/web/coupon/signreward/sign?timestamp={timestamp}&channel=ios&platform=ios&v=10.1.0.6&app_version=10.1.0.6&sellerid=&channelSub=&jysessionid=8eba2fe1-ea26-4a83-98ab-72992f390e44&brand=iPhone&model=iPhone%206s%20(A1633%2FA1688%2FA1691%2FA1700)&os=ios&osVersion=15.7.9&networkType=WIFI&screen=375*667&productLine=YhStore&appType=h5&cityid=11&deviceid={device_id}&shopid=9637&memberid=242976506184457885&access_token={access_token}"
headers = {
"Host": "api.yonghuivip.com",
"Connection": "keep-alive",
"Content-Length": "64",
"X-YH-Biz-Params": "ncjkdy=,'+(&nzggzmdy=(&xdotdy=--&gib=--,0(-$,&gvo=+\$0_+)*,+&vkkdy=yKWHqna(DlqXsuHhk",
"Accept": "application/json",
"X-YH-Context": "origin=h5&morse=1",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_9 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 YhStore/10.1.0(client/phone; iOS 15.7.9; iPhone8,1)",
"Content-Type": "application/json",
"Origin": "https://m.yonghuivip.com",
"X-Requested-With": "cn.yonghui.hyd",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://m.yonghuivip.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
"memberId": "962892903519470906",
"shopId": "9637",
"missionid": 39
}
response = requests.post(url, json=data, headers=headers)
response_data = response.json()
code = response_data["code"]
if code == 0:
credit = response_data["data"]["signrewardvo"]["credit"]
return f"首页签到任务:恭喜获得{credit}积分"
else:
message = response_data["message"]
return f"首页签到任务:{message}"
def membertask(device_id, access_token):
timestamp = str(int(time.time() * 1000))
url = f"https://api.yonghuivip.com/web/member/task/doTask?timestamp={timestamp}&channel=ios&platform=ios&v=10.1.0.6&app_version=10.1.0.6&sellerid=7&channelSub=&jysessionid=8eba2fe1-ea26-4a83-98ab-72992f390e44&brand=iPhone&model=iPhone%206s%20(A1633%2FA1688%2FA1691%2FA1700)&os=ios&osVersion=15.7.9&networkType=WIFI&screen=375*667&productLine=YhStore&appType=h5&cityid=14&deviceid={device_id}&shopid=95DN&memberid=242976506184457885&access_token={access_token}"
headers = {
"Host": "api.yonghuivip.com",
"Connection": "keep-alive",
"Content-Length": "53",
"X-YH-Biz-Params": "ncjkdy=,'+(&nzggzmdy=(&xdotdy=--&gib=--,0(-$,&gvo=+\$0_+)*,+&vkkdy=yKWHqna(DlqXsuHhk",
"Accept": "application/json",
"X-YH-Context": "origin=h5&morse=1",
"X-YH-Biz-Params": "ncjkdy=,*HR&nzggzmdy=(&xdotdy=-!&gib=--)0-*$_'-+!)+*$-!&gvo=_!0!)$(!*($_+*$++",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_9 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 YhStore/10.1.0(client/phone; iOS 15.7.9; iPhone8,1)",
"Content-Type": "application/json",
"Origin": "yhwebcachehttps://m.yonghuivip.com",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
"taskId": "813",
"shopId": "95DN",
"taskCode": "2yue-HYRW"
}
response = requests.post(url, json=data, headers=headers)
response_data = response.json()
code = response_data["code"]
if code == 0:
data = response_data["data"]
if isinstance(data, int):
credit = data
else:
credit = data["data"]
return f"成长值任务:恭喜获得{credit}成长值"
else:
message = response_data["message"]
return f"成长值任务:{message}"
def flow(device_id, access_token):
timestamp = str(int(time.time() * 1000))
url = f"https://activity.yonghuivip.com/api/web/flow/farm/doTask?timestamp={timestamp}&channel=ios&platform=ios&v=10.1.0.6&sellerid=&deviceid={device_id}&shopid=9637&memberid=242976506184457885&app_version=10.1.0.6&channelSub=&brand=iPhone&model=iPhone%206s%20(A1633%2FA1688%2FA1691%2FA1700)&os=ios&osVersion=15.7.9&networkType=WIFI&screen=375*667&productLine=YhStore&appType=h5&access_token={access_token}"
headers = {
"X-YH-Biz-Params": "xdotdy=--&gib=--,0(-$,&gvo=+\$0_+)*,+",
"X-YH-Context": "origin=h5&morse=1",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_9 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 YhStore/10.1.0(client/phone; iOS 15.7.9; iPhone8,1)",
"Content-Type": "application/json",
"Origin": "https://m.yonghuivip.com",
"X-Requested-With": "cn.yonghui.hyd",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://m.yonghuivip.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
payload = {
"taskType": "sign",
"activityCode": "HXNC-QG",
"shopId": "",
"channel": ""
}
response = requests.post(url, json=payload, headers=headers)
response_data = response.json()
sign = response_data["data"]["signText"]
return f"果园签到结果:{sign}"
def watering(device_id, access_token):
results = [] # 存储执行结果的列表
while True:
timestamp = str(int(time.time() * 1000))
url = f"https://activity.yonghuivip.com/api/web/flow/farm/watering?timestamp={timestamp}&channel=ios&platform=ios&v=10.1.0.6&sellerid=&deviceid={device_id}&shopid=9637&memberid=242976506184457885&app_version=10.1.0.6&channelSub=&brand=iPhone&model=iPhone%206s%20(A1633%2FA1688%2FA1691%2FA1700)&os=ios&osVersion=15.7.9&networkType=WIFI&screen=375*667&productLine=YhStore&appType=h5&access_token={access_token}"
headers = {
"Host": "activity.yonghuivip.com",
"Connection": "keep-alive",
"Content-Length": "87",
"X-YH-Biz-Params": "xdotdy=--&gib=--,0(-$,&gvo=+\\$0_+)*,+",
"Accept": "application/json",
"X-YH-Context": "origin=h5&morse=1",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_9 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 YhStore/10.1.0(client/phone; iOS 15.7.9; iPhone8,1)",
"Content-Type": "application/json",
"Origin": "https://m.yonghuivip.com",
"X-Requested-With": "cn.yonghui.hyd",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://m.yonghuivip.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
"activityCode": "HXNC-QG",
"shopId": "",
"channel": "",
"inviteTicket": "",
"inviteShopId": ""
}
response = requests.post(url, headers=headers, json=data)
response_data = response.json()
if response_data and "code" in response_data:
code = response_data["code"]
message = response_data.get("message", "")
if code == 0:
ladder_text = response_data["data"].get("ladderText", "")
result = f"果园浇水结果: {ladder_text}"
results.append(result) # 将结果添加到列表中
print(result)
else:
result = f"果园浇水失败: {message}"
results.append(result) # 将结果添加到列表中
print(result)
break
else:
result = "果园浇水失败: 无法解析响应数据"
results.append(result) # 将结果添加到列表中
print(result)
break
# 返回结果列表
return results
def main():
tokens_str = os.environ.get('yhsh')
if not tokens_str:
print("请设置环境变量yhsh")
return
notifications = [] # 存储每个设备的执行结果
token_pairs = tokens_str.split('\n')
for idx, pair in enumerate(token_pairs, start=1):
device_id, access_token = pair.split('&')
member_result = member(device_id, access_token)
membertask_result = membertask(device_id, access_token)
flow_result = flow(device_id, access_token)
#wateringtask_result=wateringtask(device_id, access_token)
watering_results = watering(device_id, access_token)
# 整合每个设备的执行结果
device_notification = f"帐号{idx}\n{member_result}\n{membertask_result}\n{flow_result}\n"
device_notification += "\n".join(watering_results)
notifications.append(device_notification)
# 将所有设备的执行结果发送通知
summary_notification = "\n\n".join(notifications)
# 去掉括号并每次循环执行结果换行
summary_notification = summary_notification.replace("[", "").replace("]", "").replace(", ", "\n")
# 保留单引号
summary_notification = summary_notification.replace("'", "\\'")
summary_notification += "\n🎉🎉🎉🎉🎉🎉🎉\n\n" # 添加一个换行符
send("永辉生活任务执行汇总", summary_notification) # 调用notify.py中的send函数发送通知
if __name__ == "__main__":
print(">>>>>开始执行所有任务<<<<<")
main()
print(">>>>>所有任务执行结束<<<<<")

180
yhsh.py Normal file
View File

@@ -0,0 +1,180 @@
''
new Env('永辉生活');
抓任意域名的deviceid和access_token(有bug及时反馈)
果园任务七点之后才刷新
2.11 4:35(修复浇水任务完整执行跳过)
2.14 16:00(增加会员成长值任务和推送)
'''
import requests
import time
import os
# 控制是否启用变量
enable_notification = 1 #0不发送通知 1发送通知
# 只有在需要发送通知时才尝试导入notify模块
if enable_notification == 1:
try:
from notify import send
except ModuleNotFoundError:
print("警告未找到notify.py模块。它不是一个依赖项请勿错误安装。程序将退出。")
sys.exit(1)
def member(device_id, access_token):
timestamp = str(int(time.time() * 1000))
url = f"https://api.yonghuivip.com/web/coupon/signreward/sign?timestamp={timestamp}&channel=ios&platform=ios&v=10.1.0.6&app_version=10.1.0.6&sellerid=&channelSub=&jysessionid=8eba2fe1-ea26-4a83-98ab-72992f390e44&brand=realme&model=RMX3562&os=android&osVersion=android31&networkType=5G&screen=2248*1080&productLine=YhStore&appType=h5&cityid=11&deviceid={device_id}&shopid=9637&memberid=242976506184457885&access_token={access_token}"
headers = {
"Host": "api.yonghuivip.com",
"Connection": "keep-alive",
"Content-Length": "64",
"X-YH-Biz-Params": "ncjkdy=,'+(&nzggzmdy=(&xdotdy=--&gib=--,0(-$,&gvo=+$0_+)*,+&vkkdy=yKWHqna(DlqXsuHhk",
"Accept": "application/json",
"X-YH-Context": "origin=h5&morse=1",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_9 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 YhStore/10.1.0(client/phone; iOS 15.7.9; iPhone8,1)",
"Content-Type": "application/json",
"Origin": "https://m.yonghuivip.com",
"X-Requested-With": "cn.yonghui.hyd",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://m.yonghuivip.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
"memberId": "962892903519470906",
"shopId": "9637",
"missionid": 39
}
response = requests.post(url, json=data, headers=headers)
response_data = response.json()
code = response_data["code"]
if code == 0:
credit = response_data["data"]["signrewardvo"]["credit"]
return f"首页签到任务:恭喜获得{credit}积分"
else:
message = response_data["message"]
return f"首页签到任务:签到失败原因:{message}"
def membertask(device_id, access_token):
timestamp = str(int(time.time() * 1000))
url = f"https://api.yonghuivip.com/web/member/task/doTask?timestamp={timestamp}&channel=ios&platform=ios&v=10.1.0.6&app_version=10.1.0.6&sellerid=7&channelSub=&jysessionid=8eba2fe1-ea26-4a83-98ab-72992f390e44&brand=iPhone&model=iPhone%206s%20(A1633%2FA1688%2FA1691%2FA1700)&os=ios&osVersion=15.7.9&networkType=WIFI&screen=375*667&productLine=YhStore&appType=h5&cityid=14&deviceid={device_id}&shopid=95DN&memberid=242976506184457885&access_token={access_token}"
headers = {
"Host": "api.yonghuivip.com",
"Connection": "keep-alive",
"Content-Length": "53",
"X-YH-Biz-Params": "ncjkdy=,'+(&nzggzmdy=(&xdotdy=--&gib=--,0(-$,&gvo=+$0_+)*,+&vkkdy=yKWHqna(DlqXsuHhk",
"Accept": "application/json",
"X-YH-Context": "origin=h5&morse=1",
"X-YH-Biz-Params": "ncjkdy=,*HR&nzggzmdy=(&xdotdy=-!&gib=--)0-*$_'-+!)+*$-!&gvo=_!0!)$$(!*($_+*$++",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_9 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 YhStore/10.1.0(client/phone; iOS 15.7.9; iPhone8,1)",
"Content-Type": "application/json",
"Origin": "yhwebcachehttps://m.yonghuivip.com",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
"taskId": "813",
"shopId": "95DN",
"taskCode": "2yue-HYRW"
}
response = requests.post(url, json=data, headers=headers)
response_data = response.json()
code = response_data["code"]
if code == 0:
credit = response_data["data"]["data"]
return f"成长值任务:恭喜获得{credit}成长值"
else:
message = response_data["message"]
return f"成长值任务:签到失败原因:{message}"
def flow(device_id, access_token):
timestamp = str(int(time.time() * 1000))
url = f"https://activity.yonghuivip.com/api/web/flow/farm/doTask?timestamp={timestamp}&channel=ios&platform=ios&v=10.1.0.6&sellerid=&deviceid={device_id}&shopid=9637&memberid=242976506184457885&app_version=10.1.0.6&channelSub=&brand=realme&model=RMX3562&os=android&osVersion=android31&networkType=5G&screen=2248*1080&productLine=YhStore&appType=h5&access_token={access_token}"
headers = {
"X-YH-Biz-Params": "xdotdy=--&gib=--,0(-$,&gvo=+$0_+)*,+",
"X-YH-Context": "origin=h5&morse=1",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_9 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 YhStore/10.1.0(client/phone; iOS 15.7.9; iPhone8,1)",
"Content-Type": "application/json",
"Origin": "https://m.yonghuivip.com",
"X-Requested-With": "cn.yonghui.hyd",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://m.yonghuivip.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
payload = {
"taskType": "sign",
"activityCode": "HXNC-QG",
"shopId": "",
"channel": ""
}
response = requests.post(url, json=payload, headers=headers)
data = response.json()
sign = data["data"]["signText"]
return f"果园签到结果:{sign}"
def watering(device_id, access_token, code=0):
timestamp = str(int(time.time() * 1000))
url = f"https://activity.yonghuivip.com/api/web/flow/farm/watering?timestamp={timestamp}&channel=ios&platform=ios&v=10.1.0.6&sellerid=&deviceid={device_id}&shopid=9637&memberid=242976506184457885&app_version=10.1.0.6&channelSub=&brand=realme&model=RMX3562&os=android&osVersion=android31&networkType=5G&screen=2248*1080&productLine=YhStore&appType=h5&access_token={access_token}"
headers = {
"Host": "activity.yonghuivip.com",
"Connection": "keep-alive",
"Content-Length": "87",
"X-YH-Biz-Params": "xdotdy=--&gib=--,0(-$,&gvo=+$0_+)*,+",
"Accept": "application/json",
"X-YH-Context": "origin=h5&morse=1",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_9 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 YhStore/10.1.0(client/phone; iOS 15.7.9; iPhone8,1)",
"Content-Type": "application/json",
"Origin": "https://m.yonghuivip.com",
"X-Requested-With": "cn.yonghui.hyd",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://m.yonghuivip.com/",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = {
"activityCode": "HXNC-QG",
"shopId": "",
"channel": "",
"inviteTicket": "",
"inviteShopId": ""
}
response = requests.post(url, headers=headers, json=data)
response_data = response.json()
code = response_data["code"]
message = response_data["message"]
if code == 0:
ladder_text = response_data["data"]["ladderText"]
return f"果园浇水结果: {ladder_text}"
else:
return f"果园浇水失败原因: {message}"
def main():
tokens_str = os.environ.get('yhsh')
if not tokens_str:
print("请设置环境变量yhsh")
return
notifications = [] # 存储每个设备的执行结果
token_pairs = tokens_str.split('\n')
for idx, pair in enumerate(token_pairs, start=1):
device_id, access_token = pair.split('&')
member_result = member(device_id, access_token)
flow_result = flow(device_id, access_token)
membertask_result = membertask(device_id, access_token)
watering_result = watering(device_id, access_token)
# 整合每个设备的执行结果
device_notification = f"帐号{idx}\n{member_result}\n{flow_result}\n{membertask_result}\n{watering_result}\n"
notifications.append(device_notification)
# 将所有设备的执行结果发送通知
summary_notification = "\n\n".join(notifications)
send("永辉生活任务执行汇总", summary_notification) # 调用notify.py中的send函数发送通知
if __name__ == "__main__":
print(">>>>>开始执行所有任务<<<<<")
main()
print(">>>>>所有任务执行结束<<<<<")

59
yljf.py Normal file
View File

@@ -0,0 +1,59 @@
"""
cron: 0 7 * * * yljf.py
new Env("微信小程序-伊利积分")
env add yljf_token
"""
# !/usr/bin/env python3
# coding: utf-8
import os
import requests
import urllib3
import ApiRequest
import mytool
from notify import send
import json
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
title = '微信小程序-伊利积分'
tokenName = 'yljf_token'
msg = ''
class yljf(ApiRequest.ApiRequest):
def __init__(self, data):
super().__init__()
self.sec.headers = {
'Host': 'msmarket.msx.digitalyili.com',
'Connection': 'keep-alive',
'access-token': data,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/8447',
'Referer': 'https://servicewechat.com/wx1fb666a33da7ac88/13/page-frame.html',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
def login(self):
global msg
rj = self.sec.post('https://msmarket.msx.digitalyili.com/gateway/api/member/daily/sign', json={}).json()
if rj['status']:
msg = f"登录成功\n获得积分:{rj['data']['dailySign']['bonusPoint']}"
else:
msg = f"登录失败\n" + json.dumps(rj, ensure_ascii=False)
print(msg)
send(title, msg)
if __name__ == '__main__':
# DEBUG
# if os.path.exists('debug.py'):
# import debug
#
# debug.setDebugEnv()
#
# if mytool.getlistCk(f'{tokenName}') is None:
# print(f'请检查你的变量名称 {tokenName} 是否填写正确')
# exit(0)
# else:
# for i in mytool.getlistCk(f'{tokenName}'):
# yljf(i).login()
ApiRequest.ApiMain(['login']).run(tokenName, yljf)

491
屈臣氏1.1.py Normal file
View File

@@ -0,0 +1,491 @@
"""
项目名称 屈臣氏回馈金签到
入口:#小程序://屈臣氏/p4PXyRIEkAJccuw
要去掉authorization的Bearer 例如 Authorization: Bearer abcd。。13。B 只需要abcd。。13。B
变量authorization#unionid#openid#备注(没有备注也可以)
变量名: qsccs
例如: export qsccs="abcd。。13。B#unionid#openid"
多账号 换行/回车
脚本作者: QGh3amllamll
------更新记录----
1.1 测试版 连签七天没有先判断 下次更新判断签到7天
"""
import os
import requests
from datetime import datetime, timezone, timedelta
import json
import time
import io
import sys
import requests
import base64
import random
enable_notification = 1 #0不发送通知 1发送通知
# 只有在需要发送通知时才尝试导入notify模块
if enable_notification == 1:
try:
from notify import send
except ModuleNotFoundError:
print("警告未找到notify.py模块。它不是一个依赖项请勿错误安装。程序将退出。")
sys.exit(1)
jbxmmz = "屈臣氏回馈金"
jbxmbb = "1.1"
#---------简化的框架 0.5 带通知--------
# 获取北京日期的函数
def get_beijing_date():
beijing_time = datetime.now(timezone(timedelta(hours=8)))
return beijing_time.date()
def dq_time():
# 获取当前时间戳
dqsj = int(time.time())
# 将时间戳转换为可读的时间格式
dysj = datetime.fromtimestamp(dqsj).strftime('%Y-%m-%d %H:%M:%S')
#print("当前时间戳:", dqsj)
#print("转换后的时间:", dysj)
return dqsj, dysj
def log(message):
print(message)
def print_disclaimer():
log("📢 请认真阅读以下声明")
log(" 【免责声明】 ")
log("✨ 脚本及其中涉及的任何解密分析程序,仅用于测试和学习研究")
log("✨ 禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断")
log("✨ 禁止任何公众号、自媒体进行任何形式的转载、发布")
log("✨ 本人对任何脚本问题概不负责,包括但不限于由任何脚本错误导致的任何损失或损害")
log("✨ 脚本文件请在下载试用后24小时内自行删除")
log("✨ 脚本文件如有不慎被破解或修改由破解或修改者承担")
log("✨ 如不接受此条款请立即删除脚本文件")
log("" * 10)
log("如果喜欢请打赏支持维护和开发 更要钱动力 来 更新/维护脚本")
log("" * 10)
log(f'这个是怎么东西???')
log(f'U2FsdGVkX1/F371b27nTzUeMknDFjABXyQBHINWvVPRkUVoUe6ZdZ508DVGF7dMc')
log("" * 10)
log("" * 10)
log(f'-----------{jbxmmz} {jbxmbb}-----------')
# 获取环境变量
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
print(f'环境变量{var_name}未设置,请检查。')
return None
accounts = value.strip().split('\n')
num_accounts = len(accounts)
print(f'-----------本次账号运行数量:{num_accounts}-----------')
print_disclaimer()
return accounts
#-------------------------------封装请求-------------
base_url = "https://mystore-01api.watsonsvip.com.cn"
def create_headers(an, op_id, un_id):
headers = {
"Authorization": f"Bearer {an}",
"Content-Type": "application/json",
"authorizer-appid": "wx1ffbd6927043dff7",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309092b) XWEB/8555",
"openId": op_id,
"unionId": un_id,
}
return headers
#-------------------------------封装请求---完成----------
def qcsqd(an, op_id, un_id): # 签到
urlqd = f"{base_url}/wx/signIn/iter/sign"
headers = create_headers(an, op_id, un_id)
body = json.dumps({"unionId": op_id, "isSorttion": False})
try:
response = requests.post(urlqd, headers=headers, data=body)
response.raise_for_status() # 确保请求成功
response_data = response.json()
if response_data.get("code") == 11000: # 已经签到,也算签到成功
print(f"签到提示:{response_data.get('errorMsg')}")
return True # 签到成功
elif response_data.get("code") == 0:
# 将奖励金额除以100来转换为元
reward_amount = response_data['result'].get('rewardAmount', 0) / 100
beauty_amount = response_data['result'].get('beautyAmount', 0) / 100
print(f"签到成功:连续签到天数: {response_data['result'].get('continueDays')}, "
f"序列号: {response_data['result'].get('dailySignInCouponSerialNum')}, "
f"类型编号: {response_data['result'].get('typeNum')}, "
f"奖励金额: {reward_amount}, " # 使用转换后的金额
f"美容金额: {beauty_amount}") # 使用转换后的金额
return True # 签到成功
elif response_data.get("code") == 1403:
print("权限不足,数据过期/不正常退出账号。")
return False # 签到失败
else:
print("签到响应:", response_data)
return False # 签到失败
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
return False # 签到失败
except Exception as err:
print(f"请求异常:{err}")
return False # 签到失败
# 任务判断 始 -------
def rwmwcid(an, op_id, un_id): # 没有完成任务的ID并返回任务ID列表
url = f"{base_url}/cloudapi/v2/users/bubbles/filterNot/taskType/4"
headers = create_headers(an, op_id, un_id)
task_ids = [] # 初始化任务ID列表
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
response_data = response.json()
if response_data.get("code") == 0:
tasks = response_data.get("result", [])
if not tasks:
# print("当前没有可用的任务或奖励。1⃣跑个毛线脚本")
#pass
print()
else:
for task in tasks:
task_id = task.get("taskId")
task_ids.append(task_id)
else:
print("响应内容:", response_data)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
return task_ids # 返回任务ID列表
def pb_rwid(an, op_id, un_id, task_ids):
url = f"{base_url}/cloudapi/v2/users/tasks"
headers = create_headers(an, op_id, un_id)
print(f"任务ID列表{task_ids}")
task_ids_str = [str(id) for id in task_ids]
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
response_data = response.json()
if response_data.get("code") == 0:
tasks = response_data['result'].get('list', [])
categorized_tasks = {}
for task in tasks:
task_type = task.get('type')
if task_type not in categorized_tasks:
categorized_tasks[task_type] = []
categorized_tasks[task_type].append(task)
for task_type, tasks_in_category in categorized_tasks.items():
for task in tasks_in_category:
if str(task['id']) in task_ids_str:
print(f"准备执行任务:{task['name']} (ID: {task['id']})")
if task_type == 'Browse':
browse(an, op_id, un_id, task['id'])
elif task_type in ['Jump', 'Subscribe']:
jumprw(an, op_id, un_id, task['id'])
#print(f"任务完成:{task['name']} (ID: {task['id']})")
else:
print("响应内容:", response_data)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
# 任务判断 完 -------
# 提交任务 始 -------
def jumprw(an, op_id, un_id, task_id): # 跳转/订阅类任务
url = f"{base_url}/cloudapi/v2/users/tasks/complete"
headers = create_headers(an, op_id, un_id)
data = {"taskId": task_id}
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
response_data = response.json()
if response_data.get("code") == 0:
print(f"任务 {task_id} 完成成功。")
elif response_data.get("code") == 11000:
print(f"任务 {task_id} 完成失败,原因:{response_data.get('errorMsg')}")
else:
print("响应内容:", response_data)
sleep_time = random.randint(1, 3)
#print(f"暂停 {sleep_time} 秒...")
time.sleep(sleep_time)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
def browse(an, op_id, un_id, task_id): # 执行浏览任务并提交数据
browse_url = f"{base_url}/cloudapi/v2/users/tasks/browserTask/token/{task_id}"
complete_url = f"{base_url}/cloudapi/v2/users/tasks/complete"
headers = create_headers(an, op_id, un_id)
#print(browse_url)
try:
# 获取浏览任务的令牌
browse_response = requests.get(browse_url, headers=headers)
browse_response.raise_for_status()
browse_result = browse_response.json()
token = browse_result.get('result', {}).get('token')
if not token:
print("未获取到有效的token")
return None
# 暂停10-13秒模拟浏览
time.sleep(random.randint(11, 13))
# 提交浏览任务数据
payload = json.dumps({
"taskId": str(task_id),
"completeBrowserTaskToken": token
})
complete_response = requests.post(complete_url, headers=headers, data=payload)
complete_response.raise_for_status()
# 解析响应数据
complete_data = complete_response.json()
if complete_data.get('code') == 0:
# 处理成功的请求
amount = complete_data['result'][0]['amount']
print(f"任务完成,奖励金额为:{amount}")
elif complete_data.get('code') == 11000:
# 处理重复的请求
print(f"错误信息:{complete_data['errorMsg']}")
else:
# 处理其他错误
print(f"未知错误,错误代码:{complete_data.get('code')}")
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
# 提交任务 完 -------
# 奖励 始 -------
def hq_jlid(an, op_id, un_id): # 获取没有领取奖励ID
url = f"{base_url}/cloudapi/v2/users/bubbles/filterNot/taskType/4"
headers = create_headers(an, op_id, un_id)
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
response_data = response.json()
if response_data.get("code") == 0:
tasks = response_data.get("result", [])
if not tasks:
print("当前没有可用的任务或奖励。1⃣跑个毛线脚本")
else:
for task in tasks:
task_id = task.get("taskId")
task_name = task.get("taskName")
prize = task.get("prize")
prize_id = task.get("prizeId")
#print(f"任务ID: {task_id}, 任务名称: {task_name}, 奖励: {prize}")
print(f"任务ID: {task_id}, 奖励: {prize}")
tjjl(an, op_id, un_id, prize_id)
else:
print("响应内容:", response_data)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
def tjjl(an, op_id, un_id, prize_id):
url = f"{base_url}/cloudapi/v2/users/receive"
headers = create_headers(an, op_id, un_id)
data = {"prizeId": prize_id}
try:
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
response_data = response.json()
if response_data.get("code") == 0:
amount = response_data.get("result", {}).get("amount", "0")
prize_biz_param = json.loads(response_data.get("result", {}).get("prizeBizParam", "{}"))
sub_play_name = prize_biz_param.get("subPlayName", "Unknown")
amount_in_yuan = float(amount) / 100
print(f"奖励领取成功,金额: {amount_in_yuan}")
sleep_time = random.randint(10, 13)
print(f"暂停 {sleep_time} 秒...")
time.sleep(sleep_time)
else:
print("响应内容:", response_data)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
# 奖励 完 -------
def xc_ye(an, op_id, un_id):#查询余额
url = f"{base_url}/wx/signIn/index?unionId={un_id}"
headers = create_headers(an, op_id, un_id)
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
response_data = response.json()
if response_data.get("code") == 0:
amount = response_data['result']['amount'] / 100
print(f"回馈金余额:{amount}")
for welfare in response_data['result'].get('oldUserSignInWelfareList', []):
if 'couponName' in welfare:
coupon_name = welfare['couponName']
print(f"优惠券:{coupon_name}")
else:
print("请求未成功,完整响应内容:", response_data)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
def sign_in(an, op_id, un_id):
url = f"{base_url}/signIn/turntable/lotteryDraw"
headers = create_headers(an, op_id, un_id)
try:
response = requests.get(url, headers=headers)
response.raise_for_status() # 确保请求成功
response_data = response.json()
if response_data.get("code") == 11000: # 特定条件
print(f"提示信息:{response_data.get('errorMsg')}")
else:
print("连签七天响应:", response_data)
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误{http_err}")
except Exception as err:
print(f"请求异常:{err}")
#本地测试用
os.environ['cscs'] = '''
'''
class Tee:
def __init__(self, *files):
self.files = files
def write(self, obj):
for file in self.files:
file.write(obj)
file.flush()
def flush(self):
for file in self.files:
file.flush()
def main():
var_name = 'qsccs'
tokens = get_env_variable(var_name)
if not tokens:
print(f'环境变量{var_name}未设置,请检查。')
return
captured_output = io.StringIO()
original_stdout = sys.stdout
sys.stdout = Tee(sys.stdout, captured_output)
total_accounts = len(tokens)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 3:
print("令牌格式不正确。跳过处理。")
continue
an = parts[0]
un_id = parts[1]
op_id = parts[2]
account_no = parts[3] if len(parts) > 3 else "" # 备注信息
print(f'------账号 {i+1}/{total_accounts} {account_no} -------')
sign_in_success = qcsqd(an, op_id, un_id) # 签到并获取签到是否成功的状态
if not sign_in_success:
print("由于签到失败,跳过此账号的后续操作。")
continue # 跳过当前循环的剩余部分,直接处理下一个账号
#qcsqd(an, op_id, un_id)#签到
sign_in(an, op_id, un_id)#连签七天
task_ids = rwmwcid(an, op_id, un_id) # 没有完成 任务ID列表
pb_rwid(an, op_id, un_id, task_ids) # 匹配 任务ID 执行任务
hq_jlid(an, op_id, un_id)#领取奖励/判断任务是不是完成
xc_ye(an, op_id, un_id)#查询余额
sys.stdout = original_stdout
output_content = captured_output.getvalue()
captured_output.close()
if enable_notification == 1:
try:
send("通知", output_content) # 尝试发送通知
print("通知已发送。输出内容为:")
#print(output_content)
except NameError:
print("通知发送失败send函数未定义。")
if __name__ == "__main__":
main()

12
看余杭抽奖.js Normal file

File diff suppressed because one or more lines are too long

970
辛喜.py Normal file
View File

@@ -0,0 +1,970 @@
"""
项目名称 心喜 小程序
变量: sso#备注
变量名: XSSONF
多账号 换行/回车
脚本作者: QGh3amllamll
版本 2.01
------更新记录----
1.22版本 更新自动获取抽奖 id 2023年10月31日13点33分
1.23版本 更新抽奖问题
2.00版本 修复任务 2024年1月28日02:34:25
2.01版本 修复获取 data 字段值是 None
"""
import os
import requests
import random
from datetime import datetime, timezone, timedelta
import time
import sys
import io
# 控制是否启用通知的变量
enable_notification = 1
# 只有在需要发送通知时才尝试导入notify模块
if enable_notification == 1:
try:
from notify import send
except ModuleNotFoundError:
print("警告未找到notify.py模块。它不是一个依赖项请勿错误安装。程序将退出。")
sys.exit(1)
#---------简化的框架--------
# 配置参数
BASE_URL = "https://api.xinc818.com/mini/"
# 获取北京日期的函数
def get_beijing_date():
beijing_time = datetime.now(timezone(timedelta(hours=8)))
return beijing_time.date()
def dq_time():
# 获取当前时间戳
dqsj = int(time.time())
# 将时间戳转换为可读的时间格式
dysj = datetime.fromtimestamp(dqsj).strftime('%Y-%m-%d %H:%M:%S')
return dqsj, dysj
# 获取环境变量
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
print(f'环境变量{var_name}未设置,请检查。')
return None
accounts = value.strip().split('\n')
num_accounts = len(accounts)
print(f'-----------本次账号运行数量:{num_accounts}-----------')
print(f'----------项目:心喜小程序-----------')
return accounts
# 封装请求头
def create_headers(sso):
headers = {
'Host': 'api.xinc818.com',
'Connection': 'keep-alive',
'sso': sso,
'xweb_xhr': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090819) XWEB/8531',
'Content-Type': 'application/json',
'Accept': '*/*',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
return headers
def rwlb(sso): # 任务列表
urlrw = BASE_URL + 'dailyTask/daily' # 确保字符串连接正确
headers = create_headers(sso)
try:
response = requests.get(urlrw, headers=headers)
response_data = response.json()
if response_data["code"] != 0 or response_data["data"] is None:
print("错误响应或数据为空,跳过当前账号")
return None
for task in response_data["data"]:
task_id = task.get("id")
task_name = task.get("name")
task_status = task.get("status")
# 跳过不需要处理的任务
if task_name in ["完善个人资料", "购买商城商品", "参加活动", "申请试用", "提交反馈建议"]:
print(f"🚫🚫'{task_name}' ⛔⛔⛔跳过❌❌。")
continue
if task_status: # 如果任务状态为 True (已完成)
print(f"任务 '{task_name}' 已完成。")
#elif task_name == "参与讨论": # 测试bug
# selected_post = fetch_posts_data(sso)
# if selected_post:
# post_id = selected_post[0] # 假设列表的第一个元素是 post_id
# cytl(sso, post_id)
continue
print(f"任务 '{task_name}' 尚未完成。进行处理...")
if task_name == "分享心喜":
fx_xx(sso)
elif task_name == "签到打卡":
sign_dk(sso, task_id)
elif task_name == "想要商品":
xysp(sso)
elif task_name == "大转盘抽奖":
dzp_cj(sso, task_id)
elif task_name == "去商城浏览30秒":
ll_sp(sso, task_id)
elif task_name == "发帖":
hitokoto_content = fetch_hitokoto() # 获取一言内容
if hitokoto_content and not hitokoto_content.startswith("请求一言失败"):
rw_post(sso, hitokoto_content, task_id) # 使用一言内容作为发帖内容
else:
print("未能获取有效的一言内容,无法执行发帖操作。跳过此任务。")
elif task_name == "点赞用户":
selected_post = fetch_posts_data(sso)
if selected_post:
post_id = selected_post[0] # 从选中的帖子中获取 post_id
like_post(sso, post_id) # 执行点赞操作
else:
print("未能获取帖子数据,无法执行点赞操作。")
elif task_name == "关注用户":
selected_post = fetch_posts_data(sso)
if selected_post:
follow_user_id = selected_post[1] # 假设使用帖子的发布者ID作为关注对象
gz_user(sso, follow_user_id)
else:
print("未能获取帖子数据,无法执行关注操作。")
elif task_name == "参与讨论":
selected_post = fetch_posts_data(sso)
if selected_post:
post_id = selected_post[0] # 假设列表的第一个元素是 post_id
cytl(sso, post_id)
else:
print("未能获取帖子数据,无法参与讨论。")
elif task_name == "给主播留言":
selected_anchor = zb_list(sso)
if selected_anchor:
circle_id, related_id = selected_anchor
add_comment(sso, circle_id, related_id, task_id)
else:
print("未能获取主播数据,无法执行留言操作。")
# 在任务之间停止 1 到 3 秒
time.sleep(random.randint(1, 2))
return response_data
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
except ValueError:
print("响应内容不是有效的 JSON 格式")
return None
def fx_xx(sso): # 分享心喜
headers = create_headers(sso)
urlfx = BASE_URL + 'dailyTask/share'
try:
response = requests.get(urlfx, headers=headers)
if response.status_code == 200:
response_data = response.json() # 解析 JSON 响应
#print("分享心喜完整响应内容: ", response_data)
# 检查 data 是否存在
if response_data.get('data'):
task_name = response_data['data'].get('taskName', '未知任务')
single_reward = response_data['data'].get('singleReward', '未知奖励')
print(f"完成🎉 {task_name},🥂奖励: {single_reward}")
print()
else:
#print("分享心喜任务完成,但未获取到详细信息。")
print("🤡🤡分享心喜任务🤡🤡。")
time.sleep(3)
else:
print(f"请求失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
def sign_dk(sso, task_id): # 签到打卡
sign_in_url = BASE_URL + f'sign/in?dailyTaskId={task_id}' # 构造签到 URL
headers = create_headers(sso) # 使用 create_headers 函数创建 headers
try:
response = requests.get(sign_in_url, headers=headers) # 使用 GET 方法签到
if response.status_code == 200:
response_data = response.json()
#print(f"签到成功!响应内容: {response.text}")
# 在访问 taskResult 之前检查它是否存在
if response_data.get('data') and response_data['data'].get('taskResult'):
task_name = response_data['data']['taskResult'].get('taskName', '未知任务')
single_reward = response_data['data']['taskResult'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
print("签到成功,但未获取到任务结果。")
return True # 签到成功
else:
print(f"签到失败,状态码:{response.status_code}, 响应内容: {response.text}")
return False # 签到失败
except requests.exceptions.RequestException as e:
print(f"签到请求失败: {e}")
return False # 请求失败
def xysp(sso): # 想要商品
headers = {
'sso': sso,
'Host': 'cdn-api.xinc818.com' # 注意这里的 Host
}
random_page_num = random.randint(1, 15)
url_desire_goods = f'https://cdn-api.xinc818.com/mini/integralGoods?orderField=sort&orderScheme=DESC&pageSize=10&pageNum={random_page_num}'
try:
response = requests.get(url_desire_goods, headers=headers)
print(f"随机选择的页数为: {random_page_num}")
if response.status_code == 200:
response_json = response.json()
goods_list = response_json.get('data', {}).get('list', [])
if goods_list:
random_id = random.choice(goods_list)['id']
print("随机选取的商品ID:", random_id)
headers['Host'] = 'api.xinc818.com' # 更新headers为 api.xinc818.com
url_specific_good = f'https://api.xinc818.com/mini/integralGoods/{random_id}?type'
response_specific = requests.get(url_specific_good, headers=headers)
if response_specific.status_code == 200:
response_specific_json = response_specific.json()
outer_id = response_specific_json.get('data', {}).get('outerId', '')
print(f"提取的outerId: {outer_id}")
# 新的POST请求
url_submit = 'https://api.xinc818.com/mini/live/likeLiveItem'
data = {
"isLike": True,
"dailyTaskId": 20,
"productId": outer_id
}
print(data)
submit_response = requests.post(url_submit, headers=headers, json=data)
if submit_response.status_code == 200 and submit_response.headers.get('Content-Type') == 'application/json':
response_data = submit_response.json()
#print(submit_response.json())
if response_data is None:
print(f"响应的JSON数据为空原始响应内容: {submit_response.text}")
return # 退出当前任务
if response_data.get("data"):
task_name = response_data["data"].get("taskName", "未知任务")
single_reward = response_data["data"].get("singleReward", 0)
print(f" 🎉: {task_name},奖励: {single_reward}")
else:
print("想要商品 请求成功但未完成任务,响应码或数据内容不正确")
return # 退出当前任务
else:
print(f"POST请求失败状态码{submit_response.status_code}, 响应内容: {submit_response.text}")
return # 退出当前任务
else:
print("商品列表为空,跳过当前账号")
return # 退出当前任务
else:
print(f"获取商品列表失败,状态码:{response.status_code}, 响应内容: {response.text}")
return # 退出当前任务
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return # 退出当前任务
def dzp_cj(sso, task_id):#大转盘抽奖
try:
headers = create_headers(sso)
# 获取抽奖活动列表
activity_list_url = BASE_URL + 'lottery/list'
activity_response = requests.get(activity_list_url, headers=headers)
if activity_response.status_code != 200:
print('获取活动列表失败,状态码:', activity_response.status_code)
return
activity_data = activity_response.json()
# 查找“幸运大转盘抽奖”的id
activity_id = None
for activity in activity_data['data']:
if activity['activityName'] == "幸运大转盘抽奖":
activity_id = activity['id']
break
if activity_id is None:
print("没有找到‘幸运大转盘抽奖’活动")
return
print(f"找到‘幸运大转盘抽奖’活动, ID: {activity_id}")
# 检查抽奖次数
lottery_url = BASE_URL + f'lottery/{activity_id}/freeNum'
lottery_response = requests.get(lottery_url, headers=headers)
if lottery_response.status_code != 200:
print('检查抽奖次数失败,状态码:', lottery_response.status_code)
return
lottery_data = lottery_response.json()
print('抽奖次数:', lottery_data['data'])
if lottery_data['data'] == 0:
print("没有抽奖机会,过程跳过。")
return
# 获取用户id
user_url = BASE_URL + 'user'
user_response = requests.get(user_url, headers=headers)
if user_response.status_code != 200:
print("获取用户ID失败状态码 ", user_response.status_code)
return
user_data = user_response.json()
user_id = user_data['data']['id']
print(f"用户ID获取成功: {user_id}")
# 抽奖
lottery_draw_url = BASE_URL + 'lottery/draw'
payload = {
"activityId": activity_id,
"batch": False,
"isIntegral": False,
"userId": user_id,
"dailyTaskId": task_id # 使用变量task_id作为dailyTaskId
}
#print(payload)
lottery_draw_response = requests.post(lottery_draw_url, headers=headers, json=payload)
if lottery_draw_response.status_code == 200:
#print("抽奖成功!")
lottery_result = lottery_draw_response.json()
lottery_result_list = lottery_result['data']['lotteryResult']
if lottery_result_list:
prize_name = lottery_result_list[0].get('prizeName', '未知奖品')
print("中奖奖品🎉:", prize_name)
else:
print("未获取到抽奖结果")
else:
print("抽奖失败,状态码:", lottery_draw_response.status_code)
time.sleep(3)
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
def ll_sp(sso, task_id): # 浏览商品30秒
browse_url = BASE_URL + f'dailyTask/browseGoods/{task_id}'
headers = create_headers(sso)
print(browse_url)
try:
# 模拟浏览前的准备请求
pre_browse_response = requests.get(browse_url, headers=headers)
if pre_browse_response.status_code != 200:
print(f"浏览前的请求失败,状态码:{pre_browse_response.status_code}")
return False
# 模拟用户浏览30秒
time.sleep(3) # 注意这里应该是30秒但现在是3秒
# 模拟浏览后的完成请求
post_browse_response = requests.get(browse_url, headers=headers)
if post_browse_response.status_code == 200:
print("成功模拟浏览商品30秒")
# 打印响应内容,如果需要
print(post_browse_response.json()) # 更正的行
print(f"浏览后的响应内容: {post_browse_response.text}")
return True
else:
print(f"浏览后的请求失败,状态码:{post_browse_response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return False
def fetch_posts_data(sso):#获取帖子数据并提取 点赞用户id 和 关注用户publisherId然后随机选择一个
url = "https://cdn-api.xinc818.com/mini/posts/sorts?sortType=SPAM&pageNum=1&pageSize=10&groupClassId=0"
headers = {
"Host": "cdn-api.xinc818.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
"sso": sso
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json().get('data', {}).get('list', [])
extracted_data = [(item['id'], item['publisherId']) for item in data]
if extracted_data:
# 随机选择一个帖子
selected_post = random.choice(extracted_data)
return selected_post
else:
print("没有找到帖子数据")
return None
else:
print(f"请求失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None
def like_post(sso, post_id): # 点赞用户
url = BASE_URL + "posts/like"
headers = create_headers(sso)
payload = {
"postsId": str(post_id),
"decision": True
}
try:
response = requests.put(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"💨成功点赞帖子帖子ID: {post_id}")
#print(f"点赞帖子 完整响应内容: {response.text}")
response_json = response.json()
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f"任务名称: {task_name}, 单次奖励: {single_reward}")
print()
else:
#print("点赞成功,但未获取到任务详情。")
#print("👻👻点赞👻👻。")
print()
else:
print(f"点赞失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def gz_user(sso, follow_user_id): # 关注用户
url = BASE_URL + "user/follow"
headers = create_headers(sso)
payload = {
"decision": True,
"followUserId": follow_user_id
}
try:
response = requests.put(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"成功关注💗用户用户ID: {follow_user_id}")
#print(f"完整响应内容: {response.text}") # 打印完整的响应内容
response_json = response.json()
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
#print("关注成功,但未获取到任务详情。")
print()
else:
print(f"关注用户失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def user_info(sso):#"获取用户信息ID 积分
url = BASE_URL + "user"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_json = response.json()
user_data = response_json.get('data', {})
user_id = user_data.get('id', '')
integral = user_data.get('integral', '')
history_integral = user_data.get('historyIntegral', '')
print(f"用户ID: {user_id}, 当前积分: {integral}, 历史积分: {history_integral}")
return user_id # 返回用户ID
else:
print(f"获取用户信息失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None # 如果请求失败或异常返回None
def cytl(sso, post_id): # "参与讨论,对指定帖子发表随机评论
url = BASE_URL + "postsComments"
headers = create_headers(sso)
user_info_result = user_info(sso) # 获取用户信息的结果
# 定义一个评论内容的字典
comments = [
"非常好!",
"我同意!",
"这确实很重要。",
"我从这个评论学到了很多。",
"加油,",
"为你打call",
"今天也要加油哦!",
"很有见地!",
"太精彩了!",
"赞同这个观点。",
"好好",
"真的很棒!",
"这个我喜欢!",
"太赞了!",
"很有帮助!",
"这是我见过的最好的观点!",
"太有创意了!",
"这让我思考良多。",
"绝对同意!",
"讲得太好了!",
"这才是重点!",
"我刚想到这个!",
"太同意了!",
"这解释得太清楚了!",
"这个分析很到位!",
"你抓住了核心!",
"这个角度很独特!",
"没想到这样的观点,太棒了!"
"情感丰富,真实感人!",
"每次看到你的评论都很期待!",
"你的观点总能给人启发!",
"你的评论总是那么独到!",
"期待你更多的分享!",
"你的观点太有深度了!",
"每次看到你的评论都很受益!",
"这分析太透彻了!",
"你的评论总能点亮我的思考!",
"看到你的评论,我的心情都好了!"
"这让我看到了不同的视角!",
"每个人的看法都很有意思!",
"太有创造力了,我喜欢!",
"这个讨论很有价值!",
"你的见解让人耳目一新!",
"这是一个很棒的开始!",
"从你的评论中学到了很多!",
"你的想法很有启发性!",
"这个观点很有趣,赞一个!",
"你的理解深度让我佩服!",
"这确实是个好问题,值得探讨。",
"谢谢分享,我受益匪浅!",
"这种观点很难得,很欣赏!",
"你的分析很到位,赞同!",
"这样的讨论太精彩了,期待更多!"
]
content = random.choice(comments) # 随机选择一个评论内容
if user_info_result:
user_id = user_info_result[0] # 仅提取 user_id
payload = {
"customizeImages": [],
"content": content,
"postsId": post_id,
"publisherId": user_id, # 使用提取的 user_id
"floorId": "",
"voice": ""
}
#print(payload)
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"🙋参与讨论帖子ID: {post_id}, 内容: '{content}'")
# 解析响应内容以获取taskName和singleReward
response_json = response.json()
#print("参与讨论完整响应内容: ", response_json)
task_name = response_json.get('data', {}).get('taskResult', {}).get('taskName', '')
single_reward = response_json.get('data', {}).get('taskResult', {}).get('singleReward', '')
print(f"完成🎉{task_name}, 奖励: {single_reward}🔝🔝🔝")
print()
else:
print(f"参与讨论失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
else:
print("未能获取用户ID无法参与讨论。")
def zb_list(sso): # 主播列表
url = BASE_URL + "groups/defaultGroupList"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
group_list = response.json()
# 提取所有主播的id和associatedAnchorId
anchors = [(group.get('id'), group.get('associatedAnchorId')) for group in group_list.get('data', [])]
# 随机选择一个主播
if anchors:
selected_anchor = random.choice(anchors)
group_id, associated_anchor_id = selected_anchor
#print(f"随机选取的群组ID: {group_id}, 关联主播ID: {associated_anchor_id}")
return group_id, associated_anchor_id # 返回随机选取的群组ID和关联主播ID
else:
print("没有可用的主播列表。")
return None, None
else:
print(f"获取主播列表失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None, None
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None, None
def add_comment(sso, circle_id, related_id, daily_task_id):#给主播留言
"""在指定圈子中对指定主播添加评论"""
url = BASE_URL + "anchorComment/addComment"
headers = create_headers(sso)
# 预定义的评论列表
comments = [
"加油,我们支持你!",
"你是最棒的!",
"永远支持你!",
"我们在这里,永远爱你!",
"你总是能给我们带来欢笑!",
"你的直播太棒了!",
"你总是能够打动我们!",
"你的笑容真的很治愈!",
"我们是你坚强的后盾!",
"为你打call",
"今天也要加油哦!",
"一直在你身后支持你!",
"你是最亮的星!",
"很高兴认识你!",
"你的努力我们都看到了!",
"每一次直播都是一次享受!",
"你的粉丝们都在这里!",
"继续前进,我们会一直在这里!",
"你总是那么有活力!",
"我们看到的不只是努力的你!",
"每次看你的直播都很开心!",
"你的每一次努力我们都看在眼里!",
"你的直播总能给我带来好心情!",
"你的才华无人能及!",
"你的直播是我一天中最期待的时刻!",
"你的魅力真的无法抗拒!",
"在你的直播中总能找到快乐!",
"你的每个瞬间都充满了惊喜!",
"你的努力值得每一份赞赏!",
"你的存在让这个平台更加精彩!",
"期待你的每一次直播!",
"你的每个直播都值得反复观看!",
"你总是能带给我们满满的正能量!",
"你的每一次分享都很有价值!",
"每次看你的直播都能学到很多东西!",
"你的直播里总有无限的乐趣!",
"你的直播总是那么充满活力!",
"你的直播是我们的快乐源泉!",
"感谢你带来这么多美好的直播时光!",
"你的每一场直播都是一场视听盛宴!",
"你的每场直播都是我们的心灵鸡汤!",
"看到你的努力,我们都非常感动!",
"你的笑声太迷人了,每次听都很开心!",
"你的才华横溢,每场直播都令人期待!",
"感谢你总是带给我们这么多正能量!",
"你的每一次直播都给我留下深刻印象!",
"你是我们的超级明星,永远支持你!",
"你在直播中的每一刻都是那么的真实可爱!",
"你的直播是我一天中最放松的时光!",
"每次看到你,都觉得世界变得更美好了!",
"你的直播充满了温暖和力量!",
"你的存在就是我们的幸运!",
"你的直播总是那么富有创造力和想象力!",
"你是那么的不同凡响,总能带来惊喜!",
"你的直播是我的精神食粮!",
"看着你的成长和进步,我们都为你感到骄傲!",
"你总是那么的充满魅力和活力!",
"你的每一次直播都是一次美好的旅行!",
"你的存在让我们的生活充满了乐趣!",
"你是我们心中的英雄,永远支持你!",
"你的直播总是能点亮我们的生活!",
"每次听你说话都特别有感染力!",
"你的直播总是那么有趣,让人忍不住一直看!",
"你的每一次表演都是那么精彩,无法挪开眼!",
"你的直播总是给我带来好心情,谢谢你!",
"每次看你直播都有新的收获,真的很棒!",
"你的直播里总有无尽的正能量,真的很喜欢!",
"你的直播总是那么温馨,感觉像回到家一样!",
"你是我们的快乐小天使,每次看到你都特别开心!",
"你的每一次直播都是我们的期待!",
"你的直播中总有许多惊喜,让人意犹未尽!",
"每次看你的直播都能感受到你的用心!",
"你的直播就像一股清泉,沁人心脾!",
"你的每一次直播都是我们的精神食粮!",
"你的直播总能带给我不一样的感受,太棒了!",
"你的直播充满了智慧和趣味,真是太有才了!",
"你的直播总是能给我们带来欢乐和知识,感谢你!",
"每次看你直播都有种被治愈的感觉!",
"你的直播总是那么充满活力,真是太赞了!",
"你的直播给了我们很多快乐,永远支持你!",
]
# 随机选择一个评论内容
content = random.choice(comments)
payload = {
"content": content,
"circleId": circle_id,
"relatedId": related_id,
"contentType": 0, # contentType固定为0
"dailyTaskId": daily_task_id,
"topCommentId": 0, # topCommentId固定为0
}
#print(payload) # 打印请求内容
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
response_data = response.json()
print(f"💬评论成功圈子ID: {circle_id}, 主播ID: {related_id}, ✍️: '{content}'")
#print(f"评论 完整响应内容: {response_data}") # 打印完整的响应内容
# 检查 response_data['data'] 和 response_data['data']['taskResult'] 是否存在
if response_data.get('data') and response_data['data'].get('taskResult'):
task_name = response_data['data']['taskResult'].get('taskName', '未知任务')
single_reward = response_data['data']['taskResult'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
#print("评论成功,但未获取到任务详情。")
print()
else:
print(f"评论失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def user_info(sso):
url = BASE_URL + "user"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_json = response.json()
user_data = response_json.get('data', {})
if user_data is None: # 检查 user_data 是否为 None
#print(f"未能获取到 {sso} 的用户数据。")
return None
user_id = user_data.get('id', '')
integral = user_data.get('integral', '')
history_integral = user_data.get('historyIntegral', '')
#print(f"用户ID: {user_id}, 当前积分: {integral}, 历史积分: {history_integral}")
return user_id, integral, history_integral # 以元组形式返回
else:
print(f"获取用户信息失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None # 如果请求失败或异常返回None
def fetch_hitokoto(): #一言
url_hitokoto = 'https://v1.hitokoto.cn/'
# 设置请求参数
params = {
'c': 'k', # 类型为哲学
'min_length': 10 # 设置返回句子的最小长度为 10
}
try:
response_hitokoto = requests.get(url_hitokoto, params=params)
if response_hitokoto.status_code == 200:
data = response_hitokoto.json()
return data.get('hitokoto')
else:
# 主要 API 请求失败,尝试备用 API
return fetch_hitokoto_backup()
except requests.exceptions.RequestException:
# 主要 API 请求异常,尝试备用 API
return fetch_hitokoto_backup()
def fetch_hitokoto_backup(): # 备用一言
url_backup = 'https://api.7585.net.cn/yan/api.php?charset=utf-8'
try:
response_backup = requests.get(url_backup)
if response_backup.status_code == 200:
return response_backup.text.strip() # 返回备用 API 的响应内容
else:
return f"备用API请求失败状态码{response_backup.status_code}"
except requests.exceptions.RequestException as e:
return f"备用API请求异常: {e}"
def rw_post(sso, content, task_id): # 发帖
headers = create_headers(sso)
url = BASE_URL + "posts"
sid = int(time.time() * 1000) # 生成时间戳
data = {
"topicNames": [],
"content": content,
"groupId": 0,
"groupClassifyId": 0,
"attachments": [],
"voteType": 0,
"commentType": "0",
"dailyTaskId": task_id,
"platform": "android",
"sid": sid
}
#print(data)
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
response_json = response.json()
print(f"💬发帖成功,✍️{content}, 任务ID: {task_id}, 时间戳: '{sid}'")
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 单次奖励: {single_reward}")
print()
else:
#print("发帖成功,但未获取到任务详情。")
print()
#print(f"发帖完整响应内容: {response.text}")
else:
print(f"发帖请求失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def hqjljl(sso): # 奖励记录 积分
"""获取奖励记录,并返回当天的积分总和"""
records_url = BASE_URL + f'user/integralRecord?pageNum=1&pageSize=20'
headers = create_headers(sso) # 使用 create_headers 函数创建 headers
try:
response = requests.get(records_url, headers=headers)
if response.status_code == 200:
data = response.json()
total_points_today = 0
today = datetime.now(timezone(timedelta(hours=8))).date() # 获取当前日期
for record in data.get('data', {}).get('list', []):
# 提取 remark、changeValue 和 changeTime
remark = record.get('remark')
change_value = record.get('changeValue')
change_time = record.get('changeTime')
# 将时间戳转换为北京时间
beijing_time = datetime.fromtimestamp(change_time / 1000, timezone(timedelta(hours=8)))
formatted_time = beijing_time.strftime('%Y-%m-%d %H:%M:%S')
# 判断记录是否为当天的
if beijing_time.date() == today:
total_points_today += change_value
# 在循环内部打印每条记录的详细信息
#print(f"任务: {remark}, 积分: {change_value}, 时间: {formatted_time}")
print(f"今日积分: {total_points_today}")
return total_points_today # 返回当天的积分总和
else:
print(f"获取奖励记录失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None # 获取失败时返回 None
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None # 请求异常时返回 None
#本地测试用
os.environ['XSSONF1'] = '''
Wmeimob_eyJ0ebGciOiJIUzI1NiJ9.eyJzdWIiOiIx#大号
'''
class Tee:
def __init__(self, *files):
self.files = files
def write(self, obj):
for file in self.files:
file.write(obj)
file.flush() # 确保及时输出
def flush(self):
for file in self.files:
file.flush()
# 主函数
def main():
var_name = 'XSSONF'
tokens = get_env_variable(var_name)
if not tokens:
return
yxsl = len(tokens) # 账号总数
# 首先对每个账号运行 rwlb(sso)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 2:
print("令牌格式不正确。跳过处理。")
continue
sso = parts[0]
account_no = parts[1]
print(f'------账号 {i+1}/{yxsl} {account_no} -------')
rwlb(sso) # 任务列表
# 设置Tee类实例并开始捕获输出
original_stdout = sys.stdout # 保存原始stdout
string_io = io.StringIO() # 创建StringIO对象以捕获输出
sys.stdout = Tee(sys.stdout, string_io) # 将stdout重定向
# 所有账号运行完 rwlb(sso) 后再统一运行 hqjljl(sso) 和 user_info(sso)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 2:
continue # 如果令牌格式不正确,继续下一个
sso = parts[0]
account_no = parts[1]
print(f'---账号{i+1}/{yxsl} {account_no}---')
user_info_result = user_info(sso) # 获取 user_info 函数的返回值
if user_info_result is None:
print(f"由于某些原因/过期/不正确 跳过此账号。")
continue # 跳过此次循环的剩余部分
# 解包 user_info 函数返回的元组
user_id, integral, history_integral = user_info_result
print(f"🎊当前积分: {integral}, 历史积分: {history_integral}") # 打印积分信息
hqjljl(sso) # 处理奖励
# 捕获完成后重置sys.stdout并获取内容
sys.stdout = original_stdout # 重置stdout
output_content = string_io.getvalue() # 获取捕获的输出
# 如果需要发送通知
if enable_notification == 1:
send("心喜-通知", output_content)
if __name__ == "__main__":
main()