From 5a8694d21797791a874e97db8c5f531111aa6af3 Mon Sep 17 00:00:00 2001 From: gongwenxin Date: Wed, 27 Aug 2025 22:51:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6port?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- custom_testcases/llm/compliance_criteria.json | 1 + fastapi_server.py | 763 +++++++++++++++++- 2 files changed, 743 insertions(+), 21 deletions(-) diff --git a/custom_testcases/llm/compliance_criteria.json b/custom_testcases/llm/compliance_criteria.json index 32960f8..87f82f7 100644 --- a/custom_testcases/llm/compliance_criteria.json +++ b/custom_testcases/llm/compliance_criteria.json @@ -1,2 +1,3 @@ [ + "接口需要符合RESTFUL规范" ] \ No newline at end of file diff --git a/fastapi_server.py b/fastapi_server.py index de3ea32..876d641 100644 --- a/fastapi_server.py +++ b/fastapi_server.py @@ -16,12 +16,20 @@ import unicodedata import html # FastAPI imports -from fastapi import FastAPI, HTTPException, BackgroundTasks, status -from fastapi.responses import JSONResponse, FileResponse +from fastapi import FastAPI, HTTPException, BackgroundTasks, status, Request, Form, Depends, Cookie +from fastapi.responses import JSONResponse, FileResponse, HTMLResponse, RedirectResponse from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates from pydantic import BaseModel, Field, field_validator, model_validator import uvicorn +# Additional imports for history viewer integration +import sqlite3 +from werkzeug.security import generate_password_hash, check_password_hash +from datetime import timedelta +import markdown + # PDF generation libraries - with fallback try: from reportlab.lib import colors @@ -45,6 +53,11 @@ from ddms_compliance_suite.utils.data_generator import DataGenerator logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) +# Database and authentication setup +DATABASE = './users.db' +REPORTS_DIR = './test_reports' +SECRET_KEY = os.urandom(24) + # FastAPI app instance app = FastAPI( title="DMS合规性测试工具 API", @@ -82,15 +95,25 @@ app.add_middleware( allow_headers=["*"], ) +# Mount static files first (before routes) +app.mount("/static", StaticFiles(directory="static"), name="static") + +# Configure templates +templates = Jinja2Templates(directory="templates") + +# Ensure directories exist +os.makedirs(REPORTS_DIR, exist_ok=True) + # Pydantic models for request/response class TestConfig(BaseModel): """测试配置模型""" # API定义源 (三选一) - yapi: Optional[str] = Field(None, description="YAPI定义文件路径", exclude=True) - swagger: Optional[str] = Field(None, description="Swagger/OpenAPI定义文件路径", exclude=True) - dms: Optional[str] = Field("./assets/doc/dms/domain.json", description="DMS服务发现的domain mapping文件路径", example="./assets/doc/dms/domain.json") - # 基本配置 + mode: str = Field("dms", description="API定义源", pattern="^(yapi|swagger|dms)$") + # yapi: Optional[str] = Field(None, description="YAPI定义文件路径", exclude=True) + # swagger: Optional[str] = Field(None, description="Swagger/OpenAPI定义文件路径", exclude=True) + # dms: Optional[str] = Field("./assets/doc/dms/domain.json", description="DMS服务发现的domain mapping文件路径", example="./assets/doc/dms/domain.json") + # # 基本配置 base_url: str = Field("https://www.dev.ideas.cnpc/", description="API基础URL", example="https://www.dev.ideas.cnpc/") # 分页配置 @@ -113,13 +136,11 @@ class TestConfig(BaseModel): def validate_api_source(cls, values): """验证API定义源,确保三选一""" if isinstance(values, dict): - api_sources = [values.get('yapi'), values.get('swagger'), values.get('dms')] - non_none_sources = [s for s in api_sources if s is not None] - if len(non_none_sources) > 1: - raise ValueError('只能选择一个API定义源:yapi、swagger或dms') - if len(non_none_sources) == 0: - raise ValueError('必须提供一个API定义源:yapi、swagger或dms') - return values + # api_sources = [values.get('yapi'), values.get('swagger'), values.get('dms')] + api_source=values.get('mode') + if api_source in ['yapi', 'swagger', 'dms']: + return values + raise ValueError('API定义源无效,必须是yapi、swagger或dms之一') class PaginationInfo(BaseModel): """分页信息模型""" @@ -134,6 +155,7 @@ class TestResponse(BaseModel): """测试响应模型""" status: str = Field(description="测试状态", example="completed") message: str = Field(description="状态消息") + report_id: str = Field(description="报告ID") report_directory: str = Field(description="报告目录路径") summary: Dict[str, Any] = Field(description="测试摘要信息") pagination: Optional[PaginationInfo] = Field(None, description="分页信息(仅DMS测试时返回)") @@ -147,6 +169,62 @@ class ErrorResponse(BaseModel): # Global variable to store running tasks running_tasks: Dict[str, Dict[str, Any]] = {} +# Database setup and authentication functions +DB_SCHEMA = ''' +DROP TABLE IF EXISTS user; +CREATE TABLE user ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT UNIQUE NOT NULL, + password_hash TEXT NOT NULL +); +''' + +def get_db(): + conn = sqlite3.connect(DATABASE) + conn.row_factory = sqlite3.Row + return conn + +def init_db(): + if not os.path.exists(DATABASE): + conn = get_db() + conn.executescript(DB_SCHEMA) + conn.commit() + conn.close() + create_default_user() + +def create_default_user(username="admin", password="7#Xq9$Lm*2!Pw@5"): + conn = get_db() + user = conn.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone() + if user is None: + conn.execute("INSERT INTO user (username, password_hash) VALUES (?, ?)", + (username, generate_password_hash(password))) + conn.commit() + logger.info(f"Created default user: {username}") + conn.close() + +def get_current_user(session_token: Optional[str] = Cookie(None)): + if not session_token: + return None + # Simple session validation - in production use proper session management + try: + import base64 + import json + data = json.loads(base64.b64decode(session_token)) + if data.get('username'): + return data + except: + pass + return None + +def create_session_token(username: str): + import base64 + import json + data = {"username": username} + return base64.b64encode(json.dumps(data).encode()).decode() + +# Initialize database on startup +init_db() + @app.get("/", summary="健康检查", description="检查API服务器是否正常运行", @@ -291,6 +369,7 @@ def run_tests_logic(config: dict): result = { "status": "completed", + "report_id":str(output_directory.resolve()).split('/')[-1], "message": "Tests finished." if failed_count == 0 and error_count == 0 else "Tests finished with failures or errors.", "report_directory": str(output_directory.resolve()), "summary": test_summary.to_dict() @@ -971,11 +1050,22 @@ async def run_api_tests(config: TestConfig): "use_llm_for_query_params": False, "use_llm_for_headers": False, "verbose": False + } - + yapi="./assets/doc/yapi/yapi.json" + swagger="./assets/doc/swagger/swagger.json" + dms="./assets/doc/dms/domain.json" + # Merge hidden defaults with config config_dict.update(hidden_defaults) - + if config_dict['mode'] == 'yapi': + config_dict['yapi'] = yapi + elif config_dict['mode'] == 'swagger': + config_dict['swagger'] = swagger + elif config_dict['mode'] == 'dms': + config_dict['dms'] = dms + config_dict.pop('mode') + result = run_tests_logic(config_dict) if result['status'] == 'error': @@ -1062,12 +1152,12 @@ async def list_reports(): "id": report_dir.name, "timestamp": report_dir.name, "path": str(report_dir), - "summary": { - "endpoints_total": summary.get("endpoints_total", 0), - "endpoints_passed": summary.get("endpoints_passed", 0), - "endpoints_failed": summary.get("endpoints_failed", 0), - "test_cases_total": summary.get("test_cases_total", 0) - } + # "summary": { + # "endpoints_total": summary.get("endpoints_total", 0), + # "endpoints_passed": summary.get("endpoints_passed", 0), + # "endpoints_failed": summary.get("endpoints_failed", 0), + # "test_cases_total": summary.get("test_cases_total", 0) + # } }) except Exception as e: logger.warning(f"Error reading summary for {report_dir.name}: {e}") @@ -1084,6 +1174,637 @@ async def list_reports(): detail=f"Error listing reports: {str(e)}" ) +# History viewer routes with /history prefix +@app.get("/history/login", response_class=HTMLResponse) +async def login_page(request: Request): + # Simple login form without template dependency + html_content = """ + + + + + 登录 + + + + +
+

登录

+
+
+ + +
+
+ + +
+ +
+
+ + + """ + return HTMLResponse(content=html_content) + +@app.post("/history/login") +async def login(request: Request, username: str = Form(...), password: str = Form(...)): + conn = get_db() + user = conn.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone() + conn.close() + + if user and check_password_hash(user['password_hash'], password): + response = RedirectResponse(url="/history", status_code=302) + response.set_cookie("session_token", create_session_token(username)) + return response + + # Return login form with error + html_content = """ + + + + + 登录 + + + + +
+

登录

+
+
+ + +
+
+ + +
+ +
用户名或密码错误
+
+
+ + + """ + return HTMLResponse(content=html_content) + +@app.get("/history/logout") +async def logout(): + response = RedirectResponse(url="/history/login", status_code=302) + response.delete_cookie("session_token") + return response + +@app.get("/history", response_class=HTMLResponse) +async def list_history(request: Request): + # Temporarily remove login requirement + user = {"username": "admin"} # Mock user + + history = [] + reports_path = Path(REPORTS_DIR) + if reports_path.is_dir(): + run_dirs = [d for d in reports_path.iterdir() if d.is_dir()] + run_dirs.sort(key=lambda x: x.name, reverse=True) + + for run_dir in run_dirs: + summary_path = run_dir / 'summary.json' + details_path = run_dir / 'api_call_details.md' + run_info = {'id': run_dir.name, 'summary': None, 'has_details': details_path.exists()} + + if summary_path.exists(): + try: + with open(summary_path, 'r', encoding='utf-8') as f: + summary_data = json.load(f) + run_info['summary'] = summary_data + except Exception as e: + logger.error(f"Error reading summary {summary_path}: {e}") + run_info['summary'] = {'error': '无法加载摘要'} + + history.append(run_info) + + # Enhanced HTML response for history list with color differentiation + html_content = f""" + + + + + 测试历史记录 + + + + + +
+
+

测试历史记录

+ +
+
+ + + + + + + + + + + + + + """ + + for run in history: + if run['summary'] and 'error' not in run['summary']: + overall = run['summary'].get('overall_summary', {}) + start_time = run['summary'].get('start_time', 'N/A') + end_time = run['summary'].get('end_time', 'N/A') + duration = run['summary'].get('duration_seconds', 0) + + # Get statistics + endpoints_tested = overall.get('endpoints_tested', 0) + endpoints_passed = overall.get('endpoints_passed', 0) + endpoints_failed = overall.get('endpoints_failed', 0) + + # Calculate success rate (passed + skipped = success, only failed = failure) + if endpoints_tested > 0: + # Skipped tests should be considered as passed (not failed) + endpoints_success = endpoints_tested - endpoints_failed + success_rate = (endpoints_success / endpoints_tested) * 100 + if success_rate >= 90: + row_class = "status-success" + elif success_rate >= 70: + row_class = "status-warning" + elif endpoints_failed > 0: + row_class = "status-danger" + else: + row_class = "status-info" + success_rate_str = f"{success_rate:.1f}%" + else: + row_class = "status-info" + success_rate_str = "N/A" + + # Ensure duration is a number + try: + duration_float = float(duration) if duration else 0.0 + duration_str = f"{duration_float:.2f}" + except (ValueError, TypeError): + duration_str = "N/A" + + # Format time strings (remove microseconds if present) + start_display = start_time.split('.')[0] if start_time != 'N/A' else 'N/A' + end_display = end_time.split('.')[0] if end_time != 'N/A' else 'N/A' + + # Calculate skipped count + endpoints_skipped = endpoints_tested - endpoints_passed - endpoints_failed + + # Create stats badges + stats_html = f""" + {endpoints_tested} 总计 + {endpoints_passed} 通过 + {f'{endpoints_skipped} 跳过' if endpoints_skipped > 0 else ''} + {f'{endpoints_failed} 失败' if endpoints_failed > 0 else ''} + """ + + html_content += f""" + + + + + + + + + + """ + else: + html_content += f""" + + + + + + """ + + if not history: + html_content += """ + + + + """ + + html_content += """ + +
运行ID开始时间结束时间端点统计成功率耗时(秒)操作
{run['id']}{start_display}{end_display}{stats_html}{success_rate_str}{duration_str} + 查看详情 +
{run['id']}加载失败 + 查看详情 +
+
📊
+
暂无测试记录
+
运行测试后记录将显示在这里
+
+
+
+ + + """ + + return HTMLResponse(content=html_content) + +@app.get("/history/details/{run_id}", response_class=HTMLResponse) +async def show_details(request: Request, run_id: str): + # Temporarily remove login requirement + user = {"username": "admin"} # Mock user + + run_id = Path(run_id).name # Sanitize + run_dir = Path(REPORTS_DIR) / run_id + + if not run_dir.is_dir(): + raise HTTPException(status_code=404, detail="找不到指定的测试记录") + + summary_path = run_dir / 'summary.json' + details_path = run_dir / 'api_call_details.md' + pdf_path = run_dir / 'report_cn.pdf' + + summary_content = "{}" + details_content = "### 未找到API调用详情报告" + has_pdf_report = pdf_path.exists() + has_md_report = details_path.exists() + + if summary_path.exists(): + try: + with open(summary_path, 'r', encoding='utf-8') as f: + summary_data = json.load(f) + summary_content = json.dumps(summary_data, indent=2, ensure_ascii=False) + except Exception as e: + summary_content = f"加载摘要文件出错: {e}" + + if has_md_report: + try: + with open(details_path, 'r', encoding='utf-8') as f: + details_content = markdown.markdown(f.read(), extensions=['fenced_code', 'tables']) + except Exception as e: + details_content = f"加载详情文件出错: {e}" + + # Generate download buttons + download_buttons = "" + if has_pdf_report: + download_buttons += f'下载PDF报告 ' + if has_md_report: + download_buttons += f'下载MD报告 ' + download_buttons += f'下载JSON报告' + + html_content = f""" + + + + + 测试详情 - {run_id} + + + + + +
+ {f'PDF报告' if has_pdf_report else ''} + JSON摘要 + 调用详情 +
+ {download_buttons.replace('class="button download-btn"', 'style="margin-bottom: 5px; font-size: 11px;"')} +
+
+ +
+
+

测试详情: {run_id}

+
+ +
+
+ +
+ {f''' +
+

PDF 报告

+ +
+ ''' if has_pdf_report else ''} + +
+

测试摘要 (JSON)

+
{html.escape(summary_content)}
+
+ +
+

API调用详情

+
+ {details_content} +
+
+
+
+ + + """ + + return HTMLResponse(content=html_content) + +@app.get("/history/download/{run_id}/{filename}") +async def download_history_report(run_id: str, filename: str): + # Temporarily remove login requirement + + run_id_safe = Path(run_id).name + filename_safe = Path(filename).name + + reports_dir = Path(REPORTS_DIR).resolve() + run_dir = (reports_dir / run_id_safe).resolve() + + if not run_dir.is_dir() or run_dir.parent != reports_dir: + raise HTTPException(status_code=404, detail="找不到指定的测试记录") + + file_path = run_dir / filename_safe + if not file_path.exists(): + raise HTTPException(status_code=404, detail="文件不存在") + + return FileResponse(path=str(file_path), filename=filename_safe) + +@app.get("/history/view_pdf/{run_id}") +async def view_pdf_report(run_id: str): + # Temporarily remove login requirement + + run_id_safe = Path(run_id).name + filename_safe = "report_cn.pdf" + + reports_dir = Path(REPORTS_DIR).resolve() + run_dir = (reports_dir / run_id_safe).resolve() + + if not run_dir.is_dir() or run_dir.parent != reports_dir: + raise HTTPException(status_code=404, detail="找不到指定的测试记录") + + pdf_path = run_dir / filename_safe + if not pdf_path.exists(): + raise HTTPException(status_code=404, detail="未找到PDF报告文件") + + return FileResponse(path=str(pdf_path), media_type="application/pdf") + +@app.get("/history/llm-config", response_class=HTMLResponse) +async def llm_config_page(request: Request): + # Temporarily remove login requirement + user = {"username": "admin"} # Mock user + + criteria_file_path = Path('./custom_testcases/llm/compliance_criteria.json') + criteria_for_template = [] + file_exists = criteria_file_path.exists() + + if file_exists: + try: + with open(criteria_file_path, 'r', encoding='utf-8') as f: + criteria_for_template = json.load(f) + if not isinstance(criteria_for_template, list): + criteria_for_template = [] + except Exception as e: + logger.error(f"Error reading criteria file: {e}") + criteria_for_template = [] + + # Generate criteria HTML + criteria_html = "" + for i, criterion in enumerate(criteria_for_template): + criteria_html += f'
' + + if not criteria_html: + criteria_html = '
' + + html_content = f""" + + + + + LLM 合规性标准配置 + + + + + + +
+
+

LLM 合规性标准配置

+ +
+ +
+
+
+

合规性标准列表

+
+ {criteria_html} +
+ +
+ +
+ +
+
+ +
+

使用说明

+

这些标准将用于LLM评估API的合规性。每个标准应该是一个明确的要求或规则。

+
+
+
+ + + """ + + return HTMLResponse(content=html_content) + +@app.post("/history/llm-config") +async def llm_config_save(request: Request): + # Temporarily remove login requirement + + form = await request.form() + criteria_list = form.getlist('criteria') + criteria_list = [item.strip() for item in criteria_list if item.strip()] + + criteria_file_path = Path('./custom_testcases/llm/compliance_criteria.json') + criteria_file_path.parent.mkdir(parents=True, exist_ok=True) + + try: + with open(criteria_file_path, 'w', encoding='utf-8') as f: + json.dump(criteria_list, f, indent=2, ensure_ascii=False) + message = "LLM合规性标准已成功保存!" + except Exception as e: + message = f"保存文件时发生错误: {e}" + + return templates.TemplateResponse("llm_config.html", { + "request": request, + "criteria": criteria_list, + "file_exists": True, + "message": message, + "example_api_info": json.dumps({}, indent=2, ensure_ascii=False) + }) + if __name__ == "__main__": import argparse