diff --git a/jiuwen/core/component/llm_comp.py b/jiuwen/core/component/llm_comp.py index aca71ae94a8ea2497931875e4113d3699ca0025d..9b08a05185df8367566ef3bdee86841e444a4f80 100644 --- a/jiuwen/core/component/llm_comp.py +++ b/jiuwen/core/component/llm_comp.py @@ -13,6 +13,7 @@ from jiuwen.core.common.utils.utils import WorkflowLLMUtils, OutputFormatter, Va from jiuwen.core.component.base import ComponentConfig, WorkflowComponent from jiuwen.core.context.context import Context from jiuwen.core.graph.executable import Executable, Input, Output +from jiuwen.core.stream.writer import CustomSchema from jiuwen.core.utils.llm.base import BaseChatModel from jiuwen.core.utils.llm.model_utils.model_factory import ModelFactory from jiuwen.core.utils.prompt.template.template import Template @@ -96,7 +97,7 @@ class LLMPromptFormatter: if res_type == "markdown": instruction = ( response_format.get("markdownInstruction") - or self._DEFAULT_MARKDOWN_INSTRUCTION + or LLMPromptFormatter._DEFAULT_MARKDOWN_INSTRUCTION ) prompt = instruction.replace("${query}", query) @@ -145,6 +146,10 @@ class LLMExecutable(Executable): logger.info("[%s] model inputs %s", self._context.executable_id, model_inputs) llm_response = await self._llm.ainvoke(model_inputs) response = llm_response.content + + # 临时调试:用于调用streamWriter实现流式输出 + await context.stream_writer_manager.get_custom_writer().write(CustomSchema(**dict(streamOutput=response))) + self._context.state.update({"response": response}) logger.info("[%s] model outputs %s", self._context.executable_id, response) return self._create_output(response) diff --git a/jiuwen/core/component/questioner_comp.py b/jiuwen/core/component/questioner_comp.py index 661d15f58635ce420bd7d8a0a3d8374fb3785029..bf39f697f4d88d6cdc3016af681a11248e0e1824 100644 --- a/jiuwen/core/component/questioner_comp.py +++ b/jiuwen/core/component/questioner_comp.py @@ -401,9 +401,9 @@ class QuestionerExecutable(Executable): async def invoke(self, inputs: Input, context: Context) -> Output: tracer = context.tracer if tracer: - tracer.trigger("tracer_workflow", "on_invoke", invoke_id=context.executable_id, - parent_node_id=context.parent_id, - on_invoke_data={"on_invoke_data": "extra trace data"}) + await tracer.trigger("tracer_workflow", "on_invoke", + invoke_id=context.executable_id, parent_node_id=context.parent_id, + on_invoke_data={"on_invoke_data": "extra trace data"}) state_from_context = self._load_state_from_context(context) if state_from_context.is_undergoing_interaction(): diff --git a/jiuwen/core/component/start_comp.py b/jiuwen/core/component/start_comp.py index e1720f18646c658c9ba1bb5f67a8d8a3026d11a3..1612de0381a6f525c208088e2a3b0c333f46bd03 100644 --- a/jiuwen/core/component/start_comp.py +++ b/jiuwen/core/component/start_comp.py @@ -29,7 +29,7 @@ class Start(Executable,WorkflowComponent): return dict( { SYSTEM_FIELDS: { - "query": inputs_copy.pop("query", ""), + "query": inputs_copy.get("systemFields").get("query", ""), "dialogueHistory": [], "conversationHistory": inputs_copy.pop("conversationHistory", []), }, diff --git a/tests/system_tests/workflow/test_real_workflow.py b/tests/system_tests/workflow/test_real_workflow.py index 172cb9d485101cba7787b8a167480319f5e45cf6..ab6c2020134b2b2f652fd364a73adeb9f6de4f20 100644 --- a/tests/system_tests/workflow/test_real_workflow.py +++ b/tests/system_tests/workflow/test_real_workflow.py @@ -15,11 +15,13 @@ from __future__ import annotations import asyncio +import os import unittest from unittest.mock import patch from jiuwen.core.component.branch_comp import BranchComponent from jiuwen.core.component.common.configs.model_config import ModelConfig +from jiuwen.core.component.end_comp import End from jiuwen.core.component.intent_detection_comp import ( IntentDetectionComponent, IntentDetectionConfig, @@ -30,10 +32,12 @@ from jiuwen.core.component.questioner_comp import ( QuestionerComponent, QuestionerConfig, ) +from jiuwen.core.component.start_comp import Start from jiuwen.core.component.tool_comp import ToolComponent, ToolComponentConfig from jiuwen.core.context.config import Config from jiuwen.core.context.context import Context from jiuwen.core.context.memory.base import InMemoryState +from jiuwen.core.stream.writer import CustomSchema from jiuwen.core.utils.llm.base import BaseModelInfo from jiuwen.core.utils.prompt.template.template import Template from jiuwen.core.utils.tool.service_api.param import Param @@ -44,10 +48,10 @@ from jiuwen.graph.pregel.graph import PregelGraph from tests.unit_tests.workflow.test_mock_node import MockEndNode, MockStartNode # 注意:切勿将真实密钥提交到仓库! -API_BASE = "https://api.siliconflow.cn/v1/chat/completions" -API_KEY = "sk-cuitxjipgkvlhjubkcxjrgxfphczzpihefbeutobhytbfuig" -MODEL_NAME = "Qwen/Qwen3-14B" -MODEL_PROVIDER = "siliconflow" +API_BASE = os.getenv("API_BASE", "") +API_KEY = os.getenv("API_KEY", "") +MODEL_NAME = os.getenv("MODEL_NAME", "") +MODEL_PROVIDER = os.getenv("MODEL_PROVIDER", "") # Mock 插件返回值 _FINAL_RESULT: str = "上海今天晴 30°C" @@ -210,6 +214,12 @@ class RealWorkflowTest(unittest.TestCase): tool_config = ToolComponentConfig(needValidate=False) return ToolComponent(tool_config) + @staticmethod + async def _async_stream_workflow_for_stream_writer(flow, inputs, context, tracer_chunks): + async for chunk in flow.stream(inputs, context): + if isinstance(chunk, CustomSchema): + tracer_chunks.append(chunk) + def _build_workflow( self, mock_plugin_get_tool, @@ -312,3 +322,44 @@ class RealWorkflowTest(unittest.TestCase): result = self.loop.run_until_complete(flow.invoke(inputs, context)) self.assertEqual(result, '上海今天晴 30°C') + + def test_stream_workflow_llm_with_stream_writer(self): + """ + 测试LLM组件通过StreamWriter流出数据 + """ + context = Context(config=Config(), state=InMemoryState(), store=None) + flow = Workflow(workflow_config=WorkflowConfig(), graph=PregelGraph()) + + start_component = Start("s", + { + "userFields": {"inputs": [], "outputs": []}, + "systemFields": {"input": [ + {"id": "query", "type": "String", "required": "true", "sourceType": "ref"} + ] + } + } + ) + end_component = End("e", "e", {"responseTemplate": "{{output}}"}) + + llm_config = LLMCompConfig( + model=RealWorkflowTest._create_model_config(), + template_content=[{"role": "user", "content": "{{query}}"}], + response_format={"type": "text"}, + output_config={ + "joke": {"type": "string", "description": "笑话", "required": True} + }, + ) + llm_component = LLMComponent(llm_config) + + flow.set_start_comp("s", start_component, inputs_schema={"systemFields": {"query": "${query}"}}) + flow.set_end_comp("e", end_component, + inputs_schema={"userFields": {"output": "${llm.userFields}"}}) + flow.add_workflow_comp("llm", llm_component, inputs_schema={"userFields": {"query": "${s.systemFields.query}"}}) + + flow.add_connection("s", "llm") + flow.add_connection("llm", "e") + + inputs = {"query": "写一个笑话。注意:不要超过20个字!"} + writer_chunks = [] + self.loop.run_until_complete(self._async_stream_workflow_for_stream_writer(flow, inputs, context, writer_chunks)) + print(writer_chunks) diff --git a/tests/unit_tests/workflow/test_questioner_comp.py b/tests/unit_tests/workflow/test_questioner_comp.py index f192d42c47176d5954d6b21e28d4e7f09dca9921..236c95b0a5c0215cbc22475a2686c9a628f18581 100644 --- a/tests/unit_tests/workflow/test_questioner_comp.py +++ b/tests/unit_tests/workflow/test_questioner_comp.py @@ -3,15 +3,14 @@ import unittest from unittest.mock import patch from jiuwen.core.component.common.configs.model_config import ModelConfig +from jiuwen.core.component.end_comp import End from jiuwen.core.component.questioner_comp import QuestionerInteractState, FieldInfo, QuestionerConfig, \ QuestionerComponent +from jiuwen.core.component.start_comp import Start from jiuwen.core.context.config import Config from jiuwen.core.context.context import Context from jiuwen.core.context.memory.base import InMemoryState -from jiuwen.core.stream.emitter import StreamEmitter -from jiuwen.core.stream.manager import StreamWriterManager from jiuwen.core.stream.writer import TraceSchema -from jiuwen.core.tracer.tracer import Tracer from jiuwen.core.utils.prompt.template.template import Template from jiuwen.core.workflow.base import Workflow from tests.unit_tests.workflow.test_mock_node import MockStartNode, MockEndNode @@ -55,8 +54,16 @@ class QuestionerTest(unittest.TestCase): FieldInfo(field_name="time", description="时间", required=True, default_value="today") ] - start_component = MockStartNode("s") - end_component = MockEndNode("e") + start_component = Start("s", + { + "userFields": {"inputs": [], "outputs": []}, + "systemFields": {"input": [ + {"id": "query", "type": "String", "required": "true", "sourceType": "ref"} + ] + } + } + ) + end_component = End("e", "e", {"responseTemplate": "{{output}}"}) model_config = ModelConfig(model_provider="openai") questioner_config = QuestionerConfig( @@ -68,15 +75,16 @@ class QuestionerTest(unittest.TestCase): ) questioner_component = QuestionerComponent(questioner_comp_config=questioner_config) - flow.set_start_comp("s", start_component, inputs_schema={"query": "${query}"}) - flow.set_end_comp("e", end_component, inputs_schema={"output": "${questioner.userFields.key_fields}"}) + flow.set_start_comp("s", start_component, inputs_schema={"systemFields": {"query": "${query}"}}) + flow.set_end_comp("e", end_component, + inputs_schema={"userFields": {"output": "${questioner.userFields.key_fields}"}}) flow.add_workflow_comp("questioner", questioner_component, inputs_schema={"query": "${start.query}"}) flow.add_connection("s", "questioner") flow.add_connection("questioner", "e") result = self.invoke_workflow({"query": "查询杭州的天气"}, context, flow) - assert result == {'output': {'location': 'hangzhou', 'time': 'today'}} + assert result == {'responseContent': "{'location': 'hangzhou', 'time': 'today'}"} @patch("jiuwen.core.component.questioner_comp.QuestionerExecutable._load_state_from_context") @@ -86,6 +94,7 @@ class QuestionerTest(unittest.TestCase): @patch("jiuwen.core.utils.llm.model_utils.model_factory.ModelFactory.get_model") def test_invoke_questioner_component_in_workflow_repeat_ask(self, mock_get_model, mock_init_prompt, mock_llm_inputs, mock_extraction, mock_state_from_context): + # TODO: 调试中断恢复时调整这个用例 mock_get_model.return_value = MockLLMModel() mock_prompt_template = [ dict(role="system", content="系统提示词"), @@ -141,6 +150,8 @@ class QuestionerTest(unittest.TestCase): 3. agent用agent_tracer,workflow用workflow_tracer 4. event有定义,参考handler.py的@trigger_event 5. on_invoke_data是固定的结构,结构为 dict(on_invoke_data={"on_invoke_data": "extra trace data"}) + 6. span只有agent需要 + 7. workflow不需要context显式地调用set_tracer ''' mock_get_model.return_value = MockLLMModel() @@ -153,10 +164,6 @@ class QuestionerTest(unittest.TestCase): mock_extraction.return_value = dict(location="hangzhou") context = Context(config=Config(), state=InMemoryState(), store=None) - context.set_stream_writer_manager(StreamWriterManager(StreamEmitter())) - tracer = Tracer() - tracer.init(context.stream_writer_manager, context.callback_manager) - context.set_tracer(tracer) flow = create_flow() key_fields = [ @@ -164,8 +171,16 @@ class QuestionerTest(unittest.TestCase): FieldInfo(field_name="time", description="时间", required=True, default_value="today") ] - start_component = MockStartNode("s") - end_component = MockEndNode("e") + start_component = Start("s", + { + "userFields":{"inputs":[],"outputs":[]}, + "systemFields":{"input":[ + {"id":"query", "type":"String", "required":"true", "sourceType":"ref"} + ] + } + } + ) + end_component = End("e", "e", {"responseTemplate": "{{output}}"}) model_config = ModelConfig(model_provider="openai") questioner_config = QuestionerConfig( @@ -177,8 +192,8 @@ class QuestionerTest(unittest.TestCase): ) questioner_component = QuestionerComponent(questioner_comp_config=questioner_config) - flow.set_start_comp("s", start_component, inputs_schema={"query": "${query}"}) - flow.set_end_comp("e", end_component, inputs_schema={"output": "${questioner.userFields.key_fields}"}) + flow.set_start_comp("s", start_component, inputs_schema={"systemFields": {"query": "${query}"}}) + flow.set_end_comp("e", end_component, inputs_schema={"userFields": {"output": "${questioner.userFields.key_fields}"}}) flow.add_workflow_comp("questioner", questioner_component, inputs_schema={"query": "${start.query}"}) flow.add_connection("s", "questioner")