Files
3288588344-toulu/金徽酒会员.py
2025-05-02 10:01:41 +08:00

338 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#by:哆啦A梦
#入口:http://api.0vsp.com/h5/wxa/link?sid=25430gykJTW
#抓包ucodeprod-openapi.jinhuijiu.com.cn域名下的Authorization和serialId格式Authorization#serialId
#多账号换行分割
import os
import requests
import json
import random
import time
def get_proclamation():
primary_url = "https://github.com/3288588344/toulu/raw/refs/heads/main/tl.txt"
backup_url = "https://tfapi.cn/TL/tl.json"
try:
response = requests.get(primary_url, timeout=10)
if response.status_code == 200:
print("📢 公告信息")
print("=" * 45)
print(response.text)
print("=" * 45 + "\n")
print("公告获取成功,开始执行任务...\n")
return
except requests.exceptions.RequestException as e:
print(f"获取公告时发生错误: {e}, 尝试备用链接...")
try:
response = requests.get(backup_url, timeout=10)
if response.status_code == 200:
print("\n" + "=" * 50)
print("📢 公告信息")
print("=" * 45)
print(response.text)
print("=" * 45 + "\n")
print("公告获取成功,开始执行任务...\n")
else:
print(f"⚠️ 获取公告失败,状态码: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"⚠️ 获取公告时发生错误: {e}, 可能是网络问题或链接无效。")
def get_desensitized_phone(response_data):
"""
从响应数据中提取并脱敏账号。
"""
phone_number = response_data.get("phoneNumber", "")
if phone_number and len(phone_number) >= 11:
return phone_number[:3] + "****" + phone_number[7:]
return "未知账号"
def fetch_user_metadata():
"""
从环境变量获取多账号信息,发送请求获取用户数据元,并对账号进行脱敏处理。
"""
env_variable = "JHHY"
url = "https://ucodeprod-openapi.jinhuijiu.com.cn/user/metadata"
account_info_list = os.getenv(env_variable, "").splitlines()
if not account_info_list:
print("环境变量JHHY未设置或格式不正确")
print("=" * 45)
return
for account_info in account_info_list:
if not account_info.strip():
continue
try:
token_part, serial_part = account_info.split("#", 1)
token = token_part.strip()
serial_id = serial_part.strip()
except ValueError:
print(f"警告: 账号信息格式错误")
print("=" * 45)
continue
headers = {
'Authorization': f"Bearer {token}",
'serialId': serial_id
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
desensitized_phone = get_desensitized_phone(data)
print(f"账号 {desensitized_phone} 获取账户信息成功")
print("=" * 45)
except requests.exceptions.RequestException as e:
print(f"账号 {account_info} 获取账号信息失败: {e}")
print("=" * 45)
except ValueError as e:
print(f"账号 {account_info} 账号数据解析失败: {e}")
print("=" * 45)
def perform_check_in():
"""
发送签到请求。
"""
env_variable = "JHHY"
checkin_url = "https://ucodeprod-openapi.jinhuijiu.com.cn/lottery/checkIn"
metadata_url = "https://ucodeprod-openapi.jinhuijiu.com.cn/user/metadata"
account_info_list = os.getenv(env_variable, "").splitlines()
if not account_info_list:
print("环境变量JHHY未设置或格式不正确")
print("=" * 45)
return
# 使用字典存储每个账号的脱敏手机号
account_desensitized_info = {}
# 首先获取每个账号的脱敏手机号
for account_info in account_info_list:
if not account_info.strip():
continue
try:
token_part, serial_part = account_info.split("#", 1)
auth_token = f"Bearer {token_part.strip()}"
serial_id = serial_part.strip()
except ValueError:
print(f"警告: 账号信息格式错误")
print("=" * 45)
continue
headers = {
'Authorization': auth_token,
'serialId': serial_id,
}
try:
# 获取账号信息并脱敏
metadata_response = requests.get(metadata_url, headers=headers)
metadata_response.raise_for_status()
metadata_data = metadata_response.json()
account_desensitized_info[account_info] = get_desensitized_phone(metadata_data)
except Exception as e:
# 这里只记录获取脱敏信息的错误,不输出
account_desensitized_info[account_info] = "未知账号"
# 进行签到
for account_info in account_info_list:
if not account_info.strip():
continue
desensitized_phone = account_desensitized_info.get(account_info, "未知账号")
try:
token_part, serial_part = account_info.split("#", 1)
auth_token = f"Bearer {token_part.strip()}"
serial_id = serial_part.strip()
except ValueError:
print(f"警告: 账号信息格式错误")
print("=" * 45)
continue
params = {
'longitude': "119.24095916748047",
'latitude': "34.2840690612793"
}
payload = {
"promotionCode": "signIn",
"promotionId": 1001867,
"longitude": 119.24095916748047,
"latitude": 34.2840690612793
}
headers = {
'Content-Type': "application/json",
'Authorization': auth_token,
'serialId': serial_id,
}
try:
response = requests.post(checkin_url, params=params, data=json.dumps(payload), headers=headers)
response.raise_for_status()
response_data = response.json()
if response_data.get("success", False):
if "今日已签到" in response_data.get("message", ""):
print(f"账号 {desensitized_phone} 今日已签到")
print("=" * 45)
else:
print(f"账号 {desensitized_phone} 签到成功")
print("=" * 45)
else:
error_message = response_data.get('message', '未知错误')
if "今日已签到" in error_message:
print(f"账号 {desensitized_phone} 今日已签到")
print("=" * 45)
else:
print(f"账号 {desensitized_phone} 签到失败: {error_message}")
print("=" * 45)
except requests.exceptions.RequestException as e:
# 检查是否有返回的响应内容
if hasattr(e, 'response') and e.response is not None:
try:
error_response = e.response.json()
error_message = error_response.get('emsg', error_response.get('message', '未知错误'))
# 检查是否是已签到的错误
if "今日已签到" in error_message:
print(f"账号 {desensitized_phone} 今日已签到")
print("=" * 45)
else:
print(f"账号 {desensitized_phone} 签到失败: {error_message}")
print("=" * 45)
except ValueError:
# 如果响应内容不是JSON格式直接输出原始内容
print(f"账号 {desensitized_phone} 签到失败: {e.response.text}")
print("=" * 45)
else:
# 如果没有响应内容,输出通用错误提示
print(f"账号 {desensitized_phone} 签到失败: 网络请求失败,请检查网络连接后重试")
print("=" * 45)
def complete_tasks(task_ids):
"""
完成指定任务,并加入随机延迟
"""
url = "https://ucodeprod-openapi.jinhuijiu.com.cn/task/complete"
env_variable = "JHHY"
account_info_list = os.getenv(env_variable, "").splitlines()
if not account_info_list:
print("环境变量JHHY未设置或格式不正确")
print("=" * 45)
return
# 首先获取每个账号的脱敏手机号和认证信息
account_info_map = {}
for account_info in account_info_list:
if not account_info.strip():
continue
try:
token_part, serial_part = account_info.split("#", 1)
token = token_part.strip()
serial_id = serial_part.strip()
except ValueError:
print(f"警告: 账号信息格式错误")
print("=" * 45)
continue
# 获取账号脱敏信息
headers = {
'Authorization': f"Bearer {token}",
'serialId': serial_id,
}
try:
metadata_response = requests.get("https://ucodeprod-openapi.jinhuijiu.com.cn/user/metadata", headers=headers)
metadata_response.raise_for_status()
metadata_data = metadata_response.json()
desensitized_phone = get_desensitized_phone(metadata_data)
account_info_map[account_info] = {
'token': token,
'serial_id': serial_id,
'desensitized_phone': desensitized_phone
}
except Exception as e:
account_info_map[account_info] = {
'token': token,
'serial_id': serial_id,
'desensitized_phone': "未知账号"
}
# 对每个账号完成任务
for account_info, info in account_info_map.items():
desensitized_phone = info['desensitized_phone']
token = info['token']
serial_id = info['serial_id']
headers = {
'Content-Type': "application/json",
'Authorization': f"Bearer {token}",
'serialId': serial_id
}
for task_id in task_ids:
payload = {
"taskId": task_id,
"auto": True
}
try:
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 不再调用raise_for_status因为我们需要捕获400状态码
if response.status_code == 200:
data = response.json()
# 如果任务完成成功
if data.get('success', False):
task_name = data.get('lotteryResultVo', {}).get('prizes', [{}])[0].get('remark', '未知任务')
print("=" * 45)
print(f"账号 {desensitized_phone} 任务 '{task_name}'ID: {task_id})成功完成!")
print("=" * 45)
else:
error_message = data.get('message', '未知错误')
print(f"账号 {desensitized_phone} 任务 ID {task_id} 完成失败: {error_message}")
elif response.status_code == 400:
error_data = response.json()
if error_data.get('ecode') == 41041 and "用户已完成任务" in error_data.get('emsg', ''):
print(f"账号 {desensitized_phone} 任务 ID {task_id} 已完成")
print("=" * 45)
else:
print(f"账号 {desensitized_phone} 任务 ID {task_id} 完成失败: 状态码400错误信息: {response.text}")
print("=" * 45)
else:
print(f"账号 {desensitized_phone} 任务 ID {task_id} 完成失败: 状态码 {response.status_code},响应内容: {response.text}")
print("=" * 45)
# 随机延迟0-20秒
delay_time = random.uniform(0, 20)
print(f"账号 {desensitized_phone} 在完成任务后延迟 {delay_time:.2f}")
print("=" * 45)
time.sleep(delay_time)
except requests.exceptions.RequestException as e:
print(f"账号 {desensitized_phone} 请求任务 ID {task_id} 时发生网络错误:{e}")
print("=" * 45)
except ValueError as e:
print(f"账号 {desensitized_phone} 解析任务 ID {task_id} 响应内容失败:{e}")
print("=" * 45)
# 主函数
def main():
#获取公告
get_proclamation()
# 签到任务
perform_check_in()
# 完成任务
task_ids = [100016, 100017, 10018]
complete_tasks(task_ids)
if __name__ == "__main__":
main()