fix 真实well数据覆盖

This commit is contained in:
gongwenxin 2025-08-19 18:37:53 +08:00
parent d9bbdbb3ae
commit 30bb61867a
4 changed files with 471 additions and 208 deletions

View File

@ -411,9 +411,21 @@ class DmsCrudScenarioStage(BaseAPIStage):
self.logger.warning("No create schema found. Falling back to a minimal payload.")
# 🔑 关键修复:应用井数据增强到创建数据
self.logger.info(f"🔍 调试井数据管理器条件判断:")
self.logger.info(f" - hasattr(self, 'orchestrator'): {hasattr(self, 'orchestrator')}")
if hasattr(self, 'orchestrator'):
self.logger.info(f" - hasattr(self.orchestrator, 'well_data_manager'): {hasattr(self.orchestrator, 'well_data_manager')}")
if hasattr(self.orchestrator, 'well_data_manager'):
self.logger.info(f" - self.orchestrator.well_data_manager: {self.orchestrator.well_data_manager}")
self.logger.info(f" - type(self.orchestrator.well_data_manager): {type(self.orchestrator.well_data_manager)}")
self.logger.info(f" - bool(self.orchestrator.well_data_manager): {bool(self.orchestrator.well_data_manager)}")
if hasattr(self, 'orchestrator') and hasattr(self.orchestrator, 'well_data_manager') and self.orchestrator.well_data_manager:
self.logger.info(f"✅ 井数据管理器条件判断通过,开始增强数据")
create_payload = self.orchestrator.well_data_manager.enhance_data_with_well_values(create_payload)
self.logger.info(f"创建数据已通过井数据管理器增强")
self.logger.info(f"✅ 创建数据已通过井数据管理器增强")
else:
self.logger.warning(f"❌ 井数据管理器条件判断失败,跳过井数据增强")
# 更新负载基于创建负载,但修改描述字段
update_payload = copy.deepcopy(create_payload)
@ -525,7 +537,7 @@ class DmsCrudScenarioStage(BaseAPIStage):
"request_body": "{{stage_context.list_filter_payload}}"
},
response_assertions=[validate_resource_details_after_update]
),
),
StageStepDefinition(
name="Step 5: Delete Resource",
endpoint_spec_lookup_key="DELETE",

View File

@ -1,206 +0,0 @@
#!/usr/bin/env python3
"""
测试FastAPI服务器功能的脚本
"""
import json
import time
import requests
import subprocess
import signal
import os
from pathlib import Path
def test_fastapi_server():
"""测试FastAPI服务器功能"""
print("=== FastAPI服务器功能测试 ===")
# 启动服务器
print("[信息] 启动FastAPI服务器...")
server_process = subprocess.Popen([
"python3", "fastapi_server.py",
"--host", "127.0.0.1",
"--port", "5051"
])
try:
# 等待服务器启动
print("[信息] 等待服务器启动...")
time.sleep(3)
base_url = "http://127.0.0.1:5051"
# 测试健康检查
print("\n1. 测试健康检查...")
try:
response = requests.get(f"{base_url}/", timeout=5)
if response.status_code == 200:
data = response.json()
print(f" ✓ 健康检查成功: {data.get('status', 'unknown')}")
print(f" ✓ 服务版本: {data.get('version', 'unknown')}")
else:
print(f" ✗ 健康检查失败: {response.status_code}")
except Exception as e:
print(f" ✗ 健康检查异常: {e}")
# 测试服务信息
print("\n2. 测试服务信息...")
try:
response = requests.get(f"{base_url}/info", timeout=5)
if response.status_code == 200:
data = response.json()
print(f" ✓ 服务信息获取成功")
print(f" ✓ 框架: {data.get('framework', 'unknown')}")
print(f" ✓ 功能数量: {len(data.get('features', []))}")
else:
print(f" ✗ 服务信息获取失败: {response.status_code}")
except Exception as e:
print(f" ✗ 服务信息获取异常: {e}")
# 测试API文档
print("\n3. 测试API文档...")
try:
# 测试Swagger UI
response = requests.get(f"{base_url}/docs", timeout=5)
if response.status_code == 200:
print(" ✓ Swagger UI 可访问")
else:
print(f" ✗ Swagger UI 访问失败: {response.status_code}")
# 测试ReDoc
response = requests.get(f"{base_url}/redoc", timeout=5)
if response.status_code == 200:
print(" ✓ ReDoc 可访问")
else:
print(f" ✗ ReDoc 访问失败: {response.status_code}")
# 测试OpenAPI规范
response = requests.get(f"{base_url}/openapi.json", timeout=5)
if response.status_code == 200:
openapi_spec = response.json()
print(" ✓ OpenAPI规范可获取")
print(f" ✓ API标题: {openapi_spec.get('info', {}).get('title', 'unknown')}")
print(f" ✓ 端点数量: {len(openapi_spec.get('paths', {}))}")
else:
print(f" ✗ OpenAPI规范获取失败: {response.status_code}")
except Exception as e:
print(f" ✗ API文档测试异常: {e}")
# 测试数据验证
print("\n4. 测试数据验证...")
try:
# 测试无效请求
invalid_config = {
"base_url": "invalid-url", # 无效URL
"page_size": 0 # 无效分页大小
}
response = requests.post(f"{base_url}/run", json=invalid_config, timeout=5)
if response.status_code == 422: # Validation Error
print(" ✓ 数据验证正常工作")
error_detail = response.json()
print(f" ✓ 验证错误数量: {len(error_detail.get('detail', []))}")
else:
print(f" ✗ 数据验证异常: {response.status_code}")
except Exception as e:
print(f" ✗ 数据验证测试异常: {e}")
# 测试报告列表
print("\n5. 测试报告列表...")
try:
response = requests.get(f"{base_url}/reports", timeout=5)
if response.status_code == 200:
data = response.json()
print(" ✓ 报告列表获取成功")
print(f" ✓ 报告数量: {len(data.get('reports', []))}")
else:
print(f" ✗ 报告列表获取失败: {response.status_code}")
except Exception as e:
print(f" ✗ 报告列表测试异常: {e}")
print("\n=== 测试完成 ===")
print(f"FastAPI服务器运行在: {base_url}")
print(f"API文档地址: {base_url}/docs")
print(f"ReDoc地址: {base_url}/redoc")
finally:
# 停止服务器
print("\n[信息] 停止服务器...")
server_process.terminate()
try:
server_process.wait(timeout=5)
except subprocess.TimeoutExpired:
server_process.kill()
server_process.wait()
print("[信息] 服务器已停止")
def test_pagination_parameters():
"""测试分页参数功能"""
print("\n=== 分页参数测试 ===")
# 测试配置
test_configs = [
{
"name": "基本配置",
"config": {
"dms": "./test.json",
"base_url": "https://api.example.com",
"page_size": 100,
"page_no": 1
}
},
{
"name": "大分页配置",
"config": {
"dms": "./test.json",
"base_url": "https://api.example.com",
"page_size": 5000,
"page_no": 1
}
},
{
"name": "跳页配置",
"config": {
"dms": "./test.json",
"base_url": "https://api.example.com",
"page_size": 500,
"page_no": 5
}
}
]
for test_case in test_configs:
print(f"\n测试: {test_case['name']}")
config = test_case['config']
# 验证配置格式
try:
from fastapi_server import TestConfig
validated_config = TestConfig(**config)
print(f" ✓ 配置验证通过")
print(f" ✓ 页面大小: {validated_config.page_size}")
print(f" ✓ 起始页码: {validated_config.page_no}")
except Exception as e:
print(f" ✗ 配置验证失败: {e}")
if __name__ == "__main__":
print("开始FastAPI功能测试")
# 检查依赖
try:
import fastapi
import uvicorn
import pydantic
print(f"FastAPI版本: {fastapi.__version__}")
print(f"Pydantic版本: {pydantic.__version__}")
except ImportError as e:
print(f"依赖缺失: {e}")
print("请运行: pip install -r requirements_fastapi.txt")
exit(1)
# 运行测试
test_fastapi_server()
test_pagination_parameters()
print("\n测试完成!")

279
test_final_well_data_fix.py Normal file
View File

@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
测试最终井数据修复的脚本
"""
import sys
import logging
import json
def test_final_well_data_fix():
"""测试最终的井数据修复方案"""
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.info("开始测试最终井数据修复方案...")
# 模拟井数据管理器
class MockWellDataManager:
def __init__(self):
self.well_data = [
{"wellId": "HB00019975", "wellCommonName": "郑4-106"},
]
self.wellbore_data = [
{"wellboreId": "WEBHHB100083169", "wellboreCommonName": "郑4-106主井筒"},
]
def is_well_related_field(self, field_name):
return field_name in ['wellId', 'wellboreId', 'wellCommonName']
def get_well_value_for_field(self, field_name):
if field_name == 'wellId':
return self.well_data[0]['wellId']
elif field_name == 'wellboreId':
return self.wellbore_data[0]['wellboreId']
elif field_name == 'wellCommonName':
return self.well_data[0]['wellCommonName']
return None
def enhance_data_with_well_values(self, data):
if not isinstance(data, dict):
return data
enhanced_data = data.copy()
for field_name, value in data.items():
if self.is_well_related_field(field_name):
real_value = self.get_well_value_for_field(field_name)
if real_value is not None:
enhanced_data[field_name] = real_value
logger.info(f"🔄 替换字段 '{field_name}': {value} -> {real_value}")
return enhanced_data
# 模拟编排器
class MockOrchestrator:
def __init__(self):
self.well_data_manager = MockWellDataManager()
# 模拟CRUD场景的before_stage逻辑
class MockCrudStage:
def __init__(self):
self.orchestrator = MockOrchestrator()
self.logger = logger
# 测试1: 模拟完整的before_stage流程
logger.info("1. 测试完整的before_stage流程...")
# 模拟LLM生成的原始数据
llm_generated_data = {
'seq': 1,
'tvd': 1500.5,
'dsid': '0c61f6b0-d845-492a-a0e4-110705fac142',
'XAxis': 123456.78,
'YAxis': 987654.32,
'phase': '开发阶段',
'bsflag': 1,
'tester': '张三',
'wellId': 'WELL001', # LLM生成的模拟值
'azimuth': 45.6,
'phaseId': 'DEV',
'remarks': '测斜数据正常',
'testType': '常规测斜',
'avgDogleg': 2.5,
'checkDate': '2023-10-15T10:30:00',
'dataGroup': '测斜数据组',
'startDate': '2023-10-01',
'versionNo': '1.0',
'createDate': '2023-10-15T09:00:00',
'dataRegion': '大庆油田',
'testTypeId': 'NORMAL',
'updateDate': '2023-10-15T09:00:00',
'wellboreId': 'WB001', # LLM生成的模拟值
'checkUserId': 'USER001',
'createAppId': 'DMS系统',
'description': '测斜轨迹测斜记录',
'createUserId': 'USER001',
'updateUserId': 'USER001',
'angleDeviation': 15.3,
'closureAzimuth': 46.2,
'directionalWay': '定向钻井',
'wellCommonName': '大庆1井', # LLM生成的模拟值
'closureDistance': 1200.75,
'horiDisplacemen': 800.25,
'surveypointStep': 30,
'avgRateDeviation': 1.2,
'eastWestDisplace': 600.5,
'sourceCreateDate': '2023-10-01T08:00:00',
'surveyPointDepth': 1500,
'southNorthDisplace': 700.3,
'wellboreCommonName': '大庆1井-主井筒', # LLM生成的模拟值
'overallAngleChangeRate': 1.8,
'rateDirectionChangeMean': 0.9
}
logger.info(f"LLM生成的原始数据: {json.dumps(llm_generated_data, ensure_ascii=False, indent=2)}")
# 模拟CRUD场景的before_stage逻辑
crud_stage = MockCrudStage()
create_payload = llm_generated_data.copy()
# 🔑 关键修复:应用井数据增强到创建数据
if hasattr(crud_stage, 'orchestrator') and hasattr(crud_stage.orchestrator, 'well_data_manager') and crud_stage.orchestrator.well_data_manager:
create_payload = crud_stage.orchestrator.well_data_manager.enhance_data_with_well_values(create_payload)
logger.info(f"创建数据已通过井数据管理器增强")
# 更新负载基于创建负载,但修改描述字段
import copy
update_payload = copy.deepcopy(create_payload)
update_payload["description"] = "updated-test-entry-from-scenario"
logger.info(f"井数据增强后的创建数据: {json.dumps(create_payload, ensure_ascii=False, indent=2)}")
logger.info(f"井数据增强后的更新数据: {json.dumps(update_payload, ensure_ascii=False, indent=2)}")
# 模拟stage_context
stage_context = {
"pk_name": "dsid",
"pk_value": "0c61f6b0-d845-492a-a0e4-110705fac142",
"current_payload": create_payload,
"update_payload": update_payload
}
# 测试2: 验证请求体模板解析
logger.info("2. 测试请求体模板解析...")
# 模拟测试编排器的模板解析逻辑
def mock_resolve_template(template, context):
"""模拟测试编排器的模板解析"""
if isinstance(template, str) and template == "{{stage_context.current_payload}}":
return context["current_payload"]
elif isinstance(template, str) and template == "{{stage_context.update_payload}}":
return context["update_payload"]
elif isinstance(template, list):
return [mock_resolve_template(item, context) for item in template]
elif isinstance(template, dict):
return {k: mock_resolve_template(v, context) for k, v in template.items()}
else:
return template
# 测试创建请求体模板
create_body_template = {
"version": "1.0.0",
"act": -1,
"data": ["{{stage_context.current_payload}}"]
}
resolved_create_body = mock_resolve_template(create_body_template, stage_context)
logger.info(f"解析后的创建请求体: {json.dumps(resolved_create_body, ensure_ascii=False, indent=2)}")
# 测试更新请求体模板
update_body_template = {
"version": "1.0.0",
"act": -1,
"data": ["{{stage_context.update_payload}}"]
}
resolved_update_body = mock_resolve_template(update_body_template, stage_context)
logger.info(f"解析后的更新请求体: {json.dumps(resolved_update_body, ensure_ascii=False, indent=2)}")
# 测试3: 验证最终效果
logger.info("3. 验证最终效果...")
# 验证井数据是否被正确应用
expected_replacements = {
'wellId': 'HB00019975',
'wellboreId': 'WEBHHB100083169',
'wellCommonName': '郑4-106'
}
all_correct = True
# 验证创建请求体
create_data = resolved_create_body["data"][0]
for field, expected_value in expected_replacements.items():
if create_data.get(field) == expected_value:
logger.info(f"✅ 创建请求体 - {field} 正确替换为真实值: {expected_value}")
else:
logger.error(f"❌ 创建请求体 - {field} 未正确替换,期望: {expected_value}, 实际: {create_data.get(field)}")
all_correct = False
# 验证更新请求体
update_data = resolved_update_body["data"][0]
for field, expected_value in expected_replacements.items():
if update_data.get(field) == expected_value:
logger.info(f"✅ 更新请求体 - {field} 正确替换为真实值: {expected_value}")
else:
logger.error(f"❌ 更新请求体 - {field} 未正确替换,期望: {expected_value}, 实际: {update_data.get(field)}")
all_correct = False
# 验证DMS格式
dms_format_checks = [
("创建请求体version", resolved_create_body.get("version") == "1.0.0"),
("创建请求体act", resolved_create_body.get("act") == -1),
("创建请求体data存在", "data" in resolved_create_body),
("更新请求体version", resolved_update_body.get("version") == "1.0.0"),
("更新请求体act", resolved_update_body.get("act") == -1),
("更新请求体data存在", "data" in resolved_update_body),
("更新description字段", update_data.get("description") == "updated-test-entry-from-scenario"),
]
for check_name, result in dms_format_checks:
if result:
logger.info(f"{check_name}: 通过")
else:
logger.error(f"{check_name}: 失败")
all_correct = False
# 测试4: 模拟curl命令生成
logger.info("4. 模拟curl命令生成...")
create_curl = f"""curl -X POST -H "Content-Type: application/json" \\
-d '{json.dumps(resolved_create_body, ensure_ascii=False)}' \\
--insecure \\
https://www.dev.ideas.cnpc/api/dms/well_kd_wellbore_ideas01/v1/dr_ach_survey_inc"""
update_curl = f"""curl -X PUT -H "Content-Type: application/json" \\
-d '{json.dumps(resolved_update_body, ensure_ascii=False)}' \\
--insecure \\
https://www.dev.ideas.cnpc/api/dms/well_kd_wellbore_ideas01/v1/dr_ach_survey_inc"""
logger.info(f"创建curl命令:\n{create_curl}")
logger.info(f"更新curl命令:\n{update_curl}")
# 验证curl命令中的井数据
curl_checks = [
("创建curl包含真实wellId", "HB00019975" in create_curl),
("创建curl包含真实wellboreId", "WEBHHB100083169" in create_curl),
("创建curl包含真实wellCommonName", "郑4-106" in create_curl),
("创建curl不包含模拟wellId", "WELL001" not in create_curl),
("更新curl包含真实wellId", "HB00019975" in update_curl),
("更新curl包含真实wellboreId", "WEBHHB100083169" in update_curl),
("更新curl包含真实wellCommonName", "郑4-106" in update_curl),
("更新curl不包含模拟wellId", "WELL001" not in update_curl),
]
for check_name, result in curl_checks:
if result:
logger.info(f"✅ curl命令 - {check_name}: 通过")
else:
logger.error(f"❌ curl命令 - {check_name}: 失败")
all_correct = False
# 总结
if all_correct:
logger.info("🎉 所有测试通过!最终井数据修复方案正常工作")
return True
else:
logger.error("❌ 部分测试失败")
return False
if __name__ == "__main__":
try:
success = test_final_well_data_fix()
sys.exit(0 if success else 1)
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

178
test_well_data_debug.py Normal file
View File

@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""
直接测试井数据调试的脚本
"""
import sys
import os
import logging
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from ddms_compliance_suite.test_orchestrator import TestOrchestrator
from ddms_compliance_suite.input_parser.parser import ParsedDMSSpec
from custom_stages.dms_crud_scenario_stage import DmsCrudScenarioStage
def test_well_data_debug():
"""直接测试井数据调试"""
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.info("开始直接测试井数据调试...")
try:
# 创建一个简单的API规范
simple_api_spec = {
"dms_api_list": [
{
"method": "POST",
"path": "/api/dms/well_kd_wellbore_ideas01/v1/dr_ach_survey_inc",
"title": "Create dr_ach_survey_inc",
"summary": "Create a new dr_ach_survey_inc record",
"description": "Create a new dr_ach_survey_inc record",
"operationId": "create_dr_ach_survey_inc",
"tags": ["dr_ach_survey_inc"],
"parameters": [],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"version": {"type": "string", "example": "1.0.0"},
"act": {"type": "integer", "example": -1},
"data": {
"type": "array",
"items": {
"type": "object",
"properties": {
"dsid": {"type": "string", "title": "数据ID"},
"wellId": {"type": "string", "title": "井标识符"},
"wellboreId": {"type": "string", "title": "井筒唯一标识符"},
"wellCommonName": {"type": "string", "title": "井名"},
"dataRegion": {"type": "string", "title": "数据标识"},
"description": {"type": "string", "title": "测斜描述"},
"bsflag": {"type": "number", "title": "逻辑删除标识"}
},
"required": ["dsid", "dataRegion", "bsflag"]
}
}
}
}
}
}
},
"responses": {
"200": {
"description": "Success",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"code": {"type": "integer"},
"message": {"type": "string"},
"data": {"type": "object"}
}
}
}
}
}
},
"_test_mode": "scenario_only",
"_dms_model_pk_name": "dsid"
}
]
}
# 创建ParsedDMSSpec
parsed_spec = ParsedDMSSpec(simple_api_spec)
# 创建测试编排器
orchestrator = TestOrchestrator(
global_api_spec=parsed_spec,
base_url="https://www.dev.ideas.cnpc",
logger=logger
)
logger.info(f"测试编排器创建成功")
logger.info(f"orchestrator.well_data_manager: {getattr(orchestrator, 'well_data_manager', 'NOT_FOUND')}")
logger.info(f"type(orchestrator.well_data_manager): {type(getattr(orchestrator, 'well_data_manager', None))}")
# 创建CRUD场景
crud_stage = DmsCrudScenarioStage()
crud_stage.orchestrator = orchestrator
crud_stage.logger = logger
logger.info(f"CRUD场景创建成功")
logger.info(f"crud_stage.orchestrator: {crud_stage.orchestrator}")
logger.info(f"hasattr(crud_stage, 'orchestrator'): {hasattr(crud_stage, 'orchestrator')}")
if hasattr(crud_stage, 'orchestrator'):
logger.info(f"hasattr(crud_stage.orchestrator, 'well_data_manager'): {hasattr(crud_stage.orchestrator, 'well_data_manager')}")
if hasattr(crud_stage.orchestrator, 'well_data_manager'):
logger.info(f"crud_stage.orchestrator.well_data_manager: {crud_stage.orchestrator.well_data_manager}")
logger.info(f"type(crud_stage.orchestrator.well_data_manager): {type(crud_stage.orchestrator.well_data_manager)}")
logger.info(f"bool(crud_stage.orchestrator.well_data_manager): {bool(crud_stage.orchestrator.well_data_manager)}")
# 测试条件判断
condition_result = (hasattr(crud_stage, 'orchestrator') and
hasattr(crud_stage.orchestrator, 'well_data_manager') and
crud_stage.orchestrator.well_data_manager)
logger.info(f"完整条件判断结果: {condition_result}")
if condition_result:
logger.info("✅ 条件判断通过,井数据管理器可用")
# 测试井数据增强
test_data = {
'dsid': 'test-123',
'wellId': 'WELL001',
'wellboreId': 'WB001',
'wellCommonName': '大庆1井',
'dataRegion': '大庆油田',
'description': '测试数据',
'bsflag': 1
}
logger.info(f"原始测试数据: {test_data}")
enhanced_data = crud_stage.orchestrator.well_data_manager.enhance_data_with_well_values(test_data)
logger.info(f"增强后数据: {enhanced_data}")
else:
logger.warning("❌ 条件判断失败,井数据管理器不可用")
# 检查井数据管理器初始化
logger.info("尝试手动初始化井数据管理器...")
try:
orchestrator._initialize_well_data_manager()
logger.info(f"手动初始化后 - orchestrator.well_data_manager: {orchestrator.well_data_manager}")
logger.info(f"手动初始化后 - type: {type(orchestrator.well_data_manager)}")
logger.info(f"手动初始化后 - bool: {bool(orchestrator.well_data_manager)}")
except Exception as e:
logger.error(f"手动初始化井数据管理器失败: {e}")
logger.info("测试完成")
except Exception as e:
logger.error(f"测试失败: {e}")
import traceback
traceback.print_exc()
return False
return True
if __name__ == "__main__":
try:
success = test_well_data_debug()
sys.exit(0 if success else 1)
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)