compliance/examples/test_rule_engine.py
2025-05-16 15:18:02 +08:00

282 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
规则库和规则执行引擎示例
本示例展示如何使用规则库和规则执行引擎来验证API请求和响应。
"""
import os
import sys
import json
import logging
from pathlib import Path
# 确保能够导入项目模块
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from ddms_compliance_suite.models.config_models import AppConfig, RuleRepositoryConfig
from ddms_compliance_suite.rule_repository.repository import RuleRepository
from ddms_compliance_suite.rule_executor.executor import RuleExecutor
from ddms_compliance_suite.api_caller.caller import APICaller, APIRequest, APIResponse
from ddms_compliance_suite.models.rule_models import (
RuleCategory, TargetType, RuleLifecycle, RuleScope,
PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule
)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def load_example_rules():
"""加载示例规则"""
logger.info("加载示例规则...")
# 创建规则库配置
repo_config = RuleRepositoryConfig(
storage={"type": "filesystem", "path": "./examples/rules"},
preload_rules=True
)
# 创建规则库
repo = RuleRepository(repo_config)
# 加载YAML文件中的规则
yaml_rules_loaded = len(repo.query_rules())
logger.info(f"从YAML文件加载了 {yaml_rules_loaded} 条规则")
# 创建几个示例规则对象并保存
if yaml_rules_loaded == 0:
# 性能规则示例
performance_rule = PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
category=RuleCategory.PERFORMANCE,
severity="warning",
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_TIME,
threshold=500,
metric="response_time",
unit="ms"
)
# 安全规则示例
security_rule = SecurityRule(
id="https-only",
name="仅允许HTTPS",
description="验证API请求仅使用HTTPS协议",
category=RuleCategory.SECURITY,
severity="error",
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.SECURITY,
check_type="transport_security",
expected_value="https"
)
# RESTful设计规则示例
design_rule = RESTfulDesignRule(
id="restful-url-design",
name="RESTful URL设计",
description="验证API URL是否符合RESTful设计规范",
category=RuleCategory.API_DESIGN,
severity="warning",
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.REQUEST_URL,
design_aspect="URL设计",
pattern="^/api/v\\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
# 错误处理规则示例
error_rule = ErrorHandlingRule(
id="standard-error-format",
name="标准错误格式",
description="验证API错误响应是否符合标准格式",
category=RuleCategory.ERROR_HANDLING,
severity="warning",
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_BODY,
error_code="*",
expected_status=-1
)
# 保存规则
repo.save_rule(performance_rule)
repo.save_rule(security_rule)
repo.save_rule(design_rule)
repo.save_rule(error_rule)
logger.info("已创建并保存示例规则")
return repo
def demo_rule_execution():
"""演示规则执行"""
# 加载规则库
repo = load_example_rules()
# 创建规则执行引擎
executor = RuleExecutor(repo)
# 创建API调用器
api_caller = APICaller()
# 模拟API请求
https_request = APIRequest(
method="GET",
url="https://api.example.com/api/v1/users/123",
headers={"Content-Type": "application/json"}
)
http_request = APIRequest(
method="GET",
url="http://api.example.com/users/123",
headers={"Content-Type": "application/json"}
)
# 模拟API响应
success_response = APIResponse(
status_code=200,
headers={"Content-Type": "application/json"},
content=b'{"id": 123, "name": "Test User"}',
json_content={"id": 123, "name": "Test User"},
elapsed_time=0.2 # 200毫秒
)
slow_response = APIResponse(
status_code=200,
headers={"Content-Type": "application/json"},
content=b'{"id": 123, "name": "Test User"}',
json_content={"id": 123, "name": "Test User"},
elapsed_time=0.8 # 800毫秒
)
error_response = APIResponse(
status_code=404,
headers={"Content-Type": "application/json"},
content=b'{"code": "NOT_FOUND", "message": "User not found"}',
json_content={"code": "NOT_FOUND", "message": "User not found"},
elapsed_time=0.1
)
bad_error_response = APIResponse(
status_code=500,
headers={"Content-Type": "application/json"},
content=b'{"error": "Internal Server Error"}',
json_content={"error": "Internal Server Error"},
elapsed_time=0.1
)
# 测试场景1HTTPS请求 + 成功响应
logger.info("\n===== 测试场景1HTTPS请求 + 成功响应 =====")
context1 = {
"api_request": https_request,
"api_response": success_response
}
# 执行请求准备阶段的规则
logger.info("执行请求准备阶段的规则...")
prep_results = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context1)
for result in prep_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 执行响应验证阶段的规则
logger.info("执行响应验证阶段的规则...")
resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context1)
for result in resp_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试场景2HTTP请求非HTTPS+ 成功响应
logger.info("\n===== 测试场景2HTTP请求非HTTPS+ 成功响应 =====")
context2 = {
"api_request": http_request,
"api_response": success_response
}
# 执行请求准备阶段的规则
logger.info("执行请求准备阶段的规则...")
prep_results = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context2)
for result in prep_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试场景3HTTPS请求 + 慢响应
logger.info("\n===== 测试场景3HTTPS请求 + 慢响应 =====")
context3 = {
"api_request": https_request,
"api_response": slow_response
}
# 执行响应验证阶段的规则
logger.info("执行响应验证阶段的规则...")
resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context3)
for result in resp_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试场景4HTTPS请求 + 错误响应
logger.info("\n===== 测试场景4HTTPS请求 + 错误响应 =====")
context4 = {
"api_request": https_request,
"api_response": error_response
}
# 执行响应验证阶段的规则
logger.info("执行响应验证阶段的规则...")
resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context4)
for result in resp_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试场景5HTTPS请求 + 格式不正确的错误响应
logger.info("\n===== 测试场景5HTTPS请求 + 格式不正确的错误响应 =====")
context5 = {
"api_request": https_request,
"api_response": bad_error_response
}
# 执行响应验证阶段的规则
logger.info("执行响应验证阶段的规则...")
resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context5)
for result in resp_results:
logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}")
# 测试YAML规则
logger.info("\n===== 测试YAML规则 =====")
# 获取所有规则
all_rules = repo.query_rules()
yaml_rules = [rule for rule in all_rules if hasattr(rule, 'code') and rule.code]
if yaml_rules:
logger.info(f"发现 {len(yaml_rules)} 条YAML规则执行测试...")
for rule in yaml_rules:
logger.info(f"测试YAML规则: {rule.name}")
# 选择合适的上下文
if rule.target_type == TargetType.API_REQUEST:
context = {"api_request": https_request}
elif rule.target_type == TargetType.API_RESPONSE:
if rule.category == RuleCategory.PERFORMANCE:
context = {"api_response": slow_response}
elif rule.category == RuleCategory.ERROR_HANDLING:
context = {"api_response": error_response}
else:
context = {"api_response": success_response}
else:
context = {}
# 执行规则
result = executor.execute_rule(rule, context)
logger.info(f"结果: {'通过' if result.is_valid else '失败'} - {result.message}")
if result.details:
logger.info(f"详情: {json.dumps(result.details, ensure_ascii=False, indent=2)}")
else:
logger.info("未发现YAML规则跳过测试")
if __name__ == "__main__":
# 执行示例
demo_rule_execution()