"""API Caller Module"""
import requests
import json  # Used to parse JSON bodies of error responses
from typing import Any, Dict, Optional, Union, List, Tuple  # Added Tuple
import shlex  # Added for shell quoting
import urllib.parse  # Moved import to top level for reuse

# Attempt to import curlify; when it is missing, cURL generation degrades to a
# placeholder string instead of failing the API call.
try:
    import curlify
except ImportError:
    curlify = None  # So we can check its availability

from pydantic import BaseModel, Field, HttpUrl

# It's a good practice to define input/output models,
# even for internal components, using Pydantic.


class APIRequest(BaseModel):
    """Description of a single HTTP request to be executed by ``APICaller``."""

    method: str  # GET, POST, PUT, DELETE, etc.
    url: HttpUrl
    headers: Optional[Dict[str, str]] = None
    params: Optional[Dict[str, Any]] = None
    json_data: Optional[Any] = None  # For POST/PUT with JSON body (can be dict, list, string, etc.)
    body: Optional[Any] = Field(default=None, description="Alias for json_data")  # Convenience alias for callers
    data: Optional[Any] = None  # For form data etc.
    timeout: int = 30  # seconds

    def model_post_init(self, __context):
        """Post-init hook: copy ``body`` into ``json_data`` when only ``body`` was given."""
        if self.body is not None and self.json_data is None:
            self.json_data = self.body


class APIResponse(BaseModel):
    """Result of an API call as returned to the caller."""

    status_code: int
    headers: Dict[str, str]
    content: bytes  # Raw content
    json_content: Optional[Any] = None  # Parsed JSON content if applicable
    elapsed_time: float  # in seconds


class APICallDetail(BaseModel):
    """Model to store detailed information about a single API call for logging."""

    request_method: str
    request_url: str
    request_headers: Dict[str, str]
    request_params: Optional[Dict[str, Any]] = None
    request_body: Optional[Any] = None
    curl_command: str
    response_status_code: int
    response_headers: Dict[str, str]
    response_body: Optional[Any] = None  # Could be str for non-JSON, or parsed JSON
    response_elapsed_time: float


def _is_json_content_type(headers: Dict[str, str]) -> bool:
    """Return True when *headers* carries a Content-Type starting with ``application/json``.

    The header *name* is matched case-insensitively: once the response headers
    have been copied into a plain ``dict`` the keys keep whatever casing the
    server sent (e.g. ``content-type``), so a case-sensitive ``.get`` can miss it.
    """
    for name, value in headers.items():
        if name.lower() == 'content-type':
            return str(value).startswith('application/json')
    return False


class APICaller:
    """
    Responsible for executing HTTP/S API calls to the DDMS services.
    """

    def __init__(self, default_timeout: int = 30, default_headers: Optional[Dict[str, str]] = None):
        """Store the defaults applied to every call.

        Args:
            default_timeout: Timeout in seconds used when a request does not
                explicitly set its own ``timeout``.
            default_headers: Headers merged into every request; request-specific
                headers win on conflict.
        """
        self.default_timeout = default_timeout
        self.default_headers = default_headers or {}
        # This session is only used to prepare requests for cURL generation;
        # the real call in call_api goes through requests.request.
        self._session_for_curlify = requests.Session()

    def _generate_curl_command(self, request_data: APIRequest, actual_headers: Dict[str, str]) -> str:
        """Generates an equivalent cURL command using the curlify library.

        Args:
            request_data: The request being executed.
            actual_headers: Default headers already merged with the
                request-specific headers.

        Returns:
            The cURL command string, ``"curlify_not_installed"`` when curlify is
            unavailable, or ``"curlify_generation_failed: <error>"`` when the
            command could not be produced. This method never raises: the cURL
            string is a logging aid and must not prevent the actual call.
        """
        if curlify is None:
            # Fallback message if curlify is not installed.
            print("ERROR: curlify library is not installed. Cannot generate cURL command.")
            return "curlify_not_installed"

        try:
            # Hand params to requests rather than concatenating a query string
            # by hand, so the URL in the cURL command is encoded exactly like
            # the request sent by call_api (list values, None values and URLs
            # that already contain a query string included).
            req = requests.Request(
                method=request_data.method.upper(),
                url=str(request_data.url),
                headers=actual_headers,  # already includes defaults + request-specific
                params=request_data.params,
                data=request_data.data,  # form data, if present
                json=request_data.json_data,  # JSON body, if present
            )
            # curlify works on a prepared request.
            prepared_request = self._session_for_curlify.prepare_request(req)

            # verify=False mirrors the verify=False passed to requests.request in call_api.
            curl_command_str = curlify.to_curl(prepared_request, verify=False)
            print(f"DEBUG: curlify generated command (raw): {curl_command_str}")  # Debug print
            print(f"DEBUG: curlify generated command (repr): {repr(curl_command_str)}")  # Added repr print
            return curl_command_str
        except Exception as e:
            # Best-effort by design: report and continue with a placeholder.
            print(f"ERROR: Failed to generate cURL command with curlify: {e}")
            return f"curlify_generation_failed: {e}"

    def _build_result(
        self,
        request_data: APIRequest,
        merged_headers: Dict[str, str],
        request_body_for_log: Any,
        curl_command: str,
        *,
        status_code: int,
        response_headers: Dict[str, str],
        content: bytes,
        json_content: Any,
        response_body_for_log: Any,
        elapsed_time: float,
    ) -> Tuple[APIResponse, APICallDetail]:
        """Assemble the (APIResponse, APICallDetail) pair for one call outcome."""
        api_response = APIResponse(
            status_code=status_code,
            headers=response_headers,
            content=content,
            json_content=json_content,
            elapsed_time=elapsed_time,
        )
        api_call_detail = APICallDetail(
            request_method=request_data.method.upper(),
            request_url=str(request_data.url),
            request_headers=merged_headers,
            request_params=request_data.params,
            request_body=request_body_for_log,
            curl_command=curl_command,
            response_status_code=status_code,
            response_headers=response_headers,
            response_body=response_body_for_log,
            response_elapsed_time=elapsed_time,
        )
        return api_response, api_call_detail

    def call_api(self, request_data: APIRequest) -> Tuple[APIResponse, APICallDetail]:
        """
        Makes an API call based on the provided request data.

        Transport-level failures (``requests.exceptions.RequestException``) are
        not raised; they are reported as a synthetic response whose content is
        the error message (status 503 unless the exception carries a response).

        Args:
            request_data: An APIRequest Pydantic model instance.

        Returns:
            A tuple containing:
            - An APIResponse Pydantic model instance.
            - An APICallDetail Pydantic model instance.
        """
        merged_headers = {**self.default_headers, **(request_data.headers or {})}

        # APIRequest.timeout always has a value (default 30), so the caller-wide
        # default can only apply when the request did not set timeout explicitly.
        # A falsy timeout (0) falls back to the default as well.
        if 'timeout' in request_data.model_fields_set and request_data.timeout:
            timeout = request_data.timeout
        else:
            timeout = self.default_timeout

        json_payload = request_data.json_data

        # Generate cURL command before making the request
        curl_command = self._generate_curl_command(request_data, merged_headers)

        request_body_for_log = None
        if json_payload is not None:
            request_body_for_log = json_payload
        elif request_data.data is not None:
            request_body_for_log = request_data.data  # Store as is, might be dict or str

        try:
            # SECURITY NOTE: verify=False disables TLS certificate verification.
            # Kept because existing behavior depends on it; review before using
            # against untrusted networks.
            response = requests.request(
                method=request_data.method.upper(),
                url=str(request_data.url),
                headers=merged_headers,
                params=request_data.params,
                json=json_payload,
                data=request_data.data,
                timeout=timeout,
                verify=False,
            )

            response_headers_dict = dict(response.headers)
            response_content_bytes = response.content

            parsed_json_content = None
            # Default to the decoded text; replaced by parsed JSON when available.
            response_body_for_log: Any = response_content_bytes.decode('utf-8', errors='replace')
            if _is_json_content_type(response_headers_dict):
                try:
                    parsed_json_content = response.json()
                    response_body_for_log = parsed_json_content
                except ValueError:
                    # Every JSON decode error raised by response.json() derives
                    # from ValueError, including on requests versions that lack
                    # requests.exceptions.JSONDecodeError. Keep the decoded string.
                    pass

            return self._build_result(
                request_data,
                merged_headers,
                request_body_for_log,
                curl_command,
                status_code=response.status_code,
                response_headers=response_headers_dict,
                content=response_content_bytes,
                json_content=parsed_json_content,
                response_body_for_log=response_body_for_log,
                elapsed_time=response.elapsed.total_seconds(),
            )

        except requests.exceptions.HTTPError as e:
            # Compare against None explicitly: a Response object is falsy for
            # 4xx/5xx statuses, so `if e.response` would discard exactly the
            # error responses this handler is meant to report.
            error_response = e.response
            if error_response is not None:
                status_code = error_response.status_code
                response_headers_dict = dict(error_response.headers)
                response_content_bytes = error_response.content
                elapsed = getattr(error_response, 'elapsed', None)
                response_elapsed_time = elapsed.total_seconds() if elapsed is not None else 0
            else:
                status_code = 500
                response_headers_dict = {}
                response_content_bytes = str(e).encode()
                response_elapsed_time = 0

            response_body_for_log = response_content_bytes.decode('utf-8', errors='replace')

            # Try to parse JSON from the error response, for logging
            parsed_json_content_error = None
            if _is_json_content_type(response_headers_dict):
                try:
                    parsed_json_content_error = json.loads(response_body_for_log)
                    response_body_for_log = parsed_json_content_error
                except json.JSONDecodeError:
                    pass  # Keep the decoded string

            return self._build_result(
                request_data,
                merged_headers,
                request_body_for_log,
                curl_command,
                status_code=status_code,
                response_headers=response_headers_dict,
                content=response_content_bytes,
                json_content=parsed_json_content_error,
                response_body_for_log=response_body_for_log,
                elapsed_time=response_elapsed_time,
            )

        except requests.exceptions.RequestException as e:
            # For other RequestExceptions, e.response might not exist or be partial.
            # getattr on None falls through to the default value.
            status_code = getattr(e.response, 'status_code', 503)  # 503 Service Unavailable seems fitting
            response_headers_dict = dict(getattr(e.response, 'headers', {}))
            response_content_str = str(e)

            return self._build_result(
                request_data,
                merged_headers,
                request_body_for_log,
                curl_command,
                status_code=status_code,
                response_headers=response_headers_dict,
                content=response_content_str.encode(),
                json_content=None,
                response_body_for_log=response_content_str,  # Log the error string
                elapsed_time=0,
            )


def _print_response_summary(response: APIResponse) -> None:
    """Print status plus parsed JSON (or raw content) and elapsed time for the examples."""
    if response.json_content:
        print(f"Status: {response.status_code}, Data: {response.json_content}")
    else:
        print(f"Status: {response.status_code}, Content: {response.content.decode()}")
    print(f"Time taken: {response.elapsed_time:.4f}s")


# Example Usage (can be moved to tests or main application logic)
if __name__ == '__main__':
    caller = APICaller(default_headers={"X-App-Name": "DDMSComplianceSuite"})

    # Example GET request
    get_req_data = APIRequest(
        method="GET",
        url=HttpUrl("https://jsonplaceholder.typicode.com/todos/1"),
        headers={"X-Request-ID": "12345"}
    )
    response, detail = caller.call_api(get_req_data)  # Unpack two values now
    print("GET Response:")
    _print_response_summary(response)
    print("GET Call Detail:")
    print(detail.model_dump_json(indent=2))  # Use model_dump_json for pretty print
    print("\n")

    # Example POST request with json_data
    post_req_data = APIRequest(
        method="POST",
        url=HttpUrl("https://jsonplaceholder.typicode.com/posts"),
        json_data={"title": "foo", "body": "bar", "userId": 1},
        headers={"Content-Type": "application/json; charset=UTF-8"}
    )
    response, detail = caller.call_api(post_req_data)
    print("POST Response with json_data:")
    _print_response_summary(response)
    print("POST Call Detail (json_data):")
    print(detail.model_dump_json(indent=2))

    # Example POST request with body (alias for json_data)
    post_req_data_with_body = APIRequest(
        method="POST",
        url=HttpUrl("https://jsonplaceholder.typicode.com/posts"),
        body={"title": "using body", "body": "testing body alias", "userId": 2},
        headers={"Content-Type": "application/json; charset=UTF-8"}
    )
    response, detail = caller.call_api(post_req_data_with_body)
    print("\nPOST Response with body:")
    _print_response_summary(response)
    print("POST Call Detail (body):")
    print(detail.model_dump_json(indent=2))

    # Example list body request (demonstrating array body support)
    array_req_data = APIRequest(
        method="POST",
        url=HttpUrl("https://jsonplaceholder.typicode.com/posts"),
        body=["test_string", "another_value"],
        headers={"Content-Type": "application/json; charset=UTF-8"}
    )
    response, detail = caller.call_api(array_req_data)
    print("\nPOST Response with array body:")
    _print_response_summary(response)
    print("POST Call Detail (array body):")
    print(detail.model_dump_json(indent=2))

    # Example Error request (non-existent domain)
    error_req_data = APIRequest(
        method="GET",
        url=HttpUrl("https://nonexistentdomain.invalid"),
    )
    response, detail = caller.call_api(error_req_data)
    print("\nError GET Response:")
    print(f"Status: {response.status_code}, Content: {response.content.decode()}")
    print(f"Time taken: {response.elapsed_time:.4f}s")
    print("Error GET Call Detail:")
    print(detail.model_dump_json(indent=2))