# rabbitmq **Repository Path**: thanlin/rabbitmq ## Basic Information - **Project Name**: rabbitmq - **Description**: RabbitMQ SpringBoot 示例 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2019-08-01 - **Last Updated**: 2024-08-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RabbitMq 示例 #### 参考 - [retry 处理 https://yq.aliyun.com/articles/316814](https://yq.aliyun.com/articles/316814) - [retry https://my.oschina.net/dengfuwei/blog/1595047](https://my.oschina.net/dengfuwei/blog/1595047) - [retry https://www.jianshu.com/p/4904c609632f](https://www.jianshu.com/p/4904c609632f) - [retry https://github.com/spring-projects/spring-retry](https://github.com/spring-projects/spring-retry) - [https://www.cnblogs.com/pokid/p/10527708.html](https://www.cnblogs.com/pokid/p/10527708.html) - [https://www.cnblogs.com/pokid/p/10527765.html](https://www.cnblogs.com/pokid/p/10527765.html) ## 消息可靠性 ##### Queue 持久化 ``` public Queue(String name, boolean durable) ``` ##### Message 持久化 ``` //convertAndSend 默认已持久化(MessageProperties deliveryMode=PERSISTENT) rabbitTemplate.convertAndSend(exchange, routingKey,msg); //手动设置消息编码,持久化功能 MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setContentEncoding("UTF-8"); Message message = new Message(msg.getBytes(StandardCharsets.UTF_8),properties); rabbitTemplate.send(exchange,routingKey,message); ``` ##### Exchange 持久化 ``` public DirectExchange(String name, boolean durable, boolean autoDelete) ``` ##### 消费端手动确认开启 ``` #1.开启手动确认功能 acknowledge-mode: manual #2.消费端手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); ``` ##### 发送端回调确认 ``` #如果发送到交换器失败(比如说删除了交换器),ack 返回值为 false #消息不管是否投递到交换机都进行ConfirmCallback回调,投递成功ack=true,否则为false RabbitTemplate.ConfirmCallback.confirm #交换机匹配到队列成功则不进行ReturnCallback回调,否则先进行ReturnCallback回调再进行ConfirmCallback回调 RabbitTemplate.ReturnCallback.returnMessage ``` ##### 调大 channel size ``` #避免 connection closed 导致消息丢失 cache.channel.size: 100 ``` ##### 消息重试机制 ###### 重试机制 - 消费方发生异常时,需要 throw 异常,消息才会重发. - 使用nack(requque为false)确认消息消费失败,消息不会再次进入队列,且不会重发 注意:未配置重试机制情况,消息消费失败 消费方重启后才会再次受到消息,重试的目的是解决消息不及时问题. ``` #开启消息重试配置 retry: enabled: true max-attempts: 3 max-interval: 1200000 initial-interval: 5000 multiplier: 2 ``` ##### 失败转发处理 ###### 第一种方式 - 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败,消息直接进入死信队列 - 此方式接收方catch异常,并调用nack 使用消息进入死信队列,由于使用nack消息失败后不在推送,因此重发意义不大 ###### 第二种方式 - 使用 `MessageRecoverer` 转发失败的消息。 - catch异常并log记录,并抛出异常,达到最大retry次数后,会调用 `MessageRecoverer.recover ` 方法,,在此处手动发送到指定队列 - 该方式 数据仍旧存在Queue中 接收方重启后还会收到消息,需要消费处理重新消息。 ###### 第三种方式 - 消息重发次数完成后,转发消息(死信队列),并反馈MQ 消息消费成功。 ###### 第四种方式 - 消息重发次数完成后,使用 `MessageRecoverer.recover ` 转发消息,并反馈MQ消费成功.