diff --git a/backend/artifacts/methods/mcp_methods.py b/backend/artifacts/methods/mcp_methods.py index 0ddcbb6e8a2ed962ef9b6d3de4d7557f0056435e..440d2cabbb0c60ef8bad7611d42cd21422bcd8b4 100644 --- a/backend/artifacts/methods/mcp_methods.py +++ b/backend/artifacts/methods/mcp_methods.py @@ -11,121 +11,399 @@ # See the Mulan PSL v2 for more details. # Create: 2025-07-30 # ====================================================================================================================== - +import base64 import glob import gzip +import json +import os import shutil +from typing import Any, Dict, List, Optional, Tuple from xml.etree import ElementTree - +import zstandard as zstd +from rest_framework import status from artifacts.models import MCPServer -from artifacts.utils import clear_table -from utils.cmd_executor import CommandExecutor +from artifacts.serializers import MCPBulkCreateSerializer +from artifacts.tasks.mcp_action_task import MCPPackageTask +from artifacts.utils import clear_table,check_system_rpm_installed +from constants.paths import CACHE_DIR, MCP_BASE_DIR +from tasks.scheduler import scheduler +from utils.common import is_process_running from utils.logger import init_log +from utils.cmd_executor import CommandExecutor from utils.time import timestamp2local logger = init_log('run.log') - class MCPMethods: - + @staticmethod - def sync_mcps(): + def sync_mcps() -> Dict[str, Any]: """同步MCP服务信息""" - # 更新 MCP 服务的信息 - update_result, msg = MCPMethods._update_mcp_info() - if not update_result: - return { 'is_success': False, 'message': msg } - # 读取 MCP 服务的信息 - mcp_data, msg = MCPMethods._read_mcp_info() - if not mcp_data: - return { 'is_success': False, 'message': msg } - # 将 MCP 服务的信息存入数据库中 - from artifacts.serializers import MCPBulkCreateSerializer - serializer = MCPBulkCreateSerializer(data=mcp_data, many=True) - clear_table(MCPServer._meta.db_table) - if not serializer.is_valid(): - logger.error(f"Failed to validate MCP data, errors: {serializer.errors}") - return { 'is_success': False, 'message': serializer.errors } - mcps = serializer.save() - msg = "Sync MCP data successfully." - logger.info(msg) - return { 'is_success': True, 'message': msg } + try: + mcp_data, message = MCPMethods._read_mcp_info() + if mcp_data is None: + return {'is_success': False, 'message': message} - @staticmethod - def _update_mcp_info(): - """更新MCP仓库信息""" - cmd = ['yum', 'makecache', '--disablerepo=*', '--enablerepo=mcp'] - logger.info(f"Start to execute command [{' '.join(cmd)}].") - cmd_executor = CommandExecutor(cmd) - _, stderr, code = cmd_executor.run() - if code != 0: - msg = f"Failed to execute command: [{' '.join(cmd)}]. Error: {stderr}" - logger.error(msg) - return False, msg - msg = 'Update MCP repo successfully.' - logger.info(msg) - return True, msg + clear_table(MCPServer._meta.db_table) + + if mcp_data: + serializer = MCPBulkCreateSerializer(data=mcp_data, many=True) + if not serializer.is_valid(): + return {'is_success': False, 'message': serializer.errors} + + mcps = serializer.save() + msg = f"Successfully synced {len(mcps)} MCP packages" + else: + msg = "No MCP packages found, database cleared" + + return {'is_success': True, 'message': msg} + + except Exception as e: + logger.error(f"Sync MCP failed: {str(e)}") + return {'is_success': False, 'message': f'Sync failed: {str(e)}'} @staticmethod - def _read_mcp_info(): + def _read_mcp_info() -> Tuple[Optional[List[Dict]], str]: """读取MCP信息并生成数据结构""" - # 匹配 primary.xml.gz 文件 - logger.info('Start to match MCP meta file.') - pattern = "/var/cache/dnf/mcp-*/repodata/*-primary.xml.gz" - matches = glob.glob(pattern) + try: + cached_folders = MCPMethods.get_packages_info(CACHE_DIR) + remote_packages = MCPMethods._parse_all_primary_xml() + if not remote_packages: + return [], "No MCP packages found in any repodata" + + all_packages = [] + packages_to_update = [] + + # 创建name_version键列表 + remote_pkg_keys = [f"{pkg['name']}_{pkg['version']}" for pkg in remote_packages] + + # 转换为集合以便快速查找 + remote_pkg_set = set(remote_pkg_keys) + cached_folders_set = set(cached_folders) + + # 清理本地有但远程没有的包 + folders_to_delete = cached_folders_set - remote_pkg_set + for folder_name in folders_to_delete: + cache_path = os.path.join(CACHE_DIR, folder_name) + if os.path.exists(cache_path): + try: + shutil.rmtree(cache_path, ignore_errors=True) + logger.info(f"Deleted obsolete cache folder: {folder_name}") + if folder_name in cached_folders: + cached_folders.remove(folder_name) + cached_folders_set.remove(folder_name) + except Exception as delete_error: + logger.warning(f"Failed to delete {cache_path}: {delete_error}") + + # 分类处理包 + for pkg in remote_packages: + pkg_folder_name = f"{pkg['name']}_{pkg['version']}" + + if pkg_folder_name in cached_folders_set: + all_packages.append(pkg) + else: + packages_to_update.append(pkg) + + if packages_to_update: + MCPMethods._process_packages_batch(packages_to_update) + all_packages.extend(packages_to_update) + + for pkg in all_packages: + MCPMethods._read_package_resources(pkg) + if not all_packages: + return [], "No MCP packages found or updated" + else: + return all_packages, 'Generate MCP service data successfully.' + + except Exception as e: + logger.error(f"Failed to read MCP info: {str(e)}") + return None, f"Failed to read MCP info: {str(e)}" + + @staticmethod + def get_packages_info(directory: str) -> List[str]: + packages = [] + if not os.path.exists(directory): + return packages + for item in os.listdir(directory): + if os.path.isdir(os.path.join(directory, item)): + packages.append(item) + return packages + + @staticmethod + def _parse_all_primary_xml() -> List[Dict[str, Any]]: + """从所有repodata解析MCP包""" + matches = glob.glob("/var/cache/dnf/*/repodata/*-primary.xml.*") if not matches: - msg = "No match for *-primary.xml.gz." - logger.error(msg) - return [], msg - primary_file = matches[0] - logger.info(f"The MCP meta file: {primary_file}") - - # 开始解压 primary.xml.gz - logger.info(f'Start to extract {primary_file}') - output_file = primary_file.rstrip('.gz') + logger.error("No primary.xml files found in any repodata") + return [] + + logger.info(f"Found {len(matches)} repodata files to scan") + + all_packages = {} + for xml_file in matches: + logger.info(f"Scanning repodata file: {xml_file}") + packages = MCPMethods._parse_single_primary_xml(xml_file) + for pkg in packages: + pkg_name = pkg['name'] + pkg_version = pkg['version'] + if pkg_name not in all_packages: + all_packages[pkg_name] = {} + all_packages[pkg_name][pkg_version] = pkg + + final_packages = [] + for pkg_versions in all_packages.values(): + final_packages.extend(pkg_versions.values()) + + logger.info(f"Found {len(final_packages)} packages across all repodata") + return final_packages + + @staticmethod + def _parse_single_primary_xml(primary_file: str) -> List[Dict[str, Any]]: + """解析单个primary.xml文件,只提取MCP包""" + output_file = primary_file.replace('.gz', '').replace('.zst', '') + + # 解压文件 - 支持gzip和zstd格式 try: - with gzip.open(primary_file, 'rb') as f_in: - with open(output_file, 'wb') as f_out: + if primary_file.endswith('.gz'): + with gzip.open(primary_file, 'rb') as f_in, open(output_file, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) - logger.info(f"extract successfully: {primary_file} -> {output_file}") - except PermissionError: - msg = "No permission." - logger.error(msg) - return [], msg - except gzip.BadGzipFile: - msg = f"{primary_file} is not a valid gzip format." - logger.error(msg) - return [], msg - - # 读取 primary.xml 文件,生成 MCP 服务信息列表 - logger.info("Start to read MCP information files and generate MCP data.") - tree = ElementTree.parse(output_file) - root = tree.getroot() - namespace = {'common': 'http://linux.duke.edu/metadata/common'} - mcp_data = [] - for package in root.findall('common:package', namespace): - mcp_info = {} - package_name = package.find('common:name', namespace).text.strip() - if package_name == 'mcp-servers': - continue - else: - mcp_info['package_name'] = package_name - mcp_info['name'] = package_name.removeprefix('mcp-servers-') - version = package.find('common:version', namespace) - mcp_info['version'] = f"{version.get('ver')}-{version.get('rel')}" # 暂未考虑epoch - timestamp = package.find('common:time', namespace).get('file') - mcp_info['updated_at'] = timestamp2local(int(timestamp)) - mcp_info['key'] = mcp_info['name'] + '_' + mcp_info['version'] - mcp_info['description'] = dict() - mcp_info['description']['default'] = package.find('common:description', namespace).text - mcp_info['size'] = int(package.find('common:size', namespace).get('package')) - mcp_info['repo'] = package.find('common:url', namespace).text - mcp_data.append(mcp_info) - - if not mcp_data: - msg = f"Failed to read mcp information." - logger.error(msg) - return [], msg - msg = 'Generate MCP service data successfully.' - logger.info(msg) - return mcp_data, msg + elif primary_file.endswith('.zst'): + dctx = zstd.ZstdDecompressor() + with open(primary_file, 'rb') as f_in, open(output_file, 'wb') as f_out: + dctx.copy_stream(f_in, f_out) + except Exception as e: + logger.error(f"Failed to extract {primary_file}: {e}") + return [] + + packages = [] + try: + tree = ElementTree.parse(output_file) + root = tree.getroot() + namespace = {'common': 'http://linux.duke.edu/metadata/common'} + + for package in root.findall('common:package', namespace): + package_name_elem = package.find('common:name', namespace) + package_name = package_name_elem.text.strip() + if not package_name.startswith('mcp-servers') or package_name == 'mcp-servers': + continue + + try: + version_elem = package.find('common:version', namespace) + time_elem = package.find('common:time', namespace) + desc_elem = package.find('common:description', namespace) + url_elem = package.find('common:url', namespace) + arch_elem = package.find('common:arch', namespace) + + name = package_name.removeprefix('mcp-servers-') + ver = version_elem.get('ver') + rel = version_elem.get('rel') + clean_rel = rel.split('.')[0] if rel else "1" + version_str = f"{ver}-{clean_rel}" + arch = arch_elem.text if arch_elem is not None else 'noarch' + download_tag = f"{package_name}-{ver}-{rel}.{arch}" + + packages.append({ + 'package_name': package_name, + 'name': name, + 'version': version_str, + 'updated_at': timestamp2local(int(time_elem.get('file'))), + 'key': f"{name}_{version_str}", + 'description': {'default': desc_elem.text if desc_elem is not None and desc_elem.text else ''}, + 'url': url_elem.text if url_elem is not None else '' , + 'download_tag': download_tag + }) + except Exception as e: + logger.error(f"Failed to parse {package_name}: {e}") + + except Exception as e: + logger.error(f"Failed to parse XML {output_file}: {e}") + finally: + # 清理临时文件 + if os.path.exists(output_file) and output_file != primary_file: + try: + os.remove(output_file) + except: + pass + + return packages + + @staticmethod + def _process_packages_batch(packages: List[Dict]) -> List[Dict]: + """批量处理需要更新的包""" + # 批量下载 + download_tags = [pkg['download_tag'] for pkg in packages if 'download_tag' in pkg] + rpm_dir = os.path.join(MCP_BASE_DIR, "rpm_packages") + os.makedirs(rpm_dir, exist_ok=True) + + cmd = ['dnf', 'download', '--downloaddir', rpm_dir] + download_tags + executor = CommandExecutor(cmd, timeout=300) + stdout, stderr, returncode = executor.run() + if returncode != 0: + logger.error(f"Download failed: {stderr}") + return [] + + processed = [] + for pkg in packages: + try: + # 找RPM文件 + rpm_file = None + + for filename in os.listdir(rpm_dir): + if (filename.startswith(pkg['package_name']) and pkg['version'] in filename and filename.endswith('.rpm')): + rpm_file = os.path.join(rpm_dir, filename) + break + if not rpm_file: + continue + + # 解压 + if MCPMethods._extract_rpm_package(pkg, rpm_file): + processed.append(pkg) + # 删除临时RPM文件 + try: + os.remove(rpm_file) + except Exception: + pass + + except Exception as e: + logger.error(f"Failed to process {pkg['name']}: {e}") + return processed + + @staticmethod + def _extract_rpm_package(pkg: Dict, rpm_file: str) -> bool: + """解压RPM包到缓存目录""" + # 使用包含版本信息的缓存目录 + cache_dir = os.path.join(CACHE_DIR, f"{pkg['name']}_{pkg['version']}") + + # 重建缓存目录 + if os.path.exists(cache_dir): + shutil.rmtree(cache_dir) + os.makedirs(cache_dir, exist_ok=True) + + # 解压RPM + cmd = ['bash', '-c', f"cd '{cache_dir}' && rpm2cpio '{rpm_file}' | cpio -idm --quiet './opt/*'"] + executor = CommandExecutor(cmd, timeout=120) + stdout, stderr, returncode = executor.run() + if returncode != 0: + logger.error(f"Download failed: {stderr}") + return [] + + @staticmethod + def _read_package_resources(pkg: Dict) -> bool: + """从缓存目录读取包的资源文件""" + cache_dir = os.path.join(CACHE_DIR, f"{pkg['name']}_{pkg['version']}") + + if not os.path.exists(cache_dir): + logger.error(f"Cache directory not found for {pkg['name']}: {cache_dir}") + return False + + readme = "" + icon = "" + mcp_config = {} + + # 读取资源文件 + base_path = os.path.join(cache_dir, 'opt', 'mcp-servers', 'servers') + + try: + if not os.path.exists(base_path): + logger.warning(f"Base path not found: {base_path}") + return False + + for server_dir in os.listdir(base_path): + server_path = os.path.join(base_path, server_dir, 'src') + + # 读取readme + readme_path = os.path.join(server_path, 'readme.md') + if os.path.exists(readme_path): + with open(readme_path, 'r', encoding='utf-8') as f: + readme = f.read() + logger.debug(f"Read README for {pkg['name']}") + + # 读取icon + icon_path = os.path.join(server_path, 'icon.png') + if os.path.exists(icon_path): + with open(icon_path, 'rb') as f: + icon = base64.b64encode(f.read()).decode('utf-8') + logger.debug(f"Read icon for {pkg['name']}") + + # 读取配置 + config_path = os.path.join(base_path, server_dir, 'mcp_config.json') + if os.path.exists(config_path): + with open(config_path, 'r', encoding='utf-8') as f: + mcp_config = json.load(f) + logger.debug(f"Read config for {pkg['name']}") + + break + + except Exception as e: + logger.error(f"Failed to read resources for {pkg['name']}: {e}") + return False + + # 更新包信息 + pkg['readme'] = readme + pkg['icon'] = icon + pkg['mcp_config'] = mcp_config + + return True + + @staticmethod + def mcp_package_action(key: str, action: str)-> Dict[str, Any]: + """执行MCP包操作(安装/卸载)""" + logger.info(f"Start {action} MCP package with key: {key}") + + try: + # 验证action参数 + if action not in ['install', 'uninstall']: + msg = f"Invalid action: {action}. Must be 'install' or 'uninstall'." + logger.error(msg) + return {'is_success': False, 'message': msg, 'status_code': status.HTTP_400_BAD_REQUEST} + + # 查询包信息 + try: + mcp_server = MCPServer.objects.get(key=key) + except MCPServer.DoesNotExist: + msg = f"MCP package with key [{key}] does not exist." + logger.error(msg) + return {'is_success': False, 'message': msg, 'status_code': status.HTTP_404_NOT_FOUND} + + # 检查当前安装状态 + is_installed = check_system_rpm_installed(mcp_server.package_name) + + if action == 'install': + if is_installed: + msg = f"MCP package [{mcp_server.name}] is already installed" + logger.info(msg) + return {'is_success': True, 'message': msg, 'status_code': status.HTTP_200_OK} + + elif action == 'uninstall': + if not is_installed: + msg = f"MCP package [{mcp_server.name}] is not installed" + logger.info(msg) + return {'is_success': True, 'message': msg,'status_code': status.HTTP_200_OK} + + task_identifier = f"mcp_{action}_{key}_task" + + if is_process_running(task_identifier, timeout=600): + msg = f"MCP package [{mcp_server.name}] {action} task is already running." + logger.error(msg) + return {'is_success': False, "message": msg, 'status_code': status.HTTP_409_CONFLICT} + + # 启动任务 + logger.info(f"Start to run MCP package [{mcp_server.name}] {action}.") + mcp_task = MCPPackageTask(mcp_server, action, name=task_identifier) + scheduler.add_task(mcp_task) + + msg = f"MCP package [{mcp_server.name}] {action} started." + logger.info(msg) + return {'is_success': True, 'message': msg, 'task_name': mcp_task.name,'status_code': status.HTTP_202_ACCEPTED} + except Exception as e: + logger.error(f"Unexpected error in mcp_package_action for key [{key}]: {str(e)}") + return { + 'is_success': False, + 'message': f"Internal server error occurred while processing package [{key}]", + 'status_code': status.HTTP_500_INTERNAL_SERVER_ERROR + } + + + + + diff --git a/backend/artifacts/models.py b/backend/artifacts/models.py index de553cef0a12e2f41927a1539769812efd502336..476f3236689cb0df9b5a86fc1bd3daea167a03a6 100644 --- a/backend/artifacts/models.py +++ b/backend/artifacts/models.py @@ -68,4 +68,4 @@ class MCPServer(models.Model): description = JSONField("简介", default=dict, help_text="字典格式,key:语言,value:文本") readme = models.TextField("README文本", blank=True, null=True) icon = models.TextField("图标数据", blank=True, null=True) - app_list = JSONField("智能体应用列表", default=list, help_text="列表,每个元素包含name,status") + mcp_config = JSONField("MCP配置内容",default=dict, help_text="完整的 mcp_config.json 内容",blank=True, null=True) diff --git a/backend/artifacts/serializers.py b/backend/artifacts/serializers.py index 6346e27d9a02945d817eba5c3d4d8f3aa3ae8789..a25d41d0cabc4719160b302740f200a1d402a848 100644 --- a/backend/artifacts/serializers.py +++ b/backend/artifacts/serializers.py @@ -110,8 +110,8 @@ class MCPDetailSerializer(serializers.ModelSerializer): 'readme', 'icon', 'cmd_list', + 'mcp_config', 'installed_status', - 'app_list', ) @staticmethod @@ -120,7 +120,7 @@ class MCPDetailSerializer(serializers.ModelSerializer): @staticmethod def get_installed_status(obj): - if is_process_running(f'yum install -y {obj.package_name}'): + if is_process_running(f'dnf install -y --nogpgcheck {obj.package_name}'): return Task.Status.IN_PROCESS cmd = ['rpm', '-q', obj.package_name] cmd_executor = CommandExecutor(cmd) @@ -132,7 +132,7 @@ class MCPDetailSerializer(serializers.ModelSerializer): @staticmethod def get_cmd_list(obj): cmd_list = [ - f"sudo yum install -y {obj.package_name}" + f"sudo dnf install -y {obj.package_name}" ] return cmd_list @@ -199,7 +199,7 @@ class MCPBulkCreateSerializer(serializers.ModelSerializer): 'description', 'readme', 'icon', - 'app_list', + 'mcp_config', ) list_serializer_class = MCPListSerializer diff --git a/backend/artifacts/tasks/install_mcp_task.py b/backend/artifacts/tasks/install_mcp_task.py index b18f14904ebae3527e1c2b70f5d7a786d3fcad59..ad4bf7acc9cc612f878cc447c373fa4995d73f9a 100644 --- a/backend/artifacts/tasks/install_mcp_task.py +++ b/backend/artifacts/tasks/install_mcp_task.py @@ -24,7 +24,7 @@ class InstallMCPTask(BaseTask): self.pkg_name = pkg_name def run(self): - cmd = ['yum', 'install', '-y', self.pkg_name] + cmd = ['dnf', 'install', '-y', self.pkg_name] cmd_executor = CommandExecutor(cmd) _, stderr, code = cmd_executor.run() if code != 0: diff --git a/backend/artifacts/tasks/mcp_action_task.py b/backend/artifacts/tasks/mcp_action_task.py new file mode 100644 index 0000000000000000000000000000000000000000..443b2c2dbe733dd2c13602d499ee2deba5162103 --- /dev/null +++ b/backend/artifacts/tasks/mcp_action_task.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. +# oeDeploy is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# Create: 2025-08-14 +# ====================================================================================================================== +import logging +from typing import Any, Dict +from tasks.base_task import BaseTask, TaskExecuteError +from tasks.models import Task +from utils.cmd_executor import CommandExecutor +from artifacts.models import MCPServer +logger = logging.getLogger(__name__) + +class MCPPackageTask(BaseTask): + """执行MCP包操作任务(安装/卸载)""" + + def __init__(self, mcp_server: MCPServer, action: str, **kwargs: Dict[str, Any]) -> None: + task_type = Task.Type.MCP_INSTALL if action == 'install' else Task.Type.MCP_UNINSTALL + super().__init__(task_type=task_type, **kwargs) + self.mcp_server_name = mcp_server.name + self.package_name = mcp_server.package_name + self.action = action + def run(self) -> str: + """执行MCP包操作任务""" + logger.info(f"Start {self.action} MCP package [{self.mcp_server_name}]") + + try: + if self.action == 'install': + return self._install_package() + elif self.action == 'uninstall': + return self._uninstall_package() + else: + raise TaskExecuteError(f"Invalid action: {self.action}") + except TaskExecuteError: + raise + except Exception as e: + logger.error(f"Task execution failed: {str(e)}") + raise TaskExecuteError(f"Task execution failed: {str(e)}") + + def _install_package(self) -> str: + """执行安装操作""" + cmd_executor = CommandExecutor(['dnf', 'clean', 'all'], timeout=60) + cmd_executor.run() + # 执行安装 + return self._execute_dnf_command( + ['dnf', 'install', '-y', '--nogpgcheck', self.package_name], + f"install [{self.mcp_server_name}]" + ) + + def _uninstall_package(self) -> str: + """执行卸载操作""" + return self._execute_dnf_command( + ['dnf', 'remove', '-y', self.package_name], + f"uninstall [{self.mcp_server_name}]" + ) + + def _execute_dnf_command(self, cmd: list, operation: str) -> str: + """执行dnf命令的通用方法""" + cmd_executor = CommandExecutor(cmd, timeout=3600) + _, stderr, code = cmd_executor.run() + + if code != 0: + error_msg = f"Failed to {operation}: {stderr}" + logger.error(error_msg) + raise TaskExecuteError(error_msg) + + success_msg = f"Successfully {operation.replace('[', '').replace(']', '')}" + logger.info(success_msg) + return success_msg diff --git a/backend/artifacts/tasks/plugin_action_task.py b/backend/artifacts/tasks/plugin_action_task.py index ed8eb0008bc428504289f1d28d84f44a53ddce81..c976380ebe154b7f1f8a2d284b79e7c7f5acf382 100644 --- a/backend/artifacts/tasks/plugin_action_task.py +++ b/backend/artifacts/tasks/plugin_action_task.py @@ -15,7 +15,6 @@ import os import logging from typing import Any, Dict - from tasks.base_task import BaseTask, TaskExecuteError from tasks.models import Task from utils.cmd_executor import CommandExecutor @@ -75,3 +74,11 @@ class PluginActionTask(BaseTask): msg = f"Successfully run plugin [{self.plugin.name}] action [{self.action_name}]" logger.info(msg) return msg + + + + + + + + diff --git a/backend/artifacts/utils.py b/backend/artifacts/utils.py index baab4e51c1e824e45ed119ee1ec4c4e0088996e8..2bcb481e0f19756312e52e622ce3bbde032204c3 100644 --- a/backend/artifacts/utils.py +++ b/backend/artifacts/utils.py @@ -13,14 +13,16 @@ # ====================================================================================================================== import os +import subprocess import yaml from django.db import connection from tasks.models import Task +from artifacts.models import MCPServer from artifacts.serializers import PluginItemSerializer from constants.paths import PLUGIN_CACHE_DIR, LOG_DIR from utils.common import is_process_running +from utils.cmd_executor import CommandExecutor from utils.logger import init_log - logger = init_log('run.log') @@ -133,3 +135,17 @@ def get_devstore_log(): except Exception as e: logger.error(f"Unexpected error while reading log file {log_file}: {str(e)}") return f"读取日志文件时发生未知错误: {str(e)}" + +def check_system_rpm_installed(package_name: str) -> bool: + """检查RPM包是否已在系统中安装""" + try: + cmd = ['rpm', '-q', package_name] + cmd_executor = CommandExecutor(cmd, timeout=30) + _, _, code = cmd_executor.run() + if code == 0: + return True + else: + return False + except Exception: + return False + diff --git a/backend/artifacts/views.py b/backend/artifacts/views.py index 7aaca7e9ebf9842bdf600611ce121849db8b0255..6c38b3b0a18560b3e550915949766c7213fe2eff 100644 --- a/backend/artifacts/views.py +++ b/backend/artifacts/views.py @@ -64,6 +64,76 @@ class ArtifactViewSet(viewsets.GenericViewSet): logger.info(msg) return Response({'is_success': True, 'message': msg, 'time': data_time}, status=status.HTTP_200_OK) + @action(methods=['POST'], detail=False, url_path='sync-mcp') + def sync_mcp_only(self, request): + """单独同步MCP服务信息(新增接口)""" + logger.info("==== API: [POST] /v1.0/artifacts/sync-mcp/ ====") + + # 只同步MCP服务信息 + result = MCPMethods.sync_mcps() + if not result['is_success']: + return Response(result, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + msg = "Sync MCP data successfully." + logger.info(msg) + return Response({'is_success': True, 'message': msg}, status=status.HTTP_200_OK) + + + @action(methods=['POST'], detail=False) + @check_scheduler_load + def mcp_install(self, request): + """安装MCP包""" + logger.info(f"==== API: [POST] /v1.0/artifacts/mcp_install/ ====") + + # 获取必需参数 + key = request.query_params.get('key') + + # 参数验证 + if not key: + return Response( + {'is_success': False, 'message': f"Missing required parameter: {key}"}, + status=status.HTTP_400_BAD_REQUEST + ) + + logger.info(f"Installing MCP package for key: {key}") + + # 调用统一的MCP包管理方法 + result = MCPMethods.mcp_package_action(key, 'install') + + # 根据结果设置HTTP状态码 + status_code = result.get('status_code', status.HTTP_500_INTERNAL_SERVER_ERROR) + + return Response(result, status=status_code) + + + @action(methods=['POST'], detail=False) + @check_scheduler_load + def mcp_uninstall(self, request): + """卸载MCP包""" + logger.info(f"==== API: [POST] /v1.0/artifacts/mcp_uninstall/ ====") + + # 获取必需参数 + key = request.query_params.get('key') + + # 参数验证 + if not key: + return Response( + {'is_success': False, 'message': f"Missing required parameter: {key}"}, + status=status.HTTP_400_BAD_REQUEST + ) + + logger.info(f"Uninstalling MCP package for key: {key}") + + # 调用统一的MCP包管理方法 + result = MCPMethods.mcp_package_action(key, 'uninstall') + + # 根据结果设置HTTP状态码 + status_code = result.get('status_code', status.HTTP_500_INTERNAL_SERVER_ERROR) + + return Response(result, status=status_code) + + + def list(self, request): """获取插件和MCP服务列表 """ @@ -219,103 +289,3 @@ class ArtifactViewSet(viewsets.GenericViewSet): else: status_code, result = PluginMethods.get_plugin_log(key) return Response(result, status=status_code) - - @action(methods=['GET'], detail=True) - @check_scheduler_load - def install_mcp(self, request, pk): - """安装MCP服务 - """ - # TODO:同一时间只能安装一个 mcp-servers-xxx 的包 - # TODO:无法安装正在卸载的包 - logger.info(f'==== API: [GET] /v1.0/artifacts/{pk}/install_mcp/ ====') - # 查询 MCP 服务包信息 - logger.info("Start query MCP service package information by key.") - try: - mcp_service = MCPServer.objects.get(id=pk) - except MCPServer.DoesNotExist: - msg = f"The MCP service with key {pk} does not exist." - logger.error(msg) - return Response({ - 'is_success': False, - 'message': msg - }, status.HTTP_400_BAD_REQUEST) - logger.info("Query MCP service package successfully.") - - # 检查 MCP 服务包是否已经安装,后端进行二次校验 - pkg_name = mcp_service.package_name - logger.info(f"Start to check whether package {pkg_name} is installed.") - cmd = ['rpm', '-q', pkg_name] - cmd_executor = CommandExecutor(cmd) - _, _, code = cmd_executor.run() - if code == 0: - msg = f"{pkg_name} has been installed." - logger.info(msg) - return Response({ - 'is_success': True, - 'message': msg - }, status=status.HTTP_200_OK) - - # 安装 MCP 服务包 - logger.info(f"Start to install package {pkg_name}") - install_mcp_task = InstallMCPTask(pkg_name, name=f"install_{pkg_name}_task") - scheduler.add_task(install_mcp_task) - return Response({ - 'is_success': True, - 'message': f"Package {pkg_name} is being installed.", - 'task_name': install_mcp_task.name - }, status=status.HTTP_202_ACCEPTED) - - @action(methods=['GET'], detail=True) - def uninstall_mcp(self, request, pk): - """卸载MCP服务 - """ - # TODO:无法卸载正在安装的包 - logger.info(f"==== API: [GET] /v1.0/artifacts/{pk}/uninstall_mcp/ ====") - # 查询 MCP 服务包信息 - logger.info("Start query MCP service package information by key.") - try: - mcp_service = MCPServer.objects.get(id=pk) - except MCPServer.DoesNotExist: - msg = f"The MCP service with key {pk} does not exist." - logger.error(msg) - return Response({ - 'is_success': False, - 'message': msg - }, status.HTTP_400_BAD_REQUEST) - logger.info("Query MCP service package successfully.") - - # 检查 MCP 服务包是否已经安装,后端进行二次校验 - pkg_name = mcp_service.package_name - logger.info(f"Start to check whether package {pkg_name} is installed.") - cmd = ['rpm', '-q', pkg_name] - cmd_executor = CommandExecutor(cmd) - _, _, code = cmd_executor.run() - if code != 0: - msg = f"{pkg_name} isn't installed." - logger.info(msg) - return Response({ - 'is_success': True, - 'message': msg - }, status=status.HTTP_200_OK) - - # 卸载 MCP 服务包 - logger.info(f"Start to uninstall package {pkg_name}") - cmd = ['yum', 'remove', '-y', pkg_name] - cmd_executor = CommandExecutor(cmd) - _, stderr, code = cmd_executor.run() - if code != 0: - logger.error(f"Failed to uninstall {pkg_name}, error: {stderr}") - return Response({ - 'is_success': False, - 'message': stderr - }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - msg = f"Package {pkg_name} is uninstalled successfully." - logger.info(msg) - return Response({ - 'is_success': True, - 'message': msg - }, status=status.HTTP_200_OK) - - @action(methods=['GET'], detail=True) - def add_to_agent_app(self, request, pk): - pass diff --git a/backend/configs/mcp.repo b/backend/configs/mcp.repo index af3884a4eef663134711aafc22787eac4676b280..aa01c9939fae61c9422cefb72cab3021de13f1ba 100644 --- a/backend/configs/mcp.repo +++ b/backend/configs/mcp.repo @@ -1,4 +1,4 @@ [mcp] name=mcp baseurl=https://eulermaker.compass-ci.openeuler.openatom.cn/api/ems4/repositories/mcps-repo/openEuler%3A24.03-LTS-SP2/$basearch/ -enabled=1 \ No newline at end of file +enabled=1 diff --git a/backend/constants/paths.py b/backend/constants/paths.py index f1b85ee99b372b4604128a4dfc6d8e2e485e0ebf..f5fa16380134c1513a0e6374d94c4ea08c473dc7 100644 --- a/backend/constants/paths.py +++ b/backend/constants/paths.py @@ -14,6 +14,7 @@ import os + # 配置文件目录 CONFIG_DIR = '/etc/dev-store' # /etc/dev-store/mariadb/mariadb.conf MariaDB 配置文件路径 @@ -30,10 +31,11 @@ LOG_DIR = '/var/log/dev-store' PLUGIN_REPO_DIR = '/etc/oedp/config/repo/cache' # /etc/oedp/config/repo/details OEDP 插件配置信息缓存目录 REPO_DETAILS_DIR = '/var/oedp/details' -# MCP 服务 repo 文件 -MCP_REPO_FILE = '/etc/yum.repos.d/mcp.repo' +# MCP 服务缓存地址 +CACHE_DIR = "/var/dev-store/mcp-assets" +# MCP 服务包存储地址 +MCP_BASE_DIR ="/var/dev-store/mcp-save" # 家目录 HOME_DIR = os.path.expanduser('~') # 插件包缓存目录 PLUGIN_CACHE_DIR = os.path.join(HOME_DIR, '.oedp') -