#!/usr/bin/env python3 """ 测试多主键删除功能 """ import sys import json from unittest.mock import Mock from custom_stages.dms_crud_scenario_stage import DmsCrudScenarioStage from ddms_compliance_suite.input_parser.parser import DMSEndpoint def test_single_key_delete(): """测试单主键删除(传统方式)""" print("🧪 测试单主键删除") print("=" * 60) # 模拟单主键删除的schema delete_schema = { "type": "object", "properties": { "data": { "type": "array", "items": { "type": "string" # 简单的字符串数组 } } } } # 创建模拟的删除端点 mock_delete_endpoint = Mock(spec=DMSEndpoint) mock_delete_endpoint.request_body = { "content": { "application/json": { "schema": delete_schema } } } # 创建模拟的scenario scenario = { "delete": mock_delete_endpoint } # 创建CRUD Stage实例 crud_stage = DmsCrudScenarioStage( api_group_metadata={"name": "测试"}, apis_in_group=[], global_api_spec=Mock() ) # 测试构建删除请求体 create_payload = {"siteId": "test_site_001", "siteName": "测试工区"} delete_body = crud_stage._build_delete_request_body( scenario, "siteId", "test_site_001", create_payload ) print(f"单主键删除请求体: {json.dumps(delete_body, indent=2, ensure_ascii=False)}") # 验证结果 expected_structure = {"data": ["test_site_001"]} if delete_body == expected_structure: print("✅ 单主键删除请求体格式正确") return True else: print(f"❌ 单主键删除请求体格式错误,期望: {expected_structure}") return False def test_multi_key_delete(): """测试多主键删除(对象列表)""" print("\n🧪 测试多主键删除") print("=" * 60) # 模拟多主键删除的schema delete_schema = { "type": "object", "properties": { "version": {"type": "string"}, "data": { "type": "array", "items": { "type": "object", "properties": { "projectId": {"type": "string", "title": "项目ID"}, "surveyId": {"type": "string", "title": "工区ID"} }, "required": ["projectId", "surveyId"] } } } } # 创建模拟的删除端点 mock_delete_endpoint = Mock(spec=DMSEndpoint) mock_delete_endpoint.request_body = { "content": { "application/json": { "schema": delete_schema } } } # 创建模拟的scenario scenario = { "delete": mock_delete_endpoint } # 创建CRUD Stage实例 crud_stage = DmsCrudScenarioStage( api_group_metadata={"name": "测试"}, apis_in_group=[], global_api_spec=Mock() ) # 测试构建删除请求体 create_payload = { "projectId": "项目1_ID", "surveyId": "工区1_ID", "projectName": "测试项目", "surveyName": "测试工区" } delete_body = crud_stage._build_delete_request_body( scenario, "projectId", "项目1_ID", create_payload ) print(f"多主键删除请求体: {json.dumps(delete_body, indent=2, ensure_ascii=False)}") # 验证结果 if isinstance(delete_body, dict) and "data" in delete_body: data_array = delete_body["data"] if isinstance(data_array, list) and len(data_array) > 0: first_item = data_array[0] # 检查第一个对象是否包含正确的主键 if (isinstance(first_item, dict) and first_item.get("projectId") == "项目1_ID" and first_item.get("surveyId") == "工区1_ID"): print("✅ 多主键删除请求体格式正确") print(f"✅ 包含主键: projectId={first_item['projectId']}, surveyId={first_item['surveyId']}") # 检查是否有版本号 if delete_body.get("version"): print(f"✅ 包含版本号: {delete_body['version']}") # 检查是否支持批量删除 if len(data_array) > 1: print(f"✅ 支持批量删除,共{len(data_array)}个对象") return True else: print(f"❌ 主键字段不正确: {first_item}") return False else: print(f"❌ data数组格式错误: {data_array}") return False else: print(f"❌ 删除请求体格式错误: {delete_body}") return False def test_missing_key_generation(): """测试缺失主键的默认值生成""" print("\n🧪 测试缺失主键的默认值生成") print("=" * 60) # 模拟删除schema包含创建负载中没有的字段 delete_schema = { "type": "object", "properties": { "data": { "type": "array", "items": { "type": "object", "properties": { "siteId": {"type": "string"}, "regionId": {"type": "string"}, # 创建负载中没有这个字段 "version": {"type": "number"} # 创建负载中也没有这个字段 }, "required": ["siteId", "regionId", "version"] } } } } mock_delete_endpoint = Mock(spec=DMSEndpoint) mock_delete_endpoint.request_body = { "content": { "application/json": { "schema": delete_schema } } } scenario = {"delete": mock_delete_endpoint} crud_stage = DmsCrudScenarioStage( api_group_metadata={"name": "测试"}, apis_in_group=[], global_api_spec=Mock() ) # 创建负载只包含部分字段 create_payload = {"siteId": "test_site_001", "siteName": "测试工区"} delete_body = crud_stage._build_delete_request_body( scenario, "siteId", "test_site_001", create_payload ) print(f"缺失字段生成测试: {json.dumps(delete_body, indent=2, ensure_ascii=False)}") if isinstance(delete_body, dict) and "data" in delete_body: data_array = delete_body["data"] if isinstance(data_array, list) and len(data_array) > 0: first_item = data_array[0] # 检查是否包含所有必需字段 required_fields = ["siteId", "regionId", "version"] missing_fields = [field for field in required_fields if field not in first_item] if not missing_fields: print("✅ 成功生成所有缺失的必需字段") print(f"✅ siteId: {first_item['siteId']}") print(f"✅ regionId: {first_item['regionId']} (自动生成)") print(f"✅ version: {first_item['version']} (自动生成)") return True else: print(f"❌ 缺失必需字段: {missing_fields}") return False else: print(f"❌ data数组格式错误: {data_array}") return False else: print(f"❌ 删除请求体格式错误: {delete_body}") return False def test_fallback_scenarios(): """测试各种回退场景""" print("\n🧪 测试回退场景") print("=" * 60) crud_stage = DmsCrudScenarioStage( api_group_metadata={"name": "测试"}, apis_in_group=[], global_api_spec=Mock() ) create_payload = {"siteId": "test_site_001"} # 测试1: 没有删除端点 scenario_no_delete = {} delete_body1 = crud_stage._build_delete_request_body( scenario_no_delete, "siteId", "test_site_001", create_payload ) print(f"无删除端点回退: {delete_body1}") # 测试2: 删除端点没有请求体 mock_delete_no_body = Mock(spec=DMSEndpoint) mock_delete_no_body.request_body = None scenario_no_body = {"delete": mock_delete_no_body} delete_body2 = crud_stage._build_delete_request_body( scenario_no_body, "siteId", "test_site_001", create_payload ) print(f"无请求体回退: {delete_body2}") # 验证回退结果 expected_fallback = {"data": ["test_site_001"]} if delete_body1 == expected_fallback and delete_body2 == expected_fallback: print("✅ 回退场景处理正确") return True else: print("❌ 回退场景处理错误") return False def main(): """主函数""" print("🚀 多主键删除功能测试") print("=" * 80) success_count = 0 total_tests = 4 # 测试1: 单主键删除 if test_single_key_delete(): success_count += 1 # 测试2: 多主键删除 if test_multi_key_delete(): success_count += 1 # 测试3: 缺失字段生成 if test_missing_key_generation(): success_count += 1 # 测试4: 回退场景 if test_fallback_scenarios(): success_count += 1 # 总结 print("\n" + "=" * 80) print("📋 测试总结") print("=" * 80) print(f"通过测试: {success_count}/{total_tests}") if success_count == total_tests: print("🎉 多主键删除功能测试通过!") print("\n✅ 实现的功能:") print("- 自动检测删除操作的schema结构") print("- 支持单主键的字符串数组格式") print("- 支持多主键的对象列表格式") print("- 自动从创建负载中提取相关主键") print("- 为缺失的必需字段生成默认值") print("- 支持批量删除(生成多个对象)") print("- 优雅的回退到简单格式") print("\n💡 支持的删除格式:") print("1. 简单主键: {\"data\": [\"key1\", \"key2\"]}") print("2. 多主键对象: {\"version\": \"1.0.0\", \"data\": [{\"projectId\": \"项目1\", \"surveyId\": \"工区1\"}]}") sys.exit(0) else: print("❌ 部分测试失败") sys.exit(1) if __name__ == "__main__": main()