# File-viewer metadata captured with this listing: 315 lines, 12 KiB, Python.
import json
import logging
import os
import sqlite3
import sys
from datetime import timedelta
from functools import wraps
from pathlib import Path

import markdown
from flask import Flask, request, jsonify, send_from_directory, session, redirect, url_for, render_template, g, flash, get_flashed_messages, abort
from flask_cors import CORS
from werkzeug.security import generate_password_hash, check_password_hash

# --- PyInstaller Path Helpers ---
# APP_ROOT is the directory that holds data which must persist outside the
# bundle (e.g., database, reports).
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
    # Running in a PyInstaller bundle: anchor on the executable's directory.
    APP_ROOT = os.path.dirname(sys.executable)
else:
    # Running in a normal Python environment: anchor on this source file.
    APP_ROOT = os.path.dirname(os.path.abspath(__file__))

# Templates and static assets are both looked up directly under APP_ROOT.
static_dir = os.path.join(APP_ROOT, 'static')
template_dir = os.path.join(APP_ROOT, 'templates')
app = Flask(
    __name__,
    template_folder=template_dir,
    static_folder=static_dir,
)
# Enable cross-origin requests for the whole application.
CORS(app)

# --- Basic configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

DATABASE = os.path.join(APP_ROOT, 'users.db')
REPORTS_DIR = os.path.join(APP_ROOT, 'test_reports')

# A key generated at start-up changes on every restart, which silently
# invalidates every session cookie even though sessions are configured to last
# seven days. Honour a SECRET_KEY environment variable when one is provided and
# keep the random key only as the fallback.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(24)
app.config['DATABASE'] = DATABASE
app.config['REPORTS_DIR'] = REPORTS_DIR
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)

# Make sure the reports directory exists before any view tries to list it.
os.makedirs(app.config['REPORTS_DIR'], exist_ok=True)

# --- Database schema and helper functions (same as flask_app.py) ---
# Script executed by init_db(): drops any existing `user` table and recreates
# it, so running it discards all stored accounts.
DB_SCHEMA = '''
DROP TABLE IF EXISTS user;
CREATE TABLE user (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT UNIQUE NOT NULL,
    password_hash TEXT NOT NULL
);
'''

def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    The connection is stored as ``g._database`` and uses ``sqlite3.Row`` as its
    row factory so columns can be read by name.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn

    conn = sqlite3.connect(app.config['DATABASE'])
    g._database = conn
    conn.row_factory = sqlite3.Row
    return conn

@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g`` (if any) when the app context ends.

    ``exception`` is supplied by Flask's teardown hook and is not used here.
    """
    conn = getattr(g, '_database', None)
    if conn is None:
        return
    conn.close()

def init_db(force_create=False):
    """Create the database and the default user when needed.

    Args:
        force_create: When true, run the schema script even if the database
            file already exists (the script drops the existing ``user`` table).
    """
    # Nothing to do when the file is already there and no rebuild was requested.
    if not force_create and os.path.exists(app.config['DATABASE']):
        logger.info("数据库已存在。")
        return

    with app.app_context():
        conn = get_db()
        cursor = conn.cursor()
        cursor.executescript(DB_SCHEMA)
        conn.commit()
        logger.info("数据库已初始化!")
        create_default_user()

# NOTE(review): the default password is hard-coded in source; consider
# supplying it from configuration instead.
def create_default_user(username="admin", password="7#Xq9$Lm*2!Pw@5"):
    """Insert the default account unless a user with that name already exists.

    Args:
        username: Login name of the account to create.
        password: Plain-text password; only its hash is stored.
    """
    with app.app_context():
        conn = get_db()
        existing = conn.execute(
            'SELECT * FROM user WHERE username = ?', (username,)
        ).fetchone()

        if existing is not None:
            logger.info(f"默认用户 {username} 已存在。")
            return

        hashed = generate_password_hash(password)
        conn.execute(
            "INSERT INTO user (username, password_hash) VALUES (?, ?)",
            (username, hashed),
        )
        conn.commit()
        logger.info(f"已创建默认用户: {username}")

@app.cli.command('init-db')
def init_db_command():
    """CLI command ``init-db``: rebuild the database unconditionally, then report."""
    init_db(force_create=True)
    done_message = "已初始化数据库。"
    print(done_message)

# --- User authentication (same as flask_app.py) ---
@app.route('/login', methods=('GET', 'POST'))
def login():
    """Render the login form and, on POST, authenticate the submitted credentials.

    An already authenticated visitor is redirected to the history page. A
    successful login resets the session, stores the user's id and name, marks
    the session permanent and redirects to the history page. A failed attempt
    flashes an error and shows the form again.
    """
    if g.user:
        return redirect(url_for('list_history'))

    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']
    account = get_db().execute(
        'SELECT * FROM user WHERE username = ?', (username,)
    ).fetchone()

    if account is None:
        error = '用户名不存在。'
    elif not check_password_hash(account['password_hash'], password):
        error = '密码错误。'
    else:
        # Start from a clean session so nothing from a previous login survives.
        session.clear()
        session['user_id'] = account['id']
        session['username'] = account['username']
        session.permanent = True
        return redirect(url_for('list_history'))

    flash(error)
    return render_template('login.html')

@app.route('/logout')
def logout():
    """Drop the whole session, flash a confirmation and go back to the login page."""
    session.clear()
    flash('您已成功登出。')
    login_url = url_for('login')
    return redirect(login_url)

def login_required(view):
    """Decorate a view so that anonymous visitors are redirected to the login page.

    The check had been commented out, which left every decorated route
    (configuration, report download, history) reachable without
    authentication. It is restored here.

    Args:
        view: The view function to protect.

    Returns:
        A wrapper that redirects to ``login`` when ``g.user`` is unset or
        ``None`` and otherwise calls ``view`` with the original arguments.
    """
    @wraps(view)
    def wrapped_view(*args, **kwargs):
        # getattr keeps the guard safe even if g.user was never assigned for
        # this request.
        if getattr(g, 'user', None) is None:
            return redirect(url_for('login'))
        return view(*args, **kwargs)

    return wrapped_view

@app.before_request
def load_logged_in_user():
    """Populate ``g.user`` before each request from the ``user_id`` kept in the session.

    ``g.user`` is ``None`` when the session holds no ``user_id``; otherwise it
    is the matching row from the ``user`` table (``None`` if no row matches).
    """
    user_id = session.get('user_id')
    g.user = (
        None
        if user_id is None
        else get_db().execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
    )

# --- LLM configuration view ---
CRITERIA_FILE_PATH = os.path.join(APP_ROOT, 'custom_testcases', 'llm', 'compliance_criteria.json')


@app.route('/llm-config', methods=['GET', 'POST'])
@login_required
def llm_config():
    """Show and edit the LLM compliance criteria stored as a JSON array on disk.

    On POST the submitted ``criteria`` form values are stripped, blank entries
    are discarded, and the remaining list is written to CRITERIA_FILE_PATH.
    For both GET and POST the file is then read back so the page always shows
    what is actually stored.

    Returns:
        The rendered ``llm_config.html`` template with the criteria list, a
        flag telling whether the criteria file exists, and a pretty-printed
        example API description.
    """
    if request.method == 'POST':
        # Collect every form field named 'criteria' and drop blank rules.
        criteria_list = [
            item.strip()
            for item in request.form.getlist('criteria')
            if item.strip()
        ]

        try:
            pretty_content = json.dumps(criteria_list, indent=2, ensure_ascii=False)
            # The target directory may not exist yet on a fresh installation;
            # without this the very first save always failed.
            os.makedirs(os.path.dirname(CRITERIA_FILE_PATH), exist_ok=True)
            with open(CRITERIA_FILE_PATH, 'w', encoding='utf-8') as f:
                f.write(pretty_content)
            flash('LLM合规性标准已成功保存!', 'success')
        except Exception as e:
            flash(f'保存文件时发生未知错误: {e}', 'error')

    # Check for the file only after a possible save: testing before the POST
    # branch meant a first successful save was reported as "file missing" and
    # the freshly stored rules were not displayed.
    file_exists = os.path.exists(CRITERIA_FILE_PATH)

    criteria_for_template = []
    if file_exists:
        try:
            with open(CRITERIA_FILE_PATH, 'r', encoding='utf-8') as f:
                criteria_for_template = json.load(f)
            # The file content must be a JSON array.
            if not isinstance(criteria_for_template, list):
                flash('配置文件格式错误:内容应为JSON数组。已重置为空列表。', 'error')
                criteria_for_template = []
        except Exception as e:
            flash(f'读取配置文件时出错: {e}', 'error')
            criteria_for_template = []

    # Example API information displayed on the page for reference.
    example_api_info = {
        "path_template": "/api/dms/instance/v1/message/push/myschema/1.0",
        "method": "POST",
        "title": "数据推送接口",
        "description": "用于向系统推送标准格式的数据。",
        "schema_request_body": {"...": "... (此处为请求体Schema定义)"},
        "instance_url": "http://example.com/api/dms/instance/v1/message/push/myschema/1.0",
        "instance_request_headers": {"X-Tenant-ID": "tenant-001", "...": "..."},
        "instance_request_body": {"id": "123", "data": "example"},
        "instance_response_status": 200,
        "instance_response_body": {"code": 0, "message": "success", "data": True},
    }

    return render_template(
        'llm_config.html',
        criteria=criteria_for_template,
        file_exists=file_exists,
        example_api_info=json.dumps(example_api_info, indent=2, ensure_ascii=False),
    )

# --- File download route ---
@app.route('/download/<path:run_id>/<path:filename>')
@login_required
def download_report(run_id, filename):
    """Serve a report file from the given run directory as an attachment.

    Both URL parts are reduced to their final path component, and the run
    directory must be an existing direct child of REPORTS_DIR; otherwise the
    request is answered with 404.
    """
    reports_root = Path(app.config['REPORTS_DIR']).resolve()
    # Keep only the last path component to block directory traversal.
    target_dir = (reports_root / Path(run_id).name).resolve()
    requested_file = Path(filename).name

    # Safety check: the directory must exist directly under REPORTS_DIR.
    is_direct_child = target_dir.parent == reports_root
    if not (target_dir.is_dir() and is_direct_child):
        abort(404, "找不到指定的测试记录或权限不足。")

    return send_from_directory(target_dir, requested_file, as_attachment=True)

# --- New: PDF preview route ---
@app.route('/view_pdf/<path:run_id>')
@login_required
def view_pdf_report(run_id):
    """Serve a run's ``report_cn.pdf`` for inline viewing.

    Responds with 404 when the run directory is not an existing direct child
    of REPORTS_DIR or when the PDF file is missing.
    """
    pdf_name = "report_cn.pdf"
    reports_root = Path(app.config['REPORTS_DIR']).resolve()
    target_dir = (reports_root / Path(run_id).name).resolve()

    # Safety check: only direct sub-directories of REPORTS_DIR are served.
    if target_dir.parent != reports_root or not target_dir.is_dir():
        abort(404, "找不到指定的测试记录或权限不足。")

    if not (target_dir / pdf_name).exists():
        abort(404, "未找到PDF报告文件。")

    return send_from_directory(target_dir, pdf_name)

# --- History views ---
@app.route('/')
@login_required
def list_history():
    """List every test run found under REPORTS_DIR, newest name first.

    Each entry carries the run directory name, the parsed ``summary.json``
    (or an error marker when it cannot be read) and whether
    ``api_call_details.md`` exists.
    """
    reports_path = Path(app.config['REPORTS_DIR'])
    if not reports_path.is_dir():
        flash('报告目录不存在。')
        return render_template('history.html', history=[])

    # Every sub-directory is one test run; names sort in descending order
    # (the names are expected to be timestamps).
    run_dirs = sorted(
        (entry for entry in reports_path.iterdir() if entry.is_dir()),
        key=lambda entry: entry.name,
        reverse=True,
    )

    history = []
    for run_dir in run_dirs:
        summary_path = run_dir / 'summary.json'
        run_info = {
            'id': run_dir.name,
            'summary': None,
            'has_details': (run_dir / 'api_call_details.md').exists(),
        }

        if summary_path.exists():
            try:
                with open(summary_path, 'r', encoding='utf-8') as f:
                    run_info['summary'] = json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                logger.error(f"无法读取或解析摘要文件 {summary_path}: {e}")
                run_info['summary'] = {'error': '无法加载摘要'}

        history.append(run_info)

    return render_template('history.html', history=history)

@app.route('/details/<run_id>')
@login_required
def show_details(run_id):
    """Render the detail page of one test run.

    The page shows the pretty-printed ``summary.json``, the Markdown report
    ``api_call_details.md`` converted to HTML, and flags telling whether the
    PDF and Markdown reports exist.

    Args:
        run_id: Name of the run directory, taken from the URL.

    Returns:
        The rendered ``history_detail.html`` template, or a 404 response when
        the run directory is not an existing direct child of REPORTS_DIR.
    """
    run_id = Path(run_id).name  # Sanitize input

    # Path('..').name is still '..', so taking the final component alone does
    # not confine the lookup. Resolve the path and require it to be a direct
    # child of REPORTS_DIR, the same containment rule the download and PDF
    # routes apply.
    reports_dir = Path(app.config['REPORTS_DIR']).resolve()
    run_dir = (reports_dir / run_id).resolve()
    if not run_dir.is_dir() or run_dir.parent != reports_dir:
        return "找不到指定的测试记录。", 404

    summary_path = run_dir / 'summary.json'
    details_path = run_dir / 'api_call_details.md'
    pdf_path = run_dir / 'report_cn.pdf'

    # Fallback content used when a file is missing.
    summary_content = "{}"
    details_content = "### 未找到API调用详情报告"

    has_pdf_report = pdf_path.exists()
    has_md_report = details_path.exists()

    if summary_path.exists():
        try:
            with open(summary_path, 'r', encoding='utf-8') as f:
                summary_data = json.load(f)
            summary_content = json.dumps(summary_data, indent=2, ensure_ascii=False)
        except Exception as e:
            summary_content = f"加载摘要文件出错: {e}"

    if has_md_report:
        try:
            with open(details_path, 'r', encoding='utf-8') as f:
                # Convert the Markdown report to HTML for the template.
                details_content = markdown.markdown(
                    f.read(),
                    extensions=['fenced_code', 'tables', 'def_list', 'attr_list'],
                )
        except Exception as e:
            details_content = f"加载详情文件出错: {e}"

    return render_template(
        'history_detail.html',
        run_id=run_id,
        summary_content=summary_content,
        details_content=details_content,
        has_pdf_report=has_pdf_report,
        has_md_report=has_md_report,
    )

# --- Root path redirect ---
@app.route('/index')
def index_redirect():
    """Send ``/index`` visitors to the history list."""
    history_url = url_for('list_history')
    return redirect(history_url)

if __name__ == '__main__':
    # Make sure the database and the default user exist on first run.
    init_db()

    # The server listens on every interface, so the interactive debugger must
    # not be enabled unconditionally: anyone who can reach the port could use
    # it to run code. Debug mode is now opt-in through FLASK_DEBUG.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')

    # Port 5051 avoids a conflict with api_server.py.
    app.run(debug=debug_enabled, host='0.0.0.0', port=5051)