This commit is contained in:
gongwenxin 2025-05-21 16:04:09 +08:00
parent 4318cc103d
commit 00de3a880a
16 changed files with 16573 additions and 7615 deletions

View File

@ -8,7 +8,7 @@ class StatusCode200Check(BaseAPITestCase):
description = "验证 API 响应状态码是否为 200 OK。" description = "验证 API 响应状态码是否为 200 OK。"
severity = TestSeverity.CRITICAL severity = TestSeverity.CRITICAL
tags = ["status_code", "smoke_test"] tags = ["status_code", "smoke_test"]
use_llm_for_body = False
# 适用于所有方法和路径 (默认) # 适用于所有方法和路径 (默认)
# applicable_methods = None # applicable_methods = None
# applicable_paths_regex = None # applicable_paths_regex = None
@ -52,6 +52,7 @@ class HeaderExistenceCheck(BaseAPITestCase):
description = "验证 API 响应是否包含 'X-Request-ID' 头。" description = "验证 API 响应是否包含 'X-Request-ID' 头。"
severity = TestSeverity.MEDIUM severity = TestSeverity.MEDIUM
tags = ["header", "observability"] tags = ["header", "observability"]
use_llm_for_body = False
EXPECTED_HEADER = "X-Request-ID" # 示例,可以根据实际需要修改 EXPECTED_HEADER = "X-Request-ID" # 示例,可以根据实际需要修改

View File

@ -83,6 +83,7 @@ class BaseAPITestCase:
applicable_methods: Optional[List[str]] = None applicable_methods: Optional[List[str]] = None
applicable_paths_regex: Optional[str] = None applicable_paths_regex: Optional[str] = None
use_llm_for_body: Optional[bool] = None # 新增属性控制此测试用例是否使用LLM生成请求体
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any]): def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any]):
""" """

View File

@ -7,9 +7,15 @@
import logging import logging
import json import json
import time import time
from typing import Dict, List, Any, Optional, Union, Tuple, Type import re # 添加 re 模块导入
from typing import Dict, List, Any, Optional, Union, Tuple, Type, ForwardRef
from enum import Enum from enum import Enum
import datetime import datetime
import datetime as dt
from uuid import UUID
from pydantic import BaseModel, Field, create_model
from pydantic.networks import EmailStr
from .input_parser.parser import InputParser, YAPIEndpoint, SwaggerEndpoint, ParsedYAPISpec, ParsedSwaggerSpec from .input_parser.parser import InputParser, YAPIEndpoint, SwaggerEndpoint, ParsedYAPISpec, ParsedSwaggerSpec
from .api_caller.caller import APICaller, APIRequest, APIResponse from .api_caller.caller import APICaller, APIRequest, APIResponse
@ -23,6 +29,9 @@ except ImportError:
LLMService = None LLMService = None
logging.getLogger(__name__).info("LLMService 未找到LLM 相关功能将不可用。") logging.getLogger(__name__).info("LLMService 未找到LLM 相关功能将不可用。")
# Cache for dynamically created Pydantic models to avoid redefinition issues
_dynamic_model_cache: Dict[str, Type[BaseModel]] = {}
class ExecutedTestCaseResult: class ExecutedTestCaseResult:
"""存储单个APITestCase在其适用的端点上执行后的结果。""" """存储单个APITestCase在其适用的端点上执行后的结果。"""
@ -78,13 +87,11 @@ class TestResult: # 原来的 TestResult 被重构为 EndpointExecutionResult
def __init__(self, def __init__(self,
endpoint_id: str, # 通常是 method + path endpoint_id: str, # 通常是 method + path
endpoint_name: str, # API 的可读名称/标题 endpoint_name: str, # API 的可读名称/标题
# api_spec_details: Dict[str, Any], # 包含该端点从YAPI/Swagger解析的原始信息可选
overall_status: Status = Status.SKIPPED, # 默认为跳过,后续根据测试用例结果更新 overall_status: Status = Status.SKIPPED, # 默认为跳过,后续根据测试用例结果更新
start_time: Optional[datetime.datetime] = None start_time: Optional[datetime.datetime] = None
): ):
self.endpoint_id = endpoint_id self.endpoint_id = endpoint_id
self.endpoint_name = endpoint_name self.endpoint_name = endpoint_name
# self.api_spec_details = api_spec_details
self.overall_status = overall_status self.overall_status = overall_status
self.executed_test_cases: List[ExecutedTestCaseResult] = [] self.executed_test_cases: List[ExecutedTestCaseResult] = []
self.start_time = start_time if start_time else datetime.datetime.now() self.start_time = start_time if start_time else datetime.datetime.now()
@ -262,13 +269,13 @@ class TestSummary:
def print_summary_to_console(self): # Renamed from print_summary def print_summary_to_console(self): # Renamed from print_summary
# (Implementation can be more detailed based on the new stats) # (Implementation can be more detailed based on the new stats)
print("\\n===== 测试运行摘要 =====") print("\n===== 测试运行摘要 =====")
print(f"开始时间: {self.start_time.isoformat()}") print(f"开始时间: {self.start_time.isoformat()}")
if self.end_time: if self.end_time:
print(f"结束时间: {self.end_time.isoformat()}") print(f"结束时间: {self.end_time.isoformat()}")
print(f"总耗时: {self.duration:.2f}") print(f"总耗时: {self.duration:.2f}")
print("\\n--- 端点统计 ---") print("\n--- 端点统计 ---")
print(f"定义的端点总数: {self.total_endpoints_defined}") print(f"定义的端点总数: {self.total_endpoints_defined}")
print(f"实际测试的端点数: {self.total_endpoints_tested}") print(f"实际测试的端点数: {self.total_endpoints_tested}")
print(f" 通过: {self.endpoints_passed}") print(f" 通过: {self.endpoints_passed}")
@ -278,7 +285,7 @@ class TestSummary:
print(f" 跳过执行: {self.endpoints_skipped}") print(f" 跳过执行: {self.endpoints_skipped}")
print(f" 端点通过率: {self.endpoint_success_rate:.2f}%") print(f" 端点通过率: {self.endpoint_success_rate:.2f}%")
print("\\n--- 测试用例统计 ---") print("\n--- 测试用例统计 ---")
print(f"适用的测试用例总数 (计划执行): {self.total_test_cases_applicable}") print(f"适用的测试用例总数 (计划执行): {self.total_test_cases_applicable}")
print(f"实际执行的测试用例总数: {self.total_test_cases_executed}") print(f"实际执行的测试用例总数: {self.total_test_cases_executed}")
print(f" 通过: {self.test_cases_passed}") print(f" 通过: {self.test_cases_passed}")
@ -290,7 +297,7 @@ class TestSummary:
# 可选:打印失败的端点和测试用例摘要 # 可选:打印失败的端点和测试用例摘要
failed_endpoints = [res for res in self.detailed_results if res.overall_status == TestResult.Status.FAILED] failed_endpoints = [res for res in self.detailed_results if res.overall_status == TestResult.Status.FAILED]
if failed_endpoints: if failed_endpoints:
print("\\n--- 失败的端点摘要 ---") print("\n--- 失败的端点摘要 ---")
for ep_res in failed_endpoints: for ep_res in failed_endpoints:
print(f" 端点: {ep_res.endpoint_id} ({ep_res.endpoint_name}) - 状态: {ep_res.overall_status.value}") print(f" 端点: {ep_res.endpoint_id} ({ep_res.endpoint_name}) - 状态: {ep_res.overall_status.value}")
for tc_res in ep_res.executed_test_cases: for tc_res in ep_res.executed_test_cases:
@ -371,6 +378,186 @@ class APITestOrchestrator:
elif not self.use_llm_for_request_body: elif not self.use_llm_for_request_body:
self.logger.info("配置为不使用LLM生成请求体。") self.logger.info("配置为不使用LLM生成请求体。")
def _create_pydantic_model_from_schema(
self,
schema: Dict[str, Any],
model_name: str,
recursion_depth: int = 0
) -> Optional[Type[BaseModel]]:
"""
动态地从JSON Schema字典创建一个Pydantic模型类
支持嵌套对象和数组
Args:
schema: JSON Schema字典
model_name: 要创建的Pydantic模型的名称
recursion_depth: 当前递归深度用于防止无限循环
Returns:
一个Pydantic BaseModel的子类如果创建失败则返回None
"""
MAX_RECURSION_DEPTH = 10
if recursion_depth > MAX_RECURSION_DEPTH:
self.logger.error(f"创建Pydantic模型 '{model_name}' 时达到最大递归深度 {MAX_RECURSION_DEPTH}。可能存在循环引用。")
return None
# 清理模型名称使其成为有效的Python标识符
safe_model_name = "".join(c if c.isalnum() or c == '_' else '_' for c in model_name)
if not safe_model_name or not safe_model_name[0].isalpha() and safe_model_name[0] != '_':
safe_model_name = f"DynamicModel_{safe_model_name}"
# 检查缓存 (使用清理后的名称)
if safe_model_name in _dynamic_model_cache:
self.logger.debug(f"从缓存返回动态模型: {safe_model_name}")
return _dynamic_model_cache[safe_model_name]
self.logger.debug(f"开始从Schema创建Pydantic模型: '{safe_model_name}' (原始名: '{model_name}', 深度: {recursion_depth})")
if not isinstance(schema, dict) or schema.get('type') != 'object':
# Safely get type for logging if schema is not a dict or does not have 'type'
schema_type_for_log = schema.get('type') if isinstance(schema, dict) else type(schema).__name__
self.logger.error(f"提供的Schema用于模型 '{safe_model_name}' 的必须是 type 'object' 且是一个字典, 实际: {schema_type_for_log}")
return None
properties = schema.get('properties', {})
required_fields = set(schema.get('required', []))
field_definitions: Dict[str, Tuple[Any, Any]] = {}
for prop_name, prop_schema in properties.items():
if not isinstance(prop_schema, dict):
self.logger.warning(f"属性 '{prop_name}' 在模型 '{safe_model_name}' 中的Schema无效已跳过。")
continue
python_type: Any = Any
field_args: Dict[str, Any] = {}
default_value: Any = ... # Ellipsis for required fields with no default
if 'default' in prop_schema:
default_value = prop_schema['default']
elif prop_name not in required_fields:
default_value = None
if 'description' in prop_schema:
field_args['description'] = prop_schema['description']
json_type = prop_schema.get('type')
json_format = prop_schema.get('format')
if json_type == 'object':
nested_model_name_base = f"{safe_model_name}_{prop_name}"
python_type = self._create_pydantic_model_from_schema(prop_schema, nested_model_name_base, recursion_depth + 1)
if python_type is None:
self.logger.warning(f"无法为 '{safe_model_name}' 中的嵌套属性 '{prop_name}' 创建模型,已跳过。")
continue
elif json_type == 'array':
items_schema = prop_schema.get('items')
if not isinstance(items_schema, dict):
self.logger.warning(f"数组属性 '{prop_name}' 在模型 '{safe_model_name}' 中的 'items' schema无效已跳过。")
continue
item_type: Any = Any
item_json_type = items_schema.get('type')
item_json_format = items_schema.get('format')
if item_json_type == 'object':
item_model_name_base = f"{safe_model_name}_{prop_name}_Item"
item_type = self._create_pydantic_model_from_schema(items_schema, item_model_name_base, recursion_depth + 1)
if item_type is None:
self.logger.warning(f"无法为 '{safe_model_name}' 中的数组属性 '{prop_name}' 的项创建模型,已跳过。")
continue
elif item_json_type == 'string':
if item_json_format == 'date-time': item_type = dt.datetime
elif item_json_format == 'date': item_type = dt.date
elif item_json_format == 'email': item_type = EmailStr
elif item_json_format == 'uuid': item_type = UUID
else: item_type = str
elif item_json_type == 'integer': item_type = int
elif item_json_type == 'number': item_type = float
elif item_json_type == 'boolean': item_type = bool
else:
self.logger.warning(f"数组 '{prop_name}' 中的项具有未知类型 '{item_json_type}',默认为 Any。")
python_type = List[item_type] # type: ignore
elif json_type == 'string':
if json_format == 'date-time': python_type = dt.datetime
elif json_format == 'date': python_type = dt.date
elif json_format == 'email': python_type = EmailStr
elif json_format == 'uuid': python_type = UUID
else: python_type = str
if 'minLength' in prop_schema: field_args['min_length'] = prop_schema['minLength']
if 'maxLength' in prop_schema: field_args['max_length'] = prop_schema['maxLength']
if 'pattern' in prop_schema: field_args['pattern'] = prop_schema['pattern']
elif json_type == 'integer':
python_type = int
if 'minimum' in prop_schema: field_args['ge'] = prop_schema['minimum']
if 'maximum' in prop_schema: field_args['le'] = prop_schema['maximum']
elif json_type == 'number':
python_type = float
if 'minimum' in prop_schema: field_args['ge'] = prop_schema['minimum']
if 'maximum' in prop_schema: field_args['le'] = prop_schema['maximum']
elif json_type == 'boolean':
python_type = bool
elif json_type is None and '$ref' in prop_schema:
self.logger.warning(f"Schema $ref '{prop_schema['$ref']}' in '{safe_model_name}.{prop_name}' not yet supported. Defaulting to Any.")
python_type = Any
else:
self.logger.warning(f"属性 '{prop_name}' 在模型 '{safe_model_name}' 中具有未知类型 '{json_type}',默认为 Any。")
python_type = Any
if 'enum' in prop_schema:
enum_values = prop_schema['enum']
if enum_values:
enum_desc = f" (Enum values: {', '.join(map(str, enum_values))})"
field_args['description'] = field_args.get('description', '') + enum_desc
current_field_is_optional = prop_name not in required_fields
if current_field_is_optional and python_type is not Any and default_value is None:
# For Pydantic v1/v2, if a field is not required and has no other default, it's Optional.
# The `python_type` itself might already be an `Optional` if it came from a nested optional model.
# We only wrap with Optional if it's not already wrapped effectively.
# A simple check: if the type name doesn't start with "Optional"
if not (hasattr(python_type, '__origin__') and python_type.__origin__ is Union and type(None) in python_type.__args__):
python_type = Optional[python_type]
field_definitions[prop_name] = (python_type, Field(default_value, **field_args))
if not field_definitions:
self.logger.warning(f"模型 '{safe_model_name}' 没有有效的字段定义,无法创建。")
# Return a very basic BaseModel if no properties are defined but an object schema was given
# This might happen for an empty object schema {}
try:
EmptyModel = create_model(safe_model_name, __base__=BaseModel)
_dynamic_model_cache[safe_model_name] = EmptyModel
self.logger.info(f"创建了一个空的动态Pydantic模型: '{safe_model_name}' (由于无属性定义)")
return EmptyModel
except Exception as e_empty:
self.logger.error(f"尝试为 '{safe_model_name}' 创建空模型时失败: {e_empty}", exc_info=True)
return None
try:
# ForwardRef for self-referencing models is complex; not fully handled here yet.
# If a type in field_definitions is a string (e.g., a ForwardRef string), create_model handles it.
DynamicModel = create_model(safe_model_name, **field_definitions, __base__=BaseModel) # type: ignore
_dynamic_model_cache[safe_model_name] = DynamicModel
self.logger.info(f"成功创建/缓存了动态Pydantic模型: '{safe_model_name}'")
# Attempt to update forward refs if any were string types that are now defined
# This is a simplified approach. Pydantic's update_forward_refs is usually called on the module or specific model.
# For dynamically created models, this might need careful handling if true circular deps are common.
# For now, we assume nested creation order mostly handles dependencies.
# if hasattr(DynamicModel, 'update_forward_refs'):
# try:
# DynamicModel.update_forward_refs(**_dynamic_model_cache)
# self.logger.debug(f"Attempted to update forward refs for {safe_model_name}")
# except Exception as e_fwd:
# self.logger.warning(f"Error updating forward_refs for {safe_model_name}: {e_fwd}")
return DynamicModel
except Exception as e:
self.logger.error(f"使用Pydantic create_model创建 '{safe_model_name}' 时失败: {e}", exc_info=True)
return None
def _execute_single_test_case( def _execute_single_test_case(
self, self,
@ -385,7 +572,6 @@ class APITestOrchestrator:
validation_points: List[ValidationResult] = [] validation_points: List[ValidationResult] = []
test_case_instance: Optional[BaseAPITestCase] = None test_case_instance: Optional[BaseAPITestCase] = None
# 准备 endpoint_spec_dict
endpoint_spec_dict: Dict[str, Any] endpoint_spec_dict: Dict[str, Any]
if hasattr(endpoint_spec, 'to_dict') and callable(endpoint_spec.to_dict): if hasattr(endpoint_spec, 'to_dict') and callable(endpoint_spec.to_dict):
endpoint_spec_dict = endpoint_spec.to_dict() endpoint_spec_dict = endpoint_spec.to_dict()
@ -393,15 +579,15 @@ class APITestOrchestrator:
endpoint_spec_dict = { endpoint_spec_dict = {
"method": getattr(endpoint_spec, 'method', 'UNKNOWN_METHOD'), "method": getattr(endpoint_spec, 'method', 'UNKNOWN_METHOD'),
"path": getattr(endpoint_spec, 'path', 'UNKNOWN_PATH'), "path": getattr(endpoint_spec, 'path', 'UNKNOWN_PATH'),
"title": getattr(endpoint_spec, 'title', ''), "title": getattr(endpoint_spec, 'title', getattr(endpoint_spec, 'summary', '')),
"summary": getattr(endpoint_spec, 'summary', ''), "summary": getattr(endpoint_spec, 'summary', ''),
"description": getattr(endpoint_spec, 'description', ''), # 确保description也被传递
"_original_object_type": type(endpoint_spec).__name__ "_original_object_type": type(endpoint_spec).__name__
} }
if isinstance(endpoint_spec, YAPIEndpoint): if isinstance(endpoint_spec, YAPIEndpoint):
for attr_name in dir(endpoint_spec): for attr_name in dir(endpoint_spec):
if not attr_name.startswith('_') and not callable(getattr(endpoint_spec, attr_name)): if not attr_name.startswith('_') and not callable(getattr(endpoint_spec, attr_name)):
try: try:
# Test serializability before adding
json.dumps({attr_name: getattr(endpoint_spec, attr_name)}) json.dumps({attr_name: getattr(endpoint_spec, attr_name)})
endpoint_spec_dict[attr_name] = getattr(endpoint_spec, attr_name) endpoint_spec_dict[attr_name] = getattr(endpoint_spec, attr_name)
except (TypeError, OverflowError): except (TypeError, OverflowError):
@ -431,24 +617,22 @@ class APITestOrchestrator:
) )
test_case_instance.logger.info(f"开始执行测试用例 '{test_case_instance.id}' for endpoint '{endpoint_spec_dict.get('method')} {endpoint_spec_dict.get('path')}'") test_case_instance.logger.info(f"开始执行测试用例 '{test_case_instance.id}' for endpoint '{endpoint_spec_dict.get('method')} {endpoint_spec_dict.get('path')}'")
# 1. 请求构建阶段 # 调用 _prepare_initial_request_data 时传递 test_case_instance
initial_request_data = self._prepare_initial_request_data(endpoint_spec) # endpoint_spec 是原始对象 initial_request_data = self._prepare_initial_request_data(endpoint_spec, test_case_instance=test_case_instance)
current_q_params = test_case_instance.generate_query_params(initial_request_data['query_params']) current_q_params = test_case_instance.generate_query_params(initial_request_data['query_params'])
current_headers = test_case_instance.generate_headers(initial_request_data['headers']) current_headers = test_case_instance.generate_headers(initial_request_data['headers'])
current_body = test_case_instance.generate_request_body(initial_request_data['body']) current_body = test_case_instance.generate_request_body(initial_request_data['body'])
# 路径参数应该从 initial_request_data 中获取,因为 _prepare_initial_request_data 负责生成它们
current_path_params = initial_request_data['path_params'] current_path_params = initial_request_data['path_params']
# 构建最终请求URL使用 current_path_params 进行替换
final_url = self.base_url + endpoint_spec_dict.get('path', '') final_url = self.base_url + endpoint_spec_dict.get('path', '')
for p_name, p_val in current_path_params.items(): for p_name, p_val in current_path_params.items():
placeholder = f"{{{p_name}}}" placeholder = f"{{{p_name}}}"
if placeholder in final_url: if placeholder in final_url:
final_url = final_url.replace(placeholder, str(p_val)) final_url = final_url.replace(placeholder, str(p_val))
else: else:
self.logger.warning(f"路径参数 '{p_name}' 在路径模板 '{endpoint_spec_dict.get('path')}' 中未找到占位符,但为其生成了值") self.logger.warning(f"路径参数 '{p_name}' 在路径模板 '{endpoint_spec_dict.get('path')}' 中未找到占位符")
api_request_context = APIRequestContext( api_request_context = APIRequestContext(
method=endpoint_spec_dict.get('method', 'GET').upper(), method=endpoint_spec_dict.get('method', 'GET').upper(),
@ -460,79 +644,68 @@ class APITestOrchestrator:
endpoint_spec=endpoint_spec_dict endpoint_spec=endpoint_spec_dict
) )
# 1.5. 请求预校验
validation_points.extend(test_case_instance.validate_request_url(api_request_context.url, api_request_context)) validation_points.extend(test_case_instance.validate_request_url(api_request_context.url, api_request_context))
validation_points.extend(test_case_instance.validate_request_headers(api_request_context.headers, api_request_context)) validation_points.extend(test_case_instance.validate_request_headers(api_request_context.headers, api_request_context))
validation_points.extend(test_case_instance.validate_request_body(api_request_context.body, api_request_context)) validation_points.extend(test_case_instance.validate_request_body(api_request_context.body, api_request_context))
# 检查是否有严重预校验失败
critical_pre_validation_failure = False critical_pre_validation_failure = False
failure_messages = [] failure_messages = []
for vp in validation_points: for vp in validation_points:
if not vp.passed and test_case_instance.severity in [TestSeverity.CRITICAL, TestSeverity.HIGH]: if not vp.passed and test_case_instance.severity in [TestSeverity.CRITICAL, TestSeverity.HIGH]: # Check severity of the Test Case for pre-validation
critical_pre_validation_failure = True critical_pre_validation_failure = True
failure_messages.append(vp.message) failure_messages.append(vp.message)
if critical_pre_validation_failure: if critical_pre_validation_failure:
self.logger.warning(f"测试用例 '{test_case_instance.id}' 因请求预校验失败而中止 (严重级别: {test_case_instance.severity.value})。失败信息: {'; '.join(failure_messages)}") self.logger.warning(f"测试用例 '{test_case_instance.id}' 因请求预校验失败而中止 (TC严重级别: {test_case_instance.severity.value})。失败信息: {'; '.join(failure_messages)}")
tc_duration = time.time() - tc_start_time tc_duration = time.time() - tc_start_time
return ExecutedTestCaseResult( return ExecutedTestCaseResult(
test_case_id=test_case_instance.id, test_case_id=test_case_instance.id,
test_case_name=test_case_instance.name, test_case_name=test_case_instance.name,
test_case_severity=test_case_instance.severity, test_case_severity=test_case_instance.severity,
status=ExecutedTestCaseResult.Status.FAILED, # 预校验失败算作 FAILED status=ExecutedTestCaseResult.Status.FAILED,
validation_points=validation_points, validation_points=validation_points,
message=f"请求预校验失败: {'; '.join(failure_messages)}", message=f"请求预校验失败: {'; '.join(failure_messages)}",
duration=tc_duration duration=tc_duration
) )
# ---- API 调用 ----
api_request_obj = APIRequest( api_request_obj = APIRequest(
method=api_request_context.method, method=api_request_context.method,
url=api_request_context.url, url=api_request_context.url,
params=api_request_context.query_params, params=api_request_context.query_params,
headers=api_request_context.headers, headers=api_request_context.headers,
json_data=api_request_context.body # Assuming JSON, APICaller might need to handle other types json_data=api_request_context.body
) )
response_call_start_time = time.time() response_call_start_time = time.time()
api_response_obj = self.api_caller.call_api(api_request_obj) api_response_obj = self.api_caller.call_api(api_request_obj)
response_call_elapsed_time = time.time() - response_call_start_time response_call_elapsed_time = time.time() - response_call_start_time
# ---- 响应验证 ----
# 3. 创建 APIResponseContext
actual_text_content: Optional[str] = None actual_text_content: Optional[str] = None
if hasattr(api_response_obj, 'text_content') and api_response_obj.text_content is not None: # 优先尝试直接获取 if hasattr(api_response_obj, 'text_content') and api_response_obj.text_content is not None:
actual_text_content = api_response_obj.text_content actual_text_content = api_response_obj.text_content
elif api_response_obj.json_content is not None: elif api_response_obj.json_content is not None:
if isinstance(api_response_obj.json_content, str): if isinstance(api_response_obj.json_content, str): # Should not happen if json_content is parsed
actual_text_content = api_response_obj.json_content actual_text_content = api_response_obj.json_content
else: else:
try: try:
actual_text_content = json.dumps(api_response_obj.json_content, ensure_ascii=False) actual_text_content = json.dumps(api_response_obj.json_content, ensure_ascii=False)
except TypeError: except TypeError: # If json_content is not serializable (e.g. bytes)
actual_text_content = str(api_response_obj.json_content) # 最后手段 actual_text_content = str(api_response_obj.json_content)
# elapsed_time: 使用 response_call_elapsed_time
# original_response: 设置为 None 因为 api_response_obj 没有 raw_response
api_response_context = APIResponseContext( api_response_context = APIResponseContext(
status_code=api_response_obj.status_code, status_code=api_response_obj.status_code,
headers=api_response_obj.headers, # 假设这些直接在 api_response_obj 上 headers=api_response_obj.headers,
json_content=api_response_obj.json_content, # 这个根据之前的错误提示是存在的 json_content=api_response_obj.json_content,
text_content=actual_text_content, text_content=actual_text_content,
elapsed_time=response_call_elapsed_time, elapsed_time=response_call_elapsed_time,
original_response=None, # api_response_obj 没有 .raw_response 属性 original_response= getattr(api_response_obj, 'raw_response', None), # Pass raw if available
request_context=api_request_context request_context=api_request_context
) )
# 4. 执行响应验证和性能检查
validation_points.extend(test_case_instance.validate_response(api_response_context, api_request_context)) validation_points.extend(test_case_instance.validate_response(api_response_context, api_request_context))
validation_points.extend(test_case_instance.check_performance(api_response_context, api_request_context)) validation_points.extend(test_case_instance.check_performance(api_response_context, api_request_context))
# ---- 结果判定 ----
# 5. 判断此测试用例的最终状态
final_status = ExecutedTestCaseResult.Status.PASSED final_status = ExecutedTestCaseResult.Status.PASSED
if any(not vp.passed for vp in validation_points): if any(not vp.passed for vp in validation_points):
final_status = ExecutedTestCaseResult.Status.FAILED final_status = ExecutedTestCaseResult.Status.FAILED
@ -553,45 +726,32 @@ class APITestOrchestrator:
return ExecutedTestCaseResult( return ExecutedTestCaseResult(
test_case_id=test_case_instance.id if test_case_instance else test_case_class.id if hasattr(test_case_class, 'id') else "unknown_tc_id", test_case_id=test_case_instance.id if test_case_instance else test_case_class.id if hasattr(test_case_class, 'id') else "unknown_tc_id",
test_case_name=test_case_instance.name if test_case_instance else test_case_class.name if hasattr(test_case_class, 'name') else "Unknown Test Case Name", test_case_name=test_case_instance.name if test_case_instance else test_case_class.name if hasattr(test_case_class, 'name') else "Unknown Test Case Name",
test_case_severity=test_case_instance.severity if test_case_instance else TestSeverity.CRITICAL, # Default to critical on error test_case_severity=test_case_instance.severity if test_case_instance else TestSeverity.CRITICAL,
status=ExecutedTestCaseResult.Status.ERROR, status=ExecutedTestCaseResult.Status.ERROR,
validation_points=validation_points, # 可能包含部分成功或失败的验证点 validation_points=validation_points,
message=f"测试用例执行时发生内部错误: {str(e)}", message=f"测试用例执行时发生内部错误: {str(e)}",
duration=tc_duration duration=tc_duration
) )
def _prepare_initial_request_data(self, endpoint_spec: Union[YAPIEndpoint, SwaggerEndpoint]) -> Dict[str, Any]: def _prepare_initial_request_data(self, endpoint_spec: Union[YAPIEndpoint, SwaggerEndpoint], test_case_instance: Optional[BaseAPITestCase] = None) -> Dict[str, Any]:
""" """
根据端点规格准备一个初始的请求数据结构 根据端点规格准备一个初始的请求数据结构
返回一个包含 'path_params', 'query_params', 'headers', 'body' 的字典 返回一个包含 'path_params', 'query_params', 'headers', 'body' 的字典
Args:
endpoint_spec: 当前端点的规格
test_case_instance: (可选) 当前正在执行的测试用例实例用于细粒度控制LLM使用
""" """
self.logger.debug(f"Preparing initial request data for: {endpoint_spec.method} {endpoint_spec.path}") self.logger.debug(f"Preparing initial request data for: {endpoint_spec.method} {endpoint_spec.path}")
# path_params_spec: List[Dict] # 用于存储从Swagger等提取的路径参数定义
# query_params_spec: List[Dict]
# headers_spec: List[Dict]
# body_schema: Optional[Dict]
# 重置/初始化这些变量,以避免跨调用共享状态(如果 APITestOrchestrator 实例被重用)
path_params_spec_list: List[Dict[str, Any]] = [] path_params_spec_list: List[Dict[str, Any]] = []
query_params_spec_list: List[Dict[str, Any]] = [] query_params_spec_list: List[Dict[str, Any]] = []
headers_spec_list: List[Dict[str, Any]] = [] headers_spec_list: List[Dict[str, Any]] = []
body_schema_dict: Optional[Dict[str, Any]] = None body_schema_dict: Optional[Dict[str, Any]] = None
path_str = getattr(endpoint_spec, 'path', '') path_str = getattr(endpoint_spec, 'path', '')
if isinstance(endpoint_spec, YAPIEndpoint): if isinstance(endpoint_spec, YAPIEndpoint):
query_params_spec_list = endpoint_spec.req_query or [] query_params_spec_list = endpoint_spec.req_query or []
headers_spec_list = endpoint_spec.req_headers or [] headers_spec_list = endpoint_spec.req_headers or []
# YAPI 的路径参数在 req_params 中,如果用户定义了的话
if endpoint_spec.req_params:
for p in endpoint_spec.req_params:
# YAPI的req_params可能混合了路径参数和查询参数这里只关心路径中实际存在的
# 需要从 path_str 中解析出占位符,然后匹配 req_params 中的定义
# 简化:我们假设 req_params 中的条目如果其 name 在路径占位符中,则是路径参数
# 更好的做法是 YAPI 解析器能明确区分它们
pass # 下面会统一处理路径参数
if endpoint_spec.req_body_type == 'json' and endpoint_spec.req_body_other: if endpoint_spec.req_body_type == 'json' and endpoint_spec.req_body_other:
try: try:
body_schema_dict = json.loads(endpoint_spec.req_body_other) if isinstance(endpoint_spec.req_body_other, str) else endpoint_spec.req_body_other body_schema_dict = json.loads(endpoint_spec.req_body_other) if isinstance(endpoint_spec.req_body_other, str) else endpoint_spec.req_body_other
@ -599,6 +759,23 @@ class APITestOrchestrator:
self.logger.warning(f"YAPI req_body_other for {path_str} is not valid JSON: {endpoint_spec.req_body_other}") self.logger.warning(f"YAPI req_body_other for {path_str} is not valid JSON: {endpoint_spec.req_body_other}")
elif isinstance(endpoint_spec, SwaggerEndpoint): elif isinstance(endpoint_spec, SwaggerEndpoint):
# 优先尝试 OpenAPI 3.0+ 的 requestBody
if endpoint_spec.request_body and 'content' in endpoint_spec.request_body:
json_content_spec = endpoint_spec.request_body['content'].get('application/json', {})
if 'schema' in json_content_spec:
body_schema_dict = json_content_spec['schema']
self.logger.debug("从 Swagger 3.0+ 'requestBody' 中提取到 body schema。")
# 如果没有从 requestBody 中找到,再尝试 Swagger 2.0 的 in: "body" 参数
if not body_schema_dict and endpoint_spec.parameters:
for param_spec in endpoint_spec.parameters:
if param_spec.get('in') == 'body':
if 'schema' in param_spec:
body_schema_dict = param_spec['schema']
self.logger.debug(f"从 Swagger 2.0 'in: body' 参数 '{param_spec.get('name')}' 中提取到 body schema (作为回退)。")
break # 找到一个 body 参数就足够了
# 处理 path, query, header 参数 (这部分逻辑需要保留并放在正确的位置)
if endpoint_spec.parameters: if endpoint_spec.parameters:
for param_spec in endpoint_spec.parameters: for param_spec in endpoint_spec.parameters:
param_in = param_spec.get('in') param_in = param_spec.get('in')
@ -608,68 +785,53 @@ class APITestOrchestrator:
query_params_spec_list.append(param_spec) query_params_spec_list.append(param_spec)
elif param_in == 'header': elif param_in == 'header':
headers_spec_list.append(param_spec) headers_spec_list.append(param_spec)
if endpoint_spec.request_body and 'content' in endpoint_spec.request_body:
json_content_spec = endpoint_spec.request_body['content'].get('application/json', {})
if 'schema' in json_content_spec:
body_schema_dict = json_content_spec['schema']
# --- 生成路径参数数据 ---
path_params_data: Dict[str, Any] = {} path_params_data: Dict[str, Any] = {}
import re import re
# 从路径字符串中提取所有占位符名称,例如 /users/{id}/items/{itemId} -> ["id", "itemId"]
path_param_names_in_url = re.findall(r'{(.*?)}', path_str) path_param_names_in_url = re.findall(r'{(.*?)}', path_str)
for p_name in path_param_names_in_url: for p_name in path_param_names_in_url:
found_spec = None found_spec = None
# 尝试从 Swagger 的 path_params_spec_list 查找详细定义
for spec in path_params_spec_list: for spec in path_params_spec_list:
if spec.get('name') == p_name: if spec.get('name') == p_name:
found_spec = spec found_spec = spec
break break
# 尝试从 YAPI 的 req_params (如果之前有解析并填充到类似 path_params_spec_list 的结构)
# (当前YAPI的req_params未直接用于填充path_params_spec_list, 需要改进InputParser或此处逻辑)
# TODO: YAPI的req_params需要更可靠地映射到路径参数
if found_spec and isinstance(found_spec, dict): if found_spec and isinstance(found_spec, dict):
# 如果找到参数的详细规格 (例如来自Swagger)
value = found_spec.get('example') value = found_spec.get('example')
if value is None and found_spec.get('schema'): if value is None and found_spec.get('schema'):
value = self._generate_data_from_schema(found_spec['schema']) value = self._generate_data_from_schema(found_spec['schema'])
path_params_data[p_name] = value if value is not None else f"example_{p_name}" # Fallback path_params_data[p_name] = value if value is not None else f"example_{p_name}"
else: else:
# 如果没有详细规格,生成一个通用占位符值
path_params_data[p_name] = f"example_{p_name}" path_params_data[p_name] = f"example_{p_name}"
self.logger.debug(f"Path param '{p_name}' generated value: {path_params_data[p_name]}") self.logger.debug(f"Path param '{p_name}' generated value: {path_params_data[p_name]}")
# --- 生成查询参数数据 ---
query_params_data: Dict[str, Any] = {} query_params_data: Dict[str, Any] = {}
for q_param_spec in query_params_spec_list: for q_param_spec in query_params_spec_list:
name = q_param_spec.get('name') name = q_param_spec.get('name')
if name: if name:
value = q_param_spec.get('example') # Swagger/OpenAPI style value = q_param_spec.get('example')
if value is None and 'value' in q_param_spec: # YAPI style (value often holds example or default) if value is None and 'value' in q_param_spec:
value = q_param_spec['value'] value = q_param_spec['value']
if value is None and q_param_spec.get('schema'): # Swagger/OpenAPI schema for param if value is None and q_param_spec.get('schema'):
value = self._generate_data_from_schema(q_param_spec['schema']) value = self._generate_data_from_schema(q_param_spec['schema'])
elif value is None and q_param_spec.get('type'): # YAPI may define type directly elif value is None and q_param_spec.get('type'):
# Simplified schema generation for YAPI direct type if no 'value' field
value = self._generate_data_from_schema({'type': q_param_spec.get('type')}) value = self._generate_data_from_schema({'type': q_param_spec.get('type')})
query_params_data[name] = value if value is not None else f"example_query_{name}" query_params_data[name] = value if value is not None else f"example_query_{name}"
# --- 生成请求头数据 ---
headers_data: Dict[str, str] = {"Content-Type": "application/json", "Accept": "application/json"} headers_data: Dict[str, str] = {"Content-Type": "application/json", "Accept": "application/json"}
for h_param_spec in headers_spec_list: for h_param_spec in headers_spec_list:
name = h_param_spec.get('name') name = h_param_spec.get('name')
if name and name.lower() not in ['content-type', 'accept']: # 不要覆盖基础的Content-Type/Accept除非明确 if name and name.lower() not in ['content-type', 'accept']:
value = h_param_spec.get('example') value = h_param_spec.get('example')
if value is None and 'value' in h_param_spec: # YAPI if value is None and 'value' in h_param_spec:
value = h_param_spec['value'] value = h_param_spec['value']
if value is None and h_param_spec.get('schema'): # Swagger if value is None and h_param_spec.get('schema'):
value = self._generate_data_from_schema(h_param_spec['schema']) value = self._generate_data_from_schema(h_param_spec['schema'])
elif value is None and h_param_spec.get('type'): # YAPI elif value is None and h_param_spec.get('type'):
value = self._generate_data_from_schema({'type': h_param_spec.get('type')}) value = self._generate_data_from_schema({'type': h_param_spec.get('type')})
if value is not None: if value is not None:
@ -677,37 +839,85 @@ class APITestOrchestrator:
else: else:
headers_data[name] = f"example_header_{name}" headers_data[name] = f"example_header_{name}"
# --- 生成请求体数据 ---
body_data: Optional[Any] = None body_data: Optional[Any] = None
if body_schema_dict: if body_schema_dict:
generated_by_llm = False generated_by_llm = False
if self.use_llm_for_request_body and self.llm_service:
self.logger.debug(f"尝试使用 LLM 为端点 {endpoint_spec.method} {endpoint_spec.path} 生成请求体。") # 决定是否应该为这个特定的情况尝试LLM
# 1. 全局开关 self.use_llm_for_request_body 必须为 True
# 2. LLM 服务 self.llm_service 必须可用
# 3. 测试用例级别配置 test_case_instance.use_llm_for_body (如果存在且不是None) 会覆盖全局配置
attempt_llm_globally = self.use_llm_for_request_body and self.llm_service
should_try_llm_for_this_run = attempt_llm_globally
if test_case_instance and hasattr(test_case_instance, 'use_llm_for_body') and test_case_instance.use_llm_for_body is not None:
should_try_llm_for_this_run = test_case_instance.use_llm_for_body
if should_try_llm_for_this_run and not self.llm_service:
self.logger.warning(f"测试用例 '{test_case_instance.id}' 配置为使用LLM但LLM服务不可用。将回退。")
should_try_llm_for_this_run = False # LLM服务不可用时即使TC要求也无法使用
self.logger.debug(f"测试用例 '{test_case_instance.id}' 的 use_llm_for_body 设置为 {test_case_instance.use_llm_for_body}最终决策是否尝试LLM: {should_try_llm_for_this_run}")
elif not attempt_llm_globally and test_case_instance and hasattr(test_case_instance, 'use_llm_for_body') and test_case_instance.use_llm_for_body is True and not self.llm_service:
# 特殊情况全局LLM关闭但测试用例希望开启可是LLM服务不可用
self.logger.warning(f"测试用例 '{test_case_instance.id}' 配置为使用LLM但全局LLM服务不可用或未配置。将回退。")
should_try_llm_for_this_run = False
if should_try_llm_for_this_run: # 只有在最终决策为True时才尝试
self.logger.debug(f"尝试使用 LLM 为端点 {endpoint_spec.method} {endpoint_spec.path} 生成请求体 (TC覆盖: {test_case_instance.use_llm_for_body if test_case_instance else 'N/A'})。")
try: try:
# TODO: 动态创建 Pydantic 模型 (步骤2的核心) # 生成一个稍微独特但可预测的模型名称,以利于缓存和调试
# DynamicPydanticModel = self._create_pydantic_model_from_schema(body_schema_dict, "DynamicRequestBodyModel") model_base_name = "".join(part.capitalize() for part in re.split(r'[^a-zA-Z0-9]+', endpoint_spec.path.strip('/')) if part)
# if DynamicPydanticModel: dynamic_model_name = f"{model_base_name}{endpoint_spec.method.capitalize()}Body"
# # TODO: 考虑是否需要从 endpoint_spec 中提取一些 prompt_instructions if not dynamic_model_name or not dynamic_model_name[0].isalpha(): # 确保名称有效
# llm_generated_body = self.llm_service.generate_parameters_from_schema( dynamic_model_name = f"Dynamic{endpoint_spec.method.capitalize()}Body_{abs(hash(endpoint_spec.path))}"
# pydantic_model_class=DynamicPydanticModel,
# prompt_instructions=f"为API端点 {endpoint_spec.title or endpoint_spec.path} 生成请求体。"
# ) DynamicPydanticModel = self._create_pydantic_model_from_schema(body_schema_dict, dynamic_model_name)
# if llm_generated_body is not None:
# body_data = llm_generated_body if DynamicPydanticModel:
# generated_by_llm = True # 尝试获取端点的可读名称,优先顺序: title, summary, path
# self.logger.info(f"LLM 成功为 {endpoint_spec.method} {endpoint_spec.path} 生成了请求体。") readable_endpoint_name = getattr(endpoint_spec, 'title', None) or \
# else: getattr(endpoint_spec, 'summary', None) or \
# self.logger.warning(f"LLM未能为 {endpoint_spec.method} {endpoint_spec.path} 生成请求体,将回退到默认方法。") endpoint_spec.path
# else:
# self.logger.warning(f"未能从Schema动态创建Pydantic模型用于LLM请求体生成将回退。") prompt_instr = f"请为API端点 '{readable_endpoint_name}' (方法: {endpoint_spec.method}) 生成一个符合其定义的请求体。"
self.logger.info("LLM请求体生成部分尚未完全实现 (_create_pydantic_model_from_schema)。暂时回退。") # 临时日志
pass # 占位,直到 _create_pydantic_model_from_schema 完成 # 可以进一步从 description 获取更详细的上下文给LLM
ep_description = getattr(endpoint_spec, 'description', None)
if ep_description:
prompt_instr += f" API描述: {ep_description}"
llm_generated_body = self.llm_service.generate_parameters_from_schema(
pydantic_model_class=DynamicPydanticModel,
prompt_instructions=prompt_instr
)
if llm_generated_body is not None:
try:
# 尝试用生成的模型验证LLM的输出确保LLM确实遵循了schema
DynamicPydanticModel.model_validate(llm_generated_body)
body_data = llm_generated_body
generated_by_llm = True
self.logger.info(f"LLM 成功为 {endpoint_spec.method} {endpoint_spec.path} 生成并验证了请求体。")
except Exception as p_val_error: # Catches Pydantic's ValidationError
self.logger.warning(f"LLM为 {endpoint_spec.method} {endpoint_spec.path} 生成的请求体未能通过动态Pydantic模型验证: {p_val_error}. 将回退。LLM输出: {json.dumps(llm_generated_body, indent=2, ensure_ascii=False)[:500]}...")
else:
self.logger.warning(f"LLM未能为 {endpoint_spec.method} {endpoint_spec.path} 生成请求体内容,将回退到默认方法。")
else:
self.logger.warning(f"未能从Schema动态创建Pydantic模型用于LLM请求体生成 (端点: {endpoint_spec.method} {endpoint_spec.path}),将回退。")
except Exception as e: except Exception as e:
self.logger.error(f"使用LLM生成请求体时发生错误: {e}。将回退到默认方法。", exc_info=True) self.logger.error(f"使用LLM生成请求体时发生错误: {e}。将回退到默认方法。", exc_info=True)
if not generated_by_llm: # 如果未使用LLM或LLM生成失败 if not generated_by_llm:
if self.use_llm_for_request_body and self.llm_service: # 只有在尝试过LLM之后才打印这条回退日志 # 只有当确实尝试了LLMshould_try_llm_for_this_run为True但失败了或者测试用例强制不使用LLM才记录回退日志
self.logger.debug(f"LLM生成请求体失败或未启用回退到基于规则的生成方法 for {endpoint_spec.method} {endpoint_spec.path}") log_fallback = False
if should_try_llm_for_this_run: # 如果本应尝试LLM但generated_by_llm是False说明LLM失败了
log_fallback = True
elif test_case_instance and hasattr(test_case_instance, 'use_llm_for_body') and test_case_instance.use_llm_for_body is False:
# 如果测试用例明确禁用了LLM
log_fallback = True
self.logger.debug(f"测试用例 '{test_case_instance.id}' 明确配置不使用LLM使用基于规则的生成方法 for {endpoint_spec.method} {endpoint_spec.path}")
if log_fallback and not (test_case_instance and hasattr(test_case_instance, 'use_llm_for_body') and test_case_instance.use_llm_for_body is False) : # 避免重复日志
self.logger.debug(f"LLM生成请求体失败或未启用 (最终决策: {should_try_llm_for_this_run}), 回退到基于规则的生成方法 for {endpoint_spec.method} {endpoint_spec.path}")
body_data = self._generate_data_from_schema(body_schema_dict) body_data = self._generate_data_from_schema(body_schema_dict)
return { return {
@ -718,30 +928,23 @@ class APITestOrchestrator:
} }
def run_test_for_endpoint(self, endpoint: Union[YAPIEndpoint, SwaggerEndpoint], def run_test_for_endpoint(self, endpoint: Union[YAPIEndpoint, SwaggerEndpoint],
global_api_spec: Union[ParsedYAPISpec, ParsedSwaggerSpec] # 新增参数 global_api_spec: Union[ParsedYAPISpec, ParsedSwaggerSpec]
) -> TestResult: # 返回类型更新为新的TestResult (EndpointExecutionResult) ) -> TestResult:
"""
运行单个API端点的所有适用测试用例
"""
endpoint_id = f"{getattr(endpoint, 'method', 'GET').upper()} {getattr(endpoint, 'path', '/')}" endpoint_id = f"{getattr(endpoint, 'method', 'GET').upper()} {getattr(endpoint, 'path', '/')}"
endpoint_name = getattr(endpoint, 'title', '') or getattr(endpoint, 'summary', '') or endpoint_id endpoint_name = getattr(endpoint, 'title', '') or getattr(endpoint, 'summary', '') or endpoint_id
self.logger.info(f"开始为端点测试: {endpoint_id} ({endpoint_name})") self.logger.info(f"开始为端点测试: {endpoint_id} ({endpoint_name})")
# 使用新的TestResult结构 (它现在代表 EndpointExecutionResult) endpoint_test_result = TestResult(
endpoint_test_result = TestResult( # 这是新的 TestResult
endpoint_id=endpoint_id, endpoint_id=endpoint_id,
endpoint_name=endpoint_name, endpoint_name=endpoint_name,
# api_spec_details=endpoint.to_dict() if hasattr(endpoint, 'to_dict') else endpoint # 可选
) )
if not self.test_case_registry: if not self.test_case_registry:
self.logger.warning(f"TestCaseRegistry 未初始化,无法为端点 '{endpoint_id}' 执行自定义测试用例。") self.logger.warning(f"TestCaseRegistry 未初始化,无法为端点 '{endpoint_id}' 执行自定义测试用例。")
# TODO: 决定此时的行为,是跳过,还是执行旧的规则引擎(如果保留),或者标记为错误。 endpoint_test_result.overall_status = TestResult.Status.SKIPPED
# 简化:如果只想运行新的测试用例,那么这里就直接结束此端点的测试。
endpoint_test_result.overall_status = TestResult.Status.SKIPPED # 或者 ERROR
endpoint_test_result.error_message = "TestCaseRegistry 未初始化。" endpoint_test_result.error_message = "TestCaseRegistry 未初始化。"
endpoint_test_result.finalize_endpoint_test() # 计算持续时间等 endpoint_test_result.finalize_endpoint_test()
return endpoint_test_result return endpoint_test_result
applicable_test_case_classes = self.test_case_registry.get_applicable_test_cases( applicable_test_case_classes = self.test_case_registry.get_applicable_test_cases(
@ -751,10 +954,7 @@ class APITestOrchestrator:
if not applicable_test_case_classes: if not applicable_test_case_classes:
self.logger.info(f"端点 '{endpoint_id}' 没有找到适用的自定义测试用例。") self.logger.info(f"端点 '{endpoint_id}' 没有找到适用的自定义测试用例。")
# 同样,决定行为。如果只依赖自定义测试用例,则此端点可能算作 SKIPPED 或某种形式的通过/信息。 endpoint_test_result.finalize_endpoint_test()
# endpoint_test_result.overall_status = TestResult.Status.SKIPPED # 或 INFO / PASSED_NO_CASES
# endpoint_test_result.message = "没有适用的自定义测试用例。"
endpoint_test_result.finalize_endpoint_test() # 会将状态设置为ERROR并附带消息
return endpoint_test_result return endpoint_test_result
self.logger.info(f"端点 '{endpoint_id}' 发现了 {len(applicable_test_case_classes)} 个适用的测试用例: {[tc.id for tc in applicable_test_case_classes]}") self.logger.info(f"端点 '{endpoint_id}' 发现了 {len(applicable_test_case_classes)} 个适用的测试用例: {[tc.id for tc in applicable_test_case_classes]}")
@ -769,35 +969,15 @@ class APITestOrchestrator:
endpoint_test_result.add_executed_test_case_result(executed_case_result) endpoint_test_result.add_executed_test_case_result(executed_case_result)
self.logger.debug(f"测试用例 '{tc_class.id}' 执行完毕,状态: {executed_case_result.status.value}") self.logger.debug(f"测试用例 '{tc_class.id}' 执行完毕,状态: {executed_case_result.status.value}")
# 所有测试用例执行完毕后,最终确定此端点的状态
endpoint_test_result.finalize_endpoint_test() endpoint_test_result.finalize_endpoint_test()
self.logger.info(f"端点 '{endpoint_id}' 测试完成,最终状态: {endpoint_test_result.overall_status.value}") self.logger.info(f"端点 '{endpoint_id}' 测试完成,最终状态: {endpoint_test_result.overall_status.value}")
# 旧的规则引擎逻辑 (self.rule_executor) 可以选择性地在这里调用,
# 或者完全被新的 APITestCase 机制取代。
# 如果要保留,需要决定它如何与新的结果结构集成。
# 目前,为了清晰和逐步迁移,我们假设主要依赖新的 APITestCase。
return endpoint_test_result return endpoint_test_result
def run_tests_from_yapi(self, yapi_file_path: str, def run_tests_from_yapi(self, yapi_file_path: str,
categories: Optional[List[str]] = None, categories: Optional[List[str]] = None,
custom_test_cases_dir: Optional[str] = None # 新增参数 custom_test_cases_dir: Optional[str] = None
) -> TestSummary: ) -> TestSummary:
"""
从YAPI定义文件运行API测试
Args:
yapi_file_path: YAPI定义文件路径
categories: 要测试的API分类列表如果为None则测试所有分类
custom_test_cases_dir: 自定义测试用例的目录如果 Orchestrator 初始化时已提供则此参数可选
如果 Orchestrator 未提供则必须在此处提供以加载测试用例
如果 Orchestrator 初始化和此处都提供了此处的优先
Returns:
TestSummary: 测试结果摘要
"""
# 如果调用时传入了 custom_test_cases_dir则重新初始化/更新 TestCaseRegistry
if custom_test_cases_dir and (not self.test_case_registry or self.test_case_registry.test_cases_dir != custom_test_cases_dir): if custom_test_cases_dir and (not self.test_case_registry or self.test_case_registry.test_cases_dir != custom_test_cases_dir):
self.logger.info(f"从 run_tests_from_yapi 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}") self.logger.info(f"从 run_tests_from_yapi 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}")
try: try:
@ -805,14 +985,10 @@ class APITestOrchestrator:
self.logger.info(f"TestCaseRegistry (re)initialization complete, found {len(self.test_case_registry.get_all_test_case_classes())} test case classes.") self.logger.info(f"TestCaseRegistry (re)initialization complete, found {len(self.test_case_registry.get_all_test_case_classes())} test case classes.")
except Exception as e: except Exception as e:
self.logger.error(f"从 run_tests_from_yapi 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True) self.logger.error(f"从 run_tests_from_yapi 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True)
# 决定是中止还是继续(可能不运行自定义测试)
# For now, if it fails here, it might proceed without custom tests if registry becomes None
self.logger.info(f"从YAPI文件加载API定义: {yapi_file_path}") self.logger.info(f"从YAPI文件加载API定义: {yapi_file_path}")
parsed_yapi = self.parser.parse_yapi_spec(yapi_file_path) parsed_yapi = self.parser.parse_yapi_spec(yapi_file_path)
summary = TestSummary()
summary = TestSummary() # 使用新的 TestSummary
if not parsed_yapi: if not parsed_yapi:
self.logger.error(f"解析YAPI文件失败: {yapi_file_path}") self.logger.error(f"解析YAPI文件失败: {yapi_file_path}")
@ -825,7 +1001,6 @@ class APITestOrchestrator:
summary.set_total_endpoints_defined(len(endpoints_to_test)) summary.set_total_endpoints_defined(len(endpoints_to_test))
# 计算总的适用测试用例数量 (粗略估计,实际执行时可能会因内部逻辑跳过)
total_applicable_tcs = 0 total_applicable_tcs = 0
if self.test_case_registry: if self.test_case_registry:
for endpoint_spec in endpoints_to_test: for endpoint_spec in endpoints_to_test:
@ -836,30 +1011,17 @@ class APITestOrchestrator:
) )
summary.set_total_test_cases_applicable(total_applicable_tcs) summary.set_total_test_cases_applicable(total_applicable_tcs)
for endpoint in endpoints_to_test: for endpoint in endpoints_to_test:
# 将完整的 parsed_yapi 作为 global_api_spec 传递
result = self.run_test_for_endpoint(endpoint, global_api_spec=parsed_yapi) result = self.run_test_for_endpoint(endpoint, global_api_spec=parsed_yapi)
summary.add_endpoint_result(result) # 使用新的 TestSummary 方法 summary.add_endpoint_result(result)
summary.finalize_summary() # 使用新的 TestSummary 方法 summary.finalize_summary()
return summary return summary
def run_tests_from_swagger(self, swagger_file_path: str, def run_tests_from_swagger(self, swagger_file_path: str,
tags: Optional[List[str]] = None, tags: Optional[List[str]] = None,
custom_test_cases_dir: Optional[str] = None # 新增参数 custom_test_cases_dir: Optional[str] = None
) -> TestSummary: ) -> TestSummary:
"""
从Swagger定义文件运行API测试
Args:
swagger_file_path: Swagger定义文件路径
tags: 要测试的API标签列表如果为None则测试所有标签
custom_test_cases_dir: 自定义测试用例的目录 (逻辑同 yapi 方法)
Returns:
TestSummary: 测试结果摘要
"""
if custom_test_cases_dir and (not self.test_case_registry or self.test_case_registry.test_cases_dir != custom_test_cases_dir): if custom_test_cases_dir and (not self.test_case_registry or self.test_case_registry.test_cases_dir != custom_test_cases_dir):
self.logger.info(f"从 run_tests_from_swagger 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}") self.logger.info(f"从 run_tests_from_swagger 使用新的目录重新初始化 TestCaseRegistry: {custom_test_cases_dir}")
try: try:
@ -868,11 +1030,9 @@ class APITestOrchestrator:
except Exception as e: except Exception as e:
self.logger.error(f"从 run_tests_from_swagger 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True) self.logger.error(f"从 run_tests_from_swagger 重新初始化 TestCaseRegistry 失败: {e}", exc_info=True)
self.logger.info(f"从Swagger文件加载API定义: {swagger_file_path}") self.logger.info(f"从Swagger文件加载API定义: {swagger_file_path}")
parsed_swagger = self.parser.parse_swagger_spec(swagger_file_path) parsed_swagger = self.parser.parse_swagger_spec(swagger_file_path)
summary = TestSummary()
summary = TestSummary() # 使用新的 TestSummary
if not parsed_swagger: if not parsed_swagger:
self.logger.error(f"解析Swagger文件失败: {swagger_file_path}") self.logger.error(f"解析Swagger文件失败: {swagger_file_path}")
@ -896,105 +1056,70 @@ class APITestOrchestrator:
summary.set_total_test_cases_applicable(total_applicable_tcs) summary.set_total_test_cases_applicable(total_applicable_tcs)
for endpoint in endpoints_to_test: for endpoint in endpoints_to_test:
# 将完整的 parsed_swagger 作为 global_api_spec 传递
result = self.run_test_for_endpoint(endpoint, global_api_spec=parsed_swagger) result = self.run_test_for_endpoint(endpoint, global_api_spec=parsed_swagger)
summary.add_endpoint_result(result) # 使用新的 TestSummary 方法 summary.add_endpoint_result(result)
summary.finalize_summary() # 使用新的 TestSummary 方法 summary.finalize_summary()
return summary return summary
def _generate_data_from_schema(self, schema: Dict[str, Any]) -> Any: def _generate_data_from_schema(self, schema: Dict[str, Any]) -> Any:
""" """
根据JSON Schema生成测试数据 (此方法基本保持不变可能被测试用例或编排器内部使用) 根据JSON Schema生成测试数据 (此方法基本保持不变可能被测试用例或编排器内部使用)
Args:
schema: JSON Schema
Returns:
生成的测试数据
""" """
if not schema or not isinstance(schema, dict): # 添加检查 schema 是否为 dict if not schema or not isinstance(schema, dict):
self.logger.debug(f"_generate_data_from_schema: 提供的 schema 无效或为空: {schema}") self.logger.debug(f"_generate_data_from_schema: 提供的 schema 无效或为空: {schema}")
return None return None
schema_type = schema.get('type') schema_type = schema.get('type')
# 优先使用 example 或 default
if 'example' in schema: if 'example' in schema:
return schema['example'] return schema['example']
if 'default' in schema: if 'default' in schema:
return schema['default'] return schema['default']
if schema_type == 'object': if schema_type == 'object':
# ... (内容与旧版本相同,此处省略以便简洁) ...
result = {} result = {}
properties = schema.get('properties', {}) properties = schema.get('properties', {})
required_fields = schema.get('required', [])
for prop_name, prop_schema in properties.items(): for prop_name, prop_schema in properties.items():
# 如果字段是必需的,或者我们想为所有字段生成值 result[prop_name] = self._generate_data_from_schema(prop_schema)
# 为了生成更完整的请求体,我们通常会为所有定义的属性生成值 return result if result else {}
# if prop_name in required_fields or True: # 改为总是尝试生成
result[prop_name] = self._generate_data_from_schema(prop_schema)
# 确保所有必需字段都有值,即使它们在 properties 中没有 schema不常见但可能
# for req_field in required_fields:
# if req_field not in result:
# result[req_field] = "example_required_value" # 或 None
return result if result else {} # 确保返回字典
elif schema_type == 'array': elif schema_type == 'array':
items_schema = schema.get('items', {}) items_schema = schema.get('items', {})
# 尝试生成一个或多个项,可以使用 minItems/maxItems (简化:生成一项) min_items = schema.get('minItems', 1 if schema.get('default') is None and schema.get('example') is None else 0)
min_items = schema.get('minItems', 1 if schema.get('default') is None and schema.get('example') is None else 0) # 如果有默认或示例空数组则可以为0
if min_items == 0 and (schema.get('default') == [] or schema.get('example') == []): if min_items == 0 and (schema.get('default') == [] or schema.get('example') == []):
return [] return []
num_items_to_generate = max(1, min_items) # 至少生成一项除非minItems显式为0且无内容 num_items_to_generate = max(1, min_items)
generated_array = [self._generate_data_from_schema(items_schema) for _ in range(num_items_to_generate)] generated_array = [self._generate_data_from_schema(items_schema) for _ in range(num_items_to_generate)]
# 过滤掉生成失败的 None 值,除非 schema 允许 null
# if items_schema.get('type') != 'null' and not ('null' in items_schema.get('type', []) if isinstance(items_schema.get('type'), list) else False):
# generated_array = [item for item in generated_array if item is not None]
return generated_array return generated_array
elif schema_type == 'string': elif schema_type == 'string':
string_format = schema.get('format', '') string_format = schema.get('format', '')
if 'enum' in schema and schema['enum']: # 确保 enum 非空 if 'enum' in schema and schema['enum']:
return schema['enum'][0] return schema['enum'][0]
# ... (其他格式处理与旧版类似) ...
if string_format == 'date': return '2023-01-01' if string_format == 'date': return '2023-01-01'
if string_format == 'date-time': return datetime.datetime.now().isoformat() if string_format == 'date-time': return datetime.datetime.now().isoformat()
if string_format == 'email': return 'test@example.com' if string_format == 'email': return 'test@example.com'
if string_format == 'uuid': import uuid; return str(uuid.uuid4()) if string_format == 'uuid': import uuid; return str(uuid.uuid4())
# pattern, minLength, maxLength 等可以进一步细化
return schema.get('default', schema.get('example', 'example_string')) return schema.get('default', schema.get('example', 'example_string'))
elif schema_type == 'number' or schema_type == 'integer': elif schema_type == 'number' or schema_type == 'integer':
# ... (与旧版类似,优先 default/example) ...
val = schema.get('default', schema.get('example')) val = schema.get('default', schema.get('example'))
if val is not None: return val if val is not None: return val
minimum = schema.get('minimum') minimum = schema.get('minimum')
maximum = schema.get('maximum') maximum = schema.get('maximum') # Not used yet for generation, but could be
if minimum is not None: return minimum if minimum is not None: return minimum
if maximum is not None: return maximum # (如果只有max可能需要调整)
return 0 if schema_type == 'integer' else 0.0 return 0 if schema_type == 'integer' else 0.0
elif schema_type == 'boolean': elif schema_type == 'boolean':
return schema.get('default', schema.get('example', False)) # 默认为 False return schema.get('default', schema.get('example', False))
elif schema_type == 'null': elif schema_type == 'null':
return None return None
self.logger.debug(f"_generate_data_from_schema: 未知或不支持的 schema 类型 '{schema_type}' for schema: {schema}") self.logger.debug(f"_generate_data_from_schema: 未知或不支持的 schema 类型 '{schema_type}' for schema: {schema}")
return None # 对于未知类型返回None return None
# ... (旧的 _build_api_request 和 _validate_response 基本可以移除了,因为它们的功能被新的流程覆盖) ...
# 确保删除或注释掉旧的 `_build_api_request` 和 `_validate_response` 方法,
# 因为它们的功能现在被 `_execute_single_test_case` 和 `_prepare_initial_request_data` 中的逻辑所取代或整合。
# python run_api_tests.py --base-url http://127.0.0.1:4523/m1/6386850-6083489-default --yapi assets/doc/井筒API示例.json --custom-test-cases-dir ./custom_testcases
# (示例命令行调用,需要更新以匹配新的参数)

8447
log.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,18 +0,0 @@
{
"id": "restful-url-pattern",
"name": "RESTful URL设计规则",
"description": "验证API URL是否符合RESTful设计规范",
"category": "APIDesign",
"version": "1.0.0",
"severity": "warning",
"source": null,
"is_enabled": true,
"tags": null,
"target_type": "APIRequest",
"target_identifier": null,
"lifecycle": "请求准备阶段",
"scope": "请求URL",
"code": null,
"design_aspect": "URL设计",
"pattern": "^/api/v\\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
}

View File

@ -1,19 +0,0 @@
{
"id": "standard-error-response",
"name": "标准错误响应格式规则",
"description": "验证API错误响应是否符合标准格式",
"category": "ErrorHandling",
"version": "1.0.0",
"severity": "warning",
"source": null,
"is_enabled": true,
"tags": null,
"target_type": "APIResponse",
"target_identifier": null,
"lifecycle": "响应验证阶段",
"scope": "响应体",
"code": null,
"error_code": "*",
"expected_status": 400,
"expected_message": null
}

View File

@ -1,19 +0,0 @@
{
"id": "response-time-max-500ms",
"name": "响应时间不超过500毫秒",
"description": "验证API响应时间不超过500毫秒",
"category": "Performance",
"version": "1.0.0",
"severity": "warning",
"source": null,
"is_enabled": true,
"tags": null,
"target_type": "APIResponse",
"target_identifier": null,
"lifecycle": "响应验证阶段",
"scope": "响应时间",
"code": null,
"threshold": 500.0,
"metric": "response_time",
"unit": "ms"
}

View File

@ -1,18 +0,0 @@
{
"id": "https-only-rule",
"name": "HTTPS强制使用规则",
"description": "验证API请求是否使用了HTTPS协议",
"category": "Security",
"version": "1.0.0",
"severity": "error",
"source": null,
"is_enabled": true,
"tags": null,
"target_type": "APIRequest",
"target_identifier": null,
"lifecycle": "请求准备阶段",
"scope": "安全性",
"code": null,
"check_type": "transport_security",
"expected_value": "https"
}

View File

@ -53,6 +53,22 @@ def parse_args():
default=None, # 或者 './custom_testcases' 如果想设为默认 default=None, # 或者 './custom_testcases' 如果想设为默认
help='存放自定义APITestCase Python文件的目录路径。如果未提供则不加载自定义测试。') help='存放自定义APITestCase Python文件的目录路径。如果未提供则不加载自定义测试。')
# 新增LLM 配置选项
llm_group = parser.add_argument_group('LLM 配置选项 (可选)')
llm_group.add_argument('--llm-api-key',
default=os.environ.get("OPENAI_API_KEY"), # 尝试从环境变量获取
help='LLM服务的API密钥 (例如 OpenAI API Key)。默认从环境变量 OPENAI_API_KEY 读取。')
llm_group.add_argument('--llm-base-url',
default="https://dashscope.aliyuncs.com/compatible-mode/v1",
help='LLM服务的自定义基础URL (例如 OpenAI API代理)。')
llm_group.add_argument('--llm-model-name',
default="qwen-plus", # 设置一个常用的默认模型
help='要使用的LLM模型名称 (例如 "gpt-3.5-turbo", "gpt-4")。')
llm_group.add_argument('--use-llm-for-request-body',
action='store_true',
default=True,
help='是否启用LLM为API请求生成请求体数据。')
return parser.parse_args() return parser.parse_args()
def list_yapi_categories(yapi_file: str): def list_yapi_categories(yapi_file: str):
@ -190,12 +206,17 @@ def main():
# 解析分类/标签过滤器 # 解析分类/标签过滤器
categories = args.categories.split(',') if args.categories else None categories = args.categories.split(',') if args.categories else None
tags = args.tags.split(',') if args.tags else None tags = args.tags.split(',') if args.tags else None
logger.info(f"args.api_key: {args.llm_api_key}")
# 实例化测试编排器 # 实例化测试编排器
# 将 custom_test_cases_dir 参数传递给 APITestOrchestrator 的构造函数 # 将 custom_test_cases_dir 参数传递给 APITestOrchestrator 的构造函数
orchestrator = APITestOrchestrator( orchestrator = APITestOrchestrator(
base_url=args.base_url, base_url=args.base_url,
custom_test_cases_dir=args.custom_test_cases_dir # 新增参数 custom_test_cases_dir=args.custom_test_cases_dir, # 新增参数
llm_api_key=args.llm_api_key,
llm_base_url=args.llm_base_url,
llm_model_name=args.llm_model_name,
use_llm_for_request_body=args.use_llm_for_request_body
) )
# 运行测试 # 运行测试

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,438 @@
import unittest
import logging
from typing import Optional, List, Dict, Any, Type, Union
from uuid import UUID
import datetime as dt
# 调整导入路径以适应测试文件在 tests/ 目录下的情况
import sys
import os
current_file_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_file_dir)
if project_root not in sys.path:
sys.path.insert(0, project_root)
from pydantic import BaseModel, Field, ValidationError
from pydantic.networks import EmailStr
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, _dynamic_model_cache
from ddms_compliance_suite.llm_utils.llm_service import LLMService # For orchestrator init if needed
# 基本的 Orchestrator 初始化参数,如果测试中需要实例化 Orchestrator
BASE_URL_FOR_TEST = "http://fakeapi.com"
# 全局禁用或设置较低级别的日志,以便测试输出更干净
# logging.basicConfig(level=logging.ERROR)
# logging.getLogger(\"ddms_compliance_suite.test_orchestrator\").setLevel(logging.WARNING)
# Helper functions to extract constraint values from FieldInfo.metadata
def get_metadata_constraint_value(metadata_list: list, constraint_attr_name: str) -> Any:
    """Return the first value of ``constraint_attr_name`` carried by any object
    in ``metadata_list``, or ``None`` when no object defines it.

    Pydantic v2 stores per-field constraints (``ge``, ``le``, ``min_length``,
    ``pattern``, ...) as separate metadata objects on ``FieldInfo.metadata``;
    this helper scans that list for the named constraint.
    """
    found = (
        getattr(meta, constraint_attr_name)
        for meta in metadata_list
        if hasattr(meta, constraint_attr_name)
    )
    return next(found, None)
class TestDynamicModelCreation(unittest.TestCase):
"""
专门测试 APITestOrchestrator._create_pydantic_model_from_schema 方法
"""
    def setUp(self):
        """Clear the dynamic-model cache so each test runs independently."""
        _dynamic_model_cache.clear()
        # Create an orchestrator instance: _create_pydantic_model_from_schema
        # is a method on it.  When only that method is under test, the LLM
        # configuration can be left unset (None).
        self.orchestrator = APITestOrchestrator(base_url=BASE_URL_FOR_TEST)
        # The method under test is then invoked via
        # self.orchestrator._create_pydantic_model_from_schema.
    def tearDown(self):
        """Clear the dynamic-model cache again, as a safety net between tests."""
        _dynamic_model_cache.clear()
    def test_simple_object(self):
        """Basic object creation: fields of several types plus required fields."""
        schema = {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "User name"},
                "age": {"type": "integer", "minimum": 0},
                "email": {"type": "string", "format": "email"},
                "is_active": {"type": "boolean", "default": True},
                "height": {"type": "number"}
            },
            "required": ["name", "age"]
        }
        model_name = "SimpleUserModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)
        self.assertTrue(issubclass(DynamicModel, BaseModel))
        self.assertEqual(DynamicModel.__name__, model_name)
        fields = DynamicModel.model_fields
        self.assertIn("name", fields)
        self.assertEqual(fields["name"].annotation, str)
        self.assertTrue(fields["name"].is_required())
        self.assertEqual(fields["name"].description, "User name")
        self.assertIn("age", fields)
        age_field_info = fields["age"]
        self.assertEqual(age_field_info.annotation, int)
        self.assertTrue(age_field_info.is_required())
        # JSON Schema "minimum" is expected to surface as a Pydantic "ge" constraint.
        self.assertEqual(get_metadata_constraint_value(age_field_info.metadata, 'ge'), 0)
        self.assertIn("email", fields)
        self.assertEqual(fields["email"].annotation, Optional[EmailStr]) # Not required, so Optional
        self.assertFalse(fields["email"].is_required())
        self.assertIn("is_active", fields)
        self.assertEqual(fields["is_active"].annotation, bool) # Corrected: Has default, so it's bool
        self.assertEqual(fields["is_active"].default, True)
        self.assertFalse(fields["is_active"].is_required()) # Fields with defaults are not strictly required from user input
        self.assertIn("height", fields)
        self.assertEqual(fields["height"].annotation, Optional[float]) # Not required
        # Exercise instantiation and validation.
        valid_data = {"name": "Test", "age": 30, "email": "test@example.com", "height": 1.75}
        instance = DynamicModel(**valid_data)
        self.assertEqual(instance.name, "Test")
        self.assertEqual(instance.is_active, True) # Default value
        with self.assertRaises(ValidationError):
            DynamicModel(age=-5, email="bademail") # name missing, age invalid
def test_nested_object(self):
"""测试嵌套对象的创建。"""
schema = {
"type": "object",
"properties": {
"id": {"type": "string"},
"profile": {
"type": "object",
"properties": {
"user_email": {"type": "string", "format": "email"},
"score": {"type": "integer", "default": 0}
},
"required": ["user_email"]
}
},
"required": ["id"]
}
model_name = "NestedOuterModel"
DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
self.assertIsNotNone(DynamicModel)
fields = DynamicModel.model_fields
self.assertIn("profile", fields)
ProfileModel = fields["profile"].annotation
self.assertTrue(hasattr(ProfileModel, '__origin__') and ProfileModel.__origin__ is Union)
self.assertIn(type(None), ProfileModel.__args__)
NestedProfileModel = [arg for arg in ProfileModel.__args__ if arg is not type(None)][0]
self.assertTrue(issubclass(NestedProfileModel, BaseModel))
self.assertEqual(NestedProfileModel.__name__, f"{model_name}_profile")
nested_fields = NestedProfileModel.model_fields
self.assertIn("user_email", nested_fields)
self.assertEqual(nested_fields["user_email"].annotation, EmailStr)
self.assertTrue(nested_fields["user_email"].is_required())
self.assertIn("score", nested_fields)
self.assertEqual(nested_fields["score"].annotation, int)
self.assertEqual(nested_fields["score"].default, 0)
# Test instantiation
valid_data = {"id": "abc", "profile": {"user_email": "nested@example.com"}}
instance = DynamicModel(**valid_data)
self.assertEqual(instance.id, "abc")
self.assertEqual(instance.profile.user_email, "nested@example.com")
self.assertEqual(instance.profile.score, 0)
def test_array_of_simple_types(self):
schema = {
"type": "object",
"properties": {
"tags": {"type": "array", "items": {"type": "string"}},
"scores": {"type": "array", "items": {"type": "integer"}, "default": []}
}
}
model_name = "ArraySimpleModel"
DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
self.assertIsNotNone(DynamicModel)
fields = DynamicModel.model_fields
self.assertIn("tags", fields)
self.assertEqual(fields["tags"].annotation, Optional[List[str]])
self.assertIn("scores", fields)
self.assertEqual(fields["scores"].annotation, List[int])
self.assertEqual(fields["scores"].default, [])
valid_data = {"tags": ["a", "b"], "scores": [1,2,3]}
instance = DynamicModel(**valid_data)
self.assertEqual(instance.tags, ["a", "b"])
# Test default for scores when tags is provided
instance2 = DynamicModel(tags=["c"])
self.assertEqual(instance2.scores, [])
    def test_array_of_objects(self):
        """An array-of-objects property should yield a List of a nested item model."""
        schema = {
            "type": "object",
            "properties": {
                "users": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "username": {"type": "string"},
                            "user_id": {"type": "integer"}
                        },
                        "required": ["username"]
                    }
                }
            }
        }
        model_name = "ArrayObjectModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)
        fields = DynamicModel.model_fields
        self.assertIn("users", fields)
        # users is Optional[List[UserModel_users_Item]]
        UserListItemType = fields["users"].annotation
        # Unwrap the Optional (Union[..., None]) to get the List type.
        self.assertTrue(hasattr(UserListItemType, '__origin__') and UserListItemType.__origin__ is Union)
        UserListType = [arg for arg in UserListItemType.__args__ if arg is not type(None)][0]
        self.assertEqual(UserListType.__origin__, list) # Check it's a List
        ItemModel = UserListType.__args__[0] # Get the item type from List[ItemType]
        self.assertTrue(issubclass(ItemModel, BaseModel))
        # Item models are expected to be named "<outer>_<prop>_Item".
        self.assertEqual(ItemModel.__name__, f"{model_name}_users_Item")
        item_fields = ItemModel.model_fields
        self.assertEqual(item_fields["username"].annotation, str)
        self.assertTrue(item_fields["username"].is_required())
        self.assertEqual(item_fields["user_id"].annotation, Optional[int])
        # Instantiation: the second item omits the optional "user_id".
        valid_data = {"users": [{"username": "a", "user_id":1}, {"username": "b"}]}
        instance = DynamicModel(**valid_data)
        self.assertEqual(len(instance.users), 2)
        self.assertEqual(instance.users[0].username, "a")
def test_field_constraints(self):
    """Numeric and string constraints from the schema must land in the field metadata."""
    schema = {
        "type": "object",
        "properties": {
            "quantity": {"type": "integer", "minimum": 1, "maximum": 100},
            "code": {"type": "string", "minLength": 3, "maxLength": 5, "pattern": "^[A-Z]+$"},
            "percentage": {"type": "number", "minimum": 0.0, "maximum": 1.0},
        },
    }
    DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, "ConstraintsModel")
    self.assertIsNotNone(DynamicModel)
    fields = DynamicModel.model_fields
    # Expected Pydantic constraint names per field, derived from the JSON Schema above.
    expected_constraints = {
        "quantity": {"ge": 1, "le": 100},
        "code": {"min_length": 3, "max_length": 5, "pattern": "^[A-Z]+$"},
        "percentage": {"ge": 0.0, "le": 1.0},
    }
    for field_name, constraints in expected_constraints.items():
        metadata = fields[field_name].metadata
        for constraint_name, expected_value in constraints.items():
            self.assertEqual(
                get_metadata_constraint_value(metadata, constraint_name),
                expected_value,
            )
    # Out-of-range or malformed values must be rejected ...
    invalid_payloads = (
        {"quantity": 0},
        {"code": "ab"},
        {"code": "ABCDEF"},
        {"code": "ab1"},
        {"percentage": 1.1},
    )
    for payload in invalid_payloads:
        with self.assertRaises(ValidationError):
            DynamicModel(**payload)
    # ... and an in-range combination accepted.
    DynamicModel(quantity=50, code="XYZ", percentage=0.5)
def test_enum_in_description(self):
    """Enum values should be appended to the generated field's description text."""
    schema = {
        "type": "object",
        "properties": {
            "status": {
                "type": "string",
                "enum": ["active", "inactive", "pending"],
                "description": "Current status.",
            }
        },
    }
    DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, "EnumDescModel")
    self.assertIsNotNone(DynamicModel)
    status_field = DynamicModel.model_fields.get("status")
    self.assertIsNotNone(status_field)
    # Both the enum listing and the original description survive generation.
    self.assertIn("Enum values: active, inactive, pending", status_field.description)
    self.assertIn("Current status.", status_field.description)
def test_datetime_formats(self):
    """String formats date-time, date and uuid map to datetime, date and UUID annotations."""
    schema = {
        "type": "object",
        "properties": {
            "created_at": {"type": "string", "format": "date-time"},
            "event_date": {"type": "string", "format": "date"},
            "uid": {"type": "string", "format": "uuid"},
        },
    }
    DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, "DateTimeUUIDModel")
    self.assertIsNotNone(DynamicModel)
    fields = DynamicModel.model_fields
    # None of the fields is required, hence the Optional wrappers.
    expected_annotations = {
        "created_at": Optional[dt.datetime],
        "event_date": Optional[dt.date],
        "uid": Optional[UUID],
    }
    for field_name, annotation in expected_annotations.items():
        self.assertEqual(fields[field_name].annotation, annotation)
    # Pydantic should coerce the string representations into the rich types.
    instance = DynamicModel(
        created_at="2024-01-15T10:30:00Z",
        event_date="2024-01-15",
        uid="a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
    )
    self.assertIsInstance(instance.created_at, dt.datetime)
    self.assertIsInstance(instance.event_date, dt.date)
    self.assertIsInstance(instance.uid, UUID)
def test_empty_object_schema(self):
    """Object schemas with empty or missing 'properties' yield field-less, instantiable models."""
    cases = (
        ("EmptyPropertiesModel", {"type": "object", "properties": {}}),   # explicit empty properties
        ("NoPropertiesFieldModel", {"type": "object"}),                   # no properties key at all
    )
    for model_name, schema in cases:
        model = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(model)
        self.assertEqual(len(model.model_fields), 0)
        model()  # instantiation without arguments must succeed
def test_invalid_top_level_schema(self):
    """Top-level schemas that are not object-typed dicts must produce None."""
    # A non-"object" type cannot become a model.
    non_object = {"type": "string"}
    self.assertIsNone(
        self.orchestrator._create_pydantic_model_from_schema(non_object, "InvalidSchemaModel")
    )
    # A non-dict (here: a list) is rejected outright.
    non_dict = [{"type": "object"}]
    self.assertIsNone(
        self.orchestrator._create_pydantic_model_from_schema(non_dict, "InvalidSchemaModel2")
    )
def test_model_caching(self):
    """Requesting the same model name twice must hit the cache and return the same object."""
    schema = {"type": "object", "properties": {"name": {"type": "string"}}}
    model_name = "CachedModel"
    first = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
    self.assertIsNotNone(first)
    self.assertIn(model_name, _dynamic_model_cache)
    second = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
    self.assertIs(first, second)  # identical object, not merely an equal one
def test_recursion_depth_limit(self):
    """A schema nested past the recursion limit logs an error but still returns a model.

    The factory names nested models "<parent>_<prop>", so genuinely cyclic
    $ref self-references are not resolved here; the depth guard (limit 10
    in APITestOrchestrator) is the only protection against unbounded
    nesting, which this test exercises with a 12-level-deep chain.
    """
    # Build the chain inside-out: level_11 is an empty object, each outer
    # level wraps the previous one, and the final wrapper is the top-level
    # schema whose properties hold level_0.
    layer: Dict[str, Any] = {"type": "object", "properties": {}}
    for depth in range(11, -1, -1):
        layer = {"type": "object", "properties": {f"level_{depth}": layer}}
    deep_schema = layer
    with self.assertLogs(level='ERROR') as log_watcher:
        GeneratedModel = self.orchestrator._create_pydantic_model_from_schema(deep_schema, "DeepRecursiveModel")
    # The guard should have fired (message: "达到最大递归深度" / max recursion depth reached) ...
    self.assertTrue(any("达到最大递归深度" in msg for msg in log_watcher.output))
    # ... while still producing a (truncated) model rather than failing.
    self.assertIsNotNone(GeneratedModel)
def test_name_sanitization(self):
    """Model names are sanitized into valid Python identifiers before create_model."""
    schema = {"type": "object", "properties": {"test": {"type": "string"}}}

    def build(raw_name):
        # Helper: create a model from the shared schema and assert it was produced.
        model = self.orchestrator._create_pydantic_model_from_schema(schema, raw_name)
        self.assertIsNotNone(model)
        return model

    # Already-valid identifiers pass through untouched.
    self.assertEqual(build("ValidName123").__name__, "ValidName123")
    # Spaces and hyphens are replaced by underscores.
    self.assertEqual(build("Invalid Name-Test").__name__, "Invalid_Name_Test")
    # Identifiers cannot start with a digit, so a fixed prefix is prepended.
    self.assertEqual(build("123InvalidStart").__name__, "DynamicModel_123InvalidStart")
    # Empty names fall back to a generated, prefixed name.
    self.assertTrue(build("").__name__.startswith("DynamicModel_"))
    # Pure underscores are valid identifiers; create_model keeps them as-is.
    self.assertEqual(build("___").__name__, "___")
def test_optional_logic_for_fields(self):
    """Fields absent from 'required' become Optional, or keep an explicit schema default."""
    schema = {
        "type": "object",
        "properties": {
            "required_field": {"type": "string"},
            "optional_field_no_default": {"type": "integer"},
            "optional_field_with_default": {"type": "boolean", "default": False},
            "optional_nested_object": {
                "type": "object",
                "properties": {"value": {"type": "string"}},
            },
        },
        "required": ["required_field"],
    }
    DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, "OptionalFieldsModel")
    self.assertIsNotNone(DynamicModel)
    fields = DynamicModel.model_fields
    # Listed in "required": bare annotation, no default, must be supplied.
    required_info = fields["required_field"]
    self.assertEqual(required_info.annotation, str)
    self.assertTrue(required_info.is_required())
    # Not required, no schema default: Optional[...] defaulting to None.
    no_default_info = fields["optional_field_no_default"]
    self.assertEqual(no_default_info.annotation, Optional[int])
    self.assertFalse(no_default_info.is_required())
    self.assertEqual(no_default_info.default, None)
    # Not required but with an explicit default: keeps the bare type plus the default.
    with_default_info = fields["optional_field_with_default"]
    self.assertEqual(with_default_info.annotation, bool)
    self.assertFalse(with_default_info.is_required())
    self.assertEqual(with_default_info.default, False)
    # Optional nested object: Union[NestedModel, None] wrapping a generated BaseModel.
    nested_annotation = fields["optional_nested_object"].annotation
    self.assertTrue(hasattr(nested_annotation, "__origin__"))
    self.assertIs(nested_annotation.__origin__, Union)
    self.assertIn(type(None), nested_annotation.__args__)
    (nested_model,) = [arg for arg in nested_annotation.__args__ if arg is not type(None)]
    self.assertTrue(issubclass(nested_model, BaseModel))
# Allow running this test module directly (e.g. `python this_file.py`).
if __name__ == '__main__':
    unittest.main()