"""
This module contains the DataGenerator class for creating test data from JSON schemas.
"""

import logging
import datetime
import uuid
from typing import Dict, Any, Optional, List

# The well data manager is optional: when the package-relative module cannot
# be imported the generator still works, just without real well data.
try:
    from .well_data_manager import WellDataManager
except ImportError:
    WellDataManager = None


class DataGenerator:
    """
    Generates test data based on a JSON Schema.

    Generation is deterministic apart from ``date``/``date-time``/``uuid``
    string formats. An optional LLM service can produce object payloads, and
    an optional well data manager can substitute real well values.
    """

    # Property names that trigger LLM generation even without a description.
    _LLM_HINT_FIELDS = frozenset(
        ['bsflag', 'dataSource', 'dataRegion', 'surveyType', 'siteType']
    )

    def __init__(self, logger_param: Optional[logging.Logger] = None,
                 well_data_manager: Optional['WellDataManager'] = None):
        """
        Initializes the data generator.

        Args:
            logger_param: Optional logger instance. If not provided, a
                module-level logger is used.
            well_data_manager: Optional well data manager for providing real
                well data.
        """
        self.logger = logger_param or logging.getLogger(__name__)
        self.well_data_manager = well_data_manager

    def generate_data_from_schema(self, schema: Dict[str, Any],
                                  context_name: Optional[str] = None,
                                  operation_id: Optional[str] = None,
                                  llm_service=None) -> Any:
        """
        Generates test data from a JSON Schema.

        Resolution order: ``oneOf``/``anyOf`` (first alternative), ``allOf``
        (merged), ``example``, ``default``, ``enum`` (first value), then the
        ``type`` keyword (matched case-insensitively; a list of types uses
        the first non-null entry).

        Args:
            schema: The JSON schema to generate data from.
            context_name: A name for the context (e.g., 'requestBody'), used
                for logging and for well-field detection.
            operation_id: The operation ID, for logging.
            llm_service: Optional LLM service for intelligent data
                generation. It is forwarded to every nested schema.

        Returns:
            Generated data for the schema, or None when the schema is empty,
            not a dict, of type ``null``, or of an unsupported type.
        """
        log_prefix = f"[{operation_id}] " if operation_id else ""
        context_log = f" (context: {context_name})" if context_name else ""

        if not schema or not isinstance(schema, dict):
            self.logger.debug(f"{log_prefix}generate_data_from_schema: Invalid or empty schema provided{context_log}: {schema}")
            return None

        # Handle schema composition keywords
        if 'oneOf' in schema or 'anyOf' in schema:
            schemas_to_try = schema.get('oneOf') or schema.get('anyOf')
            if isinstance(schemas_to_try, list) and schemas_to_try:
                self.logger.debug(f"{log_prefix}Processing oneOf/anyOf, selecting the first schema for{context_log}")
                # Forward llm_service so nested objects can still use the LLM.
                return self.generate_data_from_schema(
                    schemas_to_try[0], context_name, operation_id, llm_service
                )

        if 'allOf' in schema:
            self.logger.debug(f"{log_prefix}Processing allOf, merging schemas for{context_log}")
            schema = self._merge_all_of(schema)

        # Use example or default values if available
        if 'example' in schema:
            self.logger.debug(f"{log_prefix}Using 'example' value from schema for{context_log}: {schema['example']}")
            return schema['example']
        if 'default' in schema:
            self.logger.debug(f"{log_prefix}Using 'default' value from schema for{context_log}: {schema['default']}")
            return schema['default']

        # An enum restricts the value for every type, not only strings.
        enum_values = schema.get('enum')
        if isinstance(enum_values, (list, tuple)) and enum_values:
            return enum_values[0]

        schema_type = schema.get('type')
        type_name = self._normalize_type(schema_type)

        if type_name == 'object':
            return self._generate_object(schema, context_name, operation_id,
                                         llm_service, log_prefix, context_log)
        if type_name == 'array':
            return self._generate_array(schema, context_name, operation_id,
                                        llm_service, log_prefix, context_log)
        if type_name == 'string':
            return self._generate_string(schema, context_name, log_prefix)
        if type_name in ('number', 'integer'):
            minimum = schema.get('minimum')
            if minimum is not None:
                return minimum
            return 0 if type_name == 'integer' else 0.0
        if type_name == 'boolean':
            # Any 'default' was already returned above.
            return False
        if type_name == 'null':
            return None

        self.logger.warning(f"{log_prefix}Unsupported schema type '{schema_type}' in {context_log}. Schema: {schema}")
        return None

    @staticmethod
    def _normalize_type(schema_type: Any) -> Optional[str]:
        """Return the lower-cased type name, or None if it is not usable.

        A list of types (e.g. ``["string", "null"]``) resolves to its first
        non-null string entry, falling back to the first string entry.
        """
        if isinstance(schema_type, (list, tuple)):
            names = [name for name in schema_type if isinstance(name, str)]
            preferred = [name for name in names if name.lower() != 'null']
            candidates = preferred or names
            schema_type = candidates[0] if candidates else None
        return schema_type.lower() if isinstance(schema_type, str) else None

    def _merge_all_of(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten ``allOf`` sub-schemas and sibling keywords into one schema.

        ``properties`` dicts are combined and ``required`` lists are unioned;
        for every other keyword a later value replaces an earlier one.
        Keywords next to ``allOf`` are applied last. Non-dict sub-schemas are
        skipped.
        """
        sub_schemas = schema.get('allOf')
        parts: List[Any] = list(sub_schemas) if isinstance(sub_schemas, list) else []
        parts.append({key: value for key, value in schema.items() if key != 'allOf'})

        merged: Dict[str, Any] = {}
        for part in parts:
            if not isinstance(part, dict):
                continue
            if 'allOf' in part:
                part = self._merge_all_of(part)
            for key, value in part.items():
                existing = merged.get(key)
                if key == 'properties' and isinstance(value, dict) and isinstance(existing, dict):
                    merged[key] = {**existing, **value}
                elif key == 'required' and isinstance(value, list) and isinstance(existing, list):
                    merged[key] = existing + [name for name in value if name not in existing]
                else:
                    merged[key] = value
        return merged

    def _generate_object(self, schema: Dict[str, Any], context_name: Optional[str],
                         operation_id: Optional[str], llm_service,
                         log_prefix: str, context_log: str) -> Any:
        """Generate a value for an ``object`` schema, trying the LLM first."""
        # Try LLM generation when a service is available and the schema
        # carries descriptive information.
        if llm_service and self._should_use_llm_for_schema(schema):
            try:
                llm_data = self._generate_with_llm(schema, llm_service, context_name, operation_id)
                if llm_data is not None:
                    self.logger.debug(f"{log_prefix}LLM successfully generated data for{context_log}")
                    # LLM-generated data also needs well data enhancement.
                    if self.well_data_manager and isinstance(llm_data, dict):
                        llm_data = self.well_data_manager.enhance_data_with_well_values(llm_data)
                    return llm_data
            except Exception as e:
                # Deliberate best-effort: any LLM failure falls back to the
                # rule-based generation below.
                self.logger.debug(f"{log_prefix}LLM generation failed for{context_log}: {e}, falling back to traditional generation")

        # Rule-based generation
        result: Dict[str, Any] = {}
        properties = schema.get('properties')
        if not isinstance(properties, dict):
            properties = {}
        self.logger.debug(f"{log_prefix}Generating object data for{context_log}. Properties: {list(properties.keys())}")
        for prop_name, prop_schema in properties.items():
            nested_context = f"{context_name}.{prop_name}" if context_name else prop_name
            result[prop_name] = self.generate_data_from_schema(
                prop_schema, nested_context, operation_id, llm_service
            )

        additional_properties = schema.get('additionalProperties')
        if isinstance(additional_properties, dict):
            self.logger.debug(f"{log_prefix}Generating an example property for additionalProperties for{context_log}")
            # Avoid a literal "None." prefix when there is no parent context.
            extra_context = f"{context_name}.additionalProp1" if context_name else 'additionalProp1'
            result['additionalProp1'] = self.generate_data_from_schema(
                additional_properties, extra_context, operation_id, llm_service
            )

        # Enhance the data using the well data manager
        if self.well_data_manager:
            result = self.well_data_manager.enhance_data_with_well_values(result)
        return result

    def _generate_array(self, schema: Dict[str, Any], context_name: Optional[str],
                        operation_id: Optional[str], llm_service,
                        log_prefix: str, context_log: str) -> List[Any]:
        """Generate a list with at least one item for an ``array`` schema."""
        items_schema = schema.get('items', {})
        min_items = schema.get('minItems', 1)
        self.logger.debug(f"{log_prefix}Generating array data for{context_log}. Items schema: {items_schema}, minItems: {min_items}")

        # A missing, null or non-integer minItems falls back to one item.
        num_items_to_generate = max(1, min_items) if isinstance(min_items, int) else 1
        generated_array = []
        for i in range(num_items_to_generate):
            item_context = f"{context_name}[{i}]" if context_name else f"array_item[{i}]"
            generated_array.append(
                self.generate_data_from_schema(items_schema, item_context, operation_id, llm_service)
            )
        return generated_array

    def _generate_string(self, schema: Dict[str, Any], context_name: Optional[str],
                         log_prefix: str) -> str:
        """Generate a value for a ``string`` schema, honouring ``format``."""
        string_format = schema.get('format', '')
        if string_format == 'date':
            return datetime.date.today().isoformat()
        if string_format == 'date-time':
            return datetime.datetime.now().isoformat()
        if string_format == 'email':
            return 'test@example.com'
        if string_format == 'uuid':
            return str(uuid.uuid4())

        # For well-related fields, try to use real data.
        if self.well_data_manager and context_name:
            # Extract the field name from context_name (drop the path prefix).
            field_name = context_name.split('.')[-1]
            if self.well_data_manager.is_well_related_field(field_name):
                real_value = self.well_data_manager.get_well_value_for_field(field_name)
                if real_value is not None:
                    self.logger.info(f"{log_prefix}🔄 使用真实井数据替换字段 '{field_name}': {real_value}")
                    return str(real_value)

        return 'example_string'

    def _should_use_llm_for_schema(self, schema: Dict[str, Any]) -> bool:
        """Decide whether the LLM should be used to generate data.

        Returns True when any dict-valued property has a ``description`` or
        ``title``, or is one of the special business fields (e.g. bsflag).
        """
        properties = schema.get('properties')
        if not isinstance(properties, dict):
            return False

        for prop_name, prop_schema in properties.items():
            if not isinstance(prop_schema, dict):
                continue
            # Descriptive information gives the LLM something to work with.
            if prop_schema.get('description') or prop_schema.get('title'):
                return True
            # Special business fields always go through the LLM.
            if prop_name in self._LLM_HINT_FIELDS:
                return True

        return False

    def _generate_with_llm(self, schema: Dict[str, Any], llm_service,
                           context_name: Optional[str],
                           operation_id: Optional[str]) -> Any:
        """Generate data using the LLM service.

        Returns whatever ``llm_service.generate_data_from_schema`` returns,
        or None when the service has no such method so the caller falls back
        to rule-based generation.
        """
        if not hasattr(llm_service, 'generate_data_from_schema'):
            return None

        # Build a prompt that includes the field descriptions.
        prompt = self._build_llm_prompt(schema, context_name, operation_id)
        return llm_service.generate_data_from_schema(
            schema,
            prompt_instruction=prompt,
            max_tokens=512,
            temperature=0.1
        )

    def _build_llm_prompt(self, schema: Dict[str, Any],
                          context_name: Optional[str],
                          operation_id: Optional[str]) -> str:
        """Build the LLM prompt, including one line per described property."""
        properties = schema.get('properties')
        if not isinstance(properties, dict):
            properties = {}

        parts = [f"""请为以下JSON Schema生成合理的测试数据。

操作上下文: {operation_id or 'unknown'}
数据上下文: {context_name or 'unknown'}

字段说明:
"""]

        for prop_name, prop_schema in properties.items():
            if not isinstance(prop_schema, dict):
                continue
            prop_type = prop_schema.get('type', 'unknown')
            title = prop_schema.get('title', '')
            description = prop_schema.get('description', '')

            line = f"- {prop_name} ({prop_type})"
            if title:
                line += f" - {title}"
            if description:
                line += f": {description}"
            parts.append(line + "\n")

        parts.append("""
请根据字段的描述信息生成合理的测试数据:
1. 严格遵守字段描述中的业务规则
2. 生成真实、有意义的测试数据
3. 对于有特定取值范围的字段,请选择合适的值
4. 日期字段使用合理的日期格式
5. 返回一个完整的JSON对象

请只返回JSON数据,不要包含其他说明文字。""")

        return "".join(parts)