iots / mqtt-client

Overview

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.

mqtt-client provides an ASL 2.0 licensed API to MQTT. It takes care of automatically reconnecting to your MQTT server and restoring your client session if any network failures occur. Applications can use a blocking API style, a futures based API, or a callback/continuations passing API style.

Using from Maven

Add the following to your maven pom.xml file.

<dependency>
  <groupId>org.fusesource.mqtt-client</groupId>
  <artifactId>mqtt-client</artifactId>
  <version>1.12</version>
</dependency>

Using from Gradle

Add the following to your gradle file.

compile 'org.fusesource.mqtt-client:mqtt-client:1.12'

Using from any Other Build System

Download the uber jar file and add it to your build. The uber jar contains all the stripped-down dependencies from other projects that mqtt-client relies on.

Using on Java 1.4

We also provide a Java 1.4 uber jar file which is compatible with Java 1.4 JVMs. This version of the jar does not support SSL connections, since the SSLEngine class used to implement SSL on NIO was not introduced until Java 1.5.

Configuring the MQTT Connection

The blocking, future, and callback APIs all share the same connection setup. You create a new instance of the MQTT class and configure it with connection and socket related options. At a minimum, the setHost method must be called before attempting to connect.

MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
// or 
mqtt.setHost("tcp://localhost:1883");

Controlling MQTT Options

  • setClientId : Sets the client ID of the session. An MQTT server uses it to identify a session when setCleanSession(false); is in use. The ID must be 23 characters or fewer. Defaults to an auto-generated ID (based on your socket address, port and timestamp).

  • setCleanSession : Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.

  • setKeepAlive : Configures the Keep Alive timer in seconds. It defines the maximum time interval allowed between messages sent by the client, which lets the server detect that the network connection to a client has dropped without having to wait for the long TCP/IP timeout.

  • setUserName : Sets the user name used to authenticate against the server.

  • setPassword : Sets the password used to authenticate against the server.

  • setWillTopic: If set, the server will publish the client's Will message to the specified topic if the client disconnects unexpectedly.

  • setWillMessage: The Will message to send. Defaults to a zero length message.

  • setWillQos : Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE.

  • setWillRetain: Set to true if you want the Will to be published with the retain option.

  • setVersion: Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.
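The 23-character limit on the client ID is easy to exceed when IDs are derived from host names or UUIDs. The following is a minimal sketch of a pre-flight check, using only the JDK. The ClientIds class and its methods are hypothetical helpers for illustration and are not part of mqtt-client.

```java
import java.util.UUID;

// Hypothetical helper, not part of mqtt-client: keeps client IDs within
// the 23-character limit described above.
final class ClientIds {
    static final int MAX_LENGTH = 23;

    /** Returns true if the id is non-null and at most 23 characters long. */
    static boolean fits(String id) {
        return id != null && id.length() <= MAX_LENGTH;
    }

    /** Builds "prefix-<random hex>" and cuts it to 23 characters. */
    static String random(String prefix) {
        String suffix = UUID.randomUUID().toString().replace("-", "");
        String id = prefix + "-" + suffix;
        return id.length() <= MAX_LENGTH ? id : id.substring(0, MAX_LENGTH);
    }

    public static void main(String[] args) {
        String id = random("sensor");
        System.out.println(id + " fits: " + fits(id));
    }
}
```

The resulting string could then be passed to setClientId. Note that a prefix close to 23 characters leaves little or no room for the random part, so short prefixes are advisable.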

Controlling Connection Reconnects

The connection will automatically reconnect and re-establish the messaging session if any network error occurs. You can control how often a reconnect is attempted and set the maximum number of reconnect attempts using the following methods:

  • setConnectAttemptsMax : The maximum number of connect attempts made during the client's initial connection to a server before an error is reported back to the client. Set to -1 to use unlimited attempts. Defaults to -1.
  • setReconnectAttemptsMax : The maximum number of reconnect attempts before an error is reported back to the client after a server connection had previously been established. Set to -1 to use unlimited attempts. Defaults to -1.
  • setReconnectDelay : How long to wait in ms before the first reconnect attempt. Defaults to 10.
  • setReconnectDelayMax : The maximum amount of time in ms to wait between reconnect attempts. Defaults to 30,000.
  • setReconnectBackOffMultiplier : The exponential backoff multiplier used between reconnect attempts. Set to 1 to disable exponential backoff. Defaults to 2.
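To get a feel for how the delay, maximum delay, and multiplier settings interact, the sketch below computes a textbook exponential backoff schedule from the default values listed above. It assumes the delay is multiplied once per attempt and then capped. The library's internal formula may differ, and the ReconnectBackoff class is a hypothetical name used only for illustration.

```java
// Hypothetical illustration of capped exponential backoff. This is a
// textbook formula and not necessarily the one mqtt-client uses internally.
final class ReconnectBackoff {

    /**
     * Delay in ms before the given reconnect attempt (1-based):
     * reconnectDelay * multiplier^(attempt - 1), capped at reconnectDelayMax.
     */
    static long delayMs(int attempt, long reconnectDelay,
                        long reconnectDelayMax, double multiplier) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double raw = reconnectDelay * Math.pow(multiplier, attempt - 1);
        return (long) Math.min(raw, (double) reconnectDelayMax);
    }

    public static void main(String[] args) {
        // Defaults from the list above: 10 ms, 30,000 ms, multiplier 2.
        for (int attempt = 1; attempt <= 14; attempt++) {
            System.out.println(attempt + ": " + delayMs(attempt, 10, 30000, 2.0) + " ms");
        }
    }
}
```

Under these assumptions the delay doubles from 10 ms until it reaches the 30,000 ms ceiling, and a multiplier of 1 keeps every delay at the initial value.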

Configuring Socket Options

You can adjust some socket options by using the following methods:

  • setReceiveBufferSize : Sets the size of the internal socket receive buffer. Defaults to 65536 (64k).

  • setSendBufferSize : Sets the size of the internal socket send buffer. Defaults to 65536 (64k).

  • setTrafficClass : Sets traffic class or type-of-service octet in the IP header for packets sent from the transport. Defaults to 8 which means the traffic should be optimized for throughput.

Throttling Connections

If you want to slow down the read or write rate of your connections, use the following methods:

  • setMaxReadRate : Sets the maximum bytes per second that this transport will receive data at. This setting throttles reads so that the rate is not exceeded. Defaults to 0 which disables throttling.

  • setMaxWriteRate : Sets the maximum bytes per second that this transport will send data at. This setting throttles writes so that the rate is not exceeded. Defaults to 0 which disables throttling.
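When choosing a rate, it can help to estimate how long a given payload will take at that limit. The sketch below computes an idealised lower bound, assuming the limit is expressed in bytes per second and that 0 means no throttling, as described above. RateLimitMath is a hypothetical helper; it does not model how the transport actually schedules reads and writes.

```java
// Hypothetical helper: idealised arithmetic for a bytes-per-second cap.
final class RateLimitMath {

    /**
     * Minimum time in ms needed to move the given number of bytes at the
     * given rate, rounded up. A rate of 0 (or less) means "unthrottled"
     * and yields 0.
     */
    static long minTransferMillis(long bytes, int maxBytesPerSecond) {
        if (maxBytesPerSecond <= 0) {
            return 0;
        }
        return (bytes * 1000 + maxBytesPerSecond - 1) / maxBytesPerSecond;
    }

    public static void main(String[] args) {
        // A full 64k buffer at 32 KiB/s needs at least two seconds.
        System.out.println(minTransferMillis(65536, 32768) + " ms");
    }
}
```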

Using SSL connections

If you want to connect over SSL/TLS instead of TCP, use an "ssl://" or "tls://" URI prefix instead of "tcp://" for the host field. For finer-grained control over which algorithm is used, choose a more specific prefix. Supported protocol values are:

  • ssl:// - Use the JVM default version of the SSL algorithm.
  • sslv*:// - Use a specific SSL version where * is a version supported by your JVM. Example: sslv3
  • tls:// - Use the JVM default version of the TLS algorithm.
  • tlsv*:// - Use a specific TLS version where * is a version supported by your JVM. Example: tlsv1.1

The client will use the default JVM SSLContext which is configured via JVM system properties unless you configure the MQTT instance using the setSslContext method.

SSL connections perform blocking operations against an internal thread pool unless you call the setBlockingExecutor method to configure the executor they will use instead.
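The URI prefixes listed above line up naturally with the standard JSSE protocol names accepted by SSLContext.getInstance (for example "TLS" or "TLSv1.1"). The sketch below shows one plausible mapping, which can be handy when building a custom SSLContext that matches the URI in use. SslSchemes is a hypothetical helper, and the mapping is an assumption rather than the library's confirmed behavior.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical helper: maps an MQTT URI scheme to a JSSE protocol name.
final class SslSchemes {

    /** Returns the JSSE protocol name, or null for non-SSL/TLS schemes. */
    static String jsseProtocol(String scheme) {
        if (scheme == null) {
            return null;
        }
        String s = scheme.toLowerCase(Locale.ROOT);
        if (s.equals("tls")) return "TLS";
        if (s.startsWith("tlsv")) return "TLSv" + s.substring(4);
        if (s.equals("ssl")) return "SSL";
        if (s.startsWith("sslv")) return "SSLv" + s.substring(4);
        return null; // e.g. "tcp"
    }

    /** Convenience overload that extracts the scheme from a full URI. */
    static String jsseProtocolOf(String uri) {
        return jsseProtocol(URI.create(uri).getScheme());
    }

    public static void main(String[] args) {
        System.out.println(jsseProtocolOf("tlsv1.1://localhost:8883"));
        System.out.println(jsseProtocolOf("tcp://localhost:1883"));
    }
}
```

Whether a specific version such as SSLv3 or TLSv1.1 is actually available depends on the JVM and its security configuration.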

Selecting the Dispatch Queue

A HawtDispatch dispatch queue is used to synchronize access to the connection. If an explicit queue is not configured via the setDispatchQueue method, then a new queue will be created for the connection. Setting an explicit queue might be handy if you want multiple connections to share the same queue for synchronization.

Using the Blocking API

The MQTT.blockingConnection method creates a connection that exposes a blocking API. Call connect on it to establish the connection.

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

Publish messages to a topic using the publish method:

connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

You can subscribe to multiple topics using the subscribe method:

Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);

Then receive and acknowledge consumption of messages using the receive and ack methods:

Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// process the message then:
message.ack();

Finally to disconnect:

connection.disconnect();

Using the Future based API

The MQTT.futureConnection method creates a connection with a futures-style API. All operations against the connection are non-blocking and return their results via a Future.

FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();

Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic("foo", QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();

// We can start future receive..
Future<Message> receive = connection.receive();

// send the message..
Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

// Then the receive will get the message.
Message message = receive.await();
message.ack();

Future<Void> f4 = connection.disconnect();
f4.await();

Using the Callback/Continuation Passing based API

The MQTT.callbackConnection method creates a connection with a callback-style API. This is the most complex API style to use, but it can provide the best performance. The future and blocking APIs use the callback API under the covers. All operations on the connection are non-blocking, and the results of an operation are passed to callback interfaces that you implement.

Example:

final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {
  
    public void onDisconnected() {
    }
    public void onConnected() {
    }

    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
        // You can now process a received message from a topic.
        // Once processed, execute the ack runnable.
        ack.run();
    }
    public void onFailure(Throwable value) {
        connection.close(null); // a connection failure occurred.
    }
});
connection.connect(new Callback<Void>() {
    public void onFailure(Throwable value) {
        // We could not connect to the server; report the error as your application requires.
        value.printStackTrace();
    }

    // Once we connect..
    public void onSuccess(Void v) {
    
        // Subscribe to a topic
        Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
        connection.subscribe(topics, new Callback<byte[]>() {
            public void onSuccess(byte[] qoses) {
                // The result of the subscribe request.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // subscribe failed.
            }
        });

        // Send a message to a topic
        connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
            public void onSuccess(Void v) {
              // the publish operation completed successfully.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // publish failed.
            }
        });
        
        // To disconnect..
        connection.disconnect(new Callback<Void>() {
            public void onSuccess(Void v) {
              // called once the connection is disconnected.
            }
            public void onFailure(Throwable value) {
              // Disconnects never fail.
            }
        });
    }
});

Every connection has a HawtDispatch dispatch queue which it uses to process IO events for the socket. The dispatch queue is an Executor that provides serial execution of IO and processing events and is used to ensure synchronized access to the connection.

The callbacks execute on the dispatch queue associated with the connection, so it is safe to use the connection from a callback, but you MUST NOT perform any blocking operations within the callback. If you need to perform some processing which MAY block, you must send it to another thread pool for processing. Furthermore, if another thread needs to interact with the connection, it can only do so by using a Runnable submitted to the connection's dispatch queue.

Example of executing a Runnable on the connection's dispatch queue:

connection.getDispatchQueue().execute(new Runnable(){
    public void run() {
      connection.publish( ..... );
    }
});
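The hand-off pattern described above (never block in a callback, push slow work to another pool, then return to the serial queue) can be sketched with plain JDK executors. In this sketch a single-thread executor stands in for the connection's dispatch queue; it is an analogy for illustration and not HawtDispatch itself. OffloadSketch and slowLookup are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single-thread executor plays the role of the
// connection's serial dispatch queue; a separate pool runs blocking work.
final class OffloadSketch {

    /** Simulates processing that MAY block (e.g. a database call). */
    static String slowLookup(int id) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + id;
    }

    /** Handles the given number of simulated messages and returns the results. */
    static List<String> run(int jobs) {
        final ExecutorService serialQueue = Executors.newSingleThreadExecutor();
        final ExecutorService workers = Executors.newFixedThreadPool(4);
        final List<String> handled = new ArrayList<String>(); // touched only on serialQueue
        final CountDownLatch done = new CountDownLatch(jobs);
        try {
            for (int i = 0; i < jobs; i++) {
                final int id = i;
                // "Callback" running on the serial queue: hand off immediately.
                serialQueue.execute(() -> workers.execute(() -> {
                    String result = slowLookup(id); // blocking work, off the queue
                    // Return to the serial queue before touching shared state.
                    serialQueue.execute(() -> {
                        handled.add(result);
                        done.countDown();
                    });
                }));
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for jobs");
            }
            return handled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdownNow();
            serialQueue.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(8).size() + " jobs handled");
    }
}
```

Because every update to the shared list happens on the serial executor, no extra locking is needed. The order in which results arrive is not guaranteed, since the worker pool runs jobs concurrently.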