compliance/custom_testcases/security_checks.py
gongwenxin df90a5377f mvp
2025-06-16 14:49:49 +08:00

286 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Dict, Any, Optional, List, Union, Set
import re
import json
import logging
import os
from ddms_compliance_suite.test_framework_core import BaseAPITestCase, TestSeverity, ValidationResult, APIRequestContext, APIResponseContext
from ddms_compliance_suite.utils import schema_utils
class EncryptedFieldsCheck(BaseAPITestCase):
"""
检查API响应中的敏感字段是否已加密。
该测试用例允许用户定义一组需要加密的敏感字段名称,然后检查响应中这些字段是否为明文。
支持多种方式配置敏感字段列表:
1. 使用默认预定义的敏感字段集合
2. 通过API规范中的x-sensitive-fields扩展字段指定
3. 通过环境变量SENSITIVE_FIELDS指定逗号分隔的字符串
"""
# 元数据
id = "TC-SECURITY-002"
name = "敏感字段加密检查"
description = "验证API响应中的敏感字段是否已加密而非明文。"
severity = TestSeverity.HIGH
tags = ["security", "encryption", "sensitive_data"]
execution_order = 50 # 在基本状态码检查之后执行
# 默认需要检查的敏感字段名称集合
DEFAULT_SENSITIVE_FIELDS = {
"coord","location","position"
}
# 加密字符串的正则表达式模式
# 这里假设加密字符串通常是base64或hex编码的字符串
ENCRYPTED_PATTERNS = [
# Base64编码模式 (标准base64字符集长度通常是4的倍数且较长)
r'^[A-Za-z0-9+/]{20,}={0,2}$',
# Hex编码模式 (16进制字符长度通常是偶数且较长)
r'^[0-9a-fA-F]{16,}$',
# JWT Token模式 (三段base64编码用点分隔)
r'^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$',
# 通用哈希值模式 (固定长度的hex字符串如MD5, SHA1, SHA256等)
r'^[0-9a-fA-F]{32}$|^[0-9a-fA-F]{40}$|^[0-9a-fA-F]{64}$',
# 带前缀的加密字符串 (如bcrypt, PBKDF2等)
r'^\$2[ayb]\$.{56}$|^\$pbkdf2-sha\d+\$',
]
# 明文检测模式 (如果匹配这些模式,则可能是明文)
PLAINTEXT_PATTERNS = [
# 常见的明文密码模式 (字母数字特殊字符组合长度通常在6-20之间)
r'^[A-Za-z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,20}$',
# 手机号模式 (中国手机号)
r'^1[3-9]\d{9}$',
# 身份证号模式 (中国身份证号)
r'^\d{17}[\dXx]$|^\d{15}$',
# 银行卡号模式 (数字通常12-19位)
r'^\d{12,19}$',
# 邮箱模式
r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
]
def __init__(self, endpoint_spec: Dict[str, Any], global_api_spec: Dict[str, Any], json_schema_validator: Optional[Any] = None, llm_service: Optional[Any] = None):
super().__init__(endpoint_spec, global_api_spec, json_schema_validator, llm_service)
# 获取敏感字段列表优先级API规范扩展字段 > 环境变量 > 默认值
self.sensitive_fields = self._get_sensitive_fields(endpoint_spec)
self.logger.info(f"测试用例 {self.id} ({self.name}) 已初始化,将检查以下敏感字段是否加密: {self.sensitive_fields}")
def _get_sensitive_fields(self, endpoint_spec: Dict[str, Any]) -> Set[str]:
return self.DEFAULT_SENSITIVE_FIELDS
"""
获取敏感字段列表,按以下优先级:
1. API规范中的x-sensitive-fields扩展字段
2. 环境变量SENSITIVE_FIELDS
3. 默认敏感字段列表DEFAULT_SENSITIVE_FIELDS
Args:
endpoint_spec: API端点规范
Returns:
敏感字段集合
"""
custom_fields = set()
# 1. 从API规范中的x-sensitive-fields扩展字段获取
if "x-sensitive-fields" in endpoint_spec:
try:
fields = endpoint_spec["x-sensitive-fields"]
if isinstance(fields, list):
custom_fields.update(fields)
elif isinstance(fields, str):
# 如果是JSON字符串尝试解析
try:
parsed_fields = json.loads(fields)
if isinstance(parsed_fields, list):
custom_fields.update(parsed_fields)
elif isinstance(parsed_fields, dict) and "fields" in parsed_fields:
# 支持 {"fields": ["field1", "field2"]} 格式
if isinstance(parsed_fields["fields"], list):
custom_fields.update(parsed_fields["fields"])
except json.JSONDecodeError:
# 如果不是JSON假设是逗号分隔的字符串
custom_fields.update([f.strip() for f in fields.split(",")])
except Exception as e:
self.logger.warning(f"解析API规范中的自定义敏感字段时出错: {e}")
# 2. 如果API规范中没有定义尝试从环境变量获取
if not custom_fields:
env_fields = os.environ.get("SENSITIVE_FIELDS")
if env_fields:
try:
# 尝试解析为JSON
try:
parsed_fields = json.loads(env_fields)
if isinstance(parsed_fields, list):
custom_fields.update(parsed_fields)
except json.JSONDecodeError:
# 如果不是JSON假设是逗号分隔的字符串
custom_fields.update([f.strip() for f in env_fields.split(",")])
except Exception as e:
self.logger.warning(f"解析环境变量中的自定义敏感字段时出错: {e}")
# 3. 如果以上方式都没有获取到自定义字段,使用默认值
if not custom_fields:
self.logger.info("未找到自定义敏感字段配置,使用默认敏感字段列表")
return self.DEFAULT_SENSITIVE_FIELDS
else:
self.logger.info(f"使用自定义敏感字段列表: {custom_fields}")
return custom_fields
def is_likely_encrypted(self, value: str) -> bool:
"""
判断一个字符串是否可能是加密的。
Args:
value: 要检查的字符串值
Returns:
如果字符串可能是加密的则返回True否则返回False
"""
# 如果值太短,可能不是有效的加密值
if len(value) < 16:
return False
# 检查是否匹配任何加密模式
for pattern in self.ENCRYPTED_PATTERNS:
if re.match(pattern, value):
return True
# 检查是否匹配任何明文模式
for pattern in self.PLAINTEXT_PATTERNS:
if re.match(pattern, value):
return False
# 如果没有匹配任何模式,默认认为是加密的(保守策略)
return True
def find_sensitive_fields_in_response(self, response_data: Any, path: List[Union[str, int]] = None) -> Dict[str, List[Union[str, int]]]:
"""
递归搜索响应数据中的敏感字段。
Args:
response_data: 响应数据(可能是字典、列表或基本类型)
path: 当前路径
Returns:
包含敏感字段路径的字典 {字段名: 路径}
"""
if path is None:
path = []
sensitive_fields_found = {}
self.logger.info(f"response_data:{response_data}")
if isinstance(response_data, dict):
for key, value in response_data.items():
current_path = path + [key]
# 检查当前字段名是否包含任何敏感关键词,不再限制值必须是字符串类型
key_lower = key.lower()
for sensitive_field in self.sensitive_fields:
sensitive_field_lower = sensitive_field.lower()
# 修改为包含匹配而不是严格匹配
if sensitive_field_lower in key_lower or key_lower in sensitive_field_lower:
# 移除对值类型的检查,允许任何类型的敏感字段
sensitive_fields_found[key] = current_path
self.logger.info(f"找到敏感字段: {key}, 匹配关键词: {sensitive_field}")
break
# 递归检查子字段
if isinstance(value, (dict, list)):
nested_fields = self.find_sensitive_fields_in_response(value, current_path)
sensitive_fields_found.update(nested_fields)
elif isinstance(response_data, list):
for i, item in enumerate(response_data):
current_path = path + [i]
if isinstance(item, (dict, list)):
nested_fields = self.find_sensitive_fields_in_response(item, current_path)
sensitive_fields_found.update(nested_fields)
return sensitive_fields_found
def validate_response(self, response_context: APIResponseContext, request_context: APIRequestContext) -> List[ValidationResult]:
"""
验证响应中的敏感字段是否已加密。
Args:
response_context: API响应上下文
request_context: API请求上下文
Returns:
验证结果列表
"""
results = []
# 如果响应不是JSON格式则跳过检查
if not response_context.json_content:
results.append(self.passed("响应不是JSON格式跳过敏感字段加密检查。"))
return results
self.logger.info(f"response_context.json_content: {response_context.json_content}")
# 查找响应中的敏感字段
sensitive_fields_found = self.find_sensitive_fields_in_response(response_context.json_content)
if not sensitive_fields_found:
self.logger.info(f"未在响应中找到需要检查的敏感字段。")
results.append(self.passed("未在响应中找到需要检查的敏感字段。"))
return results
# 检查每个敏感字段是否已加密
for field_name, field_path in sensitive_fields_found.items():
# 获取字段值
current_data = response_context.json_content
try:
for path_part in field_path:
current_data = current_data[path_part]
except (KeyError, IndexError, TypeError):
self.logger.warning(f"无法访问路径 {field_path} 的值")
continue
self.logger.info(f"检查敏感字段: {field_name}, 路径: {field_path}, 值类型: {type(current_data)}, 值: {current_data}")
# 根据字段值类型进行不同处理
if isinstance(current_data, str):
# 字符串类型:检查是否已加密
if self.is_likely_encrypted(current_data):
results.append(self.passed(
f"敏感字段 '{'.'.join(map(str, field_path))}' 已正确加密。",
{"field_path": field_path, "field_name": field_name}
))
else:
results.append(self.failed(
f"敏感字段 '{'.'.join(map(str, field_path))}' 可能未加密,存在安全风险。",
{
"field_path": field_path,
"field_name": field_name,
"value_preview": current_data[:10] + "..." if len(current_data) > 10 else current_data
}
))
elif current_data is None:
# 空值:跳过检查
results.append(self.passed(
f"敏感字段 '{'.'.join(map(str, field_path))}' 的值为空,跳过加密检查。",
{"field_path": field_path, "field_name": field_name}
))
else:
# 非字符串类型:警告可能存在安全风险
results.append(self.failed(
f"敏感字段 '{'.'.join(map(str, field_path))}' 的值为非字符串类型 ({type(current_data).__name__}),可能存在安全风险。",
{
"field_path": field_path,
"field_name": field_name,
"field_type": type(current_data).__name__,
"value": str(current_data)
}
))
return results
@staticmethod
def passed(message: str, details: Optional[Dict[str, Any]] = None) -> ValidationResult:
"""创建通过的验证结果"""
return ValidationResult(passed=True, message=message, details=details)
@staticmethod
def failed(message: str, details: Optional[Dict[str, Any]] = None) -> ValidationResult:
"""创建失败的验证结果"""
return ValidationResult(passed=False, message=message, details=details)