# agent-message-bus **Repository Path**: YyUHua/agent-message-bus ## Basic Information - **Project Name**: agent-message-bus - **Description**: 轻量级 SQLite 驱动的 AI 多智能体消息总线,让Agent之间零延迟、零依赖、可靠地对话。 - **Primary Language**: Python - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-05-08 - **Last Updated**: 2026-05-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README Agent Message Bus 轻量级 SQLite 驱动的 AI 多智能体消息总线 —— 让 AI 助手之间零延迟、零依赖地对话。 A lightweight, SQLite-backed message bus for Agent-to-Agent (A2A) communication. --- 为什么需要它? AI 助手之间需要互相通信。常见方式都有问题: - 文件队列:竞态条件,没有原子操作 - HTTP 桥接:端口冲突、线程阻塞、超时地狱 - 群聊同步:10秒延迟、消息丢失、刷屏 Agent Message Bus 用一个 SQLite 数据库 + HTTP API 解决: - 原子性消息分派(BEGIN IMMEDIATE 事务) - 自动重试 + 可配置超时 - 死信队列(失败消息不丢失) - 心跳健康监控 - 优雅启停 / 恢复 - 零外部依赖(stdlib + pyyaml) - 不到 600 行代码 --- 架构 ``` Agent A POST /send Agent Msg Bus GET /poll Agent B Adapter ────────────────→ Core ←──────────────── Adapter (8641) ←──────────────── (8648) ──────────────────→ (8647) ACK + reply │ POST /ack_pending │ Heartbeats ▼ agents 表 (健康监控) ``` --- 快速开始 ```bash 1. 安装依赖 pip install pyyaml 2. 启动总线 python3 bus_core.py [Agent Msg Bus] listening on 0.0.0.0:8648 3. 启动 Agent 适配器(另开终端) 先修改 adapter_template.py 里的 ADAPTER_CONFIG python3 adapter_template.py [14:00:00] [my_agentAdapter] All threads started 4. 发送测试消息 curl -X POST http://127.0.0.1:8648/send \ -H 'Content-Type: application/json' \ -d '{"from":"user","to":"my_agent","content":"你好!"}' ``` --- API 文档 `GET /health` 总线健康检查。返回 `{"ok": true, "system_state": "idle", ...}` `GET /status` 完整状态:各状态消息数量、已注册 Agent、死信数量。 `GET /poll?agent=NAME&limit=N` 原子领取待处理消息。返回状态为 DISPATCHED 的消息。 `POST /send` 发送消息。请求体: ```json { "from": "agent_a", "to": "agent_b", "content": "Hello!", "type": "chat", "priority": 5, "client_msg_id": "optional-dedup-key" } ``` `POST /ack_pending` 认领消息开始处理。请求体:`{"message_id": "...", "agent": "..."}` `POST /ack` 确认消息处理完成。请求体: ```json { "message_id": "...", "agent": "...", "reply": "处理成功", "processing_mode": "ai" } ``` `POST /heartbeat` 注册/更新 Agent 心跳。适配器自动调用。 `POST /control` 系统指令:`stop_all` / `resume`。请求体:`{"command": "stop_all", "requested_by": "admin"}` `GET /dead` 查看死信队列(永久失败的消息)。 `POST /dead/{id}/retry` 将死信重新加入处理队列。 --- 消息生命周期 ``` QUEUED -> (poll) -> DISPATCHED -> (ack_pending) -> ACK_PENDING -> (ack) -> ACKED | | | +-> (stop_all) -> CANCELLED +-> timeout -> QUEUED +-> timeout -> QUEUED | (max retries) +-> PERMANENTLY_FAILED -> dead_letters ``` --- 配置 编辑 `config.yaml`: ```yaml bus: host: "0.0.0.0" port: 8648 db_dir: "./data" timeout: dispatched: 10 ack_pending: 300 max_retries: 3 adapter: poll_interval: 0.5 heartbeat_interval: 5 brain_timeout: 120 poll_limit: 3 ``` 设置环境变量 `ANYUE_CONFIG` 可指定自定义配置文件路径。 --- 自定义适配器 编辑 `adapter_template.py`: 1. 修改 `ADAPTER_CONFIG["agent_id"]` —— 你的 Agent 名字 2. 修改 `ADAPTER_CONFIG["brain_url"]` —— 你的 AI 模型 API 地址 3. 设置 `ADAPTER_CONFIG["auto_reply"]` —— True 自动回复,False 手动确认 4. 自定义 `build_prompt()` —— 你的 Agent 人设 / System Prompt --- 环境要求 - Python 3.8+ - PyYAML(配置文件支持) --- 开源协议 MIT