"""
This module contains the DataGenerator class for creating test data from JSON schemas.
"""

import datetime
import logging
import uuid
from typing import Any, Dict, List, Optional


class DataGenerator:
    """
    Generates test data based on a JSON Schema.

    The generator is deterministic except for the ``date``, ``date-time`` and
    ``uuid`` string formats. It produces one minimal value per schema and does
    not resolve ``$ref`` pointers.
    """

    def __init__(self, logger_param: Optional[logging.Logger] = None):
        """
        Initializes the data generator.

        Args:
            logger_param: Optional logger instance. If not provided, a
                module-level logger is used.
        """
        self.logger = logger_param or logging.getLogger(__name__)

    def generate_data_from_schema(self,
                                  schema: Dict[str, Any],
                                  context_name: Optional[str] = None,
                                  operation_id: Optional[str] = None) -> Any:
        """
        Generates test data from a JSON Schema.

        This method was extracted and generalized from APITestOrchestrator.

        Resolution order: ``oneOf``/``anyOf`` (first usable option, combined
        with the sibling keywords), ``allOf`` (merged), ``example``,
        ``default``, ``enum`` (first value), then generation by ``type``.

        Args:
            schema: The JSON schema to generate data from.
            context_name: A name for the context (e.g., 'requestBody'), for logging.
            operation_id: The operation ID, for logging.

        Returns:
            Generated data for the schema, or None when the schema is empty,
            is not a dict, or has a type this generator does not support.
        """
        log_prefix = f"[{operation_id}] " if operation_id else ""
        context_log = f" (context: {context_name})" if context_name else ""

        if not schema or not isinstance(schema, dict):
            self.logger.debug(f"{log_prefix}generate_data_from_schema: Invalid or empty schema provided{context_log}: {schema}")
            return None

        # Handle schema composition keywords. The chosen option is overlaid on
        # the sibling keywords so constraints declared next to oneOf/anyOf
        # (type, properties, ...) are not lost.
        for keyword in ('oneOf', 'anyOf'):
            options = schema.get(keyword)
            if not isinstance(options, list):
                continue
            chosen = next((option for option in options if isinstance(option, dict)), None)
            if chosen is None:
                continue
            siblings = {key: value for key, value in schema.items() if key != keyword}
            self.logger.debug(f"{log_prefix}Processing oneOf/anyOf, selecting the first schema for{context_log}")
            return self.generate_data_from_schema(
                self._overlay_schema(siblings, chosen), context_name, operation_id
            )

        if 'allOf' in schema:
            self.logger.debug(f"{log_prefix}Processing allOf, merging schemas for{context_log}")
            schema = self._flatten_all_of(schema)

        # Use example or default values if available
        if 'example' in schema:
            self.logger.debug(f"{log_prefix}Using 'example' value from schema for{context_log}: {schema['example']}")
            return schema['example']
        if 'default' in schema:
            self.logger.debug(f"{log_prefix}Using 'default' value from schema for{context_log}: {schema['default']}")
            return schema['default']

        # An enum restricts the value regardless of the declared type.
        enum_values = schema.get('enum')
        if isinstance(enum_values, list) and enum_values:
            return enum_values[0]

        schema_type = self._resolve_type(schema)

        if schema_type == 'object':
            result = {}
            properties = schema.get('properties', {})
            if not isinstance(properties, dict):
                properties = {}
            self.logger.debug(f"{log_prefix}Generating object data for{context_log}. Properties: {list(properties.keys())}")
            for prop_name, prop_schema in properties.items():
                nested_context = f"{context_name}.{prop_name}" if context_name else prop_name
                result[prop_name] = self.generate_data_from_schema(prop_schema, nested_context, operation_id)

            additional_properties = schema.get('additionalProperties')
            if isinstance(additional_properties, dict):
                self.logger.debug(f"{log_prefix}Generating an example property for additionalProperties for{context_log}")
                # Avoid a literal "None." prefix when no context name was given.
                additional_context = f"{context_name}.additionalProp1" if context_name else "additionalProp1"
                result['additionalProp1'] = self.generate_data_from_schema(
                    additional_properties, additional_context, operation_id
                )
            return result

        if schema_type == 'array':
            items_schema = schema.get('items', {})
            min_items = schema.get('minItems', 1)
            self.logger.debug(f"{log_prefix}Generating array data for{context_log}. Items schema: {items_schema}, minItems: {min_items}")
            # Generate at least one item unless maxItems forbids it; ignore
            # malformed (non-integer) bounds instead of raising.
            num_items_to_generate = max(1, min_items) if self._is_int(min_items) else 1
            max_items = schema.get('maxItems')
            if self._is_int(max_items) and max_items >= 0:
                num_items_to_generate = min(num_items_to_generate, max_items)
            generated_array = []
            for i in range(num_items_to_generate):
                item_context = f"{context_name}[{i}]" if context_name else f"array_item[{i}]"
                generated_array.append(self.generate_data_from_schema(items_schema, item_context, operation_id))
            return generated_array

        if schema_type == 'string':
            return self._generate_string(schema)

        if schema_type in ('number', 'integer'):
            return self._generate_number(schema, schema_type)

        if schema_type == 'boolean':
            # Any 'default' has already been returned above.
            return False

        if schema_type == 'null':
            return None

        self.logger.warning(f"{log_prefix}Unsupported schema type '{schema_type}' in {context_log}. Schema: {schema}")
        return None

    @staticmethod
    def _is_int(value: Any) -> bool:
        """Returns True for real integers (bool is excluded)."""
        return isinstance(value, int) and not isinstance(value, bool)

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Returns True for ints and floats (bool is excluded)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _overlay_schema(base: Dict[str, Any], overlay: Dict[str, Any]) -> Dict[str, Any]:
        """
        Returns a new schema with `overlay` applied on top of `base`.

        'properties' dicts are combined and 'required' lists are unioned; for
        every other keyword the overlay value wins. Neither input is mutated.
        """
        merged = dict(base)
        for key, value in overlay.items():
            current = merged.get(key)
            if key == 'properties' and isinstance(value, dict) and isinstance(current, dict):
                merged[key] = {**current, **value}
            elif key == 'required' and isinstance(value, list) and isinstance(current, list):
                merged[key] = current + [name for name in value if name not in current]
            else:
                merged[key] = value
        return merged

    def _flatten_all_of(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Merges the sub-schemas of 'allOf' (recursively) with the sibling keywords.

        Entries of 'allOf' that are not dicts are skipped.
        """
        merged = {key: value for key, value in schema.items() if key != 'allOf'}
        sub_schemas = schema.get('allOf')
        if not isinstance(sub_schemas, list):
            return merged
        for sub_schema in sub_schemas:
            if not isinstance(sub_schema, dict):
                continue
            if 'allOf' in sub_schema:
                sub_schema = self._flatten_all_of(sub_schema)
            merged = self._overlay_schema(merged, sub_schema)
        return merged

    @staticmethod
    def _resolve_type(schema: Dict[str, Any]) -> Any:
        """
        Returns the single type to generate for `schema`.

        A list-valued 'type' resolves to its first non-'null' entry. A missing
        'type' is inferred as 'object' or 'array' when 'properties' or 'items'
        is present.
        """
        schema_type = schema.get('type')
        if isinstance(schema_type, list):
            candidates: List[Any] = [entry for entry in schema_type if entry != 'null']
            if candidates:
                return candidates[0]
            return 'null' if schema_type else None
        if schema_type is None:
            if 'properties' in schema:
                return 'object'
            if 'items' in schema:
                return 'array'
        return schema_type

    def _generate_string(self, schema: Dict[str, Any]) -> str:
        """Generates a string honouring 'format', or 'minLength'/'maxLength' for plain strings."""
        string_format = schema.get('format', '')
        if string_format == 'date':
            return datetime.date.today().isoformat()
        if string_format == 'date-time':
            # RFC 3339 date-time requires a UTC offset, so use an aware datetime.
            return datetime.datetime.now(datetime.timezone.utc).isoformat()
        if string_format == 'email':
            return 'test@example.com'
        if string_format == 'uuid':
            return str(uuid.uuid4())

        value = 'example_string'
        min_length = schema.get('minLength')
        max_length = schema.get('maxLength')
        if self._is_int(min_length) and len(value) < min_length:
            value = value.ljust(min_length, 'x')
        if self._is_int(max_length) and 0 <= max_length < len(value):
            value = value[:max_length]
        return value

    def _generate_number(self, schema: Dict[str, Any], schema_type: str) -> Any:
        """
        Generates a number within 'minimum'/'exclusiveMinimum'/'maximum'.

        'exclusiveMinimum' is accepted both as a boolean modifier of 'minimum'
        and as a numeric bound. Non-numeric bounds are ignored.
        """
        minimum = schema.get('minimum')
        exclusive_minimum = schema.get('exclusiveMinimum')
        maximum = schema.get('maximum')

        value = minimum if self._is_number(minimum) else None
        if exclusive_minimum is True and value is not None:
            value += 1
        elif self._is_number(exclusive_minimum) and (value is None or value <= exclusive_minimum):
            value = exclusive_minimum + 1
        if value is None:
            value = 0 if schema_type == 'integer' else 0.0
        if self._is_number(maximum) and value > maximum:
            value = maximum
        return value