mirror of
https://github.com/daiyanan1992/qinglongtest
synced 2025-12-22 17:54:35 +08:00
增加营业厅app登录
This commit is contained in:
134
login/telecom_login.py
Normal file
134
login/telecom_login.py
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
# -- coding: utf-8 --
|
||||||
|
# -------------------------------
|
||||||
|
# @Author : github@limoruirui https://github.com/limoruirui
|
||||||
|
# @Time : 2022/10/24 17:52
|
||||||
|
# -------------------------------
|
||||||
|
"""
|
||||||
|
营业厅登录获取token loginAuthCipherAsymmertric参数解密参考自 github@QGCliveDavis https://github.com/QGCliveDavis 感谢大佬
|
||||||
|
"""
|
||||||
|
from requests import post
|
||||||
|
from datetime import datetime
|
||||||
|
from xml.etree.ElementTree import XML
|
||||||
|
from uuid import uuid4
|
||||||
|
from sys import path
|
||||||
|
if "telecom_login" in __file__:
|
||||||
|
path.append("../tools")
|
||||||
|
from rsa_encrypt import RSA_Encrypt
|
||||||
|
from encrypt_symmetric import Crypt
|
||||||
|
from tool import print_now
|
||||||
|
else:
|
||||||
|
from tools.rsa_encrypt import RSA_Encrypt
|
||||||
|
from tools.tool import print_now
|
||||||
|
class TelecomLogin:
|
||||||
|
def __init__(self, account, pwd):
|
||||||
|
self.account = account
|
||||||
|
self.pwd = pwd
|
||||||
|
self.deviceUid = uuid4().hex
|
||||||
|
def login(self):
|
||||||
|
url = "https://appgologin.189.cn:9031/login/client/userLoginNormal"
|
||||||
|
timestamp = datetime.now().__format__("%Y%m%d%H%M%S")
|
||||||
|
key = "-----BEGIN PUBLIC KEY-----\nMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDBkLT15ThVgz6/NOl6s8GNPofd\nWzWbCkWnkaAm7O2LjkM1H7dMvzkiqdxU02jamGRHLX/ZNMCXHnPcW/sDhiFCBN18\nqFvy8g6VYb9QtroI09e176s+ZCtiv7hbin2cCTj99iUpnEloZm19lwHyo69u5UMi\nPMpq0/XKBO8lYhN/gwIDAQAB\n-----END PUBLIC KEY-----"
|
||||||
|
body = {
|
||||||
|
"headerInfos": {
|
||||||
|
"code": "userLoginNormal",
|
||||||
|
"timestamp": timestamp,
|
||||||
|
"broadAccount": "",
|
||||||
|
"broadToken": "",
|
||||||
|
"clientType": "#9.6.1#channel50#iPhone 14 Pro Max#",
|
||||||
|
"shopId": "20002",
|
||||||
|
"source": "110003",
|
||||||
|
"sourcePassword": "Sid98s",
|
||||||
|
"token": "",
|
||||||
|
"userLoginName": self.account
|
||||||
|
},
|
||||||
|
"content": {
|
||||||
|
"attach": "test",
|
||||||
|
"fieldData": {
|
||||||
|
"loginType": "4",
|
||||||
|
"accountType": "",
|
||||||
|
"loginAuthCipherAsymmertric": RSA_Encrypt(key).encrypt(f"iPhone 14 15.4.{self.deviceUid[:12]}{self.account}{timestamp}{self.pwd}0$$$0.", b64=True),
|
||||||
|
"deviceUid": self.deviceUid[:16],
|
||||||
|
"phoneNum": self.get_phoneNum(self.account),
|
||||||
|
"isChinatelecom": "0",
|
||||||
|
"systemVersion": "15.4.0",
|
||||||
|
"authentication": self.pwd
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
"user-agent": "iPhone 14 Pro Max/9.6.1",
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
data = post(url, headers=headers, json=body).json()
|
||||||
|
code = data["responseData"]["resultCode"]
|
||||||
|
if code != "0000":
|
||||||
|
print_now("登陆失败, 接口日志" + str(data))
|
||||||
|
return None
|
||||||
|
self.token = data["responseData"]["data"]["loginSuccessResult"]["token"]
|
||||||
|
self.userId = data["responseData"]["data"]["loginSuccessResult"]["userId"]
|
||||||
|
return True
|
||||||
|
|
||||||
|
def get_ticket(self):
|
||||||
|
url = "https://appgologin.189.cn:9031/map/clientXML"
|
||||||
|
body = f"<Request>\n<HeaderInfos>\n <Code>getSingle</Code>\n <Timestamp>{datetime.now().__format__('%Y%m%d%H%M%S')}</Timestamp>\n <BroadAccount></BroadAccount>\n <BroadToken></BroadToken>\n <ClientType>#9.6.1#channel50#iPhone 14 Pro Max#</ClientType>\n <ShopId>20002</ShopId>\n <Source>110003</Source>\n <SourcePassword>Sid98s</SourcePassword>\n <Token>{self.token}</Token>\n <UserLoginName>{self.account}</UserLoginName>\n </HeaderInfos>\n <Content>\n <Attach>test</Attach>\n <FieldData>\n <TargetId>{self.encrypt_userid(self.userId)}</TargetId>\n <Url>4a6862274835b451</Url>\n </FieldData>\n </Content>\n</Request>"
|
||||||
|
headers = {
|
||||||
|
"User-Agent": "samsung SM-G9750/9.4.0",
|
||||||
|
"Content-Type": "text/xml; charset=utf-8",
|
||||||
|
"Content-Length": "694",
|
||||||
|
"Host": "appgologin.189.cn:9031",
|
||||||
|
"Connection": "Keep-Alive",
|
||||||
|
"Accept-Encoding": "gzip",
|
||||||
|
"Pragma": "no-cache",
|
||||||
|
"Cache-Control": "no-cache"
|
||||||
|
}
|
||||||
|
xml_data = post(url, headers=headers, data=body).text
|
||||||
|
doc = XML(xml_data)
|
||||||
|
secret_ticket = doc.find("ResponseData/Data/Ticket").text
|
||||||
|
# print("secret: " + secret_ticket)
|
||||||
|
ticket = self.decrypt_ticket(secret_ticket)
|
||||||
|
# print("ticket: " + ticket)
|
||||||
|
return ticket
|
||||||
|
def main(self):
|
||||||
|
if self.login() is None:
|
||||||
|
return "10086"
|
||||||
|
ticket = self.get_ticket()
|
||||||
|
return ticket
|
||||||
|
@staticmethod
|
||||||
|
def get_phoneNum(phone):
|
||||||
|
result = ""
|
||||||
|
for i in phone:
|
||||||
|
result += chr(ord(i) + 2)
|
||||||
|
return result
|
||||||
|
@staticmethod
|
||||||
|
def decrypt_ticket(secret_ticket):
|
||||||
|
key = "1234567`90koiuyhgtfrdewsaqaqsqde"
|
||||||
|
iv = "\0\0\0\0\0\0\0\0"
|
||||||
|
# ticket = des3_cbc_decrypt(key, bytes(TelecomLogin.process_text(secret_ticket)), iv)
|
||||||
|
ticket = Crypt("des3", key, iv, "CBC").decrypt(TelecomLogin.process_text(secret_ticket))
|
||||||
|
return ticket
|
||||||
|
@staticmethod
|
||||||
|
def encrypt_userid(userid):
|
||||||
|
key = "1234567`90koiuyhgtfrdewsaqaqsqde"
|
||||||
|
iv = bytes([0] * 8)
|
||||||
|
targetId = Crypt("des3", key, iv, "CBC").encrypt(userid)
|
||||||
|
return targetId
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def process_text(text):
|
||||||
|
length = len(text) >> 1
|
||||||
|
bArr = [0] * length
|
||||||
|
if len(text) % 2 == 0:
|
||||||
|
i2 = 0
|
||||||
|
i3 = 0
|
||||||
|
while i2 < length:
|
||||||
|
i4 = i3 + 1
|
||||||
|
indexOf = "0123456789abcdef0123456789ABCDEF".find(text[i3])
|
||||||
|
if indexOf != -1:
|
||||||
|
bArr[i2] = (((indexOf & 15) << 4) + ("0123456789abcdef0123456789ABCDEF".find(text[i4]) & 15))
|
||||||
|
i2 += 1
|
||||||
|
i3 = i4 + 1
|
||||||
|
else:
|
||||||
|
print("转化失败 大概率是明文输入错误")
|
||||||
|
return bArr
|
||||||
105
tools/encrypt_symmetric.py
Normal file
105
tools/encrypt_symmetric.py
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
# -- coding: utf-8 --
|
||||||
|
# -------------------------------
|
||||||
|
# @Author : github@limoruirui https://github.com/limoruirui
|
||||||
|
# @Time : 2022/10/24 22:09
|
||||||
|
# -------------------------------
|
||||||
|
# !/usr/bin/python3
|
||||||
|
# -- coding: utf-8 --
|
||||||
|
# -------------------------------
|
||||||
|
# @Author : github@limoruirui https://github.com/limoruirui
|
||||||
|
# @Time : 2022/8/22 18:13
|
||||||
|
# -------------------------------
|
||||||
|
"""
|
||||||
|
aes加密解密工具 目前仅支持ECB/CBC 块长度均为128位 padding只支持pkcs7/zero_padding(aes中没有pkcs5 能用的pkcs5其实是执行的pkcs7) 后续有需要再加
|
||||||
|
pycryptdemo限制 同一个aes加密对象不能即加密又解密 所以当加密和解密都需要执行时 需要重新new一个对象增加额外开销
|
||||||
|
-- A cipher object is stateful: once you have encrypted a message , you cannot encrypt (or decrypt) another message using the same object.
|
||||||
|
"""
|
||||||
|
from Crypto.Cipher import AES, DES, DES3
|
||||||
|
from binascii import b2a_hex, a2b_hex
|
||||||
|
from base64 import b64encode, b64decode
|
||||||
|
|
||||||
|
|
||||||
|
class Crypt:
|
||||||
|
def __init__(self, crypt_type: str, key, iv=None, mode="ECB"):
|
||||||
|
"""
|
||||||
|
|
||||||
|
:param crypt_type: 对称加密类型 支持AES, DES, DES3
|
||||||
|
:param key: 密钥 (aes可选 16/32(24位暂不支持 以后遇到有需要再补) des 固定为8 des3 24(暂不支持16 16应该也不会再使用了) 一般都为24 分为8长度的三组 进行三次des加密
|
||||||
|
:param iv: 偏移量
|
||||||
|
:param mode: 模式 CBC/ECB
|
||||||
|
"""
|
||||||
|
if crypt_type.upper() not in ["AES", "DES", "DES3"]:
|
||||||
|
raise Exception("加密类型错误, 请重新选择 AES/DES/DES3")
|
||||||
|
self.crypt_type = AES if crypt_type.upper() == "AES" else DES if crypt_type.upper() == "DES" else DES3
|
||||||
|
self.block_size = self.crypt_type.block_size
|
||||||
|
if self.crypt_type == DES:
|
||||||
|
self.key_size = self.crypt_type.key_size
|
||||||
|
elif self.crypt_type == DES3:
|
||||||
|
self.key_size = self.crypt_type.key_size[1]
|
||||||
|
else:
|
||||||
|
if len(key) <= 16:
|
||||||
|
self.key_size = self.crypt_type.key_size[0]
|
||||||
|
elif len(key) > 24:
|
||||||
|
self.key_size = self.crypt_type.key_size[2]
|
||||||
|
else:
|
||||||
|
self.key_size = self.crypt_type.key_size[1]
|
||||||
|
print("当前aes密钥的长度只填充到24 若需要32 请手动用 chr(0) 填充")
|
||||||
|
if len(key) > self.key_size:
|
||||||
|
key = key[:self.key_size]
|
||||||
|
else:
|
||||||
|
if len(key) % self.key_size != 0:
|
||||||
|
key = key + (self.key_size - len(key) % self.key_size) * chr(0)
|
||||||
|
self.key = key.encode("utf-8")
|
||||||
|
if mode == "ECB":
|
||||||
|
self.mode = self.crypt_type.MODE_ECB
|
||||||
|
elif mode == "CBC":
|
||||||
|
self.mode = self.crypt_type.MODE_CBC
|
||||||
|
else:
|
||||||
|
raise Exception("您选择的加密模式错误")
|
||||||
|
if iv is None:
|
||||||
|
self.cipher = self.crypt_type.new(self.key, self.mode)
|
||||||
|
else:
|
||||||
|
if isinstance(iv, str):
|
||||||
|
iv = iv[:self.block_size]
|
||||||
|
self.cipher = self.crypt_type.new(self.key, self.mode, iv.encode("utf-8"))
|
||||||
|
elif isinstance(iv, bytes):
|
||||||
|
iv = iv[:self.block_size]
|
||||||
|
self.cipher = self.crypt_type.new(self.key, self.mode, iv)
|
||||||
|
else:
|
||||||
|
raise Exception("偏移量不为字符串")
|
||||||
|
|
||||||
|
def encrypt(self, data, padding="pkcs7", b64=False):
|
||||||
|
"""
|
||||||
|
|
||||||
|
:param data: 目前暂不支持bytes 只支持string 有需求再补
|
||||||
|
:param padding: pkcs7/pkck5 zero
|
||||||
|
:param b64: 若需要得到base64的密文 则为True
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
pkcs7_padding = lambda s: s + (self.block_size - len(s.encode()) % self.block_size) * chr(
|
||||||
|
self.block_size - len(s.encode()) % self.block_size)
|
||||||
|
zero_padding = lambda s: s + (self.block_size - len(s) % self.block_size) * chr(0)
|
||||||
|
pad = pkcs7_padding if padding == "pkcs7" else zero_padding
|
||||||
|
data = self.cipher.encrypt(pad(data).encode("utf8"))
|
||||||
|
encrypt_data = b64encode(data) if b64 else b2a_hex(data) # 输出hex或者base64
|
||||||
|
return encrypt_data.decode('utf8')
|
||||||
|
|
||||||
|
def decrypt(self, data, b64=False):
|
||||||
|
"""
|
||||||
|
对称加密的解密
|
||||||
|
:param data: 支持bytes base64 hex list 未做填充 密文应该都是数据块的倍数 带有需求再补
|
||||||
|
:param b64: 若传入的data为base64格式 则为True
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if isinstance(data, list):
|
||||||
|
data = bytes(data)
|
||||||
|
if not isinstance(data, bytes):
|
||||||
|
data = b64decode(data) if b64 else a2b_hex(data)
|
||||||
|
decrypt_data = self.cipher.decrypt(data).decode()
|
||||||
|
# 去掉padding
|
||||||
|
# pkcs7_unpadding = lambda s: s.replace(s[-1], "")
|
||||||
|
# zero_unpadding = lambda s: s.replace(chr(0), "")
|
||||||
|
# unpadding = pkcs7_unpadding if padding=="pkcs7" else zero_unpadding
|
||||||
|
unpadding = lambda s: s.replace(s[-1], "")
|
||||||
|
return unpadding(decrypt_data)
|
||||||
Reference in New Issue
Block a user