From 303044fa4b3f7302015e57b883c7929d79a2c959 Mon Sep 17 00:00:00 2001 From: zt <921646028@qq.com> Date: Sun, 14 Aug 2022 13:22:30 +0800 Subject: [PATCH 1/2] rocketMq --- pom.xml | 5 ++ .../com/example/demo/rocketmq/MqConsumer.java | 22 +++++++++ .../example/demo/rocketmq/MqConsumerTest.java | 38 +++++++++++++++ .../com/example/demo/rocketmq/MqProducer.java | 38 +++++++++++++++ .../example/demo/rocketmq/MqProducerTest.java | 46 +++++++++++++++++++ 5 files changed, 149 insertions(+) create mode 100644 src/main/java/com/example/demo/rocketmq/MqConsumer.java create mode 100644 src/main/java/com/example/demo/rocketmq/MqConsumerTest.java create mode 100644 src/main/java/com/example/demo/rocketmq/MqProducer.java create mode 100644 src/main/java/com/example/demo/rocketmq/MqProducerTest.java diff --git a/pom.xml b/pom.xml index 9c7072e..b5c3149 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,11 @@ log4j-core 2.17.2 + + org.apache.rocketmq + rocketmq-client + 4.7.1 + diff --git a/src/main/java/com/example/demo/rocketmq/MqConsumer.java b/src/main/java/com/example/demo/rocketmq/MqConsumer.java new file mode 100644 index 0000000..05e404f --- /dev/null +++ b/src/main/java/com/example/demo/rocketmq/MqConsumer.java @@ -0,0 +1,22 @@ +package com.example.demo.rocketmq; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; + +public class MqConsumer { + private static DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("myConsumer"); + private static int initialstate=0; + + public MqConsumer() { + } + public static DefaultMQPushConsumer getDefaultMQPushConsumer(){ + if(consumer==null){ + consumer=new DefaultMQPushConsumer("myConsumer"); + } + if(initialstate==0){ + consumer.setNamesrvAddr("103.160.151.37:9876"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + } + return consumer; + } +} diff --git a/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java b/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java new file mode 100644 index 0000000..3064c5b --- /dev/null +++ b/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java @@ -0,0 +1,38 @@ +package com.example.demo.rocketmq; + +import com.sun.org.apache.xpath.internal.operations.String; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + +public class MqConsumerTest { + public static void main(String[] args) { + + } + public static void receiveMsg(){ + DefaultMQPushConsumer consumer=MqConsumer.getDefaultMQPushConsumer(); + try { + consumer.subscribe("TopicTest","*"); + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + MessageExt msg=msgs.get(0); + if(msg.getTopic().equals("TopicTest")){ + if(msg.getTags().equals("TagA")){ + System.out.println(msg.getBody().toString()); + } + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + } catch (MQClientException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/example/demo/rocketmq/MqProducer.java b/src/main/java/com/example/demo/rocketmq/MqProducer.java new file mode 100644 index 0000000..3fc6941 --- /dev/null +++ b/src/main/java/com/example/demo/rocketmq/MqProducer.java @@ -0,0 +1,38 @@ +package com.example.demo.rocketmq; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class MqProducer { + // 实例化消息生产者Producer + private static DefaultMQProducer producer = new DefaultMQProducer("myProducer"); + private static int initialState=0; + + public MqProducer() { + } + public static DefaultMQProducer getDefaultMQProducer() throws MQClientException { + if(producer==null){ + producer=new DefaultMQProducer("myProducer"); + } + if(initialState==0){ + // 设置NameServer的地址 + producer.setNamesrvAddr("103.160.151.37:9876"); + // 启动Producer实例 + producer.start(); + initialState=1; + } + return producer; + } + + public static void main(String[] args) throws Exception { + + + + + + } +} + diff --git a/src/main/java/com/example/demo/rocketmq/MqProducerTest.java b/src/main/java/com/example/demo/rocketmq/MqProducerTest.java new file mode 100644 index 0000000..d43def3 --- /dev/null +++ b/src/main/java/com/example/demo/rocketmq/MqProducerTest.java @@ -0,0 +1,46 @@ +package com.example.demo.rocketmq; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.io.UnsupportedEncodingException; + +public class MqProducerTest { + public static void main(String[] args) { + sendMsg(); + } + public static void sendMsg() { + DefaultMQProducer producer= null; + try { + producer = MqProducer.getDefaultMQProducer(); + for (int i = 0; i < 100; i++) { + // 创建消息,并指定Topic,Tag和消息体 + Message msg = new Message( + "TopicTest" /* Topic */, + "TagA" /* Tag */, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ + ); + // 发送消息到一个Broker + SendResult sendResult = producer.send(msg); + // 通过sendResult返回消息是否成功送达 + System.out.printf("%s%n", sendResult); + } + } catch (MQClientException | UnsupportedEncodingException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } + // 如果不再发送消息,关闭Producer实例。 + producer.shutdown(); + } + +} -- Gitee From ccd663f5abbc80551ff68442032afdf8d6a0e3fa Mon Sep 17 00:00:00 2001 From: zt <921646028@qq.com> Date: Sun, 14 Aug 2022 15:11:42 +0800 Subject: [PATCH 2/2] rocketMq --- src/main/java/com/example/demo/rocketmq/MqConsumerTest.java | 5 +++-- src/main/java/com/example/demo/rocketmq/MqProducerTest.java | 6 ++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java b/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java index 3064c5b..b3baf36 100644 --- a/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java +++ b/src/main/java/com/example/demo/rocketmq/MqConsumerTest.java @@ -1,6 +1,7 @@ package com.example.demo.rocketmq; import com.sun.org.apache.xpath.internal.operations.String; +import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -9,7 +10,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; - +@Slf4j public class MqConsumerTest { public static void main(String[] args) { @@ -24,7 +25,7 @@ public class MqConsumerTest { MessageExt msg=msgs.get(0); if(msg.getTopic().equals("TopicTest")){ if(msg.getTags().equals("TagA")){ - System.out.println(msg.getBody().toString()); + log.error(msg.getBody().toString()); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; diff --git a/src/main/java/com/example/demo/rocketmq/MqProducerTest.java b/src/main/java/com/example/demo/rocketmq/MqProducerTest.java index d43def3..c5c5d0e 100644 --- a/src/main/java/com/example/demo/rocketmq/MqProducerTest.java +++ b/src/main/java/com/example/demo/rocketmq/MqProducerTest.java @@ -1,5 +1,6 @@ package com.example.demo.rocketmq; +import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -9,7 +10,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; - +@Slf4j public class MqProducerTest { public static void main(String[] args) { sendMsg(); @@ -28,7 +29,8 @@ public class MqProducerTest { // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 - System.out.printf("%s%n", sendResult); + //System.out.printf("%s%n", sendResult); + log.error(sendResult.toString()); } } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); -- Gitee