From cc7fdca070478b5198910556be54cdc390824fd0 Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Sat, 5 Jul 2025 12:49:14 +0800
Subject: [PATCH] init2
---
main.py | 244 ++++++++++++++++++++++++++++----------------------------
1 file changed, 122 insertions(+), 122 deletions(-)
diff --git a/main.py b/main.py
index 3f4ca69..809d3d4 100644
--- a/main.py
+++ b/main.py
@@ -1,38 +1,32 @@
-import base64
import hashlib
+import dotenv
+import base64
import json
-import logging
import re
+import logging
+from typing import List, Dict, Optional
+from time import sleep,time
+
+import requests
+
+from bs4 import BeautifulSoup, Tag
+from os import environ
+import pandas as pd
from collections import deque
from datetime import datetime
-from os import environ
-from pathlib import Path
-from time import sleep, time
-from typing import Callable, List, Optional
-
-import dotenv
-import pandas as pd
-import requests
-from bs4 import BeautifulSoup
from serverchan_sdk import sc_send as _sc_send
# 配置日志
-logging.basicConfig(
- level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
-)
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
-
-Path("logs").mkdir(exist_ok=True) # 确保日志目录存在
now = datetime.now().strftime("%Y%m%d_%H%M%S")
-file = logging.FileHandler(f"logs/lessons_{now}.log", mode="w", encoding="utf-8")
-file.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
+file = logging.FileHandler(f'logs/lessons_{now}.log', mode='w', encoding='utf-8')
+file.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
file.setLevel(logging.INFO)
logger.addHandler(file) # 日志输出到文件
-
class LessonsException(Exception):
"""自定义异常类"""
-
pass
@@ -46,52 +40,59 @@ def sc_send(title: str, desp: str):
except Exception as e:
logger.error(f"发送失败: {e}")
-
class Lessons:
def __init__(self):
self.session = requests.session()
+ self.lessons_list: List[Dict[str, str]] = []
+ self.username: Optional[str] = None
self.term: Optional[str] = None
self.fajhh: Optional[str] = None
+ self.token: Optional[str] = None
# 加载环境变量
dotenv.load_dotenv()
-
+
# 检查必需的环境变量
- required_keys = [
- "uname",
- "password",
- "recap_username",
- "recap_password",
- "FILE",
- "SC_KEY",
- ]
+ required_keys = ["uname", "password", "recap_username", "recap_password"]
for key in required_keys:
if not environ.get(key):
raise LessonsException(f"请在环境变量中设置{key}")
-
+
self.base = environ.get("base", "http://jwstudent.lnu.edu.cn")
- def _retry_request(
- self, func, max_retries: int = 10, error_msg: str = "请求失败"
- ) -> requests.Response:
+ def _retry(self, max_retries: int = 5):
+ retry = 0
+ def _wrapper(func, *args, **kwargs):
+ nonlocal retry
+ """装饰器:重试请求"""
+ try:
+ return func(self, *args, **kwargs)
+ except Exception as e:
+ logger.warning(f"重试因为 {e}")
+ retry += 1
+ if retry > max_retries:
+ raise LessonsException("重试次数超过限制")
+ else:
+ return _wrapper(func, *args, **kwargs)
+ return _wrapper
+
+
+
+ def _retry_request(self, func, max_retries: int = 10, error_msg: str = "请求失败") -> requests.Response:
"""通用的请求重试方法"""
for attempt in range(1, max_retries + 1):
try:
response = func()
self.judge_logout(response)
return response
- except (
- requests.ConnectionError,
- requests.HTTPError,
- requests.Timeout,
- ) as e:
+ except (requests.ConnectionError, requests.HTTPError, requests.Timeout) as e:
logger.warning(f"{error_msg}!{type(e).__name__}: {e}")
if attempt < max_retries:
- logger.info(f"第{attempt}次重试")
+ logger.info(f'第{attempt}次重试')
else:
raise LessonsException(f"{error_msg}!请检查网络连接!")
-
+
# 这行不会被执行,但为了类型检查
raise LessonsException(f"{error_msg}!重试次数耗尽")
@@ -100,43 +101,46 @@ class Lessons:
"""验证码识别"""
recap_username = environ.get("recap_username")
recap_password = environ.get("recap_password")
-
+
if not recap_username or not recap_password:
raise LessonsException("验证码识别服务配置不完整")
-
+
data = {
"username": recap_username,
"password": recap_password,
"ID": "04897896",
"b64": b64,
- "version": "3.1.1",
+ "version": "3.1.1"
}
-
+
try:
- response = requests.post(
- "http://www.fdyscloud.com.cn/tuling/predict", json=data, timeout=10
- )
+ response = requests.post("http://www.fdyscloud.com.cn/tuling/predict",
+ json=data, timeout=10)
response.raise_for_status()
result = response.json()
return result["data"]["result"]
except (requests.RequestException, KeyError, json.JSONDecodeError) as e:
raise LessonsException(f"验证码识别失败: {e}")
+
@staticmethod
def pwd_md5(string: str) -> str:
md5_part1 = hashlib.md5((string + "{Urp602019}").encode()).hexdigest().lower()
md5_part2 = hashlib.md5(string.encode()).hexdigest().lower()
- final_result = md5_part1 + "*" + md5_part2
+ final_result = md5_part1 + '*' + md5_part2
return final_result
+ # @self._r
def _login(self):
"""登录模块"""
username = environ.get("uname")
password = environ.get("password")
-
+
if not username or not password:
raise LessonsException("用户名或密码未设置")
-
+
+ self.username = username
+
try:
# 获取登录页面的token
req = self.session.get("http://jwstudent.lnu.edu.cn/login")
@@ -151,9 +155,12 @@ class Lessons:
req = self.session.get(f"{self.base}/img/captcha.jpg")
req.raise_for_status()
im = req.content
- b64 = base64.b64encode(im).decode("utf-8")
+ b64 = base64.b64encode(im).decode('utf-8')
captcha_code = self.recapture(b64=b64)
-
+
+ # 保存验证码图片用于调试
+ # with open("captcha.jpg", "wb") as f:
+ # f.write(im)
logger.info(f"验证码识别结果: {captcha_code}")
hashed_password = self.pwd_md5(password)
@@ -163,35 +170,53 @@ class Lessons:
"j_username": username,
"j_password": hashed_password,
"j_captcha": captcha_code,
- "tokenValue": token_value,
+ "tokenValue": token_value
}
# 发送 POST 请求
url = f"{self.base}/j_spring_security_check"
headers = {
"User-Agent": "Mozilla/5.0",
- "Content-Type": "application/x-www-form-urlencoded",
+ "Content-Type": "application/x-www-form-urlencoded"
}
response = self.session.post(url, data=payload, headers=headers)
if "发生错误" in response.text:
- err = re.search(r"发生错误!(.+)", response.text)
+ err = re.search(r'发生错误!(.+)', response.text)
if err:
error_message = err.group(1).strip()
raise LessonsException(f"登录失败: {error_message}")
raise LessonsException("登录失败")
-
+
logger.info("登录成功")
return True
-
+
except requests.RequestException as e:
raise LessonsException(f"登录过程中网络错误: {e}")
-
+
+
def judge_logout(self, response: requests.Response):
"""检查账号是否在其他地方被登录"""
if response.url == f"{self.base}/login?errorCode=concurrentSessionExpired":
raise LessonsException("有人登录了您的账号!")
+ def judge_choose(self, bs: BeautifulSoup):
+ """判断是否可以选课"""
+ alert = bs.find("div", {"class": "alert alert-block alert-danger"})
+ if alert is not None:
+ raise LessonsException("对不起,当前为非选课阶段!")
+
+ def get_tokenvalue(self, bs: BeautifulSoup) -> str:
+ """获取token值"""
+ token_element = bs.find("input", {"type": "hidden", "id": "tokenValue"})
+ if not token_element or not isinstance(token_element, Tag):
+ raise LessonsException("未找到tokenValue元素")
+
+ value = token_element.get("value")
+ if not value:
+ raise LessonsException("未找到tokenValue")
+ return str(value)
+
def get_base_info(self):
res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index")
res.raise_for_status()
@@ -201,10 +226,8 @@ class Lessons:
print(html)
raise LessonsException("未找到培养方案编号")
self.fajhh = match.group(1)
-
- res = self.session.get(
- f"{self.base}/student/courseSelect/planCourse/index?fajhh={self.fajhh}"
- )
+
+ res = self.session.get(f"{self.base}/student/courseSelect/planCourse/index?fajhh={self.fajhh}")
res.raise_for_status()
html = res.text
bs = BeautifulSoup(html, "html.parser")
@@ -212,26 +235,12 @@ class Lessons:
if not match:
raise LessonsException("未找到学期信息")
self.term = str(match.get("value"))
-
+
print(self.fajhh, self.term)
-
+
def read_lessons(self) -> List[tuple[str, str, str]]:
classes = []
- file = Path(environ.get("FILE", "class.xlsx"))
- if not file.is_file():
- raise LessonsException(f"课程文件 {file} 不存在,请检查路径")
- d: dict[str, Callable[[Path], pd.DataFrame]] = {
- ".csv": pd.read_csv,
- ".xlsx": pd.read_excel,
- ".xls": pd.read_excel,
- ".json": pd.read_json,
- }
- func = d.get(file.suffix.lower())
- if func is None:
- raise LessonsException(
- f"不支持的文件格式: {file.suffix}. 仅支持 .csv, .xlsx, .xls, .json 格式"
- )
- df = func(file)
+ df = pd.read_csv("class.csv")
df.columns = df.columns.str.strip() # 去除列名两端的空格
for col in ["课程号", "课序号", "课程名"]:
if col not in df.columns:
@@ -240,10 +249,10 @@ class Lessons:
df.columns = ["id", "kxh", "name"] # 重命名列
df = df.drop_duplicates(subset=["id", "kxh"]) # 去重
for line in df.itertuples(index=False):
- classes.append((line.id, "%02d" % line.kxh, line.name))
+ classes.append((line.id, "%02d"%line.kxh, line.name))
return classes
- def get_left(self, cl: tuple[str, list[str], str]) -> list[int] | None:
+ def get_left(self, cl: tuple[str, list[str], str]) -> list[int]|None:
"""获取课程余量"""
url = f"{self.base}/student/courseSelect/planCourse/courseList"
params = {
@@ -257,29 +266,27 @@ class Lessons:
"kzh": "",
"xqh": "",
"xq": 0,
- "jc": 0,
+ "jc": 0
}
response = self._retry_request(lambda: self.session.post(url, data=params))
data = response.json()["kylMap"]
if len(data) == 0:
logger.error(f"课程 {cl[2]} 的余量信息为空: {data}")
return
-
+
ret = []
for kxh in cl[1]:
key = f"{self.term}_{cl[0]}_{kxh}"
left = data.get(key, None)
if left is None:
- logger.error(
- f"课程 {cl[2]} 的余量信息不存在: {key} not in {data.keys()}"
- )
+ logger.error(f"课程 {cl[2]} 的余量信息不存在: {key} not in {data.keys()}")
ret.append(-1)
ret.append(int(left))
return ret
-
- def select(self, cl: tuple[str, str, str]) -> bool:
+
+ def select(self,cl: tuple[str,str,str]) -> bool:
"""选课"""
-
+
url = f"{self.base}/student/courseSelect/gotoSelect/index"
response = self._retry_request(lambda: self.session.get(url))
response.raise_for_status()
@@ -289,10 +296,10 @@ class Lessons:
logger.error("未找到 tokenValue")
return False
token = match.group(1)
-
+
url = f"{self.base}/student/courseSelect/selectCourse/checkInputCodeAndSubmit"
cms = f"{cl[2]}_{cl[1]}"
- cms = ",".join(map(lambda x: str(ord(x)), cms))
+ cms = ",".join(map(lambda x:str(ord(x)),cms))
params = {
"dealType": 2,
"fajhh": self.fajhh,
@@ -304,15 +311,15 @@ class Lessons:
"xqh": "",
}
sel_data = params.copy()
- sel_data.update({"inputCode": "undefined", "tokenValue": token})
+ sel_data.update({"inputCode":"undefined", "tokenValue": token})
response = self._retry_request(lambda: self.session.post(url, data=sel_data))
response.raise_for_status()
if response.json().get("result") != "ok":
logger.error(f"选课时发生错误: {response}")
return False
-
+
logger.info("选课请求已发送,等待结果...")
-
+
url = f"{self.base}/student/courseSelect/selectCourses/waitingfor"
response = self._retry_request(lambda: self.session.post(url, data=params))
response.raise_for_status()
@@ -321,15 +328,15 @@ class Lessons:
if not redisKey:
print(html)
logger.error(f"选课 {cl[2]} 时未找到 redisKey")
- return False
+ return False
redisKey = redisKey.group(1)
-
+
parms = {
"kcNum": 1,
"redisKey": redisKey,
}
cnt = 1
- while cnt <= 10:
+ while cnt<=10:
url = f"{self.base}/student/courseSelect/selectResult/query"
response = self._retry_request(lambda: self.session.post(url, data=parms))
response.raise_for_status()
@@ -346,12 +353,12 @@ class Lessons:
logger.info(f"选课成功: {text}")
else:
print(f"第{cnt}次查询中...")
- cnt += 1
+ cnt+=1
sleep(1)
-
+
logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课")
return False
-
+
def login(self):
logger.info("尝试登录")
flag = False
@@ -364,7 +371,7 @@ class Lessons:
logger.error(f"登录失败: {e}")
if not flag:
raise LessonsException("登录失败,无法获取token")
-
+
def auto_spider(self):
"""自动选课主程序"""
try:
@@ -372,21 +379,21 @@ class Lessons:
logger.info("获取基础信息")
self.get_base_info()
classes_src = self.read_lessons()
- classes: dict[str, tuple[str, list[str], str]] = {}
- mp: dict[str, str] = {} # {"课程号": "课程名称"}
+ classes:dict[str,tuple[str,list[str],str]] = {}
+ mp:dict[str,str] = {} # {"课程号": "课程名称"}
for id, kxh, name in classes_src:
mp[id] = name
if classes.get(id) is None:
classes[id] = (id, [kxh], name)
else:
classes[id][1].append(kxh)
-
+
logger.info(f"读取课程信息,共有 {len(classes)} 门课程")
for cl in classes.values():
logger.info(f"课程 {cl[2]} ({cl[0]}) 的可选课序号: {', '.join(cl[1])}")
-
+
logger.info("开始自动选课")
-
+
errs = deque(maxlen=5)
master_err = 0
while classes:
@@ -412,12 +419,10 @@ class Lessons:
errs.appendleft(time())
logger.error(f"获取课程 {lcl[2]} 余量时发生错误: {e}")
continue
- for i, left in enumerate(lefts):
+ for i,left in enumerate(lefts):
cl = (lcl[0], lcl[1][i], lcl[2])
if left > 0:
- logger.info(
- f"课程 {cl[2]}_{cl[1]} 有余量: {left},开始选课"
- )
+ logger.info(f"课程 {cl[2]}_{cl[1]} 有余量: {left},开始选课")
try:
ret = self.select(cl)
if ret:
@@ -434,18 +439,14 @@ class Lessons:
else:
logger.info(f"课程 {cl[2]}_{cl[1]} 无余量")
sleep(2)
- logger.info(
- f"当前还有{len(classes)}门课程未选上,分别为{','.join(mp[i] for i in classes.keys())}。等待10秒后继续检查"
- )
+ logger.info(f"当前还有{len(classes)}门课程未选上,分别为{','.join(mp[i] for i in classes.keys())}。等待10秒后继续检查")
if suc:
- sc_send(
- "选课成功",
- desp=f"已成功选上课程: {', '.join(f'{i[2]}_{i[1]}' for i in suc)}",
- )
+ sc_send("选课成功", desp=f"已成功选上课程: {', '.join(f'{i[2]}_{i[1]}' for i in suc)}")
sleep(10) # 等待10秒后继续检查
-
+
+
logger.info("自动选课完成")
-
+
except LessonsException as e:
logger.error(f"选课过程中发生错误: {e}")
sc_send("选课异常", desp=f"选课过程中发生错误: {e}")
@@ -455,7 +456,6 @@ class Lessons:
sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
raise e
-
if __name__ == "__main__":
les = Lessons()
- les.auto_spider()
+ les.auto_spider()
\ No newline at end of file