# rocketmq源码注释学习 **Repository Path**: caoyanan/rocketmq-source-code ## Basic Information - **Project Name**: rocketmq源码注释学习 - **Description**: rocketmq源码注释学习 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 3 - **Forks**: 0 - **Created**: 2021-01-21 - **Last Updated**: 2021-07-25 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # rocketmq源码注释学习 ## 场景1:NameServer的启动 ## 场景2:Broker的启动 ## 场景3:Broker向NameServer注册 1. 在brokerstart过程中,会执行向NameServer注册的过程 ```java ... this.registerBrokerAll(true, false, true); ... ``` 2. 创建topic配置,利用对外通信组件brokerOuterAPI将broker注册到所有的NameServer上 ```java ... this.brokerOuterAPI.registerBrokerAll( ...) ... ``` 3. 注册过程中,首先构建请求头、请求体,通过 NettyRemotingClient组件将连接请求发送出去 ```java ... this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); ... ``` 4. NettyRemotingClient网络请求步骤,首先,创建一个channel连接,使用Netty的Bootstrap类的connect方法,然后发送请求 ```java ...缓存获取 this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); this.invokeSyncImpl(channel, request, timeoutMillis - costTime); ``` 5. 使用channel连接将请求发送出去 ```java //发送请求详情时基于channel将网络请求写出去 channel.writeAndFlush() ``` 6. broker定时发送心跳到nameserver 在 BrokerController 的 start() 方法里面启动了一个定时任务 ```java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); ``` 默认每30秒发送一次注册请求 ## 场景4,NameServer如何处理broker发送的注册请求? 1. 在 NamesrvController的初始化initialize()方法中,注册了网络请求处理器processor ```java this.registerProcessor(); //实现里面注册了 DefaultRequestProcessor处理器 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); ``` 2. 所以,网络请求都被 DefaultRequestProcessor 处理器接收处理的 processRequest方法进行处理,不同的code对应不同的处理方法 接收broker注册请求的方法是 this.registerBroker(ctx, request); 3. DefaultRequestProcessor::registerBroker方法如何完成的注册broker 解析注册请求,构建返回响应 加载topic配置 然后调用了 RouteInfoManager 核心路由信息管理组件的注册broker方法 ```java RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( requestHeader.getClusterName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerName(), requestHeader.getBrokerId(), requestHeader.getHaServerAddr(), topicConfigWrapper, null, ctx.channel() ); ``` 4. RouteInfoManager 组件式如何注册broker的 RouteInfoManager 将broker数据维护到自己的路由数据表里面 使用了读写锁,一开始加了一个写锁,同一时间,只能一个线程执行 然后有一个维护了所有BrokerName的Set集和, 然后有一个 brokerAddrTable 核心路由数据表。并且维护了一个brokerLiveTable 5. 在 NamesrvController 的 initialize() 方法启动不活跃定时扫描 调用了RouteInfoManager的scanNotActiverBroker()方法 ```java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); ``` 如果发现broker在规定的时间内没有发送心跳,就把broker移除出去 ## 场景5 producer组件 1. 创建流程 创建 DefaultMQProducer类,设置nameServer的地址,调用start方法 而实际上是 DefaultMQProducerImpl这个类才是真正执行发送逻辑的组件 2. topic路由数据 producer向nameServer获取topic的路由数据,比如topic有哪些MessageQueue, 每个MessageQueue在哪台broker上 当第一次发送消息到topic的时候,去拉取topic路由数据,然后选择一个MessageQueue,和对应的broker建立连接,然后发送消息 3. 如何从nameserver拉取topic数据 当发送消息的时候,调用 DefaultMQProducer::send() 方法, 最终路由到 DefaultMQProducerImpl::sendDefaultImpl()方法 获取topic信息: ```java TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); ``` 如果本地缓存存在,直接获取,否则向NameServer拉取 ```java updateTopicRouteInfoFromNameServer(topic, false, null); ``` 拉取的过程就是封装一个request请求对象,利用底层netty的客户端发送请求到NameServer,接收到一个Response对象,取出topic路由数据 4. producer选择哪个Messagequeue去发送消息 在 DefaultMQProducer::sendDefaultImpl()方法中,获取完topic信息之后,选择了一个MessageQueue ```java MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); ``` 如果sendLatencyFaultEnable是true,则就是一个对broker取模的简单轮询 这里面实现了broker故障自动回避机制 5. producer是如何把消息发送到broker的? 调用了 DefaultMQProducer的sendKernelImpl方法 ```java this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ```