7 Star 49 Fork 22

thales/springboot

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

springboot

介绍

springboot-rabbitmq

springboot整合rabbitmq实现消息队列的消息生成和消息消费

rabbitmq的安装

Rabbit MQ是建立在强大的Erlang OTP平台上,因此安装RabbitMQ之前要先安装Erlang。

首先要去网站下载erlang的安装包http://www.erlang.org/downloads,安装

然后去rabbitmq官网下载rabbitmq的服务安装程序https://www.rabbitmq.com/install-windows.html,安装

配置好erlang的环境变量ERLANG_HOME=C:\Program Files\erl10.4

启动rabbitmq服务,访问localhost:15672 输入guest/guest进入管理界面

为什么使用消息队列?
比较核心的三个功能为:解耦、异步和削峰

  解耦:例如解除ABC三个系统的强连接,消息通信都通过队列去进行
  异步:比如一段处理程序分别执行ABC三个动作,这样可以把ABC三个动作拆分开分别去消费队列的消息进行异步处理
  削峰:某一时刻数据太大超过了程序的处理能力,可以把多余的消息放入队列里面慢慢处理
如何保证rabbbitmq的高可用?也就是保证消息队列的可用性。
rabbitmq有三种模式,普通模式,普通集群模式(无高可用性)和镜像集群模式(高可用方案)

  其中普通模式就是我们现在的练习模式,一个单机跑一个rabbitmq实例,无高可用特性
  普通集群模式就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个,主要用来提高消息的吞吐量
  镜像集群模式,高可用方案,每个RabbitMQ节点都有队列的一个完整镜像,相当于数据全部备份了,一个节点的数据丢失不影响。
  
  
那么如何开启镜像集群模式呢?

  可以通过命令开启:rabbitmqctl set_policy ha-all “^” ‘{“ha-mode”:“all”}’
  也可以在后台管理系统进行策略的配置:Admin---->Policies---->ha-mode
   
集群模式的缺点

  首先有一个显而易见的缺点,就是性能消耗太大,所以的数据所有节点全部备份导致网络带宽压力和消耗很重。
  其次扩展性不是很好,因为你越增加机器,那么同步数据所带来的网络带宽压力和消耗会越来越大。
如何保证消息的幂等性?
所谓幂等性,就是一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

那么如何保证消息队列的幂等性呢?这个必须在业务里面进行判断。

    比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
    比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
    比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
如何保证消息的可靠性?
所谓可靠性,就是我要相信,我给到消息队列里面的消息不能无缘无故的没了

那么,产生消息丢失有以下三种情况

  消息生产者丢失
  消息队列本身丢失
  消费过程中丢失
	
消息生成过程当中丢失解决办法:

  开启生成消息确认模式,该项目当中的MsgProducer类就采用了这种方式
  
息队列本身丢失:

  开启消息队列持久化。
  return new Queue(QUEUE_A, true);其中第二个参数durable就是是否持久化。
  假如设置为true,队列当中的消息在重启rabbitmq服务的时候还会存在。
  
消费过程中丢失

  关闭rabbitmq的自动ack模式,改为手动ack,假如出错了,就不会发送ack,这样消息就不会丢失。
  该项目的MsgReceiver当中就对队列A当中写了两种ack模式,分别为手动和自动ack
消息队列消息积压怎么办?
首先消息产生积压,基本上只有一种可能,就是消费者出现故障,导致不消费消息或者消费消息速度极慢。

那么一旦出现了这个状况,基本说明服务端出现了故障了。

队列里面积压了千万级的速度怎么办?

这个时候只有两个办法

1、修复消费者---->让消费者按照正常速度去消费消息----->等上几个小时,说不定就消费完了。
2、紧急扩容
  首先找出consumer的问题进行修复,使其能够正常消费消息。
  停掉现有的consumer,临时建好原先10倍的queue的数量。
  写一个分发程序,把消息分发到10个queue里面。
  扩容10倍的机器来部署consumer,每一批consumer消费一个queue里面的消息。
  等队列里面积压的消息消费完了,恢复到原来的系统架构。
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ydd1989/springboot.git
git@gitee.com:ydd1989/springboot.git
ydd1989
springboot
springboot
master

搜索帮助