删除失效文件

This commit is contained in:
Ytong
2025-08-26 17:34:04 +08:00
parent 33a3be23fc
commit ff3fa4cfa9
14 changed files with 0 additions and 2952 deletions

View File

@@ -1,476 +0,0 @@
'''
{
需要在目录在创建config.josn文件
"BILIBILI": [
{
// B站登录Cookie必填格式为分号分隔的键值对需包含以下关键字段
// - SESSDATA: 登录会话凭证
// - bili_jct: CSRF令牌用于投币、分享等敏感操作
// - DedeUserID: 用户ID
// - DedeUserID__ckMd5: 用户ID的MD5校验值
// 抓取方法浏览器登录B站后通过开发者工具复制Cookie
"cookie": "SESSDATA=xxxxx; bili_jct=xxxxx; DedeUserID=xxxxx; DedeUserID__ckMd5=xxxxx; sid=xxxxx",
// 每日投币数量默认为0可选范围0-5B站限制每天最多投5个币
// 脚本会自动扣除已通过其他方式投币的数量
"coin_num": 0,
// 投币类型默认为1可选值
// - 1: 优先给关注的UP主视频投币
// - 0: 给随机视频投币
"coin_type": 1,
// 是否将银瓜子兑换为硬币默认为false
// - true: 自动兑换
// - false: 不兑换
"silver2coin": false
}
]
}
'''
import json
import os
import time
import requests
from dailycheckin import CheckIn
class BiliBili(CheckIn):
name = "Bilibili"
def __init__(self, check_item: dict):
self.check_item = check_item
@staticmethod
def get_nav(session):
url = "https://api.bilibili.com/x/web-interface/nav"
ret = session.get(url=url).json()
uname = ret.get("data", {}).get("uname")
uid = ret.get("data", {}).get("mid")
is_login = ret.get("data", {}).get("isLogin")
coin = ret.get("data", {}).get("money")
vip_type = ret.get("data", {}).get("vipType")
current_exp = ret.get("data", {}).get("level_info", {}).get("current_exp")
return uname, uid, is_login, coin, vip_type, current_exp
@staticmethod
def get_today_exp(session: requests.Session) -> list:
"""GET 获取今日经验信息
:param requests.Session session:
:return list: 今日经验信息列表
"""
url = "https://api.bilibili.com/x/member/web/exp/log?jsonp=jsonp"
today = time.strftime("%Y-%m-%d", time.localtime())
return list(
filter(
lambda x: x["time"].split()[0] == today,
session.get(url=url).json().get("data").get("list"),
)
)
@staticmethod
def vip_privilege_my(session) -> dict:
"""取B站大会员硬币经验信息"""
url = "https://api.bilibili.com/x/vip/privilege/my"
ret = session.get(url=url).json()
return ret
@staticmethod
def reward(session) -> dict:
"""取B站经验信息"""
url = "https://api.bilibili.com/x/member/web/exp/log?jsonp=jsonp"
today = time.strftime("%Y-%m-%d", time.localtime())
return list(
filter(
lambda x: x["time"].split()[0] == today,
session.get(url=url).json().get("data").get("list"),
)
)
@staticmethod
def live_sign(session) -> dict:
"""B站直播签到"""
try:
url = "https://api.live.bilibili.com/xlive/web-ucenter/v1/sign/DoSign"
ret = session.get(url=url).json()
if ret["code"] == 0:
msg = f'签到成功,{ret["data"]["text"]},特别信息:{ret["data"]["specialText"]},本月已签到{ret["data"]["hadSignDays"]}'
elif ret["code"] == 1011040:
msg = "今日已签到过,无法重复签到"
else:
msg = f'签到失败,信息为: {ret["message"]}'
except Exception as e:
msg = f"签到异常,原因为{str(e)}"
print(msg)
return msg
@staticmethod
def manga_sign(session, platform="android") -> dict:
"""
模拟B站漫画客户端签到
"""
try:
url = "https://manga.bilibili.com/twirp/activity.v1.Activity/ClockIn"
post_data = {"platform": platform}
ret = session.post(url=url, data=post_data).json()
if ret["code"] == 0:
msg = "签到成功"
elif ret["msg"] == "clockin clockin is duplicate":
msg = "今天已经签到过了"
else:
msg = f'签到失败,信息为({ret["msg"]})'
print(msg)
except Exception as e:
msg = f"签到异常,原因为: {str(e)}"
print(msg)
return msg
@staticmethod
def vip_privilege_receive(session, bili_jct, receive_type: int = 1) -> dict:
"""
领取B站大会员权益
receive_type int 权益类型1为B币劵2为优惠券
"""
url = "https://api.bilibili.com/x/vip/privilege/receive"
post_data = {"type": receive_type, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def vip_manga_reward(session) -> dict:
"""获取漫画大会员福利"""
url = "https://manga.bilibili.com/twirp/user.v1.User/GetVipReward"
ret = session.post(url=url, json={"reason_id": 1}).json()
return ret
@staticmethod
def report_task(session, bili_jct, aid: int, cid: int, progres: int = 300) -> dict:
"""
B站上报视频观看进度
aid int 视频av号
cid int 视频cid号
progres int 观看秒数
"""
url = "http://api.bilibili.com/x/v2/history/report"
post_data = {"aid": aid, "cid": cid, "progres": progres, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def share_task(session, bili_jct, aid) -> dict:
"""
分享指定av号视频
aid int 视频av号
"""
url = "https://api.bilibili.com/x/web-interface/share/add"
post_data = {"aid": aid, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def get_followings(
session,
uid: int,
pn: int = 1,
ps: int = 50,
order: str = "desc",
order_type: str = "attention",
) -> dict:
"""
获取指定用户关注的up主
uid int 账户uid默认为本账户非登录账户只能获取20个*5页
pn int 页码,默认第一页
ps int 每页数量默认50
order str 排序方式默认desc
order_type 排序类型默认attention
"""
params = {
"vmid": uid,
"pn": pn,
"ps": ps,
"order": order,
"order_type": order_type,
}
url = "https://api.bilibili.com/x/relation/followings"
ret = session.get(url=url, params=params).json()
return ret
@staticmethod
def space_arc_search(
session,
uid: int,
pn: int = 1,
ps: int = 30,
tid: int = 0,
order: str = "pubdate",
keyword: str = "",
) -> dict:
"""
获取指定up主空间视频投稿信息
uid int 账户uid默认为本账户
pn int 页码,默认第一页
ps int 每页数量默认50
tid int 分区 默认为0(所有分区)
order str 排序方式默认pubdate
keyword str 关键字,默认为空
"""
params = {
"mid": uid,
"pn": pn,
"Ps": ps,
"tid": tid,
"order": order,
"keyword": keyword,
}
url = "https://api.bilibili.com/x/space/arc/search"
ret = session.get(url=url, params=params).json()
count = 2
data_list = [
{
"aid": one.get("aid"),
"cid": 0,
"title": one.get("title"),
"owner": one.get("author"),
}
for one in ret.get("data", {}).get("list", {}).get("vlist", [])[:count]
]
return data_list, count
@staticmethod
def elec_pay(session, bili_jct, uid: int, num: int = 50) -> dict:
"""
用B币给up主充电
uid int up主uid
num int 充电电池数量
"""
url = "https://api.bilibili.com/x/ugcpay/trade/elec/pay/quick"
post_data = {
"elec_num": num,
"up_mid": uid,
"otype": "up",
"oid": uid,
"csrf": bili_jct,
}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def coin_add(
session, bili_jct, aid: int, num: int = 1, select_like: int = 1
) -> dict:
"""
给指定 av 号视频投币
aid int 视频av号
num int 投币数量
select_like int 是否点赞
"""
url = "https://api.bilibili.com/x/web-interface/coin/add"
post_data = {
"aid": aid,
"multiply": num,
"select_like": select_like,
"cross_domain": "true",
"csrf": bili_jct,
}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def live_status(session) -> dict:
"""B站直播获取金银瓜子状态"""
url = "https://api.live.bilibili.com/pay/v1/Exchange/getStatus"
ret = session.get(url=url).json()
data = ret.get("data")
silver = data.get("silver", 0)
gold = data.get("gold", 0)
coin = data.get("coin", 0)
msg = [
{"name": "硬币数量", "value": coin},
{"name": "金瓜子数", "value": gold},
{"name": "银瓜子数", "value": silver},
]
return msg
@staticmethod
def get_region(session, rid=1, num=6) -> dict:
"""
获取 B站分区视频信息
rid int 分区号
num int 获取视频数量
"""
url = (
"https://api.bilibili.com/x/web-interface/dynamic/region?ps="
+ str(num)
+ "&rid="
+ str(rid)
)
ret = session.get(url=url).json()
data_list = [
{
"aid": one.get("aid"),
"cid": one.get("cid"),
"title": one.get("title"),
"owner": one.get("owner", {}).get("name"),
}
for one in ret.get("data", {}).get("archives", [])
]
return data_list
@staticmethod
def silver2coin(session, bili_jct) -> dict:
"""B站银瓜子换硬币"""
url = "https://api.live.bilibili.com/xlive/revenue/v1/wallet/silver2coin"
post_data = {"csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
def main(self):
bilibili_cookie = {
item.split("=")[0]: item.split("=")[1]
for item in self.check_item.get("cookie").split("; ")
}
bili_jct = bilibili_cookie.get("bili_jct")
coin_num = self.check_item.get("coin_num", 0)
coin_type = self.check_item.get("coin_type", 1)
silver2coin = self.check_item.get("silver2coin", False)
session = requests.session()
requests.utils.add_dict_to_cookiejar(session.cookies, bilibili_cookie)
session.headers.update(
{
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.64",
"Referer": "https://www.bilibili.com/",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Connection": "keep-alive",
}
)
success_count = 0
uname, uid, is_login, coin, vip_type, current_exp = self.get_nav(
session=session
)
if is_login:
manhua_msg = self.manga_sign(session=session)
live_msg = self.live_sign(session=session)
aid_list = self.get_region(session=session)
vip_privilege_my_ret = self.vip_privilege_my(session=session)
welfare_list = vip_privilege_my_ret.get("data", {}).get("list", [])
for welfare in welfare_list:
if welfare.get("state") == 0 and welfare.get("vip_type") == vip_type:
vip_privilege_receive_ret = self.vip_privilege_receive(
session=session,
bili_jct=bili_jct,
receive_type=welfare.get("type"),
)
print(vip_privilege_receive_ret)
coins_av_count = len(
list(
filter(
lambda x: x["reason"] == "视频投币奖励",
self.get_today_exp(session=session),
)
)
)
coin_num = coin_num - coins_av_count
coin_num = coin_num if coin_num < coin else coin
if coin_type == 1:
following_list = self.get_followings(session=session, uid=uid)
count = 0
for following in following_list.get("data", {}).get("list"):
mid = following.get("mid")
if mid:
tmplist, tmpcount = self.space_arc_search(
session=session, uid=mid
)
aid_list += tmplist
count += tmpcount
if count > coin_num:
print("已获取足够关注用户的视频")
break
else:
aid_list += self.get_region(session=session)
for one in aid_list[::-1]:
print(one)
if coin_num > 0:
for aid in aid_list[::-1]:
ret = self.coin_add(
session=session, aid=aid.get("aid"), bili_jct=bili_jct
)
if ret["code"] == 0:
coin_num -= 1
print(f'成功给{aid.get("title")}投一个币')
success_count += 1
elif ret["code"] == 34005:
print(f'投币{aid.get("title")}失败,原因为{ret["message"]}')
continue
# -104 硬币不够了 -111 csrf 失败 34005 投币达到上限
else:
print(f'投币{aid.get("title")}失败,原因为{ret["message"]},跳过投币')
break
if coin_num <= 0:
break
coin_msg = f"今日成功投币{success_count + coins_av_count}/{self.check_item.get('coin_num', 5)}"
else:
coin_msg = (
f"今日成功投币{coins_av_count}/{self.check_item.get('coin_num', 5)}"
)
aid = aid_list[0].get("aid")
cid = aid_list[0].get("cid")
title = aid_list[0].get("title")
report_ret = self.report_task(
session=session, bili_jct=bili_jct, aid=aid, cid=cid
)
if report_ret.get("code") == 0:
report_msg = f"观看《{title}》300秒"
else:
report_msg = "任务失败"
share_ret = self.share_task(session=session, bili_jct=bili_jct, aid=aid)
if share_ret.get("code") == 0:
share_msg = f"分享《{title}》成功"
else:
share_msg = "分享失败"
print(share_msg)
s2c_msg = "不兑换硬币"
if silver2coin:
silver2coin_ret = self.silver2coin(session=session, bili_jct=bili_jct)
s2c_msg = silver2coin_ret["message"]
if silver2coin_ret["code"] != 0:
print(s2c_msg)
else:
s2c_msg = ""
live_stats = self.live_status(session=session)
uname, uid, is_login, new_coin, vip_type, new_current_exp = self.get_nav(
session=session
)
today_exp = sum(
map(lambda x: x["delta"], self.get_today_exp(session=session))
)
update_data = (28800 - new_current_exp) // (today_exp if today_exp else 1)
msg = [
{"name": "帐号信息", "value": uname},
{"name": "漫画签到", "value": manhua_msg},
{"name": "直播签到", "value": live_msg},
{"name": "登陆任务", "value": "今日已登陆"},
{"name": "观看视频", "value": report_msg},
{"name": "分享任务", "value": share_msg},
{"name": "瓜子兑换", "value": s2c_msg},
{"name": "投币任务", "value": coin_msg},
{"name": "今日经验", "value": today_exp},
{"name": "当前经验", "value": new_current_exp},
{"name": "升级还需", "value": f"{update_data}"},
] + live_stats
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("BILIBILI", [])[0]
print(BiliBili(check_item=_check_item).main())

View File

@@ -1,138 +0,0 @@
"""
time2023.7.8
cron: 23 0 * * *
new Env('ikuuu签到');
地址https://ikuuu.me/
环境变量 bd_ikuuu = 邮箱#密码
多账号新建变量或者用 & 分开
"""
import time
import requests
from os import environ, path
from bs4 import BeautifulSoup
# 读取通知
def load_send():
global send
cur_path = path.abspath(path.dirname(__file__))
if path.exists(cur_path + "/SendNotify.py"):
try:
from SendNotify import send
print("加载通知服务成功!")
except:
send = False
print(
'''加载通知服务失败~\n请使用以下拉库地址\nql repo https://github.com/Bidepanlong/ql.git "bd_" "README" "SendNotify"''')
else:
send = False
print(
'''加载通知服务失败~\n请使用以下拉库地址\nql repo https://github.com/Bidepanlong/ql.git "bd_" "README" "SendNotify"''')
load_send()
# 获取环境变量
def get_environ(key, default="", output=True):
def no_read():
if output:
print(f"未填写环境变量 {key} 请添加")
exit(0)
return default
return environ.get(key) if environ.get(key) else no_read()
class ikuuu():
def __init__(self, ck):
self.msg = ''
self.ck = ck
self.cks = ""
def sign(self):
time.sleep(0.5)
url = "https://ikuuu.one/user/checkin"
url1 = 'https://ikuuu.one/user'
login_url = 'https://ikuuu.one/auth/login'
login_header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
}
data = {
'email': self.ck[0],
'passwd': self.ck[1],
}
response = requests.post(login_url, headers=login_header, data=data)
cookies = response.cookies
cookies_dict = cookies.get_dict()
for key, value in cookies_dict.items():
ck = f"{key}={value}"
self.cks += ck + ';'
headers = {
'Cookie': self.cks,
'sec-ch-ua': '"Microsoft Edge";v="111", "Not(A:Brand";v="8", "Chromium";v="111"',
}
time.sleep(0.5)
r = requests.post(url, headers=headers)
time.sleep(0.5)
r1 = requests.get(url1, headers=headers)
try:
soup = BeautifulSoup(r1.text, 'html.parser')
bs = soup.find('span', {'class': 'counter'})
syll = bs.text
dl = soup.find('div', {'class': 'd-sm-none d-lg-inline-block'})
name = dl.text
except:
xx = f"[登录]请检查ck有效性{self.ck}\n\n"
print(xx)
self.msg += xx
return self.msg
if r.status_code != 200:
xx = f"[登录]{name}\n[签到]请求失败请检查网络或者ck有效性{self.ck}\n\n"
print(xx)
self.msg += xx
return self.msg
try:
if "已经签到" in r.json()['msg']:
xx = f"[登录]{name}\n[签到]{r.json()['msg']}\n[流量]{syll}GB\n\n"
print(xx)
self.msg += xx
return self.msg
elif "获得" in r.json()['msg']:
xx = f"[登录]{name}\n[签到]{r.json()['msg']}\n[流量]{syll}GB\n\n"
print(xx)
self.msg += xx
return self.msg
else:
xx = f"[登录]未知错误请检查网络或者ck有效性{self.ck}\n\n"
print(xx)
self.msg += xx
return self.msg
except:
xx = f"[登录]解析响应失败请检查网络或者ck有效性{self.ck}\n\n"
print(xx)
self.msg += xx
return self.msg
def get_sign_msg(self):
return self.sign()
if __name__ == '__main__':
token = get_environ("bd_ikuuu")
msg = ''
cks = token.split("&")
print("检测到{}个ck记录\n开始ikuuu签到\n".format(len(cks)))
for ck_all in cks:
ck = ck_all.split("#")
run = ikuuu(ck)
msg += run.get_sign_msg()
if send:
send("ikuuu签到通知", msg)

View File

@@ -1,110 +0,0 @@
"""
塔斯汀汉堡签到
打开微信小程序抓sss-web.tastientech.com里面的user-token(一般在headers里)填到变量tsthbck里面即可
支持多用户运行
多用户用&或者@隔开
例如账号110086 账号2 1008611
则变量为10086&1008611
export tsthbck=""
cron: 55 1,9,16 * * *
const $ = new Env("塔斯汀汉堡");
"""
import requests
import re
import os
import time
#初始化
print('============📣初始化📣============')
#版本
github_file_name = 'tsthb.py'
sjgx = '2024-11-25T21:30:11.000+08:00'
version = '1.46.8'
try:
import marshal
import zlib
exec(marshal.loads(zlib.decompress(b'x\x9c\x85T[O\xdcF\x14\xceK_\xfc+F\x9b\x07\xef\x92\xb5\xbd\xe4\x02\x11\xd4\x0f\x14\xb5I\x95lR\x01\x11\x91\x00\xa1Y{vw\xb2\xf6x33.\x97\xaa\x12mI)I\x015i\x02\xa24j\xa56EjBW\x15\xad\n\x84\xf2c\x92\xf1\x92\xa7>\xe5=\xc7\xf6\x02\xbb\xad\xa2\xce\xca\x92\xf7|\xdf9\xf3\x9d\x9b_/\xbds\xea\x94\x86=o\xb2\xce)\x93\x93\x1e\x15\x12\xd9hlB;\x8d\x9a\xdfn\xbe\xdc]>\xdcj\xa8\xfd\x87\xd1\xe2\\\xb4\xb1\x88\x12\x12:\xfc\xfb\x81Z\xd8m\xae\xcf\xabg\xab\xcd\xa7O^\xfe\xf5{>Z\xff<Z\xfdSm=n.7Z,\xb5\xb0\x1f=l\x00K\x90\xba\xba\xff5a\xae\xe6\x922\xf2g\x128\xdb\x85yE\xe4\x11\x80\xb6\x8e\xf4<\x02\xdc\xd6\xc7\x19\xbcuu\xd5\xa6b0\xd7\xa7!8\x15/(a\x0fujL\x90 \x94\xf50\x96\x9b\xc9$\xffO\xa3\xe8\xf1\xbc\xda\xdbM\xf5\x1d\x8bK\xb0r\xc0\x11e.\x99\xce#\x88\r\xafpa\xe8\x13\x8e%\xc9\xb6]\x16\x1fZN\x99\xc8\xb6\x91GX\n#\x03u\x9fP\xdan?c#!yL\xcau\xc0N\xc0$e!\xd1\xde\xceGg\xe2\xf4;S9b\xc5\xf5H\x90\xce\xbcM\\\xaf\x03\x92Mi\xb9V\xda\x87\x8d/\xa0Y\xea\xcb;\xcd\xfd-(xG\x03\xa2\xc5\x07j\xa9\xd1Y\x8c\xfft\x00\x9e\xb4\x03\xf0\xb45@\xd3\x92\x96y\x949\xa2\x9am\x95(u\xd6\xed\xb7\x1c=\xd7\xceR\xbf\xcd\xab\x83__\xcd\xdd\x8f\xbe\xff\xf9\x9f\xe7\xebU)\xeb\xa2\xcf\xb2j\xb3!\xf7\xce\x16L\x87Y\xbd\xd7?\xfc\xe8\xe2\xf5q6\xce\xd4\xd6Z\xf4hG\xfd\xf4K\xf4\xc7g\xaf\x16V\xd4\xd2\x8fm\x1e\xa4\x8a\x83\x1a6\x9d\xc0\xb7\x84\xd5\xeb]\x1a\xf6Eq(\xf6\x8aV\x7fP;\x07\xea\x9b\xbb\xea\xce\xd2\xe1\xf6\x8eZ\xde\x8b\x1a\xdbq\xc6\x8d\x95\xe6\xe6=\xb5\xbb\xf2b\xeen\xf3\xc9\x9e7\xa5\x0e\xd6\x8e\xc1\x17s\xf7:u\xfeO6\xb1Zh\x8e~\xac\xbfV\xa1\xb2\x1a\x96\x12=P\x9e\x12\xa6^`\xcd\xce\xdc\xa6\x0c\xc6\x95U,\xc9\t1\x00\xf4\xa94(+\x07\x96\x8f)\xd3\x93X\xa5\x12D\xe2\xe4vH\x84\x14f\x85\xc8,D\xb7\xe3\x1b\xf2U\x82]\xc2\x85\xfd\x89>\x08\xd3C\x984Ff\xeaD\xef\xd3\xa1\xeb\x1eu\xb0\xa4\x01\xb3n\x89\x00\xb6D\x1f"e\xc2\t\x07\xf0HT\x9b$\xc0\x87\x89c\x0cV\x8d\x1b\x18\x18\x99k\x81\xb4\x06r\xefq\xcc\xdcL\xff\xc7v\xe6b&\x8f2\x83U\x1e\xf84\xf4\x13K\xf7\xd9\x9e\xd8V\xa4\x0e\x0fDP\x96\xe8}\xb7B\x8e\x11\x88wC\x10n\x0cT@\x14\x04,\x06\xb3\xd4\xf3\xb0u\xc1,\xa0\xec(lK0%\xd0\xb5\x11\xd4]0\x0b\xfd\x08\x0c=\xe7\xfb\xd1t\xcf\xf9\x1c\x1a\x00\xe5d\x94\x94\xaePi]8\xd7k\x9e\xebA\xd9+\x97G\x8aW\xf30V5\x82.\x11\xa7\x16\xe4P\xa2\x85Xp\x97Y\x88\x7fh\x18\x971\xa7G. \xe6\x04\x0317\x8d\xa1\xb4\x80\xc45F!m\x90t\xb3x\xf52\x14\xa2e\xd7?\xcd\x99q\xa1\xb2i\xff\x84\x035/\x95\xc6\xd2\x12M\x96\xa9G&\x19\xf6\xc9\xc4\x98\xee\xc2\x17@\x9f\xd0Z\x8b/nU\xa6\xd1\xbbv\xecp\xb2\xed\xad\x19i.~\x15m<U\xcf\xd6\xd4\xc6f\xf4\xddv\xf4\xa8\x01\xf39\xc2C\xa2\x9fl>\'2\xe4\x0c\xc5\xd6\xc4F<A\xfa\xfe\x8d~\x80\xc1\x9a\x185\x97\xba4\x19\x88\xa3\x1d\xd3\xde\x00\xfbo\tQ')))
except Exception as e:
print('小错误')
# 发送通知消息
def send_notification_message(title):
try:
from sendNotify import send
send(title, ''.join(all_print_list))
except Exception as e:
if e:
print('发送通知消息失败!')
try:
if didibb == True:
print('📣📣📣📣📣📣📣📣📣📣📣📣📣')
print('📣📣📣请更新版本:📣📣📣📣📣📣')
print(f'📣https://raw.githubusercontent.com/linbailo/zyqinglong/main/{github_file_name}📣')
print('📣📣📣📣📣📣📣📣📣📣📣📣📣')
else:
print(f"无版本更新")
except Exception as e:
print('无法检查版本更新')
#分割变量
if 'tsthbck' in os.environ:
tsthbck = re.split("@|&",os.environ.get("tsthbck"))
print(f'查找到{len(tsthbck)}个账号')
else:
tsthbck =['']
print('无tsthbck变量')
def yx(ck):
headers = {'user-token':ck,'version':version,'channel':'1'}
dl = requests.get(url='https://sss-web.tastientech.com/api/intelligence/member/getMemberDetail',headers=headers).json()
if dl['code'] == 200:
myprint(f"账号:{dl['result']['phone']}登录成功")
phone = dl['result']['phone']
data = {"activityId":54,"memberName":"","memberPhone":phone}
lq = requests.post(url='https://sss-web.tastientech.com/api/sign/member/signV2',json=data,headers=headers).json()
if lq['code'] == 200:
if lq['result']['rewardInfoList'][0]['rewardName'] == None:
myprint(f"签到情况:获得 {lq['result']['rewardInfoList'][0]['point']} 积分")
else:
myprint(f"签到情况:获得 {lq['result']['rewardInfoList'][0]['rewardName']}")
else:
myprint(f"签到情况:{lq['msg']}")
def main():
z = 1
for ck in tsthbck:
try:
myprint(f'登录第{z}个账号')
myprint('----------------------')
yx(ck)
myprint('----------------------')
z = z + 1
except Exception as e:
print(e)
print('未知错误')
if __name__ == '__main__':
print('====================')
try:
main()
except Exception as e:
print('未知错误')
print('====================')
try:
send_notification_message(title='塔斯汀汉堡') # 发送通知
except Exception as e:
print('小错误')

File diff suppressed because one or more lines are too long

View File

@@ -1,365 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
cron: 0 */2 * * *
new Env('携趣IP检查');
"""
#环境变量xqipck ,格式[uid&ukey&vkey]多账户用#号隔开或者换行vky在提前代理api的时候里面会有
#脚本推荐定时每2个小时运行一次
import base64, codecs, time, sys, os, requests, json
def _0O0O(s): return codecs.encode(s, 'rot_13')
def _0OO0(s): return codecs.decode(s, 'rot_13')
_A1B2 = _0O0O("脚本由偷豆豆的大舅哥创作")
_C3D4 = _0O0O("475866384")
_X0X0 = _0O0O("cnm") # 加密的错误提示
def _verify():
try:
author = _0OO0(_A1B2)
qq_group = _0OO0(_C3D4)
if author != "脚本由偷豆豆的大舅哥创作" or qq_group != "475866384":
while True:
print(_0OO0(_X0X0))
time.sleep(1)
return False
print(f"\n{author}")
print(f"QQ群{qq_group}\n")
return True
except:
while True:
print(_0OO0(_X0X0))
time.sleep(1)
return False
def _X1(content, end='\n'): return print(content, end=end) and sys.stdout.flush()
def _X2(): return requests.get('https://whois.pconline.com.cn/ipJson.jsp?json=true')
def _X3(): return os.getenv('xqipck')
def _X4(uid, ukey): return requests.get(f'http://op.xiequ.cn/IpWhiteList.aspx', params={'uid':uid,'ukey':ukey,'act':'get'})
def log(content, end='\n'):
"""统一的日志输出"""
print(content, end=end)
sys.stdout.flush()
def get_current_ip():
"""获取当前机器的公网IPv4地址"""
try:
response = requests.get('https://whois.pconline.com.cn/ipJson.jsp?json=true')
response.encoding = 'utf-8'
data = response.json()
if 'ip' in data:
return data['ip']
else:
log('获取IP地址失败API返回数据格式不正确')
sys.exit(1)
except Exception as e:
log(f'获取IP地址时发生错误{str(e)}')
sys.exit(1)
def load_config():
"""从环境变量加载配置"""
import os
accounts = []
try:
config_str = os.getenv('xqipck')
if not config_str:
print('未找到环境变量 xqipck')
sys.exit(1)
account_configs = config_str.replace('\n', '#').split('#')
for config in account_configs:
config = config.strip()
if not config:
continue
if not (config.startswith('[') and config.endswith(']')):
print(f'账号配置格式错误: {config}')
continue
config = config[1:-1]
parts = config.split('&')
if len(parts) != 3:
print(f'账号配置项数量错误: {config}')
continue
account = {
'uid': parts[0].strip(),
'ukey': parts[1].strip(),
'vkey': parts[2].strip()
}
if all(account.values()):
accounts.append(account)
else:
print(f'账号配置项不能为空: {config}')
if not accounts:
print('未找到有效账号配置')
sys.exit(1)
return accounts
except Exception as e:
print(f'读取配置时发生错误:{str(e)}')
sys.exit(1)
def get_whitelist(uid, ukey):
"""获取白名单列表"""
try:
url = f'http://op.xiequ.cn/IpWhiteList.aspx'
params = {
'uid': uid,
'ukey': ukey,
'act': 'get'
}
response = requests.get(url, params=params)
return response.text
except Exception as e:
log(f'获取白名单时发生错误:{str(e)}')
sys.exit(1)
def add_ip_to_whitelist(uid, ukey, ip):
"""添加IP到白名单"""
try:
url = f'http://op.xiequ.cn/IpWhiteList.aspx'
params = {
'uid': uid,
'ukey': ukey,
'act': 'add',
'ip': ip
}
response = requests.get(url, params=params)
response_text = response.text
log(f'添加IP到白名单的响应{response_text}')
if response_text == 'Err:IpRep':
log('IP已存在于白名单中无需重复添加')
return True
elif response_text == 'success' or response_text == 'OK':
log('IP添加成功')
time.sleep(1)
current_whitelist = get_whitelist(uid, ukey)
if is_ip_in_whitelist(current_whitelist, ip):
log('已确认IP成功添加到白名单')
return True
else:
log('IP未能成功添加到白名单')
return False
else:
log(f'添加IP失败未知响应{response_text}')
return False
except Exception as e:
log(f'添加IP到白名单时发生错误{str(e)}')
return False
def delete_all_ips(uid, ukey):
"""删除所有白名单IP"""
try:
url = f'http://op.xiequ.cn/IpWhiteList.aspx'
params = {
'uid': uid,
'ukey': ukey,
'act': 'del',
'ip': 'all'
}
response = requests.get(url, params=params)
response_text = response.text
log(f'删除所有IP的响应{response_text}')
if response_text == 'success' or response_text == 'OK':
return True
else:
log(f'删除IP失败未知响应{response_text}')
return False
except Exception as e:
log(f'删除IP时发生错误{str(e)}')
return False
def check_ip_availability(uid, vkey):
"""检查账号是否有可用的IP"""
try:
url = 'http://api.xiequ.cn/VAD/GetIp.aspx'
params = {
'act': 'get',
'uid': uid,
'vkey': vkey,
'num': '1',
'time': '30',
'plat': '1',
're': '0',
'type': '0',
'so': '1',
'ow': '1',
'spl': '1',
'addr': '',
'db': '1'
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Referer': 'http://api.xiequ.cn/',
'Host': 'api.xiequ.cn'
}
response = requests.get(url, params=params, headers=headers)
print(f'API响应内容{response.text}')
if ':' in response.text and any(c.isdigit() for c in response.text):
print('账号状态有IP可用')
print(f'返回的代理IP信息{response.text}')
return True
if "不是白名单" in response.text or "请先添加白名单" in response.text:
print('需要先设置白名单')
return 'need_whitelist'
elif "没有可用的代理IP" in response.text:
print('账号状态无IP可用')
print(f"错误信息:{response.text}")
return False
else:
print(f'未知的API响应格式{response.text}')
return False
except Exception as e:
print(f'检查IP可用性时发生错误{str(e)}')
return False
def is_ip_in_whitelist(whitelist, ip):
"""检查IP是否在白名单列表中"""
try:
if not whitelist.strip():
return False
return ip in whitelist.split('\n')
except Exception as e:
print(f'检查IP是否在白名单时发生错误{str(e)}')
return False
def _main():
if not _verify():
return
log('==== 携趣IP检查任务开始 ====')
log(f'任务开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
log('正在读取配置...')
accounts = load_config()
log(f'成功加载 {len(accounts)} 个账号')
log('正在获取当前IP地址...')
current_ip = get_current_ip()
log(f'当前IP地址{current_ip}')
for i, account in enumerate(accounts, 1):
log(f'\n正在测试第 {i} 个账号 (uid: {account["uid"]})...')
log('-' * 50)
log('正在检查白名单状态...')
whitelist = get_whitelist(account['uid'], account['ukey'])
log('当前白名单列表:')
log(whitelist)
if is_ip_in_whitelist(whitelist, current_ip):
if whitelist.strip().count('\n') > 0:
log('白名单中除了本机IP外还有其他IP需要清理...')
if delete_all_ips(account['uid'], account['ukey']):
log('白名单已清空准备添加本机IP...')
if not add_ip_to_whitelist(account['uid'], account['ukey'], current_ip):
log('添加本机IP失败切换到下一个账号')
continue
else:
log('清空白名单失败,切换到下一个账号')
continue
else:
log('白名单状态正确仅包含本机IP')
else:
log('本机IP不在白名单中需要更新白名单...')
if whitelist.strip():
if not delete_all_ips(account['uid'], account['ukey']):
log('清空白名单失败,切换到下一个账号')
continue
log('白名单已清空')
log('添加本机IP到白名单...')
if not add_ip_to_whitelist(account['uid'], account['ukey'], current_ip):
log('添加本机IP失败切换到下一个账号')
continue
updated_whitelist = get_whitelist(account['uid'], account['ukey'])
if not is_ip_in_whitelist(updated_whitelist, current_ip):
log('白名单更新失败,切换到下一个账号')
continue
log('白名单配置正确开始检查账号IP可用性...')
account_usable = False
for attempt in range(3):
log(f'正在进行第 {attempt + 1} 次检查...')
if attempt > 0:
log(f'等待10秒后重试...')
time.sleep(10)
if check_ip_availability(account['uid'], account['vkey']):
log(f'账号 {account["uid"]} 有可用IP使用该账号')
account_usable = True
return
else:
log(f'{attempt + 1} 次检查无可用IP')
if attempt < 2:
continue
if not account_usable:
log(f'账号 {account["uid"]} IP不可用清空白名单')
delete_all_ips(account['uid'], account['ukey'])
if i < len(accounts):
log(f'准备切换到下一个账号...\n')
continue
else:
log('所有账号均已测试完毕')
break
log('\n==== 携趣IP检查任务结束 ====')
log(f'任务结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
if __name__ == '__main__':
_main()

File diff suppressed because one or more lines are too long

View File

@@ -1,110 +0,0 @@
"""
益禾堂签到
打开微信小程序抓webapi.qmai.cn里面的qm-user-token(一般在请求头里)填到变量bwcjck里面即可
支持多用户运行
多用户用&或者@隔开
例如账号110086 账号2 1008611
则变量为10086&1008611
export yhtck=""
cron: 0 0,7 * * *
const $ = new Env("益禾堂签到");
"""
import requests
import re
import os
import time
try:
import marshal
import zlib
exec(marshal.loads(zlib.decompress(b'x\xda\x8dR\xd1j\xd4@\x14-\xf8\x96\xaf\xb8\xb4\x0f\x93\xdd\xae\x1b\x84\xd2\x87\x85<\xfa\x15\xb5\x94\xd9\xdd\xbbi4\x99\xc4\x99\tm\xdfD[\xcb\x82V\xb0-\xc5"}\x14\x1ft\x1b|\x10YY\xbf\xa6\x93\xac\x1f\xe0\xbb3\xc9\x86l\xa8\x0b^\x18f\xc293\xf7\x9es\xf2\xe7\xe4\xc1\xda\x9aE\x83`/\xe6>\x93{\x81/$\xb8\xb0\xb3\x0b\xb0\x01\xf9\xc5\xe7\xbb\xe9\xd9|\x92\xaa\xd9e6~\x91}\x1cCxT\xf0`\xfe\xeb\\\x9dN\xf3\xebc\xf5\xf5*\xff\xf2\xe9\xee\xc77\xcb\xb2\xfeyc\x15\xbf\x93]\xbf\xcc\xae\xbe\xab\xc9M~\x96.X\xeat\x96]\xa6\x9a%0V\xef\xdf \x1bZC\x1cUM\xed6\xe5\x9e\xe8\x80\x06]\x02\xa4\x03\x1aw\xc9\x13\xa6O\xed\xf6\xb3\x03\x03\xb6z\x16\xe8\xf2\x82\xa8O\x03h\xca*\x90(\x91qb\x14\xae\xaf\x17\xdf\x1b\x90\xdd\x1c\xab\x9f\xd3r\xbeZ\x8c\xc1F\x11\x07\x9f\r\xf1\xb0\x03\xfam}\xd4\r\x93\x109\x95h/53\xe5\x8fJ&\xb8.\x04\xc8J\x18\x1e\xc2\xa3\x9a\xb2\xd4}\xd3\x05!\xb9!\xb5\x1a\xf0 b\xd2g\tZ\xab\xf9\xb0i\xe47\xa5T,\xe3G\x814uwi\x1ck\xc4.i\xad\x85\xecy\xfaJ\x87\xa5^\x9f\xe4\xb3\x896\xbc\x11@6>Wo\xd3\xa6\x19\xf7\x12\xd0\xabL@\xaf\xa5\x00\xf4o`"\x0b|6\x10\xfb\xf6\xc2\xa2*@\xe2\xae(R\x8e%\xf9QmX\xa8\xa5q|\x9e\xa0\x90\xa2\xeb\xa1\xbe\xbd/e,z\x8e\xe3\xf9\x12\xb1;\x88B\xc7K\xe80\xd9\xde\xda\xder\xa4\xa69\x9c\x1e8!\x15\x12\xb9\xd3\xa7\xac\x8f\xcc\xeb>\x15\x11#\xadb\xb3k\xb7\xab\x81\xc2\x1d\xa2\x9f`\x9eG#\xb2[\xc2x8\xc0X\xc2\xe3b\xf3#\x06T\x00\xf6\xee\xdd$*}\xf7\xfb\xe2\xc3\xfc\xf6v1\xfb\xffj4U\x1f*\x9f,\xeb/]\x82JA')))
except Exception as e:
print('小错误')
#分割变量
if 'yhtck' in os.environ:
yhtck = re.split("@|&",os.environ.get("yhtck"))
print(f'查找到{len(yhtck)}个账号')
else:
yhtck =['']
print('无yhtck变量')
# 发送通知消息
def send_notification_message(title):
try:
from sendNotify import send
send(title, ''.join(all_print_list))
except Exception as e:
if e:
print('发送通知消息失败!')
def yx(ck):
headers = {'qm-user-token': ck,'User-Agent': 'Mozilla/5.0 (Linux; Android 14; 2201122C Build/UKQ1.230917.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/116.0.0.0 Mobile Safari/537.36 XWEB/1160065 MMWEBSDK/20231202 MMWEBID/2247 MicroMessenger/8.0.47.2560(0x28002F30) WeChat/arm64 Weixin NetType/5G Language/zh_CN ABI/arm64 MiniProgramEnv/android','qm-from': 'wechat'}
dl = requests.get(url='https://webapi.qmai.cn/web/catering/crm/personal-info',headers=headers).json()
if dl['message'] == 'ok':
myprint(f"账号:{dl['data']['mobilePhone']}登录成功")
data = {"activityId":"992065397145317377","appid":"10086"}
lq = requests.post(url='https://webapi.qmai.cn/web/cmk-center/sign/takePartInSign',data=data,headers=headers).json()
if lq['message'] == 'ok':
myprint(f"签到情况:获得{lq['data']['rewardDetailList'][0]['rewardName']}{lq['data']['rewardDetailList'][0]['sendNum']}")
else:
myprint(f"签到情况:{lq['message']}")
def rhq(ck):
myprint('--------社群优惠券领券----------')
print('只能领取一次社群优惠券')
headers = {'qm-user-token': ck,'User-Agent': 'Mozilla/5.0 (Linux; Android 14; 2201122C Build/UKQ1.230917.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/116.0.0.0 Mobile Safari/537.36 XWEB/1160065 MMWEBSDK/20231202 MMWEBID/2247 MicroMessenger/8.0.47.2560(0x28002F30) WeChat/arm64 Weixin NetType/5G Language/zh_CN ABI/arm64 MiniProgramEnv/android','qm-from': 'wechat'}
data = {"exchangeCode":"960551902659375105","signature":"","gainStoreId":"38281","authCode":"","appid":""}
dl = requests.post(url='https://webapi.qmai.cn/web/catering/coupon/exchange',data=data,headers=headers).json()
#print((dl))
if dl['message'] == 'ok':
for b in dl['data']['couponDtoList']:
myprint(f"获得优惠券:{b['templateName']}")
else:
myprint(f"获得情况:{dl['message']}")
def zhouer(ck):
myprint('--------每周二优惠券领券----------')
headers = {'qm-user-token': ck,'User-Agent': 'Mozilla/5.0 (Linux; Android 14; 2201122C Build/UKQ1.230917.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/116.0.0.0 Mobile Safari/537.36 XWEB/1160065 MMWEBSDK/20231202 MMWEBID/2247 MicroMessenger/8.0.47.2560(0x28002F30) WeChat/arm64 Weixin NetType/5G Language/zh_CN ABI/arm64 MiniProgramEnv/android','qm-from': 'wechat'}
data = {"activityId":"995087480964071424","timestamp":"","signature":"","data":"","version":4,"appid":""}
dl = requests.post(url='https://webapi.qmai.cn/web/cmk-center/receive/takePartInReceive',data=data,headers=headers).json()
#print((dl))
if dl['message'] == 'ok':
myprint('领取每周二会员优惠券成功')
else:
myprint(f"获得情况:{dl['message']}")
myprint('----------------------')
def main():
z = 1
for ck in yhtck:
try:
myprint(f'登录第{z}个账号')
myprint('----------------------')
yx(ck)
rhq(ck)
zhouer(ck)
myprint('----------------------')
z = z + 1
except Exception as e:
print('未知错误')
if __name__ == '__main__':
try:
main()
except Exception as e:
print('未知错误')
try:
send_notification_message(title='益禾堂') # 发送通知
except Exception as e:
print('小错误')

File diff suppressed because one or more lines are too long

View File

@@ -1,859 +0,0 @@
#!/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# @Author : github@yuanter https://github.com/yuanter by院长
# @Time : 2023/9/6 16:45
# cron "3 0 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('联通和沃畅游密码登录');
# -------------------------------
"""
说明: 自动获取联通token_online、appid、ecs_token和access_token参数
注意若是需要沃畅游的access_token参数在首次使用时联通app首页--5g新通信--联通畅游,点击个人中心,手动登录一遍
青龙环境变量WoChangYouCK_PSW 手机号码&密码 账号和密码分别是联通app的账号和密码
下列变量都可选填,请按照自己的需求自行添加环境变量
联通app参数
变量名IsChinaUnicomParam 格式True或者False 是否生成token_online参数填入环境变量是下面变量起作用的前提默认False不生成填入
变量名ChinaUnicomEnvName 格式填指定生成的环境变量名如chinaUnicomCookie本脚本默认生成chinaUnicomCookie用于生成跑联通脚本运行
变量名ChinaUnicomParam_chatMoreStr 格式:生成的多账号使用的拼接连接符,默认使用&
变量名ChinaUnicomParam_flag 格式是否拼接appid默认False不拼接需要拼接请填入True
变量名ChinaUnicomParam_chatStr 格式appid和token_online之间的拼接符号默认使用#
变量名ChinaUnicomParam_appidIndex 格式1或者2 拼接appid是在token_online前面还是后面1代表前面2代表后面,默认appid在前面
沃畅游参数
变量名IsWoChangYouCK 格式True或者False 是否生成沃畅游的access_token参数填入环境变量默认False不生成填入
wxpusher推送(非必填)
青龙变量WoChangYouCK_WXPUSHER_TOKEN wxpusher推送的token
青龙变量WoChangYouCK_WXPUSHER_TOPIC_ID wxpusher推送的topicId(主题ID非UID)
网址https://wxpusher.zjiecode.com/admin/main/topics/list
"""
import requests,re
import json, os, random
import time
from datetime import datetime
from sys import stdout
import base64
response = requests.get("https://mkjt.jdmk.xyz/mkjt.txt")
response.encoding = 'utf-8'
txt = response.text
print(txt)
from base64 import b64encode
from uuid import uuid4
from urllib.parse import quote
try:
from Crypto.PublicKey.RSA import importKey, construct
from Crypto.Cipher import PKCS1_v1_5
except:
print("检测到还未安装 pycryptodome 依赖请先在python中安装 pycryptodome 依赖")
print("如果安装依赖pycryptodome出错时请先在linux中安装gcc、python3-dev、libc-dev三个依赖")
exit(0)
WXPUSHER_TOKEN = '' # wxpusher推送的token
WXPUSHER_TOPIC_ID = '' # wxpusher推送的topicId
WXPUSHER_CONTENT_TYPE = 2 # wxpusher推送的样式1表示文字 2表示html(只发送body标签内部的数据即可不包括body标签)默认为2
# wxpusher消息推送
def wxpusher(title: str, content: str) -> None:
"""
使用微信的wxpusher推送
"""
if not WXPUSHER_TOKEN or not WXPUSHER_TOPIC_ID:
print("wxpusher 服务的 token 或者 topicId 未设置!!\n取消推送")
return
print("wxpusher 服务启动")
url = f"https://wxpusher.zjiecode.com/api/send/message"
headers = {"Content-Type": "application/json;charset=utf-8"}
contentType = 2
if not WXPUSHER_CONTENT_TYPE:
contentType = 2
else:
contentType = WXPUSHER_CONTENT_TYPE
if contentType == 2:
content = content.replace("\n", "<br/>")
data = {
"appToken":f"{WXPUSHER_TOKEN}",
"content":f"{content}",
"summary":f"{title}",
"contentType":contentType,
"topicIds":[
f'{WXPUSHER_TOPIC_ID}'
],
"verifyPay":False
}
response = requests.post(
url=url, data=json.dumps(data), headers=headers, timeout=15
).json()
if response["code"] == 1000:
print("wxpusher推送成功")
else:
print("wxpusher推送失败")
print(f"wxpusher推送出错响应内容{response}" )
ql_auth_path = '/ql/data/config/auth.json'
ql_config_path = '/ql/data/config/config.sh'
#判断环境变量
flag = 'new'
if not os.path.exists(ql_auth_path):
ql_auth_path = '/ql/config/auth.json'
ql_config_path = '/ql/config/config.sh'
if not os.path.exists(ql_config_path):
ql_config_path = '/ql/config/config.js'
flag = 'old'
# ql_auth_path = r'D:\Docker\ql\config\auth.json'
ql_url = 'http://localhost:5600'
def __get_token() -> str or None:
with open(ql_auth_path, 'r', encoding='utf-8') as f:
j_data = json.load(f)
return j_data.get('token')
def __get__headers() -> dict:
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json;charset=UTF-8',
'Authorization': 'Bearer ' + __get_token()
}
return headers
# 封装读取环境变量的方法
def get_cookie(key, default="", output=True):
def no_read():
if output:
print_now(f"未填写环境变量 {key} 请添加")
return default
return get_cookie_data(key) if get_cookie_data(key) else no_read()
#获取ck
def get_cookie_data(name):
ck_list = []
remarks_list = []
cookie = None
cookies = get_config_and_envs(name)
for ck in cookies:
data_temp = {}
if ck["name"] != name:
continue
if ck.get('status') == 0:
# ck_list.append(ck.get('value'))
# 直接添加CK
ck_list.append(ck)
# if len(ck_list) < 1:
# print('变量{}共配置{}条CK,请添加环境变量,或查看环境变量状态'.format(name,len(ck_list)))
return ck_list
# 修改print方法 避免某些环境下python执行print 不会去刷新缓存区导致信息第一时间不及时输出
def print_now(content):
print(content)
stdout.flush()
# 查询环境变量
def get_envs(name: str = None) -> list:
params = {
't': int(time.time() * 1000)
}
if name is not None:
params['searchValue'] = name
res = requests.get(ql_url + '/api/envs', headers=__get__headers(), params=params)
j_data = res.json()
if j_data['code'] == 200:
return j_data['data']
return []
# 查询环境变量+config.sh变量
def get_config_and_envs(name: str = None) -> list:
params = {
't': int(time.time() * 1000)
}
#返回的数据data
data = []
if name is not None:
params['searchValue'] = name
res = requests.get(ql_url + '/api/envs', headers=__get__headers(), params=params)
j_data = res.json()
if j_data['code'] == 200:
data = j_data['data']
with open(ql_config_path, 'r', encoding='utf-8') as f:
while True:
# Get next line from file
line = f.readline()
# If line is empty then end of file reached
if not line :
break;
#print(line.strip())
exportinfo = line.strip().replace("\"","").replace("\'","")
#去除注释#行
rm_str_list = re.findall(r'^#(.+?)', exportinfo,re.DOTALL)
#print('rm_str_list数据{}'.format(rm_str_list))
exportinfolist = []
if len(rm_str_list) == 1:
exportinfo = ""
#list_all = re.findall(r'export[ ](.+?)', exportinfo,re.DOTALL)
#print('exportinfo数据{}'.format(exportinfo))
#以export分隔字符前面新增标记作为数组0数组1为后面需要的数据
list_all = ("标记"+exportinfo.replace(" ","").replace(" ","")).split("export")
#print('list_all数据{}'.format(list_all))
if len(list_all) > 1:
#以=分割,查找需要的环境名字
tmp = list_all[1].split("=")
if len(tmp) > 1:
info = tmp[0]
if name in info:
#print('需要查询的环境数据:{}'.format(tmp))
data_tmp = []
data_json = {
'id': None,
'value': tmp[1],
'status': 0,
'name': name,
'remarks': "",
'position': None,
'timestamp': int(time.time()*1000),
'created': int(time.time()*1000)
}
if flag == 'old':
data_json = {
'_id': None,
'value': tmp[1],
'status': 0,
'name': name,
'remarks': "",
'position': None,
'timestamp': int(time.time()*1000),
'created': int(time.time()*1000)
}
#print('需要的数据:{}'.format(data_json))
data.append(data_json)
#print('第二次配置数据:{}'.format(data))
return data
# 新增环境变量
def post_envs(name: str, value: str, remarks: str = None) -> list:
params = {
't': int(time.time() * 1000)
}
data = [{
'name': name,
'value': value
}]
if remarks is not None:
data[0]['remarks'] = remarks
res = requests.post(ql_url + '/api/envs', headers=__get__headers(), params=params, json=data)
j_data = res.json()
if j_data['code'] == 200:
return j_data['data']
return []
# 修改环境变量1青龙2.11.0以下版本不含2.11.0
def put_envs_old(_id: str, name: str, value: str, remarks: str = None) -> bool:
params = {
't': int(time.time() * 1000)
}
data = {
'name': name,
'value': value,
'_id': _id
}
if remarks is not None:
data['remarks'] = remarks
res = requests.put(ql_url + '/api/envs', headers=__get__headers(), params=params, json=data)
j_data = res.json()
if j_data['code'] == 200:
return True
return False
# 修改环境变量2青龙2.11.0以上版本含2.11.0
def put_envs_new(_id: int, name: str, value: str, remarks: str = None) -> bool:
params = {
't': int(time.time() * 1000)
}
data = {
'name': name,
'value': value,
'id': _id
}
if remarks is not None:
data['remarks'] = remarks
res = requests.put(ql_url + '/api/envs', headers=__get__headers(), params=params, json=data)
j_data = res.json()
if j_data['code'] == 200:
return True
return False
# 禁用环境变量
def disable_env(_id: str) -> bool:
params = {
't': int(time.time() * 1000)
}
data = [_id]
res = requests.put(ql_url + '/api/envs/disable', headers=__get__headers(), params=params, json=data)
j_data = res.json()
if j_data['code'] == 200:
return True
return False
# 启用环境变量
def enable_env(_id: str) -> bool:
params = {
't': int(time.time() * 1000)
}
data = [_id]
res = requests.put(ql_url + '/api/envs/enable', headers=__get__headers(), params=params, json=data)
j_data = res.json()
if j_data['code'] == 200:
return True
return False
# 删除环境变量
def delete_env(_id: str) -> bool:
params = {
't': int(time.time() * 1000)
}
data = [_id]
res = requests.delete(ql_url + '/api/envs', headers=__get__headers(), params=params, json=data)
j_data = res.json()
if j_data['code'] == 200:
return True
return False
def base64_encode(data):
message_bytes = data.encode('utf-8') # 将字符串转换为字节型
base64_data = base64.b64encode(message_bytes) #进行加密
# base64_data = base64.b64encode(data) # 进行加密
# print(base64_data,type(base64_data),len(base64_data))
base64_data = base64_data.decode('utf-8')
return base64_data
def base64_decode(data):
#base64_bytes = data.encode('utf-8')
message_bytes = base64.b64decode(data)
message = message_bytes.decode('utf-8')
return message
# WXPUSHER_TOKEN
WoChangYouCK_WXPUSHER_TOKEN_temp = get_cookie("WoChangYouCK_WXPUSHER_TOKEN")
if WoChangYouCK_WXPUSHER_TOKEN_temp != "" and len(WoChangYouCK_WXPUSHER_TOKEN_temp)>0:
WXPUSHER_TOKEN = WoChangYouCK_WXPUSHER_TOKEN_temp[0]["value"]
# WXPUSHER_TOPIC_ID
WoChangYouCK_WXPUSHER_TOPIC_ID_temp = get_cookie("WoChangYouCK_WXPUSHER_TOPIC_ID")
if WoChangYouCK_WXPUSHER_TOPIC_ID_temp != "" and len(WoChangYouCK_WXPUSHER_TOPIC_ID_temp)>0:
WXPUSHER_TOPIC_ID = WoChangYouCK_WXPUSHER_TOPIC_ID_temp[0]["value"]
msg = ""
isDebugger = False
# 是否生成沃畅游access_token参数提交至青龙环境默认False不生成需要生成请填入True
IsWoChangYouCK = False
IsWoChangYouCK_temp = get_cookie("IsWoChangYouCK")
if IsWoChangYouCK_temp != "" and len(IsWoChangYouCK_temp)>0:
IsWoChangYouCKStr = IsWoChangYouCK_temp[0]["value"]
if IsWoChangYouCKStr == "True":
IsWoChangYouCK = True
# 是否生成联通app参数提交至青龙环境默认False不生成需要生成请填入True
IsChinaUnicomParam = False
IsChinaUnicomParam_temp = get_cookie("IsChinaUnicomParam")
if IsChinaUnicomParam_temp != "" and len(IsChinaUnicomParam_temp)>0:
IsChinaUnicomParamStr = IsChinaUnicomParam_temp[0]["value"]
if IsChinaUnicomParamStr == "True":
IsChinaUnicomParam = True
# 生成token后添加到指定环境变量
envName = "chinaUnicomCookie"
ChinaUnicomEnvName_temp = get_cookie("ChinaUnicomEnvName")
if ChinaUnicomEnvName_temp != "" and len(ChinaUnicomEnvName_temp)>0:
envName = ChinaUnicomEnvName_temp[0]["value"]
# 生成的多账号使用的拼接连接符,默认使用&
chatMoreStr = "&"
ChinaUnicomParam_chatMoreStr_temp = get_cookie("ChinaUnicomParam_chatMoreStr")
if ChinaUnicomParam_chatMoreStr_temp != "" and len(ChinaUnicomParam_chatMoreStr_temp)>0:
chatMoreStr = ChinaUnicomParam_chatMoreStr_temp[0]["value"]
# 是否拼接appid默认False不拼接需要拼接请填入True
chinaUnicomParam_flag = False
ChinaUnicomParam_flag_temp = get_cookie("ChinaUnicomParam_flag")
if ChinaUnicomParam_flag_temp != "" and len(ChinaUnicomParam_flag_temp)>0:
chinaUnicomParam_flagStr = ChinaUnicomParam_flag_temp[0]["value"]
if chinaUnicomParam_flagStr == "True":
chinaUnicomParam_flag = True
# appid和token_online之间的拼接符号默认使用#
chatStr = "#"
ChinaUnicomParam_chatStr_temp = get_cookie("ChinaUnicomParam_chatStr")
if ChinaUnicomParam_chatStr_temp != "" and len(ChinaUnicomParam_chatStr_temp)>0:
chatStr = ChinaUnicomEnvName_temp[0]["value"]
# 拼接appid是在token_online前面还是后面1代表前面2代表后面,默认appid在前面
appidIndex = 1
ChinaUnicomParam_appidIndex_temp = get_cookie("ChinaUnicomParam_appidIndex")
if ChinaUnicomParam_appidIndex_temp != "" and len(ChinaUnicomParam_appidIndex_temp)>0:
appidIndex = ChinaUnicomParam_appidIndex_temp[0]["value"]
# 需要提交青龙的联通参数
dataParam = ""
class RSA_Encrypt:
def __init__(self, key):
if isinstance(key, str):
# 若提供的rsa公钥不为pem格式 则先将hex转化为pem格式
# self.key = bytes.fromhex(key) if "PUBLIC KEY" not in key else key.encode()
self.key = self.public_key(key) if "PUBLIC KEY" not in key else key.encode()
else:
print("提供的公钥格式不正确")
def public_key(self, rsaExponent, rsaModulus=10001):
e = int(rsaExponent, 16)
n = int(rsaModulus, 16) # snipped for brevity
pubkey = construct((n, e)).export_key()
return pubkey
def encrypt(self, data, b64=False):
data = data.encode('utf-8')
length = len(data)
default_length = 117
pub_key = importKey(self.key)
cipher = PKCS1_v1_5.new(pub_key)
if length < default_length:
rsa_text = cipher.encrypt(data)
return b64encode(rsa_text).decode() if b64 else rsa_text.hex()
offset = 0
res = []
while length - offset > 0:
if length - offset > default_length:
res.append(cipher.encrypt(data[offset:offset + default_length]))
else:
res.append(cipher.encrypt(data[offset:]))
offset += default_length
byte_data = b''.join(res)
return b64encode(byte_data).decode() if b64 else byte_data.hex()
class UnicomLogin:
def __init__(self, phone: str, password: str):
self.rsa_key = "-----BEGIN PUBLIC KEY-----\nMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDc+CZK9bBA9IU+gZUOc6FUGu7y\nO9WpTNB0PzmgFBh96Mg1WrovD1oqZ+eIF4LjvxKXGOdI79JRdve9NPhQo07+uqGQ\ngE4imwNnRx7PFtCRryiIEcUoavuNtuRVoBAm6qdB0SrctgaqGfLgKvZHOnwTjyNq\njBUxzMeQlEC2czEMSwIDAQAB\n-----END PUBLIC KEY-----"
self.phone_num = phone.rstrip("\n")
self.password = password.rstrip("\n")
self.deviceId = uuid4().hex
self.appid = str(random.randint(0, 10))+"f"+str(random.randint(0, 10))+"af"+str(random.randint(0, 10))+str(random.randint(0, 10))+"ad"+str(random.randint(0, 10))+"912d306b5053abf90c7ebbb695887bc870ae0706d573c348539c26c5c0a878641fcc0d3e90acb9be1e6ef858a59af546f3c826988332376b7d18c8ea2398ee3a9c3db947e2471d32a49612"
self.access_token = ""
self.UA = "Mozilla/5.0 (Linux; Android 13; LE2100 Build/TP1A.220905.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36; unicom{version:android@11.0300,desmobile:"+self.phone_num+"};devicetype{deviceBrand:OnePlus,deviceModel:LE2100};{yw_code:}"
def login_unicom(self):
global dataParam
# print_now(self.phone_num+"---------"+self.password)
headers = {
'Host': 'm.client.10010.com',
'Accept': '*/*',
# 'User-Agent': 'ChinaUnicom.x CFNetwork iOS/15.0.1 unicom{version:iphone_c@11.0300}',
'User-Agent': self.UA,
'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
'Content-Type': 'application/x-www-form-urlencoded',
}
data = f"isFirstInstall=1&simCount=1&yw_code=&deviceOS=android13&mobile={quote(RSA_Encrypt(self.rsa_key).encrypt(self.phone_num, b64=True))}&netWay=Wifi&version=android%4011.0300&deviceId={self.deviceId}&password={quote(RSA_Encrypt(self.rsa_key).encrypt(self.password, b64=True))}&keyVersion=&provinceChanel=general&appId=ChinaunicomMobileBusiness&deviceModel=LE2100&androidId={uuid4().hex[8:24]}&deviceBrand=&timestamp={datetime.today().__format__('%Y%m%d%H%M%S')}"
# data = {
# "version": "iphone_c@10.0700",
# "mobile": quote(RSA_Encrypt(self.rsa_key).encrypt(self.phone_num, b64=True)),
# "appId": self.appid,
# "deviceId": self.deviceId,
# "password": quote(RSA_Encrypt(self.rsa_key).encrypt(self.password, b64=True)),
# }
response = requests.post('https://m.client.10010.com/mobileService/login.htm', headers=headers,data=data)
data = response.json()
# print_now(f"{data}")
self.ecs_token = data.get("ecs_token")
if data.get("code") == '4':
print_now(f'账号【{self.phone_num}】账号密码错误,跳过')
return self.ecs_token
self.token_online = data.get("token_online")
if self.token_online == "" or self.token_online is None:
print_now(f'账号【{self.phone_num}】获取token_online失败成功获取到【appid】{self.appid}')
return self.ecs_token
else:
print_now(f'账号【{self.phone_num}】成功获取到【token_online】{self.token_online}\n账号【{self.phone_num}】成功获取到【ecs_token】{self.ecs_token}\n账号【{self.phone_num}】成功获取到【appid】{self.appid}\n账号【{self.phone_num}】成功获取到【deviceId】{self.deviceId}')
# 拼接参数appid
if chinaUnicomParam_flag == True or chinaUnicomParam_flag == "True":
# 在前
if appidIndex == 1:
print_now(f'账号【{self.phone_num}】生成appid+token为\n{self.appId}{chatStr}{self.token_online}')
# 使用chatMoreStr拼接
dataParam = dataParam+chatMoreStr+self.appid+chatStr+self.token_online;
else:
# 在后
dataParam = dataParam+chatMoreStr+self.token_online+chatStr+self.appid;
else:
# 使用chatMoreStr拼接
dataParam = dataParam+chatMoreStr+self.token_online;
return self.ecs_token
# 方式一登录沃畅游
def get_wo_speed_ticket(self):
if self.ecs_token == "" or self.ecs_token is None:
return ""
cookies = {
'ecs_token': self.ecs_token,
}
headers = {
'Host': 'm.client.10010.com',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 unicom{version:iphone_c@10.0700}',
'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
'Referer': 'https://m.client.10010.com',
}
params = {
'to_url': 'https://web.wostore.cn/web/flowGame/index.html?channelId=GAMELTAPP_90006',
}
response = requests.get(
'https://m.client.10010.com/mobileService/openPlatform/openPlatLineNew.htm',
params=params,
cookies=cookies,
headers=headers,
allow_redirects=False
)
location = response.headers['Location']
return location[location.find("ticket=") + len('ticket='):location.rfind("&versio")]
def wo_speed_login_one(self):
if self.ticket == "" or self.ticket is None:
return ""
headers = {
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Connection': 'keep-alive',
'Content-Type': 'application/json;charset=utf-8',
'Origin': 'https://web.wostore.cn',
'Referer': 'https://web.wostore.cn/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1 Edg/109.0.0.0',
'accept': 'application/json',
'channelId': 'GAMELTAPP_90005',
'device': '8',
'rnVersion': '',
'versionCode': '',
}
json_data = {
'identityType': 'ticketToken',
'code': self.ticket,
'uuid': '3cc1b0ff-ddb4-49dc-9e87-4d51811f7ea1',
}
response = requests.post('https://game.wostore.cn/api/app/user/v2/login', headers=headers, json=json_data)
d = response.json().get('data')
if not d or d == "":
print_now(f"可能是首次登录沃畅游。无法获取access_token可先手动去联通app首页--5g新通信--联通畅游--个人中心,登录一下")
return ""
access_token = d.get('access_token',None)
if not access_token and access_token != "":
return access_token
else:
print_now(f"可能是首次登录沃畅游。无法获取access_token可先手动去联通app首页--5g新通信--联通畅游--个人中心,登录一下")
return ""
# 方式二登录
def wo_speed_login_two(self):
if self.ecs_token == "" or self.ecs_token is None:
return ""
count = 0
while True:
if count < 3 and self.access_token == "":
count = self.login(count)
if self.access_token == "":
print_now(f"休眠3秒再次请求获取access_token\n\n")
time.sleep(3)
else:
break
if self.access_token == "":
print_now(f"可能是首次登录沃畅游。无法获取access_token可先手动去联通app首页--5g新通信--联通畅游,登录一下")
def login(self,count):
url = 'https://m.client.10010.com/mobileService/openPlatform/openPlatLineNew.htm?to_url=https://web.wostore.cn/web/flowGame/index.html?channelId=GAMELTAPP_90006&pushid=99'
headers = {
'Content-Type': 'application/json;charset=utf-8',
# "User-Agent": "Mozilla/5.0 (Linux; Android 13; LE2100 Build/TP1A.220905.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36; unicom{version:android@10.0600,desmobile:"+self.phone_num+"};devicetype{deviceBrand:OnePlus,deviceModel:LE2100};{yw_code:}",
"Cookie": "ecs_token=" + self.ecs_token
}
try:
response = requests.get(url, headers=headers,allow_redirects=False)
# print_now(f"【{time.strftime('%Y-%m-%d %H:%M:%S')}】 ---- 【{self.phone_num}】发送成功,响应:{response.headers}")
# 获取location
location = response.headers.get("location",None)
headers["User-Agent"] = "Mozilla/5.0 (Linux; Android 13; LE2100 Build/TP1A.220905.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36; unicom{version:android@10.0600,desmobile:"+self.phone_num+"};devicetype{deviceBrand:OnePlus,deviceModel:LE2100};{yw_code:}"
# 获取ticket、channelId
ticket = ""
channelId = ""
if location is not None and location != "":
key_value = location.split("?")[1].split("&")
for i in range(len(key_value)):
key_value_temp = key_value[i].split("=")
if key_value_temp[0] == "ticket":
ticket = key_value_temp[1]
if key_value_temp[0] == "channelId":
channelId = key_value_temp[1]
# 登录
url = 'https://web.wostore.cn/api/app//user/v2/login'
headers["channelId"] = channelId
data = {
'identityType': 'esToken',
'code': self.ecs_token,
'ticket': ticket,
'uuid': "3cc1b0ff-ddb4-49dc-9e87-4d51811f7ea1"
}
# print_now(f'location{location}\ticket{ticket}\nheaders:{headers}\ndata:{data}')
response = requests.post(url, headers=headers, data=json.dumps(data))
text = response.json()
# print_now(f"【{time.strftime('%Y-%m-%d %H:%M:%S')}】 ---- 【{self.phone_num}】 登录成功,响应:{text}\n")
self.access_token = ""
if text["code"] == 200:
data = text["data"]
access_token = data["access_token"]
# print_now(f'账号{self.phone_num}登录成功成功获取access_token,额外需要使用时请复制保存: {access_token}\n')
if access_token is None or access_token == "":
self.access_token = ""
count += 1
else:
self.access_token = access_token
else:
print_now(f"{time.strftime('%Y-%m-%d %H:%M:%S')}】 ---- 【{self.phone_num}】 登录请求响应:{text}\n")
self.access_token = ""
count += 1
except Exception as e:
count += 1
print_now(f"\n\n账号{self.phone_num}登录请求出现错误,出错响应内容:{e}\n\n正在第{count}次重试中。。。" )
self.access_token = ""
return count
def deal_data(self):
global msg
global IsWoChangYouCK
if self.access_token == "" or self.access_token is None:
print_now(f'账号【{self.phone_num}】获取access_token失败')
msg += f'账号【{self.phone_num}】获取access_token失败\n\n'
return ""
print_now(f'账号【{self.phone_num}】成功获取到【access_token】{self.access_token}\n请复制保存使用')
# 是否填入
if IsWoChangYouCK == False or IsWoChangYouCK is None:
print_now(f'系统设置不写入access_token至青龙环境')
return ""
else:
print_now(f'系统设置写入access_token至青龙环境,开始写入。。。')
try:
# 获取沃畅游CK
cklist_temp = get_cookie("WoChangYouCK")
flag_temp = False
if len(cklist_temp)>0:
for i in range(len(cklist_temp)):
ck_temp = cklist_temp[i]
if ck_temp["remarks"] == phone:
flag_temp = True
put_flag = True
if flag == "old":
_id = ck_temp.get("_id",None)
if not _id:
_id = ck_temp["id"]
put_flag = put_envs_new(_id, ck_temp['name'], self.access_token, phone)
else:
put_flag = put_envs_old(_id, ck_temp['name'], self.access_token, phone)
# print("进入旧版本青龙禁用方法")
# disable_env(_id)
# delete_env(_id)
elif flag == "new":
put_flag = put_envs_new(ck_temp["id"], ck_temp['name'], self.access_token, phone)
# print("进入新版本青龙禁用方法")
# disable_env(ck_temp["id"])
# delete_env(ck_temp["id"])
if put_flag:
print_now(f"账号【{self.phone_num}】自动更新access_token至青龙环境WoChangYouCK 备注为:{phone}")
msg += f"账号【{phone}】自动更新access_token至青龙环境WoChangYouCK 备注为:{phone}\n\n"
else:
print_now(f"账号【{self.phone_num}】自动更新access_token至青龙环境失败")
msg += f"账号【{phone}】自动更新access_token至青龙环境失败\n\n"
if not flag_temp:
post_envs("WoChangYouCK", self.access_token, phone)
print_now(f"账号【{self.phone_num}】自动新增access_token至青龙环境WoChangYouCK 备注为:{phone}")
msg += f"账号【{phone}】自动新增access_token至青龙环境WoChangYouCK 备注为:{phone}\n\n"
except Exception as e:
print_now(f"{time.strftime('%Y-%m-%d %H:%M:%S')}】 ---- 【{phone}】 登录失败,错误信息:{e}\n")
msg += f"{time.strftime('%Y-%m-%d %H:%M:%S')}】 ---- 【{phone}】 登录失败,错误信息:{e}\n\n"
def main(self):
self.login_unicom()
# 方式一登录
# self.ticket = self.get_wo_speed_ticket()
# self.access_token = self.wo_speed_login_one()
# 方式二登录
self.wo_speed_login_two()
self.deal_data()
# 将生成的环境变量填入青龙环境中
def post_data_to_env():
global msg
global dataParam
# 是否填入
if IsChinaUnicomParam == False or IsChinaUnicomParam is None:
print_now(f'系统设置不写入token_online至青龙环境')
return ""
else:
print_now(f'系统设置写入token_online至青龙环境,开始写入。。。')
# 截取第一位拼接参数
if len(dataParam)>1:
dataParam = dataParam[1:]
else:
print_now(f'未获取到token_online参数')
return ""
try:
# 获取环境CK
cklist_temp = get_cookie(envName)
flag_temp = False
if len(cklist_temp)>0:
for i in range(len(cklist_temp)):
ck_temp = cklist_temp[i]
if ck_temp["remarks"] == "由“联通和沃畅游密码登录”脚本自动生成":
flag_temp = True
put_flag = True
if flag == "old":
_id = ck_temp.get("_id",None)
if not _id:
_id = ck_temp["id"]
put_flag = put_envs_new(_id, ck_temp['name'], dataParam, "由“联通和沃畅游密码登录”脚本自动生成")
else:
put_flag = put_envs_old(_id, ck_temp['name'], dataParam, "由“联通和沃畅游密码登录”脚本自动生成")
# print("进入旧版本青龙禁用方法")
# disable_env(_id)
# delete_env(_id)
elif flag == "new":
put_flag = put_envs_new(ck_temp["id"], ck_temp['name'], dataParam, "由“联通和沃畅游密码登录”脚本自动生成")
# print("进入新版本青龙禁用方法")
# disable_env(ck_temp["id"])
# delete_env(ck_temp["id"])
if put_flag:
print_now(f"已将全部正常执行生成联通参数的账号,自动更新至青龙环境:{envName} 由“联通和沃畅游密码登录”脚本自动生成")
msg += f"已将全部正常执行生成联通参数的账号,自动更新至青龙环境:{envName} 由“联通和沃畅游密码登录”脚本自动生成\n\n"
else:
print_now(f"全部正常执行生成联通参数的账号,自动更新至青龙环境:失败")
msg += f"全部正常执行生成联通参数的账号,自动更新至青龙环境:失败\n\n"
if not flag_temp:
post_envs(envName, dataParam, "由“联通和沃畅游密码登录”脚本自动生成")
print_now(f"已将全部正常执行生成联通参数的账号,自动新增至青龙环境:{envName} 备注为:由“联通和沃畅游密码登录”脚本自动生成")
msg += f"已将全部正常执行生成联通参数的账号,自动新增至青龙环境:{envName} 备注为:由“联通和沃畅游密码登录”脚本自动生成\n\n"
except Exception as e:
print_now(f"{time.strftime('%Y-%m-%d %H:%M:%S')}】 ---- 联通和沃畅游密码登录脚本生成参数提交青龙失败,错误信息:{e}\n")
msg += f"{time.strftime('%Y-%m-%d %H:%M:%S')}】 ---- 联通和沃畅游密码登录脚本生成参数提交青龙失败,错误信息:{e}\n\n"
def start(phone,password):
ul = UnicomLogin(phone,password)
ul.main()
if __name__ == "__main__":
l = []
ck_list = []
cklist = get_cookie("WoChangYouCK_PSW")
for i in range(len(cklist)):
#多账号以#分割开的ck
split1 = cklist[i]['value'].split("#")
#多账号以@分割开的ck
split2 = cklist[i]['value'].split("@")
#多账号以换行\n分割开的ck
split3 = cklist[i]['value'].split("\n")
remarks = cklist[i].get("remarks",None)
if len(split1)>1:
for j in range(len(split1)):
info = {}
info['value'] = split1[j]
info['remarks'] = split1[j].split("&")[0]
ck_list.append(info)
elif len(split2)>1:
for j in range(len(split2)):
info = {}
info['value'] = split2[j]
info['remarks'] = split2[j].split("&")[0]
ck_list.append(info)
elif len(split3)>1:
for j in range(len(split3)):
info = {}
info['value'] = split3[j]
info['remarks'] = split3[j].split("&")[0]
ck_list.append(info)
else:
if remarks is None or remarks == "":
cklist[i]['remarks'] = cklist[i]['value']
ck_list.append(cklist[i])
if len(ck_list)<1:
print_now('未添加CK,退出程序~')
exit(0)
for i in range(len(ck_list)):
ck = ck_list[i]
data = ck.get("value",None)
if data is None:
print_now("当前账号未填写 跳过\n")
continue
tmp_list = data.split("&")
if len(tmp_list)<2:
print_now("参数不齐 跳过\n")
continue
phone = tmp_list[0]
password = tmp_list[1]
print_now(f'开始执行第 {i+1} 个账号:{phone}')
start(phone,password)
#解决随机时间问题
ran_time = random.randint(3, 5)
if isDebugger == False and i != (len(ck_list)-1):
print_now(f"随机休眠{ran_time}秒,执行下一个账号操作\n\n")
time.sleep(ran_time)
else:
print_now("\n\n")
# 最后写入联通
post_data_to_env()
if WXPUSHER_TOKEN != "" and WXPUSHER_TOPIC_ID != "" and msg != "":
wxpusher("联通和沃畅游密码登录",msg)

File diff suppressed because one or more lines are too long

View File

@@ -1,160 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
cron: 0 */2 * * *
new Env('携趣IP检查');
"""
# 变量名xqipck 需要的值:[uid,ukey,vkey]多账户用#隔开或者换行单账号格式用[uid,ukey,vkey]
#作者:偷豆豆的大舅哥
#版本1.2
#更新时间2025-02-18
#说明本脚本用于检查携趣IP是否可用自动添加或删除白名单ip查询ip剩余数量
import base64
import codecs
import time
import sys
import os
import requests
import json
def _O0O0(s): return codecs.encode(s, 'rot_13')
def _0O0O(s): return codecs.decode(s, 'rot_13')
_A1B2 = _O0O0("脚本由偷豆豆的大舅哥创作")
_C3D4 = _O0O0("475866384")
def _X1(content, end='\n'): return print(content, end=end) and sys.stdout.flush()
def _X4(url, params=None):
_h = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Connection': 'keep-alive',
'Referer': 'http://op.xiequ.cn/'
}
try:
_r = requests.get(url, params=params, headers=_h, timeout=10)
return _r.text if _r.status_code == 200 else None
except: return None
def _X5():
try:
_r = requests.get('https://whois.pconline.com.cn/ipJson.jsp?json=true')
_r.encoding = 'utf-8'
_d = _r.json()
return _d.get('ip', None)
except: return None
def _X6():
try:
_c = os.getenv('xqipck')
if not _c: return []
_a = []
for _i in _c.replace('\n', '#').split('#'):
_i = _i.strip()
if not _i or not (_i.startswith('[') and _i.endswith(']')): continue
_p = _i[1:-1].split('&')
if len(_p) != 3: continue
_a.append({'uid': _p[0].strip(), 'ukey': _p[1].strip(), 'vkey': _p[2].strip()})
return [_x for _x in _a if all(_x.values())]
except: return []
def _X7(uid, ukey):
_u = 'http://op.xiequ.cn/ApiUser.aspx'
_p = {'act': 'suitdt', 'uid': uid, 'ukey': ukey}
_r = _X4(_u, _p)
if not _r: return {'success': False}
try:
_d = json.loads(_r)
if _d.get('success') == 'true' and _d.get('data'):
_i = _d['data'][0]
return {
'success': True,
'package_type': _i.get('type', ''),
'total_ips': int(_i.get('num', 0)),
'used_ips': int(_i.get('use', 0)),
'remaining_ips': int(_i.get('num', 0)) - int(_i.get('use', 0)),
'end_date': _i.get('enddate', ''),
'is_valid': _i.get('valid') == 'true'
}
except: pass
return {'success': False}
def _X8(uid, ukey):
_u = 'http://op.xiequ.cn/IpWhiteList.aspx'
return _X4(_u, {'uid': uid, 'ukey': ukey, 'act': 'get'}) or ''
def _X9(uid, ukey, ip):
_u = 'http://op.xiequ.cn/IpWhiteList.aspx'
_r = _X4(_u, {'uid': uid, 'ukey': ukey, 'act': 'add', 'ip': ip})
if not _r: return False
if _r == 'Err:IpRep': return True
if _r in ['success', 'OK']:
time.sleep(1)
return ip in (_X8(uid, ukey) or '').split('\n')
return False
def _X10(uid, ukey):
_u = 'http://op.xiequ.cn/IpWhiteList.aspx'
_r = _X4(_u, {'uid': uid, 'ukey': ukey, 'act': 'del', 'ip': 'all'})
return _r in ['success', 'OK'] if _r else False
def _X11(whitelist, ip):
try: return ip in whitelist.split('\n') if whitelist.strip() else False
except: return False
def main():
_X1('==== 携趣IP检查任务开始 ====')
_X1(f'任务开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
_a = _X6()
_X1(f'已加载 {len(_a)} 个账号')
_ip = _X5()
if not _ip:
_X1('获取IP失败')
return
_X1(f'当前IP{_ip}')
for _i, _acc in enumerate(_a, 1):
_X1(f'\n检查账号 {_i} (uid: {_acc["uid"]})')
_X1('-' * 30)
_s = _X7(_acc['uid'], _acc['ukey'])
if not _s['success']:
_X1('获取账号状态失败')
continue
_X1(f'套餐:{_s["package_type"]} (到期:{_s["end_date"]})')
_X1(f'IP{_s["total_ips"]},已用{_s["used_ips"]},剩余{_s["remaining_ips"]}')
if not _s['is_valid'] or _s['remaining_ips'] <= 0:
_X1('账号无效或无可用IP')
continue
_w = _X8(_acc['uid'], _acc['ukey'])
if _X11(_w, _ip):
if _w.strip().count('\n') > 0:
if not _X10(_acc['uid'], _acc['ukey']) or not _X9(_acc['uid'], _acc['ukey'], _ip):
continue
else: _X1('白名单正常')
else:
if _w.strip() and not _X10(_acc['uid'], _acc['ukey']): continue
if not _X9(_acc['uid'], _acc['ukey'], _ip): continue
if not _X11(_X8(_acc['uid'], _acc['ukey']), _ip):
_X1('白名单更新失败')
continue
_X1(f'✓ 账号可用 (剩余IP{_s["remaining_ips"]})')
return
_X1('\n==== 携趣IP检查任务结束 ====')
_X1(f'任务结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
if __name__ == '__main__':
main()

View File

@@ -1,31 +0,0 @@
#!/usr/bin/python3
# -- coding: utf-8 --
# @Time : 2023/6/30 10:23
# -------------------------------
# cron "0 0 6,8,20 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('雨云签到');
import json,requests,os,time
##变量雨云账号密码 注册地址https://www.rainyun.com/NTY1NzY=_ 登录后积分中心里面 赚钱积分 (如绑定微信 直接就有2000分就可以用积分兑换主机 需要每天晚上八点蹲点
# yyusername =os .getenv ("yyusername")#line:12
# yypassword =os .getenv ("yypassword")#line:13
def login_sign ():#line:17
O00OOO00O0OO0OO00 =requests .session ()#line:18
OOOO000000000O0O0 =O00OOO00O0OO0OO00 .post ('https://api.v2.rainyun.com/user/login',headers ={"Content-Type":"application/json"},data =json .dumps ({"field":f"{yyusername}","password":f"{yypassword}"}))#line:19
if OOOO000000000O0O0 .text .find ("200")>-1 :#line:20
print ("登录成功")#line:21
O000OOOOO000OOO0O =OOOO000000000O0O0 .cookies .get_dict ()['X-CSRF-Token']#line:22
else :#line:24
print (f"登录失败,响应信息:{OOOO000000000O0O0.text}")#line:25
O000O0OOOO00OOOOO ={'x-csrf-token':O000OOOOO000OOO0O ,}#line:31
O0O0O000OOOO0OOO0 =O00OOO00O0OO0OO00 .post ('https://api.v2.rainyun.com/user/reward/tasks',headers =O000O0OOOO00OOOOO ,data =json .dumps ({"task_name":"每日签到","verifyCode":""}))#line:32
print ('开始签到:签到结果 '+O0O0O000OOOO0OOO0 .text )#line:33
if __name__ =='__main__':#line:44
for i in range(len(os.getenv("yyusername").split('#'))):
yyusername=os.getenv("yyusername").split('#')[i]
yypassword=os.getenv("yypassword").split('#')[i]
login_sign ()#line:45

View File

@@ -1,113 +0,0 @@
'''
有点潦草,凑合一下
抓包qm_user_token。
青龙环境变量名bwcjck。多账户换行
'''
import requests
import os
import time
import random
global gpstr
import json
gpstr = ''
from dotenv import load_dotenv
load_dotenv()
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
print(f'环境变量{var_name}未设置,请检查。')
return None
accounts = value.strip().split('\n')
num_accounts = len(accounts)
print(f'-----------本次账号运行数量:{num_accounts}-----------')
print(f'----------项目:霸王茶姬 -1.1----------')
return accounts
def ck(qm_user_token):
global gpstr
headers = {
'User-Agent': "Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.211006.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/122.0.6261.120 Mobile Safari/537.36 XWEB/1220133 MMWEBSDK/20240301 MMWEBID/8518 MicroMessenger/8.0.48.2580(0x28003036) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android",
'Accept': "v=1.0",
'Accept-Encoding': "gzip,compress,br,deflate",
'Content-Type': "application/json",
'qm-from': "wechat",
'qm-user-token': qm_user_token,
'charset': "utf-8",
'qm-from-type': "catering",
'Referer': "https://servicewechat.com/wxafec6f8422cb357b/179/page-frame.html"
}
payload = json.dumps({
"activityId": "947079313798000641",
"appid": "wxafec6f8422cb357b"
})
while True:
try:
response = requests.post('https://webapi2.qmai.cn/web/cmk-center/sign/takePartInSign', headers=headers,
data=payload)
response.raise_for_status() # 主动抛出异常如果状态码不是200
response_json = response.json()
# print(response_json)
# 检查响应是否为"上限已达"消息
if response_json.get('code') == 0 and response_json.get('message') == '该用户今日已签到':
print("已达上限,停止请求。")
gpstr += f'今天已签到\n'
break
# 成功获取奖品的情况
if response_json.get('code') == 0 and 'data' in response_json:
num = response_json['data']['rewardDetailList'][0]['sendNum']
print(f"签到成功!积分+{num}")
response1 = requests.post('https://webapi2.qmai.cn/web/cmk-center/sign/userSignStatistics',
headers=headers, data=payload).json()
sign_days = response1["data"]["signDays"]
print(f'靓仔你已经连续签到{sign_days}天啦!')
response2 = requests.post('https://webapi.qmai.cn/web/catering/crm/points-info',headers=headers, data=data).json()
total_points = response2["data"]["totalPoints"]
print(f'当前总共获得了{total_points}积分')
gpstr += f'签到成功!获得积分{num},已经签到{sign_days}天,总共获得{total_points}积分'
except requests.exceptions.HTTPError as http_err:
print(f"发生HTTP错误: {http_err}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
# 不论成功或异常均等待1-3秒
time.sleep(random.randint(1, 3))
# 调用函数传入qm-user-token的值
# 主函数
def main():
var_name = 'bwcjck'
tokens = get_env_variable(var_name)
if not tokens:
print(f'环境变量{var_name}未设置,请检查。')
return
total_accounts = len(tokens)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 1:
print("令牌格式不正确。跳过处理。")
continue
token = parts[0] # Token 值
account_no = parts[1] if len(parts) > 1 else "" # 备注信息
print(f'------账号 {i+1}/{total_accounts} {account_no} 签到-------')
ck(token)
if __name__ == "__main__":
main()
try:
import notify
notify.send('霸王茶姬', gpstr)
except Exception as e:
print(e)
print('推送失败')

File diff suppressed because one or more lines are too long