This commit is contained in:
2025-07-05 12:49:56 +08:00
parent 5e18dadad5
commit 287c99c88e

128
main.py
View File

@ -1,35 +1,38 @@
import hashlib
import dotenv
import base64 import base64
import hashlib
import json import json
import re
import logging import logging
from typing import List, Optional,Callable import re
from time import sleep,time
import requests
from bs4 import BeautifulSoup
from os import environ
from pathlib import Path
import pandas as pd
from collections import deque from collections import deque
from datetime import datetime from datetime import datetime
from os import environ
from pathlib import Path
from time import sleep, time
from typing import Callable, List, Optional
import dotenv
import pandas as pd
import requests
from bs4 import BeautifulSoup
from serverchan_sdk import sc_send as _sc_send from serverchan_sdk import sc_send as _sc_send
# 配置日志 # 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
Path("logs").mkdir(exist_ok=True) # 确保日志目录存在 Path("logs").mkdir(exist_ok=True) # 确保日志目录存在
now = datetime.now().strftime("%Y%m%d_%H%M%S") now = datetime.now().strftime("%Y%m%d_%H%M%S")
file = logging.FileHandler(f'logs/lessons_{now}.log', mode='w', encoding='utf-8') file = logging.FileHandler(f"logs/lessons_{now}.log", mode="w", encoding="utf-8")
file.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) file.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
file.setLevel(logging.INFO) file.setLevel(logging.INFO)
logger.addHandler(file) # 日志输出到文件 logger.addHandler(file) # 日志输出到文件
class LessonsException(Exception): class LessonsException(Exception):
"""自定义异常类""" """自定义异常类"""
pass pass
@ -43,6 +46,7 @@ def sc_send(title: str, desp: str):
except Exception as e: except Exception as e:
logger.error(f"发送失败: {e}") logger.error(f"发送失败: {e}")
class Lessons: class Lessons:
def __init__(self): def __init__(self):
@ -54,24 +58,37 @@ class Lessons:
dotenv.load_dotenv() dotenv.load_dotenv()
# 检查必需的环境变量 # 检查必需的环境变量
required_keys = ["uname", "password", "recap_username", "recap_password","FILE","SC_KEY"] required_keys = [
"uname",
"password",
"recap_username",
"recap_password",
"FILE",
"SC_KEY",
]
for key in required_keys: for key in required_keys:
if not environ.get(key): if not environ.get(key):
raise LessonsException(f"请在环境变量中设置{key}") raise LessonsException(f"请在环境变量中设置{key}")
self.base = environ.get("base", "http://jwstudent.lnu.edu.cn") self.base = environ.get("base", "http://jwstudent.lnu.edu.cn")
def _retry_request(self, func, max_retries: int = 10, error_msg: str = "请求失败") -> requests.Response: def _retry_request(
self, func, max_retries: int = 10, error_msg: str = "请求失败"
) -> requests.Response:
"""通用的请求重试方法""" """通用的请求重试方法"""
for attempt in range(1, max_retries + 1): for attempt in range(1, max_retries + 1):
try: try:
response = func() response = func()
self.judge_logout(response) self.judge_logout(response)
return response return response
except (requests.ConnectionError, requests.HTTPError, requests.Timeout) as e: except (
requests.ConnectionError,
requests.HTTPError,
requests.Timeout,
) as e:
logger.warning(f"{error_msg}{type(e).__name__}: {e}") logger.warning(f"{error_msg}{type(e).__name__}: {e}")
if attempt < max_retries: if attempt < max_retries:
logger.info(f'{attempt}次重试') logger.info(f"{attempt}次重试")
else: else:
raise LessonsException(f"{error_msg}!请检查网络连接!") raise LessonsException(f"{error_msg}!请检查网络连接!")
@ -92,24 +109,24 @@ class Lessons:
"password": recap_password, "password": recap_password,
"ID": "04897896", "ID": "04897896",
"b64": b64, "b64": b64,
"version": "3.1.1" "version": "3.1.1",
} }
try: try:
response = requests.post("http://www.fdyscloud.com.cn/tuling/predict", response = requests.post(
json=data, timeout=10) "http://www.fdyscloud.com.cn/tuling/predict", json=data, timeout=10
)
response.raise_for_status() response.raise_for_status()
result = response.json() result = response.json()
return result["data"]["result"] return result["data"]["result"]
except (requests.RequestException, KeyError, json.JSONDecodeError) as e: except (requests.RequestException, KeyError, json.JSONDecodeError) as e:
raise LessonsException(f"验证码识别失败: {e}") raise LessonsException(f"验证码识别失败: {e}")
@staticmethod @staticmethod
def pwd_md5(string: str) -> str: def pwd_md5(string: str) -> str:
md5_part1 = hashlib.md5((string + "{Urp602019}").encode()).hexdigest().lower() md5_part1 = hashlib.md5((string + "{Urp602019}").encode()).hexdigest().lower()
md5_part2 = hashlib.md5(string.encode()).hexdigest().lower() md5_part2 = hashlib.md5(string.encode()).hexdigest().lower()
final_result = md5_part1 + '*' + md5_part2 final_result = md5_part1 + "*" + md5_part2
return final_result return final_result
def _login(self): def _login(self):
@ -120,7 +137,6 @@ class Lessons:
if not username or not password: if not username or not password:
raise LessonsException("用户名或密码未设置") raise LessonsException("用户名或密码未设置")
try: try:
# 获取登录页面的token # 获取登录页面的token
req = self.session.get("http://jwstudent.lnu.edu.cn/login") req = self.session.get("http://jwstudent.lnu.edu.cn/login")
@ -135,7 +151,7 @@ class Lessons:
req = self.session.get(f"{self.base}/img/captcha.jpg") req = self.session.get(f"{self.base}/img/captcha.jpg")
req.raise_for_status() req.raise_for_status()
im = req.content im = req.content
b64 = base64.b64encode(im).decode('utf-8') b64 = base64.b64encode(im).decode("utf-8")
captcha_code = self.recapture(b64=b64) captcha_code = self.recapture(b64=b64)
logger.info(f"验证码识别结果: {captcha_code}") logger.info(f"验证码识别结果: {captcha_code}")
@ -147,19 +163,19 @@ class Lessons:
"j_username": username, "j_username": username,
"j_password": hashed_password, "j_password": hashed_password,
"j_captcha": captcha_code, "j_captcha": captcha_code,
"tokenValue": token_value "tokenValue": token_value,
} }
# 发送 POST 请求 # 发送 POST 请求
url = f"{self.base}/j_spring_security_check" url = f"{self.base}/j_spring_security_check"
headers = { headers = {
"User-Agent": "Mozilla/5.0", "User-Agent": "Mozilla/5.0",
"Content-Type": "application/x-www-form-urlencoded" "Content-Type": "application/x-www-form-urlencoded",
} }
response = self.session.post(url, data=payload, headers=headers) response = self.session.post(url, data=payload, headers=headers)
if "发生错误" in response.text: if "发生错误" in response.text:
err = re.search(r'<strong>发生错误!</strong>(.+)', response.text) err = re.search(r"<strong>发生错误!</strong>(.+)", response.text)
if err: if err:
error_message = err.group(1).strip() error_message = err.group(1).strip()
raise LessonsException(f"登录失败: {error_message}") raise LessonsException(f"登录失败: {error_message}")
@ -171,7 +187,6 @@ class Lessons:
except requests.RequestException as e: except requests.RequestException as e:
raise LessonsException(f"登录过程中网络错误: {e}") raise LessonsException(f"登录过程中网络错误: {e}")
def judge_logout(self, response: requests.Response): def judge_logout(self, response: requests.Response):
"""检查账号是否在其他地方被登录""" """检查账号是否在其他地方被登录"""
if response.url == f"{self.base}/login?errorCode=concurrentSessionExpired": if response.url == f"{self.base}/login?errorCode=concurrentSessionExpired":
@ -187,7 +202,9 @@ class Lessons:
raise LessonsException("未找到培养方案编号") raise LessonsException("未找到培养方案编号")
self.fajhh = match.group(1) self.fajhh = match.group(1)
res = self.session.get(f"{self.base}/student/courseSelect/planCourse/index?fajhh={self.fajhh}") res = self.session.get(
f"{self.base}/student/courseSelect/planCourse/index?fajhh={self.fajhh}"
)
res.raise_for_status() res.raise_for_status()
html = res.text html = res.text
bs = BeautifulSoup(html, "html.parser") bs = BeautifulSoup(html, "html.parser")
@ -203,15 +220,17 @@ class Lessons:
file = Path(environ.get("FILE", "class.xlsx")) file = Path(environ.get("FILE", "class.xlsx"))
if not file.is_file(): if not file.is_file():
raise LessonsException(f"课程文件 {file} 不存在,请检查路径") raise LessonsException(f"课程文件 {file} 不存在,请检查路径")
d:dict[str, Callable[[Path], pd.DataFrame]] = { d: dict[str, Callable[[Path], pd.DataFrame]] = {
".csv": pd.read_csv, ".csv": pd.read_csv,
".xlsx": pd.read_excel, ".xlsx": pd.read_excel,
".xls": pd.read_excel, ".xls": pd.read_excel,
".json": pd.read_json ".json": pd.read_json,
} }
func = d.get(file.suffix.lower()) func = d.get(file.suffix.lower())
if func is None: if func is None:
raise LessonsException(f"不支持的文件格式: {file.suffix}. 仅支持 .csv, .xlsx, .xls, .json 格式") raise LessonsException(
f"不支持的文件格式: {file.suffix}. 仅支持 .csv, .xlsx, .xls, .json 格式"
)
df = func(file) df = func(file)
df.columns = df.columns.str.strip() # 去除列名两端的空格 df.columns = df.columns.str.strip() # 去除列名两端的空格
for col in ["课程号", "课序号", "课程名"]: for col in ["课程号", "课序号", "课程名"]:
@ -221,10 +240,10 @@ class Lessons:
df.columns = ["id", "kxh", "name"] # 重命名列 df.columns = ["id", "kxh", "name"] # 重命名列
df = df.drop_duplicates(subset=["id", "kxh"]) # 去重 df = df.drop_duplicates(subset=["id", "kxh"]) # 去重
for line in df.itertuples(index=False): for line in df.itertuples(index=False):
classes.append((line.id, "%02d"%line.kxh, line.name)) classes.append((line.id, "%02d" % line.kxh, line.name))
return classes return classes
def get_left(self, cl: tuple[str, list[str], str]) -> list[int]|None: def get_left(self, cl: tuple[str, list[str], str]) -> list[int] | None:
"""获取课程余量""" """获取课程余量"""
url = f"{self.base}/student/courseSelect/planCourse/courseList" url = f"{self.base}/student/courseSelect/planCourse/courseList"
params = { params = {
@ -238,7 +257,7 @@ class Lessons:
"kzh": "", "kzh": "",
"xqh": "", "xqh": "",
"xq": 0, "xq": 0,
"jc": 0 "jc": 0,
} }
response = self._retry_request(lambda: self.session.post(url, data=params)) response = self._retry_request(lambda: self.session.post(url, data=params))
data = response.json()["kylMap"] data = response.json()["kylMap"]
@ -251,12 +270,14 @@ class Lessons:
key = f"{self.term}_{cl[0]}_{kxh}" key = f"{self.term}_{cl[0]}_{kxh}"
left = data.get(key, None) left = data.get(key, None)
if left is None: if left is None:
logger.error(f"课程 {cl[2]} 的余量信息不存在: {key} not in {data.keys()}") logger.error(
f"课程 {cl[2]} 的余量信息不存在: {key} not in {data.keys()}"
)
ret.append(-1) ret.append(-1)
ret.append(int(left)) ret.append(int(left))
return ret return ret
def select(self,cl: tuple[str,str,str]) -> bool: def select(self, cl: tuple[str, str, str]) -> bool:
"""选课""" """选课"""
url = f"{self.base}/student/courseSelect/gotoSelect/index" url = f"{self.base}/student/courseSelect/gotoSelect/index"
@ -271,7 +292,7 @@ class Lessons:
url = f"{self.base}/student/courseSelect/selectCourse/checkInputCodeAndSubmit" url = f"{self.base}/student/courseSelect/selectCourse/checkInputCodeAndSubmit"
cms = f"{cl[2]}_{cl[1]}" cms = f"{cl[2]}_{cl[1]}"
cms = ",".join(map(lambda x:str(ord(x)),cms)) cms = ",".join(map(lambda x: str(ord(x)), cms))
params = { params = {
"dealType": 2, "dealType": 2,
"fajhh": self.fajhh, "fajhh": self.fajhh,
@ -283,7 +304,7 @@ class Lessons:
"xqh": "", "xqh": "",
} }
sel_data = params.copy() sel_data = params.copy()
sel_data.update({"inputCode":"undefined", "tokenValue": token}) sel_data.update({"inputCode": "undefined", "tokenValue": token})
response = self._retry_request(lambda: self.session.post(url, data=sel_data)) response = self._retry_request(lambda: self.session.post(url, data=sel_data))
response.raise_for_status() response.raise_for_status()
if response.json().get("result") != "ok": if response.json().get("result") != "ok":
@ -308,7 +329,7 @@ class Lessons:
"redisKey": redisKey, "redisKey": redisKey,
} }
cnt = 1 cnt = 1
while cnt<=10: while cnt <= 10:
url = f"{self.base}/student/courseSelect/selectResult/query" url = f"{self.base}/student/courseSelect/selectResult/query"
response = self._retry_request(lambda: self.session.post(url, data=parms)) response = self._retry_request(lambda: self.session.post(url, data=parms))
response.raise_for_status() response.raise_for_status()
@ -325,7 +346,7 @@ class Lessons:
logger.info(f"选课成功: {text}") logger.info(f"选课成功: {text}")
else: else:
print(f"{cnt}次查询中...") print(f"{cnt}次查询中...")
cnt+=1 cnt += 1
sleep(1) sleep(1)
logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课") logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课")
@ -351,8 +372,8 @@ class Lessons:
logger.info("获取基础信息") logger.info("获取基础信息")
self.get_base_info() self.get_base_info()
classes_src = self.read_lessons() classes_src = self.read_lessons()
classes:dict[str,tuple[str,list[str],str]] = {} classes: dict[str, tuple[str, list[str], str]] = {}
mp:dict[str,str] = {} # {"课程号": "课程名称"} mp: dict[str, str] = {} # {"课程号": "课程名称"}
for id, kxh, name in classes_src: for id, kxh, name in classes_src:
mp[id] = name mp[id] = name
if classes.get(id) is None: if classes.get(id) is None:
@ -391,10 +412,12 @@ class Lessons:
errs.appendleft(time()) errs.appendleft(time())
logger.error(f"获取课程 {lcl[2]} 余量时发生错误: {e}") logger.error(f"获取课程 {lcl[2]} 余量时发生错误: {e}")
continue continue
for i,left in enumerate(lefts): for i, left in enumerate(lefts):
cl = (lcl[0], lcl[1][i], lcl[2]) cl = (lcl[0], lcl[1][i], lcl[2])
if left > 0: if left > 0:
logger.info(f"课程 {cl[2]}_{cl[1]} 有余量: {left},开始选课") logger.info(
f"课程 {cl[2]}_{cl[1]} 有余量: {left},开始选课"
)
try: try:
ret = self.select(cl) ret = self.select(cl)
if ret: if ret:
@ -411,12 +434,16 @@ class Lessons:
else: else:
logger.info(f"课程 {cl[2]}_{cl[1]} 无余量") logger.info(f"课程 {cl[2]}_{cl[1]} 无余量")
sleep(2) sleep(2)
logger.info(f"当前还有{len(classes)}门课程未选上,分别为{','.join(mp[i] for i in classes.keys())}。等待10秒后继续检查") logger.info(
f"当前还有{len(classes)}门课程未选上,分别为{','.join(mp[i] for i in classes.keys())}。等待10秒后继续检查"
)
if suc: if suc:
sc_send("选课成功", desp=f"已成功选上课程: {', '.join(f'{i[2]}_{i[1]}' for i in suc)}") sc_send(
"选课成功",
desp=f"已成功选上课程: {', '.join(f'{i[2]}_{i[1]}' for i in suc)}",
)
sleep(10) # 等待10秒后继续检查 sleep(10) # 等待10秒后继续检查
logger.info("自动选课完成") logger.info("自动选课完成")
except LessonsException as e: except LessonsException as e:
@ -428,6 +455,7 @@ class Lessons:
sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}") sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
raise e raise e
if __name__ == "__main__": if __name__ == "__main__":
les = Lessons() les = Lessons()
les.auto_spider() les.auto_spider()