# AgentArtery **Repository Path**: shadowyuan/agent-artery ## Basic Information - **Project Name**: AgentArtery - **Description**: 为 AI Agent 提供微秒级流式消息路由的嵌入式 C++ 中间件。库优先,可选独立服务。 Embedded C++ messaging middleware for microsecond streaming routing in AI Agents. Library-first, with optional standalone server. - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-04-30 - **Last Updated**: 2026-05-02 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # AgentArtery **高性能Agent消息总线 — 为AI Agent提供微秒级的流式消息中枢** > 状态标识:✅ 已实现 | 🔧 开发中 | 📋 计划中 AgentArtery 是一个用 C++ 构建的高性能消息总线,专为 AI Agent 的复杂通信模式设计。它解决了当前 Python Agent 框架在高并发场景下的核心瓶颈:多个工具调用、流式 LLM 响应和状态变更事件的低延迟路由与合并。 ## 为什么需要 AgentArtery 当 Agent 同时调用搜索引擎、代码解释器和数据库时,每个工具以不同的速率返回流式结果。现有框架用单进程事件循环处理这一切——在原型阶段可行,但面对 10 万并发 Agent 会话时会崩溃。 AgentArtery 让这些问题消失。它提供了一个有背压控制和有序投递保证的流式多路复用层,并规划了零拷贝传输路径。 ## 典型场景 ### 场景一:多工具并发调用与结果合并 Agent 同时调用搜索引擎、代码解释器和数据库,各工具以不同速率返回流式结果。StreamMultiplexer 将多个输出流按优先级归并为统一事件序列,背压控制器防止慢工具拖垮快工具。 ``` Agent 调用: search_web(query) → 搜索结果流(快,~50ms) execute_code(code) → 执行输出流(慢,分多次,~2s) query_database(sql) → 查询结果流(中等,~200ms) StreamMultiplexer (PRIORITY_FIRST) → 统一事件序列回传给 LLM → 背压控制:慢工具自动降级,不阻塞快工具 ``` ### 场景二:跨进程/跨机器 Agent 协作 多个 Agent 进程(或分布在不同机器上的 Agent)通过服务端模式共享消息总线。主 Agent 发布任务,工作 Agent 订阅并执行,结果通过 Trie 路由自动分发。 ``` 主 Agent (TCP) ──> AgentArtery Server ──> 工作 Agent A (SHM, 同机) │ ──> 工作 Agent B (TCP, 远程) │ ──> 工作 Agent C (IPC, 同机) ▼ WAL 持久化(关键消息不丢失) Prometheus 监控(实时观测吞吐与延迟) ``` ### 场景三:流式 LLM Token 实时分发 LLM 推理服务产生高频 Token 流(~1000 token/s),需要同时分发给多个订阅者:前端 UI 实时展示、日志系统记录、评估系统打分。Trie 路由实现一对多高效分发。 ``` LLM 推理服务 ──publish──> /agent/{id}/llm/stream │ TrieRouter 匹配通配符订阅 │ ┌─────────┼─────────┐ ▼ ▼ ▼ 前端 UI 日志系统 评估系统 (实时渲染) (持久化) (质量打分) ``` ## 核心特性 - ✅ **主题发布/订阅**:基于 Trie 树的高效路由引擎,支持通配符 `*` 和精确匹配 - ✅ **流式多路复用**:合并多个工具输出流为统一的事件序列,支持优先级归并(ROUND_ROBIN / PRIORITY_FIRST / LATEST_FIRST) - ✅ **多层背压控制**:水位线自适应降级策略(ACCEPT → SLOW → BLOCK),带滞后恢复 - ✅ **有序投递保证**:会话级单调递增序列号 - ✅ **进程内传输**:SPSC 有界环形缓冲区,阻塞/非阻塞两种消费模式 - ✅ **Python 绑定**:pybind11 封装,阻塞调用自动释放 GIL - ✅ **共享内存传输**:POSIX shm + MPSC 环形缓冲,eventfd 通知,同机超低延迟 - ✅ **独立服务端模式**:支持 IPC / TCP / SHM 三种传输,epoll 多 Reactor、会话管理、心跳检测与 Prometheus 指标导出 - ✅ **TCP + TLS**:跨机器通信,可选 mbedTLS 加密 - ✅ **WAL 持久化**:CRC32 校验、优先级过滤、文件轮转、启动回放 - ✅ **优雅关闭**:SHUTDOWN 消息通知、drain timeout、客户端自动重连 - ✅ **Docker 部署**:多阶段构建,docker-compose + Prometheus 监控 ## 快速开始 ### 构建库(✅ 已实现) ```bash mkdir build && cd build cmake .. -DCMAKE_BUILD_TYPE=Release -DAGENT_ARTERY_BUILD_TESTS=ON -DAGENT_ARTERY_BUILD_PYTHON=ON make -j$(nproc) # 运行测试 ctest --output-on-failure ``` ### 独立服务端(✅ 已实现) ```bash # 构建服务端(需启用 AGENT_ARTERY_BUILD_SERVER) mkdir build && cd build cmake .. -DCMAKE_BUILD_TYPE=Release -DAGENT_ARTERY_BUILD_SERVER=ON make -j$(nproc) # IPC 模式(Unix Socket) ./agent_artery_server --transport ipc --socket /tmp/agent_artery.sock --workers 4 # TCP 模式 ./agent_artery_server --transport tcp --listen 0.0.0.0:9090 --workers 4 # 共享内存模式(同机超低延迟) ./agent_artery_server --transport shm --workers 4 # Docker 部署 docker-compose up -d # 查看所有选项 ./agent_artery_server --help ``` 服务端启动参数: | 参数 | 说明 | 默认值 | |------|------|--------| | `-t, --transport ` | 传输类型:`ipc`、`tcp`、`shm` | `ipc` | | `-s, --socket ` | Unix Socket 路径(IPC 模式) | `/tmp/agent_artery.sock` | | `-l, --listen ` | TCP 监听地址 | `0.0.0.0:9090` | | `-w, --workers ` | Worker 线程数 | CPU 核心数 | | `-b, --buffer ` | 每端点缓冲区大小 | 4096 | | `-m, --max-sessions ` | 最大并发会话数 | 10000 | | `--metrics-port ` | Prometheus HTTP 端口(0=禁用) | 9090 | | `--heartbeat-timeout ` | 心跳超时(毫秒) | 15000 | #### 架构 ``` Client (Agent A) ──┐ ├──> Transport (IPC / TCP / SHM) Client (Agent B) ──┘ │ ▼ MultiReactorPool (epoll) │ │ │ ▼ ▼ ▼ Worker Worker Worker │ │ │ └───┬───┘ │ ▼ │ TrieRouter │ ▼ │ SessionManager │ ▼ │ WalManager (WAL 持久化) ▼ │ MetricsCollector (Prometheus) ``` #### 传输方式对比 | 特性 | InProcess | IPC | TCP | SHM | |------|-----------|-----|-----|-----| | 跨进程 | 否 | 是 | 是 | 是 | | 跨机器 | 否 | 否 | 是 | 否 | | 延迟 | ~1μs | ~10μs | ~50μs | ~2μs | | TLS | 否 | 否 | 是 (mbedTLS) | 否 | #### C++ 客户端 ```cpp #include "agent_artery/ipc_client.h" namespace aa = agent_artery; // 连接到服务端 aa::IpcClient client; aa::SessionConfig cfg{"agent_001", 4096}; auto result = client.connect("/tmp/agent_artery.sock", cfg); if (!result.ok) { // 处理连接失败 } // 获取会话,使用方式与进程内模式完全一致 aa::Session* session = client.session(); // 发布消息 aa::Publisher pub(session, aa::Topic("/agent/tool/search/stream")); aa::Message msg(aa::MessageType::DATA, "/agent/tool/search/stream", "{\"result\": 42}"); pub.publish(msg); // 订阅消息 aa::Subscriber sub(session, aa::TopicPattern("/agent/tool/*/stream")); aa::Message received; if (sub.poll(received, std::chrono::milliseconds(100))) { // 处理收到的消息 } // 断开连接(自动清理心跳线程) client.disconnect(); ``` #### Python 客户端 ```python import _agent_artery as aa # 连接到服务端 client = aa.IpcClient() config = aa.SessionConfig(agent_id="agent_001", buffer_size=4096) result = client.connect("/tmp/agent_artery.sock", config) if not result: raise RuntimeError(f"连接失败: {result.error}") # 获取会话 session = client.session() # 发布消息 pub = session.publisher("/agent/tool/search/stream") msg = aa.Message(aa.MessageType.DATA, "/agent/tool/search/stream", '{"result": 42}') pub.publish(msg) # 订阅消息 sub = session.subscriber("/agent/tool/*/stream") received = sub.poll(0.1) # timeout in seconds if received: print(f"收到: {received.payload()}") # 断开连接 client.disconnect() ``` ## 当前实现(✅ MVP) ### C++ API ```cpp #include "agent_artery/agent_artery.h" namespace aa = agent_artery; // 创建总线和会话 aa::AgentArtery bus; aa::SessionConfig cfg{"agent_001", 4096}; auto* session = bus.create_session(cfg); // 发布者:工具侧 aa::Publisher pub(session, aa::Topic("/agent/tool/search/stream")); aa::Message msg(aa::MessageType::DATA, "/agent/tool/search/stream", "{\"result\": 42}"); pub.publish(msg); // 订阅者:Agent 大脑侧 aa::Subscriber sub(session, aa::TopicPattern("/agent/tool/*/stream")); aa::Message received; if (sub.poll(received, std::chrono::milliseconds(100))) { // 处理收到的消息 } // 多路复用:合并多个工具流 aa::StreamMultiplexer mux(aa::MergePolicy::PRIORITY_FIRST); mux.add_stream(1, 10); // 高优先级 mux.add_stream(2, 5); // 低优先级 mux.push(1, {"search chunk", false}); mux.push(2, {"code chunk", false}); auto result = mux.poll(std::chrono::milliseconds(50)); // 背压控制 aa::BackpressureController bp({100, 50}); // 高水位 100,低水位 50 auto action = bp.evaluate(current_queue_size, aa::Watermark{100, 50}); // action: ACCEPT / SLOW / BLOCK ``` ### Python API ```python import _agent_artery as aa bus = aa.AgentArtery() session = bus.create_session(aa.SessionConfig(agent_id="agent_001")) pub = aa.Publisher(session, "/agent/tool/search/stream") msg = aa.Message(aa.MessageType.DATA, "/agent/tool/search/stream", '{"result": 42}') pub.publish(msg) sub = aa.Subscriber(session, "/agent/tool/*/stream") received = sub.poll(100) # timeout_ms, returns Message or None mux = aa.StreamMultiplexer(aa.MergePolicy.PRIORITY_FIRST) mux.add_stream(1, 10) mux.push(1, aa.Chunk("data", False)) result = mux.poll(50) # timeout_ms ``` ## 概念验证:合并三个工具流 ``` 订阅者:Agent 大脑 │ │ 订阅主题 /agent/abc123/tool/*/stream ▼ AgentArtery ── 合并三个工具的输出为单一有序流 ──> Agent 大脑 ▲ │ 发布消息到各自主题 │ 搜索工具 ──> /agent/abc123/tool/search/stream 代码工具 ──> /agent/abc123/tool/code_exec/stream 数据库 ────> /agent/abc123/tool/db_query/stream ``` ## 性能 ### ✅ 已实现(进程内嵌入模式) | 指标 | 实测值 | |------|--------| | 路由吞吐量 | ~450,000 msg/s | | 单次路由延迟 | ~2,200 ns/op | ### 🔧 服务端模式(已实现,待压测) 服务端通过 Unix Socket + epoll 多 Reactor 架构实现,支持以下能力: - **并发会话**:默认上限 10,000(可配置) - **心跳检测**:自动清理超时客户端(默认 15 秒超时) - **指标导出**:Prometheus 格式,HTTP 端口 9090,含消息计数、延迟直方图、活跃会话/连接数 - **Worker 池**:基于 epoll 的 MultiReactorPool,支持 CPU 亲和性绑定 ### 📋 长期目标 | 指标 | 目标值 | |------|--------| | 并发会话 | 100,000+ | | 端到端延迟 (P99) | < 5ms | | 单节点吞吐量 | 1,000,000+ msg/s | ## 技术栈 C++17, CMake, Google Test, pybind11, SPSC RingBuffer, Trie Router, epoll Reactor, Unix Socket / TCP / POSIX Shared Memory, mbedTLS (可选), WAL (CRC32), Prometheus Metrics, Docker ## 协议 Apache License 2.0 — 自由商用,保留署名,专利保护。