diff --git a/ikuuu2.py b/ikuuu2.py index 6c67a05..86e7499 100644 --- a/ikuuu2.py +++ b/ikuuu2.py @@ -1,124 +1,130 @@ -#!/usr/bin/python3 -# -- coding: utf-8 -- -# ------------------------------- -# @Author : github@wd210010 https://github.com/wd210010/just_for_happy -# @Time : 2025/7/22 13:23 -# ------------------------------- -# cron "30 5 * * *" script-path=xxx.py,tag=匹配cron用 -# const $ = new Env('IKuuu机场签到帐号版') -# 进入青龙容器 -# docker exec -it qinglong bash -# apk update -# apk add chromium chromium-chromedriver - import os -import re -import random import requests import json -from bs4 import BeautifulSoup -import undetected_chromedriver as uc # 使用 uc 避免手动管理 chromedriver -def extract_and_select_url(): - """提取 ikuuu 的可用域名""" - options = uc.ChromeOptions() - options.add_argument("--headless") # 无界面模式 - options.add_argument("--no-sandbox") - options.add_argument("--disable-dev-shm-usage") - options.add_argument("--disable-gpu") - options.add_argument("--disable-software-rasterizer") +# 配置信息 - 从环境变量获取,避免硬编码敏感信息 +WXPUSHER_TOKEN = os.getenv("WXPUSHER_TOKEN", "") +WXPUSHER_UID = os.getenv("WXPUSHER_UID", "") +IKUUU_ACCOUNTS = os.getenv("IKUUU_ACCOUNTS", "").split("#") # 格式: 账号1&密码1#账号2&密码2 - # 关键:指定 chromium 的路径(Alpine 容器安装后在这里) - options.binary_location = "/usr/bin/chromium-browser" +# 全局变量用于收集日志 +log_messages = [] - # 如果你安装了 chromium-chromedriver,可以指定 driver_executable_path - driver = uc.Chrome(options=options, driver_executable_path="/usr/bin/chromedriver") +def log(message): + """记录日志并打印""" + print(message) + log_messages.append(message) - driver.get("http://ikuuu.club") - soup = BeautifulSoup(driver.page_source, 'html.parser') - driver.quit() - - text_content = soup.get_text() - urls = re.findall(r'ikuuu\.[a-zA-Z0-9]+\b', text_content) - - if urls: - unique_urls = list(set(urls)) - selected_url = random.choice(unique_urls) - return selected_url - else: - return "未找到网址" - - -# 从环境变量读取账号信息 -ACCOUNT_STR = os.getenv('ikuuu', '') # 格式:账号1&密码1#账号2&密码2 - -# 解析账号信息 -ACCOUNTS = [] -if ACCOUNT_STR: - account_pairs = ACCOUNT_STR.split('#') - for pair in account_pairs: - if '&' in pair: - email, password = pair.split('&', 1) # 只分割第一个&符号 - ACCOUNTS.append({"email": email, "password": password}) - -if not ACCOUNTS: - print("未检测到账号信息,请设置环境变量 ikuuu") - print('格式示例:export ikuuu="账号1&密码1#账号2&密码2"') - exit() - -DOMAIN = extract_and_select_url() - -# 更新 URLs -LOGIN_URL = f"https://{DOMAIN}/auth/login" -CHECKIN_URL = f"https://{DOMAIN}/user/checkin" - -COMMON_HEADERS = { - "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Mobile Safari/537.36", - "X-Requested-With": "XMLHttpRequest" -} - -def login_and_checkin(account): - """处理单个账号的登录和签到""" - session = requests.Session() - - # 登录请求 - login_data = { - "host": DOMAIN.replace('https://', ''), - "email": account["email"], - "passwd": account["password"], - "code": "" +def send_wxpusher(msg): + """发送消息到WXPusher""" + if not WXPUSHER_TOKEN or not WXPUSHER_UID: + log("WXPusher配置不完整,跳过推送") + return + + url = "https://wxpusher.zjiecode.com/api/send/message" + headers = {"Content-Type": "application/json"} + data = { + "appToken": WXPUSHER_TOKEN, + "content": msg, + "contentType": 1, # 文本类型 + "uids": [WXPUSHER_UID] } - - print(f"正在处理账号: {account['email']},使用域名: {DOMAIN}") + try: - login_response = session.post(LOGIN_URL, data=login_data, headers=COMMON_HEADERS) - print(json.loads(login_response.text)['msg']) - if login_response.status_code != 200: - print(f"登录失败! 状态码: {login_response.status_code}") - return False - - # 签到请求 - checkin_response = session.post(CHECKIN_URL, headers=COMMON_HEADERS) - - if checkin_response.status_code == 200: - response_data = json.loads(checkin_response.text) - print(f"签到结果: {response_data.get('msg', '无返回消息')}") - return True + resp = requests.post(url, json=data, headers=headers, timeout=10) + resp.raise_for_status() # 触发HTTP错误状态码的异常 + result = resp.json() + + if result.get("code") == 1000: + log("WXPusher推送成功") else: - print(f"签到失败! 状态码: {checkin_response.status_code}") - return False + log(f"WXPusher推送失败: {result.get('msg', '未知错误')}") + + except requests.exceptions.RequestException as e: + log(f"WXPusher请求异常: {str(e)}") + except json.JSONDecodeError: + log("WXPusher返回数据格式错误") +def login_and_checkin(account_info): + """处理单个账号的登录和签到""" + try: + email, password = account_info.split("&", 1) + except ValueError: + return "账号格式错误,应为'邮箱&密码'\n" + + # 这里是示例域名,实际使用时可能需要动态检测可用域名 + base_url = "https://ikuuu.de" + login_url = f"{base_url}/auth/login" + checkin_url = f"{base_url}/user/checkin" + + # 登录 + try: + session = requests.Session() + login_data = { + "email": email, + "passwd": password, + "code": "" + } + + login_resp = session.post( + login_url, + data=login_data, + headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124"}, + timeout=10 + ) + + login_result = login_resp.json() + if login_result.get("ret") != 1: + return f"账号 {email} 登录失败: {login_result.get('msg', '未知错误')}\n" + + # 签到 + checkin_resp = session.post( + checkin_url, + headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124"}, + timeout=10 + ) + + checkin_result = checkin_resp.json() + return f"账号 {email} 签到结果: {checkin_result.get('msg', '未知结果')}\n" + except Exception as e: - print(f"处理账号 {account['email']} 时出错: {str(e)}") - return False + return f"账号 {email} 处理出错: {str(e)}\n" +def notify(title, message): + """模拟原有通知函数,实际使用时替换为真实实现""" + # 这里可以是其他通知方式的实现,如PushPlus等 + log(f"发送通知: {title}\n{message}") + +def main(): + log("===== IKUUU机场签到开始 =====") + + # 检查账号配置 + if not IKUUU_ACCOUNTS or IKUUU_ACCOUNTS == [""]: + log("未配置任何账号,退出程序") + return + + log(f"检测到 {len(IKUUU_ACCOUNTS)} 个账号,开始签到...") + + # 处理所有账号 + sendmsg = "" + for account in IKUUU_ACCOUNTS: + if account.strip(): # 跳过空账号 + sendmsg += login_and_checkin(account) + + # 准备推送内容 + table = f"IKUUU机场签到汇总\n{sendmsg}" + + # 发送推送 + notify("IKUUU机场签到结果", sendmsg) + send_wxpusher(table) + + log("\n===== 所有账号处理完成 =====") if __name__ == "__main__": - print(f"检测到 {len(ACCOUNTS)} 个账号,开始签到...") - - success_count = 0 - for account in ACCOUNTS: - if login_and_checkin(account): - success_count += 1 - - print(f"\n签到完成! 成功处理 {success_count}/{len(ACCOUNTS)} 个账号") + try: + main() + except Exception as e: + log(f"程序运行出错: {str(e)}") + # 出错时也尝试推送错误信息 + if WXPUSHER_TOKEN and WXPUSHER_UID: + send_wxpusher(f"IKUUU签到脚本出错: {str(e)}")