compliance/examples/test_with_rules_demo.py
2025-05-16 15:18:02 +08:00

416 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试编排器与规则库集成演示
此示例展示如何使用测试编排器和规则库执行API测试。
"""
import os
import sys
import logging
import argparse
import json
import re
from pathlib import Path
# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator
from ddms_compliance_suite.models.rule_models import (
RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel
)
from ddms_compliance_suite.input_parser.parser import YAPIEndpoint
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def create_performance_rule():
"""创建性能规则"""
from ddms_compliance_suite.models.rule_models import PerformanceRule
return PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
category=RuleCategory.PERFORMANCE,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_TIME,
threshold=500.0,
metric="response_time",
unit="ms"
)
def create_security_rule():
"""创建安全规则"""
from ddms_compliance_suite.models.rule_models import SecurityRule
return SecurityRule(
id="https-only-rule",
name="HTTPS强制使用规则",
description="验证API请求是否使用了HTTPS协议",
category=RuleCategory.SECURITY,
severity=SeverityLevel.ERROR,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.SECURITY,
check_type="transport_security",
expected_value="https"
)
def create_restful_rule():
"""创建RESTful设计规则"""
from ddms_compliance_suite.models.rule_models import RESTfulDesignRule
return RESTfulDesignRule(
id="restful-url-pattern",
name="RESTful URL设计规则",
description="验证API URL是否符合RESTful设计规范",
category=RuleCategory.API_DESIGN,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.REQUEST_URL,
design_aspect="URL设计",
pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
def create_error_handling_rule():
"""创建错误处理规则"""
from ddms_compliance_suite.models.rule_models import ErrorHandlingRule
return ErrorHandlingRule(
id="standard-error-response",
name="标准错误响应格式规则",
description="验证API错误响应是否符合标准格式",
category=RuleCategory.ERROR_HANDLING,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_BODY,
error_code="*",
expected_status=400
)
def create_rules(orchestrator):
"""创建示例规则"""
logger.info("创建示例规则...")
# 使用工厂函数创建规则实例
performance_rule = create_performance_rule()
security_rule = create_security_rule()
restful_rule = create_restful_rule()
error_rule = create_error_handling_rule()
# 保存规则到规则库
orchestrator.rule_repo.save_rule(performance_rule)
orchestrator.rule_repo.save_rule(security_rule)
orchestrator.rule_repo.save_rule(restful_rule)
orchestrator.rule_repo.save_rule(error_rule)
logger.info("已创建示例规则")
return [performance_rule, security_rule, restful_rule, error_rule]
def demo_different_scenarios(orchestrator, rules):
"""演示不同测试场景标准场景、非HTTPS场景、非RESTful URL场景等"""
logger.info("开始演示不同测试场景...")
# 创建测试摘要
from ddms_compliance_suite.test_orchestrator import TestSummary
summary = TestSummary()
# 创建模拟API响应函数
def mock_api_response(status_code=200, content=None, elapsed_time=0.1):
from ddms_compliance_suite.api_caller.caller import APIResponse
if content is None:
content = {"id": 123, "name": "测试用户", "status": "active"}
return APIResponse(
status_code=status_code,
headers={"Content-Type": "application/json"},
content=json.dumps(content).encode('utf-8'),
elapsed_time=elapsed_time,
json_content=content
)
# 场景1标准响应所有规则应通过
logger.info("\n场景1: 标准响应 - 所有规则应通过")
endpoint1 = YAPIEndpoint(
path="/api/v1/users/123",
method="GET",
title="获取用户信息",
description="获取指定ID的用户信息"
)
# 手动构建API请求而不是通过orchestrator._build_api_request
from ddms_compliance_suite.api_caller.caller import APIRequest
request1 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 模拟API响应
response1 = mock_api_response()
# 创建测试上下文
context1 = {
'api_request': request1,
'api_response': response1,
'endpoint': endpoint1,
'endpoint_id': f"{endpoint1.method} {endpoint1.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context1
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context1
)
log_results(response_rule_results, "响应验证阶段")
# 场景2慢响应性能规则应失败
logger.info("\n场景2: 慢响应 - 性能规则应失败")
endpoint2 = YAPIEndpoint(
path="/api/v1/users/123",
method="GET",
title="获取用户信息",
description="获取指定ID的用户信息"
)
request2 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 模拟慢响应超过500毫秒
response2 = mock_api_response(elapsed_time=1.5)
# 创建测试上下文
context2 = {
'api_request': request2,
'api_response': response2,
'endpoint': endpoint2,
'endpoint_id': f"{endpoint2.method} {endpoint2.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context2
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context2
)
log_results(response_rule_results, "响应验证阶段")
# 场景3错误响应错误处理规则应通过
logger.info("\n场景3: 错误响应 - 错误处理规则应验证")
endpoint3 = YAPIEndpoint(
path="/api/v1/users/999",
method="GET",
title="获取不存在的用户信息",
description="获取不存在的用户信息应返回404"
)
request3 = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/999",
headers={"Content-Type": "application/json"}
)
# 模拟错误响应
response3 = mock_api_response(
status_code=404,
content={"code": "USER_NOT_FOUND", "message": "用户不存在"}
)
# 创建测试上下文
context3 = {
'api_request': request3,
'api_response': response3,
'endpoint': endpoint3,
'endpoint_id': f"{endpoint3.method} {endpoint3.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context3
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context3
)
log_results(response_rule_results, "响应验证阶段")
# 场景4非HTTPS场景安全规则应失败
logger.info("\n场景4: 非HTTPS请求 - 安全规则应失败")
endpoint4 = YAPIEndpoint(
path="/api/v1/users/123",
method="GET",
title="获取用户信息",
description="获取指定ID的用户信息"
)
# 注意这里使用HTTP而不是HTTPS
request4 = APIRequest(
method="GET",
url="http://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
# 模拟响应
response4 = mock_api_response()
# 创建测试上下文
context4 = {
'api_request': request4,
'api_response': response4,
'endpoint': endpoint4,
'endpoint_id': f"{endpoint4.method} {endpoint4.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context4
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context4
)
log_results(response_rule_results, "响应验证阶段")
# 场景5非RESTful URL场景RESTful设计规则应失败
logger.info("\n场景5: 非RESTful URL请求 - RESTful设计规则应失败")
endpoint5 = YAPIEndpoint(
path="/getUserInfo",
method="GET",
title="获取用户信息",
description="获取指定ID的用户信息"
)
request5 = APIRequest(
method="GET",
url="https://api.example.com/getUserInfo?id=123",
headers={"Content-Type": "application/json"}
)
# 模拟响应
response5 = mock_api_response()
# 创建测试上下文
context5 = {
'api_request': request5,
'api_response': response5,
'endpoint': endpoint5,
'endpoint_id': f"{endpoint5.method} {endpoint5.path}"
}
# 执行规则验证
logger.info("执行请求准备阶段规则...")
request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
context=context5
)
log_results(request_rule_results, "请求准备阶段")
logger.info("执行响应验证阶段规则...")
response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle(
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
context=context5
)
log_results(response_rule_results, "响应验证阶段")
logger.info("\n演示完成")
def log_results(results, phase_name):
"""打印规则执行结果"""
total_rules = len(results)
passed_rules = sum(1 for r in results if r.is_valid)
logger.info(f"{phase_name}规则执行结果: {passed_rules}/{total_rules} 通过")
for result in results:
status = "通过" if result.is_valid else "失败"
logger.info(f"规则: {result.rule_name} (ID: {result.rule_id}) - 结果: {status} - 消息: {result.message}")
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='测试编排器与规则库集成演示')
parser.add_argument('--rules-path', default='./temp_rules',
help='规则库路径,默认为./temp_rules')
args = parser.parse_args()
# 创建临时规则目录
rules_dir = Path(args.rules_path)
rules_dir.mkdir(exist_ok=True, parents=True)
try:
# 初始化测试编排器
orchestrator = APITestOrchestrator(
base_url="https://api.example.com",
rule_repo_path=str(rules_dir)
)
# 创建示例规则
rules = create_rules(orchestrator)
# 演示不同场景
demo_different_scenarios(orchestrator, rules)
return 0
except Exception as e:
logger.error(f"演示过程中发生错误: {e}", exc_info=True)
return 1
finally:
# 清理规则目录
import shutil
if rules_dir.exists() and str(rules_dir).startswith('./temp'):
logger.info(f"清理临时规则目录: {rules_dir}")
shutil.rmtree(rules_dir)
if __name__ == "__main__":
sys.exit(main())