diff --git a/jd_taskop.py b/jd_taskop.py index a63bff5..631cc47 100644 --- a/jd_taskop.py +++ b/jd_taskop.py @@ -156,14 +156,26 @@ def disable_duplicate_tasks(ids: list) -> None: logger.info("🎉成功禁用重复任务~") + + + +def get_latest_file(files): + latest_file = None + latest_mtime = 0 + for file in files: + try: + stats = os.stat(file) + mtime = stats.st_mtime + if mtime > latest_mtime: + latest_mtime = mtime + latest_file = file + except FileNotFoundError: + continue + return latest_file + def get_token() -> str or None: - if os.path.exists("/ql/data/db/keyv.sqlite"): - path="/ql/data/db/keyv.sqlite" - elif os.path.exists("/ql/config/auth.json"): - path="/ql/config/auth.json" - elif os.path.exists("/ql/data/config/auth.json"): - path="/ql/data/config/auth.json" - + token_file_list = ['/ql/data/db/keyv.sqlite', '/ql/data/config/auth.json', '/ql/config/auth.json'] + path=get_latest_file(token_file_list) try: if 'keyv' in path: with open(path, "r", encoding="latin1") as file: diff --git a/jd_wsck.py b/jd_wsck.py index b6e19a1..a496a4e 100644 --- a/jd_wsck.py +++ b/jd_wsck.py @@ -424,6 +424,20 @@ def getRemark(pt_pin,token): return strreturn +def get_latest_file(files): + latest_file = None + latest_mtime = 0 + for file in files: + try: + stats = os.stat(file) + mtime = stats.st_mtime + if mtime > latest_mtime: + latest_mtime = mtime + latest_file = file + except FileNotFoundError: + continue + return latest_file + def main(): printf("版本: 20230602") printf("说明: 如果用Wxpusher通知需配置WP_APP_TOKEN_ONE和WP_APP_MAIN_UID,其中WP_APP_MAIN_UID是你的Wxpusher UID") @@ -438,18 +452,12 @@ def main(): iswxpusher=False counttime=0 - if os.path.exists("/ql/config/auth.json"): - config="/ql/config/auth.json" - envtype="ql" - if os.path.exists("/ql/data/config/auth.json"): - config="/ql/data/config/auth.json" - envtype="ql" - if os.path.exists("/jd/config/auth.json"): - config="/jd/config/auth.json" - envtype="arcadia" - if os.path.exists("/ql/data/db/keyv.sqlite"): - config="/ql/data/db/keyv.sqlite" - envtype="ql_latest" + + + token_file_list = ['/ql/data/db/keyv.sqlite', '/ql/data/config/auth.json', '/ql/config/auth.json'] + + config = get_latest_file(token_file_list) + envtype="ql" if os.path.exists("/arcadia/config/auth.json"): config="/arcadia/config/auth.json" @@ -482,8 +490,16 @@ def main(): summary="" if envtype == "ql": - with open(config, "r", encoding="utf-8") as f1: - token = json.load(f1)['token'] + if 'keyv' in config: + with open(config, "r", encoding="latin1") as file: + auth = file.read() + matches = re.search(r'token":"([^"]+)"', auth) + token = matches.group(1) + else: + with open(config, "r") as file: + auth = file.read() + auth = json.loads(auth) + token = auth["token"] url = 'http://127.0.0.1:5600/api/envs' headers = {'Authorization': f'Bearer {token}'} body = { @@ -499,20 +515,7 @@ def main(): url = 'http://127.0.0.1:5678/openApi/count' headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ', 'api-token': f'{token}'} datas = get(url, headers=headers).json()["data"]["accountCount"] - elif envtype == "ql_latest": - with open(config, "r", encoding="latin1") as f1: - content = f1.read() - matches = re.search(r'token":"([^"]+)"', content) - try: - token = matches.group(1) - except Exception as e: - sys.exit(0) - url = 'http://127.0.0.1:5600/api/envs' - headers = {'Authorization': f'Bearer {token}'} - body = { - 'searchValue': 'JD_WSCK', - 'Authorization': f'Bearer {token}' - } + datas = get(url, params=body, headers=headers).json()['data'] @@ -522,7 +525,7 @@ def main(): printf("\n错误:没有需要转换的JD_WSCK,退出脚本!") return - if envtype in ('ql','ql_latest'): + if envtype in ('ql',''): for data in datas: randomuserAgent() if data['status']!=0: diff --git a/jd_wskey.py b/jd_wskey.py index e27e16a..d267a4b 100644 --- a/jd_wskey.py +++ b/jd_wskey.py @@ -230,28 +230,36 @@ def get_qltoken(username, password, twoFactorSecret): # 方法 用于获取青 # else: # 无异常执行分支 # return token # 返回 token值 - +def get_latest_file(files): + latest_file = None + latest_mtime = 0 + for file in files: + try: + stats = os.stat(file) + mtime = stats.st_mtime + if mtime > latest_mtime: + latest_mtime = mtime + latest_file = file + except FileNotFoundError: + continue + return latest_file # 返回值 Token def ql_login() -> str: # 方法 青龙登录(获取Token 功能同上) - path_latest = '/ql/data/db/keyv.sqlite' # 设置青龙 auth文件地址 - path = '/ql/data/config/auth.json' - if not os.path.isfile(path): - path = '/ql/config/auth.json' # 尝试设置青龙 auth 新版文件地址 - if os.path.isfile(path) or os.path.isfile(path_latest): # 进行文件真值判断 - - if os.path.isfile(path_latest): - with open(path_latest, "r", encoding="latin1") as file: - auth = file.read() # 读取文件 + token_file_list = ['/ql/data/db/keyv.sqlite', '/ql/data/config/auth.json', '/ql/config/auth.json'] + path = get_latest_file(token_file_list) + if os.path.isfile(path): # 进行文件真值判断 + if 'keyv' in path: + with open(path, "r", encoding="latin1") as file: + auth = file.read() matches = re.search(r'token":"([^"]+)"', auth) - token = matches.group(1) + token = matches.group(1) else: - with open(path, "r") as file: # 上下文管理 - auth = file.read() # 读取文件 - file.close() # 关闭文件 - auth = json.loads(auth) # 使用 json模块读取 - username = auth["username"] # 提取 username - password = auth["password"] # 提取 password - token = auth["token"] # 提取 authkey + with open(path, "r") as file: + auth = file.read() + auth = json.loads(auth) + username = auth["username"] # 提取 username + password = auth["password"] # 提取 password + token = auth["token"] try: twoFactorSecret = auth["twoFactorSecret"]