This commit is contained in:
gongwenxin 2025-05-19 17:09:30 +08:00
parent 156dcdfaf9
commit 82e79b393a
12 changed files with 0 additions and 2442 deletions

BIN
.coverage Normal file

Binary file not shown.

View File

@ -1,755 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
规则执行演示脚本
本脚本演示如何使用增强后的规则库执行API测试包括性能规则安全规则RESTful设计规则和错误处理规则的执行
脚本模拟了一系列API请求和响应场景并使用规则执行引擎验证这些场景是否符合预定义的规则
"""
import time
import json
import logging
import argparse
from dataclasses import dataclass, asdict
from typing import Dict, List, Any, Optional
import requests
from urllib.parse import urlparse
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ------------------ 模拟 API 调用相关类 ------------------
@dataclass
class APIRequest:
"""API请求类用于构建和发送API请求"""
method: str
url: str
headers: Optional[Dict[str, str]] = None
params: Optional[Dict[str, Any]] = None
json_data: Optional[Any] = None
body: Optional[Any] = None
data: Optional[Dict[str, Any]] = None
@dataclass
class APIResponse:
"""API响应类用于存储API响应结果"""
status_code: int
headers: Dict[str, str]
content: str
elapsed_time: float
json_content: Optional[Dict[str, Any]] = None
request: Optional[APIRequest] = None
def __post_init__(self):
if self.content and (self.headers.get('Content-Type', '').startswith('application/json') or
self.content.strip().startswith('{')):
try:
self.json_content = json.loads(self.content)
except json.JSONDecodeError:
self.json_content = None
class APICaller:
"""API调用器用于发送API请求并获取响应"""
def call_api(self, request: APIRequest) -> APIResponse:
"""模拟调用API返回响应结果"""
logger.info(f"发送 {request.method} 请求到 {request.url}")
# 这里只是模拟实际项目中应使用真实的HTTP请求
# 根据URL创建不同的模拟响应
start_time = time.time()
# 模拟不同的API响应场景
if "users" in request.url:
response = self._mock_user_api_response(request)
elif "slow" in request.url:
# 模拟慢响应
time.sleep(0.6) # 模拟600ms响应时间
response = self._mock_slow_api_response(request)
elif "error" in request.url:
# 模拟错误响应
response = self._mock_error_api_response(request)
elif "http://" in request.url:
# 模拟HTTP响应非HTTPS
response = self._mock_http_api_response(request)
else:
# 默认响应
response = self._mock_default_api_response(request)
elapsed_time = time.time() - start_time
response.elapsed_time = elapsed_time
response.request = request
return response
def _mock_user_api_response(self, request: APIRequest) -> APIResponse:
"""模拟用户API响应"""
headers = {
'Content-Type': 'application/json',
'X-Request-ID': '12345',
'X-Rate-Limit-Remaining': '1000'
}
content = json.dumps({
"id": 123,
"name": "Test User",
"email": "user@example.com",
"created_at": "2023-01-01T00:00:00Z",
"status": "active"
})
return APIResponse(
status_code=200,
headers=headers,
content=content,
elapsed_time=0.1
)
def _mock_slow_api_response(self, request: APIRequest) -> APIResponse:
"""模拟慢响应API"""
headers = {
'Content-Type': 'application/json',
'X-Request-ID': '54321',
'X-Rate-Limit-Remaining': '999'
}
content = json.dumps({
"id": 456,
"name": "Test User",
"email": "slow@example.com",
"created_at": "2023-01-02T00:00:00Z",
"status": "active",
"data": [1, 2, 3, 4, 5] * 100 # 添加一些数据使响应大一些
})
return APIResponse(
status_code=200,
headers=headers,
content=content,
elapsed_time=0.6 # 模拟600ms响应时间
)
def _mock_error_api_response(self, request: APIRequest) -> APIResponse:
"""模拟错误响应API"""
headers = {
'Content-Type': 'application/json',
'X-Request-ID': '67890'
}
# 标准错误响应格式
content = json.dumps({
"code": "user_not_found",
"message": "User not found",
"details": {
"resource": "User",
"id": request.params.get("id", "unknown") if request.params else "unknown"
}
})
return APIResponse(
status_code=404,
headers=headers,
content=content,
elapsed_time=0.05
)
def _mock_http_api_response(self, request: APIRequest) -> APIResponse:
"""模拟HTTP非HTTPSAPI响应"""
headers = {
'Content-Type': 'application/json',
'X-Request-ID': '13579'
}
content = json.dumps({
"message": "This is an insecure HTTP response",
"timestamp": time.time()
})
return APIResponse(
status_code=200,
headers=headers,
content=content,
elapsed_time=0.08
)
def _mock_default_api_response(self, request: APIRequest) -> APIResponse:
"""模拟默认API响应"""
headers = {
'Content-Type': 'application/json',
'X-Request-ID': '24680'
}
content = json.dumps({
"message": "Default response",
"method": request.method,
"path": urlparse(request.url).path,
"timestamp": time.time()
})
return APIResponse(
status_code=200,
headers=headers,
content=content,
elapsed_time=0.03
)
# ------------------ 规则模型类 ------------------
@dataclass
class RuleResult:
"""规则执行结果类"""
rule_id: str
rule_name: str
is_valid: bool
message: str
details: Optional[Dict[str, Any]] = None
category: Optional[str] = None
severity: Optional[str] = None
class RuleCategory:
"""规则类别枚举"""
PERFORMANCE = "Performance"
SECURITY = "Security"
API_DESIGN = "APIDesign"
ERROR_HANDLING = "ErrorHandling"
GENERAL = "General"
class RuleLifecycle:
"""规则生命周期枚举"""
REQUEST_PREPARATION = "RequestPreparation"
REQUEST_EXECUTION = "RequestExecution"
RESPONSE_VALIDATION = "ResponseValidation"
POST_VALIDATION = "PostValidation"
ANY_STAGE = "AnyStage"
class RuleScope:
"""规则作用域枚举"""
REQUEST_URL = "RequestURL"
REQUEST_HEADERS = "RequestHeaders"
REQUEST_PARAMS = "RequestParams"
REQUEST_BODY = "RequestBody"
RESPONSE_STATUS = "ResponseStatus"
RESPONSE_HEADERS = "ResponseHeaders"
RESPONSE_BODY = "ResponseBody"
RESPONSE_TIME = "ResponseTime"
SECURITY = "Security"
PERFORMANCE = "Performance"
ANY_SCOPE = "AnyScope"
class TargetType:
"""规则目标类型枚举"""
API_REQUEST = "APIRequest"
API_RESPONSE = "APIResponse"
ANY = "Any"
@dataclass
class BaseRule:
"""基础规则类"""
id: str
name: str
description: str
category: str = RuleCategory.GENERAL
version: str = "1.0.0"
severity: str = "warning"
is_enabled: bool = True
tags: List[str] = None
target_type: str = TargetType.ANY
lifecycle: str = RuleLifecycle.ANY_STAGE
scope: str = RuleScope.ANY_SCOPE
def __post_init__(self):
if self.tags is None:
self.tags = []
def to_dict(self) -> Dict[str, Any]:
"""将规则转换为字典"""
return asdict(self)
def validate(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""验证逻辑,由子类实现"""
raise NotImplementedError("子类必须实现validate方法")
@dataclass
class PerformanceRule(BaseRule):
"""性能规则类"""
category: str = RuleCategory.PERFORMANCE
target_type: str = TargetType.API_RESPONSE
lifecycle: str = RuleLifecycle.RESPONSE_VALIDATION
scope: str = RuleScope.RESPONSE_TIME
threshold: float = 500.0 # 默认阈值500毫秒
metric: str = "response_time"
unit: str = "ms"
def validate(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""验证API响应时间是否超过阈值"""
response = context.get('api_response')
if not response:
return {
'is_valid': False,
'message': '缺少API响应对象',
'details': {}
}
response_time = response.elapsed_time * 1000 # 转换为毫秒
if response_time > self.threshold:
return {
'is_valid': False,
'message': f'响应时间 {response_time:.2f}ms 超过阈值 {self.threshold}ms',
'details': {
'actual_time': response_time,
'threshold': self.threshold,
'unit': self.unit
}
}
return {
'is_valid': True,
'message': f'响应时间 {response_time:.2f}ms 在阈值 {self.threshold}ms 内',
'details': {
'actual_time': response_time,
'threshold': self.threshold,
'unit': self.unit
}
}
@dataclass
class SecurityRule(BaseRule):
"""安全规则类"""
category: str = RuleCategory.SECURITY
target_type: str = TargetType.API_REQUEST
lifecycle: str = RuleLifecycle.REQUEST_PREPARATION
scope: str = RuleScope.SECURITY
check_type: str = "transport_security" # 传输安全检查
expected_value: str = "https" # 期望值为https
def validate(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""验证API请求是否使用了HTTPS协议"""
request = context.get('api_request')
if not request:
return {
'is_valid': False,
'message': '缺少API请求对象',
'details': {}
}
url = str(request.url)
if not url.startswith('https://'):
return {
'is_valid': False,
'message': 'API请求必须使用HTTPS协议',
'details': {
'current_url': url,
'expected_protocol': 'https'
}
}
return {
'is_valid': True,
'message': 'API请求使用了HTTPS协议',
'details': {
'url': url
}
}
@dataclass
class RESTfulDesignRule(BaseRule):
"""RESTful设计规则类"""
category: str = RuleCategory.API_DESIGN
target_type: str = TargetType.API_REQUEST
lifecycle: str = RuleLifecycle.REQUEST_PREPARATION
scope: str = RuleScope.REQUEST_URL
design_aspect: str = "URL设计"
pattern: str = r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
def validate(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""验证API URL是否符合RESTful设计规范"""
import re
request = context.get('api_request')
if not request:
return {
'is_valid': False,
'message': '缺少API请求对象',
'details': {}
}
url = str(request.url)
# 解析URL获取路径部分
parsed_url = urlparse(url)
path = parsed_url.path
# 使用正则表达式验证路径
if not re.match(self.pattern, path):
return {
'is_valid': False,
'message': 'API URL不符合RESTful设计规范',
'details': {
'current_path': path,
'expected_pattern': self.pattern,
'suggestion': '路径应该遵循 /api/v{version}/{资源}[/{id}] 格式'
}
}
return {
'is_valid': True,
'message': 'API URL符合RESTful设计规范',
'details': {
'path': path
}
}
@dataclass
class ErrorHandlingRule(BaseRule):
"""错误处理规则类"""
category: str = RuleCategory.ERROR_HANDLING
target_type: str = TargetType.API_RESPONSE
lifecycle: str = RuleLifecycle.RESPONSE_VALIDATION
scope: str = RuleScope.RESPONSE_BODY
error_code: str = "*" # 匹配所有错误码
expected_status: int = -1 # 不验证状态码
def validate(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""验证API错误响应是否符合标准格式"""
response = context.get('api_response')
if not response:
return {
'is_valid': False,
'message': '缺少API响应对象',
'details': {}
}
# 只检查4xx和5xx状态码的响应
if response.status_code < 400:
return {
'is_valid': True,
'message': '非错误响应,跳过验证',
'details': {
'status_code': response.status_code
}
}
# 确保响应包含JSON内容
if not response.json_content:
return {
'is_valid': False,
'message': '错误响应不是有效的JSON格式',
'details': {
'status_code': response.status_code,
'content_type': response.headers.get('Content-Type', '未知')
}
}
# 检查错误响应的必要字段
required_fields = ['code', 'message']
missing_fields = [field for field in required_fields if field not in response.json_content]
if missing_fields:
return {
'is_valid': False,
'message': '错误响应缺少必要字段',
'details': {
'missing_fields': missing_fields,
'required_fields': required_fields,
'response': response.json_content
}
}
return {
'is_valid': True,
'message': '错误响应符合标准格式',
'details': {
'status_code': response.status_code,
'error_code': response.json_content.get('code'),
'error_message': response.json_content.get('message')
}
}
# ------------------ 规则执行引擎 ------------------
class RuleExecutor:
"""规则执行引擎"""
def __init__(self, rules: List[BaseRule] = None):
self.rules = rules or []
def add_rule(self, rule: BaseRule):
"""添加规则"""
self.rules.append(rule)
def execute_rule(self, rule: BaseRule, context: Dict[str, Any]) -> RuleResult:
"""执行单个规则"""
logger.info(f"执行规则 '{rule.name}' (ID: {rule.id})")
try:
result = rule.validate(context)
# 构建规则执行结果
rule_result = RuleResult(
rule_id=rule.id,
rule_name=rule.name,
is_valid=result.get('is_valid', False),
message=result.get('message', ''),
details=result.get('details', {}),
category=rule.category,
severity=rule.severity
)
# 记录执行结果
log_level = logging.INFO if rule_result.is_valid else logging.WARNING
logger.log(log_level, f"规则 '{rule.name}' 结果: {'通过' if rule_result.is_valid else '失败'} - {rule_result.message}")
return rule_result
except Exception as e:
# 记录执行异常
logger.error(f"执行规则 '{rule.name}' 时发生异常: {str(e)}", exc_info=True)
# 返回异常结果
return RuleResult(
rule_id=rule.id,
rule_name=rule.name,
is_valid=False,
message=f"规则执行异常: {str(e)}",
details={'exception': str(e)},
category=rule.category,
severity=rule.severity
)
def execute_rules_for_lifecycle(self, lifecycle: str, context: Dict[str, Any]) -> List[RuleResult]:
"""执行特定生命周期阶段的所有规则"""
logger.info(f"执行 {lifecycle} 阶段的规则")
# 筛选符合生命周期的规则
lifecycle_rules = [rule for rule in self.rules if rule.lifecycle == lifecycle or rule.lifecycle == RuleLifecycle.ANY_STAGE]
# 执行筛选出的规则
results = []
for rule in lifecycle_rules:
result = self.execute_rule(rule, context)
results.append(result)
# 统计执行结果
passed = sum(1 for result in results if result.is_valid)
failed = len(results) - passed
logger.info(f"{lifecycle} 阶段规则执行完成: {passed} 条通过, {failed} 条失败")
return results
def execute_all_rules(self, context: Dict[str, Any]) -> List[RuleResult]:
"""执行所有规则"""
logger.info(f"执行所有规则")
results = []
for rule in self.rules:
result = self.execute_rule(rule, context)
results.append(result)
# 统计执行结果
passed = sum(1 for result in results if result.is_valid)
failed = len(results) - passed
logger.info(f"所有规则执行完成: {passed} 条通过, {failed} 条失败")
return results
# ------------------ 演示函数 ------------------
def create_demo_rules() -> List[BaseRule]:
"""创建演示用的规则"""
rules = []
# 添加性能规则
performance_rule = PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
threshold=500,
metric="response_time",
unit="ms"
)
rules.append(performance_rule)
# 添加安全规则
security_rule = SecurityRule(
id="https-only-rule",
name="HTTPS强制使用规则",
description="验证API是否只使用HTTPS协议",
severity="error",
check_type="transport_security",
expected_value="https"
)
rules.append(security_rule)
# 添加RESTful设计规则
restful_rule = RESTfulDesignRule(
id="restful-url-pattern",
name="RESTful URL设计规则",
description="验证API URL是否符合RESTful设计规范",
pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
rules.append(restful_rule)
# 添加错误处理规则
error_rule = ErrorHandlingRule(
id="standard-error-response",
name="标准错误响应格式规则",
description="验证API错误响应是否符合标准格式",
error_code="*"
)
rules.append(error_rule)
return rules
def demo_rule_execution():
"""演示规则执行过程"""
logger.info("开始规则执行演示")
# 创建规则
rules = create_demo_rules()
# 创建规则执行引擎
executor = RuleExecutor(rules)
# 创建API调用器
api_caller = APICaller()
# === 场景1: 正常响应场景 ===
logger.info("\n=== 场景1: 正常响应场景 ===")
# 创建API请求
request1 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 执行请求准备阶段的规则
context1 = {"api_request": request1}
prep_results1 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context1)
# 发送API请求
response1 = api_caller.call_api(request1)
# 执行响应验证阶段的规则
context1["api_response"] = response1
resp_results1 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context1)
# === 场景2: 慢响应场景 ===
logger.info("\n=== 场景2: 慢响应场景 ===")
# 创建API请求
request2 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/slow",
headers={"Content-Type": "application/json"}
)
# 执行请求准备阶段的规则
context2 = {"api_request": request2}
prep_results2 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context2)
# 发送API请求
response2 = api_caller.call_api(request2)
# 执行响应验证阶段的规则
context2["api_response"] = response2
resp_results2 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context2)
# === 场景3: 错误响应场景 ===
logger.info("\n=== 场景3: 错误响应场景 ===")
# 创建API请求
request3 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/error",
headers={"Content-Type": "application/json"},
params={"id": "999"}
)
# 执行请求准备阶段的规则
context3 = {"api_request": request3}
prep_results3 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context3)
# 发送API请求
response3 = api_caller.call_api(request3)
# 执行响应验证阶段的规则
context3["api_response"] = response3
resp_results3 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context3)
# === 场景4: 非HTTPS场景 ===
logger.info("\n=== 场景4: 非HTTPS场景 ===")
# 创建API请求
request4 = APIRequest(
method="GET",
url="http://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 执行请求准备阶段的规则
context4 = {"api_request": request4}
prep_results4 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context4)
# 发送API请求
response4 = api_caller.call_api(request4)
# 执行响应验证阶段的规则
context4["api_response"] = response4
resp_results4 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context4)
# === 场景5: 不符合RESTful设计的URL ===
logger.info("\n=== 场景5: 不符合RESTful设计的URL ===")
# 创建API请求
request5 = APIRequest(
method="GET",
url="https://api.example.com/getUserInfo?id=123",
headers={"Content-Type": "application/json"}
)
# 执行请求准备阶段的规则
context5 = {"api_request": request5}
prep_results5 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context5)
# 发送API请求
response5 = api_caller.call_api(request5)
# 执行响应验证阶段的规则
context5["api_response"] = response5
resp_results5 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context5)
logger.info("\n演示完成")
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='规则执行演示脚本')
parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO',
help='日志级别')
args = parser.parse_args()
# 设置日志级别
logging.getLogger().setLevel(getattr(logging, args.log_level))
# 执行演示
demo_rule_execution()
if __name__ == "__main__":
main()

View File

@ -1,339 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
规则执行演示脚本内存版本
此示例演示如何在内存中创建规则并执行不依赖文件系统存储
"""
import sys
import logging
import json
import time
import re
from pathlib import Path
from typing import Dict, Any, List
# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from ddms_compliance_suite.models.rule_models import (
BaseRule, PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule,
RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel
)
from ddms_compliance_suite.rule_executor.executor import RuleExecutor, RuleExecutionResult
from ddms_compliance_suite.api_caller.caller import APIRequest, APIResponse
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MockRuleRepository:
"""
模拟规则库用于在内存中存储规则而不是文件系统
"""
def __init__(self):
self.rules: Dict[str, BaseRule] = {}
def save_rule(self, rule: BaseRule) -> bool:
"""保存规则到内存"""
self.rules[rule.id] = rule
return True
def load_rule_by_id(self, rule_id: str, version: str = None) -> BaseRule:
"""从内存加载规则"""
return self.rules.get(rule_id)
def get_rules_by_lifecycle(self, lifecycle: RuleLifecycle, target_type: TargetType = None) -> List[BaseRule]:
"""获取指定生命周期阶段的规则"""
results = []
for rule in self.rules.values():
if rule.lifecycle == lifecycle:
if target_type is None or rule.target_type == target_type:
results.append(rule)
return results
def get_rules_for_target(self, target_type: TargetType, target_id: str) -> List[BaseRule]:
"""获取适用于特定目标的规则"""
results = []
for rule in self.rules.values():
if rule.target_type == target_type:
results.append(rule)
return results
def create_mock_rules() -> List[BaseRule]:
"""创建测试用的规则"""
rules = []
# 1. 性能规则 - 响应时间不超过500毫秒
performance_rule = PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
category=RuleCategory.PERFORMANCE,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_TIME,
threshold=500.0,
metric="response_time",
unit="ms"
)
rules.append(performance_rule)
# 2. 安全规则 - HTTPS强制使用
security_rule = SecurityRule(
id="https-only-rule",
name="HTTPS强制使用规则",
description="验证API请求是否使用了HTTPS协议",
category=RuleCategory.SECURITY,
severity=SeverityLevel.ERROR,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.SECURITY,
check_type="transport_security",
expected_value="https"
)
rules.append(security_rule)
# 3. RESTful设计规则 - URL路径格式
restful_rule = RESTfulDesignRule(
id="restful-url-pattern",
name="RESTful URL设计规则",
description="验证API URL是否符合RESTful设计规范",
category=RuleCategory.API_DESIGN,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.REQUEST_URL,
design_aspect="URL设计",
pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
rules.append(restful_rule)
# 4. 错误处理规则 - 错误响应格式
error_rule = ErrorHandlingRule(
id="standard-error-response",
name="标准错误响应格式规则",
description="验证API错误响应是否符合标准格式",
category=RuleCategory.ERROR_HANDLING,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_BODY,
error_code="*",
expected_status=400
)
rules.append(error_rule)
return rules
def log_rule_execution_results(results: List[RuleExecutionResult], phase: str):
"""记录规则执行结果"""
total_count = len(results)
passed_count = sum(1 for r in results if r.is_valid)
logger.info(f"=== {phase} 规则执行结果: {passed_count}/{total_count} 通过 ===")
for result in results:
status = "通过" if result.is_valid else "失败"
logger.info(f"规则: {result.rule_name} (ID: {result.rule_id}) - 结果: {status} - 消息: {result.message}")
def mock_api_response(status_code=200, content=None, elapsed_time=0.1):
"""创建模拟API响应"""
if content is None:
content = {"id": 123, "name": "测试用户", "status": "active"}
return APIResponse(
status_code=status_code,
headers={"Content-Type": "application/json"},
content=json.dumps(content).encode('utf-8'),
elapsed_time=elapsed_time,
json_content=content
)
def demo_rule_execution():
"""演示规则执行"""
# 创建模拟规则库和规则
repo = MockRuleRepository()
rules = create_mock_rules()
# 将规则添加到规则库
for rule in rules:
repo.save_rule(rule)
# 创建规则执行器
executor = RuleExecutor(repo)
# 演示场景1: 标准场景 - 所有规则应通过
logger.info("\n========== 场景1: 标准场景 - 所有规则应通过 ==========")
request1 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
response1 = mock_api_response()
# 创建上下文
context1 = {
'api_request': request1,
'api_response': response1,
'endpoint_id': 'GET /api/v1/users/123'
}
# 执行请求准备阶段规则
logger.info("执行请求准备阶段规则...")
request_results1 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context1
)
log_rule_execution_results(request_results1, "请求准备阶段")
# 执行响应验证阶段规则
logger.info("执行响应验证阶段规则...")
response_results1 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context1
)
log_rule_execution_results(response_results1, "响应验证阶段")
# 演示场景2: 慢响应 - 性能规则应失败
logger.info("\n========== 场景2: 慢响应 - 性能规则应失败 ==========")
request2 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 创建一个响应时间为1.5秒的慢响应
response2 = mock_api_response(elapsed_time=1.5)
# 创建上下文
context2 = {
'api_request': request2,
'api_response': response2,
'endpoint_id': 'GET /api/v1/users/123'
}
# 执行请求准备阶段规则
logger.info("执行请求准备阶段规则...")
request_results2 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context2
)
log_rule_execution_results(request_results2, "请求准备阶段")
# 执行响应验证阶段规则
logger.info("执行响应验证阶段规则...")
response_results2 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context2
)
log_rule_execution_results(response_results2, "响应验证阶段")
# 演示场景3: 错误响应 - 错误处理规则应验证
logger.info("\n========== 场景3: 错误响应 - 错误处理规则应验证 ==========")
request3 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/999",
headers={"Content-Type": "application/json"}
)
# 创建一个404错误响应
response3 = mock_api_response(
status_code=404,
content={"code": "USER_NOT_FOUND", "message": "用户不存在"}
)
# 创建上下文
context3 = {
'api_request': request3,
'api_response': response3,
'endpoint_id': 'GET /api/v1/users/999'
}
# 执行请求准备阶段规则
logger.info("执行请求准备阶段规则...")
request_results3 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context3
)
log_rule_execution_results(request_results3, "请求准备阶段")
# 执行响应验证阶段规则
logger.info("执行响应验证阶段规则...")
response_results3 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context3
)
log_rule_execution_results(response_results3, "响应验证阶段")
# 演示场景4: 非HTTPS请求 - 安全规则应失败
logger.info("\n========== 场景4: 非HTTPS请求 - 安全规则应失败 ==========")
request4 = APIRequest(
method="GET",
url="http://api.example.com/api/v1/users/123", # 使用HTTP而不是HTTPS
headers={"Content-Type": "application/json"}
)
response4 = mock_api_response()
# 创建上下文
context4 = {
'api_request': request4,
'api_response': response4,
'endpoint_id': 'GET /api/v1/users/123'
}
# 执行请求准备阶段规则
logger.info("执行请求准备阶段规则...")
request_results4 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context4
)
log_rule_execution_results(request_results4, "请求准备阶段")
# 执行响应验证阶段规则
logger.info("执行响应验证阶段规则...")
response_results4 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context4
)
log_rule_execution_results(response_results4, "响应验证阶段")
# 演示场景5: 非RESTful URL - RESTful设计规则应失败
logger.info("\n========== 场景5: 非RESTful URL - RESTful设计规则应失败 ==========")
request5 = APIRequest(
method="GET",
url="https://api.example.com/getUserInfo?id=123", # 非RESTful URL
headers={"Content-Type": "application/json"}
)
response5 = mock_api_response()
# 创建上下文
context5 = {
'api_request': request5,
'api_response': response5,
'endpoint_id': 'GET /getUserInfo'
}
# 执行请求准备阶段规则
logger.info("执行请求准备阶段规则...")
request_results5 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context5
)
log_rule_execution_results(request_results5, "请求准备阶段")
# 执行响应验证阶段规则
logger.info("执行响应验证阶段规则...")
response_results5 = executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context5
)
log_rule_execution_results(response_results5, "响应验证阶段")
if __name__ == "__main__":
demo_rule_execution()

View File

@ -1,161 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
规则库演示脚本
此示例演示如何使用规则库创建保存和加载规则
"""
import sys
import logging
import json
import shutil
import tempfile
from pathlib import Path
from typing import Dict, Any, List
# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from ddms_compliance_suite.models.rule_models import (
BaseRule, PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule,
RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel
)
from ddms_compliance_suite.models.config_models import RuleRepositoryConfig, RuleStorageConfig
from ddms_compliance_suite.rule_repository.repository import RuleRepository
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def create_test_rules() -> List[BaseRule]:
"""创建测试用的规则"""
rules = []
# 1. 性能规则 - 响应时间不超过500毫秒
performance_rule = PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
category=RuleCategory.PERFORMANCE,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_TIME,
threshold=500.0,
metric="response_time",
unit="ms"
)
rules.append(performance_rule)
# 2. 安全规则 - HTTPS强制使用
security_rule = SecurityRule(
id="https-only-rule",
name="HTTPS强制使用规则",
description="验证API请求是否使用了HTTPS协议",
category=RuleCategory.SECURITY,
severity=SeverityLevel.ERROR,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.SECURITY,
check_type="transport_security",
expected_value="https"
)
rules.append(security_rule)
# 3. RESTful设计规则 - URL路径格式
restful_rule = RESTfulDesignRule(
id="restful-url-pattern",
name="RESTful URL设计规则",
description="验证API URL是否符合RESTful设计规范",
category=RuleCategory.API_DESIGN,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.REQUEST_URL,
design_aspect="URL设计",
pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
rules.append(restful_rule)
# 4. 错误处理规则 - 错误响应格式
error_rule = ErrorHandlingRule(
id="standard-error-response",
name="标准错误响应格式规则",
description="验证API错误响应是否符合标准格式",
category=RuleCategory.ERROR_HANDLING,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_BODY,
error_code="*",
expected_status=400
)
rules.append(error_rule)
return rules
def test_rule_repository():
"""演示规则库的使用"""
# 创建临时目录用于存储规则
temp_dir = tempfile.mkdtemp(prefix="rule_repo_demo_")
logger.info(f"使用临时目录: {temp_dir}")
try:
# 创建规则库配置
config = RuleRepositoryConfig(
storage=RuleStorageConfig(
type="filesystem",
path=temp_dir
),
preload_rules=True
)
# 创建规则库
repo = RuleRepository(config)
logger.info("规则库初始化完成")
# 创建测试规则并保存到规则库
rules = create_test_rules()
for rule in rules:
saved = repo.save_rule(rule)
logger.info(f"保存规则 {rule.id} ({rule.__class__.__name__}): {'成功' if saved else '失败'}")
# 从规则库加载规则
logger.info("\n加载规则进行验证:")
for rule_id in [r.id for r in rules]:
loaded_rule = repo.get_rule(rule_id)
if loaded_rule:
logger.info(f"加载规则 {rule_id}: 成功, 类型: {loaded_rule.__class__.__name__}")
# 检查类型特定属性
if isinstance(loaded_rule, PerformanceRule):
logger.info(f" - 性能阈值: {loaded_rule.threshold} {loaded_rule.unit}")
elif isinstance(loaded_rule, SecurityRule):
logger.info(f" - 安全检查类型: {loaded_rule.check_type}, 预期值: {loaded_rule.expected_value}")
elif isinstance(loaded_rule, RESTfulDesignRule):
logger.info(f" - URL模式: {loaded_rule.pattern}")
elif isinstance(loaded_rule, ErrorHandlingRule):
logger.info(f" - 错误码: {loaded_rule.error_code}, 预期状态码: {loaded_rule.expected_status}")
else:
logger.error(f"加载规则 {rule_id}: 失败")
# 按生命周期查询规则
logger.info("\n按生命周期查询规则:")
for lifecycle in [RuleLifecycle.REQUEST_PREPARATION, RuleLifecycle.RESPONSE_VALIDATION]:
rules_for_lifecycle = repo.get_rules_by_lifecycle(lifecycle)
logger.info(f"生命周期 {lifecycle.value} 的规则数量: {len(rules_for_lifecycle)}")
for rule in rules_for_lifecycle:
logger.info(f" - {rule.id} ({rule.__class__.__name__})")
finally:
# 清理临时目录
logger.info(f"清理临时目录: {temp_dir}")
shutil.rmtree(temp_dir)
if __name__ == "__main__":
test_rule_repository()

View File

@ -1,99 +0,0 @@
id: authentication-headers-required
name: API认证头部要求规则
description: 验证API请求是否包含必要的认证头部如Authorization、X-API-Key等
category: Security
version: 1.0.0
severity: error
is_enabled: true
tags:
- security
- authentication
- headers
- authorization
target_type: APIRequest
lifecycle: RequestPreparation
scope: RequestHeaders
required_headers:
- name: Authorization
pattern: "^(Bearer|Basic|Digest) [A-Za-z0-9\\-\\._~\\+\\/]+=*$"
description: 认证令牌,格式应为"Bearer token"、"Basic base64"或"Digest value"
- name: X-API-Key
pattern: "^[A-Za-z0-9]{32,64}$"
description: API密钥应为32-64位的字母数字字符
- name: X-Request-ID
pattern: "^[A-Za-z0-9\\-]{8,36}$"
description: 请求ID用于跟踪请求应为8-36位的字母数字和连字符
check_mode: any # 可以是"all"或"any",表示是需要满足所有头部还是任一头部
code: |
def validate(context):
"""验证API请求是否包含必要的认证头部"""
request = context.get('api_request')
if not request:
return {'is_valid': False, 'message': '缺少API请求对象'}
headers = request.headers or {}
required_headers = context.get('required_headers', [])
check_mode = context.get('check_mode', 'any')
# 获取缺失的头部和无效的头部
missing_headers = []
invalid_headers = []
valid_headers = []
import re
for header in required_headers:
header_name = header.get('name')
header_pattern = header.get('pattern')
if header_name not in headers:
missing_headers.append({
'name': header_name,
'description': header.get('description', '')
})
continue
header_value = headers[header_name]
# 如果指定了模式,验证头部值是否符合模式
if header_pattern and not re.match(header_pattern, header_value):
invalid_headers.append({
'name': header_name,
'value': header_value,
'pattern': header_pattern,
'description': header.get('description', '')
})
else:
valid_headers.append(header_name)
# 根据检查模式判断是否验证通过
if check_mode == 'all':
# 需要所有头部都存在且有效
is_valid = not missing_headers and not invalid_headers
else: # 'any'
# 至少有一个有效的头部即可
is_valid = len(valid_headers) > 0
if is_valid:
message = '请求包含有效的认证头部'
if check_mode == 'any':
message += f": {', '.join(valid_headers)}"
else:
if check_mode == 'all':
if missing_headers:
message = f"请求缺少必要的认证头部: {', '.join([h['name'] for h in missing_headers])}"
else:
message = f"请求包含无效的认证头部: {', '.join([h['name'] for h in invalid_headers])}"
else: # 'any'
message = f"请求不包含任何有效的认证头部,至少需要其中之一: {', '.join([h['name'] for h in required_headers])}"
return {
'is_valid': is_valid,
'message': message,
'details': {
'check_mode': check_mode,
'valid_headers': valid_headers,
'missing_headers': missing_headers,
'invalid_headers': invalid_headers
}
}

View File

@ -1,83 +0,0 @@
id: standard-error-response
name: 标准错误响应格式规则
description: 验证API错误响应是否符合标准格式
category: ErrorHandling
version: 1.0.0
severity: warning
is_enabled: true
tags:
- error-handling
- response-format
target_type: APIResponse
lifecycle: ResponseValidation
scope: ResponseBody
error_code: "*" # 匹配所有错误码
expected_status: -1 # 不验证状态码
code: |
def validate(context):
response = context.get('api_response')
if not response:
return {'is_valid': False, 'message': '缺少API响应对象'}
# 只检查4xx和5xx状态码的响应
if response.status_code < 400:
return {'is_valid': True, 'message': '非错误响应,跳过验证'}
# 确保响应包含JSON内容
if not response.json_content:
return {
'is_valid': False,
'message': '错误响应不是有效的JSON格式',
'details': {
'status_code': response.status_code,
'content_type': response.headers.get('Content-Type', '未知')
}
}
# 检查错误响应的必要字段
required_fields = ['code', 'message']
missing_fields = [field for field in required_fields if field not in response.json_content]
if missing_fields:
return {
'is_valid': False,
'message': '错误响应缺少必要字段',
'details': {
'missing_fields': missing_fields,
'required_fields': required_fields,
'response': response.json_content
}
}
# 检查字段类型
if not isinstance(response.json_content.get('code'), (str, int)):
return {
'is_valid': False,
'message': '错误码字段类型不正确',
'details': {
'field': 'code',
'expected_type': 'string或integer',
'actual_type': type(response.json_content.get('code')).__name__
}
}
if not isinstance(response.json_content.get('message'), str):
return {
'is_valid': False,
'message': '错误消息字段类型不正确',
'details': {
'field': 'message',
'expected_type': 'string',
'actual_type': type(response.json_content.get('message')).__name__
}
}
return {
'is_valid': True,
'message': '错误响应符合标准格式',
'details': {
'status_code': response.status_code,
'error_code': response.json_content.get('code'),
'error_message': response.json_content.get('message')
}
}

View File

@ -1,45 +0,0 @@
id: response-time-threshold
name: 响应时间阈值规则
description: 验证API响应时间是否在允许的范围内
category: Performance
version: 1.0.0
severity: warning
is_enabled: true
tags:
- performance
- response-time
target_type: APIResponse
lifecycle: ResponseValidation
scope: ResponseTime
threshold: 500 # 毫秒
metric: response_time
unit: ms
code: |
def validate(context):
response = context.get('api_response')
if not response:
return {'is_valid': False, 'message': '缺少API响应对象'}
response_time = response.elapsed_time * 1000 # 转换为毫秒
threshold = context.get('threshold', 500) # 默认500毫秒
if response_time > threshold:
return {
'is_valid': False,
'message': f'响应时间 {response_time:.2f}ms 超过阈值 {threshold}ms',
'details': {
'actual_time': response_time,
'threshold': threshold,
'unit': 'ms'
}
}
return {
'is_valid': True,
'message': f'响应时间 {response_time:.2f}ms 在阈值 {threshold}ms 内',
'details': {
'actual_time': response_time,
'threshold': threshold,
'unit': 'ms'
}
}

View File

@ -1,51 +0,0 @@
id: restful-url-pattern
name: RESTful URL设计规则
description: 验证API URL是否符合RESTful设计规范
category: APIDesign
version: 1.0.0
severity: warning
is_enabled: true
tags:
- restful
- api-design
- url-pattern
target_type: APIRequest
lifecycle: RequestPreparation
scope: RequestURL
design_aspect: URL设计
pattern: "^/api/v\\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
code: |
import re
def validate(context):
request = context.get('api_request')
if not request:
return {'is_valid': False, 'message': '缺少API请求对象'}
url = str(request.url)
# 解析URL获取路径部分
from urllib.parse import urlparse
parsed_url = urlparse(url)
path = parsed_url.path
# 使用正则表达式验证路径
pattern = context.get('pattern', "^/api/v\\d+/[a-z0-9-]+(/[a-z0-9-]+)*$")
if not re.match(pattern, path):
return {
'is_valid': False,
'message': 'API URL不符合RESTful设计规范',
'details': {
'current_path': path,
'expected_pattern': pattern,
'suggestion': '路径应该遵循 /api/v{version}/{资源}[/{id}] 格式'
}
}
return {
'is_valid': True,
'message': 'API URL符合RESTful设计规范',
'details': {
'path': path
}
}

View File

@ -1,41 +0,0 @@
id: https-only-rule
name: HTTPS强制使用规则
description: 验证API是否只使用HTTPS协议确保通信安全
category: Security
version: 1.0.0
severity: error
is_enabled: true
tags:
- security
- https
- encryption
target_type: APIRequest
lifecycle: RequestPreparation
scope: Security
check_type: transport_security
expected_value: https
code: |
def validate(context):
request = context.get('api_request')
if not request:
return {'is_valid': False, 'message': '缺少API请求对象'}
url = str(request.url)
if not url.startswith('https://'):
return {
'is_valid': False,
'message': 'API请求必须使用HTTPS协议',
'details': {
'current_url': url,
'expected_protocol': 'https'
}
}
return {
'is_valid': True,
'message': 'API请求使用了HTTPS协议',
'details': {
'url': url
}
}

View File

@ -1,170 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试编排器与规则库集成示例
此示例展示如何使用集成了规则库的测试编排器来执行API测试
"""
import os
import sys
import json
import logging
from pathlib import Path
# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator
from ddms_compliance_suite.models.rule_models import (
BaseRule, PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule,
RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel
)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def create_example_rules(rule_repo):
"""创建示例规则"""
# 1. 性能规则 - 响应时间不超过500毫秒
performance_rule = PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
category=RuleCategory.PERFORMANCE,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_TIME,
threshold=500.0,
metric="response_time",
unit="ms"
)
# 2. 安全规则 - HTTPS必须使用
security_rule = SecurityRule(
id="https-only-rule",
name="HTTPS强制使用规则",
description="验证API请求是否使用了HTTPS协议",
category=RuleCategory.SECURITY,
severity=SeverityLevel.ERROR,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.SECURITY,
check_type="transport_security",
expected_value="https"
)
# 3. RESTful设计规则 - URL路径格式
restful_rule = RESTfulDesignRule(
id="restful-url-pattern",
name="RESTful URL设计规则",
description="验证API URL是否符合RESTful设计规范",
category=RuleCategory.API_DESIGN,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.REQUEST_URL,
design_aspect="URL设计",
pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
# 4. 错误处理规则 - 错误响应格式
error_rule = ErrorHandlingRule(
id="standard-error-response",
name="标准错误响应格式规则",
description="验证API错误响应是否符合标准格式",
category=RuleCategory.ERROR_HANDLING,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_BODY,
error_code="*",
expected_status=400
)
# 保存规则到规则库
rule_repo.save_rule(performance_rule)
rule_repo.save_rule(security_rule)
rule_repo.save_rule(restful_rule)
rule_repo.save_rule(error_rule)
logger.info("已创建示例规则")
def run_api_tests_with_rules(base_url, yapi_file_path, output_file=None):
"""
使用规则库运行API测试
Args:
base_url: API基础URL
yapi_file_path: YAPI定义文件路径
output_file: 输出文件路径可选
"""
# 创建临时规则目录
rules_dir = Path("./temp_rules")
rules_dir.mkdir(exist_ok=True)
try:
# 初始化测试编排器
orchestrator = APITestOrchestrator(
base_url=base_url,
rule_repo_path=str(rules_dir)
)
# 创建示例规则
create_example_rules(orchestrator.rule_repo)
# 运行API测试
logger.info(f"开始运行API测试: {yapi_file_path}")
summary = orchestrator.run_tests_from_yapi(yapi_file_path)
# 打印测试结果摘要
summary.print_summary()
# 保存测试结果
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(summary.to_json(pretty=True))
logger.info(f"测试结果已保存到: {output_file}")
return summary
finally:
# 清理临时规则目录
# 注意:如果要保留规则,可以注释掉这部分
import shutil
if rules_dir.exists():
shutil.rmtree(rules_dir)
def parse_arguments():
"""解析命令行参数"""
import argparse
parser = argparse.ArgumentParser(description='测试编排器与规则库集成示例')
parser.add_argument('--base-url', required=True, help='API基础URL')
parser.add_argument('--yapi', required=True, help='YAPI定义文件路径')
parser.add_argument('--output', help='输出文件路径')
return parser.parse_args()
def main():
"""主函数"""
args = parse_arguments()
# 运行API测试
summary = run_api_tests_with_rules(
base_url=args.base_url,
yapi_file_path=args.yapi,
output_file=args.output
)
# 返回测试结果状态码
return 1 if summary.failed > 0 or summary.error > 0 else 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,282 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
规则库和规则执行引擎示例
本示例展示如何使用规则库和规则执行引擎来验证API请求和响应
"""
import os
import sys
import json
import logging
from pathlib import Path
# 确保能够导入项目模块
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from ddms_compliance_suite.models.config_models import AppConfig, RuleRepositoryConfig
from ddms_compliance_suite.rule_repository.repository import RuleRepository
from ddms_compliance_suite.rule_executor.executor import RuleExecutor
from ddms_compliance_suite.api_caller.caller import APICaller, APIRequest, APIResponse
from ddms_compliance_suite.models.rule_models import (
RuleCategory, TargetType, RuleLifecycle, RuleScope,
PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule
)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def load_example_rules():
"""加载示例规则"""
logger.info("加载示例规则...")
# 创建规则库配置
repo_config = RuleRepositoryConfig(
storage={"type": "filesystem", "path": "./examples/rules"},
preload_rules=True
)
# 创建规则库
repo = RuleRepository(repo_config)
# 加载YAML文件中的规则
yaml_rules_loaded = len(repo.query_rules())
logger.info(f"从YAML文件加载了 {yaml_rules_loaded} 条规则")
# 创建几个示例规则对象并保存
if yaml_rules_loaded == 0:
# 性能规则示例
performance_rule = PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
category=RuleCategory.PERFORMANCE,
severity="warning",
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_TIME,
threshold=500,
metric="response_time",
unit="ms"
)
# 安全规则示例
security_rule = SecurityRule(
id="https-only",
name="仅允许HTTPS",
description="验证API请求仅使用HTTPS协议",
category=RuleCategory.SECURITY,
severity="error",
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.SECURITY,
check_type="transport_security",
expected_value="https"
)
# RESTful设计规则示例
design_rule = RESTfulDesignRule(
id="restful-url-design",
name="RESTful URL设计",
description="验证API URL是否符合RESTful设计规范",
category=RuleCategory.API_DESIGN,
severity="warning",
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.REQUEST_URL,
design_aspect="URL设计",
pattern="^/api/v\\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
# 错误处理规则示例
error_rule = ErrorHandlingRule(
id="standard-error-format",
name="标准错误格式",
description="验证API错误响应是否符合标准格式",
category=RuleCategory.ERROR_HANDLING,
severity="warning",
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_BODY,
error_code="*",
expected_status=-1
)
# 保存规则
repo.save_rule(performance_rule)
repo.save_rule(security_rule)
repo.save_rule(design_rule)
repo.save_rule(error_rule)
logger.info("已创建并保存示例规则")
return repo
def demo_rule_execution():
"""演示规则执行"""
# 加载规则库
repo = load_example_rules()
# 创建规则执行引擎
executor = RuleExecutor(repo)
# 创建API调用器
api_caller = APICaller()
# 模拟API请求
https_request = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
http_request = APIRequest(
method="GET",
url="http://api.example.com/users/123",
headers={"Content-Type": "application/json"}
)
# 模拟API响应
success_response = APIResponse(
status_code=200,
headers={"Content-Type": "application/json"},
content=b'{"id": 123, "name": "Test User"}',
json_content={"id": 123, "name": "Test User"},
elapsed_time=0.2 # 200毫秒
)
slow_response = APIResponse(
status_code=200,
headers={"Content-Type": "application/json"},
content=b'{"id": 123, "name": "Test User"}',
json_content={"id": 123, "name": "Test User"},
elapsed_time=0.8 # 800毫秒
)
error_response = APIResponse(
status_code=404,
headers={"Content-Type": "application/json"},
content=b'{"code": "NOT_FOUND", "message": "User not found"}',
json_content={"code": "NOT_FOUND", "message": "User not found"},
elapsed_time=0.1
)
bad_error_response = APIResponse(
status_code=500,
headers={"Content-Type": "application/json"},
content=b'{"error": "Internal Server Error"}',
json_content={"error": "Internal Server Error"},
elapsed_time=0.1
)
# 测试场景1HTTPS请求 + 成功响应
logger.info("\n===== 测试场景1HTTPS请求 + 成功响应 =====")
context1 = {
"api_request": https_request,
"api_response": success_response
}
# 执行请求准备阶段的规则
logger.info("执行请求准备阶段的规则...")
prep_results = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context1)
for result in prep_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 执行响应验证阶段的规则
logger.info("执行响应验证阶段的规则...")
resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context1)
for result in resp_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试场景2HTTP请求非HTTPS+ 成功响应
logger.info("\n===== 测试场景2HTTP请求非HTTPS+ 成功响应 =====")
context2 = {
"api_request": http_request,
"api_response": success_response
}
# 执行请求准备阶段的规则
logger.info("执行请求准备阶段的规则...")
prep_results = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context2)
for result in prep_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试场景3HTTPS请求 + 慢响应
logger.info("\n===== 测试场景3HTTPS请求 + 慢响应 =====")
context3 = {
"api_request": https_request,
"api_response": slow_response
}
# 执行响应验证阶段的规则
logger.info("执行响应验证阶段的规则...")
resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context3)
for result in resp_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试场景4HTTPS请求 + 错误响应
logger.info("\n===== 测试场景4HTTPS请求 + 错误响应 =====")
context4 = {
"api_request": https_request,
"api_response": error_response
}
# 执行响应验证阶段的规则
logger.info("执行响应验证阶段的规则...")
resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context4)
for result in resp_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试场景5HTTPS请求 + 格式不正确的错误响应
logger.info("\n===== 测试场景5HTTPS请求 + 格式不正确的错误响应 =====")
context5 = {
"api_request": https_request,
"api_response": bad_error_response
}
# 执行响应验证阶段的规则
logger.info("执行响应验证阶段的规则...")
resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context5)
for result in resp_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试YAML规则
logger.info("\n===== 测试YAML规则 =====")
# 获取所有规则
all_rules = repo.query_rules()
yaml_rules = [rule for rule in all_rules if hasattr(rule, 'code') and rule.code]
if yaml_rules:
logger.info(f"发现 {len(yaml_rules)} 条YAML规则执行测试...")
for rule in yaml_rules:
logger.info(f"测试YAML规则: {rule.name}")
# 选择合适的上下文
if rule.target_type == TargetType.API_REQUEST:
context = {"api_request": https_request}
elif rule.target_type == TargetType.API_RESPONSE:
if rule.category == RuleCategory.PERFORMANCE:
context = {"api_response": slow_response}
elif rule.category == RuleCategory.ERROR_HANDLING:
context = {"api_response": error_response}
else:
context = {"api_response": success_response}
else:
context = {}
# 执行规则
result = executor.execute_rule(rule, context)
logger.info(f"结果: {'通过' if result.is_valid else '失败'} - {result.message}")
if result.details:
logger.info(f"详情: {json.dumps(result.details, ensure_ascii=False, indent=2)}")
else:
logger.info("未发现YAML规则跳过测试")
if __name__ == "__main__":
# 执行示例
demo_rule_execution()

View File

@ -1,416 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试编排器与规则库集成演示
此示例展示如何使用测试编排器和规则库执行API测试
"""
import os
import sys
import logging
import argparse
import json
import re
from pathlib import Path
# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator
from ddms_compliance_suite.models.rule_models import (
RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel
)
from ddms_compliance_suite.input_parser.parser import YAPIEndpoint
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def create_performance_rule():
"""创建性能规则"""
from ddms_compliance_suite.models.rule_models import PerformanceRule
return PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
category=RuleCategory.PERFORMANCE,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_TIME,
threshold=500.0,
metric="response_time",
unit="ms"
)
def create_security_rule():
"""创建安全规则"""
from ddms_compliance_suite.models.rule_models import SecurityRule
return SecurityRule(
id="https-only-rule",
name="HTTPS强制使用规则",
description="验证API请求是否使用了HTTPS协议",
category=RuleCategory.SECURITY,
severity=SeverityLevel.ERROR,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.SECURITY,
check_type="transport_security",
expected_value="https"
)
def create_restful_rule():
"""创建RESTful设计规则"""
from ddms_compliance_suite.models.rule_models import RESTfulDesignRule
return RESTfulDesignRule(
id="restful-url-pattern",
name="RESTful URL设计规则",
description="验证API URL是否符合RESTful设计规范",
category=RuleCategory.API_DESIGN,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.REQUEST_URL,
design_aspect="URL设计",
pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
def create_error_handling_rule():
"""创建错误处理规则"""
from ddms_compliance_suite.models.rule_models import ErrorHandlingRule
return ErrorHandlingRule(
id="standard-error-response",
name="标准错误响应格式规则",
description="验证API错误响应是否符合标准格式",
category=RuleCategory.ERROR_HANDLING,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_BODY,
error_code="*",
expected_status=400
)
def create_rules(orchestrator):
"""创建示例规则"""
logger.info("创建示例规则...")
# 使用工厂函数创建规则实例
performance_rule = create_performance_rule()
security_rule = create_security_rule()
restful_rule = create_restful_rule()
error_rule = create_error_handling_rule()
# 保存规则到规则库
orchestrator.rule_repo.save_rule(performance_rule)
orchestrator.rule_repo.save_rule(security_rule)
orchestrator.rule_repo.save_rule(restful_rule)
orchestrator.rule_repo.save_rule(error_rule)
logger.info("已创建示例规则")
return [performance_rule, security_rule, restful_rule, error_rule]
def demo_different_scenarios(orchestrator, rules):
"""演示不同测试场景标准场景、非HTTPS场景、非RESTful URL场景等"""
logger.info("开始演示不同测试场景...")
# 创建测试摘要
from ddms_compliance_suite.test_orchestrator import TestSummary
summary = TestSummary()
# 创建模拟API响应函数
def mock_api_response(status_code=200, content=None, elapsed_time=0.1):
from ddms_compliance_suite.api_caller.caller import APIResponse
if content is None:
content = {"id": 123, "name": "测试用户", "status": "active"}
return APIResponse(
status_code=status_code,
headers={"Content-Type": "application/json"},
content=json.dumps(content).encode('utf-8'),
elapsed_time=elapsed_time,
json_content=content
)
# 场景1标准响应所有规则应通过
logger.info("\n场景1: 标准响应 - 所有规则应通过")
endpoint1 = YAPIEndpoint(
path="/api/v1/users/123",
method="GET",
title="获取用户信息",
description="获取指定ID的用户信息"
)
# 手动构建API请求而不是通过orchestrator._build_api_request
from ddms_compliance_suite.api_caller.caller import APIRequest
request1 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 模拟API响应
response1 = mock_api_response()
# 创建测试上下文
context1 = {
'api_request': request1,
'api_response': response1,
'endpoint': endpoint1,
'endpoint_id': f"{endpoint1.method} {endpoint1.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context1
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context1
)
log_results(response_rule_results, "响应验证阶段")
# 场景2慢响应性能规则应失败
logger.info("\n场景2: 慢响应 - 性能规则应失败")
endpoint2 = YAPIEndpoint(
path="/api/v1/users/123",
method="GET",
title="获取用户信息",
description="获取指定ID的用户信息"
)
request2 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 模拟慢响应超过500毫秒
response2 = mock_api_response(elapsed_time=1.5)
# 创建测试上下文
context2 = {
'api_request': request2,
'api_response': response2,
'endpoint': endpoint2,
'endpoint_id': f"{endpoint2.method} {endpoint2.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context2
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context2
)
log_results(response_rule_results, "响应验证阶段")
# 场景3错误响应错误处理规则应通过
logger.info("\n场景3: 错误响应 - 错误处理规则应验证")
endpoint3 = YAPIEndpoint(
path="/api/v1/users/999",
method="GET",
title="获取不存在的用户信息",
description="获取不存在的用户信息应返回404"
)
request3 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/999",
headers={"Content-Type": "application/json"}
)
# 模拟错误响应
response3 = mock_api_response(
status_code=404,
content={"code": "USER_NOT_FOUND", "message": "用户不存在"}
)
# 创建测试上下文
context3 = {
'api_request': request3,
'api_response': response3,
'endpoint': endpoint3,
'endpoint_id': f"{endpoint3.method} {endpoint3.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context3
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context3
)
log_results(response_rule_results, "响应验证阶段")
# 场景4非HTTPS场景安全规则应失败
logger.info("\n场景4: 非HTTPS请求 - 安全规则应失败")
endpoint4 = YAPIEndpoint(
path="/api/v1/users/123",
method="GET",
title="获取用户信息",
description="获取指定ID的用户信息"
)
# 注意这里使用HTTP而不是HTTPS
request4 = APIRequest(
method="GET",
url="http://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 模拟响应
response4 = mock_api_response()
# 创建测试上下文
context4 = {
'api_request': request4,
'api_response': response4,
'endpoint': endpoint4,
'endpoint_id': f"{endpoint4.method} {endpoint4.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context4
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context4
)
log_results(response_rule_results, "响应验证阶段")
# 场景5非RESTful URL场景RESTful设计规则应失败
logger.info("\n场景5: 非RESTful URL请求 - RESTful设计规则应失败")
endpoint5 = YAPIEndpoint(
path="/getUserInfo",
method="GET",
title="获取用户信息",
description="获取指定ID的用户信息"
)
request5 = APIRequest(
method="GET",
url="https://api.example.com/getUserInfo?id=123",
headers={"Content-Type": "application/json"}
)
# 模拟响应
response5 = mock_api_response()
# 创建测试上下文
context5 = {
'api_request': request5,
'api_response': response5,
'endpoint': endpoint5,
'endpoint_id': f"{endpoint5.method} {endpoint5.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context5
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context5
)
log_results(response_rule_results, "响应验证阶段")
logger.info("\n演示完成")
def log_results(results, phase_name):
"""打印规则执行结果"""
total_rules = len(results)
passed_rules = sum(1 for r in results if r.is_valid)
logger.info(f"{phase_name}规则执行结果: {passed_rules}/{total_rules} 通过")
for result in results:
status = "通过" if result.is_valid else "失败"
logger.info(f"规则: {result.rule_name} (ID: {result.rule_id}) - 结果: {status} - 消息: {result.message}")
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='测试编排器与规则库集成演示')
parser.add_argument('--rules-path', default='./temp_rules',
help='规则库路径,默认为./temp_rules')
args = parser.parse_args()
# 创建临时规则目录
rules_dir = Path(args.rules_path)
rules_dir.mkdir(exist_ok=True, parents=True)
try:
# 初始化测试编排器
orchestrator = APITestOrchestrator(
base_url="https://api.example.com",
rule_repo_path=str(rules_dir)
)
# 创建示例规则
rules = create_rules(orchestrator)
# 演示不同场景
demo_different_scenarios(orchestrator, rules)
return 0
except Exception as e:
logger.error(f"演示过程中发生错误: {e}", exc_info=True)
return 1
finally:
# 清理规则目录
import shutil
if rules_dir.exists() and str(rules_dir).startswith('./temp'):
logger.info(f"清理临时规则目录: {rules_dir}")
shutil.rmtree(rules_dir)
if __name__ == "__main__":
sys.exit(main())