compliance/fastapi_server.py
2025-09-29 13:09:08 +08:00

1935 lines
82 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
DMS合规性测试工具 - FastAPI版本API服务器
提供自动生成的交互式API文档
"""
import os
import sys
import json
import logging
import datetime
import traceback
from pathlib import Path
from typing import List, Optional, Dict, Any, Union, Literal
import unicodedata
import html
# FastAPI imports
from fastapi import FastAPI, HTTPException, BackgroundTasks, status, Request, Form, Depends, Cookie
from fastapi.responses import JSONResponse, FileResponse, HTMLResponse, RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.openapi.docs import (
get_redoc_html,
get_swagger_ui_html,
get_swagger_ui_oauth2_redirect_html,
)
from pydantic import BaseModel, Field, field_validator, model_validator
import uvicorn
# Additional imports for history viewer integration
import sqlite3
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import timedelta
import markdown
# PDF generation libraries - with fallback
try:
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
reportlab_available = True
except ImportError:
reportlab_available = False
# Project-specific imports
from ddms_compliance_suite.api_caller.caller import APICallDetail
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary
from ddms_compliance_suite.input_parser.parser import ParsedAPISpec
from ddms_compliance_suite.utils.response_utils import extract_data_for_validation
from ddms_compliance_suite.utils.data_generator import DataGenerator
# Configure logging: root logger at INFO with timestamp, logger name and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Database and authentication setup
# Path of the SQLite file opened by get_db().
DATABASE = './users.db'
# Directory that holds generated test reports (created at import time below).
REPORTS_DIR = './test_reports'
# Random 24-byte value, regenerated on every process start.
# NOTE(review): not referenced in the visible part of this file - confirm it is used elsewhere.
SECRET_KEY = os.urandom(24)
# FastAPI app instance
# The built-in documentation routes are switched off here; this module
# registers its own /docs and /redoc routes that load assets from /static.
app = FastAPI(
    title="DMS合规性测试工具 API",
    description="""
DMS合规性测试工具 FastAPI版本
这是一个用于API合规性测试的工具支持
YAPI规范测试 - 基于YAPI定义文件的测试
Swagger/OpenAPI测试 - 基于OpenAPI规范的测试
DMS服务发现测试 - 动态发现DMS服务的API进行测试
分页支持 - 支持大量API的分页获取避免内存溢出
PDF报告生成 - 生成详细的测试报告
LLM集成 - 支持大语言模型辅助生成测试数据
主要特性
🚀 高性能: 基于FastAPI支持异步处理
📊 分页支持: 解决大量API节点的内存问题
📝 自动文档: 自动生成交互式API文档
🔧 灵活配置: 支持多种测试配置选项
📈 详细报告: 生成PDF和JSON格式的测试报告
""",
    version="1.0.0",
    docs_url=None,  # disable the default Swagger UI
    redoc_url=None,  # disable the default ReDoc
)
# Add CORS middleware
# NOTE(review): every origin is allowed while credentials are enabled as well;
# narrow allow_origins to the real front-end domains before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # should be restricted to specific domains in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files first (before routes)
app.mount("/static", StaticFiles(directory="static"), name="static")
# Configure templates
templates = Jinja2Templates(directory="templates")
# Ensure directories exist
os.makedirs(REPORTS_DIR, exist_ok=True)
# Pydantic models for request/response
class TestConfig(BaseModel):
    """Request body describing one compliance test run.

    ``mode`` selects the API definition source (``yapi``, ``swagger`` or
    ``dms``); the remaining fields set the target base URL, the DMS
    pagination window and the strictness/SSL options.
    """
    # API definition source (one of three)
    mode: str = Field("dms", description="API定义源", pattern="^(yapi|swagger|dms)$")
    # yapi: Optional[str] = Field(None, description="YAPI定义文件路径", exclude=True)
    # swagger: Optional[str] = Field(None, description="Swagger/OpenAPI定义文件路径", exclude=True)
    # dms: Optional[str] = Field("./assets/doc/dms/domain.json", description="DMS服务发现的domain mapping文件路径", example="./assets/doc/dms/domain.json")
    # Basic configuration
    base_url: str = Field("https://www.dev.ideas.cnpc/", description="API基础URL", example="https://www.dev.ideas.cnpc/")
    # Pagination
    page_size: int = Field(10, description="DMS API分页大小默认10。较小的值可以减少内存使用", ge=1, le=10000)
    page_no: int = Field(1, description="起始页码从1开始。可用于断点续传或跳过前面的页面", ge=1)
    fetch_all_pages: bool = Field(False, description="是否获取所有页面。True=获取所有数据False=只获取指定页面")
    # Filtering options
    strictness_level: str = Field("CRITICAL", description="测试严格等级", pattern="^(CRITICAL|HIGH|MEDIUM|LOW)$")
    ignore_ssl: bool = Field(True, description="是否忽略SSL证书错误", examples=[True, False])

    @field_validator('base_url')
    @classmethod
    def validate_base_url(cls, v):
        """Require an explicit http:// or https:// scheme on ``base_url``."""
        if not v.startswith(('http://', 'https://')):
            raise ValueError('base_url must start with http:// or https://')
        return v

    @model_validator(mode='before')
    @classmethod
    def validate_api_source(cls, values):
        """Reject input whose ``mode`` is not a supported API definition source.

        This validator runs before field validation, so field defaults have
        not been applied yet. A missing ``mode`` is therefore treated as the
        declared default ``"dms"`` instead of being rejected, which keeps the
        default on the field usable.

        Raises:
            ValueError: if the input is not a dict or ``mode`` is not one of
                ``yapi``, ``swagger`` or ``dms``.
        """
        if isinstance(values, dict):
            # api_sources = [values.get('yapi'), values.get('swagger'), values.get('dms')]
            api_source = values.get('mode', 'dms')
            if api_source in ('yapi', 'swagger', 'dms'):
                return values
        raise ValueError('API定义源无效必须是yapi、swagger或dms之一')
class PaginationInfo(BaseModel):
    """Pagination information model."""
    page_size: int = Field(description="页面大小")  # page size
    page_no_start: int = Field(description="起始页码")  # first page number requested
    total_pages: int = Field(description="总页数")  # total number of pages
    total_records: int = Field(description="总记录数")  # total number of records
    pages_fetched: int = Field(description="已获取页数")  # number of pages fetched
    current_page: int = Field(description="当前页码")  # current page number
class TestResponse(BaseModel):
    """Test response model."""
    status: str = Field(description="测试状态", example="completed")  # test status
    message: str = Field(description="状态消息")  # status message
    report_id: str = Field(description="报告ID")  # report identifier
    report_directory: str = Field(description="报告目录路径")  # report directory path
    summary: Dict[str, Any] = Field(description="测试摘要信息")  # test summary data
    # Pagination details; per the description, only returned for DMS tests.
    pagination: Optional[PaginationInfo] = Field(None, description="分页信息仅DMS测试时返回")
class ErrorResponse(BaseModel):
    """Error response model."""
    status: str = Field("error", description="错误状态")  # defaults to "error"
    message: str = Field(description="错误消息")  # error message
    traceback: Optional[str] = Field(None, description="错误堆栈跟踪")  # optional stack trace text
class TestStartResponse(BaseModel):
    """Response model returned when a test run is started asynchronously."""
    status: Literal["started"] = Field(description="任务当前状态")  # only "started" is valid
    message: str = Field(description="状态消息")  # status message
    report_id: str = Field(description="报告ID")  # report identifier
    status_url: str = Field(description="查询任务状态的URL")  # URL for polling the task status
    report_url: str = Field(description="获取测试报告的URL")  # URL for fetching the report
    # Storage path of the report on the server (optional).
    report_directory: Optional[str] = Field(None, description="报告在服务器上的存储路径")
# Global variable to store running tasks
# NOTE(review): this name is assigned again further down in this module
# (`running_tasks = {}`), which replaces the dict created here.
running_tasks: Dict[str, Dict[str, Any]] = {}
# Database setup and authentication functions
# Schema script that init_db() runs when the database file does not exist yet.
# It drops any existing `user` table before recreating it.
DB_SCHEMA = '''
DROP TABLE IF EXISTS user;
CREATE TABLE user (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL
);
'''
def get_db():
    """Open a new connection to the SQLite file named by ``DATABASE``.

    The connection uses ``sqlite3.Row`` as its row factory, so result rows
    can be indexed by column name. Closing it is left to the caller.
    """
    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the user database on first start.

    Does nothing when the file at ``DATABASE`` already exists. Otherwise the
    script in ``DB_SCHEMA`` is executed and the default user is inserted.
    """
    if os.path.exists(DATABASE):
        return
    conn = get_db()
    try:
        conn.executescript(DB_SCHEMA)
        conn.commit()
    finally:
        # Release the connection even when applying the schema fails.
        conn.close()
    create_default_user()
def create_default_user(username="admin", password="7#Xq9$Lm*2!Pw@5"):
    """Insert a user row for ``username`` unless one already exists.

    Only the hash produced by ``generate_password_hash`` is stored, never the
    plain password.

    NOTE(review): the default password is hard-coded in source control;
    consider supplying it through configuration and rotating it on deploy.

    Args:
        username: Login name to create.
        password: Plain-text password to hash and store.
    """
    conn = get_db()
    try:
        existing = conn.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
        if existing is None:
            conn.execute(
                "INSERT INTO user (username, password_hash) VALUES (?, ?)",
                (username, generate_password_hash(password)),
            )
            conn.commit()
            logger.info(f"Created default user: {username}")
    finally:
        # Close the connection on every path, including query failures.
        conn.close()
def get_current_user(session_token: Optional[str] = Cookie(None)):
    """Decode the ``session_token`` cookie into its session payload.

    The token is expected to be base64-encoded JSON holding an object with a
    non-empty ``username`` entry.

    NOTE(review): the token is neither signed nor encrypted, so any client
    can fabricate one for an arbitrary username. Use proper (signed,
    server-side) session management in production.

    Returns:
        The decoded dict, or ``None`` when the cookie is missing, cannot be
        decoded, is not a JSON object, or has no ``username``.
    """
    if not session_token:
        return None
    # Simple session validation - in production use proper session management
    import base64
    import json
    try:
        data = json.loads(base64.b64decode(session_token))
    except (ValueError, TypeError, RecursionError):
        # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
        # ValueError subclasses; TypeError covers a non-string token.
        return None
    if isinstance(data, dict) and data.get('username'):
        return data
    return None
def create_session_token(username: str):
    """Build the session cookie value for ``username``.

    The value is the base64 text of the JSON object ``{"username": ...}``.
    """
    import base64
    import json
    payload = json.dumps({"username": username})
    encoded = base64.b64encode(payload.encode())
    return encoded.decode()
# Initialize database on startup
# This runs at import time, so importing the module may create the database file.
init_db()
# Custom documentation routes using local static files
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
    """Serve Swagger UI with its JS and CSS loaded from the local /static mount."""
    page_title = f"{app.title} - Swagger UI"
    redirect_url = app.swagger_ui_oauth2_redirect_url
    return get_swagger_ui_html(
        openapi_url=app.openapi_url,
        title=page_title,
        oauth2_redirect_url=redirect_url,
        swagger_js_url="/static/swagger-ui-bundle.js",
        swagger_css_url="/static/swagger-ui.css",
    )
@app.get("/redoc", include_in_schema=False)
async def redoc_html():
    """Serve ReDoc with its script loaded from the local /static mount."""
    page_title = f"{app.title} - ReDoc"
    return get_redoc_html(
        openapi_url=app.openapi_url,
        title=page_title,
        redoc_js_url="/static/redoc.standalone.js",
    )
@app.get(app.swagger_ui_oauth2_redirect_url, include_in_schema=False)
async def swagger_ui_redirect():
    """Return the OAuth2 redirect helper page used by Swagger UI."""
    redirect_page = get_swagger_ui_oauth2_redirect_html()
    return redirect_page
@app.get(
    "/",
    summary="健康检查",
    description="检查API服务器是否正常运行",
    response_model=Dict[str, str],
)
async def health_check():
    """Health-check endpoint (used by the Docker health check).

    NOTE(review): the version reported here ("2.0.0") differs from the
    version passed to the FastAPI constructor ("1.0.0") - confirm which is
    intended.
    """
    return dict(
        status="healthy",
        service="DMS Compliance API Server (FastAPI)",
        version="2.0.0",
        docs_url="/docs",
        redoc_url="/redoc",
    )
@app.get(
    "/info",
    summary="服务信息",
    description="获取API服务器的详细信息",
    response_model=Dict[str, Any],
)
async def get_info():
    """Describe the server: name, version, framework, features and endpoints."""
    supported_features = [
        "YAPI规范测试",
        "Swagger/OpenAPI测试",
        "DMS服务发现测试",
        "分页支持",
        "PDF报告生成",
        "LLM集成",
        "自动API文档",
    ]
    endpoint_map = {
        "health": "/",
        "info": "/info",
        "run_tests": "/run",
        "docs": "/docs",
        "redoc": "/redoc",
    }
    info = {
        "service": "DMS Compliance API Server",
        "version": "2.0.0",
        "framework": "FastAPI",
    }
    info["features"] = supported_features
    info["endpoints"] = endpoint_map
    # Reflects whether the optional reportlab import succeeded at module load.
    info["reportlab_available"] = reportlab_available
    return info
# Import the test logic from the original Flask version
def run_tests_logic(config: dict):
    """Run one compliance test session and write its reports.

    Exactly one of the keys ``yapi``, ``swagger`` or ``dms`` must be present
    in ``config``; it selects which orchestrator entry point is used. Reports
    (``summary.json``, the API call details Markdown file and, when reportlab
    is available and ``generate_pdf`` is not disabled, ``report_cn.pdf``) are
    written to ``<output>/<report_id>``.

    Args:
        config: Flat configuration dictionary. ``report_id`` is reused as the
            output sub-directory when given; otherwise a timestamp is used.

    Returns:
        A dict with ``status`` ``"completed"`` or ``"completed_with_issues"``,
        ``report_id``, ``message``, ``report_directory``, ``summary`` and,
        when available, ``pagination``. Any exception is converted into a
        dict with ``status`` ``"error"``, ``message``, ``traceback``,
        ``report_id`` and ``report_directory`` instead of being raised.
    """
    try:
        if config.get('verbose'):
            logging.getLogger('ddms_compliance_suite').setLevel(logging.DEBUG)
            logger.setLevel(logging.DEBUG)
            logger.debug("Verbose logging enabled.")

        sources_given = [key for key in ('yapi', 'swagger', 'dms') if key in config]
        if not sources_given:
            raise ValueError("An API definition source is required: --yapi, --swagger, or --dms")
        if len(sources_given) > 1:
            raise ValueError("API definition sources are mutually exclusive.")

        # Setup output directory, reuse provided report_id when available
        base_output_dir = Path(config.get('output', './test_reports'))
        report_id = config.get('report_id') or datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        output_directory = base_output_dir / report_id
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Test reports will be saved to: {output_directory.resolve()}")

        # Log the configuration at debug level only, with the LLM API key
        # masked, instead of printing the raw dict (and its secret) to stdout.
        redacted_config = {
            key: ("***" if key == 'llm_api_key' and value else value)
            for key, value in config.items()
        }
        logger.debug("Effective test configuration: %s", redacted_config)

        # Initialize the orchestrator
        orchestrator = APITestOrchestrator(
            base_url=config['base_url'],
            custom_test_cases_dir=config.get('custom_test_cases_dir'),
            llm_api_key=config.get('llm_api_key'),
            llm_base_url=config.get('llm_base_url'),
            llm_model_name=config.get('llm_model_name'),
            use_llm_for_request_body=config.get('use_llm_for_request_body', False),
            use_llm_for_path_params=config.get('use_llm_for_path_params', False),
            use_llm_for_query_params=config.get('use_llm_for_query_params', False),
            use_llm_for_headers=config.get('use_llm_for_headers', False),
            output_dir=str(output_directory),
            stages_dir=config.get('stages_dir'),
            strictness_level=config.get('strictness_level', 'CRITICAL'),
            ignore_ssl=config.get('ignore_ssl', False)
        )
        test_summary: Optional[TestSummary] = None
        parsed_spec: Optional[ParsedAPISpec] = None
        pagination_info: Dict[str, Any] = {}

        if 'yapi' in config:
            logger.info(f"Running tests from YAPI file: {config['yapi']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_yapi(
                yapi_file_path=config['yapi'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom_test_cases_dir')
            )
        elif 'swagger' in config:
            logger.info(f"Running tests from Swagger file: {config['swagger']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_swagger(
                swagger_file_path=config['swagger'],
                tags=config.get('tags'),
                custom_test_cases_dir=config.get('custom_test_cases_dir')
            )
        elif 'dms' in config:
            logger.info(f"Running tests from DMS service discovery: {config['dms']}")
            test_summary, parsed_spec, pagination_info = orchestrator.run_tests_from_dms(
                domain_mapping_path=config['dms'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom_test_cases_dir'),
                page_size=config.get('page_size', 1000),
                page_no_start=config.get('page_no', 1),
                fetch_all_pages=config.get('fetch_all_pages', True)
            )

        if not parsed_spec:
            raise RuntimeError("Failed to parse the API specification.")

        if test_summary and config.get('stages_dir') and parsed_spec:
            logger.info(f"Executing API test stages from directory: {config['stages_dir']}")
            orchestrator.run_stages_from_spec(parsed_spec, test_summary)

        if not test_summary:
            raise RuntimeError("Test execution failed to produce a summary.")

        # Save main summary
        main_report_file_path = output_directory / "summary.json"
        with open(main_report_file_path, 'w', encoding='utf-8') as f:
            f.write(test_summary.to_json(pretty=True))

        # Save API call details
        save_api_call_details_to_file(
            orchestrator.get_api_call_details(),
            str(output_directory),
            filename="api_call_details.md"
        )

        # Generate PDF report if reportlab is available
        if reportlab_available and config.get('generate_pdf', True):
            pdf_report_path = output_directory / "report_cn.pdf"
            save_pdf_report(test_summary.to_dict(), pdf_report_path, config.get('strictness_level', 'CRITICAL'))

        failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
        error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
        result_status = "completed" if failed_count == 0 and error_count == 0 else "completed_with_issues"
        result = {
            "status": result_status,
            "report_id": report_id,
            "message": "Tests finished." if result_status == "completed" else "Tests finished with failures or errors.",
            "report_directory": str(output_directory.resolve()),
            "summary": test_summary.to_dict()
        }
        # Attach pagination details when the run produced any.
        if pagination_info:
            result["pagination"] = pagination_info
        return result
    except Exception as e:
        logger.error(f"An unexpected error occurred during test execution: {e}", exc_info=True)
        # Only a caller-supplied report_id is echoed back on failure.
        error_report_id = config.get('report_id')
        error_output_dir = None
        if error_report_id:
            error_output_dir = str((Path(config.get('output', './test_reports')) / error_report_id).resolve())
        return {
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc(),
            "report_id": error_report_id,
            "report_directory": error_output_dir
        }
def _format_body_for_markdown(body, label):
    """Return ``(fence_language, text)`` for a request or response body.

    JSON strings and dict/list bodies are pretty-printed and fenced as
    ``json``; everything else is rendered with ``str()`` and fenced as
    ``text``. ``label`` ("request" or "response") is only used in the
    warning logged when formatting fails.
    """
    fence_language = "text"
    text = str(body)
    try:
        if isinstance(body, str):
            try:
                text = json.dumps(json.loads(body), indent=2, ensure_ascii=False)
                fence_language = "json"
            except json.JSONDecodeError:
                pass  # a plain string that is not JSON stays as text
        elif isinstance(body, (dict, list)):
            text = json.dumps(body, indent=2, ensure_ascii=False)
            fence_language = "json"
        # Any other type (int, bool, ...) keeps its str() rendering.
    except Exception as e:
        logger.warning(f"Error formatting {label} body for Markdown: {e}")
    return fence_language, text


def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
    """Write the recorded API calls to a Markdown file.

    Each call gets a section with its cURL command, request details
    (method, URL, headers, query parameters, body) and response details
    (status code, elapsed time, headers, body).

    Args:
        api_call_details: Recorded calls; nothing is written when empty.
        output_dir_path: Directory for the file; created when missing.
        filename: Target file name. A suffix other than ``.md`` or
            ``.markdown`` is replaced by ``.md``.

    Directory-creation and file-write failures are logged, not raised.
    """
    import urllib.parse

    if not api_call_details:
        logger.info("没有API调用详情可供保存。")
        return
    output_dir = Path(output_dir_path)
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}")
        return

    # The main output is always a Markdown file.
    md_output_file = output_dir / filename
    if md_output_file.suffix.lower() not in ['.md', '.markdown']:
        md_output_file = md_output_file.with_suffix('.md')

    markdown_content = []
    for detail in api_call_details:
        # Request URL with params (if any)
        url_to_display = detail.request_url
        if detail.request_params:
            try:
                query_string = urllib.parse.urlencode(detail.request_params)
                url_to_display = f"{detail.request_url}?{query_string}"
            except Exception as e:
                # Fall back to the bare URL when the params cannot be encoded.
                logger.warning(f"Error formatting URL with params for display: {e}")

        markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
        markdown_content.append("**cURL Command:**")
        markdown_content.append("```sh")
        # A missing cURL command must not break the final join.
        markdown_content.append(detail.curl_command or "")
        markdown_content.append("```")

        markdown_content.append("### Request Details")
        markdown_content.append(f"- **Method:** `{detail.request_method}`")
        markdown_content.append(f"- **Full URL:** `{url_to_display}`")
        markdown_content.append("- **Headers:**")
        markdown_content.append("```json")
        markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
        markdown_content.append("```")
        if detail.request_params:
            markdown_content.append("- **Query Parameters:**")
            markdown_content.append("```json")
            markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
            markdown_content.append("```")
        if detail.request_body is not None:
            body_lang, formatted_body = _format_body_for_markdown(detail.request_body, "request")
            markdown_content.append("- **Body:**")
            markdown_content.append(f"```{body_lang}")
            markdown_content.append(formatted_body)
            markdown_content.append("```")

        markdown_content.append("### Response Details")
        markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
        # The elapsed time may be absent (e.g. no response); avoid a format crash.
        elapsed = detail.response_elapsed_time
        elapsed_text = f"{elapsed:.4f}s" if isinstance(elapsed, (int, float)) else "N/A"
        markdown_content.append(f"- **Elapsed Time:** `{elapsed_text}`")
        markdown_content.append("- **Headers:**")
        markdown_content.append("```json")
        markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
        markdown_content.append("```")
        if detail.response_body is not None:
            resp_body_lang, formatted_resp_body = _format_body_for_markdown(detail.response_body, "response")
            markdown_content.append("- **Body:**")
            markdown_content.append(f"```{resp_body_lang}")
            markdown_content.append(formatted_resp_body)
            markdown_content.append("```")

        markdown_content.append("")  # blank line before the separator
        markdown_content.append("---")  # separator between calls

    try:
        with open(md_output_file, 'w', encoding='utf-8') as f_md:
            f_md.write("\n".join(markdown_content))
        logger.info(f"API调用详情已保存为 Markdown: {md_output_file}")
    except Exception as e:
        logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True)
def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
    """Render the test summary as a formatted PDF report (Chinese text).

    Args:
        summary_data: Summary dictionary. The keys read here are
            ``overall_summary``, ``endpoint_results``, ``stage_results``,
            ``start_time``, ``end_time`` and ``duration_seconds`` (or
            ``duration``).
        output_path: Destination PDF path; parent directories are created.
        strictness_level: Severity threshold. Test cases whose severity is
            at or above it are listed as required, the rest as optional.

    Nothing is returned. A missing reportlab installation, a missing font
    file or any error while building the document is logged, not raised.
    """
    logger.info(f"开始生成PDF报告: {output_path}")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    if not reportlab_available:
        # Without reportlab none of the names used below exist.
        logger.error("reportlab is not installed; skipping PDF report generation.")
        return
    try:
        # --- Font registration ---
        # 'SimSun' is only the registration alias; the file loaded is STHeiti.
        font_name = 'SimSun'
        font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
        if not Path(font_path).exists():
            logger.error(f"字体文件未找到: {Path(font_path).resolve()}")
            return
        # .ttc (TrueType Collection) files need an explicit subfontIndex.
        pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0))
        # Map every style variant of the family to the single registered font.
        pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)

        doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
        elements = []

        # --- Paragraph styles, all based on the registered font ---
        styles = getSampleStyleSheet()
        title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
        heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
        normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
        small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)

        def to_para(text, style=normal_style, escape=True):
            """Turn ``text`` into a Paragraph, sanitising it for reportlab.

            The text is optionally HTML-escaped, Unicode control characters
            are removed, and newlines become ``<br/>``.
            """
            content = "" if text is None else str(text)
            if escape:
                content = html.escape(content)
            # Drop control characters but keep '\n': it is itself category
            # 'Cc', and stripping it would make the <br/> conversion a no-op.
            content = "".join(ch for ch in content if ch == '\n' or unicodedata.category(ch)[0] != 'C')
            if not content.strip():
                # Blank or None input: return a safe single-space paragraph.
                return Paragraph(' ', style)
            content = content.replace('\n', '<br/>')
            return Paragraph(content, style)

        def header_cells(*titles):
            """Build a table header row of bold cells."""
            return [to_para(f"<b>{title}</b>", escape=False) for title in titles]

        def format_timestamp(value):
            """Format an ISO timestamp; fall back to the raw value."""
            if value is None or value == 'N/A':
                return 'N/A'
            try:
                return datetime.datetime.fromisoformat(value).strftime('%Y-%m-%d %H:%M:%S')
            except (TypeError, ValueError):
                return str(value)

        def safe_subtract(total, passed, failed):
            """Return the non-negative remainder, or 0 for non-numeric input."""
            if all(isinstance(v, (int, float)) for v in (total, passed, failed)):
                return max(0, total - passed - failed)
            return 0

        def build_case_table(cases, header_background):
            """Build the numbered test-case table for ``cases``."""
            rows = [header_cells("序号", "类型", "测试用例名称", "所属端点/阶段", "优先级", "执行结果")]
            for index, case in enumerate(cases, 1):
                rows.append([
                    to_para(str(index), small_style),
                    to_para(case['type'], small_style),
                    to_para(case['case_name'], small_style),
                    to_para(case['endpoint'], small_style),
                    to_para(case['severity'], small_style),
                    to_para(case['status'], small_style)
                ])
            table = Table(rows, colWidths=[25, 35, 110, 90, 45, 45])
            table.setStyle(TableStyle([
                ('GRID', (0,0), (-1,-1), 1, colors.grey),
                ('BACKGROUND', (0,0), (-1,0), header_background),
                ('ALIGN', (0,0), (-1,-1), 'CENTER'),
                ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
                ('FONTSIZE', (0,0), (-1,-1), 8)
            ]))
            return table

        # --- Report header ---
        # The report code is derived from the current Unix timestamp.
        import time
        report_code = f"DMS-TEST-{int(time.time())}"
        elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
        elements.append(Spacer(1, 15))

        # Basic report information table
        basic_info_data = [
            [to_para("<b>报告编码</b>", escape=False), to_para(report_code)],
            [to_para("<b>报告名称</b>", escape=False), to_para("DMS领域数据服务测试分析报告")],
            [to_para("<b>申请日期</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日'))],
            [to_para("<b>申请人</b>", escape=False), to_para("系统管理员")],
            [to_para("<b>服务供应商名称</b>", escape=False), to_para("数据管理系统(DMS)")],
        ]
        basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
        basic_info_table.setStyle(TableStyle([
            ('GRID', (0,0), (-1,-1), 1, colors.grey),
            ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
            ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
        ]))
        elements.append(basic_info_table)
        elements.append(Spacer(1, 20))

        # --- Summary section ---
        elements.append(to_para("摘要", heading_style, escape=False))
        overall = summary_data.get('overall_summary', {})
        start_time_formatted = format_timestamp(summary_data.get('start_time', 'N/A'))
        end_time_formatted = format_timestamp(summary_data.get('end_time', 'N/A'))
        duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0))
        try:
            duration_seconds = float(duration)
        except (TypeError, ValueError):
            # A missing or malformed duration must not abort the report.
            duration_seconds = 0.0

        endpoints_tested = overall.get('endpoints_tested', 0)
        endpoints_passed = overall.get('endpoints_passed', 0)
        endpoints_failed = overall.get('endpoints_failed', 0)
        endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed)
        test_cases_executed = overall.get('total_test_cases_executed', 0)
        test_cases_passed = overall.get('test_cases_passed', 0)
        test_cases_failed = overall.get('test_cases_failed', 0)
        test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed)
        stages_executed = overall.get('total_stages_executed', 0)
        stages_passed = overall.get('stages_passed', 0)
        stages_failed = overall.get('stages_failed', 0)
        stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed)

        summary_text = f"""本次测试针对DMS数据管理系统领域数据服务进行全面的合规性验证。
测试时间:{start_time_formatted} 至 {end_time_formatted},总耗时 {duration_seconds:.2f} 秒。
共测试 {endpoints_tested} 个API端点其中 {endpoints_passed} 个通过,{endpoints_failed} 个失败,{endpoints_skipped} 个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}
执行 {test_cases_executed} 个测试用例,其中 {test_cases_passed} 个通过,{test_cases_failed} 个失败,{test_cases_skipped} 个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}
执行 {stages_executed} 个流程测试,其中 {stages_passed} 个通过,{stages_failed} 个失败,{stages_skipped} 个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}"""
        elements.append(to_para(summary_text, normal_style))
        elements.append(Spacer(1, 20))

        # --- Tested API list ---
        elements.append(to_para("测试内容包括", heading_style, escape=False))
        endpoint_results = summary_data.get('endpoint_results', [])
        api_list_data = [header_cells("序号", "服务名称", "服务功能描述", "服务参数描述", "服务返回值描述")]
        for i, endpoint in enumerate(endpoint_results[:10], 1):  # only the first 10 APIs are listed
            endpoint_name = endpoint.get('endpoint_name', 'N/A')
            # Simplified function description derived from the endpoint name.
            if 'Create' in endpoint_name:
                func_desc = "提供数据创建服务"
            elif 'List' in endpoint_name or 'Query' in endpoint_name:
                func_desc = "提供数据查询和列表服务"
            elif 'Read' in endpoint_name:
                func_desc = "提供单条数据读取服务"
            elif 'Update' in endpoint_name:
                func_desc = "提供数据更新服务"
            elif 'Delete' in endpoint_name:
                func_desc = "提供数据删除服务"
            else:
                func_desc = "提供数据管理服务"
            api_list_data.append([
                to_para(str(i), small_style),
                to_para(endpoint_name, small_style),
                to_para(func_desc, small_style),
                to_para("标准DMS参数格式", small_style),
                to_para("标准DMS响应格式", small_style)
            ])
        api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
        api_list_table.setStyle(TableStyle([
            ('GRID', (0,0), (-1,-1), 1, colors.grey),
            ('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
            ('ALIGN', (0,0), (-1,-1), 'CENTER'),
            ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
            ('FONTSIZE', (0,0), (-1,-1), 8)
        ]))
        elements.append(api_list_table)
        elements.append(Spacer(1, 20))

        # --- Test case list, split into required / optional by strictness ---
        elements.append(to_para("测试用例列表", heading_style, escape=False))
        # Numeric ranking of the severity levels.
        severity_levels = {
            'CRITICAL': 5,
            'HIGH': 4,
            'MEDIUM': 3,
            'LOW': 2,
            'INFO': 1
        }
        strictness_value = severity_levels.get(strictness_level, 5)  # unknown level counts as CRITICAL
        failure_statuses = ('失败', 'FAILED', '错误', 'ERROR')
        all_test_cases = []
        failed_test_cases = []

        # 1. Endpoint test cases
        for endpoint_result in endpoint_results:
            for tc in endpoint_result.get('executed_test_cases', []):
                tc_severity = tc.get('test_case_severity', 'MEDIUM')
                tc_severity_value = severity_levels.get(tc_severity, 3)
                tc_status = tc.get('status', 'N/A')
                test_case_info = {
                    'type': 'Endpoint',
                    'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
                    'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'),
                    'case_name': tc.get('test_case_name', 'N/A'),
                    'case_id': tc.get('test_case_id', 'N/A'),
                    'status': tc_status,
                    'message': tc.get('message', ''),
                    'severity': tc_severity,
                    'severity_value': tc_severity_value,
                    'is_required': tc_severity_value >= strictness_value,
                    'duration': tc.get('duration_seconds', 0),
                    'timestamp': tc.get('timestamp', '')
                }
                all_test_cases.append(test_case_info)
                if tc_status in failure_statuses:
                    failed_test_cases.append(test_case_info)

        # 2. Stage results, each treated as one HIGH-severity test case
        stage_results = summary_data.get('stage_results', [])
        for stage_result in stage_results:
            stage_name = stage_result.get('stage_name', 'N/A')
            stage_status = stage_result.get('overall_status', 'N/A')
            stage_severity = 'HIGH'
            stage_severity_value = severity_levels.get(stage_severity, 4)
            stage_case_info = {
                'type': 'Stage',
                'endpoint': f"Stage: {stage_name}",
                'endpoint_id': f"STAGE_{stage_name}",
                'case_name': stage_result.get('description', stage_name),
                'case_id': f"STAGE_{stage_name}",
                'status': stage_status,
                'message': stage_result.get('message', stage_result.get('error_message', '')),
                'severity': stage_severity,
                'severity_value': stage_severity_value,
                'is_required': stage_severity_value >= strictness_value,
                'duration': stage_result.get('duration_seconds', 0),
                'timestamp': stage_result.get('start_time', '')
            }
            all_test_cases.append(stage_case_info)
            if stage_status in failure_statuses:
                failed_test_cases.append(stage_case_info)

        required_cases = [case for case in all_test_cases if case['is_required']]
        optional_cases = [case for case in all_test_cases if not case['is_required']]

        if all_test_cases:
            # Explain how the strictness level splits the cases.
            strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
            elements.append(to_para(strictness_text, small_style))
            elements.append(Spacer(1, 10))
            # 1. Required cases (light-blue header)
            if required_cases:
                elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
                elements.append(build_case_table(required_cases, colors.lightblue))
                elements.append(Spacer(1, 15))
            # 2. Optional cases (light-grey header)
            if optional_cases:
                elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
                elements.append(build_case_table(optional_cases, colors.lightgrey))
                elements.append(Spacer(1, 10))
            # Case statistics
            total_cases = len(all_test_cases)
            endpoint_cases = sum(1 for c in all_test_cases if c['type'] == 'Endpoint')
            stage_cases = sum(1 for c in all_test_cases if c['type'] == 'Stage')
            required_count = len(required_cases)
            optional_count = len(optional_cases)
            stats_text = f"""测试用例统计:
总计 {total_cases} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。
必须用例 {required_count} 个,非必须用例 {optional_count} 个。
严格等级:{strictness_level}({strictness_value}级及以上为必须)。"""
            elements.append(to_para(stats_text, small_style))
        else:
            elements.append(to_para("无测试用例执行记录。", normal_style))
        elements.append(Spacer(1, 20))

        # --- Failed case details ---
        if failed_test_cases:
            elements.append(to_para("失败用例详情分析", heading_style, escape=False))
            elements.append(Spacer(1, 10))
            # Failure counts per severity level
            critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL']
            high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH']
            medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM']
            low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW']
            failure_summary = f"""失败用例统计:
总计 {len(failed_test_cases)} 个失败用例,其中:
• 严重级别:{len(critical_failures)}
• 高级别:{len(high_failures)}
• 中级别:{len(medium_failures)}
• 低级别:{len(low_failures)}
以下是详细的失败原因分析:"""
            elements.append(to_para(failure_summary, normal_style))
            elements.append(Spacer(1, 15))

            # Styles are loop-invariant, so build them once.
            case_title_style = ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5)
            case_info_style = ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5)
            failure_label_style = ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15)
            failure_reason_style = ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red)

            for i, failed_case in enumerate(failed_test_cases, 1):
                elements.append(to_para(f"{i}. {failed_case['case_name']}", case_title_style))
                case_info = f"""• 用例ID:{failed_case['case_id']}
• 所属端点:{failed_case['endpoint']}
• 严重级别:{failed_case['severity']}
• 执行状态:{failed_case['status']}"""
                elements.append(to_para(case_info, case_info_style))
                # The 'message' key is always present, so apply the fallback
                # text when it is empty rather than only when it is absent.
                failure_reason = str(failed_case.get('message') or '无详细错误信息')
                elements.append(to_para("失败原因:", failure_label_style))
                if len(failure_reason) > 200:
                    # Truncate very long error messages.
                    failure_reason = failure_reason[:200] + "..."
                elements.append(to_para(failure_reason, failure_reason_style))
                # Separator between cases (not after the last one)
                if i < len(failed_test_cases):
                    elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey))
                    elements.append(Spacer(1, 10))
            elements.append(Spacer(1, 20))
        elements.append(Spacer(1, 20))

        # --- Test situation ---
        elements.append(to_para("测试情况说明", heading_style, escape=False))
        test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。
测试:累计发现缺陷{overall.get('test_cases_failed', 0)}个。
测试执行时间:{start_time_formatted} 至 {end_time_formatted}
测试环境:开发测试环境
测试方法:自动化API合规性测试"""
        elements.append(to_para(test_situation_text, normal_style))
        elements.append(Spacer(1, 20))

        # --- Conclusion, based on the test case success rate ---
        elements.append(to_para("测试结论", heading_style, escape=False))
        success_rate = overall.get('test_case_success_rate', '0%')
        try:
            # Accept "95.0%", "95" or a plain number; anything else counts as 0.
            success_rate_num = float(str(success_rate).replace('%', '').strip())
        except ValueError:
            success_rate_num = 0.0
        if success_rate_num >= 90:
            conclusion_status = "通过"
            conclusion_text = f"""本套领域数据服务已通过环境验证系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析该项目通过验收测试。
测试用例成功率达到{success_rate},符合验收标准。"""
        elif success_rate_num >= 70:
            conclusion_status = "基本通过"
            conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。"""
        else:
            conclusion_status = "不通过"
            conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。"""
        elements.append(to_para(conclusion_text, normal_style))
        elements.append(Spacer(1, 20))

        # --- Basis of the assessment ---
        elements.append(to_para("检测依据", heading_style, escape=False))
        detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分关于DMS领域数据服务的接口要求和测试细则。
参考标准:
1. DMS数据管理系统API规范V1.0
2. RESTful API设计规范
3. 数据安全和隐私保护要求
4. 系统集成测试标准"""
        elements.append(to_para(detection_basis_text, normal_style))
        elements.append(Spacer(1, 20))

        # --- Generation info ---
        elements.append(to_para("报告生成信息", heading_style, escape=False))
        generation_info_data = [
            [to_para("<b>生成时间</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M:%S'))],
            [to_para("<b>生成工具</b>", escape=False), to_para("DMS合规性测试工具")],
            [to_para("<b>工具版本</b>", escape=False), to_para("V1.0.0")],
            [to_para("<b>测试结论</b>", escape=False), to_para(f"<b>{conclusion_status}</b>", escape=False)],
        ]
        generation_info_table = Table(generation_info_data, colWidths=[120, '*'])
        generation_info_table.setStyle(TableStyle([
            ('GRID', (0,0), (-1,-1), 1, colors.grey),
            ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
            ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
        ]))
        elements.append(generation_info_table)

        # Build the PDF
        doc.build(elements)
        logger.info(f"PDF报告已成功生成: {output_path}")
    except Exception as e:
        logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
import uuid
# Module-wide thread pool used to run submitted test jobs outside the request
# handler; at most 4 jobs execute at the same time.
executor = ThreadPoolExecutor(max_workers=4)
# In-memory registry of test runs keyed by report ID. It is written by the
# /run handler and run_tests_background, and read by the /status endpoint.
running_tasks = {}
def run_tests_background(config_dict: dict, report_id: str):
    """Run one test job and record its outcome in ``running_tasks``.

    The existing registry entry for ``report_id`` (if any) is merged with the
    final status, the raw result and a completion timestamp. Any exception
    raised while running or recording is stored as an ``error`` state instead
    of propagating.
    """
    try:
        outcome = run_tests_logic(config_dict)
        outcome_status = outcome.get("status", "completed")
        previous = running_tasks.get(report_id, {})
        updated = {
            **previous,
            "status": outcome_status,
            "result": outcome,
            # Prefer the directory reported by the run; fall back to the one
            # recorded when the task was registered.
            "report_directory": outcome.get("report_directory", previous.get("report_directory")),
            "completed_at": datetime.datetime.now().isoformat(),
        }
        if outcome_status == "error":
            updated["error"] = outcome.get("message")
            updated["traceback"] = outcome.get("traceback")
        running_tasks[report_id] = updated
    except Exception as exc:
        running_tasks[report_id] = {
            **running_tasks.get(report_id, {}),
            "status": "error",
            "error": str(exc),
            "traceback": traceback.format_exc(),
            "completed_at": datetime.datetime.now().isoformat(),
        }
@app.post("/run",
          summary="执行API合规性测试",
          description="""
执行API合规性测试的主要端点异步执行
立即返回报告ID测试在后台运行。
""",
          response_model=TestStartResponse,
          responses={
              200: {"description": "测试执行成功"},
              400: {"description": "请求参数错误", "model": ErrorResponse},
              500: {"description": "服务器内部错误", "model": ErrorResponse}
          })
async def run_api_tests(config: TestConfig):
    """
    Start an API compliance test run in the background.

    Builds the effective configuration (hidden defaults overridden by the
    non-empty request fields), registers the run in ``running_tasks`` and
    submits it to the shared executor.

    Returns:
        A dict with the report ID, the status/report URLs and the resolved
        report directory.

    Raises:
        HTTPException: 500 with ``{"status": "error", "message": ...}`` when
            the run cannot be prepared or submitted.
    """
    try:
        # Report ID = second-resolution timestamp + first 8 chars of a UUID4.
        report_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + str(uuid.uuid4())[:8]
        report_directory_path = (Path(REPORTS_DIR) / report_id).resolve()
        status_url = f"/status/{report_id}"
        report_url = f"/reports/{report_id}"

        # Support both pydantic v2 (model_dump) and v1 (dict).
        if hasattr(config, "model_dump"):
            raw_config = config.model_dump(exclude_none=True)
        else:
            raw_config = config.dict(exclude_none=True)
        mode = raw_config.pop('mode')

        hidden_defaults = {
            "categories": [],
            "tags": [],
            "ignore_ssl": True,
            "output": "./test_reports",
            "generate_pdf": True,
            "custom_test_cases_dir": "./custom_testcases",
            "stages_dir": "./custom_stages",
            # NOTE(security): this credential is hard-coded in source control;
            # it should be supplied via configuration/environment and rotated.
            "llm_api_key": "sk-lbGrsUPL1iby86h554FaE536C343435dAa9bA65967A840B2",
            "llm_base_url": "https://aiproxy.petrotech.cnpc/v1",
            "llm_model_name": "deepseek-v3",
            "use_llm_for_request_body": False,
            "use_llm_for_path_params": False,
            "use_llm_for_query_params": False,
            "use_llm_for_headers": False,
            "verbose": False
        }
        config_dict = hidden_defaults.copy()
        for key, value in raw_config.items():
            # Blank or missing request values must not override the defaults.
            if value is None:
                continue
            if isinstance(value, str) and not value.strip():
                continue
            config_dict[key] = value

        for default_path_key in ("custom_test_cases_dir", "stages_dir"):
            default_path = config_dict.get(default_path_key)
            if default_path:
                try:
                    Path(default_path).mkdir(parents=True, exist_ok=True)
                except Exception as path_error:
                    # Best effort: a missing optional directory is not fatal.
                    logger.warning(f"无法创建默认目录 {default_path}: {path_error}")

        # The selected mode decides which bundled spec file is tested.
        spec_files = {
            'yapi': "./assets/doc/yapi/yapi.json",
            'swagger': "./assets/doc/swagger/swagger.json",
            'dms': "./assets/doc/dms/domain.json",
        }
        if mode in spec_files:
            config_dict[mode] = spec_files[mode]

        config_dict.update({
            "report_id": report_id,
            "output": REPORTS_DIR
        })

        # Register the task before submitting so the worker always finds an
        # entry to merge its result into.
        running_tasks[report_id] = {
            "status": "running",
            "report_id": report_id,
            "report_directory": str(report_directory_path),
            "created_at": datetime.datetime.now().isoformat(),
            "status_url": status_url,
            "report_url": report_url
        }
        try:
            executor.submit(run_tests_background, config_dict, report_id)
        except Exception:
            # Nothing will ever complete this task; do not leave it "running".
            running_tasks.pop(report_id, None)
            raise

        return {
            "status": "started",
            "report_id": report_id,
            "message": "测试已开始请使用report_id查询结果",
            "status_url": status_url,
            "report_url": report_url,
            "report_directory": str(report_directory_path)
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"status": "error", "message": str(e)}
        )
@app.get("/status/{report_id}",
         summary="查询测试状态",
         description="根据报告ID查询测试执行状态")
async def get_test_status(report_id: str):
    """Return the registry entry for ``report_id``; respond 404 if it is unknown."""
    try:
        return running_tasks[report_id]
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report ID not found"
        ) from None
@app.get("/reports/{report_id}",
         summary="下载测试报告",
         description="根据报告ID下载对应的测试报告文件")
async def download_report(report_id: str, file_type: str = "summary.json"):
    """
    Download one file from a test report directory.

    - report_id: report ID, i.e. the run's directory name under REPORTS_DIR.
    - file_type: file name inside that directory, e.g. summary.json or
      api_call_details.md.

    Raises:
        HTTPException: 404 when the file does not exist or resolves outside
            the reports directory; 500 on unexpected errors.
    """
    try:
        # Use the same root the /run endpoint writes reports to.
        reports_root = Path(REPORTS_DIR).resolve()
        file_path = (reports_root / report_id / file_type).resolve()
        # Both values are caller-controlled: refuse anything that resolves
        # outside the reports root, and anything that is not a regular file.
        if reports_root not in file_path.parents or not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Report file not found: {file_type}"
            )
        return FileResponse(
            path=str(file_path),
            filename=file_path.name,
            media_type='application/octet-stream'
        )
    except HTTPException:
        # Keep the intended status code (e.g. 404) instead of masking it as 500.
        raise
    except Exception as e:
        logger.error(f"Error downloading report: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error downloading report: {str(e)}"
        )
@app.get("/reports",
         summary="列出所有测试报告",
         description="获取所有可用的测试报告列表")
async def list_reports():
    """
    List the available test reports, newest first.

    A report is listed when its directory under REPORTS_DIR contains a
    ``summary.json`` that parses as JSON; unreadable summaries are logged
    and skipped.

    Raises:
        HTTPException: 500 when the reports directory cannot be scanned.
    """
    try:
        # Use the same root the /run endpoint writes reports to.
        reports_root = Path(REPORTS_DIR)
        if not reports_root.is_dir():
            return {"reports": []}

        reports = []
        for report_dir in reports_root.iterdir():
            summary_file = report_dir / "summary.json"
            if not report_dir.is_dir() or not summary_file.exists():
                continue
            try:
                # Parsed only to confirm the summary is readable JSON.
                with open(summary_file, 'r', encoding='utf-8') as f:
                    json.load(f)
            except Exception as e:
                logger.warning(f"Error reading summary for {report_dir.name}: {e}")
                continue
            reports.append({
                "id": report_dir.name,
                "timestamp": report_dir.name,
                "path": str(report_dir),
            })

        # Directory names start with a timestamp, so sorting by name puts the
        # newest run first.
        reports.sort(key=lambda x: x["timestamp"], reverse=True)
        return {"reports": reports}
    except Exception as e:
        logger.error(f"Error listing reports: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error listing reports: {str(e)}"
        )
# History viewer routes with /history prefix

# Static login page, split at the point (just before </form>) where the
# optional error banner is inserted, so both login routes share one copy.
_LOGIN_PAGE_HEAD = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>登录</title>
<link rel="stylesheet" href="/static/style.css">
<style>
.login-container { max-width: 400px; margin: 100px auto; padding: 20px; border: 1px solid #ddd; border-radius: 5px; }
.form-group { margin-bottom: 15px; }
.form-group label { display: block; margin-bottom: 5px; }
.form-group input { width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 3px; }
.btn { background: #007bff; color: white; padding: 10px 20px; border: none; border-radius: 3px; cursor: pointer; }
.error { color: red; margin-top: 10px; }
</style>
</head>
<body>
<div class="login-container">
<h2>登录</h2>
<form method="post" action="/history/login">
<div class="form-group">
<label for="username">用户名:</label>
<input type="text" id="username" name="username" required>
</div>
<div class="form-group">
<label for="password">密码:</label>
<input type="password" id="password" name="password" required>
</div>
<button type="submit" class="btn">登录</button>
"""
_LOGIN_PAGE_TAIL = """</form>
</div>
</body>
</html>
"""


def _render_login_page(error_message: Optional[str] = None) -> str:
    """Return the login page HTML, with an escaped error banner when given."""
    if not error_message:
        return _LOGIN_PAGE_HEAD + _LOGIN_PAGE_TAIL
    error_banner = f'<div class="error">{html.escape(error_message)}</div>\n'
    return _LOGIN_PAGE_HEAD + error_banner + _LOGIN_PAGE_TAIL


@app.get("/history/login", response_class=HTMLResponse)
async def login_page(request: Request):
    # Simple login form without template dependency
    return HTMLResponse(content=_render_login_page())


@app.post("/history/login")
async def login(request: Request, username: str = Form(...), password: str = Form(...)):
    # Look the user up by name; close the connection even if the query fails.
    conn = get_db()
    try:
        user = conn.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
    finally:
        conn.close()

    if user and check_password_hash(user['password_hash'], password):
        response = RedirectResponse(url="/history", status_code=302)
        # HttpOnly keeps the session token out of reach of page scripts.
        response.set_cookie("session_token", create_session_token(username), httponly=True)
        return response

    # Return login form with error
    return HTMLResponse(content=_render_login_page("用户名或密码错误"))
@app.get("/history/logout")
async def logout():
    # Send the browser back to the login form and drop the session cookie.
    redirect = RedirectResponse(url="/history/login", status_code=302)
    redirect.delete_cookie("session_token")
    return redirect
def _history_count(value) -> int:
    """Coerce a summary counter to int; missing or invalid values count as 0."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


def _history_time(value) -> str:
    """Format a summary timestamp for display: drop fractional seconds, escape HTML."""
    if value is None:
        return 'N/A'
    return html.escape(str(value).split('.')[0])


def _collect_run_history(reports_path: Path) -> list:
    """Return one info dict per run directory under ``reports_path``, newest first."""
    history = []
    if not reports_path.is_dir():
        return history
    run_dirs = sorted(
        (d for d in reports_path.iterdir() if d.is_dir()),
        key=lambda d: d.name,
        reverse=True,
    )
    for run_dir in run_dirs:
        summary_path = run_dir / 'summary.json'
        details_path = run_dir / 'api_call_details.md'
        run_info = {'id': run_dir.name, 'summary': None, 'has_details': details_path.exists()}
        if summary_path.exists():
            try:
                with open(summary_path, 'r', encoding='utf-8') as f:
                    run_info['summary'] = json.load(f)
            except Exception as e:
                logger.error(f"Error reading summary {summary_path}: {e}")
                run_info['summary'] = {'error': '无法加载摘要'}
        history.append(run_info)
    return history


def _render_history_row(run: dict) -> str:
    """Render the <tr> for one run; runs without a usable summary get a failure row."""
    run_id = html.escape(str(run['id']))
    summary = run.get('summary')
    if not isinstance(summary, dict) or not summary or 'error' in summary:
        return f"""
<tr class="status-danger">
<td><strong>{run_id}</strong></td>
<td colspan="5">加载失败</td>
<td>
<a href="/history/details/{run_id}" class="button">查看详情</a>
</td>
</tr>
"""

    overall = summary.get('overall_summary')
    if not isinstance(overall, dict):
        overall = {}
    tested = _history_count(overall.get('endpoints_tested', 0))
    passed = _history_count(overall.get('endpoints_passed', 0))
    failed = _history_count(overall.get('endpoints_failed', 0))

    # Skipped endpoints count as successful: only failures lower the rate.
    if tested > 0:
        success_rate = ((tested - failed) / tested) * 100
        if success_rate >= 90:
            row_class = "status-success"
        elif success_rate >= 70:
            row_class = "status-warning"
        elif failed > 0:
            row_class = "status-danger"
        else:
            row_class = "status-info"
        success_rate_str = f"{success_rate:.1f}%"
    else:
        row_class = "status-info"
        success_rate_str = "N/A"

    duration = summary.get('duration_seconds', 0)
    try:
        duration_str = f"{(float(duration) if duration else 0.0):.2f}"
    except (ValueError, TypeError):
        duration_str = "N/A"

    skipped = tested - passed - failed
    badges = [
        f'<span class="stats-badge badge-info">{tested} 总计</span>',
        f'<span class="stats-badge badge-success">{passed} 通过</span>',
    ]
    if skipped > 0:
        badges.append(f'<span class="stats-badge badge-warning">{skipped} 跳过</span>')
    if failed > 0:
        badges.append(f'<span class="stats-badge badge-danger">{failed} 失败</span>')
    stats_html = "\n".join(badges)

    return f"""
<tr class="{row_class}">
<td><strong>{run_id}</strong></td>
<td>{_history_time(summary.get('start_time', 'N/A'))}</td>
<td>{_history_time(summary.get('end_time', 'N/A'))}</td>
<td>{stats_html}</td>
<td><strong>{success_rate_str}</strong></td>
<td>{duration_str}</td>
<td>
<a href="/history/details/{run_id}" class="button">查看详情</a>
</td>
</tr>
"""


@app.get("/history", response_class=HTMLResponse)
async def list_history(request: Request):
    # Renders the history table: one colour-coded row per run directory under
    # REPORTS_DIR, newest first. All values taken from disk are HTML-escaped.
    # Temporarily remove login requirement
    user = {"username": "admin"}  # Mock user
    history = _collect_run_history(Path(REPORTS_DIR))

    # Enhanced HTML response for history list with color differentiation
    page_head = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>测试历史记录</title>
<link rel="stylesheet" href="/static/style.css">
<link rel="stylesheet" href="/static/history_style.css">
<style>
.history-table {{
width: 100%;
border-collapse: collapse;
margin-top: 20px;
background: white;
border-radius: 8px;
overflow: hidden;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}}
.history-table th {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 15px 10px;
text-align: center;
font-weight: 600;
border: none;
}}
.history-table td {{
padding: 12px 10px;
text-align: center;
border-bottom: 1px solid #f0f0f0;
vertical-align: middle;
}}
.history-table tr:hover {{
background-color: #f8f9ff;
transform: translateY(-1px);
transition: all 0.2s ease;
}}
.status-success {{
background-color: #d4edda !important;
color: #155724;
}}
.status-warning {{
background-color: #fff3cd !important;
color: #856404;
}}
.status-danger {{
background-color: #f8d7da !important;
color: #721c24;
}}
.status-info {{
background-color: #d1ecf1 !important;
color: #0c5460;
}}
.button {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 8px 16px;
border: none;
border-radius: 20px;
text-decoration: none;
font-size: 12px;
font-weight: 500;
transition: all 0.3s ease;
display: inline-block;
}}
.button:hover {{
transform: translateY(-2px);
box-shadow: 0 4px 15px rgba(102, 126, 234, 0.4);
color: white;
text-decoration: none;
}}
.stats-badge {{
display: inline-block;
padding: 4px 8px;
border-radius: 12px;
font-size: 11px;
font-weight: 600;
margin: 0 2px;
}}
.badge-success {{ background: #28a745; color: white; }}
.badge-danger {{ background: #dc3545; color: white; }}
.badge-warning {{ background: #ffc107; color: #212529; }}
.badge-info {{ background: #17a2b8; color: white; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>测试历史记录</h1>
<div class="user-info">
<a href="/history/llm-config">LLM 标准配置</a> |
<span>欢迎, {html.escape(user['username'])}</span> |
<a href="/history/logout">登出</a>
</div>
</div>
<div class="content">
<table class="history-table">
<thead>
<tr>
<th>运行ID</th>
<th>开始时间</th>
<th>结束时间</th>
<th>端点统计</th>
<th>成功率</th>
<th>耗时(秒)</th>
<th>操作</th>
</tr>
</thead>
<tbody>
"""

    if history:
        table_rows = "".join(_render_history_row(run) for run in history)
    else:
        table_rows = """
<tr>
<td colspan="7" style="text-align: center; padding: 40px; color: #6c757d;">
<div style="font-size: 16px; margin-bottom: 10px;">📊</div>
<div>暂无测试记录</div>
<div style="font-size: 12px; margin-top: 5px;">运行测试后记录将显示在这里</div>
</td>
</tr>
"""

    page_tail = """
</tbody>
</table>
</div>
</div>
</body>
</html>
"""
    return HTMLResponse(content=page_head + table_rows + page_tail)
@app.get("/history/details/{run_id}", response_class=HTMLResponse)
async def show_details(request: Request, run_id: str):
    # Renders the detail page for one run: embedded PDF (if present), the JSON
    # summary and the rendered Markdown call details, plus download links.
    # Responds 404 when run_id is not a directory directly under REPORTS_DIR.
    # Temporarily remove login requirement
    user = {"username": "admin"}  # Mock user

    run_id = Path(run_id).name  # Sanitize
    # Path(...).name still lets ".." through, so apply the same containment
    # check the download endpoints use.
    reports_dir = Path(REPORTS_DIR).resolve()
    run_dir = (reports_dir / run_id).resolve()
    if not run_dir.is_dir() or run_dir.parent != reports_dir:
        raise HTTPException(status_code=404, detail="找不到指定的测试记录")

    summary_path = run_dir / 'summary.json'
    details_path = run_dir / 'api_call_details.md'
    pdf_path = run_dir / 'report_cn.pdf'
    has_pdf_report = pdf_path.exists()
    has_md_report = details_path.exists()

    summary_content = "{}"
    if summary_path.exists():
        try:
            with open(summary_path, 'r', encoding='utf-8') as f:
                summary_data = json.load(f)
            summary_content = json.dumps(summary_data, indent=2, ensure_ascii=False)
        except Exception as e:
            summary_content = f"加载摘要文件出错: {e}"

    details_content = "### 未找到API调用详情报告"
    if has_md_report:
        try:
            with open(details_path, 'r', encoding='utf-8') as f:
                # NOTE(review): the rendered Markdown is inserted unescaped;
                # this assumes the report file is trusted - confirm.
                details_content = markdown.markdown(f.read(), extensions=['fenced_code', 'tables'])
        except Exception as e:
            details_content = html.escape(f"加载详情文件出错: {e}")

    # Escaped once, used in both text and attribute positions.
    run_id_html = html.escape(run_id)

    # Download links shown in the floating navigation.
    link_style = 'style="margin-bottom: 5px; font-size: 11px;"'
    download_links = ""
    if has_pdf_report:
        download_links += f'<a href="/history/download/{run_id_html}/report_cn.pdf" {link_style}>下载PDF报告</a> '
    if has_md_report:
        download_links += f'<a href="/history/download/{run_id_html}/api_call_details.md" {link_style}>下载MD报告</a> '
    download_links += f'<a href="/history/download/{run_id_html}/summary.json" {link_style}>下载JSON报告</a>'

    pdf_nav_link = '<a href="#pdf-section">PDF报告</a>' if has_pdf_report else ''
    pdf_section = ''
    if has_pdf_report:
        pdf_section = f"""
<div id="pdf-section" class="content-section">
<h2 class="section-title">PDF 报告</h2>
<iframe src="/history/view_pdf/{run_id_html}" width="100%" height="800px" style="border:1px solid #ccc; border-radius: 4px;"></iframe>
</div>
"""

    html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>测试详情 - {run_id_html}</title>
<link rel="stylesheet" href="/static/style.css">
<link rel="stylesheet" href="/static/history_style.css">
<style>
.floating-nav {{
position: fixed;
right: 20px;
top: 50%;
transform: translateY(-50%);
background: white;
border: 1px solid #ddd;
border-radius: 8px;
padding: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
z-index: 1000;
}}
.floating-nav a {{
display: block;
padding: 8px 12px;
text-decoration: none;
color: #007bff;
border-radius: 4px;
margin-bottom: 5px;
font-size: 12px;
text-align: center;
background: #f8f9fa;
border: 1px solid #dee2e6;
}}
.floating-nav a:hover {{
background: #e9ecef;
}}
.content-section {{
margin-bottom: 30px;
padding: 20px;
background: white;
border-radius: 8px;
box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}}
.section-title {{
color: #333;
border-bottom: 2px solid #007bff;
padding-bottom: 10px;
margin-bottom: 20px;
}}
</style>
</head>
<body>
<div class="floating-nav">
{pdf_nav_link}
<a href="#summary-section">JSON摘要</a>
<a href="#details-section">调用详情</a>
<div style="border-top: 1px solid #ddd; margin: 10px 0; padding-top: 10px;">
{download_links}
</div>
</div>
<div class="container">
<div class="header">
<h1>测试详情: {run_id_html}</h1>
<div class="header-controls">
<div class="user-info">
<a href="/history">返回列表</a> |
<a href="/history/llm-config">LLM 标准配置</a> |
<span>欢迎, {html.escape(user['username'])}</span> |
<a href="/history/logout">登出</a>
</div>
</div>
</div>
<div class="content">
{pdf_section}
<div id="summary-section" class="content-section">
<h2 class="section-title">测试摘要 (JSON)</h2>
<pre style="background: #f8f9fa; padding: 15px; border-radius: 5px; overflow-x: auto; border: 1px solid #dee2e6; font-size: 12px; line-height: 1.4;">{html.escape(summary_content)}</pre>
</div>
<div id="details-section" class="content-section">
<h2 class="section-title">API调用详情</h2>
<div style="border: 1px solid #dee2e6; padding: 15px; border-radius: 5px; background: #fefefe;">
{details_content}
</div>
</div>
</div>
</div>
</body>
</html>
"""
    return HTMLResponse(content=html_content)
@app.get("/history/download/{run_id}/{filename}")
async def download_history_report(run_id: str, filename: str):
    # Serves one file from a run directory directly under REPORTS_DIR.
    # Both path parameters are reduced to their final component, and the run
    # directory must resolve to a direct child of the reports root.
    # Temporarily remove login requirement
    run_id_safe = Path(run_id).name
    filename_safe = Path(filename).name

    reports_dir = Path(REPORTS_DIR).resolve()
    run_dir = (reports_dir / run_id_safe).resolve()
    if not run_dir.is_dir() or run_dir.parent != reports_dir:
        raise HTTPException(status_code=404, detail="找不到指定的测试记录")

    file_path = run_dir / filename_safe
    # is_file() rather than exists(): a filename of "." or ".." survives
    # Path(...).name and would otherwise point FileResponse at a directory.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="文件不存在")

    return FileResponse(path=str(file_path), filename=filename_safe)
@app.get("/history/view_pdf/{run_id}")
async def view_pdf_report(run_id: str):
    # Serves a run's report_cn.pdf inline (application/pdf) for the detail
    # page's iframe; 404 if the run directory or the PDF is missing.
    # Temporarily remove login requirement
    reports_root = Path(REPORTS_DIR).resolve()
    target_dir = (reports_root / Path(run_id).name).resolve()

    # The run directory must exist and sit directly under the reports root.
    if not (target_dir.is_dir() and target_dir.parent == reports_root):
        raise HTTPException(status_code=404, detail="找不到指定的测试记录")

    pdf_file = target_dir / "report_cn.pdf"
    if pdf_file.exists():
        return FileResponse(path=str(pdf_file), media_type="application/pdf")
    raise HTTPException(status_code=404, detail="未找到PDF报告文件")
@app.get("/history/llm-config", response_class=HTMLResponse)
async def llm_config_page(request: Request):
    # Renders the editor for the LLM compliance criteria stored as a JSON list
    # in ./custom_testcases/llm/compliance_criteria.json. A missing, unreadable
    # or non-list file yields a single empty input.
    # Temporarily remove login requirement
    user = {"username": "admin"}  # Mock user

    criteria_file_path = Path('./custom_testcases/llm/compliance_criteria.json')
    criteria = []
    if criteria_file_path.exists():
        try:
            with open(criteria_file_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if isinstance(loaded, list):
                criteria = loaded
        except Exception as e:
            logger.error(f"Error reading criteria file: {e}")

    # One text input per criterion; str() keeps non-string JSON items (numbers,
    # null, objects) from crashing html.escape.
    criteria_html = "".join(
        f'<div class="criteria-item"><input type="text" name="criteria" value="{html.escape(str(criterion))}" placeholder="输入合规性标准..."></div>'
        for criterion in criteria
    )
    if not criteria_html:
        criteria_html = '<div class="criteria-item"><input type="text" name="criteria" value="" placeholder="输入合规性标准..."></div>'

    html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>LLM 合规性标准配置</title>
<link rel="stylesheet" href="/static/style.css">
<link rel="stylesheet" href="/static/history_style.css">
<style>
.editor-section {{ margin-bottom: 20px; }}
.info-box {{ background-color: #f8f9fa; border: 1px solid #dee2e6; padding: 15px; border-radius: 5px; margin-top: 10px; }}
.criteria-item {{ margin-bottom: 10px; }}
.criteria-item input {{ width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 3px; }}
.btn {{ background: #007bff; color: white; padding: 10px 20px; border: none; border-radius: 3px; cursor: pointer; margin-right: 10px; }}
.btn-secondary {{ background: #6c757d; }}
</style>
<script>
function addCriteria() {{
const container = document.getElementById('criteria-container');
const div = document.createElement('div');
div.className = 'criteria-item';
div.innerHTML = '<input type="text" name="criteria" value="" placeholder="输入合规性标准...">';
container.appendChild(div);
}}
</script>
</head>
<body>
<div class="container">
<div class="header">
<h1>LLM 合规性标准配置</h1>
<div class="user-info">
<a href="/history">返回列表</a> |
<span>欢迎, {html.escape(user['username'])}</span> |
<a href="/history/logout">登出</a>
</div>
</div>
<div class="content">
<form method="post" action="/history/llm-config">
<div class="editor-section">
<h3>合规性标准列表</h3>
<div id="criteria-container">
{criteria_html}
</div>
<button type="button" onclick="addCriteria()" class="btn btn-secondary">添加标准</button>
</div>
<div class="editor-section">
<button type="submit" class="btn">保存配置</button>
</div>
</form>
<div class="info-box">
<h4>使用说明</h4>
<p>这些标准将用于LLM评估API的合规性。每个标准应该是一个明确的要求或规则。</p>
</div>
</div>
</div>
</body>
</html>
"""
    return HTMLResponse(content=html_content)
@app.post("/history/llm-config")
async def llm_config_save(request: Request):
    # Saves the submitted "criteria" form fields (blank entries dropped) as a
    # JSON list, then re-renders the config template with a status message.
    # Temporarily remove login requirement
    form = await request.form()
    # Keep only non-blank text fields; anything that is not a string (e.g. an
    # uploaded file under the same field name) is ignored.
    criteria_list = [
        item.strip()
        for item in form.getlist('criteria')
        if isinstance(item, str) and item.strip()
    ]

    criteria_file_path = Path('./custom_testcases/llm/compliance_criteria.json')
    try:
        # Directory creation is inside the try so a filesystem error is
        # reported to the user instead of surfacing as an unhandled 500.
        criteria_file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(criteria_file_path, 'w', encoding='utf-8') as f:
            json.dump(criteria_list, f, indent=2, ensure_ascii=False)
        message = "LLM合规性标准已成功保存"
    except Exception as e:
        message = f"保存文件时发生错误: {e}"

    return templates.TemplateResponse("llm_config.html", {
        "request": request,
        "criteria": criteria_list,
        "file_exists": True,
        "message": message,
        "example_api_info": json.dumps({}, indent=2, ensure_ascii=False)
    })
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse the server options and start uvicorn.
    cli = argparse.ArgumentParser(description="DMS合规性测试工具 FastAPI服务器")
    for flag, options in (
        ("--host", {"default": "0.0.0.0", "help": "服务器主机地址"}),
        ("--port", {"type": int, "default": 5050, "help": "服务器端口"}),
        ("--reload", {"action": "store_true", "help": "启用自动重载(开发模式)"}),
        ("--workers", {"type": int, "default": 1, "help": "工作进程数"}),
    ):
        cli.add_argument(flag, **options)
    args = cli.parse_args()

    base_url = f"http://{args.host}:{args.port}"
    logger.info(f"Starting FastAPI server on {args.host}:{args.port}")
    logger.info(f"API文档地址: {base_url}/docs")
    logger.info(f"ReDoc文档地址: {base_url}/redoc")

    # With auto-reload enabled the server always runs a single worker.
    worker_count = 1 if args.reload else args.workers
    uvicorn.run(
        "fastapi_server:app",
        host=args.host,
        port=args.port,
        reload=args.reload,
        workers=worker_count,
        log_level="info"
    )