# goang-mqtt-broker **Repository Path**: yyz116/goang-mqtt-broker ## Basic Information - **Project Name**: goang-mqtt-broker - **Description**: 一个用 Go 语言从零开始实现的完整 MQTT broker,支持 MQTT 3.1.1 协议的所有核心功能。 - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 3 - **Forks**: 0 - **Created**: 2025-05-24 - **Last Updated**: 2025-06-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MQTT Broker 一个用 Go 语言从零开始实现的完整 MQTT broker,支持 MQTT 3.1.1 协议的所有核心功能。 项目主页: https://blog.csdn.net/yyz_1987 ## 功能特性 ### 核心功能 - ✅ **完整的 MQTT 3.1.1 协议支持** - ✅ **QoS 0, 1, 2 消息传递保证** - ✅ **会话管理**(持久会话和清理会话) - ✅ **保留消息**(Retained Messages) - ✅ **遗嘱消息**(Last Will and Testament) - ✅ **主题通配符**(+ 和 # 通配符支持) - ✅ **客户端认证**(用户名/密码) - ✅ **保活机制**(Keep Alive) - ✅ **并发安全** ### 架构特性 - 🏗️ **模块化设计**,易于扩展 - 🔌 **可插拔存储接口** - 🔒 **线程安全**的并发处理 - 📊 **内置监控指标** - 🐳 **Docker 支持** ``` mqtt-broker/ ├── README.md ├── Makefile ├── Dockerfile ├── go.mod ├── go.sum ├── etc/ │ └── config.yaml ├── main.go ├── cmd/ │ ├── broker/ │ │ └── │ └── test-client/ │ └── main.go ├── internal/ │ ├── auth/ │ │ └── auth.go │ ├── broker/ │ │ ├── broker.go │ │ ├── client.go │ │ └── topic.go │ ├── protocol/ │ │ ├── common/ │ │ │ └── types.go │ │ └── mqtt311/ │ │ └── packet.go │ └── storage/ │ ├── interface.go │ └── memory/ │ └── store.go |___ redis/ |__ redis_store.go └── pkg/ └── mqtt/ └── packet.go ``` ## 快速开始 ### 环境要求 - Go 1.21 或更高版本 - Make(可选,用于构建脚本) ### 安装和运行 1. **克隆项目** ```bash git clone cd mqtt-broker go mod tidy ``` ### 安装依赖 ```bash go mod tidy ``` 构建项目 ```bash make build ``` #### 或者 go build -o bin/mqtt-broker main.go 运行 Broker ```bash make run ``` #### 或者 ./bin/mqtt-broker -addr=:1883 -debug #### 或者 直接 go run main.go #### 或者使用 Docker #### 构建镜像 docker build -t mqtt-broker . #### 运行容器 docker run -p 1883:1883 mqtt-broker ### 使用示例 启动 Broker #### 默认端口 1883 go run main.go #### 自定义端口和调试模式 go run main.go -debug ### 端口及配置在etc文件夹 #### yaml格式配置文件 ```bash # etc/config.yaml server: port: 1883 max_connections: 10000 max_message_size: 1048576 # 1MB, change this value to suit your messageSize storage: type: "memory" # "memory" or "redis" redis: address: "locolhost:6379" password: "" db: 0 pool_size: 100 min_idle_conn: 10 ``` ### 测试客户端 项目包含一个简单的测试客户端,可以用来测试 broker 功能: 订阅消息: go run cmd/test-client/main.go -mode=sub -topic=test/hello -client=subscriber1 发布消息: go run cmd/test-client/main.go -mode=pub -topic=test/hello -msg="Hello MQTT!" -client=publisher1 ### 使用第三方客户端 你也可以使用任何标准的 MQTT 客户端连接到 broker: 使用 mosquitto 客户端: #### 订阅 mosquitto_sub -h localhost -p 1883 -t "test/topic" #### 发布 mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello World" 使用认证: #### 默认用户:admin/password, test/test123 mosquitto_pub -h localhost -p 1883 -u admin -P password -t "test/topic" -m "Authenticated message" 配置说明 命令行参数 参数 默认值 说明 -addr :1883 Broker 监听地址 -debug false 启用调试日志 #### 内置用户 Broker 默认创建了以下测试用户: 用户名 密码 admin password test test123 ### 架构设计 核心组件 ``` ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ TCP Server │ │ Broker │ │ Topic Manager │ │ │────│ │────│ │ │ Connection Mgmt │ │ Message Routing │ │ Subscription │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ │ │ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ Client │ │ Storage │ │ Auth │ │ │ │ │ │ │ │ Protocol Handle │ │ Session/Message │ │ Authentication │ └─────────────────┘ └─────────────────┘ └─────────────────┘ ``` 数据流 客户端连接 → TCP Server 接受连接 协议解析 → Client 解析 MQTT 数据包 认证验证 → Auth 模块验证用户凭据 会话管理 → Storage 加载/保存会话信息 消息路由 → Broker 根据订阅关系路由消息 主题匹配 → Topic Manager 处理通配符匹配 ### 开发指南 项目结构说明 ``` mqtt-broker/ ├── README.md # 项目说明文档 ├── Makefile # 构建脚本 ├── Dockerfile # Docker 镜像构建文件 ├── go.mod # Go 模块依赖文件 ├── go.sum # Go 模块依赖校验文件 ├── etc/ # 配置文件目录 │ └── config.yaml # 配置文件 ├── main.go # 项目入口文件 ├── cmd/ # 应用程序目录 │ ├── broker/ # Broker 主程序 │ └── test-client/ # 测试客户端 ├── internal/ # 内部包(不对外暴露) │ ├── auth/ # 认证模块 │ ├── broker/ # Broker 核心逻辑 │ ├── protocol/ # MQTT 协议实现 │ └── storage/ # 存储接口和实现 └── pkg/ # 公共包 └── mqtt/ # MQTT 工具包 ``` ### 扩展存储后端 实现 storage.Store 接口来添加新的存储后端: ```golang type Store interface { SaveSession(clientID string, session *Session) error LoadSession(clientID string) (*Session, error) DeleteSession(clientID string) error // ... 其他方法 } ``` 示例:Redis 存储实现 ```golang type RedisStore struct { client *redis.Client } func (r *RedisStore) SaveSession(clientID string, session *Session) error { // Redis 实现 } ``` 添加新的认证方式 实现 auth.Authenticator 接口: ```golang type Authenticator interface { Authenticate(username, password string) bool AddUser(username, password string) error RemoveUser(username string) error } ``` ### 扩展方式 自定义协议处理 在 internal/protocol/ 下添加新的协议版本支持,例如 MQTT 5.0。 性能和限制 默认配置 最大连接数: 10,000 最大消息大小: 1MB 保留消息限制: 10,000 条 会话过期时间: 24 小时 消息过期时间: 24 小时 ### 性能优化建议 生产环境使用持久化存储(Redis、PostgreSQL 等) 调整连接池大小和缓冲区大小 启用消息压缩(如果需要) 使用负载均衡器进行水平扩展 ### 测试用例 ### 运行单元测试 go test -v ./... 集成测试 #### 启动 broker make run #### 在另一个终端运行测试 make test-sub & make test-pub ### 压力测试 使用 MQTT 压力测试工具: #### 使用 mqtt-benchmark mqtt-benchmark -broker tcp://localhost:1883 -count 1000 -size 100 ### 监控和日志 日志级别 INFO: 连接/断开事件 DEBUG: 详细的协议交互(使用 -debug 启用) ERROR: 错误和异常情况 监控指标 Broker 提供以下监控指标: stats := broker.GetStats() // { // "connected_clients": 42, // "max_connections": 10000, // "uptime": "2h30m15s" // } ### 使用说明 1. 构建和运行 #### 安装依赖 make deps #### 构建 make build #### 运行broker make run #### 或者直接运行 go run main.go -addr=:1883 -debug 2. 测试 #### 终端1:启动订阅者 make test-sub #### 终端2:发送消息 make test-pub 3. 功能特性 这个完整的 MQTT broker 实现包含: 协议支持: MQTT 3.1.1 完整实现 QoS 支持: QoS 0, 1, 2 的完整实现 会话管理: 持久会话和清理会话 保留消息: 完整的保留消息机制 遗嘱消息: 客户端异常断开时的遗嘱消息 主题通配符: 支持 + 和 # 通配符 认证: 简单的用户名密码认证 并发安全: 完整的并发控制 内存存储: 可扩展的存储接口 连接管理: 保活检测和连接限制 4. 后续扩展方向 添加 MQTT 5.0 支持 实现持久化存储(Redis、MySQL等) 添加集群支持 实现 WebSocket 支持 添加监控和指标 实现更复杂的认证和授权 添加消息桥接功能 增加Web管理界面 增加插件系统 ### 故障排除 常见问题 Q: 客户端连接被拒绝 A: 检查认证信息,确保用户名密码正确 Q: 消息没有收到 A: 检查主题订阅是否正确,确认 QoS 设置 Q: 连接频繁断开 A: 检查 Keep Alive 设置,确认网络稳定性 ### 调试技巧 启用调试日志: 使用 -debug 参数 检查网络连接: 使用 telnet localhost 1883 监控资源使用: 检查内存和 CPU 使用情况 ### 贡献指南 #### 开发流程 Fork 项目 创建功能分支 (git checkout -b feature/amazing-feature) 提交更改 (git commit -m 'Add amazing feature') 推送到分支 (git push origin feature/amazing-feature) 创建 Pull Request ### 代码规范 使用 go fmt 格式化代码 运行 go vet 检查代码 添加必要的单元测试 更新相关文档 ### 路线图 短期目标 MQTT 5.0 协议支持 Redis 存储后端 WebSocket 支持 集群模式 长期目标 消息桥接 插件系统 Web管理界面 高可用部署 ### 许可证 本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。 ### 致谢 MQTT 协议规范 Eclipse Mosquitto - 参考实现 Go 社区的优秀开源项目 ### 联系方式 项目主页: https://blog.csdn.net/yyz_1987 问题反馈: https://blog.csdn.net/yyz_1987 邮箱: 534117529@qq.com