新增-未验证

This commit is contained in:
XiaoGe-LiBai
2025-11-10 16:12:48 +08:00
parent ce6b704abe
commit 297407ba3c
11 changed files with 5462 additions and 0 deletions

View File

@@ -0,0 +1,334 @@
// ==UserScript==
// @name Bing Rewards 自动获取刷新令牌
// @namespace http://tampermonkey.net/
// @version 1.0
// @description 自动从Microsoft授权页面获取刷新令牌
// @author 輕🌊ꫛꫀˑꪝ(ID28507)
// @icon https://account.microsoft.com/favicon.ico
// @match https://login.live.com/oauth20_desktop.srf*
// @match https://login.live.com/oauth20_authorize.srf*
// @grant GM_setValue
// @grant GM_getValue
// @grant GM_notification
// @grant GM_setClipboard
// @run-at document-start
// @homepage https://www.yaohuo.me/bbs/userinfo.aspx?touserid=28507
// @supportURL https://www.yaohuo.me/bbs/userinfo.aspx?touserid=28507
// ==/UserScript==
(function() {
'use strict';
// 检查当前页面是否是授权回调页面
function checkForAuthCode() {
const url = window.location.href;
const urlParams = new URLSearchParams(window.location.search);
// 检查是否在回调页面且包含授权码
if (url.includes('oauth20_desktop.srf') && urlParams.has('code')) {
const code = urlParams.get('code');
console.log('🎯 检测到授权码:', code.substring(0, 20) + '...');
// 显示处理状态
showProcessingUI();
// 获取刷新令牌
getRefreshTokenFromCode(code);
}
}
// 显示处理界面
function showProcessingUI() {
// 创建覆盖层
const overlay = document.createElement('div');
overlay.id = 'token-overlay';
overlay.style.cssText = `
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 0, 0, 0.8);
z-index: 99999;
display: flex;
justify-content: center;
align-items: center;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
`;
// 创建内容容器
const container = document.createElement('div');
container.style.cssText = `
background: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.3);
max-width: 600px;
width: 90%;
text-align: center;
`;
container.innerHTML = `
<h2 style="color: #0078d4; margin-bottom: 20px;">🔧 Bing Rewards 令牌获取工具</h2>
<div id="status-content">
<div style="margin: 20px 0;">
<div class="spinner" style="
border: 4px solid #f3f3f3;
border-top: 4px solid #0078d4;
border-radius: 50%;
width: 40px;
height: 40px;
animation: spin 1s linear infinite;
margin: 0 auto 15px;
"></div>
<p style="color: #666; font-size: 16px;">🔄 正在获取刷新令牌...</p>
</div>
</div>
`;
// 添加旋转动画
const style = document.createElement('style');
style.textContent = `
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
`;
document.head.appendChild(style);
overlay.appendChild(container);
document.body.appendChild(overlay);
}
// 更新状态显示
function updateStatus(html) {
const statusContent = document.getElementById('status-content');
if (statusContent) {
statusContent.innerHTML = html;
}
}
// 通过授权码获取刷新令牌
async function getRefreshTokenFromCode(code) {
const tokenUrl = "https://login.live.com/oauth20_token.srf";
const data = new URLSearchParams({
'client_id': '0000000040170455',
'code': code,
'grant_type': 'authorization_code',
'redirect_uri': 'https://login.live.com/oauth20_desktop.srf',
'scope': 'service::prod.rewardsplatform.microsoft.com::MBI_SSL'
});
try {
const response = await fetch(tokenUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
body: data
});
if (response.ok) {
const tokenData = await response.json();
if (tokenData.refresh_token) {
const refreshToken = tokenData.refresh_token;
// 保存令牌到本地存储
GM_setValue('bing_refresh_token', refreshToken);
// 复制到剪贴板
GM_setClipboard(refreshToken);
// 显示成功信息
showSuccessUI(refreshToken);
// 发送通知
GM_notification({
text: '✅ 刷新令牌获取成功!已复制到剪贴板',
title: 'Bing Rewards',
timeout: 5000
});
console.log('✅ 刷新令牌获取成功:', refreshToken);
} else {
throw new Error('响应中未找到refresh_token');
}
} else {
throw new Error(`请求失败,状态码: ${response.status}`);
}
} catch (error) {
console.error('❌ 获取令牌失败:', error);
showErrorUI(error.message);
GM_notification({
text: '❌ 获取令牌失败: ' + error.message,
title: 'Bing Rewards',
timeout: 5000
});
}
}
// 显示成功界面
function showSuccessUI(refreshToken) {
const maskedToken = refreshToken.substring(0, 20) + '...';
updateStatus(`
<div style="text-align: center;">
<div style="font-size: 48px; color: #28a745; margin-bottom: 15px;">✅</div>
<h3 style="color: #28a745; margin-bottom: 20px;">刷新令牌获取成功!</h3>
<div style="background: #f8f9fa; padding: 15px; border-radius: 5px; margin: 20px 0; border-left: 4px solid #28a745;">
<p style="margin: 0; color: #666;">🎯 您的刷新令牌: ${maskedToken}</p>
</div>
<div style="text-align: left; background: #e8f4fd; padding: 15px; border-radius: 5px; margin: 20px 0;">
<h4 style="color: #0078d4; margin-top: 0;">📋 使用说明:</h4>
<ul style="color: #333; margin: 10px 0; padding-left: 20px;">
<li>✅ 令牌已自动复制到剪贴板</li>
<li>✅ 令牌已保存到浏览器本地存储</li>
<li>💡 可以通过控制台 GM_getValue('bing_refresh_token') 获取</li>
</ul>
</div>
<button onclick="document.getElementById('token-overlay').remove()"
style="
background: #0078d4;
color: white;
border: none;
padding: 10px 20px;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
margin-top: 15px;
">
关闭
</button>
</div>
`);
}
// 显示错误界面
function showErrorUI(errorMessage) {
updateStatus(`
<div style="text-align: center;">
<div style="font-size: 48px; color: #dc3545; margin-bottom: 15px;">❌</div>
<h3 style="color: #dc3545; margin-bottom: 20px;">获取令牌失败</h3>
<div style="background: #f8d7da; padding: 15px; border-radius: 5px; margin: 20px 0; border-left: 4px solid #dc3545;">
<p style="margin: 0; color: #721c24;">错误信息: ${errorMessage}</p>
</div>
<div style="text-align: left; background: #fff3cd; padding: 15px; border-radius: 5px; margin: 20px 0;">
<h4 style="color: #856404; margin-top: 0;">💡 解决建议:</h4>
<ul style="color: #333; margin: 10px 0; padding-left: 20px;">
<li>检查网络连接是否正常</li>
<li>确认已正确完成Microsoft账号授权</li>
<li>尝试重新访问授权链接</li>
</ul>
</div>
<button onclick="document.getElementById('token-overlay').remove()"
style="
background: #dc3545;
color: white;
border: none;
padding: 10px 20px;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
margin-top: 15px;
">
关闭
</button>
</div>
`);
}
// 在授权页面添加说明
function addAuthInstructions() {
if (window.location.href.includes('oauth20_authorize.srf')) {
// 等待页面加载完成
setTimeout(() => {
const body = document.body;
if (body) {
const notice = document.createElement('div');
notice.style.cssText = `
position: fixed;
top: 10px;
right: 10px;
background: #0078d4;
color: white;
padding: 15px;
border-radius: 8px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.3);
z-index: 10000;
font-family: 'Segoe UI', sans-serif;
font-size: 14px;
max-width: 300px;
`;
notice.innerHTML = `
<div style="font-weight: bold; margin-bottom: 8px;">🔧 Bing Rewards 令牌工具</div>
<div>完成授权后,页面会自动跳转并获取刷新令牌</div>
<div style="margin-top: 8px; font-size: 12px; opacity: 0.9;">油猴脚本已激活 ✓</div>
`;
body.appendChild(notice);
// 5秒后自动隐藏
setTimeout(() => {
notice.style.opacity = '0';
notice.style.transition = 'opacity 0.5s';
setTimeout(() => notice.remove(), 500);
}, 5000);
}
}, 1000);
}
}
// 添加控制台帮助函数
window.getBingRefreshToken = function() {
const token = GM_getValue('bing_refresh_token');
if (token) {
console.log('🎯 当前保存的刷新令牌:', token);
GM_setClipboard(token);
console.log('✅ 令牌已复制到剪贴板');
return token;
} else {
console.log('❌ 未找到保存的刷新令牌');
return null;
}
};
// 页面加载时执行
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', () => {
checkForAuthCode();
addAuthInstructions();
});
} else {
checkForAuthCode();
addAuthInstructions();
}
// 监听URL变化用于单页应用
let currentUrl = window.location.href;
const urlObserver = new MutationObserver(() => {
if (window.location.href !== currentUrl) {
currentUrl = window.location.href;
checkForAuthCode();
addAuthInstructions();
}
});
urlObserver.observe(document.body, {
childList: true,
subtree: true
});
console.log('🔧 Bing Rewards 自动获取刷新令牌脚本已加载');
console.log('💡 使用 getBingRefreshToken() 函数可以获取已保存的令牌');
})();

2740
bing_multi_accounts_v2.1.py Normal file

File diff suppressed because it is too large Load Diff

34
好伴AI.py Normal file

File diff suppressed because one or more lines are too long

388
新疆联通1104.py Normal file
View File

@@ -0,0 +1,388 @@
# 解析 ZY100_USER_TOKEN (格式: TOKEN1&REMARK1@TOKEN2@TOKEN3 或 多行输入)
# SCRIPT_NAME = "新疆联通开盒子 V1.2 (by:转变)"
# 变量值抓取”userToken“字段userToken有效期 3天
import requests
import json
import os
import sys
import time
import random
from datetime import datetime
from typing import List, Dict, Any
import base64 # <-- 新增: 用于解码 JWT Token
import re # <-- 新增: 用于检查手机号格式
# 引入青龙环境下的通用通知模块
try:
from notify import send
except ImportError:
def send(title, content):
sys.stdout.write(f"\n【通知模块缺失,仅打印结果】{title}:\n{content}\n")
# --- 辅助函数 ---
def get_phone_from_token(token: str) -> str:
"""尝试从JWT token中提取手机号用于生成默认备注 (脱敏格式: ****)"""
try:
# JWT 结构: header.payload.signature, 提取 payload 部分
parts = token.split('.')
if len(parts) != 3:
return ""
# Base64URL 解码需要手动添加 padding
payload_b64 = parts[1]
padding = '=' * (4 - (len(payload_b64) % 4))
# 使用 urlsafe_b64decode 处理 Base64URL 编码
decoded_payload = base64.urlsafe_b64decode(payload_b64 + padding).decode('utf-8')
# 解析 JSON 查找 'user_phone' 或 'sub'
payload_data = json.loads(decoded_payload)
phone = payload_data.get('user_phone') or payload_data.get('sub')
# 确保提取的是一个有效的手机号格式
if phone and re.match(r'^\d{11}$', str(phone)):
# 返回脱敏后的手机号
return str(phone)[:3] + '****' + str(phone)[7:]
except Exception:
# 如果解析失败不是JWT或格式错误返回空字符串
return ""
return ""
# --- 全局配置 ---
SCRIPT_NAME = "新疆联通开盒子 V1.2 (by:转变)"
# 关键配置:单账号尝试次数
# 用户请求将默认值修改为 '1'
ATTEMPT_COUNT = int(os.environ.get("UNICOM_ATTEMPT_COUNT", "1"))
# API配置
DRAW_URL = "https://zy100.xj169.com/touchpoint/openapi/marchAct/draw_Nov2025Act"
DRAW_PAYLOAD = {
'activityId': "Nov2025Act",
'prizeId': ""
}
# 获取奖品详情的API配置
PRIZE_DETAIL_URL = "https://zy100.xj169.com/touchpoint/openapi/drawAct/getMyPrize"
PRIZE_DETAIL_PAYLOAD = {
'activityId': "Nov2025Act"
}
# --- 账号解析逻辑 (支持备注/灵活格式) ---
ACCOUNT_LIST: List[Dict[str, str]] = []
# 1. 解析 ZY100_USER_TOKEN (格式: TOKEN1&REMARK1@TOKEN2@TOKEN3 或 多行输入)
ALL_TOKENS_RAW = os.environ.get("ZY100_USER_TOKEN", "")
# V15.3 改进: 统一分隔符。将所有换行符替换为 '@',并清理多余的 '@'。
ALL_TOKENS_RAW = ALL_TOKENS_RAW.replace('\n', '@').replace('\r', '').replace('@@', '@')
if ALL_TOKENS_RAW.startswith('@'):
ALL_TOKENS_RAW = ALL_TOKENS_RAW[1:]
for t_r in ALL_TOKENS_RAW.split('@'):
t_r = t_r.strip()
if not t_r:
continue
parts = t_r.split('&', 1) # 只分割一次,以支持备注中包含&
token = parts[0]
# 尝试从 token 提取手机号作为默认备注的基础
phone_suffix = get_phone_from_token(token)
# 如果提取到手机号使用脱敏手机号作为默认备注否则使用通用Account
default_remark = phone_suffix if phone_suffix else f"Account {len(ACCOUNT_LIST) + 1} (Main)"
# 如果提供了 & 分隔的备注则使用否则使用基于手机号或通用Account的默认备注
remark = parts[1] if len(parts) > 1 else default_remark
# 避免重复的 token
if token and token not in [acc['token'] for acc in ACCOUNT_LIST]:
ACCOUNT_LIST.append({"token": token, "remark": remark})
# 2. 解析 indexed 变量 (ZY100_USER_TOKEN_1, ZY100_REMARK_1)
i = 1
while os.environ.get(f"ZY100_USER_TOKEN_{i}"):
# 获取 token并检查是否内联了备注
raw_token_value = os.environ.get(f"ZY100_USER_TOKEN_{i}", "").strip()
# V15.3 改进: 确保 indexed 变量也支持内嵌换行符
raw_token_value = raw_token_value.replace('\n', '@').replace('\r', '').replace('@@', '@')
parts = raw_token_value.split('&', 1)
token = parts[0].split('@')[0] # 只需要第一个 token避免多行 token 影响
# 尝试从 token 提取手机号作为默认备注的基础
phone_suffix = get_phone_from_token(token)
default_remark = phone_suffix if phone_suffix else f"Account {len(ACCOUNT_LIST) + 1} (Indexed {i})"
# 优先使用独立的 ZY100_REMARK_i 变量
explicit_remark = os.environ.get(f"ZY100_REMARK_{i}", "").strip()
if explicit_remark:
remark = explicit_remark
elif len(parts) > 1: # 其次使用内联在 token 中的备注
remark = parts[1]
else: # 最后使用基于手机号的默认备注或通用默认备注
remark = default_remark
# 避免重复的 token
if token and token not in [acc['token'] for acc in ACCOUNT_LIST]:
ACCOUNT_LIST.append({"token": token, "remark": remark})
i += 1
# --- 脚本函数 (以下内容与 V15.1 保持一致) ---
def write_log(text):
"""自定义日志写入函数"""
sys.stdout.write(text + "\n")
def get_prize_details(user_token, remark, activity_id="Nov2025Act"):
"""
获取奖品详情信息
返回: (是否成功, 日志字符串, 奖品详情字典)
"""
log_messages = [f"--- 👤 账号备注: {remark} 的奖品详情 ---"]
prize_details = {}
headers = {
'User-Agent': "Mozilla/5.0 (iPhone; CPU iPhone OS 16_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 unicom{version:iphone_c@12.0701};ltst;OSVersion/16.2",
'Origin': "https://zy100.xj169.com",
'Referer': "https://zy100.xj169.com/touchpoint/openapi/jumpHandRoom1G?source=...",
'X-Requested-With': "XMLHttpRequest",
'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8',
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
'userToken': user_token,
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
payload = {
'activityId': activity_id
}
try:
response = requests.post(PRIZE_DETAIL_URL, data=payload, headers=headers, timeout=10)
if response.status_code != 200:
log_messages.append(f"❌ 获取奖品详情失败HTTP 状态码: {response.status_code}")
return False, "\n".join(log_messages), {}
try:
res_json = response.json()
except json.JSONDecodeError:
log_messages.append(f"⚠️ 奖品详情响应不是有效的 JSON 格式。")
return False, "\n".join(log_messages), {}
if res_json.get("code") == 0 or res_json.get("code") == "SUCCESS":
data = res_json.get('data', [])
# 确保 data 是列表,即使只返回一个奖品
if isinstance(data, dict):
data = [data]
if data:
prize_details = data
log_messages.append(f"🎁 成功查询到 {len(data)} 个奖品记录:")
# 格式化奖品详情信息只显示prizeId和drawDate
for i, prize in enumerate(data, 1):
prize_id = prize.get('prizeId', '未知奖品')
draw_date_timestamp = prize.get('drawDate', 0)
# 转换时间戳为可读格式
draw_date = '未知时间'
if draw_date_timestamp:
try:
draw_date = datetime.fromtimestamp(draw_date_timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S')
except:
draw_date = str(draw_date_timestamp)
log_messages.append(f" ├── 奖品 {i}: {prize_id}")
log_messages.append(f" └── 获得时间: {draw_date}")
else:
log_messages.append("📝 当前暂无奖品记录。")
else:
log_messages.append(f"⚠️ 获取奖品详情失败: {res_json.get('msg', '未知错误')}")
# 如果失败,显示原始响应以便调试
if res_json.get('code') not in [0, "SUCCESS"]:
log_messages.append(json.dumps(res_json, indent=2, ensure_ascii=False))
except requests.exceptions.RequestException as e:
log_messages.append(f"❌ 获取奖品详情时网络请求异常: {e}")
return True, "\n".join(log_messages), prize_details
def perform_single_draw(user_token):
"""
单个抽奖请求逻辑,只执行一次
返回: (是否需要终止所有尝试的布尔值, 日志字符串, 状态码)
"""
log_messages = []
status = 'continue'
headers = {
'User-Agent': "Mozilla/5.0 (iPhone; CPU iPhone OS 16_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 unicom{version:iphone_c@12.0701};ltst;OSVersion/15.2",
'Origin': "https://zy100.xj169.com",
'Referer': "https://zy100.xj169.com/touchpoint/openapi/jumpHandRoom1G?source=...",
'X-Requested-With': "XMLHttpRequest",
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json, text/plain, */*',
'userToken': user_token,
'Accept-Language': "zh-CN,zh-Hans;q=0.9"
}
try:
response = requests.post(DRAW_URL, data=DRAW_PAYLOAD, headers=headers, timeout=10)
if response.status_code != 200:
log_messages.append(f"❌ 请求失败HTTP 状态码: {response.status_code}")
else:
try:
res_json = response.json()
except json.JSONDecodeError:
log_messages.append(f"⚠️ 响应不是有效的 JSON 格式。")
return False, "\n".join(log_messages), status
# 检查:请求频率过高 (立即终止)
if res_json.get("code") == 500 and "频率过高" in res_json.get("msg", ""):
log_messages.append("❌ 错误代码 500: 请求频率过高,强制终止!")
status = 'rate_limit'
# 检查缺少参数错误Token 失效)
elif res_json.get("code") == 500 and "缺少参数" in res_json.get("msg", ""):
log_messages.append("⚠️ 错误代码 500: Token 无效或已过期!终止尝试。")
status = 'invalid_token'
# 匹配:今日抽奖机会已用完
elif res_json.get("code") == "ERROR" and res_json.get("msg") == "thanks1":
info = res_json.get('data', '未知信息')
log_messages.append(f"✅ 抽奖机会状态: 今日已抽完。信息: {info}")
status = 'done' # 标记为完成,以便停止后续尝试
# 匹配:成功中奖
elif res_json.get("code") == "SUCCESS":
prize_name = res_json.get('data', '恭喜中奖')
log_messages.append(f"🎉 抽奖中奖!结果: {prize_name}")
status = 'won'
# 其他非预期响应
else:
log_messages.append("⚠️ 收到其他非预期响应:")
log_messages.append(json.dumps(res_json, indent=2, ensure_ascii=False))
except requests.exceptions.RequestException as e:
log_messages.append(f"❌ 网络请求异常发生: {e}")
return False, "\n".join(log_messages), status
def run_sign_in():
"""主执行逻辑:多账号串行处理,单账号中延迟多次尝试"""
all_results = [f"{SCRIPT_NAME}\n执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
if not ACCOUNT_LIST:
error_msg = "❌ 错误:环境变量 ZY100_USER_TOKEN(s) 未配置或为空。"
all_results.append(error_msg)
write_log(error_msg)
return "\n".join(all_results)
write_log(f"✅ 成功检测到 {len(ACCOUNT_LIST)} 个账号,每个账号将被串行尝试 {ATTEMPT_COUNT} 次...")
# 当尝试次数为 1 时,此行日志无意义
if ATTEMPT_COUNT > 1:
write_log(f"每次尝试之间将随机延迟 0.5 秒到 1.5 秒。")
final_push_messages = []
for idx, acc in enumerate(ACCOUNT_LIST, 1):
token = acc['token']
remark = acc['remark']
account_log_buffer = [] # 存储当前账号的详细日志
account_final_status = "未尝试"
account_header = f"\n\n================ 👤 账号 {idx}/{len(ACCOUNT_LIST)} [备注: {remark}] 开始执行 ================"
write_log(account_header)
account_log_buffer.append(account_header)
for attempt in range(ATTEMPT_COUNT):
log_header = f"--- 尝试次数 {attempt + 1}/{ATTEMPT_COUNT} ---"
write_log(log_header)
account_log_buffer.append(log_header)
_, result_log, status = perform_single_draw(token)
write_log(result_log)
account_log_buffer.append(result_log)
# 更新最终状态,如果成功中奖/已抽完/致命错误,则中断
if status == 'done':
account_final_status = "今日机会已用完"
break
elif status == 'won':
account_final_status = "✅ 中奖成功"
break
elif status == 'invalid_token':
account_final_status = "❌ Token 无效或过期"
break
elif status == 'rate_limit':
account_final_status = "❌ 请求频率过高"
break
else:
account_final_status = "抽奖失败/未中奖"
# 如果不是最后一次尝试(并且尝试次数大于 1引入延迟
if ATTEMPT_COUNT > 1 and attempt < ATTEMPT_COUNT - 1 and status not in ['done', 'won']:
wait_time = random.uniform(0.5, 1.5)
write_log(f"等待 {wait_time:.3f} 秒后进行下一次尝试...")
time.sleep(wait_time)
# --- 任务结束:获取奖品详情并汇总 ---
write_log(f"\n--- 账号 {idx} 抽奖完成,获取奖品详情 ---")
detail_success, detail_log, prize_details = get_prize_details(token, remark)
write_log(detail_log)
account_log_buffer.append(detail_log)
write_log(f"================ 👤 账号 {idx} [备注: {remark}] 执行完毕 (状态: {account_final_status}) ================")
account_log_buffer.append(f"================ 状态: {account_final_status} ================")
# 将当前账号的详细日志加入总日志
all_results.extend(account_log_buffer)
# 构造推送消息摘要
detail_summary_lines = [line for line in detail_log.splitlines() if line.startswith((' ├──', ' └──'))]
detail_summary = "\n".join(detail_summary_lines)
if not detail_summary_lines:
# 检查是否有“暂无奖品记录”的提示
if "暂无奖品记录" in detail_log:
detail_summary = "📝 当前暂无奖品记录。"
else:
detail_summary = "❌ 奖品查询失败,请检查日志。"
final_push_messages.append(f"👤 [备注: {remark}] 状态: {account_final_status}\n{detail_summary}\n" + "-"*30)
# 账号切换间歇休息 1-2 秒
time.sleep(random.uniform(1.0, 2.0))
final_log_content = "\n".join(all_results)
final_notification = f"{SCRIPT_NAME}\n执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n--- 🛎️ 执行摘要 (按备注区分) 🛎️ ---\n" + "\n".join(final_push_messages)
write_log("\n--- 推送日志开始 ---")
write_log(final_notification)
write_log("--- 推送日志结束 ---")
return final_notification
if __name__ == "__main__":
# 1. 执行任务并获取推送内容
push_content = run_sign_in()
# 2. 使用通用的 send 函数进行推送
send(SCRIPT_NAME, push_content)

226
桃色Vip.py Normal file
View File

@@ -0,0 +1,226 @@
"""
# 入口:#小程序://趣网/lRVlzUzgEHjTdYx
# 变量名Ts
# 变量值:手机号&密码 多号换行
# by重庆第一深情
#注:打开小程序,用微信登录,然后授权手机号,然后继续修改资料下面有个修改密码
#这个修改密码入口时有时无,只有自己多试一下,不行就注销再试
"""
import os
import sys
import json
import requests
import time
import datetime
import secrets
import string
import random
from notify import send
print(f"++++++++++桃色程序开始启动++++++++++\n")
def process_account(username, password):
"""处理单个账号的签到和任务"""
print(f"\n======= 处理账号: {username} =======")
login_url = "https://wxapp.lllac.com/xqw/login.php"
checkin_url = "https://wxapp.lllac.com/xqw/user_mall.php"
USER_AGENT = "Mozilla/5.0 (Linux; Android 15; PKG110 Build/UKQ1.231108.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/138.0.7204.180 Mobile Safari/537.36 XWEB/1380215 MMWEBSDK/20250904 MMWEBID/6169 MicroMessenger/8.0.64.2940(0x28004034) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android"
timpstamp = int(time.time() * 1000)
ssid = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(32))
nums = random.sample(range(2000, 20000), 3)
pcid, rxid, xpid = nums
# 登录
payload = {
'act': "login",
'u_name': username,
'u_pass': password,
'session_id': ssid
}
headers = {
'User-Agent': USER_AGENT,
'timpstamp': str(timpstamp),
'charset': "utf-8",
'Referer': "https://servicewechat.com/wxb96c32e3d2d4b224/102/page-frame.html",
'Cookie': f"SSID={ssid}"
}
try:
# 登录请求
dl = requests.post(login_url, data=payload, headers=headers, timeout=10)
login_result = json.loads(dl.text)
msg = login_result.get("msg", "未知状态")
print(f"👤手机号:{username}")
print(f"💎登录状态:{msg}")
# 签到
qdparams = {
'act': 'signToday',
'ssid': ssid
}
qdheaders = {
'User-Agent': USER_AGENT
}
qd = requests.post(checkin_url, data=qdparams, headers=qdheaders, timeout=10)
qd_result = json.loads(qd.text)
qdmsg = qd_result.get("msg", "未知状态")
print(f"📅签到:{qdmsg}")
# 每周一次任务,默认周一运行
is_monday = datetime.datetime.now().weekday() == 0
result = {
'username': username,
'login_status': msg,
'checkin_status': qdmsg,
'weekly_tasks': []
}
if is_monday:
pcurl = f"https://wxapp.lllac.com/xqw/ch_article_info.php?id={pcid}&channel=quwang&qudao=wapdlxcx&version=182&f=gyg_c3_tab0&act=task"
rxurl = f"https://wxapp.lllac.com/xqw/goods_v2.php?act=task&id={rxid}&channel=quwang&qudao=wapdlxcx&spm=x.hot.g&type=29"
xpurl = f"https://wxapp.lllac.com/xqw/goods_v2.php?act=task&id={xpid}&channel=quwang&qudao=wapdlxcx&spm=x.new.g3&type=28"
pcheaders = {
'User-Agent': USER_AGENT,
'Cookie': f"SSID={ssid}"
}
try:
xp = requests.post(xpurl, headers=pcheaders, timeout=10)
xp_result = json.loads(xp.text)
xpmsg = xp_result.get("msg", "未知状态")
print(f"❤️每周新品:{xpmsg}")
result['weekly_tasks'].append(f"每周新品:{xpmsg}")
except Exception as e:
xpmsg = f"请求失败: {str(e)}"
print(f"❤️每周新品:{xpmsg}")
result['weekly_tasks'].append(f"每周新品:{xpmsg}")
try:
rx = requests.post(rxurl, headers=pcheaders, timeout=10)
rx_result = json.loads(rx.text)
rxmsg = rx_result.get("msg", "未知状态")
print(f"🧡每周热销:{rxmsg}")
result['weekly_tasks'].append(f"每周热销:{rxmsg}")
except Exception as e:
rxmsg = f"请求失败: {str(e)}"
print(f"🧡每周热销:{rxmsg}")
result['weekly_tasks'].append(f"每周热销:{rxmsg}")
try:
pc = requests.post(pcurl, headers=pcheaders, timeout=10)
pc_result = json.loads(pc.text)
pcmsg = pc_result.get("msg", "未知状态")
print(f"💛每周评测:{pcmsg}")
result['weekly_tasks'].append(f"每周评测:{pcmsg}")
except Exception as e:
pcmsg = f"请求失败: {str(e)}"
print(f"💛每周评测:{pcmsg}")
result['weekly_tasks'].append(f"每周评测:{pcmsg}")
else:
print("❌当前时间不是周一,不执行每周任务")
result['weekly_tasks'] = ["当前时间不是星期一,不执行每周一次任务"]
return result
except Exception as e:
print(f"❌处理账号 {username} 时发生错误: {str(e)}")
return {
'username': username,
'login_status': f"处理失败: {str(e)}",
'checkin_status': "未执行",
'weekly_tasks': ["账号处理失败"]
}
def main():
"""主函数"""
Ts = os.getenv("Ts")
if not Ts:
print("❌未找到环境变量 Ts")
send("桃色Vip", "❌未找到环境变量 Ts")
return
# 分割账号,支持换行、#、和空格分隔
accounts = []
for separator in ['\n', '#', ' ', ';']:
if separator in Ts:
accounts = [acc.strip() for acc in Ts.split(separator) if acc.strip()]
break
# 如果没有找到分隔符,尝试直接处理
if not accounts:
accounts = [Ts.strip()]
# 过滤空账号
accounts = [acc for acc in accounts if acc]
if not accounts:
print("❌未找到有效的账号信息")
send("桃色Vip", "❌未找到有效的账号信息")
return
print(f"📝找到 {len(accounts)} 个账号")
all_results = []
for i, account in enumerate(accounts, 1):
print(f"\n🎯正在处理第 {i}/{len(accounts)} 个账号...")
# 解析账号格式:手机号&密码
if '&' in account:
username, password = account.split('&', 1)
username = username.strip()
password = password.strip()
else:
print(f"❌账号格式错误: {account}")
all_results.append({
'username': account,
'login_status': "账号格式错误",
'checkin_status': "未执行",
'weekly_tasks': ["账号格式错误,应为: 手机号&密码"]
})
continue
result = process_account(username, password)
all_results.append(result)
# 添加延迟,避免请求过于频繁
if i < len(accounts):
time.sleep(2)
# 生成汇总通知消息
is_monday = datetime.datetime.now().weekday() == 0
summary_msg = f"桃色Vip - 多账号运行结果\n"
summary_msg += f"📅执行时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
summary_msg += f"📊处理账号数: {len(all_results)}\n\n"
for i, result in enumerate(all_results, 1):
summary_msg += f"🔹账号{i}: {result['username']}\n"
summary_msg += f" 登录: {result['login_status']}\n"
summary_msg += f" 签到: {result['checkin_status']}\n"
if is_monday and result['weekly_tasks']:
for task in result['weekly_tasks']:
summary_msg += f" {task}\n"
else:
summary_msg += f" 每周任务: 未执行(非周一)\n"
summary_msg += "\n"
print(f"\n++++++++++所有账号处理完成++++++++++")
print(f"📨正在发送通知...")
# 发送汇总通知
send("桃色Vip", summary_msg)
print(f"✅程序执行完成")
if __name__ == "__main__":
main()

194
蒙娜丽莎会员.py Normal file
View File

@@ -0,0 +1,194 @@
"""
微信扫码https://img.meituan.net/portalweb/40d762a3e9eb4671bf76590d014ba0d0190451.jpg
说明:
签到获取积分,兑换实物。 变量名称mnls
抓包 webChatID
"""
import requests
import os
import sys
import time
import random
import datetime
from urllib.parse import quote
# ================== 配置区 ==================
NOTIFY = True # 是否推送通知
SCT_KEY = "" # Server 酱 SendKey推荐使用最简单填你的 SendKey如 "SCTxxx"
# ============================================
def get_info(webChatID):
url = 'https://mcs.monalisagroup.com.cn/member/doAction'
headers = {
'Host': 'mcs.monalisagroup.com.cn',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541113) XWEB/16771',
'xweb_xhr': '1',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': '*/*',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Referer': 'https://servicewechat.com/wxce6a8f654e81b7a4/450/page-frame.html',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
data = {
'action': 'getCustomer',
'webChatID': webChatID,
'brand': 'MON'
}
try:
response = requests.post(url, headers=headers, data=data, timeout=10).json()
info = response['resultInfo'][0]
phone = info['Telephone']
customerid = info['CustomerID']
nickname = info['WebChatName']
integral = info['Integral']
return True, phone, customerid, nickname, integral
except Exception as e:
print('❌ 获取信息失败:', str(e))
return False, None, None, None, None
def get_sign_in(customerid, nickname):
url = 'https://mcs.monalisagroup.com.cn/member/doAction'
headers = {
'Host': 'mcs.monalisagroup.com.cn',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541113) XWEB/16771',
'xweb_xhr': '1',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': '*/*',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Referer': 'https://servicewechat.com/wxce6a8f654e81b7a4/450/page-frame.html',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
data = {
'action': 'sign',
'CustomerID': str(customerid),
'CustomerName': quote(nickname),
'StoreID': '0',
'OrganizationID': '0',
'Brand': 'MON',
'ItemType': '002'
}
try:
response = requests.post(url, headers=headers, data=data, timeout=10).json()
if response['status'] == 0:
msg = f"✅ 签到成功,获得积分: {response['resultInfo']}"
else:
msg = " 签到失败,可能今日已签到"
return True, msg
except Exception as e:
print(f"❌ 请求异常: {str(e)}")
return False, f"请求异常: {str(e)}"
def download_notify():
"""下载 notify.py青龙专用"""
url = "https://raw.githubusercontent.com/whyour/qinglong/refs/heads/develop/sample/notify.py"
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
with open("notify.py", "w", encoding="utf-8") as f:
f.write(response.text)
print("✅ notify.py 下载成功")
return True
except Exception as e:
print(f"❌ 下载 notify.py 失败: {str(e)}")
return False
def send_notification(title, content):
"""统一推送通知(优先使用 Server 酱 SCT"""
if SCT_KEY:
try:
url = f"https://sctapi.ftqq.com/{SCT_KEY}.send"
data = {"title": title, "desp": content}
requests.post(url, data=data, timeout=10)
print("✅ Server 酱推送成功")
return
except Exception as e:
print(f"❌ Server 酱推送失败: {e}")
# 备用:青龙 notify.py
if NOTIFY:
try:
if not os.path.exists("notify.py"):
if not download_notify():
print("❌ 无法使用 notify.py 推送")
return
import notify
notify.send(title, content)
print("✅ 青龙 notify 推送成功")
except Exception as e:
print(f"❌ notify.py 推送失败: {str(e)}")
def main():
# 读取环境变量
ck = os.environ.get("mnls")
if not ck:
print("❌ 请设置环境变量 mnls")
sys.exit()
# 延迟执行5:01-6:59
now = datetime.datetime.now().time()
start = datetime.time(5, 1)
end = datetime.time(6, 59)
if start <= now <= end:
delay = random.randint(100, 500)
print(f"⏰ 在 5:01-6:59 之间,延迟 {delay} 秒执行...")
time.sleep(delay)
accounts = [line.strip() for line in ck.split('\n') if line.strip()]
print(f"{' ' * 10}꧁༺ 蒙拉丽莎会员签到 ༻꧂\n")
log_msgs = [
f"📅 执行时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
f"📊 账号数量: {len(accounts)}",
"" * 30
]
for i, webChatID in enumerate(accounts, 1):
print(f'\n----------- 🍺 账号【{i}/{len(accounts)}】执行 🍺 -----------')
log_msg = f"【账号 {i}"
try:
success, phone, customerid, nickname, integral = get_info(webChatID)
if success:
print(f"👤 昵称: {nickname}")
print(f"📞 电话: {phone}")
print(f"🆔 客户ID: {customerid}")
print(f"⭐ 当前积分: {integral}")
log_msg += f"\n👤 昵称: {nickname}\n📞 电话: {phone}\n🆔 客户ID: {customerid}\n⭐ 当前积分: {integral}"
# 签到
time.sleep(random.randint(1, 2))
sign_success, sign_msg = get_sign_in(customerid, nickname)
print(sign_msg)
log_msg += f"\n📝 {sign_msg}"
else:
log_msg += "\n❌ 获取信息失败"
except Exception as e:
print(f"❌ 执行异常: {str(e)}")
log_msg += f"\n❌ 执行异常: {str(e)}"
finally:
log_msgs.append(log_msg)
# 推送通知
if NOTIFY:
title = "蒙拉丽莎会员签到完成"
content = "\n".join(log_msgs)
send_notification(title, content)
print(f'\n🎉 执行结束,共 {len(accounts)} 个账号')
if __name__ == '__main__':
main()

359
酒仙网账密.py Normal file
View File

@@ -0,0 +1,359 @@
"""
酒仙网自动任务脚本
扫码链接 https://img.meituan.net/portalweb/ba0be8b7b52975047a38682ec3070172251739.jpg
功能:
1. 自动登录酒仙网账号
2. 自动每日签到领取金币
3. 自动完成所有"浏览""分享"类任务并领取金币
4. 自动参与抽奖活动
适用于青龙面板
环境变量设置:
JX_COOKIE值为 "账号#密码",多个账号用换行分隔
示例:
JX_COOKIE="13800000000#password123"
或多个账号:
JX_COOKIE="13800000000#password123
13900000000#password456"
"""
import os
import requests
import time
import json
import ssl
import random
from requests.adapters import HTTPAdapter
from urllib.parse import urlparse
class LegacyRenegotiationAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
context.options |= getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4)
kwargs['ssl_context'] = context
return super(LegacyRenegotiationAdapter, self).init_poolmanager(*args, **kwargs)
COMMON_PARAMS = {
'apiVersion': '1.0', 'appKey': '5C6567E5-C48B-40C2-A7C4-65D361151543',
'appVersion': '9.2.13', 'areaId': '500', 'channelCode': '0,1', 'cityName': '北京市',
'consentStatus': '2', 'cpsId': 'appstore', 'deviceIdentify': '5C6567E5-C48B-40C2-A7C4-65D361151543',
'deviceType': 'IPHONE', 'deviceTypeExtra': '0', 'equipmentType': 'iPhone 6s Plus',
'netEnv': 'WIFI', 'pushToken': '9a6b0095130f0c8ab0863351669ebcefe66dbc8cc88170a943cfd40833cc33d4',
'screenReslolution': '414.00x736.00', 'supportWebp': '1', 'sysVersion': '15.8.3',
}
NATIVE_HEADERS = {
'User-Agent': 'jiuxian/9.2.13 (iPhone; iOS 15.8.3; Scale/3.00)',
'Accept-Language': 'zh-Hans-US;q=1',
'Accept': 'text/html; q=1.0, text/*; q=0.8, image/gif; q=0.6, image/jpeg; q=0.6, image/*; q=0.5, */*; q=0.1',
'Connection': 'keep-alive'
}
WEBVIEW_USER_AGENT = 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_8_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) oadzApp suptwebp/2 jiuxianApp/9.2.13 from/iOS areaId/500'
class JXClient:
def __init__(self, username, password):
self.username = username
self.password = password
self.session = requests.Session()
self.session.mount('https://', LegacyRenegotiationAdapter())
self.session.headers.update(NATIVE_HEADERS)
self.token = None
def login(self):
print(f"🔑 正在为账号【{self.username}】执行登录...")
login_url = "https://newappuser.jiuxian.com/user/loginUserNamePassWd.htm"
login_data = {**COMMON_PARAMS, 'userName': self.username, 'passWord': self.password, 'token': ''}
headers = {**self.session.headers, 'Host': 'newappuser.jiuxian.com', 'Content-Type': 'application/x-www-form-urlencoded'}
try:
response = self.session.post(login_url, data=login_data, headers=headers, timeout=15)
response.raise_for_status()
result = response.json()
if result.get("success") == "1":
user_info = result.get("result", {}).get("userInfo", {})
self.token = user_info.get("token")
print(f"✅ 登录成功!你好,【{user_info.get('uname') or self.username}")
return True
else:
print(f"❌ 登录失败: {result.get('errMsg') or '未知错误'}")
return False
except Exception as e:
print(f"❌ 登录请求异常: {e}")
return False
def query_balance(self, prefix=""):
if not self.token: return
url = "https://newappuser.jiuxian.com/user/myWinebibber.htm"
params = {**COMMON_PARAMS, 'token': self.token}
headers = {**self.session.headers, 'Host': 'newappuser.jiuxian.com'}
try:
response = self.session.get(url, params=params, headers=headers, timeout=15)
result = response.json()
if result.get("success") == "1":
gold_money = result.get("result", {}).get("bibberInfo", {}).get("goldMoney", "查询失败")
print(f"💰 {prefix}金币余额: {gold_money}")
except Exception:
print(f"⚠️ 查询余额失败。")
def do_daily_tasks(self):
if not self.token: return
print("\n--- 🌟 开始执行日常任务 ---")
self.query_balance(prefix="任务前")
info_url = "https://newappuser.jiuxian.com/memberChannel/memberInfo.htm"
params = {**COMMON_PARAMS, 'token': self.token}
headers = {**self.session.headers, 'Host': 'newappuser.jiuxian.com'}
try:
response = self.session.get(info_url, params=params, headers=headers, timeout=15)
response.raise_for_status()
result = response.json().get("result", {})
if not result.get("isSignTody"):
print("📌 今日未签到,执行签到...")
self.do_sign_in()
time.sleep(random.randint(2, 4))
else:
print("👍 今日已签到。")
response = self.session.get(info_url, params=params, headers=headers, timeout=15)
result = response.json().get("result", {})
task_info = result.get("taskChannel", {})
task_token = task_info.get("taskToken")
task_list = [task for task in task_info.get("taskList", []) if task.get("state") in [0, 1]]
if not task_list or not task_token:
print("📦 未发现可执行的任务或所有任务均已完成。")
return
print(f"📋 检测到 {len(task_list)} 个待办任务,准备执行...")
for i, task in enumerate(task_list):
task_name = task.get("taskName")
task_state = task.get("state")
print(f"\n▶️ 开始处理任务: 【{task_name}")
if task_state == 0:
if task.get("taskType") == 1:
self.do_browse_task(task, task_token)
elif task.get("taskType") == 2:
self.do_share_task(task, task_token)
elif task_state == 1:
print(" - 任务状态为'已完成,待领取', 直接领取奖励...")
self.claim_task_reward(task.get("id"), task_token)
if i < len(task_list) - 1:
delay = random.randint(3, 5)
print(f"⏳ 随机等待 {delay} 秒...")
time.sleep(delay)
except Exception as e:
print(f"❌ 获取任务列表失败: {e}")
finally:
print("\n--- ✅ 所有任务执行完毕 ---")
self.query_balance(prefix="最终")
def do_sign_in(self):
url = "https://newappuser.jiuxian.com/memberChannel/userSign.htm"
params = {**COMMON_PARAMS, 'token': self.token}
headers = {**self.session.headers, 'Host': 'newappuser.jiuxian.com'}
try:
response = self.session.get(url, params=params, headers=headers, timeout=15)
result = response.json()
if result.get("success") == "1":
gold_num = result.get("result", {}).get("receivedGoldNums", "未知")
print(f"🎉 签到成功!获得 {gold_num} 金币。")
else:
print(f"❌ 签到失败: {result.get('errMsg')}")
except Exception as e:
print(f"❌ 签到请求异常: {e}")
def do_browse_task(self, task, task_token):
print(" - [第1步] 正在访问任务页面...")
try:
url, countdown = task.get("url"), task.get("countDown", 15)
host = urlparse(url).netloc
headers = {**NATIVE_HEADERS, 'Host': host, 'User-Agent': WEBVIEW_USER_AGENT}
cookies = {'token': self.token}
self.session.get(url, headers=headers, cookies=cookies, timeout=15)
print(f" - 页面访问成功,等待 {countdown} 秒...")
for i in range(countdown, 0, -1):
print(f"\r 倒计时: {i}", end="")
time.sleep(1)
print("\r 倒计时结束。")
except Exception as e:
print(f" - ❌ 访问任务页面失败: {e}")
return
if self.mark_task_as_complete(task, task_token):
time.sleep(random.randint(1, 3))
self.claim_task_reward(task.get("id"), task_token)
def do_share_task(self, task, task_token):
print(" - [第1步] 模拟点击分享...")
if self.mark_task_as_complete(task, task_token):
time.sleep(random.randint(1, 3))
self.claim_task_reward(task.get("id"), task_token)
def mark_task_as_complete(self, task, task_token):
print(" - [第2步] 正在标记任务为'已完成'...")
url = "https://shop.jiuxian.com/show/wap/addJinBi.htm"
data = {'taskId': task.get("id"), 'taskToken': task_token}
headers = {'Host': 'shop.jiuxian.com', 'Accept': '*/*', 'X-Requested-With': 'XMLHttpRequest','Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8','Origin': 'https://shop.jiuxian.com', 'Referer': task.get("url"),'User-Agent': WEBVIEW_USER_AGENT}
cookies = {'token': self.token}
try:
response = self.session.post(url, data=data, headers=headers, cookies=cookies, timeout=15)
result = response.json()
if result.get("code") == 1:
print(" 标记成功。")
return True
except Exception: pass
print(f" - ❌ 标记任务失败。")
return False
def claim_task_reward(self, task_id, task_token):
print(" - [第3步] 💰 正在领取任务金币...")
url = "https://newappuser.jiuxian.com/memberChannel/receiveRewards.htm"
params = {**COMMON_PARAMS, 'token': self.token, 'taskId': task_id, 'taskToken': task_token}
headers = {**self.session.headers, 'Host': 'newappuser.jiuxian.com'}
try:
response = self.session.get(url, params=params, headers=headers, timeout=15)
result = response.json()
if result.get("success") == "1":
gold_num = result.get("result", {}).get("goldNum", "未知")
print(f" 🎉 领取成功!获得 {gold_num} 金币。")
else:
print(f" - ❌ 领取奖励失败: {result.get('errMsg')}")
except Exception as e:
print(f" - ❌ 领取奖励请求异常: {e}")
def do_lottery(self, activity_id="8e8b7f5386194798ab1ae7647f4af6ba", max_draws=10):
if not self.token:
return
print("\n--- 🎰 开始执行抽奖任务 ---")
url = "https://h5market2.jiuxian.com/drawObject"
headers = {
'Host': 'h5market2.jiuxian.com',
'Accept': '*/*',
'X-Requested-With': 'XMLHttpRequest',
'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
'Origin': 'https://h5market2.jiuxian.com',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_0_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) from/iOS long/119.3335310872396 areaId/2200 jiuxianApp/9.2.13 suptwebp/2 lati/41.59607340494792 oadzApp',
'Referer': f'https://h5market2.jiuxian.com/draw.htm?flag=ios&id={activity_id}&suptwebp=2&deeplink=1&from=iOS',
'Content-Type': 'application/x-www-form-urlencoded'
}
cookies = {'token': self.token}
draw_count = 0
for i in range(max_draws):
try:
current_time = int(time.time() * 1000)
data = {
'id': activity_id,
'isOrNotAlert': 'false',
'orderSn': '',
'advId': '',
'time': str(current_time)
}
print(f"\n🎲 正在进行第 {i + 1} 次抽奖...")
response = self.session.post(url, data=data, headers=headers, cookies=cookies, timeout=15)
result = response.json()
# 处理没有code字段的情况正常抽奖返回
luck_info = result.get("luck")
# 判断是否有抽奖机会
if luck_info is False:
luckdrawnum = result.get("luckdrawnum", {})
total_chance = sum([
luckdrawnum.get("FreeNums", 0),
luckdrawnum.get("SignNums", 0),
luckdrawnum.get("UseGoldNums", 0),
luckdrawnum.get("BuyNums", 0)
])
if total_chance == 0:
print(f"⚠️ 没有抽奖机会了")
else:
print(f"⚠️ 金币不足,无法抽奖")
print("📌 停止抽奖。")
break
# 正常抽奖结果
if isinstance(luck_info, dict):
prize_name = luck_info.get("luckname", "未知奖品")
user_coins = result.get("userCoins", "")
luck_coins = luck_info.get("luckCoins", 0)
# 判断是否中奖
if "谢谢" in prize_name or luck_coins == 0:
print(f"💨 {prize_name}")
else:
prize_msg = f"🎊 恭喜!抽中了: 【{prize_name}"
if luck_coins:
prize_msg += f" (价值 {luck_coins} 金币)"
if user_coins:
prize_msg += f" | 剩余金币: {user_coins}"
print(prize_msg)
draw_count += 1
time.sleep(random.randint(2, 4))
continue
# 处理带code的错误返回
code = result.get("code")
if code == -1:
msg = result.get("msg", "")
if "次数" in msg or "已参与" in msg or "机会" in msg:
print(f"⚠️ {msg}")
print("📌 抽奖次数已用完,停止抽奖。")
break
else:
print(f"⚠️ 抽奖失败: {msg}")
break
elif code == 0:
msg = result.get("msg", "")
print(f"⚠️ {msg if msg else '抽奖失败'}")
break
else:
# 未知情况,显示详细信息
msg = result.get("msg", "")
print(f"⚠️ 抽奖返回未知状态 (code={code}): {msg if msg else '未知错误'}")
print(f"📄 完整返回: {json.dumps(result, ensure_ascii=False)}")
break
except Exception as e:
print(f"❌ 抽奖请求异常: {e}")
break
if draw_count > 0:
print(f"\n🎉 抽奖完成!共成功抽奖 {draw_count} 次。")
else:
print("\n📭 本次未能成功抽奖。")
print("--- ✅ 抽奖任务执行完毕 ---")
def run(self):
if self.login():
time.sleep(random.randint(1, 3))
self.do_daily_tasks()
time.sleep(random.randint(2, 4))
self.do_lottery()
def main():
print("====== 🚀 酒仙网全自动任务 🚀 ======")
jx_cookie = os.environ.get("JX_COOKIE")
if not jx_cookie:
print("🛑 未找到环境变量 JX_COOKIE")
return
accounts = jx_cookie.strip().split("\n")
print(f"🔧 检测到 {len(accounts)} 个账号,准备执行...")
for i, account in enumerate(accounts):
if not account: continue
print(f"\n--- 🌀 开始执行第 {i + 1} 个账号 🌀 ---")
try:
username, password = account.split("#")
client = JXClient(username.strip(), password.strip())
client.run()
except Exception as e:
print(f"❌ 执行第 {i + 1} 个账号时发生未知错误: {e}")
print("\n====== 🎉 所有账号执行完毕 🎉 ======")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,43 @@
# 酒仙应用配置
class JiuxianConfig:
# 应用基本信息
APP_NAME = "酒仙"
VERSION = "9.2.13"
APP_KEY = "ad96ade2-b918-3e05-86b8-ba8c34747b0c"
DEVICE_ID = "ad96ade2-b918-3e05-86b8-ba8c34747b0c"
# API接口
LOGIN_URL = "https://newappuser.jiuxian.com/user/loginUserNamePassWd.htm"
MEMBER_INFO_URL = "https://newappuser.jiuxian.com/memberChannel/memberInfo.htm"
RECEIVE_REWARD_URL = "https://newappuser.jiuxian.com/memberChannel/receiveRewards.htm"
TASK_COMPLETE_URL = "https://shop.jiuxian.com/show/wap/addJinBi.htm"
SHARE_REPORT_URL = "https://log.umsns.com/share/multi_add/51ff1ac356240b6fb20a2156/-1/"
SIGN_URL = "https://newappuser.jiuxian.com/memberChannel/userSign.htm"
# 请求头
HEADERS = {
"User-Agent": "okhttp/3.14.9",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "newappuser.jiuxian.com",
"Connection": "Keep-Alive",
"Accept-Encoding": "gzip"
}
# 设备信息
DEVICE_INFO = {
"appVersion": VERSION,
"areaId": "500",
"channelCode": "0",
"cpsId": "xiaomi",
"deviceIdentify": DEVICE_ID,
"deviceType": "ANDROID",
"deviceTypeExtra": "0",
"equipmentType": "M2011K2C",
"netEnv": "wifi",
"screenReslolution": "1080x2297",
"supportWebp": "1",
"sysVersion": "14"
}
# Token存储文件路径
TOKEN_FILE = "/ql/data/scripts/jiuxian_tokens.json"

View File

@@ -0,0 +1,947 @@
"""
酒仙app/微信小程序签到脚本V1.1
邀请推广入口咱俩各得1000积分
https://img.meituan.net/portalweb/ba0be8b7b52975047a38682ec3070172251739.jpg
操作步骤:
打开上方链接
截图保存二维码
微信扫码参与活动
点击"立即领取"获得1000积分
请勿在0-1点之间运行
定时规则每天上午9点10分运行
10 9 * * *
脚本特色
· 自动完成每日签到 + 3个浏览任务
· 支持多账号批量运行
· 同时支持账号密码登录和Token登录
· 支持PushPlus微信推送通知
· 平均每日可获得约100金币
配置说明:
方式一:账号密码登录(多用户换行分割)
变量名jiuxian
格式:
手机号#密码
13800138000#123456
13900139000#abcdef
注意如使用账号密码登录请先在App中修改为自定义密码
方式二Token登录抓包微信小程序
变量名JX_TOKENS
获取方式:
抓包域名https://newappuser.jiuxian.com/
在请求参数中查找token值
格式:
token1
token2
token3
推送通知(可选)
变量名PUSHPLUS_TOKEN
在 PushPlus官网 获取Token用于接收运行结果推送
每日任务清单:
· 每日签到 [正常] - 10-70金币连续签到奖励更高
· 浏览任务1 [正常] - 20金币自动完成
· 浏览任务2 [正常] - 20金币自动完成
· 浏览任务3 [正常] - 20金币自动完成
· 分享任务 [待完善] - 100金币需要手动完成
收益估算:
· 基础收益每日约70-120金币
· 连续签到:每周额外奖励
· 月累计约3000金币
积分兑换
兑换内容:
· 多种实物商品
积分规则:
· 有效期:当年积分次年年底失效
· 清空机制:注意及时使用
#####################################################################
本脚本采用三层架构设计请下载以下3个文件并放在同一文件夹中
├── jiuxian_config.py # 配置层 - 管理应用配置、API接口和设备信息
├── jiuxian账密版.py # 业务逻辑层 - 主要的业务逻辑和任务执行流程
└── token_manager.py # 数据持久层 - 负责Token数据的存储和管理
使用步骤:
将三个文件下载到同一文件夹
配置环境变量jiuxian 或 JX_TOKENS
运行主程序task jiuxian账密版.py
####################################################################
-----------------------------------------------------------
免责声明
· 本脚本仅供学习交流使用,不得用于商业用途
· 使用者应对自己的行为负责,脚本作者不承担任何法律责任
· 请合理使用脚本,遵守相关平台规则
· 禁止将脚本用于任何违法违纪行为
· 如遇平台规则变更,请及时停止使用
· 下载或使用即代表同意以上声明
使用建议
· 建议设置合理的执行频率,避免对服务器造成压力
· 妥善保管账号信息,注意账号安全
· 关注平台规则变化,及时调整使用方式
· 如发现异常,请立即停止使用
风险提示
· 使用自动化脚本可能存在账号风险
· 请根据自身情况谨慎使用
· 如不确定是否合规,建议手动操作
------------------------------------------------------------
"""
import os
import json
import time
import random
import requests
from typing import Dict, List, Optional, Tuple
import urllib3
from jiuxian_config import JiuxianConfig
from token_manager import TokenManager
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class Jiuxian:
def __init__(self, username: str = None, password: str = None, token: str = None):
self.username = username
self.password = password
self.token = token
self.uid = None
self.nickname = None
self.task_token = None
self.session = requests.Session()
self.session.verify = False
self.token_manager = TokenManager(JiuxianConfig.TOKEN_FILE)
def get_phone_tail(self, phone: str = None) -> str:
"""获取手机尾号(脱敏处理)"""
if not phone:
phone = self.username or ""
if phone and len(phone) >= 4:
return f"******{phone[-4:]}"
return "****"
def load_saved_token(self) -> bool:
"""加载已保存的Token"""
if not self.username:
return False
token_data = self.token_manager.get_token(self.username)
if token_data and self.token_manager.is_token_valid(self.username):
self.token = token_data.get("token")
self.uid = token_data.get("uid")
self.nickname = token_data.get("nickname")
phone_tail = self.get_phone_tail()
print(f"🔑 加载已保存的Token: {self.nickname} ({phone_tail})")
return True
return False
def save_current_token(self):
"""保存当前Token信息"""
if self.token and self.uid and self.username:
token_data = {
"token": self.token,
"uid": self.uid,
"nickname": self.nickname,
"update_time": int(time.time())
}
self.token_manager.save_token(self.username, token_data)
phone_tail = self.get_phone_tail()
print(f"💾 保存Token信息: {self.nickname} ({phone_tail})")
def login_with_password(self) -> bool:
"""使用账号密码登录"""
try:
if not self.username or not self.password:
print("❌ 缺少账号或密码")
return False
login_data = JiuxianConfig.DEVICE_INFO.copy()
login_data.update({
"appKey": JiuxianConfig.APP_KEY,
"userName": self.username,
"passWord": self.password
})
response = self.session.post(
JiuxianConfig.LOGIN_URL,
data=login_data,
headers=JiuxianConfig.HEADERS,
timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get("success") == "1":
user_info = result["result"]["userInfo"]
self.token = user_info["token"]
self.uid = user_info["uid"]
self.nickname = user_info["nickName"]
# 保存新的Token
self.save_current_token()
phone_tail = self.get_phone_tail()
print(f"✅ 密码登录成功: {self.nickname} ({phone_tail})")
return True
else:
phone_tail = self.get_phone_tail()
print(f"❌ 密码登录失败 ({phone_tail}): {result.get('errMsg', '未知错误')}")
# 登录失败时删除无效Token
if self.username:
self.token_manager.delete_token(self.username)
return False
else:
phone_tail = self.get_phone_tail()
print(f"❌ 登录请求失败 ({phone_tail}): HTTP {response.status_code}")
return False
except Exception as e:
phone_tail = self.get_phone_tail()
print(f"❌ 登录异常 ({phone_tail}): {str(e)}")
return False
def login_with_token(self) -> bool:
"""使用Token登录"""
try:
if not self.token:
print("❌ 未提供Token")
return False
# 直接使用提供的Token验证其有效性
phone_tail = self.get_phone_tail()
print(f"🔑 使用提供的Token登录 ({phone_tail})...")
return self.check_token_valid()
except Exception as e:
phone_tail = self.get_phone_tail()
print(f"❌ Token登录异常 ({phone_tail}): {str(e)}")
return False
def check_token_valid(self) -> bool:
"""检查当前Token是否有效"""
if not self.token:
return False
try:
# 通过获取会员信息来验证Token有效性
member_info = self.get_member_info()
if member_info:
# 如果获取到了会员信息说明Token有效
if not self.nickname and member_info.get('userInfo'):
self.nickname = member_info['userInfo'].get('nickName', '未知用户')
elif not self.nickname:
self.nickname = "Token用户"
phone_tail = self.get_phone_tail()
print(f"✅ Token验证成功: {self.nickname} ({phone_tail})")
return True
return False
except Exception:
return False
def smart_login(self) -> bool:
"""智能登录优先使用Token失败时使用密码登录"""
# 如果有直接提供的Token优先使用
if self.token:
phone_tail = self.get_phone_tail()
print(f"🔄 尝试使用提供的Token登录 ({phone_tail})...")
if self.login_with_token():
return True
else:
print("❌ 提供的Token无效尝试其他登录方式...")
# 1. 尝试加载已保存的Token需要用户名
if self.username and self.load_saved_token():
# 2. 验证Token是否仍然有效
if self.check_token_valid():
phone_tail = self.get_phone_tail()
print(f"✅ Token登录成功: {self.nickname} ({phone_tail})")
return True
else:
phone_tail = self.get_phone_tail()
print(f"🔄 保存的Token已过期 ({phone_tail}),尝试密码登录...")
# Token无效清除并重新登录
self.token_manager.delete_token(self.username)
# 3. 使用密码登录(需要用户名和密码)
if self.username and self.password:
password_login_success = self.login_with_password()
if password_login_success:
# 密码登录成功后立即获取会员信息来设置taskToken
self.get_member_info()
return True
phone_tail = self.get_phone_tail()
print(f"❌ 所有登录方式都失败了 ({phone_tail})")
return False
def get_member_info(self) -> Optional[Dict]:
"""获取会员信息包含任务列表和taskToken"""
if not self.token:
phone_tail = self.get_phone_tail()
print(f"❌ 请先登录 ({phone_tail})")
return None
try:
params = JiuxianConfig.DEVICE_INFO.copy()
params["token"] = self.token
params["appKey"] = JiuxianConfig.APP_KEY
response = self.session.get(
JiuxianConfig.MEMBER_INFO_URL,
params=params,
headers=JiuxianConfig.HEADERS,
timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get("success") == "1":
member_data = result["result"]
# 保存taskToken到实例变量中
task_channel = member_data.get("taskChannel", {})
self.task_token = task_channel.get("taskToken", "")
if self.task_token:
phone_tail = self.get_phone_tail()
print(f"🔑 获取到taskToken ({phone_tail}): {self.task_token}")
else:
phone_tail = self.get_phone_tail()
print(f"⚠️ 未获取到taskToken ({phone_tail})")
return member_data
else:
# Token可能已过期
if result.get("errCode") in ["TOKEN_EXPIRED", "INVALID_TOKEN"]:
phone_tail = self.get_phone_tail()
print(f"❌ Token已过期 ({phone_tail})")
if self.username:
self.token_manager.delete_token(self.username)
return None
else:
phone_tail = self.get_phone_tail()
print(f"❌ 获取会员信息请求失败 ({phone_tail}): HTTP {response.status_code}")
return None
except Exception as e:
phone_tail = self.get_phone_tail()
print(f"❌ 获取会员信息异常 ({phone_tail}): {str(e)}")
return None
def check_in(self) -> Tuple[bool, str]:
"""每日签到"""
try:
if not self.token:
return False, "未登录"
params = JiuxianConfig.DEVICE_INFO.copy()
params["token"] = self.token
params["appKey"] = JiuxianConfig.APP_KEY
response = self.session.get(
JiuxianConfig.SIGN_URL,
params=params,
headers=JiuxianConfig.HEADERS,
timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get("success") == "1":
sign_data = result["result"]
sign_days = sign_data.get("signDays", 0)
received_golds = sign_data.get("receivedGoldNums", 0)
will_get_golds = sign_data.get("willGetGolds", 0)
message = f"签到成功!连续签到{sign_days}天,获得{received_golds}金币"
if will_get_golds > 0:
message += f",明日可获得{will_get_golds}金币"
phone_tail = self.get_phone_tail()
print(f"{message} ({phone_tail})")
return True, message
else:
error_msg = result.get('errMsg', '未知错误')
phone_tail = self.get_phone_tail()
print(f"❌ 签到失败 ({phone_tail}): {error_msg}")
return False, error_msg
else:
error_msg = f"签到请求失败: HTTP {response.status_code}"
phone_tail = self.get_phone_tail()
print(f"{error_msg} ({phone_tail})")
return False, error_msg
except Exception as e:
error_msg = f"签到异常: {str(e)}"
phone_tail = self.get_phone_tail()
print(f"{error_msg} ({phone_tail})")
return False, error_msg
def complete_browse_task(self, task: Dict) -> bool:
"""完成浏览任务"""
try:
if not self.task_token:
phone_tail = self.get_phone_tail()
print(f"❌ 未获取到taskToken ({phone_tail}),无法完成任务")
return False
task_id = task["id"]
task_name = task["taskName"]
task_url = task["url"]
count_down = task.get("countDown", 15)
phone_tail = self.get_phone_tail()
print(f"🔄 开始浏览任务 ({phone_tail}): {task_name}, 需要浏览 {count_down}")
# 设置浏览页面的请求头
browse_headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 14; M2011K2C Build/UKQ1.230804.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/139.0.7258.158 Mobile Safari/537.36 jiuxianApp/9.2.13 from/ANDROID suptwebp/1 netEnv/wifi oadzApp lati/null long/null shopId/ areaId/500",
"Cookie": f"token={self.token}",
"Referer": "https://shop.jiuxian.com/",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
print("📱 访问任务页面开始计时...")
# 1. 访问任务页面开始计时
browse_response = self.session.get(task_url, headers=browse_headers, timeout=30)
if browse_response.status_code != 200:
phone_tail = self.get_phone_tail()
print(f"❌ 任务页面访问失败 ({phone_tail}): HTTP {browse_response.status_code}")
return False
print("✅ 任务页面访问成功,开始计时...")
# 2. 等待浏览时间
wait_time = count_down + 5
print(f"⏰ 等待浏览计时 {wait_time} 秒...")
time.sleep(wait_time)
print("✅ 浏览完成,提交任务完成状态...")
# 3. 提交任务完成状态
complete_success = self.submit_task_completion(task_id, task_url)
if not complete_success:
return False
print("✅ 任务完成状态提交成功")
# 4. 领取金币奖励
print("💰 领取任务奖励...")
return self.receive_reward(task_id, task_name)
except Exception as e:
phone_tail = self.get_phone_tail()
print(f"❌ 浏览任务异常 ({phone_tail}): {str(e)}")
import traceback
print(f"详细错误: {traceback.format_exc()}")
return False
def complete_share_task(self, task: Dict) -> bool:
"""完成分享任务"""
try:
if not self.task_token:
phone_tail = self.get_phone_tail()
print(f"❌ 未获取到taskToken ({phone_tail}),无法完成任务")
return False
task_id = task["id"]
task_name = task["taskName"]
task_url = task["url"]
phone_tail = self.get_phone_tail()
print(f"🔄 开始分享任务 ({phone_tail}): {task_name}")
# 设置请求头
headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 14; M2011K2C Build/UKQ1.230804.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/139.0.7258.158 Mobile Safari/537.36 jiuxianApp/9.2.13 from/ANDROID suptwebp/1 netEnv/wifi oadzApp lati/null long/null shopId/ areaId/500",
"Cookie": f"token={self.token}",
"Referer": "https://shop.jiuxian.com/"
}
print("📱 访问分享页面...")
# 1. 访问分享页面
response = self.session.get(task_url, headers=headers, timeout=30)
if response.status_code != 200:
phone_tail = self.get_phone_tail()
print(f"❌ 分享页面访问失败 ({phone_tail}): HTTP {response.status_code}")
return False
print("✅ 分享页面访问成功")
# 2. 调用分享上报接口
print("📤 上报分享行为...")
share_success = self.report_share(task_url)
if not share_success:
print("❌ 分享上报失败")
return False
print("✅ 分享上报成功")
# 3. 提交任务完成状态
print("✅ 提交任务完成状态...")
complete_success = self.submit_task_completion(task_id, task_url)
if not complete_success:
return False
# 4. 领取金币奖励
print("💰 领取任务奖励...")
return self.receive_reward(task_id, task_name)
except Exception as e:
phone_tail = self.get_phone_tail()
print(f"❌ 分享任务异常 ({phone_tail}): {str(e)}")
return False
def report_share(self, task_url: str) -> bool:
"""上报分享行为(修复编码问题)"""
try:
boundary = "d38dd6cb-be16-4e1c-91ec-44369961499f"
headers = {
"User-Agent": "Dalvik/2.1.0 (Linux; U; Android 14; M2011K2C Build/UKQ1.230804.001)",
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Host": "log.umsns.com"
}
# 使用字典构建表单数据
form_fields = {
"de": "M2011K2C",
"u_sharetype": "native",
"opid": "9",
"sdkv": "7.1.6",
"title": "酒仙网",
"mac": "no mac",
"dt": str(int(time.time() * 1000)),
"uid": "a90fd0967241099b5242c9a2ea2b97efod",
"sn": "",
"pcv": "3.0",
"os": "Android",
"ek": "-1",
"os_version": "14",
"en": "Wi-Fi",
"ak": "51ff1ac356240b6fb20a2156",
"url": task_url,
"ct": "酒等你来,发现一个超级好的活动,赶快买买买!",
"ftype": "0",
"imei": "a7204ced77696f16",
"sns": '{"qq":""}',
"furl": "http://m.jiuxian.com/mobile/android/update/picture/icon_launcher_new.png",
"to": '{"qq":""}',
"android_id": "2185ce8ea28df6ab",
"tp": "1",
"dc": "com.umeng.share"
}
# 自动生成multipart格式使用UTF-8编码
form_data = ""
for name, value in form_fields.items():
form_data += f"""--{boundary}
Content-Disposition: form-data; name="{name}"
Content-Type: text/plain; charset=UTF-8
{value}
"""
form_data += f"--{boundary}--"
# 显式使用UTF-8编码
response = self.session.post(
JiuxianConfig.SHARE_REPORT_URL,
data=form_data.encode('utf-8'),
headers=headers,
timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get("st") == 200:
return True
print(f"❌ 分享上报失败: {response.text}")
return False
except Exception as e:
print(f"❌ 分享上报异常: {str(e)}")
return False
def submit_task_completion(self, task_id: int, task_url: str) -> bool:
"""提交任务完成状态(浏览和分享任务共用)"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 14; M2011K2C Build/UKQ1.230804.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/139.0.7258.158 Mobile Safari/537.36 jiuxianApp/9.2.13 from/ANDROID suptwebp/1 netEnv/wifi oadzApp lati/null long/null shopId/ areaId/500",
"Cookie": f"token={self.token}",
"Referer": task_url,
"X-Requested-With": "XMLHttpRequest",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
}
data = {
"taskId": str(task_id),
"taskToken": self.task_token
}
response = self.session.post(
JiuxianConfig.TASK_COMPLETE_URL,
data=data,
headers=headers,
timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get("code") == 1:
return True
else:
phone_tail = self.get_phone_tail()
print(f"❌ 任务完成提交失败 ({phone_tail}): {result.get('msg', '未知错误')}")
else:
phone_tail = self.get_phone_tail()
print(f"❌ 任务完成提交请求失败 ({phone_tail}): HTTP {response.status_code}")
return False
except Exception as e:
phone_tail = self.get_phone_tail()
print(f"❌ 任务完成提交异常 ({phone_tail}): {str(e)}")
return False
def receive_reward(self, task_id: int, task_name: str) -> bool:
"""领取任务奖励"""
try:
params = JiuxianConfig.DEVICE_INFO.copy()
params["token"] = self.token
params["appKey"] = JiuxianConfig.APP_KEY
params["taskId"] = str(task_id)
response = self.session.get(
JiuxianConfig.RECEIVE_REWARD_URL,
params=params,
headers=JiuxianConfig.HEADERS,
timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get("success") == "1":
reward_data = result["result"]
gold_num = reward_data.get("goldNum", 0)
phone_tail = self.get_phone_tail()
print(f"🎉 任务 '{task_name}' 完成 ({phone_tail}),获得 {gold_num} 金币")
return True
else:
phone_tail = self.get_phone_tail()
print(f"❌ 领取奖励失败 ({phone_tail}): {result.get('errMsg', '未知错误')}")
return False
else:
phone_tail = self.get_phone_tail()
print(f"❌ 领取奖励请求失败 ({phone_tail}): HTTP {response.status_code}")
return False
except Exception as e:
phone_tail = self.get_phone_tail()
print(f"❌ 领取奖励异常 ({phone_tail}): {str(e)}")
return False
def run_all_tasks(self) -> Dict:
"""执行所有任务"""
result = {
"username": self.username,
"phone_tail": self.get_phone_tail(),
"nickname": self.nickname,
"login_success": False,
"login_type": "unknown",
"check_in": {"success": False, "message": ""},
"tasks": [],
"member_info": {},
"today_gold": 0, # 今日获得金币
"total_gold": 0 # 总金币数
}
# 智能登录
login_success = self.smart_login()
if login_success:
result["login_success"] = True
result["nickname"] = self.nickname
if self.token and not self.username:
result["login_type"] = "direct_token"
else:
result["login_type"] = "token" if hasattr(self, 'token') and self.token else "password"
else:
result["login_success"] = False
return result
# 获取会员信息(只获取一次!)
member_info = self.get_member_info()
if not member_info:
return result
result["member_info"] = {
"gold_money": member_info.get("goldMoney", 0),
"is_sign_today": member_info.get("isSignTody", False),
"sign_days": member_info.get("signDays", 0),
"user_rank": member_info.get("userRank", "")
}
result["total_gold"] = member_info.get("goldMoney", 0)
# 确保taskToken已正确设置
if not self.task_token:
phone_tail = self.get_phone_tail()
print(f"❌ 未获取到taskToken ({phone_tail}),无法执行任务")
return result
print(f"🔑 使用taskToken: {self.task_token}")
# 处理签到(只有在未签到时才执行)
if not member_info.get("isSignTody"):
print("📅 执行签到...")
check_in_success, check_in_msg = self.check_in()
result["check_in"] = {"success": check_in_success, "message": check_in_msg}
# 如果签到成功,从消息中提取金币数
if check_in_success and "获得" in check_in_msg:
try:
gold_str = check_in_msg.split("获得")[1].split("金币")[0]
result["today_gold"] += int(gold_str)
except:
pass
time.sleep(random.randint(2, 4))
else:
result["check_in"] = {"success": True, "message": "今日已签到"}
print("📅 今日已签到,跳过签到")
# 处理任务
task_channel = member_info.get("taskChannel", {})
task_list = task_channel.get("taskList", [])
for task in task_list:
task_result = {
"id": task["id"],
"name": task["taskName"],
"type": task["taskType"],
"state": task["state"],
"gold_num": task.get("goldNum", 0),
"completed": False
}
# state: 0-未完成, 1-已完成未领取, 2-已完成已领取
if task["state"] == 0: # 未完成的任务
if task["taskType"] == 1: # 浏览任务
task_result["completed"] = self.complete_browse_task(task)
elif task["taskType"] == 2: # 分享任务
task_result["completed"] = self.complete_share_task(task)
# 如果任务完成,累加金币
if task_result["completed"]:
result["today_gold"] += task_result["gold_num"]
result["tasks"].append(task_result)
# 任务间短暂间隔
time.sleep(random.randint(2, 4))
return result
def send_pushplus_notification(token: str, title: str, content: str) -> bool:
"""发送PushPlus推送通知"""
try:
if not token:
print("❌ PushPlus Token未设置跳过推送")
return False
url = "https://www.pushplus.plus/send"
data = {
"token": token,
"title": title,
"content": content,
"template": "markdown"
}
response = requests.post(url, json=data, timeout=30)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
print("✅ PushPlus推送发送成功")
return True
else:
print(f"❌ PushPlus推送失败: {result.get('msg', '未知错误')}")
return False
else:
print(f"❌ PushPlus推送请求失败: HTTP {response.status_code}")
return False
except Exception as e:
print(f"❌ PushPlus推送异常: {str(e)}")
return False
def generate_markdown_report(all_results: List[Dict]) -> str:
"""生成Markdown格式的报告"""
# 统计信息
total_users = len(all_results)
success_login_count = sum(1 for r in all_results if r["login_success"])
success_checkin_count = sum(1 for r in all_results if r.get("check_in", {}).get("success", False))
total_today_gold = sum(r.get("today_gold", 0) for r in all_results)
total_gold = sum(r.get("total_gold", 0) for r in all_results)
# 构建Markdown内容
content = f"""# 🍷 酒仙网任务执行报告
## 📊 统计概览
| 项目 | 数值 |
|------|------|
| 👥 用户总数 | {total_users} |
| ✅ 登录成功 | {success_login_count} |
| 📅 签到成功 | {success_checkin_count} |
| 🎯 今日获得金币 | {total_today_gold} |
| 💰 总金币数 | {total_gold} |
## 👤 用户详情
| 手机尾号 | 签到状态 | 任务状态 | 今日金币 | 总金币 |
|----------|----------|----------|----------|--------|
"""
# 添加每个用户的详情
for result in all_results:
phone_tail = result.get("phone_tail", "****")
nickname = result.get("nickname", "未知用户")
# 签到状态
check_in = result.get("check_in", {})
if check_in.get("success"):
sign_status = "✅ 成功"
else:
sign_status = "❌ 失败"
# 任务状态
tasks = result.get("tasks", [])
completed_tasks = sum(1 for t in tasks if t.get("completed", False))
total_tasks = len(tasks)
task_status = f"{completed_tasks}/{total_tasks}"
# 金币信息
today_gold = result.get("today_gold", 0)
total_gold_user = result.get("total_gold", 0)
content += f"| {phone_tail} ({nickname}) | {sign_status} | {task_status} | {today_gold} | {total_gold_user} |\n"
# 添加执行时间
exec_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
content += f"\n---\n**🕐 执行时间**: {exec_time}\n"
return content
def main():
"""主函数"""
# 获取环境变量
accounts_str = os.getenv("jiuxian", "")
tokens_str = os.getenv("JX_TOKENS", "")
pushplus_token = os.getenv("PUSHPLUS_TOKEN", "")
if not accounts_str and not tokens_str:
print("❌ 未找到账号配置,请检查环境变量 jiuxian 或 JX_TOKENS")
return
all_accounts = []
# 解析账号密码
if accounts_str:
for line in accounts_str.strip().split('\n'):
if '#' in line:
username, password = line.split('#', 1)
all_accounts.append(("account", username.strip(), password.strip()))
# 解析Token
if tokens_str:
for line in tokens_str.strip().split('\n'):
token = line.strip()
if token:
all_accounts.append(("token", None, token))
if not all_accounts:
print("❌ 未找到有效的账号配置")
return
print(f"🔍 找到 {len(all_accounts)} 个账号配置,开始执行任务...")
all_results = []
# 遍历所有账号执行任务
for i, (account_type, username, credential) in enumerate(all_accounts, 1):
print(f"\n{'='*50}")
phone_tail = "****" if not username else f"******{username[-4:]}" if len(username) >= 4 else "****"
print(f"🔄 开始处理账号 {i}: {phone_tail}")
if account_type == "account":
jiuxian = Jiuxian(username=username, password=credential)
else:
jiuxian = Jiuxian(token=credential)
result = jiuxian.run_all_tasks()
all_results.append(result)
print(f"✅ 账号 {i} 处理完成")
time.sleep(random.randint(3, 5)) # 账号间间隔
# 生成简单报告
print("\n" + "="*50)
print("📋 任务执行完成报告:")
success_count = sum(1 for r in all_results if r["login_success"])
print(f"✅ 成功执行: {success_count}/{len(all_accounts)} 个账号")
for i, result in enumerate(all_results, 1):
if result["login_success"]:
completed_tasks = sum(1 for t in result["tasks"] if t["completed"])
total_tasks = len(result["tasks"])
login_type = result.get('login_type', 'unknown')
phone_tail = result.get('phone_tail', '****')
print(f"账号 {i}: {result['nickname']} ({phone_tail}) - 完成任务: {completed_tasks}/{total_tasks}")
# 发送PushPlus推送
if pushplus_token:
print("\n📤 正在发送PushPlus推送通知...")
markdown_content = generate_markdown_report(all_results)
title = f"🍷 酒仙网任务报告 - {success_count}/{len(all_accounts)}成功"
send_pushplus_notification(pushplus_token, title, markdown_content)
else:
print("\n⚠️ 未设置PUSHPLUS_TOKEN环境变量跳过推送")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,51 @@
import json
import os
from typing import Dict, Optional
class TokenManager:
def __init__(self, token_file: str):
self.token_file = token_file
self.tokens = self._load_tokens()
def _load_tokens(self) -> Dict:
"""从文件加载Token数据"""
try:
if os.path.exists(self.token_file):
with open(self.token_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"❌ 加载Token文件失败: {e}")
return {}
def _save_tokens(self):
"""保存Token数据到文件"""
try:
os.makedirs(os.path.dirname(self.token_file), exist_ok=True)
with open(self.token_file, 'w', encoding='utf-8') as f:
json.dump(self.tokens, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"❌ 保存Token文件失败: {e}")
def get_token(self, username: str) -> Optional[Dict]:
"""获取指定账号的Token信息"""
return self.tokens.get(username)
def save_token(self, username: str, token_data: Dict):
"""保存账号的Token信息"""
self.tokens[username] = {
"token": token_data.get("token"),
"uid": token_data.get("uid"),
"nickname": token_data.get("nickname"),
"update_time": token_data.get("update_time")
}
self._save_tokens()
def delete_token(self, username: str):
"""删除账号的Token信息"""
if username in self.tokens:
del self.tokens[username]
self._save_tokens()
def is_token_valid(self, username: str) -> bool:
"""检查Token是否有效"""
return username in self.tokens and self.tokens[username].get("token")

146
酒仙账密版/说明.txt Normal file
View File

@@ -0,0 +1,146 @@
邀请推广入口咱俩各得1000积分
https://img.meituan.net/portalweb/ba0be8b7b52975047a38682ec3070172251739.jpg
操作步骤:
打开上方链接
截图保存二维码
微信扫码参与活动
点击“立即领取“获得1000积分
请勿在0-1点之间运行
定时规则每天上午9点10分运行
10 9 * * *
脚本特色
· 自动完成每日签到 + 3个浏览任务
· 支持多账号批量运行
· 同时支持账号密码登录和Token登录
· 支持PushPlus微信推送通知
· 平均每日可获得约100金币
配置说明:
方式一:账号密码登录(多用户换行分割)
变量名jiuxian
格式:
手机号#密码
13800138000#123456
13900139000#abcdef
注意如使用账号密码登录请先在App或微信小程序中修改为自定义密码
方式二Token登录抓包微信小程序之前抓包的可以直接用变量名也不用改
变量名JX_TOKENS
获取方式:
抓包域名https://newappuser.jiuxian.com/
在请求参数中查找token值
格式:
token1
token2
token3
推送通知(可选)
变量名PUSHPLUS_TOKEN
在 PushPlus官网 获取Token用于接收运行结果推送
通知效果展示测试2个抓包3个账密登录的都正常。任务显示0是因为今天测试太多次都做完了
[img]https://img.meituan.net/portalweb/90e6ae964d32a53ba5e50cf43d65ba3b241586.png[/img]
[img]https://img.meituan.net/portalweb/631393743b31b47397308a6d4a7dbe6339174.png[/img]
每日任务清单:
· 每日签到 [正常] - 10-70金币连续签到奖励更高
· 浏览任务1 [正常] - 20金币自动完成
· 浏览任务2 [正常] - 20金币自动完成
· 浏览任务3 [正常] - 20金币自动完成
· 分享任务 [待完善] - 100金币需要手动完成运行分享任务时会报错无视即可
收益估算:
· 基础收益每日约70-120金币
· 连续签到:每周额外奖励
· 月累计约3000金币
积分兑换
兑换内容:
· 多种实物商品
[img]https://img.meituan.net/portalweb/6f739481b30ec3979b37bc172210d3ad883968.jpg[/img]
积分规则:
· 有效期:当年积分次年年底失效
· 清空机制:注意及时使用
#####################################################################
本脚本采用三层架构设计请下载以下3个文件并放在同一文件夹中
├── jiuxian_config.py # 配置层 - 管理应用配置、API接口和设备信息
├── jiuxian账密版.py # 业务逻辑层 - 主要的业务逻辑和任务执行流程
└── token_manager.py # 数据持久层 - 负责Token数据的存储和管理
使用步骤:
将三个文件下载到同一文件夹
配置环境变量jiuxian 或 JX_TOKENS
运行主程序task jiuxian账密版.py
####################################################################
-----------------------------------------------------------
免责声明
· 本脚本仅供学习交流使用,不得用于商业用途
· 使用者应对自己的行为负责,脚本作者不承担任何法律责任
· 请合理使用脚本,遵守相关平台规则
· 禁止将脚本用于任何违法违纪行为
· 如遇平台规则变更,请及时停止使用
· 下载或使用即代表同意以上声明
使用建议
· 建议设置合理的执行频率,避免对服务器造成压力
· 妥善保管账号信息,注意账号安全
· 关注平台规则变化,及时调整使用方式
· 如发现异常,请立即停止使用
风险提示
· 使用自动化脚本可能存在账号风险
· 请根据自身情况谨慎使用
· 如不确定是否合规,建议手动操作
------------------------------------------------------------