From 6578a7b1f6c9859b78f53451cdfd00d310a1a4e4 Mon Sep 17 00:00:00 2001 From: XiaoGe-LiBai <2381919454@qq.com> Date: Mon, 2 Dec 2024 09:48:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9B=E5=BB=BA=20=E7=94=B5=E4=BF=A1?= =?UTF-8?q?=E5=85=91=E6=8D=A2.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 电信兑换.py | 451 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 451 insertions(+) create mode 100644 电信兑换.py diff --git a/电信兑换.py b/电信兑换.py new file mode 100644 index 0000000..287d196 --- /dev/null +++ b/电信兑换.py @@ -0,0 +1,451 @@ +""" +新电信抢话费 + +群里发的,未测试好,自测 +修改内容如下“ +1.删除内置的一个手机账号 +2.修改环境变量名保持和拉菲电信金豆本环境变量一致 +3.恢复瑞数通杀.js调用地址,确实也不知道是啥。398、399行注释 + +环境变量chinaTelecomAccount,值为:账号#密码 + +cron: 57 9,13,23 * * * +const $ = new Env("新电信抢话费"); + +""" + +import os +import re +import sys +import ssl +import time +import json +import execjs +import base64 +import random +import certifi +import aiohttp +import asyncio +import certifi +import datetime +import requests +import binascii +from lxml import etree +from http import cookiejar +from Crypto.Cipher import AES +from Crypto.Cipher import DES3 +from Crypto.PublicKey import RSA +from Crypto.Cipher import PKCS1_v1_5 +from Crypto.Util.Padding import pad, unpad +from aiohttp import ClientSession, TCPConnector +from concurrent.futures import ThreadPoolExecutor +run_num=os.environ.get('reqNUM') or "80" + +diffValue=2 +MAX_RETRIES = 3 +RATE_LIMIT = 10 # 每秒请求数限制 + +class RateLimiter: + def __init__(self, rate_limit): + self.rate_limit = rate_limit + self.tokens = rate_limit + self.updated_at = time.monotonic() + + async def acquire(self): + while self.tokens < 1: + self.add_new_tokens() + await asyncio.sleep(0.1) + self.tokens -= 1 + + def add_new_tokens(self): + now = time.monotonic() + time_since_update = now - self.updated_at + new_tokens = time_since_update * self.rate_limit + if new_tokens > 1: + self.tokens = min(self.tokens + new_tokens, self.rate_limit) + self.updated_at = now + +class AsyncSessionManager: + def __init__(self): + self.session = None + self.connector = None + + async def __aenter__(self): + ssl_context = ssl.create_default_context(cafile=certifi.where()) + ssl_context.set_ciphers('DEFAULT@SECLEVEL=1') + self.connector = TCPConnector(ssl=ssl_context, limit=1000) + self.session = ClientSession(connector=self.connector) + return self.session + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.session.close() + await self.connector.close() + +async def retry_request(session, method, url, **kwargs): + for attempt in range(MAX_RETRIES): + try: + await asyncio.sleep(1) + async with session.request(method, url, **kwargs) as response: + return await response.json() + # return await response.json() + + except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError) as e: + print(f"请求失败,第 {attempt + 1} 次重试: {e}") + if attempt == MAX_RETRIES - 1: + raise + await asyncio.sleep(2 ** attempt) + +class BlockAll(cookiejar.CookiePolicy): + return_ok = set_ok = domain_return_ok = path_return_ok = lambda self, *args, **kwargs: False + netscape = True + rfc2965 = hide_cookie2 = False + +def printn(m): + print(f'\n{m}') + +context = ssl.create_default_context() +context.set_ciphers('DEFAULT@SECLEVEL=1') # 低安全级别0/1 +context.check_hostname = False # 禁用主机 +context.verify_mode = ssl.CERT_NONE # 禁用证书 + +class DESAdapter(requests.adapters.HTTPAdapter): + def init_poolmanager(self, *args, **kwargs): + kwargs['ssl_context'] = context + return super().init_poolmanager(*args, **kwargs) + +requests.packages.urllib3.disable_warnings() +# urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +ss = requests.session() +ss.headers={"User-Agent":"Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36","Referer":"https://wapact.189.cn:9001/JinDouMall/JinDouMall_independentDetails.html"} +ss.mount('https://', DESAdapter()) +ss.cookies.set_policy(BlockAll()) +runTime = 0 +key = b'1234567`90koiuyhgtfrdews' +iv = 8 * b'\0' + +public_key_b64 = '''-----BEGIN PUBLIC KEY----- +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDBkLT15ThVgz6/NOl6s8GNPofdWzWbCkWnkaAm7O2LjkM1H7dMvzkiqdxU02jamGRHLX/ZNMCXHnPcW/sDhiFCBN18qFvy8g6VYb9QtroI09e176s+ZCtiv7hbin2cCTj99iUpnEloZm19lwHyo69u5UMiPMpq0/XKBO8lYhN/gwIDAQAB +-----END PUBLIC KEY-----''' + +public_key_data = '''-----BEGIN PUBLIC KEY----- +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+ugG5A8cZ3FqUKDwM57GM4io6JGcStivT8UdGt67PEOihLZTw3P7371+N47PrmsCpnTRzbTgcupKtUv8ImZalYk65dU8rjC/ridwhw9ffW2LBwvkEnDkkKKRi2liWIItDftJVBiWOh17o6gfbPoNrWORcAdcbpk2L+udld5kZNwIDAQAB +-----END PUBLIC KEY-----''' + +def get_first_three(value): + # 处理数字情况 + if isinstance(value, (int, float)): + return int(str(value)[:3]) + elif isinstance(value, str): + return str(value)[:3] + else: + raise TypeError("error") + +def run_Time(hour,miute,second): + date = datetime.datetime.now() + date_zero = datetime.datetime.now().replace(year=date.year, month=date.month, day=date.day, hour=hour, minute=miute, second=second) + date_zero_time = int(time.mktime(date_zero.timetuple())) + return date_zero_time + +def encrypt(text): + cipher = DES3.new(key, DES3.MODE_CBC, iv) + ciphertext = cipher.encrypt(pad(text.encode(), DES3.block_size)) + return ciphertext.hex() + +def decrypt(text): + ciphertext = bytes.fromhex(text) + cipher = DES3.new(key, DES3.MODE_CBC, iv) + plaintext = unpad(cipher.decrypt(ciphertext), DES3.block_size) + return plaintext.decode() + +def b64(plaintext): + public_key = RSA.import_key(public_key_b64) + cipher = PKCS1_v1_5.new(public_key) + ciphertext = cipher.encrypt(plaintext.encode()) + return base64.b64encode(ciphertext).decode() + +def encrypt_para(plaintext): + if not isinstance(plaintext, str): + plaintext = json.dumps(plaintext) + public_key = RSA.import_key(public_key_data) + cipher = PKCS1_v1_5.new(public_key) + ciphertext = cipher.encrypt(plaintext.encode()) + return binascii.hexlify(ciphertext).decode() + +def encode_phone(text): + encoded_chars = [] + for char in text: + encoded_chars.append(chr(ord(char) + 2)) + return ''.join(encoded_chars) + + +def getApiTime(api_url): + try: + with requests.get(api_url) as response: + if(not response or not response.text): + return time.time() + json_data = json.loads(response.text) + if (json_data.get("api")and json_data.get("api")not in("time") ): + timestamp_str = json_data.get('data', {}).get('t', '') + else: + timestamp_str = json_data.get('currentTime', {}) + timestamp = int(timestamp_str) / 1000.0 # 将毫秒转为秒 + difftime=time.time()-timestamp + return difftime; + except Exception as e: + print(f"获取时间失败: {e}") + return 0; + + +def userLoginNormal(phone,password): + alphabet = 'abcdef0123456789' + uuid = [''.join(random.sample(alphabet, 8)),''.join(random.sample(alphabet, 4)),'4'+''.join(random.sample(alphabet, 3)),''.join(random.sample(alphabet, 4)),''.join(random.sample(alphabet, 12))] + timestamp=datetime.datetime.now().strftime("%Y%m%d%H%M%S") + loginAuthCipherAsymmertric = 'iPhone 14 15.4.' + uuid[0] + uuid[1] + phone + timestamp + password[:6] + '0$$$0.' + r = ss.post('https://appgologin.189.cn:9031/login/client/userLoginNormal',json={"headerInfos": {"code": "userLoginNormal", "timestamp": timestamp, "broadAccount": "", "broadToken": "", "clientType": "#9.6.1#channel50#iPhone 14 Pro Max#", "shopId": "20002", "source": "110003", "sourcePassword": "Sid98s", "token": "", "userLoginName": phone}, "content": {"attach": "test", "fieldData": {"loginType": "4", "accountType": "", "loginAuthCipherAsymmertric": b64(loginAuthCipherAsymmertric), "deviceUid": uuid[0] + uuid[1] + uuid[2], "phoneNum": encode_phone(phone), "isChinatelecom": "0", "systemVersion": "15.4.0", "authentication": password}}},verify=certifi.where()).json() + l = r['responseData']['data']['loginSuccessResult'] + if l: + ticket = get_ticket(phone,l['userId'],l['token']) + return ticket + return False + +async def exchangeForDay(phone, session, run_Time, rid, stime): + async def delayed_conversion(delay): + await asyncio.sleep(delay) + await conversionRights(phone, rid,session) + tasks = [asyncio.create_task(delayed_conversion(i * stime)) for i in range(int(run_Time))] + await asyncio.gather(*tasks) +def get_ticket(phone,userId,token): + r = ss.post('https://appgologin.189.cn:9031/map/clientXML',data='getSingle'+datetime.datetime.now().strftime("%Y%m%d%H%M%S")+'#9.6.1#channel50#iPhone 14 Pro Max#20002110003Sid98s'+token+''+phone+'test'+encrypt(userId)+'4a6862274835b451',headers={'user-agent': 'CtClient;10.4.1;Android;13;22081212C;NTQzNzgx!#!MTgwNTg1'},verify=certifi.where()) + tk = re.findall('(.*?)',r.text) + if len(tk) == 0: + return False + return decrypt(tk[0]) + +async def exchange(s, phone, title, aid,jsexec, ckvalue): + try: + url="https://wapact.189.cn:9001/gateway/stand/detailNew/exchange" + # getck = await asyncio.to_thread(jsexec.call, "getck") # 两种方式,一种用ck,一种用后缀 + # getck = getck.split(';')[0].split('=') + # ckvalue[getck[0]] = getck[1] + + # async with s.post(url, cookies=ckvalue, json={"activityId": aid}) as response: + + # 通过 retry_request 实现重试机制 + # response = await retry_request(s, 'POST', get_url, cookies=ckvalue, json={"activityId": aid}) + + get_url = await asyncio.to_thread(jsexec.call,"getUrl", "POST",url) + async with s.post(get_url, cookies=ckvalue, json={"activityId": aid}) as response: + pass + except Exception as e: + print(e) + + + +async def check(s,item,ckvalue): + checkGoods = s.get('https://wapact.189.cn:9001/gateway/stand/detailNew/check?activityId=' + item, cookies=ckvalue).json() + return checkGoods + +async def conversionRights(phone, aid,session): + value = { + "phone": phone, + "rightsId": aid + } + paraV=encrypt_para(value) + response = session.post('https://wapside.189.cn:9001/jt-sign/paradise/conversionRights',json={"para":paraV}) + login = response.json() + printn(f"{get_first_three(phone)},{str(datetime.datetime.now())[11:23]}:{login} ") + +async def getLevelRightsList(phone,session): + value = { + "phone": phone + } + paraV=encrypt_para(value) + requests = session.post('https://wapside.189.cn:9001/jt-sign/paradise/getLevelRightsList',json={"para":paraV}) + try: + data=requests.json() + if data.get('code') == 401: + print(f"获取失败:{data},原因大概是sign过期了") + return None + current_level = int(data['currentLevel']) + key_name = 'V' + str(current_level) + ids = [item['id'] for item in data.get(key_name, []) if item.get('name') == '话费'] + return ids + except: + print(f"获取失败,重试一次:{requests.text}") + paraV=encrypt_para(value) + requests = session.post('https://wapside.189.cn:9001/jt-sign/paradise/getLevelRightsList',json={"para":paraV}) + data=requests.json() + if data.get('code') == 401: + print(f"获取失败:{data},原因大概是sign过期了") + return None + current_level = int(data['currentLevel']) + key_name = 'V' + str(current_level) + ids = [item['id'] for item in data.get(key_name, []) if item.get('name') == '话费'] + return ids + + + +async def getSign(ticket,session): + try: + response_data = session.get('https://wapside.189.cn:9001/jt-sign/ssoHomLogin?ticket=' + ticket).json() + if response_data.get('resoultCode') == '0': + sign = response_data.get('sign') + return sign + else: + print(f"获取sign失败[{response_data.get('resoultCode')}]: {response_data}") + except Exception as e: + print(e) + return None + +async def login_request(ss,url,payload): + global ckvalue,js_codeRead + url = "https://wapact.189.cn:9001/unified/user/login" + headers = { + 'Accept': 'application/json, text/javascript, */*; q=0.01', + 'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Origin': 'https://wapact.189.cn:9001', + 'Pragma': 'no-cache', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-origin', + 'X-Requested-With': 'XMLHttpRequest', + 'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Android WebView";v="126"', + 'sec-ch-ua-mobile': '?1', + 'sec-ch-ua-platform': '"Android"', + 'Content-Type': 'application/json;charset=UTF-8' + } + response = ss.post(url, headers=headers, data=json.dumps(payload)) + rsCK = re.findall('yiUIIlbdQT3fO=([^;]+)',response.headers['Set-Cookie'])[0] + # print(response.status_code) + if response.status_code == 412: + print("检测到瑞数特征码412,正在尝试调用js") + else: + print("未检测到瑞数.") + return response,None,rsCK + html=etree.HTML(response.text) + arg1=html.xpath('//meta/@content')[-1] + arg2=html.xpath('//script/text()')[0] + arg3=html.xpath('//meta/@id')[-1] + js_code = js_codeRead.replace("contentCODE", arg1).replace('"tsCODE"', arg2).replace('"tsID"',f'"{arg3}"') + + jsexec = execjs.compile(js_code) + ck=await asyncio.to_thread(jsexec.call, "getck") + get_url=await asyncio.to_thread(jsexec.call,"getUrl","POST",url) + def parse_cookies(ck): + cookies = {} + for part in ck.split(';'): + part = part.strip() + if '=' in part: + key, value = part.split('=', 1) + if 'path' not in key and 'expires' not in key and 'Secure' not in key and 'SameSite' not in key: + cookies[key] = value + return cookies + ck=parse_cookies(ck) + ck["yiUIIlbdQT3fO"] = rsCK + ckvalue=ck + res=ss.post(get_url, headers=headers,data=json.dumps(payload),cookies=ckvalue) + if res.status_code == 200: + print("瑞数返回状态码200,开始下一步.") + return res,jsexec,ckvalue + else: + print("瑞数破解失败,调用重试机制") + return res,jsexec,None + # return res,jsexec,ckvalue#ckvalue这里直接返回,没直接塞session里面. + +async def qgNight(phone, ticket, timeValue,isTrue): + if isTrue: + runTime = run_Time(23,59,3) + 0.65 + else: + runTime = run_Time(0,0,0) + 0.65 + + if runTime >(time.time()+timeValue): + difftime = runTime - time.time() - timeValue + printn(f"当前时间:{str(datetime.datetime.now())[11:23]},跟设定的时间不同,等待{difftime}秒开始兑换每天一次的") + await asyncio.sleep(difftime) + session = requests.Session() + session.mount('https://', DESAdapter()) + session.verify = False # 禁用证书验证 + sign =await getSign(ticket,session) + if sign: + print(f"当前时间:{str(datetime.datetime.now())[11:23]}获取到了Sign:"+sign) + session.headers={"User-Agent":"Mozilla/5.0 (Linux; Android 13; 22081212C Build/TKQ1.220829.002) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.97 Mobile Safari/537.36","sign":sign} + else: + print("未能获取sign。") + return + rightsId =await getLevelRightsList(phone,session); + if rightsId: + print("获取到了rightsId:"+rightsId[0]) + else: + print("未能获取rightsId。") + return + # await asyncio.sleep(10)直接延迟也行,或者用下面的等待一段时间。之所以这样是要先获取sign省一些步骤。 + if isTrue: + runTime2 = run_Time(23,59,58) + 0.65 + difftime = runTime2 - time.time() - timeValue + printn(f"等待下") + await asyncio.sleep(difftime) + printn(f"{str(datetime.datetime.now())[11:23]} 时间到开始兑换每天一次的") + await exchangeForDay(phone,session,run_num,rightsId[0],0.1) +async def qgDay(phone, ticket, timeValue, isTrue): + global runTime,js_codeRead + async with AsyncSessionManager() as s: + pass +async def main(timeValue,isTRUE,hour): + global runTime,js_codeRead + tasks = [] + with open("./瑞数通杀.js", "r", encoding="utf-8") as f: + js_codeRead = f.read() + phone_list = PHONES.split('&') + # diffValue=len(phone_list) + for phoneV in phone_list: + value = phoneV.split('#') + phone, password = value[0], value[1] + printn(f'{get_first_three(phone)}开始登录') + ticket = userLoginNormal(phone,password) + if ticket: + # hour=datetime.datetime.now().hour + # hour=23 + if hour > 15: + tasks.append(qgNight(phone, ticket, timeValue, isTRUE)) + # await asyncio.sleep(0.1) + else:#十点//十四点场次 + tasks.append(qgDay(phone, ticket, timeValue, isTRUE)) + # await asyncio.sleep(0.1) + else: + printn(f'{phone} 登录失败') + await asyncio.gather(*tasks) + +PHONES=os.environ.get('chinaTelecomAccount') + +if __name__ == "__main__": + global timeValue,timeDiff + h = datetime.datetime.now().hour + h=9 #手动设置场次的时间 + print("当前小时为: "+str(h)) + if 10 >h >0: + print("当前小时为: "+str(h)+"已过0点但未到10点开始准备抢十点场次") + wttime= run_Time(9,59,58) #抢十点场次 + elif 14 >= h >=10: + print("当前小时为: "+str(h) +"已过10点但未到14点开始准备抢十四点场次") + wttime= run_Time(13,59,58) #抢十四点场次 + else: + print("当前小时为: "+str(h)+"已过14点开始准备抢凌晨") + wttime= run_Time(23,59,58) #抢凌晨 +# isTRUE=FalseTrue + isTRUE=True + #isTRUE等于False则表示忽略所有限制直接运行。这个参数一般用于测试。实际生产一定要设置为True。 + if(wttime >time.time()) : + wTime=wttime-time.time() + print("未到时间,计算后差异:"+str(wTime)+"秒") + if isTRUE: + print("一定要先测试,根据自身 设定的重发和多号,不然会出问题,抢购过早或者过晚。") + print("开始等待:") + time.sleep(wTime) + timeValue = getApiTime("https://f.m.suning.com/api/ct.do") + timeDiff = timeValue if timeValue > 0 else 0 + asyncio.run(main(timeDiff, isTRUE,h)) + print("所有任务都已执行完毕!") + +