190 lines
8.0 KiB
Python
190 lines
8.0 KiB
Python
"""Base Pydantic models for the application."""
|
||
from pydantic import BaseModel, Field
|
||
from typing import Optional, List, Dict, Any, Union, Literal
|
||
from enum import Enum
|
||
|
||
class SeverityLevel(str, Enum):
|
||
ERROR = "error"
|
||
WARNING = "warning"
|
||
INFO = "info"
|
||
|
||
class RuleCategory(str, Enum):
|
||
JSON_SCHEMA = "JSONSchema"
|
||
API_LINTING = "APILinting"
|
||
BUSINESS_LOGIC = "BusinessLogic"
|
||
DATA_QUALITY = "DataQuality"
|
||
SECURITY = "Security"
|
||
PERFORMANCE = "Performance"
|
||
MAPPING = "Mapping"
|
||
PYTHON_CODE = "PythonCode" # Python代码规则类别
|
||
API_DESIGN = "APIDesign" # API设计规则类别
|
||
ERROR_HANDLING = "ErrorHandling" # 错误处理规则类别
|
||
COMPATIBILITY = "Compatibility" # 兼容性规则类别
|
||
GENERIC = "Generic"
|
||
|
||
class TargetType(str, Enum):
|
||
API_REQUEST = "APIRequest"
|
||
API_RESPONSE = "APIResponse"
|
||
DATA_OBJECT = "DataObject"
|
||
OPENAPI_SPECIFICATION = "OpenAPISpecification"
|
||
GENERIC_TARGET = "GenericTarget"
|
||
|
||
class RuleLifecycle(str, Enum):
|
||
"""规则生命周期枚举,定义规则在测试流程中的适用阶段"""
|
||
REQUEST_PREPARATION = "请求准备阶段"
|
||
RESPONSE_VALIDATION = "响应验证阶段"
|
||
POST_VALIDATION = "后处理阶段"
|
||
API_SPECIFICATION_VALIDATION = "API规范验证阶段" # 新增的生命周期
|
||
ANY_STAGE = "任意阶段"
|
||
|
||
class RuleScope(str, Enum):
|
||
"""规则作用域枚举,定义规则针对的具体对象"""
|
||
REQUEST_URL = "请求URL"
|
||
REQUEST_HEADERS = "请求头"
|
||
REQUEST_PARAMS = "请求参数"
|
||
REQUEST_BODY = "请求体"
|
||
RESPONSE_STATUS = "响应状态码"
|
||
RESPONSE_HEADERS = "响应头"
|
||
RESPONSE_BODY = "响应体"
|
||
RESPONSE_TIME = "响应时间"
|
||
SECURITY = "安全性"
|
||
PERFORMANCE = "性能"
|
||
ANY_SCOPE = "任意作用域"
|
||
|
||
class BaseRule(BaseModel):
|
||
"""通用规则属性 (Pydantic基类 BaseRule)"""
|
||
id: str # 规则唯一ID
|
||
name: str # 规则名称
|
||
description: Optional[str] = None # 规则详细描述
|
||
category: RuleCategory # 规则类别
|
||
version: str = "1.0.0" # 版本号
|
||
severity: SeverityLevel = SeverityLevel.INFO # 严重性
|
||
source: Optional[str] = None # 规则来源 (例如 "PlatformStandard-XYZ", "OWASP-ASVS")
|
||
is_enabled: bool = True # 是否启用
|
||
tags: Optional[List[str]] = None # 规则标签 (如 "critical", "wellbore")
|
||
target_type: Optional[TargetType] = None # 规则适用目标类型 (描述性元数据)
|
||
target_identifier: Optional[str] = None # 规则适用目标的具体标识 (描述性元数据)
|
||
author: Optional[str] = None # 规则作者
|
||
# 新增字段
|
||
lifecycle: Optional[RuleLifecycle] = Field(
|
||
default=RuleLifecycle.ANY_STAGE,
|
||
description="规则的推荐适用阶段或类型。主要用于识别 API_SPECIFICATION_VALIDATION (静态分析) 规则。"
|
||
"对于其他动态规则,此字段为元数据提示,实际执行阶段由 TestStep 定义。"
|
||
)
|
||
scope: Optional[RuleScope] = Field(
|
||
default=RuleScope.ANY_SCOPE,
|
||
description="描述规则逻辑主要关注的API交互部分 (例如,请求头、响应体)。"
|
||
"此字段为描述性元数据,用于规则分类和理解,不作为执行时的强制约束。"
|
||
)
|
||
code: Optional[str] = None # 规则验证代码
|
||
|
||
class JSONSchemaDefinition(BaseRule):
|
||
"""JSON Schema 定义 (用于API请求/响应体结构验证)"""
|
||
category: Literal[RuleCategory.JSON_SCHEMA] = RuleCategory.JSON_SCHEMA
|
||
schema_content: Dict[str, Any] # 实际的JSON Schema
|
||
|
||
class APILintingRuleset(BaseRule):
|
||
"""API 设计规范/Linting规则 (例如,基于Spectral的OpenAPI规范校验规则集)"""
|
||
category: Literal[RuleCategory.API_LINTING] = RuleCategory.API_LINTING
|
||
ruleset_format: str = "spectral" # 例如 "spectral", "custom_json_rules"
|
||
ruleset_content: Union[str, Dict[str, Any]] # 规则集内容或其引用(例如,文件路径或URL,或直接嵌入的规则字典)
|
||
|
||
class BusinessAssertionTemplate(BaseRule):
|
||
"""业务逻辑断言模板或具体规则"""
|
||
category: Literal[RuleCategory.BUSINESS_LOGIC] = RuleCategory.BUSINESS_LOGIC
|
||
template_language: str = "python_expression" # 如 "python_expression", "jsonpath_assert"
|
||
template_expression: str
|
||
expected_parameters: Optional[List[str]] = None # 执行断言模板时需要输入的参数列表
|
||
|
||
class DataQualityRule(BaseRule):
|
||
"""数据质量校验规则"""
|
||
category: Literal[RuleCategory.DATA_QUALITY] = RuleCategory.DATA_QUALITY
|
||
# Specific fields for data quality rules, e.g.:
|
||
field_name: Optional[str] = None
|
||
validation_type: str # e.g., "regex", "range", "custom_function"
|
||
validation_expression: str # regex pattern, range boundaries, function name/path
|
||
|
||
class PythonCodeRule(BaseRule):
|
||
"""使用 Python 代码定义的规则
|
||
|
||
此类规则允许以 Python 代码片段形式定义复杂的验证逻辑,提供了最大的灵活性。
|
||
代码将在受控环境中执行,以确保安全性。
|
||
"""
|
||
category: Literal[RuleCategory.PYTHON_CODE] = RuleCategory.PYTHON_CODE
|
||
|
||
# Python 代码内容或文件路径
|
||
code_file: Optional[str] = None
|
||
|
||
# 脚本入口函数名,默认为 validate
|
||
entry_function: str = "validate"
|
||
|
||
# 预期的入口函数参数
|
||
expected_parameters: Optional[List[str]] = None
|
||
|
||
# 是否允许导入外部模块 (出于安全考虑,默认为 False)
|
||
allow_imports: bool = False
|
||
|
||
# 允许导入的模块列表 (如果 allow_imports 为 True)
|
||
allowed_modules: Optional[List[str]] = None
|
||
|
||
# 超时设置(秒),防止无限循环等恶意代码
|
||
timeout: int = 5
|
||
|
||
# 代码依赖的其他规则ID
|
||
depends_on: Optional[List[str]] = None
|
||
|
||
# 新增规则类型
|
||
class PerformanceRule(BaseRule):
|
||
"""性能相关规则,如响应时间、吞吐量等"""
|
||
category: Literal[RuleCategory.PERFORMANCE] = RuleCategory.PERFORMANCE
|
||
lifecycle: Literal[RuleLifecycle.RESPONSE_VALIDATION] = RuleLifecycle.RESPONSE_VALIDATION
|
||
scope: Literal[RuleScope.RESPONSE_TIME] = RuleScope.RESPONSE_TIME
|
||
threshold: Union[float, int] # 性能阈值
|
||
metric: str # 性能指标名称
|
||
unit: str = "ms" # 默认单位:毫秒
|
||
|
||
class SecurityRule(BaseRule):
|
||
"""安全相关规则,如身份验证、授权、加密等"""
|
||
category: Literal[RuleCategory.SECURITY] = RuleCategory.SECURITY
|
||
check_type: str # 安全检查类型(如认证、授权、加密等)
|
||
expected_value: Optional[str] = None # 预期值
|
||
|
||
class RESTfulDesignRule(BaseRule):
|
||
"""RESTful API设计规则,如HTTP方法使用、URL设计等"""
|
||
category: Literal[RuleCategory.API_DESIGN] = RuleCategory.API_DESIGN
|
||
design_aspect: str # 设计方面(如URL、HTTP方法、参数等)
|
||
pattern: Optional[str] = None # 匹配模式(如正则表达式)
|
||
|
||
class ErrorHandlingRule(BaseRule):
|
||
"""错误处理规则,如错误码、错误消息等"""
|
||
category: Literal[RuleCategory.ERROR_HANDLING] = RuleCategory.ERROR_HANDLING
|
||
error_code: str # 错误码
|
||
expected_status: int # 预期HTTP状态码
|
||
expected_message: Optional[str] = None # 预期错误消息
|
||
|
||
# 更新联合类型以包含新规则类型
|
||
AnyRule = Union[
|
||
JSONSchemaDefinition,
|
||
APILintingRuleset,
|
||
BusinessAssertionTemplate,
|
||
DataQualityRule,
|
||
PythonCodeRule,
|
||
PerformanceRule,
|
||
SecurityRule,
|
||
RESTfulDesignRule,
|
||
ErrorHandlingRule,
|
||
BaseRule # 通用或未明确分类的规则
|
||
]
|
||
|
||
class RuleQuery(BaseModel):
|
||
"""规则查询条件对象 (Pydantic模型)"""
|
||
rule_id: Optional[str] = None
|
||
category: Optional[RuleCategory] = None
|
||
target_type: Optional[TargetType] = None
|
||
target_identifier: Optional[str] = None
|
||
version: Optional[str] = "latest" # 或特殊标识 ("latest", "stable", or specific version)
|
||
tags: Optional[List[str]] = None
|
||
is_enabled: Optional[bool] = True # 默认查询已启用的规则
|
||
# 新增字段
|
||
lifecycle: Optional[RuleLifecycle] = None
|
||
scope: Optional[RuleScope] = None |