# AI Workflow Engine **Repository Path**: coderyrepos/ai-workflow-engine ## Basic Information - **Project Name**: AI Workflow Engine - **Description**: 一个专注于构建AI工作流引擎的开源项目,支持自动化、可扩展的工作流设计与执行,适用于多种AI应用场景。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2025-12-26 - **Last Updated**: 2026-01-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # AI Workflow Engine AI Workflow Engine 是一个强大的工作流引擎,旨在帮助开发者构建和执行复杂的AI驱动工作流。该引擎特别适用于以下场景: - AI应用开发:构建需要AI模型处理的复杂业务流程 - 自动化任务:通过AI和代码节点自动化业务流程 - 数据处理:结合AI和代码节点进行数据处理和分析 - 智能决策:通过分支节点实现基于AI输出的智能决策 - 目前已有应用:[AI基金分析助手](https://gitee.com/coderyrepos/fund_analyzer) ## 目录 - [功能特性](#功能特性) - [安装](#安装) - [快速开始](#快速开始) - [配置](#配置) - [节点类型](#节点类型) - [工作流序列化](#工作流序列化) - [MCP协议工具支持](#mcp协议工具支持) - [错误处理](#错误处理) - [扩展性](#扩展性) - [贡献](#贡献) - [许可证](#许可证) ## 功能特性 - **AI节点 (AINode)**: 与AI模型集成,支持工具调用 - **代码节点 (CodeNode)**: 执行Python函数,可访问和修改上下文变量 - **循环节点 (LoopNode)**: 支持条件循环执行 - **分支节点 (BranchNode)**: 基于条件表达式或函数的分支执行 - **子循环节点 (SubLoopNode)**: 引用和复用循环结构 - **MCP协议工具支持**: 支持通过标准MCP协议调用外部工具和服务 ## 安装 ```bash # 从PyPI安装 pip install ai-workflow-engine # 或从源码安装 git clone https://gitee.com/coderyrepos/ai-workflow-engine cd ai-workflow-engine pip install . ``` ### 配置AI模型 在api_config.json文件中配置AI模型信息,您可以参考examples/api_config_example.json文件: ```json { "qwen": { "model_name": "qwen-max", "temperature": 0.7, "max_tokens": 500, "api_key": "your_qwen_api_key_here", "endpoint": "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions", "executor": "qwen", "role_mapping": { "user": "user", "assistant": "assistant", "system": "system", "tool": "function" } }, "moonshot": { "model_name": "moonshot-v1-8k", "temperature": 0.7, "max_tokens": 500, "api_key": "your_moonshot_api_key_here", "endpoint": "https://api.moonshot.cn/v1/chat/completions", "executor": "moonshot", "role_mapping": { "user": "user", "assistant": "assistant", "system": "system", "tool": "function" } }, "custom_openai": { "model_name": "gpt-3.5-turbo", "temperature": 0.7, "max_tokens": 500, "api_key": "your_openai_api_key_here", "endpoint": "https://api.openai.com/v1/chat/completions", "executor": "./examples/custom_openai_executor.py", "role_mapping": { "user": "user", "assistant": "assistant", "system": "system", "tool": "function" } } } ``` #### 执行器配置选项 现在支持两种executor配置方式: **方式1:内置执行器(传统方式)** ```json { "qwen": { "model_name": "your_model_name", "api_key": "your_api_key", "endpoint": "your_api_endpoint", "executor": "qwen" // 这会在AIExecutors目录下查找qwen_executor.py } } ``` **方式2:自定义执行器文件路径(新增功能)** ```json { "custom_model": { "model_name": "your_model_name", "api_key": "your_api_key", "endpoint": "your_api_endpoint", "executor": "./path/to/your/custom_executor.py" // 使用绝对或相对路径 } } ``` ## 使用方法 ### 基础工作流创建 ``python from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import AINode # 创建工作流 workflow = Workflow("my_workflow", "我的工作流") # 创建AI节点 ai_node = AINode( node_id="ai_node_1", name="AI节点1", ai_model="qwen", prompt="请帮我分析当前的市场趋势", temperature=0.7, max_tokens=200 ) # 添加节点到工作流 workflow.add_node(ai_node) workflow.set_start_node(ai_node) # 执行工作流 result = workflow.execute() ``` ### 完整工作流示例 项目中包含一个完整的示例文件 `examples/complete_example.py`,展示了如何组合使用各种节点类型: ```python from ai_workflow_engine import Workflow, ApiConfig from ai_workflow_engine.nodes import AINode, CodeNode, BranchNode # 设置配置文件路径 ApiConfig.config_file_path = "examples/api_config_example.json" # 创建工作流 workflow = Workflow("complete_workflow", "完整示例工作流") # 创建AI节点 - 数据分析 analysis_node = AINode( node_id="analysis", name="数据分析", ai_model="qwen", prompt="请分析当前的市场趋势和用户行为", temperature=0.6, max_tokens=200 ) # 创建代码节点 - 数据处理 def process_data(context): """数据处理函数""" raw_data = context.get('raw_data', [1, 2, 3, 4, 5]) processed_data = [x * 2 for x in raw_data] context['processed_data'] = processed_data return processed_data process_node = CodeNode( node_id="process", name="数据处理", func=process_data ) # 创建分支节点 - 决策逻辑 def check_threshold(context): """检查阈值条件""" data = context.get('processed_data', []) avg_value = sum(data) / len(data) if data else 0 return avg_value > 5 branch_node = BranchNode( node_id="branch", name="阈值检查", condition_func=check_threshold, true_branch=["high_value_action"], false_branch=["low_value_action"] ) # 添加节点到工作流 workflow.add_node(analysis_node) workflow.add_node(process_node) workflow.add_node(branch_node) # 连接节点 analysis_node.link_to(process_node).link_to(branch_node) # 设置起始节点 workflow.set_start_node(analysis_node) # 执行工作流 context = {"raw_data": [2, 3, 4, 5, 6]} result = workflow.execute(context) ``` 要运行完整示例,请执行: ```bash python examples/complete_example.py ``` ### 序列化最佳实践示例 为了确保工作流能够正确序列化和反序列化,特别是包含CodeNode时,请参考 `examples/serialization_best_practices.py` 示例: ```python import json from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import CodeNode # 在模块级别定义函数 - 这样可以在序列化后正确恢复 def data_processor(context): """数据处理函数""" raw_data = context.get('raw_data', []) processed_data = [x * 2 for x in raw_data] context['processed_data'] = processed_data return processed_data # 创建工作流 workflow = Workflow("best_practice_wf", "序列化最佳实践工作流") # 创建代码节点(使用模块级别的函数) processor_node = CodeNode( node_id="processor", name="数据处理节点", func=data_processor ) # 添加节点到工作流 workflow.add_node(processor_node) workflow.set_start_node(processor_node) # 序列化工作流 workflow_dict = workflow.to_dict() # 保存到文件 with open("workflow.json", "w", encoding="utf-8") as f: json.dump(workflow_dict, f, ensure_ascii=False, indent=2) # 从文件加载并反序列化 with open("workflow.json", "r", encoding="utf-8") as f: loaded_workflow_dict = json.load(f) loaded_workflow = Workflow.from_dict(loaded_workflow_dict) ``` 要运行序列化最佳实践示例,请执行: ```bash python examples/serialization_best_practices.py ``` 还有一个更完整的正确序列化示例,展示了如何在独立模块中定义函数以确保正确序列化: ```bash python examples/proper_serialization_example.py ``` ### 工具调用示例 ``python from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import AINode def calculate_sum(a: int, b: int) -> int: """计算两个数的和""" return a + b def get_weather(city: str) -> str: """获取天气信息""" return f"{city}的天气是晴天" # 定义工具 tools = [ { "name": "calculate_sum", "description": "计算两个数的和", "instance": calculate_sum }, { "name": "get_weather", "description": "获取指定城市的天气信息", "instance": get_weather } ] workflow = Workflow("tool_workflow", "工具调用工作流") ai_node = AINode( node_id="tool_ai_node", name="工具AI节点", ai_model="qwen", prompt="计算5和3的和,然后查询北京的天气", tools=tools, temperature=0.7, max_tokens=200 ) workflow.add_node(ai_node) workflow.set_start_node(ai_node) result = workflow.execute() ``` ### 隐藏工具使用 隐藏工具不会直接显示在AI的提示词中,但可以通过工具搜索功能使用: ``python from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import AINode def hidden_weather_tool(city: str) -> str: """获取天气信息(隐藏工具)""" return f"{city}的天气是晴天,温度25度" workflow = Workflow("hidden_tool_workflow", "隐藏工具工作流") ai_node = AINode( node_id="hidden_tool_ai_node", name="隐藏工具AI节点", ai_model="qwen", prompt="我需要查询天气,但我不确定有哪些工具可用,请帮我查找并使用合适的工具", hidden_tools=[hidden_weather_tool], # 隐藏工具 temperature=0.7, max_tokens=300 ) workflow.add_node(ai_node) workflow.set_start_node(ai_node) result = workflow.execute() ``` ### 链式编程示例 ``python from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import AINode # 创建工作流 workflow = Workflow("chain_wf", "链式编程示例") # 创建节点 node1 = AINode( node_id="node1", name="节点1", ai_model="qwen", prompt="执行第一步操作", config_path="api_config.json", # 新增配置文件路径参数 temperature=0.7, max_tokens=50, ) node2 = AINode( node_id="node2", name="节点2", ai_model="qwen", prompt="执行第二步操作", config_path="api_config.json", # 新增配置文件路径参数 temperature=0.7, max_tokens=50, ) node3 = AINode( node_id="node3", name="节点3", ai_model="qwen", prompt="执行第三步操作", config_path="api_config.json", # 新增配置文件路径参数 temperature=0.7, max_tokens=50, ) # 添加节点到工作流 workflow.add_node(node1) workflow.add_node(node2) workflow.add_node(node3) # 使用链式编程连接节点 node1.link_to(node2).link_to(node3) workflow.set_start_node("node1") ``` ## 配置文件配置方法 ### api_config.json 配置详解 AI Workflow Engine 使用 `api_config.json` 文件来配置AI模型参数,您需要将此文件放置在项目根目录或在代码中指定路径: ```json { "qwen": { "model_name": "qwen-max", "temperature": 0.7, "max_tokens": 500, "api_key": "your_qwen_api_key_here", "endpoint": "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions", "executor": "qwen", "role_mapping": { "user": "user", "assistant": "assistant", "system": "system", "tool": "function" } }, "moonshot": { "model_name": "moonshot-v1-8k", "temperature": 0.7, "max_tokens": 500, "api_key": "your_moonshot_api_key_here", "endpoint": "https://api.moonshot.cn/v1/chat/completions", "executor": "moonshot", "role_mapping": { "user": "user", "assistant": "assistant", "system": "system", "tool": "function" } }, "custom_openai": { "model_name": "gpt-3.5-turbo", "temperature": 0.7, "max_tokens": 500, "api_key": "your_openai_api_key_here", "endpoint": "https://api.openai.com/v1/chat/completions", "executor": "./examples/custom_openai_executor.py", "role_mapping": { "user": "user", "assistant": "assistant", "system": "system", "tool": "function" } } } ``` #### 配置参数说明 - `model_name`: AI模型的名称 - `temperature`: 控制AI输出的随机性(0.0-1.0) - `max_tokens`: 最大生成token数 - `api_key`: AI模型API密钥 - `endpoint`: AI模型API端点 - `executor`: 执行器,可以是预定义的执行器名称或自定义执行器文件路径 - `role_mapping`: 角色映射,用于不同AI模型的角色标识 #### 创建配置文件 您可以创建一个名为 `api_config.json` 的文件,放置在项目根目录下,或通过代码指定配置文件路径: ```python from ai_workflow_engine import ApiConfig # 设置全局配置文件路径 ApiConfig.config_file_path = "path/to/your/api_config.json" ``` ### 环境变量配置 您也可以通过环境变量来配置API密钥: ```bash export QWEN_API_KEY="your_api_key_here" ``` 然后在配置文件中引用环境变量: ```json { "qwen": { "model_name": "your_model_name", "api_key": "${QWEN_API_KEY}", "endpoint": "your_api_endpoint", "executor": "qwen" } } ``` ## 工作流的进阶用法 ### 复杂工作流结构 ``python from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import AINode, CodeNode, BranchNode # 创建复杂工作流 workflow = Workflow("complex_workflow", "复杂工作流示例") # AI节点 - 数据分析 analysis_node = AINode( node_id="analysis", name="数据分析", ai_model="qwen", prompt="分析以下数据并提供结论", temperature=0.6 ) # 代码节点 - 数据处理 def process_data(context): """数据处理函数""" raw_data = context.get('raw_data', []) processed_data = [x * 2 for x in raw_data] context['processed_data'] = processed_data return processed_data process_node = CodeNode( node_id="process", name="数据处理", func=process_data ) # 分支节点 - 决策逻辑 def check_threshold(context): """检查阈值条件""" data = context.get('processed_data', []) avg_value = sum(data) / len(data) if data else 0 return avg_value > 10 branch_node = BranchNode( node_id="branch", name="阈值检查", condition_func=check_threshold, true_branch=["high_value_action"], false_branch=["low_value_action"] ) # 高值处理节点 high_value_node = AINode( node_id="high_value_action", name="高值处理", ai_model="qwen", prompt="当前值较高,建议采取相应措施" ) # 低值处理节点 low_value_node = AINode( node_id="low_value_action", name="低值处理", ai_model="qwen", prompt="当前值较低,建议采取相应措施" ) # 添加节点到工作流 workflow.add_node(analysis_node) workflow.add_node(process_node) workflow.add_node(branch_node) workflow.add_node(high_value_node) workflow.add_node(low_value_node) # 连接节点 analysis_node.link_to(process_node).link_to(branch_node) branch_node.link_to(high_value_node, condition=True) branch_node.link_to(low_value_node, condition=False) workflow.set_start_node(analysis_node) ``` ### 上下文变量管理 工作流中的节点可以通过上下文变量共享数据: ``python from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import AINode, CodeNode workflow = Workflow("context_workflow", "上下文变量工作流") # 第一个节点 - 设置初始值 init_node = AINode( node_id="init", name="初始化", ai_model="qwen", prompt="请生成一个随机数字", response_key="initial_number" ) # 代码节点 - 处理数据 def multiply_by_two(context): """将初始数字乘以2""" initial = context.get('initial_number', 0) result = initial * 2 context['doubled_number'] = result return result multiply_node = CodeNode( node_id="multiply", name="乘法运算", func=multiply_by_two ) # AI节点 - 使用处理后的数据 result_node = AINode( node_id="result", name="结果分析", ai_model="qwen", prompt="分析数字 {{doubled_number}} 的特征和意义", temperature=0.7 ) workflow.add_node(init_node) workflow.add_node(multiply_node) workflow.add_node(result_node) init_node.link_to(multiply_node).link_to(result_node) workflow.set_start_node(init_node) ``` ### 工具搜索功能 AI Workflow Engine 提供了工具搜索功能,使AI能够动态查找可用的工具: ``python from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import AINode def weather_tool(city: str) -> str: """天气查询工具""" return f"{city}的天气是晴天" def database_query(sql: str) -> str: """数据库查询工具""" return f"查询结果:{sql}" def email_sender(recipient: str, subject: str, content: str) -> str: """邮件发送工具""" return f"邮件已发送给{recipient}" # 将一些工具设置为隐藏,AI可以通过搜索功能发现它们 workflow = Workflow("search_workflow", "工具搜索工作流") ai_node = AINode( node_id="search_ai", name="工具搜索AI", ai_model="qwen", prompt="我需要查询天气和发送邮件,但我不确定有哪些工具可用,请帮我找到合适的工具并使用它们", hidden_tools=[weather_tool, database_query, email_sender], temperature=0.7, max_tokens=400 ) workflow.add_node(ai_node) workflow.set_start_node(ai_node) ``` ## 扩展方法 ### 自定义执行器 您可以创建自定义执行器来支持不同的AI模型: 1. 在项目根目录或指定路径创建执行器文件,例如 `custom_executor.py`: ```python """ 自定义AI执行器示例 """ import requests import json def execute(model_config, prompt, temperature, max_tokens, context, messages): """ 执行AI调用的函数 Args: model_config: 模型配置字典 prompt: 提示词 temperature: 温度参数 max_tokens: 最大token数 context: 上下文变量 messages: 对话历史 Returns: AI模型的响应结果 """ # 实现您的AI调用逻辑 api_key = model_config.get("api_key") endpoint = model_config.get("endpoint") model_name = model_config.get("model_name") headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "model": model_name, "messages": messages, "temperature": temperature, "max_tokens": max_tokens } response = requests.post(endpoint, headers=headers, json=payload) response.raise_for_status() result = response.json() return result["choices"][0]["message"]["content"] ``` 2. 在配置文件中引用自定义执行器: ```json { "custom_model": { "model_name": "your_custom_model", "api_key": "your_api_key", "endpoint": "your_endpoint", "executor": "./custom_executor.py" } } ``` ### 自定义节点类型 您可以创建自定义节点类型来扩展工作流功能: ```python from ai_workflow_engine.base_node import BaseNode from ai_workflow_engine.enums import NodeType, ExecutionStatus from ai_workflow_engine.models import NodeResult import time from typing import Any, Dict class CustomNode(BaseNode): """自定义节点示例""" def __init__(self, node_id: str, name: str, custom_param: str, max_execution_count: int = 20): super().__init__(node_id, name, NodeType.CUSTOM, max_execution_count) self.custom_param = custom_param def execute(self, context: Dict[str, Any]) -> NodeResult: """执行自定义节点逻辑""" try: # 实现您的自定义逻辑 result = f"Custom node executed with param: {self.custom_param}" # 将结果存储到上下文中 context[f"{self.node_id}_result"] = result return NodeResult( node_id=self.node_id, result=result, status=ExecutionStatus.COMPLETED, timestamp=time.time() ) except Exception as e: return NodeResult( node_id=self.node_id, result=None, status=ExecutionStatus.FAILED, timestamp=time.time(), error=str(e) ) def to_dict(self) -> Dict[str, Any]: data = super().to_dict() data.update({ "custom_param": self.custom_param }) return data @classmethod def from_dict(cls, data: Dict[str, Any]) -> "CustomNode": node = cls( node_id=data["node_id"], name=data["name"], custom_param=data["custom_param"] ) # 恢复基类属性 node.inputs = data.get("inputs", {}) node.outputs = data.get("outputs", {}) node.next_nodes = data.get("next_nodes", []) return node ``` ### 工作流序列化与反序列化 工作流可以被序列化保存并在以后重新加载: ```python # 保存工作流 workflow.save("my_workflow.json") # 加载工作流 loaded_workflow = Workflow.load("my_workflow.json") # 执行加载的工作流 context = {"input_data": "some data"} result = loaded_workflow.execute(context) ``` ## MCP协议工具支持 AI Workflow Engine 现在支持MCP(Model Context Protocol)协议工具,允许AI节点通过标准协议调用外部服务。 ### MCP协议工具格式 ```python { "name": "工具名称", "description": "工具描述", "mcp_endpoint": "http://mcp-service:8080/api/v1/tools", # MCP服务端点 "mcp_headers": { # 可选:HTTP请求头 "Authorization": "Bearer token" }, "mcp_auth": ("username", "password") # 可选:HTTP基本认证 } ``` ### MCP工具示例 ```python # 定义MCP协议工具 mcp_tools = [ { "name": "weather_service", "description": "获取指定城市的天气信息", "mcp_endpoint": "http://weather-api:8080/v1/weather", "mcp_headers": { "Authorization": "Bearer YOUR_WEATHER_API_TOKEN", "Content-Type": "application/json" } }, { "name": "database_query", "description": "查询数据库的MCP工具", "mcp_endpoint": "http://db-api:8080/v1/query" } ] # 创建使用MCP工具的AI节点 ai_node = AINode( node_id="mcp_ai_node", name="MCP AI节点", ai_model="qwen", prompt="请根据用户请求,选择合适的工具来获取信息。用户请求:获取北京的天气信息,并查询相关数据。", tools=mcp_tools, temperature=0.7, max_tokens=300 ) ``` ### 混合工具使用 您还可以同时使用本地工具和MCP协议工具: ```python # 定义本地工具函数 def local_data_processor(data: str) -> str: """本地数据处理工具""" processed = f"Processed: {data.upper()}" return processed # 定义工具列表,混合本地工具和MCP协议工具 mixed_tools = [ # 本地工具 { "name": "local_data_processor", "description": "本地数据处理工具,将输入文本转换为大写", "instance": local_data_processor }, # MCP协议工具 { "name": "remote_weather", "description": "获取天气信息的远程服务", "mcp_endpoint": "http://weather-service:8080/api/v1/get_weather", "mcp_headers": { "Authorization": "Bearer token", "Content-Type": "application/json" } } ] # 创建使用混合工具的AINode ai_node = AINode( node_id="mixed_tools_node", name="混合工具节点", ai_model="qwen", prompt="请使用适当的工具来完成任务:1. 使用本地工具处理用户输入的数据 2. 使用远程服务获取天气信息", tools=mixed_tools, temperature=0.7, max_tokens=400 ) ``` 更多关于MCP协议工具的详细信息,请参见 [MCP_TOOLS.md](MCP_TOOLS.md)。 ## 工作流执行 工作流可以序列化和反序列化: ```python # 序列化工作流为字典 workflow_dict = workflow.to_dict() # 从字典反序列化工作流 loaded_workflow = Workflow.from_dict(workflow_dict) # 执行工作流 context = {"initial_value": 42} result = loaded_workflow.execute(context) ``` ### 保存和加载序列化数据 您可以将序列化的工作流数据保存到文件中: ```python import json # 保存序列化数据到文件 workflow_dict = workflow.to_dict() with open("workflow.json", "w", encoding="utf-8") as f: json.dump(workflow_dict, f, ensure_ascii=False, indent=2) # 从文件加载序列化数据 with open("workflow.json", "r", encoding="utf-8") as f: workflow_dict = json.load(f) loaded_workflow = Workflow.from_dict(workflow_dict) ``` ### 工作流状态管理 AI Workflow Engine 还支持工作流执行状态的保存和恢复,用于断点续跑: ```python # 保存执行状态(用于断点续跑) workflow.save_execution_state("execution_state.json") # 从文件加载执行状态 workflow.load_execution_state("execution_state.json") # 恢复执行 result = workflow.resume() ``` ### 序列化注意事项 **重要提示**:在序列化和反序列化过程中需要注意以下几点: 1. **函数对象**:CodeNode中的函数对象在反序列化时需要从原始模块中重新导入。如果函数定义在本地作用域(如main函数内),则无法正确恢复。建议将函数定义在模块级别,并且最好定义在独立的模块中(而不是`__main__`模块): ```python # 推荐:在独立模块中定义函数 # 文件: my_tools.py def my_function(context): context['result'] = "函数执行成功" return "完成" # 在工作流定义文件中导入 # 文件: workflow.py from my_tools import my_function processor_node = CodeNode( node_id="processor", name="处理器", func=my_function # 这样函数的模块名将被正确记录 ) # 避免:在局部作用域或__main__模块中定义函数 if __name__ == "__main__": def local_function(context): # 这样的函数在反序列化时无法恢复 pass ``` 2. **工具函数**:AINode中的工具函数同样需要能够从模块中重新导入。 3. **自定义对象**:复杂的自定义对象可能无法正确序列化,建议使用基本数据类型。 4. **MCP工具**:MCP协议工具通常可以正确序列化,因为它们只包含配置信息而非函数对象。 5. **独立工具模块**:为了确保函数能够正确序列化和反序列化,建议创建独立的工具模块: ```python # 文件: my_tools.py def my_data_processor(context): """数据处理函数""" raw_data = context.get('raw_data', []) processed_data = [x * 2 for x in raw_data] context['processed_data'] = processed_data return processed_data def my_result_analyzer(context): """结果分析函数""" processed = context.get('processed_data', []) avg_value = sum(processed) / len(processed) if processed else 0 context['average'] = avg_value return f"平均值: {avg_value}" ``` 然后在工作流定义中导入这些函数: ```python # 文件: workflow_definition.py from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import CodeNode from my_tools import my_data_processor, my_result_analyzer workflow = Workflow("my_workflow", "我的工作流") processor_node = CodeNode( node_id="processor", name="数据处理器", func=my_data_processor # 来自独立模块,模块名为'my_tools' ) analyzer_node = CodeNode( node_id="analyzer", name="结果分析器", func=my_result_analyzer # 来自独立模块,模块名为'my_tools' ) workflow.add_node(processor_node) workflow.add_node(analyzer_node) processor_node.link_to(analyzer_node) workflow.set_start_node(processor_node) ``` ### 打印工作流结构 您可以使用 [print_workflow()](./ai_workflow_engine/workflow.py#L233-L240) 方法来查看工作流的结构: ```python # 打印工作流结构 workflow.print_workflow() ``` ## 快速开始 ### 基本使用 ```python from ai_workflow_engine import Workflow, ApiConfig from ai_workflow_engine.nodes import AINode # 设置配置文件路径 ApiConfig.config_file_path = "api_config.json" # 创建工作流 workflow = Workflow("my_workflow", "我的第一个工作流") # 创建AI节点 ai_node = AINode( node_id="ai_node_1", name="AI节点", ai_model="qwen", prompt="请帮我总结人工智能的发展历史", temperature=0.7, max_tokens=200 ) # 添加节点到工作流并设置起始节点 workflow.add_node(ai_node) workflow.set_start_node(ai_node) # 执行工作流 result = workflow.execute() print(result) ``` ## 节点类型 ### AI节点 (AINode) AI节点用于与AI模型交互,支持工具调用、上下文变量管理和条件分支等功能。 ### 代码节点 (CodeNode) 代码节点用于执行Python函数,可访问和修改工作流上下文变量。 ### 分支节点 (BranchNode) 分支节点根据条件表达式或函数结果决定工作流的执行路径。 ### 循环节点 (LoopNode) 循环节点支持基于条件的重复执行,适用于需要迭代处理的场景。 ### 子循环节点 (SubLoopNode) 子循环节点用于引用和复用已定义的循环结构,提高工作流的可维护性。 ## JSON工作流自动生成功能 AI Workflow Engine 提供了强大的JSON工作流自动生成功能,可以根据工具列表和用户需求自动生成完整的工作流定义。这个功能特别适用于快速原型开发和动态工作流创建。 ### 功能特点 - **智能工作流生成**:根据用户需求描述和可用工具自动生成工作流 - **多节点支持**:自动生成AI节点、分支节点、代码节点等 - **条件分支**:根据需求自动生成条件分支逻辑 - **工具集成**:自动将可用工具集成到工作流节点中 ### 使用方法 ```python from ai_workflow_engine.workflow_generator import JSONWorkflowGenerator # 定义可用工具 def sample_tool_add(a: int, b: int) -> int: """计算两个数的和""" return a + b def sample_tool_multiply(x: int, y: int) -> int: """计算两个数的乘积""" return x * y def sample_tool_get_weather(city: str) -> str: """获取指定城市的天气信息(模拟)""" weather_data = { "北京": "晴天,温度15°C", "上海": "多云,温度18°C", "广州": "雨天,温度22°C", "深圳": "晴天,温度24°C", } return weather_data.get(city, f"未找到{city}的天气信息") # 定义可用工具列表 available_tools = [sample_tool_get_weather, sample_tool_multiply, sample_tool_add] # 用户需求描述 user_requirements = """ 创建一个工作流,用于分析天气数据并执行数学计算。 工作流应该: 1. 首先获取指定城市的天气信息 2. 根据天气信息执行相应的数学计算 3. 如果天气是晴天,执行加法运算 4. 如果天气是多云,执行乘法运算 5. 最后输出结果 """ # 生成工作流 generated_workflow = JSONWorkflowGenerator.generate_workflow_from_requirements( tools=available_tools, user_requirements=user_requirements, ) print("生成的工作流JSON:") print(generated_workflow) ``` ### 生成的工作流示例 生成的工作流JSON结构如下: ```json { "workflow_id": "weather_math_workflow", "name": "天气数据分析与数学计算工作流", "start_node_id": "get_weather_node", "init_context": { "city": "北京", "a": 5, "b": 3, "x": 4, "y": 6 }, "nodes": { "get_weather_node": { "node_id": "get_weather_node", "name": "获取天气信息", "node_type": "ai", "prompt": "请获取{city}的天气信息", "tools": [ { "name": "sample_tool_get_weather", "description": "获取指定城市的天气信息", "module": "__main__", "function_name": "sample_tool_get_weather" } ], "context_key": "weather_result", "next_nodes": ["weather_branch_node"] }, "weather_branch_node": { "node_id": "weather_branch_node", "name": "天气条件分支", "node_type": "branch", "condition_expr": "context.get('weather_result', {}).get('weather', '') == '晴天'", "true_branch": ["add_node"], "false_branch": ["multiply_node"], "next_nodes": [] }, "add_node": { "node_id": "add_node", "name": "加法计算", "node_type": "ai", "prompt": "请计算{a}和{b}的和", "tools": [ { "name": "sample_tool_add", "description": "计算两个数的和", "module": "__main__", "function_name": "sample_tool_add" } ], "context_key": "math_result", "next_nodes": ["output_node"] }, "multiply_node": { "node_id": "multiply_node", "name": "乘法计算", "node_type": "ai", "prompt": "请计算{x}和{y}的乘积", "tools": [ { "name": "sample_tool_multiply", "description": "计算两个数的乘积", "module": "__main__", "function_name": "sample_tool_multiply" } ], "context_key": "math_result", "next_nodes": ["output_node"] }, "output_node": { "node_id": "output_node", "name": "输出结果", "node_type": "code", "func": { "code": "print(f'最终计算结果: {context[\"math_result\"]}')", "type": "code_string" }, "next_nodes": [] } } } ``` ### 使用生成的工作流 生成的工作流可以直接用于创建和执行工作流: ```python from ai_workflow_engine.workflow import Workflow # 从生成的JSON创建工作流 workflow = Workflow.from_dict(generated_workflow) # 执行工作流 result = workflow.execute() final_result = result.result() print(f"执行状态: {final_result['status']}") print(f"执行顺序: {final_result['execution_order']}") ``` ### 示例运行 项目中包含一个完整的JSON工作流生成示例,您可以运行以下命令查看效果: ```bash python -m examples.json_workflow_demo ``` ## 工作流序列化 工作流支持完整的序列化和反序列化功能: - **保存工作流**:将工作流定义保存为JSON文件 - **加载工作流**:从JSON文件重新构建工作流 - **状态管理**:支持执行状态的保存和恢复 ### 工作流状态管理 AI Workflow Engine 支持工作流执行状态的保存和恢复,用于断点续跑: ```python # 保存执行状态(用于断点续跑) workflow.save_execution_state("execution_state.json") # 从文件加载执行状态 workflow.load_execution_state("execution_state.json") # 恢复执行 result = workflow.resume() ``` ### 序列化注意事项 **重要提示**:在序列化和反序列化过程中需要注意以下几点: 1. **函数对象**:CodeNode中的函数对象在反序列化时需要从原始模块中重新导入。如果函数定义在本地作用域(如main函数内),则无法正确恢复。建议将函数定义在模块级别,并且最好定义在独立的模块中(而不是`__main__`模块): ```python # 推荐:在独立模块中定义函数 # 文件: my_tools.py def my_function(context): context['result'] = "函数执行成功" return "完成" # 在工作流定义文件中导入 # 文件: workflow.py from my_tools import my_function processor_node = CodeNode( node_id="processor", name="处理器", func=my_function # 这样函数的模块名将被正确记录 ) # 避免:在局部作用域或__main__模块中定义函数 if __name__ == "__main__": def local_function(context): # 这样的函数在反序列化时无法恢复 pass ``` 2. **工具函数**:AINode中的工具函数同样需要能够从模块中重新导入。 3. **自定义对象**:复杂的自定义对象可能无法正确序列化,建议使用基本数据类型。 4. **MCP工具**:MCP协议工具通常可以正确序列化,因为它们只包含配置信息而非函数对象。 5. **独立工具模块**:为了确保函数能够正确序列化和反序列化,建议创建独立的工具模块: ```python # 文件: my_tools.py def my_data_processor(context): """数据处理函数""" raw_data = context.get('raw_data', []) processed_data = [x * 2 for x in raw_data] context['processed_data'] = processed_data return processed_data def my_result_analyzer(context): """结果分析函数""" processed = context.get('processed_data', []) avg_value = sum(processed) / len(processed) if processed else 0 context['average'] = avg_value return f"平均值: {avg_value}" ``` 然后在工作流定义中导入这些函数: ```python # 文件: workflow_definition.py from ai_workflow_engine import Workflow from ai_workflow_engine.nodes import CodeNode from my_tools import my_data_processor, my_result_analyzer workflow = Workflow("my_workflow", "我的工作流") processor_node = CodeNode( node_id="processor", name="数据处理器", func=my_data_processor # 来自独立模块,模块名为'my_tools' ) analyzer_node = CodeNode( node_id="analyzer", name="结果分析器", func=my_result_analyzer # 来自独立模块,模块名为'my_tools' ) workflow.add_node(processor_node) workflow.add_node(analyzer_node) processor_node.link_to(analyzer_node) workflow.set_start_node(processor_node) ``` ## 错误处理 引擎提供完整的错误处理机制: - 节点执行错误 - 工作流执行错误 - 上下文变量错误 - AI模型调用错误 ### 自定义日志输出 您还可以自定义日志输出方式: ```python from ai_workflow_engine import ApiConfig def custom_log_function(level, message): """自定义日志函数""" print(f"[{level}] {message}") # 这里可以添加写入文件、发送到日志服务等逻辑 # 设置自定义日志函数 ApiConfig.log_function = custom_log_function ``` ## 扩展性 引擎设计具有良好的扩展性: - 支持自定义节点类型 - 支持多种AI模型 - 支持自定义工具 - 支持MCP协议工具 ## 贡献 欢迎提交Issue和Pull Request来改进项目。 ## 许可证 该项目采用MIT许可证 - 详见 [LICENSE](LICENSE) 文件。