Files
zjk2017-ArcadiaScriptPublic/common.py
2024-08-27 14:25:24 +08:00

324 lines
9.8 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import uuid,os
from datetime import datetime
import json
import requests
import random
import re
import time
import base64
import hashlib
from urllib3.exceptions import InsecureRequestWarning, InsecurePlatformWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
requests.packages.urllib3.disable_warnings(InsecurePlatformWarning)
def make_request(url, json_data=None, method='get', headers=None):
try:
if method.lower() == 'get':
response = requests.get(url, headers=headers, verify=False)
else:
response = requests.post(url, headers=headers, json=json_data, verify=False)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
return None
# 通义千问API
def qianwen_messages(basic_question, question):
content = ''
qw_key = os.getenv("QIANWEN_KEY")
if not qw_key:
print(f'未获取到通义千问key请检查变量 {qw_key} 是否填写')
else:
dashscope.api_key = qw_key
messages = [{'role': 'system', 'content': 'You are a helpful assistant.'},
{'role': 'user', 'content': basic_question + question}]
response = dashscope.Generation.call(
dashscope.Generation.Models.qwen_turbo,
messages=messages,
seed=random.randint(1, 10000),
result_format='message',
)
if response.status_code == HTTPStatus.OK:
content = response['output']['choices'][0]['message']['content']
else:
print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
response.request_id, response.status_code,
response.code, response.message
))
return content
def get_current_timestamp_milliseconds():
# 获取当前时间
current_time = datetime.now()
# 将当前时间转换为时间戳(秒级)
timestamp_seconds = int(time.mktime(current_time.timetuple()))
# 将秒级时间戳转换为毫秒级
timestamp_milliseconds = timestamp_seconds * 1000 + current_time.microsecond // 1000
return timestamp_milliseconds
def daily_one_word():
urls = [
"https://api.xygeng.cn/openapi/one",
"https://v1.hitokoto.cn",
]
url = random.choice(urls)
response = requests.get(url, verify=False)
if response and response.status_code == 200:
response_json = response.json()
if url == "https://api.xygeng.cn/openapi/one":
return response_json['data']['content']
elif url == "https://v1.hitokoto.cn":
return response_json['hitokoto']
else:
return None
else:
return None
# 随机一句网易云热评
def get_163music_comments():
comments = []
keywords_to_filter = ['苏苏', '这首', '', '', '发行', '编曲', '', '', '生日快乐', '生日', '中考',
'高考', '加油', '小猫', '西子', '好听', '音乐',
]
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36',
}
ids = [3778678, 6723173524, 5059661515]
id = random.choice(ids)
url = f'https://music.163.com/discover/toplist?id={id}' # 热歌榜的url
response = requests.get(url, headers=headers)
music_ids = re.findall('<a href="/song\?id=(\d+)"', response.text)
music_id = random.choice(music_ids)
get_url = "http://music.163.com/api/v1/resource/comments/R_SO_4_" + music_id + "?limit=0&offset=0"
response = requests.get(get_url, headers=headers)
json_dict = json.loads(response.content.decode("utf-8"))
hotcomments = json_dict["hotComments"]
for j in hotcomments:
content = j["content"].replace("\n", " ")
nickname = j["user"]["nickname"]
liked = str(j["likedCount"]) + ""
# print(f"{nickname} | {content} | {liked}赞")
# 过滤带有数字的句子
if any(char.isdigit() for char in content):
print("包含数字,跳过")
continue
# 过滤特殊表情符号和书名号《》
if re.search(r'[《》]', content):
print("包含特殊符号,跳过")
continue
# 检查评论中是否包含任何关键词
if any(keyword in content for keyword in keywords_to_filter):
print("包含关键词,跳过")
continue
if len(content) <= 40:
continue
# 定义一个正则表达式模式,用于匹配表情符号
emoji_pattern = re.compile(
"["
"\U0001F600-\U0001F64F"
"\U0001F300-\U0001F5FF"
"\U0001F680-\U0001F6FF"
"\U0001F700-\U0001F77F"
"\U0001F780-\U0001F7FF"
"\U0001F800-\U0001F8FF"
"\U0001F900-\U0001F9FF"
"\U0001FA00-\U0001FA6F"
"\U0001FA70-\U0001FAFF"
"\U00002702-\U000027B0"
"]+",
flags=re.UNICODE,
)
# 定义一个正则表达式模式,用于匹配特殊字符
special_char_pattern = re.compile(r'[^\w\s“”【】《》]+')
# 替换表情符号
hot_comment = emoji_pattern.sub(r'', content)
# 使用正则表达式替换特殊字符为空字符串
clean_text = special_char_pattern.sub('', hot_comment)
comments.append(clean_text)
if len(comments) > 0:
return random.choice(comments)
else:
return None
# 获取代理IP
def get_ip():
response = requests.get('https://cdn.jsdelivr.net/gh/parserpp/ip_ports/proxyinfo.json', verify=False)
# 使用正则表达式提取 IP 地址和端口号
data = response.text
lines = data.strip().split('\n')
json_objects = [json.loads(line) for line in lines if json.loads(line)["country"] == "CN"]
if json_objects:
selected = random.choice(json_objects)
result = f"{selected['type']}://{selected['host']}:{selected['port']}"
proxies = {
selected['type']: result,
}
print(f"当前代理:{result}")
return proxies
else:
print("没匹配到CN的ip")
return None
# 保存结果到文件
def save_result_to_file(status, name):
if status == "success":
result = f'✅【{name}】 | CK正常'
elif status == "error":
result = f'❌【{name}】 | CK已失效'
# 获取当前日期并格式化
today_date = datetime.now().strftime("%Y%m%d")
file_name = f'script_results_{today_date}.txt'
try:
with open(file_name, 'a', encoding='utf-8') as f:
f.write(f'{result}\n')
except Exception as e:
print(f"保存结果到文件时出现异常:{str(e)}")
def random_delay(min_delay=1, max_delay=5):
"""
在min_delay和max_delay之间产生一个随机的延时时间然后暂停执行。
参数:
min_delay (int/float): 最小延时时间(秒)
max_delay (int/float): 最大延时时间(秒)
"""
delay = random.uniform(min_delay, max_delay)
print(f">本次随机延迟:【{delay:.2f}】 秒.....")
time.sleep(delay)
def base64_to_hex(base64_str):
"""
解码 Base64 编码的字符串,并转换为十六进制字符串。
参数:
base64_str: Base64 编码的字符串。
返回:
解码后的十六进制字符串。
"""
# Base64 解码
decoded_bytes = base64.b64decode(base64_str)
# 转换为十六进制字符串
hex_string = ''.join(['{:02x}'.format(b) for b in decoded_bytes])
return hex_string
def sha256(data):
"""
计算数据的 SHA-256 哈希值,并返回十六进制字符串。
参数:
data: 要进行哈希计算的数据。
返回:
数据的 SHA-256 哈希值的十六进制字符串。
"""
return hashlib.sha256(data.encode()).hexdigest()
def generate_upper_uuid():
# 生成一个UUID
generated_uuid = uuid.uuid4()
# 将UUID转换为指定格式字符串例如04D273CE-FAE2-4CC8-B020-E172B063ED8E
formatted_uuid = str(generated_uuid).upper()
return formatted_uuid
def generate_lower_uuid():
"""
生成一个 UUID并将其转换为全小写的字符串格式。
返回:
一个全小写的 UUID 字符串。
"""
# 生成一个 UUID
generated_uuid = uuid.uuid4()
# 将 UUID 转换为指定格式字符串
formatted_uuid = str(generated_uuid).lower()
return formatted_uuid
def md5_encrypt(input_string):
"""
对输入的字符串进行 MD5 加密
参数:
input_string (str): 需要加密的字符串
返回:
str: 加密后的 MD5 哈希值(十六进制格式)
"""
# 创建 MD5 对象
md5 = hashlib.md5()
# 更新 MD5 对象
md5.update(input_string.encode('utf-8'))
# 获取 MD5 哈希值的十六进制表示
return md5.hexdigest()
# 计算 SHA1 哈希
def calculate_sha1_hash(data):
# 确保输入数据是字节串
if isinstance(data, str):
data = data.encode('utf-8')
# 创建一个 SHA1 哈希对象
sha1 = hashlib.sha1()
# 更新哈希对象
sha1.update(data)
# 获取十六进制形式的哈希值
hash_value = sha1.hexdigest()
return hash_value
def get_millisecond_timestamp():
"""
生成毫秒级时间戳
"""
# 获取当前时间的时间戳(秒)
timestamp_seconds = time.time()
# 将时间戳转换为毫秒并返回
timestamp_milliseconds = int(timestamp_seconds * 1000)
return timestamp_milliseconds
def generate_nanosecond_timestamp():
"""
生成纳秒级时间戳
"""
# 获取当前时间的时间戳(以秒为单位)
current_time_seconds = time.time()
# 转换为纳秒
nanoseconds = int(current_time_seconds * 1e9)
return nanoseconds