import base64
import hashlib
import json
import logging
import re
from datetime import datetime
from os import environ
from pathlib import Path
from typing import Callable, Iterable, NoReturn, Optional

import requests
from dotenv import load_dotenv
from serverchan_sdk import sc_send as _sc_send


class LessonsException(Exception):
    """Base exception raised by this module."""


class ReloginException(LessonsException):
    """Raised when a response indicates the session must log in again."""


# Configure root logging (console output).
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def sc_send(title: str, desp: str) -> None:
    """Send a notification through ServerChan.

    The send key is read from the ``SC_KEY`` environment variable. When it is
    missing, or when sending fails, the problem is logged and the function
    returns normally: notification is best-effort and never raises.

    Args:
        title: Notification title.
        desp: Notification body.
    """
    sc_key = environ.get("SC_KEY")
    if not sc_key:
        logger.error("SC_KEY 未设置,无法发送通知")
        return
    try:
        _sc_send(sc_key, title, desp, options={"tags": "自动选课"})
    except Exception as e:  # best-effort: a failed notification must not abort the caller
        logger.error(f"发送失败: {e}")


def log_init() -> None:
    """Attach a per-run UTF-8 file handler under ``logs/`` to the module logger."""
    Path("logs").mkdir(exist_ok=True)  # make sure the log directory exists
    now = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_handler = logging.FileHandler(
        f"logs/lessons_{now}.log", mode="w", encoding="utf-8"
    )
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    )
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)  # also write log records to the file


class URP:
    """HTTP client holding a ``requests`` session for the URP student site."""

    # Timeout (seconds) applied to outgoing HTTP requests so that a stalled
    # connection cannot block the program forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, base=None):
        """Create the session and resolve the site base URL.

        Args:
            base: Base URL of the site. When ``None``, the ``base`` environment
                variable is used, falling back to the built-in default.
        """
        self.session = requests.Session()
        if base is None:
            self.base = environ.get("base", "http://jwstudent.lnu.edu.cn")
        else:
            self.base = base

    @staticmethod
    def env_check(required_keys: Optional[Iterable[str]] = None) -> None:
        """Verify that every required environment variable is set and non-empty.

        Args:
            required_keys: Names to check. Defaults to the credentials for the
                site and the captcha service plus ``FILE``.

        Raises:
            LessonsException: On the first variable that is missing or empty.
        """
        if required_keys is None:
            required_keys = [
                "uname",
                "password",
                "recap_username",
                "recap_password",
                "FILE",
            ]
        for key in required_keys:
            if not environ.get(key):
                raise LessonsException(f"请在环境变量中设置{key}")

    @staticmethod
    def _retry_request(
        func: Callable[[], requests.Response],
        max_retries: int = 10,
        error_msg: str = "请求失败",
    ) -> requests.Response:
        """Call ``func`` until it succeeds or the retry budget is used up.

        Args:
            func: Zero-argument callable performing the request.
            max_retries: Maximum number of attempts.
            error_msg: Prefix used in log and exception messages.

        Returns:
            The response of the first successful attempt.

        Raises:
            ReloginException: If the response URL points at the login page.
            LessonsException: If the last attempt fails with a connection,
                HTTP, or timeout error, or if ``max_retries`` is less than 1.
        """
        for attempt in range(1, max_retries + 1):
            try:
                response = func()
            except (
                requests.ConnectionError,
                requests.HTTPError,
                requests.Timeout,
            ) as e:
                logger.warning(f"{error_msg}!{type(e).__name__}: {e}")
                if attempt >= max_retries:
                    raise LessonsException(f"{error_msg}!请检查网络连接!") from e
                logger.info(f"第{attempt}次重试")
            else:
                logger.debug("response url: %s", response.url)
                URP._judge_logout(response)
                return response
        # Only reached when max_retries < 1, i.e. the loop body never ran.
        raise LessonsException(f"{error_msg}!重试次数耗尽")

    @staticmethod
    def recapture(b64: str) -> str:
        """Recognise a captcha image through the remote prediction service.

        Args:
            b64: Base64-encoded image data.

        Returns:
            The value found at ``data.result`` in the service's JSON reply.

        Raises:
            LessonsException: If the service credentials are not configured,
                the request fails, or the reply does not have the expected
                shape.
        """
        recap_username = environ.get("recap_username")
        recap_password = environ.get("recap_password")
        if not recap_username or not recap_password:
            raise LessonsException("验证码识别服务配置不完整")
        data = {
            "username": recap_username,
            "password": recap_password,
            "ID": "04897896",
            "b64": b64,
            "version": "3.1.1",
        }
        try:
            response = requests.post(
                "http://www.fdyscloud.com.cn/tuling/predict",
                json=data,
                timeout=URP.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            result = response.json()
            return result["data"]["result"]
        except (
            requests.RequestException,
            KeyError,
            TypeError,  # e.g. "data" is null or not a mapping in the reply
            json.JSONDecodeError,
        ) as e:
            raise LessonsException(f"验证码识别失败: {e}") from e

    @staticmethod
    def pwd_md5(string: str) -> str:
        """Build the password digest sent in the login form.

        Returns:
            ``md5(string + "{Urp602019}") + "*" + md5(string)``, both as
            lowercase hex digests.
        """
        salted = hashlib.md5((string + "{Urp602019}").encode()).hexdigest()
        plain = hashlib.md5(string.encode()).hexdigest()
        return salted + "*" + plain

    def _login(self):
        """Perform a single login attempt.

        Fetches the login page token, downloads and recognises the captcha,
        then posts the credentials.

        Returns:
            ``True`` when the reply does not contain the site's error marker.

        Raises:
            LessonsException: If credentials are missing, the token cannot be
                found, captcha recognition fails, the site reports an error,
                or a network error occurs.
        """
        username = environ.get("uname")
        password = environ.get("password")
        if not username or not password:
            raise LessonsException("用户名或密码未设置")
        try:
            # Fetch the token embedded in the login page. This must use the
            # same base URL as the rest of the flow so the token and session
            # cookie come from the host the form is posted to.
            req = self.session.get(
                f"{self.base}/login", timeout=self.REQUEST_TIMEOUT
            )
            req.raise_for_status()
            match = re.search(r'name="tokenValue" value="(.+?)">', req.text)
            if not match:
                raise LessonsException("未找到 tokenValue")
            token_value = match.group(1)

            # Download the captcha image and have it recognised.
            req = self.session.get(
                f"{self.base}/img/captcha.jpg", timeout=self.REQUEST_TIMEOUT
            )
            req.raise_for_status()
            b64 = base64.b64encode(req.content).decode("utf-8")
            captcha_code = self.recapture(b64=b64)
            logger.info(f"验证码识别结果: {captcha_code}")

            # Form fields of the login request.
            payload = {
                "j_username": username,
                "j_password": self.pwd_md5(password),
                "j_captcha": captcha_code,
                "tokenValue": token_value,
            }
            headers = {
                "User-Agent": "Mozilla/5.0",
                "Content-Type": "application/x-www-form-urlencoded",
            }
            response = self.session.post(
                f"{self.base}/j_spring_security_check",
                data=payload,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if "发生错误" in response.text:
                err = re.search(r"发生错误!(.+)", response.text)
                if err:
                    error_message = err.group(1).strip()
                    raise LessonsException(f"登录失败: {error_message}")
                raise LessonsException("登录失败")
            logger.info("登录成功")
            return True
        except requests.RequestException as e:
            raise LessonsException(f"登录过程中网络错误: {e}") from e

    def login(self, max_attempts: int = 10):
        """Log in, retrying on any failure.

        Args:
            max_attempts: Number of login attempts before giving up.

        Raises:
            LessonsException: If every attempt fails.
        """
        logger.info("尝试登录")
        for _ in range(max_attempts):
            try:
                if self._login():
                    return
            except Exception as e:  # deliberate best-effort: log and try again
                logger.error(f"登录失败: {e}")
        raise LessonsException("登录失败,无法获取token")

    @staticmethod
    def _judge_logout(response: requests.Response):
        """Raise ``ReloginException`` if the response URL contains ``/login``.

        NOTE(review): presumably the site redirects to the login page once the
        session is invalidated (e.g. by a login elsewhere) - confirm.
        """
        if "/login" in response.url:
            raise ReloginException("有人登录了您的账号!")


load_dotenv(override=True)
log_init()