"""Flask front-end for the DDMS compliance test orchestrator.

Exposes a single ``POST /run`` endpoint that runs the API test suite and
writes the reports (JSON/HTML summary, Markdown call log, optional PDF)
into a timestamped directory.
"""
import os
import sys
import json
import logging
import datetime
import traceback
import urllib.parse
from pathlib import Path
from typing import List, Optional
import unicodedata
import html

# PDF generation libraries - with fallback
try:
    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont
    reportlab_available = True
except ImportError:
    reportlab_available = False

from flask import Flask, request, jsonify

# --- Project-specific imports ---
from ddms_compliance_suite.api_caller.caller import APICallDetail
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary
from ddms_compliance_suite.input_parser.parser import ParsedAPISpec
from ddms_compliance_suite.utils.response_utils import extract_data_for_validation
from ddms_compliance_suite.utils.data_generator import DataGenerator

app = Flask(__name__)
# app.config['JSON_AS_ASCII'] = False  # Return readable Chinese directly instead of Unicode escape sequences

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration keys that name an API definition source; exactly one must be set.
_API_SOURCE_KEYS = ('yapi', 'swagger', 'dms')

# Maps a test-case status enum member name to the CSS class of its HTML row.
_HTML_STATUS_CLASSES = {'PASSED': 'pass', 'FAILED': 'fail', 'ERROR': 'error'}

# Stylesheet embedded in the HTML report (kept out of the f-string so the
# CSS braces do not need doubling).
_HTML_REPORT_STYLE = """
body { font-family: Arial, sans-serif; margin: 20px; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.pass { color: green; }
.fail { color: red; }
.error { color: orange; }
.skip { color: gray; }
"""


# --- Helper functions (migrated from run_api_tests.py) ---

def _render_html_report(summary: TestSummary) -> str:
    """Render *summary* as a standalone HTML document and return it."""

    def esc(value) -> str:
        # Every dynamic cell is escaped: names and messages come from the
        # API under test and must not be interpreted as markup.
        return html.escape(str(value))

    end_time = summary.end_time.isoformat() if summary.end_time else 'N/A'
    parts = [f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>API Test Report</title>
<style>{_HTML_REPORT_STYLE}</style>
</head>
<body>
<h1>API Test Report</h1>
<h2>Test Summary</h2>
<p>Total Test Cases Executed: {summary.total_test_cases_executed}</p>
<p>Passed: {summary.test_cases_passed}</p>
<p>Failed: {summary.test_cases_failed}</p>
<p>Error: {summary.test_cases_error}</p>
<p>Skipped: {summary.test_cases_skipped_in_endpoint}</p>
<p>Success Rate: {summary.test_case_success_rate:.2f}%</p>
<p>Duration: {summary.duration:.2f}s</p>
<p>Start Time: {summary.start_time.isoformat()}</p>
<p>End Time: {end_time}</p>
<h2>Detailed Results</h2>
<table>
<tr><th>Endpoint</th><th>Test Case ID</th><th>Test Case Name</th><th>Status</th><th>Message</th><th>Duration (s)</th></tr>
"""]

    for endpoint_result in summary.detailed_results:
        for tc_result in endpoint_result.executed_test_cases:
            # Compare by enum member name so this module does not need to
            # import the ExecutedTestCaseResult class itself.
            status_name = getattr(tc_result.status, 'name', str(tc_result.status))
            status_class = _HTML_STATUS_CLASSES.get(status_name, 'skip')
            parts.append(
                f'<tr class="{status_class}">'
                f"<td>{esc(endpoint_result.endpoint_name)} ({esc(endpoint_result.endpoint_id)})</td>"
                f"<td>{esc(tc_result.test_case_id)}</td>"
                f"<td>{esc(tc_result.test_case_name)}</td>"
                f"<td>{esc(tc_result.status.value)}</td>"
                f"<td>{esc(tc_result.message)}</td>"
                f"<td>{tc_result.duration:.4f}</td>"
                "</tr>\n"
            )

    parts.append("</table>\n</body>\n</html>\n")
    return "".join(parts)


def save_results(summary: TestSummary, output_file_path: str, format_type: str):
    """Save the main test summary to *output_file_path*.

    Args:
        summary: The finished test summary.
        output_file_path: Destination file; parent directories are created.
        format_type: ``'json'`` or ``'html'``. Any other value is logged as
            a warning and nothing is written.
    """
    output_path = Path(output_file_path)
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Error creating directory for output file {output_path.parent}: {e}")
        return

    if format_type == 'json':
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(summary.to_json(pretty=True))
        logger.info(f"Test results saved to JSON: {output_path}")
    elif format_type == 'html':
        html_content = _render_html_report(summary)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        logger.info(f"Test results saved to HTML: {output_path}")
    else:
        logger.warning(f"Unsupported report format '{format_type}'; no summary file was written.")


def _pretty_json(value) -> str:
    """Serialise *value* as indented JSON, never raising.

    Values JSON cannot represent natively are stringified; if serialisation
    still fails (e.g. non-string dict keys) the plain ``str()`` is returned.
    """
    try:
        return json.dumps(value, indent=2, ensure_ascii=False, default=str)
    except (TypeError, ValueError):
        return str(value)


def _fenced_block(language: str, text: str) -> List[str]:
    """Return the Markdown lines of a fenced code block."""
    return [f"```{language}", text, "```"]


def _body_block(body) -> List[str]:
    """Return a fenced block for a request/response body (JSON or text)."""
    if isinstance(body, (dict, list)):
        return _fenced_block("json", _pretty_json(body))
    return _fenced_block("text", str(body))


def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
    """Save API call details to a Markdown file.

    Args:
        api_call_details: Recorded calls; nothing is written when empty.
        output_dir_path: Directory for the file (created if missing).
        filename: Name of the Markdown file inside *output_dir_path*.
    """
    if not api_call_details:
        logger.info("No API call details to save.")
        return

    output_dir = Path(output_dir_path)
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Failed to create output directory for API call details {output_dir}: {e}")
        return

    md_output_file = output_dir / filename
    markdown_content = []

    for detail in api_call_details:
        url_to_display = detail.request_url
        if detail.request_params:
            try:
                query_string = urllib.parse.urlencode(detail.request_params)
                url_to_display = f"{detail.request_url}?{query_string}"
            except Exception as e:
                logger.warning(f"Error formatting URL with params for display: {e}")

        markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
        markdown_content.append("**cURL Command:**")
        # A missing cURL command must not break the final "\n".join().
        curl_command = "" if detail.curl_command is None else str(detail.curl_command)
        markdown_content.extend(_fenced_block("sh", curl_command))

        markdown_content.append("### Request Details")
        markdown_content.append(f"- **Method:** `{detail.request_method}`")
        markdown_content.append(f"- **Full URL:** `{url_to_display}`")
        markdown_content.append("- **Headers:**")
        markdown_content.extend(_fenced_block("json", _pretty_json(detail.request_headers)))
        if detail.request_params:
            markdown_content.append("- **Query Parameters:**")
            markdown_content.extend(_fenced_block("json", _pretty_json(detail.request_params)))
        if detail.request_body is not None:
            markdown_content.append("- **Body:**")
            markdown_content.extend(_body_block(detail.request_body))

        markdown_content.append("### Response Details")
        markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
        elapsed = detail.response_elapsed_time
        # Calls that never produced a response may have no elapsed time.
        elapsed_text = f"{elapsed:.4f}s" if isinstance(elapsed, (int, float)) else "N/A"
        markdown_content.append(f"- **Elapsed Time:** `{elapsed_text}`")
        markdown_content.append("- **Headers:**")
        markdown_content.extend(_fenced_block("json", _pretty_json(detail.response_headers)))
        if detail.response_body is not None:
            markdown_content.append("- **Body:**")
            markdown_content.extend(_body_block(detail.response_body))
        markdown_content.append("\n---\n")

    with open(md_output_file, 'w', encoding='utf-8') as f_md:
        f_md.write("\n".join(markdown_content))
    logger.info(f"API call details saved to Markdown: {md_output_file}")


def _format_timestamp(value) -> str:
    """Format an ISO-8601 timestamp for display; ``'N/A'`` when absent.

    A value that is present but not parseable is shown verbatim rather than
    aborting the whole report.
    """
    if not value or value == 'N/A':
        return 'N/A'
    try:
        return datetime.datetime.fromisoformat(str(value)).strftime('%Y-%m-%d %H:%M:%S')
    except ValueError:
        return str(value)


def save_pdf_report(summary_data, output_path: Path):
    """Save the test summary as a formatted (Chinese) PDF file.

    Best effort: a missing ReportLab install, a missing font file or any
    error while building the document is logged and the function returns
    without raising.

    Args:
        summary_data: Dictionary form of the test summary. Reads the keys
            ``overall_summary``, ``start_time``, ``end_time``,
            ``duration_seconds``, ``endpoint_results`` and ``stage_results``.
        output_path: Destination PDF path; parent directories are created.
    """
    if not reportlab_available:
        logger.warning("ReportLab library not found. PDF report will not be generated. Please run 'pip install reportlab'.")
        return

    logger.info(f"Attempting to generate PDF report at: {output_path}")

    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)

        font_name = 'SimSun'
        font_path = Path('assets/fonts/STHeiti-Medium-4.ttc')
        # When frozen by PyInstaller, bundled assets live under sys._MEIPASS.
        if not font_path.exists() and hasattr(sys, '_MEIPASS'):
            font_path = Path(sys._MEIPASS) / 'assets' / 'fonts' / 'STHeiti-Medium-4.ttc'
        if not font_path.exists():
            logger.error(f"Font file not found at {font_path}")
            return

        pdfmetrics.registerFont(TTFont(font_name, str(font_path), subfontIndex=0))
        pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)

        doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
        elements = []
        styles = getSampleStyleSheet()
        title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
        heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
        normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
        # Loop-invariant styles are built once instead of once per row.
        endpoint_style = ParagraphStyle('endpoint_heading', parent=heading_style, fontSize=12, spaceBefore=12, spaceAfter=6)
        message_style = ParagraphStyle('message_style', parent=normal_style, leftIndent=15)
        stage_style = ParagraphStyle('stage_heading', parent=heading_style, fontSize=14, spaceBefore=12, spaceAfter=6)
        step_style = ParagraphStyle('step_style', parent=normal_style, leftIndent=15)

        # Shared by the endpoint and the stage sections, so it is defined
        # before either of them.
        status_map = {
            "PASSED": ("通过", colors.green),
            "FAILED": ("失败", colors.red),
            "ERROR": ("错误", colors.orange),
            "SKIPPED": ("跳过", colors.grey),
        }

        def status_label(status) -> str:
            """Return the escaped display label for a status code."""
            label, _color = status_map.get(status, (status, colors.black))
            return html.escape(str(label))

        def to_para(text, style=normal_style, escape=True):
            """Build a Paragraph from *text*, sanitising it for ReportLab.

            With ``escape=True`` the text is HTML-escaped; with
            ``escape=False`` it is treated as paragraph markup.
            """
            content = "" if text is None else str(text)
            if escape:
                content = html.escape(content)
            content = content.replace('\r\n', '\n').replace('\t', ' ')
            # Drop control/format characters, but keep '\n' (itself a control
            # character) so it can become an explicit line break below.
            content = "".join(
                ch for ch in content
                if ch == '\n' or unicodedata.category(ch)[0] != 'C'
            )
            if not content.strip():
                return Paragraph(' ', style)
            return Paragraph(content.replace('\n', '<br/>'), style)

        def separator():
            """Return the thin rule that follows each test case / stage."""
            return [
                Spacer(1, 6),
                HRFlowable(width="100%", thickness=0.5, color=colors.grey),
                Spacer(1, 6),
            ]

        # PDF Content Generation
        elements.append(to_para("API 测试报告", title_style, escape=False))
        elements.append(Spacer(1, 20))

        elements.append(to_para("测试摘要", heading_style, escape=False))
        overall = summary_data.get('overall_summary', {})
        start_time_formatted = _format_timestamp(summary_data.get('start_time', 'N/A'))
        end_time_formatted = _format_timestamp(summary_data.get('end_time', 'N/A'))
        duration = summary_data.get('duration_seconds', 0.0) or 0.0

        summary_table_data = [
            [to_para("开始时间", escape=False), to_para(start_time_formatted)],
            [to_para("结束时间", escape=False), to_para(end_time_formatted)],
            [to_para("总耗时", escape=False), to_para(f"{float(duration):.2f} 秒")],
            [to_para("测试的端点数", escape=False), to_para(overall.get('endpoints_tested', 'N/A'))],
            [to_para("执行的用例总数", escape=False), to_para(overall.get('total_test_cases_executed', 'N/A'))],
            [to_para("执行的流程数", escape=False), to_para(overall.get('stages_tested', 'N/A'))],
        ]
        summary_table = Table(summary_table_data, colWidths=[120, '*'])
        summary_table.setStyle(TableStyle([
            ('GRID', (0, 0), (-1, -1), 1, colors.grey),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ]))
        elements.append(summary_table)
        elements.append(Spacer(1, 20))

        elements.append(to_para("结果统计", heading_style, escape=False))
        results_table_data = [
            [to_para("指标", escape=False), to_para("通过 ✅", escape=False), to_para("失败 ❌", escape=False),
             to_para("错误 ⚠️", escape=False), to_para("成功率", escape=False)],
            [to_para("端点"), to_para(overall.get('endpoints_passed', 'N/A')),
             to_para(overall.get('endpoints_failed', 'N/A')), to_para(overall.get('endpoints_error', 'N/A')),
             to_para(f"{overall.get('endpoint_success_rate', 'N/A')}", escape=False)],
            [to_para("测试用例"), to_para(overall.get('test_cases_passed', 'N/A')),
             to_para(overall.get('test_cases_failed', 'N/A')), to_para(overall.get('test_cases_error', 'N/A')),
             to_para(f"{overall.get('test_case_success_rate', 'N/A')}", escape=False)],
            [to_para("流程 (Stage)"), to_para(overall.get('stages_passed', 'N/A')),
             to_para(overall.get('stages_failed', 'N/A')), to_para(overall.get('stages_error', 'N/A')),
             to_para(f"{overall.get('stage_success_rate', 'N/A')}", escape=False)],
        ]
        results_table = Table(results_table_data, colWidths=['*', 60, 60, 60, 80])
        results_table.setStyle(TableStyle([
            ('GRID', (0, 0), (-1, -1), 1, colors.grey),
            ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ]))
        elements.append(results_table)
        elements.append(Spacer(1, 20))

        elements.append(to_para("详细测试结果", heading_style, escape=False))
        detailed_results = summary_data.get('endpoint_results', [])
        if not detailed_results:
            elements.append(to_para("无详细测试结果。"))
        else:
            for endpoint_result in detailed_results:
                endpoint_name = endpoint_result.get('endpoint_name', 'N/A')
                elements.append(to_para(f"端点: {endpoint_name}", style=endpoint_style))

                test_cases = endpoint_result.get('executed_test_cases', [])
                if not test_cases:
                    elements.append(to_para("该端点没有执行测试用例。", style=normal_style))
                    continue

                for tc_result in test_cases:
                    elements.append(to_para(f"用例: {tc_result.get('test_case_name', 'N/A')}"))
                    status_text = f"状态: {status_label(tc_result.get('status', 'N/A'))}"
                    elements.append(to_para(status_text, escape=False))

                    elements.append(to_para("消息:"))
                    message_text = tc_result.get('message', '')
                    elements.append(to_para(message_text, style=message_style, escape=True))
                    elements.extend(separator())

        # Add Stage Results Section
        elements.append(HRFlowable(width="100%", thickness=1, color=colors.black))
        elements.append(Spacer(1, 12))
        elements.append(to_para("流程测试结果 (Stages)", heading_style, escape=False))

        stage_results = summary_data.get('stage_results', [])
        if not stage_results:
            elements.append(to_para("无流程测试结果。"))
        else:
            for stage_result in stage_results:
                stage_name = stage_result.get('stage_name', 'N/A')
                elements.append(to_para(f"流程: {stage_name}", style=stage_style))

                stage_status_text = f"整体状态: {status_label(stage_result.get('status', 'N/A'))}"
                elements.append(to_para(stage_status_text, escape=False))

                stage_message = stage_result.get('message', '')
                if stage_message:
                    elements.append(to_para(f"消息: {html.escape(str(stage_message))}", escape=False))

                elements.append(Spacer(1, 10))
                elements.append(to_para("执行步骤详情:", escape=False))

                # The key is 'executed_steps', not 'executed_test_steps'
                executed_steps = stage_result.get('executed_steps', [])
                if not executed_steps:
                    elements.append(to_para("该流程没有执行任何步骤。", style=normal_style))
                else:
                    for step_result in executed_steps:
                        elements.append(Spacer(1, 4))
                        step_name_text = f"步骤: {html.escape(str(step_result.get('step_name', 'N/A')))}"
                        elements.append(to_para(step_name_text, style=step_style, escape=False))

                        step_status_text = f"  状态: {status_label(step_result.get('status', 'N/A'))}"
                        elements.append(to_para(step_status_text, style=step_style, escape=False))

                        step_message = step_result.get('message', '')
                        if step_message:
                            step_message_text = f"  消息: {html.escape(str(step_message))}"
                            elements.append(to_para(step_message_text, style=step_style, escape=False))

                        step_duration = step_result.get('duration_seconds', 0) or 0
                        step_duration_text = f"  耗时: {float(step_duration):.4f}s"
                        elements.append(to_para(step_duration_text, style=step_style, escape=False))

                elements.extend(separator())

        doc.build(elements)
        logger.info(f"PDF report successfully generated: {output_path}")
    except Exception as e:
        logger.error(f"Error building PDF document: {e}", exc_info=True)


# --- Core Test Execution Logic ---

def run_tests_logic(config: dict):
    """Run the test suite described by *config* and write all reports.

    Adapted from the main() function in run_api_tests.py. Keys mirror the
    script's command-line options (e.g. ``base-url``, ``yapi`` / ``swagger``
    / ``dms``, ``stages-dir``, ``output``, ``format``, ``generate-pdf``).

    Returns:
        On success, a dict with ``status`` = ``"completed"``, ``message``,
        ``report_directory`` and ``summary``. On any failure, a dict with
        ``status`` = ``"error"``, ``message`` and ``traceback``; exceptions
        are not propagated to the caller.
    """
    try:
        if config.get('verbose'):
            logging.getLogger('ddms_compliance_suite').setLevel(logging.DEBUG)
            logger.setLevel(logging.DEBUG)
            logger.debug("Verbose logging enabled.")

        # A source key that is present but empty/None counts as "not given".
        sources = [key for key in _API_SOURCE_KEYS if config.get(key)]
        if not sources:
            raise ValueError("An API definition source is required: --yapi, --swagger, or --dms")
        if len(sources) > 1:
            raise ValueError("API definition sources are mutually exclusive.")
        source = sources[0]

        if not config.get('base-url'):
            raise ValueError("A 'base-url' is required.")

        # Setup output directory with timestamp
        base_output_dir = Path(config.get('output', './test_reports'))
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        output_directory = base_output_dir / timestamp
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Test reports will be saved to: {output_directory.resolve()}")

        # Initialize the orchestrator
        orchestrator = APITestOrchestrator(
            base_url=config['base-url'],
            custom_test_cases_dir=config.get('custom-test-cases-dir'),
            llm_api_key=config.get('llm-api-key'),
            llm_base_url=config.get('llm-base-url'),
            llm_model_name=config.get('llm-model-name'),
            use_llm_for_request_body=config.get('use-llm-for-request-body', False),
            use_llm_for_path_params=config.get('use-llm-for-path-params', False),
            use_llm_for_query_params=config.get('use-llm-for-query-params', False),
            use_llm_for_headers=config.get('use-llm-for-headers', False),
            output_dir=str(output_directory),
            stages_dir=config.get('stages-dir'),
            strictness_level=config.get('strictness-level', 'CRITICAL')
        )

        test_summary: Optional[TestSummary] = None
        parsed_spec: Optional[ParsedAPISpec] = None

        if source == 'yapi':
            logger.info(f"Running tests from YAPI file: {config['yapi']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_yapi(
                yapi_file_path=config['yapi'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        elif source == 'swagger':
            logger.info(f"Running tests from Swagger file: {config['swagger']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_swagger(
                swagger_file_path=config['swagger'],
                tags=config.get('tags'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        else:
            logger.info(f"Running tests from DMS service discovery: {config['dms']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_dms(
                domain_mapping_path=config['dms'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )

        if not parsed_spec:
            raise RuntimeError("Failed to parse the API specification.")
        if not test_summary:
            raise RuntimeError("Test execution failed to produce a summary.")

        if config.get('stages-dir'):
            logger.info(f"Executing API test stages from directory: {config['stages-dir']}")
            orchestrator.run_stages_from_spec(
                parsed_spec=parsed_spec,
                summary=test_summary
            )

        test_summary.finalize_summary()
        test_summary.print_summary_to_console()

        # Save reports
        report_format = config.get('format', 'json')
        main_report_file_path = output_directory / f"summary.{report_format}"
        save_results(test_summary, str(main_report_file_path), report_format)

        if config.get('generate-pdf', True):
            pdf_report_path = output_directory / "report_cn.pdf"
            save_pdf_report(test_summary.to_dict(), pdf_report_path)

        api_calls_filename = "api_call_details.md"
        save_api_call_details_to_file(
            orchestrator.get_api_call_details(),
            str(output_directory),
            filename=api_calls_filename
        )

        failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
        error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
        all_green = failed_count == 0 and error_count == 0

        return {
            "status": "completed",
            "message": "Tests finished." if all_green else "Tests finished with failures or errors.",
            "report_directory": str(output_directory.resolve()),
            "summary": test_summary.to_dict()
        }

    except Exception as e:
        logger.error(f"An unexpected error occurred during test execution: {e}", exc_info=True)
        return {
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc()
        }


# --- Flask API Endpoint ---

@app.route('/run', methods=['POST'])
def run_api_tests_endpoint():
    """Run API tests by directly invoking the test orchestrator logic.

    The request body should be a JSON object with keys corresponding to the
    script's command-line arguments; missing keys fall back to the defaults
    below. A missing or empty body runs with the defaults only.

    Returns 200 with the result on success, 400 when the body is JSON but
    not an object, and 500 when the run fails.
    """
    # Default configuration based on user's request
    defaults = {
        'base-url': 'http://127.0.0.1:5001/',
        'dms': './assets/doc/dms/domain.json',
        'stages-dir': './custom_stages',
        'custom-test-cases-dir': './custom_testcases',
        'verbose': True,
        'output': './test_reports/',
        'format': 'json',
        'generate-pdf': True,
        'strictness-level': 'CRITICAL',
        # Default LLM options
        'llm-api-key': os.environ.get("OPENAI_API_KEY"),
        'llm-base-url': "https://dashscope.aliyuncs.com/compatible-mode/v1",
        'llm-model-name': "qwen-plus",
        'use-llm-for-request-body': False,
        'use-llm-for-path-params': False,
        'use-llm-for-query-params': False,
        'use-llm-for-headers': False
    }

    try:
        # Use silent=True to prevent an exception if Content-Type is not application/json
        # This allows the endpoint to gracefully fall back to defaults.
        request_config = request.get_json(silent=True)

        if not request_config:
            # If no JSON body is provided or it's empty, run with defaults
            config = dict(defaults)
        elif not isinstance(request_config, dict):
            return jsonify({
                "status": "error",
                "message": "Request body must be a JSON object."
            }), 400
        else:
            # Merge request config with defaults
            config = {**defaults, **request_config}
            # The default 'dms' source must not collide with a source the
            # caller chose explicitly, otherwise 'yapi'/'swagger' requests
            # always fail the mutual-exclusivity check.
            requested_sources = [key for key in _API_SOURCE_KEYS if request_config.get(key)]
            if requested_sources and 'dms' not in requested_sources:
                config.pop('dms', None)

        # Never write the LLM API key to the log.
        loggable_config = dict(config)
        if loggable_config.get('llm-api-key'):
            loggable_config['llm-api-key'] = '***'
        logger.info(f"Starting test run with configuration: {json.dumps(loggable_config, indent=2, default=str)}")

        result = run_tests_logic(config)

        if result['status'] == 'error':
            return jsonify(result), 500

        return jsonify(result), 200

    except Exception as e:
        logger.error(f"An error occurred in the API endpoint: {e}", exc_info=True)
        # NOTE(review): the traceback is returned to the client for debugging;
        # consider removing it before exposing this service beyond localhost.
        return jsonify({
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc()
        }), 500


if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug interactive debugger;
    # use it for local development only.
    app.run(debug=True, port=5002)

# Example cURL to trigger the endpoint with custom params:
# curl -X POST http://127.0.0.1:5002/run \
# -H "Content-Type: application/json" \
# -d '{
#   "base-url": "http://127.0.0.1:5001/",
#   "dms": "./assets/doc/dms/domain.json",
#   "custom-test-cases-dir": "./custom_testcases",
#   "stages-dir": "./custom_stages",
#   "output": "./test_reports/"
# }'