# Rabbitmq_Init **Repository Path**: tangss_695/rabbitmq_init ## Basic Information - **Project Name**: Rabbitmq_Init - **Description**: 从0开始学起,下载->安装->配置->知识点->实践,一步一步吃透RabbitMQ - **Primary Language**: C# - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-09-05 - **Last Updated**: 2024-09-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 一、前言 N年前在工作中用到过RabbitMQ,如今找工作经常被问到与RabbitMQ相关的知识,现在从0开始学起,下载->安装->配置->知识点->实践,一步一步把RabbitMQ吃透。 # 二.RabbitMQ安装与配置 ## 1.下载RabbitMQ | **类型** | **地址** | | ------------ | ------------------------------------------------------- | | RabbitMQ官网 | https://www.rabbitmq.com/docs/install-windows#installer | ![](images\101.download-rabbitmq.png) ## 2.查看对应Erlan的版本 RabbitMQ与Erlang的版本对应关系: | **类型** | **地址** | | ------------ | ------------------------------------------ | | RabbitMQ官网 | https://www.rabbitmq.com/docs/which-erlang | ![](images\102.version-erlang-rabbitmq.png) ## 3.下载Erlang | **类型** | **地址** | | ---------- | -------------------------------- | | Erlang官网 | https://www.erlang.org/downloads | 在这里点击Windows下载,获得下载包(如果下载rabbitmq-server-3.13,那么erlang25并不支持)。 ![](images\103.download-erlang.png) ## 4.安装Erlang 默认选项,点击“Next”。 ![](images\104-install-erlang.png) 修改安装Erlang目标文件夹,点击“Next”。 ![](images\105-install-erlang.png) 安装完成,点击“Close”。 ![](images\106-install-erlang.png) ## 5.配置Erlang系统环境变量(可选) | **环境变量名** | **操作** | **环境变量值** | |----|----|----| | ERLANG_HOME | 添加 | 其值为Erlang的安装目录(例如,E:\Program Files\Erlang OTP) | | Path | 修改 | %ERLANG_HOME%\bin | 配置系统环境变量“ERLANG_HOME”: ![](images\107-install-erlang.png) 配置系统环境变量“Path”: ![](images\108-erlang-set.png) ![](images\109-erlang-set.png) 配置好后cmd输入:erl可查看erlang版本,有版本号显示则成功安装。 ![](images\110.erlang-version.png) ## 6.安装RabbitMQ 默认选项,点击“Next”。 ![](images\111.rabbitmq-install.png) 修改安装RabbitMQ目标文件夹,点击“Next”。 ![](images\112.rabbitmq-install.png) 继续点击“Next”。 ![](images\113.rabbitmq-install.png) 默认勾选启动RabbitMQ服务,点击“Finish”。 ![](images\114.rabbitmq-install.png) ## 7.配置RabbitMQ系统环境变量(可选但推荐) | **环境变量名** | **操作** | **环境变量值** | |----|----|----| | RABBITMQ_SERVER | 添加 | 其值为RabbitMQ的安装目录(例如E:\Program Files\RabbitMQ Server\rabbitmq_server-3.13.6) | | Path | 修改 | %RABBITMQ_SERVER%\sbin | 配置系统环境变量“RABBITMQ_SERVER”: ![](images\115.rabbitmq-set.png) 配置系统环境变量“Path”: ![](images\116.rabbitmq-set.png) ## 8.启动RabbitMQ服务 当Windows服务显示RabbitMQ正在运行,则表示你的RabbitMQ已经安装成功了。如果没有启动的话,这里需要右键后点击启动。 ![](images\117.rabbitmq-start.png) ## 9.处理“启动RabbitMq线程意外终止” 启动rabbit MQ的服务的时候,提示“错误1067:进程意外停止。” ![](images\118.rabbitmq-error.png) 应该是端口号“15672”被占用了,先执行“netstat -ano \| findstr 15672” | **操作** | **命令** | **示例** | |----|----|----| | 查看所有端口占用情况 | netstat -ano | | | 查看特定端口占用情况 | netstat -ano \| findstr 端口号 | netstat -ano \| findstr 15672 | | ![](images\119.rabbitmq-port.png) | | | | 以上返回的列从左到右依次是“协议、本地地址、外部地址、状态、PID”,可以看出被占用的端口号是15672,该端口对应的 PID为48980。 | | | | 终止进程,释放端口 | taskkill /PID "进程PID" | taskkill /PID "48980" | | | taskkill /t /f /im 进程PID | taskkill /t /f /im 48980 | | 找到占用端口的程序 | tasklist \| findstr 进程PID | tasklist \| findstr 48980 | | ![](images\120.rabbitmq-port.png) | | | ## 10.启用RabbitMQ管理插件(可选但推荐) | **类型** | 地址 | | ------------ | ---------------------------------------- | | RabbitMQ官网 | https://www.rabbitmq.com/docs/management | 打开命令提示符,进入RabbitMQ的sbin目录或者通过菜单栏单击RabbitMQ Command Prompt (sbin dir)进入。 ![](images\121.rabbitmq-cmd.png) 输入命令rabbitmq-plugins enable rabbitmq_management来启用管理插件。这将允许你通过Web界面来管理RabbitMQ。 ![](images\122.rabbitmq-cmd.png) 成功安装输出如下: ![](images\123.rabbitmq-cmd.png) (报错分享解决)如果输出以下内容,那么这篇文章可以帮你解决(https://baijiahao.baidu.com/s?id=1720472084636520996&wfr=spider&for=pc) ![](images\124.rabbitmq-cmd.png) ## 11.验证RabbitMQ安装 打开浏览器,访问http://localhost:15672/(默认的用户名和密码都是guest,但请注意guest用户只能从localhost访问)。 | **地址** | **用户名** | **密码** | **服务端的端口号** | | ----------------------- | ---------- | -------- | ------------------ | | http://localhost:15672/ | guest | guest | 5672 | ![](images\125.rabbitmq-web.png) 如果RabbitMQ安装成功且管理插件已启用,你将看到RabbitMQ的管理界面。 ![](images\126.rabbitmq-web.png) | **导航栏** | **说明** | |----|----| | Overview | 概述界面 | | Connection | 连接页面 | | Channels | 链接中的信道信息:Channel是在连接中存在的,一个Connection中可以有多个Channel。 | | Exchange | 交换机界面,Exchange作为消费的生产者和消息队列的一个中介,其将Producer生产的消息进行分发给消息队列。在没有使用交换机的简单模式中,实际上则使用的是默认的交换机(AMQP-Default) | | Queues | 消息队列界面,消息队列,在控制台页面中我们在queues页签中我们可以看到服务器端当前的消息队列信息。在java代码中我们需要先对消息队列进行声明后才可以使用。点击某一队列,在队列中我们也可以查看当前队列的消息信息。 | | Admin | 用户管理界面,在Admin的user下我们可以查询当前的MQ用户信息,以及新增用户信息并设置权限。 | ## 12.配置远程访问(可选,但推荐)   ### 12.1.创建用户 由于guest这个用户,只能在本地访问,所以我们要新增一个用户admin,选择超级管理员权限: ![](images\127.rabbitmq-web.png) 五中不同角色配置: | **角色** | **英文** | **说明** | |----|----|----| | 普通管理者 | management | 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 | | 策略制定者 | policymaker | 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息。 | | 监控者 | monitoring | 登录管理控制台 查看所有的信息(Rabbit的相关节点的信息,内存使用信息,磁盘的情况) | | 超级管理员 | administrator | 登录管理控制台查看所有的信息,对所有用户的策略进行操作 | | 其他 | 无法登陆管理控制台,通常就是普通的生产者和消费者(仅仅接收信息或者发送消息)。 | | ### 12.2.用户授权 此时这个账户是没有权限的,需要授权,点击“admin”: ![](images\128.rabbitmq-web.png) 点击“Set permission”和“Set topic permission”。 ![](images\129.rabbitmq-web.png) ## 13.Virtual Hosts配置 每个Vhost(Virtual Hosts)相当于一个相对独立的RabbitMQ服务器。Vhost用作逻辑隔离,分别管理Exchange、Queue和Binding,使得应用安全地运行在不同的Vhost上,相互之间不会干扰。一个实例下可以有多个Vhost,一个Vhost里可以有若干个Exchange和Queue。生产者和消费者连接RabbitMQ实例时,需要指定一个Vhost。 | **类型** | **地址** | | -------- | ------------------------------------ | | 官网 | https://www.rabbitmq.com/vhosts.html | ### 13.1.创建Virtual Hosts 在顶部导航栏选择“Admin”,进入Admin页面,在右侧导航栏选择“Virtual Hosts”,进入Virtual Hosts页面。 ![](images\130.rabbitmq-web.png) 在“Add a new virtual host”区域,输入Vhost名称,单击“Add virtual host”。 ![](images\131.rabbitmq-web.png) 创建成功后,在“All virtual hosts”区域,显示创建成功的Vhost。 ### 13.2.权限的分配 点击对应Virtual Hostsv 名字,进入配置页面: ![](images\132.rabbitmq-web.png) 该权限配置参数说明: | **参数** | **说明** | |----|----| | user | 用户名 | | configure | 一个正则表达式,用户对符合该正则表达式的所有资源拥有 configure 操作的权限 | | write | 一个正则表达式,用户对符合该正则表达式的所有资源拥有 write 操作的权限 | | read | 一个正则表达式,用户对符合该正则表达式的所有资源拥有 read 操作的权限 | # 三、交换机 RabbitMQ 支持多种交换机(Exchange)类型,每种类型都用于不同的消息路由和分发策略: ## 1.默认交换机(Default Exchange) 这是 RabbitMQ 默认实现的一种交换机,它不需要手动创建。当消息发布到默认交换机时,路由键会被解释为队列的名称,消息会被路由到与路由键名称相同的队列。默认交换机通常用于点对点通信,但不支持复杂的路由策略。 这些不同类型的交换机允许你在 RabbitMQ 中实现各种不同的消息路由和分发策略,以满足不同的应用需求。选择适当的交换机类型对于有效的消息传递非常重要。 ## 2.直连交换机(Direct Exchange) 这种交换机根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。只有当消息的路由键与队列绑定时指定的路由键完全相同时,消息才会被路由到队列。这是一种简单的路由策略,适用于点对点通信。 ![](images\201.png) 这种配置下,我们可以看到有两个队列Q1、Q2绑定到了直连交换机X上。第一个队列用的是橘色(orange)绑定键,第二个有两个绑定键,其中一个绑定键是黑色(black),另一个绑定键是绿色(green)。在此设置中,发布到交换机的带有橘色(orange)路由键的消息会被路由给队列Q1。带有黑色(black)或绿色(green)路由键的消息会被路由给Q2。其他的消息则会被丢弃。 ## 3.扇型交换机(Fanout Exchange) 这种交换机将消息广播到与之绑定的所有队列,无论消息的路由键是什么。用于发布/订阅模式,其中一个消息被广播给所有订阅者。 ![](images\202.png) 当一个Msg发送到扇形交换机X上时,则扇形交换机X会将消息分别发送给所有绑定到X上的消息队列。扇形交换机将消息路由给绑定到自身的所有消息队列,也就是说路由键在扇形交换机里没有作用,故消息队列绑定扇形交换机时,路由键可为空。这个模式类似于广播。 ## 4.头交换机(Headers Exchange) 这种交换机根据消息的标头信息(Headers)来决定消息的路由,而不是使用路由键。队列和交换机之间的绑定规则是根据标头键值对来定义的,只有当消息的标头与绑定规则完全匹配时,消息才会被路由到队列。适用于需要复杂消息匹配的场景。 在头交换机里有一个特别的参数”x-match”,当”x-match”的值为“any”时,只需要消息头的任意一个值匹配成功即可,当”x-match”值为“all”时,要求消息头的所有值都需相等才可匹配成功。 ## 5.主题交换机(Topic Exchange) 这种交换机根据消息的路由键与队列绑定时指定的路由键模式(通配符)匹配程度,将消息路由到一个或多个队列。路由键可以使用通配符符号 \*(**匹配一个单词**)和 \#(**匹配零个或多个单词**),允许更灵活的消息路由。用于发布/订阅模式和复杂的消息路由需求。 ![](images\203.png) 我们将会发送用来描述动物的多条消息。发送的消息包含带有三个单词(两个点号)的路由键(routing key)。路由键中第一个单词描述速度,第二个单词是颜色,第三个是品种: “\<速度\>.\<颜色\>.\<品种\>”。我们创建三个绑定:Q1通过".orange.“绑定键进行绑定,Q2使用”..rabbit" 和 “lazy.#”。 注意事项: 主题交换机非常强大,并且可以表现的跟其他交换机相似。 1. 当一个队列使用"#"(井号)绑定键进行绑定。它会表现的像扇形交换机一样,不理会路由键,接收所有消息。 2. 当绑定当中不包含任何一个 “\*” (星号) 和 “#” (井号)特殊字符的时候,主题交换机会表现的跟直连交换机一毛一样。   # 四、常用命令 | **操作** | **命令** | | -------------- | -------------------------------------------- | | 查看ErLang版本 | erl | | 启动RabbitMQ | net start RabbitMQ | | | rabbitmq-service start | | 停止RabbitMQ | net stop RabbitMQ | | | rabbitmq-service stop | | 重启命令 | net stop RabbitMQ && net start | | 健康检查 | rabbitmqctl status | | 启动监控管理器 | rabbitmq-plugins enable rabbitmq_management | | 关闭监控 | rabbitmq-plugins disable rabbitmq_management | | 帮助命令 | rabbitmqctl help | # 五、消息队列 MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的空器。多用于分布式系统之间的通信。使用MQ使得应用间解耦,提升容错性和可扩展性。 | **类型** | **地址** | |----|----| | 视频教程 | https://www.bilibili.com/video/BV1GU4y1w7Yq/?p=2&spm_id_from=pageDriver&vd_source=bad95ca6359389d05c64799ee99a25be | | 网上资料 | https://cloud.tencent.com/developer/article/2335397 | 消息队列应用场景包括: ## 1.异步处理 异步处理,就是将一些非核心的业务流程以异步并行的方式执行,从而减少请求响应时间,提高系统吞吐量。 ![](images\401.png) 以下单为例,用户下单后需要生成订单、赠送活动积分、赠送红包、发送下单成功通知等一系列业务处理。假设三个业务节点每个使用100毫秒钟,不考虑网络等其他开销,则串行方式的时间是400毫秒,并行的时间只需要200毫秒。这样就大大提高了系统的吞吐量。 ## 2.应用解耦 应用解耦,顾名思义就是解除应用系统之间的耦合依赖。通过消息队列,使得每个应用系统不必受其他系统影响,可以更独立自主。 以电商系统为例,用户下单后,订单系统需要通知积分系统。一般的做法是:订单系统直接调用积分系统的接口。这就使得应用系统间的耦合特别紧密。如果积分系统无法访问,则积分处理失败,从而导致订单失败。 ![](images\402.png) 加入消息队列之后,用户下单后,订单系统完成下单业务后,将消息写入消息队列,返回用户订单下单成功。积分系统通过订阅下单消息的方式获取下单通知消息,从而进行积分操作。实现订单系统与库存系统的应用解耦。如果,在下单时积分系统系统异常,也不会影响用户正常下单。 ## 3.流量削峰 流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。 以秒杀活动为例,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列,后台系统根据消息队列中的消息信息,进行秒杀业务处理。 ![](images\403.png) 如上图所示,服务器接收到用户的请求后,首先写入消息队列,后台系统根据消息队列中的请求信息,做后续业务处理。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。 ## 4.消息通讯 消息通讯是指应用间的数据通信。消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等点对点通讯。 ![](images\404.png) 以上实际是消息队列的两种消息模式,点对点或发布订阅模式。 # 六.RabbitMQ的5种消息队列 | **类型** | **地址** | |----|----| | 官网 | https://www.rabbitmq.com/tutorials | | 网上资料 | https://blog.csdn.net/qq_30614345/article/details/132327864 | ![](images\501.png) ![](images\502.png) ![](images\503.png) ## 1.简单模式(Hello World) ![](images\504.png) 一个生产者对应一个消费者,RabbitMQ 相当于一个消息代理,负责将 A 的消息转发给 B。 应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。 ## 2.工作队列模式(Work queues) ![](images\505.png) 在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理。 应用场景:一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况。 ## 3.订阅模式(Publish/Subscribe) ![](images\506.png) 一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。 在订阅模型中,多了一个Exchange角色,而且过程略有变化: Exchange:交换机(X)。一方面接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或者将消息丢弃。如何操作,取决于Exchange的类型。 Exchange(交换机)只负责转发消息,不具备存储消息的能力,因为如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失! 应用场景:更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是: 1. 一个 fanout 类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列; 2. 一个缓存消息队列对应着多个缓存消费者; 3. 一个数据库消息队列对应着多个数据库消费者。   ## 4.路由模式(Routing) ![](images\507.png) 有选择地 (Routing key) 接收消息,发送消息到交换机并且要指定路由 key,消费者将队列绑定到交换机时需要指定路由 key,仅消费指定路由 key 的消息。 应用场景:如在商品库存中增加了 1 台 iphone12,iphone12 促销活动消费者指定 routing key 为 iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此 routing key 的消息。 ## 5.主题模式(Topics) ![](images\508.png) 根据主题 (Topics) 来接收消息,将路由 key 和某模式进行匹配,RoutingKey一般都是由一个或多个单词组成,多个单词之间以“.”分隔,此时队列需要绑定在一个模式上,#匹配一个词或多个词,\*只匹配一个词。 | **路由 key** | **item.insert** | **item.user** | **item.insert.user** | | ------------ | --------------- | ------------- | -------------------- | | item.\* | 匹配 | 匹配 | 不匹配 | | item.# | 匹配 | 匹配 | 匹配 | 应用场景:同上 iphone 促销活动可以接收主题为 iphone 的消息,如 iphone12、iphone13 等。 ## 6.远程过程调用(RPC) ![](images\509.png) 如果我们需要在远程计算机上运行功能并等待结果就可以使用 RPC,具体流程可以看图。 应用场景:需要等待接口返回数据,如订单支付。 ## 7.发布者确认(Publisher Confirms) 与发布者进行可靠的发布确认,发布者确认是 RabbitMQ 扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ 将异步确认发送者发布的消息,这意味着它们已在服务器端处理。 应用场景:对于消息可靠性要求较高,比如钱包扣款。 # 七、RabbitMQ实战 | **源代码链接** | **提取码** | | ----------------------------------------------- | ---------- | | https://pan.baidu.com/s/1xSyh-Rzs7FCEQjzNeeNNwg | w04c | ## 1.简单模式(Hello World)   ### 1.1.生产者 源代码【CodeMan.Rabbitmq.Producer】-\>【HelloProducer】: ``` public static void HelloWorldShow() { //获取TCP 长连接 using (var connection = RabbitUtils.GetConnection().CreateConnection()) { //创建通信“通道”,相当于TCP中的虚拟连接 using (var channel = connection.CreateModel()) { /* * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 * 第一个参数:队列名称ID * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用 * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列 * 其他额外参数为null */ channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null); Console.ForegroundColor = ConsoleColor.Red; string message = "hello CodeMan 666";//消息内容 var body = Encoding.UTF8.GetBytes(message);//发送消息的内容一写要转换为字节数组 /* 发布消息 * 第一个参数exchange:交换机,暂时用不到,在进行发布订阅时才会用到 * 第二个参数:路由key * 额外的设置属性 * 最后一个参数是要传递的消息字节数组 */ channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body); Console.WriteLine("producer【{0}】发送消息:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), message); } } } ``` 运行生产者代码【dotnet run】: ![](images\601.png) ### 1.2.消费者 源代码【CodeMan.Rabbitmq.Consumer01】-\>【HelloConsumer】: ``` public static void HelloWorldShow() { //获取TCP 长连接 using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { /* * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 * 第一个参数:队列名称ID * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用 * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列 * 其他额外参数为null */ channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null); Console.ForegroundColor = ConsoleColor.Cyan; EventingBasicConsumer consumers = new EventingBasicConsumer(channel);//创建事件的消费者,事件消费者要通过这个类才能接收到消息的内容 // 触发事件 consumers.Received += (model, ea) => { var body = ea.Body.ToArray();//接收到的消息 var message = Encoding.UTF8.GetString(body);//将消息转换为字符串 Console.WriteLine("Consumer01【{0}】接收消息:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), message);//打印出消息 //false只是确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息 //假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。 channel.BasicAck(ea.DeliveryTag, false); }; /* * 从MQ服务器中获取数据 * 创建一个消息消费者 * 第一个参数:队列名 * 第二个参数:是否自动确认收到消息, true表示自动应答,false表示手动应答,这里要改成手动应答,这是MQ推荐的做法 * 第三个参数:要传入的IBasicConsumer接口 * */ channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers);//消息的消费者 Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行消费者代码【dotnet run】: ![](images\602.png) ## 2.工作队列模式(Work queues) ### 2.1.生产者 源代码【CodeMan.Rabbitmq.Producer】-\>【SmsSender】: ``` public static void Sender() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //声明并创建一个队列 channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); int count = 100; for (int i = 0; i < count; i++) { Sms sms = new Sms("乘客" + i, "139000000" + i, "您的车票已预定成功"); string jsonSms = JsonConvert.SerializeObject(sms); var body = Encoding.UTF8.GetBytes(jsonSms); //发布消息 channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body); Console.WriteLine($"正在发送内容:{jsonSms}"); } Console.WriteLine("【{0}】已经成功发送{1}条数据", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), count); } } } ``` 运行生产者代码【dotnet run】: ![](images\603.png) ### 2.2.消费者1 源代码【CodeMan.Rabbitmq.Consumer01】-\>【SmsReceive】: ``` public static void Sender() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //声明并创建一个队列 channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); int count = 1; consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(30);//休眠30毫秒 Console.WriteLine("{0}.SmsSender01【{1}】-发送短信成功:{2}", count++, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), message); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行生产者代码【dotnet run】: ![](images\604.png) ### 2.3.消费者2 源代码【CodeMan.Rabbitmq.Consumer02】-\>【SmsReceive】: ``` public static void Sender() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //声明并创建一个队列 channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.BasicQos(0, 1, false);//处理完一个取一个 var consumer = new EventingBasicConsumer(channel); int count = 1; consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(60);//休眠60毫秒 Console.WriteLine("{0}.SmsSender02【{1}】-发送短信成功:{2}", count++, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), message); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行生产者代码【dotnet run】: ![](images\605.png) ## 3.订阅模式(Publish/Subscribe) ### 3.1.生产者 源代码【CodeMan.Rabbitmq.Producer】-\>【WeatherFanout】: ``` public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { int count = 5; for (var i = 0; i < count; i++) { string message = $"{i + 20}度"; var body = Encoding.UTF8.GetBytes(message); //发布消息 channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body); Console.WriteLine($"正在发送天气信息:{message}"); //Console.WriteLine("WeatherFanout【{0}】发送天气信息成功!", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")); } Console.WriteLine("【{0}】已经成功发送{1}条数据", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), count); } } } ``` 运行生产者代码【dotnet run】: ![](images\606.png) ### 3.2.消费者1 源代码【CodeMan.Rabbitmq.Consumer01】-\>【WeatherFanout】: ``` public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //声明并创建一个交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); //声明并创建一个队列 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, ""); channel.BasicQos(0, 1, false);//处理完一个取一个 var consumer = new EventingBasicConsumer(channel); int count = 1; consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine("{0}.WeatherFanout01【{1}】百度收到的气象信息:{2}", count++, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), message); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行生产者代码【dotnet run】: ![](images\607.png) ### 3.3.消费者2 源代码【CodeMan.Rabbitmq.Consumer02】-\>【WeatherFanout】: ``` public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //声明并创建一个交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); //声明队列信息 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, ""); channel.BasicQos(0, 1, false);//处理完一个取一个 var consumer = new EventingBasicConsumer(channel); int count = 1; consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine("{0}.WeatherFanout02【{1}】新浪收到的气象信息:{2}", count++, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), message); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行生产者代码【dotnet run】: ![](images\609.png) ## 4.路由模式(Routing) ### 4.1.生产者 源代码【CodeMan.Rabbitmq.Producer】-\>【WeatherDirect】: ``` public static void Weather() { Dictionary area = new Dictionary(); area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据"); area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据"); area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据"); area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据"); using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { foreach (var item in area) { //发布消息 channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key, null, Encoding.UTF8.GetBytes(item.Value)); } Console.WriteLine("气象信息发送成功!"); } } } ``` 运行生产者代码【dotnet run】: ![](images\610.png) ### 4.2.消费者1 源代码【CodeMan.Rabbitmq.Consumer01】-\>【WeatherDirect】: ``` public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //声明并创建一个交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); //声明并创建一个队列 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525"); channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525"); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行生产者代码【dotnet run】: ![](images\611.png) ### 4.3.消费者2 源代码【CodeMan.Rabbitmq.Consumer02】-\>【WeatherDirect】: ``` public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //声明并创建一个交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); //声明并创建一个队列 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key */ channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525"); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525"); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525"); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"新浪收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行生产者代码【dotnet run】: ![](images\612.png) ## 5.主题模式(Topics) ### 5.1.生产者 源代码【CodeMan.Rabbitmq.Producer】-\>【WeatherTopic】: ``` public static void Weather() { Dictionary area = new Dictionary(); area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据"); area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据"); area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据"); area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据"); using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { foreach (var item in area) { //发布消息 channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key, null, Encoding.UTF8.GetBytes(item.Value)); } Console.WriteLine("气象信息发送成功!"); } } } ``` 运行生产者代码【dotnet run】: ![](images\613.png) ### 5.2.消费者1 源代码【CodeMan.Rabbitmq.Consumer01】-\>【WeatherTopic】: ``` public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //声明并创建一个交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); //声明并创建一个队列 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#"); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行生产者代码【dotnet run】: ![](images\614.png) ### 5.3.消费者2 源代码【CodeMan.Rabbitmq.Consumer02】-\>【WeatherTopic】: ``` public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { /* * 生产者发送消息 * 队列名称 * 交换机名称 * 路由key * */ //声明并创建一个交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); //声明并创建一个队列 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525"); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"新浪收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } ``` 运行生产者代码【dotnet run】: ![](images\615.png) # 八、.Net Core中使用RabbitMQ   ## 1.订阅模式 ### 1.1.生产者 源代码【CodeMan.RabbitMq.HttpApi.Order】-\>【OrderController】: ``` [HttpGet("test")] public IActionResult Test(string message) { _orderService.SendTestMessage(message); return Ok(); } ``` 源代码【CodeMan.RabbitMq.Service】-\>【OrderService】: ``` public void SendTestMessage(string message) { Console.WriteLine($"send message:{message}"); _rabbitProducer.Publish(RabbitConstant.TEST_EXCHANGE, "", message); } ``` 运行生产者代码【dotnet run】: ![](images\701.png) 进入swagger页面【https://localhost:5001/swagger/index.html】,调用“/api/order/test”接口: ![](images\702.png) 发送消息日志: ![](images\703.png) ### 1.2.消费者的配置 源代码【CodeMan.RabbitMq.Test】-\>【ProcessTest】: ``` public ProcessTest(RabbitConnection connection) { _connection = connection; Queues.Add(new QueueInfo() { ExchangeType = ExchangeType.Fanout,//交换机类型 Exchange = RabbitConstant.TEST_EXCHANGE,//交换机名称 Queue = RabbitConstant.TEST_QUEUE,//队列名称 RoutingKey = "",//路由名称 OnReceived = this.Receive//具体的接收方法委托 }); } ``` ### 1.3.消费者接收消息 源代码【CodeMan.RabbitMq.Test】-\>【ProcessTest】: ``` public void Receive(RabbitMessageEntity message) { Console.WriteLine("message.BasicDeliver.DeliveryTag={0} message={1}", message.BasicDeliver.DeliveryTag, message.Content); //Thread.Sleep(5000);//休眠5秒 Console.WriteLine($"Test Receive Message:{message.Content}"); //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 ) //假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。 message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, false);//确认签收消息 } ``` 运行生产者代码【dotnet run】: ![](images\704.png) ## 2.延时队列 ### 2.1.延时队列场景 1. 订单在10分钟内未支付则自动取消。 2. 新创建的店铺,如果在10天内都没有上传过商口,则自动发送消息提醒。 3. 账单在一周内未支付,则自动结算。 4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。 5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。 6. 预定会议后,需要在规定的时间点前10分钟通知各个与会人员参加会议。 ### 2.2.生产者 生产者,可以这样定义延时队列: ![](images\705.png) 源代码【CodeMan.RabbitMq.HttpApi.Order】-\>【OrderController】: ``` [HttpGet] public IActionResult Order() { _orderService.SendOrderMessage(); return Ok(); } ``` 源代码【CodeMan.RabbitMq.Service】-\>【SendOrderMessage】,发送20秒的延时消息: ``` public void SendOrderMessage() { OrderInfo orderInfo = new OrderInfo(); orderInfo.GoodsCount = 1; orderInfo.GoodsId = 1; orderInfo.GoodsName = ".NET零基础架构师特训班"; orderInfo.Status = 0; orderInfo.UserId = 1; Account account = new Account(); account.Username = "Kevin"; account.Password = "123"; account.Email = "kevin@codeman.com"; account.Phone = "13999999999"; OrderMessage orderMessage = new OrderMessage(); orderMessage.Account = account; orderMessage.OrderInfo = orderInfo; Console.WriteLine("短信/邮件异步通知"); Console.WriteLine($"send message:{JsonConvert.SerializeObject(orderMessage)}"); // 支付服务 _rabbitProducer.Publish(RabbitConstant.DELAY_EXCHANGE, RabbitConstant.DELAY_ROUTING_KEY, new Dictionary() { { "x-delay", 1000 * 20 } }, orderMessage); } ``` 运行生产者代码【dotnet run】: ![](images\706.png) 进入swagger页面【https://localhost:5001/swagger/index.html】,调用“/api/order”接口: ![](images\707.png) 发送消息日志: ![](images\708.png) ### 2.3.消费者 消费者,可以这样定义延时队列,例如给每条消费设置60秒的延时时间: ![](images\709.png) ## 3.列信队列 ### 3.1.概念 1. 配置业务队列,绑定到业务交换机上 2. 为业务队列配置死信交换机和routing key 3. 为死信交换机配置死信队列 ### 3.2.死信队列生命周期 1. 业务消息被投入业务队列 2. 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nack或者reject操作 3. 被nack或reject的消息由RabbitMQ投递到死信交换机中 4. 死信交换机将消息投入相应的死信队列 5. 死信队列的消费者消费死信消息 ### 3.3.死信队列的配置 源代码【CodeMan.RabbitMq.Pay】-\>【ProcessPay】: ``` public ProcessPay(RabbitConnection connection, IPayService payService) { _connection = connection; _payService = payService; Queues.Add(new QueueInfo() { ExchangeType = ExchangeType.Direct, Exchange = RabbitConstant.DELAY_EXCHANGE, Queue = RabbitConstant.DELAY_QUEUE, RoutingKey = RabbitConstant.DELAY_ROUTING_KEY, props = new Dictionary() { {"x-dead-letter-exchange", RabbitConstant.DEAD_LETTER_EXCHANGE}, {"x-dead-letter-routing-key", RabbitConstant.DEAD_LETTER_ROUTING_KEY} }, OnReceived = this.Receive }); } ``` ### 3.4.死信队列接收消息 源代码【CodeMan.RabbitMq.Pay】-\>【ProcessPay】-【Receive】: ``` public void Receive(RabbitMessageEntity message) { Console.WriteLine($"Pay Receive Message:{message.Content}"); OrderMessage orderMessage = JsonConvert.DeserializeObject(message.Content); // 超时未支付 string many = ""; // 支付处理 Console.WriteLine("请输入:"); // 超时未支付进行处理 Task.Factory.StartNew(() => { many = Console.ReadLine(); Console.WriteLine($"many:{many}"); }).Wait(20 * 1000); if (string.Equals(many, "100")) { orderMessage.OrderInfo.Status = 1; _payService.UpdateOrderPayState(orderMessage.OrderInfo); Console.WriteLine("支付完成"); message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, false); } else { //重试几次依然失败 Console.WriteLine("等待一定时间内失效超时未支付的订单"); message.Consumer.Model.BasicNack(message.BasicDeliver.DeliveryTag, false, false); } } ``` 运行生产者代码【dotnet run】: ![](images\710.png) ### 3.5.超时未支付的配置 源代码【CodeMan.RabbitMq.Pay.Timeout】-\>【ProcessPayTimeout】: ``` public ProcessPayTimeout(RabbitConnection connection, IPayService payService) { _connection = connection; _payService = payService; Queues.Add(new QueueInfo() { ExchangeType = ExchangeType.Direct, Exchange = RabbitConstant.DEAD_LETTER_EXCHANGE, Queue = RabbitConstant.DEAD_LETTER_QUEUE, RoutingKey = RabbitConstant.DEAD_LETTER_ROUTING_KEY, OnReceived = this.Receive }); } ``` ### 3.6.超时未支付接收消息 源代码【CodeMan.RabbitMq.Pay.Timeout】-\>【ProcessPayTimeout】-\>【Receive】: ``` public void Receive(RabbitMessageEntity message) { Console.WriteLine($"Pay Timeout Receive Message:{message.Content}"); OrderMessage orderMessage = JsonConvert.DeserializeObject(message.Content); orderMessage.OrderInfo.Status = 2; Console.WriteLine("超时未支付"); _payService.UpdateOrderPayState(orderMessage.OrderInfo); message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, false); } ``` 运行生产者代码【dotnet run】: ![](images\711.png) # 九、RabbitMQ集群 ## 1.同步元数据 1. 队列元数据:队列名称和它的属性 2. 交换机元数据:交换机名称、类型和属性 3. 绑定元数据:一张简单的表格展示了如何将消息路由到队列 4. vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性 ## 2.镜像模式特点 1. 实现了高可用性。部分节点挂掉后,不会影响RabbitMQ的使用 2. 降低了系统性能。镜像队列数量过多,大量的消息同步会加大网络带宽开销 3. 适合对可用性要求较高的业务场景 ## 3.搭建RabbitMQ集群 ### 3.1.拉取镜像 | **步骤** | **命令** | | -------- | ------------------------------- | | 拉取镜像 | docker pull rabbitmq:management | ### 3.2.运行容器 | **容器名称** | **命令** | |----|----| | myrabbit1 | docker run -d --hostname rabbit1 --name myrabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' | | myrabbit2 | docker run -d --hostname rabbit2 --name myrabbit2 -p 5673:5672 --link myrabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:management | | myrabbit3 | docker run -d --hostname rabbit3 --name myrabbit3 -p 5674:5672 --link myrabbit1:rabbit1 --link myrabbit2:rabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:management | ### 3.3.加入节点到集群 #### 1).设置节点1 | **操作** | **命令** | | --------- | ------------------------------ | | 进入节点1 | docker exec -it myrabbit1 bash | | 停止 | rabbitmqctl stop_app | | 重置 | rabbitmqctl reset | | 启动 | rabbitmqctl start_app | | 退出 | exit | #### 2).设置节点2 | **操作** | **命令** | | ------------ | --------------------------------------------- | | 进入节点2 | docker exec -it myrabbit2 bash | | 停止 | rabbitmqctl stop_app | | 重置 | rabbitmqctl reset | | 加入到集群中 | rabbitmqctl join_cluster --ram rabbit@rabbit1 | | 启动 | rabbitmqctl start_app | | 退出 | exit | #### 3).设置节点3 | **操作** | **命令** | | ------------ | --------------------------------------------- | | 进入节点3 | docker exec -it myrabbit3 bash | | 停止 | rabbitmqctl stop_app | | 重置 | rabbitmqctl reset | | 加入到集群中 | rabbitmqctl join_cluster --ram rabbit@rabbit1 | | 启动 | rabbitmqctl start_app | | 退出 | exit | ### 3.4.添加策略前的首页和队列 首页: ![](images\801.png) 队列: ![](images\802.png) ### 3.5.添加策略 | **步骤** | **命令** | | --------- | ----------------------------------------------------- | | 进入节点1 | docker exec -it myrabbit1 bash | | 添加策略 | rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' | 只需要添加策略,就可以实现镜像模式。 ### 3.6.添加策略后的队列 ![](images\803.png) ## 4.集群配置 ``` "RabbitMQ": { "Hostname": "127.0.0.1", "Address": "127.0.0.1:5672,127.0.0.1:5673,127.0.0.1:5674", "Port": 5672, "Username": "admin", "Password": "123456", "VirtualHost": "my_vhost" } ``` ## 5.初始化连接工厂 源代码【CodeMan.RabbitMq.Base.RabbitMq】-\>【RabbitConnection】: ``` public IConnection GetConnection() { if (_connection == null) { if (string.IsNullOrEmpty(_config.Address)) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = _config.Hostname; factory.Port = _config.Port; factory.UserName = _config.Username; factory.Password = _config.Password; factory.VirtualHost = _config.VirtualHost; _connection = factory.CreateConnection(); } else { //集群 ConnectionFactory factory = new ConnectionFactory(); factory.UserName = _config.Username; factory.Password = _config.Password; factory.VirtualHost = _config.VirtualHost; var address = _config.Address; List endpoints = new List(); foreach (var endpoint in address.Split(",")) { endpoints.Add(new AmqpTcpEndpoint(endpoint.Split(":")[0], int.Parse(endpoint.Split(":")[1]))); } _connection = factory.CreateConnection(endpoints); } } return _connection; } ``` # 十、尾声 本人邮箱([732320850@qq.com](mailto:732320850@qq.com)),有需要可以@我互相交流一下技术和经验。