This commit is contained in:
hnking
2025-10-16 14:39:58 +08:00
parent 9da1509656
commit a0d8f11e63

View File

@@ -1,250 +1,391 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 变量QHAZ 抓茄皇的AZ数据authorization
import os
import requests import requests
import json
import time import time
from urllib3.exceptions import InsecureRequestWarning import os
import random
from requests.exceptions import RequestException
# 禁用SSL警告 # ==============================================================================
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # 【环境变量QH配置说明无token隐藏wid展示
# 1. 格式:
# - 单账号直接填写wid123456
# - 多账号:支持两种分隔方式,可混合使用
# - &分隔123&456&789
# - 换行分隔每个wid单独占一行Windows/Linux换行符均兼容
# 2. 关键脚本仅内部使用wid登录所有输出不显示widUA由脚本自动随机生成小程序ID固定
'''
不用抓包,直接登录小程序。
个人中心---用户设置---用户编号就是需要的wid信息
'''
class ZhuManitoTask: # ==============================================================================
def __init__(self, authorization, account_name):
self.authorization = authorization
self.account_name = account_name
self.base_headers = {
'host': 'api.zhumanito.cn',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541113) XWEB/16771',
'authorization': authorization,
'accept': '*/*',
'origin': 'https://h5.zhumanito.cn',
'sec-fetch-site': 'same-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': 'https://h5.zhumanito.cn/',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9',
'priority': 'u=1, i'
}
self.tasks = []
def print_separator(self): # -------------------------- 【配置+工具函数隐藏wid输出】--------------------------
print("" * 50) def generate_random_ua():
"""生成随机UA基于iPhone微信小程序模板小程序ID wx532ecb3bdaaf92f9固定不变"""
# iPhone OS版本与对应Mobile版本映射可按需扩展
os_mobile_map = [
("15_8_3", "15E148"),
("16_2_0", "16F203"),
("16_5_1", "16H62"),
("17_0_3", "17A5844a"),
("17_1_1", "17B100"),
("17_2_0", "17C304"),
("17_3_1", "17D50"),
("17_4_1", "17E262")
]
# 随机选择OS与Mobile版本
os_version, mobile_version = random.choice(os_mobile_map)
# 随机生成微信版本8.0.50 ~ 8.0.75范围)
wechat_version = f"8.0.{random.randint(50, 75)}"
# 拼接UA字符串
return (
f"Mozilla/5.0 (iPhone; CPU iPhone OS {os_version} like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
f"Mobile/{mobile_version} MicroMessenger/{wechat_version} "
"NetType/WIFI Language/zh_CN miniProgram/wx532ecb3bdaaf92f9"
)
def print_dashed_line(self):
print("=" * 50)
def make_request(self, method, url, headers, data=None, task_name=""): def parse_qh_env():
"""通用的请求函数""" """解析QH环境变量支持&和换行分隔多账号自动生成UA"""
try: qh_env = os.getenv("QH", "")
if method.upper() == "GET": if not qh_env:
response = requests.get( print("❌ 错误未检测到环境变量QH请按配置说明设置")
url=url, return None
headers=headers,
verify=False,
timeout=30
)
else:
if isinstance(data, dict):
response = requests.post(
url=url,
headers=headers,
json=data,
verify=False,
timeout=30
)
else:
response = requests.post(
url=url,
headers=headers,
data=data,
verify=False,
timeout=30
)
if response.status_code == 200: # 统一分隔符:将所有换行符(\r\n、\n替换为&,再拆分账号
try: unified_env = qh_env.replace("\r\n", "&").replace("\n", "&")
result = response.json() account_str_list = unified_env.split("&")
return True, result
except json.JSONDecodeError:
return True, {"message": "success", "text": response.text}
else:
return False, {"error": f"HTTP {response.status_code}", "message": response.text}
except Exception as e: accounts = []
return False, {"error": str(e)} for idx, account_str in enumerate(account_str_list, 1):
wid = account_str.strip()
# 过滤空内容(避免多余分隔符导致的无效项)
if not wid:
print(f"⚠️ 检测到第{idx}个无效项(空内容),已跳过")
continue
def get_task_list(self): # 为当前账号生成随机UA
"""获取任务列表""" ua = generate_random_ua()
url = "https://api.zhumanito.cn/api/task?" print(f" 账号{idx}自动生成UA前70字符{ua[:70]}...")
headers = self.base_headers.copy()
success, result = self.make_request("GET", url, headers, task_name="获取任务列表") accounts.append({
"index": idx,
"wid": wid,
"token": "",
"ua": ua,
"user_data": {},
"land_data": []
})
if success and result.get("code") == 200: if not accounts:
self.tasks = result.get("data", {}).get("task", []) print("❌ 没有可用账号(所有项格式错误或为空),脚本终止")
return True return None
else: return accounts
print(f"❌ 获取任务列表失败: {result}")
def get_account_headers(account):
"""生成账号请求头使用自动生成的UA无wid相关输出"""
return {
"Authorization": account["token"],
"User-Agent": account["ua"],
"Origin": "https://h5.zhumanito.cn",
"Referer": "https://h5.zhumanito.cn/",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Accept-Encoding": "gzip, deflate, br"
}
def mask_wid(wid):
"""对wid进行脱敏处理显示前4位和后4位中间用****替代"""
if len(wid) <= 8:
return wid # 如果wid长度小于等于8直接返回原wid
return f"{wid[:4]}****{wid[-4:]}"
def login_account(account):
"""账号自动登录隐藏wid显示登录后更新token和用户数据"""
login_url = "https://api.zhumanito.cn/api/login"
headers = get_account_headers(account)
# 补充登录请求的Content-Type
headers["Content-Type"] = "application/json;charset=utf-8"
payload = {"wid": account["wid"]} # 内部使用wid不打印
try:
print(f"🔐 账号{account['index']}发起登录请求wid脱敏{mask_wid(account['wid'])}")
response = requests.post(login_url, headers=headers, json=payload, timeout=20)
response.raise_for_status() # 触发HTTP错误如404、500
res = response.json()
if res.get("code") != 200:
print(f"❌ 账号{account['index']}:登录失败,原因:{res.get('msg', '未知错误')}")
return False return False
def complete_task(self, task_id, task_content, water_num, sun_num): # 更新账号的token、用户数据、土地数据
"""完成任务""" account["token"] = res["data"]["token"]
url = "https://api.zhumanito.cn/api/task/complete" account["user_data"] = res["data"]["user"]
headers = self.base_headers.copy() account["land_data"] = res["data"].get("land", [])
headers.update({
'content-length': '10',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
})
data = f"task_id={task_id}&"
success, result = self.make_request("POST", url, headers, data, f"完成任务{task_id}") # 打印登录成功信息不含wid
print(f"✅ 账号{account['index']}:登录成功!")
print(f" 📌 当前资源:💧{account['user_data']['water_num']},☀️{account['user_data']['sun_num']}")
if account["land_data"]:
print(f" 🌱 土地状态:共{len(account['land_data'])}块,生长阶段{account['land_data'][0]['seed_stage']}")
return True
if success: except RequestException as e:
return True, {"water": water_num, "sunshine": sun_num} print(f"❌ 账号{account['index']}:登录异常,原因:{str(e)}")
else: return False
return False, result
def click_task(self, task_id, task_content, water_num, sun_num):
"""点击任务"""
url = "https://api.zhumanito.cn/api/click"
headers = self.base_headers.copy()
headers.update({
'content-length': '2',
'content-type': 'application/json;charset=UTF-8',
})
data = {}
success, result = self.make_request("POST", url, headers, data, f"点击任务{task_id}")
if success:
return True, {"water": water_num, "sunshine": sun_num}
else:
return False, result
def water_task(self):
"""浇水任务"""
url = "https://api.zhumanito.cn/api/water"
headers = self.base_headers.copy()
headers.update({
'content-length': '0',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
})
data = ""
success, result = self.make_request("POST", url, headers, data, "浇水任务")
if success:
# 这里可以根据实际API返回调整
return True, {
"remaining_water": 5,
"remaining_sunshine": 5,
"land_status": {
"total": 6,
"growth_stage": 1
}
}
else:
return False, result
def process_account(self):
"""处理单个账号的所有任务"""
print(f"\n开始处理{self.account_name}")
self.print_dashed_line()
# 获取任务列表
if not self.get_task_list():
print("❌ 无法获取任务列表,跳过该账号")
return
print(f"{self.account_name}任务状态:")
# 执行任务
completed_tasks = 0
total_tasks = len(self.tasks)
for i, task in enumerate(self.tasks, 1):
task_id = task.get("id")
task_content = task.get("content", "")
water_num = task.get("water_num", 0)
sun_num = task.get("sun_num", 0)
task_status = task.get("status", 0)
task_type = task.get("type", 0)
# 如果任务已完成(status=0),则跳过
if task_status == 0:
status = f"任务{i}:{task_content}|奖励: 水滴={water_num},阳光={sun_num}|✓已完成"
print(status)
completed_tasks += 1
continue
success = False
reward = {}
if task_id == 1:
success, reward = self.complete_task(task_id, task_content, water_num, sun_num)
elif task_id == 2: # 浏览指定页面
success, reward = self.click_task(task_id, task_content, water_num, sun_num)
else:
# 其他任务类型,默认使用完成任务接口
success, reward = self.complete_task(task_id, task_content, water_num, sun_num)
if success:
status = f"任务{i}:{task_content}|奖励: 水滴={reward.get('water', water_num)},阳光={reward.get('sunshine', sun_num)}|✓已完成"
completed_tasks += 1
else:
status = f"任务{i}:{task_content}|奖励: 无|✗失败"
print(status)
time.sleep(1) # 任务间延迟
self.print_dashed_line()
# 如果所有任务都完成了,执行浇水
if completed_tasks >= total_tasks:
print(f"{self.account_name}:所有任务已完成或无法获取任务,直接执行浇水")
self.print_dashed_line()
# 执行浇水任务
print(f"{self.account_name}: 开始浇水...")
time.sleep(1)
success, result = self.water_task()
if success:
print(f"{self.account_name}: 浇水成功!")
print(f"|剩余水滴: {result['remaining_water']},剩余阳光: {result['remaining_sunshine']}")
print(f"|土地状态:共{result['land_status']['total']}块,生长阶段{result['land_status']['growth_stage']}")
else:
print(f"{self.account_name}: 浇水失败!")
else:
print(f"{self.account_name}: 有任务未完成,跳过浇水")
self.print_dashed_line()
print(f"{self.account_name}处理完毕")
self.print_separator()
def main(): def get_user_status(account):
authorizations = os.getenv("QHAZ").split("&") """获取账号当前资源水、阳光无wid输出"""
if not authorizations: if not account.get("user_data"):
print("❌ 未找到环境变量 QHAZ") print(f"⚠️ 账号{account['index']}未获取到用户数据返回默认资源值0")
return 0, 0
water = account["user_data"].get("water_num", 0)
sun = account["user_data"].get("sun_num", 0)
return water, sun
# -------------------------- 【任务+浇水函数无wid输出】--------------------------
def get_unfinished_tasks(headers, account_idx):
"""获取账号未完成任务列表无wid相关输出"""
task_url = "https://api.zhumanito.cn/api/task?"
try:
response = requests.get(task_url, headers=headers, timeout=20)
response.raise_for_status()
task_data = response.json()
if task_data.get("code") != 200:
print(f"❌ 账号{account_idx}:获取任务列表失败,原因:{task_data.get('msg', '未知错误')}")
return []
# 筛选未完成任务status=0
unfinished_tasks = [t for t in task_data["data"]["task"] if t["status"] == 0]
# 打印所有任务状态
print("=" * 40)
print(f"📋 账号{account_idx} - 所有任务状态:")
for task in task_data["data"]["task"]:
status_text = "✅ 已完成" if task["status"] == 1 else "❌ 未完成"
print(
f" 任务{task['id']}{task['content']} | 奖励:💧{task['water_num']} ☀️{task['sun_num']} | {status_text}")
print("=" * 40)
return unfinished_tasks
except RequestException as e:
print(f"❌ 账号{account_idx}:获取任务异常,原因:{str(e)}")
return []
def complete_task_1(headers, account_idx, account):
"""执行【每日签到】任务任务ID=1完成后更新用户资源"""
complete_url = "https://api.zhumanito.cn/api/task/complete"
try:
print(f"\n🔄 账号{account_idx}开始执行【每日签到】任务ID=1")
# 复制请求头并补充Content-Type
task_headers = headers.copy()
task_headers["Content-Type"] = "application/x-www-form-urlencoded;charset=utf-8"
# 提交任务完成请求
response = requests.post(complete_url, headers=task_headers, data="task_id=1&", timeout=20)
response.raise_for_status()
res = response.json()
if res.get("code") != 200:
print(f"❌ 账号{account_idx}:【每日签到】失败,原因:{res.get('msg', '未知错误')}")
return False
# 更新账号的用户数据(资源变化)
if res.get("data", {}).get("user"):
account["user_data"] = res["data"]["user"]
print(f"✅ 账号{account_idx}:【每日签到】任务完成!")
return True
except RequestException as e:
print(f"❌ 账号{account_idx}:【每日签到】异常,原因:{str(e)}")
return False
def complete_task_2(headers, account_idx, account, task):
"""执行【浏览指定页面】任务任务ID=2需先访问链接再提交"""
# 检查任务是否包含目标链接
task_url = task.get("url")
if not task_url:
print(f"\n❌ 账号{account_idx}:【浏览指定页面】任务缺少目标链接,已跳过")
return False
try:
print(f"\n🔄 账号{account_idx}开始执行【浏览指定页面】任务ID=2")
print(f"🌐 正在访问目标链接前60字符{task_url[:60]}...")
# 访问目标页面(模拟浏览)
browse_response = requests.get(task_url, headers=headers, timeout=25)
browse_response.raise_for_status()
time.sleep(3) # 模拟实际浏览延迟
print(f"✅ 账号{account_idx}:目标页面浏览完成")
# 提交任务完成请求
complete_url = "https://api.zhumanito.cn/api/task/complete"
submit_headers = headers.copy()
submit_headers["Content-Type"] = "application/x-www-form-urlencoded;charset=utf-8"
submit_data = f"task_id={task['id']}&"
submit_response = requests.post(complete_url, headers=submit_headers, data=submit_data, timeout=20)
submit_response.raise_for_status()
res = submit_response.json()
if res.get("code") != 200:
print(f"❌ 账号{account_idx}:【浏览指定页面】提交失败,原因:{res.get('msg', '未知错误')}")
return False
# 更新账号用户数据
if res.get("data", {}).get("user"):
account["user_data"] = res["data"]["user"]
print(f"✅ 账号{account_idx}:【浏览指定页面】任务完成!")
return True
except RequestException as e:
print(f"❌ 账号{account_idx}:【浏览指定页面】异常,原因:{str(e)}")
return False
def complete_watering(headers, account_idx, account):
"""执行浇水操作(带重试机制,浇水后更新资源和土地状态)"""
water_url = "https://api.zhumanito.cn/api/water"
# 配置请求重试策略(应对临时网络错误)
retry_strategy = requests.adapters.HTTPAdapter(
max_retries=requests.packages.urllib3.util.retry.Retry(
total=2, # 总重试次数
backoff_factor=3, # 重试间隔1s, 3s, 9s...
allowed_methods=["POST"], # 仅对POST请求重试
status_forcelist=[429, 500, 502, 503, 504] # 需要重试的HTTP状态码
)
)
session = requests.Session()
session.mount("https://", retry_strategy)
try:
print("=" * 40)
print(f"💧 账号{account_idx}:开始执行浇水操作")
# 复制请求头并补充Content-Type
water_headers = headers.copy()
water_headers["Content-Type"] = "application/x-www-form-urlencoded;charset=utf-8"
# 发送浇水请求
response = session.post(water_url, headers=water_headers, data=b"", timeout=(25, 30))
response.raise_for_status()
res = response.json()
# 处理能量不足的特殊情况
if res.get("code") == 10006 and "能量值不足" in res.get("msg", ""):
print(f"⚠️ 账号{account_idx}:浇水失败,原因:{res['msg']}")
return False
if res.get("code") != 200:
print(f"❌ 账号{account_idx}:浇水失败,原因:{res.get('msg', '未知错误')}")
return False
# 更新账号的用户数据和土地数据
account["user_data"] = res["data"]["user"]
account["land_data"] = res["data"].get("land", [])
# 打印浇水结果
current_water, current_sun = get_user_status(account)
land_count = len(account["land_data"])
print(f"✅ 账号{account_idx}:浇水成功!")
print(f"📊 剩余资源:💧{current_water},☀️{current_sun}")
if land_count > 0:
print(f"🌱 土地状态:共{land_count}块,生长阶段{account['land_data'][0]['seed_stage']}")
print("=" * 40)
return True
except RequestException as e:
print(f"❌ 账号{account_idx}:浇水异常,原因:{str(e)}")
return False
# -------------------------- 【主流程无wid输出】--------------------------
def auto_multi_account():
"""多账号自动处理主流程:登录→任务→浇水"""
print("🚀 【茄皇多账号自动化脚本】已启动\n")
print("🚀 欢迎加入qq群https://qm.qq.com/q/RfI2JT5AmO\n")
# 解析环境变量获取账号列表
accounts = parse_qh_env()
if not accounts:
return return
num = 1
for authorization in authorizations:
# 从环境变量获取授权令牌
account2 = ZhuManitoTask(authorization, f"账号{num}") # 逐个处理账号
account2.process_account() for account in accounts:
num = num + 1 account_idx = account["index"]
# 可以继续添加更多账号 total_accounts = len(accounts)
# account3 = ZhuManitoTask(authorization, "账号3") # 打印账号处理分隔符
# account3.process_account() print(f"\n" + "=" * 50)
print(f"📌 正在处理账号 {account_idx}/{total_accounts}")
print("=" * 50)
print("✓所有账号已全部处理完成!") # 1. 账号登录
login_success = login_account(account)
if not login_success:
print(f"❌ 账号{account_idx}:登录失败,跳过后续所有操作")
continue
# 2. 获取请求头登录后token已更新
account_headers = get_account_headers(account)
# 3. 获取并处理未完成任务
unfinished_tasks = get_unfinished_tasks(account_headers, account_idx)
if unfinished_tasks:
for task in unfinished_tasks:
# 匹配任务类型并执行
if task["id"] == 1 and task["content"] == "每日签到":
complete_task_1(account_headers, account_idx, account)
elif task["id"] == 2 and task["content"] == "浏览指定页面":
complete_task_2(account_headers, account_idx, account, task)
else:
print(f"\n⚠️ 账号{account_idx}发现未知任务ID{task['id']},内容:{task['content']}),已跳过")
# 打印任务处理后的资源
task_water, task_sun = get_user_status(account)
print(f"\n🎉 账号{account_idx}:所有可处理任务已完成")
print(f"📊 任务后资源:💧{task_water},☀️{task_sun}")
else:
print(f"\n🎉 账号{account_idx}:无未完成任务或无法获取任务列表")
no_task_water, no_task_sun = get_user_status(account)
print(f"📊 当前资源:💧{no_task_water},☀️{no_task_sun}")
# 4. 循环浇水资源满足水≥20且阳光≥20
print(f"\n🔄 账号{account_idx}进入循环浇水逻辑触发条件💧≥20 且 ☀≥20")
while True:
current_water, current_sun = get_user_status(account)
if current_water >= 20 and current_sun >= 20:
print(f"\n📌 账号{account_idx}:资源满足(💧{current_water},☀️{current_sun}),执行浇水")
water_success = complete_watering(account_headers, account_idx, account)
if not water_success:
print(f"🔚 账号{account_idx}:浇水失败,退出浇水循环")
break
time.sleep(2) # 浇水间隔,避免请求过于频繁
else:
print(f"\n🔚 账号{account_idx}:资源不足(💧{current_water},☀️{current_sun}),停止浇水")
break
# 单个账号处理完成
print(f"\n✅ 账号{account_idx}/{total_accounts}:所有操作处理完毕")
# 非最后一个账号,添加延迟避免并发
if account_idx < total_accounts:
delay_time = 5
print(f"⏳ 账号间延迟{delay_time}秒,准备处理下一个账号...\n")
time.sleep(delay_time)
# 所有账号处理完成
print("\n" + "=" * 50)
print("🎯 所有账号已全部处理完成!脚本执行结束")
print("=" * 50)
# 脚本入口
if __name__ == "__main__": if __name__ == "__main__":
main() auto_multi_account()