代码拉取完成,页面将自动刷新
---
id: mqttclient
title: Mqtt客户端使用
---
import Tag from "@site/src/components/Tag.js";
### 定义
命名空间:TouchSocket.Mqtt <br/>
程序集:[TouchSocket.Mqtt.dll](https://www.nuget.org/packages/TouchSocket.Mqtt)
## 一、说明
`MqttClient`是遵循Mqtt协议的消息客户端,支持连接Mqtt Broker、订阅主题、发布消息等功能,支持QoS 0/1/2三种消息质量等级。
## 二、特点
- 轻量级协议支持
- QoS消息质量保障
- 遗嘱消息支持
- 保留消息处理
- TLS加密通信
- 主题通配符匹配(+/#)
## 三、应用场景
- IoT设备数据上报
- 跨平台消息推送
- 低带宽环境通信
- 设备状态同步
## 四、可配置项
继承所有[TcpClient](./tcpclient.mdx)的配置。除此之外还支持Mqtt的一些专有配置。
<details>
<summary>可配置项</summary>
<div>
#### SetMqttConnectOptions
`MqttConnectOptions` 类用于配置 Mqtt 客户端连接到 Mqtt 服务端时的参数,支持 Mqtt 3.1.1 及 5.0+ 协议版本。以下是各配置项的详细说明:
**基础连接选项(通用)**
| 属性名 | Mqtt 版本支持 | 说明 |
|-------------------------|---------------------|------------------------------------------------------------------------------------------|
| `CleanSession` | 3.1.1+ | 是否清理会话(断开后删除未完成的遗嘱和离线消息)。 |
| `ClientId` | 3.1.1+ | 客户端唯一标识符(服务端用于识别客户端);Mqtt 3.1.1 中必填(长度 1-23 字节,ASCII 字符);5.0+ 允许空字符串(仅当服务端允许时)。 |
| `KeepAlive` | 3.1.1+ | 心跳保活时长(单位:秒),表示客户端与服务端保持连接的最长时间;需与服务端配置兼容(服务端可能拒绝过大或过小的值),默认值通常为 60 秒。 |
| `Password` | 3.1.1+ | 连接认证密码(与 `UserName` 配合使用);若 `UserName` 为空则无效;5.0+ 支持二进制数据(需通过 `AuthenticationData` 传递)。 |
| `ProtocolName` | 3.1.1+ | Mqtt 协议名称(固定为 "MQTT"),默认值为 "MQTT",通常无需手动设置。 |
| `UserName` | 3.1.1+ | 连接认证用户名(与 `Password` 配合使用);5.0+ 支持二进制数据(需通过 `AuthenticationData` 传递)。 |
| `Version` | 3.1.1+/5.0+ | Mqtt 协议版本(如 `MqttProtocolVersion.V311` 或 `V500`);需根据服务端支持的版本设置,否则可能导致连接失败。 |
---
**遗嘱(Will)消息配置(可选)**
当 `WillFlag` 为 `true` 时,服务端会在客户端异常断开时发布遗嘱消息。仅部分属性在 `WillFlag=true` 时生效。
| 属性名 | Mqtt 版本支持 | 说明 |
|-----------------------------|---------------------|------------------------------------------------------------------------------------------|
| `WillFlag` | 3.1.1+ | 是否启用遗嘱消息(若为 `true`,需配置遗嘱相关属性)。 |
| `WillQos` | 3.1.1+ | 遗嘱消息的服务质量等级(0/1/2);需与服务端支持的 QoS 等级兼容。 |
| `WillRetain` | 3.1.1+ | 是否保留遗嘱消息(服务端保留最新遗嘱消息供新订阅者接收);默认值为 `false`;若为 `true`,服务端需支持保留消息功能。 |
| `WillMessage` | 3.1.1+ | 遗嘱消息的内容(文本类型,Mqtt 5.0+ 支持二进制数据,需通过 `WillContentType` 和 `WillPayloadFormatIndicator` 配合)。 |
| `WillTopic` | 3.1.1+ | 遗嘱消息发布的主题(需符合 Mqtt 主题命名规则);主题需提前订阅才能接收遗嘱消息。 |
| `WillContentType` | 5.0+ | 遗嘱消息内容的 MIME 类型(如 "text/plain"、"application/json");描述 `WillMessage` 的格式(仅 5.0+ 有效)。 |
| `WillPayloadFormatIndicator`| 5.0+ | 遗嘱消息负载格式指示符(如 `Raw`、`Utf8`);更细粒度描述负载格式(仅 5.0+ 有效)。 |
| `WillResponseTopic` | 5.0+ | 遗嘱消息触发后,服务端向客户端发送响应的主题;需客户端提前订阅该主题以接收响应。 |
| `WillCorrelationData` | 5.0+ | 遗嘱消息关联的上下文数据(用于匹配请求与响应);需与服务端约定格式(仅 5.0+ 有效)。 |
| `WillDelayInterval` | 5.0+ | 遗嘱消息延迟发布的时间间隔(单位:秒);遗嘱触发后,延迟指定时间再发布(仅 5.0+ 有效)。 |
| `WillMessageExpiryInterval` | 5.0+ | 遗嘱消息的有效期(单位:秒,过期后服务端删除);超过此时间未发布的遗嘱消息将被丢弃(仅 5.0+ 有效)。 |
---
**Mqtt 5.0+ 扩展配置(可选)**
仅当使用 Mqtt 5.0+ 协议时生效,用于更细粒度的连接控制。
| 属性名 | Mqtt 版本支持 | 说明 |
|-----------------------------|---------------------|------------------------------------------------------------------------------------------|
| `UserProperties` | 5.0+ | 用户自定义属性(键值对,用于传递业务元数据);键值对需符合 Mqtt 5.0 规范(键为 UTF-8 字符串,值支持多种类型)。 |
| `AuthenticationMethod` | 5.0+ | 认证方法名称(如 "OAuth2"、"Custom");需与服务端支持的认证方法一致(仅 5.0+ 支持扩展认证)。 |
| `AuthenticationData` | 5.0+ | 认证方法的附加数据(如令牌、签名等);配合 `AuthenticationMethod` 使用(仅 5.0+ 有效)。 |
| `MaximumPacketSize` | 5.0+ | 客户端请求的最大数据包大小(单位:字节);服务端会返回其支持的最大值(客户端需遵守)。 |
| `ReceiveMaximum` | 5.0+ | 客户端能接收的最大 QoS 1/2 消息数量(防止消息堆积);默认值通常为 65535(需与服务端配置兼容)。 |
| `RequestProblemInformation` | 5.0+ | 是否请求服务端在出错时返回详细问题信息(如原因码、原因字符串);默认值为 `true`(建议开启以便排查问题)。 |
| `RequestResponseInformation`| 5.0+ | 是否请求服务端返回连接响应的额外信息(如会话过期时间);默认值为 `false`(仅当需要时启用)。 |
| `SessionExpiryInterval` | 5.0+ | 会话过期时间间隔(单位:秒,0 表示立即过期);客户端断开后,服务端保留会话的时间(仅当 `CleanSession=false` 时有效)。 |
| `TopicAliasMaximum` | 5.0+ | 客户端允许的最大主题别名值(服务站不能使用超过此值的别名);默认值为 0(表示不使用主题别名)。 |
---
**注意事项**
1. **协议版本兼容性**:部分属性(如 `UserProperties`、`AuthenticationMethod`)仅在 Mqtt 5.0+ 中有效,需根据服务端版本选择配置。
2. **遗嘱消息生效条件**:`WillFlag` 必须为 `true`,且 `WillTopic` 和 `WillMessage` 需有效配置,否则遗嘱不会被发送。
3. **认证扩展**:5.0+ 的 `AuthenticationData` 需配合自定义认证插件使用,具体格式由服务端定义。
4. **只读属性**:`WillMessage` 和 `WillTopic` 标记为 `internal set`,通常通过构造函数或专用方法设置(如 `MqttApplicationMessage`)。
</div>
</details>
## 五、支持插件
| 插件方法| 功能 |
| --- | --- |
| IMqttConnectingPlugin | 当Mqtt客户端正在连接之前调用此方法。 |
| IMqttConnectedPlugin | 当Mqtt客户端连接成功时调用。 |
| IMqttClosingPlugin | 当Mqtt客户端正在关闭时调用。|
| IMqttClosedPlugin | 当Mqtt客户端断开连接后触发。 |
| IMqttReceivingPlugin | 在收到Mqtt所有消息时触发,可以通过`e.MqttMessage`获取到`Mqtt`的所有消息,包括订阅、订阅确认、发布、发布确认等。 |
| IMqttReceivedPlugin | 当接收到Mqtt发布消息,且成功接收时触发。可以通过`e.MqttMessage`获取到`Mqtt`的发布消息。 |
## 六、创建Mqtt客户端
```csharp showLineNumbers
var client = new MqttTcpClient();
await client.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("tcp://127.0.0.1:7789")
.SetMqttConnectOptions(options =>
{
options.ClientId = "TestClient";
options.UserName = "TestUser";
options.Password = "TestPassword";
options.ProtocolName = "MQTT";
options.Version = MqttProtocolVersion.V311;
options.KeepAlive = 60;
options.CleanSession = true;
// v5可以继续配置其他
// options.UserProperties = new[]
// {
// new MqttUserProperty("key1","value1"),
// new MqttUserProperty("key2","value2")
// };
})
.ConfigurePlugins(a =>
{
a.AddMqttConnectingPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Connecting:{e.ConnectMessage.ClientId}");
await e.InvokeNext();
});
a.AddMqttConnectedPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Connected:{e.ConnectMessage.ClientId}");
await e.InvokeNext();
});
a.AddMqttReceivingPlugin(async (mqttSession, e) =>
{
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
});
a.AddMqttReceivedPlugin(async (mqttSession, e) =>
{
var message = e.Message;
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
});
a.AddMqttClosingPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Closing:{e.MqttMessage.MessageType}");
await e.InvokeNext();
});
a.AddMqttClosedPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Closed:{e.Message}");
await e.InvokeNext();
});
}));//载入配置
await client.ConnectAsync();//连接
```
## 七、订阅与取消订阅
### 7.1 订阅
按照Mqtt协议,客户端可以通过`Subscribe`指令订阅一个或多个主题,服务端会返回订阅结果。
```csharp {10} showLineNumbers
var topic1 = "topic1";
var topic2 = "topic2";
SubscribeRequest subscribeRequest1=new SubscribeRequest(topic1, QosLevel.AtLeastOnce);//订阅请求
SubscribeRequest subscribeRequest2=new SubscribeRequest(topic2, QosLevel.AtMostOnce);//可以设置不同的Qos级别
//多个订阅请求
var mqttSubscribeMessage = new MqttSubscribeMessage(subscribeRequest1, subscribeRequest2);
//执行订阅
var mqttSubAckMessage = await client.SubscribeAsync(mqttSubscribeMessage);
//输出订阅结果
foreach (var item in mqttSubAckMessage.ReturnCodes)
{
Console.WriteLine($"ReturnCode:{item}");
}
```
### 7.2 取消订阅
按照Mqtt协议,客户端可以通过`Unsubscribe`指令取消订阅一个或多个主题,服务端会返回所有取消订阅结果。
```csharp {5} showLineNumbers
var topic1 = "topic1";
var topic2 = "topic2";
//取消订阅
var mqttUnsubAckMessage = await client.UnsubscribeAsync(new MqttUnsubscribeMessage(topic1,topic2));
```
## 八、接收消息
Mqtt组件的消息都是通过插件抛出的。你可以订阅`IMqttReceivingPlugin`插件和`IMqttReceivedPlugin`插件来接收消息。
正如插件说明所示,`IMqttReceivingPlugin`插件可以在收到所有消息时触发,`IMqttReceivedPlugin`插件可以在收到发布消息时触发。
所以如果你只关心发布成功的消息,那么可以只订阅`IMqttReceivedPlugin`插件即可。
你可以直接使用委托实现订阅:
```csharp {3-17} showLineNumbers
.ConfigurePlugins(a =>
{
a.AddMqttReceivedPlugin(async (client, e) =>
{
var mqttMessage = e.MqttMessage;
Console.WriteLine("Reved:" + mqttMessage);
//订阅消息的主题
var topicName = mqttMessage.TopicName;
//订阅消息的Qos级别
var qosLevel = mqttMessage.QosLevel;
//订阅消息的Payload
var payload = mqttMessage.Payload;
await e.InvokeNext();
});
})
```
或者使用插件类来继承接口实现:
```csharp showLineNumbers
class MyMqttReceivedPlugin : PluginBase, IMqttReceivedPlugin
{
public async Task OnMqttReceived(IMqttSession client, MqttReceivedEventArgs e)
{
var mqttMessage = e.MqttMessage;
//订阅消息的主题
var topicName = mqttMessage.TopicName;
//订阅消息的Qos级别
var qosLevel = mqttMessage.QosLevel;
//订阅消息的Payload
var payload = mqttMessage.Payload;
await e.InvokeNext();
}
}
```
然后把这个类添加到插件即可:
```csharp {3} showLineNumbers
.ConfigurePlugins(a =>
{
a.Add<MyMqttReceivedPlugin>();//添加自定义插件
})
```
## 九、发布消息
```csharp {3} showLineNumbers
MqttPublishMessage message = new(topic1, false, QosLevel.AtLeastOnce, Encoding.UTF8.GetBytes("Hello World"));
await client.PublishAsync(message);
```
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。