import os import sys import json import logging import sqlite3 from pathlib import Path from werkzeug.security import generate_password_hash, check_password_hash from flask import Flask, request, jsonify, send_from_directory, session, redirect, url_for, render_template, g, flash, get_flashed_messages, abort from flask_cors import CORS from functools import wraps import markdown app = Flask(__name__, static_folder='static', template_folder='templates') CORS(app) # --- 基本配置 --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) APP_ROOT = os.path.dirname(os.path.abspath(__file__)) DATABASE = os.path.join(APP_ROOT, 'users.db') REPORTS_DIR = os.path.join(APP_ROOT, 'test_reports') app.config['SECRET_KEY'] = os.urandom(24) app.config['DATABASE'] = DATABASE app.config['REPORTS_DIR'] = REPORTS_DIR os.makedirs(app.config['REPORTS_DIR'], exist_ok=True) # --- 数据库 Schema 和辅助函数 (与 flask_app.py 相同) --- DB_SCHEMA = ''' DROP TABLE IF EXISTS user; CREATE TABLE user ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL ); ''' def get_db(): db = getattr(g, '_database', None) if db is None: db = g._database = sqlite3.connect(app.config['DATABASE']) db.row_factory = sqlite3.Row return db @app.teardown_appcontext def close_connection(exception): db = getattr(g, '_database', None) if db is not None: db.close() def init_db(force_create=False): if force_create or not os.path.exists(app.config['DATABASE']): with app.app_context(): db = get_db() db.cursor().executescript(DB_SCHEMA) db.commit() logger.info("数据库已初始化!") create_default_user() else: logger.info("数据库已存在。") def create_default_user(username="admin", password="admin123"): with app.app_context(): db = get_db() user = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone() if user is None: db.execute("INSERT INTO user (username, password_hash) VALUES (?, ?)", (username, generate_password_hash(password))) db.commit() logger.info(f"已创建默认用户: {username}") else: logger.info(f"默认用户 {username} 已存在。") @app.cli.command('init-db') def init_db_command(): init_db(force_create=True) print("已初始化数据库。") # --- 用户认证 (与 flask_app.py 相同) --- @app.route('/login', methods=('GET', 'POST')) def login(): if g.user: return redirect(url_for('list_history')) if request.method == 'POST': username = request.form['username'] password = request.form['password'] db = get_db() error = None user = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone() if user is None: error = '用户名不存在。' elif not check_password_hash(user['password_hash'], password): error = '密码错误。' if error is None: session.clear() session['user_id'] = user['id'] session['username'] = user['username'] return redirect(url_for('list_history')) flash(error) return render_template('login.html') @app.route('/logout') def logout(): session.clear() flash('您已成功登出。') return redirect(url_for('login')) def login_required(view): @wraps(view) def wrapped_view(**kwargs): if g.user is None: return redirect(url_for('login')) return view(**kwargs) return wrapped_view @app.before_request def load_logged_in_user(): user_id = session.get('user_id') if user_id is None: g.user = None else: g.user = get_db().execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone() # --- LLM配置视图 --- CRITERIA_FILE_PATH = os.path.join(APP_ROOT, 'custom_testcases', 'llm', 'compliance_criteria.json') @app.route('/llm-config', methods=['GET', 'POST']) @login_required def llm_config(): criteria_for_template = [] file_exists = os.path.exists(CRITERIA_FILE_PATH) if request.method == 'POST': # 从表单获取所有名为'criteria'的输入项,作为一个列表 criteria_list = request.form.getlist('criteria') # 过滤掉用户可能提交的空规则 criteria_list = [item.strip() for item in criteria_list if item.strip()] try: # 将规则列表格式化为美观的JSON并保存 pretty_content = json.dumps(criteria_list, indent=2, ensure_ascii=False) with open(CRITERIA_FILE_PATH, 'w', encoding='utf-8') as f: f.write(pretty_content) flash('LLM合规性标准已成功保存!', 'success') except Exception as e: flash(f'保存文件时发生未知错误: {e}', 'error') # 无论是GET还是POST请求后,都重新从文件中读取最新的规则列表用于显示 if file_exists: try: with open(CRITERIA_FILE_PATH, 'r', encoding='utf-8') as f: criteria_for_template = json.load(f) # 确保文件内容确实是一个列表 if not isinstance(criteria_for_template, list): flash('配置文件格式错误:内容应为JSON数组。已重置为空列表。', 'error') criteria_for_template = [] except Exception as e: flash(f'读取配置文件时出错: {e}', 'error') criteria_for_template = [] # 准备一个用于页面展示的示例API信息 example_api_info = { "path_template": "/api/dms/instance/v1/message/push/myschema/1.0", "method": "POST", "title": "数据推送接口", "description": "用于向系统推送标准格式的数据。", "schema_request_body": {"...": "... (此处为请求体Schema定义)"}, "instance_url": "http://example.com/api/dms/instance/v1/message/push/myschema/1.0", "instance_request_headers": {"X-Tenant-ID": "tenant-001", "...": "..."}, "instance_request_body": {"id": "123", "data": "example"}, "instance_response_status": 200, "instance_response_body": {"code": 0, "message": "success", "data": True} } return render_template('llm_config.html', criteria=criteria_for_template, file_exists=file_exists, example_api_info=json.dumps(example_api_info, indent=2, ensure_ascii=False)) # --- 文件下载路由 --- @app.route('/download//') @login_required def download_report(run_id, filename): """安全地提供指定运行记录中的报告文件下载。""" # 清理输入,防止目录遍历攻击 run_id_safe = Path(run_id).name filename_safe = Path(filename).name reports_dir = Path(app.config['REPORTS_DIR']).resolve() run_dir = (reports_dir / run_id_safe).resolve() # 安全检查:确保请求的目录是REPORTS_DIR的子目录 if not run_dir.is_dir() or run_dir.parent != reports_dir: abort(404, "找不到指定的测试记录或权限不足。") return send_from_directory(run_dir, filename_safe, as_attachment=True) # --- 新增:PDF文件预览路由 --- @app.route('/view_pdf/') @login_required def view_pdf_report(run_id): """安全地提供PDF报告文件以内联方式查看。""" run_id_safe = Path(run_id).name filename_safe = "report_cn.pdf" reports_dir = Path(app.config['REPORTS_DIR']).resolve() run_dir = (reports_dir / run_id_safe).resolve() # 安全检查 if not run_dir.is_dir() or run_dir.parent != reports_dir: abort(404, "找不到指定的测试记录或权限不足。") pdf_path = run_dir / filename_safe if not pdf_path.exists(): abort(404, "未找到PDF报告文件。") return send_from_directory(run_dir, filename_safe) # --- 历史记录视图 --- @app.route('/') @login_required def list_history(): history = [] reports_path = Path(app.config['REPORTS_DIR']) if not reports_path.is_dir(): flash('报告目录不存在。') return render_template('history.html', history=[]) # 获取所有子目录(即测试运行记录) run_dirs = [d for d in reports_path.iterdir() if d.is_dir()] # 按名称(时间戳)降序排序 run_dirs.sort(key=lambda x: x.name, reverse=True) for run_dir in run_dirs: summary_path = run_dir / 'summary.json' details_path = run_dir / 'api_call_details.md' run_info = {'id': run_dir.name, 'summary': None, 'has_details': details_path.exists()} if summary_path.exists(): try: with open(summary_path, 'r', encoding='utf-8') as f: summary_data = json.load(f) run_info['summary'] = summary_data except (json.JSONDecodeError, IOError) as e: logger.error(f"无法读取或解析摘要文件 {summary_path}: {e}") run_info['summary'] = {'error': '无法加载摘要'} history.append(run_info) return render_template('history.html', history=history) @app.route('/details/') @login_required def show_details(run_id): run_id = Path(run_id).name # Sanitize input run_dir = Path(app.config['REPORTS_DIR']) / run_id if not run_dir.is_dir(): return "找不到指定的测试记录。", 404 summary_path = run_dir / 'summary.json' details_path = run_dir / 'api_call_details.md' pdf_path = run_dir / 'report_cn.pdf' # 新增PDF路径 summary_content = "{}" details_content = "### 未找到API调用详情报告" has_pdf_report = pdf_path.exists() # 检查PDF是否存在 has_md_report = details_path.exists() # 检查MD报告是否存在 if summary_path.exists(): try: with open(summary_path, 'r', encoding='utf-8') as f: summary_data = json.load(f) summary_content = json.dumps(summary_data, indent=2, ensure_ascii=False) except Exception as e: summary_content = f"加载摘要文件出错: {e}" if has_md_report: try: with open(details_path, 'r', encoding='utf-8') as f: # 将Markdown转换为HTML details_content = markdown.markdown(f.read(), extensions=['fenced_code', 'tables', 'def_list', 'attr_list']) except Exception as e: details_content = f"加载详情文件出错: {e}" return render_template('history_detail.html', run_id=run_id, summary_content=summary_content, details_content=details_content, has_pdf_report=has_pdf_report, has_md_report=has_md_report) # --- 根路径重定向 --- @app.route('/index') def index_redirect(): return redirect(url_for('list_history')) if __name__ == '__main__': # 首次运行时确保数据库和用户存在 init_db() app.run(debug=True, host='0.0.0.0', port=5051)