gongwenxin fa343eb111 .
2025-08-07 15:07:38 +08:00

822 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import copy
from typing import Dict, List, Any, Optional, Union, Tuple
import re
# 获取模块级别的 logger
logger = logging.getLogger(__name__)
def resolve_json_schema_references(
schema_to_resolve: Any,
full_api_spec: Dict[str, Any],
max_depth: int = 10,
current_depth: int = 0,
discard_refs: bool = True # 新增参数,默认为 True
) -> Any:
"""
递归解析JSON Schema中的$ref引用。
Args:
schema_to_resolve: 当前需要解析的schema部分 (可以是字典、列表或基本类型)。
full_api_spec: 完整的API规范字典用于查找$ref路径。
max_depth: 最大递归深度,防止无限循环。
current_depth: 当前递归深度。
discard_refs: 是否在解析前移除 $ref 和 $$ prefixed 键。
Returns:
解析了$ref的schema部分。
"""
if current_depth > max_depth:
logger.warning(f"达到最大$ref解析深度 ({max_depth}),可能存在循环引用。停止进一步解析。 Schema: {str(schema_to_resolve)[:200]}")
return schema_to_resolve
if isinstance(schema_to_resolve, dict):
current_dict_processing = dict(schema_to_resolve) # 操作副本
if discard_refs:
# 模式1: 丢弃 $ref 和 $$ 开头的键, 然后递归处理剩余值
ref_value = current_dict_processing.pop("$ref", None)
if ref_value is not None:
logger.debug(f"因 discard_refs=True丢弃 '$ref': {ref_value}")
keys_to_remove = [k for k in current_dict_processing if k.startswith("$$")]
for key_to_remove in keys_to_remove:
key_val = current_dict_processing.pop(key_to_remove, None)
logger.debug(f"因 discard_refs=True丢弃 '{key_to_remove}': {key_val}")
# current_dict_processing 已清理完毕,递归处理其值
resolved_children = {}
for key, value in current_dict_processing.items():
resolved_children[key] = resolve_json_schema_references(
value, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs
)
return resolved_children
else:
# 模式2: 尝试解析 $ref (如果存在), 然后递归。$$ 开头的键会保留并递归处理。
if "$ref" in current_dict_processing:
ref_path = current_dict_processing["$ref"]
if not isinstance(ref_path, str) or not ref_path.startswith("#/"):
logger.warning(f"不支持的$ref格式或外部引用: {ref_path}。在非丢弃模式下,$ref将作为普通键值对处理。")
# 继续执行后续的常规递归,$ref 将作为 current_dict_processing 中的一个键
else:
path_parts = ref_path[2:].split('/')
target_component_root = full_api_spec
current_target_component = target_component_root
valid_path = True
try:
for part in path_parts:
if isinstance(current_target_component, list):
try:
part_idx = int(part)
current_target_component = current_target_component[part_idx]
except (ValueError, IndexError):
logger.error(f"路径部分 '{part}' (应为整数索引) 无效或越界于列表。路径: {ref_path}")
valid_path = False
break
elif isinstance(current_target_component, dict):
if part not in current_target_component:
logger.error(f"路径部分 '{part}' 在对象中未找到。路径: {ref_path}. 可用键: {list(current_target_component.keys())}")
valid_path = False
break
current_target_component = current_target_component[part]
else:
logger.error(f"尝试在非字典/列表类型 ({type(current_target_component)}) 中访问路径部分 '{part}'。路径: {ref_path}")
valid_path = False
break
if valid_path:
# $ref 解析成功
final_schema_after_ref_resolution = copy.deepcopy(current_target_component)
# 如果解析结果是字典,则将原始 $ref 位置的同级键合并(覆盖)进去
if isinstance(final_schema_after_ref_resolution, dict):
for key, value in current_dict_processing.items():
if key != "$ref": # 合并同级键
final_schema_after_ref_resolution[key] = value
# 如果解析结果不是字典(例如,一个数组或原始类型),则同级键实际上被丢弃,
# 因为返回的是 final_schema_after_ref_resolution 本身。这是 $ref 的标准行为之一。
logger.debug(f"成功解析 $ref: '{ref_path}'。将递归解析其内容(可能已与同级键合并)。")
return resolve_json_schema_references(
final_schema_after_ref_resolution, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs
)
except Exception as e:
logger.error(f"解析$ref '{ref_path}' 时发生意外错误: {e}. 将尝试使用同级节点。", exc_info=False)
valid_path = False
# 如果 $ref 解析失败 (valid_path is False 或 try 块中出现异常)
if not valid_path:
logger.warning(f"$ref '{ref_path}' 解析失败。将移除 $ref 并处理该对象的其余部分。")
current_dict_processing.pop("$ref", None)
# 继续执行后续的常规递归,此时 current_dict_processing 已移除了失败的 $ref
# 常规递归 (模式2: 非丢弃模式 / $ref 已处理或移除)
resolved_children = {}
for key, value in current_dict_processing.items():
resolved_children[key] = resolve_json_schema_references(
value, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs
)
return resolved_children
elif isinstance(schema_to_resolve, list):
return [resolve_json_schema_references(item, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs) for item in schema_to_resolve]
else: # 原始类型
return schema_to_resolve
def util_find_removable_field_path_recursive(
current_schema: Dict[str, Any],
current_path: List[Union[str, int]], # Union added here
full_api_spec_for_refs: Dict[str, Any],
# logger_param: Optional[logging.Logger] = None # Option to pass logger
) -> Optional[List[Union[str, int]]]:
"""
(框架辅助方法) 递归查找第一个可移除的必填字段的路径。
"""
# effective_logger = logger_param or logger # Use passed logger or module logger
resolved_schema = resolve_json_schema_references(current_schema, full_api_spec_for_refs)
if not isinstance(resolved_schema, dict) or resolved_schema.get("type") != "object":
return None
required_fields_at_current_level = resolved_schema.get("required", [])
properties = resolved_schema.get("properties", {})
logger.debug(f"[Util] 递归查找路径: {current_path}, 当前层级必填字段: {required_fields_at_current_level}, 属性: {list(properties.keys())}")
if required_fields_at_current_level and properties:
for field_name in required_fields_at_current_level:
if field_name in properties:
logger.info(f"[Util] 策略1: 在路径 {'.'.join(map(str,current_path)) if current_path else 'root'} 找到可直接移除的必填字段: '{field_name}'")
return current_path + [field_name]
if properties:
for prop_name, prop_schema_orig in properties.items():
prop_schema = resolve_json_schema_references(prop_schema_orig, full_api_spec_for_refs)
if isinstance(prop_schema, dict) and prop_schema.get("type") == "array":
items_schema_orig = prop_schema.get("items")
if isinstance(items_schema_orig, dict):
items_schema = resolve_json_schema_references(items_schema_orig, full_api_spec_for_refs)
if isinstance(items_schema, dict) and items_schema.get("type") == "object":
item_required_fields = items_schema.get("required", [])
item_properties = items_schema.get("properties", {})
if item_required_fields and item_properties:
first_required_field_in_item = next((rf for rf in item_required_fields if rf in item_properties), None)
if first_required_field_in_item:
logger.info(f"[Util] 策略2: 在数组属性 '{prop_name}' (路径 {'.'.join(map(str,current_path)) if current_path else 'root'}) 的元素内找到必填字段: '{first_required_field_in_item}'. 路径: {current_path + [prop_name, 0, first_required_field_in_item]}")
return current_path + [prop_name, 0, first_required_field_in_item]
logger.debug(f"[Util] 在路径 {'.'.join(map(str,current_path)) if current_path else 'root'} 未通过任何策略找到可移除的必填字段。")
return None
def util_remove_value_at_path(
data_container: Any,
path: List[Union[str, int]],
# logger_param: Optional[logging.Logger] = None
) -> Tuple[Any, Any, bool]:
"""
(框架辅助方法) 从嵌套的字典/列表中移除指定路径的值。
返回 (修改后的容器, 被移除的值, 是否成功)。
"""
# effective_logger = logger_param or logger
if not path:
logger.error("[Util] util_remove_value_at_path: 路径不能为空。")
return data_container, None, False
if data_container is None:
if isinstance(path[0], str):
container_copy = {}
elif isinstance(path[0], int):
container_copy = []
else:
logger.error(f"[Util] util_remove_value_at_path: 路径的第一个元素 '{path[0]}' 类型未知。")
return data_container, None, False
else:
container_copy = copy.deepcopy(data_container)
current_level = container_copy
original_value = None
try:
for i, key_or_index in enumerate(path):
is_last_element = (i == len(path) - 1)
if is_last_element:
if isinstance(key_or_index, str):
if isinstance(current_level, dict) and key_or_index in current_level:
original_value = current_level.pop(key_or_index)
logger.info(f"[Util] 从路径 '{'.'.join(map(str,path))}' 成功移除字段 '{key_or_index}' (原值: '{original_value}')。")
return container_copy, original_value, True
elif isinstance(current_level, dict):
logger.warning(f"[Util] 路径的最后一部分 '{key_or_index}' (string key) 在对象中未找到。路径: {'.'.join(map(str,path))}")
return container_copy, None, False
else:
logger.error(f"[Util] 路径的最后一部分 '{key_or_index}' (string key) 期望父级是字典,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}")
return data_container, None, False
else:
if isinstance(current_level, list) and isinstance(key_or_index, int) and 0 <= key_or_index < len(current_level):
original_value = current_level.pop(key_or_index)
logger.info(f"[Util] 从路径 '{'.'.join(map(str,path))}' 成功移除索引 '{key_or_index}' 的元素 (原值: '{original_value}')。")
return container_copy, original_value, True
elif isinstance(current_level, list):
logger.warning(f"[Util] 路径的最后一部分索引 '{key_or_index}' 超出列表范围或类型不符。列表长度: {len(current_level)}. 路径: {'.'.join(map(str,path))}")
return container_copy, None, False
else:
logger.error(f"[Util] 路径的最后一部分 '{key_or_index}' 期望父级是列表,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}")
return data_container, None, False
else:
next_key_or_index = path[i+1]
if isinstance(key_or_index, str):
if not isinstance(current_level, dict):
if current_level is container_copy and not container_copy :
current_level = {}
if i == 0: container_copy = current_level
else:
logger.error(f"[Util] 无法在非根级别从非字典创建路径。")
return data_container, None, False
else:
logger.error(f"[Util] 路径期望字典,但在 '{key_or_index}' 处找到 {type(current_level)}")
return data_container, None, False
if isinstance(next_key_or_index, int):
if key_or_index not in current_level or not isinstance(current_level.get(key_or_index), list):
current_level[key_or_index] = []
current_level = current_level[key_or_index]
else:
if key_or_index not in current_level or not isinstance(current_level.get(key_or_index), dict):
current_level[key_or_index] = {}
current_level = current_level[key_or_index]
elif isinstance(key_or_index, int):
if not isinstance(current_level, list):
logger.error(f"[Util] 路径期望列表以应用索引 '{key_or_index}',但找到 {type(current_level)}")
return data_container, None, False
while len(current_level) <= key_or_index:
if isinstance(next_key_or_index, str):
current_level.append({})
else:
current_level.append([])
if isinstance(next_key_or_index, str):
if not isinstance(current_level[key_or_index], dict):
current_level[key_or_index] = {}
elif isinstance(next_key_or_index, int):
if not isinstance(current_level[key_or_index], list):
current_level[key_or_index] = []
current_level = current_level[key_or_index]
else:
logger.error(f"[Util] 路径部分 '{key_or_index}' 类型未知 ({type(key_or_index)}).")
return data_container, None, False
except Exception as e:
logger.error(f"[Util] 在准备移除字段路径 '{'.'.join(map(str,path))}' 时发生错误: {e}", exc_info=True)
return data_container, None, False
logger.error(f"[Util] util_remove_value_at_path 未能在循环内按预期返回。路径: {'.'.join(map(str,path))}")
return data_container, None, False
def util_set_value_at_path(
data_container: Any,
path: List[Union[str, int]],
new_value: Any,
# logger_param: Optional[logging.Logger] = None
) -> Tuple[Any, bool]:
"""
(框架辅助方法) 在嵌套的字典/列表中为指定路径设置新值。
如果路径中的某些部分不存在,会尝试创建它们 (字典会创建,列表会尝试填充到指定索引,但需谨慎)。
返回 (修改后的容器, 是否成功)。
"""
# effective_logger = logger_param or logger
if not path:
logger.error("[Util] util_set_value_at_path: 路径不能为空。")
# 如果路径为空,是否应该用 new_value 替换整个 data_container
# 当前行为:返回原始容器和失败状态,因为路径通常指向容器内部。
# 如果要支持替换整个容器,需要明确此行为。
if data_container is None and new_value is not None: # 特殊情况如果原始容器是None且路径为空则新值成为容器
logger.info("[Util] util_set_value_at_path: 路径为空原始容器为None新值将作为新容器返回。")
return new_value, True
elif data_container is not None and new_value is None and not path: # 路径为空新值为None则清空容器
logger.info("[Util] util_set_value_at_path: 路径为空新值为None容器将被清空 (返回None)。")
return None, True
# 对于路径为空且两者都不是None的情况目前返回失败因为通常期望有路径。
# 或者可以考虑直接返回 new_value意味着整个对象被替换。
# logger.info(f"[Util] util_set_value_at_path: Path is empty. Replacing entire container.")
# return new_value, True # 备选行为:替换整个对象
return data_container, False
# 深拷贝以避免修改原始输入除非原始输入是None
container_copy = copy.deepcopy(data_container) if data_container is not None else None
current_level = container_copy
try:
for i, key_or_index in enumerate(path):
is_last_element = (i == len(path) - 1)
if is_last_element:
if isinstance(key_or_index, str):
if not isinstance(current_level, dict):
# 如果当前层级不是字典 (例如是None或是被意外替换为其他类型),无法设置键值对
logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分 '{key_or_index}' (string key) 期望父级是字典,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}")
# 尝试强制转换为字典?这可能不是预期行为。
# 如果 current_level 是 None 且它是根 (container_copy is None),则初始化 container_copy
if current_level is None and i == 0: # 路径只有一级且容器本身是None
container_copy = {}
current_level = container_copy
else: # 更深层级的None或类型错误
return data_container, False
current_level[key_or_index] = new_value
logger.info(f"[Util] 在路径 {'.'.join(map(str,path))} (键 '{key_or_index}') 处设置值为 '{new_value}'")
return container_copy, True
elif isinstance(key_or_index, int): # key_or_index is an integer (list index)
if not isinstance(current_level, list):
logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分索引 '{key_or_index}' 期望父级是列表,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}")
if current_level is None and i == 0: # 路径只有一级且容器本身是None
container_copy = [None] * (key_or_index + 1) # 创建足够长度的列表
current_level = container_copy
else:
return data_container, False
elif isinstance(key_or_index, int):
# 确保列表足够长以容纳索引
while len(current_level) <= key_or_index:
current_level.append(None) # 用 None 填充直到达到所需长度
current_level[key_or_index] = new_value
logger.info(f"[Util] 在路径 {'.'.join(map(str,path))} (索引 '{key_or_index}') 处设置值为 '{new_value}'")
return container_copy, True
else:
logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分 '{key_or_index}' 类型未知。路径: {'.'.join(map(str,path))}")
return data_container, False
else: # Not the last element, traverse deeper
next_key_or_index_is_int = isinstance(path[i+1], int)
if isinstance(key_or_index, str): # Current path part is a dictionary key
if not isinstance(current_level, dict):
# 如果在根级别且容器是None则初始化为字典
if current_level is None and i == 0:
container_copy = {}
current_level = container_copy
else:
logger.error(f"[Util] util_set_value_at_path: 路径期望字典,但在 '{key_or_index}' 处找到 {type(current_level)}。路径: {'.'.join(map(str,path[:i+1]))}")
return data_container, False
if key_or_index not in current_level or current_level[key_or_index] is None or \
(next_key_or_index_is_int and not isinstance(current_level[key_or_index], list)) or \
(not next_key_or_index_is_int and not isinstance(current_level[key_or_index], dict)):
# 如果键不存在或值为None或类型与下一路径部分不匹配则创建/重置
logger.debug(f"[Util] util_set_value_at_path: 在路径 '{key_or_index}' 处创建/重置结构。下一个是索引: {next_key_or_index_is_int}")
current_level[key_or_index] = [] if next_key_or_index_is_int else {}
current_level = current_level[key_or_index]
elif isinstance(key_or_index, int): # Current path part is a list index
if not isinstance(current_level, list):
if current_level is None and i == 0:
container_copy = []
current_level = container_copy
else:
logger.error(f"[Util] util_set_value_at_path: 路径期望列表以应用索引 '{key_or_index}',但找到 {type(current_level)}。路径: {'.'.join(map(str,path[:i+1]))}")
return data_container, False
elif isinstance(key_or_index, int):
# 确保列表足够长以容纳索引,并确保该索引处的元素是正确的类型 (list/dict)
while len(current_level) <= key_or_index:
current_level.append(None) # 用 None 填充
if current_level[key_or_index] is None or \
(next_key_or_index_is_int and not isinstance(current_level[key_or_index], list)) or \
(not next_key_or_index_is_int and not isinstance(current_level[key_or_index], dict)):
logger.debug(f"[Util] util_set_value_at_path: 在列表索引 '{key_or_index}' 处创建/重置结构。下一个是索引: {next_key_or_index_is_int}")
current_level[key_or_index] = [] if next_key_or_index_is_int else {}
current_level = current_level[key_or_index]
else:
logger.error(f"[Util] util_set_value_at_path: 路径部分 '{key_or_index}' 类型未知 ({type(key_or_index)})。路径: {'.'.join(map(str,path[:i+1]))}")
return data_container, False
except Exception as e:
logger.error(f"[Util] 在准备设置字段路径 {'.'.join(map(str,path))} 的值时发生错误: {e}", exc_info=True)
return data_container, False
# Should not be reached if logic is correct, path must have at least one element by initial check.
logger.error(f"[Util] util_set_value_at_path 未能在循环内按预期返回。路径: {'.'.join(map(str,path))}")
return data_container, False
def generate_mismatched_value(
original_type: Optional[str],
original_value: Any,
field_schema: Optional[Dict[str, Any]],
logger_param: Optional[logging.Logger] = None
) -> Any:
"""
(框架辅助方法) 根据原始数据类型、原始值和字段 schema 生成一个类型不匹配的值。
主要用于类型不匹配的测试用例。
Args:
original_type: 字段的原始 OpenAPI 类型 (e.g., "string", "integer").
original_value: 字段的原始值 (当前未直接用于生成逻辑,但可供未来扩展).
field_schema: 字段的 schema 定义,用于检查如 "enum" 之类的约束。
logger_param: 可选的 logger 实例。
Returns:
一个与 original_type 不匹配的值。
"""
effective_logger = logger_param or logger
# 优先考虑 schema 中的 enum选择一个不在 enum 中且类型不匹配的值
if field_schema and "enum" in field_schema and isinstance(field_schema["enum"], list):
enum_values = field_schema["enum"]
# Handle case-insensitive type comparisons
if original_type and original_type.lower() == "string":
if 123 not in enum_values: return 123
if False not in enum_values: return False
# 如果数字和布尔都在枚举中,尝试一个与已知枚举值不同的字符串
# (虽然这仍然是字符串类型,但目的是为了触发非枚举值的验证)
# 或者,如果目的是严格类型不匹配,这里应该返回非字符串。
# 当前逻辑倾向于返回一个肯定非字符串的值。
elif original_type and original_type.lower() == "integer":
if "not-an-integer" not in enum_values: return "not-an-integer"
if 3.14 not in enum_values: return 3.14
elif original_type and original_type.lower() == "number": # Includes float/double
if "not-a-number" not in enum_values: return "not-a-number"
elif original_type and original_type.lower() == "boolean":
if "not-a-boolean" not in enum_values: return "not-a-boolean"
if 1 not in enum_values: return 1
# 如果枚举覆盖了所有简单备选,则回退到下面的通用逻辑
# 通用类型不匹配逻辑 (当 enum 不存在或 enum 检查未返回时)
# Handle case-insensitive type comparisons
if original_type and original_type.lower() == "string":
return 12345 # Number instead of string
elif original_type and original_type.lower() == "integer":
return "not-an-integer" # String instead of integer
elif original_type and original_type.lower() == "number": # Includes float/double
return "not-a-number" # String instead of number
elif original_type and original_type.lower() == "boolean":
return "not-a-boolean" # String instead of boolean
elif original_type and original_type.lower() == "array":
return {"value": "not-an-array"} # Object instead of array
elif original_type and original_type.lower() == "object":
return ["not", "an", "object"] # Array instead of object
effective_logger.warning(f"generate_mismatched_value: 原始类型 '{original_type}' 未知或无法生成不匹配值。将返回固定字符串 'mismatch_test_default'")
return "mismatch_test_default" # Fallback for unknown types
def build_object_schema_for_params(params_spec_list: List[Dict[str, Any]], model_name_base: str, logger_param: Optional[logging.Logger] = None) -> Tuple[Optional[Dict[str, Any]], str]:
"""
从参数规范列表构建一个对象的JSON schema主要用于请求体、查询参数或头部的聚合。
Args:
params_spec_list: 参数规范的列表 (例如OpenAPI参数对象列表)。
model_name_base: 用于生成动态模型名称的基础字符串。
logger_param: 可选的 logger 实例。
Returns:
一个元组,包含 (构建的JSON object schema 或 None, 模型名称字符串)。
"""
effective_logger = logger_param or logger # Use passed logger or module logger
if not params_spec_list:
effective_logger.debug(f"参数列表为空,无需为 '{model_name_base}' 构建对象 schema。")
return None, f"{model_name_base}EmptyParams"
properties = {}
required_fields = []
for param_spec in params_spec_list:
param_name = param_spec.get("name")
if not param_name:
effective_logger.warning(f"参数规范缺少 'name' 字段,已跳过: {param_spec}")
continue
# 从参数规范中提取 schema (OpenAPI 3.x)
param_schema = param_spec.get("schema")
if not param_schema:
# 尝试兼容 OpenAPI 2.0 (Swagger) 的情况,其中类型信息直接在参数级别
# 例如: type, format, items, default, enum 等
# https://swagger.io/specification/v2/#parameterObject
# 注意: 这种兼容性可能不完整,因为很多属性需要映射
compatible_schema = {}
if "type" in param_spec:
compatible_schema["type"] = param_spec["type"]
if "format" in param_spec:
compatible_schema["format"] = param_spec["format"]
if "items" in param_spec: # for array types
compatible_schema["items"] = param_spec["items"]
if "default" in param_spec:
compatible_schema["default"] = param_spec["default"]
if "enum" in param_spec:
compatible_schema["enum"] = param_spec["enum"]
# 其他如 description, example 等也可以考虑加入
if compatible_schema: # 如果至少收集到了一些类型信息
param_schema = compatible_schema
effective_logger.debug(f"参数 '{param_name}' 没有 'schema' 字段,但从顶级字段构建了兼容 schema: {param_schema}")
else:
effective_logger.warning(f"参数 '{param_name}' 缺少 'schema' 字段且无法构建兼容schema已跳过。规范: {param_spec}")
continue
properties[param_name] = param_schema
if param_spec.get("required", False):
required_fields.append(param_name)
if not properties:
effective_logger.debug(f"未能从参数列表为 '{model_name_base}' 提取任何属性。")
return None, f"{model_name_base}NoProps"
final_schema: Dict[str, Any] = {
"type": "object",
"properties": properties
}
if required_fields:
final_schema["required"] = required_fields
# 生成一个稍微独特的名字,以防多个操作有相同的 param_type
# 例如 OperationIdQueryRequest, OperationIdHeaderRequest
model_name = f"{model_name_base.replace(' ', '')}Params"
effective_logger.debug(f"'{model_name_base}' 构建的对象 schema: {final_schema}, 模型名: {model_name}")
return final_schema, model_name
def find_first_simple_type_field_recursive(
current_schema: Dict[str, Any],
current_path: Optional[List[Union[str, int]]] = None,
# full_api_spec_for_refs: Optional[Dict[str, Any]] = None, # Schema is expected to be pre-resolved
logger_param: Optional[logging.Logger] = None
) -> Optional[Tuple[List[Union[str, int]], str, Dict[str, Any]]]:
"""
递归地在给定的 schema 中查找第一个简单类型的字段 (string, integer, number, boolean)。
这包括查找嵌套在对象或数组中的简单类型字段。
Args:
current_schema: 当前正在搜索的 schema 部分 (应为字典)。
current_path: 到达当前 schema 的路径列表 (用于构建完整路径)。
logger_param: 可选的 logger 实例。
Returns:
一个元组 (field_path, field_type, field_schema) 如果找到,否则为 None。
field_path 是一个列表,表示从根 schema 到找到的字段的路径。
field_type 是字段的原始类型字符串 (e.g., "string")。
field_schema 是该字段自身的 schema 定义。
"""
effective_logger = logger_param or logger # Use module logger if specific one not provided
path_so_far = current_path if current_path is not None else []
if not isinstance(current_schema, dict):
effective_logger.debug(f"Schema at path {'.'.join(map(str, path_so_far))} is not a dict, cannot search further.")
return None
schema_type = current_schema.get("type")
# effective_logger.debug(f"Searching in path: {'.'.join(map(str, path_so_far))}, Schema Type: '{schema_type}'")
# Handle both 'object' and 'Object' (case-insensitive)
if schema_type and schema_type.lower() == "object":
properties = current_schema.get("properties", {})
for name, prop_schema in properties.items():
if not isinstance(prop_schema, dict):
effective_logger.debug(f"Property '{name}' at path {'.'.join(map(str, path_so_far + [name]))} has non-dict schema. Skipping.")
continue
prop_type = prop_schema.get("type")
# Handle case-insensitive type checking for simple types
if prop_type and prop_type.lower() in ["string", "integer", "number", "boolean"]:
field_path = path_so_far + [name]
effective_logger.info(f"Found simple type field: Path={'.'.join(map(str, field_path))}, Type={prop_type}")
return field_path, prop_type, prop_schema
elif prop_type and prop_type.lower() == "object":
found_in_nested_object = find_first_simple_type_field_recursive(
prop_schema,
path_so_far + [name],
logger_param=effective_logger
)
if found_in_nested_object:
return found_in_nested_object
elif prop_type and prop_type.lower() == "array":
items_schema = prop_schema.get("items")
if isinstance(items_schema, dict):
# Look for simple type or object within array items
item_type = items_schema.get("type")
# Handle case-insensitive type checking for simple types
if item_type and item_type.lower() in ["string", "integer", "number", "boolean"]:
field_path = path_so_far + [name, 0] # Target first item of the array
effective_logger.info(f"Found simple type field in array item: Path={'.'.join(map(str, field_path))}, Type={item_type}")
return field_path, item_type, items_schema
elif item_type and item_type.lower() == "object":
# Path to the first item of the array, then recurse into that item's object schema
found_in_array_item_object = find_first_simple_type_field_recursive(
items_schema,
path_so_far + [name, 0],
logger_param=effective_logger
)
if found_in_array_item_object:
return found_in_array_item_object
# Handle both 'array' and 'Array' (case-insensitive)
elif schema_type and schema_type.lower() == "array": # If the current_schema itself is an array (e.g., root schema is an array)
items_schema = current_schema.get("items")
if isinstance(items_schema, dict):
item_type = items_schema.get("type")
# Handle case-insensitive type checking for simple types
if item_type and item_type.lower() in ["string", "integer", "number", "boolean"]:
field_path = path_so_far + [0] # Target first item of this root/current array
effective_logger.info(f"Found simple type field in root/current array item: Path={'.'.join(map(str, field_path))}, Type={item_type}")
return field_path, item_type, items_schema
elif item_type and item_type.lower() == "object":
# Path to the first item of this root/current array, then recurse
found_in_root_array_item_object = find_first_simple_type_field_recursive(
items_schema,
path_so_far + [0],
logger_param=effective_logger
)
if found_in_root_array_item_object:
return found_in_root_array_item_object
# effective_logger.debug(f"No simple type field found at path {'.'.join(map(str, path_so_far))}")
return None
def util_extract_range_from_description(description: str, logger_param: Optional[logging.Logger] = None) -> Tuple[Optional[float], Optional[float]]:
"""从描述文本中提取数值范围信息。"""
effective_logger = logger_param or logger
if not description:
return None, None
min_value, max_value = None, None
MIN_VALUE_PATTERNS = [
r'(最小|至少|从|大于等于|不小于|>=|>|minimum|min).*?(\d+(\.\d+)?)',
r'(\d+(\.\d+)?).*?(起|开始|以上|最小|minimum|min)',
]
MAX_VALUE_PATTERNS = [
r'(最大|至多|不超过|不大于|小于等于|<=|<|maximum|max).*?(\d+(\.\d+)?)',
r'(\d+(\.\d+)?).*?(以下|以内|之内|最大|maximum|max)',
]
for pattern in MIN_VALUE_PATTERNS:
matches = re.search(pattern, description, re.IGNORECASE)
if matches:
try:
min_value = float(matches.group(2))
effective_logger.debug(f"'{description}' 中提取到最小值: {min_value}")
break
except (ValueError, IndexError):
continue
for pattern in MAX_VALUE_PATTERNS:
matches = re.search(pattern, description, re.IGNORECASE)
if matches:
try:
max_value = float(matches.group(2))
effective_logger.debug(f"'{description}' 中提取到最大值: {max_value}")
break
except (ValueError, IndexError):
continue
return min_value, max_value
def util_find_ranged_field_recursive(
schema: Dict[str, Any],
path: List[Union[str, int]],
logger_param: Optional[logging.Logger] = None
) -> Optional[Tuple[List[Union[str, int]], Dict[str, Any], Optional[str], Optional[float], Optional[float], str]]:
"""
递归地在schema中寻找第一个具有范围限制的字段。
返回 (路径, schema, 类型, 最小值, 最大值, 描述)。
"""
effective_logger = logger_param or logger
if not isinstance(schema, dict):
return None
schema_type = schema.get('type')
description = schema.get('description', '')
# 检查描述和schema关键字
min_val, max_val = util_extract_range_from_description(description, effective_logger)
schema_min = schema.get('minimum')
schema_max = schema.get('maximum')
if schema_min is not None:
min_val = schema_min
if schema_max is not None:
max_val = schema_max
# 如果找到范围限制,则认为这是一个目标字段
if min_val is not None or max_val is not None:
effective_logger.debug(f"在路径 {'.'.join(map(str, path))} 找到范围受限字段。")
return path, schema, schema_type, min_val, max_val, description
# 递归
# Handle both 'object' and 'Object' (case-insensitive)
if schema_type and schema_type.lower() == 'object' and 'properties' in schema:
for prop_name, prop_schema in schema['properties'].items():
result = util_find_ranged_field_recursive(prop_schema, path + [prop_name], effective_logger)
if result:
return result
# Handle both 'array' and 'Array' (case-insensitive)
if schema_type and schema_type.lower() == 'array' and 'items' in schema and isinstance(schema['items'], dict):
result = util_find_ranged_field_recursive(schema['items'], path + [0], effective_logger)
if result:
return result
return None
def util_extract_enum_from_description(description: str, logger: logging.Logger) -> Optional[List[str]]:
"""
从描述字符串中提取枚举值。
例如: "排序字段key=字段名value=排序方式可选值为ASC、DESC" -> ["ASC", "DESC"]
"""
if not isinstance(description, str):
return None
# 正则表达式匹配 "可选值为" 后面的内容,支持中文冒号和英文冒号,以及括号
# 匹配直到行尾或遇到另一个右括号
patterns = [
r"可选值为[|:]([\w\s,、,]+)",
r"可选值 *[:] *([\w\s,、,]+)",
r"\(可选值为[|:](.*?)\)",
r"(可选值为[|:](.*?)\"
]
found_values = None
for pattern in patterns:
match = re.search(pattern, description)
if match:
found_values = match.group(1).strip()
break
if not found_values:
return None
# 按逗号、顿号、空格分割,并去除每个值前后的空白
enums = [item.strip() for item in re.split(r'[,\s、]+', found_values) if item.strip()]
if enums:
logger.debug(f"Extracted enums: {enums} from description: '{description[:50]}...'")
return enums
return None
def util_find_enum_field_recursive(schema: Dict[str, Any], path: List[Union[str, int]], logger: logging.Logger) -> Optional[Tuple[List[Union[str, int]], List[str], str]]:
"""
递归查找第一个包含可解析枚举值的字段。
"""
if not isinstance(schema, dict):
return None
# 优先检查当前层级的 description
description = schema.get("description", "")
enums = util_extract_enum_from_description(description, logger)
if enums:
return path, enums, description
# 如果当前层级是对象,则递归其属性
if schema.get("type") == "object" and "properties" in schema:
for prop_name, prop_schema in schema.get("properties", {}).items():
new_path = path + [prop_name]
result = util_find_enum_field_recursive(prop_schema, new_path, logger)
if result:
return result
# 如果当前层级是数组,则递归其 items
if schema.get("type") == "array" and "items" in schema:
# 假设我们只检查数组项的结构,路径中用 '[]' 或 index 0 表示
new_path = path + [0]
result = util_find_enum_field_recursive(schema["items"], new_path, logger)
if result:
return result
return None
def normalize_yapi_responses(responses: dict, method: str) -> dict:
"""
将YAPI格式的响应规范标准化为OpenAPI格式。
如果responses为空或不包含任何成功状态码(2XX),则根据请求方法添加默认状态码。
Args:
responses: YAPI或已解析的响应规范字典
method: HTTP请求方法 (GET, POST, PUT, DELETE等)
Returns:
标准化后的响应规范字典
"""
if not responses:
responses = {}
# 检查是否已经包含成功状态码
has_success_code = False
for code in responses.keys():
if code.startswith('2') or code == 'default':
has_success_code = True
break
# 如果没有成功状态码,根据请求方法添加默认状态码
if not has_success_code:
if method.upper() == 'POST':
# POST请求默认使用201 Created
default_code = '201'
default_desc = 'Created'
elif method.upper() == 'DELETE':
# DELETE请求默认使用204 No Content
default_code = '204'
default_desc = 'No Content'
else:
# GET, PUT, PATCH等默认使用200 OK
default_code = '200'
default_desc = 'OK'
# 从responses中查找default响应如果存在则复制其内容
default_response = responses.get('default', {})
if not default_response:
default_response = {'description': default_desc}
elif 'description' not in default_response:
default_response['description'] = default_desc
responses[default_code] = default_response
return responses