# rocketmq-demo **Repository Path**: yunhoio/rocketmq-demo ## Basic Information - **Project Name**: rocketmq-demo - **Description**: 从rocketmq原生API到spring-cloud-stream集成的实战笔记 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 4 - **Created**: 2021-04-13 - **Last Updated**: 2024-03-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RocketMq 最近在研究RocketMQ,将研究过程中的笔记记了下来,有需要的小伙伴可以参考。 文档地址https://yunho.io/rocketmq/01.html ## 什么是MQ MQ(Message Queue)[消息队列](https://baike.baidu.com/item/消息队列/4751675),是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。 ## 作用 ### 异步 例子:快递员发快递,直接到客户家效率会很低。引入菜鸟驿站后,快递员只需要把快递放到菜鸟驿站,就可以继续发其他快递去了。客户再按自己的时间安排去菜鸟驿站取快递。 ### 解耦 例子: 《Thinking in JAVA》很经典,但是都是英文,我们看不懂,所以需要编辑社,将文章翻译成其他语言,这样就可以完成英语与其他语言的交流。 作用: 1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。 2、另外,解拥后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。 ### 削峰 例子:长江每年都会涨水,但是下游出水口的速度是基本稳定的,所以会涨水。 引入三峡大坝后,可以把水储存起来,下游慢慢排水。 作用:以稳定的系统资源应对突发的流量冲击。 ## 缺点 ### 系统可用性降低 系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑如何保证MQ的高可用。 ### 系统复杂度提高 引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保讶消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问题 ### 消息一致性问题 A系统处理完业务,通过MQ发送消息给B、 C系统进行后续的业务处理。如果B系统处理成功, C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。 ## 常见MQ ![image-20210320094325411](rocketmq.assets/image-20210320094325411.png) ## 官网 home:http://rocketmq.apache.org/docs/quick-start/ ## 实战 ### 安装 下载rocketmq-all-4.7.1-bin-release.zip ```shell #解压缩 unzip rocketmq-all-4.7.1-bin-release.zip #确认环境变量 vi /etc/profile #增加rocketmq环境变量 JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.275.b01-0.el7_9.x86_64 CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar ROCKETMQ_HOME=/var/tools/rocketmq PATH=$ROCKETMQ_HOME/bin:$PATH:$JAVA_HOME/bin NAMESRV_ADDR=localhost:9876 export JAVA_HOME CLASS_PATH ROCKETMQ_HOME NAMESRV_ADDR PATH # 使环境变量生效 source /etc/profile ``` ### 修改启动脚本 ```shell # runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" #修改后 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" # runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" #修改后 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m" ``` ### 组成 ![image-20210320094341347](rocketmq.assets/image-20210320094341347.png) ### 配置 ```properties #为了自动创建可以先配置上以下配置项 autoCreateTopicEnable = true ``` ### 启动 ```shell #启动nameserver [root@localhost bin]# nohup bin/mqnamesrv & [root@localhost bin]# nohup: ignoring input and appending output to ‘nohup.out’ [root@localhost bin]# tail nohup.out OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release. The Name Server boot success. serializeType=JSON #启动broker [root@localhost conf]# nohup ../bin/mqbroker -c broker.conf & #-c指定配置文件 [root@localhost conf]# nohup ../bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true & #制定服务名和开启自动创建Topic [2] 3479 [root@localhost conf]# nohup: ignoring input and appending output to ‘nohup.out’ #查看进程 [root@localhost conf]# jps 3442 NamesrvStartup 3540 Jps 3484 BrokerStartup ``` ### 测试 #### Send & Receive Messages ```shell ./bin/tools.sh org.apache.rocketmq.example.quickstart.Producer ``` > 出现异常: > > ````shell > RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). > RocketMQLog:WARN Please initialize the logger system properly. > java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed > at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679) > at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509) > at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:693) > at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:557) > at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) > at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289) > at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325) > at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67) > Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed > at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394) > at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1363) > at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1353) > at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622) > ... 7 more > ```` > > 解决: > > 根据报错connect to null failed,因此判断是服务的IP和端口有问题,应该是出在环境变量配置上,通过查看环境变量配置文件确实没有把NAMESRV_ADDR EXPORT因此未生效。修改后不报错。 #### Receive Messages ```shell ./bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ``` ### 停止 ```shell #停止broker [root@localhost bin]# mqshutdown broker The mqbroker(3484) is running... Send shutdown request to mqbroker(3484) OK #停止namesrv [root@localhost bin]# mqshutdown namesrv The mqnamesrv(3442) is running... Send shutdown request to mqnamesrv(3442) OK ``` ### 集群 #### 规划 准备三台服务器 ```shell 192.168.3.210 worker1 192.168.3.211 worker2 192.168.3.212 worker3 ``` 为了便于观察,这次搭建一个2主2从异步刷盘的集群,所以我们会使用conf/2m-2s-async下的配置文件,实际项目中,为了达到高可用,一般会使用dleger。预备设计的集群如下: | 机器名 | Nameserver节点部署 | broker节点部署 | | ------- | ------------------ | -------------------- | | worker1 | nameserver | | | worker2 | nameserver | broker-a,broker-b-s | | worker3 | nameserver | broker-b,broker-a-s | 所以修改配置文件是进入rocketmq的config目录下修改2m-2s-async的配置文件,--只需要配置broker.conf #### 官方提供 ![image-20210322143718711](rocketmq.assets/image-20210322143718711.png) 2m-2s-async 2master,2slave,async异步复制 2m-2s-sync 2master,2slave,sync同步同步 2m-noslave 2master 没有从节点 本案例采用第一种 #### 实操 三台服务器均启动nameserver 将worker2的配置文件保留broker-a,broker-b-s 将worker3的配置文件保留broker-b,broker-a-s ##### **详细配置** **broker-a** ```properties brokerClusterName=rocket-cluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH namesrvAddr=192.168.3.210:9876;192.168.3.211:9876;192.168.3.211:9876; #存储路径 storePathRootDir=/var/tools/rocketmq/store #commitLog 存储路径 storePathCommitLog=/var/tools/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/var/tools/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/var/tools/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/var/tools/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/var/tools/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #并发send线程数,多线程来发送消息可能会出现broker busy sendMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true #是否支持根据属性过滤 如果使用基于标准的sql92模式过滤消息则改参数必须设置为true enablePropertyFilter=true ``` **broker-b-s** ```properties brokerClusterName=rocket-cluster brokerName=broker-b brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH namesrvAddr=192.168.3.210:9876;192.168.3.211:9876;192.168.3.211:9876; listenPort=11011 #存储路径 storePathRootDir=/var/tools/rocketmq/bslave #commitLog 存储路径 storePathCommitLog=/var/tools/rocketmq/bslave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/var/tools/rocketmq/bslave/consumequeue #消息索引存储路径 storePathIndex=/var/tools/rocketmq/bslave/index #checkpoint 文件存储路径 storeCheckpoint=/var/tools/rocketmq/bslave/checkpoint #abort 文件存储路径 abortFile=/var/tools/rocketmq/bslave/abort #限制的消息大小 maxMessageSize=65536 #并发send线程数,多线程来发送消息可能会出现broker busy sendMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true #是否支持根据属性过滤 如果使用基于标准的sql92模式过滤消息则改参数必须设置为true enablePropertyFilter=true ``` **broker-b** ```properties brokerClusterName=rocket-cluster brokerName=broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH namesrvAddr=192.168.3.210:9876;192.168.3.211:9876;192.168.3.211:9876; #存储路径 storePathRootDir=/var/tools/rocketmq/bstore #commitLog 存储路径 storePathCommitLog=/var/tools/rocketmq/bstore/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/var/tools/rocketmq/bstore/consumequeue #消息索引存储路径 storePathIndex=/var/tools/rocketmq/bstore/index #checkpoint 文件存储路径 storeCheckpoint=/var/tools/rocketmq/bstore/checkpoint #abort 文件存储路径 abortFile=/var/tools/rocketmq/bstore/abort #限制的消息大小 maxMessageSize=65536 #并发send线程数,多线程来发送消息可能会出现broker busy sendMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true #是否支持根据属性过滤 如果使用基于标准的sql92模式过滤消息则改参数必须设置为true enablePropertyFilter=true ``` **broker**-a-s ```properties brokerClusterName=rocket-cluster brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH namesrvAddr=192.168.3.210:9876;192.168.3.211:9876;192.168.3.211:9876; listenPort=11011 #存储路径 storePathRootDir=/var/tools/rocketmq/aslave #commitLog 存储路径 storePathCommitLog=/var/tools/rocketmq/aslave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/var/tools/rocketmq/aslave/consumequeue #消息索引存储路径 storePathIndex=/var/tools/rocketmq/aslave/index #checkpoint 文件存储路径 storeCheckpoint=/var/tools/rocketmq/aslave/checkpoint #abort 文件存储路径 abortFile=/var/tools/rocketmq/aslave/abort #限制的消息大小 maxMessageSize=65536 #并发send线程数,多线程来发送消息可能会出现broker busy sendMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true #是否支持根据属性过滤 如果使用基于标准的sql92模式过滤消息则改参数必须设置为true enablePropertyFilter=true ``` ##### 启动集群 ```shell #启动broker-a nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties & [root@localhost bin]# tail nohup.out The broker[broker-a, 172.17.0.1:10911] boot success. serializeType=JSON and name server is 192.168.3.210:9876;192.168.3.211:9876;192.168.3.211:9876; #启动broker-b nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties & #启动broker-a-s nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties & #启动broker-b-s nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties & [root@localhost bin]# jps 4257 Jps 3987 BrokerStartup 4167 BrokerStartup 3439 NamesrvStartup ``` #### 注释 > 在以上几种集群的配置方式中,各适用场景如下: > > 2m-2s-async:吞吐量较大,但是消息可能丢失 > > 2m-2s-sync:吞吐量会下降,但是消息更安全 > > 2m-noslave:单点故障,然后还可以直接配置broker.conf > > 一个集群无Slave,全是Master,例如2个Master或者3个Master 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响 ### 管理mqadmin RocketMQ 并没有提供类似于nacos或者RabbitMQ的控制台,只提供了mqadmin指令管理,命令在bin下,使用方式 ./mqadmin {command} {args} #### 查看集群列表 ```shell mqadmin clusterList -n 192.168.3.210:9876 ``` > 当使用多网卡时,需要配置broker对应的IP地址: > > ```properties > #broker-a > brokerIP1 = 192.168.3.211 > brokerIP2 = 192.168.3.212 > #broker-a-s > brokerIP1 = 192.168.3.212 > ``` #### 详细参数 ```shell [root@localhost bin]# mqadmin The most commonly used mqadmin commands are: updateTopic Update or create topic deleteTopic Delete topic from broker and NameServer. updateSubGroup Update or create subscription group deleteSubGroup Delete subscription group from broker. updateBrokerConfig Update broker's config updateTopicPerm Update topic perm topicRoute Examine topic route info topicStatus Examine topic Status info topicClusterList get cluster info for topic brokerStatus Fetch broker runtime status data queryMsgById Query Message by Id queryMsgByKey Query Message by Key queryMsgByUniqueKey Query Message by Unique key queryMsgByOffset Query Message by offset printMsg Print Message Detail printMsgByQueue Print Message Detail sendMsgStatus send msg to broker. brokerConsumeStats Fetch broker consume stats data producerConnection Query producer's socket connection and client version consumerConnection Query consumer's socket connection, client version and subscription consumerProgress Query consumers's progress, speed consumerStatus Query consumer's internal data structure cloneGroupOffset clone offset from other group. clusterList List all of clusters topicList Fetch all topic list from name server updateKvConfig Create or update KV config. deleteKvConfig Delete KV config. wipeWritePerm Wipe write perm of broker in all name server resetOffsetByTime Reset consumer offset by timestamp(without client restart). updateOrderConf Create or update or delete order conf cleanExpiredCQ Clean expired ConsumeQueue on broker. cleanUnusedTopic Clean unused topic on broker. startMonitoring Start Monitoring statsAll Topic and Consumer tps stats allocateMQ Allocate MQ checkMsgSendRT check message send response time clusterRT List All clusters Message Send RT getNamesrvConfig Get configs of name server. updateNamesrvConfig Update configs of name server. getBrokerConfig Get broker config by cluster or special broker! queryCq Query cq command. sendMessage Send a message consumeMessage Consume message updateAclConfig Update acl config yaml file in broker deleteAccessConfig Delete Acl Config Account in broker clusterAclConfigVersion List all of acl config version information in cluster updateGlobalWhiteAddr Update global white address for acl Config File in broker getAccessConfigSubCommand List all of acl config information in cluster ``` ### 管理控制台 RocketMQ源代码没有提供控制台,但是RocketMQ社区扩展项目中提供了控制台,https://github.com/apache/rocketmq-externals 由于这几天访问不了github,同时分享下码云的地址: https://gitee.com/mirrors/RocketMQ-Externals 进入 rocketmq-console模块 ```shell mvn clean package -Dmaven.test.skip=true ``` 打包完成后,将rocketmq-console-ng-2.0.0.jar复制到虚拟机上 自定义application.properties中的nameserver地址和端口 ```properties rocketmq.config.namesrvAddr=192.168.3.210:9876 ``` 启动执行文件 ```shell [root@localhost tools]# nohup java -jar rocketmq-console-ng-2.0.0.jar & [1] 6539 [root@localhost tools]# nohup: ignoring input and appending output to ‘nohup.out’ [root@localhost tools]# tail nohup.out [2021-03-21 01:40:57.199] INFO Starting Servlet engine: [Apache Tomcat/9.0.29] [2021-03-21 01:40:57.291] INFO Initializing Spring embedded WebApplicationContext [2021-03-21 01:40:57.291] INFO Root WebApplicationContext: initialization completed in 3216 ms [2021-03-21 01:40:58.979] INFO Initializing ExecutorService 'applicationTaskExecutor' [2021-03-21 01:40:59.226] INFO Adding welcome page: class path resource [static/index.html] [2021-03-21 01:40:59.524] INFO Initializing ExecutorService 'taskScheduler' [2021-03-21 01:40:59.555] INFO Exposing 2 endpoint(s) beneath base path '/actuator' [2021-03-21 01:40:59.627] INFO Starting ProtocolHandler ["http-nio-0.0.0.0-8080"] [2021-03-21 01:40:59.657] INFO Tomcat started on port(s): 8080 (http) with context path '' [2021-03-21 01:40:59.661] INFO Started App in 6.609 seconds (JVM running for 7.335) ``` 浏览器访问http://192.168.3.210:8080 ![image-20210323082323752](rocketmq.assets/image-20210323082323752.png) ## 消息样例 ### 生产者 **基本样例部分我们使用消息生产者分别通过三种方式发送消息,同步发送、异步发送以及单向发送** **然后使用消费者来消费这些消息**。 #### 同步发送 > **同步发送消息的样例见: org.apache.rocketmq.example.simple.Producer1等待消息返回后再继续进行下面的操作。** ```java public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } } ``` #### 异步发送 > **异步发送消息的样例见:** > **org.apache.rocketmq.example.simple.AsyncProducer** > **这个示例有个比较有趣的地方就是引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer,所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。** ```java public class AsyncProducer { public static void main( String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; // final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } } ``` #### 单向发送 > 关键点是单向发送只管发,没有返回值也没有回调。 ```` public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Wait for sending to complete Thread.sleep(5000); producer.shutdown(); } } ```` ### 消费者 消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式.另一种是消费者等待Broker把消息推送过来的推模式。 > 拉模式的样例见: org.apache.rocketmq,.example.simple.Ppullconsumer推模式的样例见: org.apache.rocketmq.example.simple.PushConsumer > 通常情况下,用推模式比较简单。 > 实际上RocketMQ的推模式也是由拉模式封装出来的。 > > 4.7.1版本中DefaultMQPullconsumerlmpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是 > DefaultLitePullCconsumerlmpl #### 拉取消费 拉取消费较为复杂,包括多个概念,该方式主要是首先获取某个主题下的消息队列清单,并遍历各消息获取详细消息。 关键点:需要自己管理偏移量(MessageQueueOffset) ```java public class PullConsumer { //用于存取偏移量 private static final Map OFFSE_TABLE = new HashMap(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); //某个主题下的消息队列 Set mqs = consumer.fetchSubscribeMessageQueues("broker-a"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { //拉取消息体 //pullBlockIfNotFound 中比较复杂的是第三个参数 //offset:偏移量,是消息订阅者即消费者,上次消费到的消息位点 //maxNums:最大获取消息数据量 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } ``` ![image-20210329165311910](rocketmq.assets/image-20210329165311910.png) ##### 新的拉取模式 由于原先拉取模式的消费者对象已过期,官方提供了新的替代方案。 LitePullConsumerAssign 和 LitePullConsumerSubscribe ```java /** *LitePullConsumerAssign */ public class LitePullConsumerAssign { public static volatile boolean running = true; public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); litePullConsumer.setAutoCommit(false); litePullConsumer.start(); Collection mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); List list = new ArrayList<>(mqSet); List assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) { assignList.add(list.get(i)); } litePullConsumer.assign(assignList); litePullConsumer.seek(assignList.get(0), 10); try { while (running) { List messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commitSync(); } } finally { litePullConsumer.shutdown(); } } } /** *LitePullConsumerSubscribe */ public class LitePullConsumerSubscribe { public static volatile boolean running = true; public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test"); litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.start(); try { while (running) { List messageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally { litePullConsumer.shutdown(); } } } ``` #### 等待推送消费 等待推送消费即将消费者按照服务的方式启动,并建立消息监听器,当有消息从生产者推送时即可获取到消息。 ```java public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } } ``` ### 顺序消息 RocketMQ的顺序消息是局部有序,而全局是无序的。 > 顺序消息生产者样例见: org.apache.rocketmq.example.order.Producer > 顺序消息消费者样例见: org.apache.rocketmq.example.order.Consumer验证时,可以启动多个Consumer实例,观察下每一个订单的消息分配以及每个订单下多个步骤的消费顺序。 > 不管订单在多个Consumer实例之前是如何分配的,每个订单下的多条消息顺序都是固定从0~5的。 > RocketMQ保证的是消息的局部有序,而不是全局有序。 > 先从控制台上看下List mas是什么。 > 再回看我们的样例,实际上, RocketMQ也只保证了每个OrderID的所有消息有序发到了同一个队列,而并不能保证所有消息有序 #### 生产者 通过定义MessageQueue选择器,保证同一个topic的同一个tag(可以理解为同一个订单)下的多个消息是顺序的。 定义MessageQueue选择器的目的是,同一个tag的一组消息发送到同一个消息队列中,利用FIFO特性保证消息的顺序。 ```java public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } producer.shutdown(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } } } ``` #### 消费者 消费者在定义消息监听器是需要采用MessageListenerOrderly而不是普通的消息监听器,这样保证消费者在消费消息时也是有序的。 ```java public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } } ``` ### 广播消息 > 广播消息的消息生产者样例见: > org.apache.rocketmq.example.broadcas.PushConsumer > 广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的),而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。 ```java public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); consumer.setNamesrvAddr("192.168.3.210:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } } ``` ### 延迟消息 延迟消息是rocketmq中一个比较特色的消息。 ```java public class Producer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. //配置NAMESRV_ADDR环境变量后可以不需要制定nameserver服务器地址 producer.setNamesrvAddr("192.168.3.210:9876"); producer.setSendMsgTimeout(6000); //Launch the instance. producer.start(); for (int i = 0; i < 10; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h msg.setDelayTimeLevel(3); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } } ``` > 延迟消息的一个重点就是延迟的级别 > > ```java > //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h > msg.setDelayTimeLevel(3); > ``` > > **但是由于开源版只提供了这个级别,无法直接设置延迟时间,因此各大厂实际应用的时候,这个延迟级别就是一个改造的重点。** ### 批量消息 批量消息是指将多条消息合并成一个批量消息,一次发送出去这样的好处是可以减少网络IO,提升吞吐量。 批量消息的消息生产者样例见:org.apache.rocketmq.example.batch.SimpleBatchProducer和 org.apache.rocketmq.example.batch.SplitBatchProducer > 相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB,就不要用一个批次发送,而要拆分成多个批次消息发 > > 送。也就是说,一个批次消息的大小不要超过1MB > > 实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB,但是使用批量消息时,这个消息长度确实是必 > > 须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitstoreMsgOK,而且不能是延迟消息 > > 事务消息等。 ```java public class SplitBatchProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); producer.start(); //large batch String topic = "BatchTest"; List messages = new ArrayList<>(100 * 1000); for (int i = 0; i < 100 * 1000; i++) { messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes())); } //split the large batch into small ones: ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { List listItem = splitter.next(); producer.send(listItem); } } } class ListSplitter implements Iterator> { private int sizeLimit = 10 * 1000; private final List messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { //计算message的大小 Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; //for log overhead //计算message的大小 end if (tmpSize > sizeLimit) { //it is unexpected that single message exceeds the sizeLimit //here just let it go, otherwise it will block the splitting process if (nextIndex - currIndex == 0) { //if the next sublist has no element, add this one and then break, otherwise just break nextIndex++; } break; } if (tmpSize + totalSize > sizeLimit) { break; } else { totalSize += tmpSize; } } List subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } @Override public void remove() { throw new UnsupportedOperationException("Not allowed to remove"); } } ``` ### 过滤消息 **在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。** **使用Tag过滤消息的消息生产者案例见:** **org.apache.rocketmq.example.filter.TagFilterProducer** **使用Tag过滤消息的消息消费者案例见:** **org.apache.rocketmq.example.filter.TagFilterConsumer** > 主要是看消息消费者. consumer.subscribe(TagFilterTest","TagA 111TagC";这句只订阅TagA和TagC的消息。 > > TAG是RocketMQ中特有的一个消息属性。 RocketMQ的最佳实践中就建议,使用RocketMQ时,一应用可以就用一个Topic,而应用中的不同 > > 业务就用TAG来区分。 **但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。这时候,可以使用SQL表达式来对** **消息进行过滤。** **SQL过滤的消息生产者案例见:** **org.apache.rocketmq.example.filter.SqlFilterProducer** **SQL过滤的消息消费者案例见:** **org.apache.rocketmq.example.filter.SąlFilterConsumer** > 这个模式的关键是在消费者端使用MessageSelector.bySq(String sql)返回的一个MessageSelector,这里面的sql语句是按照SQL92标准来 > > 执行的。 sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。 > > SQL92语法: > > RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。 > > 大家想一下,这个消息过滤是在Broker端进行的还是在Consumer端进行的? > > RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。 > > 值比较,比如: >, >=, <, <=, BETWEEN, =; > > 字符比较,比如: =, <>, IN; > > 15 NULL或者IS NOT NULL辑符号AND, OR, NOT; > > 常量支持类型为: > > •数值,比如: 123, 3.1415; > > 字符,比如: 'abe,必须用单引号包裹起来; > > NULL,特殊的常量 > > •值, TRUE或FALSE > > 使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。 ### 事务消息 > rocketmq最强大、最复杂的消息 > > 是RocketMQ提供的一个非常有特色的功能,需要着重理解。 **首先,我们了解下什么是事务消息。** > 官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。 **其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉** **及到消息发送者,对于消息消费者来说,并没有什么特别的。** **事务消息生产者的案例见:** **org.apache.rocketmq.example.transaction.TransactionProducer** ```java # TransactionProducer public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } } # TransactionListenerImpl public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } } ``` > 事务消息的关键是在TransactionMQProducer中指定了一个Transactiontistener事务监听器,这个事务监听器就是事务消息的关键控制器。源码中的案例有点复杂,我这里准备了一个更清晰明了的事务监听器示例 然后,我们要了解下事务消息的使用限制: > 1、事务消息不支持延迟消息和批量消息。 > > 2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactioncheckMax参数来修改此限制,如果已经检查某条消息超过N次的话(N= transactioncheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。 > > 3、事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECKIMMUNITY TIME IN SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数 > > 4、事务性消息可能不止一次被检查或消费。 > > 5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。 > > 6、事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。 > 事务消息的实现机制,参见下图: > > **事务消息是保证整个分布式事务的一半。** ![image-20210402154747082](rocketmq.assets/image-20210402154747082.png) **事务消息机制的关键是在发送消息时,会将消息转为一half半消息,并存入RocketMQ内部的一个RMQ SYS TRANS HALF TOPIC这个** **Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。** **最后,我们还需要思考下事务消息的作用。** **大家想一下这个事务消息跟分布式事务有什么关系?为什么扑到了分布式事务相** #### 具体业务场景 ![image-20210408103250667](rocketmq.assets/image-20210408103250667.png) 1、订单服务确认消息可用 2、订单服务保存本地事务 3、订单服务制定定时检查支付情况(一定时间内,执行15次),若最终未支付则丢弃事务,本地订单修改状态;若过程中支付成功,则通知下游服务,本地订单我修改状态 **通过事务消息,可以高效率、简洁地实现多步骤、延迟场景的事务整体一致性。** ### ACL权限控制 官方文档:项目路径/docs/cn/acl/user_guide.md **权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。** **用户在使用RocketMQ权限控制时,可以在Client客户端通过RPCHook注入Accesskey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和Accesskey和SecretKey签名等)设置在$ROCKETMQ HOME/conf/plain acl.yml的配置文件中。** **Broker端对Accesskey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考: org.apache.rocketmq.example.simple包下面的AclClient代码。** > 注意,如果要在自己的客户端中使用RocketMQ的ACL功能,还需要引入一个单独的依赖包 #### 打开权限控制 配置文件路径:conf/broker.conf ```properties brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH storePathRootDir=/data/rocketmq/rootdir-a-m storePathCommitLog=/data/rocketmq/commitlog-a-m autoCreateSubscriptionGroup=true ## if acl is open,the flag will be true aclEnable=true //开启ACL listenPort=10911 brokerIP1=XX.XX.XX.XX1 namesrvAddr=XX.XX.XX.XX:9876 ``` #### ACL规则配置 配置文件路径:**conf/plain_acl.yml** ```yaml globalWhiteRemoteAddresses: - 10.10.103.* - 192.168.0.* accounts: - accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192.168.1.* # if it is admin, it could access all resources admin: true ``` ### 消息轨迹 略 ## SpringBoot整合 这部分我们看下SpringBoot如何快速集成RocketMQ. > 在使用SpringBoot的tarter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-bootstarter.2.0.4版本开发的代码,升级到目前最新的rocketmq-springboot-starter.2.1.1后,基本就用不了了。 ### 需要注意 **SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。具体所有的配置信息可以参见org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。** **SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,这在使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指定。** **最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。** **apache有一个官方的rocketmq-spring示例,地址: https://github.com/apache/rocketmq-spring.git以后如果版本更新了,可以参考下这个示例代码。** ### 依赖引入 ```xml org.apache.rocketmq rocketmq-spring-boot-starter 2.1.1 org.springframework.boot spring-boot-starter org.springframework spring-core org.springframework spring-webmvc org.springframework.boot spring-boot-starter-web 2.1.6.RELEASE org.springframework.boot spring-boot-starter-test 2.1.6.RELEASE ``` ### 配置信息(生产者) ![image-20210412154703270](rocketmq.assets/image-20210412154703270.png) ### 消费者的配置 消费者的配置都在注释中,@RocketMQMessageListener ```java @Component @RocketMQMessageListener(consumerGroup = "MyConsumerGroup",topic = "TestTopic",consumeMode = ConsumeMode.CONCURRENTLY) public class SpringConsumer implements RocketMQListener { @Override public void onMessage(String s) { System.out.println("Recieved message :"+s); } } //RocketMQMessageListener public @interface RocketMQMessageListener { String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}"; String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}"; String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}"; String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}"; String consumerGroup(); String topic(); SelectorType selectorType() default SelectorType.TAG;//过滤消息,支持TAG和SQL92 String selectorExpression() default "*"; ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;//顺序消费和并发消费 MessageModel messageModel() default MessageModel.CLUSTERING;//集群和广播 int consumeThreadMax() default 64; long consumeTimeout() default 15L; String accessKey() default "${rocketmq.consumer.access-key:}";//acl权限控制 String secretKey() default "${rocketmq.consumer.secret-key:}";//acl权限控制 boolean enableMsgTrace() default true;//消息轨迹 String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";//消息轨迹 String nameServer() default "${rocketmq.name-server:}"; String accessChannel() default "${rocketmq.access-channel:}"; } ``` ### 消息体 SpringBoot的消息体为spring的消息对象,因此获取一些属性时会有些变化。 ```java //获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性 String tags = message.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString(); //RocketMQHeaders public class RocketMQHeaders { public static final String PREFIX = "rocketmq_"; public static final String KEYS = "KEYS"; public static final String TAGS = "TAGS"; public static final String TOPIC = "TOPIC"; public static final String MESSAGE_ID = "MESSAGE_ID"; public static final String BORN_TIMESTAMP = "BORN_TIMESTAMP"; public static final String BORN_HOST = "BORN_HOST"; public static final String FLAG = "FLAG"; public static final String QUEUE_ID = "QUEUE_ID"; public static final String SYS_FLAG = "SYS_FLAG"; public static final String TRANSACTION_ID = "TRANSACTION_ID"; public RocketMQHeaders() { } } ``` ### 多个事务处理的情况 需要新创建一个rocketMQtemplate ```java @ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}") public class ExtRocketMQTemplate extends RocketMQTemplate { } ``` 将对应的事务处理逻辑对象指向该rocketMQtemplate ```java @RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate") public class MyTransactionListenerImpl implements RocketMQLocalTransactionListener ``` ## SpringCloud-Stream 整合 > SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MO消息中间件产品。 > 目前官方支持RabbitMQ、Apache Kafka和Kafka Streams。 > 我们来看看SpringCloudstream如何来集成RocketMQ. **框架封装的目的就是尽可能的简化开发代码,让开发人员只需要专注于自己的业务中。** ### 引入依赖 需要注意版本,由于我们环境搭建的是4.7.1的RocketMQ,因此需要修改对应的版本。 ```xml org.apache.rocketmq rocketmq-client 4.7.1 org.apache.rocketmq rocketmq-acl 4.7.1 com.alibaba.cloud spring-cloud-starter-stream-rocketmq 2.2.3.RELEASE org.apache.rockemq rocketmq-client org.apache.rockemq rocketmq-acl org.springframework.boot spring-boot-starter-web 2.3.3.RELEASE ``` ### 编码 使用sc-stream的代码非常简单,该框架本身就是为了简化消息对接的开发量,个性化的属性配置都在配置文件中进行。 ```java //定义消费者 @Component public class ScConsumer { //关于消费者过滤消息的条件,可以在@StreamListener的condition属性中写SPEL表达式配置 //但是一般建议还是在配置文件中来配置 //@StreamListener(value=Sink.INPUT,condition= "headers['"+RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS+"']=='testTag') @StreamListener(value = Sink.INPUT) public void onMessage(String message){ System.out.println("recieved message :"+message+"from binding:"+Sink.INPUT ); } } //定义生产者 @Component public class ScProducer { @Resource private Source source; public void sendMessage(String message){ Map headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_TAGS,"testTag"); MessageHeaders messageHeaders = new MessageHeaders(headers); Message msg = MessageBuilder.createMessage(message,messageHeaders); this.source.output().send(msg); // MessageBuilder builder = MessageBuilder.withPayload(message) // .setHeader(RocketMQHeaders.TAGS,"testTag") // .setHeader(RocketMQHeaders.KEYS,"myKey") // .setHeader("DELAY","1"); // Message messages = builder.build(); // this.source.output().send(messages); } } //业务测试 @RestController @RequestMapping("/MQTest") public class RocketMQController { @Resource private ScProducer scProducer; @GetMapping("/send") public String sendMessage(String message){ scProducer.sendMessage(message); return "消息发送完成"; } } //启动类 import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.cloud.stream.messaging.Source; @EnableBinding({Source.class, Sink.class}) @SpringBootApplication public class ScRocketMQApplication { public static void main(String[] args) { SpringApplication.run(ScRocketMQApplication.class,args); } } ``` ### 配置文件 这个是sc-stream的重点 ```properties #ScStream通用的配置以spring.cloud.stream开头 spring.cloud.stream.bindings.input.destination=TestTopic spring.cloud.stream.bindings.input.group=scGroup spring.cloud.stream.bindings.output.destination=TestTopic #RocketMQ的个性化配置以Spring.cloud.stream.rocketmq开头 #Binder配置 spring.cloud.stream.rocketmq.binder.name-server=192.168.3.210:9876 #spring.cloud.stream.rocketmq.binder.access-key= #spring.cloud.stream.rocketmq.binder.secret-key= ##是否为Producer和Consumer开启消息轨迹 #spring.cloud.stream.rocketmq.binder.enable-msg-trace=false ##消息轨迹开启后存储的topic名称 #spring.cloud.stream.rocketmq.binder.customized-trace-topic= #消费者配置 #spring.cloud.stream.rocketmq.bindings.input.consumer.enabled=true #spring.cloud.stream.rocketmq.bindings.input.consumer.tags=testTag #spring.cloud.stream.rocketmq.bindings.input.consumer.sql= #spring.cloud.stream.rocketmq.bindings.input.consumer.broadcasting=false #spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true #spring.cloud.stream.rocketmq.bindings.input.consumer.delayLevelWhenNextConsume=-1or0or>0 ##同步消费消息模式下消费失败后再次消费的时间间隔 #spring.cloud.stream.rocketmq.bindings.input.consumer.suspendCurrentQueueTimeMillis=testTag #生产者配置 #spring.cloud.stream.rocketmq.bindings.output.producer.enabled=true #spring.cloud.stream.rocketmq.bindings.output.producer.group=sendGroup #spring.cloud.stream.rocketmq.bindings.output.producer.maxMessageSize=true #spring.cloud.stream.rocketmq.bindings.output.producer.transactional=true #spring.cloud.stream.rocketmq.bindings.output.producer.sync=true ##是否在VIP channel上发送消息 #spring.cloud.stream.rocketmq.bindings.output.producer.vipChannelEnabled=true ##发送消息的超时时间(毫秒) #spring.cloud.stream.rocketmq.bindings.output.producer.sendMessageTimeOut=3000 ##消息体压缩阈值(当消息体超过4K的时候会被压缩) #spring.cloud.stream.rocketmq.bindings.output.producer.compressMessageBodyThreshold=4096 ##在同步发送消息的模式下,消息发送失败的重试次数 #spring.cloud.stream.rocketmq.bindings.output.producer.retryTimesWhenSendFailed=2 ##在同步发送消息的模式下,消息发送失败的重试次数 #spring.cloud.stream.rocketmq.bindings.output.producer.retryTimesWhenSendAsyncFailed=2 ##消息发送失败的情况下是否重试其他的broker #spring.cloud.stream.rocketmq.bindings.output.producer.retryNextServer=true server.port=8099 ``` ### 注意 > 关于SpringCloudStream,这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。 > > 例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。 > SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka, kafka Stream, RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。 > > 所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0,这个差距就非常大了。 > > 另一方面, RocketMQ这帮大神不屑于写文档的问题也特别严重, SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。 > 总之,对于RocketMQ来说, SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。 ### 总结 虽然目前阿里巴巴来维护的rocketmq的stream并不是很实用,但是考虑到stream的这种设计思路,我还是希望自己来花时间实现对于rocketmq的支持。