#!/usr/bin/env python3 """ 测试DMS服务连接 """ import requests import urllib3 import json import sys def test_connection(): """测试连接到DMS服务""" base_url = "https://www.dev.ideas.cnpc" api_url = f"{base_url}/api/schema/manage/schema" print("🔧 测试DMS服务连接") print("=" * 60) print(f"目标URL: {api_url}") print() # 禁用SSL警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # 测试1: 忽略SSL证书 print("📡 测试1: 忽略SSL证书验证") try: response = requests.get(api_url, verify=False, timeout=30) print(f"✅ 连接成功!") print(f"状态码: {response.status_code}") print(f"响应头: {dict(response.headers)}") if response.status_code == 200: try: data = response.json() print(f"响应数据类型: {type(data)}") if isinstance(data, dict): print(f"响应键: {list(data.keys())}") if 'code' in data: print(f"业务代码: {data.get('code')}") if 'message' in data: print(f"消息: {data.get('message')}") print("✅ JSON解析成功") except json.JSONDecodeError as e: print(f"⚠️ JSON解析失败: {e}") print(f"响应内容前500字符: {response.text[:500]}") else: print(f"⚠️ HTTP状态码异常: {response.status_code}") print(f"响应内容: {response.text[:500]}") except requests.exceptions.SSLError as e: print(f"❌ SSL错误: {e}") return False except requests.exceptions.ConnectionError as e: print(f"❌ 连接错误: {e}") return False except requests.exceptions.Timeout as e: print(f"❌ 超时错误: {e}") return False except Exception as e: print(f"❌ 其他错误: {e}") return False print() # 测试2: 启用SSL证书验证 print("📡 测试2: 启用SSL证书验证") try: response = requests.get(api_url, verify=True, timeout=30) print(f"✅ SSL验证通过!") print(f"状态码: {response.status_code}") except requests.exceptions.SSLError as e: print(f"❌ SSL验证失败(预期): {e}") print("这证明SSL忽略功能是必要的") except Exception as e: print(f"❌ 其他错误: {e}") print() # 测试3: 测试基础连接 print("📡 测试3: 测试基础HTTP连接") try: # 尝试连接到根路径 root_url = base_url response = requests.get(root_url, verify=False, timeout=10) print(f"根路径连接: {response.status_code}") except Exception as e: print(f"根路径连接失败: {e}") return True def test_domain_mapping(): """测试域映射文件""" print("📁 测试域映射文件") print("=" * 60) domain_file = "./assets/doc/dms/domain.json" try: with open(domain_file, 'r', encoding='utf-8') as f: domain_data = json.load(f) print(f"✅ 域映射文件读取成功") print(f"文件路径: {domain_file}") print(f"域映射数据: {domain_data}") return True except FileNotFoundError: print(f"❌ 域映射文件不存在: {domain_file}") return False except json.JSONDecodeError as e: print(f"❌ 域映射文件JSON格式错误: {e}") return False except Exception as e: print(f"❌ 读取域映射文件出错: {e}") return False def main(): """主函数""" print("🧪 DMS服务连接测试") print("=" * 80) success = True # 测试域映射文件 if not test_domain_mapping(): success = False print() # 测试连接 if not test_connection(): success = False print("=" * 80) if success: print("🎉 连接测试完成") print("\n💡 建议:") print("- 如果SSL验证失败但忽略SSL成功,使用 --ignore-ssl 参数") print("- 如果连接完全失败,检查网络和防火墙设置") print("- 如果JSON解析失败,检查API端点是否正确") else: print("❌ 连接测试失败") print("\n🔧 故障排除:") print("1. 检查网络连接") print("2. 检查防火墙设置") print("3. 确认服务器地址正确") print("4. 检查域映射文件是否存在") return success if __name__ == "__main__": if main(): sys.exit(0) else: sys.exit(1)