# tomato-ms **Repository Path**: wanchen97/tomato-ms ## Basic Information - **Project Name**: tomato-ms - **Description**: 基于springcloud-stream的消费端可靠性消息投递策略 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2021-02-22 - **Last Updated**: 2021-02-22 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ### tomato-ms(Message Strategy) 使用说明 #### 一、简介 ##### tomato-ms是一个统一处理了消息幂等性的问题与提供多套消费端消费失败后的消息处理策略的程序 #### 二、tomato-ms 使用: ##### 1.导入jar包 ```xml com.hq spring-boot-starter-tomato-ms 1.0.0-RELEASE ``` ##### 2.开启TomatoMs ```java // ...其他注解省略 @SpringBootApplication @EnableTomatoMs //开启TomatoMs public class Bootstrap{ public static void main(String[] args) { SpringApplication.run(Bootstrap.class, args); } } ``` ##### 3.配置死信队列 ```yml spring: cloud: stream: rabbit: bindings: omsapi-order-dispatch-code-input: #示例beanName consumer: auto-bind-dlq: true requeueRejected: true republishToDlq: true ``` ##### 4.加上@ReliabilityStrategy注解 ```java @StreamListener(value = InputInterface.IN_OMSAPI_ORDER_DISPATCH_CODE) @ReliabilityStrategy public void updateOrderDispatchCode(@Payload Message payload) { ... } ``` #### 三、内核分析 ##### 1.设计图: ![输入图片说明](https://images.gitee.com/uploads/images/2019/1129/110540_47e9b70b_1550341.png "企业微信截图_15743053681876.png") ##### 2.策略详解 2.1 borker向mq推送一条消息时,当集成了spring-boot-starter-data-redis并且开启了幂等功能时,此条消息会在aop中完成幂等性操作。 2.2 当消息处理异常时,若消息配置了重试,则会在重试仍然失败后进入所配置的死信队列 2.3 死信队列会统一交给私信消息转发器去统一找到对应的策略,并执行 2.3.1目前提供了三种进入死信队列后的处理策略 1. 重入队列/Requeue 2. 丢弃消息,打印log 3. 存储到mongodb,定时器重发,或者人工干预 2.4扩展策略 ​ 当实现DlqSrategy接口,可完成自定义策略,在注解中指定对应的策略即可 #### 四、策略选择/@ReliabilityStrategy 介绍 ##### 1.注解源码 ```java @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface ReliabilityStrategy { /**是否需要幂等**/ boolean idempotent() default true; /**可靠性策略**/ Class strategy() default MongoDbDlqStrategy.class; } ``` ##### 2.注解介绍 ##### 注解提供了两种选择属性,分别是是否需要幂等(idempotent)和可靠性策略(strategy) ###### 1.idempotent:是否需要幂等,默认为幂等 此属性标识为true时,表示消息会进行消息幂等性操作,即当同一条消息(根据messageId标识)异常情况下推送多次,例如网络抖动、连接发生问题时会进行消息重复推送,此策略为保证多条消息重复推送时,只有一条消息能被消费。 ###### 2.strategy:当消息异常进入死信队列时所执行的策略,此处提供了是三种策略,默认为mongo入库 1.MongoDbDlqStrategy:入库mango 等待定时器重发,或者人工干预 2.DiscardDlqStrategy:丢弃消息计入log 3.RequeueDlqStrategy:当消费异常时,重新回到队列头(不推荐,可能造成消费循环) ##### 3.处理策略扩展 扩展策略只需要实现com.hq.tomato.ms.strategy.DlqStrategy 接口即可,下面为demo ```java @Component @Slf4j public class MyStrategy implements DlqStrategy { @Override public void process(Message message, Channel channel) { log.info("【这是我自定义的策略】"); } } ``` @Param message: Amqp Message对象 @Param channel:rabbitmq channel ##### 五、自动装配 1.tomato-ms使用@Enable 模式来完成自动装配模式,当不添加注解时,所有功能都将不起作用,避免了导入包后,因为@ComponentScan引起的一些问题 2.程序实现了可插拔性(或者说降级处理) 当前的功能幂等性,依赖于redis。入库mongo依赖于mognodb,但可能某些场景下,并不需要做幂等,或者根本就不需要存入mongo,为了避免这种强依赖的关系,实现使用了@Condition注解避免了强引用问题。 2.1.当不引用redis时,则去除幂等操作,但不影响后面的死信队列策略。 2.2 当不引入mongo时,则去除死信队列的mongo策略,但不影响幂等和其他的策略 ##### 六、配置 ```yaml tomato: ms: maxRetryNum: 3 checkoutTemplate: true maxLimit: 100 ``` maxRetryNum: 当消息处理异常进入mongo后,允许最大重试的次数(即定时任务重发的最大次数),默认为10 maxLimit: 定时任务每次冲mongodb中获取的最大条数,默认为100条 checkoutTemplate:是否选择mongotemlate,默认为false,即默认选择的为MongoDbUtil ##### 七、定时任务处理器 考虑到定时任务的实现有多样性(可能为xx-job/quartz/spring-scheduling),即此程序暂时不做提供实现,只提供了一个处理类com.hq.tomato.ms.handler.ReSendMessageHandler,实现如下: ```java @Bean public ReSendMessageHandler reSendMessageHandler(){ return new ReSendMessageHandler(); } //调用 reSendMessageHandler.execute(); ```