自从使用了Docker之后,发现Docker真的是比虚拟机好用啊, 因此我这里在Docker里面创建一个RabbitMQ容器.
这里注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面.
# 拉取镜像
docker pull rabbitmq:management
# 运行镜像
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
此时你在本机访问http://localhost:15672/即可访问后台管理界面, 这里的默认用户名和密码都是guest.
使用VS创建一个名为RabbitMQ的控制台项目, 项目源码在Gitee.
然后就是使用Nuget安装RabbitMQ.Client.
static void Producter(ConnectionFactory factory)
{
//创建连接
using (IConnection conn = factory.CreateConnection())
{
Console.WriteLine("\nRabbitMQ Connection Succeed!");
//创建通道
using (IModel channel = conn.CreateModel())
{
//声明一个队列
channel.QueueDeclare("firstChannel", false, false, false, null);
Console.WriteLine("\nPlease Enter Message or q to exit");
string input;
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
//发布消息
channel.BasicPublish("", "firstChannel", null, sendBytes);
} while (input.Trim().ToLower() != "q");
}
}
}
static void Consumer(ConnectionFactory factory)
{
//创建连接
using (IConnection conn = factory.CreateConnection())
{
Console.WriteLine("\nRabbitMQ Connection Succeed!");
//创建通道
using (IModel channel = conn.CreateModel())
{
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"Got Message: {message}");
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
};
//启动消费者 设置为手动应答消息
channel.BasicConsume("firstChannel", false, consumer);
Console.WriteLine("Consumer Started");
Console.ReadKey();
}
}
}
static void Main(string[] args)
{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
if (args.Length > 0 && args[0] == "p") Producter(factory);
else Consumer(factory);
//Console.WriteLine("Hello World!");
}
启动的时候加一个参数p则是启动生产者, 否则启动的就是消费者, 启动多个实例之后可以发现, 是可以事实订阅到消息的.
我这这边可以同时启动多生产者或者消费者, 每个生产者生产的消息只会被消费一次.
AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。
更多具体用法以及分发模型请参考这篇文章:https://www.cnblogs.com/stulzq/p/7551819.html
20191126 Migration Log:
dotnet sln add ./RabbitMQ/RabbitMQ.csproj
Run Productor:
dotnet run --project ./RabbitMQ/RabbitMQ.csproj -- p
Run Consumer:
dotnet run --project ./RabbitMQ/RabbitMQ.csproj
In Demo1, it seems the message delivery to queue, but it will never happen actually, message send to queue will always through the exchange, it will work like this:
In simple case, the client create exchange automatically, so in this demo, we have not create a exchange, but if you to use the rabbitmq advanced feature, we must know the exchange, there is four patterns to use the exchange:
We will show the four patterns above to belows demo except header pattern, because of its' low efficiency.
I use the Demo2 to show direct exchange usage. the exchange will full match the routekey and send it to specific queue which full match the routekey as well, it works like this.
In this demo, we do not need to change the consumer, but only do a little change in demo1's productor like below:
public void Producter(IModel channel)
{
//定义一个Direct类型交换机
channel.ExchangeDeclare("firstExchange", ExchangeType.Direct, false, false, null);
//声明一个队列
channel.QueueDeclare("secondQueue", false, false, false, null);
//将队列绑定到交换机
channel.QueueBind("secondQueue", "firstExchange", "firstRouteKey", null);
Console.WriteLine("\nPlease Enter Message or q to exit");
string input;
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
//发布消息
channel.BasicPublish("firstExchange", "firstRouteKey", null, sendBytes);
} while (input.Trim().ToLower() != "q");
}
If you change the routekey:
channel.BasicPublish("firstExchange", "otherRouteKey", null, sendBytes);
Then the secondQueue will get no message, so the consumer also will get no message.
This demo will show the usage of fanout exchange, which will exchange message to all queues which is bind to this exchange.
In this demo, when productor send a message, all consumer will receive it at the same time.
This demo shows topic exchange.
Use bellow two snippet to use different routekey start two productor:
channel.BasicPublish("thirdExchange", "thirdRouteKey-0.anyword", null, sendBytes);
channel.BasicPublish("thirdExchange", "thirdRouteKey-1.anyword", null, sendBytes);
You can the the result as bellow:
Reference: 基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)
This demo is used to test consume and cancel
This demo is based on demo4, use topic exchange to send message to a group
This demo is used to test poll message.
This demo is used to test autoDelete.
Referrence:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。