This commit is contained in:
ruoyunbai 2025-09-29 10:29:44 +08:00
parent cd9f7847b2
commit 89b009dafb
3 changed files with 1207 additions and 1120 deletions

2
.gitignore vendored
View File

@ -8,6 +8,8 @@
build/
dist/
log*
docker/*
dms-compliance*
dms-compliance-compose*
# Python相关
__pycache__/

View File

@ -594,8 +594,12 @@ class InputParser:
api_list_data = response.json()
# 检查业务代码是否成功
if api_list_data.get("code") != 0:
self.logger.error(f"DMS API list endpoint returned a business error: {api_list_data.get('message')}")
list_code = api_list_data.get("code")
list_code_normalized = str(list_code).strip().lower() if list_code is not None else ""
if list_code_normalized not in {"0", "success", "ok", "200"}:
self.logger.error(
f"DMS API list endpoint returned a business error: {api_list_data.get('message')} (code={list_code})"
)
return None, {}
# 从分页结构中提取 'records'
@ -689,20 +693,95 @@ class InputParser:
self.logger.error(f"Failed to decode JSON for model '{name}'.")
continue
if not model_schema_response or 'data' not in model_schema_response or not model_schema_response['data']:
self.logger.warning(f"Skipping API '{name}' due to missing or empty model schema in response.")
detail_code = model_schema_response.get('code')
detail_code_normalized = str(detail_code).strip().lower() if detail_code is not None else ""
if detail_code is not None and detail_code_normalized not in {"0", "success", "ok", "200"}:
self.logger.warning(
f"Skipping API '{name}' due to business error when fetching schema: {model_schema_response.get('message')} (code={detail_code})"
)
continue
model_data = model_schema_response['data']
# Based on user feedback, model_data itself is the schema, not model_data['model']
model = model_data
if not model or 'properties' not in model or not model['properties']:
self.logger.warning(f"Skipping API '{name}' due to missing or invalid 'model' object in schema.")
raw_model_data = model_schema_response.get('data')
if not raw_model_data:
self.logger.warning(f"Skipping API '{name}' due to missing 'data' section in schema response.")
continue
model = None
identity_id_list = None
version = item.get('version', '1.0.0')
# 支持多种返回结构
if isinstance(raw_model_data, dict):
# 1) data.model 格式
candidate = raw_model_data.get('model')
# 2) data.schema 或其他命名
candidate = candidate or raw_model_data.get('schema')
# 3) data.records[0] 内嵌模型
if not candidate and 'records' in raw_model_data and isinstance(raw_model_data['records'], list) and raw_model_data['records']:
record = raw_model_data['records'][0]
if isinstance(record, dict):
candidate = record.get('model') or record.get('schema') or record
identity_id_list = identity_id_list or record.get('identityId')
version = record.get('version', version)
# 4) data 本身就是模型
candidate = candidate or raw_model_data
# 处理JSON字符串形式
if isinstance(candidate, str):
try:
candidate = json.loads(candidate)
except json.JSONDecodeError:
self.logger.warning(f"Schema for '{name}' is a string but not valid JSON; skipping this model.")
candidate = None
if isinstance(candidate, dict):
model = candidate
identity_id_list = identity_id_list or raw_model_data.get('identityId')
version = raw_model_data.get('version', version)
else:
model = None
elif isinstance(raw_model_data, str):
try:
model = json.loads(raw_model_data)
except json.JSONDecodeError:
self.logger.warning(f"Schema for '{name}' returned as string is not valid JSON; skipping this model.")
model = None
if not isinstance(model, dict):
self.logger.warning(f"Skipping API '{name}' because schema model could not be resolved to a dictionary.")
continue
# 新接口可能将实际模型嵌套在 model['model'] 中
if 'properties' not in model and isinstance(model.get('model'), dict):
inner_model = model.get('model')
if isinstance(inner_model, dict) and 'properties' in inner_model:
identity_id_list = identity_id_list or model.get('identityId') or inner_model.get('identityId')
version = model.get('version', version)
model = inner_model
# 某些接口可能将schema序列化为model['modelJson']之类的字段
if 'properties' not in model:
json_field = None
for key in ('modelJson', 'schemaJson', 'model_content', 'schema_content'):
if key in model:
json_field = model[key]
break
if json_field and isinstance(json_field, str):
try:
decoded_model = json.loads(json_field)
if isinstance(decoded_model, dict) and 'properties' in decoded_model:
model = decoded_model
except json.JSONDecodeError:
self.logger.debug(f"Field '{key}' for '{name}' is not valid JSON; ignoring.")
if not model or 'properties' not in model or not isinstance(model['properties'], dict) or not model['properties']:
self.logger.warning(f"Skipping API '{name}' due to missing or invalid 'properties' in resolved schema.")
continue
pk_name = None
# Find primary key by looking for top-level "identityId" array.
identity_id_list = model.get("identityId")
if identity_id_list is None:
identity_id_list = model.get("identityId")
if isinstance(identity_id_list, list) and len(identity_id_list) > 0:
candidate_pk_name = identity_id_list[0]
# 🔧 验证identityId指向的字段是否真的存在于properties中
@ -712,6 +791,12 @@ class InputParser:
else:
self.logger.warning(f"identityId property '{candidate_pk_name}' not found in model properties for '{name}'. Will use fallback.")
# 规范化主键ID列表
if isinstance(identity_id_list, str):
identity_id_list = [identity_id_list]
elif not isinstance(identity_id_list, list):
identity_id_list = []
# Fallback to original behavior if no identityId found or identityId field doesn't exist
if not pk_name:
pk_name = next(iter(model['properties']), None)
@ -729,7 +814,7 @@ class InputParser:
pk_schema = model['properties'][pk_name]
version = model_data.get('version', '1.0.0')
version = version or model.get('version', '1.0.0')
dms_instance_code = instance_code
category_name = domain_name

View File

@ -128,7 +128,7 @@ class TestConfig(BaseModel):
# 过滤选项
strictness_level: str = Field("CRITICAL", description="测试严格等级", pattern="^(CRITICAL|HIGH|MEDIUM|LOW)$")
ignore_ssl: bool = Field(True, description="是否忽略SSL证书错误", examples=[True, False])
@field_validator('base_url')
@classmethod
def validate_base_url(cls, v):