代码拉取完成,页面将自动刷新
---
id: 302
title: MqttServer
---
import useBaseUrl from "@docusaurus/useBaseUrl";
import Tag from "@site/src/components/Tag.js";
import Highlight from '@site/src/components/Highlight.js';
:::tip `提示`
通过自定义脚本,可快速适配业务模型
:::
## 一、说明
MqttServer支持Tcp/webSocket方式接入,可以定时/变化发布数据,支持Rpc写入
通道只支持 Other
## 二、插件属性配置项
<img src={require("@site/static/img/docs/MqttServer1.png").default} />
| 属性 | 说明 | 备注|
| ---------------| --------------------------| ---|
| IP | ServerIP,为空时指任意IP | |
| 端口 | Tcp绑定端口 | 1883|
| 详细日志 | Flase=>日志输出上传数量,True=>日志输出上传内容 | False |
| WebSocket端口 | WebSocket绑定端口 | 8083,固定/mqtt路由 |
| 允许连接的ID(前缀) | 允许连接的ID(前缀) |ClientId必须以这个属性开头,为空时可以是任意Id |
| 允许匿名登录 | 允许匿名登录 | 设为Flase时,客户端需要填入网关的用户名和密码 |
| 允许Rpc写入 | 是否允许写入变量 | |
| Rpc写入主题 | 写入变量的主题 | 如果检测适配固定的topic标识,会按默认规则返回,比如``thingsboard``平台为``v1/gateway/rpc`` 。默认规则为:固定通配 ``RpcWrite/+`` ,其中RpcWrite为该属性填入内容,+通配符是请求GUID值;返回结果主题会在主题后添加Response , 也就是``RpcWrite/+/Response``|
| Rpc请求数据主题 | 该主题接受到任何消息都会发布全部信息到对应的变量/设备/报警主题中 | |
| 分组上传 | 启用后,无论是定时还是变化模式,始终会上传**变量分组属性**为key分组的全部变量 。在变化模式时,每次变量变化都会触发一次组上传 | False |
| 选择全部变量 | 选择全部变量 | False |
| 设备状态列表上传 | 设备是否列表上传,false时每个设备实体都会单独发布,注意性能需求,默认为true | |
| 变量列表上传 | 变量是否列表上传,false时每个变量实体都会单独发布,注意性能需求,默认为true | |
| 报警列表上传 | 报警是否列表上传,false时每个报警实体都会单独发布,注意性能需求,默认为true | |
| 设备Topic | 设备实体的发布主题 ,使用``${key}``作为匹配项,key必须是上传实体中的属性 | |
| 变量Topic | 变量实体的发布主题 ,使用``${key}``作为匹配项,key必须是上传实体中的属性 | |
| 报警Topic | 报警实体的发布主题 ,使用``${key}``作为匹配项,key必须是上传实体中的属性 | |
| 设备实体脚本 | 脚本返回新的实体列表,动态类中需继承**IDynamicModel**,传入列表为**DeviceData**,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
| 变量实体脚本 | 脚本返回新的实体列表,动态类中需继承**IDynamicModel**,传入列表为**VariableBasicData**,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
| 报警实体脚本 | 脚本返回新的实体列表,动态类中需继承**IDynamicModel**,传入列表为**AlarmVariable**,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
| 选择全部变量 | 是否选择全部变量,true时不需要单个变量添加业务属性 | |
| 上传模式 | 间隔/变化/变化和间隔同时生效 | |
| 定时上传间隔 | 间隔执行时间 | |
| 启用缓存 | 是否启用缓存 | |
| 缓存文件最大长度(mb) | 缓存文件最大长度 | |
| 上传每页条数 | 每一次上传的列表最大数量 | |
| 内存队列最大数量 | 内存队列的最大数量,超出或失败时转入文件缓存,根据数据量设定适当值 | |
## 三、脚本与实体
查看MqttClient页面[脚本接口](./MqttClient.mdx#脚本接口)
## 四、变量业务属性
<img src={require("@site/static/img/docs/MqttServer2.png").default} />
| 属性 | 说明 | 备注|
| ---------------| --------------------------| ---|
| 启用RPC | 单独配置变量是否允许写入 |true |
| Enable | 是否启用 | true |
## 五、Rpc
### 1、Rpc脚本
脚本类定义
```
using MQTTnet;
using MQTTnet.Server;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
namespace ThingsGateway.Plugin.Mqtt;
public abstract class DynamicMqttServerRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典),返回rpc结果</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public virtual async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, InterceptingPublishEventArgs args, MqttServerProperty driverPropertys, MQTTnet.Server.MqttServer mqttServer, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, CancellationToken cancellationToken)
{
}
}
```
RPC脚本demo
```
using MQTTnet;
using MQTTnet.Server;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
using ThingsGateway.Plugin.Mqtt;
public class DynamicMqttServerRpc:DynamicMqttServerRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典),返回rpc结果</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public override async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, InterceptingPublishEventArgs args, MqttServerProperty driverPropertys, MQTTnet.Server.MqttServer mqttServer, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, CancellationToken cancellationToken)
{
if (driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;
if(args.ApplicationMessage.Topic != driverPropertys.RpcWriteTopic) return;
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.Payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await getRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
try
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(driverPropertys.JsonFormattingIndented)).Build();
await mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(variableMessage), cancellationToken).ConfigureAwait(false);
}
catch
{
}
}
}
```
### 2、默认格式与MqttClient相同
查看MqttClient页面[Rpc](./MqttClient.mdx#四rpc)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。