Fetch the repository succeeded.
springboot-rabbitmq
springboot整合rabbitmq实现消息队列的消息生成和消息消费
Rabbit MQ是建立在强大的Erlang OTP平台上,因此安装RabbitMQ之前要先安装Erlang。
首先要去网站下载erlang的安装包http://www.erlang.org/downloads,安装
然后去rabbitmq官网下载rabbitmq的服务安装程序https://www.rabbitmq.com/install-windows.html,安装
配置好erlang的环境变量ERLANG_HOME=C:\Program Files\erl10.4
启动rabbitmq服务,访问localhost:15672 输入guest/guest进入管理界面
比较核心的三个功能为:解耦、异步和削峰
解耦:例如解除ABC三个系统的强连接,消息通信都通过队列去进行
异步:比如一段处理程序分别执行ABC三个动作,这样可以把ABC三个动作拆分开分别去消费队列的消息进行异步处理
削峰:某一时刻数据太大超过了程序的处理能力,可以把多余的消息放入队列里面慢慢处理
rabbitmq有三种模式,普通模式,普通集群模式(无高可用性)和镜像集群模式(高可用方案)
其中普通模式就是我们现在的练习模式,一个单机跑一个rabbitmq实例,无高可用特性
普通集群模式就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个,主要用来提高消息的吞吐量
镜像集群模式,高可用方案,每个RabbitMQ节点都有队列的一个完整镜像,相当于数据全部备份了,一个节点的数据丢失不影响。
那么如何开启镜像集群模式呢?
可以通过命令开启:rabbitmqctl set_policy ha-all “^” ‘{“ha-mode”:“all”}’
也可以在后台管理系统进行策略的配置:Admin---->Policies---->ha-mode
集群模式的缺点
首先有一个显而易见的缺点,就是性能消耗太大,所以的数据所有节点全部备份导致网络带宽压力和消耗很重。
其次扩展性不是很好,因为你越增加机器,那么同步数据所带来的网络带宽压力和消耗会越来越大。
所谓幂等性,就是一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
那么如何保证消息队列的幂等性呢?这个必须在业务里面进行判断。
比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
所谓可靠性,就是我要相信,我给到消息队列里面的消息不能无缘无故的没了
那么,产生消息丢失有以下三种情况
消息生产者丢失
消息队列本身丢失
消费过程中丢失
消息生成过程当中丢失解决办法:
开启生成消息确认模式,该项目当中的MsgProducer类就采用了这种方式
息队列本身丢失:
开启消息队列持久化。
return new Queue(QUEUE_A, true);其中第二个参数durable就是是否持久化。
假如设置为true,队列当中的消息在重启rabbitmq服务的时候还会存在。
消费过程中丢失
关闭rabbitmq的自动ack模式,改为手动ack,假如出错了,就不会发送ack,这样消息就不会丢失。
该项目的MsgReceiver当中就对队列A当中写了两种ack模式,分别为手动和自动ack
首先消息产生积压,基本上只有一种可能,就是消费者出现故障,导致不消费消息或者消费消息速度极慢。
那么一旦出现了这个状况,基本说明服务端出现了故障了。
队列里面积压了千万级的速度怎么办?
这个时候只有两个办法
1、修复消费者---->让消费者按照正常速度去消费消息----->等上几个小时,说不定就消费完了。
2、紧急扩容
首先找出consumer的问题进行修复,使其能够正常消费消息。
停掉现有的consumer,临时建好原先10倍的queue的数量。
写一个分发程序,把消息分发到10个queue里面。
扩容10倍的机器来部署consumer,每一批consumer消费一个queue里面的消息。
等队列里面积压的消息消费完了,恢复到原来的系统架构。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。