# reliable-mq **Repository Path**: ismartyx/reliable-mq ## Basic Information - **Project Name**: reliable-mq - **Description**: 基于 内存队列 + MySQL 持久化 的轻量级可靠消息队列,无需引入 RabbitMQ / Kafka 等外部中间件,适用于中小规模、对消息可靠性有基本要求的 Spring Boot 项目。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-05-29 - **Last Updated**: 2026-05-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # reliable-mq 基于 **内存队列 + MySQL 持久化** 的轻量级可靠消息队列,无需引入 RabbitMQ / Kafka 等外部中间件,适用于中小规模、对消息可靠性有基本要求的 Spring Boot 项目。 --- ## 核心特性 | 特性 | 说明 | |-------------|----------------------------------------| | **至少投递一次** | 消息先落库再入队,补偿扫描兜底,保证消息不丢 | | **事务原子性** | 消息与业务操作共享同一数据库事务,业务回滚则消息不发出 | | **多实例竞争消费** | 基于数据库 CAS(`status=0→1`)实现分布式锁,多实例部署安全 | | **延迟消息** | 支持相对延迟(秒)和绝对时间两种方式,精度取决于补偿扫描间隔(默认 10s) | | **退避重试** | 消费失败后按策略延迟重试,支持固定间隔和指数退避两种算法,可配置切换 | | **死信处理** | 超过最大重试次数进入死信,支持注册回调由业务方自定义处理 | | **超时重置** | 实例崩溃导致消息卡在"消费中"时,定时将其重置为待消费 | | **优雅停机** | 应用关闭时等待消费线程完成当前消息(最多 5 秒) | | **可配置** | 超时时间、重试策略通过 `application.yaml` 配置 | --- ## 数据库初始化 执行 `init.sql`: ```sql CREATE TABLE `mq_message` ( `id` bigint NOT NULL AUTO_INCREMENT, `topic` varchar(64) NOT NULL, `payload` text NOT NULL, `status` tinyint NOT NULL DEFAULT '0' COMMENT '0=待消费 1=消费中 2=已完成 3=死信', `retry_count` int NOT NULL DEFAULT '0', `execute_at` datetime DEFAULT NULL COMMENT '延迟执行时间,NULL 表示立即执行', `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), KEY `idx_topic_status` (`topic`, `status`), KEY `idx_topic_status_execute` (`topic`, `status`, `execute_at`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; ``` --- ## 配置项 `application.yaml`: ```yaml mq: timeout-minutes: 5 # 消息卡在"消费中"超过此时间后自动重置为待消费,默认 5 分钟 # 注意:需大于业务消费的最大耗时,否则正在处理中的消息会被误重置 retry-policy: exponential # fixed: 固定间隔 10s | exponential: 指数退避 1s/5s/30s(默认) compensate-scan-seconds: 10 # 补偿扫描间隔,默认 10 秒,同时也是延迟消息的兜底精度 mybatis: configuration: map-underscore-to-camel-case: true ``` --- ## 消息状态流转 ``` INSERT → status=0(待消费,execute_at 未到时不入内存队列) │ ▼ execute_at <= now,tryLock CAS status=1(消费中) │ ┌────┴────┐ │ │ 成功 失败/异常 │ │ ▼ ▼ retry_count < MAX_RETRY status=2 status=0(execute_at = now + backoff,等待补偿扫描) (已完成) │ │ retry_count >= MAX_RETRY ▼ status=3(死信)→ 触发 DeadLetterHandler ``` --- ## 快速接入 ### 第一步:注册消费者 在 `@PostConstruct` 或任意初始化代码中调用: ```java @Autowired private ReliableMessageQueue mq; @PostConstruct public void init() { // 单线程消费 mq.subscribe("order.created", message -> { System.out.println("收到订单:" + message.getPayload()); return true; // true=成功,false=失败触发重试 }); // 多线程并发消费(10 个消费线程) mq.subscribe("order.created", 10, message -> { // 业务逻辑... return true; }); // 注册死信处理器(可选) mq.registerDeadLetterHandler("order.created", message -> System.err.println("死信:id=" + message.getId() + ", payload=" + message.getPayload()) ); } ``` > **注意**:同一 topic 重复调用 `subscribe` 时,新的 callback 和 concurrency **覆盖**旧值,旧线程池自动关闭。 > 多线程并行消费请增大 `concurrency`,而非多次调用 `subscribe`。 --- ### 第二步:发送消息 所有发送方法必须在 `@Transactional` 方法内调用: ```java @Autowired private MqMessageService mqMessageService; // 立即发送 @Transactional public void createOrder(String orderJson) { orderRepository.save(order); // 业务操作与消息落库共享同一事务 mqMessageService.send("order.created", orderJson); } // 延迟发送(相对时间,60 秒后投递) @Transactional public void createOrderDelayed(String orderJson) { mqMessageService.sendDelay("order.created", orderJson, 60L); } // 延迟发送(绝对时间,指定时刻投递) @Transactional public void createOrderAt(String orderJson, LocalDateTime executeAt) { mqMessageService.sendDelay("order.created", orderJson, executeAt); } ``` > **重要**:`send` / `sendDelay` 使用 `Propagation.MANDATORY`,调用方**必须已开启事务**, > 否则抛出 `IllegalTransactionStateException`。 --- ### 消息体(`MqMessage`) | 字段 | 类型 | 说明 | |--------------|-----------------|------------------------| | `id` | `Long` | 自增主键 | | `topic` | `String` | 消息主题 | | `payload` | `String` | 消息内容(建议 JSON) | | `status` | `Integer` | 0=待消费 1=消费中 2=已完成 3=死信 | | `retryCount` | `Integer` | 已重试次数 | | `executeAt` | `LocalDateTime` | 延迟执行时间,`null` 表示立即执行 | | `createdAt` | `LocalDateTime` | 创建时间 | | `updatedAt` | `LocalDateTime` | 最后更新时间 | --- ## 内部机制说明 ### 发送流程(Outbox Pattern) ``` 调用方 @Transactional 方法 │ ├─ 业务 DB 操作(同一事务) ├─ mqMessageService.send/sendDelay() → mapper.insert()(同一事务落库) │ └─ 事务 COMMIT │ └─ @TransactionalEventListener(AFTER_COMMIT) └─ mapper.findById(id) ├─ execute_at 未到 → 跳过,由 compensateScan 到期捞起 └─ execute_at 已到 → queue.enqueue()(投入内存队列) ``` ### 消费流程 ``` 消费线程 queue.take() │ └─ mapper.tryLock(id) ← CAS status=0→1,多线程/多实例安全 │ ├─ 返回 0(已被抢占)→ 直接 return │ └─ 返回 1(抢占成功) │ └─ mapper.findById(id)(读最新状态) │ ├─ callback.onMessage() 成功 → status=2 │ └─ 失败/异常 ├─ retry_count < MAX_RETRY │ └─ execute_at = now + policy.nextDelay(retryCount) │ status=0(等补偿扫描到期捞起) └─ retry_count >= MAX_RETRY → status=3 + DeadLetterHandler ``` ### 退避重试策略 | 策略 | 配置值 | 第1次失败 | 第2次失败 | 第3次失败 | |----------|---------------|-------|-------|-------| | 固定间隔 | `fixed` | 10s | 10s | 10s | | 指数退避(默认) | `exponential` | 1s | 5s | 25s | 也可实现 `RetryPolicy` 接口注入自定义策略: ```java // 线性递增:10s / 20s / 30s RetryPolicy linear = retryCount -> retryCount * 10L; ``` ### 延迟消息 发送延迟消息后走**双路径**保障精度和可靠性: ``` onCommit() ├─ executeAt 未到 → scheduler.schedule(delayMs) ← 毫秒级精准触发 │ └─ 到期直接 enqueue() └─ executeAt 已到 → 直接 enqueue() compensateScan(每 compensate-scan-seconds 秒) └─ 兜底:进程重启 / JVM 崩溃导致 schedule 任务丢失时捞起 ``` - **正常路径**:`ScheduledExecutorService.schedule(delay)` 精确到毫秒,与 `compensateScan` 间隔无关 - **兜底路径**:`compensateScan` 定期扫描 DB,补偿进程重启后内存任务丢失的情况 - 调小 `compensate-scan-seconds` 可缩短重启后的最大恢复延迟 ### 补偿扫描(每 `compensate-scan-seconds` 秒,默认 10 秒) - 捞起 `status=0 AND (execute_at IS NULL OR execute_at <= NOW())` 但不在内存队列里的消息 - 覆盖重启恢复、重试回退、精准调度任务丢失三个场景 - `AtomicBoolean` 防止上次扫描未完成时重入 ### 超时重置(每 30 秒) - 将 `status=1` 且 `updated_at < now - timeout-minutes` 的消息重置为 `status=0` - 处理实例崩溃导致消息永久卡在"消费中"的场景,使用独立事务保证原子性 --- ## 多实例部署 多个实例共享同一 MySQL 时,`tryLock` 的 CAS 保证同一条消息只被一个实例消费: ```sql UPDATE mq_message SET status=1, updated_at=NOW() WHERE id=? AND status=0 ``` 返回影响行数为 0 时表示已被其他实例抢占,当前实例直接跳过。 --- ## 注意事项 1. **payload 建议使用 JSON 字符串**,方便扩展和反序列化。 2. **消费者幂等**:由于"至少投递一次"语义,极端情况下同一消息可能被重复投递,业务逻辑需保证幂等。 3. **延迟精度**:正常情况下为毫秒级(`ScheduledExecutorService` 精准调度);进程重启后降级为 `compensate-scan-seconds` 精度(默认 10 秒)。 4. **单 JVM 内存队列**,重启后内存队列清空,最长 10 秒(补偿扫描间隔)内恢复历史待消费消息。 5. **队列容量上限**默认 10,000 条,超出时新消息由补偿扫描捞起(不阻塞发送方)。 6. **`timeout-minutes` 需大于业务消费最大耗时**,否则正在处理中的消息会被误重置导致重复消费。 --- ## 项目结构 ``` src/main/java/com/example/reliablemq/ ├── mq/ │ ├── core/ │ │ ├── ReliableMessageQueue.java # 门面:subscribe / registerDeadLetterHandler / send / enqueue │ │ ├── MqConsumer.java # 消费逻辑:tryLock、callback 调用、重试/死信判断 │ │ └── MqScheduler.java # 定时任务:compensateScan、resetTimeout、线程池生命周期 │ ├── mapper/ │ │ ├── MqMessageMapper.java # MyBatis Mapper │ │ └── MqTransactionalOps.java # 需要独立事务的 DB 操作 │ ├── model/ │ │ ├── MqMessage.java # 消息实体 │ │ └── MqEnqueueEvent.java # 事务提交后入队事件(内部使用) │ ├── policy/ │ │ ├── RetryPolicy.java # 重试策略接口 │ │ ├── FixedRetryPolicy.java # 固定间隔策略 │ │ └── ExponentialRetryPolicy.java# 指数退避策略 │ ├── service/ │ │ └── MqMessageService.java # 对外发送入口(Outbox Pattern) │ └── spi/ │ ├── MessageCallback.java # 消费回调接口 │ └── DeadLetterHandler.java # 死信处理回调接口 └── biz/ ├── service/ │ └── OrderService.java # 业务接入示例 └── controller/ └── OrderController.java # HTTP 接口示例 ```