# cloud-rocketmq-example **Repository Path**: nehc_ruoyi_plus/cloud-rocketmq-example ## Basic Information - **Project Name**: cloud-rocketmq-example - **Description**: No description available - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: rocketmq-example - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-05 - **Last Updated**: 2025-09-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RuoYi-Cloud-Plus RocketMQ 项目 本项目是基于 Spring Cloud 和 RocketMQ 的消息队列测试和示例项目,提供了完整的消息生产、消费和管理功能,以及 RocketMQ 各种特性的详细示例和最佳实践。 ## 📋 目录 1. [项目概述](#项目概述) 2. [项目结构](#项目结构) 3. [功能模块](#功能模块) 4. [RocketMQ 版本对比](#rocketmq-版本对比) 5. [API 对比分析](#api-对比分析) 6. [消息可靠性保障](#消息可靠性保障) 7. [SQL 过滤功能](#sql-过滤功能) 8. [快速开始](#快速开始) ## 项目概述 本项目是 RuoYi-Cloud-Plus 框架的 RocketMQ 测试和示例模块,旨在提供全面的 RocketMQ 功能演示和最佳实践。项目基于 Spring Boot 和 RocketMQ,实现了各种类型的消息发送和消费场景,包括普通消息、顺序消息、延时消息、事务消息等,同时提供了详细的文档和示例代码。 ## 项目结构 ``` src/main/java/org/dromara/stream/ ├── controller/ # 控制器层 - REST API 接口 ├── producer/ # 生产者层 - 消息发送 ├── consumer/ # 消费者层 - 消息接收 ├── listener/ # 监听器 - 事务消息监听 ├── config/ # 配置类 └── callback/ # 回调处理 ``` ## 功能模块 ### 1. 普通消息 (Normal Message) 基础的消息发送和接收功能,支持同步和异步发送。 - **控制器**: `NormalRocketController.java` - 提供多种消息发送接口,包括普通消息、带标签消息、延迟消息等 - 支持七种不同的发送方式测试 - **生产者**: `NormalRocketProducer.java` - 实现各种消息发送方法 - 提供同步、异步、单向等多种发送模式 - **消费者**: `NormalRocketConsumer.java` - 处理普通消息的消费逻辑 ### 2. 过滤消息 (Filter Message) 支持基于 Tag 和 SQL 表达式的消息过滤,实现精确的消息路由。 - **控制器**: `FilterRocketController.java` - 提供 Tag 过滤和 SQL 过滤消息发送接口 - 支持多种业务场景的过滤消息测试 - **生产者**: `FilterRocketProducer.java` - 实现带标签和 SQL 属性的消息发送 - 支持多种业务类型的消息发送 - **消费者**: - `TagFilterRocketConsumer.java` - 标签过滤消费者 - `SqlFilterRocketConsumer.java` - SQL 过滤消费者 ### 3. 延时消息 (Delay Message) 延时消息允许消息在指定时间后才被消费,适用于定时任务、订单超时处理等场景。 - **控制器**: `DelayRocketController.java` - 提供延时消息发送接口 - 支持多种延时级别和业务场景 - **生产者**: `DelayRocketProducer.java` - 实现延时消息发送方法 - 支持毫秒级延时和指定时间戳发送 - **消费者**: - `DelayRocketConsumer.java` - 延时消息消费者 - `OrderTimeoutConsumer.java` - 订单超时消费者 - `TaskReminderConsumer.java` - 任务提醒消费者 ### 4. 顺序消息 (Ordered Message) 保证消息按照发送顺序被消费,适用于需要严格顺序的业务场景。 - **控制器**: `OrderedRocketController.java` - 提供全局顺序和分区顺序消息发送接口 - 支持多种业务场景的顺序消息测试 - **生产者**: `OrderedRocketProducer.java` - 实现全局顺序和分区顺序消息发送 - 支持多种业务类型的顺序消息 ## RocketMQ 版本对比 ### RocketMQ 4.x 与 5.x 版本对比 #### 1. 架构变化 | 特性 | RocketMQ 4.x | RocketMQ 5.x | | --- | --- | --- | | 整体架构 | 经典的NameServer + Broker架构 | 引入Controller组件,支持Raft协议 | | 高可用模式 | Master-Slave模式,异步或同步复制 | 基于Raft的多副本机制,提供更强的一致性保证 | | 元数据管理 | 依赖NameServer,松散耦合 | 引入Controller集中管理元数据,更加可靠 | | 部署复杂度 | 相对简单 | 增加了Controller组件,部署稍复杂 | #### 2. 功能增强 | 特性 | RocketMQ 4.x | RocketMQ 5.x | | --- | --- | --- | | 消息类型 | 普通、顺序、事务、延时消息 | 增加了批量消息处理能力,优化了事务消息 | | 消息过滤 | 支持Tag和SQL92 | 增强的SQL过滤能力,性能更优 | | 消息轨迹 | 基础支持 | 增强的消息轨迹,更完善的链路追踪 | | 消息存储 | 基于CommitLog的存储 | 优化的存储结构,提高IO效率 | | 延时消息 | 有限级别的延时 | 支持任意精度的延时消息 | ### rocketmq-spring-boot-starter 版本对比 #### 版本兼容性 | 特性 | rocketmq-spring-boot-starter 2.2.x | rocketmq-spring-boot-starter 2.3.x | | --- | --- | --- | | RocketMQ版本 | 兼容RocketMQ 4.9.x | 兼容RocketMQ 5.0.0+ | | Spring Boot版本 | 兼容Spring Boot 2.0.x - 2.6.x | 兼容Spring Boot 2.6.x+ 和 3.0.x+ | | JDK版本 | JDK 8+ | JDK 8+ (Spring Boot 3.x需要JDK 17+) | ## API 对比分析 ### Spring RocketMQ 与原生 RocketMQ API 对比 #### 基础消息发送 | 功能特性 | Spring RocketMQ | 原生 RocketMQ API | 区别说明 | |---------|----------------|------------------|----------| | 同步发送 | ✅ `convertAndSend()` | ✅ `send()` | Spring封装更简单,原生API更灵活 | | 同步发送(显式) | ✅ `syncSend()` | ✅ `send()` | syncSend明确同步语义,convertAndSend默认同步 | | 通用发送 | ✅ `send()` | ✅ `send()` | Spring的send方法支持多种发送模式 | | 异步发送 | ✅ `asyncSend()` | ✅ `send(msg, callback)` | 原生API回调控制更精细 | | 单向发送 | ✅ `sendOneWay()` | ✅ `sendOneway()` | 功能基本一致 | #### 消息类型支持 | 功能特性 | Spring RocketMQ | 原生 RocketMQ API | 区别说明 | |---------|----------------|------------------|----------| | 普通消息 | ✅ 完全支持 | ✅ 完全支持 | 两者都完全支持 | | 延迟消息 | ✅ 支持延迟级别 | ✅ 支持延迟级别 | 原生API支持更多配置选项 | | 顺序消息(同步) | ✅ `syncSendOrderly()` | ✅ `send(msg, selector, arg)` | 原生API队列选择更灵活 | | 顺序消息(异步) | ✅ `asyncSendOrderly()` | ✅ `send(msg, selector, arg, callback)` | 原生API异步回调控制更精细 | | 事务消息 | ✅ `sendMessageInTransaction()` | ✅ `sendMessageInTransaction()` | 原生API事务监听器配置更灵活 | ### 选择建议 #### 选择 Spring RocketMQ 的场景 - 快速开发和原型验证 - Spring Boot/Cloud 项目 - 对 RocketMQ 高级特性需求不高 - 团队对 Spring 生态更熟悉 - 需要与其他 Spring 组件集成 #### 选择原生 RocketMQ API 的场景 - 需要使用 RocketMQ 全部特性 - 对性能有极致要求 - 需要精细控制消息发送行为 - 复杂的业务场景(如自定义队列选择) - 需要深度定制和优化 ## 消息可靠性保障 ### 消息可靠性概述 消息可靠性是指在分布式系统中,确保消息从生产者发送到消费者接收的整个过程中,消息不会丢失、不会重复、不会乱序的能力。 ### 可靠性等级对比 | 可靠性等级 | 生产者策略 | Broker策略 | 消费者策略 | 性能影响 | 适用场景 | |------------|------------|------------|------------|----------|-------------| | **最高** | 同步发送+事务 | 同步刷盘+主从同步 | 手动ACK+重试 | 低 | 金融交易 | | **高** | 同步发送 | 异步刷盘+主从同步 | 自动ACK+重试 | 中 | 订单处理 | | **中** | 异步发送 | 异步刷盘+主从异步 | 自动ACK | 高 | 日志收集 | | **低** | 单向发送 | 内存存储 | 无重试 | 最高 | 监控数据 | ### 生产者可靠性 1. **同步发送(最高可靠性)** - 等待Broker的ACK响应,确保消息成功存储后才返回 - 适用于对可靠性要求高的场景 2. **异步发送(平衡性能与可靠性)** - 通过回调机制处理发送结果,提供较好的性能 - 适用于对性能和可靠性都有要求的场景 3. **事务消息(最高可靠性)** - 保证消息发送和本地事务的原子性 - 适用于分布式事务场景 ### Broker可靠性 1. **刷盘策略** - 同步刷盘:消息写入磁盘后才返回成功,可靠性高但性能较低 - 异步刷盘:消息写入内存后返回成功,定期刷盘,性能高但可靠性较低 2. **主从复制** - 同步复制:消息同步复制到从节点后才返回成功,可靠性高但性能较低 - 异步复制:消息异步复制到从节点,性能高但可靠性较低 ### 消费者可靠性 1. **消费确认** - 自动确认:消费者接收到消息后自动确认,简单但可能丢失消息 - 手动确认:消费者处理完消息后手动确认,可靠性高 2. **重试机制** - 消费失败重试:消费失败后自动重试,提高消息处理成功率 - 死信队列:多次重试失败后进入死信队列,便于后续处理 ## SQL 过滤功能 ### SQL过滤概述 SQL过滤是RocketMQ提供的一种高级消息过滤机制,允许消费者使用SQL92表达式基于消息属性进行复杂的过滤条件设置。 ### 核心特点 - **服务端过滤**: 过滤在Broker端进行,减少网络传输 - **表达式丰富**: 支持SQL92语法,包括数值比较、字符串匹配、布尔运算等 - **性能优化**: 避免不必要的消息传输和客户端处理 - **灵活配置**: 支持复杂的业务逻辑过滤条件 ### 与Tag过滤的区别 | 特性 | Tag过滤 | SQL过滤 | |------|---------|----------| | 过滤位置 | Broker端 | Broker端 | | 表达式复杂度 | 简单Tag匹配 | 复杂SQL表达式 | | 支持的操作 | 等值匹配、OR | 数值比较、字符串匹配、逻辑运算 | | 性能开销 | 低 | 中等 | | 适用场景 | 简单分类 | 复杂业务逻辑 | ### 实现方式 1. **生产者发送带属性的消息** - 使用`putUserProperty`方法设置消息属性 - 根据业务需求设置不同的属性组合 2. **消费者使用SQL表达式过滤** - 使用SQL92语法编写过滤表达式 - 在消费者订阅时指定过滤表达式 ## 快速开始 ### 环境要求 - JDK 8+ - Spring Boot 2.x/3.x - RocketMQ 4.x/5.x ### 配置示例 ```yaml rocketmq: name-server: 127.0.0.1:9876 producer: group: my-producer-group send-message-timeout: 3000 compress-message-body-threshold: 4096 max-message-size: 4194304 retry-times-when-send-failed: 2 retry-times-when-send-async-failed: 2 retry-next-server: true enable-msg-trace: true customized-trace-topic: RMQ_SYS_TRACE_TOPIC ``` ### 示例代码 1. **发送消息** ```java @RestController @RequestMapping("/mq") public class MQController { @Resource private RocketMQTemplate rocketMQTemplate; @GetMapping("/send") public String send(String message) { rocketMQTemplate.convertAndSend("test-topic", message); return "发送成功"; } } ``` 2. **消费消息** ```java @Component @RocketMQMessageListener( consumerGroup = "test-consumer-group", topic = "test-topic", consumeMode = ConsumeMode.CONCURRENTLY ) public class TestConsumer implements RocketMQListener { @Override public void onMessage(String message) { System.out.println("接收到消息:" + message); } } ``` --- 本项目提供了丰富的RocketMQ功能示例和最佳实践,欢迎参考和使用。如有问题,请提交Issue或PR。