# rocketmq-java-spring-boot **Repository Path**: zhanglei/rocketmq-java-spring-boot ## Basic Information - **Project Name**: rocketmq-java-spring-boot - **Description**: RocketMQ 5.x版本的spring boot starter脚手架,基于新的gRpc/protobuf协议。 https://rocketmq.apache.org/zh/docs/sdk/01overview - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 9 - **Created**: 2023-09-23 - **Last Updated**: 2023-09-23 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 背景 RocketMQ client有两个版本,4.x及以下版本是基于remoting协议,连接端点为'name server';当前的5.x版本是基于新的gRpc/protobuf协议, 连接端点为'proxy server',适合于新的存储和计算分离架构。 RocketMQ官方已有一个spring boot starter脚手架,是基于remoting协议版本的client, 由于5.x版本与4.x及以下版本API不同,故不适用5.x版本。 在开发此项目时官方没有维护5.x系列的spring boot starter脚手架,因工作需要开发了此`rocketmq-java-spring-boot-starter`脚手架。只适用于5.x版本 RocketMQ,若在用老版本的服务,请仍使用老版本的脚手架。 ## 功能 - [x] 同步方式发送消息 - [x] 异步方式发送消息 - [x] 支持发送普通/顺序/定时延迟消息 - [x] 暂不支持事务消息 - [x] PushConsumer方式消费消息 - [x] SimpleConsumer方式消费消息(开发中) - [x] 支持用tag或SQL92表达式过滤消息 - [x] 支持鉴权和认证 ## 前提要求 * JDK 1.8及以上 * Maven 3.0及以上 * Spring 2.x 系列版本 ## 使用方法 > 此脚手架版本跟随官方`rocketmq-client-java`版本进行迭代。 `git clone`下载源码,构建打包然后添加Maven依赖, ```xml cn.baiyang.rocketmq rocketmq-java-spring-boot-starter 5.0.5 ``` 主要RocketMQ使用配置(完整见API), ```yaml rocketmq-java: endpoints: {abcd.rmq.aliyuncs.com:8080} accesskey: {accesskey} secretkey: {secretkey} enable-ssl: true producer: topics: {topics seperated with comma} ``` ### Producer 脚手架根据配置会默认提供一个defaultProducer,在代码中直接使用即可;defaultProducer支持发送普通消息、顺序消息、延迟消息,不支持事务消息。 也可以定义自己的Producer,或定义多个。 Spring框架通常会遵循`Spring Messaging`风格,提供一个template来封装支持常见消息发送操作;这里没有遵循此风格,主要是为了简化明了,符合实际 需要最简单;且template里面封装了太多一般不需要的API,看起来费事费力。此脚手架提供了一个类似的`AbstractRocketMQSender`封装类,目的是为了 保证发送方一定要提供topic,tag或messageGroup参数;同时参数最好以应用配置的形式存在。写一个继承该封装类的对象,再通过setProducer方法注入 spring初始化好的Producer实例即可。可以是defaultProducer也可以是其他自定义的。 样例配置, ```yaml rocketmq-java: endpoints: {abcd.rmq.aliyuncs.com:8080} accesskey: {accesskey} secretkey: {secretkey} enable-ssl: true producer: topics: {biz_ecom-example} yourbusiness: topic: {biz_ecom-example} tag: {create-example} ``` 样例代码, ```java import cn.baiyang.rocketmq.spring.core.AbstractRocketMQSender; import javax.annotation.Resource; import org.apache.rocketmq.client.apis.producer.Producer; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class ProducerExample extends AbstractRocketMQSender { protected ProducerExample(@Value("${yourbusiness.topic}") String topic, @Value("${yourbusiness.tag}") String tag) { super(topic, tag); } @Resource @Qualifier("defaultProducer") @Override protected void setProducer(Producer producer) { this.producer = producer; } } ``` ### Consumer 脚手架提供完全以annotation的方式来完成一个Consumer,支持所有的配置。实现一个Consumer只需要实现`RocketMQListener`接口, 并配置RocketMQMessageListener标注即可。 具体的配置信息见`RocketMQMessageListener`对象。 样例配置(这些配置也都支持在Consumer对象标注上来设置), ```yaml rocketmq-java: endpoints: {abcd.rmq.aliyuncs.com:8080} accesskey: {accesskey} secretkey: {secretkey} enable-ssl: true yourbusiness: topic: {biz_ecom-example} tag: {create-example} consumer-group: {creating-group-example} ``` 样例代码, ```java import cn.baiyang.rocketmq.spring.annotation.RocketMQMessageListener; import cn.baiyang.rocketmq.spring.annotation.SelectorType; import cn.baiyang.rocketmq.spring.core.RocketMQListener; import cn.baiyang.rocketmq.spring.core.RocketMQListenerHelper; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.message.MessageView; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; @Slf4j @Service @RocketMQMessageListener( topic = "${yourbusiness.topic}", consumerGroup = "${yourbusiness.consumer-group}", selectorType = SelectorType.TAG, selectorExpression = "${yourbusiness.tag}" ) public class ListenerExample extends RocketMQListenerHelper implements RocketMQListener { @Override public ConsumeResult consume(MessageView messageView) { log.info("message: {}; --> body: {}", messageView, getBodyAsString(messageView)); //TODO Business Logic Here return ConsumeResult.SUCCESS; } } ``` **注**: RocketMQListenerHelper是一个工具类,提供了从MessageView获取消息体的方法`byte[] getBody(MessageView messageView)` 和`String getBodyAsString(MessageView messageView)`,增加代码复用能力。你也可以不继承这个工具类,完全自己来写。 > 脚手架目前没有提供MessageConverter的自动封装能力,因为这不是核心流程。 > 大家拿到消息体byte[]数组或String字符串对象后,自己可以根据业务上下游定义的数据格式,来对数据做处理,比如json反序列化等。 > 另一方面,如果完全封装后,一则可能仍然需要自己在代码中定义MessageConverter对象;二则如果想要拿到原始的MessageView对象, > 比如看消息的properties等,就不用再做额外配置。 后续再提供MessageConverter的封装。 ## 样例代码 [rocketmq-java-spring-boot-example](rocketmq-java-spring-boot-example) ## 贡献代码 欢迎发起Issue或PR来贡献/共享代码。 ## 许可证 [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation