146 lines
5.2 KiB
Python
146 lines
5.2 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
JSON Schema validator implementation for DDMS Compliance Suite.
|
|
|
|
Provides validators to check if data objects conform to defined JSON Schemas.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Dict, List, Any, Optional
|
|
import jsonschema
|
|
from jsonschema import ValidationError
|
|
|
|
from ddms_compliance_suite.models.rule_models import JSONSchemaDefinition
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ValidationResult:
    """Container for the outcome of a single validation run.

    Instances expose three public attributes: ``passed`` (bool),
    ``errors`` (list of str) and ``warnings`` (list of str).
    """

    def __init__(
        self,
        passed: bool,
        errors: Optional[List[str]] = None,
        warnings: Optional[List[str]] = None,
    ):
        """
        Initialize a validation result.

        Args:
            passed: Whether the data is valid according to the schema.
            errors: Error messages, if any. ``None`` or an empty list is
                stored as a fresh empty list.
            warnings: Warning messages, if any. ``None`` or an empty list is
                stored as a fresh empty list.
        """
        self.passed = passed
        # ``or []`` gives every instance its own list when nothing was passed,
        # so no default list is ever shared between instances.
        self.errors = errors or []
        self.warnings = warnings or []

    def __repr__(self) -> str:
        """Return an unambiguous representation useful when debugging."""
        return (
            f"{type(self).__name__}(passed={self.passed!r}, "
            f"errors={self.errors!r}, warnings={self.warnings!r})"
        )

    def __str__(self) -> str:
        """Return a multi-line, human-readable report.

        The report starts with a status line, followed by a numbered
        ``Errors`` section and a numbered ``Warnings`` section; a section is
        omitted when its list is empty. Every line, including the last one,
        is terminated by a newline.
        """
        status = "Valid" if self.passed else "Invalid"
        lines = [f"Validation Result: {status}"]

        for label, messages in (("Errors", self.errors), ("Warnings", self.warnings)):
            if not messages:
                continue
            lines.append(f"{label} ({len(messages)}):")
            lines.extend(
                f" {index}. {message}" for index, message in enumerate(messages, 1)
            )

        # Join once instead of growing a string with repeated ``+=``.
        return "\n".join(lines) + "\n"
|
|
|
|
class JSONSchemaValidator:
    """Validate data objects against JSON Schemas using Draft 7 validators.

    Validator objects are cached on the instance, keyed by the canonical
    (sorted-key) JSON text of the schema, so validating repeatedly against
    the same schema does not rebuild the validator each time.
    """

    def __init__(self):
        """Initialize the JSON Schema validator with an empty cache."""
        # Maps ``json.dumps(schema, sort_keys=True)`` -> Draft7Validator.
        self.validator_cache = {}

    @staticmethod
    def _format_error(error: ValidationError) -> str:
        """Render one jsonschema validation error as a single message line."""
        # ``error.path`` holds the keys/indices leading to the offending
        # element inside the instance; an empty path means the top-level value.
        location = ".".join(str(part) for part in error.path) if error.path else "root"
        # ``error.validator`` is the name of the schema keyword that failed
        # (e.g. "required", "type"); it is included so the kind of failure is
        # easy to identify in the message.
        return f"Error at {location}: {error.validator} - {error.message}"

    def validate(self, data: Dict[str, Any], schema: Dict[str, Any]) -> ValidationResult:
        """
        Validate data against a JSON schema.

        Args:
            data: The data to validate. Only ``None`` is treated as "not
                provided"; empty containers and other falsy values are
                validated against the schema like any other instance.
            schema: The JSON schema to validate against. An empty or ``None``
                schema is rejected.

        Returns:
            ValidationResult: ``passed`` is True when no schema violations
            were found. Otherwise ``errors`` holds one message per violation,
            or a single ``"Validation error: ..."`` message if validation
            itself raised an exception.
        """
        if not schema:
            logger.error("Schema is empty or None")
            return ValidationResult(False, ["Schema is not provided"])

        # Compare against None explicitly: ``{}``, ``[]``, ``0``, ``False`` and
        # ``""`` are legitimate JSON instances and must reach the schema check
        # rather than being reported as missing.
        if data is None:
            logger.error("Data is None")
            return ValidationResult(False, ["Data is not provided"])

        try:
            # Get or create the validator for this schema. Sorting the keys
            # makes the cache key independent of dict insertion order.
            cache_key = json.dumps(schema, sort_keys=True)
            validator = self.validator_cache.get(cache_key)
            if validator is None:
                validator = jsonschema.Draft7Validator(schema)
                self.validator_cache[cache_key] = validator

            # Collect every violation instead of stopping at the first one.
            violations = list(validator.iter_errors(data))
            if not violations:
                return ValidationResult(True)

            return ValidationResult(
                False, [self._format_error(violation) for violation in violations]
            )

        except Exception as e:
            # Boundary handler: a non-serializable or malformed schema must
            # surface as a failed result, not crash the caller. The traceback
            # is preserved in the log.
            logger.exception("Schema validation failed with error")
            return ValidationResult(False, [f"Validation error: {str(e)}"])

    def validate_with_rule(self, data: Dict[str, Any], rule: JSONSchemaDefinition) -> ValidationResult:
        """
        Validate data against a schema rule.

        Args:
            data: The data to validate.
            rule: The JSON schema rule to validate against; its
                ``schema_content`` attribute is used as the schema and its
                ``id`` attribute is used in error messages.

        Returns:
            ValidationResult: A failed result if the rule is missing or has
            no schema content, otherwise the result of ``validate``.
        """
        if not rule:
            logger.error("Rule is empty or None")
            return ValidationResult(False, ["Rule is not provided"])

        if not rule.schema_content:
            logger.error("Rule %s does not have schema content", rule.id)
            return ValidationResult(False, [f"Rule {rule.id} does not have schema content"])

        return self.validate(data, rule.schema_content)