1 Star 5 Fork 5

tanghc / paho-mqtt-client

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

paho-mqtt-client

一个MQTT客户端,基于paho

使用方式

maven依赖:

<dependency>
    <groupId>net.oschina.durcframework</groupId>
    <artifactId>paho-mqtt-client</artifactId>
    <version>1.0.0</version>
</dependency>

编写代码

PahoMqttClient mqttClient = PahoMqttClient.create()
    .broker(broker)
    .auth(username, password)
    .clientId(clientId)
    .cleanSession(true)
    // 自动重连
    .automaticReconnect(true)
    // 订阅消息
    .subscribe(topic, 2)
    // 设置回调,处理消息    
    .callback(new MyMqttCallback())
    .connect();


public static class MyMqttCallback extends PahoMqttCallback {

    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("[client]失去连接:" + throwable.getMessage());
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        System.out.println("[client]收到消息: topic:" + topic + ", msg:" + mqttMessage.toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

发送消息

mqttClient.publish(upLocationTopic, location.getBytes(), 1);

测试用例

这里使用车联网作为测试场景,汽车终端设备每隔一段时间上报定位,IOT平台每隔一段时间下发OTA升级指令。

IOT平台实现功能如下:

  • 订阅定位上报topic,接收设备上报的定位数据
  • 发送OTA消息到汽车终端

汽车终端设备实现功能如下:

  • 订阅点对点topic,接收平台端下发的指令
  • 订阅定位上报topic,上报定位信息

IOT平台功能代码

package com.gitee.mqttclient;

import com.gitee.mqttclient.callback.PahoMqttCallback;
import com.gitee.mqttclient.client.PahoMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * 模拟平台端
 * @author thc
 */
public class ServerTest {

    protected static String broker = "tcp://1.1.1.1:1883";
    protected static String username = "s001";
    protected static String password = "123456";

    static PahoMqttClient mqttClient;


    /**
     * 启动平台端
     * @throws InterruptedException
     * @throws MqttException
     */
    @Test
    public void server() throws InterruptedException, MqttException {
        String clientId = "server-node-1";
        // 平台端监听所有车型定位topic
        String topic = "prod/+/+/base/location/#";
        mqttClient = PahoMqttClient.create()
                .broker(broker)
                .auth(username, password)
                .clientId(clientId)
                .cleanSession(true)
                // 自动重连
                .automaticReconnect(true)
                // 订阅消息
                .subscribe(topic, 1)
                .callback(new ServerMqttCallback())
                .connect();

        // 另起一个线程,进行指令下发
        new Thread(() -> {
            // 每隔20秒对A000001这辆车进行OTA升级
            String downTopic = "o2o/A000001/ota";
            while (true) {
                String otaContent = String.valueOf(System.currentTimeMillis());
                System.out.println("发送ota指令:" + otaContent);
                byte[] otaFile = otaContent.getBytes();
                try {
                    mqttClient.publish(downTopic, otaFile, 2);
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
                try {
                    TimeUnit.SECONDS.sleep(20);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        TimeUnit.DAYS.sleep(1);
    }

    /**
     * 服务端接收消息回调
     */
    public static class ServerMqttCallback extends PahoMqttCallback {

        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("[server]失去连接:" + throwable.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            System.out.println("[server]收到消息: topic:" + topic + ", msg:" + mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    }

}

汽车终端功能代码

package com.gitee.mqttclient;

import com.gitee.mqttclient.callback.PahoMqttCallback;
import com.gitee.mqttclient.client.PahoMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 模拟设备端,汽车终端
 * @author thc
 */
public class ClientTest {

    protected static String broker = "tcp://1.1.1.1:1883";
    protected static String username = "u001";
    protected static String password = "123456";

    // 汽车类型
    static String carType = "suv";
    // 车架号
    static String vin = "A000001";

    // 格式:client-用户名-车架号
    static String clientId = "client-" + username + "-" + vin;


    private static PahoMqttClient mqttClient;

    private void init() throws MqttException {
        // 订阅点对点消息
        String topic = "o2o/" + vin + "/+";
        mqttClient = PahoMqttClient.create()
                .broker(broker)
                .auth(username, password)
                .clientId(clientId)
                .cleanSession(true)
                // 自动重连
                .automaticReconnect(true)
                // 订阅消息
                .subscribe(topic, 2)
                .callback(new MyMqttCallback())
                .connect();
    }

    @Test
    public void device() throws InterruptedException {
        try {
            init();
        } catch (MqttException me) {
            System.err.println("MQTT连接失败");
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            return;
        }

        // 另起一个线程,上报信息
        new Thread(() -> {
            // 上报定位topic
            String upLocationTopic = String.format("prod/%s/%s/base/location", carType, vin);

            // 每隔10秒上报一次定位信息
            while (true) {
                try {
                    // 随机经纬度
                    String lon = "120.123" + new Random().nextInt(100);
                    String lat = "30.123" + new Random().nextInt(100);
                    String location = lon + "," + lat;
                    System.out.println("上报定位信息:" + location);
                    // 上报经纬度
                    mqttClient.publish(upLocationTopic, location.getBytes(), 1);
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        // stop here
        TimeUnit.DAYS.sleep(1);
    }

    public static class MyMqttCallback extends PahoMqttCallback {

        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("[client]失去连接:" + throwable.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            System.out.println("[client]收到消息: topic:" + topic + ", msg:" + mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    }


}

broker, username, password改成自己的

先执行平台端测试用例,在执行设备端测试用例。

完整代码参见项目中test包

MIT License Copyright (c) 2023 tanghc Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

一个MQTT客户端,基于paho,可实现topic订阅发布,掉线重连等功能 展开 收起
Java
MIT
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/durcframework/paho-mqtt-client.git
git@gitee.com:durcframework/paho-mqtt-client.git
durcframework
paho-mqtt-client
paho-mqtt-client
master

搜索帮助