Compare commits
2 Commits
d78e9902f2
...
d56d2a3267
Author | SHA1 | Date | |
---|---|---|---|
d56d2a3267 | |||
eebe92bab0 |
132
main.py
132
main.py
@ -1,11 +1,5 @@
|
|||||||
|
|
||||||
|
|
||||||
from utils import LessonsException, ReloginException
|
|
||||||
from utils import sc_send,URP,logger
|
|
||||||
|
|
||||||
import re
|
import re
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
|
||||||
from os import environ
|
from os import environ
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from time import sleep, time
|
from time import sleep, time
|
||||||
@ -13,18 +7,28 @@ from typing import Callable, List, Optional
|
|||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
|
from utils import URP, LessonsException, ReloginException, logger, sc_send
|
||||||
|
|
||||||
|
try:
|
||||||
|
from rich import print
|
||||||
|
from rich.markdown import Markdown
|
||||||
|
|
||||||
|
RICH = True
|
||||||
|
except ImportError:
|
||||||
|
print("Some function in console may disabled due to no rich.")
|
||||||
|
RICH = False
|
||||||
|
|
||||||
|
|
||||||
class Lessons(URP):
|
class Lessons(URP):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
self.env_check()
|
self.env_check()
|
||||||
|
|
||||||
self.term: Optional[str] = None
|
self.term: Optional[str] = None
|
||||||
self.fajhh: Optional[str] = None
|
self.fajhh: Optional[str] = None
|
||||||
self.interval_1 = int(environ.get("INTERVAL_1", 2)) # 请求间隔,默认为2秒
|
self.interval_1 = int(environ.get("INTERVAL_1", 2)) # 请求间隔,默认为2秒
|
||||||
self.interval_2 = int(environ.get("INTERVAL_2", 10)) # 请求间隔,默认为10秒
|
self.interval_2 = int(environ.get("INTERVAL_2", 10)) # 请求间隔,默认为10秒
|
||||||
|
|
||||||
|
|
||||||
def get_base_info(self):
|
def get_base_info(self):
|
||||||
res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index")
|
res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index")
|
||||||
@ -43,19 +47,19 @@ class Lessons(URP):
|
|||||||
html = res.text
|
html = res.text
|
||||||
# 使用正则表达式替代 BeautifulSoup 来查找选中的学期选项
|
# 使用正则表达式替代 BeautifulSoup 来查找选中的学期选项
|
||||||
# 由于 HTML 结构特殊,selected 在单独行上,需要向前查找对应的 option
|
# 由于 HTML 结构特殊,selected 在单独行上,需要向前查找对应的 option
|
||||||
lines = html.split('\n')
|
lines = html.split("\n")
|
||||||
for i, line in enumerate(lines):
|
for i, line in enumerate(lines):
|
||||||
if 'selected' in line.strip():
|
if "selected" in line.strip():
|
||||||
# 向前查找包含 option value 的行
|
# 向前查找包含 option value 的行
|
||||||
for j in range(i-1, max(0, i-10), -1):
|
for j in range(i - 1, max(0, i - 10), -1):
|
||||||
if 'option value=' in lines[j]:
|
if "option value=" in lines[j]:
|
||||||
value_match = re.search(r'value="([^"]*)"', lines[j])
|
value_match = re.search(r'value="([^"]*)"', lines[j])
|
||||||
if value_match:
|
if value_match:
|
||||||
self.term = str(value_match.group(1))
|
self.term = str(value_match.group(1))
|
||||||
break
|
break
|
||||||
if self.term:
|
if self.term:
|
||||||
break
|
break
|
||||||
|
|
||||||
if not self.term:
|
if not self.term:
|
||||||
raise LessonsException("未找到学期信息")
|
raise LessonsException("未找到学期信息")
|
||||||
|
|
||||||
@ -108,9 +112,9 @@ class Lessons(URP):
|
|||||||
response = self._retry_request(lambda: self.session.post(url, data=params))
|
response = self._retry_request(lambda: self.session.post(url, data=params))
|
||||||
with open("response.json", "w", encoding="utf-8") as f:
|
with open("response.json", "w", encoding="utf-8") as f:
|
||||||
f.write(response.text)
|
f.write(response.text)
|
||||||
data:dict = response.json()
|
data: dict = response.json()
|
||||||
|
|
||||||
cls:list[dict] = data.get("rwfalist", [])
|
cls: list[dict] = data.get("rwfalist", [])
|
||||||
if not cls:
|
if not cls:
|
||||||
logger.error(f"课程 {cl[2]} 的课程信息为空: {data}")
|
logger.error(f"课程 {cl[2]} 的课程信息为空: {data}")
|
||||||
return None
|
return None
|
||||||
@ -127,8 +131,8 @@ class Lessons(URP):
|
|||||||
desp=f"课程 {cl[2]} 的课程名与查询信息不匹配: {item['kcm']} != {cl[2]}",
|
desp=f"课程 {cl[2]} 的课程名与查询信息不匹配: {item['kcm']} != {cl[2]}",
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
kyl:dict[str,str] = data["kylMap"]
|
kyl: dict[str, str] = data["kylMap"]
|
||||||
if len(kyl) == 0:
|
if len(kyl) == 0:
|
||||||
logger.error(f"课程 {cl[2]} 的余量信息为空: {kyl}")
|
logger.error(f"课程 {cl[2]} 的余量信息为空: {kyl}")
|
||||||
return
|
return
|
||||||
@ -220,7 +224,6 @@ class Lessons(URP):
|
|||||||
logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课")
|
logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def auto_spider(self):
|
def auto_spider(self):
|
||||||
"""自动选课主程序"""
|
"""自动选课主程序"""
|
||||||
try:
|
try:
|
||||||
@ -292,7 +295,9 @@ class Lessons(URP):
|
|||||||
raise e
|
raise e
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
errs.appendleft(time())
|
errs.appendleft(time())
|
||||||
logger.error(f"选课 {cl[2]}_{cl[1]} 时发生错误: {e}")
|
logger.error(
|
||||||
|
f"选课 {cl[2]}_{cl[1]} 时发生错误: {e}"
|
||||||
|
)
|
||||||
finally:
|
finally:
|
||||||
sleep(self.interval_1) # 避免请求过快导致服务器拒绝
|
sleep(self.interval_1) # 避免请求过快导致服务器拒绝
|
||||||
elif left == -1:
|
elif left == -1:
|
||||||
@ -336,6 +341,7 @@ class Lessons(URP):
|
|||||||
sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
|
sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
|
||||||
class Grade(URP):
|
class Grade(URP):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@ -347,52 +353,53 @@ class Grade(URP):
|
|||||||
"recap_password",
|
"recap_password",
|
||||||
]
|
]
|
||||||
self.env_check(required_keys)
|
self.env_check(required_keys)
|
||||||
self.interval_2 = int(environ.get("interval_2",3600))
|
self.interval_2 = int(environ.get("interval_2", 3600))
|
||||||
|
|
||||||
def query(self) -> tuple[dict[str,dict[str,str]],set[str]]:
|
def query(self) -> tuple[dict[str, dict[str, str]], set[str]]:
|
||||||
url = f"{self.base}/student/integratedQuery/scoreQuery/thisTermScores/index"
|
url = f"{self.base}/student/integratedQuery/scoreQuery/thisTermScores/index"
|
||||||
res = self._retry_request(lambda:self.session.get(url))
|
res = self._retry_request(lambda: self.session.get(url))
|
||||||
res.raise_for_status()
|
res.raise_for_status()
|
||||||
html = res.text
|
html = res.text
|
||||||
match = re.search(f"/student/integratedQuery/scoreQuery/.+/thisTermScores/data",html)
|
match = re.search(
|
||||||
|
f"/student/integratedQuery/scoreQuery/.+/thisTermScores/data", html
|
||||||
|
)
|
||||||
if match:
|
if match:
|
||||||
url = self.base+match.group(0)
|
url = self.base + match.group(0)
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("Cannot find url")
|
raise RuntimeError("Cannot find url")
|
||||||
# url = f"{self.base}/student/integratedQuery/scoreQuery/U6I5OXib09/thisTermScores/data"
|
# url = f"{self.base}/student/integratedQuery/scoreQuery/U6I5OXib09/thisTermScores/data"
|
||||||
res = self._retry_request(lambda :self.session.get(url))
|
res = self._retry_request(lambda: self.session.get(url))
|
||||||
# print(res.text)
|
# print(res.text)
|
||||||
res_json = res.json()
|
res_json = res.json()
|
||||||
l = res_json[0]["list"]
|
l = res_json[0]["list"]
|
||||||
if self.total is None:
|
if self.total is None:
|
||||||
self.total = len(l)
|
self.total = len(l)
|
||||||
elif self.total != len(l):
|
elif self.total != len(l):
|
||||||
sc_send("成绩查询异常",f"课程数发生变化 {self.total}!={len(l)}")
|
sc_send("成绩查询异常", f"课程数发生变化 {self.total}!={len(l)}")
|
||||||
ret = {x["courseName"]: x for x in l if x["avgcj"].strip()!=""}
|
ret = {x["courseName"]: x for x in l if x["avgcj"].strip() != ""}
|
||||||
return ret,set(ret.keys())
|
return ret, set(ret.keys())
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def format(x:dict[str,str]):
|
def format(x: dict[str, str]):
|
||||||
return f"|{x['courseName']}|{x['courseScore']}|{x['maxcj']}|{x['avgcj']}|"
|
return f"|{x['courseName']}|{x['courseScore']}|{x['maxcj']}|{x['avgcj']}|"
|
||||||
|
|
||||||
def auto_check(self):
|
def auto_check(self):
|
||||||
self.login()
|
self.login()
|
||||||
# self.session.cookies.update({"student.urpSoft.cn":"aaapnXQb62LApgwx7lkFz","UqZBpD3n3iXPAw1X9DmYiUaISMkd8YhMUen0":"v1IraGSUs3hnH"})
|
|
||||||
|
|
||||||
grades = set()
|
grades = set()
|
||||||
self.query()
|
self.query()
|
||||||
|
|
||||||
assert isinstance(self.total,int)
|
assert isinstance(self.total, int)
|
||||||
assert self.total > 0
|
assert self.total > 0
|
||||||
|
|
||||||
err = 0
|
err = 0
|
||||||
|
|
||||||
while len(grades) < self.total:
|
while len(grades) < self.total:
|
||||||
try:
|
try:
|
||||||
logger.info("Querying")
|
logger.info("Querying")
|
||||||
cls, new = self.query()
|
cls, new = self.query()
|
||||||
cls:dict[str,dict[str,str]]
|
cls: dict[str, dict[str, str]]
|
||||||
new:set[str]
|
new: set[str]
|
||||||
if new != grades:
|
if new != grades:
|
||||||
delta_names = new - grades
|
delta_names = new - grades
|
||||||
delta = [cls[x] for x in delta_names]
|
delta = [cls[x] for x in delta_names]
|
||||||
@ -400,48 +407,45 @@ class Grade(URP):
|
|||||||
t.append("新成绩")
|
t.append("新成绩")
|
||||||
t.append("|学科|成绩|最高分|平均分|")
|
t.append("|学科|成绩|最高分|平均分|")
|
||||||
t.append("|-|-|-|-|")
|
t.append("|-|-|-|-|")
|
||||||
t.extend(map(self.format,delta))
|
t.extend(map(self.format, delta))
|
||||||
|
|
||||||
logger.info("\n".join(t))
|
logger.info("\n".join(t))
|
||||||
|
|
||||||
t.append("---")
|
t.append("---")
|
||||||
t.append("所有成绩")
|
t.append("所有成绩")
|
||||||
t.append("|学科|成绩|最高分|平均分|")
|
t.append("|学科|成绩|最高分|平均分|")
|
||||||
t.append("|-|-|-|-|")
|
t.append("|-|-|-|-|")
|
||||||
t.extend(map(self.format,cls.values()))
|
t.extend(map(self.format, cls.values()))
|
||||||
|
|
||||||
t = "\n".join(t)
|
t = "\n".join(t)
|
||||||
sc_send("成绩发布",t)
|
sc_send("成绩发布", t)
|
||||||
try:
|
if RICH:
|
||||||
from rich.markdown import Markdown
|
|
||||||
from rich import print
|
|
||||||
print(Markdown(t))
|
print(Markdown(t))
|
||||||
except ImportError:
|
else:
|
||||||
print("Cannot import rich, show markdown directly.")
|
|
||||||
print(t)
|
print(t)
|
||||||
grades = new
|
grades = new
|
||||||
if err > 0: err-=1
|
if err > 0:
|
||||||
|
err -= 1
|
||||||
except ReloginException as e:
|
except ReloginException as e:
|
||||||
logger.info("Relogin")
|
logger.info("Relogin")
|
||||||
sc_send("成绩监控","重新登录")
|
sc_send("成绩监控", "重新登录")
|
||||||
self.login()
|
self.login()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to update due to {e}")
|
logger.error(f"Failed to update due to {e}")
|
||||||
err+=1
|
err += 1
|
||||||
if err >= 5:
|
if err >= 5:
|
||||||
logger.error("Try to relogin")
|
logger.error("Try to relogin")
|
||||||
sc_send("成绩监控","多次失败,尝试重新登录")
|
sc_send("成绩监控", "多次失败,尝试重新登录")
|
||||||
self.login()
|
self.login()
|
||||||
|
|
||||||
logger.info(f"Next query will start after {self.interval_2}s")
|
logger.info(f"Next query will start after {self.interval_2}s")
|
||||||
sleep(self.interval_2)
|
sleep(self.interval_2)
|
||||||
logger.info("Normal terminated due to all grades is out.")
|
logger.info("Normal terminated due to all grades is out.")
|
||||||
sc_send("成绩监控","所有成绩均已公布")
|
sc_send("成绩监控", "所有成绩均已公布")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
les = Lessons()
|
les = Lessons()
|
||||||
les.auto_spider()
|
les.auto_spider()
|
||||||
# gra = Grade()
|
# gra = Grade()
|
||||||
# gra.auto_check()
|
# gra.auto_check()
|
||||||
|
41
utils.py
41
utils.py
@ -1,29 +1,37 @@
|
|||||||
from serverchan_sdk import sc_send as _sc_send
|
|
||||||
from datetime import datetime
|
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
|
||||||
from os import environ
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
import requests
|
|
||||||
import base64
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import re
|
import re
|
||||||
from typing import NoReturn,Optional,Iterable
|
from datetime import datetime
|
||||||
|
from os import environ
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Iterable, NoReturn, Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from serverchan_sdk import sc_send as _sc_send
|
||||||
|
|
||||||
|
|
||||||
class LessonsException(Exception):
|
class LessonsException(Exception):
|
||||||
"""自定义异常类"""
|
"""自定义异常类"""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ReloginException(LessonsException):
|
class ReloginException(LessonsException):
|
||||||
"""用于处理需要重新登录的异常"""
|
"""用于处理需要重新登录的异常"""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
# 配置日志
|
# 配置日志
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||||||
)
|
)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def sc_send(title: str, desp: str):
|
def sc_send(title: str, desp: str):
|
||||||
if not environ.get("SC_KEY"):
|
if not environ.get("SC_KEY"):
|
||||||
logger.error("SC_KEY 未设置,无法发送通知")
|
logger.error("SC_KEY 未设置,无法发送通知")
|
||||||
@ -43,16 +51,17 @@ def log_init():
|
|||||||
file.setLevel(logging.INFO)
|
file.setLevel(logging.INFO)
|
||||||
logger.addHandler(file) # 日志输出到文件
|
logger.addHandler(file) # 日志输出到文件
|
||||||
|
|
||||||
|
|
||||||
class URP:
|
class URP:
|
||||||
def __init__(self,base=None):
|
def __init__(self, base=None):
|
||||||
self.session = requests.session()
|
self.session = requests.session()
|
||||||
if base is None:
|
if base is None:
|
||||||
self.base = environ.get("base","http://jwstudent.lnu.edu.cn")
|
self.base = environ.get("base", "http://jwstudent.lnu.edu.cn")
|
||||||
else:
|
else:
|
||||||
self.base = base
|
self.base = base
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def env_check(required_keys:Optional[Iterable] = None) -> None|NoReturn:
|
def env_check(required_keys: Optional[Iterable] = None) -> None | NoReturn:
|
||||||
# 检查必需的环境变量
|
# 检查必需的环境变量
|
||||||
if required_keys is None:
|
if required_keys is None:
|
||||||
required_keys = [
|
required_keys = [
|
||||||
@ -65,7 +74,7 @@ class URP:
|
|||||||
for key in required_keys:
|
for key in required_keys:
|
||||||
if not environ.get(key):
|
if not environ.get(key):
|
||||||
raise LessonsException(f"请在环境变量中设置{key}")
|
raise LessonsException(f"请在环境变量中设置{key}")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _retry_request(
|
def _retry_request(
|
||||||
func, max_retries: int = 10, error_msg: str = "请求失败"
|
func, max_retries: int = 10, error_msg: str = "请求失败"
|
||||||
@ -123,7 +132,7 @@ class URP:
|
|||||||
md5_part2 = hashlib.md5(string.encode()).hexdigest().lower()
|
md5_part2 = hashlib.md5(string.encode()).hexdigest().lower()
|
||||||
final_result = md5_part1 + "*" + md5_part2
|
final_result = md5_part1 + "*" + md5_part2
|
||||||
return final_result
|
return final_result
|
||||||
|
|
||||||
def _login(self):
|
def _login(self):
|
||||||
"""登录模块"""
|
"""登录模块"""
|
||||||
username = environ.get("uname")
|
username = environ.get("uname")
|
||||||
@ -195,7 +204,6 @@ class URP:
|
|||||||
if not flag:
|
if not flag:
|
||||||
raise LessonsException("登录失败,无法获取token")
|
raise LessonsException("登录失败,无法获取token")
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _judge_logout(response: requests.Response):
|
def _judge_logout(response: requests.Response):
|
||||||
"""检查账号是否在其他地方被登录"""
|
"""检查账号是否在其他地方被登录"""
|
||||||
@ -203,6 +211,5 @@ class URP:
|
|||||||
raise ReloginException("有人登录了您的账号!")
|
raise ReloginException("有人登录了您的账号!")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
load_dotenv(override=True)
|
load_dotenv(override=True)
|
||||||
log_init()
|
log_init()
|
||||||
|
Reference in New Issue
Block a user