From 303044fa4b3f7302015e57b883c7929d79a2c959 Mon Sep 17 00:00:00 2001
From: zt <921646028@qq.com>
Date: Sun, 14 Aug 2022 13:22:30 +0800
Subject: [PATCH 1/2] rocketMq
---
pom.xml | 5 ++
.../com/example/demo/rocketmq/MqConsumer.java | 22 +++++++++
.../example/demo/rocketmq/MqConsumerTest.java | 38 +++++++++++++++
.../com/example/demo/rocketmq/MqProducer.java | 38 +++++++++++++++
.../example/demo/rocketmq/MqProducerTest.java | 46 +++++++++++++++++++
5 files changed, 149 insertions(+)
create mode 100644 src/main/java/com/example/demo/rocketmq/MqConsumer.java
create mode 100644 src/main/java/com/example/demo/rocketmq/MqConsumerTest.java
create mode 100644 src/main/java/com/example/demo/rocketmq/MqProducer.java
create mode 100644 src/main/java/com/example/demo/rocketmq/MqProducerTest.java
diff --git a/pom.xml b/pom.xml
index 9c7072e..b5c3149 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,11 @@
log4j-core
2.17.2
+
+ org.apache.rocketmq
+ rocketmq-client
+ 4.7.1
+
diff --git a/src/main/java/com/example/demo/rocketmq/MqConsumer.java b/src/main/java/com/example/demo/rocketmq/MqConsumer.java
new file mode 100644
index 0000000..05e404f
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketmq/MqConsumer.java
@@ -0,0 +1,22 @@
+package com.example.demo.rocketmq;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+
+public class MqConsumer {
+ private static DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("myConsumer");
+ private static int initialstate=0;
+
+ public MqConsumer() {
+ }
+ public static DefaultMQPushConsumer getDefaultMQPushConsumer(){
+ if(consumer==null){
+ consumer=new DefaultMQPushConsumer("myConsumer");
+ }
+ if(initialstate==0){
+ consumer.setNamesrvAddr("103.160.151.37:9876");
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ }
+ return consumer;
+ }
+}
diff --git a/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java b/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java
new file mode 100644
index 0000000..3064c5b
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java
@@ -0,0 +1,38 @@
+package com.example.demo.rocketmq;
+
+import com.sun.org.apache.xpath.internal.operations.String;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class MqConsumerTest {
+ public static void main(String[] args) {
+
+ }
+ public static void receiveMsg(){
+ DefaultMQPushConsumer consumer=MqConsumer.getDefaultMQPushConsumer();
+ try {
+ consumer.subscribe("TopicTest","*");
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+ MessageExt msg=msgs.get(0);
+ if(msg.getTopic().equals("TopicTest")){
+ if(msg.getTags().equals("TagA")){
+ System.out.println(msg.getBody().toString());
+ }
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumer.start();
+ } catch (MQClientException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/com/example/demo/rocketmq/MqProducer.java b/src/main/java/com/example/demo/rocketmq/MqProducer.java
new file mode 100644
index 0000000..3fc6941
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketmq/MqProducer.java
@@ -0,0 +1,38 @@
+package com.example.demo.rocketmq;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class MqProducer {
+ // 实例化消息生产者Producer
+ private static DefaultMQProducer producer = new DefaultMQProducer("myProducer");
+ private static int initialState=0;
+
+ public MqProducer() {
+ }
+ public static DefaultMQProducer getDefaultMQProducer() throws MQClientException {
+ if(producer==null){
+ producer=new DefaultMQProducer("myProducer");
+ }
+ if(initialState==0){
+ // 设置NameServer的地址
+ producer.setNamesrvAddr("103.160.151.37:9876");
+ // 启动Producer实例
+ producer.start();
+ initialState=1;
+ }
+ return producer;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+
+
+
+
+ }
+}
+
diff --git a/src/main/java/com/example/demo/rocketmq/MqProducerTest.java b/src/main/java/com/example/demo/rocketmq/MqProducerTest.java
new file mode 100644
index 0000000..d43def3
--- /dev/null
+++ b/src/main/java/com/example/demo/rocketmq/MqProducerTest.java
@@ -0,0 +1,46 @@
+package com.example.demo.rocketmq;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.io.UnsupportedEncodingException;
+
+public class MqProducerTest {
+ public static void main(String[] args) {
+ sendMsg();
+ }
+ public static void sendMsg() {
+ DefaultMQProducer producer= null;
+ try {
+ producer = MqProducer.getDefaultMQProducer();
+ for (int i = 0; i < 100; i++) {
+ // 创建消息,并指定Topic,Tag和消息体
+ Message msg = new Message(
+ "TopicTest" /* Topic */,
+ "TagA" /* Tag */,
+ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+ );
+ // 发送消息到一个Broker
+ SendResult sendResult = producer.send(msg);
+ // 通过sendResult返回消息是否成功送达
+ System.out.printf("%s%n", sendResult);
+ }
+ } catch (MQClientException | UnsupportedEncodingException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (RemotingException e) {
+ e.printStackTrace();
+ } catch (MQBrokerException e) {
+ e.printStackTrace();
+ }
+ // 如果不再发送消息,关闭Producer实例。
+ producer.shutdown();
+ }
+
+}
--
Gitee
From ccd663f5abbc80551ff68442032afdf8d6a0e3fa Mon Sep 17 00:00:00 2001
From: zt <921646028@qq.com>
Date: Sun, 14 Aug 2022 15:11:42 +0800
Subject: [PATCH 2/2] rocketMq
---
src/main/java/com/example/demo/rocketmq/MqConsumerTest.java | 5 +++--
src/main/java/com/example/demo/rocketmq/MqProducerTest.java | 6 ++++--
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java b/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java
index 3064c5b..b3baf36 100644
--- a/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java
+++ b/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java
@@ -1,6 +1,7 @@
package com.example.demo.rocketmq;
import com.sun.org.apache.xpath.internal.operations.String;
+import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -9,7 +10,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
-
+@Slf4j
public class MqConsumerTest {
public static void main(String[] args) {
@@ -24,7 +25,7 @@ public class MqConsumerTest {
MessageExt msg=msgs.get(0);
if(msg.getTopic().equals("TopicTest")){
if(msg.getTags().equals("TagA")){
- System.out.println(msg.getBody().toString());
+ log.error(msg.getBody().toString());
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
diff --git a/src/main/java/com/example/demo/rocketmq/MqProducerTest.java b/src/main/java/com/example/demo/rocketmq/MqProducerTest.java
index d43def3..c5c5d0e 100644
--- a/src/main/java/com/example/demo/rocketmq/MqProducerTest.java
+++ b/src/main/java/com/example/demo/rocketmq/MqProducerTest.java
@@ -1,5 +1,6 @@
package com.example.demo.rocketmq;
+import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -9,7 +10,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
-
+@Slf4j
public class MqProducerTest {
public static void main(String[] args) {
sendMsg();
@@ -28,7 +29,8 @@ public class MqProducerTest {
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
- System.out.printf("%s%n", sendResult);
+ //System.out.printf("%s%n", sendResult);
+ log.error(sendResult.toString());
}
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
--
Gitee