diff --git a/乐事心动社.py b/乐事心动社.py deleted file mode 100644 index d8ecf3c..0000000 --- a/乐事心动社.py +++ /dev/null @@ -1,185 +0,0 @@ -""" -new Env("乐事心动社") -cron: 43 7 * * * -微信小程序乐事心动社 -实物毛,兑换了个小包包,女朋友挺喜欢,如果不喜欢那不喜欢的是你 -cookie不会更改的,抓一次就不用管了 -export lsxdscks='authorization#memberId' -邀请扫码:https://pepsicocampuscrmstgblob.blob.core.chinacloudapi.cn/pepsicocampuscrmstgblob/FAQ_202310201504181814.png -fix 20240625 ArcadiaScriptPublic 频道:https://t.me/ArcadiaScript 群组:https://t.me/ArcadiaScriptPublic -增加打印签到信息 -""" - - - -import asyncio -import aiohttp -from typing import Optional, Dict -from urllib.parse import urlparse -import random,json -import os - - - -class Lsxds: - def __init__(self) -> None: - """ - new model - - """ - self.sessions = aiohttp.ClientSession() - - async def close(self): - await self.sessions.close() - - async def request(self, url, method='get', data=None, add_headers: Optional[Dict[str, str]] = None, headers=None, dtype='json', max_retries=3): - host = urlparse(url).netloc - _default_headers = { - "Accept": "*/*", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;", - "Authorization": self.cookie, - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36', - "content-type": "application/json", - "Host":host - } - retries = 0 - while retries < max_retries: - try: - request_headers = headers or _default_headers - if add_headers: - request_headers.update(add_headers) - async with getattr(self.sessions, method)(url, headers=request_headers, data=data) as response: - if response.status == 200: - if dtype == 'json': - return await response.json() - else: - return await response.text() - else: - print(f"请求失败状态码:{response.status}") - # 可以选择休眠一段时间再重试,以避免频繁请求 - await asyncio.sleep(random.randint(3,5)) # 休眠1秒钟 - except Exception as e: - print(f"请求出现错误:{e}") - await asyncio.sleep(random.randint(3,5)) # 休眠1秒钟 - retries += 1 - print(f"无法完成请求,已达到最大重试次数 ({max_retries})") - return None - - async def member_point(self): - url = 'https://campuscrm.pepsico.com.cn/web/user/point/getMemberPoint' - data = {"memberId": self.memberid} - res = await self.request(url, 'post',data=data) - if not res: - print(f"[用户{self.index}]:获取金土豆信息失败") - return - if res['code'] == '00000': - print(f"[用户{self.index}]:当前 {res['data']}金土豆") - self.num = res['data'] - else: - print(f"[用户{self.index}]:{res['message']}") - - async def signin(self): - url = 'https://campuscrm.pepsico.com.cn/web/user/member/signIn' - data = {"memberId": self.memberid} - res = await self.request(url,'post',data=data) - # print(res) - if not res: - print(f"[用户{self.index}]:请求签到失败") - return - if res['code'] == '00000': - if res['data'] is None: - print(f"[用户{self.index}]:请求签到已完成了过了") - else: - print(f"[用户{self.index}]:签到{res['message']},签到时间{res['data']['singInTime']},抽到卡片{res['data']['rewardName']}") - else: - print(f"[用户{self.index}]:请求签到出现未知错误{res}") - - async def share_card(self): - url = 'https://campuscrm.pepsico.com.cn/web/user/original/getArticleList' - data = json.dumps({ - "categoryId": 0, - "sort": 0, - "pageNum": 1, - "pageSize": 10, - "memberId": self.memberid - }) - res = await self.request(url,'post',data=data) - if not res: - print(f"[用户{self.index}]:请求分享信息失败") - return - if res['code'] == '00000': - card = random.choice(res['data']['list']) - print(f"[用户{self.index}]:分享卡片信息id {card['id']},标题 {card['title']}") - url1 = 'https://campuscrm.pepsico.com.cn/web/user/original/shark' - data1 = json.dumps({"id":str(card['id']),"memberId":self.memberid}) - res1 = await self.request(url1,'post',data=data1) - if res1['code'] == '00000': - print(f"[用户{self.index}]:分享卡片信息{res1['message']}") - else: - print(f"[用户{self.index}]:分享卡片信息未知错误{res}") - else: - print(f"[用户{self.index}]:分享卡片信息获取失败{res}") - - async def goods(self): - url = 'https://campuscrm.pepsico.com.cn/web/integral/goods/page' - data = json.dumps({ - "pageSize": 40, - "pageNum": 1, - "goodsCategoryId": "", - "isShow": "YES", - "goodsStatus": "ON", - "status": "ENABLE", - "memberId": self.memberid - }) - res = await self.request(url,'post',data=data) - if not res: - print(f"[用户{self.index}]:获取兑换页面失败") - if res['code'] == '00000': - for item in res['data']['list']: - if self.num >= item['exchangePoint']: - print(f"[用户{self.index}]:当前可兑换的商品id {item['id']},需要消耗{item['exchangePoint']}金土豆,图片链接:{item['goodsImg'].split(',')[0]}") - else: - print(f"[用户{self.index}]:获取兑换页面出错了{res}") - - - async def run(self,index, ck:str):# - self.index = index - self.cookie,self.memberid = ck.split("#") - await self.member_point() - await self.signin() - await self.share_card() - await self.member_point() - await self.goods() - await self.close() - -async def check_env(): - # 这里可以写完善一点的获取环境变量功能 - cks = os.getenv('lsxdscks') - if cks is None: - print("你没有填写lsxdscks='authorization#memberId'") - exit() - correct_data = [] - for index ,ck in enumerate(cks.split("@")): - # 也许这里可以添加你的变量检测是否合规 - # Here you can write some code. - correct_data.append(ck) - return correct_data - -async def main(): - cks_list = await check_env() - # 检查是否存在环境变量 multi - use_concurrency = os.environ.get('lsxds_multi', 'false').lower() == 'true' - tasks = [] - for index, ck in enumerate(cks_list): - abc = Lsxds() - task = abc.run(index+1, ck) - tasks.append(task) - if use_concurrency: # 如果是true 那么就执行并发 - await asyncio.gather(*tasks) # 并发执行任务 - else: # 如果是false 那么就串行执行 - for task in tasks: - await task - -if __name__ == '__main__': - asyncio.run(main())