compliance/run_api_tests.py
2025-08-19 17:03:32 +08:00

1048 lines
51 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
API测试工具
此工具使用DDMS测试编排器从YAPI或Swagger定义执行API测试。
支持使用规则库进行高级验证。
"""
import os
import sys
import json
import logging
import argparse
import datetime
import subprocess
from pathlib import Path
from typing import List, Optional
import string # 导入string模块用于字符过滤
import unicodedata # 导入unicodedata用于字符净化
import html
# PDF生成库 - 使用ReportLab并打包字体
try:
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont, TTFError
reportlab_available = True
except ImportError:
reportlab_available = False
from ddms_compliance_suite.api_caller.caller import APICallDetail
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary
from ddms_compliance_suite.input_parser.parser import ParsedAPISpec
# Logging configuration: INFO level, timestamped records tagged with logger name.
# (Verbose mode in main() later raises selected loggers to DEBUG.)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
def parse_args():
    """Parse command-line arguments for the DDMS API test tool.

    Returns:
        argparse.Namespace: the parsed arguments. Exactly one of
        --yapi / --swagger / --dms is expected (enforced later in main()).
    """
    parser = argparse.ArgumentParser(description='DDMS API测试工具')
    # Basic options
    parser.add_argument('--base-url', required=True, help='API基础URL')
    parser.add_argument('--verbose', '-v', action='store_true', help='启用详细日志')
    parser.add_argument('--output', '-o', help='输出目录或主报告文件路径 (例如 ./test_reports/ 或 ./test_reports/summary.json)')
    parser.add_argument('--format', choices=['json', 'html'], default='json', help='主测试摘要报告的输出格式')
    parser.add_argument('--api-calls-output', help='API 调用详情的 Markdown 输出文件路径 (例如 ./test_reports/api_calls.md)。如果未提供,将尝试使用 --output 目录和默认文件名 api_call_details.md。始终会额外生成一个同名的 .txt 文件包含纯 cURL 命令。')
    parser.add_argument('--generate-pdf', action='store_true', help='是否生成中文PDF报告', default=True)
    # API definition source options
    api_group = parser.add_argument_group('API定义源')
    api_group.add_argument('--yapi', help='YAPI定义文件路径')
    api_group.add_argument('--swagger', help='Swagger定义文件路径')
    api_group.add_argument('--dms', help='DMS服务发现的domain mapping文件路径')
    api_group.add_argument('--page-size', type=int, default=1000,
                           help='DMS API分页大小默认1000。较小的值可以减少内存使用但会增加请求次数')
    api_group.add_argument('--page-no', type=int, default=1,
                           help='DMS API起始页码从1开始。可用于断点续传或跳过前面的页面')
    api_group.add_argument('--fetch-single-page', action='store_true',
                           help='只获取指定页面的数据,而不是获取所有页面。用于快速测试或内存受限环境')
    # Filtering options
    filter_group = parser.add_argument_group('过滤选项')
    filter_group.add_argument('--categories', help='YAPI分类逗号分隔')
    filter_group.add_argument('--tags', help='Swagger标签逗号分隔')
    filter_group.add_argument('--list-categories', action='store_true', help='列出YAPI分类')
    filter_group.add_argument('--list-tags', action='store_true', help='列出Swagger标签')
    filter_group.add_argument('--strictness-level',
                              choices=['CRITICAL', 'HIGH', 'MEDIUM', 'LOW'],
                              default='CRITICAL',
                              help='设置测试的严格等级。只有严重性等于或高于此级别的失败用例才会导致API端点被标记为失败。')
    filter_group.add_argument('--ignore-ssl', action='store_true',
                              help='忽略SSL证书验证不推荐在生产环境使用')
    # Custom test case options
    custom_tc_group = parser.add_argument_group('自定义测试用例选项')
    custom_tc_group.add_argument('--custom-test-cases-dir',
                                 default=None,
                                 help='存放自定义APITestCase Python文件的目录路径。如果未提供则不加载自定义测试。')
    # LLM configuration options
    llm_group = parser.add_argument_group('LLM 配置选项 (可选)')
    llm_group.add_argument('--llm-api-key',
                           # SECURITY FIX: a live API key used to be hard-coded here as the
                           # default, contradicting the help text below. Read it from the
                           # environment instead; never commit credentials to source.
                           default=os.environ.get("OPENAI_API_KEY"),
                           help='LLM服务的API密钥 (例如 OpenAI API Key)。默认从环境变量 OPENAI_API_KEY 读取。')
    llm_group.add_argument('--llm-base-url',
                           default="https://aiproxy.petrotech.cnpc/v1",
                           help='LLM服务的自定义基础URL (例如 OpenAI API代理)。')
    llm_group.add_argument('--llm-model-name',
                           default="deepseek-v3",  # a commonly available default model
                           help='要使用的LLM模型名称 (例如 "gpt-3.5-turbo", "gpt-4")。')
    llm_group.add_argument('--use-llm-for-request-body',
                           action='store_true',
                           default=False,  # LLM-generated request bodies are opt-in
                           help='是否启用LLM为API请求生成请求体数据。')
    llm_group.add_argument('--use-llm-for-path-params',
                           action='store_true',
                           default=False,
                           help='是否启用LLM为API请求生成路径参数。')
    llm_group.add_argument('--use-llm-for-query-params',
                           action='store_true',
                           default=False,
                           help='是否启用LLM为API请求生成查询参数。')
    llm_group.add_argument('--use-llm-for-headers',
                           action='store_true',
                           default=False,
                           help='是否启用LLM为API请求生成头部参数。')
    # Scenario (stage) test options
    scenario_group = parser.add_argument_group('API测试阶段 (Stage) 选项 (可选)')
    scenario_group.add_argument('--stages-dir',
                                default=None,
                                help='存放自定义APIStage Python文件的目录路径。如果未提供则不执行测试阶段。')
    return parser.parse_args()
def list_yapi_categories(yapi_file: str):
    """Parse a YAPI definition file and print its categories, one per line."""
    from ddms_compliance_suite.input_parser.parser import InputParser
    logger.info(f"从YAPI文件解析分类: {yapi_file}")
    spec = InputParser().parse_yapi_spec(yapi_file)
    if not spec:
        logger.error(f"解析YAPI文件失败: {yapi_file}")
        return
    print("\nYAPI分类:")
    for idx, cat in enumerate(spec.categories, start=1):
        name = cat.get('name', '未命名')
        desc = cat.get('desc', '无描述')
        print(f"{idx}. {name} - {desc}")
def list_swagger_tags(swagger_file: str):
    """Parse a Swagger definition file and print its tags, one per line."""
    from ddms_compliance_suite.input_parser.parser import InputParser
    logger.info(f"从Swagger文件解析标签: {swagger_file}")
    spec = InputParser().parse_swagger_spec(swagger_file)
    if not spec:
        logger.error(f"解析Swagger文件失败: {swagger_file}")
        return
    print("\nSwagger标签:")
    for idx, tag in enumerate(spec.tags, start=1):
        name = tag.get('name', '未命名')
        desc = tag.get('description', '无描述')
        print(f"{idx}. {name} - {desc}")
def save_results(summary: TestSummary, output_file_path: str, format_type: str):
    """Persist the main test summary to disk.

    Args:
        summary: the orchestrator's aggregated test summary.
        output_file_path: destination file path (parent dirs are created).
        format_type: 'json' (via summary.to_json) or 'html' (simple report).
    """
    output_path = Path(output_file_path)
    # Ensure the directory for the output file exists
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Error creating directory for output file {output_path.parent}: {e}")
        return
    if format_type == 'json':
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(summary.to_json(pretty=True))
        logger.info(f"测试结果已保存为JSON: {output_path}")
    elif format_type == 'html':
        # Creating simple HTML report
        html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>API测试报告</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.summary {{ background-color: #f5f5f5; padding: 15px; border-radius: 5px; }}
.pass {{ color: green; }}
.fail {{ color: red; }}
.error {{ color: orange; }}
.skip {{ color: gray; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
tr:nth-child(even) {{ background-color: #f9f9f9; }}
</style>
</head>
<body>
<h1>API测试报告</h1>
<div class="summary">
<h2>测试结果摘要</h2>
<p>总测试数: {summary.total_test_cases_executed}</p>
<p class="pass">通过: {summary.test_cases_passed}</p>
<p class="fail">失败: {summary.test_cases_failed}</p>
<p class="error">错误: {summary.test_cases_error}</p>
<p class="skip">跳过: {summary.test_cases_skipped_in_endpoint}</p>
<p>成功率: {summary.test_case_success_rate:.2f}%</p>
<p>总耗时: {summary.duration:.2f}秒</p>
<p>开始时间: {summary.start_time.isoformat()}</p>
<p>结束时间: {summary.end_time.isoformat() if summary.end_time else 'N/A'}</p>
</div>
<h2>详细测试结果</h2>
<table>
<tr>
<th>端点</th>
<th>测试用例ID</th>
<th>测试用例名称</th>
<th>状态</th>
<th>消息</th>
<th>耗时(秒)</th>
</tr>
"""
        for endpoint_result in summary.detailed_results:
            for tc_result in endpoint_result.executed_test_cases:
                # BUG FIX: the original compared against ExecutedTestCaseResult.Status,
                # but ExecutedTestCaseResult is never imported in this module, so the
                # HTML branch always crashed with NameError. Map by the enum member
                # name instead (status.value is rendered below, per the original).
                status_name = getattr(tc_result.status, 'name', str(tc_result.status))
                status_class = {'PASSED': 'pass', 'FAILED': 'fail', 'ERROR': 'error'}.get(status_name, 'skip')
                # Escape all dynamic text so API-supplied names/messages cannot
                # inject markup into (or break) the report.
                html_content += f"""
<tr>
<td>{html.escape(str(endpoint_result.endpoint_name))} ({html.escape(str(endpoint_result.endpoint_id))})</td>
<td>{html.escape(str(tc_result.test_case_id))}</td>
<td>{html.escape(str(tc_result.test_case_name))}</td>
<td class="{status_class}">{html.escape(str(tc_result.status.value))}</td>
<td>{html.escape(str(tc_result.message))}</td>
<td>{tc_result.duration:.4f}</td>
</tr>
"""
        html_content += """
</table>
</body>
</html>
"""
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        logger.info(f"测试结果已保存为HTML: {output_path}")
def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
    """Save the list of API call details as a Markdown file in the given directory.

    Also writes a companion plain-text file (.txt) containing one cURL command
    per call, as promised by the --api-calls-output help text.

    Args:
        api_call_details: recorded request/response details for each API call.
        output_dir_path: directory in which to create the files (created if missing).
        filename: Markdown file name; a .md suffix is enforced.
    """
    if not api_call_details:
        logger.info("没有API调用详情可供保存。")
        return
    output_dir = Path(output_dir_path)
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}")
        return
    # Hoisted out of the per-detail loop (it is loop-invariant).
    import urllib.parse
    # The primary artifact is the Markdown file.
    md_output_file = output_dir / filename
    # Force a .md suffix (main() should already have ensured this).
    if md_output_file.suffix.lower() not in ['.md', '.markdown']:
        md_output_file = md_output_file.with_suffix('.md')
    markdown_content = []
    for detail in api_call_details:
        # Request URL with query params appended (if any).
        url_to_display = detail.request_url
        if detail.request_params:
            try:
                query_string = urllib.parse.urlencode(detail.request_params)
                url_to_display = f"{detail.request_url}?{query_string}"
            except Exception as e:
                logger.warning(f"Error formatting URL with params for display: {e}")
                # Fallback to just the base URL if params formatting fails
        markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
        markdown_content.append("**cURL Command:**")
        markdown_content.append("```sh")
        markdown_content.append(detail.curl_command)
        markdown_content.append("```")
        markdown_content.append("### Request Details")
        markdown_content.append(f"- **Method:** `{detail.request_method}`")
        markdown_content.append(f"- **Full URL:** `{url_to_display}`")
        markdown_content.append("- **Headers:**")
        markdown_content.append("```json")
        markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
        markdown_content.append("```")
        if detail.request_params:
            markdown_content.append("- **Query Parameters:**")
            markdown_content.append("```json")
            markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
            markdown_content.append("```")
        if detail.request_body is not None:
            markdown_content.append("- **Body:**")
            body_lang = "text"
            formatted_body = str(detail.request_body)
            try:
                # Pretty-print the body as JSON when possible.
                if isinstance(detail.request_body, str):
                    try:
                        parsed_json = json.loads(detail.request_body)
                        formatted_body = json.dumps(parsed_json, indent=2, ensure_ascii=False)
                        body_lang = "json"
                    except json.JSONDecodeError:
                        pass  # Keep as text
                elif isinstance(detail.request_body, (dict, list)):
                    formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False)
                    body_lang = "json"
            except Exception as e:
                logger.warning(f"Error formatting request body for Markdown: {e}")
            markdown_content.append(f"```{body_lang}")
            markdown_content.append(formatted_body)
            markdown_content.append("```")
        markdown_content.append("### Response Details")
        markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
        markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
        markdown_content.append("- **Headers:**")
        markdown_content.append("```json")
        markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
        markdown_content.append("```")
        if detail.response_body is not None:
            markdown_content.append("- **Body:**")
            resp_body_lang = "text"
            formatted_resp_body = str(detail.response_body)
            try:
                # Pretty-print the response body as JSON when possible.
                if isinstance(detail.response_body, str):
                    try:
                        parsed_json_resp = json.loads(detail.response_body)
                        formatted_resp_body = json.dumps(parsed_json_resp, indent=2, ensure_ascii=False)
                        resp_body_lang = "json"
                    except json.JSONDecodeError:
                        # It's a string, but not valid JSON; keep as text.
                        pass
                elif isinstance(detail.response_body, (dict, list)):
                    formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False)
                    resp_body_lang = "json"
                # Other scalar types (int, bool, ...) are fine as str().
            except Exception as e:
                logger.warning(f"Error formatting response body for Markdown: {e}")
            markdown_content.append(f"```{resp_body_lang}")
            markdown_content.append(formatted_resp_body)
            markdown_content.append("```")
        markdown_content.append("")  # Blank line for spacing before the separator
        markdown_content.append("---")  # Separator
    try:
        with open(md_output_file, 'w', encoding='utf-8') as f_md:
            f_md.write("\n".join(markdown_content))
        logger.info(f"API调用详情已保存为 Markdown: {md_output_file}")
    except Exception as e:
        logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True)
        return
    # FIX: the docstring and the --api-calls-output help text both promise a
    # companion .txt file of raw cURL commands, but it was never written.
    txt_output_file = md_output_file.with_suffix('.txt')
    try:
        with open(txt_output_file, 'w', encoding='utf-8') as f_txt:
            f_txt.write("\n".join(d.curl_command for d in api_call_details))
            f_txt.write("\n")
        logger.info(f"纯 cURL 命令已保存为文本文件: {txt_output_file}")
    except Exception as e:
        logger.error(f"保存 cURL 命令到文本文件 {txt_output_file} 失败: {e}", exc_info=True)
def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
    """Render the test summary as a formatted, Chinese-language PDF report.

    Args:
        summary_data: dict-shaped summary — reads keys 'overall_summary',
            'endpoint_results', 'stage_results', 'start_time', 'end_time',
            'duration_seconds'/'duration'. Presumably the JSON form of
            TestSummary; confirm against the caller.
        output_path: destination path of the PDF (parent dirs are created).
        strictness_level: severity threshold; cases with severity at or above
            it are listed as "required" (counted toward pass/fail) in the report.

    Any exception after font registration is caught and logged; the function
    never raises.
    """
    logger.info(f"开始生成PDF报告: {output_path}")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # --- Unified font management and registration ---
        font_name = 'SimSun'  # a single, clear registered font name
        # NOTE(review): path is relative to the current working directory — the
        # report silently fails if the script is launched from elsewhere.
        font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
        if not Path(font_path).exists():
            logger.error(f"字体文件未找到: {Path(font_path).resolve()}")
            return
        # Key fix: .ttc (TrueType Collection) files require subfontIndex.
        pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0))
        # Map every face (bold/italic) of the family onto the one registered font.
        pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
        doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
        elements = []
        # --- Unified style definitions, all using the registered font name ---
        styles = getSampleStyleSheet()
        title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
        heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
        normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
        small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)
        def to_para(text, style=normal_style, escape=True):
            """Build a Paragraph from arbitrary text.

            textwrap was removed (per review suggestion) for diagnostics; this
            version only sanitizes the input and converts newlines to <br/>.
            When escape is False the text may contain ReportLab inline markup.
            """
            if text is None:
                content = ""
            else:
                content = str(text)
            if escape:
                content = html.escape(content)
            # Still strip Unicode control characters (general category 'C*').
            content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
            if not content.strip():
                # For all-blank or None input return a safe non-breaking space.
                return Paragraph('&nbsp;', style)
            # Only the basic newline replacement.
            content = content.replace('\n', '<br/>')
            return Paragraph(content, style)
        # 3. Populate the PDF content - refined report layout
        # Report code derived from the current timestamp.
        import time
        report_code = f"DMS-TEST-{int(time.time())}"
        # Report title
        elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
        elements.append(Spacer(1, 15))
        # Basic report information table
        basic_info_data = [
            [to_para("<b>报告编码</b>", escape=False), to_para(report_code)],
            [to_para("<b>报告名称</b>", escape=False), to_para("DMS领域数据服务测试分析报告")],
            [to_para("<b>申请日期</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d'))],
            [to_para("<b>申请人</b>", escape=False), to_para("系统管理员")],
            [to_para("<b>服务供应商名称</b>", escape=False), to_para("数据管理系统(DMS)")],
        ]
        basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
        basic_info_table.setStyle(TableStyle([
            ('GRID', (0,0), (-1,-1), 1, colors.grey),
            ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
            ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
        ]))
        elements.append(basic_info_table)
        elements.append(Spacer(1, 20))
        # Summary section
        elements.append(to_para("摘要", heading_style, escape=False))
        overall = summary_data.get('overall_summary', {})
        # Extract and format the run timestamps from the JSON summary.
        try:
            start_time_str = summary_data.get('start_time', 'N/A')
            end_time_str = summary_data.get('end_time', 'N/A')
            duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0))
            start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
            end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
        except:
            # Fall back to the raw strings if ISO parsing fails.
            start_time_formatted = start_time_str
            end_time_formatted = end_time_str
        # Summary body - derive skipped counts defensively
        def safe_subtract(total, passed, failed):
            """Return total - passed - failed floored at 0; 0 on any bad input."""
            try:
                if isinstance(total, (int, float)) and isinstance(passed, (int, float)) and isinstance(failed, (int, float)):
                    return max(0, total - passed - failed)
                else:
                    return 0
            except:
                return 0
        endpoints_tested = overall.get('endpoints_tested', 0)
        endpoints_passed = overall.get('endpoints_passed', 0)
        endpoints_failed = overall.get('endpoints_failed', 0)
        endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed)
        test_cases_executed = overall.get('total_test_cases_executed', 0)
        test_cases_passed = overall.get('test_cases_passed', 0)
        test_cases_failed = overall.get('test_cases_failed', 0)
        test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed)
        stages_executed = overall.get('total_stages_executed', 0)
        stages_passed = overall.get('stages_passed', 0)
        stages_failed = overall.get('stages_failed', 0)
        stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed)
        summary_text = f"""本次测试针对DMS数据管理系统领域数据服务进行全面的合规性验证。
测试时间:{start_time_formatted}{end_time_formatted},总耗时 {float(duration):.2f} 秒。
共测试 {endpoints_tested} 个API端点其中 {endpoints_passed} 个通过,{endpoints_failed} 个失败,{endpoints_skipped} 个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}
执行 {test_cases_executed} 个测试用例,其中 {test_cases_passed} 个通过,{test_cases_failed} 个失败,{test_cases_skipped} 个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}
执行 {stages_executed} 个流程测试,其中 {stages_passed} 个通过,{stages_failed} 个失败,{stages_skipped} 个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}"""
        elements.append(to_para(summary_text, normal_style))
        elements.append(Spacer(1, 20))
        # "Test scope" section - table of tested APIs
        elements.append(to_para("测试内容包括", heading_style, escape=False))
        # Pull API information out of the test results.
        endpoint_results = summary_data.get('endpoint_results', [])
        api_list_data = [
            [to_para("<b>序号</b>", escape=False), to_para("<b>服务名称</b>", escape=False),
             to_para("<b>服务功能描述</b>", escape=False), to_para("<b>服务参数描述</b>", escape=False),
             to_para("<b>服务返回值描述</b>", escape=False)]
        ]
        for i, endpoint in enumerate(endpoint_results[:10], 1):  # show at most the first 10 APIs
            endpoint_name = endpoint.get('endpoint_name', 'N/A')
            # Simplified functional description inferred from the endpoint name.
            if 'Create' in endpoint_name:
                func_desc = "提供数据创建服务"
            elif 'List' in endpoint_name or 'Query' in endpoint_name:
                func_desc = "提供数据查询和列表服务"
            elif 'Read' in endpoint_name:
                func_desc = "提供单条数据读取服务"
            elif 'Update' in endpoint_name:
                func_desc = "提供数据更新服务"
            elif 'Delete' in endpoint_name:
                func_desc = "提供数据删除服务"
            else:
                func_desc = "提供数据管理服务"
            api_list_data.append([
                to_para(str(i), small_style),
                to_para(endpoint_name, small_style),
                to_para(func_desc, small_style),
                to_para("标准DMS参数格式", small_style),
                to_para("标准DMS响应格式", small_style)
            ])
        api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
        api_list_table.setStyle(TableStyle([
            ('GRID', (0,0), (-1,-1), 1, colors.grey),
            ('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
            ('ALIGN', (0,0), (-1,-1), 'CENTER'),
            ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
            ('FONTSIZE', (0,0), (-1,-1), 8)
        ]))
        elements.append(api_list_table)
        elements.append(Spacer(1, 20))
        # Test case list - split into required / optional by strictness level
        elements.append(to_para("测试用例列表", heading_style, escape=False))
        # Numeric mapping of severity levels (higher = more severe).
        severity_levels = {
            'CRITICAL': 5,
            'HIGH': 4,
            'MEDIUM': 3,
            'LOW': 2,
            'INFO': 1
        }
        strictness_value = severity_levels.get(strictness_level, 5)  # defaults to CRITICAL
        # Collect every test case: endpoint cases plus stage cases.
        all_test_cases = []
        failed_test_cases = []  # failed cases collected separately
        # 1. Collect endpoint test cases
        for endpoint_result in endpoint_results:
            test_cases = endpoint_result.get('executed_test_cases', [])
            for tc in test_cases:
                tc_severity = tc.get('test_case_severity', 'MEDIUM')
                tc_severity_value = severity_levels.get(tc_severity, 3)
                tc_status = tc.get('status', 'N/A')
                tc_message = tc.get('message', '')
                test_case_info = {
                    'type': 'Endpoint',
                    'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
                    'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'),
                    'case_name': tc.get('test_case_name', 'N/A'),
                    'case_id': tc.get('test_case_id', 'N/A'),
                    'status': tc_status,
                    'message': tc_message,
                    'severity': tc_severity,
                    'severity_value': tc_severity_value,
                    'is_required': tc_severity_value >= strictness_value,
                    'duration': tc.get('duration_seconds', 0),
                    'timestamp': tc.get('timestamp', '')
                }
                all_test_cases.append(test_case_info)
                # Status strings may be Chinese or English depending on the producer.
                if tc_status in ['失败', 'FAILED', '错误', 'ERROR']:
                    failed_test_cases.append(test_case_info)
        # 2. Collect stage test cases
        stage_results = summary_data.get('stage_results', [])
        for stage_result in stage_results:
            stage_name = stage_result.get('stage_name', 'N/A')
            stage_status = stage_result.get('overall_status', 'N/A')
            stage_message = stage_result.get('message', stage_result.get('error_message', ''))
            stage_severity = 'HIGH'  # stage cases are treated as high priority
            stage_severity_value = severity_levels.get(stage_severity, 4)
            # Represent the whole stage as one test case entry.
            stage_case_info = {
                'type': 'Stage',
                'endpoint': f"Stage: {stage_name}",
                'endpoint_id': f"STAGE_{stage_name}",
                'case_name': stage_result.get('description', stage_name),
                'case_id': f"STAGE_{stage_name}",
                'status': stage_status,
                'message': stage_message,
                'severity': stage_severity,
                'severity_value': stage_severity_value,
                'is_required': stage_severity_value >= strictness_value,
                'duration': stage_result.get('duration_seconds', 0),
                'timestamp': stage_result.get('start_time', '')
            }
            all_test_cases.append(stage_case_info)
            # Failed stage cases
            if stage_status in ['失败', 'FAILED', '错误', 'ERROR']:
                failed_test_cases.append(stage_case_info)
        # Split into required vs optional test cases.
        required_cases = [case for case in all_test_cases if case['is_required']]
        optional_cases = [case for case in all_test_cases if not case['is_required']]
        # Build the two separated test-case tables.
        if all_test_cases:
            # Explain the strictness split.
            strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
            elements.append(to_para(strictness_text, small_style))
            elements.append(Spacer(1, 10))
            # 1. Required test cases table
            if required_cases:
                elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
                required_table_data = [
                    [to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
                     to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
                     to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
                ]
                for i, case in enumerate(required_cases, 1):
                    status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
                    required_table_data.append([
                        to_para(str(i), small_style),
                        to_para(case['type'], small_style),
                        to_para(case['case_name'], small_style),
                        to_para(case['endpoint'], small_style),
                        to_para(case['severity'], small_style),
                        to_para(status_display, small_style)
                    ])
                required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45])
                required_table.setStyle(TableStyle([
                    ('GRID', (0,0), (-1,-1), 1, colors.grey),
                    ('BACKGROUND', (0,0), (-1,0), colors.lightblue),  # light blue highlights required cases
                    ('ALIGN', (0,0), (-1,-1), 'CENTER'),
                    ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
                    ('FONTSIZE', (0,0), (-1,-1), 8)
                ]))
                elements.append(required_table)
                elements.append(Spacer(1, 15))
            # 2. Optional test cases table
            if optional_cases:
                elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
                optional_table_data = [
                    [to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
                     to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
                     to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
                ]
                for i, case in enumerate(optional_cases, 1):
                    status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
                    optional_table_data.append([
                        to_para(str(i), small_style),
                        to_para(case['type'], small_style),
                        to_para(case['case_name'], small_style),
                        to_para(case['endpoint'], small_style),
                        to_para(case['severity'], small_style),
                        to_para(status_display, small_style)
                    ])
                optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45])
                optional_table.setStyle(TableStyle([
                    ('GRID', (0,0), (-1,-1), 1, colors.grey),
                    ('BACKGROUND', (0,0), (-1,0), colors.lightgrey),  # light grey marks optional cases
                    ('ALIGN', (0,0), (-1,-1), 'CENTER'),
                    ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
                    ('FONTSIZE', (0,0), (-1,-1), 8)
                ]))
                elements.append(optional_table)
                elements.append(Spacer(1, 10))
            # Append test-case statistics.
            total_cases = len(all_test_cases)
            endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint'])
            stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage'])
            required_count = len(required_cases)
            optional_count = len(optional_cases)
            stats_text = f"""测试用例统计:
总计 {total_cases} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。
必须用例 {required_count} 个,非必须用例 {optional_count} 个。
严格等级:{strictness_level}{severity_levels.get(strictness_level, 5)}级及以上为必须)。"""
            elements.append(to_para(stats_text, small_style))
        else:
            elements.append(to_para("无测试用例执行记录。", normal_style))
        elements.append(Spacer(1, 20))
        # Failed-case details section
        if failed_test_cases:
            elements.append(to_para("失败用例详情分析", heading_style, escape=False))
            elements.append(Spacer(1, 10))
            # Group failures by severity.
            critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL']
            high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH']
            medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM']
            low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW']
            failure_summary = f"""失败用例统计:
总计 {len(failed_test_cases)} 个失败用例,其中:
• 严重级别:{len(critical_failures)}
• 高级别:{len(high_failures)}
• 中级别:{len(medium_failures)}
• 低级别:{len(low_failures)}
以下是详细的失败原因分析:"""
            elements.append(to_para(failure_summary, normal_style))
            elements.append(Spacer(1, 15))
            # Detailed list of failed cases
            for i, failed_case in enumerate(failed_test_cases, 1):
                # Case title
                case_title = f"{i}. {failed_case['case_name']}"
                elements.append(to_para(case_title, ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5)))
                # Basic case information
                case_info = f"""• 用例ID{failed_case['case_id']}
• 所属端点:{failed_case['endpoint']}
• 严重级别:{failed_case['severity']}
• 执行状态:{failed_case['status']}"""
                elements.append(to_para(case_info, ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5)))
                # Failure reason
                failure_reason = failed_case.get('message', '无详细错误信息')
                if failure_reason:
                    elements.append(to_para("失败原因:", ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15)))
                    # Truncate very long messages so they render cleanly in the PDF.
                    if len(failure_reason) > 200:
                        failure_reason = failure_reason[:200] + "..."
                    elements.append(to_para(failure_reason, ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red)))
                # Divider between consecutive failed cases.
                if i < len(failed_test_cases):
                    elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey))
                    elements.append(Spacer(1, 10))
            elements.append(Spacer(1, 20))
        elements.append(Spacer(1, 20))
        # Test situation description
        elements.append(to_para("测试情况说明", heading_style, escape=False))
        test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。
测试:累计发现缺陷{overall.get('test_cases_failed', 0)}个。
测试执行时间:{start_time_formatted}{end_time_formatted}
测试环境:开发测试环境
测试方法自动化API合规性测试"""
        elements.append(to_para(test_situation_text, normal_style))
        elements.append(Spacer(1, 20))
        # Test conclusion
        elements.append(to_para("测试结论", heading_style, escape=False))
        # Derive the conclusion from the success rate.
        success_rate = overall.get('test_case_success_rate', '0%')
        success_rate_num = float(success_rate.replace('%', '')) if success_rate != 'N/A' else 0
        if success_rate_num >= 90:
            conclusion_status = "通过"
            conclusion_text = f"""本套领域数据服务已通过环境验证系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析该项目通过验收测试。
测试用例成功率达到{success_rate},符合验收标准。"""
        elif success_rate_num >= 70:
            conclusion_status = "基本通过"
            conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。"""
        else:
            conclusion_status = "不通过"
            conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。"""
        elements.append(to_para(conclusion_text, normal_style))
        elements.append(Spacer(1, 20))
        # Detection basis
        elements.append(to_para("检测依据", heading_style, escape=False))
        detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分关于DMS领域数据服务的接口要求和测试细则。
参考标准:
1. DMS数据管理系统API规范V1.0
2. RESTful API设计规范
3. 数据安全和隐私保护要求
4. 系统集成测试标准"""
        elements.append(to_para(detection_basis_text, normal_style))
        elements.append(Spacer(1, 20))
        # Report generation info
        elements.append(to_para("报告生成信息", heading_style, escape=False))
        generation_info_data = [
            [to_para("<b>生成时间</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d%H:%M:%S'))],
            [to_para("<b>生成工具</b>", escape=False), to_para("DMS合规性测试工具")],
            [to_para("<b>工具版本</b>", escape=False), to_para("V1.0.0")],
            [to_para("<b>测试结论</b>", escape=False), to_para(f"<b>{conclusion_status}</b>", escape=False)],
        ]
        generation_info_table = Table(generation_info_data, colWidths=[120, '*'])
        generation_info_table.setStyle(TableStyle([
            ('GRID', (0,0), (-1,-1), 1, colors.grey),
            ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
            ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
        ]))
        elements.append(generation_info_table)
        # Build the PDF document.
        doc.build(elements)
        logger.info(f"PDF报告已成功生成: {output_path}")
    except Exception as e:
        logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
def main():
    """Script entry point: parse CLI arguments, run the API test suite, and emit reports.

    Flow:
      1. Validate argument combinations (exactly one of --yapi / --swagger / --dms).
      2. Handle list-only modes (--list-categories / --list-tags) and exit early.
      3. Create a unique timestamped output directory under the base output path.
      4. Build an APITestOrchestrator and run tests from the chosen spec source.
      5. Optionally run staged tests, then write the summary report, optional PDF
         report, and the API call-detail files.

    Exits the process with status 0 on success and 1 on any fatal error
    (invalid arguments, spec parse failure, output-dir creation failure,
    unexpected exception during test execution, or a missing test summary).
    """
    args = parse_args()
    # Verbose mode: raise both the suite logger and this module's logger to DEBUG.
    if args.verbose:
        logging.getLogger('ddms_compliance_suite').setLevel(logging.DEBUG)
        logger.setLevel(logging.DEBUG)
        logger.debug("已启用详细日志模式")
    # Exactly one API definition source is required; the three flags are mutually exclusive.
    if not args.yapi and not args.swagger and not args.dms:
        logger.error("请提供API定义源--yapi, --swagger, 或 --dms")
        sys.exit(1)
    if (args.yapi and args.swagger) or (args.yapi and args.dms) or (args.swagger and args.dms):
        logger.error("API定义源 --yapi, --swagger, 和 --dms 是互斥的,请只选择一个。")
        sys.exit(1)
    # List-only modes: print available categories/tags and exit without running tests.
    if args.list_categories and args.yapi:
        list_yapi_categories(args.yapi)
        sys.exit(0)
    if args.list_tags and args.swagger:
        list_swagger_tags(args.swagger)
        sys.exit(0)
    # Comma-separated filter lists; None means "no filtering".
    categories: Optional[List[str]] = args.categories.split(',') if args.categories else None
    tags: Optional[List[str]] = args.tags.split(',') if args.tags else None
    # Determine the base output directory.
    base_output_dir = Path("./test_reports")
    if args.output:
        # When --output is provided it is used as the base directory for all
        # timestamped reports. This simplifies the logic: --output is always a directory.
        base_output_dir = Path(args.output)
    # Create a unique, timestamped subdirectory for this test run.
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_directory = base_output_dir / timestamp
    # The main summary report file now lives inside the new directory.
    main_report_file_path = output_directory / f"summary.{args.format}"
    try:
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"测试报告将保存到: {output_directory.resolve()}")
    except OSError as e:
        logger.error(f"创建输出目录失败 {output_directory}: {e}")
        sys.exit(1)
    orchestrator = APITestOrchestrator(
        base_url=args.base_url,
        custom_test_cases_dir=args.custom_test_cases_dir,
        llm_api_key=args.llm_api_key,
        llm_base_url=args.llm_base_url,
        llm_model_name=args.llm_model_name,
        use_llm_for_request_body=args.use_llm_for_request_body,
        use_llm_for_path_params=args.use_llm_for_path_params,
        use_llm_for_query_params=args.use_llm_for_query_params,
        use_llm_for_headers=args.use_llm_for_headers,
        output_dir=str(output_directory),
        stages_dir=args.stages_dir, # forward stages_dir to the orchestrator
        strictness_level=args.strictness_level,
        ignore_ssl=args.ignore_ssl,
        enable_well_data=True # well-data support is enabled by default
    )
    test_summary: Optional[TestSummary] = None
    parsed_spec_for_scenarios: Optional[ParsedAPISpec] = None # holds the parsed spec for later scenario/stage use
    try:
        if args.yapi:
            logger.info(f"从YAPI文件运行测试: {args.yapi}")
            # orchestrator.run_tests_from_yapi now returns a tuple
            test_summary, parsed_spec_for_scenarios = orchestrator.run_tests_from_yapi(
                yapi_file_path=args.yapi,
                categories=categories,
                custom_test_cases_dir=args.custom_test_cases_dir
            )
            if not parsed_spec_for_scenarios: # check whether parsing succeeded
                logger.error(f"YAPI文件 '{args.yapi}' 解析失败 (由编排器报告)。程序将退出。")
                sys.exit(1)
        elif args.swagger:
            logger.info(f"从Swagger文件运行测试: {args.swagger}")
            # orchestrator.run_tests_from_swagger now returns a tuple
            test_summary, parsed_spec_for_scenarios = orchestrator.run_tests_from_swagger(
                swagger_file_path=args.swagger,
                tags=tags,
                custom_test_cases_dir=args.custom_test_cases_dir
            )
            if not parsed_spec_for_scenarios: # check whether parsing succeeded
                logger.error(f"Swagger文件 '{args.swagger}' 解析失败 (由编排器报告)。程序将退出。")
                sys.exit(1)
        elif args.dms:
            logger.info(f"从DMS服务动态发现运行测试: {args.dms}")
            test_summary, parsed_spec_for_scenarios, pagination_info = orchestrator.run_tests_from_dms(
                domain_mapping_path=args.dms,
                categories=categories,
                custom_test_cases_dir=args.custom_test_cases_dir,
                ignore_ssl=args.ignore_ssl,
                page_size=args.page_size,
                page_no_start=args.page_no,
                fetch_all_pages=not args.fetch_single_page
            )
            if not parsed_spec_for_scenarios: # check whether parsing succeeded
                logger.error(f"从DMS服务 '{args.dms}' 解析失败 (由编排器报告)。程序将退出。")
                sys.exit(1)
            # Report pagination details of the DMS discovery, if available.
            if pagination_info:
                logger.info(f"DMS分页信息: 总记录数={pagination_info.get('total_records', 0)}, "
                           f"页面大小={pagination_info.get('page_size', 0)}, "
                           f"起始页码={pagination_info.get('page_no_start', 1)}, "
                           f"当前页码={pagination_info.get('current_page', 1)}, "
                           f"获取页数={pagination_info.get('pages_fetched', 0)}/{pagination_info.get('total_pages', 0)}")
            else:
                logger.warning("未获取到分页信息")
    except Exception as e:
        logger.error(f"执行测试用例时发生意外错误: {e}", exc_info=True)
        sys.exit(1)
    if test_summary:
        # After individual test-case results are saved, run API test stages (if a stages directory was given).
        if args.stages_dir and parsed_spec_for_scenarios:
            logger.info(f"开始执行API测试阶段 (Stages),目录: {args.stages_dir}")
            # NOTE: this assumes test_orchestrator.py already provides run_stages_from_spec
            # and that APITestOrchestrator.__init__ also accepts stages_dir.
            orchestrator.run_stages_from_spec( # invoke run_stages_from_spec
                # stages_dir is managed by orchestrator's __init__
                parsed_spec=parsed_spec_for_scenarios, # reuse the previously parsed spec
                summary=test_summary # append stage results to the same summary object
            )
            logger.info("API测试阶段 (Stages) 执行完毕。")
            # The summary changed after stage execution: re-finalize and reprint it.
            test_summary.finalize_summary() # recompute total duration etc.
            test_summary.print_summary_to_console() # print the updated summary including stage results
        # Save the main test summary in the user-requested format.
        main_report_file_path = output_directory / f"summary.{args.format}"
        save_results(test_summary, str(main_report_file_path), args.format)
        # Generate the PDF report if requested.
        if args.generate_pdf:
            json_summary_path_for_pdf = output_directory / "summary.json"
            # If the user requested a non-JSON format, write an extra JSON file
            # to serve as the PDF's data source.
            if args.format != 'json':
                with open(json_summary_path_for_pdf, 'w', encoding='utf-8') as f:
                    f.write(test_summary.to_json(pretty=True))
                logger.info(f"为生成PDF而创建临时JSON摘要: {json_summary_path_for_pdf}")
            pdf_report_path = output_directory / "report_cn.pdf"
            save_pdf_report(test_summary.to_dict(), pdf_report_path, args.strictness_level)
        # The API call-detail report is saved into the same timestamped directory.
        api_calls_filename = "api_call_details.md"
        if orchestrator:
            save_api_call_details_to_file(
                orchestrator.get_api_call_details(),
                str(output_directory),
                filename=api_calls_filename
            )
        # Improved HTML report summary access
        failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
        error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
        if failed_count > 0 or error_count > 0:
            logger.info("部分测试失败或出错,请检查报告。")
            # sys.exit(1) # Keep this commented if a report is always desired regardless of outcome
        else:
            logger.info("所有测试完成。")
    else:
        logger.error("未能生成测试摘要。")
        sys.exit(1)
    sys.exit(0)
# Standard entry-point guard: run main() only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()
# python run_api_tests.py --base-url http://127.0.0.1:4523/m1/6389742-6086420-default --swagger assets/doc/井筒API示例swagger.json --custom-test-cases-dir ./custom_testcases \
# --verbose \
# --output test_report.json
# python run_api_tests.py --base-url https://127.0.0.1:4523/m1/6389742-6086420-default --yapi assets/doc/井筒API示例_simple.json --custom-test-cases-dir ./custom_testcases \
# --verbose \
# --output test_report.json
# Example: run both test cases and scenario stages in a single invocation
# python run_api_tests.py --base-url http://127.0.0.1:8000 --swagger ./assets/doc/petstore_swagger.json --custom-test-cases-dir ./custom_testcases --stages-dir ./custom_stages -v -o reports/