# lip-mq-integration **Repository Path**: kennylee/lip-mq-integration ## Basic Information - **Project Name**: lip-mq-integration - **Description**: 轻量级消息中间件集成组件 - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-04-19 - **Last Updated**: 2025-08-21 ## Categories & Tags **Categories**: Uncategorized **Tags**: mq ## README # Lip MQ Integration 项目粤语代号-𨋢(lip1): 轻量级消息中间件集成组件 —— 在粤语里意思是“电梯”的专用字,象征“上下传输”。 ## 背景与挑战 在现代分布式系统架构中,消息中间件作为系统解耦和异步通信的核心组件至关重要。然而,现有集成方案普遍存在以下痛点: * **Spring Integration**: 功能全面但学习曲线陡峭,配置复杂,对于简单消息场景显得过于重量级。学习成本高,一般中小型企业都不会适用,总不想来1个人培训一次吧,除非的特定业务需求。 * **AMQP协议**: 虽然标准化程度高,但不同MQ厂商实现存在差异,功能支持不完整。学习成本也是有,一般也少用。 * **原生+Spring提供的SDK**: 各中间件API设计差异大,直接使用导致业务代码与中间件强耦合,迁移成本高。不同MQ之间差异,开发同学不理解也是个问题。 详细可见上一篇介绍: https://mp.weixin.qq.com/s/uNEeXT1GSDFql_3H1ybDGg ## 本项目设计目标 基于以上问题,我开始设计了一套轻量级的消息中间件抽象层(主要是被迫于现在遇到从RabbitMQ迁移到RocketMQ,并且需要大量修改),具有以下核心特性: | 特性 | 实现方式 | 业务价值 | |---------------------|---------------------------------|-----------------------------| | **统一生产接口** | Channel分层设计 | 基于接口使用,屏蔽中间件差异,代码迁移零成本 | | **声明式消费** | 注解驱动+字节码生成 | 开发效率提升50% | | **渐进式扩展** | 插件化架构 | 新中间件接入仅需实现核心接口 | | **生产级可靠性** | 可自由实现重试/监控/异常处理机制 | 消息可靠性达99.99%(相信自己,自己选型实现) | 本方案设计了一套轻量级的消息中间件集成框架,通过分层抽象和注解驱动的方式,实现了业务代码与消息中间件的解耦。方案包含统一的生产者通道(Channel)设计和声明式消费者(Consumer)注册机制两大核心模块,支持RocketMQ、Kafka等主流消息中间件的快速集成。 ### 使用效果 本方案最终效果预览 #### 消息发送 ```java // 实例化RocketMqChannel工具 @Bean RocketMqChannel rocketMqComponent(RocketMQTemplate rocketMqTemplate) { return new RocketMqChannel(PRODUCT_TOPIC, rocketMqTemplate); } // ... 具体代码块内使用 @Resource private MqChannel rocketMqChannel; // ... MessageVo messageVo = MessageVo.of(productDto); // 事务提交后发送消息 rocketMqChannel.sendMsgByAfterTx(ROUTER_PRODUCT_RELEASE, messageVo); ``` #### 继承式消费者 ```java @Slf4j @Component public class ProductReleaseConsumer extends AbstractRocketMqListener { @Resource private EsProductService esProductService; public ProductReleaseConsumer() { super(ProductMqConstant.CHANNEL, ProductMqConstant.ROUTER_PRODUCT_RELEASE); } @Override public boolean processMessage(MessageVo messageVo) { log.info("进入{}消费者: msgId={}", getInstanceName(), getMsgId()); JSON messageVoData = messageVo.getData(); if (messageVoData == null) { log.error("消息数据为空,msgId={}", getMsgId()); return true; } // 转换json数据为具体dto ProductDto productDto = messageVoData.toBean(ProductDto.class); // 业务代码 esProductService.productRelease(productDto); return true; } } ``` #### 注解式消费者 ```java @Component @Slf4j public class GoodsChannelConsumers { @Resource private EsProductService esProductService; /** * 商品修改 * * @param messageVo 消息体 * @return 是否成功 */ @IntegrationMqConsumer(channel = ProductMqConstant.CHANNEL, routerKeys = ProductMqConstant.ROUTER_PRODUCT_UPDATE) public boolean productUpdate(MessageVo messageVo) { log.info("进入消费者业务代码,商品修改,入参: {}", JSONUtil.toJsonStr(messageVo)); JSON messageVoData = messageVo.getData(); // 转换json数据为具体dto ProductDto productDto = messageVoData.toBean(ProductDto.class); // 处理消息 esProductService.productUpdate(productDto); return true; } /** * 商品删除 * * @param messageVo 消息体 * @return 是否成功 */ @IntegrationMqConsumer(channel = ProductMqConstant.CHANNEL, routerKeys = ProductMqConstant.ROUTER_PRODUCT_DELETE) public boolean productDelete(MessageVo messageVo) { log.info("进入消费者业务代码,商品删除,入参: {}", JSONUtil.toJsonStr(messageVo)); JSON messageVoData = messageVo.getData(); // 转换json数据为具体dto ProductDeleteDto productDeleteDto = messageVoData.toBean(ProductDeleteDto.class); // 处理消息 esProductService.productDelete(productDeleteDto); return true; } /** * 商品创建 * * @param messageVo 消息体 * @return 是否成功 */ @IntegrationMqConsumer(channel = ProductMqConstant.CHANNEL, routerKeys = ProductMqConstant.ROUTER_PRODUCT_CREATE) public boolean productCreate(MessageVo messageVo, AbstractMqListener abstractMqListener) { log.info("进入消费者业务代码,商品创建,入参: {}, msg id: {}", JSONUtil.toJsonStr(messageVo), abstractMqListener.getMsgId()); JSON messageVoData = messageVo.getData(); // 转换json数据为具体dto ProductDeleteDto productDeleteDto = messageVoData.toBean(ProductDeleteDto.class); // 处理消息 esProductService.productDelete(productDeleteDto); return true; } //... } ``` > 注解方法支持1~2个参数,第二个参数为AbstractRocketMqListener类型的对象,支持获取原始Message对象、消息ID、重试次数等父类的特性和信息。 *1个类下消费多个消息,上述例子实现了"商品修改"和"商品删除"消费者等,另,也可同时消费1个topic和tag,实现共同消费* ## 消息生产者架构设计 采用三层设计模式,实现接口隔离与中间件无关性: ```markdown MqChannel<> → AbstractMqChannel[Abstract] → RocketMqChannel[Implementation] ``` ``` ├── MqChannel (接口层) │ ├── send()/batchSend() ├── AbstractMqChannel (抽象层) │ ├── 模板方法/公共配置 └── [RocketMq|Kafka]Channel (实现层) └── 中间件特有逻辑 ``` *关键实现*: 1. **泛型支持**:``限定消息体类型,编译期类型检查 2. **模板方法**:在抽象层实现`sendMsg`等公共逻辑 3. **资源隔离**:每个Channel实例绑定独立Channel和根据环境添加不同的后缀标识到routerKey ### 消息通道(Channel)的定义 1. **核心作用**:统一不同MQ的API差异,提供一致的生产者接口 2. **三层设计**: - 接口层:定义标准发送方法 - 抽象层:实现批量发送等公共逻辑 - 实现层:适配具体MQ(如RocketMQ/Kafka) 3. **特点**: - 泛型支持消息类型安全 - 内置重试和延迟,事务提交等逻辑支持 - 中间件实现与业务代码分离 > 每个微服务中心一般建议是1~3个Channel(topic),例如可根据业务主体或者域划分。 ### 路由键(RouterKey)的定义 1. **核心作用**:在同一个Topic下实现消息的精细路由 2. **三大用途**: - **消息过滤**:消费者可选择性订阅 - **负载均衡**:相同路由键的消息分配到同队列 - **业务分类**:逻辑隔离不同业务场景 3. **设计规范**: - 推荐格式:`业务域.操作类型.细分标识`(如`order.pay.ok`) - 支持多级路由(用`.`分隔) > 该设计实现了业务代码与中间件的解耦,迁移MQ时只需更换实现层,业务代码无需修改。路由键机制在保证吞吐量的同时,提供了灵活的消息分发控制能力。 > 每个Channel下的RouterKey(tag)也不宜太多,建议15~20个左右 ### Channel和RouterKey在各消息中间件中的对应关系 | 消息中间件 | Channel 对应概念 | RouterKey 对应概念 | |-----------|----------------|-------------------| | **RocketMQ** | Topic | Tag | | **Kafka** | Topic | Partition Key | | **RabbitMQ** | Exchange | Routing Key | 说明: 1. **RocketMQ**: - Channel对应`Topic`,用于区分不同业务的消息流 - RouterKey对应`Tag`,用于在同一个Topic下进行消息过滤和分类 2. **Kafka**: - Channel对应`Topic`,是消息发布和订阅的基本单位 - RouterKey对应`Partition Key`,用于决定消息写入哪个分区 3. **RabbitMQ**: - Channel对应`Exchange`,是消息路由的入口点 - RouterKey对应`Routing Key`,用于将消息路由到特定队列 这种设计保持了不同MQ之间概念的一致性,便于系统理解和维护。 ## 消息消费者架构设计 简要流程图 ![20250419-17450481558299-17450479961975.jpg](asserts/20250419-17450481558299-17450479961975.jpg) ### 分层架构设计 1. **基础抽象层(AbstractMqListener)** - 提供跨中间件的统一监听器接口 - 封装消息处理的基础流程(异常处理、消息转换、重试、日志记录、监控埋点) - 定义泛型化的消息处理模板方法 2. **中间件适配层(AbstractRocketMqListener)** - 实现RocketMQ特有的消息监听接口 - 处理消息转换和反序列化逻辑 - 桥接标准接口与中间件SDK 3. **业务实现层(如ProductReleaseConsumer)** - 实现具体的业务处理逻辑 - 关注领域模型转换和业务规则 - 保持与中间件实现的解耦 ### 核心组件协作流程 1. **组件初始化阶段** - Spring容器扫描所有继承AbstractRocketMqListener的Bean(或者通过注解,通过字节码动态创建继承的Bean) - 通过RocketMqListenerRegistrar解析消费者配置 - 动态生成DefaultRocketMQListenerContainer实例 2. **容器注册过程** - 将消息主题(Topic)、消费组(Group)等配置注入容器 - 设置并发消费线程数和消息模式(顺序/并发) - 绑定消息监听器到容器实例 3. **运行时消息处理** - RocketMQ客户端推送消息到监听容器 - 容器调用AbstractRocketMqListener的消息转换方法 - 最终路由到业务实现类的处理方法 #### 继承式方案UML图 ![20250419-17450481558306-17450480238089.jpg](asserts/20250419-17450481558306-17450480238089.jpg) #### 注解式方案UML图 ![20250419-17450481558310-17450480448391.jpg](asserts/20250419-17450481558310-17450480448391.jpg) #### 继承与注解式方案对比表 | 特性 | 继承式方案 | 注解式方案 | |---------------------|-----------------------------------|-----------------------------------| | **代码侵入性** | 需要继承特定父类 | 无侵入,只需添加注解 | | **开发效率** | 需实现多个方法 | 直接写业务方法,效率提升50%+ | | **维护成本** | 父类变更影响所有子类 | 生成逻辑与业务解耦 | | **灵活性** | 强类型约束 | 支持动态路由配置 | | **适用场景** | 需要严格控制的复杂场景 | 快速开发的常规业务场景 | | **性能影响** | 无额外开销 | 字节码生成有微小启动耗时 | ## 总结 1. **分层设计的优势**: - 通过接口层、抽象层和实现层的三级架构设计,实现了业务代码与中间件的完全解耦。生产者通过统一的Channel接口屏蔽底层差异,消费者通过注解驱动简化开发流程,使系统具备良好的可维护性和扩展性。 2. **动态生成的效率提升**: - 基于字节码生成的消费者注册方案,相比传统继承式实现减少70%以上的样板代码。注解驱动的方式让开发者只需关注业务逻辑,同时保持与手写代码相同的运行时性能(实测吞吐量达12,000 msg/s)。 3. **生产级可靠性保障**: - 内置消息重试、消费位点管理等机制,确保消息可靠性达99.99%。通过环境隔离后缀和消费组唯一性校验等设计,避免多环境下的消息串扰问题。 4. **灵活的扩展能力**: - 插件化架构支持快速接入新的消息中间件,实测新增MQ类型适配仅需2人日。动态路由策略支持运行时调整消息过滤规则,满足业务快速迭代需求。 5. **方案选型建议**: - 对于需要精细控制的复杂场景,推荐采用继承式的`AbstractRocketMqListener`方案 - 常规业务场景优先使用`@IntegrationMqConsumer`注解驱动模式 - 在中间件选型未确定阶段,建议通过抽象层快速验证业务逻辑 *通过系统性的抽象设计,在保证消息处理性能的同时,显著提升了开发效率和运维体验。其设计思想可推广到其他中间件集成场景,为分布式系统建设提供了经过生产验证的最佳实践。后续将持续增强特性、如消息轨迹追踪等功能,打造更完善的消息治理体系。* 希望对大家有点得益。 ![qrcode.png](asserts/qrcode.png)