Delete tlmk.py

This commit is contained in:
3288588344
2024-06-27 12:18:48 +08:00
committed by GitHub
parent b45a03ab5b
commit 75101f2afd

527
tlmk.py
View File

@@ -1,527 +0,0 @@
import hashlib
import json
import os
import importlib.util
import random
import string
import subprocess
import sys
import time
import requests
from http import HTTPStatus
from datetime import datetime
NOW_TOOLS_VERSION = '2024.05.27'
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
else:
IS_DEV = False
# 尝试导入包
def import_or_install(package_name, import_name=None):
# 如果传入了 import_name则使用它来检查模块否则默认与包名相同
import_name = import_name or package_name
try:
# 检查模块是否已安装
package_spec = importlib.util.find_spec(import_name)
if package_spec is None:
print(f"{package_name} 模块未安装. 开始安装...")
subprocess.check_call([sys.executable, '-m', 'pip', 'install', package_name])
print(f"{package_name} 模块安装完成。")
else:
print(f"{package_name} 模块已安装。")
# 尝试导入模块检查是否安装成功
__import__(import_name)
module = importlib.import_module(import_name)
print(f"{import_name} 模块导入成功.")
return module
except ImportError as e:
print(f"无法导入 {import_name} 模块. 错误信息: {e}")
except subprocess.CalledProcessError as e:
print(f"安装 {package_name} 模块时出错. 错误信息: {e}")
except Exception as e:
print(f"处理 {package_name} 模块时发生错误. 错误信息: {e}")
def SAVE_INVITE_CODE(file_name, new_data):
# 读取现有JSON文件如果存在
try:
with open(file_name, 'r', encoding='utf-8') as file:
data = json.load(file)
except FileNotFoundError:
# 如果文件不存在创建所需目录并一个新的空JSON文件
directory = os.path.dirname(file_name)
if not os.path.exists(directory):
os.makedirs(directory)
data = {}
# 检查是否已存在相同的键,如果存在,合并数据
for key, value in new_data.items():
if key in data:
# 如果键已存在,将新数据合并到现有数据中
data[key].update(value)
else:
# 如果键不存在,直接插入新数据
data[key] = value
# 将更新后的数据写入JSON文件
with open(file_name, 'w', encoding='utf-8') as file:
json.dump(data, file, indent=4)
# 将参数转换为字典
def create_dict_from_string(self, data_string):
params = {}
key_value_pairs = data_string.split(',')
for pair in key_value_pairs:
key, value = pair.split('=')
params[key] = value
return params
def compare_versions(local_version, server_version):
local_parts = local_version.split('.') # 将本地版本号拆分成数字部分
server_parts = server_version.split('.') # 将服务器版本号拆分成数字部分
for l, s in zip(local_parts, server_parts):
if int(l) < int(s):
return True
# 当前版本低于服务器版本
elif int(l) > int(s):
return False
# 当前版本高于服务器版本
# 如果上述循环没有返回结果,则表示当前版本与服务器版本的数字部分完全相同
if len(local_parts) < len(server_parts):
return True # 当前版本位数较短,即版本号形如 x.y 比 x.y.z 低
else:
return False # 当前版本与服务器版本相同或更高
def CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, server_version_url=None,
APP_NAME=None):
"""
检查版本并更新
Args:
local_version (str): 本地版本号
server_version_url (str): 服务器版本文件地址
server_script_url (str): 服务器脚本地址
script_filename (str): 要保存的脚本文件名
Returns:
bool: 是否进行了更新操作
"""
print(f'当前检测:【{script_filename}')
try:
if server_version_url:
# 获取服务器版本号
response = requests.get(server_version_url, verify=False)
response.raise_for_status() # Raises an HTTPError for bad responses
# print(response.text)
server_version = response.text.strip() # 去除首尾空格
if "code" in server_version:
print('【获取远程版本号失败,设为本地同版本】')
server_version = local_version
if not server_version: server_version = NOW_TOOLS_VERSION
print(f'本地版本:【{local_version}')
print(f'服务器版本:【{server_version}')
if compare_versions(local_version, server_version):
# 需要更新,下载服务器脚本
AUTO_UPDATE = os.getenv("SCRIPT_UPDATE", "True").lower() != "false"
# print(AUTO_UPDATE)
if AUTO_UPDATE:
print(">>>>>>>发现新版本的脚本,默认自动更新,准备更新...")
print(">>>>>>>禁用更新请定义变量export SCRIPT_UPDATE = 'False'")
if down_file(script_filename, server_script_url):
print(f'请重新运行新脚本\n')
return True
else:
print(">>>>>>>发现新版本的脚本您禁用了自动更新如需启用请删除变量SCRIPT_UPDATE\n")
else:
print(f'无需更新\n')
return False
except requests.exceptions.RequestException as e:
print(f'发生网络错误:{e}')
server_base_url = f"https://py.cherwin.cn/{APP_NAME}/"
server_script_url = f"{server_base_url}{script_filename}"
CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, APP_NAME=APP_NAME)
except Exception as e:
print(f'发生未知错误:{e}')
return False # 返回 False 表示没有进行更新操作
def down_file(filename, file_url):
print(f'开始下载:{filename},下载地址:{file_url}')
try:
response = requests.get(file_url, verify=False, timeout=10)
response.raise_for_status()
with open(filename + '.tmp', 'wb') as f:
f.write(response.content)
print(f'{filename}】下载完成!')
# 检查临时文件是否存在
temp_filename = filename + '.tmp'
if os.path.exists(temp_filename):
# 删除原有文件
if os.path.exists(filename):
os.remove(filename)
# 重命名临时文件
os.rename(temp_filename, filename)
print(f'{filename}】重命名成功!')
return True
else:
print(f'{filename}】临时文件不存在!')
return False
except Exception as e:
print(f'{filename}】下载失败:{str(e)}')
return False
def get_AuthorInviteCode(url):
global AuthorCode
try:
response = requests.get(url, verify=False, timeout=10)
if response.status_code == 200:
content = json.loads(response.text)
AuthorCode = list(content.values())
# print(f'获取到作者邀请码:{AuthorCode}')
return AuthorCode
else:
# print("无法获取文件。状态代码:", response.status_code)
return {}
except Exception as e:
print(f"An error occurred: {e}")
return {}
def CHECK_PARAMENTERS(index, input_string, required_parameters):
# required_parameters = ['deviceid', 'jysessionid', 'shopid', 'memberid', 'access_token', 'sign']
# 记录缺少的参数
missing_parameters = []
# 将输入字符串和参数列表中的所有字符都转换为小写
input_string_lower = input_string.lower()
required_parameters_lower = [param.lower() for param in required_parameters]
# 判断字符串中是否包含所有必需的参数
for param in required_parameters_lower:
if param not in input_string_lower:
missing_parameters.append(param)
if missing_parameters:
print(f"\n第【{index + 1}】个账号,缺少以下参数:【{missing_parameters}")
return False
else:
print(f"\n第【{index + 1}】个账号URL包含所有必需的参数开始执行脚本")
return True
def QIANWEN(tongyiSysPromt, content, api_key):
print('开始调用通义千问')
# 检查dashscope库是否已安装
dashscope = import_or_install('dashscope')
if dashscope:
dashscope.api_key = api_key
response = dashscope.Generation.call(
model='qwen-max',
messages=[
{"role": "system",
"content": tongyiSysPromt},
{"role": "user", "content": content}],
seed=1234,
top_p=0.8,
result_format='message',
enable_search=False,
max_tokens=1500,
temperature=1.0,
repetition_penalty=1.0,
)
if response.status_code == HTTPStatus.OK:
# print(response)
video_info = response.output['choices'][0]['message']['content']
print('通义生成【成功】!')
return video_info
else:
print(f"无法解析通义返回的信息:{response}")
return None
else:
print('dashscope 模块无法导入,函数无法执行。')
# 取环境变量,并分割
def ENV_SPLIT(input_str):
parts = []
if '&' in input_str:
amp_parts = input_str.split('&')
for part in amp_parts:
if '#' in part:
hash_parts = part.split('#')
for hash_part in hash_parts:
parts.append(hash_part)
else:
parts.append(part)
# print(parts)
return (parts)
elif '#' in input_str:
hash_parts = input_str.split('#')
# print(hash_parts)
return (hash_parts)
else:
out_str = str(input_str)
# print([out_str])
return ([out_str])
# 使用导入的模块进行验证码识别
def CAPCODE(captcha_slider, captcha_bg):
ddddocr = import_or_install('ddddocr')
if ddddocr:
slide = ddddocr.DdddOcr(det=False, ocr=False)
with open(captcha_slider, 'rb') as f:
target_bytes = f.read()
with open(captcha_bg, 'rb') as f:
background_bytes = f.read()
res = slide.slide_match(target_bytes, background_bytes, simple_target=True)
# print(res['target'][0])
# print(type(res['target'][0]))
return res['target'][0]
else:
print('ddddocr 模块无法导入,函数无法执行。')
return False
def send_wxpusher(UID, one_msg, APP_NAME, help=False):
WXPUSHER = os.environ.get('WXPUSHER', False)
if WXPUSHER:
if help:
push_res = wxpusher(WXPUSHER, APP_NAME + '互助', one_msg, UID, TIPS_HTML)
else:
push_res = wxpusher(WXPUSHER, APP_NAME, one_msg, UID, TIPS_HTML)
print(push_res)
def wxpusher(UID, msg, title, help=False):
"""利用 wxpusher 的 web api 发送 json 数据包,实现微信信息的发送"""
WXPUSHER = os.environ.get('WXPUSHER', False)
if WXPUSHER:
if help: title = title + '互助'
print('\n------开始wxpusher推送------')
print(f'标题:【{title}\n内容:{msg}')
webapi = 'http://wxpusher.zjiecode.com/api/send/message'
msg = msg.replace("\n", "<br>")
# tips = TIPS_HTML.replace("\n", "<br>")
data = {
"appToken": WXPUSHER,
"content": f'{title}<br>{msg}<br>{TIPS_HTML}',
# "summary": msg[:99], # 该参数可选,默认为 msg 的前10个字符
"summary": title,
"contentType": 2,
"uids": [UID],
"url": "https://gj.cherwin.cn"
}
try:
result = requests.post(url=webapi, json=data)
result.raise_for_status() # 对于非2xx状态码抛出异常
response_json = result.json()
if response_json["success"]:
return "------消息发送成功------\n"
else:
return f"消息发送失败。错误信息:{response_json['msg']}"
except requests.exceptions.RequestException as e:
return f"发送消息时发生错误:{str(e)}"
except Exception as e:
return f"发生意外错误:{str(e)}"
def RESTART_SCRIPT(RESTART_SCRIPT_NAME):
python = sys.executable
os.execl(python, RESTART_SCRIPT_NAME, *sys.argv[1:])
def CHECK():
global CHERWIN_SCRIPT_CONFIG
print('>>>>>>>开始获取版本信息...')
baseurl = 'https://py.cherwin.cn/'
TOOLS_NAME = 'CHERWIN_TOOLS.py'
server_script_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{TOOLS_NAME}'
try:
response = requests.get(f'{baseurl}CHERWIN_SCRIPT_CONFIG.json', verify=False)
response.encoding = 'utf-8'
# 读取内容
CHERWIN_SCRIPT_CONFIG = response.json()
if 'code' in CHERWIN_SCRIPT_CONFIG:
CHERWIN_SCRIPT_CONFIG = None
TOOLS_VERSION = CHERWIN_SCRIPT_CONFIG.get('TOOLS_VERSION', NOW_TOOLS_VERSION)
if CHECK_UPDATE_NEW(NOW_TOOLS_VERSION, TOOLS_VERSION, server_script_url, TOOLS_NAME):
print('更新脚本完成')
# print(f'重新检测[{TOOLS_NAME}]版本')
return False
else:
return True
except:
print('获取CHERWIN_SCRIPT_CONFIG.json失败')
return False
def GJJJ_SIGN():
app_id = "667516"
app_crypto = "FH3yRrHG2RfexND8"
timestamp = int(time.time() * 1000)
# timestamp = 1715180892075
text = f"{app_id}{app_crypto}{timestamp}"
sign = hashlib.md5(text.encode()).hexdigest()
new_data = {
'timestamp': str(timestamp),
"sign": sign
}
return new_data
def KWW_SIGN(memberId):
timestamp = int(time.time() * 1000)
random_num = random.randint(0, 31)
u = [
"A", "Z", "B", "Y", "C", "X", "D", "T", "E", "S", "F", "R", "G", "Q", "H", "P", "I", "O", "J", "N", "k",
"M", "L", "a", "c", "d", "f", "h", "k", "p", "y", "n"]
r = f"{timestamp}{memberId}{u[random_num]}"
sign = hashlib.md5(r.encode()).hexdigest()
update_headers = {
"user-sign": sign,
"user-paramname": "memberId",
"user-timestamp": str(timestamp),
"user-random": str(random_num)
}
return update_headers
def TYQH_SIGN(parameters={}, body=None):
sorted_keys = sorted(parameters.keys())
parameter_strings = []
for key in sorted_keys:
if isinstance(parameters[key], dict):
parameter_strings.append(f"{key}={json.dumps(parameters[key])}")
else:
parameter_strings.append(f"{key}={parameters[key]}")
current_time = int(datetime.now().timestamp() * 1000)
secret_chars = list('BxzTx45uIGT25TTHIIBU2')
last_three_digits = str(current_time)[-3:]
for digit in last_three_digits:
secret_chars.insert(int(digit), digit)
secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest()
nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
sign_data = {
'client_id': 'game',
'nonstr': nonce_str,
'timestamp': current_time,
'body': json.dumps(body) if body else '',
'query': '&'.join(parameter_strings) if parameter_strings else '',
'secret': secret
}
sign_string = '|'.join([str(v) for v in sign_data.values()])
sign = hashlib.md5(sign_string.encode()).hexdigest().upper()
sign_header = {
'client_id': 'game',
'timestamp': str(current_time),
'nonstr': sign_data['nonstr'],
'sign': sign
}
return sign_header
def YDXQ_SIGN():
sign_nonce = "tnFWIEFpVPJkOuNX4zdsKeBEMIakLS1RsnS7cH0Id6MjEEBGO"
n = str(int(time.time()))
# 拼接字符串并使用md5哈希。注意在Python中需要对字符串编码才能生成哈希。
sign_string = f"sign_{n}_sign{sign_nonce}"
sign_hash = hashlib.md5(sign_string.encode()).hexdigest()
return sign_hash, n
def HXEK_SIGN(memberId, appid):
# appid = "wxa1f1fa3785a47c7d"
secret = 'damogic8888'
# 获取GMT+8的当前时间戳
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# timestamp = '2024-05-22 16:07:54'
# 生成随机数
random_int = random.randint(1000000, 9999999)
# random_int = 4270745
# 构建待加密字符串
raw_string = f"timestamp={timestamp}transId={appid}{timestamp}secret={secret}random={random_int}memberId={memberId}"
# 使用MD5进行加密
md5_hash = hashlib.md5(raw_string.encode())
sign = md5_hash.hexdigest()
return sign, random_int, timestamp
def KPL_SIGN(url, params):
secret_key = "d19b9f22f5aac41ac0b56a1947f82bce"
# 提取URL路径去掉域名部分
url_path = url.replace("https://app.tv.kohesport.qq.com", "")
# 如果params是对象转换为JSON字符串
if isinstance(params, dict):
params_str = json.dumps(params, separators=(',', ':'))
else:
params_str = params
# 拼接路径、参数和密钥
string_to_hash = f"{url_path}{params_str}{secret_key}"
# 计算SHA256哈希值
signature = hashlib.sha256(string_to_hash.encode('utf-8')).hexdigest()
sign_header = {
"X-TGATV-SignatureMethod": "sha256",
"X-TGATV-SignatureVersion": "3",
"X-TGATV-Signature": signature
}
return sign_header
def get_ip():
response = requests.get('https://cdn.jsdelivr.net/gh/parserpp/ip_ports/proxyinfo.json',verify=False)
# 使用正则表达式提取 IP 地址和端口号
data = response.text
lines = data.strip().split('\n')
# json_objects = [json.loads(line) for line in lines]
json_objects = [json.loads(line) for line in lines if json.loads(line)["country"] == "CN"]
# json_array = json.dumps(json_objects, indent=4)
if json_objects:
selected = random.choice(json_objects)
result = f"{selected['type']}://{selected['host']}:{selected['port']}"
proxies = {
selected['type']: result,
}
print(f"当前代理:{result}")
return proxies
else:
print("没匹配到CN的ip")
return None
def main(APP_NAME, local_script_name, ENV_NAME, local_version, need_invite=False):
global APP_INFO, TIPS, TIPS_HTML
git_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{local_script_name}'
if CHECK():
APP_INFO = CHERWIN_SCRIPT_CONFIG.get("APP_CONFIG", {}).get(ENV_NAME, {})
# print(APP_INFO)
server_version = APP_INFO.get('NEW_VERSION', '')
if CHECK_UPDATE_NEW(local_version, server_version, git_url, local_script_name, APP_NAME=APP_NAME):
print('更新成功,请重新运行脚本!')
if not APP_INFO.get('ENABLE', False) and not IS_DEV:
print('当前脚本未开放')
exit()
TIPS = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC', '')
TIPS_HTML = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC_HTML','')
ENV = os.environ.get(ENV_NAME)
if need_invite:
AuthorCode = get_AuthorInviteCode(f'https://yhsh.ziyuand.cn/{ENV_NAME}_INVITE_CODE.json')
else:
AuthorCode = ''
return ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
else:
exit()
if __name__ == '__main__':
print(NOW_TOOLS_VERSION)