# rocketmq-template-demo
**Repository Path**: hgw689/rocketmq-template-demo
## Basic Information
- **Project Name**: rocketmq-template-demo
- **Description**: rocketmq-template发送各种消息demo
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 2
- **Forks**: 1
- **Created**: 2023-02-22
- **Last Updated**: 2024-02-20
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
[文档详情链接](https://blog.csdn.net/m0_49183244/article/details/129169326?spm=1001.2014.3001.5501),如果能为您提供帮助不妨Star一下吧
## 一、前置工作
1、导入依赖
```xml
org.apache.rocketmq
rocketmq-spring-boot-starter
2.1.1
```
2、编写配置
~~~yaml
rocketmq:
name-server: 47.96.232.192:9876
producer:
# 生产组名
group: demoGroup
# 消息发送超时时间
send-message-timeout: 3000
# 消息体阈值,4k以上会压缩
compress-message-body-threshold: 4096
# 在同步模式下发送失败之前在内部执行的最大重试次数。
retry-times-when-send-failed: 3
# 在异步模式下发送失败之前在内部执行的最大重试次数。
retry-times-when-send-async-failed: 3
# 消息阈值,最大4MB,在 4KB 之内性能最佳
max-message-size: 4096
~~~
## 二、消费者
实现RocketMQListener接口,使用 `@RocketMQMessageListener` 注册监听,需指定消费者组和Topic。
```java
@Component
@RocketMQMessageListener(
consumerGroup = "demo-consumer-group", // consumerGroup:消费者组名
topic = "Demo", // topic:订阅的主题
selectorExpression = "*", // selectorExpression:控制可以选择的消息,可以使用SelectorType.SQL92语法。设置为 * 时,表示全部。
messageModel = MessageModel.CLUSTERING // messageModel: 控制消息模式。MessageModel.CLUSTERING:负载均衡;MessageModel.BROADCASTING:广播模式
)
public class MqConsumerListener implements RocketMQListener {
@Override
public void onMessage(String message) {
System.out.println("消费消息-" + message);
}
}
```
## 三、生产者
### 1. 普通消息
普通消息无返回值,只负责发送消息⽽不等待服务器回应且没有回调函数触发。
~~~java
@Override
public void convertAndSend(D destination, Object payload) throws MessagingException {
convertAndSend(destination, payload, (Map) null);
}
~~~
> 发送一个普通消息吧~
```java
@SpringBootTest
class RocketmqTemplateDemoApplicationTests {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 普通消息无返回值,只负责发送消息⽽不等待服务器回应且没有回调函数触发
* - 参数一:topicName:tags,主题:标签,可单Topic不指定Tag
* - 参数二:消息体
*/
@Test
public void sendBaseMsg() {
rocketMQTemplate.convertAndSend("Demo:base","普通消息测试");
}
}
```
### 2. 过滤消息
convertAndSend还有另外一个可携带属性的重载方法,可以通过给消息携带属性的方式,消费者利用sql92的方式实现消息过滤~
```java
@Override
public void convertAndSend(D destination, Object payload, @Nullable Map headers)
throws MessagingException {
convertAndSend(destination, payload, headers, null);
}
```
> 发送携带属性的消息吧~
发送若干条消息携带属性a,属性值分别为0~9。消费者消息过滤仅消费携带属性a且属性值在[6,9]范围内的(包含)
```java
/**
* 过滤消息
*/
@GetMapping("/filter")
public void sendFilterMsg() {
for (int i = 0; i < 10; i++) {
HashMap harder = new HashMap<>(1);
harder.put("a", String.valueOf(i));
rocketMQTemplate.convertAndSend("Demo:filter","过滤消息测试" + i, harder);
}
}
```
通过sql92过滤消息
1、修改选择消息模式为 SelectorType.SQL92,默认模式是SelectorType.TAG。
~~~java
selectorType = SelectorType.SQL92
~~~
2、编写过滤sql:
~~~java
selectorExpression = "a BETWEEN 6 AND 9"
~~~
```java
@Component
@RocketMQMessageListener(
consumerGroup = "demo-consumer-group", // consumerGroup:消费者组名
topic = "Demo", // topic:订阅的主题
selectorType = SelectorType.SQL92, // selectorType:那种模式选择消息
selectorExpression = "a BETWEEN 6 AND 9", // selectorExpression:控制可以选择的消息,可以使用SelectorType.SQL92语法。设置为 * 时,表示全部。
messageModel = MessageModel.CLUSTERING // messageModel: 控制消息模式。MessageModel.CLUSTERING:负载均衡;MessageModel.BROADCASTING:广播模式
)
public class MqConsumerListener implements RocketMQListener