# SimpleMqttClient **Repository Path**: kami_xenos/simple-mqtt-client ## Basic Information - **Project Name**: SimpleMqttClient - **Description**: 基于org.eclipse.paho.client.mqttv3的mqtt客户端的简单实现 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 10 - **Forks**: 7 - **Created**: 2020-09-05 - **Last Updated**: 2025-01-03 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # SimpleMqttClient ## 介绍 基于org.eclipse.paho.client.mqttv3实现的一个简易的MQTT客户端 ## MQTT协议简介 > `MQTT`全称为**Message Queuing Telemetry Transport(消息队列遥测传输)**,是一种基于**发布/订阅**范式的“轻量级”消息协议,需要一个消息中间件 > > 协议主要有三种身份:**发布者(Publisher)**、**代理服务器(Broker)**、**订阅者(Subscriber)**。发布者发布消息到代理服务器,再由订阅者消费消息。 这里我们选择EMQX作为代理服务器 ## 编写一个简单的连接客户端 编写一个自定义的MQTT的客户端,我们需要实现一下几个方法: - 连接服务器 - 订阅消息 - 推送消息 - 断开连接 - 断线是否重连 以下代码是基于`org.eclipse.paho.client.mqttv3`实现的一个简单的客户端 ```java /************************* Client ****************************************/ package xenoscode.cn.mqttclient.mqtt; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * @author Xenos * @version V1.0 * @Package xenoscode.cn.mqttclient.mqtt * @date 2020/9/1 16:46 */ public class SimpleMqttClient { /** * MQTT异步客户端 */ private MqttAsyncClient client = null; /** * 连接配置 */ private MqttConnectOptions options = null; /** * 客户端的ID,唯一且不可重复 */ private String clientid; private String userName; /** * 服务器地址 */ private String host; private int timeOut; private int aliveTime; /** * 订阅的主题列表 */ private String[] listTopic; /** * 主题列表对应的Qos列表 */ private int[] listQos; /** * 最大尝试连接次数 */ private int maxConnectTimes; public SimpleMqttClient(String clientid, String userName, String host, int timeOut, int aliveTime, String[] listTopic, int[] listQos, int maxConnectTimes) { this.clientid = clientid; this.userName = userName; this.host = host; this.timeOut = timeOut; this.aliveTime = aliveTime; this.listTopic = listTopic; this.listQos = listQos; this.maxConnectTimes = maxConnectTimes; } /** * 连接MQTT服务器 */ public synchronized void connect() { if (options == null) { setOptions(); } if (client == null) { creatClient(); } int connectTimes = 0; while (connectTimes < maxConnectTimes && !client.isConnected()) { try { IMqttToken token = client.connect(options); token.waitForCompletion(); connectTimes++; } catch (Exception e) { e.printStackTrace(); System.out.println(clientid + " 连接时发生错误: " + e.toString()); } } } /** * 断开与MQTT服务器的连接 */ public synchronized void disconnect() { if (client != null && client.isConnected()) { try { IMqttToken token = client.disconnect(); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); System.out.println(clientid + " 断开连接时发生错误: " + e.toString()); } } client = null; } /** * 刷新MQTT的连接 */ public synchronized void refresh() { disconnect(); setOptions(); creatClient(); connect(); } /** * 消息订阅 */ public void subscribe() { if (client != null && client.isConnected()) { try { IMqttToken token = client.subscribe(listTopic, listQos); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); System.out.println(clientid + "订阅主题时发生错误: " + e.toString()); } } } /** * 消息推送 * * @param topic 消息的主题名 * @param message 消息报文 */ public void publish(String topic, MqttMessage message) { if (client != null && client.isConnected()) { try { IMqttDeliveryToken token = client.publish(topic, message); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); System.out.println(clientid + "推送消息时发生错误: " + e.toString()); } } } /** * @return 是否处于连接状态 */ public boolean isConnected() { return client != null && client.isConnected(); } public String getClientid() { return clientid; } public int getMaxConnectTimes() { return maxConnectTimes; } /** * 设置连接属性 */ private void setOptions() { if (options != null) { options = null; } options = new MqttConnectOptions(); //将CleanSession设置为true时,一旦客户端断开连接,就会清除相关Session options.setCleanSession(true); options.setConnectionTimeout(timeOut); options.setKeepAliveInterval(aliveTime); options.setUserName(userName); //org.eclipse.paho.client.mqttv3提供的自动重连,默认为false,也可以在回调中进行重连 // options.setAutomaticReconnect(true); } /** * 创建客户端 */ private void creatClient() { if (client == null) { try { // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttAsyncClient(host, clientid, new MemoryPersistence()); // 设置回调函数 client.setCallback(new SimpleMqttClientCallback(SimpleMqttClient.this)); } catch (MqttException e) { e.printStackTrace(); System.out.println("创建连接客户端实例: [" + clientid + "] 时发生错误:" + e.toString()); } } } } /************************* 回调类 SimpleMqttClientCallback ****************************************/ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; public class SimpleMqttClientCallback implements MqttCallbackExtended { private SimpleMqttClient client; private int connectTimes = 0; public SimpleMqttClientCallback(SimpleMqttClient client) { this.client = client; } @Override public void connectComplete(boolean b, String s) { System.out.println("————" + client.getClientid() + " 连接成功!————"); //连接成功后,自动订阅主题 client.subscribe(); connectTimes = 0; } @Override public void connectionLost(Throwable throwable) { System.out.println("————" + client.getClientid() + " 连接丢失!————"); //可以在此处做重连处理 if (connectTimes < client.getMaxConnectTimes()) { client.refresh(); connectTimes++; } else { client.disconnect(); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { LocalDateTime startTime = LocalDateTime.now(); System.out.println("[MQTT]" + client.getClientid() + " ----成功接收消息!---- 时间: " + startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); String content = new String(mqttMessage.getPayload()); System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + mqttMessage.getQos()); System.out.println("接收消息内容 : " + content); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("[MQTT]" + client.getClientid() + " ----成功发送消息!---- 时间: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); } } ``` 将客户端连接需要的一些属性写在配置文件中 ```yaml mqtt: # emq的默认端口为1883 host: tcp://127.0.0.1:1883 client: clientid: testClient time-out: 10 alive-time: 20 max-connect-times: 5 topics: ["HELLOWORLD"] qos: [2] ``` 编写配置类 ```java /******************** SimpleMqttClientProperties ***********************************/ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Component @Setter @Getter @ConfigurationProperties("mqtt.client") public class SimpleMqttClientProperties { private String clientid; private String userName; private int timeOut; private int aliveTime; private int maxConnectTimes; private String[] topics; private int[] qos; } /******************** SimpleMqttClientProperties ***********************************/ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import xenoscode.cn.mqttclient.mqtt.SimpleMqttClient; @Configuration public class MqttClientPoolConfiguration { @Value("${mqtt.host}") private String host; @Autowired private SimpleMqttClientProperties simpleMqttClientProperties; @Bean SimpleMqttClient mqttClient() { SimpleMqttClient mqttClient = new SimpleMqttClient(simpleMqttClientProperties.getClientid(), simpleMqttClientProperties.getUserName(), host, simpleMqttClientProperties.getTimeOut(), simpleMqttClientProperties.getAliveTime(), simpleMqttClientProperties.getTopics(), simpleMqttClientProperties.getQos(), simpleMqttClientProperties.getMaxConnectTimes()); return mqttClient; } } ``` 初始化连接 ```java /************************* 启动类 ********************************/ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SimpleMqttPoolApplication { public static void main(String[] args) { SpringApplication.run(SimpleMqttPoolApplication.class, args); } } /************************* Initialize ********************************/ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import xenoscode.cn.mqttclient.mqtt.SimpleMqttClient; @Component public class Initialize implements CommandLineRunner { @Autowired private SimpleMqttClient simpleMqttClient; @Override public void run(String... args) throws Exception { simpleMqttClient.connect(); } } ``` > 实现`CommandLineRunner`接口,并重写`run`方法,可以让其中的方法在Spring Boot项目启动后执行 看一下运行结果 ```txt . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.3.3.RELEASE) 2020-09-04 23:18:45.647 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Starting SimpleMqttPoolApplication on Xenos with PID 5580 (D:\study\workspace\SimpleMqttPool\target\classes started by 89314 in D:\study\workspace\SimpleMqttPool) 2020-09-04 23:18:45.649 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : No active profile set, falling back to default profiles: default 2020-09-04 23:18:46.088 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Started SimpleMqttPoolApplication in 0.747 seconds (JVM running for 1.031) ————testClient 连接成功!———— ``` 可以看到日志中输出连接成功的字样,为了确定是否真的连接成功,我们可以登录**EMQ的Dashboard**查看(dashboard的默认端口为18083,用户名为admin,密码为public): ![login](./img/dashbord-login.PNG) ![client](./img/dashbord-client.PNG) 可以看到客户端与服务器确实已经连接,在看一下是否有主题的订阅 ![subscribe](./img/dashbord-subscribe.PNG) ### 对订阅进行测试 点击Tools,在WebSocket中,可以进行订阅和推送的测试 连接测试用的客户端 ![test client](./img/dashbord-test-client.PNG) 推送测试报文 ![publish test](./img/dashbord-publish-test.PNG) 查看日志输出是否接收到报文 ```txt . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.3.3.RELEASE) 2020-09-04 23:18:45.647 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Starting SimpleMqttPoolApplication on Xenos with PID 5580 (D:\study\workspace\SimpleMqttPool\target\classes started by 89314 in D:\study\workspace\SimpleMqttPool) 2020-09-04 23:18:45.649 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : No active profile set, falling back to default profiles: default 2020-09-04 23:18:46.088 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Started SimpleMqttPoolApplication in 0.747 seconds (JVM running for 1.031) ————testClient 连接成功!———— [MQTT]testClient ----成功接收消息!---- 时间: 2020-09-04 23:44:51.753 接收消息主题 : HELLOWORLD 接收消息Qos : 2 接收消息内容 : { "msg": "Hello, World!" } ``` 可以看到我们的自定义的客户端成功接收到了来自另外一个客户端的消息。 ### 进行消息推送测试 编写一个测试用的`Controller`,每次访问该地址都想服务端推送一条测试报文 ```java import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import xenoscode.cn.mqttclient.mqtt.SimpleMqttClient; @RestController public class TestController { @Autowired private SimpleMqttClient simpleMqttClient; @GetMapping("/test/publish") public void publishTest() { byte[] payload = "publish test".getBytes(); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(2); mqttMessage.setPayload(payload); String topic = "PUBLISH_TEST"; simpleMqttClient.publish(topic, mqttMessage); } } ``` 在dashboard中订阅该主题 ![publish test](./img/dashbord-subscribe-test.PNG) 访问接口,查看结果 后台日志 ```txt . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.3.3.RELEASE) 2020-09-04 23:59:20.843 INFO 3016 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Starting SimpleMqttPoolApplication on Xenos with PID 3016 (D:\study\workspace\SimpleMqttPool\target\classes started by 89314 in D:\study\workspace\SimpleMqttPool) 2020-09-04 23:59:20.845 INFO 3016 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : No active profile set, falling back to default profiles: default 2020-09-04 23:59:21.507 INFO 3016 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http) 2020-09-04 23:59:21.515 INFO 3016 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2020-09-04 23:59:21.515 INFO 3016 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.37] 2020-09-04 23:59:21.581 INFO 3016 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2020-09-04 23:59:21.581 INFO 3016 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 698 ms 2020-09-04 23:59:21.747 INFO 3016 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor' 2020-09-04 23:59:21.863 INFO 3016 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2020-09-04 23:59:21.869 INFO 3016 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Started SimpleMqttPoolApplication in 1.321 seconds (JVM running for 1.621) ————testClient 连接成功!———— 2020-09-04 23:59:24.962 INFO 3016 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2020-09-04 23:59:24.962 INFO 3016 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2020-09-04 23:59:24.965 INFO 3016 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms [MQTT]testClient ----成功发送消息!---- 时间: 2020-09-04 23:59:24.983 ``` Dashboard结果 ![publish test](./img/dashbord-subscribe-test-result.PNG) 可以看到当前的客户端能够将数据上送到服务器中