mirror of
https://github.com/daiyanan1992/qinglongtest
synced 2025-12-16 23:09:38 +08:00
416 lines
16 KiB
Python
416 lines
16 KiB
Python
#!/usr/bin/python3
|
|
# -- coding: utf-8 --
|
|
# -------------------------------
|
|
# @Author : github@limoruirui https://github.com/limoruirui
|
|
# @Time : 2022/9/12 16:10
|
|
# cron "1 9,12 * * *" script-path=xxx.py,tag=匹配cron用
|
|
# const $ = new Env('电信签到');
|
|
# -------------------------------
|
|
|
|
"""
|
|
1. 电信签到 不需要抓包 脚本仅供学习交流使用, 请在下载后24h内删除
|
|
2. cron说明 12点必须执行一次(用于兑换) 然后12点之外还需要执行一次(用于执行每日任务) 一天共两次 可直接使用默认cron
|
|
2. 环境变量说明:
|
|
必须 TELECOM_PHONE : 电信手机号
|
|
选填 TELECOM_PASSWORD : 电信服务密码 填写后会执行更多任务
|
|
选填 TELECOM_FOOD : 给宠物喂食次数 默认为0 不喂食 根据用户在网时长 每天可以喂食5-10次
|
|
3. 必须登录过 电信营业厅 app的账号才能正常运行
|
|
"""
|
|
"""
|
|
update:
|
|
2022.10.25 参考大佬 github@QGCliveDavis https://github.com/QGCliveDavis 的 loginAuthCipherAsymmertric 参数解密 新增app登录获取token 完成星播客系列任务 感谢大佬
|
|
2022.11.11 增加分享任务
|
|
"""
|
|
from datetime import date, datetime
|
|
from random import shuffle, randint, choices
|
|
from time import sleep, strftime
|
|
from re import findall
|
|
from requests import get, post
|
|
from base64 import b64encode
|
|
from tools.aes_encrypt import AES_Ctypt
|
|
from tools.rsa_encrypt import RSA_Encrypt
|
|
from tools.tool import timestamp, get_environ, print_now
|
|
from tools.send_msg import push
|
|
from login.telecom_login import TelecomLogin
|
|
from string import ascii_letters, digits
|
|
|
|
|
|
|
|
class ChinaTelecom:
|
|
def __init__(self, account, pwd, checkin=True):
|
|
self.phone = account
|
|
self.ticket = ""
|
|
self.token = ""
|
|
if pwd != "" and checkin:
|
|
userLoginInfo = TelecomLogin(account, pwd).main()
|
|
self.ticket = userLoginInfo[0]
|
|
self.token = userLoginInfo[1]
|
|
|
|
def init(self):
|
|
self.msg = ""
|
|
self.ua = f"CtClient;9.6.1;Android;12;SM-G9860;{b64encode(self.phone[5:11].encode()).decode().strip('=+')}!#!{b64encode(self.phone[0:5].encode()).decode().strip('=+')}"
|
|
self.headers = {
|
|
"Host": "wapside.189.cn:9001",
|
|
"Referer": "https://wapside.189.cn:9001/resources/dist/signInActivity.html",
|
|
"User-Agent": self.ua
|
|
}
|
|
self.key = "-----BEGIN PUBLIC KEY-----\nMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+ugG5A8cZ3FqUKDwM57GM4io6\nJGcStivT8UdGt67PEOihLZTw3P7371+N47PrmsCpnTRzbTgcupKtUv8ImZalYk65\ndU8rjC/ridwhw9ffW2LBwvkEnDkkKKRi2liWIItDftJVBiWOh17o6gfbPoNrWORc\nAdcbpk2L+udld5kZNwIDAQAB\n-----END PUBLIC KEY-----"
|
|
|
|
def req(self, url, method, data=None):
|
|
re_try = 3
|
|
while re_try > 0:
|
|
try:
|
|
if method == "GET":
|
|
data = get(url, headers=self.headers).json()
|
|
return data
|
|
elif method.upper() == "POST":
|
|
data = post(url, headers=self.headers, json=data).json()
|
|
return data
|
|
else:
|
|
print_now("您当前使用的请求方式有误,请检查")
|
|
break
|
|
except:
|
|
re_try -= 1
|
|
sleep(5)
|
|
continue
|
|
|
|
# 长明文分段rsa加密
|
|
def telecom_encrypt(self, text):
|
|
if len(text) <= 32:
|
|
return RSA_Encrypt(self.key).encrypt(text)
|
|
else:
|
|
encrypt_text = ""
|
|
for i in range(int(len(text) / 32) + 1):
|
|
split_text = text[(32 * i):(32 * (i + 1))]
|
|
encrypt_text += RSA_Encrypt(self.key).encrypt(split_text)
|
|
return encrypt_text
|
|
@staticmethod
|
|
def geneRandomToken():
|
|
randomList = choices(ascii_letters + digits, k=129)
|
|
token = f"V1.0{''.join(x for x in randomList)}"
|
|
return token
|
|
# 签到
|
|
def chech_in(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/api/home/sign"
|
|
data = {
|
|
"encode": AES_Ctypt("34d7cb0bcdf07523").encrypt(
|
|
f'{{"phone":{self.phone},"date":{timestamp()},"signSource":"smlprgrm"}}')
|
|
}
|
|
print_now(self.req(url, "post", data))
|
|
|
|
# 获取任务列表
|
|
def get_task(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/paradise/getTask"
|
|
data = {
|
|
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
|
|
}
|
|
msg = self.req(url, "post", data)
|
|
# print_now(dumps(msg, indent=2, ensure_ascii=False))
|
|
if msg["resoultCode"] == "0":
|
|
self.task_list = msg["data"]
|
|
else:
|
|
print_now("获取任务列表失败")
|
|
print_now(msg)
|
|
return
|
|
|
|
# 做每日任务
|
|
def do_task(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/paradise/polymerize"
|
|
for task in self.task_list:
|
|
if "翻牌抽好礼" in task["title"] or "查看我的订单" in task["title"] or "查看我的云盘" in task["title"]:
|
|
print_now(f'{task["title"]}----{task["taskId"]}')
|
|
decrept_para = f'{{"phone":"{self.phone}","jobId":"{task["taskId"]}"}}'
|
|
data = {
|
|
"para": self.telecom_encrypt(decrept_para)
|
|
}
|
|
data = self.req(url, "POST", data)
|
|
if data["data"]["code"] == 0:
|
|
# print(data["resoultMsg"])
|
|
print_now(data)
|
|
else:
|
|
print_now(f'聚合任务完成失败,原因是{data["resoultMsg"]}')
|
|
|
|
# 给宠物喂食
|
|
def food(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/paradise/food"
|
|
data = {
|
|
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
|
|
}
|
|
res_data = self.req(url, "POST", data)
|
|
if res_data["resoultCode"] == "0":
|
|
print_now(res_data["resoultMsg"])
|
|
else:
|
|
print_now(f'聚合任务完成失败,原因是{res_data["resoultMsg"]}')
|
|
|
|
# 查询宠物等级
|
|
def get_level(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/paradise/getParadiseInfo"
|
|
body = {
|
|
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
|
|
}
|
|
data = self.req(url, "POST", body)
|
|
self.level = int(data["userInfo"]["paradiseDressup"]["level"])
|
|
if self.level < 5:
|
|
print_now("当前等级小于5级 不领取等级权益")
|
|
return
|
|
url = "https://wapside.189.cn:9001/jt-sign/paradise/getLevelRightsList"
|
|
right_list = self.req(url, "POST", body)[f"V{self.level}"]
|
|
for data in right_list:
|
|
# print(dumps(data, indent=2, ensure_ascii=0))
|
|
if "00金豆" in data["righstName"] or "话费" in data["righstName"]:
|
|
rightsId = data["id"]
|
|
self.level_ex(rightsId)
|
|
continue
|
|
# print(self.rightsId)
|
|
|
|
# 每月领取等级金豆
|
|
def level_ex(self, rightsId):
|
|
# self.get_level()
|
|
url = "https://wapside.189.cn:9001/jt-sign/paradise/conversionRights"
|
|
data = {
|
|
"para": self.telecom_encrypt(f'{{"phone":{self.phone},"rightsId":"{rightsId}"}},"receiveCount":1')
|
|
}
|
|
print_now(self.req(url, "POST", data))
|
|
|
|
# 查询连续签到天数
|
|
def query_signinfo(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/reward/activityMsg"
|
|
body = {
|
|
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
|
|
}
|
|
data = self.req(url, "post", body)
|
|
# print(dumps(data, indent=2, ensure_ascii=0))
|
|
recordNum = data["recordNum"]
|
|
if recordNum != 0:
|
|
return data["date"]["id"]
|
|
return ""
|
|
|
|
# 若连续签到为7天 则兑换
|
|
def convert_reward(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/reward/convertReward"
|
|
try:
|
|
rewardId = self.query_signinfo() # "baadc927c6ed4d8a95e28fa3fc68cb9"
|
|
except:
|
|
rewardId = "baadc927c6ed4d8a95e28fa3fc68cb9"
|
|
if rewardId == "":
|
|
return
|
|
body = {
|
|
"para": self.telecom_encrypt(
|
|
f'{{"phone":"{self.phone}","rewardId":"{rewardId}","month":"{date.today().__format__("%Y%m")}"}}')
|
|
}
|
|
for i in range(10):
|
|
try:
|
|
data = self.req(url, "post", body)
|
|
except Exception as e:
|
|
print(f"请求发送失败: " + str(e))
|
|
sleep(6)
|
|
continue
|
|
print_now(data)
|
|
if data["code"] == "0":
|
|
break
|
|
sleep(6)
|
|
reward_status = self.get_coin_info()
|
|
if reward_status:
|
|
self.msg += f"账号{self.phone}连续签到7天兑换1元话费成功\n"
|
|
print_now(self.msg)
|
|
push("电信签到兑换", self.msg)
|
|
else:
|
|
self.msg += f"账号{self.phone}连续签到7天兑换1元话费失败 明天会继续尝试兑换\n"
|
|
print_now(self.msg)
|
|
push("电信签到兑换", self.msg)
|
|
|
|
|
|
# 查询金豆数量
|
|
def coin_info(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/api/home/userCoinInfo"
|
|
data = {
|
|
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
|
|
}
|
|
self.coin_count = self.req(url, "post", data)
|
|
print_now(self.coin_count)
|
|
|
|
def author(self):
|
|
"""
|
|
通过usercode 获取 authorization
|
|
:return:
|
|
"""
|
|
self.get_usercode()
|
|
url = "https://xbk.189.cn/xbkapi/api/auth/userinfo/codeToken"
|
|
data = {
|
|
"usercode": self.usercode
|
|
}
|
|
data = post(url, headers=self.headers_live, json=data).json()
|
|
self.authorization = f"Bearer {data['data']['token']}"
|
|
self.headers_live["Authorization"] = self.authorization
|
|
def get_usercode(self):
|
|
"""
|
|
授权星播客登录获取 usercode
|
|
:return:
|
|
"""
|
|
url = f"https://xbk.189.cn/xbkapi/api/auth/jump?userID={self.ticket}&version=9.3.3&type=room&l=renwu"
|
|
self.headers_live = {
|
|
"User-Agent": self.ua,
|
|
"Host": "xbk.189.cn",
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
|
"Accept-Language": "zh-CN,zh-Hans;q=0.9"
|
|
}
|
|
location = get(url, headers=self.headers_live, allow_redirects=False).headers["location"]
|
|
usercode = findall(r"usercode=(.*?)&", location)[0]
|
|
self.usercode = usercode
|
|
def watch_video(self):
|
|
"""
|
|
看视频 一天可完成6次
|
|
:return:
|
|
"""
|
|
url = "https://xbk.189.cn/xbkapi/lteration/liveTask/index/watchVideo"
|
|
data = {
|
|
"articleId": 3453
|
|
}
|
|
data = post(url, headers=self.headers_live, json=data).json()
|
|
if data["code"] == 0:
|
|
print("看小视频15s完成一次")
|
|
else:
|
|
print(f"完成看小视频15s任务失败, 失败原因为{data['msg']}")
|
|
def like(self):
|
|
"""
|
|
点赞直播间 可完成5次
|
|
:return:
|
|
"""
|
|
url = "https://xbk.189.cn/xbkapi/lteration/room/like"
|
|
liveId_list = [1820, 2032, 2466, 2565, 1094, 2422, 1858, 2346]
|
|
shuffle(liveId_list)
|
|
for liveId in liveId_list[:5]:
|
|
data = {
|
|
"account": self.phone,
|
|
"liveId": liveId
|
|
}
|
|
try:
|
|
data = post(url, headers=self.headers_live, json=data).json()
|
|
if data["code"] == 8888:
|
|
sleep(2)
|
|
print(data["msg"])
|
|
else:
|
|
print(f"完成点赞直播间任务失败,原因是{data['msg']}")
|
|
except Exception:
|
|
print(Exception)
|
|
def watch_live(self):
|
|
# 首先初始化任务,等待15秒倒计时后再完成 可完成10次
|
|
url = "https://xbk.189.cn/xbkapi/lteration/liveTask/index/watchLiveInit"
|
|
live_id = randint(1000, 2700)
|
|
data = {
|
|
"period": 1,
|
|
"liveId": live_id
|
|
}
|
|
data = post(url, headers=self.headers_live, json=data).json()
|
|
if data["code"] == 0:
|
|
taskcode = data["data"]
|
|
url = "https://xbk.189.cn/xbkapi/lteration/liveTask/index/watchLive"
|
|
data = {
|
|
"key": taskcode,
|
|
"period": 1,
|
|
"liveId": live_id
|
|
}
|
|
print("正在等待15秒")
|
|
sleep(15)
|
|
data = post(url, headers=self.headers_live, json=data).json()
|
|
if data["code"] == 0:
|
|
print("完成1次观看直播任务")
|
|
else:
|
|
print(f"完成观看直播任务失败,原因是{data['msg']}")
|
|
else:
|
|
print(f"初始化观看直播任务失败,失败原因为{data['msg']}")
|
|
|
|
def get_userid(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/api/home/homeInfo"
|
|
body = {
|
|
"para": self.telecom_encrypt(f'{{"phone":"{self.phone}","signDate":"{datetime.now().__format__("%Y-%m")}"}}')
|
|
}
|
|
userid = post(url, json=body, headers=self.headers).json()["data"]["userInfo"]["userThirdId"]
|
|
return userid
|
|
def share(self):
|
|
"""
|
|
50的分享任务 token不做校检 有值即可 若登录成功了 使用自己的token 否则生成随机的token
|
|
:return:
|
|
"""
|
|
url = "https://appfuwu.189.cn:9021/query/sharingGetGold"
|
|
body = {
|
|
"headerInfos": {
|
|
"code": "sharingGetGold",
|
|
"timestamp": datetime.now().__format__("%Y%m%d%H%M%S"),
|
|
"broadAccount": "",
|
|
"broadToken": "",
|
|
"clientType": "#9.6.1#channel50#iPhone 14 Pro Max#",
|
|
"shopId": "20002",
|
|
"source": "110003",
|
|
"sourcePassword": "Sid98s",
|
|
"token": self.token if self.token != "" else self.geneRandomToken(),
|
|
"userLoginName": self.phone
|
|
},
|
|
"content": {
|
|
"attach": "test",
|
|
"fieldData": {
|
|
"shareSource": "3",
|
|
"userId": self.get_userid(),
|
|
"account": TelecomLogin.get_phoneNum(self.phone)
|
|
}
|
|
}
|
|
}
|
|
headers = {
|
|
"user-agent": "iPhone 14 Pro Max/9.6.1"
|
|
}
|
|
data = post(url, headers=headers, json=body).json()
|
|
print_now(data)
|
|
def main(self):
|
|
self.init()
|
|
self.chech_in()
|
|
self.get_task()
|
|
self.do_task()
|
|
if foods != 0:
|
|
for i in range(foods):
|
|
self.food()
|
|
# self.convert_reward()
|
|
if datetime.now().day == 1:
|
|
self.get_level()
|
|
self.share()
|
|
if self.ticket != "":
|
|
self.author()
|
|
for i in range(6):
|
|
self.watch_video()
|
|
sleep(15)
|
|
# self.like()
|
|
for i in range(10):
|
|
try:
|
|
self.watch_live()
|
|
except:
|
|
continue
|
|
self.coin_info()
|
|
self.msg += f"你账号{self.phone} 当前有金豆{self.coin_count['totalCoin']}"
|
|
push("电信app签到", self.msg)
|
|
def get_coin_info(self):
|
|
url = "https://wapside.189.cn:9001/jt-sign/api/getCoinInfo"
|
|
decrept_para = f'{{"phone":"{self.phone}","pageNo":0,"pageSize":10,type:"1"}}'
|
|
data = {
|
|
"para": self.telecom_encrypt(decrept_para)
|
|
}
|
|
data = self.req(url, "POST", data)
|
|
if "skuName" in data["data"]["biz"]["results"][0] and "连续签到" in data["data"]["biz"]["results"][0]["skuName"]:
|
|
return True
|
|
return False
|
|
|
|
|
|
if __name__ == "__main__":
|
|
phone = get_environ("TELECOM_PHONE")
|
|
password = get_environ("TELECOM_PASSWORD")
|
|
foods = int(float(get_environ("TELECOM_FOOD", 0, False)))
|
|
if phone == "":
|
|
exit(0)
|
|
if password == "":
|
|
print_now("电信服务密码未提供 只执行部分任务")
|
|
if datetime.now().hour + (8 - int(strftime("%z")[2])) == 12:
|
|
telecom = ChinaTelecom(phone, password, False)
|
|
telecom.init()
|
|
telecom.convert_reward()
|
|
else:
|
|
telecom = ChinaTelecom(phone, password)
|
|
telecom.main()
|