Update ikuuu2.py

This commit is contained in:
blusunny
2025-09-11 21:40:55 +08:00
committed by GitHub
parent 455e1cf9b9
commit f0b0dd7ea7

204
ikuuu2.py
View File

@@ -1,124 +1,130 @@
#!/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# @Author : github@wd210010 https://github.com/wd210010/just_for_happy
# @Time : 2025/7/22 13:23
# -------------------------------
# cron "30 5 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('IKuuu机场签到帐号版')
# 进入青龙容器
# docker exec -it qinglong bash
# apk update
# apk add chromium chromium-chromedriver
import os import os
import re
import random
import requests import requests
import json import json
from bs4 import BeautifulSoup
import undetected_chromedriver as uc # 使用 uc 避免手动管理 chromedriver
def extract_and_select_url(): # 配置信息 - 从环境变量获取,避免硬编码敏感信息
"""提取 ikuuu 的可用域名""" WXPUSHER_TOKEN = os.getenv("WXPUSHER_TOKEN", "")
options = uc.ChromeOptions() WXPUSHER_UID = os.getenv("WXPUSHER_UID", "")
options.add_argument("--headless") # 无界面模式 IKUUU_ACCOUNTS = os.getenv("IKUUU_ACCOUNTS", "").split("#") # 格式: 账号1&密码1#账号2&密码2
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("--disable-software-rasterizer")
# 关键:指定 chromium 的路径Alpine 容器安装后在这里) # 全局变量用于收集日志
options.binary_location = "/usr/bin/chromium-browser" log_messages = []
# 如果你安装了 chromium-chromedriver可以指定 driver_executable_path def log(message):
driver = uc.Chrome(options=options, driver_executable_path="/usr/bin/chromedriver") """记录日志并打印"""
print(message)
log_messages.append(message)
driver.get("http://ikuuu.club") def send_wxpusher(msg):
soup = BeautifulSoup(driver.page_source, 'html.parser') """发送消息到WXPusher"""
driver.quit() if not WXPUSHER_TOKEN or not WXPUSHER_UID:
log("WXPusher配置不完整跳过推送")
return
text_content = soup.get_text() url = "https://wxpusher.zjiecode.com/api/send/message"
urls = re.findall(r'ikuuu\.[a-zA-Z0-9]+\b', text_content) headers = {"Content-Type": "application/json"}
data = {
"appToken": WXPUSHER_TOKEN,
"content": msg,
"contentType": 1, # 文本类型
"uids": [WXPUSHER_UID]
}
if urls: try:
unique_urls = list(set(urls)) resp = requests.post(url, json=data, headers=headers, timeout=10)
selected_url = random.choice(unique_urls) resp.raise_for_status() # 触发HTTP错误状态码的异常
return selected_url result = resp.json()
if result.get("code") == 1000:
log("WXPusher推送成功")
else: else:
return "未找到网址" log(f"WXPusher推送失败: {result.get('msg', '未知错误')}")
except requests.exceptions.RequestException as e:
log(f"WXPusher请求异常: {str(e)}")
except json.JSONDecodeError:
log("WXPusher返回数据格式错误")
# 从环境变量读取账号信息 def login_and_checkin(account_info):
ACCOUNT_STR = os.getenv('ikuuu', '') # 格式账号1&密码1#账号2&密码2
# 解析账号信息
ACCOUNTS = []
if ACCOUNT_STR:
account_pairs = ACCOUNT_STR.split('#')
for pair in account_pairs:
if '&' in pair:
email, password = pair.split('&', 1) # 只分割第一个&符号
ACCOUNTS.append({"email": email, "password": password})
if not ACCOUNTS:
print("未检测到账号信息,请设置环境变量 ikuuu")
print('格式示例export ikuuu="账号1&密码1#账号2&密码2"')
exit()
DOMAIN = extract_and_select_url()
# 更新 URLs
LOGIN_URL = f"https://{DOMAIN}/auth/login"
CHECKIN_URL = f"https://{DOMAIN}/user/checkin"
COMMON_HEADERS = {
"User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Mobile Safari/537.36",
"X-Requested-With": "XMLHttpRequest"
}
def login_and_checkin(account):
"""处理单个账号的登录和签到""" """处理单个账号的登录和签到"""
session = requests.Session() try:
email, password = account_info.split("&", 1)
except ValueError:
return "账号格式错误,应为'邮箱&密码'\n"
# 登录请求 # 这里是示例域名,实际使用时可能需要动态检测可用域名
base_url = "https://ikuuu.de"
login_url = f"{base_url}/auth/login"
checkin_url = f"{base_url}/user/checkin"
# 登录
try:
session = requests.Session()
login_data = { login_data = {
"host": DOMAIN.replace('https://', ''), "email": email,
"email": account["email"], "passwd": password,
"passwd": account["password"],
"code": "" "code": ""
} }
print(f"正在处理账号: {account['email']},使用域名: {DOMAIN}") login_resp = session.post(
try: login_url,
login_response = session.post(LOGIN_URL, data=login_data, headers=COMMON_HEADERS) data=login_data,
print(json.loads(login_response.text)['msg']) headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124"},
if login_response.status_code != 200: timeout=10
print(f"登录失败! 状态码: {login_response.status_code}") )
return False
# 签到请求 login_result = login_resp.json()
checkin_response = session.post(CHECKIN_URL, headers=COMMON_HEADERS) if login_result.get("ret") != 1:
return f"账号 {email} 登录失败: {login_result.get('msg', '未知错误')}\n"
if checkin_response.status_code == 200: # 签到
response_data = json.loads(checkin_response.text) checkin_resp = session.post(
print(f"签到结果: {response_data.get('msg', '无返回消息')}") checkin_url,
return True headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124"},
else: timeout=10
print(f"签到失败! 状态码: {checkin_response.status_code}") )
return False
checkin_result = checkin_resp.json()
return f"账号 {email} 签到结果: {checkin_result.get('msg', '未知结果')}\n"
except Exception as e: except Exception as e:
print(f"处理账号 {account['email']} 出错: {str(e)}") return f"账号 {email} 处理出错: {str(e)}\n"
return False
def notify(title, message):
"""模拟原有通知函数,实际使用时替换为真实实现"""
# 这里可以是其他通知方式的实现如PushPlus等
log(f"发送通知: {title}\n{message}")
def main():
log("===== IKUUU机场签到开始 =====")
# 检查账号配置
if not IKUUU_ACCOUNTS or IKUUU_ACCOUNTS == [""]:
log("未配置任何账号,退出程序")
return
log(f"检测到 {len(IKUUU_ACCOUNTS)} 个账号,开始签到...")
# 处理所有账号
sendmsg = ""
for account in IKUUU_ACCOUNTS:
if account.strip(): # 跳过空账号
sendmsg += login_and_checkin(account)
# 准备推送内容
table = f"IKUUU机场签到汇总\n{sendmsg}"
# 发送推送
notify("IKUUU机场签到结果", sendmsg)
send_wxpusher(table)
log("\n===== 所有账号处理完成 =====")
if __name__ == "__main__": if __name__ == "__main__":
print(f"检测到 {len(ACCOUNTS)} 个账号,开始签到...") try:
main()
success_count = 0 except Exception as e:
for account in ACCOUNTS: log(f"程序运行出错: {str(e)}")
if login_and_checkin(account): # 出错时也尝试推送错误信息
success_count += 1 if WXPUSHER_TOKEN and WXPUSHER_UID:
send_wxpusher(f"IKUUU签到脚本出错: {str(e)}")
print(f"\n签到完成! 成功处理 {success_count}/{len(ACCOUNTS)} 个账号")