clean code
This commit is contained in:
68
main.py
68
main.py
@ -4,13 +4,14 @@ import base64
|
|||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
import logging
|
import logging
|
||||||
from typing import List, Dict, Optional
|
from typing import List, Optional,Callable
|
||||||
from time import sleep,time
|
from time import sleep,time
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from bs4 import BeautifulSoup, Tag
|
from bs4 import BeautifulSoup
|
||||||
from os import environ
|
from os import environ
|
||||||
|
from pathlib import Path
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@ -19,6 +20,8 @@ from serverchan_sdk import sc_send as _sc_send
|
|||||||
# 配置日志
|
# 配置日志
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
Path("logs").mkdir(exist_ok=True) # 确保日志目录存在
|
||||||
now = datetime.now().strftime("%Y%m%d_%H%M%S")
|
now = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
file = logging.FileHandler(f'logs/lessons_{now}.log', mode='w', encoding='utf-8')
|
file = logging.FileHandler(f'logs/lessons_{now}.log', mode='w', encoding='utf-8')
|
||||||
file.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
|
file.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
|
||||||
@ -44,41 +47,20 @@ class Lessons:
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.session = requests.session()
|
self.session = requests.session()
|
||||||
self.lessons_list: List[Dict[str, str]] = []
|
|
||||||
self.username: Optional[str] = None
|
|
||||||
self.term: Optional[str] = None
|
self.term: Optional[str] = None
|
||||||
self.fajhh: Optional[str] = None
|
self.fajhh: Optional[str] = None
|
||||||
self.token: Optional[str] = None
|
|
||||||
|
|
||||||
# 加载环境变量
|
# 加载环境变量
|
||||||
dotenv.load_dotenv()
|
dotenv.load_dotenv()
|
||||||
|
|
||||||
# 检查必需的环境变量
|
# 检查必需的环境变量
|
||||||
required_keys = ["uname", "password", "recap_username", "recap_password"]
|
required_keys = ["uname", "password", "recap_username", "recap_password","FILE","SC_KEY"]
|
||||||
for key in required_keys:
|
for key in required_keys:
|
||||||
if not environ.get(key):
|
if not environ.get(key):
|
||||||
raise LessonsException(f"请在环境变量中设置{key}")
|
raise LessonsException(f"请在环境变量中设置{key}")
|
||||||
|
|
||||||
self.base = environ.get("base", "http://jwstudent.lnu.edu.cn")
|
self.base = environ.get("base", "http://jwstudent.lnu.edu.cn")
|
||||||
|
|
||||||
def _retry(self, max_retries: int = 5):
|
|
||||||
retry = 0
|
|
||||||
def _wrapper(func, *args, **kwargs):
|
|
||||||
nonlocal retry
|
|
||||||
"""装饰器:重试请求"""
|
|
||||||
try:
|
|
||||||
return func(self, *args, **kwargs)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"重试因为 {e}")
|
|
||||||
retry += 1
|
|
||||||
if retry > max_retries:
|
|
||||||
raise LessonsException("重试次数超过限制")
|
|
||||||
else:
|
|
||||||
return _wrapper(func, *args, **kwargs)
|
|
||||||
return _wrapper
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def _retry_request(self, func, max_retries: int = 10, error_msg: str = "请求失败") -> requests.Response:
|
def _retry_request(self, func, max_retries: int = 10, error_msg: str = "请求失败") -> requests.Response:
|
||||||
"""通用的请求重试方法"""
|
"""通用的请求重试方法"""
|
||||||
for attempt in range(1, max_retries + 1):
|
for attempt in range(1, max_retries + 1):
|
||||||
@ -130,7 +112,6 @@ class Lessons:
|
|||||||
final_result = md5_part1 + '*' + md5_part2
|
final_result = md5_part1 + '*' + md5_part2
|
||||||
return final_result
|
return final_result
|
||||||
|
|
||||||
# @self._r
|
|
||||||
def _login(self):
|
def _login(self):
|
||||||
"""登录模块"""
|
"""登录模块"""
|
||||||
username = environ.get("uname")
|
username = environ.get("uname")
|
||||||
@ -139,7 +120,6 @@ class Lessons:
|
|||||||
if not username or not password:
|
if not username or not password:
|
||||||
raise LessonsException("用户名或密码未设置")
|
raise LessonsException("用户名或密码未设置")
|
||||||
|
|
||||||
self.username = username
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 获取登录页面的token
|
# 获取登录页面的token
|
||||||
@ -158,9 +138,6 @@ class Lessons:
|
|||||||
b64 = base64.b64encode(im).decode('utf-8')
|
b64 = base64.b64encode(im).decode('utf-8')
|
||||||
captcha_code = self.recapture(b64=b64)
|
captcha_code = self.recapture(b64=b64)
|
||||||
|
|
||||||
# 保存验证码图片用于调试
|
|
||||||
# with open("captcha.jpg", "wb") as f:
|
|
||||||
# f.write(im)
|
|
||||||
logger.info(f"验证码识别结果: {captcha_code}")
|
logger.info(f"验证码识别结果: {captcha_code}")
|
||||||
|
|
||||||
hashed_password = self.pwd_md5(password)
|
hashed_password = self.pwd_md5(password)
|
||||||
@ -200,23 +177,6 @@ class Lessons:
|
|||||||
if response.url == f"{self.base}/login?errorCode=concurrentSessionExpired":
|
if response.url == f"{self.base}/login?errorCode=concurrentSessionExpired":
|
||||||
raise LessonsException("有人登录了您的账号!")
|
raise LessonsException("有人登录了您的账号!")
|
||||||
|
|
||||||
def judge_choose(self, bs: BeautifulSoup):
|
|
||||||
"""判断是否可以选课"""
|
|
||||||
alert = bs.find("div", {"class": "alert alert-block alert-danger"})
|
|
||||||
if alert is not None:
|
|
||||||
raise LessonsException("对不起,当前为非选课阶段!")
|
|
||||||
|
|
||||||
def get_tokenvalue(self, bs: BeautifulSoup) -> str:
|
|
||||||
"""获取token值"""
|
|
||||||
token_element = bs.find("input", {"type": "hidden", "id": "tokenValue"})
|
|
||||||
if not token_element or not isinstance(token_element, Tag):
|
|
||||||
raise LessonsException("未找到tokenValue元素")
|
|
||||||
|
|
||||||
value = token_element.get("value")
|
|
||||||
if not value:
|
|
||||||
raise LessonsException("未找到tokenValue")
|
|
||||||
return str(value)
|
|
||||||
|
|
||||||
def get_base_info(self):
|
def get_base_info(self):
|
||||||
res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index")
|
res = self.session.get(f"{self.base}/student/courseSelect/gotoSelect/index")
|
||||||
res.raise_for_status()
|
res.raise_for_status()
|
||||||
@ -240,7 +200,19 @@ class Lessons:
|
|||||||
|
|
||||||
def read_lessons(self) -> List[tuple[str, str, str]]:
|
def read_lessons(self) -> List[tuple[str, str, str]]:
|
||||||
classes = []
|
classes = []
|
||||||
df = pd.read_csv("class.csv")
|
file = Path(environ.get("FILE", "class.xlsx"))
|
||||||
|
if not file.is_file():
|
||||||
|
raise LessonsException(f"课程文件 {file} 不存在,请检查路径")
|
||||||
|
d:dict[str, Callable[[Path], pd.DataFrame]] = {
|
||||||
|
".csv": pd.read_csv,
|
||||||
|
".xlsx": pd.read_excel,
|
||||||
|
".xls": pd.read_excel,
|
||||||
|
".json": pd.read_json
|
||||||
|
}
|
||||||
|
func = d.get(file.suffix.lower())
|
||||||
|
if func is None:
|
||||||
|
raise LessonsException(f"不支持的文件格式: {file.suffix}. 仅支持 .csv, .xlsx, .xls, .json 格式")
|
||||||
|
df = func(file)
|
||||||
df.columns = df.columns.str.strip() # 去除列名两端的空格
|
df.columns = df.columns.str.strip() # 去除列名两端的空格
|
||||||
for col in ["课程号", "课序号", "课程名"]:
|
for col in ["课程号", "课序号", "课程名"]:
|
||||||
if col not in df.columns:
|
if col not in df.columns:
|
||||||
@ -458,4 +430,4 @@ class Lessons:
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
les = Lessons()
|
les = Lessons()
|
||||||
les.auto_spider()
|
les.auto_spider()
|
||||||
|
Reference in New Issue
Block a user