llm 通过json列表判断

This commit is contained in:
gongwenxin 2025-06-19 18:33:03 +08:00
parent d3333d4f90
commit ba175cb2ae
32 changed files with 12241 additions and 8427 deletions

View File

@ -1,4 +1,4 @@
文昕
RESTful接口设计要求
1.使用范围
本设计要求旨在基于标准的 RESTul 协议, 对部分内容的使用加以约束,指导 RESTu接口的设计人员设计出相对统

View File

@ -1,4 +1,4 @@
文昕
开发指南-后端
2024年7月4日创建
1.环境介绍

View File

@ -8,7 +8,7 @@ import string
class InvalidEnumValueCase(BaseAPITestCase):
id = "TC-ERROR-4006"
name = "Error Code 4006 - Invalid Enum Value Validation"
description = "测试当发送的参数值不在指定的枚举范围内时API是否按预期返回code=4006的错误状态码应为200"
description = "测试当发送的参数值不在指定的枚举范围内时API是否按预期返回code=-1的错误状态码应为200"
severity = TestSeverity.MEDIUM
tags = ["error-handling", "appendix-b", "4006", "invalid-enum"]
execution_order = 204 # 在数值越界之后执行
@ -133,7 +133,7 @@ class InvalidEnumValueCase(BaseAPITestCase):
return [self.passed("跳过测试:未找到具有明确枚举值限制的字段。")]
expected_http_status_code = 200
expected_business_error_code = "4006"
expected_business_error_code = -1
context_msg_prefix = (
f"{self.target_field_location} 字段 '{self.target_field_name}' "
@ -153,12 +153,12 @@ class InvalidEnumValueCase(BaseAPITestCase):
return [self.failed(f"{context_msg_prefix}API响应体不是一个有效的JSON对象。")]
actual_business_code = json_content.get("code")
if str(actual_business_code) != expected_business_error_code:
if actual_business_code != expected_business_error_code:
return [self.failed(
message=f"{context_msg_prefix}业务错误码应为 '{expected_business_error_code}',但实际为 '{actual_business_code}'",
message=f"{context_msg_prefix}业务错误码应为 {expected_business_error_code},但实际为 {actual_business_code}",
details={"expected_code": expected_business_error_code, "actual_code": actual_business_code, "response_body": json_content}
)]
return [self.passed(
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 '{expected_business_error_code}'"
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 {expected_business_error_code}"
)]

View File

@ -5,8 +5,8 @@ from ddms_compliance_suite.utils import schema_utils # Keep this import for util
class MissingRequiredFieldBodyCase(BaseAPITestCase):
id = "TC-ERROR-4003-BODY"
name = "Error Code 4003 - Missing Required Request Body Field Validation"
description = "测试当请求体中缺少API规范定义的必填字段时API是否按预期返回类似4003的错误。"
name = "Error Code -1 - Missing Required Request Body Field Validation"
description = "测试当请求体中缺少API规范定义的必填字段时API是否按预期返回code=-1的错误。"
severity = TestSeverity.HIGH
tags = ["error-handling", "appendix-b", "4003", "required-fields", "request-body"]
execution_order = 210
@ -61,7 +61,7 @@ class MissingRequiredFieldBodyCase(BaseAPITestCase):
return results
expected_http_status_code = 200
expected_business_error_code = "4003"
expected_business_error_code = -1
removed_field_str = '.'.join(map(str, self.removed_field_path))
context_msg_prefix = f"当移除必填请求体字段 '{removed_field_str}' 时, "
@ -79,12 +79,12 @@ class MissingRequiredFieldBodyCase(BaseAPITestCase):
return [self.failed(f"{context_msg_prefix}API响应体不是一个有效的JSON对象。")]
actual_business_code = json_content.get("code")
if str(actual_business_code) != expected_business_error_code:
if actual_business_code != expected_business_error_code:
return [self.failed(
message=f"{context_msg_prefix}业务错误码应为 '{expected_business_error_code}',但实际为 '{actual_business_code}'",
message=f"{context_msg_prefix}业务错误码应为 {expected_business_error_code},但实际为 {actual_business_code}",
details={"expected_code": expected_business_error_code, "actual_code": actual_business_code, "response_body": json_content}
)]
return [self.passed(
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 '{expected_business_error_code}'"
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 {expected_business_error_code}"
)]

View File

@ -4,8 +4,8 @@ import copy
class MissingRequiredFieldQueryCase(BaseAPITestCase):
id = "TC-ERROR-4003-QUERY"
name = "Error Code 4003 - Missing Required Query Parameter Validation"
description = "测试当请求中缺少API规范定义的必填查询参数时API是否按预期返回类似4003的错误。"
name = "Error Code -1 - Missing Required Query Parameter Validation"
description = "测试当请求中缺少API规范定义的必填查询参数时API是否按预期返回code=-1的错误。"
severity = TestSeverity.HIGH
tags = ["error-handling", "appendix-b", "4003", "required-fields", "query-parameters"]
execution_order = 211 # After body, before original combined one might have been
@ -52,7 +52,7 @@ class MissingRequiredFieldQueryCase(BaseAPITestCase):
return results
expected_http_status_code = 200
expected_business_error_code = "4003"
expected_business_error_code = -1
context_msg_prefix = f"当移除必填查询参数 '{self.target_param_name}' 时, "
@ -69,12 +69,12 @@ class MissingRequiredFieldQueryCase(BaseAPITestCase):
return [self.failed(f"{context_msg_prefix}API响应体不是一个有效的JSON对象。")]
actual_business_code = json_content.get("code")
if str(actual_business_code) != expected_business_error_code:
if actual_business_code != expected_business_error_code:
return [self.failed(
message=f"{context_msg_prefix}业务错误码应为 '{expected_business_error_code}',但实际为 '{actual_business_code}'",
message=f"{context_msg_prefix}业务错误码应为 {expected_business_error_code},但实际为 {actual_business_code}",
details={"expected_code": expected_business_error_code, "actual_code": actual_business_code, "response_body": json_content}
)]
return [self.passed(
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 '{expected_business_error_code}'"
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 {expected_business_error_code}"
)]

View File

@ -5,8 +5,8 @@ from ddms_compliance_suite.utils import schema_utils
class NumberOutOfRangeCase(BaseAPITestCase):
id = "TC-ERROR-4002"
name = "Error Code 4002 - Number Value Out of Range Validation"
description = "测试当发送的数值参数超出范围限制时API是否按预期返回code=4002的错误状态码应为200"
name = "Error Code -1 - Number Value Out of Range Validation"
description = "测试当发送的数值参数超出范围限制时API是否按预期返回code=-1的错误状态码应为200"
severity = TestSeverity.MEDIUM
tags = ["error-handling", "appendix-b", "4002", "out-of-range"]
execution_order = 203 # 在类型不匹配测试之后执行
@ -162,7 +162,7 @@ class NumberOutOfRangeCase(BaseAPITestCase):
return [self.passed("跳过测试:未找到具有明确范围限制的数值字段。")]
expected_http_status_code = 200
expected_business_error_code = "4002"
expected_business_error_code = -1
context_msg_prefix = (
f"{self.target_field_location} 字段 '{self.target_field_name}' "
@ -182,12 +182,12 @@ class NumberOutOfRangeCase(BaseAPITestCase):
return [self.failed(f"{context_msg_prefix}API响应体不是一个有效的JSON对象。")]
actual_business_code = json_content.get("code")
if str(actual_business_code) != expected_business_error_code:
if actual_business_code != expected_business_error_code:
return [self.failed(
message=f"{context_msg_prefix}业务错误码应为 '{expected_business_error_code}',但实际为 '{actual_business_code}'",
message=f"{context_msg_prefix}业务错误码应为 {expected_business_error_code},但实际为 {actual_business_code}",
details={"expected_code": expected_business_error_code, "actual_code": actual_business_code, "response_body": json_content}
)]
return [self.passed(
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 '{expected_business_error_code}'"
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 {expected_business_error_code}"
)]

View File

@ -6,8 +6,8 @@ from ddms_compliance_suite.utils import schema_utils
class TypeMismatchBodyCase(BaseAPITestCase):
id = "TC-ERROR-4001-BODY"
name = "Error Code 4001 - Request Body Type Mismatch Validation"
description = "测试当发送的请求体中字段的数据类型与API规范定义不符时API是否按预期返回类似4001的错误或通用400错误"
name = "Error Code -1 - Request Body Type Mismatch Validation"
description = "测试当发送的请求体中字段的数据类型与API规范定义不符时API是否按预期返回code=-1的错误"
severity = TestSeverity.MEDIUM
tags = ["error-handling", "appendix-b", "4001", "request-body"]
execution_order = 202 # Slightly after query param one
@ -88,7 +88,7 @@ class TypeMismatchBodyCase(BaseAPITestCase):
return [self.passed("跳过测试:在请求体中未找到合适的字段来测试类型不匹配。")]
expected_http_status_code = 200
expected_business_error_code = "4001"
expected_business_error_code = -1
field_path_str = '.'.join(map(str, self.target_field_path))
context_msg_prefix = f"当请求体字段 '{field_path_str}' 类型不匹配时, "
@ -106,12 +106,12 @@ class TypeMismatchBodyCase(BaseAPITestCase):
return [self.failed(f"{context_msg_prefix}API响应体不是一个有效的JSON对象。")]
actual_business_code = json_content.get("code")
if str(actual_business_code) != expected_business_error_code:
if actual_business_code != expected_business_error_code:
return [self.failed(
message=f"{context_msg_prefix}业务错误码应为 '{expected_business_error_code}',但实际为 '{actual_business_code}'",
message=f"{context_msg_prefix}业务错误码应为 {expected_business_error_code},但实际为 {actual_business_code}",
details={"expected_code": expected_business_error_code, "actual_code": actual_business_code, "response_body": json_content}
)]
return [self.passed(
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 '{expected_business_error_code}'"
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 {expected_business_error_code}"
)]

View File

@ -6,8 +6,8 @@ from ddms_compliance_suite.utils import schema_utils
class TypeMismatchQueryParamCase(BaseAPITestCase):
id = "TC-ERROR-4001-QUERY"
name = "Error Code 4001 - Query Parameter Type Mismatch Validation"
description = "测试当发送的查询参数数据类型与API规范定义不符时API是否按预期返回类似4001的错误或通用400错误"
name = "Error Code -1 - Query Parameter Type Mismatch Validation"
description = "测试当发送的查询参数数据类型与API规范定义不符时API是否按预期返回code=-1的错误"
severity = TestSeverity.MEDIUM
tags = ["error-handling", "appendix-b", "4001", "query-parameters"]
execution_order = 201 # Slightly after the combined one might have been
@ -85,7 +85,7 @@ class TypeMismatchQueryParamCase(BaseAPITestCase):
return [self.passed("跳过测试:在查询参数中未找到合适的字段来测试类型不匹配。")]
expected_http_status_code = 200
expected_business_error_code = "4001"
expected_business_error_code = -1
context_param_identifier = self.target_param_name or '.'.join(map(str, self.target_field_path))
context_msg_prefix = f"当查询参数 '{context_param_identifier}' (路径: '{'.'.join(map(str, self.target_field_path))}') 类型不匹配时, "
@ -103,14 +103,14 @@ class TypeMismatchQueryParamCase(BaseAPITestCase):
return [self.failed(f"{context_msg_prefix}API响应体不是一个有效的JSON对象。")]
actual_business_code = json_content.get("code")
if str(actual_business_code) != expected_business_error_code:
if actual_business_code != expected_business_error_code:
return [self.failed(
message=f"{context_msg_prefix}业务错误码应为 '{expected_business_error_code}',但实际为 '{actual_business_code}'",
message=f"{context_msg_prefix}业务错误码应为 {expected_business_error_code},但实际为 {actual_business_code}",
details={"expected_code": expected_business_error_code, "actual_code": actual_business_code, "response_body": json_content}
)]
return [self.passed(
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 '{expected_business_error_code}'"
f"{context_msg_prefix}API正确返回了状态码 {expected_http_status_code} 和业务错误码 {expected_business_error_code}"
)]
def generate_path_params(self, current_path_params: Dict[str, Any]) -> Dict[str, Any]:

View File

@ -0,0 +1,9 @@
[
"API接口应该遵循RESTful设计规范URL应使用名词而非动词",
"API响应格式应统一包含状态码、消息和数据三个字段",
"API应该妥善处理错误情况返回适当的错误代码和说明",
"API应该使用正确的HTTP方法GET用于检索POST用于创建PUT用于更新DELETE用于删除",
"API响应中的时间字段应符合ISO 8601标准格式",
"API路径结构应遵循'<前缀>/<专业领域>/v<版本号>/<资源类型>'格式",
"API应提供适当的缓存控制机制"
]

View File

@ -0,0 +1,5 @@
[
"API应该使用正确的HTTP方法GET用于检索POST用于创建PUT用于更新DELETE用于删除"
]

View File

@ -0,0 +1,108 @@
import os
import json
from typing import Dict, Any, Optional, List
from ddms_compliance_suite.test_framework_core import BaseAPITestCase, TestSeverity, ValidationResult, APIRequestContext, APIResponseContext
class LLMComplianceCheckTestCase(BaseAPITestCase):
id = "TC-LLM-COMPLIANCE-001"
name = "LLM合规性综合检查"
description = "读取固定的合规性标准列表将API所有关键信息url、headers、params、query、body、示例响应等发送给大模型让其判断是否通过并给出理由。"
severity = TestSeverity.MEDIUM
tags = ["llm", "compliance", "auto-eval"]
execution_order = 99
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
super().__init__(endpoint_spec, global_api_spec, json_schema_validator=json_schema_validator, llm_service=llm_service)
# 读取合规性标准
criteria_path = os.path.join(os.path.dirname(__file__), "compliance_criteria.json")
with open(criteria_path, "r", encoding="utf-8") as f:
self.compliance_criteria = json.load(f)
self.logger.info(f"已加载合规性标准: {self.compliance_criteria}")
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
results = []
# 收集API所有关键信息
api_info = {
"method": request_context.method,
"url": request_context.url,
"path": self.endpoint_spec.get("path"),
"operationId": self.endpoint_spec.get("operationId"),
"headers": dict(request_context.headers) if hasattr(request_context, "headers") else {},
"query_params": getattr(request_context, "query_params", {}),
"path_params": getattr(request_context, "path_params", {}),
"body": getattr(request_context, "body", None),
"response_status": response_context.status_code,
"response_headers": dict(response_context.headers) if hasattr(response_context, "headers") else {},
"response_body": response_context.text_content if hasattr(response_context, "text_content") else None
}
# 日志打印所有API信息
self.logger.info("LLM合规性检查-API信息收集: " + json.dumps(api_info, ensure_ascii=False, indent=2))
self.logger.info("LLM合规性检查-标准: " + json.dumps(self.compliance_criteria, ensure_ascii=False, indent=2))
if not self.llm_service:
results.append(ValidationResult(
passed=True,
message="LLM服务不可用跳过本用例。",
details={"reason": "llm_service is None"}
))
return results
# 构建prompt
prompt = f"""
你是一位API合规性专家请根据以下合规性标准对给定的API调用信息进行逐条评估每条标准请给出是否通过true/false和理由
合规性标准:
{json.dumps(self.compliance_criteria, ensure_ascii=False, indent=2)}
API信息:
{json.dumps(api_info, ensure_ascii=False, indent=2)}
请以如下JSON格式输出
[
{{"criterion": "标准内容", "passed": true/false, "reason": "理由"}},
...
]
"""
messages = [
{"role": "system", "content": "你是一位API合规性专家输出必须是严格的JSON数组。"},
{"role": "user", "content": prompt}
]
self.logger.info("发送给LLM的prompt: " + prompt)
llm_response_str = self.llm_service._execute_chat_completion_request(
messages=messages,
max_tokens=2048,
temperature=0.2
)
if not llm_response_str:
results.append(ValidationResult(
passed=False,
message="未能从LLM获取响应。",
details={"prompt": prompt}
))
return results
self.logger.info(f"LLM原始响应: {llm_response_str}")
try:
cleaned = llm_response_str.strip()
if cleaned.startswith("```json"):
cleaned = cleaned[7:]
if cleaned.endswith("```"):
cleaned = cleaned[:-3]
llm_result = json.loads(cleaned)
if not isinstance(llm_result, list):
raise ValueError("LLM返回的不是JSON数组")
for item in llm_result:
criterion = item.get("criterion", "未知标准")
passed = item.get("passed", False)
reason = item.get("reason", "无理由")
results.append(ValidationResult(
passed=passed,
message=f"[{criterion}] {'通过' if passed else '不通过'}: {reason}",
details={"criterion": criterion, "llm_reason": reason}
))
except Exception as e:
results.append(ValidationResult(
passed=False,
message=f"LLM响应解析失败: {e}",
details={"raw_llm_response": llm_response_str}
))
return results

View File

@ -0,0 +1,139 @@
from typing import Dict, Any, List, Optional
from ddms_compliance_suite.test_framework_core import BaseAPITestCase, ValidationResult, APIResponseContext, APIRequestContext, TestSeverity
class ResponseSchemaFormatCheck(BaseAPITestCase):
"""
检查API响应的schema格式是否符合{"code":int or number or string,"message":"","data": any}的标准格式
"""
id = "TC-DMS-CORE-SCHEMA-001"
name = "DMS核心存储服务API响应格式检查"
description = "验证API响应的schema是否符合标准格式{'code':int or number or string, 'message':string, 'data': any}"
severity = TestSeverity.HIGH
tags = ["schema", "format", "dms-core", "response"]
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
super().__init__(endpoint_spec, global_api_spec, json_schema_validator, llm_service=llm_service)
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
"""
仅验证API响应schema的格式不验证实际响应
"""
results = []
# 获取规范中的响应schema
status_codes_to_check = ["200", "201", "default"]
for status_code in status_codes_to_check:
response_schema = self._get_response_schema_for_status(status_code)
if response_schema:
validation_results = self._validate_response_schema_format(response_schema, status_code)
results.extend(validation_results)
# 找到一个有效的响应schema后就不再继续检查
break
# 如果没有找到任何响应schema记录为失败
if not results:
results.append(self.failed(
message="无法找到API响应的schema定义无法验证响应格式。",
details={"endpoint": self.endpoint_spec.get("path", "未知")}
))
return results
def _get_response_schema_for_status(self, status_code: str) -> Optional[Dict[str, Any]]:
"""获取指定状态码的响应schema"""
responses = self.endpoint_spec.get("responses", {})
if status_code not in responses:
return None
response_spec = responses[status_code]
return self._get_resolved_response_schema(response_spec, status_code)
def _validate_response_schema_format(self, schema: Dict[str, Any], status_code: str) -> List[ValidationResult]:
"""验证响应schema是否符合标准格式"""
results = []
# 如果schema为空则记录为失败
if not schema or not isinstance(schema, dict):
results.append(self.failed(
message=f"响应schema不是有效的对象: {schema}",
details={"status_code": status_code}
))
return results
# 解析schema确保它是一个已解析的schema
schema = self._get_resolved_schema(schema)
# 检查schema是否有properties
if "properties" not in schema:
results.append(self.failed(
message=f"响应schema中缺少'properties'定义",
details={"status_code": status_code, "schema": schema}
))
return results
properties = schema.get("properties", {})
required_fields = schema.get("required", [])
# 检查必须的字段: code, message, data
expected_fields = ["code", "message", "data"]
missing_fields = [field for field in expected_fields if field not in properties]
if missing_fields:
results.append(self.failed(
message=f"响应schema中缺少必要字段: {', '.join(missing_fields)}",
details={
"status_code": status_code,
"available_fields": list(properties.keys()),
"missing_fields": missing_fields
}
))
else:
# 检查字段类型
type_errors = []
# 检查code字段类型
code_schema = properties.get("code", {})
if not( code_schema.get("type") == "integer" or code_schema.get("type") == "number" or code_schema.get("type") == "string"):
type_errors.append(f"'code'字段应为integer类型实际为{code_schema.get('type')}")
# 检查message字段类型
message_schema = properties.get("message", {})
if message_schema.get("type") != "string":
type_errors.append(f"'message'字段应为string类型实际为{message_schema.get('type')}")
# # 检查data字段类型
data_schema = properties.get("data", {})
# if data_schema.get("type") != "object" and data_schema.get("type") is not None:
# type_errors.append(f"'data'字段应为object类型实际为{data_schema.get('type')}")
if type_errors:
results.append(self.failed(
message=f"响应schema中字段类型不符合要求: {'; '.join(type_errors)}",
details={
"status_code": status_code,
"code_schema": code_schema,
"message_schema": message_schema,
"data_schema": data_schema
}
))
# 检查必填字段
for field in expected_fields:
if field not in required_fields:
results.append(ValidationResult(
passed=True, # 作为警告而非错误
message=f"字段'{field}'在schema中未标记为必填(required)",
details={"status_code": status_code, "required_fields": required_fields}
))
# 如果没有错误,则记录为通过
if not any(not result.passed for result in results):
results.append(self.passed(
message="响应schema符合标准格式: {'code':int or number or string, 'message':string, 'data': any}",
details={"status_code": status_code}
))
return results

View File

@ -0,0 +1,128 @@
import re
from typing import Dict, Any, List, Optional
from ddms_compliance_suite.test_framework_core import BaseAPITestCase, ValidationResult, APIResponseContext, APIRequestContext, TestSeverity
class URLVersionCheckCase(BaseAPITestCase):
"""
检查API URL是否包含版本号如v1, api/v2, v3.0并以/api开头
"""
id = "TC-DMS-URL-VERSION-001"
name = "DMS API URL版本号检查"
description = "检查API URL是否包含标准格式的版本号支持的格式包括v1, api/v2, v3.0, version/1, 1.0等,并且路径需要以/api开头"
severity = TestSeverity.MEDIUM
tags = ["url", "version", "dms-core", "api-design"]
# 这个测试用例不需要发送实际请求
skip_execution = True
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
super().__init__(endpoint_spec, global_api_spec, json_schema_validator, llm_service=llm_service)
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
"""
检查API URL是否以/api开头并包含版本号
"""
results = []
# 获取API路径
path = self.endpoint_spec.get('path', '')
if not path:
results.append(self.failed(
message="无法获取API路径",
details={"endpoint_spec_keys": list(self.endpoint_spec.keys())}
))
return results
# 检查是否是系统级API可能不需要遵循标准路径格式
is_system_api = re.match(r'^/(health|ping|status|metrics|system)(/|$)', path)
# 1. 检查路径是否以/api开头
starts_with_api = path.startswith('/api/')
if not starts_with_api and not is_system_api:
results.append(self.failed(
message=f"API路径 '{path}' 不是以'/api/'开头",
details={"full_path": path, "requirement": "路径必须以'/api/'开头"}
))
elif starts_with_api:
results.append(self.passed(
message=f"API路径 '{path}' 正确以'/api/'开头",
details={"full_path": path}
))
# 2. 检查路径中是否包含版本号
version_patterns = [
# 标准版本格式: /v1/, /v2/, /v3/ 等
r'/v\d+/',
# 带小数点的版本: /v1.0/, /v2.1/ 等
r'/v\d+\.\d+/',
# 使用 'version' 单词: /version/1/, /version/2/ 等
r'/version/\d+/',
# API前缀版本: /api/v1/, /api/v2/ 等
r'/api/v\d+/',
# 直接数字版本: /1/, /2/ (仅在特定位置)
r'/api/\d+/',
# 特殊格式: 如 /v1-beta/, /v2-alpha/ 等
r'/v\d+[\-_](alpha|beta|rc\d*)/',
# 年份版本: /2023/, /2024/ 等 (仅在特定位置)
r'/20\d{2}/',
]
# 检查是否包含版本号
matched_pattern = None
version_str = None
for pattern in version_patterns:
match = re.search(pattern, path)
if match:
matched_pattern = pattern
version_str = match.group(0).strip('/')
break
if matched_pattern and version_str:
results.append(self.passed(
message=f"API路径 '{path}' 包含版本标识: '{version_str}'",
details={
"pattern_matched": matched_pattern,
"version_string": version_str,
"full_path": path
}
))
else:
# 特殊情况检查是否是根API或系统级API可能不需要版本号
if is_system_api:
results.append(self.passed(
message=f"API路径 '{path}' 是系统级API不需要版本号",
details={"full_path": path, "api_type": "system"}
))
else:
results.append(self.failed(
message=f"API路径 '{path}' 不包含任何已知格式的版本标识",
details={
"full_path": path,
"supported_patterns": [p.replace('\\d+', 'N').replace('\\d{2}', 'NN') for p in version_patterns]
}
))
# 提供改进建议
# 确保建议路径始终以/api开头并包含版本号
base_path_parts = path.split('/')
base_path_parts = [p for p in base_path_parts if p] # 移除空字符串
if not starts_with_api:
# 如果不是以/api开头建议路径应该是/api/v1/原始路径
suggested_path = f"/api/v1/{'/'.join(base_path_parts)}"
else:
# 如果已经以/api开头但缺少版本号插入v1在api之后
suggested_path = "/api/v1"
if len(base_path_parts) > 1: # 有api后面的部分
suggested_path += f"/{'/'.join(base_path_parts[1:])}"
results.append(ValidationResult(
passed=False,
message=f"建议将路径修改为符合规范的格式,例如: '{suggested_path}'",
details={"original_path": path, "suggested_path": suggested_path}
))
return results

View File

@ -71,9 +71,4 @@ class TimeFormatCheckTestCase(BaseAPITestCase):
if 'properties' in prop_spec or 'allOf' in prop_spec or 'oneOf' in prop_spec or 'anyOf' in prop_spec:
self._check_schema_properties(prop_spec, results, current_path)
elif prop_spec.get('type') == 'array' and 'items' in prop_spec:
self._check_schema_properties(self._get_resolved_schema(prop_spec['items']), results, f"{current_path}[]")
def _get_resolved_schema(self, schema_or_ref):
if '$ref' in schema_or_ref:
return schema_utils.util_resolve_ref(schema_or_ref['$ref'], self.global_api_spec)
return schema_or_ref
self._check_schema_properties(self._get_resolved_schema(prop_spec['items']), results, f"{current_path}[]")

View File

@ -0,0 +1,141 @@
from ddms_compliance_suite.test_framework_core import BaseAPITestCase, ValidationResult, APIResponseContext, APIRequestContext, TestSeverity
from typing import Dict, Any, List, Optional
import re
class PaginationParamsCheckTestCase(BaseAPITestCase):
"""
检查API请求中是否包含标准分页参数pageNopageSize和isSearchCount
只有名称含有"查询"一类并且不含有"详情"一类的API才应用这个验证
"""
id = "TC-DMS-PAGINATION-001"
name = "分页参数检查"
description = "检查API请求参数中是否包含标准分页参数pageNo、pageSize和isSearchCount。只有名称含有'查询''列表'等并且不含有'详情'一类的API才应用此验证。"
severity = TestSeverity.MEDIUM
tags = ["pagination", "params", "backend-guide"]
# 这个测试用例不需要发送实际请求
skip_execution = True
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
super().__init__(endpoint_spec, global_api_spec, json_schema_validator, llm_service=llm_service)
# 定义需要验证的API名称关键词
self.include_keywords = ["查询", "列表", "分页", "page", "list", "query", "search", "find"]
# 定义排除的API名称关键词
self.exclude_keywords = ["详情", "明细", "detail", "info", "get", "查看"]
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
"""
检查API请求中是否包含标准分页参数
"""
results = []
# 获取API路径和方法
path = self.endpoint_spec.get('path', '')
method = self.endpoint_spec.get('method', '').lower()
# 获取API的摘要和描述信息
summary = self.endpoint_spec.get('summary', '')
description = self.endpoint_spec.get('description', '')
operation_id = self.endpoint_spec.get('operationId', '')
# 组合所有可能包含API名称或功能描述的字段
api_description_text = f"{summary} {description} {operation_id} {path}".lower()
# 检查是否包含需要验证的关键词,且不包含排除的关键词
contains_include_keyword = any(keyword.lower() in api_description_text for keyword in self.include_keywords)
contains_exclude_keyword = any(keyword.lower() in api_description_text for keyword in self.exclude_keywords)
# 如果不满足准入规则,直接返回通过
if not contains_include_keyword or contains_exclude_keyword:
results.append(self.passed(
message=f"跳过检查API不符合分页参数检查的准入规则需包含'查询'/'列表'等关键词,且不包含'详情'等关键词)",
details={
"path": path,
"method": method.upper(),
"summary": summary,
"contains_include_keyword": contains_include_keyword,
"contains_exclude_keyword": contains_exclude_keyword
}
))
return results
# 如果是GET请求或可能返回列表的请求才进行检查
if method not in ['get', 'post']:
results.append(self.passed(
message=f"跳过检查:{method.upper()} 方法,不适用于分页参数检查"
))
return results
# 初始化检查结果
found_page_no = False
found_page_size = False
found_is_search_count = False
# 检查查询参数
parameters = self.endpoint_spec.get('parameters', [])
for param in parameters:
param_name = param.get('name', '')
param_in = param.get('in', '')
if param_in == 'query':
if param_name == 'pageNo':
found_page_no = True
elif param_name == 'pageSize':
found_page_size = True
elif param_name == 'isSearchCount':
found_is_search_count = True
# 检查请求体如果是POST请求
if method == 'post':
request_body = self.endpoint_spec.get('requestBody', {})
content = request_body.get('content', {})
for media_type, media_content in content.items():
if 'schema' in media_content:
schema = self._get_resolved_schema(media_content['schema'])
if 'properties' in schema:
properties = schema['properties']
# 检查请求体属性
if 'pageNo' in properties:
found_page_no = True
if 'pageSize' in properties:
found_page_size = True
if 'isSearchCount' in properties:
found_is_search_count = True
# 汇总检查结果
if found_page_no and found_page_size and found_is_search_count:
results.append(self.passed(
message=f"API请求包含所有标准分页参数pageNo、pageSize和isSearchCount",
details={"path": path, "method": method.upper()}
))
else:
# 计算缺失的参数
missing_params = []
if not found_page_no:
missing_params.append("pageNo")
if not found_page_size:
missing_params.append("pageSize")
if not found_is_search_count:
missing_params.append("isSearchCount")
if missing_params:
results.append(self.failed(
message=f"API请求缺少标准分页参数{', '.join(missing_params)}",
details={
"path": path,
"method": method.upper(),
"missing_params": missing_params,
"found_params": {
"pageNo": found_page_no,
"pageSize": found_page_size,
"isSearchCount": found_is_search_count
}
}
))
return results

File diff suppressed because one or more lines are too long

View File

@ -1,420 +0,0 @@
# API规范解析框架设计
## 背景与需求
当前系统需要处理多种API规范格式包括YAPI、Swagger/OpenAPI 2.0和OpenAPI 3.0等。目前的实现在`BaseAPITestCase`类中包含了针对不同格式的特定处理逻辑,这导致:
1. 代码重复和维护困难
2. 处理逻辑分散在多个地方
3. 添加新格式支持需要修改多处代码
4. 测试用例需要了解底层规范格式的细节
我们需要一个统一的解析框架将不同格式的API规范转换为一致的内部表示使测试用例能够以统一的方式访问API规范信息而不必关心原始格式的差异。
## 设计目标
1. **统一接口**提供一套统一的接口来访问API规范信息无论原始格式如何
2. **可扩展性**易于添加新的API规范格式支持
3. **完全解析**:在解析阶段处理所有的引用和格式特定的细节
4. **一致性**:确保不同格式的规范被转换为相同的内部表示
5. **性能优化**:减少重复解析和处理
## 架构设计
### 1. 核心组件
#### 1.1 统一解析器接口 (`APISpecParser`)
```python
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
class APISpecParser(ABC):
"""API规范解析器的抽象基类"""
@abstractmethod
def parse(self, spec_content: Dict[str, Any]) -> Dict[str, Any]:
"""
解析API规范内容返回统一格式的内部表示
Args:
spec_content: 原始API规范内容
Returns:
统一格式的API规范内部表示
"""
pass
@abstractmethod
def detect_format(self, spec_content: Dict[str, Any]) -> str:
"""
检测API规范的格式
Args:
spec_content: 原始API规范内容
Returns:
规范格式的标识符,如 'yapi', 'openapi2', 'openapi3'
"""
pass
```
#### 1.2 格式特定的解析器实现
```python
class YAPIParser(APISpecParser):
"""YAPI格式解析器"""
def parse(self, spec_content: Dict[str, Any]) -> Dict[str, Any]:
# YAPI特定的解析逻辑
# 转换为统一的内部表示
pass
def detect_format(self, spec_content: Dict[str, Any]) -> str:
# 检测是否为YAPI格式
if self._is_yapi_format(spec_content):
return 'yapi'
return ''
def _is_yapi_format(self, spec_content: Dict[str, Any]) -> bool:
# 判断是否为YAPI格式的逻辑
pass
class OpenAPI2Parser(APISpecParser):
"""OpenAPI 2.0 (Swagger)格式解析器"""
def parse(self, spec_content: Dict[str, Any]) -> Dict[str, Any]:
# OpenAPI 2.0特定的解析逻辑
pass
def detect_format(self, spec_content: Dict[str, Any]) -> str:
# 检测是否为OpenAPI 2.0格式
if self._is_openapi2_format(spec_content):
return 'openapi2'
return ''
def _is_openapi2_format(self, spec_content: Dict[str, Any]) -> bool:
# 判断是否为OpenAPI 2.0格式的逻辑
pass
class OpenAPI3Parser(APISpecParser):
"""OpenAPI 3.0格式解析器"""
def parse(self, spec_content: Dict[str, Any]) -> Dict[str, Any]:
# OpenAPI 3.0特定的解析逻辑
pass
def detect_format(self, spec_content: Dict[str, Any]) -> str:
# 检测是否为OpenAPI 3.0格式
if self._is_openapi3_format(spec_content):
return 'openapi3'
return ''
def _is_openapi3_format(self, spec_content: Dict[str, Any]) -> bool:
# 判断是否为OpenAPI 3.0格式的逻辑
pass
```
#### 1.3 解析器工厂 (`APISpecParserFactory`)
```python
class APISpecParserFactory:
"""API规范解析器工厂用于创建适合特定规范格式的解析器"""
def __init__(self):
self.parsers = [
YAPIParser(),
OpenAPI2Parser(),
OpenAPI3Parser()
]
def get_parser(self, spec_content: Dict[str, Any]) -> Optional[APISpecParser]:
"""
根据规范内容自动选择合适的解析器
Args:
spec_content: 原始API规范内容
Returns:
适合处理该规范的解析器实例如果没有找到则返回None
"""
for parser in self.parsers:
format_type = parser.detect_format(spec_content)
if format_type:
return parser
return None
def register_parser(self, parser: APISpecParser):
"""
注册新的解析器
Args:
parser: 解析器实例
"""
self.parsers.append(parser)
```
#### 1.4 统一API规范管理器 (`UnifiedAPISpecManager`)
```python
class UnifiedAPISpecManager:
"""统一API规范管理器负责解析和提供统一的API规范访问接口"""
def __init__(self):
self.parser_factory = APISpecParserFactory()
self.cached_specs = {} # 缓存已解析的规范
def parse_spec(self, spec_content: Dict[str, Any], spec_id: str = None) -> Dict[str, Any]:
"""
解析API规范返回统一格式的内部表示
Args:
spec_content: 原始API规范内容
spec_id: 规范的唯一标识符,用于缓存
Returns:
统一格式的API规范内部表示
"""
if spec_id and spec_id in self.cached_specs:
return self.cached_specs[spec_id]
parser = self.parser_factory.get_parser(spec_content)
if not parser:
raise ValueError("无法识别的API规范格式")
parsed_spec = parser.parse(spec_content)
if spec_id:
self.cached_specs[spec_id] = parsed_spec
return parsed_spec
def register_custom_parser(self, parser: APISpecParser):
"""
注册自定义解析器
Args:
parser: 自定义解析器实例
"""
self.parser_factory.register_parser(parser)
```
### 2. 统一内部表示格式
所有解析器都应该将原始规范转换为一个统一的内部表示格式,该格式应该包含以下核心元素:
```python
{
"info": {
"title": "API标题",
"version": "API版本",
"description": "API描述"
},
"paths": {
"/path/to/resource": {
"get": {
"summary": "操作摘要",
"description": "操作描述",
"parameters": [...], # 统一格式的参数列表
"requestBody": {...}, # 统一格式的请求体定义
"responses": {
"200": {
"description": "成功响应",
"content": {
"application/json": {
"schema": {...} # 统一格式的响应schema
}
}
}
}
}
}
},
"components": {
"schemas": {...}, # 统一格式的schema定义
"parameters": {...}, # 统一格式的参数定义
"responses": {...} # 统一格式的响应定义
}
}
```
### 3. 解析过程中的标准化处理
在解析过程中,需要进行以下标准化处理:
1. **路径标准化**:确保所有路径格式一致
2. **参数标准化**:将不同格式的参数定义转换为统一格式
3. **响应标准化**根据HTTP方法添加适当的默认状态码
4. **Schema标准化**:解析所有的`$ref`引用
5. **数据类型标准化**:确保数据类型表示一致
### 4. 与测试框架的集成
#### 4.1 更新 `BaseAPITestCase`
```python
class BaseAPITestCase:
# ... 现有代码 ...
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
"""
初始化测试用例。
Args:
endpoint_spec: 当前被测API端点的详细定义 (已转换为统一格式)。
global_api_spec: 完整的API规范文档 (已转换为统一格式)。
json_schema_validator: APITestOrchestrator 传入的 JSONSchemaValidator 实例 (可选)。
llm_service: APITestOrchestrator 传入的 LLMService 实例 (可选)。
"""
self.endpoint_spec = endpoint_spec
self.global_api_spec = global_api_spec
self.logger = logging.getLogger(f"testcase.{self.id}")
self.json_schema_validator = json_schema_validator
self.llm_service = llm_service
self.logger.debug(f"Test case '{self.id}' initialized for endpoint: {self.endpoint_spec.get('method', '')} {self.endpoint_spec.get('path', '')}")
# 简化的方法,不再需要处理不同格式的差异
def _get_request_body_schema(self) -> Optional[Dict[str, Any]]:
"""获取请求体schema"""
request_body = self.endpoint_spec.get("requestBody", {})
return request_body.get("schema")
def _get_response_schema(self, status_code: str) -> Optional[Dict[str, Any]]:
"""获取指定状态码的响应schema"""
responses = self.endpoint_spec.get("responses", {})
response = responses.get(status_code)
if response:
return response.get("schema")
return None
```
#### 4.2 更新 `APITestOrchestrator`
```python
class APITestOrchestrator:
# ... 现有代码 ...
def __init__(self, config: Dict[str, Any]):
# ... 现有代码 ...
self.spec_manager = UnifiedAPISpecManager()
def load_api_spec(self, spec_file_path: str) -> Dict[str, Any]:
"""
加载并解析API规范文件
Args:
spec_file_path: API规范文件路径
Returns:
统一格式的API规范内部表示
"""
with open(spec_file_path, 'r', encoding='utf-8') as f:
spec_content = json.load(f)
return self.spec_manager.parse_spec(spec_content, spec_id=spec_file_path)
def execute_tests(self, api_spec: Dict[str, Any], base_url: str):
"""
执行测试用例
Args:
api_spec: 统一格式的API规范内部表示
base_url: API基础URL
"""
# ... 现有代码 ...
for path, path_item in api_spec["paths"].items():
for method, operation in path_item.items():
# 使用统一格式的operation创建测试用例
self._execute_tests_for_endpoint(operation, path, method, base_url)
```
## 实现计划
### 阶段1基础框架搭建
1. 创建核心接口和基类
- `APISpecParser` 抽象基类
- `APISpecParserFactory` 工厂类
- `UnifiedAPISpecManager` 管理器类
2. 实现格式检测逻辑
- 为YAPI、OpenAPI 2.0和OpenAPI 3.0实现格式检测方法
### 阶段2解析器实现
1. 实现YAPI解析器
- 分析YAPI特有的结构
- 实现转换为统一格式的逻辑
2. 实现OpenAPI 2.0解析器
- 分析Swagger特有的结构
- 实现转换为统一格式的逻辑
3. 实现OpenAPI 3.0解析器
- 分析OpenAPI 3.0特有的结构
- 实现转换为统一格式的逻辑
### 阶段3标准化处理
1. 实现路径标准化
2. 实现参数标准化
3. 实现响应标准化
4. 实现Schema标准化
5. 实现数据类型标准化
### 阶段4框架集成
1. 更新`BaseAPITestCase`
- 简化API规范访问方法
- 移除格式特定的处理逻辑
2. 更新`APITestOrchestrator`
- 集成`UnifiedAPISpecManager`
- 使用统一格式的API规范
3. 更新测试用例
- 修改现有测试用例以使用新的API
### 阶段5测试与验证
1. 编写单元测试
- 测试各个解析器
- 测试标准化处理
2. 编写集成测试
- 测试完整的解析流程
- 测试与测试框架的集成
3. 性能测试
- 测试解析大型API规范的性能
- 测试缓存机制的有效性
## 扩展性考虑
### 添加新格式支持
要添加对新的API规范格式的支持只需
1. 创建一个新的解析器类,继承自`APISpecParser`
2. 实现`parse``detect_format`方法
3. 通过`UnifiedAPISpecManager.register_custom_parser`注册新的解析器
### 自定义处理逻辑
对于特定的标准化需求,可以:
1. 创建专门的处理器类
2. 在解析过程中调用这些处理器
3. 通过配置控制处理器的行为
## 结论
通过实现这个统一的API规范解析框架我们可以
1. 使测试用例代码更加简洁、可维护
2. 轻松支持新的API规范格式
3. 确保所有测试用例使用一致的API规范表示
4. 提高解析性能和可靠性
这个框架将为DDMS合规性测试工具提供一个坚实的基础使其能够适应各种API规范格式并且易于扩展和维护。

View File

@ -36,8 +36,13 @@
| ID | 名称 | 描述 | 严重程度 |
| -------------------------------------- | -------------------------------- | ------------------------------------------------------------------- | -------- |
| TC-RESTful-001 | 核心命名与结构规范检查 | 统一验证API的命名与结构是否遵循规范。包括1)模块名全小写且用中划线连接2)URL路径参数使用下划线命名法(snake_case)3)查询参数和请求体字段使用小驼峰命名法(camelCase)4)响应中的空数组为[]而非null5)数组类型数据被包裹在list字段中。 | 高 |
| TC-RESTful-002 | 资源路径名词检查 | 验证API路径中是否使用名词而非动词来表示资源。 | 中 |
| TC-RESTful-003 | 时间字段ISO 8601格式检查 | 验证返回的时间字段是否遵循 YYYY-MM-DDTHH:MM:SS+08:00 的ISO 8601格式。此检查为静态检查验证规范中`string`类型且`format``date-time`的字段是否包含推荐的`pattern`。 | 中 |
| TC-NORMATIVE-URL-LLM-COMPREHENSIVE-001 | 合URL规范与RESTful风格检查 (LLM) | 使用LLM统一评估API路径是否符合命名、结构、版本和RESTful风格等规范。 | 中 |
| TC-NORMATIVE-001 | HTTP方法使用规范检查 | 验证API是否恰当使用HTTP方法例如GET用于检索POST用于创建。 | 中 |
| TC-DMS-CORE-SCHEMA-001 | DMS核心存储服务API响应格式检查 | 验证API响应的schema是否符合标准格式{'code':int\|string\|number, 'message':string, 'data':any} | 高 |
| TC-DMS-URL-VERSION-001 | DMS API URL版本号检查 | 检查API URL是否包含标准格式的版本号支持的格式包括v1, api/v2, v3.0, version/1, 1.0等 | 中 |
## 设置与环境检查 (Setup Checks)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff