Compare commits

...

2 Commits

Author SHA1 Message Date
d78e9902f2 split code into URP class 2025-07-08 23:59:54 +08:00
32b3473659 add grade 2025-07-08 23:39:18 +08:00
3 changed files with 329 additions and 195 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
.env .env
logs logs
lessons* lessons*
__pycache__

307
main.py
View File

@ -1,198 +1,30 @@
import base64
import hashlib
import json from utils import LessonsException, ReloginException
import logging from utils import sc_send,URP,logger
import re import re
from collections import deque from collections import deque
from datetime import datetime
from os import environ from os import environ
from pathlib import Path from pathlib import Path
from time import sleep, time from time import sleep, time
from typing import Callable, List, Optional from typing import Callable, List, Optional
import dotenv
import pandas as pd import pandas as pd
import requests
from serverchan_sdk import sc_send as _sc_send
# 配置日志
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
Path("logs").mkdir(exist_ok=True) # 确保日志目录存在
now = datetime.now().strftime("%Y%m%d_%H%M%S")
file = logging.FileHandler(f"logs/lessons_{now}.log", mode="w", encoding="utf-8")
file.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
file.setLevel(logging.INFO)
logger.addHandler(file) # 日志输出到文件
class LessonsException(Exception): class Lessons(URP):
"""自定义异常类"""
pass
class ReloginException(LessonsException):
"""用于处理需要重新登录的异常"""
pass
def sc_send(title: str, desp: str):
if not environ.get("SC_KEY"):
logger.error("SC_KEY 未设置,无法发送通知")
return
sc_key = environ.get("SC_KEY")
try:
_sc_send(sc_key, title, desp, options={"tags": "自动选课"})
except Exception as e:
logger.error(f"发送失败: {e}")
class Lessons:
def __init__(self): def __init__(self):
self.session = requests.session() super().__init__()
self.env_check()
self.term: Optional[str] = None self.term: Optional[str] = None
self.fajhh: Optional[str] = None self.fajhh: Optional[str] = None
# 加载环境变量
dotenv.load_dotenv()
# 检查必需的环境变量
required_keys = [
"uname",
"password",
"recap_username",
"recap_password",
"FILE",
]
for key in required_keys:
if not environ.get(key):
raise LessonsException(f"请在环境变量中设置{key}")
self.base = environ.get("base", "http://jwstudent.lnu.edu.cn")
self.interval_1 = int(environ.get("INTERVAL_1", 2)) # 请求间隔默认为2秒 self.interval_1 = int(environ.get("INTERVAL_1", 2)) # 请求间隔默认为2秒
self.interval_2 = int(environ.get("INTERVAL_2", 10)) # 请求间隔默认为10秒 self.interval_2 = int(environ.get("INTERVAL_2", 10)) # 请求间隔默认为10秒
def _retry_request(
self, func, max_retries: int = 10, error_msg: str = "请求失败"
) -> requests.Response:
"""通用的请求重试方法"""
for attempt in range(1, max_retries + 1):
try:
response = func()
self.judge_logout(response)
return response
except (
requests.ConnectionError,
requests.HTTPError,
requests.Timeout,
) as e:
logger.warning(f"{error_msg}{type(e).__name__}: {e}")
if attempt < max_retries:
logger.info(f"{attempt}次重试")
else:
raise LessonsException(f"{error_msg}!请检查网络连接!")
# 这行不会被执行,但为了类型检查
raise LessonsException(f"{error_msg}!重试次数耗尽")
@staticmethod
def recapture(b64: str) -> str:
"""验证码识别"""
recap_username = environ.get("recap_username")
recap_password = environ.get("recap_password")
if not recap_username or not recap_password:
raise LessonsException("验证码识别服务配置不完整")
data = {
"username": recap_username,
"password": recap_password,
"ID": "04897896",
"b64": b64,
"version": "3.1.1",
}
try:
response = requests.post(
"http://www.fdyscloud.com.cn/tuling/predict", json=data, timeout=10
)
response.raise_for_status()
result = response.json()
return result["data"]["result"]
except (requests.RequestException, KeyError, json.JSONDecodeError) as e:
raise LessonsException(f"验证码识别失败: {e}")
@staticmethod
def pwd_md5(string: str) -> str:
md5_part1 = hashlib.md5((string + "{Urp602019}").encode()).hexdigest().lower()
md5_part2 = hashlib.md5(string.encode()).hexdigest().lower()
final_result = md5_part1 + "*" + md5_part2
return final_result
def _login(self):
"""登录模块"""
username = environ.get("uname")
password = environ.get("password")
if not username or not password:
raise LessonsException("用户名或密码未设置")
try:
# 获取登录页面的token
req = self.session.get("http://jwstudent.lnu.edu.cn/login")
req.raise_for_status()
html = req.text
match = re.search(r'name="tokenValue" value="(.+?)">', html)
if not match:
raise LessonsException("未找到 tokenValue")
token_value = match.group(1)
# 获取验证码
req = self.session.get(f"{self.base}/img/captcha.jpg")
req.raise_for_status()
im = req.content
b64 = base64.b64encode(im).decode("utf-8")
captcha_code = self.recapture(b64=b64)
logger.info(f"验证码识别结果: {captcha_code}")
hashed_password = self.pwd_md5(password)
# 模拟请求的 payload
payload = {
"j_username": username,
"j_password": hashed_password,
"j_captcha": captcha_code,
"tokenValue": token_value,
}
# 发送 POST 请求
url = f"{self.base}/j_spring_security_check"
headers = {
"User-Agent": "Mozilla/5.0",
"Content-Type": "application/x-www-form-urlencoded",
}
response = self.session.post(url, data=payload, headers=headers)
if "发生错误" in response.text:
err = re.search(r"<strong>发生错误!</strong>(.+)", response.text)
if err:
error_message = err.group(1).strip()
raise LessonsException(f"登录失败: {error_message}")
raise LessonsException("登录失败")
logger.info("登录成功")
return True
except requests.RequestException as e:
raise LessonsException(f"登录过程中网络错误: {e}")
def judge_logout(self, response: requests.Response):
"""检查账号是否在其他地方被登录"""
if response.url == f"{self.base}/login?errorCode=concurrentSessionExpired":
raise ReloginException("有人登录了您的账号!")
def get_base_info(self): def get_base_info(self):
res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index") res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index")
@ -388,18 +220,6 @@ class Lessons:
logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课") logger.warning(f"选课 {cl[2]} 结果查询超时,可能未成功选课")
return False return False
def login(self):
logger.info("尝试登录")
flag = False
for i in range(10):
try:
if self._login():
flag = True
break
except Exception as e:
logger.error(f"登录失败: {e}")
if not flag:
raise LessonsException("登录失败无法获取token")
def auto_spider(self): def auto_spider(self):
"""自动选课主程序""" """自动选课主程序"""
@ -516,7 +336,112 @@ class Lessons:
sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}") sc_send("选课异常", desp=f"选课过程中发生意外错误: {e}")
raise e raise e
class Grade(URP):
def __init__(self):
super().__init__()
self.total = None
required_keys = [
"uname",
"password",
"recap_username",
"recap_password",
]
self.env_check(required_keys)
self.interval_2 = int(environ.get("interval_2",3600))
def query(self) -> tuple[dict[str,dict[str,str]],set[str]]:
url = f"{self.base}/student/integratedQuery/scoreQuery/thisTermScores/index"
res = self._retry_request(lambda:self.session.get(url))
res.raise_for_status()
html = res.text
match = re.search(f"/student/integratedQuery/scoreQuery/.+/thisTermScores/data",html)
if match:
url = self.base+match.group(0)
else:
raise RuntimeError("Cannot find url")
# url = f"{self.base}/student/integratedQuery/scoreQuery/U6I5OXib09/thisTermScores/data"
res = self._retry_request(lambda :self.session.get(url))
# print(res.text)
res_json = res.json()
l = res_json[0]["list"]
if self.total is None:
self.total = len(l)
elif self.total != len(l):
sc_send("成绩查询异常",f"课程数发生变化 {self.total}!={len(l)}")
ret = {x["courseName"]: x for x in l if x["avgcj"].strip()!=""}
return ret,set(ret.keys())
@staticmethod
def format(x:dict[str,str]):
return f"|{x['courseName']}|{x['courseScore']}|{x['maxcj']}|{x['avgcj']}|"
def auto_check(self):
self.login()
# self.session.cookies.update({"student.urpSoft.cn":"aaapnXQb62LApgwx7lkFz","UqZBpD3n3iXPAw1X9DmYiUaISMkd8YhMUen0":"v1IraGSUs3hnH"})
grades = set()
self.query()
assert isinstance(self.total,int)
assert self.total > 0
err = 0
while len(grades) < self.total:
try:
logger.info("Querying")
cls, new = self.query()
cls:dict[str,dict[str,str]]
new:set[str]
if new != grades:
delta_names = new - grades
delta = [cls[x] for x in delta_names]
t = []
t.append("新成绩")
t.append("|学科|成绩|最高分|平均分|")
t.append("|-|-|-|-|")
t.extend(map(self.format,delta))
logger.info("\n".join(t))
t.append("---")
t.append("所有成绩")
t.append("|学科|成绩|最高分|平均分|")
t.append("|-|-|-|-|")
t.extend(map(self.format,cls.values()))
t = "\n".join(t)
sc_send("成绩发布",t)
try:
from rich.markdown import Markdown
from rich import print
print(Markdown(t))
except ImportError:
print("Cannot import rich, show markdown directly.")
print(t)
grades = new
if err > 0: err-=1
except ReloginException as e:
logger.info("Relogin")
sc_send("成绩监控","重新登录")
self.login()
except Exception as e:
logger.error(f"Failed to update due to {e}")
err+=1
if err >= 5:
logger.error("Try to relogin")
sc_send("成绩监控","多次失败,尝试重新登录")
self.login()
logger.info(f"Next query will start after {self.interval_2}s")
sleep(self.interval_2)
logger.info("Normal terminated due to all grades is out.")
sc_send("成绩监控","所有成绩均已公布")
if __name__ == "__main__": if __name__ == "__main__":
les = Lessons() les = Lessons()
les.auto_spider() les.auto_spider()
# gra = Grade()
# gra.auto_check()

208
utils.py Normal file
View File

@ -0,0 +1,208 @@
from serverchan_sdk import sc_send as _sc_send
from datetime import datetime
import logging
from pathlib import Path
from os import environ
from dotenv import load_dotenv
import requests
import base64
import hashlib
import json
import re
from typing import NoReturn,Optional,Iterable
class LessonsException(Exception):
"""自定义异常类"""
pass
class ReloginException(LessonsException):
"""用于处理需要重新登录的异常"""
pass
# 配置日志
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def sc_send(title: str, desp: str):
if not environ.get("SC_KEY"):
logger.error("SC_KEY 未设置,无法发送通知")
return
sc_key = environ.get("SC_KEY")
try:
_sc_send(sc_key, title, desp, options={"tags": "自动选课"})
except Exception as e:
logger.error(f"发送失败: {e}")
def log_init():
Path("logs").mkdir(exist_ok=True) # 确保日志目录存在
now = datetime.now().strftime("%Y%m%d_%H%M%S")
file = logging.FileHandler(f"logs/lessons_{now}.log", mode="w", encoding="utf-8")
file.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
file.setLevel(logging.INFO)
logger.addHandler(file) # 日志输出到文件
class URP:
def __init__(self,base=None):
self.session = requests.session()
if base is None:
self.base = environ.get("base","http://jwstudent.lnu.edu.cn")
else:
self.base = base
@staticmethod
def env_check(required_keys:Optional[Iterable] = None) -> None|NoReturn:
# 检查必需的环境变量
if required_keys is None:
required_keys = [
"uname",
"password",
"recap_username",
"recap_password",
"FILE",
]
for key in required_keys:
if not environ.get(key):
raise LessonsException(f"请在环境变量中设置{key}")
@staticmethod
def _retry_request(
func, max_retries: int = 10, error_msg: str = "请求失败"
) -> requests.Response:
"""通用的请求重试方法"""
for attempt in range(1, max_retries + 1):
try:
response = func()
URP._judge_logout(response)
return response
except (
requests.ConnectionError,
requests.HTTPError,
requests.Timeout,
) as e:
logger.warning(f"{error_msg}{type(e).__name__}: {e}")
if attempt < max_retries:
logger.info(f"{attempt}次重试")
else:
raise LessonsException(f"{error_msg}!请检查网络连接!")
# 这行不会被执行,但为了类型检查
raise LessonsException(f"{error_msg}!重试次数耗尽")
@staticmethod
def recapture(b64: str) -> str:
"""验证码识别"""
recap_username = environ.get("recap_username")
recap_password = environ.get("recap_password")
if not recap_username or not recap_password:
raise LessonsException("验证码识别服务配置不完整")
data = {
"username": recap_username,
"password": recap_password,
"ID": "04897896",
"b64": b64,
"version": "3.1.1",
}
try:
response = requests.post(
"http://www.fdyscloud.com.cn/tuling/predict", json=data, timeout=10
)
response.raise_for_status()
result = response.json()
return result["data"]["result"]
except (requests.RequestException, KeyError, json.JSONDecodeError) as e:
raise LessonsException(f"验证码识别失败: {e}")
@staticmethod
def pwd_md5(string: str) -> str:
md5_part1 = hashlib.md5((string + "{Urp602019}").encode()).hexdigest().lower()
md5_part2 = hashlib.md5(string.encode()).hexdigest().lower()
final_result = md5_part1 + "*" + md5_part2
return final_result
def _login(self):
"""登录模块"""
username = environ.get("uname")
password = environ.get("password")
if not username or not password:
raise LessonsException("用户名或密码未设置")
try:
# 获取登录页面的token
req = self.session.get("http://jwstudent.lnu.edu.cn/login")
req.raise_for_status()
html = req.text
match = re.search(r'name="tokenValue" value="(.+?)">', html)
if not match:
raise LessonsException("未找到 tokenValue")
token_value = match.group(1)
# 获取验证码
req = self.session.get(f"{self.base}/img/captcha.jpg")
req.raise_for_status()
im = req.content
b64 = base64.b64encode(im).decode("utf-8")
captcha_code = self.recapture(b64=b64)
logger.info(f"验证码识别结果: {captcha_code}")
hashed_password = self.pwd_md5(password)
# 模拟请求的 payload
payload = {
"j_username": username,
"j_password": hashed_password,
"j_captcha": captcha_code,
"tokenValue": token_value,
}
# 发送 POST 请求
url = f"{self.base}/j_spring_security_check"
headers = {
"User-Agent": "Mozilla/5.0",
"Content-Type": "application/x-www-form-urlencoded",
}
response = self.session.post(url, data=payload, headers=headers)
if "发生错误" in response.text:
err = re.search(r"<strong>发生错误!</strong>(.+)", response.text)
if err:
error_message = err.group(1).strip()
raise LessonsException(f"登录失败: {error_message}")
raise LessonsException("登录失败")
logger.info("登录成功")
return True
except requests.RequestException as e:
raise LessonsException(f"登录过程中网络错误: {e}")
def login(self):
logger.info("尝试登录")
flag = False
for i in range(10):
try:
if self._login():
flag = True
break
except Exception as e:
logger.error(f"登录失败: {e}")
if not flag:
raise LessonsException("登录失败无法获取token")
@staticmethod
def _judge_logout(response: requests.Response):
"""检查账号是否在其他地方被登录"""
if "errorCode=concurrentSessionExpired" in response.url:
raise ReloginException("有人登录了您的账号!")
load_dotenv(override=True)
log_init()