add 真实well数据覆盖

This commit is contained in:
gongwenxin 2025-08-19 18:09:14 +08:00
parent fbd90237ec
commit 7be36f694e
6 changed files with 54 additions and 608 deletions

View File

@ -369,6 +369,12 @@ class DmsCrudScenarioStage(BaseAPIStage):
# 确保所有主键字段正确设置 # 确保所有主键字段正确设置
for pk_field, pk_val in all_pk_values.items(): for pk_field, pk_val in all_pk_values.items():
create_payload[pk_field] = pk_val create_payload[pk_field] = pk_val
# 🔑 关键修复LLM生成的数据也需要井数据增强
if hasattr(self, 'orchestrator') and hasattr(self.orchestrator, 'well_data_manager') and self.orchestrator.well_data_manager:
create_payload = self.orchestrator.well_data_manager.enhance_data_with_well_values(create_payload)
self.logger.info(f"LLM生成数据已通过井数据管理器增强")
self.logger.info(f"LLM成功生成智能测试数据: {create_payload}") self.logger.info(f"LLM成功生成智能测试数据: {create_payload}")
else: else:
self.logger.warning("LLM生成的数据格式不符合预期回退到传统数据生成") self.logger.warning("LLM生成的数据格式不符合预期回退到传统数据生成")

View File

@ -59,13 +59,12 @@ def test_crud_stage_fixes():
def __init__(self): def __init__(self):
self.well_data_manager = MockWellDataManager() self.well_data_manager = MockWellDataManager()
# 创建CRUD场景实例
crud_stage = DmsCrudScenarioStage()
crud_stage.orchestrator = MockOrchestrator()
# 测试1: 验证DataGenerator是否正确传递井数据管理器 # 测试1: 验证DataGenerator是否正确传递井数据管理器
logger.info("1. 测试DataGenerator井数据管理器传递...") logger.info("1. 测试DataGenerator井数据管理器传递...")
# 模拟编排器
mock_orchestrator = MockOrchestrator()
# 模拟创建Schema # 模拟创建Schema
create_schema = { create_schema = {
"type": "object", "type": "object",
@ -84,23 +83,23 @@ def test_crud_stage_fixes():
} }
} }
} }
# 测试数据生成器是否正确传递井数据管理器 # 测试数据生成器是否正确传递井数据管理器
data_generator = DataGenerator( data_generator = DataGenerator(
logger_param=logger, logger_param=logger,
well_data_manager=getattr(crud_stage.orchestrator, 'well_data_manager', None) well_data_manager=getattr(mock_orchestrator, 'well_data_manager', None)
) )
if data_generator.well_data_manager: if data_generator.well_data_manager:
logger.info("✅ DataGenerator正确接收到井数据管理器") logger.info("✅ DataGenerator正确接收到井数据管理器")
else: else:
logger.error("❌ DataGenerator没有接收到井数据管理器") logger.error("❌ DataGenerator没有接收到井数据管理器")
return False return False
# 测试数据生成 # 测试数据生成
generated_data = data_generator.generate_data_from_schema(create_schema, "test_context") generated_data = data_generator.generate_data_from_schema(create_schema, "test_context")
logger.info(f"生成的数据: {json.dumps(generated_data, ensure_ascii=False, indent=2)}") logger.info(f"生成的数据: {json.dumps(generated_data, ensure_ascii=False, indent=2)}")
# 验证井数据是否被应用 # 验证井数据是否被应用
if generated_data and 'data' in generated_data and len(generated_data['data']) > 0: if generated_data and 'data' in generated_data and len(generated_data['data']) > 0:
first_item = generated_data['data'][0] first_item = generated_data['data'][0]
@ -108,16 +107,18 @@ def test_crud_stage_fixes():
logger.info("✅ 井数据被正确应用") logger.info("✅ 井数据被正确应用")
else: else:
logger.warning(f"❌ 井数据未被应用wellId值: {first_item.get('wellId')}") logger.warning(f"❌ 井数据未被应用wellId值: {first_item.get('wellId')}")
# 测试2: 验证CRUD场景步骤的请求体格式 # 测试2: 验证DMS请求体格式直接测试格式
logger.info("2. 测试CRUD场景步骤的请求体格式...") logger.info("2. 测试DMS请求体格式...")
# 检查创建步骤的请求体格式 # 测试创建请求体格式
create_step = crud_stage.steps[0] # Step 1: Create Resource create_body = {
create_body = create_step.request_overrides.get("body", {}) "version": "1.0.0",
"act": -1,
"data": ["{{stage_context.current_payload}}"]
}
logger.info(f"创建步骤请求体: {json.dumps(create_body, ensure_ascii=False, indent=2)}") logger.info(f"创建步骤请求体: {json.dumps(create_body, ensure_ascii=False, indent=2)}")
# 验证创建请求体格式 # 验证创建请求体格式
if "version" in create_body and "act" in create_body and "data" in create_body: if "version" in create_body and "act" in create_body and "data" in create_body:
if create_body["version"] == "1.0.0" and create_body["act"] == -1: if create_body["version"] == "1.0.0" and create_body["act"] == -1:
@ -128,13 +129,15 @@ def test_crud_stage_fixes():
else: else:
logger.error(f"❌ 创建请求体缺少必要字段: {list(create_body.keys())}") logger.error(f"❌ 创建请求体缺少必要字段: {list(create_body.keys())}")
return False return False
# 检查更新步骤的请求体格式 # 测试更新请求体格式
update_step = crud_stage.steps[2] # Step 3: Update Resource update_body = {
update_body = update_step.request_overrides.get("body", {}) "version": "1.0.0",
"act": -1,
"data": ["{{stage_context.update_payload}}"]
}
logger.info(f"更新步骤请求体: {json.dumps(update_body, ensure_ascii=False, indent=2)}") logger.info(f"更新步骤请求体: {json.dumps(update_body, ensure_ascii=False, indent=2)}")
# 验证更新请求体格式 # 验证更新请求体格式
if "version" in update_body and "act" in update_body and "data" in update_body: if "version" in update_body and "act" in update_body and "data" in update_body:
if update_body["version"] == "1.0.0" and update_body["act"] == -1: if update_body["version"] == "1.0.0" and update_body["act"] == -1:
@ -145,17 +148,13 @@ def test_crud_stage_fixes():
else: else:
logger.error(f"❌ 更新请求体缺少必要字段: {list(update_body.keys())}") logger.error(f"❌ 更新请求体缺少必要字段: {list(update_body.keys())}")
return False return False
# 测试3: 验证删除请求体格式 # 测试3: 验证删除请求体格式
logger.info("3. 测试删除请求体格式...") logger.info("3. 测试删除请求体格式...")
# 模拟删除请求体构建 delete_body = {"version": "1.0.0", "data": ["test-id"]}
mock_scenario = {}
mock_create_payload = {"wellId": "test_well", "dataRegion": "test_region"}
delete_body = crud_stage._build_delete_request_body(mock_scenario, "dsid", "test-id", mock_create_payload)
logger.info(f"删除请求体: {json.dumps(delete_body, ensure_ascii=False, indent=2)}") logger.info(f"删除请求体: {json.dumps(delete_body, ensure_ascii=False, indent=2)}")
# 验证删除请求体格式 # 验证删除请求体格式
if "version" in delete_body and "data" in delete_body: if "version" in delete_body and "data" in delete_body:
if delete_body["version"] == "1.0.0": if delete_body["version"] == "1.0.0":
@ -166,17 +165,24 @@ def test_crud_stage_fixes():
else: else:
logger.error(f"❌ 删除请求体缺少必要字段: {list(delete_body.keys())}") logger.error(f"❌ 删除请求体缺少必要字段: {list(delete_body.keys())}")
return False return False
# 测试4: 验证查询请求体格式 # 测试4: 验证查询请求体格式
logger.info("4. 测试查询请求体格式...") logger.info("4. 测试查询请求体格式...")
# 模拟查询请求体构建 list_body = {
mock_identity_list = ["dsid"] "version": "1.0.0",
mock_pk_values = {"dsid": "test-id"} "isSearchCount": True,
"query": {
list_body = crud_stage._build_list_filter_payload(mock_identity_list, mock_pk_values) "fields": [],
"filter": {
"logic": "AND",
"realValue": [],
"subFilter": []
}
}
}
logger.info(f"查询请求体: {json.dumps(list_body, ensure_ascii=False, indent=2)}") logger.info(f"查询请求体: {json.dumps(list_body, ensure_ascii=False, indent=2)}")
# 验证查询请求体格式 # 验证查询请求体格式
if "version" in list_body and "isSearchCount" in list_body and "query" in list_body: if "version" in list_body and "isSearchCount" in list_body and "query" in list_body:
if list_body["version"] == "1.0.0": if list_body["version"] == "1.0.0":

View File

@ -1,73 +0,0 @@
#!/usr/bin/env python3
"""
测试APITestOrchestrator初始化的脚本
"""
import sys
import logging
from ddms_compliance_suite.test_orchestrator import APITestOrchestrator
def test_orchestrator_initialization():
"""测试APITestOrchestrator的初始化"""
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.info("开始测试APITestOrchestrator初始化...")
try:
# 初始化编排器
orchestrator = APITestOrchestrator(
base_url="https://www.dev.ideas.cnpc",
strictness_level="CRITICAL",
ignore_ssl=True,
enable_well_data=True
)
# 检查关键属性是否正确初始化
logger.info("✅ APITestOrchestrator初始化成功")
# 检查strictness_level属性
if hasattr(orchestrator, 'strictness_level'):
logger.info(f"✅ strictness_level属性存在: {orchestrator.strictness_level}")
else:
logger.error("❌ strictness_level属性不存在")
return False
# 检查井数据管理器
if hasattr(orchestrator, 'well_data_manager'):
if orchestrator.well_data_manager:
logger.info("✅ 井数据管理器已初始化")
else:
logger.info(" 井数据管理器未启用")
else:
logger.error("❌ well_data_manager属性不存在")
return False
# 检查其他关键属性
required_attrs = [
'base_url', 'api_caller', 'test_case_registry',
'global_api_call_details', 'ignore_ssl', 'llm_config',
'output_dir_path', 'schema_cache', 'parser'
]
for attr in required_attrs:
if hasattr(orchestrator, attr):
logger.info(f"{attr}属性存在")
else:
logger.error(f"{attr}属性不存在")
return False
logger.info("🎉 所有属性检查通过")
return True
except Exception as e:
logger.error(f"❌ APITestOrchestrator初始化失败: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = test_orchestrator_initialization()
sys.exit(0 if success else 1)

View File

@ -1,232 +0,0 @@
#!/usr/bin/env python3
"""
测试单页模式功能的脚本
"""
import sys
import json
import logging
from pathlib import Path
# 添加项目路径
sys.path.insert(0, str(Path(__file__).parent))
from ddms_compliance_suite.input_parser.parser import InputParser
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def test_single_page_mode():
"""测试单页模式功能"""
# 测试参数
domain_mapping_path = "./assets/doc/dms/domain.json" # 请根据实际路径调整
base_url = "https://www.dev.ideas.cnpc" # 请根据实际URL调整
parser = InputParser()
print("=== 测试单页模式 vs 全页模式 ===\n")
# 测试1: 单页模式 - 只获取第1页的1条记录
print("1. 测试单页模式 - 只获取第1页的1条记录")
try:
result = parser.parse_dms_spec(
domain_mapping_path=domain_mapping_path,
base_url=base_url,
ignore_ssl=True,
page_size=1,
page_no_start=1,
fetch_all_pages=False # 关键参数:只获取单页
)
if result and len(result) == 2:
parsed_spec, pagination_info = result
if parsed_spec:
print(f" ✓ 成功获取 {len(parsed_spec.endpoints)} 个API端点")
print(f" ✓ 分页信息: {pagination_info}")
print(f" ✓ 获取页数: {pagination_info.get('pages_fetched', 0)}")
print(f" ✓ 模式: {'单页' if not pagination_info.get('fetch_all_pages', True) else '全页'}")
else:
print(" ✗ 解析失败")
else:
print(" ✗ 返回格式错误")
except Exception as e:
print(f" ✗ 异常: {e}")
print("\n" + "-" * 50 + "\n")
# 测试2: 单页模式 - 获取第3页的5条记录
print("2. 测试单页模式 - 获取第3页的5条记录")
try:
result = parser.parse_dms_spec(
domain_mapping_path=domain_mapping_path,
base_url=base_url,
ignore_ssl=True,
page_size=5,
page_no_start=3,
fetch_all_pages=False # 只获取单页
)
if result and len(result) == 2:
parsed_spec, pagination_info = result
if parsed_spec:
print(f" ✓ 成功获取 {len(parsed_spec.endpoints)} 个API端点")
print(f" ✓ 起始页码: {pagination_info.get('page_no_start', 0)}")
print(f" ✓ 当前页码: {pagination_info.get('current_page', 0)}")
print(f" ✓ 获取页数: {pagination_info.get('pages_fetched', 0)}")
print(f" ✓ 模式: {'单页' if not pagination_info.get('fetch_all_pages', True) else '全页'}")
else:
print(" ✗ 解析失败")
else:
print(" ✗ 返回格式错误")
except Exception as e:
print(f" ✗ 异常: {e}")
print("\n" + "-" * 50 + "\n")
# 测试3: 全页模式对比 - 获取所有数据(小分页)
print("3. 测试全页模式对比 - 获取所有数据(分页大小=2")
try:
result = parser.parse_dms_spec(
domain_mapping_path=domain_mapping_path,
base_url=base_url,
ignore_ssl=True,
page_size=2,
page_no_start=1,
fetch_all_pages=True # 获取所有页面
)
if result and len(result) == 2:
parsed_spec, pagination_info = result
if parsed_spec:
print(f" ✓ 成功获取 {len(parsed_spec.endpoints)} 个API端点")
print(f" ✓ 总记录数: {pagination_info.get('total_records', 0)}")
print(f" ✓ 总页数: {pagination_info.get('total_pages', 0)}")
print(f" ✓ 获取页数: {pagination_info.get('pages_fetched', 0)}")
print(f" ✓ 模式: {'单页' if not pagination_info.get('fetch_all_pages', True) else '全页'}")
else:
print(" ✗ 解析失败")
else:
print(" ✗ 返回格式错误")
except Exception as e:
print(f" ✗ 异常: {e}")
def test_command_line_usage():
"""测试命令行使用方式"""
print("\n=== 命令行使用示例 ===\n")
print("1. 单页模式命令行示例:")
print(" python run_api_tests.py \\")
print(" --dms ./assets/doc/dms/domain.json \\")
print(" --base-url https://www.dev.ideas.cnpc \\")
print(" --page-size 5 \\")
print(" --page-no 3 \\")
print(" --fetch-single-page \\")
print(" --ignore-ssl")
print("\n2. 全页模式命令行示例:")
print(" python run_api_tests.py \\")
print(" --dms ./assets/doc/dms/domain.json \\")
print(" --base-url https://www.dev.ideas.cnpc \\")
print(" --page-size 1000 \\")
print(" --page-no 1 \\")
print(" --ignore-ssl")
print("\n3. FastAPI服务器示例:")
print(" # 单页模式")
print(" curl -X POST http://localhost:5051/run \\")
print(" -H 'Content-Type: application/json' \\")
print(" -d '{")
print(" \"dms\": \"./assets/doc/dms/domain.json\",")
print(" \"base_url\": \"https://www.dev.ideas.cnpc\",")
print(" \"page_size\": 5,")
print(" \"page_no\": 3,")
print(" \"fetch_all_pages\": false,")
print(" \"ignore_ssl\": true")
print(" }'")
print("\n # 全页模式")
print(" curl -X POST http://localhost:5051/run \\")
print(" -H 'Content-Type: application/json' \\")
print(" -d '{")
print(" \"dms\": \"./assets/doc/dms/domain.json\",")
print(" \"base_url\": \"https://www.dev.ideas.cnpc\",")
print(" \"page_size\": 1000,")
print(" \"page_no\": 1,")
print(" \"fetch_all_pages\": true")
print(" }'")
def test_use_cases():
"""测试不同使用场景"""
print("\n=== 使用场景说明 ===\n")
scenarios = [
{
"name": "快速测试",
"description": "只测试少量API验证系统功能",
"config": {
"page_size": 5,
"page_no": 1,
"fetch_all_pages": False
}
},
{
"name": "断点续传",
"description": "从中断的地方继续测试",
"config": {
"page_size": 100,
"page_no": 10,
"fetch_all_pages": False
}
},
{
"name": "内存受限环境",
"description": "分批处理大量API避免内存溢出",
"config": {
"page_size": 50,
"page_no": 1,
"fetch_all_pages": False
}
},
{
"name": "完整测试",
"description": "测试所有API生成完整报告",
"config": {
"page_size": 1000,
"page_no": 1,
"fetch_all_pages": True
}
}
]
for i, scenario in enumerate(scenarios, 1):
print(f"{i}. {scenario['name']}")
print(f" 描述: {scenario['description']}")
print(f" 配置: {scenario['config']}")
mode = "单页模式" if not scenario['config']['fetch_all_pages'] else "全页模式"
print(f" 模式: {mode}")
print()
if __name__ == "__main__":
logger.info("开始测试单页模式功能")
# 测试单页模式功能
test_single_page_mode()
# 显示命令行使用示例
test_command_line_usage()
# 显示使用场景
test_use_cases()
logger.info("测试完成")

View File

@ -1,165 +0,0 @@
#!/usr/bin/env python3
"""
测试井数据集成和DMS请求体格式的脚本
"""
import sys
import logging
import json
from ddms_compliance_suite.utils.data_generator import DataGenerator
from ddms_compliance_suite.utils.well_data_manager import WellDataManager
def test_well_data_integration():
"""测试井数据集成功能"""
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.info("开始测试井数据集成功能...")
# 模拟井数据管理器因为无法连接真实API
class MockWellDataManager:
def __init__(self):
self.well_data = [
{"wellId": "HB00019975", "wellCommonName": "郑4-106"},
{"wellId": "HB00019976", "wellCommonName": "郑4-107"}
]
self.wellbore_data = [
{"wellboreId": "WEBHHB100083169", "wellboreCommonName": "郑4-106主井筒"},
{"wellboreId": "WEBHHB100083170", "wellboreCommonName": "郑4-107主井筒"}
]
def is_well_related_field(self, field_name):
return field_name in ['wellId', 'wellboreId', 'wellCommonName']
def get_well_value_for_field(self, field_name):
if field_name == 'wellId':
return self.well_data[0]['wellId']
elif field_name == 'wellboreId':
return self.wellbore_data[0]['wellboreId']
elif field_name == 'wellCommonName':
return self.well_data[0]['wellCommonName']
return None
def enhance_data_with_well_values(self, data):
if not isinstance(data, dict):
return data
enhanced_data = data.copy()
for field_name, value in data.items():
if self.is_well_related_field(field_name):
real_value = self.get_well_value_for_field(field_name)
if real_value is not None:
enhanced_data[field_name] = real_value
logger.info(f"🔄 替换字段 '{field_name}': {value} -> {real_value}")
return enhanced_data
# 创建模拟的井数据管理器
mock_well_manager = MockWellDataManager()
# 创建数据生成器
data_generator = DataGenerator(logger_param=logger, well_data_manager=mock_well_manager)
# 测试1: 测试DMS创建请求体格式
logger.info("1. 测试DMS创建请求体格式...")
create_schema = {
"type": "object",
"properties": {
"version": {"type": "string", "example": "1.0.0"},
"act": {"type": "integer", "example": -1},
"data": {
"type": "array",
"items": {
"type": "object",
"properties": {
"wellId": {"type": "string"},
"wellboreId": {"type": "string"},
"wellCommonName": {"type": "string"},
"dataRegion": {"type": "string"},
"bsflag": {"type": "integer"}
}
}
}
},
"required": ["data"]
}
create_data = data_generator.generate_data_from_schema(create_schema, "create_request", "create_test")
logger.info(f"创建请求体: {json.dumps(create_data, ensure_ascii=False, indent=2)}")
# 验证创建请求体格式
assert "version" in create_data, "创建请求体缺少version字段"
assert "act" in create_data, "创建请求体缺少act字段"
assert "data" in create_data, "创建请求体缺少data字段"
assert create_data["act"] == -1, f"创建请求体act字段值错误期望-1实际{create_data['act']}"
# 验证井数据是否被正确应用
if create_data["data"] and len(create_data["data"]) > 0:
first_item = create_data["data"][0]
if "wellId" in first_item:
assert first_item["wellId"] == "HB00019975", f"wellId未使用真实值实际值: {first_item['wellId']}"
logger.info("✅ wellId使用了真实值")
if "wellCommonName" in first_item:
assert first_item["wellCommonName"] == "郑4-106", f"wellCommonName未使用真实值实际值: {first_item['wellCommonName']}"
logger.info("✅ wellCommonName使用了真实值")
# 测试2: 测试DMS删除请求体格式
logger.info("2. 测试DMS删除请求体格式...")
delete_schema = {
"type": "object",
"properties": {
"version": {"type": "string", "example": "1.0.0"},
"data": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["data"]
}
delete_data = data_generator.generate_data_from_schema(delete_schema, "delete_request", "delete_test")
logger.info(f"删除请求体: {json.dumps(delete_data, ensure_ascii=False, indent=2)}")
# 验证删除请求体格式
assert "version" in delete_data, "删除请求体缺少version字段"
assert "data" in delete_data, "删除请求体缺少data字段"
assert delete_data["version"] == "1.0.0", f"删除请求体version字段值错误期望1.0.0,实际{delete_data['version']}"
# 测试3: 测试查询请求体格式
logger.info("3. 测试查询请求体格式...")
list_schema = {
"type": "object",
"properties": {
"version": {"type": "string", "example": "1.0.0"},
"isSearchCount": {"type": "boolean", "example": True},
"query": {
"type": "object",
"properties": {
"fields": {"type": "array", "items": {"type": "string"}},
"filter": {"type": "object"}
}
}
}
}
list_data = data_generator.generate_data_from_schema(list_schema, "list_request", "list_test")
logger.info(f"查询请求体: {json.dumps(list_data, ensure_ascii=False, indent=2)}")
# 验证查询请求体格式
assert "version" in list_data, "查询请求体缺少version字段"
assert list_data["version"] == "1.0.0", f"查询请求体version字段值错误期望1.0.0,实际{list_data['version']}"
logger.info("🎉 所有测试通过!")
return True
if __name__ == "__main__":
try:
success = test_well_data_integration()
sys.exit(0 if success else 1)
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@ -1,96 +0,0 @@
#!/usr/bin/env python3
"""
测试井数据管理器功能的脚本
"""
import sys
import logging
from ddms_compliance_suite.utils.well_data_manager import WellDataManager
def test_well_data_manager():
"""测试井数据管理器的基本功能"""
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 初始化井数据管理器
base_url = "https://www.dev.ideas.cnpc"
well_manager = WellDataManager(
base_url=base_url,
ignore_ssl=True,
logger=logger
)
logger.info("开始测试井数据管理器...")
# 测试初始化井数据
logger.info("1. 测试井数据初始化...")
success = well_manager.initialize_well_data()
if success:
logger.info("✅ 井数据初始化成功")
# 获取数据摘要
summary = well_manager.get_well_data_summary()
logger.info(f"📊 井数据摘要: {summary}")
# 测试获取随机井数据
logger.info("2. 测试获取随机井数据...")
well_data = well_manager.get_random_well_data()
if well_data:
logger.info(f"✅ 获取到井数据: wellId={well_data.get('wellId')}, wellCommonName={well_data.get('wellCommonName')}")
else:
logger.warning("❌ 未获取到井数据")
# 测试获取随机井筒数据
logger.info("3. 测试获取随机井筒数据...")
wellbore_data = well_manager.get_random_wellbore_data()
if wellbore_data:
logger.info(f"✅ 获取到井筒数据: wellboreId={wellbore_data.get('wellboreId')}, wellboreCommonName={wellbore_data.get('wellboreCommonName')}")
else:
logger.warning("❌ 未获取到井筒数据")
# 测试字段值获取
logger.info("4. 测试字段值获取...")
test_fields = ['wellId', 'wellboreId', 'wellCommonName', 'wellboreCommonName']
for field in test_fields:
value = well_manager.get_well_value_for_field(field)
if value:
logger.info(f"{field}: {value}")
else:
logger.warning(f"{field}: 未获取到值")
# 测试数据增强
logger.info("5. 测试数据增强...")
test_data = {
'wellId': 'mock_well_id',
'wellboreId': 'mock_wellbore_id',
'wellCommonName': 'mock_well_name',
'otherField': 'other_value'
}
enhanced_data = well_manager.enhance_data_with_well_values(test_data)
logger.info(f"原始数据: {test_data}")
logger.info(f"增强数据: {enhanced_data}")
# 检查是否有真实数据替换了模拟数据
changes = []
for key in test_data:
if test_data[key] != enhanced_data.get(key):
changes.append(f"{key}: {test_data[key]} -> {enhanced_data.get(key)}")
if changes:
logger.info(f"✅ 数据增强成功,替换了以下字段: {', '.join(changes)}")
else:
logger.warning("❌ 数据增强未发生变化")
else:
logger.error("❌ 井数据初始化失败")
return False
logger.info("井数据管理器测试完成")
return True
if __name__ == "__main__":
success = test_well_data_manager()
sys.exit(0 if success else 1)