# springboot-demo-rocketmq **Repository Path**: ithua-springboot/springboot-demo-rocketmq ## Basic Information - **Project Name**: springboot-demo-rocketmq - **Description**: - springboot2.x - rocketMQ - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2019-11-25 - **Last Updated**: 2020-12-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # springboot-demo-rocketmq #### 介绍 - springboot2.x - rocketMQ ### 搭建 `RocketMQ` #### 安装 【参考地址】 - `https://www.imooc.com/article/290089` - `http://www.itmuch.com/books/rocketmq/operation.html` ##### 下载 - 下载地址:`http://rocketmq.apache.org/release_notes/` - 下载命令: ```bash wget https://archive.apache.org/dist/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip ``` ##### 安装 解压压缩包完成安装。 ```bash $ unzip rocketmq-all-4.6.0-bin-release.zip ``` #### 启动 - 切换到 `RockeMQ` 根目录 ```bash $ cd rocketmq-all-4.6.0-bin-release ``` - 启动 `NameServer` ```bash $ nohup sh bin/mqnamesrv & ``` - 验证是否启动成功: ```bash $ tail -f ~/logs/rocketmqlogs/namesrv.log # 如果成功启动,能看到类似如下的日志: 2020-03-01 01:13:46 INFO main - The Name Server boot success. serializeType=JSON ``` - 启动 `Broker` ```bash $ nohup sh bin/mqbroker -n localhost:9876 & ``` - 验证是否启动成功: ```bash $ tail -f ~/logs/rocketmqlogs/broker.log # 如果启动成功,能看到类似如下的日志: 2020-03-01 01:21:14 INFO main - The broker[MBP-weiguohua.local, 192.168.0.101:10911] boot success. serializeType=JSON and name server is localhost:9876 ``` #### 验证 验证 `RocketMQ` 功能是否正常。 ##### 验证生产者 ```bash $ export NAMESRV_ADDR=localhost:9876 $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer # 如若成功,会看到类似如下信息输出 SendResult [sendStatus=SEND_OK, msgId=C0A8006500003764951D004DF97C03E7, ``` ##### 验证消费者 ```bash $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer # 如若成功,会看到类似如下信息输出 ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=1, storeSize=203, ``` #### 停止 按顺序执行如下命令: ```bash $ sh bin/mqshutdown broker # 成功信息: Send shutdown request to mqbroker(61461) OK $ sh bin/mqshutdown namesrv # 成功信息: Send shutdown request to mqnamesrv(61312) OK ``` ### 搭建 `RocketMQ` 控制台 #### 下载控制台代码 ```bash # 命令行下载方式 git clone https://github.com/apache/rocketmq-externals.git # 或者 gitee git@gitee.com:mirrors/RocketMQ-Externals.git ``` ```http # 直接下载方式 https://codeload.github.com/apache/rocketmq-externals/zip/master ``` #### 修改控制台代码 ##### 修改配置 找到 `rocketmq-console/src/main/resources/application.properties` 根据需求修改配置。 ```bash # 管理后台访问上下文路径,默认为空 # 如果填写,需写成/xxx的形式,例如/console server.contextPath= # 控制台的端口 server.port=8080 ... # if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 # Name Server地址 rocketmq.config.namesrvAddr= # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true #set the message track trace topic if you don't want use the default one rocketmq.config.msgTrackTopicName= rocketmq.config.ticketKey=ticket #Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required rocketmq.config.loginRequired=false ``` **只需修改如下**: ```bash # console端口 server.port=8088 # name server地址 # 也可以不修改,在启动完console后,在控制台导航栏 - 运维 - NameSvrAddrList一栏设置 rocketmq.config.namesrvAddr=localhost:9876 ``` ##### 修改依赖 修改 `rocketmq-console/pom.xml` ,修改 `RocketMQ` 相关依赖的版本。 ```xml 4.4.0 ``` 改成 ```xml 4.6.0 ``` ##### 修改代码 因编译报错要修改代码。 - 修改 `MessageServiceImpl` 的 `queryMessageByTopic` 方法: ```java // rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java#queryMessageByTopic DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null); ``` 改为: ```java // rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java#queryMessageByTopic RPCHook rpcHook = null; DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); ``` - 修改 `MQAdminExtImpl` 类,实现 `MQAdminExt` 的所有接口,不让其报错。 - 取消代码检查 ```xml ``` ##### 打包构建 - 切换到控制台目录 ```bash cd rocketmq-externals/rocketmq-console ``` - 构建 ```bash mvn clean package -DskipTests ``` #### 启动 ```bash java -jar rocketmq-externals/rocketmq-console/target/rocketmq-console-ng-1.0.1.jar ``` #### 访问 ```http http://localhost:8088 ``` ### 整合 `spring-boot` #### 添加依赖 ```groovy implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.0' ``` #### 编写生产者 - 添加配置 ```yml # application.yml 添加 rocketmq: name-server: localhost:9876 producer: # 必须指定group group: test-group ``` - 创建消息体模型 ```java @Data @Accessors(chain = true) public class UserDTO { private String username; private String phoneNumber; private String message; } ``` - 发送消息 ```java @Autowired private RocketMQTemplate rocketMQTemplate; ``` ```java UserMsgDTO userMsgDTO = new UserMsgDTO() .setUsername("韦xx") .setPhoneNumber("188......87") .setMessage("userMsgDTO---message"); rocketMQTemplate.convertAndSend("user-msg", userMsgDTO); // user-msg 是topic ``` #### 编写消费者 - 添加配置 ```yaml # application.yml 添加 rocketmq: name-server: localhost:9876 # 要和生产者服务的一样 ``` - 创建消息体模型(要和生产者的 消息体模型 一样) ```java @Data @Accessors(chain = true) public class UserMsgDTO { private String username; private String phoneNumber; private String message; } ``` - 创建消费者监听器 ```java @Slf4j @Service @RocketMQMessageListener(consumerGroup = "consumer-group", topic = "user-msg") public class UserMsgListener implements RocketMQListener { @Override public void onMessage(UserMsgDTO message) { // 当接收到消息的时候执行的业务 log.debug("用户消息消费者,接收消息: {}", message); } } ```