"""Flask API server that runs the DMS compliance test suite and writes reports.

The module exposes two HTTP endpoints (``/`` for health checks and ``/run`` to
trigger a test run) and a set of report writers (JSON/HTML summary, Markdown
API-call log, Chinese PDF report).
"""
import datetime
import html
import json
import logging
import os
import sys
import time
import traceback
import unicodedata
import urllib.parse
from pathlib import Path
from typing import Any, Dict, List, Optional

# PDF generation libraries - with fallback
try:
    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont
    reportlab_available = True
except ImportError:
    reportlab_available = False

from flask import Flask, request, jsonify

# --- Project-specific imports ---
from ddms_compliance_suite.api_caller.caller import APICallDetail
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary
from ddms_compliance_suite.input_parser.parser import ParsedAPISpec
from ddms_compliance_suite.utils.response_utils import extract_data_for_validation
from ddms_compliance_suite.utils.data_generator import DataGenerator

app = Flask(__name__)
# app.config['JSON_AS_ASCII'] = False  # Return readable Chinese directly instead of Unicode escape sequences

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Module constants ---

# Configuration keys that name an API definition source; exactly one must be set.
_SPEC_SOURCE_KEYS = ('yapi', 'swagger', 'dms')

# Numeric ranking of test-case severities used to split required/optional cases.
_SEVERITY_LEVELS = {'CRITICAL': 5, 'HIGH': 4, 'MEDIUM': 3, 'LOW': 2, 'INFO': 1}

# Status enum member name -> CSS class in the HTML report (anything else: "skip").
_STATUS_CSS_CLASSES = {'PASSED': 'pass', 'FAILED': 'fail', 'ERROR': 'error'}

# Configuration keys whose values must never be written to the log.
_SECRET_CONFIG_KEYS = frozenset({'llm-api-key'})

# Number of endpoints shown in the PDF "API list" table.
_MAX_LISTED_ENDPOINTS = 10

_HTML_REPORT_STYLE = """
        body { font-family: sans-serif; margin: 20px; }
        table { border-collapse: collapse; width: 100%; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f2f2f2; }
        .pass { color: green; }
        .fail { color: red; }
        .error { color: orange; }
        .skip { color: gray; }
    """


# --- Helper functions (migrated from run_api_tests.py) ---

def _render_html_report(summary: TestSummary) -> str:
    """Return a standalone HTML document for *summary*; every value is HTML-escaped."""

    def esc(value: Any) -> str:
        return html.escape(str(value))

    rows = []
    for endpoint_result in summary.detailed_results:
        for tc_result in endpoint_result.executed_test_cases:
            # Use the enum member *name* so no enum class has to be imported here.
            status_name = getattr(tc_result.status, 'name', str(tc_result.status))
            status_class = _STATUS_CSS_CLASSES.get(status_name, 'skip')
            rows.append(
                '            <tr>'
                f'<td>{esc(endpoint_result.endpoint_name)} ({esc(endpoint_result.endpoint_id)})</td>'
                f'<td>{esc(tc_result.test_case_id)}</td>'
                f'<td>{esc(tc_result.test_case_name)}</td>'
                f'<td class="{status_class}">{esc(tc_result.status.value)}</td>'
                f'<td>{esc(tc_result.message)}</td>'
                f'<td>{tc_result.duration:.4f}</td>'
                '</tr>'
            )
    table_rows = "\n".join(rows)
    end_time = summary.end_time.isoformat() if summary.end_time else 'N/A'

    return f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>API Test Report</title>
    <style>{_HTML_REPORT_STYLE}</style>
</head>
<body>
    <h1>API Test Report</h1>
    <h2>Test Summary</h2>
    <p>Total Test Cases Executed: {summary.total_test_cases_executed}</p>
    <p>Passed: {summary.test_cases_passed}</p>
    <p>Failed: {summary.test_cases_failed}</p>
    <p>Error: {summary.test_cases_error}</p>
    <p>Skipped: {summary.test_cases_skipped_in_endpoint}</p>
    <p>Success Rate: {summary.test_case_success_rate:.2f}%</p>
    <p>Duration: {summary.duration:.2f}s</p>
    <p>Start Time: {summary.start_time.isoformat()}</p>
    <p>End Time: {end_time}</p>
    <h2>Detailed Results</h2>
    <table>
        <thead>
            <tr><th>Endpoint</th><th>Test Case ID</th><th>Test Case Name</th><th>Status</th><th>Message</th><th>Duration (s)</th></tr>
        </thead>
        <tbody>
{table_rows}
        </tbody>
    </table>
</body>
</html>
"""


def save_results(summary: TestSummary, output_file_path: str, format_type: str):
    """Saves the main test summary results.

    Args:
        summary: The finished test summary.
        output_file_path: Destination file; parent directories are created.
        format_type: ``'json'`` or ``'html'``. Any other value is logged and
            nothing is written.
    """
    output_path = Path(output_file_path)
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Error creating directory for output file {output_path.parent}: {e}")
        return

    if format_type == 'json':
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(summary.to_json(pretty=True))
        logger.info(f"Test results saved to JSON: {output_path}")
    elif format_type == 'html':
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(_render_html_report(summary))
        logger.info(f"Test results saved to HTML: {output_path}")
    else:
        logger.warning(f"Unsupported report format '{format_type}'; no summary file written.")


def _fenced_block(lang: str, text: str) -> List[str]:
    """Return the Markdown lines of a fenced code block."""
    return [f"```{lang}", text, "```"]


def _format_body(body: Any):
    """Return ``(language, text)`` used to render a request/response body."""
    if isinstance(body, (dict, list)):
        return "json", json.dumps(body, indent=2, ensure_ascii=False)
    return "text", str(body)


def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
    """Saves API call details to a Markdown file.

    Args:
        api_call_details: Recorded calls; nothing is written when empty.
        output_dir_path: Directory for the file (created if missing).
        filename: Name of the Markdown file inside *output_dir_path*.
    """
    if not api_call_details:
        logger.info("No API call details to save.")
        return

    output_dir = Path(output_dir_path)
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Failed to create output directory for API call details {output_dir}: {e}")
        return

    md_output_file = output_dir / filename
    markdown_content: List[str] = []

    for detail in api_call_details:
        url_to_display = detail.request_url
        if detail.request_params:
            try:
                query_string = urllib.parse.urlencode(detail.request_params)
                # Do not add a second '?' when the URL already carries a query string.
                separator = '&' if '?' in detail.request_url else '?'
                url_to_display = f"{detail.request_url}{separator}{query_string}"
            except (TypeError, ValueError) as e:
                logger.warning(f"Error formatting URL with params for display: {e}")

        markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
        markdown_content.append("**cURL Command:**")
        markdown_content.extend(_fenced_block("sh", detail.curl_command or ""))

        markdown_content.append("### Request Details")
        markdown_content.append(f"- **Method:** `{detail.request_method}`")
        markdown_content.append(f"- **Full URL:** `{url_to_display}`")
        markdown_content.append("- **Headers:**")
        markdown_content.extend(
            _fenced_block("json", json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
        )
        if detail.request_params:
            markdown_content.append("- **Query Parameters:**")
            markdown_content.extend(
                _fenced_block("json", json.dumps(detail.request_params, indent=2, ensure_ascii=False))
            )
        if detail.request_body is not None:
            markdown_content.append("- **Body:**")
            markdown_content.extend(_fenced_block(*_format_body(detail.request_body)))

        # The elapsed time may be absent when no response was received.
        elapsed = detail.response_elapsed_time
        elapsed_text = f"{elapsed:.4f}s" if isinstance(elapsed, (int, float)) else "N/A"

        markdown_content.append("### Response Details")
        markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
        markdown_content.append(f"- **Elapsed Time:** `{elapsed_text}`")
        markdown_content.append("- **Headers:**")
        markdown_content.extend(
            _fenced_block("json", json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
        )
        if detail.response_body is not None:
            markdown_content.append("- **Body:**")
            markdown_content.extend(_fenced_block(*_format_body(detail.response_body)))
        markdown_content.append("\n---\n")

    with open(md_output_file, 'w', encoding='utf-8') as f_md:
        f_md.write("\n".join(markdown_content))
    logger.info(f"API call details saved to Markdown: {md_output_file}")


# --- PDF report helpers ---

def _resolve_pdf_font_path():
    """Return the font file path, preferring the CWD and then a PyInstaller bundle."""
    font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
    if not Path(font_path).exists() and hasattr(sys, '_MEIPASS'):
        font_path = Path(sys._MEIPASS) / 'assets' / 'fonts' / 'STHeiti-Medium-4.ttc'
    return font_path


def _make_paragraph(text, style, escape=True):
    """Build a ReportLab ``Paragraph`` from arbitrary text.

    Control/format characters are dropped because they are not printable, but
    newlines are kept and turned into ``<br/>`` so multi-line text keeps its
    line structure. With ``escape=False`` the text is treated as trusted
    ReportLab markup.
    """
    content = "" if text is None else str(text)
    if escape:
        content = html.escape(content)
    content = "".join(
        ch for ch in content if ch == '\n' or unicodedata.category(ch)[0] != 'C'
    )
    if not content.strip():
        return Paragraph(' ', style)
    return Paragraph(content.replace('\n', '<br/>'), style)


def _format_iso_timestamp(value) -> str:
    """Format an ISO-8601 timestamp for display; missing values become ``'N/A'``."""
    if not value or value == 'N/A':
        return 'N/A'
    try:
        return datetime.datetime.fromisoformat(str(value)).strftime('%Y-%m-%d %H:%M:%S')
    except ValueError:
        # Show unparseable input as-is rather than aborting the whole report.
        return str(value)


def _skipped_count(stats: Dict[str, Any], total_key: str, passed_key: str, failed_key: str):
    """Return ``total - passed - failed`` or ``'N/A'`` when a counter is missing/non-numeric."""
    try:
        return stats[total_key] - stats[passed_key] - stats[failed_key]
    except (KeyError, TypeError):
        return 'N/A'


def _describe_endpoint(endpoint_name: str) -> str:
    """Return a short functional description derived from keywords in the endpoint name."""
    if 'Create' in endpoint_name:
        return "提供数据创建服务"
    if 'List' in endpoint_name or 'Query' in endpoint_name:
        return "提供数据查询和列表服务"
    if 'Read' in endpoint_name:
        return "提供单条数据读取服务"
    if 'Update' in endpoint_name:
        return "提供数据更新服务"
    if 'Delete' in endpoint_name:
        return "提供数据删除服务"
    return "提供数据管理服务"


def _collect_test_cases(summary_data: Dict[str, Any], strictness_value: int) -> List[Dict[str, Any]]:
    """Flatten endpoint test cases and stages into one list of case records.

    A case is "required" when its severity rank is at least *strictness_value*.
    Each stage is listed as a single case with severity ``HIGH``.
    """
    cases: List[Dict[str, Any]] = []

    # 1. Endpoint test cases
    for endpoint_result in summary_data.get('endpoint_results', []):
        for tc in endpoint_result.get('executed_test_cases', []):
            tc_severity = tc.get('test_case_severity', 'MEDIUM')
            tc_severity_value = _SEVERITY_LEVELS.get(tc_severity, 3)
            cases.append({
                'type': 'Endpoint',
                'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
                'case_name': tc.get('test_case_name', 'N/A'),
                'status': tc.get('status', 'N/A'),
                'severity': tc_severity,
                'severity_value': tc_severity_value,
                'is_required': tc_severity_value >= strictness_value,
            })

    # 2. Stage test cases
    stage_severity = 'HIGH'
    stage_severity_value = _SEVERITY_LEVELS.get(stage_severity, 4)
    for stage_result in summary_data.get('stage_results', []):
        stage_name = stage_result.get('stage_name', 'N/A')
        cases.append({
            'type': 'Stage',
            'endpoint': f"Stage: {stage_name}",
            'case_name': stage_result.get('description', stage_name),
            # NOTE(review): the summary section reads 'overall_status' while the
            # detail section reads 'status'; accept either key until confirmed.
            'status': stage_result.get('overall_status', stage_result.get('status', 'N/A')),
            'severity': stage_severity,
            'severity_value': stage_severity_value,
            'is_required': stage_severity_value >= strictness_value,
        })
    return cases


def _status_label(status, status_map):
    """Return the display label for *status* (unknown statuses are shown unchanged)."""
    return status_map.get(status, (status, None))[0]


def _status_markup(status, status_map) -> str:
    """Return ReportLab markup rendering *status* as a coloured, escaped label."""
    label, color = status_map.get(status, (status, colors.black))
    # Color.hexval() yields '0xrrggbb'; the <font> tag expects '#rrggbb'.
    return f'<font color="#{color.hexval()[2:]}">{html.escape(str(label))}</font>'


def _build_case_table(cases, header_color, status_map, to_para, small_style):
    """Build the numbered table listing *cases* with the given header colour."""
    table_data = [[
        to_para("序号", escape=False),
        to_para("类型", escape=False),
        to_para("测试用例名称", escape=False),
        to_para("所属端点/阶段", escape=False),
        to_para("优先级", escape=False),
        to_para("执行结果", escape=False),
    ]]
    for index, case in enumerate(cases, 1):
        table_data.append([
            to_para(str(index), small_style),
            to_para(case['type'], small_style),
            to_para(case['case_name'], small_style),
            to_para(case['endpoint'], small_style),
            to_para(case['severity'], small_style),
            to_para(_status_label(case['status'], status_map), small_style),
        ])
    table = Table(table_data, colWidths=[25, 35, 110, 90, 45, 45])
    table.setStyle(TableStyle([
        ('GRID', (0, 0), (-1, -1), 1, colors.grey),
        ('BACKGROUND', (0, 0), (-1, 0), header_color),
        ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
        ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ('FONTSIZE', (0, 0), (-1, -1), 8),
    ]))
    return table


def _pdf_header_section(to_para, styles) -> list:
    """Return the report title and the basic-information table."""
    # Report code derived from the current Unix timestamp.
    report_code = f"DMS-TEST-{int(time.time())}"
    basic_info_data = [
        [to_para("报告编码", escape=False), to_para(report_code)],
        [to_para("报告名称", escape=False), to_para("DMS领域数据服务测试分析报告")],
        [to_para("申请日期", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日'))],
        [to_para("申请人", escape=False), to_para("系统管理员")],
        [to_para("服务供应商名称", escape=False), to_para("数据管理系统(DMS)")],
    ]
    basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
    basic_info_table.setStyle(TableStyle([
        ('GRID', (0, 0), (-1, -1), 1, colors.grey),
        ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ('BACKGROUND', (0, 0), (0, -1), colors.lightgrey),
    ]))
    return [
        to_para("数据管理服务测试分析报告", styles['title'], escape=False),
        Spacer(1, 15),
        basic_info_table,
        Spacer(1, 20),
    ]


def _pdf_summary_section(summary_data, to_para, styles) -> list:
    """Return the abstract paragraph with endpoint / test-case / stage totals."""
    overall = summary_data.get('overall_summary', {})
    start_time_formatted = _format_iso_timestamp(summary_data.get('start_time', 'N/A'))
    end_time_formatted = _format_iso_timestamp(summary_data.get('end_time', 'N/A'))
    duration = summary_data.get('duration_seconds') or 0.0

    endpoints_skipped = _skipped_count(overall, 'endpoints_tested', 'endpoints_passed', 'endpoints_failed')
    cases_skipped = _skipped_count(overall, 'total_test_cases_executed', 'test_cases_passed', 'test_cases_failed')
    stages_skipped = _skipped_count(overall, 'total_stages_executed', 'stages_passed', 'stages_failed')

    summary_text = f"""本次测试针对DMS(数据管理系统)领域数据服务进行全面的合规性验证。
测试时间:{start_time_formatted} 至 {end_time_formatted},总耗时 {float(duration):.2f} 秒。
共测试 {overall.get('endpoints_tested', 'N/A')} 个API端点,其中 {overall.get('endpoints_passed', 'N/A')} 个通过,{overall.get('endpoints_failed', 'N/A')} 个失败,{endpoints_skipped}个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}。
执行 {overall.get('total_test_cases_executed', 'N/A')} 个测试用例,其中 {overall.get('test_cases_passed', 'N/A')} 个通过,{overall.get('test_cases_failed', 'N/A')} 个失败,{cases_skipped}个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}。
执行 {overall.get('total_stages_executed', 'N/A')} 个流程测试,其中 {overall.get('stages_passed', 'N/A')} 个通过,{overall.get('stages_failed', 'N/A')} 个失败,{stages_skipped}个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}。"""

    return [
        to_para("摘要", styles['heading'], escape=False),
        to_para(summary_text, styles['normal']),
        Spacer(1, 20),
    ]


def _pdf_api_list_section(endpoint_results, to_para, styles) -> list:
    """Return the "tested APIs" table (limited to the first endpoints)."""
    small_style = styles['small']
    api_list_data = [[
        to_para("序号", escape=False),
        to_para("服务名称", escape=False),
        to_para("服务功能描述", escape=False),
        to_para("服务参数描述", escape=False),
        to_para("服务返回值描述", escape=False),
    ]]
    for index, endpoint in enumerate(endpoint_results[:_MAX_LISTED_ENDPOINTS], 1):
        endpoint_name = str(endpoint.get('endpoint_name', 'N/A'))
        api_list_data.append([
            to_para(str(index), small_style),
            to_para(endpoint_name, small_style),
            to_para(_describe_endpoint(endpoint_name), small_style),
            to_para("标准DMS参数格式", small_style),
            to_para("标准DMS响应格式", small_style),
        ])
    api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
    api_list_table.setStyle(TableStyle([
        ('GRID', (0, 0), (-1, -1), 1, colors.grey),
        ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
        ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
        ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ('FONTSIZE', (0, 0), (-1, -1), 8),
    ]))
    return [
        to_para("测试内容包括", styles['heading'], escape=False),
        api_list_table,
        Spacer(1, 20),
    ]


def _pdf_case_list_section(summary_data, strictness_level, status_map, to_para, styles) -> list:
    """Return the test-case list, split into required and optional cases by strictness."""
    small_style = styles['small']
    heading_style = styles['heading']
    # Unknown levels fall back to CRITICAL; the lookup is case-insensitive.
    strictness_value = _SEVERITY_LEVELS.get(str(strictness_level).upper(), 5)

    all_test_cases = _collect_test_cases(summary_data, strictness_value)
    required_cases = [case for case in all_test_cases if case['is_required']]
    optional_cases = [case for case in all_test_cases if not case['is_required']]

    section = [to_para("测试用例列表", heading_style, escape=False)]
    if not all_test_cases:
        section.append(to_para("无测试用例执行记录。", styles['normal']))
        section.append(Spacer(1, 20))
        return section

    strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
    section.append(to_para(strictness_text, small_style))
    section.append(Spacer(1, 10))

    # 1. Required cases (light blue header to make them stand out)
    if required_cases:
        section.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
        section.append(_build_case_table(required_cases, colors.lightblue, status_map, to_para, small_style))
        section.append(Spacer(1, 15))

    # 2. Optional cases (light grey header)
    if optional_cases:
        section.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
        section.append(_build_case_table(optional_cases, colors.lightgrey, status_map, to_para, small_style))
        section.append(Spacer(1, 10))

    # Case statistics
    endpoint_cases = sum(1 for case in all_test_cases if case['type'] == 'Endpoint')
    stage_cases = sum(1 for case in all_test_cases if case['type'] == 'Stage')
    stats_text = f"""测试用例统计:
总计 {len(all_test_cases)} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。
必须用例 {len(required_cases)} 个,非必须用例 {len(optional_cases)} 个。
严格等级:{strictness_level}({strictness_value}级及以上为必须)。"""
    section.append(to_para(stats_text, small_style))
    section.append(Spacer(1, 20))
    return section


def _pdf_endpoint_details_section(endpoint_results, status_map, to_para, styles) -> list:
    """Return the per-endpoint, per-test-case detail listing."""
    section = [to_para("详细测试结果", styles['heading'], escape=False)]
    if not endpoint_results:
        section.append(to_para("无详细测试结果。"))
        return section

    endpoint_style = ParagraphStyle('endpoint_heading', parent=styles['heading'], fontSize=12, spaceBefore=12, spaceAfter=6)
    message_style = ParagraphStyle('message_style', parent=styles['normal'], leftIndent=15)

    for endpoint_result in endpoint_results:
        endpoint_name = endpoint_result.get('endpoint_name', 'N/A')
        section.append(to_para(f"端点: {endpoint_name}", style=endpoint_style))

        test_cases = endpoint_result.get('executed_test_cases', [])
        if not test_cases:
            section.append(to_para("该端点没有执行测试用例。", style=styles['normal']))
            continue

        for tc_result in test_cases:
            section.append(to_para(f"用例: {tc_result.get('test_case_name', 'N/A')}"))
            status_text = f"状态: {_status_markup(tc_result.get('status', 'N/A'), status_map)}"
            section.append(to_para(status_text, escape=False))
            section.append(to_para("消息:"))
            section.append(to_para(tc_result.get('message', ''), style=message_style, escape=True))
            section.append(Spacer(1, 6))
            section.append(HRFlowable(width="100%", thickness=0.5, color=colors.grey))
            section.append(Spacer(1, 6))
    return section


def _pdf_stage_details_section(stage_results, status_map, to_para, styles) -> list:
    """Return the per-stage detail listing including every executed step."""
    section = [
        HRFlowable(width="100%", thickness=1, color=colors.black),
        Spacer(1, 12),
        to_para("流程测试结果 (Stages)", styles['heading'], escape=False),
    ]
    if not stage_results:
        section.append(to_para("无流程测试结果。"))
        return section

    stage_style = ParagraphStyle('stage_heading', parent=styles['heading'], fontSize=14, spaceBefore=12, spaceAfter=6)
    step_style = ParagraphStyle('step_style', parent=styles['normal'], leftIndent=15)

    for stage_result in stage_results:
        stage_name = stage_result.get('stage_name', 'N/A')
        section.append(to_para(f"流程: {stage_name}", style=stage_style))

        # NOTE(review): accept 'overall_status' as a fallback, see _collect_test_cases.
        stage_status = stage_result.get('status', stage_result.get('overall_status', 'N/A'))
        section.append(to_para(f"整体状态: {_status_markup(stage_status, status_map)}", escape=False))

        stage_message = stage_result.get('message', '')
        if stage_message:
            section.append(to_para(f"消息: {stage_message}"))

        section.append(Spacer(1, 10))
        section.append(to_para("执行步骤详情:", escape=False))

        # The key is 'executed_steps', not 'executed_test_steps'
        executed_steps = stage_result.get('executed_steps', [])
        if not executed_steps:
            section.append(to_para("该流程没有执行任何步骤。", style=styles['normal']))
        else:
            for step_result in executed_steps:
                section.append(Spacer(1, 4))
                section.append(to_para(f"步骤: {step_result.get('step_name', 'N/A')}", style=step_style))

                step_status_text = f"  状态: {_status_markup(step_result.get('status', 'N/A'), status_map)}"
                section.append(to_para(step_status_text, style=step_style, escape=False))

                step_message = step_result.get('message', '')
                if step_message:
                    section.append(to_para(f"  消息: {step_message}", style=step_style))

                step_duration = step_result.get('duration_seconds') or 0
                section.append(to_para(f"  耗时: {float(step_duration):.4f}s", style=step_style))

        section.append(Spacer(1, 6))
        section.append(HRFlowable(width="100%", thickness=0.5, color=colors.grey))
        section.append(Spacer(1, 6))
    return section


def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
    """Saves the test summary as a formatted PDF file.

    Args:
        summary_data: Summary dictionary. The keys read here are
            ``overall_summary``, ``start_time``, ``end_time``,
            ``duration_seconds``, ``endpoint_results`` and ``stage_results``.
        output_path: Destination PDF file; parent directories are created.
        strictness_level: Severity name; cases at or above this severity are
            listed as required.

    The function is best-effort: when ReportLab or the font file is missing,
    or building the document fails, the problem is logged and no exception is
    raised to the caller.
    """
    if not reportlab_available:
        logger.warning("ReportLab library not found. PDF report will not be generated. Please run 'pip install reportlab'.")
        return

    output_path = Path(output_path)
    logger.info(f"Attempting to generate PDF report at: {output_path}")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        font_name = 'SimSun'
        font_path = _resolve_pdf_font_path()
        if not Path(font_path).exists():
            logger.error(f"Font file not found at {font_path}")
            return

        pdfmetrics.registerFont(TTFont(font_name, str(font_path), subfontIndex=0))
        pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)

        doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
        base_styles = getSampleStyleSheet()
        styles = {
            'title': ParagraphStyle('ChineseTitle', parent=base_styles['Title'], fontName=font_name, fontSize=22, leading=28),
            'heading': ParagraphStyle('ChineseHeading', parent=base_styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8),
            'normal': ParagraphStyle('ChineseNormal', parent=base_styles['Normal'], fontName=font_name, fontSize=10, leading=14),
            'small': ParagraphStyle('ChineseSmall', parent=base_styles['Normal'], fontName=font_name, fontSize=9, leading=12),
        }

        def to_para(text, style=None, escape=True):
            """Create a paragraph in the normal style unless another style is given."""
            return _make_paragraph(text, style if style is not None else styles['normal'], escape)

        status_map = {
            "PASSED": ("通过", colors.green),
            "FAILED": ("失败", colors.red),
            "ERROR": ("错误", colors.orange),
            "SKIPPED": ("跳过", colors.grey),
        }
        endpoint_results = summary_data.get('endpoint_results', [])
        stage_results = summary_data.get('stage_results', [])

        elements = []
        elements.extend(_pdf_header_section(to_para, styles))
        elements.extend(_pdf_summary_section(summary_data, to_para, styles))
        elements.extend(_pdf_api_list_section(endpoint_results, to_para, styles))
        elements.extend(_pdf_case_list_section(summary_data, strictness_level, status_map, to_para, styles))
        elements.extend(_pdf_endpoint_details_section(endpoint_results, status_map, to_para, styles))
        elements.extend(_pdf_stage_details_section(stage_results, status_map, to_para, styles))

        doc.build(elements)
        logger.info(f"PDF report successfully generated: {output_path}")
    except Exception as e:
        # Report generation must never abort the test run itself.
        logger.error(f"Error building PDF document: {e}", exc_info=True)


# --- Core Test Execution Logic ---

def run_tests_logic(config: dict):
    """
    Main logic for running tests, adapted from the main() function in run_api_tests.py.

    Args:
        config: Option dictionary keyed by the command-line option names
            (``base-url``, ``yapi``/``swagger``/``dms``, ``output``, ...).

    Returns:
        A dict with ``status`` ``"completed"`` (plus ``message``,
        ``report_directory``, ``summary`` and optionally ``pagination``) or
        ``status`` ``"error"`` (plus ``message`` and ``traceback``). Exceptions
        are caught and reported through the returned dict.
    """
    try:
        if config.get('verbose'):
            logging.getLogger('ddms_compliance_suite').setLevel(logging.DEBUG)
            logger.setLevel(logging.DEBUG)
            logger.debug("Verbose logging enabled.")

        # Only sources with a usable value count, so an explicit null does not clash.
        sources = [key for key in _SPEC_SOURCE_KEYS if config.get(key)]
        if not sources:
            raise ValueError("An API definition source is required: --yapi, --swagger, or --dms")
        if len(sources) > 1:
            raise ValueError("API definition sources are mutually exclusive.")
        source = sources[0]

        # Setup output directory with timestamp
        base_output_dir = Path(config.get('output', './test_reports'))
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        output_directory = base_output_dir / timestamp
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Test reports will be saved to: {output_directory.resolve()}")

        # Initialize the orchestrator
        orchestrator = APITestOrchestrator(
            base_url=config['base-url'],
            custom_test_cases_dir=config.get('custom-test-cases-dir'),
            llm_api_key=config.get('llm-api-key'),
            llm_base_url=config.get('llm-base-url'),
            llm_model_name=config.get('llm-model-name'),
            use_llm_for_request_body=config.get('use-llm-for-request-body', False),
            use_llm_for_path_params=config.get('use-llm-for-path-params', False),
            use_llm_for_query_params=config.get('use-llm-for-query-params', False),
            use_llm_for_headers=config.get('use-llm-for-headers', False),
            output_dir=str(output_directory),
            stages_dir=config.get('stages-dir'),
            strictness_level=config.get('strictness-level', 'CRITICAL'),
            ignore_ssl=config.get('ignore-ssl', False)
        )

        test_summary: Optional[TestSummary] = None
        parsed_spec: Optional[ParsedAPISpec] = None
        pagination_info: Dict[str, Any] = {}

        if source == 'yapi':
            logger.info(f"Running tests from YAPI file: {config['yapi']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_yapi(
                yapi_file_path=config['yapi'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        elif source == 'swagger':
            logger.info(f"Running tests from Swagger file: {config['swagger']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_swagger(
                swagger_file_path=config['swagger'],
                tags=config.get('tags'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        else:
            logger.info(f"Running tests from DMS service discovery: {config['dms']}")
            test_summary, parsed_spec, pagination_info = orchestrator.run_tests_from_dms(
                domain_mapping_path=config['dms'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom-test-cases-dir'),
                page_size=config.get('page-size', 1000)
            )

        if not parsed_spec:
            raise RuntimeError("Failed to parse the API specification.")
        # Check before any method call so a missing summary gives a clear error.
        if not test_summary:
            raise RuntimeError("Test execution failed to produce a summary.")

        if config.get('stages-dir'):
            logger.info(f"Executing API test stages from directory: {config['stages-dir']}")
            orchestrator.run_stages_from_spec(
                parsed_spec=parsed_spec,
                summary=test_summary
            )
            # NOTE(review): assumed to re-finalize only after stages ran — confirm
            # against run_api_tests.py, the original indentation was lost.
            test_summary.finalize_summary()
            test_summary.print_summary_to_console()

        # Save reports
        report_format = config.get('format', 'json')
        main_report_file_path = output_directory / f"summary.{report_format}"
        save_results(test_summary, str(main_report_file_path), report_format)

        summary_dict = test_summary.to_dict()
        if config.get('generate-pdf', True):
            pdf_report_path = output_directory / "report_cn.pdf"
            save_pdf_report(summary_dict, pdf_report_path, config.get('strictness-level', 'CRITICAL'))

        save_api_call_details_to_file(
            orchestrator.get_api_call_details(),
            str(output_directory),
            filename="api_call_details.md"
        )

        failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
        error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
        all_green = failed_count == 0 and error_count == 0

        result = {
            "status": "completed",
            "message": "Tests finished." if all_green else "Tests finished with failures or errors.",
            "report_directory": str(output_directory.resolve()),
            "summary": summary_dict
        }
        # Include pagination information in the result when available.
        if pagination_info:
            result["pagination"] = pagination_info
        return result

    except Exception as e:
        logger.error(f"An unexpected error occurred during test execution: {e}", exc_info=True)
        return {
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc()
        }


def _default_run_config() -> Dict[str, Any]:
    """Return the default configuration used by the ``/run`` endpoint."""
    return {
        'base-url': 'http://127.0.0.1:5001/',
        'dms': './assets/doc/dms/domain.json',
        'stages-dir': './custom_stages',
        'custom-test-cases-dir': './custom_testcases',
        'verbose': True,
        'output': './test_reports/',
        'format': 'json',
        'generate-pdf': True,
        'strictness-level': 'CRITICAL',
        'ignore-ssl': True,  # Skip SSL certificate verification by default
        # Default LLM options
        # The API key is a secret: it is read from the environment and must
        # never be committed to source control.
        'llm-api-key': os.environ.get("OPENAI_API_KEY"),
        # 'llm-base-url': "https://dashscope.aliyuncs.com/compatible-mode/v1",
        'llm-base-url': "https://aiproxy.petrotech.cnpc/v1",
        # 'llm-model-name': "qwen-plus",
        'llm-model-name': "deepseek-v3",
        'use-llm-for-request-body': False,
        'use-llm-for-path-params': False,
        'use-llm-for-query-params': False,
        'use-llm-for-headers': False
    }


def _redact_secrets(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *config* that is safe to log (secret values masked)."""
    return {
        key: ('***' if key in _SECRET_CONFIG_KEYS and value else value)
        for key, value in config.items()
    }


# --- Flask API Endpoint ---

@app.route('/', methods=['GET'])
def health_check():
    """Health check endpoint for Docker health checks."""
    return {"status": "healthy", "service": "DMS Compliance API Server"}, 200


@app.route('/run', methods=['POST'])
def run_api_tests_endpoint():
    """
    Runs API tests by directly invoking the test orchestrator logic.
    The request body should be a JSON object with keys corresponding
    to the script's command-line arguments.

    Responses: 200 with the run result, 400 when the body is JSON but not an
    object, 500 when the run reports an error.
    """
    defaults = _default_run_config()

    try:
        # Use silent=True to prevent an exception if Content-Type is not application/json
        # This allows the endpoint to gracefully fall back to defaults.
        request_config = request.get_json(silent=True)

        if not request_config:
            # If no JSON body is provided or it's empty, run with defaults
            config = defaults
        elif not isinstance(request_config, dict):
            return jsonify({
                "status": "error",
                "message": "Request body must be a JSON object."
            }), 400
        else:
            # Merge request config with defaults
            config = {**defaults, **request_config}
            # The default 'dms' source must not clash with a source chosen by the
            # caller, otherwise 'yapi'/'swagger' requests could never succeed.
            if 'dms' not in request_config and any(request_config.get(key) for key in _SPEC_SOURCE_KEYS):
                config.pop('dms', None)

        logger.info(f"Starting test run with configuration: {json.dumps(_redact_secrets(config), indent=2)}")

        result = run_tests_logic(config)

        if result['status'] == 'error':
            return jsonify(result), 500

        return jsonify(result), 200

    except Exception as e:
        logger.error(f"An error occurred in the API endpoint: {e}", exc_info=True)
        return jsonify({
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc()
        }), 500


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it is only
    # enabled on explicit request and never by default on a 0.0.0.0 listener.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5050)

# Example cURL to trigger the endpoint with custom params:
# curl -X POST http://127.0.0.1:5050/run \
# -H "Content-Type: application/json" \
# -d '{
#   "base-url": "http://127.0.0.1:5001/",
#   "dms": "./assets/doc/dms/domain.json",
#   "custom-test-cases-dir": "./custom_testcases",
#   "stages-dir": "./custom_stages",
#   "output": "./test_reports/"
# }'