diff --git a/jiuwen/core/tracer/span.py b/jiuwen/core/tracer/span.py index 15f4e0bdab7a71aa2638746586e35ac1d1abeaa6..4cf9824602abedaa6e276c1eb68b6498528d5d94 100644 --- a/jiuwen/core/tracer/span.py +++ b/jiuwen/core/tracer/span.py @@ -5,7 +5,7 @@ from pydantic import ConfigDict, Field, BaseModel class Span(BaseModel): - trace_id: str + trace_id: str = Field(alias="traceId") start_time: datetime = Field(default=None, alias="startTime") end_time: Optional[datetime] = Field(default=None, alias="endTime") inputs: Optional[dict] = Field(default=None, alias="inputs") diff --git a/tests/unit_tests/tracer/test_agent.py b/tests/unit_tests/tracer/test_agent.py index 101e3a06f265e275f83c862711b12b31fe1a00bf..e27f5b8865124ee23a50069e12afc9282d4a8659 100644 --- a/tests/unit_tests/tracer/test_agent.py +++ b/tests/unit_tests/tracer/test_agent.py @@ -3,8 +3,7 @@ import unittest from jiuwen.core.agent.task.task_context import TaskContext from jiuwen.core.common.logging.base import logger -from jiuwen.core.stream.writer import TraceSchema, CustomSchema -from jiuwen.core.tracer.tracer import Tracer +from jiuwen.core.stream.writer import CustomSchema from tests.unit_tests.tracer.test_mock_node_with_tracer import StreamNodeWithTracer from tests.unit_tests.tracer.test_workflow import record_tracer_info, create_flow from tests.unit_tests.workflow.test_mock_node import MockEndNode, MockStartNode @@ -59,15 +58,15 @@ class MockAgent(unittest.TestCase): def tearDown(self): record_tracer_info(self.tracer_chunks, "test_agent_workflow_seq_exec_stream_workflow_with_tracer.json") - async def run_workflow_seq_exec_stream_workflow_with_tracer(self, tracer: Tracer): + async def run_workflow_seq_exec_stream_workflow_with_tracer(self, context: TaskContext): """ start -> a -> b -> end """ # workflow与agent共用一个tracer - context = TaskContext(id="test") + workflow_context = context.create_workflow_context() + assert (workflow_context.tracer() is self.tracer) - # async def stream_workflow(): flow = create_flow() flow.set_start_comp("start", MockStartNode("start"), inputs_schema={ @@ -110,16 +109,12 @@ class MockAgent(unittest.TestCase): } index_dict = {key: 0 for key in expected_datas_model.keys()} - async for chunk in flow.stream({"a": 1, "b": "haha"}, context.create_workflow_context()): - if not isinstance(chunk, TraceSchema): - node_id = chunk.node_id - index = index_dict[node_id] - assert chunk == expected_datas_model[node_id][index], f"Mismatch at node {node_id} index {index}" - logger.info(f"stream chunk: {chunk}") - index_dict[node_id] = index_dict[node_id] + 1 - else: - print(f"stream chunk: {chunk}") - self.tracer_chunks.append(chunk) + async for chunk in flow.stream({"a": 1, "b": "haha"}, workflow_context): + node_id = chunk.node_id + index = index_dict[node_id] + assert chunk == expected_datas_model[node_id][index], f"Mismatch at node {node_id} index {index}" + logger.info(f"stream chunk: {chunk}") + index_dict[node_id] = index_dict[node_id] + 1 async def run_agent_workflow_seq_exec_stream_workflow_with_tracer(self): # context手动初始化tracer,agent和workflow共用一个tracer @@ -138,7 +133,7 @@ class MockAgent(unittest.TestCase): await runner.stream(runner_span) # 模拟运行workflow - await self.run_workflow_seq_exec_stream_workflow_with_tracer(context.tracer()) + await self.run_workflow_seq_exec_stream_workflow_with_tracer(context) await self.tracer.trigger("tracer_agent", "on_chain_end", span=agent_span, outputs={"outputs": "mock chain"}, diff --git a/tests/unit_tests/tracer/test_workflow.py b/tests/unit_tests/tracer/test_workflow.py index 3c52f5c4576fd3a8e742c4c8ec7163d7d7b230a9..0d2da007438bb4cee29be07e47773c5461027310 100644 --- a/tests/unit_tests/tracer/test_workflow.py +++ b/tests/unit_tests/tracer/test_workflow.py @@ -400,7 +400,23 @@ class WorkflowTest(unittest.TestCase): sub_workflow.add_connection("sub_start", "sub_a") sub_workflow.add_connection("sub_a", "sub_end") - sub_workflow_2 = copy.deepcopy(sub_workflow) + sub_workflow_2 = create_flow() + sub_workflow_2.set_start_comp("sub_start", MockStartNode("start"), + inputs_schema={ + "a": "${a}", + "b": "${b}", + "c": 1, + "d": [1, 2, 3]}) + + sub_workflow_2.add_workflow_comp("sub_a", StreamNodeWithTracer("a", expected_datas), + inputs_schema={ + "aa": "${sub2_start.a}", + "ac": "${sub2_start.c}"}) + sub_workflow_2.set_end_comp("sub_end", MockEndNode("end"), + inputs_schema={ + "result": "${sub_a.aa}"}) + sub_workflow_2.add_connection("sub_start", "sub_a") + sub_workflow_2.add_connection("sub_a", "sub_end") # main_workflow: start->a(sub workflow) | b(sub workflow) ->end main_workflow = create_flow()