""" 测试编排器模块 负责组合API解析器、API调用器、验证器和规则执行器,进行端到端的API测试 """ import logging import json import time from typing import Dict, List, Any, Optional, Union, Tuple, Type from enum import Enum import datetime from .input_parser.parser import InputParser, YAPIEndpoint, SwaggerEndpoint, ParsedYAPISpec, ParsedSwaggerSpec from .api_caller.caller import APICaller, APIRequest, APIResponse from .json_schema_validator.validator import JSONSchemaValidator from .test_framework_core import ValidationResult, TestSeverity, APIRequestContext, APIResponseContext, BaseAPITestCase from .test_case_registry import TestCaseRegistry # 尝试导入 LLMService,如果失败则允许,因为 LLM 功能是可选的 try: from .llm_utils.llm_service import LLMService except ImportError: LLMService = None logging.getLogger(__name__).info("LLMService 未找到,LLM 相关功能将不可用。") class ExecutedTestCaseResult: """存储单个APITestCase在其适用的端点上执行后的结果。""" class Status(str, Enum): """单个测试用例的执行状态枚举""" PASSED = "通过" FAILED = "失败" ERROR = "执行错误" # 指测试用例代码本身出错,而不是API验证失败 SKIPPED = "跳过" # 如果测试用例因某些条件被跳过执行 def __init__(self, test_case_id: str, test_case_name: str, test_case_severity: TestSeverity, status: Status, validation_points: List[ValidationResult], message: str = "", # 总体消息,例如执行错误时的错误信息 duration: float = 0.0): self.test_case_id = test_case_id self.test_case_name = test_case_name self.test_case_severity = test_case_severity self.status = status self.validation_points = validation_points or [] self.message = message self.duration = duration # 执行此测试用例的耗时 self.timestamp = datetime.datetime.now() def to_dict(self) -> Dict[str, Any]: return { "test_case_id": self.test_case_id, "test_case_name": self.test_case_name, "test_case_severity": self.test_case_severity.value, # 使用枚举值 "status": self.status.value, "message": self.message, "duration_seconds": self.duration, "timestamp": self.timestamp.isoformat(), "validation_points": [vp.details if vp.details else {"passed": vp.passed, "message": vp.message} for vp in self.validation_points] } class TestResult: # 原来的 TestResult 被重构为 EndpointExecutionResult """ 存储对单个API端点执行所有适用APITestCase后的整体测试结果。 (此类替换了旧的 TestResult 的角色,并进行了结构调整) """ class Status(str, Enum): # 这个枚举保持不变,但其含义现在是端点的整体状态 """端点测试状态枚举""" PASSED = "通过" # 所有关键测试用例通过 FAILED = "失败" # 任何一个关键测试用例失败 ERROR = "错误" # 测试执行过程中出现错误(非API本身错误,而是测试代码或环境) SKIPPED = "跳过" # 如果整个端点的测试被跳过 PARTIAL_SUCCESS = "部分成功" # 一些非关键测试用例失败,但关键的通过 def __init__(self, endpoint_id: str, # 通常是 method + path endpoint_name: str, # API 的可读名称/标题 # api_spec_details: Dict[str, Any], # 包含该端点从YAPI/Swagger解析的原始信息,可选 overall_status: Status = Status.SKIPPED, # 默认为跳过,后续根据测试用例结果更新 start_time: Optional[datetime.datetime] = None ): self.endpoint_id = endpoint_id self.endpoint_name = endpoint_name # self.api_spec_details = api_spec_details self.overall_status = overall_status self.executed_test_cases: List[ExecutedTestCaseResult] = [] self.start_time = start_time if start_time else datetime.datetime.now() self.end_time: Optional[datetime.datetime] = None self.error_message: Optional[str] = None # 如果整个端点测试出错,记录错误信息 def add_executed_test_case_result(self, result: ExecutedTestCaseResult): self.executed_test_cases.append(result) def finalize_endpoint_test(self): self.end_time = datetime.datetime.now() # 根据所有 executed_test_cases 的状态和严重性来计算 overall_status if not self.executed_test_cases and self.overall_status == TestResult.Status.SKIPPED : # 如果没有执行任何测试用例且状态仍为初始的SKIPPED pass # 保持 SKIPPED elif any(tc.status == ExecutedTestCaseResult.Status.ERROR for tc in self.executed_test_cases): self.overall_status = TestResult.Status.ERROR # 可以考虑将第一个遇到的ERROR的message赋给self.error_message first_error = next((tc.message for tc in self.executed_test_cases if tc.status == ExecutedTestCaseResult.Status.ERROR), None) if first_error: self.error_message = f"测试用例执行错误: {first_error}" else: # 筛选出失败的测试用例 failed_tcs = [tc for tc in self.executed_test_cases if tc.status == ExecutedTestCaseResult.Status.FAILED] if not failed_tcs: if not self.executed_test_cases: # 如果没有执行任何测试用例但又不是SKIPPED,可能也算某种形式的错误或特殊通过 self.overall_status = TestResult.Status.PASSED # 或者定义一个"NO_CASES_RUN"状态 else: self.overall_status = TestResult.Status.PASSED else: # 检查失败的测试用例中是否有CRITICAL或HIGH严重级别的 if any(tc.test_case_severity in [TestSeverity.CRITICAL, TestSeverity.HIGH] for tc in failed_tcs): self.overall_status = TestResult.Status.FAILED else: # 所有失败的都是 MEDIUM, LOW, INFO self.overall_status = TestResult.Status.PARTIAL_SUCCESS if not self.executed_test_cases and self.overall_status not in [TestResult.Status.SKIPPED, TestResult.Status.ERROR]: # 如果没有执行测试用例,并且不是因为错误或明确跳过,这可能是一个配置问题或意外情况 self.overall_status = TestResult.Status.ERROR # 或者一个更特定的状态 self.error_message = "没有为该端点找到或执行任何适用的测试用例。" @property def duration(self) -> float: if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return 0.0 def to_dict(self) -> Dict[str, Any]: data = { "endpoint_id": self.endpoint_id, "endpoint_name": self.endpoint_name, "overall_status": self.overall_status.value, "duration_seconds": self.duration, "start_time": self.start_time.isoformat() if self.start_time else None, "end_time": self.end_time.isoformat() if self.end_time else None, "executed_test_cases": [tc.to_dict() for tc in self.executed_test_cases] } if self.error_message: data["error_message"] = self.error_message return data class TestSummary: """测试结果摘要 (已更新以适应新的结果结构)""" def __init__(self): self.total_endpoints_defined: int = 0 # YAPI/Swagger中定义的端点总数 self.total_endpoints_tested: int = 0 # 实际执行了测试的端点数量 (至少有一个测试用例被执行) self.endpoints_passed: int = 0 self.endpoints_failed: int = 0 self.endpoints_partial_success: int = 0 self.endpoints_error: int = 0 self.endpoints_skipped: int = 0 # 由于配置或过滤器,整个端点被跳过测试 self.total_test_cases_applicable: int = 0 # 所有端点上适用测试用例的总和 self.total_test_cases_executed: int = 0 # 所有端点上实际执行的测试用例总数 self.test_cases_passed: int = 0 self.test_cases_failed: int = 0 self.test_cases_error: int = 0 # 测试用例代码本身出错 self.test_cases_skipped_in_endpoint: int = 0 # 测试用例在端点执行中被跳过 self.start_time = datetime.datetime.now() self.end_time: Optional[datetime.datetime] = None self.detailed_results: List[TestResult] = [] # 将存储新的 TestResult (EndpointExecutionResult) 对象 def add_endpoint_result(self, result: TestResult): # result 现在是新的 TestResult 类型 self.detailed_results.append(result) if result.executed_test_cases or result.overall_status not in [TestResult.Status.SKIPPED, TestResult.Status.ERROR]: # 只有实际尝试了测试的端点才算tested if not (len(result.executed_test_cases) == 0 and result.overall_status == TestResult.Status.ERROR and result.error_message and "没有为该端点找到或执行任何适用的测试用例" in result.error_message): self.total_endpoints_tested +=1 if result.overall_status == TestResult.Status.PASSED: self.endpoints_passed += 1 elif result.overall_status == TestResult.Status.FAILED: self.endpoints_failed += 1 elif result.overall_status == TestResult.Status.PARTIAL_SUCCESS: self.endpoints_partial_success +=1 elif result.overall_status == TestResult.Status.ERROR: self.endpoints_error += 1 elif result.overall_status == TestResult.Status.SKIPPED: # 端点级别跳过 self.endpoints_skipped +=1 for tc_result in result.executed_test_cases: self.total_test_cases_executed += 1 # 每个APITestCase算一次执行 if tc_result.status == ExecutedTestCaseResult.Status.PASSED: self.test_cases_passed += 1 elif tc_result.status == ExecutedTestCaseResult.Status.FAILED: self.test_cases_failed += 1 elif tc_result.status == ExecutedTestCaseResult.Status.ERROR: self.test_cases_error +=1 elif tc_result.status == ExecutedTestCaseResult.Status.SKIPPED: self.test_cases_skipped_in_endpoint +=1 def set_total_endpoints_defined(self, count: int): self.total_endpoints_defined = count def set_total_test_cases_applicable(self, count: int): self.total_test_cases_applicable = count def finalize_summary(self): self.end_time = datetime.datetime.now() @property def duration(self) -> float: if not self.end_time: return 0.0 return (self.end_time - self.start_time).total_seconds() @property def endpoint_success_rate(self) -> float: if self.total_endpoints_tested == 0: return 0.0 # 通常只把 PASSED 算作成功 return (self.endpoints_passed / self.total_endpoints_tested) * 100 @property def test_case_success_rate(self) -> float: if self.total_test_cases_executed == 0: return 0.0 return (self.test_cases_passed / self.total_test_cases_executed) * 100 def to_dict(self) -> Dict[str, Any]: return { "summary_metadata": { "start_time": self.start_time.isoformat(), "end_time": self.end_time.isoformat() if self.end_time else None, "duration_seconds": f"{self.duration:.2f}", }, "endpoint_stats": { "total_defined": self.total_endpoints_defined, "total_tested": self.total_endpoints_tested, "passed": self.endpoints_passed, "failed": self.endpoints_failed, "partial_success": self.endpoints_partial_success, "error": self.endpoints_error, "skipped": self.endpoints_skipped, "success_rate_percentage": f"{self.endpoint_success_rate:.2f}", }, "test_case_stats": { "total_applicable": self.total_test_cases_applicable, # 计划执行的测试用例总数 "total_executed": self.total_test_cases_executed, # 实际执行的测试用例总数 "passed": self.test_cases_passed, "failed": self.test_cases_failed, "error_in_execution": self.test_cases_error, "skipped_during_endpoint_execution": self.test_cases_skipped_in_endpoint, "success_rate_percentage": f"{self.test_case_success_rate:.2f}", }, "detailed_results": [result.to_dict() for result in self.detailed_results] } def to_json(self, pretty=True) -> str: indent = 2 if pretty else None return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False) def print_summary_to_console(self): # Renamed from print_summary # (Implementation can be more detailed based on the new stats) print("\\n===== 测试运行摘要 =====") print(f"开始时间: {self.start_time.isoformat()}") if self.end_time: print(f"结束时间: {self.end_time.isoformat()}") print(f"总耗时: {self.duration:.2f} 秒") print("\\n--- 端点统计 ---") print(f"定义的端点总数: {self.total_endpoints_defined}") print(f"实际测试的端点数: {self.total_endpoints_tested}") print(f" 通过: {self.endpoints_passed}") print(f" 失败: {self.endpoints_failed}") print(f" 部分成功: {self.endpoints_partial_success}") print(f" 执行错误: {self.endpoints_error}") print(f" 跳过执行: {self.endpoints_skipped}") print(f" 端点通过率: {self.endpoint_success_rate:.2f}%") print("\\n--- 测试用例统计 ---") print(f"适用的测试用例总数 (计划执行): {self.total_test_cases_applicable}") print(f"实际执行的测试用例总数: {self.total_test_cases_executed}") print(f" 通过: {self.test_cases_passed}") print(f" 失败: {self.test_cases_failed}") print(f" 执行错误 (测试用例代码问题): {self.test_cases_error}") print(f" 跳过 (在端点内被跳过): {self.test_cases_skipped_in_endpoint}") print(f" 测试用例通过率: {self.test_case_success_rate:.2f}%") # 可选:打印失败的端点和测试用例摘要 failed_endpoints = [res for res in self.detailed_results if res.overall_status == TestResult.Status.FAILED] if failed_endpoints: print("\\n--- 失败的端点摘要 ---") for ep_res in failed_endpoints: print(f" 端点: {ep_res.endpoint_id} ({ep_res.endpoint_name}) - 状态: {ep_res.overall_status.value}") for tc_res in ep_res.executed_test_cases: if tc_res.status == ExecutedTestCaseResult.Status.FAILED: print(f" - 测试用例失败: {tc_res.test_case_id} ({tc_res.test_case_name})") for vp in tc_res.validation_points: if not vp.passed: print(f" - 验证点: {vp.message}") class APITestOrchestrator: """API测试编排器""" def __init__(self, base_url: str, custom_test_cases_dir: Optional[str] = None, # 新的自定义测试用例目录路径 llm_api_key: Optional[str] = None, llm_base_url: Optional[str] = None, llm_model_name: Optional[str] = None, use_llm_for_request_body: bool = False ): """ 初始化API测试编排器 Args: base_url: API基础URL custom_test_cases_dir: 存放自定义 APITestCase 的目录路径。如果为 None,则不加载自定义测试用例。 llm_api_key: 大模型服务的API Key。 llm_base_url: 大模型服务的兼容OpenAI的基础URL。 llm_model_name: 要使用的具体模型名称。 use_llm_for_request_body: 是否使用LLM生成请求体,默认为False。 """ self.base_url = base_url.rstrip('/') self.logger = logging.getLogger(__name__) # 初始化组件 self.parser = InputParser() self.api_caller = APICaller() self.validator = JSONSchemaValidator() # JSON Schema 验证器,可能会被测试用例内部使用 # 初始化 (新) 测试用例注册表 self.test_case_registry: Optional[TestCaseRegistry] = None if custom_test_cases_dir: self.logger.info(f"初始化 TestCaseRegistry,扫描目录: {custom_test_cases_dir}") try: self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir) self.logger.info(f"TestCaseRegistry 初始化完成,发现 {len(self.test_case_registry.get_all_test_case_classes())} 个测试用例类。") except Exception as e: self.logger.error(f"初始化 TestCaseRegistry 失败: {e}", exc_info=True) else: self.logger.info("未提供 custom_test_cases_dir,不加载自定义 APITestCase。") # 初始化 LLM 服务 (如果配置了) self.llm_service: Optional[LLMService] = None self.use_llm_for_request_body = use_llm_for_request_body if LLMService is None: # 检查导入是否成功 self.logger.warning("LLMService 类未能导入,LLM 相关功能将完全禁用。") self.use_llm_for_request_body = False # 强制禁用 elif self.use_llm_for_request_body: # 只有当用户希望使用且类已导入时才尝试初始化 if llm_api_key and llm_base_url and llm_model_name: try: self.llm_service = LLMService( api_key=llm_api_key, base_url=llm_base_url, model_name=llm_model_name ) self.logger.info(f"LLMService 已成功初始化,模型: {llm_model_name}。将尝试使用LLM生成请求体。") except ValueError as ve: # LLMService init might raise ValueError for bad args self.logger.error(f"LLMService 初始化失败 (参数错误): {ve}。将回退到非LLM请求体生成。") self.llm_service = None self.use_llm_for_request_body = False # 初始化失败,禁用LLM使用 except Exception as e: self.logger.error(f"LLMService 初始化时发生未知错误: {e}。将回退到非LLM请求体生成。", exc_info=True) self.llm_service = None self.use_llm_for_request_body = False # 初始化失败,禁用LLM使用 else: self.logger.warning("希望使用LLM生成请求体,但未提供完整的LLM配置 (api_key, base_url, model_name)。将回退到非LLM请求体生成。") self.use_llm_for_request_body = False # 配置不全,禁用LLM使用 elif not self.use_llm_for_request_body: self.logger.info("配置为不使用LLM生成请求体。") def _execute_single_test_case( self, test_case_class: Type[BaseAPITestCase], endpoint_spec: Union[YAPIEndpoint, SwaggerEndpoint], # 当前端点的规格 global_api_spec: Union[ParsedYAPISpec, ParsedSwaggerSpec] # 整个API的规格 ) -> ExecutedTestCaseResult: """ 实例化并执行单个APITestCase。 """ tc_start_time = time.time() validation_points: List[ValidationResult] = [] test_case_instance: Optional[BaseAPITestCase] = None # 准备 endpoint_spec_dict endpoint_spec_dict: Dict[str, Any] if hasattr(endpoint_spec, 'to_dict') and callable(endpoint_spec.to_dict): endpoint_spec_dict = endpoint_spec.to_dict() elif isinstance(endpoint_spec, (YAPIEndpoint, SwaggerEndpoint)): endpoint_spec_dict = { "method": getattr(endpoint_spec, 'method', 'UNKNOWN_METHOD'), "path": getattr(endpoint_spec, 'path', 'UNKNOWN_PATH'), "title": getattr(endpoint_spec, 'title', ''), "summary": getattr(endpoint_spec, 'summary', ''), "_original_object_type": type(endpoint_spec).__name__ } if isinstance(endpoint_spec, YAPIEndpoint): for attr_name in dir(endpoint_spec): if not attr_name.startswith('_') and not callable(getattr(endpoint_spec, attr_name)): try: # Test serializability before adding json.dumps({attr_name: getattr(endpoint_spec, attr_name)}) endpoint_spec_dict[attr_name] = getattr(endpoint_spec, attr_name) except (TypeError, OverflowError): pass elif isinstance(endpoint_spec, SwaggerEndpoint): if hasattr(endpoint_spec, 'parameters'): endpoint_spec_dict['parameters'] = endpoint_spec.parameters if hasattr(endpoint_spec, 'request_body'): endpoint_spec_dict['request_body'] = endpoint_spec.request_body if hasattr(endpoint_spec, 'responses'): endpoint_spec_dict['responses'] = endpoint_spec.responses else: endpoint_spec_dict = endpoint_spec if isinstance(endpoint_spec, dict) else {} if not endpoint_spec_dict: self.logger.warning(f"endpoint_spec 无法转换为字典,实际类型: {type(endpoint_spec)}") global_api_spec_dict: Dict[str, Any] if hasattr(global_api_spec, 'to_dict') and callable(global_api_spec.to_dict): global_api_spec_dict = global_api_spec.to_dict() else: global_api_spec_dict = global_api_spec if isinstance(global_api_spec, dict) else {} if not global_api_spec_dict: self.logger.warning(f"global_api_spec 无法转换为字典,实际类型: {type(global_api_spec)}") try: test_case_instance = test_case_class( endpoint_spec=endpoint_spec_dict, global_api_spec=global_api_spec_dict ) test_case_instance.logger.info(f"开始执行测试用例 '{test_case_instance.id}' for endpoint '{endpoint_spec_dict.get('method')} {endpoint_spec_dict.get('path')}'") # 1. 请求构建阶段 initial_request_data = self._prepare_initial_request_data(endpoint_spec) # endpoint_spec 是原始对象 current_q_params = test_case_instance.generate_query_params(initial_request_data['query_params']) current_headers = test_case_instance.generate_headers(initial_request_data['headers']) current_body = test_case_instance.generate_request_body(initial_request_data['body']) # 路径参数应该从 initial_request_data 中获取,因为 _prepare_initial_request_data 负责生成它们 current_path_params = initial_request_data['path_params'] # 构建最终请求URL,使用 current_path_params 进行替换 final_url = self.base_url + endpoint_spec_dict.get('path', '') for p_name, p_val in current_path_params.items(): placeholder = f"{{{p_name}}}" if placeholder in final_url: final_url = final_url.replace(placeholder, str(p_val)) else: self.logger.warning(f"路径参数 '{p_name}' 在路径模板 '{endpoint_spec_dict.get('path')}' 中未找到占位符,但为其生成了值。") api_request_context = APIRequestContext( method=endpoint_spec_dict.get('method', 'GET').upper(), url=final_url, path_params=current_path_params, query_params=current_q_params, headers=current_headers, body=current_body, endpoint_spec=endpoint_spec_dict ) # 1.5. 请求预校验 validation_points.extend(test_case_instance.validate_request_url(api_request_context.url, api_request_context)) validation_points.extend(test_case_instance.validate_request_headers(api_request_context.headers, api_request_context)) validation_points.extend(test_case_instance.validate_request_body(api_request_context.body, api_request_context)) # 检查是否有严重预校验失败 critical_pre_validation_failure = False failure_messages = [] for vp in validation_points: if not vp.passed and test_case_instance.severity in [TestSeverity.CRITICAL, TestSeverity.HIGH]: critical_pre_validation_failure = True failure_messages.append(vp.message) if critical_pre_validation_failure: self.logger.warning(f"测试用例 '{test_case_instance.id}' 因请求预校验失败而中止 (严重级别: {test_case_instance.severity.value})。失败信息: {'; '.join(failure_messages)}") tc_duration = time.time() - tc_start_time return ExecutedTestCaseResult( test_case_id=test_case_instance.id, test_case_name=test_case_instance.name, test_case_severity=test_case_instance.severity, status=ExecutedTestCaseResult.Status.FAILED, # 预校验失败算作 FAILED validation_points=validation_points, message=f"请求预校验失败: {'; '.join(failure_messages)}", duration=tc_duration ) # ---- API 调用 ---- api_request_obj = APIRequest( method=api_request_context.method, url=api_request_context.url, params=api_request_context.query_params, headers=api_request_context.headers, json_data=api_request_context.body # Assuming JSON, APICaller might need to handle other types ) response_call_start_time = time.time() api_response_obj = self.api_caller.call_api(api_request_obj) response_call_elapsed_time = time.time() - response_call_start_time # ---- 响应验证 ---- # 3. 创建 APIResponseContext actual_text_content: Optional[str] = None if hasattr(api_response_obj, 'text_content') and api_response_obj.text_content is not None: # 优先尝试直接获取 actual_text_content = api_response_obj.text_content elif api_response_obj.json_content is not None: if isinstance(api_response_obj.json_content, str): actual_text_content = api_response_obj.json_content else: try: actual_text_content = json.dumps(api_response_obj.json_content, ensure_ascii=False) except TypeError: actual_text_content = str(api_response_obj.json_content) # 最后手段 # elapsed_time: 使用 response_call_elapsed_time # original_response: 设置为 None 因为 api_response_obj 没有 raw_response api_response_context = APIResponseContext( status_code=api_response_obj.status_code, headers=api_response_obj.headers, # 假设这些直接在 api_response_obj 上 json_content=api_response_obj.json_content, # 这个根据之前的错误提示是存在的 text_content=actual_text_content, elapsed_time=response_call_elapsed_time, original_response=None, # api_response_obj 没有 .raw_response 属性 request_context=api_request_context ) # 4. 执行响应验证和性能检查 validation_points.extend(test_case_instance.validate_response(api_response_context, api_request_context)) validation_points.extend(test_case_instance.check_performance(api_response_context, api_request_context)) # ---- 结果判定 ---- # 5. 判断此测试用例的最终状态 final_status = ExecutedTestCaseResult.Status.PASSED if any(not vp.passed for vp in validation_points): final_status = ExecutedTestCaseResult.Status.FAILED tc_duration = time.time() - tc_start_time return ExecutedTestCaseResult( test_case_id=test_case_instance.id, test_case_name=test_case_instance.name, test_case_severity=test_case_instance.severity, status=final_status, validation_points=validation_points, duration=tc_duration ) except Exception as e: self.logger.error(f"执行测试用例 '{test_case_class.id if test_case_instance else test_case_class.__name__}' 时发生严重错误: {e}", exc_info=True) tc_duration = time.time() - tc_start_time return ExecutedTestCaseResult( test_case_id=test_case_instance.id if test_case_instance else test_case_class.id if hasattr(test_case_class, 'id') else "unknown_tc_id", test_case_name=test_case_instance.name if test_case_instance else test_case_class.name if hasattr(test_case_class, 'name') else "Unknown Test Case Name", test_case_severity=test_case_instance.severity if test_case_instance else TestSeverity.CRITICAL, # Default to critical on error status=ExecutedTestCaseResult.Status.ERROR, validation_points=validation_points, # 可能包含部分成功或失败的验证点 message=f"测试用例执行时发生内部错误: {str(e)}", duration=tc_duration ) def _prepare_initial_request_data(self, endpoint_spec: Union[YAPIEndpoint, SwaggerEndpoint]) -> Dict[str, Any]: """ 根据端点规格准备一个初始的请求数据结构。 返回一个包含 'path_params', 'query_params', 'headers', 'body' 的字典。 """ self.logger.debug(f"Preparing initial request data for: {endpoint_spec.method} {endpoint_spec.path}") # path_params_spec: List[Dict] # 用于存储从Swagger等提取的路径参数定义 # query_params_spec: List[Dict] # headers_spec: List[Dict] # body_schema: Optional[Dict] # 重置/初始化这些变量,以避免跨调用共享状态(如果 APITestOrchestrator 实例被重用) path_params_spec_list: List[Dict[str, Any]] = [] query_params_spec_list: List[Dict[str, Any]] = [] headers_spec_list: List[Dict[str, Any]] = [] body_schema_dict: Optional[Dict[str, Any]] = None path_str = getattr(endpoint_spec, 'path', '') if isinstance(endpoint_spec, YAPIEndpoint): query_params_spec_list = endpoint_spec.req_query or [] headers_spec_list = endpoint_spec.req_headers or [] # YAPI 的路径参数在 req_params 中,如果用户定义了的话 if endpoint_spec.req_params: for p in endpoint_spec.req_params: # YAPI的req_params可能混合了路径参数和查询参数,这里只关心路径中实际存在的 # 需要从 path_str 中解析出占位符,然后匹配 req_params 中的定义 # 简化:我们假设 req_params 中的条目如果其 name 在路径占位符中,则是路径参数 # 更好的做法是 YAPI 解析器能明确区分它们 pass # 下面会统一处理路径参数 if endpoint_spec.req_body_type == 'json' and endpoint_spec.req_body_other: try: body_schema_dict = json.loads(endpoint_spec.req_body_other) if isinstance(endpoint_spec.req_body_other, str) else endpoint_spec.req_body_other except json.JSONDecodeError: self.logger.warning(f"YAPI req_body_other for {path_str} is not valid JSON: {endpoint_spec.req_body_other}") elif isinstance(endpoint_spec, SwaggerEndpoint): if endpoint_spec.parameters: for param_spec in endpoint_spec.parameters: param_in = param_spec.get('in') if param_in == 'path': path_params_spec_list.append(param_spec) elif param_in == 'query': query_params_spec_list.append(param_spec) elif param_in == 'header': headers_spec_list.append(param_spec) if endpoint_spec.request_body and 'content' in endpoint_spec.request_body: json_content_spec = endpoint_spec.request_body['content'].get('application/json', {}) if 'schema' in json_content_spec: body_schema_dict = json_content_spec['schema'] # --- 生成路径参数数据 --- path_params_data: Dict[str, Any] = {} import re # 从路径字符串中提取所有占位符名称,例如 /users/{id}/items/{itemId} -> ["id", "itemId"] path_param_names_in_url = re.findall(r'{(.*?)}', path_str) for p_name in path_param_names_in_url: found_spec = None # 尝试从 Swagger 的 path_params_spec_list 查找详细定义 for spec in path_params_spec_list: if spec.get('name') == p_name: found_spec = spec break # 尝试从 YAPI 的 req_params (如果之前有解析并填充到类似 path_params_spec_list 的结构) # (当前YAPI的req_params未直接用于填充path_params_spec_list, 需要改进InputParser或此处逻辑) # TODO: YAPI的req_params需要更可靠地映射到路径参数 if found_spec and isinstance(found_spec, dict): # 如果找到参数的详细规格 (例如来自Swagger) value = found_spec.get('example') if value is None and found_spec.get('schema'): value = self._generate_data_from_schema(found_spec['schema']) path_params_data[p_name] = value if value is not None else f"example_{p_name}" # Fallback else: # 如果没有详细规格,生成一个通用占位符值 path_params_data[p_name] = f"example_{p_name}" self.logger.debug(f"Path param '{p_name}' generated value: {path_params_data[p_name]}") # --- 生成查询参数数据 --- query_params_data: Dict[str, Any] = {} for q_param_spec in query_params_spec_list: name = q_param_spec.get('name') if name: value = q_param_spec.get('example') # Swagger/OpenAPI style if value is None and 'value' in q_param_spec: # YAPI style (value often holds example or default) value = q_param_spec['value'] if value is None and q_param_spec.get('schema'): # Swagger/OpenAPI schema for param value = self._generate_data_from_schema(q_param_spec['schema']) elif value is None and q_param_spec.get('type'): # YAPI may define type directly # Simplified schema generation for YAPI direct type if no 'value' field value = self._generate_data_from_schema({'type': q_param_spec.get('type')}) query_params_data[name] = value if value is not None else f"example_query_{name}" # --- 生成请求头数据 --- headers_data: Dict[str, str] = {"Content-Type": "application/json", "Accept": "application/json"} for h_param_spec in headers_spec_list: name = h_param_spec.get('name') if name and name.lower() not in ['content-type', 'accept']: # 不要覆盖基础的Content-Type/Accept,除非明确 value = h_param_spec.get('example') if value is None and 'value' in h_param_spec: # YAPI value = h_param_spec['value'] if value is None and h_param_spec.get('schema'): # Swagger value = self._generate_data_from_schema(h_param_spec['schema']) elif value is None and h_param_spec.get('type'): # YAPI value = self._generate_data_from_schema({'type': h_param_spec.get('type')}) if value is not None: headers_data[name] = str(value) else: headers_data[name] = f"example_header_{name}" # --- 生成请求体数据 --- body_data: Optional[Any] = None if body_schema_dict: generated_by_llm = False if self.use_llm_for_request_body and self.llm_service: self.logger.debug(f"尝试使用 LLM 为端点 {endpoint_spec.method} {endpoint_spec.path} 生成请求体。") try: # TODO: 动态创建 Pydantic 模型 (步骤2的核心) # DynamicPydanticModel = self._create_pydantic_model_from_schema(body_schema_dict, "DynamicRequestBodyModel") # if DynamicPydanticModel: # # TODO: 考虑是否需要从 endpoint_spec 中提取一些 prompt_instructions # llm_generated_body = self.llm_service.generate_parameters_from_schema( # pydantic_model_class=DynamicPydanticModel, # prompt_instructions=f"为API端点 {endpoint_spec.title or endpoint_spec.path} 生成请求体。" # ) # if llm_generated_body is not None: # body_data = llm_generated_body # generated_by_llm = True # self.logger.info(f"LLM 成功为 {endpoint_spec.method} {endpoint_spec.path} 生成了请求体。") # else: # self.logger.warning(f"LLM未能为 {endpoint_spec.method} {endpoint_spec.path} 生成请求体,将回退到默认方法。") # else: # self.logger.warning(f"未能从Schema动态创建Pydantic模型用于LLM请求体生成,将回退。") self.logger.info("LLM请求体生成部分尚未完全实现 (_create_pydantic_model_from_schema)。暂时回退。") # 临时日志 pass # 占位,直到 _create_pydantic_model_from_schema 完成 except Exception as e: self.logger.error(f"使用LLM生成请求体时发生错误: {e}。将回退到默认方法。", exc_info=True) if not generated_by_llm: # 如果未使用LLM或LLM生成失败 if self.use_llm_for_request_body and self.llm_service: # 只有在尝试过LLM之后才打印这条回退日志 self.logger.debug(f"LLM生成请求体失败或未启用,回退到基于规则的生成方法 for {endpoint_spec.method} {endpoint_spec.path}。") body_data = self._generate_data_from_schema(body_schema_dict) return { "path_params": path_params_data, "query_params": query_params_data, "headers": headers_data, "body": body_data } def run_test_for_endpoint(self, endpoint: Union[YAPIEndpoint, SwaggerEndpoint], global_api_spec: Union[ParsedYAPISpec, ParsedSwaggerSpec] # 新增参数 ) -> TestResult: # 返回类型更新为新的TestResult (EndpointExecutionResult) """ 运行单个API端点的所有适用测试用例。 """ endpoint_id = f"{getattr(endpoint, 'method', 'GET').upper()} {getattr(endpoint, 'path', '/')}" endpoint_name = getattr(endpoint, 'title', '') or getattr(endpoint, 'summary', '') or endpoint_id self.logger.info(f"开始为端点测试: {endpoint_id} ({endpoint_name})") # 使用新的TestResult结构 (它现在代表 EndpointExecutionResult) endpoint_test_result = TestResult( # 这是新的 TestResult endpoint_id=endpoint_id, endpoint_name=endpoint_name, # api_spec_details=endpoint.to_dict() if hasattr(endpoint, 'to_dict') else endpoint # 可选 ) if not self.test_case_registry: self.logger.warning(f"TestCaseRegistry 未初始化,无法为端点 '{endpoint_id}' 执行自定义测试用例。") # TODO: 决定此时的行为,是跳过,还是执行旧的规则引擎(如果保留),或者标记为错误。 # 简化:如果只想运行新的测试用例,那么这里就直接结束此端点的测试。 endpoint_test_result.overall_status = TestResult.Status.SKIPPED # 或者 ERROR endpoint_test_result.error_message = "TestCaseRegistry 未初始化。" endpoint_test_result.finalize_endpoint_test() # 计算持续时间等 return endpoint_test_result applicable_test_case_classes = self.test_case_registry.get_applicable_test_cases( endpoint_method=endpoint.method.upper(), endpoint_path=endpoint.path ) if not applicable_test_case_classes: self.logger.info(f"端点 '{endpoint_id}' 没有找到适用的自定义测试用例。") # 同样,决定行为。如果只依赖自定义测试用例,则此端点可能算作 SKIPPED 或某种形式的通过/信息。 # endpoint_test_result.overall_status = TestResult.Status.SKIPPED # 或 INFO / PASSED_NO_CASES # endpoint_test_result.message = "没有适用的自定义测试用例。" endpoint_test_result.finalize_endpoint_test() # 会将状态设置为ERROR并附带消息 return endpoint_test_result self.logger.info(f"端点 '{endpoint_id}' 发现了 {len(applicable_test_case_classes)} 个适用的测试用例: {[tc.id for tc in applicable_test_case_classes]}") for tc_class in applicable_test_case_classes: self.logger.debug(f"准备执行测试用例 '{tc_class.id}' for '{endpoint_id}'") executed_case_result = self._execute_single_test_case( test_case_class=tc_class, endpoint_spec=endpoint, global_api_spec=global_api_spec ) endpoint_test_result.add_executed_test_case_result(executed_case_result) self.logger.debug(f"测试用例 '{tc_class.id}' 执行完毕,状态: {executed_case_result.status.value}") # 所有测试用例执行完毕后,最终确定此端点的状态 endpoint_test_result.finalize_endpoint_test() self.logger.info(f"端点 '{endpoint_id}' 测试完成,最终状态: {endpoint_test_result.overall_status.value}") # 旧的规则引擎逻辑 (self.rule_executor) 可以选择性地在这里调用, # 或者完全被新的 APITestCase 机制取代。 # 如果要保留,需要决定它如何与新的结果结构集成。 # 目前,为了清晰和逐步迁移,我们假设主要依赖新的 APITestCase。 return endpoint_test_result def run_tests_from_yapi(self, yapi_file_path: str, categories: Optional[List[str]] = None, custom_test_cases_dir: Optional[str] = None # 新增参数 ) -> TestSummary: """ 从YAPI定义文件运行API测试 Args: yapi_file_path: YAPI定义文件路径 categories: 要测试的API分类列表(如果为None,则测试所有分类) custom_test_cases_dir: 自定义测试用例的目录。如果 Orchestrator 初始化时已提供,则此参数可选。 如果 Orchestrator 未提供,则必须在此处提供以加载测试用例。 如果 Orchestrator 初始化和此处都提供了,此处的优先。 Returns: TestSummary: 测试结果摘要 """ # 如果调用时传入了 custom_test_cases_dir,则重新初始化/更新 TestCaseRegistry if custom_test_cases_dir and (not self.test_case_registry or self.test_case_registry.test_cases_dir != custom_test_cases_dir): self.logger.info(f"从 run_tests_from_yapi 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}") try: self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir) self.logger.info(f"TestCaseRegistry (re)initialization complete, found {len(self.test_case_registry.get_all_test_case_classes())} test case classes.") except Exception as e: self.logger.error(f"从 run_tests_from_yapi 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True) # 决定是中止还是继续(可能不运行自定义测试) # For now, if it fails here, it might proceed without custom tests if registry becomes None self.logger.info(f"从YAPI文件加载API定义: {yapi_file_path}") parsed_yapi = self.parser.parse_yapi_spec(yapi_file_path) summary = TestSummary() # 使用新的 TestSummary if not parsed_yapi: self.logger.error(f"解析YAPI文件失败: {yapi_file_path}") summary.finalize_summary() return summary endpoints_to_test = parsed_yapi.endpoints if categories: endpoints_to_test = [ep for ep in endpoints_to_test if ep.category_name in categories] summary.set_total_endpoints_defined(len(endpoints_to_test)) # 计算总的适用测试用例数量 (粗略估计,实际执行时可能会因内部逻辑跳过) total_applicable_tcs = 0 if self.test_case_registry: for endpoint_spec in endpoints_to_test: total_applicable_tcs += len( self.test_case_registry.get_applicable_test_cases( endpoint_spec.method.upper(), endpoint_spec.path ) ) summary.set_total_test_cases_applicable(total_applicable_tcs) for endpoint in endpoints_to_test: # 将完整的 parsed_yapi 作为 global_api_spec 传递 result = self.run_test_for_endpoint(endpoint, global_api_spec=parsed_yapi) summary.add_endpoint_result(result) # 使用新的 TestSummary 方法 summary.finalize_summary() # 使用新的 TestSummary 方法 return summary def run_tests_from_swagger(self, swagger_file_path: str, tags: Optional[List[str]] = None, custom_test_cases_dir: Optional[str] = None # 新增参数 ) -> TestSummary: """ 从Swagger定义文件运行API测试 Args: swagger_file_path: Swagger定义文件路径 tags: 要测试的API标签列表(如果为None,则测试所有标签) custom_test_cases_dir: 自定义测试用例的目录。 (逻辑同 yapi 方法) Returns: TestSummary: 测试结果摘要 """ if custom_test_cases_dir and (not self.test_case_registry or self.test_case_registry.test_cases_dir != custom_test_cases_dir): self.logger.info(f"从 run_tests_from_swagger 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}") try: self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir) self.logger.info(f"TestCaseRegistry (re)initialization complete, found {len(self.test_case_registry.get_all_test_case_classes())} test case classes.") except Exception as e: self.logger.error(f"从 run_tests_from_swagger 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True) self.logger.info(f"从Swagger文件加载API定义: {swagger_file_path}") parsed_swagger = self.parser.parse_swagger_spec(swagger_file_path) summary = TestSummary() # 使用新的 TestSummary if not parsed_swagger: self.logger.error(f"解析Swagger文件失败: {swagger_file_path}") summary.finalize_summary() return summary endpoints_to_test = parsed_swagger.endpoints if tags: endpoints_to_test = [ep for ep in endpoints_to_test if any(tag in ep.tags for tag in tags)] summary.set_total_endpoints_defined(len(endpoints_to_test)) total_applicable_tcs = 0 if self.test_case_registry: for endpoint_spec in endpoints_to_test: total_applicable_tcs += len( self.test_case_registry.get_applicable_test_cases( endpoint_spec.method.upper(), endpoint_spec.path ) ) summary.set_total_test_cases_applicable(total_applicable_tcs) for endpoint in endpoints_to_test: # 将完整的 parsed_swagger 作为 global_api_spec 传递 result = self.run_test_for_endpoint(endpoint, global_api_spec=parsed_swagger) summary.add_endpoint_result(result) # 使用新的 TestSummary 方法 summary.finalize_summary() # 使用新的 TestSummary 方法 return summary def _generate_data_from_schema(self, schema: Dict[str, Any]) -> Any: """ 根据JSON Schema生成测试数据 (此方法基本保持不变,可能被测试用例或编排器内部使用) Args: schema: JSON Schema Returns: 生成的测试数据 """ if not schema or not isinstance(schema, dict): # 添加检查 schema 是否为 dict self.logger.debug(f"_generate_data_from_schema: 提供的 schema 无效或为空: {schema}") return None schema_type = schema.get('type') # 优先使用 example 或 default if 'example' in schema: return schema['example'] if 'default' in schema: return schema['default'] if schema_type == 'object': # ... (内容与旧版本相同,此处省略以便简洁) ... result = {} properties = schema.get('properties', {}) required_fields = schema.get('required', []) for prop_name, prop_schema in properties.items(): # 如果字段是必需的,或者我们想为所有字段生成值 # 为了生成更完整的请求体,我们通常会为所有定义的属性生成值 # if prop_name in required_fields or True: # 改为总是尝试生成 result[prop_name] = self._generate_data_from_schema(prop_schema) # 确保所有必需字段都有值,即使它们在 properties 中没有 schema(不常见,但可能) # for req_field in required_fields: # if req_field not in result: # result[req_field] = "example_required_value" # 或 None return result if result else {} # 确保返回字典 elif schema_type == 'array': items_schema = schema.get('items', {}) # 尝试生成一个或多个项,可以使用 minItems/maxItems (简化:生成一项) min_items = schema.get('minItems', 1 if schema.get('default') is None and schema.get('example') is None else 0) # 如果有默认或示例空数组,则可以为0 if min_items == 0 and (schema.get('default') == [] or schema.get('example') == []): return [] num_items_to_generate = max(1, min_items) # 至少生成一项,除非minItems显式为0且无内容 generated_array = [self._generate_data_from_schema(items_schema) for _ in range(num_items_to_generate)] # 过滤掉生成失败的 None 值,除非 schema 允许 null # if items_schema.get('type') != 'null' and not ('null' in items_schema.get('type', []) if isinstance(items_schema.get('type'), list) else False): # generated_array = [item for item in generated_array if item is not None] return generated_array elif schema_type == 'string': string_format = schema.get('format', '') if 'enum' in schema and schema['enum']: # 确保 enum 非空 return schema['enum'][0] # ... (其他格式处理与旧版类似) ... if string_format == 'date': return '2023-01-01' if string_format == 'date-time': return datetime.datetime.now().isoformat() if string_format == 'email': return 'test@example.com' if string_format == 'uuid': import uuid; return str(uuid.uuid4()) # pattern, minLength, maxLength 等可以进一步细化 return schema.get('default', schema.get('example', 'example_string')) elif schema_type == 'number' or schema_type == 'integer': # ... (与旧版类似,优先 default/example) ... val = schema.get('default', schema.get('example')) if val is not None: return val minimum = schema.get('minimum') maximum = schema.get('maximum') if minimum is not None: return minimum if maximum is not None: return maximum # (如果只有max,可能需要调整) return 0 if schema_type == 'integer' else 0.0 elif schema_type == 'boolean': return schema.get('default', schema.get('example', False)) # 默认为 False elif schema_type == 'null': return None self.logger.debug(f"_generate_data_from_schema: 未知或不支持的 schema 类型 '{schema_type}' for schema: {schema}") return None # 对于未知类型,返回None # ... (旧的 _build_api_request 和 _validate_response 基本可以移除了,因为它们的功能被新的流程覆盖) ... # 确保删除或注释掉旧的 `_build_api_request` 和 `_validate_response` 方法, # 因为它们的功能现在被 `_execute_single_test_case` 和 `_prepare_initial_request_data` 中的逻辑所取代或整合。 # python run_api_tests.py --base-url http://127.0.0.1:4523/m1/6386850-6083489-default --yapi assets/doc/井筒API示例.json --custom-test-cases-dir ./custom_testcases # (示例命令行调用,需要更新以匹配新的参数)