compliance/custom_testcases/setup_checks/required_headers_check.py
gongwenxin df90a5377f mvp
2025-06-16 14:49:49 +08:00

127 lines
5.9 KiB
Python

from ddms_compliance_suite.test_framework_core import BaseAPITestCase, TestSeverity, ValidationResult, APIRequestContext, APIResponseContext
import logging
import json
from typing import Dict, Any, Optional, List
class RequiredHeadersSchemaCheck(BaseAPITestCase):
"""验证API规范中是否包含必需的请求头"""
# 1. 元数据
id = "TC-HEADER-001"
name = "必需请求头Schema验证"
description = "验证API规范中是否包含必需的请求头(X-Tenant-ID、X-Data-Domain和Authorization)"
severity = TestSeverity.CRITICAL
tags = ["headers", "schema", "compliance"]
execution_order = 0 # 优先执行
# is_critical_setup_test = True
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
super().__init__(endpoint_spec, global_api_spec, json_schema_validator=json_schema_validator, llm_service=llm_service)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.info(f"测试用例 {self.id} ({self.name}) 已针对端点 '{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}' 初始化。")
# 定义必需的请求头和可能的命名变体
# self.required_headers = {
# 'X-Tenant-ID': ['X-Tenant-ID', 'tenant-id', 'X-TENANT-ID', 'TENANT-ID'],
# 'X-Data-Domain': ['X-Data-Domain', 'data-domain', 'X-DATA-DOMAIN', 'DATA-DOMAIN'],
# 'Authorization': ['Authorization', 'authorization', 'AUTHORIZATION']
# }
self.required_headers = {
'X-Tenant-ID': ['X-Tenant-ID'],
'X-Data-Domain': ['X-Data-Domain'],
'Authorization': ['Authorization']
}
def pre_request(self, request_context: APIRequestContext) -> APIRequestContext:
"""这个测试用例不需要发送实际请求"""
# 设置一个标志,表示不需要发送请求
self._skip_request = True
return request_context
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
"""验证API规范中是否包含所有必需的请求头"""
results = []
# 从parameters数组中获取header类型的参数
parameters = self.endpoint_spec.get('parameters', [])
header_params = [p for p in parameters if p.get('in') == 'header']
# 记录调试信息
self.logger.info(f"API端点: {self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}")
self.logger.info(f"发现的header参数: {json.dumps(header_params, ensure_ascii=False)}")
# 记录找到的请求头,方便调试
found_headers = {}
# 检查每个必需的请求头
for header_key, possible_names in self.required_headers.items():
header_found = False
required_found = False
found_name = None
# 检查是否存在任何变体的请求头名称
for name in possible_names:
for header in header_params:
header_name = header.get('name', '')
if header_name.lower() == name.lower():
header_found = True
found_name = header_name
# 检查是否被标记为必需
if header.get('required') is True:
required_found = True
break
if header_found:
break
found_headers[header_key] = {
'found': header_found,
'required': required_found,
'name': found_name
}
# 记录检查结果
self.logger.info(f"检查请求头 {header_key}: 找到={header_found}, 必需={required_found}, 名称={found_name}")
# 根据检查结果添加验证结果
if not header_found:
results.append(
ValidationResult(
passed=False,
message=f"缺少必需的请求头 {header_key}",
details={
'header': header_key,
'possible_names': possible_names,
'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}"
}
)
)
self.logger.warning(f"规范验证失败: 缺少必需的请求头 {header_key}")
elif not required_found:
results.append(
ValidationResult(
passed=False,
message=f"请求头 {found_name} 存在但未标记为必需",
details={
'header': header_key,
'found_name': found_name,
'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}"
}
)
)
self.logger.warning(f"规范验证失败: 请求头 {found_name} 存在但未标记为必需")
# 如果没有失败的验证结果,添加一个成功的结果
if not [r for r in results if not r.passed]:
results.append(
ValidationResult(
passed=True,
message=f"所有必需的请求头都已正确定义",
details={
'found_headers': found_headers,
'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}"
}
)
)
self.logger.info(f"规范验证通过: 所有必需的请求头都已正确定义")
return results