import logging import copy from typing import Dict, List, Any, Optional, Union, Tuple import re # 获取模块级别的 logger logger = logging.getLogger(__name__) def resolve_json_schema_references( schema_to_resolve: Any, full_api_spec: Dict[str, Any], max_depth: int = 10, current_depth: int = 0, discard_refs: bool = True # 新增参数,默认为 True ) -> Any: """ 递归解析JSON Schema中的$ref引用。 Args: schema_to_resolve: 当前需要解析的schema部分 (可以是字典、列表或基本类型)。 full_api_spec: 完整的API规范字典,用于查找$ref路径。 max_depth: 最大递归深度,防止无限循环。 current_depth: 当前递归深度。 discard_refs: 是否在解析前移除 $ref 和 $$ prefixed 键。 Returns: 解析了$ref的schema部分。 """ if current_depth > max_depth: logger.warning(f"达到最大$ref解析深度 ({max_depth}),可能存在循环引用。停止进一步解析。 Schema: {str(schema_to_resolve)[:200]}") return schema_to_resolve if isinstance(schema_to_resolve, dict): current_dict_processing = dict(schema_to_resolve) # 操作副本 if discard_refs: # 模式1: 丢弃 $ref 和 $$ 开头的键, 然后递归处理剩余值 ref_value = current_dict_processing.pop("$ref", None) if ref_value is not None: logger.debug(f"因 discard_refs=True,丢弃 '$ref': {ref_value}") keys_to_remove = [k for k in current_dict_processing if k.startswith("$$")] for key_to_remove in keys_to_remove: key_val = current_dict_processing.pop(key_to_remove, None) logger.debug(f"因 discard_refs=True,丢弃 '{key_to_remove}': {key_val}") # current_dict_processing 已清理完毕,递归处理其值 resolved_children = {} for key, value in current_dict_processing.items(): resolved_children[key] = resolve_json_schema_references( value, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs ) return resolved_children else: # 模式2: 尝试解析 $ref (如果存在), 然后递归。$$ 开头的键会保留并递归处理。 if "$ref" in current_dict_processing: ref_path = current_dict_processing["$ref"] if not isinstance(ref_path, str) or not ref_path.startswith("#/"): logger.warning(f"不支持的$ref格式或外部引用: {ref_path}。在非丢弃模式下,$ref将作为普通键值对处理。") # 继续执行后续的常规递归,$ref 将作为 current_dict_processing 中的一个键 else: path_parts = ref_path[2:].split('/') target_component_root = full_api_spec current_target_component = target_component_root valid_path = True try: for part in path_parts: if isinstance(current_target_component, list): try: part_idx = int(part) current_target_component = current_target_component[part_idx] except (ValueError, IndexError): logger.error(f"路径部分 '{part}' (应为整数索引) 无效或越界于列表。路径: {ref_path}") valid_path = False break elif isinstance(current_target_component, dict): if part not in current_target_component: logger.error(f"路径部分 '{part}' 在对象中未找到。路径: {ref_path}. 可用键: {list(current_target_component.keys())}") valid_path = False break current_target_component = current_target_component[part] else: logger.error(f"尝试在非字典/列表类型 ({type(current_target_component)}) 中访问路径部分 '{part}'。路径: {ref_path}") valid_path = False break if valid_path: # $ref 解析成功 final_schema_after_ref_resolution = copy.deepcopy(current_target_component) # 如果解析结果是字典,则将原始 $ref 位置的同级键合并(覆盖)进去 if isinstance(final_schema_after_ref_resolution, dict): for key, value in current_dict_processing.items(): if key != "$ref": # 合并同级键 final_schema_after_ref_resolution[key] = value # 如果解析结果不是字典(例如,一个数组或原始类型),则同级键实际上被丢弃, # 因为返回的是 final_schema_after_ref_resolution 本身。这是 $ref 的标准行为之一。 logger.debug(f"成功解析 $ref: '{ref_path}'。将递归解析其内容(可能已与同级键合并)。") return resolve_json_schema_references( final_schema_after_ref_resolution, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs ) except Exception as e: logger.error(f"解析$ref '{ref_path}' 时发生意外错误: {e}. 将尝试使用同级节点。", exc_info=False) valid_path = False # 如果 $ref 解析失败 (valid_path is False 或 try 块中出现异常) if not valid_path: logger.warning(f"$ref '{ref_path}' 解析失败。将移除 $ref 并处理该对象的其余部分。") current_dict_processing.pop("$ref", None) # 继续执行后续的常规递归,此时 current_dict_processing 已移除了失败的 $ref # 常规递归 (模式2: 非丢弃模式 / $ref 已处理或移除) resolved_children = {} for key, value in current_dict_processing.items(): resolved_children[key] = resolve_json_schema_references( value, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs ) return resolved_children elif isinstance(schema_to_resolve, list): return [resolve_json_schema_references(item, full_api_spec, max_depth, current_depth + 1, discard_refs=discard_refs) for item in schema_to_resolve] else: # 原始类型 return schema_to_resolve def util_find_removable_field_path_recursive( current_schema: Dict[str, Any], current_path: List[Union[str, int]], # Union added here full_api_spec_for_refs: Dict[str, Any], # logger_param: Optional[logging.Logger] = None # Option to pass logger ) -> Optional[List[Union[str, int]]]: """ (框架辅助方法) 递归查找第一个可移除的必填字段的路径。 """ # effective_logger = logger_param or logger # Use passed logger or module logger resolved_schema = resolve_json_schema_references(current_schema, full_api_spec_for_refs) if not isinstance(resolved_schema, dict) or resolved_schema.get("type") != "object": return None required_fields_at_current_level = resolved_schema.get("required", []) properties = resolved_schema.get("properties", {}) logger.debug(f"[Util] 递归查找路径: {current_path}, 当前层级必填字段: {required_fields_at_current_level}, 属性: {list(properties.keys())}") if required_fields_at_current_level and properties: for field_name in required_fields_at_current_level: if field_name in properties: logger.info(f"[Util] 策略1: 在路径 {'.'.join(map(str,current_path)) if current_path else 'root'} 找到可直接移除的必填字段: '{field_name}'") return current_path + [field_name] if properties: for prop_name, prop_schema_orig in properties.items(): prop_schema = resolve_json_schema_references(prop_schema_orig, full_api_spec_for_refs) if isinstance(prop_schema, dict) and prop_schema.get("type") == "array": items_schema_orig = prop_schema.get("items") if isinstance(items_schema_orig, dict): items_schema = resolve_json_schema_references(items_schema_orig, full_api_spec_for_refs) if isinstance(items_schema, dict) and items_schema.get("type") == "object": item_required_fields = items_schema.get("required", []) item_properties = items_schema.get("properties", {}) if item_required_fields and item_properties: first_required_field_in_item = next((rf for rf in item_required_fields if rf in item_properties), None) if first_required_field_in_item: logger.info(f"[Util] 策略2: 在数组属性 '{prop_name}' (路径 {'.'.join(map(str,current_path)) if current_path else 'root'}) 的元素内找到必填字段: '{first_required_field_in_item}'. 路径: {current_path + [prop_name, 0, first_required_field_in_item]}") return current_path + [prop_name, 0, first_required_field_in_item] logger.debug(f"[Util] 在路径 {'.'.join(map(str,current_path)) if current_path else 'root'} 未通过任何策略找到可移除的必填字段。") return None def util_remove_value_at_path( data_container: Any, path: List[Union[str, int]], # logger_param: Optional[logging.Logger] = None ) -> Tuple[Any, Any, bool]: """ (框架辅助方法) 从嵌套的字典/列表中移除指定路径的值。 返回 (修改后的容器, 被移除的值, 是否成功)。 """ # effective_logger = logger_param or logger if not path: logger.error("[Util] util_remove_value_at_path: 路径不能为空。") return data_container, None, False if data_container is None: if isinstance(path[0], str): container_copy = {} elif isinstance(path[0], int): container_copy = [] else: logger.error(f"[Util] util_remove_value_at_path: 路径的第一个元素 '{path[0]}' 类型未知。") return data_container, None, False else: container_copy = copy.deepcopy(data_container) current_level = container_copy original_value = None try: for i, key_or_index in enumerate(path): is_last_element = (i == len(path) - 1) if is_last_element: if isinstance(key_or_index, str): if isinstance(current_level, dict) and key_or_index in current_level: original_value = current_level.pop(key_or_index) logger.info(f"[Util] 从路径 '{'.'.join(map(str,path))}' 成功移除字段 '{key_or_index}' (原值: '{original_value}')。") return container_copy, original_value, True elif isinstance(current_level, dict): logger.warning(f"[Util] 路径的最后一部分 '{key_or_index}' (string key) 在对象中未找到。路径: {'.'.join(map(str,path))}") return container_copy, None, False else: logger.error(f"[Util] 路径的最后一部分 '{key_or_index}' (string key) 期望父级是字典,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}") return data_container, None, False else: if isinstance(current_level, list) and isinstance(key_or_index, int) and 0 <= key_or_index < len(current_level): original_value = current_level.pop(key_or_index) logger.info(f"[Util] 从路径 '{'.'.join(map(str,path))}' 成功移除索引 '{key_or_index}' 的元素 (原值: '{original_value}')。") return container_copy, original_value, True elif isinstance(current_level, list): logger.warning(f"[Util] 路径的最后一部分索引 '{key_or_index}' 超出列表范围或类型不符。列表长度: {len(current_level)}. 路径: {'.'.join(map(str,path))}") return container_copy, None, False else: logger.error(f"[Util] 路径的最后一部分 '{key_or_index}' 期望父级是列表,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}") return data_container, None, False else: next_key_or_index = path[i+1] if isinstance(key_or_index, str): if not isinstance(current_level, dict): if current_level is container_copy and not container_copy : current_level = {} if i == 0: container_copy = current_level else: logger.error(f"[Util] 无法在非根级别从非字典创建路径。") return data_container, None, False else: logger.error(f"[Util] 路径期望字典,但在 '{key_or_index}' 处找到 {type(current_level)}。") return data_container, None, False if isinstance(next_key_or_index, int): if key_or_index not in current_level or not isinstance(current_level.get(key_or_index), list): current_level[key_or_index] = [] current_level = current_level[key_or_index] else: if key_or_index not in current_level or not isinstance(current_level.get(key_or_index), dict): current_level[key_or_index] = {} current_level = current_level[key_or_index] elif isinstance(key_or_index, int): if not isinstance(current_level, list): logger.error(f"[Util] 路径期望列表以应用索引 '{key_or_index}',但找到 {type(current_level)}。") return data_container, None, False while len(current_level) <= key_or_index: if isinstance(next_key_or_index, str): current_level.append({}) else: current_level.append([]) if isinstance(next_key_or_index, str): if not isinstance(current_level[key_or_index], dict): current_level[key_or_index] = {} elif isinstance(next_key_or_index, int): if not isinstance(current_level[key_or_index], list): current_level[key_or_index] = [] current_level = current_level[key_or_index] else: logger.error(f"[Util] 路径部分 '{key_or_index}' 类型未知 ({type(key_or_index)}).") return data_container, None, False except Exception as e: logger.error(f"[Util] 在准备移除字段路径 '{'.'.join(map(str,path))}' 时发生错误: {e}", exc_info=True) return data_container, None, False logger.error(f"[Util] util_remove_value_at_path 未能在循环内按预期返回。路径: {'.'.join(map(str,path))}") return data_container, None, False def util_set_value_at_path( data_container: Any, path: List[Union[str, int]], new_value: Any, # logger_param: Optional[logging.Logger] = None ) -> Tuple[Any, bool]: """ (框架辅助方法) 在嵌套的字典/列表中为指定路径设置新值。 如果路径中的某些部分不存在,会尝试创建它们 (字典会创建,列表会尝试填充到指定索引,但需谨慎)。 返回 (修改后的容器, 是否成功)。 """ # effective_logger = logger_param or logger if not path: logger.error("[Util] util_set_value_at_path: 路径不能为空。") # 如果路径为空,是否应该用 new_value 替换整个 data_container? # 当前行为:返回原始容器和失败状态,因为路径通常指向容器内部。 # 如果要支持替换整个容器,需要明确此行为。 if data_container is None and new_value is not None: # 特殊情况:如果原始容器是None,且路径为空,则新值成为容器 logger.info("[Util] util_set_value_at_path: 路径为空,原始容器为None,新值将作为新容器返回。") return new_value, True elif data_container is not None and new_value is None and not path: # 路径为空,新值为None,则清空容器 logger.info("[Util] util_set_value_at_path: 路径为空,新值为None,容器将被清空 (返回None)。") return None, True # 对于路径为空且两者都不是None的情况,目前返回失败,因为通常期望有路径。 # 或者可以考虑直接返回 new_value,意味着整个对象被替换。 # logger.info(f"[Util] util_set_value_at_path: Path is empty. Replacing entire container.") # return new_value, True # 备选行为:替换整个对象 return data_container, False # 深拷贝以避免修改原始输入,除非原始输入是None container_copy = copy.deepcopy(data_container) if data_container is not None else None current_level = container_copy try: for i, key_or_index in enumerate(path): is_last_element = (i == len(path) - 1) if is_last_element: if isinstance(key_or_index, str): if not isinstance(current_level, dict): # 如果当前层级不是字典 (例如是None,或是被意外替换为其他类型),无法设置键值对 logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分 '{key_or_index}' (string key) 期望父级是字典,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}") # 尝试强制转换为字典?这可能不是预期行为。 # 如果 current_level 是 None 且它是根 (container_copy is None),则初始化 container_copy if current_level is None and i == 0: # 路径只有一级,且容器本身是None container_copy = {} current_level = container_copy else: # 更深层级的None或类型错误 return data_container, False current_level[key_or_index] = new_value logger.info(f"[Util] 在路径 {'.'.join(map(str,path))} (键 '{key_or_index}') 处设置值为 '{new_value}'") return container_copy, True elif isinstance(key_or_index, int): # key_or_index is an integer (list index) if not isinstance(current_level, list): logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分索引 '{key_or_index}' 期望父级是列表,但找到 {type(current_level)}。路径: {'.'.join(map(str,path))}") if current_level is None and i == 0: # 路径只有一级,且容器本身是None container_copy = [None] * (key_or_index + 1) # 创建足够长度的列表 current_level = container_copy else: return data_container, False elif isinstance(key_or_index, int): # 确保列表足够长以容纳索引 while len(current_level) <= key_or_index: current_level.append(None) # 用 None 填充直到达到所需长度 current_level[key_or_index] = new_value logger.info(f"[Util] 在路径 {'.'.join(map(str,path))} (索引 '{key_or_index}') 处设置值为 '{new_value}'") return container_copy, True else: logger.error(f"[Util] util_set_value_at_path: 路径的最后一部分 '{key_or_index}' 类型未知。路径: {'.'.join(map(str,path))}") return data_container, False else: # Not the last element, traverse deeper next_key_or_index_is_int = isinstance(path[i+1], int) if isinstance(key_or_index, str): # Current path part is a dictionary key if not isinstance(current_level, dict): # 如果在根级别且容器是None,则初始化为字典 if current_level is None and i == 0: container_copy = {} current_level = container_copy else: logger.error(f"[Util] util_set_value_at_path: 路径期望字典,但在 '{key_or_index}' 处找到 {type(current_level)}。路径: {'.'.join(map(str,path[:i+1]))}") return data_container, False if key_or_index not in current_level or current_level[key_or_index] is None or \ (next_key_or_index_is_int and not isinstance(current_level[key_or_index], list)) or \ (not next_key_or_index_is_int and not isinstance(current_level[key_or_index], dict)): # 如果键不存在,或值为None,或类型与下一路径部分不匹配,则创建/重置 logger.debug(f"[Util] util_set_value_at_path: 在路径 '{key_or_index}' 处创建/重置结构。下一个是索引: {next_key_or_index_is_int}") current_level[key_or_index] = [] if next_key_or_index_is_int else {} current_level = current_level[key_or_index] elif isinstance(key_or_index, int): # Current path part is a list index if not isinstance(current_level, list): if current_level is None and i == 0: container_copy = [] current_level = container_copy else: logger.error(f"[Util] util_set_value_at_path: 路径期望列表以应用索引 '{key_or_index}',但找到 {type(current_level)}。路径: {'.'.join(map(str,path[:i+1]))}") return data_container, False elif isinstance(key_or_index, int): # 确保列表足够长以容纳索引,并确保该索引处的元素是正确的类型 (list/dict) while len(current_level) <= key_or_index: current_level.append(None) # 用 None 填充 if current_level[key_or_index] is None or \ (next_key_or_index_is_int and not isinstance(current_level[key_or_index], list)) or \ (not next_key_or_index_is_int and not isinstance(current_level[key_or_index], dict)): logger.debug(f"[Util] util_set_value_at_path: 在列表索引 '{key_or_index}' 处创建/重置结构。下一个是索引: {next_key_or_index_is_int}") current_level[key_or_index] = [] if next_key_or_index_is_int else {} current_level = current_level[key_or_index] else: logger.error(f"[Util] util_set_value_at_path: 路径部分 '{key_or_index}' 类型未知 ({type(key_or_index)})。路径: {'.'.join(map(str,path[:i+1]))}") return data_container, False except Exception as e: logger.error(f"[Util] 在准备设置字段路径 {'.'.join(map(str,path))} 的值时发生错误: {e}", exc_info=True) return data_container, False # Should not be reached if logic is correct, path must have at least one element by initial check. logger.error(f"[Util] util_set_value_at_path 未能在循环内按预期返回。路径: {'.'.join(map(str,path))}") return data_container, False def generate_mismatched_value( original_type: Optional[str], original_value: Any, field_schema: Optional[Dict[str, Any]], logger_param: Optional[logging.Logger] = None ) -> Any: """ (框架辅助方法) 根据原始数据类型、原始值和字段 schema 生成一个类型不匹配的值。 主要用于类型不匹配的测试用例。 Args: original_type: 字段的原始 OpenAPI 类型 (e.g., "string", "integer"). original_value: 字段的原始值 (当前未直接用于生成逻辑,但可供未来扩展). field_schema: 字段的 schema 定义,用于检查如 "enum" 之类的约束。 logger_param: 可选的 logger 实例。 Returns: 一个与 original_type 不匹配的值。 """ effective_logger = logger_param or logger # 优先考虑 schema 中的 enum,选择一个不在 enum 中且类型不匹配的值 if field_schema and "enum" in field_schema and isinstance(field_schema["enum"], list): enum_values = field_schema["enum"] # Handle case-insensitive type comparisons if original_type and original_type.lower() == "string": if 123 not in enum_values: return 123 if False not in enum_values: return False # 如果数字和布尔都在枚举中,尝试一个与已知枚举值不同的字符串 # (虽然这仍然是字符串类型,但目的是为了触发非枚举值的验证) # 或者,如果目的是严格类型不匹配,这里应该返回非字符串。 # 当前逻辑倾向于返回一个肯定非字符串的值。 elif original_type and original_type.lower() == "integer": if "not-an-integer" not in enum_values: return "not-an-integer" if 3.14 not in enum_values: return 3.14 elif original_type and original_type.lower() == "number": # Includes float/double if "not-a-number" not in enum_values: return "not-a-number" elif original_type and original_type.lower() == "boolean": if "not-a-boolean" not in enum_values: return "not-a-boolean" if 1 not in enum_values: return 1 # 如果枚举覆盖了所有简单备选,则回退到下面的通用逻辑 # 通用类型不匹配逻辑 (当 enum 不存在或 enum 检查未返回时) # Handle case-insensitive type comparisons if original_type and original_type.lower() == "string": return 12345 # Number instead of string elif original_type and original_type.lower() == "integer": return "not-an-integer" # String instead of integer elif original_type and original_type.lower() == "number": # Includes float/double return "not-a-number" # String instead of number elif original_type and original_type.lower() == "boolean": return "not-a-boolean" # String instead of boolean elif original_type and original_type.lower() == "array": return {"value": "not-an-array"} # Object instead of array elif original_type and original_type.lower() == "object": return ["not", "an", "object"] # Array instead of object effective_logger.warning(f"generate_mismatched_value: 原始类型 '{original_type}' 未知或无法生成不匹配值。将返回固定字符串 'mismatch_test_default'。") return "mismatch_test_default" # Fallback for unknown types def build_object_schema_for_params(params_spec_list: List[Dict[str, Any]], model_name_base: str, logger_param: Optional[logging.Logger] = None) -> Tuple[Optional[Dict[str, Any]], str]: """ 从参数规范列表构建一个对象的JSON schema,主要用于请求体、查询参数或头部的聚合。 Args: params_spec_list: 参数规范的列表 (例如,OpenAPI参数对象列表)。 model_name_base: 用于生成动态模型名称的基础字符串。 logger_param: 可选的 logger 实例。 Returns: 一个元组,包含 (构建的JSON object schema 或 None, 模型名称字符串)。 """ effective_logger = logger_param or logger # Use passed logger or module logger if not params_spec_list: effective_logger.debug(f"参数列表为空,无需为 '{model_name_base}' 构建对象 schema。") return None, f"{model_name_base}EmptyParams" properties = {} required_fields = [] for param_spec in params_spec_list: param_name = param_spec.get("name") if not param_name: effective_logger.warning(f"参数规范缺少 'name' 字段,已跳过: {param_spec}") continue # 从参数规范中提取 schema (OpenAPI 3.x) param_schema = param_spec.get("schema") if not param_schema: # 尝试兼容 OpenAPI 2.0 (Swagger) 的情况,其中类型信息直接在参数级别 # 例如: type, format, items, default, enum 等 # https://swagger.io/specification/v2/#parameterObject # 注意: 这种兼容性可能不完整,因为很多属性需要映射 compatible_schema = {} if "type" in param_spec: compatible_schema["type"] = param_spec["type"] if "format" in param_spec: compatible_schema["format"] = param_spec["format"] if "items" in param_spec: # for array types compatible_schema["items"] = param_spec["items"] if "default" in param_spec: compatible_schema["default"] = param_spec["default"] if "enum" in param_spec: compatible_schema["enum"] = param_spec["enum"] # 其他如 description, example 等也可以考虑加入 if compatible_schema: # 如果至少收集到了一些类型信息 param_schema = compatible_schema effective_logger.debug(f"参数 '{param_name}' 没有 'schema' 字段,但从顶级字段构建了兼容 schema: {param_schema}") else: effective_logger.warning(f"参数 '{param_name}' 缺少 'schema' 字段且无法构建兼容schema,已跳过。规范: {param_spec}") continue properties[param_name] = param_schema if param_spec.get("required", False): required_fields.append(param_name) if not properties: effective_logger.debug(f"未能从参数列表为 '{model_name_base}' 提取任何属性。") return None, f"{model_name_base}NoProps" final_schema: Dict[str, Any] = { "type": "object", "properties": properties } if required_fields: final_schema["required"] = required_fields # 生成一个稍微独特的名字,以防多个操作有相同的 param_type # 例如 OperationIdQueryRequest, OperationIdHeaderRequest model_name = f"{model_name_base.replace(' ', '')}Params" effective_logger.debug(f"为 '{model_name_base}' 构建的对象 schema: {final_schema}, 模型名: {model_name}") return final_schema, model_name def find_first_simple_type_field_recursive( current_schema: Dict[str, Any], current_path: Optional[List[Union[str, int]]] = None, # full_api_spec_for_refs: Optional[Dict[str, Any]] = None, # Schema is expected to be pre-resolved logger_param: Optional[logging.Logger] = None ) -> Optional[Tuple[List[Union[str, int]], str, Dict[str, Any]]]: """ 递归地在给定的 schema 中查找第一个简单类型的字段 (string, integer, number, boolean)。 这包括查找嵌套在对象或数组中的简单类型字段。 Args: current_schema: 当前正在搜索的 schema 部分 (应为字典)。 current_path: 到达当前 schema 的路径列表 (用于构建完整路径)。 logger_param: 可选的 logger 实例。 Returns: 一个元组 (field_path, field_type, field_schema) 如果找到,否则为 None。 field_path 是一个列表,表示从根 schema 到找到的字段的路径。 field_type 是字段的原始类型字符串 (e.g., "string")。 field_schema 是该字段自身的 schema 定义。 """ effective_logger = logger_param or logger # Use module logger if specific one not provided path_so_far = current_path if current_path is not None else [] if not isinstance(current_schema, dict): effective_logger.debug(f"Schema at path {'.'.join(map(str, path_so_far))} is not a dict, cannot search further.") return None schema_type = current_schema.get("type") # effective_logger.debug(f"Searching in path: {'.'.join(map(str, path_so_far))}, Schema Type: '{schema_type}'") # Handle both 'object' and 'Object' (case-insensitive) if schema_type and schema_type.lower() == "object": properties = current_schema.get("properties", {}) for name, prop_schema in properties.items(): if not isinstance(prop_schema, dict): effective_logger.debug(f"Property '{name}' at path {'.'.join(map(str, path_so_far + [name]))} has non-dict schema. Skipping.") continue prop_type = prop_schema.get("type") # Handle case-insensitive type checking for simple types if prop_type and prop_type.lower() in ["string", "integer", "number", "boolean"]: field_path = path_so_far + [name] effective_logger.info(f"Found simple type field: Path={'.'.join(map(str, field_path))}, Type={prop_type}") return field_path, prop_type, prop_schema elif prop_type and prop_type.lower() == "object": found_in_nested_object = find_first_simple_type_field_recursive( prop_schema, path_so_far + [name], logger_param=effective_logger ) if found_in_nested_object: return found_in_nested_object elif prop_type and prop_type.lower() == "array": items_schema = prop_schema.get("items") if isinstance(items_schema, dict): # Look for simple type or object within array items item_type = items_schema.get("type") # Handle case-insensitive type checking for simple types if item_type and item_type.lower() in ["string", "integer", "number", "boolean"]: field_path = path_so_far + [name, 0] # Target first item of the array effective_logger.info(f"Found simple type field in array item: Path={'.'.join(map(str, field_path))}, Type={item_type}") return field_path, item_type, items_schema elif item_type and item_type.lower() == "object": # Path to the first item of the array, then recurse into that item's object schema found_in_array_item_object = find_first_simple_type_field_recursive( items_schema, path_so_far + [name, 0], logger_param=effective_logger ) if found_in_array_item_object: return found_in_array_item_object # Handle both 'array' and 'Array' (case-insensitive) elif schema_type and schema_type.lower() == "array": # If the current_schema itself is an array (e.g., root schema is an array) items_schema = current_schema.get("items") if isinstance(items_schema, dict): item_type = items_schema.get("type") # Handle case-insensitive type checking for simple types if item_type and item_type.lower() in ["string", "integer", "number", "boolean"]: field_path = path_so_far + [0] # Target first item of this root/current array effective_logger.info(f"Found simple type field in root/current array item: Path={'.'.join(map(str, field_path))}, Type={item_type}") return field_path, item_type, items_schema elif item_type and item_type.lower() == "object": # Path to the first item of this root/current array, then recurse found_in_root_array_item_object = find_first_simple_type_field_recursive( items_schema, path_so_far + [0], logger_param=effective_logger ) if found_in_root_array_item_object: return found_in_root_array_item_object # effective_logger.debug(f"No simple type field found at path {'.'.join(map(str, path_so_far))}") return None def util_extract_range_from_description(description: str, logger_param: Optional[logging.Logger] = None) -> Tuple[Optional[float], Optional[float]]: """从描述文本中提取数值范围信息。""" effective_logger = logger_param or logger if not description: return None, None min_value, max_value = None, None MIN_VALUE_PATTERNS = [ r'(最小|至少|从|大于等于|不小于|>=|>|minimum|min).*?(\d+(\.\d+)?)', r'(\d+(\.\d+)?).*?(起|开始|以上|最小|minimum|min)', ] MAX_VALUE_PATTERNS = [ r'(最大|至多|不超过|不大于|小于等于|<=|<|maximum|max).*?(\d+(\.\d+)?)', r'(\d+(\.\d+)?).*?(以下|以内|之内|最大|maximum|max)', ] for pattern in MIN_VALUE_PATTERNS: matches = re.search(pattern, description, re.IGNORECASE) if matches: try: min_value = float(matches.group(2)) effective_logger.debug(f"从 '{description}' 中提取到最小值: {min_value}") break except (ValueError, IndexError): continue for pattern in MAX_VALUE_PATTERNS: matches = re.search(pattern, description, re.IGNORECASE) if matches: try: max_value = float(matches.group(2)) effective_logger.debug(f"从 '{description}' 中提取到最大值: {max_value}") break except (ValueError, IndexError): continue return min_value, max_value def util_find_ranged_field_recursive( schema: Dict[str, Any], path: List[Union[str, int]], logger_param: Optional[logging.Logger] = None ) -> Optional[Tuple[List[Union[str, int]], Dict[str, Any], Optional[str], Optional[float], Optional[float], str]]: """ 递归地在schema中寻找第一个具有范围限制的字段。 返回 (路径, schema, 类型, 最小值, 最大值, 描述)。 """ effective_logger = logger_param or logger if not isinstance(schema, dict): return None schema_type = schema.get('type') description = schema.get('description', '') # 检查描述和schema关键字 min_val, max_val = util_extract_range_from_description(description, effective_logger) schema_min = schema.get('minimum') schema_max = schema.get('maximum') if schema_min is not None: min_val = schema_min if schema_max is not None: max_val = schema_max # 如果找到范围限制,则认为这是一个目标字段 if min_val is not None or max_val is not None: effective_logger.debug(f"在路径 {'.'.join(map(str, path))} 找到范围受限字段。") return path, schema, schema_type, min_val, max_val, description # 递归 # Handle both 'object' and 'Object' (case-insensitive) if schema_type and schema_type.lower() == 'object' and 'properties' in schema: for prop_name, prop_schema in schema['properties'].items(): result = util_find_ranged_field_recursive(prop_schema, path + [prop_name], effective_logger) if result: return result # Handle both 'array' and 'Array' (case-insensitive) if schema_type and schema_type.lower() == 'array' and 'items' in schema and isinstance(schema['items'], dict): result = util_find_ranged_field_recursive(schema['items'], path + [0], effective_logger) if result: return result return None def util_extract_enum_from_description(description: str, logger: logging.Logger) -> Optional[List[str]]: """ 从描述字符串中提取枚举值。 例如: "排序字段,key=字段名,value=排序方式(可选值为:ASC、DESC)" -> ["ASC", "DESC"] """ if not isinstance(description, str): return None # 正则表达式匹配 "可选值为" 后面的内容,支持中文冒号和英文冒号,以及括号 # 匹配直到行尾或遇到另一个右括号 patterns = [ r"可选值为[:|:]([\w\s,、,]+)", r"可选值 *[::] *([\w\s,、,]+)", r"\(可选值为[:|:](.*?)\)", r"(可选值为[:|:](.*?)\)" ] found_values = None for pattern in patterns: match = re.search(pattern, description) if match: found_values = match.group(1).strip() break if not found_values: return None # 按逗号、顿号、空格分割,并去除每个值前后的空白 enums = [item.strip() for item in re.split(r'[,\s、,]+', found_values) if item.strip()] if enums: logger.debug(f"Extracted enums: {enums} from description: '{description[:50]}...'") return enums return None def util_find_enum_field_recursive(schema: Dict[str, Any], path: List[Union[str, int]], logger: logging.Logger) -> Optional[Tuple[List[Union[str, int]], List[str], str]]: """ 递归查找第一个包含可解析枚举值的字段。 """ if not isinstance(schema, dict): return None # 优先检查当前层级的 description description = schema.get("description", "") enums = util_extract_enum_from_description(description, logger) if enums: return path, enums, description # 如果当前层级是对象,则递归其属性 if schema.get("type") == "object" and "properties" in schema: for prop_name, prop_schema in schema.get("properties", {}).items(): new_path = path + [prop_name] result = util_find_enum_field_recursive(prop_schema, new_path, logger) if result: return result # 如果当前层级是数组,则递归其 items if schema.get("type") == "array" and "items" in schema: # 假设我们只检查数组项的结构,路径中用 '[]' 或 index 0 表示 new_path = path + [0] result = util_find_enum_field_recursive(schema["items"], new_path, logger) if result: return result return None def normalize_yapi_responses(responses: dict, method: str) -> dict: """ 将YAPI格式的响应规范标准化为OpenAPI格式。 如果responses为空或不包含任何成功状态码(2XX),则根据请求方法添加默认状态码。 Args: responses: YAPI或已解析的响应规范字典 method: HTTP请求方法 (GET, POST, PUT, DELETE等) Returns: 标准化后的响应规范字典 """ if not responses: responses = {} # 检查是否已经包含成功状态码 has_success_code = False for code in responses.keys(): if code.startswith('2') or code == 'default': has_success_code = True break # 如果没有成功状态码,根据请求方法添加默认状态码 if not has_success_code: if method.upper() == 'POST': # POST请求默认使用201 Created default_code = '201' default_desc = 'Created' elif method.upper() == 'DELETE': # DELETE请求默认使用204 No Content default_code = '204' default_desc = 'No Content' else: # GET, PUT, PATCH等默认使用200 OK default_code = '200' default_desc = 'OK' # 从responses中查找default响应,如果存在则复制其内容 default_response = responses.get('default', {}) if not default_response: default_response = {'description': default_desc} elif 'description' not in default_response: default_response['description'] = default_desc responses[default_code] = default_response return responses