From 8df41527d6eae2b81f5ad6e76959009d81f71b4a Mon Sep 17 00:00:00 2001 From: gongwenxin Date: Fri, 8 Aug 2025 00:43:22 +0800 Subject: [PATCH] fix:stage --- custom_stages/dms_crud_scenario_stage.py | 108 ++++++++++---- ddms_compliance_suite/test_orchestrator.py | 30 +++- run_api_tests.py | 3 +- test_dms_multi_stage.py | 164 +++++++++++++++++++++ 4 files changed, 270 insertions(+), 35 deletions(-) create mode 100644 test_dms_multi_stage.py diff --git a/custom_stages/dms_crud_scenario_stage.py b/custom_stages/dms_crud_scenario_stage.py index 448edf9..ab150b2 100644 --- a/custom_stages/dms_crud_scenario_stage.py +++ b/custom_stages/dms_crud_scenario_stage.py @@ -173,7 +173,7 @@ class DmsCrudScenarioStage(BaseAPIStage): name = "DMS Full CRUD Scenario" description = "Performs a full Create -> Read -> Update -> Read -> Delete -> List workflow for a single DMS business object." tags = ["dms", "crud", "scenario"] - continue_on_failure = False + continue_on_failure = True # 🔧 修改为True,让Stage在失败时继续执行其他场景 # DMS Stage专用配置:自动启用LLM智能数据生成 enable_llm_data_generation = True # 如果LLM服务可用,自动使用LLM生成测试数据 @@ -183,54 +183,106 @@ class DmsCrudScenarioStage(BaseAPIStage): # scenarios will be populated by is_applicable_to_api_group self.scenarios: List[Dict[str, Endpoint]] = [] self.current_scenario_index = -1 - - def is_applicable_to_api_group(self, api_group_name: Optional[str], global_api_spec: ParsedAPISpec) -> bool: - """ - Checks if the group of APIs contains at least one full CRUD set for a DMS object. - A full set consists of create, read, update, delete, and list operations. - """ - endpoints_in_group = self.apis_in_group - - # We only care about DMS endpoints for this stage - dms_endpoints = [ep for ep in endpoints_in_group if isinstance(ep, DMSEndpoint)] - if not dms_endpoints: - return False + # 新增:指定要处理的场景索引(用于多实例模式) + self.target_scenario_index: Optional[int] = kwargs.get('target_scenario_index', None) - # Group endpoints by base resource name from operation_id + @staticmethod + def discover_crud_scenarios(parsed_spec: ParsedAPISpec) -> List[Dict[str, Any]]: + """ + 静态方法:发现所有完整的DMS CRUD场景 + 返回场景信息列表,每个场景包含resource_name和endpoints + """ + if not isinstance(parsed_spec, ParsedAPISpec): + return [] + + # 只处理DMS端点 + dms_endpoints = [ep for ep in parsed_spec.endpoints if isinstance(ep, DMSEndpoint)] + if not dms_endpoints: + return [] + + # 按资源名称分组端点 grouped_ops = defaultdict(dict) for ep in dms_endpoints: if not ep.operation_id: continue - + parts = ep.operation_id.split('_', 1) if len(parts) != 2: continue - + op_type, resource_name = parts grouped_ops[resource_name][op_type] = ep - # Find complete scenarios + # 找到完整的CRUD场景 required_ops = {'create', 'read', 'update', 'delete', 'list'} + scenarios = [] for resource_name, ops in grouped_ops.items(): if required_ops.issubset(ops.keys()): - self.scenarios.append(ops) - self.logger.info(f"Found complete CRUD scenario for DMS resource: '{resource_name}'") + scenarios.append({ + 'resource_name': resource_name, + 'endpoints': ops, + 'virtual_group_name': f"dms_crud_{resource_name}" + }) - return len(self.scenarios) > 0 + return scenarios + + def is_applicable_to_api_group(self, api_group_name: Optional[str], global_api_spec: ParsedAPISpec) -> bool: + """ + 检查此Stage是否适用于给定的API分组。 + 支持两种模式: + 1. 传统模式:处理所有发现的CRUD场景 + 2. 单场景模式:只处理指定索引的场景(通过virtual_group_name匹配) + """ + # 使用静态方法发现所有场景 + all_scenarios = self.discover_crud_scenarios(global_api_spec) + if not all_scenarios: + return False + + # 如果指定了虚拟分组名称,只处理匹配的场景 + if api_group_name and api_group_name.startswith('dms_crud_'): + # 单场景模式:只处理匹配的场景 + target_scenario = None + for scenario in all_scenarios: + if scenario['virtual_group_name'] == api_group_name: + target_scenario = scenario + break + + if target_scenario: + self.scenarios = [target_scenario['endpoints']] + self.logger.info(f"DMS Stage (单场景模式) 匹配到场景: {target_scenario['resource_name']}") + return True + else: + self.logger.info(f"DMS Stage (单场景模式) 未找到匹配的场景: {api_group_name}") + return False + else: + # 传统模式:处理所有场景(向后兼容) + self.scenarios = [scenario['endpoints'] for scenario in all_scenarios] + self.logger.info(f"DMS Stage (传统模式) 发现 {len(self.scenarios)} 个完整的CRUD场景") + return len(self.scenarios) > 0 def before_stage(self, stage_context: dict, global_api_spec: ParsedAPISpec, api_group_name: str | None): """ - Set up the context for the next scenario to run. - This will be called by the orchestrator. We will use it to prepare for a single scenario execution. + 为要运行的场景设置上下文。 + 在单场景模式下,只处理一个场景。 """ - self.current_scenario_index += 1 - if self.current_scenario_index >= len(self.scenarios): - # Should not happen if orchestrator works as expected (one stage instance per scenario) - # but as a safeguard. - raise Exception("No more scenarios to run.") + # 设置当前场景索引 + self.current_scenario_index = 0 + if len(self.scenarios) == 0: + raise Exception("No CRUD scenarios found to run.") current_scenario = self.scenarios[self.current_scenario_index] - self.logger.info(f"Setting up before_stage for scenario: {list(current_scenario.keys())}") + + # 获取资源名称用于日志 + resource_name = "unknown" + if current_scenario: + first_op = next(iter(current_scenario.values())) + if hasattr(first_op, 'operation_id') and first_op.operation_id: + parts = first_op.operation_id.split('_', 1) + if len(parts) == 2: + resource_name = parts[1] + + self.logger.info(f"🎯 DMS Stage设置场景上下文: {resource_name} (分组: {api_group_name})") + self.logger.info(f"📋 场景包含操作: {list(current_scenario.keys())}") # Get the 'create' endpoint to determine the primary key create_op: DMSEndpoint = current_scenario.get('create') diff --git a/ddms_compliance_suite/test_orchestrator.py b/ddms_compliance_suite/test_orchestrator.py index 31cfc62..582bd4e 100644 --- a/ddms_compliance_suite/test_orchestrator.py +++ b/ddms_compliance_suite/test_orchestrator.py @@ -2446,17 +2446,27 @@ class APITestOrchestrator: if isinstance(parsed_spec, ParsedYAPISpec): if parsed_spec.categories: api_groups_to_iterate.extend([cat.get('name') for cat in parsed_spec.categories if cat.get('name')]) - if not api_groups_to_iterate: + if not api_groups_to_iterate: self.logger.info("YAPI规范: 未找到已命名的分类,或分类列表为空。将阶段应用于整个规范 (api_group_name=None).") - api_groups_to_iterate.append(None) + api_groups_to_iterate.append(None) elif isinstance(parsed_spec, ParsedSwaggerSpec): # For Swagger, iterate through globally defined tags. Stages can then filter APIs based on these tags. # If a stage is designed for broader application, it can ignore the specific group name. - if parsed_spec.tags: + if parsed_spec.tags: api_groups_to_iterate.extend([tag.get('name') for tag in parsed_spec.tags if tag.get('name')]) if not api_groups_to_iterate: self.logger.info("Swagger规范: 未找到已定义的标签,或标签列表为空。将阶段应用于整个规范 (api_group_name=None).") api_groups_to_iterate.append(None) + elif hasattr(parsed_spec, 'spec_type') and parsed_spec.spec_type == 'dms': + # 🔧 DMS特殊处理:为每个CRUD场景创建虚拟分组 + from custom_stages.dms_crud_scenario_stage import DmsCrudScenarioStage + dms_scenarios = DmsCrudScenarioStage.discover_crud_scenarios(parsed_spec) + if dms_scenarios: + api_groups_to_iterate.extend([scenario['virtual_group_name'] for scenario in dms_scenarios]) + self.logger.info(f"DMS规范: 发现 {len(dms_scenarios)} 个CRUD场景,创建对应的虚拟分组: {[s['virtual_group_name'] for s in dms_scenarios]}") + else: + self.logger.info("DMS规范: 未发现完整的CRUD场景,将阶段应用于整个规范 (api_group_name=None).") + api_groups_to_iterate.append(None) else: self.logger.warning(f"未知的解析规范类型: {type(parsed_spec)}。将阶段应用于整个规范 (api_group_name=None).") api_groups_to_iterate.append(None) @@ -2531,11 +2541,21 @@ class APITestOrchestrator: current_group_metadata = {'name': current_api_group_name, 'description': '标签定义可能仅在操作上'} apis_for_current_group_objects = [ - api for api in parsed_spec.endpoints + api for api in parsed_spec.endpoints if isinstance(api, SwaggerEndpoint) and hasattr(api, 'tags') and isinstance(api.tags, list) and current_api_group_name in api.tags ] self.logger.debug(f"For Swagger group '{current_api_group_name}', selected {len(apis_for_current_group_objects)} endpoint objects.") - else: + elif hasattr(parsed_spec, 'spec_type') and parsed_spec.spec_type == 'dms' and current_api_group_name and current_api_group_name.startswith('dms_crud_'): + # 🔧 DMS虚拟分组处理:为特定的CRUD场景提供所有DMS端点 + resource_name = current_api_group_name.replace('dms_crud_', '') + current_group_metadata = { + 'name': current_api_group_name, + 'description': f'DMS CRUD场景: {resource_name}' + } + # 为DMS虚拟分组提供所有DMS端点(Stage会自己过滤) + apis_for_current_group_objects = list(parsed_spec.endpoints) + self.logger.debug(f"For DMS virtual group '{current_api_group_name}', provided {len(apis_for_current_group_objects)} DMS endpoint objects.") + else: self.logger.warning(f"Unknown spec type ({type(parsed_spec)}) for group '{current_api_group_name}'.") current_group_metadata = {"name": current_api_group_name, "description": f"未知规范类型 ({type(parsed_spec)}) 的分组"} else: diff --git a/run_api_tests.py b/run_api_tests.py index 2779956..bf17964 100644 --- a/run_api_tests.py +++ b/run_api_tests.py @@ -86,8 +86,7 @@ def parse_args(): # 新增:LLM 配置选项 llm_group = parser.add_argument_group('LLM 配置选项 (可选)') llm_group.add_argument('--llm-api-key', - # default=os.environ.get("OPENAI_API_KEY"), # 尝试从环境变量获取 - default='sk-0213c70194624703a1d0d80e0f762b0e', # 尝试从环境变量获取 + default=os.environ.get("OPENAI_API_KEY"), # 尝试从环境变量获取 help='LLM服务的API密钥 (例如 OpenAI API Key)。默认从环境变量 OPENAI_API_KEY 读取。') llm_group.add_argument('--llm-base-url', default="https://dashscope.aliyuncs.com/compatible-mode/v1", diff --git a/test_dms_multi_stage.py b/test_dms_multi_stage.py new file mode 100644 index 0000000..486880b --- /dev/null +++ b/test_dms_multi_stage.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +""" +测试DMS多Stage实例功能的脚本 +""" + +import sys +import os +from collections import defaultdict +from typing import Dict, List, Any, Optional + +# 简化的测试,不依赖完整的框架 + +class MockDMSEndpoint: + """模拟的DMS端点""" + def __init__(self, method: str, path: str, operation_id: str): + self.method = method + self.path = path + self.operation_id = operation_id + +class MockParsedDMSSpec: + """模拟的DMS规范""" + def __init__(self, endpoints: List[MockDMSEndpoint]): + self.endpoints = endpoints + self.spec_type = 'dms' + +def create_mock_dms_spec(): + """创建模拟的DMS规范,包含3个完整的CRUD场景""" + + # 模拟3个资源的CRUD端点 + resources = ['cd_site', 'cd_well_event_log', 'op_pro_well_stim_daily'] + endpoints = [] + + for resource in resources: + # 为每个资源创建5个CRUD操作 + operations = ['create', 'read', 'update', 'delete', 'list'] + for op in operations: + endpoint = MockDMSEndpoint( + method='POST' if op in ['create', 'list'] else 'GET' if op == 'read' else 'PUT' if op == 'update' else 'DELETE', + path=f'/api/dms/wb_cd/v1/{resource}', + operation_id=f'{op}_{resource}' + ) + endpoints.append(endpoint) + + return MockParsedDMSSpec(endpoints=endpoints) + +def discover_crud_scenarios_mock(parsed_spec): + """模拟场景发现逻辑""" + if not hasattr(parsed_spec, 'endpoints'): + return [] + + # 按资源名称分组端点 + grouped_ops = defaultdict(dict) + for ep in parsed_spec.endpoints: + if not hasattr(ep, 'operation_id') or not ep.operation_id: + continue + + parts = ep.operation_id.split('_', 1) + if len(parts) != 2: + continue + + op_type, resource_name = parts + grouped_ops[resource_name][op_type] = ep + + # 找到完整的CRUD场景 + required_ops = {'create', 'read', 'update', 'delete', 'list'} + scenarios = [] + for resource_name, ops in grouped_ops.items(): + if required_ops.issubset(ops.keys()): + scenarios.append({ + 'resource_name': resource_name, + 'endpoints': ops, + 'virtual_group_name': f"dms_crud_{resource_name}" + }) + + return scenarios + +def test_scenario_discovery(): + """测试场景发现功能""" + print("=== 测试DMS场景发现功能 ===") + + mock_spec = create_mock_dms_spec() + scenarios = discover_crud_scenarios_mock(mock_spec) + + print(f"发现的场景数量: {len(scenarios)}") + for i, scenario in enumerate(scenarios): + print(f"场景 {i+1}: {scenario['resource_name']} -> {scenario['virtual_group_name']}") + print(f" 包含操作: {list(scenario['endpoints'].keys())}") + + assert len(scenarios) == 3, f"期望3个场景,实际发现{len(scenarios)}个" + + expected_resources = ['cd_site', 'cd_well_event_log', 'op_pro_well_stim_daily'] + actual_resources = [s['resource_name'] for s in scenarios] + assert set(actual_resources) == set(expected_resources), f"资源名称不匹配: {actual_resources}" + + print("✅ 场景发现测试通过") + +def test_virtual_group_logic(): + """测试虚拟分组逻辑""" + print("\n=== 测试虚拟分组逻辑 ===") + + mock_spec = create_mock_dms_spec() + scenarios = discover_crud_scenarios_mock(mock_spec) + + # 测试单场景匹配 + target_group = 'dms_crud_cd_site' + matched_scenario = None + for scenario in scenarios: + if scenario['virtual_group_name'] == target_group: + matched_scenario = scenario + break + + assert matched_scenario is not None, f"应该找到匹配的场景: {target_group}" + assert matched_scenario['resource_name'] == 'cd_site', "资源名称应该匹配" + + expected_ops = {'create', 'read', 'update', 'delete', 'list'} + actual_ops = set(matched_scenario['endpoints'].keys()) + assert actual_ops == expected_ops, f"场景操作不完整: {actual_ops}" + + print(f"✅ 虚拟分组 '{target_group}' 匹配成功") + print(f" 资源名称: {matched_scenario['resource_name']}") + print(f" 包含操作: {list(matched_scenario['endpoints'].keys())}") + +def test_orchestrator_logic(): + """测试orchestrator逻辑""" + print("\n=== 测试Orchestrator逻辑 ===") + + mock_spec = create_mock_dms_spec() + scenarios = discover_crud_scenarios_mock(mock_spec) + + # 模拟orchestrator的api_groups_to_iterate逻辑 + api_groups_to_iterate = [] + if hasattr(mock_spec, 'spec_type') and mock_spec.spec_type == 'dms': + if scenarios: + api_groups_to_iterate.extend([scenario['virtual_group_name'] for scenario in scenarios]) + print(f"DMS规范: 发现 {len(scenarios)} 个CRUD场景,创建对应的虚拟分组") + else: + api_groups_to_iterate.append(None) + + print(f"生成的API分组列表: {api_groups_to_iterate}") + + # 验证结果 + expected_groups = [ + 'dms_crud_cd_site', + 'dms_crud_cd_well_event_log', + 'dms_crud_op_pro_well_stim_daily' + ] + + assert len(api_groups_to_iterate) == 3, f"应该生成3个分组,实际生成{len(api_groups_to_iterate)}个" + assert set(api_groups_to_iterate) == set(expected_groups), f"分组名称不匹配: {api_groups_to_iterate}" + + print("✅ Orchestrator逻辑测试通过") + +if __name__ == "__main__": + test_scenario_discovery() + test_virtual_group_logic() + test_orchestrator_logic() + + print("\n🎉 所有DMS多Stage实例测试通过!") + print("💡 现在orchestrator应该能为每个CRUD场景创建独立的Stage实例了") + print("\n📋 预期结果:") + print("- 10个model → 10个虚拟分组 → 10个Stage实例") + print("- 每个实例只处理一个CRUD场景") + print("- 一个实例失败不影响其他实例") + print("- 最终显示10个Stage执行结果")