fix:dms api

This commit is contained in:
gongwenxin 2025-08-19 17:53:40 +08:00
parent ad5f3b2c52
commit fbd90237ec

202
test_crud_stage_fixes.py Normal file
View File

@ -0,0 +1,202 @@
#!/usr/bin/env python3
"""
测试CRUD场景修复的脚本
"""
import sys
import logging
import json
from custom_stages.dms_crud_scenario_stage import DmsCrudScenarioStage
from ddms_compliance_suite.utils.data_generator import DataGenerator
def test_crud_stage_fixes():
"""测试CRUD场景的修复"""
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.info("开始测试CRUD场景修复...")
# 模拟井数据管理器
class MockWellDataManager:
def __init__(self):
self.well_data = [
{"wellId": "HB00019975", "wellCommonName": "郑4-106"},
]
self.wellbore_data = [
{"wellboreId": "WEBHHB100083169", "wellboreCommonName": "郑4-106主井筒"},
]
def is_well_related_field(self, field_name):
return field_name in ['wellId', 'wellboreId', 'wellCommonName']
def get_well_value_for_field(self, field_name):
if field_name == 'wellId':
return self.well_data[0]['wellId']
elif field_name == 'wellboreId':
return self.wellbore_data[0]['wellboreId']
elif field_name == 'wellCommonName':
return self.well_data[0]['wellCommonName']
return None
def enhance_data_with_well_values(self, data):
if not isinstance(data, dict):
return data
enhanced_data = data.copy()
for field_name, value in data.items():
if self.is_well_related_field(field_name):
real_value = self.get_well_value_for_field(field_name)
if real_value is not None:
enhanced_data[field_name] = real_value
logger.info(f"🔄 替换字段 '{field_name}': {value} -> {real_value}")
return enhanced_data
# 模拟编排器
class MockOrchestrator:
def __init__(self):
self.well_data_manager = MockWellDataManager()
# 创建CRUD场景实例
crud_stage = DmsCrudScenarioStage()
crud_stage.orchestrator = MockOrchestrator()
# 测试1: 验证DataGenerator是否正确传递井数据管理器
logger.info("1. 测试DataGenerator井数据管理器传递...")
# 模拟创建Schema
create_schema = {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"type": "object",
"properties": {
"wellId": {"type": "string"},
"wellboreId": {"type": "string"},
"wellCommonName": {"type": "string"},
"dataRegion": {"type": "string"}
}
}
}
}
}
# 测试数据生成器是否正确传递井数据管理器
data_generator = DataGenerator(
logger_param=logger,
well_data_manager=getattr(crud_stage.orchestrator, 'well_data_manager', None)
)
if data_generator.well_data_manager:
logger.info("✅ DataGenerator正确接收到井数据管理器")
else:
logger.error("❌ DataGenerator没有接收到井数据管理器")
return False
# 测试数据生成
generated_data = data_generator.generate_data_from_schema(create_schema, "test_context")
logger.info(f"生成的数据: {json.dumps(generated_data, ensure_ascii=False, indent=2)}")
# 验证井数据是否被应用
if generated_data and 'data' in generated_data and len(generated_data['data']) > 0:
first_item = generated_data['data'][0]
if 'wellId' in first_item and first_item['wellId'] == 'HB00019975':
logger.info("✅ 井数据被正确应用")
else:
logger.warning(f"❌ 井数据未被应用wellId值: {first_item.get('wellId')}")
# 测试2: 验证CRUD场景步骤的请求体格式
logger.info("2. 测试CRUD场景步骤的请求体格式...")
# 检查创建步骤的请求体格式
create_step = crud_stage.steps[0] # Step 1: Create Resource
create_body = create_step.request_overrides.get("body", {})
logger.info(f"创建步骤请求体: {json.dumps(create_body, ensure_ascii=False, indent=2)}")
# 验证创建请求体格式
if "version" in create_body and "act" in create_body and "data" in create_body:
if create_body["version"] == "1.0.0" and create_body["act"] == -1:
logger.info("✅ 创建请求体格式正确")
else:
logger.error(f"❌ 创建请求体字段值错误: version={create_body.get('version')}, act={create_body.get('act')}")
return False
else:
logger.error(f"❌ 创建请求体缺少必要字段: {list(create_body.keys())}")
return False
# 检查更新步骤的请求体格式
update_step = crud_stage.steps[2] # Step 3: Update Resource
update_body = update_step.request_overrides.get("body", {})
logger.info(f"更新步骤请求体: {json.dumps(update_body, ensure_ascii=False, indent=2)}")
# 验证更新请求体格式
if "version" in update_body and "act" in update_body and "data" in update_body:
if update_body["version"] == "1.0.0" and update_body["act"] == -1:
logger.info("✅ 更新请求体格式正确")
else:
logger.error(f"❌ 更新请求体字段值错误: version={update_body.get('version')}, act={update_body.get('act')}")
return False
else:
logger.error(f"❌ 更新请求体缺少必要字段: {list(update_body.keys())}")
return False
# 测试3: 验证删除请求体格式
logger.info("3. 测试删除请求体格式...")
# 模拟删除请求体构建
mock_scenario = {}
mock_create_payload = {"wellId": "test_well", "dataRegion": "test_region"}
delete_body = crud_stage._build_delete_request_body(mock_scenario, "dsid", "test-id", mock_create_payload)
logger.info(f"删除请求体: {json.dumps(delete_body, ensure_ascii=False, indent=2)}")
# 验证删除请求体格式
if "version" in delete_body and "data" in delete_body:
if delete_body["version"] == "1.0.0":
logger.info("✅ 删除请求体格式正确")
else:
logger.error(f"❌ 删除请求体version字段值错误: {delete_body.get('version')}")
return False
else:
logger.error(f"❌ 删除请求体缺少必要字段: {list(delete_body.keys())}")
return False
# 测试4: 验证查询请求体格式
logger.info("4. 测试查询请求体格式...")
# 模拟查询请求体构建
mock_identity_list = ["dsid"]
mock_pk_values = {"dsid": "test-id"}
list_body = crud_stage._build_list_filter_payload(mock_identity_list, mock_pk_values)
logger.info(f"查询请求体: {json.dumps(list_body, ensure_ascii=False, indent=2)}")
# 验证查询请求体格式
if "version" in list_body and "isSearchCount" in list_body and "query" in list_body:
if list_body["version"] == "1.0.0":
logger.info("✅ 查询请求体格式正确")
else:
logger.error(f"❌ 查询请求体version字段值错误: {list_body.get('version')}")
return False
else:
logger.error(f"❌ 查询请求体缺少必要字段: {list(list_body.keys())}")
return False
logger.info("🎉 所有CRUD场景修复测试通过")
return True
if __name__ == "__main__":
try:
success = test_crud_stage_fixes()
sys.exit(0 if success else 1)
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)