# springboot-rabbitmq **Repository Path**: zhangbw666/springboot-rabbitmq ## Basic Information - **Project Name**: springboot-rabbitmq - **Description**: RabbitMq消息实战 - **Primary Language**: Java - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 5 - **Forks**: 1 - **Created**: 2021-02-20 - **Last Updated**: 2025-01-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RabbitMQ 典型应用场景实战 #### 一、实战前言 RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等。 #### 二、RabbitMQ 官网拜读 首先,让我们先拜读 RabbitMQ 官网的技术开发手册以及相关的 Features,感兴趣的朋友可以耐心的阅读其中的相关介绍,相信会有一定的收获,地址可见:[RabbitMQ 官网](http://www.rabbitmq.com/getstarted.html) 阅读该手册过程中,我们可以得知 RabbitMQ 其实核心就是围绕 “消息模型” 来展开的,其中就包括了组成消息模型的相关组件:生产者,消费者,队列,交换机,路由,消息等!而我们在实战应用中,实际上也是紧紧围绕着 “消息模型” 来展开撸码的! 下面,我就介绍一下这一消息模型的演变历程,当然,这一历程在 RabbitMQ 官网也是可以窥览得到的! ![消息模型的演变历程](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301001.jpg) ![消息模型的演变历程](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301002.jpg) ![消息模型的演变历程](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301003.jpg) 上面几个图就已经概述了几个要点,而且,这几个要点的含义可以说是字如其名! * 生产者:发送消息的程序 * 消费者:监听接收消费消息的程序 * 消息:一串二进制数据流 * 队列:消息的暂存区/存储区 * 交换机:消息的中转站,用于接收分发消息。其中有 fanout、direct、topic、headers 四种 * 路由:相当于密钥/第三者,与交换机绑定即可路由消息到指定的队列! 正如上图所展示的消息模型的演变,接下来我们将以代码的形式实战各种典型的业务场景! >顺带宣传一下我关注的这个原创公众号:专注于 Java 编程技术和程序员软实力的方方面面,欢迎小伙伴们扫一扫关注一下,一定会大有所获。 ![猿码天地](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-wechat/%E5%BE%AE%E4%BF%A1%E5%85%AC%E4%BC%97%E5%8F%B7.jpg) #### 三、SpringBoot 整合 RabbitMQ 实战 我们首先需要借助 IDEA 的 Spring Initializr 用 Maven 构建一个 SpringBoot 的项目,并引入 RabbitMQ、Mybatis、Log4j 等第三方框架的依赖。搭建完成之后,可以简单的写个 RabbitMQController 测试一下项目是否搭建是否成功,下图是构建的项目以及创建好的规范目录: ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301004.jpg) ##### 思考? 在项目或者服务中使用 RabbitMQ,其实无非是有几个核心要点要牢牢把握住,这几个核心要点在撸码过程中需要“时刻的游荡在自己的脑海里”,其中包括: * 我要发送的消息是什么 * 我应该需要创建什么样的消息模型:DirectExchange+RoutingKey?TopicExchange+RoutingKey?等 * 我要处理的消息是实时的还是需要延时/延迟的? * 消息的生产者需要在哪里写,消息的监听消费者需要在哪里写,各自的处理逻辑是啥? ##### 安装RabbitMQ 基于这样的几个要点,我们先小试牛刀一番,采用 RabbitMQ 实战异步写日志与异步发邮件。当然啦,在进行实战前,我们需要安装好 RabbitMQ 及其后端控制台应用,并在项目中配置一下 RabbitMQ 的相关参数以及相关 Bean 组件。 RabbitMQ 安装(请在搜索引擎自行安装)完成后,打开后端控制台应用:http://localhost:15672/,用户名密码:guest/guest登录,看到下图即表示安装成功! ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301005.jpg) ##### 配置文件 然后是项目配置文件层面的配置 application.properties ```xml #rabbitmq spring.rabbitmq.virtual-host=/ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #并发量的配置 #并发消费者的初始化值 spring.rabbitmq.listener.concurrency=10 #并发消费者的最大值 spring.rabbitmq.listener.max-concurrency=20 #每个消费者每次监听时可拉取处理的消息数量 spring.rabbitmq.listener.prefetch=5 ``` 其中,后面三个参数主要是用于“并发量的配置”,表示:并发消费者的初始化值,并发消费者的最大值,每个消费者每次监听时可拉取处理的消息数量。 接下来,我们需要以 Configuration 的方式配置 RabbitMQ 并以 Bean 的方式显示注入 RabbitMQ 在发送接收处理消息时相关 Bean 组件配置其中典型的配置是 RabbitTemplate 以及 SimpleRabbitListenerContainerFactory,前者是充当消息的发送组件,后者是用于管理 RabbitMQ监听器 的容器工厂,其代码如下: ```java /** * rabbitmq配置类 */ @Configuration public class RabbitmqConfig { private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class); @Autowired private Environment env; @Autowired private CachingConnectionFactory connectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 单一消费者 * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); return factory; } /** * 多个消费者 * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory,connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.NONE); factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class)); factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class)); factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class)); return factory; } @Bean public RabbitTemplate rabbitTemplate(){ connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } } ``` #### 四、RabbitMQ 实战:业务模块解耦以及异步通信 在一些企业级系统中,我们经常可以见到一个执行 function 通常是由许多子模块组成的,这个 function 在执行过程中,需要 同步 的将其代码从头开始执行到尾,即执行流程是 module_A -> module_B -> module_C -> module_D,典型的案例可以参见汇编或者 C 语言等面向过程语言开发的应用,现在的一些 JavaWeb 应用也存在着这样的写法。 而我们知道,这个执行流程其实对于整个 function 来讲是有一定的弊端的,主要有两点: * 整个 function 的执行响应时间将很久; 8 如果某个 module 发生异常而没有处理得当,可能会影响其他 module 甚至整个 function 的执行流程与结果; * 整个 function 中代码可能会很冗长,模块与模块之间可能需要进行强通信以及数据的交互,出现问题时难以定位与维护,甚至会陷入 “改一处代码而动全身”的尴尬境地! 故而,我们需要想办法进行优化,我们需要将强关联的业务模块解耦以及某些模块之间实行异步通信!下面就以两个场景来实战我们的优化措施! ##### 场景一:异步记录用户操作日志 对于企业级应用系统或者微服务应用中,我们经常需要追溯跟踪记录用户的操作日志,而这部分的业务在某种程度上是不应该跟主业务模块耦合在一起的,故而我们需要将其单独抽出并以异步的方式与主模块进行异步通信交互数据。 下面我们就用 RabbitMQ 的 DirectExchange+RoutingKey 消息模型也实现“用户登录成功记录日志”的场景。如前面所言,我们需要在脑海里回荡着几个要点: * 消息模型:DirectExchange+RoutingKey 消息模型 8 消息:用户登录的实体信息,包括用户名,登录事件,来源的IP,所属日志模块等信息 * 发送接收:在登录的 Controller 中实现发送,在某个 listener 中实现接收并将监听消费到的消息入数据表;实时发送接收 首先我们需要在上面的 RabbitmqConfig 类中创建消息模型:包括 Queue、Exchange、RoutingKey 等的建立,代码如下: ```java //TODO:用户操作日志消息模型(已实现) @Bean(name = "logUserQueue") public Queue logUserQueue(){ return new Queue(env.getProperty("log.user.queue.name"),true); } @Bean public DirectExchange logUserExchange(){ return new DirectExchange(env.getProperty("log.user.exchange.name"),true,false); } @Bean public Binding logUserBinding(){ return BindingBuilder.bind(logUserQueue()).to(logUserExchange()).with(env.getProperty("log.user.routing.key.name")); } ``` 上图中 env 获取的信息,我们需要在 application.properties 进行配置,其中 mq.env=local: ```xml # 用户操作日志消息模型(已实现) log.user.queue.name=${mq.env}.log.user.queue log.user.exchange.name=${mq.env}.log.user.exchange log.user.routing.key.name=${mq.env}.log.user.routing.key ``` 此时,我们将整个项目/服务跑起来,并打开 RabbitMQ 后端控制台应用,即可看到队列以及交换机及其绑定已经建立好了,如下所示: ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301006.jpg) 接下来,我们需要在 Controller 中执行用户登录逻辑,记录用户登录日志,查询获取用户角色视野资源信息等,由于篇幅关系,在这里我们重点要实现的是用MQ实现 “异步记录用户登录日志” 的逻辑,即在这里 Controller 将充当“生产者”的角色,核心代码如下: ```java /** * 异步记录用户操作日志Controller控制器 */ @RestController public class UserController { private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class); private static final String Prefix="user"; @Autowired private UserMapper userMapper; @Autowired private ObjectMapper objectMapper; @Autowired private UserLogMapper userLogMapper; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; // TODO: http://127.0.0.1:9092/mq/user/login @RequestMapping(value = Prefix+"/login",method = RequestMethod.POST,consumes = MediaType.MULTIPART_FORM_DATA_VALUE) public BaseResponse login(@RequestParam("userName") String userName,@RequestParam("password") String password){ BaseResponse response=new BaseResponse(StatusCode.Success); try { User user=userMapper.selectByUserNamePassword(userName,password); if(user == null){ response=new BaseResponse(StatusCode.Fail); }else{ //TODO:异步写用户日志 try { UserLog userLog=new UserLog(userName,"Login","login",objectMapper.writeValueAsString(user)); userLog.setCreateTime(new Date()); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(env.getProperty("log.user.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("log.user.routing.key.name")); Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build(); message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON); rabbitTemplate.convertAndSend(message); }catch (Exception e){ e.printStackTrace(); } //TODO:塞权限数据-资源数据-视图数据 } }catch (Exception e){ e.printStackTrace(); } return response; } } ``` 在上面的“发送逻辑”代码中,其实也体现了我们最开始介绍的演进中的几种消息模型,比如我们是将消息发送到 Exchange 的而不是 Queue,消息是以二进制流的形式进行传输等等。当用 postman 请求到这个 controller 的方法时,我们可以在 RabbitMQ 的后端控制台应用看到一条未确认的消息,通过 GetMessage 即可看到其中的详情,如下: ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301007.jpg) 最后,我们将开发消费端的业务代码,如下: ```java /** * 消息消费Controller控制器 */ @Component public class CommonMqListener { private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class); @Autowired private ObjectMapper objectMapper; @Autowired private UserLogMapper userLogMapper; @Autowired private MailService mailService; /** * 监听消费用户日志 * @param message */ @RabbitListener(queues = "${log.user.queue.name}",containerFactory = "singleListenerContainer") public void consumeUserLogQueue(@Payload byte[] message){ try { UserLog userLog=objectMapper.readValue(message, UserLog.class); log.info("监听消费用户日志 监听到消息: {} ",userLog); userLogMapper.insertSelective(userLog); }catch (Exception e){ e.printStackTrace(); } } } ``` 将服务跑起来之后,我们即可监听消费到上面 Queue 中的消息,即当前用户登录的信息。 “异步记录用户操作日志”的案例我想足以用于诠释上面所讲的相关理论知识点了。 ##### 场景二:异步发送邮件 发送邮件的场景,其实也是比较常见的,比如用户注册需要邮箱验证,用户异地登录发送邮件通知等等,在这里我以 RabbitMQ 实现异步发送邮件。实现的步骤跟场景一几乎一致! 1、消息模型的创建 ```java //TODO:发送邮件消息模型(已实现) @Bean public Queue mailQueue(){ return new Queue(env.getProperty("mail.queue.name"),true); } @Bean public DirectExchange mailExchange(){ return new DirectExchange(env.getProperty("mail.exchange.name"),true,false); } @Bean public Binding mailBinding(){ return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(env.getProperty("mail.routing.key.name")); } ``` 2、配置信息的创建 ```java # 发送邮件消息模型(已实现) mail.queue.name=${mq.env}.mail.queue mail.exchange.name=${mq.env}.mail.exchange mail.routing.key.name=${mq.env}.mail.routing.key ``` 3、生产端 ```java /** * 异步发送邮件Controller控制器 */ @RestController public class MailController { private static final Logger log= LoggerFactory.getLogger(MailController.class); private static final String Prefix="mail"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; // TODO: http://127.0.0.1:9092/mq/mail/send @RequestMapping(value = Prefix+"/send",method = RequestMethod.GET) public BaseResponse sendMail(){ BaseResponse response=new BaseResponse(StatusCode.Success); try { rabbitTemplate.setExchange(env.getProperty("mail.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("mail.routing.key.name")); rabbitTemplate.convertAndSend(MessageBuilder.withBody("mail发送".getBytes("UTF-8")).build()); }catch (Exception e){ e.printStackTrace(); } log.info("邮件发送完毕"); return response; } } ``` 4、消费端 ```java /** * 消息消费Controller控制器 */ @Component public class CommonMqListener { private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class); @Autowired private ObjectMapper objectMapper; @Autowired private UserLogMapper userLogMapper; @Autowired private MailService mailService; /** * 监听消费邮件发送 * @param message */ @RabbitListener(queues = "${mail.queue.name}",containerFactory = "singleListenerContainer") public void consumeMailQueue(@Payload byte[] message){ try { log.info("监听消费邮件发送 监听到消息: {} ",new String(message,"UTF-8")); mailService.sendEmail(); }catch (Exception e){ e.printStackTrace(); } } } ``` 5、操作及结果 使用postman发送请求 http://127.0.0.1:9092/mq/mail/send ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301008.jpg) #### 五、RabbitMQ 实战:并发量配置与消息确认机制 >实战背景 对于消息模型中的 listener 而言,默认情况下是“单消费实例”的配置,即“一个 listener 对应一个消费者”,这种配置对于上面所讲的“异步记录用户操作日志”、“异步发送邮件”等并发量不高的场景下是适用的。但是在对于秒杀系统、商城抢单等场景下可能会显得很吃力! 我们都知道,秒杀系统跟商城抢单均有一个共同的明显的特征,即在某个时刻会有成百上千万的请求到达我们的接口,即瞬间这股巨大的流量将涌入我们的系统,我们可以采用下面一图来大致体现这一现象: ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301009.jpg) 当到了“开始秒杀”、“开始抢单”的时刻,此时系统可能会出现这样的几种现象: * 应用系统配置承载不了这股瞬间流量,导致系统直接挂掉,即传说中的“宕机”现象; * 接口逻辑没有考虑并发情况,数据库读写锁发生冲突,导致最终处理结果跟理论上的结果数据不一致(如商品存库量只有 100,但是高并发情况下,实际表记录的抢到的用户记录数据量却远远大于 100); * 应用占据服务器的资源直接飙高,如 CPU、内存、宽带等瞬间直接飙升,导致同库同表甚至可能同 host 的其他服务或者系统出现卡顿或者挂掉的现象; 于是乎,我们需要寻找解决方案!对于目前来讲,网上均有诸多比较不错的解决方案,在此我顺便提一下我们的应用系统采用的常用解决方案,包括: * 我们会将处理抢单的整体业务逻辑独立、服务化并做集群部署; * 我们会将那股巨大的流量拒在系统的上层,即将其转移至 MQ 而不直接涌入我们的接口,从而减少数据库读写锁冲突的发生以及由于接口逻辑的复杂出现线程堵塞而导致应用占据服务器资源飙升; * 我们会将抢单业务所在系统的其他同数据源甚至同表的业务拆分独立出去服务化,并基于某种 RPC 协议走 HTTP 通信进行数据交互、服务通信等等; * 采用分布式锁解决同一时间同个手机号、同一时间同个 IP 刷单的现象; 下面,我们用 RabbitMQ 来实战上述的第二点!即我们会在“请求” -> "处理抢单业务的接口" 中间架一层消息中间件做“缓冲”、“缓压”处理,如下图所示: ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301010.jpg) >并发量配置与消息确认机制 正如上面所讲的,对于抢单、秒杀等高并发系统而言,如果我们需要用 RabbitMQ 在 “请求” - “接口” 之间充当限流缓压的角色,那便需要我们对 RabbitMQ 提出更高的要求,即支持高并发的配置,在这里我们需要明确一点,“并发消费者”的配置其实是针对 listener 而言,当配置成功后,我们可以在 MQ 的后端控制台应用看到 consumers 的数量,如下所示: ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301011.jpg) 其中,这个 listener 在这里有 10 个 consumer 实例的配置,每个 consumer 可以预监听消费拉取的消息数量为 5 个(如果同一时间处理不完,会将其缓存在 mq 的客户端等待处理!) 另外,对于某些消息而言,我们有时候需要严格的知道消息是否已经被 consumer 监听消费处理了,即我们有一种消息确认机制来保证我们的消息是否已经真正的被消费处理。在 RabbitMQ 中,消息确认处理机制有三种:Auto - 自动、Manual - 手动、None - 无需确认,而确认机制需要 listener 实现 ChannelAwareMessageListener 接口,并重写其中的确认消费逻辑。在这里我们将用 “手动确认” 的机制来实战用户商城抢单场景。 1、在 RabbitMQConfig 中配置确认消费机制以及并发量的配置 ```java //TODO:用户商城抢单实战(已实现) @Bean(name = "userOrderQueue") public Queue userOrderQueue(){ return new Queue(env.getProperty("user.order.queue.name"),true); } @Bean public TopicExchange userOrderExchange(){ return new TopicExchange(env.getProperty("user.order.exchange.name"),true,false); } @Bean public Binding userOrderBinding(){ return BindingBuilder.bind(userOrderQueue()).to(userOrderExchange()).with(env.getProperty("user.order.routing.key.name")); } @Autowired private UserOrderListener userOrderListener; @Bean public SimpleMessageListenerContainer listenerContainerUserOrder(@Qualifier("userOrderQueue") Queue userOrderQueue){ SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setMessageConverter(new Jackson2JsonMessageConverter()); //TODO:并发配置 container.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",Integer.class)); container.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",Integer.class)); container.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",Integer.class)); //TODO:消息确认机制 container.setQueues(userOrderQueue); container.setMessageListener(userOrderListener); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } ``` 2、消息模型的配置信息 ```xml # 用户商城抢单实战(已实现) user.order.queue.name=${mq.env}.user.order.queue user.order.exchange.name=${mq.env}.user.order.exchange user.order.routing.key.name=${mq.env}.user.order.routing.key ``` 3、RabbitMQ 后端控制台应用查看此队列的并发量配置 ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301012.jpg) 4、listener 确认消费处理逻辑:在这里我们需要开发抢单的业务逻辑,即“只有当该商品的库存 >0 时,抢单成功,扣减库存量,并将该抢单的用户信息记录入表,异步通知用户抢单成功!” listener 确认消费处理逻辑,需要实现ChannelAwareMessageListener ```java /** * listener 确认消费处理逻辑 */ @Component("userOrderListener") public class UserOrderListener implements ChannelAwareMessageListener{ private static final Logger log= LoggerFactory.getLogger(UserOrderListener.class); @Autowired private ObjectMapper objectMapper; @Autowired private UserOrderMapper userOrderMapper; @Autowired private ConcurrencyService concurrencyService; @Override public void onMessage(Message message, Channel channel) throws Exception { long tag=message.getMessageProperties().getDeliveryTag(); try { byte[] body=message.getBody(); /*UserOrderDto entity=objectMapper.readValue(body, UserOrderDto.class); log.info("用户商城抢单监听到消息: {} ",entity); UserOrder userOrder=new UserOrder(); BeanUtils.copyProperties(entity,userOrder); userOrder.setStatus(1); userOrderMapper.insertSelective(userOrder);*/ // TODO 监听消费处理逻辑 String mobile=new String(body,"UTF-8"); log.info("监听到抢单手机号: {} ",mobile); concurrencyService.manageRobbing(String.valueOf(mobile)); // TODO 确认消费 channel.basicAck(tag,true); }catch (Exception e){ log.error("用户商城下单 发生异常:",e.fillInStackTrace()); // TODO 确认消费 channel.basicReject(tag,false); } } } ``` 处理抢单逻辑 ```java /** * 处理抢单逻辑 */ @Service public class ConcurrencyService { private static final Logger log= LoggerFactory.getLogger(ConcurrencyService.class); private static final String ProductNo="product_10010"; @Autowired private ProductMapper productMapper; @Autowired private ProductRobbingRecordMapper productRobbingRecordMapper; /** * 处理抢单 * @param mobile */ public void manageRobbing(String mobile){ //TODO --v1.0 /*try { Product product=productMapper.selectByProductNo(ProductNo); if (product!=null && product.getTotal()>0){ log.info("当前手机号:{} 恭喜您抢到单了!",mobile); productMapper.updateTotal(product); }else{ log.error("当前手机号:{} 抢不到单!",mobile); } }catch (Exception e){ log.error("处理抢单发生异常:mobile={} ",mobile); }*/ //TODO --v2.0 /** * 抢单请求处理逻辑 * 1、有库存 * 2、扣减库存后,即能更新成功时记录抢单成功用户信息 * 3、异步发送通知给用户抢单成功(略) */ try { Product product=productMapper.selectByProductNo(ProductNo); if (product!=null && product.getTotal()>0){ int result=productMapper.updateTotal(product); if (result>0) { ProductRobbingRecord entity=new ProductRobbingRecord(); entity.setMobile(mobile); entity.setProductId(product.getId()); productRobbingRecordMapper.insertSelective(entity); } } }catch (Exception e){ log.error("处理抢单发生异常:mobile={} ",mobile); } } } ``` 5、紧接着我们采用 CountDownLatch 模拟产生高并发时的多线程请求(或者采用 jmeter 实施压测也可以!),每个请求将携带产生的随机数:充当手机号 -> 充当消息,最终入抢单队列!在这里,我模拟了 50000 个请求,相当于 50000 手机号同一时间发生抢单的请求,而设置的产品库存量为 100,这在 product 数据库表即可设置。 ```java /** * 模拟产生高并发的抢单请求Service * countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。 * 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。 */ @Service public class InitService { private static final Logger log= LoggerFactory.getLogger(InitService.class); public static final int ThreadNum = 200; private static int mobile=0; @Autowired private ConcurrencyService concurrencyService; @Autowired private CommonMqService commonMqService; //TODO 采用 CountDownLatch 模拟产生高并发时的多线程请求 public void generateMultiThread(){ log.info("开始初始化线程数----> "); try { CountDownLatch countDownLatch=new CountDownLatch(1); for (int i=0;i "); startLatch.await(); mobile += 1; log.info("mobile----> mobile:{}",mobile); //TODO:发送消息入抢单队列:env.getProperty("user.order.queue.name") commonMqService.sendRobbingMsgV2(String.valueOf(mobile)); // TODO:直接处理抢单 // concurrencyService.manageRobbing(String.valueOf(mobile));//v1.0 // TODO: 发送抢单信息入队列 env.getProperty("product.robbing.mq.queue.name") //commonMqService.sendRobbingMsg(String.valueOf(mobile));//v2.0 }catch (Exception e){ e.printStackTrace(); } } } } ``` 6、将抢单请求的手机号信息压入队列,等待排队处理 ```java /** * 将抢单请求的手机号信息压入队列,等待排队处理 */ @Service public class CommonMqService { private static final Logger log= LoggerFactory.getLogger(ConcurrencyService.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; /** * 发送抢单信息入队列 * @param mobile */ public void sendRobbingMsgV2(String mobile){ try { rabbitTemplate.setExchange(env.getProperty("user.order.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("user.order.routing.key.name")); Message message=MessageBuilder.withBody(mobile.getBytes("UTF-8")).setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); rabbitTemplate.send(message); log.info( "将抢单请求的手机号信息压入队列V2,等待排队处理 mobile:{}",mobile); }catch (Exception e){ log.error("发送抢单信息入队列V2 发生异常: mobile={} ",mobile); } } } ``` 7、在最后我们写个 Junit 或者写个 Controller,进行 initService.generateMultiThread(); 调用模拟产生高并发的抢单请求即可。 ```java /** * 模拟产生高并发的抢单请求Contrlooer控制器 */ @RestController public class ConcurrencyController { private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class); private static final String Prefix="concurrency"; @Autowired private InitService initService; // TODO: http://127.0.0.1:9092/mq/concurrency/robbing/thread @RequestMapping(value = Prefix+"/robbing/thread",method = RequestMethod.GET) public BaseResponse robbingThread(){ BaseResponse response=new BaseResponse(StatusCode.Success); initService.generateMultiThread(); return response; } } ``` 8、最后,我们当然是跑起来,在控制台我们可以观察到系统不断的在产生新的请求(线程)– 相当于不断的有抢单的手机号涌入我们的系统,然后入队列,listener 监听到请求之后消费处理抢单逻辑!最后我们可以观察两张数据库表:商品库存表、商品成功抢单的用户记录表 - 只有当库存表中商品对应的库存量为 0、商品成功抢单的用户记录刚好 100 时 即表示我们的实战目的以及效果已经达到了!! ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301013.jpg) ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301014.jpg) 总结:如此一来,我们便将 request 转移到我们的 mq,在一定程度缓解了我们的应用以及接口的压力!当然,实际情况下,我们的配置可能远远不只代码层次上的配置,比如我们的 mq 可能会做集群配置、负载均衡、商品库存的更新可能会考虑分库分表、库存更新可能会考虑独立为库存 Dubbo 服务并通过 Rest Api 异步通信交互并独立部署等等。这些优化以及改进的目的其实无非是为了能限流、缓压、保证系统稳定、数据的一致等!而我们的 MQ,在其中可以起到不可磨灭的作用,其字如其名:“消息队列”,而队列具有 “先进先出” 的特点,故而所有进入 MQ 的消息都将 “乖巧” 的在 MQ 上排好队,先来先排队,先来先被处理消费,由此一来至少可以避免 “瞬间时刻一窝蜂的 request 涌入我们的接口” 的情况! 附注:在用 RabbitMQ 实战上述高并发抢单解决方案,其实我也在数据库层面进行了优化,即在读写存库时采用了“类似乐观锁”的写法,保证:抢单的请求到来时有库存,更新存库时保证有库存可以被更新! #### 六、RabbitMQ 实战:死信队列认识与场景实战 >死信队列认识 死信队列,又可以称之为“延迟/延时队列”,也是队列的一种,只不过与普通的队列最大的不同之处在于创建时的组成成分不同,创建死信队列的“成分”将不仅仅只是:名称、持久化、自动删除等基本属性,还包含了死信交换机、死信路由甚至还有TTL(Time-To-Live)即队列中消息可生存的时间。 死信队列其实最大的作用是可以实现消息或者数据延迟/延时处理,而且还可以动态的设定延迟的时间,即动态设定 TTL。典型的业务场景很多,在这里就不一一列举了,总之,凡是业务中需要延迟一定时间再处理的数据均可以将其压入死信队列中,等待一定的时间后再执行真正的处理逻辑! 下面是死信队列在创建、绑定、生产消息、消费消息过程的结构流程图,在这里其实已经很明确的指出死信队列的创建跟绑定逻辑 以及 真正监听消费处理消息的队列的绑定逻辑。图中问题的答案为:当入死信队列的消息TTL一到,它自然而然的将被路由到 死信交换机绑定的队列 中被真正消费处理!!! ![](https://ymtd-1307390667.cos.ap-guangzhou.myqcloud.com/img-rabbitmq/20210301015.jpg) >死信队列场景实战 有了上面的流程图做指导,接下来,我们将用死信队列实战这样的一个业务场景:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效! 于是乎,我们需要创建两个消息模型,在 RabbitmqConfig 实施: * 死信队列:用于设定指定的待支付的交易订单号在指定的 TTL(单位为 ms)后何去何从! * 真正队列:用于监听消费处理指定的交易订单号,即判断该交易订单号是否已完成,如果否,则失效之! ```java //TODO:用户下单支付超时死信队列(延迟/延时队列)模型(已实现) @Bean public Queue userOrderDeadQueue(){ Map args=new HashMap(); args.put("x-dead-letter-exchange",env.getProperty("user.order.dead.exchange.name")); args.put("x-dead-letter-routing-key",env.getProperty("user.order.dead.routing.key.name")); // TTL超时设置 args.put("x-message-ttl",10000); return new Queue(env.getProperty("user.order.dead.queue.name"),true,false,false,args); } //绑定死信队列-面向生产端 @Bean public TopicExchange userOrderDeadExchange(){ return new TopicExchange(env.getProperty("user.order.dead.produce.exchange.name"),true,false); } @Bean public Binding userOrderDeadBinding(){ return BindingBuilder.bind(userOrderDeadQueue()).to(userOrderDeadExchange()).with(env.getProperty("user.order.dead.produce.routing.key.name")); } //创建并绑定实际监听消费队列-面向消费端 @Bean public Queue userOrderDeadRealQueue(){ return new Queue(env.getProperty("user.order.dead.real.queue.name"),true); } @Bean public TopicExchange userOrderDeadRealExchange(){ return new TopicExchange(env.getProperty("user.order.dead.exchange.name")); } @Bean public Binding userOrderDeadRealBinding(){ return BindingBuilder.bind(userOrderDeadRealQueue()).to(userOrderDeadRealExchange()).with(env.getProperty("user.order.dead.routing.key.name")); } ``` ```xml # 用户下单支付超时死信队列(延迟/延时队列)模型(已实现) user.order.dead.queue.name=${mq.env}.user.order.dead.queue user.order.dead.exchange.name=${mq.env}.user.order.dead.exchange user.order.dead.routing.key.name=${mq.env}.user.order.dead.routing.key # 绑定死信队列-面向生产端(已实现) # 创建并绑定实际监听消费队列-面向消费端(已实现) user.order.dead.produce.exchange.name=${mq.env}.user.order.dead.produce.exchange user.order.dead.produce.routing.key.name=${mq.env}.user.order.dead.produce.routing.key user.order.dead.real.queue.name=${mq.env}.user.order.dead.real.queue ``` 接下来是我们的生产端的逻辑:用户商城下单的处理! UserOrderController类 ```java /** * 用户商城下单 * @param dto * @return */ // TODO: http://127.0.0.1:9092/mq/user/order/push/dead/queue @RequestMapping(value = Prefix+"/push/dead/queue",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE) public BaseResponse pushUserOrderV2(@RequestBody UserOrderDto dto){ BaseResponse response=new BaseResponse(StatusCode.Success); UserOrder userOrder=new UserOrder(); /** * 1、用户商城下单,记录下单信息,此时交易订单状态为1-已保存 * 2、如果用户在指定的TTL内立即支付,则将其变为2-已支付 * 3、如果用户在TTL内没有完成支付,则变为3-已失效 */ try { BeanUtils.copyProperties(dto,userOrder); userOrder.setStatus(1); userOrderMapper.insertSelective(userOrder); }catch (Exception e){ e.printStackTrace(); } try { Integer id=userOrder.getId(); /** * 发送消息入死信队列,等待TTL被真正的对垒消费处理 */ rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(env.getProperty("user.order.dead.produce.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("user.order.dead.produce.routing.key.name")); rabbitTemplate.convertAndSend(id, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties properties=message.getMessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Integer.class); return message; } }); }catch (Exception e){ e.printStackTrace(); } return response; } ``` 接下来是等待固定的 TTL:在这里设定的是 10s,当消息入死信队列 10s 后,将自然而然的将消息路由到下一个中转站,即真正的消费监听处理队列进行处理:判断该笔交易订单号是否已经付款,如果否,则失效之! UserOrderDeadListener类 ```java /** * 真正的队列处理核心逻辑 * @param id */ @RabbitListener(queues = "${user.order.dead.real.queue.name}",containerFactory = "multiListenerContainer") public void consumeMessage(@Payload Integer id){ try { log.info("死信队列-用户下单超时未支付监听消息: {} ",id); UserOrder entity=userOrderMapper.selectByPkAndStatus(id,1); if (entity!=null){ // TTL时间内未支付 entity.setStatus(3); // 该笔订单失效 entity.setUpdateTime(new Date()); userOrderMapper.updateByPrimaryKeySelective(entity); }else{ //TODO:已支付-可能需要异步 减库存-异步发送其他日志消息 } }catch (Exception e){ e.printStackTrace(); } } ``` 可以将该服务跑起来,然后发起 controller 的用户下单请求,会发现消息入死信队列后不会立马被消费,等待 10s 会,消息会被路由到真正的消费队列中进行处理,这一现象可以在 MQ 的后端控制台应用中看到! 总结:到此我们的死信队列已经实战完毕,回顾我们所介绍的历程,其实核心重点在于上面的那张 “死信队列的结构流程图”,理解了这个结构流程图中的相关组件及其流程,则在实战各种需要延时处理的业务场景将得心应手,包括如何创建死信队列,如何面向生产端绑定死信队列,如何面向消费端绑定真正的队列等等!而对于死信队列的实战场景,前面也介绍过了:凡是需要等待一定时间或者需要缓一缓特定时间的业务、数据均可以通过死信队列来实现! #### 七、总结 RabbitMQ 的认识与实际业务场景的实战到此我都已经介绍完毕,总体而言,RabbitMQ 作为目前应用相当广泛的消息中间件,在我们实际系统的业务模块中具有重要的作用,特别是刚开始介绍的几种消息模型以及死信队列模型在微服务系统、分布式系统中均可充当重要的角色,其中我们实战的业务场景包括业务服务模块解耦异步通信(异步发送日志、异步发送邮件);另外,我们还介绍了消息确认机制,这是一种 MQ 确保消息能被消费者消费的机制,对于一些业务模块也是有广泛的应用;除此之外,我们还模拟实战了秒杀系统、抢单系统这样的业务场景下 RabbitMQ 的作用:限流、排队缓压、减少数据库读写锁的发生等等! #### 八、Spring 的事件驱动模型 在上面篇幅中,我们省略了 Spring 的事件驱动模型的介绍,在此,我们简单的补充一下! 这种模型在某种程度上其实也属于 “消息队列” 的一种,也可以起到业务服务模块异步解耦的作用,只不过可能功能没有 RabbitMQ、RocketMQ、Kafka 的强大。但是,当我们实战了 RabbitMQ 各种消息模型以及查看其底层源码时,会发现其与 ApplicationEvent、ApplicationListener 以及 ApplicationEventPublisher 具有一定的联系! 正如你所看到的,此种消息模型的核心组件包括三个: * ApplicationEvent:相当于消息 - 也是一串数据流 * ApplicationListener:相当于 listener - 即监听消费端 * ApplicationEventPublisher:相当于发送消息的事件源组件 RabbitTemplate 下面,以订单记录信息为案例简要的实战一下Spring的事件驱动模型: 首先是消息的创建 ```java /** * 类似于 Message * 相当于-Rabbitmq的message-一串二进制数据流 */ public class PushOrderRecordEvent extends ApplicationEvent{ private String orderNo; private String orderType; public PushOrderRecordEvent(Object source, String orderNo, String orderType) { super(source); this.orderNo = orderNo; this.orderType = orderType; } public String getOrderNo() { return orderNo; } public void setOrderNo(String orderNo) { this.orderNo = orderNo; } public String getOrderType() { return orderType; } public void setOrderType(String orderType) { this.orderType = orderType; } @Override public String toString() { return "PushOrderRecordEvent{" + "orderNo='" + orderNo + '\'' + ", orderType='" + orderType + '\'' + '}'; } } ``` 接着是消费监听端listener的创建 ```java /** * 这就是监听器-跟RabbitMQ的Listener几乎是一个理念 * ApplicationListener 指定了要监听消费的消息,相当于内置了Binding */ @Component public class PushOrderRecordListener implements ApplicationListener{ private static final Logger log= LoggerFactory.getLogger(PushOrderRecordListener.class); @Autowired private OrderRecordMapper orderRecordMapper; /** * 监听消费处理逻辑 * @param event */ @Override public void onApplicationEvent(PushOrderRecordEvent event) { log.info("监听到的下单记录: {} ",event); try { if (event!=null){ OrderRecord entity=new OrderRecord(); BeanUtils.copyProperties(event,entity); orderRecordMapper.insertSelective(entity); } }catch (Exception e){ log.error("监听下单记录发生异常:{} ",event,e.fillInStackTrace()); } } } ``` 最后是生产端消息的发送 ```java /** * Spring 的事件驱动模型-生产端消息的发送 */ @RestController public class OrderRecordController { private static final Logger log= LoggerFactory.getLogger(OrderRecordController.class); private static final String Prefix="order"; //TODO:类似于RabbitTemplate 发送消息的组件 @Autowired private ApplicationEventPublisher publisher; /** * 下单 * @return */ @RequestMapping(value = Prefix+"/push",method = RequestMethod.GET) public BaseResponse pushOrder(){ BaseResponse response=new BaseResponse(StatusCode.Success); try { // 发送消息 PushOrderRecordEvent event=new PushOrderRecordEvent(this,"orderNo_20180821001","物流12"); publisher.publishEvent(event); }catch (Exception e){ log.error("下单发生异常:",e.fillInStackTrace()); } return response; } } ``` >你多学一样本事,就少说一句求人的话,现在的努力,是为了以后的不求别人,实力是最强的底气。记住,活着不是靠泪水博得同情,而是靠汗水赢得掌声。——《写给程序员朋友》 ## [猿码天地-Java知识学堂脑图](https://www.processon.com/view/link/6035ed1f079129248a64a6af) (查看文件密码:请关注公众号【猿码天地】,回复关键字‘活到老学到老’获取) ## [猿码天地-Java超神之路脑图](https://www.processon.com/view/link/6035f068e0b34d124437e0e1) (查看文件密码:请关注公众号【猿码天地】,回复关键字‘活到老学到老’获取)