"""
This module contains the DataGenerator class for creating test data from JSON schemas.
"""

import logging
import datetime
import uuid
from typing import Dict, Any, Optional, List
class DataGenerator:
    """Generates test data based on a JSON Schema.

    Supports the composition keywords ``oneOf``/``anyOf``/``allOf``, honors
    author-supplied ``example``/``default`` values, and can optionally
    delegate object generation to an LLM service when the schema carries
    enough descriptive information.
    """

    # Property names that mark domain-specific business fields (e.g. bsflag);
    # their presence makes LLM-based generation worthwhile even without
    # per-field descriptions.
    _BUSINESS_FIELDS = ('bsflag', 'dataSource', 'dataRegion', 'surveyType', 'siteType')

    def __init__(self, logger_param: Optional[logging.Logger] = None):
        """
        Initializes the data generator.

        Args:
            logger_param: Optional logger instance. If not provided, a
                module-level logger is used.
        """
        self.logger = logger_param or logging.getLogger(__name__)

    def generate_data_from_schema(self, schema: Dict[str, Any],
                                  context_name: Optional[str] = None,
                                  operation_id: Optional[str] = None,
                                  llm_service=None) -> Any:
        """
        Generates test data from a JSON Schema.

        This method was extracted and generalized from APITestOrchestrator.

        Args:
            schema: The JSON schema to generate data from.
            context_name: A name for the context (e.g., 'requestBody'), for logging.
            operation_id: The operation ID, for logging.
            llm_service: Optional LLM service for intelligent data generation.

        Returns:
            Generated data that conforms to the schema, or None for an
            invalid/empty schema or an unsupported type.
        """
        log_prefix = f"[{operation_id}] " if operation_id else ""
        context_log = f" (context: {context_name})" if context_name else ""

        if not schema or not isinstance(schema, dict):
            self.logger.debug(f"{log_prefix}generate_data_from_schema: Invalid or empty schema provided{context_log}: {schema}")
            return None

        # Handle schema composition keywords: pick the first alternative.
        if 'oneOf' in schema or 'anyOf' in schema:
            schemas_to_try = schema.get('oneOf') or schema.get('anyOf')
            if isinstance(schemas_to_try, list) and schemas_to_try:
                self.logger.debug(f"{log_prefix}Processing oneOf/anyOf, selecting the first schema for{context_log}")
                # Bug fix: llm_service was previously dropped on this recursion,
                # silently disabling LLM generation for composed schemas.
                return self.generate_data_from_schema(schemas_to_try[0], context_name, operation_id, llm_service)

        if 'allOf' in schema:
            # Shallow-merge every sub-schema; later entries win on key clashes.
            merged_schema: Dict[str, Any] = {}
            for sub_schema in schema.get('allOf', []):
                merged_schema.update(sub_schema)
            self.logger.debug(f"{log_prefix}Processing allOf, merging schemas for{context_log}")
            schema = merged_schema

        # Prefer author-supplied example/default values over synthesis.
        if 'example' in schema:
            self.logger.debug(f"{log_prefix}Using 'example' value from schema for{context_log}: {schema['example']}")
            return schema['example']
        if 'default' in schema:
            self.logger.debug(f"{log_prefix}Using 'default' value from schema for{context_log}: {schema['default']}")
            return schema['default']

        schema_type = schema.get('type')

        # Type names are matched case-insensitively ('object'/'Object', ...).
        # The isinstance guard protects against non-string 'type' values
        # (JSON Schema allows a list of types), which previously raised
        # AttributeError on .lower(); such schemas now fall through to the
        # unsupported-type warning below.
        if isinstance(schema_type, str) and schema_type.lower() == 'object':
            # Try LLM-based generation first when a service is available and
            # the schema carries enough descriptive information.
            if llm_service and self._should_use_llm_for_schema(schema):
                try:
                    llm_data = self._generate_with_llm(schema, llm_service, context_name, operation_id)
                    if llm_data is not None:
                        self.logger.debug(f"{log_prefix}LLM successfully generated data for{context_log}")
                        return llm_data
                except Exception as e:
                    # Best-effort: any LLM failure falls back to traditional generation.
                    self.logger.debug(f"{log_prefix}LLM generation failed for{context_log}: {e}, falling back to traditional generation")

            # Traditional generation: recurse into each declared property.
            result: Dict[str, Any] = {}
            properties = schema.get('properties', {})
            self.logger.debug(f"{log_prefix}Generating object data for{context_log}. Properties: {list(properties.keys())}")
            for prop_name, prop_schema in properties.items():
                nested_context = f"{context_name}.{prop_name}" if context_name else prop_name
                result[prop_name] = self.generate_data_from_schema(prop_schema, nested_context, operation_id, llm_service)

            additional_properties = schema.get('additionalProperties')
            if isinstance(additional_properties, dict):
                self.logger.debug(f"{log_prefix}Generating an example property for additionalProperties for{context_log}")
                result['additionalProp1'] = self.generate_data_from_schema(additional_properties, f"{context_name}.additionalProp1", operation_id, llm_service)
            return result

        elif isinstance(schema_type, str) and schema_type.lower() == 'array':
            items_schema = schema.get('items', {})
            min_items = schema.get('minItems', 1)
            self.logger.debug(f"{log_prefix}Generating array data for{context_log}. Items schema: {items_schema}, minItems: {min_items}")

            # Always produce at least one element so callers see the item shape.
            num_items_to_generate = max(1, min_items)
            generated_array = []
            for i in range(num_items_to_generate):
                item_context = f"{context_name}[{i}]" if context_name else f"array_item[{i}]"
                # Bug fix: llm_service was previously dropped on this recursion.
                generated_array.append(self.generate_data_from_schema(items_schema, item_context, operation_id, llm_service))
            return generated_array

        elif isinstance(schema_type, str) and schema_type.lower() == 'string':
            string_format = schema.get('format', '')
            if 'enum' in schema and schema['enum']:
                return schema['enum'][0]
            if string_format == 'date':
                return datetime.date.today().isoformat()
            if string_format == 'date-time':
                return datetime.datetime.now().isoformat()
            if string_format == 'email':
                return 'test@example.com'
            if string_format == 'uuid':
                return str(uuid.uuid4())
            return 'example_string'

        elif isinstance(schema_type, str) and schema_type.lower() in ('number', 'integer'):
            minimum = schema.get('minimum')
            if minimum is not None:
                return minimum
            return 0 if schema_type.lower() == 'integer' else 0.0

        elif isinstance(schema_type, str) and schema_type.lower() == 'boolean':
            # Note: a schema-level 'default' is already returned above, so this
            # get() only matters when 'default' is absent.
            return schema.get('default', False)

        elif schema_type == 'null':
            return None

        self.logger.warning(f"{log_prefix}Unsupported schema type '{schema_type}' in {context_log}. Schema: {schema}")
        return None

    def _should_use_llm_for_schema(self, schema: Dict[str, Any]) -> bool:
        """Decides whether the LLM should be used to generate data.

        Returns True when the schema carries enough descriptive information
        for the LLM to understand it (any property has a description or
        title), or when it contains a known business field.
        """
        properties = schema.get('properties', {})

        for prop_name, prop_schema in properties.items():
            # Any described property gives the LLM enough context.
            if isinstance(prop_schema, dict):
                if prop_schema.get('description') or prop_schema.get('title'):
                    return True

            # Known domain-specific business fields (e.g. bsflag).
            if prop_name in self._BUSINESS_FIELDS:
                return True

        return False

    def _generate_with_llm(self, schema: Dict[str, Any], llm_service,
                           context_name: Optional[str], operation_id: Optional[str]) -> Any:
        """Generates data via the LLM service.

        Returns None when the service exposes no suitable method, signalling
        the caller to fall back to traditional generation.
        """
        # Build a prompt that embeds the per-field descriptions.
        prompt = self._build_llm_prompt(schema, context_name, operation_id)

        if hasattr(llm_service, 'generate_data_from_schema'):
            return llm_service.generate_data_from_schema(
                schema,
                prompt_instruction=prompt,
                max_tokens=512,
                temperature=0.1
            )
        # The LLM service has no dedicated method; fall back upstream.
        return None

    def _build_llm_prompt(self, schema: Dict[str, Any],
                          context_name: Optional[str], operation_id: Optional[str]) -> str:
        """Builds the LLM prompt, embedding each field's title/description.

        The prompt text is deliberately kept in Chinese — it is runtime
        output consumed by the downstream LLM service.
        """
        properties = schema.get('properties', {})

        prompt = f"""请为以下JSON Schema生成合理的测试数据。

操作上下文: {operation_id or 'unknown'}
数据上下文: {context_name or 'unknown'}

字段说明:
"""

        for prop_name, prop_schema in properties.items():
            if isinstance(prop_schema, dict):
                prop_type = prop_schema.get('type', 'unknown')
                title = prop_schema.get('title', '')
                description = prop_schema.get('description', '')

                prompt += f"- {prop_name} ({prop_type})"
                if title:
                    prompt += f" - {title}"
                if description:
                    prompt += f": {description}"
                prompt += "\n"

        prompt += """
请根据字段的描述信息生成合理的测试数据:
1. 严格遵守字段描述中的业务规则
2. 生成真实、有意义的测试数据
3. 对于有特定取值范围的字段,请选择合适的值
4. 日期字段使用合理的日期格式
5. 返回一个完整的JSON对象

请只返回JSON数据,不要包含其他说明文字。"""

        return prompt