# rabbitmp_demo **Repository Path**: jefferyeven/rabbitmp_demo ## Basic Information - **Project Name**: rabbitmp_demo - **Description**: 简单的rabbitmp的几中消息调度策略(fanout,direct,headers,topic),和rabbitmp发布确认和接收确认优化 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-04-13 - **Last Updated**: 2022-04-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 一. RabbitMQ 简介 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。 RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。 RabbitMQ要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。 # 二.RabbitMQ 基本概况: 1.Channel(信道):数据通道,进行交换数据。(网卡-内存/内存-网卡) 2.Producer(消息的生产者):向消息队列发布消息的客户端应用程序。 3.Consumer(消息的消费者):从消息队列取得消息的客户端应用程序。 4.Message(消息):消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。 5.Routing Key(路由键):消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节。 6.Queue(消息队列):存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。 7.Exchange(交换器|路由器):提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略(下面会介绍),分别是fanout, direct, topic, headers。 8.Binding(绑定):用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。 6.Binding Key(绑定键):Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节。 7.Broker:RabbitMQ Server,服务器实体。 # 三.Exchange类型 ## fanout类型 如下图,发布订阅模型,添加两个队列,分别各用一个消费者监听,设置一个交换机,类型为广播(fanout),交换机会将收到的消息广播给所有相连的队列: ![输入图片说明](image/fanout1image.png) ![输入图片说明](image/fanout2image.png) 具体思路,将多个队列绑定到同一个FanoutExchange上 交换机配置 ```java //发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机 @Configuration public class PublishSubscribeConfig { @Bean public Queue myQueue1() { Queue queue = new Queue("publish_queue1"); return queue; } @Bean public Queue myQueue2() { Queue queue = new Queue("publish_queue2"); return queue; } @Bean public FanoutExchange fanoutExchange() { FanoutExchange fanoutExchange = new FanoutExchange("fanout"); return fanoutExchange; } @Bean public Binding binding1() { Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange()); return binding; } @Bean public Binding binding2() { Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange()); return binding; } } ``` 发送数据 ``` @RestController public class SendMessageController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 // 发布订阅者 @GetMapping("/publishSubscribe") public String publishSubscribe() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: 发布订阅 ,扇形交换机"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("fanout", "", map); return "ok"; } } ``` 接收数据 ```java @RabbitListener(queues = "publish_queue2") public void subscribe2(Map map,Channel channel, Message message) throws IOException { System.out.println(map.get("name")) } ``` ## direct(路由模式) 这种模式是比较常用的模式 ### 生产者消费者模型 生产者消费者模型:添加了一个队列,并创建了两个消费者用于监听队列消息,我们发现,当有消息到达时,两个消费者会交替收到消息。这一过程虽然不用创建交换机,但会使用默认的交换机,并用默认的直连(default-direct)策略连接队列; ![输入图片说明](image/default_directimage.png) 配置 ``` @Configuration public class ProducerConsumerConfig { @Bean public Queue myQueue() { Queue queue=new Queue("myqueue"); return queue; } } ``` 发布数据 rabbitTemplate.convertAndSend("myqueue", manMap); 接收数据 @RabbitListener(queues = "myqueue") public void displayMail(Map mail, Channel channel, Message message) ## direct直连交换机通信模型 包括一个direct交换机 Direct交换器需要消息的Routing Key与 Exchange和Queue 之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式。RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange。 ![输入图片说明](image/directimage.png) 配置 ```java @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue", true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange", true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { //将队列 绑定to exchange with (绑定的key) return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } } ``` 发送数据 rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); 接收数据 只需要监听队列就行 @RabbitListener(queues = "TestDirectQueue") # Topic(通配符模式) Topic交换器按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(也可以是零个或一个)。 ![输入图片说明](image/topicimage.png) 配置 ``` @Configuration public class TopicRabbitConfig { //绑定键 public final static String man = "topic.man"; public final static String total = "topic.total"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.total); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } } ``` 发布数据 rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); 接收数据 @RabbitListener(queues = "topic.total") # headers(头部类型) headers exchange主要通过发送的request message中的header进行匹配,其中匹配规则(x-match)又分为all和any,all代表必须所有的键值对匹配,any代表只要有一个键值对匹配即可。headers exchange的默认匹配规则(x-match)是any。 ```java @Configuration public class HeaderRabbitConfig { @Bean public Queue queueN1() { return new Queue("queueN1"); } @Bean public Queue queueN2() { return new Queue("queueN2"); } @Bean public HeadersExchange headersExchange(){ return new HeadersExchange("headersExchange"); } @Bean public Binding queueN1Binding(){ Map map = new HashMap<>(); map.put("queueName","queueN1"); map.put("bindType","whereAll"); return BindingBuilder.bind(queueN1()).to(headersExchange()).whereAll(map).match(); } @Bean public Binding queueN2Binding(){ Map map = new HashMap<>(); map.put("queueName","queueN2"); map.put("bindType","whereAny"); return BindingBuilder.bind(queueN2()).to(headersExchange()).whereAny(map).match(); } } ``` 发送信息 ``` // 发布header模式 @GetMapping("/headerAll") public String headersAll() { String messageData = "message: header 模式 All"; MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("queueName","queueN1"); messageProperties.setHeader("bindType","whereAll"); Message message = new Message(messageData.getBytes(), messageProperties); rabbitTemplate.convertAndSend("headersExchange",null,message); return "ok"; } @GetMapping("/headerAny") public String headersAny() { String messageData = "message: header 模式 any"; MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("queueName","queueN2"); messageProperties.setHeader("bindType","whereAll"); Message message = new Message(messageData.getBytes(), messageProperties); rabbitTemplate.send("headersExchange",null,message); return "ok"; } ``` 接收消息 @RabbitListener(queues = "queueN1") @RabbitListener(queues = "queueN2") # rpc ![输入图片说明](image/rpcimage.png) RPC的处理流程: 当客户端启动时,创建一个匿名的回调队列。 客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。 请求被发送到rpc_queue队列中。 RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。 # 实现方式 我们主要通过更改RabbitTemplate来实现,具体来说取消使用模板自动的返回队列,设置返回队列,设置使用UserCorrelationId. ```java @Autowired ConnectionFactory connectionFactory; @Autowired // rpc 需要使用的template public RabbitTemplate directRabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); //这一步非常关键 template.setUseTemporaryReplyQueues(false); // 设置消息返回地址 template.setReplyAddress("amq.rabbitmq.reply-to"); // template.expectedQueueNames(); // 设置唯一对应id,发送一条信息必须携带身份id,然后根据身份在返回地址接收数据 template.setUserCorrelationId(true); //设置请求超时时间为10s template.setReplyTimeout(10000); return template; } ``` 发送信息 主要是使用sendAndReceive接收返回值 ``` // 测试rpc @GetMapping("/rpcSend") public String sendRpcMessage(){ //设置消息唯一id CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); String start = "10"; System.out.println(" [x] Requesting fib(" + start + ")"); //直接发送message对象 MessageProperties messageProperties = new MessageProperties(); //过期时间10秒,也是为了减少消息挤压的可能 messageProperties.setExpiration("10000"); messageProperties.setCorrelationId(correlationId.getId()); Message message = new Message(start.getBytes(), messageProperties); Message response = directRabbitTemplate() .sendAndReceive("tut.rpc", "rpc",message,correlationId); if (response != null) { System.out.println(" [.] Got '" + new String(response.getBody()) + "'"); }else{ System.out.println("请求超时"); } return "调用rpc:"+response; } ``` # 接收信息 设置return值。 @RabbitListener(queues = "tut.rpc.requests") public String process(String in, Channel channel, Message message) throws IOException { } # rabbitmq 优化 ## 发布确认 我们可以设置回调函数,可以自定义失败的处理函数,例如找不到exchange,找不到queque ```java //重写RabbitTemplate 重新设置callback @Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相关数据:"+correlationData); System.out.println("ConfirmCallback: "+"确认情况:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("ReturnCallback: "+"消息:"+returnedMessage.getMessage()); System.out.println("ReturnCallback: "+"回应码:"+returnedMessage.getReplyCode()); System.out.println("ReturnCallback: "+"回应信息:"+returnedMessage.getReplyText()); System.out.println("ReturnCallback: "+"交换机:"+returnedMessage.getExchange()); System.out.println("ReturnCallback: "+"路由键:"+returnedMessage.getRoutingKey()); } }); return rabbitTemplate; } } ``` ## 接收确认和限流 将acknowledge-mode设定未manual 并且设定消费数量。 在yml添加设定 ``` listener: direct: acknowledge-mode: manual consumers-per-queue: 1 ``` 接收方需要手动确认 hannel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 具体内容可以看gitee https://gitee.com/jefferyeven/rabbitmp_demo