import re from typing import Dict, Any, List, Optional from ddms_compliance_suite.test_framework_core import BaseAPITestCase, ValidationResult, APIResponseContext, APIRequestContext, TestSeverity from ddms_compliance_suite.utils.common_utils import is_camel_case from ddms_compliance_suite.utils import schema_utils class CoreNamingStructureTestCase(BaseAPITestCase): id = "TC-RESTful-001" name = "核心命名与结构规范检查" description = "统一验证API的命名与结构是否遵循规范。包括:1)模块名全小写且用中划线连接;2)URL路径参数使用下划线命名法(snake_case);3)查询参数和请求体字段使用小驼峰命名法(camelCase);4)响应中的空数组为[]而非null;5)数组类型数据被包裹在list字段中。" severity = TestSeverity.HIGH tags = ["normative", "restful", "structure", "naming-convention"] def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None): super().__init__(endpoint_spec, global_api_spec, json_schema_validator, llm_service=llm_service) def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]: results = [] # 静态检查,只分析API规范 self._check_module_name(results) self._check_request_params_camel_case(results) # 动态检查,需要分析实际响应 # 规则 "响应中的空数组为[]而非null" 和 "数组类型数据被包裹在list字段中" 需要在实际调用后进行 # 这里我们只对成功的响应进行检查 if 200 <= response_context.status_code < 300: self._check_response_array_format(response_context, results) else: results.append(self.passed("跳过响应体检查:非成功状态码,不适用数组格式检查。")) return results def _check_module_name(self, results: List[ValidationResult]): """检查模块名是否为全小写且用中划线连接""" path = self.endpoint_spec['path'] module_name_match = re.search(r'/api/([^/]+)/', path) if module_name_match: module_name = module_name_match.group(1) # 模块名可以只是小写字母,但如果包含连接符,必须是中划线 is_valid = all(c.islower() or c.isdigit() or c == '-' for c in module_name) and '_' not in module_name if is_valid: results.append(self.passed(f"模块名 '{module_name}' 格式正确 (全小写/数字/中划线)。")) else: results.append(self.failed(f"模块名 '{module_name}' 格式不正确。应为全小写字母、数字和中划线的组合。", details={'path': path, 'module': module_name})) else: results.append(self.failed(f"无法从路径 '{path}' 中提取模块名(格式应为 /api/module-name/...)。", details={'path': path})) def _check_request_params_camel_case(self, results: List[ValidationResult]): """检查请求参数命名规范: - URL路径参数应使用下划线命名法(snake_case) - 请求体和查询参数应使用小驼峰命名法(camelCase) - HTTP头部有特殊规范,不检查 """ parameters = self.endpoint_spec.get('parameters', []) # 定义HTTP头部例外(不检查) header_exceptions = ['Authorization', 'X-Tenant-ID', 'X-Data-Domain', 'tenant-id', 'Content-Type'] for param in parameters: param_name = param.get('name') param_in = param.get('in') # 跳过HTTP头部参数 if param_in == 'header' and param_name in header_exceptions: continue # 路径参数使用下划线命名法(snake_case) if param_in == 'path': # 下划线命名法:全小写字母、数字和下划线,不允许连续下划线,不能以下划线开头或结尾 is_valid_snake_case = re.match(r'^[a-z][a-z0-9_]*$', param_name) is not None and '__' not in param_name and not param_name.endswith('_') if not is_valid_snake_case: results.append(self.failed(f"路径参数 '{param_name}' 不符合下划线命名法(snake_case)规范。应为小写字母、数字和单下划线组合,不能以下划线结尾。", details={'parameter': param_name, 'location': param_in})) # 查询参数使用小驼峰命名法(camelCase) elif param_in == 'query': if not is_camel_case(param_name): results.append(self.failed(f"查询参数 '{param_name}' 不是小驼峰格式。", details={'parameter': param_name, 'location': param_in})) # 检查请求体 body_schema = self._get_resolved_request_body_schema() if body_schema: self._check_schema_properties_camel_case(body_schema, results) def _check_schema_properties_camel_case(self, schema, results, path=""): if not schema or not isinstance(schema, dict): return if 'properties' in schema: for prop_name, prop_spec in schema['properties'].items(): if not is_camel_case(prop_name): full_path = f"{path}.{prop_name}" if path else prop_name results.append(self.failed(f"请求体字段 '{full_path}' 不是小驼峰格式。", details={'field': full_path})) prop_spec_resolved = self._get_resolved_schema(prop_spec) if 'properties' in prop_spec_resolved or 'items' in prop_spec_resolved: self._check_schema_properties_camel_case(prop_spec_resolved, results, f"{path}.{prop_name}" if path else prop_name) def _check_response_array_format(self, response_context: APIResponseContext, results: List[ValidationResult]): """检查响应中的空数组和数组包裹""" json_content = response_context.json_content if json_content is None: # 如果响应体为空,则跳过检查 results.append(self.passed("响应体为空,跳过数组格式检查。")) return # 检查 "数组类型数据被包裹在list字段中" # 这条规则比较模糊,这里理解为:如果响应体是一个以数组为核心的列表,那么这个数组的key应该是'list' if isinstance(json_content, dict) and len(json_content) > 0: list_keys = [k for k, v in json_content.items() if isinstance(v, list)] if len(list_keys) == 1 and list_keys[0] != 'list': results.append(self.failed(f"响应中包含一个主列表,但其键名 '{list_keys[0]}' 不是 'list'。", details={'keys': list(json_content.keys())})) elif len(list_keys) > 1 and 'list' not in list_keys: results.append(self.failed(f"响应中包含多个列表,但没有一个的键名是 'list'。", details={'keys': list(json_content.keys())})) # 检查 "响应中的空数组为[]而非null" # 查找匹配的响应定义时,需要兼容 status_code, status_code_family (e.g., 2XX), 和 default responses = self.endpoint_spec.get('responses', {}) self.logger.info(f"responses: {responses}") status_code = response_context.status_code print("status_code: ", status_code) status_code_str = str(status_code) status_code_family = f"{status_code_str[0]}XX" # e.g., "2XX" self.logger.info(f"检查响应定义: status_code={status_code}, 可用响应定义={list(responses.keys())}") # 按优先级查找响应定义:精确状态码 > 状态码族 > 默认状态码200 > default response_spec = None print("responses: ", responses) if status_code_str in responses: response_spec = responses[status_code_str] self.logger.info(f"找到精确状态码 {status_code_str} 的响应定义") elif status_code_family in responses: response_spec = responses[status_code_family] self.logger.info(f"找到状态码族 {status_code_family} 的响应定义") elif '200' in responses and 200 <= status_code < 300: # 对于2XX成功响应,尝试使用200定义 response_spec = responses['200'] self.logger.info(f"未找到状态码 {status_code_str} 的响应定义,使用默认成功状态码200的定义") elif 'default' in responses: response_spec = responses['default'] self.logger.info(f"使用default响应定义") if not response_spec: results.append(self.passed(f"规范中未找到响应码 {status_code} 或其类别({status_code_family}, default)的匹配定义,跳过空数组与null的检查。")) return # 使用基类中定义好的工具函数获取响应schema schema = self._get_resolved_response_schema(response_spec=response_spec) if not schema: results.append(self.passed(f"规范中响应码 {status_code} 的定义中未找到Schema,跳过空数组与null的检查。")) return self._validate_null_for_array(json_content, schema, results, "") def _validate_null_for_array(self, data: Any, schema: Dict[str, Any], results: List[ValidationResult], path: str): if not schema: return schema = self._get_resolved_schema(schema) if schema.get('type') == 'array' and data is None: results.append(self.failed(f"响应中字段 '{path}' 的值为 null,但其在规范中定义为数组,应返回 []。", details={'field': path})) return if isinstance(data, dict) and 'properties' in schema: for prop_name, prop_schema in schema['properties'].items(): if prop_name in data: new_path = f"{path}.{prop_name}" if path else prop_name self._validate_null_for_array(data[prop_name], prop_schema, results, new_path) elif isinstance(data, list) and 'items' in schema: item_schema = schema.get('items') for i, item in enumerate(data): new_path = f"{path}[{i}]" self._validate_null_for_array(item, item_schema, results, new_path)