From d16338a90b30cf4f33f8ffa00c1c8732c6df05c0 Mon Sep 17 00:00:00 2001 From: 3288588344 <127068117+3288588344@users.noreply.github.com> Date: Thu, 27 Jun 2024 12:19:27 +0800 Subject: [PATCH] Add files via upload --- CHERWIN_TOOLS.py | 527 +++++++++++++++++++++++++++++++++++++++++++++++ 顺丰速运.py | 6 +- 2 files changed, 530 insertions(+), 3 deletions(-) create mode 100644 CHERWIN_TOOLS.py diff --git a/CHERWIN_TOOLS.py b/CHERWIN_TOOLS.py new file mode 100644 index 0000000..bd678dc --- /dev/null +++ b/CHERWIN_TOOLS.py @@ -0,0 +1,527 @@ +import hashlib +import json +import os +import importlib.util +import random +import string +import subprocess +import sys +import time +import requests +from http import HTTPStatus +from datetime import datetime + +NOW_TOOLS_VERSION = '2024.05.27' +if os.path.isfile('DEV_ENV.py'): + import DEV_ENV + + IS_DEV = True +else: + IS_DEV = False + + +# 尝试导入包 +def import_or_install(package_name, import_name=None): + # 如果传入了 import_name,则使用它来检查模块,否则默认与包名相同 + import_name = import_name or package_name + try: + # 检查模块是否已安装 + package_spec = importlib.util.find_spec(import_name) + if package_spec is None: + print(f"{package_name} 模块未安装. 开始安装...") + subprocess.check_call([sys.executable, '-m', 'pip', 'install', package_name]) + print(f"{package_name} 模块安装完成。") + else: + print(f"{package_name} 模块已安装。") + # 尝试导入模块检查是否安装成功 + __import__(import_name) + module = importlib.import_module(import_name) + print(f"{import_name} 模块导入成功.") + return module + except ImportError as e: + print(f"无法导入 {import_name} 模块. 错误信息: {e}") + except subprocess.CalledProcessError as e: + print(f"安装 {package_name} 模块时出错. 错误信息: {e}") + except Exception as e: + print(f"处理 {package_name} 模块时发生错误. 错误信息: {e}") + + +def SAVE_INVITE_CODE(file_name, new_data): + # 读取现有JSON文件(如果存在) + try: + with open(file_name, 'r', encoding='utf-8') as file: + data = json.load(file) + except FileNotFoundError: + # 如果文件不存在,创建所需目录并一个新的空JSON文件 + directory = os.path.dirname(file_name) + if not os.path.exists(directory): + os.makedirs(directory) + data = {} + # 检查是否已存在相同的键,如果存在,合并数据 + for key, value in new_data.items(): + if key in data: + # 如果键已存在,将新数据合并到现有数据中 + data[key].update(value) + else: + # 如果键不存在,直接插入新数据 + data[key] = value + # 将更新后的数据写入JSON文件 + with open(file_name, 'w', encoding='utf-8') as file: + json.dump(data, file, indent=4) + + +# 将参数转换为字典 +def create_dict_from_string(self, data_string): + params = {} + key_value_pairs = data_string.split(',') + for pair in key_value_pairs: + key, value = pair.split('=') + params[key] = value + return params + + +def compare_versions(local_version, server_version): + local_parts = local_version.split('.') # 将本地版本号拆分成数字部分 + server_parts = server_version.split('.') # 将服务器版本号拆分成数字部分 + for l, s in zip(local_parts, server_parts): + if int(l) < int(s): + return True + # 当前版本低于服务器版本 + elif int(l) > int(s): + return False + # 当前版本高于服务器版本 + # 如果上述循环没有返回结果,则表示当前版本与服务器版本的数字部分完全相同 + if len(local_parts) < len(server_parts): + return True # 当前版本位数较短,即版本号形如 x.y 比 x.y.z 低 + else: + return False # 当前版本与服务器版本相同或更高 + + +def CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, server_version_url=None, + APP_NAME=None): + """ + 检查版本并更新 + + Args: + local_version (str): 本地版本号 + server_version_url (str): 服务器版本文件地址 + server_script_url (str): 服务器脚本地址 + script_filename (str): 要保存的脚本文件名 + + Returns: + bool: 是否进行了更新操作 + """ + print(f'当前检测:【{script_filename}】') + try: + if server_version_url: + # 获取服务器版本号 + response = requests.get(server_version_url, verify=False) + response.raise_for_status() # Raises an HTTPError for bad responses + # print(response.text) + server_version = response.text.strip() # 去除首尾空格 + if "code" in server_version: + print('【获取远程版本号失败,设为本地同版本】') + server_version = local_version + if not server_version: server_version = NOW_TOOLS_VERSION + print(f'本地版本:【{local_version}】') + print(f'服务器版本:【{server_version}】') + if compare_versions(local_version, server_version): + # 需要更新,下载服务器脚本 + AUTO_UPDATE = os.getenv("SCRIPT_UPDATE", "True").lower() != "false" + # print(AUTO_UPDATE) + if AUTO_UPDATE: + print(">>>>>>>发现新版本的脚本,默认自动更新,准备更新...") + print(">>>>>>>禁用更新请定义变量export SCRIPT_UPDATE = 'False'") + if down_file(script_filename, server_script_url): + print(f'请重新运行新脚本\n') + return True + else: + print(">>>>>>>发现新版本的脚本,您禁用了自动更新,如需启用请删除变量SCRIPT_UPDATE\n") + else: + print(f'无需更新\n') + return False + except requests.exceptions.RequestException as e: + print(f'发生网络错误:{e}') + server_base_url = f"https://py.cherwin.cn/{APP_NAME}/" + server_script_url = f"{server_base_url}{script_filename}" + CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, APP_NAME=APP_NAME) + except Exception as e: + print(f'发生未知错误:{e}') + return False # 返回 False 表示没有进行更新操作 + + +def down_file(filename, file_url): + print(f'开始下载:{filename},下载地址:{file_url}') + try: + response = requests.get(file_url, verify=False, timeout=10) + response.raise_for_status() + with open(filename + '.tmp', 'wb') as f: + f.write(response.content) + print(f'【{filename}】下载完成!') + # 检查临时文件是否存在 + temp_filename = filename + '.tmp' + if os.path.exists(temp_filename): + # 删除原有文件 + if os.path.exists(filename): + os.remove(filename) + # 重命名临时文件 + os.rename(temp_filename, filename) + print(f'【{filename}】重命名成功!') + return True + else: + print(f'【{filename}】临时文件不存在!') + return False + except Exception as e: + print(f'【{filename}】下载失败:{str(e)}') + return False + + +def get_AuthorInviteCode(url): + global AuthorCode + try: + response = requests.get(url, verify=False, timeout=10) + if response.status_code == 200: + content = json.loads(response.text) + AuthorCode = list(content.values()) + # print(f'获取到作者邀请码:{AuthorCode}') + return AuthorCode + else: + # print("无法获取文件。状态代码:", response.status_code) + return {} + except Exception as e: + print(f"An error occurred: {e}") + return {} + + +def CHECK_PARAMENTERS(index, input_string, required_parameters): + # required_parameters = ['deviceid', 'jysessionid', 'shopid', 'memberid', 'access_token', 'sign'] + + # 记录缺少的参数 + missing_parameters = [] + # 将输入字符串和参数列表中的所有字符都转换为小写 + input_string_lower = input_string.lower() + required_parameters_lower = [param.lower() for param in required_parameters] + # 判断字符串中是否包含所有必需的参数 + for param in required_parameters_lower: + if param not in input_string_lower: + missing_parameters.append(param) + if missing_parameters: + print(f"\n第【{index + 1}】个账号,缺少以下参数:【{missing_parameters}】") + return False + else: + print(f"\n第【{index + 1}】个账号,URL包含所有必需的参数,开始执行脚本") + return True + + +def QIANWEN(tongyiSysPromt, content, api_key): + print('开始调用通义千问') + # 检查dashscope库是否已安装 + dashscope = import_or_install('dashscope') + if dashscope: + dashscope.api_key = api_key + response = dashscope.Generation.call( + model='qwen-max', + messages=[ + {"role": "system", + "content": tongyiSysPromt}, + {"role": "user", "content": content}], + seed=1234, + top_p=0.8, + result_format='message', + enable_search=False, + max_tokens=1500, + temperature=1.0, + repetition_penalty=1.0, + ) + if response.status_code == HTTPStatus.OK: + # print(response) + video_info = response.output['choices'][0]['message']['content'] + print('通义生成【成功】!') + return video_info + else: + print(f"无法解析通义返回的信息:{response}") + return None + else: + print('dashscope 模块无法导入,函数无法执行。') + + +# 取环境变量,并分割 +def ENV_SPLIT(input_str): + parts = [] + if '&' in input_str: + amp_parts = input_str.split('&') + for part in amp_parts: + if '#' in part: + hash_parts = part.split('#') + for hash_part in hash_parts: + parts.append(hash_part) + else: + parts.append(part) + # print(parts) + return (parts) + + elif '#' in input_str: + hash_parts = input_str.split('#') + # print(hash_parts) + return (hash_parts) + else: + out_str = str(input_str) + # print([out_str]) + return ([out_str]) + + +# 使用导入的模块进行验证码识别 +def CAPCODE(captcha_slider, captcha_bg): + ddddocr = import_or_install('ddddocr') + if ddddocr: + slide = ddddocr.DdddOcr(det=False, ocr=False) + with open(captcha_slider, 'rb') as f: + target_bytes = f.read() + with open(captcha_bg, 'rb') as f: + background_bytes = f.read() + res = slide.slide_match(target_bytes, background_bytes, simple_target=True) + # print(res['target'][0]) + # print(type(res['target'][0])) + return res['target'][0] + else: + print('ddddocr 模块无法导入,函数无法执行。') + return False + + +def send_wxpusher(UID, one_msg, APP_NAME, help=False): + WXPUSHER = os.environ.get('WXPUSHER', False) + if WXPUSHER: + if help: + push_res = wxpusher(WXPUSHER, APP_NAME + '互助', one_msg, UID, TIPS_HTML) + else: + push_res = wxpusher(WXPUSHER, APP_NAME, one_msg, UID, TIPS_HTML) + print(push_res) + + +def wxpusher(UID, msg, title, help=False): + """利用 wxpusher 的 web api 发送 json 数据包,实现微信信息的发送""" + WXPUSHER = os.environ.get('WXPUSHER', False) + if WXPUSHER: + if help: title = title + '互助' + print('\n------开始wxpusher推送------') + print(f'标题:【{title}】\n内容:{msg}') + webapi = 'http://wxpusher.zjiecode.com/api/send/message' + msg = msg.replace("\n", "
") + # tips = TIPS_HTML.replace("\n", "
") + data = { + "appToken": WXPUSHER, + "content": f'{title}
{msg}
{TIPS_HTML}', + # "summary": msg[:99], # 该参数可选,默认为 msg 的前10个字符 + "summary": title, + "contentType": 2, + "uids": [UID], + "url": "https://gj.cherwin.cn" + } + try: + result = requests.post(url=webapi, json=data) + result.raise_for_status() # 对于非2xx状态码,抛出异常 + response_json = result.json() + if response_json["success"]: + return "------消息发送成功------\n" + else: + return f"消息发送失败。错误信息:{response_json['msg']}" + except requests.exceptions.RequestException as e: + return f"发送消息时发生错误:{str(e)}" + except Exception as e: + return f"发生意外错误:{str(e)}" + + +def RESTART_SCRIPT(RESTART_SCRIPT_NAME): + python = sys.executable + os.execl(python, RESTART_SCRIPT_NAME, *sys.argv[1:]) + + +def CHECK(): + global CHERWIN_SCRIPT_CONFIG + print('>>>>>>>开始获取版本信息...') + baseurl = 'https://py.cherwin.cn/' + TOOLS_NAME = 'CHERWIN_TOOLS.py' + server_script_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{TOOLS_NAME}' + try: + response = requests.get(f'{baseurl}CHERWIN_SCRIPT_CONFIG.json', verify=False) + response.encoding = 'utf-8' + # 读取内容 + CHERWIN_SCRIPT_CONFIG = response.json() + if 'code' in CHERWIN_SCRIPT_CONFIG: + CHERWIN_SCRIPT_CONFIG = None + TOOLS_VERSION = CHERWIN_SCRIPT_CONFIG.get('TOOLS_VERSION', NOW_TOOLS_VERSION) + + if CHECK_UPDATE_NEW(NOW_TOOLS_VERSION, TOOLS_VERSION, server_script_url, TOOLS_NAME): + print('更新脚本完成') + # print(f'重新检测[{TOOLS_NAME}]版本') + return False + else: + return True + except: + print('获取CHERWIN_SCRIPT_CONFIG.json失败') + return False + + +def GJJJ_SIGN(): + app_id = "667516" + app_crypto = "FH3yRrHG2RfexND8" + timestamp = int(time.time() * 1000) + # timestamp = 1715180892075 + text = f"{app_id}{app_crypto}{timestamp}" + sign = hashlib.md5(text.encode()).hexdigest() + new_data = { + 'timestamp': str(timestamp), + "sign": sign + } + return new_data + + +def KWW_SIGN(memberId): + timestamp = int(time.time() * 1000) + random_num = random.randint(0, 31) + u = [ + "A", "Z", "B", "Y", "C", "X", "D", "T", "E", "S", "F", "R", "G", "Q", "H", "P", "I", "O", "J", "N", "k", + "M", "L", "a", "c", "d", "f", "h", "k", "p", "y", "n"] + r = f"{timestamp}{memberId}{u[random_num]}" + sign = hashlib.md5(r.encode()).hexdigest() + update_headers = { + "user-sign": sign, + "user-paramname": "memberId", + "user-timestamp": str(timestamp), + "user-random": str(random_num) + } + return update_headers + + +def TYQH_SIGN(parameters={}, body=None): + sorted_keys = sorted(parameters.keys()) + parameter_strings = [] + for key in sorted_keys: + if isinstance(parameters[key], dict): + parameter_strings.append(f"{key}={json.dumps(parameters[key])}") + else: + parameter_strings.append(f"{key}={parameters[key]}") + + current_time = int(datetime.now().timestamp() * 1000) + secret_chars = list('BxzTx45uIGT25TTHIIBU2') + last_three_digits = str(current_time)[-3:] + for digit in last_three_digits: + secret_chars.insert(int(digit), digit) + + secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest() + nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) + + sign_data = { + 'client_id': 'game', + 'nonstr': nonce_str, + 'timestamp': current_time, + 'body': json.dumps(body) if body else '', + 'query': '&'.join(parameter_strings) if parameter_strings else '', + 'secret': secret + } + + sign_string = '|'.join([str(v) for v in sign_data.values()]) + sign = hashlib.md5(sign_string.encode()).hexdigest().upper() + sign_header = { + 'client_id': 'game', + 'timestamp': str(current_time), + 'nonstr': sign_data['nonstr'], + 'sign': sign + } + return sign_header + + +def YDXQ_SIGN(): + sign_nonce = "tnFWIEFpVPJkOuNX4zdsKeBEMIakLS1RsnS7cH0Id6MjEEBGO" + n = str(int(time.time())) + # 拼接字符串并使用md5哈希。注意在Python中,需要对字符串编码才能生成哈希。 + sign_string = f"sign_{n}_sign{sign_nonce}" + sign_hash = hashlib.md5(sign_string.encode()).hexdigest() + return sign_hash, n + + +def HXEK_SIGN(memberId, appid): + # appid = "wxa1f1fa3785a47c7d" + secret = 'damogic8888' + # 获取GMT+8的当前时间戳 + timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + # timestamp = '2024-05-22 16:07:54' + # 生成随机数 + random_int = random.randint(1000000, 9999999) + # random_int = 4270745 + # 构建待加密字符串 + raw_string = f"timestamp={timestamp}transId={appid}{timestamp}secret={secret}random={random_int}memberId={memberId}" + # 使用MD5进行加密 + md5_hash = hashlib.md5(raw_string.encode()) + sign = md5_hash.hexdigest() + return sign, random_int, timestamp + + +def KPL_SIGN(url, params): + secret_key = "d19b9f22f5aac41ac0b56a1947f82bce" + # 提取URL路径(去掉域名部分) + url_path = url.replace("https://app.tv.kohesport.qq.com", "") + # 如果params是对象,转换为JSON字符串 + if isinstance(params, dict): + params_str = json.dumps(params, separators=(',', ':')) + else: + params_str = params + # 拼接路径、参数和密钥 + string_to_hash = f"{url_path}{params_str}{secret_key}" + # 计算SHA256哈希值 + signature = hashlib.sha256(string_to_hash.encode('utf-8')).hexdigest() + sign_header = { + "X-TGATV-SignatureMethod": "sha256", + "X-TGATV-SignatureVersion": "3", + "X-TGATV-Signature": signature + } + return sign_header + +def get_ip(): + response = requests.get('https://cdn.jsdelivr.net/gh/parserpp/ip_ports/proxyinfo.json',verify=False) + # 使用正则表达式提取 IP 地址和端口号 + data = response.text + lines = data.strip().split('\n') + # json_objects = [json.loads(line) for line in lines] + json_objects = [json.loads(line) for line in lines if json.loads(line)["country"] == "CN"] + # json_array = json.dumps(json_objects, indent=4) + if json_objects: + selected = random.choice(json_objects) + result = f"{selected['type']}://{selected['host']}:{selected['port']}" + + proxies = { + selected['type']: result, + } + print(f"当前代理:{result}") + return proxies + else: + print("没匹配到CN的ip") + return None + +def main(APP_NAME, local_script_name, ENV_NAME, local_version, need_invite=False): + global APP_INFO, TIPS, TIPS_HTML + git_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{local_script_name}' + if CHECK(): + APP_INFO = CHERWIN_SCRIPT_CONFIG.get("APP_CONFIG", {}).get(ENV_NAME, {}) + # print(APP_INFO) + server_version = APP_INFO.get('NEW_VERSION', '') + if CHECK_UPDATE_NEW(local_version, server_version, git_url, local_script_name, APP_NAME=APP_NAME): + print('更新成功,请重新运行脚本!') + + if not APP_INFO.get('ENABLE', False) and not IS_DEV: + print('当前脚本未开放') + exit() + TIPS = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC', '') + TIPS_HTML = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC_HTML','') + ENV = os.environ.get(ENV_NAME) + if need_invite: + AuthorCode = get_AuthorInviteCode(f'https://yhsh.ziyuand.cn/{ENV_NAME}_INVITE_CODE.json') + else: + AuthorCode = '' + return ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode + else: + exit() + + +if __name__ == '__main__': + print(NOW_TOOLS_VERSION) diff --git a/顺丰速运.py b/顺丰速运.py index 169b00f..4b46f15 100644 --- a/顺丰速运.py +++ b/顺丰速运.py @@ -1727,15 +1727,15 @@ export SCRIPT_UPDATE = 'False' 关闭脚本自动更新,默认开启 if IS_DEV: import_Tools() else: - if os.path.isfile('tlmk.py'): + if os.path.isfile('CHERWIN_TOOLS.py'): import_Tools() else: - if down_file('CHERWIN_TOOLS.py', 'https://github.com/3288588344/toulu/blob/main/tlmk.py'): + if down_file('CHERWIN_TOOLS.py', 'https://github.com/3288588344/toulu/blob/main/CHERWIN_TOOLS.py'): print('脚本依赖下载完成请重新运行脚本') import_Tools() else: print( - '脚本依赖下载失败,请到https://github.com/3288588344/toulu/blob/main/tlmk.py下载最新版本依赖') + '脚本依赖下载失败,请到https://github.com/3288588344/toulu/blob/main/CHERWIN_TOOLS.py下载最新版本依赖') exit() print(TIPS) token = ''