#!/usr/bin/env python # -*- coding: utf-8 -*- """ API测试工具 此工具使用DDMS测试编排器从YAPI或Swagger定义执行API测试。 支持使用规则库进行高级验证。 """ import os import sys import json import logging import argparse from pathlib import Path from ddms_compliance_suite.test_orchestrator import APITestOrchestrator, TestSummary from ddms_compliance_suite.models.rule_models import ( PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule, RuleCategory, TargetType, RuleLifecycle, RuleScope, SeverityLevel ) # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def parse_args(): """解析命令行参数""" parser = argparse.ArgumentParser(description='DDMS API测试工具') # 基本参数 parser.add_argument('--base-url', required=True, help='API基础URL') parser.add_argument('--verbose', '-v', action='store_true', help='启用详细日志') parser.add_argument('--output', '-o', help='输出文件路径') parser.add_argument('--format', choices=['json', 'html'], default='json', help='输出格式') # API定义参数 api_group = parser.add_argument_group('API定义源') api_group.add_argument('--yapi', help='YAPI定义文件路径') api_group.add_argument('--swagger', help='Swagger定义文件路径') # 过滤参数 filter_group = parser.add_argument_group('过滤选项') filter_group.add_argument('--categories', help='YAPI分类,逗号分隔') filter_group.add_argument('--tags', help='Swagger标签,逗号分隔') filter_group.add_argument('--list-categories', action='store_true', help='列出YAPI分类') filter_group.add_argument('--list-tags', action='store_true', help='列出Swagger标签') # 规则库参数 rule_group = parser.add_argument_group('规则库选项') rule_group.add_argument('--rules-path', default='./rules', help='规则库路径') # rule_group.add_argument('--disable-rules', action='store_true', help='禁用规则验证',default=False) rule_group.add_argument('--disable-rules', action='store_true', help='禁用规则验证',default=True) rule_group.add_argument('--list-rules', action='store_true', help='列出可用规则',default=False) rule_group.add_argument('--create-basic-rules', action='store_true', help='创建基本规则集(性能、安全、RESTful设计、错误处理)',default=False) return parser.parse_args() def list_yapi_categories(yapi_file: str): """列出YAPI分类""" from ddms_compliance_suite.input_parser.parser import InputParser logger.info(f"从YAPI文件解析分类: {yapi_file}") parser = InputParser() parsed_yapi = parser.parse_yapi_spec(yapi_file) if not parsed_yapi: logger.error(f"解析YAPI文件失败: {yapi_file}") return print("\nYAPI分类:") for i, category in enumerate(parsed_yapi.categories, 1): print(f"{i}. {category.get('name', '未命名')} - {category.get('desc', '无描述')}") def list_swagger_tags(swagger_file: str): """列出Swagger标签""" from ddms_compliance_suite.input_parser.parser import InputParser logger.info(f"从Swagger文件解析标签: {swagger_file}") parser = InputParser() parsed_swagger = parser.parse_swagger_spec(swagger_file) if not parsed_swagger: logger.error(f"解析Swagger文件失败: {swagger_file}") return print("\nSwagger标签:") for i, tag in enumerate(parsed_swagger.tags, 1): print(f"{i}. {tag.get('name', '未命名')} - {tag.get('description', '无描述')}") def create_basic_rules(orchestrator): """创建基本规则集""" logger.info("创建基本规则集...") # 1. 性能规则 - 响应时间不超过500毫秒 performance_rule = PerformanceRule( id="response-time-max-500ms", name="响应时间不超过500毫秒", description="验证API响应时间不超过500毫秒", category=RuleCategory.PERFORMANCE, severity=SeverityLevel.WARNING, target_type=TargetType.API_RESPONSE, lifecycle=RuleLifecycle.RESPONSE_VALIDATION, scope=RuleScope.RESPONSE_TIME, threshold=500.0, metric="response_time", unit="ms" ) # 2. 安全规则 - HTTPS必须使用 security_rule = SecurityRule( id="https-only-rule", name="HTTPS强制使用规则", description="验证API请求是否使用了HTTPS协议", category=RuleCategory.SECURITY, severity=SeverityLevel.ERROR, target_type=TargetType.API_REQUEST, lifecycle=RuleLifecycle.REQUEST_PREPARATION, scope=RuleScope.SECURITY, check_type="transport_security", expected_value="https" ) # 3. RESTful设计规则 - URL路径格式 restful_rule = RESTfulDesignRule( id="restful-url-pattern", name="RESTful URL设计规则", description="验证API URL是否符合RESTful设计规范", category=RuleCategory.API_DESIGN, severity=SeverityLevel.WARNING, target_type=TargetType.API_REQUEST, lifecycle=RuleLifecycle.REQUEST_PREPARATION, scope=RuleScope.REQUEST_URL, design_aspect="URL设计", pattern=r"^/api/v\d+/[a-z0-9-]+(/[a-z0-9-]+)*$" ) # 4. 错误处理规则 - 错误响应格式 error_rule = ErrorHandlingRule( id="standard-error-response", name="标准错误响应格式规则", description="验证API错误响应是否符合标准格式", category=RuleCategory.ERROR_HANDLING, severity=SeverityLevel.WARNING, target_type=TargetType.API_RESPONSE, lifecycle=RuleLifecycle.RESPONSE_VALIDATION, scope=RuleScope.RESPONSE_BODY, error_code="*", expected_status=400 ) # 保存规则到规则库 orchestrator.rule_repo.save_rule(performance_rule) orchestrator.rule_repo.save_rule(security_rule) orchestrator.rule_repo.save_rule(restful_rule) orchestrator.rule_repo.save_rule(error_rule) logger.info("已创建基本规则集") def list_rules(rule_repo_path: str): """列出可用规则""" from ddms_compliance_suite.models.config_models import RuleRepositoryConfig, RuleStorageConfig from ddms_compliance_suite.rule_repository.repository import RuleRepository # 初始化规则库 rule_config = RuleRepositoryConfig( storage=RuleStorageConfig(path=rule_repo_path) ) repo = RuleRepository(rule_config) # 查询所有规则 rules = repo.query_rules() if not rules: print("未找到规则。使用 --create-basic-rules 创建基本规则集。") return print(f"\n找到 {len(rules)} 条规则:") # 按类别分组 rules_by_category = {} for rule in rules: category = str(rule.category) if category not in rules_by_category: rules_by_category[category] = [] rules_by_category[category].append(rule) # 打印规则 for category, category_rules in rules_by_category.items(): print(f"\n● {category}:") for rule in category_rules: print(f" - {rule.id}: {rule.name} (严重性: {rule.severity}, 版本: {rule.version})") print(f" {rule.description}") print(f" 生命周期: {rule.lifecycle}, 作用域: {rule.scope}") def save_results(summary: TestSummary, output_file: str, format_type: str): """保存测试结果""" if format_type == 'json': with open(output_file, 'w', encoding='utf-8') as f: f.write(summary.to_json(pretty=True)) logger.info(f"测试结果已保存为JSON: {output_file}") elif format_type == 'html': # 创建简单的HTML报告 html_content = f"""
总测试数: {summary.total}
通过: {summary.passed}
失败: {summary.failed}
错误: {summary.error}
跳过: {summary.skipped}
成功率: {summary.success_rate:.2f}%
总耗时: {summary.duration:.2f}秒
开始时间: {summary.start_time.isoformat()}
结束时间: {summary.end_time.isoformat() if summary.end_time else 'N/A'}
| 端点 | 状态 | 消息 | 响应码 | 耗时(秒) |
|---|---|---|---|---|
| {result.endpoint_id} | {result.status} | {result.message} | {response_code} | {result.elapsed_time:.4f} |