compliance/tests/test_test_orchestrator.py
gongwenxin 00de3a880a llm
2025-05-21 16:04:09 +08:00

438 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import unittest
import logging
from typing import Optional, List, Dict, Any, Type, Union
from uuid import UUID
import datetime as dt
# 调整导入路径以适应测试文件在 tests/ 目录下的情况
import sys
import os
# Make the project root importable when this file is run from the tests/
# directory: the root is taken to be the parent of the directory that
# contains this file.
current_file_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_file_dir)
# Insert at the front so the local checkout is found first; skip the insert
# when the path is already present to avoid duplicate entries.
if project_root not in sys.path:
    sys.path.insert(0, project_root)
from pydantic import BaseModel, Field, ValidationError
from pydantic.networks import EmailStr
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, _dynamic_model_cache
from ddms_compliance_suite.llm_utils.llm_service import LLMService # For orchestrator init if needed
# 基本的 Orchestrator 初始化参数,如果测试中需要实例化 Orchestrator
BASE_URL_FOR_TEST = "http://fakeapi.com"
# 全局禁用或设置较低级别的日志,以便测试输出更干净
# logging.basicConfig(level=logging.ERROR)
# logging.getLogger("ddms_compliance_suite.test_orchestrator").setLevel(logging.WARNING)
# Helper functions to extract constraint values from FieldInfo.metadata
def get_metadata_constraint_value(metadata_list: list, constraint_attr_name: str) -> Any:
    """Return the first ``constraint_attr_name`` value found in ``metadata_list``.

    The list is scanned in order and the attribute value of the first object
    that has an attribute with the given name is returned.

    Args:
        metadata_list: Objects to inspect (the tests in this file pass
            ``FieldInfo.metadata``).
        constraint_attr_name: Attribute name to look for, e.g. ``"ge"`` or
            ``"max_length"``.

    Returns:
        The attribute value of the first matching object, or ``None`` when no
        object in the list carries that attribute.
    """
    # A private sentinel lets one getattr() call distinguish "attribute
    # absent" from "attribute present but set to None".
    missing = object()
    for metadata_item in metadata_list:
        value = getattr(metadata_item, constraint_attr_name, missing)
        if value is not missing:
            return value
    return None
class TestDynamicModelCreation(unittest.TestCase):
    """Tests for ``APITestOrchestrator._create_pydantic_model_from_schema``.

    Each test passes a JSON-schema-style dict to the orchestrator and inspects
    the Pydantic model class it returns (field annotations, defaults and
    constraint metadata); most tests also validate sample data with it.
    """

    def setUp(self):
        """Clear the dynamic model cache and create a fresh orchestrator."""
        _dynamic_model_cache.clear()
        # Only base_url is supplied; per the original author's note, no LLM
        # configuration is needed to exercise model creation.
        self.orchestrator = APITestOrchestrator(base_url=BASE_URL_FOR_TEST)

    def tearDown(self):
        """Clear the cache again so no model leaks into the next test."""
        _dynamic_model_cache.clear()

    def _unwrap_optional(self, annotation):
        """Assert that ``annotation`` is ``Optional[X]`` and return ``X``.

        Fails the current test when the annotation is not a ``typing.Union``
        or does not include ``NoneType``. The first non-``None`` member of
        the union is returned.
        """
        self.assertIs(
            getattr(annotation, "__origin__", None),
            Union,
            f"expected an Optional[...] annotation, got {annotation!r}",
        )
        self.assertIn(type(None), annotation.__args__)
        non_none_members = [arg for arg in annotation.__args__ if arg is not type(None)]
        return non_none_members[0]

    def test_simple_object(self):
        """Flat object with mixed field types, a default and required fields."""
        schema = {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "User name"},
                "age": {"type": "integer", "minimum": 0},
                "email": {"type": "string", "format": "email"},
                "is_active": {"type": "boolean", "default": True},
                "height": {"type": "number"}
            },
            "required": ["name", "age"]
        }
        model_name = "SimpleUserModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)

        self.assertIsNotNone(DynamicModel)
        self.assertTrue(issubclass(DynamicModel, BaseModel))
        self.assertEqual(DynamicModel.__name__, model_name)

        fields = DynamicModel.model_fields

        self.assertIn("name", fields)
        self.assertEqual(fields["name"].annotation, str)
        self.assertTrue(fields["name"].is_required())
        self.assertEqual(fields["name"].description, "User name")

        self.assertIn("age", fields)
        age_field_info = fields["age"]
        self.assertEqual(age_field_info.annotation, int)
        self.assertTrue(age_field_info.is_required())
        # JSON-schema "minimum" is expected to surface as a "ge" constraint.
        self.assertEqual(get_metadata_constraint_value(age_field_info.metadata, 'ge'), 0)

        self.assertIn("email", fields)
        # Not listed in "required", so the annotation is expected to be Optional.
        self.assertEqual(fields["email"].annotation, Optional[EmailStr])
        self.assertFalse(fields["email"].is_required())

        self.assertIn("is_active", fields)
        # A field with a default keeps its plain type and is not required.
        self.assertEqual(fields["is_active"].annotation, bool)
        self.assertEqual(fields["is_active"].default, True)
        self.assertFalse(fields["is_active"].is_required())

        self.assertIn("height", fields)
        self.assertEqual(fields["height"].annotation, Optional[float])

        # Instantiation and validation.
        valid_data = {"name": "Test", "age": 30, "email": "test@example.com", "height": 1.75}
        instance = DynamicModel(**valid_data)
        self.assertEqual(instance.name, "Test")
        self.assertEqual(instance.is_active, True)  # default value

        # Original combined case: name missing, age negative, email malformed.
        with self.assertRaises(ValidationError):
            DynamicModel(age=-5, email="bademail")
        # The combined case passes as soon as any one rule fires, so each
        # fault is also checked in isolation against otherwise valid data.
        with self.assertRaises(ValidationError):
            DynamicModel(age=30)  # required "name" missing
        with self.assertRaises(ValidationError):
            DynamicModel(name="Test", age=-5)  # violates minimum 0
        with self.assertRaises(ValidationError):
            DynamicModel(name="Test", age=30, email="bademail")  # not an email

    def test_nested_object(self):
        """An object-typed property yields a nested model class."""
        schema = {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "profile": {
                    "type": "object",
                    "properties": {
                        "user_email": {"type": "string", "format": "email"},
                        "score": {"type": "integer", "default": 0}
                    },
                    "required": ["user_email"]
                }
            },
            "required": ["id"]
        }
        model_name = "NestedOuterModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertIn("profile", fields)

        # "profile" is not required, so its annotation should be Optional[<nested model>].
        NestedProfileModel = self._unwrap_optional(fields["profile"].annotation)
        self.assertTrue(issubclass(NestedProfileModel, BaseModel))
        self.assertEqual(NestedProfileModel.__name__, f"{model_name}_profile")

        nested_fields = NestedProfileModel.model_fields
        self.assertIn("user_email", nested_fields)
        self.assertEqual(nested_fields["user_email"].annotation, EmailStr)
        self.assertTrue(nested_fields["user_email"].is_required())
        self.assertIn("score", nested_fields)
        self.assertEqual(nested_fields["score"].annotation, int)
        self.assertEqual(nested_fields["score"].default, 0)

        # Instantiation: the nested default must be filled in.
        valid_data = {"id": "abc", "profile": {"user_email": "nested@example.com"}}
        instance = DynamicModel(**valid_data)
        self.assertEqual(instance.id, "abc")
        self.assertEqual(instance.profile.user_email, "nested@example.com")
        self.assertEqual(instance.profile.score, 0)

    def test_array_of_simple_types(self):
        """Arrays of primitives map to ``List[...]`` annotations."""
        schema = {
            "type": "object",
            "properties": {
                "tags": {"type": "array", "items": {"type": "string"}},
                "scores": {"type": "array", "items": {"type": "integer"}, "default": []}
            }
        }
        model_name = "ArraySimpleModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertIn("tags", fields)
        self.assertEqual(fields["tags"].annotation, Optional[List[str]])
        self.assertIn("scores", fields)
        self.assertEqual(fields["scores"].annotation, List[int])
        self.assertEqual(fields["scores"].default, [])

        valid_data = {"tags": ["a", "b"], "scores": [1, 2, 3]}
        instance = DynamicModel(**valid_data)
        self.assertEqual(instance.tags, ["a", "b"])

        # "scores" falls back to its default when only "tags" is provided.
        instance2 = DynamicModel(tags=["c"])
        self.assertEqual(instance2.scores, [])

    def test_array_of_objects(self):
        """Arrays of objects map to a list of a generated item model."""
        schema = {
            "type": "object",
            "properties": {
                "users": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "username": {"type": "string"},
                            "user_id": {"type": "integer"}
                        },
                        "required": ["username"]
                    }
                }
            }
        }
        model_name = "ArrayObjectModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertIn("users", fields)

        # "users" is expected to be Optional[List[<item model>]].
        UserListType = self._unwrap_optional(fields["users"].annotation)
        self.assertEqual(UserListType.__origin__, list)
        ItemModel = UserListType.__args__[0]  # the item type of List[ItemType]
        self.assertTrue(issubclass(ItemModel, BaseModel))
        self.assertEqual(ItemModel.__name__, f"{model_name}_users_Item")

        item_fields = ItemModel.model_fields
        self.assertEqual(item_fields["username"].annotation, str)
        self.assertTrue(item_fields["username"].is_required())
        self.assertEqual(item_fields["user_id"].annotation, Optional[int])

        valid_data = {"users": [{"username": "a", "user_id": 1}, {"username": "b"}]}
        instance = DynamicModel(**valid_data)
        self.assertEqual(len(instance.users), 2)
        self.assertEqual(instance.users[0].username, "a")

    def test_field_constraints(self):
        """Numeric bounds, string lengths and patterns become field constraints."""
        schema = {
            "type": "object",
            "properties": {
                "quantity": {"type": "integer", "minimum": 1, "maximum": 100},
                "code": {"type": "string", "minLength": 3, "maxLength": 5, "pattern": "^[A-Z]+$"},
                "percentage": {"type": "number", "minimum": 0.0, "maximum": 1.0}
            }
        }
        model_name = "ConstraintsModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields

        # quantity (int)
        quantity_field_info = fields["quantity"]
        self.assertEqual(get_metadata_constraint_value(quantity_field_info.metadata, 'ge'), 1)
        self.assertEqual(get_metadata_constraint_value(quantity_field_info.metadata, 'le'), 100)

        # code (str)
        code_field_info = fields["code"]
        self.assertEqual(get_metadata_constraint_value(code_field_info.metadata, 'min_length'), 3)
        self.assertEqual(get_metadata_constraint_value(code_field_info.metadata, 'max_length'), 5)
        self.assertEqual(get_metadata_constraint_value(code_field_info.metadata, 'pattern'), "^[A-Z]+$")

        # percentage (number)
        percentage_field_info = fields["percentage"]
        self.assertEqual(get_metadata_constraint_value(percentage_field_info.metadata, 'ge'), 0.0)
        self.assertEqual(get_metadata_constraint_value(percentage_field_info.metadata, 'le'), 1.0)

        # Validation: every call below violates exactly one constraint.
        with self.assertRaises(ValidationError):
            DynamicModel(quantity=0)
        with self.assertRaises(ValidationError):
            DynamicModel(code="ab")
        with self.assertRaises(ValidationError):
            DynamicModel(code="ABCDEF")
        with self.assertRaises(ValidationError):
            DynamicModel(code="ab1")
        with self.assertRaises(ValidationError):
            DynamicModel(percentage=1.1)

        DynamicModel(quantity=50, code="XYZ", percentage=0.5)  # should be valid

    def test_enum_in_description(self):
        """Enum values are expected to be appended to the field description."""
        schema = {
            "type": "object",
            "properties": {
                "status": {"type": "string", "enum": ["active", "inactive", "pending"], "description": "Current status."}
            }
        }
        model_name = "EnumDescModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertIn("status", fields)
        self.assertIn("Enum values: active, inactive, pending", fields["status"].description)
        self.assertIn("Current status.", fields["status"].description)

    def test_datetime_formats(self):
        """String formats date-time, date and uuid map to rich Python types."""
        schema = {
            "type": "object",
            "properties": {
                "created_at": {"type": "string", "format": "date-time"},
                "event_date": {"type": "string", "format": "date"},
                "uid": {"type": "string", "format": "uuid"}
            }
        }
        model_name = "DateTimeUUIDModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertEqual(fields["created_at"].annotation, Optional[dt.datetime])
        self.assertEqual(fields["event_date"].annotation, Optional[dt.date])
        self.assertEqual(fields["uid"].annotation, Optional[UUID])

        valid_data = {
            "created_at": "2024-01-15T10:30:00Z",
            "event_date": "2024-01-15",
            "uid": "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11"
        }
        instance = DynamicModel(**valid_data)
        self.assertIsInstance(instance.created_at, dt.datetime)
        self.assertIsInstance(instance.event_date, dt.date)
        self.assertIsInstance(instance.uid, UUID)

    def test_empty_object_schema(self):
        """Object schemas with empty or missing ``properties`` give field-less models."""
        schema = {"type": "object", "properties": {}}  # empty properties
        model_name = "EmptyPropertiesModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)
        self.assertEqual(len(DynamicModel.model_fields), 0)
        DynamicModel()  # should instantiate

        schema2 = {"type": "object"}  # no "properties" key at all
        model_name2 = "NoPropertiesFieldModel"
        DynamicModel2 = self.orchestrator._create_pydantic_model_from_schema(schema2, model_name2)
        self.assertIsNotNone(DynamicModel2)
        self.assertEqual(len(DynamicModel2.model_fields), 0)
        DynamicModel2()

    def test_invalid_top_level_schema(self):
        """Non-object and non-dict schemas are expected to yield ``None``."""
        schema = {"type": "string"}  # not an object
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, "InvalidSchemaModel")
        self.assertIsNone(DynamicModel)

        schema2 = [{"type": "object"}]  # not a dict
        DynamicModel2 = self.orchestrator._create_pydantic_model_from_schema(schema2, "InvalidSchemaModel2")
        self.assertIsNone(DynamicModel2)

    def test_model_caching(self):
        """A second request for the same model name returns the cached class."""
        schema = {"type": "object", "properties": {"name": {"type": "string"}}}
        model_name = "CachedModel"

        Model1 = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(Model1)
        self.assertIn(model_name, _dynamic_model_cache)

        Model2 = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIs(Model1, Model2)  # same object, i.e. served from the cache

    def test_recursion_depth_limit(self):
        """A deeply nested schema triggers the max-recursion-depth error log.

        Self-referencing schemas (``$ref``) are, per the original author's
        note, not fully handled; a long chain of nested objects is used here
        to reach the depth guard instead.
        """
        deep_schema: Dict[str, Any] = {"type": "object", "properties": {}}
        current_level = deep_schema["properties"]
        # NOTE(review): the original comment states MAX_RECURSION_DEPTH in
        # APITestOrchestrator is 10 -- confirm. Twelve nested levels
        # (level_0 .. level_11) go a little beyond that limit.
        nesting_levels = 12
        for i in range(nesting_levels):
            nested = {"type": "object", "properties": {}}
            current_level[f"level_{i}"] = nested
            current_level = nested["properties"]

        with self.assertLogs(level='ERROR') as log_watcher:
            GeneratedModel = self.orchestrator._create_pydantic_model_from_schema(deep_schema, "DeepRecursiveModel")

        # The matched text is the orchestrator's runtime log message and must
        # stay exactly as emitted.
        self.assertTrue(any("达到最大递归深度" in msg for msg in log_watcher.output))
        self.assertIsNotNone(GeneratedModel)

    def test_name_sanitization(self):
        """Model names are sanitized into valid identifiers."""
        schema = {"type": "object", "properties": {"test": {"type": "string"}}}

        # Valid name is kept as is.
        Model1 = self.orchestrator._create_pydantic_model_from_schema(schema, "ValidName123")
        self.assertIsNotNone(Model1)
        self.assertEqual(Model1.__name__, "ValidName123")

        # Spaces and hyphens become underscores.
        Model2 = self.orchestrator._create_pydantic_model_from_schema(schema, "Invalid Name-Test")
        self.assertIsNotNone(Model2)
        self.assertEqual(Model2.__name__, "Invalid_Name_Test")

        # A leading digit gets a prefix.
        Model3 = self.orchestrator._create_pydantic_model_from_schema(schema, "123InvalidStart")
        self.assertIsNotNone(Model3)
        self.assertEqual(Model3.__name__, "DynamicModel_123InvalidStart")

        # An empty name gets a default prefix.
        Model4 = self.orchestrator._create_pydantic_model_from_schema(schema, "")
        self.assertIsNotNone(Model4)
        self.assertTrue(Model4.__name__.startswith("DynamicModel_"))

        # A name made only of underscores is a valid identifier and is kept.
        Model5 = self.orchestrator._create_pydantic_model_from_schema(schema, "___")
        self.assertIsNotNone(Model5)
        self.assertEqual(Model5.__name__, "___")

    def test_optional_logic_for_fields(self):
        """Required, optional, defaulted and nested-optional fields are typed correctly."""
        schema = {
            "type": "object",
            "properties": {
                "required_field": {"type": "string"},
                "optional_field_no_default": {"type": "integer"},
                "optional_field_with_default": {"type": "boolean", "default": False},
                "optional_nested_object": {
                    "type": "object",
                    "properties": {"value": {"type": "string"}}
                }
            },
            "required": ["required_field"]
        }
        model_name = "OptionalFieldsModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields

        self.assertEqual(fields["required_field"].annotation, str)
        self.assertTrue(fields["required_field"].is_required())

        self.assertEqual(fields["optional_field_no_default"].annotation, Optional[int])
        self.assertFalse(fields["optional_field_no_default"].is_required())
        # Optional field without a schema default is expected to default to None.
        self.assertIsNone(fields["optional_field_no_default"].default)

        self.assertEqual(fields["optional_field_with_default"].annotation, bool)
        self.assertFalse(fields["optional_field_with_default"].is_required())
        self.assertEqual(fields["optional_field_with_default"].default, False)

        # The nested object is not required, so it should be Optional[<model>].
        ActualNestedModel = self._unwrap_optional(fields["optional_nested_object"].annotation)
        self.assertTrue(issubclass(ActualNestedModel, BaseModel))
# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()