1342 lines
77 KiB
Python
1342 lines
77 KiB
Python
"""
Test orchestrator module.

Combines the API parser, API caller, validator and rule executor to run end-to-end API tests.
"""
|
||
|
||
import logging
|
||
import json
|
||
import time
|
||
import re # 添加 re 模块导入
|
||
from typing import Dict, List, Any, Optional, Union, Tuple, Type, ForwardRef
|
||
from enum import Enum
|
||
import datetime
|
||
import datetime as dt
|
||
from uuid import UUID
|
||
|
||
from pydantic import BaseModel, Field, create_model
|
||
from pydantic.networks import EmailStr
|
||
|
||
from .input_parser.parser import InputParser, YAPIEndpoint, SwaggerEndpoint, ParsedYAPISpec, ParsedSwaggerSpec
|
||
from .api_caller.caller import APICaller, APIRequest, APIResponse
|
||
from .json_schema_validator.validator import JSONSchemaValidator
|
||
from .test_framework_core import ValidationResult, TestSeverity, APIRequestContext, APIResponseContext, BaseAPITestCase
|
||
from .test_case_registry import TestCaseRegistry
|
||
# 尝试导入 LLMService,如果失败则允许,因为 LLM 功能是可选的
|
||
try:
|
||
from .llm_utils.llm_service import LLMService
|
||
except ImportError:
|
||
LLMService = None
|
||
logging.getLogger(__name__).info("LLMService 未找到,LLM 相关功能将不可用。")
|
||
|
||
# Cache for dynamically created Pydantic models to avoid redefinition issues
|
||
# Keyed by the sanitized model name that _create_pydantic_model_from_schema looks up;
# defined at module level, so entries are shared by every orchestrator in the process.
_dynamic_model_cache: Dict[str, Type[BaseModel]] = {}
|
||
|
||
class ExecutedTestCaseResult:
    """Outcome of running a single API test case against one endpoint it applies to."""

    class Status(str, Enum):
        """Execution status of an individual test case."""

        PASSED = "通过"
        FAILED = "失败"
        ERROR = "执行错误"  # the test-case code itself failed, as opposed to an API validation failure
        SKIPPED = "跳过"  # the test case was skipped because of some condition

    def __init__(self,
                 test_case_id: str,
                 test_case_name: str,
                 test_case_severity: "TestSeverity",
                 status: Status,
                 validation_points: "List[ValidationResult]",
                 message: str = "",  # overall message, e.g. the error text of an execution error
                 duration: float = 0.0):
        """Record the identity, status and validation points of one executed test case.

        Args:
            test_case_id: Identifier of the test case.
            test_case_name: Human-readable name of the test case.
            test_case_severity: Severity of the test case (its ``.value`` is serialized).
            status: Final execution status.
            validation_points: Individual validation results; a falsy value becomes ``[]``.
            message: Overall message for the run.
            duration: Time spent executing the test case, in seconds.
        """
        self.test_case_id = test_case_id
        self.test_case_name = test_case_name
        self.test_case_severity = test_case_severity
        self.status = status
        self.validation_points = validation_points if validation_points else []
        self.message = message
        self.duration = duration
        # Creation time of this result object.
        self.timestamp = datetime.datetime.now()

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation of this result.

        Each validation point is emitted as its ``details`` payload when that is
        truthy, otherwise as a minimal ``{"passed", "message"}`` record.
        """
        payload: Dict[str, Any] = {
            "test_case_id": self.test_case_id,
            "test_case_name": self.test_case_name,
            "test_case_severity": self.test_case_severity.value,  # enum value, not the member
            "status": self.status.value,
            "message": self.message,
            "duration_seconds": self.duration,
            "timestamp": self.timestamp.isoformat(),
        }
        serialized_points = []
        for point in self.validation_points:
            serialized_points.append(
                point.details or {"passed": point.passed, "message": point.message}
            )
        payload["validation_points"] = serialized_points
        return payload
|
||
|
||
class TestResult:  # the former TestResult, restructured into an endpoint-level execution result
    """Aggregated outcome of running every applicable test case against one API endpoint.

    Individual test-case results are appended with ``add_executed_test_case_result``;
    ``finalize_endpoint_test`` then derives ``overall_status`` from them.
    """

    class Status(str, Enum):
        """Overall status of an endpoint after its test cases have run."""

        PASSED = "通过"  # test cases ran and none of them failed or errored
        FAILED = "失败"  # at least one failed test case has CRITICAL or HIGH severity
        ERROR = "错误"  # a test case errored, or nothing was executed on a non-skipped endpoint
        SKIPPED = "跳过"  # the endpoint was skipped as a whole
        PARTIAL_SUCCESS = "部分成功"  # some test cases failed, none of them CRITICAL or HIGH

    # Default error text used when an endpoint ends up with no executed test cases.
    _NO_TEST_CASES_MESSAGE = "没有为该端点找到或执行任何适用的测试用例。"

    def __init__(self,
                 endpoint_id: str,  # usually method + path
                 endpoint_name: str,  # readable name/title of the API
                 overall_status: Status = Status.SKIPPED,  # updated later from the test-case results
                 start_time: Optional[datetime.datetime] = None
                 ):
        """Create an empty endpoint result.

        Args:
            endpoint_id: Identifier of the endpoint.
            endpoint_name: Readable name of the endpoint.
            overall_status: Initial status; stays ``SKIPPED`` if nothing is ever executed.
            start_time: Start of the endpoint run; defaults to the current time.
        """
        self.endpoint_id = endpoint_id
        self.endpoint_name = endpoint_name
        self.overall_status = overall_status
        self.executed_test_cases: "List[ExecutedTestCaseResult]" = []
        self.start_time = start_time if start_time else datetime.datetime.now()
        self.end_time: Optional[datetime.datetime] = None
        # Set when the endpoint run as a whole ends in an error.
        self.error_message: Optional[str] = None

    def add_executed_test_case_result(self, result: "ExecutedTestCaseResult"):
        """Append the result of one executed test case."""
        self.executed_test_cases.append(result)

    def finalize_endpoint_test(self):
        """Stamp ``end_time`` and compute ``overall_status`` from the executed test cases.

        Rules, in order:
          * nothing executed and status still ``SKIPPED`` -> stays ``SKIPPED``;
          * nothing executed otherwise -> ``ERROR``; an ``error_message`` that was
            already set is kept, otherwise the generic "no test cases" text is used;
          * any test case in ``ERROR`` -> ``ERROR`` (first errored case's message, if
            non-empty, becomes ``error_message``);
          * no failed test case -> ``PASSED``;
          * a failed case with CRITICAL/HIGH severity -> ``FAILED``;
          * otherwise -> ``PARTIAL_SUCCESS``.
        """
        self.end_time = datetime.datetime.now()
        cases = self.executed_test_cases

        if not cases:
            if self.overall_status == TestResult.Status.SKIPPED:
                return
            self.overall_status = TestResult.Status.ERROR
            # Keep a more specific message the caller may already have recorded.
            if not self.error_message:
                self.error_message = self._NO_TEST_CASES_MESSAGE
            return

        errored = [tc for tc in cases if tc.status == ExecutedTestCaseResult.Status.ERROR]
        if errored:
            self.overall_status = TestResult.Status.ERROR
            first_error = errored[0].message
            if first_error:
                self.error_message = f"测试用例执行错误: {first_error}"
            return

        failed = [tc for tc in cases if tc.status == ExecutedTestCaseResult.Status.FAILED]
        if not failed:
            self.overall_status = TestResult.Status.PASSED
        elif any(tc.test_case_severity in (TestSeverity.CRITICAL, TestSeverity.HIGH) for tc in failed):
            self.overall_status = TestResult.Status.FAILED
        else:
            # Every failure is below HIGH severity.
            self.overall_status = TestResult.Status.PARTIAL_SUCCESS

    @property
    def duration(self) -> float:
        """Seconds between ``start_time`` and ``end_time``; ``0.0`` until finalized."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation; ``error_message`` is included only when set."""
        data = {
            "endpoint_id": self.endpoint_id,
            "endpoint_name": self.endpoint_name,
            "overall_status": self.overall_status.value,
            "duration_seconds": self.duration,
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "executed_test_cases": [tc.to_dict() for tc in self.executed_test_cases]
        }
        if self.error_message:
            data["error_message"] = self.error_message
        return data
|
||
|
||
class TestSummary:
    """Run-wide roll-up of endpoint and test-case results (adapted to the new result structure)."""

    def __init__(self):
        """Start a summary with all counters at zero and the start time set to now."""
        self.total_endpoints_defined: int = 0  # endpoints defined in the YAPI/Swagger spec
        self.total_endpoints_tested: int = 0  # endpoints counted as tested by add_endpoint_result

        self.endpoints_passed: int = 0
        self.endpoints_failed: int = 0
        self.endpoints_partial_success: int = 0
        self.endpoints_error: int = 0
        self.endpoints_skipped: int = 0  # endpoints whose overall status is SKIPPED

        self.total_test_cases_applicable: int = 0  # sum of applicable test cases over all endpoints
        self.total_test_cases_executed: int = 0  # test-case results actually recorded
        self.test_cases_passed: int = 0
        self.test_cases_failed: int = 0
        self.test_cases_error: int = 0  # the test-case code itself failed
        self.test_cases_skipped_in_endpoint: int = 0  # skipped while the endpoint was being run

        self.start_time = datetime.datetime.now()
        self.end_time: Optional[datetime.datetime] = None
        self.detailed_results: "List[TestResult]" = []  # endpoint-level result objects

    def add_endpoint_result(self, result: "TestResult"):
        """Record one endpoint result and update every endpoint/test-case counter."""
        self.detailed_results.append(result)

        executed = result.executed_test_cases
        endpoint_status = result.overall_status

        # An endpoint counts as "tested" when it ran at least one test case or ended in a
        # status other than SKIPPED/ERROR; an ERROR endpoint without executed cases is
        # therefore never counted.
        if executed or endpoint_status not in (TestResult.Status.SKIPPED, TestResult.Status.ERROR):
            self.total_endpoints_tested += 1

        endpoint_counters = (
            (TestResult.Status.PASSED, "endpoints_passed"),
            (TestResult.Status.FAILED, "endpoints_failed"),
            (TestResult.Status.PARTIAL_SUCCESS, "endpoints_partial_success"),
            (TestResult.Status.ERROR, "endpoints_error"),
            (TestResult.Status.SKIPPED, "endpoints_skipped"),  # endpoint-level skip
        )
        for status, counter in endpoint_counters:
            if endpoint_status == status:
                setattr(self, counter, getattr(self, counter) + 1)
                break

        case_counters = (
            (ExecutedTestCaseResult.Status.PASSED, "test_cases_passed"),
            (ExecutedTestCaseResult.Status.FAILED, "test_cases_failed"),
            (ExecutedTestCaseResult.Status.ERROR, "test_cases_error"),
            (ExecutedTestCaseResult.Status.SKIPPED, "test_cases_skipped_in_endpoint"),
        ) if executed else ()
        for case_result in executed:
            self.total_test_cases_executed += 1  # every recorded test-case result counts once
            for status, counter in case_counters:
                if case_result.status == status:
                    setattr(self, counter, getattr(self, counter) + 1)
                    break

    def set_total_endpoints_defined(self, count: int):
        """Store the number of endpoints defined in the API spec."""
        self.total_endpoints_defined = count

    def set_total_test_cases_applicable(self, count: int):
        """Store the number of test cases planned for execution."""
        self.total_test_cases_applicable = count

    def finalize_summary(self):
        """Stamp the end time of the run."""
        self.end_time = datetime.datetime.now()

    @property
    def duration(self) -> float:
        """Run duration in seconds; ``0.0`` until ``finalize_summary`` has been called."""
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

    @property
    def endpoint_success_rate(self) -> float:
        """Percentage of tested endpoints that PASSED (only PASSED counts as success)."""
        tested = self.total_endpoints_tested
        return 0.0 if tested == 0 else (self.endpoints_passed / tested) * 100

    @property
    def test_case_success_rate(self) -> float:
        """Percentage of executed test cases that passed."""
        executed = self.total_test_cases_executed
        return 0.0 if executed == 0 else (self.test_cases_passed / executed) * 100

    def to_dict(self) -> Dict[str, Any]:
        """Return the full summary, including per-endpoint details, as a plain dict."""
        metadata = {
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "duration_seconds": f"{self.duration:.2f}",
        }
        endpoint_stats = {
            "total_defined": self.total_endpoints_defined,
            "total_tested": self.total_endpoints_tested,
            "passed": self.endpoints_passed,
            "failed": self.endpoints_failed,
            "partial_success": self.endpoints_partial_success,
            "error": self.endpoints_error,
            "skipped": self.endpoints_skipped,
            "success_rate_percentage": f"{self.endpoint_success_rate:.2f}",
        }
        test_case_stats = {
            "total_applicable": self.total_test_cases_applicable,  # planned test cases
            "total_executed": self.total_test_cases_executed,  # actually executed test cases
            "passed": self.test_cases_passed,
            "failed": self.test_cases_failed,
            "error_in_execution": self.test_cases_error,
            "skipped_during_endpoint_execution": self.test_cases_skipped_in_endpoint,
            "success_rate_percentage": f"{self.test_case_success_rate:.2f}",
        }
        return {
            "summary_metadata": metadata,
            "endpoint_stats": endpoint_stats,
            "test_case_stats": test_case_stats,
            "detailed_results": [entry.to_dict() for entry in self.detailed_results],
        }

    def to_json(self, pretty=True) -> str:
        """Serialize ``to_dict()`` to JSON, indented by two spaces when ``pretty`` is true."""
        return json.dumps(self.to_dict(), indent=2 if pretty else None, ensure_ascii=False)

    def print_summary_to_console(self):  # renamed from print_summary
        """Print the run summary, followed by a breakdown of every FAILED endpoint."""
        print("\n===== 测试运行摘要 =====")
        print(f"开始时间: {self.start_time.isoformat()}")
        if self.end_time:
            print(f"结束时间: {self.end_time.isoformat()}")
            print(f"总耗时: {self.duration:.2f} 秒")

        print("\n--- 端点统计 ---")
        print(f"定义的端点总数: {self.total_endpoints_defined}")
        print(f"实际测试的端点数: {self.total_endpoints_tested}")
        print(f" 通过: {self.endpoints_passed}")
        print(f" 失败: {self.endpoints_failed}")
        print(f" 部分成功: {self.endpoints_partial_success}")
        print(f" 执行错误: {self.endpoints_error}")
        print(f" 跳过执行: {self.endpoints_skipped}")
        print(f" 端点通过率: {self.endpoint_success_rate:.2f}%")

        print("\n--- 测试用例统计 ---")
        print(f"适用的测试用例总数 (计划执行): {self.total_test_cases_applicable}")
        print(f"实际执行的测试用例总数: {self.total_test_cases_executed}")
        print(f" 通过: {self.test_cases_passed}")
        print(f" 失败: {self.test_cases_failed}")
        print(f" 执行错误 (测试用例代码问题): {self.test_cases_error}")
        print(f" 跳过 (在端点内被跳过): {self.test_cases_skipped_in_endpoint}")
        print(f" 测试用例通过率: {self.test_case_success_rate:.2f}%")

        # Optional section: failed endpoints with their failed test cases and validation points.
        failed_endpoints = []
        for entry in self.detailed_results:
            if entry.overall_status == TestResult.Status.FAILED:
                failed_endpoints.append(entry)
        if not failed_endpoints:
            return

        print("\n--- 失败的端点摘要 ---")
        for endpoint in failed_endpoints:
            print(f" 端点: {endpoint.endpoint_id} ({endpoint.endpoint_name}) - 状态: {endpoint.overall_status.value}")
            for case in endpoint.executed_test_cases:
                if case.status != ExecutedTestCaseResult.Status.FAILED:
                    continue
                print(f" - 测试用例失败: {case.test_case_id} ({case.test_case_name})")
                for point in case.validation_points:
                    if not point.passed:
                        print(f" - 验证点: {point.message}")
|
||
|
||
class APITestOrchestrator:
|
||
"""API测试编排器"""
|
||
|
||
def __init__(self, base_url: str,
             custom_test_cases_dir: Optional[str] = None,
             llm_api_key: Optional[str] = None,
             llm_base_url: Optional[str] = None,
             llm_model_name: Optional[str] = None,
             use_llm_for_request_body: bool = False,
             use_llm_for_path_params: bool = False,
             use_llm_for_query_params: bool = False,
             use_llm_for_headers: bool = False
             ):
    """
    Set up the API test orchestrator.

    Args:
        base_url: Base URL of the API; a trailing '/' is stripped.
        custom_test_cases_dir: Directory holding custom APITestCase classes. When falsy,
            no custom test cases are loaded.
        llm_api_key: API key of the LLM service.
        llm_base_url: OpenAI-compatible base URL of the LLM service.
        llm_model_name: Name of the model to use.
        use_llm_for_request_body: Global switch for LLM-generated request bodies.
        use_llm_for_path_params: Global switch for LLM-generated path parameters.
        use_llm_for_query_params: Global switch for LLM-generated query parameters.
        use_llm_for_headers: Global switch for LLM-generated headers.
    """
    self.base_url = base_url.rstrip('/')
    log = logging.getLogger(__name__)
    self.logger = log

    # Core collaborators.
    self.parser = InputParser()
    self.api_caller = APICaller()
    self.validator = JSONSchemaValidator()  # JSON Schema validator; may be used by test cases

    # Custom test-case registry: stays None when no directory is given or loading fails.
    self.test_case_registry: Optional[TestCaseRegistry] = None
    if not custom_test_cases_dir:
        log.info("未提供 custom_test_cases_dir,不加载自定义 APITestCase。")
    else:
        log.info(f"初始化 TestCaseRegistry,扫描目录: {custom_test_cases_dir}")
        try:
            self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir)
            log.info(f"TestCaseRegistry 初始化完成,发现 {len(self.test_case_registry.get_all_test_case_classes())} 个测试用例类。")
        except Exception as e:
            log.error(f"初始化 TestCaseRegistry 失败: {e}", exc_info=True)

    # Global LLM switches as requested by the caller.
    (self.use_llm_for_request_body,
     self.use_llm_for_path_params,
     self.use_llm_for_query_params,
     self.use_llm_for_headers) = (use_llm_for_request_body,
                                  use_llm_for_path_params,
                                  use_llm_for_query_params,
                                  use_llm_for_headers)

    self.llm_service: Optional[LLMService] = None
    config_complete = llm_api_key and llm_base_url and llm_model_name
    if LLMService is None:
        # The optional import failed: force every LLM switch off.
        log.warning("LLMService 类未能导入,LLM 相关功能将完全禁用。")
        self.use_llm_for_request_body = False
        self.use_llm_for_path_params = False
        self.use_llm_for_query_params = False
        self.use_llm_for_headers = False
    elif config_complete:
        try:
            self.llm_service = LLMService(
                api_key=llm_api_key,
                base_url=llm_base_url,
                model_name=llm_model_name
            )
            log.info(f"LLMService 已成功初始化,模型: {llm_model_name}。")
        except ValueError as ve:
            log.error(f"LLMService 初始化失败 (参数错误): {ve}。LLM相关功能将不可用。")
            self.llm_service = None  # make sure no half-built service is kept
        except Exception as e:
            log.error(f"LLMService 初始化时发生未知错误: {e}。LLM相关功能将不可用。", exc_info=True)
            self.llm_service = None  # make sure no half-built service is kept
    elif LLMService:
        # The class is importable but the configuration is incomplete; llm_service stays None.
        log.warning("LLMService 类已找到,但未提供完整的LLM配置 (api_key, base_url, model_name)。LLM相关功能将不可用。")

    # Per-endpoint cache of LLM-generated parameters.
    self.llm_endpoint_params_cache: Dict[str, Dict[str, Any]] = {}
|
||
|
||
def _should_use_llm_for_param_type(
    self,
    param_type_key: str,  # e.g. "path_params", "query_params", "headers", "body"
    test_case_instance: "Optional[BaseAPITestCase]"
) -> bool:
    """
    Decide whether the LLM should be tried for one kind of request parameter.

    The test case's own flag wins whenever it is not ``None``; otherwise the
    orchestrator-wide flag applies. Returns ``False`` when no LLM service is
    available or when ``param_type_key`` is not one of the known keys.
    """
    if not self.llm_service:  # without a service the answer is always no
        return False

    # (key, orchestrator-wide flag attribute, per-test-case flag attribute)
    flag_table = (
        ("body", "use_llm_for_request_body", "use_llm_for_body"),
        ("path_params", "use_llm_for_path_params", "use_llm_for_path_params"),
        ("query_params", "use_llm_for_query_params", "use_llm_for_query_params"),
        ("headers", "use_llm_for_headers", "use_llm_for_headers"),
    )
    for known_key, global_attr, override_attr in flag_table:
        if param_type_key == known_key:
            break
    else:
        self.logger.warning(f"未知的参数类型键 '{param_type_key}' 在 _should_use_llm_for_param_type 中检查。")
        return False

    global_flag = getattr(self, global_attr)
    override: Optional[bool] = None
    if test_case_instance:
        override = getattr(test_case_instance, override_attr)

    # An explicit test-case setting (anything but None) takes precedence over the global one.
    if override is None:
        return global_flag
    return override
|
||
|
||
def _create_pydantic_model_from_schema(
    self,
    schema: Dict[str, Any],
    model_name: str,
    recursion_depth: int = 0
) -> "Optional[Type[BaseModel]]":
    """
    Build a Pydantic model class from a JSON Schema dictionary.

    Nested objects and arrays are handled by recursing into their schemas.
    Created models are stored in the module-level cache under their sanitized
    name and returned from there on later calls with the same name.

    Args:
        schema: JSON Schema dictionary; must have ``type == 'object'``.
        model_name: Desired model name; sanitized into a valid identifier.
        recursion_depth: Current nesting depth, used to stop runaway recursion.

    Returns:
        A ``BaseModel`` subclass, or ``None`` when the model cannot be created.
    """
    depth_limit = 10
    if recursion_depth > depth_limit:
        self.logger.error(f"创建Pydantic模型 '{model_name}' 时达到最大递归深度 {depth_limit}。可能存在循环引用。")
        return None

    # Turn the requested name into a valid Python identifier.
    safe_name = "".join(ch if (ch.isalnum() or ch == '_') else '_' for ch in model_name)
    starts_like_identifier = bool(safe_name) and (safe_name[0].isalpha() or safe_name[0] == '_')
    if not starts_like_identifier:
        safe_name = f"DynamicModel_{safe_name}"

    # Cache lookup uses the sanitized name.
    if safe_name in _dynamic_model_cache:
        self.logger.debug(f"从缓存返回动态模型: {safe_name}")
        return _dynamic_model_cache[safe_name]

    self.logger.debug(f"开始从Schema创建Pydantic模型: '{safe_name}' (原始名: '{model_name}', 深度: {recursion_depth})")

    if not isinstance(schema, dict) or schema.get('type') != 'object':
        # Pick something printable even when the schema is not a dict at all.
        if isinstance(schema, dict):
            seen_type = schema.get('type')
        else:
            seen_type = type(schema).__name__
        self.logger.error(f"提供的Schema用于模型 '{safe_name}' 的必须是 type 'object' 且是一个字典, 实际: {seen_type}")
        return None

    def string_type_for(fmt: Any) -> Any:
        """Map a JSON Schema string ``format`` to the Python type used for the field."""
        if fmt == 'date-time':
            return dt.datetime
        if fmt == 'date':
            return dt.date
        if fmt == 'email':
            return EmailStr
        if fmt == 'uuid':
            return UUID
        return str

    properties = schema.get('properties', {})
    required_fields = set(schema.get('required', []))
    field_definitions: Dict[str, Tuple[Any, Any]] = {}

    for prop_name, prop_schema in properties.items():
        if not isinstance(prop_schema, dict):
            self.logger.warning(f"属性 '{prop_name}' 在模型 '{safe_name}' 中的Schema无效,已跳过。")
            continue

        is_required = prop_name in required_fields
        field_args: Dict[str, Any] = {}

        # Ellipsis marks a required field without a default.
        if 'default' in prop_schema:
            default_value: Any = prop_schema['default']
        elif not is_required:
            default_value = None
        else:
            default_value = ...

        if 'description' in prop_schema:
            field_args['description'] = prop_schema['description']

        json_type = prop_schema.get('type')
        json_format = prop_schema.get('format')
        python_type: Any = Any

        if json_type == 'object':
            python_type = self._create_pydantic_model_from_schema(
                prop_schema, f"{safe_name}_{prop_name}", recursion_depth + 1
            )
            if python_type is None:
                self.logger.warning(f"无法为 '{safe_name}' 中的嵌套属性 '{prop_name}' 创建模型,已跳过。")
                continue
        elif json_type == 'array':
            items_schema = prop_schema.get('items')
            if not isinstance(items_schema, dict):
                self.logger.warning(f"数组属性 '{prop_name}' 在模型 '{safe_name}' 中的 'items' schema无效,已跳过。")
                continue

            item_json_type = items_schema.get('type')
            item_json_format = items_schema.get('format')
            item_type: Any = Any

            if item_json_type == 'object':
                item_type = self._create_pydantic_model_from_schema(
                    items_schema, f"{safe_name}_{prop_name}_Item", recursion_depth + 1
                )
                if item_type is None:
                    self.logger.warning(f"无法为 '{safe_name}' 中的数组属性 '{prop_name}' 的项创建模型,已跳过。")
                    continue
            elif item_json_type == 'string':
                item_type = string_type_for(item_json_format)
            elif item_json_type == 'integer':
                item_type = int
            elif item_json_type == 'number':
                item_type = float
            elif item_json_type == 'boolean':
                item_type = bool
            else:
                self.logger.warning(f"数组 '{prop_name}' 中的项具有未知类型 '{item_json_type}',默认为 Any。")

            python_type = List[item_type]  # type: ignore
        elif json_type == 'string':
            python_type = string_type_for(json_format)
            if 'minLength' in prop_schema:
                field_args['min_length'] = prop_schema['minLength']
            if 'maxLength' in prop_schema:
                field_args['max_length'] = prop_schema['maxLength']
            if 'pattern' in prop_schema:
                field_args['pattern'] = prop_schema['pattern']
        elif json_type == 'integer' or json_type == 'number':
            python_type = int if json_type == 'integer' else float
            if 'minimum' in prop_schema:
                field_args['ge'] = prop_schema['minimum']
            if 'maximum' in prop_schema:
                field_args['le'] = prop_schema['maximum']
        elif json_type == 'boolean':
            python_type = bool
        elif json_type is None and '$ref' in prop_schema:
            ref_target = prop_schema['$ref']
            self.logger.warning(f"Schema $ref '{ref_target}' in '{safe_name}.{prop_name}' not yet supported. Defaulting to Any.")
            python_type = Any
        else:
            self.logger.warning(f"属性 '{prop_name}' 在模型 '{safe_name}' 中具有未知类型 '{json_type}',默认为 Any。")
            python_type = Any

        # Enum values are only appended to the description; they are not enforced here.
        if 'enum' in prop_schema:
            enum_values = prop_schema['enum']
            if enum_values:
                enum_desc = f" (Enum values: {', '.join(map(str, enum_values))})"
                field_args['description'] = field_args.get('description', '') + enum_desc

        # A non-required field whose default is None becomes Optional[...] unless its
        # type is Any or is already a Union that contains NoneType.
        if not is_required and python_type is not Any and default_value is None:
            already_optional = (
                hasattr(python_type, '__origin__')
                and python_type.__origin__ is Union
                and type(None) in python_type.__args__
            )
            if not already_optional:
                python_type = Optional[python_type]

        field_definitions[prop_name] = (python_type, Field(default_value, **field_args))

    if not field_definitions:
        self.logger.warning(f"模型 '{safe_name}' 没有有效的字段定义,无法创建。")
        # An object schema without usable properties (e.g. {}) still yields a bare model.
        try:
            EmptyModel = create_model(safe_name, __base__=BaseModel)
            _dynamic_model_cache[safe_name] = EmptyModel
            self.logger.info(f"创建了一个空的动态Pydantic模型: '{safe_name}' (由于无属性定义)")
            return EmptyModel
        except Exception as e_empty:
            self.logger.error(f"尝试为 '{safe_name}' 创建空模型时失败: {e_empty}", exc_info=True)
            return None

    try:
        # Self-referencing models (forward references) are not handled here; nested
        # models are created before their parent, which covers plain nesting.
        DynamicModel = create_model(safe_name, **field_definitions, __base__=BaseModel)  # type: ignore
        _dynamic_model_cache[safe_name] = DynamicModel
        self.logger.info(f"成功创建/缓存了动态Pydantic模型: '{safe_name}'")
        return DynamicModel
    except Exception as e:
        self.logger.error(f"使用Pydantic create_model创建 '{safe_name}' 时失败: {e}", exc_info=True)
        return None
|
||
|
||
def _execute_single_test_case(
    self,
    test_case_class: Type[BaseAPITestCase],
    endpoint_spec: Union[YAPIEndpoint, SwaggerEndpoint],  # spec of the endpoint under test
    global_api_spec: Union[ParsedYAPISpec, ParsedSwaggerSpec]  # spec of the whole API
) -> ExecutedTestCaseResult:
    """
    Instantiate one test case class and run it against a single endpoint.

    Steps: convert both specs to dicts, build the initial request data, let the
    test case adjust query params / headers / body / path params, run the
    request pre-validations, call the API, then run response validation and the
    performance check.

    Args:
        test_case_class: Test case class to instantiate with the two spec dicts.
        endpoint_spec: Endpoint spec object (or dict) for the endpoint under test.
        global_api_spec: Parsed spec object (or dict) for the whole API.

    Returns:
        An ExecutedTestCaseResult with status PASSED or FAILED according to the
        validation points, FAILED when a CRITICAL/HIGH test case fails request
        pre-validation (the API is then not called), or ERROR when anything inside
        the execution raises.
    """
    tc_start_time = time.time()
    validation_points: List[ValidationResult] = []
    test_case_instance: Optional[BaseAPITestCase] = None

    # Normalise the endpoint spec to a dict so test cases and request contexts see one shape.
    endpoint_spec_dict: Dict[str, Any]
    if hasattr(endpoint_spec, 'to_dict') and callable(endpoint_spec.to_dict):
        endpoint_spec_dict = endpoint_spec.to_dict()
    elif isinstance(endpoint_spec, dict):  # already a dict
        endpoint_spec_dict = endpoint_spec
    elif isinstance(endpoint_spec, (YAPIEndpoint, SwaggerEndpoint)):  # fallback: pull fields off the object
        self.logger.debug(f"Manually converting endpoint_spec of type {type(endpoint_spec).__name__} to dict.")
        is_swagger = isinstance(endpoint_spec, SwaggerEndpoint)
        endpoint_spec_dict = {
            "method": getattr(endpoint_spec, 'method', 'UNKNOWN_METHOD'),
            "path": getattr(endpoint_spec, 'path', 'UNKNOWN_PATH'),
            "title": getattr(endpoint_spec, 'title', getattr(endpoint_spec, 'summary', '')),
            "summary": getattr(endpoint_spec, 'summary', ''),
            "description": getattr(endpoint_spec, 'description', ''),
            "operationId": getattr(endpoint_spec, 'operation_id',
                                   f"{getattr(endpoint_spec, 'method', '').upper()}_{getattr(endpoint_spec, 'path', '').replace('/', '_')}"),
            # Simplified extraction of parameters and request body.
            "parameters": getattr(endpoint_spec, 'parameters', []) if is_swagger else (getattr(endpoint_spec, 'req_query', []) + getattr(endpoint_spec, 'req_headers', [])),
            "requestBody": getattr(endpoint_spec, 'request_body', None) if is_swagger else getattr(endpoint_spec, 'req_body_other', None),
            "_original_object_type": type(endpoint_spec).__name__
        }
    else:
        endpoint_spec_dict = {}
        self.logger.warning(f"endpoint_spec无法转换为字典,实际类型: {type(endpoint_spec)}")

    global_api_spec_dict: Dict[str, Any]
    if hasattr(global_api_spec, 'to_dict') and callable(global_api_spec.to_dict):
        global_api_spec_dict = global_api_spec.to_dict()
    elif isinstance(global_api_spec, dict):
        global_api_spec_dict = global_api_spec
    else:
        global_api_spec_dict = {}
        self.logger.warning(f"global_api_spec无法转换为字典,实际类型: {type(global_api_spec)}")

    try:
        test_case_instance = test_case_class(
            endpoint_spec=endpoint_spec_dict,
            global_api_spec=global_api_spec_dict
        )
        test_case_instance.logger.info(f"开始执行测试用例 '{test_case_instance.id}' for endpoint '{endpoint_spec_dict.get('method')} {endpoint_spec_dict.get('path')}'")

        # Initial request data; the test case instance is passed along.
        method, path_params_data, query_params_data, headers_data, body_data = \
            self._prepare_initial_request_data(endpoint_spec_dict, test_case_instance=test_case_instance)

        # Give the test case the chance to modify the generated data.
        current_q_params = test_case_instance.generate_query_params(query_params_data)
        current_headers = test_case_instance.generate_headers(headers_data)
        current_body = test_case_instance.generate_request_body(body_data)

        # Path parameters come from the prepared data unless the test case overrides
        # generate_path_params. Both lookups are guarded: a missing base implementation or
        # a callable without __func__ must not turn into an execution error.
        current_path_params = path_params_data
        tc_path_generator = getattr(test_case_instance, 'generate_path_params', None)
        if callable(tc_path_generator):
            base_path_generator = getattr(BaseAPITestCase, 'generate_path_params', None)
            if getattr(tc_path_generator, '__func__', tc_path_generator) != base_path_generator:
                current_path_params = tc_path_generator(path_params_data)

        # The path template is expected to be a plain path such as /users/{id}.
        final_url_template = endpoint_spec_dict.get('path', '')
        final_url = self.base_url + final_url_template
        for p_name, p_val in current_path_params.items():
            placeholder = f"{{{p_name}}}"
            if placeholder in final_url_template:  # only substitute placeholders present in the template
                final_url = final_url.replace(placeholder, str(p_val))

        api_request_context = APIRequestContext(
            method=method,  # method returned by _prepare_initial_request_data
            url=final_url,
            path_params=current_path_params,
            query_params=current_q_params,
            headers=current_headers,
            body=current_body,
            endpoint_spec=endpoint_spec_dict
        )

        validation_points.extend(test_case_instance.validate_request_url(api_request_context.url, api_request_context))
        validation_points.extend(test_case_instance.validate_request_headers(api_request_context.headers, api_request_context))
        validation_points.extend(test_case_instance.validate_request_body(api_request_context.body, api_request_context))

        # A failed request pre-validation aborts the run only for CRITICAL/HIGH test cases;
        # the severity looked at is the test case's, not the validation point's.
        failure_messages = [vp.message for vp in validation_points if not vp.passed]
        if failure_messages and test_case_instance.severity in (TestSeverity.CRITICAL, TestSeverity.HIGH):
            self.logger.warning(f"测试用例 '{test_case_instance.id}' 因请求预校验失败而中止 (TC严重级别: {test_case_instance.severity.value})。失败信息: {'; '.join(failure_messages)}")
            tc_duration = time.time() - tc_start_time
            return ExecutedTestCaseResult(
                test_case_id=test_case_instance.id,
                test_case_name=test_case_instance.name,
                test_case_severity=test_case_instance.severity,
                status=ExecutedTestCaseResult.Status.FAILED,
                validation_points=validation_points,
                message=f"请求预校验失败: {'; '.join(failure_messages)}",
                duration=tc_duration
            )

        api_request_obj = APIRequest(
            method=api_request_context.method,
            url=api_request_context.url,
            params=api_request_context.query_params,
            headers=api_request_context.headers,
            json_data=api_request_context.body
        )

        response_call_start_time = time.time()
        api_response_obj = self.api_caller.call_api(api_request_obj)
        response_call_elapsed_time = time.time() - response_call_start_time

        # Prefer the response's own text; otherwise derive text from the JSON content.
        actual_text_content: Optional[str] = None
        if hasattr(api_response_obj, 'text_content') and api_response_obj.text_content is not None:
            actual_text_content = api_response_obj.text_content
        elif api_response_obj.json_content is not None:
            if isinstance(api_response_obj.json_content, str):  # not expected once the JSON is parsed
                actual_text_content = api_response_obj.json_content
            else:
                try:
                    actual_text_content = json.dumps(api_response_obj.json_content, ensure_ascii=False)
                except TypeError:  # content that cannot be serialized (e.g. bytes)
                    actual_text_content = str(api_response_obj.json_content)

        api_response_context = APIResponseContext(
            status_code=api_response_obj.status_code,
            headers=api_response_obj.headers,
            json_content=api_response_obj.json_content,
            text_content=actual_text_content,
            elapsed_time=response_call_elapsed_time,
            original_response=getattr(api_response_obj, 'raw_response', None),  # raw response when available
            request_context=api_request_context
        )

        validation_points.extend(test_case_instance.validate_response(api_response_context, api_request_context))
        validation_points.extend(test_case_instance.check_performance(api_response_context, api_request_context))

        final_status = ExecutedTestCaseResult.Status.PASSED
        if any(not vp.passed for vp in validation_points):
            final_status = ExecutedTestCaseResult.Status.FAILED

        tc_duration = time.time() - tc_start_time
        return ExecutedTestCaseResult(
            test_case_id=test_case_instance.id,
            test_case_name=test_case_instance.name,
            test_case_severity=test_case_instance.severity,
            status=final_status,
            validation_points=validation_points,
            duration=tc_duration
        )

    except Exception as e:
        # Boundary handler: any failure while running the test case becomes an ERROR result.
        logged_id = test_case_instance.id if test_case_instance else test_case_class.__name__
        self.logger.error(f"执行测试用例 '{logged_id}' 时发生严重错误: {e}", exc_info=True)
        tc_duration = time.time() - tc_start_time
        if test_case_instance:
            result_id = test_case_instance.id
            result_name = test_case_instance.name
            result_severity = test_case_instance.severity
        else:
            # Instantiation itself failed: fall back to class-level attributes.
            result_id = getattr(test_case_class, 'id', "unknown_tc_id")
            result_name = getattr(test_case_class, 'name', "Unknown Test Case Name")
            result_severity = TestSeverity.CRITICAL
        return ExecutedTestCaseResult(
            test_case_id=result_id,
            test_case_name=result_name,
            test_case_severity=result_severity,
            status=ExecutedTestCaseResult.Status.ERROR,
            validation_points=validation_points,
            message=f"测试用例执行时发生内部错误: {str(e)}",
            duration=tc_duration
        )
|
||
|
||
def _prepare_initial_request_data(
|
||
self,
|
||
endpoint_spec: Dict[str, Any],
|
||
test_case_instance: Optional[BaseAPITestCase] = None
|
||
) -> Tuple[str, Dict[str, Any], Dict[str, Any], Dict[str, Any], Optional[Any]]:
|
||
"""
|
||
根据OpenAPI端点规格和测试用例实例准备初始请求数据。
|
||
包含端点级别的LLM参数缓存逻辑。
|
||
"""
|
||
method = endpoint_spec.get("method", "get").upper()
|
||
operation_id = endpoint_spec.get("operationId", f"{method}_{endpoint_spec.get('path', '')}")
|
||
endpoint_cache_key = f"{method}_{endpoint_spec.get('path', '')}"
|
||
|
||
self.logger.info(f"[{operation_id}] 开始为端点 {endpoint_cache_key} 准备初始请求数据 (TC: {test_case_instance.id if test_case_instance else 'N/A'})")
|
||
|
||
# 尝试从缓存加载参数
|
||
if endpoint_cache_key in self.llm_endpoint_params_cache:
|
||
cached_params = self.llm_endpoint_params_cache[endpoint_cache_key]
|
||
self.logger.info(f"[{operation_id}] 从缓存加载了端点 '{endpoint_cache_key}' 的LLM参数。")
|
||
# 直接从缓存中获取各类参数,如果存在的话
|
||
path_params_data = cached_params.get("path_params", {})
|
||
query_params_data = cached_params.get("query_params", {})
|
||
headers_data = cached_params.get("headers", {})
|
||
body_data = cached_params.get("body") # Body可能是None
|
||
|
||
# 即使从缓存加载,仍需确保默认头部(如Accept, Content-Type)存在或被正确设置
|
||
# Content-Type应基于body_data是否存在来决定
|
||
default_headers = {"Accept": "application/json"}
|
||
if body_data is not None and method not in ["GET", "DELETE", "HEAD", "OPTIONS"]:
|
||
default_headers["Content-Type"] = "application/json"
|
||
|
||
headers_data = {**default_headers, **headers_data} # 合并,缓存中的优先
|
||
|
||
self.logger.debug(f"[{operation_id}] (缓存加载) 准备的请求数据: method={method}, path_params={path_params_data}, query_params={query_params_data}, headers={list(headers_data.keys())}, body_type={type(body_data).__name__}")
|
||
return method, path_params_data, query_params_data, headers_data, body_data
|
||
|
||
# 缓存未命中,需要生成参数
|
||
self.logger.info(f"[{operation_id}] 端点 '{endpoint_cache_key}' 的参数未在缓存中找到,开始生成。")
|
||
generated_params_for_endpoint: Dict[str, Any] = {}
|
||
|
||
path_params_data: Dict[str, Any] = {}
|
||
query_params_data: Dict[str, Any] = {}
|
||
headers_data_generated: Dict[str, Any] = {} # LLM或常规生成的,不含默认
|
||
body_data: Optional[Any] = None
|
||
|
||
# 提取各类参数的定义列表
|
||
path_params_spec_list = [p for p in endpoint_spec.get("parameters", []) if p.get("in") == "path"]
|
||
query_params_spec_list = [p for p in endpoint_spec.get("parameters", []) if p.get("in") == "query"]
|
||
headers_spec_list = [p for p in endpoint_spec.get("parameters", []) if p.get("in") == "header"]
|
||
request_body_spec = endpoint_spec.get("requestBody", {}).get("content", {}).get("application/json", {}).get("schema")
|
||
|
||
# --- 1. 处理路径参数 ---
|
||
param_type_key = "path_params"
|
||
if self._should_use_llm_for_param_type(param_type_key, test_case_instance) and path_params_spec_list:
|
||
self.logger.info(f"[{operation_id}] 尝试使用LLM生成路径参数。")
|
||
object_schema, model_name = self._build_object_schema_for_params(path_params_spec_list, f"DynamicPathParamsFor_{operation_id}")
|
||
if object_schema and model_name:
|
||
try:
|
||
PydanticModel = self._create_pydantic_model_from_schema(object_schema, model_name)
|
||
if PydanticModel:
|
||
llm_generated = self.llm_service.generate_parameters_from_schema(
|
||
PydanticModel,
|
||
prompt_instruction=f"Generate valid path parameters for API operation: {operation_id}. Description: {endpoint_spec.get('description', '') or endpoint_spec.get('summary', 'N/A')}"
|
||
)
|
||
if isinstance(llm_generated, dict):
|
||
path_params_data = llm_generated
|
||
self.logger.info(f"[{operation_id}] LLM成功生成路径参数: {path_params_data}")
|
||
else:
|
||
self.logger.warning(f"[{operation_id}] LLM为路径参数返回了非字典类型: {type(llm_generated)}。回退到常规生成。")
|
||
path_params_data = self._generate_params_from_list(path_params_spec_list, operation_id, "path")
|
||
else:
|
||
path_params_data = self._generate_params_from_list(path_params_spec_list, operation_id, "path")
|
||
except Exception as e:
|
||
self.logger.error(f"[{operation_id}] LLM生成路径参数失败: {e}。回退到常规生成。", exc_info=True)
|
||
path_params_data = self._generate_params_from_list(path_params_spec_list, operation_id, "path")
|
||
else: # _build_object_schema_for_params 返回 None
|
||
path_params_data = self._generate_params_from_list(path_params_spec_list, operation_id, "path")
|
||
else: # 不使用LLM或LLM服务不可用,或者 path_params_spec_list 为空但仍需确保path_params_data被赋值
|
||
if self._should_use_llm_for_param_type(param_type_key, test_case_instance) and not path_params_spec_list:
|
||
self.logger.info(f"[{operation_id}] 配置为路径参数使用LLM,但没有定义路径参数规格。")
|
||
# 对于不使用LLM或LLM不适用的情况,或者 spec_list 为空的情况,都执行常规生成(如果 spec_list 非空则会记录)
|
||
if path_params_spec_list and not self._should_use_llm_for_param_type(param_type_key, test_case_instance):
|
||
self.logger.info(f"[{operation_id}] 使用常规方法或LLM未启用,为路径参数。")
|
||
path_params_data = self._generate_params_from_list(path_params_spec_list, operation_id, "path")
|
||
generated_params_for_endpoint[param_type_key] = path_params_data
|
||
|
||
# --- 2. 处理查询参数 ---
|
||
param_type_key = "query_params"
|
||
if self._should_use_llm_for_param_type(param_type_key, test_case_instance) and query_params_spec_list:
|
||
self.logger.info(f"[{operation_id}] 尝试使用LLM生成查询参数。")
|
||
object_schema, model_name = self._build_object_schema_for_params(query_params_spec_list, f"DynamicQueryParamsFor_{operation_id}")
|
||
if object_schema and model_name:
|
||
try:
|
||
PydanticModel = self._create_pydantic_model_from_schema(object_schema, model_name)
|
||
if PydanticModel:
|
||
llm_generated = self.llm_service.generate_parameters_from_schema(
|
||
PydanticModel,
|
||
prompt_instruction=f"Generate valid query parameters for API operation: {operation_id}. Description: {endpoint_spec.get('description', '') or endpoint_spec.get('summary', 'N/A')}"
|
||
)
|
||
if isinstance(llm_generated, dict):
|
||
query_params_data = llm_generated
|
||
self.logger.info(f"[{operation_id}] LLM成功生成查询参数: {query_params_data}")
|
||
else:
|
||
self.logger.warning(f"[{operation_id}] LLM为查询参数返回了非字典类型: {type(llm_generated)}。回退到常规生成。")
|
||
query_params_data = self._generate_params_from_list(query_params_spec_list, operation_id, "query")
|
||
else:
|
||
query_params_data = self._generate_params_from_list(query_params_spec_list, operation_id, "query")
|
||
except Exception as e:
|
||
self.logger.error(f"[{operation_id}] LLM生成查询参数失败: {e}。回退到常规生成。", exc_info=True)
|
||
query_params_data = self._generate_params_from_list(query_params_spec_list, operation_id, "query")
|
||
else: # _build_object_schema_for_params 返回 None
|
||
query_params_data = self._generate_params_from_list(query_params_spec_list, operation_id, "query")
|
||
else: # 不使用LLM或LLM服务不可用,或者 query_params_spec_list 为空
|
||
if self._should_use_llm_for_param_type(param_type_key, test_case_instance) and not query_params_spec_list:
|
||
self.logger.info(f"[{operation_id}] 配置为查询参数使用LLM,但没有定义查询参数规格。")
|
||
if query_params_spec_list and not self._should_use_llm_for_param_type(param_type_key, test_case_instance):
|
||
self.logger.info(f"[{operation_id}] 使用常规方法或LLM未启用,为查询参数。")
|
||
query_params_data = self._generate_params_from_list(query_params_spec_list, operation_id, "query")
|
||
generated_params_for_endpoint[param_type_key] = query_params_data
|
||
|
||
# --- 3. 处理头部参数 ---
|
||
param_type_key = "headers"
|
||
if self._should_use_llm_for_param_type(param_type_key, test_case_instance) and headers_spec_list:
|
||
self.logger.info(f"[{operation_id}] 尝试使用LLM生成头部参数。")
|
||
object_schema, model_name = self._build_object_schema_for_params(headers_spec_list, f"DynamicHeadersFor_{operation_id}")
|
||
if object_schema and model_name:
|
||
try:
|
||
PydanticModel = self._create_pydantic_model_from_schema(object_schema, model_name)
|
||
if PydanticModel:
|
||
llm_generated = self.llm_service.generate_parameters_from_schema(
|
||
PydanticModel,
|
||
prompt_instruction=f"Generate valid HTTP headers for API operation: {operation_id}. Description: {endpoint_spec.get('description', '') or endpoint_spec.get('summary', 'N/A')}"
|
||
)
|
||
if isinstance(llm_generated, dict):
|
||
headers_data_generated = llm_generated # Store LLM generated ones separately first
|
||
self.logger.info(f"[{operation_id}] LLM成功生成头部参数: {headers_data_generated}")
|
||
else:
|
||
self.logger.warning(f"[{operation_id}] LLM为头部参数返回了非字典类型: {type(llm_generated)}。回退到常规生成。")
|
||
headers_data_generated = self._generate_params_from_list(headers_spec_list, operation_id, "header")
|
||
else:
|
||
headers_data_generated = self._generate_params_from_list(headers_spec_list, operation_id, "header")
|
||
except Exception as e:
|
||
self.logger.error(f"[{operation_id}] LLM生成头部参数失败: {e}。回退到常规生成。", exc_info=True)
|
||
headers_data_generated = self._generate_params_from_list(headers_spec_list, operation_id, "header")
|
||
else: # _build_object_schema_for_params 返回 None
|
||
headers_data_generated = self._generate_params_from_list(headers_spec_list, operation_id, "header")
|
||
else: # 不使用LLM或LLM服务不可用,或者 headers_spec_list 为空
|
||
if self._should_use_llm_for_param_type(param_type_key, test_case_instance) and not headers_spec_list:
|
||
self.logger.info(f"[{operation_id}] 配置为头部参数使用LLM,但没有定义头部参数规格。")
|
||
if headers_spec_list and not self._should_use_llm_for_param_type(param_type_key, test_case_instance):
|
||
self.logger.info(f"[{operation_id}] 使用常规方法或LLM未启用,为头部参数。")
|
||
headers_data_generated = self._generate_params_from_list(headers_spec_list, operation_id, "header")
|
||
generated_params_for_endpoint[param_type_key] = headers_data_generated
|
||
|
||
# --- 4. 处理请求体 ---
|
||
param_type_key = "body"
|
||
if self._should_use_llm_for_param_type(param_type_key, test_case_instance) and request_body_spec:
|
||
self.logger.info(f"[{operation_id}] 尝试使用LLM生成请求体。")
|
||
model_name = f"DynamicBodyFor_{operation_id}"
|
||
try:
|
||
PydanticModel = self._create_pydantic_model_from_schema(request_body_spec, model_name)
|
||
if PydanticModel:
|
||
llm_generated_body = self.llm_service.generate_parameters_from_schema(
|
||
PydanticModel,
|
||
prompt_instruction=f"Generate a valid JSON request body for API operation: {operation_id}. Description: {endpoint_spec.get('description', '') or endpoint_spec.get('summary', 'N/A')}. Schema: {json.dumps(request_body_spec, indent=2)}"
|
||
)
|
||
if isinstance(llm_generated_body, dict):
|
||
try:
|
||
body_data = PydanticModel(**llm_generated_body).model_dump(by_alias=True)
|
||
self.logger.info(f"[{operation_id}] LLM成功生成并验证请求体。")
|
||
except ValidationError as ve:
|
||
self.logger.error(f"[{operation_id}] LLM生成的请求体未能通过Pydantic模型验证: {ve}。回退到常规生成。")
|
||
body_data = self._generate_data_from_schema(request_body_spec, "requestBody", operation_id)
|
||
elif isinstance(llm_generated_body, BaseModel): # LLM直接返回模型实例
|
||
body_data = llm_generated_body.model_dump(by_alias=True)
|
||
self.logger.info(f"[{operation_id}] LLM成功生成请求体 (模型实例)。")
|
||
else:
|
||
self.logger.warning(f"[{operation_id}] LLM为请求体返回了非预期类型: {type(llm_generated_body)}。回退到常规生成。")
|
||
body_data = self._generate_data_from_schema(request_body_spec, "requestBody", operation_id)
|
||
else: # _create_pydantic_model_from_schema 返回 None
|
||
self.logger.warning(f"[{operation_id}] 未能为请求体创建Pydantic模型。回退到常规生成。")
|
||
body_data = self._generate_data_from_schema(request_body_spec, "requestBody", operation_id)
|
||
except Exception as e:
|
||
self.logger.error(f"[{operation_id}] LLM生成请求体失败: {e}。回退到常规生成。", exc_info=True)
|
||
body_data = self._generate_data_from_schema(request_body_spec, "requestBody", operation_id)
|
||
elif request_body_spec: # 不使用LLM但有body spec
|
||
self.logger.info(f"[{operation_id}] 使用常规方法或LLM未启用/不适用,为请求体。")
|
||
body_data = self._generate_data_from_schema(request_body_spec, "requestBody", operation_id)
|
||
else: # 没有requestBody定义
|
||
self.logger.info(f"[{operation_id}] 端点没有定义请求体。")
|
||
body_data = None # 明确设为None
|
||
generated_params_for_endpoint[param_type_key] = body_data
|
||
|
||
# 合并最终的头部 (默认头部 + 生成的头部)
|
||
final_headers = {"Accept": "application/json"}
|
||
if body_data is not None and method not in ["GET", "DELETE", "HEAD", "OPTIONS"]:
|
||
final_headers["Content-Type"] = "application/json"
|
||
final_headers.update(headers_data_generated) # headers_data_generated 是从LLM或常规生成的
|
||
|
||
# 将本次生成的所有参数存入缓存
|
||
self.llm_endpoint_params_cache[endpoint_cache_key] = generated_params_for_endpoint
|
||
self.logger.info(f"[{operation_id}] 端点 '{endpoint_cache_key}' 的参数已生成并存入缓存。")
|
||
|
||
# 确保路径参数中的值都是字符串 (URL部分必须是字符串)
|
||
path_params_data_str = {k: str(v) if v is not None else "" for k, v in path_params_data.items()}
|
||
|
||
self.logger.debug(f"[{operation_id}] (新生成) 准备的请求数据: method={method}, path_params={path_params_data_str}, query_params={query_params_data}, headers={list(final_headers.keys())}, body_type={type(body_data).__name__}")
|
||
return method, path_params_data_str, query_params_data, final_headers, body_data
|
||
|
||
def _build_object_schema_for_params(self, params_spec_list: List[Dict[str, Any]], model_name_base: str) -> Tuple[Optional[Dict[str, Any]], str]:
|
||
"""
|
||
将参数列表 (如路径参数、查询参数列表) 转换为一个单一的 "type: object" JSON schema,
|
||
以便用于创建 Pydantic 模型。
|
||
会尝试适配参数定义中缺少嵌套 'schema' 字段但有顶层 'type' 的情况。
|
||
"""
|
||
if not params_spec_list:
|
||
return None, model_name_base
|
||
|
||
properties = {}
|
||
required_params = []
|
||
|
||
parameter_names = []
|
||
for param_spec in params_spec_list:
|
||
param_name = param_spec.get("name")
|
||
if not param_name:
|
||
self.logger.warning(f"参数定义缺少 'name' 字段: {param_spec}。已跳过。")
|
||
continue
|
||
parameter_names.append(param_name)
|
||
|
||
param_schema = param_spec.get("schema")
|
||
|
||
# ---- 适配开始 ----
|
||
if not param_schema and param_spec.get("type"):
|
||
self.logger.debug(f"参数 '{param_name}' 缺少嵌套 'schema' 字段,尝试从顶层 'type' 构建临时schema。 Param spec: {param_spec}")
|
||
temp_schema = {"type": param_spec.get("type")}
|
||
# 从 param_spec 顶层提取其他相关字段到 temp_schema
|
||
for key in ["format", "default", "example", "description", "enum",
|
||
"minimum", "maximum", "minLength", "maxLength", "pattern",
|
||
"items"]: # items 用于处理顶层定义的array
|
||
if key in param_spec:
|
||
temp_schema[key] = param_spec[key]
|
||
param_schema = temp_schema
|
||
# ---- 适配结束 ----
|
||
|
||
if not param_schema: # 如果适配后仍然没有schema
|
||
self.logger.warning(f"参数 '{param_name}' 缺少 'schema' 定义且无法从顶层构建: {param_spec}。已跳过。")
|
||
continue
|
||
|
||
# 处理 $ref (简单情况,假设ref在components.schemas)
|
||
# 更复杂的 $ref 解析可能需要访问完整的OpenAPI文档
|
||
if isinstance(param_schema, dict) and "$ref" in param_schema: # 确保 param_schema 是字典再检查 $ref
|
||
ref_path = param_schema["$ref"]
|
||
# 这是一个非常简化的$ref处理,实际可能需要解析整个文档
|
||
self.logger.warning(f"参数 '{param_name}' 的 schema 包含 $ref '{ref_path}',当前不支持自动解析。请确保schema是内联的。")
|
||
# 可以尝试提供一个非常基础的schema,或者跳过这个参数,或者让_generate_data_from_schema处理
|
||
properties[param_name] = {"type": "string", "description": f"Reference to {ref_path}"}
|
||
elif isinstance(param_schema, dict): # 确保 param_schema 是字典
|
||
properties[param_name] = param_schema
|
||
else:
|
||
self.logger.warning(f"参数 '{param_name}' 的 schema 不是一个有效的字典: {param_schema}。已跳过。")
|
||
continue
|
||
|
||
if param_spec.get("required", False):
|
||
required_params.append(param_name)
|
||
|
||
if not properties: # 如果所有参数都无效
|
||
return None, model_name_base
|
||
|
||
model_name = f"{model_name_base}_{'_'.join(sorted(parameter_names))}" # 使模型名更具唯一性
|
||
|
||
object_schema = {
|
||
"type": "object",
|
||
"properties": properties,
|
||
}
|
||
if required_params:
|
||
object_schema["required"] = required_params
|
||
|
||
self.logger.debug(f"[{model_name_base}] 为参数集 {parameter_names} 构建的最终 Object Schema: {json.dumps(object_schema, indent=2)}, 模型名: {model_name}")
|
||
return object_schema, model_name
|
||
|
||
def _generate_params_from_list(self, params_spec_list: List[Dict[str, Any]], operation_id: str, param_type: str) -> Dict[str, Any]:
|
||
"""
|
||
遍历参数定义列表,使用 _generate_data_from_schema 为每个参数生成数据。
|
||
会尝试适配参数定义中缺少嵌套 'schema' 字段但有顶层 'type' 的情况。
|
||
"""
|
||
generated_params: Dict[str, Any] = {}
|
||
if not params_spec_list:
|
||
self.logger.info(f"[{operation_id}] 没有定义 {param_type} 参数。")
|
||
return generated_params
|
||
|
||
self.logger.info(f"[{operation_id}] 使用常规方法生成 {param_type} 参数。")
|
||
for param_spec in params_spec_list:
|
||
param_name = param_spec.get("name")
|
||
param_schema = param_spec.get("schema")
|
||
|
||
# ---- 适配开始 ----
|
||
if not param_schema and param_spec.get("type"):
|
||
self.logger.debug(f"参数 '{param_name}' ('{param_type}' 类型) 缺少嵌套 'schema' 字段,尝试从顶层 'type' 构建临时schema用于常规生成。 Param spec: {param_spec}")
|
||
temp_schema = {"type": param_spec.get("type")}
|
||
# 从 param_spec 顶层提取其他相关字段到 temp_schema
|
||
for key in ["format", "default", "example", "description", "enum",
|
||
"minimum", "maximum", "minLength", "maxLength", "pattern",
|
||
"items"]: # items 用于处理顶层定义的array
|
||
if key in param_spec:
|
||
temp_schema[key] = param_spec[key]
|
||
param_schema = temp_schema
|
||
# ---- 适配结束 ----
|
||
|
||
if param_name and param_schema and isinstance(param_schema, dict): # 确保param_schema是字典
|
||
generated_value = self._generate_data_from_schema(
|
||
param_schema,
|
||
context_name=f"{param_type} parameter '{param_name}'",
|
||
operation_id=operation_id
|
||
)
|
||
if generated_value is not None:
|
||
generated_params[param_name] = generated_value
|
||
elif param_spec.get("required"):
|
||
self.logger.warning(f"[{operation_id}] 未能为必需的 {param_type} 参数 '{param_name}' 生成数据 (schema: {param_schema}),且其 schema 中可能没有有效的默认值或示例。")
|
||
else:
|
||
self.logger.warning(f"[{operation_id}] 跳过无效的 {param_type} 参数定义 (名称: {param_name}, schema: {param_schema}): {param_spec}")
|
||
self.logger.info(f"[{operation_id}] 常规方法生成的 {param_type} 参数: {generated_params}")
|
||
return generated_params
|
||
|
||
def run_test_for_endpoint(self, endpoint: Union[YAPIEndpoint, SwaggerEndpoint],
                          global_api_spec: Union[ParsedYAPISpec, ParsedSwaggerSpec]
                          ) -> TestResult:
    """
    Run every applicable registered test case against one endpoint.

    Args:
        endpoint: Endpoint definition; 'method', 'path' and optionally
            'title' / 'summary' are read from it.
        global_api_spec: The parsed spec the endpoint belongs to; forwarded to
            each test case execution.

    Returns:
        The finalized TestResult for the endpoint. It is marked SKIPPED when no
        test case registry is available.
    """
    http_method = getattr(endpoint, 'method', 'GET').upper()
    endpoint_id = f"{http_method} {getattr(endpoint, 'path', '/')}"
    # Prefer a human-readable title/summary; fall back to the id.
    endpoint_name = getattr(endpoint, 'title', '') or getattr(endpoint, 'summary', '') or endpoint_id

    self.logger.info(f"开始为端点测试: {endpoint_id} ({endpoint_name})")

    outcome = TestResult(
        endpoint_id=endpoint_id,
        endpoint_name=endpoint_name,
    )

    registry = self.test_case_registry
    if not registry:
        self.logger.warning(f"TestCaseRegistry 未初始化,无法为端点 '{endpoint_id}' 执行自定义测试用例。")
        outcome.overall_status = TestResult.Status.SKIPPED
        outcome.error_message = "TestCaseRegistry 未初始化。"
        outcome.finalize_endpoint_test()
        return outcome

    candidate_classes = registry.get_applicable_test_cases(
        endpoint_method=endpoint.method.upper(),
        endpoint_path=endpoint.path
    )

    if not candidate_classes:
        self.logger.info(f"端点 '{endpoint_id}' 没有找到适用的自定义测试用例。")
        outcome.finalize_endpoint_test()
        return outcome

    candidate_ids = [candidate.id for candidate in candidate_classes]
    self.logger.info(f"端点 '{endpoint_id}' 发现了 {len(candidate_classes)} 个适用的测试用例: {candidate_ids}")

    for case_class in candidate_classes:
        self.logger.debug(f"准备执行测试用例 '{case_class.id}' for '{endpoint_id}'")
        case_result = self._execute_single_test_case(
            test_case_class=case_class,
            endpoint_spec=endpoint,
            global_api_spec=global_api_spec
        )
        outcome.add_executed_test_case_result(case_result)
        self.logger.debug(f"测试用例 '{case_class.id}' 执行完毕,状态: {case_result.status.value}")

    outcome.finalize_endpoint_test()
    self.logger.info(f"端点 '{endpoint_id}' 测试完成,最终状态: {outcome.overall_status.value}")

    return outcome
|
||
|
||
def run_tests_from_yapi(self, yapi_file_path: str,
                        categories: Optional[List[str]] = None,
                        custom_test_cases_dir: Optional[str] = None
                        ) -> TestSummary:
    """
    Load API definitions from a YAPI file and test each selected endpoint.

    Args:
        yapi_file_path: Path of the YAPI definition file.
        categories: When given, only endpoints whose ``category_name`` is in
            this list are tested.
        custom_test_cases_dir: When given and different from the directory of
            the current registry (or no registry exists), the test case
            registry is rebuilt from it. A rebuild failure is logged and the
            run continues with whatever registry is currently set.

    Returns:
        The finalized TestSummary; it contains no endpoint results when the
        file cannot be parsed.
    """
    if custom_test_cases_dir:
        current_registry = self.test_case_registry
        if not current_registry or current_registry.test_cases_dir != custom_test_cases_dir:
            self.logger.info(f"从 run_tests_from_yapi 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}")
            try:
                self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir)
                self.logger.info(f"TestCaseRegistry (re)initialization complete, found {len(self.test_case_registry.get_all_test_case_classes())} test case classes.")
            except Exception as e:
                self.logger.error(f"从 run_tests_from_yapi 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True)

    self.logger.info(f"从YAPI文件加载API定义: {yapi_file_path}")
    parsed_yapi = self.parser.parse_yapi_spec(yapi_file_path)
    summary = TestSummary()

    if not parsed_yapi:
        self.logger.error(f"解析YAPI文件失败: {yapi_file_path}")
        summary.finalize_summary()
        return summary

    selected_endpoints = parsed_yapi.endpoints
    if categories:
        selected_endpoints = [candidate for candidate in selected_endpoints if candidate.category_name in categories]

    summary.set_total_endpoints_defined(len(selected_endpoints))

    # Count the applicable test cases up front so the summary knows the total.
    applicable_count = 0
    if self.test_case_registry:
        applicable_count = sum(
            len(self.test_case_registry.get_applicable_test_cases(candidate.method.upper(), candidate.path))
            for candidate in selected_endpoints
        )
    summary.set_total_test_cases_applicable(applicable_count)

    for candidate in selected_endpoints:
        summary.add_endpoint_result(self.run_test_for_endpoint(candidate, global_api_spec=parsed_yapi))

    summary.finalize_summary()
    return summary
|
||
|
||
def run_tests_from_swagger(self, swagger_file_path: str,
                           tags: Optional[List[str]] = None,
                           custom_test_cases_dir: Optional[str] = None
                           ) -> TestSummary:
    """
    Load API definitions from a Swagger file and test each selected endpoint.

    Args:
        swagger_file_path: Path of the Swagger definition file.
        tags: When given, only endpoints carrying at least one of these tags
            are tested.
        custom_test_cases_dir: When given and different from the directory of
            the current registry (or no registry exists), the test case
            registry is rebuilt from it. A rebuild failure is logged and the
            run continues with whatever registry is currently set.

    Returns:
        The finalized TestSummary; it contains no endpoint results when the
        file cannot be parsed.
    """
    if custom_test_cases_dir:
        current_registry = self.test_case_registry
        if not current_registry or current_registry.test_cases_dir != custom_test_cases_dir:
            self.logger.info(f"从 run_tests_from_swagger 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}")
            try:
                self.test_case_registry = TestCaseRegistry(test_cases_dir=custom_test_cases_dir)
                self.logger.info(f"TestCaseRegistry (re)initialization complete, found {len(self.test_case_registry.get_all_test_case_classes())} test case classes.")
            except Exception as e:
                self.logger.error(f"从 run_tests_from_swagger 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True)

    self.logger.info(f"从Swagger文件加载API定义: {swagger_file_path}")
    parsed_swagger = self.parser.parse_swagger_spec(swagger_file_path)
    summary = TestSummary()

    if not parsed_swagger:
        self.logger.error(f"解析Swagger文件失败: {swagger_file_path}")
        summary.finalize_summary()
        return summary

    selected_endpoints = parsed_swagger.endpoints
    if tags:
        selected_endpoints = [
            candidate for candidate in selected_endpoints
            if any(wanted in candidate.tags for wanted in tags)
        ]

    summary.set_total_endpoints_defined(len(selected_endpoints))

    # Count the applicable test cases up front so the summary knows the total.
    applicable_count = 0
    if self.test_case_registry:
        applicable_count = sum(
            len(self.test_case_registry.get_applicable_test_cases(candidate.method.upper(), candidate.path))
            for candidate in selected_endpoints
        )
    summary.set_total_test_cases_applicable(applicable_count)

    for candidate in selected_endpoints:
        summary.add_endpoint_result(self.run_test_for_endpoint(candidate, global_api_spec=parsed_swagger))

    summary.finalize_summary()
    return summary
|
||
|
||
def _generate_data_from_schema(self, schema: Dict[str, Any],
|
||
context_name: Optional[str] = None,
|
||
operation_id: Optional[str] = None) -> Any:
|
||
"""
|
||
根据JSON Schema生成测试数据 (此方法基本保持不变,可能被测试用例或编排器内部使用)
|
||
增加了 context_name 和 operation_id 用于更详细的日志。
|
||
"""
|
||
log_prefix = f"[{operation_id}] " if operation_id else ""
|
||
context_log = f" (context: {context_name})" if context_name else ""
|
||
|
||
if not schema or not isinstance(schema, dict):
|
||
self.logger.debug(f"{log_prefix}_generate_data_from_schema: 提供的 schema 无效或为空{context_log}: {schema}")
|
||
return None
|
||
|
||
schema_type = schema.get('type')
|
||
|
||
if 'example' in schema:
|
||
self.logger.debug(f"{log_prefix}使用 schema 中的 'example' 值 for{context_log}: {schema['example']}")
|
||
return schema['example']
|
||
if 'default' in schema:
|
||
self.logger.debug(f"{log_prefix}使用 schema 中的 'default' 值 for{context_log}: {schema['default']}")
|
||
return schema['default']
|
||
|
||
if schema_type == 'object':
|
||
result = {}
|
||
properties = schema.get('properties', {})
|
||
self.logger.debug(f"{log_prefix}生成 object 类型数据 for{context_log}. Properties: {list(properties.keys())}")
|
||
for prop_name, prop_schema in properties.items():
|
||
# 递归调用时传递上下文,但稍微修改一下 context_name
|
||
nested_context = f"{context_name}.{prop_name}" if context_name else prop_name
|
||
result[prop_name] = self._generate_data_from_schema(prop_schema, nested_context, operation_id)
|
||
return result if result else {}
|
||
|
||
elif schema_type == 'array':
|
||
items_schema = schema.get('items', {})
|
||
min_items = schema.get('minItems', 1 if schema.get('default') is None and schema.get('example') is None else 0)
|
||
self.logger.debug(f"{log_prefix}生成 array 类型数据 for{context_log}. Items schema: {items_schema}, minItems: {min_items}")
|
||
if min_items == 0 and (schema.get('default') == [] or schema.get('example') == []):
|
||
return []
|
||
|
||
num_items_to_generate = max(1, min_items)
|
||
generated_array = []
|
||
for i in range(num_items_to_generate):
|
||
item_context = f"{context_name}[{i}]" if context_name else f"array_item[{i}]"
|
||
generated_array.append(self._generate_data_from_schema(items_schema, item_context, operation_id))
|
||
return generated_array
|
||
|
||
elif schema_type == 'string':
|
||
string_format = schema.get('format', '')
|
||
val = None
|
||
if 'enum' in schema and schema['enum']:
|
||
val = schema['enum'][0]
|
||
elif string_format == 'date': val = '2023-01-01'
|
||
elif string_format == 'date-time': val = datetime.datetime.now().isoformat()
|
||
elif string_format == 'email': val = 'test@example.com'
|
||
elif string_format == 'uuid': import uuid; val = str(uuid.uuid4())
|
||
else: val = 'example_string'
|
||
self.logger.debug(f"{log_prefix}生成 string 类型数据 ('{string_format}') for{context_log}: {val}")
|
||
return val
|
||
|
||
elif schema_type == 'number' or schema_type == 'integer':
|
||
val_to_return = schema.get('default', schema.get('example'))
|
||
if val_to_return is not None:
|
||
self.logger.debug(f"{log_prefix}使用 number/integer 的 default/example 值 for{context_log}: {val_to_return}")
|
||
return val_to_return
|
||
|
||
minimum = schema.get('minimum')
|
||
# maximum = schema.get('maximum') # Not used yet for generation, but could be
|
||
if minimum is not None:
|
||
val_to_return = minimum
|
||
else:
|
||
val_to_return = 0 if schema_type == 'integer' else 0.0
|
||
self.logger.debug(f"{log_prefix}生成 number/integer 类型数据 for{context_log}: {val_to_return}")
|
||
return val_to_return
|
||
|
||
elif schema_type == 'boolean':
|
||
val = schema.get('default', schema.get('example', False))
|
||
self.logger.debug(f"{log_prefix}生成 boolean 类型数据 for{context_log}: {val}")
|
||
return val
|
||
|
||
elif schema_type == 'null':
|
||
self.logger.debug(f"{log_prefix}生成 null 类型数据 for{context_log}")
|
||
return None
|
||
|
||
self.logger.debug(f"{log_prefix}_generate_data_from_schema: 未知或不支持的 schema 类型 '{schema_type}' for{context_log}. Schema: {schema}")
|
||
return None
|
||
|
||
|