mirror of
https://github.com/zjk2017/ArcadiaScriptPublic.git
synced 2025-12-17 07:28:13 +08:00
186 lines
7.1 KiB
Python
186 lines
7.1 KiB
Python
"""
|
||
new Env("乐事心动社")
|
||
cron: 43 7 * * *
|
||
微信小程序乐事心动社
|
||
实物毛,兑换了个小包包,女朋友挺喜欢,如果不喜欢那不喜欢的是你
|
||
cookie不会更改的,抓一次就不用管了
|
||
export lsxdscks='authorization#memberId'
|
||
邀请扫码:https://pepsicocampuscrmstgblob.blob.core.chinacloudapi.cn/pepsicocampuscrmstgblob/FAQ_202310201504181814.png
|
||
fix 20240625 ArcadiaScriptPublic 频道:https://t.me/ArcadiaScript 群组:https://t.me/ArcadiaScriptPublic
|
||
增加打印签到信息
|
||
"""
|
||
|
||
|
||
|
||
import asyncio
|
||
import aiohttp
|
||
from typing import Optional, Dict
|
||
from urllib.parse import urlparse
|
||
import random,json
|
||
import os
|
||
|
||
|
||
|
||
class Lsxds:
|
||
def __init__(self) -> None:
|
||
"""
|
||
new model
|
||
|
||
"""
|
||
self.sessions = aiohttp.ClientSession()
|
||
|
||
async def close(self):
|
||
await self.sessions.close()
|
||
|
||
async def request(self, url, method='get', data=None, add_headers: Optional[Dict[str, str]] = None, headers=None, dtype='json', max_retries=3):
|
||
host = urlparse(url).netloc
|
||
_default_headers = {
|
||
"Accept": "*/*",
|
||
"Accept-Encoding": "gzip, deflate, br",
|
||
"Accept-Language": "zh-CN,zh;",
|
||
"Authorization": self.cookie,
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36',
|
||
"content-type": "application/json",
|
||
"Host":host
|
||
}
|
||
retries = 0
|
||
while retries < max_retries:
|
||
try:
|
||
request_headers = headers or _default_headers
|
||
if add_headers:
|
||
request_headers.update(add_headers)
|
||
async with getattr(self.sessions, method)(url, headers=request_headers, data=data) as response:
|
||
if response.status == 200:
|
||
if dtype == 'json':
|
||
return await response.json()
|
||
else:
|
||
return await response.text()
|
||
else:
|
||
print(f"请求失败状态码:{response.status}")
|
||
# 可以选择休眠一段时间再重试,以避免频繁请求
|
||
await asyncio.sleep(random.randint(3,5)) # 休眠1秒钟
|
||
except Exception as e:
|
||
print(f"请求出现错误:{e}")
|
||
await asyncio.sleep(random.randint(3,5)) # 休眠1秒钟
|
||
retries += 1
|
||
print(f"无法完成请求,已达到最大重试次数 ({max_retries})")
|
||
return None
|
||
|
||
async def member_point(self):
|
||
url = 'https://campuscrm.pepsico.com.cn/web/user/point/getMemberPoint'
|
||
data = {"memberId": self.memberid}
|
||
res = await self.request(url, 'post',data=data)
|
||
if not res:
|
||
print(f"[用户{self.index}]:获取金土豆信息失败")
|
||
return
|
||
if res['code'] == '00000':
|
||
print(f"[用户{self.index}]:当前 {res['data']}金土豆")
|
||
self.num = res['data']
|
||
else:
|
||
print(f"[用户{self.index}]:{res['message']}")
|
||
|
||
async def signin(self):
|
||
url = 'https://campuscrm.pepsico.com.cn/web/user/member/signIn'
|
||
data = {"memberId": self.memberid}
|
||
res = await self.request(url,'post',data=data)
|
||
# print(res)
|
||
if not res:
|
||
print(f"[用户{self.index}]:请求签到失败")
|
||
return
|
||
if res['code'] == '00000':
|
||
if res['data'] is None:
|
||
print(f"[用户{self.index}]:请求签到已完成了过了")
|
||
else:
|
||
print(f"[用户{self.index}]:签到{res['message']},签到时间{res['data']['singInTime']},抽到卡片{res['data']['rewardName']}")
|
||
else:
|
||
print(f"[用户{self.index}]:请求签到出现未知错误{res}")
|
||
|
||
async def share_card(self):
|
||
url = 'https://campuscrm.pepsico.com.cn/web/user/original/getArticleList'
|
||
data = json.dumps({
|
||
"categoryId": 0,
|
||
"sort": 0,
|
||
"pageNum": 1,
|
||
"pageSize": 10,
|
||
"memberId": self.memberid
|
||
})
|
||
res = await self.request(url,'post',data=data)
|
||
if not res:
|
||
print(f"[用户{self.index}]:请求分享信息失败")
|
||
return
|
||
if res['code'] == '00000':
|
||
card = random.choice(res['data']['list'])
|
||
print(f"[用户{self.index}]:分享卡片信息id {card['id']},标题 {card['title']}")
|
||
url1 = 'https://campuscrm.pepsico.com.cn/web/user/original/shark'
|
||
data1 = json.dumps({"id":str(card['id']),"memberId":self.memberid})
|
||
res1 = await self.request(url1,'post',data=data1)
|
||
if res1['code'] == '00000':
|
||
print(f"[用户{self.index}]:分享卡片信息{res1['message']}")
|
||
else:
|
||
print(f"[用户{self.index}]:分享卡片信息未知错误{res}")
|
||
else:
|
||
print(f"[用户{self.index}]:分享卡片信息获取失败{res}")
|
||
|
||
async def goods(self):
|
||
url = 'https://campuscrm.pepsico.com.cn/web/integral/goods/page'
|
||
data = json.dumps({
|
||
"pageSize": 40,
|
||
"pageNum": 1,
|
||
"goodsCategoryId": "",
|
||
"isShow": "YES",
|
||
"goodsStatus": "ON",
|
||
"status": "ENABLE",
|
||
"memberId": self.memberid
|
||
})
|
||
res = await self.request(url,'post',data=data)
|
||
if not res:
|
||
print(f"[用户{self.index}]:获取兑换页面失败")
|
||
if res['code'] == '00000':
|
||
for item in res['data']['list']:
|
||
if self.num >= item['exchangePoint']:
|
||
print(f"[用户{self.index}]:当前可兑换的商品id {item['id']},需要消耗{item['exchangePoint']}金土豆,图片链接:{item['goodsImg'].split(',')[0]}")
|
||
else:
|
||
print(f"[用户{self.index}]:获取兑换页面出错了{res}")
|
||
|
||
|
||
async def run(self,index, ck:str):#
|
||
self.index = index
|
||
self.cookie,self.memberid = ck.split("#")
|
||
await self.member_point()
|
||
await self.signin()
|
||
await self.share_card()
|
||
await self.member_point()
|
||
await self.goods()
|
||
await self.close()
|
||
|
||
async def check_env():
|
||
# 这里可以写完善一点的获取环境变量功能
|
||
cks = os.getenv('lsxdscks')
|
||
if cks is None:
|
||
print("你没有填写lsxdscks='authorization#memberId'")
|
||
exit()
|
||
correct_data = []
|
||
for index ,ck in enumerate(cks.split("@")):
|
||
# 也许这里可以添加你的变量检测是否合规
|
||
# Here you can write some code.
|
||
correct_data.append(ck)
|
||
return correct_data
|
||
|
||
async def main():
|
||
cks_list = await check_env()
|
||
# 检查是否存在环境变量 multi
|
||
use_concurrency = os.environ.get('lsxds_multi', 'false').lower() == 'true'
|
||
tasks = []
|
||
for index, ck in enumerate(cks_list):
|
||
abc = Lsxds()
|
||
task = abc.run(index+1, ck)
|
||
tasks.append(task)
|
||
if use_concurrency: # 如果是true 那么就执行并发
|
||
await asyncio.gather(*tasks) # 并发执行任务
|
||
else: # 如果是false 那么就串行执行
|
||
for task in tasks:
|
||
await task
|
||
|
||
if __name__ == '__main__':
|
||
asyncio.run(main())
|