gongwenxin df90a5377f mvp
2025-06-16 14:49:49 +08:00

186 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from typing import Dict, Any, List, Optional
from ddms_compliance_suite.test_framework_core import BaseAPITestCase, ValidationResult, APIResponseContext, APIRequestContext, TestSeverity
from ddms_compliance_suite.utils.common_utils import is_camel_case
from ddms_compliance_suite.utils import schema_utils
class CoreNamingStructureTestCase(BaseAPITestCase):
id = "TC-RESTful-001"
name = "核心命名与结构规范检查"
description = "统一验证API的命名与结构是否遵循规范。包括1)模块名全小写且用中划线连接2)URL路径参数使用下划线命名法(snake_case)3)查询参数和请求体字段使用小驼峰命名法(camelCase)4)响应中的空数组为[]而非null5)数组类型数据被包裹在list字段中。"
severity = TestSeverity.HIGH
tags = ["normative", "restful", "structure", "naming-convention"]
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
super().__init__(endpoint_spec, global_api_spec, json_schema_validator, llm_service=llm_service)
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
results = []
# 静态检查只分析API规范
self._check_module_name(results)
self._check_request_params_camel_case(results)
# 动态检查,需要分析实际响应
# 规则 "响应中的空数组为[]而非null" 和 "数组类型数据被包裹在list字段中" 需要在实际调用后进行
# 这里我们只对成功的响应进行检查
if 200 <= response_context.status_code < 300:
self._check_response_array_format(response_context, results)
else:
results.append(self.passed("跳过响应体检查:非成功状态码,不适用数组格式检查。"))
return results
def _check_module_name(self, results: List[ValidationResult]):
"""检查模块名是否为全小写且用中划线连接"""
path = self.endpoint_spec['path']
module_name_match = re.search(r'/api/([^/]+)/', path)
if module_name_match:
module_name = module_name_match.group(1)
# 模块名可以只是小写字母,但如果包含连接符,必须是中划线
is_valid = all(c.islower() or c.isdigit() or c == '-' for c in module_name) and '_' not in module_name
if is_valid:
results.append(self.passed(f"模块名 '{module_name}' 格式正确 (全小写/数字/中划线)。"))
else:
results.append(self.failed(f"模块名 '{module_name}' 格式不正确。应为全小写字母、数字和中划线的组合。",
details={'path': path, 'module': module_name}))
else:
results.append(self.failed(f"无法从路径 '{path}' 中提取模块名(格式应为 /api/module-name/...)。",
details={'path': path}))
def _check_request_params_camel_case(self, results: List[ValidationResult]):
"""检查请求参数命名规范:
- URL路径参数应使用下划线命名法snake_case
- 请求体和查询参数应使用小驼峰命名法camelCase
- HTTP头部有特殊规范不检查
"""
parameters = self.endpoint_spec.get('parameters', [])
# 定义HTTP头部例外不检查
header_exceptions = ['Authorization', 'X-Tenant-ID', 'X-Data-Domain', 'tenant-id', 'Content-Type']
for param in parameters:
param_name = param.get('name')
param_in = param.get('in')
# 跳过HTTP头部参数
if param_in == 'header' and param_name in header_exceptions:
continue
# 路径参数使用下划线命名法snake_case
if param_in == 'path':
# 下划线命名法:全小写字母、数字和下划线,不允许连续下划线,不能以下划线开头或结尾
is_valid_snake_case = re.match(r'^[a-z][a-z0-9_]*$', param_name) is not None and '__' not in param_name and not param_name.endswith('_')
if not is_valid_snake_case:
results.append(self.failed(f"路径参数 '{param_name}' 不符合下划线命名法snake_case规范。应为小写字母、数字和单下划线组合不能以下划线结尾。",
details={'parameter': param_name, 'location': param_in}))
# 查询参数使用小驼峰命名法camelCase
elif param_in == 'query':
if not is_camel_case(param_name):
results.append(self.failed(f"查询参数 '{param_name}' 不是小驼峰格式。",
details={'parameter': param_name, 'location': param_in}))
# 检查请求体
body_schema = self._get_resolved_request_body_schema()
if body_schema:
self._check_schema_properties_camel_case(body_schema, results)
def _check_schema_properties_camel_case(self, schema, results, path=""):
if not schema or not isinstance(schema, dict):
return
if 'properties' in schema:
for prop_name, prop_spec in schema['properties'].items():
if not is_camel_case(prop_name):
full_path = f"{path}.{prop_name}" if path else prop_name
results.append(self.failed(f"请求体字段 '{full_path}' 不是小驼峰格式。",
details={'field': full_path}))
prop_spec_resolved = self._get_resolved_schema(prop_spec)
if 'properties' in prop_spec_resolved or 'items' in prop_spec_resolved:
self._check_schema_properties_camel_case(prop_spec_resolved, results, f"{path}.{prop_name}" if path else prop_name)
def _check_response_array_format(self, response_context: APIResponseContext, results: List[ValidationResult]):
"""检查响应中的空数组和数组包裹"""
json_content = response_context.json_content
if json_content is None:
# 如果响应体为空,则跳过检查
results.append(self.passed("响应体为空,跳过数组格式检查。"))
return
# 检查 "数组类型数据被包裹在list字段中"
# 这条规则比较模糊这里理解为如果响应体是一个以数组为核心的列表那么这个数组的key应该是'list'
if isinstance(json_content, dict) and len(json_content) > 0:
list_keys = [k for k, v in json_content.items() if isinstance(v, list)]
if len(list_keys) == 1 and list_keys[0] != 'list':
results.append(self.failed(f"响应中包含一个主列表,但其键名 '{list_keys[0]}' 不是 'list'",
details={'keys': list(json_content.keys())}))
elif len(list_keys) > 1 and 'list' not in list_keys:
results.append(self.failed(f"响应中包含多个列表,但没有一个的键名是 'list'",
details={'keys': list(json_content.keys())}))
# 检查 "响应中的空数组为[]而非null"
# 查找匹配的响应定义时,需要兼容 status_code, status_code_family (e.g., 2XX), 和 default
responses = self.endpoint_spec.get('responses', {})
self.logger.info(f"responses: {responses}")
status_code = response_context.status_code
print("status_code: ", status_code)
status_code_str = str(status_code)
status_code_family = f"{status_code_str[0]}XX" # e.g., "2XX"
self.logger.info(f"检查响应定义: status_code={status_code}, 可用响应定义={list(responses.keys())}")
# 按优先级查找响应定义:精确状态码 > 状态码族 > 默认状态码200 > default
response_spec = None
print("responses: ", responses)
if status_code_str in responses:
response_spec = responses[status_code_str]
self.logger.info(f"找到精确状态码 {status_code_str} 的响应定义")
elif status_code_family in responses:
response_spec = responses[status_code_family]
self.logger.info(f"找到状态码族 {status_code_family} 的响应定义")
elif '200' in responses and 200 <= status_code < 300: # 对于2XX成功响应尝试使用200定义
response_spec = responses['200']
self.logger.info(f"未找到状态码 {status_code_str} 的响应定义使用默认成功状态码200的定义")
elif 'default' in responses:
response_spec = responses['default']
self.logger.info(f"使用default响应定义")
if not response_spec:
results.append(self.passed(f"规范中未找到响应码 {status_code} 或其类别({status_code_family}, default)的匹配定义跳过空数组与null的检查。"))
return
# 使用基类中定义好的工具函数获取响应schema
schema = self._get_resolved_response_schema(response_spec=response_spec)
if not schema:
results.append(self.passed(f"规范中响应码 {status_code} 的定义中未找到Schema跳过空数组与null的检查。"))
return
self._validate_null_for_array(json_content, schema, results, "")
def _validate_null_for_array(self, data: Any, schema: Dict[str, Any], results: List[ValidationResult], path: str):
if not schema:
return
schema = self._get_resolved_schema(schema)
if schema.get('type') == 'array' and data is None:
results.append(self.failed(f"响应中字段 '{path}' 的值为 null但其在规范中定义为数组应返回 []。", details={'field': path}))
return
if isinstance(data, dict) and 'properties' in schema:
for prop_name, prop_schema in schema['properties'].items():
if prop_name in data:
new_path = f"{path}.{prop_name}" if path else prop_name
self._validate_null_for_array(data[prop_name], prop_schema, results, new_path)
elif isinstance(data, list) and 'items' in schema:
item_schema = schema.get('items')
for i, item in enumerate(data):
new_path = f"{path}[{i}]"
self._validate_null_for_array(item, item_schema, results, new_path)