# rocketmq-sample **Repository Path**: noah-hong/rocketmq-sample ## Basic Information - **Project Name**: rocketmq-sample - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-03-04 - **Last Updated**: 2023-03-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RocketMQ系列分享 ## 架构 ![](image/rocketmq_architecture_3.png) ### 角色职责 RocketMQ架构上主要分为四部分,如上图所示: - Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。 - Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。对于生产者和消费者来说它是服务端,对于NameServer来说它是客户端 - NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。 - BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。 1. Remoting Module:整个Broker的实体,负责处理来自Client端的请求。 2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。 3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。 4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。 5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。 ### 工作流程 - 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。 - Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。 - 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。 - Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。 - Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。 ### 名词解释 #### 1 消息模型(Message Model) RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。 #### 2 消息生产者(Producer) 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。 #### 3 消息消费者(Consumer) 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。 #### 4 主题(Topic) 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 #### 5 代理服务器(Broker Server) 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 #### 6 名称服务(Name Server) 名称服务充当路由消息的提供者。生产者或消费者能够通过名称服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。 #### 7 拉取式消费(Pull Consumer) Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。 #### 8 推动式消费(Push Consumer) 一般性解释:Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。 在RocketMQ中推动式消费主要由客户端拉取请求包装而成,只有在客户端没有拉取到消息时,此后的消息才由服务端推送到客户端(仅一次,并且有时间限制) #### 9 生产者组(Producer Group) 同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。 #### 10 消费者组(Consumer Group) 同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。 #### 11 集群消费(Clustering) 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。 #### 12 广播消费(Broadcasting) 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。 #### 13 普通顺序消息(Normal Ordered Message) 普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。 #### 14 严格顺序消息(Strictly Ordered Message) 严格顺序消息模式下,消费者收到的所有消息均是有顺序的。 #### 15 消息(Message) 消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。 #### 16 标签(Tag) 为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 ## 集群介绍 ### 集群部署模式 - 搭建方式:NameSrv-2个,主Broker-2个,从Broker-2个 - 部署说明:每个主Broker将携带一个从服务器,数据同步采用异步的方式 - 部署场景: 1. 主服务器横向库容可减轻写入负载带来的压力 2. 从服务器可减轻所属的主服务器消息订阅压力 ### 集群启动步骤 1. [下载RocketMQ源码](https://github.com/apache/rocketmq.git) 并切换分支[release-4.3.0] 2. [下载RocketMQ配置](https://archive.apache.org/dist/rocketmq/4.3.0/rocketmq-all-4.3.0-bin-release.zip) 3. 配置ROCKETMQ_HOME环境变量 作用:指定NameServer和Broker的日志配置地址 4. 开始运行NameServer - 配置第一个NameServer ***应用程序参数*** ``` -c /Users/noah.hong/rocketmq-conf/namesrv1.conf ``` ***[namesrv1.conf]的配置内容*** ``` listenPort=9876 ``` - 配置第二个NameServer ***应用程序参数*** ``` -c /Users/noah.hong/rocketmq-conf/namesrv2.conf ``` ***[namesrv2.conf]的配置内容*** ``` listenPort=9877 ``` 5. 开始运行Broker - 配置第一个Broker的主服务器 ***应用程序参数*** ``` -c /Users/noah.hong/rocketmq-conf/broker.conf ``` ***[broker.conf]的配置内容*** ``` #当前启动broker的IP地址 brokerIP1=127.0.0.1 #监听端口 listenPort=12011 #集群名称 brokerClusterName = DefaultCluster #Broker名称,主从Broker名称要相同 brokerName = broker-a #BrokerRole为MASTER时可以不设置,为Slave时需要大于0 brokerId = 0 #Broker 的角色 brokerRole = ASYNC_MASTER # ASYNC_MASTER #刷盘方式 flushDiskType = SYNC_FLUSH namesrvAddr =localhost:9876;localhost:9877 #存储路径 storePathRootDir=/Users/noah.hong/rocketmq-conf/store #commitLog 存储路径 storePathCommitLog=/Users/noah.hong/rocketmq-conf/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/Users/noah.hong/rocketmq-conf/store/consumequeue #消息索引存储路径 storePathIndex=/Users/noah.hong/rocketmq-conf/store/index #checkpoint 文件存储路径 storeCheckpoint=/Users/noah.hong/rocketmq-conf/store/checkpoint #abort 文件存储路径 abortFile=/Users/noah.hong/rocketmq-conf/store/abort ``` - 配置第一个Broker的从服务器 ***应用程序程序参数*** ``` -c /Users/noah.hong/rocketmq-conf/broker-slave.conf ``` ***[broker-slave.conf]的配置内容*** ``` brokerIP1=127.0.0.1 #主Broker的IP,用于从服务器同步数据的IP brokerIP2=127.0.0.1 listenPort=11011 brokerClusterName=DefaultCluster brokerName=broker-a brokerId=1 brokerRole=SLAVE namesrvAddr =localhost:9876;localhost:9877 storePathRootDir=/Users/noah.hong/rocketmq-conf/store-slave storePathCommitLog=/Users/noah.hong/rocketmq-conf/store-slave/commitlog storePathConsumeQueue=/Users/noah.hong/rocketmq-conf/store-slave/consumequeue storePathIndex=/Users/noah.hong/rocketmq-conf/store-slave/index storeCheckpoint=/Users/noah.hong/rocketmq-conf/store-slave/checkpoint abortFile=/Users/noah.hong/rocketmq-conf/store-slave/abort ``` - 配置第二个Broker的主服务器 ***应用程序程序参数*** ``` -c /Users/noah.hong/rocketmq-conf/broker-b.conf ``` ***[broker-b.conf]的配置内容*** ``` brokerIP1=127.0.0.1 listenPort=13011 brokerClusterName = DefaultCluster brokerName = broker-b brokerRole = ASYNC_MASTER # ASYNC_MASTER flushDiskType = SYNC_FLUSH namesrvAddr =localhost:9876;localhost:9877 storePathRootDir=/Users/noah.hong/rocketmq-conf/store-b storePathCommitLog=/Users/noah.hong/rocketmq-conf/store-b/commitlog storePathConsumeQueue=/Users/noah.hong/rocketmq-conf/store-b/consumequeue storePathIndex=/Users/noah.hong/rocketmq-conf/store-b/index storeCheckpoint=/Users/noah.hong/rocketmq-conf/store-b/checkpoint abortFile=/Users/noah.hong/rocketmq-conf/store-b/abort ``` - 配置第二个Borker的从服务器 ***应用程序程序参数*** ``` -c /Users/noah.hong/rocketmq-conf/broker-b-slave.conf ``` ***[broker-b-slave.conf]的配置内容*** ``` brokerIP1=127.0.0.1 brokerIP2=127.0.0.1 listenPort=14011 brokerClusterName=DefaultCluster brokerName=broker-b brokerId=1 brokerRole=SLAVE namesrvAddr =localhost:9876;localhost:9877 storePathRootDir=/Users/noah.hong/rocketmq-conf/store-slave-b storePathCommitLog=/Users/noah.hong/rocketmq-conf/store-slave-b/commitlog storePathConsumeQueue=/Users/noah.hong/rocketmq-conf/store-slave-b/consumequeue storePathIndex=/Users/noah.hong/rocketmq-conf/store-slave-b/index storeCheckpoint=/Users/noah.hong/rocketmq-conf/store-slave-b/checkpoint abortFile=/Users/noah.hong/rocketmq-conf/store-slave-b/abort ``` 6. 查看集群运行情况 ***运行命令*** ``` ./mqadmin clusterList -n localhost:9876 ``` ***运行结果*** ![](docs/集群运行情况.png) ## 客户端特性介绍 ### 生产者特性 ***生产者流程概览*** ![](docs/生产流程概览.png) #### 特性1:发送同步消息 ***流程图:普通消息主要执行流程*** ![](docs/普通消息发送流程.png) ***前置工作*** 1. 在broker上创建Topic ``` ./mqadmin updateTopic -b localhost:12011 -c DefaultCluster -n localhost:9876 -p 6 -t SyncProducerTopic -r 8 -w 8 ``` ***代码示例*** ```java public class SyncProducer { public static void main(String[] args) throws Exception { final int sendCount = 10; final String topic = "SyncProducerTopic"; DefaultMQProducer producer = new DefaultMQProducer("SyncProducerGroup"); producer.setNamesrvAddr("localhost:9876"); //注册HOOK,可注册多个 producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() { @Override public String hookName() { return null; } @Override public void sendMessageBefore(SendMessageContext context) { //TODO 消息执行前的HOOK } @Override public void sendMessageAfter(SendMessageContext context) { //TODO 消息执行后的HOOK //获取发送结果 //context.getSendResult(); //获取发送异常,不为空,就意味消息发送失败 //context.getException() } }); producer.start(); for (int i = 0; i < sendCount; i++) { byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, body); //服务端为异步刷盘时失去作用 msg.setWaitStoreMsgOK(true); try { SendResult sendResult = producer.send(msg); String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); } catch (Exception ex) { ex.printStackTrace(); } } System.exit(0); } } ``` ***返回结果结构:SendResult*** ```java //发送结果 private SendStatus sendStatus; //消息编号,客户端生成,可用于控制台查询 private String msgId; //消息发往的消息队列 private MessageQueue messageQueue; //消息队列里的偏移量 private long queueOffset; //消息事务编号,客户端生成 private String transactionId; //由服务端生成的消息编号,可用于queryMsgByUniqueKey命令查询消息 private String offsetMsgId; //Broker配置里的regionId private String regionId; ``` ***注意事项*** 1. 客户端等待broker存储消息成功后返回结果的配置,只有当broker为同步刷盘时生效 ***[服务端]关键代码:CommitLog#handleDiskFlush*** ```java if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } ``` 2. 消息体大小的控制[DefaultMQProducer#maxMessageSize]默认4MB,服务端也是4MB,要使消息体大小超过4MB,需要服务端同步变更 3. **Topic的长度Broker限制长度127位,客户端为255位,以服务端为主** 4. **消息属性头的长度Broker限制长度32767** 5. 未指定发送队列时,每次发送都将轮流将消息发送到不同队列里,当发送失败时,将通过切换broker进行重试,默认重试次数3次,可通过配置retryTimesWhenSendFailed配置改变,如果不想切换Broker可以通过改变sendLatencyFaultEnable为true来做到 6. 消息超过阈值[compressMsgBodyOverHowmuch:4kb]将会尝试压缩,默认压缩级别5,可通过系统变量rocketmq.message.compressLevel更改 #### 特性2:发送异步消息 ***前置工作*** 1. 在broker上创建Topic ``` ./mqadmin updateTopic -b localhost:13011 -c DefaultCluster -n localhost:9876 -p 6 -t ASyncProducerTopic -r 8 -w 8 ``` ***代码示例*** ```java public class ASyncProducer { public static void main(String[] args) throws Exception { final int sendCount = 10; final String topic = "ASyncProducerTopic"; DefaultMQProducer producer = new DefaultMQProducer("ASyncProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < sendCount; i++) { byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, body); //服务端为异步刷盘时失去作用 msg.setWaitStoreMsgOK(true); try { //发送消息后,结果通过回调函数返回 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); //发送消息后,不关心返回结果 producer.sendOneway(msg); } catch (Exception ex) { ex.printStackTrace(); } } } } ``` #### 特性3:指定发送队列 ***注意事项*** 1. 当指定发送队列,Broker异常时无法切换Broker进行重试 2. 当消息队列数量发生变化时,要注意处理变动后所可能产生的问题,如消息乱序 ***流程图:自定义队列主要执行流程*** ![](docs/自定义队列消息发送流程.png) ##### 特性3.1:HASH模式-单Broker ***前置工作*** 1. 在broker-b上创建Topic ```shell ./mqadmin updateTopic -b localhost:13011 -c DefaultCluster -n localhost:9876 -p 6 -t AllocateQueueProducerTopic -r 8 -w 8 ``` ***代码示例*** ```java public class AllocateQueueProducer { public static void main(String[] args) throws Exception { final int sendCount = 10; //注册该Topic到broker-b上 final String topic = "AllocateQueueProducerTopic"; DefaultMQProducer producer = new DefaultMQProducer("AllocateQueueProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < sendCount; i++) { byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, body); //服务端为异步刷盘时失去作用 msg.setWaitStoreMsgOK(true); try { SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), i); String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); } catch (Exception ex) { ex.printStackTrace(); } } System.exit(0); } } ``` ##### 特性3.2:HASH模式-双Broker ***前置工作*** 1. 在broker-a和broker-b上创建topic ``` ./mqadmin updateTopic -c DefaultCluster -n localhost:9876 -p 6 -t AllocateQueueInAllBrokerProducerTopic -r 8 -w 8 ``` ***代码示例*** ```java public class AllocateQueueInAllBrokerProducer { public static void main(String[] args) throws Exception { final int sendCount = 10; //注册该Topic到broker-a与broker-b上 final String topic = "AllocateQueueInAllBrokerProducerTopic"; DefaultMQProducer producer = new DefaultMQProducer("AllocateQueueInAllBrokerProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < sendCount; i++) { byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, body); //服务端为异步刷盘时失去作用 msg.setWaitStoreMsgOK(true); try { SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), i); String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); } catch (Exception ex) { ex.printStackTrace(); } } System.exit(0); } } ``` ##### 特性3.3:自定义模式 ***前置工作*** 1. 在broker上创建Topic ```shell ./mqadmin updateTopic -b localhost:12011 -c DefaultCluster -n localhost:9876 -p 6 -t AllocateQueueByCustomProducerTopic -r 8 -w 8 ``` ***代码示例*** ```java public class CustomSelectMessageQueueByHash implements MessageQueueSelector { @Override public MessageQueue select(List mqs, Message msg, Object arg) { final MessageQueue messageQueue = mqs.get(0); return messageQueue; } } ``` ```java public class AllocateQueueByCustomProducer { public static void main(String[] args) throws Exception { final int sendCount = 10; //注册该Topic到broker-a上 final String topic = "AllocateQueueByCustomProducerTopic"; DefaultMQProducer producer = new DefaultMQProducer("AllocateQueueByCustomProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < sendCount; i++) { byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, body); //服务端为异步刷盘时失去作用 msg.setWaitStoreMsgOK(true); try { SendResult sendResult = producer.send(msg, new CustomSelectMessageQueueByHash(), i); String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); } catch (Exception ex) { ex.printStackTrace(); } } System.exit(0); } } ``` #### 特性4:发送批量消息 ***流程图:批量消息主要执行流程*** ![](docs/批量消息发送流程.png) ***代码示例*** ```java public class SimpleBatchProducer { public static void main(String[] args) throws Exception { final int sendCount = 50; DefaultMQProducer producer = new DefaultMQProducer("SimpleBatchProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); String topic = "SimpleBatchProducerTopic"; List messages = new ArrayList<>(); for (int i = 0; i < sendCount; i++) { byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, "tag" + i, body); messages.add(msg); } final SendResult sendResult = producer.send(messages); String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); System.exit(0); } } ``` ***注意事项*** 1. 在broker上创建topic ``` ./mqadmin updateTopic -c DefaultCluster -n localhost:9876 -p 6 -t SimpleBatchProducerTopic -r 8 -w 8 ``` 2. 配置的Topic和isWaitStoreMsgOK以第一个为准并且延时消息不支持批量发送 ***关键代码:MessageBatch#generateFromList*** ```java for (Message message : messages) { if (message.getDelayTimeLevel() > 0) { throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching"); } if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { throw new UnsupportedOperationException("Retry Group is not supported for batching"); } if (first == null) { first = message; } else { if (!first.getTopic().equals(message.getTopic())) { throw new UnsupportedOperationException("The topic of the messages in one batch should be the same"); } if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) { throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same"); } } messageList.add(message); } ``` 3. 所有消息会压缩成一条再发送到服务端处理 ***关键代码:DefaultMQProducer#batch*** ```java private MessageBatch batch(Collection msgs) throws MQClientException { MessageBatch msgBatch; try { msgBatch = MessageBatch.generateFromList(msgs); for (Message message : msgBatch) { Validators.checkMessage(message, this); MessageClientIDSetter.setUniqID(message); } msgBatch.setBody(msgBatch.encode()); } catch (Exception e) { throw new MQClientException("Failed to initiate the MessageBatch", e); } return msgBatch; } ``` 4. 发送消息的总大小不能超过Broker设定的消息大小[maxMessageSize],默认4mb ***客户端-关键代码:Validators#checkMessage*** ```java if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } ``` ***服务端-关键代码:DefaultMessageStore#putMessages*** ```java if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } ``` 5. 所有的消息都会发送到同一个队列中,并且所有消息都会同时刷到内存(commitlog)中,不会发生部分消息刷写成功,部分失败 #### 特性5:发送延时消息 ***前置工作*** 1. 在broker上创建topic ``` ./mqadmin updateTopic -c DefaultCluster -n localhost:9876 -p 6 -t DelayProducerTopic -r 8 -w 8 ``` ***流程图:延时消息主要执行流程*** ![](docs/延时消息发送流程.png) ***流程图:延时消息主要调度流程*** ![](docs/延时消息调度流程.png) ***代码示例*** ```java public class DelayProducer { public static void main(String[] args) throws Exception { final String topic = "DelayProducerTopic"; DefaultMQProducer producer = new DefaultMQProducer("TransactionProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); byte[] body = ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, body); //设置消息延迟级别,有18个级别 msg.setDelayTimeLevel(3); try { SendResult sendResult = producer.send(msg); String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); } catch (Exception ex) { ex.printStackTrace(); } System.exit(0); } } ``` ***注意事项*** 1. 延时级别[delayTimeLevel]固定有18个级别,分别从1到18为 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",可通过配置messageDelayLevel改变延时级别 2. broker接收到延时消息时,实际commitlog中存储的topic为SCHEDULE_TOPIC_XXXX,当时间到达预设值时才会将正确的topic信息存储到commitlog中 ***关键代码:CommitLog#putMessage*** ```java if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } //实际存储的topic为SCHEDULE_TOPIC_XXXX topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } ``` ***关键代码:ScheduleMessageService#DeliverDelayedMessageTimerTask#executeOnTimeup*** ```java long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; //到达预设时间会执行存储实际topic的代码 if (countdown <= 0) { MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { //将构建真实的消息体 MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); //保存真实的消息体 PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); } //忽略代码 } ``` 3. 调度服务在正常运行时会每隔100ms拉取Topic为SCHEDULE_TOPIC_XXXX的消息,用于判断当前是否有消息需要处理,如果有定时消息则判断当前定时消息是否到达预订时间,如果未到达则该延迟级别的消息拉取将在(预订时间-当前时间)的时间后执行 ***关键代码:ScheduleMessageService#DeliverDelayedMessageTimerTask#executeOnTimeup*** ```java //拉取定时消息 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); //忽略代码 //未到达定时时间则隔差异时间后再调度 ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); //忽略代码 //100ms后继续执行executeOnTimeup ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); ``` 4. 延时消息过多可能造成 - 延时消息保存成真实消息时与实际期望的延迟时间差异较大,因为保存延时消息为真实消息时是一条条循环遍历保存的 - 客户端拉取消费时,可能拉取到过多的消息,造成客户端压力,客户端订阅延时消息时要注意限流 #### 特性6:发送事务消息 ***前置工作*** 1. 在broker上创建topic ``` ./mqadmin updateTopic -c DefaultCluster -n localhost:9876 -p 6 -t TransactionProducerTopic -r 8 -w 8 ``` ***流程图:事务消息主要执行流程*** ![](docs/事务消息.png) ***流程图:正常事务消息执行流程*** ![](docs/事务消息发送流程.png) ***流程图:异常事务消息执行流程*** ![](docs/异常事务消息调度流程.png) ***代码示例*** ```java public class TransactionProducer { static class CustomTransactionListener implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("checkLocalTransaction:" + msg.getTransactionId()); return LocalTransactionState.COMMIT_MESSAGE; } } //./mqadmin topicStatus -t RMQ_SYS_TRANS_HALF_TOPIC -n localhost:9876 public static void main(String[] args) throws Exception { final String topic = "TransactionProducerTopic"; byte[] body = ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET); TransactionListener transactionListener = new CustomTransactionListener(); TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerTopicGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setTransactionListener(transactionListener); producer.start(); try { Message msg = new Message(topic, body); MessageAccessor .putProperty(msg, MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "5"); SendResult sendResult = producer.sendMessageInTransaction(msg, null); String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); } catch (Exception e) { e.printStackTrace(); } } } ``` ***注意事项*** 1. broker接收到事务消息时,实际commitlog中存储的topic为RMQ_SYS_TRANS_HALF_TOPIC ***[服务端]关键代码:TransactionalMessageBridge#parseHalfMessageInner*** ```java private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; } ``` 2. 当本地事务状态为COMMIT_MESSAGE时,会向broker发送请求将真正的topic存储到commitlog中 ***[服务端]关键代码:EndTransactionProcessor#processRequest*** ```java //取出对应偏移量的事务消息 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { //构建真正存储的消息 MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); //保存该消息到commitlog中 RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { //存储RMQ_SYS_TRANS_OP_HALF_TOPIC到commitlog中,存储body为当前半消息的偏移量,后续调度程序将通过该偏移量过滤掉该半消息 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } ``` 3. 每次回查或者未到达回查时间时commitlog会再加一条半消息,最多回查[transactionCheckMax]次,默认值为5 ***可能导致回查的情况*** 1)客户端没有告知服务端事务消息结果或者告知结果为UNKNOW 2)服务端主动回查时,客户端返回结果时间太久,导致服务端下一轮回查 3)客户端下线,无法告知事务消息结果 ***[服务端]关键代码:TransactionalMessageServiceImpl#check*** ```java //忽略代码 //事件诞生到目前为止经过了多少时间,用于跟设置的回查时间做对比,判断是否执行后续逻辑 long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); long checkImmunityTime = transactionTimeout; //获取客户端设置的回查时间 String checkImmunityTimeStr = msgExt .getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { //如果指定了客户端回查时间则以客户端为主 checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { //如果未到达回查时间则再加一条半消息到commitlog中 if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { newOffset = i + 1; i++; continue; } } } //忽略代码 if (isNeedCheck) { //将需要确认回查结果的半消息存储,主要用于更新此半消息最新信息,如已回查次数 if (!putBackHalfMsgQueue(msgExt, i)) { continue; } //触发客户端回查请求 listener.resolveHalfMsg(msgExt); } //忽略代码 ``` 4. 当事务消息检查次数超过[transactionCheckMax]时,默认5次,将不再回查,导致该消息丢失 ***可能导致不回查的情况*** 1)事务消息的定时调度程序(定时执行间隔由[transactionCheckInterval]决定,默认60s),拉取待确认的半消息属性时,发现该半消息还未到达回查时间 1.1) 如何设置事务性消息回查时间 1.1.1) 服务端可通过transactionTimeOut指定回查时间,默认3s; 1.1.2) 客户端可通过消息头属性[MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS]指定,如果设置了客户端回查时间以客户端为准 2)无法找到客户端回查函数,如客户端下线,未找到对应的producer-group 3)客户端一直未返回回查结果或者返回UNKNOW 4)其他应用服务使用了同名的producer-group导致回查结果混乱 ***[服务端]关键代码:TransactionalMessageServiceImpl#check*** ```java //如果重试次数到达设置的阈值则跳过 if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } ``` ***[服务端]关键代码:TransactionalMessageServiceImpl#check*** ```java //如果重试次数到达设置的阈值则跳过 if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } ``` ***[客户端]关键代码:ClientRemotingProcessor#checkTransactionState*** ```java final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); if (group != null) { MQProducerInner producer = this.mqClientFactory.selectProducer(group); if (producer != null) { final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); //执行指定group的回查函数 producer.checkTransactionState(addr, messageExt, requestHeader); } else { log.debug("checkTransactionState, pick producer by group[{}] failed", group); } } ``` #### 特性7:自动创建Topic ***流程图:自动创建topic主要执行流程*** ![](docs/自动创建topic流程.png) ***代码示例*** ```java public class AutoCreateTopicProducer { public static void main(String[] args) throws Exception { final int sendCount = 1; final String topic = "TestAutoCreateTopic"; DefaultMQProducer producer = new DefaultMQProducer("AutoCreateTopicProducerGroup"); //设置默认创建的Topic读写队列数量 producer.setDefaultTopicQueueNums(10); producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < sendCount; i++) { byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, body); try { SendResult sendResult = producer.send(msg); String sendResultFormat = MessageFormat.format("发送状态:{0},所属Broker:{1},所属队列:{2},消息编号:{3}" , sendResult.getSendStatus().name() , sendResult.getMessageQueue().getBrokerName() , sendResult.getMessageQueue().getQueueId() , sendResult.getMsgId()); System.out.println(sendResultFormat); } catch (Exception ex) { ex.printStackTrace(); } } System.exit(0); } } ``` ***注意事项*** 1. 需要配置broker.conf中的autoCreateTopicEnable=true - 如果原先为false,可设置为true并重启应用服务,启用自动创建Topic功能 - 如果原先为true,可使用命令./mqadmin updateBrokerConfig 更新为false,禁用自动创建Topic功能 ***命令示例*** ``` ./mqadmin updateBrokerConfig -n localhost:9876 -b localhost:12011 -k autoCreateTopicEnable -v true ``` 2. 当TOPIC创建时的读写队列数量,无法超过AUTO_CREATE_TOPIC_KEY的队列数量,每个Broker单独计算 - 当客户端要求创建的队列数量超过[AUTO_CREATE_TOPIC_KEY]的读队列数量时,所创建的队列数量变更为服务端的读队列数量(客户端) ***关键代码:MQClientInstance#updateTopicRouteInfoFromNameServer*** ```java TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } ``` - 当客户端要求创建的队列数量超过[AUTO_CREATE_TOPIC_KEY]的写队列数量时,所创建的队列数量变更为服务端的写队列数量(服务端) ***关键代码:TopicConfigManager#createTopicInSendMessageMethod*** ```java int queueNums = clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig .getWriteQueueNums() : clientDefaultTopicQueueNums; if (queueNums < 0) { queueNums = 0; } topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); ``` - 配置broker.conf文件中的默认队列数量[defaultTopicQueueNums]无效,因为会被topics.json中的[AUTO_CREATE_TOPIC_KEY]配置所覆盖 - 需要配置config/topics.json中的AUTO_CREATE_TOPIC_KEY的读写队列数量,该配置会覆盖broker.conf中的[defaultTopicQueueNums]配置项 3. 自动创建的topic会随机创建在一个启用了自动创建topic功能的broker上 #### 特性8:流量控制 > 通过拒绝服务端发送请求来达到流控目的 1. commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,定时执行,参数默认为1000ms,返回流控。 ***关键代码:BrokerFastFailure#cleanExpiredRequest*** ```java while (this.brokerController.getMessageStore().isOSPageCacheBusy()) { try { if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) { final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS); if (null == runnable) { break; } final RequestTask rt = castRunnable(runnable); rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size())); } else { break; } } catch (Throwable ignored) { } } ``` 2. 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。 ***关键代码:AllocateMappedFileService#putRequestAndReturnMappedFile*** ```java if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //计算剩余资源 canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size(); } } ··· if (canSubmitRequests <= 0) { log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs()); this.requestTable.remove(nextFilePath); return null; } ``` 3. broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。 ***关键代码:BrokerFastFailure#cleanExpiredRequestInQueue*** ```java while (true) { try { if (!blockingQueue.isEmpty()) { final Runnable runnable = blockingQueue.peek(); if (null == runnable) { break; } final RequestTask rt = castRunnable(runnable); if (rt == null || rt.isStopRun()) { break; } final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); if (behind >= maxWaitTimeMillsInQueue) { if (blockingQueue.remove(runnable)) { rt.setStopRun(true); rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size())); } } else { break; } } else { break; } } catch (Throwable ignored) { } } ``` ### 消费者特性 ***消费者流程概览*** ![](docs/消费流程概览.png) #### 特性1:集群模式-普通消息订阅模式 ***流程图:订阅并发消息主要执行流程*** ![](docs/集群模式-并行消费流程.png) ***代码示例*** ```java public class ConcurrentlyConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConcurrentlyConsumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.setInstanceName(UUID.randomUUID().toString()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("ConcurrentlyConsumerTopic", "*"); consumer.setConsumeMessageBatchMaxSize(100); consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() { @Override public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) { return mqAll; } @Override public String getName() { return null; } }); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { //后续消息重试间隔 //-1:不重试,直接放入死信队列 //0:由Broker服务端定义,默认初次为10s //>0:自定义重试间隔 context.setDelayLevelWhenNextConsume(-1); //部分成功,定义已成功消费的消息下标 //context.setAckIndex(0); final String format = MessageFormat .format("{0} Receive New Messages,QueueId {1} Thread Name {2}", msgs.size(), context.getMessageQueue().getQueueId(), Thread.currentThread().getName()); System.out.println(format); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } } ``` ***注意事项*** 1. 消费者组在集群模式下会默认订阅重试Topic(%RETRY%消费者组名称),用于订阅进入重试队列的失败消息 ***关键代码:DefaultMQPushConsumerImpl#copySubscription*** ```java switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } ``` 2. 消费无论成功还是失败都将继续拉取后续的消息,消费失败的消息会进入重试队列,失败次数超过阈值将进入死信队列 ***关键代码:SendMessageProcessor#consumerSendMsgBack*** ```java if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return response; } } ``` 3. 多实例的消费者组可根据AllocateMessageQueueStrategy定义的策略,消费同一消费队列的消息 ***示例*** ```java consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() { @Override public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) { return mqAll; } @Override public String getName() { return null; } }); ``` 4. 进入死信队列的消息,需要额外订阅死信消息进行消费(%DLQ%消费者组名称),并且所有失败的消息都会共用同一个死信TOPIC,实际消费时,需要获取属性头中的真实TOPIC进行区分,并开启死信Topic的读写权限,除此之外还可以通过重新发送原有来的消息进行重消费 ***示例:真实Topic*** ```s ./mqadmin queryMsgByOffset -n localhost:9876 -t %DLQ%ConcurrentlyConsumerGroup -b broker-a -i 0 -o 1 OffsetID: 7F00000100002EEB000000000D957671 OffsetID: 7F00000100002EEB000000000D957671 Topic: %DLQ%ConcurrentlyConsumerGroup Tags: [null] Keys: [null] Queue ID: 0 Queue Offset: 1 CommitLog Offset: 227898993 Reconsume Times: 4 Born Timestamp: 2022-01-28 19:02:18,914 Store Timestamp: 2022-02-10 09:55:42,258 Born Host: 127.0.0.1:56815 Store Host: 127.0.0.1:12011 System Flag: 0 Properties: {MIN_OFFSET=0, REAL_TOPIC=%DLQ%ConcurrentlyConsumerGroup, ORIGIN_MESSAGE_ID=7F00000100002EEB000000000D906660, RETRY_TOPIC=AllocateQueueByCustomProducerTopic2, MAX_OFFSET=9, UNIQ_KEY=0A019BE60CF818B4AAC28F2186220005, WAIT=false, DELAY=5, REAL_QID=0} Message Body Path: /tmp/rocketmq/msgbodys/0A019BE60CF818B4AAC28F2186220005 ``` ***代码示例:死信消息消费*** ```java public class DLQConcurrentlyConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConcurrentlyConsumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.setInstanceName("1"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("%DLQ%ConcurrentlyConsumerGroup", "*"); consumer.setConsumeMessageBatchMaxSize(100); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { final String format = MessageFormat .format("{0} Receive New Messages,QueueId {1} Thread Name {2}", msgs.size(), context.getMessageQueue().getQueueId(), Thread.currentThread().getName()); System.out.println(format); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } } ``` 5. 在ConsumeTimeout(默认15分钟)后将清除本地(ProcessQueue,影响Offset更新)还未消费的数据,并将这些数据发往Broker进入重试队列,注意幂等处理 ***关键代码:ConsumeMessageConcurrentlyService#start*** ```java this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); ``` 6. 当消费者组没有配置重试队列(retryQueueNums<1)时,消费失败后的消息不会再重试 ***关键代码:SendMessageProcessor#consumerSendMsgBack*** ```java if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } ``` 7. InstanceName属性**需要不同**,否则可能造成消息在相同消费者组的多个实例中消费 8. 注意相同消费者组不同实例订阅关系是否一致,如果不一致需要注意订阅逻辑可能产生混乱,如果有需要变动原有的订阅关系需注意变更后产生的影响 - 例1:如果topic一致,但tag不一样,会造成相互覆盖订阅信息,造成拉取消息时获取数据问题,因为订阅信息会定时发送给broker更新,拉取消息时从Broker本地获取消费者组的订阅信息 - 例2:如果订阅的topic不一致,在分配Topic队列的时候,会造成没有该topic订阅的消费者组不会分配队列 9. ConsumeFromWhere为CONSUME_FROM_LAST_OFFSET时并非指定Topic的最大偏移量,而是当前Topic所消费位置的偏移量,如果新增了Topic,则必定偏移量从0开始,可通过重写OffsetStore改变规 10. ConsumeMessageBatchMaxSize超过PullBatchSize无意义,PullBatchSize代表每次从Broker拉取的消息数量,ConsumeMessageBatchMaxSize代表从Broker拉取消息后,每次本地处理的消息数量 ``` pullBatchSize能够从Broker拉取的消息数量受到以下几个Broker参数影响 [getMaxTransferBytesOnMessageInMemory]:消息在内存时允许传输的字节 [getMaxTransferCountOnMessageInMemory]:消息在内存时允许传输的数量 [getMaxTransferBytesOnMessageInDisk]:消息在磁盘时允许传输的字节 [getMaxTransferCountOnMessageInDisk]:消息在磁盘时允许传输的数量 ``` 11. subscribe方法里的fullClassName, filterClassSource属性无效 12. 如果拉取Topic的偏移量超出最大偏移量,或者小于最小偏移量(消息会定期清除,导致最小偏移量会不断靠拢最大偏移量)时,将移除本地拥有的队列,并更新从服务端获取的最小偏移量,然后重新从服务端获取此消费实例的队列 #### 特性2:广播模式-普通消息订阅模式 ***流程图:广播消息主要执行流程*** ![](docs/广播模式-并行消费流程.png) ***示例代码*** ```java public class BroadcastingConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadcastingConsumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.setInstanceName("1"); //指定消费者为广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //指定从最新偏移量开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("BroadcastingConsumerTopic", "*"); consumer.setConsumeMessageBatchMaxSize(100); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { final String format = MessageFormat .format("{0} Receive New Messages,QueueId {1} Thread Name {2}", msgs.size(), context.getMessageQueue().getQueueId(), Thread.currentThread().getName()); System.out.println(format); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } } ``` ***注意事项*** 1. **消费失败后不会再重新投递消息** 2. 消费Topic后,偏移量会更新在本地文件中 3. ConsumeFromWhere为CONSUME_FROM_LAST_OFFSET时,如果从未消费过该Topic,会从最新的消息开始消费。如果已经消费过该Topic,将从本地读取该Topic消费位置继续消费 4. 广播模式不支持顺序消息消费,无法持有处理队列锁,并且顺序消费理念与广播消息相违背,顺序消息只能由一个服务实例持有锁,而广播消息是所有实例都持有消费队列 ***关键代码:RebalanceImpl#updateProcessQueueTableInRebalance*** ```java //广播消息isOrder为false,无法持有处理队列 if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } ``` #### 特性3:集群模式-顺序消息订阅模式 ***流程图:订阅顺序消息主要执行流程*** ![](docs/集群模式-顺序消费流程.png) ***示例代码*** ```java public class OrderlyConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderlyConsumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.setInstanceName(UUID.randomUUID().toString()); consumer.setMessageModel(MessageModel.CLUSTERING); //从消费者组当前topic已消费位置开始继续消费,非该topic最大偏移量 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("OrderlyConsumerTopic", "*"); consumer.setConsumeMessageBatchMaxSize(100); consumer.setPullBatchSize(100); consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { //失败后10秒后重试 context.setSuspendCurrentQueueTimeMillis(10*1000); final String format = MessageFormat .format("{0} Receive New Messages,QueueId {1} Thread Name {2}", msgs.size(), context.getMessageQueue().getQueueId(), Thread.currentThread().getName()); System.out.println(format); return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } } ``` #### 特性4:消息过滤 ##### 根据Tag过滤 ```java public class FilterByTagConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FilterConsumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.setInstanceName("1"); //从消费者组当前topic已消费位置开始继续消费,非该topic最大偏移量 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //ABCDEa123abc和ABCDFB123abc的hashcode相同会被视为同一个tag //consumer.subscribe("FilterConsumerTopic2", "ABCDEa123abc || ABCDFB123abc"); consumer.subscribe("FilterConsumerTopic3", "A"); consumer.setConsumeMessageBatchMaxSize(32); consumer.setPullBatchSize(32); consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() { @Override public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) { return mqAll; } @Override public String getName() { return null; } }); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { final String format = MessageFormat .format("{0} Receive New Messages,QueueId {1} Thread Name {2}", msgs.size(), context.getMessageQueue().getQueueId(), Thread.currentThread().getName()); System.out.println(format); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } } ``` ***注意事项*** 1. tag在服务端进行过滤时,是已tag对应的hashcode的形式对比的,如果不同tag的hashcode相同则认为是相同的tag ***关键代码[客户端]:FilterAPI#buildSubscriptionData*** ```java public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\|\\|"); if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { subscriptionData.getTagsSet().add(trimString); subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { throw new Exception("subString split error"); } } return subscriptionData; } ``` ***关键代码[服务端]:ExpressionMessageFilter#isMatchedByConsumeQueue*** ```java @Override public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { if (null == subscriptionData) { return true; } if (subscriptionData.isClassFilterMode()) { return true; } // by tags code. if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { if (tagsCode == null) { return true; } if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) { return true; } return subscriptionData.getCodeSet().contains(tagsCode.intValue()); } } ``` 2. dashbord上关于事件是否被消费的trackType属性为CONSUMED不一定为已消费,同理为CONSUMED_BUT_FILTERED不一定为未消费,最终该值要取决于当前消费偏移量和消费者组是否定义了该TAG 如果消费者偏移量为最新,定义了A||B两个TAG,即使之前没有消费过B的TAG的消息,相关信息也会为CONSUMED_BUT_FILTERED | TrackType | 消费者TAG未变动 | 消费者TAG变动 | | :-------------------: | :------------------------------------: | :-----------: | | CONSUMED | 已消费 | 不一定已消费 | | CONSUMED_BUT_FILTERED | 消息已在服务端过滤,客户端未拉取到消息 | 不一定未消费 | | NOT_CONSUME_YET | 未消费 | 未消费 | 3. 即使消息被过滤没有拉取到消息,消费者也会更新当前最大偏移量 ***关键代码[客户端]:DefaultMQPushConsumerImpl#pullMessage*** ```java PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: ... break; case NO_NEW_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); //更新本地偏移量 DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); //更新本地偏移量 DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: ... default: break; } } } ... }; ``` ***关键代码[服务端]:PullMessageProcessor#processRequest*** ```java responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); responseHeader.setMaxOffset(getMessageResult.getMaxOffset()); ``` ##### 根据Sql过滤 ***示例代码*** ```java public class FilterBySqlConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FilterBySqlConsumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.setInstanceName("1"); //从消费者组当前topic已消费位置开始继续消费,非该topic最大偏移量 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("FilterBySqlTopic", MessageSelector.bySql("TAGS in ('A') and a='c'")); consumer.setConsumeMessageBatchMaxSize(32); consumer.setPullBatchSize(32); consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() { @Override public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) { return mqAll; } @Override public String getName() { return null; } }); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { final String format = MessageFormat .format("{0} Receive New Messages,QueueId {1} Thread Name {2}", msgs.size(), context.getMessageQueue().getQueueId(), Thread.currentThread().getName()); System.out.println(format); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } } ``` ***注意事项*** 1. 以sql方式过滤时,broker的配置属性enablePropertyFilter需要为true,否则无法消费消息 2. 对比Tag效率略低,内部使用了BloomFilter避免了每次都去执行行。 #### 特性5:流量控制 1. [客户端]消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。超过后50ms后尝试重新拉取 2. [客户端]消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。超过后50ms后尝试重新拉取 3. [客户端]消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。超过后50ms后尝试重新拉取 4. [客户端]消费者本地可配置pullInterval来控制每次拉取的时间间隔,默认0 5. [客户端]消费者本地可配置pullBatchSize来控制每次从Borker拉取的消息数量,默认32 6. [客户端]消费者本地可配置consumeThreadMin和consumeThreadMax来控制队列消费的并行度,默认最小20,最大64 7. [服务端]Broker可配置maxTransferCountOnMessageInMemory来控制客户端每次拉取的最大消息数量,默认32 8. [服务端]Broker还可配置MaxTransferCountOnMessageInDisk、MaxTransferBytesOnMessageInDisk、MaxTransferBytesOnMessageInMemory、MaxTransferCountOnMessageInMemory等属性来控制客户端每次拉取的数量 ## 服务端特性介绍 ### Broker特性 #### 特性1:消息持久化 ***持久化文件介绍*** * CommitLog:主要用于根据偏移量查询消息,存储了消息的元数据 * ConsumerQueue:主要用于客户端拉取消息时快速定位消息,存储以队列为维度的数据,内容包含偏移量、消息大小、tagsCode的hash值 * IndexFile:可选,增强消息查询方式,可通过key与时间区间来查询消息,实现方式类似HashMap ***流程图:消息持久化主要执行流程*** ![](docs/服务端-消息持久化流程.png) ## 命令介绍 ### Topic相关
名称 含义 命令选项 说明
updateTopic 创建更新Topic配置 -b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询)
-h- 打印帮助
-n NameServer服务地址,格式 ip:port
-p 指定新topic的读写权限( W=2|R=4|WR=6 )
-r 可读队列数(默认为 8)
-w 可写队列数(默认为 8)
-t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
deleteTopic 删除Topic -c cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询)
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
topicList 查看 Topic 列表信息 -h 打印帮助
-c 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数
-n NameServer 服务地址,格式 ip:port
topicRoute 查看 Topic 路由信息 -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
topicStatus 查看 Topic 消息队列offset -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
topicClusterList 查看 Topic 所在集群列表 -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
updateTopicPerm 更新 Topic 读写权限 -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-p 指定新 topic 的读写权限( W=2|R=4|WR=6 )
-c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令
updateOrderConf 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic,键
-v orderConf,值
-m method,可选get、put、delete
allocateMQ 以平均负载算法计算消费者列表负载消息队列的负载结果 -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-i ipList,用逗号分隔,计算这些ip去负载Topic的消息队列
statsAll 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-a 是否只打印活跃topic
-t 指定topic
### 集群相关
名称 含义 命令选项 说明
clusterList 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 -m 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday)
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-i 打印间隔,单位秒
clusterRT 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 -a amount,每次探测的总数,RT = 总时间 / amount
-s 消息大小,单位B
-c 探测哪个集群
-p 是否打印格式化日志,以|分割,默认不打印
-h 打印帮助
-m 所属机房,打印使用
-i 发送间隔,单位秒
-n NameServer 服务地址,格式 ip:port
### Broker相关
名称 含义 命令选项 说明
updateBrokerConfig 更新 Broker 配置文件,会修改Broker.conf -b Broker 地址,格式为ip:port
-c cluster 名称
-k key 值
-v value 值
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
brokerStatus 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) -b Broker 地址,地址为ip:port
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
brokerConsumeStats Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 -b Broker 地址,地址为ip:port
-t 请求超时时间
-l diff阈值,超过阈值才打印
-o 是否为顺序topic,一般为false
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
getBrokerConfig 获取Broker配置 -b Broker 地址,地址为ip:port
-n NameServer 服务地址,格式 ip:port
wipeWritePerm 从NameServer上清除 Broker写权限 -b BrokerName
addWritePerm 从NameServer上添加 Broker写权限 -b BrokerName
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
cleanExpiredCQ 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b Broker 地址,地址为ip:port
-c 集群名称
cleanUnusedTopic 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b Broker 地址,地址为ip:port
-c 集群名称
sendMsgStatus 向Broker发消息,返回发送状态和RT -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b BrokerName,注意不同于Broker地址
-s 消息大小,单位B
-c 发送次数
### 消息相关
名称 含义 命令选项 说明
queryMsgById 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 -i msgId
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
queryMsgByKey 根据消息 Key 查询消息 -k msgKey
-t Topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
queryMsgByOffset 根据 Offset 查询消息 -b Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到)
-i query 队列 id
-o offset 值
-t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
queryMsgByUniqueKey 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-i uniqe msg id
-g consumerGroup
-d clientId
-t topic名称
checkMsgSendRT 检测向topic发消息的RT,功能类似clusterRT -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-a 探测次数
-s 消息大小
sendMessage 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-p body,消息体
-k keys
-c tags
-b BrokerName
-i queueId
consumeMessage 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-b BrokerName
-o 从offset开始消费
-i queueId
-g 消费者分组
-s 开始时间戳,格式详见-h
-d 结束时间戳
-c 消费多少条消息
printMsg 从Broker消费消息并打印,可选时间段 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-c 字符集,例如UTF-8
-s subExpress,过滤表达式
-b 开始时间戳,格式参见-h
-e 结束时间戳
-d 是否打印消息体
printMsgByQueue 类似printMsg,但指定Message Queue -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-i queueId
-a BrokerName
-c 字符集,例如UTF-8
-s subExpress,过滤表达式
-b 开始时间戳,格式参见-h
-e 结束时间戳
-p 是否打印消息
-d 是否打印消息体
-f 是否统计tag数量并打印
resetOffsetByTime 按时间戳重置offset,Broker和consumer都会重置 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-g 消费者分组
-t topic名称
-s 重置为此时间戳对应的offset
-f 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系
-c 是否重置c++客户端offset
### 消费者、消费组相关
名称 含义 命令选项 说明
consumerProgress 查看订阅组消费状态,可以查看具体的client IP的消息积累量 -g 消费者所属组名
-s 是否打印client IP
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
consumerStatus 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-g consumer group
-i clientId
-s 是否执行jstack
updateSubGroup 更新或创建订阅关系 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b Broker地址
-c 集群名称
-g 消费者分组名称
-s 分组是否允许消费
-m 是否从最小offset开始消费
-d 是否是广播模式
-q 重试队列数量
-r 最大重试次数
-i 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费
-w 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1
-a 当消费者数量变化时是否通知其他消费者负载均衡
deleteSubGroup 从Broker删除订阅关系 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b Broker地址
-c 集群名称
-g 消费者分组名称
cloneGroupOffset 在目标群组中使用源群组的offset -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-s 源消费者组
-d 目标消费者组
-t topic名称
-o 暂未使用
### 连接相关
名称 含义 命令选项 说明
consumerConnection 查询 Consumer 的网络连接 -g 消费者所属组名
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
producerConnection 查询 Producer 的网络连接 -g 生产者所属组名
-t 主题名称
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
### NameServer相关
名称 含义 命令选项 说明
updateKvConfig 更新NameServer的kv配置,目前还未使用 -s 命名空间
-k key
-v value
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
deleteKvConfig 删除NameServer的kv配置 -s 命名空间
-k key
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
getNamesrvConfig 获取NameServer配置 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
updateNamesrvConfig 修改NameServer配置 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-k key
-v value
### 其他
名称 含义 命令选项 说明
startMonitoring 开启监控进程,监控消息误删、重试队列消息数等 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
## 常用命令示例 ### 创建或者更新topic ***在集群上所有Broker注册该Topic*** ```bash ./mqadmin updateTopic -c DefaultCluster -n localhost:9876 -p 6 -t SampleTopic -r 8 -w 8 ``` ***在集群上指定Broker注册该Topic*** ```bash ./mqadmin updateTopic -b localhost:12011 -c DefaultCluster -n localhost:9876 -p 6 -t SampleTopic -r 8 -w 8 ``` ### 发送消息 ```bash ./mqadmin sendMessage -b broker-a -c A -i 0 -k T -n localhost:9876 -p 1 -t SampleTopic ``` ### 消费消息 ```base ./mqadmin consumeMessage -t SampleTopic -b broker-a -o 0 -i 0 -g SampleTopicConsumerGroup -c 10 -n localhost:9876 ``` ### 删除topic ```bash ./mqadmin deleteTopic -c DefaultCluster -n localhost:9876 -t SampleTopic ``` ### 列出NameServer上注册的所有Topic ```bash ./mqadmin topicList -c -n localhost:9876 ``` ### 查看Topic路由信息 ```bash ./mqadmin topicRoute -n localhost:9876 -t SampleTopic ``` ### 查看Topic状态 ```bash ./mqadmin topicStatus -n localhost:9876 -t SampleTopic ``` ### 更新Broker配置 ***注意事项*** 部分参数不会及时生效需要重启Broker ***及时生效*** ```base ./mqadmin updateBrokerConfig -n localhost:9876 -b localhost:12011 -k flushDiskType -v ASYNC_FLUSH ``` ***重启生效*** ```base ./mqadmin updateBrokerConfig -n localhost:9876 -b localhost:12011 -k autoCreateTopicEnable -v true ``` ### 查看Broker配置 ```base ./mqadmin getBrokerConfig -n localhost:9876 -b localhost:12011 ``` ### 根据MessageId查询消息 ```base ./mqadmin queryMsgByUniqueKey -n localhost:9876 -i 0A019BE6172318B4AAC2130146CE0000 -t FilterConsumerTopic3 ``` ### 根据MessageKey查询消息 ```base ./mqadmin queryMsgByKey -n localhost:9876 -k test -t FilterConsumerTopic3 ``` ### 根据队列偏移量查询消息 ```base ./mqadmin queryMsgByOffset -b broker-a -n localhost:9876 -t FilterConsumerTopic3 -i 0 -o 413 ``` ### 查看消费进度 ```base ./mqadmin consumerProgress -g FilterConsumerGroup -n localhost:9876 ``` ### 查看消费者状态 ***详细描述*** 1. 可查看同一个分组中是否都是相同的订阅 2. 可查看Process Queue是否堆积 3. 可查看消费者jstack结果 4. 可查看消费者组ClientId 5. 可查看客户端Rocket版本号 ```base ./mqadmin consumerStatus -g FilterConsumerGroup -n localhost:9876 -s ``` ***注意事项*** 1. 消费者组需要启动后命令才能执行 ### 查看消费者连接 ```base ./mqadmin consumerConnection -g FilterConsumerGroup -n localhost:9876 ``` ## 常用配置&数据结构 ### Broker 配置 | 参数名 | 默认值 | 说明 | | ----------------------- | ---------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | | listenPort | 10911 | 接受客户端连接的监听端口 | | namesrvAddr | null | nameServer 地址 | | brokerIP1 | 网卡的 InetAddress | 当前 broker 监听的 IP | | brokerIP2 | 跟 brokerIP1 一样 | 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步 | | brokerName | null | broker 的名称 | | brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 | | brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave | | storePathRootDir | $HOME/store/ | 存储根路径 | | storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 | | mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 | ​ | | deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log | ​ | | fileReservedTime | 72 | 以小时计算的文件保留时间 | ​ | | brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE | ​ | | flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 | ​ | ### 客户端的公共配置 | 参数名 | 默认值 | 说明 | | ----------------------------- | ------- | ------------------------------------------------------------------------------------------------------------ | | namesrvAddr | | Name Server地址列表,多个NameServer地址用分号隔开 | | clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 | | instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) | | clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 | | pollNameServerInteval | 30000 | 轮询Name Server间隔时间,单位毫秒 | | heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 | | persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 | ### Producer配置 | 参数名 | 默认值 | 说明 | | -------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------------------------------------------ | | producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组 | | createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。 | | defaultTopicQueueNums | 4 | 在发送消息,自动创建服务器不存在的topic时,默认创建的队列数 | | sendMsgTimeout | 3000 | 发送消息超时时间,单位毫秒 | | compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 | | retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 | | retryTimesWhenSendFailed | 2 | 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用 | | maxMessageSize | 4MB | 客户端限制的消息大小,超过报错,同时服务端也会限制,所以需要跟服务端配合使用。 | | transactionCheckListener | | 事务消息回查监听器,如果发送事务消息,必须设置 | | checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池最小线程数 | | checkThreadPoolMaxSize | 1 | Broker回查Producer事务状态时,线程池最大线程数 | | checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 | | RPCHook | null | 该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作。 | ### PushConsumer配置 | 参数名 | 默认值 | 说明 | | ---------------------------- | ----------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | | messageModel | CLUSTERING | 消费模型支持集群消费和广播消费两种 | | consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从上次消费的位置开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的位置进行;一种是上次消费位置已经过期,则从当前队列第一条消息开始消费 | | consumeTimestamp | 半个小时前 | 只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起作用。 | | allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 | | subscription | | 订阅关系 | | messageListener | | 消息监听器 | | offsetStore | | 消费进度存储 | | consumeThreadMin | 20 | 消费线程池最小线程数 | | consumeThreadMax | 20 | 消费线程池最大线程数 | | consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 | | pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 | | pullInterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒 | | consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 | | pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 | ### PullConsumer配置 | 参数名 | 默认值 | 说明 | | -------------------------------- | ----------------------------- | -------------------------------------------------------------------------------------------------- | | consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | | brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 | | consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒 | | consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 | | messageModel | BROADCASTING | 消息支持两种模式:集群消费和广播消费 | | messageQueueListener | | 监听队列变化 | | offsetStore | | 消费进度存储 | | registerTopics | | 注册的topic集合 | | allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 | ### Message数据结构 | 字段名 | 默认值 | 说明 | | -------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | Topic | null | 必填,消息所属topic的名称 | | Body | null | 必填,消息体 | | Tags | null | 选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag | | Keys | null | 选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。 | | Flag | 0 | 选填,完全由应用来设置,RocketMQ不做干预 | | DelayTimeLevel | 0 | 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 | | WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答。 | ## 其他 1. 实际使用时,注意配置客户端日志大小,否则应用服务长时间运行将产生大量日志数据,默认最大堆积10G日志数据 ***关键代码*** ```java //配置每个日志文件大小 System.getProperty(ClientLogger.CLIENT_LOG_FILESIZE, String.valueOf(1024 * 1024 * 1)); //配置总共可以有多少个日志文件 System.getProperty(ClientLogger.CLIENT_LOG_MAXINDEX, String.valueOf(1)); ``` 2. 如果要打印RocketMQ相关日志,需要在配置文件中配置想要打印的日志类目 ***关键配置*** ```yml logging: level: root: INFO RocketmqClient: WARN RocketmqRemoting: WARN RocketmqCommon: WARN ``` 3. 消息会因为各种情况导致重消费,客户端要自己保证消息幂等性 - 客户端消费后宕机导致消费位点未更新到服务端 - 消息消费失败后的消息重投 - 并行消费模式下队列重复分配,导致多个实例消费同一个队里的消息 - 并行消费模式下,多批消息进行消费,后一批消息先消费完成,此时服务器宕机,还会从第一批消费点位起始位置开始拉取消息,导致后续已消费消息重消费 - 消息队列重新进行分配时,因为消费位点未及时更新可能导致,新消费者组实例从旧点位开始拉取消息消费 4. 扩容Topic队列时如何保障影响降低到最小 - 并发模式:可以随时增加队列 - 顺序模式:先停止生产,等消费者消费完成,然后就可以增加队列,最后开启生产 ## 附录