# spring-boot-rocketmq-demo
**Repository Path**: hi-liyan/spring-boot-rocketmq-demo
## Basic Information
- **Project Name**: spring-boot-rocketmq-demo
- **Description**: RocketMQ 快速开始
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: https://www.shiguangping.com/spring-boot-rocketmq.html
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-06-29
- **Last Updated**: 2022-06-13
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# spring-boot-rocketmq-demo
## 简介
本文将介绍Spring Boot整合RocketMQ,完成消息生产与消费。
博客原文:[RocketMQ 快速开始](https://www.shiguangping.com/spring-boot-rocketmq.html)
## 仓库
示例代码托管在Gitee:[spring-boot-rocketmq-demo 项目地址](https://gitee.com/ENNRIAAA/spring-boot-rocketmq-demo.git)
## Quict Start
1. 创建消息生产者项目`mq-producer`,并添加依赖。
```xml
org.apache.rocketmq
rocketmq-spring-boot-starter
2.2.0
```
2. 写配置,`application.yml`。
```yaml
server:
port: 8001
spring:
application:
name: mq-producer
rocketmq:
# namesrv地址
name-server: 121.196.189.156:9876
# 生产者
producer:
# 组
group: mq-producer-group
# 发送超时
send-message-timeout: 30000
producer:
demo:
topic: rmq-test-topic
```
3. 写代码。
`RocketMQTemplate`提供了操作RocketMQ的大多数方法。
```java
/**
* 单独封装一层发送mq的工具类,如实现发送同步消息、异步消息、延迟消息等
*/
@Slf4j
@Component
public class RocketMqProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public boolean sendMessage(String topic, String msg) {
log.info("mq发送开始,msg:{}", msg);
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
log.info("mq发送结束:{}", sendResult);
return "SEND_OK".equals(sendResult.getSendStatus().name()) ? true : false;
}
}
```
调用工具类发送消息,发送消息必须指定`topic`(消息主题)。
```java
/**
* 业务上要发送的mq都定义到这里
*/
@Slf4j
@Component
public class MqProducer {
@Autowired
private RocketMqProducer rocketMqProducer;
@Value("${producer.demo.topic}")
private String testTopic;
public boolean test(String msg) {
return rocketMqProducer.sendMessage(testTopic, msg);
}
}
```
创建TestController,用于后面测试将请求参数中的字符串作为消息发送给消费者。
```java
@RestController
@RequestMapping("test")
public class TestController {
@Autowired
private MqProducer mqProducer;
@GetMapping("msg")
public String test(String msg) {
return mqProducer.test(msg) ? "Send Success" : "Send Fail";
}
}
```
4. 创建消息消费者项目`mq-consumer`,并添加如上依赖。
5. 写配置,`application.yml`。
```yaml
server:
port: 8002
spring:
application:
name: mq-consumer
rocketmq:
name-server: 121.196.189.156:9876
consumer:
group: mq-consumer-group
consumer:
demo:
topic: rmq-test-topic
group: rmq-test-group
```
6. 写代码。
创建`RocketMQListener`接口的实现类,并添加`@RocketMQMessageListener`注解,用于监听某一`topic`消息。
```java
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${consumer.demo.group}", topic = "${consumer.demo.topic}", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.CLUSTERING, consumeTimeout = 60000L)
public class TestMessage implements RocketMQListener {
@Override
public void onMessage(String message) {
log.info("消息消费成功,msg:{}", message);
}
}
```
7. 测试。
使用Postman调用`mq-producer`中定义的测试接口,观察消费者是否能监听到生产者发送的消息。

消费者收到消息并打印日志。
