diff --git a/农夫山泉有点甜/nf.py b/农夫山泉有点甜/nf.py new file mode 100644 index 0000000..cea5c23 --- /dev/null +++ b/农夫山泉有点甜/nf.py @@ -0,0 +1,228 @@ +import requests +import time +import re +import os + +# 用于存储日志信息的集合 +log_messages = [] + +# 日志记录函数 +def log_message(message): + log_messages.append(message) + print(message) # 可选:同时打印日志 + +# 统一的请求头(不包括Host和Content-Type) +base_headers = { + 'Xweb_Xhr': '1', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.6605.146 Safari/537.36', + 'Accept': '*/*', + 'Sec-Fetch-Site': 'cross-site', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Dest': 'empty', + 'Referer': 'https://servicewechat.com/wxd79ec05386a78727/88/page-frame.html', + 'Accept-Language': 'zh-CN,zh;q=0.9' +} + +# 登录函数 +def login(user, apitoken): + url = 'https://gateway.jmhd8.com/geement.usercenter/api/v1/user/information' + headers = base_headers.copy() + headers['Host'] = 'gateway.jmhd8.com' + headers['Apitoken'] = apitoken # 使用对应的Apitoken + params = { + 'levelprocessinfo': 'false', + 'gpslocationinfo': 'false', + 'popularizeinfo': 'false', + 'disablequery_extra_field': 'true', + 'disablequery_location': 'true', + 'disablequery_memberinfo': 'true', + 'disablequery_customfield': 'true', + 'disablequery_levelinfo': 'true', + 'disablequery_perfectinfo_status': 'true', + 'disablequery_extrainformation': 'true' + } + response = requests.get(url, headers=headers, params=params) + if response.status_code == 200: + data = response.json() + if 'code' in data: + if data['code'] == 200: + if 'data' in data and 'user_id' in data['data']: + user_id = data['data']['user_id'] + log_message(f"登录成功") + return True # 登录成功,继续执行 + elif data['code'] == 500: + log_message(f"Token失效,跳过该账户执行") + return False # Token失效,跳过该账户 + else: + log_message(f"{user}登录失败,返回数据没有'code'字段") + return False + else: + log_message(f"{user}请求失败,状态码: {response.status_code}") + return False + +# 骰子游戏 +def dice_game(user, apitoken): + url = 'https://thirtypro.jmhd8.com/api/v1/nongfuwater/snake/checkerboard/lottery' + headers = base_headers.copy() + headers['Host'] = 'thirtypro.jmhd8.com' + headers['Apitoken'] = apitoken # 使用对应的Apitoken + data = { + "code": "SCENE-24121018362724", + **location_data # 引用共享位置数据 + } + while True: + response = requests.post(url, headers=headers, json=data) + if response.status_code == 200: + response_data = response.json() + if response_data['code'] == 200: + log_message("继续游戏!") + elif response_data['code'] == 500: + log_message("当天的游戏次数已用尽,继续执行任务列表") + break + else: + log_message(f"请求失败,状态码: {response.status_code}") + break + +# 获取任务列表 +def get_task_list(user, apitoken): + url = 'https://gateway.jmhd8.com/geement.marketingplay/api/v1/task' + headers = base_headers.copy() + headers['Host'] = 'gateway.jmhd8.com' + headers['Apitoken'] = apitoken # 使用对应的Apitoken + params = { + 'pageNum': '1', + 'pageSize': '10', + 'task_status': '2', + 'status': '1', + 'group_id': '24121016331837' + } + response = requests.get(url, headers=headers, params=params) + if response.status_code == 200: + data = response.json() + if 'data' in data: + tasks = data['data'] + task_ids = [] + for task in tasks: + task_name = task.get('name', '未知任务') + task_id = task.get('id', '无任务ID') + log_message(f"任务名称: {task_name}, 任务ID: {task_id}") + task_ids.append(task_id) + return task_ids + else: + log_message("获取任务列表失败,返回数据中没有'data'字段") + else: + log_message(f"请求失败,状态码: {response.status_code}") + return [] + +# 执行任务 +def execute_task(user, apitoken, task_ids): + url = 'https://gateway.jmhd8.com/geement.marketingplay/api/v1/task/join' + headers = base_headers.copy() + headers['Host'] = 'gateway.jmhd8.com' + headers['Content-Type'] = 'application/x-www-form-urlencoded' + headers['Apitoken'] = apitoken # 使用对应的Apitoken + for task_id in task_ids: + params = { + 'task_id': task_id + } + while True: + response = requests.get(url, headers=headers, params=params) + if response.status_code == 200: + response_data = response.json() + if response_data['code'] == 500: + log_message(f"任务 {task_id} 已完成,跳到下一个任务") + break # 任务完成,跳出当前循环,继续下一个任务 + else: + log_message(f"任务 {task_id} 执行成功,继续执行") + time.sleep(2) # 延迟2秒后继续执行任务 + else: + log_message(f"请求失败,状态码: {response.status_code}") + break # 请求失败,跳出循环,尝试下一个任务 + +# 抽奖 +def lottery(user, apitoken): + url = 'https://gateway.jmhd8.com/geement.marketinglottery/api/v1/marketinglottery' + headers = base_headers.copy() + headers['Host'] = 'gateway.jmhd8.com' + headers['Content-Type'] = 'application/json' + headers['Apitoken'] = apitoken # 使用对应的Apitoken + codes = ["SCENE-24121018345681", "SCENE-24121018352070"] # 两个抽奖场景 + prizes = [] + for code in codes: + data = { + "code": code, + **location_data # 引用共享位置数据 + } + while True: + response = requests.post(url, headers=headers, json=data) + if response.status_code == 200: + response_data = response.json() + if response_data['code'] == 200: + prize_name = response_data['data']['prizedto']['prize_name'] + log_message(f"恭喜你获得了: {prize_name}") + prizes.append(prize_name) + time.sleep(2) # 延迟2秒后继续抽奖 + elif response_data['code'] == 500: + log_message(f"没有抽奖机会了,跳过代码 {code}") + break # 没有机会了,跳过当前的code,继续下一个code + else: + log_message(f"抽奖请求失败,状态码: {response.status_code}") + break + return prizes + +# 抓包apitoken +# 从环境变量获取账号和apitoken信息 +nongfu_str = os.environ.get('nongfu') +if nongfu_str: + users = {} + for pair in nongfu_str.split('#'): + user, apitoken = pair.split('&') + users[user] = apitoken +else: + print("请设置环境变量nongfu,格式为:user1&apitoken1#user2&apitoken2") + exit(1) + +# 抓包地理位置数据 +location_data = { + "provice_name": "湖南省", + "city_name": "长沙市", + "area_name": "**区", + "address": "**东站", + "longitude": 781.326543, + "dimension": 12.707772 +} + +# 主函数 +def main(): + results = [] + for user, apitoken in users.items(): + result = f"账户 {user}:" + log_message(f"\n开始执行账户 {user}...") + if login(user, apitoken): + dice_game(user, apitoken) # 进行骰子游戏 + task_ids = get_task_list(user, apitoken) # 获取任务列表 + if task_ids: + execute_task(user, apitoken, task_ids) # 执行任务 + prizes = lottery(user, apitoken) # 执行抽奖 + if prizes: + result += "抽奖结果:\n" + for prize in prizes: + result += f"{prize}\n" + else: + result += "无抽奖结果" + else: + result += "已失效" + results.append(result) + # 通知结果 + notify_results('\n'.join(results)) + +# 通知函数 +def notify_results(message): + if os.path.isfile('notify.py'): + from notify import send + send('农夫山泉任务运行结果', message) + else: + print("通知模块加载失败,无法发送通知。") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/农夫山泉有点甜/notify.py b/农夫山泉有点甜/notify.py new file mode 100644 index 0000000..d54a711 --- /dev/null +++ b/农夫山泉有点甜/notify.py @@ -0,0 +1,980 @@ +#!/usr/bin/env python3 +# _*_ coding:utf-8 _*_ +import base64 +import hashlib +import hmac +import json +import os +import re +import threading +import time +import urllib.parse +import smtplib +from email.mime.text import MIMEText +from email.header import Header +from email.utils import formataddr + +import requests + +# 原先的 print 函数和主线程的锁 +_print = print +mutex = threading.Lock() + + +# 定义新的 print 函数 +def print(text, *args, **kw): + """ + 使输出有序进行,不出现多线程同一时间输出导致错乱的问题。 + """ + with mutex: + _print(text, *args, **kw) + + +# 通知服务 +# fmt: off +push_config = { + 'HITOKOTO': True, # 启用一言(随机句子) + + 'BARK_PUSH': '', # bark IP 或设备码,例:https://api.day.app/DxHcxxxxxRxxxxxxcm/ + 'BARK_ARCHIVE': '', # bark 推送是否存档 + 'BARK_GROUP': '', # bark 推送分组 + 'BARK_SOUND': '', # bark 推送声音 + 'BARK_ICON': '', # bark 推送图标 + 'BARK_LEVEL': '', # bark 推送时效性 + 'BARK_URL': '', # bark 推送跳转URL + + 'CONSOLE': False, # 控制台输出 + + 'DD_BOT_SECRET': '', # 钉钉机器人的 DD_BOT_SECRET + 'DD_BOT_TOKEN': '', # 钉钉机器人的 DD_BOT_TOKEN + + 'FSKEY': '', # 飞书机器人的 FSKEY + + 'GOBOT_URL': '', # go-cqhttp + # 推送到个人QQ:http://127.0.0.1/send_private_msg + # 群:http://127.0.0.1/send_group_msg + 'GOBOT_QQ': '', # go-cqhttp 的推送群或用户 + # GOBOT_URL 设置 /send_private_msg 时填入 user_id=个人QQ + # /send_group_msg 时填入 group_id=QQ群 + 'GOBOT_TOKEN': '', # go-cqhttp 的 access_token + + 'GOTIFY_URL': '', # gotify地址,如https://push.example.de:8080 + 'GOTIFY_TOKEN': '', # gotify的消息应用token + 'GOTIFY_PRIORITY': 0, # 推送消息优先级,默认为0 + + 'IGOT_PUSH_KEY': '', # iGot 聚合推送的 IGOT_PUSH_KEY + + 'PUSH_KEY': '', # server 酱的 PUSH_KEY,兼容旧版与 Turbo 版 + + 'DEER_KEY': '', # PushDeer 的 PUSHDEER_KEY + 'DEER_URL': '', # PushDeer 的 PUSHDEER_URL + + 'CHAT_URL': '', # synology chat url + 'CHAT_TOKEN': '', # synology chat token + + 'PUSH_PLUS_TOKEN': '3bc4f55774b84edf95c3f277d315d7fd', # push+ 微信推送的用户令牌 + 'PUSH_PLUS_USER': '', # push+ 微信推送的群组编码 + + 'WE_PLUS_BOT_TOKEN': '', # 微加机器人的用户令牌 + 'WE_PLUS_BOT_RECEIVER': '', # 微加机器人的消息接收者 + 'WE_PLUS_BOT_VERSION': 'pro', # 微加机器人的调用版本 + + 'QMSG_KEY': '', # qmsg 酱的 QMSG_KEY + 'QMSG_TYPE': '', # qmsg 酱的 QMSG_TYPE + + 'QYWX_ORIGIN': '', # 企业微信代理地址 + + 'QYWX_AM': '', # 企业微信应用 + + 'QYWX_KEY': '', # 企业微信机器人 + + 'TG_BOT_TOKEN': '', # tg 机器人的 TG_BOT_TOKEN,例:1407203283:AAG9rt-6RDaaX0HBLZQq0laNOh898iFYaRQ + 'TG_USER_ID': '', # tg 机器人的 TG_USER_ID,例:1434078534 + 'TG_API_HOST': '', # tg 代理 api + 'TG_PROXY_AUTH': '', # tg 代理认证参数 + 'TG_PROXY_HOST': '', # tg 机器人的 TG_PROXY_HOST + 'TG_PROXY_PORT': '', # tg 机器人的 TG_PROXY_PORT + + 'AIBOTK_KEY': '', # 智能微秘书 个人中心的apikey 文档地址:http://wechat.aibotk.com/docs/about + 'AIBOTK_TYPE': '', # 智能微秘书 发送目标 room 或 contact + 'AIBOTK_NAME': '', # 智能微秘书 发送群名 或者好友昵称和type要对应好 + + 'SMTP_SERVER': '', # SMTP 发送邮件服务器,形如 smtp.exmail.qq.com:465 + 'SMTP_SSL': 'false', # SMTP 发送邮件服务器是否使用 SSL,填写 true 或 false + 'SMTP_EMAIL': '', # SMTP 收发件邮箱,通知将会由自己发给自己 + 'SMTP_PASSWORD': '', # SMTP 登录密码,也可能为特殊口令,视具体邮件服务商说明而定 + 'SMTP_NAME': '', # SMTP 收发件人姓名,可随意填写 + + 'PUSHME_KEY': '', # PushMe 的 PUSHME_KEY + 'PUSHME_URL': '', # PushMe 的 PUSHME_URL + + 'CHRONOCAT_QQ': '', # qq号 + 'CHRONOCAT_TOKEN': '', # CHRONOCAT 的token + 'CHRONOCAT_URL': '', # CHRONOCAT的url地址 + + 'WEBHOOK_URL': '', # 自定义通知 请求地址 + 'WEBHOOK_BODY': '', # 自定义通知 请求体 + 'WEBHOOK_HEADERS': '', # 自定义通知 请求头 + 'WEBHOOK_METHOD': '', # 自定义通知 请求方法 + 'WEBHOOK_CONTENT_TYPE': '' # 自定义通知 content-type +} +# fmt: on + +for k in push_config: + if os.getenv(k): + v = os.getenv(k) + push_config[k] = v + + +def bark(title: str, content: str) -> None: + """ + 使用 bark 推送消息。 + """ + if not push_config.get("BARK_PUSH"): + print("bark 服务的 BARK_PUSH 未设置!!\n取消推送") + return + print("bark 服务启动") + + if push_config.get("BARK_PUSH").startswith("http"): + url = f'{push_config.get("BARK_PUSH")}' + else: + url = f'https://api.day.app/{push_config.get("BARK_PUSH")}' + + bark_params = { + "BARK_ARCHIVE": "isArchive", + "BARK_GROUP": "group", + "BARK_SOUND": "sound", + "BARK_ICON": "icon", + "BARK_LEVEL": "level", + "BARK_URL": "url", + } + data = { + "title": title, + "body": content, + } + for pair in filter( + lambda pairs: pairs[0].startswith("BARK_") + and pairs[0] != "BARK_PUSH" + and pairs[1] + and bark_params.get(pairs[0]), + push_config.items(), + ): + data[bark_params.get(pair[0])] = pair[1] + headers = {"Content-Type": "application/json;charset=utf-8"} + response = requests.post( + url=url, data=json.dumps(data), headers=headers, timeout=15 + ).json() + + if response["code"] == 200: + print("bark 推送成功!") + else: + print("bark 推送失败!") + + +def console(title: str, content: str) -> None: + """ + 使用 控制台 推送消息。 + """ + print(f"{title}\n\n{content}") + + +def dingding_bot(title: str, content: str) -> None: + """ + 使用 钉钉机器人 推送消息。 + """ + if not push_config.get("DD_BOT_SECRET") or not push_config.get("DD_BOT_TOKEN"): + print("钉钉机器人 服务的 DD_BOT_SECRET 或者 DD_BOT_TOKEN 未设置!!\n取消推送") + return + print("钉钉机器人 服务启动") + + timestamp = str(round(time.time() * 1000)) + secret_enc = push_config.get("DD_BOT_SECRET").encode("utf-8") + string_to_sign = "{}\n{}".format(timestamp, push_config.get("DD_BOT_SECRET")) + string_to_sign_enc = string_to_sign.encode("utf-8") + hmac_code = hmac.new( + secret_enc, string_to_sign_enc, digestmod=hashlib.sha256 + ).digest() + sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) + url = f'https://oapi.dingtalk.com/robot/send?access_token={push_config.get("DD_BOT_TOKEN")}×tamp={timestamp}&sign={sign}' + headers = {"Content-Type": "application/json;charset=utf-8"} + data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}} + response = requests.post( + url=url, data=json.dumps(data), headers=headers, timeout=15 + ).json() + + if not response["errcode"]: + print("钉钉机器人 推送成功!") + else: + print("钉钉机器人 推送失败!") + + +def feishu_bot(title: str, content: str) -> None: + """ + 使用 飞书机器人 推送消息。 + """ + if not push_config.get("FSKEY"): + print("飞书 服务的 FSKEY 未设置!!\n取消推送") + return + print("飞书 服务启动") + + url = f'https://open.feishu.cn/open-apis/bot/v2/hook/{push_config.get("FSKEY")}' + data = {"msg_type": "text", "content": {"text": f"{title}\n\n{content}"}} + response = requests.post(url, data=json.dumps(data)).json() + + if response.get("StatusCode") == 0 or response.get("code") == 0: + print("飞书 推送成功!") + else: + print("飞书 推送失败!错误信息如下:\n", response) + + +def go_cqhttp(title: str, content: str) -> None: + """ + 使用 go_cqhttp 推送消息。 + """ + if not push_config.get("GOBOT_URL") or not push_config.get("GOBOT_QQ"): + print("go-cqhttp 服务的 GOBOT_URL 或 GOBOT_QQ 未设置!!\n取消推送") + return + print("go-cqhttp 服务启动") + + url = f'{push_config.get("GOBOT_URL")}?access_token={push_config.get("GOBOT_TOKEN")}&{push_config.get("GOBOT_QQ")}&message=标题:{title}\n内容:{content}' + response = requests.get(url).json() + + if response["status"] == "ok": + print("go-cqhttp 推送成功!") + else: + print("go-cqhttp 推送失败!") + + +def gotify(title: str, content: str) -> None: + """ + 使用 gotify 推送消息。 + """ + if not push_config.get("GOTIFY_URL") or not push_config.get("GOTIFY_TOKEN"): + print("gotify 服务的 GOTIFY_URL 或 GOTIFY_TOKEN 未设置!!\n取消推送") + return + print("gotify 服务启动") + + url = f'{push_config.get("GOTIFY_URL")}/message?token={push_config.get("GOTIFY_TOKEN")}' + data = { + "title": title, + "message": content, + "priority": push_config.get("GOTIFY_PRIORITY"), + } + response = requests.post(url, data=data).json() + + if response.get("id"): + print("gotify 推送成功!") + else: + print("gotify 推送失败!") + + +def iGot(title: str, content: str) -> None: + """ + 使用 iGot 推送消息。 + """ + if not push_config.get("IGOT_PUSH_KEY"): + print("iGot 服务的 IGOT_PUSH_KEY 未设置!!\n取消推送") + return + print("iGot 服务启动") + + url = f'https://push.hellyw.com/{push_config.get("IGOT_PUSH_KEY")}' + data = {"title": title, "content": content} + headers = {"Content-Type": "application/x-www-form-urlencoded"} + response = requests.post(url, data=data, headers=headers).json() + + if response["ret"] == 0: + print("iGot 推送成功!") + else: + print(f'iGot 推送失败!{response["errMsg"]}') + + +def serverJ(title: str, content: str) -> None: + """ + 通过 serverJ 推送消息。 + """ + if not push_config.get("PUSH_KEY"): + print("serverJ 服务的 PUSH_KEY 未设置!!\n取消推送") + return + print("serverJ 服务启动") + + data = {"text": title, "desp": content.replace("\n", "\n\n")} + if push_config.get("PUSH_KEY").find("SCT") != -1: + url = f'https://sctapi.ftqq.com/{push_config.get("PUSH_KEY")}.send' + else: + url = f'https://sc.ftqq.com/{push_config.get("PUSH_KEY")}.send' + response = requests.post(url, data=data).json() + + if response.get("errno") == 0 or response.get("code") == 0: + print("serverJ 推送成功!") + else: + print(f'serverJ 推送失败!错误码:{response["message"]}') + + +def pushdeer(title: str, content: str) -> None: + """ + 通过PushDeer 推送消息 + """ + if not push_config.get("DEER_KEY"): + print("PushDeer 服务的 DEER_KEY 未设置!!\n取消推送") + return + print("PushDeer 服务启动") + data = { + "text": title, + "desp": content, + "type": "markdown", + "pushkey": push_config.get("DEER_KEY"), + } + url = "https://api2.pushdeer.com/message/push" + if push_config.get("DEER_URL"): + url = push_config.get("DEER_URL") + + response = requests.post(url, data=data).json() + + if len(response.get("content").get("result")) > 0: + print("PushDeer 推送成功!") + else: + print("PushDeer 推送失败!错误信息:", response) + + +def chat(title: str, content: str) -> None: + """ + 通过Chat 推送消息 + """ + if not push_config.get("CHAT_URL") or not push_config.get("CHAT_TOKEN"): + print("chat 服务的 CHAT_URL或CHAT_TOKEN 未设置!!\n取消推送") + return + print("chat 服务启动") + data = "payload=" + json.dumps({"text": title + "\n" + content}) + url = push_config.get("CHAT_URL") + push_config.get("CHAT_TOKEN") + response = requests.post(url, data=data) + + if response.status_code == 200: + print("Chat 推送成功!") + else: + print("Chat 推送失败!错误信息:", response) + + +def pushplus_bot(title: str, content: str) -> None: + """ + 通过 push+ 推送消息。 + """ + if not push_config.get("PUSH_PLUS_TOKEN"): + print("PUSHPLUS 服务的 PUSH_PLUS_TOKEN 未设置!!\n取消推送") + return + print("PUSHPLUS 服务启动") + + url = "http://www.pushplus.plus/send" + data = { + "token": push_config.get("PUSH_PLUS_TOKEN"), + "title": title, + "content": content, + "topic": push_config.get("PUSH_PLUS_USER"), + } + body = json.dumps(data).encode(encoding="utf-8") + headers = {"Content-Type": "application/json"} + response = requests.post(url=url, data=body, headers=headers).json() + + if response["code"] == 200: + print("PUSHPLUS 推送成功!") + + else: + url_old = "http://pushplus.hxtrip.com/send" + headers["Accept"] = "application/json" + response = requests.post(url=url_old, data=body, headers=headers).json() + + if response["code"] == 200: + print("PUSHPLUS(hxtrip) 推送成功!") + + else: + print("PUSHPLUS 推送失败!") + + +def weplus_bot(title: str, content: str) -> None: + """ + 通过 微加机器人 推送消息。 + """ + if not push_config.get("WE_PLUS_BOT_TOKEN"): + print("微加机器人 服务的 WE_PLUS_BOT_TOKEN 未设置!!\n取消推送") + return + print("微加机器人 服务启动") + + template = "txt" + if len(content) > 800: + template = "html" + + url = "https://www.weplusbot.com/send" + data = { + "token": push_config.get("WE_PLUS_BOT_TOKEN"), + "title": title, + "content": content, + "template": template, + "receiver": push_config.get("WE_PLUS_BOT_RECEIVER"), + "version": push_config.get("WE_PLUS_BOT_VERSION"), + } + body = json.dumps(data).encode(encoding="utf-8") + headers = {"Content-Type": "application/json"} + response = requests.post(url=url, data=body, headers=headers).json() + + if response["code"] == 200: + print("微加机器人 推送成功!") + else: + print("微加机器人 推送失败!") + + +def qmsg_bot(title: str, content: str) -> None: + """ + 使用 qmsg 推送消息。 + """ + if not push_config.get("QMSG_KEY") or not push_config.get("QMSG_TYPE"): + print("qmsg 的 QMSG_KEY 或者 QMSG_TYPE 未设置!!\n取消推送") + return + print("qmsg 服务启动") + + url = f'https://qmsg.zendee.cn/{push_config.get("QMSG_TYPE")}/{push_config.get("QMSG_KEY")}' + payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")} + response = requests.post(url=url, params=payload).json() + + if response["code"] == 0: + print("qmsg 推送成功!") + else: + print(f'qmsg 推送失败!{response["reason"]}') + + +def wecom_app(title: str, content: str) -> None: + """ + 通过 企业微信 APP 推送消息。 + """ + if not push_config.get("QYWX_AM"): + print("QYWX_AM 未设置!!\n取消推送") + return + QYWX_AM_AY = re.split(",", push_config.get("QYWX_AM")) + if 4 < len(QYWX_AM_AY) > 5: + print("QYWX_AM 设置错误!!\n取消推送") + return + print("企业微信 APP 服务启动") + + corpid = QYWX_AM_AY[0] + corpsecret = QYWX_AM_AY[1] + touser = QYWX_AM_AY[2] + agentid = QYWX_AM_AY[3] + try: + media_id = QYWX_AM_AY[4] + except IndexError: + media_id = "" + wx = WeCom(corpid, corpsecret, agentid) + # 如果没有配置 media_id 默认就以 text 方式发送 + if not media_id: + message = title + "\n\n" + content + response = wx.send_text(message, touser) + else: + response = wx.send_mpnews(title, content, media_id, touser) + + if response == "ok": + print("企业微信推送成功!") + else: + print("企业微信推送失败!错误信息如下:\n", response) + + +class WeCom: + def __init__(self, corpid, corpsecret, agentid): + self.CORPID = corpid + self.CORPSECRET = corpsecret + self.AGENTID = agentid + self.ORIGIN = "https://qyapi.weixin.qq.com" + if push_config.get("QYWX_ORIGIN"): + self.ORIGIN = push_config.get("QYWX_ORIGIN") + + def get_access_token(self): + url = f"{self.ORIGIN}/cgi-bin/gettoken" + values = { + "corpid": self.CORPID, + "corpsecret": self.CORPSECRET, + } + req = requests.post(url, params=values) + data = json.loads(req.text) + return data["access_token"] + + def send_text(self, message, touser="@all"): + send_url = ( + f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}" + ) + send_values = { + "touser": touser, + "msgtype": "text", + "agentid": self.AGENTID, + "text": {"content": message}, + "safe": "0", + } + send_msges = bytes(json.dumps(send_values), "utf-8") + respone = requests.post(send_url, send_msges) + respone = respone.json() + return respone["errmsg"] + + def send_mpnews(self, title, message, media_id, touser="@all"): + send_url = ( + f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}" + ) + send_values = { + "touser": touser, + "msgtype": "mpnews", + "agentid": self.AGENTID, + "mpnews": { + "articles": [ + { + "title": title, + "thumb_media_id": media_id, + "author": "Author", + "content_source_url": "", + "content": message.replace("\n", "
"), + "digest": message, + } + ] + }, + } + send_msges = bytes(json.dumps(send_values), "utf-8") + respone = requests.post(send_url, send_msges) + respone = respone.json() + return respone["errmsg"] + + +def wecom_bot(title: str, content: str) -> None: + """ + 通过 企业微信机器人 推送消息。 + """ + if not push_config.get("QYWX_KEY"): + print("企业微信机器人 服务的 QYWX_KEY 未设置!!\n取消推送") + return + print("企业微信机器人服务启动") + + origin = "https://qyapi.weixin.qq.com" + if push_config.get("QYWX_ORIGIN"): + origin = push_config.get("QYWX_ORIGIN") + + url = f"{origin}/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}" + headers = {"Content-Type": "application/json;charset=utf-8"} + data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}} + response = requests.post( + url=url, data=json.dumps(data), headers=headers, timeout=15 + ).json() + + if response["errcode"] == 0: + print("企业微信机器人推送成功!") + else: + print("企业微信机器人推送失败!") + + +def telegram_bot(title: str, content: str) -> None: + """ + 使用 telegram 机器人 推送消息。 + """ + if not push_config.get("TG_BOT_TOKEN") or not push_config.get("TG_USER_ID"): + print("tg 服务的 bot_token 或者 user_id 未设置!!\n取消推送") + return + print("tg 服务启动") + + if push_config.get("TG_API_HOST"): + url = f"{push_config.get('TG_API_HOST')}/bot{push_config.get('TG_BOT_TOKEN')}/sendMessage" + else: + url = ( + f"https://api.telegram.org/bot{push_config.get('TG_BOT_TOKEN')}/sendMessage" + ) + headers = {"Content-Type": "application/x-www-form-urlencoded"} + payload = { + "chat_id": str(push_config.get("TG_USER_ID")), + "text": f"{title}\n\n{content}", + "disable_web_page_preview": "true", + } + proxies = None + if push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT"): + if push_config.get("TG_PROXY_AUTH") is not None and "@" not in push_config.get( + "TG_PROXY_HOST" + ): + push_config["TG_PROXY_HOST"] = ( + push_config.get("TG_PROXY_AUTH") + + "@" + + push_config.get("TG_PROXY_HOST") + ) + proxyStr = "http://{}:{}".format( + push_config.get("TG_PROXY_HOST"), push_config.get("TG_PROXY_PORT") + ) + proxies = {"http": proxyStr, "https": proxyStr} + response = requests.post( + url=url, headers=headers, params=payload, proxies=proxies + ).json() + + if response["ok"]: + print("tg 推送成功!") + else: + print("tg 推送失败!") + + +def aibotk(title: str, content: str) -> None: + """ + 使用 智能微秘书 推送消息。 + """ + if ( + not push_config.get("AIBOTK_KEY") + or not push_config.get("AIBOTK_TYPE") + or not push_config.get("AIBOTK_NAME") + ): + print( + "智能微秘书 的 AIBOTK_KEY 或者 AIBOTK_TYPE 或者 AIBOTK_NAME 未设置!!\n取消推送" + ) + return + print("智能微秘书 服务启动") + + if push_config.get("AIBOTK_TYPE") == "room": + url = "https://api-bot.aibotk.com/openapi/v1/chat/room" + data = { + "apiKey": push_config.get("AIBOTK_KEY"), + "roomName": push_config.get("AIBOTK_NAME"), + "message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"}, + } + else: + url = "https://api-bot.aibotk.com/openapi/v1/chat/contact" + data = { + "apiKey": push_config.get("AIBOTK_KEY"), + "name": push_config.get("AIBOTK_NAME"), + "message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"}, + } + body = json.dumps(data).encode(encoding="utf-8") + headers = {"Content-Type": "application/json"} + response = requests.post(url=url, data=body, headers=headers).json() + print(response) + if response["code"] == 0: + print("智能微秘书 推送成功!") + else: + print(f'智能微秘书 推送失败!{response["error"]}') + + +def smtp(title: str, content: str) -> None: + """ + 使用 SMTP 邮件 推送消息。 + """ + if ( + not push_config.get("SMTP_SERVER") + or not push_config.get("SMTP_SSL") + or not push_config.get("SMTP_EMAIL") + or not push_config.get("SMTP_PASSWORD") + or not push_config.get("SMTP_NAME") + ): + print( + "SMTP 邮件 的 SMTP_SERVER 或者 SMTP_SSL 或者 SMTP_EMAIL 或者 SMTP_PASSWORD 或者 SMTP_NAME 未设置!!\n取消推送" + ) + return + print("SMTP 邮件 服务启动") + + message = MIMEText(content, "plain", "utf-8") + message["From"] = formataddr( + ( + Header(push_config.get("SMTP_NAME"), "utf-8").encode(), + push_config.get("SMTP_EMAIL"), + ) + ) + message["To"] = formataddr( + ( + Header(push_config.get("SMTP_NAME"), "utf-8").encode(), + push_config.get("SMTP_EMAIL"), + ) + ) + message["Subject"] = Header(title, "utf-8") + + try: + smtp_server = ( + smtplib.SMTP_SSL(push_config.get("SMTP_SERVER")) + if push_config.get("SMTP_SSL") == "true" + else smtplib.SMTP(push_config.get("SMTP_SERVER")) + ) + smtp_server.login( + push_config.get("SMTP_EMAIL"), push_config.get("SMTP_PASSWORD") + ) + smtp_server.sendmail( + push_config.get("SMTP_EMAIL"), + push_config.get("SMTP_EMAIL"), + message.as_bytes(), + ) + smtp_server.close() + print("SMTP 邮件 推送成功!") + except Exception as e: + print(f"SMTP 邮件 推送失败!{e}") + + +def pushme(title: str, content: str) -> None: + """ + 使用 PushMe 推送消息。 + """ + if not push_config.get("PUSHME_KEY"): + print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送") + return + print("PushMe 服务启动") + + url = ( + push_config.get("PUSHME_URL") + if push_config.get("PUSHME_URL") + else "https://push.i-i.me/" + ) + data = { + "push_key": push_config.get("PUSHME_KEY"), + "title": title, + "content": content, + "date": push_config.get("date") if push_config.get("date") else "", + "type": push_config.get("type") if push_config.get("type") else "", + } + response = requests.post(url, data=data) + + if response.status_code == 200 and response.text == "success": + print("PushMe 推送成功!") + else: + print(f"PushMe 推送失败!{response.status_code} {response.text}") + + +def chronocat(title: str, content: str) -> None: + """ + 使用 CHRONOCAT 推送消息。 + """ + if ( + not push_config.get("CHRONOCAT_URL") + or not push_config.get("CHRONOCAT_QQ") + or not push_config.get("CHRONOCAT_TOKEN") + ): + print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送") + return + + print("CHRONOCAT 服务启动") + + user_ids = re.findall(r"user_id=(\d+)", push_config.get("CHRONOCAT_QQ")) + group_ids = re.findall(r"group_id=(\d+)", push_config.get("CHRONOCAT_QQ")) + + url = f'{push_config.get("CHRONOCAT_URL")}/api/message/send' + headers = { + "Content-Type": "application/json", + "Authorization": f'Bearer {push_config.get("CHRONOCAT_TOKEN")}', + } + + for chat_type, ids in [(1, user_ids), (2, group_ids)]: + if not ids: + continue + for chat_id in ids: + data = { + "peer": {"chatType": chat_type, "peerUin": chat_id}, + "elements": [ + { + "elementType": 1, + "textElement": {"content": f"{title}\n\n{content}"}, + } + ], + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + if response.status_code == 200: + if chat_type == 1: + print(f"QQ个人消息:{ids}推送成功!") + else: + print(f"QQ群消息:{ids}推送成功!") + else: + if chat_type == 1: + print(f"QQ个人消息:{ids}推送失败!") + else: + print(f"QQ群消息:{ids}推送失败!") + + +def parse_headers(headers): + if not headers: + return {} + + parsed = {} + lines = headers.split("\n") + + for line in lines: + i = line.find(":") + if i == -1: + continue + + key = line[:i].strip().lower() + val = line[i + 1 :].strip() + parsed[key] = parsed.get(key, "") + ", " + val if key in parsed else val + + return parsed + + +def parse_string(input_string, value_format_fn=None): + matches = {} + pattern = r"(\w+):\s*((?:(?!\n\w+:).)*)" + regex = re.compile(pattern) + for match in regex.finditer(input_string): + key, value = match.group(1).strip(), match.group(2).strip() + try: + value = value_format_fn(value) if value_format_fn else value + json_value = json.loads(value) + matches[key] = json_value + except: + matches[key] = value + return matches + + +def parse_body(body, content_type, value_format_fn=None): + if not body or content_type == "text/plain": + return value_format_fn(body) if value_format_fn and body else body + + parsed = parse_string(body, value_format_fn) + + if content_type == "application/x-www-form-urlencoded": + data = urllib.parse.urlencode(parsed, doseq=True) + return data + + if content_type == "application/json": + data = json.dumps(parsed) + return data + + return parsed + + +def custom_notify(title: str, content: str) -> None: + """ + 通过 自定义通知 推送消息。 + """ + if not push_config.get("WEBHOOK_URL") or not push_config.get("WEBHOOK_METHOD"): + print("自定义通知的 WEBHOOK_URL 或 WEBHOOK_METHOD 未设置!!\n取消推送") + return + + print("自定义通知服务启动") + + WEBHOOK_URL = push_config.get("WEBHOOK_URL") + WEBHOOK_METHOD = push_config.get("WEBHOOK_METHOD") + WEBHOOK_CONTENT_TYPE = push_config.get("WEBHOOK_CONTENT_TYPE") + WEBHOOK_BODY = push_config.get("WEBHOOK_BODY") + WEBHOOK_HEADERS = push_config.get("WEBHOOK_HEADERS") + + if "$title" not in WEBHOOK_URL and "$title" not in WEBHOOK_BODY: + print("请求头或者请求体中必须包含 $title 和 $content") + return + + headers = parse_headers(WEBHOOK_HEADERS) + body = parse_body( + WEBHOOK_BODY, + WEBHOOK_CONTENT_TYPE, + lambda v: v.replace("$title", title).replace("$content", content), + ) + formatted_url = WEBHOOK_URL.replace( + "$title", urllib.parse.quote_plus(title) + ).replace("$content", urllib.parse.quote_plus(content)) + response = requests.request( + method=WEBHOOK_METHOD, url=formatted_url, headers=headers, timeout=15, data=body + ) + + if response.status_code == 200: + print("自定义通知推送成功!") + else: + print(f"自定义通知推送失败!{response.status_code} {response.text}") + + +def one() -> str: + """ + 获取一条一言。 + :return: + """ + url = "https://v1.hitokoto.cn/" + res = requests.get(url).json() + return res["hitokoto"] + " ----" + res["from"] + + +def add_notify_function(): + notify_function = [] + if push_config.get("BARK_PUSH"): + notify_function.append(bark) + if push_config.get("CONSOLE"): + notify_function.append(console) + if push_config.get("DD_BOT_TOKEN") and push_config.get("DD_BOT_SECRET"): + notify_function.append(dingding_bot) + if push_config.get("FSKEY"): + notify_function.append(feishu_bot) + if push_config.get("GOBOT_URL") and push_config.get("GOBOT_QQ"): + notify_function.append(go_cqhttp) + if push_config.get("GOTIFY_URL") and push_config.get("GOTIFY_TOKEN"): + notify_function.append(gotify) + if push_config.get("IGOT_PUSH_KEY"): + notify_function.append(iGot) + if push_config.get("PUSH_KEY"): + notify_function.append(serverJ) + if push_config.get("DEER_KEY"): + notify_function.append(pushdeer) + if push_config.get("CHAT_URL") and push_config.get("CHAT_TOKEN"): + notify_function.append(chat) + if push_config.get("PUSH_PLUS_TOKEN"): + notify_function.append(pushplus_bot) + if push_config.get("WE_PLUS_BOT_TOKEN"): + notify_function.append(weplus_bot) + if push_config.get("QMSG_KEY") and push_config.get("QMSG_TYPE"): + notify_function.append(qmsg_bot) + if push_config.get("QYWX_AM"): + notify_function.append(wecom_app) + if push_config.get("QYWX_KEY"): + notify_function.append(wecom_bot) + if push_config.get("TG_BOT_TOKEN") and push_config.get("TG_USER_ID"): + notify_function.append(telegram_bot) + if ( + push_config.get("AIBOTK_KEY") + and push_config.get("AIBOTK_TYPE") + and push_config.get("AIBOTK_NAME") + ): + notify_function.append(aibotk) + if ( + push_config.get("SMTP_SERVER") + and push_config.get("SMTP_SSL") + and push_config.get("SMTP_EMAIL") + and push_config.get("SMTP_PASSWORD") + and push_config.get("SMTP_NAME") + ): + notify_function.append(smtp) + if push_config.get("PUSHME_KEY"): + notify_function.append(pushme) + if ( + push_config.get("CHRONOCAT_URL") + and push_config.get("CHRONOCAT_QQ") + and push_config.get("CHRONOCAT_TOKEN") + ): + notify_function.append(chronocat) + if push_config.get("WEBHOOK_URL") and push_config.get("WEBHOOK_METHOD"): + notify_function.append(custom_notify) + + if not notify_function: + print(f"无推送渠道,请检查通知变量是否正确") + return notify_function + + +def send(title: str, content: str, ignore_default_config: bool = False, **kwargs): + if kwargs: + global push_config + if ignore_default_config: + push_config = kwargs # 清空从环境变量获取的配置 + else: + push_config.update(kwargs) + + if not content: + print(f"{title} 推送内容为空!") + return + + # 根据标题跳过一些消息推送,环境变量:SKIP_PUSH_TITLE 用回车分隔 + skipTitle = os.getenv("SKIP_PUSH_TITLE") + if skipTitle: + if title in re.split("\n", skipTitle): + print(f"{title} 在SKIP_PUSH_TITLE环境变量内,跳过推送!") + return + + hitokoto = push_config.get("HITOKOTO") + content += "\n\n" + one() if hitokoto != "false" else "" + + notify_function = add_notify_function() + ts = [ + threading.Thread(target=mode, args=(title, content), name=mode.__name__) + for mode in notify_function + ] + [t.start() for t in ts] + [t.join() for t in ts] + + +def main(): + send("title", "content") + + +if __name__ == "__main__": + main()