diff --git a/fastapi_server.py b/fastapi_server.py
index 65e6063..de3ea32 100644
--- a/fastapi_server.py
+++ b/fastapi_server.py
@@ -62,11 +62,11 @@ app = FastAPI(
主要特性
- - 🚀 高性能: 基于FastAPI,支持异步处理
- - 📊 分页支持: 解决大量API节点的内存问题
- - 📝 自动文档: 自动生成交互式API文档
- - 🔧 灵活配置: 支持多种测试配置选项
- - 📈 详细报告: 生成PDF和JSON格式的测试报告
+ 🚀 高性能: 基于FastAPI,支持异步处理
+ 📊 分页支持: 解决大量API节点的内存问题
+ 📝 自动文档: 自动生成交互式API文档
+ 🔧 灵活配置: 支持多种测试配置选项
+ 📈 详细报告: 生成PDF和JSON格式的测试报告
""",
version="1.0.0",
docs_url="/docs", # Swagger UI
@@ -275,12 +275,17 @@ def run_tests_logic(config: dict):
# Save API call details
api_calls_filename = "api_call_details.md"
- save_api_call_details_to_markdown(
+ save_api_call_details_to_file(
orchestrator.get_api_call_details(),
str(output_directory),
filename=api_calls_filename
)
+ # Generate PDF report if reportlab is available
+ if reportlab_available and config.get('generate_pdf', True):
+ pdf_report_path = output_directory / "report_cn.pdf"
+ save_pdf_report(test_summary.to_dict(), pdf_report_path, config.get('strictness_level', 'CRITICAL'))
+
failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
@@ -307,47 +312,608 @@ def run_tests_logic(config: dict):
"traceback": traceback.format_exc()
}
-def save_api_call_details_to_markdown(api_call_details: List[APICallDetail], output_dir: str, filename: str = "api_call_details.md"):
- """Save API call details to markdown file"""
+def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
+ """
+ 将API调用详情列表保存到指定目录下的 Markdown 文件中。
+ 同时,额外生成一个纯文本文件 (.txt),每行包含一个 cURL 命令。
+ """
+ if not api_call_details:
+ logger.info("没有API调用详情可供保存。")
+ return
+
+ output_dir = Path(output_dir_path)
try:
- output_path = Path(output_dir) / filename
+ output_dir.mkdir(parents=True, exist_ok=True)
+ except OSError as e:
+ logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}")
+ return
- with open(output_path, 'w', encoding='utf-8') as f:
- f.write("# API调用详情\n\n")
+ # 主文件是 Markdown 文件
+ md_output_file = output_dir / filename
+ # 确保它是 .md,尽管 main 函数应该已经处理了
+ if md_output_file.suffix.lower() not in ['.md', '.markdown']:
+ md_output_file = md_output_file.with_suffix('.md')
- for i, detail in enumerate(api_call_details, 1):
- f.write(f" {i}. {detail.endpoint_name}\n\n")
- f.write(f"请求URL: `{detail.request_url}`\n\n")
- f.write(f"请求方法: `{detail.request_method}`\n\n")
+ markdown_content = []
- if detail.request_headers:
- f.write("请求头:\n```json\n")
- f.write(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
- f.write("\n```\n\n")
+ for detail in api_call_details:
+
+ # Request URL with params (if any)
+ url_to_display = detail.request_url
+ if detail.request_params:
+ try:
+ # Ensure urllib is available for this formatting step
+ import urllib.parse
+ query_string = urllib.parse.urlencode(detail.request_params)
+ url_to_display = f"{detail.request_url}?{query_string}"
+ except Exception as e:
+ logger.warning(f"Error formatting URL with params for display: {e}")
+ # Fallback to just the base URL if params formatting fails
+
+ markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
+ markdown_content.append("**cURL Command:**")
+ markdown_content.append("```sh")
+ markdown_content.append(detail.curl_command)
+ markdown_content.append("```")
+ markdown_content.append("### Request Details")
+ markdown_content.append(f"- **Method:** `{detail.request_method}`")
+ markdown_content.append(f"- **Full URL:** `{url_to_display}`")
+
+ markdown_content.append("- **Headers:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
- if detail.request_body:
- f.write("请求体:\n```json\n")
- f.write(json.dumps(detail.request_body, indent=2, ensure_ascii=False))
- f.write("\n```\n\n")
+ if detail.request_params:
+ markdown_content.append("- **Query Parameters:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
- f.write(f"响应状态码: `{detail.response_status_code}`\n\n")
+ if detail.request_body is not None:
+ markdown_content.append("- **Body:**")
+ body_lang = "text"
+ formatted_body = str(detail.request_body)
+ try:
+ # Try to parse as JSON for pretty printing
+ if isinstance(detail.request_body, str):
+ try:
+ parsed_json = json.loads(detail.request_body)
+ formatted_body = json.dumps(parsed_json, indent=2, ensure_ascii=False)
+ body_lang = "json"
+ except json.JSONDecodeError:
+ pass # Keep as text
+ elif isinstance(detail.request_body, (dict, list)):
+ formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False)
+ body_lang = "json"
+ except Exception as e:
+ logger.warning(f"Error formatting request body for Markdown: {e}")
+
+ markdown_content.append(f"```{body_lang}")
+ markdown_content.append(formatted_body)
+ markdown_content.append("```")
- if detail.response_headers:
- f.write("响应头:\n```json\n")
- f.write(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
- f.write("\n```\n\n")
+ markdown_content.append("### Response Details")
+ markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
+ markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
+
+ markdown_content.append("- **Headers:**")
+ markdown_content.append("```json")
+ markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
+ markdown_content.append("```")
- if detail.response_body:
- f.write("响应体:\n```json\n")
- f.write(json.dumps(detail.response_body, indent=2, ensure_ascii=False))
- f.write("\n```\n\n")
+ if detail.response_body is not None:
+ markdown_content.append("- **Body:**")
+ resp_body_lang = "text"
+ formatted_resp_body = str(detail.response_body)
+ try:
+ # Try to parse as JSON for pretty printing
+ if isinstance(detail.response_body, str):
+ try:
+ # If it's already a string that might be JSON, try parsing and re-dumping
+ parsed_json_resp = json.loads(detail.response_body)
+ formatted_resp_body = json.dumps(parsed_json_resp, indent=2, ensure_ascii=False)
+ resp_body_lang = "json"
+ except json.JSONDecodeError:
+ # It's a string, but not valid JSON, keep as text
+ pass
+ elif isinstance(detail.response_body, (dict, list)):
+ # It's already a dict/list, dump it as JSON
+ formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False)
+ resp_body_lang = "json"
+ # If it's neither string nor dict/list (e.g. int, bool from parsed json), str() is fine.
+ except Exception as e:
+ logger.warning(f"Error formatting response body for Markdown: {e}")
- f.write("---\n\n")
-
- logger.info(f"API call details saved to: {output_path}")
+ markdown_content.append(f"```{resp_body_lang}")
+ markdown_content.append(formatted_resp_body)
+ markdown_content.append("```")
+ markdown_content.append("") # Add a blank line for spacing before next --- or EOF
+ markdown_content.append("---") # Separator
+ try:
+ with open(md_output_file, 'w', encoding='utf-8') as f_md:
+ f_md.write("\n".join(markdown_content))
+ logger.info(f"API调用详情已保存为 Markdown: {md_output_file}")
except Exception as e:
- logger.error(f"Error saving API call details: {e}")
+ logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True)
+
+def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
+ """将测试摘要保存为格式化的PDF文件"""
+ logger.info(f"开始生成PDF报告: {output_path}")
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+
+ try:
+ # --- 统一的字体管理和注册 ---
+ font_name = 'SimSun' # 使用一个简单清晰的注册名
+ font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
+
+ if not Path(font_path).exists():
+ logger.error(f"字体文件未找到: {Path(font_path).resolve()}")
+ return
+
+ # 关键修复: 对于 .ttc (TrueType Collection) 文件, 必须指定 subfontIndex
+ pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0))
+ # 将注册的字体关联到 'SimSun' 字体族
+ pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
+
+ doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
+ elements = []
+
+ # --- 统一样式定义, 全部使用注册的字体名 ---
+ styles = getSampleStyleSheet()
+ title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
+ heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
+ normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
+ small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)
+
+ def to_para(text, style=normal_style, escape=True):
+ """
+ 根据用户建议移除 textwrap 以进行诊断。
+ 此版本只包含净化和基本的换行符替换。
+ """
+ if text is None:
+ content = ""
+ else:
+ content = str(text)
+
+ if escape:
+ content = html.escape(content)
+
+ # 依然保留Unicode控制字符的净化
+ content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
+
+ if not content.strip():
+ # 对于完全空白或None的输入,返回一个安全的非换行空格
+ return Paragraph(' ', style)
+
+ # 只使用基本的换行符替换
+ content = content.replace('\n', '
')
+
+ return Paragraph(content, style)
+
+ # 3. 填充PDF内容 - 优化后的报告格式
+
+ # 生成报告编码(基于时间戳)
+ import time
+ report_code = f"DMS-TEST-{int(time.time())}"
+
+ # 报告标题
+ elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
+ elements.append(Spacer(1, 15))
+
+ # 报告基本信息表格
+ basic_info_data = [
+ [to_para("报告编码", escape=False), to_para(report_code)],
+ [to_para("报告名称", escape=False), to_para("DMS领域数据服务测试分析报告")],
+ [to_para("申请日期", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日'))],
+ [to_para("申请人", escape=False), to_para("系统管理员")],
+ [to_para("服务供应商名称", escape=False), to_para("数据管理系统(DMS)")],
+ ]
+ basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
+ basic_info_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
+ ]))
+ elements.append(basic_info_table)
+ elements.append(Spacer(1, 20))
+
+ # 摘要部分
+ elements.append(to_para("摘要", heading_style, escape=False))
+ overall = summary_data.get('overall_summary', {})
+
+ # 从JSON提取并格式化时间
+ try:
+ start_time_str = summary_data.get('start_time', 'N/A')
+ end_time_str = summary_data.get('end_time', 'N/A')
+ duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0))
+
+ start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
+ end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
+ except:
+ start_time_formatted = start_time_str
+ end_time_formatted = end_time_str
+
+ # 摘要内容 - 安全计算跳过的数量
+ def safe_subtract(total, passed, failed):
+ """安全地计算跳过的数量"""
+ try:
+ if isinstance(total, (int, float)) and isinstance(passed, (int, float)) and isinstance(failed, (int, float)):
+ return max(0, total - passed - failed)
+ else:
+ return 0
+ except:
+ return 0
+
+ endpoints_tested = overall.get('endpoints_tested', 0)
+ endpoints_passed = overall.get('endpoints_passed', 0)
+ endpoints_failed = overall.get('endpoints_failed', 0)
+ endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed)
+
+ test_cases_executed = overall.get('total_test_cases_executed', 0)
+ test_cases_passed = overall.get('test_cases_passed', 0)
+ test_cases_failed = overall.get('test_cases_failed', 0)
+ test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed)
+
+ stages_executed = overall.get('total_stages_executed', 0)
+ stages_passed = overall.get('stages_passed', 0)
+ stages_failed = overall.get('stages_failed', 0)
+ stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed)
+
+ summary_text = f"""本次测试针对DMS(数据管理系统)领域数据服务进行全面的合规性验证。
+测试时间:{start_time_formatted} 至 {end_time_formatted},总耗时 {float(duration):.2f} 秒。
+共测试 {endpoints_tested} 个API端点,其中 {endpoints_passed} 个通过,{endpoints_failed} 个失败,{endpoints_skipped} 个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}。
+执行 {test_cases_executed} 个测试用例,其中 {test_cases_passed} 个通过,{test_cases_failed} 个失败,{test_cases_skipped} 个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}。
+执行 {stages_executed} 个流程测试,其中 {stages_passed} 个通过,{stages_failed} 个失败,{stages_skipped} 个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}。"""
+
+ elements.append(to_para(summary_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 测试内容包括 - API列表表格
+ elements.append(to_para("测试内容包括", heading_style, escape=False))
+
+ # 从测试结果中提取API信息
+ endpoint_results = summary_data.get('endpoint_results', [])
+ api_list_data = [
+ [to_para("序号", escape=False), to_para("服务名称", escape=False),
+ to_para("服务功能描述", escape=False), to_para("服务参数描述", escape=False),
+ to_para("服务返回值描述", escape=False)]
+ ]
+
+ for i, endpoint in enumerate(endpoint_results[:10], 1): # 限制显示前10个API
+ endpoint_name = endpoint.get('endpoint_name', 'N/A')
+
+ # 简化的功能描述
+ if 'Create' in endpoint_name:
+ func_desc = "提供数据创建服务"
+ elif 'List' in endpoint_name or 'Query' in endpoint_name:
+ func_desc = "提供数据查询和列表服务"
+ elif 'Read' in endpoint_name:
+ func_desc = "提供单条数据读取服务"
+ elif 'Update' in endpoint_name:
+ func_desc = "提供数据更新服务"
+ elif 'Delete' in endpoint_name:
+ func_desc = "提供数据删除服务"
+ else:
+ func_desc = "提供数据管理服务"
+
+ api_list_data.append([
+ to_para(str(i), small_style),
+ to_para(endpoint_name, small_style),
+ to_para(func_desc, small_style),
+ to_para("标准DMS参数格式", small_style),
+ to_para("标准DMS响应格式", small_style)
+ ])
+
+ api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
+ api_list_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(api_list_table)
+ elements.append(Spacer(1, 20))
+
+ # 测试用例列表 - 根据严格等级分为必须和非必须
+ elements.append(to_para("测试用例列表", heading_style, escape=False))
+
+ # 定义严重性等级的数值映射
+ severity_levels = {
+ 'CRITICAL': 5,
+ 'HIGH': 4,
+ 'MEDIUM': 3,
+ 'LOW': 2,
+ 'INFO': 1
+ }
+
+ strictness_value = severity_levels.get(strictness_level, 5) # 默认为CRITICAL
+
+ # 收集所有测试用例(包括endpoint用例和stage用例)
+ all_test_cases = []
+ failed_test_cases = [] # 专门收集失败的测试用例
+
+ # 1. 收集endpoint测试用例
+ for endpoint_result in endpoint_results:
+ test_cases = endpoint_result.get('executed_test_cases', [])
+ for tc in test_cases:
+ tc_severity = tc.get('test_case_severity', 'MEDIUM')
+ tc_severity_value = severity_levels.get(tc_severity, 3)
+ tc_status = tc.get('status', 'N/A')
+ tc_message = tc.get('message', '')
+
+ test_case_info = {
+ 'type': 'Endpoint',
+ 'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
+ 'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'),
+ 'case_name': tc.get('test_case_name', 'N/A'),
+ 'case_id': tc.get('test_case_id', 'N/A'),
+ 'status': tc_status,
+ 'message': tc_message,
+ 'severity': tc_severity,
+ 'severity_value': tc_severity_value,
+ 'is_required': tc_severity_value >= strictness_value,
+ 'duration': tc.get('duration_seconds', 0),
+ 'timestamp': tc.get('timestamp', '')
+ }
+
+ all_test_cases.append(test_case_info)
+
+ # 收集失败的测试用例
+ if tc_status in ['失败', 'FAILED', '错误', 'ERROR']:
+ failed_test_cases.append(test_case_info)
+
+ # 2. 收集stage测试用例
+ stage_results = summary_data.get('stage_results', [])
+ for stage_result in stage_results:
+ stage_name = stage_result.get('stage_name', 'N/A')
+ stage_status = stage_result.get('overall_status', 'N/A')
+ stage_message = stage_result.get('message', stage_result.get('error_message', ''))
+ stage_severity = 'HIGH' # Stage用例通常是高优先级
+ stage_severity_value = severity_levels.get(stage_severity, 4)
+
+ # 将stage作为一个测试用例添加
+ stage_case_info = {
+ 'type': 'Stage',
+ 'endpoint': f"Stage: {stage_name}",
+ 'endpoint_id': f"STAGE_{stage_name}",
+ 'case_name': stage_result.get('description', stage_name),
+ 'case_id': f"STAGE_{stage_name}",
+ 'status': stage_status,
+ 'message': stage_message,
+ 'severity': stage_severity,
+ 'severity_value': stage_severity_value,
+ 'is_required': stage_severity_value >= strictness_value,
+ 'duration': stage_result.get('duration_seconds', 0),
+ 'timestamp': stage_result.get('start_time', '')
+ }
+
+ all_test_cases.append(stage_case_info)
+
+ # 收集失败的stage用例
+ if stage_status in ['失败', 'FAILED', '错误', 'ERROR']:
+ failed_test_cases.append(stage_case_info)
+
+ # 分离必须和非必须的测试用例
+ required_cases = [case for case in all_test_cases if case['is_required']]
+ optional_cases = [case for case in all_test_cases if not case['is_required']]
+
+ # 创建分离的测试用例表格
+ if all_test_cases:
+ # 添加严格等级说明
+ strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
+ elements.append(to_para(strictness_text, small_style))
+ elements.append(Spacer(1, 10))
+
+ # 1. 必须的测试用例表格
+ if required_cases:
+ elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
+
+ required_table_data = [
+ [to_para("序号", escape=False), to_para("类型", escape=False),
+ to_para("测试用例名称", escape=False), to_para("所属端点/阶段", escape=False),
+ to_para("优先级", escape=False), to_para("执行结果", escape=False)]
+ ]
+
+ for i, case in enumerate(required_cases, 1):
+ status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
+ required_table_data.append([
+ to_para(str(i), small_style),
+ to_para(case['type'], small_style),
+ to_para(case['case_name'], small_style),
+ to_para(case['endpoint'], small_style),
+ to_para(case['severity'], small_style),
+ to_para(status_display, small_style)
+ ])
+
+ required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45])
+ required_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightblue), # 使用浅蓝色突出必须用例
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(required_table)
+ elements.append(Spacer(1, 15))
+
+ # 2. 非必须的测试用例表格
+ if optional_cases:
+ elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
+
+ optional_table_data = [
+ [to_para("序号", escape=False), to_para("类型", escape=False),
+ to_para("测试用例名称", escape=False), to_para("所属端点/阶段", escape=False),
+ to_para("优先级", escape=False), to_para("执行结果", escape=False)]
+ ]
+
+ for i, case in enumerate(optional_cases, 1):
+ status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
+ optional_table_data.append([
+ to_para(str(i), small_style),
+ to_para(case['type'], small_style),
+ to_para(case['case_name'], small_style),
+ to_para(case['endpoint'], small_style),
+ to_para(case['severity'], small_style),
+ to_para(status_display, small_style)
+ ])
+
+ optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45])
+ optional_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('BACKGROUND', (0,0), (-1,0), colors.lightgrey), # 使用浅灰色表示非必须用例
+ ('ALIGN', (0,0), (-1,-1), 'CENTER'),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('FONTSIZE', (0,0), (-1,-1), 8)
+ ]))
+ elements.append(optional_table)
+ elements.append(Spacer(1, 10))
+
+ # 添加用例统计信息
+ total_cases = len(all_test_cases)
+ endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint'])
+ stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage'])
+ required_count = len(required_cases)
+ optional_count = len(optional_cases)
+
+ stats_text = f"""测试用例统计:
+总计 {total_cases} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。
+必须用例 {required_count} 个,非必须用例 {optional_count} 个。
+严格等级:{strictness_level}({severity_levels.get(strictness_level, 5)}级及以上为必须)。"""
+
+ elements.append(to_para(stats_text, small_style))
+ else:
+ elements.append(to_para("无测试用例执行记录。", normal_style))
+
+ elements.append(Spacer(1, 20))
+
+ # 失败用例详情部分
+ if failed_test_cases:
+ elements.append(to_para("失败用例详情分析", heading_style, escape=False))
+ elements.append(Spacer(1, 10))
+
+ # 按严重性分组失败用例
+ critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL']
+ high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH']
+ medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM']
+ low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW']
+
+ failure_summary = f"""失败用例统计:
+总计 {len(failed_test_cases)} 个失败用例,其中:
+• 严重级别:{len(critical_failures)} 个
+• 高级别:{len(high_failures)} 个
+• 中级别:{len(medium_failures)} 个
+• 低级别:{len(low_failures)} 个
+
+以下是详细的失败原因分析:"""
+
+ elements.append(to_para(failure_summary, normal_style))
+ elements.append(Spacer(1, 15))
+
+ # 详细失败用例列表
+ for i, failed_case in enumerate(failed_test_cases, 1):
+ # 用例标题
+ case_title = f"{i}. {failed_case['case_name']}"
+ elements.append(to_para(case_title, ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5)))
+
+ # 用例基本信息
+ case_info = f"""• 用例ID:{failed_case['case_id']}
+• 所属端点:{failed_case['endpoint']}
+• 严重级别:{failed_case['severity']}
+• 执行状态:{failed_case['status']}"""
+
+ elements.append(to_para(case_info, ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5)))
+
+ # 失败原因
+ failure_reason = failed_case.get('message', '无详细错误信息')
+ if failure_reason:
+ elements.append(to_para("失败原因:", ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15)))
+
+ # 处理长文本,确保在PDF中正确显示
+ if len(failure_reason) > 200:
+ # 对于很长的错误信息,进行适当的分段
+ failure_reason = failure_reason[:200] + "..."
+
+ elements.append(to_para(failure_reason, ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red)))
+
+ # 添加分隔线
+ if i < len(failed_test_cases):
+ elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey))
+ elements.append(Spacer(1, 10))
+
+ elements.append(Spacer(1, 20))
+
+ elements.append(Spacer(1, 20))
+
+ # 测试情况说明
+ elements.append(to_para("测试情况说明", heading_style, escape=False))
+
+ test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。
+测试:累计发现缺陷{overall.get('test_cases_failed', 0)}个。
+测试执行时间:{start_time_formatted} 至 {end_time_formatted}
+测试环境:开发测试环境
+测试方法:自动化API合规性测试"""
+
+ elements.append(to_para(test_situation_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 测试结论
+ elements.append(to_para("测试结论", heading_style, escape=False))
+
+ # 根据测试结果生成结论
+ success_rate = overall.get('test_case_success_rate', '0%')
+ success_rate_num = float(success_rate.replace('%', '')) if success_rate != 'N/A' else 0
+
+ if success_rate_num >= 90:
+ conclusion_status = "通过"
+ conclusion_text = f"""本套领域数据服务已通过环境验证,系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析,该项目通过验收测试。
+测试用例成功率达到{success_rate},符合验收标准。"""
+ elif success_rate_num >= 70:
+ conclusion_status = "基本通过"
+ conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。"""
+ else:
+ conclusion_status = "不通过"
+ conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。"""
+
+ elements.append(to_para(conclusion_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 检测依据
+ elements.append(to_para("检测依据", heading_style, escape=False))
+
+ detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分:关于DMS领域数据服务的接口要求和测试细则。
+参考标准:
+1. DMS数据管理系统API规范V1.0
+2. RESTful API设计规范
+3. 数据安全和隐私保护要求
+4. 系统集成测试标准"""
+
+ elements.append(to_para(detection_basis_text, normal_style))
+ elements.append(Spacer(1, 20))
+
+ # 报告生成信息
+ elements.append(to_para("报告生成信息", heading_style, escape=False))
+ generation_info_data = [
+ [to_para("生成时间", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M:%S'))],
+ [to_para("生成工具", escape=False), to_para("DMS合规性测试工具")],
+ [to_para("工具版本", escape=False), to_para("V1.0.0")],
+ [to_para("测试结论", escape=False), to_para(f"{conclusion_status}", escape=False)],
+ ]
+ generation_info_table = Table(generation_info_data, colWidths=[120, '*'])
+ generation_info_table.setStyle(TableStyle([
+ ('GRID', (0,0), (-1,-1), 1, colors.grey),
+ ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
+ ('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
+ ]))
+ elements.append(generation_info_table)
+
+ # 构建PDF
+ doc.build(elements)
+ logger.info(f"PDF报告已成功生成: {output_path}")
+ except Exception as e:
+ logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
@app.post("/run",
summary="执行API合规性测试",