diff --git a/main.py b/main.py
index 15700bd..3f4ca69 100644
--- a/main.py
+++ b/main.py
@@ -1,35 +1,38 @@
-import hashlib
-import dotenv
import base64
+import hashlib
import json
-import re
import logging
-from typing import List, Optional,Callable
-from time import sleep,time
-
-import requests
-
-from bs4 import BeautifulSoup
-from os import environ
-from pathlib import Path
-import pandas as pd
+import re
from collections import deque
from datetime import datetime
+from os import environ
+from pathlib import Path
+from time import sleep, time
+from typing import Callable, List, Optional
+
+import dotenv
+import pandas as pd
+import requests
+from bs4 import BeautifulSoup
from serverchan_sdk import sc_send as _sc_send
# 配置日志
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
logger = logging.getLogger(__name__)
Path("logs").mkdir(exist_ok=True) # 确保日志目录存在
now = datetime.now().strftime("%Y%m%d_%H%M%S")
-file = logging.FileHandler(f'logs/lessons_{now}.log', mode='w', encoding='utf-8')
-file.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+file = logging.FileHandler(f"logs/lessons_{now}.log", mode="w", encoding="utf-8")
+file.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
file.setLevel(logging.INFO)
logger.addHandler(file) # 日志输出到文件
+
class LessonsException(Exception):
"""自定义异常类"""
+
pass
@@ -43,6 +46,7 @@ def sc_send(title: str, desp: str):
except Exception as e:
logger.error(f"发送失败: {e}")
+
class Lessons:
def __init__(self):
@@ -52,29 +56,42 @@ class Lessons:
# 加载环境变量
dotenv.load_dotenv()
-
+
# 检查必需的环境变量
- required_keys = ["uname", "password", "recap_username", "recap_password","FILE","SC_KEY"]
+ required_keys = [
+ "uname",
+ "password",
+ "recap_username",
+ "recap_password",
+ "FILE",
+ "SC_KEY",
+ ]
for key in required_keys:
if not environ.get(key):
raise LessonsException(f"请在环境变量中设置{key}")
-
+
self.base = environ.get("base", "http://jwstudent.lnu.edu.cn")
- def _retry_request(self, func, max_retries: int = 10, error_msg: str = "请求失败") -> requests.Response:
+ def _retry_request(
+ self, func, max_retries: int = 10, error_msg: str = "请求失败"
+ ) -> requests.Response:
"""通用的请求重试方法"""
for attempt in range(1, max_retries + 1):
try:
response = func()
self.judge_logout(response)
return response
- except (requests.ConnectionError, requests.HTTPError, requests.Timeout) as e:
+ except (
+ requests.ConnectionError,
+ requests.HTTPError,
+ requests.Timeout,
+ ) as e:
logger.warning(f"{error_msg}!{type(e).__name__}: {e}")
if attempt < max_retries:
- logger.info(f'第{attempt}次重试')
+ logger.info(f"第{attempt}次重试")
else:
raise LessonsException(f"{error_msg}!请检查网络连接!")
-
+
# 这行不会被执行,但为了类型检查
raise LessonsException(f"{error_msg}!重试次数耗尽")
@@ -83,44 +100,43 @@ class Lessons:
"""验证码识别"""
recap_username = environ.get("recap_username")
recap_password = environ.get("recap_password")
-
+
if not recap_username or not recap_password:
raise LessonsException("验证码识别服务配置不完整")
-
+
data = {
"username": recap_username,
"password": recap_password,
"ID": "04897896",
"b64": b64,
- "version": "3.1.1"
+ "version": "3.1.1",
}
-
+
try:
- response = requests.post("http://www.fdyscloud.com.cn/tuling/predict",
- json=data, timeout=10)
+ response = requests.post(
+ "http://www.fdyscloud.com.cn/tuling/predict", json=data, timeout=10
+ )
response.raise_for_status()
result = response.json()
return result["data"]["result"]
except (requests.RequestException, KeyError, json.JSONDecodeError) as e:
raise LessonsException(f"验证码识别失败: {e}")
-
@staticmethod
def pwd_md5(string: str) -> str:
md5_part1 = hashlib.md5((string + "{Urp602019}").encode()).hexdigest().lower()
md5_part2 = hashlib.md5(string.encode()).hexdigest().lower()
- final_result = md5_part1 + '*' + md5_part2
+ final_result = md5_part1 + "*" + md5_part2
return final_result
def _login(self):
"""登录模块"""
username = environ.get("uname")
password = environ.get("password")
-
+
if not username or not password:
raise LessonsException("用户名或密码未设置")
-
-
+
try:
# 获取登录页面的token
req = self.session.get("http://jwstudent.lnu.edu.cn/login")
@@ -135,9 +151,9 @@ class Lessons:
req = self.session.get(f"{self.base}/img/captcha.jpg")
req.raise_for_status()
im = req.content
- b64 = base64.b64encode(im).decode('utf-8')
+ b64 = base64.b64encode(im).decode("utf-8")
captcha_code = self.recapture(b64=b64)
-
+
logger.info(f"验证码识别结果: {captcha_code}")
hashed_password = self.pwd_md5(password)
@@ -147,31 +163,30 @@ class Lessons:
"j_username": username,
"j_password": hashed_password,
"j_captcha": captcha_code,
- "tokenValue": token_value
+ "tokenValue": token_value,
}
# 发送 POST 请求
url = f"{self.base}/j_spring_security_check"
headers = {
"User-Agent": "Mozilla/5.0",
- "Content-Type": "application/x-www-form-urlencoded"
+ "Content-Type": "application/x-www-form-urlencoded",
}
response = self.session.post(url, data=payload, headers=headers)
if "发生错误" in response.text:
- err = re.search(r'发生错误!(.+)', response.text)
+ err = re.search(r"发生错误!(.+)", response.text)
if err:
error_message = err.group(1).strip()
raise LessonsException(f"登录失败: {error_message}")
raise LessonsException("登录失败")
-
+
logger.info("登录成功")
return True
-
+
except requests.RequestException as e:
raise LessonsException(f"登录过程中网络错误: {e}")
-
-
+
def judge_logout(self, response: requests.Response):
"""检查账号是否在其他地方被登录"""
if response.url == f"{self.base}/login?errorCode=concurrentSessionExpired":
@@ -186,8 +201,10 @@ class Lessons:
print(html)
raise LessonsException("未找到培养方案编号")
self.fajhh = match.group(1)
-
- res = self.session.get(f"{self.base}/student/courseSelect/planCourse/index?fajhh={self.fajhh}")
+
+ res = self.session.get(
+ f"{self.base}/student/courseSelect/planCourse/index?fajhh={self.fajhh}"
+ )
res.raise_for_status()
html = res.text
bs = BeautifulSoup(html, "html.parser")
@@ -195,23 +212,25 @@ class Lessons:
if not match:
raise LessonsException("未找到学期信息")
self.term = str(match.get("value"))
-
+
print(self.fajhh, self.term)
-
+
def read_lessons(self) -> List[tuple[str, str, str]]:
classes = []
file = Path(environ.get("FILE", "class.xlsx"))
if not file.is_file():
raise LessonsException(f"课程文件 {file} 不存在,请检查路径")
- d:dict[str, Callable[[Path], pd.DataFrame]] = {
- ".csv": pd.read_csv,
- ".xlsx": pd.read_excel,
+ d: dict[str, Callable[[Path], pd.DataFrame]] = {
+ ".csv": pd.read_csv,
+ ".xlsx": pd.read_excel,
".xls": pd.read_excel,
- ".json": pd.read_json
+ ".json": pd.read_json,
}
func = d.get(file.suffix.lower())
if func is None:
- raise LessonsException(f"不支持的文件格式: {file.suffix}. 仅支持 .csv, .xlsx, .xls, .json 格式")
+ raise LessonsException(
+ f"不支持的文件格式: {file.suffix}. 仅支持 .csv, .xlsx, .xls, .json 格式"
+ )
df = func(file)
df.columns = df.columns.str.strip() # 去除列名两端的空格
for col in ["课程号", "课序号", "课程名"]:
@@ -221,10 +240,10 @@ class Lessons:
df.columns = ["id", "kxh", "name"] # 重命名列
df = df.drop_duplicates(subset=["id", "kxh"]) # 去重
for line in df.itertuples(index=False):
- classes.append((line.id, "%02d"%line.kxh, line.name))
+ classes.append((line.id, "%02d" % line.kxh, line.name))
return classes
- def get_left(self, cl: tuple[str, list[str], str]) -> list[int]|None:
+ def get_left(self, cl: tuple[str, list[str], str]) -> list[int] | None:
"""获取课程余量"""
url = f"{self.base}/student/courseSelect/planCourse/courseList"
params = {
@@ -238,27 +257,29 @@ class Lessons:
"kzh": "",
"xqh": "",
"xq": 0,
- "jc": 0
+ "jc": 0,
}
response = self._retry_request(lambda: self.session.post(url, data=params))
data = response.json()["kylMap"]
if len(data) == 0:
logger.error(f"课程 {cl[2]} 的余量信息为空: {data}")
return
-
+
ret = []
for kxh in cl[1]:
key = f"{self.term}_{cl[0]}_{kxh}"
left = data.get(key, None)
if left is None:
- logger.error(f"课程 {cl[2]} 的余量信息不存在: {key} not in {data.keys()}")
+ logger.error(
+ f"课程 {cl[2]} 的余量信息不存在: {key} not in {data.keys()}"
+ )
ret.append(-1)
ret.append(int(left))
return ret
-
- def select(self,cl: tuple[str,str,str]) -> bool:
+
+ def select(self, cl: tuple[str, str, str]) -> bool:
"""选课"""
-
+
url = f"{self.base}/student/courseSelect/gotoSelect/index"
response = self._retry_request(lambda: self.session.get(url))
response.raise_for_status()
@@ -268,10 +289,10 @@ class Lessons:
logger.error("未找到 tokenValue")
return False
token = match.group(1)
-
+
url = f"{self.base}/student/courseSelect/selectCourse/checkInputCodeAndSubmit"
cms = f"{cl[2]}_{cl[1]}"
- cms = ",".join(map(lambda x:str(ord(x)),cms))
+ cms = ",".join(map(lambda x: str(ord(x)), cms))
params = {
"dealType": 2,
"fajhh": self.fajhh,
@@ -283,15 +304,15 @@ class Lessons:
"xqh": "",
}
sel_data = params.copy()
- sel_data.update({"inputCode":"undefined", "tokenValue": token})
+ sel_data.update({"inputCode": "undefined", "tokenValue": token})
response = self._retry_request(lambda: self.session.post(url, data=sel_data))
response.raise_for_status()
if response.json().get("result") != "ok":
logger.error(f"选课时发生错误: {response}")
return False
-
+
logger.info("选课请求已发送,等待结果...")
-
+
url = f"{self.base}/student/courseSelect/selectCourses/waitingfor"
response = self._retry_request(lambda: self.session.post(url, data=params))
response.raise_for_status()
@@ -300,15 +321,15 @@ class Lessons:
if not redisKey:
print(html)
logger.error(f"选课 {cl[2]} 时未找到 redisKey")
- return False
+ return False
redisKey = redisKey.group(1)
-
+
parms = {
"kcNum": 1,
"redisKey": redisKey,
}
cnt = 1
- while cnt<=10:
+ while cnt <= 10:
url = f"{self.base}/student/courseSelect/selectResult/query"
response = self._retry_request(lambda: self.session.post(url, data=parms))
response.raise_for_status()
@@ -325,12 +346,12 @@ class Lessons:
logger.info(f"选课成功: {text}")
else:
print(f"第{cnt}次查询中...")
- cnt+=1
+ cnt += 1
sleep(1)
-
+
logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课")
return False
-
+
def login(self):
logger.info("尝试登录")
flag = False
@@ -343,7 +364,7 @@ class Lessons:
logger.error(f"登录失败: {e}")
if not flag:
raise LessonsException("登录失败,无法获取token")
-
+
def auto_spider(self):
"""自动选课主程序"""
try:
@@ -351,21 +372,21 @@ class Lessons:
logger.info("获取基础信息")
self.get_base_info()
classes_src = self.read_lessons()
- classes:dict[str,tuple[str,list[str],str]] = {}
- mp:dict[str,str] = {} # {"课程号": "课程名称"}
+ classes: dict[str, tuple[str, list[str], str]] = {}
+ mp: dict[str, str] = {} # {"课程号": "课程名称"}
for id, kxh, name in classes_src:
mp[id] = name
if classes.get(id) is None:
classes[id] = (id, [kxh], name)
else:
classes[id][1].append(kxh)
-
+
logger.info(f"读取课程信息,共有 {len(classes)} 门课程")
for cl in classes.values():
logger.info(f"课程 {cl[2]} ({cl[0]}) 的可选课序号: {', '.join(cl[1])}")
-
+
logger.info("开始自动选课")
-
+
errs = deque(maxlen=5)
master_err = 0
while classes:
@@ -391,10 +412,12 @@ class Lessons:
errs.appendleft(time())
logger.error(f"获取课程 {lcl[2]} 余量时发生错误: {e}")
continue
- for i,left in enumerate(lefts):
+ for i, left in enumerate(lefts):
cl = (lcl[0], lcl[1][i], lcl[2])
if left > 0:
- logger.info(f"课程 {cl[2]}_{cl[1]} 有余量: {left},开始选课")
+ logger.info(
+ f"课程 {cl[2]}_{cl[1]} 有余量: {left},开始选课"
+ )
try:
ret = self.select(cl)
if ret:
@@ -411,14 +434,18 @@ class Lessons:
else:
logger.info(f"课程 {cl[2]}_{cl[1]} 无余量")
sleep(2)
- logger.info(f"当前还有{len(classes)}门课程未选上,分别为{','.join(mp[i] for i in classes.keys())}。等待10秒后继续检查")
+ logger.info(
+ f"当前还有{len(classes)}门课程未选上,分别为{','.join(mp[i] for i in classes.keys())}。等待10秒后继续检查"
+ )
if suc:
- sc_send("选课成功", desp=f"已成功选上课程: {', '.join(f'{i[2]}_{i[1]}' for i in suc)}")
+ sc_send(
+ "选课成功",
+ desp=f"已成功选上课程: {', '.join(f'{i[2]}_{i[1]}' for i in suc)}",
+ )
sleep(10) # 等待10秒后继续检查
-
-
+
logger.info("自动选课完成")
-
+
except LessonsException as e:
logger.error(f"选课过程中发生错误: {e}")
sc_send("选课异常", desp=f"选课过程中发生错误: {e}")
@@ -428,6 +455,7 @@ class Lessons:
sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
raise e
+
if __name__ == "__main__":
les = Lessons()
les.auto_spider()