# springboot-demo-rocketmq
**Repository Path**: ithua-springboot/springboot-demo-rocketmq
## Basic Information
- **Project Name**: springboot-demo-rocketmq
- **Description**: - springboot2.x
- rocketMQ
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2019-11-25
- **Last Updated**: 2020-12-20
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# springboot-demo-rocketmq
#### 介绍
- springboot2.x
- rocketMQ
### 搭建 `RocketMQ`
#### 安装
【参考地址】
- `https://www.imooc.com/article/290089`
- `http://www.itmuch.com/books/rocketmq/operation.html`
##### 下载
- 下载地址:`http://rocketmq.apache.org/release_notes/`
- 下载命令:
```bash
wget https://archive.apache.org/dist/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip
```
##### 安装
解压压缩包完成安装。
```bash
$ unzip rocketmq-all-4.6.0-bin-release.zip
```
#### 启动
- 切换到 `RockeMQ` 根目录
```bash
$ cd rocketmq-all-4.6.0-bin-release
```
- 启动 `NameServer`
```bash
$ nohup sh bin/mqnamesrv &
```
- 验证是否启动成功:
```bash
$ tail -f ~/logs/rocketmqlogs/namesrv.log
# 如果成功启动,能看到类似如下的日志:
2020-03-01 01:13:46 INFO main - The Name Server boot success. serializeType=JSON
```
- 启动 `Broker`
```bash
$ nohup sh bin/mqbroker -n localhost:9876 &
```
- 验证是否启动成功:
```bash
$ tail -f ~/logs/rocketmqlogs/broker.log
# 如果启动成功,能看到类似如下的日志:
2020-03-01 01:21:14 INFO main - The broker[MBP-weiguohua.local, 192.168.0.101:10911] boot success. serializeType=JSON and name server is localhost:9876
```
#### 验证
验证 `RocketMQ` 功能是否正常。
##### 验证生产者
```bash
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 如若成功,会看到类似如下信息输出
SendResult [sendStatus=SEND_OK, msgId=C0A8006500003764951D004DF97C03E7,
```
##### 验证消费者
```bash
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 如若成功,会看到类似如下信息输出
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=1, storeSize=203,
```
#### 停止
按顺序执行如下命令:
```bash
$ sh bin/mqshutdown broker # 成功信息: Send shutdown request to mqbroker(61461) OK
$ sh bin/mqshutdown namesrv # 成功信息: Send shutdown request to mqnamesrv(61312) OK
```
### 搭建 `RocketMQ` 控制台
#### 下载控制台代码
```bash
# 命令行下载方式
git clone https://github.com/apache/rocketmq-externals.git
# 或者 gitee
git@gitee.com:mirrors/RocketMQ-Externals.git
```
```http
# 直接下载方式
https://codeload.github.com/apache/rocketmq-externals/zip/master
```
#### 修改控制台代码
##### 修改配置
找到 `rocketmq-console/src/main/resources/application.properties` 根据需求修改配置。
```bash
# 管理后台访问上下文路径,默认为空
# 如果填写,需写成/xxx的形式,例如/console
server.contextPath=
# 控制台的端口
server.port=8080
...
# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
# Name Server地址
rocketmq.config.namesrvAddr=
# if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=false
```
**只需修改如下**:
```bash
# console端口
server.port=8088
# name server地址 # 也可以不修改,在启动完console后,在控制台导航栏 - 运维 - NameSvrAddrList一栏设置
rocketmq.config.namesrvAddr=localhost:9876
```
##### 修改依赖
修改 `rocketmq-console/pom.xml` ,修改 `RocketMQ` 相关依赖的版本。
```xml
4.4.0
```
改成
```xml
4.6.0
```
##### 修改代码
因编译报错要修改代码。
- 修改 `MessageServiceImpl` 的 `queryMessageByTopic` 方法:
```java
// rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java#queryMessageByTopic
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
```
改为:
```java
// rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java#queryMessageByTopic
RPCHook rpcHook = null;
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
```
- 修改 `MQAdminExtImpl` 类,实现 `MQAdminExt` 的所有接口,不让其报错。
- 取消代码检查
```xml
```
##### 打包构建
- 切换到控制台目录
```bash
cd rocketmq-externals/rocketmq-console
```
- 构建
```bash
mvn clean package -DskipTests
```
#### 启动
```bash
java -jar rocketmq-externals/rocketmq-console/target/rocketmq-console-ng-1.0.1.jar
```
#### 访问
```http
http://localhost:8088
```
### 整合 `spring-boot`
#### 添加依赖
```groovy
implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.0'
```
#### 编写生产者
- 添加配置
```yml
# application.yml 添加
rocketmq:
name-server: localhost:9876
producer:
# 必须指定group
group: test-group
```
- 创建消息体模型
```java
@Data
@Accessors(chain = true)
public class UserDTO {
private String username;
private String phoneNumber;
private String message;
}
```
- 发送消息
```java
@Autowired
private RocketMQTemplate rocketMQTemplate;
```
```java
UserMsgDTO userMsgDTO = new UserMsgDTO()
.setUsername("韦xx")
.setPhoneNumber("188......87")
.setMessage("userMsgDTO---message");
rocketMQTemplate.convertAndSend("user-msg", userMsgDTO); // user-msg 是topic
```
#### 编写消费者
- 添加配置
```yaml
# application.yml 添加
rocketmq:
name-server: localhost:9876 # 要和生产者服务的一样
```
- 创建消息体模型(要和生产者的 消息体模型 一样)
```java
@Data
@Accessors(chain = true)
public class UserMsgDTO {
private String username;
private String phoneNumber;
private String message;
}
```
- 创建消费者监听器
```java
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "user-msg")
public class UserMsgListener implements RocketMQListener {
@Override
public void onMessage(UserMsgDTO message) {
// 当接收到消息的时候执行的业务
log.debug("用户消息消费者,接收消息: {}", message);
}
}
```