This commit is contained in:
ruoyunbai 2025-09-29 14:32:09 +08:00
parent 19885f5e0b
commit a242a00e48

View File

@ -457,6 +457,94 @@ class InputParser:
def __init__(self):
    """Attach a logger named after the defining module to this instance."""
    module_logger = logging.getLogger(__name__)
    self.logger = module_logger
def _decode_schema_string(self, raw_string: str, api_name: str) -> Optional[Any]:
    """Best-effort decode of a schema delivered as text into a Python object.

    Strategies are tried in order and the first one that succeeds wins:

    1. Strict JSON. A result that is itself a string is treated as
       double-encoded JSON and unwrapped when the inner text decodes to a
       dict or list; otherwise the first-level value is returned as-is.
    2. JSON after replacing backslash-escaped double quotes with plain ones.
    3. JSON after resolving backslash escapes via ``unicode_escape``.
    4. ``ast.literal_eval`` on the text as-is, then with JSON-style keywords
       (null/true/false/NaN/Infinity) mapped to Python literals.
    5. JSON after swapping single quotes for double quotes (only when the
       text contains no double quote at all).

    Keyword substitutions never touch the contents of quoted strings.

    Args:
        raw_string: Schema text. ``None`` yields ``None``; any other value
            is passed through ``str()`` and stripped before decoding.
        api_name: Name used in log messages only.

    Returns:
        The decoded object (not necessarily a dict), or ``None`` when the
        input is ``None``/blank or every strategy fails. Failures are
        logged, never raised.
    """
    if raw_string is None:
        return None

    candidate_str = str(raw_string).strip()
    if not candidate_str:
        self.logger.warning(f"Schema for '{api_name}' 是空字符串,无法解析。")
        return None

    parse_attempts = []

    def log_attempt(stage: str, exc: Exception):
        parse_attempts.append((stage, exc))

    # Quoted literals are matched first so keywords inside them are skipped.
    string_literal = r'"(?:\\.|[^"\\])*"' + r"|'(?:\\.|[^'\\])*'"

    def swap_keywords(text, keyword_pattern, mapping, flags=0):
        """Replace whole-word keywords in *text*, leaving quoted strings intact."""
        pattern = re.compile(
            "(?P<quoted>" + string_literal + ")"
            + "|(?<![A-Za-z0-9_])(?P<keyword>" + keyword_pattern + ")(?![A-Za-z0-9_])",
            flags | re.DOTALL,
        )

        def substitute(match):
            quoted = match.group("quoted")
            if quoted is not None:
                return quoted
            return mapping[match.group("keyword").lstrip("-").lower()]

        return pattern.sub(substitute, text)

    def unwrap_nested(value):
        """Peel double-encoded JSON; keep *value* unless a dict/list emerges."""
        inner = value
        for _ in range(3):  # bounded: pathological input cannot loop forever
            if not isinstance(inner, str):
                break
            try:
                inner = json.loads(inner)
            except (ValueError, RecursionError) as exc_nested:
                log_attempt("json.loads -> nested", exc_nested)
                break
        return inner if isinstance(inner, (dict, list)) else value

    # --- Attempt 1: strict JSON (plus unwrapping of double-encoded JSON) ---
    try:
        decoded = json.loads(candidate_str)
    except (ValueError, RecursionError) as exc:
        log_attempt("json.loads", exc)
    else:
        return unwrap_nested(decoded)

    # --- Attempt 2: JSON whose double quotes were backslash-escaped ---
    unescaped_text = candidate_str.replace('\\"', '"')
    if unescaped_text != candidate_str:  # identical text would only fail again
        try:
            unescaped_candidate = json.loads(unescaped_text)
            if isinstance(unescaped_candidate, (dict, list)):
                return unescaped_candidate
            if isinstance(unescaped_candidate, str):
                try:
                    return json.loads(unescaped_candidate)
                except (ValueError, RecursionError) as exc_nested:
                    log_attempt("json.loads -> nested", exc_nested)
        except (ValueError, RecursionError) as exc:
            log_attempt("json.loads after unescape", exc)

    # --- Attempt 3: resolve backslash escapes, then JSON ---
    try:
        # latin-1 + backslashreplace keeps non-ASCII text intact: characters
        # outside latin-1 become \uXXXX escapes that unicode_escape restores.
        # Encoding as UTF-8 here would be re-read as latin-1 and corrupt them.
        unicode_decoded = candidate_str.encode("latin-1", "backslashreplace").decode("unicode_escape")
        if unicode_decoded != candidate_str:
            return json.loads(unicode_decoded)
    except (ValueError, RecursionError) as exc:
        log_attempt("json.loads after unicode_escape", exc)

    # --- Attempt 4: ast.literal_eval, raw first, then with keywords mapped ---
    literal_variants = [candidate_str]
    normalized_literal = swap_keywords(
        candidate_str,
        r"-?Infinity|NaN|null|true|false",
        {"infinity": "None", "nan": "None", "null": "None", "true": "True", "false": "False"},
        flags=re.IGNORECASE,
    )
    if normalized_literal != candidate_str:
        literal_variants.append(normalized_literal)
    for variant in literal_variants:
        try:
            return ast.literal_eval(variant)
        # literal_eval is documented to raise all of these on malformed input.
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as exc:
            log_attempt("ast.literal_eval", exc)

    # --- Attempt 5: single-quoted pseudo-JSON normalised to real JSON ---
    if "'" in candidate_str and '"' not in candidate_str:
        approx_json = swap_keywords(
            candidate_str.replace("'", '"'),
            r"None|True|False",
            {"none": "null", "true": "true", "false": "false"},
        )
        try:
            return json.loads(approx_json)
        except (ValueError, RecursionError) as exc:
            log_attempt("json.loads single-quote normalized", exc)

    snippet = candidate_str[:500]
    self.logger.warning(
        f"Schema for '{api_name}' 仍无法解析,已尝试多种策略。示例片段: {snippet}"
    )
    for stage, exc in parse_attempts:
        self.logger.debug(f"解析失败阶段 [{stage}]: {exc}")
    return None
def _normalize_model_candidate(self, candidate: Any, api_name: str) -> Optional[Dict[str, Any]]:
    """Return *candidate* as a dict, decoding it first when it is a string.

    A dict is returned unchanged. A string is handed to
    ``_decode_schema_string`` and accepted only if the result is a dict.
    Every other input, or a string that decodes to a non-dict, gives ``None``.
    """
    resolved = candidate
    if isinstance(resolved, str):
        resolved = self._decode_schema_string(resolved, api_name)
    return resolved if isinstance(resolved, dict) else None
def parse_yapi_spec(self, file_path: str) -> Optional[ParsedYAPISpec]:
self.logger.info(f"Parsing YAPI spec from: {file_path}")
all_endpoints: List[YAPIEndpoint] = []
@ -714,43 +802,22 @@ class InputParser:
# 支持多种返回结构
if isinstance(raw_model_data, dict):
# 1) data.model 格式
candidate = raw_model_data.get('model')
# 2) data.schema 或其他命名
candidate = candidate or raw_model_data.get('schema')
# 3) data.records[0] 内嵌模型
candidate = self._normalize_model_candidate(raw_model_data.get('model'), name)
if not candidate:
schema_field = raw_model_data.get('schema')
candidate = self._normalize_model_candidate(schema_field, name)
if not candidate and 'records' in raw_model_data and isinstance(raw_model_data['records'], list) and raw_model_data['records']:
record = raw_model_data['records'][0]
if isinstance(record, dict):
candidate = record.get('model') or record.get('schema') or record
record_candidate = record.get('model') or record.get('schema') or record
candidate = self._normalize_model_candidate(record_candidate, name)
identity_id_list = identity_id_list or record.get('identityId')
version = record.get('version', version)
# 4) data 本身就是模型
candidate = candidate or raw_model_data
# 处理JSON字符串形式
if isinstance(candidate, str):
candidate_str = candidate.strip()
if candidate_str:
try:
candidate = json.loads(candidate_str)
except json.JSONDecodeError:
fallback_literal = candidate_str
fallback_literal = re.sub(r'\bnull\b', 'None', fallback_literal, flags=re.IGNORECASE)
fallback_literal = re.sub(r'\btrue\b', 'True', fallback_literal, flags=re.IGNORECASE)
fallback_literal = re.sub(r'\bfalse\b', 'False', fallback_literal, flags=re.IGNORECASE)
try:
candidate = ast.literal_eval(fallback_literal)
self.logger.info(
f"Schema for '{name}' parsed using ast.literal_eval fallback after JSON decode failure."
)
except (ValueError, SyntaxError):
self.logger.warning(
f"Schema for '{name}' is a string but not valid JSON; skipping this model."
)
candidate = None
else:
candidate = None
if not candidate:
candidate = self._normalize_model_candidate(raw_model_data, name)
if isinstance(candidate, dict):
model = candidate
@ -759,27 +826,7 @@ class InputParser:
else:
model = None
elif isinstance(raw_model_data, str):
raw_model_str = raw_model_data.strip()
if raw_model_str:
try:
model = json.loads(raw_model_str)
except json.JSONDecodeError:
fallback_literal = raw_model_str
fallback_literal = re.sub(r'\bnull\b', 'None', fallback_literal, flags=re.IGNORECASE)
fallback_literal = re.sub(r'\btrue\b', 'True', fallback_literal, flags=re.IGNORECASE)
fallback_literal = re.sub(r'\bfalse\b', 'False', fallback_literal, flags=re.IGNORECASE)
try:
model = ast.literal_eval(fallback_literal)
self.logger.info(
f"Schema for '{name}' string parsed using ast.literal_eval fallback after JSON decode failure."
)
except (ValueError, SyntaxError):
self.logger.warning(
f"Schema for '{name}' returned as string is not valid JSON; skipping this model."
)
model = None
else:
model = None
model = self._decode_schema_string(raw_model_data, name)
if not isinstance(model, dict):
self.logger.warning(f"Skipping API '{name}' because schema model could not be resolved to a dictionary.")