127 lines
5.9 KiB
Python
127 lines
5.9 KiB
Python
from ddms_compliance_suite.test_framework_core import BaseAPITestCase, TestSeverity, ValidationResult, APIRequestContext, APIResponseContext
|
|
import logging
|
|
import json
|
|
from typing import Dict, Any, Optional, List
|
|
|
|
class RequiredHeadersSchemaCheck(BaseAPITestCase):
|
|
"""验证API规范中是否包含必需的请求头"""
|
|
|
|
# 1. 元数据
|
|
id = "TC-HEADER-001"
|
|
name = "必需请求头Schema验证"
|
|
description = "验证API规范中是否包含必需的请求头(X-Tenant-ID、X-Data-Domain和Authorization)"
|
|
severity = TestSeverity.CRITICAL
|
|
tags = ["headers", "schema", "compliance"]
|
|
execution_order = 0 # 优先执行
|
|
# is_critical_setup_test = True
|
|
|
|
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
|
|
super().__init__(endpoint_spec, global_api_spec, json_schema_validator=json_schema_validator, llm_service=llm_service)
|
|
self.logger = logging.getLogger(self.__class__.__name__)
|
|
self.logger.info(f"测试用例 {self.id} ({self.name}) 已针对端点 '{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}' 初始化。")
|
|
|
|
# 定义必需的请求头和可能的命名变体
|
|
# self.required_headers = {
|
|
# 'X-Tenant-ID': ['X-Tenant-ID', 'tenant-id', 'X-TENANT-ID', 'TENANT-ID'],
|
|
# 'X-Data-Domain': ['X-Data-Domain', 'data-domain', 'X-DATA-DOMAIN', 'DATA-DOMAIN'],
|
|
# 'Authorization': ['Authorization', 'authorization', 'AUTHORIZATION']
|
|
# }
|
|
self.required_headers = {
|
|
'X-Tenant-ID': ['X-Tenant-ID'],
|
|
'X-Data-Domain': ['X-Data-Domain'],
|
|
'Authorization': ['Authorization']
|
|
}
|
|
|
|
def pre_request(self, request_context: APIRequestContext) -> APIRequestContext:
|
|
"""这个测试用例不需要发送实际请求"""
|
|
# 设置一个标志,表示不需要发送请求
|
|
self._skip_request = True
|
|
return request_context
|
|
|
|
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
|
|
"""验证API规范中是否包含所有必需的请求头"""
|
|
results = []
|
|
|
|
# 从parameters数组中获取header类型的参数
|
|
parameters = self.endpoint_spec.get('parameters', [])
|
|
header_params = [p for p in parameters if p.get('in') == 'header']
|
|
|
|
# 记录调试信息
|
|
self.logger.info(f"API端点: {self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}")
|
|
self.logger.info(f"发现的header参数: {json.dumps(header_params, ensure_ascii=False)}")
|
|
|
|
# 记录找到的请求头,方便调试
|
|
found_headers = {}
|
|
|
|
# 检查每个必需的请求头
|
|
for header_key, possible_names in self.required_headers.items():
|
|
header_found = False
|
|
required_found = False
|
|
found_name = None
|
|
|
|
# 检查是否存在任何变体的请求头名称
|
|
for name in possible_names:
|
|
for header in header_params:
|
|
header_name = header.get('name', '')
|
|
if header_name.lower() == name.lower():
|
|
header_found = True
|
|
found_name = header_name
|
|
# 检查是否被标记为必需
|
|
if header.get('required') is True:
|
|
required_found = True
|
|
break
|
|
if header_found:
|
|
break
|
|
|
|
found_headers[header_key] = {
|
|
'found': header_found,
|
|
'required': required_found,
|
|
'name': found_name
|
|
}
|
|
|
|
# 记录检查结果
|
|
self.logger.info(f"检查请求头 {header_key}: 找到={header_found}, 必需={required_found}, 名称={found_name}")
|
|
|
|
# 根据检查结果添加验证结果
|
|
if not header_found:
|
|
results.append(
|
|
ValidationResult(
|
|
passed=False,
|
|
message=f"缺少必需的请求头 {header_key}",
|
|
details={
|
|
'header': header_key,
|
|
'possible_names': possible_names,
|
|
'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}"
|
|
}
|
|
)
|
|
)
|
|
self.logger.warning(f"规范验证失败: 缺少必需的请求头 {header_key}")
|
|
elif not required_found:
|
|
results.append(
|
|
ValidationResult(
|
|
passed=False,
|
|
message=f"请求头 {found_name} 存在但未标记为必需",
|
|
details={
|
|
'header': header_key,
|
|
'found_name': found_name,
|
|
'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}"
|
|
}
|
|
)
|
|
)
|
|
self.logger.warning(f"规范验证失败: 请求头 {found_name} 存在但未标记为必需")
|
|
|
|
# 如果没有失败的验证结果,添加一个成功的结果
|
|
if not [r for r in results if not r.passed]:
|
|
results.append(
|
|
ValidationResult(
|
|
passed=True,
|
|
message=f"所有必需的请求头都已正确定义",
|
|
details={
|
|
'found_headers': found_headers,
|
|
'endpoint': f"{self.endpoint_spec.get('method')} {self.endpoint_spec.get('path')}"
|
|
}
|
|
)
|
|
)
|
|
self.logger.info(f"规范验证通过: 所有必需的请求头都已正确定义")
|
|
|
|
return results |