fix:fastapi

This commit is contained in:
gongwenxin 2025-08-27 16:39:08 +08:00
parent a0ee9aa312
commit 402b466204

View File

@ -62,11 +62,11 @@ app = FastAPI(
主要特性 主要特性
- 🚀 高性能: 基于FastAPI支持异步处理 🚀 高性能: 基于FastAPI支持异步处理
- 📊 分页支持: 解决大量API节点的内存问题 📊 分页支持: 解决大量API节点的内存问题
- 📝 自动文档: 自动生成交互式API文档 📝 自动文档: 自动生成交互式API文档
- 🔧 灵活配置: 支持多种测试配置选项 🔧 灵活配置: 支持多种测试配置选项
- 📈 详细报告: 生成PDF和JSON格式的测试报告 📈 详细报告: 生成PDF和JSON格式的测试报告
""", """,
version="1.0.0", version="1.0.0",
docs_url="/docs", # Swagger UI docs_url="/docs", # Swagger UI
@ -275,12 +275,17 @@ def run_tests_logic(config: dict):
# Save API call details # Save API call details
api_calls_filename = "api_call_details.md" api_calls_filename = "api_call_details.md"
save_api_call_details_to_markdown( save_api_call_details_to_file(
orchestrator.get_api_call_details(), orchestrator.get_api_call_details(),
str(output_directory), str(output_directory),
filename=api_calls_filename filename=api_calls_filename
) )
# Generate PDF report if reportlab is available
if reportlab_available and config.get('generate_pdf', True):
pdf_report_path = output_directory / "report_cn.pdf"
save_pdf_report(test_summary.to_dict(), pdf_report_path, config.get('strictness_level', 'CRITICAL'))
failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0) failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0) error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
@ -307,47 +312,608 @@ def run_tests_logic(config: dict):
"traceback": traceback.format_exc() "traceback": traceback.format_exc()
} }
def save_api_call_details_to_markdown(api_call_details: List[APICallDetail], output_dir: str, filename: str = "api_call_details.md"): def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
"""Save API call details to markdown file""" """
将API调用详情列表保存到指定目录下的 Markdown 文件中
同时额外生成一个纯文本文件 (.txt)每行包含一个 cURL 命令
"""
if not api_call_details:
logger.info("没有API调用详情可供保存。")
return
output_dir = Path(output_dir_path)
try: try:
output_path = Path(output_dir) / filename output_dir.mkdir(parents=True, exist_ok=True)
except OSError as e:
logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}")
return
with open(output_path, 'w', encoding='utf-8') as f: # 主文件是 Markdown 文件
f.write("# API调用详情\n\n") md_output_file = output_dir / filename
# 确保它是 .md尽管 main 函数应该已经处理了
if md_output_file.suffix.lower() not in ['.md', '.markdown']:
md_output_file = md_output_file.with_suffix('.md')
for i, detail in enumerate(api_call_details, 1): markdown_content = []
f.write(f" {i}. {detail.endpoint_name}\n\n")
f.write(f"请求URL: `{detail.request_url}`\n\n")
f.write(f"请求方法: `{detail.request_method}`\n\n")
if detail.request_headers: for detail in api_call_details:
f.write("请求头:\n```json\n")
f.write(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
f.write("\n```\n\n")
if detail.request_body:
f.write("请求体:\n```json\n")
f.write(json.dumps(detail.request_body, indent=2, ensure_ascii=False))
f.write("\n```\n\n")
f.write(f"响应状态码: `{detail.response_status_code}`\n\n")
if detail.response_headers:
f.write("响应头:\n```json\n")
f.write(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
f.write("\n```\n\n")
if detail.response_body:
f.write("响应体:\n```json\n")
f.write(json.dumps(detail.response_body, indent=2, ensure_ascii=False))
f.write("\n```\n\n")
f.write("---\n\n")
logger.info(f"API call details saved to: {output_path}")
# Request URL with params (if any)
url_to_display = detail.request_url
if detail.request_params:
try:
# Ensure urllib is available for this formatting step
import urllib.parse
query_string = urllib.parse.urlencode(detail.request_params)
url_to_display = f"{detail.request_url}?{query_string}"
except Exception as e: except Exception as e:
logger.error(f"Error saving API call details: {e}") logger.warning(f"Error formatting URL with params for display: {e}")
# Fallback to just the base URL if params formatting fails
markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
markdown_content.append("**cURL Command:**")
markdown_content.append("```sh")
markdown_content.append(detail.curl_command)
markdown_content.append("```")
markdown_content.append("### Request Details")
markdown_content.append(f"- **Method:** `{detail.request_method}`")
markdown_content.append(f"- **Full URL:** `{url_to_display}`")
markdown_content.append("- **Headers:**")
markdown_content.append("```json")
markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
markdown_content.append("```")
if detail.request_params:
markdown_content.append("- **Query Parameters:**")
markdown_content.append("```json")
markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
markdown_content.append("```")
if detail.request_body is not None:
markdown_content.append("- **Body:**")
body_lang = "text"
formatted_body = str(detail.request_body)
try:
# Try to parse as JSON for pretty printing
if isinstance(detail.request_body, str):
try:
parsed_json = json.loads(detail.request_body)
formatted_body = json.dumps(parsed_json, indent=2, ensure_ascii=False)
body_lang = "json"
except json.JSONDecodeError:
pass # Keep as text
elif isinstance(detail.request_body, (dict, list)):
formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False)
body_lang = "json"
except Exception as e:
logger.warning(f"Error formatting request body for Markdown: {e}")
markdown_content.append(f"```{body_lang}")
markdown_content.append(formatted_body)
markdown_content.append("```")
markdown_content.append("### Response Details")
markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
markdown_content.append("- **Headers:**")
markdown_content.append("```json")
markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
markdown_content.append("```")
if detail.response_body is not None:
markdown_content.append("- **Body:**")
resp_body_lang = "text"
formatted_resp_body = str(detail.response_body)
try:
# Try to parse as JSON for pretty printing
if isinstance(detail.response_body, str):
try:
# If it's already a string that might be JSON, try parsing and re-dumping
parsed_json_resp = json.loads(detail.response_body)
formatted_resp_body = json.dumps(parsed_json_resp, indent=2, ensure_ascii=False)
resp_body_lang = "json"
except json.JSONDecodeError:
# It's a string, but not valid JSON, keep as text
pass
elif isinstance(detail.response_body, (dict, list)):
# It's already a dict/list, dump it as JSON
formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False)
resp_body_lang = "json"
# If it's neither string nor dict/list (e.g. int, bool from parsed json), str() is fine.
except Exception as e:
logger.warning(f"Error formatting response body for Markdown: {e}")
markdown_content.append(f"```{resp_body_lang}")
markdown_content.append(formatted_resp_body)
markdown_content.append("```")
markdown_content.append("") # Add a blank line for spacing before next --- or EOF
markdown_content.append("---") # Separator
try:
with open(md_output_file, 'w', encoding='utf-8') as f_md:
f_md.write("\n".join(markdown_content))
logger.info(f"API调用详情已保存为 Markdown: {md_output_file}")
except Exception as e:
logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True)
def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
"""将测试摘要保存为格式化的PDF文件"""
logger.info(f"开始生成PDF报告: {output_path}")
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
# --- 统一的字体管理和注册 ---
font_name = 'SimSun' # 使用一个简单清晰的注册名
font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
if not Path(font_path).exists():
logger.error(f"字体文件未找到: {Path(font_path).resolve()}")
return
# 关键修复: 对于 .ttc (TrueType Collection) 文件, 必须指定 subfontIndex
pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0))
# 将注册的字体关联到 'SimSun' 字体族
pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
elements = []
# --- 统一样式定义, 全部使用注册的字体名 ---
styles = getSampleStyleSheet()
title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)
def to_para(text, style=normal_style, escape=True):
"""
根据用户建议移除 textwrap 以进行诊断
此版本只包含净化和基本的换行符替换
"""
if text is None:
content = ""
else:
content = str(text)
if escape:
content = html.escape(content)
# 依然保留Unicode控制字符的净化
content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
if not content.strip():
# 对于完全空白或None的输入返回一个安全的非换行空格
return Paragraph(' ', style)
# 只使用基本的换行符替换
content = content.replace('\n', '<br/>')
return Paragraph(content, style)
# 3. 填充PDF内容 - 优化后的报告格式
# 生成报告编码(基于时间戳)
import time
report_code = f"DMS-TEST-{int(time.time())}"
# 报告标题
elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
elements.append(Spacer(1, 15))
# 报告基本信息表格
basic_info_data = [
[to_para("<b>报告编码</b>", escape=False), to_para(report_code)],
[to_para("<b>报告名称</b>", escape=False), to_para("DMS领域数据服务测试分析报告")],
[to_para("<b>申请日期</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d'))],
[to_para("<b>申请人</b>", escape=False), to_para("系统管理员")],
[to_para("<b>服务供应商名称</b>", escape=False), to_para("数据管理系统(DMS)")],
]
basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
basic_info_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
]))
elements.append(basic_info_table)
elements.append(Spacer(1, 20))
# 摘要部分
elements.append(to_para("摘要", heading_style, escape=False))
overall = summary_data.get('overall_summary', {})
# 从JSON提取并格式化时间
try:
start_time_str = summary_data.get('start_time', 'N/A')
end_time_str = summary_data.get('end_time', 'N/A')
duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0))
start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
except:
start_time_formatted = start_time_str
end_time_formatted = end_time_str
# 摘要内容 - 安全计算跳过的数量
def safe_subtract(total, passed, failed):
"""安全地计算跳过的数量"""
try:
if isinstance(total, (int, float)) and isinstance(passed, (int, float)) and isinstance(failed, (int, float)):
return max(0, total - passed - failed)
else:
return 0
except:
return 0
endpoints_tested = overall.get('endpoints_tested', 0)
endpoints_passed = overall.get('endpoints_passed', 0)
endpoints_failed = overall.get('endpoints_failed', 0)
endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed)
test_cases_executed = overall.get('total_test_cases_executed', 0)
test_cases_passed = overall.get('test_cases_passed', 0)
test_cases_failed = overall.get('test_cases_failed', 0)
test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed)
stages_executed = overall.get('total_stages_executed', 0)
stages_passed = overall.get('stages_passed', 0)
stages_failed = overall.get('stages_failed', 0)
stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed)
summary_text = f"""本次测试针对DMS数据管理系统领域数据服务进行全面的合规性验证。
测试时间{start_time_formatted} {end_time_formatted}总耗时 {float(duration):.2f}
共测试 {endpoints_tested} 个API端点其中 {endpoints_passed} 个通过{endpoints_failed} 个失败{endpoints_skipped} 个跳过端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}
执行 {test_cases_executed} 个测试用例其中 {test_cases_passed} 个通过{test_cases_failed} 个失败{test_cases_skipped} 个跳过测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}
执行 {stages_executed} 个流程测试其中 {stages_passed} 个通过{stages_failed} 个失败{stages_skipped} 个跳过流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}"""
elements.append(to_para(summary_text, normal_style))
elements.append(Spacer(1, 20))
# 测试内容包括 - API列表表格
elements.append(to_para("测试内容包括", heading_style, escape=False))
# 从测试结果中提取API信息
endpoint_results = summary_data.get('endpoint_results', [])
api_list_data = [
[to_para("<b>序号</b>", escape=False), to_para("<b>服务名称</b>", escape=False),
to_para("<b>服务功能描述</b>", escape=False), to_para("<b>服务参数描述</b>", escape=False),
to_para("<b>服务返回值描述</b>", escape=False)]
]
for i, endpoint in enumerate(endpoint_results[:10], 1): # 限制显示前10个API
endpoint_name = endpoint.get('endpoint_name', 'N/A')
# 简化的功能描述
if 'Create' in endpoint_name:
func_desc = "提供数据创建服务"
elif 'List' in endpoint_name or 'Query' in endpoint_name:
func_desc = "提供数据查询和列表服务"
elif 'Read' in endpoint_name:
func_desc = "提供单条数据读取服务"
elif 'Update' in endpoint_name:
func_desc = "提供数据更新服务"
elif 'Delete' in endpoint_name:
func_desc = "提供数据删除服务"
else:
func_desc = "提供数据管理服务"
api_list_data.append([
to_para(str(i), small_style),
to_para(endpoint_name, small_style),
to_para(func_desc, small_style),
to_para("标准DMS参数格式", small_style),
to_para("标准DMS响应格式", small_style)
])
api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
api_list_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
('ALIGN', (0,0), (-1,-1), 'CENTER'),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('FONTSIZE', (0,0), (-1,-1), 8)
]))
elements.append(api_list_table)
elements.append(Spacer(1, 20))
# 测试用例列表 - 根据严格等级分为必须和非必须
elements.append(to_para("测试用例列表", heading_style, escape=False))
# 定义严重性等级的数值映射
severity_levels = {
'CRITICAL': 5,
'HIGH': 4,
'MEDIUM': 3,
'LOW': 2,
'INFO': 1
}
strictness_value = severity_levels.get(strictness_level, 5) # 默认为CRITICAL
# 收集所有测试用例包括endpoint用例和stage用例
all_test_cases = []
failed_test_cases = [] # 专门收集失败的测试用例
# 1. 收集endpoint测试用例
for endpoint_result in endpoint_results:
test_cases = endpoint_result.get('executed_test_cases', [])
for tc in test_cases:
tc_severity = tc.get('test_case_severity', 'MEDIUM')
tc_severity_value = severity_levels.get(tc_severity, 3)
tc_status = tc.get('status', 'N/A')
tc_message = tc.get('message', '')
test_case_info = {
'type': 'Endpoint',
'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'),
'case_name': tc.get('test_case_name', 'N/A'),
'case_id': tc.get('test_case_id', 'N/A'),
'status': tc_status,
'message': tc_message,
'severity': tc_severity,
'severity_value': tc_severity_value,
'is_required': tc_severity_value >= strictness_value,
'duration': tc.get('duration_seconds', 0),
'timestamp': tc.get('timestamp', '')
}
all_test_cases.append(test_case_info)
# 收集失败的测试用例
if tc_status in ['失败', 'FAILED', '错误', 'ERROR']:
failed_test_cases.append(test_case_info)
# 2. 收集stage测试用例
stage_results = summary_data.get('stage_results', [])
for stage_result in stage_results:
stage_name = stage_result.get('stage_name', 'N/A')
stage_status = stage_result.get('overall_status', 'N/A')
stage_message = stage_result.get('message', stage_result.get('error_message', ''))
stage_severity = 'HIGH' # Stage用例通常是高优先级
stage_severity_value = severity_levels.get(stage_severity, 4)
# 将stage作为一个测试用例添加
stage_case_info = {
'type': 'Stage',
'endpoint': f"Stage: {stage_name}",
'endpoint_id': f"STAGE_{stage_name}",
'case_name': stage_result.get('description', stage_name),
'case_id': f"STAGE_{stage_name}",
'status': stage_status,
'message': stage_message,
'severity': stage_severity,
'severity_value': stage_severity_value,
'is_required': stage_severity_value >= strictness_value,
'duration': stage_result.get('duration_seconds', 0),
'timestamp': stage_result.get('start_time', '')
}
all_test_cases.append(stage_case_info)
# 收集失败的stage用例
if stage_status in ['失败', 'FAILED', '错误', 'ERROR']:
failed_test_cases.append(stage_case_info)
# 分离必须和非必须的测试用例
required_cases = [case for case in all_test_cases if case['is_required']]
optional_cases = [case for case in all_test_cases if not case['is_required']]
# 创建分离的测试用例表格
if all_test_cases:
# 添加严格等级说明
strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
elements.append(to_para(strictness_text, small_style))
elements.append(Spacer(1, 10))
# 1. 必须的测试用例表格
if required_cases:
elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
required_table_data = [
[to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
]
for i, case in enumerate(required_cases, 1):
status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
required_table_data.append([
to_para(str(i), small_style),
to_para(case['type'], small_style),
to_para(case['case_name'], small_style),
to_para(case['endpoint'], small_style),
to_para(case['severity'], small_style),
to_para(status_display, small_style)
])
required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45])
required_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('BACKGROUND', (0,0), (-1,0), colors.lightblue), # 使用浅蓝色突出必须用例
('ALIGN', (0,0), (-1,-1), 'CENTER'),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('FONTSIZE', (0,0), (-1,-1), 8)
]))
elements.append(required_table)
elements.append(Spacer(1, 15))
# 2. 非必须的测试用例表格
if optional_cases:
elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
optional_table_data = [
[to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
]
for i, case in enumerate(optional_cases, 1):
status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status']
optional_table_data.append([
to_para(str(i), small_style),
to_para(case['type'], small_style),
to_para(case['case_name'], small_style),
to_para(case['endpoint'], small_style),
to_para(case['severity'], small_style),
to_para(status_display, small_style)
])
optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45])
optional_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('BACKGROUND', (0,0), (-1,0), colors.lightgrey), # 使用浅灰色表示非必须用例
('ALIGN', (0,0), (-1,-1), 'CENTER'),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('FONTSIZE', (0,0), (-1,-1), 8)
]))
elements.append(optional_table)
elements.append(Spacer(1, 10))
# 添加用例统计信息
total_cases = len(all_test_cases)
endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint'])
stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage'])
required_count = len(required_cases)
optional_count = len(optional_cases)
stats_text = f"""测试用例统计:
总计 {total_cases} 个用例其中端点用例 {endpoint_cases} 阶段用例 {stage_cases}
必须用例 {required_count} 非必须用例 {optional_count}
严格等级{strictness_level}{severity_levels.get(strictness_level, 5)}级及以上为必须"""
elements.append(to_para(stats_text, small_style))
else:
elements.append(to_para("无测试用例执行记录。", normal_style))
elements.append(Spacer(1, 20))
# 失败用例详情部分
if failed_test_cases:
elements.append(to_para("失败用例详情分析", heading_style, escape=False))
elements.append(Spacer(1, 10))
# 按严重性分组失败用例
critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL']
high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH']
medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM']
low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW']
failure_summary = f"""失败用例统计:
总计 {len(failed_test_cases)} 个失败用例其中
严重级别{len(critical_failures)}
高级别{len(high_failures)}
中级别{len(medium_failures)}
低级别{len(low_failures)}
以下是详细的失败原因分析"""
elements.append(to_para(failure_summary, normal_style))
elements.append(Spacer(1, 15))
# 详细失败用例列表
for i, failed_case in enumerate(failed_test_cases, 1):
# 用例标题
case_title = f"{i}. {failed_case['case_name']}"
elements.append(to_para(case_title, ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5)))
# 用例基本信息
case_info = f"""• 用例ID{failed_case['case_id']}
所属端点{failed_case['endpoint']}
严重级别{failed_case['severity']}
执行状态{failed_case['status']}"""
elements.append(to_para(case_info, ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5)))
# 失败原因
failure_reason = failed_case.get('message', '无详细错误信息')
if failure_reason:
elements.append(to_para("失败原因:", ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15)))
# 处理长文本确保在PDF中正确显示
if len(failure_reason) > 200:
# 对于很长的错误信息,进行适当的分段
failure_reason = failure_reason[:200] + "..."
elements.append(to_para(failure_reason, ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red)))
# 添加分隔线
if i < len(failed_test_cases):
elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey))
elements.append(Spacer(1, 10))
elements.append(Spacer(1, 20))
elements.append(Spacer(1, 20))
# 测试情况说明
elements.append(to_para("测试情况说明", heading_style, escape=False))
test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。
测试累计发现缺陷{overall.get('test_cases_failed', 0)}
测试执行时间{start_time_formatted} {end_time_formatted}
测试环境开发测试环境
测试方法自动化API合规性测试"""
elements.append(to_para(test_situation_text, normal_style))
elements.append(Spacer(1, 20))
# 测试结论
elements.append(to_para("测试结论", heading_style, escape=False))
# 根据测试结果生成结论
success_rate = overall.get('test_case_success_rate', '0%')
success_rate_num = float(success_rate.replace('%', '')) if success_rate != 'N/A' else 0
if success_rate_num >= 90:
conclusion_status = "通过"
conclusion_text = f"""本套领域数据服务已通过环境验证系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析该项目通过验收测试。
测试用例成功率达到{success_rate}符合验收标准"""
elif success_rate_num >= 70:
conclusion_status = "基本通过"
conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。"""
else:
conclusion_status = "不通过"
conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。"""
elements.append(to_para(conclusion_text, normal_style))
elements.append(Spacer(1, 20))
# 检测依据
elements.append(to_para("检测依据", heading_style, escape=False))
detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分关于DMS领域数据服务的接口要求和测试细则。
参考标准
1. DMS数据管理系统API规范V1.0
2. RESTful API设计规范
3. 数据安全和隐私保护要求
4. 系统集成测试标准"""
elements.append(to_para(detection_basis_text, normal_style))
elements.append(Spacer(1, 20))
# 报告生成信息
elements.append(to_para("报告生成信息", heading_style, escape=False))
generation_info_data = [
[to_para("<b>生成时间</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d%H:%M:%S'))],
[to_para("<b>生成工具</b>", escape=False), to_para("DMS合规性测试工具")],
[to_para("<b>工具版本</b>", escape=False), to_para("V1.0.0")],
[to_para("<b>测试结论</b>", escape=False), to_para(f"<b>{conclusion_status}</b>", escape=False)],
]
generation_info_table = Table(generation_info_data, colWidths=[120, '*'])
generation_info_table.setStyle(TableStyle([
('GRID', (0,0), (-1,-1), 1, colors.grey),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('BACKGROUND', (0,0), (0,-1), colors.lightgrey)
]))
elements.append(generation_info_table)
# 构建PDF
doc.build(elements)
logger.info(f"PDF报告已成功生成: {output_path}")
except Exception as e:
logger.error(f"构建PDF文档时出错: {e}", exc_info=True)
@app.post("/run", @app.post("/run",
summary="执行API合规性测试", summary="执行API合规性测试",