From 89b009dafb7f045a347fbac8555cc9eb930bea5b Mon Sep 17 00:00:00 2001 From: ruoyunbai <19376215@buaa.edu.cn> Date: Mon, 29 Sep 2025 10:29:44 +0800 Subject: [PATCH] parser --- .gitignore | 398 ++-- ddms_compliance_suite/input_parser/parser.py | 1927 +++++++++--------- fastapi_server.py | 2 +- 3 files changed, 1207 insertions(+), 1120 deletions(-) diff --git a/.gitignore b/.gitignore index 1dc356e..75f847e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,198 +1,200 @@ -./test_reports/* -./test_reports -./mvp -./memory-bank -./logs -./build -# 构建和分发目录 -build/ -dist/ -log* -dms-compliance-compose* -# Python相关 -__pycache__/ -*.py[cod] -*$py.class -*.so -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# 虚拟环境 -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# IDE相关 -.vscode/ -.idea/ -*.swp -*.swo -*~ -.DS_Store -.DS_Store? -._* -.Spotlight-V100 -.Trashes -ehthumbs.db -Thumbs.db - -# 测试和覆盖率 -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ - -# 日志文件 -*.log -logs/ -*.log.* - -# 数据库 -*.db -*.sqlite -*.sqlite3 - -# 测试报告和上传文件 -test_reports/ -uploads/ -temp/ -tmp/ - -# 配置文件(包含敏感信息) -config.json -config.yaml -config.yml -.env.local -.env.*.local -secrets.json - -# Docker相关临时文件 -.dockerignore - -# 系统文件 -Thumbs.db - -# 编辑器临时文件 -*.tmp -*.temp -*.bak -*.backup - -# 压缩文件 -*.zip -*.tar.gz -*.rar - -# 证书和密钥 -*.pem -*.key -*.crt -*.p12 -ssl/ - -# 本地开发文件 -local_* -dev_* -debug_* - -# 用户特定文件 -users.db -session_* - -# 缓存目录 -.cache/ -cache/ - -# Jupyter Notebook -.ipynb_checkpoints - -# pyenv -.python-version - -# pipenv -Pipfile.lock - -# PEP 582 -__pypackages__/ - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json -# 大文件和构建产物 -build/ -dist/ -*.zip -*.tar.gz -*.rar -*.7z - -# 日志文件 -*.log -*.log.* -dms.log -post_output.log - -# 数据库文件 -*.db -*.sqlite -*.sqlite3 -# users.db # 注释掉,允许跟踪users.db -!users.db # 例外规则:允许跟踪users.db - -# Python编译文件 -*.pyc -*.pyo -__pycache__/ - -# 系统文件 -.DS_Store -Thumbs.db - -# 临时文件 -*.tmp -*.temp -*~ - -# 大的字体文件(如果不需要版本控制) -# assets/fonts/*.ttc -# assets/fonts/*.otf - -# 二进制文件 -*.bin -*.exe -*.pkg - -# 历史文件 -history_local -mvp.zip -归档.zip +./test_reports/* +./test_reports +./mvp +./memory-bank +./logs +./build +# 构建和分发目录 +build/ +dist/ +log* +docker/* +dms-compliance* +dms-compliance-compose* +# Python相关 +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# 虚拟环境 +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE相关 +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store +.DS_Store? 
+._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# 测试和覆盖率 +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# 日志文件 +*.log +logs/ +*.log.* + +# 数据库 +*.db +*.sqlite +*.sqlite3 + +# 测试报告和上传文件 +test_reports/ +uploads/ +temp/ +tmp/ + +# 配置文件(包含敏感信息) +config.json +config.yaml +config.yml +.env.local +.env.*.local +secrets.json + +# Docker相关临时文件 +.dockerignore + +# 系统文件 +Thumbs.db + +# 编辑器临时文件 +*.tmp +*.temp +*.bak +*.backup + +# 压缩文件 +*.zip +*.tar.gz +*.rar + +# 证书和密钥 +*.pem +*.key +*.crt +*.p12 +ssl/ + +# 本地开发文件 +local_* +dev_* +debug_* + +# 用户特定文件 +users.db +session_* + +# 缓存目录 +.cache/ +cache/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# pipenv +Pipfile.lock + +# PEP 582 +__pypackages__/ + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json +# 大文件和构建产物 +build/ +dist/ +*.zip +*.tar.gz +*.rar +*.7z + +# 日志文件 +*.log +*.log.* +dms.log +post_output.log + +# 数据库文件 +*.db +*.sqlite +*.sqlite3 +# users.db # 注释掉,允许跟踪users.db +!users.db # 例外规则:允许跟踪users.db + +# Python编译文件 +*.pyc +*.pyo +__pycache__/ + +# 系统文件 +.DS_Store +Thumbs.db + +# 临时文件 +*.tmp +*.temp +*~ + +# 大的字体文件(如果不需要版本控制) +# assets/fonts/*.ttc +# assets/fonts/*.otf + +# 二进制文件 +*.bin +*.exe +*.pkg + +# 历史文件 +history_local +mvp.zip +归档.zip diff --git a/ddms_compliance_suite/input_parser/parser.py b/ddms_compliance_suite/input_parser/parser.py index 0197ad0..045d0ca 100644 --- a/ddms_compliance_suite/input_parser/parser.py +++ b/ddms_compliance_suite/input_parser/parser.py @@ -1,922 +1,1007 @@ -import json -import logging -from typing import List, Dict, Any, Optional, Union, Tuple -import requests -from urllib.parse import urljoin -import copy -import urllib3 - -logger = logging.getLogger(__name__) - -class BaseEndpoint: - """所有端点对象的基类,可以包含一些通用属性或方法。""" - def __init__(self, method: str, path: str): - self.method = method - self.path = path - - def to_dict(self) -> Dict[str, Any]: - # 基类可以提供一个默认的 to_dict 实现或要求子类实现 - raise NotImplementedError("Subclasses must implement to_dict") - -class YAPIEndpoint(BaseEndpoint): # Inherit from BaseEndpoint - def __init__(self, data: Dict[str, Any], category_name: Optional[str] = None, category_id: Optional[int] = None): - super().__init__(method=data.get("method", "GET").upper(), path=data.get("path", "")) - self._raw_data = data - self.title: str = data.get("title", "") - self.desc: Optional[str] = data.get("desc") - self._id: int = data.get("_id") - self.project_id: int = data.get("project_id") - self.catid: int = data.get("catid") - - self.req_params: List[Dict[str, Any]] = data.get("req_params", []) - self.req_query: List[Dict[str, Any]] = data.get("req_query", []) - self.req_headers: List[Dict[str, Any]] = data.get("req_headers", []) - self.req_body_form: List[Dict[str, Any]] = data.get("req_body_form", []) - - self.req_body_type: Optional[str] = data.get("req_body_type") - self.req_body_is_json_schema: bool = data.get("req_body_is_json_schema", False) - self.req_body_other: Optional[str] = data.get("req_body_other") - - self.res_body_type: Optional[str] = data.get("res_body_type") - self.res_body_is_json_schema: bool = data.get("res_body_is_json_schema", False) - self.res_body: Optional[str] = data.get("res_body") - - self.status: str = data.get("status", "undone") - self.api_opened: bool = data.get("api_opened", False) - self.uid: int = data.get("uid") - - self.category_name = category_name - self.category_id = category_id if category_id is not None else self.catid - - 
self._parsed_req_body_schema: Optional[Dict[str, Any]] = None - if self.req_body_type == "json" and self.req_body_other and self.req_body_is_json_schema: - try: - self._parsed_req_body_schema = json.loads(self.req_body_other) - except json.JSONDecodeError as e: - logger.error(f"YAPIEndpoint (ID: {self._id}, Title: {self.title}): Failed to parse req_body_other as JSON during init: {e}. Content: {self.req_body_other[:200]}") - - self._parsed_res_body_schema: Optional[Dict[str, Any]] = None - if self.res_body_type == "json" and self.res_body and self.res_body_is_json_schema: - try: - self._parsed_res_body_schema = json.loads(self.res_body) - except json.JSONDecodeError as e: - logger.error(f"YAPIEndpoint (ID: {self._id}, Title: {self.title}): Failed to parse res_body as JSON during init: {e}. Content: {self.res_body[:200]}") - - def to_dict(self) -> Dict[str, Any]: - endpoint_dict = { - "method": self.method, - "path": self.path, - "title": self.title, - "summary": self.title, - "description": self.desc or "", - "operationId": f"{self.method.lower()}_{self.path.replace('/', '_').replace('{', '').replace('}', '')}_{self._id}", - "tags": [self.category_name or str(self.catid)], - "parameters": [], - "requestBody": None, - "responses": {}, - "_source_format": "yapi", - "_yapi_id": self._id, - "_yapi_raw_data": self._raw_data # Keep raw data for debugging or deeper inspection if needed - } - - # Path parameters from req_params - for p_spec in self.req_params: - param_name = p_spec.get("name") - if not param_name: continue - endpoint_dict["parameters"].append({ - "name": param_name, - "in": "path", - "required": True, # Path parameters are always required - "description": p_spec.get("desc", ""), - "schema": {"type": "string", "example": p_spec.get("example", f"example_{param_name}")} - }) - - # Query parameters from req_query - for q_spec in self.req_query: - param_name = q_spec.get("name") - if not param_name: continue - is_required = q_spec.get("required") == "1" # YAPI uses "1" for true - param_schema = {"type": "string"} # Default to string, YAPI doesn't specify types well here - if "example" in q_spec: param_schema["example"] = q_spec["example"] - # Add other fields from YAPI query spec if needed (e.g., desc) - endpoint_dict["parameters"].append({ - "name": param_name, - "in": "query", - "required": is_required, - "description": q_spec.get("desc", ""), - "schema": param_schema - }) - - # Header parameters from req_headers - for h_spec in self.req_headers: - param_name = h_spec.get("name") - if not param_name or param_name.lower() == 'content-type': continue # Content-Type is handled by requestBody - is_required = h_spec.get("required") == "1" - default_value = h_spec.get("value") # YAPI uses 'value' for default/example header value - param_schema = {"type": "string"} - if default_value: - if is_required: # If required, it's more like an example of what's expected - param_schema["example"] = default_value - else: # If not required, it's a default value - param_schema["default"] = default_value - - endpoint_dict["parameters"].append({ - "name": param_name, - "in": "header", - "required": is_required, - "description": h_spec.get("desc", ""), - "schema": param_schema - }) - - # Request body - if self.req_body_type == "json" and self._parsed_req_body_schema: - endpoint_dict["requestBody"] = { - "content": { - "application/json": { - "schema": self._parsed_req_body_schema - } - } - } - elif self.req_body_type == "form" and self.req_body_form: - properties = {} - required_form_params = [] - for 
form_param in self.req_body_form: - name = form_param.get("name") - if not name: continue - properties[name] = { - "type": "string", # YAPI form params are typically strings, file uploads are different - "description": form_param.get("desc","") - } - if form_param.get("example"): properties[name]["example"] = form_param.get("example") - if form_param.get("required") == "1": required_form_params.append(name) - - endpoint_dict["requestBody"] = { - "content": { - "application/x-www-form-urlencoded": { - "schema": { - "type": "object", - "properties": properties, - "required": required_form_params if required_form_params else None # OpenAPI: omit if empty - } - } - # YAPI also supports req_body_type = 'file', which would map to multipart/form-data - # This example focuses on json and basic form. - } - } - # Add other req_body_types if necessary (e.g., raw, file) - - # Responses - # YAPI has a simpler response structure. We'll map its res_body to a default success response (e.g., 200 or 201). - default_success_status = "200" - if self.method == "POST": default_success_status = "201" # Common practice for POST success - - if self.res_body_type == "json" and self._parsed_res_body_schema: - endpoint_dict["responses"][default_success_status] = { - "description": "Successful Operation (from YAPI res_body)", - "content": { - "application/json": { - "schema": self._parsed_res_body_schema - } - } - } - elif self.res_body_type == "json" and not self._parsed_res_body_schema and self.res_body: # Schema parsing failed but text exists - endpoint_dict["responses"][default_success_status] = { - "description": "Successful Operation (Schema parsing error, raw text might be available)", - "content": {"application/json": {"schema": {"type": "object", "description": "Schema parsing failed for YAPI res_body."}}} # Placeholder - } - else: # No JSON schema, or other res_body_type - endpoint_dict["responses"][default_success_status] = { - "description": "Successful Operation (No specific schema provided in YAPI for this response)" - } - - # Ensure there's always a default response if nothing specific was added - if not endpoint_dict["responses"]: - endpoint_dict["responses"]["default"] = {"description": "Default response from YAPI definition"} - - return endpoint_dict - - def __repr__(self): - return f"" - -class SwaggerEndpoint(BaseEndpoint): # Inherit from BaseEndpoint - def __init__(self, path: str, method: str, data: Dict[str, Any], global_spec: Dict[str, Any]): - super().__init__(method=method.upper(), path=path) - self._raw_data = data - self._global_spec = global_spec # Store for $ref resolution - self.summary: Optional[str] = data.get("summary") - self.description: Optional[str] = data.get("description") - self.operation_id: Optional[str] = data.get("operationId") - self.tags: List[str] = data.get("tags", []) - # Parameters, requestBody, responses are processed by to_dict - - def _resolve_ref(self, ref_path: str) -> Optional[Dict[str, Any]]: - """Resolves a $ref path within the global OpenAPI/Swagger spec.""" - if not ref_path.startswith("#/"): - logger.warning(f"Unsupported $ref path: {ref_path}. Only local refs '#/...' are currently supported.") - return None - - parts = ref_path[2:].split('/') # Remove '#/' and split - current_level = self._global_spec - try: - for part in parts: - # Decode URI component encoding if present (e.g. 
"~0" for "~", "~1" for "/") - part = part.replace("~1", "/").replace("~0", "~") - current_level = current_level[part] - # It's crucial to return a copy if the resolved ref will be modified, - # or ensure modifications happen on copies later. - # For now, returning as is, assuming downstream processing is careful or uses copies. - if isinstance(current_level, dict): - return current_level # Potentially json.loads(json.dumps(current_level)) for a deep copy - else: # Resolved to a non-dict, which might be valid for some simple refs but unusual for schemas - logger.warning(f"$ref '{ref_path}' resolved to a non-dictionary type: {type(current_level)}. Value: {str(current_level)[:100]}") - return {"type": "string", "description": f"Resolved $ref '{ref_path}' to non-dict: {str(current_level)[:100]}"} # Placeholder - except (KeyError, TypeError, AttributeError) as e: - logger.error(f"Failed to resolve $ref '{ref_path}': {e}", exc_info=True) - return None - - def _process_schema_or_ref(self, schema_like: Any) -> Optional[Dict[str, Any]]: - """ - Processes a schema part, resolving $refs and recursively processing nested structures. - Returns a new dictionary with resolved refs, or None if resolution fails badly. - """ - if not isinstance(schema_like, dict): - if schema_like is None: return None - logger.warning(f"Expected a dictionary for schema processing, got {type(schema_like)}. Value: {str(schema_like)[:100]}") - return {"type": "string", "description": f"Schema was not a dict: {str(schema_like)[:100]}"} # Placeholder for non-dict schema - - # If it's a $ref, resolve it. - if "$ref" in schema_like: - return self._resolve_ref(schema_like["$ref"]) # This will be the new base schema_like - - # Create a copy to avoid modifying the original spec during processing - processed_schema = schema_like.copy() - - # Recursively process 'properties' for object schemas - if "properties" in processed_schema and isinstance(processed_schema["properties"], dict): - new_properties = {} - for prop_name, prop_schema in processed_schema["properties"].items(): - resolved_prop = self._process_schema_or_ref(prop_schema) - if resolved_prop is not None: # Only add if resolution was successful - new_properties[prop_name] = resolved_prop - # else: logger.warning(f"Failed to process property '{prop_name}' in {self.operation_id or self.path}") - processed_schema["properties"] = new_properties - - # Recursively process 'items' for array schemas - if "items" in processed_schema and isinstance(processed_schema["items"], dict): # 'items' should be a schema object - resolved_items = self._process_schema_or_ref(processed_schema["items"]) - if resolved_items is not None: - processed_schema["items"] = resolved_items - # else: logger.warning(f"Failed to process 'items' schema in {self.operation_id or self.path}") - - # Handle allOf, anyOf, oneOf by trying to merge or process them (simplistic merge for allOf) - # This is a complex area of JSON Schema. This is a very basic attempt. 
- if "allOf" in processed_schema and isinstance(processed_schema["allOf"], list): - merged_all_of_props = {} - merged_all_of_required = set() - temp_schema_for_all_of = {"type": processed_schema.get("type", "object"), "properties": {}, "required": []} - - for sub_schema_data in processed_schema["allOf"]: - resolved_sub_schema = self._process_schema_or_ref(sub_schema_data) - if resolved_sub_schema and isinstance(resolved_sub_schema, dict): - if "properties" in resolved_sub_schema: - temp_schema_for_all_of["properties"].update(resolved_sub_schema["properties"]) - if "required" in resolved_sub_schema and isinstance(resolved_sub_schema["required"], list): - merged_all_of_required.update(resolved_sub_schema["required"]) - # Copy other top-level keywords from the resolved_sub_schema if needed, e.g. description - for key, value in resolved_sub_schema.items(): - if key not in ["properties", "required", "type", "$ref", "allOf", "anyOf", "oneOf"]: - if key not in temp_schema_for_all_of or temp_schema_for_all_of[key] is None: # prioritize existing - temp_schema_for_all_of[key] = value - - if temp_schema_for_all_of["properties"]: - processed_schema["properties"] = {**processed_schema.get("properties",{}), **temp_schema_for_all_of["properties"]} - if merged_all_of_required: - current_required = set(processed_schema.get("required", [])) - current_required.update(merged_all_of_required) - processed_schema["required"] = sorted(list(current_required)) - del processed_schema["allOf"] # Remove allOf after processing - # Copy other merged attributes back to processed_schema - for key, value in temp_schema_for_all_of.items(): - if key not in ["properties", "required", "type", "$ref", "allOf", "anyOf", "oneOf"]: - if key not in processed_schema or processed_schema[key] is None: - processed_schema[key] = value - - - # anyOf, oneOf are harder as they represent choices. For now, we might just list them or pick first. - # For simplicity in to_dict, we might not fully expand them but ensure refs inside are resolved. 
- for keyword in ["anyOf", "oneOf"]: - if keyword in processed_schema and isinstance(processed_schema[keyword], list): - processed_sub_list = [] - for sub_item in processed_schema[keyword]: - resolved_sub = self._process_schema_or_ref(sub_item) - if resolved_sub: - processed_sub_list.append(resolved_sub) - if processed_sub_list: # only update if some were resolved - processed_schema[keyword] = processed_sub_list - - return processed_schema - - def to_dict(self) -> Dict[str, Any]: - endpoint_data = { - "method": self.method, - "path": self.path, - "summary": self.summary or "", - "title": self.summary or self.operation_id or "", # Fallback for title - "description": self.description or "", - "operationId": self.operation_id or f"{self.method.lower()}_{self.path.replace('/', '_').replace('{', '').replace('}', '')}", - "tags": self.tags, - "parameters": [], - "requestBody": None, - "responses": {}, - "_source_format": "swagger/openapi", - "_swagger_raw_data": self._raw_data, # Keep raw for debugging - "_global_api_spec_for_resolution": self._global_spec # For test cases that might need to resolve further - } - - # Process parameters - if "parameters" in self._raw_data and isinstance(self._raw_data["parameters"], list): - for param_data_raw in self._raw_data["parameters"]: - # Each param_data_raw could itself be a $ref or contain a schema that is a $ref - processed_param_container = self._process_schema_or_ref(param_data_raw) - if processed_param_container and isinstance(processed_param_container, dict): - # If the parameter itself was a $ref, processed_param_container is the resolved object. - # If it contained a schema that was a $ref, that nested schema should be resolved. - # We need to ensure 'schema' key exists if 'in' is path, query, header - if "schema" in processed_param_container and isinstance(processed_param_container["schema"], dict): - # schema was present, process it further (it might have been already by _process_schema_or_ref if it was a complex object) - # but if _process_schema_or_ref was called on param_data_raw which wasn't a ref itself, - # the internal 'schema' ref might not have been re-processed with full context. - # However, the recursive nature of _process_schema_or_ref should handle nested $refs. 
- pass # Assume it's processed by the main call to _process_schema_or_ref on param_data_raw - elif "content" in processed_param_container: # Parameter described by Content Object (OpenAPI 3.x) - pass # Content object schemas should have been resolved by _process_schema_or_ref - - endpoint_data["parameters"].append(processed_param_container) - - # Process requestBody - if "requestBody" in self._raw_data and isinstance(self._raw_data["requestBody"], dict): - processed_rb = self._process_schema_or_ref(self._raw_data["requestBody"]) - if processed_rb: - endpoint_data["requestBody"] = processed_rb - - # Process responses - if "responses" in self._raw_data and isinstance(self._raw_data["responses"], dict): - for status_code, resp_data_raw in self._raw_data["responses"].items(): - processed_resp = self._process_schema_or_ref(resp_data_raw) - if processed_resp: - endpoint_data["responses"][status_code] = processed_resp - elif resp_data_raw: # If processing failed but raw exists, keep raw (though this is less ideal) - endpoint_data["responses"][status_code] = resp_data_raw - logger.warning(f"Kept raw response data for {status_code} due to processing failure for {self.operation_id or self.path}") - - if not endpoint_data["responses"]: # Ensure default response if none processed - endpoint_data["responses"]["default"] = {"description": "Default response from Swagger/OpenAPI definition"} - - return endpoint_data - - def __repr__(self): - return f"" - -class DMSEndpoint(BaseEndpoint): - """Represents an API endpoint discovered dynamically from the DMS service.""" - def __init__(self, method: str, path: str, title: str, - request_body: Optional[Dict[str, Any]], - responses: Dict[str, Any], - parameters: Optional[List[Dict[str, Any]]] = None, - category_name: Optional[str] = None, - raw_record: Optional[Dict[str, Any]] = None, - test_mode: str = 'standalone', - operation_id: Optional[str] = None, - model_pk_name: Optional[str] = None, - identity_id_list: Optional[List[str]] = None): - super().__init__(method=method.upper(), path=path) - self.title = title - self.request_body = request_body - self.responses = responses - self.parameters = parameters if parameters is not None else [] - self.category_name = category_name - self._raw_record = raw_record - self.test_mode = test_mode - self.operation_id = operation_id or f"{self.method.lower()}_{self.category_name or 'dms'}_{title.replace(' ', '_')}" - self.model_pk_name = model_pk_name - self.identity_id_list = identity_id_list or [] - - def to_dict(self) -> Dict[str, Any]: - """Converts the DMS endpoint data into a standardized OpenAPI-like dictionary.""" - endpoint_dict = { - "method": self.method, - "path": self.path, - "title": self.title, - "summary": self.title, - "description": self.title, - "operationId": self.operation_id, - "tags": [self.category_name] if self.category_name else [], - "parameters": self.parameters, - "requestBody": self.request_body, - "responses": self.responses, - "_source_format": "dms", - "_dms_raw_record": self._raw_record, - "_test_mode": self.test_mode, - "_dms_model_pk_name": self.model_pk_name - } - return endpoint_dict - - def __repr__(self): - return f"" - -Endpoint = Union[YAPIEndpoint, SwaggerEndpoint, DMSEndpoint] - -class ParsedAPISpec: - """Base class for a parsed API specification from any source.""" - def __init__(self, spec_type: str, endpoints: List[Union[YAPIEndpoint, SwaggerEndpoint, 'DMSEndpoint']], spec: Dict[str, Any]): - self.spec_type = spec_type - self.endpoints = endpoints - self.spec = spec # Store the 
original full spec dictionary, useful for $ref resolution if not pre-resolved - -class ParsedYAPISpec(ParsedAPISpec): - """解析后的YAPI规范""" - def __init__(self, endpoints: List[YAPIEndpoint], categories: List[Dict[str, Any]], spec: Dict[str, Any]): - super().__init__(spec_type="yapi", endpoints=endpoints, spec=spec) - self.categories = categories - -class ParsedSwaggerSpec(ParsedAPISpec): - """解析后的Swagger/OpenAPI规范""" - def __init__(self, endpoints: List[SwaggerEndpoint], tags: List[Dict[str, Any]], spec: Dict[str, Any]): - super().__init__(spec_type="swagger", endpoints=endpoints, spec=spec) - self.tags = tags - -class ParsedDMSSpec(ParsedAPISpec): - """Parsed specification from the dynamic DMS source.""" - def __init__(self, endpoints: List[DMSEndpoint], spec: Dict[str, Any]): - super().__init__(spec_type="dms", endpoints=endpoints, spec=spec) - -class InputParser: - """负责解析输入(如YAPI JSON)并提取API端点信息""" - def __init__(self): - self.logger = logging.getLogger(__name__) - - def parse_yapi_spec(self, file_path: str) -> Optional[ParsedYAPISpec]: - self.logger.info(f"Parsing YAPI spec from: {file_path}") - all_endpoints: List[YAPIEndpoint] = [] - yapi_categories: List[Dict[str, Any]] = [] - raw_spec_data_list: Optional[List[Dict[str, Any]]] = None # YAPI export is a list of categories - try: - with open(file_path, 'r', encoding='utf-8') as f: - raw_spec_data_list = json.load(f) - - if not isinstance(raw_spec_data_list, list): - self.logger.error(f"YAPI spec file {file_path} does not contain a JSON list as expected for categories.") - return None - - for category_data in raw_spec_data_list: - if not isinstance(category_data, dict): - self.logger.warning(f"Skipping non-dictionary item in YAPI spec list: {str(category_data)[:100]}") - continue - cat_name = category_data.get("name") - cat_id = category_data.get("_id", category_data.get("id")) # YAPI uses _id - yapi_categories.append({"name": cat_name, "description": category_data.get("desc"), "id": cat_id}) - - for endpoint_data in category_data.get("list", []): - if not isinstance(endpoint_data, dict): - self.logger.warning(f"Skipping non-dictionary endpoint item in category '{cat_name}': {str(endpoint_data)[:100]}") - continue - try: - yapi_endpoint = YAPIEndpoint(endpoint_data, category_name=cat_name, category_id=cat_id) - all_endpoints.append(yapi_endpoint) - except Exception as e_ep: - self.logger.error(f"Error processing YAPI endpoint data (ID: {endpoint_data.get('_id', 'N/A')}, Title: {endpoint_data.get('title', 'N/A')}). Error: {e_ep}", exc_info=True) - - # The 'spec' for ParsedYAPISpec should be a dict representing the whole document. - # Since YAPI export is a list of categories, we wrap it. 
- yapi_full_spec_dict = {"yapi_categories": raw_spec_data_list} - return ParsedYAPISpec(endpoints=all_endpoints, categories=yapi_categories, spec=yapi_full_spec_dict) - except FileNotFoundError: - self.logger.error(f"YAPI spec file not found: {file_path}") - except json.JSONDecodeError as e: - self.logger.error(f"Error decoding JSON from YAPI spec file {file_path}: {e}") - except Exception as e: - self.logger.error(f"An unexpected error occurred while parsing YAPI spec {file_path}: {e}", exc_info=True) - return None - - def parse_swagger_spec(self, file_path: str) -> Optional[ParsedSwaggerSpec]: - self.logger.info(f"Parsing Swagger/OpenAPI spec from: {file_path}") - all_endpoints: List[SwaggerEndpoint] = [] - swagger_tags: List[Dict[str, Any]] = [] - raw_spec_data_dict: Optional[Dict[str, Any]] = None # Swagger/OpenAPI is a single root object - try: - with open(file_path, 'r', encoding='utf-8') as f: - # TODO: Add YAML support if needed, e.g., using PyYAML - raw_spec_data_dict = json.load(f) - - if not isinstance(raw_spec_data_dict, dict): - self.logger.error(f"Swagger spec file {file_path} does not contain a JSON object as expected.") - return None - - swagger_tags = raw_spec_data_dict.get("tags", []) - paths = raw_spec_data_dict.get("paths", {}) - - for path, path_item_obj in paths.items(): - if not isinstance(path_item_obj, dict): continue - for method, operation_obj in path_item_obj.items(): - # Common methods, can be extended - if method.lower() not in ["get", "post", "put", "delete", "patch", "options", "head", "trace"]: - continue # Skip non-standard HTTP methods or extensions like 'parameters' at path level - if not isinstance(operation_obj, dict): continue - try: - # Pass the full raw_spec_data_dict for $ref resolution within SwaggerEndpoint - swagger_endpoint = SwaggerEndpoint(path, method, operation_obj, global_spec=raw_spec_data_dict) - all_endpoints.append(swagger_endpoint) - except Exception as e_ep: - self.logger.error(f"Error processing Swagger endpoint: {method.upper()} {path}. Error: {e_ep}", exc_info=True) - - return ParsedSwaggerSpec(endpoints=all_endpoints, tags=swagger_tags, spec=raw_spec_data_dict) - except FileNotFoundError: - self.logger.error(f"Swagger spec file not found: {file_path}") - except json.JSONDecodeError as e: - self.logger.error(f"Error decoding JSON from Swagger spec file {file_path}: {e}") - except Exception as e: - self.logger.error(f"An unexpected error occurred while parsing Swagger spec {file_path}: {e}", exc_info=True) - return None - - def parse_dms_spec(self, domain_mapping_path: str, base_url: str, headers: Optional[Dict[str, str]] = None, ignore_ssl: bool = False, page_size: int = 1000, page_no_start: int = 1, fetch_all_pages: bool = True) -> Optional[Tuple[ParsedDMSSpec, Dict[str, Any]]]: - self.logger.info(f"Starting DMS spec parsing. Base URL: {base_url}, Domain Map: {domain_mapping_path}") - - if ignore_ssl: - self.logger.warning("SSL certificate verification is disabled for DMS API calls. This is not recommended for production use.") - # 禁用SSL警告 - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - - headers = headers or {} - - try: - with open(domain_mapping_path, 'r', encoding='utf-8') as f: - DOMAIN_MAP = json.load(f) - except (FileNotFoundError, json.JSONDecodeError) as e: - self.logger.warning(f"Could not load or parse domain map file '{domain_mapping_path}'. Using default domain. 
Error: {e}") - DOMAIN_MAP = {} - - # 🔧 构建关键词到领域ID的映射表 - keyword_to_domain_id = {} - for domain_id, domain_info in DOMAIN_MAP.items(): - if isinstance(domain_info, dict) and 'keywords' in domain_info: - keywords = domain_info['keywords'] - if isinstance(keywords, list): - for keyword in keywords: - keyword_to_domain_id[keyword] = domain_id - self.logger.debug(f"映射关键词 '{keyword}' -> 领域ID '{domain_id}'") - - # 实现分页获取API列表 - if fetch_all_pages: - self.logger.info(f"Fetching ALL API pages with pagination (page_size={page_size}, starting from page {page_no_start})") - else: - self.logger.info(f"Fetching SINGLE page (page_size={page_size}, page_no={page_no_start})") - - api_records = [] - page_no = page_no_start - total_fetched = 0 - pagination_info = { - "page_size": page_size, - "page_no_start": page_no_start, - "total_pages": 0, - "total_records": 0, - "pages_fetched": 0, - "current_page": page_no_start, - "fetch_all_pages": fetch_all_pages - } - - try: - while True: - list_url = urljoin(base_url, f"/api/schema/manage/schema?pageNo={page_no}&pageSize={page_size}") - self.logger.debug(f"Fetching page {page_no} from: {list_url}") - - response = requests.get(list_url, headers=headers, verify=not ignore_ssl) - response.raise_for_status() - api_list_data = response.json() - - # 检查业务代码是否成功 - if api_list_data.get("code") != 0: - self.logger.error(f"DMS API list endpoint returned a business error: {api_list_data.get('message')}") - return None, {} - - # 从分页结构中提取 'records' - page_records = api_list_data.get("data", {}).get("records", []) - if not page_records: - self.logger.info(f"No more records found on page {page_no}, stopping pagination") - break - - api_records.extend(page_records) - total_fetched += len(page_records) - self.logger.info(f"Fetched {len(page_records)} records from page {page_no}, total: {total_fetched}") - - # 更新分页信息 - data = api_list_data.get("data", {}) - total_count = data.get("total", 0) - current_count = data.get("current", 0) * data.get("size", page_size) - - # 第一次获取时更新总数信息 - if page_no == 1: - pagination_info["total_records"] = total_count - pagination_info["total_pages"] = (total_count + page_size - 1) // page_size # 向上取整 - - pagination_info["pages_fetched"] = page_no - page_no_start + 1 - pagination_info["current_page"] = page_no - - # 如果是单页模式,获取一页后就停止 - if not fetch_all_pages: - self.logger.info(f"Single page mode: fetched {len(page_records)} records from page {page_no}") - break - - # 全页模式:检查是否还有更多页面 - if current_count >= total_count or len(page_records) < page_size: - self.logger.info(f"Reached end of data. 
Total records: {total_fetched}") - break - - page_no += 1 - - # 安全检查:防止无限循环 - if page_no > 1000: # 最多1000页 - self.logger.warning("Reached maximum page limit (1000), stopping pagination") - break - - if not api_records: - self.logger.warning("DMS API list is empty after pagination.") - return ParsedDMSSpec(endpoints=[], spec={"dms_api_list": []}), pagination_info - - except requests.exceptions.RequestException as e: - self.logger.error(f"Failed to fetch API list from DMS: {e}") - return None, {} - except json.JSONDecodeError: - self.logger.error("Failed to decode JSON response from DMS API list.") - return None, {} - - endpoints: List[DMSEndpoint] = [] - - for item in api_records: - domain_name = item.get('domain') - name = item.get('name') - model_id = item.get('id') - if not all(k in item for k in ['domain', 'name', 'id']): - self.logger.warning(f"Skipping an item in API list because it's missing 'domain', 'name', or 'id': {item}") - continue - - # 🔧 改进领域映射:支持精确匹配和前缀匹配 - instance_code = keyword_to_domain_id.get(domain_name) - if instance_code: - self.logger.info(f"通过精确关键词匹配:'{domain_name}' -> '{instance_code}'") - else: - # 尝试前缀匹配(如wb_cd匹配wb) - for keyword, domain_id in keyword_to_domain_id.items(): - if domain_name.startswith(keyword + '_'): # wb_cd以wb_开头 - instance_code = domain_id - self.logger.info(f"通过前缀关键词匹配:'{domain_name}' -> '{instance_code}' (匹配关键词: '{keyword}')") - break - - if not instance_code: - # Fallback到原有的直接映射 - instance_code = DOMAIN_MAP.get(domain_name, domain_name) - self.logger.info(f"使用直接映射:'{domain_name}' -> '{instance_code}'") - model_url = urljoin(base_url, f"/api/schema/manage/schema/{model_id}") - self.logger.info(f"Fetching model for '{name}' from: {model_url}") - - try: - response = requests.get(model_url, headers=headers, verify=not ignore_ssl) - response.raise_for_status() - model_schema_response = response.json() - except requests.exceptions.RequestException as e: - self.logger.error(f"Error fetching model for '{name}': {e}") - continue - except json.JSONDecodeError: - self.logger.error(f"Failed to decode JSON for model '{name}'.") - continue - - if not model_schema_response or 'data' not in model_schema_response or not model_schema_response['data']: - self.logger.warning(f"Skipping API '{name}' due to missing or empty model schema in response.") - continue - - model_data = model_schema_response['data'] - # Based on user feedback, model_data itself is the schema, not model_data['model'] - model = model_data - if not model or 'properties' not in model or not model['properties']: - self.logger.warning(f"Skipping API '{name}' due to missing or invalid 'model' object in schema.") - continue - - pk_name = None - # Find primary key by looking for top-level "identityId" array. - identity_id_list = model.get("identityId") - if isinstance(identity_id_list, list) and len(identity_id_list) > 0: - candidate_pk_name = identity_id_list[0] - # 🔧 验证identityId指向的字段是否真的存在于properties中 - if candidate_pk_name in model['properties']: - pk_name = candidate_pk_name - self.logger.info(f"Found identityId property '{pk_name}' for model '{name}'.") - else: - self.logger.warning(f"identityId property '{candidate_pk_name}' not found in model properties for '{name}'. Will use fallback.") - - # Fallback to original behavior if no identityId found or identityId field doesn't exist - if not pk_name: - pk_name = next(iter(model['properties']), None) - if pk_name: - self.logger.warning(f"No valid 'identityId' found for model '{name}'. 
Falling back to using the first property '{pk_name}' as the primary key.") - - if not pk_name: - self.logger.warning(f"Skipping API '{name}' because no properties found in model to identify a primary key.") - continue - - # 🔧 再次验证pk_name确实存在于properties中(双重保险) - if pk_name not in model['properties']: - self.logger.error(f"Critical error: Primary key '{pk_name}' not found in model properties for '{name}'. Skipping this model.") - continue - - pk_schema = model['properties'][pk_name] - - version = model_data.get('version', '1.0.0') - dms_instance_code = instance_code - category_name = domain_name - - success_response = { - "200": { "description": "Success", "content": {"application/json": {"schema": {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "boolean"}}}}}}} - - # Create Endpoint (POST) - create_path = f"/api/dms/{dms_instance_code}/v1/{name}" - create_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]} - endpoints.append(DMSEndpoint(path=create_path, method='post', title=f"Create {name}", request_body={'content': {'application/json': {'schema': create_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"create_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) - - # List Endpoint (POST) - 🔧 添加pageNo和pageSize查询参数 - list_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}" - list_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "array", "items": model}}} - list_parameters = [ - {'name': 'pageNo', 'in': 'query', 'required': False, 'description': '页码(从1开始)', 'schema': {'type': 'integer', 'default': 1}}, - {'name': 'pageSize', 'in': 'query', 'required': False, 'description': '分页大小(最大值200)', 'schema': {'type': 'integer', 'default': 1000}} - ] - # List请求体Schema(包含version字段,但不包含act字段) - list_request_body_schema = { - "type": "object", - "properties": { - "version": {"type": "string", "example": version}, - "isSearchCount": {"type": "boolean", "example": True}, - "query": { - "type": "object", - "properties": { - "fields": {"type": "array", "items": {"type": "string"}}, - "filter": {"type": "object"} - } - } - } - } - endpoints.append(DMSEndpoint(path=list_path, method='post', title=f"List {name}", request_body={'content': {'application/json': {'schema': list_request_body_schema}}}, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': list_response_schema}}}}, parameters=list_parameters, test_mode='standalone', operation_id=f"list_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) - - # Read Endpoint (GET) - 🔧 只为单主键模型生成查询详情接口 - if isinstance(identity_id_list, list) and len(identity_id_list) > 1: - # 🚫 多主键模型:不生成查询详情接口 - self.logger.info(f"跳过多主键模型 '{name}' 的查询详情接口生成,主键数量: {len(identity_id_list)}") - else: - # ✅ 单主键模型:生成查询详情接口 - read_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}/{{id}}" - read_parameters = [{'name': 'id', 'in': 'path', 'required': True, 'description': f'The ID of the {name}, maps to {pk_name}', 'schema': pk_schema}] - read_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": model}} - 
endpoints.append(DMSEndpoint(path=read_path, method='get', title=f"Read {name}", request_body=None, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': read_response_schema}}}}, parameters=read_parameters, test_mode='scenario_only', operation_id=f"read_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) - self.logger.info(f"创建单主键读取端点 '{name}',路径参数: id") - - # Update Endpoint (PUT) - update_path = f"/api/dms/{dms_instance_code}/v1/{name}" - update_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]} - endpoints.append(DMSEndpoint(path=update_path, method='put', title=f"Update {name}", request_body={'content': {'application/json': {'schema': update_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"update_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) - - # Delete Endpoint (DELETE) - delete_path = f"/api/dms/{dms_instance_code}/v1/{name}" - - # 根据identityId列表长度决定删除schema结构 - if isinstance(identity_id_list, list) and len(identity_id_list) > 1: - # 多主键:使用对象数组 - delete_items_properties = {} - delete_required_fields = [] - for pk_field in identity_id_list: - if pk_field in model['properties']: - delete_items_properties[pk_field] = model['properties'][pk_field] - delete_required_fields.append(pk_field) - - delete_request_body_schema = { - "type": "object", - "properties": { - "version": {"type": "string", "example": version}, - "data": { - "type": "array", - "items": { - "type": "object", - "properties": delete_items_properties, - "required": delete_required_fields - } - } - }, - "required": ["data"] - } - self.logger.info(f"创建多主键删除端点 '{name}',主键字段: {identity_id_list}") - else: - # 单主键:使用字符串数组 - delete_request_body_schema = { - "type": "object", - "properties": { - "version": {"type": "string", "example": version}, - "data": { - "type": "array", - "items": {"type": "string"} - } - }, - "required": ["data"] - } - self.logger.info(f"创建单主键删除端点 '{name}',主键字段: {pk_name}") - - endpoints.append(DMSEndpoint(path=delete_path, method='delete', title=f"Delete {name}", request_body={'content': {'application/json': {'schema': delete_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"delete_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) - - # The 'spec' for ParsedDMSSpec should represent the whole document. - # We can construct a dictionary holding all the raw data we fetched. - dms_full_spec_dict = {"dms_api_list": api_records} - return ParsedDMSSpec(endpoints=endpoints, spec=dms_full_spec_dict), pagination_info - -class DmsConfig: - def __init__(self, base_url: str, domain_map_file: str, headers: Optional[Dict[str, str]] = None): - self.base_url = base_url - self.domain_map_file = domain_map_file - self.headers = headers - -def get_endpoints_from_swagger(file_path: str) -> Tuple[List[SwaggerEndpoint], str]: - """ - Parses a Swagger/OpenAPI JSON file and returns a list of SwaggerEndpoint objects - and the base path of the API. 
- """ - logger.info(f"Parsing Swagger/OpenAPI spec from: {file_path}") - all_endpoints: List[SwaggerEndpoint] = [] - swagger_tags: List[Dict[str, Any]] = [] - raw_spec_data_dict: Optional[Dict[str, Any]] = None # Swagger/OpenAPI is a single root object - try: - with open(file_path, 'r', encoding='utf-8') as f: - # TODO: Add YAML support if needed, e.g., using PyYAML - raw_spec_data_dict = json.load(f) - - if not isinstance(raw_spec_data_dict, dict): - logger.error(f"Swagger spec file {file_path} does not contain a JSON object as expected.") - return [], "" - - swagger_tags = raw_spec_data_dict.get("tags", []) - paths = raw_spec_data_dict.get("paths", {}) - - for path, path_item_obj in paths.items(): - if not isinstance(path_item_obj, dict): continue - for method, operation_obj in path_item_obj.items(): - # Common methods, can be extended - if method.lower() not in ["get", "post", "put", "delete", "patch", "options", "head", "trace"]: - continue # Skip non-standard HTTP methods or extensions like 'parameters' at path level - if not isinstance(operation_obj, dict): continue - try: - # Pass the full raw_spec_data_dict for $ref resolution within SwaggerEndpoint - swagger_endpoint = SwaggerEndpoint(path, method, operation_obj, global_spec=raw_spec_data_dict) - all_endpoints.append(swagger_endpoint) - except Exception as e_ep: - logger.error(f"Error processing Swagger endpoint: {method.upper()} {path}. Error: {e_ep}", exc_info=True) - - return all_endpoints, raw_spec_data_dict.get("basePath", "") if raw_spec_data_dict.get("basePath") else "" - except FileNotFoundError: - # It's better to log this error. Assuming a logger is available at self.logger - logger.error(f"Swagger spec file not found: {file_path}") - return [], "" - except json.JSONDecodeError as e: - logger.error(f"Error decoding JSON from Swagger spec file {file_path}: {e}") - return [], "" - except Exception as e: - logger.error(f"An unexpected error occurred while parsing {file_path}: {e}", exc_info=True) - return [], "" - - -def parse_input_to_endpoints(input_type: str, input_path: str, dms_config: DmsConfig = None) -> Tuple[List[Endpoint], str]: - """ - Parses input from a given type (YAPI, Swagger, DMS) and returns a list of Endpoint objects - and the base path of the API. 
-    """
-    parser = InputParser()
-    if input_type == "yapi":
-        parsed_spec = parser.parse_yapi_spec(input_path)
-        if parsed_spec:
-            return parsed_spec.endpoints, "" # YAPI doesn't have a base path in the same sense as Swagger
-    elif input_type == "swagger":
-        # The standalone get_endpoints_from_swagger is simple, but for consistency let's use the parser
-        parsed_spec = parser.parse_swagger_spec(input_path)
-        if parsed_spec:
-            base_path = parsed_spec.spec.get("basePath", "") or ""
-            # servers URL might be more modern (OpenAPI 3)
-            if not base_path and "servers" in parsed_spec.spec and parsed_spec.spec["servers"]:
-                # Use the first server URL
-                base_path = parsed_spec.spec["servers"][0].get("url", "")
-            return parsed_spec.endpoints, base_path
-    elif input_type == "dms":
-        if dms_config:
-            parsed_spec = parser.parse_dms_spec(dms_config.domain_map_file, dms_config.base_url, dms_config.headers)
-            if parsed_spec:
-                return parsed_spec.endpoints, dms_config.base_url
-        else:
-            logger.error("DMS configuration not provided for DMS input type.")
-            return [], ""
-    else:
-        logger.error(f"Unsupported input type: {input_type}")
-        return [], ""
-
+import json
+import logging
+from typing import List, Dict, Any, Optional, Union, Tuple
+import requests
+from urllib.parse import urljoin
+import copy
+import urllib3
+
+logger = logging.getLogger(__name__)
+
+class BaseEndpoint:
+    """Base class for all endpoint objects; it can hold common attributes or methods."""
+    def __init__(self, method: str, path: str):
+        self.method = method
+        self.path = path
+
+    def to_dict(self) -> Dict[str, Any]:
+        # The base class can provide a default to_dict implementation or require subclasses to implement it
+        raise NotImplementedError("Subclasses must implement to_dict")
+
+class YAPIEndpoint(BaseEndpoint): # Inherit from BaseEndpoint
+    def __init__(self, data: Dict[str, Any], category_name: Optional[str] = None, category_id: Optional[int] = None):
+        super().__init__(method=data.get("method", "GET").upper(), path=data.get("path", ""))
+        self._raw_data = data
+        self.title: str = data.get("title", "")
+        self.desc: Optional[str] = data.get("desc")
+        self._id: int = data.get("_id")
+        self.project_id: int = data.get("project_id")
+        self.catid: int = data.get("catid")
+
+        self.req_params: List[Dict[str, Any]] = data.get("req_params", [])
+        self.req_query: List[Dict[str, Any]] = data.get("req_query", [])
+        self.req_headers: List[Dict[str, Any]] = data.get("req_headers", [])
+        self.req_body_form: List[Dict[str, Any]] = data.get("req_body_form", [])
+
+        self.req_body_type: Optional[str] = data.get("req_body_type")
+        self.req_body_is_json_schema: bool = data.get("req_body_is_json_schema", False)
+        self.req_body_other: Optional[str] = data.get("req_body_other")
+
+        self.res_body_type: Optional[str] = data.get("res_body_type")
+        self.res_body_is_json_schema: bool = data.get("res_body_is_json_schema", False)
+        self.res_body: Optional[str] = data.get("res_body")
+
+        self.status: str = data.get("status", "undone")
+        self.api_opened: bool = data.get("api_opened", False)
+        self.uid: int = data.get("uid")
+
+        self.category_name = category_name
+        self.category_id = category_id if category_id is not None else self.catid
+
+        self._parsed_req_body_schema: Optional[Dict[str, Any]] = None
+        if self.req_body_type == "json" and self.req_body_other and self.req_body_is_json_schema:
+            try:
+                self._parsed_req_body_schema = json.loads(self.req_body_other)
+            except json.JSONDecodeError as e:
+                logger.error(f"YAPIEndpoint (ID: {self._id}, Title: {self.title}): Failed to parse req_body_other as JSON during init: {e}. 
Content: {self.req_body_other[:200]}") + + self._parsed_res_body_schema: Optional[Dict[str, Any]] = None + if self.res_body_type == "json" and self.res_body and self.res_body_is_json_schema: + try: + self._parsed_res_body_schema = json.loads(self.res_body) + except json.JSONDecodeError as e: + logger.error(f"YAPIEndpoint (ID: {self._id}, Title: {self.title}): Failed to parse res_body as JSON during init: {e}. Content: {self.res_body[:200]}") + + def to_dict(self) -> Dict[str, Any]: + endpoint_dict = { + "method": self.method, + "path": self.path, + "title": self.title, + "summary": self.title, + "description": self.desc or "", + "operationId": f"{self.method.lower()}_{self.path.replace('/', '_').replace('{', '').replace('}', '')}_{self._id}", + "tags": [self.category_name or str(self.catid)], + "parameters": [], + "requestBody": None, + "responses": {}, + "_source_format": "yapi", + "_yapi_id": self._id, + "_yapi_raw_data": self._raw_data # Keep raw data for debugging or deeper inspection if needed + } + + # Path parameters from req_params + for p_spec in self.req_params: + param_name = p_spec.get("name") + if not param_name: continue + endpoint_dict["parameters"].append({ + "name": param_name, + "in": "path", + "required": True, # Path parameters are always required + "description": p_spec.get("desc", ""), + "schema": {"type": "string", "example": p_spec.get("example", f"example_{param_name}")} + }) + + # Query parameters from req_query + for q_spec in self.req_query: + param_name = q_spec.get("name") + if not param_name: continue + is_required = q_spec.get("required") == "1" # YAPI uses "1" for true + param_schema = {"type": "string"} # Default to string, YAPI doesn't specify types well here + if "example" in q_spec: param_schema["example"] = q_spec["example"] + # Add other fields from YAPI query spec if needed (e.g., desc) + endpoint_dict["parameters"].append({ + "name": param_name, + "in": "query", + "required": is_required, + "description": q_spec.get("desc", ""), + "schema": param_schema + }) + + # Header parameters from req_headers + for h_spec in self.req_headers: + param_name = h_spec.get("name") + if not param_name or param_name.lower() == 'content-type': continue # Content-Type is handled by requestBody + is_required = h_spec.get("required") == "1" + default_value = h_spec.get("value") # YAPI uses 'value' for default/example header value + param_schema = {"type": "string"} + if default_value: + if is_required: # If required, it's more like an example of what's expected + param_schema["example"] = default_value + else: # If not required, it's a default value + param_schema["default"] = default_value + + endpoint_dict["parameters"].append({ + "name": param_name, + "in": "header", + "required": is_required, + "description": h_spec.get("desc", ""), + "schema": param_schema + }) + + # Request body + if self.req_body_type == "json" and self._parsed_req_body_schema: + endpoint_dict["requestBody"] = { + "content": { + "application/json": { + "schema": self._parsed_req_body_schema + } + } + } + elif self.req_body_type == "form" and self.req_body_form: + properties = {} + required_form_params = [] + for form_param in self.req_body_form: + name = form_param.get("name") + if not name: continue + properties[name] = { + "type": "string", # YAPI form params are typically strings, file uploads are different + "description": form_param.get("desc","") + } + if form_param.get("example"): properties[name]["example"] = form_param.get("example") + if form_param.get("required") == "1": 
required_form_params.append(name) + + endpoint_dict["requestBody"] = { + "content": { + "application/x-www-form-urlencoded": { + "schema": { + "type": "object", + "properties": properties, + "required": required_form_params if required_form_params else None # OpenAPI: omit if empty + } + } + # YAPI also supports req_body_type = 'file', which would map to multipart/form-data + # This example focuses on json and basic form. + } + } + # Add other req_body_types if necessary (e.g., raw, file) + + # Responses + # YAPI has a simpler response structure. We'll map its res_body to a default success response (e.g., 200 or 201). + default_success_status = "200" + if self.method == "POST": default_success_status = "201" # Common practice for POST success + + if self.res_body_type == "json" and self._parsed_res_body_schema: + endpoint_dict["responses"][default_success_status] = { + "description": "Successful Operation (from YAPI res_body)", + "content": { + "application/json": { + "schema": self._parsed_res_body_schema + } + } + } + elif self.res_body_type == "json" and not self._parsed_res_body_schema and self.res_body: # Schema parsing failed but text exists + endpoint_dict["responses"][default_success_status] = { + "description": "Successful Operation (Schema parsing error, raw text might be available)", + "content": {"application/json": {"schema": {"type": "object", "description": "Schema parsing failed for YAPI res_body."}}} # Placeholder + } + else: # No JSON schema, or other res_body_type + endpoint_dict["responses"][default_success_status] = { + "description": "Successful Operation (No specific schema provided in YAPI for this response)" + } + + # Ensure there's always a default response if nothing specific was added + if not endpoint_dict["responses"]: + endpoint_dict["responses"]["default"] = {"description": "Default response from YAPI definition"} + + return endpoint_dict + + def __repr__(self): + return f"" + +class SwaggerEndpoint(BaseEndpoint): # Inherit from BaseEndpoint + def __init__(self, path: str, method: str, data: Dict[str, Any], global_spec: Dict[str, Any]): + super().__init__(method=method.upper(), path=path) + self._raw_data = data + self._global_spec = global_spec # Store for $ref resolution + self.summary: Optional[str] = data.get("summary") + self.description: Optional[str] = data.get("description") + self.operation_id: Optional[str] = data.get("operationId") + self.tags: List[str] = data.get("tags", []) + # Parameters, requestBody, responses are processed by to_dict + + def _resolve_ref(self, ref_path: str) -> Optional[Dict[str, Any]]: + """Resolves a $ref path within the global OpenAPI/Swagger spec.""" + if not ref_path.startswith("#/"): + logger.warning(f"Unsupported $ref path: {ref_path}. Only local refs '#/...' are currently supported.") + return None + + parts = ref_path[2:].split('/') # Remove '#/' and split + current_level = self._global_spec + try: + for part in parts: + # Decode URI component encoding if present (e.g. "~0" for "~", "~1" for "/") + part = part.replace("~1", "/").replace("~0", "~") + current_level = current_level[part] + # It's crucial to return a copy if the resolved ref will be modified, + # or ensure modifications happen on copies later. + # For now, returning as is, assuming downstream processing is careful or uses copies. 
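+            # If downstream code ever does mutate a resolved schema, one option is to
+            # return a deep copy here instead (e.g. `return copy.deepcopy(current_level)`,
+            # using the `copy` module already imported above) so the shared spec stays intact.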
+ if isinstance(current_level, dict): + return current_level # Potentially json.loads(json.dumps(current_level)) for a deep copy + else: # Resolved to a non-dict, which might be valid for some simple refs but unusual for schemas + logger.warning(f"$ref '{ref_path}' resolved to a non-dictionary type: {type(current_level)}. Value: {str(current_level)[:100]}") + return {"type": "string", "description": f"Resolved $ref '{ref_path}' to non-dict: {str(current_level)[:100]}"} # Placeholder + except (KeyError, TypeError, AttributeError) as e: + logger.error(f"Failed to resolve $ref '{ref_path}': {e}", exc_info=True) + return None + + def _process_schema_or_ref(self, schema_like: Any) -> Optional[Dict[str, Any]]: + """ + Processes a schema part, resolving $refs and recursively processing nested structures. + Returns a new dictionary with resolved refs, or None if resolution fails badly. + """ + if not isinstance(schema_like, dict): + if schema_like is None: return None + logger.warning(f"Expected a dictionary for schema processing, got {type(schema_like)}. Value: {str(schema_like)[:100]}") + return {"type": "string", "description": f"Schema was not a dict: {str(schema_like)[:100]}"} # Placeholder for non-dict schema + + # If it's a $ref, resolve it. + if "$ref" in schema_like: + return self._resolve_ref(schema_like["$ref"]) # This will be the new base schema_like + + # Create a copy to avoid modifying the original spec during processing + processed_schema = schema_like.copy() + + # Recursively process 'properties' for object schemas + if "properties" in processed_schema and isinstance(processed_schema["properties"], dict): + new_properties = {} + for prop_name, prop_schema in processed_schema["properties"].items(): + resolved_prop = self._process_schema_or_ref(prop_schema) + if resolved_prop is not None: # Only add if resolution was successful + new_properties[prop_name] = resolved_prop + # else: logger.warning(f"Failed to process property '{prop_name}' in {self.operation_id or self.path}") + processed_schema["properties"] = new_properties + + # Recursively process 'items' for array schemas + if "items" in processed_schema and isinstance(processed_schema["items"], dict): # 'items' should be a schema object + resolved_items = self._process_schema_or_ref(processed_schema["items"]) + if resolved_items is not None: + processed_schema["items"] = resolved_items + # else: logger.warning(f"Failed to process 'items' schema in {self.operation_id or self.path}") + + # Handle allOf, anyOf, oneOf by trying to merge or process them (simplistic merge for allOf) + # This is a complex area of JSON Schema. This is a very basic attempt. + if "allOf" in processed_schema and isinstance(processed_schema["allOf"], list): + merged_all_of_props = {} + merged_all_of_required = set() + temp_schema_for_all_of = {"type": processed_schema.get("type", "object"), "properties": {}, "required": []} + + for sub_schema_data in processed_schema["allOf"]: + resolved_sub_schema = self._process_schema_or_ref(sub_schema_data) + if resolved_sub_schema and isinstance(resolved_sub_schema, dict): + if "properties" in resolved_sub_schema: + temp_schema_for_all_of["properties"].update(resolved_sub_schema["properties"]) + if "required" in resolved_sub_schema and isinstance(resolved_sub_schema["required"], list): + merged_all_of_required.update(resolved_sub_schema["required"]) + # Copy other top-level keywords from the resolved_sub_schema if needed, e.g. 
+                    for key, value in resolved_sub_schema.items():
+                        if key not in ["properties", "required", "type", "$ref", "allOf", "anyOf", "oneOf"]:
+                            if key not in temp_schema_for_all_of or temp_schema_for_all_of[key] is None: # prioritize existing values
+                                temp_schema_for_all_of[key] = value
+
+            if temp_schema_for_all_of["properties"]:
+                processed_schema["properties"] = {**processed_schema.get("properties", {}), **temp_schema_for_all_of["properties"]}
+            if merged_all_of_required:
+                current_required = set(processed_schema.get("required", []))
+                current_required.update(merged_all_of_required)
+                processed_schema["required"] = sorted(list(current_required))
+            del processed_schema["allOf"] # Remove allOf after processing
+            # Copy other merged attributes back to processed_schema
+            for key, value in temp_schema_for_all_of.items():
+                if key not in ["properties", "required", "type", "$ref", "allOf", "anyOf", "oneOf"]:
+                    if key not in processed_schema or processed_schema[key] is None:
+                        processed_schema[key] = value
+
+        # anyOf and oneOf are harder, as they represent choices; we do not expand them here,
+        # but we do make sure any $refs inside them are resolved.
+        for keyword in ["anyOf", "oneOf"]:
+            if keyword in processed_schema and isinstance(processed_schema[keyword], list):
+                processed_sub_list = []
+                for sub_item in processed_schema[keyword]:
+                    resolved_sub = self._process_schema_or_ref(sub_item)
+                    if resolved_sub:
+                        processed_sub_list.append(resolved_sub)
+                if processed_sub_list: # only update if some were resolved
+                    processed_schema[keyword] = processed_sub_list
+
+        return processed_schema
+
+    def to_dict(self) -> Dict[str, Any]:
+        endpoint_data = {
+            "method": self.method,
+            "path": self.path,
+            "summary": self.summary or "",
+            "title": self.summary or self.operation_id or "", # Fallback for title
+            "description": self.description or "",
+            "operationId": self.operation_id or f"{self.method.lower()}_{self.path.replace('/', '_').replace('{', '').replace('}', '')}",
+            "tags": self.tags,
+            "parameters": [],
+            "requestBody": None,
+            "responses": {},
+            "_source_format": "swagger/openapi",
+            "_swagger_raw_data": self._raw_data, # Keep raw for debugging
+            "_global_api_spec_for_resolution": self._global_spec # For test cases that might need to resolve further
+        }
+
+        # Process parameters
+        if "parameters" in self._raw_data and isinstance(self._raw_data["parameters"], list):
+            for param_data_raw in self._raw_data["parameters"]:
+                # Each param_data_raw could itself be a $ref or contain a schema that is a $ref
+                processed_param_container = self._process_schema_or_ref(param_data_raw)
+                if processed_param_container and isinstance(processed_param_container, dict):
+                    # If the parameter itself was a $ref, processed_param_container is the resolved object;
+                    # the recursive _process_schema_or_ref call also resolves any nested 'schema' $refs.
+                    if "schema" in processed_param_container and isinstance(processed_param_container["schema"], dict):
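+                        # e.g. a raw parameter such as {"name": "petId", "in": "path", "required": True,
+                        #      "schema": {"$ref": "#/components/schemas/PetId"}} (hypothetical)
+                        #      arrives here with the nested $ref already resolved inline.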
+                        pass # Already processed by the main call to _process_schema_or_ref on param_data_raw
+                    elif "content" in processed_param_container: # Parameter described by a Content Object (OpenAPI 3.x)
+                        pass # Content object schemas should have been resolved by _process_schema_or_ref
+
+                    endpoint_data["parameters"].append(processed_param_container)
+
+        # Process requestBody
+        if "requestBody" in self._raw_data and isinstance(self._raw_data["requestBody"], dict):
+            processed_rb = self._process_schema_or_ref(self._raw_data["requestBody"])
+            if processed_rb:
+                endpoint_data["requestBody"] = processed_rb
+
+        # Process responses
+        if "responses" in self._raw_data and isinstance(self._raw_data["responses"], dict):
+            for status_code, resp_data_raw in self._raw_data["responses"].items():
+                processed_resp = self._process_schema_or_ref(resp_data_raw)
+                if processed_resp:
+                    endpoint_data["responses"][status_code] = processed_resp
+                elif resp_data_raw: # If processing failed but raw data exists, keep the raw data (less ideal)
+                    endpoint_data["responses"][status_code] = resp_data_raw
+                    logger.warning(f"Kept raw response data for {status_code} due to processing failure for {self.operation_id or self.path}")
+
+        if not endpoint_data["responses"]: # Ensure a default response if none were processed
+            endpoint_data["responses"]["default"] = {"description": "Default response from Swagger/OpenAPI definition"}
+
+        return endpoint_data
+
+    def __repr__(self):
+        return f"<SwaggerEndpoint {self.method} {self.path} operationId='{self.operation_id}'>"
+
+class DMSEndpoint(BaseEndpoint):
+    """Represents an API endpoint discovered dynamically from the DMS service."""
+    def __init__(self, method: str, path: str, title: str,
+                 request_body: Optional[Dict[str, Any]],
+                 responses: Dict[str, Any],
+                 parameters: Optional[List[Dict[str, Any]]] = None,
+                 category_name: Optional[str] = None,
+                 raw_record: Optional[Dict[str, Any]] = None,
+                 test_mode: str = 'standalone',
+                 operation_id: Optional[str] = None,
+                 model_pk_name: Optional[str] = None,
+                 identity_id_list: Optional[List[str]] = None):
+        super().__init__(method=method.upper(), path=path)
+        self.title = title
+        self.request_body = request_body
+        self.responses = responses
+        self.parameters = parameters if parameters is not None else []
+        self.category_name = category_name
+        self._raw_record = raw_record
+        self.test_mode = test_mode
+        self.operation_id = operation_id or f"{self.method.lower()}_{self.category_name or 'dms'}_{title.replace(' ', '_')}"
+        self.model_pk_name = model_pk_name
+        self.identity_id_list = identity_id_list or []
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Converts the DMS endpoint data into a standardized OpenAPI-like dictionary."""
+        endpoint_dict = {
+            "method": self.method,
+            "path": self.path,
+            "title": self.title,
+            "summary": self.title,
+            "description": self.title,
+            "operationId": self.operation_id,
+            "tags": [self.category_name] if self.category_name else [],
+            "parameters": self.parameters,
+            "requestBody": self.request_body,
+            "responses": self.responses,
+            "_source_format": "dms",
+            "_dms_raw_record": self._raw_record,
+            "_test_mode": self.test_mode,
+            "_dms_model_pk_name": self.model_pk_name
+        }
+        return endpoint_dict
+
+    def __repr__(self):
+        return f"<DMSEndpoint {self.method} {self.path} title='{self.title}'>"
+
+Endpoint = Union[YAPIEndpoint, SwaggerEndpoint, DMSEndpoint]
+
+class ParsedAPISpec:
+    """Base class for a parsed API specification from any source."""
+    def __init__(self, spec_type: str, endpoints: List[Endpoint], spec: Dict[str, Any]):
+        self.spec_type = spec_type
+        self.endpoints = endpoints
+        self.spec = spec # Store the original full spec dictionary, useful for $ref resolution if not pre-resolved
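+
+# Usage sketch (hypothetical file path; InputParser is defined below):
+#     parser = InputParser()
+#     parsed = parser.parse_swagger_spec("specs/petstore.json")
+#     if parsed:
+#         for ep in parsed.endpoints:
+#             print(ep.method, ep.path)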
+
+class ParsedYAPISpec(ParsedAPISpec):
+    """A parsed YAPI specification."""
+    def __init__(self, endpoints: List[YAPIEndpoint], categories: List[Dict[str, Any]], spec: Dict[str, Any]):
+        super().__init__(spec_type="yapi", endpoints=endpoints, spec=spec)
+        self.categories = categories
+
+class ParsedSwaggerSpec(ParsedAPISpec):
+    """A parsed Swagger/OpenAPI specification."""
+    def __init__(self, endpoints: List[SwaggerEndpoint], tags: List[Dict[str, Any]], spec: Dict[str, Any]):
+        super().__init__(spec_type="swagger", endpoints=endpoints, spec=spec)
+        self.tags = tags
+
+class ParsedDMSSpec(ParsedAPISpec):
+    """Parsed specification from the dynamic DMS source."""
+    def __init__(self, endpoints: List[DMSEndpoint], spec: Dict[str, Any]):
+        super().__init__(spec_type="dms", endpoints=endpoints, spec=spec)
+
+class InputParser:
+    """Parses inputs (such as YAPI JSON exports) and extracts API endpoint information."""
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+    def parse_yapi_spec(self, file_path: str) -> Optional[ParsedYAPISpec]:
+        self.logger.info(f"Parsing YAPI spec from: {file_path}")
+        all_endpoints: List[YAPIEndpoint] = []
+        yapi_categories: List[Dict[str, Any]] = []
+        raw_spec_data_list: Optional[List[Dict[str, Any]]] = None # A YAPI export is a list of categories
+        try:
+            with open(file_path, 'r', encoding='utf-8') as f:
+                raw_spec_data_list = json.load(f)
+
+            if not isinstance(raw_spec_data_list, list):
+                self.logger.error(f"YAPI spec file {file_path} does not contain a JSON list as expected for categories.")
+                return None
+
+            for category_data in raw_spec_data_list:
+                if not isinstance(category_data, dict):
+                    self.logger.warning(f"Skipping non-dictionary item in YAPI spec list: {str(category_data)[:100]}")
+                    continue
+                cat_name = category_data.get("name")
+                cat_id = category_data.get("_id", category_data.get("id")) # YAPI uses _id
+                yapi_categories.append({"name": cat_name, "description": category_data.get("desc"), "id": cat_id})
+
+                for endpoint_data in category_data.get("list", []):
+                    if not isinstance(endpoint_data, dict):
+                        self.logger.warning(f"Skipping non-dictionary endpoint item in category '{cat_name}': {str(endpoint_data)[:100]}")
+                        continue
+                    try:
+                        yapi_endpoint = YAPIEndpoint(endpoint_data, category_name=cat_name, category_id=cat_id)
+                        all_endpoints.append(yapi_endpoint)
+                    except Exception as e_ep:
+                        self.logger.error(f"Error processing YAPI endpoint data (ID: {endpoint_data.get('_id', 'N/A')}, Title: {endpoint_data.get('title', 'N/A')}). Error: {e_ep}", exc_info=True)
+
+            # The 'spec' for ParsedYAPISpec should be a dict representing the whole document.
+            # Since a YAPI export is a list of categories, we wrap it.
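+            # Abridged example of the expected export shape (hypothetical values):
+            # [{"name": "User APIs", "_id": 12, "desc": "...",
+            #   "list": [{"title": "Login", "method": "POST", "path": "/user/login", ...}]}]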
+            yapi_full_spec_dict = {"yapi_categories": raw_spec_data_list}
+            return ParsedYAPISpec(endpoints=all_endpoints, categories=yapi_categories, spec=yapi_full_spec_dict)
+        except FileNotFoundError:
+            self.logger.error(f"YAPI spec file not found: {file_path}")
+        except json.JSONDecodeError as e:
+            self.logger.error(f"Error decoding JSON from YAPI spec file {file_path}: {e}")
+        except Exception as e:
+            self.logger.error(f"An unexpected error occurred while parsing YAPI spec {file_path}: {e}", exc_info=True)
+        return None
+
+    def parse_swagger_spec(self, file_path: str) -> Optional[ParsedSwaggerSpec]:
+        self.logger.info(f"Parsing Swagger/OpenAPI spec from: {file_path}")
+        all_endpoints: List[SwaggerEndpoint] = []
+        swagger_tags: List[Dict[str, Any]] = []
+        raw_spec_data_dict: Optional[Dict[str, Any]] = None # A Swagger/OpenAPI spec is a single root object
+        try:
+            with open(file_path, 'r', encoding='utf-8') as f:
+                # TODO: Add YAML support if needed, e.g., using PyYAML
+                raw_spec_data_dict = json.load(f)
+
+            if not isinstance(raw_spec_data_dict, dict):
+                self.logger.error(f"Swagger spec file {file_path} does not contain a JSON object as expected.")
+                return None
+
+            swagger_tags = raw_spec_data_dict.get("tags", [])
+            paths = raw_spec_data_dict.get("paths", {})
+
+            for path, path_item_obj in paths.items():
+                if not isinstance(path_item_obj, dict): continue
+                for method, operation_obj in path_item_obj.items():
+                    # Common methods; can be extended
+                    if method.lower() not in ["get", "post", "put", "delete", "patch", "options", "head", "trace"]:
+                        continue # Skip non-operation keys such as path-level 'parameters' or vendor extensions
+                    if not isinstance(operation_obj, dict): continue
+                    try:
+                        # Pass the full raw_spec_data_dict for $ref resolution within SwaggerEndpoint
+                        swagger_endpoint = SwaggerEndpoint(path, method, operation_obj, global_spec=raw_spec_data_dict)
+                        all_endpoints.append(swagger_endpoint)
+                    except Exception as e_ep:
+                        self.logger.error(f"Error processing Swagger endpoint: {method.upper()} {path}. Error: {e_ep}", exc_info=True)
+
+            return ParsedSwaggerSpec(endpoints=all_endpoints, tags=swagger_tags, spec=raw_spec_data_dict)
+        except FileNotFoundError:
+            self.logger.error(f"Swagger spec file not found: {file_path}")
+        except json.JSONDecodeError as e:
+            self.logger.error(f"Error decoding JSON from Swagger spec file {file_path}: {e}")
+        except Exception as e:
+            self.logger.error(f"An unexpected error occurred while parsing Swagger spec {file_path}: {e}", exc_info=True)
+        return None
+
+    def parse_dms_spec(self, domain_mapping_path: str, base_url: str, headers: Optional[Dict[str, str]] = None, ignore_ssl: bool = False, page_size: int = 1000, page_no_start: int = 1, fetch_all_pages: bool = True) -> Tuple[Optional[ParsedDMSSpec], Dict[str, Any]]:
+        self.logger.info(f"Starting DMS spec parsing. Base URL: {base_url}, Domain Map: {domain_mapping_path}")
+
+        if ignore_ssl:
+            self.logger.warning("SSL certificate verification is disabled for DMS API calls. This is not recommended for production use.")
+            # Suppress the SSL warnings that would otherwise be emitted on every request
+            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+        headers = headers or {}
+
+        try:
+            with open(domain_mapping_path, 'r', encoding='utf-8') as f:
+                DOMAIN_MAP = json.load(f)
+        except (FileNotFoundError, json.JSONDecodeError) as e:
+            self.logger.warning(f"Could not load or parse domain map file '{domain_mapping_path}'. Using default domain. Error: {e}")
+            DOMAIN_MAP = {}
+
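+        # Assumed domain-map shape, matching the lookup below (hypothetical IDs):
+        #   {"wb": {"keywords": ["wb", "wellbore"]}, "log": {"keywords": ["log"]}}
+        # A flat {"domain_name": "instance_code"} mapping also works via the direct-mapping fallback.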
+        # 🔧 Build a keyword -> domain ID lookup table
+        keyword_to_domain_id = {}
+        for domain_id, domain_info in DOMAIN_MAP.items():
+            if isinstance(domain_info, dict) and 'keywords' in domain_info:
+                keywords = domain_info['keywords']
+                if isinstance(keywords, list):
+                    for keyword in keywords:
+                        keyword_to_domain_id[keyword] = domain_id
+                        self.logger.debug(f"Mapped keyword '{keyword}' -> domain ID '{domain_id}'")
+
+        # Fetch the API list, with pagination
+        if fetch_all_pages:
+            self.logger.info(f"Fetching ALL API pages with pagination (page_size={page_size}, starting from page {page_no_start})")
+        else:
+            self.logger.info(f"Fetching SINGLE page (page_size={page_size}, page_no={page_no_start})")
+
+        api_records = []
+        page_no = page_no_start
+        total_fetched = 0
+        pagination_info = {
+            "page_size": page_size,
+            "page_no_start": page_no_start,
+            "total_pages": 0,
+            "total_records": 0,
+            "pages_fetched": 0,
+            "current_page": page_no_start,
+            "fetch_all_pages": fetch_all_pages
+        }
+
+        try:
+            while True:
+                list_url = urljoin(base_url, f"/api/schema/manage/schema?pageNo={page_no}&pageSize={page_size}")
+                self.logger.debug(f"Fetching page {page_no} from: {list_url}")
+
+                response = requests.get(list_url, headers=headers, verify=not ignore_ssl)
+                response.raise_for_status()
+                api_list_data = response.json()
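+
+                # Expected page shape, inferred from the fields read below (values hypothetical):
+                # {"code": 0, "message": "...",
+                #  "data": {"records": [...], "total": 2315, "current": 3, "size": 1000}}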
+                # Check whether the business-level status code indicates success
+                list_code = api_list_data.get("code")
+                list_code_normalized = str(list_code).strip().lower() if list_code is not None else ""
+                if list_code_normalized not in {"0", "success", "ok", "200"}:
+                    self.logger.error(
+                        f"DMS API list endpoint returned a business error: {api_list_data.get('message')} (code={list_code})"
+                    )
+                    return None, {}
+
+                # Extract 'records' from the paginated structure
+                page_records = api_list_data.get("data", {}).get("records", [])
+                if not page_records:
+                    self.logger.info(f"No more records found on page {page_no}, stopping pagination")
+                    break
+
+                api_records.extend(page_records)
+                total_fetched += len(page_records)
+                self.logger.info(f"Fetched {len(page_records)} records from page {page_no}, total: {total_fetched}")
+
+                # Update the pagination info
+                data = api_list_data.get("data", {})
+                total_count = data.get("total", 0)
+                current_count = data.get("current", 0) * data.get("size", page_size)
+
+                # Record the totals on the first fetched page
+                if page_no == page_no_start:
+                    pagination_info["total_records"] = total_count
+                    pagination_info["total_pages"] = (total_count + page_size - 1) // page_size # Ceiling division
+
+                pagination_info["pages_fetched"] = page_no - page_no_start + 1
+                pagination_info["current_page"] = page_no
+
+                # In single-page mode, stop after the first page
+                if not fetch_all_pages:
+                    self.logger.info(f"Single page mode: fetched {len(page_records)} records from page {page_no}")
+                    break
+
+                # All-pages mode: check whether more pages remain
+                if current_count >= total_count or len(page_records) < page_size:
+                    self.logger.info(f"Reached end of data. Total records: {total_fetched}")
+                    break
+
+                page_no += 1
+
+                # Safety check to prevent an infinite loop
+                if page_no > 1000: # At most 1000 pages
+                    self.logger.warning("Reached maximum page limit (1000), stopping pagination")
+                    break
+
+            if not api_records:
+                self.logger.warning("DMS API list is empty after pagination.")
+                return ParsedDMSSpec(endpoints=[], spec={"dms_api_list": []}), pagination_info
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error(f"Failed to fetch API list from DMS: {e}")
+            return None, {}
+        except json.JSONDecodeError:
+            self.logger.error("Failed to decode JSON response from DMS API list.")
+            return None, {}
+
+        endpoints: List[DMSEndpoint] = []
+
+        for item in api_records:
+            domain_name = item.get('domain')
+            name = item.get('name')
+            model_id = item.get('id')
+            if not all(k in item for k in ['domain', 'name', 'id']):
+                self.logger.warning(f"Skipping an item in API list because it's missing 'domain', 'name', or 'id': {item}")
+                continue
+
+            # 🔧 Improved domain mapping: support exact keyword matches as well as prefix matches
+            instance_code = keyword_to_domain_id.get(domain_name)
+            if instance_code:
+                self.logger.info(f"Matched by exact keyword: '{domain_name}' -> '{instance_code}'")
+            else:
+                # Try prefix matching (e.g. wb_cd matches wb)
+                for keyword, domain_id in keyword_to_domain_id.items():
+                    if domain_name.startswith(keyword + '_'): # e.g. wb_cd starts with wb_
+                        instance_code = domain_id
+                        self.logger.info(f"Matched by keyword prefix: '{domain_name}' -> '{instance_code}' (keyword: '{keyword}')")
+                        break
+
+            if not instance_code:
+                # Fall back to the original direct mapping
+                instance_code = DOMAIN_MAP.get(domain_name, domain_name)
+                self.logger.info(f"Using direct mapping: '{domain_name}' -> '{instance_code}'")
+            model_url = urljoin(base_url, f"/api/schema/manage/schema/{model_id}")
+            self.logger.info(f"Fetching model for '{name}' from: {model_url}")
+
+            try:
+                response = requests.get(model_url, headers=headers, verify=not ignore_ssl)
+                response.raise_for_status()
+                model_schema_response = response.json()
+            except requests.exceptions.RequestException as e:
+                self.logger.error(f"Error fetching model for '{name}': {e}")
+                continue
+            except json.JSONDecodeError:
+                self.logger.error(f"Failed to decode JSON for model '{name}'.")
+                continue
+
+            detail_code = model_schema_response.get('code')
+            detail_code_normalized = str(detail_code).strip().lower() if detail_code is not None else ""
+            if detail_code is not None and detail_code_normalized not in {"0", "success", "ok", "200"}:
+                self.logger.warning(
+                    f"Skipping API '{name}' due to business error when fetching schema: {model_schema_response.get('message')} (code={detail_code})"
+                )
+                continue
+
+            raw_model_data = model_schema_response.get('data')
+            if not raw_model_data:
+                self.logger.warning(f"Skipping API '{name}' due to missing 'data' section in schema response.")
+                continue
+
+            model = None
+            identity_id_list = None
+            version = item.get('version', '1.0.0')
+
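+            # Assumed detail-response shapes handled below (all hypothetical):
+            #   {"code": 0, "data": {"model": {...}}}
+            #   {"code": 0, "data": {"records": [{"model": {...}, "identityId": [...], "version": "1.0.0"}]}}
+            #   {"code": 0, "data": "{\"properties\": ...}"}   (model serialized as a JSON string)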
+            # Support multiple response structures
+            if isinstance(raw_model_data, dict):
+                # 1) data.model format
+                candidate = raw_model_data.get('model')
+                # 2) data.schema, or another name
+                candidate = candidate or raw_model_data.get('schema')
+                # 3) a model embedded in data.records[0]
+                if not candidate and 'records' in raw_model_data and isinstance(raw_model_data['records'], list) and raw_model_data['records']:
+                    record = raw_model_data['records'][0]
+                    if isinstance(record, dict):
+                        candidate = record.get('model') or record.get('schema') or record
+                        identity_id_list = identity_id_list or record.get('identityId')
+                        version = record.get('version', version)
+                # 4) data itself is the model
+                candidate = candidate or raw_model_data
+
+                # Handle a model supplied as a JSON string
+                if isinstance(candidate, str):
+                    try:
+                        candidate = json.loads(candidate)
+                    except json.JSONDecodeError:
+                        self.logger.warning(f"Schema for '{name}' is a string but not valid JSON; skipping this model.")
+                        candidate = None
+
+                if isinstance(candidate, dict):
+                    model = candidate
+                    identity_id_list = identity_id_list or raw_model_data.get('identityId')
+                    version = raw_model_data.get('version', version)
+                else:
+                    model = None
+            elif isinstance(raw_model_data, str):
+                try:
+                    model = json.loads(raw_model_data)
+                except json.JSONDecodeError:
+                    self.logger.warning(f"Schema for '{name}' returned as string is not valid JSON; skipping this model.")
+                    model = None
+
+            if not isinstance(model, dict):
+                self.logger.warning(f"Skipping API '{name}' because schema model could not be resolved to a dictionary.")
+                continue
+
+            # The newer endpoint may nest the actual model under model['model']
+            if 'properties' not in model and isinstance(model.get('model'), dict):
+                inner_model = model.get('model')
+                if isinstance(inner_model, dict) and 'properties' in inner_model:
+                    identity_id_list = identity_id_list or model.get('identityId') or inner_model.get('identityId')
+                    version = model.get('version', version)
+                    model = inner_model
+
+            # Some endpoints may serialize the schema into a field such as model['modelJson']
+            if 'properties' not in model:
+                json_field = None
+                for key in ('modelJson', 'schemaJson', 'model_content', 'schema_content'):
+                    if key in model:
+                        json_field = model[key]
+                        break
+                if json_field and isinstance(json_field, str):
+                    try:
+                        decoded_model = json.loads(json_field)
+                        if isinstance(decoded_model, dict) and 'properties' in decoded_model:
+                            model = decoded_model
+                    except json.JSONDecodeError:
+                        self.logger.debug(f"Field '{key}' for '{name}' is not valid JSON; ignoring.")
+
+            if not model or 'properties' not in model or not isinstance(model['properties'], dict) or not model['properties']:
+                self.logger.warning(f"Skipping API '{name}' due to missing or invalid 'properties' in resolved schema.")
+                continue
+
+            pk_name = None
+            # Find the primary key by looking for a top-level "identityId" array.
+            if identity_id_list is None:
+                identity_id_list = model.get("identityId")
+            if isinstance(identity_id_list, list) and len(identity_id_list) > 0:
+                candidate_pk_name = identity_id_list[0]
+                # 🔧 Verify that the field identityId points to actually exists in properties
+                if candidate_pk_name in model['properties']:
+                    pk_name = candidate_pk_name
+                    self.logger.info(f"Found identityId property '{pk_name}' for model '{name}'.")
+                else:
+                    self.logger.warning(f"identityId property '{candidate_pk_name}' not found in model properties for '{name}'. Will use fallback.")
+
+            # Normalize the primary-key ID list
+            if isinstance(identity_id_list, str):
+                identity_id_list = [identity_id_list]
+            elif not isinstance(identity_id_list, list):
+                identity_id_list = []
+
+            # Fall back to the original behavior if no identityId was found, or the identityId field doesn't exist
+            if not pk_name:
+                pk_name = next(iter(model['properties']), None)
+                if pk_name:
+                    self.logger.warning(f"No valid 'identityId' found for model '{name}'. Falling back to using the first property '{pk_name}' as the primary key.")
+
+            if not pk_name:
+                self.logger.warning(f"Skipping API '{name}' because no properties found in model to identify a primary key.")
+                continue
+
+            # 🔧 Verify once more that pk_name really exists in properties (belt and braces)
+            if pk_name not in model['properties']:
+                self.logger.error(f"Critical error: Primary key '{pk_name}' not found in model properties for '{name}'. Skipping this model.")
Skipping this model.") + continue + + pk_schema = model['properties'][pk_name] + + version = version or model.get('version', '1.0.0') + dms_instance_code = instance_code + category_name = domain_name + + success_response = { + "200": { "description": "Success", "content": {"application/json": {"schema": {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "boolean"}}}}}}} + + # Create Endpoint (POST) + create_path = f"/api/dms/{dms_instance_code}/v1/{name}" + create_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]} + endpoints.append(DMSEndpoint(path=create_path, method='post', title=f"Create {name}", request_body={'content': {'application/json': {'schema': create_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"create_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) + + # List Endpoint (POST) - 🔧 添加pageNo和pageSize查询参数 + list_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}" + list_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "array", "items": model}}} + list_parameters = [ + {'name': 'pageNo', 'in': 'query', 'required': False, 'description': '页码(从1开始)', 'schema': {'type': 'integer', 'default': 1}}, + {'name': 'pageSize', 'in': 'query', 'required': False, 'description': '分页大小(最大值200)', 'schema': {'type': 'integer', 'default': 1000}} + ] + # List请求体Schema(包含version字段,但不包含act字段) + list_request_body_schema = { + "type": "object", + "properties": { + "version": {"type": "string", "example": version}, + "isSearchCount": {"type": "boolean", "example": True}, + "query": { + "type": "object", + "properties": { + "fields": {"type": "array", "items": {"type": "string"}}, + "filter": {"type": "object"} + } + } + } + } + endpoints.append(DMSEndpoint(path=list_path, method='post', title=f"List {name}", request_body={'content': {'application/json': {'schema': list_request_body_schema}}}, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': list_response_schema}}}}, parameters=list_parameters, test_mode='standalone', operation_id=f"list_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list)) + + # Read Endpoint (GET) - 🔧 只为单主键模型生成查询详情接口 + if isinstance(identity_id_list, list) and len(identity_id_list) > 1: + # 🚫 多主键模型:不生成查询详情接口 + self.logger.info(f"跳过多主键模型 '{name}' 的查询详情接口生成,主键数量: {len(identity_id_list)}") + else: + # ✅ 单主键模型:生成查询详情接口 + read_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}/{{id}}" + read_parameters = [{'name': 'id', 'in': 'path', 'required': True, 'description': f'The ID of the {name}, maps to {pk_name}', 'schema': pk_schema}] + read_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": model}} + endpoints.append(DMSEndpoint(path=read_path, method='get', title=f"Read {name}", request_body=None, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': read_response_schema}}}}, parameters=read_parameters, test_mode='scenario_only', operation_id=f"read_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, 
+            # Create Endpoint (POST)
+            create_path = f"/api/dms/{dms_instance_code}/v1/{name}"
+            create_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]}
+            endpoints.append(DMSEndpoint(path=create_path, method='post', title=f"Create {name}", request_body={'content': {'application/json': {'schema': create_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"create_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))
+
+            # List Endpoint (POST) - 🔧 add pageNo and pageSize query parameters
+            list_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}"
+            list_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": {"type": "array", "items": model}}}
+            list_parameters = [
+                {'name': 'pageNo', 'in': 'query', 'required': False, 'description': 'Page number (starting from 1)', 'schema': {'type': 'integer', 'default': 1}},
+                {'name': 'pageSize', 'in': 'query', 'required': False, 'description': 'Page size (maximum 200)', 'schema': {'type': 'integer', 'default': 1000}}
+            ]
+            # List request-body schema (includes the version field, but not the act field)
+            list_request_body_schema = {
+                "type": "object",
+                "properties": {
+                    "version": {"type": "string", "example": version},
+                    "isSearchCount": {"type": "boolean", "example": True},
+                    "query": {
+                        "type": "object",
+                        "properties": {
+                            "fields": {"type": "array", "items": {"type": "string"}},
+                            "filter": {"type": "object"}
+                        }
+                    }
+                }
+            }
+            endpoints.append(DMSEndpoint(path=list_path, method='post', title=f"List {name}", request_body={'content': {'application/json': {'schema': list_request_body_schema}}}, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': list_response_schema}}}}, parameters=list_parameters, test_mode='standalone', operation_id=f"list_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))
+
+            # Read Endpoint (GET) - 🔧 only generate a detail endpoint for single-primary-key models
+            if isinstance(identity_id_list, list) and len(identity_id_list) > 1:
+                # 🚫 Multi-primary-key model: do not generate a detail endpoint
+                self.logger.info(f"Skipping detail endpoint generation for multi-primary-key model '{name}', primary key count: {len(identity_id_list)}")
+            else:
+                # ✅ Single-primary-key model: generate a detail endpoint
+                read_path = f"/api/dms/{dms_instance_code}/v1/{name}/{version}/{{id}}"
+                read_parameters = [{'name': 'id', 'in': 'path', 'required': True, 'description': f'The ID of the {name}, maps to {pk_name}', 'schema': pk_schema}]
+                read_response_schema = {"type": "object", "properties": {"code": {"type": "integer"}, "message": {"type": "string"}, "data": model}}
+                endpoints.append(DMSEndpoint(path=read_path, method='get', title=f"Read {name}", request_body=None, responses={'200': {'description': 'Successful Operation', 'content': {'application/json': {'schema': read_response_schema}}}}, parameters=read_parameters, test_mode='scenario_only', operation_id=f"read_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))
+                self.logger.info(f"Created single-primary-key read endpoint '{name}', path parameter: id")
+
+            # Update Endpoint (PUT)
+            update_path = f"/api/dms/{dms_instance_code}/v1/{name}"
+            update_request_body_schema = {"type": "object", "properties": {"version": {"type": "string", "example": version}, "act": {"type": "integer", "example": -1}, "data": {"type": "array", "items": model}}, "required": ["data"]}
+            endpoints.append(DMSEndpoint(path=update_path, method='put', title=f"Update {name}", request_body={'content': {'application/json': {'schema': update_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"update_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))
+
+            # Delete Endpoint (DELETE)
+            delete_path = f"/api/dms/{dms_instance_code}/v1/{name}"
+
+            # Choose the delete-schema structure based on the length of the identityId list
+            if isinstance(identity_id_list, list) and len(identity_id_list) > 1:
+                # Multiple primary keys: use an array of objects
+                delete_items_properties = {}
+                delete_required_fields = []
+                for pk_field in identity_id_list:
+                    if pk_field in model['properties']:
+                        delete_items_properties[pk_field] = model['properties'][pk_field]
+                        delete_required_fields.append(pk_field)
+
+                delete_request_body_schema = {
+                    "type": "object",
+                    "properties": {
+                        "version": {"type": "string", "example": version},
+                        "data": {
+                            "type": "array",
+                            "items": {
+                                "type": "object",
+                                "properties": delete_items_properties,
+                                "required": delete_required_fields
+                            }
+                        }
+                    },
+                    "required": ["data"]
+                }
+                self.logger.info(f"Created multi-primary-key delete endpoint '{name}', key fields: {identity_id_list}")
+            else:
+                # Single primary key: use an array of strings
+                delete_request_body_schema = {
+                    "type": "object",
+                    "properties": {
+                        "version": {"type": "string", "example": version},
+                        "data": {
+                            "type": "array",
+                            "items": {"type": "string"}
+                        }
+                    },
+                    "required": ["data"]
+                }
+                self.logger.info(f"Created single-primary-key delete endpoint '{name}', key field: {pk_name}")
+
+            endpoints.append(DMSEndpoint(path=delete_path, method='delete', title=f"Delete {name}", request_body={'content': {'application/json': {'schema': delete_request_body_schema}}}, responses=success_response, test_mode='scenario_only', operation_id=f"delete_{name}", category_name=category_name, raw_record=item, model_pk_name=pk_name, identity_id_list=identity_id_list))
+
+        # The 'spec' for ParsedDMSSpec should represent the whole document.
+        # We construct a dictionary holding all the raw data we fetched.
+        dms_full_spec_dict = {"dms_api_list": api_records}
+        return ParsedDMSSpec(endpoints=endpoints, spec=dms_full_spec_dict), pagination_info
+
+class DmsConfig:
+    def __init__(self, base_url: str, domain_map_file: str, headers: Optional[Dict[str, str]] = None):
+        self.base_url = base_url
+        self.domain_map_file = domain_map_file
+        self.headers = headers
+
+def get_endpoints_from_swagger(file_path: str) -> Tuple[List[SwaggerEndpoint], str]:
+    """
+    Parses a Swagger/OpenAPI JSON file and returns a list of SwaggerEndpoint objects
+    and the base path of the API.
+ """ + logger.info(f"Parsing Swagger/OpenAPI spec from: {file_path}") + all_endpoints: List[SwaggerEndpoint] = [] + swagger_tags: List[Dict[str, Any]] = [] + raw_spec_data_dict: Optional[Dict[str, Any]] = None # Swagger/OpenAPI is a single root object + try: + with open(file_path, 'r', encoding='utf-8') as f: + # TODO: Add YAML support if needed, e.g., using PyYAML + raw_spec_data_dict = json.load(f) + + if not isinstance(raw_spec_data_dict, dict): + logger.error(f"Swagger spec file {file_path} does not contain a JSON object as expected.") + return [], "" + + swagger_tags = raw_spec_data_dict.get("tags", []) + paths = raw_spec_data_dict.get("paths", {}) + + for path, path_item_obj in paths.items(): + if not isinstance(path_item_obj, dict): continue + for method, operation_obj in path_item_obj.items(): + # Common methods, can be extended + if method.lower() not in ["get", "post", "put", "delete", "patch", "options", "head", "trace"]: + continue # Skip non-standard HTTP methods or extensions like 'parameters' at path level + if not isinstance(operation_obj, dict): continue + try: + # Pass the full raw_spec_data_dict for $ref resolution within SwaggerEndpoint + swagger_endpoint = SwaggerEndpoint(path, method, operation_obj, global_spec=raw_spec_data_dict) + all_endpoints.append(swagger_endpoint) + except Exception as e_ep: + logger.error(f"Error processing Swagger endpoint: {method.upper()} {path}. Error: {e_ep}", exc_info=True) + + return all_endpoints, raw_spec_data_dict.get("basePath", "") if raw_spec_data_dict.get("basePath") else "" + except FileNotFoundError: + # It's better to log this error. Assuming a logger is available at self.logger + logger.error(f"Swagger spec file not found: {file_path}") + return [], "" + except json.JSONDecodeError as e: + logger.error(f"Error decoding JSON from Swagger spec file {file_path}: {e}") + return [], "" + except Exception as e: + logger.error(f"An unexpected error occurred while parsing {file_path}: {e}", exc_info=True) + return [], "" + + +def parse_input_to_endpoints(input_type: str, input_path: str, dms_config: DmsConfig = None) -> Tuple[List[Endpoint], str]: + """ + Parses input from a given type (YAPI, Swagger, DMS) and returns a list of Endpoint objects + and the base path of the API. 
+ """ + parser = InputParser() + if input_type == "yapi": + parsed_spec = parser.parse_yapi_spec(input_path) + if parsed_spec: + return parsed_spec.endpoints, "" # YAPI doesn't have a base path in the same sense as Swagger + elif input_type == "swagger": + # The standalone get_endpoints_from_swagger is simple, but for consistency let's use the parser + parsed_spec = parser.parse_swagger_spec(input_path) + if parsed_spec: + base_path = parsed_spec.spec.get("basePath", "") or "" + # servers URL might be more modern (OpenAPI 3) + if not base_path and "servers" in parsed_spec.spec and parsed_spec.spec["servers"]: + # Use the first server URL + base_path = parsed_spec.spec["servers"][0].get("url", "") + return parsed_spec.endpoints, base_path + elif input_type == "dms": + if dms_config: + parsed_spec = parser.parse_dms_spec(dms_config.domain_map_file, dms_config.base_url, dms_config.headers) + if parsed_spec: + return parsed_spec.endpoints, dms_config.base_url + else: + logger.error("DMS configuration not provided for DMS input type.") + return [], "" + else: + logger.error(f"Unsupported input type: {input_type}") + return [], "" + return [], "" \ No newline at end of file diff --git a/fastapi_server.py b/fastapi_server.py index 87072b7..be4d1ef 100644 --- a/fastapi_server.py +++ b/fastapi_server.py @@ -128,7 +128,7 @@ class TestConfig(BaseModel): # 过滤选项 strictness_level: str = Field("CRITICAL", description="测试严格等级", pattern="^(CRITICAL|HIGH|MEDIUM|LOW)$") - + ignore_ssl: bool = Field(True, description="是否忽略SSL证书错误", examples=[True, False]) @field_validator('base_url') @classmethod def validate_base_url(cls, v):