diff --git a/backend/artifacts/methods/plugin_methods.py b/backend/artifacts/methods/plugin_methods.py index f67bad7ad06f7915040b37ea5b6a83e3cb51233c..311af5a5934665bad4ea696b159d257217aa65dc 100644 --- a/backend/artifacts/methods/plugin_methods.py +++ b/backend/artifacts/methods/plugin_methods.py @@ -17,8 +17,8 @@ import os import shutil import yaml -from django.db import connection from pathlib import Path +from rest_framework import status from artifacts.models import OEDPPlugin from artifacts.serializers import PluginBulkCreateSerializer @@ -35,6 +35,7 @@ from constants.paths import PLUGIN_REPO_DIR, REPO_DETAILS_DIR, PLUGIN_CACHE_DIR from tasks.models import Task from tasks.scheduler import scheduler from utils.cmd_executor import CommandExecutor +from utils.common import is_process_running from utils.file_handler.base_handler import FileError from utils.file_handler.yaml_handler import YAMLHandler from utils.logger import init_log @@ -80,8 +81,14 @@ class PluginMethods: logger.info("Query plugin package information successfully.") # 检查是否已经在下载中 - if plugin.download_status == Task.Status.IN_PROCESS: - msg = f"Plugin [{plugin.name}] download already in process." + target_project = os.path.join(PLUGIN_CACHE_DIR, key) + if is_process_running(f'oedp init {plugin.name} -p {target_project} -f', timeout=600): + # 更新插件下载状态 in process + if not update_plugin_status(plugin, Task.Status.IN_PROCESS): + msg = f"Plugin [{key}] failed to update status [{Task.Status.IN_PROCESS}]." + logger.error(msg) + return {'is_success': False, 'message': msg} + msg = f"Plugin [{key}] download already in process." logger.info(msg) return {'is_success': True, 'message': msg} @@ -90,32 +97,32 @@ class PluginMethods: if PluginMethods._check_plugin_file_exist(key): # 更新插件下载状态 success if not update_plugin_status(plugin, Task.Status.SUCCESS): - msg = f"Plugin [{plugin.name}] failed to update status [{Task.Status.SUCCESS}]." + msg = f"Plugin [{key}] failed to update status [{Task.Status.SUCCESS}]." logger.error(msg) return {'is_success': False, 'message': msg} # 更新action列表 action_list = get_plugin_action_list(plugin) if not update_plugin_action_list(plugin, action_list): - msg = f"Failed to update plugin [{plugin.name}] action_list" + msg = f"Failed to update plugin [{key}] action_list" logger.error(msg) return {'is_success': False, 'message': msg} - msg = f"Plugin [{plugin.name}] already exists." + msg = f"Plugin [{key}] already exists." logger.info(msg) return {'is_success': True, 'message': msg} logger.info(f"Start to download plugin [{key}].") # 更新插件下载状态 in process if not update_plugin_status(plugin, Task.Status.IN_PROCESS): - msg = f"Plugin [{plugin.name}] failed to update status [{Task.Status.IN_PROCESS}]." + msg = f"Plugin [{key}] failed to update status [{Task.Status.IN_PROCESS}]." logger.error(msg) return {'is_success': False, 'message': msg} download_plugin_task = PluginDownloadTask(plugin, name=f"plugin_download_{key}_task") scheduler.add_task(download_plugin_task) - msg = f"Plugin [{plugin.name}] begin downloading." + msg = f"Plugin [{key}] begin downloading." logger.info(msg) return {'is_success': True, 'message': msg, 'task_name': download_plugin_task.name} @@ -193,17 +200,26 @@ class PluginMethods: logger.error(msg) return {'is_success': False, "message": msg} - # 检查action合法性以及是否正在执行中 + # 检查action合法性 action_list = plugin.action_list action_object = PluginMethods._get_plugin_action_from_list(action_list, action_name) if not action_object: msg = f"Plugin [{plugin.name}] has no action [{action_name}]." logger.error(msg) return {'is_success': False, "message": msg} - if action_object['status'] == Task.Status.IN_PROCESS: + + # 检查是否已经在执行中 + target_project = os.path.join(PLUGIN_CACHE_DIR, key) + if is_process_running(f"oedp run -p {target_project} -lt {action_name}", timeout=3600): msg = f"Plugin [{plugin.name}] action [{action_name}] is in process." logger.error(msg) return {'is_success': False, "message": msg} + + # 每个插件同时只允许触发一个action + if is_process_running(f"oedp run -p {target_project} -lt", timeout=3600): + msg = f"Plugin [{plugin.name}]'s another action is in process, only one action can be run at a time." + logger.error(msg) + return {'is_success': False, "message": msg} # 更新部署操作状态 in process set_plugin_action_status(action_list, action_name, Task.Status.IN_PROCESS) @@ -221,6 +237,102 @@ class PluginMethods: logger.info(msg) return {'is_success': True, 'message': msg, 'task_name': plugin_action_task.name} + @staticmethod + def get_plugin_config(key: str): + if not PluginMethods._check_plugin_file_exist(key): + msg = f"Plugin [{key}] is not downloaded or files missing." + logger.error(msg) + return status.HTTP_400_BAD_REQUEST ,{'is_success': False, "message": msg, "config_text": ""} + target_project = os.path.join(PLUGIN_CACHE_DIR, key) + config_path = os.path.join(target_project, "config.yaml") + try: + with open(config_path, 'r', encoding='utf-8') as f: + config_text = f.read() + msg = f"Get plugin [{key}] config successfully." + return status.HTTP_200_OK, {'is_success': True, "message": msg, "config_text": config_text} + except Exception as e: + msg = f"Failed to read plugin [{key}] config file: {str(e)}" + logger.error(msg) + return status.HTTP_500_INTERNAL_SERVER_ERROR, {'is_success': False, "message": msg, "config_text": ""} + + @staticmethod + def set_plugin_config(key: str, config_text: str): + if not PluginMethods._check_plugin_file_exist(key): + msg = f"Plugin [{key}] is not downloaded or files missing." + logger.error(msg) + return status.HTTP_400_BAD_REQUEST ,{'is_success': False, "message": msg, "config_text": ""} + target_project = os.path.join(PLUGIN_CACHE_DIR, key) + config_path = os.path.join(target_project, "config.yaml") + config_bak_path = os.path.join(target_project, "config.yaml.bak") + + try: + # 如果备份文件不存在,创建备份 + if not os.path.exists(config_bak_path) and os.path.exists(config_path): + shutil.copy2(config_path, config_bak_path) + # 写入新配置 + fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644) + with os.fdopen(fd, 'w', encoding='utf-8') as f: + f.write(config_text) + msg = f"Set plugin [{key}] config successfully." + return status.HTTP_200_OK, {'is_success': True, "message": msg, "config_text": ""} + except Exception as e: + msg = f"Failed to set plugin [{key}] config: {str(e)}" + logger.error(msg) + return status.HTTP_500_INTERNAL_SERVER_ERROR, {'is_success': False, "message": msg, "config_text": ""} + + @staticmethod + def reset_plugin_config(key: str): + if not PluginMethods._check_plugin_file_exist(key): + msg = f"Plugin [{key}] is not downloaded or files missing." + logger.error(msg) + return status.HTTP_400_BAD_REQUEST ,{'is_success': False, "message": msg, "config_text": ""} + target_project = os.path.join(PLUGIN_CACHE_DIR, key) + config_path = os.path.join(target_project, "config.yaml") + config_bak_path = os.path.join(target_project, "config.yaml.bak") + + try: + if os.path.exists(config_bak_path): + if os.path.exists(config_path): + os.remove(config_path) + shutil.copy2(config_bak_path, config_path) + with open(config_path, 'r', encoding='utf-8') as f: + config_text = f.read() + msg = f"Reset plugin [{key}] config successfully." + return status.HTTP_200_OK, {'is_success': True, "message": msg, "config_text": config_text} + + except Exception as e: + msg = f"Failed to reset plugin [{key}] config: {str(e)}" + logger.error(msg) + return status.HTTP_500_INTERNAL_SERVER_ERROR, {'is_success': False, "message": msg, "config_text": ""} + + @staticmethod + def get_plugin_log(key: str): + if not PluginMethods._check_plugin_file_exist(key): + msg = f"Plugin [{key}] is not downloaded or files missing." + logger.error(msg) + return status.HTTP_400_BAD_REQUEST ,{'is_success': False, "message": msg, "config_text": ""} + target_project = os.path.join(PLUGIN_CACHE_DIR, key) + log_file = os.path.join(target_project, "run.log") + if not os.path.exists(log_file): + msg = f"Plugin [{key}] has no log file." + return status.HTTP_200_OK, {'is_success': True, "message": msg, "log_text": ""} + + try: + file_size = os.path.getsize(log_file) + if file_size > 100 * 1024: # 超过100KB + with open(log_file, 'rb') as f: + f.seek(-100 * 1024, os.SEEK_END) + log_text = f.read().decode('utf-8', errors='ignore') + else: + with open(log_file, 'r', encoding='utf-8') as f: + log_text = f.read() + msg = f"Get log successfully." + return status.HTTP_200_OK, {'is_success': True, "message": msg, "log_text": log_text} + except Exception as e: + msg = f"Failed to read log file {log_file}: {str(e)}" + logger.error(msg) + return status.HTTP_500_INTERNAL_SERVER_ERROR, {'is_success': False, "message": msg, "log_text": ""} + @staticmethod def _update_plugin_info(): """更新插件仓库信息""" diff --git a/backend/artifacts/models.py b/backend/artifacts/models.py index dca48570d2840230953f3ff19622eb16bf26673a..de553cef0a12e2f41927a1539769812efd502336 100644 --- a/backend/artifacts/models.py +++ b/backend/artifacts/models.py @@ -43,7 +43,7 @@ class OEDPPlugin(models.Model): readme = models.TextField("README文本", blank=True, null=True) icon = models.TextField("图标数据", blank=True, null=True) localhost_available = models.BooleanField("是否支持本地单节点部署", default=False) - download_status = models.CharField("下载状态", max_length=256, default="not yet") + download_status = models.CharField("下载状态", max_length=256, default=Task.Status.NOT_YET) action_list = JSONField("部署操作列表", default=list, help_text="列表,每个元素包含name,title,description,status") diff --git a/backend/artifacts/serializers.py b/backend/artifacts/serializers.py index db5f4c1b65e862a3c7cf171928563de7c9850238..6346e27d9a02945d817eba5c3d4d8f3aa3ae8789 100644 --- a/backend/artifacts/serializers.py +++ b/backend/artifacts/serializers.py @@ -19,7 +19,6 @@ from rest_framework import serializers from artifacts.models import MCPServer, OEDPPlugin from constants.choices import ArtifactTag -from constants.paths import PLUGIN_CACHE_DIR from tasks.models import Task from utils.cmd_executor import CommandExecutor from utils.common import is_process_running diff --git a/backend/artifacts/tasks/plugin_action_task.py b/backend/artifacts/tasks/plugin_action_task.py index 896acca3b15767c79ea7ddafed9d65770936b439..ed8eb0008bc428504289f1d28d84f44a53ddce81 100644 --- a/backend/artifacts/tasks/plugin_action_task.py +++ b/backend/artifacts/tasks/plugin_action_task.py @@ -49,11 +49,10 @@ class PluginActionTask(BaseTask): action_list = self.plugin.action_list target_project = os.path.join(PLUGIN_CACHE_DIR, self.plugin.key) - log_file = os.path.join(target_project, "run.log") # 执行部署操作 - cmd = ['oedp', 'run', self.action_name, '-p', target_project, '-lt'] - cmd_executor = CommandExecutor(cmd) + cmd = ['oedp', 'run', '-p', target_project, '-lt', self.action_name] + cmd_executor = CommandExecutor(cmd, timeout=3600) _, stderr, code = cmd_executor.run() if code != 0: diff --git a/backend/artifacts/tasks/plugin_download_task.py b/backend/artifacts/tasks/plugin_download_task.py index 252836e57aa9a12a98f0671ef41ebe540c8a5f22..11f98275aa20461e4fb190c9262f051fa6e02b17 100644 --- a/backend/artifacts/tasks/plugin_download_task.py +++ b/backend/artifacts/tasks/plugin_download_task.py @@ -44,7 +44,7 @@ class PluginDownloadTask(BaseTask): Raises: TaskExecuteError: 当下载失败时抛出 """ - logger.info(f"Start downloading plugin: {self.plugin.name}") + logger.info(f"Start downloading plugin: {self.plugin.key}") # 确保插件缓存目录存在 if not os.path.exists(PLUGIN_CACHE_DIR): @@ -55,11 +55,11 @@ class PluginDownloadTask(BaseTask): # 执行下载命令 cmd = ['oedp', 'init', self.plugin.name, '-p', target_project, '-f'] - cmd_executor = CommandExecutor(cmd) + cmd_executor = CommandExecutor(cmd, timeout=580) _, stderr, code = cmd_executor.run() if code != 0: - logger.error(f"Failed to download plugin {self.plugin.name}, error: {stderr}") + logger.error(f"Failed to download plugin {self.plugin.key}, error: {stderr}") # 执行失败需清理目录 if os.path.exists(target_project): try: @@ -70,21 +70,21 @@ class PluginDownloadTask(BaseTask): # 更新插件下载状态 not yet if not update_plugin_status(self.plugin, Task.Status.NOT_YET): - logger.error(f"Failed to update plugin [{self.plugin.name}] status to [{Task.Status.NOT_YET}]") + logger.error(f"Failed to update plugin [{self.plugin.key}] status to [{Task.Status.NOT_YET}]") raise TaskExecuteError(f"Failed to update plugin status") raise TaskExecuteError(stderr) - logger.info(f"Successfully downloaded plugin: {self.plugin.name}") + logger.info(f"Successfully downloaded plugin: {self.plugin.key}") # 更新插件下载状态 success if not update_plugin_status(self.plugin, Task.Status.SUCCESS): - logger.error(f"Failed to update plugin [{self.plugin.name}] status to [{Task.Status.SUCCESS}]") + logger.error(f"Failed to update plugin [{self.plugin.key}] status to [{Task.Status.SUCCESS}]") raise TaskExecuteError(f"Failed to update plugin status") action_list = get_plugin_action_list(self.plugin) if not update_plugin_action_list(self.plugin, action_list): - logger.error(f"Failed to update plugin [{self.plugin.name}] action_list") + logger.error(f"Failed to update plugin [{self.plugin.key}] action_list") raise TaskExecuteError(f"Failed to update plugin action_list") - return f"Download plugin [{self.plugin.name}] successfully." + return f"Download plugin [{self.plugin.key}] successfully." diff --git a/backend/artifacts/utils.py b/backend/artifacts/utils.py index 6af8264208a3f287da0725e443066d06b7a96fff..8199ded02afb7304ac1532717c3b9cfbfe141ab5 100644 --- a/backend/artifacts/utils.py +++ b/backend/artifacts/utils.py @@ -16,9 +16,9 @@ import os import yaml from django.db import connection from tasks.models import Task -from artifacts.models import OEDPPlugin from artifacts.serializers import PluginItemSerializer from constants.paths import PLUGIN_CACHE_DIR +from utils.common import is_process_running from utils.logger import init_log logger = init_log('run.log') @@ -68,11 +68,14 @@ def get_plugin_action_list(plugin): action_list = [] for action_name, action_data in main['action'].items(): + status = Task.Status.NOT_YET + if is_process_running(f"oedp run -p {target_project} -lt {action_name}", timeout=3600): + status = Task.Status.IN_PROCESS action_info = { "name": action_name, "title": action_data.get('title', action_name), "description": action_data.get('description', ''), - "status": Task.Status.NOT_YET + "status": status } action_list.append(action_info) diff --git a/backend/artifacts/views.py b/backend/artifacts/views.py index 0a459558c294d000967e0cab000812acef6d4c23..49cab0d43eb510c0afc927f7cb70e92cd5e8382d 100644 --- a/backend/artifacts/views.py +++ b/backend/artifacts/views.py @@ -12,6 +12,9 @@ # Create: 2025-07-18 # ====================================================================================================================== +import os + +from datetime import datetime from rest_framework import viewsets, status from rest_framework.decorators import action from rest_framework.response import Response @@ -55,9 +58,10 @@ class ArtifactViewSet(viewsets.GenericViewSet): return Response(result, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # 仅返回调用结果 + data_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") msg = "Sync data successfully." logger.info(msg) - return Response({'is_success': True, 'message': msg}, status=status.HTTP_200_OK) + return Response({'is_success': True, 'message': msg, 'time': data_time}, status=status.HTTP_200_OK) def list(self, request): """获取插件和MCP服务列表 @@ -166,6 +170,7 @@ class ArtifactViewSet(viewsets.GenericViewSet): return Response(result, status=status_code) @action(methods=['POST'], detail=False) + @check_scheduler_load def plugin_action(self, request): """执行插件的某个部署操作 """ @@ -181,6 +186,33 @@ class ArtifactViewSet(viewsets.GenericViewSet): status_code = status.HTTP_202_ACCEPTED return Response(result, status=status_code) + + @action(methods=['GET'], detail=False) + def plugin_config(self, request): + """插件用户配置相关操作 + """ + logger.info(f"==== API: [POST] /v1.0/artifacts/plugin_config/ ====") + key = request.query_params.get('key') + operation = request.query_params.get('operation', "get") + config_text = request.query_params.get('config_text', "") + + if operation == "set": + status_code, result = PluginMethods.set_plugin_config(key, config_text) + elif operation == "reset": + status_code, result = PluginMethods.reset_plugin_config(key) + else: + status_code, result = PluginMethods.get_plugin_config(key) + + return Response(result, status=status_code) + + @action(methods=['GET'], detail=False) + def plugin_log(self, request): + """插件用户配置相关操作 + """ + logger.info(f"==== API: [POST] /v1.0/artifacts/plugin_log/ ====") + key = request.query_params.get('key') + status_code, result = PluginMethods.get_plugin_log(key) + return Response(result, status=status_code) @action(methods=['GET'], detail=True) @check_scheduler_load diff --git a/backend/utils/cmd_executor.py b/backend/utils/cmd_executor.py index 5b6835e53e4a92ebe24ad258b0a5783b1d479b61..7fd09240257b5a2a1faa571dc55368e8961725ac 100644 --- a/backend/utils/cmd_executor.py +++ b/backend/utils/cmd_executor.py @@ -23,7 +23,7 @@ TIMEOUT_CODE = 2 class CommandExecutor: - def __init__(self, cmd, encoding=sys.getdefaultencoding(), timeout=300): + def __init__(self, cmd, encoding=sys.getdefaultencoding(), timeout=120): self.process = subprocess.Popen( cmd, universal_newlines=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, start_new_session=True, encoding=encoding diff --git a/backend/utils/common.py b/backend/utils/common.py index 7cb1b5c14d5d0560e6d65dbcddef8e5c1aa5eb1f..83fe22377e3f0ae95c25336b9555370e4ddf00bd 100644 --- a/backend/utils/common.py +++ b/backend/utils/common.py @@ -12,20 +12,25 @@ # Create: 2025-07-18 # ====================================================================================================================== +import time import psutil -def is_process_running(keyword): +def is_process_running(keyword, timeout=600): """ 检查是否有进程的命令行或名称中包含指定关键字 :param keyword: 要搜索的关键字(字符串) :return: True/False 表示是否找到匹配的进程 + 如果进程运行超时(可能出现运行故障),则忽略 """ - for proc in psutil.process_iter(['pid', 'name', 'cmdline']): + for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'create_time']): try: # 检查进程名或命令行参数中是否包含关键字 if keyword.lower() in ' '.join(proc.info['cmdline']).lower() or \ keyword.lower() in proc.info['name'].lower(): + # 检查进程运行时间是否超过600秒 + if (time.time() - proc.info['create_time']) > timeout: + continue return True except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue