217 lines
6.9 KiB
Python
217 lines
6.9 KiB
Python
import base64
|
||
import hashlib
|
||
import json
|
||
import logging
|
||
import re
|
||
from datetime import datetime
|
||
from os import environ
|
||
from pathlib import Path
|
||
from typing import Iterable, NoReturn, Optional
|
||
|
||
import requests
|
||
from dotenv import load_dotenv
|
||
from serverchan_sdk import sc_send as _sc_send
|
||
|
||
|
||
class LessonsException(Exception):
    """Custom exception type used for errors raised by this module."""
|
||
|
||
|
||
class ReloginException(LessonsException):
    """Signals a situation in which the caller has to log in again."""
|
||
|
||
|
||
# Configure logging: INFO level, "<time> - <level> - <message>" line format.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def sc_send(title: str, desp: str):
    """Send a notification through the ServerChan SDK.

    The send key is taken from the ``SC_KEY`` environment variable. When it is
    missing or empty an error is logged and nothing is sent. Any exception
    raised by the SDK call is logged instead of being propagated.

    Args:
        title: Notification title.
        desp: Notification body.
    """
    sc_key = environ.get("SC_KEY")
    if not sc_key:
        logger.error("SC_KEY 未设置,无法发送通知")
        return

    try:
        _sc_send(sc_key, title, desp, options={"tags": "自动选课"})
    except Exception as e:
        logger.error(f"发送失败: {e}")
|
||
|
||
|
||
def log_init():
    """Attach a timestamped file handler to the module logger.

    Creates the ``logs`` directory when it does not exist, then writes
    INFO-and-above records to ``logs/lessons_<YYYYmmdd_HHMMSS>.log`` (UTF-8,
    opened in ``"w"`` mode) using the same line format as the console output.
    """
    Path("logs").mkdir(exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    handler = logging.FileHandler(
        f"logs/lessons_{timestamp}.log", mode="w", encoding="utf-8"
    )
    handler.setLevel(logging.INFO)
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    )

    logger.addHandler(handler)
|
||
|
||
|
||
class URP:
    """Session-based client for a URP student portal.

    Wraps a ``requests`` session and provides a captcha-assisted login plus a
    retrying request helper. Configuration comes from environment variables
    (``base``, ``uname``, ``password``, ``recap_username``, ``recap_password``).
    """

    # Seconds to wait on each portal request during login, so that a stalled
    # server cannot block the login flow indefinitely.
    REQUEST_TIMEOUT = 10

    # Environment variables checked by env_check() when no list is supplied.
    DEFAULT_REQUIRED_KEYS = (
        "uname",
        "password",
        "recap_username",
        "recap_password",
        "FILE",
    )

    def __init__(self, base=None):
        """Create the HTTP session and resolve the portal base URL.

        Args:
            base: Portal root URL. When ``None``, the ``base`` environment
                variable is used, falling back to
                ``http://jwstudent.lnu.edu.cn``.
        """
        self.session = requests.session()
        if base is None:
            base = environ.get("base", "http://jwstudent.lnu.edu.cn")
        self.base = base

    @staticmethod
    def env_check(required_keys: Optional[Iterable] = None) -> None | NoReturn:
        """Verify that the required environment variables are set.

        Args:
            required_keys: Names to check. Defaults to
                ``URP.DEFAULT_REQUIRED_KEYS``.

        Raises:
            LessonsException: If any variable is missing or empty.
        """
        if required_keys is None:
            required_keys = URP.DEFAULT_REQUIRED_KEYS
        for key in required_keys:
            if not environ.get(key):
                raise LessonsException(f"请在环境变量中设置{key}")

    @staticmethod
    def _retry_request(
        func, max_retries: int = 10, error_msg: str = "请求失败"
    ) -> requests.Response:
        """Call ``func`` until it succeeds or the attempts are exhausted.

        Args:
            func: Zero-argument callable that performs the request and returns
                a ``requests.Response``.
            max_retries: Maximum number of attempts.
            error_msg: Prefix used in log messages and raised errors.

        Returns:
            The first response obtained without a connection/HTTP/timeout
            error.

        Raises:
            ReloginException: If the response URL points at the login page
                (raised by ``_judge_logout``; not retried here).
            LessonsException: If every attempt failed.
        """
        for attempt in range(1, max_retries + 1):
            try:
                response = func()
                logger.debug("response url: %s", response.url)
                URP._judge_logout(response)
                return response
            except (
                requests.ConnectionError,
                requests.HTTPError,
                requests.Timeout,
            ) as e:
                logger.warning(f"{error_msg}!{type(e).__name__}: {e}")
                if attempt >= max_retries:
                    raise LessonsException(f"{error_msg}!请检查网络连接!") from e
                logger.info(f"第{attempt}次重试")

        # Only reached when max_retries < 1, i.e. the loop body never ran.
        raise LessonsException(f"{error_msg}!重试次数耗尽")

    @staticmethod
    def recapture(b64: str) -> str:
        """Solve a captcha through the remote recognition service.

        Args:
            b64: Base64-encoded captcha image.

        Returns:
            The value found at ``data.result`` in the service's JSON reply.

        Raises:
            LessonsException: If the service credentials are not configured,
                the request fails, or the reply is not the expected JSON
                structure.
        """
        recap_username = environ.get("recap_username")
        recap_password = environ.get("recap_password")

        if not recap_username or not recap_password:
            raise LessonsException("验证码识别服务配置不完整")

        data = {
            "username": recap_username,
            "password": recap_password,
            "ID": "04897896",
            "b64": b64,
            "version": "3.1.1",
        }

        try:
            response = requests.post(
                "http://www.fdyscloud.com.cn/tuling/predict", json=data, timeout=10
            )
            response.raise_for_status()
            result = response.json()
            return result["data"]["result"]
        except (requests.RequestException, KeyError, TypeError, ValueError) as e:
            # ValueError covers every JSON decode error variant; TypeError
            # covers replies where "data" is null or not a mapping.
            raise LessonsException(f"验证码识别失败: {e}") from e

    @staticmethod
    def pwd_md5(string: str) -> str:
        """Build the password digest submitted by the login form.

        Args:
            string: Plain-text password.

        Returns:
            ``md5(string + "{Urp602019}") + "*" + md5(string)``, both parts as
            lowercase hex digests.
        """
        salted = hashlib.md5((string + "{Urp602019}").encode()).hexdigest()
        plain = hashlib.md5(string.encode()).hexdigest()
        return f"{salted}*{plain}"

    def _login(self):
        """Perform a single login attempt.

        Fetches the login page token, downloads and solves the captcha, then
        posts the credentials read from the ``uname`` / ``password``
        environment variables.

        Returns:
            ``True`` when the reply contains no error marker.

        Raises:
            LessonsException: If credentials are missing, the token cannot be
                found, captcha recognition fails, the portal reports an error,
                or a network error occurs.
        """
        username = environ.get("uname")
        password = environ.get("password")

        if not username or not password:
            raise LessonsException("用户名或密码未设置")

        try:
            # Fetch the login page token from the configured base URL so that
            # all requests of one login attempt target the same host.
            req = self.session.get(
                f"{self.base}/login", timeout=self.REQUEST_TIMEOUT
            )
            req.raise_for_status()
            match = re.search(r'name="tokenValue" value="(.+?)">', req.text)
            if not match:
                raise LessonsException("未找到 tokenValue")
            token_value = match.group(1)

            # Download the captcha image and have it recognised.
            req = self.session.get(
                f"{self.base}/img/captcha.jpg", timeout=self.REQUEST_TIMEOUT
            )
            req.raise_for_status()
            b64 = base64.b64encode(req.content).decode("utf-8")
            captcha_code = self.recapture(b64=b64)

            logger.info(f"验证码识别结果: {captcha_code}")

            payload = {
                "j_username": username,
                "j_password": self.pwd_md5(password),
                "j_captcha": captcha_code,
                "tokenValue": token_value,
            }
            headers = {
                "User-Agent": "Mozilla/5.0",
                "Content-Type": "application/x-www-form-urlencoded",
            }
            response = self.session.post(
                f"{self.base}/j_spring_security_check",
                data=payload,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )

            if "发生错误" in response.text:
                err = re.search(r"<strong>发生错误!</strong>(.+)", response.text)
                if err:
                    error_message = err.group(1).strip()
                    raise LessonsException(f"登录失败: {error_message}")
                raise LessonsException("登录失败")

            logger.info("登录成功")
            return True

        except requests.RequestException as e:
            raise LessonsException(f"登录过程中网络错误: {e}") from e

    def login(self):
        """Log in, retrying up to 10 times.

        Each failed attempt is logged; a fresh token and captcha are fetched
        on every attempt.

        Raises:
            LessonsException: If no attempt succeeds. The last failure is
                attached as the exception's cause.
        """
        logger.info("尝试登录")
        last_error = None
        for _ in range(10):
            try:
                if self._login():
                    return
            except Exception as e:
                # Deliberately broad: any failure (bad captcha, network, ...)
                # simply triggers another attempt.
                last_error = e
                logger.error(f"登录失败: {e}")
        raise LessonsException("登录失败,无法获取token") from last_error

    @staticmethod
    def _judge_logout(response: requests.Response):
        """Raise if the response's final URL is the login page.

        Args:
            response: Response to inspect.

        Raises:
            ReloginException: If ``/login`` occurs in ``response.url``.
        """
        if "/login" in response.url:
            raise ReloginException("有人登录了您的账号!")
|
||
|
||
|
||
# Module start-up: load variables from a .env file (override=True lets them
# replace values already present in the environment), then attach the
# file log handler.
load_dotenv(override=True)
log_init()
|