From 402b4662042b9bc57654d8c6f5e3f4f51923ad4c Mon Sep 17 00:00:00 2001 From: gongwenxin Date: Wed, 27 Aug 2025 16:39:08 +0800 Subject: [PATCH] fix:fastapi --- fastapi_server.py | 638 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 602 insertions(+), 36 deletions(-) diff --git a/fastapi_server.py b/fastapi_server.py index 65e6063..de3ea32 100644 --- a/fastapi_server.py +++ b/fastapi_server.py @@ -62,11 +62,11 @@ app = FastAPI( 主要特性 - - 🚀 高性能: 基于FastAPI,支持异步处理 - - 📊 分页支持: 解决大量API节点的内存问题 - - 📝 自动文档: 自动生成交互式API文档 - - 🔧 灵活配置: 支持多种测试配置选项 - - 📈 详细报告: 生成PDF和JSON格式的测试报告 + 🚀 高性能: 基于FastAPI,支持异步处理 + 📊 分页支持: 解决大量API节点的内存问题 + 📝 自动文档: 自动生成交互式API文档 + 🔧 灵活配置: 支持多种测试配置选项 + 📈 详细报告: 生成PDF和JSON格式的测试报告 """, version="1.0.0", docs_url="/docs", # Swagger UI @@ -275,12 +275,17 @@ def run_tests_logic(config: dict): # Save API call details api_calls_filename = "api_call_details.md" - save_api_call_details_to_markdown( + save_api_call_details_to_file( orchestrator.get_api_call_details(), str(output_directory), filename=api_calls_filename ) + # Generate PDF report if reportlab is available + if reportlab_available and config.get('generate_pdf', True): + pdf_report_path = output_directory / "report_cn.pdf" + save_pdf_report(test_summary.to_dict(), pdf_report_path, config.get('strictness_level', 'CRITICAL')) + failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0) error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0) @@ -307,47 +312,608 @@ def run_tests_logic(config: dict): "traceback": traceback.format_exc() } -def save_api_call_details_to_markdown(api_call_details: List[APICallDetail], output_dir: str, filename: str = "api_call_details.md"): - """Save API call details to markdown file""" +def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"): + """ + 将API调用详情列表保存到指定目录下的 Markdown 文件中。 + 同时,额外生成一个纯文本文件 (.txt),每行包含一个 cURL 命令。 + """ + if not api_call_details: + logger.info("没有API调用详情可供保存。") + return + + output_dir = Path(output_dir_path) try: - output_path = Path(output_dir) / filename + output_dir.mkdir(parents=True, exist_ok=True) + except OSError as e: + logger.error(f"创建API调用详情输出目录 {output_dir} 失败: {e}") + return - with open(output_path, 'w', encoding='utf-8') as f: - f.write("# API调用详情\n\n") + # 主文件是 Markdown 文件 + md_output_file = output_dir / filename + # 确保它是 .md,尽管 main 函数应该已经处理了 + if md_output_file.suffix.lower() not in ['.md', '.markdown']: + md_output_file = md_output_file.with_suffix('.md') - for i, detail in enumerate(api_call_details, 1): - f.write(f" {i}. {detail.endpoint_name}\n\n") - f.write(f"请求URL: `{detail.request_url}`\n\n") - f.write(f"请求方法: `{detail.request_method}`\n\n") + markdown_content = [] - if detail.request_headers: - f.write("请求头:\n```json\n") - f.write(json.dumps(detail.request_headers, indent=2, ensure_ascii=False)) - f.write("\n```\n\n") + for detail in api_call_details: + + # Request URL with params (if any) + url_to_display = detail.request_url + if detail.request_params: + try: + # Ensure urllib is available for this formatting step + import urllib.parse + query_string = urllib.parse.urlencode(detail.request_params) + url_to_display = f"{detail.request_url}?{query_string}" + except Exception as e: + logger.warning(f"Error formatting URL with params for display: {e}") + # Fallback to just the base URL if params formatting fails + + markdown_content.append(f"## `{detail.request_method} {url_to_display}`") + markdown_content.append("**cURL Command:**") + markdown_content.append("```sh") + markdown_content.append(detail.curl_command) + markdown_content.append("```") + markdown_content.append("### Request Details") + markdown_content.append(f"- **Method:** `{detail.request_method}`") + markdown_content.append(f"- **Full URL:** `{url_to_display}`") + + markdown_content.append("- **Headers:**") + markdown_content.append("```json") + markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False)) + markdown_content.append("```") - if detail.request_body: - f.write("请求体:\n```json\n") - f.write(json.dumps(detail.request_body, indent=2, ensure_ascii=False)) - f.write("\n```\n\n") + if detail.request_params: + markdown_content.append("- **Query Parameters:**") + markdown_content.append("```json") + markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False)) + markdown_content.append("```") - f.write(f"响应状态码: `{detail.response_status_code}`\n\n") + if detail.request_body is not None: + markdown_content.append("- **Body:**") + body_lang = "text" + formatted_body = str(detail.request_body) + try: + # Try to parse as JSON for pretty printing + if isinstance(detail.request_body, str): + try: + parsed_json = json.loads(detail.request_body) + formatted_body = json.dumps(parsed_json, indent=2, ensure_ascii=False) + body_lang = "json" + except json.JSONDecodeError: + pass # Keep as text + elif isinstance(detail.request_body, (dict, list)): + formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False) + body_lang = "json" + except Exception as e: + logger.warning(f"Error formatting request body for Markdown: {e}") + + markdown_content.append(f"```{body_lang}") + markdown_content.append(formatted_body) + markdown_content.append("```") - if detail.response_headers: - f.write("响应头:\n```json\n") - f.write(json.dumps(detail.response_headers, indent=2, ensure_ascii=False)) - f.write("\n```\n\n") + markdown_content.append("### Response Details") + markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`") + markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`") + + markdown_content.append("- **Headers:**") + markdown_content.append("```json") + markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False)) + markdown_content.append("```") - if detail.response_body: - f.write("响应体:\n```json\n") - f.write(json.dumps(detail.response_body, indent=2, ensure_ascii=False)) - f.write("\n```\n\n") + if detail.response_body is not None: + markdown_content.append("- **Body:**") + resp_body_lang = "text" + formatted_resp_body = str(detail.response_body) + try: + # Try to parse as JSON for pretty printing + if isinstance(detail.response_body, str): + try: + # If it's already a string that might be JSON, try parsing and re-dumping + parsed_json_resp = json.loads(detail.response_body) + formatted_resp_body = json.dumps(parsed_json_resp, indent=2, ensure_ascii=False) + resp_body_lang = "json" + except json.JSONDecodeError: + # It's a string, but not valid JSON, keep as text + pass + elif isinstance(detail.response_body, (dict, list)): + # It's already a dict/list, dump it as JSON + formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False) + resp_body_lang = "json" + # If it's neither string nor dict/list (e.g. int, bool from parsed json), str() is fine. + except Exception as e: + logger.warning(f"Error formatting response body for Markdown: {e}") - f.write("---\n\n") - - logger.info(f"API call details saved to: {output_path}") + markdown_content.append(f"```{resp_body_lang}") + markdown_content.append(formatted_resp_body) + markdown_content.append("```") + markdown_content.append("") # Add a blank line for spacing before next --- or EOF + markdown_content.append("---") # Separator + try: + with open(md_output_file, 'w', encoding='utf-8') as f_md: + f_md.write("\n".join(markdown_content)) + logger.info(f"API调用详情已保存为 Markdown: {md_output_file}") except Exception as e: - logger.error(f"Error saving API call details: {e}") + logger.error(f"保存API调用详情到 Markdown 文件 {md_output_file} 失败: {e}", exc_info=True) + +def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'): + """将测试摘要保存为格式化的PDF文件""" + logger.info(f"开始生成PDF报告: {output_path}") + output_path.parent.mkdir(parents=True, exist_ok=True) + + try: + # --- 统一的字体管理和注册 --- + font_name = 'SimSun' # 使用一个简单清晰的注册名 + font_path = 'assets/fonts/STHeiti-Medium-4.ttc' + + if not Path(font_path).exists(): + logger.error(f"字体文件未找到: {Path(font_path).resolve()}") + return + + # 关键修复: 对于 .ttc (TrueType Collection) 文件, 必须指定 subfontIndex + pdfmetrics.registerFont(TTFont(font_name, font_path, subfontIndex=0)) + # 将注册的字体关联到 'SimSun' 字体族 + pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name) + + doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告") + elements = [] + + # --- 统一样式定义, 全部使用注册的字体名 --- + styles = getSampleStyleSheet() + title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28) + heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8) + normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14) + small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12) + + def to_para(text, style=normal_style, escape=True): + """ + 根据用户建议移除 textwrap 以进行诊断。 + 此版本只包含净化和基本的换行符替换。 + """ + if text is None: + content = "" + else: + content = str(text) + + if escape: + content = html.escape(content) + + # 依然保留Unicode控制字符的净化 + content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C') + + if not content.strip(): + # 对于完全空白或None的输入,返回一个安全的非换行空格 + return Paragraph(' ', style) + + # 只使用基本的换行符替换 + content = content.replace('\n', '
') + + return Paragraph(content, style) + + # 3. 填充PDF内容 - 优化后的报告格式 + + # 生成报告编码(基于时间戳) + import time + report_code = f"DMS-TEST-{int(time.time())}" + + # 报告标题 + elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False)) + elements.append(Spacer(1, 15)) + + # 报告基本信息表格 + basic_info_data = [ + [to_para("报告编码", escape=False), to_para(report_code)], + [to_para("报告名称", escape=False), to_para("DMS领域数据服务测试分析报告")], + [to_para("申请日期", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日'))], + [to_para("申请人", escape=False), to_para("系统管理员")], + [to_para("服务供应商名称", escape=False), to_para("数据管理系统(DMS)")], + ] + basic_info_table = Table(basic_info_data, colWidths=[120, '*']) + basic_info_table.setStyle(TableStyle([ + ('GRID', (0,0), (-1,-1), 1, colors.grey), + ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), + ('BACKGROUND', (0,0), (0,-1), colors.lightgrey) + ])) + elements.append(basic_info_table) + elements.append(Spacer(1, 20)) + + # 摘要部分 + elements.append(to_para("摘要", heading_style, escape=False)) + overall = summary_data.get('overall_summary', {}) + + # 从JSON提取并格式化时间 + try: + start_time_str = summary_data.get('start_time', 'N/A') + end_time_str = summary_data.get('end_time', 'N/A') + duration = summary_data.get('duration_seconds', summary_data.get('duration', 0.0)) + + start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A' + end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A' + except: + start_time_formatted = start_time_str + end_time_formatted = end_time_str + + # 摘要内容 - 安全计算跳过的数量 + def safe_subtract(total, passed, failed): + """安全地计算跳过的数量""" + try: + if isinstance(total, (int, float)) and isinstance(passed, (int, float)) and isinstance(failed, (int, float)): + return max(0, total - passed - failed) + else: + return 0 + except: + return 0 + + endpoints_tested = overall.get('endpoints_tested', 0) + endpoints_passed = overall.get('endpoints_passed', 0) + endpoints_failed = overall.get('endpoints_failed', 0) + endpoints_skipped = safe_subtract(endpoints_tested, endpoints_passed, endpoints_failed) + + test_cases_executed = overall.get('total_test_cases_executed', 0) + test_cases_passed = overall.get('test_cases_passed', 0) + test_cases_failed = overall.get('test_cases_failed', 0) + test_cases_skipped = safe_subtract(test_cases_executed, test_cases_passed, test_cases_failed) + + stages_executed = overall.get('total_stages_executed', 0) + stages_passed = overall.get('stages_passed', 0) + stages_failed = overall.get('stages_failed', 0) + stages_skipped = safe_subtract(stages_executed, stages_passed, stages_failed) + + summary_text = f"""本次测试针对DMS(数据管理系统)领域数据服务进行全面的合规性验证。 +测试时间:{start_time_formatted} 至 {end_time_formatted},总耗时 {float(duration):.2f} 秒。 +共测试 {endpoints_tested} 个API端点,其中 {endpoints_passed} 个通过,{endpoints_failed} 个失败,{endpoints_skipped} 个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}。 +执行 {test_cases_executed} 个测试用例,其中 {test_cases_passed} 个通过,{test_cases_failed} 个失败,{test_cases_skipped} 个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}。 +执行 {stages_executed} 个流程测试,其中 {stages_passed} 个通过,{stages_failed} 个失败,{stages_skipped} 个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}。""" + + elements.append(to_para(summary_text, normal_style)) + elements.append(Spacer(1, 20)) + + # 测试内容包括 - API列表表格 + elements.append(to_para("测试内容包括", heading_style, escape=False)) + + # 从测试结果中提取API信息 + endpoint_results = summary_data.get('endpoint_results', []) + api_list_data = [ + [to_para("序号", escape=False), to_para("服务名称", escape=False), + to_para("服务功能描述", escape=False), to_para("服务参数描述", escape=False), + to_para("服务返回值描述", escape=False)] + ] + + for i, endpoint in enumerate(endpoint_results[:10], 1): # 限制显示前10个API + endpoint_name = endpoint.get('endpoint_name', 'N/A') + + # 简化的功能描述 + if 'Create' in endpoint_name: + func_desc = "提供数据创建服务" + elif 'List' in endpoint_name or 'Query' in endpoint_name: + func_desc = "提供数据查询和列表服务" + elif 'Read' in endpoint_name: + func_desc = "提供单条数据读取服务" + elif 'Update' in endpoint_name: + func_desc = "提供数据更新服务" + elif 'Delete' in endpoint_name: + func_desc = "提供数据删除服务" + else: + func_desc = "提供数据管理服务" + + api_list_data.append([ + to_para(str(i), small_style), + to_para(endpoint_name, small_style), + to_para(func_desc, small_style), + to_para("标准DMS参数格式", small_style), + to_para("标准DMS响应格式", small_style) + ]) + + api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80]) + api_list_table.setStyle(TableStyle([ + ('GRID', (0,0), (-1,-1), 1, colors.grey), + ('BACKGROUND', (0,0), (-1,0), colors.lightgrey), + ('ALIGN', (0,0), (-1,-1), 'CENTER'), + ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), + ('FONTSIZE', (0,0), (-1,-1), 8) + ])) + elements.append(api_list_table) + elements.append(Spacer(1, 20)) + + # 测试用例列表 - 根据严格等级分为必须和非必须 + elements.append(to_para("测试用例列表", heading_style, escape=False)) + + # 定义严重性等级的数值映射 + severity_levels = { + 'CRITICAL': 5, + 'HIGH': 4, + 'MEDIUM': 3, + 'LOW': 2, + 'INFO': 1 + } + + strictness_value = severity_levels.get(strictness_level, 5) # 默认为CRITICAL + + # 收集所有测试用例(包括endpoint用例和stage用例) + all_test_cases = [] + failed_test_cases = [] # 专门收集失败的测试用例 + + # 1. 收集endpoint测试用例 + for endpoint_result in endpoint_results: + test_cases = endpoint_result.get('executed_test_cases', []) + for tc in test_cases: + tc_severity = tc.get('test_case_severity', 'MEDIUM') + tc_severity_value = severity_levels.get(tc_severity, 3) + tc_status = tc.get('status', 'N/A') + tc_message = tc.get('message', '') + + test_case_info = { + 'type': 'Endpoint', + 'endpoint': endpoint_result.get('endpoint_name', 'N/A'), + 'endpoint_id': endpoint_result.get('endpoint_id', 'N/A'), + 'case_name': tc.get('test_case_name', 'N/A'), + 'case_id': tc.get('test_case_id', 'N/A'), + 'status': tc_status, + 'message': tc_message, + 'severity': tc_severity, + 'severity_value': tc_severity_value, + 'is_required': tc_severity_value >= strictness_value, + 'duration': tc.get('duration_seconds', 0), + 'timestamp': tc.get('timestamp', '') + } + + all_test_cases.append(test_case_info) + + # 收集失败的测试用例 + if tc_status in ['失败', 'FAILED', '错误', 'ERROR']: + failed_test_cases.append(test_case_info) + + # 2. 收集stage测试用例 + stage_results = summary_data.get('stage_results', []) + for stage_result in stage_results: + stage_name = stage_result.get('stage_name', 'N/A') + stage_status = stage_result.get('overall_status', 'N/A') + stage_message = stage_result.get('message', stage_result.get('error_message', '')) + stage_severity = 'HIGH' # Stage用例通常是高优先级 + stage_severity_value = severity_levels.get(stage_severity, 4) + + # 将stage作为一个测试用例添加 + stage_case_info = { + 'type': 'Stage', + 'endpoint': f"Stage: {stage_name}", + 'endpoint_id': f"STAGE_{stage_name}", + 'case_name': stage_result.get('description', stage_name), + 'case_id': f"STAGE_{stage_name}", + 'status': stage_status, + 'message': stage_message, + 'severity': stage_severity, + 'severity_value': stage_severity_value, + 'is_required': stage_severity_value >= strictness_value, + 'duration': stage_result.get('duration_seconds', 0), + 'timestamp': stage_result.get('start_time', '') + } + + all_test_cases.append(stage_case_info) + + # 收集失败的stage用例 + if stage_status in ['失败', 'FAILED', '错误', 'ERROR']: + failed_test_cases.append(stage_case_info) + + # 分离必须和非必须的测试用例 + required_cases = [case for case in all_test_cases if case['is_required']] + optional_cases = [case for case in all_test_cases if not case['is_required']] + + # 创建分离的测试用例表格 + if all_test_cases: + # 添加严格等级说明 + strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。" + elements.append(to_para(strictness_text, small_style)) + elements.append(Spacer(1, 10)) + + # 1. 必须的测试用例表格 + if required_cases: + elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False)) + + required_table_data = [ + [to_para("序号", escape=False), to_para("类型", escape=False), + to_para("测试用例名称", escape=False), to_para("所属端点/阶段", escape=False), + to_para("优先级", escape=False), to_para("执行结果", escape=False)] + ] + + for i, case in enumerate(required_cases, 1): + status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status'] + required_table_data.append([ + to_para(str(i), small_style), + to_para(case['type'], small_style), + to_para(case['case_name'], small_style), + to_para(case['endpoint'], small_style), + to_para(case['severity'], small_style), + to_para(status_display, small_style) + ]) + + required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45]) + required_table.setStyle(TableStyle([ + ('GRID', (0,0), (-1,-1), 1, colors.grey), + ('BACKGROUND', (0,0), (-1,0), colors.lightblue), # 使用浅蓝色突出必须用例 + ('ALIGN', (0,0), (-1,-1), 'CENTER'), + ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), + ('FONTSIZE', (0,0), (-1,-1), 8) + ])) + elements.append(required_table) + elements.append(Spacer(1, 15)) + + # 2. 非必须的测试用例表格 + if optional_cases: + elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False)) + + optional_table_data = [ + [to_para("序号", escape=False), to_para("类型", escape=False), + to_para("测试用例名称", escape=False), to_para("所属端点/阶段", escape=False), + to_para("优先级", escape=False), to_para("执行结果", escape=False)] + ] + + for i, case in enumerate(optional_cases, 1): + status_display = "通过" if case['status'] == "通过" else "失败" if case['status'] == "失败" else case['status'] + optional_table_data.append([ + to_para(str(i), small_style), + to_para(case['type'], small_style), + to_para(case['case_name'], small_style), + to_para(case['endpoint'], small_style), + to_para(case['severity'], small_style), + to_para(status_display, small_style) + ]) + + optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45]) + optional_table.setStyle(TableStyle([ + ('GRID', (0,0), (-1,-1), 1, colors.grey), + ('BACKGROUND', (0,0), (-1,0), colors.lightgrey), # 使用浅灰色表示非必须用例 + ('ALIGN', (0,0), (-1,-1), 'CENTER'), + ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), + ('FONTSIZE', (0,0), (-1,-1), 8) + ])) + elements.append(optional_table) + elements.append(Spacer(1, 10)) + + # 添加用例统计信息 + total_cases = len(all_test_cases) + endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint']) + stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage']) + required_count = len(required_cases) + optional_count = len(optional_cases) + + stats_text = f"""测试用例统计: +总计 {total_cases} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。 +必须用例 {required_count} 个,非必须用例 {optional_count} 个。 +严格等级:{strictness_level}({severity_levels.get(strictness_level, 5)}级及以上为必须)。""" + + elements.append(to_para(stats_text, small_style)) + else: + elements.append(to_para("无测试用例执行记录。", normal_style)) + + elements.append(Spacer(1, 20)) + + # 失败用例详情部分 + if failed_test_cases: + elements.append(to_para("失败用例详情分析", heading_style, escape=False)) + elements.append(Spacer(1, 10)) + + # 按严重性分组失败用例 + critical_failures = [tc for tc in failed_test_cases if tc['severity'] == 'CRITICAL'] + high_failures = [tc for tc in failed_test_cases if tc['severity'] == 'HIGH'] + medium_failures = [tc for tc in failed_test_cases if tc['severity'] == 'MEDIUM'] + low_failures = [tc for tc in failed_test_cases if tc['severity'] == 'LOW'] + + failure_summary = f"""失败用例统计: +总计 {len(failed_test_cases)} 个失败用例,其中: +• 严重级别:{len(critical_failures)} 个 +• 高级别:{len(high_failures)} 个 +• 中级别:{len(medium_failures)} 个 +• 低级别:{len(low_failures)} 个 + +以下是详细的失败原因分析:""" + + elements.append(to_para(failure_summary, normal_style)) + elements.append(Spacer(1, 15)) + + # 详细失败用例列表 + for i, failed_case in enumerate(failed_test_cases, 1): + # 用例标题 + case_title = f"{i}. {failed_case['case_name']}" + elements.append(to_para(case_title, ParagraphStyle('case_title', parent=normal_style, fontSize=11, textColor=colors.darkred, spaceAfter=5))) + + # 用例基本信息 + case_info = f"""• 用例ID:{failed_case['case_id']} +• 所属端点:{failed_case['endpoint']} +• 严重级别:{failed_case['severity']} +• 执行状态:{failed_case['status']}""" + + elements.append(to_para(case_info, ParagraphStyle('case_info', parent=small_style, leftIndent=15, spaceAfter=5))) + + # 失败原因 + failure_reason = failed_case.get('message', '无详细错误信息') + if failure_reason: + elements.append(to_para("失败原因:", ParagraphStyle('failure_label', parent=normal_style, fontSize=10, textColor=colors.darkblue, leftIndent=15))) + + # 处理长文本,确保在PDF中正确显示 + if len(failure_reason) > 200: + # 对于很长的错误信息,进行适当的分段 + failure_reason = failure_reason[:200] + "..." + + elements.append(to_para(failure_reason, ParagraphStyle('failure_reason', parent=small_style, leftIndent=30, rightIndent=20, spaceAfter=10, textColor=colors.red))) + + # 添加分隔线 + if i < len(failed_test_cases): + elements.append(HRFlowable(width="80%", thickness=0.5, color=colors.lightgrey)) + elements.append(Spacer(1, 10)) + + elements.append(Spacer(1, 20)) + + elements.append(Spacer(1, 20)) + + # 测试情况说明 + elements.append(to_para("测试情况说明", heading_style, escape=False)) + + test_situation_text = f"""本次测试是对DMS领域数据管理服务V1.0版本下的{overall.get('endpoints_tested', 'N/A')}个API进行验证测试。 +测试:累计发现缺陷{overall.get('test_cases_failed', 0)}个。 +测试执行时间:{start_time_formatted} 至 {end_time_formatted} +测试环境:开发测试环境 +测试方法:自动化API合规性测试""" + + elements.append(to_para(test_situation_text, normal_style)) + elements.append(Spacer(1, 20)) + + # 测试结论 + elements.append(to_para("测试结论", heading_style, escape=False)) + + # 根据测试结果生成结论 + success_rate = overall.get('test_case_success_rate', '0%') + success_rate_num = float(success_rate.replace('%', '')) if success_rate != 'N/A' else 0 + + if success_rate_num >= 90: + conclusion_status = "通过" + conclusion_text = f"""本套领域数据服务已通过环境验证,系统可以正常运行。验收测试通过标准关于用例执行、DMS业务流相关文档等两个方面分析,该项目通过验收测试。 +测试用例成功率达到{success_rate},符合验收标准。""" + elif success_rate_num >= 70: + conclusion_status = "基本通过" + conclusion_text = f"""本套领域数据服务基本满足验收要求,但存在部分问题需要修复。测试用例成功率为{success_rate},建议修复失败用例后重新测试。""" + else: + conclusion_status = "不通过" + conclusion_text = f"""本套领域数据服务未达到验收标准,存在较多问题需要修复。测试用例成功率仅为{success_rate},需要全面检查和修复后重新测试。""" + + elements.append(to_para(conclusion_text, normal_style)) + elements.append(Spacer(1, 20)) + + # 检测依据 + elements.append(to_para("检测依据", heading_style, escape=False)) + + detection_basis_text = """集成开发应用支撑系统开放数据生态数据共享要求和评价第1部分:关于DMS领域数据服务的接口要求和测试细则。 +参考标准: +1. DMS数据管理系统API规范V1.0 +2. RESTful API设计规范 +3. 数据安全和隐私保护要求 +4. 系统集成测试标准""" + + elements.append(to_para(detection_basis_text, normal_style)) + elements.append(Spacer(1, 20)) + + # 报告生成信息 + elements.append(to_para("报告生成信息", heading_style, escape=False)) + generation_info_data = [ + [to_para("生成时间", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M:%S'))], + [to_para("生成工具", escape=False), to_para("DMS合规性测试工具")], + [to_para("工具版本", escape=False), to_para("V1.0.0")], + [to_para("测试结论", escape=False), to_para(f"{conclusion_status}", escape=False)], + ] + generation_info_table = Table(generation_info_data, colWidths=[120, '*']) + generation_info_table.setStyle(TableStyle([ + ('GRID', (0,0), (-1,-1), 1, colors.grey), + ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), + ('BACKGROUND', (0,0), (0,-1), colors.lightgrey) + ])) + elements.append(generation_info_table) + + # 构建PDF + doc.build(elements) + logger.info(f"PDF报告已成功生成: {output_path}") + except Exception as e: + logger.error(f"构建PDF文档时出错: {e}", exc_info=True) @app.post("/run", summary="执行API合规性测试",