# mqttx **Repository Path**: ymofen/mqttx ## Basic Information - **Project Name**: mqttx - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-12-01 - **Last Updated**: 2025-12-02 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # mqttx `mqttx` 是一个基于 [Eclipse Paho MQTT Go Client](https://github.com/eclipse/paho.mqtt.golang) 封装的、支持多订阅者、线程安全、可热更新配置的 MQTT 客户端库,适用于需要灵活管理多个订阅回调、自动重连、配置动态变更等场景。 --- ## 特性 - **多订阅者支持**:同一主题可注册多个独立 ID 的订阅回调。 - **线程安全**:所有操作通过内部命令通道串行化执行,避免并发问题。 - **热更新配置**:运行时可动态修改 Broker、用户名、密码、ClientID 等连接参数。 - **自动重连与恢复**:断线后自动重连,并在重连成功后重新订阅所有有效主题。 - **同步/异步订阅**:提供 `Subscribe`(同步)和 `SubscribeAsync`(异步)两种订阅方式。 - **生命周期回调**:支持 `Connected` 和 `Disconnected` 事件通知。 - **资源安全关闭**:提供 `Close()` 方法确保优雅退出,释放所有资源。 - **测试友好**:支持注入 Mock 客户端用于单元测试。 --- ## 安装 ```bash go get gitee.com/ymofen/mqttx@latest ``` 依赖项(已在 `go.mod` 中声明): - `github.com/eclipse/paho.mqtt.golang v1.5.1` - `gitee.com/ymofen/gotest v1.0.3`(仅测试使用) --- ## 快速开始 ```go package main import ( "fmt" "gitee.com/ymofen/mqttx" mqtt "github.com/eclipse/paho.mqtt.golang" "log" "time" ) func main() { mqttx.Debugf = func(fmt string, args ...interface{}) { log.Printf(fmt, args...) } mqttx.Warnf = func(fmt string, args ...interface{}) { log.Printf(fmt, args...) } // 创建客户端 client := mqttx.NewIDBasedMQTTClient() defer client.Close() // 配置连接参数 config := mqttx.MQTTClientConfig{ Broker: "tcp://broker.hivemq.com:1883", ClientID: fmt.Sprintf("mqttx-%d", time.Now().UnixNano()), CleanSession: true, AutoReconnect: true, KeepAlive: 30 * time.Second, } // 设置连接/断开回调(可选) client.Connected = func(c *mqttx.IDBasedMQTTClient) { fmt.Println("✅ 已连接到 MQTT 代理") } client.Disconnected = func(c *mqttx.IDBasedMQTTClient, err error) { fmt.Printf("⚠️ 连接断开: %v\n", err) } // 更新配置并自动连接 err := client.UpdateConfig(config) if err != nil { panic(err) } // 手动连接(UpdateConfig 会自动触发连接) // 或者直接调用 client.Connect() // 订阅主题(同步) err = client.Subscribe("sub001", "test/topic/mqttx", 1, func(c mqtt.Client, msg mqtt.Message) { fmt.Printf("📥 收到消息 [%s]: %s\n", msg.Topic(), string(msg.Payload())) }) if err != nil { fmt.Printf("订阅失败: %v\n", err) } // 发布消息 err = client.Publish("test/topic/mqttx", 1, false, "Hello from mqttx!") if err != nil { fmt.Printf("发布失败: %v\n", err) } // 等待几秒 time.Sleep(5 * time.Second) } ``` --- ## API 概览 ### 客户端创建与配置 - `NewIDBasedMQTTClient()` → 创建新客户端实例 - `UpdateConfig(MQTTClientConfig)` → 全量更新配置并重连 - `UpdateConfigOptions(...MQTTConfigOption)` → 增量更新部分配置(如仅改密码) ### 订阅管理 - `Subscribe(id, topic, qos, handler)` → 同步订阅(阻塞直到完成) - `SubscribeAsync(id, topic, qos, handler)` → 异步订阅(立即返回) - `Unsubscribe(id, topic)` → 取消指定 ID 的订阅 - `HasSubscriptionForTopic(topic)` → 判断主题是否已成功订阅 - `HasSubscriptionForId(topic, id)` → 判断指定 ID 是否订阅了该主题 - `GetSubscriptionsByTopic(topic)` → 获取某主题的所有订阅记录 - `GetAllSubscriptions()` → 获取所有订阅记录 ### 连接与发布 - `Connect()` → 手动连接(通常由 `UpdateConfig` 自动调用) - `Disconnect()` → 主动断开(保留订阅状态) - `Publish(topic, qos, retained, payload)` → 发布消息 - `IsConnected()` → 检查是否已连接 - `IsClosed()` → 检查客户端是否已关闭 ### 生命周期与资源 - `Close()` → 优雅关闭客户端(断开连接 + 清理订阅 + 释放资源) - `Connected` / `Disconnected` → 可设置的回调函数 ### 测试支持 - `SetClientForTest(mqtt.Client)` → 注入模拟客户端,便于单元测试 --- ## 配置说明 (`MQTTClientConfig`) | 字段 | 类型 | 说明 | |------|------|------| | `Broker` | `string` | MQTT 代理地址,如 `tcp://localhost:1883` | | `ClientID` | `string` | 客户端唯一标识 | | `Username` / `Password` | `string` | 认证凭据 | | `CleanSession` | `bool` | 是否清除会话 | | `AutoReconnect` | `bool` | 是否启用自动重连(底层 Paho 默认开启,本库设为 `false` 并自行管理) | | `KeepAlive` | `time.Duration` | 心跳间隔 | | `ConnectTimeout` | `time.Duration` | 连接超时 | | `MaxReconnectInterval` | `time.Duration` | 最大重连间隔 | | `WillTopic` / `WillMessage` / `WillQoS` / `WillRetained` | - | 遗嘱消息配置 | > ⚠️ 注意:本库在 `setupClient` 中显式设置了 `opts.SetAutoReconnect(false)`,转而通过 `onConnectionLost` + `resubAll` + 内部重试机制实现更可控的重连逻辑。 --- ## 调试日志 可通过设置全局变量启用调试输出: ```go mqttx.Debugf = log.Printf mqttx.Warnf = log.Printf ``` --- ## 单元测试 项目使用 `gitee.com/ymofen/gotest` 辅助测试,支持通过 `SetClientForTest` 注入 mock 客户端,便于隔离网络依赖。 --- ## 许可证 请参考项目源码仓库的 LICENSE 文件。 --- > 项目托管于 Gitee:`gitee.com/ymofen/mqttx` > 基于 Eclipse Paho MQTT Go Client (v1.5.1)