import json import ast import re import logging from typing import List, Dict, Any, Optional, Union, Tuple import requests from urllib.parse import urljoin import copy import urllib3 logger = logging.getLogger(__name__) class BaseEndpoint: """所有端点对象的基类,可以包含一些通用属性或方法。""" def __init__(self, method: str, path: str): self.method = method self.path = path def to_dict(self) -> Dict[str, Any]: # 基类可以提供一个默认的 to_dict 实现或要求子类实现 raise NotImplementedError("Subclasses must implement to_dict") class YAPIEndpoint(BaseEndpoint): # Inherit from BaseEndpoint def __init__(self, data: Dict[str, Any], category_name: Optional[str] = None, category_id: Optional[int] = None): super().__init__(method=data.get("method", "GET").upper(), path=data.get("path", "")) self._raw_data = data self.title: str = data.get("title", "") self.desc: Optional[str] = data.get("desc") self._id: int = data.get("_id") self.project_id: int = data.get("project_id") self.catid: int = data.get("catid") self.req_params: List[Dict[str, Any]] = data.get("req_params", []) self.req_query: List[Dict[str, Any]] = data.get("req_query", []) self.req_headers: List[Dict[str, Any]] = data.get("req_headers", []) self.req_body_form: List[Dict[str, Any]] = data.get("req_body_form", []) self.req_body_type: Optional[str] = data.get("req_body_type") self.req_body_is_json_schema: bool = data.get("req_body_is_json_schema", False) self.req_body_other: Optional[str] = data.get("req_body_other") self.res_body_type: Optional[str] = data.get("res_body_type") self.res_body_is_json_schema: bool = data.get("res_body_is_json_schema", False) self.res_body: Optional[str] = data.get("res_body") self.status: str = data.get("status", "undone") self.api_opened: bool = data.get("api_opened", False) self.uid: int = data.get("uid") self.category_name = category_name self.category_id = category_id if category_id is not None else self.catid self._parsed_req_body_schema: Optional[Dict[str, Any]] = None if self.req_body_type == "json" and self.req_body_other and self.req_body_is_json_schema: try: self._parsed_req_body_schema = json.loads(self.req_body_other) except json.JSONDecodeError as e: logger.error(f"YAPIEndpoint (ID: {self._id}, Title: {self.title}): Failed to parse req_body_other as JSON during init: {e}. Content: {self.req_body_other[:200]}") self._parsed_res_body_schema: Optional[Dict[str, Any]] = None if self.res_body_type == "json" and self.res_body and self.res_body_is_json_schema: try: self._parsed_res_body_schema = json.loads(self.res_body) except json.JSONDecodeError as e: logger.error(f"YAPIEndpoint (ID: {self._id}, Title: {self.title}): Failed to parse res_body as JSON during init: {e}. Content: {self.res_body[:200]}") def to_dict(self) -> Dict[str, Any]: endpoint_dict = { "method": self.method, "path": self.path, "title": self.title, "summary": self.title, "description": self.desc or "", "operationId": f"{self.method.lower()}_{self.path.replace('/', '_').replace('{', '').replace('}', '')}_{self._id}", "tags": [self.category_name or str(self.catid)], "parameters": [], "requestBody": None, "responses": {}, "_source_format": "yapi", "_yapi_id": self._id, "_yapi_raw_data": self._raw_data # Keep raw data for debugging or deeper inspection if needed } # Path parameters from req_params for p_spec in self.req_params: param_name = p_spec.get("name") if not param_name: continue endpoint_dict["parameters"].append({ "name": param_name, "in": "path", "required": True, # Path parameters are always required "description": p_spec.get("desc", ""), "schema": {"type": "string", "example": p_spec.get("example", f"example_{param_name}")} }) # Query parameters from req_query for q_spec in self.req_query: param_name = q_spec.get("name") if not param_name: continue is_required = q_spec.get("required") == "1" # YAPI uses "1" for true param_schema = {"type": "string"} # Default to string, YAPI doesn't specify types well here if "example" in q_spec: param_schema["example"] = q_spec["example"] # Add other fields from YAPI query spec if needed (e.g., desc) endpoint_dict["parameters"].append({ "name": param_name, "in": "query", "required": is_required, "description": q_spec.get("desc", ""), "schema": param_schema }) # Header parameters from req_headers for h_spec in self.req_headers: param_name = h_spec.get("name") if not param_name or param_name.lower() == 'content-type': continue # Content-Type is handled by requestBody is_required = h_spec.get("required") == "1" default_value = h_spec.get("value") # YAPI uses 'value' for default/example header value param_schema = {"type": "string"} if default_value: if is_required: # If required, it's more like an example of what's expected param_schema["example"] = default_value else: # If not required, it's a default value param_schema["default"] = default_value endpoint_dict["parameters"].append({ "name": param_name, "in": "header", "required": is_required, "description": h_spec.get("desc", ""), "schema": param_schema }) # Request body if self.req_body_type == "json" and self._parsed_req_body_schema: endpoint_dict["requestBody"] = { "content": { "application/json": { "schema": self._parsed_req_body_schema } } } elif self.req_body_type == "form" and self.req_body_form: properties = {} required_form_params = [] for form_param in self.req_body_form: name = form_param.get("name") if not name: continue properties[name] = { "type": "string", # YAPI form params are typically strings, file uploads are different "description": form_param.get("desc","") } if form_param.get("example"): properties[name]["example"] = form_param.get("example") if form_param.get("required") == "1": required_form_params.append(name) endpoint_dict["requestBody"] = { "content": { "application/x-www-form-urlencoded": { "schema": { "type": "object", "properties": properties, "required": required_form_params if required_form_params else None # OpenAPI: omit if empty } } # YAPI also supports req_body_type = 'file', which would map to multipart/form-data # This example focuses on json and basic form. } } # Add other req_body_types if necessary (e.g., raw, file) # Responses # YAPI has a simpler response structure. We'll map its res_body to a default success response (e.g., 200 or 201). default_success_status = "200" if self.method == "POST": default_success_status = "201" # Common practice for POST success if self.res_body_type == "json" and self._parsed_res_body_schema: endpoint_dict["responses"][default_success_status] = { "description": "Successful Operation (from YAPI res_body)", "content": { "application/json": { "schema": self._parsed_res_body_schema } } } elif self.res_body_type == "json" and not self._parsed_res_body_schema and self.res_body: # Schema parsing failed but text exists endpoint_dict["responses"][default_success_status] = { "description": "Successful Operation (Schema parsing error, raw text might be available)", "content": {"application/json": {"schema": {"type": "object", "description": "Schema parsing failed for YAPI res_body."}}} # Placeholder } else: # No JSON schema, or other res_body_type endpoint_dict["responses"][default_success_status] = { "description": "Successful Operation (No specific schema provided in YAPI for this response)" } # Ensure there's always a default response if nothing specific was added if not endpoint_dict["responses"]: endpoint_dict["responses"]["default"] = {"description": "Default response from YAPI definition"} return endpoint_dict def __repr__(self): return f"" class SwaggerEndpoint(BaseEndpoint): # Inherit from BaseEndpoint def __init__(self, path: str, method: str, data: Dict[str, Any], global_spec: Dict[str, Any]): super().__init__(method=method.upper(), path=path) self._raw_data = data self._global_spec = global_spec # Store for $ref resolution self.summary: Optional[str] = data.get("summary") self.description: Optional[str] = data.get("description") self.operation_id: Optional[str] = data.get("operationId") self.tags: List[str] = data.get("tags", []) # Parameters, requestBody, responses are processed by to_dict def _resolve_ref(self, ref_path: str) -> Optional[Dict[str, Any]]: """Resolves a $ref path within the global OpenAPI/Swagger spec.""" if not ref_path.startswith("#/"): logger.warning(f"Unsupported $ref path: {ref_path}. Only local refs '#/...' are currently supported.") return None parts = ref_path[2:].split('/') # Remove '#/' and split current_level = self._global_spec try: for part in parts: # Decode URI component encoding if present (e.g. "~0" for "~", "~1" for "/") part = part.replace("~1", "/").replace("~0", "~") current_level = current_level[part] # It's crucial to return a copy if the resolved ref will be modified, # or ensure modifications happen on copies later. # For now, returning as is, assuming downstream processing is careful or uses copies. if isinstance(current_level, dict): return current_level # Potentially json.loads(json.dumps(current_level)) for a deep copy else: # Resolved to a non-dict, which might be valid for some simple refs but unusual for schemas logger.warning(f"$ref '{ref_path}' resolved to a non-dictionary type: {type(current_level)}. Value: {str(current_level)[:100]}") return {"type": "string", "description": f"Resolved $ref '{ref_path}' to non-dict: {str(current_level)[:100]}"} # Placeholder except (KeyError, TypeError, AttributeError) as e: logger.error(f"Failed to resolve $ref '{ref_path}': {e}", exc_info=True) return None def _process_schema_or_ref(self, schema_like: Any) -> Optional[Dict[str, Any]]: """ Processes a schema part, resolving $refs and recursively processing nested structures. Returns a new dictionary with resolved refs, or None if resolution fails badly. """ if not isinstance(schema_like, dict): if schema_like is None: return None logger.warning(f"Expected a dictionary for schema processing, got {type(schema_like)}. Value: {str(schema_like)[:100]}") return {"type": "string", "description": f"Schema was not a dict: {str(schema_like)[:100]}"} # Placeholder for non-dict schema # If it's a $ref, resolve it. if "$ref" in schema_like: return self._resolve_ref(schema_like["$ref"]) # This will be the new base schema_like # Create a copy to avoid modifying the original spec during processing processed_schema = schema_like.copy() # Recursively process 'properties' for object schemas if "properties" in processed_schema and isinstance(processed_schema["properties"], dict): new_properties = {} for prop_name, prop_schema in processed_schema["properties"].items(): resolved_prop = self._process_schema_or_ref(prop_schema) if resolved_prop is not None: # Only add if resolution was successful new_properties[prop_name] = resolved_prop # else: logger.warning(f"Failed to process property '{prop_name}' in {self.operation_id or self.path}") processed_schema["properties"] = new_properties # Recursively process 'items' for array schemas if "items" in processed_schema and isinstance(processed_schema["items"], dict): # 'items' should be a schema object resolved_items = self._process_schema_or_ref(processed_schema["items"]) if resolved_items is not None: processed_schema["items"] = resolved_items # else: logger.warning(f"Failed to process 'items' schema in {self.operation_id or self.path}") # Handle allOf, anyOf, oneOf by trying to merge or process them (simplistic merge for allOf) # This is a complex area of JSON Schema. This is a very basic attempt. if "allOf" in processed_schema and isinstance(processed_schema["allOf"], list): merged_all_of_props = {} merged_all_of_required = set() temp_schema_for_all_of = {"type": processed_schema.get("type", "object"), "properties": {}, "required": []} for sub_schema_data in processed_schema["allOf"]: resolved_sub_schema = self._process_schema_or_ref(sub_schema_data) if resolved_sub_schema and isinstance(resolved_sub_schema, dict): if "properties" in resolved_sub_schema: temp_schema_for_all_of["properties"].update(resolved_sub_schema["properties"]) if "required" in resolved_sub_schema and isinstance(resolved_sub_schema["required"], list): merged_all_of_required.update(resolved_sub_schema["required"]) # Copy other top-level keywords from the resolved_sub_schema if needed, e.g. description for key, value in resolved_sub_schema.items(): if key not in ["properties", "required", "type", "$ref", "allOf", "anyOf", "oneOf"]: if key not in temp_schema_for_all_of or temp_schema_for_all_of[key] is None: # prioritize existing temp_schema_for_all_of[key] = value if temp_schema_for_all_of["properties"]: processed_schema["properties"] = {**processed_schema.get("properties",{}), **temp_schema_for_all_of["properties"]} if merged_all_of_required: current_required = set(processed_schema.get("required", [])) current_required.update(merged_all_of_required) processed_schema["required"] = sorted(list(current_required)) del processed_schema["allOf"] # Remove allOf after processing # Copy other merged attributes back to processed_schema for key, value in temp_schema_for_all_of.items(): if key not in ["properties", "required", "type", "$ref", "allOf", "anyOf", "oneOf"]: if key not in processed_schema or processed_schema[key] is None: processed_schema[key] = value # anyOf, oneOf are harder as they represent choices. For now, we might just list them or pick first. # For simplicity in to_dict, we might not fully expand them but ensure refs inside are resolved. for keyword in ["anyOf", "oneOf"]: if keyword in processed_schema and isinstance(processed_schema[keyword], list): processed_sub_list = [] for sub_item in processed_schema[keyword]: resolved_sub = self._process_schema_or_ref(sub_item) if resolved_sub: processed_sub_list.append(resolved_sub) if processed_sub_list: # only update if some were resolved processed_schema[keyword] = processed_sub_list return processed_schema def to_dict(self) -> Dict[str, Any]: endpoint_data = { "method": self.method, "path": self.path, "summary": self.summary or "", "title": self.summary or self.operation_id or "", # Fallback for title "description": self.description or "", "operationId": self.operation_id or f"{self.method.lower()}_{self.path.replace('/', '_').replace('{', '').replace('}', '')}", "tags": self.tags, "parameters": [], "requestBody": None, "responses": {}, "_source_format": "swagger/openapi", "_swagger_raw_data": self._raw_data, # Keep raw for debugging "_global_api_spec_for_resolution": self._global_spec # For test cases that might need to resolve further } # Process parameters if "parameters" in self._raw_data and isinstance(self._raw_data["parameters"], list): for param_data_raw in self._raw_data["parameters"]: # Each param_data_raw could itself be a $ref or contain a schema that is a $ref processed_param_container = self._process_schema_or_ref(param_data_raw) if processed_param_container and isinstance(processed_param_container, dict): # If the parameter itself was a $ref, processed_param_container is the resolved object. # If it contained a schema that was a $ref, that nested schema should be resolved. # We need to ensure 'schema' key exists if 'in' is path, query, header if "schema" in processed_param_container and isinstance(processed_param_container["schema"], dict): # schema was present, process it further (it might have been already by _process_schema_or_ref if it was a complex object) # but if _process_schema_or_ref was called on param_data_raw which wasn't a ref itself, # the internal 'schema' ref might not have been re-processed with full context. # However, the recursive nature of _process_schema_or_ref should handle nested $refs. pass # Assume it's processed by the main call to _process_schema_or_ref on param_data_raw elif "content" in processed_param_container: # Parameter described by Content Object (OpenAPI 3.x) pass # Content object schemas should have been resolved by _process_schema_or_ref endpoint_data["parameters"].append(processed_param_container) # Process requestBody if "requestBody" in self._raw_data and isinstance(self._raw_data["requestBody"], dict): processed_rb = self._process_schema_or_ref(self._raw_data["requestBody"]) if processed_rb: endpoint_data["requestBody"] = processed_rb # Process responses if "responses" in self._raw_data and isinstance(self._raw_data["responses"], dict): for status_code, resp_data_raw in self._raw_data["responses"].items(): processed_resp = self._process_schema_or_ref(resp_data_raw) if processed_resp: endpoint_data["responses"][status_code] = processed_resp elif resp_data_raw: # If processing failed but raw exists, keep raw (though this is less ideal) endpoint_data["responses"][status_code] = resp_data_raw logger.warning(f"Kept raw response data for {status_code} due to processing failure for {self.operation_id or self.path}") if not endpoint_data["responses"]: # Ensure default response if none processed endpoint_data["responses"]["default"] = {"description": "Default response from Swagger/OpenAPI definition"} return endpoint_data def __repr__(self): return f"" class DMSEndpoint(BaseEndpoint): """Represents an API endpoint discovered dynamically from the DMS service.""" def __init__(self, method: str, path: str, title: str, request_body: Optional[Dict[str, Any]], responses: Dict[str, Any], parameters: Optional[List[Dict[str, Any]]] = None, category_name: Optional[str] = None, raw_record: Optional[Dict[str, Any]] = None, test_mode: str = 'standalone', operation_id: Optional[str] = None, model_pk_name: Optional[str] = None, identity_id_list: Optional[List[str]] = None): super().__init__(method=method.upper(), path=path) self.title = title self.request_body = request_body self.responses = responses self.parameters = parameters if parameters is not None else [] self.category_name = category_name self._raw_record = raw_record self.test_mode = test_mode self.operation_id = operation_id or f"{self.method.lower()}_{self.category_name or 'dms'}_{title.replace(' ', '_')}" self.model_pk_name = model_pk_name self.identity_id_list = identity_id_list or [] def to_dict(self) -> Dict[str, Any]: """Converts the DMS endpoint data into a standardized OpenAPI-like dictionary.""" endpoint_dict = { "method": self.method, "path": self.path, "title": self.title, "summary": self.title, "description": self.title, "operationId": self.operation_id, "tags": [self.category_name] if self.category_name else [], "parameters": self.parameters, "requestBody": self.request_body, "responses": self.responses, "_source_format": "dms", "_dms_raw_record": self._raw_record, "_test_mode": self.test_mode, "_dms_model_pk_name": self.model_pk_name } return endpoint_dict def __repr__(self): return f"" Endpoint = Union[YAPIEndpoint, SwaggerEndpoint, DMSEndpoint] class ParsedAPISpec: """Base class for a parsed API specification from any source.""" def __init__(self, spec_type: str, endpoints: List[Union[YAPIEndpoint, SwaggerEndpoint, 'DMSEndpoint']], spec: Dict[str, Any]): self.spec_type = spec_type self.endpoints = endpoints self.spec = spec # Store the original full spec dictionary, useful for $ref resolution if not pre-resolved class ParsedYAPISpec(ParsedAPISpec): """解析后的YAPI规范""" def __init__(self, endpoints: List[YAPIEndpoint], categories: List[Dict[str, Any]], spec: Dict[str, Any]): super().__init__(spec_type="yapi", endpoints=endpoints, spec=spec) self.categories = categories class ParsedSwaggerSpec(ParsedAPISpec): """解析后的Swagger/OpenAPI规范""" def __init__(self, endpoints: List[SwaggerEndpoint], tags: List[Dict[str, Any]], spec: Dict[str, Any]): super().__init__(spec_type="swagger", endpoints=endpoints, spec=spec) self.tags = tags class ParsedDMSSpec(ParsedAPISpec): """Parsed specification from the dynamic DMS source.""" def __init__(self, endpoints: List[DMSEndpoint], spec: Dict[str, Any]): super().__init__(spec_type="dms", endpoints=endpoints, spec=spec) class InputParser: """负责解析输入(如YAPI JSON)并提取API端点信息""" def __init__(self): self.logger = logging.getLogger(__name__) def _decode_schema_string(self, raw_string: str, api_name: str) -> Optional[Any]: """尽量将字符串形式的模型解析为 Python 对象,并记录失败原因。""" if raw_string is None: return None candidate_str = str(raw_string).strip() if not candidate_str: self.logger.warning(f"Schema for '{api_name}' 是空字符串,无法解析。") return None parse_attempts = [] def log_attempt(stage: str, exc: Exception): parse_attempts.append((stage, exc)) # --- Attempt 1: 标准 JSON --- try: return json.loads(candidate_str) except json.JSONDecodeError as exc: log_attempt("json.loads", exc) # --- Attempt 2: 处理被整体转义或包裹的 JSON 字符串 --- try: unescaped_candidate = json.loads(candidate_str.replace('\\"', '"')) if isinstance(unescaped_candidate, (dict, list)): return unescaped_candidate if isinstance(unescaped_candidate, str): try: return json.loads(unescaped_candidate) except json.JSONDecodeError as exc_nested: log_attempt("json.loads -> nested", exc_nested) except Exception as exc: log_attempt("json.loads after unescape", exc) # --- Attempt 3: unicode 转义还原后再尝试 JSON --- try: unicode_decoded = candidate_str.encode('utf-8').decode('unicode_escape') if unicode_decoded != candidate_str: return json.loads(unicode_decoded) except Exception as exc: log_attempt("json.loads after unicode_escape", exc) # --- Attempt 4: ast.literal_eval (需要替换关键字) --- normalized_literal = candidate_str replacements_for_ast = [ (r"(? Optional[Dict[str, Any]]: """确保候选模型以字典形式返回。""" if isinstance(candidate, dict): return candidate if isinstance(candidate, str): decoded = self._decode_schema_string(candidate, api_name) if isinstance(decoded, dict): return decoded return None def parse_yapi_spec(self, file_path: str) -> Optional[ParsedYAPISpec]: self.logger.info(f"Parsing YAPI spec from: {file_path}") all_endpoints: List[YAPIEndpoint] = [] yapi_categories: List[Dict[str, Any]] = [] raw_spec_data_list: Optional[List[Dict[str, Any]]] = None # YAPI export is a list of categories try: with open(file_path, 'r', encoding='utf-8') as f: raw_spec_data_list = json.load(f) if not isinstance(raw_spec_data_list, list): self.logger.error(f"YAPI spec file {file_path} does not contain a JSON list as expected for categories.") return None for category_data in raw_spec_data_list: if not isinstance(category_data, dict): self.logger.warning(f"Skipping non-dictionary item in YAPI spec list: {str(category_data)[:100]}") continue cat_name = category_data.get("name") cat_id = category_data.get("_id", category_data.get("id")) # YAPI uses _id yapi_categories.append({"name": cat_name, "description": category_data.get("desc"), "id": cat_id}) for endpoint_data in category_data.get("list", []): if not isinstance(endpoint_data, dict): self.logger.warning(f"Skipping non-dictionary endpoint item in category '{cat_name}': {str(endpoint_data)[:100]}") continue try: yapi_endpoint = YAPIEndpoint(endpoint_data, category_name=cat_name, category_id=cat_id) all_endpoints.append(yapi_endpoint) except Exception as e_ep: self.logger.error(f"Error processing YAPI endpoint data (ID: {endpoint_data.get('_id', 'N/A')}, Title: {endpoint_data.get('title', 'N/A')}). Error: {e_ep}", exc_info=True) # The 'spec' for ParsedYAPISpec should be a dict representing the whole document. # Since YAPI export is a list of categories, we wrap it. yapi_full_spec_dict = {"yapi_categories": raw_spec_data_list} return ParsedYAPISpec(endpoints=all_endpoints, categories=yapi_categories, spec=yapi_full_spec_dict) except FileNotFoundError: self.logger.error(f"YAPI spec file not found: {file_path}") except json.JSONDecodeError as e: self.logger.error(f"Error decoding JSON from YAPI spec file {file_path}: {e}") except Exception as e: self.logger.error(f"An unexpected error occurred while parsing YAPI spec {file_path}: {e}", exc_info=True) return None def parse_swagger_spec(self, file_path: str) -> Optional[ParsedSwaggerSpec]: self.logger.info(f"Parsing Swagger/OpenAPI spec from: {file_path}") all_endpoints: List[SwaggerEndpoint] = [] swagger_tags: List[Dict[str, Any]] = [] raw_spec_data_dict: Optional[Dict[str, Any]] = None # Swagger/OpenAPI is a single root object try: with open(file_path, 'r', encoding='utf-8') as f: # TODO: Add YAML support if needed, e.g., using PyYAML raw_spec_data_dict = json.load(f) if not isinstance(raw_spec_data_dict, dict): self.logger.error(f"Swagger spec file {file_path} does not contain a JSON object as expected.") return None swagger_tags = raw_spec_data_dict.get("tags", []) paths = raw_spec_data_dict.get("paths", {}) for path, path_item_obj in paths.items(): if not isinstance(path_item_obj, dict): continue for method, operation_obj in path_item_obj.items(): # Common methods, can be extended if method.lower() not in ["get", "post", "put", "delete", "patch", "options", "head", "trace"]: continue # Skip non-standard HTTP methods or extensions like 'parameters' at path level if not isinstance(operation_obj, dict): continue try: # Pass the full raw_spec_data_dict for $ref resolution within SwaggerEndpoint swagger_endpoint = SwaggerEndpoint(path, method, operation_obj, global_spec=raw_spec_data_dict) all_endpoints.append(swagger_endpoint) except Exception as e_ep: self.logger.error(f"Error processing Swagger endpoint: {method.upper()} {path}. Error: {e_ep}", exc_info=True) return ParsedSwaggerSpec(endpoints=all_endpoints, tags=swagger_tags, spec=raw_spec_data_dict) except FileNotFoundError: self.logger.error(f"Swagger spec file not found: {file_path}") except json.JSONDecodeError as e: self.logger.error(f"Error decoding JSON from Swagger spec file {file_path}: {e}") except Exception as e: self.logger.error(f"An unexpected error occurred while parsing Swagger spec {file_path}: {e}", exc_info=True) return None def parse_dms_spec(self, domain_mapping_path: str, base_url: str, headers: Optional[Dict[str, str]] = None, ignore_ssl: bool = False, page_size: int = 1000, page_no_start: int = 1, fetch_all_pages: bool = True) -> Optional[Tuple[ParsedDMSSpec, Dict[str, Any]]]: self.logger.info(f"Starting DMS spec parsing. Base URL: {base_url}, Domain Map: {domain_mapping_path}") if ignore_ssl: self.logger.warning("SSL certificate verification is disabled for DMS API calls. This is not recommended for production use.") # 禁用SSL警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) headers = headers or {} try: with open(domain_mapping_path, 'r', encoding='utf-8') as f: DOMAIN_MAP = json.load(f) except (FileNotFoundError, json.JSONDecodeError) as e: self.logger.warning(f"Could not load or parse domain map file '{domain_mapping_path}'. Using default domain. Error: {e}") DOMAIN_MAP = {} # 🔧 构建关键词到领域ID的映射表 keyword_to_domain_id = {} for domain_id, domain_info in DOMAIN_MAP.items(): if isinstance(domain_info, dict) and 'keywords' in domain_info: keywords = domain_info['keywords'] if isinstance(keywords, list): for keyword in keywords: keyword_to_domain_id[keyword] = domain_id self.logger.debug(f"映射关键词 '{keyword}' -> 领域ID '{domain_id}'") # 实现分页获取API列表 if fetch_all_pages: self.logger.info(f"Fetching ALL API pages with pagination (page_size={page_size}, starting from page {page_no_start})") else: self.logger.info(f"Fetching SINGLE page (page_size={page_size}, page_no={page_no_start})") api_records = [] page_no = page_no_start total_fetched = 0 pagination_info = { "page_size": page_size, "page_no_start": page_no_start, "total_pages": 0, "total_records": 0, "pages_fetched": 0, "current_page": page_no_start, "fetch_all_pages": fetch_all_pages } try: while True: list_url = urljoin(base_url, f"/api/schema/manage/schema?pageNo={page_no}&pageSize={page_size}") self.logger.debug(f"Fetching page {page_no} from: {list_url}") response = requests.get(list_url, headers=headers, verify=not ignore_ssl) response.raise_for_status() api_list_data = response.json() # 检查业务代码是否成功 list_code = api_list_data.get("code") list_code_normalized = str(list_code).strip().lower() if list_code is not None else "" if list_code_normalized not in {"0", "success", "ok", "200"}: self.logger.error( f"DMS API list endpoint returned a business error: {api_list_data.get('message')} (code={list_code})" ) return None, {} # 从分页结构中提取 'records' page_records = api_list_data.get("data", {}).get("records", []) if not page_records: self.logger.info(f"No more records found on page {page_no}, stopping pagination") break api_records.extend(page_records) total_fetched += len(page_records) self.logger.info(f"Fetched {len(page_records)} records from page {page_no}, total: {total_fetched}") # 更新分页信息 data = api_list_data.get("data", {}) total_count = data.get("total", 0) current_count = data.get("current", 0) * data.get("size", page_size) # 第一次获取时更新总数信息 if page_no == 1: pagination_info["total_records"] = total_count pagination_info["total_pages"] = (total_count + page_size - 1) // page_size # 向上取整 pagination_info["pages_fetched"] = page_no - page_no_start + 1 pagination_info["current_page"] = page_no # 如果是单页模式,获取一页后就停止 if not fetch_all_pages: self.logger.info(f"Single page mode: fetched {len(page_records)} records from page {page_no}") break # 全页模式:检查是否还有更多页面 if current_count >= total_count or len(page_records) < page_size: self.logger.info(f"Reached end of data. Total records: {total_fetched}") break page_no += 1 # 安全检查:防止无限循环 if page_no > 1000: # 最多1000页 self.logger.warning("Reached maximum page limit (1000), stopping pagination") break if not api_records: self.logger.warning("DMS API list is empty after pagination.") return ParsedDMSSpec(endpoints=[], spec={"dms_api_list": []}), pagination_info except requests.exceptions.RequestException as e: self.logger.error(f"Failed to fetch API list from DMS: {e}") return None, {} except json.JSONDecodeError: self.logger.error("Failed to decode JSON response from DMS API list.") return None, {} endpoints: List[DMSEndpoint] = [] for item in api_records: domain_name = item.get('domain') name = item.get('name') model_id = item.get('id') if not all(k in item for k in ['domain', 'name', 'id']): self.logger.warning(f"Skipping an item in API list because it's missing 'domain', 'name', or 'id': {item}") continue # 🔧 改进领域映射:支持精确匹配和前缀匹配 instance_code = keyword_to_domain_id.get(domain_name) if instance_code: self.logger.info(f"通过精确关键词匹配:'{domain_name}' -> '{instance_code}'") else: # 尝试前缀匹配(如wb_cd匹配wb) for keyword, domain_id in keyword_to_domain_id.items(): if domain_name.startswith(keyword + '_'): # wb_cd以wb_开头 instance_code = domain_id self.logger.info(f"通过前缀关键词匹配:'{domain_name}' -> '{instance_code}' (匹配关键词: '{keyword}')") break if not instance_code: # Fallback到原有的直接映射 instance_code = DOMAIN_MAP.get(domain_name, domain_name) self.logger.info(f"使用直接映射:'{domain_name}' -> '{instance_code}'") model_url = urljoin(base_url, f"/api/schema/manage/schema/{model_id}") self.logger.info(f"Fetching model for '{name}' from: {model_url}") try: response = requests.get(model_url, headers=headers, verify=not ignore_ssl) response.raise_for_status() model_schema_response = response.json() except requests.exceptions.RequestException as e: self.logger.error(f"Error fetching model for '{name}': {e}") continue except json.JSONDecodeError: self.logger.error(f"Failed to decode JSON for model '{name}'.") continue detail_code = model_schema_response.get('code') detail_code_normalized = str(detail_code).strip().lower() if detail_code is not None else "" if detail_code is not None and detail_code_normalized not in {"0", "success", "ok", "200"}: self.logger.warning( f"Skipping API '{name}' due to business error when fetching schema: {model_schema_response.get('message')} (code={detail_code})" ) continue raw_model_data = model_schema_response.get('data') if not raw_model_data: self.logger.warning(f"Skipping API '{name}' due to missing 'data' section in schema response.") continue model = None identity_id_list = None version = item.get('version', '1.0.0') # 支持多种返回结构 if isinstance(raw_model_data, dict): candidate = self._normalize_model_candidate(raw_model_data.get('model'), name) if not candidate: schema_field = raw_model_data.get('schema') candidate = self._normalize_model_candidate(schema_field, name) if not candidate and 'records' in raw_model_data and isinstance(raw_model_data['records'], list) and raw_model_data['records']: record = raw_model_data['records'][0] if isinstance(record, dict): record_candidate = record.get('model') or record.get('schema') or record candidate = self._normalize_model_candidate(record_candidate, name) identity_id_list = identity_id_list or record.get('identityId') version = record.get('version', version) if not candidate: candidate = self._normalize_model_candidate(raw_model_data, name) if isinstance(candidate, dict): model = candidate identity_id_list = identity_id_list or raw_model_data.get('identityId') version = raw_model_data.get('version', version) else: model = None elif isinstance(raw_model_data, str): model = self._decode_schema_string(raw_model_data, name) if not isinstance(model, dict): self.logger.warning(f"Skipping API '{name}' because schema model could not be resolved to a dictionary.") continue # 新接口可能将实际模型嵌套在 model['model'] 中 if 'properties' not in model and isinstance(model.get('model'), dict): inner_model = model.get('model') if isinstance(inner_model, dict) and 'properties' in inner_model: identity_id_list = identity_id_list or model.get('identityId') or inner_model.get('identityId') version = model.get('version', version) model = inner_model # 某些接口可能将schema序列化为model['modelJson']之类的字段 if 'properties' not in model: json_field = None for key in ('modelJson', 'schemaJson', 'model_content', 'schema_content'): if key in model: json_field = model[key] break if json_field and isinstance(json_field, str): try: decoded_model = json.loads(json_field) if isinstance(decoded_model, dict) and 'properties' in decoded_model: model = decoded_model except json.JSONDecodeError: self.logger.debug(f"Field '{key}' for '{name}' is not valid JSON; ignoring.") if not model or 'properties' not in model or not isinstance(model['properties'], dict) or not model['properties']: self.logger.warning(f"Skipping API '{name}' due to missing or invalid 'properties' in resolved schema.") continue pk_name = None # Find primary key by looking for top-level "identityId" array. if identity_id_list is None: identity_id_list = model.get("identityId") if isinstance(identity_id_list, list) and len(identity_id_list) > 0: candidate_pk_name = identity_id_list[0] # 🔧 验证identityId指向的字段是否真的存在于properties中 if candidate_pk_name in model['properties']: pk_name = candidate_pk_name self.logger.info(f"Found identityId property '{pk_name}' for model '{name}'.") else: self.logger.warning(f"identityId property '{candidate_pk_name}' not found in model properties for '{name}'. Will use fallback.") # 规范化主键ID列表 if isinstance(identity_id_list, str): identity_id_list = [identity_id_list] elif not isinstance(identity_id_list, list): identity_id_list = [] # Fallback to original behavior if no identityId found or identityId field doesn't exist if not pk_name: pk_name = next(iter(model['properties']), None) if pk_name: self.logger.warning(f"No valid 'identityId' found for model '{name}'. Falling back to using the first property '{pk_name}' as the primary key.") if not pk_name: self.logger.warning(f"Skipping API '{name}' because no properties found in model to identify a primary key.") continue # 🔧 再次验证pk_name确实存在于properties中(双重保险) if pk_name not in model['properties']: self.logger.error(f"Critical error: Primary key '{pk_name}' not found in model properties for '{name}'. Skipping this model.") continue pk_schema = model['properties'][pk_name] version = version or model.get('version', '1.0.0') dms_instance_code = instance_code category_name = domain_name success_response = { "200": { "description": "Success", "content": {"application/json": {"schema": {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "boolean"}}}}}}} # Create Endpoint (POST) create_path = f"/api/dms/{dms_instance_code}/v1/{name}" create_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]} endpoints.append(DMSEndpoint(path=create_path, method='post', title=f"Create {name}", request_body={'content': {'application/json': {'schema': create_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"create_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) # List Endpoint (POST) - 🔧 添加pageNo和pageSize查询参数 list_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}" list_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "array", "items": model}}} list_parameters = [ {'name': 'pageNo', 'in': 'query', 'required': False, 'description': '页码(从1开始)', 'schema': {'type': 'integer', 'default': 1}}, {'name': 'pageSize', 'in': 'query', 'required': False, 'description': '分页大小(最大值200)', 'schema': {'type': 'integer', 'default': 1000}} ] # List请求体Schema(包含version字段,但不包含act字段) list_request_body_schema = { "type": "object", "properties": { "version": {"type": "string", "example": version}, "isSearchCount": {"type": "boolean", "example": True}, "query": { "type": "object", "properties": { "fields": {"type": "array", "items": {"type": "string"}}, "filter": {"type": "object"} } } } } endpoints.append(DMSEndpoint(path=list_path, method='post', title=f"List {name}", request_body={'content': {'application/json': {'schema': list_request_body_schema}}}, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': list_response_schema}}}}, parameters=list_parameters, test_mode='standalone', operation_id=f"list_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) # Read Endpoint (GET) - 🔧 只为单主键模型生成查询详情接口 if isinstance(identity_id_list, list) and len(identity_id_list) > 1: # 🚫 多主键模型:不生成查询详情接口 self.logger.info(f"跳过多主键模型 '{name}' 的查询详情接口生成,主键数量: {len(identity_id_list)}") else: # ✅ 单主键模型:生成查询详情接口 read_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}/{{id}}" read_parameters = [{'name': 'id', 'in': 'path', 'required': True, 'description': f'The ID of the {name}, maps to {pk_name}', 'schema': pk_schema}] read_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": model}} endpoints.append(DMSEndpoint(path=read_path, method='get', title=f"Read {name}", request_body=None, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': read_response_schema}}}}, parameters=read_parameters, test_mode='scenario_only', operation_id=f"read_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) self.logger.info(f"创建单主键读取端点 '{name}',路径参数: id") # Update Endpoint (PUT) update_path = f"/api/dms/{dms_instance_code}/v1/{name}" update_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]} endpoints.append(DMSEndpoint(path=update_path, method='put', title=f"Update {name}", request_body={'content': {'application/json': {'schema': update_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"update_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) # Delete Endpoint (DELETE) delete_path = f"/api/dms/{dms_instance_code}/v1/{name}" # 根据identityId列表长度决定删除schema结构 if isinstance(identity_id_list, list) and len(identity_id_list) > 1: # 多主键:使用对象数组 delete_items_properties = {} delete_required_fields = [] for pk_field in identity_id_list: if pk_field in model['properties']: delete_items_properties[pk_field] = model['properties'][pk_field] delete_required_fields.append(pk_field) delete_request_body_schema = { "type": "object", "properties": { "version": {"type": "string", "example": version}, "data": { "type": "array", "items": { "type": "object", "properties": delete_items_properties, "required": delete_required_fields } } }, "required": ["data"] } self.logger.info(f"创建多主键删除端点 '{name}',主键字段: {identity_id_list}") else: # 单主键:使用字符串数组 delete_request_body_schema = { "type": "object", "properties": { "version": {"type": "string", "example": version}, "data": { "type": "array", "items": {"type": "string"} } }, "required": ["data"] } self.logger.info(f"创建单主键删除端点 '{name}',主键字段: {pk_name}") endpoints.append(DMSEndpoint(path=delete_path, method='delete', title=f"Delete {name}", request_body={'content': {'application/json': {'schema': delete_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"delete_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) # The 'spec' for ParsedDMSSpec should represent the whole document. # We can construct a dictionary holding all the raw data we fetched. dms_full_spec_dict = {"dms_api_list": api_records} return ParsedDMSSpec(endpoints=endpoints, spec=dms_full_spec_dict), pagination_info class DmsConfig: def __init__(self, base_url: str, domain_map_file: str, headers: Optional[Dict[str, str]] = None): self.base_url = base_url self.domain_map_file = domain_map_file self.headers = headers def get_endpoints_from_swagger(file_path: str) -> Tuple[List[SwaggerEndpoint], str]: """ Parses a Swagger/OpenAPI JSON file and returns a list of SwaggerEndpoint objects and the base path of the API. """ logger.info(f"Parsing Swagger/OpenAPI spec from: {file_path}") all_endpoints: List[SwaggerEndpoint] = [] swagger_tags: List[Dict[str, Any]] = [] raw_spec_data_dict: Optional[Dict[str, Any]] = None # Swagger/OpenAPI is a single root object try: with open(file_path, 'r', encoding='utf-8') as f: # TODO: Add YAML support if needed, e.g., using PyYAML raw_spec_data_dict = json.load(f) if not isinstance(raw_spec_data_dict, dict): logger.error(f"Swagger spec file {file_path} does not contain a JSON object as expected.") return [], "" swagger_tags = raw_spec_data_dict.get("tags", []) paths = raw_spec_data_dict.get("paths", {}) for path, path_item_obj in paths.items(): if not isinstance(path_item_obj, dict): continue for method, operation_obj in path_item_obj.items(): # Common methods, can be extended if method.lower() not in ["get", "post", "put", "delete", "patch", "options", "head", "trace"]: continue # Skip non-standard HTTP methods or extensions like 'parameters' at path level if not isinstance(operation_obj, dict): continue try: # Pass the full raw_spec_data_dict for $ref resolution within SwaggerEndpoint swagger_endpoint = SwaggerEndpoint(path, method, operation_obj, global_spec=raw_spec_data_dict) all_endpoints.append(swagger_endpoint) except Exception as e_ep: logger.error(f"Error processing Swagger endpoint: {method.upper()} {path}. Error: {e_ep}", exc_info=True) return all_endpoints, raw_spec_data_dict.get("basePath", "") if raw_spec_data_dict.get("basePath") else "" except FileNotFoundError: # It's better to log this error. Assuming a logger is available at self.logger logger.error(f"Swagger spec file not found: {file_path}") return [], "" except json.JSONDecodeError as e: logger.error(f"Error decoding JSON from Swagger spec file {file_path}: {e}") return [], "" except Exception as e: logger.error(f"An unexpected error occurred while parsing {file_path}: {e}", exc_info=True) return [], "" def parse_input_to_endpoints(input_type: str, input_path: str, dms_config: DmsConfig = None) -> Tuple[List[Endpoint], str]: """ Parses input from a given type (YAPI, Swagger, DMS) and returns a list of Endpoint objects and the base path of the API. """ parser = InputParser() if input_type == "yapi": parsed_spec = parser.parse_yapi_spec(input_path) if parsed_spec: return parsed_spec.endpoints, "" # YAPI doesn't have a base path in the same sense as Swagger elif input_type == "swagger": # The standalone get_endpoints_from_swagger is simple, but for consistency let's use the parser parsed_spec = parser.parse_swagger_spec(input_path) if parsed_spec: base_path = parsed_spec.spec.get("basePath", "") or "" # servers URL might be more modern (OpenAPI 3) if not base_path and "servers" in parsed_spec.spec and parsed_spec.spec["servers"]: # Use the first server URL base_path = parsed_spec.spec["servers"][0].get("url", "") return parsed_spec.endpoints, base_path elif input_type == "dms": if dms_config: parsed_spec = parser.parse_dms_spec(dms_config.domain_map_file, dms_config.base_url, dms_config.headers) if parsed_spec: return parsed_spec.endpoints, dms_config.base_url else: logger.error("DMS configuration not provided for DMS input type.") return [], "" else: logger.error(f"Unsupported input type: {input_type}") return [], "" return [], ""