活动恢复 改用pycryptodome

This commit is contained in:
limoruirui
2022-12-01 17:09:29 +08:00
parent 2c14f344af
commit 9c21fc083f

View File

@@ -4,6 +4,8 @@
# @Author : github@limoruirui https://github.com/limoruirui
# @Time : 2022/8/10 13:23
# -------------------------------
# cron "30 9 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('某通阅读');
"""
联通app抽奖 入口:联通app 搜索 阅读专区 进入话费派送中
1. 脚本仅供学习交流使用, 请在下载后24h内删除
@@ -14,71 +16,9 @@
推送通知的变量同青龙 只写了tgbot(支持反代api)和pushplus
"""
"""
updateTime: 2022.12.1 log: 活动重新上架 改用 pycryptodome 替代 cryptography 进行aes加密
updateTime: 2022.9.1 log: 每个月活动id改变更新
"""
import base64
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import algorithms
from Crypto.Cipher import AES
from binascii import b2a_hex, a2b_hex
class PrpCrypt(object):
def __init__(self, key):
self.key = key.encode('utf-8')
self.mode = AES.MODE_CBC
# 加密函数如果text不足16位就用空格补足为16位
# 如果大于16当时不是16的倍数那就补足为16的倍数。
def encrypt(self, text):
cryptor = AES.new(self.key, self.mode, b'16-Bytes--String')
text = text.encode('utf-8')
# 这里**key 长度必须为16AES-128,
# 24AES-192,或者32 AES-256Bytes 长度
# 目前AES-128 足够目前使用
text = self.pkcs7_padding(text)
self.ciphertext = cryptor.encrypt(text)
# 因为AES加密时候得到的字符串不一定是ascii字符集的输出到终端或者保存时候可能存在问题
# 所以这里统一把加密后的字符串转化为16进制字符串
return b2a_hex(self.ciphertext)
@staticmethod
def pkcs7_padding(data):
if not isinstance(data, bytes):
data = data.encode()
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
return padded_data
@staticmethod
def pkcs7_unpadding(padded_data):
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
data = unpadder.update(padded_data)
try:
uppadded_data = data + unpadder.finalize()
except ValueError:
raise Exception('无效的加密信息!')
else:
return uppadded_data
# 解密后去掉补足的空格用strip() 去掉
def decrypt(self, text):
# 偏移量'abcdefg'
cryptor = AES.new(self.key, self.mode, b'16-Bytes--String')
plain_text = cryptor.decrypt(a2b_hex(text))
# return plain_text.rstrip('\0')
return bytes.decode(plain_text).rstrip('\0')
from requests import post
from time import sleep, time
@@ -87,17 +27,16 @@ from hashlib import md5 as md5Encode
from random import randint, uniform
from os import environ
from sys import stdout, exit
from base64 import b64encode
from json import dumps
from tools.encrypt_symmetric import Crypt
from tools.send_msg import push
from tools.tool import get_environ, random_sleep
random_sleep(0, 1600)
"""读取环境变量"""
phone_num = environ.get("PHONE_NUM") if environ.get("PHONE_NUM") else ""
unicom_lotter = environ.get("UNICOM_LOTTER") if environ.get("UNICOM_LOTTER") else True
pushplus_token = environ.get("PUSH_PLUS_TOKEN") if environ.get("PUSH_PLUS_TOKEN") else ""
tgbot_token = environ.get("TG_BOT_TOKEN") if environ.get("TG_BOT_TOKEN") else ""
tg_userId = environ.get("TG_USER_ID") if environ.get("TG_USER_ID") else ""
tg_push_api = environ.get("TG_API_HOST") if environ.get("TG_API_HOST") else ""
"""主类"""
class China_Unicom:
def __init__(self, phone_num):
self.phone_num = phone_num
@@ -125,43 +64,11 @@ class China_Unicom:
m = md5Encode(str.encode(encoding='utf-8'))
return m.hexdigest()
def pushplus(self, title, content):
url = "http://www.pushplus.plus/send"
headers = {
"Content-Type": "application/json"
}
data = {
"token": pushplus_token,
"title": title,
"content": content
}
try:
post(url, headers=headers, json=data)
except:
self.print_now('推送失败')
def tgpush(self, content):
url = f"https://api.telegram.org/bot{tgbot_token}/sendMessage"
if tg_push_api != "":
url = f"https://{tg_push_api}/bot{tgbot_token}/sendMessage"
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = {'chat_id': str(tg_userId), 'text': content, 'disable_web_page_preview': 'true'}
try:
post(url, headers=headers, data=data, timeout=10)
except:
self.print_now('推送失败')
def push(self, msg):
if pushplus_token != "":
self.pushplus("联通app大转盘", msg)
if tgbot_token != "" and tg_userId != "":
self.tgpush(f"联通app大转盘:\n{msg}")
def req(self, url, crypt_text):
body = {
"sign": base64.b64encode(PrpCrypt(self.headers["accesstoken"][-16:]).encrypt(crypt_text)).decode()
"sign": b64encode(Crypt(crypt_type="AES", key=self.headers["accesstoken"][-16:], iv="16-Bytes--String", mode="CBC").encrypt(crypt_text).encode()).decode()
}
self.headers["Content-Length"] = str(len(str(body)) - 1)
self.headers["Content-Length"] = str(len(dumps(body).replace(" ", "")))
data = post(url, headers=self.headers, json=body).json()
return data
@@ -171,7 +78,7 @@ class China_Unicom:
url = f"https://10010.woread.com.cn/ng_woread_service/rest/app/auth/10000002/{timestamp}/{self.md5(f'100000027k1HcDL8RKvc{timestamp}')}"
crypt_text = f'{{"timestamp":"{date}"}}'
body = {
"sign": base64.b64encode(PrpCrypt("1234567890abcdef").encrypt(crypt_text)).decode()
"sign": b64encode(Crypt(crypt_type="AES", key="1234567890abcdef").encrypt(crypt_text).encode()).decode()
}
self.headers["Content-Length"] = str(len(str(body)) - 1)
data = post(url, headers=self.headers, json=body).json()
@@ -202,7 +109,7 @@ class China_Unicom:
self.print_now(data)
if self.fail_num == 3:
self.print_now("当前任务出现异常 且错误次数达到3次 请手动检查")
self.push("当前任务出现异常 且错误次数达到3次 请手动检查")
push("某通阅读", "当前任务出现异常 且错误次数达到3次 请手动检查")
exit(0)
if data["code"] == "9999":
self.print_now("当前任务出现异常 正在重新执行")
@@ -212,7 +119,7 @@ class China_Unicom:
def read_novel(self):
self.print_now("正在执行观看150次小说, 此过程较久, 最大时长为150 * 8s = 20min")
for i in range(150):
for i in range(1):
date = datetime.today().__format__("%Y%m%d%H%M%S")
chapterAllIndex = randint(100000000, 999999999)
cntIndex = randint(1000000, 9999999)
@@ -271,19 +178,15 @@ class China_Unicom:
url = "https://10010.woread.com.cn/ng_woread_service/rest/phone/vouchers/queryTicketAccount"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
body = {
"sign": base64.b64encode(PrpCrypt(self.headers["accesstoken"][-16:]).encrypt(crypt_text)).decode()
}
self.headers["Content-Length"] = str(len(str(body)) - 1)
data = self.req(url, crypt_text)
if data["code"] == "0000":
can_use_red = data["data"]["usableNum"] / 100
if can_use_red >= 3:
self.print_now(f"查询成功 你当前有话费红包{can_use_red} 可以去兑换了")
self.push(f"查询成功 你当前有话费红包{can_use_red} 可以去兑换了")
push("某通阅读", f"查询成功 你当前有话费红包{can_use_red} 可以去兑换了")
else:
self.print_now(f"查询成功 你当前有话费红包{can_use_red} 不足兑换的最低额度")
self.push(f"查询成功 你当前有话费红包{can_use_red} 不足兑换的最低额度")
push("某通阅读", f"查询成功 你当前有话费红包{can_use_red} 不足兑换的最低额度")
def main(self):
self.referer_login()
@@ -303,4 +206,9 @@ class China_Unicom:
if __name__ == "__main__":
"""读取环境变量"""
phone_num = get_environ("PHONE_NUM")
unicom_lotter = environ.get("UNICOM_LOTTER") if environ.get("UNICOM_LOTTER") else True
if phone_num == "":
exit(0)
China_Unicom(phone_num).main()