"""Unit tests for ``APITestOrchestrator._create_pydantic_model_from_schema``."""

import unittest
import logging
from typing import Optional, List, Dict, Any, Type, Union, get_args, get_origin
from uuid import UUID
import datetime as dt

# Adjust the import path so the package resolves when this test file lives
# in the tests/ directory. This must run before the project imports below.
import sys
import os

current_file_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_file_dir)
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from pydantic import BaseModel, Field, ValidationError
from pydantic.networks import EmailStr

from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, _dynamic_model_cache
from ddms_compliance_suite.llm_utils.llm_service import LLMService  # For orchestrator init if needed

# Base Orchestrator init parameter, used by tests that instantiate the Orchestrator.
BASE_URL_FOR_TEST = "http://fakeapi.com"

# To get cleaner test output, logging can be disabled or lowered globally:
# logging.basicConfig(level=logging.ERROR)
# logging.getLogger("ddms_compliance_suite.test_orchestrator").setLevel(logging.WARNING)


def get_metadata_constraint_value(metadata_list: list, constraint_attr_name: str) -> Any:
    """Extract a constraint value from a ``FieldInfo.metadata`` list.

    Args:
        metadata_list: Metadata objects attached to a field.
        constraint_attr_name: Attribute to look for (e.g. ``'ge'``, ``'max_length'``).

    Returns:
        The attribute value of the first metadata object that has the
        attribute, or ``None`` when no object carries it.
    """
    return next(
        (
            getattr(meta, constraint_attr_name)
            for meta in metadata_list
            if hasattr(meta, constraint_attr_name)
        ),
        None,
    )


class TestDynamicModelCreation(unittest.TestCase):
    """Tests dedicated to ``APITestOrchestrator._create_pydantic_model_from_schema``."""

    def setUp(self):
        """Clear the dynamic model cache so every test runs in isolation."""
        _dynamic_model_cache.clear()
        # _create_pydantic_model_from_schema is a method of the Orchestrator,
        # so an instance is needed; no LLM configuration is passed here.
        self.orchestrator = APITestOrchestrator(base_url=BASE_URL_FOR_TEST)

    def tearDown(self):
        """Clear the cache again, just in case."""
        _dynamic_model_cache.clear()

    def _unwrap_optional(self, annotation: Any) -> Any:
        """Assert ``annotation`` is ``Optional[X]`` and return ``X``.

        Uses the public ``typing.get_origin`` / ``typing.get_args`` helpers
        instead of reading the private ``__origin__`` / ``__args__`` attributes.
        """
        self.assertIs(
            get_origin(annotation),
            Union,
            f"expected an Optional[...] annotation, got {annotation!r}",
        )
        members = get_args(annotation)
        self.assertIn(type(None), members)
        return [member for member in members if member is not type(None)][0]

    def test_simple_object(self):
        """Basic object creation with mixed field types and required fields."""
        schema = {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "User name"},
                "age": {"type": "integer", "minimum": 0},
                "email": {"type": "string", "format": "email"},
                "is_active": {"type": "boolean", "default": True},
                "height": {"type": "number"},
            },
            "required": ["name", "age"],
        }
        model_name = "SimpleUserModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)

        self.assertIsNotNone(DynamicModel)
        self.assertTrue(issubclass(DynamicModel, BaseModel))
        self.assertEqual(DynamicModel.__name__, model_name)

        fields = DynamicModel.model_fields

        self.assertIn("name", fields)
        self.assertEqual(fields["name"].annotation, str)
        self.assertTrue(fields["name"].is_required())
        self.assertEqual(fields["name"].description, "User name")

        self.assertIn("age", fields)
        age_field_info = fields["age"]
        self.assertEqual(age_field_info.annotation, int)
        self.assertTrue(age_field_info.is_required())
        self.assertEqual(get_metadata_constraint_value(age_field_info.metadata, 'ge'), 0)

        self.assertIn("email", fields)
        # Not listed in "required", so the annotation is Optional.
        self.assertEqual(fields["email"].annotation, Optional[EmailStr])
        self.assertFalse(fields["email"].is_required())

        self.assertIn("is_active", fields)
        # Has a default, so the annotation stays a plain bool.
        self.assertEqual(fields["is_active"].annotation, bool)
        self.assertEqual(fields["is_active"].default, True)
        # Fields with defaults are not strictly required from user input.
        self.assertFalse(fields["is_active"].is_required())

        self.assertIn("height", fields)
        self.assertEqual(fields["height"].annotation, Optional[float])  # Not required

        # Instantiation and validation.
        valid_data = {"name": "Test", "age": 30, "email": "test@example.com", "height": 1.75}
        instance = DynamicModel(**valid_data)
        self.assertEqual(instance.name, "Test")
        self.assertEqual(instance.is_active, True)  # Default value

        with self.assertRaises(ValidationError):
            DynamicModel(age=-5, email="bademail")  # name missing, age invalid

    def test_nested_object(self):
        """Nested object schemas produce nested models."""
        schema = {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "profile": {
                    "type": "object",
                    "properties": {
                        "user_email": {"type": "string", "format": "email"},
                        "score": {"type": "integer", "default": 0},
                    },
                    "required": ["user_email"],
                },
            },
            "required": ["id"],
        }
        model_name = "NestedOuterModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertIn("profile", fields)

        # "profile" is not required, so it is Optional[<nested model>].
        NestedProfileModel = self._unwrap_optional(fields["profile"].annotation)
        self.assertTrue(issubclass(NestedProfileModel, BaseModel))
        self.assertEqual(NestedProfileModel.__name__, f"{model_name}_profile")

        nested_fields = NestedProfileModel.model_fields
        self.assertIn("user_email", nested_fields)
        self.assertEqual(nested_fields["user_email"].annotation, EmailStr)
        self.assertTrue(nested_fields["user_email"].is_required())

        self.assertIn("score", nested_fields)
        self.assertEqual(nested_fields["score"].annotation, int)
        self.assertEqual(nested_fields["score"].default, 0)

        # Test instantiation
        valid_data = {"id": "abc", "profile": {"user_email": "nested@example.com"}}
        instance = DynamicModel(**valid_data)
        self.assertEqual(instance.id, "abc")
        self.assertEqual(instance.profile.user_email, "nested@example.com")
        self.assertEqual(instance.profile.score, 0)

    def test_array_of_simple_types(self):
        """Arrays of primitive items map to ``List[...]`` annotations."""
        schema = {
            "type": "object",
            "properties": {
                "tags": {"type": "array", "items": {"type": "string"}},
                "scores": {"type": "array", "items": {"type": "integer"}, "default": []},
            },
        }
        model_name = "ArraySimpleModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertIn("tags", fields)
        self.assertEqual(fields["tags"].annotation, Optional[List[str]])

        self.assertIn("scores", fields)
        self.assertEqual(fields["scores"].annotation, List[int])
        self.assertEqual(fields["scores"].default, [])

        valid_data = {"tags": ["a", "b"], "scores": [1, 2, 3]}
        instance = DynamicModel(**valid_data)
        self.assertEqual(instance.tags, ["a", "b"])

        # Test default for scores when only tags is provided
        instance2 = DynamicModel(tags=["c"])
        self.assertEqual(instance2.scores, [])

    def test_array_of_objects(self):
        """Arrays of object items produce a nested item model."""
        schema = {
            "type": "object",
            "properties": {
                "users": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "username": {"type": "string"},
                            "user_id": {"type": "integer"},
                        },
                        "required": ["username"],
                    },
                }
            },
        }
        model_name = "ArrayObjectModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertIn("users", fields)

        # users is Optional[List[<item model>]]
        user_list_type = self._unwrap_optional(fields["users"].annotation)
        self.assertEqual(get_origin(user_list_type), list)  # Check it's a List

        ItemModel = get_args(user_list_type)[0]  # Item type from List[ItemType]
        self.assertTrue(issubclass(ItemModel, BaseModel))
        self.assertEqual(ItemModel.__name__, f"{model_name}_users_Item")

        item_fields = ItemModel.model_fields
        self.assertEqual(item_fields["username"].annotation, str)
        self.assertTrue(item_fields["username"].is_required())
        self.assertEqual(item_fields["user_id"].annotation, Optional[int])

        valid_data = {"users": [{"username": "a", "user_id": 1}, {"username": "b"}]}
        instance = DynamicModel(**valid_data)
        self.assertEqual(len(instance.users), 2)
        self.assertEqual(instance.users[0].username, "a")

    def test_field_constraints(self):
        """JSON-schema constraints are carried over as field metadata and enforced."""
        schema = {
            "type": "object",
            "properties": {
                "quantity": {"type": "integer", "minimum": 1, "maximum": 100},
                "code": {"type": "string", "minLength": 3, "maxLength": 5, "pattern": "^[A-Z]+$"},
                "percentage": {"type": "number", "minimum": 0.0, "maximum": 1.0},
            },
        }
        model_name = "ConstraintsModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields

        # Quantity (int)
        quantity_field_info = fields["quantity"]
        self.assertEqual(get_metadata_constraint_value(quantity_field_info.metadata, 'ge'), 1)
        self.assertEqual(get_metadata_constraint_value(quantity_field_info.metadata, 'le'), 100)

        # Code (str)
        code_field_info = fields["code"]
        self.assertEqual(get_metadata_constraint_value(code_field_info.metadata, 'min_length'), 3)
        self.assertEqual(get_metadata_constraint_value(code_field_info.metadata, 'max_length'), 5)
        self.assertEqual(get_metadata_constraint_value(code_field_info.metadata, 'pattern'), "^[A-Z]+$")

        # Percentage (float/number)
        percentage_field_info = fields["percentage"]
        self.assertEqual(get_metadata_constraint_value(percentage_field_info.metadata, 'ge'), 0.0)
        self.assertEqual(get_metadata_constraint_value(percentage_field_info.metadata, 'le'), 1.0)

        # Test validation. Each payload runs as its own subTest so that one
        # failing case does not hide the others.
        invalid_payloads = [
            {"quantity": 0},       # below minimum
            {"code": "ab"},        # shorter than minLength
            {"code": "ABCDEF"},    # longer than maxLength
            {"code": "ab1"},       # does not match pattern
            {"percentage": 1.1},   # above maximum
        ]
        for payload in invalid_payloads:
            with self.subTest(payload=payload):
                with self.assertRaises(ValidationError):
                    DynamicModel(**payload)

        DynamicModel(quantity=50, code="XYZ", percentage=0.5)  # Should be valid

    def test_enum_in_description(self):
        """Enum values are appended to the field description."""
        schema = {
            "type": "object",
            "properties": {
                "status": {
                    "type": "string",
                    "enum": ["active", "inactive", "pending"],
                    "description": "Current status.",
                }
            },
        }
        model_name = "EnumDescModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertIn("status", fields)
        self.assertIn("Enum values: active, inactive, pending", fields["status"].description)
        self.assertIn("Current status.", fields["status"].description)

    def test_datetime_formats(self):
        """``date-time``, ``date`` and ``uuid`` formats map to rich Python types."""
        schema = {
            "type": "object",
            "properties": {
                "created_at": {"type": "string", "format": "date-time"},
                "event_date": {"type": "string", "format": "date"},
                "uid": {"type": "string", "format": "uuid"},
            },
        }
        model_name = "DateTimeUUIDModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields
        self.assertEqual(fields["created_at"].annotation, Optional[dt.datetime])
        self.assertEqual(fields["event_date"].annotation, Optional[dt.date])
        self.assertEqual(fields["uid"].annotation, Optional[UUID])

        valid_data = {
            "created_at": "2024-01-15T10:30:00Z",
            "event_date": "2024-01-15",
            "uid": "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
        }
        instance = DynamicModel(**valid_data)
        self.assertIsInstance(instance.created_at, dt.datetime)
        self.assertIsInstance(instance.event_date, dt.date)
        self.assertIsInstance(instance.uid, UUID)

    def test_empty_object_schema(self):
        """Object schemas with empty or missing ``properties`` yield field-less models."""
        schema = {"type": "object", "properties": {}}  # Empty properties
        model_name = "EmptyPropertiesModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)
        self.assertEqual(len(DynamicModel.model_fields), 0)
        DynamicModel()  # Should instantiate

        schema2 = {"type": "object"}  # No properties field at all
        model_name2 = "NoPropertiesFieldModel"
        DynamicModel2 = self.orchestrator._create_pydantic_model_from_schema(schema2, model_name2)
        self.assertIsNotNone(DynamicModel2)
        self.assertEqual(len(DynamicModel2.model_fields), 0)
        DynamicModel2()

    def test_invalid_top_level_schema(self):
        """Non-object or non-dict top-level schemas return ``None``."""
        schema = {"type": "string"}  # Not an object
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, "InvalidSchemaModel")
        self.assertIsNone(DynamicModel)

        schema2 = [{"type": "object"}]  # Not a dict
        DynamicModel2 = self.orchestrator._create_pydantic_model_from_schema(schema2, "InvalidSchemaModel2")
        self.assertIsNone(DynamicModel2)

    def test_model_caching(self):
        """A second call with the same model name returns the cached class."""
        schema = {"type": "object", "properties": {"name": {"type": "string"}}}
        model_name = "CachedModel"

        Model1 = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(Model1)
        self.assertIn(model_name, _dynamic_model_cache)

        Model2 = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)  # Should be from cache
        self.assertIs(Model1, Model2)  # Check they are the same object

    def test_recursion_depth_limit(self):
        """Deeply nested schemas hit the recursion guard and log an error."""
        # A directly self-referencing schema (e.g. {"self": {"$ref": "#/"}}) is
        # noted by the original author as not fully handled yet. Nested model
        # names are derived from model_name + property name, so the depth limit
        # is the main guard for dynamic creation. This test therefore builds a
        # plain, very deep chain of nested objects instead.
        #
        # NOTE(review): the original comment states MAX_RECURSION_DEPTH in
        # APITestOrchestrator is 10 -- confirm against the implementation. The
        # chain below is intentionally a little deeper than that.
        nesting_levels = 12

        deep_schema: Dict[str, Any] = {"type": "object", "properties": {}}
        current_level = deep_schema["properties"]
        # Property level_0 holds an object containing level_1, and so on.
        for depth in range(nesting_levels):
            nested_object: Dict[str, Any] = {"type": "object", "properties": {}}
            current_level[f"level_{depth}"] = nested_object
            current_level = nested_object["properties"]

        with self.assertLogs(level='ERROR') as log_watcher:
            GeneratedModel = self.orchestrator._create_pydantic_model_from_schema(
                deep_schema, "DeepRecursiveModel"
            )

        self.assertTrue(any("达到最大递归深度" in msg for msg in log_watcher.output))
        self.assertIsNotNone(GeneratedModel)

    def test_name_sanitization(self):
        """Model names are sanitized into valid Python identifiers."""
        schema = {"type": "object", "properties": {"test": {"type": "string"}}}

        # Valid name
        Model1 = self.orchestrator._create_pydantic_model_from_schema(schema, "ValidName123")
        self.assertIsNotNone(Model1)
        self.assertEqual(Model1.__name__, "ValidName123")

        # Name with spaces and hyphens
        Model2 = self.orchestrator._create_pydantic_model_from_schema(schema, "Invalid Name-Test")
        self.assertIsNotNone(Model2)
        self.assertEqual(Model2.__name__, "Invalid_Name_Test")  # Check sanitized name

        # Name starting with number
        Model3 = self.orchestrator._create_pydantic_model_from_schema(schema, "123InvalidStart")
        self.assertIsNotNone(Model3)
        self.assertEqual(Model3.__name__, "DynamicModel_123InvalidStart")

        # Empty name - should get a default prefix
        Model4 = self.orchestrator._create_pydantic_model_from_schema(schema, "")
        self.assertIsNotNone(Model4)
        self.assertTrue(Model4.__name__.startswith("DynamicModel_"))

        # Name that is just underscores. Underscores are valid identifiers;
        # the original author observed that create_model appears to keep it.
        Model5 = self.orchestrator._create_pydantic_model_from_schema(schema, "___")
        self.assertIsNotNone(Model5)
        self.assertEqual(Model5.__name__, "___")

    def test_optional_logic_for_fields(self):
        """Required / optional / defaulted fields get the expected annotations."""
        schema = {
            "type": "object",
            "properties": {
                "required_field": {"type": "string"},
                "optional_field_no_default": {"type": "integer"},
                "optional_field_with_default": {"type": "boolean", "default": False},
                "optional_nested_object": {
                    "type": "object",
                    "properties": {"value": {"type": "string"}},
                },
            },
            "required": ["required_field"],
        }
        model_name = "OptionalFieldsModel"
        DynamicModel = self.orchestrator._create_pydantic_model_from_schema(schema, model_name)
        self.assertIsNotNone(DynamicModel)

        fields = DynamicModel.model_fields

        self.assertEqual(fields["required_field"].annotation, str)
        self.assertTrue(fields["required_field"].is_required())

        self.assertEqual(fields["optional_field_no_default"].annotation, Optional[int])
        self.assertFalse(fields["optional_field_no_default"].is_required())
        # A non-required field without a schema default is expected to default to None.
        self.assertIsNone(fields["optional_field_no_default"].default)

        self.assertEqual(fields["optional_field_with_default"].annotation, bool)
        self.assertFalse(fields["optional_field_with_default"].is_required())
        self.assertEqual(fields["optional_field_with_default"].default, False)

        # optional_nested_object is not required, so it is Optional[<nested model>].
        ActualNestedModel = self._unwrap_optional(fields["optional_nested_object"].annotation)
        self.assertTrue(issubclass(ActualNestedModel, BaseModel))


if __name__ == '__main__':
    unittest.main()