"""Automated course selection and grade monitoring built on the URP client."""

import re
from collections import deque
from os import environ
from pathlib import Path
from time import sleep, time
from typing import Callable, List, Optional

import pandas as pd

from utils import LessonsException, ReloginException
from utils import URP, logger, sc_send

# NOTE(review): the token pattern used by ``Lessons.select`` was an empty
# literal (``r''``), so ``match.group(1)`` always raised IndexError. This
# pattern assumes the token lives in an ``<input>`` element whose ``id`` (or
# ``name``) is ``tokenValue`` and whose ``value`` attribute carries the token,
# in either attribute order -- confirm against the real page markup.
_TOKEN_PATTERN = re.compile(
    r'<input\b(?=[^>]*\b(?:id|name)="tokenValue")[^>]*\bvalue="([^"]*)"'
)


class Lessons(URP):
    """Poll course vacancies and submit selection requests automatically."""

    def __init__(self):
        super().__init__()
        self.env_check()
        self.term: Optional[str] = None
        self.fajhh: Optional[str] = None
        # Pause after each course check / selection attempt, in seconds.
        self.interval_1 = int(environ.get("INTERVAL_1", 2))
        # Pause between two full polling rounds, in seconds.
        self.interval_2 = int(environ.get("INTERVAL_2", 10))

    @staticmethod
    def _find_selected_term(html: str) -> Optional[str]:
        """Return the ``value`` of the ``<option>`` marked ``selected``.

        The ``selected`` keyword sits on its own line in the page, so for each
        line containing it the preceding lines are searched (nearest first)
        for the matching ``option value="..."``.
        """
        lines = html.split("\n")
        for i, line in enumerate(lines):
            if "selected" not in line:
                continue
            # Up to 9 lines back; the stop is -1 so that line 0 is included.
            for j in range(i - 1, max(-1, i - 10), -1):
                if "option value=" not in lines[j]:
                    continue
                value_match = re.search(r'value="([^"]*)"', lines[j])
                if value_match:
                    term = value_match.group(1)
                    if term:
                        return term
                    # Empty value: give up on this "selected" line and keep
                    # scanning the rest of the page.
                    break
        return None

    def get_base_info(self):
        """Fetch the training-plan id (``fajhh``) and the selected term.

        Raises:
            LessonsException: if either value cannot be found in the pages.
        """
        res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index")
        res.raise_for_status()
        match = re.search(r"fajhh=(\d+)", res.text)
        if not match:
            raise LessonsException("未找到培养方案编号")
        self.fajhh = match.group(1)

        res = self.session.get(
            f"{self.base}/student/courseSelect/planCourse/index?fajhh={self.fajhh}"
        )
        res.raise_for_status()
        self.term = self._find_selected_term(res.text)
        if not self.term:
            raise LessonsException("未找到学期信息")

    def read_lessons(self) -> List[tuple[str, str, str]]:
        """Load the wanted courses from the file named by the ``FILE`` env var.

        The file (default ``class.xlsx``) must provide the columns 课程号,
        课序号 and 课程名. Duplicate (course id, section) rows are dropped.

        Returns:
            A list of ``(course id, two-digit section number, course name)``.

        Raises:
            LessonsException: if the file is missing, has an unsupported
                extension, lacks a required column, or holds a section number
                that is not an integer.
        """
        file = Path(environ.get("FILE", "class.xlsx"))
        if not file.is_file():
            raise LessonsException(f"课程文件 {file} 不存在,请检查路径")

        readers: dict[str, Callable[[Path], pd.DataFrame]] = {
            ".csv": pd.read_csv,
            ".xlsx": pd.read_excel,
            ".xls": pd.read_excel,
            ".json": pd.read_json,
        }
        reader = readers.get(file.suffix.lower())
        if reader is None:
            raise LessonsException(
                f"不支持的文件格式: {file.suffix}. 仅支持 .csv, .xlsx, .xls, .json 格式"
            )

        df = reader(file)
        df.columns = df.columns.str.strip()  # tolerate padded header cells
        for col in ["课程号", "课序号", "课程名"]:
            if col not in df.columns:
                raise LessonsException(f"缺少必要的列: {col}")
        df = df[["课程号", "课序号", "课程名"]]
        df.columns = ["id", "kxh", "name"]
        df = df.drop_duplicates(subset=["id", "kxh"])

        classes = []
        for row in df.itertuples(index=False):
            # The section number may arrive as int, float or text depending on
            # the reader; normalise through int() before zero-padding.
            try:
                kxh = f"{int(row.kxh):02d}"
            except (TypeError, ValueError) as err:
                raise LessonsException(f"课序号无效: {row.kxh!r}") from err
            classes.append((str(row.id).strip(), kxh, row.name))
        return classes

    def get_left(self, cl: tuple[str, list[str], str]) -> list[int] | None:
        """Query the remaining seats of every wanted section of one course.

        Args:
            cl: ``(course id, wanted section numbers, course name)``.

        Returns:
            One entry per section in ``cl[1]`` (``-1`` when the response has
            no entry for that section), or ``None`` when the response is
            empty or the course name does not match the expected one.
        """
        url = f"{self.base}/student/courseSelect/planCourse/courseList"
        params = {
            "fajhh": self.fajhh,
            "jhxn": self.term,
            "kcsxdm": "",
            "kch": cl[0],
            "kcm": "",
            "kxh": "",
            "kclbdm": "",
            "kzh": "",
            "xqh": "",
            "xq": 0,
            "jc": 0,
        }
        response = self._retry_request(lambda: self.session.post(url, data=params))
        # Dump of the latest raw response, overwritten on every call.
        with open("response.json", "w", encoding="utf-8") as f:
            f.write(response.text)
        data: dict = response.json()

        cls: list[dict] = data.get("rwfalist", [])
        if not cls:
            logger.error(f"课程 {cl[2]} 的课程信息为空: {data}")
            return None

        # Guard against selecting a different course that shares the id.
        for item in cls:
            if item["classNum"] in cl[1] and item["kcm"] != cl[2]:
                logger.critical(
                    f"课程 {cl[2]} 的课程名与查询信息不匹配: {item['kcm']} != {cl[2]}"
                )
                sc_send(
                    "选课异常",
                    desp=f"课程 {cl[2]} 的课程名与查询信息不匹配: {item['kcm']} != {cl[2]}",
                )
                return None

        kyl: dict[str, str] = data.get("kylMap") or {}
        if not kyl:
            logger.error(f"课程 {cl[2]} 的余量信息为空: {kyl}")
            return None

        ret = []
        for kxh in cl[1]:
            key = f"{self.term}_{cl[0]}_{kxh}"
            left = kyl.get(key)
            if left is None:
                logger.error(
                    f"课程 {cl[2]} 的余量信息不存在: {key} not in {kyl.keys()}"
                )
                ret.append(-1)
                continue
            ret.append(int(left))
        return ret

    def select(self, cl: tuple[str, str, str]) -> bool:
        """Submit a selection request for one section and wait for the result.

        Args:
            cl: ``(course id, section number, course name)``.

        Returns:
            ``True`` when the server reports success or that the course is
            already selected; ``False`` on any failure or when the result is
            still pending after 10 polls.
        """
        url = f"{self.base}/student/courseSelect/gotoSelect/index"
        response = self._retry_request(lambda: self.session.get(url))
        response.raise_for_status()
        match = _TOKEN_PATTERN.search(response.text)
        if not match:
            logger.error("未找到 tokenValue")
            return False
        token = match.group(1)

        url = f"{self.base}/student/courseSelect/selectCourse/checkInputCodeAndSubmit"
        # "<name>_<section>" encoded as comma-separated Unicode code points.
        cms = ",".join(str(ord(ch)) for ch in f"{cl[2]}_{cl[1]}")
        params = {
            "dealType": 2,
            "fajhh": self.fajhh,
            "kcIds": f"{cl[0]}_{cl[1]}_{self.term}",
            "kcms": cms,
            "sj": "0_0",
            "kclbdm": "",
            "kzh": "",
            "xqh": "",
        }
        sel_data = {**params, "inputCode": "undefined", "tokenValue": token}
        response = self._retry_request(lambda: self.session.post(url, data=sel_data))
        response.raise_for_status()
        if response.json().get("result") != "ok":
            # Log the body; the Response object itself only shows the status.
            logger.error(f"选课时发生错误: {response.text}")
            return False
        logger.info("选课请求已发送,等待结果...")

        url = f"{self.base}/student/courseSelect/selectCourses/waitingfor"
        response = self._retry_request(lambda: self.session.post(url, data=params))
        response.raise_for_status()
        redis_match = re.search(r'var redisKey = "(.+)";', response.text)
        if not redis_match:
            logger.error(f"选课 {cl[2]} 时未找到 redisKey")
            return False
        query_data = {"kcNum": 1, "redisKey": redis_match.group(1)}

        url = f"{self.base}/student/courseSelect/selectResult/query"
        for attempt in range(1, 11):
            response = self._retry_request(
                lambda: self.session.post(url, data=query_data)
            )
            response.raise_for_status()
            result = response.json()
            if result["isFinish"]:
                text = "\n".join(result["result"])
                if "已经选择了课程!" in text:
                    logger.info(f"课程 {cl[2]} 已经选上,无需重复选课")
                    return True
                if "成功" not in text:
                    logger.error(f"选课失败: {text}")
                    return False
                logger.info(f"选课成功: {text}")
                # Report the success instead of polling on until the timeout.
                return True
            logger.info(f"第{attempt}次查询中...")
            sleep(1)
        logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课")
        return False

    def auto_spider(self):
        """Run the selection loop until every wanted course is selected.

        Logs in, loads the base info and the course list, then repeatedly
        checks vacancies and tries to select any section with free seats.
        Returns early after three forced re-logins caused by repeated errors.

        Raises:
            LessonsException: on setup failures (missing base info, bad course
                file, inconsistent course names).
            Exception: any other error escaping the loop is logged, pushed
                via ``sc_send`` and re-raised.
        """
        try:
            self.login()
            logger.info("获取基础信息")
            self.get_base_info()
            classes_src = self.read_lessons()

            # course id -> (course id, wanted sections, course name)
            classes: dict[str, tuple[str, list[str], str]] = {}
            mp: dict[str, str] = {}  # course id -> course name
            for kch, kxh, name in classes_src:
                mp[kch] = name
                known = classes.get(kch)
                if known is None:
                    classes[kch] = (kch, [kxh], name)
                    continue
                if known[2] != name:
                    raise LessonsException(
                        f"课程 {name}_{kxh} 的名称不一致: {known[2]} != {name}"
                    )
                known[1].append(kxh)

            logger.info(f"读取课程信息,共有 {len(classes)} 门课程")
            for cl in classes.values():
                logger.info(f"课程 {cl[2]} ({cl[0]}) 的可选课序号: {', '.join(cl[1])}")
            logger.info("开始自动选课")

            errs = deque(maxlen=5)  # timestamps of recent errors, newest first
            master_err = 0  # number of forced re-logins so far
            while classes:
                suc = []
                # NOTE(review): ``errs[0] - time()`` is never positive, so this
                # fires as soon as five errors have accumulated regardless of
                # how old they are. Kept as-is because the intended window
                # (e.g. ``time() - errs[-1] < 20``) is not clear -- confirm.
                if len(errs) == 5 and errs[0] - time() < 20:
                    logger.error("最近5次获取课程余量异常,尝试重新登录")
                    sc_send("选课异常", desp="最近5次获取课程余量异常,尝试重新登录")
                    self.login()
                    master_err += 1
                    errs.clear()
                    if master_err >= 3:
                        logger.error("反复发生重要异常,退出程序")
                        sc_send("选课异常", desp="反复发生重要异常,退出程序")
                        return
                try:
                    # Iterate over a snapshot: selected courses are popped
                    # from ``classes`` inside the loop.
                    for lcl in list(classes.values()):
                        logger.info(f"检查课程 {lcl[2]} 余量")
                        try:
                            lefts = self.get_left(lcl)
                            if lefts is None:
                                errs.appendleft(time())
                                logger.error(f"获取课程 {lcl[2]} 余量时返回异常")
                                continue
                        except ReloginException:
                            raise
                        except Exception as e:
                            errs.appendleft(time())
                            logger.error(f"获取课程 {lcl[2]} 余量时发生错误: {e}")
                            continue

                        for i, left in enumerate(lefts):
                            cl = (lcl[0], lcl[1][i], lcl[2])
                            if left > 0:
                                logger.info(
                                    f"课程 {cl[2]}_{cl[1]} 有余量: {left},开始选课"
                                )
                                try:
                                    if self.select(cl):
                                        suc.append(cl)
                                        classes.pop(cl[0])
                                        break
                                except ReloginException:
                                    raise
                                except Exception as e:
                                    errs.appendleft(time())
                                    logger.error(
                                        f"选课 {cl[2]}_{cl[1]} 时发生错误: {e}"
                                    )
                                finally:
                                    # Throttle so the server does not reject us.
                                    sleep(self.interval_1)
                            elif left == -1:
                                logger.error(f"课程 {cl[2]}_{cl[1]} 余量信息异常")
                            else:
                                logger.info(f"课程 {cl[2]}_{cl[1]} 无余量")
                        sleep(self.interval_1)

                    if classes:
                        remaining = ",".join(mp[i] for i in classes)
                        logger.info(
                            f"当前还有{len(classes)}门课程未选上,分别为{remaining}。等待{self.interval_2}秒后继续检查"
                        )
                    if suc:
                        selected = ", ".join(f"{i[2]}_{i[1]}" for i in suc)
                        sc_send("选课成功", desp=f"已成功选上课程: {selected}")
                except ReloginException as e:
                    logger.error(f"需要重新登录: {e}")
                    sc_send("选课异常", desp=f"需要重新登录: {e}")
                    self.login()
                    continue
                except LessonsException as e:
                    logger.error(f"选课过程中发生错误: {e}")
                    sc_send("选课异常", desp=f"选课过程中发生错误: {e}")
                    errs.appendleft(time())
                    continue
                except Exception as e:
                    logger.error(f"意外错误: {e}")
                    sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
                    errs.appendleft(time())
                    continue
                if classes:
                    # Nothing left to wait for once every course is selected.
                    sleep(self.interval_2)
            logger.info("自动选课完成")
        except LessonsException as e:
            logger.error(f"选课过程中发生错误: {e}")
            sc_send("选课异常", desp=f"选课过程中发生错误: {e}")
            raise
        except Exception as e:
            logger.error(f"意外错误: {e}")
            sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
            raise


class Grade(URP):
    """Poll this term's scores and push a notification when new ones appear."""

    def __init__(self):
        super().__init__()
        # Number of courses in the score list; set by the first ``query()``.
        self.total = None
        required_keys = [
            "uname",
            "password",
            "recap_username",
            "recap_password",
        ]
        self.env_check(required_keys)
        # Seconds between two polls.
        # NOTE(review): the key is lower-case here while ``Lessons`` reads
        # "INTERVAL_2"; kept unchanged for compatibility -- confirm intent.
        self.interval_2 = int(environ.get("interval_2", 3600))

    def query(self) -> tuple[dict[str, dict[str, str]], set[str]]:
        """Fetch this term's score list.

        Returns:
            ``(published, names)``: ``published`` maps course name to its raw
            record for every course whose ``avgcj`` is non-blank, and
            ``names`` is the set of those course names.

        Raises:
            RuntimeError: if the data URL cannot be found in the index page.
        """
        url = f"{self.base}/student/integratedQuery/scoreQuery/thisTermScores/index"
        res = self._retry_request(lambda: self.session.get(url))
        res.raise_for_status()
        # The data URL embeds one dynamic path segment; match exactly one
        # segment so two occurrences on a line cannot be glued together.
        match = re.search(
            r"/student/integratedQuery/scoreQuery/[^/\s]+/thisTermScores/data",
            res.text,
        )
        if not match:
            raise RuntimeError("Cannot find url")
        url = self.base + match.group(0)

        res = self._retry_request(lambda: self.session.get(url))
        rows = res.json()[0]["list"]
        if self.total is None:
            self.total = len(rows)
        elif self.total != len(rows):
            sc_send("成绩查询异常", f"课程数发生变化 {self.total}!={len(rows)}")
            # Track the new count so the alert fires once per change and the
            # completion check in ``auto_check`` uses the current number.
            self.total = len(rows)

        # A blank (or null) average marks a score that is not published yet.
        ret = {
            x["courseName"]: x
            for x in rows
            if str(x.get("avgcj") or "").strip() != ""
        }
        return ret, set(ret.keys())

    @staticmethod
    def format(x: dict[str, str]):
        """Render one score record as a markdown table row."""
        return f"|{x['courseName']}|{x['courseScore']}|{x['maxcj']}|{x['avgcj']}|"

    def auto_check(self):
        """Poll scores until every course has a published grade.

        Each time the set of published courses changes, the new and the full
        score tables are logged, pushed via ``sc_send`` and printed.
        """
        self.login()
        grades = set()
        self.query()  # first call only establishes ``self.total``
        assert isinstance(self.total, int)
        assert self.total > 0
        err = 0
        while len(grades) < self.total:
            try:
                logger.info("Querying")
                cls, new = self.query()
                if new != grades:
                    delta = [cls[x] for x in new - grades]
                    t = [
                        "新成绩",
                        "|学科|成绩|最高分|平均分|",
                        "|-|-|-|-|",
                    ]
                    t.extend(map(self.format, delta))
                    logger.info("\n".join(t))
                    t.append("---")
                    t.append("所有成绩")
                    t.append("|学科|成绩|最高分|平均分|")
                    t.append("|-|-|-|-|")
                    t.extend(map(self.format, cls.values()))
                    t = "\n".join(t)
                    sc_send("成绩发布", t)
                    try:
                        from rich.markdown import Markdown
                        # Aliased on purpose: importing the name ``print``
                        # here would make it function-local and break the
                        # fallback below with UnboundLocalError.
                        from rich import print as rich_print

                        rich_print(Markdown(t))
                    except ImportError:
                        print("Cannot import rich, show markdown directly.")
                        print(t)
                    grades = new
                if err > 0:
                    err -= 1
            except ReloginException:
                logger.info("Relogin")
                sc_send("成绩监控", "重新登录")
                self.login()
            except Exception as e:
                logger.error(f"Failed to update due to {e}")
                err += 1
                if err >= 5:
                    logger.error("Try to relogin")
                    sc_send("成绩监控", "多次失败,尝试重新登录")
                    self.login()
            logger.info(f"Next query will start after {self.interval_2}s")
            sleep(self.interval_2)
        logger.info("Normal terminated due to all grades is out.")
        sc_send("成绩监控", "所有成绩均已公布")


if __name__ == "__main__":
    les = Lessons()
    les.auto_spider()
    # To monitor grades instead:
    # gra = Grade()
    # gra.auto_check()