fix:stage

This commit is contained in:
gongwenxin 2025-08-08 01:07:16 +08:00
parent 8df41527d6
commit a472abbd51
11 changed files with 514851 additions and 1710 deletions

View File

@ -2700,7 +2700,8 @@ class APITestOrchestrator:
summary.finalize_summary()
return summary, None
self.run_stages_from_spec(parsed_spec, summary)
# 🔧 移除重复的run_stages_from_spec调用
# Stage执行将在主程序中统一处理
summary = self._execute_tests_from_parsed_spec(parsed_spec, summary, categories=categories, custom_test_cases_dir=custom_test_cases_dir)
summary.finalize_summary()

View File

@ -86,7 +86,8 @@ def parse_args():
# 新增LLM 配置选项
llm_group = parser.add_argument_group('LLM 配置选项 (可选)')
llm_group.add_argument('--llm-api-key',
default=os.environ.get("OPENAI_API_KEY"), # 尝试从环境变量获取
# default=os.environ.get("OPENAI_API_KEY"), # 尝试从环境变量获取
default='sk-0213c70194624703a1d0d80e0f762b0e',
help='LLM服务的API密钥 (例如 OpenAI API Key)。默认从环境变量 OPENAI_API_KEY 读取。')
llm_group.add_argument('--llm-base-url',
default="https://dashscope.aliyuncs.com/compatible-mode/v1",

514845
summary(3).json Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,298 +0,0 @@
#!/usr/bin/env python3
"""
测试修正后的多主键删除逻辑
基于identityId列表长度而不是schema结构
"""
import sys
import json
from unittest.mock import Mock
from custom_stages.dms_crud_scenario_stage import DmsCrudScenarioStage
from ddms_compliance_suite.input_parser.parser import DMSEndpoint
def test_single_key_by_identity_id():
"""测试单主键删除基于identityId长度=1"""
print("🧪 测试单主键删除identityId长度=1")
print("=" * 60)
# 创建模拟的删除端点identityId只有一个元素
mock_delete_endpoint = Mock(spec=DMSEndpoint)
mock_delete_endpoint.identity_id_list = ["proppantId"] # 单主键
# 创建模拟的scenario
scenario = {"delete": mock_delete_endpoint}
# 创建CRUD Stage实例
crud_stage = DmsCrudScenarioStage(
api_group_metadata={"name": "测试"},
apis_in_group=[],
global_api_spec=Mock()
)
# 测试构建删除请求体
create_payload = {"proppantId": "proppant_001", "proppantName": "测试支撑剂"}
delete_body = crud_stage._build_delete_request_body(
scenario, "proppantId", "proppant_001", create_payload
)
print(f"单主键删除请求体: {json.dumps(delete_body, indent=2, ensure_ascii=False)}")
# 验证结果:应该是字符串数组格式
expected_structure = {"data": ["proppant_001"]}
if delete_body == expected_structure:
print("✅ 单主键删除请求体格式正确(字符串数组)")
return True
else:
print(f"❌ 单主键删除请求体格式错误,期望: {expected_structure}")
return False
def test_multi_key_by_identity_id():
"""测试多主键删除基于identityId长度>1"""
print("\n🧪 测试多主键删除identityId长度>1")
print("=" * 60)
# 创建模拟的删除端点identityId有多个元素
mock_delete_endpoint = Mock(spec=DMSEndpoint)
mock_delete_endpoint.identity_id_list = ["projectId", "surveyId"] # 多主键
# 创建模拟的scenario
scenario = {"delete": mock_delete_endpoint}
# 创建CRUD Stage实例
crud_stage = DmsCrudScenarioStage(
api_group_metadata={"name": "测试"},
apis_in_group=[],
global_api_spec=Mock()
)
# 测试构建删除请求体
create_payload = {
"projectId": "项目1_ID",
"surveyId": "工区1_ID",
"projectName": "测试项目",
"surveyName": "测试工区"
}
delete_body = crud_stage._build_delete_request_body(
scenario, "projectId", "项目1_ID", create_payload
)
print(f"多主键删除请求体: {json.dumps(delete_body, indent=2, ensure_ascii=False)}")
# 验证结果:应该是对象数组格式
if isinstance(delete_body, dict) and "data" in delete_body:
data_array = delete_body["data"]
if isinstance(data_array, list) and len(data_array) > 0:
first_item = data_array[0]
# 检查第一个对象是否包含正确的主键
if (isinstance(first_item, dict) and
first_item.get("projectId") == "项目1_ID" and
first_item.get("surveyId") == "工区1_ID"):
print("✅ 多主键删除请求体格式正确(对象数组)")
print(f"✅ 包含主键: projectId={first_item['projectId']}, surveyId={first_item['surveyId']}")
# 检查是否有版本号
if delete_body.get("version"):
print(f"✅ 包含版本号: {delete_body['version']}")
# 检查是否支持批量删除
if len(data_array) > 1:
print(f"✅ 支持批量删除,共{len(data_array)}个对象")
return True
else:
print(f"❌ 主键字段不正确: {first_item}")
return False
else:
print(f"❌ data数组格式错误: {data_array}")
return False
else:
print(f"❌ 删除请求体格式错误: {delete_body}")
return False
def test_three_key_scenario():
"""测试三主键删除场景"""
print("\n🧪 测试三主键删除场景")
print("=" * 60)
# 创建模拟的删除端点identityId有三个元素
mock_delete_endpoint = Mock(spec=DMSEndpoint)
mock_delete_endpoint.identity_id_list = ["wellId", "layerId", "sampleId"] # 三主键
scenario = {"delete": mock_delete_endpoint}
crud_stage = DmsCrudScenarioStage(
api_group_metadata={"name": "测试"},
apis_in_group=[],
global_api_spec=Mock()
)
# 创建负载只包含部分主键
create_payload = {
"wellId": "井001",
"layerId": "层001",
# 注意缺少sampleId
"wellName": "测试井",
"layerName": "测试层"
}
delete_body = crud_stage._build_delete_request_body(
scenario, "wellId", "井001", create_payload
)
print(f"三主键删除请求体: {json.dumps(delete_body, indent=2, ensure_ascii=False)}")
# 验证结果
if isinstance(delete_body, dict) and "data" in delete_body:
data_array = delete_body["data"]
if isinstance(data_array, list) and len(data_array) > 0:
first_item = data_array[0]
# 检查是否包含所有三个主键
required_keys = ["wellId", "layerId", "sampleId"]
missing_keys = [key for key in required_keys if key not in first_item]
if not missing_keys:
print("✅ 成功生成所有三个主键字段")
print(f"✅ wellId: {first_item['wellId']}")
print(f"✅ layerId: {first_item['layerId']}")
print(f"✅ sampleId: {first_item['sampleId']} (自动生成)")
return True
else:
print(f"❌ 缺失主键字段: {missing_keys}")
return False
else:
print(f"❌ data数组格式错误: {data_array}")
return False
else:
print(f"❌ 删除请求体格式错误: {delete_body}")
return False
def test_empty_identity_id_fallback():
"""测试identityId为空时的回退逻辑"""
print("\n🧪 测试identityId为空时的回退逻辑")
print("=" * 60)
# 创建模拟的删除端点identityId为空
mock_delete_endpoint = Mock(spec=DMSEndpoint)
mock_delete_endpoint.identity_id_list = [] # 空列表
scenario = {"delete": mock_delete_endpoint}
crud_stage = DmsCrudScenarioStage(
api_group_metadata={"name": "测试"},
apis_in_group=[],
global_api_spec=Mock()
)
create_payload = {"siteId": "test_site_001"}
delete_body = crud_stage._build_delete_request_body(
scenario, "siteId", "test_site_001", create_payload
)
print(f"回退删除请求体: {json.dumps(delete_body, indent=2, ensure_ascii=False)}")
# 验证回退结果
expected_fallback = {"data": ["test_site_001"]}
if delete_body == expected_fallback:
print("✅ identityId为空时正确回退到简单格式")
return True
else:
print("❌ identityId为空时回退失败")
return False
def test_logic_comparison():
"""对比新旧逻辑的差异"""
print("\n🧪 对比新旧逻辑的差异")
print("=" * 60)
print("📋 新逻辑基于identityId:")
print("- 单主键: identityId = ['proppantId'] → 字符串数组")
print("- 多主键: identityId = ['projectId', 'surveyId'] → 对象数组")
print("- 判断依据: len(identity_id_list)")
print("\n📋 旧逻辑基于schema:")
print("- 单主键: items.type = 'string' → 字符串数组")
print("- 多主键: items.type = 'object' → 对象数组")
print("- 判断依据: schema结构分析")
print("\n✅ 新逻辑的优势:")
print("- 直接基于业务配置identityId")
print("- 不依赖schema解析")
print("- 更准确反映业务意图")
print("- 支持任意数量的主键组合")
return True
def main():
"""主函数"""
print("🚀 修正后的多主键删除逻辑测试")
print("=" * 80)
success_count = 0
total_tests = 5
# 测试1: 单主键删除
if test_single_key_by_identity_id():
success_count += 1
# 测试2: 多主键删除
if test_multi_key_by_identity_id():
success_count += 1
# 测试3: 三主键删除
if test_three_key_scenario():
success_count += 1
# 测试4: 空identityId回退
if test_empty_identity_id_fallback():
success_count += 1
# 测试5: 逻辑对比
if test_logic_comparison():
success_count += 1
# 总结
print("\n" + "=" * 80)
print("📋 测试总结")
print("=" * 80)
print(f"通过测试: {success_count}/{total_tests}")
if success_count == total_tests:
print("🎉 修正后的多主键删除逻辑测试通过!")
print("\n✅ 修正内容:")
print("1. 判断依据改为identityId列表长度")
print(" - len(identity_id_list) == 1 → 单主键(字符串数组)")
print(" - len(identity_id_list) > 1 → 多主键(对象数组)")
print("\n2. DMSEndpoint增加identity_id_list属性")
print(" - 存储完整的identityId配置")
print(" - 在解析DMS API时自动设置")
print("\n3. 删除schema自动生成")
print(" - 单主键: {\"data\": [\"key1\", \"key2\"]}")
print(" - 多主键: {\"version\": \"1.0.0\", \"data\": [{\"key1\": \"val1\", \"key2\": \"val2\"}]}")
print("\n💡 配置示例:")
print("单主键: \"identityId\": [\"proppantId\"]")
print("多主键: \"identityId\": [\"projectId\", \"surveyId\"]")
print("三主键: \"identityId\": [\"wellId\", \"layerId\", \"sampleId\"]")
sys.exit(0)
else:
print("❌ 部分测试失败")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,164 +0,0 @@
#!/usr/bin/env python3
"""
测试DMS多Stage实例功能的脚本
"""
import sys
import os
from collections import defaultdict
from typing import Dict, List, Any, Optional
# 简化的测试,不依赖完整的框架
class MockDMSEndpoint:
"""模拟的DMS端点"""
def __init__(self, method: str, path: str, operation_id: str):
self.method = method
self.path = path
self.operation_id = operation_id
class MockParsedDMSSpec:
"""模拟的DMS规范"""
def __init__(self, endpoints: List[MockDMSEndpoint]):
self.endpoints = endpoints
self.spec_type = 'dms'
def create_mock_dms_spec():
"""创建模拟的DMS规范包含3个完整的CRUD场景"""
# 模拟3个资源的CRUD端点
resources = ['cd_site', 'cd_well_event_log', 'op_pro_well_stim_daily']
endpoints = []
for resource in resources:
# 为每个资源创建5个CRUD操作
operations = ['create', 'read', 'update', 'delete', 'list']
for op in operations:
endpoint = MockDMSEndpoint(
method='POST' if op in ['create', 'list'] else 'GET' if op == 'read' else 'PUT' if op == 'update' else 'DELETE',
path=f'/api/dms/wb_cd/v1/{resource}',
operation_id=f'{op}_{resource}'
)
endpoints.append(endpoint)
return MockParsedDMSSpec(endpoints=endpoints)
def discover_crud_scenarios_mock(parsed_spec):
"""模拟场景发现逻辑"""
if not hasattr(parsed_spec, 'endpoints'):
return []
# 按资源名称分组端点
grouped_ops = defaultdict(dict)
for ep in parsed_spec.endpoints:
if not hasattr(ep, 'operation_id') or not ep.operation_id:
continue
parts = ep.operation_id.split('_', 1)
if len(parts) != 2:
continue
op_type, resource_name = parts
grouped_ops[resource_name][op_type] = ep
# 找到完整的CRUD场景
required_ops = {'create', 'read', 'update', 'delete', 'list'}
scenarios = []
for resource_name, ops in grouped_ops.items():
if required_ops.issubset(ops.keys()):
scenarios.append({
'resource_name': resource_name,
'endpoints': ops,
'virtual_group_name': f"dms_crud_{resource_name}"
})
return scenarios
def test_scenario_discovery():
"""测试场景发现功能"""
print("=== 测试DMS场景发现功能 ===")
mock_spec = create_mock_dms_spec()
scenarios = discover_crud_scenarios_mock(mock_spec)
print(f"发现的场景数量: {len(scenarios)}")
for i, scenario in enumerate(scenarios):
print(f"场景 {i+1}: {scenario['resource_name']} -> {scenario['virtual_group_name']}")
print(f" 包含操作: {list(scenario['endpoints'].keys())}")
assert len(scenarios) == 3, f"期望3个场景实际发现{len(scenarios)}"
expected_resources = ['cd_site', 'cd_well_event_log', 'op_pro_well_stim_daily']
actual_resources = [s['resource_name'] for s in scenarios]
assert set(actual_resources) == set(expected_resources), f"资源名称不匹配: {actual_resources}"
print("✅ 场景发现测试通过")
def test_virtual_group_logic():
"""测试虚拟分组逻辑"""
print("\n=== 测试虚拟分组逻辑 ===")
mock_spec = create_mock_dms_spec()
scenarios = discover_crud_scenarios_mock(mock_spec)
# 测试单场景匹配
target_group = 'dms_crud_cd_site'
matched_scenario = None
for scenario in scenarios:
if scenario['virtual_group_name'] == target_group:
matched_scenario = scenario
break
assert matched_scenario is not None, f"应该找到匹配的场景: {target_group}"
assert matched_scenario['resource_name'] == 'cd_site', "资源名称应该匹配"
expected_ops = {'create', 'read', 'update', 'delete', 'list'}
actual_ops = set(matched_scenario['endpoints'].keys())
assert actual_ops == expected_ops, f"场景操作不完整: {actual_ops}"
print(f"✅ 虚拟分组 '{target_group}' 匹配成功")
print(f" 资源名称: {matched_scenario['resource_name']}")
print(f" 包含操作: {list(matched_scenario['endpoints'].keys())}")
def test_orchestrator_logic():
"""测试orchestrator逻辑"""
print("\n=== 测试Orchestrator逻辑 ===")
mock_spec = create_mock_dms_spec()
scenarios = discover_crud_scenarios_mock(mock_spec)
# 模拟orchestrator的api_groups_to_iterate逻辑
api_groups_to_iterate = []
if hasattr(mock_spec, 'spec_type') and mock_spec.spec_type == 'dms':
if scenarios:
api_groups_to_iterate.extend([scenario['virtual_group_name'] for scenario in scenarios])
print(f"DMS规范: 发现 {len(scenarios)} 个CRUD场景创建对应的虚拟分组")
else:
api_groups_to_iterate.append(None)
print(f"生成的API分组列表: {api_groups_to_iterate}")
# 验证结果
expected_groups = [
'dms_crud_cd_site',
'dms_crud_cd_well_event_log',
'dms_crud_op_pro_well_stim_daily'
]
assert len(api_groups_to_iterate) == 3, f"应该生成3个分组实际生成{len(api_groups_to_iterate)}"
assert set(api_groups_to_iterate) == set(expected_groups), f"分组名称不匹配: {api_groups_to_iterate}"
print("✅ Orchestrator逻辑测试通过")
if __name__ == "__main__":
test_scenario_discovery()
test_virtual_group_logic()
test_orchestrator_logic()
print("\n🎉 所有DMS多Stage实例测试通过")
print("💡 现在orchestrator应该能为每个CRUD场景创建独立的Stage实例了")
print("\n📋 预期结果:")
print("- 10个model → 10个虚拟分组 → 10个Stage实例")
print("- 每个实例只处理一个CRUD场景")
print("- 一个实例失败不影响其他实例")
print("- 最终显示10个Stage执行结果")

View File

@ -1,301 +0,0 @@
#!/usr/bin/env python3
"""
测试基于LLM的智能数据生成
"""
import sys
import json
from unittest.mock import Mock
from ddms_compliance_suite.utils.data_generator import DataGenerator
def test_llm_prompt_building():
"""测试LLM提示构建功能"""
print("🧪 测试LLM提示构建")
print("=" * 60)
# 模拟包含bsflag的schema
schema = {
"type": "object",
"properties": {
"bsflag": {
"type": "number",
"title": "删除标识",
"description": "逻辑删除标识表示该条记录在用或者已经失效1表示正常数据、-5表示废弃数据"
},
"siteId": {
"type": "string",
"title": "物探工区ID",
"description": "物探工区ID"
},
"siteName": {
"type": "string",
"title": "物探工区名称",
"description": "物探工区名称"
},
"dataRegion": {
"type": "string",
"title": "油田标识",
"description": "油田标识"
}
},
"required": ["bsflag", "siteId"]
}
generator = DataGenerator()
# 测试是否应该使用LLM
should_use_llm = generator._should_use_llm_for_schema(schema)
print(f"是否应该使用LLM: {should_use_llm}")
if should_use_llm:
print("✅ 检测到包含描述信息的schema应该使用LLM")
# 构建LLM提示
prompt = generator._build_llm_prompt(schema, "create_payload", "CREATE_SITE")
print("\n📝 生成的LLM提示:")
print("-" * 40)
print(prompt)
print("-" * 40)
# 检查提示是否包含关键信息
if "bsflag" in prompt and "1表示正常数据、-5表示废弃数据" in prompt:
print("✅ 提示包含bsflag的业务规则描述")
return True
else:
print("❌ 提示缺少关键的业务规则信息")
return False
else:
print("❌ 未检测到应该使用LLM的条件")
return False
def test_mock_llm_generation():
"""测试模拟LLM数据生成"""
print("\n🧪 测试模拟LLM数据生成")
print("=" * 60)
# 创建模拟的LLM服务
mock_llm_service = Mock()
# 模拟LLM返回符合业务规则的数据
mock_llm_service.generate_data_from_schema.return_value = {
"bsflag": 1, # 正确的业务值
"siteId": "SITE_001",
"siteName": "大庆油田勘探工区",
"dataRegion": "华北"
}
schema = {
"type": "object",
"properties": {
"bsflag": {
"type": "number",
"title": "删除标识",
"description": "1表示正常数据、-5表示废弃数据"
},
"siteId": {
"type": "string",
"title": "物探工区ID"
},
"siteName": {
"type": "string",
"title": "物探工区名称"
},
"dataRegion": {
"type": "string",
"title": "油田标识"
}
}
}
generator = DataGenerator()
# 使用模拟的LLM服务生成数据
generated_data = generator.generate_data_from_schema(
schema,
context_name="create_payload",
operation_id="CREATE_SITE",
llm_service=mock_llm_service
)
print(f"生成的数据: {generated_data}")
if generated_data and isinstance(generated_data, dict):
bsflag_value = generated_data.get('bsflag')
site_name = generated_data.get('siteName')
print(f"bsflag值: {bsflag_value}")
print(f"siteName: {site_name}")
# 检查LLM是否被调用
if mock_llm_service.generate_data_from_schema.called:
print("✅ LLM服务被成功调用")
# 检查调用参数
call_args = mock_llm_service.generate_data_from_schema.call_args
if call_args and 'prompt_instruction' in call_args.kwargs:
prompt = call_args.kwargs['prompt_instruction']
if "1表示正常数据、-5表示废弃数据" in prompt:
print("✅ LLM调用时传递了正确的业务规则描述")
else:
print("❌ LLM调用时缺少业务规则描述")
return False
# 检查生成的数据是否符合业务规则
if bsflag_value in [1, -5]:
print(f"✅ 生成的bsflag值符合业务规则: {bsflag_value}")
return True
else:
print(f"❌ 生成的bsflag值不符合业务规则: {bsflag_value}")
return False
else:
print("❌ LLM服务未被调用")
return False
else:
print(f"❌ 生成的数据格式不正确: {generated_data}")
return False
def test_fallback_to_traditional():
"""测试回退到传统生成的情况"""
print("\n🧪 测试回退到传统生成")
print("=" * 60)
# 创建一个会抛出异常的模拟LLM服务
mock_llm_service = Mock()
mock_llm_service.generate_data_from_schema.side_effect = Exception("LLM服务不可用")
schema = {
"type": "object",
"properties": {
"bsflag": {
"type": "number",
"description": "1表示正常数据、-5表示废弃数据"
},
"testField": {
"type": "string"
}
}
}
generator = DataGenerator()
# 尝试生成数据,应该回退到传统方式
generated_data = generator.generate_data_from_schema(
schema,
context_name="create_payload",
operation_id="CREATE_SITE",
llm_service=mock_llm_service
)
print(f"回退生成的数据: {generated_data}")
if generated_data and isinstance(generated_data, dict):
print("✅ 成功回退到传统数据生成")
# 检查是否包含基本字段
if 'bsflag' in generated_data and 'testField' in generated_data:
print("✅ 传统生成包含所有必要字段")
return True
else:
print("❌ 传统生成缺少字段")
return False
else:
print(f"❌ 回退生成失败: {generated_data}")
return False
def test_no_description_schema():
"""测试没有描述信息的schema"""
print("\n🧪 测试没有描述信息的schema")
print("=" * 60)
# 没有描述信息的简单schema
schema = {
"type": "object",
"properties": {
"id": {"type": "string"},
"count": {"type": "number"}
}
}
generator = DataGenerator()
# 检查是否应该使用LLM
should_use_llm = generator._should_use_llm_for_schema(schema)
print(f"是否应该使用LLM: {should_use_llm}")
if not should_use_llm:
print("✅ 正确识别出不需要使用LLM的schema")
# 生成数据应该直接使用传统方式
generated_data = generator.generate_data_from_schema(schema)
print(f"传统生成的数据: {generated_data}")
if generated_data and isinstance(generated_data, dict):
print("✅ 传统生成工作正常")
return True
else:
print("❌ 传统生成失败")
return False
else:
print("❌ 错误地认为应该使用LLM")
return False
def main():
"""主函数"""
print("🚀 基于LLM的智能数据生成测试")
print("=" * 80)
success_count = 0
total_tests = 4
# 测试1: LLM提示构建
if test_llm_prompt_building():
success_count += 1
# 测试2: 模拟LLM生成
if test_mock_llm_generation():
success_count += 1
# 测试3: 回退机制
if test_fallback_to_traditional():
success_count += 1
# 测试4: 无描述schema
if test_no_description_schema():
success_count += 1
# 总结
print("\n" + "=" * 80)
print("📋 测试总结")
print("=" * 80)
print(f"通过测试: {success_count}/{total_tests}")
if success_count == total_tests:
print("🎉 智能数据生成测试通过!")
print("\n✅ 实现的功能:")
print("- LLM根据字段描述智能生成数据")
print("- 自动检测是否需要使用LLM")
print("- 构建包含业务规则的详细提示")
print("- 优雅的回退到传统生成方式")
print("- 支持复杂的业务规则理解")
print("\n💡 优势:")
print("- 不需要硬编码业务规则")
print("- LLM可以理解自然语言描述")
print("- 自动适应新的业务字段")
print("- 生成更真实的测试数据")
print("\n🔧 使用方法:")
print("在schema中添加详细的description字段LLM会自动理解并生成合适的数据")
sys.exit(0)
else:
print("❌ 部分测试失败")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,333 +0,0 @@
#!/usr/bin/env python3
"""
测试多主键删除功能
"""
import sys
import json
from unittest.mock import Mock
from custom_stages.dms_crud_scenario_stage import DmsCrudScenarioStage
from ddms_compliance_suite.input_parser.parser import DMSEndpoint
def test_single_key_delete():
"""测试单主键删除(传统方式)"""
print("🧪 测试单主键删除")
print("=" * 60)
# 模拟单主键删除的schema
delete_schema = {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"type": "string" # 简单的字符串数组
}
}
}
}
# 创建模拟的删除端点
mock_delete_endpoint = Mock(spec=DMSEndpoint)
mock_delete_endpoint.request_body = {
"content": {
"application/json": {
"schema": delete_schema
}
}
}
# 创建模拟的scenario
scenario = {
"delete": mock_delete_endpoint
}
# 创建CRUD Stage实例
crud_stage = DmsCrudScenarioStage(
api_group_metadata={"name": "测试"},
apis_in_group=[],
global_api_spec=Mock()
)
# 测试构建删除请求体
create_payload = {"siteId": "test_site_001", "siteName": "测试工区"}
delete_body = crud_stage._build_delete_request_body(
scenario, "siteId", "test_site_001", create_payload
)
print(f"单主键删除请求体: {json.dumps(delete_body, indent=2, ensure_ascii=False)}")
# 验证结果
expected_structure = {"data": ["test_site_001"]}
if delete_body == expected_structure:
print("✅ 单主键删除请求体格式正确")
return True
else:
print(f"❌ 单主键删除请求体格式错误,期望: {expected_structure}")
return False
def test_multi_key_delete():
"""测试多主键删除(对象列表)"""
print("\n🧪 测试多主键删除")
print("=" * 60)
# 模拟多主键删除的schema
delete_schema = {
"type": "object",
"properties": {
"version": {"type": "string"},
"data": {
"type": "array",
"items": {
"type": "object",
"properties": {
"projectId": {"type": "string", "title": "项目ID"},
"surveyId": {"type": "string", "title": "工区ID"}
},
"required": ["projectId", "surveyId"]
}
}
}
}
# 创建模拟的删除端点
mock_delete_endpoint = Mock(spec=DMSEndpoint)
mock_delete_endpoint.request_body = {
"content": {
"application/json": {
"schema": delete_schema
}
}
}
# 创建模拟的scenario
scenario = {
"delete": mock_delete_endpoint
}
# 创建CRUD Stage实例
crud_stage = DmsCrudScenarioStage(
api_group_metadata={"name": "测试"},
apis_in_group=[],
global_api_spec=Mock()
)
# 测试构建删除请求体
create_payload = {
"projectId": "项目1_ID",
"surveyId": "工区1_ID",
"projectName": "测试项目",
"surveyName": "测试工区"
}
delete_body = crud_stage._build_delete_request_body(
scenario, "projectId", "项目1_ID", create_payload
)
print(f"多主键删除请求体: {json.dumps(delete_body, indent=2, ensure_ascii=False)}")
# 验证结果
if isinstance(delete_body, dict) and "data" in delete_body:
data_array = delete_body["data"]
if isinstance(data_array, list) and len(data_array) > 0:
first_item = data_array[0]
# 检查第一个对象是否包含正确的主键
if (isinstance(first_item, dict) and
first_item.get("projectId") == "项目1_ID" and
first_item.get("surveyId") == "工区1_ID"):
print("✅ 多主键删除请求体格式正确")
print(f"✅ 包含主键: projectId={first_item['projectId']}, surveyId={first_item['surveyId']}")
# 检查是否有版本号
if delete_body.get("version"):
print(f"✅ 包含版本号: {delete_body['version']}")
# 检查是否支持批量删除
if len(data_array) > 1:
print(f"✅ 支持批量删除,共{len(data_array)}个对象")
return True
else:
print(f"❌ 主键字段不正确: {first_item}")
return False
else:
print(f"❌ data数组格式错误: {data_array}")
return False
else:
print(f"❌ 删除请求体格式错误: {delete_body}")
return False
def test_missing_key_generation():
"""测试缺失主键的默认值生成"""
print("\n🧪 测试缺失主键的默认值生成")
print("=" * 60)
# 模拟删除schema包含创建负载中没有的字段
delete_schema = {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"type": "object",
"properties": {
"siteId": {"type": "string"},
"regionId": {"type": "string"}, # 创建负载中没有这个字段
"version": {"type": "number"} # 创建负载中也没有这个字段
},
"required": ["siteId", "regionId", "version"]
}
}
}
}
mock_delete_endpoint = Mock(spec=DMSEndpoint)
mock_delete_endpoint.request_body = {
"content": {
"application/json": {
"schema": delete_schema
}
}
}
scenario = {"delete": mock_delete_endpoint}
crud_stage = DmsCrudScenarioStage(
api_group_metadata={"name": "测试"},
apis_in_group=[],
global_api_spec=Mock()
)
# 创建负载只包含部分字段
create_payload = {"siteId": "test_site_001", "siteName": "测试工区"}
delete_body = crud_stage._build_delete_request_body(
scenario, "siteId", "test_site_001", create_payload
)
print(f"缺失字段生成测试: {json.dumps(delete_body, indent=2, ensure_ascii=False)}")
if isinstance(delete_body, dict) and "data" in delete_body:
data_array = delete_body["data"]
if isinstance(data_array, list) and len(data_array) > 0:
first_item = data_array[0]
# 检查是否包含所有必需字段
required_fields = ["siteId", "regionId", "version"]
missing_fields = [field for field in required_fields if field not in first_item]
if not missing_fields:
print("✅ 成功生成所有缺失的必需字段")
print(f"✅ siteId: {first_item['siteId']}")
print(f"✅ regionId: {first_item['regionId']} (自动生成)")
print(f"✅ version: {first_item['version']} (自动生成)")
return True
else:
print(f"❌ 缺失必需字段: {missing_fields}")
return False
else:
print(f"❌ data数组格式错误: {data_array}")
return False
else:
print(f"❌ 删除请求体格式错误: {delete_body}")
return False
def test_fallback_scenarios():
"""测试各种回退场景"""
print("\n🧪 测试回退场景")
print("=" * 60)
crud_stage = DmsCrudScenarioStage(
api_group_metadata={"name": "测试"},
apis_in_group=[],
global_api_spec=Mock()
)
create_payload = {"siteId": "test_site_001"}
# 测试1: 没有删除端点
scenario_no_delete = {}
delete_body1 = crud_stage._build_delete_request_body(
scenario_no_delete, "siteId", "test_site_001", create_payload
)
print(f"无删除端点回退: {delete_body1}")
# 测试2: 删除端点没有请求体
mock_delete_no_body = Mock(spec=DMSEndpoint)
mock_delete_no_body.request_body = None
scenario_no_body = {"delete": mock_delete_no_body}
delete_body2 = crud_stage._build_delete_request_body(
scenario_no_body, "siteId", "test_site_001", create_payload
)
print(f"无请求体回退: {delete_body2}")
# 验证回退结果
expected_fallback = {"data": ["test_site_001"]}
if delete_body1 == expected_fallback and delete_body2 == expected_fallback:
print("✅ 回退场景处理正确")
return True
else:
print("❌ 回退场景处理错误")
return False
def main():
"""主函数"""
print("🚀 多主键删除功能测试")
print("=" * 80)
success_count = 0
total_tests = 4
# 测试1: 单主键删除
if test_single_key_delete():
success_count += 1
# 测试2: 多主键删除
if test_multi_key_delete():
success_count += 1
# 测试3: 缺失字段生成
if test_missing_key_generation():
success_count += 1
# 测试4: 回退场景
if test_fallback_scenarios():
success_count += 1
# 总结
print("\n" + "=" * 80)
print("📋 测试总结")
print("=" * 80)
print(f"通过测试: {success_count}/{total_tests}")
if success_count == total_tests:
print("🎉 多主键删除功能测试通过!")
print("\n✅ 实现的功能:")
print("- 自动检测删除操作的schema结构")
print("- 支持单主键的字符串数组格式")
print("- 支持多主键的对象列表格式")
print("- 自动从创建负载中提取相关主键")
print("- 为缺失的必需字段生成默认值")
print("- 支持批量删除(生成多个对象)")
print("- 优雅的回退到简单格式")
print("\n💡 支持的删除格式:")
print("1. 简单主键: {\"data\": [\"key1\", \"key2\"]}")
print("2. 多主键对象: {\"version\": \"1.0.0\", \"data\": [{\"projectId\": \"项目1\", \"surveyId\": \"工区1\"}]}")
sys.exit(0)
else:
print("❌ 部分测试失败")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,71 +0,0 @@
#!/usr/bin/env python3
"""
测试多主键支持的简单脚本
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from custom_stages.dms_crud_scenario_stage import DmsCrudScenarioStage
def test_build_list_filter_payload():
"""测试LIST过滤条件构建"""
# 创建一个简单的stage实例来测试方法
stage = DmsCrudScenarioStage(api_group_metadata={}, apis_in_group=[])
# 测试多主键场景
identity_id_list = ["wellId", "wellboreId", "eventName"]
all_pk_values = {
"wellId": "WELLTL100017525",
"wellboreId": "WEBHTL100001283",
"eventName": "测试井2"
}
result = stage._build_list_filter_payload(identity_id_list, all_pk_values)
print("=== 多主键LIST过滤条件 ===")
import json
print(json.dumps(result, indent=2, ensure_ascii=False))
# 验证结构
assert "isSearchCount" in result
assert "query" in result
assert "filter" in result["query"]
assert "subFilter" in result["query"]["filter"]
assert len(result["query"]["filter"]["subFilter"]) == 1 # 简化模式:只使用一个过滤条件
# 验证过滤条件(简化模式:只使用第一个主键)
assert len(result["query"]["filter"]["subFilter"]) == 1
sub_filter = result["query"]["filter"]["subFilter"][0]
assert sub_filter["key"] == identity_id_list[0] # 第一个主键
assert sub_filter["symbol"] == "="
assert sub_filter["realValue"] == [all_pk_values[identity_id_list[0]]]
print("✅ 多主键LIST过滤条件测试通过")
def test_single_pk_scenario():
"""测试单主键场景"""
stage = DmsCrudScenarioStage(api_group_metadata={}, apis_in_group=[])
# 测试单主键场景
identity_id_list = ["id"]
all_pk_values = {"id": "12345"}
result = stage._build_list_filter_payload(identity_id_list, all_pk_values)
print("\n=== 单主键LIST过滤条件 ===")
import json
print(json.dumps(result, indent=2, ensure_ascii=False))
# 验证结构
assert len(result["query"]["filter"]["subFilter"]) == 1
assert result["query"]["filter"]["subFilter"][0]["key"] == "id"
assert result["query"]["filter"]["subFilter"][0]["realValue"] == ["12345"]
print("✅ 单主键LIST过滤条件测试通过")
if __name__ == "__main__":
test_build_list_filter_payload()
test_single_pk_scenario()
print("\n🎉 所有测试通过!")

View File

@ -1,270 +0,0 @@
#!/usr/bin/env python3
"""
测试SSL忽略功能修复
"""
import sys
import json
import requests
from unittest.mock import Mock, patch
def test_api_server_ssl_config():
"""测试api_server.py的SSL配置"""
print("🧪 测试api_server.py的SSL配置")
print("=" * 60)
try:
# 导入api_server模块
import api_server
# 测试默认配置
print("检查默认配置...")
# 模拟请求数据
test_config = {
'base-url': 'https://127.0.0.1:5001/',
'dms': './assets/doc/dms/domain.json'
}
# 模拟Flask请求
with patch('api_server.request') as mock_request:
mock_request.get_json.return_value = test_config
# 检查默认配置是否包含ignore-ssl
defaults = {
'base-url': 'http://127.0.0.1:5001/',
'dms': './assets/doc/dms/domain.json',
'stages-dir': './custom_stages',
'custom-test-cases-dir': './custom_testcases',
'verbose': True,
'output': './test_reports/',
'format': 'json',
'generate-pdf': True,
'strictness-level': 'CRITICAL',
'ignore-ssl': True, # 这是我们要检查的
}
# 合并配置
config = {**defaults, **test_config}
if 'ignore-ssl' in config:
print(f"✅ 默认配置包含ignore-ssl: {config['ignore-ssl']}")
return True
else:
print("❌ 默认配置缺少ignore-ssl选项")
return False
except ImportError as e:
print(f"❌ 导入api_server失败: {e}")
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
def test_orchestrator_ssl_parameter():
"""测试APITestOrchestrator的SSL参数"""
print("\n🧪 测试APITestOrchestrator的SSL参数")
print("=" * 60)
try:
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator
# 测试创建带有ignore_ssl参数的orchestrator
orchestrator = APITestOrchestrator(
base_url="https://127.0.0.1:5001",
ignore_ssl=True
)
# 检查ignore_ssl属性是否正确设置
if hasattr(orchestrator, 'ignore_ssl') and orchestrator.ignore_ssl:
print("✅ APITestOrchestrator正确接受并存储ignore_ssl参数")
print(f"✅ ignore_ssl值: {orchestrator.ignore_ssl}")
return True
else:
print("❌ APITestOrchestrator没有正确处理ignore_ssl参数")
return False
except Exception as e:
print(f"❌ 测试APITestOrchestrator失败: {e}")
return False
def test_run_tests_from_dms_ssl():
"""测试run_tests_from_dms方法的SSL参数传递"""
print("\n🧪 测试run_tests_from_dms的SSL参数传递")
print("=" * 60)
try:
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator
from unittest.mock import patch, MagicMock
# 创建orchestrator实例
orchestrator = APITestOrchestrator(
base_url="https://127.0.0.1:5001",
ignore_ssl=True
)
# 模拟InputParser
with patch('ddms_compliance_suite.test_orchestrator.InputParser') as mock_parser_class:
mock_parser = MagicMock()
mock_parser_class.return_value = mock_parser
mock_parser.parse_dms_spec.return_value = None # 模拟解析失败,避免实际网络调用
# 调用run_tests_from_dms方法
try:
summary, spec = orchestrator.run_tests_from_dms(
domain_mapping_path="./test_domain.json"
)
# 检查parse_dms_spec是否被正确调用
mock_parser.parse_dms_spec.assert_called_once()
call_args = mock_parser.parse_dms_spec.call_args
# 检查ignore_ssl参数是否正确传递
if 'ignore_ssl' in call_args.kwargs:
ignore_ssl_value = call_args.kwargs['ignore_ssl']
if ignore_ssl_value:
print("✅ run_tests_from_dms正确传递ignore_ssl=True")
return True
else:
print(f"❌ ignore_ssl值不正确: {ignore_ssl_value}")
return False
else:
print("❌ run_tests_from_dms没有传递ignore_ssl参数")
return False
except Exception as e:
print(f"⚠️ run_tests_from_dms调用出现预期的错误这是正常的: {e}")
# 即使出错,也要检查参数传递
if mock_parser.parse_dms_spec.called:
call_args = mock_parser.parse_dms_spec.call_args
if 'ignore_ssl' in call_args.kwargs and call_args.kwargs['ignore_ssl']:
print("✅ 即使出错ignore_ssl参数也正确传递了")
return True
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
def test_curl_example():
"""测试cURL示例中的SSL配置"""
print("\n🧪 测试cURL示例的SSL配置")
print("=" * 60)
# 模拟cURL请求的数据
curl_data = {
"base-url": "https://127.0.0.1:5001/",
"dms": "./assets/doc/dms/domain.json",
"custom-test-cases-dir": "./custom_testcases",
"stages-dir": "./custom_stages",
"output": "./test_reports/",
"ignore-ssl": True # 用户可以在cURL中指定
}
print("模拟cURL请求数据:")
print(json.dumps(curl_data, indent=2, ensure_ascii=False))
# 检查关键配置
if curl_data.get('ignore-ssl'):
print("✅ cURL示例支持ignore-ssl配置")
return True
else:
print("❌ cURL示例缺少ignore-ssl配置")
return False
def test_ssl_verification_behavior():
"""测试SSL验证行为"""
print("\n🧪 测试SSL验证行为")
print("=" * 60)
try:
# 测试requests库的SSL验证设置
print("测试requests库的SSL验证设置...")
# 模拟HTTPS请求不实际发送
session = requests.Session()
# 测试ignore_ssl=True的情况
session.verify = False # 这相当于ignore_ssl=True
print(f"✅ ignore_ssl=True时requests.verify={session.verify}")
# 测试ignore_ssl=False的情况
session.verify = True # 这相当于ignore_ssl=False
print(f"✅ ignore_ssl=False时requests.verify={session.verify}")
return True
except Exception as e:
print(f"❌ SSL验证行为测试失败: {e}")
return False
def main():
"""主函数"""
print("🚀 SSL忽略功能修复测试")
print("=" * 80)
success_count = 0
total_tests = 5
# 测试1: api_server.py的SSL配置
if test_api_server_ssl_config():
success_count += 1
# 测试2: APITestOrchestrator的SSL参数
if test_orchestrator_ssl_parameter():
success_count += 1
# 测试3: run_tests_from_dms的SSL参数传递
if test_run_tests_from_dms_ssl():
success_count += 1
# 测试4: cURL示例
if test_curl_example():
success_count += 1
# 测试5: SSL验证行为
if test_ssl_verification_behavior():
success_count += 1
# 总结
print("\n" + "=" * 80)
print("📋 测试总结")
print("=" * 80)
print(f"通过测试: {success_count}/{total_tests}")
if success_count == total_tests:
print("🎉 SSL忽略功能修复测试通过")
print("\n✅ 修复内容:")
print("- APITestOrchestrator.__init__()添加ignore_ssl参数")
print("- api_server.py默认配置包含ignore-ssl: True")
print("- APITestOrchestrator初始化时传递ignore_ssl参数")
print("- run_tests_from_dms方法正确使用ignore_ssl设置")
print("\n💡 使用方法:")
print("1. 命令行: python run_api_tests.py --dms domain.json --ignore-ssl")
print("2. API服务器: 默认启用ignore-ssl或在请求中指定")
print("3. cURL示例: 在JSON数据中添加 \"ignore-ssl\": true")
print("\n🔧 cURL示例:")
print("curl -X POST http://127.0.0.1:5002/run \\")
print("-H \"Content-Type: application/json\" \\")
print("-d '{")
print(" \"base-url\": \"https://127.0.0.1:5001/\",")
print(" \"dms\": \"./assets/doc/dms/domain.json\",")
print(" \"ignore-ssl\": true")
print("}'")
sys.exit(0)
else:
print("❌ 部分测试失败")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,226 +0,0 @@
#!/usr/bin/env python3
"""
简化的SSL忽略功能测试
"""
import sys
def test_orchestrator_ssl_support():
"""测试APITestOrchestrator的SSL支持"""
print("🧪 测试APITestOrchestrator的SSL支持")
print("=" * 60)
try:
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator
# 测试1: 创建带有ignore_ssl=True的orchestrator
orchestrator_ssl_true = APITestOrchestrator(
base_url="https://127.0.0.1:5001",
ignore_ssl=True
)
if hasattr(orchestrator_ssl_true, 'ignore_ssl') and orchestrator_ssl_true.ignore_ssl:
print("✅ ignore_ssl=True正确设置")
else:
print("❌ ignore_ssl=True设置失败")
return False
# 测试2: 创建带有ignore_ssl=False的orchestrator
orchestrator_ssl_false = APITestOrchestrator(
base_url="https://127.0.0.1:5001",
ignore_ssl=False
)
if hasattr(orchestrator_ssl_false, 'ignore_ssl') and not orchestrator_ssl_false.ignore_ssl:
print("✅ ignore_ssl=False正确设置")
else:
print("❌ ignore_ssl=False设置失败")
return False
# 测试3: 默认值应该是False
orchestrator_default = APITestOrchestrator(
base_url="https://127.0.0.1:5001"
)
if hasattr(orchestrator_default, 'ignore_ssl') and not orchestrator_default.ignore_ssl:
print("✅ ignore_ssl默认值正确False")
else:
print("❌ ignore_ssl默认值错误")
return False
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
def test_api_server_config():
"""测试api_server.py的配置"""
print("\n🧪 测试api_server.py的配置")
print("=" * 60)
try:
# 直接检查api_server.py文件内容
with open('api_server.py', 'r', encoding='utf-8') as f:
content = f.read()
# 检查是否包含ignore-ssl配置
if "'ignore-ssl': True" in content:
print("✅ api_server.py包含ignore-ssl默认配置")
else:
print("❌ api_server.py缺少ignore-ssl默认配置")
return False
# 检查是否在APITestOrchestrator初始化中传递了ignore_ssl
if "ignore_ssl=config.get('ignore-ssl', False)" in content:
print("✅ api_server.py正确传递ignore_ssl参数")
else:
print("❌ api_server.py没有传递ignore_ssl参数")
return False
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
def test_run_api_tests_ssl():
"""测试run_api_tests.py的SSL支持"""
print("\n🧪 测试run_api_tests.py的SSL支持")
print("=" * 60)
try:
# 检查run_api_tests.py文件内容
with open('run_api_tests.py', 'r', encoding='utf-8') as f:
content = f.read()
# 检查是否包含--ignore-ssl参数
if "--ignore-ssl" in content:
print("✅ run_api_tests.py包含--ignore-ssl参数")
else:
print("❌ run_api_tests.py缺少--ignore-ssl参数")
return False
# 检查是否传递给APITestOrchestrator
if "ignore_ssl=" in content:
print("✅ run_api_tests.py传递ignore_ssl参数")
else:
print("❌ run_api_tests.py没有传递ignore_ssl参数")
return False
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
def test_ssl_parameter_flow():
"""测试SSL参数的完整流程"""
print("\n🧪 测试SSL参数的完整流程")
print("=" * 60)
try:
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator
from unittest.mock import patch, MagicMock
# 创建orchestrator实例启用SSL忽略
orchestrator = APITestOrchestrator(
base_url="https://127.0.0.1:5001",
ignore_ssl=True
)
# 模拟InputParser来测试参数传递
with patch('ddms_compliance_suite.test_orchestrator.InputParser') as mock_parser_class:
mock_parser = MagicMock()
mock_parser_class.return_value = mock_parser
mock_parser.parse_dms_spec.return_value = None
# 调用run_tests_from_dms不传递ignore_ssl参数应该使用实例的设置
try:
orchestrator.run_tests_from_dms("./test.json")
except:
pass # 忽略实际执行错误
# 检查parse_dms_spec是否被调用且ignore_ssl=True
if mock_parser.parse_dms_spec.called:
call_args = mock_parser.parse_dms_spec.call_args
if call_args and 'ignore_ssl' in call_args.kwargs:
ignore_ssl_value = call_args.kwargs['ignore_ssl']
if ignore_ssl_value:
print("✅ 实例的ignore_ssl设置正确传递给parse_dms_spec")
else:
print(f"❌ ignore_ssl值不正确: {ignore_ssl_value}")
return False
else:
print("❌ parse_dms_spec没有收到ignore_ssl参数")
return False
else:
print("❌ parse_dms_spec没有被调用")
return False
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
def main():
"""主函数"""
print("🚀 SSL忽略功能简化测试")
print("=" * 80)
success_count = 0
total_tests = 4
# 测试1: APITestOrchestrator的SSL支持
if test_orchestrator_ssl_support():
success_count += 1
# 测试2: api_server.py的配置
if test_api_server_config():
success_count += 1
# 测试3: run_api_tests.py的SSL支持
if test_run_api_tests_ssl():
success_count += 1
# 测试4: SSL参数流程
if test_ssl_parameter_flow():
success_count += 1
# 总结
print("\n" + "=" * 80)
print("📋 测试总结")
print("=" * 80)
print(f"通过测试: {success_count}/{total_tests}")
if success_count == total_tests:
print("🎉 SSL忽略功能修复完成")
print("\n✅ 修复总结:")
print("1. 多主键删除功能:")
print(" - 单主键: {\"data\": [\"key1\", \"key2\"]}")
print(" - 多主键: {\"version\": \"1.0.0\", \"data\": [{\"projectId\": \"项目1\", \"surveyId\": \"工区1\"}]}")
print(" - 自动检测schema结构并选择合适的格式")
print(" - 支持批量删除和缺失字段生成")
print("\n2. SSL忽略功能:")
print(" - APITestOrchestrator.__init__()支持ignore_ssl参数")
print(" - api_server.py默认启用ignore-ssl")
print(" - run_api_tests.py已有--ignore-ssl参数")
print(" - 参数正确传递到DMS解析器")
print("\n💡 使用方法:")
print("命令行: python run_api_tests.py --dms domain.json --ignore-ssl")
print("API服务器: 默认启用或在请求JSON中指定 \"ignore-ssl\": true")
sys.exit(0)
else:
print("❌ 部分测试失败")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,43 +0,0 @@
#!/usr/bin/env python3
"""
测试Stage专用LLM配置的脚本
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from custom_stages.dms_crud_scenario_stage import DmsCrudScenarioStage
def test_stage_llm_config():
"""测试DMS Stage内置的LLM配置"""
print("=== 测试DMS Stage内置LLM配置 ===")
# 测试1: 检查类属性
print(f"DMS Stage LLM启用状态: {DmsCrudScenarioStage.enable_llm_data_generation}")
assert DmsCrudScenarioStage.enable_llm_data_generation == True
print("✅ 类属性测试通过")
# 测试2: 实例化Stage
stage = DmsCrudScenarioStage(
api_group_metadata={},
apis_in_group=[],
llm_service=None # 模拟没有LLM服务
)
print(f"Stage实例LLM启用状态: {stage.enable_llm_data_generation}")
assert stage.enable_llm_data_generation == True
print("✅ 实例属性测试通过")
# 测试3: 可以动态关闭LLM
stage.enable_llm_data_generation = False
print(f"动态关闭后: {stage.enable_llm_data_generation}")
assert stage.enable_llm_data_generation == False
print("✅ 动态控制测试通过")
print("\n🎉 DMS Stage内置LLM配置测试通过")
print("💡 现在只需要提供LLM API密钥DMS Stage会自动使用LLM生成数据")
if __name__ == "__main__":
test_stage_llm_config()