Add files via upload

This commit is contained in:
cc892786825
2025-08-17 21:04:02 +08:00
committed by GitHub
parent 1b923353e7
commit 22cdbc68b1
5 changed files with 3919 additions and 0 deletions

255
电信/Ruishu.py Normal file
View File

@@ -0,0 +1,255 @@
import os
import ssl
import time
import json
import execjs
import base64
import random
import certifi
import aiohttp
import asyncio
import requests
from http import cookiejar
from Crypto.Cipher import DES3
from Crypto.Util.Padding import pad, unpad
from aiohttp import ClientSession, TCPConnector
import httpx
httpx._config.DEFAULT_CIPHERS += ":ALL:@SECLEVEL=1"
diffValue = 2
filename='Cache.js'
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as file:
fileContent = file.read()
else:
fileContent=''
class BlockAll(cookiejar.CookiePolicy):
return_ok = set_ok = domain_return_ok = path_return_ok = lambda self, *args, **kwargs: False
netscape = True
rfc2965 = hide_cookie2 = False
def printn(m):
print(f'\n{m}')
context = ssl.create_default_context()
context.set_ciphers('DEFAULT@SECLEVEL=1') # 低安全级别0/1
context.check_hostname = False # 禁用主机
context.verify_mode = ssl.CERT_NONE # 禁用证书
class DESAdapter(requests.adapters.HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = context
return super().init_poolmanager(*args, **kwargs)
requests.DEFAULT_RETRIES = 0
requests.packages.urllib3.disable_warnings()
ss = requests.session()
ss.headers = {"User-Agent": "Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36", "Referer": "https://wapact.189.cn:9001/JinDouMall/JinDouMall_independentDetails.html"}
ss.mount('https://', DESAdapter())
ss.cookies.set_policy(BlockAll())
runTime = 0
sleepTime = 1
key = b'1234567`90koiuyhgtfrdews'
iv = 8 * b'\0'
def encrypt(text):
cipher = DES3.new(key, DES3.MODE_CBC, iv)
ciphertext = cipher.encrypt(pad(text.encode(), DES3.block_size))
return ciphertext.hex()
def decrypt(text):
ciphertext = bytes.fromhex(text)
cipher = DES3.new(key, DES3.MODE_CBC, iv)
plaintext = unpad(cipher.decrypt(ciphertext), DES3.block_size)
return plaintext.decode()
def initCookie(getUrl='https://wapact.189.cn:9001/gateway/standQuery/detailNew/exchange'):
global js_code_ym, fileContent
cookie = ''
response = httpx.post(getUrl)
content = response.text.split(' content="')[2].split('" r=')[0]
code1 = response.text.split('$_ts=window')[1].split('</script><script type="text/javascript"')[0]
code1Content = '$_ts=window' + code1
Url = response.text.split('$_ts.lcd();</script><script type="text/javascript" charset="utf-8" src="')[1].split('" r=')[0]
urls = getUrl.split('/')
rsurl = urls[0] + '//' + urls[2] + Url
filename = 'Cache.js'
if fileContent == '':
if not os.path.exists(filename):
fileRes = httpx.get(rsurl)
fileContent = fileRes.text
if fileRes.status_code == 200:
with open(filename, 'w', encoding='utf-8') as file:
file.write(fileRes.text)
else:
print(f"Failed to download {rsurl}. Status code: {fileRes.status_code}")
if response.headers['Set-Cookie']:
cookie = response.headers['Set-Cookie'].split(';')[0].split('=')[1]
runJs = js_code_ym.replace('content_code', content).replace("'ts_code'", code1Content + fileContent)
execjsRun = RefererCookie(runJs)
return {
'cookie': cookie,
'execjsRun': execjsRun
}
def RefererCookie(runJs):
try:
execjsRun = execjs.compile(runJs)
return execjsRun
except execjs._exceptions.CompileError as e:
print(f"JavaScript 编译错误: {e}")
except execjs._exceptions.RuntimeError as e:
print(f"JavaScript 运行时错误: {e}")
except Exception as e:
print(f"其他错误: {e}")
js_code_ym = '''delete __filename
delete __dirname
ActiveXObject = undefined
window = global;
content="content_code"
navigator = {"platform": "Linux aarch64"}
navigator = {"userAgent": "CtClient;11.0.0;Android;13;22081212C;NTIyMTcw!#!MTUzNzY"}
location={
"href": "https://",
"origin": "",
"protocol": "",
"host": "",
"hostname": "",
"port": "",
"pathname": "",
"search": "",
"hash": ""
}
i = {length: 0}
base = {length: 0}
div = {
getElementsByTagName: function (res) {
console.log('div中的getElementsByTagName', res)
if (res === 'i') {
return i
}
return '<div></div>'
}
}
script = {
}
meta = [
{charset:"UTF-8"},
{
content: content,
getAttribute: function (res) {
console.log('meta中的getAttribute', res)
if (res === 'r') {
return 'm'
}
},
parentNode: {
removeChild: function (res) {
console.log('meta中的removeChild', res)
return content
}
},
}
]
form = '<form></form>'
window.addEventListener= function (res) {
console.log('window中的addEventListener:', res)
}
document = {
createElement: function (res) {
console.log('document中的createElement', res)
if (res === 'div') {
return div
} else if (res === 'form') {
return form
}
else{return res}
},
addEventListener: function (res) {
console.log('document中的addEventListener:', res)
},
appendChild: function (res) {
console.log('document中的appendChild', res)
return res
},
removeChild: function (res) {
console.log('document中的removeChild', res)
},
getElementsByTagName: function (res) {
console.log('document中的getElementsByTagName', res)
if (res === 'script') {
return script
}
if (res === 'meta') {
return meta
}
if (res === 'base') {
return base
}
},
getElementById: function (res) {
console.log('document中的getElementById', res)
if (res === 'root-hammerhead-shadow-ui') {
return null
}
}
}
setInterval = function () {}
setTimeout = function () {}
window.top = window
'ts_code'
function main() {
cookie = document.cookie.split(';')[0]
return cookie
}'''
async def main(timeValue):
global runTime, js_codeRead
tasks = []
init_result = initCookie()
if init_result:
cookie = init_result['cookie']
execjsRun = init_result['execjsRun']
else:
print("初始化 cookies 失败")
return
runcookie = {
'cookie': cookie,
'execjsRun': execjsRun
}
# 添加输出 cookies 的代码
cookies = {
'yiUIIlbdQT3fO': runcookie['cookie'],
'yiUIIlbdQT3fP': runcookie['execjsRun'].call('main').split('=')[1]
}
print(json.dumps(cookies)) # 确保输出是 JSON 格式的
if __name__ == "__main__":
asyncio.run(main(0))

2569
电信/dx签到.js Normal file

File diff suppressed because it is too large Load Diff

393
电信/jdhf2.py Normal file
View File

@@ -0,0 +1,393 @@
import requests
import re
import time
import json
import random
import datetime
import base64
import threading
import ssl
import os
import sys
import certifi
from bs4 import BeautifulSoup
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Cipher import DES3
from Crypto.Util.Padding import pad, unpad
from Crypto.Cipher import AES
from http import cookiejar
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
class BlockAll(cookiejar.CookiePolicy):
return_ok = set_ok = domain_return_ok = path_return_ok = lambda self, *args, **kwargs: False
netscape = True
rfc2965 = hide_cookie2 = False
ORIGIN_CIPHERS = ('DEFAULT@SECLEVEL=1')
ip_list = []
class DESAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
CIPHERS = ORIGIN_CIPHERS.split(':')
random.shuffle(CIPHERS)
CIPHERS = ':'.join(CIPHERS)
self.CIPHERS = CIPHERS + ':!aNULL:!eNULL:!MD5'
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).proxy_manager_for(*args, **kwargs)
requests.packages.urllib3.disable_warnings()
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
ssl_context.set_ciphers('DEFAULT@SECLEVEL=0')
ss = requests.session()
ss.verify = certifi.where()
ss.ssl=ssl_context
ss.headers={"User-Agent":"Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36","Referer":"https://wapact.189.cn:9001/JinDouMall/JinDouMall_independentDetails.html"}
ss.mount('https://', DESAdapter())
yc = 0.1
wt = 0
kswt = -3
yf = datetime.datetime.now().strftime("%Y%m")
jp = {"9":{},"12":{},"13":{},"23":{}}
try:
with open('电信金豆换话费.log') as fr:
dhjl = json.load(fr)
except:
dhjl = {}
if yf not in dhjl:
dhjl[yf] = {}
wxp={}
errcode = {
"0":"兑换成功",
"412":"兑换次数已达上限",
"413":"商品被抢光",
"420":"未知错误",
"429":"请求太频繁",
"999999":"未抢到",
"410":"活动已经结束",
"Y0001":"等级不够",
"Y0002":"需要使用翼相连网络才能兑换",
"Y0003":"需要共享流量才能兑换",
"Y0004":"需要更多共享流量",
"Y0005":"等级还不够",
"E0001":"网龄不足10年"
}
# 加密参数
key = b'1234567`90koiuyhgtfrdews'
iv = 8 * b'\0'
public_key_b64 = '''-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDBkLT15ThVgz6/NOl6s8GNPofdWzWbCkWnkaAm7O2LjkM1H7dMvzkiqdxU02jamGRHLX/ZNMCXHnPcW/sDhiFCBN18qFvy8g6VYb9QtroI09e176s+ZCtiv7hbin2cCTj99iUpnEloZm19lwHyo69u5UMiPMpq0/XKBO8lYhN/gwIDAQAB
-----END PUBLIC KEY-----'''
public_key_data = '''-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+ugG5A8cZ3FqUKDwM57GM4io6JGcStivT8UdGt67PEOihLZTw3P7371+N47PrmsCpnTRzbTgcupKtUv8ImZalYk65dU8rjC/ridwhw9ffW2LBwvkEnDkkKKRi2liWIItDftJVBiWOh17o6gfbPoNrWORcAdcbpk2L+udld5kZNwIDAQAB
-----END PUBLIC KEY-----'''
def t(h):
date = datetime.datetime.now()
date_zero = datetime.datetime.now().replace(year=date.year, month=date.month, day=date.day, hour=h, minute=59, second=59)
date_zero_time = int(time.mktime(date_zero.timetuple()))
return date_zero_time
def encrypt(text):
cipher = DES3.new(key, DES3.MODE_CBC, iv)
ciphertext = cipher.encrypt(pad(text.encode(), DES3.block_size))
return ciphertext.hex()
def decrypt(text):
ciphertext = bytes.fromhex(text)
cipher = DES3.new(key, DES3.MODE_CBC, iv)
plaintext = unpad(cipher.decrypt(ciphertext), DES3.block_size)
return plaintext.decode()
def b64(plaintext):
public_key = RSA.import_key(public_key_b64)
cipher = PKCS1_v1_5.new(public_key)
ciphertext = cipher.encrypt(plaintext.encode())
return base64.b64encode(ciphertext).decode()
def encrypt_para(plaintext):
public_key = RSA.import_key(public_key_data)
cipher = PKCS1_v1_5.new(public_key)
ciphertext = cipher.encrypt(plaintext.encode())
return ciphertext.hex()
def encode_phone(text):
encoded_chars = []
for char in text:
encoded_chars.append(chr(ord(char) + 2))
return ''.join(encoded_chars)
def ophone(t):
key = b'34d7cb0bcdf07523'
utf8_key = key.decode('utf-8')
utf8_t = t.encode('utf-8')
cipher = AES.new(key, AES.MODE_ECB)
ciphertext = cipher.encrypt(pad(utf8_t, AES.block_size))
return ciphertext.hex()
def send(uid,content):
r = requests.post('https://wxpusher.zjiecode.com/api/send/message',json={"appToken":appToken,"content":content,"contentType":1,"uids":[uid]}).json()
return r
def userLoginNormal(phone,password):
alphabet = 'abcdef0123456789'
uuid = [''.join(random.sample(alphabet, 8)),''.join(random.sample(alphabet, 4)),'4'+''.join(random.sample(alphabet, 3)),''.join(random.sample(alphabet, 4)),''.join(random.sample(alphabet, 12))]
timestamp=datetime.datetime.now().strftime("%Y%m%d%H%M%S")
loginAuthCipherAsymmertric = 'iPhone 14 15.4.' + uuid[0] + uuid[1] + phone + timestamp + password[:6] + '0$$$0.'
login_date = {
"headerInfos": {
"code": "userLoginNormal",
"timestamp": timestamp,
"broadAccount": "",
"broadToken": "",
"clientType": "#10.5.0#channel50#iPhone 14 Pro Max#",
"shopId": "20002",
"source": "110003",
"sourcePassword": "Sid98s",
"token": "",
"userLoginName": encode_phone(phone)
},
"content": {
"attach": "test",
"fieldData": {
"loginType": "4",
"accountType": "",
"loginAuthCipherAsymmertric": b64(loginAuthCipherAsymmertric),
"deviceUid": uuid[0] + uuid[1] + uuid[2],
"phoneNum": encode_phone(phone),
"isChinatelecom": "0",
"systemVersion": "15.4.0",
"authentication": encode_phone(password)
}
}
}
r = ss.post('https://appgologin.189.cn:9031/login/client/userLoginNormal', json=login_date).json()
l = r['responseData']['data']['loginSuccessResult']
if l:
load_token[phone] = l
with open(load_token_file, 'w') as f:
json.dump(load_token, f)
ticket = get_ticket(phone,l['userId'],l['token'])
return ticket
return False
def get_ticket(phone,userId,token):
r = ss.post('https://appgologin.189.cn:9031/map/clientXML',data='<Request><HeaderInfos><Code>getSingle</Code><Timestamp>'+datetime.datetime.now().strftime("%Y%m%d%H%M%S")+'</Timestamp><BroadAccount></BroadAccount><BroadToken></BroadToken><ClientType>#9.6.1#channel50#iPhone 14 Pro Max#</ClientType><ShopId>20002</ShopId><Source>110003</Source><SourcePassword>Sid98s</SourcePassword><Token>'+token+'</Token><UserLoginName>'+phone+'</UserLoginName></HeaderInfos><Content><Attach>test</Attach><FieldData><TargetId>'+encrypt(userId)+'</TargetId><Url>4a6862274835b451</Url></FieldData></Content></Request>',headers={'user-agent': 'CtClient;10.4.1;Android;13;22081212C;NTQzNzgx!#!MTgwNTg1'})
tk = re.findall('<Ticket>(.*?)</Ticket>',r.text)
if len(tk) == 0:
return False
return decrypt(tk[0])
def queryInfo(phone,s):
a = 1
while a < 2:
r = s.get('https://wapact.189.cn:9001/gateway/golden/api/queryInfo',verify=certifi.where()).json()
try:
print(f'{phone} 金豆余额是 {r["biz"]["amountTotal"]} ')
amountTotal= r["biz"]["amountTotal"]
except:
amountTotal = 0
if amountTotal< 3000:
res = s.post('http://wapact.189.cn:9000/gateway/stand/detail/exchange',json={"activityId":jdaid}).text
print(res)
time.sleep(3)
else:
return r
a += 1
return r
def exchange(phone,s,title,aid, uid):
try:
r = s.post('https://wapact.189.cn:9001/gateway/standExchange/detailNew/exchange',json={"activityId":aid})
print(f"响应码: {r.status_code}{r.text} ")
r = r.json()
if r["code"] == 0:
if r["biz"] != {} and r["biz"]["resultCode"] in errcode:
print(f'{str(datetime.datetime.now())[11:22]} {phone}{title} {errcode[r["biz"]["resultCode"]]}')
if r["biz"]["resultCode"] in ["0","412"]:
if r["biz"]["resultCode"] == "0":
msg = f'{phone}{title}兑换成功'
send(uid, msg)
if phone not in dhjl[yf][title]:
dhjl[yf][title] += "#"+phone
with open('电信金豆换话费.log', 'w') as f:
json.dump(dhjl, f, ensure_ascii=False)
else:
print(f'{str(datetime.datetime.now())[11:22]} {phone} :没抢到')
except Exception as e:
pass
def dh(phone,s,title,aid,wt, uid):
while wt > time.time():
pass
print(f"{str(datetime.datetime.now())[11:22]} {phone}{title} 开始兑换")
for cs in range(cfcs):
threading.Thread(target=exchange,args=(phone,s,title,aid, uid)).start()
def lottery(s):
for cishu in range(3):
try:
r = s.post('https://wapact.189.cn:9001/gateway/golden/api/lottery',json={"activityId":"6384b49b1e44396da4f1e4a3"})
except:
pass
time.sleep(3)
def aes_ecb_encrypt(plaintext, key):
key = key.encode('utf-8')
if len(key) not in [16, 24, 32]:
raise ValueError("密钥长度必须为16/24/32字节")
padded_data = pad(plaintext.encode('utf-8'), AES.block_size)
cipher = AES.new(key, AES.MODE_ECB)
ciphertext = cipher.encrypt(padded_data)
return base64.b64encode(ciphertext).decode('utf-8')
def ks(phone, ticket, uid):
global wt
wxp[phone] = uid
s = requests.session()
s.headers={"User-Agent":"Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36","Referer":"https://wapact.189.cn:9001/JinDouMall/JinDouMall_independentDetails.html"}
s.cookies.set_policy(BlockAll())
s.mount('https://', DESAdapter())
s.timeout = 30
data = aes_ecb_encrypt(json.dumps({"ticket":ticket,"backUrl":"https%3A%2F%2Fwapact.189.cn%3A9001","platformCode":"P201010301","loginType":2}), 'telecom_wap_2018')
login = ss.post('https://wapact.189.cn:9001/unified/user/login',data=data, headers={"Content-Type":"application/json;charset=UTF-8","Accept":"application/json, text/javascript, */*; q=0.01"}).json()
if login['code'] == 0:
print(phone+" 获取token成功")
s.headers["Authorization"] = "Bearer " + login["biz"]["token"]
queryInfo(phone,s)
queryBigDataAppGetOrInfo = s.get('https://waphub.189.cn/gateway/golden/goldGoods/getGoodsList??floorType=0&userType=1&page&1&order=3&tabOrder=').json()
for i in queryBigDataAppGetOrInfo["biz"]["ExchangeGoodslist"]:
if '话费' not in i["title"]:continue
if '0.5元' in i["title"] or '5元' in i["title"]:
jp["9"][i["title"]] = i["id"]
elif '1元' in i["title"] or '10元' in i["title"]:
jp["13"][i["title"]] = i["id"]
else:
jp["12"][i["title"]] = i["id"]
h = datetime.datetime.now().hour
if 11 > h > 1:
h = 9
elif 23 > h > 1:
h = 13
else:
h = 23
if len(sys.argv) ==2:
h = int(sys.argv[1])
d = jp[str(h)]
wt = t(h) + kswt
if jp["12"] != {}:
d.update(jp["12"])
wt = 0
for di in d:
if di not in dhjl[yf]:
dhjl[yf][di] = ""
if phone in dhjl[yf][di] :
print(f"{phone}{di} 已经兑换过")
else:
print(f"{phone}{di} 准备好")
if wt - time.time() > 20 * 60:
print("等待时间超过20分钟")
return
threading.Thread(target=dh,args=(phone,s,di,d[di],wt, uid)).start()
else:
print(f"{phone} 获取token失败{login['message']}")
START_LOG = rf'''
'''
def main():
print(START_LOG)
global wt, rs
rs = 0 # 强制关闭瑞数加密
if os.environ.get('chinaTelecomAccount')!= None:
chinaTelecomAccount = os.environ.get('chinaTelecomAccount')
else:
chinaTelecomAccount = jdhf
for i in chinaTelecomAccount.split('&'):
i = i.split('#')
phone = i[0]
password = i[1]
uid = i[-1]
ticket = False
if phone in load_token:
print(f'{phone} 正在使用缓存登录')
ticket = get_ticket(phone,load_token[phone]['userId'],load_token[phone]['token'])
if ticket == False:
print(f'{phone} 正在使用密码登录')
ticket = userLoginNormal(phone,password)
if ticket:
threading.Thread(target=ks,args=(phone, ticket, uid)).start()
time.sleep(1)
else:
print(f'{phone} 登录失败')
# 手机号@密码@wxpusheruid
jdhf = ""
# 重发次数
cfcs = 15
# wxpusher推送appToken
appToken = ""
jdaid = '60dd79533dc03d3c76bdde30'
load_token_file = 'chinaTelecom_cache.json'
try:
with open(load_token_file, 'r') as f:
load_token = json.load(f)
except:
load_token = {}
main()

134
电信/瑞数通杀.js Normal file
View File

@@ -0,0 +1,134 @@
delete __filename
delete __dirname
ActiveXObject = undefined
window = global;
content="content_code"
navigator = {"platform": "Linux aarch64"}
navigator = {"userAgent": "CtClient;11.0.0;Android;13;22081212C;NTIyMTcw!#!MTUzNzY"}
location={
"href": "https://",
"origin": "",
"protocol": "",
"host": "",
"hostname": "",
"port": "",
"pathname": "",
"search": "",
"hash": ""
}
i = {length: 0}
base = {length: 0}
div = {
getElementsByTagName: function (res) {
console.log('div中的getElementsByTagName', res)
if (res === 'i') {
return i
}
return '<div></div>'
}
}
script = {
}
meta = [
{charset:"UTF-8"},
{
content: content,
getAttribute: function (res) {
console.log('meta中的getAttribute', res)
if (res === 'r') {
return 'm'
}
},
parentNode: {
removeChild: function (res) {
console.log('meta中的removeChild', res)
return content
}
},
}
]
form = '<form></form>'
window.addEventListener= function (res) {
console.log('window中的addEventListener:', res)
}
document = {
createElement: function (res) {
console.log('document中的createElement', res)
if (res === 'div') {
return div
} else if (res === 'form') {
return form
}
else{return res}
},
addEventListener: function (res) {
console.log('document中的addEventListener:', res)
},
appendChild: function (res) {
console.log('document中的appendChild', res)
return res
},
removeChild: function (res) {
console.log('document中的removeChild', res)
},
getElementsByTagName: function (res) {
console.log('document中的getElementsByTagName', res)
if (res === 'script') {
return script
}
if (res === 'meta') {
return meta
}
if (res === 'base') {
return base
}
},
getElementById: function (res) {
console.log('document中的getElementById', res)
if (res === 'root-hammerhead-shadow-ui') {
return null
}
}
}
setInterval = function () {}
setTimeout = function () {}
window.top = window
'ts_code'
function main() {
cookie = document.cookie.split(';')[0]
return cookie
}

568
电信/电信0点321.py Normal file
View File

@@ -0,0 +1,568 @@
#!/usr/bin/env python3
import os
import re
import sys
import ssl
import time
import json
import execjs
import base64
import random
import certifi
import aiohttp
import asyncio
import datetime
import requests
import binascii
import hashlib
from lxml import etree
from http import cookiejar
from Crypto.Cipher import AES, DES3, PKCS1_v1_5
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import pad, unpad
from aiohttp import ClientSession, TCPConnector
from concurrent.futures import ThreadPoolExecutor
import subprocess
import base64,zlib
# 缓存文件路径
CACHE_FILE = 'chinaTelecom_cache.json'
# 尝试从缓存文件中加载凭证
def load_cache():
try:
if os.path.exists(CACHE_FILE):
with open(CACHE_FILE, 'r') as f:
return json.load(f)
except Exception as e:
print_error(f"加载缓存失败: {str(e)}")
return {}
# 保存凭证到缓存文件
def save_cache(cache):
try:
with open(CACHE_FILE, 'w') as f:
json.dump(cache, f)
except Exception as e:
print_error(f"保存缓存失败: {str(e)}")
# 彩色输出定义
class Color:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# ASCII艺术图标
LOGO = f'''
{Color.OKBLUE}
_____ _ _ _ _
|_ _|____ | | ___ ___| |_| |__ ___ _ __ | |_ ___ _ __
| |/ _ \\\\ \\| |/ _ \\/ __| __| '_ \\ / _ \\| '_ \\| __/ _ \\| '__|
| | __/ \\ V | __/\\__ \\ |_| | | | (_) | | | | || (_) | |
|_|\\___| |_|\\___||___/\\__|_| |_|\\___/|_| |_|\\__\\___/|_|
{Color.ENDC}
{Color.OKGREEN}==================== 0点抢话费脚本 v1.6 ===================={Color.ENDC}
'''
def printn(m):
current_time = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f'\n[{Color.OKCYAN}{current_time}{Color.ENDC}] {m}')
def print_info(m):
print(f'[{Color.OKGREEN}INFO{Color.ENDC}] {m}')
def print_warn(m):
print(f'[{Color.WARNING}WARN{Color.ENDC}] {m}')
def print_error(m):
print(f'[{Color.FAIL}ERROR{Color.ENDC}] {m}')
def print_success(m):
print(f'[{Color.OKGREEN}SUCCESS{Color.ENDC}] {m}')
context = ssl.create_default_context()
context.set_ciphers('DEFAULT@SECLEVEL=1') # 低安全级别0/1
context.check_hostname = False # 禁用主机
context.verify_mode = ssl.CERT_NONE # 禁用证书
class DESAdapter(requests.adapters.HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = context
return super().init_poolmanager(*args, **kwargs)
requests.packages.urllib3.disable_warnings()
ss = requests.session()
ss.headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36",
"Referer": "https://wapact.189.cn:9001/JinDouMall/JinDouMall_independentDetails.html"
}
ss.mount('https://', DESAdapter())
# 修复使用正确的方式禁用cookie
class BlockAll(cookiejar.CookiePolicy):
return_ok = set_ok = domain_return_ok = path_return_ok = lambda self, *args, **kwargs: False
netscape = True
rfc2965 = hide_cookie2 = False
# 使用自定义的CookiePolicy
ss.cookies.set_policy(BlockAll())
run_num = os.environ.get('reqNUM') or "5"
MAX_RETRIES = 3
RATE_LIMIT = 10 # 每秒请求数限制
class RateLimiter:
def __init__(self, rate_limit):
self.rate_limit = rate_limit
self.tokens = rate_limit
self.updated_at = time.monotonic()
async def acquire(self):
while self.tokens < 1:
self.add_new_tokens()
await asyncio.sleep(0.1)
self.tokens -= 1
def add_new_tokens(self):
now = time.monotonic()
time_since_update = now - self.updated_at
new_tokens = time_since_update * self.rate_limit
if new_tokens > 1:
self.tokens = min(self.tokens + new_tokens, self.rate_limit)
self.updated_at = now
class AsyncSessionManager:
def __init__(self):
self.session = None
self.connector = None
async def __aenter__(self):
ssl_context = ssl.create_default_context(cafile=certifi.where())
ssl_context.set_ciphers('DEFAULT@SECLEVEL=1')
self.connector = TCPConnector(ssl=ssl_context, limit=1000)
self.session = ClientSession(connector=self.connector)
return self.session
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
await self.connector.close()
async def retry_request(session, method, url, **kwargs):
for attempt in range(MAX_RETRIES):
try:
await asyncio.sleep(1)
async with session.request(method, url,** kwargs) as response:
return await response.json()
except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError) as e:
print_error(f"请求失败,第 {attempt + 1} 次重试: {e}")
if attempt == MAX_RETRIES - 1:
raise
await asyncio.sleep(2 **attempt)
key = b'1234567`90koiuyhgtfrdews'
iv = 8 * b'\0'
public_key_b64 = '''-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDBkLT15ThVgz6/NOl6s8GNPofdWzWbCkWnkaAm7O2LjkM1H7dMvzkiqdxU02jamGRHLX/ZNMCXHnPcW/sDhiFCBN18qFvy8g6VYb9QtroI09e176s+ZCtiv7hbin2cCTj99iUpnEloZm19lwHyo69u5UMiPMpq0/XKBO8lYhN/gwIDAQAB
-----END PUBLIC KEY-----'''
public_key_data = '''-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+ugG5A8cZ3FqUKDwM57GM4io6JGcStivT8UdGt67PEOihLZTw3P7371+N47PrmsCpnTRzbTgcupKtUv8ImZalYk65dU8rjC/ridwhw9ffW2LBwvkEnDkkKKRi2liWIItDftJVBiWOh17o6gfbPoNrWORcAdcbpk2L+udld5kZNwIDAQAB
-----END PUBLIC KEY-----'''
def get_first_three(value):
if isinstance(value, (int, float)):
return int(str(value)[:3])
elif isinstance(value, str):
return str(value)[:3]
else:
raise TypeError("error")
def run_Time(hour, minute, second):
date = datetime.datetime.now()
date_zero = datetime.datetime.now().replace(year=date.year, month=date.month, day=date.day, hour=hour, minute=minute, second=second)
date_zero_time = int(time.mktime(date_zero.timetuple()))
return date_zero_time
def encrypt(text):
cipher = DES3.new(key, DES3.MODE_CBC, iv)
ciphertext = cipher.encrypt(pad(text.encode(), DES3.block_size))
return ciphertext.hex()
def decrypt(text):
ciphertext = bytes.fromhex(text)
cipher = DES3.new(key, DES3.MODE_CBC, iv)
plaintext = unpad(cipher.decrypt(ciphertext), DES3.block_size)
return plaintext.decode()
def b64(plaintext):
public_key = RSA.import_key(public_key_b64)
cipher = PKCS1_v1_5.new(public_key)
ciphertext = cipher.encrypt(plaintext.encode())
return base64.b64encode(ciphertext).decode()
def encrypt_para(plaintext):
if not isinstance(plaintext, str):
plaintext = json.dumps(plaintext)
public_key = RSA.import_key(public_key_data)
cipher = PKCS1_v1_5.new(public_key)
ciphertext = cipher.encrypt(plaintext.encode())
return binascii.hexlify(ciphertext).decode()
def encrypt_paraNew(p):
k = RSA.import_key(public_key_data)
c = PKCS1_v1_5.new(k)
s = k.size_in_bytes() - 11
d = p.encode() if isinstance(p, str) else json.dumps(p).encode()
return binascii.hexlify(b''.join(c.encrypt(d[i:i+s]) for i in range(0, len(d), s))).decode()
def encode_phone(text):
encoded_chars = []
for char in text:
encoded_chars.append(chr(ord(char) + 2))
return ''.join(encoded_chars)
def userLoginNormal(phone, password):
alphabet = 'abcdef0123456789'
uuid = [''.join(random.sample(alphabet, 8)), ''.join(random.sample(alphabet, 4)),
'4' + ''.join(random.sample(alphabet, 3)), ''.join(random.sample(alphabet, 4)),
''.join(random.sample(alphabet, 12))]
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
loginAuthCipherAsymmertric = 'iPhone 14 15.4.' + uuid[0] + uuid[1] + phone + timestamp + password[:6] + '0$$$0.'
try:
r = ss.post(
'https://appgologin.189.cn:9031/login/client/userLoginNormal',
json={
"headerInfos": {
"code": "userLoginNormal",
"timestamp": timestamp,
"broadAccount": "",
"broadToken": "",
"clientType": "#11.3.0#channel35#Xiaomi Redmi K30 Pro#",
"shopId": "20002",
"source": "110003",
"sourcePassword": "Sid98s",
"token": "",
"userLoginName": encode_phone(phone)
},
"content": {
"attach": "test",
"fieldData": {
"loginType": "4",
"accountType": "",
"loginAuthCipherAsymmertric": b64(loginAuthCipherAsymmertric),
"deviceUid": uuid[0] + uuid[1] + uuid[2],
"phoneNum": encode_phone(phone),
"isChinatelecom": "0",
"systemVersion": "12",
"authentication": encode_phone(password)
}
}
},
verify=certifi.where()
).json()
l = r['responseData']['data']['loginSuccessResult']
if l:
ticket = get_ticket(phone, l['userId'], l['token'])
return ticket, l['userId'], l['token']
return False, None, None
except Exception as e:
print_error(f"登录请求异常: {str(e)}")
return False, None, None
async def exchangeForDay(phone, session, run_num, rid, stime, accId):
async def delayed_conversion(delay):
await asyncio.sleep(delay)
await conversionRights(phone, rid, session, accId)
tasks = [asyncio.create_task(delayed_conversion(i * stime)) for i in range(int(run_num))]
await asyncio.gather(*tasks)
def get_ticket(phone, userId, token):
try:
r = ss.post(
'https://appgologin.189.cn:9031/map/clientXML',
data='<Request><HeaderInfos><Code>getSingle</Code><Timestamp>'+datetime.datetime.now().strftime("%Y%m%d%H%M%S")+'</Timestamp><BroadAccount></BroadAccount><BroadToken></BroadToken><ClientType>#9.6.1#channel50#iPhone 14 Pro Max#</ClientType><ShopId>20002</ShopId><Source>110003</Source><SourcePassword>Sid98s</SourcePassword><Token>'+token+'</Token><UserLoginName>'+phone+'</UserLoginName></HeaderInfos><Content><Attach>test</Attach><FieldData><TargetId>'+encrypt(userId)+'</TargetId><Url>4a6862274835b451</Url></FieldData></Content></Request>',
headers={'user-agent': 'CtClient;10.4.1;Android;13;22081212C;NTQzNzgx!#!MTgwNTg1'},
verify=certifi.where()
)
tk = re.findall('<Ticket>(.*?)</Ticket>', r.text)
if len(tk) == 0:
return False
return decrypt(tk[0])
except Exception as e:
print_error(f"获取ticket异常: {str(e)}")
return False
async def exchange(s, phone, title, rid, jsexec, ckvalue):
try:
url = "https://wapact.189.cn:9001/gateway/standExchange/detailNew/exchange"
get_url = await asyncio.to_thread(jsexec.call, "getUrl", "POST", url)
async with s.post(get_url, cookies=ckvalue, json={"activityId": rid}) as response:
pass
except Exception as e:
print_error(e)
async def check(s, item, ckvalue):
checkGoods = s.get('https://wapact.189.cn:9001/gateway/stand/detailNew/check?activityId=' + item, cookies=ckvalue).json()
return checkGoods
async def conversionRights(phone, rid, session, accId):
try:
value = {
"id": rid,
"accId": accId,
"showType": "9003",
"showEffect": "8",
"czValue": "0"
}
paraV = encrypt_paraNew(value)
printn(f"{Color.OKGREEN}{get_first_three(phone)}: 开始兑换{Color.ENDC}")
response = session.post(
'https://wappark.189.cn/jt-sign/paradise/receiverRights',
json={"para": paraV}
)
login = response.json()
printn(f"{get_first_three(phone)}: {login}")
if '兑换成功' in response.text:
print_success(f"{get_first_three(phone)}: 兑换成功!")
except Exception as e:
print_error(f"{get_first_three(phone)}: 兑换请求发生错误: {str(e)}")
async def getLevelRightsList(phone, session, accId):
try:
value = {
"type": "hg_qd_djqydh",
"accId": accId,
"shopId": "20001"
}
paraV = encrypt_paraNew(value)
response = session.post(
'https://wappark.189.cn/jt-sign/paradise/queryLevelRightInfo',
json={"para": paraV}
)
data = response.json()
if data.get('code') == 401:
print_warn(f"获取失败:{data}, 原因大概是sign过期了")
return None
current_level = int(data['currentLevel'])
key_name = 'V' + str(current_level)
ids = [item['activityId'] for item in data.get(key_name, []) if '话费' in item.get('title', "")]
return ids
except Exception as e:
print_warn(f"获取失败, 重试一次: {str(e)}")
try:
paraV = encrypt_para(value)
response = session.post(
'https://wappark.189.cn/jt-sign/paradise/getLevelRightsList',
json={"para": paraV}
)
data = response.json()
if data.get('code') == 401:
print_warn(f"重试获取失败:{data}, 原因大概是sign过期了")
return None
current_level = int(data['currentLevel'])
key_name = 'V' + str(current_level)
ids = [item['activityId'] for item in data.get(key_name, []) if '话费' in item.get('title', "")]
return ids
except Exception as e:
print_error(f"重试也失败了: {str(e)}")
return None
async def getSign(ticket, session):
try:
response = session.get(
'https://wappark.189.cn/jt-sign/ssoHomLogin?ticket=' + ticket,
headers={
'User-Agent': "Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36"
}
).json()
if response.get('resoultCode') == '0':
sign = response.get('sign')
accId = response.get('accId')
return sign, accId
else:
print_warn(f"获取sign失败[{response.get('resoultCode')}]: {response}")
except Exception as e:
print_error(f"getSign 发生错误: {str(e)}")
return None
async def qgNight(phone, ticket, timeDiff, isTrue):
if isTrue:
runTime = run_Time(23, 59, 3)
else:
runTime = 0
if runTime > (time.time() + timeDiff):
difftime = runTime - time.time() - timeDiff
printn(f"当前时间:{str(datetime.datetime.now())[11:23]}, 跟设定的时间不同, 等待{difftime}秒开始兑换每天一次的")
await asyncio.sleep(difftime)
session = requests.Session()
session.mount('https://', DESAdapter())
session.verify = False # 禁用证书验证
signx = await getSign(ticket, session)
if signx:
sign, accId = signx
printn(f"当前时间:{str(datetime.datetime.now())[11:23]}获取到了Sign: {Color.OKGREEN}{sign}{Color.ENDC}")
session.headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36",
"sign": sign
}
else:
print_warn("未获取sign。")
return
rightsId = await getLevelRightsList(phone, session, accId)
if rightsId:
printn(f"获取到了rightsId: {Color.OKGREEN}{rightsId[0]}{Color.ENDC}")
else:
print_warn("未能获取rightsId。")
return
if isTrue:
runTime2 = run_Time(23, 59, 56) + 0.3
difftime = runTime2 - time.time() - timeDiff
printn(f"等待{difftime}s")
await asyncio.sleep(difftime)
await exchangeForDay(phone, session, run_num, rightsId[0], 0.1, accId)
async def qgDay(phone, ticket, timeDiff, isTrue):
async with AsyncSessionManager() as s:
pass
async def main(timeDiff, isTRUE, hour):
print(LOGO)
print_info("脚本初始化完成,开始准备抢话费...")
# 加载缓存
cache = load_cache()
tasks = []
PHONES = os.environ.get('chinaTelecomAccount')
if not PHONES:
print_error("错误: 未设置 chinaTelecomAccount 环境变量,请配置为 账号#密码 格式")
return
phone_list = PHONES.split('&')
print_info(f"检测到 {len(phone_list)} 个账号将参与抢话费")
for phoneV in phone_list:
value = phoneV.split('#')
if len(value) != 2:
print_warn(f"跳过无效账号格式: {phoneV},请使用 账号#密码 格式")
continue
phone, password = value[0], value[1]
printn(f"{Color.OKBLUE}{get_first_three(phone)}: 开始登录{Color.ENDC}")
max_retries = 3
retry_count = 0
ticket = None
# 先尝试使用缓存登录
if phone in cache:
printn(f"{get_first_three(phone)}: 尝试缓存登录")
cache_data = cache[phone]
ticket = get_ticket(phone, cache_data['userId'], cache_data['token'])
if ticket:
print_success(f"{get_first_three(phone)}: 缓存登录成功")
# 缓存登录失败则尝试密码登录
if not ticket:
printn(f"{get_first_three(phone)}: 缓存登录失败,尝试密码登录")
while retry_count < max_retries and not ticket:
ticket, userId, token = userLoginNormal(phone, password)
if ticket:
# 更新缓存
cache[phone] = {'userId': userId, 'token': token}
save_cache(cache)
print_success(f"{get_first_three(phone)}: 密码登录成功并更新缓存")
break
else:
print_warn(f"{get_first_three(phone)}: 第{retry_count+1}次登录失败,准备重试")
retry_count += 1
await asyncio.sleep(1) # 添加1秒延迟避免频繁请求
if ticket:
if hour > 15:
tasks.append(qgNight(phone, ticket, timeDiff, isTRUE))
else: # 十点//十四点场次
tasks.append(qgNight(phone, ticket, timeDiff, isTRUE))
else:
print_error(f"{get_first_three(phone)}: 登录失败,已达最大重试次数{max_retries}")
if tasks:
print_info(f"准备执行 {len(tasks)} 个抢话费任务")
await asyncio.gather(*tasks)
else:
print_warn("没有可执行的抢话费任务")
if __name__ == "__main__":
isTRUE = True #实际生产环境设为True测试时可设为False忽略时间限制
print_info(f"环境变量配置: chinaTelecomAccount = {os.environ.get('chinaTelecomAccount', '未设置')}")
print_info(f"请求次数配置: reqNUM = {run_num}")
print_info(f"测试模式配置: test0 = {isTRUE} (true为正常模式false为测试模式默认为true)")
h = datetime.datetime.now().hour
print_info(f"当前系统时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if 10 > h > 0:
print_info(f"当前小时为: {h} 已过0点但未到10点v开始准备抢凌晨场次")
wttime = run_Time(23, 59, 8) # 抢十点场次
elif 14 >= h >= 10:
print_info(f"当前小时为: {h} 已过10点但未到14点开始准备抢凌晨场次")
wttime = run_Time(23, 59, 8) # 抢十四点场次
else:
print_info(f"当前小时为: {h} 已过14点开始准备抢凌晨场次")
wttime = run_Time(23, 58, 58) # 抢凌晨场次
if wttime > time.time():
wTime = wttime - time.time()
print_info(f"未到抢话费时间,计算后等待: {wTime:.2f}")
if isTRUE:
print_warn("注意: 一定要先测试,根据自身网络设定重发次数和多账号策略,避免抢购过早或过晚")
print_info("开始等待抢话费时间...")
time.sleep(wTime)
timeValue = 0 # getApiTime("https://f.m.suning.com/api/ct.do")
timeDiff = timeValue if timeValue > 0 else 0
try:
asyncio.run(main(timeDiff, isTRUE, h))
except Exception as e:
print_error(f"脚本执行过程中发生异常: {str(e)}")
finally:
print_info("所有任务都已执行完毕!")