1088 lines
60 KiB
Python
1088 lines
60 KiB
Python
import json
|
||
import ast
|
||
import re
|
||
import logging
|
||
from typing import List, Dict, Any, Optional, Union, Tuple
|
||
import requests
|
||
from urllib.parse import urljoin
|
||
import copy
|
||
import urllib3
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class BaseEndpoint:
    """Common base class for every endpoint flavour; holds the shared HTTP verb/path pair."""

    def __init__(self, method: str, path: str):
        # The two attributes every concrete endpoint type relies on.
        self.method = method
        self.path = path

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the endpoint to an OpenAPI-like dict; concrete subclasses must override."""
        raise NotImplementedError("Subclasses must implement to_dict")
|
||
|
||
class YAPIEndpoint(BaseEndpoint):  # Inherit from BaseEndpoint
    """An API endpoint parsed from one record of a YAPI export.

    Stores the raw YAPI fields as attributes and converts them to an
    OpenAPI-like dictionary via :meth:`to_dict`.
    """

    def __init__(self, data: Dict[str, Any], category_name: Optional[str] = None, category_id: Optional[int] = None):
        """
        Args:
            data: One endpoint record from a YAPI export (an item of a category's ``list``).
            category_name: Human-readable name of the owning YAPI category, if known.
            category_id: Owning category id; falls back to ``data["catid"]`` when omitted.
        """
        super().__init__(method=data.get("method", "GET").upper(), path=data.get("path", ""))
        self._raw_data = data
        self.title: str = data.get("title", "")
        self.desc: Optional[str] = data.get("desc")
        self._id: int = data.get("_id")
        self.project_id: int = data.get("project_id")
        self.catid: int = data.get("catid")

        # Request parameter lists as YAPI exports them (path/query/header/form).
        self.req_params: List[Dict[str, Any]] = data.get("req_params", [])
        self.req_query: List[Dict[str, Any]] = data.get("req_query", [])
        self.req_headers: List[Dict[str, Any]] = data.get("req_headers", [])
        self.req_body_form: List[Dict[str, Any]] = data.get("req_body_form", [])

        # Request body: type discriminator plus the raw (string) schema payload.
        self.req_body_type: Optional[str] = data.get("req_body_type")
        self.req_body_is_json_schema: bool = data.get("req_body_is_json_schema", False)
        self.req_body_other: Optional[str] = data.get("req_body_other")

        # Response body: same shape as the request side.
        self.res_body_type: Optional[str] = data.get("res_body_type")
        self.res_body_is_json_schema: bool = data.get("res_body_is_json_schema", False)
        self.res_body: Optional[str] = data.get("res_body")

        self.status: str = data.get("status", "undone")
        self.api_opened: bool = data.get("api_opened", False)
        self.uid: int = data.get("uid")

        self.category_name = category_name
        self.category_id = category_id if category_id is not None else self.catid

        # Parse JSON-schema bodies eagerly so malformed payloads are logged once at load time.
        self._parsed_req_body_schema: Optional[Dict[str, Any]] = None
        if self.req_body_type == "json" and self.req_body_other and self.req_body_is_json_schema:
            self._parsed_req_body_schema = self._try_parse_json_schema(self.req_body_other, "req_body_other")

        self._parsed_res_body_schema: Optional[Dict[str, Any]] = None
        if self.res_body_type == "json" and self.res_body and self.res_body_is_json_schema:
            self._parsed_res_body_schema = self._try_parse_json_schema(self.res_body, "res_body")

    def _try_parse_json_schema(self, text: str, field_name: str) -> Optional[Dict[str, Any]]:
        """Parse *text* as JSON, logging (not raising) on failure. Returns None on error."""
        try:
            return json.loads(text)
        except json.JSONDecodeError as e:
            logger.error(f"YAPIEndpoint (ID: {self._id}, Title: {self.title}): Failed to parse {field_name} as JSON during init: {e}. Content: {text[:200]}")
            return None

    def _path_parameters(self) -> List[Dict[str, Any]]:
        """Map YAPI ``req_params`` to OpenAPI path parameters (always required)."""
        params = []
        for p_spec in self.req_params:
            param_name = p_spec.get("name")
            if not param_name:
                continue
            params.append({
                "name": param_name,
                "in": "path",
                "required": True,  # Path parameters are always required
                "description": p_spec.get("desc", ""),
                "schema": {"type": "string", "example": p_spec.get("example", f"example_{param_name}")},
            })
        return params

    def _query_parameters(self) -> List[Dict[str, Any]]:
        """Map YAPI ``req_query`` to OpenAPI query parameters."""
        params = []
        for q_spec in self.req_query:
            param_name = q_spec.get("name")
            if not param_name:
                continue
            # YAPI does not carry real types for query params; default to string.
            param_schema: Dict[str, Any] = {"type": "string"}
            if "example" in q_spec:
                param_schema["example"] = q_spec["example"]
            params.append({
                "name": param_name,
                "in": "query",
                "required": q_spec.get("required") == "1",  # YAPI encodes booleans as "1"/"0"
                "description": q_spec.get("desc", ""),
                "schema": param_schema,
            })
        return params

    def _header_parameters(self) -> List[Dict[str, Any]]:
        """Map YAPI ``req_headers`` to OpenAPI header parameters (Content-Type excluded)."""
        params = []
        for h_spec in self.req_headers:
            param_name = h_spec.get("name")
            # Content-Type is implied by the requestBody media type, so skip it here.
            if not param_name or param_name.lower() == 'content-type':
                continue
            is_required = h_spec.get("required") == "1"
            default_value = h_spec.get("value")  # YAPI uses 'value' for default/example header value
            param_schema: Dict[str, Any] = {"type": "string"}
            if default_value:
                if is_required:
                    # A value on a required header is an example of what is expected.
                    param_schema["example"] = default_value
                else:
                    # A value on an optional header acts as its default.
                    param_schema["default"] = default_value
            params.append({
                "name": param_name,
                "in": "header",
                "required": is_required,
                "description": h_spec.get("desc", ""),
                "schema": param_schema,
            })
        return params

    def _request_body(self) -> Optional[Dict[str, Any]]:
        """Build the OpenAPI requestBody for json/form bodies; None for other/absent types."""
        if self.req_body_type == "json" and self._parsed_req_body_schema:
            return {"content": {"application/json": {"schema": self._parsed_req_body_schema}}}
        if self.req_body_type == "form" and self.req_body_form:
            properties: Dict[str, Any] = {}
            required_form_params: List[str] = []
            for form_param in self.req_body_form:
                name = form_param.get("name")
                if not name:
                    continue
                properties[name] = {
                    "type": "string",  # YAPI form params are typically strings; file uploads are different
                    "description": form_param.get("desc", ""),
                }
                if form_param.get("example"):
                    properties[name]["example"] = form_param.get("example")
                if form_param.get("required") == "1":
                    required_form_params.append(name)
            schema: Dict[str, Any] = {"type": "object", "properties": properties}
            # BUGFIX: OpenAPI/JSON Schema requires "required" to be a non-empty array;
            # the key must be omitted entirely when no field is required (the previous
            # code stored an invalid explicit None).
            if required_form_params:
                schema["required"] = required_form_params
            # YAPI also supports req_body_type == 'file' (multipart/form-data); not mapped here.
            return {"content": {"application/x-www-form-urlencoded": {"schema": schema}}}
        # Other req_body_types (raw, file) are not mapped yet.
        return None

    def _responses(self) -> Dict[str, Any]:
        """Map YAPI's single res_body onto one default success response."""
        # YAPI has a simpler response structure; use 200 (or 201 for POST) as the status.
        default_success_status = "200"
        if self.method == "POST":
            default_success_status = "201"  # Common practice for POST success

        if self.res_body_type == "json" and self._parsed_res_body_schema:
            body = {
                "description": "Successful Operation (from YAPI res_body)",
                "content": {"application/json": {"schema": self._parsed_res_body_schema}},
            }
        elif self.res_body_type == "json" and not self._parsed_res_body_schema and self.res_body:
            # Schema parsing failed during __init__ but raw text exists.
            body = {
                "description": "Successful Operation (Schema parsing error, raw text might be available)",
                "content": {"application/json": {"schema": {"type": "object", "description": "Schema parsing failed for YAPI res_body."}}},
            }
        else:
            # No JSON schema, or another res_body_type entirely.
            body = {
                "description": "Successful Operation (No specific schema provided in YAPI for this response)"
            }
        return {default_success_status: body}

    def to_dict(self) -> Dict[str, Any]:
        """Return an OpenAPI-like dictionary describing this endpoint."""
        endpoint_dict = {
            "method": self.method,
            "path": self.path,
            "title": self.title,
            "summary": self.title,
            "description": self.desc or "",
            "operationId": f"{self.method.lower()}_{self.path.replace('/', '_').replace('{', '').replace('}', '')}_{self._id}",
            "tags": [self.category_name or str(self.catid)],
            "parameters": self._path_parameters() + self._query_parameters() + self._header_parameters(),
            "requestBody": self._request_body(),
            "responses": self._responses(),
            "_source_format": "yapi",
            "_yapi_id": self._id,
            "_yapi_raw_data": self._raw_data,  # Keep raw data for debugging or deeper inspection if needed
        }
        # Safety net: guarantee at least one response entry.
        if not endpoint_dict["responses"]:
            endpoint_dict["responses"]["default"] = {"description": "Default response from YAPI definition"}
        return endpoint_dict

    def __repr__(self):
        return f"<YAPIEndpoint ID:{self._id} Method:{self.method} Path:{self.path} Title:'{self.title}'>"
|
||
|
||
class SwaggerEndpoint(BaseEndpoint):  # Inherit from BaseEndpoint
    """One operation (path + HTTP method) from a Swagger/OpenAPI document.

    Keeps a reference to the whole document so local ``$ref`` pointers
    (``#/...``) can be resolved while converting to a dictionary.
    """

    def __init__(self, path: str, method: str, data: Dict[str, Any], global_spec: Dict[str, Any]):
        # `data` is the operation object; `global_spec` is the entire spec document.
        super().__init__(method=method.upper(), path=path)
        self._raw_data = data
        self._global_spec = global_spec  # Store for $ref resolution
        self.summary: Optional[str] = data.get("summary")
        self.description: Optional[str] = data.get("description")
        self.operation_id: Optional[str] = data.get("operationId")
        self.tags: List[str] = data.get("tags", [])
        # Parameters, requestBody, responses are processed lazily by to_dict.

    def _resolve_ref(self, ref_path: str) -> Optional[Dict[str, Any]]:
        """Resolves a $ref path within the global OpenAPI/Swagger spec.

        Only document-local refs ("#/...") are supported; returns None on
        failure or a string-typed placeholder schema when the ref resolves
        to a non-dict value.
        """
        if not ref_path.startswith("#/"):
            logger.warning(f"Unsupported $ref path: {ref_path}. Only local refs '#/...' are currently supported.")
            return None

        parts = ref_path[2:].split('/')  # Remove '#/' and split
        current_level = self._global_spec
        try:
            for part in parts:
                # Decode JSON-Pointer escaping ("~1" -> "/", "~0" -> "~") before each lookup.
                part = part.replace("~1", "/").replace("~0", "~")
                current_level = current_level[part]
            # NOTE(review): this returns the *live* object from the spec, not a copy.
            # Downstream code must not mutate it (or must copy first) — see the
            # .copy() taken in _process_schema_or_ref for non-ref schemas.
            if isinstance(current_level, dict):
                return current_level  # Potentially json.loads(json.dumps(current_level)) for a deep copy
            else:  # Resolved to a non-dict, which might be valid for some simple refs but unusual for schemas
                logger.warning(f"$ref '{ref_path}' resolved to a non-dictionary type: {type(current_level)}. Value: {str(current_level)[:100]}")
                return {"type": "string", "description": f"Resolved $ref '{ref_path}' to non-dict: {str(current_level)[:100]}"}  # Placeholder
        except (KeyError, TypeError, AttributeError) as e:
            logger.error(f"Failed to resolve $ref '{ref_path}': {e}", exc_info=True)
            return None

    def _process_schema_or_ref(self, schema_like: Any) -> Optional[Dict[str, Any]]:
        """
        Processes a schema part, resolving $refs and recursively processing nested structures.
        Returns a new dictionary with resolved refs, or None if resolution fails badly.
        """
        if not isinstance(schema_like, dict):
            if schema_like is None: return None
            logger.warning(f"Expected a dictionary for schema processing, got {type(schema_like)}. Value: {str(schema_like)[:100]}")
            return {"type": "string", "description": f"Schema was not a dict: {str(schema_like)[:100]}"}  # Placeholder for non-dict schema

        # If it's a $ref, resolve it.
        # NOTE(review): the resolved target is returned as-is here, without another
        # recursive pass — $refs nested *inside* the referenced schema stay unresolved.
        if "$ref" in schema_like:
            return self._resolve_ref(schema_like["$ref"])  # This will be the new base schema_like

        # Create a shallow copy to avoid modifying the original spec during processing.
        processed_schema = schema_like.copy()

        # Recursively process 'properties' for object schemas.
        if "properties" in processed_schema and isinstance(processed_schema["properties"], dict):
            new_properties = {}
            for prop_name, prop_schema in processed_schema["properties"].items():
                resolved_prop = self._process_schema_or_ref(prop_schema)
                if resolved_prop is not None:  # Only add if resolution was successful
                    new_properties[prop_name] = resolved_prop
                # else: properties that fail to resolve are silently dropped here
            processed_schema["properties"] = new_properties

        # Recursively process 'items' for array schemas.
        if "items" in processed_schema and isinstance(processed_schema["items"], dict):  # 'items' should be a schema object
            resolved_items = self._process_schema_or_ref(processed_schema["items"])
            if resolved_items is not None:
                processed_schema["items"] = resolved_items
            # else: the original (unresolved) 'items' value is kept

        # Handle allOf by merging sub-schema properties/required into this schema.
        # This is a complex area of JSON Schema; this is a deliberately basic attempt.
        if "allOf" in processed_schema and isinstance(processed_schema["allOf"], list):
            merged_all_of_props = {}  # NOTE(review): unused — temp_schema_for_all_of carries the merged properties
            merged_all_of_required = set()
            temp_schema_for_all_of = {"type": processed_schema.get("type", "object"), "properties": {}, "required": []}

            for sub_schema_data in processed_schema["allOf"]:
                resolved_sub_schema = self._process_schema_or_ref(sub_schema_data)
                if resolved_sub_schema and isinstance(resolved_sub_schema, dict):
                    if "properties" in resolved_sub_schema:
                        temp_schema_for_all_of["properties"].update(resolved_sub_schema["properties"])
                    if "required" in resolved_sub_schema and isinstance(resolved_sub_schema["required"], list):
                        merged_all_of_required.update(resolved_sub_schema["required"])
                    # Copy other top-level keywords from the resolved_sub_schema (e.g. description).
                    for key, value in resolved_sub_schema.items():
                        if key not in ["properties", "required", "type", "$ref", "allOf", "anyOf", "oneOf"]:
                            if key not in temp_schema_for_all_of or temp_schema_for_all_of[key] is None:  # prioritize existing
                                temp_schema_for_all_of[key] = value

            if temp_schema_for_all_of["properties"]:
                # Merged allOf properties win over same-named local properties.
                processed_schema["properties"] = {**processed_schema.get("properties", {}), **temp_schema_for_all_of["properties"]}
            if merged_all_of_required:
                current_required = set(processed_schema.get("required", []))
                current_required.update(merged_all_of_required)
                processed_schema["required"] = sorted(list(current_required))
            del processed_schema["allOf"]  # Remove allOf after processing
            # Copy other merged attributes back to processed_schema.
            for key, value in temp_schema_for_all_of.items():
                if key not in ["properties", "required", "type", "$ref", "allOf", "anyOf", "oneOf"]:
                    if key not in processed_schema or processed_schema[key] is None:
                        processed_schema[key] = value

        # anyOf/oneOf represent choices, so they are not flattened; each alternative
        # just gets its $refs resolved in place.
        for keyword in ["anyOf", "oneOf"]:
            if keyword in processed_schema and isinstance(processed_schema[keyword], list):
                processed_sub_list = []
                for sub_item in processed_schema[keyword]:
                    resolved_sub = self._process_schema_or_ref(sub_item)
                    if resolved_sub:
                        processed_sub_list.append(resolved_sub)
                if processed_sub_list:  # only update if some were resolved
                    processed_schema[keyword] = processed_sub_list

        return processed_schema

    def to_dict(self) -> Dict[str, Any]:
        """Return an OpenAPI-like dictionary with local $refs resolved where possible."""
        endpoint_data = {
            "method": self.method,
            "path": self.path,
            "summary": self.summary or "",
            "title": self.summary or self.operation_id or "",  # Fallback chain for title
            "description": self.description or "",
            "operationId": self.operation_id or f"{self.method.lower()}_{self.path.replace('/', '_').replace('{', '').replace('}', '')}",
            "tags": self.tags,
            "parameters": [],
            "requestBody": None,
            "responses": {},
            "_source_format": "swagger/openapi",
            "_swagger_raw_data": self._raw_data,  # Keep raw for debugging
            "_global_api_spec_for_resolution": self._global_spec  # For consumers that need to resolve further
        }

        # Process parameters: each entry may itself be a $ref, or contain a schema that is one.
        if "parameters" in self._raw_data and isinstance(self._raw_data["parameters"], list):
            for param_data_raw in self._raw_data["parameters"]:
                processed_param_container = self._process_schema_or_ref(param_data_raw)
                if processed_param_container and isinstance(processed_param_container, dict):
                    # If the parameter itself was a $ref, processed_param_container is the resolved object.
                    # If it contained a schema that was a $ref, that nested schema should be resolved
                    # by the recursive call above.
                    if "schema" in processed_param_container and isinstance(processed_param_container["schema"], dict):
                        pass  # Already handled by the _process_schema_or_ref call on param_data_raw
                    elif "content" in processed_param_container:  # Parameter described by a Content Object (OpenAPI 3.x)
                        pass  # Content object schemas should have been resolved by _process_schema_or_ref
                    endpoint_data["parameters"].append(processed_param_container)

        # Process requestBody (may be a $ref or contain nested $refs).
        if "requestBody" in self._raw_data and isinstance(self._raw_data["requestBody"], dict):
            processed_rb = self._process_schema_or_ref(self._raw_data["requestBody"])
            if processed_rb:
                endpoint_data["requestBody"] = processed_rb

        # Process responses, falling back to the raw object when resolution fails.
        if "responses" in self._raw_data and isinstance(self._raw_data["responses"], dict):
            for status_code, resp_data_raw in self._raw_data["responses"].items():
                processed_resp = self._process_schema_or_ref(resp_data_raw)
                if processed_resp:
                    endpoint_data["responses"][status_code] = processed_resp
                elif resp_data_raw:  # If processing failed but raw exists, keep raw (though this is less ideal)
                    endpoint_data["responses"][status_code] = resp_data_raw
                    logger.warning(f"Kept raw response data for {status_code} due to processing failure for {self.operation_id or self.path}")

        if not endpoint_data["responses"]:  # Ensure a default response if none processed
            endpoint_data["responses"]["default"] = {"description": "Default response from Swagger/OpenAPI definition"}

        return endpoint_data

    def __repr__(self):
        return f"<SwaggerEndpoint Method:{self.method} Path:{self.path} Summary:'{self.summary}'>"
|
||
|
||
class DMSEndpoint(BaseEndpoint):
    """Represents an API endpoint discovered dynamically from the DMS service."""

    def __init__(self, method: str, path: str, title: str,
                 request_body: Optional[Dict[str, Any]],
                 responses: Dict[str, Any],
                 parameters: Optional[List[Dict[str, Any]]] = None,
                 category_name: Optional[str] = None,
                 raw_record: Optional[Dict[str, Any]] = None,
                 test_mode: str = 'standalone',
                 operation_id: Optional[str] = None,
                 model_pk_name: Optional[str] = None,
                 identity_id_list: Optional[List[str]] = None):
        super().__init__(method=method.upper(), path=path)
        # Descriptive metadata.
        self.title = title
        self.category_name = category_name
        self.test_mode = test_mode
        # OpenAPI-shaped pieces supplied by the discovery step.
        self.request_body = request_body
        self.responses = responses
        self.parameters = [] if parameters is None else parameters
        # Bookkeeping used by downstream processing; operation_id falls back to
        # a synthesized "<method>_<category>_<title>" identifier.
        self._raw_record = raw_record
        self.operation_id = operation_id or f"{self.method.lower()}_{self.category_name or 'dms'}_{title.replace(' ', '_')}"
        self.model_pk_name = model_pk_name
        self.identity_id_list = identity_id_list or []

    def to_dict(self) -> Dict[str, Any]:
        """Converts the DMS endpoint data into a standardized OpenAPI-like dictionary."""
        tag_list = [self.category_name] if self.category_name else []
        return {
            "method": self.method,
            "path": self.path,
            "title": self.title,
            "summary": self.title,
            "description": self.title,
            "operationId": self.operation_id,
            "tags": tag_list,
            "parameters": self.parameters,
            "requestBody": self.request_body,
            "responses": self.responses,
            "_source_format": "dms",
            "_dms_raw_record": self._raw_record,
            "_test_mode": self.test_mode,
            "_dms_model_pk_name": self.model_pk_name,
        }

    def __repr__(self):
        return f"<DMSEndpoint Method:{self.method} Path:{self.path} Title:'{self.title}' Mode:{self.test_mode}>"
|
||
|
||
Endpoint = Union[YAPIEndpoint, SwaggerEndpoint, DMSEndpoint]
|
||
|
||
class ParsedAPISpec:
    """Base class for a parsed API specification from any source."""

    def __init__(self, spec_type: str, endpoints: List[Union[YAPIEndpoint, SwaggerEndpoint, 'DMSEndpoint']], spec: Dict[str, Any]):
        self.spec_type = spec_type
        self.endpoints = endpoints
        # Original full spec document; useful for $ref resolution if not pre-resolved.
        self.spec = spec


class ParsedYAPISpec(ParsedAPISpec):
    """A parsed YAPI specification: endpoints plus the category metadata."""

    def __init__(self, endpoints: List[YAPIEndpoint], categories: List[Dict[str, Any]], spec: Dict[str, Any]):
        super().__init__(spec_type="yapi", endpoints=endpoints, spec=spec)
        self.categories = categories


class ParsedSwaggerSpec(ParsedAPISpec):
    """A parsed Swagger/OpenAPI specification: endpoints plus top-level tags."""

    def __init__(self, endpoints: List[SwaggerEndpoint], tags: List[Dict[str, Any]], spec: Dict[str, Any]):
        super().__init__(spec_type="swagger", endpoints=endpoints, spec=spec)
        self.tags = tags


class ParsedDMSSpec(ParsedAPISpec):
    """Parsed specification from the dynamic DMS source."""

    def __init__(self, endpoints: List[DMSEndpoint], spec: Dict[str, Any]):
        super().__init__(spec_type="dms", endpoints=endpoints, spec=spec)
|
||
|
||
class InputParser:
|
||
"""负责解析输入(如YAPI JSON)并提取API端点信息"""
|
||
    def __init__(self):
        # Per-instance logger, named after this module (same as the module-level logger).
        self.logger = logging.getLogger(__name__)
|
||
|
||
    def _decode_schema_string(self, raw_string: str, api_name: str) -> Optional[Any]:
        """Best-effort parse of a string-form model into a Python object, recording why each strategy failed.

        Strategies are attempted in order: plain JSON; JSON after unescaping a
        wrapped/escaped payload; JSON after undoing unicode escapes;
        ``ast.literal_eval`` after mapping JSON keywords to Python literals;
        and a naive single-quote-to-double-quote normalization. Returns the
        parsed object, or None when every strategy fails (failures are logged).
        """
        if raw_string is None:
            return None

        candidate_str = str(raw_string).strip()
        if not candidate_str:
            self.logger.warning(f"Schema for '{api_name}' 是空字符串,无法解析。")
            return None

        # (stage, exception) pairs collected for debug logging at the end.
        parse_attempts = []

        def log_attempt(stage: str, exc: Exception):
            parse_attempts.append((stage, exc))

        # --- Attempt 1: standard JSON ---
        try:
            return json.loads(candidate_str)
        except json.JSONDecodeError as exc:
            log_attempt("json.loads", exc)

        # --- Attempt 2: JSON that was escaped or wrapped as a whole string ---
        try:
            unescaped_candidate = json.loads(candidate_str.replace('\\"', '"'))
            if isinstance(unescaped_candidate, (dict, list)):
                return unescaped_candidate
            if isinstance(unescaped_candidate, str):
                # Double-encoded JSON: the outer parse produced another JSON string.
                try:
                    return json.loads(unescaped_candidate)
                except json.JSONDecodeError as exc_nested:
                    log_attempt("json.loads -> nested", exc_nested)
        except Exception as exc:
            log_attempt("json.loads after unescape", exc)

        # --- Attempt 3: undo unicode escapes, then retry JSON ---
        # NOTE(review): encode('utf-8').decode('unicode_escape') can mangle non-ASCII
        # text; only attempted when it actually changes the string — confirm acceptable.
        try:
            unicode_decoded = candidate_str.encode('utf-8').decode('unicode_escape')
            if unicode_decoded != candidate_str:
                return json.loads(unicode_decoded)
        except Exception as exc:
            log_attempt("json.loads after unicode_escape", exc)

        # --- Attempt 4: ast.literal_eval (JSON keywords must first be mapped to Python literals) ---
        normalized_literal = candidate_str
        replacements_for_ast = [
            (r"(?<![A-Za-z0-9_])NaN(?![A-Za-z0-9_])", "None"),
            (r"(?<![A-Za-z0-9_])-?Infinity(?![A-Za-z0-9_])", "None"),
            (r"(?<![A-Za-z0-9_])null(?![A-Za-z0-9_])", "None"),
            (r"(?<![A-Za-z0-9_])true(?![A-Za-z0-9_])", "True"),
            (r"(?<![A-Za-z0-9_])false(?![A-Za-z0-9_])", "False"),
        ]
        for pattern, replacement in replacements_for_ast:
            # NOTE(review): IGNORECASE substitution can also rewrite matches inside
            # quoted string values — confirm this is acceptable for these payloads.
            normalized_literal = re.sub(pattern, replacement, normalized_literal, flags=re.IGNORECASE)

        try:
            return ast.literal_eval(normalized_literal)
        except (ValueError, SyntaxError) as exc:
            log_attempt("ast.literal_eval", exc)

        # --- Attempt 5: naive single-quote JSON normalization, then one more JSON try ---
        # Only applied when the text uses single quotes exclusively.
        if candidate_str.count("'") and candidate_str.count('"') == 0:
            approx_json = candidate_str.replace("'", '"')
            approx_json = re.sub(r"(?<![A-Za-z0-9_])None(?![A-Za-z0-9_])", "null", approx_json)
            approx_json = re.sub(r"(?<![A-Za-z0-9_])True(?![A-Za-z0-9_])", "true", approx_json)
            approx_json = re.sub(r"(?<![A-Za-z0-9_])False(?![A-Za-z0-9_])", "false", approx_json)
            try:
                return json.loads(approx_json)
            except json.JSONDecodeError as exc:
                log_attempt("json.loads single-quote normalized", exc)

        # All strategies failed: warn with a snippet, then emit each failure at debug level.
        snippet = candidate_str[:500]
        self.logger.warning(
            f"Schema for '{api_name}' 仍无法解析,已尝试多种策略。示例片段: {snippet}"
        )
        for stage, exc in parse_attempts:
            self.logger.debug(f"解析失败阶段 [{stage}]: {exc}")
        return None
|
||
|
||
def _normalize_model_candidate(self, candidate: Any, api_name: str) -> Optional[Dict[str, Any]]:
|
||
"""确保候选模型以字典形式返回。"""
|
||
if isinstance(candidate, dict):
|
||
return candidate
|
||
if isinstance(candidate, str):
|
||
decoded = self._decode_schema_string(candidate, api_name)
|
||
if isinstance(decoded, dict):
|
||
return decoded
|
||
return None
|
||
|
||
def parse_yapi_spec(self, file_path: str) -> Optional[ParsedYAPISpec]:
|
||
self.logger.info(f"Parsing YAPI spec from: {file_path}")
|
||
all_endpoints: List[YAPIEndpoint] = []
|
||
yapi_categories: List[Dict[str, Any]] = []
|
||
raw_spec_data_list: Optional[List[Dict[str, Any]]] = None # YAPI export is a list of categories
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
raw_spec_data_list = json.load(f)
|
||
|
||
if not isinstance(raw_spec_data_list, list):
|
||
self.logger.error(f"YAPI spec file {file_path} does not contain a JSON list as expected for categories.")
|
||
return None
|
||
|
||
for category_data in raw_spec_data_list:
|
||
if not isinstance(category_data, dict):
|
||
self.logger.warning(f"Skipping non-dictionary item in YAPI spec list: {str(category_data)[:100]}")
|
||
continue
|
||
cat_name = category_data.get("name")
|
||
cat_id = category_data.get("_id", category_data.get("id")) # YAPI uses _id
|
||
yapi_categories.append({"name": cat_name, "description": category_data.get("desc"), "id": cat_id})
|
||
|
||
for endpoint_data in category_data.get("list", []):
|
||
if not isinstance(endpoint_data, dict):
|
||
self.logger.warning(f"Skipping non-dictionary endpoint item in category '{cat_name}': {str(endpoint_data)[:100]}")
|
||
continue
|
||
try:
|
||
yapi_endpoint = YAPIEndpoint(endpoint_data, category_name=cat_name, category_id=cat_id)
|
||
all_endpoints.append(yapi_endpoint)
|
||
except Exception as e_ep:
|
||
self.logger.error(f"Error processing YAPI endpoint data (ID: {endpoint_data.get('_id', 'N/A')}, Title: {endpoint_data.get('title', 'N/A')}). Error: {e_ep}", exc_info=True)
|
||
|
||
# The 'spec' for ParsedYAPISpec should be a dict representing the whole document.
|
||
# Since YAPI export is a list of categories, we wrap it.
|
||
yapi_full_spec_dict = {"yapi_categories": raw_spec_data_list}
|
||
return ParsedYAPISpec(endpoints=all_endpoints, categories=yapi_categories, spec=yapi_full_spec_dict)
|
||
except FileNotFoundError:
|
||
self.logger.error(f"YAPI spec file not found: {file_path}")
|
||
except json.JSONDecodeError as e:
|
||
self.logger.error(f"Error decoding JSON from YAPI spec file {file_path}: {e}")
|
||
except Exception as e:
|
||
self.logger.error(f"An unexpected error occurred while parsing YAPI spec {file_path}: {e}", exc_info=True)
|
||
return None
|
||
|
||
def parse_swagger_spec(self, file_path: str) -> Optional[ParsedSwaggerSpec]:
|
||
self.logger.info(f"Parsing Swagger/OpenAPI spec from: {file_path}")
|
||
all_endpoints: List[SwaggerEndpoint] = []
|
||
swagger_tags: List[Dict[str, Any]] = []
|
||
raw_spec_data_dict: Optional[Dict[str, Any]] = None # Swagger/OpenAPI is a single root object
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
# TODO: Add YAML support if needed, e.g., using PyYAML
|
||
raw_spec_data_dict = json.load(f)
|
||
|
||
if not isinstance(raw_spec_data_dict, dict):
|
||
self.logger.error(f"Swagger spec file {file_path} does not contain a JSON object as expected.")
|
||
return None
|
||
|
||
swagger_tags = raw_spec_data_dict.get("tags", [])
|
||
paths = raw_spec_data_dict.get("paths", {})
|
||
|
||
for path, path_item_obj in paths.items():
|
||
if not isinstance(path_item_obj, dict): continue
|
||
for method, operation_obj in path_item_obj.items():
|
||
# Common methods, can be extended
|
||
if method.lower() not in ["get", "post", "put", "delete", "patch", "options", "head", "trace"]:
|
||
continue # Skip non-standard HTTP methods or extensions like 'parameters' at path level
|
||
if not isinstance(operation_obj, dict): continue
|
||
try:
|
||
# Pass the full raw_spec_data_dict for $ref resolution within SwaggerEndpoint
|
||
swagger_endpoint = SwaggerEndpoint(path, method, operation_obj, global_spec=raw_spec_data_dict)
|
||
all_endpoints.append(swagger_endpoint)
|
||
except Exception as e_ep:
|
||
self.logger.error(f"Error processing Swagger endpoint: {method.upper()} {path}. Error: {e_ep}", exc_info=True)
|
||
|
||
return ParsedSwaggerSpec(endpoints=all_endpoints, tags=swagger_tags, spec=raw_spec_data_dict)
|
||
except FileNotFoundError:
|
||
self.logger.error(f"Swagger spec file not found: {file_path}")
|
||
except json.JSONDecodeError as e:
|
||
self.logger.error(f"Error decoding JSON from Swagger spec file {file_path}: {e}")
|
||
except Exception as e:
|
||
self.logger.error(f"An unexpected error occurred while parsing Swagger spec {file_path}: {e}", exc_info=True)
|
||
return None
|
||
|
||
def parse_dms_spec(self, domain_mapping_path: str, base_url: str, headers: Optional[Dict[str, str]] = None, ignore_ssl: bool = False, page_size: int = 1000, page_no_start: int = 1, fetch_all_pages: bool = True) -> Optional[Tuple[ParsedDMSSpec, Dict[str, Any]]]:
    """Fetch DMS model schemas and synthesize CRUD endpoints for each model.

    Workflow:
      1. Load the domain-mapping JSON file and build a keyword -> domain-id map.
      2. Page through ``/api/schema/manage/schema`` to collect model records.
      3. For each record, fetch its schema, resolve the model dict (several
         response layouts are supported), and determine its primary key(s).
      4. Emit Create/List/Read/Update/Delete ``DMSEndpoint`` objects per model
         (the Read endpoint is skipped for multi-primary-key models).

    Args:
        domain_mapping_path: Path to the JSON domain-map file; missing or
            invalid file falls back to an empty map.
        base_url: Root URL of the DMS service.
        headers: Optional HTTP headers sent with every DMS request.
        ignore_ssl: Disable TLS certificate verification (and its warnings).
        page_size: Number of records requested per page.
        page_no_start: First page number to fetch.
        fetch_all_pages: When False, fetch only the single ``page_no_start`` page.

    Returns:
        ``(ParsedDMSSpec, pagination_info)`` on success, or ``(None, {})`` on
        list-level errors.  NOTE(review): the annotated return type
        ``Optional[Tuple[...]]`` does not match the ``(None, {})`` error
        returns — callers should always unpack a 2-tuple; confirm intent.
    """
    self.logger.info(f"Starting DMS spec parsing. Base URL: {base_url}, Domain Map: {domain_mapping_path}")

    if ignore_ssl:
        self.logger.warning("SSL certificate verification is disabled for DMS API calls. This is not recommended for production use.")
        # Suppress the InsecureRequestWarning noise that unverified requests emit.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    headers = headers or {}

    # Load the domain map; on failure we degrade to an empty mapping so the
    # raw domain name is used as the instance code (see fallback below).
    try:
        with open(domain_mapping_path, 'r', encoding='utf-8') as f:
            DOMAIN_MAP = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        self.logger.warning(f"Could not load or parse domain map file '{domain_mapping_path}'. Using default domain. Error: {e}")
        DOMAIN_MAP = {}

    # Build a keyword -> domain-id lookup table from entries shaped like
    # {"<domain_id>": {"keywords": [...], ...}}.
    keyword_to_domain_id: Dict[str, str] = {}
    for domain_id, domain_info in DOMAIN_MAP.items():
        if isinstance(domain_info, dict) and 'keywords' in domain_info:
            keywords = domain_info['keywords']
            if isinstance(keywords, list):
                for keyword in keywords:
                    keyword_to_domain_id[keyword] = domain_id
                    self.logger.debug(f"映射关键词 '{keyword}' -> 领域ID '{domain_id}'")

    # Paginated retrieval of the API (model) list.
    if fetch_all_pages:
        self.logger.info(f"Fetching ALL API pages with pagination (page_size={page_size}, starting from page {page_no_start})")
    else:
        self.logger.info(f"Fetching SINGLE page (page_size={page_size}, page_no={page_no_start})")

    api_records: List[Dict[str, Any]] = []
    page_no = page_no_start
    total_fetched = 0
    # Bookkeeping returned to the caller alongside the parsed spec.
    pagination_info = {
        "page_size": page_size,
        "page_no_start": page_no_start,
        "total_pages": 0,
        "total_records": 0,
        "pages_fetched": 0,
        "current_page": page_no_start,
        "fetch_all_pages": fetch_all_pages
    }

    try:
        while True:
            list_url = urljoin(base_url, f"/api/schema/manage/schema?pageNo={page_no}&pageSize={page_size}")
            self.logger.debug(f"Fetching page {page_no} from: {list_url}")

            response = requests.get(list_url, headers=headers, verify=not ignore_ssl)
            response.raise_for_status()
            api_list_data = response.json()

            # Check the business-level status code (distinct from HTTP status).
            list_code = api_list_data.get("code")
            list_code_normalized = str(list_code).strip().lower() if list_code is not None else ""
            if list_code_normalized not in {"0", "success", "ok", "200"}:
                self.logger.error(
                    f"DMS API list endpoint returned a business error: {api_list_data.get('message')} (code={list_code})"
                )
                return None, {}

            # Extract 'records' from the paginated envelope.
            page_records = api_list_data.get("data", {}).get("records", [])
            if not page_records:
                self.logger.info(f"No more records found on page {page_no}, stopping pagination")
                break

            api_records.extend(page_records)
            total_fetched += len(page_records)
            self.logger.info(f"Fetched {len(page_records)} records from page {page_no}, total: {total_fetched}")

            # Update pagination bookkeeping from the response envelope.
            data = api_list_data.get("data", {})
            total_count = data.get("total", 0)
            # Records covered so far = current page index * page size (server-reported).
            current_count = data.get("current", 0) * data.get("size", page_size)

            # Record totals on the first page.
            # NOTE(review): compares against literal 1, not page_no_start — when
            # page_no_start != 1 the totals are never recorded; confirm intended.
            if page_no == 1:
                pagination_info["total_records"] = total_count
                pagination_info["total_pages"] = (total_count + page_size - 1) // page_size  # ceiling division

            pagination_info["pages_fetched"] = page_no - page_no_start + 1
            pagination_info["current_page"] = page_no

            # Single-page mode: stop after the first fetched page.
            if not fetch_all_pages:
                self.logger.info(f"Single page mode: fetched {len(page_records)} records from page {page_no}")
                break

            # All-pages mode: stop when the server says we are done or the
            # page came back short.
            if current_count >= total_count or len(page_records) < page_size:
                self.logger.info(f"Reached end of data. Total records: {total_fetched}")
                break

            page_no += 1

            # Safety valve against a server that never terminates pagination.
            if page_no > 1000:  # at most 1000 pages
                self.logger.warning("Reached maximum page limit (1000), stopping pagination")
                break

        if not api_records:
            self.logger.warning("DMS API list is empty after pagination.")
            return ParsedDMSSpec(endpoints=[], spec={"dms_api_list": []}), pagination_info

    except requests.exceptions.RequestException as e:
        self.logger.error(f"Failed to fetch API list from DMS: {e}")
        return None, {}
    except json.JSONDecodeError:
        self.logger.error("Failed to decode JSON response from DMS API list.")
        return None, {}

    endpoints: List[DMSEndpoint] = []

    for item in api_records:
        domain_name = item.get('domain')
        name = item.get('name')
        model_id = item.get('id')
        if not all(k in item for k in ['domain', 'name', 'id']):
            self.logger.warning(f"Skipping an item in API list because it's missing 'domain', 'name', or 'id': {item}")
            continue

        # Domain mapping: exact keyword match first, then prefix match.
        instance_code = keyword_to_domain_id.get(domain_name)
        if instance_code:
            self.logger.info(f"通过精确关键词匹配:'{domain_name}' -> '{instance_code}'")
        else:
            # Prefix match (e.g. 'wb_cd' matches keyword 'wb' via 'wb_').
            for keyword, domain_id in keyword_to_domain_id.items():
                if domain_name.startswith(keyword + '_'):  # e.g. wb_cd starts with wb_
                    instance_code = domain_id
                    self.logger.info(f"通过前缀关键词匹配:'{domain_name}' -> '{instance_code}' (匹配关键词: '{keyword}')")
                    break

        if not instance_code:
            # Fallback to the legacy direct mapping (or the raw domain name).
            instance_code = DOMAIN_MAP.get(domain_name, domain_name)
            self.logger.info(f"使用直接映射:'{domain_name}' -> '{instance_code}'")
        model_url = urljoin(base_url, f"/api/schema/manage/schema/{model_id}")
        self.logger.info(f"Fetching model for '{name}' from: {model_url}")

        try:
            response = requests.get(model_url, headers=headers, verify=not ignore_ssl)
            response.raise_for_status()
            model_schema_response = response.json()
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error fetching model for '{name}': {e}")
            continue
        except json.JSONDecodeError:
            self.logger.error(f"Failed to decode JSON for model '{name}'.")
            continue

        # Business-level error check for the per-model schema fetch; a missing
        # 'code' field is tolerated (treated as success).
        detail_code = model_schema_response.get('code')
        detail_code_normalized = str(detail_code).strip().lower() if detail_code is not None else ""
        if detail_code is not None and detail_code_normalized not in {"0", "success", "ok", "200"}:
            self.logger.warning(
                f"Skipping API '{name}' due to business error when fetching schema: {model_schema_response.get('message')} (code={detail_code})"
            )
            continue

        raw_model_data = model_schema_response.get('data')
        if not raw_model_data:
            self.logger.warning(f"Skipping API '{name}' due to missing 'data' section in schema response.")
            continue

        model = None
        identity_id_list = None
        version = item.get('version', '1.0.0')

        # The schema endpoint has several known response layouts; try each in
        # order: data['model'], data['schema'], data['records'][0], then the
        # data object itself. Strings are decoded via _decode_schema_string.
        if isinstance(raw_model_data, dict):
            candidate = self._normalize_model_candidate(raw_model_data.get('model'), name)

            if not candidate:
                schema_field = raw_model_data.get('schema')
                candidate = self._normalize_model_candidate(schema_field, name)

            if not candidate and 'records' in raw_model_data and isinstance(raw_model_data['records'], list) and raw_model_data['records']:
                record = raw_model_data['records'][0]
                if isinstance(record, dict):
                    record_candidate = record.get('model') or record.get('schema') or record
                    candidate = self._normalize_model_candidate(record_candidate, name)
                    identity_id_list = identity_id_list or record.get('identityId')
                    version = record.get('version', version)

            if not candidate:
                candidate = self._normalize_model_candidate(raw_model_data, name)

            if isinstance(candidate, dict):
                model = candidate
                identity_id_list = identity_id_list or raw_model_data.get('identityId')
                version = raw_model_data.get('version', version)
            else:
                model = None
        elif isinstance(raw_model_data, str):
            model = self._decode_schema_string(raw_model_data, name)

        if not isinstance(model, dict):
            self.logger.warning(f"Skipping API '{name}' because schema model could not be resolved to a dictionary.")
            continue

        # Newer endpoints may nest the actual model under model['model'].
        if 'properties' not in model and isinstance(model.get('model'), dict):
            inner_model = model.get('model')
            if isinstance(inner_model, dict) and 'properties' in inner_model:
                identity_id_list = identity_id_list or model.get('identityId') or inner_model.get('identityId')
                version = model.get('version', version)
                model = inner_model

        # Some endpoints serialize the schema as a JSON string in a field such
        # as model['modelJson']; decode the first one found.
        if 'properties' not in model:
            json_field = None
            for key in ('modelJson', 'schemaJson', 'model_content', 'schema_content'):
                if key in model:
                    json_field = model[key]
                    break
            if json_field and isinstance(json_field, str):
                try:
                    decoded_model = json.loads(json_field)
                    if isinstance(decoded_model, dict) and 'properties' in decoded_model:
                        model = decoded_model
                except json.JSONDecodeError:
                    self.logger.debug(f"Field '{key}' for '{name}' is not valid JSON; ignoring.")

        if not model or 'properties' not in model or not isinstance(model['properties'], dict) or not model['properties']:
            self.logger.warning(f"Skipping API '{name}' due to missing or invalid 'properties' in resolved schema.")
            continue

        pk_name = None
        # Find primary key by looking for top-level "identityId" array.
        if identity_id_list is None:
            identity_id_list = model.get("identityId")
        if isinstance(identity_id_list, list) and len(identity_id_list) > 0:
            candidate_pk_name = identity_id_list[0]
            # Verify that the identityId field really exists in properties.
            if candidate_pk_name in model['properties']:
                pk_name = candidate_pk_name
                self.logger.info(f"Found identityId property '{pk_name}' for model '{name}'.")
            else:
                self.logger.warning(f"identityId property '{candidate_pk_name}' not found in model properties for '{name}'. Will use fallback.")

        # Normalize the primary-key id list to always be a list.
        if isinstance(identity_id_list, str):
            identity_id_list = [identity_id_list]
        elif not isinstance(identity_id_list, list):
            identity_id_list = []

        # Fallback to original behavior if no identityId found or identityId field doesn't exist
        if not pk_name:
            pk_name = next(iter(model['properties']), None)
            if pk_name:
                self.logger.warning(f"No valid 'identityId' found for model '{name}'. Falling back to using the first property '{pk_name}' as the primary key.")

        if not pk_name:
            self.logger.warning(f"Skipping API '{name}' because no properties found in model to identify a primary key.")
            continue

        # Belt-and-braces: confirm pk_name exists in properties.
        if pk_name not in model['properties']:
            self.logger.error(f"Critical error: Primary key '{pk_name}' not found in model properties for '{name}'. Skipping this model.")
            continue

        pk_schema = model['properties'][pk_name]

        version = version or model.get('version', '1.0.0')
        dms_instance_code = instance_code
        category_name = domain_name

        # Shared 200 response used by Create/Update/Delete endpoints.
        success_response = {
            "200": { "description": "Success", "content": {"application/json": {"schema": {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "boolean"}}}}}}}

        # Create Endpoint (POST)
        create_path = f"/api/dms/{dms_instance_code}/v1/{name}"
        create_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]}
        endpoints.append(DMSEndpoint(path=create_path, method='post', title=f"Create {name}", request_body={'content': {'application/json': {'schema': create_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"create_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))

        # List Endpoint (POST) - carries pageNo and pageSize query parameters.
        list_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}"
        list_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "array", "items": model}}}
        list_parameters = [
            {'name': 'pageNo', 'in': 'query', 'required': False, 'description': '页码(从1开始)', 'schema': {'type': 'integer', 'default': 1}},
            {'name': 'pageSize', 'in': 'query', 'required': False, 'description': '分页大小(最大值200)', 'schema': {'type': 'integer', 'default': 1000}}
        ]
        # List request body schema (includes 'version' but no 'act' field).
        list_request_body_schema = {
            "type": "object",
            "properties": {
                "version": {"type": "string", "example": version},
                "isSearchCount": {"type": "boolean", "example": True},
                "query": {
                    "type": "object",
                    "properties": {
                        "fields": {"type": "array", "items": {"type": "string"}},
                        "filter": {"type": "object"}
                    }
                }
            }
        }
        endpoints.append(DMSEndpoint(path=list_path, method='post', title=f"List {name}", request_body={'content': {'application/json': {'schema': list_request_body_schema}}}, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': list_response_schema}}}}, parameters=list_parameters, test_mode='standalone', operation_id=f"list_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))

        # Read Endpoint (GET) - generated only for single-primary-key models.
        if isinstance(identity_id_list, list) and len(identity_id_list) > 1:
            # Multi-PK model: a single {id} path parameter cannot address it.
            self.logger.info(f"跳过多主键模型 '{name}' 的查询详情接口生成,主键数量: {len(identity_id_list)}")
        else:
            # Single-PK model: generate the detail-read endpoint.
            read_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}/{{id}}"
            read_parameters = [{'name': 'id', 'in': 'path', 'required': True, 'description': f'The ID of the {name}, maps to {pk_name}', 'schema': pk_schema}]
            read_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": model}}
            endpoints.append(DMSEndpoint(path=read_path, method='get', title=f"Read {name}", request_body=None, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': read_response_schema}}}}, parameters=read_parameters, test_mode='scenario_only', operation_id=f"read_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))
            self.logger.info(f"创建单主键读取端点 '{name}',路径参数: id")

        # Update Endpoint (PUT)
        update_path = f"/api/dms/{dms_instance_code}/v1/{name}"
        update_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]}
        endpoints.append(DMSEndpoint(path=update_path, method='put', title=f"Update {name}", request_body={'content': {'application/json': {'schema': update_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"update_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))

        # Delete Endpoint (DELETE)
        delete_path = f"/api/dms/{dms_instance_code}/v1/{name}"

        # The delete body shape depends on how many identity fields exist.
        if isinstance(identity_id_list, list) and len(identity_id_list) > 1:
            # Multi-PK: delete takes an array of objects carrying every PK field.
            delete_items_properties = {}
            delete_required_fields = []
            for pk_field in identity_id_list:
                if pk_field in model['properties']:
                    delete_items_properties[pk_field] = model['properties'][pk_field]
                    delete_required_fields.append(pk_field)

            delete_request_body_schema = {
                "type": "object",
                "properties": {
                    "version": {"type": "string", "example": version},
                    "data": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": delete_items_properties,
                            "required": delete_required_fields
                        }
                    }
                },
                "required": ["data"]
            }
            self.logger.info(f"创建多主键删除端点 '{name}',主键字段: {identity_id_list}")
        else:
            # Single PK: delete takes an array of plain id strings.
            delete_request_body_schema = {
                "type": "object",
                "properties": {
                    "version": {"type": "string", "example": version},
                    "data": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                },
                "required": ["data"]
            }
            self.logger.info(f"创建单主键删除端点 '{name}',主键字段: {pk_name}")

        endpoints.append(DMSEndpoint(path=delete_path, method='delete', title=f"Delete {name}", request_body={'content': {'application/json': {'schema': delete_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"delete_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))

    # The 'spec' for ParsedDMSSpec should represent the whole document.
    # We can construct a dictionary holding all the raw data we fetched.
    dms_full_spec_dict = {"dms_api_list": api_records}
    return ParsedDMSSpec(endpoints=endpoints, spec=dms_full_spec_dict), pagination_info
|
||
|
||
class DmsConfig:
    """Configuration bundle for talking to a DMS schema service.

    Attributes:
        base_url: Root URL of the DMS service.
        domain_map_file: Path to the JSON file mapping domains to instance codes.
        headers: Optional HTTP headers sent with every DMS request.
    """

    def __init__(self, base_url: str, domain_map_file: str, headers: Optional[Dict[str, str]] = None):
        self.headers = headers
        self.domain_map_file = domain_map_file
        self.base_url = base_url
|
||
|
||
def get_endpoints_from_swagger(file_path: str) -> Tuple[List[SwaggerEndpoint], str]:
    """
    Parse a Swagger/OpenAPI JSON file into endpoint objects.

    Args:
        file_path: Path to the Swagger/OpenAPI spec (JSON only; YAML is not
            supported — see TODO below).

    Returns:
        A tuple ``(endpoints, base_path)``. On any error an empty list and an
        empty string are returned and the error is logged, never raised.
    """
    logger.info(f"Parsing Swagger/OpenAPI spec from: {file_path}")
    all_endpoints: List[SwaggerEndpoint] = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            # TODO: Add YAML support if needed, e.g., using PyYAML
            raw_spec_data_dict = json.load(f)

        if not isinstance(raw_spec_data_dict, dict):
            logger.error(f"Swagger spec file {file_path} does not contain a JSON object as expected.")
            return [], ""

        paths = raw_spec_data_dict.get("paths", {})
        for path, path_item_obj in paths.items():
            if not isinstance(path_item_obj, dict):
                continue
            for method, operation_obj in path_item_obj.items():
                # Skip path-level keys such as 'parameters' that are not
                # HTTP operations. Common methods; can be extended.
                if method.lower() not in ["get", "post", "put", "delete", "patch", "options", "head", "trace"]:
                    continue
                if not isinstance(operation_obj, dict):
                    continue
                try:
                    # Pass the full raw_spec_data_dict for $ref resolution within SwaggerEndpoint
                    swagger_endpoint = SwaggerEndpoint(path, method, operation_obj, global_spec=raw_spec_data_dict)
                    all_endpoints.append(swagger_endpoint)
                except Exception as e_ep:
                    logger.error(f"Error processing Swagger endpoint: {method.upper()} {path}. Error: {e_ep}", exc_info=True)

        # `or ""` collapses a missing, None, or empty basePath to "" in one lookup.
        return all_endpoints, raw_spec_data_dict.get("basePath") or ""
    except FileNotFoundError:
        logger.error(f"Swagger spec file not found: {file_path}")
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from Swagger spec file {file_path}: {e}")
    except Exception as e:
        logger.error(f"An unexpected error occurred while parsing {file_path}: {e}", exc_info=True)
    # Shared error exit for all exception paths above.
    return [], ""
|
||
|
||
|
||
def parse_input_to_endpoints(input_type: str, input_path: str, dms_config: Optional[DmsConfig] = None) -> Tuple[List[Endpoint], str]:
    """
    Parse an input spec of a given type (yapi, swagger, dms) into endpoints.

    Args:
        input_type: One of "yapi", "swagger", or "dms".
        input_path: Path to the spec file (unused for "dms", which pulls from
            the network via ``dms_config``).
        dms_config: Required when ``input_type`` is "dms".

    Returns:
        A tuple ``(endpoints, base_path)``; ``([], "")`` on any failure or
        unsupported input type (errors are logged, not raised).
    """
    parser = InputParser()
    if input_type == "yapi":
        parsed_spec = parser.parse_yapi_spec(input_path)
        if parsed_spec:
            # YAPI doesn't have a base path in the same sense as Swagger.
            return parsed_spec.endpoints, ""
    elif input_type == "swagger":
        parsed_spec = parser.parse_swagger_spec(input_path)
        if parsed_spec:
            base_path = parsed_spec.spec.get("basePath", "") or ""
            # servers URL might be more modern (OpenAPI 3): use the first server URL.
            if not base_path and parsed_spec.spec.get("servers"):
                base_path = parsed_spec.spec["servers"][0].get("url", "")
            return parsed_spec.endpoints, base_path
    elif input_type == "dms":
        if dms_config:
            # BUG FIX: parse_dms_spec returns a (ParsedDMSSpec, pagination_info)
            # tuple (or (None, {}) on error); the previous code used the tuple
            # itself as the spec and crashed on `.endpoints`. Unpack it first.
            result = parser.parse_dms_spec(dms_config.domain_map_file, dms_config.base_url, dms_config.headers)
            if result:
                parsed_spec, _pagination_info = result
                if parsed_spec:
                    return parsed_spec.endpoints, dms_config.base_url
        else:
            logger.error("DMS configuration not provided for DMS input type.")
            return [], ""
    else:
        logger.error(f"Unsupported input type: {input_type}")
        return [], ""

    # Reached when a supported parser returned nothing usable.
    return [], ""