This action will force synchronization from Thinkingcao/java-legendary, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
[TOC]
先来看看大佬们的代码是怎么写的:
package org.apache.rocketmq.common.protocol.header;
/**
* Use short variable name to speed up FastJson deserialization process.
*/
public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
@CFNotNull
private String a; // producerGroup;
@CFNotNull
private String b; // topic;
@CFNotNull
private String c; // defaultTopic;
@CFNotNull
private Integer d; // defaultTopicQueueNums;
@CFNotNull
private Integer e; // queueId;
@CFNotNull
private Integer f; // sysFlag;
@CFNotNull
private Long g; // bornTimestamp;
@CFNotNull
private Integer h; // flag;
@CFNullable
private String i; // properties;
@CFNullable
private Integer j; // reconsumeTimes;
@CFNullable
private boolean k; // unitMode = false;
private Integer l; // consumeRetryTimes
@CFNullable
private boolean m; //batch
}
这些abcdefg你爱了吗?以后我就这么写变量名,以注释写真实的含义。谁在骂我我就贴给他顶级开源项目的代码,让他自己慢慢品。
见【2、多个mq如何选型?】
MQ | 描述 |
---|---|
RabbitMQ | erlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。 |
RocketMQ | java开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息。 |
Kafka | Scala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。 |
ActiveMQ | java开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。推荐用互联网主流的。 |
因为项目比较大,做了分布式系统,所有远程服务调用请求都是同步执行经常出问题,所以引入了mq
作用 | 描述 |
---|---|
解耦 | 系统耦合度降低,没有强依赖关系 |
异步 | 不需要同步执行的远程调用可以有效提高响应时间 |
削峰 | 请求达到峰值后,后端service还可以保持固定消费速率消费,不会被压垮 |
角色 | 作用 |
---|---|
Nameserver | 无状态,动态列表;这也是和zookeeper的重要区别之一。zookeeper是有状态的。 |
Producer | 消息生产者,负责发消息到Broker。 |
Broker | 就是MQ本身,负责收发消息、持久化消息等。 |
Consumer | 消息消费者,负责从Broker上拉取消息进行消费,消费完进行ack。 |
queue就是来源于数据结构的FIFO队列。而Topic是个抽象的概念,每个Topic底层对应N个queue,而数据也真实存在queue上的。
不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。
4.6版本默认48小时后会删除不再使用的CommitLog文件
源码如下:
/**
* {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()}
*/
private boolean isTimeToDelete() {
// when = "04";
String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
// 是04点,就返回true
if (UtilAll.isItTimeToDo(when)) {
return true;
}
// 不是04点,返回false
return false;
}
/**
* {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()}
*/
private void deleteExpiredFiles() {
// isTimeToDelete()这个方法是判断是不是凌晨四点,是的话就执行删除逻辑。
if (isTimeToDelete()) {
// 默认是72,但是broker配置文件默认改成了48,所以新版本都是48。
long fileReservedTime = 48 * 60 * 60 * 1000;
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx);
}
}
/**
* {@link org.apache.rocketmq.store.CommitLog#deleteExpiredFile()}
*/
public int deleteExpiredFile(xxx) {
// 这个方法的主逻辑就是遍历查找最后更改时间+过期时间,小于当前系统时间的话就删了(也就是小于48小时)。
return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx);
}
消费模型由Consumer决定,消费维度为Topic。
1.一条消息只会被同Group中的一个Consumer消费
2.多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式
broker端属性 longPollingEnable 标记是否开启长轮询。默认开启
源码如下:
// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}
// 看到没,这是一只披着羊皮的狼,名字叫PushConsumerImpl,实际干的确是pull的活。
// 拉取消息,结果放到pullCallback里
this.pullAPIWrapper.pullKernelImpl(pullCallback);
事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。
如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。
Consumer首次请求Broker
通过Topic在多Broker中分布式存储实现。
发送端指定message queue发送消息到相应的broker,来达到写入时的负载均衡
默认策略是随机选择:
其他实现:
也可以自定义实现MessageQueueSelector接口中的select方法
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
采用的是平均分配算法来进行负载均衡。
其他负载均衡算法
平均分配策略(默认)(AllocateMessageQueueAveragely) 环形分配策略(AllocateMessageQueueAveragelyByCircle) 手动配置分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) 一致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby)
Consumer和queue会优先平均分配,如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况,如果Consumer等于queue的个数,那就是一个Consumer消费一个queue,如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。
影响消息正常发送和消费的重要原因是网络的不确定性。
引起重复消费的原因
正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除
当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer
在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次
解决方案
处理消息前,使用消息主键在表中带有约束的字段中insert
单机时可以使用map ConcurrentHashMap -> putIfAbsent guava cache
分布式锁搞起来。
你们线上业务用消息中间件的时候,是否需要保证消息的顺序性?
如果不需要保证消息顺序,为什么不需要?假如我有一个场景要保证消息的顺序,你们应该如何保证?
首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。所以总结如下:
同一topic,同一个QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。
利用唯一id去定位Queue,同一id的消息发向相同队列。
Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断i % 2 == 0
,那就都放到queue1里,否则放到queue2里。
for (int i = 0; i < 5; i++) {
Message message = new Message("orderTopic", ("hello!" + i).getBytes());
producer.send(
// 要发的那条消息
message,
// queue 选择器 ,向 topic中的哪个queue去写消息
new MessageQueueSelector() {
// 手动 选择一个queue
@Override
public MessageQueue select(
// 当前topic 里面包含的所有queue
List<MessageQueue> mqs,
// 具体要发的那条消息
Message msg,
// 对应到 send() 里的 args,也就是2000前面的那个0
Object arg) {
// 向固定的一个queue里写消息,比如这里就是向第一个queue里写消息
if (Integer.parseInt(arg.toString()) % 2 == 0) {
return mqs.get(0);
} else {
return mqs.get(1);
}
}
},
// 自定义参数:0
// 2000代表2000毫秒超时时间
i, 2000);
}
会。所以当使用顺序消费的回调(实现了MessageListenerOrderly接口)时,由于顺序消费是只有前一条消息消费成功了才能继续,所以在其消息状态定义(ConsumeOrderlyStatus)中并没有RECONSUME_LATER状态,而是用SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停当前队列的消费动作,直到消息经过不断重试成功为止。
首先在如下三个部分都可能会出现丢失消息的情况:
producer.setRetryTimesWhenSendFailed(10);
flushDiskType = SYNC_FLUSH
下游消费系统如果宕机了,导致几百万条消息在消息中间件里积压,此时怎么处理?
你们线上是否遇到过消息积压的生产故障?如果没遇到过,你考虑一下如何应对?
首先要找到是什么原因导致的消息堆积,是Producer太多了,Consumer太少了导致的还是说其他情况,总之先定位问题。
然后看下消息消费速度是否正常,正常的话,可以通过上线更多consumer临时解决消息堆积问题
准备一个临时的topic
queue的数量是堆积的几倍
queue分布到多Broker中
上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去
上线N台Consumer同时消费临时Topic中的数据
改bug
恢复原来的Consumer,继续消费之前的Topic
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),16次(默认16次)才会进入死信队列(%DLQ%+ConsumerGroup)。
源码如下:
public class SubscriptionGroupConfig {
private int retryMaxTimes = 16;
}
// {@link org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncConsumerSendMsgBack}
// maxReconsumeTimes = 16
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
// 如果重试次数大于等于16,则创建死信队列
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
// MixAll.getDLQTopic()就是给原有groupname拼上DLQ,死信队列
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
// 创建死信队列
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(xxx)
}
扩展:每次重试的时间间隔:
public class MessageStoreConfig {
// 每隔如下时间会进行重试,到最后一次时间重试失败的话就进入死信队列了。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
看到这个源码你可能蒙蔽了,这不是18个时间间隔嘛。怎么是16次?继续看下面代码,我TM也懵了。
/** * {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack()} * * sendMessageBack()这个方法是消费失败后会请求他,意思是把消息重新放到队列,进行重试。 */ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) { Message newMsg = new Message(); // !!!我TM,真相了,3 + xxx。他是从第三个开始的。也就是舍弃了前两个时间间隔,18 - 2 = 16。也就是说第一次重试是在10s,第二次30s。 // TMD!!! // TMD!!! // TMD!!! newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); }
你们用的是RocketMQ?RocketMQ很大的一个特点是对分布式事务的支持,你说说他在分布式事务支持这块机制的底层原理?
分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性
RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致
RocketMQ实现方式:
Half Message: 预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态: Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。
超时: 如果超过回查次数,默认回滚消息。
也就是他并未真正进入Topic的queue,而是用了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic下的queue。
我个人觉得从以下几个点回答吧:
要真让我说,我会吐槽蛮烂的,首先没任何注释,可能是之前阿里巴巴写了中文注释,捐赠给apache后,apache觉得中文注释不能留,自己又懒得写英文注释,就都给删了。里面比较典型的设计模式有单例、工厂、策略、门面模式。单例工厂无处不在,策略印象深刻比如发消息和消费消息的时候queue的负载均衡就是N个策略算法类,有随机、hash等,这也是能够快速扩容天然支持集群的必要原因之一。持久化做的也比较完善,采取的CommitLog来落盘,同步异步两种方式。
同一group下,多机部署,并行消费
单个Consumer提高消费线程个数
批量消费
其实就是send消息的时候queue的选择。源码在如下:
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue()
Broker主从架构以及多副本策略。Master收到消息后会同步给Slave,这样一条消息就不止一份了,Master宕机了还有slave中的消息可用,保证了MQ的可靠性和高可用性。而且Rocket MQ4.5.0开始就支持了Dlegder模式,基于raft的,做到了真正意义的HA。
这么问明显在坑你,因为Broker会向所有的NameServer上注册自己的信息,而不是某一个,是每一个,全部!
1)首先启动NameServer。NameServer启动后监听端口,等待Broker、Producer以及Consumer连上来
2)启动Broker。启动之后,会跟所有的NameServer建立并保持一个长连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系
3)创建Topic。创建时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
4)Producer发送消息。启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic所在的Broker;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送
5)Consumer消费消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,进行消息的消费
RocketMq采用文件系统存储消息,采用顺序写的方式写入消息,使用零拷贝发送消息,这三者的结合极大地保证了RocketMq的性能。
定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。 其实定时消息实现原理比较简单,如果一个topic对应的消息在发送端被设置为定时消息,那么会将该消息先存放在topic为SCHEDULE_TOPIC_XXXX的消息队列中,并将原始消息的信息存放在commitLog文件中,由于topic为SCHEDULE_TOPIC_XXXX,所以该消息不会被立即消息,然后通过定时扫描的方式,将到达延迟时间的消息,转换为正确的消息,发送到相应的队列进行消费。
也称1M。简单来说就是只部署一台Broker(MQ本身)作为主节点提供服务。
这种方式风险最大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用此方式。
也称NM(多主)模式。简单来说就是多个Broker都作为主节点(可读可写)去提供服务。无Slave,全是Master。
优点:配置简单,单个Master宕机或者重启维护对应用无影响,在磁盘配置为RAID10(不懂的Google了解下就行了)时,即使机器宕机不可恢复的情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘会丢失少量,同步刷盘一条不丢,下面会详细说到),性能是最高的。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
例如:
一个Producer的A主题要发送10条消息(一个主题的消息可以发送到不同Master上)到M1和M2两台Broker上,比如M1和M2各五条,当发送完M1和M2的时候恰巧M2宕机了,这时候Consumer是无法继续消费M2上的消息的,所以会消息实时性会受到影响。只能等到M2重新恢复后才可继续消费。数据不会丢失。
也称多Master多Slave,异步复制的方式。简单来说就是N台Master,每个Master又有N台(一般1台)Slave进行主从。形成NMNS。
每个Master配置N(一般是1)个Slave,有多对Master-Slave,HA采用异步复制方式,主备短暂消息延迟(毫秒级别的延迟)。
优点:即使磁盘损坏,丢失的消息非常少**(但是会丢失)**,且消息实时性不会受到影响,因为Master宕机后,消费者仍然可以从Slave上进行消费,此过程对应用完全透明,不需要人工干预,性能同多Master模式几乎一样。
缺点:Master宕机,磁盘损坏情况,会丢失少量消息。
目前主宕机后,备机不能自动切换为主节点。(4.3.1版本)
为什么会丢失少量消息?
因为异步复制原理是Producer发送消息到Broker,这时候Broker有Master和Slave,当Master确认将消息存储成功后就会立刻给Producer返回一个ack,而并不会等到Slave也存储成功后才会发送ack,也就说是这时候Slave的异步复制过去的,若Master存储完毕,返回ack了,突然之间宕机了(毫秒级别的,但是也有可能发生呀),并且磁盘损坏了(无法恢复了)。这时候数据就丢失了。
也称多Master多Slave,同步双写的方式。简单来说就是N台Master,每个Master又有N台(一般1台)Slave进行主从。形成NMNS。
每个Master配置N(一般是1)个Slave,有多对Master-Slave,HA采用同步双写的方式,主备都写成功,向应用返回ack确认。
优点:数据与服务都没有单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。数据不会丢失。
缺点:性能比异步复制模式略低,大约低10%左右,发送单个消息的RT会略高。
目前主宕机后,备机不能自动切换为主节点。(4.3.1版本)
为什么不会丢失少量消息?
因为同步双写原理是Producer发送消息到Broker,这时候Broker有Master和Slave,当Master确认将消息存储成功并且等到Slave也同步存储成功后才会发送ack确认给Producer,所以数据不会丢。
RocketMQ 4.5之后支持了一种叫做Dledger机制,基于Raft协议实现的一个机制。基于Dledger实现RocketMQ高可用自动切换。RocketMQ 4.5版本之前需要手动改配置进行切换。
RocketMq采用文件系统存储消息,并采用顺序写写入消息,使用零拷贝发送消息,极大得保证了RocketMq的性能。
如图所示,消息生产者发送消息到broker,都是会按照顺序存储在CommitLog文件中,每个commitLog文件的大小为1G
CommitLog:存储所有的消息元数据,包括Topic、QueueId以及message
CosumerQueue:消费逻辑队列:存储消息在CommitLog的offset
IndexFile:索引文件:存储消息的key和时间戳等信息,使得RocketMq可以采用key和时间区间来查询消息
也就是说,rocketMq将消息均存储在 CommitLog 中,并分别提供了CosumerQueue和IndexFile两个索引,来快速检索消息
Broker会定时(30s)向NameServer发送心跳
然后 NameServer会定时(10s)运行一个任务,去检查一下各个Broker的最近一次心跳时间,如果某个Broker超过120s都没发送心跳了,那么就认为这个Broker已经挂掉了。
RocketMQ 4.5版本之前,用Slave Broker同步数据,尽量保证数据不丢失,但是一旦Master故障了,Slave是没法自动切换成Master的。
所以在这种情况下,如果Master Broker宕机了,这时就得手动做一些运维操作,把Slave Broker重新修改一些配置,重启机器给调整为Master Broker,这是有点麻烦的,而且会导致中间一段时间不可用。
RocketMQ 4.5之后支持了一种叫做Dledger机制,基于Raft协议实现的一个机制。基于Dledger实现RocketMQ高可用自动切换。
我们可以让一个Master Broker对应多个Slave Broker, 一旦 Master Broker 宕机了,在多个 Slave 中通过 Dledger 技术 将一个 Slave Broker 选为新的 Master Broker 对外提供服务。在生产环境中可以是用Dledger机制实现自动故障切换,只要10秒或者几十秒的时间就可以完成
先说的小坑,同一个消费组,必须保证订阅消息一致。tag是消费组级别的信息,Key为topic,Value为tag。
过滤发生在消费者客户端,如果消费者发现tag不对应就丢弃请求。
生产者层面:
消费者层面:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。