# delay-queue **Repository Path**: Boy-NXK/delay-queue ## Basic Information - **Project Name**: delay-queue - **Description**: 高精度延迟队列服务 - 基于 Spring Boot 3 + Netty 时间轮 核心特性: ⚡ 10ms 高精度调度(Netty HashedWheelTimer) 💾 Redis 高速调度 + MySQL 永久存储 🔄 失败自动重试(最多3次) 📊 完整任务日志查询 🚀 开箱即用,轻量部署 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2025-12-28 - **Last Updated**: 2025-12-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 延迟队列服务 (Delay Queue Service) 基于 Spring Boot 3 + Java 21 的高精度延迟消息队列服务,支持毫秒级延迟任务调度。 ## 技术栈 - **Java 21** - 开发语言 - **Spring Boot 3.2.0** - 应用框架 - **Spring Data Redis** - Redis 操作 - **MySQL + MyBatis-Plus** - 任务持久化与日志查询 - **Fastjson2** - JSON 序列化 - **Netty HashedWheelTimer** - 高精度延迟调度引擎 - **Logback** - 日志管理 ## 核心特性 - ✅ **高精度调度**:基于 Netty 的 HashedWheelTimer 实现,精度可达 10ms - ✅ **双重持久化**:Redis 高性能调度 + MySQL 可靠存储,任务日志永久保存 - ✅ **自动重试**:任务失败自动重试(最多3次) - ✅ **任务恢复**:服务启动时自动恢复未完成的任务 - ✅ **HTTP 回调**:任务到期后自动调用客户端提供的回调地址 - ✅ **日志查询**:通过 MySQL 存储完整任务日志,方便追踪和统计分析 ## 快速开始 ### 1. 环境要求 - JDK 21+ - Maven 3.6+ - Redis 5.0+ - MySQL 5.7+ / 8.0+ ### 2. 数据库初始化 **创建数据库:** ```sql CREATE DATABASE delay_queue DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; ``` **执行建表脚本:** 运行 `src/main/resources/sql/delay_task.sql` 创建任务表。 ### 3. 配置文件 编辑 `src/main/resources/application.yml`: ```yaml server: port: 9001 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/delay_queue?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false username: root password: root data: redis: host: 192.168.0.19 port: 6379 password: test database: 0 delay: env: test recovery-range-ms: 300000 # 恢复未来5分钟内的任务 ``` ### 4. 启动服务 **开发环境:** ```bash mvn spring-boot:run ``` **生产环境:** ```bash # 打包 mvn clean package # 运行 java -jar target/delay-queue-1.0-SNAPSHOT.jar ``` ## API 使用说明 ### 提交延迟任务 **接口:** `POST /delay` **请求示例:** ```json { "fromSever": "order-service", "msgId": "order_12345", "eventName": "order-timeout", "delayMills": 30000, "backUrl": "http://your-service/callback", "msg": { "orderId": "12345", "userId": "user001" } } ``` **响应示例:** ```json { "success": true } ``` ### 查询任务列表 **接口:** `GET /tasks` **请求参数:** - `page`: 页码(默认1) - `size`: 每页数量(默认10) - `fromSever`: 来源服务名(可选) - `status`: 任务状态(可选,0-待执行, 1-执行中, 2-已完成, 3-失败) **请求示例:** ``` GET /tasks?page=1&size=10&fromSever=order-service&status=0 ``` **响应示例:** ```json { "records": [ { "taskId": "order-service_order_12345", "fromSever": "order-service", "msgId": "order_12345", "eventName": "order-timeout", "status": 2, "submitTime": "2025-12-17 10:00:00", "completeTime": "2025-12-17 10:00:30" } ], "total": 100, "size": 10, "current": 1 } ``` ### 查询任务详情 **接口:** `GET /task/{taskId}` **请求示例:** ``` GET /task/order-service_order_12345 ``` ### 字段说明 | 字段 | 类型 | 必填 | 说明 | |------|------|------|------| | fromSever | String | 是 | 来源服务名(每个服务需要唯一) | | msgId | String | 是 | 消息ID(客户端唯一ID) | | eventName | String | 是 | 事件名称 | | delayMills | Long | 是 | 延迟执行的毫秒数 | | backUrl | String | 是 | 回调地址(HTTP POST) | | msg | Map | 否 | 业务数据 | ### 接收回调 当任务到期时,服务会向 `backUrl` 发送 POST 请求,请求体为完整的任务信息。 **回调示例(Spring Boot):** ```java @PostMapping("/callback") public String handleDelayTask(@RequestBody DelayMsg msg) { // 处理到期的延迟任务 logger.info("收到延迟任务回调: {}", msg.getEventName()); // 执行你的业务逻辑... return "OK"; } ``` ## 任务状态 - `PENDING(0)` - 等待执行 - `PROCESSING(1)` - 执行中 - `COMPLETED(2)` - 已完成 - `FAILED(3)` - 已失败(重试3次后仍失败) ## Redis 数据结构 ### 任务详情 - **类型:** String - **Key:** `{env}:delay:tasks:{taskId}` - **Value:** 任务的 JSON 字符串 - **示例:** `test:delay:tasks:order-service_12345` ### 任务索引 - **类型:** Sorted Set - **Key:** `{env}:delay:task_ids` - **Member:** taskId - **Score:** 执行时间戳(毫秒) ### 失败任务索引 - **类型:** Sorted Set - **Key:** `{env}:delay:fail_task_ids` ## 项目结构 ``` src/main/java/delay/queue/ ├── ServerMain.java # Spring Boot 启动类 ├── config/ │ └── DelayQueueProperties.java # 配置属性类 ├── controller/ │ └── DelayController.java # REST API 控制器 ├── entity/ │ └── DelayTaskEntity.java # 数据库实体 ├── mapper/ │ └── DelayTaskMapper.java # MyBatis-Plus Mapper ├── model/ │ ├── DelayMsg.java # 延迟消息模型 │ ├── TaskStatus.java # 任务状态常量 │ └── SubmitRes.java # 提交响应模型 └── service/ └── HighPrecisionDelayService.java # 核心延迟服务 ``` ## 工作原理 1. **任务提交**:客户端通过 HTTP POST 提交任务到 `/delay` 接口 2. **双重持久化**: - MySQL:永久存储任务数据,记录完整生命周期 - Redis:使用 ZSet 记录执行时间,用于快速调度 3. **内存调度**:未来 5 分钟内的任务加载到 Netty 时间轮进行高精度调度 4. **任务执行**:到期后通过 HTTP POST 回调客户端的 `backUrl` 5. **状态同步**:任务状态变更(执行中/完成/失败)实时同步到 MySQL 6. **重试机制**:回调失败自动重试最多 3 次,重试次数记录到数据库 7. **任务恢复**:服务每 2 分钟自动扫描 Redis,恢复未完成的任务 ## 适用场景 - 订单超时自动取消 - 定时消息推送 - 延迟任务调度 - 支付超时提醒 - 任何需要延迟执行的业务场景 ## 数据查询示例 **查询待执行的任务:** ```sql SELECT * FROM delay_task WHERE status = 0 ORDER BY execute_time; ``` **查询失败的任务:** ```sql SELECT * FROM delay_task WHERE status = 3 ORDER BY create_time DESC; ``` **按服务统计任务数量:** ```sql SELECT from_sever, status, COUNT(*) as count FROM delay_task GROUP BY from_sever, status; ``` **查询某个时间段的任务:** ```sql SELECT * FROM delay_task WHERE submit_time BETWEEN '2025-12-01' AND '2025-12-31' ORDER BY submit_time DESC; ``` ## License Apache-2.0