diff --git a/.DS_Store b/.DS_Store index a60196a..6fad486 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/assets/.DS_Store b/assets/.DS_Store index 418c7da..03cdea9 100644 Binary files a/assets/.DS_Store and b/assets/.DS_Store differ diff --git a/compliance-mcp-agent/__pycache__/response_utils.cpython-312.pyc b/compliance-mcp-agent/__pycache__/response_utils.cpython-312.pyc new file mode 100644 index 0000000..20cab1c Binary files /dev/null and b/compliance-mcp-agent/__pycache__/response_utils.cpython-312.pyc differ diff --git a/compliance-mcp-agent/agent_main_loop.py b/compliance-mcp-agent/agent_main_loop.py new file mode 100644 index 0000000..a031b07 --- /dev/null +++ b/compliance-mcp-agent/agent_main_loop.py @@ -0,0 +1,253 @@ +import asyncio +import json +import traceback +from typing import List, Dict, Any + +from mcp.client.streamable_http import streamablehttp_client +from mcp import ClientSession +from mcp.types import Tool, TextContent + +from llm.llm_service import LLMService + +# --- 配置区 --- +SERVER_ENDPOINTS = { + "api_caller": "http://127.0.0.1:8001/mcp", + "schema_validator": "http://127.0.0.1:8002/mcp", + "dms_provider": "http://127.0.0.1:8003/mcp", + "test_manager": "http://127.0.0.1:8004/mcp", +} +MAX_AGENT_LOOPS = 50 + +def mcp_tools_to_openai_format(mcp_tools: List[Tool]) -> List[Dict[str, Any]]: + """ + 将MCP工具列表转换为OpenAI工具格式。 + """ + openai_tools = [] + for tool in mcp_tools: + # tool is a mcp.types.Tool object, which has .name, .description, and .inputSchema + openai_tools.append({ + "type": "function", + "function": { + "name": tool.name, + "description": tool.description or "", + "parameters": tool.inputSchema or {"type": "object", "properties": {}} + } + }) + return openai_tools + +async def get_structured_response(tool_response: Any) -> Dict[str, Any]: + """ + 健壮地从工具调用响应中获取结构化内容。 + 能处理SDK未能自动解析JSON,而是将其放入TextContent的情况。 + """ + if tool_response.structuredContent: + # 正常情况,SDK已成功解析 + return tool_response.structuredContent + + # 异常情况:尝试从TextContent手动解析JSON + if tool_response.content and isinstance(tool_response.content[0], TextContent): + try: + json_text = tool_response.content[0].text + parsed_json = json.loads(json_text) + return parsed_json + except (json.JSONDecodeError, IndexError) as e: + # 如果手动解析也失败,则抛出致命错误 + raise RuntimeError(f"Failed to manually parse JSON from TextContent: {e}. Raw text: '{json_text}'") + + # 如果既没有structuredContent,也没有可解析的TextContent,则抛出致命错误 + raise RuntimeError("Tool call returned no structuredContent and no parsable TextContent.") + + +async def execute_task(task: Dict, tool_to_session_map: Dict, openai_tools: List[Dict]): + """ + 为一个通用的、由prompt驱动的任务执行完整的、隔离的测试生命周期。 + """ + llm_service = LLMService(tools=openai_tools) + + task_name = task['name'] + prompt = task['prompt'] + + print(f"\n>>>> Starting Task: {task_name} <<<<") + llm_service.start_new_task(prompt) + + # 针对当前任务的子任务循环 + for sub_loop in range(25): # 单个任务的测试循环上限 + print("\n" + "="*20 + f" Sub-Loop for '{task_name}' ({sub_loop+1}/25) " + "="*20) + + tool_name, tool_args, tool_call_id = llm_service.execute_completion() + + if not tool_name: + print(f"Agent: LLM did not request a tool call for task '{task_name}'. It might be confused. Ending task.") + # 即使LLM困惑,我们仍然尝试记录一个失败结果,如果record_test_result可用的话 + record_session = tool_to_session_map.get("record_test_result") + if record_session: + # 我们需要从prompt中猜测api_id,这很脆弱,但比什么都不做要好 + import re + match = re.search(r"API 模型 '([^']+)'", prompt) + api_id_guess = match.group(1) if match else "unknown" + await record_session.call_tool("record_test_result", {"api_id": api_id_guess, "task_name": task_name, "status": "failed", "details": "LLM got confused and stopped calling tools."}) + return # 结束此任务 + + # 核心逻辑:如果LLM调用了record_test_result,说明这个任务结束了 + if tool_name == "record_test_result": + print(f"Agent: LLM is recording result for task '{task_name}'. Task is complete.") + record_session = tool_to_session_map.get("record_test_result") + if record_session: + # 将任务名称加入到参数中,以便更好地跟踪 + tool_args['task_name'] = task_name + await record_session.call_tool(tool_name, tool_args) + return # 核心修复:使用return退出此任务的函数 + + if tool_name == "error_malformed_json": + error_info = tool_args + print(f"Agent: Detected a malformed JSON from LLM for tool '{error_info['tool_name']}'. Asking for correction.") + correction_request = f"你上次试图调用工具 '{error_info['tool_name']}',但提供的参数不是一个有效的JSON。错误是:{error_info['error']}。这是你提供的错误参数:'{error_info['malformed_arguments']}'。请修正这个错误,并重新调用该工具。" + llm_service.add_user_message(correction_request) + continue + + if tool_name in tool_to_session_map: + try: + target_session = tool_to_session_map[tool_name] + result = await target_session.call_tool(tool_name, tool_args) + + structured_result = await get_structured_response(result) + tool_result_str = json.dumps(structured_result, ensure_ascii=False, indent=2) if structured_result else "Tool executed successfully." + + print(f"Agent: Tool '{tool_name}' executed for '{task_name}'. Result: {tool_result_str}") + llm_service.add_tool_call_response(tool_call_id, tool_result_str) + except Exception as e: + error_message = f"An exception occurred while calling tool {tool_name} for '{task_name}': {e}" + print(f"Agent: {error_message}") + traceback.print_exc() + llm_service.add_tool_call_response(tool_call_id, error_message) + else: + error_message = f"Error: LLM tried to call an unknown tool '{tool_name}' for task '{task_name}'." + print(f"Agent: {error_message}") + llm_service.add_tool_call_response(tool_call_id, error_message) + + print(f"Agent: Reached sub-loop limit for task '{task_name}'. Recording as failed and moving on.") + record_session = tool_to_session_map.get("record_test_result") + if record_session: + import re + match = re.search(r"API 模型 '([^']+)'", prompt) + api_id_guess = match.group(1) if match else "unknown" + await record_session.call_tool("record_test_result", {"api_id": api_id_guess, "task_name": task_name, "status": "failed", "details": "Reached sub-loop limit."}) + + +async def main(): + print("LLM-Powered Agent starting...") + + # 使用 `async with` 来确保所有会话都能被正确关闭 + async with streamablehttp_client(SERVER_ENDPOINTS["api_caller"]) as (r1, w1, _), \ + streamablehttp_client(SERVER_ENDPOINTS["schema_validator"]) as (r2, w2, _), \ + streamablehttp_client(SERVER_ENDPOINTS["dms_provider"]) as (r3, w3, _), \ + streamablehttp_client(SERVER_ENDPOINTS["test_manager"]) as (r4, w4, _): + + print("Agent: All MCP server connections established.") + + async with ClientSession(r1, w1) as s1, ClientSession(r2, w2) as s2, ClientSession(r3, w3) as s3, ClientSession(r4, w4) as s4: + + await asyncio.gather(s1.initialize(), s2.initialize(), s3.initialize(), s4.initialize()) + + tool_to_session_map = {tool.name: s1 for tool in (await s1.list_tools()).tools} + tool_to_session_map.update({tool.name: s2 for tool in (await s2.list_tools()).tools}) + tool_to_session_map.update({tool.name: s3 for tool in (await s3.list_tools()).tools}) + tool_to_session_map.update({tool.name: s4 for tool in (await s4.list_tools()).tools}) + + all_mcp_tools = list(tool_to_session_map.keys()) + print(f"Total tools found: {len(all_mcp_tools)}") + + openai_tools = mcp_tools_to_openai_format([tool for session in [s1, s2, s3, s4] for tool in (await session.list_tools()).tools]) + print("Agent: LLM Service tools prepared.") + + # --- Agent主导的宏观测试流程 --- + + # 1. 获取所有待测试的API + print("\n" + "="*20 + " Phase 1: Fetching APIs " + "="*20) + get_api_list_session = tool_to_session_map.get("get_api_list") + if not get_api_list_session: + raise RuntimeError("Critical Error: 'get_api_list' tool not found.") + + api_list_result = await get_api_list_session.call_tool("get_api_list", {}) + api_list_structured = await get_structured_response(api_list_result) + response_data = api_list_structured.get("result", api_list_structured) + api_records = response_data.get('records', []) + api_ids_to_test = [record['id'] for record in api_records if 'id' in record] + + if not api_ids_to_test: + raise RuntimeError(f"Critical Error: DMSProviderServer returned an empty list of APIs.") + print(f"Agent: Found {len(api_ids_to_test)} APIs to test: {api_ids_to_test}") + + # 2. 加载任务模板 + print("\n" + "="*20 + " Phase 2: Loading Task Templates " + "="*20) + try: + with open('compliance-mcp-agent/tasks.json', 'r', encoding='utf-8') as f: + task_templates = json.load(f) + print(f"Agent: Loaded {len(task_templates)} task templates.") + except FileNotFoundError: + raise RuntimeError("Critical Error: 'tasks.json' not found in 'compliance-mcp-agent/' directory.") + except json.JSONDecodeError as e: + raise RuntimeError(f"Critical Error: Failed to parse 'tasks.json'. Error: {e}") + + # 3. 初始化测试计划 + print("\n" + "="*20 + " Phase 3: Initializing Test Plan " + "="*20) + initialize_plan_session = tool_to_session_map.get("initialize_test_plan") + if not initialize_plan_session: + raise RuntimeError("Critical Error: 'initialize_test_plan' tool not found.") + + total_task_count = len(api_ids_to_test) * len(task_templates) + print(f"Agent: Initializing test plan for {total_task_count} total tasks ({len(api_ids_to_test)} APIs x {len(task_templates)} templates)...") + init_result = await initialize_plan_session.call_tool("initialize_test_plan", {"api_ids": api_ids_to_test}) + init_structured = await get_structured_response(init_result) + init_response_data = init_structured.get("result", init_structured) + if init_response_data.get("status") != "success": + raise RuntimeError(f"Failed to initialize test plan. Reason: {init_response_data.get('message')}") + print("Agent: Test plan initialized successfully in TestManager.") + + # 4. 主执行循环 (M x N) + print("\n" + "="*20 + " Phase 4: Main Execution Loop " + "="*20) + + execution_tasks = [] + for api_id in api_ids_to_test: + for template in task_templates: + # 动态生成任务 + final_prompt = template['prompt_template'].format(api_id=api_id) + task_name_with_api = f"{template['name']} for {api_id}" + + task_to_run = { + "name": task_name_with_api, + "prompt": final_prompt + } + + # 为每个任务创建一个异步执行协程 + execution_tasks.append( + execute_task( + task=task_to_run, + tool_to_session_map=tool_to_session_map, + openai_tools=openai_tools + ) + ) + + # 并发执行所有生成的任务 + await asyncio.gather(*execution_tasks) + print("\nAll generated tasks have concluded.") + + # 5. 最终总结 + print("\n" + "="*20 + " Phase 5: Final Summary " + "="*20) + summary_session = tool_to_session_map.get("get_test_summary") + if summary_session: + summary_result = await summary_session.call_tool("get_test_summary", {}) + summary_structured = await get_structured_response(summary_result) + summary_data = summary_structured.get("result", summary_structured) + print("Final Test Summary:") + print(json.dumps(summary_data, indent=2, ensure_ascii=False)) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nAgent manually interrupted.") + except Exception as e: + print(f"\nAn unexpected error occurred in main: {e}") + traceback.print_exc() \ No newline at end of file diff --git a/compliance-mcp-agent/llm/__pycache__/llm_service.cpython-312.pyc b/compliance-mcp-agent/llm/__pycache__/llm_service.cpython-312.pyc new file mode 100644 index 0000000..8d3bade Binary files /dev/null and b/compliance-mcp-agent/llm/__pycache__/llm_service.cpython-312.pyc differ diff --git a/compliance-mcp-agent/llm/llm_service.py b/compliance-mcp-agent/llm/llm_service.py new file mode 100644 index 0000000..80b112f --- /dev/null +++ b/compliance-mcp-agent/llm/llm_service.py @@ -0,0 +1,103 @@ +import openai +from typing import List, Dict, Any, Tuple +import json +import traceback +import os + +class LLMService: + def __init__(self, model_name="qwen-plus", tools=None): + if tools is None: + tools = [] + self.api_key = os.getenv("OPENAI_API_KEY") + if not self.api_key: + raise ValueError("API key not found. Please set the OPENAI_API_KEY environment variable.") + + self.client = openai.OpenAI(api_key=self.api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1") + self.model_name = model_name + self.tools = tools + self.system_prompt = {"role": "system", "content": "你是一个智能API测试Agent。你的任务是根据用户的要求,通过自主、连续地调用给定的工具来完成API的自动化测试。请仔细分析每一步的结果,并决定下一步应该调用哪个工具。"} + self.messages: List[Dict[str, Any]] = [self.system_prompt] + + def start_new_task(self, task_description: str): + """ + 开始一个新任务,这会重置对话历史,但保留最后的工具调用(如果有的话), + 以提供任务切换的上下文。 + """ + print(f"\n{'='*25} Starting New Task Context {'='*25}") + print(f"Task Description: {task_description}") + + last_tool_call_response = None + if len(self.messages) > 1 and self.messages[-1]["role"] == "tool": + last_tool_call_response = self.messages[-1] + + self.messages = [self.system_prompt] + if last_tool_call_response: + self.messages.append(last_tool_call_response) + print(f"Preserving last tool response for context: {last_tool_call_response['name']}") + + self.add_user_message(task_description) + print(f"{'='*72}\n") + + def add_user_message(self, content: str): + self.messages.append({"role": "user", "content": content}) + + def add_tool_call_response(self, tool_call_id: str, content: str): + self.messages.append( + { + "tool_call_id": tool_call_id, + "role": "tool", + "name": tool_call_id, # 名字可以和ID一样,重要的是ID要匹配 + "content": content, + } + ) + + def get_last_assistant_message(self) -> str: + for msg in reversed(self.messages): + if msg["role"] == "assistant" and msg.get("content"): + return msg["content"] + return "No final response from assistant." + + def execute_completion(self) -> Tuple[str, dict]: + print("\n" + "="*25 + " LLM Request " + "="*25) + print(json.dumps({"model": self.model_name, "messages": self.messages, "tools": self.tools}, ensure_ascii=False, indent=2)) + print("="*71) + + try: + response = self.client.chat.completions.create( + model=self.model_name, + messages=self.messages, + tools=self.tools, + tool_choice="auto", + ) + + print("\n" + "="*25 + " LLM Response " + "="*25) + print(response) + print("="*72) + + response_message = response.choices[0].message + tool_calls = response_message.tool_calls + + if tool_calls: + tool_call = tool_calls[0] + tool_name = tool_call.function.name + tool_call_id = tool_call.id + + try: + tool_args = json.loads(tool_call.function.arguments) + # 将成功的tool_call添加到历史记录 + self.messages.append(response_message.model_dump(exclude_unset=True)) + return tool_name, tool_args, tool_call_id + except json.JSONDecodeError as e: + error_msg = f"LLM generated malformed JSON for tool arguments: {tool_call.function.arguments}. Error: {e}" + print(error_msg) + # 不将错误的assistant消息加入历史,而是返回错误信号 + return "error_malformed_json", {"tool_name": tool_name, "malformed_arguments": tool_call.function.arguments, "error": str(e)}, None + + # 如果没有工具调用,就将assistant的回复加入历史 + self.messages.append(response_message.model_dump(exclude_unset=True)) + return None, None, None + + except Exception as e: + print(f"Error calling LLM API: {e}") + traceback.print_exc() + return None, None, None \ No newline at end of file diff --git a/compliance-mcp-agent/memory-bank/activeContext.md b/compliance-mcp-agent/memory-bank/activeContext.md new file mode 100644 index 0000000..eb2f2bf --- /dev/null +++ b/compliance-mcp-agent/memory-bank/activeContext.md @@ -0,0 +1,39 @@ +# 活动上下文 + +## 当前工作焦点 + +我们正处于 **架构验证和问题修复的关键阶段**。在初步搭建了包含多个服务器(`APICaller`, `SchemaValidator`, `DMSProvider`, `TestManager`)的完整 MCP 架构后,我们遇到了一个**持续性的、与文件路径解析相关的核心障碍**。 + +当前所有工作的核心焦点是 **彻底解决在子进程中运行的 MCP 服务器无法正确定位其依赖文件(如 `domain.json`)的问题**,并最终让整个测试流程成功运转起来。 + +### 优先任务 +1. **根源分析**: 彻底理解 `subprocess.Popen` 的工作目录(CWD)继承机制,以及它是如何与 `DMSProviderServer.py` 中 `os.path` 相关函数交互并导致错误的。 +2. **实施健壮的解决方案**: + - **重构 `run_tests.py`**: 修改启动脚本,在创建服务器子进程时,为其**明确设置 `cwd` 参数**,确保每个服务器都在其脚本所在的目录中运行。 + - **简化服务器路径**: 在 `DMSProviderServer.py` 中,将文件路径调整为基于其已被正确设置的 `cwd` 的、更简单的相对路径。 +3. **最终验证**: 运行完整的端到端测试,确认 AI Agent 能够成功从 `DMSProviderServer` 获取 API 列表,并启动其测试循环。 + +## 最近变更 + +* **多服务器架构实现**: 我们已经成功创建并集成了四个独立的 MCP 服务器,每个服务器都提供一组特定的工具。 +* **Agent 逻辑进化**: Agent 的主循环 (`agent_main_loop.py`) 已经从硬编码逻辑演变为一个完全由 LLM 驱动的、动态的测试流程。 +* **启动器脚本**: 创建了 `run_tests.py`,用于统一启动所有服务器和 Agent 进程。 +* **反复的路径修复尝试**: 多次尝试修改 `DMSProviderServer.py` 中的相对路径,但均未成功,这促使我们对问题进行更深入的分析。 + +## 活动决策和考虑 + +### 当前决策 +1. **接受失败并深入分析**: 我们认识到,简单的路径调整是无效的。我们决定暂停“打地鼠”式的修复,转而投入时间去理解问题的根本原因——进程的执行上下文。 +2. **采用 `cwd` 解决方案**: 我们确定,通过在 `subprocess.Popen` 中为每个服务器子进程显式设置 `cwd`,是解决此类问题的最健壮、最可靠的方法。这将使我们的系统对执行环境的变化更具弹性。 + +### 开放问题 +1. **异步错误处理**: 当前的 `agent_main_loop.py` 在 `TaskGroup` 中遇到了未处理的异常。一旦路径问题解决,下一个需要关注的技术点将是如何在 `anyio` 和 `asyncio` 的环境中优雅地捕获和处理并发任务中的错误。 +2. **LLM 的稳定性**: 尽管 Agent 的逻辑是 LLM 驱动的,但我们还未充分测试在真实、长链条的工具调用下,LLM 生成的参数和决策的稳定性。这可能是下一个潜在的问题点。 + +## 下一步计划 + +### 短期目标 (本次会话) +- [x] **重构 `run_tests.py`** 以正确设置服务器的 `cwd`。 (已完成) +- [x] **调整 `DMSProviderServer.py`** 中的文件路径以匹配新的 `cwd`。(已完成) +- [ ] **执行最终测试**: 在您重启对话后,我们将立即运行 `run_tests.py`,并期望看到 `DMSProviderServer` 成功加载 API 列表,Agent 开始执行测试。 +- [ ] **修复 `TaskGroup` 异常**: 解决在 `agent_main_loop.py` 中出现的 `AttributeError: 'NoneType' object has no attribute 'get'`,这个错误很可能是由空的 API 列表间接触发的。 \ No newline at end of file diff --git a/compliance-mcp-agent/memory-bank/productContext.md b/compliance-mcp-agent/memory-bank/productContext.md new file mode 100644 index 0000000..6c51769 --- /dev/null +++ b/compliance-mcp-agent/memory-bank/productContext.md @@ -0,0 +1,25 @@ +# 产品上下文 + +## 问题陈述:传统测试框架的“天花板” + +随着 API 数量的增多和合规性规则日益复杂,我们现有的、基于代码的合规性测试框架正面临一个难以突破的“天花板”。 + +1. **僵化与脆弱**: 每当出现一个新的合规规则,我们就必须编写一个新的、硬编码的测试用例。这种紧耦合的设计使得框架越来越臃肿,修改一处就可能引发意想不到的连锁反应。 +2. **扩展性差**: 添加一种新的测试能力(比如,集成一个新的静态分析工具)需要深入修改核心的测试编排器逻辑。这个过程不仅耗时,而且对开发人员的水平要求很高,阻碍了社区贡献和团队协作。 +3. **可维护性噩梦**: 测试逻辑、工具调用、报告生成等所有功能都混杂在一起,使得代码难以理解和维护。排查一个简单的 bug 可能需要在多个模块之间来回跳转,心智负担极重。 +4. **智能程度低**: 传统框架只能执行预先定义好的、线性的测试路径。它无法理解规则的“意图”,也无法在遇到预期外情况时进行动态调整或探索性测试。 + +## 解决方案:一个“会思考”的测试平台 + +我们提出的基于 MCP 的 AI Agent 框架,旨在从根本上解决上述问题,将我们的测试工具从一个死板的“执行器”升级为一个会思考、可扩展的“平台”。 + +1. **从“硬编码”到“软编排”**: 我们不再编写固定的测试流程。取而代之的是,我们给 Agent 一个**目标**(“验证这个API是否符合这条规则”),然后由 Agent **自主地、动态地**编排和调用一系列原子化的工具来达成这个目标。这种灵活性是革命性的。 +2. **无限的扩展能力**: 想要增加一个新的测试能力?非常简单,只需开发一个独立的、符合 MCP 规范的工具 Server 即可。这个新工具会自动被 Agent 发现并使用,完全不需要修改 Host 或 Client 的核心代码。这为框架的生态发展打开了无限可能。 +3. **清晰的关注点分离**: Host 只关心“流程”,Client 只关心“思考”,Server 只关心“执行”。这种架构上的清晰性使得每个组件都变得简单、可独立开发和测试,极大地降低了维护成本。 +4. **涌现的智能**: Agent 不仅能执行已知的测试,未来还有可能通过推理,发现规则之间隐藏的关联,或者设计出人类工程师没有想到的测试路径,从而找到更深层次的 bug。 + +## 用户体验目标 + +* **对于规则制定者**: 他们可以用更接近自然语言的方式来定义合规性规则,而无需关心具体的测试代码实现。 +* **对于工具开发者**: 他们可以轻松地将自己的工具(如静态扫描器、安全检查器等)封装成 MCP Server,无缝集成到我们的测试生态中。 +* **对于测试工程师**: 他们将得到一个高度自动化且结果可信、过程透明的测试伙伴,能将他们从繁琐的脚本编写中解放出来,专注于更有创造性的测试策略分析。 \ No newline at end of file diff --git a/compliance-mcp-agent/memory-bank/progress.md b/compliance-mcp-agent/memory-bank/progress.md new file mode 100644 index 0000000..7e8d8bb --- /dev/null +++ b/compliance-mcp-agent/memory-bank/progress.md @@ -0,0 +1,57 @@ +# 项目进度 + +## 里程碑 1: 最小可行产品 (MVP) - (已完成) + +**目标**: 搭建并验证 MCP 架构的端到端通信。 + +### 已完成功能 +- ✅ **项目初始化** + - ✅ 创建 `compliance-mcp-agent` 独立目录。 + - ✅ 创建全新的 `memory-bank`。 +- ✅ **核心文档撰写** + - ✅ `projectbrief.md`, `systemPatterns.md`, `techContext.md`, `productContext.md`... +- ✅ **搭建基础框架** + - ✅ 创建 `requirements.txt` 并添加依赖。 + - ✅ 实现 `APICallerServer.py`。 + - ✅ 实现 `run_tests.py` (Host)。 + - ✅ 实现 `agent_main_loop.py` (Client)。 +- ✅ **"Hello World" 级测试** + - ✅ 成功运行了第一个端到端的单服务器测试。 + +--- + +## 里程碑 2: 功能完备版本 - (进行中) + +**目标**: 实现一个功能完备的、由 Agent 驱动的测试流程。 + +### 已完成功能 +- ✅ **多服务器架构** + - ✅ 实现 `SchemaValidatorServer.py`,提供严格和灵活的 Schema 验证工具。 + - ✅ 实现 `DMSProviderServer.py`,动态提供 API 列表和 Schema 定义。 + - ✅ 实现 `TestManagerServer.py`,用于跟踪和管理测试进度。 +- ✅ **LLM 驱动的 Agent** + - ✅ 在 `agent_main_loop.py` 中集成了真实的 LLM 调用。 + - ✅ Agent 能够自主地与所有服务器交互,获取工具并制定初步计划。 + +### 正在进行的工作 +- 🔄 **修复核心架构障碍**: + - [x] **根源定位**: 已准确定位到 `run_tests.py` 启动的子进程因错误的 CWD 而无法找到数据文件。 + - [x] **解决方案实施**: 已重构 `run_tests.py` 以强制设定子进程的 `cwd`,并同步更新了 `DMSProviderServer.py` 中的文件路径。 + - [ ] **最终验证**: 等待下一次运行,以确认 Agent 现在可以成功获取 API 列表并开始执行测试。 + +### 待完成工作 +- ⏳ 解决 `agent_main_loop.py` 中出现的 `TaskGroup` 异步错误。 +- ⏳ 实现完整的 CRUD (Create, Read, Update, Delete, List) 测试生命周期。 +- ⏳ 生成结构化的、可读的测试报告。 + +--- + +## 里程碑 3: 企业就绪版本 (未来规划) + +**目标**: 成为一个健壮、可靠、可用于生产环境的合规性审计平台。 + +- ⏳ 拥有完善的错误处理、重试和超时机制。 +- ⏳ 提供清晰的日志和可观测性。 +- ⏳ 支持更复杂的测试场景,如多 Agent 协作。 +- ⏳ 具备优秀的用户文档和开发者文档。 +- ⏳ (可选) 提供 Web 界面来配置和查看测试结果。 \ No newline at end of file diff --git a/compliance-mcp-agent/memory-bank/projectbrief.md b/compliance-mcp-agent/memory-bank/projectbrief.md new file mode 100644 index 0000000..b215bcb --- /dev/null +++ b/compliance-mcp-agent/memory-bank/projectbrief.md @@ -0,0 +1,23 @@ +# 项目简介:AI Agent 驱动的合规性测试框架 + +## 项目概述 +本项目旨在从零开始,构建一个基于 **模型-上下文-协议 (Model-Context-Protocol, MCP)** 的下一代 API 合规性测试框架。我们将用一个自主决策的 **AI Agent** 来取代传统的、基于固定脚本的测试逻辑。这个 Agent 将利用一套标准化的、可扩展的 **工具集 (MCP Servers)**,动态地规划和执行测试步骤,以验证 API 是否符合指定的合规性规则。 + +## 核心需求 +1. **MCP 原生架构**: 系统的所有组件交互都必须严格遵循 MCP 规范,实现 Host, Client, 和 Servers 之间的清晰分离。 +2. **AI Agent 驱动**: 测试的执行逻辑由一个核心的 LLM Agent 驱动,它能够自主进行推理、规划和调用工具。 +3. **可扩展的工具集**: 所有的测试能力(如 API 调用、数据生成、结果断言)都必须被封装成独立的、符合 MCP 规范的 Server。 +4. **标准化与模块化**: 彻底抛弃硬编码的集成方式,实现测试能力和测试流程的完全解耦。 +5. **透明的可审计性**: Agent 的每一个决策步骤、每一次工具调用都必须被完整记录,形成清晰、可审计的测试日志。 + +## 关键目标 +1. **提升灵活性**: 使测试框架能够轻松适应新的合规规则,甚至在没有明确测试脚本的情况下,也能通过自然语言描述的规则进行测试。 +2. **增强扩展性**: 允许任何开发者通过创建一个新的、符合 MCP 规范的工具服务器来为框架贡献新的测试能力。 +3. **提高可维护性**: 通过将系统拆分为职责单一的独立组件,大幅降低代码的耦合度和维护成本。 +4. **探索 Agentic Workflow**: 验证 AI Agent 在软件测试这一高度结构化领域的自主工作能力,为更复杂的 Agentic 自动化流程积累经验。 + +## 技术栈 +- **核心协议**: Model-Context-Protocol (MCP) +- **官方 SDK**: `model-context-protocol/python-sdk` +- **核心语言**: Python 3.8+ +- **Agent 大脑**: 兼容 OpenAI API 的大语言模型 (LLM) \ No newline at end of file diff --git a/compliance-mcp-agent/memory-bank/systemPatterns.md b/compliance-mcp-agent/memory-bank/systemPatterns.md new file mode 100644 index 0000000..72112dd --- /dev/null +++ b/compliance-mcp-agent/memory-bank/systemPatterns.md @@ -0,0 +1,63 @@ +# 系统架构与设计模式 + +## 核心架构:模型-上下文-协议 (MCP) + +本系统严格遵循 MCP 定义的 **Host-Client-Server** 架构,旨在实现组件的终极解耦和高可扩展性。 + +```mermaid +graph TD + subgraph TestRunnerApp [测试运行程序 (MCP Host)] + style TestRunnerApp fill:#e6f2ff,stroke:#b3d9ff + A[run_mcp_tests.py
(Host 实例)] + A -- 1. 为每个测试任务
创建并管理Client会话 --> B + A -- 4. 汇总所有Client的
结论,生成报告 --> E[最终测试报告] + end + + subgraph AgentSession [独立的Agent会话 (MCP Client)] + style AgentSession fill:#e6ffe6,stroke:#b3ffb3 + B[Client 实例
(包含LLM的Agent核心)] + B -- 2. 向Host请求
使用工具 --> D + end + + subgraph Toolbelt [MCP工具集 (MCP Servers)] + style Toolbelt fill:#fff0e6,stroke:#ffccb3 + D[APICallerServer
DataGenServer
AssertionServer
...
] + D -- 3. 执行操作
并将结果通过Host返回 --> B + end +``` + +## 组件职责详解 + +### 1. MCP Host (测试运行程序) + +* **角色**: 整个测试流程的 **总控制器**、**安全边界** 和 **环境提供者**。它如同一个“办公室”环境,为 Agent 的工作提供场地、工具和规则。 +* **职责**: + * **流程编排**: 加载 API 规范和合规规则,生成测试任务列表,并为每个任务启动一个独立的、隔离的 Client 会话。 + * **生命周期管理**: 负责创建、监督和销毁 Client 实例。如果某个 Agent 会话卡死或崩溃,Host 会终止它并继续下一个任务,确保整体流程的健壮性。 + * **安全与路由**: 作为所有通信的中间人,它接收来自 Client 的工具调用请求,验证其权限,然后将其安全地路由到指定的 Server。它也负责将 Server 的结果返回给正确的 Client。**Client 和 Server 之间永不直接通信**。 + +### 2. MCP Client (Agent 会话) + +* **角色**: 承载 **LLM(大语言模型)** 的执行实体,是 Agent 的“大脑”和“身体”的结合。 +* **职责**: + * **任务执行**: 从 Host 接收一个明确的测试目标。 + * **推理规划**: 内部的 LLM 负责思考和规划,决定需要执行哪些步骤、调用哪些工具来达成测试目标。 + * **与 Host 通信**: 将 LLM 的决策转化为对 Host 的标准 `call_tool` 请求。 + * **状态保持**: 在会话内部维持短期的记忆和上下文,以完成连贯的、多步骤的测试逻辑。 + +### 3. MCP Servers (工具集) + +* **角色**: 提供单一、原子化能力的 **功能模块**。每个 Server 都是一个独立的微服务。 +* **职责**: + * **提供能力**: 封装一种特定的能力,例如: + * `APICallerServer`: 仅负责发起 HTTP 请求。 + * `DataGeneratorServer`: 仅负责根据 Schema 生成数据。 + * `AssertionServer`: 仅负责比较两个值是否相等。 + * **无状态与隔离**: Server 本身是无状态的(或会话状态由 Host 管理),并且对其他 Server 和整个测试任务一无所知。这种设计确保了工具的高度可复用性和可独立测试性。 + +## 设计模式应用 + +* **单一职责原则**: 每个组件(Host, Client, Server)和每个 Server 内部的工具都有单一、明确的职责。 +* **策略模式**: 每个合规性规则可以被看作一种“策略”,Agent 根据不同的策略(规则目标)来组织其工具调用序列。 +* **外观模式**: Host 为 Client 提供了一个统一的、简化的接口来访问背后复杂的工具集,Client 无需关心工具的具体位置和实现。 +* **微服务架构**: 整个工具集由一系列独立的、可独立部署的 Server 构成,体现了微服务的思想,极大地提高了系统的灵活性和可维护性。 \ No newline at end of file diff --git a/compliance-mcp-agent/memory-bank/techContext.md b/compliance-mcp-agent/memory-bank/techContext.md new file mode 100644 index 0000000..7e46b53 --- /dev/null +++ b/compliance-mcp-agent/memory-bank/techContext.md @@ -0,0 +1,54 @@ +# 技术上下文 + +## 核心技术栈 + +| 类别 | 技术/库 | 版本 | 用途 | +| ---------- | ----------------------------------------- | ---- | ---------------------------------------------- | +| 核心协议 | Model-Context-Protocol (MCP) | v1+ | 定义系统所有组件间的通信标准。 | +| **官方SDK** | **`model-context-protocol/python-sdk`** | 最新 | **我们实现Host, Client, Server的基石。** | +| 核心语言 | Python | 3.8+ | 主要开发语言。 | +| AI模型 | 兼容OpenAI API的大语言模型 (LLM) | - | 作为Agent的“大脑”,负责推理和规划。 | +| HTTP客户端 | requests | 最新 | 在APICallerServer中用于执行HTTP请求。 | +| Web框架 | (可选) FastAPI / Flask | - | 或许会用于构建可通过HTTP访问的远程MCP Server。 | + +## 开发环境设置 + +### 必要组件 +- Python 3.8 或更高版本 +- `uv` 或 `pip` (用于管理Python包依赖) +- Git (版本控制) +- 支持Python的IDE (推荐 VS Code 或 PyCharm) + +### 项目安装步骤 (预期) +1. **克隆代码仓库**: + ```bash + git clone <仓库URL> + cd compliance-mcp-agent + ``` +2. **创建虚拟环境**: + ```bash + python -m venv .venv + source .venv/bin/activate + ``` +3. **安装依赖**: + 我们将创建一个 `requirements.txt` 文件,内容至少包括: + ``` + model-context-protocol + requests + # 其他未来可能需要的依赖 + ``` + 然后执行安装: + ```bash + uv pip install -r requirements.txt + ``` +4. **运行项目**: + * **启动所有MCP Servers**: 需要编写一个脚本来并行启动所有工具服务器。 + * **启动MCP Host**: 运行主程序 `run_mcp_tests.py` 来开始整个测试流程。 + +## 关键技术决策 + +1. **SDK 优先**: 我们将尽可能地利用官方 Python SDK 的能力,而不是重新发明轮子。所有的 Host/Client/Server 实现都应基于该 SDK 提供的类和方法。 +2. **Stdio 通信**: 在项目初期,为了简单起见,Host 和 Client 之间的通信将主要通过标准输入/输出 (`stdio`) 进行,这由 `stdio_client` 提供支持。这对于本地运行的 Agent 来说足够高效。 +3. **独立的 Server 进程**: 每个 MCP Server 都将作为一个独立的 Python 进程运行。这确保了工具之间的完全隔离,并为未来将某个工具部署为网络服务(例如使用 FastAPI)提供了可能性。 +4. **异步编程**: 官方 SDK 大量使用了 `asyncio`。因此,我们的 Host 和 Client 代码也必须是异步的,以充分利用 SDK 的性能。 +5. **LLM 接口**: Agent 与 LLM 的交互将通过一个通用的、兼容 OpenAI 的 API 客户端进行。这允许我们未来可以轻松切换不同的后端 LLM 服务。 \ No newline at end of file diff --git a/compliance-mcp-agent/requirements.txt b/compliance-mcp-agent/requirements.txt new file mode 100644 index 0000000..9d690ec --- /dev/null +++ b/compliance-mcp-agent/requirements.txt @@ -0,0 +1,5 @@ +mcp[cli] +requests +uvicorn +Flask +openai \ No newline at end of file diff --git a/compliance-mcp-agent/run_tests.py b/compliance-mcp-agent/run_tests.py new file mode 100644 index 0000000..4556966 --- /dev/null +++ b/compliance-mcp-agent/run_tests.py @@ -0,0 +1,125 @@ +import subprocess +import time +import os +import asyncio + +# 全局常量 +AGENT_SCRIPT = "compliance-mcp-agent/agent_main_loop.py" + +async def log_subprocess_output(stream, prefix): + """异步读取并打印子进程的输出流。""" + while True: + try: + line = await stream.readline() + if line: + print(f"[{prefix}] {line.decode().strip()}") + else: + # End of stream + break + except Exception as e: + print(f"Error reading stream for {prefix}: {e}") + break + +async def start_servers(): + """异步启动所有MCP服务器,并捕获它们的日志。""" + print("="*20 + " Starting MCP Servers " + "="*20) + + server_processes = {} + log_tasks = [] + + server_scripts = [ + "servers/APICallerServer.py", + "servers/SchemaValidatorServer.py", + "servers/DMSProviderServer.py", + "servers/TestManagerServer.py", + ] + + project_root = os.path.dirname(os.path.abspath(__file__)) + + for script_path_rel in server_scripts: + script_path_abs = os.path.join(project_root, script_path_rel) + server_name = os.path.splitext(os.path.basename(script_path_abs))[0] + server_dir = os.path.dirname(script_path_abs) + + print(f"Starting server: {script_path_rel}...") + + env = os.environ.copy() + + process = await asyncio.create_subprocess_exec( + "uv", "run", "python", os.path.basename(script_path_abs), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + cwd=server_dir + ) + + server_processes[server_name] = process + print(f" -> Started {script_path_rel} with PID: {process.pid}") + + log_tasks.append(asyncio.create_task(log_subprocess_output(process.stdout, server_name))) + log_tasks.append(asyncio.create_task(log_subprocess_output(process.stderr, f"{server_name}-ERROR"))) + + print(f"\nAll {len(server_scripts)} servers are running in the background.") + return server_processes, log_tasks + +async def run_agent(agent_script_path): + """异步运行Agent主循环并实时打印其输出。""" + print("\n" + "="*20 + " Running Agent " + "="*20) + + process = await asyncio.create_subprocess_exec( + "uv", "run", "python", agent_script_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + agent_stdout_task = asyncio.create_task(log_subprocess_output(process.stdout, "Agent")) + agent_stderr_task = asyncio.create_task(log_subprocess_output(process.stderr, "Agent-ERROR")) + + await process.wait() + + await asyncio.gather(agent_stdout_task, agent_stderr_task) + + print(f"\nAgent process finished with return code: {process.returncode}") + + +def cleanup_servers(processes): + """停止所有服务器进程。""" + print("\n" + "="*20 + " Cleaning up Servers " + "="*20) + for name, process in processes.items(): + if process.returncode is None: # 仅当进程仍在运行时才终止 + print(f"Terminating server {name} (PID: {process.pid})...") + try: + process.terminate() + print(f" -> Terminated signal sent.") + except ProcessLookupError: + print(f" -> Process {name} (PID: {process.pid}) already gone.") + else: + print(f" -> Server {name} (PID: {process.pid}) already finished with code {process.returncode}.") + + print("Cleanup complete.") + +async def main(): + """ + 主入口点,异步运行所有测试。 + """ + server_processes, log_tasks = await start_servers() + + print("\nWaiting for servers to initialize...") + await asyncio.sleep(8) + + try: + await run_agent(AGENT_SCRIPT) + finally: + cleanup_servers(server_processes) + + # 取消仍在运行的日志任务 + for task in log_tasks: + task.cancel() + await asyncio.gather(*log_tasks, return_exceptions=True) + print("Log watchers terminated.") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nMain process interrupted by user. Cleaning up...") \ No newline at end of file diff --git a/compliance-mcp-agent/servers/APICallerServer.py b/compliance-mcp-agent/servers/APICallerServer.py new file mode 100644 index 0000000..35d1901 --- /dev/null +++ b/compliance-mcp-agent/servers/APICallerServer.py @@ -0,0 +1,58 @@ +import requests +import uvicorn +from mcp.server.fastmcp.server import FastMCP +from typing import Optional +import logging + +# 配置日志记录 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# 1. 使用 FastMCP 创建一个 Server 实例 +mcp = FastMCP( + "APICallerServer", + title="API Caller Server", + description="A server that provides a tool to call APIs.", + version="0.2.0" # a new version +) + +# 2. 使用 @mcp.tool() 装饰器来定义一个工具 +@mcp.tool() +def api_caller(method: str, url: str, headers: Optional[dict] = None, params: Optional[dict] = None, json_body: Optional[dict] = None) -> dict: + """ + 一个通用的API调用工具,可以发送HTTP请求。 + """ + logging.info(f"api_caller: Received request -> method={method}, url={url}, params={params}, json_body={json_body}") + try: + response = requests.request( + method=method, + url=url, + headers=headers, + params=params, + json=json_body + ) + response.raise_for_status() # 如果状态码是 4xx 或 5xx,则引发HTTPError + + logging.info(f"api_caller: Request to {url} successful with status code {response.status_code}") + + # 尝试将响应解析为JSON,如果失败则作为纯文本返回 + try: + response_body = response.json() + except requests.exceptions.JSONDecodeError: + response_body = response.text + + return { + "status_code": response.status_code, + "headers": dict(response.headers), + "body": response_body + } + except requests.exceptions.RequestException as e: + logging.error(f"api_caller: Request to {url} failed. Error: {e}", exc_info=True) + return { + "error": "APIRequestError", + "message": str(e) + } + +# 3. (可选) 如果直接运行此文件,则启动服务器 +if __name__ == "__main__": + # FastMCP对象本身不是ASGI应用,但它的 streamable_http_app() 方法会返回一个 + uvicorn.run(mcp.streamable_http_app(), host="127.0.0.1", port=8001) \ No newline at end of file diff --git a/compliance-mcp-agent/servers/DMSProviderServer.py b/compliance-mcp-agent/servers/DMSProviderServer.py new file mode 100644 index 0000000..e2fe019 --- /dev/null +++ b/compliance-mcp-agent/servers/DMSProviderServer.py @@ -0,0 +1,119 @@ +import requests +import uvicorn +import logging +from mcp.server.fastmcp import FastMCP +from typing import List, Dict, Any +import json +import os + +# --- 配置 --- +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# --- MCP Server 定义 --- +mcp = FastMCP() + +# 定义正确的DMS服务器基地址 +DMS_BASE_URL = "http://127.0.0.1:5001" +MOCK_DMS_API_LIST_URL = f"{DMS_BASE_URL}/api/schema/manage/schema" +MOCK_DMS_SCHEMA_DETAIL_URL_TEMPLATE = f"{DMS_BASE_URL}/api/schema/manage/schema/{{model_id}}" + +@mcp.tool() +def get_api_list() -> Dict[str, Any]: + """ + 通过HTTP请求从模拟的DMS服务器获取所有可用API的列表,并返回一个干净的、包含records的字典。 + """ + logging.info(f"DMSProviderServer: Attempting to fetch API list from {MOCK_DMS_API_LIST_URL}") + try: + response = requests.get(MOCK_DMS_API_LIST_URL, timeout=5) + response.raise_for_status() + + raw_data = response.json() + + # 核心修正:根据mock server的实际返回,深入到'data'键下提取'records' + records = raw_data.get("data", {}).get("records", []) + + logging.info(f"DMSProviderServer: Successfully parsed response. Found {len(records)} records.") + + # 返回一个干净、统一的结构,并加入版本号探针 + return {"records": records, "version": "1.1"} + + except requests.exceptions.RequestException as e: + error_message = f"Failed to connect to mock DMS server for API list: {e}" + logging.error(f"DMSProviderServer: {error_message}") + return {"records": [], "error": error_message} + except json.JSONDecodeError: + error_message = "Failed to decode JSON response for API list from mock DMS server." + logging.error(f"DMSProviderServer: {error_message}") + return {"records": [], "error": error_message} + +@mcp.tool() +def get_schema_by_id(model_id: str) -> Dict[str, Any]: + """ + 根据模型ID,通过HTTP请求从模拟的DMS服务器获取其JSON Schema。 + """ + schema_url = MOCK_DMS_SCHEMA_DETAIL_URL_TEMPLATE.format(model_id=model_id) + logging.info(f"DMSProviderServer: Attempting to fetch schema for '{model_id}' from {schema_url}") + + try: + response = requests.get(schema_url, timeout=5) + response.raise_for_status() + + raw_data = response.json() + + # 核心修正:提取'data'键下的schema对象 + schema = raw_data.get("data") + + if schema: + logging.info(f"DMSProviderServer: Successfully parsed schema for '{model_id}'.") + return {"schema": schema} + else: + error_message = f"Schema data for '{model_id}' is empty or missing in the response." + logging.warning(f"DMSProviderServer: {error_message}") + return {"error": error_message} + + except requests.exceptions.RequestException as e: + error_message = f"Failed to connect to mock DMS server for schema '{model_id}': {e}" + logging.error(f"DMSProviderServer: {error_message}") + return {"error": error_message} + except json.JSONDecodeError: + error_message = f"Failed to decode JSON response for schema '{model_id}' from mock DMS server." + logging.error(f"DMSProviderServer: {error_message}") + return {"error": error_message} + + +@mcp.tool() +def get_dms_crud_endpoints(model_id: str) -> Dict[str, Any]: + """ + 根据模型ID,生成并返回其所有标准的CRUD操作端点(create, list, read, update, delete)的完整定义。 + """ + # 这个函数的逻辑是基于名称生成,暂时不需要对接mock服务,所以保持不变 + base_path = model_id.split('.')[0] + + endpoints = { + "create": { + "method": "POST", + "url": f"{DMS_BASE_URL}/api/dms/wb_ml/v1/{base_path}" + }, + "list": { + "method": "POST", # 根据mock server,list是POST + "url": f"{DMS_BASE_URL}/api/dms/wb_ml/v1/{base_path}/1.0.0" # 根据mock server,需要版本号 + }, + "read": { + "method": "GET", + "url": f"{DMS_BASE_URL}/api/dms/wb_ml/v1/{base_path}/1.0.0/{{id}}" # 根据mock server,需要版本号 + }, + "update": { + "method": "PUT", + "url": f"{DMS_BASE_URL}/api/dms/wb_ml/v1/{base_path}" + }, + "delete": { + "method": "DELETE", + "url": f"{DMS_BASE_URL}/api/dms/wb_ml/v1/{base_path}" + } + } + return endpoints + +# --- 启动服务器 --- +if __name__ == "__main__": + import uvicorn + uvicorn.run(mcp.streamable_http_app(), host="127.0.0.1", port=8003) \ No newline at end of file diff --git a/compliance-mcp-agent/servers/SchemaValidatorServer.py b/compliance-mcp-agent/servers/SchemaValidatorServer.py new file mode 100644 index 0000000..271f826 --- /dev/null +++ b/compliance-mcp-agent/servers/SchemaValidatorServer.py @@ -0,0 +1,88 @@ +from mcp.server.fastmcp.server import FastMCP +from pydantic import BaseModel, ValidationError +import jsonschema +import uvicorn +import logging +from jsonschema import validate, ValidationError +from mcp.server.fastmcp.server import FastMCP + +# 新增导入 +from response_utils import extract_data_for_validation + +mcp = FastMCP( + "SchemaValidatorServer", + title="JSON Schema Validator Server", + description="A server that provides a tool to validate data against a JSON Schema.", + version="0.1.0" +) + +@mcp.tool() +def validate_schema(data_instance: dict, schema: dict) -> dict: + """ + Validates a data instance against a given JSON Schema. + + Args: + data_instance: The data object to validate. + schema: The JSON Schema to validate against. + + Returns: + A dictionary containing the validation result. + {"isValid": True} on success. + {"isValid": False, "error": "Validation error message"} on failure. + """ + try: + jsonschema.validate(instance=data_instance, schema=schema) + return {"isValid": True, "error": None} + except ValidationError as e: + logging.error(f"SchemaValidator: Validation failed. Error: {e.message}", exc_info=True) + return {"isValid": False, "error": e.message} + except Exception as e: + # Catch other potential errors from the jsonschema library + return {"isValid": False, "error": str(e)} + +@mcp.tool() +def validate_flexible_schema(api_response: dict, item_schema: dict) -> dict: + """ + 对一个可能带有标准包装(如 {code, message, data})的API响应进行灵活的schema验证。 + 它能自动提取核心业务数据(无论是单个对象还是列表)并逐项进行验证。 + + Args: + api_response (dict): 完整的API响应体。 + item_schema (dict): 描述核心业务数据**单个元素**的JSON Schema。 + + Returns: + dict: 一个包含验证结果的字典, {"isValid": True} 或 {"isValid": False, "error": "..."}。 + """ + logging.info("SchemaValidator: Running flexible validation...") + try: + # 使用工具函数提取需要验证的数据 + items_to_validate = extract_data_for_validation(api_response) + + if not items_to_validate: + error_message = "Flexible validation failed: Could not extract any items to validate from the response." + logging.warning(error_message) + return {"isValid": False, "error": error_message} + + logging.info(f"Flexible validation: Extracted {len(items_to_validate)} item(s) to validate.") + + # 逐个验证提取出的项 + for i, item in enumerate(items_to_validate): + validate(instance=item, schema=item_schema) + logging.info(f" -> Item {i+1}/{len(items_to_validate)} passed validation.") + + logging.info("SchemaValidator: Flexible validation successful. All items conform to the schema.") + return {"isValid": True} + + except ValidationError as e: + error_message = f"Flexible validation failed on an item. Error: {e.message}" + logging.error(error_message, exc_info=True) + return {"isValid": False, "error": error_message} + except Exception as e: + error_message = f"An unexpected error occurred during flexible validation: {e}" + logging.error(error_message, exc_info=True) + return {"isValid": False, "error": error_message} + + +# --- 启动服务器 --- +if __name__ == "__main__": + uvicorn.run(mcp.streamable_http_app(), host="127.0.0.1", port=8002) \ No newline at end of file diff --git a/compliance-mcp-agent/servers/TestManagerServer.py b/compliance-mcp-agent/servers/TestManagerServer.py new file mode 100644 index 0000000..76dd215 --- /dev/null +++ b/compliance-mcp-agent/servers/TestManagerServer.py @@ -0,0 +1,105 @@ +import uvicorn +import logging +from mcp.server.fastmcp import FastMCP +from typing import List, Dict, Any +import threading + +# --- 配置 --- +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# --- MCP Server 定义 --- +mcp = FastMCP() + +# 使用一个简单的字典和线程锁来安全地管理状态 +test_state: Dict[str, Any] = {} +state_lock = threading.Lock() + +def _reset_state(): + """重置测试状态,不加锁,供内部调用""" + global test_state + test_state = { + "results": {}, # e.g., {"api_id_1": [{"task_name": "...", "status": "...", "details": "..."}]} + "apis_pending_init": [], + } + +_reset_state() # 初始化状态 + +@mcp.tool() +def initialize_test_plan(api_ids: List[str]) -> Dict[str, Any]: + """ + 根据提供的API ID列表,初始化测试计划。 + 这会重置所有测试状态,并为每个API准备好存储多次任务结果的列表。 + """ + with state_lock: + _reset_state() + for api_id in api_ids: + test_state["results"][api_id] = [] + + test_state["apis_pending_init"] = list(api_ids) + logger.info(f"TestManager: Initialized test plan with {len(api_ids)} APIs.") + return { + "status": "success", + "message": f"Test plan initialized for {len(api_ids)} APIs.", + "apis_pending": list(api_ids) + } + +@mcp.tool() +def record_test_result(api_id: str, task_name: str, status: str, details: str) -> Dict[str, Any]: + """ + 记录一个API在一个特定任务上的测试结果。 + + Args: + api_id (str): 被测试的API的ID。 + task_name (str): 执行的任务的名称。 + status (str): 测试状态,例如 'passed' 或 'failed'。 + details (str): 关于测试结果的详细描述或摘要。 + """ + with state_lock: + if api_id not in test_state["results"]: + # 如果由于某种原因API ID不存在,先创建它 + test_state["results"][api_id] = [] + + # 将本次任务的结果追加到列表中 + test_state["results"][api_id].append({ + "task_name": task_name, + "status": status, + "details": details + }) + + logger.info(f"TestManager: Recorded result for {api_id} on task '{task_name}': {status}") + return {"status": "success", "message": f"Result for {api_id} on task '{task_name}' recorded."} + +@mcp.tool() +def get_test_summary() -> Dict[str, Any]: + """ + 获取整个测试活动的最终摘要。 + """ + with state_lock: + total_apis = len(test_state["results"]) + tasks_completed_count = sum(len(tasks) for tasks in test_state["results"].values()) + + summary = { + "total_apis": total_apis, + "total_tasks_completed": tasks_completed_count, + "results": test_state["results"] + } + logger.info("TestManager: Providing test summary.") + return summary + +# --- 启动服务器 --- +if __name__ == "__main__": + import uvicorn + + # 移除get_next_api_to_test工具,因为它在M*N模型中不再需要。 + # 我们使用 try...except 来确保即使工具不存在或属性名不正确,程序也不会崩溃。 + try: + # 基于之前的观察,我们尝试使用 _tools 属性 + if "get_next_api_to_test" in mcp._tools: + del mcp._tools["get_next_api_to_test"] + logger.info("Successfully removed deprecated tool: get_next_api_to_test") + except (AttributeError, KeyError): + logger.warning("Could not remove 'get_next_api_to_test' tool (it may not exist or the tools attribute name is different). Continuing...") + pass + + uvicorn.run(mcp.streamable_http_app(), host="127.0.0.1", port=8004) \ No newline at end of file diff --git a/compliance-mcp-agent/servers/__pycache__/response_utils.cpython-312.pyc b/compliance-mcp-agent/servers/__pycache__/response_utils.cpython-312.pyc new file mode 100644 index 0000000..697fcaa Binary files /dev/null and b/compliance-mcp-agent/servers/__pycache__/response_utils.cpython-312.pyc differ diff --git a/compliance-mcp-agent/servers/response_utils.py b/compliance-mcp-agent/servers/response_utils.py new file mode 100644 index 0000000..ff4ea1e --- /dev/null +++ b/compliance-mcp-agent/servers/response_utils.py @@ -0,0 +1,50 @@ +from typing import Any, List, Optional +import logging + +logger = logging.getLogger(__name__) + +def extract_data_for_validation(response_json: Any, nested_list_keywords: Optional[List[str]] = None) -> List[Any]: + """ + 从原始API响应JSON中智能提取需要被验证的核心业务数据列表。 + 即使只有一个对象,也返回一个单元素的列表。 + + 策略: + 1. 如果响应体是包含 'code' 和 'data' 的标准包装,则提取 'data' 的内容。 + 2. 对上一步的结果,遍历一个关键字列表(nested_list_keywords),检查是否存在分页列表模式,如果存在则提取该列表。 + 3. 如果处理后的数据是列表,直接返回该列表。 + 4. 如果处理后的数据是单个对象(字典),将其包装在单元素列表中返回。 + 5. 如果数据为空或不适用,返回空列表。 + """ + if nested_list_keywords is None: + nested_list_keywords = ["list", "records", "items", "data"] + + if not response_json: + return [] + + data_to_process = response_json + + # 策略 1: 解开标准包装 + if isinstance(response_json, dict) and 'code' in response_json and 'data' in response_json: + logger.debug("检测到标准响应包装,提取 'data' 字段内容进行处理。") + data_to_process = response_json['data'] + + # 策略 2: 提取嵌套的分页列表 + if isinstance(data_to_process, dict): + for keyword in nested_list_keywords: + if keyword in data_to_process and isinstance(data_to_process[keyword], list): + logger.debug(f"检测到关键字为 '{keyword}' 的嵌套列表,提取其内容。") + data_to_process = data_to_process[keyword] + break # 找到第一个匹配的就停止 + + # 策略 3 & 4: 统一返回列表 + if isinstance(data_to_process, list): + logger.debug(f"数据本身为列表,包含 {len(data_to_process)} 个元素,直接返回。") + return data_to_process + + if isinstance(data_to_process, dict): + logger.debug("数据为单个对象,将其包装在列表中返回。") + return [data_to_process] + + # 策略 5: 对于其他情况(如数据为None或非对象/列表类型),返回空列表 + logger.warning(f"待处理的数据既不是列表也不是对象,无法提取进行验证。数据: {str(data_to_process)[:100]}") + return [] \ No newline at end of file diff --git a/compliance-mcp-agent/tasks.json b/compliance-mcp-agent/tasks.json new file mode 100644 index 0000000..d616194 --- /dev/null +++ b/compliance-mcp-agent/tasks.json @@ -0,0 +1,11 @@ +[ + { + "name": "完整的CRUD测试", + "prompt_template": "你的当前唯一任务是为 API 模型 '{api_id}' 执行一个完整的、更严谨的CRUD生命周期测试。推荐遵循以下子步骤:1. **(Setup)** 调用 `get_dms_crud_endpoints` 获取该API的所有端点URL。2. **(CREATE)** 调用'create'端点创建一个新资源。**重要提示**:请求的JSON body必须遵循 `{{\"data\": [ ... ]}}` 的格式,其中 `[...]` 是一个包含一个模型对象的列表。你需要自己根据API的schema来构建这个模型对象,并把它放进列表中。创建成功后,务必从响应中提取并记住新资源的ID。3. **(READ after Create)** 使用上一步获得的ID,调用'read'端点,验证该资源可以被成功读取,且内容与创建时一致。4. **(UPDATE)** 调用'update'端点,修改该资源。和CREATE一样,请求的JSON body也必须遵循 `{{\"data\": [ ... ]}}` 的格式。5. **(READ after Update)** 再次调用'read'端点,验证资源确实已被更新。6. **(DELETE)** 调用'delete'端点,删除该资源。7. **(VERIFY DELETE BY READ)** 再次调用'read'端点,并验证它返回了预期的“未找到”或类似的错误。8. **(VERIFY DELETE BY LIST)** 调用'list'端点,并验证返回的列表中 **不包含** 你已删除的资源的ID。9. **(Record Result)** 最后,调用 `record_test_result` 来记录最终结果。你必须在 'details' 参数中提供一份详细的中文测试总结,说明执行了哪些步骤,关键的断言是什么,以及最终结果是成功还是失败。例如,成功时可以总结:“完成了完整的CRUD流程:成功创建资源(ID: xxx),读取验证一致,更新成功,删除成功,并通过再次读取和列举确认资源已不存在。测试通过。” 失败时则要说明在哪一步失败以及原因。**一旦 `record_test_result` 被调用,你对这个API的任务就彻底结束了。**" + }, + { + "name": "API Schema一致性检查", + "prompt_template": "你的任务是为API模型 '{api_id}' 验证其schema的一致性。请调用 `get_schema_by_id` 获取其JSON Schema。然后,自己构造一个符合该schema的简单数据样本。最后,调用 `validate_schema` 工具,用获取的schema来验证你构造的样本数据。调用 `record_test_result` 记录结果:如果验证成功,则在details中说明“Schema一致性检查通过”,并将状态标记为'passed';否则标记为'failed'并说明原因。" + } +] + diff --git a/mcp-sdk-tmp b/mcp-sdk-tmp new file mode 160000 index 0000000..0b1b52b --- /dev/null +++ b/mcp-sdk-tmp @@ -0,0 +1 @@ +Subproject commit 0b1b52ba45edd5bd3bf4c85e6bf3a8d7baf2766c diff --git a/memory-bank/capability_statement.md b/memory-bank/capability_statement.md new file mode 100644 index 0000000..0a175dc --- /dev/null +++ b/memory-bank/capability_statement.md @@ -0,0 +1,82 @@ +# 项目核心能力说明 (Capability Statement) + +## 1. 引言与愿景 + +本文档旨在提炼并阐述当前自动化测试框架的核心能力。该框架最初为 API 合规性测试而设计,但其底层架构具备高度的灵活性和可扩展性。我们的愿景是基于这些核心能力,将此项目逐步演进为一个支持多种测试类型(如前端UI测试、性能测试、数据一致性测试等)的**通用自动化测试平台**。 + +本文将详细介绍框架的关键设计理念与核心能力,并提供一份清晰的蓝图,展示如何将新的测试领域无缝集成到现有体系中。 + +## 2. 核心能力详解 + +我们的框架通过将测试流程解耦为“目标发现”、“测试执行”、“数据生成”和“结果报告”四个主要阶段,实现了高度的模块化。以下是构成这些阶段的核心能力: + +### 2.1. 可插拔的测试引擎架构 (Pluggable Test Engine) + +这是框架最具扩展性的能力。我们通过“注册表模式”和“基类继承”实现。 + +- **测试用例注册表 (`TestCaseRegistry`)**: 系统会自动发现并注册所有继承自 `BaseTestCase` 的测试用例类。这使得添加新测试用例就像编写一个新类一样简单,无需修改任何核心代码。 +- **通用测试基类 (`BaseTestCase`)**: 它定义了测试用例的生命周期和必要接口(如 `applies_to` 判断适用性,`execute` 执行测试)。 + +**扩展潜力**: +此模式不局限于API测试。我们可以定义新的测试基类,如 `BaseUITestCase` (集成Selenium/Playwright) 或 `BasePerformanceTestCase` (集成JMeter/Locust),测试用例注册表可以同样对它们进行管理和调度。 + +### 2.2. 配置驱动的测试编排 (Configuration-Driven Orchestration) + +测试的执行流程由核心的**测试编排器 (`Orchestrator`)** 控制,但其所有行为都通过外部配置(如YAML文件、命令行参数)来驱动。 + +- **解耦测试逻辑与执行策略**: 用户可以指定测试目标、筛选条件、启用的测试用例、报告输出位置等,而无需触碰代码。 +- **灵活的测试阶段 (`Stage`)**: 编排器支持自定义测试阶段(`BaseStage`),允许在测试执行前后插入自定义逻辑,如环境准备、数据清理等。 + +**扩展潜力**: +当引入新的测试类型时,我们可以为其创建一个新的编排器(如 `UIOrchestrator`),复用相同的配置读取和阶段管理逻辑,仅替换核心的目标发现和测试执行部分。 + +### 2.3. 智能化的数据与场景生成 (Intelligent Data & Scenario Generation) + +框架集成了**LLM服务 (`LLMService`)**,使其超越了传统的数据驱动测试。 + +- **超越静态数据**: LLM能够根据API的Schema动态生成各种有效和无效的测试数据,极大地提升了边缘情况的测试覆盖率。 +- **场景生成**: 未来可以利用LLM生成复杂的用户操作序列(用于UI测试)或模拟真实的用户行为模式(用于性能测试)。 + +**扩展潜力**: +`LLMService` 是一个通用能力,可以为任何需要复杂测试数据的场景提供支持,例如为前端表单生成多样化的输入值。 + +### 2.4. 标准化的多维报告体系 (Standardized, Multi-dimensional Reporting) + +测试框架的核心优势之一是其强大且独立的报告系统。 + +- **执行与查看分离**: `run_api_tests.py` 负责执行测试并生成原始报告数据,而 `history_viewer.py` 提供一个独立的Web应用来查询和可视化所有历史报告。 +- **多种报告格式**: 自动生成机器可读的 `summary.json` 和人类可读的 `api_call_details.md`。 +- **统一数据模型**: 所有测试结果都将被格式化为一个标准的 `TestSummary` 对象。 + +**扩展潜力**: +这个报告系统是完全通用的。任何新的测试引擎(UI测试、性能测试等)只需将其结果构造成 `TestSummary` 格式,就可以立刻被我们的历史查看器支持,无需任何额外开发。 + +### 2.5. 灵活的目标发现与筛选机制 (Flexible Target Discovery & Filtering) + +自动化测试的第一步是确定“测什么”。我们的框架将这一过程抽象化。 + +- **输入源解析器 (`InputParser`)**: 当前系统能解析OpenAPI/Swagger文件来发现API端点。 +- **目标筛选**: 支持通过标签、路径、名称等多种方式筛选出本次需要测试的具体目标。 + +**扩展潜力**: +我们可以轻松添加新的解析器。例如,为前端测试添加一个 `SitemapParser` (解析 `sitemap.xml`) 或 `ComponentManifestParser` (解析组件库的清单文件),以自动发现所有待测页面或组件。 + +## 3. 扩展蓝图:集成前端UI自动化测试 + +为了更具体地说明如何利用上述能力进行扩展,我们以“集成前端UI自动化测试”为例,描绘一个清晰的实施路径: + +1. **定义测试目标输入**: 约定使用 `sitemap.xml` 或自定义的 `ui-targets.json` 文件来描述所有待测试的Web页面及其关键元素。 +2. **实现新解析器**: 创建一个 `SitemapParser` 类,用于解析站点地图文件,并返回一个标准化的“待测目标”列表。 +3. **实现UI测试基类**: 创建 `BaseUITestCase(BaseTestCase)`,它在内部初始化一个WebDriver实例(如Selenium),并提供一些基础的UI操作方法(如 `click`, `type_text`)。 +4. **编写具体UI测试用例**: + - `TC-UI-001-TitleCheck(BaseUITestCase)`: 检查页面标题是否正确。 + - `TC-UI-002-LoginForm(BaseUITestCase)`: 测试登录表单的校验逻辑。 + - `TC-UI-003-BrokenLinks(BaseUITestCase)`: 检查页面是否存在死链。 +5. **适配/创建编排器**: 创建 `UIOrchestrator`,它使用 `SitemapParser` 来发现目标,并调度所有适用的 `BaseUITestCase` 子类来执行测试。 +6. **统一报告格式**: 确保 `UIOrchestrator` 在测试结束后,将其执行结果(包括截图、操作日志等)封装到标准的 `TestSummary` 对象中,并存入报告目录。 + +完成以上步骤后,`history_viewer.py` 将能直接展示UI测试的历史结果,实现了新测试能力的无缝集成。 + +## 4. 结论 + +本框架通过其模块化、可插拔和配置驱动的设计,已为成为一个通用测试平台奠定了坚实的基础。其核心能力并非仅仅为API测试服务,而是构成了一套通用的自动化测试解决方案。通过遵循本文档提供的扩展蓝图,我们可以高效、低成本地将新的测试领域整合进来,逐步实现平台的宏伟愿景。 \ No newline at end of file diff --git a/mock_dms_server.py b/mock_dms_server.py index 912bd90..969d3fa 100644 --- a/mock_dms_server.py +++ b/mock_dms_server.py @@ -73,6 +73,7 @@ def preload_schemas(): def get_api_list(): """模拟获取DMS中所有API列表的接口。""" logging.info("Mock服务器: 收到API列表请求。") + print(f"API_LIST_DATA: {API_LIST_DATA}") return jsonify(API_LIST_DATA) @app.route('/api/schema/manage/schema/', methods=['GET']) @@ -99,6 +100,7 @@ def create_resource(dms_instance_code, name): logging.info(f"Mock服务器: 收到对 '{name}' 的CREATE请求") request_data = request.get_json(silent=True) if not request_data or 'data' not in request_data or not isinstance(request_data['data'], list): + print(f"Mock服务器: 收到对 '{name}' 的CREATE请求, 请求体格式错误: {request_data}") return jsonify({"code": 400, "message": "请求体格式错误,应为 {'data': [...]}"}), 400 if name not in IN_MEMORY_DB: