943 lines
51 KiB
Python
943 lines
51 KiB
Python
"""
|
||
测试编排器模块
|
||
|
||
负责组合API解析器、API调用器、验证器和规则执行器,进行端到端的API测试
|
||
"""
|
||
|
||
import logging
|
||
import json
|
||
import time
|
||
from typing import Dict, List, Any, Optional, Union, Tuple, Type
|
||
from enum import Enum
|
||
import datetime
|
||
|
||
from .input_parser.parser import InputParser, YAPIEndpoint, SwaggerEndpoint, ParsedYAPISpec, ParsedSwaggerSpec
|
||
from .api_caller.caller import APICaller, APIRequest, APIResponse
|
||
from .json_schema_validator.validator import JSONSchemaValidator
|
||
from .rule_repository.repository import RuleRepository
|
||
from .rule_executor.executor import RuleExecutor
|
||
from .models.rule_models import RuleQuery, TargetType, RuleCategory, RuleLifecycle, RuleScope
|
||
from .models.config_models import RuleRepositoryConfig, RuleStorageConfig
|
||
from .test_framework_core import ValidationResult, TestSeverity, APIRequestContext, APIResponseContext, BaseAPITestCase
|
||
from .test_case_registry import TestCaseRegistry
|
||
|
||
class ExecutedTestCaseResult:
|
||
"""存储单个APITestCase在其适用的端点上执行后的结果。"""
|
||
|
||
class Status(str, Enum):
|
||
"""单个测试用例的执行状态枚举"""
|
||
PASSED = "通过"
|
||
FAILED = "失败"
|
||
ERROR = "执行错误" # 指测试用例代码本身出错,而不是API验证失败
|
||
SKIPPED = "跳过" # 如果测试用例因某些条件被跳过执行
|
||
|
||
def __init__(self,
|
||
test_case_id: str,
|
||
test_case_name: str,
|
||
test_case_severity: TestSeverity,
|
||
status: Status,
|
||
validation_points: List[ValidationResult],
|
||
message: str = "", # 总体消息,例如执行错误时的错误信息
|
||
duration: float = 0.0):
|
||
self.test_case_id = test_case_id
|
||
self.test_case_name = test_case_name
|
||
self.test_case_severity = test_case_severity
|
||
self.status = status
|
||
self.validation_points = validation_points or []
|
||
self.message = message
|
||
self.duration = duration # 执行此测试用例的耗时
|
||
self.timestamp = datetime.datetime.now()
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
|
||
return {
|
||
"test_case_id": self.test_case_id,
|
||
"test_case_name": self.test_case_name,
|
||
"test_case_severity": self.test_case_severity.value, # 使用枚举值
|
||
"status": self.status.value,
|
||
"message": self.message,
|
||
"duration_seconds": self.duration,
|
||
"timestamp": self.timestamp.isoformat(),
|
||
"validation_points": [vp.details if vp.details else {"passed": vp.passed, "message": vp.message} for vp in self.validation_points]
|
||
}
|
||
|
||
class TestResult: # 原来的 TestResult 被重构为 EndpointExecutionResult
|
||
"""
|
||
存储对单个API端点执行所有适用APITestCase后的整体测试结果。
|
||
(此类替换了旧的 TestResult 的角色,并进行了结构调整)
|
||
"""
|
||
class Status(str, Enum): # 这个枚举保持不变,但其含义现在是端点的整体状态
|
||
"""端点测试状态枚举"""
|
||
PASSED = "通过" # 所有关键测试用例通过
|
||
FAILED = "失败" # 任何一个关键测试用例失败
|
||
ERROR = "错误" # 测试执行过程中出现错误(非API本身错误,而是测试代码或环境)
|
||
SKIPPED = "跳过" # 如果整个端点的测试被跳过
|
||
PARTIAL_SUCCESS = "部分成功" # 一些非关键测试用例失败,但关键的通过
|
||
|
||
def __init__(self,
|
||
endpoint_id: str, # 通常是 method + path
|
||
endpoint_name: str, # API 的可读名称/标题
|
||
# api_spec_details: Dict[str, Any], # 包含该端点从YAPI/Swagger解析的原始信息,可选
|
||
overall_status: Status = Status.SKIPPED, # 默认为跳过,后续根据测试用例结果更新
|
||
start_time: Optional[datetime.datetime] = None
|
||
):
|
||
self.endpoint_id = endpoint_id
|
||
self.endpoint_name = endpoint_name
|
||
# self.api_spec_details = api_spec_details
|
||
self.overall_status = overall_status
|
||
self.executed_test_cases: List[ExecutedTestCaseResult] = []
|
||
self.start_time = start_time if start_time else datetime.datetime.now()
|
||
self.end_time: Optional[datetime.datetime] = None
|
||
self.error_message: Optional[str] = None # 如果整个端点测试出错,记录错误信息
|
||
|
||
def add_executed_test_case_result(self, result: ExecutedTestCaseResult):
|
||
self.executed_test_cases.append(result)
|
||
|
||
def finalize_endpoint_test(self):
|
||
self.end_time = datetime.datetime.now()
|
||
# 根据所有 executed_test_cases 的状态和严重性来计算 overall_status
|
||
if not self.executed_test_cases and self.overall_status == TestResult.Status.SKIPPED : # 如果没有执行任何测试用例且状态仍为初始的SKIPPED
|
||
pass # 保持 SKIPPED
|
||
elif any(tc.status == ExecutedTestCaseResult.Status.ERROR for tc in self.executed_test_cases):
|
||
self.overall_status = TestResult.Status.ERROR
|
||
# 可以考虑将第一个遇到的ERROR的message赋给self.error_message
|
||
first_error = next((tc.message for tc in self.executed_test_cases if tc.status == ExecutedTestCaseResult.Status.ERROR), None)
|
||
if first_error:
|
||
self.error_message = f"测试用例执行错误: {first_error}"
|
||
else:
|
||
# 筛选出失败的测试用例
|
||
failed_tcs = [tc for tc in self.executed_test_cases if tc.status == ExecutedTestCaseResult.Status.FAILED]
|
||
if not failed_tcs:
|
||
if not self.executed_test_cases: # 如果没有执行任何测试用例但又不是SKIPPED,可能也算某种形式的错误或特殊通过
|
||
self.overall_status = TestResult.Status.PASSED # 或者定义一个"NO_CASES_RUN"状态
|
||
else:
|
||
self.overall_status = TestResult.Status.PASSED
|
||
else:
|
||
# 检查失败的测试用例中是否有CRITICAL或HIGH严重级别的
|
||
if any(tc.test_case_severity in [TestSeverity.CRITICAL, TestSeverity.HIGH] for tc in failed_tcs):
|
||
self.overall_status = TestResult.Status.FAILED
|
||
else: # 所有失败的都是 MEDIUM, LOW, INFO
|
||
self.overall_status = TestResult.Status.PARTIAL_SUCCESS
|
||
|
||
if not self.executed_test_cases and self.overall_status not in [TestResult.Status.SKIPPED, TestResult.Status.ERROR]:
|
||
# 如果没有执行测试用例,并且不是因为错误或明确跳过,这可能是一个配置问题或意外情况
|
||
self.overall_status = TestResult.Status.ERROR # 或者一个更特定的状态
|
||
self.error_message = "没有为该端点找到或执行任何适用的测试用例。"
|
||
|
||
|
||
@property
|
||
def duration(self) -> float:
|
||
if self.start_time and self.end_time:
|
||
return (self.end_time - self.start_time).total_seconds()
|
||
return 0.0
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
|
||
data = {
|
||
"endpoint_id": self.endpoint_id,
|
||
"endpoint_name": self.endpoint_name,
|
||
"overall_status": self.overall_status.value,
|
||
"duration_seconds": self.duration,
|
||
"start_time": self.start_time.isoformat() if self.start_time else None,
|
||
"end_time": self.end_time.isoformat() if self.end_time else None,
|
||
"executed_test_cases": [tc.to_dict() for tc in self.executed_test_cases]
|
||
}
|
||
if self.error_message:
|
||
data["error_message"] = self.error_message
|
||
return data
|
||
|
||
class TestSummary:
|
||
"""测试结果摘要 (已更新以适应新的结果结构)"""
|
||
|
||
def __init__(self):
|
||
self.total_endpoints_defined: int = 0 # YAPI/Swagger中定义的端点总数
|
||
self.total_endpoints_tested: int = 0 # 实际执行了测试的端点数量 (至少有一个测试用例被执行)
|
||
|
||
self.endpoints_passed: int = 0
|
||
self.endpoints_failed: int = 0
|
||
self.endpoints_partial_success: int = 0
|
||
self.endpoints_error: int = 0
|
||
self.endpoints_skipped: int = 0 # 由于配置或过滤器,整个端点被跳过测试
|
||
|
||
self.total_test_cases_applicable: int = 0 # 所有端点上适用测试用例的总和
|
||
self.total_test_cases_executed: int = 0 # 所有端点上实际执行的测试用例总数
|
||
self.test_cases_passed: int = 0
|
||
self.test_cases_failed: int = 0
|
||
self.test_cases_error: int = 0 # 测试用例代码本身出错
|
||
self.test_cases_skipped_in_endpoint: int = 0 # 测试用例在端点执行中被跳过
|
||
|
||
self.start_time = datetime.datetime.now()
|
||
self.end_time: Optional[datetime.datetime] = None
|
||
self.detailed_results: List[TestResult] = [] # 将存储新的 TestResult (EndpointExecutionResult) 对象
|
||
|
||
def add_endpoint_result(self, result: TestResult): # result 现在是新的 TestResult 类型
|
||
self.detailed_results.append(result)
|
||
|
||
if result.executed_test_cases or result.overall_status not in [TestResult.Status.SKIPPED, TestResult.Status.ERROR]: # 只有实际尝试了测试的端点才算tested
|
||
if not (len(result.executed_test_cases) == 0 and result.overall_status == TestResult.Status.ERROR and result.error_message and "没有为该端点找到或执行任何适用的测试用例" in result.error_message):
|
||
self.total_endpoints_tested +=1
|
||
|
||
if result.overall_status == TestResult.Status.PASSED:
|
||
self.endpoints_passed += 1
|
||
elif result.overall_status == TestResult.Status.FAILED:
|
||
self.endpoints_failed += 1
|
||
elif result.overall_status == TestResult.Status.PARTIAL_SUCCESS:
|
||
self.endpoints_partial_success +=1
|
||
elif result.overall_status == TestResult.Status.ERROR:
|
||
self.endpoints_error += 1
|
||
elif result.overall_status == TestResult.Status.SKIPPED: # 端点级别跳过
|
||
self.endpoints_skipped +=1
|
||
|
||
for tc_result in result.executed_test_cases:
|
||
self.total_test_cases_executed += 1 # 每个APITestCase算一次执行
|
||
if tc_result.status == ExecutedTestCaseResult.Status.PASSED:
|
||
self.test_cases_passed += 1
|
||
elif tc_result.status == ExecutedTestCaseResult.Status.FAILED:
|
||
self.test_cases_failed += 1
|
||
elif tc_result.status == ExecutedTestCaseResult.Status.ERROR:
|
||
self.test_cases_error +=1
|
||
elif tc_result.status == ExecutedTestCaseResult.Status.SKIPPED:
|
||
self.test_cases_skipped_in_endpoint +=1
|
||
|
||
def set_total_endpoints_defined(self, count: int):
|
||
self.total_endpoints_defined = count
|
||
|
||
def set_total_test_cases_applicable(self, count: int):
|
||
self.total_test_cases_applicable = count
|
||
|
||
def finalize_summary(self):
|
||
self.end_time = datetime.datetime.now()
|
||
|
||
@property
|
||
def duration(self) -> float:
|
||
if not self.end_time:
|
||
return 0.0
|
||
return (self.end_time - self.start_time).total_seconds()
|
||
|
||
@property
|
||
def endpoint_success_rate(self) -> float:
|
||
if self.total_endpoints_tested == 0:
|
||
return 0.0
|
||
# 通常只把 PASSED 算作成功
|
||
return (self.endpoints_passed / self.total_endpoints_tested) * 100
|
||
|
||
@property
|
||
def test_case_success_rate(self) -> float:
|
||
if self.total_test_cases_executed == 0:
|
||
return 0.0
|
||
return (self.test_cases_passed / self.total_test_cases_executed) * 100
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
|
||
return {
|
||
"summary_metadata": {
|
||
"start_time": self.start_time.isoformat(),
|
||
"end_time": self.end_time.isoformat() if self.end_time else None,
|
||
"duration_seconds": f"{self.duration:.2f}",
|
||
},
|
||
"endpoint_stats": {
|
||
"total_defined": self.total_endpoints_defined,
|
||
"total_tested": self.total_endpoints_tested,
|
||
"passed": self.endpoints_passed,
|
||
"failed": self.endpoints_failed,
|
||
"partial_success": self.endpoints_partial_success,
|
||
"error": self.endpoints_error,
|
||
"skipped": self.endpoints_skipped,
|
||
"success_rate_percentage": f"{self.endpoint_success_rate:.2f}",
|
||
},
|
||
"test_case_stats": {
|
||
"total_applicable": self.total_test_cases_applicable, # 计划执行的测试用例总数
|
||
"total_executed": self.total_test_cases_executed, # 实际执行的测试用例总数
|
||
"passed": self.test_cases_passed,
|
||
"failed": self.test_cases_failed,
|
||
"error_in_execution": self.test_cases_error,
|
||
"skipped_during_endpoint_execution": self.test_cases_skipped_in_endpoint,
|
||
"success_rate_percentage": f"{self.test_case_success_rate:.2f}",
|
||
},
|
||
"detailed_results": [result.to_dict() for result in self.detailed_results]
|
||
}
|
||
|
||
def to_json(self, pretty=True) -> str:
|
||
indent = 2 if pretty else None
|
||
return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
|
||
|
||
def print_summary_to_console(self): # Renamed from print_summary
|
||
# (Implementation can be more detailed based on the new stats)
|
||
print("\\n===== 测试运行摘要 =====")
|
||
print(f"开始时间: {self.start_time.isoformat()}")
|
||
if self.end_time:
|
||
print(f"结束时间: {self.end_time.isoformat()}")
|
||
print(f"总耗时: {self.duration:.2f} 秒")
|
||
|
||
print("\\n--- 端点统计 ---")
|
||
print(f"定义的端点总数: {self.total_endpoints_defined}")
|
||
print(f"实际测试的端点数: {self.total_endpoints_tested}")
|
||
print(f" 通过: {self.endpoints_passed}")
|
||
print(f" 失败: {self.endpoints_failed}")
|
||
print(f" 部分成功: {self.endpoints_partial_success}")
|
||
print(f" 执行错误: {self.endpoints_error}")
|
||
print(f" 跳过执行: {self.endpoints_skipped}")
|
||
print(f" 端点通过率: {self.endpoint_success_rate:.2f}%")
|
||
|
||
print("\\n--- 测试用例统计 ---")
|
||
print(f"适用的测试用例总数 (计划执行): {self.total_test_cases_applicable}")
|
||
print(f"实际执行的测试用例总数: {self.total_test_cases_executed}")
|
||
print(f" 通过: {self.test_cases_passed}")
|
||
print(f" 失败: {self.test_cases_failed}")
|
||
print(f" 执行错误 (测试用例代码问题): {self.test_cases_error}")
|
||
print(f" 跳过 (在端点内被跳过): {self.test_cases_skipped_in_endpoint}")
|
||
print(f" 测试用例通过率: {self.test_case_success_rate:.2f}%")
|
||
|
||
# 可选:打印失败的端点和测试用例摘要
|
||
failed_endpoints = [res for res in self.detailed_results if res.overall_status == TestResult.Status.FAILED]
|
||
if failed_endpoints:
|
||
print("\\n--- 失败的端点摘要 ---")
|
||
for ep_res in failed_endpoints:
|
||
print(f" 端点: {ep_res.endpoint_id} ({ep_res.endpoint_name}) - 状态: {ep_res.overall_status.value}")
|
||
for tc_res in ep_res.executed_test_cases:
|
||
if tc_res.status == ExecutedTestCaseResult.Status.FAILED:
|
||
print(f" - 测试用例失败: {tc_res.test_case_id} ({tc_res.test_case_name})")
|
||
for vp in tc_res.validation_points:
|
||
if not vp.passed:
|
||
print(f" - 验证点: {vp.message}")
|
||
|
||
class APITestOrchestrator:
|
||
"""API测试编排器"""
|
||
|
||
def __init__(self, base_url: str,
|
||
rule_repo_path: str = "./rules", # 旧规则引擎的规则库路径
|
||
custom_test_cases_dir: Optional[str] = None # 新的自定义测试用例目录路径
|
||
):
|
||
"""
|
||
初始化API测试编排器
|
||
|
||
Args:
|
||
base_url: API基础URL
|
||
rule_repo_path: (旧)规则库路径
|
||
custom_test_cases_dir: 存放自定义 APITestCase 的目录路径。如果为 None,则不加载自定义测试用例。
|
||
"""
|
||
self.base_url = base_url.rstrip('/')
|
||
self.logger = logging.getLogger(__name__)
|
||
|
||
# 初始化组件
|
||
self.parser = InputParser()
|
||
self.api_caller = APICaller()
|
||
self.validator = JSONSchemaValidator() # JSON Schema 验证器,可能会被测试用例内部使用
|
||
|
||
# 初始化 (旧) 规则库和规则执行器
|
||
# 未来可以考虑是否完全移除或将其功能也通过 APITestCase 实现
|
||
rule_config = RuleRepositoryConfig(
|
||
storage=RuleStorageConfig(path=rule_repo_path)
|
||
)
|
||
self.rule_repo = RuleRepository(rule_config)
|
||
self.rule_executor = RuleExecutor(self.rule_repo)
|
||
|
||
# 初始化 (新) 测试用例注册表
|
||
self.test_case_registry: Optional[TestCaseRegistry] = None
|
||
if custom_test_cases_dir:
|
||
self.logger.info(f"初始化 TestCaseRegistry,扫描目录: {custom_test_cases_dir}")
|
||
try:
|
||
self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir)
|
||
self.logger.info(f"TestCaseRegistry 初始化完成,发现 {len(self.test_case_registry.get_all_test_case_classes())} 个测试用例类。")
|
||
except Exception as e:
|
||
self.logger.error(f"初始化 TestCaseRegistry 失败: {e}", exc_info=True)
|
||
# 即使注册表初始化失败,编排器本身仍可用于旧逻辑(如果保留)或不运行自定义测试
|
||
else:
|
||
self.logger.info("未提供 custom_test_cases_dir,不加载自定义 APITestCase。")
|
||
|
||
def _execute_single_test_case(
|
||
self,
|
||
test_case_class: Type[BaseAPITestCase],
|
||
endpoint_spec: Union[YAPIEndpoint, SwaggerEndpoint], # 当前端点的规格
|
||
global_api_spec: Union[ParsedYAPISpec, ParsedSwaggerSpec] # 整个API的规格
|
||
) -> ExecutedTestCaseResult:
|
||
"""
|
||
实例化并执行单个APITestCase。
|
||
"""
|
||
tc_start_time = time.time()
|
||
validation_points: List[ValidationResult] = []
|
||
test_case_instance: Optional[BaseAPITestCase] = None
|
||
|
||
# 准备 endpoint_spec_dict
|
||
endpoint_spec_dict: Dict[str, Any]
|
||
if hasattr(endpoint_spec, 'to_dict') and callable(endpoint_spec.to_dict):
|
||
endpoint_spec_dict = endpoint_spec.to_dict()
|
||
elif isinstance(endpoint_spec, (YAPIEndpoint, SwaggerEndpoint)):
|
||
endpoint_spec_dict = {
|
||
"method": getattr(endpoint_spec, 'method', 'UNKNOWN_METHOD'),
|
||
"path": getattr(endpoint_spec, 'path', 'UNKNOWN_PATH'),
|
||
"title": getattr(endpoint_spec, 'title', ''),
|
||
"summary": getattr(endpoint_spec, 'summary', ''),
|
||
"_original_object_type": type(endpoint_spec).__name__
|
||
}
|
||
if isinstance(endpoint_spec, YAPIEndpoint):
|
||
for attr_name in dir(endpoint_spec):
|
||
if not attr_name.startswith('_') and not callable(getattr(endpoint_spec, attr_name)):
|
||
try:
|
||
# Test serializability before adding
|
||
json.dumps({attr_name: getattr(endpoint_spec, attr_name)})
|
||
endpoint_spec_dict[attr_name] = getattr(endpoint_spec, attr_name)
|
||
except (TypeError, OverflowError):
|
||
pass
|
||
elif isinstance(endpoint_spec, SwaggerEndpoint):
|
||
if hasattr(endpoint_spec, 'parameters'): endpoint_spec_dict['parameters'] = endpoint_spec.parameters
|
||
if hasattr(endpoint_spec, 'request_body'): endpoint_spec_dict['request_body'] = endpoint_spec.request_body
|
||
if hasattr(endpoint_spec, 'responses'): endpoint_spec_dict['responses'] = endpoint_spec.responses
|
||
else:
|
||
endpoint_spec_dict = endpoint_spec if isinstance(endpoint_spec, dict) else {}
|
||
if not endpoint_spec_dict:
|
||
self.logger.warning(f"endpoint_spec 无法转换为字典,实际类型: {type(endpoint_spec)}")
|
||
|
||
global_api_spec_dict: Dict[str, Any]
|
||
if hasattr(global_api_spec, 'to_dict') and callable(global_api_spec.to_dict):
|
||
global_api_spec_dict = global_api_spec.to_dict()
|
||
else:
|
||
global_api_spec_dict = global_api_spec if isinstance(global_api_spec, dict) else {}
|
||
if not global_api_spec_dict:
|
||
self.logger.warning(f"global_api_spec 无法转换为字典,实际类型: {type(global_api_spec)}")
|
||
|
||
|
||
try:
|
||
test_case_instance = test_case_class(
|
||
endpoint_spec=endpoint_spec_dict,
|
||
global_api_spec=global_api_spec_dict
|
||
)
|
||
test_case_instance.logger.info(f"开始执行测试用例 '{test_case_instance.id}' for endpoint '{endpoint_spec_dict.get('method')} {endpoint_spec_dict.get('path')}'")
|
||
|
||
# 1. 请求构建阶段
|
||
initial_request_data = self._prepare_initial_request_data(endpoint_spec) # endpoint_spec 是原始对象
|
||
|
||
current_q_params = test_case_instance.generate_query_params(initial_request_data['query_params'])
|
||
current_headers = test_case_instance.generate_headers(initial_request_data['headers'])
|
||
current_body = test_case_instance.generate_request_body(initial_request_data['body'])
|
||
|
||
# 路径参数应该从 initial_request_data 中获取,因为 _prepare_initial_request_data 负责生成它们
|
||
current_path_params = initial_request_data['path_params']
|
||
|
||
# 构建最终请求URL,使用 current_path_params 进行替换
|
||
final_url = self.base_url + endpoint_spec_dict.get('path', '')
|
||
for p_name, p_val in current_path_params.items():
|
||
placeholder = f"{{{p_name}}}"
|
||
if placeholder in final_url:
|
||
final_url = final_url.replace(placeholder, str(p_val))
|
||
else:
|
||
self.logger.warning(f"路径参数 '{p_name}' 在路径模板 '{endpoint_spec_dict.get('path')}' 中未找到占位符,但为其生成了值。")
|
||
|
||
api_request_context = APIRequestContext(
|
||
method=endpoint_spec_dict.get('method', 'GET').upper(),
|
||
url=final_url,
|
||
path_params=current_path_params,
|
||
query_params=current_q_params,
|
||
headers=current_headers,
|
||
body=current_body,
|
||
endpoint_spec=endpoint_spec_dict
|
||
)
|
||
|
||
# 1.5. 请求预校验
|
||
validation_points.extend(test_case_instance.validate_request_url(api_request_context.url, api_request_context))
|
||
validation_points.extend(test_case_instance.validate_request_headers(api_request_context.headers, api_request_context))
|
||
validation_points.extend(test_case_instance.validate_request_body(api_request_context.body, api_request_context))
|
||
|
||
# 检查是否有严重预校验失败
|
||
critical_pre_validation_failure = False
|
||
failure_messages = []
|
||
for vp in validation_points:
|
||
if not vp.passed and test_case_instance.severity in [TestSeverity.CRITICAL, TestSeverity.HIGH]:
|
||
critical_pre_validation_failure = True
|
||
failure_messages.append(vp.message)
|
||
|
||
if critical_pre_validation_failure:
|
||
self.logger.warning(f"测试用例 '{test_case_instance.id}' 因请求预校验失败而中止 (严重级别: {test_case_instance.severity.value})。失败信息: {'; '.join(failure_messages)}")
|
||
tc_duration = time.time() - tc_start_time
|
||
return ExecutedTestCaseResult(
|
||
test_case_id=test_case_instance.id,
|
||
test_case_name=test_case_instance.name,
|
||
test_case_severity=test_case_instance.severity,
|
||
status=ExecutedTestCaseResult.Status.FAILED, # 预校验失败算作 FAILED
|
||
validation_points=validation_points,
|
||
message=f"请求预校验失败: {'; '.join(failure_messages)}",
|
||
duration=tc_duration
|
||
)
|
||
|
||
# ---- API 调用 ----
|
||
api_request_obj = APIRequest(
|
||
method=api_request_context.method,
|
||
url=api_request_context.url,
|
||
params=api_request_context.query_params,
|
||
headers=api_request_context.headers,
|
||
json_data=api_request_context.body # Assuming JSON, APICaller might need to handle other types
|
||
)
|
||
|
||
response_call_start_time = time.time()
|
||
api_response_obj = self.api_caller.call_api(api_request_obj)
|
||
response_call_elapsed_time = time.time() - response_call_start_time
|
||
|
||
# ---- 响应验证 ----
|
||
# 3. 创建 APIResponseContext
|
||
|
||
actual_text_content: Optional[str] = None
|
||
if hasattr(api_response_obj, 'text_content') and api_response_obj.text_content is not None: # 优先尝试直接获取
|
||
actual_text_content = api_response_obj.text_content
|
||
elif api_response_obj.json_content is not None:
|
||
if isinstance(api_response_obj.json_content, str):
|
||
actual_text_content = api_response_obj.json_content
|
||
else:
|
||
try:
|
||
actual_text_content = json.dumps(api_response_obj.json_content, ensure_ascii=False)
|
||
except TypeError:
|
||
actual_text_content = str(api_response_obj.json_content) # 最后手段
|
||
|
||
# elapsed_time: 使用 response_call_elapsed_time
|
||
# original_response: 设置为 None 因为 api_response_obj 没有 raw_response
|
||
|
||
api_response_context = APIResponseContext(
|
||
status_code=api_response_obj.status_code,
|
||
headers=api_response_obj.headers, # 假设这些直接在 api_response_obj 上
|
||
json_content=api_response_obj.json_content, # 这个根据之前的错误提示是存在的
|
||
text_content=actual_text_content,
|
||
elapsed_time=response_call_elapsed_time,
|
||
original_response=None, # api_response_obj 没有 .raw_response 属性
|
||
request_context=api_request_context
|
||
)
|
||
|
||
# 4. 执行响应验证和性能检查
|
||
validation_points.extend(test_case_instance.validate_response(api_response_context, api_request_context))
|
||
validation_points.extend(test_case_instance.check_performance(api_response_context, api_request_context))
|
||
|
||
# ---- 结果判定 ----
|
||
# 5. 判断此测试用例的最终状态
|
||
final_status = ExecutedTestCaseResult.Status.PASSED
|
||
if any(not vp.passed for vp in validation_points):
|
||
final_status = ExecutedTestCaseResult.Status.FAILED
|
||
|
||
tc_duration = time.time() - tc_start_time
|
||
return ExecutedTestCaseResult(
|
||
test_case_id=test_case_instance.id,
|
||
test_case_name=test_case_instance.name,
|
||
test_case_severity=test_case_instance.severity,
|
||
status=final_status,
|
||
validation_points=validation_points,
|
||
duration=tc_duration
|
||
)
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"执行测试用例 '{test_case_class.id if test_case_instance else test_case_class.__name__}' 时发生严重错误: {e}", exc_info=True)
|
||
tc_duration = time.time() - tc_start_time
|
||
return ExecutedTestCaseResult(
|
||
test_case_id=test_case_instance.id if test_case_instance else test_case_class.id if hasattr(test_case_class, 'id') else "unknown_tc_id",
|
||
test_case_name=test_case_instance.name if test_case_instance else test_case_class.name if hasattr(test_case_class, 'name') else "Unknown Test Case Name",
|
||
test_case_severity=test_case_instance.severity if test_case_instance else TestSeverity.CRITICAL, # Default to critical on error
|
||
status=ExecutedTestCaseResult.Status.ERROR,
|
||
validation_points=validation_points, # 可能包含部分成功或失败的验证点
|
||
message=f"测试用例执行时发生内部错误: {str(e)}",
|
||
duration=tc_duration
|
||
)
|
||
|
||
def _prepare_initial_request_data(self, endpoint_spec: Union[YAPIEndpoint, SwaggerEndpoint]) -> Dict[str, Any]:
|
||
"""
|
||
根据端点规格准备一个初始的请求数据结构。
|
||
返回一个包含 'path_params', 'query_params', 'headers', 'body' 的字典。
|
||
"""
|
||
self.logger.debug(f"Preparing initial request data for: {endpoint_spec.method} {endpoint_spec.path}")
|
||
|
||
# path_params_spec: List[Dict] # 用于存储从Swagger等提取的路径参数定义
|
||
# query_params_spec: List[Dict]
|
||
# headers_spec: List[Dict]
|
||
# body_schema: Optional[Dict]
|
||
|
||
# 重置/初始化这些变量,以避免跨调用共享状态(如果 APITestOrchestrator 实例被重用)
|
||
path_params_spec_list: List[Dict[str, Any]] = []
|
||
query_params_spec_list: List[Dict[str, Any]] = []
|
||
headers_spec_list: List[Dict[str, Any]] = []
|
||
body_schema_dict: Optional[Dict[str, Any]] = None
|
||
|
||
path_str = getattr(endpoint_spec, 'path', '')
|
||
|
||
if isinstance(endpoint_spec, YAPIEndpoint):
|
||
query_params_spec_list = endpoint_spec.req_query or []
|
||
headers_spec_list = endpoint_spec.req_headers or []
|
||
# YAPI 的路径参数在 req_params 中,如果用户定义了的话
|
||
if endpoint_spec.req_params:
|
||
for p in endpoint_spec.req_params:
|
||
# YAPI的req_params可能混合了路径参数和查询参数,这里只关心路径中实际存在的
|
||
# 需要从 path_str 中解析出占位符,然后匹配 req_params 中的定义
|
||
# 简化:我们假设 req_params 中的条目如果其 name 在路径占位符中,则是路径参数
|
||
# 更好的做法是 YAPI 解析器能明确区分它们
|
||
pass # 下面会统一处理路径参数
|
||
|
||
if endpoint_spec.req_body_type == 'json' and endpoint_spec.req_body_other:
|
||
try:
|
||
body_schema_dict = json.loads(endpoint_spec.req_body_other) if isinstance(endpoint_spec.req_body_other, str) else endpoint_spec.req_body_other
|
||
except json.JSONDecodeError:
|
||
self.logger.warning(f"YAPI req_body_other for {path_str} is not valid JSON: {endpoint_spec.req_body_other}")
|
||
|
||
elif isinstance(endpoint_spec, SwaggerEndpoint):
|
||
if endpoint_spec.parameters:
|
||
for param_spec in endpoint_spec.parameters:
|
||
param_in = param_spec.get('in')
|
||
if param_in == 'path':
|
||
path_params_spec_list.append(param_spec)
|
||
elif param_in == 'query':
|
||
query_params_spec_list.append(param_spec)
|
||
elif param_in == 'header':
|
||
headers_spec_list.append(param_spec)
|
||
if endpoint_spec.request_body and 'content' in endpoint_spec.request_body:
|
||
json_content_spec = endpoint_spec.request_body['content'].get('application/json', {})
|
||
if 'schema' in json_content_spec:
|
||
body_schema_dict = json_content_spec['schema']
|
||
|
||
# --- 生成路径参数数据 ---
|
||
path_params_data: Dict[str, Any] = {}
|
||
import re
|
||
# 从路径字符串中提取所有占位符名称,例如 /users/{id}/items/{itemId} -> ["id", "itemId"]
|
||
path_param_names_in_url = re.findall(r'{(.*?)}', path_str)
|
||
|
||
for p_name in path_param_names_in_url:
|
||
found_spec = None
|
||
# 尝试从 Swagger 的 path_params_spec_list 查找详细定义
|
||
for spec in path_params_spec_list:
|
||
if spec.get('name') == p_name:
|
||
found_spec = spec
|
||
break
|
||
# 尝试从 YAPI 的 req_params (如果之前有解析并填充到类似 path_params_spec_list 的结构)
|
||
# (当前YAPI的req_params未直接用于填充path_params_spec_list, 需要改进InputParser或此处逻辑)
|
||
# TODO: YAPI的req_params需要更可靠地映射到路径参数
|
||
|
||
if found_spec and isinstance(found_spec, dict):
|
||
# 如果找到参数的详细规格 (例如来自Swagger)
|
||
value = found_spec.get('example')
|
||
if value is None and found_spec.get('schema'):
|
||
value = self._generate_data_from_schema(found_spec['schema'])
|
||
path_params_data[p_name] = value if value is not None else f"example_{p_name}" # Fallback
|
||
else:
|
||
# 如果没有详细规格,生成一个通用占位符值
|
||
path_params_data[p_name] = f"example_{p_name}"
|
||
self.logger.debug(f"Path param '{p_name}' generated value: {path_params_data[p_name]}")
|
||
|
||
# --- 生成查询参数数据 ---
|
||
query_params_data: Dict[str, Any] = {}
|
||
for q_param_spec in query_params_spec_list:
|
||
name = q_param_spec.get('name')
|
||
if name:
|
||
value = q_param_spec.get('example') # Swagger/OpenAPI style
|
||
if value is None and 'value' in q_param_spec: # YAPI style (value often holds example or default)
|
||
value = q_param_spec['value']
|
||
|
||
if value is None and q_param_spec.get('schema'): # Swagger/OpenAPI schema for param
|
||
value = self._generate_data_from_schema(q_param_spec['schema'])
|
||
elif value is None and q_param_spec.get('type'): # YAPI may define type directly
|
||
# Simplified schema generation for YAPI direct type if no 'value' field
|
||
value = self._generate_data_from_schema({'type': q_param_spec.get('type')})
|
||
|
||
query_params_data[name] = value if value is not None else f"example_query_{name}"
|
||
|
||
# --- 生成请求头数据 ---
|
||
headers_data: Dict[str, str] = {"Content-Type": "application/json", "Accept": "application/json"}
|
||
for h_param_spec in headers_spec_list:
|
||
name = h_param_spec.get('name')
|
||
if name and name.lower() not in ['content-type', 'accept']: # 不要覆盖基础的Content-Type/Accept,除非明确
|
||
value = h_param_spec.get('example')
|
||
if value is None and 'value' in h_param_spec: # YAPI
|
||
value = h_param_spec['value']
|
||
|
||
if value is None and h_param_spec.get('schema'): # Swagger
|
||
value = self._generate_data_from_schema(h_param_spec['schema'])
|
||
elif value is None and h_param_spec.get('type'): # YAPI
|
||
value = self._generate_data_from_schema({'type': h_param_spec.get('type')})
|
||
|
||
if value is not None:
|
||
headers_data[name] = str(value)
|
||
else:
|
||
headers_data[name] = f"example_header_{name}"
|
||
|
||
# --- 生成请求体数据 ---
|
||
body_data: Optional[Any] = None
|
||
if body_schema_dict:
|
||
body_data = self._generate_data_from_schema(body_schema_dict)
|
||
|
||
return {
|
||
"path_params": path_params_data,
|
||
"query_params": query_params_data,
|
||
"headers": headers_data,
|
||
"body": body_data
|
||
}
|
||
|
||
def run_test_for_endpoint(self, endpoint: Union[YAPIEndpoint, SwaggerEndpoint],
|
||
global_api_spec: Union[ParsedYAPISpec, ParsedSwaggerSpec] # 新增参数
|
||
) -> TestResult: # 返回类型更新为新的TestResult (EndpointExecutionResult)
|
||
"""
|
||
运行单个API端点的所有适用测试用例。
|
||
"""
|
||
endpoint_id = f"{getattr(endpoint, 'method', 'GET').upper()} {getattr(endpoint, 'path', '/')}"
|
||
endpoint_name = getattr(endpoint, 'title', '') or getattr(endpoint, 'summary', '') or endpoint_id
|
||
|
||
self.logger.info(f"开始为端点测试: {endpoint_id} ({endpoint_name})")
|
||
|
||
# 使用新的TestResult结构 (它现在代表 EndpointExecutionResult)
|
||
endpoint_test_result = TestResult( # 这是新的 TestResult
|
||
endpoint_id=endpoint_id,
|
||
endpoint_name=endpoint_name,
|
||
# api_spec_details=endpoint.to_dict() if hasattr(endpoint, 'to_dict') else endpoint # 可选
|
||
)
|
||
|
||
if not self.test_case_registry:
|
||
self.logger.warning(f"TestCaseRegistry 未初始化,无法为端点 '{endpoint_id}' 执行自定义测试用例。")
|
||
# TODO: 决定此时的行为,是跳过,还是执行旧的规则引擎(如果保留),或者标记为错误。
|
||
# 简化:如果只想运行新的测试用例,那么这里就直接结束此端点的测试。
|
||
endpoint_test_result.overall_status = TestResult.Status.SKIPPED # 或者 ERROR
|
||
endpoint_test_result.error_message = "TestCaseRegistry 未初始化。"
|
||
endpoint_test_result.finalize_endpoint_test() # 计算持续时间等
|
||
return endpoint_test_result
|
||
|
||
applicable_test_case_classes = self.test_case_registry.get_applicable_test_cases(
|
||
endpoint_method=endpoint.method.upper(),
|
||
endpoint_path=endpoint.path
|
||
)
|
||
|
||
if not applicable_test_case_classes:
|
||
self.logger.info(f"端点 '{endpoint_id}' 没有找到适用的自定义测试用例。")
|
||
# 同样,决定行为。如果只依赖自定义测试用例,则此端点可能算作 SKIPPED 或某种形式的通过/信息。
|
||
# endpoint_test_result.overall_status = TestResult.Status.SKIPPED # 或 INFO / PASSED_NO_CASES
|
||
# endpoint_test_result.message = "没有适用的自定义测试用例。"
|
||
endpoint_test_result.finalize_endpoint_test() # 会将状态设置为ERROR并附带消息
|
||
return endpoint_test_result
|
||
|
||
self.logger.info(f"端点 '{endpoint_id}' 发现了 {len(applicable_test_case_classes)} 个适用的测试用例: {[tc.id for tc in applicable_test_case_classes]}")
|
||
|
||
for tc_class in applicable_test_case_classes:
|
||
self.logger.debug(f"准备执行测试用例 '{tc_class.id}' for '{endpoint_id}'")
|
||
executed_case_result = self._execute_single_test_case(
|
||
test_case_class=tc_class,
|
||
endpoint_spec=endpoint,
|
||
global_api_spec=global_api_spec
|
||
)
|
||
endpoint_test_result.add_executed_test_case_result(executed_case_result)
|
||
self.logger.debug(f"测试用例 '{tc_class.id}' 执行完毕,状态: {executed_case_result.status.value}")
|
||
|
||
# 所有测试用例执行完毕后,最终确定此端点的状态
|
||
endpoint_test_result.finalize_endpoint_test()
|
||
self.logger.info(f"端点 '{endpoint_id}' 测试完成,最终状态: {endpoint_test_result.overall_status.value}")
|
||
|
||
# 旧的规则引擎逻辑 (self.rule_executor) 可以选择性地在这里调用,
|
||
# 或者完全被新的 APITestCase 机制取代。
|
||
# 如果要保留,需要决定它如何与新的结果结构集成。
|
||
# 目前,为了清晰和逐步迁移,我们假设主要依赖新的 APITestCase。
|
||
|
||
return endpoint_test_result
|
||
|
||
def run_tests_from_yapi(self, yapi_file_path: str,
|
||
categories: Optional[List[str]] = None,
|
||
custom_test_cases_dir: Optional[str] = None # 新增参数
|
||
) -> TestSummary:
|
||
"""
|
||
从YAPI定义文件运行API测试
|
||
|
||
Args:
|
||
yapi_file_path: YAPI定义文件路径
|
||
categories: 要测试的API分类列表(如果为None,则测试所有分类)
|
||
custom_test_cases_dir: 自定义测试用例的目录。如果 Orchestrator 初始化时已提供,则此参数可选。
|
||
如果 Orchestrator 未提供,则必须在此处提供以加载测试用例。
|
||
如果 Orchestrator 初始化和此处都提供了,此处的优先。
|
||
|
||
Returns:
|
||
TestSummary: 测试结果摘要
|
||
"""
|
||
# 如果调用时传入了 custom_test_cases_dir,则重新初始化/更新 TestCaseRegistry
|
||
if custom_test_cases_dir and (not self.test_case_registry or self.test_case_registry.test_cases_dir != custom_test_cases_dir):
|
||
self.logger.info(f"从 run_tests_from_yapi 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}")
|
||
try:
|
||
self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir)
|
||
self.logger.info(f"TestCaseRegistry (re)initialization complete, found {len(self.test_case_registry.get_all_test_case_classes())} test case classes.")
|
||
except Exception as e:
|
||
self.logger.error(f"从 run_tests_from_yapi 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True)
|
||
# 决定是中止还是继续(可能不运行自定义测试)
|
||
# For now, if it fails here, it might proceed without custom tests if registry becomes None
|
||
|
||
|
||
self.logger.info(f"从YAPI文件加载API定义: {yapi_file_path}")
|
||
parsed_yapi = self.parser.parse_yapi_spec(yapi_file_path)
|
||
|
||
summary = TestSummary() # 使用新的 TestSummary
|
||
|
||
if not parsed_yapi:
|
||
self.logger.error(f"解析YAPI文件失败: {yapi_file_path}")
|
||
summary.finalize_summary()
|
||
return summary
|
||
|
||
endpoints_to_test = parsed_yapi.endpoints
|
||
if categories:
|
||
endpoints_to_test = [ep for ep in endpoints_to_test if ep.category_name in categories]
|
||
|
||
summary.set_total_endpoints_defined(len(endpoints_to_test))
|
||
|
||
# 计算总的适用测试用例数量 (粗略估计,实际执行时可能会因内部逻辑跳过)
|
||
total_applicable_tcs = 0
|
||
if self.test_case_registry:
|
||
for endpoint_spec in endpoints_to_test:
|
||
total_applicable_tcs += len(
|
||
self.test_case_registry.get_applicable_test_cases(
|
||
endpoint_spec.method.upper(), endpoint_spec.path
|
||
)
|
||
)
|
||
summary.set_total_test_cases_applicable(total_applicable_tcs)
|
||
|
||
|
||
for endpoint in endpoints_to_test:
|
||
# 将完整的 parsed_yapi 作为 global_api_spec 传递
|
||
result = self.run_test_for_endpoint(endpoint, global_api_spec=parsed_yapi)
|
||
summary.add_endpoint_result(result) # 使用新的 TestSummary 方法
|
||
|
||
summary.finalize_summary() # 使用新的 TestSummary 方法
|
||
return summary
|
||
|
||
def run_tests_from_swagger(self, swagger_file_path: str,
|
||
tags: Optional[List[str]] = None,
|
||
custom_test_cases_dir: Optional[str] = None # 新增参数
|
||
) -> TestSummary:
|
||
"""
|
||
从Swagger定义文件运行API测试
|
||
|
||
Args:
|
||
swagger_file_path: Swagger定义文件路径
|
||
tags: 要测试的API标签列表(如果为None,则测试所有标签)
|
||
custom_test_cases_dir: 自定义测试用例的目录。 (逻辑同 yapi 方法)
|
||
|
||
Returns:
|
||
TestSummary: 测试结果摘要
|
||
"""
|
||
if custom_test_cases_dir and (not self.test_case_registry or self.test_case_registry.test_cases_dir != custom_test_cases_dir):
|
||
self.logger.info(f"从 run_tests_from_swagger 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}")
|
||
try:
|
||
self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir)
|
||
self.logger.info(f"TestCaseRegistry (re)initialization complete, found {len(self.test_case_registry.get_all_test_case_classes())} test case classes.")
|
||
except Exception as e:
|
||
self.logger.error(f"从 run_tests_from_swagger 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True)
|
||
|
||
|
||
self.logger.info(f"从Swagger文件加载API定义: {swagger_file_path}")
|
||
parsed_swagger = self.parser.parse_swagger_spec(swagger_file_path)
|
||
|
||
summary = TestSummary() # 使用新的 TestSummary
|
||
|
||
if not parsed_swagger:
|
||
self.logger.error(f"解析Swagger文件失败: {swagger_file_path}")
|
||
summary.finalize_summary()
|
||
return summary
|
||
|
||
endpoints_to_test = parsed_swagger.endpoints
|
||
if tags:
|
||
endpoints_to_test = [ep for ep in endpoints_to_test if any(tag in ep.tags for tag in tags)]
|
||
|
||
summary.set_total_endpoints_defined(len(endpoints_to_test))
|
||
|
||
total_applicable_tcs = 0
|
||
if self.test_case_registry:
|
||
for endpoint_spec in endpoints_to_test:
|
||
total_applicable_tcs += len(
|
||
self.test_case_registry.get_applicable_test_cases(
|
||
endpoint_spec.method.upper(), endpoint_spec.path
|
||
)
|
||
)
|
||
summary.set_total_test_cases_applicable(total_applicable_tcs)
|
||
|
||
for endpoint in endpoints_to_test:
|
||
# 将完整的 parsed_swagger 作为 global_api_spec 传递
|
||
result = self.run_test_for_endpoint(endpoint, global_api_spec=parsed_swagger)
|
||
summary.add_endpoint_result(result) # 使用新的 TestSummary 方法
|
||
|
||
summary.finalize_summary() # 使用新的 TestSummary 方法
|
||
return summary
|
||
|
||
def _generate_data_from_schema(self, schema: Dict[str, Any]) -> Any:
|
||
"""
|
||
根据JSON Schema生成测试数据 (此方法基本保持不变,可能被测试用例或编排器内部使用)
|
||
|
||
Args:
|
||
schema: JSON Schema
|
||
|
||
Returns:
|
||
生成的测试数据
|
||
"""
|
||
if not schema or not isinstance(schema, dict): # 添加检查 schema 是否为 dict
|
||
self.logger.debug(f"_generate_data_from_schema: 提供的 schema 无效或为空: {schema}")
|
||
return None
|
||
|
||
schema_type = schema.get('type')
|
||
|
||
# 优先使用 example 或 default
|
||
if 'example' in schema:
|
||
return schema['example']
|
||
if 'default' in schema:
|
||
return schema['default']
|
||
|
||
if schema_type == 'object':
|
||
# ... (内容与旧版本相同,此处省略以便简洁) ...
|
||
result = {}
|
||
properties = schema.get('properties', {})
|
||
required_fields = schema.get('required', [])
|
||
|
||
for prop_name, prop_schema in properties.items():
|
||
# 如果字段是必需的,或者我们想为所有字段生成值
|
||
# 为了生成更完整的请求体,我们通常会为所有定义的属性生成值
|
||
# if prop_name in required_fields or True: # 改为总是尝试生成
|
||
result[prop_name] = self._generate_data_from_schema(prop_schema)
|
||
|
||
# 确保所有必需字段都有值,即使它们在 properties 中没有 schema(不常见,但可能)
|
||
# for req_field in required_fields:
|
||
# if req_field not in result:
|
||
# result[req_field] = "example_required_value" # 或 None
|
||
return result if result else {} # 确保返回字典
|
||
|
||
elif schema_type == 'array':
|
||
items_schema = schema.get('items', {})
|
||
# 尝试生成一个或多个项,可以使用 minItems/maxItems (简化:生成一项)
|
||
min_items = schema.get('minItems', 1 if schema.get('default') is None and schema.get('example') is None else 0) # 如果有默认或示例空数组,则可以为0
|
||
if min_items == 0 and (schema.get('default') == [] or schema.get('example') == []):
|
||
return []
|
||
|
||
num_items_to_generate = max(1, min_items) # 至少生成一项,除非minItems显式为0且无内容
|
||
|
||
generated_array = [self._generate_data_from_schema(items_schema) for _ in range(num_items_to_generate)]
|
||
# 过滤掉生成失败的 None 值,除非 schema 允许 null
|
||
# if items_schema.get('type') != 'null' and not ('null' in items_schema.get('type', []) if isinstance(items_schema.get('type'), list) else False):
|
||
# generated_array = [item for item in generated_array if item is not None]
|
||
return generated_array
|
||
|
||
elif schema_type == 'string':
|
||
string_format = schema.get('format', '')
|
||
if 'enum' in schema and schema['enum']: # 确保 enum 非空
|
||
return schema['enum'][0]
|
||
|
||
# ... (其他格式处理与旧版类似) ...
|
||
if string_format == 'date': return '2023-01-01'
|
||
if string_format == 'date-time': return datetime.datetime.now().isoformat()
|
||
if string_format == 'email': return 'test@example.com'
|
||
if string_format == 'uuid': import uuid; return str(uuid.uuid4())
|
||
# pattern, minLength, maxLength 等可以进一步细化
|
||
return schema.get('default', schema.get('example', 'example_string'))
|
||
|
||
elif schema_type == 'number' or schema_type == 'integer':
|
||
# ... (与旧版类似,优先 default/example) ...
|
||
val = schema.get('default', schema.get('example'))
|
||
if val is not None: return val
|
||
|
||
minimum = schema.get('minimum')
|
||
maximum = schema.get('maximum')
|
||
if minimum is not None: return minimum
|
||
if maximum is not None: return maximum # (如果只有max,可能需要调整)
|
||
return 0 if schema_type == 'integer' else 0.0
|
||
|
||
elif schema_type == 'boolean':
|
||
return schema.get('default', schema.get('example', False)) # 默认为 False
|
||
|
||
elif schema_type == 'null':
|
||
return None
|
||
|
||
self.logger.debug(f"_generate_data_from_schema: 未知或不支持的 schema 类型 '{schema_type}' for schema: {schema}")
|
||
return None # 对于未知类型,返回None
|
||
|
||
# ... (旧的 _build_api_request 和 _validate_response 基本可以移除了,因为它们的功能被新的流程覆盖) ...
|
||
# 确保删除或注释掉旧的 `_build_api_request` 和 `_validate_response` 方法,
|
||
# 因为它们的功能现在被 `_execute_single_test_case` 和 `_prepare_initial_request_data` 中的逻辑所取代或整合。
|
||
|
||
# python run_api_tests.py --base-url http://127.0.0.1:4523/m1/6386850-6083489-default --yapi assets/doc/井筒API示例.json --custom-test-cases-dir ./custom_testcases
|
||
# (示例命令行调用,需要更新以匹配新的参数)
|
||
|