from ddms_compliance_suite.test_framework_core import BaseAPITestCase, TestSeverity, ValidationResult, APIRequestContext, APIResponseContext import logging import json from typing import Dict, Any, Optional, List class RequiredHeadersSchemaCheck(BaseAPITestCase): """验证API规范中是否包含必需的请求头""" # 1. 元数据 id = "TC-HEADER-001" name = "必需请求头Schema验证" description = "验证API规范中是否包含必需的请求头(X-Tenant-ID、X-Data-Domain和Authorization)" severity = TestSeverity.HIGH tags = ["headers", "schema", "compliance"] execution_order = 2 # 优先执行 # is_critical_setup_test = True def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None): super().__init__(endpoint_spec, global_api_spec, json_schema_validator=json_schema_validator, llm_service=llm_service) self.logger = logging.getLogger(self.__class__.__name__) self.logger.info(f"测试用例 {self.id} ({self.name}) 已针对端点 '{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}' 初始化。") # 定义必需的请求头和可能的命名变体 # self.required_headers = { # 'X-Tenant-ID': ['X-Tenant-ID', 'tenant-id', 'X-TENANT-ID', 'TENANT-ID'], # 'X-Data-Domain': ['X-Data-Domain', 'data-domain', 'X-DATA-DOMAIN', 'DATA-DOMAIN'], # 'Authorization': ['Authorization', 'authorization', 'AUTHORIZATION'] # } self.required_headers = { 'X-Tenant-ID': ['X-Tenant-ID'], 'X-Data-Domain': ['X-Data-Domain'], 'Authorization': ['Authorization'] } def pre_request(self, request_context: APIRequestContext) -> APIRequestContext: """这个测试用例不需要发送实际请求""" # 设置一个标志,表示不需要发送请求 self._skip_request = True return request_context def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]: """验证API规范中是否包含所有必需的请求头""" results = [] # 从parameters数组中获取header类型的参数 parameters = self.endpoint_spec.get('parameters', []) header_params = [p for p in parameters if p.get('in') == 'header'] # 记录调试信息 self.logger.info(f"API端点: {self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}") self.logger.info(f"发现的header参数: {json.dumps(header_params, ensure_ascii=False)}") # 记录找到的请求头,方便调试 found_headers = {} # 检查每个必需的请求头 for header_key, possible_names in self.required_headers.items(): header_found = False required_found = False found_name = None # 检查是否存在任何变体的请求头名称 for name in possible_names: for header in header_params: header_name = header.get('name', '') if header_name.lower() == name.lower(): header_found = True found_name = header_name # 检查是否被标记为必需 if header.get('required') is True: required_found = True break if header_found: break found_headers[header_key] = { 'found': header_found, 'required': required_found, 'name': found_name } # 记录检查结果 self.logger.info(f"检查请求头 {header_key}: 找到={header_found}, 必需={required_found}, 名称={found_name}") # 根据检查结果添加验证结果 if not header_found: results.append( ValidationResult( passed=False, message=f"缺少必需的请求头 {header_key}", details={ 'header': header_key, 'possible_names': possible_names, 'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}" } ) ) self.logger.warning(f"规范验证失败: 缺少必需的请求头 {header_key}") elif not required_found: results.append( ValidationResult( passed=False, message=f"请求头 {found_name} 存在但未标记为必需", details={ 'header': header_key, 'found_name': found_name, 'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}" } ) ) self.logger.warning(f"规范验证失败: 请求头 {found_name} 存在但未标记为必需") # 如果没有失败的验证结果,添加一个成功的结果 if not [r for r in results if not r.passed]: results.append( ValidationResult( passed=True, message=f"所有必需的请求头都已正确定义", details={ 'found_headers': found_headers, 'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}" } ) ) self.logger.info(f"规范验证通过: 所有必需的请求头都已正确定义") return results