1048 lines
51 KiB
Python
1048 lines
51 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
API测试工具
|
||
|
||
此工具使用DDMS测试编排器从YAPI或Swagger定义执行API测试。
|
||
支持使用规则库进行高级验证。
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import logging
|
||
import argparse
|
||
import datetime
|
||
import subprocess
|
||
from pathlib import Path
|
||
from typing import List, Optional
|
||
import string # 导入string模块用于字符过滤
|
||
import unicodedata # 导入unicodedata用于字符净化
|
||
import html
|
||
|
||
# PDF生成库 - 使用ReportLab并打包字体
|
||
try:
|
||
from reportlab.lib import colors
|
||
from reportlab.lib.pagesizes import A4
|
||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
|
||
from reportlab.pdfbase import pdfmetrics
|
||
from reportlab.pdfbase.ttfonts import TTFont, TTFError
|
||
|
||
|
||
reportlab_available = True
|
||
except ImportError:
|
||
reportlab_available = False
|
||
|
||
from ddms_compliance_suite.api_caller.caller import APICallDetail
|
||
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary
|
||
from ddms_compliance_suite.input_parser.parser import ParsedAPISpec
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def parse_args():
|
||
"""解析命令行参数"""
|
||
parser = argparse.ArgumentParser(description='DDMS API测试工具')
|
||
|
||
# 基本参数
|
||
parser.add_argument('--base-url', required=True, help='API基础URL')
|
||
parser.add_argument('--verbose', '-v', action='store_true', help='启用详细日志')
|
||
parser.add_argument('--output', '-o', help='输出目录或主报告文件路径 (例如 ./test_reports/ 或 ./test_reports/summary.json)')
|
||
parser.add_argument('--format', choices=['json', 'html'], default='json', help='主测试摘要报告的输出格式')
|
||
parser.add_argument('--api-calls-output', help='API 调用详情的 Markdown 输出文件路径 (例如 ./test_reports/api_calls.md)。如果未提供,将尝试使用 --output 目录和默认文件名 api_call_details.md。始终会额外生成一个同名的 .txt 文件包含纯 cURL 命令。')
|
||
parser.add_argument('--generate-pdf', action='store_true', help='是否生成中文PDF报告', default=True)
|
||
|
||
# API定义参数
|
||
api_group = parser.add_argument_group('API定义源')
|
||
api_group.add_argument('--yapi', help='YAPI定义文件路径')
|
||
api_group.add_argument('--swagger', help='Swagger定义文件路径')
|
||
api_group.add_argument('--dms', help='DMS服务发现的domain mapping文件路径')
|
||
api_group.add_argument('--page-size', type=int, default=1000,
|
||
help='DMS API分页大小,默认1000。较小的值可以减少内存使用,但会增加请求次数')
|
||
api_group.add_argument('--page-no', type=int, default=1,
|
||
help='DMS API起始页码,从1开始。可用于断点续传或跳过前面的页面')
|
||
api_group.add_argument('--fetch-single-page', action='store_true',
|
||
help='只获取指定页面的数据,而不是获取所有页面。用于快速测试或内存受限环境')
|
||
|
||
# 过滤参数
|
||
filter_group = parser.add_argument_group('过滤选项')
|
||
filter_group.add_argument('--categories', help='YAPI分类,逗号分隔')
|
||
filter_group.add_argument('--tags', help='Swagger标签,逗号分隔')
|
||
filter_group.add_argument('--list-categories', action='store_true', help='列出YAPI分类')
|
||
filter_group.add_argument('--list-tags', action='store_true', help='列出Swagger标签')
|
||
filter_group.add_argument('--strictness-level',
|
||
choices=['CRITICAL', 'HIGH', 'MEDIUM', 'LOW'],
|
||
default='CRITICAL',
|
||
help='设置测试的严格等级。只有严重性等于或高于此级别的失败用例才会导致API端点被标记为失败。')
|
||
filter_group.add_argument('--ignore-ssl', action='store_true',
|
||
help='忽略SSL证书验证(不推荐在生产环境使用)')
|
||
|
||
# 新增:自定义测试用例参数组
|
||
custom_tc_group = parser.add_argument_group('自定义测试用例选项')
|
||
custom_tc_group.add_argument('--custom-test-cases-dir',
|
||
default=None, # 或者 './custom_testcases' 如果想设为默认
|
||
help='存放自定义APITestCase Python文件的目录路径。如果未提供,则不加载自定义测试。')
|
||
|
||
# 新增:LLM 配置选项
|
||
llm_group = parser.add_argument_group('LLM 配置选项 (可选)')
|
||
llm_group.add_argument('--llm-api-key',
|
||
# default=os.environ.get("OPENAI_API_KEY"), # 尝试从环境变量获取
|
||
# default='sk-0213c70194624703a1d0d80e0f762b0e',
|
||
default='sk-lbGrsUPL1iby86h554FaE536C343435dAa9bA65967A840B2',
|
||
help='LLM服务的API密钥 (例如 OpenAI API Key)。默认从环境变量 OPENAI_API_KEY 读取。')
|
||
llm_group.add_argument('--llm-base-url',
|
||
# default="https://dashscope.aliyuncs.com/compatible-mode/v1",
|
||
default="https://aiproxy.petrotech.cnpc/v1",
|
||
help='LLM服务的自定义基础URL (例如 OpenAI API代理)。')
|
||
llm_group.add_argument('--llm-model-name',
|
||
# default="qwen-plus", # 设置一个常用的默认模型
|
||
default="deepseek-v3", # 设置一个常用的默认模型
|
||
help='要使用的LLM模型名称 (例如 "gpt-3.5-turbo", "gpt-4")。')
|
||
llm_group.add_argument('--use-llm-for-request-body',
|
||
action='store_true',
|
||
default=False, # 默认不使用LLM生成请求体
|
||
help='是否启用LLM为API请求生成请求体数据。')
|
||
llm_group.add_argument('--use-llm-for-path-params',
|
||
action='store_true',
|
||
default=False,
|
||
help='是否启用LLM为API请求生成路径参数。')
|
||
llm_group.add_argument('--use-llm-for-query-params',
|
||
action='store_true',
|
||
default=False,
|
||
help='是否启用LLM为API请求生成查询参数。')
|
||
llm_group.add_argument('--use-llm-for-headers',
|
||
action='store_true',
|
||
default=False,
|
||
help='是否启用LLM为API请求生成头部参数。')
|
||
|
||
# 新增:场景测试参数组
|
||
scenario_group = parser.add_argument_group('API测试阶段 (Stage) 选项 (可选)')
|
||
scenario_group.add_argument('--stages-dir',
|
||
default=None,
|
||
help='存放自定义APIStage Python文件的目录路径。如果未提供,则不执行测试阶段。')
|
||
|
||
return parser.parse_args()
|
||
|
||
def list_yapi_categories(yapi_file:str):
|
||
"""列出YAPI分类"""
|
||
from ddms_compliance_suite.input_parser.parser import InputParser
|
||
|
||
logger.info(f"从YAPI文件解析分类: {yapi_file}")
|
||
parser = InputParser()
|
||
parsed_yapi = parser.parse_yapi_spec(yapi_file)
|
||
|
||
if not parsed_yapi:
|
||
logger.error(f"解析YAPI文件失败: {yapi_file}")
|
||
return
|
||
|
||
print("\nYAPI分类:")
|
||
for i, category in enumerate(parsed_yapi.categories, 1):
|
||
print(f"{i}. {category.get('name', '未命名')} - {category.get('desc', '无描述')}")
|
||
|
||
def list_swagger_tags(swagger_file: str):
|
||
"""列出Swagger标签"""
|
||
from ddms_compliance_suite.input_parser.parser import InputParser
|
||
|
||
logger.info(f"从Swagger文件解析标签: {swagger_file}")
|
||
parser = InputParser()
|
||
parsed_swagger = parser.parse_swagger_spec(swagger_file)
|
||
|
||
if not parsed_swagger:
|
||
logger.error(f"解析Swagger文件失败: {swagger_file}")
|
||
return
|
||
|
||
print("\nSwagger标签:")
|
||
for i, tag in enumerate(parsed_swagger.tags, 1):
|
||
print(f"{i}. {tag.get('name', '未命名')} - {tag.get('description', '无描述')}")
|
||
|
||
def save_results(summary: TestSummary, output_file_path: str, format_type: str):
|
||
"""保存主测试摘要结果"""
|
||
output_path = Path(output_file_path)
|
||
# Ensure the directory for the output file exists
|
||
try:
|
||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||
except OSError as e:
|
||
logger.error(f"Error creating directory for output file {output_path.parent}: {e}")
|
||
return
|
||
|
||
if format_type == 'json':
|
||
with open(output_path, 'w', encoding='utf-8') as f:
|
||
f.write(summary.to_json(pretty=True))
|
||
logger.info(f"测试结果已保存为JSON: {output_path}")
|
||
elif format_type == 'html':
|
||
# Creating simple HTML report
|
||
html_content = f"""
|
||
<!DOCTYPE html>
|
||
<html>
|
||
<head>
|
||
<meta charset="utf-8">
|
||
<title>API测试报告</title>
|
||
<style>
|
||
body {{ font-family: Arial, sans-serif; margin: 20px; }}
|
||
.summary {{ background-color: #f5f5f5; padding: 15px; border-radius: 5px; }}
|
||
.pass {{ color: green; }}
|
||
.fail {{ color: red; }}
|
||
.error {{ color: orange; }}
|
||
.skip {{ color: gray; }}
|
||
table {{ border-collapse: collapse; width: 100%; }}
|
||
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
|
||
th {{ background-color: #f2f2f2; }}
|
||
tr:nth-child(even) {{ background-color: #f9f9f9; }}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<h1>API测试报告</h1>
|
||
<div class="summary">
|
||
<h2>测试结果摘要</h2>
|
||
<p>总测试数: {summary.total_test_cases_executed}</p>
|
||
<p class="pass">通过: {summary.test_cases_passed}</p>
|
||
<p class="fail">失败: {summary.test_cases_failed}</p>
|
||
<p class="error">错误: {summary.test_cases_error}</p>
|
||
<p class="skip">跳过: {summary.test_cases_skipped_in_endpoint}</p>
|
||
<p>成功率: {summary.test_case_success_rate:.2f}%</p>
|
||
<p>总耗时: {summary.duration:.2f}秒</p>
|
||
<p>开始时间: {summary.start_time.isoformat()}</p>
|
||
<p>结束时间: {summary.end_time.isoformat() if summary.end_time else 'N/A'}</p>
|
||
</div>
|
||
|
||
<h2>详细测试结果</h2>
|
||
<table>
|
||
<tr>
|
||
<th>端点</th>
|
||
<th>测试用例ID</th>
|
||
<th>测试用例名称</th>
|
||
<th>状态</th>
|
||
<th>消息</th>
|
||
<th>耗时(秒)</th>
|
||
</tr>
|
||
"""
|
||
|
||
for endpoint_result in summary.detailed_results:
|
||
for tc_result in endpoint_result.executed_test_cases:
|
||
status_class = "pass" if tc_result.status == ExecutedTestCaseResult.Status.PASSED else \
|
||
"fail" if tc_result.status == ExecutedTestCaseResult.Status.FAILED else \
|
||
"error" if tc_result.status == ExecutedTestCaseResult.Status.ERROR else "skip"
|
||
|
||
html_content += f"""
|
||
<tr>
|
||
<td>{endpoint_result.endpoint_name} ({endpoint_result.endpoint_id})</td>
|
||
<td>{tc_result.test_case_id}</td>
|
||
<td>{tc_result.test_case_name}</td>
|
||
<td class="{status_class}">{tc_result.status.value}</td>
|
||
<td>{tc_result.message}</td>
|
||
<td>{tc_result.duration:.4f}</td>
|
||
</tr>
|
||
"""
|
||
|
||
html_content += """
|
||
</table>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
with open(output_path, 'w', encoding='utf-8') as f:
|
||
f.write(html_content)
|
||
logger.info(f"测试结果已保存为HTML: {output_path}")
|
||
|
||
def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
|
||
"""
|
||
将API调用详情列表保存到指定目录下的 Markdown 文件中。
|
||
同时,额外生成一个纯文本文件 (.txt),每行包含一个 cURL 命令。
|
||
"""
|
||
if not api_call_details:
|
||
logger.info("没有API调用详情可供保存。")
|
||
return
|
||
|
||
output_dir = Path(output_dir_path)
|
||
try:
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
except OSError as e:
|
||
logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}")
|
||
return
|
||
|
||
# 主文件是 Markdown 文件
|
||
md_output_file = output_dir / filename
|
||
# 确保它是 .md,尽管 main 函数应该已经处理了
|
||
if md_output_file.suffix.lower() not in ['.md', '.markdown']:
|
||
md_output_file = md_output_file.with_suffix('.md')
|
||
|
||
markdown_content = []
|
||
|
||
for detail in api_call_details:
|
||
|
||
# Request URL with params (if any)
|
||
url_to_display = detail.request_url
|
||
if detail.request_params:
|
||
try:
|
||
# Ensure urllib is available for this formatting step
|
||
import urllib.parse
|
||
query_string = urllib.parse.urlencode(detail.request_params)
|
||
url_to_display = f"{detail.request_url}?{query_string}"
|
||
except Exception as e:
|
||
logger.warning(f"Error formatting URL with params for display: {e}")
|
||
# Fallback to just the base URL if params formatting fails
|
||
|
||
markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
|
||
markdown_content.append("**cURL Command:**")
|
||
markdown_content.append("```sh")
|
||
markdown_content.append(detail.curl_command)
|
||
markdown_content.append("```")
|
||
markdown_content.append("### Request Details")
|
||
markdown_content.append(f"- **Method:** `{detail.request_method}`")
|
||
markdown_content.append(f"- **Full URL:** `{url_to_display}`")
|
||
|
||
markdown_content.append("- **Headers:**")
|
||
markdown_content.append("```json")
|
||
markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
|
||
markdown_content.append("```")
|
||
|
||
if detail.request_params:
|
||
markdown_content.append("- **Query Parameters:**")
|
||
markdown_content.append("```json")
|
||
markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
|
||
markdown_content.append("```")
|
||
|
||
if detail.request_body is not None:
|
||
markdown_content.append("- **Body:**")
|
||
body_lang = "text"
|
||
formatted_body = str(detail.request_body)
|
||
try:
|
||
# Try to parse as JSON for pretty printing
|
||
if isinstance(detail.request_body, str):
|
||
try:
|
||
parsed_json = json.loads(detail.request_body)
|
||
formatted_body = json.dumps(parsed_json, indent=2, ensure_ascii=False)
|
||
body_lang = "json"
|
||
except json.JSONDecodeError:
|
||
pass # Keep as text
|
||
elif isinstance(detail.request_body, (dict, list)):
|
||
formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False)
|
||
body_lang = "json"
|
||
except Exception as e:
|
||
logger.warning(f"Error formatting request body for Markdown: {e}")
|
||
|
||
markdown_content.append(f"```{body_lang}")
|
||
markdown_content.append(formatted_body)
|
||
markdown_content.append("```")
|
||
|
||
markdown_content.append("### Response Details")
|
||
markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
|
||
markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
|
||
|
||
markdown_content.append("- **Headers:**")
|
||
markdown_content.append("```json")
|
||
markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
|
||
markdown_content.append("```")
|
||
|
||
if detail.response_body is not None:
|
||
markdown_content.append("- **Body:**")
|
||
resp_body_lang = "text"
|
||
formatted_resp_body = str(detail.response_body)
|
||
try:
|
||
# Try to parse as JSON for pretty printing
|
||
if isinstance(detail.response_body, str):
|
||
try:
|
||
# If it's already a string that might be JSON, try parsing and re-dumping
|
||
parsed_json_resp = json.loads(detail.response_body)
|
||
formatted_resp_body = json.dumps(parsed_json_resp, indent=2, ensure_ascii=False)
|
||
resp_body_lang = "json"
|
||
except json.JSONDecodeError:
|
||
# It's a string, but not valid JSON, keep as text
|
||
pass
|
||
elif isinstance(detail.response_body, (dict, list)):
|
||
# It's already a dict/list, dump it as JSON
|
||
formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False)
|
||
resp_body_lang = "json"
|
||
# If it's neither string nor dict/list (e.g. int, bool from parsed json), str() is fine.
|
||
except Exception as e:
|
||
logger.warning(f"Error formatting response body for Markdown: {e}")
|
||
|
||
markdown_content.append(f"```{resp_body_lang}")
|
||
markdown_content.append(formatted_resp_body)
|
||
markdown_content.append("```")
|
||
markdown_content.append("") # Add a blank line for spacing before next --- or EOF
|
||
markdown_content.append("---") # Separator
|
||
|
||
try:
|
||
with open(md_output_file, 'w', encoding='utf-8') as f_md:
|
||
f_md.write("\n".join(markdown_content))
|
||
logger.info(f"API调用详情已保存为 Markdown: {md_output_file}")
|
||
except Exception as e:
|
||
logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True)
|
||
|
||
def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
|
||
"""将测试摘要保存为格式化的PDF文件"""
|
||
logger.info(f"开始生成PDF报告: {output_path}")
|
||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
try:
|
||
# --- 统一的字体管理和注册 ---
|
||
font_name = 'SimSun' # 使用一个简单清晰的注册名
|
||
font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
|
||
|
||
if not Path(font_path).exists():
|
||
logger.error(f"字体文件未找到: {Path(font_path).resolve()}")
|
||
return
|
||
|
||
# 关键修复: 对于 .ttc (TrueType Collection) 文件, 必须指定 subfontIndex
|
||
pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0))
|
||
# 将注册的字体关联到 'SimSun' 字体族
|
||
pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
|
||
|
||
doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
|
||
elements = []
|
||
|
||
# --- 统一样式定义, 全部使用注册的字体名 ---
|
||
styles = getSampleStyleSheet()
|
||
title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
|
||
heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
|
||
normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
|
||
small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)
|
||
|
||
def to_para(text, style=normal_style, escape=True):
|
||
"""
|
||
根据用户建议移除 textwrap 以进行诊断。
|
||
此版本只包含净化和基本的换行符替换。
|
||
"""
|
||
if text is None:
|
||
content = ""
|
||
else:
|
||
content = str(text)
|
||
|
||
if escape:
|
||
content = html.escape(content)
|
||
|
||
# 依然保留Unicode控制字符的净化
|
||
content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
|
||
|
||
if not content.strip():
|
||
# 对于完全空白或None的输入,返回一个安全的非换行空格
|
||
return Paragraph(' ', style)
|
||
|
||
# 只使用基本的换行符替换
|
||
content = content.replace('\n', '<br/>')
|
||
|
||
return Paragraph(content, style)
|
||
|
||
# 3. 填充PDF内容 - 优化后的报告格式
|
||
|
||
# 生成报告编码(基于时间戳)
|
||
import time
|
||
report_code = f"DMS-TEST-{int(time.time())}"
|
||
|
||
# 报告标题
|
||
elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
|
||
elements.append(Spacer(1, 15))
|
||
|
||
# 报告基本信息表格
|
||
basic_info_data = [
|
||
[to_para("<b>报告编码</b>", escape=False), to_para(report_code)],
|
||
[to_para("<b>报告名称</b>", escape=False), to_para("DMS领域数据服务测试分析报告")],
|
||
[to_para("<b>申请日期</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日'))],
|
||
[to_para("<b>申请人</b>", escape=False), to_para("系统管理员")],
|
||
[to_para("<b>服务供应商名称</b>", escape=False), to_para("数据管理系统(DMS)")],
|
||
]
|
||
basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
|
||
basic_info_table.setStyle(TableStyle([
|
||
('GRID', (0,0), (-1,-1), 1, colors.grey),
|
||
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
|
||
('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
|
||
]))
|
||
elements.append(basic_info_table)
|
||
elements.append(Spacer(1, 20))
|
||
|
||
# 摘要部分
|
||
elements.append(to_para("摘要", heading_style, escape=False))
|
||
overall = summary_data.get('overall_summary', {})
|
||
|
||
# 从JSON提取并格式化时间
|
||
try:
|
||
start_time_str = summary_data.get('start_time', 'N/A')
|
||
end_time_str = summary_data.get('end_time', 'N/A')
|
||
duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0))
|
||
|
||
start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
|
||
end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
|
||
except:
|
||
start_time_formatted = start_time_str
|
||
end_time_formatted = end_time_str
|
||
|
||
# 摘要内容 - 安全计算跳过的数量
|
||
def safe_subtract(total, passed, failed):
|
||
"""安全地计算跳过的数量"""
|
||
try:
|
||
if isinstance(total, (int, float)) and isinstance(passed, (int, float)) and isinstance(failed, (int, float)):
|
||
return max(0, total - passed - failed)
|
||
else:
|
||
return 0
|
||
except:
|
||
return 0
|
||
|
||
endpoints_tested = overall.get('endpoints_tested', 0)
|
||
endpoints_passed = overall.get('endpoints_passed', 0)
|
||
endpoints_failed = overall.get('endpoints_failed', 0)
|
||
endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed)
|
||
|
||
test_cases_executed = overall.get('total_test_cases_executed', 0)
|
||
test_cases_passed = overall.get('test_cases_passed', 0)
|
||
test_cases_failed = overall.get('test_cases_failed', 0)
|
||
test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed)
|
||
|
||
stages_executed = overall.get('total_stages_executed', 0)
|
||
stages_passed = overall.get('stages_passed', 0)
|
||
stages_failed = overall.get('stages_failed', 0)
|
||
stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed)
|
||
|
||
summary_text = f"""本次测试针对DMS(数据管理系统)领域数据服务进行全面的合规性验证。
|
||
测试时间:{start_time_formatted} 至 {end_time_formatted},总耗时 {float(duration):.2f} 秒。
|
||
共测试 {endpoints_tested} 个API端点,其中 {endpoints_passed} 个通过,{endpoints_failed} 个失败,{endpoints_skipped} 个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}。
|
||
执行 {test_cases_executed} 个测试用例,其中 {test_cases_passed} 个通过,{test_cases_failed} 个失败,{test_cases_skipped} 个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}。
|
||
执行 {stages_executed} 个流程测试,其中 {stages_passed} 个通过,{stages_failed} 个失败,{stages_skipped} 个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}。"""
|
||
|
||
elements.append(to_para(summary_text, normal_style))
|
||
elements.append(Spacer(1, 20))
|
||
|
||
# 测试内容包括 - API列表表格
|
||
elements.append(to_para("测试内容包括", heading_style, escape=False))
|
||
|
||
# 从测试结果中提取API信息
|
||
endpoint_results = summary_data.get('endpoint_results', [])
|
||
api_list_data = [
|
||
[to_para("<b>序号</b>", escape=False), to_para("<b>服务名称</b>", escape=False),
|
||
to_para("<b>服务功能描述</b>", escape=False), to_para("<b>服务参数描述</b>", escape=False),
|
||
to_para("<b>服务返回值描述</b>", escape=False)]
|
||
]
|
||
|
||
for i, endpoint in enumerate(endpoint_results[:10], 1): # 限制显示前10个API
|
||
endpoint_name = endpoint.get('endpoint_name', 'N/A')
|
||
|
||
# 简化的功能描述
|
||
if 'Create' in endpoint_name:
|
||
func_desc = "提供数据创建服务"
|
||
elif 'List' in endpoint_name or 'Query' in endpoint_name:
|
||
func_desc = "提供数据查询和列表服务"
|
||
elif 'Read' in endpoint_name:
|
||
func_desc = "提供单条数据读取服务"
|
||
elif 'Update' in endpoint_name:
|
||
func_desc = "提供数据更新服务"
|
||
elif 'Delete' in endpoint_name:
|
||
func_desc = "提供数据删除服务"
|
||
else:
|
||
func_desc = "提供数据管理服务"
|
||
|
||
api_list_data.append([
|
||
to_para(str(i), small_style),
|
||
to_para(endpoint_name, small_style),
|
||
to_para(func_desc, small_style),
|
||
to_para("标准DMS参数格式", small_style),
|
||
to_para("标准DMS响应格式", small_style)
|
||
])
|
||
|
||
api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
|
||
api_list_table.setStyle(TableStyle([
|
||
('GRID', (0,0), (-1,-1), 1, colors.grey),
|
||
('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
|
||
('ALIGN', (0,0), (-1,-1), 'CENTER'),
|
||
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
|
||
('FONTSIZE', (0,0), (-1,-1), 8)
|
||
]))
|
||
elements.append(api_list_table)
|
||
elements.append(Spacer(1, 20))
|
||
|
||
# 测试用例列表 - 根据严格等级分为必须和非必须
|
||
elements.append(to_para("测试用例列表", heading_style, escape=False))
|
||
|
||
# 定义严重性等级的数值映射
|
||
severity_levels = {
|
||
'CRITICAL': 5,
|
||
'HIGH': 4,
|
||
'MEDIUM': 3,
|
||
'LOW': 2,
|
||
'INFO': 1
|
||
}
|
||
|
||
strictness_value = severity_levels.get(strictness_level, 5) # 默认为CRITICAL
|
||
|
||
# 收集所有测试用例(包括endpoint用例和stage用例)
|
||
all_test_cases = []
|
||
failed_test_cases = [] # 专门收集失败的测试用例
|
||
|
||
# 1. 收集endpoint测试用例
|
||
for endpoint_result in endpoint_results:
|
||
test_cases = endpoint_result.get('executed_test_cases', [])
|
||
for tc in test_cases:
|
||
tc_severity = tc.get('test_case_severity', 'MEDIUM')
|
||
tc_severity_value = severity_levels.get(tc_severity, 3)
|
||
tc_status = tc.get('status', 'N/A')
|
||
tc_message = tc.get('message', '')
|
||
|
||
test_case_info = {
|
||
'type': 'Endpoint',
|
||
'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
|
||
'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'),
|
||
'case_name': tc.get('test_case_name', 'N/A'),
|
||
'case_id': tc.get('test_case_id', 'N/A'),
|
||
'status': tc_status,
|
||
'message': tc_message,
|
||
'severity': tc_severity,
|
||
'severity_value': tc_severity_value,
|
||
'is_required': tc_severity_value >= strictness_value,
|
||
'duration': tc.get('duration_seconds', 0),
|
||
'timestamp': tc.get('timestamp', '')
|
||
}
|
||
|
||
all_test_cases.append(test_case_info)
|
||
|
||
# 收集失败的测试用例
|
||
if tc_status in ['失败', 'FAILED', '错误', 'ERROR']:
|
||
failed_test_cases.append(test_case_info)
|
||
|
||
# 2. 收集stage测试用例
|
||
stage_results = summary_data.get('stage_results', [])
|
||
for stage_result in stage_results:
|
||
stage_name = stage_result.get('stage_name', 'N/A')
|
||
stage_status = stage_result.get('overall_status', 'N/A')
|
||
stage_message = stage_result.get('message', stage_result.get('error_message', ''))
|
||
stage_severity = 'HIGH' # Stage用例通常是高优先级
|
||
stage_severity_value = severity_levels.get(stage_severity, 4)
|
||
|
||
# 将stage作为一个测试用例添加
|
||
stage_case_info = {
|
||
'type': 'Stage',
|
||
'endpoint': f"Stage: {stage_name}",
|
||
'endpoint_id': f"STAGE_{stage_name}",
|
||
'case_name': stage_result.get('description', stage_name),
|
||
'case_id': f"STAGE_{stage_name}",
|
||
'status': stage_status,
|
||
'message': stage_message,
|
||
'severity': stage_severity,
|
||
'severity_value': stage_severity_value,
|
||
'is_required': stage_severity_value >= strictness_value,
|
||
'duration': stage_result.get('duration_seconds', 0),
|
||
'timestamp': stage_result.get('start_time', '')
|
||
}
|
||
|
||
all_test_cases.append(stage_case_info)
|
||
|
||
# 收集失败的stage用例
|
||
if stage_status in ['失败', 'FAILED', '错误', 'ERROR']:
|
||
failed_test_cases.append(stage_case_info)
|
||
|
||
# 分离必须和非必须的测试用例
|
||
required_cases = [case for case in all_test_cases if case['is_required']]
|
||
optional_cases = [case for case in all_test_cases if not case['is_required']]
|
||
|
||
# 创建分离的测试用例表格
|
||
if all_test_cases:
|
||
# 添加严格等级说明
|
||
strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
|
||
elements.append(to_para(strictness_text, small_style))
|
||
elements.append(Spacer(1, 10))
|
||
|
||
# 1. 必须的测试用例表格
|
||
if required_cases:
|
||
elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
|
||
|
||
required_table_data = [
|
||
[to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
|
||
to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
|
||
to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
|
||
]
|
||
|
||
for i, case in enumerate(required_cases, 1):
|
||
status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
|
||
required_table_data.append([
|
||
to_para(str(i), small_style),
|
||
to_para(case['type'], small_style),
|
||
to_para(case['case_name'], small_style),
|
||
to_para(case['endpoint'], small_style),
|
||
to_para(case['severity'], small_style),
|
||
to_para(status_display, small_style)
|
||
])
|
||
|
||
required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45])
|
||
required_table.setStyle(TableStyle([
|
||
('GRID', (0,0), (-1,-1), 1, colors.grey),
|
||
('BACKGROUND', (0,0), (-1,0), colors.lightblue), # 使用浅蓝色突出必须用例
|
||
('ALIGN', (0,0), (-1,-1), 'CENTER'),
|
||
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
|
||
('FONTSIZE', (0,0), (-1,-1), 8)
|
||
]))
|
||
elements.append(required_table)
|
||
elements.append(Spacer(1, 15))
|
||
|
||
# 2. 非必须的测试用例表格
|
||
if optional_cases:
|
||
elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
|
||
|
||
optional_table_data = [
|
||
[to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
|
||
to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
|
||
to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
|
||
]
|
||
|
||
for i, case in enumerate(optional_cases, 1):
|
||
status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
|
||
optional_table_data.append([
|
||
to_para(str(i), small_style),
|
||
to_para(case['type'], small_style),
|
||
to_para(case['case_name'], small_style),
|
||
to_para(case['endpoint'], small_style),
|
||
to_para(case['severity'], small_style),
|
||
to_para(status_display, small_style)
|
||
])
|
||
|
||
optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45])
|
||
optional_table.setStyle(TableStyle([
|
||
('GRID', (0,0), (-1,-1), 1, colors.grey),
|
||
('BACKGROUND', (0,0), (-1,0), colors.lightgrey), # 使用浅灰色表示非必须用例
|
||
('ALIGN', (0,0), (-1,-1), 'CENTER'),
|
||
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
|
||
('FONTSIZE', (0,0), (-1,-1), 8)
|
||
]))
|
||
elements.append(optional_table)
|
||
elements.append(Spacer(1, 10))
|
||
|
||
# 添加用例统计信息
|
||
total_cases = len(all_test_cases)
|
||
endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint'])
|
||
stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage'])
|
||
required_count = len(required_cases)
|
||
optional_count = len(optional_cases)
|
||
|
||
stats_text = f"""测试用例统计:
|
||
总计 {total_cases} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。
|
||
必须用例 {required_count} 个,非必须用例 {optional_count} 个。
|
||
严格等级:{strictness_level}({severity_levels.get(strictness_level, 5)}级及以上为必须)。"""
|
||
|
||
elements.append(to_para(stats_text, small_style))
|
||
else:
|
||
elements.append(to_para("无测试用例执行记录。", normal_style))
|
||
|
||
elements.append(Spacer(1, 20))
|
||
|
||
# 失败用例详情部分
|
||
if failed_test_cases:
|
||
elements.append(to_para("失败用例详情分析", heading_style, escape=False))
|
||
elements.append(Spacer(1, 10))
|
||
|
||
# 按严重性分组失败用例
|
||
critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL']
|
||
high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH']
|
||
medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM']
|
||
low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW']
|
||
|
||
failure_summary = f"""失败用例统计:
|
||
总计 {len(failed_test_cases)} 个失败用例,其中:
|
||
• 严重级别:{len(critical_failures)} 个
|
||
• 高级别:{len(high_failures)} 个
|
||
• 中级别:{len(medium_failures)} 个
|
||
• 低级别:{len(low_failures)} 个
|
||
|
||
以下是详细的失败原因分析:"""
|
||
|
||
elements.append(to_para(failure_summary, normal_style))
|
||
elements.append(Spacer(1, 15))
|
||
|
||
# 详细失败用例列表
|
||
for i, failed_case in enumerate(failed_test_cases, 1):
|
||
# 用例标题
|
||
case_title = f"{i}. {failed_case['case_name']}"
|
||
elements.append(to_para(case_title, ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5)))
|
||
|
||
# 用例基本信息
|
||
case_info = f"""• 用例ID:{failed_case['case_id']}
|
||
• 所属端点:{failed_case['endpoint']}
|
||
• 严重级别:{failed_case['severity']}
|
||
• 执行状态:{failed_case['status']}"""
|
||
|
||
elements.append(to_para(case_info, ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5)))
|
||
|
||
# 失败原因
|
||
failure_reason = failed_case.get('message', '无详细错误信息')
|
||
if failure_reason:
|
||
elements.append(to_para("失败原因:", ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15)))
|
||
|
||
# 处理长文本,确保在PDF中正确显示
|
||
if len(failure_reason) > 200:
|
||
# 对于很长的错误信息,进行适当的分段
|
||
failure_reason = failure_reason[:200] + "..."
|
||
|
||
elements.append(to_para(failure_reason, ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red)))
|
||
|
||
# 添加分隔线
|
||
if i < len(failed_test_cases):
|
||
elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey))
|
||
elements.append(Spacer(1, 10))
|
||
|
||
elements.append(Spacer(1, 20))
|
||
|
||
elements.append(Spacer(1, 20))
|
||
|
||
# 测试情况说明
|
||
elements.append(to_para("测试情况说明", heading_style, escape=False))
|
||
|
||
test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。
|
||
测试:累计发现缺陷{overall.get('test_cases_failed', 0)}个。
|
||
测试执行时间:{start_time_formatted} 至 {end_time_formatted}
|
||
测试环境:开发测试环境
|
||
测试方法:自动化API合规性测试"""
|
||
|
||
elements.append(to_para(test_situation_text, normal_style))
|
||
elements.append(Spacer(1, 20))
|
||
|
||
# 测试结论
|
||
elements.append(to_para("测试结论", heading_style, escape=False))
|
||
|
||
# 根据测试结果生成结论
|
||
success_rate = overall.get('test_case_success_rate', '0%')
|
||
success_rate_num = float(success_rate.replace('%', '')) if success_rate != 'N/A' else 0
|
||
|
||
if success_rate_num >= 90:
|
||
conclusion_status = "通过"
|
||
conclusion_text = f"""本套领域数据服务已通过环境验证,系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析,该项目通过验收测试。
|
||
测试用例成功率达到{success_rate},符合验收标准。"""
|
||
elif success_rate_num >= 70:
|
||
conclusion_status = "基本通过"
|
||
conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。"""
|
||
else:
|
||
conclusion_status = "不通过"
|
||
conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。"""
|
||
|
||
elements.append(to_para(conclusion_text, normal_style))
|
||
elements.append(Spacer(1, 20))
|
||
|
||
# 检测依据
|
||
elements.append(to_para("检测依据", heading_style, escape=False))
|
||
|
||
detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分:关于DMS领域数据服务的接口要求和测试细则。
|
||
参考标准:
|
||
1. DMS数据管理系统API规范V1.0
|
||
2. RESTful API设计规范
|
||
3. 数据安全和隐私保护要求
|
||
4. 系统集成测试标准"""
|
||
|
||
elements.append(to_para(detection_basis_text, normal_style))
|
||
elements.append(Spacer(1, 20))
|
||
|
||
# 报告生成信息
|
||
elements.append(to_para("报告生成信息", heading_style, escape=False))
|
||
generation_info_data = [
|
||
[to_para("<b>生成时间</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M:%S'))],
|
||
[to_para("<b>生成工具</b>", escape=False), to_para("DMS合规性测试工具")],
|
||
[to_para("<b>工具版本</b>", escape=False), to_para("V1.0.0")],
|
||
[to_para("<b>测试结论</b>", escape=False), to_para(f"<b>{conclusion_status}</b>", escape=False)],
|
||
]
|
||
generation_info_table = Table(generation_info_data, colWidths=[120, '*'])
|
||
generation_info_table.setStyle(TableStyle([
|
||
('GRID', (0,0), (-1,-1), 1, colors.grey),
|
||
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
|
||
('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
|
||
]))
|
||
elements.append(generation_info_table)
|
||
|
||
# 构建PDF
|
||
doc.build(elements)
|
||
logger.info(f"PDF报告已成功生成: {output_path}")
|
||
except Exception as e:
|
||
logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
|
||
|
||
def main():
|
||
"""主函数"""
|
||
args = parse_args()
|
||
|
||
if args.verbose:
|
||
logging.getLogger('ddms_compliance_suite').setLevel(logging.DEBUG)
|
||
logger.setLevel(logging.DEBUG)
|
||
logger.debug("已启用详细日志模式")
|
||
|
||
if not args.yapi and not args.swagger and not args.dms:
|
||
logger.error("请提供API定义源:--yapi, --swagger, 或 --dms")
|
||
sys.exit(1)
|
||
|
||
if (args.yapi and args.swagger) or (args.yapi and args.dms) or (args.swagger and args.dms):
|
||
logger.error("API定义源 --yapi, --swagger, 和 --dms 是互斥的,请只选择一个。")
|
||
sys.exit(1)
|
||
|
||
if args.list_categories and args.yapi:
|
||
list_yapi_categories(args.yapi)
|
||
sys.exit(0)
|
||
|
||
if args.list_tags and args.swagger:
|
||
list_swagger_tags(args.swagger)
|
||
sys.exit(0)
|
||
|
||
categories = args.categories.split(',') if args.categories else None
|
||
tags = args.tags.split(',') if args.tags else None
|
||
|
||
# 确定基础输出目录
|
||
base_output_dir = Path("./test_reports")
|
||
if args.output:
|
||
# 如果提供了 --output,将其作为所有时间戳报告的基础目录。
|
||
# 这简化了逻辑:--output 始终是一个目录。
|
||
base_output_dir = Path(args.output)
|
||
|
||
# 为本次测试运行创建一个唯一的、带时间戳的子目录
|
||
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||
output_directory = base_output_dir / timestamp
|
||
|
||
# 主摘要报告的文件路径现在位于新目录内
|
||
main_report_file_path = output_directory / f"summary.{args.format}"
|
||
|
||
try:
|
||
output_directory.mkdir(parents=True, exist_ok=True)
|
||
logger.info(f"测试报告将保存到: {output_directory.resolve()}")
|
||
except OSError as e:
|
||
logger.error(f"创建输出目录失败 {output_directory}: {e}")
|
||
sys.exit(1)
|
||
|
||
orchestrator = APITestOrchestrator(
|
||
base_url=args.base_url,
|
||
custom_test_cases_dir=args.custom_test_cases_dir,
|
||
llm_api_key=args.llm_api_key,
|
||
llm_base_url=args.llm_base_url,
|
||
llm_model_name=args.llm_model_name,
|
||
use_llm_for_request_body=args.use_llm_for_request_body,
|
||
use_llm_for_path_params=args.use_llm_for_path_params,
|
||
use_llm_for_query_params=args.use_llm_for_query_params,
|
||
use_llm_for_headers=args.use_llm_for_headers,
|
||
output_dir=str(output_directory),
|
||
stages_dir=args.stages_dir, # 将 stages_dir 传递给编排器
|
||
strictness_level=args.strictness_level,
|
||
ignore_ssl=args.ignore_ssl,
|
||
enable_well_data=True # 默认启用井数据功能
|
||
)
|
||
|
||
test_summary: Optional[TestSummary] = None
|
||
parsed_spec_for_scenarios: Optional[ParsedAPISpec] = None # 用于存储已解析的规范,供场景使用
|
||
|
||
try:
|
||
if args.yapi:
|
||
logger.info(f"从YAPI文件运行测试: {args.yapi}")
|
||
# orchestrator.run_tests_from_yapi 现在返回一个元组
|
||
test_summary, parsed_spec_for_scenarios = orchestrator.run_tests_from_yapi(
|
||
yapi_file_path=args.yapi,
|
||
categories=categories,
|
||
custom_test_cases_dir=args.custom_test_cases_dir
|
||
)
|
||
if not parsed_spec_for_scenarios: # 检查解析是否成功
|
||
logger.error(f"YAPI文件 '{args.yapi}' 解析失败 (由编排器报告)。程序将退出。")
|
||
sys.exit(1)
|
||
|
||
elif args.swagger:
|
||
logger.info(f"从Swagger文件运行测试: {args.swagger}")
|
||
# orchestrator.run_tests_from_swagger 现在返回一个元组
|
||
test_summary, parsed_spec_for_scenarios = orchestrator.run_tests_from_swagger(
|
||
swagger_file_path=args.swagger,
|
||
tags=tags,
|
||
custom_test_cases_dir=args.custom_test_cases_dir
|
||
)
|
||
if not parsed_spec_for_scenarios: # 检查解析是否成功
|
||
logger.error(f"Swagger文件 '{args.swagger}' 解析失败 (由编排器报告)。程序将退出。")
|
||
sys.exit(1)
|
||
|
||
elif args.dms:
|
||
logger.info(f"从DMS服务动态发现运行测试: {args.dms}")
|
||
test_summary, parsed_spec_for_scenarios, pagination_info = orchestrator.run_tests_from_dms(
|
||
domain_mapping_path=args.dms,
|
||
categories=categories,
|
||
custom_test_cases_dir=args.custom_test_cases_dir,
|
||
ignore_ssl=args.ignore_ssl,
|
||
page_size=args.page_size,
|
||
page_no_start=args.page_no,
|
||
fetch_all_pages=not args.fetch_single_page
|
||
)
|
||
if not parsed_spec_for_scenarios: # 检查解析是否成功
|
||
logger.error(f"从DMS服务 '{args.dms}' 解析失败 (由编排器报告)。程序将退出。")
|
||
sys.exit(1)
|
||
|
||
# 显示分页信息
|
||
if pagination_info:
|
||
logger.info(f"DMS分页信息: 总记录数={pagination_info.get('total_records', 0)}, "
|
||
f"页面大小={pagination_info.get('page_size', 0)}, "
|
||
f"起始页码={pagination_info.get('page_no_start', 1)}, "
|
||
f"当前页码={pagination_info.get('current_page', 1)}, "
|
||
f"获取页数={pagination_info.get('pages_fetched', 0)}/{pagination_info.get('total_pages', 0)}")
|
||
else:
|
||
logger.warning("未获取到分页信息")
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行测试用例时发生意外错误: {e}", exc_info=True)
|
||
sys.exit(1)
|
||
|
||
if test_summary:
|
||
# 在保存单个测试用例结果之后,运行API测试阶段 (如果指定了目录)
|
||
if args.stages_dir and parsed_spec_for_scenarios:
|
||
logger.info(f"开始执行API测试阶段 (Stages),目录: {args.stages_dir}")
|
||
# 注意:这里假设 test_orchestrator.py 中已经有了 run_stages_from_spec 方法
|
||
# 并且 APITestOrchestrator 的 __init__ 也接受 stages_dir
|
||
orchestrator.run_stages_from_spec( # 调用 run_stages_from_spec
|
||
# stages_dir is managed by orchestrator's __init__
|
||
parsed_spec=parsed_spec_for_scenarios, # 使用之前解析的规范
|
||
summary=test_summary # 将阶段结果添加到同一个摘要对象
|
||
)
|
||
logger.info("API测试阶段 (Stages) 执行完毕。")
|
||
# 阶段执行后,摘要已更新,重新最终确定和打印摘要
|
||
test_summary.finalize_summary() # 重新计算总时长等
|
||
test_summary.print_summary_to_console() # 打印包含阶段结果的更新摘要
|
||
|
||
# 保存主测试摘要,格式由用户指定
|
||
main_report_file_path = output_directory / f"summary.{args.format}"
|
||
save_results(test_summary, str(main_report_file_path), args.format)
|
||
|
||
# 如果需要生成PDF报告
|
||
if args.generate_pdf:
|
||
json_summary_path_for_pdf = output_directory / "summary.json"
|
||
|
||
# 如果用户请求的不是json,我们需要额外生成一份json文件作为PDF的数据源
|
||
if args.format != 'json':
|
||
with open(json_summary_path_for_pdf, 'w', encoding='utf-8') as f:
|
||
f.write(test_summary.to_json(pretty=True))
|
||
logger.info(f"为生成PDF而创建临时JSON摘要: {json_summary_path_for_pdf}")
|
||
|
||
pdf_report_path = output_directory / "report_cn.pdf"
|
||
save_pdf_report(test_summary.to_dict(), pdf_report_path, args.strictness_level)
|
||
|
||
# API调用详情报告也应保存在同一时间戳目录中
|
||
api_calls_filename = "api_call_details.md"
|
||
if orchestrator:
|
||
save_api_call_details_to_file(
|
||
orchestrator.get_api_call_details(),
|
||
str(output_directory),
|
||
filename=api_calls_filename
|
||
)
|
||
|
||
# Improved HTML report summary access
|
||
failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
|
||
error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
|
||
|
||
if failed_count > 0 or error_count > 0:
|
||
logger.info("部分测试失败或出错,请检查报告。")
|
||
# sys.exit(1) # Keep this commented if a report is always desired regardless of outcome
|
||
else:
|
||
logger.info("所有测试完成。")
|
||
else:
|
||
logger.error("未能生成测试摘要。")
|
||
sys.exit(1)
|
||
|
||
sys.exit(0)
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|
||
# python run_api_tests.py --base-url http://127.0.0.1:4523/m1/6389742-6086420-default --swagger assets/doc/井筒API示例swagger.json --custom-test-cases-dir ./custom_testcases \
|
||
# --verbose \
|
||
# --output test_report.json
|
||
|
||
# python run_api_tests.py --base-url https://127.0.0.1:4523/m1/6389742-6086420-default --yapi assets/doc/井筒API示例_simple.json --custom-test-cases-dir ./custom_testcases \
|
||
# --verbose \
|
||
# --output test_report.json
|
||
|
||
# 示例:同时运行测试用例和场景
|
||
# python run_api_tests.py --base-url http://127.0.0.1:8000 --swagger ./assets/doc/petstore_swagger.json --custom-test-cases-dir ./custom_testcases --stages-dir ./custom_stages -v -o reports/ |