From ebed6499e5739d42e9d79b6b7eb437f5d119084f Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 24 Nov 2025 19:56:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E4=B8=8A=E4=B8=8B=E6=96=87?= =?UTF-8?q?=E4=BF=9D=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/constants.py | 2 + apps/models/record.py | 2 +- apps/models/task.py | 12 +-- apps/scheduler/executor/agent.py | 3 - apps/scheduler/executor/step.py | 3 - apps/scheduler/scheduler/data.py | 134 +++++++++----------------- apps/scheduler/scheduler/init.py | 55 ++++++++--- apps/scheduler/scheduler/scheduler.py | 23 ++--- apps/scheduler/scheduler/util.py | 10 -- apps/services/record.py | 40 ++++---- design/executor/agent.md | 3 - 11 files changed, 127 insertions(+), 160 deletions(-) diff --git a/apps/constants.py b/apps/constants.py index da50474d4..4748a598a 100644 --- a/apps/constants.py +++ b/apps/constants.py @@ -52,3 +52,5 @@ AGENT_MAX_STEPS = 25 AGENT_FINAL_STEP_NAME = "FINAL" # 大模型超时时间 LLM_TIMEOUT = 300.0 +# 对话标题最大长度 +CONVERSATION_TITLE_MAX_LENGTH = 30 diff --git a/apps/models/record.py b/apps/models/record.py index 2e97404cf..84d9180cc 100644 --- a/apps/models/record.py +++ b/apps/models/record.py @@ -23,7 +23,7 @@ class Record(Base): ) """对话ID""" taskId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815 - UUID(as_uuid=True), ForeignKey("framework_task.id"), nullable=True, + UUID(as_uuid=True), nullable=True, ) """任务ID""" content: Mapped[str] = mapped_column(Text, nullable=False) diff --git a/apps/models/task.py b/apps/models/task.py index 51df70961..4c5c65876 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -94,7 +94,9 @@ class TaskRuntime(Base): primary_key=True, ) """任务ID""" - time: Mapped[float] = mapped_column(Float, default=0.0, nullable=False) + time: Mapped[float] = mapped_column( + Float, default_factory=lambda: round(datetime.now(UTC).timestamp(), 2), nullable=False, + ) """时间""" fullTime: Mapped[float] = mapped_column(Float, default=0.0, nullable=False) # noqa: N815 """完整时间成本""" @@ -158,14 +160,8 @@ class ExecutorHistory(Base): __tablename__ = "framework_executor_history" - taskId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("framework_task.id"), nullable=False) # noqa: N815 + taskId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) # noqa: N815 """任务ID""" - executorId: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 - """执行器ID(例如工作流ID)""" - executorName: Mapped[str] = mapped_column(String(255)) # noqa: N815 - """执行器名称(例如工作流名称)""" - executorStatus: Mapped[ExecutorStatus] = mapped_column(Enum(ExecutorStatus), nullable=False) # noqa: N815 - """执行器状态""" stepId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) # noqa: N815 """步骤ID""" stepName: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 545a2af98..8d09f0668 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -199,9 +199,6 @@ class MCPAgentExecutor(BaseExecutor): stepName=self._current_tool.toolName if self._current_tool else self.task.state.stepName, stepType=str(self._current_tool.id) if self._current_tool else "", stepStatus=step_status, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, inputData=input_data or {}, outputData=output_data or {}, extraData=extra_data, diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index 6f42987fd..0e8790616 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -267,9 +267,6 @@ class StepExecutor(BaseExecutor): # 更新context history = ExecutorHistory( taskId=self.task.metadata.id, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, stepId=self.step.step_id, stepName=self.step.step.name, stepType=self.task.state.stepType, diff --git a/apps/scheduler/scheduler/data.py b/apps/scheduler/scheduler/data.py index d95747239..b5990606f 100644 --- a/apps/scheduler/scheduler/data.py +++ b/apps/scheduler/scheduler/data.py @@ -2,12 +2,14 @@ """数据管理相关的Mixin类""" import logging -import uuid from datetime import UTC, datetime from typing import Any +from uuid import UUID from apps.common.security import Security -from apps.models import Conversation, ExecutorStatus, Record, RecordMetadata +from apps.models import ExecutorStatus +from apps.models import Record as PgRecord +from apps.models import RecordMetadata as PgRecordMetadata from apps.schemas.record import RecordContent from apps.schemas.request_data import RequestData from apps.schemas.task import TaskData @@ -18,9 +20,6 @@ from apps.services.task import TaskManager _logger = logging.getLogger(__name__) -# 对话标题最大长度 -_CONVERSATION_TITLE_MAX_LENGTH = 30 - class DataMixin: """处理数据保存和管理相关的逻辑""" @@ -50,57 +49,69 @@ class DataMixin: data={}, ) - def _build_record(self, encrypt_data: str, encrypt_config: dict[str, Any], current_time: float) -> Record: - """构建记录对象""" + def _build_record( + self, + encrypt_data: str, + encrypt_config: dict[str, Any], + current_time: float, + ) -> tuple[PgRecord, PgRecordMetadata]: + """构建记录对象和元数据对象""" task = self.task user_id = task.metadata.userId + record_id = task.metadata.id - return Record( - id=task.metadata.id, + pg_record = PgRecord( + id=record_id, conversationId=task.metadata.conversationId, taskId=task.metadata.id, userId=user_id, content=encrypt_data, key=encrypt_config, - metadata=RecordMetadata( - timeCost=0, - inputTokens=0, - outputTokens=0, - feature={}, - ), - createdAt=current_time, + createdAt=datetime.fromtimestamp(current_time, tz=UTC), + ) + + pg_metadata = PgRecordMetadata( + recordId=record_id, + timeCost=0, + inputTokens=0, + outputTokens=0, ) - async def _handle_document_management(self, record_group: str | None, used_docs: list[dict]) -> None: + return pg_record, pg_metadata + + async def _handle_document_management(self, used_docs: list[dict], record_id: UUID) -> None: """处理文档管理相关操作""" user_id = self.task.metadata.userId post_body = self.post_body - if record_group and post_body.conversation_id: - await DocumentManager.change_doc_status(user_id, post_body.conversation_id, record_group) + if post_body.conversation_id: + await DocumentManager.change_doc_status(user_id, post_body.conversation_id) - if record_group and used_docs: - await DocumentManager.save_answer_doc(user_id, record_group, used_docs) + if used_docs: + await DocumentManager.save_answer_doc(user_id, record_id, used_docs) - async def _save_record_data(self, record_content: RecordContent, current_time: float) -> None: - """加密并保存记录数据""" + async def _save_record_data(self, record_content: RecordContent, current_time: float) -> UUID | None: + """加密并保存记录数据,返回记录ID""" # 加密记录内容 try: encrypt_data, encrypt_config = Security.encrypt(record_content.model_dump_json(by_alias=True)) except Exception: _logger.exception("[Scheduler] 问答对加密错误") - return + return None - # 构建记录对象 - record = self._build_record(encrypt_data, encrypt_config, current_time) + # 构建记录对象和元数据对象 + pg_record, pg_metadata = self._build_record(encrypt_data, encrypt_config, current_time) # 保存记录 if self.post_body.conversation_id: await RecordManager.insert_record_data( self.task.metadata.userId, self.post_body.conversation_id, - record, + pg_record, + pg_metadata, ) + return pg_record.id + return None async def _handle_task_state(self) -> None: """根据任务状态判断删除或保存Task""" @@ -115,79 +126,28 @@ class DataMixin: else: await TaskManager.save_task(task) - async def _ensure_conversation_exists(self) -> None: - """确保存在conversation,如果不存在则创建""" - # 如果已经有 conversation_id,无需创建 - if self.task.metadata.conversationId: - return - - _logger.info("[Scheduler] 当前无 conversation_id,创建新对话") - - # 确定标题:直接使用问题的前 _CONVERSATION_TITLE_MAX_LENGTH 个字符 - title = "" - if hasattr(self.task.runtime, "question"): - question_attr = getattr(self.task.runtime, "question", "") - if question_attr and isinstance(question_attr, str): - question = question_attr.strip() - if question: - # 截取前 N 个字符作为标题 - if len(question) > _CONVERSATION_TITLE_MAX_LENGTH: - title = question[:_CONVERSATION_TITLE_MAX_LENGTH] + "..." - else: - title = question - - # 确定 app_id - app_id: uuid.UUID | None = None - if self.post_body.app and self.post_body.app.app_id: - app_id = self.post_body.app.app_id - - # 确定是否为调试模式 - debug = getattr(self.post_body, "debug", False) - - try: - # 调用 InitMixin 中的 _create_new_conversation 方法创建对话 - new_conversation: Conversation = await self._create_new_conversation( # type: ignore[attr-defined] - title=title, - user_id=self.task.metadata.userId, - app_id=app_id, - debug=debug, - ) - - # 更新 task 和 post_body 中的 conversation_id - self.task.metadata.conversationId = new_conversation.id - self.post_body.conversation_id = new_conversation.id - - _logger.info( - "[Scheduler] 成功创建新对话,conversation_id: %s, title: %s", - new_conversation.id, - title, - ) - except Exception: - _logger.exception("[Scheduler] 创建新对话失败") - raise - async def _save_data(self) -> None: """保存当前Executor、Task、Record等的数据""" task = self.task - record_group = None used_docs = self._extract_used_documents() record_content = self._build_record_content() + # 先处理任务状态(删除或保存Task) + await self._handle_task_state() + + # 再保存flow context if task.state: await TaskManager.save_flow_context(task.context) - # 在保存 Record 之前,确保存在 conversation - await self._ensure_conversation_exists() - current_time = round(datetime.now(UTC).timestamp(), 2) - - await self._handle_document_management(record_group, used_docs) - await self._save_record_data(record_content, current_time) + # 先保存record,获取record_id + record_id = await self._save_record_data(record_content, current_time) + # 再处理文档管理,传入record_id + if record_id: + await self._handle_document_management(used_docs, record_id) # 更新应用中心最近使用的应用 if self.post_body.app and self.post_body.app.app_id: await AppCenterManager.update_recent_app(self.task.metadata.userId, self.post_body.app.app_id) - - await self._handle_task_state() diff --git a/apps/scheduler/scheduler/init.py b/apps/scheduler/scheduler/init.py index 84d53c032..f515a0660 100644 --- a/apps/scheduler/scheduler/init.py +++ b/apps/scheduler/scheduler/init.py @@ -9,8 +9,9 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from apps.common.queue import MessageQueue +from apps.constants import CONVERSATION_TITLE_MAX_LENGTH from apps.llm import LLM -from apps.models import Conversation, Task, TaskRuntime, User +from apps.models import LanguageType, Task, TaskRuntime, User from apps.schemas.request_data import RequestData from apps.schemas.task import TaskData from apps.services.appcenter import AppCenterManager @@ -56,6 +57,7 @@ class InitMixin: authHeader=auth_header, userInput=self.post_body.question, language=self.post_body.language, + time=round(datetime.now(UTC).timestamp(), 2), ), state=None, context=[], @@ -119,18 +121,44 @@ class InitMixin: extensions=["jinja2.ext.loopcontrols"], ) - async def _create_new_conversation( - self, title: str, user_id: str, app_id: uuid.UUID | None = None, - *, - debug: bool = False, - ) -> Conversation: - """判断并创建新对话,并将conversation ID写入task""" - if app_id and not await AppCenterManager.validate_user_app_access(user_id, app_id): + def _extract_conversation_title(self) -> str: + """从task runtime中提取对话标题""" + default_titles: dict[LanguageType, str] = { + LanguageType.CHINESE: "新对话", + LanguageType.ENGLISH: "New Conversation", + } + title = default_titles[self.task.runtime.language] + + user_input = self.task.runtime.userInput.strip() + if user_input: + return ( + user_input[:CONVERSATION_TITLE_MAX_LENGTH] + "..." + if len(user_input) > CONVERSATION_TITLE_MAX_LENGTH + else user_input + ) + + return title + + async def _ensure_conversation_exists(self) -> None: + """确保存在conversation,如果不存在则创建""" + if self.task.metadata.conversationId: + return + + _logger.info("[Scheduler] 当前无 conversation_id,创建新对话") + title = self._extract_conversation_title() + app_id: uuid.UUID | None = None + if self.post_body.app and self.post_body.app.app_id: + app_id = self.post_body.app.app_id + + debug = getattr(self.post_body, "debug", False) + + if app_id and not await AppCenterManager.validate_user_app_access(self.task.metadata.userId, app_id): err = "Invalid app_id." raise RuntimeError(err) + new_conv = await ConversationManager.add_conversation_by_user( title=title, - user_id=user_id, + user_id=self.task.metadata.userId, app_id=app_id, debug=debug, ) @@ -138,8 +166,11 @@ class InitMixin: err = "Create new conversation failed." raise RuntimeError(err) - # 将conversation ID写入task self.task.metadata.conversationId = new_conv.id - _logger.info("[Scheduler] Conversation ID已写入Task: %s", new_conv.id) + self.post_body.conversation_id = new_conv.id - return new_conv + _logger.info( + "[Scheduler] 成功创建新对话,conversation_id: %s, title: %s", + new_conv.id, + title, + ) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index b302fe826..f0f865744 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -95,20 +95,15 @@ class Scheduler( await self._check_and_handle_executor_result() - if not self.task.metadata.conversationId: - _logger.info("[Scheduler] 创建新对话") - title = self.task.runtime.userInput[:50] if self.task.runtime.userInput else "新对话" - app_id = self.post_body.app.app_id if self.post_body.app else None - - try: - await self._create_new_conversation( - title=title, - user_id=self.task.metadata.userId, - app_id=app_id, - debug=False, - ) - except Exception: - _logger.exception("[Scheduler] 创建Conversation失败") + try: + await self._ensure_conversation_exists() + except Exception: + _logger.exception("[Scheduler] 确保Conversation存在失败") + + try: + await self._save_data() + except Exception: + _logger.exception("[Scheduler] 保存数据失败") await self._push_executor_stop_message() await self._push_done_message() diff --git a/apps/scheduler/scheduler/util.py b/apps/scheduler/scheduler/util.py index d074e97d2..e18132e56 100644 --- a/apps/scheduler/scheduler/util.py +++ b/apps/scheduler/scheduler/util.py @@ -72,16 +72,6 @@ class UtilMixin: async def _check_and_handle_executor_result(self) -> None: """检查并处理executor执行结果""" - if not self.task.runtime.fullAnswer or self.task.runtime.fullAnswer.strip() == "": - _logger.warning("[Scheduler] fullAnswer为空,设置executor状态为ERROR") - if self.task.state: - self.task.state.executorStatus = ExecutorStatus.ERROR - if not self.task.state.errorMessage: - self.task.state.errorMessage = { - "err_msg": "执行完成但未生成任何答案", - "data": {}, - } - if self.task.state and self.task.state.executorStatus == ExecutorStatus.ERROR: error_msg = "Executor执行失败" if self.task.state.errorMessage: diff --git a/apps/services/record.py b/apps/services/record.py index 5bf110973..baa136243 100644 --- a/apps/services/record.py +++ b/apps/services/record.py @@ -1,17 +1,19 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """问答对Manager""" +import json import logging import uuid -from datetime import UTC, datetime from typing import Literal from sqlalchemy import and_, select from apps.common.postgres import postgres -from apps.models import Conversation +from apps.common.security import Security +from apps.models import CommentType, Conversation from apps.models import Record as PgRecord -from apps.schemas.record import RecordComment, RecordData, RecordMetadata +from apps.models import RecordMetadata as PgRecordMetadata +from apps.schemas.record import RecordContent, RecordData, RecordMetadata logger = logging.getLogger(__name__) @@ -42,8 +44,13 @@ class RecordManager: @staticmethod - async def insert_record_data(user_id: str, conversation_id: uuid.UUID, record: RecordData) -> uuid.UUID | None: - """Record插入PostgreSQL""" + async def insert_record_data( + user_id: str, + conversation_id: uuid.UUID, + record: PgRecord, + metadata: PgRecordMetadata, + ) -> uuid.UUID | None: + """Record和RecordMetadata插入PostgreSQL""" async with postgres.session() as session: conv = (await session.scalars( select(Conversation).where( @@ -57,15 +64,8 @@ class RecordManager: logger.error("[RecordManager] 对话不存在: %s", conversation_id) return None - session.add(PgRecord( - id=record.id, - conversationId=conversation_id, - taskId=record.task_id, - userId=user_id, - content=record.content, - key=record.key, - createdAt=datetime.fromtimestamp(record.created_at, tz=UTC), - )) + session.add(record) + session.add(metadata) await session.commit() return record.id @@ -93,16 +93,18 @@ class RecordManager: records = [] for pg_record in pg_records: + # Decrypt and parse the content + decrypted_content = Security.decrypt(pg_record.content, pg_record.key) + record_content = RecordContent.model_validate(json.loads(decrypted_content)) + record = RecordData( id=pg_record.id, conversationId=pg_record.conversationId, - task_id=pg_record.taskId, - user_id=pg_record.userId, - content=pg_record.content, - key=pg_record.key, + taskId=pg_record.taskId, + content=record_content, createdAt=pg_record.createdAt.timestamp(), metadata=RecordMetadata(), - comment=RecordComment(), + comment=CommentType.NONE, ) records.append(record) return records diff --git a/design/executor/agent.md b/design/executor/agent.md index c58ca2f92..4a9324119 100644 --- a/design/executor/agent.md +++ b/design/executor/agent.md @@ -749,9 +749,6 @@ erDiagram ExecutorHistory { UUID id PK "历史记录ID" UUID taskId FK "任务ID" - string executorId "执行器ID" - string executorName "执行器名称" - ExecutorStatus executorStatus "执行器状态" UUID stepId "步骤ID" string stepName "步骤名称" string stepType "步骤类型" -- Gitee