compliance/history_viewer.py
gongwenxin fa343eb111 .
2025-08-07 15:07:38 +08:00

315 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import json
import logging
import sqlite3
from pathlib import Path
from datetime import timedelta
from werkzeug.security import generate_password_hash, check_password_hash
from flask import Flask, request, jsonify, send_from_directory, session, redirect, url_for, render_template, g, flash, get_flashed_messages, abort
from flask_cors import CORS
from functools import wraps
import markdown
# --- PyInstaller Path Helpers ---
# For data files that should persist outside the bundle (e.g., database, reports)
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
# Running in a PyInstaller bundle
APP_ROOT = os.path.dirname(sys.executable)
else:
# Running in a normal Python environment
APP_ROOT = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(APP_ROOT, 'templates')
static_dir = os.path.join(APP_ROOT, 'static')
app = Flask(__name__, static_folder=static_dir, template_folder=template_dir)
CORS(app)
# --- 基本配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
DATABASE = os.path.join(APP_ROOT, 'users.db')
REPORTS_DIR = os.path.join(APP_ROOT, 'test_reports')
app.config['SECRET_KEY'] = os.urandom(24)
app.config['DATABASE'] = DATABASE
app.config['REPORTS_DIR'] = REPORTS_DIR
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)
os.makedirs(app.config['REPORTS_DIR'], exist_ok=True)
# --- 数据库 Schema 和辅助函数 (与 flask_app.py 相同) ---
DB_SCHEMA = '''
DROP TABLE IF EXISTS user;
CREATE TABLE user (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL
);
'''
def get_db():
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(app.config['DATABASE'])
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
def init_db(force_create=False):
if force_create or not os.path.exists(app.config['DATABASE']):
with app.app_context():
db = get_db()
db.cursor().executescript(DB_SCHEMA)
db.commit()
logger.info("数据库已初始化!")
create_default_user()
else:
logger.info("数据库已存在。")
def create_default_user(username="admin", password="7#Xq9$Lm*2!Pw@5"):
with app.app_context():
db = get_db()
user = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
if user is None:
db.execute("INSERT INTO user (username, password_hash) VALUES (?, ?)", (username, generate_password_hash(password)))
db.commit()
logger.info(f"已创建默认用户: {username}")
else:
logger.info(f"默认用户 {username} 已存在。")
@app.cli.command('init-db')
def init_db_command():
init_db(force_create=True)
print("已初始化数据库。")
# --- 用户认证 (与 flask_app.py 相同) ---
@app.route('/login', methods=('GET', 'POST'))
def login():
if g.user:
return redirect(url_for('list_history'))
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
error = None
user = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
if user is None:
error = '用户名不存在。'
elif not check_password_hash(user['password_hash'], password):
error = '密码错误。'
if error is None:
session.clear()
session['user_id'] = user['id']
session['username'] = user['username']
session.permanent = True
return redirect(url_for('list_history'))
flash(error)
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
flash('您已成功登出。')
return redirect(url_for('login'))
def login_required(view):
@wraps(view)
def wrapped_view(**kwargs):
# if g.user is None:
# return redirect(url_for('login'))
return view(**kwargs)
return wrapped_view
@app.before_request
def load_logged_in_user():
user_id = session.get('user_id')
if user_id is None:
g.user = None
else:
g.user = get_db().execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
# --- LLM配置视图 ---
CRITERIA_FILE_PATH = os.path.join(APP_ROOT, 'custom_testcases', 'llm', 'compliance_criteria.json')
@app.route('/llm-config', methods=['GET', 'POST'])
@login_required
def llm_config():
criteria_for_template = []
file_exists = os.path.exists(CRITERIA_FILE_PATH)
if request.method == 'POST':
# 从表单获取所有名为'criteria'的输入项,作为一个列表
criteria_list = request.form.getlist('criteria')
# 过滤掉用户可能提交的空规则
criteria_list = [item.strip() for item in criteria_list if item.strip()]
try:
# 将规则列表格式化为美观的JSON并保存
pretty_content = json.dumps(criteria_list, indent=2, ensure_ascii=False)
with open(CRITERIA_FILE_PATH, 'w', encoding='utf-8') as f:
f.write(pretty_content)
flash('LLM合规性标准已成功保存', 'success')
except Exception as e:
flash(f'保存文件时发生未知错误: {e}', 'error')
# 无论是GET还是POST请求后都重新从文件中读取最新的规则列表用于显示
if file_exists:
try:
with open(CRITERIA_FILE_PATH, 'r', encoding='utf-8') as f:
criteria_for_template = json.load(f)
# 确保文件内容确实是一个列表
if not isinstance(criteria_for_template, list):
flash('配置文件格式错误内容应为JSON数组。已重置为空列表。', 'error')
criteria_for_template = []
except Exception as e:
flash(f'读取配置文件时出错: {e}', 'error')
criteria_for_template = []
# 准备一个用于页面展示的示例API信息
example_api_info = {
"path_template": "/api/dms/instance/v1/message/push/myschema/1.0",
"method": "POST",
"title": "数据推送接口",
"description": "用于向系统推送标准格式的数据。",
"schema_request_body": {"...": "... (此处为请求体Schema定义)"},
"instance_url": "http://example.com/api/dms/instance/v1/message/push/myschema/1.0",
"instance_request_headers": {"X-Tenant-ID": "tenant-001", "...": "..."},
"instance_request_body": {"id": "123", "data": "example"},
"instance_response_status": 200,
"instance_response_body": {"code": 0, "message": "success", "data": True}
}
return render_template('llm_config.html', criteria=criteria_for_template, file_exists=file_exists, example_api_info=json.dumps(example_api_info, indent=2, ensure_ascii=False))
# --- 文件下载路由 ---
@app.route('/download/<path:run_id>/<path:filename>')
@login_required
def download_report(run_id, filename):
"""安全地提供指定运行记录中的报告文件下载。"""
# 清理输入,防止目录遍历攻击
run_id_safe = Path(run_id).name
filename_safe = Path(filename).name
reports_dir = Path(app.config['REPORTS_DIR']).resolve()
run_dir = (reports_dir / run_id_safe).resolve()
# 安全检查确保请求的目录是REPORTS_DIR的子目录
if not run_dir.is_dir() or run_dir.parent != reports_dir:
abort(404, "找不到指定的测试记录或权限不足。")
return send_from_directory(run_dir, filename_safe, as_attachment=True)
# --- 新增PDF文件预览路由 ---
@app.route('/view_pdf/<path:run_id>')
@login_required
def view_pdf_report(run_id):
"""安全地提供PDF报告文件以内联方式查看。"""
run_id_safe = Path(run_id).name
filename_safe = "report_cn.pdf"
reports_dir = Path(app.config['REPORTS_DIR']).resolve()
run_dir = (reports_dir / run_id_safe).resolve()
# 安全检查
if not run_dir.is_dir() or run_dir.parent != reports_dir:
abort(404, "找不到指定的测试记录或权限不足。")
pdf_path = run_dir / filename_safe
if not pdf_path.exists():
abort(404, "未找到PDF报告文件。")
return send_from_directory(run_dir, filename_safe)
# --- 历史记录视图 ---
@app.route('/')
@login_required
def list_history():
history = []
reports_path = Path(app.config['REPORTS_DIR'])
if not reports_path.is_dir():
flash('报告目录不存在。')
return render_template('history.html', history=[])
# 获取所有子目录(即测试运行记录)
run_dirs = [d for d in reports_path.iterdir() if d.is_dir()]
# 按名称(时间戳)降序排序
run_dirs.sort(key=lambda x: x.name, reverse=True)
for run_dir in run_dirs:
summary_path = run_dir / 'summary.json'
details_path = run_dir / 'api_call_details.md'
run_info = {'id': run_dir.name, 'summary': None, 'has_details': details_path.exists()}
if summary_path.exists():
try:
with open(summary_path, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
run_info['summary'] = summary_data
except (json.JSONDecodeError, IOError) as e:
logger.error(f"无法读取或解析摘要文件 {summary_path}: {e}")
run_info['summary'] = {'error': '无法加载摘要'}
history.append(run_info)
return render_template('history.html', history=history)
@app.route('/details/<run_id>')
@login_required
def show_details(run_id):
run_id = Path(run_id).name # Sanitize input
run_dir = Path(app.config['REPORTS_DIR']) / run_id
if not run_dir.is_dir():
return "找不到指定的测试记录。", 404
summary_path = run_dir / 'summary.json'
details_path = run_dir / 'api_call_details.md'
pdf_path = run_dir / 'report_cn.pdf' # 新增PDF路径
summary_content = "{}"
details_content = "### 未找到API调用详情报告"
has_pdf_report = pdf_path.exists() # 检查PDF是否存在
has_md_report = details_path.exists() # 检查MD报告是否存在
if summary_path.exists():
try:
with open(summary_path, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
summary_content = json.dumps(summary_data, indent=2, ensure_ascii=False)
except Exception as e:
summary_content = f"加载摘要文件出错: {e}"
if has_md_report:
try:
with open(details_path, 'r', encoding='utf-8') as f:
# 将Markdown转换为HTML
details_content = markdown.markdown(f.read(), extensions=['fenced_code', 'tables', 'def_list', 'attr_list'])
except Exception as e:
details_content = f"加载详情文件出错: {e}"
return render_template('history_detail.html',
run_id=run_id,
summary_content=summary_content,
details_content=details_content,
has_pdf_report=has_pdf_report,
has_md_report=has_md_report)
# --- 根路径重定向 ---
@app.route('/index')
def index_redirect():
return redirect(url_for('list_history'))
if __name__ == '__main__':
# 首次运行时确保数据库和用户存在
init_db()
# 使用5051端口避免与api_server.py冲突
app.run(debug=True, host='0.0.0.0', port=5051)