from typing import Dict, Any, Optional, List, Union, Set import re import json import logging import os from ddms_compliance_suite.test_framework_core import BaseAPITestCase, TestSeverity, ValidationResult, APIRequestContext, APIResponseContext from ddms_compliance_suite.utils import schema_utils class EncryptedFieldsCheck(BaseAPITestCase): """ 检查API响应中的敏感字段是否已加密。 该测试用例允许用户定义一组需要加密的敏感字段名称,然后检查响应中这些字段是否为明文。 支持多种方式配置敏感字段列表: 1. 使用默认预定义的敏感字段集合 2. 通过API规范中的x-sensitive-fields扩展字段指定 3. 通过环境变量SENSITIVE_FIELDS指定(逗号分隔的字符串) """ # 元数据 id = "TC-SECURITY-002" name = "敏感字段加密检查" description = "验证API响应中的敏感字段是否已加密,而非明文。" severity = TestSeverity.HIGH tags = ["security", "encryption", "sensitive_data"] execution_order = 50 # 在基本状态码检查之后执行 # 默认需要检查的敏感字段名称集合 DEFAULT_SENSITIVE_FIELDS = { "coord","location","position" } # 加密字符串的正则表达式模式 # 这里假设加密字符串通常是base64或hex编码的字符串 ENCRYPTED_PATTERNS = [ # Base64编码模式 (标准base64字符集,长度通常是4的倍数且较长) r'^[A-Za-z0-9+/]{20,}={0,2}$', # Hex编码模式 (16进制字符,长度通常是偶数且较长) r'^[0-9a-fA-F]{16,}$', # JWT Token模式 (三段base64编码,用点分隔) r'^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$', # 通用哈希值模式 (固定长度的hex字符串,如MD5, SHA1, SHA256等) r'^[0-9a-fA-F]{32}$|^[0-9a-fA-F]{40}$|^[0-9a-fA-F]{64}$', # 带前缀的加密字符串 (如bcrypt, PBKDF2等) r'^\$2[ayb]\$.{56}$|^\$pbkdf2-sha\d+\$', ] # 明文检测模式 (如果匹配这些模式,则可能是明文) PLAINTEXT_PATTERNS = [ # 常见的明文密码模式 (字母数字特殊字符组合,长度通常在6-20之间) r'^[A-Za-z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,20}$', # 手机号模式 (中国手机号) r'^1[3-9]\d{9}$', # 身份证号模式 (中国身份证号) r'^\d{17}[\dXx]$|^\d{15}$', # 银行卡号模式 (数字,通常12-19位) r'^\d{12,19}$', # 邮箱模式 r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', ] def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None): super().__init__(endpoint_spec, global_api_spec, json_schema_validator, llm_service) # 获取敏感字段列表,优先级:API规范扩展字段 > 环境变量 > 默认值 self.sensitive_fields = self._get_sensitive_fields(endpoint_spec) self.logger.info(f"测试用例 {self.id} ({self.name}) 已初始化,将检查以下敏感字段是否加密: {self.sensitive_fields}") def _get_sensitive_fields(self, endpoint_spec: Dict[str, Any]) -> Set[str]: return self.DEFAULT_SENSITIVE_FIELDS """ 获取敏感字段列表,按以下优先级: 1. API规范中的x-sensitive-fields扩展字段 2. 环境变量SENSITIVE_FIELDS 3. 默认敏感字段列表DEFAULT_SENSITIVE_FIELDS Args: endpoint_spec: API端点规范 Returns: 敏感字段集合 """ custom_fields = set() # 1. 从API规范中的x-sensitive-fields扩展字段获取 if "x-sensitive-fields" in endpoint_spec: try: fields = endpoint_spec["x-sensitive-fields"] if isinstance(fields, list): custom_fields.update(fields) elif isinstance(fields, str): # 如果是JSON字符串,尝试解析 try: parsed_fields = json.loads(fields) if isinstance(parsed_fields, list): custom_fields.update(parsed_fields) elif isinstance(parsed_fields, dict) and "fields" in parsed_fields: # 支持 {"fields": ["field1", "field2"]} 格式 if isinstance(parsed_fields["fields"], list): custom_fields.update(parsed_fields["fields"]) except json.JSONDecodeError: # 如果不是JSON,假设是逗号分隔的字符串 custom_fields.update([f.strip() for f in fields.split(",")]) except Exception as e: self.logger.warning(f"解析API规范中的自定义敏感字段时出错: {e}") # 2. 如果API规范中没有定义,尝试从环境变量获取 if not custom_fields: env_fields = os.environ.get("SENSITIVE_FIELDS") if env_fields: try: # 尝试解析为JSON try: parsed_fields = json.loads(env_fields) if isinstance(parsed_fields, list): custom_fields.update(parsed_fields) except json.JSONDecodeError: # 如果不是JSON,假设是逗号分隔的字符串 custom_fields.update([f.strip() for f in env_fields.split(",")]) except Exception as e: self.logger.warning(f"解析环境变量中的自定义敏感字段时出错: {e}") # 3. 如果以上方式都没有获取到自定义字段,使用默认值 if not custom_fields: self.logger.info("未找到自定义敏感字段配置,使用默认敏感字段列表") return self.DEFAULT_SENSITIVE_FIELDS else: self.logger.info(f"使用自定义敏感字段列表: {custom_fields}") return custom_fields def is_likely_encrypted(self, value: str) -> bool: """ 判断一个字符串是否可能是加密的。 Args: value: 要检查的字符串值 Returns: 如果字符串可能是加密的,则返回True;否则返回False """ # 如果值太短,可能不是有效的加密值 if len(value) < 16: return False # 检查是否匹配任何加密模式 for pattern in self.ENCRYPTED_PATTERNS: if re.match(pattern, value): return True # 检查是否匹配任何明文模式 for pattern in self.PLAINTEXT_PATTERNS: if re.match(pattern, value): return False # 如果没有匹配任何模式,默认认为是加密的(保守策略) return True def find_sensitive_fields_in_response(self, response_data: Any, path: List[Union[str, int]] = None) -> Dict[str, List[Union[str, int]]]: """ 递归搜索响应数据中的敏感字段。 Args: response_data: 响应数据(可能是字典、列表或基本类型) path: 当前路径 Returns: 包含敏感字段路径的字典 {字段名: 路径} """ if path is None: path = [] sensitive_fields_found = {} self.logger.info(f"response_data:{response_data}") if isinstance(response_data, dict): for key, value in response_data.items(): current_path = path + [key] # 检查当前字段名是否包含任何敏感关键词,不再限制值必须是字符串类型 key_lower = key.lower() for sensitive_field in self.sensitive_fields: sensitive_field_lower = sensitive_field.lower() # 修改为包含匹配而不是严格匹配 if sensitive_field_lower in key_lower or key_lower in sensitive_field_lower: # 移除对值类型的检查,允许任何类型的敏感字段 sensitive_fields_found[key] = current_path self.logger.info(f"找到敏感字段: {key}, 匹配关键词: {sensitive_field}") break # 递归检查子字段 if isinstance(value, (dict, list)): nested_fields = self.find_sensitive_fields_in_response(value, current_path) sensitive_fields_found.update(nested_fields) elif isinstance(response_data, list): for i, item in enumerate(response_data): current_path = path + [i] if isinstance(item, (dict, list)): nested_fields = self.find_sensitive_fields_in_response(item, current_path) sensitive_fields_found.update(nested_fields) return sensitive_fields_found def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]: """ 验证响应中的敏感字段是否已加密。 Args: response_context: API响应上下文 request_context: API请求上下文 Returns: 验证结果列表 """ results = [] # 如果响应不是JSON格式,则跳过检查 if not response_context.json_content: results.append(self.passed("响应不是JSON格式,跳过敏感字段加密检查。")) return results self.logger.info(f"response_context.json_content: {response_context.json_content}") # 查找响应中的敏感字段 sensitive_fields_found = self.find_sensitive_fields_in_response(response_context.json_content) if not sensitive_fields_found: self.logger.info(f"未在响应中找到需要检查的敏感字段。") results.append(self.passed("未在响应中找到需要检查的敏感字段。")) return results # 检查每个敏感字段是否已加密 for field_name, field_path in sensitive_fields_found.items(): # 获取字段值 current_data = response_context.json_content try: for path_part in field_path: current_data = current_data[path_part] except (KeyError, IndexError, TypeError): self.logger.warning(f"无法访问路径 {field_path} 的值") continue self.logger.info(f"检查敏感字段: {field_name}, 路径: {field_path}, 值类型: {type(current_data)}, 值: {current_data}") # 根据字段值类型进行不同处理 if isinstance(current_data, str): # 字符串类型:检查是否已加密 if self.is_likely_encrypted(current_data): results.append(self.passed( f"敏感字段 '{'.'.join(map(str, field_path))}' 已正确加密。", {"field_path": field_path, "field_name": field_name} )) else: results.append(self.failed( f"敏感字段 '{'.'.join(map(str, field_path))}' 可能未加密,存在安全风险。", { "field_path": field_path, "field_name": field_name, "value_preview": current_data[:10] + "..." if len(current_data) > 10 else current_data } )) elif current_data is None: # 空值:跳过检查 results.append(self.passed( f"敏感字段 '{'.'.join(map(str, field_path))}' 的值为空,跳过加密检查。", {"field_path": field_path, "field_name": field_name} )) else: # 非字符串类型:警告可能存在安全风险 results.append(self.failed( f"敏感字段 '{'.'.join(map(str, field_path))}' 的值为非字符串类型 ({type(current_data).__name__}),可能存在安全风险。", { "field_path": field_path, "field_name": field_name, "field_type": type(current_data).__name__, "value": str(current_data) } )) return results @staticmethod def passed(message: str, details: Optional[Dict[str, Any]] = None) -> ValidationResult: """创建通过的验证结果""" return ValidationResult(passed=True, message=message, details=details) @staticmethod def failed(message: str, details: Optional[Dict[str, Any]] = None) -> ValidationResult: """创建失败的验证结果""" return ValidationResult(passed=False, message=message, details=details)