Vert.x NATS client

This component provides a NATS client for sending messages to and receiving messages from a https://nats.io server. The client supports both Core NATS and JetStream.

The nats-java-vertx-client is a Java client library for connecting to the NATS messaging system. It is built on top of the Vert.x event-driven framework and provides an asynchronous, non-blocking API for sending and receiving messages over NATS.

Current Release: 2.0.3   Current Snapshot: 2.1.0-SNAPSHOT

License: Apache-2.0

Using the Vert.x NATS client

To use this component, add the following dependency to the dependencies section of your build descriptor:

Maven (in your pom.xml):

<dependency>
  <groupId>io.nats</groupId>
  <artifactId>nats-vertx-interface</artifactId>
  <version>${maven.version}</version>
</dependency>

Gradle (in your build.gradle file):

implementation 'io.nats:nats-vertx-interface:${maven.version}'

Connecting to NATS

To connect to NATS, you must first create a NatsClient object by specifying the NATS server URL and any optional configuration options. Here is an example:

// Set options
final NatsOptions natsOptions = new NatsOptions();
natsOptions.getNatsBuilder().server("localhost:" + port);

// Create client
final NatsClient natsClient = NatsClient.create(natsOptions);
final Future<Void> connect = natsClient.connect();

This code creates a NatsClient object, sets the configuration options, and connects to a NATS server.

The first two lines create a new NatsOptions object and set the NATS server to connect to using the server method of the NatsBuilder object. The host is "localhost", and the port variable supplies the port number.

The next two lines create a NatsClient with the create method, passing in the NatsOptions object, and then call connect to establish a connection to the NATS server. The connect method returns a Future<Void> that represents the asynchronous result of the connection attempt.


// config is a NatsOptions instance; setVertx supplies the Vert.x instance to use
NatsClient client = NatsClient.create(config.setVertx(vertx));

// Connect
client.connect()
    .onSuccess(v -> System.out.println("NATS successfully connected!"))
    .onFailure(err -> System.out.println("Failed to connect to NATS " + err.getMessage()));

This code creates a NatsClient object, sets the Vert.x instance, and connects to a NATS server, using the Vert.x Future interface to handle the asynchronous result.

The first line creates a new NatsClient with the create method, passing in a NatsOptions configuration object. Here the configuration object is prepared by setting the Vert.x instance that the NATS client should use; vertx is an instance of the Vertx class.

The remaining lines establish the connection with the connect method, which returns a Future<Void> representing the asynchronous result of the connection attempt. Either onSuccess or onFailure is then invoked, depending on the outcome. In this example, each handler just prints a message to the console to indicate whether the connection succeeded or failed.

Publishing

Once connected, publishing is accomplished via one of three methods:

  1. With a subject and message body:
    client
        .publish("subject", "hello world".getBytes(StandardCharsets.UTF_8))
        .onSuccess(v -> System.out.println("Message published!"))
        .onFailure(err -> System.out.println("Something went wrong " + err.getMessage()));

This code publishes a message to a NATS subject with the publish method of a NatsClient object and handles the asynchronous result through the Vert.x Future interface.

The publish method takes two parameters: the subject to publish the message to, and the message data as a byte array.

The onSuccess and onFailure methods are called on the Future returned by publish. If the operation succeeds, onSuccess is called with a Void parameter, and the example prints "Message published!" to the console. If the operation fails, onFailure is called with a Throwable parameter, and the example prints a message that includes the Throwable's error message.

  2. With a subject and message body, as well as a subject for the receiver to reply to:
    client
        .publish("subject", "replyto", "hello world".getBytes(StandardCharsets.UTF_8))
        .onSuccess(v -> System.out.println("Message published!"))
        .onFailure(err -> System.out.println("Something went wrong " + err.getMessage()));

This code publishes a message to a NATS subject and specifies a reply-to subject for any responses sent in reply to the message.

The publish method takes three parameters: the subject to publish the message to, the reply-to subject to use for any responses, and the message data as a byte array.

The returned Future is handled in the same way as for the two-parameter publish: onSuccess is called with a Void parameter if the operation succeeds, and onFailure is called with a Throwable parameter if it fails.

  3. As a request that expects a reply. Under the covers, a request/reply pair is the same as a publish/subscribe, except that the library manages the subscription for you:

    client
        .request("subject", "hello world".getBytes(StandardCharsets.UTF_8))
        .onSuccess(response ->
            // getData() returns a byte array, so decode it before printing
            System.out.println("Received response " + new String(response.getData(), StandardCharsets.UTF_8)))
        .onFailure(err ->
            System.out.println("Something went wrong " + err.getMessage()));

This code sends a request to a NATS subject with the request method of a NatsClient object and handles the asynchronous result through the Vert.x Future interface.

The request method takes two parameters: the subject to send the request to, and the message data as a byte array. When a response is received, the onSuccess method is called with a Message parameter, whose getData method returns the response data as a byte array.

If the operation fails, the onFailure method is called with a Throwable parameter. In this example, the handler prints a message that includes the Throwable's error message.
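The idea that request/reply is publish/subscribe with a managed subscription can be sketched with a toy in-memory bus. This is only an illustration under stated assumptions: ToyBus, its methods, and the "_INBOX." naming are hypothetical and are not part of nats-java-vertx-client, and the sketch uses the JDK's CompletableFuture rather than a Vert.x Future.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Toy in-memory bus (hypothetical, not the NATS client) with one handler per subject.
final class ToyBus {
    // A handler receives the reply-to subject (may be null) and the payload.
    private final Map<String, BiConsumer<String, byte[]>> subscribers = new ConcurrentHashMap<>();
    private final AtomicLong inboxCounter = new AtomicLong();

    void subscribe(String subject, BiConsumer<String, byte[]> handler) {
        subscribers.put(subject, handler);
    }

    void unsubscribe(String subject) {
        subscribers.remove(subject);
    }

    void publish(String subject, String replyTo, byte[] data) {
        if (subject == null) {
            return; // nothing to deliver to
        }
        BiConsumer<String, byte[]> handler = subscribers.get(subject);
        if (handler != null) {
            handler.accept(replyTo, data);
        }
    }

    // request = subscribe to a private inbox, publish with that inbox as reply-to,
    // and complete the future with the first reply.
    CompletableFuture<byte[]> request(String subject, byte[] data) {
        String inbox = "_INBOX." + inboxCounter.incrementAndGet();
        CompletableFuture<byte[]> response = new CompletableFuture<>();
        subscribe(inbox, (ignoredReplyTo, reply) -> {
            unsubscribe(inbox);
            response.complete(reply);
        });
        publish(subject, inbox, data);
        return response;
    }

    public static void main(String[] args) {
        ToyBus bus = new ToyBus();
        // Responder: sends the upper-cased request back to the reply-to subject.
        bus.subscribe("subject", (replyTo, data) -> {
            String text = new String(data, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
            bus.publish(replyTo, null, text.getBytes(StandardCharsets.UTF_8));
        });
        byte[] reply = bus.request("subject", "hello world".getBytes(StandardCharsets.UTF_8)).join();
        System.out.println(new String(reply, StandardCharsets.UTF_8)); // prints "HELLO WORLD"
    }
}
```

The caller never sees the inbox subscription, which is the part a client library would manage on the caller's behalf.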

All of these methods, as well as the incoming message code, use byte arrays for maximum flexibility. Applications can send JSON, Strings, YAML, Protocol Buffers, or any other format through NATS to applications written in a wide range of languages.
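Because payloads are plain byte arrays, text formats need an explicit charset on both sides. The following is a minimal sketch using only the JDK; PayloadCodec is a hypothetical helper name, not something the library provides.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that converts between text payloads and byte arrays.
final class PayloadCodec {
    private PayloadCodec() {
    }

    // Encode text (for example a JSON document) as UTF-8 bytes before publishing.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode received bytes back into text using the same charset.
    static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("{\"greeting\":\"hello world\"}");
        System.out.println(decode(payload)); // prints {"greeting":"hello world"}
    }
}
```

Naming the charset explicitly avoids depending on the platform default, which can differ between the publishing and the receiving application.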

Subscribing

The Java NATS library also provides a mechanism to listen to messages.


natsClient.subscribe(SUBJECT_NAME, "FOO", event -> {
    doSomethingWithTheEvent(event);
});

To stop receiving messages, unsubscribe from the subject:


client.unsubscribe(SUBJECT_NAME)
        .onFailure(Throwable::printStackTrace)
        .onSuccess(event -> System.out.println("Success"));

JetStream

Publishing and subscribing to JetStream-enabled servers is straightforward. A JetStream-enabled application connects to a server, establishes a JetStream context, and then publishes or subscribes. This can be mixed and matched with standard NATS subjects, and JetStream subscribers, depending on configuration, receive messages both from streams and directly from other NATS producers.

The JetStream Context

After establishing a connection as described above, create a JetStream Context.


final Future<NatsStream> streamFuture = natsClient.jetStream();

streamFuture.onSuccess(natsStream -> {
    doSomethingWithStream(natsStream);
}).onFailure(error -> {
    handleTheStreamFailed(error);
});

You can pass options to configure the JetStream client, although the defaults should suffice for most users. See the JetStreamOptions class.

There is no limit to the number of contexts used, although normally, one would only require a single context.

Publishing

To publish messages, use the publish method.




final NatsStream jetStreamPub = ...
final NatsStream jetStreamSub = ...

final String data = "data";

// Subscribe to the subject
jetStreamSub.subscribe(SUBJECT_NAME, event -> {
    doSomethingWithMessage(event.message());
}, true, PushSubscribeOptions.builder().build());

// Send a message
final NatsMessage message = NatsMessage.builder()
    .subject(SUBJECT_NAME)
    .data(data, StandardCharsets.UTF_8)
    .build();
jetStreamPub.publish(message).onSuccess(event -> ...).onFailure(error -> ...);

To unsubscribe from JetStream, the interface is similar to unsubscribing from a NATS subscription.

jetStreamSub.unsubscribe(SUBJECT_NAME)
    .onSuccess(event -> System.out.println("Unsubscribed"))
    .onFailure(Throwable::printStackTrace);

A variety of publish options can be set when publishing. When duplicate checking has been enabled on the stream, a message ID should be set. One set of options is expectations. You can set a publish expectation such as a particular stream name, previous message ID, or previous sequence number. These tell the server to reject messages for which the expectations are not met, primarily to enforce your ordering or to ensure messages are not stored on the wrong stream.
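The interaction between message IDs and expectations can be pictured with a toy in-memory model. This is a rough sketch, not the server's behavior: ToyStream and its publish signature are hypothetical, and the model remembers every message ID forever, whereas a real stream only checks duplicates within its deduplication window.

```java
import java.util.HashSet;
import java.util.Set;

// Toy stream (hypothetical, not JetStream) that models duplicate checks and publish expectations.
final class ToyStream {
    private final String name;
    private final Set<String> seenMessageIds = new HashSet<>();
    private long lastSequence = 0;

    ToyStream(String name) {
        this.name = name;
    }

    long lastSequence() {
        return lastSequence;
    }

    // Returns true if the message was stored, false if it was dropped as a duplicate.
    // A null message ID or expectation means "not set".
    // Throws IllegalStateException when an expectation is not met.
    boolean publish(String messageId, String expectedStream, Long expectedLastSequence) {
        if (expectedStream != null && !expectedStream.equals(name)) {
            throw new IllegalStateException("expected stream " + expectedStream + " but was " + name);
        }
        if (expectedLastSequence != null && expectedLastSequence.longValue() != lastSequence) {
            throw new IllegalStateException(
                "expected last sequence " + expectedLastSequence + " but was " + lastSequence);
        }
        if (messageId != null && !seenMessageIds.add(messageId)) {
            return false; // same message ID seen before
        }
        lastSequence++;
        return true;
    }

    public static void main(String[] args) {
        ToyStream stream = new ToyStream("ORDERS");
        System.out.println(stream.publish("order-1", "ORDERS", 0L));   // prints true
        System.out.println(stream.publish("order-1", "ORDERS", null)); // prints false
    }
}
```

In the model, a retry that reuses the same message ID is silently dropped, while a publish that names the wrong stream or a stale sequence number fails loudly.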


The publish method has the following overloads:

void publish(Message data, Handler<AsyncResult<Void>> handler);
Future<Void> publish(Message data);
Future<Void> publish(String subject, String replyTo, String message);
Future<Void> publish(String subject, String message);

Subscribing

There are two methods of subscribing, Push and Pull, each with its own set of options and abilities.

Push Subscribing

Push subscriptions are asynchronous. The server pushes messages to the client.



PushSubscribeOptions so = PushSubscribeOptions.builder()
    .durable("optional-durable-name")
    .build();

boolean autoAck = ...

js.subscribe("my-subject", (msg) -> {
    // Process the message.
    // Ack the message depending on the ack model
}, autoAck, so)
    .onSuccess(done ->
        System.out.println("Subscribe success."))
    .onFailure(err ->
        System.out.println("Something went wrong " + err.getMessage()));

Pull Subscribing

Pull subscriptions are always synchronous. The server organizes messages into a batch that it sends when requested.

PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
    .durable("durable-name-is-required")
    .build();

js.subscribe("subject", pullOptions)
    .onSuccess(sub -> {
        // sub is the JetStreamSubscription produced by a successful subscribe
        System.out.println("Subscribe success.");

        sub.fetch(100, Duration.ofSeconds(1))
            .onSuccess(messages -> {
                for (Message m : messages) {
                    // process message
                    m.ackAsync().onSuccess(e -> ...).onFailure(err -> ...);
                }
            })
            .onFailure(err ->
                System.out.println("Something went wrong " + err.getMessage()));
    })
    .onFailure(err ->
        System.out.println("Something went wrong " + err.getMessage()));

The fetch pull is a macro pull that uses advanced pulls under the covers to return a list of messages. The list may be empty or contain at most the batch size. All status messages are handled for you. The client can provide a timeout to wait for the first message in a batch. The timeout may be exceeded if the server sends messages very near the end of the timeout period.
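The batch-size and timeout semantics described above can be approximated with a toy model over a JDK queue. This is a sketch under assumptions, not the client's implementation: ToyFetch is a hypothetical name, the queue stands in for messages waiting on the server, and the method blocks the calling thread, so it is not something to run on the Vert.x event loop.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy fetch (hypothetical): wait for the first message, then take what is already available.
final class ToyFetch {
    private ToyFetch() {
    }

    // Returns between 0 and batchSize messages. The timeout only bounds the wait
    // for the first message; the rest of the batch is taken without further waiting.
    static <T> List<T> fetch(BlockingQueue<T> pending, int batchSize, Duration timeout) {
        List<T> batch = new ArrayList<>();
        if (batchSize <= 0) {
            return batch;
        }
        try {
            T first = pending.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                pending.drainTo(batch, batchSize - 1);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and return an empty batch.
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> pending = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        System.out.println(fetch(pending, 2, Duration.ofMillis(100))); // prints [a, b]
        System.out.println(fetch(pending, 2, Duration.ofMillis(100))); // prints [c]
        System.out.println(fetch(pending, 2, Duration.ofMillis(100))); // prints []
    }
}
```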

Ordered Push Subscription Option

See https://github.com/nats-io/nats.java#ordered-push-subscription-option

Subscription Creation Checks

See https://github.com/nats-io/nats.java#subscription-creation-checks

Message Acknowledgements

There are multiple types of acknowledgments in JetStream:

  • Message.ackAsync(): Acknowledges a message.
  • Message.ackSync(Duration): Acknowledges a message and waits for a confirmation. When used with deduplication, this creates exactly-once delivery guarantees (within the deduplication window). This may significantly impact the performance of the system.
  • Message.nakAsync(): A negative acknowledgment indicating that processing failed and the message should be resent later.
  • Message.termAsync(): Never send this message again, regardless of configuration.
  • Message.inProgressAsync(): Indicates that the message is still being processed and resets the redelivery timer in the server. The message must be acknowledged later when processing is complete.

Note that an exactly-once delivery guarantee can be achieved by using a consumer with explicit ack mode attached to a stream set up with a deduplication window, and using ackSync to acknowledge messages. The guarantee is only valid for the duration of the deduplication window.

You should always use the async versions of the methods when running in the Vert.x event loop.
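As a compact summary of the list above, the acknowledgment types can be mapped to what happens to the message next. The sketch below is a toy model only: ToyAckModel and its enums are hypothetical names that restate the descriptions in this section and do not call the client.

```java
// Toy model (hypothetical, not the client's API) of how each acknowledgment type affects redelivery.
final class ToyAckModel {
    enum AckKind { ACK, NAK, TERM, IN_PROGRESS }

    enum Outcome { DONE, REDELIVER_LATER, NEVER_REDELIVER, STILL_PENDING }

    private ToyAckModel() {
    }

    static Outcome outcomeOf(AckKind kind) {
        switch (kind) {
            case ACK:
                return Outcome.DONE;             // processing finished
            case NAK:
                return Outcome.REDELIVER_LATER;  // processing failed, resend later
            case TERM:
                return Outcome.NEVER_REDELIVER;  // do not send again
            case IN_PROGRESS:
                return Outcome.STILL_PENDING;    // timer reset, a later ack is still required
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    public static void main(String[] args) {
        for (AckKind kind : AckKind.values()) {
            System.out.println(kind + " -> " + outcomeOf(kind));
        }
    }
}
```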

Conclusion

The nats-java-vertx-client library provides a simple API for connecting to the NATS messaging system from Java applications through the Vert.x interface. With its asynchronous, non-blocking API built on the event-driven Vert.x framework, it is well suited to building high-performance, scalable messaging applications.

