# springboot-redisson-delayqueue **Repository Path**: zhouquanstudy/springboot-redisson-delayqueue ## Basic Information - **Project Name**: springboot-redisson-delayqueue - **Description**: Spring Boot与Redisson结合实现的延迟队列解决方案,提供高性能、易用的异步任务处理能力。 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2025-06-24 - **Last Updated**: 2025-08-25 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Spring Boot 项目集成 Redisson 延迟队列 ## 延迟队列应用场景 1. **订单支付超时**:用户下单后30分钟未支付,自动取消订单。 2. **订单评价超时**:订单签收后7天未评价,系统默认好评。 3. **商家接单超时**:下单成功后商家5分钟未接单,订单取消。 4. **配送超时提醒**:配送超时,推送短信提醒。 ## 技术选型分析 针对延迟任务处理机制,主要可选方案有以下四种:定时轮询、Redisson 延迟队列、消息中间件、Redis 过期监听。 ### 1. 定时任务轮询 **机制:** 通过定时任务如 `@Scheduled` 或 Quartz),以固定频率轮询数据库或 Redis,查找已到期的任务并处理。 **优点:**由于springboot原生支持,实现成本低,并且可以任务统一管理 **缺点:** - 处理存在延迟非实时,精度取决于轮询间隔。 - 容易空轮询,浪费 CPU 或数据库资源。 - 不适合高并发或对时效要求高的业务场景。 **适用场景:**小型系统、任务量小、业务容忍较大延迟的场景。 ### 2. Redisson 延迟队列(推荐) **机制说明:** 基于 Redis ZSet 和 List 封装的延迟队列,由 Redisson 实现,支持回调消费。 **优点:** - 接入简单,Redisson 封装完备。 - 实时性较好,精度可达秒级,满足大多数业务需求。 - 可注册不同处理器,业务扩展方便。 - 支持分布式部署,天然适配 Redis 集群环境。 **缺点:**实现依赖 Redisson,同时需要保证Redis 崩溃等情况设计补偿保护机制 **适用场景:**中大型系统、微服务架构下的延迟任务处理。 ### 3. 消息中间件延迟队列( RabbitMQ、Kafka等) **机制:**通过消息中间件的 TTL(消息生存时间)和死信队列机制实现延迟任务,例如 RabbitMQ 的 DLX(死信交换机)或 Kafka 的延迟消费。 **优点:** - 毫秒级精度,适合高并发和对时效性要求高的业务。 - 高可靠性,天然支持异步解耦与分布式处理。 - 支持大规模任务并发调度。 **缺点:** - 配置复杂,需要配置 TTL、DLX 等。 - 引入 MQ 系统,提升系统复杂度与维护成本。 - 对运维能力有一定要求。 **适用场景:**高并发、高可用要求的核心业务,如订单超时关闭、促销活动控制等。 ### 4. Redis Key 过期监听(不推荐) 最初是通过redis的思路来实现延迟队列功能,但是通过查询资料和官方文档发现,==redis并不适合此种场景== **机制:** 通过启用 Redis 的 Keyspace Notification 功能,监听键过期事件(需设置 `notify-keyspace-events` 配置项)。 **官方文档说明:** > Redis 中 Key 的过期事件 `expired` 有两种触发方式: > > - 在访问 Key 时发现其已过期 > - 后台线程定期扫描并删除过期 Key > > 因此,并不能保证在 TTL 恰好归零时立即触发过期事件,也不保证事件一定会触发。 **缺点:** - 不可靠,事件触发不精确,且可能丢失。 - 无法支持分布式监听,Redis 集群环境下存在局限。 - 对核心业务流程不具备可控性。 **适用场景:**临时性、非强一致性场景,如验证码、状态标记清理等。 **参考:** - [Redis 官方文档 - Keyspace Notifications](https://redis.io/docs/latest/develop/use/keyspace-notifications/#timing-of-expired-events) - [掘金文章 - 请勿过度依赖 Redis 的过期监听](https://juejin.cn/post/6844904158227595271) ## 选型建议总结 | 方案 | 实现难度 | 实时性 | 可靠性 | 分布式支持 | 推荐场景 | | ----------------- | -------- | ------ | ------ | ---------- | ------------------------------ | | 定时任务轮询 | 低 | 低 | 中 | 有限 | 简单、低频业务 | | Redisson 延迟队列 | 中 | 中 | 高 | 好 | 分布式、业务量中等、场景标准 | | MQ 延迟队列 | 高 | 高 | 高 | 极好 | 高并发、大量异步、核心任务场景 | | Redis 过期监听 | 低 | 不确定 | 低 | 差 | 非核心场景,缓存状态变更类任务 | ## Redisson 延迟队列实现 ### 项目结构 ``` shell ├── config │ └── RedissonConfig.java # 配置 Redisson 客户端,创建 RedissonClient Bean ├── controller │ └── DeliveryController.java # 提供REST 接口模拟订单创建和收货操作,触发延迟任务 ├── enums │ └── DelayQueueEnum.java # 定义延迟队列的业务枚举及其关联的处理类 ├── hander │ ├── DelayQueueHandler.java # 延迟队列处理器接口,定义 execute 方法 │ ├── EvaluationTimeoutHandler.java # 处理评价超时逻辑的具体实现类 │ └── OrderPaymentTimeoutHandler.java # 处理订单支付超时逻辑的具体实现类 ├── runner │ └── RedisDelayQueueRunner.java # 启动后监听并执行延迟队列任务,使用线程池并发处理 ├── utils │ ├── RedisDelayQueueUtil.java # 封装 Redis 延迟队列的操作方法(添加/获取元素) │ └── SpringUtils.java # 工具类,用于在非 Spring 管理类中获取 Bean └── SpringbootApplication.java # Spring Boot 主类,包含程序入口 main 方法 ``` ### Redis延迟队列工具类 ```java package com.zhou.demo.utils; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingDeque; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; /** * redis延迟队列工具 */ @Slf4j @Component public class RedisDelayQueueUtil { @Resource private RedissonClient redissonClient; /** * 将元素添加到延迟队列中 * * @param queueCode 队列键(用于标识不同的队列) * @param value 要添加到队列中的值(泛型类型) * @param delay 延迟时间(指定元素在队列中延迟被消费的时间) * @param timeUnit 时间单位(与延迟时间配合使用,如秒、毫秒等) */ public void addDelayQueue(String queueCode, T value, long delay, TimeUnit timeUnit) { try { RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode); RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque); delayedQueue.offer(value, delay, timeUnit); log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}秒", queueCode, value, timeUnit.toSeconds(delay)); } catch (Exception e) { log.error("(添加延时队列失败) {}", e.getMessage(), e); throw new RuntimeException("(添加延时队列失败)", e); } } /** * 获取延迟队列中的元素 * * @param queueCode 队列键 * @param 元素类型 * @return 队列中的元素 */ public T getDelayQueue(String queueCode) throws InterruptedException { RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode); return blockingDeque.take(); } } ``` ### Redis延迟队列运行器 用于在Spring Boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。 ```java package com.zhou.demo.runner; import com.zhou.demo.enums.DelayQueueEnum; import com.zhou.demo.hander.DelayQueueHandler; import com.zhou.demo.utils.RedisDelayQueueUtil; import com.zhou.demo.utils.SpringUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Redis延迟队列运行器 * 用于在Spring Boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。 * * @author zhouquan */ @Slf4j @Component public class RedisDelayQueueRunner implements CommandLineRunner { @Resource private RedisDelayQueueUtil redisDelayQueueUtil; /** * 线程池,用于并发监听不同的延迟队列 */ private final ExecutorService executorService = Executors.newCachedThreadPool(); /** * 运行状态标志,控制线程是否持续监听队列 */ private volatile boolean running = true; /** * Spring Boot 启动完成后自动运行的方法 * 遍历所有延迟队列枚举,为每个队列创建一个监听线程 * * @param args 命令行参数 */ @Override public void run(String... args) { for (DelayQueueEnum queueEnum : DelayQueueEnum.values()) { executorService.execute(() -> { log.info("启动延迟队列监听线程:{}", queueEnum.getCode()); while (running) { try { Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode()); DelayQueueHandler handler = SpringUtils.getBean(queueEnum.getBeanClass()); handler.execute(value); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("线程中断:{}", queueEnum.getCode()); } catch (Exception ex) { log.error("延迟队列 [{}] 处理异常:{}", queueEnum.getCode(), ex.getMessage(), ex); } } }); } log.info("所有 Redis 延迟队列监听启动完成"); } /** * 在 Bean 销毁前关闭线程池,释放资源 */ @PreDestroy public void shutdown() { log.info("准备关闭 Redis 延迟队列监听线程池"); running = false; executorService.shutdownNow(); } } ``` ### 业务枚举类 ```java package com.zhou.demo.enums; import com.zhou.demo.hander.EvaluationTimeoutHandler; import com.zhou.demo.hander.OrderPaymentTimeoutHandler; import com.zhou.demo.hander.DelayQueueHandler; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; /** * 延迟队列业务枚举 * * @author 18324 */ @Getter @NoArgsConstructor @AllArgsConstructor public enum DelayQueueEnum { /** * 订单超时 */ ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "订单支付超时", OrderPaymentTimeoutHandler.class), /** * 评价超时 */ EVALUATION_TIMEOUT("evaluation_timeout", "评价超时", EvaluationTimeoutHandler.class); /** * 延迟队列 Redis Key */ private String code; /** * 中文描述 */ private String name; /** * 延迟队列具体业务实现的 Bean * 可通过 Spring 的上下文获取 */ private Class> beanClass; } ``` ### 测试接口类 ```java package com.zhou.demo.enums; import com.zhou.demo.hander.EvaluationTimeoutHandler; import com.zhou.demo.hander.OrderPaymentTimeoutHandler; import com.zhou.demo.hander.DelayQueueHandler; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; /** * 延迟队列业务枚举 * * @author 18324 */ @Getter @NoArgsConstructor @AllArgsConstructor public enum DelayQueueEnum { /** * 订单超时 */ ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "订单支付超时", OrderPaymentTimeoutHandler.class), /** * 评价超时 */ EVALUATION_TIMEOUT("evaluation_timeout", "评价超时", EvaluationTimeoutHandler.class); /** * 延迟队列 Redis Key */ private String code; /** * 中文描述 */ private String name; /** * 延迟队列具体业务实现的 Bean * 可通过 Spring 的上下文获取 */ private Class> beanClass; } ``` ### 延迟队列任务测试 ![image-20250624145034682](https://gitee.com/zhouquanstudy/typora_pic/raw/master/img/image-20250624145034682.png) ![image-20250624145154627](https://gitee.com/zhouquanstudy/typora_pic/raw/master/img/image-20250624145154627.png) # 参考 [1][SpringBoot集成Redisson实现延迟队列_redisson delayedqueue spring-CSDN博客](https://blog.csdn.net/qq_40087415/article/details/115940092)