gongwenxin fa343eb111 .
2025-08-07 15:07:38 +08:00

120 lines
5.8 KiB
Python

"""
This module contains the DataGenerator class for creating test data from JSON schemas.
"""
import logging
import datetime
import uuid
from typing import Dict, Any, Optional, List
class DataGenerator:
    """
    Generates test data based on a JSON Schema.

    The generator walks a schema dictionary recursively and produces a minimal
    value for it. Values are fixed placeholders, except for the 'date',
    'date-time' and 'uuid' string formats, which are derived from the current
    date/time or a freshly generated UUID.
    """

    def __init__(self, logger_param: Optional[logging.Logger] = None):
        """
        Initializes the data generator.

        Args:
            logger_param: Optional logger instance. If not provided, a module-level logger is used.
        """
        self.logger = logger_param or logging.getLogger(__name__)

    def generate_data_from_schema(self, schema: Dict[str, Any],
                                  context_name: Optional[str] = None,
                                  operation_id: Optional[str] = None) -> Any:
        """
        Generates test data from a JSON Schema.

        Resolution order: 'oneOf'/'anyOf' (first alternative wins), 'allOf'
        (sub-schemas are merged), 'example', 'default', 'enum' (first value),
        and finally a placeholder chosen by the schema 'type'.

        Args:
            schema: The JSON schema to generate data from.
            context_name: A name for the context (e.g., 'requestBody'), for logging.
            operation_id: The operation ID, for logging.

        Returns:
            Generated data for the schema, or None when the schema is empty,
            not a dict, of type 'null', or of an unsupported type.
        """
        log_prefix = f"[{operation_id}] " if operation_id else ""
        context_log = f" (context: {context_name})" if context_name else ""

        if not schema or not isinstance(schema, dict):
            self.logger.debug(f"{log_prefix}generate_data_from_schema: Invalid or empty schema provided{context_log}: {schema}")
            return None

        # Handle schema composition keywords
        if 'oneOf' in schema or 'anyOf' in schema:
            schemas_to_try = schema.get('oneOf') or schema.get('anyOf')
            if isinstance(schemas_to_try, list) and schemas_to_try:
                self.logger.debug(f"{log_prefix}Processing oneOf/anyOf, selecting the first schema for{context_log}")
                return self.generate_data_from_schema(schemas_to_try[0], context_name, operation_id)

        if 'allOf' in schema:
            self.logger.debug(f"{log_prefix}Processing allOf, merging schemas for{context_log}")
            schema = self._merge_all_of(schema)

        # Use example or default values if available
        if 'example' in schema:
            self.logger.debug(f"{log_prefix}Using 'example' value from schema for{context_log}: {schema['example']}")
            return schema['example']
        if 'default' in schema:
            self.logger.debug(f"{log_prefix}Using 'default' value from schema for{context_log}: {schema['default']}")
            return schema['default']

        # An enum restricts the value for every type, not only for strings,
        # so it is honoured before any type-specific placeholder is chosen.
        enum_values = schema.get('enum')
        if isinstance(enum_values, (list, tuple)) and enum_values:
            self.logger.debug(f"{log_prefix}Using first 'enum' value from schema for{context_log}: {enum_values[0]}")
            return enum_values[0]

        schema_type = schema.get('type')
        # Type names are matched case-insensitively ('object' and 'Object').
        normalized_type = self._normalize_type(schema_type)

        if normalized_type == 'object':
            return self._generate_object(schema, context_name, operation_id, log_prefix, context_log)
        if normalized_type == 'array':
            return self._generate_array(schema, context_name, operation_id, log_prefix, context_log)
        if normalized_type == 'string':
            return self._generate_string(schema)
        if normalized_type in ('number', 'integer'):
            return self._generate_number(schema, normalized_type)
        if normalized_type == 'boolean':
            # A 'default' would already have been returned above.
            return False
        if normalized_type == 'null':
            return None

        self.logger.warning(f"{log_prefix}Unsupported schema type '{schema_type}' in {context_log}. Schema: {schema}")
        return None

    def _merge_all_of(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Flattens the 'allOf' list of a schema into one schema dictionary.

        Keywords next to 'allOf' form the base. Each dict sub-schema is then
        applied in order: 'properties' mappings are combined, 'required' lists
        are unioned, and for any other keyword the later value wins. Nested
        'allOf' lists are flattened first; non-dict entries are ignored.

        Args:
            schema: A schema dictionary containing an 'allOf' key.

        Returns:
            A new dictionary without the 'allOf' key; the input is not mutated.
        """
        merged: Dict[str, Any] = {key: value for key, value in schema.items() if key != 'allOf'}
        sub_schemas = schema.get('allOf')
        if not isinstance(sub_schemas, list):
            return merged

        for sub_schema in sub_schemas:
            if not isinstance(sub_schema, dict):
                continue
            if 'allOf' in sub_schema:
                sub_schema = self._merge_all_of(sub_schema)
            for key, value in sub_schema.items():
                existing = merged.get(key)
                if key == 'properties' and isinstance(existing, dict) and isinstance(value, dict):
                    merged[key] = {**existing, **value}
                elif key == 'required' and isinstance(existing, list) and isinstance(value, list):
                    merged[key] = existing + [name for name in value if name not in existing]
                else:
                    merged[key] = value
        return merged

    @staticmethod
    def _normalize_type(raw_type: Any) -> Optional[str]:
        """
        Reduces a schema 'type' value to a single lowercase type name.

        Args:
            raw_type: The raw 'type' value; a string, a list of strings, or anything else.

        Returns:
            The lowercase type name. For a list, the first non-'null' string
            entry is preferred. None when no string type name is available.
        """
        if isinstance(raw_type, str):
            return raw_type.lower()
        if isinstance(raw_type, (list, tuple)):
            names = [name.lower() for name in raw_type if isinstance(name, str)]
            non_null = [name for name in names if name != 'null']
            candidates = non_null or names
            return candidates[0] if candidates else None
        return None

    def _generate_object(self, schema: Dict[str, Any], context_name: Optional[str],
                         operation_id: Optional[str], log_prefix: str, context_log: str) -> Dict[str, Any]:
        """Builds a dict with one generated value per declared property."""
        properties = schema.get('properties', {})
        if not isinstance(properties, dict):
            properties = {}
        self.logger.debug(f"{log_prefix}Generating object data for{context_log}. Properties: {list(properties.keys())}")

        result: Dict[str, Any] = {}
        for prop_name, prop_schema in properties.items():
            nested_context = f"{context_name}.{prop_name}" if context_name else prop_name
            result[prop_name] = self.generate_data_from_schema(prop_schema, nested_context, operation_id)

        additional_properties = schema.get('additionalProperties')
        if isinstance(additional_properties, dict):
            self.logger.debug(f"{log_prefix}Generating an example property for additionalProperties for{context_log}")
            # Avoid a literal "None." prefix when no context name was given.
            additional_context = f"{context_name}.additionalProp1" if context_name else "additionalProp1"
            result['additionalProp1'] = self.generate_data_from_schema(additional_properties, additional_context, operation_id)
        return result

    def _generate_array(self, schema: Dict[str, Any], context_name: Optional[str],
                        operation_id: Optional[str], log_prefix: str, context_log: str) -> List[Any]:
        """Builds a list of max(1, minItems) generated items."""
        items_schema = schema.get('items', {})
        min_items = schema.get('minItems', 1)
        self.logger.debug(f"{log_prefix}Generating array data for{context_log}. Items schema: {items_schema}, minItems: {min_items}")

        # A non-integer minItems (bool included) falls back to a single item.
        if isinstance(min_items, bool) or not isinstance(min_items, int):
            num_items_to_generate = 1
        else:
            num_items_to_generate = max(1, min_items)

        generated_array = []
        for i in range(num_items_to_generate):
            item_context = f"{context_name}[{i}]" if context_name else f"array_item[{i}]"
            generated_array.append(self.generate_data_from_schema(items_schema, item_context, operation_id))
        return generated_array

    @staticmethod
    def _generate_string(schema: Dict[str, Any]) -> str:
        """Returns a placeholder string chosen by the schema 'format'."""
        string_format = schema.get('format', '')
        if string_format == 'date':
            return datetime.date.today().isoformat()
        if string_format == 'date-time':
            return datetime.datetime.now().isoformat()
        if string_format == 'email':
            return 'test@example.com'
        if string_format == 'uuid':
            return str(uuid.uuid4())
        return 'example_string'

    @staticmethod
    def _generate_number(schema: Dict[str, Any], normalized_type: str) -> Any:
        """Returns the schema 'minimum' if set, else 0 for integers and 0.0 for numbers."""
        minimum = schema.get('minimum')
        if minimum is not None:
            return minimum
        return 0 if normalized_type == 'integer' else 0.0