mirror of
https://github.com/blusunny/qinglong.git
synced 2025-12-16 23:10:17 +08:00
342 lines
12 KiB
JavaScript
342 lines
12 KiB
JavaScript
# !/usr/bin/python3
|
||
# -- coding: utf-8 --
|
||
# -------------------------------
|
||
# @Author CHERWIN✨✨✨
|
||
# -------------------------------
|
||
# cron "30 1 * * *" script-path=xxx.py,tag=匹配cron用
|
||
# const $ = new Env('奈雪小程序签到')
|
||
import datetime
|
||
import json
|
||
import os
|
||
import random
|
||
import requests
|
||
import hashlib
|
||
import hmac
|
||
import base64
|
||
import time
|
||
from requests.packages.urllib3.exceptions import InsecureRequestWarning
|
||
|
||
# 禁用安全请求警告
|
||
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||
|
||
IS_DEV = False
|
||
if os.path.isfile('DEV_ENV.py'):
|
||
import DEV_ENV
|
||
IS_DEV = True
|
||
if os.path.isfile('notify.py'):
|
||
from notify import send
|
||
print("加载通知服务成功!")
|
||
else:
|
||
print("加载通知服务失败!")
|
||
send_msg = ''
|
||
one_msg=''
|
||
def Log(cont=''):
|
||
global send_msg,one_msg
|
||
print(cont)
|
||
if cont:
|
||
one_msg += f'{cont}\n'
|
||
send_msg += f'{cont}\n'
|
||
|
||
class RUN:
|
||
def __init__(self,info,index):
|
||
global one_msg
|
||
one_msg = ''
|
||
split_info = info.split('@')
|
||
token = split_info[0]
|
||
len_split_info = len(split_info)
|
||
last_info = split_info[len_split_info - 1]
|
||
self.send_UID = None
|
||
if len_split_info > 0 and "UID_" in last_info:
|
||
print('检测到设置了UID')
|
||
print(last_info)
|
||
self.send_UID = last_info
|
||
self.index = index + 1
|
||
self.s = requests.session()
|
||
self.s.verify = False
|
||
|
||
self.token = f'Bearer {token}'
|
||
self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/6945'
|
||
self.openId = 'QL6ZOftGzbziPlZwfiXM'
|
||
|
||
def random_string(self,length=6, chars='123456789'):
|
||
return ''.join(random.choice(chars) for _ in range(length))
|
||
|
||
def get_body(self):
|
||
nonce = int(self.random_string())
|
||
timestamp = int(time.time())
|
||
url_path = f'nonce={nonce}&openId={self.openId}×tamp={timestamp}'
|
||
signature = base64.b64encode(
|
||
hmac.new('sArMTldQ9tqU19XIRDMWz7BO5WaeBnrezA'.encode(), url_path.encode(), hashlib.sha1).digest()).decode()
|
||
common = {
|
||
'platform': 'wxapp',
|
||
'version': '5.1.8',
|
||
'imei': '',
|
||
'osn': 'microsoft',
|
||
'sv': 'Windows 10 x64',
|
||
'lang': 'zh_CN',
|
||
'currency': 'CNY',
|
||
'timeZone': '',
|
||
'nonce': nonce,
|
||
'openId': self.openId,
|
||
'timestamp': timestamp,
|
||
'signature': signature
|
||
}
|
||
params = {
|
||
'businessType': 1,
|
||
'brand': 26000252,
|
||
'tenantId': 1,
|
||
'channel': 2,
|
||
'stallType': None,
|
||
'storeId': None
|
||
}
|
||
|
||
requestData = {
|
||
'common': common,
|
||
'params': params
|
||
}
|
||
return requestData
|
||
|
||
def task_api(self,api_options=None):
|
||
if api_options is None:
|
||
api_options = {}
|
||
try:
|
||
# 首先解析URL,获得主机名
|
||
host_name = api_options['url'].replace('//', '/').split('/')[1]
|
||
full_url = api_options['url']
|
||
if 'queryParam' in api_options:
|
||
# 如果存在查询参数,将其附加到URL
|
||
query_str = "&".join(f"{k}={v}" for k, v in api_options['queryParam'].items())
|
||
full_url += '?' + query_str
|
||
# 定义请求头
|
||
headers = {
|
||
'Host': host_name,
|
||
'Connection': 'keep-alive',
|
||
'User-Agent': self.UA,
|
||
'Authorization': self.token,
|
||
'Referer': 'https://tm-web.pin-dao.cn/',
|
||
'Origin': 'https://tm-web.pin-dao.cn'
|
||
}
|
||
# 准备请求体
|
||
data = None
|
||
if 'body' in api_options:
|
||
body = self.get_body()
|
||
body['params'].update(api_options['body'])
|
||
content_type = api_options.get('Content-Type', 'application/json')
|
||
headers['Content-Type'] = content_type
|
||
if 'json' in content_type:
|
||
data = json.dumps(body)
|
||
else:
|
||
data = "&".join(f"{k}={json.dumps(v) if isinstance(v, dict) else v}" for k, v in body.items())
|
||
headers['Content-Length'] = str(len(data))
|
||
# 如果有额外的URL参数或头部参数,合并到请求中
|
||
if 'urlObjectParam' in api_options:
|
||
# 这里根据需要处理urlObjectParam
|
||
pass
|
||
if 'headerParam' in api_options:
|
||
headers.update(api_options['headerParam'])
|
||
# 发出请求
|
||
response = requests.request(method=api_options.get('method', 'GET'), url=full_url, headers=headers,
|
||
data=data, timeout=20,verify=False)
|
||
# 打印状态码
|
||
if response.status_code != 200:
|
||
print(f"[{api_options.get('fn', 'unknown function')}]返回[{response.status_code}]")
|
||
|
||
# 解析结果
|
||
try:
|
||
result = response.json()
|
||
except ValueError:
|
||
result = response.text
|
||
return result
|
||
|
||
except Exception as e:
|
||
print(str(e))
|
||
return {}
|
||
|
||
def base_userinfo(self):
|
||
try:
|
||
api_options = {
|
||
'fn': 'baseUserinfo',
|
||
'method': 'post',
|
||
'url': 'https://tm-web.pin-dao.cn/user/base-userinfo',
|
||
'body': {}
|
||
}
|
||
response = self.task_api(api_options)
|
||
if response['code'] == 0:
|
||
# 登录成功的逻辑处理
|
||
phone = response['data']['phone']
|
||
self.phone = phone[:3] + "*" * 4 + phone[7:]
|
||
self.userId = response['data']['userId']
|
||
self.nickName = response['data']['nickName']
|
||
Log(f'账号[{self.index}]登录成功!\n手机号:[{self.phone}] ID[{self.userId}]')
|
||
# print(one_msg)
|
||
return True
|
||
else:
|
||
# 登录失败的逻辑处理
|
||
Log(f"账号登录失败: {response['message']}")
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
def user_account(self):
|
||
try:
|
||
api_options = {
|
||
'fn': 'userAccount',
|
||
'method': 'post',
|
||
'url': 'https://tm-web.pin-dao.cn/user/account/user-account',
|
||
'body': {}
|
||
}
|
||
response = self.task_api(api_options)
|
||
if response['code'] == 0:
|
||
# 查询成功的逻辑处理
|
||
coin = response['data']['coin']
|
||
Log(f'账号[{self.index}]当前奈雪币: {coin}')
|
||
else:
|
||
# 查询失败的逻辑处理
|
||
Log(f'账号[{self.index}]查询失败')
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
def sign_record(self):
|
||
try:
|
||
sign_date = datetime.datetime.now().replace(day=1).strftime('%Y-%m-%d')
|
||
today_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
||
api_options = {
|
||
'fn': 'signRecord',
|
||
'method': 'post',
|
||
'url': 'https://tm-web.pin-dao.cn/user/sign/records',
|
||
'body': {
|
||
'signDate': sign_date,
|
||
'startDate': today_date
|
||
}
|
||
}
|
||
response = self.task_api(api_options)
|
||
if response['code'] == 0:
|
||
Log(f"今天{'已' if response['data']['status'] else '未'}签到,已签到{response['data']['signCount']}天")
|
||
if not response['data']['status']:
|
||
self.sign_save()
|
||
else:
|
||
Log(f"查询签到失败: {response['message']}")
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
def sign_save(self):
|
||
try:
|
||
sign_date = datetime.datetime.now().strftime('%Y-%m-%d')
|
||
api_options = {
|
||
'fn': 'signSave',
|
||
'method': 'post',
|
||
'url': 'https://tm-web.pin-dao.cn/user/sign/save',
|
||
'body': {
|
||
'signDate': sign_date
|
||
}
|
||
}
|
||
response = self.task_api(api_options)
|
||
if response['code'] == 0:
|
||
if response['data']['flag']:
|
||
Log('签到成功')
|
||
else:
|
||
Log('今天已经签到过了')
|
||
else:
|
||
print(f"签到失败: {response['message']}")
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
def main(self):
|
||
Log(f"\n开始执行第{self.index}个账号--------------->>>>>")
|
||
base_userinfo_result = self.base_userinfo()
|
||
if not base_userinfo_result:
|
||
Log("用户信息无效,请更新CK")
|
||
return False
|
||
self.sign_record()
|
||
self.user_account()
|
||
self.sendMsg()
|
||
return True
|
||
|
||
def sendMsg(self, help=False):
|
||
if self.send_UID:
|
||
push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help)
|
||
print(push_res)
|
||
|
||
|
||
def down_file(filename, file_url):
|
||
print(f'开始下载:{filename},下载地址:{file_url}')
|
||
try:
|
||
response = requests.get(file_url, verify=False, timeout=10)
|
||
response.raise_for_status()
|
||
with open(filename + '.tmp', 'wb') as f:
|
||
f.write(response.content)
|
||
print(f'【{filename}】下载完成!')
|
||
|
||
# 检查临时文件是否存在
|
||
temp_filename = filename + '.tmp'
|
||
if os.path.exists(temp_filename):
|
||
# 删除原有文件
|
||
if os.path.exists(filename):
|
||
os.remove(filename)
|
||
# 重命名临时文件
|
||
os.rename(temp_filename, filename)
|
||
print(f'【{filename}】重命名成功!')
|
||
return True
|
||
else:
|
||
print(f'【{filename}】临时文件不存在!')
|
||
return False
|
||
except Exception as e:
|
||
print(f'【{filename}】下载失败:{str(e)}')
|
||
return False
|
||
|
||
def import_Tools():
|
||
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
|
||
import CHERWIN_TOOLS
|
||
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
|
||
|
||
if __name__ == '__main__':
|
||
APP_NAME = '奈雪点单小程序'
|
||
ENV_NAME = 'NXDD'
|
||
CK_NAME = 'Authorization'
|
||
print(f'''
|
||
✨✨✨ {APP_NAME}签到✨✨✨
|
||
✨ 功能:
|
||
积分签到
|
||
✨ 抓包步骤:
|
||
打开{APP_NAME}
|
||
授权登陆
|
||
打开抓包工具
|
||
找请求头带{CK_NAME}的URl
|
||
复制里面的{CK_NAME}参数值【不要】前面的Bearer 【不要】前面的Bearer 【不要】前面的Bearer
|
||
参数示例:eyJhbGciOiJxxxxxxxxxxxx
|
||
✨ 设置青龙变量:
|
||
export {ENV_NAME}='{CK_NAME}参数值【不要】前面的Bearer'多账号#或&分割
|
||
export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启
|
||
✨ ✨ 注意:抓完CK没事儿别打开小程序,重新打开小程序请重新抓包
|
||
✨ 推荐cron:30 1 * * *
|
||
✨✨✨ @Author CHERWIN✨✨✨
|
||
''')
|
||
local_script_name = os.path.basename(__file__)
|
||
local_version = '2024.05.15'
|
||
if IS_DEV:
|
||
import_Tools()
|
||
else:
|
||
if os.path.isfile('CHERWIN_TOOLS.py'):
|
||
import_Tools()
|
||
else:
|
||
if down_file('CHERWIN_TOOLS.py', 'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py'):
|
||
print('脚本依赖下载完成请重新运行脚本')
|
||
import_Tools()
|
||
else:
|
||
print('脚本依赖下载失败,请到https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/CHERWIN_TOOLS.py下载最新版本依赖')
|
||
exit()
|
||
print(TIPS)
|
||
token = ''
|
||
token = ENV if ENV else token
|
||
if not token:
|
||
print(f"未填写{ENV_NAME}变量\n青龙可在环境变量设置 {ENV_NAME} 或者在本脚本文件上方将{CK_NAME}填入token =''")
|
||
exit()
|
||
tokens = CHERWIN_TOOLS.ENV_SPLIT(token)
|
||
# print(tokens)
|
||
if len(tokens) > 0:
|
||
print(f"\n>>>>>>>>>>共获取到{len(tokens)}个账号<<<<<<<<<<")
|
||
access_token = []
|
||
for index, infos in enumerate(tokens):
|
||
run_result = RUN(infos, index).main()
|
||
if not run_result: continue
|
||
if send: send(f'{APP_NAME}挂机通知', send_msg + TIPS_HTML)
|