ruoyunbai 2025-08-27 22:53:09 +08:00
commit face5c4a10
15 changed files with 2564 additions and 182 deletions

View File

@ -0,0 +1,301 @@
@echo off
setlocal
REM Delayed expansion is intentionally not enabled: with it, echo would strip the "!"
REM from generated lines such as the #!/bin/bash shebangs written below.
echo === DMS Compliance Tool Windows Final Version ===
echo.
REM Set defaults to avoid input issues
set "SELECTED_SERVICE_ARCH=dual"
set "SELECTED_PORTS=5050,5051"
set "SERVICE_DESC=Dual Service - FastAPI Server and History Viewer"
set "TARGET_PLATFORM=linux/amd64"
set "TARGET_PLATFORM_NAME=AMD64 - Auto-detected"
REM Configuration
set "TIMESTAMP=%date:~10,4%%date:~4,2%%date:~7,2%-%time:~0,2%%time:~3,2%%time:~6,2%"
set "TIMESTAMP=%TIMESTAMP: =0%"
set "platform_suffix=amd64"
set "EXPORT_DIR=dms-compliance-dual-amd64-windows-%TIMESTAMP%"
set "IMAGE_NAME=compliance-dms-windows"
echo [INFO] Using default configuration:
echo [INFO] Architecture: %SERVICE_DESC%
echo [INFO] Ports: %SELECTED_PORTS%
echo [INFO] Platform: %TARGET_PLATFORM_NAME%
echo.
REM Check Docker
docker --version >nul 2>&1
if errorlevel 1 (
echo [ERROR] Docker not installed
pause
exit /b 1
)
docker info >nul 2>&1
if errorlevel 1 (
echo [ERROR] Docker not running
pause
exit /b 1
)
echo [SUCCESS] Docker environment OK
echo.
echo === Starting Build Process ===
REM Create directories
if exist "%EXPORT_DIR%" rmdir /s /q "%EXPORT_DIR%"
mkdir "%EXPORT_DIR%"
set "TEMP_BUILD_DIR=%TEMP%\dms-build-%RANDOM%"
mkdir "%TEMP_BUILD_DIR%"
REM Copy files
echo [Step 1/6] Copying project files...
if exist "ddms_compliance_suite" robocopy "ddms_compliance_suite" "%TEMP_BUILD_DIR%\ddms_compliance_suite" /E /XD __pycache__ /XF *.pyc >nul 2>&1
if exist "custom_stages" robocopy "custom_stages" "%TEMP_BUILD_DIR%\custom_stages" /E /XD __pycache__ /XF *.pyc >nul 2>&1
if exist "custom_testcases" robocopy "custom_testcases" "%TEMP_BUILD_DIR%\custom_testcases" /E /XD __pycache__ /XF *.pyc >nul 2>&1
if exist "templates" robocopy "templates" "%TEMP_BUILD_DIR%\templates" /E >nul 2>&1
if exist "static" robocopy "static" "%TEMP_BUILD_DIR%\static" /E >nul 2>&1
if exist "assets" robocopy "assets" "%TEMP_BUILD_DIR%\assets" /E >nul 2>&1
for %%f in (fastapi_server.py history_viewer.py flask_app.py web_interface.py requirements.txt) do (
if exist "%%f" copy "%%f" "%TEMP_BUILD_DIR%\" >nul
)
echo [SUCCESS] Files copied
echo.
REM Create Dockerfile
echo [Step 2/6] Creating Dockerfile...
cd /d "%TEMP_BUILD_DIR%"
(
echo FROM python:3.11-alpine
echo WORKDIR /app
echo COPY requirements.txt .
echo RUN pip install --no-cache-dir -r requirements.txt
echo RUN pip install --no-cache-dir fastapi uvicorn[standard]
echo RUN apk add --no-cache supervisor curl bash
echo COPY . .
echo RUN mkdir -p /var/log/supervisor
echo COPY supervisord.conf /etc/supervisor/conf.d/
echo EXPOSE 5050 5051
echo CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
) > Dockerfile
(
echo [supervisord]
echo nodaemon=true
echo logfile=/var/log/supervisor/supervisord.log
echo user=root
echo.
echo [program:api_server]
echo command=python -m uvicorn fastapi_server:app --host 0.0.0.0 --port 5050
echo directory=/app
echo autostart=true
echo autorestart=true
echo redirect_stderr=true
echo stdout_logfile=/var/log/supervisor/api_server.log
echo.
echo [program:history_viewer]
echo command=python history_viewer.py
echo directory=/app
echo autostart=true
echo autorestart=true
echo redirect_stderr=true
echo stdout_logfile=/var/log/supervisor/history_viewer.log
) > supervisord.conf
cd /d "%~dp0"
echo [SUCCESS] Dockerfile created
echo.
REM Copy to final directory
echo [Step 3/6] Copying build files...
robocopy "%TEMP_BUILD_DIR%" "%EXPORT_DIR%" /E >nul 2>&1
echo [SUCCESS] Build files copied
echo.
REM Create Docker Compose
echo [Step 4/6] Creating Docker Compose...
(
echo services:
echo dms-compliance:
echo image: %IMAGE_NAME%:latest
echo container_name: dms-compliance-tool
echo ports:
echo - "5050:5050"
echo - "5051:5051"
echo volumes:
echo - ./uploads:/app/uploads
echo - ./logs:/app/logs
echo restart: unless-stopped
echo healthcheck:
echo test: ["CMD", "curl", "-f", "http://localhost:5050/health"]
echo interval: 30s
echo timeout: 10s
echo retries: 3
echo start_period: 40s
) > "%EXPORT_DIR%\docker-compose.yml"
echo [SUCCESS] Docker Compose created
echo.
REM Create scripts
echo [Step 5/6] Creating cross-platform management scripts...
REM Windows batch scripts
(
echo @echo off
echo echo Starting DMS Compliance Tool...
echo echo Loading pre-built Docker image...
echo docker load -i docker-image.tar
echo echo Starting services...
echo docker compose up -d
echo echo Services started!
echo echo FastAPI Server: http://localhost:5050
echo echo History Viewer: http://localhost:5051
echo pause
) > "%EXPORT_DIR%\start.bat"
(
echo @echo off
echo docker compose down
echo echo Services stopped.
echo pause
) > "%EXPORT_DIR%\stop.bat"
(
echo @echo off
echo docker compose logs -f
) > "%EXPORT_DIR%\logs.bat"
REM Linux shell scripts
(
echo #!/bin/bash
echo.
echo echo "Starting DMS Compliance Tool..."
echo echo "Loading pre-built Docker image..."
echo docker load -i docker-image.tar
echo echo "Starting services..."
echo docker compose up -d
echo echo "Services started!"
echo echo "FastAPI Server: http://localhost:5050"
echo echo "History Viewer: http://localhost:5051"
) > "%EXPORT_DIR%\start.sh"
(
echo #!/bin/bash
echo.
echo echo "Stopping DMS Compliance Tool..."
echo docker compose down
echo echo "Services stopped."
) > "%EXPORT_DIR%\stop.sh"
(
echo #!/bin/bash
echo.
echo docker compose logs -f
) > "%EXPORT_DIR%\logs.sh"
REM Set permissions script for Linux
(
echo #!/bin/bash
echo chmod +x *.sh
echo echo "Permissions set for shell scripts"
) > "%EXPORT_DIR%\set-permissions.sh"
(
echo # DMS Compliance Tool - Cross-Platform Version
echo.
echo ## Quick Start
echo.
echo ### Windows:
echo 1. Run start.bat
echo 2. Access web interface
echo 3. Use stop.bat to stop
echo.
echo ### Linux/macOS:
echo 1. chmod +x *.sh ^(or run ./set-permissions.sh^)
echo 2. ./start.sh
echo 3. Access web interface
echo 4. ./stop.sh to stop
echo.
echo ## Architecture
echo - FastAPI Server: http://localhost:5050
echo - History Viewer: http://localhost:5051
echo - Dual service architecture with supervisor
echo.
echo ## Management Commands
echo Windows: start.bat, stop.bat, logs.bat
echo Linux: start.sh, stop.sh, logs.sh
echo.
echo ## Package Contents
echo - docker-compose.yml: Service configuration ^(uses pre-built image^)
echo - Dockerfile: Container build instructions ^(for reference^)
echo - docker-image.tar: Pre-built Docker image ^(fast startup^)
echo - Cross-platform management scripts ^(auto-load image^)
) > "%EXPORT_DIR%\README.md"
echo [SUCCESS] Cross-platform scripts created
echo.
REM Build image
echo [Step 6/6] Building Docker image...
cd /d "%EXPORT_DIR%"
docker build -t "%IMAGE_NAME%:latest" .
if errorlevel 1 (
echo [ERROR] Build failed
cd /d "%~dp0"
pause
exit /b 1
)
docker save "%IMAGE_NAME%:latest" -o docker-image.tar
cd /d "%~dp0"
REM Clean up source files (like multiplatform script)
echo [INFO] Cleaning up source files...
cd /d "%EXPORT_DIR%"
REM Remove source code directories (keep only deployment files)
if exist "ddms_compliance_suite" rmdir /s /q "ddms_compliance_suite"
if exist "custom_stages" rmdir /s /q "custom_stages"
if exist "custom_testcases" rmdir /s /q "custom_testcases"
if exist "templates" rmdir /s /q "templates"
if exist "static" rmdir /s /q "static"
if exist "assets" rmdir /s /q "assets"
REM Remove unnecessary source Python files (keep fastapi_server.py and history_viewer.py for runtime)
for %%f in (flask_app.py web_interface.py) do (
if exist "%%f" del "%%f"
)
echo [SUCCESS] Source files cleaned up
cd /d "%~dp0"
REM Create archive (like multiplatform script)
echo [INFO] Creating archive...
powershell -command "Compress-Archive -Path '%EXPORT_DIR%' -DestinationPath '%EXPORT_DIR%.zip' -Force"
REM Clean up temporary directories (like multiplatform script)
if exist "%TEMP_BUILD_DIR%" rmdir /s /q "%TEMP_BUILD_DIR%"
if exist "%EXPORT_DIR%" rmdir /s /q "%EXPORT_DIR%"
echo.
echo === Build Complete ===
echo [SUCCESS] Package created: %EXPORT_DIR%.zip
echo [INFO] Architecture: Dual Service - FastAPI Server and History Viewer
echo [INFO] Ports: 5050,5051
echo [INFO] FastAPI Server: http://localhost:5050
echo [INFO] History Viewer: http://localhost:5051
echo.
echo Package contents:
echo - Cross-platform scripts ^(Windows .bat and Linux .sh^)
echo - Docker Compose configuration
echo - Pre-built Docker image
echo - Clean deployment package ^(no source code^)
echo.
echo Build complete!
pause
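
A quick way to sanity-check the archive this script produces is to list its contents. The sketch below (Python, assuming the default `dms-compliance-dual-amd64-windows-*` naming and that the zip sits in the working directory) checks that the deployment files the README promises are actually present:

```python
# Hedged sketch: verify the exported zip contains the expected deployment files.
# Assumes the default EXPORT_DIR naming above; the file list mirrors the README.
import zipfile
from pathlib import Path

archives = sorted(Path(".").glob("dms-compliance-dual-amd64-windows-*.zip"))
if not archives:
    raise SystemExit("no exported archive found")

expected = {"docker-compose.yml", "docker-image.tar", "Dockerfile", "README.md",
            "start.bat", "stop.bat", "logs.bat", "start.sh", "stop.sh", "logs.sh"}
with zipfile.ZipFile(archives[-1]) as zf:
    names = {Path(name).name for name in zf.namelist()}

missing = expected - names
status = "OK" if not missing else f"missing: {sorted(missing)}"
print(f"{archives[-1].name}: {status}")
```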

View File

@ -282,56 +282,40 @@ cd "$TEMP_BUILD_DIR"
if [[ "$SELECTED_SERVICE_ARCH" == "dual" ]]; then
# 双服务架构 - 使用supervisor管理两个服务
cat > "Dockerfile" << 'EOF'
# 多阶段构建 - 构建阶段
FROM python:3.11-alpine AS builder
# 使用稳定的Python基础镜像
FROM python:3.11-alpine
# 安装构建依赖
RUN apk add --no-cache \
# 安装系统依赖
RUN apk update && apk add --no-cache \
gcc \
musl-dev \
libffi-dev \
openssl-dev \
cargo \
rust
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 创建虚拟环境并安装依赖
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip setuptools wheel
RUN pip install --no-cache-dir -r requirements.txt
# 运行阶段
FROM python:3.11-alpine AS runtime
# 安装运行时依赖
RUN apk add --no-cache \
python3-dev \
build-base \
linux-headers \
supervisor \
curl \
bash \
tzdata
# 从构建阶段复制虚拟环境
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
tzdata && \
rm -rf /var/cache/apk/*
# 设置工作目录
WORKDIR /app
# 复制依赖文件并安装Python包
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip setuptools wheel && \
pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建supervisor配置
RUN mkdir -p /etc/supervisor/conf.d
COPY supervisord.conf /etc/supervisor/conf.d/
# 创建supervisor配置目录
RUN mkdir -p /etc/supervisor/conf.d /var/log/supervisor /app/logs /app/test_reports /app/uploads
# 创建必要目录
RUN mkdir -p /var/log/supervisor /app/logs /app/test_reports /app/uploads
# 复制supervisor配置
COPY supervisord.conf /etc/supervisor/conf.d/
# 创建非root用户
RUN addgroup -g 1000 appuser && \
@ -346,7 +330,7 @@ ENV FLASK_ENV=production
ENV PYTHONUNBUFFERED=1
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD curl -f http://localhost:5050/ || exit 1
# 暴露端口
@ -409,50 +393,38 @@ EOF
elif [[ "$SELECTED_SERVICE_ARCH" == "fastapi" ]]; then
cat > "Dockerfile" << 'EOF'
# 多阶段构建 - 构建阶段
FROM python:3.11-alpine AS builder
# 使用稳定的Python基础镜像
FROM python:3.11-alpine
# 安装构建依赖
RUN apk add --no-cache \
# 安装系统依赖
RUN apk update && apk add --no-cache \
gcc \
musl-dev \
libffi-dev \
openssl-dev \
cargo \
rust
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 创建虚拟环境并安装依赖
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip setuptools wheel
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir fastapi uvicorn[standard]
# 运行阶段
FROM python:3.11-alpine AS runtime
# 安装运行时依赖
RUN apk add --no-cache \
python3-dev \
build-base \
linux-headers \
curl \
bash \
tzdata
# 从构建阶段复制虚拟环境
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
tzdata && \
rm -rf /var/cache/apk/*
# 设置工作目录
WORKDIR /app
# 复制依赖文件并安装Python包
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip setuptools wheel && \
pip install --no-cache-dir -r requirements.txt && \
pip install --no-cache-dir fastapi uvicorn[standard]
# 复制应用代码
COPY . .
# 创建必要目录
RUN mkdir -p /app/logs /app/uploads /app/reports
# 创建非root用户
RUN addgroup -g 1000 appuser && \
adduser -D -u 1000 -G appuser appuser && \
@ -460,8 +432,12 @@ RUN addgroup -g 1000 appuser && \
USER appuser
# 设置环境变量
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD curl -f http://localhost:5051/health || exit 1
# 暴露端口
@ -827,50 +803,38 @@ EOF
else
# Flask版本的Dockerfile
cat > "Dockerfile" << 'EOF'
# 多阶段构建 - 构建阶段
FROM python:3.11-alpine AS builder
# 使用稳定的Python基础镜像
FROM python:3.11-alpine
# 安装构建依赖
RUN apk add --no-cache \
# 安装系统依赖
RUN apk update && apk add --no-cache \
gcc \
musl-dev \
libffi-dev \
openssl-dev \
cargo \
rust
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 创建虚拟环境并安装依赖
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip setuptools wheel
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir flask gunicorn
# 运行阶段
FROM python:3.11-alpine AS runtime
# 安装运行时依赖
RUN apk add --no-cache \
python3-dev \
build-base \
linux-headers \
curl \
bash \
tzdata
# 从构建阶段复制虚拟环境
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
tzdata && \
rm -rf /var/cache/apk/*
# 设置工作目录
WORKDIR /app
# 复制依赖文件并安装Python包
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip setuptools wheel && \
pip install --no-cache-dir -r requirements.txt && \
pip install --no-cache-dir flask gunicorn
# 复制应用代码
COPY . .
# 创建必要目录
RUN mkdir -p /app/logs /app/uploads /app/reports
# 创建非root用户
RUN addgroup -g 1000 appuser && \
adduser -D -u 1000 -G appuser appuser && \
@ -878,9 +842,14 @@ RUN addgroup -g 1000 appuser && \
USER appuser
# 设置环境变量
ENV PYTHONPATH=/app
ENV FLASK_ENV=production
ENV PYTHONUNBUFFERED=1
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:5050/health || exit 1
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD curl -f http://localhost:5050/ || exit 1
# 暴露端口
EXPOSE 5050

View File

@ -0,0 +1,11 @@
FROM python:3.11-alpine
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir fastapi uvicorn[standard]
RUN apk add --no-cache supervisor curl bash
COPY . .
RUN mkdir -p /var/log/supervisor
COPY supervisord.conf /etc/supervisor/conf.d/
EXPOSE 5050 5051
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]

View File

@ -0,0 +1,29 @@
# DMS Compliance Tool - Cross-Platform Version
## Quick Start
### Windows:
1. Run start.bat
2. Access web interface
3. Use stop.bat to stop
### Linux/macOS:
1. chmod +x *.sh (or run ./set-permissions.sh)
2. ./start.sh
3. Access web interface
4. ./stop.sh to stop
## Architecture
- FastAPI Server: http://localhost:5050
- History Viewer: http://localhost:5051
- Dual service architecture with supervisor
## Management Commands
Windows: start.bat, stop.bat, logs.bat
Linux: start.sh, stop.sh, logs.sh
## Package Contents
- docker-compose.yml: Service configuration (uses pre-built image)
- Dockerfile: Container build instructions (for reference)
- docker-image.tar: Pre-built Docker image (fast startup)
- Cross-platform management scripts (auto-load image)

View File

@ -0,0 +1,17 @@
services:
dms-compliance:
image: compliance-dms-windows:latest
container_name: dms-compliance-tool
ports:
- "5050:5050"
- "5051:5051"
volumes:
- ./uploads:/app/uploads
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5050/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
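
After `docker compose up -d`, a small smoke test can confirm both published ports respond. This is a sketch only: the healthcheck above probes `/health` on 5050, but whether that route exists depends on fastapi_server.py, so the probe falls back to `/`; `requests` is assumed to be installed on the host.

```python
# Hedged smoke test for the two published services (sketch, not part of the package).
import requests

def probe(port: int) -> str:
    for path in ("/health", "/"):  # mirror the compose healthcheck, then fall back to "/"
        try:
            r = requests.get(f"http://localhost:{port}{path}", timeout=5)
            return f"port {port}{path} -> HTTP {r.status_code}"
        except requests.RequestException:
            continue
    return f"port {port}: unreachable"

for port in (5050, 5051):  # FastAPI server and history viewer
    print(probe(port))
```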

File diff suppressed because it is too large

View File

@ -0,0 +1,315 @@
import os
import sys
import json
import logging
import sqlite3
from pathlib import Path
from datetime import timedelta
from werkzeug.security import generate_password_hash, check_password_hash
from flask import Flask, request, jsonify, send_from_directory, session, redirect, url_for, render_template, g, flash, get_flashed_messages, abort
from flask_cors import CORS
from functools import wraps
import markdown
# --- PyInstaller Path Helpers ---
# For data files that should persist outside the bundle (e.g., database, reports)
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
# Running in a PyInstaller bundle
APP_ROOT = os.path.dirname(sys.executable)
else:
# Running in a normal Python environment
APP_ROOT = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(APP_ROOT, 'templates')
static_dir = os.path.join(APP_ROOT, 'static')
app = Flask(__name__, static_folder=static_dir, template_folder=template_dir)
CORS(app)
# --- 基本配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
DATABASE = os.path.join(APP_ROOT, 'users.db')
REPORTS_DIR = os.path.join(APP_ROOT, 'test_reports')
app.config['SECRET_KEY'] = os.urandom(24)
app.config['DATABASE'] = DATABASE
app.config['REPORTS_DIR'] = REPORTS_DIR
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)
os.makedirs(app.config['REPORTS_DIR'], exist_ok=True)
# --- 数据库 Schema 和辅助函数 (与 flask_app.py 相同) ---
DB_SCHEMA = '''
DROP TABLE IF EXISTS user;
CREATE TABLE user (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL
);
'''
def get_db():
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(app.config['DATABASE'])
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
def init_db(force_create=False):
if force_create or not os.path.exists(app.config['DATABASE']):
with app.app_context():
db = get_db()
db.cursor().executescript(DB_SCHEMA)
db.commit()
logger.info("数据库已初始化!")
create_default_user()
else:
logger.info("数据库已存在。")
def create_default_user(username="admin", password="7#Xq9$Lm*2!Pw@5"):
with app.app_context():
db = get_db()
user = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
if user is None:
db.execute("INSERT INTO user (username, password_hash) VALUES (?, ?)", (username, generate_password_hash(password)))
db.commit()
logger.info(f"已创建默认用户: {username}")
else:
logger.info(f"默认用户 {username} 已存在。")
@app.cli.command('init-db')
def init_db_command():
init_db(force_create=True)
print("已初始化数据库。")
# --- 用户认证 (与 flask_app.py 相同) ---
@app.route('/login', methods=('GET', 'POST'))
def login():
if g.user:
return redirect(url_for('list_history'))
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
error = None
user = db.execute('SELECT * FROM user WHERE username = ?', (username,)).fetchone()
if user is None:
error = '用户名不存在。'
elif not check_password_hash(user['password_hash'], password):
error = '密码错误。'
if error is None:
session.clear()
session['user_id'] = user['id']
session['username'] = user['username']
session.permanent = True
return redirect(url_for('list_history'))
flash(error)
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
flash('您已成功登出。')
return redirect(url_for('login'))
def login_required(view):
@wraps(view)
def wrapped_view(**kwargs):
# if g.user is None:
# return redirect(url_for('login'))
return view(**kwargs)
return wrapped_view
@app.before_request
def load_logged_in_user():
user_id = session.get('user_id')
if user_id is None:
g.user = None
else:
g.user = get_db().execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
# --- LLM配置视图 ---
CRITERIA_FILE_PATH = os.path.join(APP_ROOT, 'custom_testcases', 'llm', 'compliance_criteria.json')
@app.route('/llm-config', methods=['GET', 'POST'])
@login_required
def llm_config():
criteria_for_template = []
file_exists = os.path.exists(CRITERIA_FILE_PATH)
if request.method == 'POST':
# 从表单获取所有名为'criteria'的输入项,作为一个列表
criteria_list = request.form.getlist('criteria')
# 过滤掉用户可能提交的空规则
criteria_list = [item.strip() for item in criteria_list if item.strip()]
try:
# 将规则列表格式化为美观的JSON并保存
pretty_content = json.dumps(criteria_list, indent=2, ensure_ascii=False)
with open(CRITERIA_FILE_PATH, 'w', encoding='utf-8') as f:
f.write(pretty_content)
flash('LLM合规性标准已成功保存', 'success')
except Exception as e:
flash(f'保存文件时发生未知错误: {e}', 'error')
# 无论是GET还是POST,处理后都重新从文件中读取最新的规则列表用于显示
file_exists = os.path.exists(CRITERIA_FILE_PATH)  # POST 可能刚刚创建了该文件,需重新检查
if file_exists:
try:
with open(CRITERIA_FILE_PATH, 'r', encoding='utf-8') as f:
criteria_for_template = json.load(f)
# 确保文件内容确实是一个列表
if not isinstance(criteria_for_template, list):
flash('配置文件格式错误:内容应为JSON数组。已重置为空列表。', 'error')
criteria_for_template = []
except Exception as e:
flash(f'读取配置文件时出错: {e}', 'error')
criteria_for_template = []
# 准备一个用于页面展示的示例API信息
example_api_info = {
"path_template": "/api/dms/instance/v1/message/push/myschema/1.0",
"method": "POST",
"title": "数据推送接口",
"description": "用于向系统推送标准格式的数据。",
"schema_request_body": {"...": "... (此处为请求体Schema定义)"},
"instance_url": "http://example.com/api/dms/instance/v1/message/push/myschema/1.0",
"instance_request_headers": {"X-Tenant-ID": "tenant-001", "...": "..."},
"instance_request_body": {"id": "123", "data": "example"},
"instance_response_status": 200,
"instance_response_body": {"code": 0, "message": "success", "data": True}
}
return render_template('llm_config.html', criteria=criteria_for_template, file_exists=file_exists, example_api_info=json.dumps(example_api_info, indent=2, ensure_ascii=False))
# --- 文件下载路由 ---
@app.route('/download/<path:run_id>/<path:filename>')
@login_required
def download_report(run_id, filename):
"""安全地提供指定运行记录中的报告文件下载。"""
# 清理输入,防止目录遍历攻击
run_id_safe = Path(run_id).name
filename_safe = Path(filename).name
reports_dir = Path(app.config['REPORTS_DIR']).resolve()
run_dir = (reports_dir / run_id_safe).resolve()
# 安全检查确保请求的目录是REPORTS_DIR的子目录
if not run_dir.is_dir() or run_dir.parent != reports_dir:
abort(404, "找不到指定的测试记录或权限不足。")
return send_from_directory(run_dir, filename_safe, as_attachment=True)
# --- 新增PDF文件预览路由 ---
@app.route('/view_pdf/<path:run_id>')
@login_required
def view_pdf_report(run_id):
"""安全地提供PDF报告文件以内联方式查看。"""
run_id_safe = Path(run_id).name
filename_safe = "report_cn.pdf"
reports_dir = Path(app.config['REPORTS_DIR']).resolve()
run_dir = (reports_dir / run_id_safe).resolve()
# 安全检查
if not run_dir.is_dir() or run_dir.parent != reports_dir:
abort(404, "找不到指定的测试记录或权限不足。")
pdf_path = run_dir / filename_safe
if not pdf_path.exists():
abort(404, "未找到PDF报告文件。")
return send_from_directory(run_dir, filename_safe)
# --- 历史记录视图 ---
@app.route('/')
@login_required
def list_history():
history = []
reports_path = Path(app.config['REPORTS_DIR'])
if not reports_path.is_dir():
flash('报告目录不存在。')
return render_template('history.html', history=[])
# 获取所有子目录(即测试运行记录)
run_dirs = [d for d in reports_path.iterdir() if d.is_dir()]
# 按名称(时间戳)降序排序
run_dirs.sort(key=lambda x: x.name, reverse=True)
for run_dir in run_dirs:
summary_path = run_dir / 'summary.json'
details_path = run_dir / 'api_call_details.md'
run_info = {'id': run_dir.name, 'summary': None, 'has_details': details_path.exists()}
if summary_path.exists():
try:
with open(summary_path, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
run_info['summary'] = summary_data
except (json.JSONDecodeError, IOError) as e:
logger.error(f"无法读取或解析摘要文件 {summary_path}: {e}")
run_info['summary'] = {'error': '无法加载摘要'}
history.append(run_info)
return render_template('history.html', history=history)
@app.route('/details/<run_id>')
@login_required
def show_details(run_id):
run_id = Path(run_id).name # Sanitize input
run_dir = Path(app.config['REPORTS_DIR']) / run_id
if not run_dir.is_dir():
return "找不到指定的测试记录。", 404
summary_path = run_dir / 'summary.json'
details_path = run_dir / 'api_call_details.md'
pdf_path = run_dir / 'report_cn.pdf' # 新增PDF路径
summary_content = "{}"
details_content = "### 未找到API调用详情报告"
has_pdf_report = pdf_path.exists() # 检查PDF是否存在
has_md_report = details_path.exists() # 检查MD报告是否存在
if summary_path.exists():
try:
with open(summary_path, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
summary_content = json.dumps(summary_data, indent=2, ensure_ascii=False)
except Exception as e:
summary_content = f"加载摘要文件出错: {e}"
if has_md_report:
try:
with open(details_path, 'r', encoding='utf-8') as f:
# 将Markdown转换为HTML
details_content = markdown.markdown(f.read(), extensions=['fenced_code', 'tables', 'def_list', 'attr_list'])
except Exception as e:
details_content = f"加载详情文件出错: {e}"
return render_template('history_detail.html',
run_id=run_id,
summary_content=summary_content,
details_content=details_content,
has_pdf_report=has_pdf_report,
has_md_report=has_md_report)
# --- 根路径重定向 ---
@app.route('/index')
def index_redirect():
return redirect(url_for('list_history'))
if __name__ == '__main__':
# 首次运行时确保数据库和用户存在
init_db()
# 使用5051端口避免与api_server.py冲突
app.run(debug=True, host='0.0.0.0', port=5051)
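
The two download routes above share the same traversal guard: `Path(...).name` drops any directory components from the user-supplied `run_id`, and the resolved-parent comparison rejects anything that does not sit directly under `REPORTS_DIR`. A standalone sketch of that check (hypothetical directory names):

```python
# Hedged sketch of the guard used by download_report()/view_pdf_report().
from pathlib import Path

reports_dir = Path("test_reports").resolve()  # stands in for app.config['REPORTS_DIR']

def is_allowed(run_id: str) -> bool:
    run_dir = (reports_dir / Path(run_id).name).resolve()
    # Must be an existing directory whose parent is exactly the reports directory.
    return run_dir.is_dir() and run_dir.parent == reports_dir

for candidate in ("20250827-225309", "../users.db", "a/../../etc"):
    print(candidate, "->", is_allowed(candidate))  # traversal attempts collapse to a bare name
```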

View File

@ -0,0 +1,22 @@
pydantic>=2.0.0,<3.0.0
PyYAML>=6.0,<7.0
jsonschema>=4.0.0,<5.0.0
requests>=2.20.0,<3.0.0
flask>=2.0.0,<3.0.0 # 用于模拟服务器
numpy>=1.20.0,<2.0.0 # 用于数值计算
# 用于 OpenAPI/Swagger 解析 (可选, 如果输入解析器需要)
openapi-spec-validator>=0.5.0,<0.6.0
prance[osv]>=23.0.0,<24.0.0
# 用于 API Linting (可选, 如果规则库需要集成 Spectral-like 功能)
# pyaml-env>=1.0.0,<2.0.0 # 如果 linting 规则是 yaml 且用到了环境变量
# 测试框架 (可选, 推荐)
# pytest>=7.0.0,<8.0.0
# pytest-cov>=4.0.0,<5.0.0
# httpx>=0.20.0,<0.28.0 # for testing API calls
Flask-Cors>=3.0
markdown
reportlab>=3.6.0 # For PDF report generation

View File

@ -0,0 +1,3 @@
#!/bin/bash
chmod +x *.sh
echo "Permissions set for shell scripts"

View File

@ -0,0 +1,10 @@
@echo off
echo Starting DMS Compliance Tool...
echo Loading pre-built Docker image...
docker load -i docker-image.tar
echo Starting services...
docker compose up -d
echo Services started
echo FastAPI Server: http://localhost:5050
echo History Viewer: http://localhost:5051
pause

View File

@ -0,0 +1,10 @@
#!/bin/bash
echo "Starting DMS Compliance Tool..."
echo "Loading pre-built Docker image..."
docker load -i docker-image.tar
echo "Starting services..."
docker compose up -d
echo "Services started"
echo "FastAPI Server: http://localhost:5050"
echo "History Viewer: http://localhost:5051"

View File

@ -0,0 +1,4 @@
@echo off
docker compose down
echo Services stopped.
pause

View File

@ -0,0 +1,5 @@
#!/bin/bash
echo "Stopping DMS Compliance Tool..."
docker compose down
echo "Services stopped."

View File

@ -0,0 +1,20 @@
[supervisord]
nodaemon=true
logfile=/var/log/supervisor/supervisord.log
user=root
[program:api_server]
command=python -m uvicorn fastapi_server:app --host 0.0.0.0 --port 5050
directory=/app
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/api_server.log
[program:history_viewer]
command=python history_viewer.py
directory=/app
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/history_viewer.log

View File

@ -62,11 +62,11 @@ app = FastAPI(
主要特性
- 🚀 高性能: 基于FastAPI支持异步处理
- 📊 分页支持: 解决大量API节点的内存问题
- 📝 自动文档: 自动生成交互式API文档
- 🔧 灵活配置: 支持多种测试配置选项
- 📈 详细报告: 生成PDF和JSON格式的测试报告
🚀 高性能: 基于FastAPI支持异步处理
📊 分页支持: 解决大量API节点的内存问题
📝 自动文档: 自动生成交互式API文档
🔧 灵活配置: 支持多种测试配置选项
📈 详细报告: 生成PDF和JSON格式的测试报告
""",
version="1.0.0",
docs_url="/docs", # Swagger UI
@ -87,46 +87,20 @@ class TestConfig(BaseModel):
"""测试配置模型"""
# API定义源 (三选一)
yapi: Optional[str] = Field(None, description="YAPI定义文件路径", example="./api_spec.json")
swagger: Optional[str] = Field(None, description="Swagger/OpenAPI定义文件路径", example="./openapi.yaml")
dms: Optional[str] = Field(None, description="DMS服务发现的domain mapping文件路径", example="./assets/doc/dms/domain.json")
yapi: Optional[str] = Field(None, description="YAPI定义文件路径", exclude=True)
swagger: Optional[str] = Field(None, description="Swagger/OpenAPI定义文件路径", exclude=True)
dms: Optional[str] = Field("./assets/doc/dms/domain.json", description="DMS服务发现的domain mapping文件路径", example="./assets/doc/dms/domain.json")
# 基本配置
base_url: str = Field(..., description="API基础URL", example="https://api.example.com")
base_url: str = Field("https://www.dev.ideas.cnpc/", description="API基础URL", example="https://www.dev.ideas.cnpc/")
# 分页配置
page_size: int = Field(1000, description="DMS API分页大小默认1000。较小的值可以减少内存使用", ge=1, le=10000)
page_size: int = Field(10, description="DMS API分页大小默认10。较小的值可以减少内存使用", ge=1, le=10000)
page_no: int = Field(1, description="起始页码从1开始。可用于断点续传或跳过前面的页面", ge=1)
fetch_all_pages: bool = Field(True, description="是否获取所有页面。True=获取所有数据False=只获取指定页面")
fetch_all_pages: bool = Field(False, description="是否获取所有页面。True=获取所有数据False=只获取指定页面")
# 过滤选项
categories: Optional[List[str]] = Field(None, description="YAPI分类列表", example=["用户管理", "订单系统"])
tags: Optional[List[str]] = Field(None, description="Swagger标签列表", example=["user", "order"])
strictness_level: str = Field("CRITICAL", description="测试严格等级", pattern="^(CRITICAL|HIGH|MEDIUM|LOW)$")
# SSL和安全
ignore_ssl: bool = Field(False, description="忽略SSL证书验证不推荐在生产环境使用")
# 输出配置
output: str = Field("./test_reports", description="测试报告输出目录")
generate_pdf: bool = Field(True, description="是否生成PDF报告")
# 自定义测试
custom_test_cases_dir: Optional[str] = Field(None, description="自定义测试用例目录路径")
stages_dir: Optional[str] = Field(None, description="自定义测试阶段目录路径")
# LLM配置
llm_api_key: Optional[str] = Field(None, description="LLM API密钥")
llm_base_url: Optional[str] = Field(None, description="LLM API基础URL")
llm_model_name: Optional[str] = Field("gpt-3.5-turbo", description="LLM模型名称")
use_llm_for_request_body: bool = Field(False, description="使用LLM生成请求体")
use_llm_for_path_params: bool = Field(False, description="使用LLM生成路径参数")
use_llm_for_query_params: bool = Field(False, description="使用LLM生成查询参数")
use_llm_for_headers: bool = Field(False, description="使用LLM生成请求头")
# 调试选项
verbose: bool = Field(False, description="启用详细日志输出")
@field_validator('base_url')
@classmethod
def validate_base_url(cls, v):
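
The hunk above trims the public surface of `TestConfig`: `yapi`/`swagger` are now declared with `exclude=True`, `dms` and `base_url` gain defaults, and paging defaults change to `page_size=10`, `fetch_all_pages=False` (the validator body is truncated by the hunk). A quick sketch of what that means for serialization, assuming the model is importable from `fastapi_server` and that `validate_base_url` accepts the default URL:

```python
# Hedged sketch: inspect the revised defaults; not part of the diff itself.
from fastapi_server import TestConfig

cfg = TestConfig()  # no arguments required any more, base_url now has a default
dumped = cfg.model_dump(exclude_none=True)

# Fields declared with exclude=True (yapi, swagger) never appear in the dump.
assert "yapi" not in dumped and "swagger" not in dumped
print(dumped["dms"], dumped["base_url"], dumped["page_size"], dumped["fetch_all_pages"])
```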
@ -239,7 +213,7 @@ def run_tests_logic(config: dict):
output_directory = base_output_dir / timestamp
output_directory.mkdir(parents=True, exist_ok=True)
logger.info(f"Test reports will be saved to: {output_directory.resolve()}")
print(f"config{config}")
# Initialize the orchestrator
orchestrator = APITestOrchestrator(
base_url=config['base_url'],
@ -291,9 +265,7 @@ def run_tests_logic(config: dict):
if test_summary and config.get('stages_dir') and parsed_spec:
logger.info(f"Executing API test stages from directory: {config['stages_dir']}")
stage_summary = orchestrator.run_stages_from_spec(parsed_spec, config['stages_dir'])
if stage_summary:
test_summary.merge_stage_summary(stage_summary)
orchestrator.run_stages_from_spec(parsed_spec, test_summary)
if test_summary:
# Save main summary
@ -303,12 +275,17 @@ def run_tests_logic(config: dict):
# Save API call details
api_calls_filename = "api_call_details.md"
save_api_call_details_to_markdown(
save_api_call_details_to_file(
orchestrator.get_api_call_details(),
str(output_directory),
filename=api_calls_filename
)
# Generate PDF report if reportlab is available
if reportlab_available and config.get('generate_pdf', True):
pdf_report_path = output_directory / "report_cn.pdf"
save_pdf_report(test_summary.to_dict(), pdf_report_path, config.get('strictness_level', 'CRITICAL'))
failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
@ -335,47 +312,608 @@ def run_tests_logic(config: dict):
"traceback": traceback.format_exc()
}
def save_api_call_details_to_markdown(api_call_details: List[APICallDetail], output_dir: str, filename: str = "api_call_details.md"):
"""Save API call details to markdown file"""
def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
"""
将API调用详情列表保存到指定目录下的 Markdown 文件中
同时额外生成一个纯文本文件 (.txt)每行包含一个 cURL 命令
"""
if not api_call_details:
logger.info("没有API调用详情可供保存。")
return
output_dir = Path(output_dir_path)
try:
output_path = Path(output_dir) / filename
output_dir.mkdir(parents=True, exist_ok=True)
except OSError as e:
logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}")
return
with open(output_path, 'w', encoding='utf-8') as f:
f.write("# API调用详情\n\n")
# 主文件是 Markdown 文件
md_output_file = output_dir / filename
# 确保它是 .md尽管 main 函数应该已经处理了
if md_output_file.suffix.lower() not in ['.md', '.markdown']:
md_output_file = md_output_file.with_suffix('.md')
for i, detail in enumerate(api_call_details, 1):
f.write(f"## {i}. {detail.endpoint_name}\n\n")
f.write(f"**请求URL**: `{detail.request_url}`\n\n")
f.write(f"**请求方法**: `{detail.request_method}`\n\n")
markdown_content = []
if detail.request_headers:
f.write("**请求头**:\n```json\n")
f.write(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
f.write("\n```\n\n")
for detail in api_call_details:
# Request URL with params (if any)
url_to_display = detail.request_url
if detail.request_params:
try:
# Ensure urllib is available for this formatting step
import urllib.parse
query_string = urllib.parse.urlencode(detail.request_params)
url_to_display = f"{detail.request_url}?{query_string}"
except Exception as e:
logger.warning(f"Error formatting URL with params for display: {e}")
# Fallback to just the base URL if params formatting fails
markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
markdown_content.append("**cURL Command:**")
markdown_content.append("```sh")
markdown_content.append(detail.curl_command)
markdown_content.append("```")
markdown_content.append("### Request Details")
markdown_content.append(f"- **Method:** `{detail.request_method}`")
markdown_content.append(f"- **Full URL:** `{url_to_display}`")
markdown_content.append("- **Headers:**")
markdown_content.append("```json")
markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
markdown_content.append("```")
if detail.request_body:
f.write("**请求体**:\n```json\n")
f.write(json.dumps(detail.request_body, indent=2, ensure_ascii=False))
f.write("\n```\n\n")
if detail.request_params:
markdown_content.append("- **Query Parameters:**")
markdown_content.append("```json")
markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
markdown_content.append("```")
f.write(f"**响应状态码**: `{detail.response_status_code}`\n\n")
if detail.request_body is not None:
markdown_content.append("- **Body:**")
body_lang = "text"
formatted_body = str(detail.request_body)
try:
# Try to parse as JSON for pretty printing
if isinstance(detail.request_body, str):
try:
parsed_json = json.loads(detail.request_body)
formatted_body = json.dumps(parsed_json, indent=2, ensure_ascii=False)
body_lang = "json"
except json.JSONDecodeError:
pass # Keep as text
elif isinstance(detail.request_body, (dict, list)):
formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False)
body_lang = "json"
except Exception as e:
logger.warning(f"Error formatting request body for Markdown: {e}")
markdown_content.append(f"```{body_lang}")
markdown_content.append(formatted_body)
markdown_content.append("```")
if detail.response_headers:
f.write("**响应头**:\n```json\n")
f.write(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
f.write("\n```\n\n")
markdown_content.append("### Response Details")
markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
markdown_content.append("- **Headers:**")
markdown_content.append("```json")
markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
markdown_content.append("```")
if detail.response_body:
f.write("**响应体**:\n```json\n")
f.write(json.dumps(detail.response_body, indent=2, ensure_ascii=False))
f.write("\n```\n\n")
if detail.response_body is not None:
markdown_content.append("- **Body:**")
resp_body_lang = "text"
formatted_resp_body = str(detail.response_body)
try:
# Try to parse as JSON for pretty printing
if isinstance(detail.response_body, str):
try:
# If it's already a string that might be JSON, try parsing and re-dumping
parsed_json_resp = json.loads(detail.response_body)
formatted_resp_body = json.dumps(parsed_json_resp, indent=2, ensure_ascii=False)
resp_body_lang = "json"
except json.JSONDecodeError:
# It's a string, but not valid JSON, keep as text
pass
elif isinstance(detail.response_body, (dict, list)):
# It's already a dict/list, dump it as JSON
formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False)
resp_body_lang = "json"
# If it's neither string nor dict/list (e.g. int, bool from parsed json), str() is fine.
except Exception as e:
logger.warning(f"Error formatting response body for Markdown: {e}")
f.write("---\n\n")
logger.info(f"API call details saved to: {output_path}")
markdown_content.append(f"```{resp_body_lang}")
markdown_content.append(formatted_resp_body)
markdown_content.append("```")
markdown_content.append("") # Add a blank line for spacing before next --- or EOF
markdown_content.append("---") # Separator
try:
with open(md_output_file, 'w', encoding='utf-8') as f_md:
f_md.write("\n".join(markdown_content))
logger.info(f"API调用详情已保存为 Markdown: {md_output_file}")
except Exception as e:
logger.error(f"Error saving API call details: {e}")
logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True)
def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
"""将测试摘要保存为格式化的PDF文件"""
logger.info(f"开始生成PDF报告: {output_path}")
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
# --- 统一的字体管理和注册 ---
font_name = 'SimSun' # 使用一个简单清晰的注册名
font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
if not Path(font_path).exists():
logger.error(f"字体文件未找到: {Path(font_path).resolve()}")
return
# 关键修复: 对于 .ttc (TrueType Collection) 文件, 必须指定 subfontIndex
pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0))
# 将注册的字体关联到 'SimSun' 字体族
pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
elements = []
# --- 统一样式定义, 全部使用注册的字体名 ---
styles = getSampleStyleSheet()
title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)
def to_para(text, style=normal_style, escape=True):
"""
根据用户建议移除 textwrap 以进行诊断
此版本只包含净化和基本的换行符替换
"""
if text is None:
content = ""
else:
content = str(text)
if escape:
content = html.escape(content)
# 依然保留Unicode控制字符的净化
content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
if not content.strip():
# 对于完全空白或None的输入返回一个安全的非换行空格
return Paragraph('&nbsp;', style)
# 只使用基本的换行符替换
content = content.replace('\n', '<br/>')
return Paragraph(content, style)
# 3. 填充PDF内容 - 优化后的报告格式
# 生成报告编码(基于时间戳)
import time
report_code = f"DMS-TEST-{int(time.time())}"
# 报告标题
elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
elements.append(Spacer(1, 15))
# 报告基本信息表格
basic_info_data = [
[to_para("<b>报告编码</b>", escape=False), to_para(report_code)],
[to_para("<b>报告名称</b>", escape=False), to_para("DMS领域数据服务测试分析报告")],
[to_para("<b>申请日期</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d'))],
[to_para("<b>申请人</b>", escape=False), to_para("系统管理员")],
[to_para("<b>服务供应商名称</b>", escape=False), to_para("数据管理系统(DMS)")],
]
basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
basic_info_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
]))
elements.append(basic_info_table)
elements.append(Spacer(1, 20))
# 摘要部分
elements.append(to_para("摘要", heading_style, escape=False))
overall = summary_data.get('overall_summary', {})
# 从JSON提取并格式化时间
try:
start_time_str = summary_data.get('start_time', 'N/A')
end_time_str = summary_data.get('end_time', 'N/A')
duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0))
start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
except:
start_time_formatted = start_time_str
end_time_formatted = end_time_str
# 摘要内容 - 安全计算跳过的数量
def safe_subtract(total, passed, failed):
"""安全地计算跳过的数量"""
try:
if isinstance(total, (int, float)) and isinstance(passed, (int, float)) and isinstance(failed, (int, float)):
return max(0, total - passed - failed)
else:
return 0
except:
return 0
endpoints_tested = overall.get('endpoints_tested', 0)
endpoints_passed = overall.get('endpoints_passed', 0)
endpoints_failed = overall.get('endpoints_failed', 0)
endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed)
test_cases_executed = overall.get('total_test_cases_executed', 0)
test_cases_passed = overall.get('test_cases_passed', 0)
test_cases_failed = overall.get('test_cases_failed', 0)
test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed)
stages_executed = overall.get('total_stages_executed', 0)
stages_passed = overall.get('stages_passed', 0)
stages_failed = overall.get('stages_failed', 0)
stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed)
summary_text = f"""本次测试针对DMS数据管理系统领域数据服务进行全面的合规性验证。
测试时间{start_time_formatted} {end_time_formatted}总耗时 {float(duration):.2f}
共测试 {endpoints_tested} 个API端点其中 {endpoints_passed} 个通过{endpoints_failed} 个失败{endpoints_skipped} 个跳过端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}
执行 {test_cases_executed} 个测试用例其中 {test_cases_passed} 个通过{test_cases_failed} 个失败{test_cases_skipped} 个跳过测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}
执行 {stages_executed} 个流程测试其中 {stages_passed} 个通过{stages_failed} 个失败{stages_skipped} 个跳过流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}"""
elements.append(to_para(summary_text, normal_style))
elements.append(Spacer(1, 20))
# 测试内容包括 - API列表表格
elements.append(to_para("测试内容包括", heading_style, escape=False))
# 从测试结果中提取API信息
endpoint_results = summary_data.get('endpoint_results', [])
api_list_data = [
[to_para("<b>序号</b>", escape=False), to_para("<b>服务名称</b>", escape=False),
to_para("<b>服务功能描述</b>", escape=False), to_para("<b>服务参数描述</b>", escape=False),
to_para("<b>服务返回值描述</b>", escape=False)]
]
for i, endpoint in enumerate(endpoint_results[:10], 1): # 限制显示前10个API
endpoint_name = endpoint.get('endpoint_name', 'N/A')
# 简化的功能描述
if 'Create' in endpoint_name:
func_desc = "提供数据创建服务"
elif 'List' in endpoint_name or 'Query' in endpoint_name:
func_desc = "提供数据查询和列表服务"
elif 'Read' in endpoint_name:
func_desc = "提供单条数据读取服务"
elif 'Update' in endpoint_name:
func_desc = "提供数据更新服务"
elif 'Delete' in endpoint_name:
func_desc = "提供数据删除服务"
else:
func_desc = "提供数据管理服务"
api_list_data.append([
to_para(str(i), small_style),
to_para(endpoint_name, small_style),
to_para(func_desc, small_style),
to_para("标准DMS参数格式", small_style),
to_para("标准DMS响应格式", small_style)
])
api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
api_list_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
('ALIGN', (0,0), (-1,-1), 'CENTER'),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('FONTSIZE', (0,0), (-1,-1), 8)
]))
elements.append(api_list_table)
elements.append(Spacer(1, 20))
# 测试用例列表 - 根据严格等级分为必须和非必须
elements.append(to_para("测试用例列表", heading_style, escape=False))
# 定义严重性等级的数值映射
severity_levels = {
'CRITICAL': 5,
'HIGH': 4,
'MEDIUM': 3,
'LOW': 2,
'INFO': 1
}
strictness_value = severity_levels.get(strictness_level, 5) # 默认为CRITICAL
# 收集所有测试用例包括endpoint用例和stage用例
all_test_cases = []
failed_test_cases = [] # 专门收集失败的测试用例
# 1. 收集endpoint测试用例
for endpoint_result in endpoint_results:
test_cases = endpoint_result.get('executed_test_cases', [])
for tc in test_cases:
tc_severity = tc.get('test_case_severity', 'MEDIUM')
tc_severity_value = severity_levels.get(tc_severity, 3)
tc_status = tc.get('status', 'N/A')
tc_message = tc.get('message', '')
test_case_info = {
'type': 'Endpoint',
'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'),
'case_name': tc.get('test_case_name', 'N/A'),
'case_id': tc.get('test_case_id', 'N/A'),
'status': tc_status,
'message': tc_message,
'severity': tc_severity,
'severity_value': tc_severity_value,
'is_required': tc_severity_value >= strictness_value,
'duration': tc.get('duration_seconds', 0),
'timestamp': tc.get('timestamp', '')
}
all_test_cases.append(test_case_info)
# 收集失败的测试用例
if tc_status in ['失败', 'FAILED', '错误', 'ERROR']:
failed_test_cases.append(test_case_info)
# 2. 收集stage测试用例
stage_results = summary_data.get('stage_results', [])
for stage_result in stage_results:
stage_name = stage_result.get('stage_name', 'N/A')
stage_status = stage_result.get('overall_status', 'N/A')
stage_message = stage_result.get('message', stage_result.get('error_message', ''))
stage_severity = 'HIGH' # Stage用例通常是高优先级
stage_severity_value = severity_levels.get(stage_severity, 4)
# 将stage作为一个测试用例添加
stage_case_info = {
'type': 'Stage',
'endpoint': f"Stage: {stage_name}",
'endpoint_id': f"STAGE_{stage_name}",
'case_name': stage_result.get('description', stage_name),
'case_id': f"STAGE_{stage_name}",
'status': stage_status,
'message': stage_message,
'severity': stage_severity,
'severity_value': stage_severity_value,
'is_required': stage_severity_value >= strictness_value,
'duration': stage_result.get('duration_seconds', 0),
'timestamp': stage_result.get('start_time', '')
}
all_test_cases.append(stage_case_info)
# 收集失败的stage用例
if stage_status in ['失败', 'FAILED', '错误', 'ERROR']:
failed_test_cases.append(stage_case_info)
# 分离必须和非必须的测试用例
required_cases = [case for case in all_test_cases if case['is_required']]
optional_cases = [case for case in all_test_cases if not case['is_required']]
# 创建分离的测试用例表格
if all_test_cases:
# 添加严格等级说明
strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
elements.append(to_para(strictness_text, small_style))
elements.append(Spacer(1, 10))
# 1. 必须的测试用例表格
if required_cases:
elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
required_table_data = [
[to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
]
for i, case in enumerate(required_cases, 1):
status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
required_table_data.append([
to_para(str(i), small_style),
to_para(case['type'], small_style),
to_para(case['case_name'], small_style),
to_para(case['endpoint'], small_style),
to_para(case['severity'], small_style),
to_para(status_display, small_style)
])
required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45])
required_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('BACKGROUND', (0,0), (-1,0), colors.lightblue), # 使用浅蓝色突出必须用例
('ALIGN', (0,0), (-1,-1), 'CENTER'),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('FONTSIZE', (0,0), (-1,-1), 8)
]))
elements.append(required_table)
elements.append(Spacer(1, 15))
# 2. 非必须的测试用例表格
if optional_cases:
elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
optional_table_data = [
[to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
]
for i, case in enumerate(optional_cases, 1):
status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
optional_table_data.append([
to_para(str(i), small_style),
to_para(case['type'], small_style),
to_para(case['case_name'], small_style),
to_para(case['endpoint'], small_style),
to_para(case['severity'], small_style),
to_para(status_display, small_style)
])
optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45])
optional_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('BACKGROUND', (0,0), (-1,0), colors.lightgrey), # 使用浅灰色表示非必须用例
('ALIGN', (0,0), (-1,-1), 'CENTER'),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('FONTSIZE', (0,0), (-1,-1), 8)
]))
elements.append(optional_table)
elements.append(Spacer(1, 10))
# 添加用例统计信息
total_cases = len(all_test_cases)
endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint'])
stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage'])
required_count = len(required_cases)
optional_count = len(optional_cases)
stats_text = f"""测试用例统计:
总计 {total_cases} 个用例其中端点用例 {endpoint_cases} 阶段用例 {stage_cases}
必须用例 {required_count} 非必须用例 {optional_count}
严格等级{strictness_level}{severity_levels.get(strictness_level, 5)}级及以上为必须"""
elements.append(to_para(stats_text, small_style))
else:
elements.append(to_para("无测试用例执行记录。", normal_style))
elements.append(Spacer(1, 20))
# 失败用例详情部分
if failed_test_cases:
elements.append(to_para("失败用例详情分析", heading_style, escape=False))
elements.append(Spacer(1, 10))
# 按严重性分组失败用例
critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL']
high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH']
medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM']
low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW']
failure_summary = f"""失败用例统计:
总计 {len(failed_test_cases)} 个失败用例,其中:
严重级别:{len(critical_failures)} 个
高级别:{len(high_failures)} 个
中级别:{len(medium_failures)} 个
低级别:{len(low_failures)} 个
以下是详细的失败原因分析:"""
elements.append(to_para(failure_summary, normal_style))
elements.append(Spacer(1, 15))
# 详细失败用例列表
for i, failed_case in enumerate(failed_test_cases, 1):
# 用例标题
case_title = f"{i}. {failed_case['case_name']}"
elements.append(to_para(case_title, ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5)))
# 用例基本信息
case_info = f"""• 用例ID:{failed_case['case_id']}
• 所属端点:{failed_case['endpoint']}
• 严重级别:{failed_case['severity']}
• 执行状态:{failed_case['status']}"""
elements.append(to_para(case_info, ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5)))
# 失败原因
failure_reason = failed_case.get('message', '无详细错误信息')
if failure_reason:
elements.append(to_para("失败原因:", ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15)))
# 处理长文本确保在PDF中正确显示
if len(failure_reason) > 200:
# 对于很长的错误信息,进行适当的分段
failure_reason = failure_reason[:200] + "..."
elements.append(to_para(failure_reason, ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red)))
# 添加分隔线
if i < len(failed_test_cases):
elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey))
elements.append(Spacer(1, 10))
elements.append(Spacer(1, 20))
elements.append(Spacer(1, 20))
# 测试情况说明
elements.append(to_para("测试情况说明", heading_style, escape=False))
test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。
测试累计发现缺陷 {overall.get('test_cases_failed', 0)} 个。
测试执行时间:{start_time_formatted} 至 {end_time_formatted}。
测试环境:开发测试环境。
测试方法:自动化API合规性测试。"""
elements.append(to_para(test_situation_text, normal_style))
elements.append(Spacer(1, 20))
# 测试结论
elements.append(to_para("测试结论", heading_style, escape=False))
# 根据测试结果生成结论
success_rate = overall.get('test_case_success_rate', '0%')
success_rate_num = float(success_rate.replace('%', '')) if success_rate != 'N/A' else 0
if success_rate_num >= 90:
conclusion_status = "通过"
conclusion_text = f"""本套领域数据服务已通过环境验证系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析该项目通过验收测试。
测试用例成功率达到{success_rate}符合验收标准"""
elif success_rate_num >= 70:
conclusion_status = "基本通过"
conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。"""
else:
conclusion_status = "不通过"
conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。"""
elements.append(to_para(conclusion_text, normal_style))
elements.append(Spacer(1, 20))
# 检测依据
elements.append(to_para("检测依据", heading_style, escape=False))
detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分关于DMS领域数据服务的接口要求和测试细则。
参考标准:
1. DMS数据管理系统API规范V1.0
2. RESTful API设计规范
3. 数据安全和隐私保护要求
4. 系统集成测试标准"""
elements.append(to_para(detection_basis_text, normal_style))
elements.append(Spacer(1, 20))
# 报告生成信息
elements.append(to_para("报告生成信息", heading_style, escape=False))
generation_info_data = [
[to_para("<b>生成时间</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d%H:%M:%S'))],
[to_para("<b>生成工具</b>", escape=False), to_para("DMS合规性测试工具")],
[to_para("<b>工具版本</b>", escape=False), to_para("V1.0.0")],
[to_para("<b>测试结论</b>", escape=False), to_para(f"<b>{conclusion_status}</b>", escape=False)],
]
generation_info_table = Table(generation_info_data, colWidths=[120, '*'])
generation_info_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
]))
elements.append(generation_info_table)
# 构建PDF
doc.build(elements)
logger.info(f"PDF报告已成功生成: {output_path}")
except Exception as e:
logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
@app.post("/run",
summary="执行API合规性测试",
@ -383,16 +921,16 @@ def save_api_call_details_to_markdown(api_call_details: List[APICallDetail], out
执行API合规性测试的主要端点
支持三种API定义源
- **YAPI**: 基于YAPI定义文件
- **Swagger/OpenAPI**: 基于OpenAPI规范文件
- **DMS**: 动态发现DMS服务的API
- YAPI: 基于YAPI定义文件
- Swagger/OpenAPI: 基于OpenAPI规范文件
- DMS: 动态发现DMS服务的API
### 分页支持
分页支持
对于DMS测试支持分页获取API列表避免内存溢出
- `page_size`: 每页获取的API数量默认1000
- 返回详细的分页统计信息
### LLM集成
LLM集成
可选择使用大语言模型生成测试数据
- 智能生成请求体路径参数查询参数等
- 提高测试覆盖率和数据多样性
@ -407,17 +945,36 @@ async def run_api_tests(config: TestConfig):
"""
执行API合规性测试
- **config**: 测试配置包含API定义源测试参数等
- **returns**: 测试结果包含摘要信息和分页信息如适用
- config: 测试配置包含API定义源测试参数等
- returns: 测试结果包含摘要信息和分页信息如适用
"""
try:
logger.info(f"Starting test run with configuration: {config.model_dump()}")
# Convert Pydantic model to dict for compatibility
config_dict = config.model_dump(exclude_none=True)
# Replace underscores with hyphens for compatibility with original code
config_dict = {k.replace('_', '-'): v for k, v in config_dict.items()}
# Add hidden parameters with default values
hidden_defaults = {
"categories": [],
"tags": [],
"ignore_ssl": True,
"output": "./test_reports",
"generate_pdf": True,
"custom_test_cases_dir": "./custom_testcases",
"stages_dir": "./custom_stages",
"llm_api_key": "sk-lbGrsUPL1iby86h554FaE536C343435dAa9bA65967A840B2",
"llm_base_url": "https://aiproxy.petrotech.cnpc/v1",
"llm_model_name": "deepseek-v3",
"use_llm_for_request_body": False,
"use_llm_for_path_params": False,
"use_llm_for_query_params": False,
"use_llm_for_headers": False,
"verbose": False
}
# Merge hidden defaults with config
config_dict.update(hidden_defaults)
result = run_tests_logic(config_dict)
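
With the trimmed config and the server-side `hidden_defaults` merge above, a run can be triggered with a very small request body. A sketch of the call, assuming the FastAPI service listens on its default port 5050 (as in the compose file) and that `requests` is available:

```python
# Hedged sketch: trigger a test run via POST /run; field names follow the TestConfig diff.
import requests

payload = {
    "base_url": "https://www.dev.ideas.cnpc/",  # optional now, shown for clarity
    "page_size": 10,
    "fetch_all_pages": False,
}
resp = requests.post("http://localhost:5050/run", json=payload, timeout=1800)
resp.raise_for_status()
print(resp.json())
```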
@ -456,8 +1013,8 @@ async def download_report(report_id: str, file_type: str = "summary.json"):
"""
下载测试报告文件
- **report_id**: 报告ID通常是时间戳
- **file_type**: 文件类型可选值summary.json, api_call_details.md
- report_id: 报告ID通常是时间戳
- file_type: 文件类型可选值summary.json, api_call_details.md
"""
try:
report_dir = Path("./test_reports") / report_id