This commit is contained in:
XiaoGe-LiBai
2025-12-10 17:49:11 +08:00
parent 39263f5a84
commit 0f6593789f
9 changed files with 4049 additions and 0 deletions

View File

@@ -0,0 +1,22 @@
{
"accounts": [
{
"name": "账号1",
"token": "",
"photo": "photo.txt",
"enabled": true
},
{
"name": "账号2",
"token": "",
"photo": "photo_account2.txt",
"enabled": true
},
{
"name": "账号3",
"token": "",
"photo": "photo_account3.txt",
"enabled": false
}
]
}

View File

@@ -0,0 +1,932 @@
"""
小程序自动打卡脚本 - 多账号版本
=== 配置说明 ===
方式一:配置文件(推荐)
在 accounts.json 中配置多个账号:
{
"accounts": [
{
"name": "张三",
"token": "eyJhbGc...xyz",
"photo": "photo.txt",
"enabled": true
},
{
"name": "李四",
"token": "eyJhbGc...abc",
"photo": "photo_lisi.txt",
"enabled": true
}
]
}
方式二:环境变量
变量名: hxza_dk
格式1推荐: 备注名#token#photo文件名
示例: 张三##photo.txt&侯利##photo_houli.txt
说明: token留空时会自动使用photo文件登录获取token
格式2完整: 备注名#token#photo文件名
示例: 张三#eyJhbGc...xyz#photo.txt&侯利#eyJhbGc...abc#photo_houli.txt
格式3兼容旧版: token#备注名
示例: eyJhbGc...xyz#张三&eyJhbGc...abc#侯利
说明: 使用默认photo.txt文件
说明:
- name: 账号备注名称(用于日志标识)
- token: 登录凭证(可留空,留空时自动登录)
- photo: 打卡照片文件路径相对于脚本目录默认photo.txt
- enabled: 是否启用该账号(环境变量模式下默认全部启用)
- 多个账号用 & 分隔
- 配置文件优先级高于环境变量
"""
import requests
import time
import os
import sys
import logging
import json
from datetime import datetime
from typing import Dict, Optional, Tuple, List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# 添加父目录到路径,以便导入通知模块
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 导入通知模块
try:
from notify import send as notify_send
NOTIFY_ENABLED = True
except ImportError:
try:
from sendNotify import send as notify_send
NOTIFY_ENABLED = True
except ImportError:
NOTIFY_ENABLED = False
def notify_send(title, content):
pass
# 配置日志(青龙面板只输出到控制台)
logging.basicConfig(
level=logging.INFO,
format='%(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
class AccountConfig:
"""账号配置类"""
def __init__(self, name: str, token: str, photo: str = "photo.txt", enabled: bool = True):
self.name = name
self.token = token
self.photo = photo
self.enabled = enabled
@classmethod
def from_dict(cls, data: Dict) -> 'AccountConfig':
"""从字典创建配置"""
return cls(
name=data.get('name', ''),
token=data.get('token', ''),
photo=data.get('photo', 'photo.txt'),
enabled=data.get('enabled', True)
)
@classmethod
def from_env_string(cls, env_str: str) -> 'AccountConfig':
"""从环境变量字符串创建配置
支持两种格式:
1. 旧格式: token#备注名
2. 新格式: 备注名#token#photo文件名
"""
parts = env_str.split('#')
if len(parts) >= 3:
# 新格式: 备注名#token#photo文件名
name = parts[0].strip()
token = parts[1].strip()
photo = parts[2].strip()
return cls(name=name, token=token, photo=photo)
elif len(parts) == 2:
# 旧格式: token#备注名
token = parts[0].strip()
name = parts[1].strip()
return cls(name=name, token=token)
else:
# 只有token
token = parts[0].strip()
return cls(name="", token=token)
class ConfigLoader:
"""配置加载器"""
@staticmethod
def load_from_file(file_path: str = "accounts.json") -> List[AccountConfig]:
"""从配置文件加载账号"""
try:
if not os.path.exists(file_path):
return []
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
accounts = []
for account_data in data.get('accounts', []):
account = AccountConfig.from_dict(account_data)
# 只要账号启用就加载token 可以为空(会自动登录)
if account.enabled:
accounts.append(account)
return accounts
except Exception as e:
logger.error(f"❌ 加载配置文件失败: {str(e)}")
return []
@staticmethod
def load_from_env() -> List[AccountConfig]:
"""从环境变量加载账号"""
env_value = os.getenv('hxza_dk', '')
if not env_value:
return []
accounts = []
# 支持 & 分隔多个账号
for account_str in env_value.split('&'):
account_str = account_str.strip()
if account_str:
accounts.append(AccountConfig.from_env_string(account_str))
return accounts
@staticmethod
def load_accounts() -> List[AccountConfig]:
"""加载所有账号配置(配置文件优先)"""
# 优先从配置文件加载
accounts = ConfigLoader.load_from_file()
# 如果配置文件为空,尝试从环境变量加载
if not accounts:
accounts = ConfigLoader.load_from_env()
return accounts
class AutoCheckIn:
"""自动打卡类"""
def __init__(self, account: AccountConfig):
self.account = account
self.base_url = "https://erp.baian.tech/baian-admin"
self.token = account.token
self.remark = account.name
self.photo_file = account.photo
# 配置带重试的 session
self.session = self._create_session()
# 动态获取的用户信息
self.user_name = None
self.user_mobile = None
# 动态获取的班次时间
self.start_time = None
self.end_time = None
self.is_cross_day = False
# 打卡状态信息(从接口获取)
self.is_card_time = False # 是否在打卡时间内
self.checkin_status = None # 上班打卡状态 (0=未打卡, 1=已打卡)
self.checkin_time = None # 上班实际打卡时间
self.checkout_status = None # 下班打卡状态 (0=未打卡, 1=已打卡)
self.checkout_time = None # 下班实际打卡时间
# 固定参数
self.app_id = "wxa81d5d83880ea6b6"
# 动态参数(从班次信息接口获取)
self.item_id = None
self.job_date_id = None
self.job_id = None
self.address_name = None
self.longitude = None
self.latitude = None
self.headers = {
'Host': 'erp.baian.tech',
'Connection': 'keep-alive',
'appId': self.app_id,
'content-type': 'application/json',
'wxAppKeyCompanyId': self.app_id,
'token': self.token,
'Accept-Encoding': 'gzip,compress,br,deflate',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI',
'Referer': f"https://servicewechat.com/{self.app_id}/102/page-frame.html"
}
def _create_session(self) -> requests.Session:
"""创建带重试机制的 session"""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["POST", "GET"]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)
session.mount('http://', adapter)
return session
def _check_token_valid(self) -> bool:
"""检查 token 是否有效"""
try:
# 尝试获取用户信息来验证 token
url = f"{self.base_url}/userH5/getMyDetails"
response = self.session.get(url, headers=self.headers, timeout=15)
if response.status_code != 200:
return False
result = response.json()
return result.get('code') == 200
except Exception:
return False
def _update_accounts_json(self, new_token: str, config_file: str = "accounts.json") -> bool:
"""
更新 accounts.json 配置文件中的 token
Args:
new_token: 新的 token
config_file: 配置文件路径
Returns:
成功返回 True失败返回 False
"""
try:
if not os.path.exists(config_file):
logger.warning(f"⚠️ 配置文件不存在: {config_file}")
return False
logger.info(f"\n📝 正在更新配置文件...")
logger.info(f" 📄 文件: {config_file}")
# 读取配置文件
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
# 查找并更新当前账号的 token
updated = False
for account in config.get('accounts', []):
if account.get('name') == self.account.name:
account['token'] = new_token
updated = True
break
if not updated:
logger.warning(f"⚠️ 未找到账号 [{self.account.name}] 的配置")
return False
# 写回配置文件
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
logger.info(f"✅ 配置文件已更新")
logger.info(f" 账号: {self.account.name}")
logger.info(f" Token: {new_token[:20]}...{new_token[-10:]}")
return True
except PermissionError:
logger.warning("⚠️ 没有权限写入配置文件")
return False
except Exception as e:
logger.warning(f"⚠️ 更新配置文件失败: {str(e)}")
return False
def _auto_login(self) -> bool:
"""
使用照片自动登录获取新 token
Returns:
成功返回 True失败返回 False
"""
try:
logger.info("🔍 正在读取照片文件...")
# 读取照片文件
if not os.path.exists(self.photo_file):
logger.error(f"❌ 照片文件不存在: {self.photo_file}")
return False
with open(self.photo_file, 'r', encoding='utf-8') as f:
image_data = f.read().strip()
if not image_data:
logger.error(f"❌ 照片文件为空")
return False
logger.info(f"✅ 照片文件读取成功")
# 发送登录请求
logger.info("🔐 正在发送登录请求...")
url = f"{self.base_url}/recognition/faceSearch"
data = {"image": image_data}
headers = {
'Host': 'erp.baian.tech',
'Connection': 'keep-alive',
'appId': self.app_id,
'content-type': 'application/json',
'wxAppKeyCompanyId': self.app_id,
'token': '', # 登录时不需要 token
'Accept-Encoding': 'gzip,compress,br,deflate',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
'Referer': f"https://servicewechat.com/{self.app_id}/102/page-frame.html"
}
response = self.session.post(
url,
json=data,
headers=headers,
timeout=30
)
if response.status_code != 200:
logger.error(f"❌ 登录请求失败: HTTP {response.status_code}")
return False
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 登录失败: {result.get('msg')}")
return False
# 解析返回数据
data = result.get('data', {})
token = data.get('token')
expire = data.get('expire') # 秒数1296000 = 15天
if not token:
logger.error(f"❌ 未获取到 token")
return False
# 更新 token
self.token = token
self.account.token = token
self.headers['token'] = token
# 计算过期时间
from datetime import timedelta
expire_date = datetime.now() + timedelta(seconds=expire)
expire_str = expire_date.strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"✅ 自动登录成功!")
logger.info(f" 🎫 Token: {token[:20]}...{token[-10:]}")
logger.info(f" ⏰ 有效期: {expire // 86400}")
logger.info(f" 📅 过期时间: {expire_str}")
# 尝试更新配置文件
self._update_accounts_json(token)
return True
except Exception as e:
logger.error(f"❌ 自动登录异常: {str(e)}")
return False
def _fetch_user_info(self) -> bool:
"""获取用户信息(姓名、手机号等)"""
try:
url = f"{self.base_url}/userH5/getMyDetails"
response = self.session.get(url, headers=self.headers, timeout=15)
if response.status_code != 200:
logger.error(f"❌ 获取用户信息失败: HTTP {response.status_code}")
return False
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 获取用户信息失败: {result.get('msg')}")
return False
data = result.get('data', {})
self.user_name = data.get('name', '')
self.user_mobile = data.get('mobile', '')
return True
except Exception as e:
logger.error(f"❌ 获取用户信息异常: {str(e)}")
return False
def _fetch_job_details(self) -> bool:
"""获取班次信息itemId、jobDateId、经纬度、班次时间等"""
try:
url = f"{self.base_url}/Management/getJobUserDetails"
params = {"userId": ""}
response = self.session.get(url, params=params, headers=self.headers, timeout=15)
if response.status_code != 200:
logger.error(f"❌ 获取班次信息失败: HTTP {response.status_code}")
return False
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 获取班次信息失败: {result.get('msg')}")
return False
data = result.get('data', {})
# 提取 itemId 和 jobId
self.item_id = data.get('itemId')
self.job_id = data.get('jobId')
# 提取是否在打卡时间内
self.is_card_time = data.get('isCardTime', False)
# 提取班次时段信息
job_date_dtos = data.get('jobDateDTOS', [])
if job_date_dtos:
job_date = job_date_dtos[0]
self.job_date_id = job_date.get('jobDateId')
self.start_time = job_date.get('startDate')
self.end_time = job_date.get('endDate')
self.is_cross_day = job_date.get('ismorrow') == 1
# 提取打卡状态信息
self.checkin_status = job_date.get('status') # 上班打卡状态
self.checkin_time = job_date.get('date') # 上班打卡时间
self.checkout_status = job_date.get('statusOff') # 下班打卡状态
self.checkout_time = job_date.get('dateOff') # 下班打卡时间
# 提取地址信息(经纬度)
address_dtos = data.get('addressDTOS', [])
if address_dtos:
address = address_dtos[0]
self.address_name = address.get('addressName')
self.longitude = float(address.get('longitude'))
self.latitude = float(address.get('latitude'))
# 验证必需参数
if not all([self.item_id, self.job_date_id, self.address_name,
self.longitude, self.latitude, self.start_time, self.end_time]):
logger.error(f"❌ 班次信息不完整")
return False
return True
except Exception as e:
logger.error(f"❌ 获取班次信息异常: {str(e)}")
return False
def initialize(self) -> bool:
"""初始化账号信息"""
display_name = self.remark if self.remark else "未命名"
logger.info(f"\n{'='*50}")
logger.info(f"🔄 正在初始化账号: {display_name}")
logger.info(f"{'='*50}")
# 检查 Token 有效性
if not self.token or not self._check_token_valid():
if not self.token:
logger.warning("⚠️ Token 为空")
else:
logger.warning("⚠️ Token 无效或已过期")
logger.info("🔐 尝试自动登录...")
# 尝试自动登录
if not self._auto_login():
logger.error("❌ 自动登录失败")
return False
if not self._fetch_user_info():
return False
if not self._fetch_job_details():
return False
# 显示账号信息
display_name = self.remark if self.remark else self.user_name
logger.info(f"\n✅ 账号初始化成功")
logger.info(f" 👤 姓名: {display_name}")
logger.info(f" 📱 手机: {self.user_mobile}")
logger.info(f" 📍 地点: {self.address_name}")
logger.info(f" ⏰ 班次: {self.start_time} - {self.end_time}{' (跨天)' if self.is_cross_day else ''}")
self._show_checkin_windows()
return True
def _request_with_retry(self, url: str, data: Dict, checkin_type: str, max_retries: int = 3) -> Optional[Dict]:
"""带重试的网络请求"""
for attempt in range(1, max_retries + 1):
try:
response = self.session.post(url, json=data, headers=self.headers, timeout=15)
if response.status_code == 200:
result = response.json()
if result.get('code') == 200:
return result
else:
logger.error(f"{result.get('msg')}")
return None
else:
if attempt < max_retries:
logger.warning(f"⚠️ HTTP {response.status_code}, 重试 {attempt}/{max_retries}")
time.sleep(2 ** attempt)
continue
return None
except requests.exceptions.Timeout:
if attempt < max_retries:
logger.warning(f"⏱️ 超时, 重试 {attempt}/{max_retries}")
time.sleep(2 ** attempt)
continue
return None
except requests.exceptions.RequestException as e:
if attempt < max_retries:
logger.warning(f"⚠️ {str(e)}, 重试 {attempt}/{max_retries}")
time.sleep(2 ** attempt)
continue
return None
logger.error(f"❌ 请求失败")
return None
def _time_to_minutes(self, time_str: str) -> int:
"""时间字符串转为分钟数"""
h, m = map(int, time_str.split(':'))
return h * 60 + m
def _minutes_to_time(self, minutes: int) -> str:
"""分钟数转时间字符串"""
h, m = divmod(minutes % 1440, 60)
return f"{h:02d}:{m:02d}"
def _get_checkin_windows(self) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""根据班次时间,自动计算打卡时间窗口"""
start_minutes = self._time_to_minutes(self.start_time)
end_minutes = self._time_to_minutes(self.end_time)
first_start = (start_minutes - 120) % 1440
first_end = start_minutes
second_start = (end_minutes + 1) % 1440
second_end = (start_minutes - 121) % 1440
return (first_start, first_end), (second_start, second_end)
def _is_in_window(self, current_minutes: int, start: int, end: int) -> bool:
"""检查当前时间是否在窗口内"""
if start <= end:
return start <= current_minutes <= end
else:
return current_minutes >= start or current_minutes <= end
def _get_current_checkin_type(self) -> Optional[str]:
"""
智能判断应该执行哪种打卡
判断逻辑:
1. 必须在打卡时间内 (isCardTime == true)
2. 检查当前时间是否在对应的打卡窗口内
3. 如果 status == 0 或 date == null 且在上班窗口 → 执行上班打卡
4. 如果 status == 1 且 (statusOff == 0 或 dateOff == null) 且在下班窗口 → 执行下班打卡
5. 如果两次都已打卡 → 返回 'already_done'
Returns:
'first' - 需要上班打卡
'second' - 需要下班打卡
'already_done' - 已完成所有打卡
None - 不在打卡时间内或不在对应窗口内
"""
# 检查是否在打卡时间内
if not self.is_card_time:
return None
# 获取当前时间(分钟数)
now = datetime.now()
current_minutes = now.hour * 60 + now.minute
# 获取打卡时间窗口
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
# 判断上班打卡状态
need_checkin = (self.checkin_status == 0 or
self.checkin_status is None or
self.checkin_time is None)
# 判断下班打卡状态
need_checkout = (self.checkout_status == 0 or
self.checkout_status is None or
self.checkout_time is None)
# 检查是否在上班打卡窗口内
in_first_window = self._is_in_window(current_minutes, first_start, first_end)
# 检查是否在下班打卡窗口内
in_second_window = self._is_in_window(current_minutes, second_start, second_end)
# 如果需要上班打卡且在上班窗口内
if need_checkin and in_first_window:
return 'first'
# 如果上班已打卡,需要下班打卡,且在下班窗口内
if not need_checkin and need_checkout and in_second_window:
return 'second'
# 两次都已打卡
if not need_checkin and not need_checkout:
return 'already_done'
# 不在对应的打卡窗口内
return None
def _show_checkin_windows(self):
"""显示打卡时间窗口和打卡状态信息"""
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
first_start_str = self._minutes_to_time(first_start)
first_end_str = self._minutes_to_time(first_end)
second_start_str = self._minutes_to_time(second_start)
second_end_str = self._minutes_to_time(second_end)
logger.info(f"\n📅 打卡时间窗口:")
logger.info(f" 🌅 上班: {first_start_str} - {first_end_str}")
logger.info(f" 🌆 下班: {second_start_str} - {second_end_str}")
# 显示打卡状态
logger.info(f"\n📊 打卡状态:")
# 上班打卡状态
if self.checkin_status == 1 and self.checkin_time:
logger.info(f" ✅ 上班打卡: 已完成 ({self.checkin_time})")
else:
logger.info(f" ⏰ 上班打卡: 未完成")
# 下班打卡状态
if self.checkout_status == 1 and self.checkout_time:
logger.info(f" ✅ 下班打卡: 已完成 ({self.checkout_time})")
else:
logger.info(f" ⏰ 下班打卡: 未完成")
# 显示当前时间和状态
now = datetime.now()
current_time_str = now.strftime('%H:%M:%S')
logger.info(f"\n🕐 当前时间: {current_time_str}")
if self.is_card_time:
logger.info(f" ✅ 当前在打卡时间内")
else:
logger.info(f" ⏰ 当前不在打卡时间内")
def face_detect(self, checkin_type: str) -> Optional[Dict]:
"""人脸识别验证"""
try:
url = f"{self.base_url}/recognition/faceDetect"
if not os.path.exists(self.photo_file):
logger.error(f"❌ 照片文件不存在: {self.photo_file}")
return None
with open(self.photo_file, 'r', encoding='utf-8') as f:
image_data = f.read().strip()
data = {"image": image_data}
return self._request_with_retry(url, data, checkin_type)
except Exception as e:
logger.error(f"❌ 人脸识别异常: {str(e)}")
return None
def save_attendance(self, checkin_type: str) -> bool:
"""保存打卡记录"""
try:
url = f"{self.base_url}/Management/saveAttendanceManagement"
with open(self.photo_file, 'r', encoding='utf-8') as f:
image_data = f.read().strip()
is_off = 0 if checkin_type == 'first' else 1
update_card = 0 if checkin_type == 'first' else 1
data = {
"itemId": self.item_id,
"jobDateId": self.job_date_id,
"addressName": self.address_name,
"isLegwork": 0,
"legworkState": "",
"isOff": is_off,
"imageId": "",
"updateCard": update_card,
"image": image_data,
"longitude": self.longitude,
"latitude": self.latitude,
"userId": ""
}
result = self._request_with_retry(url, data, checkin_type)
return bool(result)
except Exception as e:
logger.error(f"❌ 保存记录异常: {str(e)}")
return False
def do_checkin(self, checkin_type: str) -> Tuple[bool, str]:
"""执行打卡流程,返回(成功与否, 结果消息)"""
checkin_name = '上班' if checkin_type == 'first' else '下班'
checkin_emoji = '🌅' if checkin_type == 'first' else '🌆'
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
display_name = self.remark if self.remark else self.user_name
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
first_window = f"{self._minutes_to_time(first_start)} - {self._minutes_to_time(first_end)}"
second_window = f"{self._minutes_to_time(second_start)} - {self._minutes_to_time(second_end)}"
msg_header = f"👤 {display_name}\n📱 {self.user_mobile}\n📍 {self.address_name}\n{self.start_time}-{self.end_time}{' (跨天)' if self.is_cross_day else ''}\n\n🌅 上班: {first_window}\n🌆 下班: {second_window}\n\n"
logger.info(f"\n{checkin_emoji} [{display_name}] {checkin_name}打卡中...")
face_result = self.face_detect(checkin_type)
if not face_result:
logger.error(f"❌ [{display_name}] {checkin_name}打卡失败")
return False, f"{msg_header}{current_time}\n{checkin_name}失败"
time.sleep(1)
success = self.save_attendance(checkin_type)
if success:
logger.info(f"✅ [{display_name}] {checkin_name}打卡成功")
return True, f"{msg_header}{current_time}\n{checkin_name}成功"
else:
logger.error(f"❌ [{display_name}] {checkin_name}打卡失败")
return False, f"{msg_header}{current_time}\n{checkin_name}失败"
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='小程序自动打卡脚本 - 多账号版本')
parser.add_argument('--type', choices=['first', 'second'], help='立即执行指定类型的打卡')
parser.add_argument('--config', default='accounts.json', help='配置文件路径')
args = parser.parse_args()
try:
# 加载所有账号配置
accounts = ConfigLoader.load_accounts()
if not accounts:
error_msg = "❌ 未找到任何账号配置,请检查 accounts.json 或环境变量 hxza_dk"
logger.error(error_msg)
if NOTIFY_ENABLED:
notify_send("❌ 小程序打卡配置错误", error_msg)
exit(1)
logger.info(f"\n📋 共加载 {len(accounts)} 个账号")
# 统计结果
total_count = len(accounts)
success_count = 0
fail_count = 0
skip_count = 0
all_messages = []
# 循环处理每个账号
for i, account in enumerate(accounts, 1):
try:
logger.info(f"\n{'='*60}")
logger.info(f"📌 处理账号 {i}/{total_count}")
logger.info(f"{'='*60}")
checkin = AutoCheckIn(account)
if not checkin.initialize():
fail_count += 1
display_name = account.name if account.name else "未命名"
all_messages.append(f"❌ [{display_name}] 初始化失败")
continue
success = False
msg = ""
if args.type:
# 强制执行指定类型的打卡(忽略智能判断)
logger.info(f"\n⚠️ 强制执行 {'上班' if args.type == 'first' else '下班'} 打卡")
success, msg = checkin.do_checkin(args.type)
else:
# 智能判断打卡类型
checkin_type = checkin._get_current_checkin_type()
if checkin_type == 'first':
# 需要上班打卡
logger.info(f"\n🎯 智能判断: 需要执行上班打卡")
success, msg = checkin.do_checkin('first')
elif checkin_type == 'second':
# 需要下班打卡
logger.info(f"\n🎯 智能判断: 需要执行下班打卡")
success, msg = checkin.do_checkin('second')
elif checkin_type == 'already_done':
# 已完成所有打卡 - 只记录日志,不生成通知消息
skip_count += 1
display_name = account.name if account.name else checkin.user_name
logger.info(f"\n✅ [{display_name}] 今日打卡已全部完成")
logger.info(f" 上班打卡: {checkin.checkin_time}")
logger.info(f" 下班打卡: {checkin.checkout_time}")
# 不设置 msg跳过的账号不加入通知消息
else:
# 不在打卡时间内 - 只记录日志,不生成通知消息
skip_count += 1
display_name = account.name if account.name else checkin.user_name
logger.info(f"\n⏰ [{display_name}] 当前不在打卡时间内")
# 不设置 msg跳过的账号不加入通知消息
if success:
success_count += 1
# 只有成功和失败的消息才加入通知列表
if msg:
all_messages.append(msg)
elif msg:
# 打卡失败
fail_count += 1
all_messages.append(msg)
# 账号之间间隔,避免请求过快
if i < total_count:
time.sleep(2)
except Exception as e:
fail_count += 1
display_name = account.name if account.name else "未命名"
error_msg = f"❌ [{display_name}] 处理异常: {str(e)}"
logger.error(error_msg)
all_messages.append(error_msg)
# 汇总结果
logger.info(f"\n{'='*60}")
logger.info(f"📊 执行结果汇总")
logger.info(f"{'='*60}")
logger.info(f" 总账号数: {total_count}")
logger.info(f" ✅ 成功: {success_count}")
logger.info(f" ❌ 失败: {fail_count}")
logger.info(f" ⏰ 跳过: {skip_count}")
# 发送汇总通知
if NOTIFY_ENABLED:
# 构建统计摘要
summary = f"📊 总计: {total_count} | ✅ 成功: {success_count} | ❌ 失败: {fail_count} | ⏰ 跳过: {skip_count}"
# 如果有详细消息,追加到摘要后面
if all_messages:
summary += "\n\n" + "\n\n".join(all_messages)
# 根据结果确定通知标题
if fail_count > 0:
title = f"⚠️ 小程序打卡完成 (有失败)"
elif success_count > 0:
title = f"✅ 小程序打卡完成"
elif skip_count == total_count:
title = f"⏰ 小程序打卡提醒 (全部跳过)"
else:
title = f"📊 小程序打卡执行完成"
notify_send(title, summary)
logger.info(f"\n📢 通知已发送: {title}")
else:
logger.warning("\n⚠️ 通知模块未启用,跳过通知推送")
# 如果有失败的账号,返回失败状态
exit(0 if fail_count == 0 else 1)
except KeyboardInterrupt:
logger.info("\n⚠️ 用户中断程序")
exit(1)
except Exception as e:
error_msg = f"❌ 程序异常: {str(e)}"
logger.error(error_msg)
if NOTIFY_ENABLED:
notify_send("❌ 小程序打卡异常", error_msg)
exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,819 @@
"""
小程序自动打卡脚本 - 多账号版本(优化版)
主要改动:
- 支持自定义配置文件路径,全程使用同一配置文件(含自动登录后 token 回写)
- 避免重复请求用户详情接口,只请求一次同时做 token 校验和用户信息获取
- 照片文件只读取一次并缓存,减少磁盘 IO
- 精简并统一网络重试逻辑,重试次数和间隔可配置
- 日志信息更清晰,关键异常使用 logger.exception 方便排查
"""
import argparse
import json
import logging
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import requests
# ========= 常量与全局配置 =========
BASE_URL = "https://erp.baian.tech/baian-admin"
APP_ID = "wxa81d5d83880ea6b6"
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 "
"MicroMessenger/8.0.65(0x1800412e) NetType/WIFI"
)
ENV_VAR_NAME = "hxza_dk"
DEFAULT_CONFIG_FILE = "accounts.json"
DEFAULT_PHOTO_FILE = ""
REQUEST_TIMEOUT = 15
MAX_RETRIES = 3
ACCOUNT_DELAY_SECONDS = 2
# 为了兼容青龙,尝试导入通知模块
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
from notify import send as notify_send # type: ignore
NOTIFY_ENABLED = True
except ImportError:
try:
from sendNotify import send as notify_send # type: ignore
NOTIFY_ENABLED = True
except ImportError:
NOTIFY_ENABLED = False
def notify_send(title: str, content: str) -> None:
pass
logging.basicConfig(
level=logging.INFO,
format="%(message)s",
handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
# ========= 配置相关 =========
@dataclass
class AccountConfig:
"""账号配置"""
name: str
token: str
photo: Optional[str] = None
enabled: bool = True
@classmethod
def from_dict(cls, data: Dict) -> "AccountConfig":
return cls(
name=data.get("name", "") or "",
token=data.get("token", "") or "",
photo=data.get("photo"),
enabled=bool(data.get("enabled", True)),
)
@classmethod
def from_env_string(cls, env_str: str) -> "AccountConfig":
"""
从环境变量字符串创建配置
支持格式:
1) 备注名#token#photo文件名
示例:张三##photo.txt token 为空时自动登录)
2) token#备注名(兼容旧版)
示例eyJhbGc...xyz#张三
3) 只有 token无备注名
"""
parts = [p.strip() for p in env_str.split("#")]
if len(parts) >= 3:
# 新格式: 备注名#token#photo文件名
name, token, photo = parts[0], parts[1], parts[2]
photo = photo or None
return cls(name=name, token=token, photo=photo)
if len(parts) == 2:
# 旧格式: token#备注名
token, name = parts[0], parts[1]
return cls(name=name, token=token, photo=None)
# 只有 token
token = parts[0] if parts else ""
return cls(name="", token=token, photo=None)
def load_accounts_from_file(config_path: str) -> List[AccountConfig]:
"""从配置文件加载账号"""
if not os.path.exists(config_path):
return []
try:
with open(config_path, "r", encoding="utf-8") as f:
data = json.load(f)
accounts: List[AccountConfig] = []
for account_data in data.get("accounts", []):
account = AccountConfig.from_dict(account_data)
if account.enabled:
accounts.append(account)
return accounts
except Exception as e:
logger.exception(f"❌ 加载配置文件失败: {config_path} - {e}")
return []
def load_accounts_from_env() -> List[AccountConfig]:
"""从环境变量加载账号"""
env_value = os.getenv(ENV_VAR_NAME, "") or ""
if not env_value.strip():
return []
accounts: List[AccountConfig] = []
for account_str in env_value.split("&"):
account_str = account_str.strip()
if not account_str:
continue
accounts.append(AccountConfig.from_env_string(account_str))
return accounts
def load_all_accounts(config_path: str) -> List[AccountConfig]:
"""加载所有账号配置(配置文件优先)"""
accounts = load_accounts_from_file(config_path)
if accounts:
logger.info(f"✅ 从配置文件加载账号: {config_path}")
return accounts
accounts = load_accounts_from_env()
if accounts:
logger.info(f"✅ 从环境变量 {ENV_VAR_NAME} 加载账号")
return accounts
# ========= 打卡逻辑 =========
class AutoCheckIn:
"""单账号自动打卡"""
def __init__(self, account: AccountConfig, config_path: str):
self.account = account
self.config_path = config_path
self.base_url = BASE_URL
self.app_id = APP_ID
self.token: str = account.token or ""
self.remark: str = account.name
self.photo_file: Optional[str] = account.photo
self.session = requests.Session()
# 用户信息
self.user_name: Optional[str] = None
self.user_mobile: Optional[str] = None
# 班次信息
self.start_time: Optional[str] = None
self.end_time: Optional[str] = None
self.is_cross_day: bool = False
# 打卡状态
self.is_card_time: bool = False
self.checkin_status: Optional[int] = None
self.checkin_time: Optional[str] = None
self.checkout_status: Optional[int] = None
self.checkout_time: Optional[str] = None
# 其他参数(接口返回)
self.item_id: Optional[str] = None
self.job_date_id: Optional[str] = None
self.job_id: Optional[str] = None
self.address_name: Optional[str] = None
self.longitude: Optional[float] = None
self.latitude: Optional[float] = None
# 照片数据缓存
self._image_data: Optional[str] = None
# ---- HTTP & 工具方法 ----
@property
def headers(self) -> Dict[str, str]:
return {
"Host": "erp.baian.tech",
"Connection": "keep-alive",
"appId": self.app_id,
"content-type": "application/json",
"wxAppKeyCompanyId": self.app_id,
"token": self.token,
"Accept-Encoding": "gzip,compress,br,deflate",
"User-Agent": DEFAULT_USER_AGENT,
"Referer": f"https://servicewechat.com/{self.app_id}/102/page-frame.html",
}
def _post_with_retry(self, url: str, data: Dict, max_retries: int = MAX_RETRIES) -> Optional[Dict]:
"""带重试的 POST 请求(自己控制重试,不依赖 HTTPAdapter"""
for attempt in range(1, max_retries + 1):
try:
resp = self.session.post(url, json=data, headers=self.headers, timeout=REQUEST_TIMEOUT)
if resp.status_code != 200:
if attempt < max_retries:
logger.warning(f"⚠️ HTTP {resp.status_code}, 重试 {attempt}/{max_retries}")
time.sleep(2**attempt)
continue
logger.error(f"❌ 请求失败: HTTP {resp.status_code}")
return None
result = resp.json()
if result.get("code") != 200:
logger.error(f"❌ 接口返回错误: {result.get('msg')}")
return None
return result
except requests.exceptions.Timeout:
if attempt < max_retries:
logger.warning(f"⚠️ 请求超时, 重试 {attempt}/{max_retries}")
time.sleep(2**attempt)
continue
logger.error("❌ 请求超时")
return None
except requests.exceptions.RequestException as e:
if attempt < max_retries:
logger.warning(f"⚠️ 请求异常 {e}, 重试 {attempt}/{max_retries}")
time.sleep(2**attempt)
continue
logger.exception(f"❌ 请求异常: {e}")
return None
except Exception as e:
logger.exception(f"❌ 未知异常: {e}")
return None
return None
def _load_image_data(self) -> bool:
"""读取并缓存照片文件内容"""
if self._image_data is not None:
return True
try:
if not self.photo_file:
logger.error("❌ 未配置照片文件,请在配置中指定 photo")
return False
if not os.path.exists(self.photo_file):
logger.error(f"❌ 照片文件不存在: {self.photo_file}")
return False
with open(self.photo_file, "r", encoding="utf-8") as f:
image_data = f.read().strip()
if not image_data:
logger.error("❌ 照片文件内容为空")
return False
self._image_data = image_data
logger.info("✅ 照片文件读取成功")
return True
except Exception as e:
logger.exception(f"❌ 读取照片文件异常: {e}")
return False
# ---- 账号初始化相关 ----
def _update_token_in_config(self, new_token: str) -> None:
"""自动登录后回写 token 到配置文件"""
if not self.config_path or not os.path.exists(self.config_path):
# 环境变量模式或无配置文件时忽略
return
try:
logger.info(f"\n🔄 更新配置文件中的 token: {self.config_path}")
with open(self.config_path, "r", encoding="utf-8") as f:
config = json.load(f)
updated = False
for account in config.get("accounts", []):
if account.get("name", "") == self.account.name:
account["token"] = new_token
updated = True
break
if not updated:
logger.warning(f"⚠️ 配置文件中未找到账号 [{self.account.name}],未更新 token")
return
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
logger.info("✅ 配置文件 token 已更新")
except PermissionError:
logger.warning("⚠️ 没有权限写入配置文件,跳过 token 回写")
except Exception as e:
logger.exception(f"❌ 更新配置文件失败: {e}")
def _auto_login(self) -> bool:
"""使用人脸识别接口自动登录获取新 token"""
logger.info("🔐 Token 无效,尝试自动登录...")
if not self._load_image_data():
return False
url = f"{self.base_url}/recognition/faceSearch"
data = {"image": self._image_data}
# 登录时不携带 token
headers = self.headers.copy()
headers["token"] = ""
try:
resp = self.session.post(url, json=data, headers=headers, timeout=REQUEST_TIMEOUT)
if resp.status_code != 200:
logger.error(f"❌ 自动登录请求失败: HTTP {resp.status_code}")
return False
result = resp.json()
if result.get("code") != 200:
logger.error(f"❌ 自动登录失败: {result.get('msg')}")
return False
data = result.get("data", {}) or {}
token = data.get("token")
expire = data.get("expire", 0)
if not token:
logger.error("❌ 自动登录未返回 token")
return False
self.token = token
self.account.token = token
expire_days = expire // 86400 if expire else 0
expire_dt = datetime.now() + timedelta(seconds=expire or 0)
logger.info("✅ 自动登录成功")
logger.info(f" 🔑 Token: {token[:20]}...{token[-10:]}")
logger.info(f" ⏰ 有效期: {expire_days} 天, 过期时间: {expire_dt:%Y-%m-%d %H:%M:%S}")
self._update_token_in_config(token)
return True
except Exception as e:
logger.exception(f"❌ 自动登录异常: {e}")
return False
def _fetch_user_info(self, allow_auto_login: bool = True) -> bool:
"""获取用户信息;若 token 失效可选择自动登录一次"""
url = f"{self.base_url}/userH5/getMyDetails"
try:
resp = self.session.get(url, headers=self.headers, timeout=REQUEST_TIMEOUT)
if resp.status_code != 200:
logger.warning(f"⚠️ 获取用户信息失败 HTTP {resp.status_code}")
if allow_auto_login and self._auto_login():
return self._fetch_user_info(allow_auto_login=False)
return False
result = resp.json()
if result.get("code") != 200:
logger.warning(f"⚠️ 获取用户信息失败: {result.get('msg')}")
if allow_auto_login and self._auto_login():
return self._fetch_user_info(allow_auto_login=False)
return False
data = result.get("data", {}) or {}
self.user_name = data.get("name", "")
self.user_mobile = data.get("mobile", "")
return True
except Exception as e:
logger.exception(f"❌ 获取用户信息异常: {e}")
if allow_auto_login and self._auto_login():
return self._fetch_user_info(allow_auto_login=False)
return False
def _fetch_job_details(self) -> bool:
"""获取班次信息itemId/jobDateId/经纬度/班次时间/打卡状态等)"""
url = f"{self.base_url}/Management/getJobUserDetails"
try:
resp = self.session.get(url, params={"userId": ""}, headers=self.headers, timeout=REQUEST_TIMEOUT)
if resp.status_code != 200:
logger.error(f"❌ 获取班次信息失败: HTTP {resp.status_code}")
return False
result = resp.json()
if result.get("code") != 200:
logger.error(f"❌ 获取班次信息失败: {result.get('msg')}")
return False
data = result.get("data", {}) or {}
self.item_id = data.get("itemId")
self.job_id = data.get("jobId")
self.is_card_time = bool(data.get("isCardTime", False))
job_date_dtos = data.get("jobDateDTOS", []) or []
if job_date_dtos:
job_date = job_date_dtos[0] or {}
self.job_date_id = job_date.get("jobDateId")
self.start_time = job_date.get("startDate")
self.end_time = job_date.get("endDate")
self.is_cross_day = job_date.get("ismorrow") == 1
self.checkin_status = job_date.get("status")
self.checkin_time = job_date.get("date")
self.checkout_status = job_date.get("statusOff")
self.checkout_time = job_date.get("dateOff")
address_dtos = data.get("addressDTOS", []) or []
if address_dtos:
address = address_dtos[0] or {}
self.address_name = address.get("addressName")
try:
self.longitude = float(address.get("longitude"))
self.latitude = float(address.get("latitude"))
except (TypeError, ValueError):
self.longitude = None
self.latitude = None
required = [
self.item_id,
self.job_date_id,
self.address_name,
self.longitude,
self.latitude,
self.start_time,
self.end_time,
]
if not all(required):
logger.error("❌ 班次信息不完整,无法打卡")
return False
return True
except Exception as e:
logger.exception(f"❌ 获取班次信息异常: {e}")
return False
# ---- 时间窗口与判断 ----
@staticmethod
def _time_to_minutes(time_str: str) -> int:
h, m = map(int, time_str.split(":"))
return h * 60 + m
@staticmethod
def _minutes_to_time(minutes: int) -> str:
h, m = divmod(minutes % 1440, 60)
return f"{h:02d}:{m:02d}"
def _get_checkin_windows(self) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
根据班次时间计算打卡时间窗口单位分钟0-1439
- 上班窗口:上班时间前 120 分钟内
- 下班窗口:下班后到下一次上班前 121 分钟以内
"""
if not self.start_time or not self.end_time:
raise ValueError("尚未加载班次时间")
start_minutes = self._time_to_minutes(self.start_time)
end_minutes = self._time_to_minutes(self.end_time)
first_start = (start_minutes - 120) % 1440
first_end = start_minutes
second_start = (end_minutes + 1) % 1440
second_end = (start_minutes - 121) % 1440
return (first_start, first_end), (second_start, second_end)
@staticmethod
def _is_in_window(current_minutes: int, start: int, end: int) -> bool:
"""检查当前时间是否在窗口内(考虑跨午夜情况)"""
if start <= end:
return start <= current_minutes <= end
return current_minutes >= start or current_minutes <= end
def _get_current_checkin_type(self) -> Optional[str]:
"""
智能判断应该执行哪种打卡
Returns:
'first' - 需要上班打卡
'second' - 需要下班打卡
'already_done' - 已完成所有打卡
None - 不在打卡时间内或不在对应窗口内
"""
if not self.is_card_time:
return None
now = datetime.now()
current_minutes = now.hour * 60 + now.minute
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
need_checkin = (
self.checkin_status == 0
or self.checkin_status is None
or self.checkin_time is None
)
need_checkout = (
self.checkout_status == 0
or self.checkout_status is None
or self.checkout_time is None
)
in_first_window = self._is_in_window(current_minutes, first_start, first_end)
in_second_window = self._is_in_window(current_minutes, second_start, second_end)
if need_checkin and in_first_window:
return "first"
if (not need_checkin) and need_checkout and in_second_window:
return "second"
if (not need_checkin) and (not need_checkout):
return "already_done"
return None
def _show_checkin_windows(self) -> None:
"""显示打卡时间窗口和状态"""
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
first_start_str = self._minutes_to_time(first_start)
first_end_str = self._minutes_to_time(first_end)
second_start_str = self._minutes_to_time(second_start)
second_end_str = self._minutes_to_time(second_end)
logger.info("\n🕒 打卡时间窗口:")
logger.info(f" ⏰ 上班: {first_start_str} - {first_end_str}")
logger.info(f" ⏰ 下班: {second_start_str} - {second_end_str}")
logger.info("\n📌 打卡状态:")
if self.checkin_status == 1 and self.checkin_time:
logger.info(f" ✅ 上班打卡: 已完成 ({self.checkin_time})")
else:
logger.info(" ❗ 上班打卡: 未完成")
if self.checkout_status == 1 and self.checkout_time:
logger.info(f" ✅ 下班打卡: 已完成 ({self.checkout_time})")
else:
logger.info(" ❗ 下班打卡: 未完成")
now = datetime.now().strftime("%H:%M:%S")
logger.info(f"\n⏱ 当前时间: {now}")
if self.is_card_time:
logger.info(" ✅ 当前在打卡时间内")
else:
logger.info(" ⛔ 当前不在打卡时间内")
# ---- 核心动作 ----
def initialize(self) -> bool:
"""初始化账号信息token 检查/自动登录 + 用户信息 + 班次信息)"""
display_name = self.remark or "未命名"
logger.info("\n" + "=" * 50)
logger.info(f"👤 正在初始化账号: {display_name}")
logger.info("=" * 50)
# token 为空或无效,将在 _fetch_user_info 内自动尝试登录一次
if not self.token:
logger.warning("⚠️ Token 为空,将尝试自动登录")
if not self._fetch_user_info(allow_auto_login=True):
logger.error("❌ 初始化失败:获取用户信息失败")
return False
if not self._fetch_job_details():
logger.error("❌ 初始化失败:获取班次信息失败")
return False
display_name = self.remark or self.user_name or "未命名"
logger.info("\n✅ 账号初始化成功")
logger.info(f" 👤 姓名: {display_name}")
logger.info(f" 📱 手机: {self.user_mobile}")
logger.info(f" 📍 地点: {self.address_name}")
logger.info(
f" 🗓 班次: {self.start_time} - {self.end_time}{' (跨天)' if self.is_cross_day else ''}"
)
self._show_checkin_windows()
return True
def face_detect(self, checkin_type: str) -> Optional[Dict]:
"""人脸识别验证"""
if not self._load_image_data():
return None
url = f"{self.base_url}/recognition/faceDetect"
data = {"image": self._image_data}
return self._post_with_retry(url, data)
def save_attendance(self, checkin_type: str) -> bool:
"""保存打卡记录"""
if not self._load_image_data():
return False
url = f"{self.base_url}/Management/saveAttendanceManagement"
is_off = 0 if checkin_type == "first" else 1
update_card = 0 if checkin_type == "first" else 1
data = {
"itemId": self.item_id,
"jobDateId": self.job_date_id,
"addressName": self.address_name,
"isLegwork": 0,
"legworkState": "",
"isOff": is_off,
"imageId": "",
"updateCard": update_card,
"image": self._image_data,
"longitude": self.longitude,
"latitude": self.latitude,
"userId": "",
}
result = self._post_with_retry(url, data)
return bool(result)
def do_checkin(self, checkin_type: str) -> Tuple[bool, str]:
"""执行打卡流程,返回 (成功与否, 结果消息)"""
checkin_name = "上班" if checkin_type == "first" else "下班"
emoji = "🌞" if checkin_type == "first" else "🌙"
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
display_name = self.remark or self.user_name or "未命名"
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
first_window = f"{self._minutes_to_time(first_start)} - {self._minutes_to_time(first_end)}"
second_window = f"{self._minutes_to_time(second_start)} - {self._minutes_to_time(second_end)}"
msg_header = (
f"👤 {display_name}\n"
f"📱 {self.user_mobile}\n"
f"📍 {self.address_name}\n"
f"🗓 {self.start_time}-{self.end_time}{' (跨天)' if self.is_cross_day else ''}\n\n"
f"⏰ 上班: {first_window}\n"
f"⏰ 下班: {second_window}\n\n"
)
logger.info(f"\n{emoji} [{display_name}] {checkin_name}打卡中...")
face_result = self.face_detect(checkin_type)
if not face_result:
logger.error(f"❌ [{display_name}] {checkin_name}打卡失败(人脸识别失败)")
return False, f"{msg_header}🕒 {current_time}\n{checkin_name}失败"
time.sleep(1)
success = self.save_attendance(checkin_type)
if success:
logger.info(f"✅ [{display_name}] {checkin_name}打卡成功")
return True, f"{msg_header}🕒 {current_time}\n{checkin_name}成功"
logger.error(f"❌ [{display_name}] {checkin_name}打卡失败(保存记录失败)")
return False, f"{msg_header}🕒 {current_time}\n{checkin_name}失败"
# ========= 主流程 =========
def run() -> int:
parser = argparse.ArgumentParser(description="小程序自动打卡脚本 - 多账号版本(优化版)")
parser.add_argument("--type", choices=["first", "second"], help="立即执行指定类型的打卡")
parser.add_argument(
"--config",
default=DEFAULT_CONFIG_FILE,
help="配置文件路径(默认 accounts.json",
)
args = parser.parse_args()
try:
accounts = load_all_accounts(args.config)
if not accounts:
msg = f"❌ 未找到任何账号配置,请检查 {args.config} 或环境变量 {ENV_VAR_NAME}"
logger.error(msg)
if NOTIFY_ENABLED:
notify_send("❌ 小程序打卡配置错误", msg)
return 1
total = len(accounts)
logger.info(f"\n✅ 共加载 {total} 个账号")
success_count = 0
fail_count = 0
skip_count = 0
all_messages: List[str] = []
for i, account in enumerate(accounts, 1):
display_name = account.name or "未命名"
logger.info("\n" + "=" * 60)
logger.info(f"👤 处理账号 {i}/{total}: {display_name}")
logger.info("=" * 60)
try:
checker = AutoCheckIn(account, config_path=args.config)
if not checker.initialize():
fail_count += 1
all_messages.append(f"❌ [{display_name}] 初始化失败")
continue
success = False
msg = ""
if args.type:
logger.info(
f"\n🎯 强制执行 {'上班' if args.type == 'first' else '下班'} 打卡"
)
success, msg = checker.do_checkin(args.type)
else:
ctype = checker._get_current_checkin_type()
if ctype == "first":
logger.info("\n🤖 智能判断: 需要执行上班打卡")
success, msg = checker.do_checkin("first")
elif ctype == "second":
logger.info("\n🤖 智能判断: 需要执行下班打卡")
success, msg = checker.do_checkin("second")
elif ctype == "already_done":
skip_count += 1
logger.info(f"\n✅ [{display_name}] 今日打卡已全部完成")
logger.info(f" 上班打卡: {checker.checkin_time}")
logger.info(f" 下班打卡: {checker.checkout_time}")
else:
skip_count += 1
logger.info(f"\n [{display_name}] 当前不在打卡时间内")
if success:
success_count += 1
if msg:
all_messages.append(msg)
elif msg:
fail_count += 1
all_messages.append(msg)
if i < total:
time.sleep(ACCOUNT_DELAY_SECONDS)
except Exception as e:
fail_count += 1
logger.exception(f"❌ 处理账号 [{display_name}] 异常: {e}")
all_messages.append(f"❌ [{display_name}] 处理异常: {e}")
logger.info("\n" + "=" * 60)
logger.info("📊 执行结果汇总")
logger.info("=" * 60)
logger.info(f" 总账号数: {total}")
logger.info(f" ✅ 成功: {success_count}")
logger.info(f" ❌ 失败: {fail_count}")
logger.info(f" ⏭ 跳过: {skip_count}")
if NOTIFY_ENABLED:
# 全部账号都被跳过时不发送任何通知
if success_count == 0 and fail_count == 0 and skip_count == total:
logger.info("\n 所有账号均被跳过,不发送通知")
else:
summary = (
f"📊 总计: {total} | ✅ 成功: {success_count} | "
f"❌ 失败: {fail_count} | ⏭ 跳过: {skip_count}"
)
if all_messages:
summary += "\n\n" + "\n\n".join(all_messages)
if fail_count > 0:
title = "⚠️ 小程序打卡完成(有失败)"
elif success_count > 0:
title = "✅ 小程序打卡完成"
else:
title = "📊 小程序打卡执行完成"
notify_send(title, summary)
logger.info(f"\n📣 通知已发送: {title}")
else:
logger.warning("\n⚠️ 通知模块未启用,跳过通知推送")
return 0 if fail_count == 0 else 1
except KeyboardInterrupt:
logger.info("\n⛔ 用户中断程序")
return 1
except Exception as e:
logger.exception(f"❌ 程序异常: {e}")
if NOTIFY_ENABLED:
notify_send("❌ 小程序打卡异常", str(e))
return 1
if __name__ == "__main__":
raise SystemExit(run())

View File

@@ -0,0 +1,800 @@
"""
小程序自动打卡脚本(支持自动登录)
=== 配置说明 ===
环境变量配置(青龙面板在"环境变量"中添加):
变量名: hxza_dk
变量值: token 或 token#备注 或 空(自动登录)
示例: eyJhbGc...xyz 或 eyJhbGc...xyz#张三 或 留空
说明:
- token: 登录凭证(可选,如果为空或失效会自动登录)
- 备注: 可选,用于日志标识(如果不提供,则显示从接口获取的姓名)
- 用户信息(姓名、手机号)会自动从接口获取
- 班次信息(上下班时间、打卡地点、经纬度)会自动从接口获取
- 打卡时间窗口会根据班次时间自动计算
- Token 失效时会自动使用照片登录获取新 token
文件配置:
- photo.txt : 打卡照片的 base64 数据(必须与脚本同目录)
"""
import requests
import time
import os
import sys
import logging
from datetime import datetime
from typing import Dict, Optional, Tuple
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# 添加父目录到路径,以便导入通知模块
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 导入通知模块
try:
from notify import send as notify_send
NOTIFY_ENABLED = True
except ImportError:
try:
from sendNotify import send as notify_send
NOTIFY_ENABLED = True
except ImportError:
NOTIFY_ENABLED = False
def notify_send(title, content):
pass
# 配置日志(青龙面板只输出到控制台)
logging.basicConfig(
level=logging.INFO,
format='%(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
class AutoCheckIn:
"""自动打卡类"""
def __init__(self):
self.base_url = "https://erp.baian.tech/baian-admin"
# 固定参数(必须在最前面定义,因为自动登录需要用到)
self.app_id = "wxa81d5d83880ea6b6"
self.photo_file = "photo.txt"
# 解析环境变量
config = os.getenv('hxza_dk', '')
parts = config.split('#') if config else ['']
self.token = parts[0].strip()
self.remark = parts[1].strip() if len(parts) > 1 else ""
# 配置带重试的 session
self.session = self._create_session()
# 如果 token 为空或失效,自动登录获取新 token
if not self.token or not self._check_token_valid():
logger.info("\n🔐 Token 无效或为空,尝试自动登录...")
if not self._auto_login():
error_msg = "❌ 自动登录失败"
logger.error(error_msg)
raise ValueError(error_msg)
# 动态获取的用户信息
self.user_name = None
self.user_mobile = None
# 动态获取的班次时间
self.start_time = None # 上班时间(如 "23:00"
self.end_time = None # 下班时间(如 "07:00"
self.is_cross_day = False # 是否跨天
# 动态参数(从班次信息接口获取)
self.item_id = None
self.job_date_id = None
self.job_id = None
self.address_name = None
self.longitude = None
self.latitude = None
# 更新 headers使用最新的 token
self.headers = {
'Host': 'erp.baian.tech',
'Connection': 'keep-alive',
'appId': self.app_id,
'content-type': 'application/json',
'wxAppKeyCompanyId': self.app_id,
'token': self.token,
'Accept-Encoding': 'gzip,compress,br,deflate',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI',
'Referer': f"https://servicewechat.com/{self.app_id}/102/page-frame.html"
}
# 初始化时获取用户信息
if not self._fetch_user_info():
raise ValueError("❌ 获取用户信息失败")
# 初始化时获取班次信息
if not self._fetch_job_details():
raise ValueError("❌ 获取班次信息失败")
# 显示打卡时间窗口
self._show_checkin_windows()
def _create_session(self) -> requests.Session:
"""创建带重试机制的 session"""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["POST", "GET"]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)
session.mount('http://', adapter)
return session
def _check_token_valid(self) -> bool:
"""检查 token 是否有效"""
try:
# 尝试获取用户信息来验证 token
url = f"{self.base_url}/userH5/getMyDetails"
headers = {
'Host': 'erp.baian.tech',
'Connection': 'keep-alive',
'appId': self.app_id,
'content-type': 'application/json',
'wxAppKeyCompanyId': self.app_id,
'token': self.token,
'Accept-Encoding': 'gzip,compress,br,deflate',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI',
'Referer': f"https://servicewechat.com/{self.app_id}/102/page-frame.html"
}
response = self.session.get(url, headers=headers, timeout=15)
if response.status_code != 200:
return False
result = response.json()
return result.get('code') == 200
except Exception:
return False
def _update_ql_env(self, token: str) -> bool:
"""
更新青龙面板环境变量配置文件
Args:
token: 新的 token
Returns:
成功返回 True失败返回 False
"""
try:
# 青龙面板环境变量配置文件路径
ql_env_paths = [
'/ql/data/config/config.sh', # 青龙面板默认路径
# '/ql/config/env.sh', # 旧版青龙面板路径
os.path.expanduser('~/ql/data/config/env.sh'), # 用户目录
]
env_file = None
for path in ql_env_paths:
if os.path.exists(path):
env_file = path
break
if not env_file:
logger.warning("⚠️ 未找到青龙面板环境变量配置文件,跳过自动更新")
logger.info("💡 请手动更新环境变量 hxza_dk")
return False
logger.info(f"\n📝 正在更新青龙面板环境变量...")
logger.info(f" 📄 配置文件: {env_file}")
# 读取配置文件
with open(env_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 构建新的环境变量值
new_value = f'{token}#{self.remark}' if self.remark else token
# 查找并更新 hxza_dk 变量
updated = False
for i, line in enumerate(lines):
# 匹配 export hxza_dk="..." 或 export hxza_dk='...'
if line.strip().startswith('export hxza_dk=') or line.strip().startswith('hxza_dk='):
lines[i] = f'export hxza_dk="{new_value}"\n'
updated = True
break
# 如果没有找到,则添加到文件末尾
if not updated:
# 确保文件末尾有换行符
if lines and not lines[-1].endswith('\n'):
lines[-1] += '\n'
lines.append(f'export hxza_dk="{new_value}"\n')
# 写回配置文件
with open(env_file, 'w', encoding='utf-8') as f:
f.writelines(lines)
logger.info(f"✅ 环境变量已更新")
logger.info(f" 变量名: hxza_dk")
logger.info(f" 变量值: {token[:20]}...{token[-10:]}{('#' + self.remark) if self.remark else ''}")
return True
except PermissionError:
logger.warning("⚠️ 没有权限写入配置文件,跳过自动更新")
logger.info("💡 请手动更新环境变量 hxza_dk 或使用 root 权限运行")
return False
except Exception as e:
logger.warning(f"⚠️ 更新环境变量失败: {str(e)}")
logger.info("💡 请手动更新环境变量 hxza_dk")
return False
def _auto_login(self) -> bool:
"""
使用照片自动登录获取新 token
Returns:
成功返回 True失败返回 False
"""
try:
logger.info("🔍 正在读取照片文件...")
# 读取照片文件
if not os.path.exists(self.photo_file):
logger.error(f"❌ 照片文件不存在: {self.photo_file}")
return False
with open(self.photo_file, 'r', encoding='utf-8') as f:
image_data = f.read().strip()
if not image_data:
logger.error(f"❌ 照片文件为空")
return False
logger.info(f"✅ 照片文件读取成功")
# 发送登录请求
logger.info("🔐 正在发送登录请求...")
url = f"{self.base_url}/recognition/faceSearch"
data = {"image": image_data}
headers = {
'Host': 'erp.baian.tech',
'Connection': 'keep-alive',
'appId': self.app_id,
'content-type': 'application/json',
'wxAppKeyCompanyId': self.app_id,
'token': '', # 登录时不需要 token
'Accept-Encoding': 'gzip,compress,br,deflate',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
'Referer': f"https://servicewechat.com/{self.app_id}/102/page-frame.html"
}
response = self.session.post(
url,
json=data,
headers=headers,
timeout=30
)
if response.status_code != 200:
logger.error(f"❌ 登录请求失败: HTTP {response.status_code}")
return False
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 登录失败: {result.get('msg')}")
return False
# 解析返回数据
data = result.get('data', {})
token = data.get('token')
expire = data.get('expire') # 秒数1296000 = 15天
if not token:
logger.error(f"❌ 未获取到 token")
return False
# 更新 token
self.token = token
# 计算过期时间
from datetime import timedelta
expire_date = datetime.now() + timedelta(seconds=expire)
expire_str = expire_date.strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"✅ 自动登录成功!")
logger.info(f" 🎫 Token: {token[:20]}...{token[-10:]}")
logger.info(f" ⏰ 有效期: {expire // 86400}")
logger.info(f" 📅 过期时间: {expire_str}")
# 尝试更新青龙面板环境变量配置文件
self._update_ql_env(token)
return True
except Exception as e:
logger.error(f"❌ 自动登录异常: {str(e)}")
return False
def _fetch_user_info(self) -> bool:
"""获取用户信息(姓名、手机号等)"""
try:
logger.info("\n🔍 正在获取用户信息...")
url = f"{self.base_url}/userH5/getMyDetails"
response = self.session.get(url, headers=self.headers, timeout=15)
if response.status_code != 200:
logger.error(f"❌ 获取用户信息失败: HTTP {response.status_code}")
return False
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 获取用户信息失败: {result.get('msg')}")
return False
data = result.get('data', {})
self.user_name = data.get('name', '')
self.user_mobile = data.get('mobile', '')
# 显示备注或姓名
display_name = self.remark if self.remark else self.user_name
logger.info(f"✅ 用户信息获取成功")
logger.info(f" 👤 姓名: {display_name}")
logger.info(f" 📱 手机: {self.user_mobile}")
return True
except Exception as e:
logger.error(f"❌ 获取用户信息异常: {str(e)}")
return False
def _fetch_job_details(self) -> bool:
"""获取班次信息itemId、jobDateId、经纬度、班次时间等"""
try:
logger.info("\n🔍 正在获取班次信息...")
url = f"{self.base_url}/Management/getJobUserDetails"
params = {"userId": ""}
response = self.session.get(url, params=params, headers=self.headers, timeout=15)
if response.status_code != 200:
logger.error(f"❌ 获取班次信息失败: HTTP {response.status_code}")
return False
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 获取班次信息失败: {result.get('msg')}")
return False
data = result.get('data', {})
# 提取 itemId 和 jobId
self.item_id = data.get('itemId')
self.job_id = data.get('jobId')
# 提取班次时段信息
job_date_dtos = data.get('jobDateDTOS', [])
if job_date_dtos:
job_date = job_date_dtos[0]
self.job_date_id = job_date.get('jobDateId')
self.start_time = job_date.get('startDate') # 如 "23:00"
self.end_time = job_date.get('endDate') # 如 "07:00"
self.is_cross_day = job_date.get('ismorrow') == 1 # 是否跨天
# 提取地址信息(经纬度)
address_dtos = data.get('addressDTOS', [])
if address_dtos:
address = address_dtos[0]
self.address_name = address.get('addressName')
self.longitude = float(address.get('longitude'))
self.latitude = float(address.get('latitude'))
# 验证必需参数
if not all([self.item_id, self.job_date_id, self.address_name, self.longitude, self.latitude, self.start_time, self.end_time]):
logger.error(f"❌ 班次信息不完整")
return False
logger.info(f"✅ 班次信息获取成功")
logger.info(f" 📍 打卡地点: {self.address_name}")
logger.info(f" 🌐 经纬度: {self.longitude}, {self.latitude}")
logger.info(f" ⏰ 班次时间: {self.start_time} - {self.end_time}{' (跨天)' if self.is_cross_day else ''}")
# logger.info(f" 🆔 项目ID: {self.item_id}")
# logger.info(f" 🆔 班次ID: {self.job_id}")
# logger.info(f" 🆔 时段ID: {self.job_date_id}")
return True
except Exception as e:
logger.error(f"❌ 获取班次信息异常: {str(e)}")
return False
def _request_with_retry(self, url: str, data: Dict, checkin_type: str, max_retries: int = 3) -> Optional[Dict]:
"""带重试的网络请求"""
for attempt in range(1, max_retries + 1):
try:
response = self.session.post(url, json=data, headers=self.headers, timeout=15)
if response.status_code == 200:
result = response.json()
if result.get('code') == 200:
return result
else:
logger.error(f"{result.get('msg')}")
return None
else:
if attempt < max_retries:
logger.warning(f"⚠️ HTTP {response.status_code}, 重试 {attempt}/{max_retries}")
time.sleep(2 ** attempt)
continue
return None
except requests.exceptions.Timeout:
if attempt < max_retries:
logger.warning(f"⏱️ 超时, 重试 {attempt}/{max_retries}")
time.sleep(2 ** attempt)
continue
return None
except requests.exceptions.RequestException as e:
if attempt < max_retries:
logger.warning(f"⚠️ {str(e)}, 重试 {attempt}/{max_retries}")
time.sleep(2 ** attempt)
continue
return None
logger.error(f"❌ 请求失败")
return None
def _time_to_minutes(self, time_str: str) -> int:
"""时间字符串转为分钟数"""
h, m = map(int, time_str.split(':'))
return h * 60 + m
def _minutes_to_time(self, minutes: int) -> str:
"""分钟数转时间字符串"""
h, m = divmod(minutes % 1440, 60)
return f"{h:02d}:{m:02d}"
def _get_checkin_windows(self) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
根据班次时间,自动计算打卡时间窗口
规则:
- 第一次打卡上班start_time 前2小时 到 start_time
- 第二次打卡下班end_time+1分钟 到 (start_time - 2小时 - 1分钟)
Returns:
((first_start, first_end), (second_start, second_end)) 分钟数
"""
start_minutes = self._time_to_minutes(self.start_time)
end_minutes = self._time_to_minutes(self.end_time)
# 第一次打卡窗口start_time - 120分钟 到 start_time
first_start = (start_minutes - 120) % 1440 # 1440 = 24*60
first_end = start_minutes
# 第二次打卡窗口end_time + 1分钟 到 (start_time - 121分钟)
second_start = (end_minutes + 1) % 1440
second_end = (start_minutes - 121) % 1440
return (first_start, first_end), (second_start, second_end)
def _is_in_window(self, current_minutes: int, start: int, end: int) -> bool:
"""检查当前时间是否在窗口内"""
if start <= end:
# 同一天内的窗口(如 21:00-23:00
return start <= current_minutes <= end
else:
# 跨天的窗口(如 07:01-20:59实际不跨天或 23:00-05:59 跨天)
return current_minutes >= start or current_minutes <= end
def _get_current_checkin_type(self) -> Optional[str]:
"""
根据当前时间判断应该执行哪种打卡
Returns:
'first' / 'second' / None
"""
now = datetime.now()
current_minutes = now.hour * 60 + now.minute
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
# 检查是否在第一次打卡窗口
if self._is_in_window(current_minutes, first_start, first_end):
return 'first'
# 检查是否在第二次打卡窗口
if self._is_in_window(current_minutes, second_start, second_end):
return 'second'
return None
def _get_window_str(self, start: int, end: int) -> str:
"""分钟数转时间字符串"""
start_h, start_m = divmod(start, 60)
end_h, end_m = divmod(end, 60)
return f"{start_h:02d}:{start_m:02d}-{end_h:02d}:{end_m:02d}"
def _show_checkin_windows(self):
"""显示打卡时间窗口信息"""
logger.info("\n📅 打卡时间窗口:")
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
first_start_str = self._minutes_to_time(first_start)
first_end_str = self._minutes_to_time(first_end)
second_start_str = self._minutes_to_time(second_start)
second_end_str = self._minutes_to_time(second_end)
logger.info(f" 🌅 上班打卡窗口: {first_start_str} - {first_end_str}")
logger.info(f" 🌆 下班打卡窗口: {second_start_str} - {second_end_str}")
# 检查当前时间
now = datetime.now()
current_minutes = now.hour * 60 + now.minute
current_time_str = now.strftime('%H:%M:%S')
logger.info(f"\n🕐 当前时间: {current_time_str}")
if self._is_in_window(current_minutes, first_start, first_end):
logger.info(f" ✅ 当前在【上班打卡】窗口内")
elif self._is_in_window(current_minutes, second_start, second_end):
logger.info(f" ✅ 当前在【下班打卡】窗口内")
else:
logger.info(f" ⏰ 当前不在任何打卡窗口内")
def face_detect(self, checkin_type: str) -> Optional[Dict]:
"""人脸识别验证"""
try:
url = f"{self.base_url}/recognition/faceDetect"
if not os.path.exists(self.photo_file):
logger.error(f"❌ 照片文件不存在")
return None
with open(self.photo_file, 'r', encoding='utf-8') as f:
image_data = f.read().strip()
data = {"image": image_data}
return self._request_with_retry(url, data, checkin_type)
except Exception as e:
logger.error(f"❌ 人脸识别异常: {str(e)}")
return None
def save_attendance(self, checkin_type: str) -> bool:
"""保存打卡记录"""
try:
url = f"{self.base_url}/Management/saveAttendanceManagement"
with open(self.photo_file, 'r', encoding='utf-8') as f:
image_data = f.read().strip()
# 第一次打卡isOff=0, updateCard=0
# 第二次打卡isOff=1, updateCard=1
is_off = 0 if checkin_type == 'first' else 1
update_card = 0 if checkin_type == 'first' else 1
data = {
"itemId": self.item_id,
"jobDateId": self.job_date_id,
"addressName": self.address_name,
"isLegwork": 0,
"legworkState": "",
"isOff": is_off,
"imageId": "",
"updateCard": update_card,
"image": image_data,
"longitude": self.longitude,
"latitude": self.latitude,
"userId": ""
}
result = self._request_with_retry(url, data, checkin_type)
return bool(result)
except Exception as e:
logger.error(f"❌ 保存记录异常: {str(e)}")
return False
def do_checkin(self, checkin_type: str) -> Tuple[bool, str]:
"""执行打卡流程,返回(成功与否, 结果消息)"""
checkin_name = '上班' if checkin_type == 'first' else '下班'
checkin_emoji = '🌅' if checkin_type == 'first' else '🌆'
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
prefix = f"[{self.user_name}] " if self.user_name else ""
# 构建详细的通知消息
display_name = self.remark if self.remark else self.user_name
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
first_window = f"{self._minutes_to_time(first_start)} - {self._minutes_to_time(first_end)}"
second_window = f"{self._minutes_to_time(second_start)} - {self._minutes_to_time(second_end)}"
msg_header = f"👤 {display_name}\n📱 {self.user_mobile}\n📍 {self.address_name}\n🌐 {self.longitude}, {self.latitude}\n{self.start_time}-{self.end_time}{' (跨天)' if self.is_cross_day else ''}\n\n🌅 上班: {first_window}\n🌆 下班: {second_window}\n\n"
logger.info(f"{prefix}{checkin_emoji} {checkin_name}打卡中...")
face_result = self.face_detect(checkin_type)
if not face_result:
logger.error(f"{prefix}{checkin_name}打卡失败")
return False, f"{msg_header}{current_time}\n{checkin_name}失败"
time.sleep(1)
success = self.save_attendance(checkin_type)
if success:
logger.info(f"{prefix}{checkin_name}打卡成功")
return True, f"{msg_header}{current_time}\n{checkin_name}成功"
else:
logger.error(f"{prefix}{checkin_name}打卡失败")
return False, f"{msg_header}{current_time}\n{checkin_name}失败"
def get_all_accounts() -> list:
"""
获取所有配置的账号
支持两种配置方式:
1. 单账号hxza_dk="token" 或 hxza_dk="token#备注"
2. 多账号hxza_dk_1="token1#备注1" hxza_dk_2="token2#备注2" ...
Returns:
账号配置列表,每个元素为 (env_var_name, config_value)
"""
accounts = []
# 检查单账号配置
single_config = os.getenv('hxza_dk', '')
if single_config:
accounts.append(('hxza_dk', single_config))
return accounts
# 检查多账号配置
i = 1
while True:
env_var = f'hxza_dk_{i}'
config = os.getenv(env_var, '')
if not config:
break
accounts.append((env_var, config))
i += 1
return accounts
def process_account(env_var_name: str, config: str, checkin_type_arg: Optional[str] = None) -> Tuple[bool, str]:
"""
处理单个账号的打卡
Args:
env_var_name: 环境变量名称
config: 配置值
checkin_type_arg: 指定的打卡类型('first''second'None 表示自动判断
Returns:
(成功与否, 结果消息)
"""
try:
# 临时设置环境变量
os.environ['hxza_dk'] = config
logger.info("\n" + "=" * 60)
logger.info(f"📱 处理账号: {env_var_name}")
logger.info("=" * 60)
checkin = AutoCheckIn()
success = False
msg = ""
if checkin_type_arg:
success, msg = checkin.do_checkin(checkin_type_arg)
else:
checkin_type = checkin._get_current_checkin_type()
if checkin_type:
type_emoji = '🌅' if checkin_type == 'first' else '🌆'
logger.info(f"{type_emoji} 当前时间在{'第一次' if checkin_type == 'first' else '第二次'}打卡窗口内")
success, msg = checkin.do_checkin(checkin_type)
else:
logger.info(f"⏰ 当前时间不在任何打卡窗口内")
(first_start, first_end), (second_start, second_end) = checkin._get_checkin_windows()
logger.info(f"🌅 第一次打卡窗口: {checkin._get_window_str(first_start, first_end)}")
logger.info(f"🌆 第二次打卡窗口: {checkin._get_window_str(second_start, second_end)}")
# 构建详细的通知消息
display_name = checkin.remark if checkin.remark else checkin.user_name
first_window = f"{checkin._minutes_to_time(first_start)} - {checkin._minutes_to_time(first_end)}"
second_window = f"{checkin._minutes_to_time(second_start)} - {checkin._minutes_to_time(second_end)}"
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
msg = f"👤 {display_name}\n📱 {checkin.user_mobile}\n📍 {checkin.address_name}\n🌐 {checkin.longitude}, {checkin.latitude}\n{checkin.start_time}-{checkin.end_time}{' (跨天)' if checkin.is_cross_day else ''}\n\n🌅 上班: {first_window}\n🌆 下班: {second_window}\n\n🕐 {current_time}\n⏰ 当前时间不在打卡窗口内"
return success, msg
except Exception as e:
error_msg = f"账号 {env_var_name} 处理异常: {str(e)}"
logger.error(error_msg)
return False, error_msg
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='小程序自动打卡脚本')
parser.add_argument('--type', choices=['first', 'second'], help='立即执行指定类型的打卡')
args = parser.parse_args()
try:
checkin = AutoCheckIn()
success = False
msg = ""
if args.type:
success, msg = checkin.do_checkin(args.type)
else:
checkin_type = checkin._get_current_checkin_type()
if checkin_type:
type_emoji = '🌅' if checkin_type == 'first' else '🌆'
logger.info(f"{type_emoji} 当前时间在{'第一次' if checkin_type == 'first' else '第二次'}打卡窗口内")
success, msg = checkin.do_checkin(checkin_type)
else:
logger.info(f"⏰ 当前时间不在任何打卡窗口内")
(first_start, first_end), (second_start, second_end) = checkin._get_checkin_windows()
logger.info(f"🌅 第一次打卡窗口: {checkin._get_window_str(first_start, first_end)}")
logger.info(f"🌆 第二次打卡窗口: {checkin._get_window_str(second_start, second_end)}")
# 构建详细的通知消息
display_name = checkin.remark if checkin.remark else checkin.user_name
first_window = f"{checkin._minutes_to_time(first_start)} - {checkin._minutes_to_time(first_end)}"
second_window = f"{checkin._minutes_to_time(second_start)} - {checkin._minutes_to_time(second_end)}"
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
msg = f"👤 {display_name}\n📱 {checkin.user_mobile}\n📍 {checkin.address_name}\n🌐 {checkin.longitude}, {checkin.latitude}\n{checkin.start_time}-{checkin.end_time}{' (跨天)' if checkin.is_cross_day else ''}\n\n🌅 上班: {first_window}\n🌆 下班: {second_window}\n\n🕐 {current_time}\n⏰ 当前时间不在打卡窗口内"
# 发送通知
if NOTIFY_ENABLED and msg:
title = "📱 小程序打卡通知"
if success:
title = "✅ 小程序打卡成功"
elif "不在打卡窗口" in msg:
title = "⏰ 小程序打卡提醒"
else:
title = "❌ 小程序打卡失败"
notify_send(title, msg)
logger.info(f"📢 通知已发送: {title}")
elif not NOTIFY_ENABLED:
logger.warning("⚠️ 通知模块未启用,跳过通知推送")
exit(0 if success else 1)
except KeyboardInterrupt:
logger.info("\n用户中断程序")
except Exception as e:
error_msg = f"程序异常: {str(e)}"
logger.error(error_msg)
# 异常时也发送通知
if NOTIFY_ENABLED:
notify_send("❌ 小程序打卡异常", error_msg)
exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,172 @@
"""
小程序自动登录脚本(照片登录)
=== 功能说明 ===
- 使用照片自动登录获取新的 token
- token 有效期 15 天1296000 秒)
- 无需验证码,直接通过人脸识别登录
=== 使用方法 ===
1. 确保 photo.txt 文件存在且包含 Base64 编码的照片
2. 运行脚本python auto_login.py
3. 脚本会输出新的 token有效期 15 天
"""
import requests
import os
import logging
from datetime import datetime, timedelta
from typing import Optional
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
class AutoLogin:
"""自动登录类"""
def __init__(self):
self.base_url = "https://erp.baian.tech/baian-admin"
self.app_id = "wxa81d5d83880ea6b6"
self.photo_file = "photo.txt"
self.headers = {
'Host': 'erp.baian.tech',
'Connection': 'keep-alive',
'appId': self.app_id,
'content-type': 'application/json',
'wxAppKeyCompanyId': self.app_id,
'token': '', # 登录时不需要 token
'Accept-Encoding': 'gzip,compress,br,deflate',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.65(0x1800412e) NetType/WIFI Language/zh_CN',
'Referer': f"https://servicewechat.com/{self.app_id}/102/page-frame.html"
}
logger.info("=" * 60)
logger.info("🔐 小程序自动登录(照片登录)")
logger.info("=" * 60)
def login(self) -> Optional[dict]:
"""
使用照片登录获取 token
Returns:
成功返回 {"token": "xxx", "expire": 1296000},失败返回 None
"""
try:
logger.info("\n🔍 正在读取照片文件...")
# 读取照片文件
if not os.path.exists(self.photo_file):
logger.error(f"❌ 照片文件不存在: {self.photo_file}")
return None
with open(self.photo_file, 'r', encoding='utf-8') as f:
image_data = f.read().strip()
if not image_data:
logger.error(f"❌ 照片文件为空")
return None
logger.info(f"✅ 照片文件读取成功")
logger.info(f" 📄 文件大小: {len(image_data)} 字符")
# 发送登录请求
logger.info("\n🔐 正在发送登录请求...")
url = f"{self.base_url}/recognition/faceSearch"
data = {"image": image_data}
response = requests.post(
url,
json=data,
headers=self.headers,
timeout=30
)
if response.status_code != 200:
logger.error(f"❌ 登录请求失败: HTTP {response.status_code}")
return None
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 登录失败: {result.get('msg')}")
return None
# 解析返回数据
data = result.get('data', {})
token = data.get('token')
expire = data.get('expire') # 秒数1296000 = 15天
if not token:
logger.error(f"❌ 未获取到 token")
return None
# 计算过期时间
expire_date = datetime.now() + timedelta(seconds=expire)
expire_str = expire_date.strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"✅ 登录成功!")
logger.info(f"\n" + "=" * 60)
logger.info(f"🎫 Token 信息:")
logger.info(f"=" * 60)
logger.info(f"Token: {token}")
logger.info(f"有效期: {expire} 秒 ({expire // 86400} 天)")
logger.info(f"过期时间: {expire_str}")
logger.info(f"=" * 60)
logger.info(f"\n💡 提示:")
logger.info(f" 请将以下 token 配置到环境变量 hxza_dk 中")
logger.info(f" 格式: hxza_dk=\"{token}\"")
logger.info(f" 或者: hxza_dk=\"{token}#备注\"")
return {
'token': token,
'expire': expire,
'expire_date': expire_str
}
except Exception as e:
logger.error(f"❌ 登录异常: {str(e)}")
return None
def main():
"""主函数"""
try:
logger.info("\n🚀 开始自动登录...\n")
login = AutoLogin()
result = login.login()
if result:
logger.info("\n✅ 自动登录成功!")
logger.info("\n💡 下一步操作:")
logger.info(" 1. 复制上面的 token")
logger.info(" 2. 更新环境变量 hxza_dk")
logger.info(" 3. 运行 auto_checkin.py 进行打卡")
else:
logger.error("\n❌ 自动登录失败!")
logger.error("\n💡 请检查:")
logger.error(" 1. photo.txt 文件是否存在")
logger.error(" 2. 照片内容是否正确")
logger.error(" 3. 网络连接是否正常")
exit(1)
except KeyboardInterrupt:
logger.info("\n⚠️ 用户中断程序")
except Exception as e:
error_msg = f"\n❌ 程序异常: {str(e)}"
logger.error(error_msg)
exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,541 @@
#!/usr/bin/env python3
# _*_ coding:utf-8 _*_
import base64
import hashlib
import hmac
import json
import os
import re
import threading
import time
import urllib.parse
import requests
# 原先的 print 函数和主线程的锁
_print = print
mutex = threading.Lock()
# 定义新的 print 函数
def print(text, *args, **kw):
"""
使输出有序进行,不出现多线程同一时间输出导致错乱的问题。
"""
with mutex:
_print(text, *args, **kw)
# 通知服务
# fmt: off
push_config = {
'HITOKOTO': False, # 启用一言(随机句子)
'BARK_PUSH': '', # bark IP 或设备码https://api.day.app/DxHcxxxxxRxxxxxxcm/
'BARK_ARCHIVE': '', # bark 推送是否存档
'BARK_GROUP': '', # bark 推送分组
'BARK_SOUND': '', # bark 推送声音
'CONSOLE': True, # 控制台输出
'DD_BOT_SECRET': '', # 钉钉机器人的 DD_BOT_SECRET
'DD_BOT_TOKEN': '', # 钉钉机器人的 DD_BOT_TOKEN
'FSKEY': '', # 飞书机器人的 FSKEY
'GOBOT_URL': '', # go-cqhttp
# 推送到个人QQhttp://127.0.0.1/send_private_msg
# 群http://127.0.0.1/send_group_msg
'GOBOT_QQ': '', # go-cqhttp 的推送群或用户
# GOBOT_URL 设置 /send_private_msg 时填入 user_id=个人QQ
# /send_group_msg 时填入 group_id=QQ群
'GOBOT_TOKEN': '', # go-cqhttp 的 access_token
'GOTIFY_URL': '', # gotify地址,如https://push.example.de:8080
'GOTIFY_TOKEN': '', # gotify的消息应用token
'GOTIFY_PRIORITY': 0, # 推送消息优先级,默认为0
'IGOT_PUSH_KEY': '', # iGot 聚合推送的 IGOT_PUSH_KEY
'PUSH_KEY': '', # server 酱的 PUSH_KEY兼容旧版与 Turbo 版
'PUSH_PLUS_TOKEN': '', # push+ 微信推送的用户令牌
'PUSH_PLUS_USER': '', # push+ 微信推送的群组编码
'QMSG_KEY': '', # qmsg 酱的 QMSG_KEY
'QMSG_TYPE': '', # qmsg 酱的 QMSG_TYPE
'QYWX_AM': '', # 企业微信应用
'QYWX_KEY': '', # 企业微信机器人
'TG_BOT_TOKEN': '', # tg 机器人的 TG_BOT_TOKEN1407203283:AAG9rt-6RDaaX0HBLZQq0laNOh898iFYaRQ
'TG_USER_ID': '', # tg 机器人的 TG_USER_ID1434078534
'TG_API_HOST': '', # tg 代理 api
'TG_PROXY_AUTH': '', # tg 代理认证参数
'TG_PROXY_HOST': '', # tg 机器人的 TG_PROXY_HOST
'TG_PROXY_PORT': '', # tg 机器人的 TG_PROXY_PORT
}
notify_function = []
# fmt: on
# 首先读取 面板变量 或者 github action 运行变量
for k in push_config:
if os.getenv(k):
v = os.getenv(k)
push_config[k] = v
def bark(title: str, content: str) -> None:
"""
使用 bark 推送消息。
"""
if not push_config.get("BARK_PUSH"):
print("bark 服务的 BARK_PUSH 未设置!!\n取消推送")
return
print("bark 服务启动")
if push_config.get("BARK_PUSH").startswith("http"):
url = f'{push_config.get("BARK_PUSH")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}'
else:
url = f'https://api.day.app/{push_config.get("BARK_PUSH")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}'
bark_params = {
"BARK_ARCHIVE": "isArchive",
"BARK_GROUP": "group",
"BARK_SOUND": "sound",
}
params = ""
for pair in filter(
lambda pairs: pairs[0].startswith("BARK_")
and pairs[0] != "BARK_PUSH"
and pairs[1]
and bark_params.get(pairs[0]),
push_config.items(),
):
params += f"{bark_params.get(pair[0])}={pair[1]}&"
if params:
url = url + "?" + params.rstrip("&")
response = requests.get(url).json()
if response["code"] == 200:
print("bark 推送成功!")
else:
print("bark 推送失败!")
def console(title: str, content: str) -> None:
"""
使用 控制台 推送消息。
"""
print(f"{title}\n\n{content}")
def dingding_bot(title: str, content: str) -> None:
"""
使用 钉钉机器人 推送消息。
"""
if not push_config.get("DD_BOT_SECRET") or not push_config.get("DD_BOT_TOKEN"):
print("钉钉机器人 服务的 DD_BOT_SECRET 或者 DD_BOT_TOKEN 未设置!!\n取消推送")
return
print("钉钉机器人 服务启动")
timestamp = str(round(time.time() * 1000))
secret_enc = push_config.get("DD_BOT_SECRET").encode("utf-8")
string_to_sign = "{}\n{}".format(timestamp, push_config.get("DD_BOT_SECRET"))
string_to_sign_enc = string_to_sign.encode("utf-8")
hmac_code = hmac.new(
secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
url = f'https://oapi.dingtalk.com/robot/send?access_token={push_config.get("DD_BOT_TOKEN")}&timestamp={timestamp}&sign={sign}'
headers = {"Content-Type": "application/json;charset=utf-8"}
data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}}
response = requests.post(
url=url, data=json.dumps(data), headers=headers, timeout=15
).json()
if not response["errcode"]:
print("钉钉机器人 推送成功!")
else:
print("钉钉机器人 推送失败!")
def feishu_bot(title: str, content: str) -> None:
"""
使用 飞书机器人 推送消息。
"""
if not push_config.get("FSKEY"):
print("飞书 服务的 FSKEY 未设置!!\n取消推送")
return
print("飞书 服务启动")
url = f'https://open.feishu.cn/open-apis/bot/v2/hook/{push_config.get("FSKEY")}'
data = {"msg_type": "text", "content": {"text": f"{title}\n\n{content}"}}
response = requests.post(url, data=json.dumps(data)).json()
if response.get("StatusCode") == 0:
print("飞书 推送成功!")
else:
print("飞书 推送失败!错误信息如下:\n", response)
def go_cqhttp(title: str, content: str) -> None:
"""
使用 go_cqhttp 推送消息。
"""
if not push_config.get("GOBOT_URL") or not push_config.get("GOBOT_QQ"):
print("go-cqhttp 服务的 GOBOT_URL 或 GOBOT_QQ 未设置!!\n取消推送")
return
print("go-cqhttp 服务启动")
url = f'{push_config.get("GOBOT_URL")}?access_token={push_config.get("GOBOT_TOKEN")}&{push_config.get("GOBOT_QQ")}&message=标题:{title}\n内容:{content}'
response = requests.get(url).json()
if response["status"] == "ok":
print("go-cqhttp 推送成功!")
else:
print("go-cqhttp 推送失败!")
def gotify(title:str,content:str) -> None:
"""
使用 gotify 推送消息。
"""
if not push_config.get("GOTIFY_URL") or not push_config.get("GOTIFY_TOKEN"):
print("gotify 服务的 GOTIFY_URL 或 GOTIFY_TOKEN 未设置!!\n取消推送")
return
print("gotify 服务启动")
url = f'{push_config.get("GOTIFY_URL")}/message?token={push_config.get("GOTIFY_TOKEN")}'
data = {"title": title,"message": content,"priority": push_config.get("GOTIFY_PRIORITY")}
response = requests.post(url,data=data).json()
if response.get("id"):
print("gotify 推送成功!")
else:
print("gotify 推送失败!")
def iGot(title: str, content: str) -> None:
"""
使用 iGot 推送消息。
"""
if not push_config.get("IGOT_PUSH_KEY"):
print("iGot 服务的 IGOT_PUSH_KEY 未设置!!\n取消推送")
return
print("iGot 服务启动")
url = f'https://push.hellyw.com/{push_config.get("IGOT_PUSH_KEY")}'
data = {"title": title, "content": content}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(url, data=data, headers=headers).json()
if response["ret"] == 0:
print("iGot 推送成功!")
else:
print(f'iGot 推送失败!{response["errMsg"]}')
def serverJ(title: str, content: str) -> None:
"""
通过 serverJ 推送消息。
"""
if not push_config.get("PUSH_KEY"):
print("serverJ 服务的 PUSH_KEY 未设置!!\n取消推送")
return
print("serverJ 服务启动")
data = {"text": title, "desp": content.replace("\n", "\n\n")}
if push_config.get("PUSH_KEY").index("SCT") != -1:
url = f'https://sctapi.ftqq.com/{push_config.get("PUSH_KEY")}.send'
else:
url = f'https://sc.ftqq.com/${push_config.get("PUSH_KEY")}.send'
response = requests.post(url, data=data).json()
if response.get("errno") == 0 or response.get("code") == 0:
print("serverJ 推送成功!")
else:
print(f'serverJ 推送失败!错误码:{response["message"]}')
def pushplus_bot(title: str, content: str) -> None:
"""
通过 push+ 推送消息。
"""
if not push_config.get("PUSH_PLUS_TOKEN"):
print("PUSHPLUS 服务的 PUSH_PLUS_TOKEN 未设置!!\n取消推送")
return
print("PUSHPLUS 服务启动")
url = "http://www.pushplus.plus/send"
data = {
"token": push_config.get("PUSH_PLUS_TOKEN"),
"title": title,
"content": content,
"topic": push_config.get("PUSH_PLUS_USER"),
}
body = json.dumps(data).encode(encoding="utf-8")
headers = {"Content-Type": "application/json"}
response = requests.post(url=url, data=body, headers=headers).json()
if response["code"] == 200:
print("PUSHPLUS 推送成功!")
else:
url_old = "http://pushplus.hxtrip.com/send"
headers["Accept"] = "application/json"
response = requests.post(url=url_old, data=body, headers=headers).json()
if response["code"] == 200:
print("PUSHPLUS(hxtrip) 推送成功!")
else:
print("PUSHPLUS 推送失败!")
def qmsg_bot(title: str, content: str) -> None:
"""
使用 qmsg 推送消息。
"""
if not push_config.get("QMSG_KEY") or not push_config.get("QMSG_TYPE"):
print("qmsg 的 QMSG_KEY 或者 QMSG_TYPE 未设置!!\n取消推送")
return
print("qmsg 服务启动")
url = f'https://qmsg.zendee.cn/{push_config.get("QMSG_TYPE")}/{push_config.get("QMSG_KEY")}'
payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")}
response = requests.post(url=url, params=payload).json()
if response["code"] == 0:
print("qmsg 推送成功!")
else:
print(f'qmsg 推送失败!{response["reason"]}')
def wecom_app(title: str, content: str) -> None:
"""
通过 企业微信 APP 推送消息。
"""
if not push_config.get("QYWX_AM"):
print("QYWX_AM 未设置!!\n取消推送")
return
QYWX_AM_AY = re.split(",", push_config.get("QYWX_AM"))
if 4 < len(QYWX_AM_AY) > 5:
print("QYWX_AM 设置错误!!\n取消推送")
return
print("企业微信 APP 服务启动")
corpid = QYWX_AM_AY[0]
corpsecret = QYWX_AM_AY[1]
touser = QYWX_AM_AY[2]
agentid = QYWX_AM_AY[3]
try:
media_id = QYWX_AM_AY[4]
except IndexError:
media_id = ""
wx = WeCom(corpid, corpsecret, agentid)
# 如果没有配置 media_id 默认就以 text 方式发送
if not media_id:
message = title + "\n\n" + content
response = wx.send_text(message, touser)
else:
response = wx.send_mpnews(title, content, media_id, touser)
if response == "ok":
print("企业微信推送成功!")
else:
print("企业微信推送失败!错误信息如下:\n", response)
class WeCom:
def __init__(self, corpid, corpsecret, agentid):
self.CORPID = corpid
self.CORPSECRET = corpsecret
self.AGENTID = agentid
def get_access_token(self):
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
values = {
"corpid": self.CORPID,
"corpsecret": self.CORPSECRET,
}
req = requests.post(url, params=values)
data = json.loads(req.text)
return data["access_token"]
def send_text(self, message, touser="@all"):
send_url = (
"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="
+ self.get_access_token()
)
send_values = {
"touser": touser,
"msgtype": "text",
"agentid": self.AGENTID,
"text": {"content": message},
"safe": "0",
}
send_msges = bytes(json.dumps(send_values), "utf-8")
respone = requests.post(send_url, send_msges)
respone = respone.json()
return respone["errmsg"]
def send_mpnews(self, title, message, media_id, touser="@all"):
send_url = (
"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="
+ self.get_access_token()
)
send_values = {
"touser": touser,
"msgtype": "mpnews",
"agentid": self.AGENTID,
"mpnews": {
"articles": [
{
"title": title,
"thumb_media_id": media_id,
"author": "Author",
"content_source_url": "",
"content": message.replace("\n", "<br/>"),
"digest": message,
}
]
},
}
send_msges = bytes(json.dumps(send_values), "utf-8")
respone = requests.post(send_url, send_msges)
respone = respone.json()
return respone["errmsg"]
def wecom_bot(title: str, content: str) -> None:
"""
通过 企业微信机器人 推送消息。
"""
if not push_config.get("QYWX_KEY"):
print("企业微信机器人 服务的 QYWX_KEY 未设置!!\n取消推送")
return
print("企业微信机器人服务启动")
url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}"
headers = {"Content-Type": "application/json;charset=utf-8"}
data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}}
response = requests.post(
url=url, data=json.dumps(data), headers=headers, timeout=15
).json()
if response["errcode"] == 0:
print("企业微信机器人推送成功!")
else:
print("企业微信机器人推送失败!")
def telegram_bot(title: str, content: str) -> None:
"""
使用 telegram 机器人 推送消息。
"""
if not push_config.get("TG_BOT_TOKEN") or not push_config.get("TG_USER_ID"):
print("tg 服务的 bot_token 或者 user_id 未设置!!\n取消推送")
return
print("tg 服务启动")
if push_config.get("TG_API_HOST"):
url = f"https://{push_config.get('TG_API_HOST')}/bot{push_config.get('TG_BOT_TOKEN')}/sendMessage"
else:
url = (
f"https://api.telegram.org/bot{push_config.get('TG_BOT_TOKEN')}/sendMessage"
)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
payload = {
"chat_id": str(push_config.get("TG_USER_ID")),
"text": f"{title}\n\n{content}",
"disable_web_page_preview": "true",
}
proxies = None
if push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT"):
if push_config.get("TG_PROXY_AUTH") is not None and "@" not in push_config.get(
"TG_PROXY_HOST"
):
push_config["TG_PROXY_HOST"] = (
push_config.get("TG_PROXY_AUTH")
+ "@"
+ push_config.get("TG_PROXY_HOST")
)
proxyStr = "http://{}:{}".format(
push_config.get("TG_PROXY_HOST"), push_config.get("TG_PROXY_PORT")
)
proxies = {"http": proxyStr, "https": proxyStr}
response = requests.post(
url=url, headers=headers, params=payload, proxies=proxies
).json()
if response["ok"]:
print("tg 推送成功!")
else:
print("tg 推送失败!")
def one() -> str:
"""
获取一条一言。
:return:
"""
url = "https://v1.hitokoto.cn/"
res = requests.get(url).json()
return res["hitokoto"] + " ----" + res["from"]
if push_config.get("BARK_PUSH"):
notify_function.append(bark)
if push_config.get("CONSOLE"):
notify_function.append(console)
if push_config.get("DD_BOT_TOKEN") and push_config.get("DD_BOT_SECRET"):
notify_function.append(dingding_bot)
if push_config.get("FSKEY"):
notify_function.append(feishu_bot)
if push_config.get("GOBOT_URL") and push_config.get("GOBOT_QQ"):
notify_function.append(go_cqhttp)
if push_config.get("GOTIFY_URL") and push_config.get("GOTIFY_TOKEN"):
notify_function.append(gotify)
if push_config.get("IGOT_PUSH_KEY"):
notify_function.append(iGot)
if push_config.get("PUSH_KEY"):
notify_function.append(serverJ)
if push_config.get("PUSH_PLUS_TOKEN"):
notify_function.append(pushplus_bot)
if push_config.get("QMSG_KEY") and push_config.get("QMSG_TYPE"):
notify_function.append(qmsg_bot)
if push_config.get("QYWX_AM"):
notify_function.append(wecom_app)
if push_config.get("QYWX_KEY"):
notify_function.append(wecom_bot)
if push_config.get("TG_BOT_TOKEN") and push_config.get("TG_USER_ID"):
notify_function.append(telegram_bot)
def send(title: str, content: str) -> None:
if not content:
print(f"{title} 推送内容为空!")
return
hitokoto = push_config.get("HITOKOTO")
text = one() if hitokoto else ""
content += "\n\n" + text
ts = [
threading.Thread(target=mode, args=(title, content), name=mode.__name__)
for mode in notify_function
]
[t.start() for t in ts]
[t.join() for t in ts]
def main():
send("title", "content")
if __name__ == "__main__":
main()

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,462 @@
#!/usr/bin/env python3
# _*_ coding:utf-8 _*_
#Modify: Kirin
from curses.ascii import FS
import sys
import os, re
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
cur_path = os.path.abspath(os.path.dirname(__file__))
root_path = os.path.split(cur_path)[0]
sys.path.append(root_path)
# 通知服务
BARK = 'GQ9g7h6ir2gQActgab25Jj' # bark服务,自行搜索; secrets可填;
BARK_PUSH='' # bark自建服务器要填完整链接结尾的/不要
PUSH_KEY = '' # Server酱的PUSH_KEY; secrets可填
TG_BOT_TOKEN = '' # tg机器人的TG_BOT_TOKEN; secrets可填1407203283:AAG9rt-6RDaaX0HBLZQq0laNOh898iFYaRQ
TG_USER_ID = '' # tg机器人的TG_USER_ID; secrets可填 1434078534
TG_API_HOST='' # tg 代理api
TG_PROXY_IP = '' # tg机器人的TG_PROXY_IP; secrets可填
TG_PROXY_PORT = '' # tg机器人的TG_PROXY_PORT; secrets可填
DD_BOT_TOKEN = '' # 钉钉机器人的DD_BOT_TOKEN; secrets可填
DD_BOT_SECRET = '' # 钉钉机器人的DD_BOT_SECRET; secrets可填
QQ_SKEY = '' # qq机器人的QQ_SKEY; secrets可填
QQ_MODE = '' # qq机器人的QQ_MODE; secrets可填
QYWX_AM = '' # 企业微信
QYWX_KEY = '' # 企业微信BOT
PUSH_PLUS_TOKEN = '' # 微信推送Plus+
FS_KEY = '' #飞书群BOT
notify_mode = []
message_info = ''''''
# GitHub action运行需要填写对应的secrets
if "BARK" in os.environ and os.environ["BARK"]:
BARK = os.environ["BARK"]
if "BARK_PUSH" in os.environ and os.environ["BARK_PUSH"]:
BARK_PUSH = os.environ["BARK_PUSH"]
if "PUSH_KEY" in os.environ and os.environ["PUSH_KEY"]:
PUSH_KEY = os.environ["PUSH_KEY"]
if "TG_BOT_TOKEN" in os.environ and os.environ["TG_BOT_TOKEN"] and "TG_USER_ID" in os.environ and os.environ["TG_USER_ID"]:
TG_BOT_TOKEN = os.environ["TG_BOT_TOKEN"]
TG_USER_ID = os.environ["TG_USER_ID"]
if "TG_API_HOST" in os.environ and os.environ["TG_API_HOST"]:
TG_API_HOST = os.environ["TG_API_HOST"]
if "DD_BOT_TOKEN" in os.environ and os.environ["DD_BOT_TOKEN"] and "DD_BOT_SECRET" in os.environ and os.environ["DD_BOT_SECRET"]:
DD_BOT_TOKEN = os.environ["DD_BOT_TOKEN"]
DD_BOT_SECRET = os.environ["DD_BOT_SECRET"]
if "QQ_SKEY" in os.environ and os.environ["QQ_SKEY"] and "QQ_MODE" in os.environ and os.environ["QQ_MODE"]:
QQ_SKEY = os.environ["QQ_SKEY"]
QQ_MODE = os.environ["QQ_MODE"]
# 获取pushplus+ PUSH_PLUS_TOKEN
if "PUSH_PLUS_TOKEN" in os.environ:
if len(os.environ["PUSH_PLUS_TOKEN"]) > 1:
PUSH_PLUS_TOKEN = os.environ["PUSH_PLUS_TOKEN"]
# print("已获取并使用Env环境 PUSH_PLUS_TOKEN")
# 获取企业微信应用推送 QYWX_AM
if "QYWX_AM" in os.environ:
if len(os.environ["QYWX_AM"]) > 1:
QYWX_AM = os.environ["QYWX_AM"]
if "QYWX_KEY" in os.environ:
if len(os.environ["QYWX_KEY"]) > 1:
QYWX_KEY = os.environ["QYWX_KEY"]
# print("已获取并使用Env环境 QYWX_AM")
#接入飞书webhook推送
if "FS_KEY" in os.environ:
if len(os.environ["FS_KEY"]) > 1:
FS_KEY = os.environ["FS_KEY"]
if BARK:
notify_mode.append('bark')
# print("BARK 推送打开")
if BARK_PUSH:
notify_mode.append('bark')
# print("BARK 推送打开")
if PUSH_KEY:
notify_mode.append('sc_key')
# print("Server酱 推送打开")
if TG_BOT_TOKEN and TG_USER_ID:
notify_mode.append('telegram_bot')
# print("Telegram 推送打开")
if DD_BOT_TOKEN and DD_BOT_SECRET:
notify_mode.append('dingding_bot')
# print("钉钉机器人 推送打开")
if QQ_SKEY and QQ_MODE:
notify_mode.append('coolpush_bot')
# print("QQ机器人 推送打开")
if PUSH_PLUS_TOKEN:
notify_mode.append('pushplus_bot')
# print("微信推送Plus机器人 推送打开")
if QYWX_AM:
notify_mode.append('wecom_app')
# print("企业微信机器人 推送打开")
if QYWX_KEY:
notify_mode.append('wecom_key')
# print("企业微信机器人 推送打开")
if FS_KEY:
notify_mode.append('fs_key')
# print("飞书机器人 推送打开")
def message(str_msg):
global message_info
print(str_msg)
message_info = "{}\n{}".format(message_info, str_msg)
sys.stdout.flush()
def bark(title, content):
print("\n")
if BARK:
try:
response = requests.get(
f"""https://api.day.app/{BARK}/{title}/{urllib.parse.quote_plus(content)}""").json()
if response['code'] == 200:
print('推送成功!')
else:
print('推送失败!')
except:
print('推送失败!')
if BARK_PUSH:
try:
response = requests.get(
f"""{BARK_PUSH}/{title}/{urllib.parse.quote_plus(content)}""").json()
if response['code'] == 200:
print('推送成功!')
else:
print('推送失败!')
except:
print('推送失败!')
print("bark服务启动")
if BARK=='' and BARK_PUSH=='':
print("bark服务的bark_token未设置!!\n取消推送")
return
def serverJ(title, content):
print("\n")
if not PUSH_KEY:
print("server酱服务的PUSH_KEY未设置!!\n取消推送")
return
print("serverJ服务启动")
data = {
"text": title,
"desp": content.replace("\n", "\n\n")
}
response = requests.post(f"https://sc.ftqq.com/{PUSH_KEY}.send", data=data).json()
if response['errno'] == 0:
print('推送成功!')
else:
print('推送失败!')
# tg通知
def telegram_bot(title, content):
try:
print("\n")
bot_token = TG_BOT_TOKEN
user_id = TG_USER_ID
if not bot_token or not user_id:
print("tg服务的bot_token或者user_id未设置!!\n取消推送")
return
print("tg服务启动")
if TG_API_HOST:
if 'http' in TG_API_HOST:
url = f"{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage"
else:
url = f"https://{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage"
else:
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
payload = {'chat_id': str(TG_USER_ID), 'text': f'{title}\n\n{content}', 'disable_web_page_preview': 'true'}
proxies = None
if TG_PROXY_IP and TG_PROXY_PORT:
proxyStr = "http://{}:{}".format(TG_PROXY_IP, TG_PROXY_PORT)
proxies = {"http": proxyStr, "https": proxyStr}
try:
response = requests.post(url=url, headers=headers, params=payload, proxies=proxies).json()
except:
print('推送失败!')
if response['ok']:
print('推送成功!')
else:
print('推送失败!')
except Exception as e:
print(e)
def dingding_bot(title, content):
timestamp = str(round(time.time() * 1000)) # 时间戳
secret_enc = DD_BOT_SECRET.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, DD_BOT_SECRET)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 签名
print('开始使用 钉钉机器人 推送消息...', end='')
url = f'https://oapi.dingtalk.com/robot/send?access_token={DD_BOT_TOKEN}&timestamp={timestamp}&sign={sign}'
headers = {'Content-Type': 'application/json;charset=utf-8'}
data = {
'msgtype': 'text',
'text': {'content': f'{title}\n\n{content}'}
}
response = requests.post(url=url, data=json.dumps(data), headers=headers, timeout=15).json()
if not response['errcode']:
print('推送成功!')
else:
print('推送失败!')
def coolpush_bot(title, content):
print("\n")
if not QQ_SKEY or not QQ_MODE:
print("qq服务的QQ_SKEY或者QQ_MODE未设置!!\n取消推送")
return
print("qq服务启动")
url=f"https://qmsg.zendee.cn/{QQ_MODE}/{QQ_SKEY}"
payload = {'msg': f"{title}\n\n{content}".encode('utf-8')}
response = requests.post(url=url, params=payload).json()
if response['code'] == 0:
print('推送成功!')
else:
print('推送失败!')
# push推送
def pushplus_bot(title, content):
try:
print("\n")
if not PUSH_PLUS_TOKEN:
print("PUSHPLUS服务的token未设置!!\n取消推送")
return
print("PUSHPLUS服务启动")
url = 'http://www.pushplus.plus/send'
data = {
"token": PUSH_PLUS_TOKEN,
"title": title,
"content": content
}
body = json.dumps(data).encode(encoding='utf-8')
headers = {'Content-Type': 'application/json'}
response = requests.post(url=url, data=body, headers=headers).json()
if response['code'] == 200:
print('推送成功!')
else:
print('推送失败!')
except Exception as e:
print(e)
def wecom_key(title, content):
print("\n")
if not QYWX_KEY:
print("QYWX_KEY未设置!!\n取消推送")
return
print("QYWX_KEY服务启动")
print("content"+content)
headers = {'Content-Type': 'application/json'}
data = {
"msgtype":"text",
"text":{
"content":title+"\n"+content.replace("\n", "\n\n")
}
}
print(f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={QYWX_KEY}")
response = requests.post(f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={QYWX_KEY}", json=data,headers=headers).json()
print(response)
# 飞书机器人推送
def fs_key(title, content):
print("\n")
if not FS_KEY:
print("FS_KEY未设置!!\n取消推送")
return
print("FS_KEY服务启动")
print("content"+content)
headers = {'Content-Type': 'application/json'}
data = {
"msg_type":"text",
"content":{
"text":title+"\n"+content.replace("\n", "\n\n")
}
}
print(f"https://open.feishu.cn/open-apis/bot/v2/hook/{FS_KEY}")
response = requests.post(f"https://open.feishu.cn/open-apis/bot/v2/hook/{FS_KEY}", json=data,headers=headers).json()
print(response)
# 企业微信 APP 推送
def wecom_app(title, content):
try:
if not QYWX_AM:
print("QYWX_AM 并未设置!!\n取消推送")
return
QYWX_AM_AY = re.split(',', QYWX_AM)
if 4 < len(QYWX_AM_AY) > 5:
print("QYWX_AM 设置错误!!\n取消推送")
return
corpid = QYWX_AM_AY[0]
corpsecret = QYWX_AM_AY[1]
touser = QYWX_AM_AY[2]
agentid = QYWX_AM_AY[3]
try:
media_id = QYWX_AM_AY[4]
except:
media_id = ''
wx = WeCom(corpid, corpsecret, agentid)
# 如果没有配置 media_id 默认就以 text 方式发送
if not media_id:
message = title + '\n\n' + content
response = wx.send_text(message, touser)
else:
response = wx.send_mpnews(title, content, media_id, touser)
if response == 'ok':
print('推送成功!')
else:
print('推送失败!错误信息如下:\n', response)
except Exception as e:
print(e)
class WeCom:
def __init__(self, corpid, corpsecret, agentid):
self.CORPID = corpid
self.CORPSECRET = corpsecret
self.AGENTID = agentid
def get_access_token(self):
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
values = {'corpid': self.CORPID,
'corpsecret': self.CORPSECRET,
}
req = requests.post(url, params=values)
data = json.loads(req.text)
return data["access_token"]
def send_text(self, message, touser="@all"):
send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + self.get_access_token()
send_values = {
"touser": touser,
"msgtype": "text",
"agentid": self.AGENTID,
"text": {
"content": message
},
"safe": "0"
}
send_msges = (bytes(json.dumps(send_values), 'utf-8'))
respone = requests.post(send_url, send_msges)
respone = respone.json()
return respone["errmsg"]
def send_mpnews(self, title, message, media_id, touser="@all"):
send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + self.get_access_token()
send_values = {
"touser": touser,
"msgtype": "mpnews",
"agentid": self.AGENTID,
"mpnews": {
"articles": [
{
"title": title,
"thumb_media_id": media_id,
"author": "Author",
"content_source_url": "",
"content": message.replace('\n', '<br/>'),
"digest": message
}
]
}
}
send_msges = (bytes(json.dumps(send_values), 'utf-8'))
respone = requests.post(send_url, send_msges)
respone = respone.json()
return respone["errmsg"]
def send(title, content):
"""
使用 bark, telegram bot, dingding bot, server, feishuJ 发送手机推送
:param title:
:param content:
:return:
"""
for i in notify_mode:
if i == 'bark':
if BARK or BARK_PUSH:
bark(title=title, content=content)
else:
print('未启用 bark')
continue
if i == 'sc_key':
if PUSH_KEY:
serverJ(title=title, content=content)
else:
print('未启用 Server酱')
continue
elif i == 'dingding_bot':
if DD_BOT_TOKEN and DD_BOT_SECRET:
dingding_bot(title=title, content=content)
else:
print('未启用 钉钉机器人')
continue
elif i == 'telegram_bot':
if TG_BOT_TOKEN and TG_USER_ID:
telegram_bot(title=title, content=content)
else:
print('未启用 telegram机器人')
continue
elif i == 'coolpush_bot':
if QQ_SKEY and QQ_MODE:
coolpush_bot(title=title, content=content)
else:
print('未启用 QQ机器人')
continue
elif i == 'pushplus_bot':
if PUSH_PLUS_TOKEN:
pushplus_bot(title=title, content=content)
else:
print('未启用 PUSHPLUS机器人')
continue
elif i == 'wecom_app':
if QYWX_AM:
wecom_app(title=title, content=content)
else:
print('未启用企业微信应用消息推送')
continue
elif i == 'wecom_key':
if QYWX_KEY:
for i in range(int(len(content)/2000)+1):
wecom_key(title=title, content=content[i*2000:(i+1)*2000])
else:
print('未启用企业微信应用消息推送')
continue
elif i == 'fs_key':
if FS_KEY:
fs_key(title=title, content=content)
else:
print('未启用飞书机器人消息推送')
continue
else:
print('此类推送方式不存在')
def main():
send('title', 'content')
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,300 @@
"""
小程序打卡信息测试脚本(不执行实际打卡)
=== 配置说明 ===
环境变量配置:
变量名: hxza_dk
说明:
- 此脚本仅用于测试,只获取信息,不执行实际打卡
- 会显示:用户信息、班次信息、打卡时间窗口等
"""
import requests
import os
import logging
from datetime import datetime
from typing import Dict, Optional, Tuple
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
class CheckInInfoTest:
"""打卡信息测试类(不执行实际打卡)"""
def __init__(self):
self.base_url = "https://erp.baian.tech/baian-admin"
# 解析环境变量
config = os.getenv('hxza_dk', '')
if not config:
error_msg = "❌ 缺少环境变量 hxza_dk"
logger.error(error_msg)
raise ValueError(error_msg)
parts = config.split('#')
self.token = parts[0].strip()
self.remark = parts[1].strip() if len(parts) > 1 else ""
# 配置带重试的 session
self.session = self._create_session()
# 动态获取的用户信息
self.user_name = None
self.user_mobile = None
# 动态获取的班次时间
self.start_time = None # 上班时间(如 "23:00"
self.end_time = None # 下班时间(如 "07:00"
self.is_cross_day = False # 是否跨天
# 固定参数
self.app_id = "wxa81d5d83880ea6b6"
# 动态参数(从班次信息接口获取)
self.item_id = None
self.job_date_id = None
self.job_id = None
self.address_name = None
self.longitude = None
self.latitude = None
self.headers = {
'Host': 'erp.baian.tech',
'Connection': 'keep-alive',
'appId': self.app_id,
'content-type': 'application/json',
'wxAppKeyCompanyId': self.app_id,
'token': self.token,
'Accept-Encoding': 'gzip,compress,br,deflate',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
'Referer': f"https://servicewechat.com/{self.app_id}/102/page-frame.html"
}
logger.info("=" * 60)
logger.info("📋 打卡信息测试(不执行实际打卡)")
logger.info("=" * 60)
# 初始化时获取用户信息
if not self._fetch_user_info():
raise ValueError("❌ 获取用户信息失败")
# 初始化时获取班次信息
if not self._fetch_job_details():
raise ValueError("❌ 获取班次信息失败")
# 显示打卡时间窗口
self._show_checkin_windows()
def _create_session(self) -> requests.Session:
"""创建带重试机制的 session"""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["POST", "GET"]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)
session.mount('http://', adapter)
return session
def _fetch_user_info(self) -> bool:
"""获取用户信息(姓名、手机号等)"""
try:
logger.info("\n🔍 正在获取用户信息...")
url = f"{self.base_url}/userH5/getMyDetails"
response = self.session.get(url, headers=self.headers, timeout=15)
if response.status_code != 200:
logger.error(f"❌ 获取用户信息失败: HTTP {response.status_code}")
return False
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 获取用户信息失败: {result.get('msg')}")
return False
data = result.get('data', {})
self.user_name = data.get('name', '')
self.user_mobile = data.get('mobile', '')
# 显示备注或姓名
display_name = self.remark if self.remark else self.user_name
logger.info(f"✅ 用户信息获取成功")
logger.info(f" 👤 姓名: {display_name}")
logger.info(f" 📱 手机: {self.user_mobile}")
return True
except Exception as e:
logger.error(f"❌ 获取用户信息异常: {str(e)}")
return False
def _fetch_job_details(self) -> bool:
"""获取班次信息itemId、jobDateId、经纬度、班次时间等"""
try:
logger.info("\n🔍 正在获取班次信息...")
url = f"{self.base_url}/Management/getJobUserDetails"
params = {"userId": ""}
response = self.session.get(url, params=params, headers=self.headers, timeout=15)
if response.status_code != 200:
logger.error(f"❌ 获取班次信息失败: HTTP {response.status_code}")
return False
result = response.json()
if result.get('code') != 200:
logger.error(f"❌ 获取班次信息失败: {result.get('msg')}")
return False
data = result.get('data', {})
# 提取 itemId 和 jobId
self.item_id = data.get('itemId')
self.job_id = data.get('jobId')
# 提取班次时段信息
job_date_dtos = data.get('jobDateDTOS', [])
if job_date_dtos:
job_date = job_date_dtos[0]
self.job_date_id = job_date.get('jobDateId')
self.start_time = job_date.get('startDate') # 如 "23:00"
self.end_time = job_date.get('endDate') # 如 "07:00"
self.is_cross_day = job_date.get('ismorrow') == 1 # 是否跨天
# 提取地址信息(经纬度)
address_dtos = data.get('addressDTOS', [])
if address_dtos:
address = address_dtos[0]
self.address_name = address.get('addressName')
self.longitude = float(address.get('longitude'))
self.latitude = float(address.get('latitude'))
# 验证必需参数
if not all([self.item_id, self.job_date_id, self.address_name, self.longitude, self.latitude, self.start_time, self.end_time]):
logger.error(f"❌ 班次信息不完整")
return False
logger.info(f"✅ 班次信息获取成功")
logger.info(f" 📍 打卡地点: {self.address_name}")
logger.info(f" 🌐 经纬度: {self.longitude}, {self.latitude}")
logger.info(f" ⏰ 班次时间: {self.start_time} - {self.end_time}{' (跨天)' if self.is_cross_day else ''}")
# logger.info(f" 🆔 项目ID: {self.item_id}")
# logger.info(f" 🆔 班次ID: {self.job_id}")
# logger.info(f" 🆔 时段ID: {self.job_date_id}")
return True
except Exception as e:
logger.error(f"❌ 获取班次信息异常: {str(e)}")
return False
def _time_to_minutes(self, time_str: str) -> int:
"""时间字符串转为分钟数"""
h, m = map(int, time_str.split(':'))
return h * 60 + m
def _minutes_to_time(self, minutes: int) -> str:
"""分钟数转时间字符串"""
h, m = divmod(minutes % 1440, 60)
return f"{h:02d}:{m:02d}"
def _get_checkin_windows(self) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
根据班次时间,自动计算打卡时间窗口
规则:
- 第一次打卡上班start_time 前2小时 到 start_time
- 第二次打卡下班end_time+1分钟 到 (start_time - 2小时 - 1分钟)
Returns:
((first_start, first_end), (second_start, second_end)) 分钟数
"""
start_minutes = self._time_to_minutes(self.start_time)
end_minutes = self._time_to_minutes(self.end_time)
# 第一次打卡窗口start_time - 120分钟 到 start_time
first_start = (start_minutes - 120) % 1440 # 1440 = 24*60
first_end = start_minutes
# 第二次打卡窗口end_time + 1分钟 到 (start_time - 121分钟)
second_start = (end_minutes + 1) % 1440
second_end = (start_minutes - 121) % 1440
return (first_start, first_end), (second_start, second_end)
def _is_in_window(self, current_minutes: int, start: int, end: int) -> bool:
"""检查当前时间是否在窗口内"""
if start <= end:
# 同一天内的窗口(如 21:00-23:00
return start <= current_minutes <= end
else:
# 跨天的窗口(如 23:00-05:59 跨天)
return current_minutes >= start or current_minutes <= end
def _show_checkin_windows(self):
"""显示打卡时间窗口信息"""
logger.info("\n📅 打卡时间窗口:")
(first_start, first_end), (second_start, second_end) = self._get_checkin_windows()
first_start_str = self._minutes_to_time(first_start)
first_end_str = self._minutes_to_time(first_end)
second_start_str = self._minutes_to_time(second_start)
second_end_str = self._minutes_to_time(second_end)
logger.info(f" 🌅 上班打卡窗口: {first_start_str} - {first_end_str}")
logger.info(f" 🌆 下班打卡窗口: {second_start_str} - {second_end_str}")
# 检查当前时间
now = datetime.now()
current_minutes = now.hour * 60 + now.minute
current_time_str = now.strftime('%H:%M:%S')
logger.info(f"\n🕐 当前时间: {current_time_str}")
if self._is_in_window(current_minutes, first_start, first_end):
logger.info(f" ✅ 当前在【上班打卡】窗口内")
elif self._is_in_window(current_minutes, second_start, second_end):
logger.info(f" ✅ 当前在【下班打卡】窗口内")
else:
logger.info(f" ⏰ 当前不在任何打卡窗口内")
def main():
"""主函数"""
try:
logger.info("\n🚀 开始测试...\n")
test = CheckInInfoTest()
logger.info("\n" + "=" * 60)
logger.info("✅ 所有信息获取成功!")
logger.info("=" * 60)
logger.info("\n💡 提示: 这是测试模式,未执行实际打卡操作")
logger.info("💡 如需执行打卡,请运行 auto_checkin.py\n")
except KeyboardInterrupt:
logger.info("\n⚠️ 用户中断程序")
except Exception as e:
error_msg = f"\n❌ 程序异常: {str(e)}"
logger.error(error_msg)
exit(1)
if __name__ == '__main__':
main()