# redisMQ-spring-boot-starter
**Repository Path**: hadwinling/redisMQ-spring-boot-starter
## Basic Information
- **Project Name**: redisMQ-spring-boot-starter
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-11-12
- **Last Updated**: 2025-11-12
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 简介
消息队列(Message Queue)是一种使用队列(Queue)作为底层存储数据结构,可以用于解决不同进程与应用程序之间通讯的分布式消息容器,也可以称为消息中间件。
目前比较常用的消息队列有ActiveMQ、RabbitMQ、Kafka、RocketMQ、Redis等。
为什么要开发本项目?
因为Redis应用实在是非常广泛,基本上每个项目都离不开它,而常用的消息队列中间件,如RabbitMQ,则有些"笨重";对于中小型项目,显得"大材小用"了,并且需要额外的运维成本;
虽然Redis支持消息队列,但直接使用并不方便,要作很多细节处理,而且代码移植也是个问题……
本项目的优点?
- 开箱即用,你几乎不用添加额外的配置
- 支持消息队列、延时队列,并提供精细化配置参数
- 提供消息确认机制
- 支持发布与订阅模式
- 支持虚拟空间,不同虚拟空间的数据互相隔离
- 支持web控制台,实时查看各个队列的消费情况
# 开始使用
## 引用依赖
springboot3.0以下版本:
```xml
io.github.lengmianshi
redisMQ-spring-boot-starter
1.0.8
org.springframework.boot
spring-boot-starter-data-redis
2.1.0.RELEASE
redis.clients
jedis
2.9.2
com.alibaba
fastjson
1.2.83
```
注:`spring-boot-starter-data-redis`依赖于`spring-data-redis`,如果发生依赖冲突,要确保`spring-data-redis`的版本不低于`2.1.0.RELEASE`,可在你的pom.xml中锁定版本:
```xml
org.springframework.data
spring-data-redis
2.1.2.RELEASE
```
springboot3.0:
```xml
io.github.lengmianshi
redisMQ-spring-boot-starter
2.0.8
org.springframework.boot
spring-boot-starter-data-redis
3.2.1
com.alibaba
fastjson
1.2.83
```
## 配置redis
一般引入redis的项目都会事先配置,如果你的项目没配置过,则可在`application.yml`中加上如下配置:
springboot3.0以下版本:
```yaml
spring:
redis:
host: test.redis.com #改成你的
password: vC403V2KMc0Kghz #改成你的
port: 6379 #改成你的
jedis:
pool:
max-active: 100
max-idle: 10
min-idle: 10
timeout: 2000
# 覆盖重复注册的bean
main:
allow-bean-definition-overriding: true
```
springboot3.0:
```yaml
spring:
data:
redis:
host: test.redis.com #改成你的
password: vC403V2KMc0Kghz #改成你的
port: 6379 #改成你的
jedis:
pool:
max-active: 100
max-idle: 10
min-idle: 10
timeout: 2000
# 覆盖重复注册的bean
main:
allow-bean-definition-overriding: true
```
## 消息队列
### 生产者发送消息
```java
@Autowired
private RedisQueueTemplate redisQueueTemplate;
/**
* 1次只发送一条消息
*/
@Test
public void test1() {
JSONObject message = new JSONObject();
message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");
message.put("year", 2022);
redisQueueTemplate.sendMessage("test_queue", message);
}
/**
* 批量发送消息
*/
@Test
public void test2() {
List messageList = new ArrayList<>();
for (int i = 0; i < 5000; i++) {
JSONObject mess = new JSONObject();
mess.put("index", i);
messageList.add(mess);
}
redisQueueTemplate.sendMessageAll("test_queue", messageList);
}
```
注:示例中每条MQ消息都用`JSONObject`包装,这只是我的个人习惯,你也可以使用实体类
### 消费者消费消息
消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:
```java
@Component
public class QueueConsumer {
//使用默认参数
@RedisQueueListener(queue = "test_queue")
public void test(JSONObject message){
System.out.println(message);
}
//指定单个实例下使用5个消费线程
@RedisQueueListener(queue = "test_queue2", consumers = 5)
public void test2(JSONObject message){
System.out.println(message);
}
//单个实例5个线程,手动确认
@RedisQueueListener(queue = "test_queue3", consumers = 5, autoAck = false)
public void test3(JSONObject message){
System.out.println(message);
}
//消费失败尝试3次,每次间隔500毫秒
@RedisQueueListener(queue = "test_queue4", maxAttempts = 3, backOffPeriod = 500)
public void test4(JSONObject message){
System.out.println(message);
}
//消费失败后重新入队
@RedisQueueListener(queue = "test_queue4", maxAttempts = 3, backOffPeriod = 500, requeue = true)
public void test5(JSONObject message){
System.out.println(message);
}
}
```
`@RedisQueueListener`注解支持的所有参数:
```java
package com.leng.project.redisqueue.annotation;
import java.lang.annotation.*;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisQueueListener {
/**
* 队列名
*
* @return
*/
String queue() default "";
/**
* 消费者线程数
*
* @return
*/
int consumers() default 1;
/**
* 是否自动确认
*
* @return
*/
boolean autoAck() default true;
/**
* 一次从队列中取多少数据
*
* @return
*/
int prefetch() default 100;
/**
* 获取消息的频率,单位秒
*
* @return
*/
long frequency() default 2;
/**
* 失败重试时最多执行的次数,至少为1
*
* @return
*/
int maxAttempts() default 1;
/**
* 失败重试时间隔时长,毫秒,必须大于0
*
* @return
*/
long backOffPeriod() default 3000;
/**
* 消费失败时是否重新入队
* @return
*/
boolean requeue() default false;
}
```
其中:
- consumers:单个实例下启动多少个消费线程,默认为1
- autoAck:是否自动确认消息,默认为true。自动确认与手动确认的区别:
- 自动确认:消费线程从队列中取出消息,如果消费失败,则该条消息丟失
- 手动确认:消费线程从队列中取出消息,并将消息写入待确认队列中;如果消费失败,则一段时间后(15分钟)会重新入队,消费端要做幂等性处理
- prefetch:一个消费线程一次性从队列中取出多少条消息,因为涉及锁的竞争,不宜过小,默认为50
- frequency:单个消费线程每隔多少秒获取一次消息,默认为2,最小值为1。有人可能会奇怪,消息不是应该即时消费吗?不是越快越好吗?实际上,有些业务场景对消息的实时性要求很低,几天、几个月、甚至一年才执行一次,这时我们完全可以把frequency调大,以减轻redis的压力
- maxAttempts:失败重试时最多执行的次数,至少为1
- requeue:当消费失败时,是否将消息重新加入队列中。如果为true,则消息会不断投递,直到消费成功为止
## 延时队列
延时队列的常用场景如用户下单,xx分钟后没有支付则自动关闭订单;已支持的订单,xxx天后自动确认收货等。
### 生产者发送消息
```java
@Autowired
private RedisQueueTemplate redisQueueTemplate;
/**
* 1次只发送1条消息
*/
public void test1(){
JSONObject message = new JSONObject();
message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");
message.put("year", 2022);
//延时5秒
redisQueueTemplate.sendDelayMessage("test_delay_queue", message, 5, TimeUnit.SECONDS);
}
/**
* 批量发送,每条消息的延时时长一样
*/
public void test2(){
List messageList = new ArrayList<>();
for (int i = 0; i < 5000; i++) {
JSONObject mess = new JSONObject();
mess.put("index", i);
messageList.add(mess);
}
//延时5秒
redisQueueTemplate.sendDelayMessageAll(queue, messageList, 5, TimeUnit.SECONDS);
}
/**
* 批量发送,每条消息的延时时长各不相同
*/
public void test3(){
List messageList=new ArrayList<>();
for(int i=0; i< 5000; i++){
JSONObject mess=new JSONObject();
mess.put("index",i);
//每条消息可以使用不同的延时时长,这里为了简便,统一写成5了
DelayMessageParam param=new DelayMessageParam(mess,5,TimeUnit.SECONDS);
messageList.add(param);
}
redisQueueTemplate.sendDelayMessageAll(queue,messageList);
}
```
注:示例中每条MQ消息都用`JSONObject`包装,这只是我的个人习惯,你也可以使用实体类
### 消费者消费消息
`@RedisDelayQueueListener`注解的参数与`@RedisQueueListener`完全相同;消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:
```java
@Component
public class DelayQueueConsumer {
/**
* 使用默认参数
* @param message
*/
@RedisDelayQueueListener(queue = "test_delay_queue")
public void test(JSONObject message){
System.out.println(message);
}
/**
* 单个实例5个消费线程
* @param message
*/
@RedisDelayQueueListener(queue = "test_delay_queue2", consumers = 5)
public void test2(JSONObject message){
System.out.println(message);
}
/**
* 单个实例5个消费线程,手动确认
* @param message
*/
@RedisDelayQueueListener(queue = "test_delay_queue3", consumers = 5, autoAck = false)
public void test3(JSONObject message){
System.out.println(message);
}
//消费失败尝试3次,每次间隔500毫秒
@RedisDelayQueueListener(queue = "test_delay_queue4", maxAttempts = 3, backOffPeriod = 500)
public void test4(JSONObject message){
System.out.println(message);
}
//消费失败后重新入队
@RedisDelayQueueListener(queue = "test_delay_queue5", maxAttempts = 3, backOffPeriod = 500, requeue = true)
public void test5(JSONObject message){
System.out.println(message);
}
}
```
## 发布与订阅模式
发布者向某个channel发布消息,所有订阅了该channel的消费端均能接受消息
### 消费端
使用`@RedisSubscribeListener`修饰某个方法,即可定义一个消费端,其中`channel`表示要订阅的频道:
```java
import com.alibaba.fastjson.JSONObject;
import com.leng.project.redisqueue.annotation.RedisSubscribeListener;
import org.springframework.stereotype.Component;
@Component
public class ChannelConsumer {
//带参数
@RedisSubscribeListener(channel = "test_channel")
public void test(JSONObject message) {
System.out.println("test_channel:" + message);
}
//不带参数
@RedisSubscribeListener(channel = "test_channel_02")
public void test() {
System.out.println("test_channel_02");
}
}
```
channel还可以使用通配符:
```java
import com.alibaba.fastjson.JSONObject;
import com.leng.project.redisqueue.annotation.RedisSubscribeListener;
import com.leng.project.redisqueue.utils.SubscribeUtils;
import org.springframework.stereotype.Component;
@Component
public class ChannelConsumer {
@RedisSubscribeListener(channel = "test_channel*")
public void test2() {
System.out.println("test_channel*:" + SubscribeUtils.currentChannel());
}
}
```
使用通配符后,如果想确切地知道是哪个channel产生的消息,可以使用`SubscribeUtils.currentChannel()`
### 生产者
```java
@Autowired
private RedisQueueTemplate redisQueueTemplate;
public String testSendChannelMessage(){
JSONObject message = new JSONObject();
message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");
message.put("year", 2022);
//带参数
redisQueueTemplate.sendChannelMessage("test_channel", message);
//不带参数
redisQueueTemplate.sendChannelMessage("test_channel_02");
return "OK";
}
```
## 虚拟空间
参考了RabbitMQ的设计。虚拟空间很有必要,例如,开发环境和测试环境的数据如果没有隔离,在调试时被测试环境的消费端干扰。
配置虚拟空间:
```yaml
queue:
virtual-host: /dev #默认为 /
```
## Web管理平台
浏览器访问:,默认的账号密码为admin/admin
配置账号:
```yaml
queue:
console:
#是否启用web控制台
enable: true
username: admin #登录用户名
password: 123456 #密码
```
登录成功后的界面,可查看所有虚拟空间的队列及消费情况:

注:如果你的系统使用了权限控制框架,如shiro、spring-security等,则需要对如下3个资源放行:
- /queue.html
- /queue/**
- /static/**