"""Base Pydantic models for the application.""" from pydantic import BaseModel, Field from typing import Optional, List, Dict, Any, Union, Literal from enum import Enum class SeverityLevel(str, Enum): ERROR = "error" WARNING = "warning" INFO = "info" class RuleCategory(str, Enum): JSON_SCHEMA = "JSONSchema" API_LINTING = "APILinting" BUSINESS_LOGIC = "BusinessLogic" DATA_QUALITY = "DataQuality" SECURITY = "Security" PERFORMANCE = "Performance" MAPPING = "Mapping" PYTHON_CODE = "PythonCode" # Python代码规则类别 API_DESIGN = "APIDesign" # API设计规则类别 ERROR_HANDLING = "ErrorHandling" # 错误处理规则类别 COMPATIBILITY = "Compatibility" # 兼容性规则类别 GENERIC = "Generic" class TargetType(str, Enum): API_REQUEST = "APIRequest" API_RESPONSE = "APIResponse" DATA_OBJECT = "DataObject" OPENAPI_SPECIFICATION = "OpenAPISpecification" GENERIC_TARGET = "GenericTarget" class RuleLifecycle(str, Enum): """规则生命周期枚举,定义规则在测试流程中的适用阶段""" REQUEST_PREPARATION = "请求准备阶段" RESPONSE_VALIDATION = "响应验证阶段" POST_VALIDATION = "后处理阶段" API_SPECIFICATION_VALIDATION = "API规范验证阶段" # 新增的生命周期 ANY_STAGE = "任意阶段" class RuleScope(str, Enum): """规则作用域枚举,定义规则针对的具体对象""" REQUEST_URL = "请求URL" REQUEST_HEADERS = "请求头" REQUEST_PARAMS = "请求参数" REQUEST_BODY = "请求体" RESPONSE_STATUS = "响应状态码" RESPONSE_HEADERS = "响应头" RESPONSE_BODY = "响应体" RESPONSE_TIME = "响应时间" SECURITY = "安全性" PERFORMANCE = "性能" ANY_SCOPE = "任意作用域" class BaseRule(BaseModel): """通用规则属性 (Pydantic基类 BaseRule)""" id: str # 规则唯一ID name: str # 规则名称 description: Optional[str] = None # 规则详细描述 category: RuleCategory # 规则类别 version: str = "1.0.0" # 版本号 severity: SeverityLevel = SeverityLevel.INFO # 严重性 source: Optional[str] = None # 规则来源 (例如 "PlatformStandard-XYZ", "OWASP-ASVS") is_enabled: bool = True # 是否启用 tags: Optional[List[str]] = None # 规则标签 (如 "critical", "wellbore") target_type: Optional[TargetType] = None # 规则适用目标类型 (描述性元数据) target_identifier: Optional[str] = None # 规则适用目标的具体标识 (描述性元数据) author: Optional[str] = None # 规则作者 # 新增字段 lifecycle: Optional[RuleLifecycle] = Field( default=RuleLifecycle.ANY_STAGE, description="规则的推荐适用阶段或类型。主要用于识别 API_SPECIFICATION_VALIDATION (静态分析) 规则。" "对于其他动态规则,此字段为元数据提示,实际执行阶段由 TestStep 定义。" ) scope: Optional[RuleScope] = Field( default=RuleScope.ANY_SCOPE, description="描述规则逻辑主要关注的API交互部分 (例如,请求头、响应体)。" "此字段为描述性元数据,用于规则分类和理解,不作为执行时的强制约束。" ) code: Optional[str] = None # 规则验证代码 class JSONSchemaDefinition(BaseRule): """JSON Schema 定义 (用于API请求/响应体结构验证)""" category: Literal[RuleCategory.JSON_SCHEMA] = RuleCategory.JSON_SCHEMA schema_content: Dict[str, Any] # 实际的JSON Schema class APILintingRuleset(BaseRule): """API 设计规范/Linting规则 (例如,基于Spectral的OpenAPI规范校验规则集)""" category: Literal[RuleCategory.API_LINTING] = RuleCategory.API_LINTING ruleset_format: str = "spectral" # 例如 "spectral", "custom_json_rules" ruleset_content: Union[str, Dict[str, Any]] # 规则集内容或其引用(例如,文件路径或URL,或直接嵌入的规则字典) class BusinessAssertionTemplate(BaseRule): """业务逻辑断言模板或具体规则""" category: Literal[RuleCategory.BUSINESS_LOGIC] = RuleCategory.BUSINESS_LOGIC template_language: str = "python_expression" # 如 "python_expression", "jsonpath_assert" template_expression: str expected_parameters: Optional[List[str]] = None # 执行断言模板时需要输入的参数列表 class DataQualityRule(BaseRule): """数据质量校验规则""" category: Literal[RuleCategory.DATA_QUALITY] = RuleCategory.DATA_QUALITY # Specific fields for data quality rules, e.g.: field_name: Optional[str] = None validation_type: str # e.g., "regex", "range", "custom_function" validation_expression: str # regex pattern, range boundaries, function name/path class PythonCodeRule(BaseRule): """使用 Python 代码定义的规则 此类规则允许以 Python 代码片段形式定义复杂的验证逻辑,提供了最大的灵活性。 代码将在受控环境中执行,以确保安全性。 """ category: Literal[RuleCategory.PYTHON_CODE] = RuleCategory.PYTHON_CODE # Python 代码内容或文件路径 code_file: Optional[str] = None # 脚本入口函数名,默认为 validate entry_function: str = "validate" # 预期的入口函数参数 expected_parameters: Optional[List[str]] = None # 是否允许导入外部模块 (出于安全考虑,默认为 False) allow_imports: bool = False # 允许导入的模块列表 (如果 allow_imports 为 True) allowed_modules: Optional[List[str]] = None # 超时设置(秒),防止无限循环等恶意代码 timeout: int = 5 # 代码依赖的其他规则ID depends_on: Optional[List[str]] = None # 新增规则类型 class PerformanceRule(BaseRule): """性能相关规则,如响应时间、吞吐量等""" category: Literal[RuleCategory.PERFORMANCE] = RuleCategory.PERFORMANCE lifecycle: Literal[RuleLifecycle.RESPONSE_VALIDATION] = RuleLifecycle.RESPONSE_VALIDATION scope: Literal[RuleScope.RESPONSE_TIME] = RuleScope.RESPONSE_TIME threshold: Union[float, int] # 性能阈值 metric: str # 性能指标名称 unit: str = "ms" # 默认单位:毫秒 class SecurityRule(BaseRule): """安全相关规则,如身份验证、授权、加密等""" category: Literal[RuleCategory.SECURITY] = RuleCategory.SECURITY check_type: str # 安全检查类型(如认证、授权、加密等) expected_value: Optional[str] = None # 预期值 class RESTfulDesignRule(BaseRule): """RESTful API设计规则,如HTTP方法使用、URL设计等""" category: Literal[RuleCategory.API_DESIGN] = RuleCategory.API_DESIGN design_aspect: str # 设计方面(如URL、HTTP方法、参数等) pattern: Optional[str] = None # 匹配模式(如正则表达式) class ErrorHandlingRule(BaseRule): """错误处理规则,如错误码、错误消息等""" category: Literal[RuleCategory.ERROR_HANDLING] = RuleCategory.ERROR_HANDLING error_code: str # 错误码 expected_status: int # 预期HTTP状态码 expected_message: Optional[str] = None # 预期错误消息 # 更新联合类型以包含新规则类型 AnyRule = Union[ JSONSchemaDefinition, APILintingRuleset, BusinessAssertionTemplate, DataQualityRule, PythonCodeRule, PerformanceRule, SecurityRule, RESTfulDesignRule, ErrorHandlingRule, BaseRule # 通用或未明确分类的规则 ] class RuleQuery(BaseModel): """规则查询条件对象 (Pydantic模型)""" rule_id: Optional[str] = None category: Optional[RuleCategory] = None target_type: Optional[TargetType] = None target_identifier: Optional[str] = None version: Optional[str] = "latest" # 或特殊标识 ("latest", "stable", or specific version) tags: Optional[List[str]] = None is_enabled: Optional[bool] = True # 默认查询已启用的规则 # 新增字段 lifecycle: Optional[RuleLifecycle] = None scope: Optional[RuleScope] = None