diff --git a/common.py b/common.py new file mode 100644 index 0000000..3550816 --- /dev/null +++ b/common.py @@ -0,0 +1,323 @@ +import uuid +from datetime import datetime +import json +import requests +import random +import re +import time +import base64 +import hashlib +from urllib3.exceptions import InsecureRequestWarning, InsecurePlatformWarning +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) +requests.packages.urllib3.disable_warnings(InsecurePlatformWarning) + + +def make_request(url, json_data=None, method='get', headers=None): + try: + if method.lower() == 'get': + response = requests.get(url, headers=headers, verify=False) + else: + response = requests.post(url, headers=headers, json=json_data, verify=False) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"请求错误: {e}") + return None + +# 通义千问API +def qianwen_messages(basic_question, question): + content = '' + # qw_key = os.getenv("QIANWEN_KEY") + # if not qw_key: + # print(f'⛔️未获取到通义千问key:请检查变量 {qw_key} 是否填写') + # else: + # dashscope.api_key = qw_key + # messages = [{'role': 'system', 'content': 'You are a helpful assistant.'}, + # {'role': 'user', 'content': basic_question + question}] + # response = dashscope.Generation.call( + # dashscope.Generation.Models.qwen_turbo, + # messages=messages, + # seed=random.randint(1, 10000), + # result_format='message', + # ) + # if response.status_code == HTTPStatus.OK: + # content = response['output']['choices'][0]['message']['content'] + # else: + # print('Request id: %s, Status code: %s, error code: %s, error message: %s' % ( + # response.request_id, response.status_code, + # response.code, response.message + # )) + return content + + +def get_current_timestamp_milliseconds(): + # 获取当前时间 + current_time = datetime.now() + # 将当前时间转换为时间戳(秒级) + timestamp_seconds = int(time.mktime(current_time.timetuple())) + # 将秒级时间戳转换为毫秒级 + timestamp_milliseconds = timestamp_seconds * 1000 + current_time.microsecond // 1000 + return timestamp_milliseconds + + +def daily_one_word(): + urls = [ + "https://api.xygeng.cn/openapi/one", + "https://v1.hitokoto.cn", + ] + url = random.choice(urls) + response = requests.get(url, verify=False) + if response and response.status_code == 200: + response_json = response.json() + if url == "https://api.xygeng.cn/openapi/one": + return response_json['data']['content'] + elif url == "https://v1.hitokoto.cn": + return response_json['hitokoto'] + else: + return None + else: + return None + + +# 随机一句网易云热评 +def get_163music_comments(): + comments = [] + keywords_to_filter = ['苏苏', '这首', '歌', '听', '发行', '编曲', '曲', '唱', '生日快乐', '生日', '中考', + '高考', '加油', '小猫', '西子', '好听', '音乐', + ] + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36', + } + + ids = [3778678, 6723173524, 5059661515] + id = random.choice(ids) + url = f'https://music.163.com/discover/toplist?id={id}' # 热歌榜的url + response = requests.get(url, headers=headers) + music_ids = re.findall(' 0: + return random.choice(comments) + else: + return None + + +# 获取代理IP +def get_ip(): + response = requests.get('https://cdn.jsdelivr.net/gh/parserpp/ip_ports/proxyinfo.json', verify=False) + # 使用正则表达式提取 IP 地址和端口号 + data = response.text + lines = data.strip().split('\n') + json_objects = [json.loads(line) for line in lines if json.loads(line)["country"] == "CN"] + if json_objects: + selected = random.choice(json_objects) + result = f"{selected['type']}://{selected['host']}:{selected['port']}" + + proxies = { + selected['type']: result, + } + print(f"当前代理:{result}") + return proxies + else: + print("没匹配到CN的ip") + return None + + +# 保存结果到文件 +def save_result_to_file(status, name): + if status == "success": + result = f'✅【{name}】 | CK正常' + elif status == "error": + result = f'❌【{name}】 | CK已失效' + + # 获取当前日期并格式化 + today_date = datetime.now().strftime("%Y%m%d") + file_name = f'script_results_{today_date}.txt' + + try: + with open(file_name, 'a', encoding='utf-8') as f: + f.write(f'{result}\n') + except Exception as e: + print(f"保存结果到文件时出现异常:{str(e)}") + + +def random_delay(min_delay=1, max_delay=5): + """ + 在min_delay和max_delay之间产生一个随机的延时时间,然后暂停执行。 + 参数: + min_delay (int/float): 最小延时时间(秒) + max_delay (int/float): 最大延时时间(秒) + """ + delay = random.uniform(min_delay, max_delay) + print(f">本次随机延迟:【{delay:.2f}】 秒.....") + time.sleep(delay) + + +def base64_to_hex(base64_str): + """ + 解码 Base64 编码的字符串,并转换为十六进制字符串。 + + 参数: + base64_str: Base64 编码的字符串。 + + 返回: + 解码后的十六进制字符串。 + """ + # Base64 解码 + decoded_bytes = base64.b64decode(base64_str) + + # 转换为十六进制字符串 + hex_string = ''.join(['{:02x}'.format(b) for b in decoded_bytes]) + + return hex_string + + +def sha256(data): + """ + 计算数据的 SHA-256 哈希值,并返回十六进制字符串。 + + 参数: + data: 要进行哈希计算的数据。 + + 返回: + 数据的 SHA-256 哈希值的十六进制字符串。 + """ + return hashlib.sha256(data.encode()).hexdigest() + + +def generate_upper_uuid(): + # 生成一个UUID + generated_uuid = uuid.uuid4() + + # 将UUID转换为指定格式字符串,例如:04D273CE-FAE2-4CC8-B020-E172B063ED8E + formatted_uuid = str(generated_uuid).upper() + + return formatted_uuid + + +def generate_lower_uuid(): + """ + 生成一个 UUID,并将其转换为全小写的字符串格式。 + + 返回: + 一个全小写的 UUID 字符串。 + """ + # 生成一个 UUID + generated_uuid = uuid.uuid4() + + # 将 UUID 转换为指定格式字符串 + formatted_uuid = str(generated_uuid).lower() + + return formatted_uuid + + +def md5_encrypt(input_string): + """ + 对输入的字符串进行 MD5 加密 + + 参数: + input_string (str): 需要加密的字符串 + + 返回: + str: 加密后的 MD5 哈希值(十六进制格式) + """ + # 创建 MD5 对象 + md5 = hashlib.md5() + + # 更新 MD5 对象 + md5.update(input_string.encode('utf-8')) + + # 获取 MD5 哈希值的十六进制表示 + return md5.hexdigest() + + +# 计算 SHA1 哈希 +def calculate_sha1_hash(data): + # 确保输入数据是字节串 + if isinstance(data, str): + data = data.encode('utf-8') + + # 创建一个 SHA1 哈希对象 + sha1 = hashlib.sha1() + + # 更新哈希对象 + sha1.update(data) + + # 获取十六进制形式的哈希值 + hash_value = sha1.hexdigest() + + return hash_value + + +def get_millisecond_timestamp(): + """ + 生成毫秒级时间戳 + """ + # 获取当前时间的时间戳(秒) + timestamp_seconds = time.time() + # 将时间戳转换为毫秒并返回 + timestamp_milliseconds = int(timestamp_seconds * 1000) + return timestamp_milliseconds + + +def generate_nanosecond_timestamp(): + """ + 生成纳秒级时间戳 + """ + # 获取当前时间的时间戳(以秒为单位) + current_time_seconds = time.time() + + # 转换为纳秒 + nanoseconds = int(current_time_seconds * 1e9) + + return nanoseconds