diff --git a/servers/patchkit_mcp/mcp-rpm.yaml b/servers/patchkit_mcp/mcp-rpm.yaml new file mode 100644 index 0000000000000000000000000000000000000000..95265874fa94b5bbef32d4f60f068cd678e3fcf0 --- /dev/null +++ b/servers/patchkit_mcp/mcp-rpm.yaml @@ -0,0 +1,20 @@ +name: "patchkit_mcp" +summary: "OpenEuler 补丁冲突解决服务" +description: | + 自动化处理OpenEuler 补丁冲突解决的服务 + +dependencies: + system: + - python3 + - git + - patch + python: + - requests + - PyGithub + +files: + required: + - src/server.py + - mcp_config.json + optional: + - src/requirements.txt diff --git a/servers/patchkit_mcp/mcp_config.json b/servers/patchkit_mcp/mcp_config.json new file mode 100644 index 0000000000000000000000000000000000000000..460caa3af3231b141c71c0a778b4a4e739499ffc --- /dev/null +++ b/servers/patchkit_mcp/mcp_config.json @@ -0,0 +1,16 @@ +{ + "mcpServers": { + "patchkit_mcp": { + "command": "uv", + "args": [ + "--directory", "/opt/mcp-servers/servers/patchkit_mcp/src", + "run", "mcp-patch-resolver.py", + "--model_url", "https://api.deepseek.com", + "--api_key", "", + "--model_name", "deepseek-chat" + ], + "disabled": false, + "timeout": 1800 + } + } +} diff --git a/servers/patchkit_mcp/src/README.md b/servers/patchkit_mcp/src/README.md new file mode 100644 index 0000000000000000000000000000000000000000..b7586278281e5215d3373e7bc7c6087ba1d0d5ea --- /dev/null +++ b/servers/patchkit_mcp/src/README.md @@ -0,0 +1,142 @@ +# PatchKit - AI驱动的Patch冲突解决工具 + +PatchKit 是一个专门用于解决 Git patch 冲突的工具,支持分步处理和 AI 辅助冲突解决。 + +## 架构设计 + +### 工具结构 +- **patchkit.py**: 核心工具,提供分步处理功能 +- **mcp-patch-resolver.py**: MCP 服务器,调用 patchkit 工具 + +### 工作目录 +- 所有数据保存在 `~/.patchkit` 目录下 +- 包含源仓库、目标仓库、patch 文件和映射关系 + +## 功能特性 + +### 第一步:克隆仓库 +- 根据源仓库和目标仓库地址克隆代码 +- 智能检测已存在的仓库,避免重复下载 +- 支持指定分支 + +### 第二步:生成 Patch 和映射 +- 根据 commit 列表生成 patch 文件 +- 建立源 commit 到目标 commit 的映射关系 +- 保存映射关系到 JSON 文件 + +### 第三步:AI 冲突解决 +- 如果目标仓库已有解决的 commit,先回退 +- 使用 AI 模型解决 patch 冲突 +- 自动提交解决结果 + +## 使用方法 + +### 直接使用 patchkit 工具 + +#### 1. 克隆仓库 +```bash +python3 patchkit.py --action clone \ + --source_repo "https://github.com/user/source-repo" \ + --source_branch "main" \ + --target_repo "https://github.com/user/target-repo" \ + --target_branch "develop" +``` + +#### 2. 生成 Patch 和映射 +```bash +python3 patchkit.py --action generate \ + --commit_list '[ + { + "commit_message": "修复bug", + "source_commit": "abc123", + "target_commit": "def456" + } + ]' +``` + +#### 3. 解决冲突 +```bash +python3 patchkit.py --action resolve \ + --source_commit "abc123" +``` + +#### 4. 查看状态 +```bash +python3 patchkit.py --action status +``` + +### 使用 MCP 服务器 + +#### 启动服务器 +```bash +python3 mcp-patch-resolver.py \ + --model_url "https://api.openai.com/v1" \ + --api_key "your-api-key" \ + --model_name "gpt-4" +``` + +#### 可用的 MCP 工具 + +1. **clone_repositories**: 克隆源仓库和目标仓库 +2. **generate_patches_and_mapping**: 生成 patch 文件并建立映射关系 +3. **resolve_single_commit**: 解决指定 commit 的冲突 +4. **get_patchkit_status**: 获取当前状态 +5. **resolve_patch_conflicts_batch**: 批量解决冲突(完整流程) + +## 工作流程示例 + +### 分步处理 +1. 使用 `clone_repositories` 克隆仓库 +2. 使用 `generate_patches_and_mapping` 生成 patch 和映射 +3. 使用 `resolve_single_commit` 逐个解决冲突 +4. 使用 `get_patchkit_status` 查看进度 + +### 批量处理 +使用 `resolve_patch_conflicts_batch` 一次性完成所有步骤 + +## 目录结构 + +``` +~/.patchkit/ +├── source/ # 源仓库 +├── target/ # 目标仓库 +├── patches/ # 生成的 patch 文件 +├── commit_mapping.json # commit 映射关系 +└── temp_patch_creator/ # 临时目录(仅在需要时创建) +``` + +## 映射文件格式 + +```json +{ + "abc123": { + "commit_message": "修复bug", + "source_commit": "abc123", + "target_commit": "def456", + "patch_file": "/home/user/.patchkit/patches/abc123.patch", + "status": "resolved", + "resolution_time": "2024-01-01T12:00:00", + "resolution_details": "AI成功解决冲突" + } +} +``` + +## 状态说明 + +- **pending**: 待处理 +- **resolved**: 已解决 +- **failed**: 解决失败 + +## 注意事项 + +1. 确保有足够的磁盘空间存储仓库 +2. AI 模型需要有效的 API 密钥 +3. 网络连接稳定,避免克隆失败 +4. 建议在解决冲突前备份重要数据 + +## 错误处理 + +- 工具会自动检测和跳过已存在的有效仓库 +- 失败的冲突会记录在映射文件中 +- 可以重复执行失败的步骤 +- 详细的错误信息会记录在日志中 \ No newline at end of file diff --git a/servers/patchkit_mcp/src/icon.png b/servers/patchkit_mcp/src/icon.png new file mode 100644 index 0000000000000000000000000000000000000000..5a09d241c68d641054b035e7e3f11004b1d1e6b0 Binary files /dev/null and b/servers/patchkit_mcp/src/icon.png differ diff --git a/servers/patchkit_mcp/src/mcp-patch-resolver.py b/servers/patchkit_mcp/src/mcp-patch-resolver.py new file mode 100644 index 0000000000000000000000000000000000000000..2565cc7c21cd6cd568774bad54da57e4a058c905 --- /dev/null +++ b/servers/patchkit_mcp/src/mcp-patch-resolver.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +AI驱动的Patch冲突解决MCP Server +使用patchkit工具进行分步处理 +""" + +import argparse +import json +import os +import subprocess +import tempfile +import shutil +import csv +from pathlib import Path +from typing import List, Dict, Optional, Tuple +from datetime import datetime +from mcp.server.fastmcp import FastMCP + +# Initialize FastMCP server +mcp = FastMCP("AI驱动的Patch冲突解决工具", log_level="INFO") + +# 全局变量存储AI模型配置 +model_url = "" +model_api_key = "" +model_name = "" + +def _run_patchkit_command(args: List[str]) -> Tuple[int, str, str]: + """执行patchkit命令""" + try: + # 获取patchkit脚本路径 + script_dir = os.path.dirname(os.path.abspath(__file__)) + patchkit_path = os.path.join(script_dir, "patchkit.py") + + # 添加AI模型参数 + cmd = ["python3", patchkit_path] + if model_url: + cmd.extend(["--model_url", model_url]) + if model_api_key: + cmd.extend(["--api_key", model_api_key]) + if model_name: + cmd.extend(["--model_name", model_name]) + cmd.extend(args) + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=600 # 10分钟超时 + ) + return result.returncode, result.stdout, result.stderr + except subprocess.TimeoutExpired: + return -1, "", "命令执行超时" + except Exception as e: + return -1, "", str(e) + +@mcp.tool() +async def clone_repositories( + source_repo: str, + source_branch: str, + target_repo: str, + target_branch: str +) -> str: + """第一步:克隆源仓库和目标仓库 + + Args: + source_repo: 源仓库URL + source_branch: 源仓库分支 + target_repo: 目标仓库URL + target_branch: 目标仓库分支 + """ + try: + args = [ + "--action", "clone", + "--source_repo", source_repo, + "--source_branch", source_branch, + "--target_repo", target_repo, + "--target_branch", target_branch + ] + + returncode, stdout, stderr = _run_patchkit_command(args) + + if returncode == 0: + return f"[Success]仓库克隆成功\n{stdout}" + else: + return f"[Fail]仓库克隆失败\n{stderr}" + + except Exception as e: + return f"[Fail]执行过程中出错: {e}" + +@mcp.tool() +async def generate_patches_and_mapping( + commit_mapping: str +) -> str: + """第二步:生成patch文件并建立映射关系 + + Args: + commit_mapping: commit映射的JSON字符串,格式为: + [ + { + "commit_message": "commit描述", + "source_commit": "原始仓库commit id", + "target_commit": "合入仓库commit id (可选)" + } + ] + """ + try: + # 验证JSON格式 + commits = json.loads(commit_mapping) + if not isinstance(commits, list): + return "[Fail]commit_mapping必须是JSON数组格式" + + args = [ + "--action", "generate", + "--commit_list", commit_mapping + ] + + returncode, stdout, stderr = _run_patchkit_command(args) + + if returncode == 0: + return f"[Success]patch文件生成和映射关系建立成功\n{stdout}" + else: + return f"[Fail]patch文件生成失败\n{stderr}" + + except json.JSONDecodeError as e: + return f"[Fail]JSON解析失败: {e}" + except Exception as e: + return f"[Fail]执行过程中出错: {e}" + +@mcp.tool() +async def resolve_single_commit( + source_commit: str +) -> str: + """第三步:解决指定commit的冲突 + + Args: + source_commit: 要解决的源commit ID + """ + try: + args = [ + "--action", "resolve", + "--source_commit", source_commit + ] + + returncode, stdout, stderr = _run_patchkit_command(args) + + if returncode == 0: + return f"[Success]冲突解决成功\n{stdout}" + else: + return f"[Fail]冲突解决失败\n{stderr}" + + except Exception as e: + return f"[Fail]执行过程中出错: {e}" + +@mcp.tool() +async def get_patchkit_status() -> str: + """获取patchkit当前状态""" + try: + args = ["--action", "status"] + + returncode, stdout, stderr = _run_patchkit_command(args) + + if returncode == 0: + return f"[Success]状态信息\n{stdout}" + else: + return f"[Fail]获取状态失败\n{stderr}" + + except Exception as e: + return f"[Fail]执行过程中出错: {e}" + +@mcp.tool() +async def resolve_patch_conflicts_batch( + source_repo: str, + source_branch: str, + target_repo: str, + target_branch: str, + commit_mapping: str, + output_file: str = "patch_resolution_report.csv" +) -> str: + """批量解决patch冲突(完整流程) + + Args: + source_repo: 原始仓库URL + source_branch: 原始仓库分支 + target_repo: 合入仓库URL + target_branch: 合入仓库分支 + commit_mapping: commit映射的JSON字符串 + output_file: 输出报告文件名 + """ + try: + # 解析commit映射 + commits = json.loads(commit_mapping) + if not isinstance(commits, list): + return "[Fail]commit_mapping必须是JSON数组格式" + + # 第一步:克隆仓库 + print("第一步:克隆仓库...") + clone_result = await clone_repositories(source_repo, source_branch, target_repo, target_branch) + if not clone_result.startswith("[Success]"): + return clone_result + + # 第二步:生成patch和映射 + print("第二步:生成patch文件...") + generate_result = await generate_patches_and_mapping(commit_mapping) + if not generate_result.startswith("[Success]"): + return generate_result + + # 第三步:逐个解决冲突 + print("第三步:解决冲突...") + results = [] + + for commit_info in commits: + source_commit = commit_info.get("source_commit", "") + commit_message = commit_info.get("commit_message", "") + + print(f"解决commit: {commit_message} ({source_commit})") + resolve_result = await resolve_single_commit(source_commit) + + result = { + "commit_message": commit_message, + "source_commit": source_commit, + "target_commit": commit_info.get("target_commit", ""), + "status": "成功" if resolve_result.startswith("[Success]") else "失败", + "details": resolve_result + } + results.append(result) + + # 生成报告 + report_path = os.path.abspath(output_file) + with open(report_path, 'w', newline='', encoding='utf-8') as csvfile: + fieldnames = ['commit_message', 'source_commit', 'target_commit', 'status', 'details'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + + writer.writeheader() + for result in results: + writer.writerow(result) + + # 统计结果 + total = len(results) + success_count = sum(1 for r in results if r["status"] == "成功") + failed_count = total - success_count + + summary = f""" +批量处理完成! + +总计: {total} 个commit +成功: {success_count} 个 +失败: {failed_count} 个 + +详细报告已保存到: {report_path} +""" + + return f"[Success]{summary}" + + except json.JSONDecodeError as e: + return f"[Fail]JSON解析失败: {e}" + except Exception as e: + return f"[Fail]处理过程中出错: {e}" + +if __name__ == "__main__": + # Parse command line arguments + parser = argparse.ArgumentParser(description='AI驱动的Patch冲突解决MCP Server') + parser.add_argument('--model_url', required=True, help='Model url') + parser.add_argument('--api_key', required=True, help='API key for the vendor') + parser.add_argument('--model_name', required=True, help='Model name') + args = parser.parse_args() + + # 设置全局变量 + model_url = args.model_url + model_api_key = args.api_key + model_name = args.model_name + + # Initialize and run the server + mcp.run(transport='stdio') \ No newline at end of file diff --git a/servers/patchkit_mcp/src/patchkit.py b/servers/patchkit_mcp/src/patchkit.py new file mode 100644 index 0000000000000000000000000000000000000000..ac01a20276de5c135c157f032d4682a40708ad8c --- /dev/null +++ b/servers/patchkit_mcp/src/patchkit.py @@ -0,0 +1,637 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +PatchKit - AI驱动的Patch冲突解决工具 +支持分步处理patch冲突,包括代码拉取、patch生成、冲突解决等 +""" + +import argparse +import json +import os +import subprocess +import shutil +import csv +from pathlib import Path +from typing import List, Dict, Optional, Tuple +from datetime import datetime +from openai import OpenAI + +# 全局配置 +SYSTEM_CONTENT = """你是一名资深的Linux内核开发专家,精通Git操作和patch冲突解决。 +你的任务是分析patch冲突并提供解决方案。请仔细分析冲突内容,理解代码逻辑, +然后提供正确的合并方案。如果冲突无法自动解决,请明确说明原因。""" + +class PatchKit: + def __init__(self, model_url: str = "", model_api_key: str = "", model_name: str = ""): + self.model_url = model_url + self.model_api_key = model_api_key + self.model_name = model_name + self.work_dir = self._get_work_directory() + self.mapping_file = os.path.join(self.work_dir, "commit_mapping.json") + + def _get_work_directory(self) -> str: + """获取工作目录 ~/.patchkit""" + home_dir = os.path.expanduser("~") + work_dir = os.path.join(home_dir, ".patchkit") + os.makedirs(work_dir, exist_ok=True) + return work_dir + + def _is_repository_valid(self, repo_dir: str) -> bool: + """检查仓库是否有效""" + if not os.path.exists(repo_dir): + return False + + git_dir = os.path.join(repo_dir, ".git") + if not os.path.exists(git_dir): + return False + + # 检查git状态 + cmd = ["git", "status"] + returncode, stdout, stderr = self._run_git_command(cmd, repo_dir) + return returncode == 0 + + def _run_git_command(self, cmd: List[str], cwd: str = None) -> Tuple[int, str, str]: + """执行Git命令""" + try: + result = subprocess.run( + cmd, + cwd=cwd, + capture_output=True, + text=True, + timeout=300 + ) + return result.returncode, result.stdout, result.stderr + except subprocess.TimeoutExpired: + return -1, "", "命令执行超时" + except Exception as e: + return -1, "", str(e) + + def _call_llm(self, content: str) -> str: + """调用LLM API""" + if not self.model_api_key or not self.model_url: + return None + + try: + client = OpenAI(api_key=self.model_api_key, base_url=self.model_url) + response = client.chat.completions.create( + model=self.model_name, + messages=[ + {"role": "system", "content": SYSTEM_CONTENT}, + {"role": "user", "content": content} + ], + temperature=0.1 + ) + return response.choices[0].message.content + except Exception as e: + print(f"LLM API调用失败: {e}") + return None + + def _clone_repository(self, repo_url: str, branch: str, target_dir: str) -> bool: + """克隆仓库,如果已存在则跳过""" + print(f"检查仓库: {repo_url} 分支: {branch}") + + # 检查目标目录是否已存在且有效 + if self._is_repository_valid(target_dir): + print(f"仓库已存在且有效,跳过下载: {target_dir}") + return True + + # 如果目录存在但无效,删除它 + if os.path.exists(target_dir): + print(f"删除无效的仓库目录: {target_dir}") + shutil.rmtree(target_dir) + + # 创建目标目录 + os.makedirs(target_dir, exist_ok=True) + + # 克隆仓库 + print(f"开始克隆仓库: {repo_url} 分支: {branch}") + cmd = ["git", "clone", "-b", branch, repo_url, target_dir] + returncode, stdout, stderr = self._run_git_command(cmd) + + if returncode != 0: + print(f"克隆失败: {stderr}") + return False + + print(f"成功克隆到: {target_dir}") + return True + + def _get_commit_info(self, repo_dir: str, commit_id: str) -> Optional[Dict]: + """获取commit信息""" + cmd = ["git", "show", "--format=fuller", "--no-patch", commit_id] + returncode, stdout, stderr = self._run_git_command(cmd, repo_dir) + + if returncode != 0: + print(f"获取commit信息失败: {stderr}") + return None + + # 解析commit信息 + lines = stdout.split('\n') + commit_info = {} + + for line in lines: + if line.startswith('commit '): + commit_info['id'] = line.split()[1] + elif line.startswith('Author: '): + commit_info['author'] = line[8:] + elif line.startswith('Date: '): + commit_info['date'] = line[6:] + elif line.startswith(' '): + if 'message' not in commit_info: + commit_info['message'] = line[4:] + else: + commit_info['message'] += '\n' + line[4:] + + return commit_info + + def _create_patch(self, repo_dir: str, commit_id: str, patch_file: str) -> bool: + """创建patch文件""" + # 记录生成前的文件列表 + patch_dir = os.path.dirname(patch_file) + existing_files = set(os.listdir(patch_dir)) if os.path.exists(patch_dir) else set() + + cmd = ["git", "format-patch", "-1", commit_id, "-o", patch_dir] + returncode, stdout, stderr = self._run_git_command(cmd, repo_dir) + + if returncode != 0: + print(f"创建patch失败: {stderr}") + return False + + # 找到新生成的文件 + current_files = set(os.listdir(patch_dir)) + new_files = current_files - existing_files + + if not new_files: + print(f"未找到新生成的patch文件") + return False + + # 取第一个新生成的文件 + new_file = list(new_files)[0] + generated_patch = os.path.join(patch_dir, new_file) + print(f"找到新生成的patch文件: {new_file}") + + # 重命名到目标文件名 + shutil.move(generated_patch, patch_file) + print(f"patch文件已保存到: {patch_file}") + return True + + def _reset_to_commit(self, repo_dir: str, commit_id: str) -> bool: + """重置到指定commit""" + cmd = ["git", "reset", "--hard", commit_id] + returncode, stdout, stderr = self._run_git_command(cmd, repo_dir) + + if returncode != 0: + print(f"重置失败: {stderr}") + return False + + print(f"成功重置到commit: {commit_id}") + return True + + def _ai_apply_patch(self, target_dir: str, patch_file: str) -> Tuple[bool, str]: + """使用AI理解patch意图并应用""" + print("开始使用AI理解patch意图并应用...") + + # 读取patch内容 + with open(patch_file, 'r', encoding='utf-8') as f: + patch_content = f.read() + + # 解析patch文件,找到要修改的文件 + import re + file_pattern = r'^diff --git a/(.+) b/\1' + files_to_modify = [] + + for line in patch_content.split('\n'): + match = re.match(file_pattern, line) + if match: + file_path = match.group(1) + files_to_modify.append(file_path) + + print(f"需要修改的文件: {files_to_modify}") + + if not files_to_modify: + return False, "无法解析patch文件中的文件路径" + + # 为每个文件应用修改 + for file_path in files_to_modify: + full_path = os.path.join(target_dir, file_path) + if not os.path.exists(full_path): + print(f"文件不存在: {full_path}") + continue + + print(f"处理文件: {file_path}") + + # 读取当前文件内容 + with open(full_path, 'r', encoding='utf-8') as f: + current_content = f.read() + + # 构建AI提示 + prompt = f""" +请帮我应用以下patch到文件中。 + +Patch内容: +{patch_content} + +当前文件内容: +{current_content} + +请根据patch内容修改文件,返回完整的修改后的文件内容。 +如果patch无法直接应用,请尝试理解patch的意图并进行相应的修改。 + +请只返回修改后的文件内容,不要包含其他解释。 +""" + + # 调用AI + ai_response = self._call_llm(prompt) + if not ai_response: + return False, f"AI调用失败,无法处理文件 {file_path}" + + # 应用AI的修改 + try: + with open(full_path, 'w', encoding='utf-8') as f: + f.write(ai_response) + + # 添加到暂存区 + cmd = ["git", "add", file_path] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + if returncode != 0: + return False, f"添加文件失败: {stderr}" + + except Exception as e: + return False, f"写入文件失败: {e}" + + # 提交结果 + cmd = ["git", "commit", "-m", "Applied patch with AI assistance"] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + if returncode == 0: + return True, "AI应用成功" + else: + return False, f"提交失败: {stderr}" + + def _commit_exists(self, repo_dir: str, commit_id: str) -> bool: + """检查commit是否存在""" + cmd = ["git", "rev-parse", "--quiet", "--verify", commit_id] + returncode, stdout, stderr = self._run_git_command(cmd, repo_dir) + return returncode == 0 + + def _reset_to_previous_commit(self, repo_dir: str, commit_id: str) -> bool: + """重置到指定commit的上一个commit""" + # 获取指定commit的上一个commit + cmd = ["git", "rev-parse", f"{commit_id}^"] + returncode, stdout, stderr = self._run_git_command(cmd, repo_dir) + + if returncode != 0: + print(f"获取上一个commit失败: {stderr}") + return False + + previous_commit = stdout.strip() + print(f"上一个commit: {previous_commit}") + + # 重置到上一个commit + cmd = ["git", "reset", "--hard", previous_commit] + returncode, stdout, stderr = self._run_git_command(cmd, repo_dir) + + if returncode != 0: + print(f"重置失败: {stderr}") + return False + + print(f"成功重置到commit {commit_id} 的上一个commit: {previous_commit}") + return True + + def _apply_patch_with_ai(self, target_dir: str, patch_file: str) -> Tuple[bool, str]: + """使用AI解决patch冲突""" + print(f"尝试应用patch: {patch_file}") + + # 首先尝试直接应用patch + cmd = ["git", "apply", "--check", patch_file] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + + if returncode == 0: + # 没有冲突,直接应用 + cmd = ["git", "apply", patch_file] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + if returncode == 0: + return True, "直接应用成功" + else: + return False, f"应用失败: {stderr}" + + # 有冲突,尝试使用3way模式应用 + print("检测到冲突,尝试使用3way模式应用...") + cmd = ["git", "apply", "--3way", patch_file] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + + # 检查应用后的状态 + cmd = ["git", "status", "--porcelain"] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + + if returncode != 0: + return False, f"获取状态失败: {stderr}" + + # 查找所有冲突文件(UU表示冲突,U表示未暂存) + conflict_files = [] + unstaged_files = [] + for line in stdout.split('\n'): + if line.startswith('UU '): + conflict_files.append(line[3:]) + elif line.startswith('U '): + unstaged_files.append(line[2:]) + + print(f"发现冲突文件: {conflict_files}") + print(f"发现未暂存文件: {unstaged_files}") + + # 如果没有冲突文件,但有未暂存文件,说明部分应用成功 + if not conflict_files and unstaged_files: + print("部分应用成功,添加未暂存文件...") + for file in unstaged_files: + cmd = ["git", "add", file] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + if returncode != 0: + return False, f"添加文件失败: {stderr}" + + # 提交结果 + cmd = ["git", "commit", "-m", "Applied patch with AI assistance"] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + if returncode == 0: + return True, "AI辅助应用成功" + else: + return False, f"提交失败: {stderr}" + + # 如果3way应用失败且没有冲突文件,说明patch无法直接应用 + # 这种情况下,我们需要让AI理解patch的意图并手动应用 + if not conflict_files and not unstaged_files: + print("3way应用失败,使用AI理解patch意图并应用...") + return self._ai_apply_patch(target_dir, patch_file) + + if not conflict_files: + return False, "未找到冲突文件,但patch应用可能失败" + + # 读取patch内容 + with open(patch_file, 'r', encoding='utf-8') as f: + patch_content = f.read() + + # 为每个冲突文件调用AI + for conflict_file in conflict_files: + file_path = os.path.join(target_dir, conflict_file) + if not os.path.exists(file_path): + continue + + # 读取冲突文件内容 + with open(file_path, 'r', encoding='utf-8') as f: + file_content = f.read() + + # 构建AI提示 + prompt = f""" +请帮我解决以下Git patch冲突。 + +Patch文件内容: +{patch_content} + +冲突文件内容: +{file_content} + +请分析冲突并提供解决方案。如果能够自动解决,请提供完整的合并后的文件内容。 +如果无法自动解决,请说明原因。 + +请只返回合并后的文件内容,不要包含其他解释。 +""" + + # 调用AI + ai_response = self._call_llm(prompt) + if not ai_response: + return False, "AI调用失败" + + # 应用AI的解决方案 + try: + with open(file_path, 'w', encoding='utf-8') as f: + f.write(ai_response) + + # 添加到暂存区 + cmd = ["git", "add", conflict_file] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + if returncode != 0: + return False, f"添加文件失败: {stderr}" + + except Exception as e: + return False, f"写入文件失败: {e}" + + # 提交解决结果 + cmd = ["git", "commit", "-m", "AI resolved patch conflicts"] + returncode, stdout, stderr = self._run_git_command(cmd, target_dir) + + if returncode == 0: + return True, "AI成功解决冲突" + else: + return False, f"提交失败: {stderr}" + + def _load_mapping(self) -> Dict: + """加载commit映射关系""" + if os.path.exists(self.mapping_file): + try: + with open(self.mapping_file, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + print(f"加载映射文件失败: {e}") + return {} + + def _save_mapping(self, mapping: Dict): + """保存commit映射关系""" + try: + with open(self.mapping_file, 'w', encoding='utf-8') as f: + json.dump(mapping, f, indent=2, ensure_ascii=False) + print(f"映射关系已保存到: {self.mapping_file}") + except Exception as e: + print(f"保存映射文件失败: {e}") + + def step1_clone_repositories(self, source_repo: str, source_branch: str, + target_repo: str, target_branch: str) -> bool: + """第一步:克隆源仓库和目标仓库""" + print("=== 第一步:克隆仓库 ===") + + source_dir = os.path.join(self.work_dir, "source") + target_dir = os.path.join(self.work_dir, "target") + + # 克隆源仓库 + if not self._clone_repository(source_repo, source_branch, source_dir): + print("克隆源仓库失败") + return False + + # 克隆目标仓库 + if not self._clone_repository(target_repo, target_branch, target_dir): + print("克隆目标仓库失败") + return False + + print("仓库克隆完成") + return True + + def step2_generate_patches_and_mapping(self, commit_list: List[Dict]) -> bool: + """第二步:生成patch文件并建立映射关系""" + print("=== 第二步:生成patch文件并建立映射关系 ===") + + source_dir = os.path.join(self.work_dir, "source") + patches_dir = os.path.join(self.work_dir, "patches") + os.makedirs(patches_dir, exist_ok=True) + + # 加载现有映射 + mapping = self._load_mapping() + + for commit_info in commit_list: + commit_message = commit_info.get("commit_message", "") + source_commit = commit_info.get("source_commit", "") + target_commit = commit_info.get("target_commit", "") + + print(f"处理commit: {commit_message}") + print(f"源commit: {source_commit}") + print(f"目标commit: {target_commit}") + + # 生成patch文件 + patch_file = os.path.join(patches_dir, f"{source_commit}.patch") + if not self._create_patch(source_dir, source_commit, patch_file): + print(f"为commit {source_commit} 生成patch失败") + continue + + # 建立映射关系 + mapping[source_commit] = { + "commit_message": commit_message, + "source_commit": source_commit, + "target_commit": target_commit, + "patch_file": patch_file, + "status": "pending" + } + + # 保存映射关系 + self._save_mapping(mapping) + print("patch文件生成和映射关系建立完成") + return True + + def step3_resolve_conflicts(self, source_commit: str) -> Tuple[bool, str]: + """第三步:解决指定commit的冲突""" + print(f"=== 第三步:解决commit {source_commit} 的冲突 ===") + + target_dir = os.path.join(self.work_dir, "target") + mapping = self._load_mapping() + + if source_commit not in mapping: + return False, f"未找到commit {source_commit} 的映射关系" + + commit_info = mapping[source_commit] + target_commit = commit_info.get("target_commit", "") + patch_file = commit_info.get("patch_file", "") + + if not os.path.exists(patch_file): + return False, f"patch文件不存在: {patch_file}" + + # 如果目标仓库已有解决的commit,先回退到该commit的上一个commit + if target_commit: + print(f"目标仓库已有commit: {target_commit},检查是否需要回退") + # 检查目标commit是否存在 + if self._commit_exists(target_dir, target_commit): + print(f"目标commit存在,回退到上一个commit") + if not self._reset_to_previous_commit(target_dir, target_commit): + return False, f"回退到commit {target_commit} 的上一个commit失败" + else: + print(f"目标commit不存在,跳过回退步骤") + + # 使用AI应用patch + success, details = self._apply_patch_with_ai(target_dir, patch_file) + + # 更新映射状态 + if success: + mapping[source_commit]["status"] = "resolved" + mapping[source_commit]["resolution_time"] = datetime.now().isoformat() + mapping[source_commit]["resolution_details"] = details + else: + mapping[source_commit]["status"] = "failed" + mapping[source_commit]["error_details"] = details + + self._save_mapping(mapping) + + return success, details + + def get_status(self) -> Dict: + """获取当前状态""" + mapping = self._load_mapping() + source_dir = os.path.join(self.work_dir, "source") + target_dir = os.path.join(self.work_dir, "target") + + status = { + "work_directory": self.work_dir, + "source_repo_exists": self._is_repository_valid(source_dir), + "target_repo_exists": self._is_repository_valid(target_dir), + "mapping_file": self.mapping_file, + "total_commits": len(mapping), + "pending_commits": sum(1 for c in mapping.values() if c.get("status") == "pending"), + "resolved_commits": sum(1 for c in mapping.values() if c.get("status") == "resolved"), + "failed_commits": sum(1 for c in mapping.values() if c.get("status") == "failed"), + "commits": mapping + } + + return status + +def main(): + parser = argparse.ArgumentParser(description='PatchKit - AI驱动的Patch冲突解决工具') + parser.add_argument('--model_url', help='Model URL') + parser.add_argument('--api_key', help='API key') + parser.add_argument('--model_name', help='Model name') + parser.add_argument('--action', required=True, + choices=['clone', 'generate', 'resolve', 'status'], + help='执行的操作') + + # clone操作的参数 + parser.add_argument('--source_repo', help='源仓库URL') + parser.add_argument('--source_branch', help='源仓库分支') + parser.add_argument('--target_repo', help='目标仓库URL') + parser.add_argument('--target_branch', help='目标仓库分支') + + # generate操作的参数 + parser.add_argument('--commit_list', help='commit列表的JSON字符串') + + # resolve操作的参数 + parser.add_argument('--source_commit', help='要解决的源commit ID') + + args = parser.parse_args() + + # 创建PatchKit实例 + patchkit = PatchKit(args.model_url, args.api_key, args.model_name) + + if args.action == 'clone': + if not all([args.source_repo, args.source_branch, args.target_repo, args.target_branch]): + print("clone操作需要提供所有仓库参数") + return 1 + + success = patchkit.step1_clone_repositories( + args.source_repo, args.source_branch, + args.target_repo, args.target_branch + ) + return 0 if success else 1 + + elif args.action == 'generate': + if not args.commit_list: + print("generate操作需要提供commit_list参数") + return 1 + + try: + commit_list = json.loads(args.commit_list) + success = patchkit.step2_generate_patches_and_mapping(commit_list) + return 0 if success else 1 + except json.JSONDecodeError as e: + print(f"JSON解析失败: {e}") + return 1 + + elif args.action == 'resolve': + if not args.source_commit: + print("resolve操作需要提供source_commit参数") + return 1 + + success, details = patchkit.step3_resolve_conflicts(args.source_commit) + if success: + print(f"冲突解决成功: {details}") + return 0 + else: + print(f"冲突解决失败: {details}") + return 1 + + elif args.action == 'status': + status = patchkit.get_status() + print(json.dumps(status, indent=2, ensure_ascii=False)) + return 0 + +if __name__ == "__main__": + exit(main()) diff --git a/servers/patchkit_mcp/src/requirements.txt b/servers/patchkit_mcp/src/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..03de40eca8e6ae2a760997ea3ecf69a676ffddcb --- /dev/null +++ b/servers/patchkit_mcp/src/requirements.txt @@ -0,0 +1,3 @@ +mcp +openai +pyyaml