612 lines
36 KiB
Python
612 lines
36 KiB
Python
import logging
|
||
import copy
|
||
from typing import Dict, List, Any, Optional, Union, Tuple
|
||
|
||
# 获取模块级别的 logger
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def resolve_json_schema_references(
|
||
schema_to_resolve: Any,
|
||
full_api_spec: Dict[str, Any],
|
||
max_depth: int = 10,
|
||
current_depth: int = 0,
|
||
discard_refs: bool = True # 新增参数,默认为 True
|
||
) -> Any:
|
||
"""
|
||
递归解析JSON Schema中的$ref引用。
|
||
Args:
|
||
schema_to_resolve: 当前需要解析的schema部分 (可以是字典、列表或基本类型)。
|
||
full_api_spec: 完整的API规范字典,用于查找$ref路径。
|
||
max_depth: 最大递归深度,防止无限循环。
|
||
current_depth: 当前递归深度。
|
||
discard_refs: 是否在解析前移除 $ref 和 $$ prefixed 键。
|
||
Returns:
|
||
解析了$ref的schema部分。
|
||
"""
|
||
if current_depth > max_depth:
|
||
logger.warning(f"达到最大$ref解析深度 ({max_depth}),可能存在循环引用。停止进一步解析。 Schema: {str(schema_to_resolve)[:200]}")
|
||
return schema_to_resolve
|
||
|
||
if isinstance(schema_to_resolve, dict):
|
||
current_dict_processing = dict(schema_to_resolve) # 操作副本
|
||
|
||
if discard_refs:
|
||
# 模式1: 丢弃 $ref 和 $$ 开头的键, 然后递归处理剩余值
|
||
ref_value = current_dict_processing.pop("$ref", None)
|
||
if ref_value is not None:
|
||
logger.debug(f"因 discard_refs=True,丢弃 '$ref': {ref_value}")
|
||
|
||
keys_to_remove = [k for k in current_dict_processing if k.startswith("$$")]
|
||
for key_to_remove in keys_to_remove:
|
||
key_val = current_dict_processing.pop(key_to_remove, None)
|
||
logger.debug(f"因 discard_refs=True,丢弃 '{key_to_remove}': {key_val}")
|
||
|
||
# current_dict_processing 已清理完毕,递归处理其值
|
||
resolved_children = {}
|
||
for key, value in current_dict_processing.items():
|
||
resolved_children[key] = resolve_json_schema_references(
|
||
value, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs
|
||
)
|
||
return resolved_children
|
||
|
||
else:
|
||
# 模式2: 尝试解析 $ref (如果存在), 然后递归。$$ 开头的键会保留并递归处理。
|
||
if "$ref" in current_dict_processing:
|
||
ref_path = current_dict_processing["$ref"]
|
||
if not isinstance(ref_path, str) or not ref_path.startswith("#/"):
|
||
logger.warning(f"不支持的$ref格式或外部引用: {ref_path}。在非丢弃模式下,$ref将作为普通键值对处理。")
|
||
# 继续执行后续的常规递归,$ref 将作为 current_dict_processing 中的一个键
|
||
else:
|
||
path_parts = ref_path[2:].split('/')
|
||
target_component_root = full_api_spec
|
||
current_target_component = target_component_root
|
||
valid_path = True
|
||
try:
|
||
for part in path_parts:
|
||
if isinstance(current_target_component, list):
|
||
try:
|
||
part_idx = int(part)
|
||
current_target_component = current_target_component[part_idx]
|
||
except (ValueError, IndexError):
|
||
logger.error(f"路径部分 '{part}' (应为整数索引) 无效或越界于列表。路径: {ref_path}")
|
||
valid_path = False
|
||
break
|
||
elif isinstance(current_target_component, dict):
|
||
if part not in current_target_component:
|
||
logger.error(f"路径部分 '{part}' 在对象中未找到。路径: {ref_path}. 可用键: {list(current_target_component.keys())}")
|
||
valid_path = False
|
||
break
|
||
current_target_component = current_target_component[part]
|
||
else:
|
||
logger.error(f"尝试在非字典/列表类型 ({type(current_target_component)}) 中访问路径部分 '{part}'。路径: {ref_path}")
|
||
valid_path = False
|
||
break
|
||
|
||
if valid_path:
|
||
# $ref 解析成功
|
||
final_schema_after_ref_resolution = copy.deepcopy(current_target_component)
|
||
# 如果解析结果是字典,则将原始 $ref 位置的同级键合并(覆盖)进去
|
||
if isinstance(final_schema_after_ref_resolution, dict):
|
||
for key, value in current_dict_processing.items():
|
||
if key != "$ref": # 合并同级键
|
||
final_schema_after_ref_resolution[key] = value
|
||
# 如果解析结果不是字典(例如,一个数组或原始类型),则同级键实际上被丢弃,
|
||
# 因为返回的是 final_schema_after_ref_resolution 本身。这是 $ref 的标准行为之一。
|
||
|
||
logger.debug(f"成功解析 $ref: '{ref_path}'。将递归解析其内容(可能已与同级键合并)。")
|
||
return resolve_json_schema_references(
|
||
final_schema_after_ref_resolution, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"解析$ref '{ref_path}' 时发生意外错误: {e}. 将尝试使用同级节点。", exc_info=False)
|
||
valid_path = False
|
||
|
||
# 如果 $ref 解析失败 (valid_path is False 或 try 块中出现异常)
|
||
if not valid_path:
|
||
logger.warning(f"$ref '{ref_path}' 解析失败。将移除 $ref 并处理该对象的其余部分。")
|
||
current_dict_processing.pop("$ref", None)
|
||
# 继续执行后续的常规递归,此时 current_dict_processing 已移除了失败的 $ref
|
||
|
||
# 常规递归 (模式2: 非丢弃模式 / $ref 已处理或移除)
|
||
resolved_children = {}
|
||
for key, value in current_dict_processing.items():
|
||
resolved_children[key] = resolve_json_schema_references(
|
||
value, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs
|
||
)
|
||
return resolved_children
|
||
|
||
elif isinstance(schema_to_resolve, list):
|
||
return [resolve_json_schema_references(item, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs) for item in schema_to_resolve]
|
||
|
||
else: # 原始类型
|
||
return schema_to_resolve
|
||
|
||
def util_find_removable_field_path_recursive(
|
||
current_schema: Dict[str, Any],
|
||
current_path: List[Union[str, int]], # Union added here
|
||
full_api_spec_for_refs: Dict[str, Any],
|
||
# logger_param: Optional[logging.Logger] = None # Option to pass logger
|
||
) -> Optional[List[Union[str, int]]]:
|
||
"""
|
||
(框架辅助方法) 递归查找第一个可移除的必填字段的路径。
|
||
"""
|
||
# effective_logger = logger_param or logger # Use passed logger or module logger
|
||
resolved_schema = resolve_json_schema_references(current_schema, full_api_spec_for_refs)
|
||
|
||
if not isinstance(resolved_schema, dict) or resolved_schema.get("type") != "object":
|
||
return None
|
||
|
||
required_fields_at_current_level = resolved_schema.get("required", [])
|
||
properties = resolved_schema.get("properties", {})
|
||
logger.debug(f"[Util] 递归查找路径: {current_path}, 当前层级必填字段: {required_fields_at_current_level}, 属性: {list(properties.keys())}")
|
||
|
||
if required_fields_at_current_level and properties:
|
||
for field_name in required_fields_at_current_level:
|
||
if field_name in properties:
|
||
logger.info(f"[Util] 策略1: 在路径 {'.'.join(map(str,current_path)) if current_path else 'root'} 找到可直接移除的必填字段: '{field_name}'")
|
||
return current_path + [field_name]
|
||
|
||
if properties:
|
||
for prop_name, prop_schema_orig in properties.items():
|
||
prop_schema = resolve_json_schema_references(prop_schema_orig, full_api_spec_for_refs)
|
||
if isinstance(prop_schema, dict) and prop_schema.get("type") == "array":
|
||
items_schema_orig = prop_schema.get("items")
|
||
if isinstance(items_schema_orig, dict):
|
||
items_schema = resolve_json_schema_references(items_schema_orig, full_api_spec_for_refs)
|
||
if isinstance(items_schema, dict) and items_schema.get("type") == "object":
|
||
item_required_fields = items_schema.get("required", [])
|
||
item_properties = items_schema.get("properties", {})
|
||
if item_required_fields and item_properties:
|
||
first_required_field_in_item = next((rf for rf in item_required_fields if rf in item_properties), None)
|
||
if first_required_field_in_item:
|
||
logger.info(f"[Util] 策略2: 在数组属性 '{prop_name}' (路径 {'.'.join(map(str,current_path)) if current_path else 'root'}) 的元素内找到必填字段: '{first_required_field_in_item}'. 路径: {current_path + [prop_name, 0, first_required_field_in_item]}")
|
||
return current_path + [prop_name, 0, first_required_field_in_item]
|
||
|
||
logger.debug(f"[Util] 在路径 {'.'.join(map(str,current_path)) if current_path else 'root'} 未通过任何策略找到可移除的必填字段。")
|
||
return None
|
||
|
||
def util_remove_value_at_path(
|
||
data_container: Any,
|
||
path: List[Union[str, int]],
|
||
# logger_param: Optional[logging.Logger] = None
|
||
) -> Tuple[Any, Any, bool]:
|
||
"""
|
||
(框架辅助方法) 从嵌套的字典/列表中移除指定路径的值。
|
||
返回 (修改后的容器, 被移除的值, 是否成功)。
|
||
"""
|
||
# effective_logger = logger_param or logger
|
||
if not path:
|
||
logger.error("[Util] util_remove_value_at_path: 路径不能为空。")
|
||
return data_container, None, False
|
||
|
||
if data_container is None:
|
||
if isinstance(path[0], str):
|
||
container_copy = {}
|
||
elif isinstance(path[0], int):
|
||
container_copy = []
|
||
else:
|
||
logger.error(f"[Util] util_remove_value_at_path: 路径的第一个元素 '{path[0]}' 类型未知。")
|
||
return data_container, None, False
|
||
else:
|
||
container_copy = copy.deepcopy(data_container)
|
||
|
||
current_level = container_copy
|
||
original_value = None
|
||
|
||
try:
|
||
for i, key_or_index in enumerate(path):
|
||
is_last_element = (i == len(path) - 1)
|
||
if is_last_element:
|
||
if isinstance(key_or_index, str):
|
||
if isinstance(current_level, dict) and key_or_index in current_level:
|
||
original_value = current_level.pop(key_or_index)
|
||
logger.info(f"[Util] 从路径 '{'.'.join(map(str,path))}' 成功移除字段 '{key_or_index}' (原值: '{original_value}')。")
|
||
return container_copy, original_value, True
|
||
elif isinstance(current_level, dict):
|
||
logger.warning(f"[Util] 路径的最后一部分 '{key_or_index}' (string key) 在对象中未找到。路径: {'.'.join(map(str,path))}")
|
||
return container_copy, None, False
|
||
else:
|
||
logger.error(f"[Util] 路径的最后一部分 '{key_or_index}' (string key) 期望父级是字典,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}")
|
||
return data_container, None, False
|
||
else:
|
||
if isinstance(current_level, list) and isinstance(key_or_index, int) and 0 <= key_or_index < len(current_level):
|
||
original_value = current_level.pop(key_or_index)
|
||
logger.info(f"[Util] 从路径 '{'.'.join(map(str,path))}' 成功移除索引 '{key_or_index}' 的元素 (原值: '{original_value}')。")
|
||
return container_copy, original_value, True
|
||
elif isinstance(current_level, list):
|
||
logger.warning(f"[Util] 路径的最后一部分索引 '{key_or_index}' 超出列表范围或类型不符。列表长度: {len(current_level)}. 路径: {'.'.join(map(str,path))}")
|
||
return container_copy, None, False
|
||
else:
|
||
logger.error(f"[Util] 路径的最后一部分 '{key_or_index}' 期望父级是列表,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}")
|
||
return data_container, None, False
|
||
else:
|
||
next_key_or_index = path[i+1]
|
||
if isinstance(key_or_index, str):
|
||
if not isinstance(current_level, dict):
|
||
if current_level is container_copy and not container_copy :
|
||
current_level = {}
|
||
if i == 0: container_copy = current_level
|
||
else:
|
||
logger.error(f"[Util] 无法在非根级别从非字典创建路径。")
|
||
return data_container, None, False
|
||
else:
|
||
logger.error(f"[Util] 路径期望字典,但在 '{key_or_index}' 处找到 {type(current_level)}。")
|
||
return data_container, None, False
|
||
|
||
if isinstance(next_key_or_index, int):
|
||
if key_or_index not in current_level or not isinstance(current_level.get(key_or_index), list):
|
||
current_level[key_or_index] = []
|
||
current_level = current_level[key_or_index]
|
||
else:
|
||
if key_or_index not in current_level or not isinstance(current_level.get(key_or_index), dict):
|
||
current_level[key_or_index] = {}
|
||
current_level = current_level[key_or_index]
|
||
|
||
elif isinstance(key_or_index, int):
|
||
if not isinstance(current_level, list):
|
||
logger.error(f"[Util] 路径期望列表以应用索引 '{key_or_index}',但找到 {type(current_level)}。")
|
||
return data_container, None, False
|
||
|
||
while len(current_level) <= key_or_index:
|
||
if isinstance(next_key_or_index, str):
|
||
current_level.append({})
|
||
else:
|
||
current_level.append([])
|
||
|
||
if isinstance(next_key_or_index, str):
|
||
if not isinstance(current_level[key_or_index], dict):
|
||
current_level[key_or_index] = {}
|
||
elif isinstance(next_key_or_index, int):
|
||
if not isinstance(current_level[key_or_index], list):
|
||
current_level[key_or_index] = []
|
||
|
||
current_level = current_level[key_or_index]
|
||
|
||
else:
|
||
logger.error(f"[Util] 路径部分 '{key_or_index}' 类型未知 ({type(key_or_index)}).")
|
||
return data_container, None, False
|
||
except Exception as e:
|
||
logger.error(f"[Util] 在准备移除字段路径 '{'.'.join(map(str,path))}' 时发生错误: {e}", exc_info=True)
|
||
return data_container, None, False
|
||
|
||
logger.error(f"[Util] util_remove_value_at_path 未能在循环内按预期返回。路径: {'.'.join(map(str,path))}")
|
||
return data_container, None, False
|
||
|
||
def util_set_value_at_path(
|
||
data_container: Any,
|
||
path: List[Union[str, int]],
|
||
new_value: Any,
|
||
# logger_param: Optional[logging.Logger] = None
|
||
) -> Tuple[Any, bool]:
|
||
"""
|
||
(框架辅助方法) 在嵌套的字典/列表中为指定路径设置新值。
|
||
如果路径中的某些部分不存在,会尝试创建它们 (字典会创建,列表会尝试填充到指定索引,但需谨慎)。
|
||
返回 (修改后的容器, 是否成功)。
|
||
"""
|
||
# effective_logger = logger_param or logger
|
||
if not path:
|
||
logger.error("[Util] util_set_value_at_path: 路径不能为空。")
|
||
# 如果路径为空,是否应该用 new_value 替换整个 data_container?
|
||
# 当前行为:返回原始容器和失败状态,因为路径通常指向容器内部。
|
||
# 如果要支持替换整个容器,需要明确此行为。
|
||
if data_container is None and new_value is not None: # 特殊情况:如果原始容器是None,且路径为空,则新值成为容器
|
||
logger.info("[Util] util_set_value_at_path: 路径为空,原始容器为None,新值将作为新容器返回。")
|
||
return new_value, True
|
||
elif data_container is not None and new_value is None and not path: # 路径为空,新值为None,则清空容器
|
||
logger.info("[Util] util_set_value_at_path: 路径为空,新值为None,容器将被清空 (返回None)。")
|
||
return None, True
|
||
# 对于路径为空且两者都不是None的情况,目前返回失败,因为通常期望有路径。
|
||
# 或者可以考虑直接返回 new_value,意味着整个对象被替换。
|
||
# logger.info(f"[Util] util_set_value_at_path: Path is empty. Replacing entire container.")
|
||
# return new_value, True # 备选行为:替换整个对象
|
||
return data_container, False
|
||
|
||
# 深拷贝以避免修改原始输入,除非原始输入是None
|
||
container_copy = copy.deepcopy(data_container) if data_container is not None else None
|
||
|
||
current_level = container_copy
|
||
|
||
try:
|
||
for i, key_or_index in enumerate(path):
|
||
is_last_element = (i == len(path) - 1)
|
||
|
||
if is_last_element:
|
||
if isinstance(key_or_index, str):
|
||
if not isinstance(current_level, dict):
|
||
# 如果当前层级不是字典 (例如是None,或是被意外替换为其他类型),无法设置键值对
|
||
logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分 '{key_or_index}' (string key) 期望父级是字典,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}")
|
||
# 尝试强制转换为字典?这可能不是预期行为。
|
||
# 如果 current_level 是 None 且它是根 (container_copy is None),则初始化 container_copy
|
||
if current_level is None and i == 0: # 路径只有一级,且容器本身是None
|
||
container_copy = {}
|
||
current_level = container_copy
|
||
else: # 更深层级的None或类型错误
|
||
return data_container, False
|
||
current_level[key_or_index] = new_value
|
||
logger.info(f"[Util] 在路径 {'.'.join(map(str,path))} (键 '{key_or_index}') 处设置值为 '{new_value}'")
|
||
return container_copy, True
|
||
elif isinstance(key_or_index, int): # key_or_index is an integer (list index)
|
||
if not isinstance(current_level, list):
|
||
logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分索引 '{key_or_index}' 期望父级是列表,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}")
|
||
if current_level is None and i == 0: # 路径只有一级,且容器本身是None
|
||
container_copy = [None] * (key_or_index + 1) # 创建足够长度的列表
|
||
current_level = container_copy
|
||
else:
|
||
return data_container, False
|
||
elif isinstance(key_or_index, int):
|
||
# 确保列表足够长以容纳索引
|
||
while len(current_level) <= key_or_index:
|
||
current_level.append(None) # 用 None 填充直到达到所需长度
|
||
current_level[key_or_index] = new_value
|
||
logger.info(f"[Util] 在路径 {'.'.join(map(str,path))} (索引 '{key_or_index}') 处设置值为 '{new_value}'")
|
||
return container_copy, True
|
||
else:
|
||
logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分 '{key_or_index}' 类型未知。路径: {'.'.join(map(str,path))}")
|
||
return data_container, False
|
||
else: # Not the last element, traverse deeper
|
||
next_key_or_index_is_int = isinstance(path[i+1], int)
|
||
|
||
if isinstance(key_or_index, str): # Current path part is a dictionary key
|
||
if not isinstance(current_level, dict):
|
||
# 如果在根级别且容器是None,则初始化为字典
|
||
if current_level is None and i == 0:
|
||
container_copy = {}
|
||
current_level = container_copy
|
||
else:
|
||
logger.error(f"[Util] util_set_value_at_path: 路径期望字典,但在 '{key_or_index}' 处找到 {type(current_level)}。路径: {'.'.join(map(str,path[:i+1]))}")
|
||
return data_container, False
|
||
if key_or_index not in current_level or current_level[key_or_index] is None or \
|
||
(next_key_or_index_is_int and not isinstance(current_level[key_or_index], list)) or \
|
||
(not next_key_or_index_is_int and not isinstance(current_level[key_or_index], dict)):
|
||
# 如果键不存在,或值为None,或类型与下一路径部分不匹配,则创建/重置
|
||
logger.debug(f"[Util] util_set_value_at_path: 在路径 '{key_or_index}' 处创建/重置结构。下一个是索引: {next_key_or_index_is_int}")
|
||
current_level[key_or_index] = [] if next_key_or_index_is_int else {}
|
||
current_level = current_level[key_or_index]
|
||
elif isinstance(key_or_index, int): # Current path part is a list index
|
||
if not isinstance(current_level, list):
|
||
if current_level is None and i == 0:
|
||
container_copy = []
|
||
current_level = container_copy
|
||
else:
|
||
logger.error(f"[Util] util_set_value_at_path: 路径期望列表以应用索引 '{key_or_index}',但找到 {type(current_level)}。路径: {'.'.join(map(str,path[:i+1]))}")
|
||
return data_container, False
|
||
elif isinstance(key_or_index, int):
|
||
# 确保列表足够长以容纳索引,并确保该索引处的元素是正确的类型 (list/dict)
|
||
while len(current_level) <= key_or_index:
|
||
current_level.append(None) # 用 None 填充
|
||
if current_level[key_or_index] is None or \
|
||
(next_key_or_index_is_int and not isinstance(current_level[key_or_index], list)) or \
|
||
(not next_key_or_index_is_int and not isinstance(current_level[key_or_index], dict)):
|
||
logger.debug(f"[Util] util_set_value_at_path: 在列表索引 '{key_or_index}' 处创建/重置结构。下一个是索引: {next_key_or_index_is_int}")
|
||
current_level[key_or_index] = [] if next_key_or_index_is_int else {}
|
||
current_level = current_level[key_or_index]
|
||
else:
|
||
logger.error(f"[Util] util_set_value_at_path: 路径部分 '{key_or_index}' 类型未知 ({type(key_or_index)})。路径: {'.'.join(map(str,path[:i+1]))}")
|
||
return data_container, False
|
||
except Exception as e:
|
||
logger.error(f"[Util] 在准备设置字段路径 {'.'.join(map(str,path))} 的值时发生错误: {e}", exc_info=True)
|
||
return data_container, False
|
||
|
||
# Should not be reached if logic is correct, path must have at least one element by initial check.
|
||
logger.error(f"[Util] util_set_value_at_path 未能在循环内按预期返回。路径: {'.'.join(map(str,path))}")
|
||
return data_container, False
|
||
|
||
def generate_mismatched_value(
|
||
original_type: Optional[str],
|
||
original_value: Any,
|
||
field_schema: Optional[Dict[str, Any]],
|
||
logger_param: Optional[logging.Logger] = None
|
||
) -> Any:
|
||
"""
|
||
(框架辅助方法) 根据原始数据类型、原始值和字段 schema 生成一个类型不匹配的值。
|
||
主要用于类型不匹配的测试用例。
|
||
Args:
|
||
original_type: 字段的原始 OpenAPI 类型 (e.g., "string", "integer").
|
||
original_value: 字段的原始值 (当前未直接用于生成逻辑,但可供未来扩展).
|
||
field_schema: 字段的 schema 定义,用于检查如 "enum" 之类的约束。
|
||
logger_param: 可选的 logger 实例。
|
||
Returns:
|
||
一个与 original_type 不匹配的值。
|
||
"""
|
||
effective_logger = logger_param or logger
|
||
|
||
# 优先考虑 schema 中的 enum,选择一个不在 enum 中且类型不匹配的值
|
||
if field_schema and "enum" in field_schema and isinstance(field_schema["enum"], list):
|
||
enum_values = field_schema["enum"]
|
||
if original_type == "string":
|
||
if 123 not in enum_values: return 123
|
||
if False not in enum_values: return False
|
||
# 如果数字和布尔都在枚举中,尝试一个与已知枚举值不同的字符串
|
||
# (虽然这仍然是字符串类型,但目的是为了触发非枚举值的验证)
|
||
# 或者,如果目的是严格类型不匹配,这里应该返回非字符串。
|
||
# 当前逻辑倾向于返回一个肯定非字符串的值。
|
||
elif original_type == "integer":
|
||
if "not-an-integer" not in enum_values: return "not-an-integer"
|
||
if 3.14 not in enum_values: return 3.14
|
||
elif original_type == "number": # Includes float/double
|
||
if "not-a-number" not in enum_values: return "not-a-number"
|
||
elif original_type == "boolean":
|
||
if "not-a-boolean" not in enum_values: return "not-a-boolean"
|
||
if 1 not in enum_values: return 1
|
||
# 如果枚举覆盖了所有简单备选,则回退到下面的通用逻辑
|
||
|
||
# 通用类型不匹配逻辑 (当 enum 不存在或 enum 检查未返回时)
|
||
if original_type == "string":
|
||
return 12345 # Number instead of string
|
||
elif original_type == "integer":
|
||
return "not-an-integer" # String instead of integer
|
||
elif original_type == "number": # Includes float/double
|
||
return "not-a-number" # String instead of number
|
||
elif original_type == "boolean":
|
||
return "not-a-boolean" # String instead of boolean
|
||
elif original_type == "array":
|
||
return {"value": "not-an-array"} # Object instead of array
|
||
elif original_type == "object":
|
||
return ["not", "an", "object"] # Array instead of object
|
||
|
||
effective_logger.warning(f"generate_mismatched_value: 原始类型 '{original_type}' 未知或无法生成不匹配值。将返回固定字符串 'mismatch_test_default'。")
|
||
return "mismatch_test_default" # Fallback for unknown types
|
||
|
||
def build_object_schema_for_params(params_spec_list: List[Dict[str, Any]], model_name_base: str, logger_param: Optional[logging.Logger] = None) -> Tuple[Optional[Dict[str, Any]], str]:
|
||
"""
|
||
从参数规范列表构建一个对象的JSON schema,主要用于请求体、查询参数或头部的聚合。
|
||
Args:
|
||
params_spec_list: 参数规范的列表 (例如,OpenAPI参数对象列表)。
|
||
model_name_base: 用于生成动态模型名称的基础字符串。
|
||
logger_param: 可选的 logger 实例。
|
||
Returns:
|
||
一个元组,包含 (构建的JSON object schema 或 None, 模型名称字符串)。
|
||
"""
|
||
effective_logger = logger_param or logger # Use passed logger or module logger
|
||
|
||
if not params_spec_list:
|
||
effective_logger.debug(f"参数列表为空,无需为 '{model_name_base}' 构建对象 schema。")
|
||
return None, f"{model_name_base}EmptyParams"
|
||
|
||
properties = {}
|
||
required_fields = []
|
||
|
||
for param_spec in params_spec_list:
|
||
param_name = param_spec.get("name")
|
||
if not param_name:
|
||
effective_logger.warning(f"参数规范缺少 'name' 字段,已跳过: {param_spec}")
|
||
continue
|
||
|
||
# 从参数规范中提取 schema (OpenAPI 3.x)
|
||
param_schema = param_spec.get("schema")
|
||
if not param_schema:
|
||
# 尝试兼容 OpenAPI 2.0 (Swagger) 的情况,其中类型信息直接在参数级别
|
||
# 例如: type, format, items, default, enum 等
|
||
# https://swagger.io/specification/v2/#parameterObject
|
||
# 注意: 这种兼容性可能不完整,因为很多属性需要映射
|
||
compatible_schema = {}
|
||
if "type" in param_spec:
|
||
compatible_schema["type"] = param_spec["type"]
|
||
if "format" in param_spec:
|
||
compatible_schema["format"] = param_spec["format"]
|
||
if "items" in param_spec: # for array types
|
||
compatible_schema["items"] = param_spec["items"]
|
||
if "default" in param_spec:
|
||
compatible_schema["default"] = param_spec["default"]
|
||
if "enum" in param_spec:
|
||
compatible_schema["enum"] = param_spec["enum"]
|
||
# 其他如 description, example 等也可以考虑加入
|
||
if compatible_schema: # 如果至少收集到了一些类型信息
|
||
param_schema = compatible_schema
|
||
effective_logger.debug(f"参数 '{param_name}' 没有 'schema' 字段,但从顶级字段构建了兼容 schema: {param_schema}")
|
||
else:
|
||
effective_logger.warning(f"参数 '{param_name}' 缺少 'schema' 字段且无法构建兼容schema,已跳过。规范: {param_spec}")
|
||
continue
|
||
|
||
properties[param_name] = param_schema
|
||
if param_spec.get("required", False):
|
||
required_fields.append(param_name)
|
||
|
||
if not properties:
|
||
effective_logger.debug(f"未能从参数列表为 '{model_name_base}' 提取任何属性。")
|
||
return None, f"{model_name_base}NoProps"
|
||
|
||
final_schema: Dict[str, Any] = {
|
||
"type": "object",
|
||
"properties": properties
|
||
}
|
||
if required_fields:
|
||
final_schema["required"] = required_fields
|
||
|
||
# 生成一个稍微独特的名字,以防多个操作有相同的 param_type
|
||
# 例如 OperationIdQueryRequest, OperationIdHeaderRequest
|
||
model_name = f"{model_name_base.replace(' ', '')}Params"
|
||
effective_logger.debug(f"为 '{model_name_base}' 构建的对象 schema: {final_schema}, 模型名: {model_name}")
|
||
return final_schema, model_name
|
||
|
||
def find_first_simple_type_field_recursive(
|
||
current_schema: Dict[str, Any],
|
||
current_path: Optional[List[Union[str, int]]] = None,
|
||
# full_api_spec_for_refs: Optional[Dict[str, Any]] = None, # Schema is expected to be pre-resolved
|
||
logger_param: Optional[logging.Logger] = None
|
||
) -> Optional[Tuple[List[Union[str, int]], str, Dict[str, Any]]]:
|
||
"""
|
||
递归地在给定的 schema 中查找第一个简单类型的字段 (string, integer, number, boolean)。
|
||
这包括查找嵌套在对象或数组中的简单类型字段。
|
||
|
||
Args:
|
||
current_schema: 当前正在搜索的 schema 部分 (应为字典)。
|
||
current_path: 到达当前 schema 的路径列表 (用于构建完整路径)。
|
||
logger_param: 可选的 logger 实例。
|
||
|
||
Returns:
|
||
一个元组 (field_path, field_type, field_schema) 如果找到,否则为 None。
|
||
field_path 是一个列表,表示从根 schema 到找到的字段的路径。
|
||
field_type 是字段的原始类型字符串 (e.g., "string")。
|
||
field_schema 是该字段自身的 schema 定义。
|
||
"""
|
||
effective_logger = logger_param or logger # Use module logger if specific one not provided
|
||
path_so_far = current_path if current_path is not None else []
|
||
|
||
if not isinstance(current_schema, dict):
|
||
effective_logger.debug(f"Schema at path {'.'.join(map(str, path_so_far))} is not a dict, cannot search further.")
|
||
return None
|
||
|
||
schema_type = current_schema.get("type")
|
||
# effective_logger.debug(f"Searching in path: {'.'.join(map(str, path_so_far))}, Schema Type: '{schema_type}'")
|
||
|
||
if schema_type == "object":
|
||
properties = current_schema.get("properties", {})
|
||
for name, prop_schema in properties.items():
|
||
if not isinstance(prop_schema, dict):
|
||
effective_logger.debug(f"Property '{name}' at path {'.'.join(map(str, path_so_far + [name]))} has non-dict schema. Skipping.")
|
||
continue
|
||
|
||
prop_type = prop_schema.get("type")
|
||
if prop_type in ["string", "integer", "number", "boolean"]:
|
||
field_path = path_so_far + [name]
|
||
effective_logger.info(f"Found simple type field: Path={'.'.join(map(str, field_path))}, Type={prop_type}")
|
||
return field_path, prop_type, prop_schema
|
||
elif prop_type == "object":
|
||
found_in_nested_object = find_first_simple_type_field_recursive(
|
||
prop_schema,
|
||
path_so_far + [name],
|
||
logger_param=effective_logger
|
||
)
|
||
if found_in_nested_object:
|
||
return found_in_nested_object
|
||
elif prop_type == "array":
|
||
items_schema = prop_schema.get("items")
|
||
if isinstance(items_schema, dict):
|
||
# Look for simple type or object within array items
|
||
item_type = items_schema.get("type")
|
||
if item_type in ["string", "integer", "number", "boolean"]:
|
||
field_path = path_so_far + [name, 0] # Target first item of the array
|
||
effective_logger.info(f"Found simple type field in array item: Path={'.'.join(map(str, field_path))}, Type={item_type}")
|
||
return field_path, item_type, items_schema
|
||
elif item_type == "object":
|
||
# Path to the first item of the array, then recurse into that item's object schema
|
||
found_in_array_item_object = find_first_simple_type_field_recursive(
|
||
items_schema,
|
||
path_so_far + [name, 0],
|
||
logger_param=effective_logger
|
||
)
|
||
if found_in_array_item_object:
|
||
return found_in_array_item_object
|
||
|
||
elif schema_type == "array": # If the current_schema itself is an array (e.g., root schema is an array)
|
||
items_schema = current_schema.get("items")
|
||
if isinstance(items_schema, dict):
|
||
item_type = items_schema.get("type")
|
||
if item_type in ["string", "integer", "number", "boolean"]:
|
||
field_path = path_so_far + [0] # Target first item of this root/current array
|
||
effective_logger.info(f"Found simple type field in root/current array item: Path={'.'.join(map(str, field_path))}, Type={item_type}")
|
||
return field_path, item_type, items_schema
|
||
elif item_type == "object":
|
||
# Path to the first item of this root/current array, then recurse
|
||
found_in_root_array_item_object = find_first_simple_type_field_recursive(
|
||
items_schema,
|
||
path_so_far + [0],
|
||
logger_param=effective_logger
|
||
)
|
||
if found_in_root_array_item_object:
|
||
return found_in_root_array_item_object
|
||
|
||
# effective_logger.debug(f"No simple type field found at path {'.'.join(map(str, path_so_far))}")
|
||
return None |