# rocketmq-java-spring-boot **Repository Path**: public_land/rocketmq-java-spring-boot ## Basic Information - **Project Name**: rocketmq-java-spring-boot - **Description**: RocketMQ 5.x版本的spring boot starter脚手架,基于新的gRpc/protobuf协议。 https://rocketmq.apache.org/zh/docs/sdk/01overview - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 29 - **Forks**: 9 - **Created**: 2023-08-02 - **Last Updated**: 2025-08-14 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 背景 RocketMQ client有两个版本,4.x及以下版本是基于remoting协议,连接端点为'name server';当前的5.x版本是基于新的gRpc/protobuf协议, 连接端点为'proxy server',适合于新的存储和计算分离架构。 RocketMQ官方已有一个spring boot starter脚手架,是基于remoting协议版本的client, 由于5.x版本与4.x及以下版本API不同,故不适用5.x版本。 在开发此项目时官方没有维护5.x系列的spring boot starter脚手架,因工作需要开发了此`rocketmq-java-spring-boot-starter`脚手架。只适用于5.x版本 RocketMQ,若在用老版本的服务,请仍使用老版本的脚手架。 **注: 从5.0.6版本开始框架集成了发送消息时进行流控处理,默认开启流控,可以修改配置来关闭流控。** ## 功能 - [x] 同步方式发送消息 - [x] 异步方式发送消息 - [x] 支持发送普通/顺序/定时延迟消息 - [x] 暂不支持事务消息 - [x] PushConsumer方式消费消息 - [x] SimpleConsumer方式消费消息(开发中) - [x] 支持用tag或SQL92表达式过滤消息 - [x] 支持鉴权和认证 - [x] 支持生产方发送消息时进行流控 ## 发送消息的流控功能说明 生产方发送消息推送到RocketMQ服务器,MQ服务器的承载能力不是无限的。尤其是对于云环境的MQ服务器,一般都有限制MQ服务器集群的收发消息速率, 并且不同的收发消息速率对应不同的套餐费用。比如阿里云上的RocketMQ服务集群,一些专业版限制收发TPS峰值在5000发送,5000接收,如果超出了 限制,整个服务器集群都会暂时受限,生产和消费消息都无法处理。另一方面,从自建服务器的成本来看,MQ服务器集群的规格是固定的,承载能力也是 有限的,为了不把服务器打垮,在业务场景允许的情况下,也要尽量减少TPS大小,减轻服务器负载。由此,发送消息时的速率限流就是一个必选项。 集成的流控是集群的维度,非单机,所以依赖redis做集群速率大小计算。流控计算本身使用了开源的[Bucket4j](https://bucket4j.com)框架, 工作机制是基于令牌桶的限流原理,非常优秀成熟。 Redis配置,限流的大小和开关配置,可以使用任何配置方式或配置中心,只要能和Spring框架集成起来就行。 在框架的样例工程里面,给了两种方式的配置例子:一个是在项目的本地配置文件`application.yml`,一个是使用了Nacos配置中心,与RocketMQ本身都是 在国内用到的较多的技术栈。支持限流大小和开关配置修改后,实时生效。 集成的流控功能完全以约定好的方式运行。约定的规范如下: 1. 业务消息生产方没有做限流配置时,默认在topic维度对topic进行限流,默认限流大小50qps每个topic。 2. 消息生产方的限流配置有两个维度 - topic维度,配置项名字是'.limit'名字;topic加tag维度(最细粒度),配置项名字是'..limit'的拼接。配置项大小就是限流大小(qps)。 3. 消息生产方的限流支持开关配置 - 全局的框架维度,设置`rocketmq-java.producer.enable-rate-limit=false`。默认为true,打开了限流 ```yaml rocketmq-java: producer: enable-rate-limit: false ``` - topic维度,开关配置的名字是'.enabled';topic加tag维度(最细粒度),开关配置的名字是'..enabled'。默认为true打开状态,当配置了false值时会关闭限流。 4. 消息生产方的限流配置动态调整,依赖于配置方式的实现。比如用Nacos配置中心,只需要开启动态刷新即可。 **举例说明:** 假设业务消息的topic和tag如下: ```yaml business: topic: biz_ecommerce tag: create ``` * 若限流配置没有设置'biz_ecommerce.enabled', 'biz_ecommerce.limit', 'biz_ecommerce.create.enabled', 'biz_ecommerce.create.limit'配置项,则当发送biz_ecommerce topic消息或(biz_ecommerce + create) topic&tag消息时,默认的限流在biz_ecommerce topic维度,限流大小是50 qps. * 若限流配置有如下设置, ```yaml biz_ecommerce: # enabled: true # limit: 100 create: # enabled: true limit: 1 ``` 则当发送(biz_ecommerce + create) topic&tag消息时,限流大小就是单独设置的1qps;发送biz_ecommerce topic消息还在biz_ecommerce topic维度限流,大小是50 qps. 若这时有另一个(biz_ecommerce + update) topic&tag消息,则其限流大小和biz_ecommerce topic共享,两者加起来是50qps. * 若限流配置变为如下设置, ```yaml biz_ecommerce: # enabled: true limit: 100 create: enabled: false limit: 1 ``` 则当发送(biz_ecommerce + create) topic&tag消息时就没有了限流。另外发送biz_ecommerce topic消息时限流大小增大到了100qps ## 前提要求 * JDK 1.8及以上 * Maven 3.0及以上 * Spring 2.x 系列版本 ## 使用方法 > 此脚手架版本跟随官方`rocketmq-client-java`版本进行迭代。 `git clone`下载源码,构建打包然后添加Maven依赖, ```xml cn.baiyang.rocketmq rocketmq-java-spring-boot-starter 5.0.6 ``` 主要RocketMQ使用配置(完整见API), ```yaml rocketmq-java: endpoints: {abcd.rmq.aliyuncs.com:8080} accesskey: {accesskey} secretkey: {secretkey} enable-ssl: true producer: topics: {topics seperated with comma} ``` ### Producer 脚手架根据配置会默认提供一个defaultProducer,在代码中直接使用即可;defaultProducer支持发送普通消息、顺序消息、延迟消息,不支持事务消息。 也可以定义自己的Producer,或定义多个。 Spring框架通常会遵循`Spring Messaging`风格,提供一个template来封装支持常见消息发送操作。 这里有两种风格, * 实例级别继承方式-主要是为了简化明了,满足需要 此脚手架提供了一个类似的`AbstractRocketMQSender`封装类,目的是为了 保证发送方一定要提供topic,tag或messageGroup参数; 同时参数最好以应用配置的形式存在。写一个继承该封装类的对象,再通过setProducer方法注入spring初始化好的Producer实例即可。 可以是defaultProducer也可以是其他自定义的。 * Template模版方式 样例配置, ```yaml rocketmq-java: endpoints: {abcd.rmq.aliyuncs.com:8080} accesskey: {accesskey} secretkey: {secretkey} enable-ssl: true producer: topics: {biz_ecom-example} yourbusiness: topic: {biz_ecom-example} tag: {create-example} ``` **样例代码**, * 实例级别继承方式 ```java @Component public class ProducerExample extends AbstractRocketMQSender { protected ProducerExample(@Value("${yourbusiness.topic}") String topic, @Value("${yourbusiness.tag}") String tag) { super(topic, tag); } @Resource @Qualifier("defaultProducer") @Override protected void setProducer(Producer producer) { this.producer = producer; } } ``` * Template模版方式 ```java @Component public class TemplateExample { @Resource private RocketMQProducerTemplate producerTemplate; @Value("${business.topic}") private String topic; @Value("${business.tag}") private String tag; public String testSending(String body) throws ClientException { return producerTemplate.send(topic, tag, body); } } ``` ### Consumer 脚手架提供完全以annotation的方式来完成一个Consumer,支持所有的配置。实现一个Consumer只需要实现`RocketMQListener`接口, 并配置RocketMQMessageListener标注即可。 具体的配置信息见`RocketMQMessageListener`对象。 样例配置(这些配置也都支持在Consumer对象标注上来设置), ```yaml rocketmq-java: endpoints: {abcd.rmq.aliyuncs.com:8080} accesskey: {accesskey} secretkey: {secretkey} enable-ssl: true yourbusiness: topic: {biz_ecom-example} tag: {create-example} consumer-group: {creating-group-example} ``` 样例代码, ```java @Slf4j @Service @RocketMQMessageListener( topic = "${yourbusiness.topic}", consumerGroup = "${yourbusiness.consumer-group}", selectorType = SelectorType.TAG, selectorExpression = "${yourbusiness.tag}" ) public class ListenerExample extends RocketMQListenerHelper implements RocketMQListener { @Override public ConsumeResult consume(MessageView messageView) { log.info("message: {}; --> body: {}", messageView, getBodyAsString(messageView)); //TODO Business Logic Here return ConsumeResult.SUCCESS; } } ``` **注**: RocketMQListenerHelper是一个工具类,提供了从MessageView获取消息体的方法`byte[] getBody(MessageView messageView)` 和`String getBodyAsString(MessageView messageView)`,增加代码复用能力。你也可以不继承这个工具类,完全自己来写。 > 脚手架目前没有提供MessageConverter的自动封装能力,因为这不是核心流程。 > 大家拿到消息体byte[]数组或String字符串对象后,自己可以根据业务上下游定义的数据格式,来对数据做处理,比如json反序列化等。 > 另一方面,如果完全封装后,一则可能仍然需要自己在代码中定义MessageConverter对象;二则如果想要拿到原始的MessageView对象, > 比如看消息的properties等,就不用再做额外配置。 后续再提供MessageConverter的封装。 ## 样例代码 [rocketmq-java-spring-boot-example](rocketmq-java-spring-boot-example) ## 贡献代码 欢迎发起Issue或PR来贡献/共享代码。 ## 许可证 [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation