1 Star 0 Fork 0

JerryQi/netcore-practice

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

Basic Introduction

安装RabbitMQ

自从使用了Docker之后,发现Docker真的是比虚拟机好用啊, 因此我这里在Docker里面创建一个RabbitMQ容器.

这里注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面.

# 拉取镜像
docker pull rabbitmq:management
# 运行镜像
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

此时你在本机访问http://localhost:15672/即可访问后台管理界面, 这里的默认用户名和密码都是guest.

创建项目

使用VS创建一个名为RabbitMQ的控制台项目, 项目源码在Gitee.

然后就是使用Nuget安装RabbitMQ.Client.

生产者代码

        static void Producter(ConnectionFactory factory)
        {
            //创建连接
            using (IConnection conn = factory.CreateConnection())
            {
                Console.WriteLine("\nRabbitMQ Connection Succeed!");
                //创建通道
                using (IModel channel = conn.CreateModel())
                {
                    //声明一个队列
                    channel.QueueDeclare("firstChannel", false, false, false, null);

                    Console.WriteLine("\nPlease Enter Message or q to exit");

                    string input;
                    do
                    {
                        input = Console.ReadLine();
                        var sendBytes = Encoding.UTF8.GetBytes(input);
                        //发布消息
                        channel.BasicPublish("", "firstChannel", null, sendBytes);

                    } while (input.Trim().ToLower() != "q");
                }
            }
        }

消费者代码

        static void Consumer(ConnectionFactory factory)
        {
            //创建连接
            using (IConnection conn = factory.CreateConnection())
            {
                Console.WriteLine("\nRabbitMQ Connection Succeed!");
                //创建通道
                using (IModel channel = conn.CreateModel())
                {
                    //事件基本消费者
                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    //接收到消息事件
                    consumer.Received += (ch, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine($"Got Message: {message}");
                        //确认该消息已被消费
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    //启动消费者 设置为手动应答消息
                    channel.BasicConsume("firstChannel", false, consumer);
                    Console.WriteLine("Consumer Started");
                    Console.ReadKey();
                }
            }
        }

启动函数

        static void Main(string[] args)
        {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "guest",//用户名
                Password = "guest",//密码
                HostName = "localhost"//rabbitmq ip
            };
            if (args.Length > 0 && args[0] == "p") Producter(factory);
            else Consumer(factory);
            //Console.WriteLine("Hello World!");
        }

启动的时候加一个参数p则是启动生产者, 否则启动的就是消费者, 启动多个实例之后可以发现, 是可以事实订阅到消息的.

总结

我这这边可以同时启动多生产者或者消费者, 每个生产者生产的消息只会被消费一次.

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

更多具体用法以及分发模型请参考这篇文章:https://www.cnblogs.com/stulzq/p/7551819.html

20191126 Migration Log:

  1. Delete RabbitMQ.sln
  2. Move project to root folder
  3. Add project to netcore-practice.sln
dotnet sln add ./RabbitMQ/RabbitMQ.csproj

Run Productor:

dotnet run --project ./RabbitMQ/RabbitMQ.csproj -- p

Run Consumer:

dotnet run --project ./RabbitMQ/RabbitMQ.csproj

Demo Introduction

Demo1

In Demo1, it seems the message delivery to queue, but it will never happen actually, message send to queue will always through the exchange, it will work like this: Simple

In simple case, the client create exchange automatically, so in this demo, we have not create a exchange, but if you to use the rabbitmq advanced feature, we must know the exchange, there is four patterns to use the exchange:

  • direct
  • fanout
  • topic
  • header

We will show the four patterns above to belows demo except header pattern, because of its' low efficiency.

Demo2

I use the Demo2 to show direct exchange usage. the exchange will full match the routekey and send it to specific queue which full match the routekey as well, it works like this. Simple

In this demo, we do not need to change the consumer, but only do a little change in demo1's productor like below:

public void Producter(IModel channel)
{
    //定义一个Direct类型交换机
    channel.ExchangeDeclare("firstExchange", ExchangeType.Direct, false, false, null);
    //声明一个队列
    channel.QueueDeclare("secondQueue", false, false, false, null);
    //将队列绑定到交换机
    channel.QueueBind("secondQueue", "firstExchange", "firstRouteKey", null);

    Console.WriteLine("\nPlease Enter Message or q to exit");

    string input;
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish("firstExchange", "firstRouteKey", null, sendBytes);

    } while (input.Trim().ToLower() != "q");
}

If you change the routekey:

channel.BasicPublish("firstExchange", "otherRouteKey", null, sendBytes);

Then the secondQueue will get no message, so the consumer also will get no message.

Demo3

This demo will show the usage of fanout exchange, which will exchange message to all queues which is bind to this exchange. Simple

In this demo, when productor send a message, all consumer will receive it at the same time.

Demo4

This demo shows topic exchange. Simple

Use bellow two snippet to use different routekey start two productor:

channel.BasicPublish("thirdExchange", "thirdRouteKey-0.anyword", null, sendBytes);
channel.BasicPublish("thirdExchange", "thirdRouteKey-1.anyword", null, sendBytes);

You can the the result as bellow: Simple

Reference: 基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)

Demo5

This demo is used to test consume and cancel

Demo6

This demo is based on demo4, use topic exchange to send message to a group

Demo7

This demo is used to test poll message.

Demo8

This demo is used to test autoDelete.

Referrence:

马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C#
1
https://gitee.com/jerryqi/netcore-practice.git
git@gitee.com:jerryqi/netcore-practice.git
jerryqi
netcore-practice
netcore-practice
master

搜索帮助