diff --git a/.coverage b/.coverage new file mode 100644 index 0000000..0bcd7f9 Binary files /dev/null and b/.coverage differ diff --git a/examples/rule_execution_demo.py b/examples/rule_execution_demo.py deleted file mode 100644 index 7306cf0..0000000 --- a/examples/rule_execution_demo.py +++ /dev/null @@ -1,755 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -规则执行演示脚本 - -本脚本演示如何使用增强后的规则库执行API测试,包括性能规则、安全规则、RESTful设计规则和错误处理规则的执行。 -脚本模拟了一系列API请求和响应场景,并使用规则执行引擎验证这些场景是否符合预定义的规则。 -""" - -import time -import json -import logging -import argparse -from dataclasses import dataclass, asdict -from typing import Dict, List, Any, Optional -import requests -from urllib.parse import urlparse - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - - -# ------------------ 模拟 API 调用相关类 ------------------ - -@dataclass -class APIRequest: - """API请求类,用于构建和发送API请求""" - method: str - url: str - headers: Optional[Dict[str, str]] = None - params: Optional[Dict[str, Any]] = None - json_data: Optional[Any] = None - body: Optional[Any] = None - data: Optional[Dict[str, Any]] = None - - -@dataclass -class APIResponse: - """API响应类,用于存储API响应结果""" - status_code: int - headers: Dict[str, str] - content: str - elapsed_time: float - json_content: Optional[Dict[str, Any]] = None - request: Optional[APIRequest] = None - - def __post_init__(self): - if self.content and (self.headers.get('Content-Type', '').startswith('application/json') or - self.content.strip().startswith('{')): - try: - self.json_content = json.loads(self.content) - except json.JSONDecodeError: - self.json_content = None - - -class APICaller: - """API调用器,用于发送API请求并获取响应""" - - def call_api(self, request: APIRequest) -> APIResponse: - """模拟调用API,返回响应结果""" - logger.info(f"发送 {request.method} 请求到 {request.url}") - - # 这里只是模拟,实际项目中应使用真实的HTTP请求 - # 根据URL创建不同的模拟响应 - start_time = time.time() - - # 模拟不同的API响应场景 - if "users" in request.url: - response = self._mock_user_api_response(request) - elif "slow" in request.url: - # 模拟慢响应 - time.sleep(0.6) # 模拟600ms响应时间 - response = self._mock_slow_api_response(request) - elif "error" in request.url: - # 模拟错误响应 - response = self._mock_error_api_response(request) - elif "http://" in request.url: - # 模拟HTTP响应(非HTTPS) - response = self._mock_http_api_response(request) - else: - # 默认响应 - response = self._mock_default_api_response(request) - - elapsed_time = time.time() - start_time - response.elapsed_time = elapsed_time - response.request = request - - return response - - def _mock_user_api_response(self, request: APIRequest) -> APIResponse: - """模拟用户API响应""" - headers = { - 'Content-Type': 'application/json', - 'X-Request-ID': '12345', - 'X-Rate-Limit-Remaining': '1000' - } - content = json.dumps({ - "id": 123, - "name": "Test User", - "email": "user@example.com", - "created_at": "2023-01-01T00:00:00Z", - "status": "active" - }) - return APIResponse( - status_code=200, - headers=headers, - content=content, - elapsed_time=0.1 - ) - - def _mock_slow_api_response(self, request: APIRequest) -> APIResponse: - """模拟慢响应API""" - headers = { - 'Content-Type': 'application/json', - 'X-Request-ID': '54321', - 'X-Rate-Limit-Remaining': '999' - } - content = json.dumps({ - "id": 456, - "name": "Test User", - "email": "slow@example.com", - "created_at": "2023-01-02T00:00:00Z", - "status": "active", - "data": [1, 2, 3, 4, 5] * 100 # 添加一些数据使响应大一些 - }) - return APIResponse( - status_code=200, - headers=headers, - content=content, - elapsed_time=0.6 # 模拟600ms响应时间 - ) - - def _mock_error_api_response(self, request: APIRequest) -> APIResponse: - """模拟错误响应API""" - headers = { - 'Content-Type': 'application/json', - 'X-Request-ID': '67890' - } - # 标准错误响应格式 - content = json.dumps({ - "code": "user_not_found", - "message": "User not found", - "details": { - "resource": "User", - "id": request.params.get("id", "unknown") if request.params else "unknown" - } - }) - return APIResponse( - status_code=404, - headers=headers, - content=content, - elapsed_time=0.05 - ) - - def _mock_http_api_response(self, request: APIRequest) -> APIResponse: - """模拟HTTP(非HTTPS)API响应""" - headers = { - 'Content-Type': 'application/json', - 'X-Request-ID': '13579' - } - content = json.dumps({ - "message": "This is an insecure HTTP response", - "timestamp": time.time() - }) - return APIResponse( - status_code=200, - headers=headers, - content=content, - elapsed_time=0.08 - ) - - def _mock_default_api_response(self, request: APIRequest) -> APIResponse: - """模拟默认API响应""" - headers = { - 'Content-Type': 'application/json', - 'X-Request-ID': '24680' - } - content = json.dumps({ - "message": "Default response", - "method": request.method, - "path": urlparse(request.url).path, - "timestamp": time.time() - }) - return APIResponse( - status_code=200, - headers=headers, - content=content, - elapsed_time=0.03 - ) - - -# ------------------ 规则模型类 ------------------ - -@dataclass -class RuleResult: - """规则执行结果类""" - rule_id: str - rule_name: str - is_valid: bool - message: str - details: Optional[Dict[str, Any]] = None - category: Optional[str] = None - severity: Optional[str] = None - - -class RuleCategory: - """规则类别枚举""" - PERFORMANCE = "Performance" - SECURITY = "Security" - API_DESIGN = "APIDesign" - ERROR_HANDLING = "ErrorHandling" - GENERAL = "General" - - -class RuleLifecycle: - """规则生命周期枚举""" - REQUEST_PREPARATION = "RequestPreparation" - REQUEST_EXECUTION = "RequestExecution" - RESPONSE_VALIDATION = "ResponseValidation" - POST_VALIDATION = "PostValidation" - ANY_STAGE = "AnyStage" - - -class RuleScope: - """规则作用域枚举""" - REQUEST_URL = "RequestURL" - REQUEST_HEADERS = "RequestHeaders" - REQUEST_PARAMS = "RequestParams" - REQUEST_BODY = "RequestBody" - RESPONSE_STATUS = "ResponseStatus" - RESPONSE_HEADERS = "ResponseHeaders" - RESPONSE_BODY = "ResponseBody" - RESPONSE_TIME = "ResponseTime" - SECURITY = "Security" - PERFORMANCE = "Performance" - ANY_SCOPE = "AnyScope" - - -class TargetType: - """规则目标类型枚举""" - API_REQUEST = "APIRequest" - API_RESPONSE = "APIResponse" - ANY = "Any" - - -@dataclass -class BaseRule: - """基础规则类""" - id: str - name: str - description: str - category: str = RuleCategory.GENERAL - version: str = "1.0.0" - severity: str = "warning" - is_enabled: bool = True - tags: List[str] = None - target_type: str = TargetType.ANY - lifecycle: str = RuleLifecycle.ANY_STAGE - scope: str = RuleScope.ANY_SCOPE - - def __post_init__(self): - if self.tags is None: - self.tags = [] - - def to_dict(self) -> Dict[str, Any]: - """将规则转换为字典""" - return asdict(self) - - def validate(self, context: Dict[str, Any]) -> Dict[str, Any]: - """验证逻辑,由子类实现""" - raise NotImplementedError("子类必须实现validate方法") - - -@dataclass -class PerformanceRule(BaseRule): - """性能规则类""" - category: str = RuleCategory.PERFORMANCE - target_type: str = TargetType.API_RESPONSE - lifecycle: str = RuleLifecycle.RESPONSE_VALIDATION - scope: str = RuleScope.RESPONSE_TIME - threshold: float = 500.0 # 默认阈值500毫秒 - metric: str = "response_time" - unit: str = "ms" - - def validate(self, context: Dict[str, Any]) -> Dict[str, Any]: - """验证API响应时间是否超过阈值""" - response = context.get('api_response') - if not response: - return { - 'is_valid': False, - 'message': '缺少API响应对象', - 'details': {} - } - - response_time = response.elapsed_time * 1000 # 转换为毫秒 - - if response_time > self.threshold: - return { - 'is_valid': False, - 'message': f'响应时间 {response_time:.2f}ms 超过阈值 {self.threshold}ms', - 'details': { - 'actual_time': response_time, - 'threshold': self.threshold, - 'unit': self.unit - } - } - - return { - 'is_valid': True, - 'message': f'响应时间 {response_time:.2f}ms 在阈值 {self.threshold}ms 内', - 'details': { - 'actual_time': response_time, - 'threshold': self.threshold, - 'unit': self.unit - } - } - - -@dataclass -class SecurityRule(BaseRule): - """安全规则类""" - category: str = RuleCategory.SECURITY - target_type: str = TargetType.API_REQUEST - lifecycle: str = RuleLifecycle.REQUEST_PREPARATION - scope: str = RuleScope.SECURITY - check_type: str = "transport_security" # 传输安全检查 - expected_value: str = "https" # 期望值为https - - def validate(self, context: Dict[str, Any]) -> Dict[str, Any]: - """验证API请求是否使用了HTTPS协议""" - request = context.get('api_request') - if not request: - return { - 'is_valid': False, - 'message': '缺少API请求对象', - 'details': {} - } - - url = str(request.url) - - if not url.startswith('https://'): - return { - 'is_valid': False, - 'message': 'API请求必须使用HTTPS协议', - 'details': { - 'current_url': url, - 'expected_protocol': 'https' - } - } - - return { - 'is_valid': True, - 'message': 'API请求使用了HTTPS协议', - 'details': { - 'url': url - } - } - - -@dataclass -class RESTfulDesignRule(BaseRule): - """RESTful设计规则类""" - category: str = RuleCategory.API_DESIGN - target_type: str = TargetType.API_REQUEST - lifecycle: str = RuleLifecycle.REQUEST_PREPARATION - scope: str = RuleScope.REQUEST_URL - design_aspect: str = "URL设计" - pattern: str = r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" - - def validate(self, context: Dict[str, Any]) -> Dict[str, Any]: - """验证API URL是否符合RESTful设计规范""" - import re - - request = context.get('api_request') - if not request: - return { - 'is_valid': False, - 'message': '缺少API请求对象', - 'details': {} - } - - url = str(request.url) - - # 解析URL,获取路径部分 - parsed_url = urlparse(url) - path = parsed_url.path - - # 使用正则表达式验证路径 - if not re.match(self.pattern, path): - return { - 'is_valid': False, - 'message': 'API URL不符合RESTful设计规范', - 'details': { - 'current_path': path, - 'expected_pattern': self.pattern, - 'suggestion': '路径应该遵循 /api/v{version}/{资源}[/{id}] 格式' - } - } - - return { - 'is_valid': True, - 'message': 'API URL符合RESTful设计规范', - 'details': { - 'path': path - } - } - - -@dataclass -class ErrorHandlingRule(BaseRule): - """错误处理规则类""" - category: str = RuleCategory.ERROR_HANDLING - target_type: str = TargetType.API_RESPONSE - lifecycle: str = RuleLifecycle.RESPONSE_VALIDATION - scope: str = RuleScope.RESPONSE_BODY - error_code: str = "*" # 匹配所有错误码 - expected_status: int = -1 # 不验证状态码 - - def validate(self, context: Dict[str, Any]) -> Dict[str, Any]: - """验证API错误响应是否符合标准格式""" - response = context.get('api_response') - if not response: - return { - 'is_valid': False, - 'message': '缺少API响应对象', - 'details': {} - } - - # 只检查4xx和5xx状态码的响应 - if response.status_code < 400: - return { - 'is_valid': True, - 'message': '非错误响应,跳过验证', - 'details': { - 'status_code': response.status_code - } - } - - # 确保响应包含JSON内容 - if not response.json_content: - return { - 'is_valid': False, - 'message': '错误响应不是有效的JSON格式', - 'details': { - 'status_code': response.status_code, - 'content_type': response.headers.get('Content-Type', '未知') - } - } - - # 检查错误响应的必要字段 - required_fields = ['code', 'message'] - missing_fields = [field for field in required_fields if field not in response.json_content] - - if missing_fields: - return { - 'is_valid': False, - 'message': '错误响应缺少必要字段', - 'details': { - 'missing_fields': missing_fields, - 'required_fields': required_fields, - 'response': response.json_content - } - } - - return { - 'is_valid': True, - 'message': '错误响应符合标准格式', - 'details': { - 'status_code': response.status_code, - 'error_code': response.json_content.get('code'), - 'error_message': response.json_content.get('message') - } - } - - -# ------------------ 规则执行引擎 ------------------ - -class RuleExecutor: - """规则执行引擎""" - - def __init__(self, rules: List[BaseRule] = None): - self.rules = rules or [] - - def add_rule(self, rule: BaseRule): - """添加规则""" - self.rules.append(rule) - - def execute_rule(self, rule: BaseRule, context: Dict[str, Any]) -> RuleResult: - """执行单个规则""" - logger.info(f"执行规则 '{rule.name}' (ID: {rule.id})") - - try: - result = rule.validate(context) - - # 构建规则执行结果 - rule_result = RuleResult( - rule_id=rule.id, - rule_name=rule.name, - is_valid=result.get('is_valid', False), - message=result.get('message', ''), - details=result.get('details', {}), - category=rule.category, - severity=rule.severity - ) - - # 记录执行结果 - log_level = logging.INFO if rule_result.is_valid else logging.WARNING - logger.log(log_level, f"规则 '{rule.name}' 结果: {'通过' if rule_result.is_valid else '失败'} - {rule_result.message}") - - return rule_result - - except Exception as e: - # 记录执行异常 - logger.error(f"执行规则 '{rule.name}' 时发生异常: {str(e)}", exc_info=True) - - # 返回异常结果 - return RuleResult( - rule_id=rule.id, - rule_name=rule.name, - is_valid=False, - message=f"规则执行异常: {str(e)}", - details={'exception': str(e)}, - category=rule.category, - severity=rule.severity - ) - - def execute_rules_for_lifecycle(self, lifecycle: str, context: Dict[str, Any]) -> List[RuleResult]: - """执行特定生命周期阶段的所有规则""" - logger.info(f"执行 {lifecycle} 阶段的规则") - - # 筛选符合生命周期的规则 - lifecycle_rules = [rule for rule in self.rules if rule.lifecycle == lifecycle or rule.lifecycle == RuleLifecycle.ANY_STAGE] - - # 执行筛选出的规则 - results = [] - for rule in lifecycle_rules: - result = self.execute_rule(rule, context) - results.append(result) - - # 统计执行结果 - passed = sum(1 for result in results if result.is_valid) - failed = len(results) - passed - logger.info(f"{lifecycle} 阶段规则执行完成: {passed} 条通过, {failed} 条失败") - - return results - - def execute_all_rules(self, context: Dict[str, Any]) -> List[RuleResult]: - """执行所有规则""" - logger.info(f"执行所有规则") - - results = [] - for rule in self.rules: - result = self.execute_rule(rule, context) - results.append(result) - - # 统计执行结果 - passed = sum(1 for result in results if result.is_valid) - failed = len(results) - passed - logger.info(f"所有规则执行完成: {passed} 条通过, {failed} 条失败") - - return results - - -# ------------------ 演示函数 ------------------ - -def create_demo_rules() -> List[BaseRule]: - """创建演示用的规则""" - rules = [] - - # 添加性能规则 - performance_rule = PerformanceRule( - id="response-time-max-500ms", - name="响应时间不超过500毫秒", - description="验证API响应时间不超过500毫秒", - threshold=500, - metric="response_time", - unit="ms" - ) - rules.append(performance_rule) - - # 添加安全规则 - security_rule = SecurityRule( - id="https-only-rule", - name="HTTPS强制使用规则", - description="验证API是否只使用HTTPS协议", - severity="error", - check_type="transport_security", - expected_value="https" - ) - rules.append(security_rule) - - # 添加RESTful设计规则 - restful_rule = RESTfulDesignRule( - id="restful-url-pattern", - name="RESTful URL设计规则", - description="验证API URL是否符合RESTful设计规范", - pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" - ) - rules.append(restful_rule) - - # 添加错误处理规则 - error_rule = ErrorHandlingRule( - id="standard-error-response", - name="标准错误响应格式规则", - description="验证API错误响应是否符合标准格式", - error_code="*" - ) - rules.append(error_rule) - - return rules - - -def demo_rule_execution(): - """演示规则执行过程""" - logger.info("开始规则执行演示") - - # 创建规则 - rules = create_demo_rules() - - # 创建规则执行引擎 - executor = RuleExecutor(rules) - - # 创建API调用器 - api_caller = APICaller() - - # === 场景1: 正常响应场景 === - logger.info("\n=== 场景1: 正常响应场景 ===") - - # 创建API请求 - request1 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/123", - headers={"Content-Type": "application/json"} - ) - - # 执行请求准备阶段的规则 - context1 = {"api_request": request1} - prep_results1 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context1) - - # 发送API请求 - response1 = api_caller.call_api(request1) - - # 执行响应验证阶段的规则 - context1["api_response"] = response1 - resp_results1 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context1) - - # === 场景2: 慢响应场景 === - logger.info("\n=== 场景2: 慢响应场景 ===") - - # 创建API请求 - request2 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/slow", - headers={"Content-Type": "application/json"} - ) - - # 执行请求准备阶段的规则 - context2 = {"api_request": request2} - prep_results2 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context2) - - # 发送API请求 - response2 = api_caller.call_api(request2) - - # 执行响应验证阶段的规则 - context2["api_response"] = response2 - resp_results2 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context2) - - # === 场景3: 错误响应场景 === - logger.info("\n=== 场景3: 错误响应场景 ===") - - # 创建API请求 - request3 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/error", - headers={"Content-Type": "application/json"}, - params={"id": "999"} - ) - - # 执行请求准备阶段的规则 - context3 = {"api_request": request3} - prep_results3 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context3) - - # 发送API请求 - response3 = api_caller.call_api(request3) - - # 执行响应验证阶段的规则 - context3["api_response"] = response3 - resp_results3 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context3) - - # === 场景4: 非HTTPS场景 === - logger.info("\n=== 场景4: 非HTTPS场景 ===") - - # 创建API请求 - request4 = APIRequest( - method="GET", - url="http://api.example.com/api/v1/users/123", - headers={"Content-Type": "application/json"} - ) - - # 执行请求准备阶段的规则 - context4 = {"api_request": request4} - prep_results4 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context4) - - # 发送API请求 - response4 = api_caller.call_api(request4) - - # 执行响应验证阶段的规则 - context4["api_response"] = response4 - resp_results4 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context4) - - # === 场景5: 不符合RESTful设计的URL === - logger.info("\n=== 场景5: 不符合RESTful设计的URL ===") - - # 创建API请求 - request5 = APIRequest( - method="GET", - url="https://api.example.com/getUserInfo?id=123", - headers={"Content-Type": "application/json"} - ) - - # 执行请求准备阶段的规则 - context5 = {"api_request": request5} - prep_results5 = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context5) - - # 发送API请求 - response5 = api_caller.call_api(request5) - - # 执行响应验证阶段的规则 - context5["api_response"] = response5 - resp_results5 = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context5) - - logger.info("\n演示完成") - - -def main(): - """主函数""" - parser = argparse.ArgumentParser(description='规则执行演示脚本') - parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', - help='日志级别') - args = parser.parse_args() - - # 设置日志级别 - logging.getLogger().setLevel(getattr(logging, args.log_level)) - - # 执行演示 - demo_rule_execution() - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/examples/rule_execution_memory_demo.py b/examples/rule_execution_memory_demo.py deleted file mode 100644 index bbee942..0000000 --- a/examples/rule_execution_memory_demo.py +++ /dev/null @@ -1,339 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -规则执行演示脚本(内存版本) - -此示例演示如何在内存中创建规则并执行,不依赖文件系统存储。 -""" - -import sys -import logging -import json -import time -import re -from pathlib import Path -from typing import Dict, Any, List - -# 添加项目根目录到Python路径 -sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from ddms_compliance_suite.models.rule_models import ( - BaseRule, PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule, - RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel -) -from ddms_compliance_suite.rule_executor.executor import RuleExecutor, RuleExecutionResult -from ddms_compliance_suite.api_caller.caller import APIRequest, APIResponse - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) - -logger = logging.getLogger(__name__) - -class MockRuleRepository: - """ - 模拟规则库,用于在内存中存储规则而不是文件系统 - """ - def __init__(self): - self.rules: Dict[str, BaseRule] = {} - - def save_rule(self, rule: BaseRule) -> bool: - """保存规则到内存""" - self.rules[rule.id] = rule - return True - - def load_rule_by_id(self, rule_id: str, version: str = None) -> BaseRule: - """从内存加载规则""" - return self.rules.get(rule_id) - - def get_rules_by_lifecycle(self, lifecycle: RuleLifecycle, target_type: TargetType = None) -> List[BaseRule]: - """获取指定生命周期阶段的规则""" - results = [] - for rule in self.rules.values(): - if rule.lifecycle == lifecycle: - if target_type is None or rule.target_type == target_type: - results.append(rule) - return results - - def get_rules_for_target(self, target_type: TargetType, target_id: str) -> List[BaseRule]: - """获取适用于特定目标的规则""" - results = [] - for rule in self.rules.values(): - if rule.target_type == target_type: - results.append(rule) - return results - -def create_mock_rules() -> List[BaseRule]: - """创建测试用的规则""" - rules = [] - - # 1. 性能规则 - 响应时间不超过500毫秒 - performance_rule = PerformanceRule( - id="response-time-max-500ms", - name="响应时间不超过500毫秒", - description="验证API响应时间不超过500毫秒", - category=RuleCategory.PERFORMANCE, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_TIME, - threshold=500.0, - metric="response_time", - unit="ms" - ) - rules.append(performance_rule) - - # 2. 安全规则 - HTTPS强制使用 - security_rule = SecurityRule( - id="https-only-rule", - name="HTTPS强制使用规则", - description="验证API请求是否使用了HTTPS协议", - category=RuleCategory.SECURITY, - severity=SeverityLevel.ERROR, - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.SECURITY, - check_type="transport_security", - expected_value="https" - ) - rules.append(security_rule) - - # 3. RESTful设计规则 - URL路径格式 - restful_rule = RESTfulDesignRule( - id="restful-url-pattern", - name="RESTful URL设计规则", - description="验证API URL是否符合RESTful设计规范", - category=RuleCategory.API_DESIGN, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.REQUEST_URL, - design_aspect="URL设计", - pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" - ) - rules.append(restful_rule) - - # 4. 错误处理规则 - 错误响应格式 - error_rule = ErrorHandlingRule( - id="standard-error-response", - name="标准错误响应格式规则", - description="验证API错误响应是否符合标准格式", - category=RuleCategory.ERROR_HANDLING, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_BODY, - error_code="*", - expected_status=400 - ) - rules.append(error_rule) - - return rules - -def log_rule_execution_results(results: List[RuleExecutionResult], phase: str): - """记录规则执行结果""" - total_count = len(results) - passed_count = sum(1 for r in results if r.is_valid) - - logger.info(f"=== {phase} 规则执行结果: {passed_count}/{total_count} 通过 ===") - - for result in results: - status = "通过" if result.is_valid else "失败" - logger.info(f"规则: {result.rule_name} (ID: {result.rule_id}) - 结果: {status} - 消息: {result.message}") - -def mock_api_response(status_code=200, content=None, elapsed_time=0.1): - """创建模拟API响应""" - if content is None: - content = {"id": 123, "name": "测试用户", "status": "active"} - - return APIResponse( - status_code=status_code, - headers={"Content-Type": "application/json"}, - content=json.dumps(content).encode('utf-8'), - elapsed_time=elapsed_time, - json_content=content - ) - -def demo_rule_execution(): - """演示规则执行""" - # 创建模拟规则库和规则 - repo = MockRuleRepository() - rules = create_mock_rules() - - # 将规则添加到规则库 - for rule in rules: - repo.save_rule(rule) - - # 创建规则执行器 - executor = RuleExecutor(repo) - - # 演示场景1: 标准场景 - 所有规则应通过 - logger.info("\n========== 场景1: 标准场景 - 所有规则应通过 ==========") - request1 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/123", - headers={"Content-Type": "application/json"} - ) - response1 = mock_api_response() - - # 创建上下文 - context1 = { - 'api_request': request1, - 'api_response': response1, - 'endpoint_id': 'GET /api/v1/users/123' - } - - # 执行请求准备阶段规则 - logger.info("执行请求准备阶段规则...") - request_results1 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context1 - ) - log_rule_execution_results(request_results1, "请求准备阶段") - - # 执行响应验证阶段规则 - logger.info("执行响应验证阶段规则...") - response_results1 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context1 - ) - log_rule_execution_results(response_results1, "响应验证阶段") - - # 演示场景2: 慢响应 - 性能规则应失败 - logger.info("\n========== 场景2: 慢响应 - 性能规则应失败 ==========") - request2 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/123", - headers={"Content-Type": "application/json"} - ) - # 创建一个响应时间为1.5秒的慢响应 - response2 = mock_api_response(elapsed_time=1.5) - - # 创建上下文 - context2 = { - 'api_request': request2, - 'api_response': response2, - 'endpoint_id': 'GET /api/v1/users/123' - } - - # 执行请求准备阶段规则 - logger.info("执行请求准备阶段规则...") - request_results2 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context2 - ) - log_rule_execution_results(request_results2, "请求准备阶段") - - # 执行响应验证阶段规则 - logger.info("执行响应验证阶段规则...") - response_results2 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context2 - ) - log_rule_execution_results(response_results2, "响应验证阶段") - - # 演示场景3: 错误响应 - 错误处理规则应验证 - logger.info("\n========== 场景3: 错误响应 - 错误处理规则应验证 ==========") - request3 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/999", - headers={"Content-Type": "application/json"} - ) - # 创建一个404错误响应 - response3 = mock_api_response( - status_code=404, - content={"code": "USER_NOT_FOUND", "message": "用户不存在"} - ) - - # 创建上下文 - context3 = { - 'api_request': request3, - 'api_response': response3, - 'endpoint_id': 'GET /api/v1/users/999' - } - - # 执行请求准备阶段规则 - logger.info("执行请求准备阶段规则...") - request_results3 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context3 - ) - log_rule_execution_results(request_results3, "请求准备阶段") - - # 执行响应验证阶段规则 - logger.info("执行响应验证阶段规则...") - response_results3 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context3 - ) - log_rule_execution_results(response_results3, "响应验证阶段") - - # 演示场景4: 非HTTPS请求 - 安全规则应失败 - logger.info("\n========== 场景4: 非HTTPS请求 - 安全规则应失败 ==========") - request4 = APIRequest( - method="GET", - url="http://api.example.com/api/v1/users/123", # 使用HTTP而不是HTTPS - headers={"Content-Type": "application/json"} - ) - response4 = mock_api_response() - - # 创建上下文 - context4 = { - 'api_request': request4, - 'api_response': response4, - 'endpoint_id': 'GET /api/v1/users/123' - } - - # 执行请求准备阶段规则 - logger.info("执行请求准备阶段规则...") - request_results4 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context4 - ) - log_rule_execution_results(request_results4, "请求准备阶段") - - # 执行响应验证阶段规则 - logger.info("执行响应验证阶段规则...") - response_results4 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context4 - ) - log_rule_execution_results(response_results4, "响应验证阶段") - - # 演示场景5: 非RESTful URL - RESTful设计规则应失败 - logger.info("\n========== 场景5: 非RESTful URL - RESTful设计规则应失败 ==========") - request5 = APIRequest( - method="GET", - url="https://api.example.com/getUserInfo?id=123", # 非RESTful URL - headers={"Content-Type": "application/json"} - ) - response5 = mock_api_response() - - # 创建上下文 - context5 = { - 'api_request': request5, - 'api_response': response5, - 'endpoint_id': 'GET /getUserInfo' - } - - # 执行请求准备阶段规则 - logger.info("执行请求准备阶段规则...") - request_results5 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context5 - ) - log_rule_execution_results(request_results5, "请求准备阶段") - - # 执行响应验证阶段规则 - logger.info("执行响应验证阶段规则...") - response_results5 = executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context5 - ) - log_rule_execution_results(response_results5, "响应验证阶段") - -if __name__ == "__main__": - demo_rule_execution() \ No newline at end of file diff --git a/examples/rule_repository_demo.py b/examples/rule_repository_demo.py deleted file mode 100644 index 1baf974..0000000 --- a/examples/rule_repository_demo.py +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -规则库演示脚本 - -此示例演示如何使用规则库创建、保存和加载规则。 -""" - -import sys -import logging -import json -import shutil -import tempfile -from pathlib import Path -from typing import Dict, Any, List - -# 添加项目根目录到Python路径 -sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from ddms_compliance_suite.models.rule_models import ( - BaseRule, PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule, - RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel -) -from ddms_compliance_suite.models.config_models import RuleRepositoryConfig, RuleStorageConfig -from ddms_compliance_suite.rule_repository.repository import RuleRepository - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) - -logger = logging.getLogger(__name__) - -def create_test_rules() -> List[BaseRule]: - """创建测试用的规则""" - rules = [] - - # 1. 性能规则 - 响应时间不超过500毫秒 - performance_rule = PerformanceRule( - id="response-time-max-500ms", - name="响应时间不超过500毫秒", - description="验证API响应时间不超过500毫秒", - category=RuleCategory.PERFORMANCE, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_TIME, - threshold=500.0, - metric="response_time", - unit="ms" - ) - rules.append(performance_rule) - - # 2. 安全规则 - HTTPS强制使用 - security_rule = SecurityRule( - id="https-only-rule", - name="HTTPS强制使用规则", - description="验证API请求是否使用了HTTPS协议", - category=RuleCategory.SECURITY, - severity=SeverityLevel.ERROR, - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.SECURITY, - check_type="transport_security", - expected_value="https" - ) - rules.append(security_rule) - - # 3. RESTful设计规则 - URL路径格式 - restful_rule = RESTfulDesignRule( - id="restful-url-pattern", - name="RESTful URL设计规则", - description="验证API URL是否符合RESTful设计规范", - category=RuleCategory.API_DESIGN, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.REQUEST_URL, - design_aspect="URL设计", - pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" - ) - rules.append(restful_rule) - - # 4. 错误处理规则 - 错误响应格式 - error_rule = ErrorHandlingRule( - id="standard-error-response", - name="标准错误响应格式规则", - description="验证API错误响应是否符合标准格式", - category=RuleCategory.ERROR_HANDLING, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_BODY, - error_code="*", - expected_status=400 - ) - rules.append(error_rule) - - return rules - -def test_rule_repository(): - """演示规则库的使用""" - # 创建临时目录用于存储规则 - temp_dir = tempfile.mkdtemp(prefix="rule_repo_demo_") - logger.info(f"使用临时目录: {temp_dir}") - - try: - # 创建规则库配置 - config = RuleRepositoryConfig( - storage=RuleStorageConfig( - type="filesystem", - path=temp_dir - ), - preload_rules=True - ) - - # 创建规则库 - repo = RuleRepository(config) - logger.info("规则库初始化完成") - - # 创建测试规则并保存到规则库 - rules = create_test_rules() - for rule in rules: - saved = repo.save_rule(rule) - logger.info(f"保存规则 {rule.id} ({rule.__class__.__name__}): {'成功' if saved else '失败'}") - - # 从规则库加载规则 - logger.info("\n加载规则进行验证:") - for rule_id in [r.id for r in rules]: - loaded_rule = repo.get_rule(rule_id) - if loaded_rule: - logger.info(f"加载规则 {rule_id}: 成功, 类型: {loaded_rule.__class__.__name__}") - # 检查类型特定属性 - if isinstance(loaded_rule, PerformanceRule): - logger.info(f" - 性能阈值: {loaded_rule.threshold} {loaded_rule.unit}") - elif isinstance(loaded_rule, SecurityRule): - logger.info(f" - 安全检查类型: {loaded_rule.check_type}, 预期值: {loaded_rule.expected_value}") - elif isinstance(loaded_rule, RESTfulDesignRule): - logger.info(f" - URL模式: {loaded_rule.pattern}") - elif isinstance(loaded_rule, ErrorHandlingRule): - logger.info(f" - 错误码: {loaded_rule.error_code}, 预期状态码: {loaded_rule.expected_status}") - else: - logger.error(f"加载规则 {rule_id}: 失败") - - # 按生命周期查询规则 - logger.info("\n按生命周期查询规则:") - for lifecycle in [RuleLifecycle.REQUEST_PREPARATION, RuleLifecycle.RESPONSE_VALIDATION]: - rules_for_lifecycle = repo.get_rules_by_lifecycle(lifecycle) - logger.info(f"生命周期 {lifecycle.value} 的规则数量: {len(rules_for_lifecycle)}") - for rule in rules_for_lifecycle: - logger.info(f" - {rule.id} ({rule.__class__.__name__})") - - finally: - # 清理临时目录 - logger.info(f"清理临时目录: {temp_dir}") - shutil.rmtree(temp_dir) - -if __name__ == "__main__": - test_rule_repository() \ No newline at end of file diff --git a/examples/rules/authentication_headers_rule.yaml b/examples/rules/authentication_headers_rule.yaml deleted file mode 100644 index 20ec86c..0000000 --- a/examples/rules/authentication_headers_rule.yaml +++ /dev/null @@ -1,99 +0,0 @@ -id: authentication-headers-required -name: API认证头部要求规则 -description: 验证API请求是否包含必要的认证头部,如Authorization、X-API-Key等 -category: Security -version: 1.0.0 -severity: error -is_enabled: true -tags: - - security - - authentication - - headers - - authorization -target_type: APIRequest -lifecycle: RequestPreparation -scope: RequestHeaders -required_headers: - - name: Authorization - pattern: "^(Bearer|Basic|Digest) [A-Za-z0-9\\-\\._~\\+\\/]+=*$" - description: 认证令牌,格式应为"Bearer token"、"Basic base64"或"Digest value" - - name: X-API-Key - pattern: "^[A-Za-z0-9]{32,64}$" - description: API密钥,应为32-64位的字母数字字符 - - name: X-Request-ID - pattern: "^[A-Za-z0-9\\-]{8,36}$" - description: 请求ID,用于跟踪请求,应为8-36位的字母数字和连字符 -check_mode: any # 可以是"all"或"any",表示是需要满足所有头部还是任一头部 -code: | - def validate(context): - """验证API请求是否包含必要的认证头部""" - request = context.get('api_request') - if not request: - return {'is_valid': False, 'message': '缺少API请求对象'} - - headers = request.headers or {} - required_headers = context.get('required_headers', []) - check_mode = context.get('check_mode', 'any') - - # 获取缺失的头部和无效的头部 - missing_headers = [] - invalid_headers = [] - valid_headers = [] - - import re - - for header in required_headers: - header_name = header.get('name') - header_pattern = header.get('pattern') - - if header_name not in headers: - missing_headers.append({ - 'name': header_name, - 'description': header.get('description', '') - }) - continue - - header_value = headers[header_name] - - # 如果指定了模式,验证头部值是否符合模式 - if header_pattern and not re.match(header_pattern, header_value): - invalid_headers.append({ - 'name': header_name, - 'value': header_value, - 'pattern': header_pattern, - 'description': header.get('description', '') - }) - else: - valid_headers.append(header_name) - - # 根据检查模式判断是否验证通过 - if check_mode == 'all': - # 需要所有头部都存在且有效 - is_valid = not missing_headers and not invalid_headers - else: # 'any' - # 至少有一个有效的头部即可 - is_valid = len(valid_headers) > 0 - - if is_valid: - message = '请求包含有效的认证头部' - if check_mode == 'any': - message += f": {', '.join(valid_headers)}" - else: - if check_mode == 'all': - if missing_headers: - message = f"请求缺少必要的认证头部: {', '.join([h['name'] for h in missing_headers])}" - else: - message = f"请求包含无效的认证头部: {', '.join([h['name'] for h in invalid_headers])}" - else: # 'any' - message = f"请求不包含任何有效的认证头部,至少需要其中之一: {', '.join([h['name'] for h in required_headers])}" - - return { - 'is_valid': is_valid, - 'message': message, - 'details': { - 'check_mode': check_mode, - 'valid_headers': valid_headers, - 'missing_headers': missing_headers, - 'invalid_headers': invalid_headers - } - } \ No newline at end of file diff --git a/examples/rules/error_response_rule.yaml b/examples/rules/error_response_rule.yaml deleted file mode 100644 index 25553c4..0000000 --- a/examples/rules/error_response_rule.yaml +++ /dev/null @@ -1,83 +0,0 @@ -id: standard-error-response -name: 标准错误响应格式规则 -description: 验证API错误响应是否符合标准格式 -category: ErrorHandling -version: 1.0.0 -severity: warning -is_enabled: true -tags: - - error-handling - - response-format -target_type: APIResponse -lifecycle: ResponseValidation -scope: ResponseBody -error_code: "*" # 匹配所有错误码 -expected_status: -1 # 不验证状态码 -code: | - def validate(context): - response = context.get('api_response') - if not response: - return {'is_valid': False, 'message': '缺少API响应对象'} - - # 只检查4xx和5xx状态码的响应 - if response.status_code < 400: - return {'is_valid': True, 'message': '非错误响应,跳过验证'} - - # 确保响应包含JSON内容 - if not response.json_content: - return { - 'is_valid': False, - 'message': '错误响应不是有效的JSON格式', - 'details': { - 'status_code': response.status_code, - 'content_type': response.headers.get('Content-Type', '未知') - } - } - - # 检查错误响应的必要字段 - required_fields = ['code', 'message'] - missing_fields = [field for field in required_fields if field not in response.json_content] - - if missing_fields: - return { - 'is_valid': False, - 'message': '错误响应缺少必要字段', - 'details': { - 'missing_fields': missing_fields, - 'required_fields': required_fields, - 'response': response.json_content - } - } - - # 检查字段类型 - if not isinstance(response.json_content.get('code'), (str, int)): - return { - 'is_valid': False, - 'message': '错误码字段类型不正确', - 'details': { - 'field': 'code', - 'expected_type': 'string或integer', - 'actual_type': type(response.json_content.get('code')).__name__ - } - } - - if not isinstance(response.json_content.get('message'), str): - return { - 'is_valid': False, - 'message': '错误消息字段类型不正确', - 'details': { - 'field': 'message', - 'expected_type': 'string', - 'actual_type': type(response.json_content.get('message')).__name__ - } - } - - return { - 'is_valid': True, - 'message': '错误响应符合标准格式', - 'details': { - 'status_code': response.status_code, - 'error_code': response.json_content.get('code'), - 'error_message': response.json_content.get('message') - } - } \ No newline at end of file diff --git a/examples/rules/performance_rule_response_time.yaml b/examples/rules/performance_rule_response_time.yaml deleted file mode 100644 index d5be6cc..0000000 --- a/examples/rules/performance_rule_response_time.yaml +++ /dev/null @@ -1,45 +0,0 @@ -id: response-time-threshold -name: 响应时间阈值规则 -description: 验证API响应时间是否在允许的范围内 -category: Performance -version: 1.0.0 -severity: warning -is_enabled: true -tags: - - performance - - response-time -target_type: APIResponse -lifecycle: ResponseValidation -scope: ResponseTime -threshold: 500 # 毫秒 -metric: response_time -unit: ms -code: | - def validate(context): - response = context.get('api_response') - if not response: - return {'is_valid': False, 'message': '缺少API响应对象'} - - response_time = response.elapsed_time * 1000 # 转换为毫秒 - threshold = context.get('threshold', 500) # 默认500毫秒 - - if response_time > threshold: - return { - 'is_valid': False, - 'message': f'响应时间 {response_time:.2f}ms 超过阈值 {threshold}ms', - 'details': { - 'actual_time': response_time, - 'threshold': threshold, - 'unit': 'ms' - } - } - - return { - 'is_valid': True, - 'message': f'响应时间 {response_time:.2f}ms 在阈值 {threshold}ms 内', - 'details': { - 'actual_time': response_time, - 'threshold': threshold, - 'unit': 'ms' - } - } \ No newline at end of file diff --git a/examples/rules/restful_design_url.yaml b/examples/rules/restful_design_url.yaml deleted file mode 100644 index cc968fd..0000000 --- a/examples/rules/restful_design_url.yaml +++ /dev/null @@ -1,51 +0,0 @@ -id: restful-url-pattern -name: RESTful URL设计规则 -description: 验证API URL是否符合RESTful设计规范 -category: APIDesign -version: 1.0.0 -severity: warning -is_enabled: true -tags: - - restful - - api-design - - url-pattern -target_type: APIRequest -lifecycle: RequestPreparation -scope: RequestURL -design_aspect: URL设计 -pattern: "^/api/v\\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" -code: | - import re - - def validate(context): - request = context.get('api_request') - if not request: - return {'is_valid': False, 'message': '缺少API请求对象'} - - url = str(request.url) - - # 解析URL,获取路径部分 - from urllib.parse import urlparse - parsed_url = urlparse(url) - path = parsed_url.path - - # 使用正则表达式验证路径 - pattern = context.get('pattern', "^/api/v\\d+/[a-z0-9-]+(/[a-z0-9-]+)*$") - if not re.match(pattern, path): - return { - 'is_valid': False, - 'message': 'API URL不符合RESTful设计规范', - 'details': { - 'current_path': path, - 'expected_pattern': pattern, - 'suggestion': '路径应该遵循 /api/v{version}/{资源}[/{id}] 格式' - } - } - - return { - 'is_valid': True, - 'message': 'API URL符合RESTful设计规范', - 'details': { - 'path': path - } - } \ No newline at end of file diff --git a/examples/rules/security_rule_https.yaml b/examples/rules/security_rule_https.yaml deleted file mode 100644 index 735aea3..0000000 --- a/examples/rules/security_rule_https.yaml +++ /dev/null @@ -1,41 +0,0 @@ -id: https-only-rule -name: HTTPS强制使用规则 -description: 验证API是否只使用HTTPS协议,确保通信安全 -category: Security -version: 1.0.0 -severity: error -is_enabled: true -tags: - - security - - https - - encryption -target_type: APIRequest -lifecycle: RequestPreparation -scope: Security -check_type: transport_security -expected_value: https -code: | - def validate(context): - request = context.get('api_request') - if not request: - return {'is_valid': False, 'message': '缺少API请求对象'} - - url = str(request.url) - - if not url.startswith('https://'): - return { - 'is_valid': False, - 'message': 'API请求必须使用HTTPS协议', - 'details': { - 'current_url': url, - 'expected_protocol': 'https' - } - } - - return { - 'is_valid': True, - 'message': 'API请求使用了HTTPS协议', - 'details': { - 'url': url - } - } \ No newline at end of file diff --git a/examples/test_orchestrator_with_rules.py b/examples/test_orchestrator_with_rules.py deleted file mode 100644 index 3b8a3ad..0000000 --- a/examples/test_orchestrator_with_rules.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -测试编排器与规则库集成示例 - -此示例展示如何使用集成了规则库的测试编排器来执行API测试。 -""" - -import os -import sys -import json -import logging -from pathlib import Path - -# 添加项目根目录到Python路径 -sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from ddms_compliance_suite.test_orchestrator import APITestOrchestrator -from ddms_compliance_suite.models.rule_models import ( - BaseRule, PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule, - RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel -) - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) - -logger = logging.getLogger(__name__) - -def create_example_rules(rule_repo): - """创建示例规则""" - # 1. 性能规则 - 响应时间不超过500毫秒 - performance_rule = PerformanceRule( - id="response-time-max-500ms", - name="响应时间不超过500毫秒", - description="验证API响应时间不超过500毫秒", - category=RuleCategory.PERFORMANCE, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_TIME, - threshold=500.0, - metric="response_time", - unit="ms" - ) - - # 2. 安全规则 - HTTPS必须使用 - security_rule = SecurityRule( - id="https-only-rule", - name="HTTPS强制使用规则", - description="验证API请求是否使用了HTTPS协议", - category=RuleCategory.SECURITY, - severity=SeverityLevel.ERROR, - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.SECURITY, - check_type="transport_security", - expected_value="https" - ) - - # 3. RESTful设计规则 - URL路径格式 - restful_rule = RESTfulDesignRule( - id="restful-url-pattern", - name="RESTful URL设计规则", - description="验证API URL是否符合RESTful设计规范", - category=RuleCategory.API_DESIGN, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.REQUEST_URL, - design_aspect="URL设计", - pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" - ) - - # 4. 错误处理规则 - 错误响应格式 - error_rule = ErrorHandlingRule( - id="standard-error-response", - name="标准错误响应格式规则", - description="验证API错误响应是否符合标准格式", - category=RuleCategory.ERROR_HANDLING, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_BODY, - error_code="*", - expected_status=400 - ) - - # 保存规则到规则库 - rule_repo.save_rule(performance_rule) - rule_repo.save_rule(security_rule) - rule_repo.save_rule(restful_rule) - rule_repo.save_rule(error_rule) - - logger.info("已创建示例规则") - -def run_api_tests_with_rules(base_url, yapi_file_path, output_file=None): - """ - 使用规则库运行API测试 - - Args: - base_url: API基础URL - yapi_file_path: YAPI定义文件路径 - output_file: 输出文件路径(可选) - """ - # 创建临时规则目录 - rules_dir = Path("./temp_rules") - rules_dir.mkdir(exist_ok=True) - - try: - # 初始化测试编排器 - orchestrator = APITestOrchestrator( - base_url=base_url, - rule_repo_path=str(rules_dir) - ) - - # 创建示例规则 - create_example_rules(orchestrator.rule_repo) - - # 运行API测试 - logger.info(f"开始运行API测试: {yapi_file_path}") - summary = orchestrator.run_tests_from_yapi(yapi_file_path) - - # 打印测试结果摘要 - summary.print_summary() - - # 保存测试结果 - if output_file: - with open(output_file, 'w', encoding='utf-8') as f: - f.write(summary.to_json(pretty=True)) - logger.info(f"测试结果已保存到: {output_file}") - - return summary - finally: - # 清理临时规则目录 - # 注意:如果要保留规则,可以注释掉这部分 - import shutil - if rules_dir.exists(): - shutil.rmtree(rules_dir) - -def parse_arguments(): - """解析命令行参数""" - import argparse - - parser = argparse.ArgumentParser(description='测试编排器与规则库集成示例') - parser.add_argument('--base-url', required=True, help='API基础URL') - parser.add_argument('--yapi', required=True, help='YAPI定义文件路径') - parser.add_argument('--output', help='输出文件路径') - - return parser.parse_args() - -def main(): - """主函数""" - args = parse_arguments() - - # 运行API测试 - summary = run_api_tests_with_rules( - base_url=args.base_url, - yapi_file_path=args.yapi, - output_file=args.output - ) - - # 返回测试结果状态码 - return 1 if summary.failed > 0 or summary.error > 0 else 0 - -if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file diff --git a/examples/test_rule_engine.py b/examples/test_rule_engine.py deleted file mode 100644 index eea99ea..0000000 --- a/examples/test_rule_engine.py +++ /dev/null @@ -1,282 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -规则库和规则执行引擎示例 - -本示例展示如何使用规则库和规则执行引擎来验证API请求和响应。 -""" - -import os -import sys -import json -import logging -from pathlib import Path - -# 确保能够导入项目模块 -sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from ddms_compliance_suite.models.config_models import AppConfig, RuleRepositoryConfig -from ddms_compliance_suite.rule_repository.repository import RuleRepository -from ddms_compliance_suite.rule_executor.executor import RuleExecutor -from ddms_compliance_suite.api_caller.caller import APICaller, APIRequest, APIResponse -from ddms_compliance_suite.models.rule_models import ( - RuleCategory, TargetType, RuleLifecycle, RuleScope, - PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule -) - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - -def load_example_rules(): - """加载示例规则""" - logger.info("加载示例规则...") - - # 创建规则库配置 - repo_config = RuleRepositoryConfig( - storage={"type": "filesystem", "path": "./examples/rules"}, - preload_rules=True - ) - - # 创建规则库 - repo = RuleRepository(repo_config) - - # 加载YAML文件中的规则 - yaml_rules_loaded = len(repo.query_rules()) - logger.info(f"从YAML文件加载了 {yaml_rules_loaded} 条规则") - - # 创建几个示例规则对象并保存 - if yaml_rules_loaded == 0: - # 性能规则示例 - performance_rule = PerformanceRule( - id="response-time-max-500ms", - name="响应时间不超过500毫秒", - description="验证API响应时间不超过500毫秒", - category=RuleCategory.PERFORMANCE, - severity="warning", - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_TIME, - threshold=500, - metric="response_time", - unit="ms" - ) - - # 安全规则示例 - security_rule = SecurityRule( - id="https-only", - name="仅允许HTTPS", - description="验证API请求仅使用HTTPS协议", - category=RuleCategory.SECURITY, - severity="error", - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.SECURITY, - check_type="transport_security", - expected_value="https" - ) - - # RESTful设计规则示例 - design_rule = RESTfulDesignRule( - id="restful-url-design", - name="RESTful URL设计", - description="验证API URL是否符合RESTful设计规范", - category=RuleCategory.API_DESIGN, - severity="warning", - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.REQUEST_URL, - design_aspect="URL设计", - pattern="^/api/v\\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" - ) - - # 错误处理规则示例 - error_rule = ErrorHandlingRule( - id="standard-error-format", - name="标准错误格式", - description="验证API错误响应是否符合标准格式", - category=RuleCategory.ERROR_HANDLING, - severity="warning", - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_BODY, - error_code="*", - expected_status=-1 - ) - - # 保存规则 - repo.save_rule(performance_rule) - repo.save_rule(security_rule) - repo.save_rule(design_rule) - repo.save_rule(error_rule) - - logger.info("已创建并保存示例规则") - - return repo - -def demo_rule_execution(): - """演示规则执行""" - # 加载规则库 - repo = load_example_rules() - - # 创建规则执行引擎 - executor = RuleExecutor(repo) - - # 创建API调用器 - api_caller = APICaller() - - # 模拟API请求 - https_request = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/123", - headers={"Content-Type": "application/json"} - ) - - http_request = APIRequest( - method="GET", - url="http://api.example.com/users/123", - headers={"Content-Type": "application/json"} - ) - - # 模拟API响应 - success_response = APIResponse( - status_code=200, - headers={"Content-Type": "application/json"}, - content=b'{"id": 123, "name": "Test User"}', - json_content={"id": 123, "name": "Test User"}, - elapsed_time=0.2 # 200毫秒 - ) - - slow_response = APIResponse( - status_code=200, - headers={"Content-Type": "application/json"}, - content=b'{"id": 123, "name": "Test User"}', - json_content={"id": 123, "name": "Test User"}, - elapsed_time=0.8 # 800毫秒 - ) - - error_response = APIResponse( - status_code=404, - headers={"Content-Type": "application/json"}, - content=b'{"code": "NOT_FOUND", "message": "User not found"}', - json_content={"code": "NOT_FOUND", "message": "User not found"}, - elapsed_time=0.1 - ) - - bad_error_response = APIResponse( - status_code=500, - headers={"Content-Type": "application/json"}, - content=b'{"error": "Internal Server Error"}', - json_content={"error": "Internal Server Error"}, - elapsed_time=0.1 - ) - - # 测试场景1:HTTPS请求 + 成功响应 - logger.info("\n===== 测试场景1:HTTPS请求 + 成功响应 =====") - context1 = { - "api_request": https_request, - "api_response": success_response - } - - # 执行请求准备阶段的规则 - logger.info("执行请求准备阶段的规则...") - prep_results = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context1) - for result in prep_results: - logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}") - - # 执行响应验证阶段的规则 - logger.info("执行响应验证阶段的规则...") - resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context1) - for result in resp_results: - logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}") - - # 测试场景2:HTTP请求(非HTTPS)+ 成功响应 - logger.info("\n===== 测试场景2:HTTP请求(非HTTPS)+ 成功响应 =====") - context2 = { - "api_request": http_request, - "api_response": success_response - } - - # 执行请求准备阶段的规则 - logger.info("执行请求准备阶段的规则...") - prep_results = executor.execute_rules_for_lifecycle(RuleLifecycle.REQUEST_PREPARATION, context2) - for result in prep_results: - logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}") - - # 测试场景3:HTTPS请求 + 慢响应 - logger.info("\n===== 测试场景3:HTTPS请求 + 慢响应 =====") - context3 = { - "api_request": https_request, - "api_response": slow_response - } - - # 执行响应验证阶段的规则 - logger.info("执行响应验证阶段的规则...") - resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context3) - for result in resp_results: - logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}") - - # 测试场景4:HTTPS请求 + 错误响应 - logger.info("\n===== 测试场景4:HTTPS请求 + 错误响应 =====") - context4 = { - "api_request": https_request, - "api_response": error_response - } - - # 执行响应验证阶段的规则 - logger.info("执行响应验证阶段的规则...") - resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context4) - for result in resp_results: - logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}") - - # 测试场景5:HTTPS请求 + 格式不正确的错误响应 - logger.info("\n===== 测试场景5:HTTPS请求 + 格式不正确的错误响应 =====") - context5 = { - "api_request": https_request, - "api_response": bad_error_response - } - - # 执行响应验证阶段的规则 - logger.info("执行响应验证阶段的规则...") - resp_results = executor.execute_rules_for_lifecycle(RuleLifecycle.RESPONSE_VALIDATION, context5) - for result in resp_results: - logger.info(f"规则 '{result.rule_name}' 结果: {'通过' if result.is_valid else '失败'} - {result.message}") - - # 测试YAML规则 - logger.info("\n===== 测试YAML规则 =====") - # 获取所有规则 - all_rules = repo.query_rules() - yaml_rules = [rule for rule in all_rules if hasattr(rule, 'code') and rule.code] - - if yaml_rules: - logger.info(f"发现 {len(yaml_rules)} 条YAML规则,执行测试...") - for rule in yaml_rules: - logger.info(f"测试YAML规则: {rule.name}") - # 选择合适的上下文 - if rule.target_type == TargetType.API_REQUEST: - context = {"api_request": https_request} - elif rule.target_type == TargetType.API_RESPONSE: - if rule.category == RuleCategory.PERFORMANCE: - context = {"api_response": slow_response} - elif rule.category == RuleCategory.ERROR_HANDLING: - context = {"api_response": error_response} - else: - context = {"api_response": success_response} - else: - context = {} - - # 执行规则 - result = executor.execute_rule(rule, context) - logger.info(f"结果: {'通过' if result.is_valid else '失败'} - {result.message}") - if result.details: - logger.info(f"详情: {json.dumps(result.details, ensure_ascii=False, indent=2)}") - else: - logger.info("未发现YAML规则,跳过测试") - -if __name__ == "__main__": - # 执行示例 - demo_rule_execution() \ No newline at end of file diff --git a/examples/test_with_rules_demo.py b/examples/test_with_rules_demo.py deleted file mode 100644 index 426102e..0000000 --- a/examples/test_with_rules_demo.py +++ /dev/null @@ -1,416 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -测试编排器与规则库集成演示 - -此示例展示如何使用测试编排器和规则库执行API测试。 -""" - -import os -import sys -import logging -import argparse -import json -import re -from pathlib import Path - -# 添加项目根目录到Python路径 -sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from ddms_compliance_suite.test_orchestrator import APITestOrchestrator -from ddms_compliance_suite.models.rule_models import ( - RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel -) -from ddms_compliance_suite.input_parser.parser import YAPIEndpoint - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) - -logger = logging.getLogger(__name__) - -def create_performance_rule(): - """创建性能规则""" - from ddms_compliance_suite.models.rule_models import PerformanceRule - - return PerformanceRule( - id="response-time-max-500ms", - name="响应时间不超过500毫秒", - description="验证API响应时间不超过500毫秒", - category=RuleCategory.PERFORMANCE, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_TIME, - threshold=500.0, - metric="response_time", - unit="ms" - ) - -def create_security_rule(): - """创建安全规则""" - from ddms_compliance_suite.models.rule_models import SecurityRule - - return SecurityRule( - id="https-only-rule", - name="HTTPS强制使用规则", - description="验证API请求是否使用了HTTPS协议", - category=RuleCategory.SECURITY, - severity=SeverityLevel.ERROR, - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.SECURITY, - check_type="transport_security", - expected_value="https" - ) - -def create_restful_rule(): - """创建RESTful设计规则""" - from ddms_compliance_suite.models.rule_models import RESTfulDesignRule - - return RESTfulDesignRule( - id="restful-url-pattern", - name="RESTful URL设计规则", - description="验证API URL是否符合RESTful设计规范", - category=RuleCategory.API_DESIGN, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_REQUEST, - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - scope=RuleScope.REQUEST_URL, - design_aspect="URL设计", - pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" - ) - -def create_error_handling_rule(): - """创建错误处理规则""" - from ddms_compliance_suite.models.rule_models import ErrorHandlingRule - - return ErrorHandlingRule( - id="standard-error-response", - name="标准错误响应格式规则", - description="验证API错误响应是否符合标准格式", - category=RuleCategory.ERROR_HANDLING, - severity=SeverityLevel.WARNING, - target_type=TargetType.API_RESPONSE, - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - scope=RuleScope.RESPONSE_BODY, - error_code="*", - expected_status=400 - ) - -def create_rules(orchestrator): - """创建示例规则""" - logger.info("创建示例规则...") - - # 使用工厂函数创建规则实例 - performance_rule = create_performance_rule() - security_rule = create_security_rule() - restful_rule = create_restful_rule() - error_rule = create_error_handling_rule() - - # 保存规则到规则库 - orchestrator.rule_repo.save_rule(performance_rule) - orchestrator.rule_repo.save_rule(security_rule) - orchestrator.rule_repo.save_rule(restful_rule) - orchestrator.rule_repo.save_rule(error_rule) - - logger.info("已创建示例规则") - return [performance_rule, security_rule, restful_rule, error_rule] - -def demo_different_scenarios(orchestrator, rules): - """演示不同测试场景:标准场景、非HTTPS场景、非RESTful URL场景等""" - logger.info("开始演示不同测试场景...") - - # 创建测试摘要 - from ddms_compliance_suite.test_orchestrator import TestSummary - summary = TestSummary() - - # 创建模拟API响应函数 - def mock_api_response(status_code=200, content=None, elapsed_time=0.1): - from ddms_compliance_suite.api_caller.caller import APIResponse - - if content is None: - content = {"id": 123, "name": "测试用户", "status": "active"} - - return APIResponse( - status_code=status_code, - headers={"Content-Type": "application/json"}, - content=json.dumps(content).encode('utf-8'), - elapsed_time=elapsed_time, - json_content=content - ) - - # 场景1:标准响应(所有规则应通过) - logger.info("\n场景1: 标准响应 - 所有规则应通过") - endpoint1 = YAPIEndpoint( - path="/api/v1/users/123", - method="GET", - title="获取用户信息", - description="获取指定ID的用户信息" - ) - - # 手动构建API请求,而不是通过orchestrator._build_api_request - from ddms_compliance_suite.api_caller.caller import APIRequest - request1 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/123", - headers={"Content-Type": "application/json"} - ) - - # 模拟API响应 - response1 = mock_api_response() - - # 创建测试上下文 - context1 = { - 'api_request': request1, - 'api_response': response1, - 'endpoint': endpoint1, - 'endpoint_id': f"{endpoint1.method} {endpoint1.path}" - } - - # 执行规则验证 - logger.info("执行请求准备阶段规则...") - request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context1 - ) - - log_results(request_rule_results, "请求准备阶段") - - logger.info("执行响应验证阶段规则...") - response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context1 - ) - - log_results(response_rule_results, "响应验证阶段") - - # 场景2:慢响应(性能规则应失败) - logger.info("\n场景2: 慢响应 - 性能规则应失败") - endpoint2 = YAPIEndpoint( - path="/api/v1/users/123", - method="GET", - title="获取用户信息", - description="获取指定ID的用户信息" - ) - - request2 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/123", - headers={"Content-Type": "application/json"} - ) - - # 模拟慢响应(超过500毫秒) - response2 = mock_api_response(elapsed_time=1.5) - - # 创建测试上下文 - context2 = { - 'api_request': request2, - 'api_response': response2, - 'endpoint': endpoint2, - 'endpoint_id': f"{endpoint2.method} {endpoint2.path}" - } - - # 执行规则验证 - logger.info("执行请求准备阶段规则...") - request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context2 - ) - - log_results(request_rule_results, "请求准备阶段") - - logger.info("执行响应验证阶段规则...") - response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context2 - ) - - log_results(response_rule_results, "响应验证阶段") - - # 场景3:错误响应(错误处理规则应通过) - logger.info("\n场景3: 错误响应 - 错误处理规则应验证") - endpoint3 = YAPIEndpoint( - path="/api/v1/users/999", - method="GET", - title="获取不存在的用户信息", - description="获取不存在的用户信息,应返回404" - ) - - request3 = APIRequest( - method="GET", - url="https://api.example.com/api/v1/users/999", - headers={"Content-Type": "application/json"} - ) - - # 模拟错误响应 - response3 = mock_api_response( - status_code=404, - content={"code": "USER_NOT_FOUND", "message": "用户不存在"} - ) - - # 创建测试上下文 - context3 = { - 'api_request': request3, - 'api_response': response3, - 'endpoint': endpoint3, - 'endpoint_id': f"{endpoint3.method} {endpoint3.path}" - } - - # 执行规则验证 - logger.info("执行请求准备阶段规则...") - request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context3 - ) - - log_results(request_rule_results, "请求准备阶段") - - logger.info("执行响应验证阶段规则...") - response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context3 - ) - - log_results(response_rule_results, "响应验证阶段") - - # 场景4:非HTTPS场景(安全规则应失败) - logger.info("\n场景4: 非HTTPS请求 - 安全规则应失败") - endpoint4 = YAPIEndpoint( - path="/api/v1/users/123", - method="GET", - title="获取用户信息", - description="获取指定ID的用户信息" - ) - - # 注意这里使用HTTP而不是HTTPS - request4 = APIRequest( - method="GET", - url="http://api.example.com/api/v1/users/123", - headers={"Content-Type": "application/json"} - ) - - # 模拟响应 - response4 = mock_api_response() - - # 创建测试上下文 - context4 = { - 'api_request': request4, - 'api_response': response4, - 'endpoint': endpoint4, - 'endpoint_id': f"{endpoint4.method} {endpoint4.path}" - } - - # 执行规则验证 - logger.info("执行请求准备阶段规则...") - request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context4 - ) - - log_results(request_rule_results, "请求准备阶段") - - logger.info("执行响应验证阶段规则...") - response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context4 - ) - - log_results(response_rule_results, "响应验证阶段") - - # 场景5:非RESTful URL场景(RESTful设计规则应失败) - logger.info("\n场景5: 非RESTful URL请求 - RESTful设计规则应失败") - endpoint5 = YAPIEndpoint( - path="/getUserInfo", - method="GET", - title="获取用户信息", - description="获取指定ID的用户信息" - ) - - request5 = APIRequest( - method="GET", - url="https://api.example.com/getUserInfo?id=123", - headers={"Content-Type": "application/json"} - ) - - # 模拟响应 - response5 = mock_api_response() - - # 创建测试上下文 - context5 = { - 'api_request': request5, - 'api_response': response5, - 'endpoint': endpoint5, - 'endpoint_id': f"{endpoint5.method} {endpoint5.path}" - } - - # 执行规则验证 - logger.info("执行请求准备阶段规则...") - request_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.REQUEST_PREPARATION, - context=context5 - ) - - log_results(request_rule_results, "请求准备阶段") - - logger.info("执行响应验证阶段规则...") - response_rule_results = orchestrator.rule_executor.execute_rules_for_lifecycle( - lifecycle=RuleLifecycle.RESPONSE_VALIDATION, - context=context5 - ) - - log_results(response_rule_results, "响应验证阶段") - - logger.info("\n演示完成") - -def log_results(results, phase_name): - """打印规则执行结果""" - total_rules = len(results) - passed_rules = sum(1 for r in results if r.is_valid) - - logger.info(f"{phase_name}规则执行结果: {passed_rules}/{total_rules} 通过") - - for result in results: - status = "通过" if result.is_valid else "失败" - logger.info(f"规则: {result.rule_name} (ID: {result.rule_id}) - 结果: {status} - 消息: {result.message}") - -def main(): - """主函数""" - parser = argparse.ArgumentParser(description='测试编排器与规则库集成演示') - parser.add_argument('--rules-path', default='./temp_rules', - help='规则库路径,默认为./temp_rules') - args = parser.parse_args() - - # 创建临时规则目录 - rules_dir = Path(args.rules_path) - rules_dir.mkdir(exist_ok=True, parents=True) - - try: - # 初始化测试编排器 - orchestrator = APITestOrchestrator( - base_url="https://api.example.com", - rule_repo_path=str(rules_dir) - ) - - # 创建示例规则 - rules = create_rules(orchestrator) - - # 演示不同场景 - demo_different_scenarios(orchestrator, rules) - - return 0 - except Exception as e: - logger.error(f"演示过程中发生错误: {e}", exc_info=True) - return 1 - finally: - # 清理规则目录 - import shutil - if rules_dir.exists() and str(rules_dir).startswith('./temp'): - logger.info(f"清理临时规则目录: {rules_dir}") - shutil.rmtree(rules_dir) - -if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file