1 Star 0 Fork 6

kouzhao/windmq

forked from Stan/windmq 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 7.57 KB
一键复制 编辑 原始数据 按行查看 历史

windmq - MQTT快速开发脚手架

star fork

介绍

springboot快速接入MQTT,一个方法注解就搞定消息处理

开发背景

之前做一个硬件项目用到mqtt,处理设备上传的数据

部署产线环境是阿里云,测试用的EMQ都需要支持,就有了这个项目

此项目整合springboot部分和topic规则搬运了一个项目,刚接触这个,十分感谢前辈的经验https://gitee.com/yezhihao/mqtt-sample

关于共享订阅的高可用兼容,如果有方案还望各位不吝赐教

功能

  • MQTT客户端登录凭证分配(ACL支持阿里云\EMQ目前只支持账号密码,可自定义实现)
  • 适合低端设备的查表加密协议非必须(详情见: com.stanwind.wmqtt.security.TableMsgEncrypt)
  • 高可用部署(多实例不同clientID上线,EMQ有提供共享订阅,但是阿里云只能靠规则引擎转发MQ,我们线上使用全盘负责机制,谁发命令谁处理)
  • 异步消息处理池(CPU核心数*2 + 1)
  • Topic注解匹配消息处理,支持模糊匹配(正则实现,可取topic路径参数)和精确匹配

默认建议规则

  • 对客户端发送的TOPIC均以 IOT_CLIENT/xxx形式 (配置可修改)
  • 对服务端发送的TOPIC均以 IOT_SERVER/xxx形式 (配置可修改)
  • 加密行需在payload开头2 byte表示采用哪一行数据进行加密 (若启用加密则IOT开头的topic均会加密,详见:com.stanwind.wmqtt.security.IotDeviceMessageEncrypt)
  • 为兼容阿里云 clientId均以GID_DEVICE@@@开头 (配置可修改)
  • Server采用签名登录,阿里云环境下Client分配的账号密码使用token登录,鉴权信息有效时长12小时
  • topic中{instanceId}表示匹配当前实例ID,{deviceId}表示匹配当前设备序列号(详情: com.stanwind.wmqtt.MqttConfig)

springboot支持版本

  • 2.0.X.RELEASE

项目仓库

<dependency>
  <groupId>com.stanwind</groupId>
  <artifactId>spring-boot-windmq</artifactId>
  <version>1.0.0-RELEASE</version>
</dependency>

样例工程

https://gitee.com/sense7/windmq-demo.git

参考依赖

<!-- windmq dependency -->
<dependency>
  <groupId>com.stanwind</groupId>
  <artifactId>spring-boot-windmq</artifactId>
  <version>1.1.0-RELEASE</version>
</dependency>

<!-- MQTT -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
  <exclusions>
    <exclusion>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <groupId>org.eclipse.paho</groupId>
    </exclusion>
  </exclusions>
</dependency>
<!-- 1.2.0 版本有bug -->
<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.1</version>
</dependency>

启用windmq

@EnableWindMQ
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

使用样例

  • 临时订阅/取消(注入WMHolder)
void addTopic(String... topic);
void addTopic(String topic, int qos);
void addTopics(String[] topic, int[] qos);
void removeTopic(String... topic);
  • 消息发送 IMessageService
/**
 * 发送消息给device 需要payload Class上有@Topic
 * @param deviceId
 * @param payload
 */
void notify(String deviceId, Object payload);

/**
 * 发送消息到topic
 * @param topic
 * @param payload
 */
void notifyToTopic(String topic, Object payload);

/**
 * 同步确认发送消息给设备
 * @param deviceId
 * @param payload
 * @return
 */
MqttResponse request(String deviceId, MqttRequest payload);

/**
 * 同步确认发送消息给设备
 * @param deviceId
 * @param payload
 * @param timeout 等待超时 ms
 * @return
 */
MqttResponse request(String deviceId, MqttRequest payload, long timeout);

/**
 * 响应同步消息
 * @param message
 * @return
 */
boolean response(Message<MqttResponse> message);

/**
* 客户端请求通用回复
* @param messageId
* @param deviceId
* @param result
*/
void sendCommonResponse(Long messageId, String deviceId, Integer result);

/**
* 客户端请求通用回复
* @param messageId
* @param deviceId
*/
void sendCommonResponse(Long messageId, String deviceId);
  • 消息处理
@TopicHandler(topic = "$SYS/brokers/{node}/clients/{deviceId}/connected")
public void connected(MQTTMsg msg) {
    ClientReqVO clientReqVO = JSONObject.parseObject(msg.getPayload().toString(), ClientReqVO.class);
    process(clientReqVO);
}
  • 获取路径参数
@Service
public class DemoHandler extends BaseTopicHandler {
    @TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}/{param1}")
    public void uploadPingData(MQTTMsg msg) {
        String taskId = getParam("taskId");
        String param1 = getParam("param1");
        //或
        MqttContext.getContext().getParams().getOrDefault("taskId", null);
    }
}
  • 高可用方案(临时订阅处理完取消 适用于服务端发送控制指令,携带临时随机topic,客户端往服务端指定topic写)
@TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}")
public void uploadPingData(MQTTMsg msg) {
    if (!currentHandle()) {
        log.debug("非当前实例任务: [{}]", msg);
        return;
    }

    if (接收完毕) {
        //取消订阅
        producerHolder.removeTopic(msg.getTopic());
    }
}
  • 生成客户端链接数据
@Autowired
private MqttConfig mqttConfig;
@Autowired
private WMHolder holder;
public MqttConnVO generateMqttConnConfig(String sn) throws Exception {
    String r = mqttConfig.getAclRead().replaceAll(DEVICE_ID, stcUtil.sn2cli(sn));
    String w = mqttConfig.getAclWrite().replaceAll(DEVICE_ID, stcUtil.sn2cli(sn));
    MQTTClientConnData connData = holder.getClientConnData(Utils.splitToList(r), Utils.splitToList(w));
    MqttConnVO vo = new MqttConnVO();
    //缺省外网地址则返回统一地址 否则返回外网地址
    vo.setUris(ArrayUtils.isEmpty(mqttConfig.getPubServerURIs()) ? mqttConfig.getServerURIs() : mqttConfig.getPubServerURIs());
    vo.setReadTopics(r);
    vo.setWriteTopics(w);
    vo.setEnc(mqttConfig.getEncTable());
    vo.setEncSize(mqttConfig.getEncCount());
    BeanUtils.copyProperties(connData, vo);
    log.info("{} 获取mqtt: {}", sn, vo);

    return vo;
}

@Data
public class MqttConnVO implements Serializable {
    private static final long serialVersionUID = 1L;
    private String[] uris;
    private Long expire;
    private String username;
    private String password;
    private String readTopics;
    private String writeTopics;
    private String enc;
    private Integer encSize;
}

使用建议

  • 配置参见bootstrap.yml
  • 测试环境使用EMQ,产线使用阿里云

更新日志

  • Ver 1.1.1
  1. 非正则匹配主动获取topic参数导致的NPE问题
  • Ver 1.1
  1. 合并ClientApi和SettingFactory,统一管理鉴权数据
  2. ProducerHolder更名 WMHolder,加入获取客户端鉴权数据接口,能更方便的充当全局工具类使用
  • Ver 1.0.1
  1. 实现同topic下消息顺序到达同线程处理
  • Ver 1.0
  1. 实现mqtt的topic路由处理
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/kouzhaomei/windmq.git
git@gitee.com:kouzhaomei/windmq.git
kouzhaomei
windmq
windmq
master

搜索帮助

Cb406eda 1850385 E526c682 1850385