286 lines
13 KiB
Python
286 lines
13 KiB
Python
from typing import Dict, Any, Optional, List, Union, Set
|
||
import re
|
||
import json
|
||
import logging
|
||
import os
|
||
from ddms_compliance_suite.test_framework_core import BaseAPITestCase, TestSeverity, ValidationResult, APIRequestContext, APIResponseContext
|
||
from ddms_compliance_suite.utils import schema_utils
|
||
|
||
class EncryptedFieldsCheck(BaseAPITestCase):
|
||
"""
|
||
检查API响应中的敏感字段是否已加密。
|
||
该测试用例允许用户定义一组需要加密的敏感字段名称,然后检查响应中这些字段是否为明文。
|
||
|
||
支持多种方式配置敏感字段列表:
|
||
1. 使用默认预定义的敏感字段集合
|
||
2. 通过API规范中的x-sensitive-fields扩展字段指定
|
||
3. 通过环境变量SENSITIVE_FIELDS指定(逗号分隔的字符串)
|
||
"""
|
||
# 元数据
|
||
id = "TC-SECURITY-002"
|
||
name = "敏感字段加密检查"
|
||
description = "验证API响应中的敏感字段是否已加密,而非明文。"
|
||
severity = TestSeverity.HIGH
|
||
tags = ["security", "encryption", "sensitive_data"]
|
||
execution_order = 50 # 在基本状态码检查之后执行
|
||
|
||
# 默认需要检查的敏感字段名称集合
|
||
DEFAULT_SENSITIVE_FIELDS = {
|
||
"coord","location","position"
|
||
}
|
||
|
||
# 加密字符串的正则表达式模式
|
||
# 这里假设加密字符串通常是base64或hex编码的字符串
|
||
ENCRYPTED_PATTERNS = [
|
||
# Base64编码模式 (标准base64字符集,长度通常是4的倍数且较长)
|
||
r'^[A-Za-z0-9+/]{20,}={0,2}$',
|
||
# Hex编码模式 (16进制字符,长度通常是偶数且较长)
|
||
r'^[0-9a-fA-F]{16,}$',
|
||
# JWT Token模式 (三段base64编码,用点分隔)
|
||
r'^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$',
|
||
# 通用哈希值模式 (固定长度的hex字符串,如MD5, SHA1, SHA256等)
|
||
r'^[0-9a-fA-F]{32}$|^[0-9a-fA-F]{40}$|^[0-9a-fA-F]{64}$',
|
||
# 带前缀的加密字符串 (如bcrypt, PBKDF2等)
|
||
r'^\$2[ayb]\$.{56}$|^\$pbkdf2-sha\d+\$',
|
||
]
|
||
|
||
# 明文检测模式 (如果匹配这些模式,则可能是明文)
|
||
PLAINTEXT_PATTERNS = [
|
||
# 常见的明文密码模式 (字母数字特殊字符组合,长度通常在6-20之间)
|
||
r'^[A-Za-z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,20}$',
|
||
# 手机号模式 (中国手机号)
|
||
r'^1[3-9]\d{9}$',
|
||
# 身份证号模式 (中国身份证号)
|
||
r'^\d{17}[\dXx]$|^\d{15}$',
|
||
# 银行卡号模式 (数字,通常12-19位)
|
||
r'^\d{12,19}$',
|
||
# 邮箱模式
|
||
r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
|
||
]
|
||
|
||
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
|
||
super().__init__(endpoint_spec, global_api_spec, json_schema_validator, llm_service)
|
||
|
||
# 获取敏感字段列表,优先级:API规范扩展字段 > 环境变量 > 默认值
|
||
self.sensitive_fields = self._get_sensitive_fields(endpoint_spec)
|
||
self.logger.info(f"测试用例 {self.id} ({self.name}) 已初始化,将检查以下敏感字段是否加密: {self.sensitive_fields}")
|
||
|
||
def _get_sensitive_fields(self, endpoint_spec: Dict[str, Any]) -> Set[str]:
|
||
return self.DEFAULT_SENSITIVE_FIELDS
|
||
"""
|
||
获取敏感字段列表,按以下优先级:
|
||
1. API规范中的x-sensitive-fields扩展字段
|
||
2. 环境变量SENSITIVE_FIELDS
|
||
3. 默认敏感字段列表DEFAULT_SENSITIVE_FIELDS
|
||
|
||
Args:
|
||
endpoint_spec: API端点规范
|
||
|
||
Returns:
|
||
敏感字段集合
|
||
"""
|
||
custom_fields = set()
|
||
|
||
# 1. 从API规范中的x-sensitive-fields扩展字段获取
|
||
if "x-sensitive-fields" in endpoint_spec:
|
||
try:
|
||
fields = endpoint_spec["x-sensitive-fields"]
|
||
if isinstance(fields, list):
|
||
custom_fields.update(fields)
|
||
elif isinstance(fields, str):
|
||
# 如果是JSON字符串,尝试解析
|
||
try:
|
||
parsed_fields = json.loads(fields)
|
||
if isinstance(parsed_fields, list):
|
||
custom_fields.update(parsed_fields)
|
||
elif isinstance(parsed_fields, dict) and "fields" in parsed_fields:
|
||
# 支持 {"fields": ["field1", "field2"]} 格式
|
||
if isinstance(parsed_fields["fields"], list):
|
||
custom_fields.update(parsed_fields["fields"])
|
||
except json.JSONDecodeError:
|
||
# 如果不是JSON,假设是逗号分隔的字符串
|
||
custom_fields.update([f.strip() for f in fields.split(",")])
|
||
except Exception as e:
|
||
self.logger.warning(f"解析API规范中的自定义敏感字段时出错: {e}")
|
||
|
||
# 2. 如果API规范中没有定义,尝试从环境变量获取
|
||
if not custom_fields:
|
||
env_fields = os.environ.get("SENSITIVE_FIELDS")
|
||
if env_fields:
|
||
try:
|
||
# 尝试解析为JSON
|
||
try:
|
||
parsed_fields = json.loads(env_fields)
|
||
if isinstance(parsed_fields, list):
|
||
custom_fields.update(parsed_fields)
|
||
except json.JSONDecodeError:
|
||
# 如果不是JSON,假设是逗号分隔的字符串
|
||
custom_fields.update([f.strip() for f in env_fields.split(",")])
|
||
except Exception as e:
|
||
self.logger.warning(f"解析环境变量中的自定义敏感字段时出错: {e}")
|
||
|
||
# 3. 如果以上方式都没有获取到自定义字段,使用默认值
|
||
if not custom_fields:
|
||
self.logger.info("未找到自定义敏感字段配置,使用默认敏感字段列表")
|
||
return self.DEFAULT_SENSITIVE_FIELDS
|
||
else:
|
||
self.logger.info(f"使用自定义敏感字段列表: {custom_fields}")
|
||
return custom_fields
|
||
|
||
def is_likely_encrypted(self, value: str) -> bool:
|
||
"""
|
||
判断一个字符串是否可能是加密的。
|
||
|
||
Args:
|
||
value: 要检查的字符串值
|
||
|
||
Returns:
|
||
如果字符串可能是加密的,则返回True;否则返回False
|
||
"""
|
||
# 如果值太短,可能不是有效的加密值
|
||
if len(value) < 16:
|
||
return False
|
||
|
||
# 检查是否匹配任何加密模式
|
||
for pattern in self.ENCRYPTED_PATTERNS:
|
||
if re.match(pattern, value):
|
||
return True
|
||
|
||
# 检查是否匹配任何明文模式
|
||
for pattern in self.PLAINTEXT_PATTERNS:
|
||
if re.match(pattern, value):
|
||
return False
|
||
|
||
# 如果没有匹配任何模式,默认认为是加密的(保守策略)
|
||
return True
|
||
|
||
def find_sensitive_fields_in_response(self, response_data: Any, path: List[Union[str, int]] = None) -> Dict[str, List[Union[str, int]]]:
|
||
"""
|
||
递归搜索响应数据中的敏感字段。
|
||
|
||
Args:
|
||
response_data: 响应数据(可能是字典、列表或基本类型)
|
||
path: 当前路径
|
||
|
||
Returns:
|
||
包含敏感字段路径的字典 {字段名: 路径}
|
||
"""
|
||
if path is None:
|
||
path = []
|
||
|
||
sensitive_fields_found = {}
|
||
self.logger.info(f"response_data:{response_data}")
|
||
if isinstance(response_data, dict):
|
||
for key, value in response_data.items():
|
||
current_path = path + [key]
|
||
|
||
# 检查当前字段名是否包含任何敏感关键词,不再限制值必须是字符串类型
|
||
key_lower = key.lower()
|
||
for sensitive_field in self.sensitive_fields:
|
||
sensitive_field_lower = sensitive_field.lower()
|
||
# 修改为包含匹配而不是严格匹配
|
||
if sensitive_field_lower in key_lower or key_lower in sensitive_field_lower:
|
||
# 移除对值类型的检查,允许任何类型的敏感字段
|
||
sensitive_fields_found[key] = current_path
|
||
self.logger.info(f"找到敏感字段: {key}, 匹配关键词: {sensitive_field}")
|
||
break
|
||
|
||
# 递归检查子字段
|
||
if isinstance(value, (dict, list)):
|
||
nested_fields = self.find_sensitive_fields_in_response(value, current_path)
|
||
sensitive_fields_found.update(nested_fields)
|
||
|
||
elif isinstance(response_data, list):
|
||
for i, item in enumerate(response_data):
|
||
current_path = path + [i]
|
||
if isinstance(item, (dict, list)):
|
||
nested_fields = self.find_sensitive_fields_in_response(item, current_path)
|
||
sensitive_fields_found.update(nested_fields)
|
||
|
||
return sensitive_fields_found
|
||
|
||
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
|
||
"""
|
||
验证响应中的敏感字段是否已加密。
|
||
|
||
Args:
|
||
response_context: API响应上下文
|
||
request_context: API请求上下文
|
||
|
||
Returns:
|
||
验证结果列表
|
||
"""
|
||
results = []
|
||
|
||
# 如果响应不是JSON格式,则跳过检查
|
||
if not response_context.json_content:
|
||
results.append(self.passed("响应不是JSON格式,跳过敏感字段加密检查。"))
|
||
return results
|
||
self.logger.info(f"response_context.json_content: {response_context.json_content}")
|
||
# 查找响应中的敏感字段
|
||
sensitive_fields_found = self.find_sensitive_fields_in_response(response_context.json_content)
|
||
|
||
if not sensitive_fields_found:
|
||
self.logger.info(f"未在响应中找到需要检查的敏感字段。")
|
||
results.append(self.passed("未在响应中找到需要检查的敏感字段。"))
|
||
return results
|
||
|
||
# 检查每个敏感字段是否已加密
|
||
for field_name, field_path in sensitive_fields_found.items():
|
||
# 获取字段值
|
||
current_data = response_context.json_content
|
||
try:
|
||
for path_part in field_path:
|
||
current_data = current_data[path_part]
|
||
except (KeyError, IndexError, TypeError):
|
||
self.logger.warning(f"无法访问路径 {field_path} 的值")
|
||
continue
|
||
|
||
self.logger.info(f"检查敏感字段: {field_name}, 路径: {field_path}, 值类型: {type(current_data)}, 值: {current_data}")
|
||
|
||
# 根据字段值类型进行不同处理
|
||
if isinstance(current_data, str):
|
||
# 字符串类型:检查是否已加密
|
||
if self.is_likely_encrypted(current_data):
|
||
results.append(self.passed(
|
||
f"敏感字段 '{'.'.join(map(str, field_path))}' 已正确加密。",
|
||
{"field_path": field_path, "field_name": field_name}
|
||
))
|
||
else:
|
||
results.append(self.failed(
|
||
f"敏感字段 '{'.'.join(map(str, field_path))}' 可能未加密,存在安全风险。",
|
||
{
|
||
"field_path": field_path,
|
||
"field_name": field_name,
|
||
"value_preview": current_data[:10] + "..." if len(current_data) > 10 else current_data
|
||
}
|
||
))
|
||
elif current_data is None:
|
||
# 空值:跳过检查
|
||
results.append(self.passed(
|
||
f"敏感字段 '{'.'.join(map(str, field_path))}' 的值为空,跳过加密检查。",
|
||
{"field_path": field_path, "field_name": field_name}
|
||
))
|
||
else:
|
||
# 非字符串类型:警告可能存在安全风险
|
||
results.append(self.failed(
|
||
f"敏感字段 '{'.'.join(map(str, field_path))}' 的值为非字符串类型 ({type(current_data).__name__}),可能存在安全风险。",
|
||
{
|
||
"field_path": field_path,
|
||
"field_name": field_name,
|
||
"field_type": type(current_data).__name__,
|
||
"value": str(current_data)
|
||
}
|
||
))
|
||
|
||
return results
|
||
|
||
@staticmethod
|
||
def passed(message: str, details: Optional[Dict[str, Any]] = None) -> ValidationResult:
|
||
"""创建通过的验证结果"""
|
||
return ValidationResult(passed=True, message=message, details=details)
|
||
|
||
@staticmethod
|
||
def failed(message: str, details: Optional[Dict[str, Any]] = None) -> ValidationResult:
|
||
"""创建失败的验证结果"""
|
||
return ValidationResult(passed=False, message=message, details=details) |