# compliance/api_server.py

import os
import sys
import json
import logging
import datetime
import traceback
from pathlib import Path
from typing import List, Optional, Dict, Any
import unicodedata
import html
import time
import urllib.parse

# PDF generation libraries - with fallback
try:
    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont
    reportlab_available = True
except ImportError:
    reportlab_available = False

from flask import Flask, request, jsonify

# --- Project-specific imports ---
from ddms_compliance_suite.api_caller.caller import APICallDetail
# ExecutedTestCaseResult is referenced by save_results() below; importing it from
# test_orchestrator is an assumption about the project layout.
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary, ExecutedTestCaseResult
from ddms_compliance_suite.input_parser.parser import ParsedAPISpec
from ddms_compliance_suite.utils.response_utils import extract_data_for_validation
from ddms_compliance_suite.utils.data_generator import DataGenerator

app = Flask(__name__)
# app.config['JSON_AS_ASCII'] = False  # Return readable Chinese directly instead of Unicode escapes

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Helper functions (migrated from run_api_tests.py) ---
def save_results(summary: TestSummary, output_file_path: str, format_type: str):
    """Saves the main test summary results."""
    output_path = Path(output_file_path)
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Error creating directory for output file {output_path.parent}: {e}")
        return
    if format_type == 'json':
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(summary.to_json(pretty=True))
        logger.info(f"Test results saved to JSON: {output_path}")
    elif format_type == 'html':
        html_content = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>API Test Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .summary {{ background-color: #f5f5f5; padding: 15px; border-radius: 5px; }}
        .pass {{ color: green; }}
        .fail {{ color: red; }}
        .error {{ color: orange; }}
        .skip {{ color: gray; }}
        table {{ border-collapse: collapse; width: 100%; }}
        th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
        th {{ background-color: #f2f2f2; }}
        tr:nth-child(even) {{ background-color: #f9f9f9; }}
    </style>
</head>
<body>
    <h1>API Test Report</h1>
    <div class="summary">
        <h2>Test Summary</h2>
        <p>Total Test Cases Executed: {summary.total_test_cases_executed}</p>
        <p class="pass">Passed: {summary.test_cases_passed}</p>
        <p class="fail">Failed: {summary.test_cases_failed}</p>
        <p class="error">Error: {summary.test_cases_error}</p>
        <p class="skip">Skipped: {summary.test_cases_skipped_in_endpoint}</p>
        <p>Success Rate: {summary.test_case_success_rate:.2f}%</p>
        <p>Duration: {summary.duration:.2f}s</p>
        <p>Start Time: {summary.start_time.isoformat()}</p>
        <p>End Time: {summary.end_time.isoformat() if summary.end_time else 'N/A'}</p>
    </div>
    <h2>Detailed Results</h2>
    <table>
        <tr>
            <th>Endpoint</th>
            <th>Test Case ID</th>
            <th>Test Case Name</th>
            <th>Status</th>
            <th>Message</th>
            <th>Duration (s)</th>
        </tr>
"""
        for endpoint_result in summary.detailed_results:
            for tc_result in endpoint_result.executed_test_cases:
                status_class = "pass" if tc_result.status == ExecutedTestCaseResult.Status.PASSED else \
                    "fail" if tc_result.status == ExecutedTestCaseResult.Status.FAILED else \
                    "error" if tc_result.status == ExecutedTestCaseResult.Status.ERROR else "skip"
                # Escape user-controlled fields so they cannot break the report markup
                html_content += f"""
        <tr>
            <td>{html.escape(str(endpoint_result.endpoint_name))} ({endpoint_result.endpoint_id})</td>
            <td>{tc_result.test_case_id}</td>
            <td>{html.escape(str(tc_result.test_case_name))}</td>
            <td class="{status_class}">{tc_result.status.value}</td>
            <td>{html.escape(str(tc_result.message))}</td>
            <td>{tc_result.duration:.4f}</td>
        </tr>
"""
        html_content += """
    </table>
</body>
</html>
"""
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        logger.info(f"Test results saved to HTML: {output_path}")
def save_api_call_details_to_file(api_call_details: List[APICallDetail], output_dir_path: str, filename: str = "api_call_details.md"):
    """Saves API call details to a Markdown file."""
    if not api_call_details:
        logger.info("No API call details to save.")
        return
    output_dir = Path(output_dir_path)
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Failed to create output directory for API call details {output_dir}: {e}")
        return
    md_output_file = output_dir / filename
    markdown_content = []
    for detail in api_call_details:
        url_to_display = detail.request_url
        if detail.request_params:
            try:
                query_string = urllib.parse.urlencode(detail.request_params)
                url_to_display = f"{detail.request_url}?{query_string}"
            except Exception as e:
                logger.warning(f"Error formatting URL with params for display: {e}")
        markdown_content.append(f"## `{detail.request_method} {url_to_display}`")
        markdown_content.append("**cURL Command:**")
        markdown_content.append("```sh")
        markdown_content.append(detail.curl_command)
        markdown_content.append("```")
        markdown_content.append("### Request Details")
        markdown_content.append(f"- **Method:** `{detail.request_method}`")
        markdown_content.append(f"- **Full URL:** `{url_to_display}`")
        markdown_content.append("- **Headers:**")
        markdown_content.append("```json")
        markdown_content.append(json.dumps(detail.request_headers, indent=2, ensure_ascii=False))
        markdown_content.append("```")
        if detail.request_params:
            markdown_content.append("- **Query Parameters:**")
            markdown_content.append("```json")
            markdown_content.append(json.dumps(detail.request_params, indent=2, ensure_ascii=False))
            markdown_content.append("```")
        if detail.request_body is not None:
            markdown_content.append("- **Body:**")
            body_lang = "json" if isinstance(detail.request_body, (dict, list)) else "text"
            formatted_body = json.dumps(detail.request_body, indent=2, ensure_ascii=False) if body_lang == "json" else str(detail.request_body)
            markdown_content.append(f"```{body_lang}")
            markdown_content.append(formatted_body)
            markdown_content.append("```")
        markdown_content.append("### Response Details")
        markdown_content.append(f"- **Status Code:** `{detail.response_status_code}`")
        markdown_content.append(f"- **Elapsed Time:** `{detail.response_elapsed_time:.4f}s`")
        markdown_content.append("- **Headers:**")
        markdown_content.append("```json")
        markdown_content.append(json.dumps(detail.response_headers, indent=2, ensure_ascii=False))
        markdown_content.append("```")
        if detail.response_body is not None:
            markdown_content.append("- **Body:**")
            resp_body_lang = "json" if isinstance(detail.response_body, (dict, list)) else "text"
            formatted_resp_body = json.dumps(detail.response_body, indent=2, ensure_ascii=False) if resp_body_lang == "json" else str(detail.response_body)
            markdown_content.append(f"```{resp_body_lang}")
            markdown_content.append(formatted_resp_body)
            markdown_content.append("```")
        markdown_content.append("\n---\n")
    with open(md_output_file, 'w', encoding='utf-8') as f_md:
        f_md.write("\n".join(markdown_content))
    logger.info(f"API call details saved to Markdown: {md_output_file}")
def save_pdf_report(summary_data, output_path: Path, strictness_level: str = 'CRITICAL'):
    """Saves the test summary as a formatted PDF file."""
    if not reportlab_available:
        logger.warning("ReportLab library not found. PDF report will not be generated. Please run 'pip install reportlab'.")
        return
    logger.info(f"Attempting to generate PDF report at: {output_path}")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        font_name = 'SimSun'
        font_path = 'assets/fonts/STHeiti-Medium-4.ttc'
        if not Path(font_path).exists():
            if hasattr(sys, '_MEIPASS'):  # Running from a PyInstaller bundle
                font_path = Path(sys._MEIPASS) / 'assets' / 'fonts' / 'STHeiti-Medium-4.ttc'
            if not Path(font_path).exists():
                logger.error(f"Font file not found at {font_path}")
                return
        pdfmetrics.registerFont(TTFont(font_name, str(font_path), subfontIndex=0))
        pdfmetrics.registerFontFamily(font_name, normal=font_name, bold=font_name, italic=font_name, boldItalic=font_name)
        doc = SimpleDocTemplate(str(output_path), pagesize=A4, title="API测试报告")
        elements = []
        styles = getSampleStyleSheet()
        title_style = ParagraphStyle('ChineseTitle', parent=styles['Title'], fontName=font_name, fontSize=22, leading=28)
        heading_style = ParagraphStyle('ChineseHeading', parent=styles['Heading1'], fontName=font_name, fontSize=16, leading=20, spaceAfter=8)
        normal_style = ParagraphStyle('ChineseNormal', parent=styles['Normal'], fontName=font_name, fontSize=10, leading=14)
        small_style = ParagraphStyle('ChineseSmall', parent=styles['Normal'], fontName=font_name, fontSize=9, leading=12)
        # Add a new style for code blocks
        code_style = ParagraphStyle('Code', parent=styles['Normal'], fontName='Courier', fontSize=9, leading=12, leftIndent=15, backColor=colors.whitesmoke, borderWidth=1, borderColor=colors.lightgrey, padding=5, borderRadius=2)

        def to_para(text, style=normal_style, escape=True):
            content = "" if text is None else str(text)
            if escape:
                content = html.escape(content)
            # Convert newlines first: '\n' is itself a control character and would
            # otherwise be dropped by the control-character filter below.
            content = content.replace('\n', '<br/>')
            content = "".join(ch for ch in content if unicodedata.category(ch)[0] != 'C')
            if not content.strip():
                return Paragraph('&nbsp;', style)
            return Paragraph(content, style)

        # Status translation map, shared by the test-case, endpoint and stage sections below
        status_map = {"PASSED": ("通过", colors.green), "FAILED": ("失败", colors.red), "ERROR": ("错误", colors.orange), "SKIPPED": ("跳过", colors.grey)}

        # PDF content generation - refined report layout
        # Generate a report code based on the current timestamp
        report_code = f"DMS-TEST-{int(time.time())}"
        # Report title
        elements.append(to_para("数据管理服务测试分析报告", title_style, escape=False))
        elements.append(Spacer(1, 15))
        # Basic report information table
        basic_info_data = [
            [to_para("<b>报告编码</b>", escape=False), to_para(report_code)],
            [to_para("<b>报告名称</b>", escape=False), to_para("DMS领域数据服务测试分析报告")],
            [to_para("<b>申请日期</b>", escape=False), to_para(datetime.datetime.now().strftime('%Y年%m月%d日'))],
            [to_para("<b>申请人</b>", escape=False), to_para("系统管理员")],
            [to_para("<b>服务供应商名称</b>", escape=False), to_para("数据管理系统(DMS)")],
        ]
        basic_info_table = Table(basic_info_data, colWidths=[120, '*'])
        basic_info_table.setStyle(TableStyle([
            ('GRID', (0, 0), (-1, -1), 1, colors.grey),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('BACKGROUND', (0, 0), (0, -1), colors.lightgrey)
        ]))
        elements.append(basic_info_table)
        elements.append(Spacer(1, 20))
        # Summary section
        elements.append(to_para("摘要", heading_style, escape=False))
        overall = summary_data.get('overall_summary', {})
        start_time_str = summary_data.get('start_time', 'N/A')
        end_time_str = summary_data.get('end_time', 'N/A')
        duration = summary_data.get('duration_seconds', 0.0)
        start_time_formatted = datetime.datetime.fromisoformat(start_time_str).strftime('%Y-%m-%d %H:%M:%S') if start_time_str != 'N/A' else 'N/A'
        end_time_formatted = datetime.datetime.fromisoformat(end_time_str).strftime('%Y-%m-%d %H:%M:%S') if end_time_str != 'N/A' else 'N/A'
        # Use numeric defaults so the derived "skipped" counts never do arithmetic on 'N/A' strings
        endpoints_tested = overall.get('endpoints_tested', 0)
        endpoints_passed = overall.get('endpoints_passed', 0)
        endpoints_failed = overall.get('endpoints_failed', 0)
        cases_executed = overall.get('total_test_cases_executed', 0)
        cases_passed = overall.get('test_cases_passed', 0)
        cases_failed = overall.get('test_cases_failed', 0)
        stages_executed = overall.get('total_stages_executed', 0)
        stages_passed = overall.get('stages_passed', 0)
        stages_failed = overall.get('stages_failed', 0)
        summary_text = f"""本次测试针对DMS数据管理系统领域数据服务进行全面的合规性验证。
测试时间:{start_time_formatted} 至 {end_time_formatted},总耗时 {float(duration):.2f} 秒。
共测试 {endpoints_tested} 个API端点,其中 {endpoints_passed} 个通过,{endpoints_failed} 个失败,{endpoints_tested - endpoints_passed - endpoints_failed} 个跳过,端点成功率为 {overall.get('endpoint_success_rate', 'N/A')}。
执行 {cases_executed} 个测试用例,其中 {cases_passed} 个通过,{cases_failed} 个失败,{cases_executed - cases_passed - cases_failed} 个跳过,测试用例成功率为 {overall.get('test_case_success_rate', 'N/A')}。
执行 {stages_executed} 个流程测试,其中 {stages_passed} 个通过,{stages_failed} 个失败,{stages_executed - stages_passed - stages_failed} 个跳过,流程测试成功率为 {overall.get('stage_success_rate', 'N/A')}。"""
        elements.append(to_para(summary_text, normal_style))
        elements.append(Spacer(1, 20))
        # Scope of testing - API list table
        elements.append(to_para("测试内容包括", heading_style, escape=False))
        # Extract API information from the test results
        endpoint_results = summary_data.get('endpoint_results', [])
        api_list_data = [
            [to_para("<b>序号</b>", escape=False), to_para("<b>服务名称</b>", escape=False),
             to_para("<b>服务功能描述</b>", escape=False), to_para("<b>服务参数描述</b>", escape=False),
             to_para("<b>服务返回值描述</b>", escape=False)]
        ]
        for i, endpoint in enumerate(endpoint_results[:10], 1):  # Show at most the first 10 APIs
            endpoint_name = endpoint.get('endpoint_name', 'N/A')
            # Simplified functional description derived from the endpoint name
            if 'Create' in endpoint_name:
                func_desc = "提供数据创建服务"
            elif 'List' in endpoint_name or 'Query' in endpoint_name:
                func_desc = "提供数据查询和列表服务"
            elif 'Read' in endpoint_name:
                func_desc = "提供单条数据读取服务"
            elif 'Update' in endpoint_name:
                func_desc = "提供数据更新服务"
            elif 'Delete' in endpoint_name:
                func_desc = "提供数据删除服务"
            else:
                func_desc = "提供数据管理服务"
            api_list_data.append([
                to_para(str(i), small_style),
                to_para(endpoint_name, small_style),
                to_para(func_desc, small_style),
                to_para("标准DMS参数格式", small_style),
                to_para("标准DMS响应格式", small_style)
            ])
        api_list_table = Table(api_list_data, colWidths=[30, 80, 120, 80, 80])
        api_list_table.setStyle(TableStyle([
            ('GRID', (0, 0), (-1, -1), 1, colors.grey),
            ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('FONTSIZE', (0, 0), (-1, -1), 8)
        ]))
        elements.append(api_list_table)
        elements.append(Spacer(1, 20))
        # Test case list - split into required and optional by strictness level
        elements.append(to_para("测试用例列表", heading_style, escape=False))
        # Numeric mapping for severity levels
        severity_levels = {
            'CRITICAL': 5,
            'HIGH': 4,
            'MEDIUM': 3,
            'LOW': 2,
            'INFO': 1
        }
        strictness_value = severity_levels.get(strictness_level, 5)  # Default to CRITICAL
        # Collect all test cases, including endpoint cases and stage cases
        all_test_cases = []
        # 1. Collect endpoint test cases
        for endpoint_result in endpoint_results:
            test_cases = endpoint_result.get('executed_test_cases', [])
            for tc in test_cases:
                tc_severity = tc.get('test_case_severity', 'MEDIUM')
                tc_severity_value = severity_levels.get(tc_severity, 3)
                all_test_cases.append({
                    'type': 'Endpoint',
                    'endpoint': endpoint_result.get('endpoint_name', 'N/A'),
                    'case_name': tc.get('test_case_name', 'N/A'),
                    'status': tc.get('status', 'N/A'),
                    'severity': tc_severity,
                    'severity_value': tc_severity_value,
                    'is_required': tc_severity_value >= strictness_value
                })
        # 2. Collect stage test cases
        stage_results = summary_data.get('stage_results', [])
        for stage_result in stage_results:
            stage_name = stage_result.get('stage_name', 'N/A')
            stage_status = stage_result.get('overall_status', 'N/A')
            stage_severity = 'HIGH'  # Stage cases are usually high priority
            stage_severity_value = severity_levels.get(stage_severity, 4)
            # Add each stage as a single test case
            all_test_cases.append({
                'type': 'Stage',
                'endpoint': f"Stage: {stage_name}",
                'case_name': stage_result.get('description', stage_name),
                'status': stage_status,
                'severity': stage_severity,
                'severity_value': stage_severity_value,
                'is_required': stage_severity_value >= strictness_value
            })
        # Separate required and optional test cases
        required_cases = [case for case in all_test_cases if case['is_required']]
        optional_cases = [case for case in all_test_cases if not case['is_required']]
        # Build the split test case tables
        if all_test_cases:
            # Explain the strictness level
            strictness_text = f"当前严格等级:{strictness_level}。根据此等级,测试用例被分为必须执行和非必须执行两部分。"
            elements.append(to_para(strictness_text, small_style))
            elements.append(Spacer(1, 10))
            # 1. Required test cases table
            if required_cases:
                elements.append(to_para("必须的测试用例(影响测试结果)", heading_style, escape=False))
                required_table_data = [
                    [to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
                     to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
                     to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
                ]
                for i, case in enumerate(required_cases, 1):
                    # Translate the English status via status_map; fall back to the raw value
                    status_display = status_map.get(case['status'], (case['status'], None))[0]
                    required_table_data.append([
                        to_para(str(i), small_style),
                        to_para(case['type'], small_style),
                        to_para(case['case_name'], small_style),
                        to_para(case['endpoint'], small_style),
                        to_para(case['severity'], small_style),
                        to_para(status_display, small_style)
                    ])
                required_table = Table(required_table_data, colWidths=[25, 35, 110, 90, 45, 45])
                required_table.setStyle(TableStyle([
                    ('GRID', (0, 0), (-1, -1), 1, colors.grey),
                    ('BACKGROUND', (0, 0), (-1, 0), colors.lightblue),  # Light blue highlights required cases
                    ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                    ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                    ('FONTSIZE', (0, 0), (-1, -1), 8)
                ]))
                elements.append(required_table)
                elements.append(Spacer(1, 15))
            # 2. Optional test cases table
            if optional_cases:
                elements.append(to_para("非必须的测试用例(不影响测试结果)", heading_style, escape=False))
                optional_table_data = [
                    [to_para("<b>序号</b>", escape=False), to_para("<b>类型</b>", escape=False),
                     to_para("<b>测试用例名称</b>", escape=False), to_para("<b>所属端点/阶段</b>", escape=False),
                     to_para("<b>优先级</b>", escape=False), to_para("<b>执行结果</b>", escape=False)]
                ]
                for i, case in enumerate(optional_cases, 1):
                    status_display = status_map.get(case['status'], (case['status'], None))[0]
                    optional_table_data.append([
                        to_para(str(i), small_style),
                        to_para(case['type'], small_style),
                        to_para(case['case_name'], small_style),
                        to_para(case['endpoint'], small_style),
                        to_para(case['severity'], small_style),
                        to_para(status_display, small_style)
                    ])
                optional_table = Table(optional_table_data, colWidths=[25, 35, 110, 90, 45, 45])
                optional_table.setStyle(TableStyle([
                    ('GRID', (0, 0), (-1, -1), 1, colors.grey),
                    ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),  # Light grey marks optional cases
                    ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                    ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                    ('FONTSIZE', (0, 0), (-1, -1), 8)
                ]))
                elements.append(optional_table)
                elements.append(Spacer(1, 10))
            # Test case statistics
            total_cases = len(all_test_cases)
            endpoint_cases = len([c for c in all_test_cases if c['type'] == 'Endpoint'])
            stage_cases = len([c for c in all_test_cases if c['type'] == 'Stage'])
            required_count = len(required_cases)
            optional_count = len(optional_cases)
            stats_text = f"""测试用例统计:
总计 {total_cases} 个用例,其中端点用例 {endpoint_cases} 个,阶段用例 {stage_cases} 个。
必须用例 {required_count} 个,非必须用例 {optional_count} 个。
严格等级:{strictness_level}({severity_levels.get(strictness_level, 5)}级及以上为必须)。"""
            elements.append(to_para(stats_text, small_style))
        else:
            elements.append(to_para("无测试用例执行记录。", normal_style))
        elements.append(Spacer(1, 20))
        elements.append(to_para("详细测试结果", heading_style, escape=False))
        detailed_results = summary_data.get('endpoint_results', [])
        if not detailed_results:
            elements.append(to_para("无详细测试结果。"))
        else:
            for endpoint_result in detailed_results:
                endpoint_name = endpoint_result.get('endpoint_name', 'N/A')
                endpoint_style = ParagraphStyle('endpoint_heading', parent=heading_style, fontSize=12, spaceBefore=12, spaceAfter=6)
                elements.append(to_para(f"端点: {endpoint_name}", style=endpoint_style))
                test_cases = endpoint_result.get('executed_test_cases', [])
                if not test_cases:
                    elements.append(to_para("该端点没有执行测试用例。", style=normal_style))
                    continue
                for tc_result in test_cases:
                    elements.append(to_para(f"用例: {tc_result.get('test_case_name', 'N/A')}"))
                    status_en = tc_result.get('status', 'N/A')
                    status_cn, status_color = status_map.get(status_en, (status_en, colors.black))
                    status_text = f"状态: <font color='{status_color.hexval()}'>{status_cn}</font>"
                    elements.append(to_para(status_text, escape=False))
                    elements.append(to_para("消息:"))
                    message_text = tc_result.get('message', '')
                    message_style = ParagraphStyle('message_style', parent=normal_style, leftIndent=15)
                    elements.append(to_para(message_text, style=message_style, escape=True))
                    elements.append(Spacer(1, 6))
                    elements.append(HRFlowable(width="100%", thickness=0.5, color=colors.grey))
                    elements.append(Spacer(1, 6))
        # Add Stage Results Section
        elements.append(HRFlowable(width="100%", thickness=1, color=colors.black))
        elements.append(Spacer(1, 12))
        elements.append(to_para("流程测试结果 (Stages)", heading_style, escape=False))
        stage_results = summary_data.get('stage_results', [])
        if not stage_results:
            elements.append(to_para("无流程测试结果。"))
        else:
            for stage_result in stage_results:
                stage_name = stage_result.get('stage_name', 'N/A')
                stage_style = ParagraphStyle('stage_heading', parent=heading_style, fontSize=14, spaceBefore=12, spaceAfter=6)
                elements.append(to_para(f"流程: {stage_name}", style=stage_style))
                stage_status_en = stage_result.get('status', 'N/A')
                stage_status_cn, stage_status_color = status_map.get(stage_status_en, (stage_status_en, colors.black))
                stage_status_text = f"<b>整体状态</b>: <font color='{stage_status_color.hexval()}'>{stage_status_cn}</font>"
                elements.append(to_para(stage_status_text, escape=False))
                stage_message = stage_result.get('message', '')
                if stage_message:
                    elements.append(to_para(f"<b>消息</b>: {html.escape(stage_message)}", escape=False))
                elements.append(Spacer(1, 10))
                elements.append(to_para("<b>执行步骤详情:</b>", escape=False))
                # The key is 'executed_steps', not 'executed_test_steps'
                executed_steps = stage_result.get('executed_steps', [])
                if not executed_steps:
                    elements.append(to_para("该流程没有执行任何步骤。", style=normal_style))
                else:
                    step_style = ParagraphStyle('step_style', parent=normal_style, leftIndent=15)
                    for step_result in executed_steps:
                        elements.append(Spacer(1, 4))
                        step_name_text = f"<b>步骤</b>: {html.escape(step_result.get('step_name', 'N/A'))}"
                        elements.append(to_para(step_name_text, style=step_style, escape=False))
                        step_status_en = step_result.get('status', 'N/A')
                        step_status_cn, step_status_color = status_map.get(step_status_en, (step_status_en, colors.black))
                        step_status_text = f"&nbsp;&nbsp;<b>状态</b>: <font color='{step_status_color.hexval()}'>{step_status_cn}</font>"
                        elements.append(to_para(step_status_text, style=step_style, escape=False))
                        step_message = step_result.get('message', '')
                        if step_message:
                            step_message_text = f"&nbsp;&nbsp;<b>消息</b>: {html.escape(step_message)}"
                            elements.append(to_para(step_message_text, style=step_style, escape=False))
                        step_duration = step_result.get('duration_seconds', 0)
                        step_duration_text = f"&nbsp;&nbsp;<b>耗时</b>: {float(step_duration):.4f}s"
                        elements.append(to_para(step_duration_text, style=step_style, escape=False))
                elements.append(Spacer(1, 6))
                elements.append(HRFlowable(width="100%", thickness=0.5, color=colors.grey))
                elements.append(Spacer(1, 6))
        doc.build(elements)
        logger.info(f"PDF report successfully generated: {output_path}")
    except Exception as e:
        logger.error(f"Error building PDF document: {e}", exc_info=True)
# --- Core Test Execution Logic ---
def run_tests_logic(config: dict):
    """
    Main logic for running tests, adapted from the main() function in run_api_tests.py.
    """
    try:
        if config.get('verbose'):
            logging.getLogger('ddms_compliance_suite').setLevel(logging.DEBUG)
            logger.setLevel(logging.DEBUG)
            logger.debug("Verbose logging enabled.")
        if not any(k in config for k in ['yapi', 'swagger', 'dms']):
            raise ValueError("An API definition source is required: --yapi, --swagger, or --dms")
        if sum(k in config for k in ['yapi', 'swagger', 'dms']) > 1:
            raise ValueError("API definition sources are mutually exclusive.")
        # Set up output directory with timestamp
        base_output_dir = Path(config.get('output', './test_reports'))
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        output_directory = base_output_dir / timestamp
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Test reports will be saved to: {output_directory.resolve()}")
        # Initialize the orchestrator
        orchestrator = APITestOrchestrator(
            base_url=config['base-url'],
            custom_test_cases_dir=config.get('custom-test-cases-dir'),
            llm_api_key=config.get('llm-api-key'),
            llm_base_url=config.get('llm-base-url'),
            llm_model_name=config.get('llm-model-name'),
            use_llm_for_request_body=config.get('use-llm-for-request-body', False),
            use_llm_for_path_params=config.get('use-llm-for-path-params', False),
            use_llm_for_query_params=config.get('use-llm-for-query-params', False),
            use_llm_for_headers=config.get('use-llm-for-headers', False),
            output_dir=str(output_directory),
            stages_dir=config.get('stages-dir'),
            strictness_level=config.get('strictness-level', 'CRITICAL'),
            ignore_ssl=config.get('ignore-ssl', False),
            enable_well_data=config.get('enable-well-data', True)  # Well-data features enabled by default
        )
        test_summary: Optional[TestSummary] = None
        parsed_spec: Optional[ParsedAPISpec] = None
        pagination_info: Dict[str, Any] = {}
        if 'yapi' in config:
            logger.info(f"Running tests from YAPI file: {config['yapi']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_yapi(
                yapi_file_path=config['yapi'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        elif 'swagger' in config:
            logger.info(f"Running tests from Swagger file: {config['swagger']}")
            test_summary, parsed_spec = orchestrator.run_tests_from_swagger(
                swagger_file_path=config['swagger'],
                tags=config.get('tags'),
                custom_test_cases_dir=config.get('custom-test-cases-dir')
            )
        elif 'dms' in config:
            logger.info(f"Running tests from DMS service discovery: {config['dms']}")
            test_summary, parsed_spec, pagination_info = orchestrator.run_tests_from_dms(
                domain_mapping_path=config['dms'],
                categories=config.get('categories'),
                custom_test_cases_dir=config.get('custom-test-cases-dir'),
                page_size=config.get('page-size', 1000)
            )
        if not parsed_spec:
            raise RuntimeError("Failed to parse the API specification.")
        if test_summary and config.get('stages-dir') and parsed_spec:
            logger.info(f"Executing API test stages from directory: {config['stages-dir']}")
            orchestrator.run_stages_from_spec(
                parsed_spec=parsed_spec,
                summary=test_summary
            )
        if test_summary:
            # Finalize regardless of whether stages ran, then save reports
            test_summary.finalize_summary()
            test_summary.print_summary_to_console()
            main_report_file_path = output_directory / f"summary.{config.get('format', 'json')}"
            save_results(test_summary, str(main_report_file_path), config.get('format', 'json'))
            if config.get('generate-pdf', True):
                pdf_report_path = output_directory / "report_cn.pdf"
                save_pdf_report(test_summary.to_dict(), pdf_report_path, config.get('strictness-level', 'CRITICAL'))
            api_calls_filename = "api_call_details.md"
            save_api_call_details_to_file(
                orchestrator.get_api_call_details(),
                str(output_directory),
                filename=api_calls_filename
            )
            failed_count = getattr(test_summary, 'endpoints_failed', 0) + getattr(test_summary, 'test_cases_failed', 0)
            error_count = getattr(test_summary, 'endpoints_error', 0) + getattr(test_summary, 'test_cases_error', 0)
            result = {
                "status": "completed",
                "message": "Tests finished." if failed_count == 0 and error_count == 0 else "Tests finished with failures or errors.",
                "report_directory": str(output_directory.resolve()),
                "summary": test_summary.to_dict()
            }
            # Attach pagination info to the result if present
            if pagination_info:
                result["pagination"] = pagination_info
            return result
        else:
            raise RuntimeError("Test execution failed to produce a summary.")
    except Exception as e:
        logger.error(f"An unexpected error occurred during test execution: {e}", exc_info=True)
        return {
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc()
        }
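# A minimal direct-invocation sketch (assumed values; the 'swagger' path and base URL
# below are placeholders, not artifacts shipped with this repo):
#
#   result = run_tests_logic({
#       'base-url': 'http://127.0.0.1:5001/',
#       'swagger': './assets/doc/swagger.json',
#       'output': './test_reports/',
#       'format': 'json',
#   })
#   print(result['status'], result.get('report_directory', ''))
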
# --- Flask API Endpoints ---
@app.route('/', methods=['GET'])
def health_check():
    """Health check endpoint for Docker health checks."""
    return {"status": "healthy", "service": "DMS Compliance API Server"}, 200
@app.route('/run', methods=['POST'])
def run_api_tests_endpoint():
    """
    Runs API tests by directly invoking the test orchestrator logic.
    The request body should be a JSON object with keys corresponding to the script's command-line arguments.
    """
    # Default configuration based on user's request
    defaults = {
        'base-url': 'http://127.0.0.1:5001/',
        'dms': './assets/doc/dms/domain.json',
        'stages-dir': './custom_stages',
        'custom-test-cases-dir': './custom_testcases',
        'verbose': True,
        'output': './test_reports/',
        'format': 'json',
        'generate-pdf': True,
        'strictness-level': 'CRITICAL',
        'ignore-ssl': True,  # Skip SSL certificate verification by default
        # Default LLM options
        # 'llm-api-key': os.environ.get("OPENAI_API_KEY"),
        'llm-api-key': "sk-lbGrsUPL1iby86h554FaE536C343435dAa9bA65967A840B2",
        # 'llm-base-url': "https://dashscope.aliyuncs.com/compatible-mode/v1",
        'llm-base-url': "https://aiproxy.petrotech.cnpc/v1",
        # 'llm-model-name': "qwen-plus",
        'llm-model-name': "deepseek-v3",
        'use-llm-for-request-body': False,
        'use-llm-for-path-params': False,
        'use-llm-for-query-params': False,
        'use-llm-for-headers': False
    }
    try:
        # Use silent=True to prevent an exception if Content-Type is not application/json.
        # This allows the endpoint to gracefully fall back to defaults.
        request_config = request.get_json(silent=True)
        if not request_config:
            # If no JSON body is provided or it's empty, run with defaults
            config = defaults
        else:
            # Merge request config with defaults
            config = {**defaults, **request_config}
        logger.info(f"Starting test run with configuration: {json.dumps(config, indent=2)}")
        result = run_tests_logic(config)
        if result['status'] == 'error':
            return jsonify(result), 500
        return jsonify(result), 200
    except Exception as e:
        logger.error(f"An error occurred in the API endpoint: {e}", exc_info=True)
        return jsonify({
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc()
        }), 500
if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5050)

# Example cURL to trigger the endpoint with custom params (the port matches app.run above):
# curl -X POST http://127.0.0.1:5050/run \
#   -H "Content-Type: application/json" \
#   -d '{
#         "base-url": "http://127.0.0.1:5001/",
#         "dms": "./assets/doc/dms/domain.json",
#         "custom-test-cases-dir": "./custom_testcases",
#         "stages-dir": "./custom_stages",
#         "output": "./test_reports/"
#       }'
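# Posting no body at all falls back entirely to the defaults defined in
# run_api_tests_endpoint(), since request.get_json(silent=True) returns None:
# curl -X POST http://127.0.0.1:5050/run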