diff --git a/ddms_compliance_suite/input_parser/parser.py b/ddms_compliance_suite/input_parser/parser.py index 8690bef..feeb120 100644 --- a/ddms_compliance_suite/input_parser/parser.py +++ b/ddms_compliance_suite/input_parser/parser.py @@ -457,6 +457,94 @@ class InputParser: def __init__(self): self.logger = logging.getLogger(__name__) + def _decode_schema_string(self, raw_string: str, api_name: str) -> Optional[Any]: + """尽量将字符串形式的模型解析为 Python 对象,并记录失败原因。""" + if raw_string is None: + return None + + candidate_str = str(raw_string).strip() + if not candidate_str: + self.logger.warning(f"Schema for '{api_name}' 是空字符串,无法解析。") + return None + + parse_attempts = [] + + def log_attempt(stage: str, exc: Exception): + parse_attempts.append((stage, exc)) + + # --- Attempt 1: 标准 JSON --- + try: + return json.loads(candidate_str) + except json.JSONDecodeError as exc: + log_attempt("json.loads", exc) + + # --- Attempt 2: 处理被整体转义或包裹的 JSON 字符串 --- + try: + unescaped_candidate = json.loads(candidate_str.replace('\\"', '"')) + if isinstance(unescaped_candidate, (dict, list)): + return unescaped_candidate + if isinstance(unescaped_candidate, str): + try: + return json.loads(unescaped_candidate) + except json.JSONDecodeError as exc_nested: + log_attempt("json.loads -> nested", exc_nested) + except Exception as exc: + log_attempt("json.loads after unescape", exc) + + # --- Attempt 3: unicode 转义还原后再尝试 JSON --- + try: + unicode_decoded = candidate_str.encode('utf-8').decode('unicode_escape') + if unicode_decoded != candidate_str: + return json.loads(unicode_decoded) + except Exception as exc: + log_attempt("json.loads after unicode_escape", exc) + + # --- Attempt 4: ast.literal_eval (需要替换关键字) --- + normalized_literal = candidate_str + replacements_for_ast = [ + (r"(? Optional[Dict[str, Any]]: + """确保候选模型以字典形式返回。""" + if isinstance(candidate, dict): + return candidate + if isinstance(candidate, str): + decoded = self._decode_schema_string(candidate, api_name) + if isinstance(decoded, dict): + return decoded + return None + def parse_yapi_spec(self, file_path: str) -> Optional[ParsedYAPISpec]: self.logger.info(f"Parsing YAPI spec from: {file_path}") all_endpoints: List[YAPIEndpoint] = [] @@ -714,43 +802,22 @@ class InputParser: # 支持多种返回结构 if isinstance(raw_model_data, dict): - # 1) data.model 格式 - candidate = raw_model_data.get('model') - # 2) data.schema 或其他命名 - candidate = candidate or raw_model_data.get('schema') - # 3) data.records[0] 内嵌模型 + candidate = self._normalize_model_candidate(raw_model_data.get('model'), name) + + if not candidate: + schema_field = raw_model_data.get('schema') + candidate = self._normalize_model_candidate(schema_field, name) + if not candidate and 'records' in raw_model_data and isinstance(raw_model_data['records'], list) and raw_model_data['records']: record = raw_model_data['records'][0] if isinstance(record, dict): - candidate = record.get('model') or record.get('schema') or record + record_candidate = record.get('model') or record.get('schema') or record + candidate = self._normalize_model_candidate(record_candidate, name) identity_id_list = identity_id_list or record.get('identityId') version = record.get('version', version) - # 4) data 本身就是模型 - candidate = candidate or raw_model_data - # 处理JSON字符串形式 - if isinstance(candidate, str): - candidate_str = candidate.strip() - if candidate_str: - try: - candidate = json.loads(candidate_str) - except json.JSONDecodeError: - fallback_literal = candidate_str - fallback_literal = re.sub(r'\bnull\b', 'None', fallback_literal, flags=re.IGNORECASE) - fallback_literal = re.sub(r'\btrue\b', 'True', fallback_literal, flags=re.IGNORECASE) - fallback_literal = re.sub(r'\bfalse\b', 'False', fallback_literal, flags=re.IGNORECASE) - try: - candidate = ast.literal_eval(fallback_literal) - self.logger.info( - f"Schema for '{name}' parsed using ast.literal_eval fallback after JSON decode failure." - ) - except (ValueError, SyntaxError): - self.logger.warning( - f"Schema for '{name}' is a string but not valid JSON; skipping this model." - ) - candidate = None - else: - candidate = None + if not candidate: + candidate = self._normalize_model_candidate(raw_model_data, name) if isinstance(candidate, dict): model = candidate @@ -759,27 +826,7 @@ class InputParser: else: model = None elif isinstance(raw_model_data, str): - raw_model_str = raw_model_data.strip() - if raw_model_str: - try: - model = json.loads(raw_model_str) - except json.JSONDecodeError: - fallback_literal = raw_model_str - fallback_literal = re.sub(r'\bnull\b', 'None', fallback_literal, flags=re.IGNORECASE) - fallback_literal = re.sub(r'\btrue\b', 'True', fallback_literal, flags=re.IGNORECASE) - fallback_literal = re.sub(r'\bfalse\b', 'False', fallback_literal, flags=re.IGNORECASE) - try: - model = ast.literal_eval(fallback_literal) - self.logger.info( - f"Schema for '{name}' string parsed using ast.literal_eval fallback after JSON decode failure." - ) - except (ValueError, SyntaxError): - self.logger.warning( - f"Schema for '{name}' returned as string is not valid JSON; skipping this model." - ) - model = None - else: - model = None + model = self._decode_schema_string(raw_model_data, name) if not isinstance(model, dict): self.logger.warning(f"Skipping API '{name}' because schema model could not be resolved to a dictionary.")