This commit is contained in:
gongwenxin 2025-05-19 17:35:50 +08:00
parent 61b7befb1f
commit 4a22060890
4 changed files with 10053 additions and 10202 deletions

View File

@ -14,10 +14,6 @@ import datetime
from .input_parser.parser import InputParser, YAPIEndpoint, SwaggerEndpoint, ParsedYAPISpec, ParsedSwaggerSpec
from .api_caller.caller import APICaller, APIRequest, APIResponse
from .json_schema_validator.validator import JSONSchemaValidator
from .rule_repository.repository import RuleRepository
from .rule_executor.executor import RuleExecutor
from .models.rule_models import RuleQuery, TargetType, RuleCategory, RuleLifecycle, RuleScope
from .models.config_models import RuleRepositoryConfig, RuleStorageConfig
from .test_framework_core import ValidationResult, TestSeverity, APIRequestContext, APIResponseContext, BaseAPITestCase
from .test_case_registry import TestCaseRegistry
@ -302,7 +298,6 @@ class APITestOrchestrator:
"""API测试编排器"""
def __init__(self, base_url: str,
rule_repo_path: str = "./rules", # 旧规则引擎的规则库路径
custom_test_cases_dir: Optional[str] = None # 新的自定义测试用例目录路径
):
"""
@ -310,7 +305,6 @@ class APITestOrchestrator:
Args:
base_url: API基础URL
rule_repo_path: 规则库路径
custom_test_cases_dir: 存放自定义 APITestCase 的目录路径如果为 None则不加载自定义测试用例
"""
self.base_url = base_url.rstrip('/')
@ -321,14 +315,6 @@ class APITestOrchestrator:
self.api_caller = APICaller()
self.validator = JSONSchemaValidator() # JSON Schema 验证器,可能会被测试用例内部使用
# 初始化 (旧) 规则库和规则执行器
# 未来可以考虑是否完全移除或将其功能也通过 APITestCase 实现
rule_config = RuleRepositoryConfig(
storage=RuleStorageConfig(path=rule_repo_path)
)
self.rule_repo = RuleRepository(rule_config)
self.rule_executor = RuleExecutor(self.rule_repo)
# 初始化 (新) 测试用例注册表
self.test_case_registry: Optional[TestCaseRegistry] = None
if custom_test_cases_dir:

View File

@ -16,10 +16,6 @@ import argparse
from pathlib import Path
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary
from ddms_compliance_suite.models.rule_models import (
PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule,
RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel
)
# 配置日志
logging.basicConfig(
@ -51,15 +47,6 @@ def parse_args():
filter_group.add_argument('--list-categories', action='store_true', help='列出YAPI分类')
filter_group.add_argument('--list-tags', action='store_true', help='列出Swagger标签')
# 规则库参数
rule_group = parser.add_argument_group('规则库选项')
rule_group.add_argument('--rules-path', default='./rules', help='规则库路径')
# rule_group.add_argument('--disable-rules', action='store_true', help='禁用规则验证',default=False)
rule_group.add_argument('--disable-rules', action='store_true', help='禁用规则验证',default=True)
rule_group.add_argument('--list-rules', action='store_true', help='列出可用规则',default=False)
rule_group.add_argument('--create-basic-rules', action='store_true',
help='创建基本规则集性能、安全、RESTful设计、错误处理',default=False)
# 新增:自定义测试用例参数组
custom_tc_group = parser.add_argument_group('自定义测试用例选项')
custom_tc_group.add_argument('--custom-test-cases-dir',
@ -100,111 +87,6 @@ def list_swagger_tags(swagger_file: str):
for i, tag in enumerate(parsed_swagger.tags, 1):
print(f"{i}. {tag.get('name', '未命名')} - {tag.get('description', '无描述')}")
def create_basic_rules(orchestrator):
"""创建基本规则集"""
logger.info("创建基本规则集...")
# 1. 性能规则 - 响应时间不超过500毫秒
performance_rule = PerformanceRule(
id="response-time-max-500ms",
name="响应时间不超过500毫秒",
description="验证API响应时间不超过500毫秒",
category=RuleCategory.PERFORMANCE,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_TIME,
threshold=500.0,
metric="response_time",
unit="ms"
)
# 2. 安全规则 - HTTPS必须使用
security_rule = SecurityRule(
id="https-only-rule",
name="HTTPS强制使用规则",
description="验证API请求是否使用了HTTPS协议",
category=RuleCategory.SECURITY,
severity=SeverityLevel.ERROR,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.SECURITY,
check_type="transport_security",
expected_value="https"
)
# 3. RESTful设计规则 - URL路径格式
restful_rule = RESTfulDesignRule(
id="restful-url-pattern",
name="RESTful URL设计规则",
description="验证API URL是否符合RESTful设计规范",
category=RuleCategory.API_DESIGN,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_REQUEST,
lifecycle=RuleLifecycle.REQUEST_PREPARATION,
scope=RuleScope.REQUEST_URL,
design_aspect="URL设计",
pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$"
)
# 4. 错误处理规则 - 错误响应格式
error_rule = ErrorHandlingRule(
id="standard-error-response",
name="标准错误响应格式规则",
description="验证API错误响应是否符合标准格式",
category=RuleCategory.ERROR_HANDLING,
severity=SeverityLevel.WARNING,
target_type=TargetType.API_RESPONSE,
lifecycle=RuleLifecycle.RESPONSE_VALIDATION,
scope=RuleScope.RESPONSE_BODY,
error_code="*",
expected_status=400
)
# 保存规则到规则库
orchestrator.rule_repo.save_rule(performance_rule)
orchestrator.rule_repo.save_rule(security_rule)
orchestrator.rule_repo.save_rule(restful_rule)
orchestrator.rule_repo.save_rule(error_rule)
logger.info("已创建基本规则集")
def list_rules(rule_repo_path: str):
"""列出可用规则"""
from ddms_compliance_suite.models.config_models import RuleRepositoryConfig, RuleStorageConfig
from ddms_compliance_suite.rule_repository.repository import RuleRepository
# 初始化规则库
rule_config = RuleRepositoryConfig(
storage=RuleStorageConfig(path=rule_repo_path)
)
repo = RuleRepository(rule_config)
# 查询所有规则
rules = repo.query_rules()
if not rules:
print("未找到规则。使用 --create-basic-rules 创建基本规则集。")
return
print(f"\n找到 {len(rules)} 条规则:")
# 按类别分组
rules_by_category = {}
for rule in rules:
category = str(rule.category)
if category not in rules_by_category:
rules_by_category[category] = []
rules_by_category[category].append(rule)
# 打印规则
for category, category_rules in rules_by_category.items():
print(f"\n{category}:")
for rule in category_rules:
print(f" - {rule.id}: {rule.name} (严重性: {rule.severity}, 版本: {rule.version})")
print(f" {rule.description}")
print(f" 生命周期: {rule.lifecycle}, 作用域: {rule.scope}")
def save_results(summary: TestSummary, output_file: str, format_type: str):
"""保存测试结果"""
if format_type == 'json':
@ -291,11 +173,6 @@ def main():
logger.setLevel(logging.DEBUG)
logger.debug("已启用详细日志模式")
# 列出规则
if args.list_rules:
list_rules(args.rules_path)
return 0
# 检查是否提供了API定义源
if not args.yapi and not args.swagger:
logger.error("请提供API定义源--yapi 或 --swagger")
@ -318,21 +195,9 @@ def main():
# 将 custom_test_cases_dir 参数传递给 APITestOrchestrator 的构造函数
orchestrator = APITestOrchestrator(
base_url=args.base_url,
rule_repo_path=args.rules_path,
custom_test_cases_dir=args.custom_test_cases_dir # 新增参数
)
# 创建基本规则集
if args.create_basic_rules:
create_basic_rules(orchestrator)
# 如果禁用规则,替换规则执行器的方法
if args.disable_rules:
logger.info("规则验证已禁用")
# 替换规则执行器的方法为空实现
orchestrator.rule_executor.execute_rules_for_lifecycle = lambda *args, **kwargs: []
orchestrator.rule_executor.execute_rules_for_target = lambda *args, **kwargs: []
# 运行测试
summary = None

File diff suppressed because it is too large Load Diff