This commit is contained in:
ruoyunbai 2025-09-29 09:42:45 +08:00
parent 93043a0366
commit cd9f7847b2
4 changed files with 2000 additions and 96 deletions

View File

@ -11,7 +11,7 @@ import logging
import datetime
import traceback
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
from typing import List, Optional, Dict, Any, Union, Literal
import unicodedata
import html
@ -21,6 +21,11 @@ from fastapi.responses import JSONResponse, FileResponse, HTMLResponse, Redirect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.openapi.docs import (
get_redoc_html,
get_swagger_ui_html,
get_swagger_ui_oauth2_redirect_html,
)
from pydantic import BaseModel, Field, field_validator, model_validator
import uvicorn
@ -65,16 +70,16 @@ app = FastAPI(
DMS合规性测试工具 FastAPI版本
这是一个用于API合规性测试的工具支持
YAPI规范测试 - 基于YAPI定义文件的测试
Swagger/OpenAPI测试 - 基于OpenAPI规范的测试
Swagger/OpenAPI测试 - 基于OpenAPI规范的测试
DMS服务发现测试 - 动态发现DMS服务的API进行测试
分页支持 - 支持大量API的分页获取避免内存溢出
PDF报告生成 - 生成详细的测试报告
LLM集成 - 支持大语言模型辅助生成测试数据
主要特性
🚀 高性能: 基于FastAPI支持异步处理
📊 分页支持: 解决大量API节点的内存问题
📝 自动文档: 自动生成交互式API文档
@ -82,8 +87,8 @@ app = FastAPI(
📈 详细报告: 生成PDF和JSON格式的测试报告
""",
version="1.0.0",
docs_url="/docs", # Swagger UI
redoc_url="/redoc", # ReDoc
docs_url=None, # 禁用默认的Swagger UI
redoc_url=None, # 禁用默认的ReDoc
)
# Add CORS middleware
@ -166,6 +171,15 @@ class ErrorResponse(BaseModel):
message: str = Field(description="错误消息")
traceback: Optional[str] = Field(None, description="错误堆栈跟踪")
class TestStartResponse(BaseModel):
"""测试异步启动响应模型"""
status: Literal["started"] = Field(description="任务当前状态")
message: str = Field(description="状态消息")
report_id: str = Field(description="报告ID")
status_url: str = Field(description="查询任务状态的URL")
report_url: str = Field(description="获取测试报告的URL")
report_directory: Optional[str] = Field(None, description="报告在服务器上的存储路径")
# Global variable to store running tasks
running_tasks: Dict[str, Dict[str, Any]] = {}
@ -225,8 +239,34 @@ def create_session_token(username: str):
# Initialize database on startup
init_db()
@app.get("/",
summary="健康检查",
# Custom documentation routes using local static files
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
"""自定义Swagger UI使用本地静态文件"""
return get_swagger_ui_html(
openapi_url=app.openapi_url,
title=app.title + " - Swagger UI",
oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
swagger_js_url="/static/swagger-ui-bundle.js",
swagger_css_url="/static/swagger-ui.css",
)
@app.get("/redoc", include_in_schema=False)
async def redoc_html():
"""自定义ReDoc使用本地静态文件"""
return get_redoc_html(
openapi_url=app.openapi_url,
title=app.title + " - ReDoc",
redoc_js_url="/static/redoc.standalone.js",
)
@app.get(app.swagger_ui_oauth2_redirect_url, include_in_schema=False)
async def swagger_ui_redirect():
"""OAuth2重定向助手"""
return get_swagger_ui_oauth2_redirect_html()
@app.get("/",
summary="健康检查",
description="检查API服务器是否正常运行",
response_model=Dict[str, str])
async def health_check():
@ -285,10 +325,14 @@ def run_tests_logic(config: dict):
if sum(k in config for k in ['yapi', 'swagger', 'dms']) > 1:
raise ValueError("API definition sources are mutually exclusive.")
# Setup output directory with timestamp
# Setup output directory, reuse provided report_id when available
base_output_dir = Path(config.get('output', './test_reports'))
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_directory = base_output_dir / timestamp
report_id = config.get('report_id')
if report_id:
output_directory = base_output_dir / report_id
else:
report_id = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_directory = base_output_dir / report_id
output_directory.mkdir(parents=True, exist_ok=True)
logger.info(f"Test reports will be saved to: {output_directory.resolve()}")
print(f"config{config}")
@ -367,10 +411,11 @@ def run_tests_logic(config: dict):
failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
result_status = "completed" if failed_count == 0 and error_count == 0 else "completed_with_issues"
result = {
"status": "completed",
"report_id":str(output_directory.resolve()).split('/')[-1],
"message": "Tests finished." if failed_count == 0 and error_count == 0 else "Tests finished with failures or errors.",
"status": result_status,
"report_id": report_id,
"message": "Tests finished." if result_status == "completed" else "Tests finished with failures or errors.",
"report_directory": str(output_directory.resolve()),
"summary": test_summary.to_dict()
}
@ -385,10 +430,16 @@ def run_tests_logic(config: dict):
except Exception as e:
logger.error(f"An unexpected error occurred during test execution: {e}", exc_info=True)
error_report_id = config.get('report_id')
error_output_dir = None
if error_report_id:
error_output_dir = str((Path(config.get('output', './test_reports')) / error_report_id).resolve())
return {
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
"traceback": traceback.format_exc(),
"report_id": error_report_id,
"report_directory": error_output_dir
}
def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
@ -994,27 +1045,51 @@ def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CR
except Exception as e:
logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
import uuid
from datetime import datetime
# Add a global executor for background tasks
executor = ThreadPoolExecutor(max_workers=4)
# Store running tasks
running_tasks = {}
def run_tests_background(config_dict: dict, report_id: str):
"""Background function to run tests"""
try:
result = run_tests_logic(config_dict)
result_status = result.get("status", "completed")
task_state = running_tasks.get(report_id, {}).copy()
task_state.update({
"status": result_status,
"result": result,
"report_directory": result.get("report_directory", task_state.get("report_directory")),
"completed_at": datetime.now().isoformat()
})
if result_status == "error":
task_state["error"] = result.get("message")
task_state["traceback"] = result.get("traceback")
running_tasks[report_id] = task_state
except Exception as e:
task_state = running_tasks.get(report_id, {}).copy()
task_state.update({
"status": "error",
"error": str(e),
"traceback": traceback.format_exc(),
"completed_at": datetime.now().isoformat()
})
running_tasks[report_id] = task_state
@app.post("/run",
summary="执行API合规性测试",
description="""
执行API合规性测试的主要端点
支持三种API定义源
- YAPI: 基于YAPI定义文件
- Swagger/OpenAPI: 基于OpenAPI规范文件
- DMS: 动态发现DMS服务的API
分页支持
对于DMS测试支持分页获取API列表避免内存溢出
- `page_size`: 每页获取的API数量默认1000
- 返回详细的分页统计信息
LLM集成
可选择使用大语言模型生成测试数据
- 智能生成请求体路径参数查询参数等
- 提高测试覆盖率和数据多样性
执行API合规性测试的主要端点异步执行
立即返回报告ID测试在后台运行
""",
response_model=TestResponse,
response_model=TestStartResponse,
responses={
200: {"description": "测试执行成功"},
400: {"description": "请求参数错误", "model": ErrorResponse},
@ -1022,80 +1097,71 @@ def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CR
})
async def run_api_tests(config: TestConfig):
"""
执行API合规性测试
- config: 测试配置包含API定义源测试参数等
- returns: 测试结果包含摘要信息和分页信息如适用
异步执行API合规性测试立即返回报告ID
"""
try:
logger.info(f"Starting test run with configuration: {config.model_dump()}")
# Generate report ID and directory
report_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + str(uuid.uuid4())[:8]
report_directory_path = (Path(REPORTS_DIR) / report_id).resolve()
# Convert Pydantic model to dict for compatibility
config_dict = config.model_dump(exclude_none=True)
# Add hidden parameters with default values
hidden_defaults = {
"categories": [],
"tags": [],
"ignore_ssl": True,
"output": "./test_reports",
"generate_pdf": True,
"custom_test_cases_dir": "./custom_testcases",
"stages_dir": "./custom_stages",
"llm_api_key": "sk-lbGrsUPL1iby86h554FaE536C343435dAa9bA65967A840B2",
"llm_base_url": "https://aiproxy.petrotech.cnpc/v1",
"llm_model_name": "deepseek-v3",
"use_llm_for_request_body": False,
"use_llm_for_path_params": False,
"use_llm_for_query_params": False,
"use_llm_for_headers": False,
"verbose": False
# Prepare config
config_dict = config.dict()
mode = config_dict.pop('mode')
# Set file paths based on mode
if mode == 'yapi':
config_dict['yapi'] = "./assets/doc/yapi/yapi.json"
elif mode == 'swagger':
config_dict['swagger'] = "./assets/doc/swagger/swagger.json"
elif mode == 'dms':
config_dict['dms'] = "./assets/doc/dms/domain.json"
config_dict.update({
"report_id": report_id,
"output": REPORTS_DIR
})
# Mark task as running
running_tasks[report_id] = {
"status": "running",
"report_id": report_id,
"report_directory": str(report_directory_path),
"created_at": datetime.now().isoformat(),
"status_url": f"/status/{report_id}",
"report_url": f"/reports/{report_id}"
}
# Submit to background executor
executor.submit(run_tests_background, config_dict, report_id)
return {
"status": "started",
"report_id": report_id,
"message": "测试已开始请使用report_id查询结果",
"status_url": f"/status/{report_id}",
"report_url": f"/reports/{report_id}",
"report_directory": str(report_directory_path)
}
yapi="./assets/doc/yapi/yapi.json"
swagger="./assets/doc/swagger/swagger.json"
dms="./assets/doc/dms/domain.json"
# Merge hidden defaults with config
config_dict.update(hidden_defaults)
if config_dict['mode'] == 'yapi':
config_dict['yapi'] = yapi
elif config_dict['mode'] == 'swagger':
config_dict['swagger'] = swagger
elif config_dict['mode'] == 'dms':
config_dict['dms'] = dms
config_dict.pop('mode')
result = run_tests_logic(config_dict)
if result['status'] == 'error':
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result
)
return result
except ValueError as e:
logger.error(f"Validation error: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"status": "error",
"message": str(e)
}
)
except Exception as e:
logger.error(f"An error occurred in the API endpoint: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={
"status": "error",
"message": str(e),
"traceback": traceback.format_exc()
}
detail={"status": "error", "message": str(e)}
)
@app.get("/status/{report_id}",
summary="查询测试状态",
description="根据报告ID查询测试执行状态")
async def get_test_status(report_id: str):
"""查询测试执行状态"""
if report_id not in running_tasks:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Report ID not found"
)
return running_tasks[report_id]
@app.get("/reports/{report_id}",
summary="下载测试报告",
description="根据报告ID下载对应的测试报告文件")
@ -1828,3 +1894,4 @@ if __name__ == "__main__":
workers=args.workers if not args.reload else 1,
log_level="info"
)

1832
static/redoc.standalone.js Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

3
static/swagger-ui.css Normal file

File diff suppressed because one or more lines are too long