Update LevelExch.py

添加瑞数
This commit is contained in:
daiyanan1992
2024-12-02 08:07:39 +08:00
committed by GitHub
parent 13b3ad988b
commit 44e3b4a646

View File

@@ -1,41 +1,82 @@
#!/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# @Author : github@limoruirui https://github.com/limoruirui
# @Time : 2022/9/12 16:10
# cron "0 9,10,14 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('电信并发多账号签到');
# -------------------------------
"""
1. 电信签到 不需要抓包 脚本仅供学习交流使用, 请在下载后24h内删除
2. cron说明 默认10点14点执行一次(用于兑换话费)
2. 环境变量说明:
必须 TELECOM : 电信手机号@电信服务密码@宠物喂食次数(默认0,最大10)&手机号2@密码2@喂食数2
# TELECOM 13311111111@111111@0&13322222222@222222@10
并发命令task WWJqingcheng_dx/china_telecom.py conc TELECOM
task 后边是脚本所在目录/china_telecom.py conc TELECOM
3. 必须登录过 电信营业厅 app的账号才能正常运行
"""
import json
"""
update:
2022.10.25 参考大佬 github@QGCliveDavis https://github.com/QGCliveDavis 的 loginAuthCipherAsymmertric 参数解密 新增app登录获取token 完成星播客系列任务 感谢大佬
2022.11.11 增加分享任务
"""
from datetime import date, datetime
import time
import requests import requests
from requests import get, post import re
from base64 import b64encode import time
import json
import random
import datetime
import base64
import threading
import ssl
import execjs
import os
import sys
from tools.rsa_encrypt import RSA_Encrypt from bs4 import BeautifulSoup
from tools.tool import timestamp, get_environ, print_now
from login.telecom_login import TelecomLogin
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Cipher import DES3
from Crypto.Util.Padding import pad, unpad
from Crypto.Util.strxor import strxor
from Crypto.Cipher import AES
from http import cookiejar # Python 2: import cookielib as cookiejar
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from tools.notify import send
class BlockAll(cookiejar.CookiePolicy):
return_ok = set_ok = domain_return_ok = path_return_ok = lambda self, *args, **kwargs: False
netscape = True
rfc2965 = hide_cookie2 = False
def printn(m):
print(f'\n{m}')
ORIGIN_CIPHERS = ('DEFAULT@SECLEVEL=1')
ip_list = []
class DESAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
"""
A TransportAdapter that re-enables 3DES support in Requests.
"""
CIPHERS = ORIGIN_CIPHERS.split(':')
random.shuffle(CIPHERS)
CIPHERS = ':'.join(CIPHERS)
self.CIPHERS = CIPHERS + ':!aNULL:!eNULL:!MD5'
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).proxy_manager_for(*args, **kwargs)
requests.packages.urllib3.disable_warnings()
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
ssl_context.set_ciphers('DEFAULT@SECLEVEL=0')
ss = requests.session()
ss.ssl = ssl_context
ss.headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36",
"Referer": "https://wapact.189.cn:9001/JinDouMall/JinDouMall_independentDetails.html"}
ss.mount('https://', DESAdapter())
yc = 0.1
wt = 0
kswt = -3
yf = datetime.datetime.now().strftime("%Y%m")
data = {} data = {}
@@ -49,168 +90,399 @@ except:
with open('权益id.log', 'w') as f: with open('权益id.log', 'w') as f:
pass pass
yf = datetime.now().strftime("%Y%m") yf = datetime.datetime.now().strftime("%Y%m")
dd = datetime.now().strftime("%d") dd = datetime.datetime.now().strftime("%d")
# dd = '01' # dd = '01'
def getId(phone,ck):
# global data
if yf not in data:
data[yf] = {}
class ChinaTelecom: str1 = get_level(phone,ck)
def __init__(self, account, pwd, checkin=True): str2 = str1.split('#')
self.phone = account # print(str2)
self.ticket = "" for i in range(0, 3):
self.token = "" data[yf][f'{i + 4}'] = str2[i]
if pwd != "" and checkin:
userLoginInfo = TelecomLogin(account, pwd).main()
self.ticket = userLoginInfo[0]
self.token = userLoginInfo[1]
def getSign(self, tick):
url = "https://wapside.189.cn:9001/jt-sign/ssoHomLogin"
data = {'ticket': f'{tick}'}
resp = get(url=url, params=data).json()
sign = resp["sign"]
# print(sign)
return sign
def init(self):
self.msg = ""
self.ua = f"CtClient;9.6.1;Android;12;SM-G9860;{b64encode(self.phone[5:11].encode()).decode().strip('=+')}!#!{b64encode(self.phone[0:5].encode()).decode().strip('=+')}"
self.sign = self.getSign(self.ticket)
self.headers = {
"Host": "wapside.189.cn:9001",
"Referer": "https://wapside.189.cn:9001/resources/dist/signInActivity.html",
"User-Agent": self.ua,
"sign": self.sign
}
self.key = "-----BEGIN PUBLIC KEY-----\nMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+ugG5A8cZ3FqUKDwM57GM4io6\nJGcStivT8UdGt67PEOihLZTw3P7371+N47PrmsCpnTRzbTgcupKtUv8ImZalYk65\ndU8rjC/ridwhw9ffW2LBwvkEnDkkKKRi2liWIItDftJVBiWOh17o6gfbPoNrWORc\nAdcbpk2L+udld5kZNwIDAQAB\n-----END PUBLIC KEY-----"
def req(self, url, method, data=None):
if method == "GET":
data = get(url, headers=self.headers).json()
return data
elif method.upper() == "POST":
data = post(url, headers=self.headers, json=data).json()
return data
else:
print_now("您当前使用的请求方式有误,请检查")
# 长明文分段rsa加密
def telecom_encrypt(self, text):
if len(text) <= 32:
return RSA_Encrypt(self.key).encrypt(text)
else:
encrypt_text = ""
for i in range(int(len(text) / 32) + 1):
split_text = text[(32 * i):(32 * (i + 1))]
encrypt_text += RSA_Encrypt(self.key).encrypt(split_text)
return encrypt_text
def get_level(self):
def get_level(phone,ck):
url = "https://wapside.189.cn:9001/jt-sign/paradise/getLevelRightsList" url = "https://wapside.189.cn:9001/jt-sign/paradise/getLevelRightsList"
body = { body = {"para": encrypt_para(f'{{"phone":{phone}}}')}
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}') right_list = requests.post("https://wapside.189.cn:9001/jt-sign/paradise/getLevelRightsList", json=body,
} cookies=ck).text
right_list_json = json.loads(right_list)
rightsId = '' rightsId = ''
levelStr = ['V4','V5','V6'] levelStr = ['V4','V5','V6']
for str in levelStr: for str in levelStr:
right_list = self.req(url, "POST", body)[f"{str}"] # right_list = self.req(url, "POST", body)[f"{str}"]
for data in right_list: # right_list = requests.post("https://wapside.189.cn:9001/jt-sign/paradise/getLevelRightsList",json=body,cookies=ck).json()
# printn(right_list[f'{str}'])
right_list1 = right_list_json[f'{str}']
for data in right_list1:
# print(dumps(data, indent=2, ensure_ascii=0)) # print(dumps(data, indent=2, ensure_ascii=0))
if "话费" in data["righstName"]: if "话费" in data["righstName"]:
rightsId += f'{data["id"]}#' rightsId += f'{data["id"]}#'
print(rightsId) print(rightsId)
return rightsId return rightsId
# print(f'等级返回:{data}')
# print(self.rightsId)
wxp = {}
errcode = {
"0": "兑换成功",
"412": "兑换次数已达上限",
"413": "商品已兑完",
"420": "未知错误",
"410": "该活动已失效~",
"Y0001": "当前等级不足,去升级兑当前话费",
"Y0002": "使用翼相连网络600分钟或连接并拓展网络500分钟可兑换此奖品",
"Y0003": "使用翼相连共享流量400M或共享WIFI2GB可兑换此奖品",
"Y0004": "使用翼相连共享流量2GB可兑换此奖品",
"Y0005": "当前等级不足,去升级兑当前话费",
"E0001": "您的网龄不足10年暂不能兑换"
}
# 每月领取等级金豆 # 加密参数
def level_ex(self, rightsId): key = b'1234567`90koiuyhgtfrdews'
iv = 8 * b'\0'
public_key_b64 = '''-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDBkLT15ThVgz6/NOl6s8GNPofdWzWbCkWnkaAm7O2LjkM1H7dMvzkiqdxU02jamGRHLX/ZNMCXHnPcW/sDhiFCBN18qFvy8g6VYb9QtroI09e176s+ZCtiv7hbin2cCTj99iUpnEloZm19lwHyo69u5UMiPMpq0/XKBO8lYhN/gwIDAQAB
-----END PUBLIC KEY-----'''
public_key_data = '''-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+ugG5A8cZ3FqUKDwM57GM4io6JGcStivT8UdGt67PEOihLZTw3P7371+N47PrmsCpnTRzbTgcupKtUv8ImZalYk65dU8rjC/ridwhw9ffW2LBwvkEnDkkKKRi2liWIItDftJVBiWOh17o6gfbPoNrWORcAdcbpk2L+udld5kZNwIDAQAB
-----END PUBLIC KEY-----'''
def t(h):
date = datetime.datetime.now()
date_zero = datetime.datetime.now().replace(year=date.year, month=date.month, day=date.day, hour=h, minute=59,
second=55)
date_zero_time = int(time.mktime(date_zero.timetuple()))
return date_zero_time
def encrypt(text):
cipher = DES3.new(key, DES3.MODE_CBC, iv)
ciphertext = cipher.encrypt(pad(text.encode(), DES3.block_size))
return ciphertext.hex()
def decrypt(text):
ciphertext = bytes.fromhex(text)
cipher = DES3.new(key, DES3.MODE_CBC, iv)
plaintext = unpad(cipher.decrypt(ciphertext), DES3.block_size)
return plaintext.decode()
def b64(plaintext):
public_key = RSA.import_key(public_key_b64)
cipher = PKCS1_v1_5.new(public_key)
ciphertext = cipher.encrypt(plaintext.encode())
return base64.b64encode(ciphertext).decode()
def encrypt_para(plaintext):
public_key = RSA.import_key(public_key_data)
cipher = PKCS1_v1_5.new(public_key)
ciphertext = cipher.encrypt(plaintext.encode())
return ciphertext.hex()
def encode_phone(text):
encoded_chars = []
for char in text:
encoded_chars.append(chr(ord(char) + 2))
return ''.join(encoded_chars)
def ophone(t):
key = b'34d7cb0bcdf07523'
utf8_key = key.decode('utf-8')
utf8_t = t.encode('utf-8')
cipher = AES.new(key, AES.MODE_ECB)
ciphertext = cipher.encrypt(pad(utf8_t, AES.block_size))
return ciphertext.hex()
# def send(uid,content):
# r = requests.post('https://wxpusher.zjiecode.com/api/send/message',json={"appToken":"AT_3hr0wdZn5QzPNBbpTHFXawoDIsSUmPkN","content":content,"contentType":1,"uids":[uid]}).json()
# return r
def userLoginNormal(phone, password):
alphabet = 'abcdef0123456789'
uuid = [''.join(random.sample(alphabet, 8)), ''.join(random.sample(alphabet, 4)),
'4' + ''.join(random.sample(alphabet, 3)), ''.join(random.sample(alphabet, 4)),
''.join(random.sample(alphabet, 12))]
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
loginAuthCipherAsymmertric = 'iPhone 14 15.4.' + uuid[0] + uuid[1] + phone + timestamp + password[:6] + '0$$$0.'
r = ss.post('https://appgologin.189.cn:9031/login/client/userLoginNormal', json={
"headerInfos": {"code": "userLoginNormal", "timestamp": timestamp, "broadAccount": "", "broadToken": "",
"clientType": "#9.6.1#channel50#iPhone 14 Pro Max#", "shopId": "20002", "source": "110003",
"sourcePassword": "Sid98s", "token": "", "userLoginName": phone}, "content": {"attach": "test",
"fieldData": {
"loginType": "4",
"accountType": "",
"loginAuthCipherAsymmertric": b64(
loginAuthCipherAsymmertric),
"deviceUid":
uuid[0] +
uuid[1] +
uuid[2],
"phoneNum": encode_phone(
phone),
"isChinatelecom": "0",
"systemVersion": "15.4.0",
"authentication": password}}}).json()
l = r['responseData']['data']['loginSuccessResult']
if l:
load_token[phone] = l
with open(load_token_file, 'w') as f:
json.dump(load_token, f)
ticket = get_ticket(phone, l['userId'], l['token'])
return ticket
return False
def get_ticket(phone, userId, token):
r = ss.post('https://appgologin.189.cn:9031/map/clientXML',
data='<Request><HeaderInfos><Code>getSingle</Code><Timestamp>' + datetime.datetime.now().strftime(
"%Y%m%d%H%M%S") + '</Timestamp><BroadAccount></BroadAccount><BroadToken></BroadToken><ClientType>#9.6.1#channel50#iPhone 14 Pro Max#</ClientType><ShopId>20002</ShopId><Source>110003</Source><SourcePassword>Sid98s</SourcePassword><Token>' + token + '</Token><UserLoginName>' + phone + '</UserLoginName></HeaderInfos><Content><Attach>test</Attach><FieldData><TargetId>' + encrypt(
userId) + '</TargetId><Url>4a6862274835b451</Url></FieldData></Content></Request>',
headers={'user-agent': 'CtClient;10.4.1;Android;13;22081212C;NTQzNzgx!#!MTgwNTg1'})
# printn(phone, '获取ticket', re.findall('<Reason>(.*?)</Reason>',r.text)[0])
tk = re.findall('<Ticket>(.*?)</Ticket>', r.text)
if len(tk) == 0:
return False
return decrypt(tk[0])
def queryInfo(phone, s):
global rs
a = 1
while a < 10:
if rs:
bd = js.call('main').split('=')
ck[bd[0]] = bd[1]
r = s.get('https://wapact.189.cn:9001/gateway/golden/api/queryInfo', cookies=ck).json()
try:
printn(f'{phone} 金豆余额 {r["biz"]["amountTotal"]}')
amountTotal = r["biz"]["amountTotal"]
except:
amountTotal = 0
if amountTotal < 3000:
if rs == 1:
bd = js.call('main').split('=')
ck[bd[0]] = bd[1]
res = s.post('http://wapact.189.cn:9000/gateway/stand/detail/exchange', json={"activityId": jdaid},
cookies=ck).text
if '$_ts=window' in res:
first_request()
rs = 1
time.sleep(3)
else:
return r
a += 1
return r
def getSign(ticket,session):
try:
bd = js.call('main').split('=')
ck[bd[0]] = bd[1]
response_data = session.get('https://wapside.189.cn:9001/jt-sign/ssoHomLogin?ticket=' + ticket, cookies=ck).json()[
'sign']
print(response_data)
return response_data
except Exception as e:
print(e)
def level_ex(phone, rightsId,session,ck):
# self.get_level() # self.get_level()
now = datetime.now().strftime('%H:%M:%S.%f') try:
bd = js.call('main').split('=')
ck[bd[0]] = bd[1]
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
url = "https://wapside.189.cn:9001/jt-sign/paradise/conversionRights" url = "https://wapside.189.cn:9001/jt-sign/paradise/conversionRights"
data = { data = {"para": encrypt_para(f'{{"phone":{phone},"rightsId":"{rightsId}"}},"receiveCount":1')}
"para": self.telecom_encrypt(f'{{"phone":{self.phone},"rightsId":"{rightsId}"}},"receiveCount":1')
}
resp = self.req(url, "POST", data) response = session.post('https://wapside.189.cn:9001/jt-sign/paradise/conversionRights', json=data,cookies=ck)
print_now(f'{now}--Phone:{self.phone}--{resp}') print(f'{now}--Phone:{phone}--{response.text}')
except Exception as e:
print(e)
def getId(self):
# global data
if yf not in data:
data[yf] = {}
str1 = self.get_level()
str2 = str1.split('#')
# print(str2)
for i in range(0, 3):
data[yf][f'{i + 4}'] = str2[i]
def ks(phone, ticket,level, uid):
global wt
wxp[phone] = uid
s = requests.session()
s.headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36",
"Referer": "https://wapact.189.cn:9001/JinDouMall/JinDouMall_independentDetails.html"}
s.cookies.set_policy(BlockAll())
s.mount('https://', DESAdapter())
s.timeout = 30
if rs:
bd = js.call('main').split('=')
ck[bd[0]] = bd[1]
login = s.post('https://wapact.189.cn:9001/unified/user/login',
json={"ticket": ticket, "backUrl": "https%3A%2F%2Fwapact.189.cn%3A9001",
"platformCode": "P201010301", "loginType": 2}, cookies=ck).json()
if login['code'] == 0:
printn(phone + " 获取token成功")
s.headers["Authorization"] = "Bearer " + login["biz"]["token"]
queryInfo(phone, s)
if rs:
# bd = js.call('main').split('=')
def main(self,level): # ck[bd[0]] = bd[1]
global data # response_data = s.get('https://wapside.189.cn:9001/jt-sign/ssoHomLogin?ticket=' + ticket, cookies=ck).json()['sign']
global dirsize # new_header = {
self.init() # "User-Agent": f"CtClient;9.6.1;Android;12;SM-G9860;{base64.b64encode(phone[5:11].encode()).decode().strip('=+')}!#!{base64.b64encode(phone[0:5].encode()).decode().strip('=+')}",
# "Referer": "https://wapside.189.cn:9001/resources/dist/signInActivity.html",
# "sign":response_data}
# s.headers.update(new_header)
if dd == '01' or dirsize == 0: if dd == '01' or dirsize == 0:
self.getId() getId(phone,ck)
with open('权益id.log', 'w') as f: with open('权益id.log', 'w') as f:
f.write(json.dumps(data)) f.write(json.dumps(data))
print('再跑一次脚本') print('再跑一次脚本')
rightsId = data[yf][level] rightsId = data[yf][level]
print(rightsId) sign = getSign(ticket, s)
new_header = {
"User-Agent": f"CtClient;9.6.1;Android;12;SM-G9860;{base64.b64encode(phone[5:11].encode()).decode().strip('=+')}!#!{base64.b64encode(phone[0:5].encode()).decode().strip('=+')}",
"Referer": "https://wapside.189.cn:9001/resources/dist/signInActivity.html",
"sign": sign}
s.headers.update(new_header)
start_time = time.time() start_time = time.time()
while 1 == 1: while 1 == 1:
current_time = time.time() current_time = time.time()
try: try:
data = self.level_ex(rightsId)
level_ex(phone,rightsId,s,ck)
except Exception as e: except Exception as e:
print(f"请求发送失败: " + str(e)) print(f"请求发送失败: " + str(e))
# # sleep(6) # # sleep(6)
continue continue
elapsed_time = current_time - start_time elapsed_time = current_time - start_time
if elapsed_time >= 200: # 5分钟是300秒 if elapsed_time >= 150: # 5分钟是300秒
break break
print('========================================================')
print('============================分隔符=======================')
print('========================================================')
# 主方法与源文件不同;增加了多账号的判断;变量格式如下
# TELECOM 13311111111@111111@10&13322222222@222222@10
if __name__ == "__main__":
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL'
# TELECOM = get_environ("chinaTelecomAccount")
TELECOM = '18888888888#398104#5'
users = TELECOM.split("&")
for i in range(len(users)):
user = users[i].split("#")
phone = user[0]
password = user[1]
levelid = user[2]
print(phone, password)
if phone == "":
exit(0)
if password == "":
print_now("电信服务密码未提供 只执行部分任务")
else: else:
telecom = ChinaTelecom(phone, password)
telecom.main(levelid) printn(f"{phone} 获取token {login['message']}")
def first_request(res=''):
global js, fw
url = 'https://wapact.189.cn:9001/gateway/standExchange/detailNew/exchange'
if res == '':
response = ss.get(url)
res = response.text
soup = BeautifulSoup(res, 'html.parser')
scripts = soup.find_all('script')
for script in scripts:
if 'src' in str(script):
rsurl = re.findall('src="([^"]+)"', str(script))[0]
if '$_ts=window' in script.get_text():
ts_code = script.get_text()
urls = url.split('/')
rsurl = urls[0] + '//' + urls[2] + rsurl
# print(rsurl)
ts_code += ss.get(rsurl).text
content_code = soup.find_all('meta')[1].get('content')
with open("瑞数通杀.js", encoding='utf-8') as f:
js_code_ym = f.read()
js_code = js_code_ym.replace('content_code', content_code).replace("'ts_code'", ts_code)
js = execjs.compile(js_code)
for cookie in ss.cookies:
ck[cookie.name] = cookie.value
return content_code, ts_code, ck
def main():
global wt, rs
r = ss.get('https://wapact.189.cn:9001/gateway/standExchange/detailNew/exchange')
if '$_ts=window' in r.text:
rs = 1
print("瑞数加密已开启")
first_request()
else:
print("瑞数加密已关闭")
rs = 0
if os.environ.get('chinaTelecomAccount') != None:
chinaTelecomAccount = os.environ.get('jdhf')
else:
chinaTelecomAccount = jdhf
chinaTelecomAccount = '18811118888@111111@6'
for i in chinaTelecomAccount.split('&'):
i = i.split('@')
phone = i[0]
password = i[1]
level = i[2]
uid = i[1]
ticket = False
# ticket = get_userTicket(phone)
if phone in load_token:
printn(f'{phone} 使用缓存登录')
ticket = get_ticket(phone, load_token[phone]['userId'], load_token[phone]['token'])
if ticket == False:
printn(f'{phone} 使用密码登录')
ticket = userLoginNormal(phone, password)
if ticket:
threading.Thread(target=ks, args=(phone, ticket,level,uid)).start()
time.sleep(1)
else:
printn(f'{phone} 登录失败')
jdhf = ""
cfcs = 20
jdaid = '60dd79533dc03d3c76bdde30'
ck = {}
load_token_file = 'chinaTelecom_cache.json'
try:
with open(load_token_file, 'r') as f:
load_token = json.load(f)
except:
load_token = {}
main()