diff --git a/tlmk.py b/tlmk.py deleted file mode 100644 index bd678dc..0000000 --- a/tlmk.py +++ /dev/null @@ -1,527 +0,0 @@ -import hashlib -import json -import os -import importlib.util -import random -import string -import subprocess -import sys -import time -import requests -from http import HTTPStatus -from datetime import datetime - -NOW_TOOLS_VERSION = '2024.05.27' -if os.path.isfile('DEV_ENV.py'): - import DEV_ENV - - IS_DEV = True -else: - IS_DEV = False - - -# 尝试导入包 -def import_or_install(package_name, import_name=None): - # 如果传入了 import_name,则使用它来检查模块,否则默认与包名相同 - import_name = import_name or package_name - try: - # 检查模块是否已安装 - package_spec = importlib.util.find_spec(import_name) - if package_spec is None: - print(f"{package_name} 模块未安装. 开始安装...") - subprocess.check_call([sys.executable, '-m', 'pip', 'install', package_name]) - print(f"{package_name} 模块安装完成。") - else: - print(f"{package_name} 模块已安装。") - # 尝试导入模块检查是否安装成功 - __import__(import_name) - module = importlib.import_module(import_name) - print(f"{import_name} 模块导入成功.") - return module - except ImportError as e: - print(f"无法导入 {import_name} 模块. 错误信息: {e}") - except subprocess.CalledProcessError as e: - print(f"安装 {package_name} 模块时出错. 错误信息: {e}") - except Exception as e: - print(f"处理 {package_name} 模块时发生错误. 错误信息: {e}") - - -def SAVE_INVITE_CODE(file_name, new_data): - # 读取现有JSON文件(如果存在) - try: - with open(file_name, 'r', encoding='utf-8') as file: - data = json.load(file) - except FileNotFoundError: - # 如果文件不存在,创建所需目录并一个新的空JSON文件 - directory = os.path.dirname(file_name) - if not os.path.exists(directory): - os.makedirs(directory) - data = {} - # 检查是否已存在相同的键,如果存在,合并数据 - for key, value in new_data.items(): - if key in data: - # 如果键已存在,将新数据合并到现有数据中 - data[key].update(value) - else: - # 如果键不存在,直接插入新数据 - data[key] = value - # 将更新后的数据写入JSON文件 - with open(file_name, 'w', encoding='utf-8') as file: - json.dump(data, file, indent=4) - - -# 将参数转换为字典 -def create_dict_from_string(self, data_string): - params = {} - key_value_pairs = data_string.split(',') - for pair in key_value_pairs: - key, value = pair.split('=') - params[key] = value - return params - - -def compare_versions(local_version, server_version): - local_parts = local_version.split('.') # 将本地版本号拆分成数字部分 - server_parts = server_version.split('.') # 将服务器版本号拆分成数字部分 - for l, s in zip(local_parts, server_parts): - if int(l) < int(s): - return True - # 当前版本低于服务器版本 - elif int(l) > int(s): - return False - # 当前版本高于服务器版本 - # 如果上述循环没有返回结果,则表示当前版本与服务器版本的数字部分完全相同 - if len(local_parts) < len(server_parts): - return True # 当前版本位数较短,即版本号形如 x.y 比 x.y.z 低 - else: - return False # 当前版本与服务器版本相同或更高 - - -def CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, server_version_url=None, - APP_NAME=None): - """ - 检查版本并更新 - - Args: - local_version (str): 本地版本号 - server_version_url (str): 服务器版本文件地址 - server_script_url (str): 服务器脚本地址 - script_filename (str): 要保存的脚本文件名 - - Returns: - bool: 是否进行了更新操作 - """ - print(f'当前检测:【{script_filename}】') - try: - if server_version_url: - # 获取服务器版本号 - response = requests.get(server_version_url, verify=False) - response.raise_for_status() # Raises an HTTPError for bad responses - # print(response.text) - server_version = response.text.strip() # 去除首尾空格 - if "code" in server_version: - print('【获取远程版本号失败,设为本地同版本】') - server_version = local_version - if not server_version: server_version = NOW_TOOLS_VERSION - print(f'本地版本:【{local_version}】') - print(f'服务器版本:【{server_version}】') - if compare_versions(local_version, server_version): - # 需要更新,下载服务器脚本 - AUTO_UPDATE = os.getenv("SCRIPT_UPDATE", "True").lower() != "false" - # print(AUTO_UPDATE) - if AUTO_UPDATE: - print(">>>>>>>发现新版本的脚本,默认自动更新,准备更新...") - print(">>>>>>>禁用更新请定义变量export SCRIPT_UPDATE = 'False'") - if down_file(script_filename, server_script_url): - print(f'请重新运行新脚本\n') - return True - else: - print(">>>>>>>发现新版本的脚本,您禁用了自动更新,如需启用请删除变量SCRIPT_UPDATE\n") - else: - print(f'无需更新\n') - return False - except requests.exceptions.RequestException as e: - print(f'发生网络错误:{e}') - server_base_url = f"https://py.cherwin.cn/{APP_NAME}/" - server_script_url = f"{server_base_url}{script_filename}" - CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, APP_NAME=APP_NAME) - except Exception as e: - print(f'发生未知错误:{e}') - return False # 返回 False 表示没有进行更新操作 - - -def down_file(filename, file_url): - print(f'开始下载:{filename},下载地址:{file_url}') - try: - response = requests.get(file_url, verify=False, timeout=10) - response.raise_for_status() - with open(filename + '.tmp', 'wb') as f: - f.write(response.content) - print(f'【{filename}】下载完成!') - # 检查临时文件是否存在 - temp_filename = filename + '.tmp' - if os.path.exists(temp_filename): - # 删除原有文件 - if os.path.exists(filename): - os.remove(filename) - # 重命名临时文件 - os.rename(temp_filename, filename) - print(f'【{filename}】重命名成功!') - return True - else: - print(f'【{filename}】临时文件不存在!') - return False - except Exception as e: - print(f'【{filename}】下载失败:{str(e)}') - return False - - -def get_AuthorInviteCode(url): - global AuthorCode - try: - response = requests.get(url, verify=False, timeout=10) - if response.status_code == 200: - content = json.loads(response.text) - AuthorCode = list(content.values()) - # print(f'获取到作者邀请码:{AuthorCode}') - return AuthorCode - else: - # print("无法获取文件。状态代码:", response.status_code) - return {} - except Exception as e: - print(f"An error occurred: {e}") - return {} - - -def CHECK_PARAMENTERS(index, input_string, required_parameters): - # required_parameters = ['deviceid', 'jysessionid', 'shopid', 'memberid', 'access_token', 'sign'] - - # 记录缺少的参数 - missing_parameters = [] - # 将输入字符串和参数列表中的所有字符都转换为小写 - input_string_lower = input_string.lower() - required_parameters_lower = [param.lower() for param in required_parameters] - # 判断字符串中是否包含所有必需的参数 - for param in required_parameters_lower: - if param not in input_string_lower: - missing_parameters.append(param) - if missing_parameters: - print(f"\n第【{index + 1}】个账号,缺少以下参数:【{missing_parameters}】") - return False - else: - print(f"\n第【{index + 1}】个账号,URL包含所有必需的参数,开始执行脚本") - return True - - -def QIANWEN(tongyiSysPromt, content, api_key): - print('开始调用通义千问') - # 检查dashscope库是否已安装 - dashscope = import_or_install('dashscope') - if dashscope: - dashscope.api_key = api_key - response = dashscope.Generation.call( - model='qwen-max', - messages=[ - {"role": "system", - "content": tongyiSysPromt}, - {"role": "user", "content": content}], - seed=1234, - top_p=0.8, - result_format='message', - enable_search=False, - max_tokens=1500, - temperature=1.0, - repetition_penalty=1.0, - ) - if response.status_code == HTTPStatus.OK: - # print(response) - video_info = response.output['choices'][0]['message']['content'] - print('通义生成【成功】!') - return video_info - else: - print(f"无法解析通义返回的信息:{response}") - return None - else: - print('dashscope 模块无法导入,函数无法执行。') - - -# 取环境变量,并分割 -def ENV_SPLIT(input_str): - parts = [] - if '&' in input_str: - amp_parts = input_str.split('&') - for part in amp_parts: - if '#' in part: - hash_parts = part.split('#') - for hash_part in hash_parts: - parts.append(hash_part) - else: - parts.append(part) - # print(parts) - return (parts) - - elif '#' in input_str: - hash_parts = input_str.split('#') - # print(hash_parts) - return (hash_parts) - else: - out_str = str(input_str) - # print([out_str]) - return ([out_str]) - - -# 使用导入的模块进行验证码识别 -def CAPCODE(captcha_slider, captcha_bg): - ddddocr = import_or_install('ddddocr') - if ddddocr: - slide = ddddocr.DdddOcr(det=False, ocr=False) - with open(captcha_slider, 'rb') as f: - target_bytes = f.read() - with open(captcha_bg, 'rb') as f: - background_bytes = f.read() - res = slide.slide_match(target_bytes, background_bytes, simple_target=True) - # print(res['target'][0]) - # print(type(res['target'][0])) - return res['target'][0] - else: - print('ddddocr 模块无法导入,函数无法执行。') - return False - - -def send_wxpusher(UID, one_msg, APP_NAME, help=False): - WXPUSHER = os.environ.get('WXPUSHER', False) - if WXPUSHER: - if help: - push_res = wxpusher(WXPUSHER, APP_NAME + '互助', one_msg, UID, TIPS_HTML) - else: - push_res = wxpusher(WXPUSHER, APP_NAME, one_msg, UID, TIPS_HTML) - print(push_res) - - -def wxpusher(UID, msg, title, help=False): - """利用 wxpusher 的 web api 发送 json 数据包,实现微信信息的发送""" - WXPUSHER = os.environ.get('WXPUSHER', False) - if WXPUSHER: - if help: title = title + '互助' - print('\n------开始wxpusher推送------') - print(f'标题:【{title}】\n内容:{msg}') - webapi = 'http://wxpusher.zjiecode.com/api/send/message' - msg = msg.replace("\n", "
") - # tips = TIPS_HTML.replace("\n", "
") - data = { - "appToken": WXPUSHER, - "content": f'{title}
{msg}
{TIPS_HTML}', - # "summary": msg[:99], # 该参数可选,默认为 msg 的前10个字符 - "summary": title, - "contentType": 2, - "uids": [UID], - "url": "https://gj.cherwin.cn" - } - try: - result = requests.post(url=webapi, json=data) - result.raise_for_status() # 对于非2xx状态码,抛出异常 - response_json = result.json() - if response_json["success"]: - return "------消息发送成功------\n" - else: - return f"消息发送失败。错误信息:{response_json['msg']}" - except requests.exceptions.RequestException as e: - return f"发送消息时发生错误:{str(e)}" - except Exception as e: - return f"发生意外错误:{str(e)}" - - -def RESTART_SCRIPT(RESTART_SCRIPT_NAME): - python = sys.executable - os.execl(python, RESTART_SCRIPT_NAME, *sys.argv[1:]) - - -def CHECK(): - global CHERWIN_SCRIPT_CONFIG - print('>>>>>>>开始获取版本信息...') - baseurl = 'https://py.cherwin.cn/' - TOOLS_NAME = 'CHERWIN_TOOLS.py' - server_script_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{TOOLS_NAME}' - try: - response = requests.get(f'{baseurl}CHERWIN_SCRIPT_CONFIG.json', verify=False) - response.encoding = 'utf-8' - # 读取内容 - CHERWIN_SCRIPT_CONFIG = response.json() - if 'code' in CHERWIN_SCRIPT_CONFIG: - CHERWIN_SCRIPT_CONFIG = None - TOOLS_VERSION = CHERWIN_SCRIPT_CONFIG.get('TOOLS_VERSION', NOW_TOOLS_VERSION) - - if CHECK_UPDATE_NEW(NOW_TOOLS_VERSION, TOOLS_VERSION, server_script_url, TOOLS_NAME): - print('更新脚本完成') - # print(f'重新检测[{TOOLS_NAME}]版本') - return False - else: - return True - except: - print('获取CHERWIN_SCRIPT_CONFIG.json失败') - return False - - -def GJJJ_SIGN(): - app_id = "667516" - app_crypto = "FH3yRrHG2RfexND8" - timestamp = int(time.time() * 1000) - # timestamp = 1715180892075 - text = f"{app_id}{app_crypto}{timestamp}" - sign = hashlib.md5(text.encode()).hexdigest() - new_data = { - 'timestamp': str(timestamp), - "sign": sign - } - return new_data - - -def KWW_SIGN(memberId): - timestamp = int(time.time() * 1000) - random_num = random.randint(0, 31) - u = [ - "A", "Z", "B", "Y", "C", "X", "D", "T", "E", "S", "F", "R", "G", "Q", "H", "P", "I", "O", "J", "N", "k", - "M", "L", "a", "c", "d", "f", "h", "k", "p", "y", "n"] - r = f"{timestamp}{memberId}{u[random_num]}" - sign = hashlib.md5(r.encode()).hexdigest() - update_headers = { - "user-sign": sign, - "user-paramname": "memberId", - "user-timestamp": str(timestamp), - "user-random": str(random_num) - } - return update_headers - - -def TYQH_SIGN(parameters={}, body=None): - sorted_keys = sorted(parameters.keys()) - parameter_strings = [] - for key in sorted_keys: - if isinstance(parameters[key], dict): - parameter_strings.append(f"{key}={json.dumps(parameters[key])}") - else: - parameter_strings.append(f"{key}={parameters[key]}") - - current_time = int(datetime.now().timestamp() * 1000) - secret_chars = list('BxzTx45uIGT25TTHIIBU2') - last_three_digits = str(current_time)[-3:] - for digit in last_three_digits: - secret_chars.insert(int(digit), digit) - - secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest() - nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) - - sign_data = { - 'client_id': 'game', - 'nonstr': nonce_str, - 'timestamp': current_time, - 'body': json.dumps(body) if body else '', - 'query': '&'.join(parameter_strings) if parameter_strings else '', - 'secret': secret - } - - sign_string = '|'.join([str(v) for v in sign_data.values()]) - sign = hashlib.md5(sign_string.encode()).hexdigest().upper() - sign_header = { - 'client_id': 'game', - 'timestamp': str(current_time), - 'nonstr': sign_data['nonstr'], - 'sign': sign - } - return sign_header - - -def YDXQ_SIGN(): - sign_nonce = "tnFWIEFpVPJkOuNX4zdsKeBEMIakLS1RsnS7cH0Id6MjEEBGO" - n = str(int(time.time())) - # 拼接字符串并使用md5哈希。注意在Python中,需要对字符串编码才能生成哈希。 - sign_string = f"sign_{n}_sign{sign_nonce}" - sign_hash = hashlib.md5(sign_string.encode()).hexdigest() - return sign_hash, n - - -def HXEK_SIGN(memberId, appid): - # appid = "wxa1f1fa3785a47c7d" - secret = 'damogic8888' - # 获取GMT+8的当前时间戳 - timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - # timestamp = '2024-05-22 16:07:54' - # 生成随机数 - random_int = random.randint(1000000, 9999999) - # random_int = 4270745 - # 构建待加密字符串 - raw_string = f"timestamp={timestamp}transId={appid}{timestamp}secret={secret}random={random_int}memberId={memberId}" - # 使用MD5进行加密 - md5_hash = hashlib.md5(raw_string.encode()) - sign = md5_hash.hexdigest() - return sign, random_int, timestamp - - -def KPL_SIGN(url, params): - secret_key = "d19b9f22f5aac41ac0b56a1947f82bce" - # 提取URL路径(去掉域名部分) - url_path = url.replace("https://app.tv.kohesport.qq.com", "") - # 如果params是对象,转换为JSON字符串 - if isinstance(params, dict): - params_str = json.dumps(params, separators=(',', ':')) - else: - params_str = params - # 拼接路径、参数和密钥 - string_to_hash = f"{url_path}{params_str}{secret_key}" - # 计算SHA256哈希值 - signature = hashlib.sha256(string_to_hash.encode('utf-8')).hexdigest() - sign_header = { - "X-TGATV-SignatureMethod": "sha256", - "X-TGATV-SignatureVersion": "3", - "X-TGATV-Signature": signature - } - return sign_header - -def get_ip(): - response = requests.get('https://cdn.jsdelivr.net/gh/parserpp/ip_ports/proxyinfo.json',verify=False) - # 使用正则表达式提取 IP 地址和端口号 - data = response.text - lines = data.strip().split('\n') - # json_objects = [json.loads(line) for line in lines] - json_objects = [json.loads(line) for line in lines if json.loads(line)["country"] == "CN"] - # json_array = json.dumps(json_objects, indent=4) - if json_objects: - selected = random.choice(json_objects) - result = f"{selected['type']}://{selected['host']}:{selected['port']}" - - proxies = { - selected['type']: result, - } - print(f"当前代理:{result}") - return proxies - else: - print("没匹配到CN的ip") - return None - -def main(APP_NAME, local_script_name, ENV_NAME, local_version, need_invite=False): - global APP_INFO, TIPS, TIPS_HTML - git_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{local_script_name}' - if CHECK(): - APP_INFO = CHERWIN_SCRIPT_CONFIG.get("APP_CONFIG", {}).get(ENV_NAME, {}) - # print(APP_INFO) - server_version = APP_INFO.get('NEW_VERSION', '') - if CHECK_UPDATE_NEW(local_version, server_version, git_url, local_script_name, APP_NAME=APP_NAME): - print('更新成功,请重新运行脚本!') - - if not APP_INFO.get('ENABLE', False) and not IS_DEV: - print('当前脚本未开放') - exit() - TIPS = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC', '') - TIPS_HTML = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC_HTML','') - ENV = os.environ.get(ENV_NAME) - if need_invite: - AuthorCode = get_AuthorInviteCode(f'https://yhsh.ziyuand.cn/{ENV_NAME}_INVITE_CODE.json') - else: - AuthorCode = '' - return ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode - else: - exit() - - -if __name__ == '__main__': - print(NOW_TOOLS_VERSION)