# compliance/api_server.py
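"""Flask service wrapping the ddms_compliance_suite test orchestrator.

Exposes a single POST /run endpoint that executes a full API compliance run
and writes its reports (JSON/HTML summary, a Chinese-language PDF, and a
Markdown log of every API call) into a timestamped output directory.
"""
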
import os
import sys
import json
import logging
import datetime
import traceback
from pathlib import Path
from typing import List, Optional
import unicodedata
import html
# PDF generation libraries - with fallback
try:
    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont
    reportlab_available = True
except ImportError:
    reportlab_available = False
from flask import Flask, request, jsonify
# --- Project-specific imports ---
from ddms_compliance_suite.api_caller.caller import APICallDetail
# ExecutedTestCaseResult is referenced in save_results below; it is assumed
# to live alongside TestSummary in the orchestrator module.
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary, ExecutedTestCaseResult
from ddms_compliance_suite.input_parser.parser import ParsedAPISpec
from ddms_compliance_suite.utils.response_utils import extract_data_for_validation
from ddms_compliance_suite.utils.data_generator import DataGenerator
app = Flask(__name__)
# app.config['JSON_AS_ASCII'] = False  # Return readable Chinese characters instead of Unicode escapes
# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Helper functions (migrated from run_api_tests.py) ---
def save_results(summary: TestSummary, output_file_path: str, format_type: str):
    """Saves the main test summary results."""
    output_path = Path(output_file_path)
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Error creating directory for output file {output_path.parent}: {e}")
        return
    if format_type == 'json':
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(summary.to_json(pretty=True))
        logger.info(f"Test results saved to JSON: {output_path}")
    elif format_type == 'html':
        html_content = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>API Test Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .summary {{ background-color: #f5f5f5; padding: 15px; border-radius: 5px; }}
        .pass {{ color: green; }}
        .fail {{ color: red; }}
        .error {{ color: orange; }}
        .skip {{ color: gray; }}
        table {{ border-collapse: collapse; width: 100%; }}
        th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
        th {{ background-color: #f2f2f2; }}
        tr:nth-child(even) {{ background-color: #f9f9f9; }}
    </style>
</head>
<body>
    <h1>API Test Report</h1>
    <div class="summary">
        <h2>Test Summary</h2>
        <p>Total Test Cases Executed: {summary.total_test_cases_executed}</p>
        <p class="pass">Passed: {summary.test_cases_passed}</p>
        <p class="fail">Failed: {summary.test_cases_failed}</p>
        <p class="error">Error: {summary.test_cases_error}</p>
        <p class="skip">Skipped: {summary.test_cases_skipped_in_endpoint}</p>
        <p>Success Rate: {summary.test_case_success_rate:.2f}%</p>
        <p>Duration: {summary.duration:.2f}s</p>
        <p>Start Time: {summary.start_time.isoformat()}</p>
        <p>End Time: {summary.end_time.isoformat() if summary.end_time else 'N/A'}</p>
    </div>
    <h2>Detailed Results</h2>
    <table>
        <tr>
            <th>Endpoint</th>
            <th>Test Case ID</th>
            <th>Test Case Name</th>
            <th>Status</th>
            <th>Message</th>
            <th>Duration (s)</th>
        </tr>
"""
        for endpoint_result in summary.detailed_results:
            for tc_result in endpoint_result.executed_test_cases:
                status_class = "pass" if tc_result.status == ExecutedTestCaseResult.Status.PASSED else \
                    "fail" if tc_result.status == ExecutedTestCaseResult.Status.FAILED else \
                    "error" if tc_result.status == ExecutedTestCaseResult.Status.ERROR else "skip"
                # Escape all dynamic values so they cannot break the report markup
                html_content += f"""
        <tr>
            <td>{html.escape(str(endpoint_result.endpoint_name))} ({html.escape(str(endpoint_result.endpoint_id))})</td>
            <td>{html.escape(str(tc_result.test_case_id))}</td>
            <td>{html.escape(str(tc_result.test_case_name))}</td>
            <td class="{status_class}">{tc_result.status.value}</td>
            <td>{html.escape(str(tc_result.message))}</td>
            <td>{tc_result.duration:.4f}</td>
        </tr>
"""
        html_content += """
    </table>
</body>
</html>
"""
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        logger.info(f"Test results saved to HTML: {output_path}")
def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
    """Saves API call details to a Markdown file."""
    if not api_call_details:
        logger.info("No API call details to save.")
        return
    output_dir = Path(output_dir_path)
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Failed to create output directory for API call details {output_dir}: {e}")
        return
    md_output_file = output_dir / filename
    markdown_content = []
    for detail in api_call_details:
        url_to_display = detail.request_url
        if detail.request_params:
            try:
                import urllib.parse
                query_string = urllib.parse.urlencode(detail.request_params)
                url_to_display = f"{detail.request_url}?{query_string}"
            except Exception as e:
                logger.warning(f"Error formatting URL with params for display: {e}")
        markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
        markdown_content.append("**cURL Command:**")
        markdown_content.append("```sh")
        markdown_content.append(detail.curl_command)
        markdown_content.append("```")
        markdown_content.append("### Request Details")
        markdown_content.append(f"- **Method:** `{detail.request_method}`")
        markdown_content.append(f"- **Full URL:** `{url_to_display}`")
        markdown_content.append("- **Headers:**")
        markdown_content.append("```json")
        markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
        markdown_content.append("```")
        if detail.request_params:
            markdown_content.append("- **Query Parameters:**")
            markdown_content.append("```json")
            markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
            markdown_content.append("```")
        if detail.request_body is not None:
            markdown_content.append("- **Body:**")
            body_lang = "json" if isinstance(detail.request_body, (dict, list)) else "text"
            formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False) if body_lang == "json" else str(detail.request_body)
            markdown_content.append(f"```{body_lang}")
            markdown_content.append(formatted_body)
            markdown_content.append("```")
        markdown_content.append("### Response Details")
        markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
        markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
        markdown_content.append("- **Headers:**")
        markdown_content.append("```json")
        markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
        markdown_content.append("```")
        if detail.response_body is not None:
            markdown_content.append("- **Body:**")
            resp_body_lang = "json" if isinstance(detail.response_body, (dict, list)) else "text"
            formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False) if resp_body_lang == "json" else str(detail.response_body)
            markdown_content.append(f"```{resp_body_lang}")
            markdown_content.append(formatted_resp_body)
            markdown_content.append("```")
        markdown_content.append("\n---\n")
    with open(md_output_file, 'w', encoding='utf-8') as f_md:
        f_md.write("\n".join(markdown_content))
    logger.info(f"API call details saved to Markdown: {md_output_file}")
def save_pdf_report(summary_data, output_path: Path):
    """Saves the test summary as a formatted PDF file."""
    if not reportlab_available:
        logger.warning("ReportLab library not found. PDF report will not be generated. Please run 'pip install reportlab'.")
        return
    logger.info(f"Attempting to generate PDF report at: {output_path}")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # 'SimSun' is only the name the font is registered under; the glyphs
        # actually come from the STHeiti TTC file below.
        font_name = 'SimSun'
        font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
        if not Path(font_path).exists():
            # When bundled by PyInstaller, assets are unpacked under sys._MEIPASS
            if hasattr(sys, '_MEIPASS'):
                font_path = Path(sys._MEIPASS) / 'assets' / 'fonts' / 'STHeiti-Medium-4.ttc'
            if not Path(font_path).exists():
                logger.error(f"Font file not found at {font_path}")
                return
        pdfmetrics.registerFont(TTFont(font_name, str(font_path), subfontIndex=0))
        pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
        doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
        elements = []
        styles = getSampleStyleSheet()
        title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
        heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
        normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
        # Style for code blocks (currently unused; kept for future request/response dumps)
        code_style = ParagraphStyle('Code', parent=styles['Normal'], fontName='Courier', fontSize=9, leading=12, leftIndent=15, backColor=colors.whitesmoke, borderWidth=1, borderColor=colors.lightgrey, padding=5, borderRadius=2)

        def to_para(text, style=normal_style, escape=True):
            content = "" if text is None else str(text)
            if escape:
                content = html.escape(content)
            # Convert newlines to <br/> before stripping control characters
            # (Unicode category "C*"): stripping first would remove the '\n'
            # characters (category Cc) and run multi-line messages together.
            content = content.replace('\n', '<br/>')
            content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
            if not content.strip():
                return Paragraph('&nbsp;', style)
            return Paragraph(content, style)
        # PDF Content Generation
        elements.append(to_para("API 测试报告", title_style, escape=False))
        elements.append(Spacer(1, 20))
        elements.append(to_para("测试摘要", heading_style, escape=False))
        overall = summary_data.get('overall_summary', {})
        start_time_str = summary_data.get('start_time', 'N/A')
        end_time_str = summary_data.get('end_time', 'N/A')
        duration = summary_data.get('duration_seconds', 0.0)
        start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
        end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
        summary_table_data = [
            [to_para("<b>开始时间</b>", escape=False), to_para(start_time_formatted)],
            [to_para("<b>结束时间</b>", escape=False), to_para(end_time_formatted)],
            [to_para("<b>总耗时</b>", escape=False), to_para(f"{float(duration):.2f}s")],
            [to_para("<b>测试的端点数</b>", escape=False), to_para(overall.get('endpoints_tested', 'N/A'))],
            [to_para("<b>执行的用例总数</b>", escape=False), to_para(overall.get('total_test_cases_executed', 'N/A'))],
            [to_para("<b>执行的流程数</b>", escape=False), to_para(overall.get('stages_tested', 'N/A'))],
        ]
        summary_table = Table(summary_table_data, colWidths=[120, '*'])
        summary_table.setStyle(TableStyle([('GRID', (0, 0), (-1, -1), 1, colors.grey), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE')]))
        elements.append(summary_table)
        elements.append(Spacer(1, 20))
elements.append(to_para("结果统计", heading_style, escape=False))
results_table_data = [
[to_para("<b>指标</b>", escape=False), to_para("<b>通过 ✅</b>", escape=False), to_para("<b>失败 ❌</b>", escape=False), to_para("<b>错误 ⚠️</b>", escape=False), to_para("<b>成功率</b>", escape=False)],
[to_para("端点"), to_para(overall.get('endpoints_passed', 'N/A')), to_para(overall.get('endpoints_failed', 'N/A')), to_para(overall.get('endpoints_error', 'N/A')), to_para(f"<b>{overall.get('endpoint_success_rate', 'N/A')}</b>", escape=False)],
[to_para("测试用例"), to_para(overall.get('test_cases_passed', 'N/A')), to_para(overall.get('test_cases_failed', 'N/A')), to_para(overall.get('test_cases_error', 'N/A')), to_para(f"<b>{overall.get('test_case_success_rate', 'N/A')}</b>", escape=False)],
[to_para("流程 (Stage)"), to_para(overall.get('stages_passed', 'N/A')), to_para(overall.get('stages_failed', 'N/A')), to_para(overall.get('stages_error', 'N/A')), to_para(f"<b>{overall.get('stage_success_rate', 'N/A')}</b>", escape=False)],
]
results_table = Table(results_table_data, colWidths=['*', 60, 60, 60, 80])
results_table.setStyle(TableStyle([('GRID', (0,0), (-1,-1), 1, colors.grey), ('BACKGROUND', (0,0), (-1,0), colors.lightgrey), ('ALIGN', (0,0), (-1,-1), 'CENTER'), ('VALIGN', (0,0), (-1,-1), 'MIDDLE')]))
elements.append(results_table)
elements.append(Spacer(1, 20))
elements.append(to_para("详细测试结果", heading_style, escape=False))
detailed_results = summary_data.get('endpoint_results', [])
if not detailed_results:
elements.append(to_para("无详细测试结果。"))
else:
status_map = {"PASSED": ("通过", colors.green), "FAILED": ("失败", colors.red), "ERROR": ("错误", colors.orange), "SKIPPED": ("跳过", colors.grey)}
for endpoint_result in detailed_results:
endpoint_name = endpoint_result.get('endpoint_name', 'N/A')
endpoint_style = ParagraphStyle('endpoint_heading', parent=heading_style, fontSize=12, spaceBefore=12, spaceAfter=6)
elements.append(to_para(f"端点: {endpoint_name}", style=endpoint_style))
test_cases = endpoint_result.get('executed_test_cases', [])
if not test_cases:
elements.append(to_para("该端点没有执行测试用例。", style=normal_style))
continue
for tc_result in test_cases:
elements.append(to_para(f"用例: {tc_result.get('test_case_name', 'N/A')}"))
status_en = tc_result.get('status', 'N/A')
status_cn, status_color = status_map.get(status_en, (status_en, colors.black))
status_text = f"状态: <font color='{status_color.hexval()}'>{status_cn}</font>"
elements.append(to_para(status_text, escape=False))
elements.append(to_para("消息:"))
message_text = tc_result.get('message', '')
message_style = ParagraphStyle('message_style', parent=normal_style, leftIndent=15)
elements.append(to_para(message_text, style=message_style, escape=True))
elements.append(Spacer(1, 6))
elements.append(HRFlowable(width="100%", thickness=0.5, color=colors.grey))
elements.append(Spacer(1, 6))
        # Add Stage Results Section
        elements.append(HRFlowable(width="100%", thickness=1, color=colors.black))
        elements.append(Spacer(1, 12))
        elements.append(to_para("流程测试结果 (Stages)", heading_style, escape=False))
        stage_results = summary_data.get('stage_results', [])
        if not stage_results:
            elements.append(to_para("无流程测试结果。"))
        else:
            for stage_result in stage_results:
                stage_name = stage_result.get('stage_name', 'N/A')
                stage_style = ParagraphStyle('stage_heading', parent=heading_style, fontSize=14, spaceBefore=12, spaceAfter=6)
                elements.append(to_para(f"流程: {stage_name}", style=stage_style))
                stage_status_en = stage_result.get('status', 'N/A')
                stage_status_cn, stage_status_color = status_map.get(stage_status_en, (stage_status_en, colors.black))
                stage_status_text = f"<b>整体状态</b>: <font color='{stage_status_color.hexval()}'>{stage_status_cn}</font>"
                elements.append(to_para(stage_status_text, escape=False))
                stage_message = stage_result.get('message', '')
                if stage_message:
                    elements.append(to_para(f"<b>消息</b>: {html.escape(stage_message)}", escape=False))
                elements.append(Spacer(1, 10))
                elements.append(to_para("<b>执行步骤详情:</b>", escape=False))
                # The key is 'executed_steps', not 'executed_test_steps'
                executed_steps = stage_result.get('executed_steps', [])
                if not executed_steps:
                    elements.append(to_para("该流程没有执行任何步骤。", style=normal_style))
                else:
                    step_style = ParagraphStyle('step_style', parent=normal_style, leftIndent=15)
                    for step_result in executed_steps:
                        elements.append(Spacer(1, 4))
                        step_name_text = f"<b>步骤</b>: {html.escape(step_result.get('step_name', 'N/A'))}"
                        elements.append(to_para(step_name_text, style=step_style, escape=False))
                        step_status_en = step_result.get('status', 'N/A')
                        step_status_cn, step_status_color = status_map.get(step_status_en, (step_status_en, colors.black))
                        step_status_text = f"&nbsp;&nbsp;<b>状态</b>: <font color='{step_status_color.hexval()}'>{step_status_cn}</font>"
                        elements.append(to_para(step_status_text, style=step_style, escape=False))
                        step_message = step_result.get('message', '')
                        if step_message:
                            step_message_text = f"&nbsp;&nbsp;<b>消息</b>: {html.escape(step_message)}"
                            elements.append(to_para(step_message_text, style=step_style, escape=False))
                        step_duration = step_result.get('duration_seconds', 0)
                        step_duration_text = f"&nbsp;&nbsp;<b>耗时</b>: {float(step_duration):.4f}s"
                        elements.append(to_para(step_duration_text, style=step_style, escape=False))
                elements.append(Spacer(1, 6))
                elements.append(HRFlowable(width="100%", thickness=0.5, color=colors.grey))
                elements.append(Spacer(1, 6))
        doc.build(elements)
        logger.info(f"PDF report successfully generated: {output_path}")
    except Exception as e:
        logger.error(f"Error building PDF document: {e}", exc_info=True)
# --- Core Test Execution Logic ---
def run_tests_logic(config: dict):
    """
    Main logic for running tests, adapted from the main() function in run_api_tests.py.
    """
    try:
        if config.get('verbose'):
            logging.getLogger('ddms_compliance_suite').setLevel(logging.DEBUG)
            logger.setLevel(logging.DEBUG)
            logger.debug("Verbose logging enabled.")
        if not any(k in config for k in ['yapi', 'swagger', 'dms']):
            raise ValueError("An API definition source is required: provide one of the 'yapi', 'swagger', or 'dms' config keys.")
        if sum(k in config for k in ['yapi', 'swagger', 'dms']) > 1:
            raise ValueError("The 'yapi', 'swagger', and 'dms' sources are mutually exclusive; provide exactly one.")
        # Setup output directory with timestamp
        base_output_dir = Path(config.get('output', './test_reports'))
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        output_directory = base_output_dir / timestamp
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Test reports will be saved to: {output_directory.resolve()}")
        # Initialize the orchestrator
        orchestrator = APITestOrchestrator(
            base_url=config['base-url'],
            custom_test_cases_dir=config.get('custom-test-cases-dir'),
            llm_api_key=config.get('llm-api-key'),
            llm_base_url=config.get('llm-base-url'),
            llm_model_name=config.get('llm-model-name'),
            use_llm_for_request_body=config.get('use-llm-for-request-body', False),
            use_llm_for_path_params=config.get('use-llm-for-path-params', False),
            use_llm_for_query_params=config.get('use-llm-for-query-params', False),
            use_llm_for_headers=config.get('use-llm-for-headers', False),
            output_dir=str(output_directory),
            stages_dir=config.get('stages-dir'),
            strictness_level=config.get('strictness-level', 'CRITICAL')
        )
        test_summary: Optional[TestSummary] = None
        parsed_spec: Optional[ParsedAPISpec] = None
        if 'yapi' in config:
            logger.info(f"Running tests from YAPI file: {config['yapi']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_yapi(
                yapi_file_path=config['yapi'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        elif 'swagger' in config:
            logger.info(f"Running tests from Swagger file: {config['swagger']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_swagger(
                swagger_file_path=config['swagger'],
                tags=config.get('tags'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        elif 'dms' in config:
            logger.info(f"Running tests from DMS service discovery: {config['dms']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_dms(
                domain_mapping_path=config['dms'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        if not parsed_spec:
            raise RuntimeError("Failed to parse the API specification.")
        if not test_summary:
            # Guard here, before finalize_summary(), which would otherwise be
            # called on None and raise an AttributeError.
            raise RuntimeError("Test execution failed to produce a summary.")
        if config.get('stages-dir'):
            logger.info(f"Executing API test stages from directory: {config['stages-dir']}")
            orchestrator.run_stages_from_spec(
                parsed_spec=parsed_spec,
                summary=test_summary
            )
        test_summary.finalize_summary()
        test_summary.print_summary_to_console()
        # Save reports
        main_report_file_path = output_directory / f"summary.{config.get('format', 'json')}"
        save_results(test_summary, str(main_report_file_path), config.get('format', 'json'))
        if config.get('generate-pdf', True):
            pdf_report_path = output_directory / "report_cn.pdf"
            save_pdf_report(test_summary.to_dict(), pdf_report_path)
        save_api_call_details_to_file(
            orchestrator.get_api_call_details(),
            str(output_directory),
            filename="api_call_details.md"
        )
        failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
        error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
        return {
            "status": "completed",
            "message": "Tests finished." if failed_count == 0 and error_count == 0 else "Tests finished with failures or errors.",
            "report_directory": str(output_directory.resolve()),
            "summary": test_summary.to_dict()
        }
    except Exception as e:
        logger.error(f"An unexpected error occurred during test execution: {e}", exc_info=True)
        return {
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc()
        }
# --- Flask API Endpoint ---
@app.route('/run', methods=['POST'])
def run_api_tests_endpoint():
    """
    Runs API tests by directly invoking the test orchestrator logic.
    The request body should be a JSON object with keys corresponding to the script's command-line arguments.
    """
    # Default configuration; any key can be overridden by the request body
    defaults = {
        'base-url': 'http://127.0.0.1:5001/',
        'dms': './assets/doc/dms/domain.json',
        'stages-dir': './custom_stages',
        'custom-test-cases-dir': './custom_testcases',
        'verbose': True,
        'output': './test_reports/',
        'format': 'json',
        'generate-pdf': True,
        'strictness-level': 'CRITICAL',
        # Default LLM options
        'llm-api-key': os.environ.get("OPENAI_API_KEY"),
        'llm-base-url': "https://dashscope.aliyuncs.com/compatible-mode/v1",
        'llm-model-name': "qwen-plus",
        'use-llm-for-request-body': False,
        'use-llm-for-path-params': False,
        'use-llm-for-query-params': False,
        'use-llm-for-headers': False
    }
    try:
        # Use silent=True to prevent an exception if Content-Type is not application/json.
        # This allows the endpoint to gracefully fall back to defaults.
        request_config = request.get_json(silent=True)
        if not request_config:
            # If no JSON body is provided or it's empty, run with defaults
            config = defaults
        else:
            # Merge request config with defaults
            config = {**defaults, **request_config}
        # Mask the LLM API key so the secret never appears in the logs
        log_config = {**config, 'llm-api-key': '***' if config.get('llm-api-key') else None}
        logger.info(f"Starting test run with configuration: {json.dumps(log_config, indent=2)}")
        result = run_tests_logic(config)
        if result['status'] == 'error':
            return jsonify(result), 500
        return jsonify(result), 200
    except Exception as e:
        logger.error(f"An error occurred in the API endpoint: {e}", exc_info=True)
        return jsonify({
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc()
        }), 500
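# Because request.get_json(silent=True) tolerates a missing or non-JSON body,
# the endpoint can also be triggered with the built-in defaults alone:
#   curl -X POST http://127.0.0.1:5002/run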
if __name__ == '__main__':
    app.run(debug=True, port=5002)
# Example cURL to trigger the endpoint with custom params:
# curl -X POST http://127.0.0.1:5002/run \
# -H "Content-Type: application/json" \
# -d '{
# "base-url": "http://127.0.0.1:5001/",
# "dms": "./assets/doc/dms/domain.json",
# "custom-test-cases-dir": "./custom_testcases",
# "stages-dir": "./custom_stages",
# "output": "./test_reports/"
# }'
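# A successful run responds with JSON shaped roughly like this (the exact
# "summary" payload depends on TestSummary.to_dict()):
# {
#   "status": "completed",
#   "message": "Tests finished.",
#   "report_directory": "/abs/path/to/test_reports/2025-07-19_08-44-40",
#   "summary": { ... }
# }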