2025-05-26 15:38:37 +08:00

146 lines
5.2 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
JSON Schema validator implementation for DDMS Compliance Suite.
Provides validators to check if data objects conform to defined JSON Schemas.
"""
import json
import logging
from typing import Dict, List, Any, Optional
import jsonschema
from jsonschema import ValidationError
from ddms_compliance_suite.models.rule_models import JSONSchemaDefinition
logger = logging.getLogger(__name__)
class ValidationResult:
"""Validation result container"""
def __init__(self, passed: bool, errors: List[str] = None, warnings: List[str] = None):
"""
Initialize a validation result
Args:
passed: Whether the data is valid according to the schema
errors: List of error messages (if any)
warnings: List of warning messages (if any)
"""
self.passed = passed
self.errors = errors or []
self.warnings = warnings or []
def __str__(self) -> str:
"""String representation of validation result"""
status = "Valid" if self.passed else "Invalid"
result = f"Validation Result: {status}\n"
if self.errors:
result += f"Errors ({len(self.errors)}):\n"
for i, error in enumerate(self.errors, 1):
result += f" {i}. {error}\n"
if self.warnings:
result += f"Warnings ({len(self.warnings)}):\n"
for i, warning in enumerate(self.warnings, 1):
result += f" {i}. {warning}\n"
return result
class JSONSchemaValidator:
"""JSON Schema validator implementation"""
def __init__(self):
"""Initialize the JSON Schema validator"""
self.validator_cache = {}
def validate(self, data: Dict[str, Any], schema: Dict[str, Any]) -> ValidationResult:
"""
Validate data against a JSON schema
Args:
data: The data to validate
schema: The JSON schema to validate against
Returns:
ValidationResult: Result of the validation
"""
if not schema:
logger.error("Schema is empty or None")
return ValidationResult(False, ["Schema is not provided"])
if not data:
logger.error("Data is empty or None")
return ValidationResult(False, ["Data is not provided"])
try:
# Get or create validator
schema_str = json.dumps(schema, sort_keys=True)
if schema_str not in self.validator_cache:
self.validator_cache[schema_str] = jsonschema.Draft7Validator(schema)
validator = self.validator_cache[schema_str]
# Collect all validation errors
errors = list(validator.iter_errors(data))
if not errors:
return ValidationResult(True)
# Format error messages
error_messages = []
for error in errors:
path = ".".join(str(path_item) for path_item in error.path) if error.path else "root"
# Include the error type in the message to make it easier to identify
error_type = "unknown"
# Determine error type based on the validation error
if error.validator == 'required':
error_type = "required"
elif error.validator == 'pattern':
error_type = "pattern"
elif error.validator == 'enum':
error_type = "enum"
elif error.validator == 'type':
error_type = "type"
elif error.validator == 'format':
error_type = "format"
elif error.validator == 'minimum' or error.validator == 'maximum':
error_type = error.validator
elif error.validator == 'additionalProperties':
error_type = "additionalProperties"
else:
error_type = error.validator
error_messages.append(f"Error at {path}: {error_type} - {error.message}")
return ValidationResult(False, error_messages)
except Exception as e:
logger.exception("Schema validation failed with error")
return ValidationResult(False, [f"Validation error: {str(e)}"])
def validate_with_rule(self, data: Dict[str, Any], rule: JSONSchemaDefinition) -> ValidationResult:
"""
Validate data against a schema rule
Args:
data: The data to validate
rule: The JSON schema rule to validate against
Returns:
ValidationResult: Result of the validation
"""
if not rule:
logger.error("Rule is empty or None")
return ValidationResult(False, ["Rule is not provided"])
if not rule.schema_content:
logger.error(f"Rule {rule.id} does not have schema content")
return ValidationResult(False, [f"Rule {rule.id} does not have schema content"])
return self.validate(data, rule.schema_content)