diff --git a/create-compose-package-fastapi.bat b/create-compose-package-fastapi.bat
new file mode 100644
index 0000000..575e77c
--- /dev/null
+++ b/create-compose-package-fastapi.bat
@@ -0,0 +1,301 @@
+@echo off
+setlocal enabledelayedexpansion
+
+echo === DMS Compliance Tool Windows Final Version ===
+echo.
+
+REM Set defaults to avoid input issues
+set "SELECTED_SERVICE_ARCH=dual"
+set "SELECTED_PORTS=5050,5051"
+set "SERVICE_DESC=Dual Service - FastAPI Server and History Viewer"
+set "TARGET_PLATFORM=linux/amd64"
+set "TARGET_PLATFORM_NAME=AMD64 - Auto-detected"
+
+REM Configuration
+set "TIMESTAMP=%date:~10,4%%date:~4,2%%date:~7,2%-%time:~0,2%%time:~3,2%%time:~6,2%"
+set "TIMESTAMP=%TIMESTAMP: =0%"
+set "platform_suffix=amd64"
+set "EXPORT_DIR=dms-compliance-dual-amd64-windows-%TIMESTAMP%"
+set "IMAGE_NAME=compliance-dms-windows"
+
+echo [INFO] Using default configuration:
+echo [INFO] Architecture: %SERVICE_DESC%
+echo [INFO] Ports: %SELECTED_PORTS%
+echo [INFO] Platform: %TARGET_PLATFORM_NAME%
+echo.
+
+REM Check Docker
+docker --version >nul 2>&1
+if errorlevel 1 (
+ echo [ERROR] Docker not installed
+ pause
+ exit /b 1
+)
+
+docker info >nul 2>&1
+if errorlevel 1 (
+ echo [ERROR] Docker not running
+ pause
+ exit /b 1
+)
+
+echo [SUCCESS] Docker environment OK
+echo.
+
+echo === Starting Build Process ===
+
+REM Create directories
+if exist "%EXPORT_DIR%" rmdir /s /q "%EXPORT_DIR%"
+mkdir "%EXPORT_DIR%"
+
+set "TEMP_BUILD_DIR=%TEMP%\dms-build-%RANDOM%"
+mkdir "%TEMP_BUILD_DIR%"
+
+REM Copy files
+echo [Step 1/6] Copying project files...
+if exist "ddms_compliance_suite" robocopy "ddms_compliance_suite" "%TEMP_BUILD_DIR%\ddms_compliance_suite" /E /XD __pycache__ /XF *.pyc >nul 2>&1
+if exist "custom_stages" robocopy "custom_stages" "%TEMP_BUILD_DIR%\custom_stages" /E /XD __pycache__ /XF *.pyc >nul 2>&1
+if exist "custom_testcases" robocopy "custom_testcases" "%TEMP_BUILD_DIR%\custom_testcases" /E /XD __pycache__ /XF *.pyc >nul 2>&1
+if exist "templates" robocopy "templates" "%TEMP_BUILD_DIR%\templates" /E >nul 2>&1
+if exist "static" robocopy "static" "%TEMP_BUILD_DIR%\static" /E >nul 2>&1
+if exist "assets" robocopy "assets" "%TEMP_BUILD_DIR%\assets" /E >nul 2>&1
+
+for %%f in (fastapi_server.py history_viewer.py flask_app.py web_interface.py requirements.txt) do (
+ if exist "%%f" copy "%%f" "%TEMP_BUILD_DIR%\" >nul
+)
+
+echo [SUCCESS] Files copied
+echo.
+
+REM Create Dockerfile
+echo [Step 2/6] Creating Dockerfile...
+cd /d "%TEMP_BUILD_DIR%"
+
+(
+echo FROM python:3.11-alpine
+echo WORKDIR /app
+echo COPY requirements.txt .
+echo RUN pip install --no-cache-dir -r requirements.txt
+echo RUN pip install --no-cache-dir fastapi uvicorn[standard]
+echo RUN apk add --no-cache supervisor curl bash
+echo COPY . .
+echo RUN mkdir -p /var/log/supervisor
+echo COPY supervisord.conf /etc/supervisor/conf.d/
+echo EXPOSE 5050 5051
+echo CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
+) > Dockerfile
+
+(
+echo [supervisord]
+echo nodaemon=true
+echo logfile=/var/log/supervisor/supervisord.log
+echo user=root
+echo.
+echo [program:api_server]
+echo command=python -m uvicorn fastapi_server:app --host 0.0.0.0 --port 5050
+echo directory=/app
+echo autostart=true
+echo autorestart=true
+echo redirect_stderr=true
+echo stdout_logfile=/var/log/supervisor/api_server.log
+echo.
+echo [program:history_viewer]
+echo command=python history_viewer.py
+echo directory=/app
+echo autostart=true
+echo autorestart=true
+echo redirect_stderr=true
+echo stdout_logfile=/var/log/supervisor/history_viewer.log
+) > supervisord.conf
+
+cd /d "%~dp0"
+echo [SUCCESS] Dockerfile created
+echo.
+
+REM Copy to final directory
+echo [Step 3/6] Copying build files...
+robocopy "%TEMP_BUILD_DIR%" "%EXPORT_DIR%" /E >nul 2>&1
+echo [SUCCESS] Build files copied
+echo.
+
+REM Create Docker Compose
+echo [Step 4/6] Creating Docker Compose...
+(
+echo services:
+echo dms-compliance:
+echo image: %IMAGE_NAME%:latest
+echo container_name: dms-compliance-tool
+echo ports:
+echo - "5050:5050"
+echo - "5051:5051"
+echo volumes:
+echo - ./uploads:/app/uploads
+echo - ./logs:/app/logs
+echo restart: unless-stopped
+echo healthcheck:
+echo test: ["CMD", "curl", "-f", "http://localhost:5050/health"]
+echo interval: 30s
+echo timeout: 10s
+echo retries: 3
+echo start_period: 40s
+) > "%EXPORT_DIR%\docker-compose.yml"
+echo [SUCCESS] Docker Compose created
+echo.
+
+REM Create scripts
+echo [Step 5/6] Creating cross-platform management scripts...
+
+REM Windows batch scripts
+(
+echo @echo off
+echo echo Starting DMS Compliance Tool...
+echo echo Loading pre-built Docker image...
+echo docker load -i docker-image.tar
+echo echo Starting services...
+echo docker compose up -d
+echo echo Services started!
+echo echo FastAPI Server: http://localhost:5050
+echo echo History Viewer: http://localhost:5051
+echo pause
+) > "%EXPORT_DIR%\start.bat"
+
+(
+echo @echo off
+echo docker compose down
+echo echo Services stopped.
+echo pause
+) > "%EXPORT_DIR%\stop.bat"
+
+(
+echo @echo off
+echo docker compose logs -f
+) > "%EXPORT_DIR%\logs.bat"
+
+REM Linux shell scripts
+(
+echo #!/bin/bash
+echo.
+echo echo "Starting DMS Compliance Tool..."
+echo echo "Loading pre-built Docker image..."
+echo docker load -i docker-image.tar
+echo echo "Starting services..."
+echo docker compose up -d
+echo echo "Services started!"
+echo echo "FastAPI Server: http://localhost:5050"
+echo echo "History Viewer: http://localhost:5051"
+) > "%EXPORT_DIR%\start.sh"
+
+(
+echo #!/bin/bash
+echo.
+echo echo "Stopping DMS Compliance Tool..."
+echo docker compose down
+echo echo "Services stopped."
+) > "%EXPORT_DIR%\stop.sh"
+
+(
+echo #!/bin/bash
+echo.
+echo docker compose logs -f
+) > "%EXPORT_DIR%\logs.sh"
+
+REM Set permissions script for Linux
+(
+echo #!/bin/bash
+echo chmod +x *.sh
+echo echo "Permissions set for shell scripts"
+) > "%EXPORT_DIR%\set-permissions.sh"
+
+(
+echo # DMS Compliance Tool - Cross-Platform Version
+echo.
+echo ## Quick Start
+echo.
+echo ### Windows:
+echo 1. Run start.bat
+echo 2. Access web interface
+echo 3. Use stop.bat to stop
+echo.
+echo ### Linux/macOS:
+echo 1. chmod +x *.sh ^(or run ./set-permissions.sh^)
+echo 2. ./start.sh
+echo 3. Access web interface
+echo 4. ./stop.sh to stop
+echo.
+echo ## Architecture
+echo - FastAPI Server: http://localhost:5050
+echo - History Viewer: http://localhost:5051
+echo - Dual service architecture with supervisor
+echo.
+echo ## Management Commands
+echo Windows: start.bat, stop.bat, logs.bat
+echo Linux: start.sh, stop.sh, logs.sh
+echo.
+echo ## Package Contents
+echo - docker-compose.yml: Service configuration ^(uses pre-built image^)
+echo - Dockerfile: Container build instructions ^(for reference^)
+echo - docker-image.tar: Pre-built Docker image ^(fast startup^)
+echo - Cross-platform management scripts ^(auto-load image^)
+) > "%EXPORT_DIR%\README.md"
+
+echo [SUCCESS] Cross-platform scripts created
+echo.
+
+REM Build image
+echo [Step 6/6] Building Docker image...
+cd /d "%EXPORT_DIR%"
+docker build -t "%IMAGE_NAME%:latest" .
+if errorlevel 1 (
+ echo [ERROR] Build failed
+ cd /d "%~dp0"
+ pause
+ exit /b 1
+)
+
+docker save "%IMAGE_NAME%:latest" -o docker-image.tar
+cd /d "%~dp0"
+
+REM Clean up source files (like multiplatform script)
+echo [INFO] Cleaning up source files...
+cd /d "%EXPORT_DIR%"
+
+REM Remove source code directories (keep only deployment files)
+if exist "ddms_compliance_suite" rmdir /s /q "ddms_compliance_suite"
+if exist "custom_stages" rmdir /s /q "custom_stages"
+if exist "custom_testcases" rmdir /s /q "custom_testcases"
+if exist "templates" rmdir /s /q "templates"
+if exist "static" rmdir /s /q "static"
+if exist "assets" rmdir /s /q "assets"
+
+REM Remove unnecessary source Python files (keep fastapi_server.py and history_viewer.py for runtime)
+for %%f in (flask_app.py web_interface.py) do (
+ if exist "%%f" del "%%f"
+)
+
+echo [SUCCESS] Source files cleaned up
+cd /d "%~dp0"
+
+REM Create archive (like multiplatform script)
+echo [INFO] Creating archive...
+powershell -command "Compress-Archive -Path '%EXPORT_DIR%' -DestinationPath '%EXPORT_DIR%.zip' -Force"
+
+REM Clean up temporary directories (like multiplatform script)
+if exist "%TEMP_BUILD_DIR%" rmdir /s /q "%TEMP_BUILD_DIR%"
+if exist "%EXPORT_DIR%" rmdir /s /q "%EXPORT_DIR%"
+
+echo.
+echo === Build Complete ===
+echo [SUCCESS] Package created: %EXPORT_DIR%.zip
+echo [INFO] Architecture: Dual Service - FastAPI Server and History Viewer
+echo [INFO] Ports: 5050,5051
+echo [INFO] FastAPI Server: http://localhost:5050
+echo [INFO] History Viewer: http://localhost:5051
+echo.
+echo Package contents:
+echo - Cross-platform scripts ^(Windows .bat and Linux .sh^)
+echo - Docker Compose configuration
+echo - Pre-built Docker image
+echo - Clean deployment package ^(no source code^)
+echo.
+echo Build complete!
+pause
diff --git a/create-compose-package-multiplatform.sh b/create-compose-package-multiplatform.sh
index 041609f..583913d 100755
--- a/create-compose-package-multiplatform.sh
+++ b/create-compose-package-multiplatform.sh
@@ -282,56 +282,40 @@ cd "$TEMP_BUILD_DIR"
if [[ "$SELECTED_SERVICE_ARCH" == "dual" ]]; then
# 双服务架构 - 使用supervisor管理两个服务
cat > "Dockerfile" << 'EOF'
-# 多阶段构建 - 构建阶段
-FROM python:3.11-alpine AS builder
+# 使用稳定的Python基础镜像
+FROM python:3.11-alpine
-# 安装构建依赖
-RUN apk add --no-cache \
+# 安装系统依赖
+RUN apk update && apk add --no-cache \
gcc \
musl-dev \
libffi-dev \
openssl-dev \
- cargo \
- rust
-
-# 设置工作目录
-WORKDIR /app
-
-# 复制依赖文件
-COPY requirements.txt .
-
-# 创建虚拟环境并安装依赖
-RUN python -m venv /opt/venv
-ENV PATH="/opt/venv/bin:$PATH"
-RUN pip install --no-cache-dir --upgrade pip setuptools wheel
-RUN pip install --no-cache-dir -r requirements.txt
-
-# 运行阶段
-FROM python:3.11-alpine AS runtime
-
-# 安装运行时依赖
-RUN apk add --no-cache \
+ python3-dev \
+ build-base \
+ linux-headers \
supervisor \
curl \
bash \
- tzdata
-
-# 从构建阶段复制虚拟环境
-COPY --from=builder /opt/venv /opt/venv
-ENV PATH="/opt/venv/bin:$PATH"
+ tzdata && \
+ rm -rf /var/cache/apk/*
# 设置工作目录
WORKDIR /app
+# 复制依赖文件并安装Python包
+COPY requirements.txt .
+RUN pip install --no-cache-dir --upgrade pip setuptools wheel && \
+ pip install --no-cache-dir -r requirements.txt
+
# 复制应用代码
COPY . .
-# 创建supervisor配置
-RUN mkdir -p /etc/supervisor/conf.d
-COPY supervisord.conf /etc/supervisor/conf.d/
+# 创建supervisor配置目录
+RUN mkdir -p /etc/supervisor/conf.d /var/log/supervisor /app/logs /app/test_reports /app/uploads
-# 创建必要目录
-RUN mkdir -p /var/log/supervisor /app/logs /app/test_reports /app/uploads
+# 复制supervisor配置
+COPY supervisord.conf /etc/supervisor/conf.d/
# 创建非root用户
RUN addgroup -g 1000 appuser && \
@@ -346,7 +330,7 @@ ENV FLASK_ENV=production
ENV PYTHONUNBUFFERED=1
# 健康检查
-HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD curl -f http://localhost:5050/ || exit 1
# 暴露端口
@@ -409,50 +393,38 @@ EOF
elif [[ "$SELECTED_SERVICE_ARCH" == "fastapi" ]]; then
cat > "Dockerfile" << 'EOF'
-# 多阶段构建 - 构建阶段
-FROM python:3.11-alpine AS builder
+# 使用稳定的Python基础镜像
+FROM python:3.11-alpine
-# 安装构建依赖
-RUN apk add --no-cache \
+# 安装系统依赖
+RUN apk update && apk add --no-cache \
gcc \
musl-dev \
libffi-dev \
openssl-dev \
- cargo \
- rust
-
-# 设置工作目录
-WORKDIR /app
-
-# 复制依赖文件
-COPY requirements.txt .
-
-# 创建虚拟环境并安装依赖
-RUN python -m venv /opt/venv
-ENV PATH="/opt/venv/bin:$PATH"
-RUN pip install --no-cache-dir --upgrade pip setuptools wheel
-RUN pip install --no-cache-dir -r requirements.txt
-RUN pip install --no-cache-dir fastapi uvicorn[standard]
-
-# 运行阶段
-FROM python:3.11-alpine AS runtime
-
-# 安装运行时依赖
-RUN apk add --no-cache \
+ python3-dev \
+ build-base \
+ linux-headers \
curl \
bash \
- tzdata
-
-# 从构建阶段复制虚拟环境
-COPY --from=builder /opt/venv /opt/venv
-ENV PATH="/opt/venv/bin:$PATH"
+ tzdata && \
+ rm -rf /var/cache/apk/*
# 设置工作目录
WORKDIR /app
+# 复制依赖文件并安装Python包
+COPY requirements.txt .
+RUN pip install --no-cache-dir --upgrade pip setuptools wheel && \
+ pip install --no-cache-dir -r requirements.txt && \
+ pip install --no-cache-dir fastapi uvicorn[standard]
+
# 复制应用代码
COPY . .
+# 创建必要目录
+RUN mkdir -p /app/logs /app/uploads /app/reports
+
# 创建非root用户
RUN addgroup -g 1000 appuser && \
adduser -D -u 1000 -G appuser appuser && \
@@ -460,8 +432,12 @@ RUN addgroup -g 1000 appuser && \
USER appuser
+# 设置环境变量
+ENV PYTHONPATH=/app
+ENV PYTHONUNBUFFERED=1
+
# 健康检查
-HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD curl -f http://localhost:5051/health || exit 1
# 暴露端口
@@ -827,50 +803,38 @@ EOF
else
# Flask版本的Dockerfile
cat > "Dockerfile" << 'EOF'
-# 多阶段构建 - 构建阶段
-FROM python:3.11-alpine AS builder
+# 使用稳定的Python基础镜像
+FROM python:3.11-alpine
-# 安装构建依赖
-RUN apk add --no-cache \
+# 安装系统依赖
+RUN apk update && apk add --no-cache \
gcc \
musl-dev \
libffi-dev \
openssl-dev \
- cargo \
- rust
-
-# 设置工作目录
-WORKDIR /app
-
-# 复制依赖文件
-COPY requirements.txt .
-
-# 创建虚拟环境并安装依赖
-RUN python -m venv /opt/venv
-ENV PATH="/opt/venv/bin:$PATH"
-RUN pip install --no-cache-dir --upgrade pip setuptools wheel
-RUN pip install --no-cache-dir -r requirements.txt
-RUN pip install --no-cache-dir flask gunicorn
-
-# 运行阶段
-FROM python:3.11-alpine AS runtime
-
-# 安装运行时依赖
-RUN apk add --no-cache \
+ python3-dev \
+ build-base \
+ linux-headers \
curl \
bash \
- tzdata
-
-# 从构建阶段复制虚拟环境
-COPY --from=builder /opt/venv /opt/venv
-ENV PATH="/opt/venv/bin:$PATH"
+ tzdata && \
+ rm -rf /var/cache/apk/*
# 设置工作目录
WORKDIR /app
+# 复制依赖文件并安装Python包
+COPY requirements.txt .
+RUN pip install --no-cache-dir --upgrade pip setuptools wheel && \
+ pip install --no-cache-dir -r requirements.txt && \
+ pip install --no-cache-dir flask gunicorn
+
# 复制应用代码
COPY . .
+# 创建必要目录
+RUN mkdir -p /app/logs /app/uploads /app/reports
+
# 创建非root用户
RUN addgroup -g 1000 appuser && \
adduser -D -u 1000 -G appuser appuser && \
@@ -878,9 +842,14 @@ RUN addgroup -g 1000 appuser && \
USER appuser
+# 设置环境变量
+ENV PYTHONPATH=/app
+ENV FLASK_ENV=production
+ENV PYTHONUNBUFFERED=1
+
# 健康检查
-HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
- CMD curl -f http://localhost:5050/health || exit 1
+HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
+ CMD curl -f http://localhost:5050/ || exit 1
# 暴露端口
EXPOSE 5050
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/Dockerfile b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/Dockerfile
new file mode 100644
index 0000000..ab313ab
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.11-alpine
+WORKDIR /app
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+RUN pip install --no-cache-dir fastapi uvicorn[standard]
+RUN apk add --no-cache supervisor curl bash
+COPY . .
+RUN mkdir -p /var/log/supervisor
+COPY supervisord.conf /etc/supervisor/conf.d/
+EXPOSE 5050 5051
+CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/README.md b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/README.md
new file mode 100644
index 0000000..3955483
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/README.md
@@ -0,0 +1,29 @@
+# DMS Compliance Tool - Cross-Platform Version
+
+## Quick Start
+
+### Windows:
+1. Run start.bat
+2. Access web interface
+3. Use stop.bat to stop
+
+### Linux/macOS:
+1. chmod +x *.sh (or run ./set-permissions.sh)
+2. ./start.sh
+3. Access web interface
+4. ./stop.sh to stop
+
+## Architecture
+- FastAPI Server: http://localhost:5050
+- History Viewer: http://localhost:5051
+- Dual service architecture with supervisor
+
+## Management Commands
+Windows: start.bat, stop.bat, logs.bat
+Linux: start.sh, stop.sh, logs.sh
+
+## Package Contents
+- docker-compose.yml: Service configuration (uses pre-built image)
+- Dockerfile: Container build instructions (for reference)
+- docker-image.tar: Pre-built Docker image (fast startup)
+- Cross-platform management scripts (auto-load image)
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/docker-compose.yml b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/docker-compose.yml
new file mode 100644
index 0000000..1a78eba
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/docker-compose.yml
@@ -0,0 +1,17 @@
+services:
+ dms-compliance:
+ image: compliance-dms-windows:latest
+ container_name: dms-compliance-tool
+ ports:
+ - "5050:5050"
+ - "5051:5051"
+ volumes:
+ - ./uploads:/app/uploads
+ - ./logs:/app/logs
+ restart: unless-stopped
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:5050/health"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ start_period: 40s
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/fastapi_server.py b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/fastapi_server.py
new file mode 100644
index 0000000..de3ea32
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/fastapi_server.py
@@ -0,0 +1,1109 @@
+#!/usr/bin/env python3
+"""
+DMS合规性测试工具 - FastAPI版本API服务器
+提供自动生成的交互式API文档
+"""
+
+import os
+import sys
+import json
+import logging
+import datetime
+import traceback
+from pathlib import Path
+from typing import List, Optional, Dict, Any, Union
+import unicodedata
+import html
+
+# FastAPI imports
+from fastapi import FastAPI, HTTPException, BackgroundTasks, status
+from fastapi.responses import JSONResponse, FileResponse
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel, Field, field_validator, model_validator
+import uvicorn
+
+# PDF generation libraries - with fallback
+try:
+ from reportlab.lib import colors
+ from reportlab.lib.pagesizes import A4
+ from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
+ from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
+ from reportlab.pdfbase import pdfmetrics
+ from reportlab.pdfbase.ttfonts import TTFont
+ reportlab_available = True
+except ImportError:
+ reportlab_available = False
+
+# Project-specific imports
+from ddms_compliance_suite.api_caller.caller import APICallDetail
+from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary
+from ddms_compliance_suite.input_parser.parser import ParsedAPISpec
+from ddms_compliance_suite.utils.response_utils import extract_data_for_validation
+from ddms_compliance_suite.utils.data_generator import DataGenerator
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# FastAPI app instance
+app = FastAPI(
+ title="DMS合规性测试工具 API",
+ description="""
+ DMS合规性测试工具 FastAPI版本
+
+ 这是一个用于API合规性测试的工具,支持:
+
+ YAPI规范测试 - 基于YAPI定义文件的测试
+ Swagger/OpenAPI测试 - 基于OpenAPI规范的测试
+ DMS服务发现测试 - 动态发现DMS服务的API进行测试
+ 分页支持 - 支持大量API的分页获取,避免内存溢出
+ PDF报告生成 - 生成详细的测试报告
+ LLM集成 - 支持大语言模型辅助生成测试数据
+
+ 主要特性
+
+ 🚀 高性能: 基于FastAPI,支持异步处理
+ 📊 分页支持: 解决大量API节点的内存问题
+ 📝 自动文档: 自动生成交互式API文档
+ 🔧 灵活配置: 支持多种测试配置选项
+ 📈 详细报告: 生成PDF和JSON格式的测试报告
+ """,
+ version="1.0.0",
+ docs_url="/docs", # Swagger UI
+ redoc_url="/redoc", # ReDoc
+)
+
+# Add CORS middleware
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"], # 在生产环境中应该限制具体域名
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+
+# Pydantic models for request/response
+class TestConfig(BaseModel):
+ """测试配置模型"""
+
+ # API定义源 (三选一)
+ yapi: Optional[str] = Field(None, description="YAPI定义文件路径", exclude=True)
+ swagger: Optional[str] = Field(None, description="Swagger/OpenAPI定义文件路径", exclude=True)
+ dms: Optional[str] = Field("./assets/doc/dms/domain.json", description="DMS服务发现的domain mapping文件路径", example="./assets/doc/dms/domain.json")
+ # 基本配置
+ base_url: str = Field("https://www.dev.ideas.cnpc/", description="API基础URL", example="https://www.dev.ideas.cnpc/")
+
+ # 分页配置
+ page_size: int = Field(10, description="DMS API分页大小,默认10。较小的值可以减少内存使用", ge=1, le=10000)
+ page_no: int = Field(1, description="起始页码,从1开始。可用于断点续传或跳过前面的页面", ge=1)
+ fetch_all_pages: bool = Field(False, description="是否获取所有页面。True=获取所有数据,False=只获取指定页面")
+
+ # 过滤选项
+ strictness_level: str = Field("CRITICAL", description="测试严格等级", pattern="^(CRITICAL|HIGH|MEDIUM|LOW)$")
+
+ @field_validator('base_url')
+ @classmethod
+ def validate_base_url(cls, v):
+ if not v.startswith(('http://', 'https://')):
+ raise ValueError('base_url must start with http:// or https://')
+ return v
+
+ @model_validator(mode='before')
+ @classmethod
+ def validate_api_source(cls, values):
+ """验证API定义源,确保三选一"""
+ if isinstance(values, dict):
+ api_sources = [values.get('yapi'), values.get('swagger'), values.get('dms')]
+ non_none_sources = [s for s in api_sources if s is not None]
+ if len(non_none_sources) > 1:
+ raise ValueError('只能选择一个API定义源:yapi、swagger或dms')
+ if len(non_none_sources) == 0:
+ raise ValueError('必须提供一个API定义源:yapi、swagger或dms')
+ return values
+
+class PaginationInfo(BaseModel):
+ """分页信息模型"""
+ page_size: int = Field(description="页面大小")
+ page_no_start: int = Field(description="起始页码")
+ total_pages: int = Field(description="总页数")
+ total_records: int = Field(description="总记录数")
+ pages_fetched: int = Field(description="已获取页数")
+ current_page: int = Field(description="当前页码")
+
+class TestResponse(BaseModel):
+ """测试响应模型"""
+ status: str = Field(description="测试状态", example="completed")
+ message: str = Field(description="状态消息")
+ report_directory: str = Field(description="报告目录路径")
+ summary: Dict[str, Any] = Field(description="测试摘要信息")
+ pagination: Optional[PaginationInfo] = Field(None, description="分页信息(仅DMS测试时返回)")
+
+class ErrorResponse(BaseModel):
+ """错误响应模型"""
+ status: str = Field("error", description="错误状态")
+ message: str = Field(description="错误消息")
+ traceback: Optional[str] = Field(None, description="错误堆栈跟踪")
+
+# Global variable to store running tasks
+running_tasks: Dict[str, Dict[str, Any]] = {}
+
+@app.get("/",
+ summary="健康检查",
+ description="检查API服务器是否正常运行",
+ response_model=Dict[str, str])
+async def health_check():
+ """健康检查端点,用于Docker健康检查"""
+ return {
+ "status": "healthy",
+ "service": "DMS Compliance API Server (FastAPI)",
+ "version": "2.0.0",
+ "docs_url": "/docs",
+ "redoc_url": "/redoc"
+ }
+
+@app.get("/info",
+ summary="服务信息",
+ description="获取API服务器的详细信息",
+ response_model=Dict[str, Any])
+async def get_info():
+ """获取服务器信息"""
+ return {
+ "service": "DMS Compliance API Server",
+ "version": "2.0.0",
+ "framework": "FastAPI",
+ "features": [
+ "YAPI规范测试",
+ "Swagger/OpenAPI测试",
+ "DMS服务发现测试",
+ "分页支持",
+ "PDF报告生成",
+ "LLM集成",
+ "自动API文档"
+ ],
+ "endpoints": {
+ "health": "/",
+ "info": "/info",
+ "run_tests": "/run",
+ "docs": "/docs",
+ "redoc": "/redoc"
+ },
+ "reportlab_available": reportlab_available
+ }
+
+# Import the test logic from the original Flask version
+def run_tests_logic(config: dict):
+ """
+ Main logic for running tests, adapted from the original Flask version.
+ """
+ try:
+ if config.get('verbose'):
+ logging.getLogger('ddms_compliance_suite').setLevel(logging.DEBUG)
+ logger.setLevel(logging.DEBUG)
+ logger.debug("Verbose logging enabled.")
+
+ if not any(k in config for k in ['yapi', 'swagger', 'dms']):
+ raise ValueError("An API definition source is required: --yapi, --swagger, or --dms")
+
+ if sum(k in config for k in ['yapi', 'swagger', 'dms']) > 1:
+ raise ValueError("API definition sources are mutually exclusive.")
+
+ # Setup output directory with timestamp
+ base_output_dir = Path(config.get('output', './test_reports'))
+ timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+ output_directory = base_output_dir / timestamp
+ output_directory.mkdir(parents=True, exist_ok=True)
+ logger.info(f"Test reports will be saved to: {output_directory.resolve()}")
+ print(f"config{config}")
+ # Initialize the orchestrator
+ orchestrator = APITestOrchestrator(
+ base_url=config['base_url'],
+ custom_test_cases_dir=config.get('custom_test_cases_dir'),
+ llm_api_key=config.get('llm_api_key'),
+ llm_base_url=config.get('llm_base_url'),
+ llm_model_name=config.get('llm_model_name'),
+ use_llm_for_request_body=config.get('use_llm_for_request_body', False),
+ use_llm_for_path_params=config.get('use_llm_for_path_params', False),
+ use_llm_for_query_params=config.get('use_llm_for_query_params', False),
+ use_llm_for_headers=config.get('use_llm_for_headers', False),
+ output_dir=str(output_directory),
+ stages_dir=config.get('stages_dir'),
+ strictness_level=config.get('strictness_level', 'CRITICAL'),
+ ignore_ssl=config.get('ignore_ssl', False)
+ )
+
+ test_summary: Optional[TestSummary] = None
+ parsed_spec: Optional[ParsedAPISpec] = None
+ pagination_info: Dict[str, Any] = {}
+
+ if 'yapi' in config:
+ logger.info(f"Running tests from YAPI file: {config['yapi']}")
+ test_summary, parsed_spec = orchestrator.run_tests_from_yapi(
+ yapi_file_path=config['yapi'],
+ categories=config.get('categories'),
+ custom_test_cases_dir=config.get('custom_test_cases_dir')
+ )
+ elif 'swagger' in config:
+ logger.info(f"Running tests from Swagger file: {config['swagger']}")
+ test_summary, parsed_spec = orchestrator.run_tests_from_swagger(
+ swagger_file_path=config['swagger'],
+ tags=config.get('tags'),
+ custom_test_cases_dir=config.get('custom_test_cases_dir')
+ )
+ elif 'dms' in config:
+ logger.info(f"Running tests from DMS service discovery: {config['dms']}")
+ test_summary, parsed_spec, pagination_info = orchestrator.run_tests_from_dms(
+ domain_mapping_path=config['dms'],
+ categories=config.get('categories'),
+ custom_test_cases_dir=config.get('custom_test_cases_dir'),
+ page_size=config.get('page_size', 1000),
+ page_no_start=config.get('page_no', 1),
+ fetch_all_pages=config.get('fetch_all_pages', True)
+ )
+
+ if not parsed_spec:
+ raise RuntimeError("Failed to parse the API specification.")
+
+ if test_summary and config.get('stages_dir') and parsed_spec:
+ logger.info(f"Executing API test stages from directory: {config['stages_dir']}")
+ orchestrator.run_stages_from_spec(parsed_spec, test_summary)
+
+ if test_summary:
+ # Save main summary
+ main_report_file_path = output_directory / "summary.json"
+ with open(main_report_file_path, 'w', encoding='utf-8') as f:
+ f.write(test_summary.to_json(pretty=True))
+
+ # Save API call details
+ api_calls_filename = "api_call_details.md"
+ save_api_call_details_to_file(
+ orchestrator.get_api_call_details(),
+ str(output_directory),
+ filename=api_calls_filename
+ )
+
+ # Generate PDF report if reportlab is available
+ if reportlab_available and config.get('generate_pdf', True):
+ pdf_report_path = output_directory / "report_cn.pdf"
+ save_pdf_report(test_summary.to_dict(), pdf_report_path, config.get('strictness_level', 'CRITICAL'))
+
+ failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
+ error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
+
+ result = {
+ "status": "completed",
+ "message": "Tests finished." if failed_count == 0 and error_count == 0 else "Tests finished with failures or errors.",
+ "report_directory": str(output_directory.resolve()),
+ "summary": test_summary.to_dict()
+ }
+
+ # 如果有分页信息,添加到返回结果中
+ if pagination_info:
+ result["pagination"] = pagination_info
+
+ return result
+ else:
+ raise RuntimeError("Test execution failed to produce a summary.")
+
+ except Exception as e:
+ logger.error(f"An unexpected error occurred during test execution: {e}", exc_info=True)
+ return {
+ "status": "error",
+ "message": str(e),
+ "traceback": traceback.format_exc()
+ }
+
+def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
+ """
+ 将API调用详情列表保存到指定目录下的 Markdown 文件中。
+ 同时,额外生成一个纯文本文件 (.txt),每行包含一个 cURL 命令。
+ """
+ if not api_call_details:
+ logger.info("没有API调用详情可供保存。")
+ return
+
+ output_dir = Path(output_dir_path)
+ try:
+ output_dir.mkdir(parents=True, exist_ok=True)
+ except OSError as e:
+ logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}")
+ return
+
+ # 主文件是 Markdown 文件
+ md_output_file = output_dir / filename
+ # 确保它是 .md,尽管 main 函数应该已经处理了
+ if md_output_file.suffix.lower() not in ['.md', '.markdown']:
+ md_output_file = md_output_file.with_suffix('.md')
+
+ markdown_content = []
+
+ for detail in api_call_details:
+
+ # Request URL with params (if any)
+ url_to_display = detail.request_url
+ if detail.request_params:
+ try:
+ # Ensure urllib is available for this formatting step
+ import urllib.parse
+ query_string = urllib.parse.urlencode(detail.request_params)
+ url_to_display = f"{detail.request_url}?{query_string}"
+ except Exception as e:
+ logger.warning(f"Error formatting URL with params for display: {e}")
+ # Fallback to just the base URL if params formatting fails
+
+ markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
+ markdown_content.append("**cURL Command:**")
+ markdown_content.append("```sh")
+ markdown_content.append(detail.curl_command)
+ markdown_content.append("```")
+ markdown_content.append("### Request Details")
+ markdown_content.append(f"- **Method:** `{detail.request_method}`")
+ markdown_content.append(f"- **Full URL:** `{url_to_display}`")
+
+ markdown_content.append("- **Headers:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
+
+ if detail.request_params:
+ markdown_content.append("- **Query Parameters:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
+
+ if detail.request_body is not None:
+ markdown_content.append("- **Body:**")
+ body_lang = "text"
+ formatted_body = str(detail.request_body)
+ try:
+ # Try to parse as JSON for pretty printing
+ if isinstance(detail.request_body, str):
+ try:
+ parsed_json = json.loads(detail.request_body)
+ formatted_body = json.dumps(parsed_json, indent=2, ensure_ascii=False)
+ body_lang = "json"
+ except json.JSONDecodeError:
+ pass # Keep as text
+ elif isinstance(detail.request_body, (dict, list)):
+ formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False)
+ body_lang = "json"
+ except Exception as e:
+ logger.warning(f"Error formatting request body for Markdown: {e}")
+
+ markdown_content.append(f"```{body_lang}")
+ markdown_content.append(formatted_body)
+ markdown_content.append("```")
+
+ markdown_content.append("### Response Details")
+ markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
+ markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
+
+ markdown_content.append("- **Headers:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
+
+ if detail.response_body is not None:
+ markdown_content.append("- **Body:**")
+ resp_body_lang = "text"
+ formatted_resp_body = str(detail.response_body)
+ try:
+ # Try to parse as JSON for pretty printing
+ if isinstance(detail.response_body, str):
+ try:
+ # If it's already a string that might be JSON, try parsing and re-dumping
+ parsed_json_resp = json.loads(detail.response_body)
+ formatted_resp_body = json.dumps(parsed_json_resp, indent=2, ensure_ascii=False)
+ resp_body_lang = "json"
+ except json.JSONDecodeError:
+ # It's a string, but not valid JSON, keep as text
+ pass
+ elif isinstance(detail.response_body, (dict, list)):
+ # It's already a dict/list, dump it as JSON
+ formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False)
+ resp_body_lang = "json"
+ # If it's neither string nor dict/list (e.g. int, bool from parsed json), str() is fine.
+ except Exception as e:
+ logger.warning(f"Error formatting response body for Markdown: {e}")
+
+ markdown_content.append(f"```{resp_body_lang}")
+ markdown_content.append(formatted_resp_body)
+ markdown_content.append("```")
+ markdown_content.append("") # Add a blank line for spacing before next --- or EOF
+ markdown_content.append("---") # Separator
+
+ try:
+ with open(md_output_file, 'w', encoding='utf-8') as f_md:
+ f_md.write("\n".join(markdown_content))
+ logger.info(f"API调用详情已保存为 Markdown: {md_output_file}")
+ except Exception as e:
+ logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True)
+
+def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
+ """将测试摘要保存为格式化的PDF文件"""
+ logger.info(f"开始生成PDF报告: {output_path}")
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+
+ try:
+ # --- 统一的字体管理和注册 ---
+ font_name = 'SimSun' # 使用一个简单清晰的注册名
+ font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
+
+ if not Path(font_path).exists():
+ logger.error(f"字体文件未找到: {Path(font_path).resolve()}")
+ return
+
+ # 关键修复: 对于 .ttc (TrueType Collection) 文件, 必须指定 subfontIndex
+ pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0))
+ # 将注册的字体关联到 'SimSun' 字体族
+ pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
+
+ doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
+ elements = []
+
+ # --- 统一样式定义, 全部使用注册的字体名 ---
+ styles = getSampleStyleSheet()
+ title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
+ heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
+ normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
+ small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)
+
+ def to_para(text, style=normal_style, escape=True):
+ """
+ 根据用户建议移除 textwrap 以进行诊断。
+ 此版本只包含净化和基本的换行符替换。
+ """
+ if text is None:
+ content = ""
+ else:
+ content = str(text)
+
+ if escape:
+ content = html.escape(content)
+
+ # 依然保留Unicode控制字符的净化
+ content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
+
+ if not content.strip():
+ # 对于完全空白或None的输入,返回一个安全的非换行空格
+ return Paragraph(' ', style)
+
+ # 只使用基本的换行符替换
+ content = content.replace('\n', '
')
+
+ return Paragraph(content, style)
+
+ # 3. 填充PDF内容 - 优化后的报告格式
+
+ # 生成报告编码(基于时间戳)
+ import time
+ report_code = f"DMS-TEST-{int(time.time())}"
+
+ # 报告标题
+ elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
+ elements.append(Spacer(1, 15))
+
+ # 报告基本信息表格
+ basic_info_data = [
+ [to_para("报告编码", escape=False), to_para(report_code)],
+ [to_para("报告名称", escape=False), to_para("DMS领域数据服务测试分析报告")],
+ [to_para("申请日期", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日'))],
+ [to_para("申请人", escape=False), to_para("系统管理员")],
+ [to_para("服务供应商名称", escape=False), to_para("数据管理系统(DMS)")],
+ ]
+ basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
+ basic_info_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
+ ]))
+ elements.append(basic_info_table)
+ elements.append(Spacer(1, 20))
+
+ # 摘要部分
+ elements.append(to_para("摘要", heading_style, escape=False))
+ overall = summary_data.get('overall_summary', {})
+
+ # 从JSON提取并格式化时间
+ try:
+ start_time_str = summary_data.get('start_time', 'N/A')
+ end_time_str = summary_data.get('end_time', 'N/A')
+ duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0))
+
+ start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
+ end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
+ except:
+ start_time_formatted = start_time_str
+ end_time_formatted = end_time_str
+
+ # 摘要内容 - 安全计算跳过的数量
+ def safe_subtract(total, passed, failed):
+ """安全地计算跳过的数量"""
+ try:
+ if isinstance(total, (int, float)) and isinstance(passed, (int, float)) and isinstance(failed, (int, float)):
+ return max(0, total - passed - failed)
+ else:
+ return 0
+ except:
+ return 0
+
+ endpoints_tested = overall.get('endpoints_tested', 0)
+ endpoints_passed = overall.get('endpoints_passed', 0)
+ endpoints_failed = overall.get('endpoints_failed', 0)
+ endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed)
+
+ test_cases_executed = overall.get('total_test_cases_executed', 0)
+ test_cases_passed = overall.get('test_cases_passed', 0)
+ test_cases_failed = overall.get('test_cases_failed', 0)
+ test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed)
+
+ stages_executed = overall.get('total_stages_executed', 0)
+ stages_passed = overall.get('stages_passed', 0)
+ stages_failed = overall.get('stages_failed', 0)
+ stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed)
+
+ summary_text = f"""本次测试针对DMS(数据管理系统)领域数据服务进行全面的合规性验证。
+测试时间:{start_time_formatted} 至 {end_time_formatted},总耗时 {float(duration):.2f} 秒。
+共测试 {endpoints_tested} 个API端点,其中 {endpoints_passed} 个通过,{endpoints_failed} 个失败,{endpoints_skipped} 个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}。
+执行 {test_cases_executed} 个测试用例,其中 {test_cases_passed} 个通过,{test_cases_failed} 个失败,{test_cases_skipped} 个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}。
+执行 {stages_executed} 个流程测试,其中 {stages_passed} 个通过,{stages_failed} 个失败,{stages_skipped} 个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}。"""
+
+ elements.append(to_para(summary_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 测试内容包括 - API列表表格
+ elements.append(to_para("测试内容包括", heading_style, escape=False))
+
+ # 从测试结果中提取API信息
+ endpoint_results = summary_data.get('endpoint_results', [])
+ api_list_data = [
+ [to_para("序号", escape=False), to_para("服务名称", escape=False),
+ to_para("服务功能描述", escape=False), to_para("服务参数描述", escape=False),
+ to_para("服务返回值描述", escape=False)]
+ ]
+
+ for i, endpoint in enumerate(endpoint_results[:10], 1): # 限制显示前10个API
+ endpoint_name = endpoint.get('endpoint_name', 'N/A')
+
+ # 简化的功能描述
+ if 'Create' in endpoint_name:
+ func_desc = "提供数据创建服务"
+ elif 'List' in endpoint_name or 'Query' in endpoint_name:
+ func_desc = "提供数据查询和列表服务"
+ elif 'Read' in endpoint_name:
+ func_desc = "提供单条数据读取服务"
+ elif 'Update' in endpoint_name:
+ func_desc = "提供数据更新服务"
+ elif 'Delete' in endpoint_name:
+ func_desc = "提供数据删除服务"
+ else:
+ func_desc = "提供数据管理服务"
+
+ api_list_data.append([
+ to_para(str(i), small_style),
+ to_para(endpoint_name, small_style),
+ to_para(func_desc, small_style),
+ to_para("标准DMS参数格式", small_style),
+ to_para("标准DMS响应格式", small_style)
+ ])
+
+ api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
+ api_list_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(api_list_table)
+ elements.append(Spacer(1, 20))
+
+ # 测试用例列表 - 根据严格等级分为必须和非必须
+ elements.append(to_para("测试用例列表", heading_style, escape=False))
+
+ # 定义严重性等级的数值映射
+ severity_levels = {
+ 'CRITICAL': 5,
+ 'HIGH': 4,
+ 'MEDIUM': 3,
+ 'LOW': 2,
+ 'INFO': 1
+ }
+
+ strictness_value = severity_levels.get(strictness_level, 5) # 默认为CRITICAL
+
+ # 收集所有测试用例(包括endpoint用例和stage用例)
+ all_test_cases = []
+ failed_test_cases = [] # 专门收集失败的测试用例
+
+ # 1. 收集endpoint测试用例
+ for endpoint_result in endpoint_results:
+ test_cases = endpoint_result.get('executed_test_cases', [])
+ for tc in test_cases:
+ tc_severity = tc.get('test_case_severity', 'MEDIUM')
+ tc_severity_value = severity_levels.get(tc_severity, 3)
+ tc_status = tc.get('status', 'N/A')
+ tc_message = tc.get('message', '')
+
+ test_case_info = {
+ 'type': 'Endpoint',
+ 'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
+ 'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'),
+ 'case_name': tc.get('test_case_name', 'N/A'),
+ 'case_id': tc.get('test_case_id', 'N/A'),
+ 'status': tc_status,
+ 'message': tc_message,
+ 'severity': tc_severity,
+ 'severity_value': tc_severity_value,
+ 'is_required': tc_severity_value >= strictness_value,
+ 'duration': tc.get('duration_seconds', 0),
+ 'timestamp': tc.get('timestamp', '')
+ }
+
+ all_test_cases.append(test_case_info)
+
+ # 收集失败的测试用例
+ if tc_status in ['失败', 'FAILED', '错误', 'ERROR']:
+ failed_test_cases.append(test_case_info)
+
+ # 2. 收集stage测试用例
+ stage_results = summary_data.get('stage_results', [])
+ for stage_result in stage_results:
+ stage_name = stage_result.get('stage_name', 'N/A')
+ stage_status = stage_result.get('overall_status', 'N/A')
+ stage_message = stage_result.get('message', stage_result.get('error_message', ''))
+ stage_severity = 'HIGH' # Stage用例通常是高优先级
+ stage_severity_value = severity_levels.get(stage_severity, 4)
+
+ # 将stage作为一个测试用例添加
+ stage_case_info = {
+ 'type': 'Stage',
+ 'endpoint': f"Stage: {stage_name}",
+ 'endpoint_id': f"STAGE_{stage_name}",
+ 'case_name': stage_result.get('description', stage_name),
+ 'case_id': f"STAGE_{stage_name}",
+ 'status': stage_status,
+ 'message': stage_message,
+ 'severity': stage_severity,
+ 'severity_value': stage_severity_value,
+ 'is_required': stage_severity_value >= strictness_value,
+ 'duration': stage_result.get('duration_seconds', 0),
+ 'timestamp': stage_result.get('start_time', '')
+ }
+
+ all_test_cases.append(stage_case_info)
+
+ # 收集失败的stage用例
+ if stage_status in ['失败', 'FAILED', '错误', 'ERROR']:
+ failed_test_cases.append(stage_case_info)
+
+ # 分离必须和非必须的测试用例
+ required_cases = [case for case in all_test_cases if case['is_required']]
+ optional_cases = [case for case in all_test_cases if not case['is_required']]
+
+ # 创建分离的测试用例表格
+ if all_test_cases:
+ # 添加严格等级说明
+ strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
+ elements.append(to_para(strictness_text, small_style))
+ elements.append(Spacer(1, 10))
+
+ # 1. 必须的测试用例表格
+ if required_cases:
+ elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
+
+ required_table_data = [
+ [to_para("序号", escape=False), to_para("类型", escape=False),
+ to_para("测试用例名称", escape=False), to_para("所属端点/阶段", escape=False),
+ to_para("优先级", escape=False), to_para("执行结果", escape=False)]
+ ]
+
+ for i, case in enumerate(required_cases, 1):
+ status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
+ required_table_data.append([
+ to_para(str(i), small_style),
+ to_para(case['type'], small_style),
+ to_para(case['case_name'], small_style),
+ to_para(case['endpoint'], small_style),
+ to_para(case['severity'], small_style),
+ to_para(status_display, small_style)
+ ])
+
+ required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45])
+ required_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightblue), # 使用浅蓝色突出必须用例
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(required_table)
+ elements.append(Spacer(1, 15))
+
+ # 2. 非必须的测试用例表格
+ if optional_cases:
+ elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
+
+ optional_table_data = [
+ [to_para("序号", escape=False), to_para("类型", escape=False),
+ to_para("测试用例名称", escape=False), to_para("所属端点/阶段", escape=False),
+ to_para("优先级", escape=False), to_para("执行结果", escape=False)]
+ ]
+
+ for i, case in enumerate(optional_cases, 1):
+ status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
+ optional_table_data.append([
+ to_para(str(i), small_style),
+ to_para(case['type'], small_style),
+ to_para(case['case_name'], small_style),
+ to_para(case['endpoint'], small_style),
+ to_para(case['severity'], small_style),
+ to_para(status_display, small_style)
+ ])
+
+ optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45])
+ optional_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightgrey), # 使用浅灰色表示非必须用例
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(optional_table)
+ elements.append(Spacer(1, 10))
+
+ # 添加用例统计信息
+ total_cases = len(all_test_cases)
+ endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint'])
+ stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage'])
+ required_count = len(required_cases)
+ optional_count = len(optional_cases)
+
+ stats_text = f"""测试用例统计:
+总计 {total_cases} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。
+必须用例 {required_count} 个,非必须用例 {optional_count} 个。
+严格等级:{strictness_level}({severity_levels.get(strictness_level, 5)}级及以上为必须)。"""
+
+ elements.append(to_para(stats_text, small_style))
+ else:
+ elements.append(to_para("无测试用例执行记录。", normal_style))
+
+ elements.append(Spacer(1, 20))
+
+ # 失败用例详情部分
+ if failed_test_cases:
+ elements.append(to_para("失败用例详情分析", heading_style, escape=False))
+ elements.append(Spacer(1, 10))
+
+ # 按严重性分组失败用例
+ critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL']
+ high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH']
+ medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM']
+ low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW']
+
+ failure_summary = f"""失败用例统计:
+总计 {len(failed_test_cases)} 个失败用例,其中:
+• 严重级别:{len(critical_failures)} 个
+• 高级别:{len(high_failures)} 个
+• 中级别:{len(medium_failures)} 个
+• 低级别:{len(low_failures)} 个
+
+以下是详细的失败原因分析:"""
+
+ elements.append(to_para(failure_summary, normal_style))
+ elements.append(Spacer(1, 15))
+
+ # 详细失败用例列表
+ for i, failed_case in enumerate(failed_test_cases, 1):
+ # 用例标题
+ case_title = f"{i}. {failed_case['case_name']}"
+ elements.append(to_para(case_title, ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5)))
+
+ # 用例基本信息
+ case_info = f"""• 用例ID:{failed_case['case_id']}
+• 所属端点:{failed_case['endpoint']}
+• 严重级别:{failed_case['severity']}
+• 执行状态:{failed_case['status']}"""
+
+ elements.append(to_para(case_info, ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5)))
+
+ # 失败原因
+ failure_reason = failed_case.get('message', '无详细错误信息')
+ if failure_reason:
+ elements.append(to_para("失败原因:", ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15)))
+
+ # 处理长文本,确保在PDF中正确显示
+ if len(failure_reason) > 200:
+ # 对于很长的错误信息,进行适当的分段
+ failure_reason = failure_reason[:200] + "..."
+
+ elements.append(to_para(failure_reason, ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red)))
+
+ # 添加分隔线
+ if i < len(failed_test_cases):
+ elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey))
+ elements.append(Spacer(1, 10))
+
+ elements.append(Spacer(1, 20))
+
+ elements.append(Spacer(1, 20))
+
+ # 测试情况说明
+ elements.append(to_para("测试情况说明", heading_style, escape=False))
+
+ test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。
+测试:累计发现缺陷{overall.get('test_cases_failed', 0)}个。
+测试执行时间:{start_time_formatted} 至 {end_time_formatted}
+测试环境:开发测试环境
+测试方法:自动化API合规性测试"""
+
+ elements.append(to_para(test_situation_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 测试结论
+ elements.append(to_para("测试结论", heading_style, escape=False))
+
+ # 根据测试结果生成结论
+ success_rate = overall.get('test_case_success_rate', '0%')
+ success_rate_num = float(success_rate.replace('%', '')) if success_rate != 'N/A' else 0
+
+ if success_rate_num >= 90:
+ conclusion_status = "通过"
+ conclusion_text = f"""本套领域数据服务已通过环境验证,系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析,该项目通过验收测试。
+测试用例成功率达到{success_rate},符合验收标准。"""
+ elif success_rate_num >= 70:
+ conclusion_status = "基本通过"
+ conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。"""
+ else:
+ conclusion_status = "不通过"
+ conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。"""
+
+ elements.append(to_para(conclusion_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 检测依据
+ elements.append(to_para("检测依据", heading_style, escape=False))
+
+ detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分:关于DMS领域数据服务的接口要求和测试细则。
+参考标准:
+1. DMS数据管理系统API规范V1.0
+2. RESTful API设计规范
+3. 数据安全和隐私保护要求
+4. 系统集成测试标准"""
+
+ elements.append(to_para(detection_basis_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 报告生成信息
+ elements.append(to_para("报告生成信息", heading_style, escape=False))
+ generation_info_data = [
+ [to_para("生成时间", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M:%S'))],
+ [to_para("生成工具", escape=False), to_para("DMS合规性测试工具")],
+ [to_para("工具版本", escape=False), to_para("V1.0.0")],
+ [to_para("测试结论", escape=False), to_para(f"{conclusion_status}", escape=False)],
+ ]
+ generation_info_table = Table(generation_info_data, colWidths=[120, '*'])
+ generation_info_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
+ ]))
+ elements.append(generation_info_table)
+
+ # 构建PDF
+ doc.build(elements)
+ logger.info(f"PDF报告已成功生成: {output_path}")
+ except Exception as e:
+ logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
+
+@app.post("/run",
+ summary="执行API合规性测试",
+ description="""
+ 执行API合规性测试的主要端点。
+
+ 支持三种API定义源:
+ - YAPI: 基于YAPI定义文件
+ - Swagger/OpenAPI: 基于OpenAPI规范文件
+ - DMS: 动态发现DMS服务的API
+
+ 分页支持
+ 对于DMS测试,支持分页获取API列表,避免内存溢出:
+ - `page_size`: 每页获取的API数量(默认1000)
+ - 返回详细的分页统计信息
+
+ LLM集成
+ 可选择使用大语言模型生成测试数据:
+ - 智能生成请求体、路径参数、查询参数等
+ - 提高测试覆盖率和数据多样性
+ """,
+ response_model=TestResponse,
+ responses={
+ 200: {"description": "测试执行成功"},
+ 400: {"description": "请求参数错误", "model": ErrorResponse},
+ 500: {"description": "服务器内部错误", "model": ErrorResponse}
+ })
+async def run_api_tests(config: TestConfig):
+ """
+ 执行API合规性测试
+
+ - config: 测试配置,包含API定义源、测试参数等
+ - returns: 测试结果,包含摘要信息和分页信息(如适用)
+ """
+ try:
+ logger.info(f"Starting test run with configuration: {config.model_dump()}")
+
+ # Convert Pydantic model to dict for compatibility
+ config_dict = config.model_dump(exclude_none=True)
+
+ # Add hidden parameters with default values
+ hidden_defaults = {
+ "categories": [],
+ "tags": [],
+ "ignore_ssl": True,
+ "output": "./test_reports",
+ "generate_pdf": True,
+ "custom_test_cases_dir": "./custom_testcases",
+ "stages_dir": "./custom_stages",
+ "llm_api_key": "sk-lbGrsUPL1iby86h554FaE536C343435dAa9bA65967A840B2",
+ "llm_base_url": "https://aiproxy.petrotech.cnpc/v1",
+ "llm_model_name": "deepseek-v3",
+ "use_llm_for_request_body": False,
+ "use_llm_for_path_params": False,
+ "use_llm_for_query_params": False,
+ "use_llm_for_headers": False,
+ "verbose": False
+ }
+
+ # Merge hidden defaults with config
+ config_dict.update(hidden_defaults)
+
+ result = run_tests_logic(config_dict)
+
+ if result['status'] == 'error':
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=result
+ )
+
+ return result
+
+ except ValueError as e:
+ logger.error(f"Validation error: {e}")
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail={
+ "status": "error",
+ "message": str(e)
+ }
+ )
+ except Exception as e:
+ logger.error(f"An error occurred in the API endpoint: {e}", exc_info=True)
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail={
+ "status": "error",
+ "message": str(e),
+ "traceback": traceback.format_exc()
+ }
+ )
+
+@app.get("/reports/{report_id}",
+ summary="下载测试报告",
+ description="根据报告ID下载对应的测试报告文件")
+async def download_report(report_id: str, file_type: str = "summary.json"):
+ """
+ 下载测试报告文件
+
+ - report_id: 报告ID(通常是时间戳)
+ - file_type: 文件类型,可选值:summary.json, api_call_details.md
+ """
+ try:
+ report_dir = Path("./test_reports") / report_id
+ file_path = report_dir / file_type
+
+ if not file_path.exists():
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"Report file not found: {file_type}"
+ )
+
+ return FileResponse(
+ path=str(file_path),
+ filename=file_type,
+ media_type='application/octet-stream'
+ )
+
+ except Exception as e:
+ logger.error(f"Error downloading report: {e}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error downloading report: {str(e)}"
+ )
+
+@app.get("/reports",
+ summary="列出所有测试报告",
+ description="获取所有可用的测试报告列表")
+async def list_reports():
+ """列出所有可用的测试报告"""
+ try:
+ reports_dir = Path("./test_reports")
+ if not reports_dir.exists():
+ return {"reports": []}
+
+ reports = []
+ for report_dir in reports_dir.iterdir():
+ if report_dir.is_dir():
+ summary_file = report_dir / "summary.json"
+ if summary_file.exists():
+ try:
+ with open(summary_file, 'r', encoding='utf-8') as f:
+ summary = json.load(f)
+
+ reports.append({
+ "id": report_dir.name,
+ "timestamp": report_dir.name,
+ "path": str(report_dir),
+ "summary": {
+ "endpoints_total": summary.get("endpoints_total", 0),
+ "endpoints_passed": summary.get("endpoints_passed", 0),
+ "endpoints_failed": summary.get("endpoints_failed", 0),
+ "test_cases_total": summary.get("test_cases_total", 0)
+ }
+ })
+ except Exception as e:
+ logger.warning(f"Error reading summary for {report_dir.name}: {e}")
+
+ # Sort by timestamp (newest first)
+ reports.sort(key=lambda x: x["timestamp"], reverse=True)
+
+ return {"reports": reports}
+
+ except Exception as e:
+ logger.error(f"Error listing reports: {e}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error listing reports: {str(e)}"
+ )
+
+if __name__ == "__main__":
+ import argparse
+
+ parser = argparse.ArgumentParser(description="DMS合规性测试工具 FastAPI服务器")
+ parser.add_argument("--host", default="0.0.0.0", help="服务器主机地址")
+ parser.add_argument("--port", type=int, default=5050, help="服务器端口")
+ parser.add_argument("--reload", action="store_true", help="启用自动重载(开发模式)")
+ parser.add_argument("--workers", type=int, default=1, help="工作进程数")
+
+ args = parser.parse_args()
+
+ logger.info(f"Starting FastAPI server on {args.host}:{args.port}")
+ logger.info(f"API文档地址: http://{args.host}:{args.port}/docs")
+ logger.info(f"ReDoc文档地址: http://{args.host}:{args.port}/redoc")
+
+ uvicorn.run(
+ "fastapi_server:app",
+ host=args.host,
+ port=args.port,
+ reload=args.reload,
+ workers=args.workers if not args.reload else 1,
+ log_level="info"
+ )
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/history_viewer.py b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/history_viewer.py
new file mode 100644
index 0000000..c0e1de9
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/history_viewer.py
@@ -0,0 +1,315 @@
+import os
+import sys
+import json
+import logging
+import sqlite3
+from pathlib import Path
+from datetime import timedelta
+from werkzeug.security import generate_password_hash, check_password_hash
+from flask import Flask, request, jsonify, send_from_directory, session, redirect, url_for, render_template, g, flash, get_flashed_messages, abort
+from flask_cors import CORS
+from functools import wraps
+import markdown
+
+# --- PyInstaller Path Helpers ---
+# For data files that should persist outside the bundle (e.g., database, reports)
+if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
+ # Running in a PyInstaller bundle
+ APP_ROOT = os.path.dirname(sys.executable)
+else:
+ # Running in a normal Python environment
+ APP_ROOT = os.path.dirname(os.path.abspath(__file__))
+
+template_dir = os.path.join(APP_ROOT, 'templates')
+static_dir = os.path.join(APP_ROOT, 'static')
+app = Flask(__name__, static_folder=static_dir, template_folder=template_dir)
+CORS(app)
+
+# --- 基本配置 ---
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+DATABASE = os.path.join(APP_ROOT, 'users.db')
+REPORTS_DIR = os.path.join(APP_ROOT, 'test_reports')
+
+app.config['SECRET_KEY'] = os.urandom(24)
+app.config['DATABASE'] = DATABASE
+app.config['REPORTS_DIR'] = REPORTS_DIR
+app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)
+
+os.makedirs(app.config['REPORTS_DIR'], exist_ok=True)
+
+# --- 数据库 Schema 和辅助函数 (与 flask_app.py 相同) ---
+DB_SCHEMA = '''
+DROP TABLE IF EXISTS user;
+CREATE TABLE user (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ username TEXT UNIQUE NOT NULL,
+ password_hash TEXT NOT NULL
+);
+'''
+
+def get_db():
+ db = getattr(g, '_database', None)
+ if db is None:
+ db = g._database = sqlite3.connect(app.config['DATABASE'])
+ db.row_factory = sqlite3.Row
+ return db
+
+@app.teardown_appcontext
+def close_connection(exception):
+ db = getattr(g, '_database', None)
+ if db is not None:
+ db.close()
+
+def init_db(force_create=False):
+ if force_create or not os.path.exists(app.config['DATABASE']):
+ with app.app_context():
+ db = get_db()
+ db.cursor().executescript(DB_SCHEMA)
+ db.commit()
+ logger.info("数据库已初始化!")
+ create_default_user()
+ else:
+ logger.info("数据库已存在。")
+
+def create_default_user(username="admin", password="7#Xq9$Lm*2!Pw@5"):
+ with app.app_context():
+ db = get_db()
+ user = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
+ if user is None:
+ db.execute("INSERT INTO user (username, password_hash) VALUES (?, ?)", (username, generate_password_hash(password)))
+ db.commit()
+ logger.info(f"已创建默认用户: {username}")
+ else:
+ logger.info(f"默认用户 {username} 已存在。")
+
+@app.cli.command('init-db')
+def init_db_command():
+ init_db(force_create=True)
+ print("已初始化数据库。")
+
+# --- 用户认证 (与 flask_app.py 相同) ---
+@app.route('/login', methods=('GET', 'POST'))
+def login():
+ if g.user:
+ return redirect(url_for('list_history'))
+ if request.method == 'POST':
+ username = request.form['username']
+ password = request.form['password']
+ db = get_db()
+ error = None
+ user = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
+ if user is None:
+ error = '用户名不存在。'
+ elif not check_password_hash(user['password_hash'], password):
+ error = '密码错误。'
+ if error is None:
+ session.clear()
+ session['user_id'] = user['id']
+ session['username'] = user['username']
+ session.permanent = True
+ return redirect(url_for('list_history'))
+ flash(error)
+ return render_template('login.html')
+
+@app.route('/logout')
+def logout():
+ session.clear()
+ flash('您已成功登出。')
+ return redirect(url_for('login'))
+
+def login_required(view):
+ @wraps(view)
+ def wrapped_view(**kwargs):
+ # if g.user is None:
+ # return redirect(url_for('login'))
+ return view(**kwargs)
+ return wrapped_view
+
+@app.before_request
+def load_logged_in_user():
+ user_id = session.get('user_id')
+ if user_id is None:
+ g.user = None
+ else:
+ g.user = get_db().execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
+
+# --- LLM配置视图 ---
+CRITERIA_FILE_PATH = os.path.join(APP_ROOT, 'custom_testcases', 'llm', 'compliance_criteria.json')
+
+@app.route('/llm-config', methods=['GET', 'POST'])
+@login_required
+def llm_config():
+ criteria_for_template = []
+ file_exists = os.path.exists(CRITERIA_FILE_PATH)
+
+ if request.method == 'POST':
+ # 从表单获取所有名为'criteria'的输入项,作为一个列表
+ criteria_list = request.form.getlist('criteria')
+ # 过滤掉用户可能提交的空规则
+ criteria_list = [item.strip() for item in criteria_list if item.strip()]
+
+ try:
+ # 将规则列表格式化为美观的JSON并保存
+ pretty_content = json.dumps(criteria_list, indent=2, ensure_ascii=False)
+ with open(CRITERIA_FILE_PATH, 'w', encoding='utf-8') as f:
+ f.write(pretty_content)
+ flash('LLM合规性标准已成功保存!', 'success')
+ except Exception as e:
+ flash(f'保存文件时发生未知错误: {e}', 'error')
+
+ # 无论是GET还是POST请求后,都重新从文件中读取最新的规则列表用于显示
+ if file_exists:
+ try:
+ with open(CRITERIA_FILE_PATH, 'r', encoding='utf-8') as f:
+ criteria_for_template = json.load(f)
+ # 确保文件内容确实是一个列表
+ if not isinstance(criteria_for_template, list):
+ flash('配置文件格式错误:内容应为JSON数组。已重置为空列表。', 'error')
+ criteria_for_template = []
+ except Exception as e:
+ flash(f'读取配置文件时出错: {e}', 'error')
+ criteria_for_template = []
+
+ # 准备一个用于页面展示的示例API信息
+ example_api_info = {
+ "path_template": "/api/dms/instance/v1/message/push/myschema/1.0",
+ "method": "POST",
+ "title": "数据推送接口",
+ "description": "用于向系统推送标准格式的数据。",
+ "schema_request_body": {"...": "... (此处为请求体Schema定义)"},
+ "instance_url": "http://example.com/api/dms/instance/v1/message/push/myschema/1.0",
+ "instance_request_headers": {"X-Tenant-ID": "tenant-001", "...": "..."},
+ "instance_request_body": {"id": "123", "data": "example"},
+ "instance_response_status": 200,
+ "instance_response_body": {"code": 0, "message": "success", "data": True}
+ }
+
+ return render_template('llm_config.html', criteria=criteria_for_template, file_exists=file_exists, example_api_info=json.dumps(example_api_info, indent=2, ensure_ascii=False))
+
+# --- 文件下载路由 ---
+@app.route('/download//')
+@login_required
+def download_report(run_id, filename):
+ """安全地提供指定运行记录中的报告文件下载。"""
+ # 清理输入,防止目录遍历攻击
+ run_id_safe = Path(run_id).name
+ filename_safe = Path(filename).name
+
+ reports_dir = Path(app.config['REPORTS_DIR']).resolve()
+ run_dir = (reports_dir / run_id_safe).resolve()
+
+ # 安全检查:确保请求的目录是REPORTS_DIR的子目录
+ if not run_dir.is_dir() or run_dir.parent != reports_dir:
+ abort(404, "找不到指定的测试记录或权限不足。")
+
+ return send_from_directory(run_dir, filename_safe, as_attachment=True)
+
+# --- 新增:PDF文件预览路由 ---
+@app.route('/view_pdf/')
+@login_required
+def view_pdf_report(run_id):
+ """安全地提供PDF报告文件以内联方式查看。"""
+ run_id_safe = Path(run_id).name
+ filename_safe = "report_cn.pdf"
+
+ reports_dir = Path(app.config['REPORTS_DIR']).resolve()
+ run_dir = (reports_dir / run_id_safe).resolve()
+
+ # 安全检查
+ if not run_dir.is_dir() or run_dir.parent != reports_dir:
+ abort(404, "找不到指定的测试记录或权限不足。")
+
+ pdf_path = run_dir / filename_safe
+ if not pdf_path.exists():
+ abort(404, "未找到PDF报告文件。")
+
+ return send_from_directory(run_dir, filename_safe)
+
+# --- 历史记录视图 ---
+@app.route('/')
+@login_required
+def list_history():
+ history = []
+ reports_path = Path(app.config['REPORTS_DIR'])
+ if not reports_path.is_dir():
+ flash('报告目录不存在。')
+ return render_template('history.html', history=[])
+
+ # 获取所有子目录(即测试运行记录)
+ run_dirs = [d for d in reports_path.iterdir() if d.is_dir()]
+ # 按名称(时间戳)降序排序
+ run_dirs.sort(key=lambda x: x.name, reverse=True)
+
+ for run_dir in run_dirs:
+ summary_path = run_dir / 'summary.json'
+ details_path = run_dir / 'api_call_details.md'
+ run_info = {'id': run_dir.name, 'summary': None, 'has_details': details_path.exists()}
+
+ if summary_path.exists():
+ try:
+ with open(summary_path, 'r', encoding='utf-8') as f:
+ summary_data = json.load(f)
+ run_info['summary'] = summary_data
+ except (json.JSONDecodeError, IOError) as e:
+ logger.error(f"无法读取或解析摘要文件 {summary_path}: {e}")
+ run_info['summary'] = {'error': '无法加载摘要'}
+
+ history.append(run_info)
+
+ return render_template('history.html', history=history)
+
+@app.route('/details/')
+@login_required
+def show_details(run_id):
+ run_id = Path(run_id).name # Sanitize input
+ run_dir = Path(app.config['REPORTS_DIR']) / run_id
+
+ if not run_dir.is_dir():
+ return "找不到指定的测试记录。", 404
+
+ summary_path = run_dir / 'summary.json'
+ details_path = run_dir / 'api_call_details.md'
+ pdf_path = run_dir / 'report_cn.pdf' # 新增PDF路径
+
+ summary_content = "{}"
+ details_content = "### 未找到API调用详情报告"
+
+ has_pdf_report = pdf_path.exists() # 检查PDF是否存在
+ has_md_report = details_path.exists() # 检查MD报告是否存在
+
+ if summary_path.exists():
+ try:
+ with open(summary_path, 'r', encoding='utf-8') as f:
+ summary_data = json.load(f)
+ summary_content = json.dumps(summary_data, indent=2, ensure_ascii=False)
+ except Exception as e:
+ summary_content = f"加载摘要文件出错: {e}"
+
+ if has_md_report:
+ try:
+ with open(details_path, 'r', encoding='utf-8') as f:
+ # 将Markdown转换为HTML
+ details_content = markdown.markdown(f.read(), extensions=['fenced_code', 'tables', 'def_list', 'attr_list'])
+ except Exception as e:
+ details_content = f"加载详情文件出错: {e}"
+
+ return render_template('history_detail.html',
+ run_id=run_id,
+ summary_content=summary_content,
+ details_content=details_content,
+ has_pdf_report=has_pdf_report,
+ has_md_report=has_md_report)
+
+
+# --- 根路径重定向 ---
+@app.route('/index')
+def index_redirect():
+ return redirect(url_for('list_history'))
+
+if __name__ == '__main__':
+ # 首次运行时确保数据库和用户存在
+ init_db()
+ # 使用5051端口避免与api_server.py冲突
+ app.run(debug=True, host='0.0.0.0', port=5051)
\ No newline at end of file
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/requirements.txt b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/requirements.txt
new file mode 100644
index 0000000..90e83ac
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/requirements.txt
@@ -0,0 +1,22 @@
+pydantic>=2.0.0,<3.0.0
+PyYAML>=6.0,<7.0
+jsonschema>=4.0.0,<5.0.0
+requests>=2.20.0,<3.0.0
+flask>=2.0.0,<3.0.0 # 用于模拟服务器
+numpy>=1.20.0,<2.0.0 # 用于数值计算
+
+# 用于 OpenAPI/Swagger 解析 (可选, 如果输入解析器需要)
+openapi-spec-validator>=0.5.0,<0.6.0
+prance[osv]>=23.0.0,<24.0.0
+
+# 用于 API Linting (可选, 如果规则库需要集成 Spectral-like 功能)
+# pyaml-env>=1.0.0,<2.0.0 # 如果 linting 规则是 yaml 且用到了环境变量
+
+# 测试框架 (可选, 推荐)
+# pytest>=7.0.0,<8.0.0
+# pytest-cov>=4.0.0,<5.0.0
+# httpx>=0.20.0,<0.28.0 # for testing API calls
+
+Flask-Cors>=3.0
+markdown
+reportlab>=3.6.0 # For PDF report generation
\ No newline at end of file
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/set-permissions.sh b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/set-permissions.sh
new file mode 100644
index 0000000..934aaff
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/set-permissions.sh
@@ -0,0 +1,3 @@
+#/bin/bash
+chmod +x *.sh
+echo "Permissions set for shell scripts"
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/start.bat b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/start.bat
new file mode 100644
index 0000000..8662b23
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/start.bat
@@ -0,0 +1,10 @@
+@echo off
+echo Starting DMS Compliance Tool...
+echo Loading pre-built Docker image...
+docker load -i docker-image.tar
+echo Starting services...
+docker compose up -d
+echo Services started
+echo FastAPI Server: http://localhost:5050
+echo History Viewer: http://localhost:5051
+pause
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/start.sh b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/start.sh
new file mode 100644
index 0000000..08957d5
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/start.sh
@@ -0,0 +1,10 @@
+#/bin/bash
+
+echo "Starting DMS Compliance Tool..."
+echo "Loading pre-built Docker image..."
+docker load -i docker-image.tar
+echo "Starting services..."
+docker compose up -d
+echo "Services started"
+echo "FastAPI Server: http://localhost:5050"
+echo "History Viewer: http://localhost:5051"
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/stop.bat b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/stop.bat
new file mode 100644
index 0000000..ae513aa
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/stop.bat
@@ -0,0 +1,4 @@
+@echo off
+docker compose down
+echo Services stopped.
+pause
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/stop.sh b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/stop.sh
new file mode 100644
index 0000000..9b33606
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/stop.sh
@@ -0,0 +1,5 @@
+#/bin/bash
+
+echo "Stopping DMS Compliance Tool..."
+docker compose down
+echo "Services stopped."
diff --git a/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/supervisord.conf b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/supervisord.conf
new file mode 100644
index 0000000..3fc5c54
--- /dev/null
+++ b/dms-compliance-dual-amd64-windows-0周三/0/2-165956/2-165956/supervisord.conf
@@ -0,0 +1,20 @@
+[supervisord]
+nodaemon=true
+logfile=/var/log/supervisor/supervisord.log
+user=root
+
+[program:api_server]
+command=python -m uvicorn fastapi_server:app --host 0.0.0.0 --port 5050
+directory=/app
+autostart=true
+autorestart=true
+redirect_stderr=true
+stdout_logfile=/var/log/supervisor/api_server.log
+
+[program:history_viewer]
+command=python history_viewer.py
+directory=/app
+autostart=true
+autorestart=true
+redirect_stderr=true
+stdout_logfile=/var/log/supervisor/history_viewer.log
diff --git a/fastapi_server.py b/fastapi_server.py
index 82af2df..de3ea32 100644
--- a/fastapi_server.py
+++ b/fastapi_server.py
@@ -62,11 +62,11 @@ app = FastAPI(
主要特性
- - 🚀 高性能: 基于FastAPI,支持异步处理
- - 📊 分页支持: 解决大量API节点的内存问题
- - 📝 自动文档: 自动生成交互式API文档
- - 🔧 灵活配置: 支持多种测试配置选项
- - 📈 详细报告: 生成PDF和JSON格式的测试报告
+ 🚀 高性能: 基于FastAPI,支持异步处理
+ 📊 分页支持: 解决大量API节点的内存问题
+ 📝 自动文档: 自动生成交互式API文档
+ 🔧 灵活配置: 支持多种测试配置选项
+ 📈 详细报告: 生成PDF和JSON格式的测试报告
""",
version="1.0.0",
docs_url="/docs", # Swagger UI
@@ -87,46 +87,20 @@ class TestConfig(BaseModel):
"""测试配置模型"""
# API定义源 (三选一)
- yapi: Optional[str] = Field(None, description="YAPI定义文件路径", example="./api_spec.json")
- swagger: Optional[str] = Field(None, description="Swagger/OpenAPI定义文件路径", example="./openapi.yaml")
- dms: Optional[str] = Field(None, description="DMS服务发现的domain mapping文件路径", example="./assets/doc/dms/domain.json")
-
+ yapi: Optional[str] = Field(None, description="YAPI定义文件路径", exclude=True)
+ swagger: Optional[str] = Field(None, description="Swagger/OpenAPI定义文件路径", exclude=True)
+ dms: Optional[str] = Field("./assets/doc/dms/domain.json", description="DMS服务发现的domain mapping文件路径", example="./assets/doc/dms/domain.json")
# 基本配置
- base_url: str = Field(..., description="API基础URL", example="https://api.example.com")
+ base_url: str = Field("https://www.dev.ideas.cnpc/", description="API基础URL", example="https://www.dev.ideas.cnpc/")
# 分页配置
- page_size: int = Field(1000, description="DMS API分页大小,默认1000。较小的值可以减少内存使用", ge=1, le=10000)
+ page_size: int = Field(10, description="DMS API分页大小,默认10。较小的值可以减少内存使用", ge=1, le=10000)
page_no: int = Field(1, description="起始页码,从1开始。可用于断点续传或跳过前面的页面", ge=1)
- fetch_all_pages: bool = Field(True, description="是否获取所有页面。True=获取所有数据,False=只获取指定页面")
+ fetch_all_pages: bool = Field(False, description="是否获取所有页面。True=获取所有数据,False=只获取指定页面")
# 过滤选项
- categories: Optional[List[str]] = Field(None, description="YAPI分类列表", example=["用户管理", "订单系统"])
- tags: Optional[List[str]] = Field(None, description="Swagger标签列表", example=["user", "order"])
strictness_level: str = Field("CRITICAL", description="测试严格等级", pattern="^(CRITICAL|HIGH|MEDIUM|LOW)$")
- # SSL和安全
- ignore_ssl: bool = Field(False, description="忽略SSL证书验证(不推荐在生产环境使用)")
-
- # 输出配置
- output: str = Field("./test_reports", description="测试报告输出目录")
- generate_pdf: bool = Field(True, description="是否生成PDF报告")
-
- # 自定义测试
- custom_test_cases_dir: Optional[str] = Field(None, description="自定义测试用例目录路径")
- stages_dir: Optional[str] = Field(None, description="自定义测试阶段目录路径")
-
- # LLM配置
- llm_api_key: Optional[str] = Field(None, description="LLM API密钥")
- llm_base_url: Optional[str] = Field(None, description="LLM API基础URL")
- llm_model_name: Optional[str] = Field("gpt-3.5-turbo", description="LLM模型名称")
- use_llm_for_request_body: bool = Field(False, description="使用LLM生成请求体")
- use_llm_for_path_params: bool = Field(False, description="使用LLM生成路径参数")
- use_llm_for_query_params: bool = Field(False, description="使用LLM生成查询参数")
- use_llm_for_headers: bool = Field(False, description="使用LLM生成请求头")
-
- # 调试选项
- verbose: bool = Field(False, description="启用详细日志输出")
-
@field_validator('base_url')
@classmethod
def validate_base_url(cls, v):
@@ -239,7 +213,7 @@ def run_tests_logic(config: dict):
output_directory = base_output_dir / timestamp
output_directory.mkdir(parents=True, exist_ok=True)
logger.info(f"Test reports will be saved to: {output_directory.resolve()}")
-
+ print(f"config{config}")
# Initialize the orchestrator
orchestrator = APITestOrchestrator(
base_url=config['base_url'],
@@ -291,9 +265,7 @@ def run_tests_logic(config: dict):
if test_summary and config.get('stages_dir') and parsed_spec:
logger.info(f"Executing API test stages from directory: {config['stages_dir']}")
- stage_summary = orchestrator.run_stages_from_spec(parsed_spec, config['stages_dir'])
- if stage_summary:
- test_summary.merge_stage_summary(stage_summary)
+ orchestrator.run_stages_from_spec(parsed_spec, test_summary)
if test_summary:
# Save main summary
@@ -303,12 +275,17 @@ def run_tests_logic(config: dict):
# Save API call details
api_calls_filename = "api_call_details.md"
- save_api_call_details_to_markdown(
+ save_api_call_details_to_file(
orchestrator.get_api_call_details(),
str(output_directory),
filename=api_calls_filename
)
+ # Generate PDF report if reportlab is available
+ if reportlab_available and config.get('generate_pdf', True):
+ pdf_report_path = output_directory / "report_cn.pdf"
+ save_pdf_report(test_summary.to_dict(), pdf_report_path, config.get('strictness_level', 'CRITICAL'))
+
failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
@@ -335,47 +312,608 @@ def run_tests_logic(config: dict):
"traceback": traceback.format_exc()
}
-def save_api_call_details_to_markdown(api_call_details: List[APICallDetail], output_dir: str, filename: str = "api_call_details.md"):
- """Save API call details to markdown file"""
+def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
+ """
+ 将API调用详情列表保存到指定目录下的 Markdown 文件中。
+ 同时,额外生成一个纯文本文件 (.txt),每行包含一个 cURL 命令。
+ """
+ if not api_call_details:
+ logger.info("没有API调用详情可供保存。")
+ return
+
+ output_dir = Path(output_dir_path)
try:
- output_path = Path(output_dir) / filename
+ output_dir.mkdir(parents=True, exist_ok=True)
+ except OSError as e:
+ logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}")
+ return
- with open(output_path, 'w', encoding='utf-8') as f:
- f.write("# API调用详情\n\n")
+ # 主文件是 Markdown 文件
+ md_output_file = output_dir / filename
+ # 确保它是 .md,尽管 main 函数应该已经处理了
+ if md_output_file.suffix.lower() not in ['.md', '.markdown']:
+ md_output_file = md_output_file.with_suffix('.md')
- for i, detail in enumerate(api_call_details, 1):
- f.write(f"## {i}. {detail.endpoint_name}\n\n")
- f.write(f"**请求URL**: `{detail.request_url}`\n\n")
- f.write(f"**请求方法**: `{detail.request_method}`\n\n")
+ markdown_content = []
- if detail.request_headers:
- f.write("**请求头**:\n```json\n")
- f.write(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
- f.write("\n```\n\n")
+ for detail in api_call_details:
+
+ # Request URL with params (if any)
+ url_to_display = detail.request_url
+ if detail.request_params:
+ try:
+ # Ensure urllib is available for this formatting step
+ import urllib.parse
+ query_string = urllib.parse.urlencode(detail.request_params)
+ url_to_display = f"{detail.request_url}?{query_string}"
+ except Exception as e:
+ logger.warning(f"Error formatting URL with params for display: {e}")
+ # Fallback to just the base URL if params formatting fails
+
+ markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
+ markdown_content.append("**cURL Command:**")
+ markdown_content.append("```sh")
+ markdown_content.append(detail.curl_command)
+ markdown_content.append("```")
+ markdown_content.append("### Request Details")
+ markdown_content.append(f"- **Method:** `{detail.request_method}`")
+ markdown_content.append(f"- **Full URL:** `{url_to_display}`")
+
+ markdown_content.append("- **Headers:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
- if detail.request_body:
- f.write("**请求体**:\n```json\n")
- f.write(json.dumps(detail.request_body, indent=2, ensure_ascii=False))
- f.write("\n```\n\n")
+ if detail.request_params:
+ markdown_content.append("- **Query Parameters:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
- f.write(f"**响应状态码**: `{detail.response_status_code}`\n\n")
+ if detail.request_body is not None:
+ markdown_content.append("- **Body:**")
+ body_lang = "text"
+ formatted_body = str(detail.request_body)
+ try:
+ # Try to parse as JSON for pretty printing
+ if isinstance(detail.request_body, str):
+ try:
+ parsed_json = json.loads(detail.request_body)
+ formatted_body = json.dumps(parsed_json, indent=2, ensure_ascii=False)
+ body_lang = "json"
+ except json.JSONDecodeError:
+ pass # Keep as text
+ elif isinstance(detail.request_body, (dict, list)):
+ formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False)
+ body_lang = "json"
+ except Exception as e:
+ logger.warning(f"Error formatting request body for Markdown: {e}")
+
+ markdown_content.append(f"```{body_lang}")
+ markdown_content.append(formatted_body)
+ markdown_content.append("```")
- if detail.response_headers:
- f.write("**响应头**:\n```json\n")
- f.write(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
- f.write("\n```\n\n")
+ markdown_content.append("### Response Details")
+ markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
+ markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
+
+ markdown_content.append("- **Headers:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
- if detail.response_body:
- f.write("**响应体**:\n```json\n")
- f.write(json.dumps(detail.response_body, indent=2, ensure_ascii=False))
- f.write("\n```\n\n")
+ if detail.response_body is not None:
+ markdown_content.append("- **Body:**")
+ resp_body_lang = "text"
+ formatted_resp_body = str(detail.response_body)
+ try:
+ # Try to parse as JSON for pretty printing
+ if isinstance(detail.response_body, str):
+ try:
+ # If it's already a string that might be JSON, try parsing and re-dumping
+ parsed_json_resp = json.loads(detail.response_body)
+ formatted_resp_body = json.dumps(parsed_json_resp, indent=2, ensure_ascii=False)
+ resp_body_lang = "json"
+ except json.JSONDecodeError:
+ # It's a string, but not valid JSON, keep as text
+ pass
+ elif isinstance(detail.response_body, (dict, list)):
+ # It's already a dict/list, dump it as JSON
+ formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False)
+ resp_body_lang = "json"
+ # If it's neither string nor dict/list (e.g. int, bool from parsed json), str() is fine.
+ except Exception as e:
+ logger.warning(f"Error formatting response body for Markdown: {e}")
- f.write("---\n\n")
-
- logger.info(f"API call details saved to: {output_path}")
+ markdown_content.append(f"```{resp_body_lang}")
+ markdown_content.append(formatted_resp_body)
+ markdown_content.append("```")
+ markdown_content.append("") # Add a blank line for spacing before next --- or EOF
+ markdown_content.append("---") # Separator
+ try:
+ with open(md_output_file, 'w', encoding='utf-8') as f_md:
+ f_md.write("\n".join(markdown_content))
+ logger.info(f"API调用详情已保存为 Markdown: {md_output_file}")
except Exception as e:
- logger.error(f"Error saving API call details: {e}")
+ logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True)
+
+def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
+ """将测试摘要保存为格式化的PDF文件"""
+ logger.info(f"开始生成PDF报告: {output_path}")
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+
+ try:
+ # --- 统一的字体管理和注册 ---
+ font_name = 'SimSun' # 使用一个简单清晰的注册名
+ font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
+
+ if not Path(font_path).exists():
+ logger.error(f"字体文件未找到: {Path(font_path).resolve()}")
+ return
+
+ # 关键修复: 对于 .ttc (TrueType Collection) 文件, 必须指定 subfontIndex
+ pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0))
+ # 将注册的字体关联到 'SimSun' 字体族
+ pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
+
+ doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
+ elements = []
+
+ # --- 统一样式定义, 全部使用注册的字体名 ---
+ styles = getSampleStyleSheet()
+ title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
+ heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
+ normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
+ small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)
+
+ def to_para(text, style=normal_style, escape=True):
+ """
+ 根据用户建议移除 textwrap 以进行诊断。
+ 此版本只包含净化和基本的换行符替换。
+ """
+ if text is None:
+ content = ""
+ else:
+ content = str(text)
+
+ if escape:
+ content = html.escape(content)
+
+ # 依然保留Unicode控制字符的净化
+ content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
+
+ if not content.strip():
+ # 对于完全空白或None的输入,返回一个安全的非换行空格
+ return Paragraph(' ', style)
+
+ # 只使用基本的换行符替换
+ content = content.replace('\n', '
')
+
+ return Paragraph(content, style)
+
+ # 3. 填充PDF内容 - 优化后的报告格式
+
+ # 生成报告编码(基于时间戳)
+ import time
+ report_code = f"DMS-TEST-{int(time.time())}"
+
+ # 报告标题
+ elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
+ elements.append(Spacer(1, 15))
+
+ # 报告基本信息表格
+ basic_info_data = [
+ [to_para("报告编码", escape=False), to_para(report_code)],
+ [to_para("报告名称", escape=False), to_para("DMS领域数据服务测试分析报告")],
+ [to_para("申请日期", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日'))],
+ [to_para("申请人", escape=False), to_para("系统管理员")],
+ [to_para("服务供应商名称", escape=False), to_para("数据管理系统(DMS)")],
+ ]
+ basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
+ basic_info_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
+ ]))
+ elements.append(basic_info_table)
+ elements.append(Spacer(1, 20))
+
+ # 摘要部分
+ elements.append(to_para("摘要", heading_style, escape=False))
+ overall = summary_data.get('overall_summary', {})
+
+ # 从JSON提取并格式化时间
+ try:
+ start_time_str = summary_data.get('start_time', 'N/A')
+ end_time_str = summary_data.get('end_time', 'N/A')
+ duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0))
+
+ start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
+ end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
+ except:
+ start_time_formatted = start_time_str
+ end_time_formatted = end_time_str
+
+ # 摘要内容 - 安全计算跳过的数量
+ def safe_subtract(total, passed, failed):
+ """安全地计算跳过的数量"""
+ try:
+ if isinstance(total, (int, float)) and isinstance(passed, (int, float)) and isinstance(failed, (int, float)):
+ return max(0, total - passed - failed)
+ else:
+ return 0
+ except:
+ return 0
+
+ endpoints_tested = overall.get('endpoints_tested', 0)
+ endpoints_passed = overall.get('endpoints_passed', 0)
+ endpoints_failed = overall.get('endpoints_failed', 0)
+ endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed)
+
+ test_cases_executed = overall.get('total_test_cases_executed', 0)
+ test_cases_passed = overall.get('test_cases_passed', 0)
+ test_cases_failed = overall.get('test_cases_failed', 0)
+ test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed)
+
+ stages_executed = overall.get('total_stages_executed', 0)
+ stages_passed = overall.get('stages_passed', 0)
+ stages_failed = overall.get('stages_failed', 0)
+ stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed)
+
+ summary_text = f"""本次测试针对DMS(数据管理系统)领域数据服务进行全面的合规性验证。
+测试时间:{start_time_formatted} 至 {end_time_formatted},总耗时 {float(duration):.2f} 秒。
+共测试 {endpoints_tested} 个API端点,其中 {endpoints_passed} 个通过,{endpoints_failed} 个失败,{endpoints_skipped} 个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}。
+执行 {test_cases_executed} 个测试用例,其中 {test_cases_passed} 个通过,{test_cases_failed} 个失败,{test_cases_skipped} 个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}。
+执行 {stages_executed} 个流程测试,其中 {stages_passed} 个通过,{stages_failed} 个失败,{stages_skipped} 个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}。"""
+
+ elements.append(to_para(summary_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 测试内容包括 - API列表表格
+ elements.append(to_para("测试内容包括", heading_style, escape=False))
+
+ # 从测试结果中提取API信息
+ endpoint_results = summary_data.get('endpoint_results', [])
+ api_list_data = [
+ [to_para("序号", escape=False), to_para("服务名称", escape=False),
+ to_para("服务功能描述", escape=False), to_para("服务参数描述", escape=False),
+ to_para("服务返回值描述", escape=False)]
+ ]
+
+ for i, endpoint in enumerate(endpoint_results[:10], 1): # 限制显示前10个API
+ endpoint_name = endpoint.get('endpoint_name', 'N/A')
+
+ # 简化的功能描述
+ if 'Create' in endpoint_name:
+ func_desc = "提供数据创建服务"
+ elif 'List' in endpoint_name or 'Query' in endpoint_name:
+ func_desc = "提供数据查询和列表服务"
+ elif 'Read' in endpoint_name:
+ func_desc = "提供单条数据读取服务"
+ elif 'Update' in endpoint_name:
+ func_desc = "提供数据更新服务"
+ elif 'Delete' in endpoint_name:
+ func_desc = "提供数据删除服务"
+ else:
+ func_desc = "提供数据管理服务"
+
+ api_list_data.append([
+ to_para(str(i), small_style),
+ to_para(endpoint_name, small_style),
+ to_para(func_desc, small_style),
+ to_para("标准DMS参数格式", small_style),
+ to_para("标准DMS响应格式", small_style)
+ ])
+
+ api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
+ api_list_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(api_list_table)
+ elements.append(Spacer(1, 20))
+
+ # 测试用例列表 - 根据严格等级分为必须和非必须
+ elements.append(to_para("测试用例列表", heading_style, escape=False))
+
+ # 定义严重性等级的数值映射
+ severity_levels = {
+ 'CRITICAL': 5,
+ 'HIGH': 4,
+ 'MEDIUM': 3,
+ 'LOW': 2,
+ 'INFO': 1
+ }
+
+ strictness_value = severity_levels.get(strictness_level, 5) # 默认为CRITICAL
+
+ # 收集所有测试用例(包括endpoint用例和stage用例)
+ all_test_cases = []
+ failed_test_cases = [] # 专门收集失败的测试用例
+
+ # 1. 收集endpoint测试用例
+ for endpoint_result in endpoint_results:
+ test_cases = endpoint_result.get('executed_test_cases', [])
+ for tc in test_cases:
+ tc_severity = tc.get('test_case_severity', 'MEDIUM')
+ tc_severity_value = severity_levels.get(tc_severity, 3)
+ tc_status = tc.get('status', 'N/A')
+ tc_message = tc.get('message', '')
+
+ test_case_info = {
+ 'type': 'Endpoint',
+ 'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
+ 'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'),
+ 'case_name': tc.get('test_case_name', 'N/A'),
+ 'case_id': tc.get('test_case_id', 'N/A'),
+ 'status': tc_status,
+ 'message': tc_message,
+ 'severity': tc_severity,
+ 'severity_value': tc_severity_value,
+ 'is_required': tc_severity_value >= strictness_value,
+ 'duration': tc.get('duration_seconds', 0),
+ 'timestamp': tc.get('timestamp', '')
+ }
+
+ all_test_cases.append(test_case_info)
+
+ # 收集失败的测试用例
+ if tc_status in ['失败', 'FAILED', '错误', 'ERROR']:
+ failed_test_cases.append(test_case_info)
+
+ # 2. 收集stage测试用例
+ stage_results = summary_data.get('stage_results', [])
+ for stage_result in stage_results:
+ stage_name = stage_result.get('stage_name', 'N/A')
+ stage_status = stage_result.get('overall_status', 'N/A')
+ stage_message = stage_result.get('message', stage_result.get('error_message', ''))
+ stage_severity = 'HIGH' # Stage用例通常是高优先级
+ stage_severity_value = severity_levels.get(stage_severity, 4)
+
+ # 将stage作为一个测试用例添加
+ stage_case_info = {
+ 'type': 'Stage',
+ 'endpoint': f"Stage: {stage_name}",
+ 'endpoint_id': f"STAGE_{stage_name}",
+ 'case_name': stage_result.get('description', stage_name),
+ 'case_id': f"STAGE_{stage_name}",
+ 'status': stage_status,
+ 'message': stage_message,
+ 'severity': stage_severity,
+ 'severity_value': stage_severity_value,
+ 'is_required': stage_severity_value >= strictness_value,
+ 'duration': stage_result.get('duration_seconds', 0),
+ 'timestamp': stage_result.get('start_time', '')
+ }
+
+ all_test_cases.append(stage_case_info)
+
+ # 收集失败的stage用例
+ if stage_status in ['失败', 'FAILED', '错误', 'ERROR']:
+ failed_test_cases.append(stage_case_info)
+
+ # 分离必须和非必须的测试用例
+ required_cases = [case for case in all_test_cases if case['is_required']]
+ optional_cases = [case for case in all_test_cases if not case['is_required']]
+
+ # 创建分离的测试用例表格
+ if all_test_cases:
+ # 添加严格等级说明
+ strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
+ elements.append(to_para(strictness_text, small_style))
+ elements.append(Spacer(1, 10))
+
+ # 1. 必须的测试用例表格
+ if required_cases:
+ elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
+
+ required_table_data = [
+ [to_para("序号", escape=False), to_para("类型", escape=False),
+ to_para("测试用例名称", escape=False), to_para("所属端点/阶段", escape=False),
+ to_para("优先级", escape=False), to_para("执行结果", escape=False)]
+ ]
+
+ for i, case in enumerate(required_cases, 1):
+ status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
+ required_table_data.append([
+ to_para(str(i), small_style),
+ to_para(case['type'], small_style),
+ to_para(case['case_name'], small_style),
+ to_para(case['endpoint'], small_style),
+ to_para(case['severity'], small_style),
+ to_para(status_display, small_style)
+ ])
+
+ required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45])
+ required_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightblue), # 使用浅蓝色突出必须用例
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(required_table)
+ elements.append(Spacer(1, 15))
+
+ # 2. 非必须的测试用例表格
+ if optional_cases:
+ elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
+
+ optional_table_data = [
+ [to_para("序号", escape=False), to_para("类型", escape=False),
+ to_para("测试用例名称", escape=False), to_para("所属端点/阶段", escape=False),
+ to_para("优先级", escape=False), to_para("执行结果", escape=False)]
+ ]
+
+ for i, case in enumerate(optional_cases, 1):
+ status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
+ optional_table_data.append([
+ to_para(str(i), small_style),
+ to_para(case['type'], small_style),
+ to_para(case['case_name'], small_style),
+ to_para(case['endpoint'], small_style),
+ to_para(case['severity'], small_style),
+ to_para(status_display, small_style)
+ ])
+
+ optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45])
+ optional_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightgrey), # 使用浅灰色表示非必须用例
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(optional_table)
+ elements.append(Spacer(1, 10))
+
+ # 添加用例统计信息
+ total_cases = len(all_test_cases)
+ endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint'])
+ stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage'])
+ required_count = len(required_cases)
+ optional_count = len(optional_cases)
+
+ stats_text = f"""测试用例统计:
+总计 {total_cases} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。
+必须用例 {required_count} 个,非必须用例 {optional_count} 个。
+严格等级:{strictness_level}({severity_levels.get(strictness_level, 5)}级及以上为必须)。"""
+
+ elements.append(to_para(stats_text, small_style))
+ else:
+ elements.append(to_para("无测试用例执行记录。", normal_style))
+
+ elements.append(Spacer(1, 20))
+
+ # 失败用例详情部分
+ if failed_test_cases:
+ elements.append(to_para("失败用例详情分析", heading_style, escape=False))
+ elements.append(Spacer(1, 10))
+
+ # 按严重性分组失败用例
+ critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL']
+ high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH']
+ medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM']
+ low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW']
+
+ failure_summary = f"""失败用例统计:
+总计 {len(failed_test_cases)} 个失败用例,其中:
+• 严重级别:{len(critical_failures)} 个
+• 高级别:{len(high_failures)} 个
+• 中级别:{len(medium_failures)} 个
+• 低级别:{len(low_failures)} 个
+
+以下是详细的失败原因分析:"""
+
+ elements.append(to_para(failure_summary, normal_style))
+ elements.append(Spacer(1, 15))
+
+ # 详细失败用例列表
+ for i, failed_case in enumerate(failed_test_cases, 1):
+ # 用例标题
+ case_title = f"{i}. {failed_case['case_name']}"
+ elements.append(to_para(case_title, ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5)))
+
+ # 用例基本信息
+ case_info = f"""• 用例ID:{failed_case['case_id']}
+• 所属端点:{failed_case['endpoint']}
+• 严重级别:{failed_case['severity']}
+• 执行状态:{failed_case['status']}"""
+
+ elements.append(to_para(case_info, ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5)))
+
+ # 失败原因
+ failure_reason = failed_case.get('message', '无详细错误信息')
+ if failure_reason:
+ elements.append(to_para("失败原因:", ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15)))
+
+ # 处理长文本,确保在PDF中正确显示
+ if len(failure_reason) > 200:
+ # 对于很长的错误信息,进行适当的分段
+ failure_reason = failure_reason[:200] + "..."
+
+ elements.append(to_para(failure_reason, ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red)))
+
+ # 添加分隔线
+ if i < len(failed_test_cases):
+ elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey))
+ elements.append(Spacer(1, 10))
+
+ elements.append(Spacer(1, 20))
+
+ elements.append(Spacer(1, 20))
+
+ # 测试情况说明
+ elements.append(to_para("测试情况说明", heading_style, escape=False))
+
+ test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。
+测试:累计发现缺陷{overall.get('test_cases_failed', 0)}个。
+测试执行时间:{start_time_formatted} 至 {end_time_formatted}
+测试环境:开发测试环境
+测试方法:自动化API合规性测试"""
+
+ elements.append(to_para(test_situation_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 测试结论
+ elements.append(to_para("测试结论", heading_style, escape=False))
+
+ # 根据测试结果生成结论
+ success_rate = overall.get('test_case_success_rate', '0%')
+ success_rate_num = float(success_rate.replace('%', '')) if success_rate != 'N/A' else 0
+
+ if success_rate_num >= 90:
+ conclusion_status = "通过"
+ conclusion_text = f"""本套领域数据服务已通过环境验证,系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析,该项目通过验收测试。
+测试用例成功率达到{success_rate},符合验收标准。"""
+ elif success_rate_num >= 70:
+ conclusion_status = "基本通过"
+ conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。"""
+ else:
+ conclusion_status = "不通过"
+ conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。"""
+
+ elements.append(to_para(conclusion_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 检测依据
+ elements.append(to_para("检测依据", heading_style, escape=False))
+
+ detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分:关于DMS领域数据服务的接口要求和测试细则。
+参考标准:
+1. DMS数据管理系统API规范V1.0
+2. RESTful API设计规范
+3. 数据安全和隐私保护要求
+4. 系统集成测试标准"""
+
+ elements.append(to_para(detection_basis_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 报告生成信息
+ elements.append(to_para("报告生成信息", heading_style, escape=False))
+ generation_info_data = [
+ [to_para("生成时间", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M:%S'))],
+ [to_para("生成工具", escape=False), to_para("DMS合规性测试工具")],
+ [to_para("工具版本", escape=False), to_para("V1.0.0")],
+ [to_para("测试结论", escape=False), to_para(f"{conclusion_status}", escape=False)],
+ ]
+ generation_info_table = Table(generation_info_data, colWidths=[120, '*'])
+ generation_info_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
+ ]))
+ elements.append(generation_info_table)
+
+ # 构建PDF
+ doc.build(elements)
+ logger.info(f"PDF报告已成功生成: {output_path}")
+ except Exception as e:
+ logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
@app.post("/run",
summary="执行API合规性测试",
@@ -383,16 +921,16 @@ def save_api_call_details_to_markdown(api_call_details: List[APICallDetail], out
执行API合规性测试的主要端点。
支持三种API定义源:
- - **YAPI**: 基于YAPI定义文件
- - **Swagger/OpenAPI**: 基于OpenAPI规范文件
- - **DMS**: 动态发现DMS服务的API
+ - YAPI: 基于YAPI定义文件
+ - Swagger/OpenAPI: 基于OpenAPI规范文件
+ - DMS: 动态发现DMS服务的API
- ### 分页支持
+ 分页支持
对于DMS测试,支持分页获取API列表,避免内存溢出:
- `page_size`: 每页获取的API数量(默认1000)
- 返回详细的分页统计信息
- ### LLM集成
+ LLM集成
可选择使用大语言模型生成测试数据:
- 智能生成请求体、路径参数、查询参数等
- 提高测试覆盖率和数据多样性
@@ -407,17 +945,36 @@ async def run_api_tests(config: TestConfig):
"""
执行API合规性测试
- - **config**: 测试配置,包含API定义源、测试参数等
- - **returns**: 测试结果,包含摘要信息和分页信息(如适用)
+ - config: 测试配置,包含API定义源、测试参数等
+ - returns: 测试结果,包含摘要信息和分页信息(如适用)
"""
try:
logger.info(f"Starting test run with configuration: {config.model_dump()}")
# Convert Pydantic model to dict for compatibility
config_dict = config.model_dump(exclude_none=True)
-
- # Replace underscores with hyphens for compatibility with original code
- config_dict = {k.replace('_', '-'): v for k, v in config_dict.items()}
+
+ # Add hidden parameters with default values
+ hidden_defaults = {
+ "categories": [],
+ "tags": [],
+ "ignore_ssl": True,
+ "output": "./test_reports",
+ "generate_pdf": True,
+ "custom_test_cases_dir": "./custom_testcases",
+ "stages_dir": "./custom_stages",
+ "llm_api_key": "sk-lbGrsUPL1iby86h554FaE536C343435dAa9bA65967A840B2",
+ "llm_base_url": "https://aiproxy.petrotech.cnpc/v1",
+ "llm_model_name": "deepseek-v3",
+ "use_llm_for_request_body": False,
+ "use_llm_for_path_params": False,
+ "use_llm_for_query_params": False,
+ "use_llm_for_headers": False,
+ "verbose": False
+ }
+
+ # Merge hidden defaults with config
+ config_dict.update(hidden_defaults)
result = run_tests_logic(config_dict)
@@ -456,8 +1013,8 @@ async def download_report(report_id: str, file_type: str = "summary.json"):
"""
下载测试报告文件
- - **report_id**: 报告ID(通常是时间戳)
- - **file_type**: 文件类型,可选值:summary.json, api_call_details.md
+ - report_id: 报告ID(通常是时间戳)
+ - file_type: 文件类型,可选值:summary.json, api_call_details.md
"""
try:
report_dir = Path("./test_reports") / report_id