This commit is contained in:
2025-07-09 00:17:36 +08:00
parent eebe92bab0
commit d56d2a3267
2 changed files with 87 additions and 80 deletions

126
main.py
View File

@ -1,11 +1,5 @@
from utils import LessonsException, ReloginException
from utils import sc_send,URP,logger
import re
from collections import deque
from os import environ
from pathlib import Path
from time import sleep, time
@ -13,25 +7,28 @@ from typing import Callable, List, Optional
import pandas as pd
from utils import URP, LessonsException, ReloginException, logger, sc_send
try:
from rich.markdown import Markdown
from rich import print
from rich.markdown import Markdown
RICH = True
except ImportError:
print("Some function in console may disabled due to no rich.")
RICH = False
class Lessons(URP):
def __init__(self):
super().__init__()
self.env_check()
self.term: Optional[str] = None
self.fajhh: Optional[str] = None
self.fajhh: Optional[str] = None
self.interval_1 = int(environ.get("INTERVAL_1", 2)) # 请求间隔默认为2秒
self.interval_2 = int(environ.get("INTERVAL_2", 10)) # 请求间隔默认为10秒
def get_base_info(self):
res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index")
@ -50,19 +47,19 @@ class Lessons(URP):
html = res.text
# 使用正则表达式替代 BeautifulSoup 来查找选中的学期选项
# 由于 HTML 结构特殊selected 在单独行上,需要向前查找对应的 option
lines = html.split('\n')
lines = html.split("\n")
for i, line in enumerate(lines):
if 'selected' in line.strip():
if "selected" in line.strip():
# 向前查找包含 option value 的行
for j in range(i-1, max(0, i-10), -1):
if 'option value=' in lines[j]:
for j in range(i - 1, max(0, i - 10), -1):
if "option value=" in lines[j]:
value_match = re.search(r'value="([^"]*)"', lines[j])
if value_match:
self.term = str(value_match.group(1))
break
if self.term:
break
if not self.term:
raise LessonsException("未找到学期信息")
@ -115,9 +112,9 @@ class Lessons(URP):
response = self._retry_request(lambda: self.session.post(url, data=params))
with open("response.json", "w", encoding="utf-8") as f:
f.write(response.text)
data:dict = response.json()
cls:list[dict] = data.get("rwfalist", [])
data: dict = response.json()
cls: list[dict] = data.get("rwfalist", [])
if not cls:
logger.error(f"课程 {cl[2]} 的课程信息为空: {data}")
return None
@ -134,8 +131,8 @@ class Lessons(URP):
desp=f"课程 {cl[2]} 的课程名与查询信息不匹配: {item['kcm']} != {cl[2]}",
)
return None
kyl:dict[str,str] = data["kylMap"]
kyl: dict[str, str] = data["kylMap"]
if len(kyl) == 0:
logger.error(f"课程 {cl[2]} 的余量信息为空: {kyl}")
return
@ -227,7 +224,6 @@ class Lessons(URP):
logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课")
return False
def auto_spider(self):
"""自动选课主程序"""
try:
@ -299,7 +295,9 @@ class Lessons(URP):
raise e
except Exception as e:
errs.appendleft(time())
logger.error(f"选课 {cl[2]}_{cl[1]} 时发生错误: {e}")
logger.error(
f"选课 {cl[2]}_{cl[1]} 时发生错误: {e}"
)
finally:
sleep(self.interval_1) # 避免请求过快导致服务器拒绝
elif left == -1:
@ -343,6 +341,7 @@ class Lessons(URP):
sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
raise e
class Grade(URP):
def __init__(self):
super().__init__()
@ -354,52 +353,53 @@ class Grade(URP):
"recap_password",
]
self.env_check(required_keys)
self.interval_2 = int(environ.get("interval_2",3600))
def query(self) -> tuple[dict[str,dict[str,str]],set[str]]:
self.interval_2 = int(environ.get("interval_2", 3600))
def query(self) -> tuple[dict[str, dict[str, str]], set[str]]:
url = f"{self.base}/student/integratedQuery/scoreQuery/thisTermScores/index"
res = self._retry_request(lambda:self.session.get(url))
res = self._retry_request(lambda: self.session.get(url))
res.raise_for_status()
html = res.text
match = re.search(f"/student/integratedQuery/scoreQuery/.+/thisTermScores/data",html)
match = re.search(
f"/student/integratedQuery/scoreQuery/.+/thisTermScores/data", html
)
if match:
url = self.base+match.group(0)
url = self.base + match.group(0)
else:
raise RuntimeError("Cannot find url")
# url = f"{self.base}/student/integratedQuery/scoreQuery/U6I5OXib09/thisTermScores/data"
res = self._retry_request(lambda :self.session.get(url))
res = self._retry_request(lambda: self.session.get(url))
# print(res.text)
res_json = res.json()
res_json = res.json()
l = res_json[0]["list"]
if self.total is None:
self.total = len(l)
elif self.total != len(l):
sc_send("成绩查询异常",f"课程数发生变化 {self.total}!={len(l)}")
ret = {x["courseName"]: x for x in l if x["avgcj"].strip()!=""}
return ret,set(ret.keys())
sc_send("成绩查询异常", f"课程数发生变化 {self.total}!={len(l)}")
ret = {x["courseName"]: x for x in l if x["avgcj"].strip() != ""}
return ret, set(ret.keys())
@staticmethod
def format(x:dict[str,str]):
def format(x: dict[str, str]):
return f"|{x['courseName']}|{x['courseScore']}|{x['maxcj']}|{x['avgcj']}|"
def auto_check(self):
self.login()
# self.session.cookies.update({"student.urpSoft.cn":"aaapnXQb62LApgwx7lkFz","UqZBpD3n3iXPAw1X9DmYiUaISMkd8YhMUen0":"v1IraGSUs3hnH"})
grades = set()
self.query()
assert isinstance(self.total,int)
assert isinstance(self.total, int)
assert self.total > 0
err = 0
while len(grades) < self.total:
try:
logger.info("Querying")
cls, new = self.query()
cls:dict[str,dict[str,str]]
new:set[str]
cls: dict[str, dict[str, str]]
new: set[str]
if new != grades:
delta_names = new - grades
delta = [cls[x] for x in delta_names]
@ -407,45 +407,45 @@ class Grade(URP):
t.append("新成绩")
t.append("|学科|成绩|最高分|平均分|")
t.append("|-|-|-|-|")
t.extend(map(self.format,delta))
t.extend(map(self.format, delta))
logger.info("\n".join(t))
t.append("---")
t.append("所有成绩")
t.append("|学科|成绩|最高分|平均分|")
t.append("|-|-|-|-|")
t.extend(map(self.format,cls.values()))
t.extend(map(self.format, cls.values()))
t = "\n".join(t)
sc_send("成绩发布",t)
sc_send("成绩发布", t)
if RICH:
print(Markdown(t))
else:
print(t)
grades = new
if err > 0: err-=1
if err > 0:
err -= 1
except ReloginException as e:
logger.info("Relogin")
sc_send("成绩监控","重新登录")
sc_send("成绩监控", "重新登录")
self.login()
except Exception as e:
logger.error(f"Failed to update due to {e}")
err+=1
err += 1
if err >= 5:
logger.error("Try to relogin")
sc_send("成绩监控","多次失败,尝试重新登录")
sc_send("成绩监控", "多次失败,尝试重新登录")
self.login()
logger.info(f"Next query will start after {self.interval_2}s")
sleep(self.interval_2)
logger.info("Normal terminated due to all grades is out.")
sc_send("成绩监控","所有成绩均已公布")
sc_send("成绩监控", "所有成绩均已公布")
if __name__ == "__main__":
# les = Lessons()
# les.auto_spider()
gra = Grade()
gra.auto_check()
les = Lessons()
les.auto_spider()
# gra = Grade()
# gra.auto_check()