275 lines
9.1 KiB
Python
275 lines
9.1 KiB
Python
"""
|
||
井数据管理器模块
|
||
|
||
负责在测试开始前预获取井和井筒的真实数据,并在测试过程中提供这些真实值
|
||
用于替换wellId、wellboreId、wellCommonName等参数的模拟值
|
||
"""
|
||
|
||
import logging
|
||
import json
|
||
import requests
|
||
from typing import Dict, List, Any, Optional, Tuple
|
||
from urllib.parse import urljoin
|
||
import random
|
||
|
||
class WellDataManager:
|
||
"""
|
||
井数据管理器
|
||
|
||
负责:
|
||
1. 在测试开始前从指定的API获取井和井筒的真实数据
|
||
2. 缓存这些数据供后续测试使用
|
||
3. 在生成测试数据时提供真实的井相关参数值
|
||
"""
|
||
|
||
def __init__(self, base_url: str, ignore_ssl: bool = False, logger: Optional[logging.Logger] = None):
|
||
"""
|
||
初始化井数据管理器
|
||
|
||
Args:
|
||
base_url: API基础URL
|
||
ignore_ssl: 是否忽略SSL证书验证
|
||
logger: 日志记录器
|
||
"""
|
||
self.base_url = base_url.rstrip('/')
|
||
self.ignore_ssl = ignore_ssl
|
||
self.logger = logger or logging.getLogger(__name__)
|
||
|
||
# 缓存的井数据
|
||
self.well_data: List[Dict[str, Any]] = []
|
||
self.wellbore_data: List[Dict[str, Any]] = []
|
||
|
||
# 井数据API端点配置
|
||
self.well_api_config = {
|
||
"domain": "wb_cd",
|
||
"name": "cd_well",
|
||
"version": "1.0.0",
|
||
"path": "/api/dms/well_kd_wellbore_ideas01/v1/cd_well/1.0.0"
|
||
}
|
||
|
||
self.wellbore_api_config = {
|
||
"domain": "wb_cd",
|
||
"name": "cd_wellbore",
|
||
"version": "1.0.0",
|
||
"path": "/api/dms/well_kd_wellbore_ideas01/v1/cd_wellbore/1.0.0"
|
||
}
|
||
|
||
# 井相关字段名称(严格按照接口定义)
|
||
self.well_field_names = {
|
||
'wellId', # 井ID
|
||
'wellboreId', # 井筒ID
|
||
'wellCommonName' # 井通用名称
|
||
}
|
||
|
||
def fetch_well_data(self) -> bool:
|
||
"""
|
||
获取井基本信息数据
|
||
|
||
Returns:
|
||
bool: 是否成功获取数据
|
||
"""
|
||
try:
|
||
url = urljoin(self.base_url, self.well_api_config["path"])
|
||
|
||
headers = {
|
||
"Accept-Encoding": "gzip, deflate",
|
||
"Accept": "application/json",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# 发送POST请求获取井数据
|
||
response = requests.post(
|
||
url,
|
||
headers=headers,
|
||
json={}, # 空的JSON body
|
||
verify=not self.ignore_ssl,
|
||
timeout=30
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0 and 'data' in data and 'list' in data['data']:
|
||
self.well_data = data['data']['list']
|
||
self.logger.info(f"成功获取 {len(self.well_data)} 条井基本信息数据")
|
||
return True
|
||
else:
|
||
self.logger.error(f"井数据API返回错误: {data.get('message', '未知错误')}")
|
||
return False
|
||
else:
|
||
self.logger.error(f"获取井数据失败,HTTP状态码: {response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"获取井数据时发生异常: {str(e)}")
|
||
return False
|
||
|
||
def fetch_wellbore_data(self) -> bool:
|
||
"""
|
||
获取井筒基本信息数据
|
||
|
||
Returns:
|
||
bool: 是否成功获取数据
|
||
"""
|
||
try:
|
||
url = urljoin(self.base_url, self.wellbore_api_config["path"])
|
||
|
||
headers = {
|
||
"Accept-Encoding": "gzip, deflate",
|
||
"Accept": "application/json",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# 发送POST请求获取井筒数据
|
||
response = requests.post(
|
||
url,
|
||
headers=headers,
|
||
json={}, # 空的JSON body
|
||
verify=not self.ignore_ssl,
|
||
timeout=30
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('code') == 0 and 'data' in data and 'list' in data['data']:
|
||
self.wellbore_data = data['data']['list']
|
||
self.logger.info(f"成功获取 {len(self.wellbore_data)} 条井筒基本信息数据")
|
||
return True
|
||
else:
|
||
self.logger.error(f"井筒数据API返回错误: {data.get('message', '未知错误')}")
|
||
return False
|
||
else:
|
||
self.logger.error(f"获取井筒数据失败,HTTP状态码: {response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"获取井筒数据时发生异常: {str(e)}")
|
||
return False
|
||
|
||
def initialize_well_data(self) -> bool:
|
||
"""
|
||
初始化井数据,获取井和井筒的真实数据
|
||
|
||
Returns:
|
||
bool: 是否成功初始化所有数据
|
||
"""
|
||
self.logger.info("开始初始化井数据...")
|
||
|
||
well_success = self.fetch_well_data()
|
||
wellbore_success = self.fetch_wellbore_data()
|
||
|
||
if well_success and wellbore_success:
|
||
self.logger.info("井数据初始化完成")
|
||
return True
|
||
else:
|
||
self.logger.warning("井数据初始化部分失败,某些测试可能使用模拟数据")
|
||
return False
|
||
|
||
def get_random_well_data(self) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
获取随机的井数据
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 井数据字典,如果没有数据则返回None
|
||
"""
|
||
if not self.well_data:
|
||
return None
|
||
return random.choice(self.well_data)
|
||
|
||
def get_random_wellbore_data(self) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
获取随机的井筒数据
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 井筒数据字典,如果没有数据则返回None
|
||
"""
|
||
if not self.wellbore_data:
|
||
return None
|
||
return random.choice(self.wellbore_data)
|
||
|
||
def get_well_value_for_field(self, field_name: str) -> Optional[Any]:
|
||
"""
|
||
根据字段名获取井相关的真实值
|
||
|
||
Args:
|
||
field_name: 字段名称(严格匹配)
|
||
|
||
Returns:
|
||
Optional[Any]: 对应的真实值,如果没有找到则返回None
|
||
"""
|
||
# 严格按照字段名匹配
|
||
if field_name == 'wellId':
|
||
well_data = self.get_random_well_data()
|
||
if well_data:
|
||
return well_data.get('wellId')
|
||
|
||
elif field_name == 'wellboreId':
|
||
wellbore_data = self.get_random_wellbore_data()
|
||
if wellbore_data:
|
||
return wellbore_data.get('wellboreId')
|
||
|
||
elif field_name == 'wellCommonName':
|
||
well_data = self.get_random_well_data()
|
||
if well_data:
|
||
return well_data.get('wellCommonName')
|
||
|
||
return None
|
||
|
||
def is_well_related_field(self, field_name: str) -> bool:
|
||
"""
|
||
判断字段是否与井相关
|
||
|
||
Args:
|
||
field_name: 字段名称
|
||
|
||
Returns:
|
||
bool: 是否为井相关字段
|
||
"""
|
||
return field_name in self.well_field_names
|
||
|
||
def enhance_data_with_well_values(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""
|
||
用真实的井数据增强测试数据
|
||
|
||
Args:
|
||
data: 原始测试数据
|
||
|
||
Returns:
|
||
Dict[str, Any]: 增强后的测试数据
|
||
"""
|
||
if not isinstance(data, dict):
|
||
return data
|
||
|
||
enhanced_data = data.copy()
|
||
|
||
for field_name, value in data.items():
|
||
if self.is_well_related_field(field_name):
|
||
real_value = self.get_well_value_for_field(field_name)
|
||
if real_value is not None:
|
||
enhanced_data[field_name] = real_value
|
||
self.logger.debug(f"替换字段 '{field_name}' 的值: {value} -> {real_value}")
|
||
|
||
return enhanced_data
|
||
|
||
def get_well_data_summary(self) -> Dict[str, Any]:
|
||
"""
|
||
获取井数据的摘要信息
|
||
|
||
Returns:
|
||
Dict[str, Any]: 井数据摘要
|
||
"""
|
||
return {
|
||
"well_count": len(self.well_data),
|
||
"wellbore_count": len(self.wellbore_data),
|
||
"sample_well_ids": [w.get('wellId') for w in self.well_data[:5]] if self.well_data else [],
|
||
"sample_wellbore_ids": [w.get('wellboreId') for w in self.wellbore_data[:5]] if self.wellbore_data else [],
|
||
"sample_well_names": [w.get('wellCommonName') for w in self.well_data[:5]] if self.well_data else []
|
||
}
|
||
|
||
def clear_cache(self):
|
||
"""
|
||
清空缓存的井数据
|
||
"""
|
||
self.well_data.clear()
|
||
self.wellbore_data.clear()
|
||
self.logger.info("井数据缓存已清空")
|